In [2]:
from pyspark import SparkContext, StorageLevel
from pyspark.sql import SparkSession
import sys
import json
import csv
import itertools
from time import time
import math
import random
import os

In [3]:
def process(entry):
    revisedEntries= entry[0].split(',')
    return (revisedEntries[0], revisedEntries[1])

In [4]:
def convertValuesToTuple(entrySet):
    newEntrySet = []
    for entry in entrySet:
        newEntrySet += [(entry, 1)]
    return newEntrySet

In [5]:
def generate_community_users(user_business_map, filter_threshold):
    nearby_users_map = {}
    users = user_business_map.keys()

    for u1 in users:
        related_users = set()
        for u2 in users:
            if u1 != u2:
                u1_businesses = set(user_business_map.get(u1))
                u2_businesses = set(user_business_map.get(u2))
                common_businesses = u1_businesses.intersection(u2_businesses)

                if len(common_businesses) >= filter_threshold:
                    related_users.add(u2)
        if len(related_users) > 0:
            nearby_users_map.update({u1:related_users})

    return nearby_users_map

In [6]:
def generate_betweenness_map(root, nearby_users_map):

    # final result
    edges_betweenness_map = {}

    # map for all nodes at each level
    level_nodes_map = {}

    child_parents_map = {}
    current_level = 0

    current_child_nodes = set([root])
    level_nodes_map.update({root:current_level})

    while len(current_child_nodes) > 0:

        current_level += 1

        # children for next iteration
        children_for_next_iter = set()

        for child in current_child_nodes:
            current_grandchildren = nearby_users_map.get(child)

            for grandchild in current_grandchildren:
                if not level_nodes_map.__contains__(grandchild):
                    level_nodes_map.update({grandchild:current_level})
                    child_parents_map.update({grandchild:set([child])})
                    children_for_next_iter.add(grandchild)
                elif level_nodes_map.get(grandchild) == level_nodes_map.get(child)+1:
                    child_parents_map.get(grandchild).add(child)

        current_child_nodes = children_for_next_iter.difference(set([root]))

    all_nodes = set(level_nodes_map.keys())
    node_partial_credits_map = {}


    for node in all_nodes:
        node_partial_credits_map.update({node:1.0})

    while current_level > 0:
        current_level_nodes = [k for k,v in level_nodes_map.items() if v == current_level]

        for node in current_level_nodes:
            partial_credit = float(node_partial_credits_map.get(node))/len(child_parents_map.get(node))

            parents_of_current_node = child_parents_map.get(node)

            for parent in parents_of_current_node:
                updated_credit = node_partial_credits_map.get(parent) + partial_credit
                node_partial_credits_map.update({parent:updated_credit})

                edges_betweenness_map.update({(node, parent):partial_credit})

        current_level -= 1

    return edges_betweenness_map

In [7]:
def generate_betweenness_result(nearby_users_map):
    all_nodes = nearby_users_map.keys()
    users_betweenness_map = {}

    final_result = set()

    for root in all_nodes:
        edges_betweenness_map = generate_betweenness_map(root, nearby_users_map)
        for edge, betweennness in edges_betweenness_map.items():
            if edge in users_betweenness_map.keys():
                updated_betweenness = users_betweenness_map.get(edge)+ betweennness
                users_betweenness_map.update({edge:updated_betweenness})
            else:
                users_betweenness_map.update({edge:betweennness})

    for edge, betweennness in users_betweenness_map.items():
        final_result.add((tuple(sorted(list(edge))),float(betweennness)/2))

    final_users_betweenness_map = sc.parallelize(final_result)\
            .sortBy(lambda entry: (-entry[1], entry[0][0]))\
            .collectAsMap()

    return final_users_betweenness_map

In [8]:
def generate_adjacency_matrix(nearby_users_map):

    users = nearby_users_map.keys()
    adjacency_matrix = {}
    for u1 in users:
        for u2 in users:
            if set(nearby_users_map.get(u1)).__contains__(u2):
                adjacency_matrix.update({tuple(sorted([u1,u2])):1.0})
            else:
                adjacency_matrix.update({tuple(sorted([u1, u2])): 0.0})

    return adjacency_matrix

In [9]:
def generate_degree_matrix(nearby_users_map):

    users =  nearby_users_map.keys()
    degree_matrix = {}

    for u1 in users:
        degree_matrix.update({u1:len(nearby_users_map.get(u1))})

    return degree_matrix

In [10]:
def generate_user_clusters(graph, vertices):
    adjacent_users_map_copy = graph.copy()
    clusters = set()

    unique_users = set(graph.keys())

    for user in unique_users:
        current_members = set()
        current_cluster = set()

        if user in adjacent_users_map_copy.keys():
            current_members = adjacent_users_map_copy.get(user)
            current_cluster = set([user])

        while len(current_members) > 0:
            members_for_next_iteration = set()
            for current_member in current_members:
                current_cluster.add(current_member)
                if current_member in adjacent_users_map_copy.keys():
                    members_for_next_iteration = members_for_next_iteration.union(set(adjacent_users_map_copy.get(current_member)))

                    adjacent_users_map_copy.pop(current_member)

            current_members = members_for_next_iteration.difference(set([user]))
        if len(current_cluster) > 0:
            clusters.add(tuple(sorted(list(current_cluster))))

    return clusters

In [11]:
def run_girvan_newman(graph, adjacency_matrix, degree_matrix, m, users_betweenness_map, nearby_users_map, vertices):

    number_of_clusters = len(graph)
    clusters = graph

    # calculate Modularity Q based on formula:
    current_q = 0
    
    max_q = float('-inf')
    max_clusters = {}

    while users_betweenness_map:

        total_minus_expected = 0

        for cluster in clusters:
            cl = list(cluster)
            for u1 in cl:
                for u2 in cl:
                    if u1 < u2:
                        aij = adjacency_matrix.get((u1, u2))
                        ki = degree_matrix.get(u1)
                        kj = degree_matrix.get(u2)
                        mod_sum = aij - (float(ki * kj) / (2 * m))
                        total_minus_expected += mod_sum

        current_q = float(total_minus_expected) / (2 * m)
        if current_q > max_q:
            max_q = current_q
            max_clusters = clusters

        edges_to_drop = []
        max_betweenness = max(users_betweenness_map.values())
        for edge_to_drop in users_betweenness_map.keys():
            if users_betweenness_map.get(edge_to_drop) == max_betweenness:
                edges_to_drop.append(edge_to_drop)

                if degree_matrix.get(edge_to_drop[0]) > 0:
                    degree_matrix.update({edge_to_drop[0]: (degree_matrix.get(edge_to_drop[0]) - 1)})
                if degree_matrix.get(edge_to_drop[1]) > 0:
                    degree_matrix.update({edge_to_drop[1]: (degree_matrix.get(edge_to_drop[1]) - 1)})

                updated_u1 = set(nearby_users_map.get(edge_to_drop[0])).difference(set([edge_to_drop[1]]))
                nearby_users_map.update({edge_to_drop[0]: updated_u1})
                updated_u2 = set(nearby_users_map.get(edge_to_drop[1])).difference(set([edge_to_drop[0]]))
                nearby_users_map.update({edge_to_drop[1]: updated_u2})

        for edge in edges_to_drop:
            del users_betweenness_map[edge]

        clusters = generate_user_clusters(nearby_users_map, vertices)
    return max_clusters

In [12]:
if len(sys.argv) != 5:
    print("Usage: spark-submit	firstname_lastname_task2.py	 <filter_threshold> <input_file_path> <betweenness_output_file_path> <community_output_file_path>")
    exit(-1)
else:
    filter_threshold = int(sys.argv[1])
    input_file_path = sys.argv[2]
    betweenness_output_file_path = sys.argv[3]
    community_output_file_path = sys.argv[4]

Usage: spark-submit	firstname_lastname_task2.py	 <filter_threshold> <input_file_path> <betweenness_output_file_path> <community_output_file_path>


In [13]:
result = []
SparkContext.setSystemProperty('spark.executor.memory', '8g')
SparkContext.setSystemProperty('spark.driver.memory', '8g')
SparkContext.setSystemProperty('spark.sql.shuffle.partitions', '4')
sc = SparkContext('local[*]', 'task2')
ss = SparkSession(sc)

In [14]:
input_file_path = "/data/wossidia-edges.csv"
filter_threshold = 1

In [15]:
start = time()
user_businessRdd = sc.textFile(input_file_path).map(lambda entry: entry.split('\n')).map(lambda entry: process(entry))
headers = user_businessRdd.take(1)
finalRdd = user_businessRdd.filter(lambda entry: entry[0] != headers[0][0]).persist()

In [16]:
user_business_map = finalRdd\
    .groupByKey()\
    .mapValues(lambda entry: list(set(entry)))\
    .collectAsMap()

In [17]:
user_business_map

{'1': ['1'],
 '4': ['4'],
 '8': ['1'],
 '12': ['1'],
 '14': ['1'],
 '16': ['1'],
 '17': ['1'],
 '19': ['1'],
 '20': ['1'],
 '21': ['1'],
 '22': ['23'],
 '24': ['23'],
 '26': ['27'],
 '29': ['27'],
 '33': ['23'],
 '34': ['23'],
 '40': ['37'],
 '44': ['41'],
 '45': ['41'],
 '50': ['48'],
 '53': ['37'],
 '54': ['37'],
 '56': ['37'],
 '57': ['37'],
 '60': ['55'],
 '63': ['58'],
 '64': ['58'],
 '68': ['61'],
 '69': ['55'],
 '70': ['55'],
 '73': ['55'],
 '74': ['67'],
 '77': ['67'],
 '82': ['67'],
 '83': ['67'],
 '84': ['67'],
 '86': ['74'],
 '88': ['74'],
 '91': ['78'],
 '93': ['78'],
 '98': ['74'],
 '100': ['74'],
 '102': ['74'],
 '106': ['86'],
 '107': ['86'],
 '111': ['86'],
 '112': ['86'],
 '113': ['86'],
 '115': ['86'],
 '116': ['86'],
 '119': ['97'],
 '121': ['100'],
 '122': ['100'],
 '128': ['97'],
 '130': ['97'],
 '132': ['97'],
 '134': ['110'],
 '141': ['110'],
 '143': ['110'],
 '144': ['110'],
 '145': ['110'],
 '146': ['110'],
 '147': ['110'],
 '150': ['123'],
 '153': ['127'],
 '1

In [18]:
nearby_users_map = generate_community_users(user_business_map, filter_threshold)
print(nearby_users_map)
users_betweenness_map = generate_betweenness_result(nearby_users_map)

{'1': {'18', '17', '14', '3', '2', '20', '13', '21', '12', '8', '16', '15', '19'}, '4': {'136', '7', '5', '6'}, '8': {'18', '17', '14', '3', '2', '20', '13', '21', '12', '1', '16', '15', '19'}, '12': {'18', '14', '17', '3', '2', '20', '13', '21', '1', '8', '16', '15', '19'}, '14': {'18', '17', '3', '2', '20', '13', '21', '12', '1', '8', '16', '15', '19'}, '16': {'18', '14', '17', '3', '2', '13', '21', '12', '1', '8', '20', '15', '19'}, '17': {'18', '14', '3', '2', '20', '13', '21', '12', '1', '8', '16', '15', '19'}, '19': {'18', '14', '17', '3', '2', '20', '13', '12', '1', '8', '16', '15', '21'}, '20': {'18', '14', '17', '3', '2', '13', '21', '12', '1', '8', '16', '15', '19'}, '21': {'18', '14', '17', '3', '2', '20', '13', '12', '1', '8', '16', '15', '19'}, '22': {'24', '34', '30', '25', '36', '23', '35', '37', '33'}, '24': {'34', '30', '25', '36', '23', '22', '35', '37', '33'}, '26': {'28', '29', '27'}, '29': {'28', '26', '27'}, '33': {'24', '34', '30', '25', '36', '23', '22', '35', '

In [19]:
adjacency_matrix = generate_adjacency_matrix(nearby_users_map)

degree_matrix = generate_degree_matrix(nearby_users_map)

m = len(users_betweenness_map.keys())

clusters = generate_user_clusters(nearby_users_map, nearby_users_map.keys())

optimized_clusters = run_girvan_newman(clusters, adjacency_matrix, degree_matrix, m, users_betweenness_map, nearby_users_map, nearby_users_map.keys())


In [20]:
user_communities_rdd = sc.parallelize(optimized_clusters)\
    .map(lambda entry:(sorted(list(entry), key= lambda x: x[0]), len(entry)))\
    .sortBy(lambda entry: (entry[1], entry[0]))\
    .map(lambda entry: entry[0])

In [21]:
user_communities = user_communities_rdd.collect()

In [22]:
print(user_communities)

[['1063', '1064'], ['1156', '1157'], ['176', '177'], ['237', '238'], ['369', '370'], ['424', '425'], ['730', '731'], ['843', '844'], ['178', '179', '180'], ['50', '51', '52'], ['902', '903', '904'], ['1030', '1031', '1032', '1033'], ['1111', '1112', '1113', '1114'], ['1144', '1145', '1146', '1147'], ['1149', '1150', '1151', '1152'], ['120', '121', '122', '123'], ['153', '154', '155', '156'], ['169', '170', '171', '172'], ['189', '190', '191', '192'], ['217', '218', '219', '220'], ['258', '259', '260', '261'], ['26', '27', '28', '29'], ['278', '279', '280', '281'], ['283', '284', '285', '286'], ['305', '306', '307', '308'], ['362', '363', '364', '365'], ['387', '388', '389', '390'], ['405', '406', '407', '408'], ['42', '43', '44', '45'], ['450', '451', '452', '453'], ['466', '467', '468', '469'], ['471', '472', '473', '474'], ['498', '499', '500', '501'], ['503', '504', '505', '506'], ['520', '521', '522', '523'], ['538', '539', '540', '541'], ['554', '555', '556', '557'], ['570', '571'

In [23]:
def countList(lst):
    return len(lst)

In [24]:
print(countList(user_communities))

128


In [1]:
sc.stop()