# INF6953I - Special Topics: Data Mining
## TP3 - Summer 2018
### Team Members
- Patrice Béchard
- Soufiane Lamghari


## Market Basket Analysis
Market Basket Analysis (MBA) is a well-known data mining technique for uncovering associations between products or groups of products. MBA aims to discover interesting patterns in large collections of data, for example millions of supermarket transactions, online orders or credit card histories. In other words, MBA allows retailers to identify relationships between the items that people buy, i.e., to reveal patterns of items that are often purchased together.

A widely used approach to explore these patterns is to construct $\textit{association rules}$ such as

<center> **if** bought *product1* **then** will buy *product2* with **confidence** *x*. </center>

Marketers may then use these association rules to place correlated products close to each other on store shelves, or to make online suggestions so that customers buy more items. However, mining association rules in large datasets is computationally intensive, which makes it almost impractical without a distributed system.

Hence, your goal in this TP is to create a $\textbf{MapReduce}$ solution that identifies patterns and creates association rules for a large dataset with more than three million transactions. The algorithm will run on a distributed cloud computing cluster. Finally, by analyzing your results, you should be able to help marketers answer questions such as:

- What items are often bought together?
- Given a basket, what items should be suggested?
- How should items be placed together on the shelves?

### Methodology

This TP is divided into two steps:

1. The first step is the implementation, where you will code a MapReduce algorithm for the MBA association rules problem. For this step, a small toy dataset is provided to test the developed code.

2. The second step is to run the algorithm created in step 1 on a dataset with more than three million supermarket transactions. In this step, you will use a cloud computing grid to run your experiments.


For the implementation step, we will follow the Market Basket Analysis algorithm presented by Jongwook Woo and Yuhang Xu (2012). The figure below presents the algorithm workflow for this TP. The blue boxes are the steps where you must implement a map or reduce function, and the grey boxes represent their expected output. **All these operations are explained in detail in the following sections.**

![scale=0.5](workflow.svg "Algorithm Workflow")

### Setting up Spark

To implement this MapReduce solution you will use **Apache Spark**, a fast and general-purpose cluster computing system. In a nutshell, [Spark](http://spark.apache.org) is an open-source framework designed with a *scale-out* approach, which makes it a powerful tool for programmers and application developers to perform large volumes of computation and data processing in distributed environments. Spark provides high-level APIs that make it easy to build parallel applications. Moreover, Spark achieves high performance through its DAG (job/stage) scheduler, so you do not need to worry about how your code and data are parallelized and distributed: Spark does it for you.

Your first task is to get Spark up and running. 
1. First, go to http://spark.apache.org/downloads
2. Select the newest Spark release (2.3.0) and the package type pre-built for Hadoop 2.7.
3. Download *spark-2.3.0-bin-hadoop2.7.tgz* and extract it into any folder of your choice.

4. Next, add the following two lines to your **~/.bashrc** file:
  - export SPARK_HOME=/path/to/spark-2.3.0-bin-hadoop2.7
  - export PYTHONPATH=\$SPARK_HOME/python:\$SPARK_HOME/python/lib/py4j-0.10.6-src.zip:\$SPARK_HOME/python/lib/pyspark.zip

5. Run the command *source ~/.bashrc*, reopen this Jupyter notebook and execute the next cell, which loads *pyspark* (the Spark Python API).

In [1]:
from pyspark import SparkContext

# Initialize the spark context.
sc = SparkContext(appName='tp3teamNumber')

# Close the spark context
# sc.stop()

###### Word count Example 

It is part of this TP to study the [Spark python API](https://spark.apache.org/docs/latest/api/python/) and learn how to use it. For that, you will work with the [RDD API](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD), a Spark abstraction that is well suited to the MapReduce framework. An RDD (Resilient Distributed Dataset) is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. In other words, an RDD is how Spark keeps your data ready for a function (e.g., a map or reduce function) to be applied to it in parallel. Do not worry if this still sounds confusing; it will become clear once you start implementing.

In the next cell, the Spark context object is used to read a toy dataset, *toy.csv*, which contains four supermarket transactions, one per line. The *textFile* function returns an RDD object, and this is your starting point for working with the RDD API. Some useful functions that the API offers are:

- **map**: returns a new RDD by applying a function to each element of this RDD.
- **flatMap**: returns a new RDD by first applying a function to all elements of this RDD, and then flattening the results. **Should be used when each entry yields more than one mapped element.**
- **reduce**: reduces the elements of this RDD using the specified commutative and associative binary operator.
- **reduceByKey**: merges the values for each key using an associative and commutative reduce function.
- **groupByKey**: groups the values for each key in the RDD into a single sequence.
- **collect**: returns a list that contains all of the elements in this RDD. **Should not be used when working with a lot of data**, since every element is brought back to the driver.
- **sample**: returns a sampled subset of this RDD.
- **count**: returns the number of elements in this RDD.
- **filter**: returns a new RDD containing only the elements that satisfy a predicate.
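To make the semantics of *flatMap* and *reduceByKey* concrete without a cluster, the following plain-Python sketch emulates them on the same four toy transactions (hardcoded here as an assumption, copied from the *toy.csv* contents printed by the next cell). The helper names `flat_map`, `reduce_by_key` and `to_words` are hypothetical and only illustrate the data flow; they are not part of the Spark API.

```python
from collections import defaultdict
from functools import reduce

# Toy transactions, assumed to match the toy.csv contents printed in this notebook
toy_transactions = ['a,b,c', 'a,b,d', 'b,c', 'b,c']

def flat_map(func, elements):
    """Plain-Python analogue of RDD.flatMap: apply func, then flatten the results."""
    return [out for element in elements for out in func(element)]

def reduce_by_key(func, pairs):
    """Plain-Python analogue of RDD.reduceByKey: merge the values of each KEY with func."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return {key: reduce(func, values) for key, values in grouped.items()}

def to_words(transaction):
    """One (word, 1) KEY-VALUE element per product, like map_to_words."""
    return [(word, 1) for word in transaction.split(',')]

toy_words = flat_map(to_words, toy_transactions)
word_counts = reduce_by_key(lambda v1, v2: v1 + v2, toy_words)
print(word_counts)  # {'a': 2, 'b': 4, 'c': 3, 'd': 1}
```

Unlike this sketch, Spark's *reduceByKey* also merges values inside each partition before shuffling them across the cluster, which is why the reduce function must be associative and commutative.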

In [2]:
def map_to_words(transaction):
    """
    Map each transaction into a set of KEY-VALUE elements.
    The KEY is the word itself and the VALUE is 1 (a single occurrence).
    """
    words = transaction.split(',')
    for w in words:
        yield (w, 1)

def reduce_words_by_key(value1, value2):
    "Merge the counts of two elements that share the same KEY."
    return value1 + value2
        

# Read a toy dataset
transactions = sc.textFile('toy.csv')
print("Transactions:\n\t", transactions.collect())

# Map function to identify words
words = transactions.flatMap(map_to_words)
print("Words Found:\n\t", words.collect())

# Reduce function to merge values of elements that share the same KEY
unique_words = words.reduceByKey(reduce_words_by_key)

print('Word count:')
for uw in unique_words.collect():
    print(uw)

Transactions:
	 ['a,b,c', 'a,b,d', 'b,c', 'b,c']
Words Found:
	 [('a', 1), ('b', 1), ('c', 1), ('a', 1), ('b', 1), ('d', 1), ('b', 1), ('c', 1), ('b', 1), ('c', 1)]
Word count:
('d', 1)
('b', 4)
('a', 2)
('c', 3)


## MBA Algorithm 
 The following sections explain how you should develop each step of the algorithm presented in the figure above. 
## 1. Map to Patterns (10 points)
Given a set of transactions, each transaction is **mapped** into a set of purchase patterns found within the transaction. Formally, these patterns are subsets of products that represent a group of items bought together. 
    
In the MapReduce framework, each pattern must be created as a *KEY-VALUE* element, where the KEY can be a singleton, a pair or a trio of products present in the transaction. More precisely, for each transaction, all possible **unique** subsets of size one, two or three must be generated. The VALUE associated with each KEY is the number of times the KEY appeared in the transaction (assuming that no product appears more than once in a transaction, this value is always equal to one).
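Limiting the KEYs to subsets of at most three products is what keeps this step tractable on real baskets. The short sketch below (plain Python; `math.comb` requires Python 3.8+, and the function names are hypothetical) compares the number of patterns generated for a transaction with $n$ distinct products under the cap, $\binom{n}{1}+\binom{n}{2}+\binom{n}{3}$, with the number of all non-empty subsets, $2^n - 1$.

```python
from math import comb

def n_patterns(n_products, max_size=3):
    """Number of unique subsets of size 1..max_size of n distinct products."""
    # comb(n, k) is 0 when k > n, so short transactions are handled correctly
    return sum(comb(n_products, size) for size in range(1, max_size + 1))

def n_all_subsets(n_products):
    """Number of all non-empty subsets of n distinct products."""
    return 2 ** n_products - 1

for n in (3, 10, 20):
    print(n, n_patterns(n), n_all_subsets(n))
# 3 7 7
# 10 175 1023
# 20 1350 1048575
```

For the toy dataset this gives 7 + 7 + 3 + 3 = 20 patterns, which is the number of lines in the expected output below.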

Now, implement the **map_to_patterns** function, which receives a transaction (a line of the dataset file) and yields the patterns found in it. Note that since each entry (transaction) of the map function yields more than one KEY-VALUE element, *flatMap* must be invoked for this step.

For the toy dataset, the expected output is:

<div style="border:1px solid black;white-space: pre;font-size: 9pt; line-height: 1.1; background-color:#f2f2f2; height: auto; width: 30em; padding-left:5px">
(('a',), 1)
(('a', 'b'), 1)
(('a', 'b', 'c'), 1)
(('a', 'c'), 1)
(('b',), 1)
(('b', 'c'), 1)
(('c',), 1)
(('a',), 1)
(('a', 'b'), 1)
(('a', 'b', 'd'), 1)
(('a', 'd'), 1)
(('b',), 1)
(('b', 'd'), 1)
(('d',), 1)
(('b',), 1)
(('b', 'c'), 1)
(('c',), 1)
(('b',), 1)
(('b', 'c'), 1)
(('c',), 1)
</div> 





In [10]:
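import itertools

def map_to_patterns(transaction):
    """Yield (pattern, 1) for every unique subset of one, two or three products."""
    # Sort and deduplicate so that the same subset always produces the same KEY
    products = sorted(set(transaction.strip().split(',')))
    for size in range(1, min(3, len(products)) + 1):
        for pattern in itertools.combinations(products, size):
            yield (pattern, 1)

patterns = transactions.flatMap(map_to_patterns)
for pat in patterns.collect():
    print(pat)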


(('a',), 1)
(('b',), 1)
(('c',), 1)
(('a', 'b'), 1)
(('a', 'c'), 1)
(('b', 'c'), 1)
(('a', 'b', 'c'), 1)
(('a',), 1)
(('b',), 1)
(('d',), 1)
(('a', 'b'), 1)
(('a', 'd'), 1)
(('b', 'd'), 1)
(('a', 'b', 'd'), 1)
(('b',), 1)
(('c',), 1)
(('b', 'c'), 1)
(('b',), 1)
(('c',), 1)
(('b', 'c'), 1)


## 2. Reduce patterns (5 points)
Once the transactions have been processed by different CPUs, a **reduce** function must combine identical KEYs (the subsets of products) and compute their total number of occurrences in the entire dataset. In other words, this reduce procedure must sum the *VALUE*s of each identical KEY.

Create a **reduce_patterns** function that sums the VALUEs of each pattern.
For the toy dataset, the expected output is:
<div style="border:1px solid black;white-space: pre;font-size: 9pt; line-height: 1.1; background-color:#f2f2f2; height: auto; width: 30em; padding-left:5px">
(('a',), 2)
(('a', 'b'), 2)
(('a', 'b', 'c'), 1)
(('a', 'c'), 1)
(('b',), 4)
(('b', 'c'), 3)
(('c',), 3)
(('a', 'b', 'd'), 1)
(('a', 'd'), 1)
(('b', 'd'), 1)
(('d',), 1)
</div> 
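As a sanity check that does not need Spark, this reduce step can be emulated with `collections.Counter`, since summing a VALUE of 1 for every occurrence of a KEY is the same as counting the KEYs. The helper `patterns_of` is hypothetical: it re-implements the pattern enumeration of step 1 in plain Python (sorted, deduplicated products, subsets of size one to three) for illustration only.

```python
from collections import Counter
from itertools import combinations

toy_transactions = ['a,b,c', 'a,b,d', 'b,c', 'b,c']

def patterns_of(transaction, max_size=3):
    """Yield every unique subset of 1..max_size products as a sorted tuple."""
    products = sorted(set(transaction.split(',')))
    for size in range(1, min(max_size, len(products)) + 1):
        yield from combinations(products, size)

# Summing one VALUE per occurrence is equivalent to counting each KEY
pattern_counts = Counter(p for t in toy_transactions for p in patterns_of(t))
print(len(pattern_counts))  # 11 unique patterns
print(pattern_counts[('b',)], pattern_counts[('b', 'c')], pattern_counts[('a', 'b', 'd')])  # 4 3 1
```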




In [11]:
def reduce_patterns(value1, value2):
    return value1 + value2

unique_patterns = patterns.reduceByKey(reduce_patterns)
for unique_pat in unique_patterns.collect():
    print(unique_pat)


(('a', 'd'), 1)
(('b', 'd'), 1)
(('a',), 2)
(('b',), 4)
(('c',), 3)
(('a', 'b'), 2)
(('a', 'c'), 1)
(('b', 'c'), 3)
(('d',), 1)
(('a', 'b', 'c'), 1)
(('a', 'b', 'd'), 1)


## 3. Map to subpatterns (15 points)
Next, another **map** function is applied to generate subpatterns. Once again, the subpatterns are KEY-VALUE elements whose KEY is a subset of products. However, the subpattern KEY is created differently: the idea is to break down the list of products of each pattern KEY, remove one product at a time, and yield the remaining list as a new subpattern KEY. For example, for a pattern $P$ with three products $p_1, p_2$ and $p_3$, three new subpattern KEYs are created: (i) remove $p_1$ and yield ($p_2, p_3$); (ii) remove $p_2$ and yield ($p_1, p_3$); and (iii) remove $p_3$ and yield ($p_1, p_2$). The subpattern VALUE also has a different structure. Instead of a single integer value, as for the patterns, the subpattern VALUE is a *tuple* containing the product that was removed to obtain the KEY and the number of times the pattern appeared. For the example above, the VALUEs are ($p_1, v$), ($p_2, v$) and ($p_3, v$), respectively, where $v$ is the VALUE of the pattern. The idea behind subpatterns is to create **rules** such as: when the products of the KEY were bought, the product in the VALUE was also bought $v$ times.

Furthermore, each pattern should also yield a subpattern whose KEY is the pattern's own list of products and whose VALUE is a tuple with a null product (None) and the number of times the pattern appeared. This element keeps track of how many times the pattern was found and will later be used to compute the confidence value when generating the association rules. Patterns with a single product only yield this element, since removing their product would leave an empty KEY.
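The rule can be checked in plain Python with a small hypothetical helper, `subpatterns_of`, independent of the Spark cell. The reduced toy patterns are copied from the expected output of step 2. A pattern with $k > 1$ products yields $k$ "remove one product" subpatterns plus one (None, $v$) element, and a singleton yields only the (None, $v$) element, so the toy dataset (4 singletons, 5 pairs, 2 trios) should produce $4 \cdot 1 + 5 \cdot 3 + 2 \cdot 4 = 27$ subpatterns.

```python
def subpatterns_of(pattern):
    """Yield (KEY minus one product, (removed product, v)) for patterns with more
    than one product, then the bookkeeping element (KEY, (None, v))."""
    keys, v = pattern
    if len(keys) > 1:
        for i, removed in enumerate(keys):
            # Drop the product at position i; the remaining products form the new KEY
            yield keys[:i] + keys[i + 1:], (removed, v)
    yield keys, (None, v)

# Reduced toy patterns, copied from the expected output of step 2
toy_patterns = [(('a',), 2), (('a', 'b'), 2), (('a', 'b', 'c'), 1), (('a', 'c'), 1),
                (('b',), 4), (('b', 'c'), 3), (('c',), 3), (('a', 'b', 'd'), 1),
                (('a', 'd'), 1), (('b', 'd'), 1), (('d',), 1)]

toy_subpatterns = [sp for p in toy_patterns for sp in subpatterns_of(p)]
print(list(subpatterns_of((('a', 'b', 'c'), 1))))
print(len(toy_subpatterns))  # 27
```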

Now, implement the **map_to_subpatterns** function, which receives a pattern and yields all its subpatterns. Once again, each entry (pattern) generates more than one KEY-VALUE element, so flatMap must be used.

For the toy dataset, the expected output is:

<div style="border:1px solid black;white-space: pre;font-size: 9pt; line-height: 1.1; background-color:#f2f2f2; height: auto; width: 60em; padding-left:5px">
(('a',), (None, 2))
(('a', 'b'), (None, 2))
(('b',), ('a', 2))
(('a',), ('b', 2))
(('a', 'b', 'c'), (None, 1))
(('b', 'c'), ('a', 1))
(('a', 'c'), ('b', 1))
(('a', 'b'), ('c', 1))
(('a', 'c'), (None, 1))
(('c',), ('a', 1))
(('a',), ('c', 1))
(('b',), (None, 4))
(('b', 'c'), (None, 3))
(('c',), ('b', 3))
(('b',), ('c', 3))
(('c',), (None, 3))
(('a', 'b', 'd'), (None, 1))
(('b', 'd'), ('a', 1))
(('a', 'd'), ('b', 1))
(('a', 'b'), ('d', 1))
(('a', 'd'), (None, 1))
(('d',), ('a', 1))
(('a',), ('d', 1))
(('b', 'd'), (None, 1))
(('d',), ('b', 1))
(('b',), ('d', 1))
(('d',), (None, 1))
</div> 



In [32]:
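def map_to_subpatterns(pattern):
    """Yield the subpatterns of a (pattern KEY, count) element."""
    keys, count = pattern
    # Remove one product at a time: KEY = remaining products, VALUE = (removed product, count)
    if len(keys) > 1:
        for i, removed in enumerate(keys):
            yield (keys[:i] + keys[i + 1:], (removed, count))
    # The pattern itself with a null product keeps track of how many times it appeared
    yield (keys, (None, count))

subpatterns = unique_patterns.flatMap(map_to_subpatterns)

n_subpatterns = 0
for subpat in subpatterns.collect():
    n_subpatterns += 1
    print(subpat)
print(n_subpatterns)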


(('d',), ('a', 1))
(('a',), ('d', 1))
(('a', 'd'), (None, 1))
(('d',), ('b', 1))
(('b',), ('d', 1))
(('b', 'd'), (None, 1))
(('a',), (None, 2))
(('b',), (None, 4))
(('c',), (None, 3))
(('b',), ('a', 2))
(('a',), ('b', 2))
(('a', 'b'), (None, 2))
(('c',), ('a', 1))
(('a',), ('c', 1))
(('a', 'c'), (None, 1))
(('c',), ('b', 3))
(('b',), ('c', 3))
(('b', 'c'), (None, 3))
(('d',), (None, 1))
(('b', 'c'), ('a', 1))
(('a', 'c'), ('b', 1))
(('a', 'b'), ('c', 1))
(('a', 'b', 'c'), (None, 1))
(('b', 'd'), ('a', 1))
(('a', 'd'), ('b', 1))
(('a', 'b'), ('d', 1))
(('a', 'b', 'd'), (None, 1))
27


## 4. Reduce Subpatterns (5 points)
Once more, a **reduce** function is required, this time to group all the subpatterns by their KEY. The objective of this procedure is to build a list of all the **rules** that appeared for each KEY. Hence, the result of the reduce function is also a KEY-VALUE element, where the KEY is the subpattern KEY and the VALUE is a group containing all the VALUEs of the subpatterns that share that KEY.

For the toy dataset, the expected output is:

<div style="border:1px solid black;white-space: pre;font-size: 9pt; line-height: 1.1; background-color:#f2f2f2; height: auto; width: 60em; padding-left:5px">
(('a',), [(None, 2), ('b', 2), ('c', 1), ('d', 1)])
(('a', 'b'), [(None, 2), ('c', 1), ('d', 1)])
(('b',), [('a', 2), (None, 4), ('c', 3), ('d', 1)])
(('a', 'b', 'c'), [(None, 1)])
(('b', 'c'), [('a', 1), (None, 3)])
(('a', 'c'), [('b', 1), (None, 1)])
(('c',), [('a', 1), ('b', 3), (None, 3)])
(('a', 'b', 'd'), [(None, 1)])
(('b', 'd'), [('a', 1), (None, 1)])
(('a', 'd'), [('b', 1), (None, 1)])
(('d',), [('a', 1), ('b', 1), (None, 1)])
</div> 
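Unlike step 2, the VALUEs here are not merged into a single number but collected into a list, which in Spark corresponds to *groupByKey* (its grouped values are iterables, which `mapValues(list)` can turn into plain lists). The plain-Python sketch below emulates that grouping with a hypothetical `group_by_key` helper on a few subpatterns copied from the expected output of step 3. Note that Spark does not guarantee the order of the values inside each group.

```python
from collections import defaultdict

# A few toy subpatterns, copied from the expected output of step 3
toy_subpatterns = [(('a',), (None, 2)), (('b',), ('a', 2)), (('a',), ('b', 2)),
                   (('c',), ('a', 1)), (('a',), ('c', 1)), (('a',), ('d', 1)),
                   (('c',), (None, 3)), (('c',), ('b', 3))]

def group_by_key(pairs):
    """Plain-Python analogue of RDD.groupByKey: collect all VALUEs of each KEY in a list."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)

rules_sketch = group_by_key(toy_subpatterns)
print(rules_sketch[('a',)])  # [(None, 2), ('b', 2), ('c', 1), ('d', 1)]
```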




In [None]:
rules = subpatterns.groupByKey().mapValues(list)  # group all VALUEs sharing a KEY
print(*rules.collect(), sep='\n')

## 5. Map to Association Rules (15 points)
Finally, the last step of the algorithm is to create the association rules to perform the market basket analysis. The goal of this map function is to calculate the **confidence** level of buying a product knowing that there is already a set of products in the basket. Thus, the KEY of the subpattern is the set of products placed in the basket and, for each product present in the list of rules, i.e., in the VALUE, the confidence can be calculated as:

\begin{align*}
\frac{\text{number of times the product was bought together with KEY }}{\text{number of times the KEY appeared}}
\end{align*}

For the example given in the figure above, *coffee* was bought 20 times and, in 17 of them, *milk* was bought together. Then, the confidence level of buying *milk* knowing that *coffee* is in the basket is $\frac{17}{20} = 0.85$, which means that in 85% of the times the coffee was bought, milk was purchased as well.
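Applied to the toy dataset, the same formula gives, for example, the confidence of buying *c* when *b* is in the basket: the KEY ('b',) appeared 4 times (its (None, 4) entry) and *c* was bought together with it 3 times (its ('c', 3) entry). Likewise, *a* was bought in 1 of the 3 baskets containing *c*. Both values match the expected output below.

\begin{align*}
\text{conf}(\{b\} \Rightarrow c) &= \frac{\text{count}(\{b, c\})}{\text{count}(\{b\})} = \frac{3}{4} = 0.75\\
\text{conf}(\{c\} \Rightarrow a) &= \frac{\text{count}(\{a, c\})}{\text{count}(\{c\})} = \frac{1}{3} \approx 0.333
\end{align*}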

Implement the **map_to_assoc_rules** function that calculates the confidence level for each subpattern.

For the toy dataset, the expected output is:
<div style="border:1px solid black;white-space: pre;font-size: 9pt; line-height: 1.1; background-color:#f2f2f2; height: auto; width: 60em; padding-left:5px">
(('a',), [('b', 1.0), ('c', 0.5), ('d', 0.5)])
(('a', 'b'), [('c', 0.5), ('d', 0.5)])
(('b',), [('a', 0.5), ('c', 0.75), ('d', 0.25)])
(('a', 'b', 'c'), [])
(('b', 'c'), [('a', 0.3333333333333333)])
(('a', 'c'), [('b', 1.0)])
(('c',), [('a', 0.3333333333333333), ('b', 1.0)])
(('a', 'b', 'd'), [])
(('b', 'd'), [('a', 1.0)])
(('a', 'd'), [('b', 1.0)])
(('d',), [('a', 1.0), ('b', 1.0)])
</div>



In [None]:
def map_to_assoc_rules(rule):
    """Compute the confidence of buying each product given the KEY basket."""
    key, values = rule
    # The (None, count) entry stores how many times the KEY itself appeared
    occurrences = next(count for product, count in values if product is None)
    result = []
    for product, count in values:
        if product is not None:
            result.append((product, count / occurrences))
    return (key, result)

assoc_rules = rules.map(map_to_assoc_rules)
for assoc_rule in assoc_rules.collect():
    print(assoc_rule)

## Instacart dataset

With your MBA algorithm ready to be used, now it is time to work on a real dataset. For this second part of the TP, download the [instacart](https://www.instacart.com/datasets/grocery-shopping-2017) dataset and read its [description](https://gist.github.com/jeremystan/c3b39d947d9b88b3ccff3147dbcf6c6b) to understand how the dataset is structured. 

Before applying the developed algorithm to the instacart dataset, you must first transform the transactions into the format expected by your algorithm (one transaction per line). A convenient Spark API for this kind of structured data is [Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html#sql), a module that allows you to run SQL queries and/or work with a [DataFrame](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame), a distributed collection of data grouped into named columns.

**If you are not familiar with SQL, it is recommended that you follow the [tutorial from W3Schools](https://www.w3schools.com/sql/) to learn the basics.** 

For example, the following code cells use the Spark SQL module initialized by SparkSession to read the orders from the *order_products__train.csv* and the order information from *orders.csv* to construct a dataframe that contains a list of all products ever purchased by each user.

In [4]:
# Initialize the SparkSession
from pyspark.sql import SparkSession
ss = SparkSession(sc)

# Reading the structured data

df_order_prod = ss.read.csv('instacart/order_products__train.csv', header=True, sep=',', inferSchema=True)
print('order_products__train.csv')
df_order_prod.show(5)

df_orders = ss.read.csv('instacart/orders.csv', header=True, sep=',', inferSchema=True)
print('orders.csv')
df_orders.show(5)


order_products__train.csv
+--------+----------+-----------------+---------+
|order_id|product_id|add_to_cart_order|reordered|
+--------+----------+-----------------+---------+
|       1|     49302|                1|        1|
|       1|     11109|                2|        1|
|       1|     10246|                3|        0|
|       1|     49683|                4|        0|
|       1|     43633|                5|        1|
+--------+----------+-----------------+---------+
only showing top 5 rows

orders.csv
+--------+-------+--------+------------+---------+-----------------+----------------------+
|order_id|user_id|eval_set|order_number|order_dow|order_hour_of_day|days_since_prior_order|
+--------+-------+--------+------------+---------+-----------------+----------------------+
| 2539329|      1|   prior|           1|        2|                8|                  null|
| 2398795|      1|   prior|           2|        3|                7|                  15.0|
|  473747|      1|   prior| 

###### Using SQL

In [5]:
df_order_prod.createOrReplaceTempView("order_prod") # creates a Table order_prod
df_orders.createOrReplaceTempView("orders") # creates a Table orders

df_ex = ss.sql('SELECT o.user_id, COLLECT_LIST(op.product_id) AS products' 
               ' FROM orders o '
               ' INNER JOIN order_prod op ON op.order_id = o.order_id'
               ' GROUP BY user_id ORDER BY o.user_id')

df_ex.show(5)

+-------+--------------------+
|user_id|            products|
+-------+--------------------+
|      1|[196, 25133, 3892...|
|      2|[22963, 7963, 165...|
|      5|[15349, 19057, 16...|
|      7|[12053, 47272, 37...|
|      8|[15937, 5539, 109...|
+-------+--------------------+
only showing top 5 rows



###### Using Dataframe

In [6]:
from pyspark.sql.functions import collect_list


df_ex = df_orders.join(df_order_prod, df_order_prod.order_id == df_orders.order_id, 'inner')\
.groupBy(df_orders.user_id).agg(collect_list(df_order_prod.product_id).alias('products'))\
.orderBy(df_orders.user_id)
                                                                                                                           
df_ex.show(5)

+-------+--------------------+
|user_id|            products|
+-------+--------------------+
|      1|[196, 25133, 3892...|
|      2|[22963, 7963, 165...|
|      5|[15349, 19057, 16...|
|      7|[12053, 47272, 37...|
|      8|[15937, 5539, 109...|
+-------+--------------------+
only showing top 5 rows



## 6. Bonus (5 points) 

To practice using the Spark SQL module, write a query, using SQL or the DataFrame API, that answers the following questions:

1. Who are the top 10 users with the largest number of orders? (0.25) 
2. What are the top 10 most purchased products? (0.25)

## 7. Run MBA for the *training* set (25 points)

Using the orders from *order_products__train.csv*, create a dataframe where each row contains just one column, the transaction, with the list of purchased products.
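The MBA functions above expect one comma-separated transaction per line, as in *toy.csv*. The plain-Python sketch below shows that target shape on hypothetical `(order_id, product_id)` rows (the order 1 rows reuse IDs from the *order_products__train.csv* preview above; the order 2 rows are invented): product IDs are grouped per order and joined with commas. The helper name `rows_to_transactions` is hypothetical. In Spark, the grouping itself would typically be a `groupBy` on `order_id` with `collect_list`, as in the df_ex example above.

```python
from collections import defaultdict

# Hypothetical (order_id, product_id) rows in the layout of order_products__train.csv
rows = [(1, 49302), (1, 11109), (1, 10246), (2, 11109), (2, 49302)]

def rows_to_transactions(rows):
    """Group product IDs by order_id and format each order as 'p1,p2,...'."""
    orders = defaultdict(list)
    for order_id, product_id in rows:
        orders[order_id].append(str(product_id))
    # One transaction string per order, in the format read by map_to_patterns
    return [','.join(products) for products in orders.values()]

print(rows_to_transactions(rows))  # ['49302,11109,10246', '11109,49302']
```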

To convert a dataframe to an RDD you can use *dataframe.rdd*. For example, suppose that, for the df_ex dataframe, you only want an RDD containing the first product of each user. To do so, it is enough to run the following code: 

In [7]:
def map_to_first_product(row): # row contains the variables of each row
    return row.products[0]

prods = df_ex.rdd.map(map_to_first_product)
for p in prods.take(5):
    print(p)

196
22963
15349
12053
15937


Now, create a query to construct the transactions and run the MBA algorithm locally on your computer.

In [None]:
print('to do: query the transactions')
print('run MBA algorithm')

Finally, repeat the same process, but now using the Google Cloud Platform (GCP) account to which each team has been given access. All the instructions for creating a Spark computing cluster and submitting a job will be explained in both laboratory sessions. In any case, a guide for this task can be found [here](https://cloud.google.com/blog/big-data/2017/02/google-cloud-platform-for-data-scientists-using-jupyter-notebooks-with-apache-spark-on-google-cloud).

You should report here the runtime of each experiment, as well as the CPU configuration that you used to run locally.

## 8. Run MBA for your custom dataset (25 points)

Each team will receive a custom file in its storage bucket that contains a set of *order_id*s. For this last task, you must query both the *order_products__prior* and *order_products__train* files to construct your own set of transactions, i.e., search only for the order_ids you have received. You should report the number of transactions and of unique products that you will work with.

Moreover, build a list of the unique products appearing in the first 10 orders of your custom file and report the association rules in which each of these products is alone in the basket. In other words, after running your MBA algorithm, print the association rules whose KEY is a single product present in this list. **You should print the product's name, not its ID.**  
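Once the association rules are computed, selecting the rules whose KEY is a single selected product and printing names instead of IDs amounts to a filter plus a dictionary lookup. The sketch below assumes the rules were collected as `(KEY, [(product, confidence), ...])` tuples with string IDs, as produced by map_to_assoc_rules, and that an ID-to-name dictionary was built from the dataset's products file. All values in it (`assoc_rules_list`, `product_names`, `selected_ids`, and the IDs and names themselves) are hypothetical placeholders.

```python
# Hypothetical inputs: collected rules with string product IDs and an ID -> name mapping
assoc_rules_list = [(('101',), [('202', 0.2), ('303', 0.15)]),
                    (('101', '202'), [('303', 0.3)]),
                    (('404',), [('101', 0.1)])]
product_names = {'101': 'Product A', '202': 'Product B',
                 '303': 'Product C', '404': 'Product D'}
selected_ids = {'101'}  # unique products from the first 10 custom orders

def rules_for_single_products(rules, selected_ids, names):
    """Keep rules whose KEY is exactly one selected product; replace IDs by names."""
    readable = []
    for key, consequents in rules:
        if len(key) == 1 and key[0] in selected_ids:
            readable.append((names[key[0]],
                             [(names[p], conf) for p, conf in consequents]))
    return readable

for basket, suggestions in rules_for_single_products(assoc_rules_list, selected_ids, product_names):
    print(basket, suggestions)  # Product A [('Product B', 0.2), ('Product C', 0.15)]
```

On the full dataset, applying the same condition with the RDD's *filter* before *collect* avoids bringing every rule back to the driver.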
       
Once again, you should run this experiment using the GCP and report the execution time.

In [None]:
print('to do: query the transactions')
print('run MBA algorithm')