<a href="https://colab.research.google.com/github/mslikker/mslikker.github.io/blob/main/DISproject.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Set-Up**

In [1]:
!pip install "pyspark>=3.5,<3.6"  # the GraphFrames JAR below is built for Spark 3.5 / Scala 2.12
!pip install findspark
!pip install -U -q PyDrive
!pip install pyngrok
!pip install graphframes
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# Download the GraphFrames JAR once (-nc skips the download if the file already exists)
!wget -nc https://repos.spark-packages.org/graphframes/graphframes/0.8.4-spark3.5-s_2.12/graphframes-0.8.4-spark3.5-s_2.12.jar

openjdk-8-jdk-headless is already the newest version (8u422-b05-1~22.04).
0 upgraded, 0 newly installed, 0 to remove and 49 not upgraded.
--2024-10-15 14:21:21--  https://repos.spark-packages.org/graphframes/graphframes/0.8.4-spark3.5-s_2.12/graphframes-0.8.4-spark3.5-s_2.12.jar
Resolving repos.spark-packages.org (repos.spark-packages.org)... 13.35.166.111, 13.35.166.93, 13.35.166.66, ...
Connecting to repos.spark-packages.org (repos.spark-packages.org)|13.35.166.111|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 247575 (242K) [binary/octet-stream]
Saving to: ‘graphframes-0.8.4-spark3.5-s_2.12.jar.1’


2024-10-15 14:21:21 (18.0 MB/s) - ‘graphframes-0.8.4-spark3.5-s_2.12.jar.1’ saved [247575/247575]



**Now we authenticate a Google Drive client to download the file we will be processing in our Spark job.**

In [None]:
# Authenticate and create the PyDrive client
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)



**Replace the ID with your own file ID: upload the file to your Google Drive, click 'Copy link', and take the ID from the link (the part after `/d/` in `https://drive.google.com/file/d/<ID>/view`).**
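If you prefer not to cut the ID out of the link by hand, a small helper can do it. This is a minimal sketch: `extract_drive_file_id` is a hypothetical name, and it assumes the two common share-link shapes (`.../file/d/<ID>/...` and `...open?id=<ID>`).

```python
import re
from urllib.parse import urlparse, parse_qs


def extract_drive_file_id(link):
    """Return the file ID from a Google Drive share link (hypothetical helper)."""
    # Links of the form https://drive.google.com/file/d/<ID>/view?usp=sharing
    match = re.search(r"/d/([A-Za-z0-9_-]+)", link)
    if match:
        return match.group(1)
    # Links of the form https://drive.google.com/open?id=<ID>
    query_ids = parse_qs(urlparse(link).query).get("id")
    if query_ids:
        return query_ids[0]
    raise ValueError(f"No Drive file ID found in {link!r}")


link = "https://drive.google.com/file/d/1Pli2SAi1wEzY3eRsQNK1H3Erdko0_4di/view?usp=sharing"
print(extract_drive_file_id(link))
```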

In [None]:
file_id = '1Pli2SAi1wEzY3eRsQNK1H3Erdko0_4di'  # not called `id`, which would shadow Python's built-in
downloaded = drive.CreateFile({'id': file_id})
downloaded.GetContentFile('telecom_calls_community.csv')

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

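import pyspark
from pyspark.sql import SparkSession
# Import Spark SQL functions under the alias F instead of `import *`: a star import shadows
# Python built-ins such as min, max, sum, abs and hash with their Spark column counterparts
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark import SparkContext, SparkConf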

In [None]:

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark import SparkConf

# Path to the downloaded GraphFrames JAR file
graphframes_jar_path = "/content/graphframes-0.8.4-spark3.5-s_2.12.jar"

# Spark configuration, including the GraphFrames JAR
conf = SparkConf()
conf.set("spark.ui.port", "4050")
conf.setAppName("project")
conf.setMaster("local[*]")
conf.set("spark.driver.memory", "2G")
conf.set("spark.driver.maxResultSize", "2g")
conf.set("spark.executor.memory", "1G")
conf.set("spark.jars", graphframes_jar_path)  # Add GraphFrames JAR here

# Create the Spark session
spark = SparkSession.builder.config(conf=conf).getOrCreate()

spark


***Check the active sessions***

In [None]:
import pyspark
from pyspark.sql import SparkSession # Import SparkSession explicitly

# Check for active SparkSession
if SparkSession.getActiveSession() is not None:  # Use getActiveSession() directly
    print("Active SparkSession exists")
    print(SparkSession.getActiveSession())  # Get the active session
else:
    print("No active SparkSession")

# Check for active SparkContext
if pyspark.SparkContext._active_spark_context is not None:
    print("Active SparkContext exists")
    print(pyspark.SparkContext.getOrCreate())  # Get the active context
else:
    print("No active SparkContext")

In [None]:
spark

In [None]:
# read the file
telecom_calls_community = spark.read.csv("telecom_calls_community.csv", header=True, inferSchema=True)
# show the first 10 rows
telecom_calls_community.show(10)

In [None]:
# Basic statistics: each row of the file is one call
print("In total there are {0} calls".format(telecom_calls_community.count()))

In [None]:
# Combine both caller columns
all_callers = telecom_calls_community.select("caller_id_1").union(telecom_calls_community.select("caller_id_2"))

# Count distinct callers
num_different_callers = all_callers.distinct().count()

print(f"Number of different callers: {num_different_callers}")

# Finding communities using Connected Components

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

df = spark.read.csv("telecom_calls_community.csv", header=True, inferSchema=True)
df.show(5)

# Rename columns to src, which stands for 'source_node', and dst, which stands for 'destination_node'
edges = df.selectExpr("caller_id_1 as src", "caller_id_2 as dst")
edges.show(20)

# Convert edges DataFrame to an RDD of (source, destination) pairs
edges_rdd = edges.rdd.map(lambda row: (row['src'], row['dst']))

print("RDD data:")
print(edges_rdd.take(5))


# for every source, if the id is the same in a row (for example in row 2: src is 27 and in row 17: src is 27, but the dst is 14 in row 2 and 8 in row 17), i want it to

# Using GraphFrames to identify communities in large datasets

**Dataset**

In [None]:
from graphframes import GraphFrame

# Create the vertices DataFrame: every distinct caller ID from either column, renamed to "id"
vertices = (
    telecom_calls_community.select(col("caller_id_1").alias("id"))
    .union(telecom_calls_community.select(col("caller_id_2").alias("id")))
    .distinct()
)

# Create the GraphFrame from the vertices and the src/dst edges built above
g = GraphFrame(vertices, edges)

# Show vertices and edges
g.vertices.show()
g.edges.show()

# Set checkpoint directory before running connected components
spark.sparkContext.setCheckpointDir("/tmp/checkpoint") # Choose a suitable directory

# Run the connected components algorithm
result = g.connectedComponents()
result.show()
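GraphFrames' `connectedComponents` groups vertices into components ignoring edge direction. To sanity-check the Spark result on a small sample, the same grouping can be computed in plain Python with union-find. This is a sketch: `connected_components` is a hypothetical helper, and labelling each component by its smallest node ID is this sketch's own choice; GraphFrames may pick different component IDs, so compare the groupings rather than the label values.

```python
def connected_components(edges):
    """Group nodes into connected components, ignoring edge direction (toy sketch).

    Returns a dict mapping each node to a label: the smallest node ID in its component.
    """
    parent = {}

    def find(x):
        # Walk up to the root, halving the path as we go to keep the trees shallow
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x

    def union(a, b):
        ra, rb = find(a), find(b)
        if ra != rb:
            # Make the smaller root the parent, so every root is its component's smallest ID
            if rb < ra:
                ra, rb = rb, ra
            parent[rb] = ra

    for src, dst in edges:
        parent.setdefault(src, src)
        parent.setdefault(dst, dst)
        union(src, dst)

    return {node: find(node) for node in parent}


# Toy calls: 27, 14 and 8 form one group, 5 and 6 another, and 9 only calls itself
toy_edges = [(27, 14), (27, 8), (14, 8), (5, 6), (9, 9)]
print(connected_components(toy_edges))
```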


# WORK IN PROGRESS minhash

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.ml.linalg import Vectors
import numpy as np
import hashlib


# Transform into a DataFrame where each row is one community (component) with the list of its node IDs
community_nodes = result.groupBy("component").agg(F.collect_list("id").alias("nodes"))

# Show the community nodes DataFrame without truncating the node lists
community_nodes.show(truncate=False)


In [None]:
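import hashlib
import numpy as np
from pyspark.ml.linalg import Vectors

# Number of hash functions (signature length; can be tuned)
num_hashes = 30

# Universal hash family h_i(x) = (a_i * x + b_i) mod p with p = 2**31 - 1 (a Mersenne prime).
# The coefficients are drawn ONCE with a fixed seed so every community is hashed with the same
# functions; drawing them inside the function would make signatures incomparable.
_PRIME = 2_147_483_647
_rng = np.random.default_rng(42)
_hash_a = _rng.integers(1, _PRIME, size=num_hashes, dtype=np.int64)
_hash_b = _rng.integers(0, _PRIME, size=num_hashes, dtype=np.int64)

def _stable_node_hash(node):
    # Python's built-in hash() of strings is salted per process, so Spark workers could disagree;
    # use a deterministic digest, reduced below p so a * x stays within int64
    return int(hashlib.md5(str(node).encode("utf-8")).hexdigest(), 16) % _PRIME

# Hashing function to produce MinHash signatures
def min_hash_signature(nodes):
    x = np.array([_stable_node_hash(node) for node in nodes], dtype=np.int64)
    # (num_hashes x len(nodes)) matrix of hash values; keep the minimum over the nodes per hash function
    hashed = (_hash_a[:, None] * x[None, :] + _hash_b[:, None]) % _PRIME
    return Vectors.dense(hashed.min(axis=1))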

In [None]:
# Register user defined function
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType

min_hash_udf = udf(lambda nodes: min_hash_signature(nodes).toArray().tolist(), ArrayType(DoubleType()))

# Apply the MinHashing to each community
community_signatures = community_nodes.withColumn("signature", min_hash_udf("nodes"))

print(community_signatures)

# Show the community signatures DataFrame
#community_signatures.show(truncate=False)
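The point of MinHash signatures is that the fraction of positions where two signatures agree estimates the Jaccard similarity of the underlying sets. The standalone sketch below (hypothetical helpers `minhash_signatures`, `estimated_jaccard`, `exact_jaccard`; integer node IDs assumed) compares the estimate with the exact value. One caveat worth checking against the goal here: connected components are disjoint, so the true Jaccard similarity between two components is always 0; MinHash becomes informative on overlapping node sets, such as neighbourhoods or communities from different time windows.

```python
import numpy as np


def minhash_signatures(sets, num_hashes=200, seed=0):
    """MinHash signatures for sets of non-negative integers, using ONE shared hash family."""
    prime = 2_147_483_647  # 2**31 - 1
    rng = np.random.default_rng(seed)
    a = rng.integers(1, prime, size=num_hashes, dtype=np.int64)
    b = rng.integers(0, prime, size=num_hashes, dtype=np.int64)
    signatures = []
    for s in sets:
        x = np.array(sorted(s), dtype=np.int64) % prime
        # h_i(x) = (a_i * x + b_i) mod p for every hash i and element x; keep the minimum per hash
        signatures.append(((a[:, None] * x[None, :] + b[:, None]) % prime).min(axis=1))
    return signatures


def estimated_jaccard(sig1, sig2):
    # Two sets share the same minimum hash with probability equal to their Jaccard similarity
    return float(np.mean(sig1 == sig2))


def exact_jaccard(s1, s2):
    return len(s1 & s2) / len(s1 | s2)


A = set(range(0, 100))
B = set(range(50, 150))
sig_a, sig_b = minhash_signatures([A, B])
print(exact_jaccard(A, B), estimated_jaccard(sig_a, sig_b))
```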

# WORK IN PROGRESS END

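# Label Propagation (as a stand-in for Louvain)

Here you have to set the maximum number of iterations (`maxIter`). Note that GraphFrames' `labelPropagation` runs the Label Propagation Algorithm (LPA), not the Louvain method: in LPA each node repeatedly adopts the most frequent label among its neighbours, whereas Louvain greedily optimises modularity. Both produce community assignments, but the results can differ.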

In [None]:
import time
from graphframes import GraphFrame

def run_louvain_algorithm(g):
    print("Running Louvain (Label Propagation) algorithm...")
    start_time = time.time()
    louvain_result = g.labelPropagation(maxIter=10)  # Set max iterations
    execution_time = time.time() - start_time
    print(f"Louvain (Label Propagation) execution time: {execution_time:.2f} seconds")
    louvain_result.show(10)
    return louvain_result

vertices = telecom_calls_community.select("caller_id_1").distinct().withColumnRenamed("caller_id_1", "id").union(
    telecom_calls_community.select("caller_id_2").distinct().withColumnRenamed("caller_id_2", "id")).distinct()

edges = telecom_calls_community.selectExpr("caller_id_1 as src", "caller_id_2 as dst")

g = GraphFrame(vertices, edges)

spark.sparkContext.setCheckpointDir("/tmp/checkpoint")

louvain_result = run_louvain_algorithm(g)
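To build intuition for what `labelPropagation` does, here is a toy, pure-Python version of synchronous label propagation: every node starts with its own label and, in each round, adopts the most frequent label among its neighbours. This is a sketch, not GraphFrames' implementation: `label_propagation` is a hypothetical helper, and the tie-breaking rule (smallest label wins) and the early stop when nothing changes are assumptions of this sketch. LPA in general is not guaranteed to converge (synchronous updates can oscillate), which is why a `maxIter` cap is needed.

```python
from collections import Counter


def label_propagation(edges, max_iter=10):
    """Toy synchronous label propagation on an undirected edge list."""
    # Build an undirected adjacency list
    neighbours = {}
    for u, v in edges:
        neighbours.setdefault(u, []).append(v)
        neighbours.setdefault(v, []).append(u)

    labels = {node: node for node in neighbours}  # every node starts in its own community
    for _ in range(max_iter):
        new_labels = {}
        for node, nbrs in neighbours.items():
            counts = Counter(labels[n] for n in nbrs)
            top = counts.most_common(1)[0][1]
            # Most frequent neighbour label; ties broken by the smallest label (an assumption)
            new_labels[node] = sorted(lbl for lbl, c in counts.items() if c == top)[0]
        if new_labels == labels:
            break  # no label changed: converged
        labels = new_labels
    return labels


# Two triangles of callers joined by a single call between callers 3 and 4
toy_edges = [(1, 2), (2, 3), (3, 1), (4, 5), (5, 6), (6, 4), (3, 4)]
print(label_propagation(toy_edges))  # {1: 1, 2: 1, 3: 1, 4: 3, 5: 3, 6: 3}
```

The two triangles end up with two different labels, i.e. two communities, even though they form a single connected component.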



# K-Cliques Algorithm

I think this is not relevant for us: you have to choose k in advance, which we don't want.

In [None]:
from pyspark.sql import functions as F
from graphframes import GraphFrame
import time

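def find_k_cliques(g, k):
    """
    Find k-cliques (sets of k callers who have all called each other) using PySpark joins.
    Calls are treated as undirected edges; each clique is listed once, with node1 < node2 < ... < nodek.
    """
    print(f"Running K-Cliques algorithm for k={k}...")

    # Measure start time
    start_time = time.time()

    # Undirected, de-duplicated edge list with u < v (self-calls dropped)
    und = (g.edges
           .select(F.least("src", "dst").alias("u"), F.greatest("src", "dst").alias("v"))
           .filter(F.col("u") != F.col("v"))
           .distinct())

    # Initialize the cliques as 2-cliques (the edges themselves)
    cliques = und.select(F.col("u").alias("node1"), F.col("v").alias("node2"))

    # Iteratively extend each i-clique with a node that is adjacent to ALL of its nodes
    for i in range(2, k):
        new = "node" + str(i + 1)
        # Candidate: a neighbour of the last node with a larger ID
        step = und.select(F.col("u").alias("_last"), F.col("v").alias(new))
        cliques = cliques.join(step, F.col("node" + str(i)) == F.col("_last")).drop("_last")
        # Keep the candidate only if it is also adjacent to every earlier node
        for j in range(1, i):
            check = und.select(F.col("u").alias("_cu"), F.col("v").alias("_cv"))
            cliques = cliques.join(
                check,
                (F.col("node" + str(j)) == F.col("_cu")) & (F.col(new) == F.col("_cv")),
                "left_semi",
            )

    # Display the found k-cliques
    print(f"Found {cliques.count()} {k}-cliques")
    cliques.show(10)

    # Measure execution time
    execution_time = time.time() - start_time
    print(f"K-Cliques execution time: {execution_time:.2f} seconds")

    return cliques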

# Prepare vertices and edges as before
vertices = telecom_calls_community.select("caller_id_1").distinct().withColumnRenamed("caller_id_1", "id").union(
    telecom_calls_community.select("caller_id_2").distinct().withColumnRenamed("caller_id_2", "id")).distinct()

edges = telecom_calls_community.selectExpr("caller_id_1 as src", "caller_id_2 as dst")

# Create the GraphFrame
g = GraphFrame(vertices, edges)

# Run the K-Cliques algorithm with k=3 (for example)
k = 3
k_cliques_result = find_k_cliques(g, k)
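A brute-force check on a small sample is a useful way to validate the Spark output: a k-clique needs an edge between every pair of its k nodes, so a join that only follows consecutive edges would also report chains such as 3 → 4 → 5 below, which are not cliques. The sketch (hypothetical helper `brute_force_k_cliques`) checks all k-node subsets, so it is exponential and only suitable for toy graphs.

```python
from itertools import combinations


def brute_force_k_cliques(edges, k):
    """List every k-clique of a small undirected graph by checking all k-node subsets."""
    adjacency = {}
    for u, v in edges:
        if u == v:
            continue  # a self-call does not connect two different callers
        adjacency.setdefault(u, set()).add(v)
        adjacency.setdefault(v, set()).add(u)

    cliques = []
    for nodes in combinations(sorted(adjacency), k):
        # A k-clique needs an edge between every pair of its nodes
        is_clique = True
        for a, b in combinations(nodes, 2):
            if b not in adjacency[a]:
                is_clique = False
                break
        if is_clique:
            cliques.append(nodes)
    return cliques


# Triangle 1-2-3 plus a tail 3-4-5
toy_edges = [(1, 2), (2, 3), (3, 1), (3, 4), (4, 5)]
print(brute_force_k_cliques(toy_edges, 3))  # [(1, 2, 3)]
```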


# Connected Components

In [None]:
import time
from graphframes import GraphFrame

def run_connected_components(g):
    print("Running Connected Components algorithm...")
    start_time = time.time()
    result = g.connectedComponents()
    execution_time = time.time() - start_time
    print(f"Connected Components execution time: {execution_time:.2f} seconds")
    result.show(10)
    return result

vertices = telecom_calls_community.select("caller_id_1").distinct().withColumnRenamed("caller_id_1", "id").union(
    telecom_calls_community.select("caller_id_2").distinct().withColumnRenamed("caller_id_2", "id")).distinct()

edges = telecom_calls_community.selectExpr("caller_id_1 as src", "caller_id_2 as dst")

g = GraphFrame(vertices, edges)

spark.sparkContext.setCheckpointDir("/tmp/checkpoint")

connected_components_result = run_connected_components(g)

# Save the output as CSV for Cytoscape visualisation
# 1. Save nodes (id, component) - the result of the connected components algorithm
nodes_df = connected_components_result.toPandas()
nodes_df.to_csv("nodes.csv", index=False)
print("Nodes data saved to 'nodes.csv'.")

# 2. Save edges (src, dst) - the original edges connecting the nodes
edges_df = edges.toPandas()
edges_df.to_csv("edges.csv", index=False)
print("Edges data saved to 'edges.csv'.")


**Further analysis of the connected components results:**

In [None]:
num_components = connected_components_result.select("component").distinct().count()
print(f"Total number of connected components: {num_components}")

component_sizes = connected_components_result.groupBy("component").count().orderBy("count", ascending=False)
component_sizes.show(10)  # Show the sizes of the top 10 largest components
print("component = community ID; count = number of nodes in that community")


In [None]:
# Additional libraries (fastdtw may need to be installed first on Colab)
!pip install -q fastdtw
import networkx as nx
import numpy as np
from itertools import combinations
from fastdtw import fastdtw
from scipy.spatial.distance import euclidean

In [None]:
import networkx as nx
import numpy as np

def convert_to_networkx(connected_components_result, edges):
    """
    Convert the connected components to NetworkX graphs for each community.
    """
    community_graphs = []
    # Sort the component IDs so the community order is reproducible across runs
    component_ids = connected_components_result.select("component").distinct().orderBy("component").collect()

    for component in component_ids:
        component_id = component['component']
        component_data = connected_components_result.filter(col("component") == component_id)

        G = nx.Graph()

        # Collect node IDs as a set for better filtering in PySpark
        node_ids = {row['id'] for row in component_data.collect()}

        # Add nodes to the graph
        for node_id in node_ids:
            G.add_node(node_id)

        # Filter edges that are part of the current component
        edges_in_component = edges.filter(
            (edges['src'].isin(node_ids)) |
            (edges['dst'].isin(node_ids))
        ).select(col("src"), col("dst"))

        # Add the edges to the graph
        for edge in edges_in_component.collect():
            G.add_edge(edge['src'], edge['dst'])

        community_graphs.append(G)

    return community_graphs


In [None]:
# Convert connected components to NetworkX graphs
community_graphs = convert_to_networkx(connected_components_result, edges)

In [None]:
import matplotlib.pyplot as plt
import networkx as nx

def visualize_community_graphs(community_graphs, num_to_visualize=3):
    """
    Function to visualize a few community graphs and print their nodes and edges.
    Args:
        community_graphs (list): A list of NetworkX graphs representing each community.
        num_to_visualize (int): The number of community graphs to visualize.
    """
    for i, G in enumerate(community_graphs[:num_to_visualize]):
        print(f"Community {i + 1}:")
        print(f"Nodes: {G.nodes()}")
        print(f"Edges: {G.edges()}")

        # Draw the graph
        plt.figure(figsize=(5, 5))
        nx.draw(G, with_labels=True, node_color="skyblue", node_size=500, edge_color="gray")
        plt.title(f"Community {i + 1}")
        plt.show()

# Example usage: visualize and inspect the first 6 community graphs
visualize_community_graphs(community_graphs, num_to_visualize=6)

In [None]:
# Function to create graphlet vectors
def get_graphlet_vector(G):
    """
    Create a vector representing the graphlets (subgraphs) in G.
    For simplicity, this is the degree sequence: the node degrees sorted in
    descending order, so the vector does not depend on the order in which
    nodes were added to the graph.
    """
    degrees = [degree for _, degree in G.degree()]  # degree = number of connections of a node
    return np.array(sorted(degrees, reverse=True))
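The degree vector is a simplification: graphlet-based measures such as GCD-11 start from counts of small induced subgraphs. As a first step in that direction, the sketch below counts the two connected 3-node graphlets (open paths a-b-c and triangles) of one community. `three_node_graphlet_counts` is a hypothetical helper; it expects a dict mapping each node to its neighbours and assumes there are no self-loops. A NetworkX graph's `G.adj` should also work as input.

```python
from itertools import combinations


def three_node_graphlet_counts(adjacency):
    """Count the two connected 3-node graphlets: open paths (a-b-c) and triangles."""
    wedges = 0          # 2-paths centred at a node, open or closed
    closed_corners = 0  # 2-paths whose two end nodes are also connected
    for node, nbrs in adjacency.items():
        d = len(nbrs)
        wedges += d * (d - 1) // 2
        for a, b in combinations(nbrs, 2):
            if b in adjacency[a]:
                closed_corners += 1
    triangles = closed_corners // 3       # each triangle is seen once from each of its 3 corners
    open_paths = wedges - closed_corners  # the remaining 2-paths are not closed by an edge
    return {"open_path": open_paths, "triangle": triangles}


# A triangle 1-2-3 with an extra caller 4 attached to 3
toy_adjacency = {1: {2, 3}, 2: {1, 3}, 3: {1, 2, 4}, 4: {3}}
print(three_node_graphlet_counts(toy_adjacency))  # {'open_path': 2, 'triangle': 1}
```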

In [None]:
from builtins import max as builtin_max  # Import the built-in max function explicitly

# 1. Function to Calculate gcd_11 (Spatial Similarity)
def calculate_gcd_11(communities):
    """
    Calculate a simplified stand-in for the GCD-11 distance between pairs of communities:
    the Euclidean distance between their zero-padded graphlet (degree) vectors.
    The actual GCD-11 compares correlation matrices of graphlet orbit counts.
    """
    # Find the maximum size of any graphlet vector
    max_vector_length = builtin_max([len(get_graphlet_vector(g)) for g in communities])

    gcd_11_distances = {}

    for i, j in combinations(range(len(communities)), 2):
        v1 = get_graphlet_vector(communities[i])
        v2 = get_graphlet_vector(communities[j])

        # Pad both vectors with zeros up to the longest vector length
        v1 = np.pad(v1, (0, max_vector_length - len(v1)))
        v2 = np.pad(v2, (0, max_vector_length - len(v2)))

        # Euclidean distance between the padded vectors (the GCD-11 stand-in)
        gcd_11_distances[(i, j)] = euclidean(v1, v2)

    return gcd_11_distances
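Because this distance compares vectors position by position, the order of the entries matters. The toy example below uses a hypothetical helper, `padded_euclidean`, which mirrors the padding step above: two degree vectors of the same 4-node path a-b-c-d, listed in different node orders, are far apart unless both vectors are sorted first.

```python
import numpy as np


def padded_euclidean(v1, v2):
    """Euclidean distance after zero-padding the shorter vector to the longer length."""
    n = len(v1) if len(v1) >= len(v2) else len(v2)
    a = np.pad(np.asarray(v1, dtype=float), (0, n - len(v1)))
    b = np.pad(np.asarray(v2, dtype=float), (0, n - len(v2)))
    return float(np.linalg.norm(a - b))


# Degrees of the path a-b-c-d in the order a, b, c, d and in the order b, a, d, c
path_order_1 = [1, 2, 2, 1]
path_order_2 = [2, 1, 1, 2]
print(padded_euclidean(path_order_1, path_order_2))  # 2.0
print(padded_euclidean(sorted(path_order_1, reverse=True),
                       sorted(path_order_2, reverse=True)))  # 0.0
```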

In [None]:
# 2. Extract Time Sequences (for DTW)
def extract_time_sequences(connected_components_result, df):
    """
    Extract the start and end times of the calls in each community, in chronological order.
    """
    community_time_sequences = []
    # Sort the component IDs so the community order is reproducible across runs
    component_ids = connected_components_result.select("component").distinct().orderBy("component").collect()

    for component in component_ids:
        component_id = component['component']
        # Collect the node IDs of this component once and reuse them in both filters
        node_ids = [row['id'] for row in
                    connected_components_result.filter(col("component") == component_id).select("id").collect()]

        time_sequence = df.filter(
            (df['caller_id_1'].isin(node_ids)) |
            (df['caller_id_2'].isin(node_ids))
        ).orderBy("start_time").select(col("start_time"), col("end_time")).collect()

        community_time_sequences.append(time_sequence)

    return community_time_sequences

In [None]:
# Extract time sequences for communities
community_time_sequences = extract_time_sequences(connected_components_result, telecom_calls_community)

In [None]:
# 3. Function to Calculate DTW (Temporal Similarity)
def calculate_dtw(time_sequences):
    """
    Function to calculate DTW (Dynamic Time Warping) distance between pairs of time sequences.

    Args:
        time_sequences (list): A list of time sequences (start and end times for each community).

    Returns:
        dtw_distances (dict): A dictionary where keys are pairs of community indices, and values are the DTW distances.
    """
    dtw_distances = {}
    for i, j in combinations(range(len(time_sequences)), 2):
        seq1 = time_sequences[i]
        seq2 = time_sequences[j]

        # Calculate the DTW distance
        distance, _ = fastdtw(seq1, seq2, dist=euclidean)
        dtw_distances[(i, j)] = distance

    return dtw_distances
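`fastdtw` approximates the exact DTW distance in roughly linear time. For intuition, or to check results on short sequences, the exact quadratic dynamic program is small enough to write out. This is a sketch: `dtw_distance` is a hypothetical helper for 1-D numeric sequences, so call timestamps would first need converting to numbers (for example seconds).

```python
import numpy as np


def dtw_distance(seq1, seq2):
    """Exact dynamic time warping distance between two 1-D numeric sequences, O(len1 * len2)."""
    n, m = len(seq1), len(seq2)
    cost = np.full((n + 1, m + 1), np.inf)
    cost[0, 0] = 0.0
    for i in range(1, n + 1):
        for j in range(1, m + 1):
            d = float(np.abs(seq1[i - 1] - seq2[j - 1]))  # local distance between two samples
            # Extend the cheapest of: a match, a step in seq1 only, or a step in seq2 only
            cost[i, j] = d + np.min([cost[i - 1, j - 1], cost[i - 1, j], cost[i, j - 1]])
    return float(cost[n, m])


print(dtw_distance([1, 2, 3], [1, 2, 2, 3]))  # 0.0: the repeated 2 is absorbed by the warping
```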

In [None]:
# 4. Combine GCD-11 and DTW Distances
def combine_similarities(gcd_11_distances, dtw_distances):
    """
    Combine the spatial (GCD-11) and temporal (DTW) distances by adding them.

    Args:
        gcd_11_distances (dict): GCD-11 distance for each community pair.
        dtw_distances (dict): DTW distance for each community pair.
    Returns:
        combined_distances (dict): Combined distance for each community pair.
    """
    combined_distances = {}
    for key in gcd_11_distances:
        combined_distances[key] = gcd_11_distances[key] + dtw_distances[key]

    return combined_distances
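The two distances live on different scales (degree counts versus time), so in a raw sum the larger one can dominate. One option, sketched below, is to min-max scale each set of distances to [0, 1] before a weighted sum. `combine_normalized` and its `weight` parameter are hypothetical; the example values are made up, and the two dicts are assumed to have the same community-pair keys.

```python
import numpy as np


def combine_normalized(gcd_distances, dtw_distances, weight=0.5):
    """Min-max scale each distance dict to [0, 1], then take a weighted sum.

    `weight` is the share given to the structural (GCD-11 stand-in) distance.
    """
    def scale(distances):
        values = np.array(list(distances.values()), dtype=float)
        low, high = float(np.min(values)), float(np.max(values))
        span = high - low if high > low else 1.0  # avoid dividing by zero when all values are equal
        return {key: (value - low) / span for key, value in distances.items()}

    g, d = scale(gcd_distances), scale(dtw_distances)
    return {key: weight * g[key] + (1 - weight) * d[key] for key in gcd_distances}


gcd_example = {(0, 1): 2.0, (0, 2): 4.0, (1, 2): 3.0}
dtw_example = {(0, 1): 1000.0, (0, 2): 3000.0, (1, 2): 2000.0}
print(combine_normalized(gcd_example, dtw_example))  # {(0, 1): 0.0, (0, 2): 1.0, (1, 2): 0.5}
```

In the notebook this would be called as `combine_normalized(gcd_11_distances, dtw_distances)`.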

In [None]:
gcd_11_distances = calculate_gcd_11(community_graphs)
dtw_distances = calculate_dtw(community_time_sequences)

# Combine the distances
combined_distances = combine_similarities(gcd_11_distances, dtw_distances)

# Printing the combined similarities for each pair of communities
print("Combined GCD-11 and DTW distances between community pairs:")
for (community_a, community_b), combined_distance in combined_distances.items():
    print(f"Community {community_a} and Community {community_b}: Combined Distance = {combined_distance:.4f}")

In [None]:
import numpy as np
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
import matplotlib.pyplot as plt

# Convert the combined distances dictionary to a distance matrix
def create_distance_matrix(combined_distances, num_communities):
    distance_matrix = np.zeros((num_communities, num_communities))
    for (i, j), dist in combined_distances.items():
        distance_matrix[i, j] = dist
        distance_matrix[j, i] = dist  # Since the matrix is symmetric
    return distance_matrix

# Create the distance matrix
num_communities = len(community_graphs)
distance_matrix = create_distance_matrix(combined_distances, num_communities)

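# Check that the matrix is symmetric and non-negative
assert np.allclose(distance_matrix, distance_matrix.T), "distance matrix is not symmetric"
assert (distance_matrix >= 0).all(), "distance matrix has negative entries"
print("Distance Matrix:")
print(distance_matrix)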

In [None]:
# Using the Elbow Method to determine the best k
distortions = []
K = range(2, 6)  # Testing 2 to 5 clusters (range excludes the upper bound)
for k in K:
    kmeans = KMeans(n_clusters=k, random_state=42)
    kmeans.fit(distance_matrix)
    distortions.append(kmeans.inertia_)  # Inertia is the within-cluster sum of squares

# Plot the Elbow curve
plt.figure(figsize=(8, 5))
plt.plot(K, distortions, 'bx-')
plt.xlabel('Number of Clusters (k)')
plt.ylabel('Distortion')
plt.title('Elbow Method for Optimal k')
plt.show()


**Elbow Method for Optimal $k$:**

This graph plots the distortion (the within-cluster sum of squares) against the number of clusters $k$.
The idea is to find the "elbow point", where the rate of distortion reduction slows down sharply: increasing the number of clusters beyond that point gives little further improvement.
In this case, the elbow seems to occur around $k = 3$, where the distortion starts flattening. This suggests that 3 clusters might be a good choice, as adding more clusters does not reduce distortion significantly.
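Reading the elbow off the plot is subjective. One common heuristic to automate it, sketched below under the assumption that distortion decreases with $k$, is to scale both axes to [0, 1] and pick the $k$ whose point lies farthest below the straight line joining the first and last points. `elbow_k` is a hypothetical helper, and the distortion values in the example are made up.

```python
import numpy as np


def elbow_k(ks, distortions):
    """Return the k whose scaled point lies farthest below the chord from the first to the last point."""
    x = np.asarray(list(ks), dtype=float)
    y = np.asarray(distortions, dtype=float)
    if x[-1] == x[0] or y[0] == y[-1]:
        return int(x[0])  # degenerate curve: there is no elbow to find
    x_n = (x - x[0]) / (x[-1] - x[0])   # 0 at the first k, 1 at the last
    y_n = (y - y[-1]) / (y[0] - y[-1])  # 1 at the first k, 0 at the last
    # In scaled coordinates the chord is x_n + y_n = 1; the gap below it measures how sharp the bend is
    gap = 1.0 - x_n - y_n
    return int(x[int(np.argmax(gap))])


print(elbow_k(range(2, 6), [100.0, 40.0, 30.0, 25.0]))  # 3
```

With the notebook's variables this would be `elbow_k(K, distortions)`.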

In [None]:
# Run K-Means again with the optimal number of clusters
optimal_k = 3  # Replace this with the k chosen with the elbow method
kmeans = KMeans(n_clusters=optimal_k, random_state=42)
final_labels = kmeans.fit_predict(distance_matrix)

# Print the final groupings of communities
print("Final Groupings of Communities:")
for i, label in enumerate(final_labels):
    print(f"Community {i}: Cluster {label}")
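The KMeans calls above treat each row of `distance_matrix` as a feature vector rather than using the pairwise distances directly. A clustering method that works on a precomputed distance matrix is one alternative; the sketch below is a simple alternating k-medoids in NumPy. `k_medoids` is a hypothetical helper (not a scikit-learn API); it uses a deterministic farthest-first initialisation and assumes `k` does not exceed the number of communities.

```python
import numpy as np


def k_medoids(distance_matrix, k, max_iter=100):
    """Simple alternating k-medoids on a precomputed, symmetric distance matrix."""
    D = np.asarray(distance_matrix, dtype=float)
    # Deterministic farthest-first initialisation, starting from the most central point
    medoids = [int(np.argmin(D.sum(axis=1)))]
    while len(medoids) < k:
        dist_to_nearest = D[:, medoids].min(axis=1)
        medoids.append(int(np.argmax(dist_to_nearest)))
    medoids = np.array(medoids)

    for _ in range(max_iter):
        labels = np.argmin(D[:, medoids], axis=1)  # assign each point to its closest medoid
        new_medoids = medoids.copy()
        for c in range(k):
            members = np.flatnonzero(labels == c)
            if members.size:
                # The new medoid minimises the total distance to the other members of its cluster
                within = D[np.ix_(members, members)].sum(axis=1)
                new_medoids[c] = members[np.argmin(within)]
        if np.array_equal(new_medoids, medoids):
            break  # medoids stopped changing
        medoids = new_medoids

    labels = np.argmin(D[:, medoids], axis=1)
    return labels, medoids


# Toy 1-D points with three obvious groups; distances are absolute differences
points = np.array([0, 1, 2, 10, 11, 12, 20, 21], dtype=float)
toy_distances = np.abs(np.subtract.outer(points, points))
toy_labels, toy_medoids = k_medoids(toy_distances, k=3)
print(toy_labels, toy_medoids)
```

In the notebook this would be called as `k_medoids(distance_matrix, k=optimal_k)`.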