<a href="https://colab.research.google.com/github/jewbe22/eecs4415_Big_data/blob/main/project_1/Task_2_Pattern_Discovery_in_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# EECS 4415 - Task 2
## Pattern Discovery in Spark

### Setup

Let's set up Spark on your Colab environment.  Run the cell below!

In [1]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

openjdk-8-jdk-headless is already the newest version (8u422-b05-1~22.04).
0 upgraded, 0 newly installed, 0 to remove and 49 not upgraded.


In [2]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Now we authenticate a Google Drive client to download the file we will be processing in our Spark job.

**Make sure to follow the interactive instructions.**

In [3]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate and create the PyDrive client
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)



In [4]:
id='1dhi1F78ssqR8gE6U-AgB80ZW7V_9snX4'
downloaded = drive.CreateFile({'id': id})
downloaded.GetContentFile('products.csv')

id='1KZBNEaIyMTcsRV817us6uLZgm-Mii8oU'
downloaded = drive.CreateFile({'id': id})
downloaded.GetContentFile('order_products__train.csv')

If you executed the cells above, you should be able to see the dataset we will need for this Colab under the "Files" tab on the left panel.

In [5]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

Let's initialize the Spark context.

In [6]:
# create the session
conf = SparkConf().set("spark.ui.port", "4050")

# create the context
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

### Your task

If you run successfully the setup stage, you are ready to work with the **3 Million Instacart Orders** dataset. In case you want to read more about it, check the [official Instacart blog post](https://tech.instacart.com/3-million-instacart-orders-open-sourced-d40d29ead6f2) about it, a concise [schema description](https://gist.github.com/jeremystan/c3b39d947d9b88b3ccff3147dbcf6c6b) of the dataset, and the [download page](https://www.instacart.com/datasets/grocery-shopping-2017).

In this Colab, we will be working with a subset training dataset (~131K orders) to perform fast and scalable Frequent Pattern Mining.

In [7]:
products = spark.read.csv('products.csv', header=True, inferSchema=True)
orders = spark.read.csv('order_products__train.csv', header=True, inferSchema=True)

In [8]:
products.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- aisle_id: string (nullable = true)
 |-- department_id: string (nullable = true)



In [9]:
orders.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- add_to_cart_order: integer (nullable = true)
 |-- reordered: integer (nullable = true)



Use the Spark Dataframe API to join 'products' and 'orders', so that you will be able to see the product names in each transaction (and not only their ids).  Then, group by the orders by 'order_id' to obtain one row per basket (i.e., set of products purchased together by one customer). Display the top 20 rows.

In [10]:
''' 2-3 lines of code expected '''
# YOUR CODE HERE
product_order = products.join(orders, products.product_id == orders.product_id).groupBy('order_id').agg(collect_set('product_name').alias('basket'))
product_order.show(20)

+--------+--------------------+
|order_id|              basket|
+--------+--------------------+
|       1|[Bag of Organic B...|
|      96|[Roasted Turkey, ...|
|     112|[Umcka Elderberry...|
|     218|[Okra, Black Plum...|
|     456|[Petite Peas, Lar...|
|     473|[Organic Whole Mi...|
|     631|[Organic Cilantro...|
|     762|[Organic Cucumber...|
|     774|[Nacho Cheese Sau...|
|     844|[Organic Red Radi...|
|     904|[Zero Calorie Col...|
|     988|[Whipped Light Cr...|
|    1032|[Organic Living B...|
|    1077|[Sparkling Water,...|
|    1119|[Shallot, Large L...|
|    1139|[Cinnamon Rolls w...|
|    1143|[Water, Natural P...|
|    1145|[Mexican Casserol...|
|    1275|[Small Hass Avoca...|
|    1280|[Vanilla Soy Milk...|
+--------+--------------------+
only showing top 20 rows



In this Colab we will first explore [MLlib](https://spark.apache.org/mllib/), Apache Spark's scalable machine learning library. Specifically, you should use its implementation of the [FP-Growth](https://spark.apache.org/docs/latest/ml-frequent-pattern-mining.html#fp-growth) algorithm to perform fast Frequent Pattern Mining in Spark.
Use the Python example in the documentation, and train a model with

```minSupport=0.01``` and ```minConfidence=0.5```

In [11]:
''' 3 lines of code expected '''
# YOUR CODE HERE
from pyspark.ml.fpm import FPGrowth
fpGrowth = FPGrowth(itemsCol="basket", minSupport=0.01, minConfidence=0.5)
model = fpGrowth.fit(product_order)

Compute and print how many frequent itemsets and association rules were generated by running FP-growth alongside visalizing top frequent itemsets and association rules. Show top 20 rows.


In [12]:
''' 5 lines of code in total expected but can differ based on your style. for sub-parts of the question, creating different cells of code would be recommended.'''
# YOUR CODE HERE
df_freq = model.freqItemsets
df_freq.show(20, truncate=False)


+----------------------------------------------+-----+
|items                                         |freq |
+----------------------------------------------+-----+
|[Green Onions]                                |1445 |
|[Red Raspberries]                             |1493 |
|[Organic Banana]                              |2332 |
|[Jalapeno Peppers]                            |1899 |
|[Organic Large Extra Fancy Fuji Apple]        |2891 |
|[Organic Whole String Cheese]                 |1993 |
|[Organic Peeled Whole Baby Carrots]           |2460 |
|[Limes]                                       |6033 |
|[Limes, Large Lemon]                          |1595 |
|[Limes, Banana]                               |1331 |
|[Raspberries]                                 |3279 |
|[Hass Avocado]                                |1633 |
|[Organic Broccoli Florets]                    |1361 |
|[Uncured Genoa Salami]                        |1788 |
|[Spring Water]                                |2225 |
|[Michigan

In [13]:
df_assoc = model.associationRules
df_assoc.show(20)

+----------+----------+----------+----+-------+
|antecedent|consequent|confidence|lift|support|
+----------+----------+----------+----+-------+
+----------+----------+----------+----+-------+



Now retrain the FP-growth model changing only
```minsupport=0.001```.
Compute and print how many frequent itemsets and association rules were generated. Show top 20 rows for both.


In [14]:
''' 6 lines of code in total expected but can differ based on your style. for sub-parts of the question, creating different cells of code would be recommended.'''
# YOUR CODE HERE
fpGrowth_aux = FPGrowth(itemsCol="basket", minSupport=0.001, minConfidence=0.5)
model_aux = fpGrowth_aux.fit(product_order)
df_freq = model_aux.freqItemsets
df_freq.show(20, truncate=False)

+-------------------------------------------------------+----+
|items                                                  |freq|
+-------------------------------------------------------+----+
|[White Cheddar Popcorn]                                |370 |
|[Organic YoKids Very Berry Smoothies]                  |259 |
|[Total 0% Nonfat Greek Yogurt]                         |993 |
|[Total 0% Nonfat Greek Yogurt, Organic Avocado]        |137 |
|[Total 0% Nonfat Greek Yogurt, Bag of Organic Bananas] |157 |
|[Total 0% Nonfat Greek Yogurt, Organic Baby Spinach]   |143 |
|[Total 0% Nonfat Greek Yogurt, Banana]                 |258 |
|[Organic Large Green Asparagus]                        |730 |
|[Organic Large Green Asparagus, Organic Strawberries]  |185 |
|[Organic Large Green Asparagus, Bag of Organic Bananas]|263 |
|[Organic Large Green Asparagus, Organic Baby Spinach]  |136 |
|[Organic Large Green Asparagus, Organic Hass Avocado]  |164 |
|[Organic Cream Cheese Bar]                            

In [15]:
df_assoc = model_aux.associationRules
df_assoc.show(20, truncate=False)

+-----------------------------------------------------------------+------------------------+------------------+------------------+---------------------+
|antecedent                                                       |consequent              |confidence        |lift              |support              |
+-----------------------------------------------------------------+------------------------+------------------+------------------+---------------------+
|[Organic Whole String Cheese, Organic Hass Avocado]              |[Bag of Organic Bananas]|0.5314685314685315|4.504745125675359 |0.0011584571180330617|
|[Organic Broccoli, Organic Hass Avocado]                         |[Bag of Organic Bananas]|0.5048231511254019|4.278897986822536 |0.001196564260073623 |
|[Organic Navel Orange, Organic Raspberries]                      |[Bag of Organic Bananas]|0.5412186379928315|4.587387356098284 |0.0011508356896249496|
|[Organic Kiwi, Organic Hass Avocado]                             |[Bag of Organic

We ask you to inspect the resulting dataframes, and report a few results. Sort frequent items in descending order by frequency and display 50 rows.

In [16]:
# YOUR CODE HERE
df_freq.sort(desc("freq")).show(50, truncate=False)

+----------------------------------------------+-----+
|items                                         |freq |
+----------------------------------------------+-----+
|[Banana]                                      |18726|
|[Bag of Organic Bananas]                      |15480|
|[Organic Strawberries]                        |10894|
|[Organic Baby Spinach]                        |9784 |
|[Large Lemon]                                 |8135 |
|[Organic Avocado]                             |7409 |
|[Organic Hass Avocado]                        |7293 |
|[Strawberries]                                |6494 |
|[Limes]                                       |6033 |
|[Organic Raspberries]                         |5546 |
|[Organic Blueberries]                         |4966 |
|[Organic Whole Milk]                          |4908 |
|[Organic Cucumber]                            |4613 |
|[Organic Zucchini]                            |4589 |
|[Organic Yellow Onion]                        |4290 |
|[Organic 

Also, sort assocation rules in descending order by confidence and display 20 rows.

In [17]:
# YOUR CODE HERE
df_assoc.sort(desc("confidence")).show(20, truncate=False)

+-----------------------------------------------------------------+------------------------+------------------+------------------+---------------------+
|antecedent                                                       |consequent              |confidence        |lift              |support              |
+-----------------------------------------------------------------+------------------------+------------------+------------------+---------------------+
|[Organic Raspberries, Organic Hass Avocado, Organic Strawberries]|[Bag of Organic Bananas]|0.5984251968503937|5.072272070642333 |0.0017376856770495927|
|[Organic Cucumber, Organic Hass Avocado, Organic Strawberries]   |[Bag of Organic Bananas]|0.546875          |4.635330870478036 |0.0010669999771357147|
|[Organic Kiwi, Organic Hass Avocado]                             |[Bag of Organic Bananas]|0.5459770114942529|4.627719489738336 |0.001448071397541327 |
|[Organic Navel Orange, Organic Raspberries]                      |[Bag of Organic

What are your observations by varying support parameters? Answer in 2 sentences.

As minSupport decreases, the number of association rules found tends to increases. minSupport is a threshold for association rules to identified as "frequent enough".

Your next task is to develop the **SON Map Reduce algorithm** to compute frequent itemsets by dividing the data into 10 chunks. Assume the *global* support to be:
```minsupport=0.001```.

Hint:
1. To simulate the SON Map Reduce algorithm for frequent itemset mining in Apache Spark, even if the input file is located on a single node, you can divide the file into smaller chunks (partitions) and process each partition as though it were distributed across multiple nodes. Spark will treat these partitions as separate subsets of the data, effectively mimicking how the SON algorithm would work in a true distributed environment.
2. The repartition function redistributes the data into the desired number of partitions (use 10 in this case). Example:

    ```data_partitioned = data.repartition(10)```

The .repartition(10) method repartitions the DataFrame into 10 partitions, but it does not change the underlying structure of data. It remains a Spark DataFrame, just with a different partitioning scheme.

3. Apply FP-Growth to each partition without converting to RDD to generate candidates. Get the DataFrame for each partition by using e.g., the filter function(). If you want to apply FP-Growth to each partition while keeping everything within the DataFrame API, you must avoid using RDD-based operations like .rdd.mapPartitions().



Perform map and reduce operations using Spark transformations and actions.

 In Phase 1, find candidate itemsets.  

In [33]:
# YOUR CODE HERE (map and reduce operations)
from pyspark.ml.fpm import FPGrowth
NUM_PARTITIONS = 10
GLOBAL_MIN_SUPPORT = 0.001
LOCAL_MIN_SUPPORT = GLOBAL_MIN_SUPPORT/NUM_PARTITIONS

data_partitioned = product_order.repartition(NUM_PARTITIONS, "order_id")
data_partitioned = data_partitioned.withColumn("pid", spark_partition_id())
partitions = []
candidates = []
for pid in range(0,data_partitioned.rdd.getNumPartitions() - 1):
  df = data_partitioned.filter(data_partitioned.pid == pid)
  df = df.drop("pid")
  partitions.append(df)

fpGrowth_son = FPGrowth(itemsCol="basket", minSupport=LOCAL_MIN_SUPPORT, minConfidence=0.5)

for partition in partitions:
  model = fpGrowth_son.fit(partition)
  candidates.append(model.freqItemsets)


Can Phase 1 cause false negatives? Provide an answer including justification in one sentence.

An itemset that belongs to none of the candidates must have frequency less than GLOBAL_MIN_SUPPORT, which means it cannot be false negative at any occasion.

In Phase 2, find true frequent itemsets.

In [45]:
# YOUR CODE HERE (map and reduce operations)
result = candidates[0]
for i in range(1, len(candidates)):
  result = result.union(candidates[i])

result = result.groupBy("items").agg(sum("freq").alias("freq"))
result = result.filter(result.freq >= GLOBAL_MIN_SUPPORT*product_order.count())

What is the benefit of Phase 2? Provide an answer in one sentence.

It removes false positive cases.

:Compute and print how many frequent itemsets were generated.

In [48]:
# YOUR CODE HERE
result.count()

3772

Sort frequent items in descending order by frequency and display 50 rows.

In [49]:
# YOUR CODE HERE
result.sort(desc("freq")).show(50, truncate=False)

+----------------------------------------------+-----+
|items                                         |freq |
+----------------------------------------------+-----+
|[Banana]                                      |16874|
|[Bag of Organic Bananas]                      |13923|
|[Organic Strawberries]                        |9796 |
|[Organic Baby Spinach]                        |8819 |
|[Large Lemon]                                 |7352 |
|[Organic Avocado]                             |6675 |
|[Organic Hass Avocado]                        |6549 |
|[Strawberries]                                |5869 |
|[Limes]                                       |5410 |
|[Organic Raspberries]                         |5038 |
|[Organic Blueberries]                         |4469 |
|[Organic Whole Milk]                          |4433 |
|[Organic Cucumber]                            |4142 |
|[Organic Zucchini]                            |4141 |
|[Organic Yellow Onion]                        |3873 |
|[Organic 

Write a paragraph of conclusions below summarizing your insights.

It seems that the SON mapReduce yields the same result as the regular frequent pattern mining. However, due to the nature of SON mapReduce stating that each partition can be distributed to multiple nodes to be processed, the efficiency of SON mapReduce increases if the size of dataset gets bigger.


Once you obtained the desired results, **head over to eClass and submit your solution for this Colab**!