# Task 2

## Chapter 1: Initialisation and imports

In [5]:
from pyspark import SparkConf, SparkContext
import itertools
import time
from colorama import Fore

import warnings
warnings.filterwarnings('ignore')

## Chapter 2: Utility functions

### Mining-based utility functions

In [6]:
def create_count_dict(items, support_value):
    """Count occurrences of each item, capping counts at the support threshold.

    The first occurrence of an item records 1; later occurrences only add one
    while the stored count is still below ``support_value``. Callers only need
    to know whether the threshold was reached, not the exact frequency.

    Args:
        items: iterable of hashable items.
        support_value: threshold (int or float) at which counting stops.

    Returns:
        dict mapping each distinct item to its (capped) count.
    """
    counts = {}
    for element in items:
        current = counts.get(element)
        if current is None:
            counts[element] = 1
        elif current < support_value:
            counts[element] = current + 1
    return counts


def create_count_dict_tuple(chunk, support_value, candidate_tuples):
    """Count, per candidate itemset, the baskets in ``chunk`` that contain it.

    The first matching basket records 1; further matches only add one while the
    stored count is still below ``support_value`` (callers only compare the
    count against that threshold). Candidates found in no basket are absent.

    Args:
        chunk: iterable of baskets (containers of items, e.g. sets).
        support_value: threshold (int or float) at which counting stops.
        candidate_tuples: iterable of candidate itemsets (tuples of items).

    Returns:
        dict mapping candidate tuple -> capped basket count.
    """
    # Build each candidate's item set once instead of once per basket. The list
    # also lets ``candidate_tuples`` be any iterable (a one-shot iterator would
    # otherwise be exhausted after the first basket).
    candidate_sets = [(candidate, frozenset(candidate)) for candidate in candidate_tuples]
    tuple_counts = {}
    for basket in chunk:
        for candidate, items in candidate_sets:
            if not items.issubset(basket):
                continue
            current = tuple_counts.get(candidate)
            if current is None:
                tuple_counts[candidate] = 1
            elif current < support_value:
                tuple_counts[candidate] = current + 1
    return tuple_counts


def filter_by_support(count_dict, support_value, is_tuple):
    """Keep the candidates whose count meets the support threshold.

    Side effect: every kept candidate is also appended to the module-level
    ``phase1_candidates`` list as ``(itemset_tuple, 1)``. A non-tuple candidate
    (``is_tuple=False``) is wrapped as a one-item tuple first.

    Args:
        count_dict: mapping candidate -> count.
        support_value: minimum count required.
        is_tuple: whether the candidates are already tuples.

    Returns:
        List of the kept candidates, in ``count_dict`` iteration order.
    """
    kept = []
    for cand, cnt in count_dict.items():
        if cnt >= support_value:
            kept.append(cand)
            key = cand if is_tuple else (cand,)
            phase1_candidates.append((key, 1))
    return kept


def get_frequent_candidate_tuples(frequent_items, size):
    """Generate Apriori candidates of length ``size`` from frequent itemsets.

    Two frequent itemsets are merged as the sorted union of their items. A
    merged tuple is kept only if it has exactly ``size`` items and every one of
    its ``size - 1``-item sub-tuples is itself frequent (Apriori pruning).

    Args:
        frequent_items: frequent itemsets of length ``size - 1``. They are
            expected to be sorted tuples, because pruning compares against the
            sorted sub-tuples produced by ``itertools.combinations``.
        size: length of the candidates to generate.

    Returns:
        List of unique sorted tuples, in the order they are first generated.
    """
    items = list(frequent_items)
    # Set lookups make the membership tests O(1) instead of scanning lists.
    frequent_lookup = set(items)
    seen = set()
    candidates = []
    # The union does not depend on the order of the two itemsets, so each
    # unordered pair (including an itemset with itself) is merged only once.
    # Visiting pairs with j >= i yields candidates in the same first-seen order
    # as a full i/j double loop.
    for item_x, item_y in itertools.combinations_with_replacement(items, 2):
        combined = tuple(sorted(set(item_x + item_y)))
        if len(combined) != size or combined in seen:
            continue
        # Pruning depends only on ``combined``, so a rejected tuple is never
        # worth re-checking.
        seen.add(combined)
        if all(subset in frequent_lookup
               for subset in itertools.combinations(combined, size - 1)):
            candidates.append(combined)
    return candidates


def find_frequent_candidates(chunk, chunk_support, frequent_items, size):
    """Return the itemsets of length ``size`` that are frequent within ``chunk``.

    For ``size == 2`` the candidates are all pairs of the (sorted) frequent
    single items. Larger sizes use ``get_frequent_candidate_tuples``. The
    surviving candidates are also recorded by ``filter_by_support``.
    """
    if size != 2:
        candidates = get_frequent_candidate_tuples(frequent_items, size)
    else:
        candidates = list(itertools.combinations(sorted(frequent_items), 2))
    counts = create_count_dict_tuple(chunk, chunk_support, candidates)
    return filter_by_support(counts, chunk_support, True)


def apriori(baskets):
    """SON phase 1 for one partition: mine locally frequent itemsets with Apriori.

    Intended for ``RDD.mapPartitions``. The per-partition threshold is the
    global ``support`` scaled by this partition's share of the global
    ``basket_count``. Every locally frequent itemset is recorded as
    ``(itemset_tuple, 1)`` in the module-level ``phase1_candidates`` list
    (via ``filter_by_support``).

    Args:
        baskets: iterator over this partition's baskets (containers of items).

    Returns:
        A one-element list wrapping ``phase1_candidates``, so the caller can
        ``flatMap`` it into ``(itemset, 1)`` pairs.
    """
    chunk = list(baskets)
    if not chunk:
        # Empty partition: nothing to mine. This also avoids a ZeroDivisionError
        # when the whole dataset has no baskets (basket_count == 0).
        return [phase1_candidates]
    chunk_support = support * (len(chunk) / basket_count)

    # Frequent itemsets of size 1; items are streamed instead of copied to a list.
    frequent_items = filter_by_support(
        create_count_dict(itertools.chain.from_iterable(chunk), chunk_support),
        chunk_support,
        False,
    )
    size = 2
    while frequent_items:
        frequent_items = find_frequent_candidates(chunk, chunk_support, frequent_items, size)
        size += 1

    return [phase1_candidates]

def son(candidate):
    """SON phase 2: count the baskets in the global ``basket_list`` that contain ``candidate``.

    ``candidate`` is normally a tuple of items. A bare (non-tuple) item is
    treated as a one-item itemset. Previously that branch replaced the counts
    with the last basket's item counts and ignored the candidate entirely.
    The count is capped at the global ``support``: counting stops once the
    threshold is reached, because callers only test ``count >= support``.

    Returns:
        ``[(candidate, count)]`` if the candidate occurs in at least one
        basket, otherwise ``[]``.
    """
    items = candidate if isinstance(candidate, tuple) else (candidate,)
    required = set(items)  # built once, not per basket
    count = 0
    for basket in basket_list:
        if required.issubset(basket):
            count += 1
            if count >= support:
                # Capped: further baskets cannot change the threshold test.
                break
    return [(candidate, count)] if count else []

### I/O-based utility functions

In [7]:
def get_required_bucket(row):
    """Split a comma-separated row and return its first two fields.

    The first field is the basket key and the second is the item; any further
    fields are ignored.
    """
    fields = row.split(",")
    basket_key = fields[0]
    item = fields[1]
    return basket_key, item


def write_to_file(candidates, frequent_itemsets, output_file):
    """Write the phase-1 candidates and phase-2 frequent itemsets to ``output_file``.

    The parent directory is created if missing, so a long Spark run is not lost
    at the very end to a missing log folder. The file is written as UTF-8 so
    non-ASCII item names do not depend on the platform's default encoding.

    Args:
        candidates: rendered phase-1 candidate itemsets.
        frequent_itemsets: rendered phase-2 frequent itemsets.
        output_file: destination path (overwritten if it exists).
    """
    import os

    parent_dir = os.path.dirname(output_file)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    with open(output_file, 'w', encoding='utf-8') as handle:
        handle.write("Candidates:\n" + candidates + "\n\nFrequent Itemsets:\n" + frequent_itemsets)


def get_output_string(input_tuple, size, output):
    if len(input_tuple) > size:
        output = output[:-1] + "\n\n"

    output = output + "('" + str(input_tuple[0]) + "')," if len(input_tuple) == 1 else output + str(input_tuple) + ","
    return output


def get_output(phase_rdd, is_phase_2):
    """Render itemsets as text, grouped by length with the shortest first.

    Args:
        phase_rdd: for phase 1, an RDD of ``(itemset, count)`` pairs whose keys
            are rendered; for phase 2 (``is_phase_2=True``), a list of itemsets.
        is_phase_2: selects which of the two input forms ``phase_rdd`` is.

    Returns:
        The accumulated ``get_output_string`` text. Every entry ends with a
        comma, so non-empty output has a trailing comma.
    """
    if is_phase_2:
        itemsets = phase_rdd
    else:
        itemsets = phase_rdd.keys().collect()
    # Lexicographic sort, then a stable sort by length, so itemsets of the same
    # length stay in lexicographic order.
    ordered = sorted(itemsets)
    ordered.sort(key=len)
    text = ""
    current_len = 1
    for itemset in ordered:
        text = get_output_string(itemset, current_len, text)
        current_len = len(itemset)
    return text

## Chapter 3: Datasets chosen

In [8]:
# Per-dataset settings:
#   path         - input CSV of (basket key, item) rows
#   output_dir   - directory the result files are written to
#   column_names - header line, filtered out of the input
#   minsup       - absolute minimum-support (basket count) values to run with
dataset_metadata = {
    'amazon-reviews': {
        'path': './data/amazon-reviews/transactions3.csv',
        'output_dir': './logs/amazon-reviews/',
        'column_names': 'reviewerId,itemId',
        # NOTE(review): far larger than the other datasets' values; confirm it
        # is the intended absolute basket count.
        'minsup': [23305530],
    },
    'groceries': {
        'path': './data/groceries/transactions.csv',
        'output_dir': './logs/groceries/',
        'column_names': 'userId,itemName',
        'minsup': [250, 200, 150, 100, 50],
    },
    'movielens': {
        'path': './data/movielens/transactions.csv',
        'output_dir': './logs/movielens/',
        'column_names': 'userId,movieId',
        'minsup': [250, 200, 150, 100, 50],
    },
}

In [9]:
dataset_name = "amazon-reviews"  # must be a key of dataset_metadata

In [10]:
# Unpack the selected dataset's settings into the globals used by the run cells.
input_filepath, output_dir, minsup_values_list, column_names = (
    dataset_metadata[dataset_name][key]
    for key in ('path', 'output_dir', 'minsup', 'column_names')
)

print(minsup_values_list)

[23305530]


In [11]:
start = time.time()

# filter_by_support() appends every locally frequent itemset to this global;
# reset it before each run.
phase1_candidates = []
support = minsup_values_list[0]

print(Fore.WHITE + f'Running SON Apriori Algorithm on', Fore.GREEN + f'{dataset_name}',
        Fore.WHITE + f'with minsup value: {support}\n')

# Memory must be set on the SparkConf, before the JVM is launched.
# SparkContext.setSystemProperty() starts the JVM itself, so a driver-memory
# value passed through it never changes the heap size. With a local[*] master
# the executors run inside the driver JVM, so spark.driver.memory is the
# setting that counts. If a SparkContext was already created in this kernel,
# the JVM is running and the kernel must be restarted for these to apply.
spark_configuration = (
    SparkConf()
    .setAppName("cz4032_task2_v2")
    .setMaster('local[*]')
    .set("spark.driver.memory", "20g")
    .set("spark.executor.memory", "20g")
)
sc = SparkContext(conf=spark_configuration)
sc.setLogLevel("ERROR")

try:
    rdd = sc.textFile(input_filepath)
    print('read file into rdd')

    # One set of items per basket key; the header row is dropped.
    basket_rdd = (
        rdd.filter(lambda x: x != column_names)
        .map(get_required_bucket)
        .groupByKey()
        .mapValues(set)
        .values()
        .persist()
    )

    # apriori() scales the support threshold by each partition's share of this
    # total. count() keeps the baskets on the executors instead of collecting
    # the whole dataset into the driver.
    basket_count = basket_rdd.count()

    # SON phase 1: locally frequent itemsets per partition, de-duplicated by key.
    phase1_map = basket_rdd.mapPartitions(apriori).flatMap(lambda x: x)
    phase1_reduce = phase1_map.reduceByKey(lambda x, y: x + y).persist()

    # SON phase 2: count every phase-1 candidate over the full dataset in
    # parallel. Only the candidate list is broadcast; the baskets stay distributed.
    phase2_candidates = sc.broadcast(phase1_reduce.keys().collect())
    phase2_map = basket_rdd.flatMap(
        lambda basket: [
            (candidate, 1)
            for candidate in phase2_candidates.value
            if set(candidate).issubset(basket)
        ]
    )
    phase2_reduce = (
        phase2_map.reduceByKey(lambda x, y: x + y)
        .filter(lambda x: x[1] >= support)
        .keys()
        .collect()
    )

    phase1_output = get_output(phase1_reduce, False)[:-1]
    phase2_output = get_output(phase2_reduce, True)[:-1]

    # Release cached data while the context is still alive.
    phase2_candidates.unpersist()
    phase1_reduce.unpersist()
    basket_rdd.unpersist()

    output_filepath = f'{output_dir}{dataset_name}_minsup_{support}.txt'
    write_to_file(phase1_output, phase2_output, output_filepath)

    print(Fore.BLUE + f'--'*30)
    print(Fore.WHITE + f"| MINSUP:",Fore.GREEN + f"{support}")
    print(Fore.WHITE + f"| Time taken:", Fore.GREEN + f"{round(time.time() - start, 1)} seconds.")
    print(Fore.WHITE + f"| Outputs saved to", Fore.GREEN +  f"{output_filepath}.")
    print(Fore.BLUE + f'--'*30)

except Exception as e:
    print(Fore.RED + f'Spark Server stopped due to exception.')
    print(Fore.RED + f'Error: {e}')
finally:
    # Always release the context (and its JVM resources), success or failure.
    sc.stop()

[37mRunning SON Apriori Algorithm on [32mamazon-reviews [37mwith minsup value: 23305530



Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/20 06:41:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/11/20 06:41:43 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


read file into rdd


23/11/20 06:43:49 ERROR Executor: Exception in task 28.0 in stage 0.0 (TID 28)8]
java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:776)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1160)
	at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1214)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:168)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.writ

[31mSpark Server stopped due to exception.
[31mError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 31 in stage 0.0 failed 1 times, most recent failure: Lost task 31.0 in stage 0.0 (TID 31) (SCSEGPU-TC1.cm.cluster executor driver): java.lang.OutOfMemoryError: GC overhead limit exceeded
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:776)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1160)
	at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1214)
	at scala.collection.Ite

23/11/20 06:43:53 ERROR TaskSchedulerImpl: Exception in statusUpdate
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.scheduler.TaskResultGetter$$Lambda$2080/1711697646@6c69ef83 rejected from java.util.concurrent.ThreadPoolExecutor@62ae1c7e[Shutting down, pool size = 4, active threads = 0, queued tasks = 0, completed tasks = 20]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at org.apache.spark.scheduler.TaskResultGetter.enqueueFailedTask(TaskResultGetter.scala:139)
	at org.apache.spark.scheduler.TaskSchedulerImpl.liftedTree2$1(TaskSchedulerImpl.scala:838)
	at org.apache.spark.scheduler.TaskSchedulerImpl.statusUpdate(TaskSchedulerImpl.scala:811)
	at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/FYP/siddhant005/.conda/envs/dam_proj2/lib/python3.12/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/FYP/siddhant005/.conda/envs/dam_proj2/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/FYP/siddhant005/.conda/envs/dam_proj2/lib/python3.12/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving


In [8]:
start = time.time()

# filter_by_support() appends every locally frequent itemset to this global;
# reset it before each run.
phase1_candidates = []
support = minsup_values_list[1]

print(Fore.WHITE + f'Running SON Apriori Algorithm on', Fore.GREEN + f'{dataset_name}',
        Fore.WHITE + f'with minsup value: {support}\n')

# Memory must be set on the SparkConf, before the JVM is launched;
# SparkContext.setSystemProperty() starts the JVM itself, so it cannot size the
# heap. With a local[*] master the executors run inside the driver JVM, so
# spark.driver.memory is the setting that counts. If a SparkContext was already
# created in this kernel, restart the kernel for these values to apply.
spark_configuration = (
    SparkConf()
    .setAppName("cz4032_task2")
    .setMaster('local[*]')
    .set('spark.driver.memory', '15g')
    .set('spark.executor.memory', '15g')
)
sc = SparkContext(conf=spark_configuration)
sc.setLogLevel("ERROR")

try:
    rdd = sc.textFile(input_filepath)

    # One set of items per basket key; the header row is dropped.
    basket_rdd = (
        rdd.filter(lambda x: x != column_names)
        .map(get_required_bucket)
        .groupByKey()
        .mapValues(set)
        .values()
        .persist()
    )

    # apriori() scales the support threshold by each partition's share of this
    # total; count() avoids collecting every basket into the driver.
    basket_count = basket_rdd.count()

    # SON phase 1: locally frequent itemsets per partition, de-duplicated by key.
    phase1_map = basket_rdd.mapPartitions(apriori).flatMap(lambda x: x)
    phase1_reduce = phase1_map.reduceByKey(lambda x, y: x + y).persist()

    # SON phase 2: count every phase-1 candidate over the full dataset in
    # parallel. Only the candidate list is broadcast; the baskets stay distributed.
    phase2_candidates = sc.broadcast(phase1_reduce.keys().collect())
    phase2_map = basket_rdd.flatMap(
        lambda basket: [
            (candidate, 1)
            for candidate in phase2_candidates.value
            if set(candidate).issubset(basket)
        ]
    )
    phase2_reduce = (
        phase2_map.reduceByKey(lambda x, y: x + y)
        .filter(lambda x: x[1] >= support)
        .keys()
        .collect()
    )

    phase1_output = get_output(phase1_reduce, False)[:-1]
    phase2_output = get_output(phase2_reduce, True)[:-1]

    # Release cached data while the context is still alive.
    phase2_candidates.unpersist()
    phase1_reduce.unpersist()
    basket_rdd.unpersist()

    output_filepath = f'{output_dir}{dataset_name}_minsup_{support}.txt'
    write_to_file(phase1_output, phase2_output, output_filepath)

    print(Fore.BLUE + f'--'*30)
    print(Fore.WHITE + f"| MINSUP:",Fore.GREEN + f"{support}")
    print(Fore.WHITE + f"| Time taken:", Fore.GREEN + f"{round(time.time() - start, 1)} seconds.")
    print(Fore.WHITE + f"| Outputs saved to", Fore.GREEN +  f"{output_filepath}.")
    print(Fore.BLUE + f'--'*30)

except Exception as e:
    print(Fore.RED + f'Spark Server stopped due to exception.')
    print(Fore.RED + f'Error: {e}')
finally:
    # Always release the context, success or failure.
    sc.stop()

[37mRunning SON Apriori Algorithm on [32mgroceries [37mwith minsup value: 200



                                                                                

[34m------------------------------------------------------------
[37m| MINSUP: [32m200
[37m| Time taken: [32m4.2 seconds.
[37m| Outputs saved to [32m./logs/groceries/groceries_minsup_200.txt.
[34m------------------------------------------------------------


In [9]:
start = time.time()

# filter_by_support() appends every locally frequent itemset to this global;
# reset it before each run.
phase1_candidates = []
support = minsup_values_list[2]

print(Fore.WHITE + f'Running SON Apriori Algorithm on', Fore.GREEN + f'{dataset_name}',
        Fore.WHITE + f'with minsup value: {support}\n')


spark_configuration = SparkConf().setAppName("cz4032_task2").setMaster('local[*]')
sc = SparkContext(conf=spark_configuration)
sc.setLogLevel("ERROR")

try:
    rdd = sc.textFile(input_filepath)

    # One set of items per basket key; the header row is dropped.
    basket_rdd = (
        rdd.filter(lambda x: x != column_names)
        .map(get_required_bucket)
        .groupByKey()
        .mapValues(set)
        .values()
        .persist()
    )

    # apriori() scales the support threshold by each partition's share of this
    # total; count() avoids collecting every basket into the driver.
    basket_count = basket_rdd.count()

    # SON phase 1: locally frequent itemsets per partition, de-duplicated by key.
    phase1_map = basket_rdd.mapPartitions(apriori).flatMap(lambda x: x)
    phase1_reduce = phase1_map.reduceByKey(lambda x, y: x + y).persist()

    # SON phase 2: count every phase-1 candidate over the full dataset in
    # parallel. Only the candidate list is broadcast; the baskets stay distributed.
    phase2_candidates = sc.broadcast(phase1_reduce.keys().collect())
    phase2_map = basket_rdd.flatMap(
        lambda basket: [
            (candidate, 1)
            for candidate in phase2_candidates.value
            if set(candidate).issubset(basket)
        ]
    )
    phase2_reduce = (
        phase2_map.reduceByKey(lambda x, y: x + y)
        .filter(lambda x: x[1] >= support)
        .keys()
        .collect()
    )

    phase1_output = get_output(phase1_reduce, False)[:-1]
    phase2_output = get_output(phase2_reduce, True)[:-1]

    # Release cached data while the context is still alive.
    phase2_candidates.unpersist()
    phase1_reduce.unpersist()
    basket_rdd.unpersist()

    output_filepath = f'{output_dir}{dataset_name}_minsup_{support}.txt'
    write_to_file(phase1_output, phase2_output, output_filepath)

    print(Fore.BLUE + f'--'*30)
    print(Fore.WHITE + f"| MINSUP:",Fore.GREEN + f"{support}")
    print(Fore.WHITE + f"| Time taken:", Fore.GREEN + f"{round(time.time() - start, 1)} seconds.")
    print(Fore.WHITE + f"| Outputs saved to", Fore.GREEN +  f"{output_filepath}.")
    print(Fore.BLUE + f'--'*30)

except Exception as e:
    print(Fore.RED + f'Spark Server stopped due to exception.')
    print(Fore.RED + f'Error: {e}')
finally:
    # Always release the context, success or failure.
    sc.stop()

[37mRunning SON Apriori Algorithm on [32mgroceries [37mwith minsup value: 150



                                                                                

[34m------------------------------------------------------------
[37m| MINSUP: [32m150
[37m| Time taken: [32m4.9 seconds.
[37m| Outputs saved to [32m./logs/groceries/groceries_minsup_150.txt.
[34m------------------------------------------------------------


In [10]:
start = time.time()

# filter_by_support() appends every locally frequent itemset to this global;
# reset it before each run.
phase1_candidates = []
support = minsup_values_list[3]

print(Fore.WHITE + f'Running SON Apriori Algorithm on', Fore.GREEN + f'{dataset_name}',
        Fore.WHITE + f'with minsup value: {support}\n')


spark_configuration = SparkConf().setAppName("cz4032_task2").setMaster('local[*]')
sc = SparkContext(conf=spark_configuration)
sc.setLogLevel("ERROR")

try:
    rdd = sc.textFile(input_filepath)

    # One set of items per basket key; the header row is dropped.
    basket_rdd = (
        rdd.filter(lambda x: x != column_names)
        .map(get_required_bucket)
        .groupByKey()
        .mapValues(set)
        .values()
        .persist()
    )

    # apriori() scales the support threshold by each partition's share of this
    # total; count() avoids collecting every basket into the driver.
    basket_count = basket_rdd.count()

    # SON phase 1: locally frequent itemsets per partition, de-duplicated by key.
    phase1_map = basket_rdd.mapPartitions(apriori).flatMap(lambda x: x)
    phase1_reduce = phase1_map.reduceByKey(lambda x, y: x + y).persist()

    # SON phase 2: count every phase-1 candidate over the full dataset in
    # parallel. Only the candidate list is broadcast; the baskets stay distributed.
    phase2_candidates = sc.broadcast(phase1_reduce.keys().collect())
    phase2_map = basket_rdd.flatMap(
        lambda basket: [
            (candidate, 1)
            for candidate in phase2_candidates.value
            if set(candidate).issubset(basket)
        ]
    )
    phase2_reduce = (
        phase2_map.reduceByKey(lambda x, y: x + y)
        .filter(lambda x: x[1] >= support)
        .keys()
        .collect()
    )

    phase1_output = get_output(phase1_reduce, False)[:-1]
    phase2_output = get_output(phase2_reduce, True)[:-1]

    # Release cached data while the context is still alive.
    phase2_candidates.unpersist()
    phase1_reduce.unpersist()
    basket_rdd.unpersist()

    output_filepath = f'{output_dir}{dataset_name}_minsup_{support}.txt'
    write_to_file(phase1_output, phase2_output, output_filepath)

    print(Fore.BLUE + f'--'*30)
    print(Fore.WHITE + f"| MINSUP:",Fore.GREEN + f"{support}")
    print(Fore.WHITE + f"| Time taken:", Fore.GREEN + f"{round(time.time() - start, 1)} seconds.")
    print(Fore.WHITE + f"| Outputs saved to", Fore.GREEN +  f"{output_filepath}.")
    print(Fore.BLUE + f'--'*30)

except Exception as e:
    print(Fore.RED + f'Spark Server stopped due to exception.')
    print(Fore.RED + f'Error: {e}')
finally:
    # Always release the context, success or failure.
    sc.stop()

[37mRunning SON Apriori Algorithm on [32mgroceries [37mwith minsup value: 100



                                                                                

[34m------------------------------------------------------------
[37m| MINSUP: [32m100
[37m| Time taken: [32m5.7 seconds.
[37m| Outputs saved to [32m./logs/groceries/groceries_minsup_100.txt.
[34m------------------------------------------------------------


In [11]:
start = time.time()

# filter_by_support() appends every locally frequent itemset to this global;
# reset it before each run.
phase1_candidates = []
support = minsup_values_list[4]

print(Fore.WHITE + f'Running SON Apriori Algorithm on', Fore.GREEN + f'{dataset_name}',
        Fore.WHITE + f'with minsup value: {support}\n')


spark_configuration = SparkConf().setAppName("cz4032_task2").setMaster('local[*]')
sc = SparkContext(conf=spark_configuration)
sc.setLogLevel("ERROR")

try:
    rdd = sc.textFile(input_filepath)

    # One set of items per basket key; the header row is dropped.
    basket_rdd = (
        rdd.filter(lambda x: x != column_names)
        .map(get_required_bucket)
        .groupByKey()
        .mapValues(set)
        .values()
        .persist()
    )

    # apriori() scales the support threshold by each partition's share of this
    # total; count() avoids collecting every basket into the driver.
    basket_count = basket_rdd.count()

    # SON phase 1: locally frequent itemsets per partition, de-duplicated by key.
    phase1_map = basket_rdd.mapPartitions(apriori).flatMap(lambda x: x)
    phase1_reduce = phase1_map.reduceByKey(lambda x, y: x + y).persist()

    # SON phase 2: count every phase-1 candidate over the full dataset in
    # parallel. Only the candidate list is broadcast; the baskets stay distributed.
    phase2_candidates = sc.broadcast(phase1_reduce.keys().collect())
    phase2_map = basket_rdd.flatMap(
        lambda basket: [
            (candidate, 1)
            for candidate in phase2_candidates.value
            if set(candidate).issubset(basket)
        ]
    )
    phase2_reduce = (
        phase2_map.reduceByKey(lambda x, y: x + y)
        .filter(lambda x: x[1] >= support)
        .keys()
        .collect()
    )

    phase1_output = get_output(phase1_reduce, False)[:-1]
    phase2_output = get_output(phase2_reduce, True)[:-1]

    # Release cached data while the context is still alive.
    phase2_candidates.unpersist()
    phase1_reduce.unpersist()
    basket_rdd.unpersist()

    output_filepath = f'{output_dir}{dataset_name}_minsup_{support}.txt'
    write_to_file(phase1_output, phase2_output, output_filepath)

    print(Fore.BLUE + f'--'*30)
    print(Fore.WHITE + f"| MINSUP:",Fore.GREEN + f"{support}")
    print(Fore.WHITE + f"| Time taken:", Fore.GREEN + f"{round(time.time() - start, 1)} seconds.")
    print(Fore.WHITE + f"| Outputs saved to", Fore.GREEN +  f"{output_filepath}.")
    print(Fore.BLUE + f'--'*30)

except Exception as e:
    print(Fore.RED + f'Spark Server stopped due to exception.')
    print(Fore.RED + f'Error: {e}')
finally:
    # Always release the context, success or failure.
    sc.stop()

[37mRunning SON Apriori Algorithm on [32mgroceries [37mwith minsup value: 50



[Stage 3:>                                                          (0 + 2) / 2]

                                                                                

[34m------------------------------------------------------------
[37m| MINSUP: [32m50
[37m| Time taken: [32m14.2 seconds.
[37m| Outputs saved to [32m./logs/groceries/groceries_minsup_50.txt.
[34m------------------------------------------------------------
