<a href="https://colab.research.google.com/github/sapienhwaker/A-Priori-Algorithm/blob/main/A_Priori.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Project-1: Distributed Association Rule Mining
### Prasad Hajare (A20232707)

In [None]:
!pip install pyspark



In [None]:
from google.colab import drive
# Mount Google Drive so the input file under "My Drive" becomes readable.
MOUNT_POINT = '/content/drive'
drive.mount(MOUNT_POINT)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import re
from pyspark import SparkContext

from pyspark import SparkConf

# getOrCreate reuses a context that is already running, so re-executing this
# notebook cell no longer fails with "Cannot run multiple SparkContexts at
# once"; on a fresh kernel it creates the same local context as before.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("Distributed Association Rule Mining")
)

In [None]:
# Browsing log on the mounted Drive; each line of it is one basket.
file = "/content/drive/My Drive/BIGDATA Fall2020/browsing.txt"

fileRDD = sc.textFile(file)  # one RDD element per input line

# Every line is a basket, so the line count is the basket count.
basket_total = fileRDD.count()
print('Total number of baskets = ', basket_total)

Total number of baskets =  31101


In [None]:
# Split every line (one basket) into its item ids. re.split(r'\W+') yields an
# empty token for a blank line or for leading/trailing punctuation; those are
# dropped so that '' is never treated as an item.
basketsRDD = fileRDD.map(
    lambda line: [token for token in re.split(r'\W+', line.strip()) if token]
)

In [None]:
# Pair each item of a basket with that basket's index.
def indexing(record):
    """Tag every item of a basket with the basket's id.

    Args:
        record: ``(items, basket_id)`` pair, as produced by ``zipWithIndex``.

    Returns:
        list of ``(item, basket_id)`` tuples, one per item, in input order.
    """
    items, basket_id = record[0], record[1]
    return [(item, basket_id) for item in items]

# Number every basket 0, 1, 2, ... and build an inverted index over them:
# item -> frozenset of the ids of the baskets that contain the item.
# The support of any itemset is later the size of the intersection of its
# items' basket-id sets.

indexedBasketsRDD = basketsRDD.zipWithIndex().map(indexing)
# (item, [basket_id]) pairs; no longer needed to build mapRDD but kept
# defined for any code that still refers to it.
tempMapRDD = indexedBasketsRDD.flatMap(lambda x: x).map(lambda x: (x[0], [x[1]]))
# groupByKey gathers all basket ids of an item in one shuffle; the former
# reduceByKey(lambda list1, list2: list1 + list2) copied ever-growing lists
# on every merge.
mapRDD = indexedBasketsRDD.flatMap(lambda pairs: pairs).groupByKey().mapValues(frozenset)
# The index is reused (count below, then collectAsMap), so keep it in memory
# instead of recomputing it from the input file each time.
mapRDD.cache()

print('Total Records = ', mapRDD.count())

Total Records =  12592


In [None]:
# Bring the item -> basket-id index to the driver as a plain dict (the name
# says RDD, but collectAsMap returns a dict) ...
dictionaryRDD = mapRDD.collectAsMap()
# ... and broadcast it so tasks (eligibleItems) can look up any item locally.
broadDictionaryRDD = sc.broadcast(value=dictionaryRDD)

***Finding frequent single items (k = 1)***

In [None]:
support = 85
k = 1

# Flat RDD of every token in the input; no longer used for counting but kept
# defined for any code that still refers to it.
wordsRDD = fileRDD.flatMap(lambda line: re.split(r'\W+', line.strip()))

# The support of a single item is the number of DISTINCT baskets containing
# it, i.e. the size of its basket-id set in the inverted index. Counting raw
# tokens would count an item twice when it repeats inside one basket, which
# is inconsistent with how k >= 2 supports are measured (set intersections).
singleFrequentItemsRDD = (
    mapRDD.map(lambda entry: (entry[0], len(entry[1])))
    .filter(lambda entry: entry[1] >= support)
)

# qualifiers[k-1] collects the (itemset, support) pairs of the frequent
# k-itemsets (k = 1, 2, 3, 4) whose support is >= `support`.
qualifiers = []
qualifiers.append(singleFrequentItemsRDD.collect())

***Frequent Itemsets for k = 2,3,4***

In [None]:
# Minimum number of baskets an itemset must appear in to count as frequent.
support = 85


def eligibleItems(x, min_support=None):
    """Return ``(x, count)`` if itemset *x* is frequent, otherwise ``None``.

    ``count`` is the number of baskets containing every item of *x*: the
    size of the intersection of the items' basket-id sets, read from the
    broadcast inverted index ``broadDictionaryRDD``.

    Args:
        x: tuple of items (a candidate itemset).
        min_support: threshold to apply; defaults to the module-level
            ``support``.

    Returns:
        ``(x, count)`` when ``count >= min_support``, else ``None``. An empty
        *x* also gives ``None``; previously it raised UnboundLocalError.
    """
    threshold = support if min_support is None else min_support
    index = broadDictionaryRDD.value
    # Intersect the smallest basket sets first and stop as soon as the
    # running intersection drops below the threshold: intersections only
    # shrink, so the itemset can no longer qualify.
    basket_sets = sorted((index[item] for item in x), key=len)
    if not basket_sets:
        return None
    common = basket_sets[0]
    for baskets in basket_sets[1:]:
        if len(common) < threshold:
            return None
        common = common.intersection(baskets)
    if len(common) >= threshold:
        return x, len(common)
    return None

In [None]:
# Level-wise (Apriori-style) search for frequent itemsets.

def frequent_itemset(prev, frequent_items, k, qualifiers, max_k=4):
    """Find frequent itemsets of size k, k+1, ..., max_k.

    Candidates of size k extend every frequent (k-1)-itemset in *prev* with
    every frequent single item, keeping only combinations of exactly k
    distinct items. ``eligibleItems`` checks each candidate's support, and
    the surviving ``(itemset_tuple, count)`` pairs of each level are
    collected and appended to *qualifiers*, which is mutated in place.

    Args:
        prev: RDD of frequent (k-1)-itemsets -- bare items when k == 2,
            tuples of items otherwise.
        frequent_items: RDD of frequent single items.
        k: size of the first level to generate.
        qualifiers: list receiving one collected list per generated level.
        max_k: largest itemset size to generate (default 4, as before).
            At least one level (k) is always generated.
    """
    previous_level = None
    while True:
        if k == 2:
            # prev holds bare items, so a pair (a, b) becomes {a, b}.
            cartesianRDD = prev.cartesian(frequent_items).map(lambda x: frozenset(x))
        else:
            # prev holds item tuples, so ((a, b), c) becomes {a, b, c}.
            cartesianRDD = prev.cartesian(frequent_items).map(
                lambda x: frozenset(x[0] + (x[1],))
            )

        # Keep genuine size-k candidates only (extending a set by one of its
        # own items leaves it at size k-1) and drop duplicates like {a,b}/{b,a}.
        # `size=k` binds the current level now; a bare `k` would be looked up
        # late, after the loop may already have advanced it.
        eligibleRDD = (
            cartesianRDD.filter(lambda x, size=k: len(x) == size)
            .distinct()
            .map(lambda x: tuple(x))
        )

        # eligibleItems returns None for infrequent candidates; filter them out.
        level = eligibleRDD.map(eligibleItems).filter(lambda x: x)
        # Persist before collecting so the next level is built from these
        # cached results instead of recomputing this whole level.
        level.persist()
        qualifiers.append(level.collect())

        # The previous level is no longer needed once this one is cached.
        if previous_level is not None:
            previous_level.unpersist()
        previous_level = level

        if k >= max_k:
            break
        prev = level.map(lambda x: x[0])
        k += 1

    previous_level.unpersist()

# The frequent single items (keys only) seed the search. They are persisted
# because every level's cartesian product reads them again.
singleFrequentItemsKeysRDD = singleFrequentItemsRDD.keys()
singleFrequentItemsKeysRDD.persist()
frequent_itemset(
    prev=singleFrequentItemsKeysRDD,
    frequent_items=singleFrequentItemsKeysRDD,
    k=2,
    qualifiers=qualifiers,
)

***Association rule implementation***

In [None]:
import itertools as it
# Minimum confidence (as a fraction) for a rule to be reported.
c = 0.9


def get_association(x, map, seperate, k, min_confidence=None):
    """Derive high-confidence association rules from one frequent itemset.

    For every subset ``X`` of *seperate* items of the itemset, the rule
    ``X -> (itemset - X)`` has confidence ``support(itemset) / support(X)``.

    Args:
        x: ``(itemset, support_count)`` where itemset is an iterable of items.
        map: dict from ``frozenset`` of *seperate* items to that subset's
            support count. The name shadows the builtin but is kept for
            backward compatibility.
        seperate: number of items on the left-hand side of each rule.
        k: size of the itemset. Kept for interface compatibility; the
            support lookup no longer needs a k == 2 special case.
        min_confidence: threshold to apply; defaults to the module-level ``c``.

    Returns:
        ``(x, rules)`` where each rule is
        ``(lhs_tuple, rhs_tuple, confidence_percent)``, or ``None`` when no
        rule reaches the threshold.
    """
    threshold = c if min_confidence is None else min_confidence
    itemset, itemset_support = x[0], x[1]
    rules = []
    for lhs in it.combinations(itemset, seperate):
        # frozenset(lhs) equals the old k == 2 key frozenset({lhs[0]}) for a
        # one-item lhs, so a single lookup serves every itemset size.
        confidence = itemset_support / map[frozenset(lhs)]
        if confidence >= threshold:
            # Keep the right-hand side in itemset order (deterministic).
            rhs = tuple(item for item in itemset if item not in lhs)
            rules.append((lhs, rhs, confidence * 100))

    if rules:
        return x, rules
    return None

# Support lookup tables keyed by frozenset, so a rule's left-hand side can be
# found regardless of item order. They are built directly from the already
# collected `qualifiers` lists instead of a parallelize -> collectAsMap round
# trip through Spark. (Names keep the historical "MapRDD" suffix, though they
# are plain dicts.)

# qualifiers[0]: frequent single items as (item, support) pairs.
singleFrequentItemsMapRDD = {frozenset({item}): count for item, count in qualifiers[0]}

# qualifiers[1]: frequent 2-itemsets as (item_tuple, support) pairs.
twoFrequentItemsRDD = sc.parallelize(qualifiers[1]).map(lambda x: (frozenset(x[0]), x[1]))
twoFrequentItemsMapRDD = {frozenset(items): count for items, count in qualifiers[1]}

# qualifiers[2]: frequent 3-itemsets.
threeFrequentItemsRDD = sc.parallelize(qualifiers[2]).map(lambda x: (frozenset(x[0]), x[1]))
threeFrequentItemsMapRDD = {frozenset(items): count for items, count in qualifiers[2]}

# qualifiers[3]: frequent 4-itemsets.
fourFrequentItemsRDD = sc.parallelize(qualifiers[3])

# Rules with k-1 items on the left, checked against the (k-1)-itemset supports.
twoItemsConfiedenceRDD = twoFrequentItemsRDD.map(lambda x: get_association(x, singleFrequentItemsMapRDD, 1, 2)).filter(lambda x: x)
threeItemsConfiedenceRDD = threeFrequentItemsRDD.map(lambda x: get_association(x, twoFrequentItemsMapRDD, 2, 3)).filter(lambda x: x)
fourItemsConfiedenceRDD = fourFrequentItemsRDD.map(lambda x: get_association(x, threeFrequentItemsMapRDD, 3, 4)).filter(lambda x: x)

# Output file for the rules. Opened in "w" mode so re-running the notebook
# replaces earlier results instead of appending duplicate listings; the last
# writing cell closes it.
output = open("/content/drive/My Drive/BIGDATA Fall2020/association_rules.txt", "w", encoding="utf-8")

In [None]:
output.write("Association rule for two items\n\n")
# Each collected record is (itemset, rules). Write EVERY rule: a pair {A, B}
# can yield both A >> B and B >> A, but the old loop wrote only rules[0].
for _itemset, rules in twoItemsConfiedenceRDD.collect():
    for lhs, rhs, confidence in rules:
        # Two-item rules have one item per side; print the items bare.
        output.write(lhs[0] + ' >> ' + rhs[0] + ' {:.2f}'.format(confidence) + '%\n')

output.write("\n\n\n")

3

In [None]:
output.write("Association rule for three items\n\n")
# Each collected record is (itemset, rules); write every rule, not just the
# first, since one 3-itemset can yield several confident rules.
for _itemset, rules in threeItemsConfiedenceRDD.collect():
    for lhs, rhs, confidence in rules:
        output.write(str(lhs) + ' >> ' + rhs[0] + ' {:.2f}'.format(confidence) + '%\n')

output.write("\n\n\n")

3

In [None]:
output.write("Association rule for four items\n\n")
# Each collected record is (itemset, rules); write every rule, not just the
# first, since one 4-itemset can yield several confident rules.
for _itemset, rules in fourItemsConfiedenceRDD.collect():
    for lhs, rhs, confidence in rules:
        output.write(str(lhs) + ' >> ' + rhs[0] + ' {:.2f}'.format(confidence) + '%\n')

output.close()

# Due Date: Sept. 17 at 11:59pm