In [1]:
#pip install pyspark

In [2]:
from pyspark import SparkContext
import networkx as nx
import networkx as nx
import numpy as np
from scipy.sparse import lil_matrix
from sklearn.decomposition import NMF

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## Task 1: Graph Construction (30 points)

In [4]:
def create_adjacency_graph(user_edges_rdd):
    rdd_1 = user_edges_rdd.groupByKey().mapValues(set)
    rdd_2 = user_edges_rdd.map(lambda x: (x[1], x[0])).groupByKey().mapValues(set)
    return rdd_1.union(rdd_2).reduceByKey(lambda x, y: x.union(y))

from itertools import combinations


def main():

  threshold = 4 # the filter threshold to generate edges between user nodes. If threshold is 2, two nodes are connected if they both review at least two same businesses

  sc = SparkContext.getOrCreate()
  rdd = sc.textFile("/content/drive/MyDrive/big data -3/sample data.txt")
  N = rdd.count()  # N is the number of rows (lines) in the textfile
  print("number of rows: ", N)
  rdd = rdd.map(lambda line: line.split(","))
  print(rdd.take(2))

  # find (user_id, {business1, business2, ...}) and save them in users_rdd:
  users_rdd = rdd.map(lambda x: (x[0], x[1])).groupByKey().mapValues(set)
  users_rdd.persist()
  print("users_rdd take 1:", users_rdd.take(1)) # key is user_id, value is a set of all businesses that this user reviewed

  # create users_dict dictionary (key, value) for fast search, since keys in dict are hashed
  users_dict = dict(users_rdd.collect())

  # get user_id information only:
  distinct_users = users_rdd.keys().collect()

  print("rdd: number of distinct users:", users_rdd.count())
  print("dict: number of distinct users:", len(users_dict))

  # let us create edges between users:
  original_user_edges_list = []  # each item in original_user_edges_list is en edge connecting two users
  for temp_user in combinations(distinct_users, 2): # for every two userIDs, check if the number of common businesses they reviewed is >= threshold
    if len(users_dict[temp_user[0]].intersection(users_dict[temp_user[1]])) >= threshold:
      edge_sort_key = tuple(sorted([temp_user[0], temp_user[1]]))
      original_user_edges_list.append(edge_sort_key)

  # each item in user_edges_rdd is en edge connecting two users
  user_edges_rdd = sc.parallelize(original_user_edges_list).persist()
  print("user_edges_rdd take 2", user_edges_rdd.take(2))

  # based on edges, we create an adjacency graph:
  user_adjacency_rdd = create_adjacency_graph(user_edges_rdd) # each item in user_adjacency_rdd is (userID, a set of neighbors)
  print("user_adjacency_rdd take 2", user_adjacency_rdd.take(2))

  user_adjacency_dict = user_adjacency_rdd.collectAsMap()  # collectAsMap: return the key-value pairs in this RDD to the master as a dictionary.

  node_degrees_dict = user_adjacency_rdd.map(lambda x: (x[0], len(x[1]))).collectAsMap() # collectAsMap: return the key-value pairs in this RDD to the master as a dictionary.

  print("node_degrees_dict", node_degrees_dict)  # each item in this dict is 'userID' : node-degree

  # now we have a graph, saved in user_edges_rdd

  sc.stop()




# call main() method
main()

number of rows:  38649
[['user_id', 'business_id'], ['---1lKK3aKOuomHnwAkAow', 'uhb7nSskHRm5RFbGkGlGwQ']]
users_rdd take 1: [('user_id', {'business_id'})]
rdd: number of distinct users: 3375
dict: number of distinct users: 3375
user_edges_rdd take 2 [('---1lKK3aKOuomHnwAkAow', '--2vR0DIsmQ6WfcSzKWigw'), ('---1lKK3aKOuomHnwAkAow', '--YhjyV-ce1nFLYxP49C5A')]
user_adjacency_rdd take 2 [('---1lKK3aKOuomHnwAkAow', {'AuvaQaRHBowFaFtoEMqkVg', '-Anyb0vB5LrW273whytNRw', '-KpEgEen1tj-jdjIS7uVOw', 'FAf4gaxpWe2dM4ehN3NtbQ', '1Vh2juPPQAw-EYFnulb7cQ', '0kVTI6YwlwU-lfQ-Fvk5yg', '0sOleKBI26BKfpW0SEG6Fw', '-7bM_DeL2Kj2CuYuVDsLNg', 'FmtnXwc_eyvpIv1Q1-bVCQ', 'AH3OZj4glpss6xXpOEHF7Q', '-shHTy1CEmSMPVSehdASaw', '-9b4s874f_CnznTu4JorRg', '6gaxwVD2d1C0_qIBOkDkJg', 'OdhUSNW6TIeYw9rIM1CDUA', 'iH5YQPtx1qrNG4t2-DjI3Q', '5ODmmPtp1EnKRedWFgeyKA', 'C4KfHQbd-cP9SyvgJYNY1w', '-kMn9KvDYmTMeroUf3nVfg', '1J-CWvcyB8FDmyZ_GEmvpw', '-k5yFUChotBMGQgHdFZvLQ', '1GbtKqRpDAfv13fUYIBBmA', '1HJK0w8v_zYMWlmcdeIv3w', '7cgp0EXG11v-y

## Task 2: Implement BigCLAM to detect communities (50 points)

In [5]:
def load_data(file_path):
    # Loading the Input dataset
    with open(file_path, 'r') as file:
        lines = file.readlines()

    # Create a directed graph from the data
    g = nx.DiGraph()
    for line in lines:
        user_id, business_id = line.strip().split(',')
        g.add_edge(user_id, business_id)

    return g

def create_adjacency_matrix(graph):
    # Creating an adjacency matrix from the graph
    nodes = list(graph.nodes())
    node_index = {node: i for i, node in enumerate(nodes)}

    adjacency_matrix = lil_matrix((len(nodes), len(nodes)), dtype=int)
    for edge in graph.edges():
        u, v = edge
        adjacency_matrix[node_index[u], node_index[v]] = 1

    return adjacency_matrix, node_index

def bigclam(adjacency_matrix, number_communities):
    # Apply BigCLAM algorithm using NMF
    model = NMF(n_components=number_communities, init='random', random_state=15)
    factors = model.fit_transform(adjacency_matrix)

    return factors

def detect_communities(graph, number_communities):
    adjacency_matrix, node_index = create_adjacency_matrix(graph)

    # Run BigCLAM algorithm
    factors = bigclam(adjacency_matrix, number_communities)

    # Extract communities based on the learned factors
    communities = {i: [] for i in range(number_communities)}
    for i, node in enumerate(graph.nodes()):
        community_memberships = np.where(factors[i] > 0.5)[0]
        for community_id in community_memberships:
            communities[community_id].append(node)

    return communities
# load Input dataset
file_path = "/content/drive/MyDrive/big data -3/sample data.txt"
graph = load_data(file_path)

# detect communities
number_communities = 5
communities = detect_communities(graph, number_communities)

# Print the detected communities
for community_id, members in communities.items():
    print(f"Community {community_id + 1}: {members}")


Community 1: ['---1lKK3aKOuomHnwAkAow', '--YhjyV-ce1nFLYxP49C5A', '-p2ISrtcOFS87T2pkyIaNw', '-sby2p3gq-Ou0kBv6FsoXA', '0nqshyLgABOSyTfJUTthjQ', '0xRAAStEi_dBFLD-1xSMHQ', '1QABw9xW72L-3EkGmIllaQ', '1Vh2juPPQAw-EYFnulb7cQ', '29DEnpFQ5rL2NTmpXQJfLQ', '2N692MSW2fVTGiWB-793FQ', '3EN4rQgpR8cR1NxuJNzi9Q', '3HR86J_kKOMpS4ad4Lu5yg', '3fhIgNqd1jq_4sTWwA79Xg', '3kKfcfYKpNjQAOhhB5l7Vw', '4D6LLuJfao_eHGA6XZR-bA', '4FnosmTZU4Kq3110Ykt7ZA', '4yG4J05aKzE2zov0Jr37kg', '62OHTL6ZocdL4hcyvW09sA', '7O1_x_3IlgjpY9BkiZ16xQ', '8YaFdh5fGg-M6KPIHv7ivw', '8apKnC2iVfWnfB4cIgUBMw', '8b8tlDCc2HE9G7rLEMF2lg', 'BE4fE4R3TaVn8xy4sYYjbg', 'Bbar1EB4JEviStEAqUEFOg', 'C9Xy03vm_oJ-vWzn-8TB5w', 'D8PJTlkMlwAyDjow3pup_Q', 'GYz2-BBolFD5h_D6S5gi3w', 'H5U7NJWU7pA3-xLQg70OFw', 'ITa3vh5ERI90G_WP4SmGUQ', 'IUodcXlyXaYxGuC8kojtlA', 'Ic6-gs1_FjrWGx6JIr95Mw', 'J_qauqGJ3DQ_FZwSqvTPdw', 'JeERFBZyECRwzscG3dvKQQ', 'K6i3CeTT5NCScDns4BSITA', 'KOWl_ZhqhYqY33-CbaJubw', 'Kib7CLruGTMwG57uRWCPHQ', 'LmWO_u9v7e1vMJlNykjUoA', 'LtVQD4Axwr21Aw2eR-tkJg'

## Task 3: Finding the optimal number of communities (20 points)


In [6]:
# Calculates the modularity of the  graph and communities
def calculate_modularity(graph, communities):
    modularity = 0.0
    for community_id, members in communities.items():
        # Determine the number of community edges.
        num_intra_community_edges = 0
        for u, v in graph.edges():
            if u in members and v in members:
                num_intra_community_edges += 1

        # Determine how many edges should exist inside the community depending on the expected degree distribution
        expected_num_intra_community_edges = 0.0
        for u in members:
            expected_num_intra_community_edges += graph.degree(u) * sum(graph.degree(v) for v in members) / (2 * graph.size())

        # Determine the community's contribution to modularity
        modularity_contribution = (num_intra_community_edges - expected_num_intra_community_edges) / (2 * graph.size())

        # To get the total modularity, add the contribution to modularity
        modularity += modularity_contribution

    return modularity

# load Input dataset
file_path = "/content/drive/MyDrive/big data -3/sample data.txt"
graph = load_data(file_path)

# Try different numbers of communities
modularity_values = {}
for num_communities in range(2, 10):
    communities = detect_communities(graph, num_communities)
    modularity = calculate_modularity(graph, communities)
    modularity_values[num_communities] = modularity

# Find the number of communities with the highest modularity
optimal_num_communities = max(modularity_values.keys(), key=modularity_values.get)

# Display the optimal number of communities
print("The optimal number of communities is:", optimal_num_communities)

The optimal number of communities is: 2
