# CS 5683 - Big Data Analytics
## Project - 1: Frequent itemset mining with *a priori algorithm*
### Suraj Pawar

In [None]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
[K     |████████████████████████████████| 204.2MB 58kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 45.8MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=f71ab810bf155d0579f7296dbcb817d9e7f084208b0c19da571c7b50d0ecaa46
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


In [None]:
# Colab-only: mount Google Drive; the input paths used below live under this
# mount point
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


In [None]:
# import libraries
import itertools
import os
import shutil
import sys
from operator import add

import numpy as np
from pyspark import SparkConf, SparkContext

# initialize the spark context
# getOrCreate reuses the context that is already running, so re-running this
# cell does not fail with "Cannot run multiple SparkContexts at once"
sc = SparkContext.getOrCreate(SparkConf().setAppName("Spark Apriori"))


In [None]:
DEBUG = 0

# choose the input file and the minimum support (an absolute basket count):
# the full data set by default, the small test file when DEBUG is set
if not DEBUG:
  f_input = "/content/drive/My Drive/Coursework/CS 5683_Big_Data/Project-1/browsing.txt"
  sup = 85
else:
  f_input = "/content/drive/My Drive/Coursework/CS 5683_Big_Data/Project-1/browsing_test.dat"
  sup = 2

# import the data (each line of the file is one basket)
data = sc.textFile(f_input)

# count the total number of baskets
n_baskets = data.count()
print('Total number of baskets = ', n_baskets)

Total number of baskets =  31101


In [None]:
def indexassign(x):
  '''
  Pair every item of a basket with the index of that basket

  Input : x = tuple (basket, basket index), where basket is an iterable of items

  Output : list with one tuple (item, basket index) per item of the basket
  '''
  return [(item, x[1]) for item in x[0]]

# split every line on single spaces so that each basket becomes a list of items
itemset = data.map(lambda line: line.strip().split(' '))

# itemset_i rdd: key = item, value = frozenset of the indices of the baskets in
# which the item is present
# zipWithIndex : attach the basket index to every basket
# flatMap(indexassign) : emit one (item, basket index) pair per item
# groupByKey : gather all basket indices of an item (avoids the repeated list
#              concatenation of a reduceByKey(a + b))
# mapValues(frozenset) : store the indices as an immutable set
itemset_i = itemset.zipWithIndex().flatMap(indexassign).groupByKey().mapValues(frozenset)

# convert the rdd to dictionary and broadcast it
item_dict = itemset_i.collectAsMap()
item_dict_ib = sc.broadcast(item_dict)

# print total number of unique elements
# keys are unique after groupByKey, so the size of the collected dictionary is
# the rdd count; no second Spark job is needed
print('Total number of elements = ', len(item_dict))

if DEBUG:
  # reuse the collected dictionary instead of running the pipeline again
  for val in item_dict.items():
    print(val)


Total number of elements =  12592


***Finding frequent itemsets***

In [None]:
k = 1

# frequent_itemset stores the singletons, pairs, triplets and so on for
# postprocessing
frequent_itemset = []

# generate the frequent singletons
# set(...) counts an item at most once per basket, so the support is the number
# of baskets containing the item (the same definition the basket-index sets use
# for larger itemsets)
fi_1 = data.flatMap(lambda line: set(line.strip().split(' '))).map(lambda x: (x,1)).reduceByKey(lambda a,b: a+b).filter(lambda x: x[1]>=sup)

# fi_1 is used again when the a priori search starts; keep it in memory instead
# of recomputing it from the text file
fi_1.persist()

# collect once and reuse the list for storing, counting and debugging
frequent_singletons = fi_1.collect()

# for postprocessing
frequent_itemset.append(frequent_singletons)

# print the count of frequent singletons
print(f'Total number of frequent items (k = {k}) : ', len(frequent_singletons))

if DEBUG:
  for val in frequent_singletons:
    print(val)
  


Total number of frequent items (k = 1) :  735


***Finding subsequent itemsets in a recursive manner***

In [None]:
def get_sup(x):
  '''
  Compute the support of an itemset and report it if the itemset is frequent

  Input : x = itemset (an iterable of items)

  Reads two module-level names:
          item_dict_ib = broadcast dictionary, item -> set of basket indices
          sup = support threshold

  Output: tuple (itemset, support) if support >= sup, otherwise an empty tuple
  '''
  items = iter(x)

  # seed the running set with the baskets of the first item
  for first in items:
    common = item_dict_ib.value[first]
    break

  # keep only the baskets that also contain every remaining item
  for item in items:
    common = common.intersection(item_dict_ib.value[item])

  # the number of common baskets is the support of the itemset
  support = len(common)
  if support >= sup:
    return x, support
  return ()

In [None]:
def find_frequent_itemset(prev, frequent_items, sup, k, frequent_itemset, max_k=4):
  '''
  Find frequent itemsets of size k, k+1, ..., max_k in a recursive manner

  Inputs : prev = rdd of the frequent itemsets with k-1 items (bare items when
                  k = 2, tuples of items otherwise)
           frequent_items = rdd of the frequent singletons
           sup = support threshold
           k = number of items in the itemset to be generated
           frequent_itemset = a list to store the frequent itemsets for
                              postprocessing; one list of (itemset, support)
                              tuples is appended per value of k
           max_k = largest itemset size to generate (default 4, quadruples)

  Reads the module-level broadcast dictionary item_dict_ib
  (item -> set of basket indices).

  Output : None. Calls itself until itemsets of size max_k have been processed
  '''

  def support_if_frequent(candidate):
    # The support is checked here against the `sup` argument; the module-level
    # get_sup compares against the global threshold and would ignore it.
    common = None
    for item in candidate:
      baskets = item_dict_ib.value[item]
      common = baskets if common is None else common.intersection(baskets)
    support = len(common)
    return (candidate, support) if support >= sup else ()

  # for k = 2 both sides of the cartesian product are bare items; afterwards the
  # left side is a tuple of items that gets extended by one item
  if k == 2:
    def to_itemset(pair):
      return frozenset(pair)
  else:
    def to_itemset(pair):
      return frozenset(pair[0] + (pair[1],))

  # perform the cartesian operation to generate candidate sets
  # cartesian : generate the combinations
  # map : convert to frozenset
  # filter : remove itemsets if the element is repeated
  # distinct : remove duplicate entries
  # map : convert to a tuple
  fk_cartesian = (prev.cartesian(frequent_items)
                      .map(to_itemset)
                      .filter(lambda s: len(s) > k - 1)
                      .distinct()
                      .map(lambda s: tuple(s)))

  # print the number of candidate items
  print(f'Total number of candidate items (k = {k}) : ', fk_cartesian.count())

  # get the frequent itemsets with k elements; persisted because they are both
  # collected here and used to build the candidates of the next level
  frequent_k = fk_cartesian.map(support_if_frequent).filter(lambda x: x)
  frequent_k.persist()

  # store the solution for postprocessing (collected once, then counted locally)
  found = frequent_k.collect()
  frequent_itemset.append(found)

  # print the number of frequent itemsets
  print(f'Total number of frequent items (k = {k}) : ', len(found))

  # stop once the largest requested size has been processed
  if k >= max_k:
    return

  # for the next itemset generation
  next_prev = frequent_k.map(lambda x: x[0])
  next_prev.persist()

  # call the same function for itemsets with one more element
  find_frequent_itemset(next_prev, frequent_items, sup, k + 1, frequent_itemset, max_k)



In [None]:
# map : extract only keys of frequent singletons
fi_1_k = fi_1.map(lambda x: x[0])
fi_1_k.persist()

# keep only the singletons, so that re-running this cell does not append the
# pairs, triplets and quadruples a second time
del frequent_itemset[1:]

# start the a priori algorithm
find_frequent_itemset(fi_1_k, fi_1_k, sup, 2, frequent_itemset)

Total number of candidate items (k = 2) :  269745
Total number of frequent items (k = 2) :  1778
Total number of candidate items (k = 3) :  1219350
Total number of frequent items (k = 3) :  365
Total number of candidate items (k = 4) :  262958
Total number of frequent items (k = 4) :  39


***Finding association rules***

In [None]:
def association_pair(x,dicta,kleft,cmin):
  '''
  Generate association rules for pairs
  Inputs : x = tuple (itemset, support of the itemset)
           dicta = dictionary of the previous frequent itemsets (i.e., k = 1),
                   keyed by frozenset of items, value = support
           kleft = number of elements on the left hand side of the association
                   rule
           cmin = minimum confidence (fraction, e.g. 0.9)

  Output : tuple (x, rules) if at least one rule reaches the minimum
           confidence, otherwise an empty tuple. Each rule is a tuple
           (left hand side, right hand side, confidence in percent)
  '''
  li = []
  for item in itertools.combinations(x[0],kleft):
    # compute the confidence = support(itemset) / support(left hand side)
    # the whole left hand side is looked up (not only its first element), so
    # the result is also correct when kleft > 1
    confidence = x[1]/dicta[frozenset(item)]
    # store the associated pair
    if confidence >= cmin:
      li.append((item, tuple(set(x[0])-set(item)), confidence*100))

  if li:
    return x,li
  return ()


def association_k(x,dicta,kleft,cmin):
  '''
  Build the association rules of a frequent k-itemset
  Inputs : x = tuple (itemset, support of the itemset)
           dicta = dictionary of the previous frequent itemsets (i.e., k - 1),
                   keyed by frozenset of items, value = support
           kleft = number of elements on the left hand side of the association
                   rule
           cmin = minimum confidence (fraction)

  Output : tuple (x, rules) if at least one rule reaches the minimum
           confidence, otherwise an empty tuple. Each rule is a tuple
           (left hand side, right hand side, confidence in percent)
  '''
  rules = []
  for lhs in itertools.combinations(x[0], kleft):
    # confidence = support(itemset) / support(left hand side)
    conf = x[1] / dicta[frozenset(lhs)]
    if conf >= cmin:
      # the right hand side is whatever the left hand side leaves over
      rhs = tuple(set(x[0]) - set(lhs))
      rules.append((lhs, rhs, conf * 100))

  return (x, rules) if rules != [] else ()

# minimum confidence (as a fraction) for an association rule to be kept
MIN_CONFIDENCE = 0.9

# frequent_itemset already holds the collected (itemset, support) lists, so the
# lookup dictionaries {frozenset(itemset): support} are built directly on the
# driver instead of going through parallelize + collectAsMap
frequent_singelton_rdd = sc.parallelize(frequent_itemset[0]).map(lambda x: (frozenset({x[0]}),x[1]))
frequent_singelton_dict = {frozenset({item}): support for item, support in frequent_itemset[0]}

frequent_pair_rdd = sc.parallelize(frequent_itemset[1]).map(lambda x: (frozenset(x[0]),x[1]))
frequent_pair_dict = {frozenset(items): support for items, support in frequent_itemset[1]}

frequent_triplet_rdd = sc.parallelize(frequent_itemset[2]).map(lambda x: (frozenset(x[0]),x[1]))
frequent_triplet_dict = {frozenset(items): support for items, support in frequent_itemset[2]}

frequent_quatruple_rdd = sc.parallelize(frequent_itemset[3])

# each *_rdd_c holds, per frequent itemset with at least one confident rule,
# the tuple ((itemset, support), list of rules); persisted because they are
# counted here and collected again when the rules are printed
frequent_pair_rdd_c = frequent_pair_rdd.map(lambda x : association_pair(x,frequent_singelton_dict,1,MIN_CONFIDENCE)).filter(lambda x: x)
frequent_pair_rdd_c.persist()
print('k = 2 with c > 90% = ', frequent_pair_rdd_c.count())

frequent_triplet_rdd_c = frequent_triplet_rdd.map(lambda x : association_k(x,frequent_pair_dict,2,MIN_CONFIDENCE)).filter(lambda x: x)
frequent_triplet_rdd_c.persist()
print('k = 3 with c > 90% = ', frequent_triplet_rdd_c.count())

frequent_quatruple_rdd_c = frequent_quatruple_rdd.map(lambda x : association_k(x,frequent_triplet_dict,3,MIN_CONFIDENCE)).filter(lambda x: x)
frequent_quatruple_rdd_c.persist()
print('k = 4 with c > 90% = ', frequent_quatruple_rdd_c.count())

if DEBUG:
  for val in frequent_pair_rdd_c.collect():
    print(val)

  for val in frequent_triplet_rdd_c.collect():
    print(val)

  for val in frequent_quatruple_rdd_c.collect():
    print(val)

k = 2 with c > 90% =  8
k = 3 with c > 90% =  46
k = 4 with c > 90% =  21


***Association rule for pairs***

In [None]:
# print results for association rules of frequent pairs
for val in frequent_pair_rdd_c.collect():
  print('(', val[1][0][0][0], ')', '-------------->', '(', val[1][0][1][0], ')', ' {:.2f}'.format(val[1][0][2]), '%')

( DAI93865 ) --------------> ( FRO40251 )  100.00 %
( DAI23334 ) --------------> ( DAI62779 )  95.45 %
( DAI88079 ) --------------> ( FRO40251 )  98.67 %
( DAI43868 ) --------------> ( SNA82528 )  97.30 %
( GRO38636 ) --------------> ( FRO40251 )  99.07 %
( ELE12951 ) --------------> ( FRO40251 )  99.06 %
( GRO85051 ) --------------> ( FRO40251 )  99.92 %
( FRO92469 ) --------------> ( FRO40251 )  98.35 %


***Association rule for triplets***

In [None]:
# print results for association rules of frequent triplets
for val in frequent_triplet_rdd_c.collect():
  print(val[1][0][0], '-------------->', '(', val[1][0][1][0], ')', ' {:.2f}'.format(val[1][0][2]), '%')

('SNA18336', 'GRO81087') --------------> ( DAI62779 )  93.14 %
('GRO85051', 'DAI55911') --------------> ( FRO40251 )  100.00 %
('FRO92469', 'DAI75645') --------------> ( FRO40251 )  98.05 %
('SNA18336', 'ELE17451') --------------> ( DAI62779 )  91.39 %
('SNA18336', 'DAI23334') --------------> ( DAI62779 )  100.00 %
('GRO85051', 'ELE74009') --------------> ( FRO40251 )  100.00 %
('DAI88079', 'DAI75645') --------------> ( FRO40251 )  99.33 %
('SNA55762', 'GRO85051') --------------> ( FRO40251 )  100.00 %
('DAI31081', 'GRO85051') --------------> ( FRO40251 )  100.00 %
('GRO85051', 'DAI42493') --------------> ( FRO40251 )  98.91 %
('FRO92469', 'ELE20847') --------------> ( FRO40251 )  100.00 %
('ELE92920', 'SNA90094') --------------> ( DAI62779 )  95.65 %
('GRO85051', 'GRO73461') --------------> ( FRO40251 )  100.00 %
('GRO85051', 'ELE26917') --------------> ( FRO40251 )  100.00 %
('GRO85051', 'DAI83948') --------------> ( FRO40251 )  100.00 %
('DAI55148', 'FRO92469') --------------> ( FRO

***Association rule for quadruples***

In [None]:
# print results for association rules of frequent quadruples
for val in frequent_quatruple_rdd_c.collect():
  print(val[1][0][0], '-------------->', '(', val[1][0][1][0], ')', ' {:.2f}'.format(val[1][0][2]), '%')

('GRO85051', 'SNA80324', 'ELE17451') --------------> ( FRO40251 )  100.00 %
('GRO85051', 'SNA80324', 'DAI62779') --------------> ( FRO40251 )  100.00 %
('SNA18336', 'ELE92920', 'ELE17451') --------------> ( DAI62779 )  100.00 %
('FRO92469', 'DAI75645', 'SNA80324') --------------> ( FRO40251 )  97.48 %
('FRO92469', 'SNA80324', 'DAI62779') --------------> ( FRO40251 )  96.52 %
('GRO85051', 'SNA80324', 'ELE17451') --------------> ( DAI62779 )  93.04 %
('SNA53220', 'SNA93860', 'FRO19221') --------------> ( DAI62779 )  95.61 %
('SNA18336', 'DAI85309', 'ELE17451') --------------> ( DAI62779 )  95.80 %
('GRO85051', 'DAI75645', 'SNA80324') --------------> ( FRO40251 )  100.00 %
('FRO85978', 'ELE59028', 'SNA93860') --------------> ( DAI62779 )  96.72 %
('SNA18336', 'ELE92920', 'GRO81087') --------------> ( DAI62779 )  100.00 %
('GRO85051', 'DAI62779', 'ELE17451') --------------> ( FRO40251 )  100.00 %
('SNA18336', 'ELE92920', 'DAI85309') --------------> ( DAI62779 )  96.40 %
('SNA18336', 'ELE92

# Due Date: Sept. 17 at 11:59pm