# The aim is to extract all links from the wikitext of an article

In [1]:
import os, sys
import string
import pickle
import pandas as pd
import mwparserfromhell
import re
import urllib
import pyarrow.parquet as pq

import findspark
findspark.init('/usr/lib/spark2')
from pyspark.sql import SparkSession
from pyspark.sql import functions as F, types as T, Window
import wmfdata.spark as wmfspark

spark =wmfspark.get_session(
    type='yarn-large', 
    app_name='Pyspark notebook'
)
spark

PySpark executors will use /usr/lib/anaconda-wmf/bin/python3.


In [2]:
wiki_id = 'enwiki'
snapshot='2021-01'
PATH_hadoop = "/user/mgerlach/test/"

In [3]:
def normalise_title(title):
    """
    Normalising title (links)
    - deal with quotes
    - strip()
    - '_'--> ' '
    - capitalize first letter
    """
    title = urllib.parse.unquote(title)
    title = title.strip()
    if len(title) > 0:
        title = title[0].upper() + title[1:]
    n_title = title.replace("_", " ")
    if "#" in n_title:
        n_title = n_title.split("#")[0]
    return n_title


def normalise_anchor(anchor):
    #     anchor = urllib.parse.unquote(anchor)
    n_anchor = anchor.strip()  # .replace("_", " ")
    return n_anchor.lower()


def extract_article(row):
    """Extract the content of the article.
    normalize the titles.
    return: pid, title, title_rd, wikitext
    """
    #     redirect = row.page_redirect_title if row.page_redirect_title is not None else ""
    return T.Row(
        pid=row.page_id,
        title=normalise_title(row.page_title),
        title_rd=normalise_title(row.page_redirect_title),
        wikitext=row.revision_text,
    )

links_regex = re.compile(
    r"\[\[(?P<link>[^\n\|\]\[\<\>\{\}]{0,256})(?:\|(?P<anchor>[^\[]*?))?\]\]"
)
def get_links(page):
    '''
    extract all links from the wikitext of an article.
    return page_id, page_title, link, anchor
    '''
    links = []
    for m in links_regex.findall(page.wikitext):
        link = normalise_title(m[0])
        anchor = m[1] if len(m) > 1 and len(m[1]) > 0 else link
        if len(link) > 0:
            links.append(
                T.Row(
                    pid=page.pid,
                    title=page.title,
                    link=link,
                    anchor=normalise_anchor(anchor),
                )
            )
    return links

In [5]:
# all articles in a wiki of main namespace
wikipedia_all = (
    ## select table
    spark.read.table('wmf.mediawiki_wikitext_current')
    ## select wiki project
    .where( F.col('wiki_db') == wiki_id )
    .where( F.col('snapshot') == snapshot )
    ## main namespace
    .where(F.col('page_namespace') == 0 )
    ## no redirect-pages
#     .where(F.col('page_redirect_title')=='')
    .where(F.col('revision_text').isNotNull())
    .where(F.length(F.col('revision_text'))>0)
)

## extracting pid, title, title_rd, and the wikitext (including redirects)
## titles are normalized
wikipedia = spark.createDataFrame(wikipedia_all.rdd.map(extract_article).filter(lambda r: r is not None))

## all articles that are not redirect-pages
articles = (
    wikipedia.where(F.col('title_rd')=='')
    .select(
        'pid',
        'title',
        'wikitext'
    )
)

# extract the links from the articles
links = spark.createDataFrame(
    articles
    .rdd.flatMap(get_links)
)

##make a redirects-table to solve page-titles of redirect-pages
redirects = spark.createDataFrame(
    wikipedia
    .where(F.col('title_rd')!='')
    .rdd.map(lambda r: T.Row(
        title_from=r.title, 
        title_to=r.title_rd)
            )
).distinct()

## join the redirects into the links to resolve page-titles that are redirects
links_resolved = (
    links
    .join(
        redirects,
        links['link'] == redirects['title_from'],
        how='leftouter'
    )
    .select(
        'pid',
        'title',
        'anchor',
        ## resolved link: if redirect use title_to otherwise original link
        F.coalesce(F.col('title_to'),F.col('link') ).alias('link'),
        
    )
)

## filter only those links that are to a main-namespace article in the same wiki
links_resolved_articles = (
    links_resolved
    .join(
        articles.select("title"),
        links_resolved['link']==articles['title'],
        how='leftsemi'
    )
)

## write to parquet
FILE_hadoop = PATH_hadoop+"%s.links_resolved_articles.parquet"%wiki_id
links_resolved_articles.write.mode("overwrite").parquet(FILE_hadoop)

Py4JJavaError: An error occurred while calling o296.parquet.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:566)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 581 in stage 4.0 failed 4 times, most recent failure: Lost task 581.3 in stage 4.0 (TID 1601, an-worker1094.eqiad.wmnet, executor 113): ExecutorLostFailure (executor 113 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits.  64.1 GB of 45.9 GB virtual memory used. Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714.
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
	... 33 more


In [None]:
df_tmp = spark.read.load(FILE_hadoop)
df_tmp.show()