# CIC - Parrallellization

- Rami Tarabishi | @r9119

## Imports

In [2]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import networkx as nx
import random

In [3]:
import dask
import dask.dataframe as dd
import dask.array as da

## Data import

In [4]:
# import twitch graph from csv
twitch_gamers = pd.read_csv('./data/large_twitch_edges.csv')
node_attr = pd.read_csv('./data/large_twitch_features.csv')
 
# create graph
G = nx.from_pandas_edgelist(twitch_gamers, source='numeric_id_1', target='numeric_id_2')
 
# Normalize views as new attribute
node_attr['views_normalized'] = (node_attr['views'] - node_attr['views'].mean()) / node_attr['views'].std()
 
for _, row in node_attr.iterrows():
    node_id = row['numeric_id']
    attributes = row.drop('numeric_id').to_dict()
    G.nodes[node_id].update(attributes)

## Subgraph extraction/Graph reduction

In [5]:
np.random.seed(12345)

def sample_connected_subgraph(G, num_nodes):
    # Step 1: Select a random starting node
    start_node = np.random.choice(list(G.nodes))
    
    # Step 2: Perform BFS to collect nodes and edges
    bfs_edges = list(nx.bfs_edges(G, source=start_node))
    bfs_nodes = [start_node] + [v for u, v in bfs_edges]
    
    # If we collected enough nodes, create the subgraph
    if len(bfs_nodes) >= num_nodes:
        sub_nodes = bfs_nodes[:num_nodes]
        subgraph = G.subgraph(sub_nodes).copy()
    else:
        raise ValueError(f"Unable to find {num_nodes} connected nodes in the graph.")
    
    return subgraph
    
num_nodes = 10000

G_sampled = sample_connected_subgraph(G, num_nodes)

# Check the result
print("Is the subgraph connected?", nx.is_connected(G_sampled))

Is the subgraph connected? True


In [6]:
# Print the number of nodes and edges
print("Number of nodes:", G_sampled.number_of_nodes())
print("Number of edges:", G_sampled.number_of_edges())

Number of nodes: 10000
Number of edges: 205177


## Graph statistics/Analysis

In [27]:
# Calculate degree centrality of nodes
degree_centrality = nx.degree_centrality(G)
sorted_degree_centrality = dict(sorted(degree_centrality.items(), key=lambda item: item[1], reverse=True))

In [28]:
max_cent = max(sorted_degree_centrality.values())
print('Node with highest degree centrality:', max_cent)
node = [node_id for node_id in sorted_degree_centrality.keys() if sorted_degree_centrality[node_id] == max_cent][0]
print('Node id:', node)

Node with highest degree centrality: 0.2098528965636209
Node id: 61862


In [29]:
# Calculate the node degrees
node_degrees = [degree for node, degree in G.degree()]

# Calculate the diameter of the graph
diameter = nx.approximation.diameter(G)
print(f'The diameter of the graph is: {diameter}')

# Calculate the average degree of the graph
average_degree = np.mean(node_degrees)
print(f'The average degree of the graph is: {average_degree:.2f}')

# Calculate the average degree of the graph
median_degree = np.median(node_degrees)
print(f'The average degree of the graph is: {median_degree}')

# Calculate the mode of the degree distribution
mode_degree = max(set(node_degrees), key=node_degrees.count)
print(f'The mode of the degree distribution is: {mode_degree}')

The diameter of the graph is: 8
The average degree of the graph is: 80.87
The average degree of the graph is: 32.0
The mode of the degree distribution is: 1


In [30]:
clustering_coefficient = nx.approximation.average_clustering(G)
print(f'The clustering coefficient is: {clustering_coefficient}')

The clustering coefficient is: 0.163


In [30]:
def proximity_prestige(G):
    proximity_prestige_dict = {}
    
    # Calculate shortest path lengths between all pairs of nodes
    shortest_path_lengths = dict(nx.all_pairs_shortest_path_length(G))
    
    # Iterate over each node to calculate its Proximity Prestige
    for node in G.nodes():
        prestige_sum = 0
        
        for target, path_length in shortest_path_lengths[node].items():
            if node != target and path_length > 0:
                prestige_sum += 1 / path_length
        
        proximity_prestige_dict[node] = prestige_sum / (len(G.nodes()) - 1)
    
    return proximity_prestige_dict

proximity_prestige_scores = proximity_prestige(G_sampled)

top_proximity_prestige = sorted(proximity_prestige_scores.items(), key=lambda x: x[1], reverse=True)[:5]
print("Top 5 Knoten nach Proximity Prestige:")
for node, prestige in top_proximity_prestige:
    print(f"Knoten {node}: {prestige:.4f}")

Top 5 Knoten nach Proximity Prestige:
Knoten 7329: 0.9521
Knoten 125642: 0.7468
Knoten 61862: 0.7151
Knoten 110345: 0.7132
Knoten 32338: 0.7066


### Proximity prestige:

Time complexity: O(V^2 + VE) (V = number of vertices, E = number of edges)

Considering 10000 nodes took ~2 minutes, the full graph (~170000 nodes and 6 million edges) would take about 290 times longer, which is about 10 hours.

For this reason I will attempt to parallelize the computation, test it on 10000 nodes and if there is significant improvement let the full graph run.

In [8]:
import dask.bag as db

def compute_proximity_prestige_for_node(G, node, shortest_path_lengths):
    prestige_sum = 0
    for target, path_length in shortest_path_lengths[node].items():
        if node != target and path_length > 0:
            prestige_sum += 1 / path_length
    return node, prestige_sum / (len(G.nodes()) - 1)

def proximity_prestige_parallel(G, num_partitions):
    # Compute all pairs shortest path lengths
    shortest_path_lengths = dict(nx.all_pairs_shortest_path_length(G))
    
    # Create a Dask bag from the nodes
    nodes = list(G.nodes())
    node_bag = db.from_sequence(nodes, npartitions=num_partitions)
    
    # Map the computation across the nodes
    delayed_results = node_bag.map(lambda node: compute_proximity_prestige_for_node(G, node, shortest_path_lengths)).compute()
    
    # Combine results into a dictionary
    proximity_prestige_dict = dict(delayed_results)
    
    return proximity_prestige_dict

num_partitions = 16

# proximity_prestige_scores_parallel = proximity_prestige_parallel(G_sampled, num_partitions)

# top_proximity_prestige_parallel = sorted(proximity_prestige_scores_parallel.items(), key=lambda x: x[1], reverse=True)[:5]
# print("Top 5 Knoten nach Proximity Prestige (parallel):")
# for node, prestige in top_proximity_prestige_parallel:
#     print(f"Knoten {node}: {prestige:.4f}")

In [15]:
from dask.distributed import Client

client = Client(n_workers=4, threads_per_worker=2, memory_limit='2GB')
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 52668 instead


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:52668/status,

0,1
Dashboard: http://127.0.0.1:52668/status,Workers: 4
Total threads: 8,Total memory: 7.45 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:52671,Workers: 4
Dashboard: http://127.0.0.1:52668/status,Total threads: 8
Started: Just now,Total memory: 7.45 GiB

0,1
Comm: tcp://127.0.0.1:52691,Total threads: 2
Dashboard: http://127.0.0.1:52694/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:52674,
Local directory: C:\Users\rami0\AppData\Local\Temp\dask-scratch-space\worker-6wbg_i3n,Local directory: C:\Users\rami0\AppData\Local\Temp\dask-scratch-space\worker-6wbg_i3n

0,1
Comm: tcp://127.0.0.1:52699,Total threads: 2
Dashboard: http://127.0.0.1:52700/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:52676,
Local directory: C:\Users\rami0\AppData\Local\Temp\dask-scratch-space\worker-mx5l4o5c,Local directory: C:\Users\rami0\AppData\Local\Temp\dask-scratch-space\worker-mx5l4o5c

0,1
Comm: tcp://127.0.0.1:52696,Total threads: 2
Dashboard: http://127.0.0.1:52697/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:52678,
Local directory: C:\Users\rami0\AppData\Local\Temp\dask-scratch-space\worker-y3xqre0r,Local directory: C:\Users\rami0\AppData\Local\Temp\dask-scratch-space\worker-y3xqre0r

0,1
Comm: tcp://127.0.0.1:52690,Total threads: 2
Dashboard: http://127.0.0.1:52692/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:52680,
Local directory: C:\Users\rami0\AppData\Local\Temp\dask-scratch-space\worker-dctq9l6b,Local directory: C:\Users\rami0\AppData\Local\Temp\dask-scratch-space\worker-dctq9l6b


In [None]:
# Start up a worker

In [16]:
# exit the client
client.shutdown()

In [12]:
import time

def measure_performance(G, num_partitions):
    start_time = time.time()
    prestige = proximity_prestige_parallel(G, num_partitions)
    end_time = time.time()
    duration = end_time - start_time
    return duration, prestige

num_partitions = [1, 2, 4]

for i in num_partitions:
    duration, _ = measure_performance(G_sampled, i)
    print(f"Duration with {i} partitions: {duration:.2f} seconds")
    # Flush the Dask scheduler to release resources
    # dask.distributed.get_client().cancel(dask.distributed.get_client().futures)

This may cause some slowdown.
Consider scattering data ahead of time and using futures.


Duration with 1 partitions: 174.07 seconds
Duration with 2 partitions: 170.29 seconds
Unexpected exception formatting exception. Falling back to standard exception


Traceback (most recent call last):
  File "C:\Users\rami0\AppData\Roaming\Python\Python311\site-packages\IPython\core\interactiveshell.py", line 3550, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "C:\Users\rami0\AppData\Local\Temp\ipykernel_13428\2249196446.py", line 13, in <module>
    duration, _ = measure_performance(G_sampled, i)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\rami0\AppData\Local\Temp\ipykernel_13428\2249196446.py", line 5, in measure_performance
    prestige = proximity_prestige_parallel(G, num_partitions)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\rami0\AppData\Local\Temp\ipykernel_13428\3033319911.py", line 12, in proximity_prestige_parallel
    shortest_path_lengths = dict(nx.all_pairs_shortest_path_length(G))
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Python311\Lib\site-packages\networkx\algorithms\shortest_paths\unweighted.py", line 193,

## Influencer detection via community detection

In [16]:
from networkx.algorithms.community import greedy_modularity_communities

communities = greedy_modularity_communities(G_sampled)

In [17]:
sorted_communities = sorted([list(c) for c in communities], key=len, reverse=True)
top_3_communities = sorted_communities[:3]
print(f"Anzahl der erkannten Gemeinschaften: {len(communities)}")
print(f"Anzahl der Gemeinschaften mit mehr als einem Knoten: {len([c for c in communities if len(c) > 1])}")

Anzahl der erkannten Gemeinschaften: 15
Anzahl der Gemeinschaften mit mehr als einem Knoten: 15


In [41]:
community_results = []

for i, community in enumerate(top_3_communities):
    subgraph = G_sampled.subgraph(community)
    
    # Degree Centrality
    degree_centrality_3 = nx.degree_centrality(subgraph)
    top_3_degree = sorted(degree_centrality_3.items(), key=lambda x: x[1], reverse=True)[:3]
    
    # Proximity Prestige
    proximity_prestige_scores_3 = proximity_prestige(subgraph)
    top_3_prestige = sorted(proximity_prestige_scores_3.items(), key=lambda x: x[1], reverse=True)[:3]
    
    # PageRank
    pagerank_scores_3 = nx.pagerank(subgraph)
    top_3_pagerank = sorted(pagerank_scores_3.items(), key=lambda x: x[1], reverse=True)[:3]
    
    for metric, top_3 in zip(['Degree Centrality', 'Proximity Prestige', 'PageRank'], 
                             [top_3_degree, top_3_prestige, top_3_pagerank]):
        for node, score in top_3:
            community_results.append({
                'Community': f'Community {i + 1}',
                'Node': node,
                'Score': score,
                'Metric': metric
            })
    
    print(f"Community {i + 1} ({len(community)} nodes):")
    
    print("Top 3 Akteure nach Degree Centrality:")
    for node, centrality in top_3_degree:
        print(f"  Knoten {node}: Degree Centrality: {centrality:.4f}")
    
    print("Top 3 Akteure nach Proximity Prestige:")
    for node, prestige in top_3_prestige:
        print(f"  Knoten {node}: Proximity Prestige: {prestige:.4f}")
    
    print("Top 3 Akteure nach PageRank:")
    for node, rank in top_3_pagerank:
        print(f"  Knoten {node}: PageRank: {rank:.4f}")
    
    print("-" * 50)
    
df = pd.DataFrame(community_results)

Community 1 (3894 nodes):
Top 3 Akteure nach Degree Centrality:
  Knoten 32338: Degree Centrality: 0.5559
  Knoten 12134: Degree Centrality: 0.3085
  Knoten 123018: Degree Centrality: 0.3016
Top 3 Akteure nach Proximity Prestige:
  Knoten 32338: Proximity Prestige: 0.6620
  Knoten 16108: Proximity Prestige: 0.5658
  Knoten 42177: Proximity Prestige: 0.5629
Top 3 Akteure nach PageRank:
  Knoten 32338: PageRank: 0.0221
  Knoten 12134: PageRank: 0.0133
  Knoten 123018: PageRank: 0.0118
--------------------------------------------------
Community 2 (3539 nodes):
Top 3 Akteure nach Degree Centrality:
  Knoten 7329: Degree Centrality: 0.9446
  Knoten 52703: Degree Centrality: 0.3898
  Knoten 60588: Degree Centrality: 0.3708
Top 3 Akteure nach Proximity Prestige:
  Knoten 7329: Proximity Prestige: 0.9475
  Knoten 52703: Proximity Prestige: 0.6207
  Knoten 60588: Proximity Prestige: 0.6137
Top 3 Akteure nach PageRank:
  Knoten 7329: PageRank: 0.0338
  Knoten 60588: PageRank: 0.0108
  Knoten 52

In [43]:
def calculate_community_metrics(G, community):
    subgraph = G.subgraph(community)
    density = nx.density(subgraph)
    diameter = nx.diameter(subgraph) if nx.is_connected(subgraph) else float('inf')
    return density, diameter

def top_actors_in_all_centralities(G, communities, top_n=3):
    results = []

    for i, community in enumerate(communities):
        subgraph = G.subgraph(community)
        
        degree_centrality_com = nx.degree_centrality(subgraph)
        top_degree = sorted(degree_centrality_com.items(), key=lambda x: x[1], reverse=True)[:top_n]
        top_degree_nodes = {node for node, _ in top_degree}
        
        proximity_prestige_scores_com = proximity_prestige(subgraph)
        top_prestige = sorted(proximity_prestige_scores_com.items(), key=lambda x: x[1], reverse=True)[:top_n]
        top_prestige_nodes = {node for node, _ in top_prestige}
        
        pagerank_scores_com = nx.pagerank(subgraph)
        top_pagerank = sorted(pagerank_scores_com.items(), key=lambda x: x[1], reverse=True)[:top_n]
        top_pagerank_nodes = {node for node, _ in top_pagerank}
        
        # Schnittmenge der Top-Nodes
        top_common_nodes = top_degree_nodes & top_prestige_nodes & top_pagerank_nodes
        
        density = nx.density(subgraph)
        
        results.append((i + 1, list(top_common_nodes), density))
    
    return results

top_10_communities = sorted_communities[:10]
results = top_actors_in_all_centralities(G_sampled, top_10_communities)

for community_id, actors, density in results:
    print(f"Community {community_id} ({len(top_10_communities[community_id - 1])} Knoten):")
    if actors:
        for actor in actors:
            print(f"  Knoten {actor}")
    else:
        print("  Keine Akteure, die in den Top 3 aller Zentralitätsmetriken sind.")
    print(f"Dichte = {density:.4f}")
    print("-" * 50)

Community 1 (3894 Knoten):
  Knoten 32338
Dichte = 0.0061
--------------------------------------------------
Community 2 (3539 Knoten):
  Knoten 7329
  Knoten 60588
  Knoten 52703
Dichte = 0.0093
--------------------------------------------------
Community 3 (2375 Knoten):
  Knoten 110345
  Knoten 125642
  Knoten 61862
Dichte = 0.0046
--------------------------------------------------
Community 4 (152 Knoten):
  Knoten 37637
Dichte = 0.0140
--------------------------------------------------
Community 5 (7 Knoten):
  Knoten 131881
  Knoten 54421
Dichte = 0.3333
--------------------------------------------------
Community 6 (7 Knoten):
  Knoten 25376
  Knoten 76730
Dichte = 0.3333
--------------------------------------------------
Community 7 (6 Knoten):
  Knoten 98446
  Knoten 167238
Dichte = 0.3333
--------------------------------------------------
Community 8 (4 Knoten):
  Knoten 161384
  Knoten 117418
  Knoten 108618
Dichte = 0.5000
--------------------------------------------------


## Influencer detection via greedy search

In [52]:
# import heapq

# def independent_cascade(G, seeds, prob_func, steps=10):
#     active = set(seeds)
#     new_active = set(seeds)
    
#     for _ in range(steps):
#         next_active = set()
        
#         for node in new_active:
#             for neighbor in G.neighbors(node):
#                 if neighbor not in active:
#                     influence_prob = prob_func(G.nodes[node]['views_normalized'], G.nodes[neighbor]['views_normalized'])
#                     if random.random() < influence_prob:
#                         next_active.add(neighbor)
        
#         new_active = next_active
#         active.update(new_active)
        
#         if not new_active:
#             break
    
#     return active

# def spread(G, seeds, prob_func, steps=10, num_simulations=100):
#     spread_count = 0
#     for _ in range(num_simulations):
#         spread_count += len(independent_cascade(G, seeds, prob_func, steps))
#     return spread_count / num_simulations

# def celf_algorithm(G, K, prob_func, steps=10, num_simulations=100):
#     marg_gain = []
#     for node in G.nodes:
#         marg_gain.append((spread(G, [node], prob_func, steps, num_simulations), node))
#     marg_gain.sort(reverse=True)
    
#     S = []
#     for _ in range(K):
#         while marg_gain:
#             spread_gain, node = marg_gain[0]
#             if node not in S:
#                 current_spread = spread(G, S, prob_func, steps, num_simulations)
#                 marginal_gain = spread(G, S + [node], prob_func, steps, num_simulations) - current_spread
#                 heapq.heapreplace(marg_gain, (marginal_gain, node))
#                 marg_gain.sort(reverse=True)
#                 break
#             else:
#                 heapq.heappop(marg_gain)
#         S.append(marg_gain[0][1])
#         heapq.heappop(marg_gain)
#     return S

# # Define the influence probability function
# def prob_func(influence_score_node, influence_score_neighbor):
#     return min(1.0, influence_score_node * 0.1)

# # Number of most influential nodes to find
# K = 5

# # Run the CELF algorithm to find the top K influential nodes
# top_k_nodes = celf_algorithm(G_sampled, K, prob_func, steps=10, num_simulations=100)

# print("Top K influential nodes:", top_k_nodes)
# # Top K influential nodes: [120668, 161062, 160608, 153473, 42177] - 20k nodes

Top K influential nodes: [4318, 88732, 139980, 21913, 48744]


Der Greedy Search oben braucht über 5 Stunden (20000 knoten) und ~50 minuten (10000 knoten), um ausgeführt zu werden, daher ist er auskommentiert. Die Ergebnisse sind unten aufgeführt.

In [3]:
# Top K influential nodes: [120668, 161062, 160608, 153473, 42177] 20k nodes
# top_k_nodes = [120668, 161062, 160608, 153473, 42177]
# Top K influential nodes: [4318, 88732, 139980, 21913, 48744] 10k nodes
# top_k_nodes = [4318, 88732, 139980, 21913, 48744]