In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285387 sha256=a7f3c3fe1bb1e4ecfef44a81ec1e89a639ad91947b60a18a97685022b50cad90
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


##  Imports the SparkContext class from the PySpark library

In [2]:
from pyspark import SparkContext

## Initialize SparkContext

In [3]:
sc = SparkContext("local", "TriangleCount")

## The sample data

In [4]:
sample_data = [
    (0, 1),
    (0, 2),
    (1, 2),
    (1, 3),
    (2, 3),
    (0, 3)
]

sample_data

[(0, 1), (0, 2), (1, 2), (1, 3), (2, 3), (0, 3)]

## Create an RDD from the sample data

In [5]:
edges = sc.parallelize(sample_data)
print(edges.collect())

[(0, 1), (0, 2), (1, 2), (1, 3), (2, 3), (0, 3)]


## Create an RDD of unique nodes

In [6]:
nodes = edges.flatMap(lambda edge: [edge[0], edge[1]]).distinct()
nodes
# Print the unique nodes
print(nodes.collect())

[0, 1, 2, 3]


## Generate an RDD containing pairs of nodes and their respective neighbors

In [7]:
neighbors = edges.flatMap(lambda edge: [(edge[0], edge[1]), (edge[1], edge[0])])
neighbors
# Print the pairs of nodes and their respective neighbors
print(neighbors.collect())

[(0, 1), (1, 0), (0, 2), (2, 0), (1, 2), (2, 1), (1, 3), (3, 1), (2, 3), (3, 2), (0, 3), (3, 0)]


## Generate an RDD containing pairs in the format ((u, v), w) with the condition that u is less than v.

In [8]:
edge_pairs = neighbors.map(lambda x: ((min(x[0], x[1]), max(x[0], x[1])), None))
edge_pairs


PythonRDD[7] at RDD at PythonRDD.scala:53

## Group the edge pairs by key

In [9]:
grouped_edges = edge_pairs.groupByKey()
grouped_edges


PythonRDD[12] at RDD at PythonRDD.scala:53

## Define a function to count triangles and collect nodes

In [10]:
def count_triangles(edge_item):
    ((node1, node2), neighbor_values) = edge_item
    neighbor_values_list = list(neighbor_values)
    unconnected_neighbors_count = neighbor_values_list.count(None)

    # Calculate the number of triangles involving this edge
    triangles_count = (unconnected_neighbors_count * (unconnected_neighbors_count - 1)) // 2

    # Determine the nodes participating in this triangle
    triangle_nodes = set([node1, node2])
    return triangles_count, triangle_nodes



## Calculate triangles and collect nodes for each edge

In [11]:
triangle_node_counts = grouped_edges.map(lambda item: count_triangles(item))
triangle_node_counts
# Print the expected results
for triangles, nodes in triangle_node_counts.collect():
    print(f"Triangles Count: {triangles}")
    print(f"Triangle Nodes: {nodes}\n")

Triangles Count: 1
Triangle Nodes: {0, 1}

Triangles Count: 1
Triangle Nodes: {0, 2}

Triangles Count: 1
Triangle Nodes: {1, 2}

Triangles Count: 1
Triangle Nodes: {1, 3}

Triangles Count: 1
Triangle Nodes: {2, 3}

Triangles Count: 1
Triangle Nodes: {0, 3}



## Calculate the total number of triangles

In [12]:
total_triangles = triangle_node_counts.map(lambda x: x[0]).sum()
# Print the total number of triangles
print(f"The total number of triangles in the graph is: {total_triangles}")

The total number of triangles in the graph is: 6


## Calculate the average nodes involved in triangles for each node

In [13]:
average_nodes = triangle_node_counts.flatMap(lambda x: [(node, 1) for node in x[1]]) \
    .reduceByKey(lambda a, b: a + b) \
    .map(lambda x: (x[0], x[1] / total_triangles))

# Print the average nodes involved in triangles for each node
print("Average nodes involved in triangles for each node:")
for node, average in average_nodes.collect():
    print(f"Node {node}: {average}")

Average nodes involved in triangles for each node:
Node 0: 0.5
Node 1: 0.5
Node 2: 0.5
Node 3: 0.5


## Print the total number of triangles

In [14]:
print(f"The triangle count of the graph is: {total_triangles // 3}")

The triangle count of the graph is: 2


## Print the average nodes involved in triangles for each node

In [15]:
print("The average number of nodes that are involved in triangles for each node in the graph is :")
average_nodes.collect()

The average number of nodes that are involved in triangles for each node in the graph is :


[(0, 0.5), (1, 0.5), (2, 0.5), (3, 0.5)]

## Test in a sample input file(few-edges.txt)

In [18]:
few_edges = sc.textFile("/content/drive/MyDrive/few-edges.txt")
few_edges_data = few_edges.map(lambda line: tuple(map(int, line.strip().split(' '))))
few_edges_triangle_node_counts = few_edges_data.flatMap(lambda edge: [((min(edge[0], edge[1]), max(edge[0], edge[1])), None)]) \
    .groupByKey().map(lambda item: count_triangles(item))
# Print the results for the sample input file
print("Results for the 'few-edges.txt' dataset:")
for triangles, nodes in few_edges_triangle_node_counts.collect():
    print(f"Edge: {nodes} - Triangles Count: {triangles}")

Results for the 'few-edges.txt' dataset:
Edge: {0, 1} - Triangles Count: 1
Edge: {0, 2} - Triangles Count: 1
Edge: {0, 3} - Triangles Count: 1
Edge: {1, 3} - Triangles Count: 1
Edge: {2, 6} - Triangles Count: 1
Edge: {2, 5} - Triangles Count: 1
Edge: {5, 6} - Triangles Count: 1
Edge: {4, 6} - Triangles Count: 1
Edge: {4, 5} - Triangles Count: 1
Edge: {8, 7} - Triangles Count: 1
Edge: {9, 7} - Triangles Count: 1
Edge: {10, 7} - Triangles Count: 1
Edge: {8, 10} - Triangles Count: 1
Edge: {9, 13} - Triangles Count: 1
Edge: {9, 12} - Triangles Count: 1
Edge: {12, 13} - Triangles Count: 1
Edge: {11, 13} - Triangles Count: 1
Edge: {11, 12} - Triangles Count: 1


## Calculate the total number of triangles

In [19]:
few_edges_total_triangles = few_edges_triangle_node_counts.map(lambda x: x[0]).sum()

## Calculate the average nodes involved in triangles for each node

In [20]:
few_edges_average_nodes = few_edges_triangle_node_counts.flatMap(lambda x: [(node, 1) for node in x[1]]) \
    .reduceByKey(lambda a, b: a + b) \
    .map(lambda x: (x[0], x[1] / few_edges_total_triangles))


## Print the total number of triangles

In [21]:
print(f"The number of triangles in the 'few-edges.txt' graph is: {few_edges_total_triangles // 3}")

The number of triangles in the 'few-edges.txt' graph is: 6


## Print the average nodes involved in triangles for each node

In [22]:
print("The average number of nodes that each node shares a triangle with:")
few_edges_average_nodes.collect()


The average number of nodes that each node shares a triangle with:


[(0, 0.16666666666666666),
 (1, 0.1111111111111111),
 (2, 0.16666666666666666),
 (3, 0.1111111111111111),
 (6, 0.16666666666666666),
 (5, 0.16666666666666666),
 (4, 0.1111111111111111),
 (8, 0.1111111111111111),
 (7, 0.16666666666666666),
 (9, 0.16666666666666666),
 (10, 0.1111111111111111),
 (13, 0.16666666666666666),
 (12, 0.16666666666666666),
 (11, 0.1111111111111111)]

In [23]:
# terminate the Spark application
sc.stop()