In [1]:
# imports
import re
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType,ArrayType
from pyspark.sql.functions import explode
from pyspark.sql.functions import lower

# Obtain the SparkSession shared by every task in this notebook
# (getOrCreate reuses an already-running session when one exists).
spark = (
    SparkSession.builder
    .getOrCreate()
)

def _extract_internal_links(text):
    """Extract normalised internal wiki links from raw page markup.

    A candidate link is any ``[[...]]`` span (non-greedy, single line).
    Each candidate is lowercased. For ``[[target|label]]`` only the part
    before the first ``|`` is kept. A candidate is kept only if it:

    * contains no ``#``, and
    * either contains no ``:`` or starts with ``category:``.

    Args:
        text: Page markup (the ``revision.text._VALUE`` field), or None.

    Returns:
        A list of link strings in order of appearance (duplicates kept).
        Returns None when ``text`` is not a string, e.g. a page with no text.
    """
    if not isinstance(text, str):
        # Missing text: return None rather than raising, so the UDF yields null.
        return None

    links = []
    # Raw string: "\[" in a normal literal is an invalid escape sequence.
    for match in re.finditer(r"\[\[(.+?)\]\]", text):
        link = match.group(1).lower().split("|", 1)[0]
        if "#" in link:
            continue
        if ":" not in link or link.startswith("category:"):
            links.append(link)
    return links


def Task2(File, output_path='gs://comsys/task2_full.csv'):
    """Build the (title, link) table of internal links for a Wikipedia XML dump.

    Processing steps:

    1. Read ``File`` with the ``xml`` data source, one row per ``<page>``.
    2. Extract internal links from each page's revision text.
    3. Emit one row per (title, link) pair, with both values lowercased.
    4. Sort by title, then link.
    5. Write a single tab-delimited, header-less CSV part file.

    Args:
        File: Path/URI of the XML dump.
        output_path: Output directory for the CSV. The default is the
            location Task3 reads its input from.
    """
    pages = spark.read.format('xml').options(rowTag='page').load(File)

    link_udf = udf(_extract_internal_links, ArrayType(StringType()))

    # explode() emits no rows for a null or empty array, so pages without
    # text or without any qualifying link drop out here.
    pairs = (
        pages
        .withColumn("links", link_udf(col("revision.text._VALUE")))
        .select(lower(col("title")).alias("title"),
                explode(col("links")).alias("links"))
        # Lowercase the title *before* sorting so the written file is ordered
        # by the values it actually contains (links are already lowercased).
        .sort("title", "links")
    )

    # coalesce(1) produces a single part file (read back via a part-00000 glob).
    (pairs.coalesce(1)
        .write.mode('overwrite')
        .options(header='False', delimiter="\t")
        .csv(output_path))

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/29 20:47:14 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
23/04/29 20:47:14 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
23/04/29 20:47:14 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
23/04/29 20:47:14 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator


In [2]:
def Task3(File, num_iterations=10, top_n=10):
    """Run Task2 on ``File``, then compute PageRank over the resulting link graph.

    Algorithm:

    * Every page with at least one outgoing link starts with rank 1.
    * In each iteration, a page sends ``rank / out_degree`` to each of its
      distinct neighbours.
    * A page's new rank is ``0.15 + 0.85 * sum(contributions received)``.
    * Pages that receive no contributions get no rank and stop contributing
      in later iterations. This exclusion was chosen deliberately (per course
      guidance) instead of keeping them at the base rank.

    The ``top_n`` pages by descending rank are written to
    ``gs://comsys/task3.csv`` as tab-delimited CSV without a header.

    Args:
        File: Path/URI of the Wikipedia XML dump passed to Task2.
        num_iterations: Number of PageRank iterations (default 10).
        top_n: Number of highest-ranked pages to write (default 10).
    """
    Task2(File)

    # Task2 writes one tab-delimited part file: _c0 = title, _c1 = link.
    edges = (spark.read.format('csv')
             .options(delimiter='\t')
             .load('gs://comsys/task2_full.csv/part-00000*.csv')
             .dropDuplicates(["_c0", "_c1"]))

    # Adjacency list: title -> list of distinct outgoing links. It is joined
    # in every iteration, so build it once and cache it.
    links = (edges.rdd
             .map(lambda row: (row[0], row[1]))
             .groupByKey()
             .mapValues(list)
             .cache())

    def _contributions(page_with_rank_and_links):
        # (page, (rank, neighbours)) -> [(neighbour, rank / out_degree), ...]
        _, (page_rank, neighbours) = page_with_rank_and_links
        share = page_rank / len(neighbours)
        return [(neighbour, share) for neighbour in neighbours]

    rank = links.mapValues(lambda _: 1.0)
    for _ in range(num_iterations):
        # Inner join: only pages that currently hold a rank contribute.
        contributions = rank.join(links).flatMap(_contributions)
        rank = (contributions
                .reduceByKey(lambda a, b: a + b)
                .mapValues(lambda total: 0.15 + 0.85 * total))

    # Top-N pages by descending rank (columns _1 = page, _2 = rank).
    top_pages = rank.toDF().sort(col("_2").desc()).limit(top_n)
    (top_pages.write.mode('overwrite')
        .options(header='False', delimiter="\t")
        .csv('gs://comsys/task3.csv'))

    links.unpersist()

# Run task 3 (which also runs task 2) on the small Wikipedia dump.
# The Spark session is released even if the job fails, so the cluster
# application is not left running.
try:
    Task3('hdfs:/user/wiki-small.xml')
finally:
    # stop spark session
    spark.stop()

                                                                                