In [None]:
from pyspark import SparkContext, StorageLevel
import sys
import json
import csv
import itertools
from time import time
import math

In [None]:
# The following parameters are configurable and can be changed as per the business requirements.
# Baskets are kept only when they contain more than `filter_threshold` items.
filter_threshold = 1
# Minimum number of baskets an itemset must appear in to be reported as frequent.
supportThreshold = 30
# Raw strings keep the backslash literally; "\p", "\k" and "\m" are invalid escape
# sequences in a normal string literal and trigger a SyntaxWarning on newer Pythons.
# NOTE(review): these are Windows-style relative paths - confirm before running elsewhere.
input_file_path = r".\preometheus-sample.csv"
output_file_path = r".\kiana-match-results.txt"
mapid_map_path = r".\macid-map-john.txt"
# main() reads this spelling; keep both names bound to the same path.
macid_map_path = mapid_map_path

In [None]:
def process(entry):
    """Split one raw CSV record into the five columns the pipeline uses.

    Args:
        entry: A sequence whose first element is the text of a single CSV line.

    Returns:
        A 5-tuple of strings holding columns 1, 0, 2, 3 and 5 of the line
        (in that order), with every single-quote character removed first.
    """
    columns = entry[0].replace("'", "").split(",")
    # Column 1 leads the tuple; the remaining columns follow in file order
    # (column 4 is not used).
    wanted_order = (1, 0, 2, 3, 5)
    return tuple(str(columns[position]) for position in wanted_order)

In [None]:
def convertValuesToTuple(entrySet):
    """Pair every element of ``entrySet`` with an initial count of 1.

    Args:
        entrySet: Any iterable of values.

    Returns:
        A list of ``(value, 1)`` tuples in iteration order.
    """
    return [(value, 1) for value in entrySet]

In [None]:
def generate_k_candidates(k, current_candidate_set):
    """Build the size-``k`` candidate itemsets from the frequent (k-1)-itemsets.

    Every (k-1)-itemset is extended with each single item seen in any of the
    itemsets. An extension is kept only if it has exactly ``k`` items and all
    of its (k-1)-item subsets are themselves in ``current_candidate_set``
    (the Apriori pruning rule).

    Args:
        k: Size of the itemsets to generate.
        current_candidate_set: Collection of frozensets, each of size k-1.

    Returns:
        A frozenset of frozensets, each containing ``k`` items.
    """
    all_items = frozenset(itertools.chain.from_iterable(current_candidate_set))
    new_k_candidate_set = set()

    for old_candidate in current_candidate_set:
        for single_item in all_items.difference(old_candidate):
            # No sorting here: the result is a frozenset, so ordering the
            # items first has no effect and would only force them to be
            # mutually comparable.
            new_candidate = frozenset(old_candidate).union([single_item])

            # The same k-itemset is reachable from each of its (k-1)-subsets;
            # skip the subset check once it has already been accepted.
            if len(new_candidate) != k or new_candidate in new_k_candidate_set:
                continue

            if all(
                frozenset(subset) in current_candidate_set
                for subset in itertools.combinations(new_candidate, k - 1)
            ):
                new_k_candidate_set.add(new_candidate)

    return frozenset(new_k_candidate_set)


In [None]:
def check_and_generate_frequent_k_candidates(original_baskets, current_candidate_set, partition_support_threshold):
    """Keep the candidates that occur in enough baskets.

    Args:
        original_baskets: Mapping of basket key -> iterable of items.
        current_candidate_set: Iterable of frozenset candidates to count.
        partition_support_threshold: Minimum number of baskets that must
            contain a candidate for it to be kept.

    Returns:
        A frozenset with every candidate that is a subset of at least
        ``partition_support_threshold`` baskets. Candidates that match no
        basket at all are never returned, whatever the threshold.
    """
    support_counts = {}

    for basket in original_baskets.values():
        for candidate in current_candidate_set:
            # issubset accepts any iterable, so baskets need not be hashable.
            if candidate.issubset(basket):
                support_counts[candidate] = support_counts.get(candidate, 0) + 1

    return frozenset(
        candidate
        for candidate, count in support_counts.items()
        if count >= partition_support_threshold
    )

In [None]:
def apriori_implementation(baskets, support_threshold, total_baskets_count):
    """Run Apriori on one chunk of baskets and return its locally frequent itemsets.

    The support threshold is scaled down by the share of all baskets that this
    chunk holds. Itemsets of growing size are generated until none is frequent.
    The itemsets found are also written to ``output_file_path``.

    Args:
        baskets: Iterable of ``(key, items)`` pairs.
        support_threshold: Support threshold for the complete data set.
        total_baskets_count: Number of baskets in the complete data set.

    Returns:
        A list with one frozenset per itemset size (size 1 first). Each
        frozenset holds the frequent itemsets of that size as frozensets.
        The size-1 entry is always present, even when it is empty.
    """
    original_baskets = {key: frozenset(values) for key, values in baskets}

    if total_baskets_count:
        partition_support_threshold = math.ceil(
            (float(len(original_baskets)) / total_baskets_count) * support_threshold
        )
    else:
        # No global basket count to scale by: fall back to the unscaled
        # threshold instead of dividing by zero.
        partition_support_threshold = support_threshold

    # calculate singleton matches
    item_counts = {}
    for items in original_baskets.values():
        for item in items:
            item_counts[item] = item_counts.get(item, 0) + 1

    single_frequent_items = frozenset(
        frozenset([item])
        for item, count in item_counts.items()
        if count >= partition_support_threshold
    )
    all_frequent_items_set = {1: single_frequent_items}

    # calculate matches of length > 1 to identify devices belonging to the same
    # person or to a group of people walking together
    current_candidate_set = single_frequent_items
    k = 2
    while len(current_candidate_set) != 0:
        candidates = generate_k_candidates(k, current_candidate_set)
        current_candidate_set = check_and_generate_frequent_k_candidates(
            original_baskets, candidates, partition_support_threshold
        )
        if len(current_candidate_set) != 0:
            all_frequent_items_set[k] = current_candidate_set
        k += 1

    # Writing all the potential candidate itemsets; can be used to fine tune the results.
    # NOTE(review): "w+" truncates the file on every call, so when this runs once
    # per partition only the output of the last call to finish survives.
    with open(output_file_path, "w+") as op:
        op.write("Candidates: " + '\n\n')

        for size, itemsets in all_frequent_items_set.items():
            ordered = sorted([tuple(sorted(itemset)) for itemset in itemsets])
            if size == 1:
                # Singletons are written as ('x') rather than the tuple repr ('x',).
                rendered = ['(\'' + items[0] + '\')' for items in ordered]
            else:
                rendered = [str(items) for items in ordered]

            op.write(','.join(rendered))
            op.write("\n\n")

    return list(all_frequent_items_set.values())

In [None]:
def count_frequent_candidates(basket, candidate_list):
    """Emit a ``(candidate, 1)`` pair for each candidate contained in ``basket``.

    Args:
        basket: Iterable of the items in a single basket.
        candidate_list: Iterable of set-like candidates (they must provide
            ``issubset``).

    Returns:
        A list of ``(candidate, 1)`` tuples, in the iteration order of
        ``candidate_list``, for the candidates that are subsets of ``basket``.
    """
    return [
        (itemset, 1)
        for itemset in candidate_list
        if itemset.issubset(basket)
    ]


In [None]:
def son_implementation(basketsRdd, support_threshold, total_baskets_count):
    """Find the globally frequent itemsets with the two-pass SON algorithm.

    Pass 1 runs ``apriori_implementation`` on every partition and unions the
    locally frequent itemsets into one candidate set. Pass 2 counts each
    candidate over all baskets and keeps those whose count reaches
    ``support_threshold``. The result is also appended to ``output_file_path``.

    Args:
        basketsRdd: RDD of ``(key, items)`` pairs.
        support_threshold: Minimum number of baskets an itemset must occur in.
        total_baskets_count: Number of baskets in ``basketsRdd``.

    Returns:
        A list of ``(size, frozenset_of_itemsets)`` tuples sorted by size.
    """
    # Pass 1: every partition reports one frozenset of itemsets per size;
    # reduceByKey followed by taking the key removes duplicate reports.
    task1_candidates = basketsRdd\
        .mapPartitions(lambda entrysets: apriori_implementation(entrysets, support_threshold, total_baskets_count))\
        .map(lambda entry: (entry, 1))\
        .reduceByKey(lambda x, y: x + y)\
        .map(lambda entry: entry[0])\
        .collect()

    candidate_itemsets = frozenset(itertools.chain.from_iterable(task1_candidates))

    # Use the context that owns the RDD instead of relying on a module-level `sc`,
    # and read `.value` inside the task so the broadcast is what reaches the workers.
    candidates_broadcast = basketsRdd.context.broadcast(candidate_itemsets)

    # Pass 2: exact counts for every candidate, filtered with the threshold that
    # was passed in, then grouped by itemset size.
    frequent_itemsets = basketsRdd\
        .flatMap(lambda entry: count_frequent_candidates(entry[1], candidates_broadcast.value))\
        .reduceByKey(lambda x, y: x + y)\
        .filter(lambda entry: entry[1] >= support_threshold)\
        .map(lambda entry: (len(entry[0]), frozenset([entry[0]])))\
        .reduceByKey(lambda set1, set2: set1.union(set2))\
        .sortByKey()\
        .collect()

    with open(output_file_path, "a+") as op:
        op.write("Frequent Itemsets: " + '\n\n')
        for size, itemsets in frequent_itemsets:
            ordered = sorted([tuple(sorted(itemset)) for itemset in itemsets])
            if size == 1:
                # Singletons are written as ('x') rather than the tuple repr ('x',).
                rendered = ['(\'' + items[0] + '\')' for items in ordered]
            else:
                rendered = [str(items) for items in ordered]

            op.write(','.join(rendered))
            op.write("\n\n")

    return frequent_itemsets

In [None]:
def main():
    """Run the matching pipeline end to end.

    Steps:
      1. Start a local SparkContext.
      2. Load the JSON map from ``mapid_map_path`` and invert it.
      3. Read ``input_file_path``, drop the header row and group the records
         into baskets keyed by the concatenation of four columns.
      4. Dump the baskets to ``./baskests_user_business.json``.
      5. Run ``son_implementation`` and append the frequent itemsets, translated
         through the inverted map, to ``output_file_path``.

    The SparkContext is stopped when the function exits, whether it succeeds
    or raises, so the cell can be run again.
    """
    SparkContext.setSystemProperty('spark.executor.memory', '8g')
    SparkContext.setSystemProperty('spark.driver.memory', '4g')
    sc = SparkContext('local[*]', 'kiana-son-local-task')

    try:
        # `mapid_map_path` is the name bound in the configuration cell.
        with open(mapid_map_path, "r") as fp:
            global_macid_dict = json.load(fp)

        # Inverted map: value -> key of the JSON file.
        # NOTE(review): the lookups below use int keys, so the JSON values are
        # presumably integer ids - confirm against the map file.
        global_macid_revers_dict = {val: key for key, val in global_macid_dict.items()}

        start = time()
        user_businessRdd = sc.textFile(input_file_path, 5)\
            .map(lambda entry: entry.split('\n'))\
            .map(lambda entry: process(entry))

        # The first record is treated as the header; rows whose first field
        # equals the header's first field are dropped.
        headers = user_businessRdd.take(1)
        finalRdd = user_businessRdd.filter(lambda entry: entry[0] != headers[0][0])

        # One basket per distinct combination of the four remaining fields;
        # baskets with `filter_threshold` items or fewer are discarded.
        finalRdd = finalRdd.map(lambda entry: ((entry[1] + entry[2] + entry[3] + entry[4]), entry[0]))\
            .groupByKey()\
            .mapValues(lambda entry: frozenset(entry))\
            .distinct()\
            .filter(lambda entry: len(entry[1]) > filter_threshold)\
            .persist(StorageLevel.MEMORY_AND_DISK)

        baskets = finalRdd.collect()
        # The collected list already holds every basket, so no second Spark job is needed.
        total_baskets_count = len(baskets)

        with open("./baskests_user_business.json", "w+") as f:
            for item in baskets:
                f.write(str(item))
                f.write("\n")

        results = son_implementation(finalRdd, supportThreshold, total_baskets_count)

        # writing the final results to the output file configured
        with open(output_file_path, "a+") as op:
            for itemset in results:
                values = sorted([tuple(sorted(i)) for i in itemset[1]])

                for tuple_to_write in values:
                    # One line per itemset: the mapped names joined by commas
                    # (a singleton is just its one name).
                    op.write(",".join(global_macid_revers_dict[int(x)] for x in tuple_to_write))
                    op.write("\n")

        end = time()
        print("Duration to implement matching: " + str(end - start))
    finally:
        sc.stop()

In [None]:
# Entry point of the notebook: executing this cell runs main().
main()