In [1]:
import re
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.functions import col
from pyspark.sql.functions import explode
from pyspark.sql.functions import lower
from operator import add

## Task 2: Extract the internal links of each article

In [2]:
# Reuse the active Spark session if there is one, otherwise create it.
spark = SparkSession.builder.getOrCreate()

# Read the dump through the 'xml' data source; every <page> element becomes one row.
xml_reader = spark.read.format('xml').option('rowTag', 'page')
df = xml_reader.load('hdfs:/enwiki_small.xml')

In [3]:
# Print the inferred schema of the loaded pages (nested revision/text structs included).
df.printSchema()

root
 |-- id: long (nullable = true)
 |-- redirect: string (nullable = true)
 |-- restrictions: string (nullable = true)
 |-- revision: struct (nullable = true)
 |    |-- comment: struct (nullable = true)
 |    |    |-- _VALUE: string (nullable = true)
 |    |    |-- _deleted: string (nullable = true)
 |    |-- contributor: struct (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- ip: string (nullable = true)
 |    |    |-- username: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- minor: string (nullable = true)
 |    |-- text: struct (nullable = true)
 |    |    |-- _VALUE: string (nullable = true)
 |    |    |-- _xml:space: string (nullable = true)
 |    |-- timestamp: string (nullable = true)
 |-- title: string (nullable = true)



In [3]:
def get_titles(text):
    """Extract the lower-cased internal link targets from a page's revision.

    Args:
        text: The page's ``revision`` struct (anything indexable by field
            name, e.g. a Row or dict) holding a ``text`` struct with a
            ``_VALUE`` string. May be ``None``, and so may ``text['text']``
            and ``text['text']['_VALUE']``, since the schema marks all of
            them as nullable.

    Returns:
        A list with one entry per ``[[...]]`` link that passes the filter,
        in order of appearance (duplicates kept). Each entry is the part of
        the link before the first ``|``, stripped of surrounding whitespace.
        An empty list is returned when there is no text.

    Filtering rules (applied to the whole lower-cased link body, including
    any display text after ``|``):
        * links containing ``#`` are dropped;
        * links containing ``:`` are dropped unless they also contain
          ``category:``.
    """
    # Guard every nullable level so a page without a revision or without
    # text yields no links instead of raising TypeError inside the UDF.
    if text is None:
        return []
    text_struct = text['text']
    if text_struct is None:
        return []
    raw_text = text_struct['_VALUE']
    if raw_text is None:
        return []

    title_lst = []
    # Lower-case once up front so the "category:" test is case-insensitive.
    for title in re.findall(r'\[\[(.*?)\]\]', raw_text.lower()):
        if "#" in title:
            continue
        # NOTE(review): this is a substring test, not a prefix test, so a
        # link such as "foo:category:bar" is also kept — confirm intended.
        if ":" in title and "category:" not in title:
            continue
        # Keep only the link target, discarding the display text after '|'.
        title_lst.append(title.split('|')[0].strip())
    return title_lst



In [21]:
def PairLinks(links):
    """Return the first two fields of ``links`` as a plain 2-tuple.

    Any further fields of ``links`` are ignored.
    """
    first = links[0]
    second = links[1]
    return (first, second)

In [4]:
from pyspark.sql.types import ArrayType, StringType

# Wrap get_titles as a Spark UDF whose declared return type is array<string>.
title_list = udf(lambda revision: get_titles(revision), ArrayType(StringType()))

In [5]:
# Apply the link-extraction UDF to every page's revision struct.
revision_list = title_list(col('revision'))
df2 = df.withColumn("revisions", revision_list)

# explode() turns each array of links into one row per (article, link) pair.
df2_table = explode(df2.revisions)

# Name the output columns in the projection itself and sort on those names.
# Sorting on "title"/"col" after the select only worked because Spark resolved
# columns missing from the projection, and it ordered by the original-case
# title rather than by the lower-cased name that is actually displayed.
article_table = (
    df2.select(
        lower(col('title')).alias("article_name"),
        df2_table.alias("links"),
    )
    .orderBy("article_name", "links", ascending=True)
)



In [6]:
# Preview the first rows of the (article_name, links) table.
article_table.show()

+----------------+--------------------+
|    article_name|               links|
+----------------+--------------------+
|"love and theft"|                2001|
|"love and theft"|accidents & accus...|
|"love and theft"|           accordion|
|"love and theft"|            allmusic|
|"love and theft"|   americana (music)|
|"love and theft"|anthology of amer...|
|"love and theft"|        augie meyers|
|"love and theft"|        augie meyers|
|"love and theft"|               banjo|
|"love and theft"|         bass guitar|
|"love and theft"|       billboard 200|
|"love and theft"|       billboard 200|
|"love and theft"|  blender (magazine)|
|"love and theft"|               blues|
|"love and theft"|           bob dylan|
|"love and theft"|           bob dylan|
|"love and theft"|           bob dylan|
|"love and theft"|           bob dylan|
|"love and theft"|           bob dylan|
|"love and theft"|           bob dylan|
+----------------+--------------------+
only showing top 20 rows



## Task 3: Rank articles iteratively from their links

In [13]:
def Contribution(neighbor, rank):
    """Yield the rank contribution an article passes to each of its neighbors.

    Args:
        neighbor: Sized iterable of the article's linked titles.
        rank: The article's current rank.

    Yields:
        ``(title, rank / len(neighbor))`` for every title in ``neighbor``,
        i.e. the rank is split evenly across all neighbors. Nothing is
        yielded for an empty ``neighbor``.
    """
    for target in neighbor:
        share = rank / len(neighbor)
        yield (target, share)

In [15]:
# Convert the Task 2 DataFrame into an RDD of (article_name, links) rows.
neighbor_article = article_table.rdd
# For each title, collect its distinct links. The mapping must run over
# neighbor_article: the previously referenced name `lines` is never defined in
# this notebook and fails with NameError on a fresh kernel. The result is
# cached because the ranking loop joins against it on every iteration.
neighbor_table = neighbor_article.map(lambda links: PairLinks(links)).distinct().groupByKey().cache()

In [None]:
# Give every article that has outgoing links an initial rank of 1.0.
rank_table = neighbor_table.mapValues(lambda _links: 1.0)

In [16]:
# Run ten rounds of rank propagation.
for iteration in range(10):
    # Pair each article's link list with its current rank ...
    links_with_rank = neighbor_table.join(rank_table)
    # ... and hand an equal share of that rank to every linked title.
    contributions = links_with_rank.flatMap(
        lambda entry: Contribution(entry[1][0], entry[1][1])
    )
    # Sum what each title received, then apply the damped update.
    received = contributions.reduceByKey(add)
    rank_table = received.mapValues(lambda total: total * 0.85 + 0.15)

In [None]:
# Convert the rank RDD of (title, rank) pairs back to a DataFrame.
# toDF() is called without column names, so the columns get the default names _1 and _2.
final_table = rank_table.toDF()

In [None]:
# Sort ascending by title (_1), breaking ties by rank (_2).
sort_columns = ["_1", "_2"]
final_table = final_table.orderBy(sort_columns, ascending=True)

In [None]:
# Preview the first rows of the sorted (title, rank) table.
final_table.show()

+--------------------+-------------------+
|                  _1|                 _2|
+--------------------+-------------------+
|                    |0.16530696425076816|
|                   !|0.15490812663468756|
|                 !!!| 0.1506720648853998|
|          !karapuri!|0.15260782274534734|
|      !kung language| 0.1512953660490697|
|        !kung people|0.15047510351721316|
|             !wowow!|0.15027220168350128|
|                !xóõ|0.15545032056327937|
|       !xóõ language| 0.1533434083318205|
|                   "|0.15052328784593352|
|                 "*"|0.15141240303084214|
|                 "+"|0.15141240303084214|
|  "a piece of steak"|0.15140337745780408|
|            "a+0(m)"| 0.1512858454888697|
|    "broadway micky"|0.15108184684160522|
|  "crocodile" dundee|0.15061857758320338|
|"do not disturb" ...|0.15035621269746224|
|"dr. death" steve...| 0.1509905725621399|
| "einstein" anderson| 0.1529306624230988|
|  "from hell" letter|0.15102813279112887|
+----------

In [None]:
# Stop the Spark session now that all results have been displayed.
spark.stop()

RDD reference: https://tinyurl.com/yc48trsk