## Steps

- Reading data with PySpark
- Parsing data to Spark RDD objects
- Finding first support values of items with “MapReduce”
- Deciding minimum support value
- Creating the following support tables with “MapReduce”
- Calculating confidence values
- Deciding which products a customer is likely to buy, based on high confidence

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark import SparkContext

# Spark context
# Extra configuration can also be added
sc = SparkContext("local" , "Apriori")

In [3]:
## Reading the dataset

data = sc.textFile("Fruit_Data.txt")
print(data.collect())

['Apple,Mango,Banana', 'Banana,Mango', 'Apple,Banana', 'Apple,Mango,Coconut', 'Strawberry,Grapes,Lemon,Raspberry', 'Rassberry,Grapes', 'Strawberry,Apple', 'Apple,Mango,Raspberry', 'Mango,Raspberry', 'Mango,Apple', 'Apple,Raspberry', 'Banana,Raspberry,Mango', 'Apple,Mango,Banana', 'Raspberry,Banana', 'Apple,Strawberry', 'Strawberry,Banana,Apple,Mango', 'Mango,Banana,Raspberry,Apple', 'Coconut,Apple,Raspberry', 'Raspberry,Coconut,Banana']


### **Parsing Transaction Items into RDD**

By default, the textFile() method reads a file line by line, so every line of our CSV file becomes one element of the RDD, stored as a single string. To get at the individual items, we map over the RDD and split each string on commas, which turns every line into a list of items.

In [4]:
## Split items
lblitems = data.map(lambda line: line.split(','))

print(lblitems.collect())

[['Apple', 'Mango', 'Banana'], ['Banana', 'Mango'], ['Apple', 'Banana'], ['Apple', 'Mango', 'Coconut'], ['Strawberry', 'Grapes', 'Lemon', 'Raspberry'], ['Rassberry', 'Grapes'], ['Strawberry', 'Apple'], ['Apple', 'Mango', 'Raspberry'], ['Mango', 'Raspberry'], ['Mango', 'Apple'], ['Apple', 'Raspberry'], ['Banana', 'Raspberry', 'Mango'], ['Apple', 'Mango', 'Banana'], ['Raspberry', 'Banana'], ['Apple', 'Strawberry'], ['Strawberry', 'Banana', 'Apple', 'Mango'], ['Mango', 'Banana', 'Raspberry', 'Apple'], ['Coconut', 'Apple', 'Raspberry'], ['Raspberry', 'Coconut', 'Banana']]


### **Getting First Support Values of Items for Apriori**

The Apriori algorithm depends on item frequencies, so we first need the frequency of every single item. These frequencies will be our first support values, in table one (as mentioned in the previous section). To compute them, we need to flatten the items of all transactions into a single collection. We can do that by using the **“flatMap”** method.

In [5]:
## All items from all lines in a single array
wlitems = data.flatMap(lambda line:line.split(','))

print(wlitems.collect())

['Apple', 'Mango', 'Banana', 'Banana', 'Mango', 'Apple', 'Banana', 'Apple', 'Mango', 'Coconut', 'Strawberry', 'Grapes', 'Lemon', 'Raspberry', 'Rassberry', 'Grapes', 'Strawberry', 'Apple', 'Apple', 'Mango', 'Raspberry', 'Mango', 'Raspberry', 'Mango', 'Apple', 'Apple', 'Raspberry', 'Banana', 'Raspberry', 'Mango', 'Apple', 'Mango', 'Banana', 'Raspberry', 'Banana', 'Apple', 'Strawberry', 'Strawberry', 'Banana', 'Apple', 'Mango', 'Mango', 'Banana', 'Raspberry', 'Apple', 'Coconut', 'Apple', 'Raspberry', 'Raspberry', 'Coconut', 'Banana']
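
The difference between the two results above can be reproduced without Spark. The following is a minimal plain-Python sketch, not the PySpark implementation: a list comprehension stands in for `map`, and `itertools.chain.from_iterable` stands in for `flatMap`. The variable names and the three sample lines (copied from the dataset output) are chosen only for illustration.

```python
from itertools import chain

# Three sample lines copied from the dataset output above.
lines = ["Apple,Mango,Banana", "Banana,Mango", "Apple,Banana"]

# map-like: one output element per input line (a list of lists).
mapped = [line.split(",") for line in lines]

# flatMap-like: split each line, then flatten everything into one sequence.
flat_mapped = list(chain.from_iterable(line.split(",") for line in lines))

print(mapped)
# [['Apple', 'Mango', 'Banana'], ['Banana', 'Mango'], ['Apple', 'Banana']]
print(flat_mapped)
# ['Apple', 'Mango', 'Banana', 'Banana', 'Mango', 'Apple', 'Banana']
```

The nested form keeps transaction boundaries, which is needed later when counting item sets per transaction; the flat form is only useful for counting single items.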


As the result shows, all our transaction items are now in a single array, so we can calculate the frequency of each unique item. Remember that these frequencies will be our first support values. With a “NumPy” array, finding frequencies would be easy, but we are working with RDDs, so we need to obtain them with the “MapReduce” approach. The solution: first convert every item into a “tuple” whose second element is “1”, then sum these values per item with the **“reduceByKey”** method (similar to GROUP BY with an aggregate in SQL). Summing the second elements of the tuples gives the frequency of every unique item (how many times it occurs in customers’ transactions). We will also need the list of unique items in later sections, which we can obtain with the “distinct” method.

In [6]:
## Unique items in the dataset
uniqueItems = wlitems.distinct()
print(uniqueItems.collect())

['Apple', 'Mango', 'Banana', 'Coconut', 'Strawberry', 'Grapes', 'Lemon', 'Raspberry', 'Rassberry']


In [7]:
## Add 1 as Tuple
supportRdd = wlitems.map(lambda item:(item, 1))
print(supportRdd.collect())

[('Apple', 1), ('Mango', 1), ('Banana', 1), ('Banana', 1), ('Mango', 1), ('Apple', 1), ('Banana', 1), ('Apple', 1), ('Mango', 1), ('Coconut', 1), ('Strawberry', 1), ('Grapes', 1), ('Lemon', 1), ('Raspberry', 1), ('Rassberry', 1), ('Grapes', 1), ('Strawberry', 1), ('Apple', 1), ('Apple', 1), ('Mango', 1), ('Raspberry', 1), ('Mango', 1), ('Raspberry', 1), ('Mango', 1), ('Apple', 1), ('Apple', 1), ('Raspberry', 1), ('Banana', 1), ('Raspberry', 1), ('Mango', 1), ('Apple', 1), ('Mango', 1), ('Banana', 1), ('Raspberry', 1), ('Banana', 1), ('Apple', 1), ('Strawberry', 1), ('Strawberry', 1), ('Banana', 1), ('Apple', 1), ('Mango', 1), ('Mango', 1), ('Banana', 1), ('Raspberry', 1), ('Apple', 1), ('Coconut', 1), ('Apple', 1), ('Raspberry', 1), ('Raspberry', 1), ('Coconut', 1), ('Banana', 1)]


In [8]:
# Function passed to reduceByKey to add two counts
def sumOpertor(x, y):
    return x+y

# Sum the values for each key
supportRdd = supportRdd.reduceByKey(sumOpertor)
print(supportRdd.collect())

[('Apple', 12), ('Mango', 10), ('Banana', 9), ('Coconut', 3), ('Strawberry', 4), ('Grapes', 2), ('Lemon', 1), ('Raspberry', 9), ('Rassberry', 1)]
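
To make the counting pattern concrete, here is a single-machine sketch of what the `(item, 1)` pairs plus `reduceByKey` compute. It is only an illustration under simplifying assumptions: `reduce_by_key` is a hypothetical helper written for this example (Spark's real implementation works per partition and shuffles data), and the input is a short sample of the flattened items.

```python
from collections import Counter

# A short sample of the flattened items.
items = ["Apple", "Mango", "Banana", "Banana", "Mango", "Apple", "Banana"]

# "Map" step: pair every item with the count 1.
pairs = [(item, 1) for item in items]

def reduce_by_key(pairs, func):
    """Fold all values that share a key with func (hypothetical helper)."""
    totals = {}
    for key, value in pairs:
        totals[key] = func(totals[key], value) if key in totals else value
    return totals

# "Reduce" step: add up the 1s for each item.
counts = reduce_by_key(pairs, lambda x, y: x + y)
print(counts)  # {'Apple': 2, 'Mango': 2, 'Banana': 3}

# Sanity check against the standard library's frequency counter.
assert counts == dict(Counter(items))
```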


In [9]:
## finding first support values
supports = supportRdd.map(lambda item:item[1]) ## return only support values
print(supports.collect())

[12, 10, 9, 3, 4, 2, 1, 9, 1]


### **Min Support Value**

In order to decide which item sets stay in the support tables, we need to define a min support value. We can choose it as the minimum frequency found in the first support table. Any item set whose support is below min support is removed from its table. If our data hasn’t got many records, the minimum frequency might be 1, which would filter out nothing. In such cases, we can define min support as 2 or any value greater than 1.

In [13]:
## Define minimum support value 
minSupport = supports.min()

# If minimum support is 1 then replace it with 2 
minSupport = 2 if minSupport == 1 else minSupport

## Filter first supportRdd with minimum support 
supportRdd = supportRdd.filter(lambda item: item[1] >= minSupport )

## Create the base RDD, which will be extended in every iteration
baseRdd = supportRdd.map(lambda item: ([item[0]] , item[1])) 
print('1 . Table has been created...') 

supportRdd = supportRdd.map(lambda item: item[0])
supportRddCart = supportRdd
print(supportRddCart.collect())

1 . Table has been created...
['Apple', 'Mango', 'Banana', 'Coconut', 'Strawberry', 'Grapes', 'Raspberry']
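
The same threshold rule can be checked by hand on the counts printed by the reduceByKey step. This is a plain-Python sketch rather than the notebook's RDD code; the dictionary is copied from that output, and the names `support_counts`, `min_support`, and `frequent_items` are illustrative.

```python
# First support table, copied from the reduceByKey output above.
support_counts = {
    "Apple": 12, "Mango": 10, "Banana": 9, "Coconut": 3, "Strawberry": 4,
    "Grapes": 2, "Lemon": 1, "Raspberry": 9, "Rassberry": 1,
}

# Start from the smallest frequency, but never let the threshold stay at 1.
min_support = min(support_counts.values())
if min_support == 1:
    min_support = 2

# Keep only the items that meet the threshold.
frequent_items = [item for item, count in support_counts.items()
                  if count >= min_support]

print(min_support)     # 2
print(frequent_items)
# ['Apple', 'Mango', 'Banana', 'Coconut', 'Strawberry', 'Grapes', 'Raspberry']
```

Lemon and the misspelled Rassberry each occur once, so they are the two items that drop out.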


The Apriori algorithm filters every combination table by the min support value, and the while loop ends when no item set is left. We also need a function that handles duplicates among the combined item sets. As mentioned before, the sets (Apple, Mango) and (Mango, Apple) are the same thing for the Apriori algorithm, so we need to find such pairs and keep only one of them. The “removeReplica” function does this: it returns each combination as a sorted tuple, so that duplicated item sets become identical and only one of them survives the later “distinct” call.

In [14]:
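def removeReplica(record):
    # record is a pair: (existing item or item-set tuple, candidate item to add)
    if isinstance(record[0], tuple):
        x1 = record[0]
        x2 = record[1]
    else:
        x1 = [record[0]]
        x2 = record[1]

    if x2 not in x1:
        # Extend the set and sort it so that every ordering yields the same tuple
        a = list(x1)
        a.append(x2)
        a.sort()
        result = tuple(a)
        return result 
    else:
        # Item already present: return the set unchanged (dropped later by the length filter)
        return x1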
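
The idea behind this normalization can be tried in isolation. The sketch below is a simplified variant, not the notebook's function: `canonical` is a hypothetical helper that returns `None` instead of the unchanged set when the item is already present, and it assumes item sets are plain tuples of strings.

```python
def canonical(itemset, new_item):
    """Return itemset plus new_item as a sorted tuple, or None if already present."""
    if new_item in itemset:
        return None
    return tuple(sorted(itemset + (new_item,)))

# Both orderings collapse to the same tuple, so a set keeps only one of them.
a = canonical(("Apple",), "Mango")
b = canonical(("Mango",), "Apple")
print(a, b, a == b)  # ('Apple', 'Mango') ('Apple', 'Mango') True

# All 2-item candidates from three items, with duplicates and self-pairs removed.
items = ["Apple", "Mango", "Banana"]
candidates = {canonical((x,), y) for x in items for y in items} - {None}
print(sorted(candidates))
# [('Apple', 'Banana'), ('Apple', 'Mango'), ('Banana', 'Mango')]
```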

In [18]:
c = 2 # Combination length 

while not supportRdd.isEmpty():

    combined = supportRdd.cartesian(uniqueItems)
    combined = combined.map(lambda item: removeReplica(item))
  
    combined = combined.filter(lambda item: len(item) == c)
    combined = combined.distinct()

    
    combined_2 = combined.cartesian(lblitems)
    combined_2 = combined_2.filter(lambda item: all(x in item[1] for x in item[0]))
    
    combined_2 = combined_2.map(lambda item: item[0])
    combined_2 = combined_2.map(lambda item: (item , 1))
    combined_2 = combined_2.reduceByKey(sumOpertor)
    combined_2 = combined_2.filter(lambda item: item[1] >= minSupport)

    baseRdd = baseRdd.union(combined_2)
    
    combined_2 = combined_2.map(lambda item: item[0])
    supportRdd = combined_2
    print(c ,'. Table has been created... ')
    c = c+1 

2 . Table has been created... 
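
For readers who want to follow the loop without a Spark session, the level-wise procedure can be sketched in plain Python. This is an assumption-laden illustration, not a port of the RDD code: it counts support by brute force over Python sets, hard-codes `min_support = 2` as chosen earlier, and the names `baskets`, `support`, `level`, and `frequent` are invented for the example. The transactions are copied from the dataset output, including the misspelled 'Rassberry' entry.

```python
from itertools import chain

# Transactions copied from the dataset output earlier in the notebook.
transactions = [
    "Apple,Mango,Banana", "Banana,Mango", "Apple,Banana", "Apple,Mango,Coconut",
    "Strawberry,Grapes,Lemon,Raspberry", "Rassberry,Grapes", "Strawberry,Apple",
    "Apple,Mango,Raspberry", "Mango,Raspberry", "Mango,Apple", "Apple,Raspberry",
    "Banana,Raspberry,Mango", "Apple,Mango,Banana", "Raspberry,Banana",
    "Apple,Strawberry", "Strawberry,Banana,Apple,Mango",
    "Mango,Banana,Raspberry,Apple", "Coconut,Apple,Raspberry",
    "Raspberry,Coconut,Banana",
]
baskets = [set(line.split(",")) for line in transactions]
min_support = 2  # assumed threshold, matching the value chosen above

def support(itemset):
    """Number of baskets that contain every item of itemset."""
    return sum(1 for basket in baskets if set(itemset) <= basket)

all_items = sorted(set(chain.from_iterable(baskets)))

# Table 1: single items that meet the threshold.
level = [(item,) for item in all_items if support((item,)) >= min_support]
frequent = {}

# Each pass stores the current table, then builds the next, longer one.
while level:
    frequent.update({itemset: support(itemset) for itemset in level})
    candidates = {
        tuple(sorted(set(itemset) | {item}))
        for itemset in level
        for item in all_items
        if item not in itemset
    }
    level = [c for c in candidates if support(c) >= min_support]

print(frequent[("Apple",)])                    # 12
print(frequent[("Apple", "Mango")])            # 7
print(frequent[("Apple", "Banana", "Mango")])  # 4
```

The loop stops on its own because candidates grow by one item per pass and no basket here holds more than four items.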


### **Calculating Confidence Values**

In [19]:
class Filter():

    def __init__(self):
        
        self.stages = 1


    def filterForConf(self, item , total):
        
        if(len(item[0][0]) > len(item[1][0])  ):
            if(self.checkItemSets(item[0][0] , item[1][0]) == False):
                pass
            else:
                return (item)       
        else:
            pass  
        self.stages = self.stages + 1

    # Check whether every item of the smaller item set also appears in the larger one // Example command: # any(l == k for k in z for l in x )
    def checkItemSets(self, item_1 , item_2):

        if(len(item_1) > len(item_2)):
            return all(any(k == l for k in item_1 ) for l in item_2)
        else:
            return all(any(k == l for k in item_2 ) for l in item_1)


    def calculateConfidence(self, item):

        # Parent item list
        parent = set(item[0][0])
        
        # Child item list
        if(isinstance(item[1][0] , str)):
            child  = set([item[1][0]])
        else:
            child  = set(item[1][0])
        # Parent and Child support values
        parentSupport = item[0][1]
        childSupport = item[1][1]

        # Confidence (in percent) of the rule child => (parent - child)
        confidence = (parentSupport / childSupport)*100

        return [ list(child) ,  list(parent.difference(child)) , confidence ]

        
# Example ((('x10', 'x3', 'x6', 'x7', 'x9'), 1), (('x10', 'x3', 'x7'), 1))
calcuItems = baseRdd.cartesian(baseRdd)

# Create Filter Object
ff = Filter()

total = calcuItems.count()

print('# : Preparing aggregated support values for the confidence calculations')
baseRddConfidence = calcuItems.filter(lambda item: ff.filterForConf(item , total))
print('# : Aggregated support values are ready !')
baseRddConfidence = baseRddConfidence.map(lambda item: ff.calculateConfidence(item))

  
print(baseRddConfidence.collect())


# : Preparing aggregated support values for the confidence calculations
# : Aggregated support values are ready !
[[['Apple'], ['Mango'], 58.333333333333336], [['Mango'], ['Apple'], 70.0], [['Apple'], ['Banana'], 41.66666666666667], [['Apple'], ['Coconut'], 16.666666666666664], [['Banana'], ['Apple'], 55.55555555555556], [['Coconut'], ['Apple'], 66.66666666666666], [['Apple'], ['Strawberry'], 25.0], [['Apple'], ['Raspberry'], 33.33333333333333], [['Mango'], ['Banana'], 60.0], [['Banana'], ['Mango'], 66.66666666666666], [['Mango'], ['Raspberry'], 40.0], [['Strawberry'], ['Apple'], 75.0], [['Raspberry'], ['Apple'], 44.44444444444444], [['Raspberry'], ['Mango'], 44.44444444444444], [['Banana'], ['Raspberry'], 44.44444444444444], [['Raspberry'], ['Banana'], 44.44444444444444], [['Coconut'], ['Raspberry'], 66.66666666666666], [['Raspberry'], ['Coconut'], 22.22222222222222], [['Apple'], ['Mango', 'Banana'], 33.33333333333333], [['Mango'], ['Apple', 'Banana'], 40.0], [['Banana'], ['Mango', 

In each array, the first item set shows the products a customer bought, and the second shows the products that customer might buy as a result. The last element of the array is the confidence value for that pattern. For example, a customer who buys Apple might also buy Mango with 58.3% confidence. Another example: a customer who buys Mango and Banana might also buy Apple with 66.6% confidence. If you look carefully, the first two arrays, [“Apple”, “Mango”] and [“Mango”, “Apple”], have different confidence values. That is because confidence is directional: for [“Mango” => “Apple”] (the confidence of buying Apple after Mango), the support of the combined item set {Mango, Apple} is divided by the support of Mango alone, whereas [“Apple” => “Mango”] divides the same joint support by the support of Apple.
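
Written out with the notebook's numbers, the two directions look as follows. The single-item supports (Mango 10, Apple 12) come from the first support table; the joint count of 7 is not printed in the notebook and is inferred here from the reported 70% (it can also be checked by counting the transactions that contain both items).

$$
\mathrm{confidence}(\text{Mango} \Rightarrow \text{Apple}) = \frac{\mathrm{support}(\{\text{Mango}, \text{Apple}\})}{\mathrm{support}(\{\text{Mango}\})} = \frac{7}{10} = 70\%
$$

$$
\mathrm{confidence}(\text{Apple} \Rightarrow \text{Mango}) = \frac{\mathrm{support}(\{\text{Mango}, \text{Apple}\})}{\mathrm{support}(\{\text{Apple}\})} = \frac{7}{12} \approx 58.3\%
$$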

In [20]:
## Import pandas modules
import pandas as pd

## Create an array with collected baseRddConfidence results
result = baseRddConfidence.collect()

## Create Data Frame
confidenceTable = pd.DataFrame(data = result , columns=["Before", "After" , "Confidence"])


                 Before                After  Confidence
0               [Apple]              [Mango]   58.333333
1               [Mango]              [Apple]   70.000000
2               [Apple]             [Banana]   41.666667
3               [Apple]            [Coconut]   16.666667
4              [Banana]              [Apple]   55.555556
5             [Coconut]              [Apple]   66.666667
6               [Apple]         [Strawberry]   25.000000
7               [Apple]          [Raspberry]   33.333333
8               [Mango]             [Banana]   60.000000
9              [Banana]              [Mango]   66.666667
10              [Mango]          [Raspberry]   40.000000
11         [Strawberry]              [Apple]   75.000000
12          [Raspberry]              [Apple]   44.444444
13          [Raspberry]              [Mango]   44.444444
14             [Banana]          [Raspberry]   44.444444
15          [Raspberry]             [Banana]   44.444444
16            [Coconut]        

In [21]:
confidenceTable

                 Before                After  Confidence
0               [Apple]              [Mango]   58.333333
1               [Mango]              [Apple]   70.000000
2               [Apple]             [Banana]   41.666667
3               [Apple]            [Coconut]   16.666667
4              [Banana]              [Apple]   55.555556
5             [Coconut]              [Apple]   66.666667
6               [Apple]         [Strawberry]   25.000000
7               [Apple]          [Raspberry]   33.333333
8               [Mango]             [Banana]   60.000000
9              [Banana]              [Mango]   66.666667
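
The last step in the list at the top, picking out the high-confidence patterns, could look roughly like this. It is a sketch in plain Python rather than pandas, working on a handful of rows copied from the table above; the cutoff `min_confidence = 65.0` is a hypothetical value chosen only for the example, not one the notebook prescribes.

```python
# A few (before, after, confidence) rows copied from the table above.
rules = [
    (["Apple"], ["Mango"], 58.333333),
    (["Mango"], ["Apple"], 70.0),
    (["Apple"], ["Banana"], 41.666667),
    (["Banana"], ["Mango"], 66.666667),
    (["Strawberry"], ["Apple"], 75.0),
]

min_confidence = 65.0  # hypothetical cutoff for a "high confidence" rule

# Keep the rules at or above the cutoff, strongest first.
strong = sorted(
    (rule for rule in rules if rule[2] >= min_confidence),
    key=lambda rule: rule[2],
    reverse=True,
)

for before, after, confidence in strong:
    print(f"{', '.join(before)} -> {', '.join(after)}: {confidence:.1f}%")
# Strawberry -> Apple: 75.0%
# Mango -> Apple: 70.0%
# Banana -> Mango: 66.7%
```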
