# Pre-processing wikipedia dumps with dask (part 2)
> Part 1 showed how to convert the SQL dumps into dataframes. This part processes those dataframes further into usable data.
- toc: true
- branch: master
- badges: true
- comments: false
- author: Nicolas Aspert
- categories: [wikipedia, dask]

After processing the SQL dumps in [part 1](https://dev.clockworkpanda.xyz/wikipedia/dask/2020/12/02/daskwiki.html), we need to further process the data to make it importable into Neo4j. We need to:
- find the id of the page each redirect points to
- find the target id of each link in the `pagelinks` table, taking page redirections into account

There are other ways of doing those tasks, and redirects can be handled directly in Neo4j: you can imagine creating the redirection pages and having links of type `REDIRECTS_TO` in addition to the "normal" `LINKS_TO` links between pages. However, this would add a huge number of relationships to the database (I tried it for you) and make the queries slower and more complex, without bringing additional value (at least for the studies performed in our lab). Hence the choice of handling redirections prior to import.

Make sure to check the [sparkwiki import guide](https://github.com/epfl-lts2/sparkwiki/tree/master/helpers) to get a global view. The [source code of DumpProcessor](https://github.com/epfl-lts2/sparkwiki/blob/master/src/main/scala/ch/epfl/lts2/wikipedia/DumpProcessor.scala) is also of interest, as we will replicate its functionalities here.

In [1]:
import os
import pandas as pd
import dask
import dask.dataframe as ddf
from dask.distributed import LocalCluster, Client

In [2]:
dask.config.set({'temporary_directory': '/tmp'}) # make sure temp dir is local !

<dask.config.set at 0x7f984190c150>

In [2]:
cluster = LocalCluster(n_workers=6, threads_per_worker=4, memory_limit=24e9) # adapt to your local resources !
client = Client(cluster)

In [5]:
data_path = '/data/wikipedia/20201120'

Read parquet files generated in part 1 from disk

In [4]:
redirect_df = ddf.read_parquet(os.path.join(data_path, 'processed', 'redirect.parquet'))
pages_df = ddf.read_parquet(os.path.join(data_path, 'processed', 'pages.parquet'))
pagelinks_df = ddf.read_parquet(os.path.join(data_path, 'processed', 'pagelinks.parquet'))

Or read my copy from our S3 bucket on SWITCHengines

In [7]:
storage_options={'anon':True, 'client_kwargs':{'endpoint_url':'https://os.unil.cloud.switch.ch'}}

In [3]:
redirect_df = ddf.read_parquet('s3://lts2-wikipedia/enwiki/20201120/redirect.parquet', storage_options=storage_options)
pages_df = ddf.read_parquet('s3://lts2-wikipedia/enwiki/20201120/pages.parquet', storage_options=storage_options)
pagelinks_df = ddf.read_parquet('s3://lts2-wikipedia/enwiki/20201120/pagelinks.parquet', storage_options=storage_options)

Just a quick glance at the dataframes to make sure the data is here

In [5]:
redirect_df.head(10)

| from | target_ns | title | inter_wiki | fragment |
|---|---|---|---|---|
| 10 | 0 | Computer_accessibility | '' | '' |
| 13 | 0 | History_of_Afghanistan | '' | '' |
| 14 | 0 | Geography_of_Afghanistan | '' | '' |
| 15 | 0 | Demographics_of_Afghanistan | '' | '' |
| 18 | 0 | Communications_in_Afghanistan | '' | '' |
| 19 | 0 | Transport_in_Afghanistan | '' | '' |
| 20 | 0 | Afghan_Armed_Forces | '' | '' |
| 21 | 0 | Foreign_relations_of_Afghanistan | '' | '' |
| 23 | 0 | Assistive_technology | '' | '' |
| 24 | 0 | Amoeba | '' | '' |


In [6]:
pages_df.head(10)

| id | namespace | title | is_redirect | is_new |
|---|---|---|---|---|
| 10 | 0 | AccessibleComputing | True | False |
| 12 | 0 | Anarchism | False | False |
| 13 | 0 | AfghanistanHistory | True | False |
| 14 | 0 | AfghanistanGeography | True | False |
| 15 | 0 | AfghanistanPeople | True | False |
| 18 | 0 | AfghanistanCommunications | True | False |
| 19 | 0 | AfghanistanTransportations | True | False |
| 20 | 0 | AfghanistanMilitary | True | False |
| 21 | 0 | AfghanistanTransnationalIssues | True | False |
| 23 | 0 | AssistiveTechnology | True | False |


In [7]:
pagelinks_df.head(10)

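|  | from | namespace | title | from_namespace |
|---|---|---|---|---|
| 0 | 4748 | 0 | ! | 0 |
| 1 | 9773 | 0 | ! | 0 |
| 2 | 15154 | 0 | ! | 0 |
| 3 | 25213 | 0 | ! | 0 |
| 4 | 613303 | 0 | ! | 0 |
| 5 | 1028188 | 0 | ! | 0 |
| 6 | 1497620 | 0 | ! | 0 |
| 7 | 2875276 | 0 | ! | 0 |
| 8 | 2988645 | 0 | ! | 0 |
| 9 | 4355567 | 0 | ! | 0 |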


# Merge redirect with pages

We need to find the id of the page each redirect points to. For example, the page with id=10, 'AccessibleComputing', redirects to 'Computer_accessibility':

In [16]:
pages_df[pages_df['title'] == 'Computer_accessibility'].compute()

| id | namespace | title | is_redirect | is_new |
|---|---|---|---|---|
| 411964 | 0 | Computer_accessibility | False | False |


Here is how sparkwiki does the join between redirect and page DataFrames. We can skip the join on the namespace as we only kept namespace 0 in our pre-processed DataFrames (sparkwiki was designed to keep categories so additional care needs to be taken to get the proper id).

```
redirectDf.withColumn("id", redirectDf.col("from"))
                .join(pageDf.drop(pageDf.col("title")), "id")
                .select("from", "targetNamespace", "title")
                .withColumn("namespace", redirectDf.col("targetNamespace"))
                .join(pageDf, Seq("title", "namespace")).select("from", "id", "title")
                .as[MergedRedirect]
```
                
Here is the equivalent in Python. The `reset_index` calls are needed because we want the index values (`from` and `id`) as regular columns in the resulting dataframe. Since we only kept namespace 0 pages, we can join using the `title` field only.

In [8]:
redirect_merged = redirect_df.reset_index().merge(pages_df.reset_index(), left_on='title', right_on='title')\
                    .drop(columns=['inter_wiki', 'fragment', 'namespace', 'is_new', 'target_ns'])

We still have leftover redirects that are not fully resolved, i.e. redirects whose target is itself a redirect page

In [9]:
redirect_merged[redirect_merged['is_redirect']].head(20, npartitions=10)

|  | from | title | id | is_redirect |
|---|---|---|---|---|
| 101499 | 12242480 | New_Mexico_State_Road_333 | 2788393 | True |
| 232021 | 65663584 | Senator_Shapiro_(disambiguation) | 65663584 | True |
| 214813 | 56753023 | Ran_Neu-Ner | 51045453 | True |
| 229234 | 64811911 | Monastery_of_Loukous | 65734014 | True |
| 220021 | 59393217 | Micro_TDH | 59393217 | True |
| 231248 | 65733619 | AnTuTu/11_Pro_Max_VS_S20_VS_Xperia_1_ii_VS_Fin... | 65733623 | True |
| 229301 | 63009285 | Fa_Hai | 17826148 | True |
| 229302 | 63371040 | Fa_Hai | 17826148 | True |
| 232399 | 65521541 | Centipetalism | 65730750 | True |
| 232400 | 65524209 | Centipetalism | 65730750 | True |


Let us just discard them for now...

In [10]:
redirect_merged_filt = redirect_merged[~redirect_merged['is_redirect']]
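
To make the merge and the leftover redirects more concrete, here is a minimal sketch on toy data, using plain pandas rather than dask. All ids and titles below are made up for illustration; only the column and index names follow the real tables.

```python
import pandas as pd

# Toy stand-ins for the real tables (ids and titles are invented).
pages = pd.DataFrame(
    {"title": ["A_old", "A", "B_old", "B_mid", "B"],
     "is_redirect": [True, False, True, True, False]},
    index=pd.Index([1, 2, 3, 4, 5], name="id"),
)
# Redirect table: indexed by the redirecting page id, `title` is the target title.
redirects = pd.DataFrame(
    {"title": ["A", "B_mid", "B"]},
    index=pd.Index([1, 3, 4], name="from"),
)

# Same join as above: move both indices into columns, then merge on the title.
merged = redirects.reset_index().merge(pages.reset_index(), on="title")

# Page 3 redirects to "B_mid", which is itself a redirect: not fully resolved.
leftover = merged[merged["is_redirect"]]
# Pages 1 and 4 redirect to regular pages: these are the rows we keep.
resolved = merged[~merged["is_redirect"]]

print(merged)
print(resolved[["from", "id"]])
```

In this toy case the chain 3 → 4 → 5 shows up as a row with `is_redirect` set to true, which is exactly the kind of row discarded above.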

In [11]:
redirect_merged_filt.to_parquet(os.path.join(data_path, 'processed', 'redirect_merged.parquet'), compression='gzip')

# Merge pagelinks

This is a bit more complicated: two steps are needed.

## Step 1

Find the id of the target page based on the `title` field, quite similar to the redirects we merged before.
Here is the sparkwiki version :

```
val pagelinks_id = pagelinks.join(pages, Seq("title", "namespace"))
                            .select("from", "id", "title", "fromNamespace", "namespace")
```

In [12]:
pagelinks_id = pagelinks_df.drop(columns=['namespace', 'from_namespace']).merge(pages_df.drop(columns=['namespace']).reset_index()\
                        .drop(columns=['is_new']), left_on='title', right_on='title')

Since we left out anything not being in namespace 0, a merge on the `title` field is sufficient. 

In [40]:
pagelinks_id.head() # Expensive !

|  | from | title | id | is_redirect |
|---|---|---|---|---|
| 0 | 364060 | !Kung_languages | 63813576 | True |
| 1 | 453953 | !Kung_languages | 63813576 | True |
| 2 | 453979 | !Kung_languages | 63813576 | True |
| 3 | 453991 | !Kung_languages | 63813576 | True |
| 4 | 1210028 | !Kung_languages | 63813576 | True |


In [32]:
# save the result, can be re-used later

In [41]:
pagelinks_id.to_parquet(os.path.join(data_path, 'processed', 'pagelinks_id.parquet'), compression='gzip')

In [14]:
pagelinks_id[pagelinks_id['from']==774551].head(20)

|  | from | title | id | is_redirect |
|---|---|---|---|---|
| 314 | 774551 | (10397)_1997_SX33 | 33450953 | True |
| 319 | 774551 | (121014)_1999_AJ22 | 33715493 | True |
| 320 | 774551 | (12821)_1996_RG1 | 33476015 | True |
| 321 | 774551 | (136838)_1997_WG22 | 33716060 | True |
| 325 | 774551 | (15901)_1997_RY8 | 33496407 | True |
| 443 | 774551 | (20261)_1998_FM12 | 33508561 | True |
| 591 | 774551 | (246880)_1995_SR54 | 33737413 | True |
| 593 | 774551 | (264290)_1998_SD27 | 33737645 | True |
| 594 | 774551 | (26987)_1997_WP1 | 33517118 | True |
| 602 | 774551 | (31177)_1997_XH11 | 33538369 | True |


## Step 2

Now for the trickier part: resolving redirections. Here is how sparkwiki does it:

```  
val linksDf = pgLinksIdDf.withColumn("inter", pgLinksIdDf.col("id"))
                   .join(redirectDf.withColumn("inter", redirectDf.col("from")).withColumnRenamed("from", "from_r").withColumnRenamed("id", "to_r"), Seq("inter"), "left")
                   .withColumn("dest", when(col("to_r").isNotNull, col("to_r")).otherwise(col("id")))
                   .select("from", "dest")
                   .filter($"from" !== $"dest") // remove self-links
                   .distinct // redirect removal will cause duplicates -> remove them
```

Reload the computed dataframes from disk to avoid computations...

In [13]:
pagelinks_id = ddf.read_parquet(os.path.join(data_path, 'processed', 'pagelinks_id.parquet'))
redirect_merged = ddf.read_parquet(os.path.join(data_path, 'processed', 'redirect_merged.parquet')) 

...or fetch them from S3

In [19]:
# read directly the processed data to avoid recomputation
redirect_merged = ddf.read_parquet('s3://lts2-wikipedia/enwiki/20201120/redirect_merged.parquet', storage_options=storage_options)
pagelinks_id = ddf.read_parquet('s3://lts2-wikipedia/enwiki/20201120/pagelinks_id.parquet', storage_options=storage_options)

### Join pagelinks and redirect

Let us see what our data looks like after the left join

In [15]:
pagelinks_redir_merge = pagelinks_id.merge(redirect_merged.reset_index(), left_on=['id'], right_on=['from'], how='left')

In [16]:
pagelinks_redir_merge.head(20)

|  | from_x | title_x | id_x | is_redirect_x | index | from_y | title_y | id_y | is_redirect_y |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 18301841 | .xsi | 26107610 | True | 75539.0 | 26107610.0 | Autodesk_Softimage | 1931049.0 | False |
| 1 | 23790514 | 1001st_Helicopter_Squadron | 54167065 | True | 136648.0 | 54167065.0 | 1st_Helicopter_Squadron | 8659503.0 | False |
| 2 | 415160 | 10th_Infantry_Division_(Greece) | 34514441 | True | 165820.0 | 34514441.0 | 10th_Mechanized_Infantry_Brigade_(Greece) | 34514432.0 | False |
| 3 | 425895 | 10th_Infantry_Division_(Greece) | 34514441 | True | 165820.0 | 34514441.0 | 10th_Mechanized_Infantry_Brigade_(Greece) | 34514432.0 | False |
| 4 | 3042106 | 10th_Infantry_Division_(Greece) | 34514441 | True | 165820.0 | 34514441.0 | 10th_Mechanized_Infantry_Brigade_(Greece) | 34514432.0 | False |
| 5 | 9540507 | 10th_Infantry_Division_(Greece) | 34514441 | True | 165820.0 | 34514441.0 | 10th_Mechanized_Infantry_Brigade_(Greece) | 34514432.0 | False |
| 6 | 9648456 | 10th_Infantry_Division_(Greece) | 34514441 | True | 165820.0 | 34514441.0 | 10th_Mechanized_Infantry_Brigade_(Greece) | 34514432.0 | False |
| 7 | 9648459 | 10th_Infantry_Division_(Greece) | 34514441 | True | 165820.0 | 34514441.0 | 10th_Mechanized_Infantry_Brigade_(Greece) | 34514432.0 | False |
| 8 | 9648466 | 10th_Infantry_Division_(Greece) | 34514441 | True | 165820.0 | 34514441.0 | 10th_Mechanized_Infantry_Brigade_(Greece) | 34514432.0 | False |
| 9 | 9648476 | 10th_Infantry_Division_(Greece) | 34514441 | True | 165820.0 | 34514441.0 | 10th_Mechanized_Infantry_Brigade_(Greece) | 34514432.0 | False |


We need to keep the `id_x` column when no redirection exists, and the `id_y` column when the page is redirected. This is done by creating a new column (I did not find how to do it in a single statement, as the original Scala code does).

In [17]:
# keep redirects where they exist, the original id otherwise
pagelinks_redir_merge['id_merge'] = pagelinks_redir_merge['id_x'].where(pagelinks_redir_merge['id_y'].isnull(), pagelinks_redir_merge['id_y'])
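
For what it is worth, one possible single-expression variant is to combine `assign` with `fillna`. The sketch below uses plain pandas on made-up values; dask mirrors both methods, but I am only assuming the same expression behaves identically on the full dataset, so treat it as an idea to test rather than a verified replacement.

```python
import pandas as pd

# Toy result of a left join: id_y is NaN where the link target is not a redirect.
joined = pd.DataFrame({
    "from_x": [10, 11, 12],
    "id_x": [100, 101, 102],
    "id_y": [200.0, float("nan"), 202.0],
})

# Version used in the cell above: keep id_x where id_y is missing, id_y otherwise.
two_step = joined["id_x"].where(joined["id_y"].isnull(), joined["id_y"])

# Single-expression variant: start from id_y and fall back to id_x.
# The left join turned id_y into floats, so cast the result back to integers.
one_step = joined.assign(dest=joined["id_y"].fillna(joined["id_x"]).astype("int64"))

print(one_step[["from_x", "dest"]])
```

Both versions select the same ids; the only difference is that the cast brings the `dest` column back to an integer type.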

In [18]:
pagelinks_redir_merge_final = pagelinks_redir_merge.drop(columns=['id_x', 'index', 'title_x', 'id_y', 'title_y', 'from_y', 'is_redirect_x', 'is_redirect_y'])\
    .rename(columns={'id_merge':'dest', 'from_x':'from'})

### Self-links removal

The resolution of redirections created self-links:

In [20]:
self_links = pagelinks_redir_merge_final[pagelinks_redir_merge_final['from']==pagelinks_redir_merge_final['dest']]

In [22]:
self_links.head()

|  | from | dest |
|---|---|---|
| 21 | 34514432 | 34514432 |
| 333 | 11632630 | 11632630 |
| 3473 | 63932158 | 63932158 |
| 3575 | 45449270 | 45449270 |
| 10134 | 30136816 | 30136816 |


Remove the self-links, as they are not useful.

In [23]:
# remove self-links
pagelinks_redir_noself = pagelinks_redir_merge_final[pagelinks_redir_merge_final['from'] != pagelinks_redir_merge_final['dest']]
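
To see why resolving redirections produces self-links, here is a small pandas sketch of the steps so far (left join, id selection, self-link filter). The page ids are hypothetical: page 2 links to page 1, which is a redirect pointing back to page 2.

```python
import pandas as pd

# Links with their raw target id: page 1 is a redirect, page 5 is a regular page.
links = pd.DataFrame({"from": [2, 3, 3], "id": [1, 1, 5]})
# Resolved redirects: redirect page id -> id of the page it points to.
redirects = pd.DataFrame({"from": [1], "id": [2]})

# Left join on the raw target id; non-redirect targets get NaN in id_y.
joined = links.merge(redirects, left_on="id", right_on="from", how="left")

# Use the redirect target when there is one, the raw target id otherwise.
dest = joined["id_x"].where(joined["id_y"].isnull(), joined["id_y"]).astype("int64")
resolved = pd.DataFrame({"from": joined["from_x"], "dest": dest})

# Page 2 -> redirect 1 -> page 2 has become a self-link.
self_links = resolved[resolved["from"] == resolved["dest"]]
no_self = resolved[resolved["from"] != resolved["dest"]]

print(self_links)
print(no_self)
```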

The last step is to remove links originating from redirect pages, as sparkwiki does:
```
// some redirect pages have regular links -> remove them
    linksDf.withColumn("id", linksDf.col("from"))
           .join(pages, "id")
           .filter($"isRedirect" === false)
           .select("from", "dest")
```


In [30]:
pagelinks_redir_pages = pagelinks_redir_noself.merge(pages_df, left_on='from', right_index=True)\
                                                .drop(columns=['namespace', 'title', 'is_new'])

### Ignore links on redirect pages

Remove links originating from pages whose `is_redirect` flag is true

In [33]:
pagelinks_redir_clean = pagelinks_redir_pages[~pagelinks_redir_pages['is_redirect']].drop(columns=['is_redirect'])
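
The same two steps (merge against the page index, then filter on the flag) can be sketched on toy data with plain pandas. The pages and links below are invented; only the column names match the real tables.

```python
import pandas as pd

# Pages indexed by id, as in pages_df; page 2 is a redirect page.
pages = pd.DataFrame(
    {"title": ["A", "A_old", "B"], "is_redirect": [False, True, False]},
    index=pd.Index([1, 2, 3], name="id"),
)
# Resolved links; the second one originates from the redirect page.
links = pd.DataFrame({"from": [1, 2, 3], "dest": [3, 3, 1]})

# Attach the is_redirect flag of the *source* page to every link.
with_flag = links.merge(pages[["is_redirect"]], left_on="from", right_index=True)

# Keep only links whose source is a regular page, then drop the helper column.
clean = with_flag[~with_flag["is_redirect"]].drop(columns=["is_redirect"])

print(clean)
```

Selecting only the `is_redirect` column before merging is a small variation on the cell above: it avoids carrying `namespace`, `title` and `is_new` through the join just to drop them afterwards.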

And finally save the result. It is always useful to have a checkpoint to restart from if things go wrong.

In [35]:
pagelinks_redir_clean.to_parquet(os.path.join(data_path, 'processed', 'pagelinks_redir_clean.parquet'), compression='gzip')

### Cleanup - duplicate links removal

If you read carefully, you noticed the `distinct` call in the first Scala/Spark query of step 2. Dask has a [`drop_duplicates`](https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.drop_duplicates) method that sounds like what we want.

First, let us reload the data processed so far from disk to avoid recomputation

In [6]:
pagelinks_redir_clean = ddf.read_parquet(os.path.join(data_path, 'processed', 'pagelinks_redir_clean.parquet'))

As always, our S3 copy is available

In [None]:
pagelinks_redir_clean = ddf.read_parquet('s3://lts2-wikipedia/enwiki/20201120/pagelinks_redir_clean.parquet', storage_options=storage_options)

In [38]:
pagelinks_redir_clean.count().compute()

from    534689528
dest    534689528
dtype: int64

That is almost 535 million links.

In [7]:
pagelinks_redir_nodupes = pagelinks_redir_clean.drop_duplicates(['from', 'dest'])
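
Why are there duplicates in the first place? As the comment in the Scala query says, they come from the redirect resolution: if a page links both to a redirect and to the redirect's target, both links end up pointing to the same page. A tiny pandas illustration, with made-up ids:

```python
import pandas as pd

# Hypothetical resolved links: page 7 linked both to a redirect and to its
# target, so after resolution both rows point to page 42.
resolved = pd.DataFrame({"from": [7, 7, 8], "dest": [42, 42, 42]})

# Same call as on the dask dataframe: one row per (from, dest) pair.
deduped = resolved.drop_duplicates(["from", "dest"])

print(len(resolved), "->", len(deduped))
```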

Running the code below may trigger out-of-memory errors, and the process will eventually fail, unless your workers have more than **55GB to 60GB** of memory available. You have been warned :) A `repartition` call has been added to avoid ending up with a single file.

In [42]:
pagelinks_redir_nodupes.repartition(npartitions=50)\
                       .to_parquet(os.path.join(data_path, 'processed', 'pagelinks_redir_nodupes.parquet'), compression='gzip')

Play it safe, reload from disk ...

In [19]:
pagelinks_redir_nodupes = ddf.read_parquet(os.path.join(data_path, 'processed', 'pagelinks_redir_nodupes.parquet'), engine='pyarrow')

... or from S3

In [None]:
pagelinks_redir_nodupes = ddf.read_parquet('s3://lts2-wikipedia/enwiki/20201120/pagelinks_redir_nodupes.parquet', storage_options=storage_options)

In [16]:
pagelinks_redir_nodupes.count().compute()

from    522219227
dest    522219227
dtype: int64

About 12.5 million duplicate links were removed (534,689,528 - 522,219,227 = 12,470,301). Quite worth the computing time.
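
The exact number of removed duplicates follows directly from the two counts reported above. A quick check in plain Python:

```python
before = 534_689_528   # rows in pagelinks_redir_clean
after = 522_219_227    # rows in pagelinks_redir_nodupes

removed = before - after
share = removed / before

print(removed)          # 12470301
print(f"{share:.2%}")   # 2.33%
```

So a bit more than 2% of the links were duplicates.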

# Conclusions

Dask is a great tool to exploit the parallelism of your local computer without the hassle of setting up Spark. All you have to do is create a conda environment, install dask, and you are mostly done. However, while writing this, I compared it with a [dockerized spark instance](https://github.com/cluster-apps-on-docker/spark-standalone-cluster-on-docker) running the sparkwiki processing rewritten for pyspark (each worker was allocated 20GB of RAM and 10 cores). It seems that, for now, Spark is more efficient: the redirection processing took only a few minutes with Spark and at least twice as long with the dask equivalents. Spark also showed no memory trouble, whereas the duplicate removal with dask required a much larger amount of memory per worker.

Let us compare our results with the pyspark-processed version:

In [12]:
pagelinks_pyspark = ddf.read_parquet(os.path.join(data_path, 'processed', 'pagelinks_ps_clean.parquet'), engine='pyarrow')

In [14]:
pagelinks_pyspark.count().compute()

from    522219227
dest    522219227
dtype: int64

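Good news: the pyspark version contains the same number of links! It looks like we managed to process the data in a similar way. As a spot check, let us compare the links pointing to the page with id 1677 in both versions.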

In [23]:
links_ps = pagelinks_pyspark[pagelinks_pyspark['dest']==1677].compute()

In [24]:
links_dask = pagelinks_redir_nodupes[pagelinks_redir_nodupes['dest']==1677].compute()

In [27]:
links_ps.sort_values('from').head()

|  | from | dest |
|---|---|---|
| 764 | 736 | 1677 |
| 743 | 1676 | 1677 |
| 2063 | 1688 | 1677 |
| 1167 | 1689 | 1677 |
| 363 | 1862 | 1677 |


In [43]:
delta = (links_dask.sort_values('from').reset_index()['from']-links_ps.sort_values('from').reset_index()['from'])

In [44]:
delta.min()

0

In [45]:
delta.max()

0
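
A slightly stricter way to run this kind of spot check is to compare the two tables as a whole after sorting them, instead of looking at the min and max of a difference. Subtracting two pandas Series aligns them on their index, so rows present in only one of them become NaN, which `min` and `max` skip by default. The sketch below uses small made-up frames and a hypothetical helper called `normalise`.

```python
import pandas as pd

# Hypothetical link tables holding the same rows in a different order.
links_a = pd.DataFrame({"from": [736, 1676, 1688], "dest": [1677, 1677, 1677]})
links_b = pd.DataFrame({"from": [1688, 736, 1676], "dest": [1677, 1677, 1677]})


def normalise(df: pd.DataFrame) -> pd.DataFrame:
    """Sort rows and reset the index so that only the content is compared."""
    return df.sort_values(["from", "dest"]).reset_index(drop=True)


# equals() checks shape, dtypes and values, so a missing row is detected too.
same = normalise(links_a).equals(normalise(links_b))
missing_row = normalise(links_a).equals(normalise(links_b.iloc[:2]))

print(same, missing_row)
```

Applied to `links_dask` and `links_ps`, this would return a single boolean for the whole comparison.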