# Milestone 2 - Data collection and description
The second task is to intimately acquaint yourself with the data, preprocess it and complete all the necessary descriptive statistics tasks. We expect you to have a pipeline in place, fully documented in a notebook, and show us that you’ve advanced with your understanding of the project goals by updating its README description.

In [8]:
import numpy as np
import pandas as pd
import ast
import matplotlib.pyplot as plt
import json
%matplotlib inline

In [4]:
import findspark
findspark.init()

import pyspark
conf = pyspark.SparkConf()\
    .setMaster('local[*]')\
    .set('spark.executor.memory', '2g')\
    .set('spark.driver.memory', '2g')\
    .set('spark.executor.instances', '4')
    
sc = pyspark.SparkContext(conf=conf)

# Gaining insight into the Amazon product network 

## Overview

The Amazon dataset contains relations among products, such as "also viewed", "also bought", "bought together", "bought after viewing". These links can be used to create a graph that represents products with similar characteristics, that is, products that are viewed together but not bought together.
Our idea is to exploit the dataset in order to create clusters of competing products. These clusters may be used not only to identify the best product in terms of rating and sales within a group, but also to investigate how brands can influence the sales and the prices of similar products.

The dataset is transformed into a graph of relations between products, where the vertices represent products and the edges represent competition between products. For instance, if two products are viewed together (people who viewed product A also viewed product B, and vice versa) but not bought together, they are competitors. On the other hand, two products that are viewed together and bought together are not competitors (e.g. a user buys a smartphone and a cover). More formally, groups of competing products correspond to maximal cliques, that is, sets of vertices that are pairwise connected and cannot be extended by adding another vertex.
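
The competition rule and the clique search can be sketched on a toy example. This is only an illustration under assumptions: the product IDs are made up, the key names `also_bought` and `bought_together` are assumed by analogy with `also_viewed`, the edge rule is one possible reading of the definition above, and the basic Bron-Kerbosch recursion stands in for whatever algorithm is eventually chosen.

```python
# Toy product records; the IDs and relations are made up for illustration.
products = {
    'A': {'also_viewed': ['B', 'C', 'D'], 'also_bought': ['D']},
    'B': {'also_viewed': ['A', 'C']},
    'C': {'also_viewed': ['A', 'B']},
    'D': {'also_viewed': ['A'], 'also_bought': ['A']},
}

def related(asin, key):
    """Return the set of products linked to `asin` through relation `key`."""
    return set(products.get(asin, {}).get(key, []))

def are_competitors(a, b):
    """Hypothetical rule: mutually viewed together, never bought together."""
    viewed = b in related(a, 'also_viewed') and a in related(b, 'also_viewed')
    bought = any(b in related(a, k) or a in related(b, k)
                 for k in ('also_bought', 'bought_together'))
    return viewed and not bought

def build_graph():
    """Build the adjacency sets of the undirected competition graph."""
    graph = {p: set() for p in products}
    for a in products:
        for b in products:
            if a < b and are_competitors(a, b):
                graph[a].add(b)
                graph[b].add(a)
    return graph

def maximal_cliques(graph):
    """Enumerate maximal cliques with the basic Bron-Kerbosch recursion."""
    cliques = []

    def expand(r, p, x):
        # r: current clique, p: candidate vertices, x: already explored vertices
        if not p and not x:
            cliques.append(sorted(r))
            return
        for v in list(p):
            expand(r | {v}, p & graph[v], x & graph[v])
            p = p - {v}
            x = x | {v}

    expand(set(), set(graph), set())
    return sorted(cliques)

graph = build_graph()
cliques = maximal_cliques(graph)
print(cliques)
```

In this toy graph, A and D are viewed together but also bought together, so D ends up isolated and forms a single-product clique of its own.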

## Dataset description

**@show** 
- That you can handle the data in its size.

The Amazon dataset consists of two JSON files: 
- *metadata.json*: contains information about the products, such as their unique ID, description and price. The size of the dataset is 9.81 GB (uncompressed, in JSON format).
- *reviews.json*: contains reviews and ratings associated with each product, as well as the helpfulness of each review. The size of the dataset is approximately 100 GB (again, uncompressed and in JSON format).

Since the dataset does not fit in memory, we cannot process it using libraries such as Pandas, unless we reduce its size first.
Therefore, the initial data processing was carried out using **PySpark**, both on the cluster (especially for the reviews dataset) and locally. While it may seem inappropriate at first, running Spark locally makes sense for medium-sized datasets (like the metadata one), as it automatically parallelizes jobs across all cores and spills intermediate results that do not fit in main memory to disk.

### Metadata
The dataset contains a list of entries of products with the following fields (some of them may be missing):
- **asin**: unique ID of the product.
- **title**: name of the product.
- **price**: price in US dollars.
- **imUrl**: URL of the product image.
- **related**: related products, which contains the sub-lists: *also bought, also viewed, bought together, buy after viewing*.
- **salesRank**: sales rank information, i.e. how well the product compares to other products in the same category in terms of sales.
- **brand**: brand name.
- **categories**: the category(-ies) to which the product belongs.

These fields are already sufficient for building our graph, since they contain the above-mentioned relations between products, as well as their IDs and names. Here we present an example of such a record:

In [12]:
sc.textFile('data/metadata.json')\
    .map(lambda x: ast.literal_eval(x))\
    .filter(lambda x: 'price' in x and 'related' in x)\
    .take(1)

[{'asin': '0000143561',
  'categories': [['Movies & TV', 'Movies']],
  'description': '3Pack DVD set - Italian Classics, Parties and Holidays.',
  'imUrl': 'http://g-ecx.images-amazon.com/images/G/01/x-site/icons/no-img-sm._CB192198896_.gif',
  'price': 12.99,
  'related': {'also_viewed': ['B0036FO6SI',
    'B000KL8ODE',
    '000014357X',
    'B0037718RC',
    'B002I5GNVU',
    'B000RBU4BM'],
   'buy_after_viewing': ['B0036FO6SI',
    'B000KL8ODE',
    '000014357X',
    'B0037718RC']},
  'salesRank': {'Movies & TV': 376041},
  'title': 'Everyday Italian (with Giada de Laurentiis), Volume 1 (3 Pack): Italian Classics, Parties, Holidays'}]

Note that we had to use `ast.literal_eval` instead of the JSON library because the dataset is not in standard JSON format. Each line appears to be a Python dictionary literal (strings are single-quoted), i.e. what `print()` produces for a `dict`.
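
The difference between the two parsers can be checked on a single line. The snippet below is a small sketch; the record is a shortened version of the example above.

```python
import ast
import json

# A record written the way Python prints a dict (single-quoted strings).
line = "{'asin': '0000143561', 'price': 12.99, 'salesRank': {'Movies & TV': 376041}}"

try:
    json.loads(line)
    parsed_as_json = True
except json.JSONDecodeError:
    # Standard JSON requires double-quoted strings, so this branch is taken.
    parsed_as_json = False

# literal_eval only evaluates Python literals, so it does not execute arbitrary code.
record = ast.literal_eval(line)
print(parsed_as_json, record['price'])
```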

Before moving forward, we have to decide whether it makes sense to carry out our analysis on the whole dataset, or whether it is more appropriate to select only a subset of the categories. It turns out that the latter is the case: sales in categories such as **Music**, **Books**, or **Clothes** depend largely on personal preferences and are less (if at all) subject to competition. By contrast, electronic products are subject to real competition, as customers want the best possible product at the lowest cost. Moreover, this selection step allows us to reduce the size of the dataset and process it more efficiently.

Therefore, the first step consists in listing all categories and selecting those in which we are interested. As can be seen from the example above, categories are represented as hierarchies, e.g. `["Sports & Outdoors", "Other Sports", "Dance"]` means that the product can be found in the category **Sports & Outdoors -> Other Sports -> Dance**. First, we extract the list of macro-categories (top-level categories such as **Sports & Outdoors**) along with their product count, and we inspect them manually. Our goal is to choose macro-categories containing products that can be objectively compared in terms of features and characteristics, such as *Electronics* or *Cell phones*. Conversely, categories in which the purchase decision is subjective (e.g. clothes and books) are discarded.

In [13]:
# Get all top-level categories along with their product count
categories_macro = sc.textFile('data/metadata.json')\
    .map(lambda x: ast.literal_eval(x))\
    .filter(lambda x: 'categories' in x)\
    .flatMap(lambda x: x['categories'])\
    .map(lambda x: x[0])\
    .map(lambda x: (x, 1))\
    .reduceByKey(lambda x, y: x + y)\
    .collect()

In [19]:
# We show the top categories sorted by product count
df = pd.DataFrame(categories_macro)
df.columns = ['category', 'count']
df.sort_values('count', ascending=False).head(30)

| | category | count |
|---|---|---|
| 49 | Clothing, Shoes & Jewelry | 3429257 |
| 13 | Books | 2855617 |
| 66 | CDs & Vinyl | 1523001 |
| 30 | Kindle Store | 1088341 |
| 72 | Sports & Outdoors | 543514 |
| 73 | Electronics | 500600 |
| 52 | Home & Kitchen | 437019 |
| 32 | Cell Phones & Accessories | 357693 |
| 36 | Toys & Games | 336460 |
| 9 | Automotive | 331484 |


Based on these considerations, we decided to include the following macro categories in our analysis: **Electronics**, **Cell Phones & Accessories**, **Automotive**, **Tools & Home Improvement**, and **Musical Instruments**. The choice is motivated both by their sizes and by the fact that they contain products that are comparable.

Now, we want to have a more detailed description of the categories. Therefore, for each macro-category, we convert the category lists within each product to a tree. We do this in a distributed way, using Spark. The job works as follows:
- **Map phase:** each list is converted to a tree. For instance:

In [25]:
# Transforms a category list into a flat tree (a tree with a linked list topology)
def convert_to_tree(elements):
    root = {}
    node = root
    for element in elements:
        node[element] = (1, {}) # Tuple: (product count, children)
        node = node[element][1]
    return root

In [26]:
convert_to_tree(["Sports & Outdoors", "Other Sports", "Dance"])

{'Sports & Outdoors': (1, {'Other Sports': (1, {'Dance': (1, {})})})}

Each category includes the product count along with its subcategories (children). The product count within a category includes the sum of the product counts of all its children.

- **Reduce phase:** all trees are merged together recursively, creating a huge category tree that reflects the entire dataset.

In [23]:
# Merge nodes
def merge_trees(a, b):
    for key in b:
        if key in a:
            a[key] = (a[key][0] + b[key][0], a[key][1])
            merge_trees(a[key][1], b[key][1])
        else:
            a[key] = b[key]
    return a

In [27]:
a = convert_to_tree(["Sports & Outdoors", "Other Sports", "Dance"])
b = convert_to_tree(["Sports & Outdoors", "Supplies"])
merge_trees(a, b)

{'Sports & Outdoors': (2,
  {'Other Sports': (1, {'Dance': (1, {})}), 'Supplies': (1, {})})}

Now we run the actual job on Spark.

In [28]:
# Build the category tree
category_tree = sc.textFile('data/metadata.json')\
    .map(lambda x: ast.literal_eval(x))\
    .filter(lambda x: 'categories' in x)\
    .flatMap(lambda x: x['categories'])\
    .map(convert_to_tree)\
    .reduce(merge_trees)

In the example below, the subtree of *Cell Phones & Accessories* is shown, with the product count for each subcategory. As can be seen, subcategories differ significantly in the number of products they contain. Therefore, some heuristics may be necessary to group categories that contain a small number of products.

In [33]:
# Each node represents a (count, children) tuple
category_tree['Cell Phones & Accessories']

(357693,
 {'Accessories': (109763,
   {'Accessory Kits': (26545, {}),
    'Audio Adapters': (497, {}),
    'Batteries': (9882,
     {'Battery Charger Cases': (560, {}),
      'External Battery Packs': (2079, {}),
      'Internal Batteries': (6651, {})}),
    'Bluetooth Speakers': (782, {}),
    'Car Accessories': (5863,
     {'Car Cradles & Mounts': (4705,
       {'Car Cradles': (424, {}), 'Car Mounts': (4189, {})}),
      'Car Kits': (845, {}),
      'Car Speakerphones': (312, {})}),
    'Chargers': (17463,
     {'Car Chargers': (7650, {}),
      'Cell Phone Docks': (1894, {}),
      'International Chargers': (169, {}),
      'Solar Chargers': (284, {}),
      'Travel Chargers': (7122, {})}),
    'Cradles, Mounts & Stands': (48, {'Stands': (47, {})}),
    'Data Cables': (6671, {}),
    'Headsets': (10197,
     {'Bluetooth Headsets': (5066, {}), 'Wired Headsets': (5023, {})}),
    'Phone Charms': (3073, {}),
    'Replacement Parts': (6592, {}),
    'SIM Cards & Tools': (506, {}),
    '
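
As a first step towards such a grouping heuristic, the tree can be walked to list the categories that fall below a size threshold. The sketch below is hypothetical: the helper name `small_categories` and the threshold of 500 are illustrative choices, and the input is only an excerpt of the subtree shown above.

```python
def small_categories(tree, threshold, path=()):
    """Yield (path, count) for every category with fewer than `threshold` products."""
    for name, (count, children) in tree.items():
        current = path + (name,)
        if count < threshold:
            yield current, count
        # Recurse into the children regardless of the size of the parent.
        yield from small_categories(children, threshold, current)

# Excerpt of the subtree, in the same (count, children) layout.
subtree = {
    'Chargers': (17463, {
        'Car Chargers': (7650, {}),
        'International Chargers': (169, {}),
        'Solar Chargers': (284, {}),
    }),
    'Cradles, Mounts & Stands': (48, {'Stands': (47, {})}),
}

small = sorted(small_categories(subtree, threshold=500))
for path, count in small:
    print(' -> '.join(path), count)
```

Categories flagged this way could then, for example, be merged into their parent, although the actual merging rule is still to be decided.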

### Reviews

The reviews dataset (`reviews.json`) has a size of approximately 100 GB. Each review contains the following fields:
- **reviewerID**: unique ID associated to each user.
- **asin**: unique ID associated to each product.
- **reviewerName**: name of the user.
- **helpful**: helpfulness rating of the review (tuple of 2 elements: # helpful and # not helpful).
- **reviewText**: text of the review.
- **overall**: rating of the product from 1 to 5 stars.
- **summary**: summary of the review.
- **unixReviewTime**: unix timestamp of the review.
- **reviewTime**: raw timestamp of the review.

Since our project is mainly focused on products, we are not interested in individual reviews. However, we still need this dataset in order to compute the aggregate ratings for each product and merge them into our dataset.

## Preliminary processing

**@show**
- That you considered ways to enrich, filter, transform the data according to your needs.

#### Reduce the Amazon dataset

Due to the large size of the Amazon dataset, we decided to create a custom dataset prior to performing any further analysis. The custom dataset, which has been named *reduced*, contains only products belonging to the macro-categories selected in the previous paragraph. To further reduce the size, the image URL of each product is removed. In addition, the review ratings of each product are averaged and merged with the products. As a result, we obtain a smaller *metadata* dataset (1.71 GB) that is enriched with the average product rating field.

##### Aggregate ratings
The average product rating is computed from the data in the *reviews* dataset. For each entry, the product ID and the rating are stored, respectively, in the *asin* and *overall* fields. To compute the rating, entries are grouped by product ID and then averaged on the *overall* field. In addition to the average rating for each product, we compute two further fields that may come in handy later: the number of reviews (`num_reviews`), and the ratio of helpful reviews (`helpful_fraction`), defined as **# helpful / (# helpful + # not helpful)**.
The output is saved in JSON format as `aggregate_ratings.json`. The code for processing the data (shown below) was executed on the ADA cluster.

```python
import json
import pyspark.sql.functions as func
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext()
sqlContext = SQLContext(sc)

rdd = sc.textFile('/datasets/productGraph/complete.json')\
    .map(lambda x: json.loads(x))\
    .map(lambda x: (x['asin'], x['overall'], x['helpful'][0], x['helpful'][1]))

sqlContext.createDataFrame(rdd, ['asin', 'overall', 'helpful', 'not_helpful'])\
    .groupBy('asin')\
    .agg(
        func.mean('overall').alias('average_rating'),
        func.count('overall').alias('num_reviews'),
        (func.sum('helpful') / (func.sum('helpful') + func.sum('not_helpful'))).alias('helpful_fraction')
    )\
    .toJSON()\
    .saveAsTextFile('aggregate_ratings.json')
```
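
To make the aggregation concrete, the same three quantities can be computed by hand on a few toy reviews. This is a plain-Python sketch with made-up values, not the code that ran on the cluster. It also makes one edge case explicit: a product whose reviews received no votes has an undefined `helpful_fraction`, represented here as `None`.

```python
from collections import defaultdict

# Toy reviews: (asin, overall, helpful votes, not-helpful votes); values are made up.
reviews = [
    ('P1', 5.0, 3, 1),
    ('P1', 3.0, 1, 3),
    ('P2', 4.0, 0, 0),
]

# Running totals per product.
totals = defaultdict(lambda: {'stars': 0.0, 'n': 0, 'helpful': 0, 'not_helpful': 0})
for asin, overall, helpful, not_helpful in reviews:
    t = totals[asin]
    t['stars'] += overall
    t['n'] += 1
    t['helpful'] += helpful
    t['not_helpful'] += not_helpful

aggregate = {}
for asin, t in totals.items():
    votes = t['helpful'] + t['not_helpful']
    aggregate[asin] = {
        'average_rating': t['stars'] / t['n'],
        'num_reviews': t['n'],
        # Undefined when no votes were cast on any review of the product.
        'helpful_fraction': t['helpful'] / votes if votes else None,
    }

print(aggregate)
```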

##### Merge the datasets
*Metadata* is filtered so as to keep only the products belonging to the macro categories of interest, which are then merged with *aggregate_ratings*. The code that generates the *reduced* dataset is shown below.

TODO: write about multiple categories

In [None]:
# The list of macro categories that we want to extract
categories_to_extract = set(['Electronics', 'Cell Phones & Accessories', 'Automotive',\
                             'Tools & Home Improvement', 'Musical Instruments'])

# Extract macro category and delete img url to reduce size
def extract_category(x):
    x['category'] = x['categories'][0]
    del x['categories']
    x['num_reviews'] = 0 # The number of reviews is 0 in the default case
    if 'imUrl' in x:
        del x['imUrl']
    return x

# Load the aggregate ratings
ratings = sc.textFile('data/aggregate_ratings.json')\
    .map(lambda x: json.loads(x))\
    .map(lambda x: (x['asin'], x))

# Filter products and merge datasets.
# Note that we use the left outer join, so as to include products that have no reviews.
sc.textFile('data/metadata.json')\
    .map(lambda x: ast.literal_eval(x))\
    .filter(lambda x: 'categories' in x)\
    .map(extract_category)\
    .filter(lambda x: x['category'][0] in categories_to_extract)\
    .map(lambda x: (x['asin'], x))\
    .leftOuterJoin(ratings)\
    .map(lambda x: x[1])\
    .map(lambda x: x[0] if x[1] is None else {**x[0], **x[1]})\
    .map(lambda x: json.dumps(x))\
    .saveAsTextFile('data/reduced.json')
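
The effect of the left outer join followed by the dictionary merge can be illustrated without Spark. In the sketch below, the helper `left_outer_join` is a hypothetical stand-in that mimics the pairing produced by `leftOuterJoin`, and the records are made up.

```python
# Toy inputs keyed by asin; the values are made up for illustration.
products = {
    'P1': {'asin': 'P1', 'title': 'Guitar strings', 'num_reviews': 0},
    'P2': {'asin': 'P2', 'title': 'Capo', 'num_reviews': 0},
}
ratings = {
    'P1': {'asin': 'P1', 'average_rating': 4.5, 'num_reviews': 12},
}

def left_outer_join(left, right):
    """Keep every left key; the right value is None when the key has no match."""
    return [(key, (value, right.get(key))) for key, value in left.items()]

merged = {}
for asin, (product, rating) in left_outer_join(products, ratings):
    # Keys from `rating` override the defaults set on the product record.
    merged[asin] = product if rating is None else {**product, **rating}

print(merged['P1']['num_reviews'], merged['P2']['num_reviews'])
```

A product without reviews keeps the default `num_reviews` of 0 and has no `average_rating` key, so later steps should check for that key (or filter on `num_reviews`) before using the rating.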

#### Build a light sandbox dataset

To run tests on the data quickly, we decided to reduce the Amazon dataset further. Specifically, from *reduced* we built a lighter dataset containing a single macro category, *Musical Instruments*. The code that generates the dataset, together with its category tree, is shown below.

In [None]:
sc.textFile('reduced.json')\
    .filter(lambda x: json.loads(x)['category'][0] == 'Musical Instruments')\
    .coalesce(1)\
    .saveAsTextFile('musical_instruments.json')

# Each node is a (count, children) tuple, so the subcategories are at index 1
category_tree['Musical Instruments'][1].keys()

##  Exploratory analysis

**@show**
- That you understand what’s into the data (formats, distributions, missing values, correlations, etc.).
- That you have updated your plan in a reasonable way, reflecting your improved knowledge after data acquaintance. In particular, discuss how your data suits your project needs and discuss the methods you’re going to use, giving their essential mathematical details in the notebook.


**@todo**
- show some correlation between the variables
- show some cliques and draw some conclusions. Are the cliques well constructed? Can we claim that a product is better than the others within the clique? If yes, with what metrics? (analyze correlations)
- Are the cliques meaningful? We have seen that some cliques are composed of the same object in different colors, or even of the exact same product from different vendors. It may be necessary to add some considerations on how to deal with these extreme cases.
- How do we merge cliques? What heuristic do we choose? How do we deal with single-product-size cliques?
- **Build histogram of missing features**
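
For the missing-feature histogram listed above, a possible starting point is to count, for each field, the number of records in which it is absent. The sketch below uses made-up records and the field names from the metadata description; on the real data the same counting could be done in a Spark job and the result plotted as a bar chart.

```python
from collections import Counter

# Fields listed in the metadata description; the toy records below are made up.
FIELDS = ['asin', 'title', 'price', 'imUrl', 'related', 'salesRank', 'brand', 'categories']

records = [
    {'asin': 'P1', 'title': 'Capo', 'price': 9.99, 'categories': [['Musical Instruments']]},
    {'asin': 'P2', 'related': {'also_viewed': ['P1']}, 'categories': [['Musical Instruments']]},
]

# Count, for each field, how many records do not contain it.
missing = Counter()
for record in records:
    missing.update(field for field in FIELDS if field not in record)

for field in FIELDS:
    print(field, missing[field])
```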

#### Correlation analysis
We performed some analyses on the variables of the *musical_instruments* dataset. In detail, we investigated the correlations among price, review rating and sales rank.

In [None]:
import seaborn as sns  # needed for the heatmap and pair plot below

all_ratings = sc.textFile('musical_instruments.json')\
    .map(lambda x: json.loads(x))\
    .filter(lambda x: x['num_reviews'] > 10 and 'price' in x and 'salesRank' in x and 'Musical Instruments' in x['salesRank'])\
    .map(lambda x: (x['price'], x['average_rating'], x['salesRank']['Musical Instruments']))\
    .collect()
    
df = pd.DataFrame(all_ratings)
print(len(df))
df.columns = ['price', 'rating', 'rank']
corr = df.corr()
display(df.head())
plt.figure(figsize=(10,10))
_ = sns.heatmap(corr, annot=True,
            xticklabels=corr.columns.values,
            yticklabels=corr.columns.values)

sns.pairplot(df)

According to the graphs above, we discuss the following outcomes on the analysis of musical instruments:
- As the price increases, ratings tend to have a lower variance and a higher mean. In other words, more expensive products receive higher ratings on average and are less likely to be unpopular.
- As the price increases, the variance of the sales rank tends to decrease. Therefore, the sales of expensive products are less likely to be low.
- As the sales rank decreases (that is, as a product sells better), ratings tend to be higher and less spread out over the range.

**@todo**: However, these findings may vary among different categories.
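
For reference, `DataFrame.corr()` computes the Pearson correlation coefficient by default, i.e. the covariance of two variables divided by the product of their standard deviations. The sketch below spells this out in plain Python on made-up values. It is only meant to show what the heatmap entries measure: a linear relationship, which may understate monotonic but non-linear trends.

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

# Made-up values: the price rises while the sales rank number falls linearly.
prices = [10.0, 20.0, 30.0, 40.0]
ranks = [4000, 3000, 2000, 1000]

r = pearson(prices, ranks)
print(round(r, 6))
```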


## Conclusion

**@show**
- That your plan for analysis and communication is now reasonable and sound, potentially discussing alternatives to your choices that you considered but dropped.

**@todo**
- discuss the feasibility of the project
- define some further internal steps before milestone 3

# Reminder: Internal steps before milestone 2

- Define the rules for creating the graph (i.e. the influence of each relation type).
- Devise an efficient algorithm for extracting cliques or highly connected subgraphs, and, possibly, merging them into clusters.
- Find useful insights into the structure of these clusters, apart from obvious ones (the best product in a cluster). For example:
  - Do people always choose the cheapest product among related products?
  - Conversely, does the best product cost more than the others?
  - Are the best products sold only by well-known brands?