In [None]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/45/b0/9d6860891ab14a39d4bddf80ba26ce51c2f9dc4805e5c6978ac0472c120a/pyspark-3.1.1.tar.gz (212.3MB)
[K     |████████████████████████████████| 212.3MB 68kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 41.3MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.1-py2.py3-none-any.whl size=212767604 sha256=53084073e463d3e1f434caadc21dbe8c71fb1d8dbee22a7d6b691048d40c3e55
  Stored in directory: /root/.cache/pip/wheels/0b/90/c0/01de724414ef122bd05f056541fb6a0ecf47c7ca655f8b3c0f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.1


In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext, DataFrame
from pyspark.sql.functions import desc,monotonically_increasing_id,sum,lit,col

import os
import re
from functools import reduce

In [None]:
# Extra arguments handed to PySpark when it launches: fetch the GraphFrames
# package (built for Spark 3.0 / Scala 2.12) and start the PySpark shell.
# The variable is exported here, ahead of the cell that creates the SparkContext.
SUBMIT_ARGS = "--packages graphframes:graphframes:0.8.0-spark3.0-s_2.12 pyspark-shell"
os.environ.update({"PYSPARK_SUBMIT_ARGS": SUBMIT_ARGS})

In [None]:
# Spark application settings: app name 'GraphFrames', run on the 'local' master.
conf = SparkConf().setAppName('GraphFrames').setMaster('local')
# getOrCreate() hands back the already-running context when there is one, so
# re-executing this cell no longer fails with
# "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)
# Legacy SQL entry point, kept under its original name.
sqlContext = SQLContext(sc)

In [None]:
# Import the one name this notebook uses from graphframes explicitly instead
# of a star import, so it is clear where GraphFrame comes from.
# NOTE(review): graphframes is not pip-installed in this notebook; presumably
# the Python module only becomes importable from the --packages jar once the
# SparkContext is running, so keep this import below the context creation.
from graphframes import GraphFrame

## Data cleaning and processing


In [None]:
# Path of the input text file; change this to point at your own copy.
data_path = 'wiki_tiny.txt'
# Load the file as an RDD with one element per line of text.
dataRDD = sc.textFile(name=data_path)

In [None]:
# Settings requested for the SparkSession.
appName = "PySpark Example - Python Array/List to Spark Data Frame"
master = "local"
# Configure the builder first, then obtain the session (an existing session
# is returned if one is already active).
session_builder = SparkSession.builder.appName(appName).master(master)
spark = session_builder.getOrCreate()

In [None]:
# Build the (page title, linked page) edge list straight from the text RDD.
#
# The title and the links are now extracted from the SAME line, so a line
# without a <title> (or with several) can no longer shift every following
# title onto the wrong set of links or raise an IndexError. The work also stays
# distributed instead of collecting the whole file onto the driver.
TITLE_PATTERN = re.compile("<title>(.*?)</title>")
LINK_PATTERN = re.compile(r'\[([^\[\]]*)\]')


def extract_edges(line):
    """Return the (title, link) pairs found in one line of the input.

    Parameters
    ----------
    line : str
        One line of the input file. Assumed to describe a single page whose
        name is wrapped in <title>...</title> -- TODO confirm against the data.

    Returns
    -------
    list of (str, str)
        One tuple per innermost [...] span on the line, paired with the first
        title on that line. Empty when the line has no title or no links.
    """
    title_match = TITLE_PATTERN.search(line)
    if title_match is None:
        # No page name to attach the links to: contribute no edges.
        return []
    page_title = title_match.group(1)
    return [(page_title, link) for link in LINK_PATTERN.findall(line)]


# RDD of (title, link) tuples, in the same order as the input lines.
rdd = dataRDD.flatMap(extract_edges)

## Graph creation


In [None]:
# Column names for the two positions of each (title, link) tuple.
columns = ['src','dst']
# Turn the tuple RDD into a DataFrame whose columns carry those names.
edgesDF = rdd.toDF(schema=columns)

In [None]:
# Preview the first 20 edges.
edgesDF.show(n=20)

+-----+-------------------+
|  src|                dst|
+-----+-------------------+
|April|              month|
|April|               year|
|April|              March|
|April|                May|
|April|                day|
|April|               July|
|April|            January|
|April|           December|
|April|             flower|
|April|          Sweet Pea|
|April|   Asteraceae|Daisy|
|April|         birthstone|
|April|            diamond|
|April|             Spring|
|April|Northern Hemisphere|
|April|              March|
|April|                May|
|April|               June|
|April|          September|
|April|           November|
+-----+-------------------+
only showing top 20 rows



In [None]:
# Single-column DataFrames holding the distinct values of edgesDF's 'src'
# and 'dst' columns respectively.
srcDF, destDF = (
    edgesDF.select(endpoint).distinct()
    for endpoint in ('src', 'dst')
)

In [None]:
# Build the vertex table: one row per page that appears as a link source or
# as a link target, in a single column named "Pages".
# distinct() is needed after the union because a page that both links out and
# is linked to is present in srcDF AND destDF; without it that page would
# become two separate vertices.
verticesDF = srcDF.union(destDF).distinct().toDF("Pages")

# Add the "id" column. GraphFrames resolves an edge's src/dst against the
# vertex "id"; edgesDF stores page titles in src/dst, so the id has to be the
# page title as well. A synthetic numeric id would leave the edges pointing at
# no vertex.
verticesDF = verticesDF.withColumn("id", col("Pages"))

In [None]:
# Preview the first 20 vertices.
verticesDF.show(n=20)

+----------------+---+
|           Pages| id|
+----------------+---+
|          Helium|  0|
|            July|  1|
|       Scientist|  2|
|          Volume|  3|
|Operating System|  4|
|           1970s|  5|
| Portsmouth F.C.|  6|
|         Crusade|  7|
|           Drink|  8|
|         Kilauea|  9|
|            Silk| 10|
|  Brest (France)| 11|
|       Bangalore| 12|
|    Saint George| 13|
|               K| 14|
|         Antonym| 15|
|       Dodgeball| 16|
|          Flower| 17|
|       Febuary 9| 18|
|       Magdeburg| 19|
+----------------+---+
only showing top 20 rows



In [None]:
# Assemble the graph: verticesDF supplies the vertices, edgesDF the edges.
G = GraphFrame(v=verticesDF, e=edgesDF)

In [None]:
# Show the GraphFrame's repr, which lists the vertex and edge schemas.
G

GraphFrame(v:[id: bigint, Pages: string], e:[src: string, dst: string])

## Basic queries on the constructed graph

In [None]:
# Query the number of vertices in the graph.
G.vertices.count()

231618

In [None]:

# Query the number of edges in the graph (every row of the edge DataFrame
# counts, so repeated links between the same two pages are counted each time).
G.edges.count()

708787

In [None]:
# In-degree of every vertex that has at least one incoming edge.
i = G.inDegrees
# Largest in-degree first.
i_dec = i.orderBy("inDegree", ascending=False)
# Print the 15 most linked-to entries.
i_dec.show(n=15)

+-------------+--------+
|           id|inDegree|
+-------------+--------+
|   footballer|    3835|
|         2014|    2686|
|         2017|    2655|
|         2015|    2405|
|         2018|    2170|
|         2016|    1863|
|United States|    1801|
|         2019|    1726|
|    President|    1585|
|         2013|    1405|
|        actor|    1291|
|         1945|    1252|
|      actress|    1250|
|         1944|    1237|
|         1942|    1192|
+-------------+--------+
only showing top 15 rows



In [None]:
# Total degree per vertex, i.e. in-links plus out-links, which shows the pages
# that take part in the most links overall.
deg = G.degrees
# Largest degree first.
deg_dec = deg.orderBy("degree", ascending=False)
# Print the 15 best-connected entries.
deg_dec.show(n=15)

+--------------------+------+
|                  id|degree|
+--------------------+------+
|Wikipedia:List of...|  5096|
|          footballer|  3835|
|                1992|  2911|
|                2014|  2686|
|                2017|  2655|
|                2015|  2405|
|                1930|  2403|
|                1991|  2291|
|                1980|  2188|
|                2018|  2170|
|              Ankara|  2080|
|                1964|  2046|
|                1984|  1917|
|         Memory Card|  1893|
|        Fifth Avenue|  1885|
+--------------------+------+
only showing top 15 rows



In [None]:
# Count parallel edges. The graph is a multigraph: the same (src, dst) pair can
# occur many times, and a count of N for a pair means the src page references
# the dst page N times.
link_counts = edgesDF.groupBy("src", "dst").count()
# Most frequently repeated links first; print the top 15.
link_counts.orderBy("count", ascending=False).show(n=15)

+--------------------+--------------------+-----+
|                 src|                 dst|count|
+--------------------+--------------------+-----+
|Wikipedia:Deletio...|  User:Angela|Angela|  767|
|Wikipedia:Deletio...|User:Netoholic|Ne...|  286|
|                Line|   United States|USA|  141|
|         Popocatpetl|                    |  103|
|              Orbits|             Bavaria|   96|
|List of communist...|      United Kingdom|   94|
|Wikipedia:Deletio...|User:SimonMayer|S...|   93|
|          Superhuman|             La Liga|   88|
|List of Spanish f...|Football League C...|   82|
|List of Italian f...|Football League C...|   79|
|        Chelsea F.C.|Football League C...|   78|
|   Derby County F.C.|Football League C...|   78|
|       Drink driving|Football League C...|   77|
|        Everton F.C.|Football League C...|   72|
|                Line|             Germany|   72|
+--------------------+--------------------+-----+
only showing top 15 rows



## Message Aggregation - Page Ranking

In [None]:
# Out-degree of every page that has at least one outgoing link, with the key
# column renamed to "Pages" so it can be joined onto the vertex table.
OutDegDF = G.outDegrees.toDF("Pages", "outDegree")

# Attach outDegree to the vertices.
# - drop("outDegree") is a no-op on the first run and keeps the cell safe to
#   re-execute (otherwise a second run would join the column in twice).
# - A LEFT join keeps pages that have no outgoing links; an inner join would
#   silently remove them from the vertex set. Their out-degree is filled as 0.
verticesDF = (
    verticesDF
    .drop("outDegree")
    .join(OutDegDF, ["Pages"], "left")
    .fillna(0, subset=["outDegree"])
)

In [None]:
# Preview the first 20 vertices together with their outDegree.
verticesDF.show(n=20)

+--------------------+-------------+---------+
|               Pages|           id|outDegree|
+--------------------+-------------+---------+
|                1512|           52|        4|
|                1512|1717986918478|        4|
|               1970s|            5|      206|
|               1970s|1717986918545|      206|
|          24 October|           42|        1|
|          24 October|1717986918584|        1|
|               7 May|           38|       25|
|             Antonym|           15|      139|
|             Antonym|1717986918786|      139|
|           Bangalore|           12|       51|
|           Bangalore|1717986918455|       51|
|      Brest (France)|           11|       15|
|          Carl Radle|           46|       27|
|          Carl Radle|1717986918963|       27|
|       Category:1083|           37|        2|
|       Category:1555|           49|        5|
|Category:1862 deaths|           43|        1|
|   Category:Game Boy|           39|       51|
|      Catego

In [None]:
# Second graph: same edges as before, but built on the current verticesDF
# (the one that carries the outDegree column).
new_G = GraphFrame(v=verticesDF, e=edgesDF)

In [None]:
# Rank every page by summing the outDegree of its neighbours with GraphFrames'
# aggregateMessages (a higher Rank is treated as a more important page).
#
# These two names were used by this cell but never imported anywhere in the
# notebook; they are imported here so the cell runs on its own.
from pyspark.sql.functions import sum as sqlsum
from graphframes.lib import AggregateMessages as AM

# Messages travel both ways along each edge: the source vertex receives the
# destination's outDegree and the destination receives the source's outDegree.
# NOTE(review): the original comment said messages are passed only along the
# edge direction; if that is the intent, drop the sendToSrc argument below.
msgToSrc = AM.dst["outDegree"]
# Was assigned as `msgTodst` but read as `msgToDst`, which raised a NameError.
msgToDst = AM.src["outDegree"]
agg = new_G.aggregateMessages(
    sqlsum(AM.msg).alias("Rank"),
    sendToSrc=msgToSrc,
    sendToDst=msgToDst)
agg.show(15)

+---+----+
| id|Rank|
+---+----+
| 29|   5|
| 50|  32|
| 25| 147|
|  6| 976|
| 33|  15|
|  1| 105|
| 10|  66|
| 44| 502|
|  3|  13|
|  8|   1|
| 11|   9|
|  4|  32|
| 18|   1|
| 23|  29|
+---+----+

