# Association Analysis using the Apriori Algorithm and PySpark

First I will be doing data manipulation on the basket data csv file

In [2]:
import pandas as pd
import numpy as np

In [3]:
Data=pd.read_csv("Basket_Data.csv")

In [4]:
Data.head()

Unnamed: 0,Transaction,Item
0,1,Bread
1,2,Scandinavian
2,2,Scandinavian
3,3,Hot chocolate
4,3,Jam


In [None]:
###add and underscore to cells with 2+ word items to make association analysis easier

In [5]:
Data=Data.replace(' ', '_', regex=True)

The next goal is to format the data where each transaction is a string of items 
More in a list form grouped by transaction

In [6]:
# Convert 'value' column to string.
Data['Item'] = Data['Item'].astype(str)

# Perform a groupby and apply a string join.
Data_Transformed = Data.groupby('Transaction')['Item'].apply(' '.join).reset_index()

In [7]:
###looking at the new data to make sure it worked
Data_Transformed

Unnamed: 0,Transaction,Item
0,1,Bread
1,2,Scandinavian Scandinavian
2,3,Hot_chocolate Jam Cookies
3,4,Muffin
4,5,Coffee Pastry Bread
5,6,Medialuna Pastry Muffin
6,7,Medialuna Pastry Coffee Tea
7,8,Pastry Bread
8,9,Bread Muffin
9,10,Scandinavian Medialuna


In [8]:
df_item = Data_Transformed[['Item']]

In [9]:
df_item

Unnamed: 0,Item
0,Bread
1,Scandinavian Scandinavian
2,Hot_chocolate Jam Cookies
3,Muffin
4,Coffee Pastry Bread
5,Medialuna Pastry Muffin
6,Medialuna Pastry Coffee Tea
7,Pastry Bread
8,Bread Muffin
9,Scandinavian Medialuna


In [18]:
tfile = open('item.txt', 'a')
tfile.write(df_item.to_string())
tfile.close()

Initializing spark and using the apriori algorithm

In [20]:
import itertools
import pyspark

In [22]:
sc.stop()

In [23]:
sc=pyspark.SparkContext("local","Apriori")

In [24]:
text_input = sc.textFile('item.txt')

In [25]:
text_input

item.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [26]:
from operator import add
no_of_baskets = text_input.map(lambda line: line.split()).count()
no_of_baskets

9532

In [27]:
txt_RDD = text_input.map(lambda line:line.split())

In [28]:
individual_counts = text_input.flatMap(lambda line: line.split()).map(lambda x: (x,1)).reduceByKey(add)
##individual_counts.take(10)

In [29]:
##filtering to only include items that appear more than 100 times
freq_items_counts = individual_counts.filter(lambda x: x[1]>=100).sortBy(lambda x: x[0])
freq_items_counts.take(10)

[('Alfajores', 348),
 ('Baguette', 151),
 ('Bread', 3304),
 ('Brownie', 370),
 ('Cake', 1011),
 ('Chicken_Stew', 117),
 ('Coffee', 5447),
 ('Coke', 181),
 ('Cookies', 515),
 ('Farm_House', 370)]

In [30]:
freq_items_counts_list = individual_counts.filter(lambda x: x[1]>=100).sortBy(lambda x: x[0]).collect()

In [31]:
freq_items = freq_items_counts.map(lambda x: x[0])
freq_items.take(10)

['Alfajores',
 'Baguette',
 'Bread',
 'Brownie',
 'Cake',
 'Chicken_Stew',
 'Coffee',
 'Coke',
 'Cookies',
 'Farm_House']

In [33]:
freq_items_list = freq_items_counts.map(lambda x: x[0]).collect()

In [34]:
freq_items_list[:10]

['Alfajores',
 'Baguette',
 'Bread',
 'Brownie',
 'Cake',
 'Chicken_Stew',
 'Coffee',
 'Coke',
 'Cookies',
 'Farm_House']

In [35]:
no_of_freq_items=freq_items.count()
no_of_freq_items

29

In [36]:
def filter_and_pair(lst):
    ret=[]
    pairs=[]
    for i in lst:
        if i in freq_items_list:
            ret.append(i)
    ret.sort()
    for pair in itertools.combinations(ret,2):
        pairs.append((pair,1))
    return(pairs)

In [37]:
def confidence(elem):
    lst=[]
    conf1 = elem[1]/(freq_pair_counts.filter(lambda x: x[0]==elem[0][0]).collect())[0][1]
    conf2 = elem[1]/(freq_items_counts.filter(lambda x: x[0]==elem[0][1]).collect())[0][1]
    lst.append(((elem[0],elem[1]),conf1))
    lst.append(((elem[1],elem[0]),conf2))
    return(lst)
    

In [38]:
pair_counts = txt_RDD.flatMap(lambda x: filter_and_pair(x)).reduceByKey(add).sortBy( lambda x: x[0])
pair_counts.take(10)

[(('Alfajores', 'Alfajores'), 22),
 (('Alfajores', 'Baguette'), 4),
 (('Alfajores', 'Bread'), 105),
 (('Alfajores', 'Brownie'), 24),
 (('Alfajores', 'Cake'), 37),
 (('Alfajores', 'Coffee'), 243),
 (('Alfajores', 'Coke'), 4),
 (('Alfajores', 'Cookies'), 19),
 (('Alfajores', 'Farm_House'), 4),
 (('Alfajores', 'Fudge'), 5)]

In [39]:
freq_pair_counts = pair_counts.filter(lambda x: x[1]>=100)
freq_pair_counts.take(10)
freq_pair_counts_list = freq_pair_counts.collect()
freq_pair_counts_list[:10]


[(('Alfajores', 'Bread'), 105),
 (('Alfajores', 'Coffee'), 243),
 (('Bread', 'Bread'), 234),
 (('Bread', 'Brownie'), 110),
 (('Bread', 'Cake'), 241),
 (('Bread', 'Coffee'), 1053),
 (('Bread', 'Cookies'), 145),
 (('Bread', 'Hot_chocolate'), 139),
 (('Bread', 'Medialuna'), 167),
 (('Bread', 'NONE'), 213)]

In [40]:
freq_pairs = freq_pair_counts.map(lambda x: x[0])
freq_pairs.take(10)

[('Alfajores', 'Bread'),
 ('Alfajores', 'Coffee'),
 ('Bread', 'Bread'),
 ('Bread', 'Brownie'),
 ('Bread', 'Cake'),
 ('Bread', 'Coffee'),
 ('Bread', 'Cookies'),
 ('Bread', 'Hot_chocolate'),
 ('Bread', 'Medialuna'),
 ('Bread', 'NONE')]

In [41]:
conf_lst=[]
for i in freq_pair_counts_list:
    conf1 = i[1]/(freq_items_counts.filter(lambda x: x[0]==i[0][0]).collect())[0][1]
    conf2 = i[1]/(freq_items_counts.filter(lambda x: x[0]==i[0][1]).collect())[0][1]
    conf_lst.append(((i[0][0],i[0][1]),conf1))
    conf_lst.append(((i[0][1],i[0][0]),conf2))

In [42]:
conf_lst.sort(key=lambda x: -x[1])
conf_lst[:5]

[(('Toast', 'Coffee'), 0.9140127388535032),
 (('Pastry', 'Coffee'), 0.706021251475797),
 (('Alfajores', 'Coffee'), 0.6982758620689655),
 (('Spanish_Brunch', 'Coffee'), 0.6927710843373494),
 (('Cake', 'Coffee'), 0.6923837784371909)]