
# **Frequent Itemsets with PySpark in Colab**

To run spark in Colab, we need to first install all the dependencies in Colab environment i.e. Apache Spark 2.3.2 with hadoop 2.7, Java 8 and Findspark to locate the spark in the system.

Follow the steps to install the dependencies:

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
!wget -qN https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz

In [3]:
!pip install -q findspark

Set the location of Java and Spark by running the following code:

In [4]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "spark-3.2.1-bin-hadoop3.2"

Install PySpark and run a local spark session to test the installation:

In [5]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 33 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 54.6 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=34c458f9f1528f93e791011ea107b2f3154e6b6523f44a69de9a030ba03ed6fd
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [6]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

Let's create a spark DataFrame to confirm that we can run PySpark, and preload that DataFrame with test baskets.

| Transaction ID | Stock Items |
|:--------------:|:---------------- |
| 100 | milk, coke, beer |
| 200 | milk, pepsi, juice |
| 300 | milk, beer |
| 400 | coke, juice |
| 500 | milk, pepsi, beer |
| 600 | milk, coke, beer, juice |
| 700 | coke, beer, juice |
| 800 | beer, coke |

Each DataFrame row is `<Transaction ID, [Stock Items]>`

In [7]:
m,c,b,p,j = 12,3,2,15,9
basket_df = spark.createDataFrame([
    (100, [m,c,b]),
    (200, [m,p,j]),
    (300, [m,b]),
    (400, [c,j]),
    (500, [m,p,b]),
    (600, [m,c,b,j]),
    (700, [c,b,j]),
    (800, [b,c])
], ["id", "items"])
basket_df.show()
stockIDs = {b: 'Beer', c: 'Coke', m: 'Milk', j: 'Juice', p: 'Pepsi'}

+---+-------------+
| id|        items|
+---+-------------+
|100|   [12, 3, 2]|
|200|  [12, 15, 9]|
|300|      [12, 2]|
|400|       [3, 9]|
|500|  [12, 15, 2]|
|600|[12, 3, 2, 9]|
|700|    [3, 2, 9]|
|800|       [2, 3]|
+---+-------------+



<hr>

# PySpark Code

## FP-Growth Algorithm

Ready to run FP-Growth? References:

* PySpark [introduction for FP-Growth](https://spark.apache.org/docs/latest/ml-frequent-pattern-mining.html#fp-growth).
* [PySpark Dataframes](https://sparkbyexamples.com/pyspark/convert-pandas-to-pyspark-dataframe/)

First, run FP-Growth example from the documentation

In [8]:
from pyspark.ml.fpm import FPGrowth

fpGrowth = FPGrowth(itemsCol="items", minSupport=0.5, minConfidence=0.6)
model = fpGrowth.fit(basket_df)

# Display generated association rules.
model.associationRules.show()



+----------+----------+------------------+------------------+-------+
|antecedent|consequent|        confidence|              lift|support|
+----------+----------+------------------+------------------+-------+
|       [3]|       [2]|               0.8|1.0666666666666667|    0.5|
|      [12]|       [2]|               0.8|1.0666666666666667|    0.5|
|       [2]|       [3]|0.6666666666666666|1.0666666666666667|    0.5|
|       [2]|      [12]|0.6666666666666666|1.0666666666666667|    0.5|
+----------+----------+------------------+------------------+-------+



## Q1. Interpreting association rules [15] (In the writeup)

The above table, has columns antecedent, consequent, confidence, lift and support. 

1. Explain the first row, `[3] [2] 0.8 1.0666666666666667 0.5` in plain English.
2. The first and the third rows have the antecedent and consequent switched, but different confidence values. (Same with second and fourth rows). How do you explain those results?
3. What does support = 0.5 for all the rows mean?

#### Association Rules with changed minSupport and minConfidence values

Modify the support threshold to be 0.375 and minimum confidence to be 0.75 to make the parameters consistent with the settings in the textbook.

In [9]:
from pyspark.ml.fpm import FPGrowth

fpGrowth = FPGrowth(itemsCol="items", minSupport=0.375, minConfidence=0.75)
model = fpGrowth.fit(basket_df)

# Display frequent itemsets.
model.freqItemsets.show()

# Display generated association rules.
model.associationRules.show()



+-------+----+
|  items|freq|
+-------+----+
|    [3]|   5|
| [3, 2]|   4|
|    [2]|   6|
|   [12]|   5|
|[12, 2]|   4|
|    [9]|   4|
| [9, 3]|   3|
+-------+----+

+----------+----------+----------+------------------+-------+
|antecedent|consequent|confidence|              lift|support|
+----------+----------+----------+------------------+-------+
|       [3]|       [2]|       0.8|1.0666666666666667|    0.5|
|      [12]|       [2]|       0.8|1.0666666666666667|    0.5|
|       [9]|       [3]|      0.75|               1.2|  0.375|
+----------+----------+----------+------------------+-------+



## Q2. Interpreting the new association rules [10] (In the writeup)

The third row of the result shows a low support (0.375) and a high lift (1.2). What does this line tell us?


## Q3. Association Rules for an Online Retail Dataset [5]

The main part of this exercise involves processing a sampled dataset from a UK-based online retailer. We'll be working with a 8050 record subset.

* Read in the data from the dataset `online_retail_III.csv`. For your convenience, I have already thrown away bad records using `dropna()`.
* There are a couple of wrinkles to keep in mind in case you are curious, though you may not really need them.
    * An invoice represents a shopping cart and it can contain multiple items. 
    * Some invoice numbers start with a "C." Invoice number C123456 is to be interpreted as a return of items in invoice 123456. The `inum` column represents the Invoice number as well as the credit (return). In other words, Invoice numbers `123456` and `C123456` would have `inum` == 123456.

In [10]:
import pandas as pd
df_orig = pd.read_csv('https://storage.googleapis.com/119-quiz7-files/online_retail_II.csv')
df_orig.dropna(inplace=True)
df_orig

Unnamed: 0,Invoice,StockCode,Description,Quantity,InvoiceDate,Price,Customer ID,Country
0,489434,85048,15CM CHRISTMAS GLASS BALL 20 LIGHTS,12,2009-12-01 07:45:00,6.95,13085.0,United Kingdom
1,489434,79323P,PINK CHERRY LIGHTS,12,2009-12-01 07:45:00,6.75,13085.0,United Kingdom
2,489434,79323W,WHITE CHERRY LIGHTS,12,2009-12-01 07:45:00,6.75,13085.0,United Kingdom
3,489434,22041,"RECORD FRAME 7"" SINGLE SIZE",48,2009-12-01 07:45:00,2.10,13085.0,United Kingdom
4,489434,21232,STRAWBERRY CERAMIC TRINKET BOX,24,2009-12-01 07:45:00,1.25,13085.0,United Kingdom
...,...,...,...,...,...,...,...,...
1067366,581587,22899,CHILDREN'S APRON DOLLY GIRL,6,2011-12-09 12:50:00,2.10,12680.0,France
1067367,581587,23254,CHILDRENS CUTLERY DOLLY GIRL,4,2011-12-09 12:50:00,4.15,12680.0,France
1067368,581587,23255,CHILDRENS CUTLERY CIRCUS PARADE,4,2011-12-09 12:50:00,4.15,12680.0,France
1067369,581587,22138,BAKING SET 9 PIECE RETROSPOT,3,2011-12-09 12:50:00,4.95,12680.0,France


#### Data Scrubbing

Remove the rows we should filter away. They aren't necessarily visible in the summary view but we know they exist.

* StockCode `POST`,
* StockCode `M`.


In [11]:
# filter out rows with POST and M
df_orig = df_orig[(df_orig.StockCode != 'POST') & (df_orig.StockCode != 'M')]

In [12]:
# mkae inum column (remove character 'C' in Invoice)
df_orig.loc[:,'inum'] = [x if x[0] != 'C' else x[1:] for x in df_orig.Invoice]

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  self.obj[key] = value


In [13]:
# make into type int
df_orig['inum'] = df_orig['inum'].astype('int')

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  


In [14]:
# get all unique stockcode to make a mapping of integers to code
stockcode = pd.unique(df_orig.StockCode)
stockcode

array(['85048', '79323P', '79323W', ..., '23562', '23561', '23843'],
      dtype=object)

In [15]:
# make the unique integer to stock codes map
code_map = {}
for index, code in enumerate(stockcode):
  code_map[code] = index

## Q4. Connecting Online Retail Data to FP-Growth [30]

Adapt the DataFrame to look like `df_basket` above. 
* `df_orig` is a Pandas DataFrame whereas `df_basket`-equivalent will have to be Spark DataFrames.
* `Invoice` and `StockCode` are strings but FP-Growth needs inputs to be integers. You'd need to map strings to integers before feeding them to FP-Growth and convert the resulting antecedents and consequents back.

In [16]:
from collections import defaultdict
df_dict = defaultdict(set)
for _, row in df_orig.iterrows():
  inv = row['inum']
  code_num = code_map[row['StockCode']]
  df_dict[inv].add(code_num)

In [17]:
df_list = []
for inv in df_dict:
  df_list.append((inv, list(df_dict[inv])))

In [18]:
df_basket = spark.createDataFrame(df_list, ["invoice", "stock"])
df_basket.show()

+-------+--------------------+
|invoice|               stock|
+-------+--------------------+
| 489434|[0, 1, 2, 3, 4, 5...|
| 489435|      [8, 9, 10, 11]|
| 489436|[12, 13, 14, 15, ...|
| 489437|[30, 31, 32, 33, ...|
| 489438|[64, 65, 66, 67, ...|
| 489439|[33, 5, 70, 71, 7...|
| 489440|              [8, 9]|
| 489441|    [87, 30, 86, 71]|
| 489442|[18, 30, 48, 49, ...|
| 489443|[3, 107, 108, 109...|
| 489445|[128, 33, 71, 86,...|
| 489446|[129, 130, 131, 1...|
| 489448|[152, 149, 150, 151]|
| 489449|[6, 136, 46, 153,...|
| 489450|[6, 136, 46, 153,...|
| 489459|[160, 161, 162, 1...|
| 489460|[1, 2, 71, 72, 17...|
| 489461|[131, 132, 4, 24,...|
| 489462|[160, 161, 162, 1...|
| 489465|[129, 131, 137, 1...|
+-------+--------------------+
only showing top 20 rows



### Establish the mapping between Invoice IDs, StockCodes and unique integers.

## Q5. Fine-tuning FP-Growth runs [20]

* Set `minConfidence` = 0.75.
* Set `minSupport` such that the total number of association rules is between 10 and 20. (If `minSupport` is small, the number of association rules will increase. As it increases, the number of association rules will decrease.).



In [19]:
from pyspark.ml.fpm import FPGrowth

fpGrowth = FPGrowth(itemsCol="stock", minSupport=0.01, minConfidence=0.75)
model = fpGrowth.fit(df_basket)

In [20]:
# Display generated association rules.
model.associationRules.show()



+------------+----------+------------------+------------------+--------------------+
|  antecedent|consequent|        confidence|              lift|             support|
+------------+----------+------------------+------------------+--------------------+
| [855, 3012]|     [140]|0.7538940809968847| 9.959836101473948|0.010958904109589041|
|[3728, 3729]|    [3727]|0.8440366972477065|32.959222576432325|0.012498584852258576|
|      [3839]|    [3843]|0.7641357027463651| 60.37218839319001|0.010709838107098382|
|[3729, 3310]|    [3727]|0.8330241187384044|32.529186741009404|0.010166421374391487|
|[3728, 3727]|    [3729]|            0.8832| 38.85112350597609|0.012498584852258576|
|      [3728]|    [3729]|0.8154613466334164|  35.8713649144072|0.014808105966262877|
|      [3728]|    [3727]|0.7793017456359103|30.431354196295295|0.014151477414242046|
|      [3843]|    [3839]|0.8461538461538461| 60.37218839319001|0.010709838107098382|
|       [376]|     [387]|            0.7872| 49.88047058823529| 0

## Q6. Final Association Rules [20]

Present the resulting Association Rules in terms of the original StockCodes and Descriptions, in descending order of lift.

In [21]:
final_rules = model.associationRules.select('antecedent', 'consequent','lift').toPandas()



In [22]:
final_rules.columns = ['Invoice', 'StockCodes', 'Lift']

In [23]:
inv_map = {v: k for k, v in code_map.items()}

In [24]:
new_list = []
for index, c in enumerate(final_rules.StockCodes):
  map_back = inv_map[c[0]]
  new_list.append(map_back)
new_list

['85099B',
 '22699',
 '22745',
 '22699',
 '22697',
 '22697',
 '22699',
 '22748',
 '21094',
 '21122',
 '82580',
 '22699']

In [25]:
final_rules['StockCodes'] = new_list
final_rules

Unnamed: 0,Invoice,StockCodes,Lift
0,"[855, 3012]",85099B,9.959836
1,"[3728, 3729]",22699,32.959223
2,[3839],22745,60.372188
3,"[3729, 3310]",22699,32.529187
4,"[3728, 3727]",22697,38.851124
5,[3728],22697,35.871365
6,[3728],22699,30.431354
7,[3843],22748,60.372188
8,[376],21094,49.880471
9,[441],21122,47.691399


In [26]:
final_rules.sort_values('Lift', ascending=False, inplace=True)

In [27]:
description_df = df_orig[['StockCode', 'Description']].drop_duplicates()

In [39]:
with_desc = final_rules.merge(description_df, how = 'left', left_on = 'StockCodes', right_on = 'StockCode')
with_desc.drop("StockCode", axis=1, inplace=True)

In [40]:
with_desc.drop_duplicates(subset=["StockCodes", "Lift"], keep='first', inplace=True)
with_desc

Unnamed: 0,Invoice,StockCodes,Lift,Description
0,[3839],22745,60.372188,POPPY'S PLAYHOUSE BEDROOM
1,[3843],22748,60.372188,POPPY'S PLAYHOUSE KITCHEN
2,[376],21094,49.880471,SET/6 RED SPOTTY PAPER PLATES
3,[441],21122,47.691399,SET/10 PINK SPOTTY PARTY CANDLES
5,"[3728, 3727]",22697,38.851124,GREEN REGENCY TEACUP AND SAUCER
7,[3728],22697,35.871365,GREEN REGENCY TEACUP AND SAUCER
9,[1013],82580,35.634767,BATHROOM METAL SIGN
10,"[3728, 3729]",22699,32.959223,ROSES REGENCY TEACUP AND SAUCER
12,"[3729, 3310]",22699,32.529187,ROSES REGENCY TEACUP AND SAUCER
14,[3728],22699,30.431354,ROSES REGENCY TEACUP AND SAUCER
