In [1]:
import os

# Must be set before the SparkContext below starts the JVM, so that
# spark-submit pulls the graphframes package onto the classpath.
GRAPHFRAMES_SUBMIT_ARGS = '--packages graphframes:graphframes:0.6.0-spark2.3-s_2.11 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = GRAPHFRAMES_SUBMIT_ARGS

import pyspark
sc = pyspark.SparkContext(master="local[*]")

In [2]:
from graphframes import *
import hashlib
from pyspark.sql import *
from pyspark.sql.functions import udf

In [3]:
# Play with a dummy graph.
spark = SparkSession.builder.appName('fun').getOrCreate()

# Toy people: (id, name, firstname, age). Ids 98/99 form a separate pair.
toy_people = [
    ('1', 'Carter', 'Derrick', 50),
    ('2', 'May', 'Derrick', 26),
    ('3', 'Mills', 'Jeff', 80),
    ('4', 'Hood', 'Robert', 65),
    ('5', 'Banks', 'Mike', 93),
    ('98', 'Berg', 'Tim', 28),
    ('99', 'Page', 'Allan', 16),
]

# Directed relationships: (src, dst, type).
toy_links = [
    ('1', '2', 'friend'),
    ('2', '1', 'friend'),
    ('3', '1', 'friend'),
    ('1', '3', 'friend'),
    ('2', '3', 'follows'),
    ('3', '4', 'friend'),
    ('4', '3', 'friend'),
    ('5', '3', 'friend'),
    ('3', '5', 'friend'),
    ('4', '5', 'follows'),
    ('98', '99', 'friend'),
    ('99', '98', 'friend'),
]

vertices = spark.createDataFrame(toy_people, ['id', 'name', 'firstname', 'age'])
edges = spark.createDataFrame(toy_links, ['src', 'dst', 'type'])

In [4]:
g = GraphFrame(v=vertices, e=edges)  # toy graph: people as vertices, relationships as edges

In [5]:
# Sanity-check the toy graph size (7 vertices, 12 edges were defined above). This replaces
# the interactive `g?` help lookup, which is not valid Python outside IPython.
g.vertices.count(), g.edges.count()

In [6]:
# Label propagation is capped at a fixed number of supersteps; it is not run to convergence.
toy_lpa_iterations = 5
result = g.labelPropagation(maxIter=toy_lpa_iterations)

In [7]:
result.select(["id", "label"]).show()  # one community label per vertex

+---+------------+
| id|       label|
+---+------------+
|  1|154618822656|
|  3|420906795008|
|  2|420906795008|
|  4|154618822656|
| 98|326417514496|
|  5|154618822656|
| 99|317827579904|
+---+------------+



### Load sample webgraph

In [8]:
# This dataset is already filtered to include only links between TLDs, not within TLDs.
# Common sites and resources were also filtered out for a cleaner sample graph.
OUTLINKS_GLOB = "data/outlinks_pq/*.snappy.parquet"
raw_data = spark.read.parquet(OUTLINKS_GLOB)
raw_data.count()

18399

In [9]:
# Give the positional columns meaningful names, then drop rows with a null
# parent or child TLD (they cannot become graph nodes).
df = (
    raw_data
    .withColumnRenamed("_c0", "parent")
    .withColumnRenamed("_c1", "parentTLD")
    .withColumnRenamed("_c2", "childTLD")
    .withColumnRenamed("_c3", "child")
    .filter("parentTLD is not null and childTLD is not null")
)

In [10]:
df.show(n=5)  # preview the renamed link table

+--------------+---------+----------------+--------------------+
|        parent|parentTLD|        childTLD|               child|
+--------------+---------+----------------+--------------------+
|http://msn.com|  msn.com|tradedoubler.com|https://clk.trade...|
|http://msn.com|  msn.com|   microsoft.com|https://go.micros...|
|http://msn.com|  msn.com|     outlook.com|http://www.outloo...|
|http://msn.com|  msn.com|   microsoft.com|https://advertisi...|
|http://msn.com|  msn.com|tradedoubler.com|https://clk.trade...|
+--------------+---------+----------------+--------------------+
only showing top 5 rows



In [11]:
# Every TLD that appears on either end of a link becomes a node; collect the
# distinct set so each node can be given an id.
aggcodes = (
    df.select("parentTLD", "childTLD")
    .rdd
    .flatMap(lambda row: iter(row))
    .distinct()
)
aggcodes.count()

4613

In [12]:
def hashnode(x, length=8):
    """Map a node name (here a TLD string) to a short, stable hex id.

    The id is the first ``length`` hex digits of the SHA-1 digest of the
    UTF-8 encoded name. The same name therefore always yields the same id,
    whether this is called from a Python ``map`` or through a Spark UDF.

    Args:
        x: Node name. ``None`` is passed through unchanged, so a null input
           to a Spark UDF yields a null id instead of raising.
        length: Number of hex digits to keep (default 8, i.e. 32 bits).
           Truncation makes collisions possible; raise this for larger graphs.

    Returns:
        The hex id string, or ``None`` when ``x`` is ``None``.
    """
    if x is None:
        return None
    return hashlib.sha1(x.encode("UTF-8")).hexdigest()[:length]

# Spark-side wrapper around hashnode; with no returnType given, the UDF returns strings.
hashnode_udf = udf(f=hashnode)

In [14]:
# Node id = truncated SHA-1 of the TLD name. hashnode_udf wraps the same
# hashnode function, so edge endpoints computed from df map onto these ids.
vertices = aggcodes.map(lambda tld: (hashnode(tld), tld)).toDF(["id", "name"])

# A truncated hash can collide. A collision would silently merge two different
# TLDs into one graph node, so fail loudly instead.
n_vertices = vertices.count()
n_unique_ids = vertices.select("id").distinct().count()
assert n_unique_ids == n_vertices, f"{n_vertices - n_unique_ids} hash collision(s) among node ids"

vertices.show(5)

+--------+----------------+
|      id|            name|
+--------+----------------+
|000db143|         msn.com|
|51a48ea2|tradedoubler.com|
|31312317|   microsoft.com|
|a45016f2|     outlook.com|
|2f5bf4c8|        bing.com|
+--------+----------------+
only showing top 5 rows



In [15]:
# One directed edge per link row: hash both endpoint TLDs to their node ids.
# Repeated parent->child links are kept as repeated edges.
edges = df.select(
    hashnode_udf("parentTLD").alias("src"),
    hashnode_udf("childTLD").alias("dst"),
)

edges.show(5)

+--------+--------+
|     src|     dst|
+--------+--------+
|000db143|51a48ea2|
|000db143|31312317|
|000db143|a45016f2|
|000db143|31312317|
|000db143|51a48ea2|
+--------+--------+
only showing top 5 rows



In [16]:
# Assemble the web graph: one vertex per TLD, one edge per (possibly repeated) link.
graph = GraphFrame(v=vertices, e=edges)

In [17]:
# Community detection via label propagation, capped at a fixed number of supersteps.
lpa_iterations = 5
communities = graph.labelPropagation(maxIter=lpa_iterations)

In [18]:
# Cache the LPA output because it is queried again for the community count.
communities.persist()
communities.show(10)

+--------+--------------------+-------------+
|      id|                name|        label|
+--------+--------------------+-------------+
|407ae1cc|             coop.no| 781684047881|
|1b0357be|  buenacuerdo.com.ar|1245540515843|
|acc8136a|   toptenreviews.com|1537598291986|
|abdd63cd| liberoquotidiano.it| 317827579915|
|db5c0434|          meetme.com| 712964571162|
|0f8dff85|           ameblo.jp| 171798691842|
|b6b04a58|             tlnk.io|1632087572480|
|5bcfd421|         wowhead.com| 429496729618|
|b4d4008d|investingcontrari...| 919123001350|
|ce7a3185|   pokemoncentral.it|1511828488194|
+--------+--------------------+-------------+
only showing top 10 rows



In [19]:
# Each distinct LPA label is one community.
n_communities = communities.select('label').distinct().count()
print(f"There are {n_communities} communities in sample graph.")

There are 628 communities in sample graph.
