In [1]:
#By: Akshay Engolikar, Vijayendra Kosigi, Chinmaya Gokul Madhusudhan
# Install necessary packages
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.3.3/spark-3.3.3-bin-hadoop3.tgz
!tar xf spark-3.3.3-bin-hadoop3.tgz
!pip install -q findspark
!pip install pyspark


Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285387 sha256=b1afab049e0976d3b399d63c955de2685dc08648fe364fbac405cfe7373154ba
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [2]:
# Set environment variables for Spark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.3-bin-hadoop3"


In [3]:
# Initialize Spark
import findspark
findspark.init()
from pyspark import SparkContext
from google.colab import files

In [4]:
from operator import sub
# Upload files
files.upload()

Saving triangle.txt to triangle.txt


{'triangle.txt': b'0, 1\r\n0, 2\r\n1, 2\r\n1, 3\r\n2, 3'}

In [5]:
# Create Spark context
sc = SparkContext.getOrCreate()

In [6]:
# Step 1: Load edges from the text file
edges_rdd = sc.textFile("triangle.txt")
print(edges_rdd.collect())

['0, 1', '0, 2', '1, 2', '1, 3', '2, 3']


In [7]:
# Step 1: Convert each line to a tuple (u, v)
def line_to_tuple(line):
    values = line.split(', ')
    return tuple(map(int, values))

edgestotuples = edges_rdd.map(line_to_tuple)
print(edgestotuples.collect())

[(0, 1), (0, 2), (1, 2), (1, 3), (2, 3)]


In [8]:
# Step 1: Map each edge to two key-value pairs, (u, v) and (v, u)
def edge_to_tuples(edge):
    return [(edge[0], edge[1]), (edge[1], edge[0])]

mappededges = edgestotuples.flatMap(edge_to_tuples)
print(mappededges.collect())


[(0, 1), (1, 0), (0, 2), (2, 0), (1, 2), (2, 1), (1, 3), (3, 1), (2, 3), (3, 2)]


In [9]:
# Step 1: Group by key to get the list of neighbors for each node
neighborsgrouped = mappededges.groupByKey().mapValues(list)
neighborsgrouped.collect()

[(0, [1, 2]), (2, [0, 1, 3]), (1, [0, 2, 3]), (3, [1, 2])]

In [10]:
# Step 2: Generate triangle candidates by emitting (w1, w2) for each pair of neighbors of u
def generate_triangle_candidates(data):
    node, neighbors = data
    candidates = [((w1, w2), "to check") for w1 in neighbors for w2 in neighbors if w1 != w2]
    return candidates

triangle_candidates = neighborsgrouped.flatMap(generate_triangle_candidates)
print(triangle_candidates.collect())


[((1, 2), 'to check'), ((2, 1), 'to check'), ((0, 1), 'to check'), ((0, 3), 'to check'), ((1, 0), 'to check'), ((1, 3), 'to check'), ((3, 0), 'to check'), ((3, 1), 'to check'), ((0, 2), 'to check'), ((0, 3), 'to check'), ((2, 0), 'to check'), ((2, 3), 'to check'), ((3, 0), 'to check'), ((3, 2), 'to check'), ((1, 2), 'to check'), ((2, 1), 'to check')]


In [11]:
# Step 2: Tag each present edge in the graph with "present edge"
present_edges = edgestotuples.map(lambda edge: (edge, "present edge"))
present_edges.collect()

[((0, 1), 'present edge'),
 ((0, 2), 'present edge'),
 ((1, 2), 'present edge'),
 ((1, 3), 'present edge'),
 ((2, 3), 'present edge')]

In [12]:
# Step 2: Combine triangle candidates and present edges
all_edges = triangle_candidates.union(present_edges)
all_edges.collect()

[((1, 2), 'to check'),
 ((2, 1), 'to check'),
 ((0, 1), 'to check'),
 ((0, 3), 'to check'),
 ((1, 0), 'to check'),
 ((1, 3), 'to check'),
 ((3, 0), 'to check'),
 ((3, 1), 'to check'),
 ((0, 2), 'to check'),
 ((0, 3), 'to check'),
 ((2, 0), 'to check'),
 ((2, 3), 'to check'),
 ((3, 0), 'to check'),
 ((3, 2), 'to check'),
 ((1, 2), 'to check'),
 ((2, 1), 'to check'),
 ((0, 1), 'present edge'),
 ((0, 2), 'present edge'),
 ((1, 2), 'present edge'),
 ((1, 3), 'present edge'),
 ((2, 3), 'present edge')]

In [13]:
# Step 2: Count triangles by checking if an edge is present and how many times it needs to be checked
def reduce_triangles(key, values):
    if "present edge" in values:
        return values.count("to check")

In [14]:
count_triangles = all_edges.groupByKey().mapValues(list).flatMap(lambda x: [reduce_triangles(x[0], x[1])])
count_triangles.collect()

[1, None, 2, 1, None, 1, 1, None, None, None, None, None]

In [15]:
# Step 2: Sum up the counts, each triangle is counted 3 times
total_triangles = count_triangles.filter(lambda x: x is not None).sum() // 3
total_triangles

2

In [16]:
# Step 3: Calculate the average number of triangles per node
total_nodes = neighborsgrouped.count()
avg_triangles_pn = total_triangles / total_nodes if total_nodes else 0

In [17]:
# Step 3: Print the total number of triangles and the average
print(f"The number of triangles on the graph is: {total_triangles}")
print(f"The average number of triangles per node is: {avg_triangles_pn}")

The number of triangles on the graph is: 2
The average number of triangles per node is: 0.5
