In [1]:
# Install the Python packages imported below (pyspark, graphframes) into the kernel's environment.
# NOTE(review): versions are unpinned, so a later re-run may resolve different releases —
# consider pinning them to versions compatible with the GraphFrames jar set in spark.jars.packages.
%pip install pyspark graphframes

Note: you may need to restart the kernel to use updated packages.


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from graphframes import GraphFrame

# Obtain the Spark session for this notebook. The GraphFrames JVM package is
# requested through its coordinate in `spark.jars.packages`.
spark = (
    SparkSession.builder
    .appName("Social Network Analysis")
    .config("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.1-s_2.12")
    .getOrCreate()
)

# Load SNAP dataset com-Amazon
# Each whitespace-separated input line is turned into tab-separated edges from
# the line's first node id to every other id on that line (self-pairs skipped).
# NOTE(review): the file name suggests a community-membership list rather than
# an edge list, so this produces one "star" per line centred on its first id —
# confirm this is the intended graph (the raw SNAP edge list is presumably
# com-amazon.ungraph.txt).
input_path = "data/com-amazon.all.dedup.cmty.txt"
output_path = "data/amazon_edges.txt"

with open(input_path, "r") as infile, open(output_path, "w") as outfile:
    for line in infile:
        nodes = line.split()
        if not nodes:
            # Blank or whitespace-only line: nothing to emit. Without this
            # guard, nodes[0] raised IndexError and aborted the conversion.
            continue
        src, *members = nodes
        outfile.writelines(f"{src}\t{dst}\n" for dst in members if dst != src)

# Read the tab-separated edge list from the same `output_path` it was written
# to. The previous literal "amazon_edges.txt" pointed at the working directory
# instead of data/, so a fresh run either failed or picked up a stale file.
edges_df = spark.read.csv(output_path, sep="\t", inferSchema=True).toDF("src", "dst")

# Create edges
# Append the reverse of every edge so each pair appears in both directions,
# then drop duplicate rows.
edges_df = edges_df.union(
    edges_df.select(col("dst").alias("src"), col("src").alias("dst"))
).distinct()

# Create vertices
# Every id that appears as either endpoint of an edge becomes a vertex.
vertices_df = (
    edges_df.select(col("src").alias("id"))
    .union(edges_df.select(col("dst").alias("id")))
    .distinct()
)

# Build GraphFrame
g = GraphFrame(vertices_df, edges_df)

25/04/23 17:45:44 WARN Utils: Your hostname, carlostsai.local resolves to a loopback address: 127.0.0.1; using 192.168.1.121 instead (on interface en0)
25/04/23 17:45:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/xxittysnxx/.ivy2/cache
The jars for the packages stored in: /Users/xxittysnxx/.ivy2/jars
graphframes#graphframes added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-fc365447-f502-4b50-8885-523713ebf829;1.0
	confs: [default]
	found graphframes#graphframes;0.8.2-spark3.1-s_2.12 in spark-packages
	found org.slf4j#slf4j-api;1.7.16 in central
:: resolution report :: resolve 60ms :: artifacts dl 3ms
	:: modules in use:
	graphframes#graphframes;0.8.2-spark3.1-s_2.12 from spark-packages in [default]
	org.slf4j#slf4j-api;1.7.16 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   

:: loading settings :: url = jar:file:/Users/xxittysnxx/miniconda3/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


25/04/23 17:45:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# a. Find the top 5 nodes with the highest outdegree and find the count of the number of outgoing edges in each
# Sort the out-degree table from largest to smallest and print the first five rows.
g.outDegrees.sort(col("outDegree").desc()).show(5)

[Stage 4:>                                                          (0 + 8) / 8]

+---+---------+
| id|outDegree|
+---+---------+
| 21|    62166|
|  7|    53550|
|  6|    45977|
| 11|    40290|
|  1|    26966|
+---+---------+
only showing top 5 rows



                                                                                

In [4]:
# b. Find the top 5 nodes with the highest indegree and find the count of the number of incoming edges in each
# Sort the in-degree table from largest to smallest and print the first five rows.
g.inDegrees.sort(col("inDegree").desc()).show(5)



+---+--------+
| id|inDegree|
+---+--------+
| 21|   62166|
|  7|   53550|
|  6|   45977|
| 11|   40290|
|  1|   26966|
+---+--------+
only showing top 5 rows



                                                                                

In [5]:
# c. Calculate PageRank for each of the nodes and output the top 5 nodes with the highest PageRank values. You are free to define any suitable parameters.
# Run PageRank for a fixed 10 iterations with reset probability 0.15, then
# print the five vertices with the largest scores.
pr = g.pageRank(maxIter=10, resetProbability=0.15)
(
    pr.vertices
    .select("id", "pagerank")
    .sort(col("pagerank").desc())
    .show(5)
)

25/04/23 17:45:55 WARN MemoryStore: Not enough space to cache rdd_86_4 in memory! (computed 38.3 MiB so far)
25/04/23 17:45:55 WARN MemoryStore: Not enough space to cache rdd_82_7 in memory! (computed 44.1 MiB so far)
25/04/23 17:45:55 WARN MemoryStore: Not enough space to cache rdd_82_2 in memory! (computed 38.6 MiB so far)
25/04/23 17:45:55 WARN BlockManager: Block rdd_86_4 could not be removed as it was not found on disk or in memory
25/04/23 17:45:55 WARN BlockManager: Block rdd_82_2 could not be removed as it was not found on disk or in memory
25/04/23 17:45:55 WARN BlockManager: Putting block rdd_86_4 failed
25/04/23 17:45:55 WARN BlockManager: Block rdd_82_7 could not be removed as it was not found on disk or in memory
25/04/23 17:45:55 WARN BlockManager: Putting block rdd_82_7 failed
25/04/23 17:45:55 WARN BlockManager: Putting block rdd_82_2 failed
25/04/23 17:45:55 WARN MemoryStore: Not enough space to cache rdd_86_2 in memory! (computed 38.6 MiB so far)
25/04/23 17:45:55 WAR

+---+-----------------+
| id|         pagerank|
+---+-----------------+
| 21|6384.880075830329|
|  6|5652.756230220395|
|  7|5362.062282094499|
| 11|4306.846780175733|
|  1|3595.660142415134|
+---+-----------------+
only showing top 5 rows



                                                                                

In [6]:
# d. Run the connected components algorithm on it and find the top 5 components with the largest number of nodes.
# A checkpoint directory is set on the Spark context before running the
# "graphframes" connected-components algorithm.
spark.sparkContext.setCheckpointDir("/tmp/graphframes-checkpoints")
cc = g.connectedComponents(
    algorithm="graphframes",
    checkpointInterval=2,
    broadcastThreshold=10000000,
)
# Count vertices per component label and print the five largest components.
(
    cc.groupBy("component")
    .count()
    .sort(col("count").desc())
    .show(5)
)

25/04/23 17:46:06 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/23 17:46:06 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
                                                                                

+---------+------+
|component| count|
+---------+------+
|        1|306995|
|      278|   344|
|     4203|   253|
|      448|   131|
|     9477|   120|
+---------+------+
only showing top 5 rows



In [7]:
# e. Run the triangle counts algorithm on each of the vertices and output the top 5 vertices with the largest triangle count. In case of ties, you can randomly select the top 5 vertices.
# Compute the per-vertex triangle count, keep only id and count, and print the
# five rows with the largest counts.
triangles = g.triangleCount()
triangles.select("id", "count").sort(col("count").desc()).show(5)

25/04/23 17:47:30 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/23 17:47:30 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/23 17:47:34 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/23 17:47:34 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/23 17:47:34 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/23 17:47:34 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/23 17:47:34 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/23 17:47:34 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/23 17:47:34 WARN RowBasedKeyValueBatch: Calling spill() on

+---+------+
| id| count|
+---+------+
| 21|238877|
|  7|199434|
| 11|112001|
|  6|100261|
| 38| 99971|
+---+------+
only showing top 5 rows



                                                                                