# INF6953I - C.SPÉC.: Fouille de données 
## TP3 - Summer 2018
### Team Members
    - Gilles Eric Zagre
    - Foromo Daniel Soromou

## Market Basket Analysis
Market Basket Analysis (MBA) is a well-known data mining technique for uncovering associations between products or product groupings. MBA aims to find interesting patterns in a large collection of data, for example millions of supermarket transactions, online orders, or credit card histories. In other words, MBA allows retailers to identify relationships between the items that people buy, i.e., to reveal which items are often purchased together.

A widely used approach to explore these patterns is to construct $\textit{association rules}$ such as

<center> **if** bought *product1* **then** will buy *product2* with **confidence** *x*. </center>

Marketers may then use these association rules to place correlated products close to each other on store shelves or to make online suggestions so that customers buy more items. However, mining association rules from large datasets is computationally intensive, which makes it almost impractical without a distributed system.

Hence, your goal in this TP is to create a $\textbf{MapReduce}$ solution for identifying patterns and creating association rules for a big dataset with more than three million transactions. This algorithm will run on a distributed cloud computing cluster. Finally, by analyzing your results, you should be able to help marketers answer questions such as:

    - What items are often bought together?
    - Given a basket, what items should be suggested?
    - How should items be placed together on the shelves?

### Methodology

This TP will be divided into two steps:
    1. The first step is the implementation, where you will code a MapReduce algorithm for the MBA association rules problem. For this step, a small toy dataset is provided to test the developed code.

    2. The second step is to use the algorithm created in step 1 on a dataset with more than three million supermarket transactions. In this step, you will use a cloud computing grid to run your experiments.


For the implementation step, we will follow the Market Basket Analysis algorithm presented by Jongwook Woo and Yuhang Xu (2012). The figure below presents the algorithm workflow for this TP. The blue boxes are the ones where you must implement a method to perform a map or reduce function, and the grey boxes represent their expected output. **All these operations are explained in detail in the following sections.**

![scale=0.5](workflow.svg "Algorithm Workflow")

### Setting up Spark

To implement this MapReduce solution you will use a tool called **Apache Spark**, a fast and general-purpose cluster computing system. In a nutshell, [Spark](http://spark.apache.org) is an open source framework designed with a *scale-out* methodology, which makes it a very powerful tool for programmers or application developers to perform a huge volume of computations and data processing in distributed environments. Spark provides high-level APIs that make it easy to build parallel apps. Moreover, Spark can achieve high-performance computation by using a state-of-the-art (job/stage) scheduler, so you do not need to worry about how your code and data are parallelized and distributed; Spark handles that for you.

Your first task is to get Spark up and running. 
1. First, go to http://spark.apache.org/downloads 
2. Select the newest Spark release (2.3.0) and the pre-built package type
3. Click to download *spark-2.3.0-bin-hadoop2.7.tgz* and extract it into any folder you prefer. 

4. Next, add the following two lines to your **~/.bashrc** file:
  - export SPARK_HOME=/path/to/spark-2.3.0-bin-hadoop2.7
  - export PYTHONPATH=\$SPARK_HOME/python:\$SPARK_HOME/python/lib/py4j-0.10.6-src.zip:\$SPARK_HOME/python/lib/pyspark.zip

5. Run the command *source ~/.bashrc*, reopen this Jupyter notebook file and execute the next cell, where *pyspark* (the Spark Python API) is loaded.

In [None]:
!pip uninstall --yes pyspark 

In [None]:
!pip install -Iv pyspark==2.3.0

In [None]:
!pip install pyspark

In [109]:
from pyspark import SparkContext

# Initialize the spark context.
sc = SparkContext(appName='tp3team7')

# Close the spark context
# sc.stop()

###### Word count Example 

It is part of this TP to study the [Spark python API](https://spark.apache.org/docs/latest/api/python/) and learn how to use it. For that, you will work with the [RDD API](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD), a great Spark abstraction for working with the MapReduce framework. An RDD is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. In other words, an RDD is how Spark keeps your data ready for a function (e.g., a map or reduce function) to be applied in parallel. Do not worry if this still sounds confusing; it will become clear once you start implementing.

In the next cell, the Spark context object is used to read a toy dataset, *toy.csv*, that contains four supermarket transactions, one per line. The *textFile* function returns an RDD object, and this is your starting point for working with the RDD API. Some useful functions that the API offers are:

- **map**: returns a new RDD by applying a function to each element of this RDD.
- **flatMap**: returns a new RDD by first applying a function to all elements of this RDD, and then flattening the results. **Should be used when each entry will yield more than one mapped element.**
- **reduce**: reduces the elements of this RDD using the specified commutative and associative binary operator.
- **reduceByKey**: merges the values for each key using an associative and commutative reduce function.
- **groupByKey**: groups the values for each key in the RDD into a single sequence.
- **collect**: returns a list that contains all of the elements in this RDD. **Should not be used when working with a lot of data.**
- **sample**: returns a sampled subset of this RDD.
- **count**: returns the number of elements in this RDD.
- **filter**: returns a new RDD containing only the elements that satisfy a predicate.
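
To make the semantics of these operations concrete, here is a plain-Python sketch (no Spark required) that mimics `map`, `flatMap`, `filter`, `reduce` and `reduceByKey` on an ordinary list. The helper names `flat_map` and `reduce_by_key` are hypothetical stand-ins used only for illustration; on a real RDD the same operations run in parallel across partitions.

```python
from functools import reduce
from itertools import chain

def flat_map(func, items):
    """Apply func to every item and flatten the resulting iterables."""
    return list(chain.from_iterable(func(item) for item in items))

def reduce_by_key(func, pairs):
    """Merge the values of identical keys with a binary function."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged

lines = ['a,b,c', 'b,c']

# map: exactly one output element per input element
lengths = list(map(lambda line: len(line.split(',')), lines))

# flatMap: each input element may yield several output elements
items = flat_map(lambda line: line.split(','), lines)

# filter: keep only the elements that satisfy a predicate
without_a = list(filter(lambda item: item != 'a', items))

# reduce: fold all elements into a single value
total = reduce(lambda x, y: x + y, lengths)

# reduceByKey: fold the values separately for every key
counts = reduce_by_key(lambda x, y: x + y, [(item, 1) for item in items])

print(lengths, items, without_a, total, counts)
```

The difference between `map` and `flatMap` is the one that matters most in this TP: `map` keeps one element per transaction, while `flatMap` lets one transaction produce many KEY-VALUE elements.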

In [110]:
def map_to_words(transaction):
    """
    Map each transaction into a set of KEY-VALUE elements.
    The KEY is the word itself and the VALUE is its number of occurrences.
    """
    words = transaction.split(',')
    for w in words:
        yield (w, 1)

def reduce_words_by_key(value1, value2):
    """Merge two partial counts that share the same KEY by summing them."""
    return value1 + value2

    
# Read a toy dataset
transactions = sc.textFile('gs://bucket-team7/notebooks/toy.csv')
print("Transactions:\n\t", transactions.collect())

# Map function to identify words
words = transactions.flatMap(map_to_words)
print("Words Found:\n\t", words.collect())

# Reduce function to merge values of elements that share the same KEY
unique_words = words.reduceByKey(reduce_words_by_key)

print('Word count:')
for uw in unique_words.collect():
    print(uw)

Transactions:
	 ['a,b,c', 'a,b,d', 'b,c', 'b,c']
Words Found:
	 [('a', 1), ('b', 1), ('c', 1), ('a', 1), ('b', 1), ('d', 1), ('b', 1), ('c', 1), ('b', 1), ('c', 1)]
Word count:
('b', 4)
('c', 3)
('d', 1)
('a', 2)


## MBA Algorithm 
The following sections explain how you should develop each step of the algorithm presented in the figure above.

## 1. Map to Patterns (10 points)
Given a set of transactions, each transaction is **mapped** into a set of purchase patterns found within the transaction. Formally, these patterns are subsets of products that represent a group of items bought together. 
    
For the MapReduce framework, each pattern must be created as a *KEY-VALUE* element, where the KEY can take the form of a singleton, a pair or a trio of products that are present in the transaction. More precisely, for each transaction, all possible **unique** subsets of size one, two or three must be generated. The VALUE associated with each KEY is the number of times that the KEY appeared in the transaction (if we assume that no product appears more than once in the transaction, this value is always equal to one). 
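
The number of patterns per transaction grows quickly with basket size: a basket with $n$ distinct products yields $n + \binom{n}{2} + \binom{n}{3}$ KEYs. The short sketch below evaluates this count; `patterns_per_transaction` is a hypothetical helper name and the basket sizes are illustrative values only.

```python
def patterns_per_transaction(n):
    """Number of subsets of size 1, 2 and 3 of a basket with n distinct items."""
    singles = n
    pairs = n * (n - 1) // 2
    trios = n * (n - 1) * (n - 2) // 6
    return singles + pairs + trios

for size in (3, 10, 50):
    print(size, patterns_per_transaction(size))
```

A three-item basket gives 7 patterns, while a 50-item basket already gives 20,875. This growth is a large part of why the full dataset is expensive to process.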

Now, implement the **map_to_patterns** function that receives a transaction (a line of the dataset file) and returns the patterns found in the transaction. Note that, since each entry (transaction) of the map function will yield more than one KEY-VALUE element, a *flatMap* must be invoked for this step.

For the toy dataset, the expected output is:

<div style="border:1px solid black;white-space: pre;font-size: 9pt; line-height: 1.1; background-color:#f2f2f2; height: auto; width: 30em; padding-left:5px">
(('a',), 1)
(('a', 'b'), 1)
(('a', 'b', 'c'), 1)
(('a', 'c'), 1)
(('b',), 1)
(('b', 'c'), 1)
(('c',), 1)
(('a',), 1)
(('a', 'b'), 1)
(('a', 'b', 'd'), 1)
(('a', 'd'), 1)
(('b',), 1)
(('b', 'd'), 1)
(('d',), 1)
(('b',), 1)
(('b', 'c'), 1)
(('c',), 1)
(('b',), 1)
(('b', 'c'), 1)
(('c',), 1)
</div> 
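
One possible way to enumerate these subsets is `itertools.combinations` from the standard library. This is only a sketch of the idea, not the reference solution: `patterns_with_combinations` is a hypothetical name, and it emits the patterns grouped by size, so the order differs from the listing above even though the set of KEY-VALUE elements is the same.

```python
from itertools import combinations

def patterns_with_combinations(transaction, max_size=3):
    """Yield (pattern, 1) for every subset of 1..max_size items of a transaction."""
    items = transaction.split(',')
    for size in range(1, max_size + 1):
        for pattern in combinations(items, size):
            yield (pattern, 1)

result = list(patterns_with_combinations('a,b,c'))
for element in result:
    print(element)
```

Because the function is a generator, it can be passed to `flatMap` in the same way as a hand-written nested loop.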





In [111]:
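def map_to_patterns(transaction):
    """
    Map a transaction to KEY-VALUE patterns.
    The KEY is a tuple of one, two or three products; the VALUE is always 1.
    """
    words = transaction.split(',')
    for i in range(len(words)):
        # Singleton pattern
        yield ((words[i],), 1)
        for j in range(i + 1, len(words)):
            # Pair pattern (j > i avoids repeating the same pair)
            yield ((words[i], words[j]), 1)
            for k in range(j + 1, len(words)):
                # Trio pattern
                yield ((words[i], words[j], words[k]), 1)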

# Map function to identify patterns
patterns = transactions.flatMap(map_to_patterns)

print("Patterns found:")
for uw in patterns.collect():
    print(uw)

Patterns found:
(('a',), 1)
(('a', 'b'), 1)
(('a', 'b', 'c'), 1)
(('a', 'c'), 1)
(('b',), 1)
(('b', 'c'), 1)
(('c',), 1)
(('a',), 1)
(('a', 'b'), 1)
(('a', 'b', 'd'), 1)
(('a', 'd'), 1)
(('b',), 1)
(('b', 'd'), 1)
(('d',), 1)
(('b',), 1)
(('b', 'c'), 1)
(('c',), 1)
(('b',), 1)
(('b', 'c'), 1)
(('c',), 1)
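
A caveat worth keeping in mind for real data: the KEYs are tuples, so `('a', 'b')` and `('b', 'a')` count as different patterns. The toy transactions happen to list their items in alphabetical order, but that is not guaranteed in general. A minimal sketch of one way to handle this, assuming order-independent patterns are wanted, is to deduplicate and sort the items before generating subsets (`canonical_items` is a hypothetical helper name):

```python
def canonical_items(transaction):
    """Return the distinct items of a transaction in a fixed (sorted) order."""
    return sorted(set(transaction.split(',')))

# The same basket written in two different orders, once with a repeated item
first = canonical_items('b,a,c')
second = canonical_items('c,b,a,a')
print(first, second, first == second)
```

Sorting strings orders product ids lexicographically (`'10'` comes before `'9'`), which is acceptable here because only consistency across transactions matters.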


## 2. Reduce patterns (5 points)
Once the transactions have been processed by different CPUs, a **reduce** function must combine identical KEYs (the subsets of products) and compute the total number of occurrences of each KEY in the entire dataset. In other words, this reduce procedure must sum the *VALUEs* of each identical KEY.

Create a **reduce_patterns** function below that sums the VALUEs of each pattern.
For the toy dataset, the expected output is:
<div style="border:1px solid black;white-space: pre;font-size: 9pt; line-height: 1.1; background-color:#f2f2f2; height: auto; width: 30em; padding-left:5px">
(('a',), 2)
(('a', 'b'), 2)
(('a', 'b', 'c'), 1)
(('a', 'c'), 1)
(('b',), 4)
(('b', 'c'), 3)
(('c',), 3)
(('a', 'b', 'd'), 1)
(('a', 'd'), 1)
(('b', 'd'), 1)
(('d',), 1)
</div> 
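
Conceptually, this reduction is the same as summing counts per KEY. The plain-Python sketch below reproduces the idea with `collections.Counter` on a handful of mapped patterns from the toy example; it runs on a single machine and is meant only to illustrate what `reduceByKey` computes.

```python
from collections import Counter

# A few (KEY, VALUE) pairs as produced by the map step
mapped = [
    (('a',), 1), (('a', 'b'), 1), (('b',), 1),
    (('a',), 1), (('a', 'b'), 1), (('b',), 1),
    (('b',), 1), (('b',), 1),
]

totals = Counter()
for key, value in mapped:
    totals[key] += value  # same role as value1 + value2 in the reducer

for key, total in totals.items():
    print((key, total))
```

Because addition is associative and commutative, Spark is free to sum partial results on each partition before combining them, which is what makes `reduceByKey` scale.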




In [112]:
def reduce_patterns_by_key(value1, value2):
    """Merge two partial counts of the same pattern by summing them."""
    return value1 + value2

# Reduce function to merge values of elements that share the same KEY
unique_patterns = patterns.reduceByKey(reduce_patterns_by_key)

print('Unique patterns count:')
for uw in unique_patterns.collect():
    print(uw)

Unique patterns count:
(('a',), 2)
(('a', 'b', 'c'), 1)
(('b', 'c'), 3)
(('a', 'b', 'd'), 1)
(('b', 'd'), 1)
(('b',), 4)
(('c',), 3)
(('a', 'b'), 2)
(('a', 'c'), 1)
(('a', 'd'), 1)
(('d',), 1)


## 3. Map to subpatterns (15 points)
Next, another **map** function should be applied to generate subpatterns. Once again, the subpatterns are KEY-VALUE elements, where the KEY is a subset of products as well. However, creating the subpattern's KEY is a different procedure. This time, the idea is to break down the list of products of each pattern (the pattern KEY), remove one product at a time, and yield the resulting list as the new subpattern KEY. For example, for a given pattern $P$ with three products, $p_1$, $p_2$ and $p_3$, three new subpattern KEYs are created: (i) remove $p_1$ and yield ($p_2, p_3$); (ii) remove $p_2$ and yield ($p_1,p_3$); and (iii) remove $p_3$ and yield ($p_1,p_2$). Additionally, the subpattern's VALUE structure is also different. Instead of a single integer value, as we had in the patterns, this time a *tuple* should be created for the subpattern VALUE. This tuple contains the product that was removed when yielding the KEY and the number of times the pattern appeared. For the example above, the values should be ($p_1,v$), ($p_2,v$) and ($p_3,v$), respectively, where $v$ is the VALUE of the pattern. The idea behind subpatterns is to create **rules** such as: when the products of KEY were bought, the item present in the VALUE was also bought $v$ times.

Furthermore, each pattern should also yield a subpattern where the KEY is the same list of products as the pattern, but the VALUE is a tuple with a null product (None) and the number of times the pattern appeared. This element keeps track of how many times each pattern was found and is used later to compute the confidence value when generating the association rules. 

Now, implement the **map_to_subpatterns** function that receives a pattern and yields all the subpatterns found. Once again, each entry (pattern) will generate more than one KEY-VALUE element, so a flatMap function must be called.

For the toy dataset, the expected output is:

<div style="border:1px solid black;white-space: pre;font-size: 9pt; line-height: 1.1; background-color:#f2f2f2; height: auto; width: 60em; padding-left:5px">
(('a',), (None, 2))
(('a', 'b'), (None, 2))
(('b',), ('a', 2))
(('a',), ('b', 2))
(('a', 'b', 'c'), (None, 1))
(('b', 'c'), ('a', 1))
(('a', 'c'), ('b', 1))
(('a', 'b'), ('c', 1))
(('a', 'c'), (None, 1))
(('c',), ('a', 1))
(('a',), ('c', 1))
(('b',), (None, 4))
(('b', 'c'), (None, 3))
(('c',), ('b', 3))
(('b',), ('c', 3))
(('c',), (None, 3))
(('a', 'b', 'd'), (None, 1))
(('b', 'd'), ('a', 1))
(('a', 'd'), ('b', 1))
(('a', 'b'), ('d', 1))
(('a', 'd'), (None, 1))
(('d',), ('a', 1))
(('a',), ('d', 1))
(('b', 'd'), (None, 1))
(('d',), ('b', 1))
(('b',), ('d', 1))
(('d',), (None, 1))
</div> 
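
As a concrete illustration of the rule above, the sketch below expands a single pattern by hand. `expand_pattern` is a hypothetical name used only for this example; it follows the description (one element with a `None` product, plus one element per removed product) and skips the removal step for singletons, since removing the only product would leave an empty KEY, which does not appear in the expected output.

```python
def expand_pattern(pattern):
    """Return the subpatterns generated by one (KEY, VALUE) pattern."""
    products, count = pattern
    subpatterns = [(products, (None, count))]
    if len(products) > 1:
        for index, removed in enumerate(products):
            remaining = products[:index] + products[index + 1:]
            subpatterns.append((remaining, (removed, count)))
    return subpatterns

for element in expand_pattern((('a', 'b', 'c'), 1)):
    print(element)
```

The four printed elements correspond to the entries for `('a', 'b', 'c')` in the expected output above.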



In [113]:
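def map_to_subpatterns(pattern):
    """
    Map a (products, count) pattern to its subpatterns.
    Each VALUE is a tuple (removed product or None, count).
    """
    products, count = pattern
    # Keep the pattern itself so that its total count is available later
    yield (products, (None, count))
    if len(products) > 1:
        for i in range(len(products)):
            # Drop the i-th product from the KEY and report it in the VALUE
            remaining = products[:i] + products[i + 1:]
            yield (remaining, (products[i], count))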

# Map function to identify subpatterns in unique patterns
subpatterns = unique_patterns.flatMap(map_to_subpatterns)

print("Subpatterns found:")
for uw in subpatterns.collect():
    print(uw)

Subpatterns found:
(('a',), (None, 2))
(('a', 'b', 'c'), (None, 1))
(('b', 'c'), ('a', 1))
(('a', 'c'), ('b', 1))
(('a', 'b'), ('c', 1))
(('b', 'c'), (None, 3))
(('c',), ('b', 3))
(('b',), ('c', 3))
(('a', 'b', 'd'), (None, 1))
(('b', 'd'), ('a', 1))
(('a', 'd'), ('b', 1))
(('a', 'b'), ('d', 1))
(('b', 'd'), (None, 1))
(('d',), ('b', 1))
(('b',), ('d', 1))
(('a', 'b'), (None, 2))
(('b',), ('a', 2))
(('a',), ('b', 2))
(('a', 'c'), (None, 1))
(('c',), ('a', 1))
(('a',), ('c', 1))
(('b',), (None, 4))
(('c',), (None, 3))
(('a', 'd'), (None, 1))
(('d',), ('a', 1))
(('a',), ('d', 1))
(('d',), (None, 1))


## 4. Reduce Subpattern (5 points)
Once more, a **reduce** function is required to group all the subpatterns by their KEY. The objective of this reducing procedure is to create a list of all the **rules** associated with a KEY. Hence, the expected result of the reduce function is also a KEY-VALUE element, where the KEY is the subpattern's KEY and the VALUE is a group containing all the VALUEs of the subpatterns that share the same KEY.

For the toy dataset, the expected output is:

<div style="border:1px solid black;white-space: pre;font-size: 9pt; line-height: 1.1; background-color:#f2f2f2; height: auto; width: 60em; padding-left:5px">
(('a',), [(None, 2), ('b', 2), ('c', 1), ('d', 1)])
(('a', 'b'), [(None, 2), ('c', 1), ('d', 1)])
(('b',), [('a', 2), (None, 4), ('c', 3), ('d', 1)])
(('a', 'b', 'c'), [(None, 1)])
(('b', 'c'), [('a', 1), (None, 3)])
(('a', 'c'), [('b', 1), (None, 1)])
(('c',), [('a', 1), ('b', 3), (None, 3)])
(('a', 'b', 'd'), [(None, 1)])
(('b', 'd'), [('a', 1), (None, 1)])
(('a', 'd'), [('b', 1), (None, 1)])
(('d',), [('a', 1), ('b', 1), (None, 1)])
</div> 
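
The grouping itself can be pictured as appending every VALUE to a list stored under its KEY. Below is a plain-Python sketch using `collections.defaultdict`, with a few subpatterns taken from the toy example. In Spark, the order of the VALUEs inside each list depends on how the data is partitioned, so it may differ from the listing above.

```python
from collections import defaultdict

# A few (KEY, VALUE) subpatterns from the toy example
subpatterns = [
    (('a',), (None, 2)),
    (('b',), ('a', 2)),
    (('a',), ('b', 2)),
    (('a',), ('c', 1)),
    (('b',), (None, 4)),
]

grouped = defaultdict(list)
for key, value in subpatterns:
    grouped[key].append(value)

for key, values in grouped.items():
    print((key, values))
```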




In [114]:
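def group_subpatterns_by_key(value1, value2):
    """Concatenate two lists of VALUEs that share the same KEY."""
    # Defensive: wrap bare tuples so that concatenation always works
    if not isinstance(value1, list):
        value1 = [value1]
    if not isinstance(value2, list):
        value2 = [value2]
    return value1 + value2

def map_to_subpatterns2(pattern):
    """
    Same as map_to_subpatterns, but each VALUE is wrapped in a list.
    A KEY with a single VALUE never goes through the reducer, so wrapping
    here guarantees that every grouped VALUE is a list.
    """
    yield (pattern[0], [(None, pattern[1])])
    if len(pattern[0]) > 1:
        for i in range(len(pattern[0])):
            temp = list(pattern[0])
            temp.pop(i)
            temp = tuple(temp)
            yield (temp, [(pattern[0][i], pattern[1])])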

# Map function to identify subpatterns in unique patterns
subpatterns = unique_patterns.flatMap(map_to_subpatterns2)
# Reduce function to merge values of elements that share the same KEY
grouped_subpatterns = subpatterns.reduceByKey(group_subpatterns_by_key)

print('Grouped Subpatterns:')
for uw in grouped_subpatterns.collect():
    print(uw)

Grouped Subpatterns:
(('a',), [('b', 2), ('c', 1), ('d', 1), (None, 2)])
(('a', 'b', 'c'), [(None, 1)])
(('b', 'c'), [('a', 1), (None, 3)])
(('a', 'b', 'd'), [(None, 1)])
(('b', 'd'), [('a', 1), (None, 1)])
(('a', 'c'), [('b', 1), (None, 1)])
(('a', 'b'), [('c', 1), ('d', 1), (None, 2)])
(('c',), [('b', 3), ('a', 1), (None, 3)])
(('b',), [('c', 3), ('d', 1), ('a', 2), (None, 4)])
(('a', 'd'), [('b', 1), (None, 1)])
(('d',), [('b', 1), ('a', 1), (None, 1)])


## 5. Map to Association Rules (15 points)
Finally, the last step of the algorithm is to create the association rules to perform the market basket analysis. The goal of this map function is to calculate the **confidence** level of buying a product knowing that there is already a set of products in the basket. Thus, the KEY of the subpattern is the set of products placed in the basket and, for each product present in the list of rules, i.e., in the VALUE, the confidence can be calculated as:

\begin{align*}
\frac{\text{number of times the product was bought together with KEY }}{\text{number of times the KEY appeared}}
\end{align*}

For the example given in the figure above, *coffee* was bought 20 times and, in 17 of them, *milk* was bought together. Then, the confidence level of buying *milk* knowing that *coffee* is in the basket is $\frac{17}{20} = 0.85$, which means that milk was purchased in 85% of the transactions that included coffee.
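
The arithmetic of this example can be checked in a couple of lines. The variable names below are illustrative only.

```python
# Counts taken from the coffee and milk example
coffee_count = 20           # number of times the KEY (coffee) appeared
coffee_and_milk_count = 17  # times milk was bought together with coffee

confidence = float(coffee_and_milk_count) / coffee_count
print(confidence)  # 0.85
```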

Implement the **map_to_assoc_rules** function that calculates the confidence level for each subpattern.

For the toy dataset, the expected output is:
<div style="border:1px solid black;white-space: pre;font-size: 9pt; line-height: 1.1; background-color:#f2f2f2; height: auto; width: 60em; padding-left:5px">
(('a',), [('b', 1.0), ('c', 0.5), ('d', 0.5)])
(('a', 'b'), [('c', 0.5), ('d', 0.5)])
(('b',), [('a', 0.5), ('c', 0.75), ('d', 0.25)])
(('a', 'b', 'c'), [])
(('b', 'c'), [('a', 0.3333333333333333)])
(('a', 'c'), [('b', 1.0)])
(('c',), [('a', 0.3333333333333333), ('b', 1.0)])
(('a', 'b', 'd'), [])
(('b', 'd'), [('a', 1.0)])
(('a', 'd'), [('b', 1.0)])
(('d',), [('a', 1.0), ('b', 1.0)])
</div>



In [115]:
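def map_to_assoc_rules(grouped_subpattern):
    """Compute the confidence of each product given the KEY already in the basket."""
    key, subpatterns = grouped_subpattern
    # The (None, count) element holds the number of times the KEY itself appeared
    occurrences = next(count for product, count in subpatterns if product is None)
    result = []
    for product, count in subpatterns:
        if product is not None:
            result.append((product, float(count) / occurrences))
    return (key, result)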

# Define associations rules (probabilities) identified in subpatterns
assocRules = grouped_subpatterns.map(map_to_assoc_rules)

print('Association rules:')
for uw in assocRules.collect():
    print(uw)

Association rules:
(('a',), [('b', 1.0), ('c', 0.5), ('d', 0.5)])
(('a', 'b', 'c'), [])
(('b', 'c'), [('a', 0.3333333333333333)])
(('a', 'b', 'd'), [])
(('b', 'd'), [('a', 1.0)])
(('a', 'c'), [('b', 1.0)])
(('a', 'b'), [('c', 0.5), ('d', 0.5)])
(('c',), [('b', 1.0), ('a', 0.3333333333333333)])
(('b',), [('c', 0.75), ('d', 0.25), ('a', 0.5)])
(('a', 'd'), [('b', 1.0)])
(('d',), [('b', 1.0), ('a', 1.0)])
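
Once the rules are available, answering "given a basket, what items should be suggested?" amounts to looking up the basket as a KEY and ranking the associated products by confidence. The sketch below does this on a few of the toy rules; `suggest` and the `min_confidence` threshold are hypothetical choices for illustration, not part of the assignment.

```python
# A few association rules copied from the toy output
rules = {
    ('a',): [('b', 1.0), ('c', 0.5), ('d', 0.5)],
    ('b',): [('c', 0.75), ('d', 0.25), ('a', 0.5)],
    ('a', 'b'): [('c', 0.5), ('d', 0.5)],
}

def suggest(basket, rules, min_confidence=0.5):
    """Return the products to suggest for a basket, highest confidence first."""
    candidates = rules.get(tuple(basket), [])
    kept = [(product, conf) for product, conf in candidates if conf >= min_confidence]
    return sorted(kept, key=lambda pair: pair[1], reverse=True)

print(suggest(['b'], rules))
```

The lookup assumes that the basket lists its products in the same order as the rule KEY, so in practice the basket would need the same ordering convention as the patterns.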


## Instacart dataset

With your MBA algorithm ready to be used, now it is time to work on a real dataset. For this second part of the TP, download the [instacart](https://www.instacart.com/datasets/grocery-shopping-2017) dataset and read its [description](https://gist.github.com/jeremystan/c3b39d947d9b88b3ccff3147dbcf6c6b) to understand how the dataset is structured. 

Before applying the developed algorithm to the instacart dataset, you must first transform the transactions into the format expected by your algorithm (one transaction per line). A very good Spark API for working with this type of structured data is [Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html#sql), a module that allows you to run SQL queries and/or work with a [DataFrame](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame), a distributed collection of data grouped into named columns.

**If you are not familiar with SQL, it is recommended that you follow the [tutorial from W3Schools](https://www.w3schools.com/sql/) to learn the basics.** 

For example, the following code cells use the Spark SQL module initialized by SparkSession to read the orders from the *order_products__train.csv* and the order information from *orders.csv* to construct a dataframe that contains a list of all products ever purchased by each user.

In [116]:
# Initialize the SparkSession
from pyspark.sql import SparkSession
ss = SparkSession(sc)

# Reading the structured data
df_order_prod = ss.read.csv('gs://bucket-team7/notebooks/instacart_2017_05_01/order_products__train.csv', header=True, sep=',', inferSchema=True)
print('order_products__train.csv')
df_order_prod.show(5)

df_orders = ss.read.csv('gs://bucket-team7/notebooks/instacart_2017_05_01/orders.csv', header=True, sep=',', inferSchema=True)
print('orders.csv')
df_orders.show(5)


order_products__train.csv
+--------+----------+-----------------+---------+
|order_id|product_id|add_to_cart_order|reordered|
+--------+----------+-----------------+---------+
|       1|     49302|                1|        1|
|       1|     11109|                2|        1|
|       1|     10246|                3|        0|
|       1|     49683|                4|        0|
|       1|     43633|                5|        1|
+--------+----------+-----------------+---------+
only showing top 5 rows

orders.csv
+--------+-------+--------+------------+---------+-----------------+----------------------+
|order_id|user_id|eval_set|order_number|order_dow|order_hour_of_day|days_since_prior_order|
+--------+-------+--------+------------+---------+-----------------+----------------------+
| 2539329|      1|   prior|           1|        2|                8|                  null|
| 2398795|      1|   prior|           2|        3|                7|                  15.0|
|  473747|      1|   prior| 

###### Using SQL

In [117]:
df_order_prod.createOrReplaceTempView("order_prod") # creates a Table order_prod
df_orders.createOrReplaceTempView("orders") # creates a Table orders

df_ex = ss.sql('SELECT o.user_id, COLLECT_LIST(op.product_id) AS products' 
               ' FROM orders o '
               ' INNER JOIN order_prod op ON op.order_id = o.order_id'
               ' GROUP BY user_id ORDER BY o.user_id')

df_ex.show(5)

+-------+--------------------+
|user_id|            products|
+-------+--------------------+
|      1|[196, 25133, 3892...|
|      2|[22963, 7963, 165...|
|      5|[15349, 19057, 16...|
|      7|[12053, 47272, 37...|
|      8|[15937, 5539, 109...|
+-------+--------------------+
only showing top 5 rows



###### Using Dataframe

In [10]:
from pyspark.sql.functions import collect_list

df_ex = df_orders.join(df_order_prod, df_order_prod.order_id == df_orders.order_id, 'inner')\
.groupBy(df_orders.user_id).agg(collect_list(df_order_prod.product_id).alias('products'))\
.orderBy(df_orders.user_id)
                                                                                                                           
df_ex.show(5)

+-------+--------------------+
|user_id|            products|
+-------+--------------------+
|      1|[196, 25133, 3892...|
|      2|[22963, 7963, 165...|
|      5|[15349, 19057, 16...|
|      7|[12053, 47272, 37...|
|      8|[15937, 5539, 109...|
+-------+--------------------+
only showing top 5 rows
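
For readers less familiar with SQL, the join-and-aggregate logic used in both variants can be mimicked in plain Python. The rows below are made-up values rather than actual Instacart records, and the sketch ignores distribution entirely; it only shows what an inner join followed by `COLLECT_LIST` grouped by `user_id` produces.

```python
from collections import defaultdict

# Made-up rows: (order_id, user_id) and (order_id, product_id)
orders = [(10, 1), (11, 1), (12, 2)]
order_products = [(10, 196), (10, 25133), (11, 196), (12, 22963), (99, 500)]

# Inner join on order_id: keep only product rows whose order is known
user_of_order = dict(orders)
products_by_user = defaultdict(list)
for order_id, product_id in order_products:
    if order_id in user_of_order:
        products_by_user[user_of_order[order_id]].append(product_id)

# ORDER BY user_id
for user_id in sorted(products_by_user):
    print(user_id, products_by_user[user_id])
```

Note that the product row for order 99 disappears because no matching order exists, and that a product bought in two orders by the same user appears twice in that user's list.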



## 6. Bonus (5 points) 

To practice using the Spark SQL module, create queries, using SQL or the DataFrame API, to answer the following questions:

1. Who are the top 10 users with the largest number of orders? (0.25) 
2. What are the top 10 most purchased products? (0.25)

<font color=blue>**Top 10 users with the largest number of orders:**</font>

In [13]:
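# order_number is a per-user running index of orders, so the rows with the
# largest order_number belong to the users with the most orders.
# Caveat: a user can appear more than once in the top rows (e.g., orders 100 and 99);
# grouping by user_id and taking MAX(order_number) would avoid that.
df_ex2 = ss.sql('SELECT o.user_id, o.order_number' 
               ' FROM orders o '
               ' ORDER BY o.order_number DESC')
print('Top 10 users (user_id) with corresponding highest number of orders (order_number):')
df_ex2.show(10)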

Top 10 users (user_id) with corresponding highest number of orders (order_number):
+-------+------------+
|user_id|order_number|
+-------+------------+
|  42520|         100|
|  43529|         100|
|  42711|         100|
|  40817|         100|
|  42920|         100|
|  41356|         100|
|  42971|         100|
|  41952|         100|
|  42975|         100|
|  42037|         100|
+-------+------------+
only showing top 10 rows



<font color=blue>**Top 10 most purchased products:**</font>

In [14]:
df_ex3 = ss.sql('SELECT op.product_id, COUNT(op.order_id) AS nb FROM order_prod op'
                ' GROUP BY product_id ORDER by nb DESC')

print('Top 10 products (product_id) with highest number of orders:')
df_ex3.show(10)

Top 10 products (product_id) with highest number of orders:
+----------+-----+
|product_id|   nb|
+----------+-----+
|     24852|18726|
|     13176|15480|
|     21137|10894|
|     21903| 9784|
|     47626| 8135|
|     47766| 7409|
|     47209| 7293|
|     16797| 6494|
|     26209| 6033|
|     27966| 5546|
+----------+-----+
only showing top 10 rows



## 7. Run MBA for the *training* set (25 points)

Using the orders from *order_products__train.csv*, create a dataframe where each row contains just one column, the transaction, with the list of purchased products.

To convert a dataframe to an RDD you can use *dataframe.rdd*. For example, for the df_ex dataframe, you might want to work with an RDD containing only the first product of each user. To do so, it is enough to run the following code: 

In [123]:
def map_to_first_product(row): # row contains the variables of each row
    return row.products[0]

prods = df_ex.rdd.map(map_to_first_product)
for p in prods.take(5):
    print(p)

196
22963
15349
12053
15937


Now, create a query to construct the transactions and run it locally on your computer.

<font color=blue>**7.1 - Query the transactions**</font>

<font color=blue>First, let's create a dataframe with only the order_id (transaction) and the list of products in that order as entries (for verification):</font>

In [124]:
df_ex4 = ss.sql('SELECT op.order_id AS transaction, COLLECT_LIST(op.product_id) AS products' 
               ' FROM order_prod op '
               ' GROUP BY order_id ORDER BY transaction')

df_ex4.show(10)

+-----------+--------------------+
|transaction|            products|
+-----------+--------------------+
|          1|[49302, 11109, 10...|
|         36|[39612, 19660, 49...|
|         38|[11913, 18159, 44...|
|         96|[20574, 30391, 40...|
|         98|[8859, 19731, 436...|
|        112|[27104, 21174, 41...|
|        170|[18394, 37766, 13...|
|        218|[1194, 5578, 3815...|
|        226|[28199, 24852, 29...|
|        349|[33000, 11361, 27...|
+-----------+--------------------+
only showing top 10 rows



<font color=blue>Now, let's create a dataframe with only the list of products per transaction as entries:</font>

In [125]:
df_ex5 = ss.sql('SELECT COLLECT_LIST(op.product_id) AS products' 
               ' FROM order_prod op '
               ' GROUP BY op.order_id ORDER BY op.order_id')

df_ex5.show(10)

+--------------------+
|            products|
+--------------------+
|[49302, 11109, 10...|
|[39612, 19660, 49...|
|[11913, 18159, 44...|
|[20574, 30391, 40...|
|[8859, 19731, 436...|
|[27104, 21174, 41...|
|[18394, 37766, 13...|
|[1194, 5578, 3815...|
|[28199, 24852, 29...|
|[33000, 11361, 27...|
+--------------------+
only showing top 10 rows



<font color=blue>Now let's convert that dataframe to an RDD with only the purchased items in each transaction:</font>

In [126]:
def map_to_all_products(row): # row contains the variables of each row
    return ','.join(str(x) for x in row.products) 

# 0- Map function to convert each dataframe row into a comma-separated transaction
prods = df_ex5.rdd.map(map_to_all_products)
    
for p in prods.take(5):
    print(p)

49302,11109,10246,49683,43633,13176,47209,22035
39612,19660,49235,43086,46620,34497,48679,46979
11913,18159,4461,21616,23622,32433,28842,42625,39693
20574,30391,40706,25610,27966,24489,39275
8859,19731,43654,13176,4357,37664,34065,35951,43560,9896,27509,15455,27966,47601,40396,35042,40986,1939,46313,329,30776,36695,27683,15995,27344,47333,48287,45204,24964,18117,46413,34126,9373,22935,46720,44479,790,18441,45007,20520,7461,26317,3880,36364,32463,41387,31066,17747,25659


<font color=blue>**7.2 - Now let's run the MBA algorithm locally on this dataset**</font>

In [127]:
import time
startCPUtime = time.clock()
start = time.time()

In [128]:
# 1- Map function to identify patterns
patterns = prods.flatMap(map_to_patterns)

print("Patterns found:")
for uw in patterns.take(10):
    print(uw)

Patterns found:
(('49302',), 1)
(('49302', '11109'), 1)
(('49302', '11109', '10246'), 1)
(('49302', '11109', '49683'), 1)
(('49302', '11109', '43633'), 1)
(('49302', '11109', '13176'), 1)
(('49302', '11109', '47209'), 1)
(('49302', '11109', '22035'), 1)
(('49302', '10246'), 1)
(('49302', '10246', '49683'), 1)


In [129]:
# 2- Reduce function to merge values of elements that share the same KEY
unique_patterns = patterns.reduceByKey(reduce_patterns_by_key)

print('Unique patterns count:')
for uw in unique_patterns.take(20):
    print(uw)

Unique patterns count:
(('26209', '11136', '33731'), 3)
(('26209', '28928', '17794'), 1)
(('46906', '21077', '5769'), 1)
(('46906', '24964', '17794'), 4)
(('46906', '19068', '4163'), 1)
(('46886', '19068', '30720'), 1)
(('46886', '5769', '17794'), 1)
(('47042', '11136', '4163'), 1)
(('20114', '19068'), 10)
(('21077', '24964', '17794'), 1)
(('21077', '30720', '39332'), 1)
(('24964', '17794', '19508'), 2)
(('47209', '11136', '33731'), 1)
(('30720', '17794', '19508'), 1)
(('39332', '3627', '23348'), 1)
(('27966', '26736', '45104'), 1)
(('9076', '14992', '8479'), 1)
(('47140', '31022', '16489'), 1)
(('26604', '16797', '23180'), 1)
(('16797', '13208', '16489'), 1)


In [130]:
# 3- Map function to identify subpatterns in unique patterns
subpatterns = unique_patterns.flatMap(map_to_subpatterns)

print("Subpatterns found:")
for uw in subpatterns.take(10):
    print(uw)

Subpatterns found:
(('47672', '7806', '24852'), (None, 1))
(('7806', '24852'), ('47672', 1))
(('47672', '24852'), ('7806', 1))
(('47672', '7806'), ('24852', 1))
(('47672', '47734', '6348'), (None, 1))
(('47734', '6348'), ('47672', 1))
(('47672', '6348'), ('47734', 1))
(('47672', '47734'), ('6348', 1))
(('7806', '6348', '260'), (None, 1))
(('6348', '260'), ('7806', 1))


In [131]:
# 4- Reduce function to merge values of elements that share the same KEY
subpatterns = unique_patterns.flatMap(map_to_subpatterns2)  # Map function to identify subpatterns in unique patterns (new format)
grouped_subpatterns = subpatterns.reduceByKey(group_subpatterns_by_key)

print('Grouped Subpatterns:')
for uw in grouped_subpatterns.take(10):
    print(uw)

Grouped Subpatterns:
(('42240', '15623'), [('31683', 1), ('28560', 1), ('35849', 1), ('29529', 1), ('24852', 1), ('24101', 1), (None, 1), ('2113', 1), ('23909', 1), ('43967', 1), ('12564', 1), ('5782', 1), ('196', 1), ('22788', 1), ('36274', 1), ('36133', 1), ('12053', 1), ('41844', 1), ('29615', 1), ('3990', 1), ('2463', 1), ('8680', 1), ('7693', 1), ('8204', 1), ('27568', 1), ('40144', 1), ('5793', 1), ('34886', 1), ('22835', 1), ('25986', 1)])
(('39928', '5479'), [('46069', 1), ('11777', 1), ('14945', 1), ('26369', 2), ('42736', 1), ('28437', 1), ('5876', 2), ('32429', 1), ('17352', 1), ('27086', 1), ('49075', 1), ('30720', 1), ('41065', 1), ('17949', 1), ('11182', 1), ('24838', 1), ('16145', 1), ('49192', 2), ('27913', 1), ('19057', 1), ('26209', 2), ('32747', 1), ('2855', 1), ('13176', 5), ('39619', 1), ('29898', 1), ('8006', 1), ('38452', 1), ('26369', 1), ('34371', 1), ('47119', 1), ('35842', 1), ('16479', 1), ('21137', 1), ('23554', 1), ('46206', 1), ('22825', 1), ('22035', 1),
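
To see how the reducer builds these lists, here is a small local sketch. `group_subpatterns_by_key` follows the notebook's own definition; `reduce_by_key_local` is a hypothetical helper that only imitates `reduceByKey` on a Python list, and the toy pairs are made up for illustration.

```python
from collections import defaultdict
from functools import reduce

def group_subpatterns_by_key(value1, value2):
    # Same logic as the notebook's reducer: wrap non-lists, then concatenate
    if not isinstance(value1, list):
        value1 = [value1]
    if not isinstance(value2, list):
        value2 = [value2]
    return value1 + value2

def reduce_by_key_local(pairs, func):
    """Hypothetical local stand-in for RDD.reduceByKey (illustration only)."""
    buckets = defaultdict(list)
    for key, value in pairs:
        buckets[key].append(value)
    return {key: reduce(func, values) for key, values in buckets.items()}

# Toy (key, value) pairs in the list-based format produced by map_to_subpatterns2
toy_pairs = [
    (('A', 'B'), [(None, 3)]),
    (('A', 'B'), [('C', 2)]),
    (('A', 'B'), [('D', 1)]),
    (('A',), [(None, 5)]),
]
grouped = reduce_by_key_local(toy_pairs, group_subpatterns_by_key)
print(grouped)
```

Each key ends up with one list holding the `(None, count)` entry of the pattern itself plus one `(item, count)` entry per product that extends it.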

In [132]:
# 5- Define associations rules (probabilities) identified in subpatterns
assocRules = grouped_subpatterns.map(map_to_assoc_rules)

print('Association rules:')
for uw in assocRules.take(20):
    print(uw)

Association rules:
(('26528', '28560'), [('40648', 1.0), ('6873', 1.0), ('17363', 1.0), ('27086', 1.0), ('34063', 1.0), ('41844', 1.0), ('10049', 1.0), ('11408', 1.0), ('45438', 1.0), ('27104', 1.0), ('47759', 1.0), ('5077', 1.0), ('44123', 1.0), ('44487', 1.0), ('34530', 1.0), ('28227', 1.0), ('28384', 1.0), ('27360', 1.0), ('43772', 1.0), ('31955', 1.0), ('24834', 1.0), ('42024', 1.0), ('26665', 1.0), ('17122', 1.0), ('14381', 1.0), ('31942', 1.0), ('7631', 1.0), ('38739', 1.0), ('35124', 1.0), ('38730', 1.0), ('24852', 1.0)])
(('25242', '28842'), [('22234', 1.0), ('42803', 1.0), ('29127', 1.0), ('39108', 1.0), ('7781', 1.0), ('22935', 1.0), ('24535', 1.0), ('16169', 1.0), ('30391', 1.0), ('16759', 1.0), ('13176', 1.0), ('22899', 1.0), ('43961', 1.0), ('34428', 1.0), ('41842', 1.0), ('44303', 1.0), ('45504', 1.0), ('13629', 1.0), ('6187', 1.0), ('47546', 1.0), ('18441', 1.0), ('48775', 1.0), ('46979', 1.0), ('23191', 1.0), ('11777', 1.0), ('4357', 1.0), ('21137', 1.0), ('34126', 1.0)
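
The confidence values can be checked by hand on a toy example. This is a minimal sketch, assuming that `map_to_assoc_rules` (defined earlier in the notebook and not shown in this section) divides each item count by the count carried by the `(None, count)` entry of the key pattern. The name `map_to_assoc_rules_sketch` and the toy data are hypothetical.

```python
def map_to_assoc_rules_sketch(grouped):
    """Hypothetical stand-in for map_to_assoc_rules (assumed normalisation)."""
    key, values = grouped
    # The count of the key pattern itself is carried by the (None, count) entry
    total = sum(count for item, count in values if item is None)
    if total == 0:
        return (key, [])
    # Confidence of "key -> item" = count(key + item) / count(key)
    rules = [(item, count / total) for item, count in values if item is not None]
    return (key, rules)

toy = (('A', 'B'), [(None, 4), ('C', 2), ('D', 1)])
rule = map_to_assoc_rules_sketch(toy)
print(rule)
```

With a pattern count of 4, an item seen twice with the pattern gets a confidence of 0.5 and an item seen once gets 0.25.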

In [30]:
endCPUtime = time.clock()
end = time.time()

In [33]:
CPUtime = endCPUtime - startCPUtime
print(' CPUtime:',CPUtime,' Time: ',end-start)

 CPUtime: 3596.459961143392  Time:  3596.460121154785


<font color=blue>Runtime: 3596.4 seconds !!

CPU config: Intel Core i5-4210U CPU @ 1.70GHz with 8GB RAM

Finally, repeat the same process, now using the Google Cloud Platform (GCP) account to which each team received access. All the instructions for creating a computing cluster with Spark and submitting a job will be explained in both laboratory sessions. In any case, the guidelines for performing this task can be found [here](https://cloud.google.com/blog/big-data/2017/02/google-cloud-platform-for-data-scientists-using-jupyter-notebooks-with-apache-spark-on-google-cloud).

You should report here the runtime of each experiment as well as the CPU configuration that you used to run locally.

<font color=blue>**7.3 - With the Google Cloud Platform (GCP)**

<font color=blue>We repeat the previous process (steps 0 to 5) on GCP, using a cluster of 1 master and 30 workers (124 vCPUs).

In [133]:
# Query the Transactions Only
df_ex5 = ss.sql('SELECT COLLECT_LIST(op.product_id) AS products' 
               ' FROM order_prod op '
               ' GROUP BY op.order_id ORDER BY op.order_id')

print('Transactions table: ')
df_ex5.show(10)

# Map function to convert dataframe patterns
def map_to_all_products(row): 
    return ','.join(str(x) for x in row.products) 

prods = df_ex5.rdd.map(map_to_all_products)
print('\nTransactions RDD: ')
for p in prods.take(5):
    print(p)

# Initiate time
import time
startTime = time.time()

# 1- Map function to identify patterns
patterns = prods.flatMap(map_to_patterns)

# 2- Reduce function to merge values of elements that share the same KEY
unique_patterns = patterns.reduceByKey(reduce_patterns_by_key)
    
# 3- Map function to identify subpatterns in unique patterns (new list-based format)
subpatterns = unique_patterns.flatMap(map_to_subpatterns2)

# 4- Reduce function to merge values of elements that share the same KEY
grouped_subpatterns = subpatterns.reduceByKey(group_subpatterns_by_key)
    
# 5- Define associations rules (probabilities) identified in subpatterns
assocRules = grouped_subpatterns.map(map_to_assoc_rules)
print('\nAssociation rules:')
for uw in assocRules.take(5):
    print(uw)
    
endTime = time.time()

print('Running Time: ', endTime-startTime)

Transactions table: 
+--------------------+
|            products|
+--------------------+
|[49302, 11109, 10...|
|[39612, 19660, 49...|
|[11913, 18159, 44...|
|[20574, 30391, 40...|
|[8859, 19731, 436...|
|[27104, 21174, 41...|
|[18394, 37766, 13...|
|[1194, 5578, 3815...|
|[28199, 24852, 29...|
|[33000, 11361, 27...|
+--------------------+
only showing top 10 rows


Transactions RDD: 
49302,11109,10246,49683,43633,13176,47209,22035
39612,19660,49235,43086,46620,34497,48679,46979
11913,18159,4461,21616,23622,32433,28842,42625,39693
20574,30391,40706,25610,27966,24489,39275
8859,19731,43654,13176,4357,37664,34065,35951,43560,9896,27509,15455,27966,47601,40396,35042,40986,1939,46313,329,30776,36695,27683,15995,27344,47333,48287,45204,24964,18117,46413,34126,9373,22935,46720,44479,790,18441,45007,20520,7461,26317,3880,36364,32463,41387,31066,17747,25659

Association rules:
(('38164',), [('12206', 0.006507592190889371), ('1194', 0.0021691973969631237), ('5479', 0.006507592190889371), ('190

<font color=blue>Runtime (GCP): 108.4 seconds

CPU config (GCP): 1 master + 30 workers (124 vCPUs)

**We see that the results are similar, but the runtime is drastically reduced compared to the local run (by a factor of about 33).**
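
The speedup follows directly from the two runtimes reported above; a short check (variable names are only illustrative):

```python
local_runtime_s = 3596.4  # local run, as reported above
gcp_runtime_s = 108.4     # GCP run, as reported above

speedup = local_runtime_s / gcp_runtime_s
print(f"Speedup: {speedup:.1f}x")
```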

## 8. Run MBA for your custom dataset (25 points)

Each team will receive a custom file on its storage bucket that contains a set of *order_id*. For this last task, you must query both the *order_products__prior* and *order_products__train* files to construct your own set of transactions, i.e., search only for the order_ids you have received. You should report the number of transactions and unique products that you will work with.

Moreover, build a list of unique products appearing on the first 10 orders of your custom file and report the association rules when you have the product alone in the basket. In other words, after running your MBA algorithm, print the association rules where the KEY product (alone) is present in this file. **You should print the product's name, not its ID.**  
       
Once again, you should run this experiment using the GCP and report the execution time.

<font color=blue>**8.1 - First, let's query the transactions:**

In [136]:
# Reading the structured data
df_order_prod_train = ss.read.csv('gs://bucket-team7/notebooks/instacart_2017_05_01/order_products__train.csv', header=True, sep=',', inferSchema=True)
print('order_products__train.csv')
df_order_prod_train.show(5)

df_order_prod_prior = ss.read.csv('gs://bucket-team7/notebooks/instacart_2017_05_01/order_products__prior.csv', header=True, sep=',', inferSchema=True)
print('order_products__prior.csv')
df_order_prod_prior.show(5)

df_orders_t7 = ss.read.csv('gs://bucket-team7/orders_team7.csv', header=True, sep=',', inferSchema=True)
print('orders.csv (Team 7)')
df_orders_t7.show(5)


order_products__train.csv
+--------+----------+-----------------+---------+
|order_id|product_id|add_to_cart_order|reordered|
+--------+----------+-----------------+---------+
|       1|     49302|                1|        1|
|       1|     11109|                2|        1|
|       1|     10246|                3|        0|
|       1|     49683|                4|        0|
|       1|     43633|                5|        1|
+--------+----------+-----------------+---------+
only showing top 5 rows

order_products__prior.csv
+--------+----------+-----------------+---------+
|order_id|product_id|add_to_cart_order|reordered|
+--------+----------+-----------------+---------+
|       2|     33120|                1|        1|
|       2|     28985|                2|        1|
|       2|      9327|                3|        0|
|       2|     45918|                4|        1|
|       2|     30035|                5|        0|
+--------+----------+-----------------+---------+
only showing top 5 rows

In [138]:
# Create necessary join tables
df_order_prod_train.createOrReplaceTempView("order_prod_T") # creates a Table order_prod_train
df_order_prod_prior.createOrReplaceTempView("order_prod_P") # creates a Table order_prod_prior
df_orders_t7.createOrReplaceTempView("orders_t7") # creates a Table orders list

# Create a table combining the Train and Prior Order&Products tables (UNION also removes duplicate rows)
df_opFull = ss.sql('SELECT order_prod_T.order_id, order_prod_T.product_id FROM order_prod_T '
                   'UNION '
                   'SELECT order_prod_P.order_id, order_prod_P.product_id FROM order_prod_P '
                   'ORDER BY order_id') 
df_opFull.show(20)
df_opFull.createOrReplaceTempView("order_prod_FULL") # creates a Table order_prod_prior+train

+--------+----------+
|order_id|product_id|
+--------+----------+
|       1|     11109|
|       1|     47209|
|       1|     13176|
|       1|     43633|
|       1|     49683|
|       1|     22035|
|       1|     49302|
|       1|     10246|
|       2|     45918|
|       2|     40141|
|       2|     33120|
|       2|     17794|
|       2|     43668|
|       2|     28985|
|       2|     30035|
|       2|      9327|
|       2|      1819|
|       3|     24838|
|       3|     33754|
|       3|     21903|
+--------+----------+
only showing top 20 rows



In [140]:
# Table of SELECTED order_id vs list of products in the selected orders
df_ex6 = ss.sql('SELECT o.order_id, COLLECT_LIST(op.product_id) AS products' 
               ' FROM orders_t7 o '
               ' INNER JOIN order_prod_FULL op ON op.order_id = o.order_id'
               ' GROUP BY o.order_id ORDER by order_id')

df_ex6.show(20)

+--------+--------------------+
|order_id|            products|
+--------+--------------------+
|       1|[49683, 10246, 22...|
|       5|[41176, 13245, 23...|
|       6|[15873, 40462, 41...|
|       7|      [34050, 46802]|
|      14|[45066, 20392, 16...|
|      15|[19660, 32463, 21...|
|      16|[9755, 45437, 25466]|
|      22|[40593, 19068, 31...|
|      23|[44299, 13497, 32...|
|      27|[38226, 17794, 41...|
|      30|[42944, 48559, 1158]|
|      37|[33059, 43721, 35...|
|      38|[32433, 28842, 18...|
|      40|[42450, 33198, 10...|
|      48|[17600, 14129, 12...|
|      49|[39276, 19677, 20...|
|      57|[21903, 48857, 19...|
|      59|[32549, 46077, 12...|
|      60|[22963, 48628, 36...|
|      62|[10017, 37766, 28...|
+--------+--------------------+
only showing top 20 rows



<font color = blue>Computing the number of distinct orders and products in our analysis:

In [144]:
# Table with selected orders and products
df_ex6_long = ss.sql('SELECT o.order_id, op.product_id AS products' 
                     ' FROM orders_t7 o '
                     ' INNER JOIN order_prod_FULL op ON op.order_id = o.order_id')
df_ex6_long.show(5)
df_ex6_long.createOrReplaceTempView("selected_op") # creates a Table of the selected orders and their products

# Number of distinct orders:
df_oCount = ss.sql('SELECT COUNT(*) FROM (SELECT DISTINCT order_id FROM selected_op)')
df_oCount.show()

# Number of distinct products:
df_pCount = ss.sql('SELECT COUNT(*) FROM (SELECT DISTINCT products FROM selected_op)')
df_pCount.show()


+--------+--------+
|order_id|products|
+--------+--------+
|       1|   49302|
|       1|   43633|
|       1|   22035|
|       1|   13176|
|       1|   11109|
+--------+--------+
only showing top 5 rows

+--------+
|count(1)|
+--------+
| 1004517|
+--------+

+--------+
|count(1)|
+--------+
|   48542|
+--------+



<font color=blue>Our analysis has **1004517** different orders and **48542** different products. (Note that we could have obtained the same result with a GROUP BY query.)
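
As an alternative to the two sub-queries, both counts can be computed in one query with `COUNT(DISTINCT ...)`, which Spark SQL also supports. The sketch below uses Python's built-in `sqlite3` as a local stand-in for the Spark session, keeps the `selected_op` table and column names from the notebook, and uses made-up toy rows.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE selected_op (order_id INTEGER, products INTEGER)")
# Toy rows (made up for illustration): 3 orders, 4 distinct products
conn.executemany(
    "INSERT INTO selected_op VALUES (?, ?)",
    [(1, 10), (1, 11), (2, 10), (2, 12), (3, 13), (3, 10)],
)
# One pass over the table returns both distinct counts
n_orders, n_products = conn.execute(
    "SELECT COUNT(DISTINCT order_id), COUNT(DISTINCT products) FROM selected_op"
).fetchone()
print(n_orders, n_products)
conn.close()
```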

<font color=blue>Now let's take a look at the products in the first 10 orders:

In [145]:
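# Table of first 10 orders
# Note: LIMIT without ORDER BY does not guarantee which rows are returned,
# and a temp view is re-evaluated each time it is queried.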
df_top10 = ss.sql('SELECT * FROM orders_t7 LIMIT 10')
df_top10.createOrReplaceTempView("top10_orders")
print('First 10 orders: ')
df_top10.show()

# First, show the products appearing in the first 10 orders
df_prod_vs_10 = ss.sql('SELECT o10.order_id, COLLECT_LIST(op.product_id) AS products' 
               ' FROM top10_orders o10'
               ' INNER JOIN order_prod_FULL op ON op.order_id = o10.order_id'
               ' GROUP BY o10.order_id ')
print('\nProducts in the first 10 orders: ')
df_prod_vs_10.show()

First 10 orders: 
+--------+
|order_id|
+--------+
| 2539329|
|  431534|
| 2295261|
| 1501582|
| 1718559|
| 1492625|
| 2037211|
| 1402502|
| 2774568|
| 3343014|
+--------+


Products in the first 10 orders: 
+--------+--------------------+
|order_id|            products|
+--------+--------------------+
| 1402502|[18599, 24810, 23...|
| 1492625|[11913, 45613, 93...|
| 2539329|[196, 14084, 1242...|
|  431534|[10258, 196, 1712...|
| 3343014|[2707, 7350, 3546...|
| 2295261|[12427, 25133, 19...|
| 1718559|[47792, 9124, 120...|
| 1501582|[47766, 8138, 847...|
| 2037211|[16965, 12845, 93...|
+--------+--------------------+



In [146]:
# List of products appearing in first 10 orders
df_ex7 = ss.sql('SELECT op.product_id, COLLECT_LIST(o10.order_id) AS orders' 
               ' FROM order_prod_FULL op '
               ' INNER JOIN top10_orders o10 ON o10.order_id = op.order_id'
               ' GROUP BY product_id')
print('\nUnique products in the first 10 orders: ')
df_ex7.show(20)


Unique products in the first 10 orders: 
+----------+--------------------+
|product_id|              orders|
+----------+--------------------+
|     38928|   [427167, 1494591]|
|     11885|           [2525084]|
|     32478|[422664, 3158632,...|
|     22802|           [2654438]|
|     30292|           [2999252]|
|     26348|           [1611455]|
|     23826|           [2654438]|
|     34148|            [427167]|
|     11759|           [2525084]|
|     21137|            [422664]|
|     13914|[1611455, 3158632...|
|     12341|           [2525084]|
|     30591|           [1611455]|
|     26088|[427167, 3158632,...|
|     27839|           [1611455]|
|      7249|  [2654438, 3158632]|
|     46061|           [1611455]|
|     42500|           [2999252]|
|     43739|           [2999252]|
|       196|[1611455, 697113,...|
+----------+--------------------+
only showing top 20 rows



<font color=blue>Now let's extract and format the transactions only:

In [147]:
# Map function to convert dataframe patterns
def map_to_all_products(row): 
    return ','.join(str(x) for x in row.products) 

prods = df_ex6.rdd.map(map_to_all_products)
print('\nTransactions RDD: ')
for p in prods.take(5):
    print(p)


Transactions RDD: 
10246,13176,49683,49302,43633,22035,47209,11109
13245,23909,24773,48002,27966,37011,8479,48825,41176,47329,9633,48370,18569,20914,48366,6348,13176,47209,27360,12962,6184,46522,15005,45698,40878,38693
40462,15873,41897
46802,34050
39475,20995,27845,45066,20392,162,23032,8575,41890,10096,2452


<font color=blue>**8.2 - Run the MBA algorithm on the selected data**

In [149]:
# Initiate time
import time
startTime = time.time()

# 1- Map function to identify patterns
patterns = prods.flatMap(map_to_patterns)

# 2- Reduce function to merge values of elements that share the same KEY
unique_patterns = patterns.reduceByKey(reduce_patterns_by_key)
    
# 3- Map function to identify subpatterns in unique patterns (new list-based format)
subpatterns = unique_patterns.flatMap(map_to_subpatterns2)

# 4- Reduce function to merge values of elements that share the same KEY
grouped_subpatterns = subpatterns.reduceByKey(group_subpatterns_by_key)
    
# 5- Define associations rules (probabilities) identified in subpatterns
assocRules = grouped_subpatterns.map(map_to_assoc_rules)
print('\nAssociation rules:')
for uw in assocRules.take(5):
    print(uw)
    
endTime = time.time()

print('Running Time: ', endTime-startTime)


Association rules:
(('18465', '8955'), [('34225', 0.02), ('49533', 0.04), ('6193', 0.02), ('17008', 0.02), ('11352', 0.02), ('11130', 0.02), ('46979', 0.02), ('19370', 0.02), ('6598', 0.02), ('20082', 0.02), ('27730', 0.02), ('3957', 0.02), ('26443', 0.02), ('19057', 0.02), ('1202', 0.02), ('27966', 0.08), ('31717', 0.02), ('30391', 0.08), ('13280', 0.02), ('48437', 0.02), ('41220', 0.02), ('25824', 0.02), ('28123', 0.02), ('44534', 0.02), ('14074', 0.02), ('5618', 0.02), ('18288', 0.02), ('15902', 0.04), ('27690', 0.02), ('33731', 0.04), ('36389', 0.02), ('45013', 0.02), ('47209', 0.1), ('41570', 0.02), ('5451', 0.02), ('43490', 0.02), ('2452', 0.02), ('42183', 0.02), ('46696', 0.02), ('24413', 0.02), ('18441', 0.02), ('20114', 0.04), ('5025', 0.02), ('14462', 0.02), ('46575', 0.02), ('12063', 0.02), ('33195', 0.02), ('16771', 0.02), ('32666', 0.02), ('27392', 0.02), ('31506', 0.06), ('8174', 0.02), ('42617', 0.02), ('27521', 0.06), ('2314', 0.04), ('9755', 0.02), ('13176', 0.02), ('

<font color = blue>We now have the association rules for all combinations of 1, 2 or 3 products. Let's print the association rules for the single products that appear in our first 10 orders only. We will do this at the last reduce step.

In [None]:
# List of products appearing in the first 10 orders:
def map_to_product_id(row):
    # df_ex7 rows have the columns (product_id, orders); there is no `products` column
    return str(row.product_id)

prods10 = df_ex7.rdd.map(map_to_product_id)
print('\nTransactions RDD: ')
for p in prods10.take(5):
    print(p)


Transactions RDD: 


In [None]:
def group_subpatterns_by_key(value1, value2):
    if not isinstance(value1, list):
        value1 = [value1]
    if not isinstance(value2, list):
        value2 = [value2]
    return value1+value2   

def map_to_subpatterns2(pattern):
    yield (pattern[0],[(None,pattern[1])])
    if len(pattern[0])>1:
        for i in range(len(pattern[0])): 
            temp = list(pattern[0])
            temp.pop(i)
            temp = tuple(temp)
            yield (temp,[(pattern[0][i],pattern[1])])

# Map function to identify subpatterns in unique patterns
subpatternsRED = unique_patterns.flatMap(map_to_subpatterns2)
# Reduce function to merge values of elements that share the same KEY
grouped_subpatternsRED = subpatternsRED.reduceByKey(group_subpatterns_by_key)

print('Grouped Subpatterns:')
for uw in grouped_subpatternsRED.take(10):
    print(uw)
    
# 5- Define associations rules (probabilities) identified in subpatterns
assocRulesRED = grouped_subpatternsRED.map(map_to_assoc_rules)
print('\nAssociation rules:')
for uw in assocRulesRED.take(10):
    print(uw)
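
The cell above still prints rules for every key. A minimal local sketch of the remaining step is given below: keep only the rules whose key is a single product from the first 10 orders, and replace IDs with names. The function `rules_for_single_products`, the toy rules and the placeholder names are all hypothetical; on the cluster the same predicate could be passed to an RDD `filter`, with the names taken from the products file.

```python
def rules_for_single_products(assoc_rules, product_ids, product_names):
    """Keep rules whose key is a single product in product_ids; swap IDs for names."""
    selected = []
    for key, rules in assoc_rules:
        # Only single-product keys that appear in the first 10 orders
        if len(key) == 1 and key[0] in product_ids:
            named_rules = [(product_names.get(item, item), conf) for item, conf in rules]
            selected.append((product_names.get(key[0], key[0]), named_rules))
    return selected

# Toy data (made up for illustration)
toy_rules = [
    (('196',), [('14084', 0.5), ('12427', 0.25)]),
    (('196', '14084'), [('12427', 1.0)]),
    (('99999',), [('196', 0.1)]),
]
toy_first10_products = {'196', '14084', '12427'}
toy_names = {'196': 'product A', '14084': 'product B', '12427': 'product C'}

selected = rules_for_single_products(toy_rules, toy_first10_products, toy_names)
for name, rules in selected:
    print(name, rules)
```

Falling back to the raw ID when a name is missing keeps the output complete even if the name lookup is partial.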


In [108]:
# Close the spark context
sc.stop()