In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.12:0.14.0 pyspark-shell'

In [2]:

   
import regex
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, explode, lower
from pyspark.sql.types import StringType, ArrayType

spark = SparkSession.builder.getOrCreate()
df = spark.read.format('xml').options(rowTag='page').load('hdfs:/enwiki_small.xml')

def extractLink(text):
    try:
        results = regex.findall(r'\[\[((?:[^[\]]+|(?R))*+)\]\]', text)
    except:
        return []
    output = []
    for res in results:
        for link in res.split('|'):
            if '#' in link:
                continue
            elif ':' in link and 'Category:' not in link:
                continue
            else:
                output.append(link.lower())
                break
    return output


link_udf = udf(lambda text: extractLink(text), ArrayType(StringType()))
new_df = df.withColumn("article", explode(link_udf(col("revision.text._VALUE"))))
new_df = new_df.select(lower(col('title')).alias('title'), 'article').orderBy('title', 'article')
new_df.repartition(10).write.mode("overwrite").option("delimiter", "\t").csv('p1t2_small')

:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
com.databricks#spark-xml_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-87b1707c-81cc-4614-bc4a-5f07dd5cc5ac;1.0
	confs: [default]
	found com.databricks#spark-xml_2.12;0.14.0 in central
	found commons-io#commons-io;2.8.0 in central
	found org.glassfish.jaxb#txw2;2.3.4 in central
	found org.apache.ws.xmlschema#xmlschema-core;2.2.5 in central
:: resolution report :: resolve 306ms :: artifacts dl 8ms
	:: modules in use:
	com.databricks#spark-xml_2.12;0.14.0 from central in [default]
	commons-io#commons-io;2.8.0 from central in [default]
	org.apache.ws.xmlschema#xmlschema-core;2.2.5 from central in [default]
	org.glassfish.jaxb#txw2;2.3.4 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.option("delimiter", "\t").csv('hdfs:/user/root/p1t2_small')


:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
com.databricks#spark-xml_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b8edc72b-1b31-4667-812a-bcd9853296cf;1.0
	confs: [default]
	found com.databricks#spark-xml_2.12;0.14.0 in central
	found commons-io#commons-io;2.8.0 in central
	found org.glassfish.jaxb#txw2;2.3.4 in central
	found org.apache.ws.xmlschema#xmlschema-core;2.2.5 in central
:: resolution report :: resolve 292ms :: artifacts dl 8ms
	:: modules in use:
	com.databricks#spark-xml_2.12;0.14.0 from central in [default]
	commons-io#commons-io;2.8.0 from central in [default]
	org.apache.ws.xmlschema#xmlschema-core;2.2.5 from central in [default]
	org.glassfish.jaxb#txw2;2.3.4 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|

In [3]:
df.show(5)

[Stage 1:>                                                          (0 + 1) / 1]

+--------------------+--------------------+
|                 _c0|                 _c1|
+--------------------+--------------------+
|        forrest gump|afi's 100 years.....|
|      ibm system/360|dynamic address t...|
|intact dilation a...|        samuel alito|
|    han van meegeren|     pieter de hooch|
|      genesis (band)|      no son of mine|
+--------------------+--------------------+
only showing top 5 rows



                                                                                

In [4]:
df = df.filter("_c1 is not null")
df.count()

                                                                                

4413753

In [9]:
from pyspark.sql.functions import lit

df_count = df.groupBy("_c0").count()
# Set intial rank as 1
new_df = df_count.withColumn("rank", lit(1))
new_df = new_df.rdd.map(lambda x: (x[0], x[1], x[2], x[2]/x[1])).toDF(['article', 'count', 'rank', 'contribution'])

                                                                                

In [10]:
new_df.show()

+--------------------+-----+----+--------------------+
|             article|count|rank|        contribution|
+--------------------+-----+----+--------------------+
|      greek language|  315|   1|0.003174603174603...|
|       james madison|  296|   1|0.003378378378378...|
|georg wilhelm fri...|  527|   1|0.001897533206831...|
|            guernsey|  311|   1|0.003215434083601286|
|indo-iranian lang...|  153|   1|0.006535947712418301|
|hypothetical type...|  406|   1|0.002463054187192...|
|geography of moldova|  148|   1|0.006756756756756757|
|james madison uni...|   94|   1|0.010638297872340425|
|      frederick abel|   69|   1|0.014492753623188406|
|johns hopkins uni...|  391|   1|0.002557544757033...|
| green mountain boys|  106|   1|0.009433962264150943|
|                 hci|   16|   1|              0.0625|
|         java applet|   94|   1|0.010638297872340425|
|       georges perec|  111|   1|0.009009009009009009|
|       gary, indiana|  230|   1|0.004347826086956522|
|         

In [11]:
for _ in range(10):
    new_df = df.join(new_df, df._c0 == new_df.article, 'inner')
    new_df = new_df.groupBy(['_c1']).sum('contribution').rdd.map(lambda x: (x[0], 0.15 + 0.85 * x[1])).toDF(['article', 'rank'])
    new_df = df_count.join(new_df,  new_df.article == df._c0 , 'left')
    # We get null values, we replace them at 0.15 as the base value
    new_df = new_df.na.fill(0.15)
    new_df = new_df.rdd.map(lambda x:(x[0], x[1], x[3], x[3]/x[1])).toDF(['article', 'count', 'rank', 'contribution'])


22/04/30 17:45:32 WARN org.apache.spark.deploy.yarn.YarnAllocator: Container from a bad node: container_1651337730025_0003_01_000002 on host: cluster-5ead-w-1.c.deep-learning-final-345119.internal. Exit status: 134. Diagnostics: [2022-04-30 17:45:32.373]Exception from container-launch.
Container id: container_1651337730025_0003_01_000002
Exit code: 134

[2022-04-30 17:45:32.376]Container exited with a non-zero exit code 134. Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :
/bin/bash: line 1:  4122 Aborted                 /usr/lib/jvm/temurin-8-jdk-amd64/bin/java -server -Xmx5739m -Djava.io.tmpdir=/hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1651337730025_0003/container_1651337730025_0003_01_000002/tmp '-Dspark.driver.port=39507' '-Dspark.ui.port=0' '-Dspark.rpc.message.maxSize=512' -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/userlogs/application_1651337730025_0003/container_1651337730025_0003_01_000002 -XX:OnOutOfMemoryError='kill %p' org.apach

In [12]:
new_df.show()

+-------------+-----+-------------------+--------------------+
|      article|count|               rank|        contribution|
+-------------+-----+-------------------+--------------------+
|           0s|  317|0.16250646260195992|5.126386832869398E-4|
|    1 e+12 m²|  161| 0.2864059071754309|0.001778918678108...|
|    1 e+15 m²|   12|0.30567320993672553|0.025472767494727128|
|     1 e+7 m²|   58|0.15637317352616037|0.002696089198726903|
|     1 e-1 m²|   23|0.17978600746317697|0.007816782933181608|
|     1 e-1 m³|   16| 0.1566060235157928| 0.00978787646973705|
|    1 e-25 kg|    1|               0.15|                0.15|
|      1 e-5 s|    1|               0.15|                0.15|
|      1 e13 s|    1|0.15166483220817573| 0.15166483220817573|
|       1 e3 s|    1|0.15461882671616706| 0.15461882671616706|
|      1 e39 s|    1|               0.15|                0.15|
|       1 e8 s|    1|               0.15|                0.15|
|   1 exametre|   23|0.27749999999999997|0.012065217391

In [13]:
new_df = new_df.select('article', 'rank').orderBy('article', 'rank')
new_df.show()




+--------------------+-------------------+
|             article|               rank|
+--------------------+-------------------+
|    "love and theft"|               0.15|
|         'ndrangheta|0.17348307397432003|
|    's-hertogenbosch|0.18902817716794112|
|                -gry|0.15028475762130802|
|...baby one more ...|0.15243544668825165|
|          0 (number)|0.41265480254607023|
|007 (disambiguation)|               0.15|
|                  0s|0.16250646260195992|
|               0s bc|0.15114233855850528|
|                   1|0.19440862683036556|
|          1 (number)|0.29031988274554066|
|                1 bc|0.24361567678356452|
|        1 centimetre|0.15289772727272727|
|       1 corinthians|0.17609912517881804|
|         1 decametre|               0.15|
|         1 decimetre|               0.15|
|            1 e+1 m²| 0.1789043717799753|
|           1 e+10 m²|0.15216153250183354|
|           1 e+11 m²|0.15652478115302815|
|           1 e+12 m²| 0.2864059071754309|
+----------

                                                                                

In [14]:
new_df.repartition(10).write.option("delimiter", "\t").csv('pagerank_small')

22/04/30 17:47:36 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 5.0 in stage 281.0 (TID 596) (cluster-5ead-w-1.c.deep-learning-final-345119.internal executor 9): java.lang.StackOverflowError
	at java.io.ObjectInputStream$BlockDataInputStream.readByte(ObjectInputStream.java:3277)
	at java.io.ObjectInputStream.readHandle(ObjectInputStream.java:1780)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1634)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
	at java.io.ObjectInputStream.readObje

In [16]:
new_df.limit(5).toPandas().to_csv('/home/saisur/pagerank_small.csv', sep='\t', index=False)

                                                                                