# Data Cleaning

In [None]:
from pyspark.sql import SparkSession
import json
import pandas as pd

# Input locations of the raw Yelp dataset (one JSON object per line).
review_path = "yelp_dataset/review.json"
business_path = 'yelp_dataset/business.json'

ss = SparkSession \
    .builder \
    .appName('Son') \
    .master('local[*]') \
    .getOrCreate()

sc = ss.sparkContext

reviewRDD = sc.textFile(review_path)

# (user_id, business_id) for every review line, collected to the driver.
review = reviewRDD.map(json.loads) \
    .map(lambda record: (record['user_id'], record['business_id'])) \
    .collect()

businessRDD = sc.textFile(business_path)

# (business_id, state) for every business line, collected to the driver.
business = businessRDD.map(json.loads) \
    .map(lambda record: (record['business_id'], record['state'])) \
    .collect()

# Build the frames straight from the row tuples. Unlike transposing with
# zip(*rows) and indexing [0]/[1], this also works when a file yields no rows
# (the result is an empty frame with the expected columns).
review_df = pd.DataFrame(review, columns=["user_id", "business_id"])
business_df = pd.DataFrame(business, columns=["business_id", "state"])

print(review_df.head())
print(business_df.head())

# Attach each review's business state; the left join keeps every review.
all_df = pd.merge(left=review_df, right=business_df, how='left', on='business_id')

print(all_df.head())

all_df_NV = all_df[all_df['state'] == 'NV']

print(all_df_NV.head())

# drop() returns a new frame rather than mutating the boolean-filtered slice.
all_df_NV = all_df_NV.drop(columns='state')

# Output columns: user_id, business_id (reviews of Nevada businesses only).
all_df_NV.to_csv('task2_data.csv', index=False)


# Helper Functions

In [None]:
from itertools import combinations
import itertools
import operator
from functools import reduce


def create_candidates(candidates_list, length):
    '''
    Yield every pairwise union of itemsets that has exactly ``length`` items.

    Each unordered pair of entries in ``candidates_list`` is merged into one
    set. The union is yielded as a tuple only when its size equals ``length``.
    The same union can be produced by several different pairs, so it may be
    yielded more than once.

    :param candidates_list: sequence of itemsets (each an iterable of items)
    :param length: int, required size of a yielded union
    :return: generator of tuple
    '''
    for first, second in combinations(candidates_list, 2):
        merged = set(first) | set(second)
        if len(merged) != length:
            continue
        yield tuple(merged)


def frequent_items(dict, k, candidates, support):
    '''
    Return the size-``k`` itemsets built from ``candidates`` that meet ``support``.

    Candidates are the pairwise unions produced by ``create_candidates``.
    An itemset's support is the number of baskets shared by all of its items,
    i.e. the size of the intersection of their basket sets in ``dict``.

    :param dict: {business_id: set(user_id)} -- the name shadows the builtin
        but is kept so keyword callers keep working
    :param k: int, size of the itemsets to produce
    :param candidates: list[list], itemsets of size ``k - 1``
    :param support: int (or float), minimum size of the basket intersection
    :return: list[list], each frequent itemset once, in first-seen order
    '''
    res = []
    # create_candidates can yield the same itemset many times. Remember every
    # itemset already evaluated (frequent or not) so each one is intersected
    # only once, and the duplicate check is an O(1) set lookup instead of a
    # linear scan over the result list.
    seen = set()
    for comb in create_candidates(candidates, k):
        key = frozenset(comb)
        if key in seen:
            continue
        seen.add(key)
        common = reduce(lambda a, b: a & b, (dict[x] for x in comb))
        if len(common) >= support:
            res.append(list(set(comb)))
    return res


def convert(line):
    '''
    Invert one ``(key, values)`` record into a list of ``(value, key)`` pairs.

    :param line: (key, iterable of values), e.g. (user_id, [business_id, ...])
    :return: list[tuple(value, key)], in the iteration order of ``values``
    '''
    key, values = line[0], line[1]
    return [(value, key) for value in values]


def accumulate(l):
    '''
    Group ``(key, value)`` pairs into a ``{key: set(values)}`` mapping.

    The pairs are sorted before grouping, so keys appear in the returned dict
    in ascending order. Duplicate values for a key collapse into one set entry.

    :param l: list[tuple((key, value))]
    :return: dict mapping each key to the set of its values
    '''
    grouped = {}
    for pair in sorted(l):
        grouped.setdefault(pair[0], set()).add(pair[1])
    return grouped


def Apriori(partition, support):
    '''
    Run in-memory Apriori over the records of one partition.

    Each record is ``(key, values)``; the values of one record form a basket.
    An itemset's support is the number of distinct record keys whose basket
    contains every item of the itemset.

    :param partition: iterator of (key, iterable of items),
        e.g. (user_id, [business_id, ...])
    :param support: int or float, minimum support within this partition
    :return: list of (k, set of frozenset) -- for each size k, the itemsets of
        that size that are frequent in this partition. The k = 1 entry is
        always present, possibly with an empty set.
    '''
    temp_partition = list(partition)

    # {item: set(key)} restricted to this partition.
    business = []
    for line in temp_partition:
        business.extend(convert(line))
    own_business_list = accumulate(business)

    # Singleton support is counted as distinct keys, the same measure used for
    # k >= 2 (intersection of key sets). Counting raw occurrences would let an
    # item listed twice in one basket count twice.
    frequent = {
        1: [[item] for item, keys in own_business_list.items() if len(keys) >= support]
    }

    k = 2
    while True:
        print("Creating Candidates and Frequent", k)
        temp_frequent = frequent_items(own_business_list, k, frequent[k - 1], support)
        # Apriori property: if no k-itemset is frequent, no larger one can be.
        if not temp_frequent:
            break
        frequent[k] = temp_frequent
        k += 1

    return [(size, {frozenset(itemset) for itemset in itemsets})
            for size, itemsets in frequent.items()]

# def global_frequent(line, candidates):
#     '''
#     :param line: list
#     :param candidates: list[tuple]
#     :return: list[tuple]
#     '''
#     res = []
#     for candidate in candidates:
#         length_set = candidate[0]
#         items = list(candidate[1])
#         counter = {}
#         for item in items:
#             if set(item).issubset(line):
#                 if item not in counter:
#                     counter[item] = 1
#                 else:
#                     counter[item] += 1
#         res.append((length_set, counter))
#     return res


In [None]:
from pyspark.sql import SparkSession
import Apriori as A
from functools import reduce

ss = SparkSession \
    .builder \
    .master("local[*]") \
    .appName('SON') \
    .getOrCreate()

sc = ss.sparkContext

path = "task2_data.csv"
# path = "small2.csv"

userRDD = sc.textFile(path)
header = userRDD.first()

support = 50    # minimum number of distinct users sharing an itemset
threshold = 70  # minimum number of (user, business) rows for a user to be kept

createCombiner = (lambda line: [line])
mergeValue = (lambda exist, new: exist + [new])
mergeCombiner = (lambda exist1, exist2: exist1 + exist2)

# Baskets: (user_id, [business_id, ...]) for users with at least `threshold`
# rows. Each CSV line is split once. The RDD is cached because it feeds
# several actions below (foreach, collect, mapPartitions/collect). Without
# the cache, each action would re-read and re-aggregate the CSV.
userRDD = userRDD.filter(lambda line: line != header) \
    .map(lambda line: tuple(line.split(',')[:2])) \
    .combineByKey(createCombiner, mergeValue, mergeCombiner) \
    .filter(lambda line: len(line[1]) >= threshold) \
    .cache()

userRDD.foreach(print)

# {business_id: set(user_id)} over all baskets; used for the exact support count.
businessRDD = userRDD.flatMap(lambda line: A.convert(line)) \
    .groupByKey() \
    .mapValues(set)

business = businessRDD.collect()
businessdict = dict(business)

numOfPar = userRDD.getNumPartitions()

# SON phase 1: every partition mines itemsets at a proportionally reduced
# threshold; the per-size union over partitions is the candidate set.
candidates = userRDD.mapPartitions(lambda partition: A.Apriori(partition, support / numOfPar)) \
    .reduceByKey(lambda a, b: a | b) \
    .collect()

# Sort the items inside each itemset, and the itemsets within each size, so
# the printed result is deterministic (frozenset iteration order is arbitrary).
candidates = sorted((size, sorted(sorted(itemset) for itemset in itemsets))
                    for size, itemsets in candidates)

# SON phase 2: keep candidates whose true support (users who reviewed every
# business in the itemset) meets the global threshold. For a single-item
# itemset, reduce simply returns that business's user set.
res = []
for size, itemsets in candidates:
    frequent_k = [
        itemset for itemset in itemsets
        if len(reduce(lambda a, b: a & b, (businessdict[x] for x in itemset))) >= support
    ]
    # Apriori property: no frequent itemsets of this size means none larger.
    if not frequent_k:
        break
    res.append((size, frequent_k))

print(res)