## JOB 1

We will look into:

- loading data from S3
- analysing the data and getting DataFrame stats with `describe`
- group-by and aggregation operations
- filtering, selecting, and cleaning data into the desired features

In [1]:
import sys; sys.path.insert(0, '..')
import findspark; findspark.init()

In [2]:
import json
import configparser
from os import environ, listdir, path

from pyspark import SparkConf
from pyspark import SparkFiles
from pyspark.sql import SparkSession

from src.commons import utils
from src.amazon_reviews import etl, job

In [3]:
# EMR 6.10
environ['PYSPARK_SUBMIT_ARGS'] = "--packages=com.amazonaws:aws-java-sdk:1.11.900,org.apache.hadoop:hadoop-aws:3.2.0 pyspark-shell"
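PySpark reads `PYSPARK_SUBMIT_ARGS` only when it launches the JVM, so the variable has to be set before the first `SparkSession` is created (here, before `utils.start_spark()`), and the value must end with `pyspark-shell`. The sketch below is one way to build the same string from a list of Maven coordinates, so the notebook and the `spark-submit` command later on can share one list. `build_submit_args` and `PACKAGES` are hypothetical names and are not part of `src.commons.utils`.

```python
import os

# Same Maven coordinates as in the cell above: hadoop-aws plus the AWS Java SDK.
PACKAGES = [
    "com.amazonaws:aws-java-sdk:1.11.900",
    "org.apache.hadoop:hadoop-aws:3.2.0",
]


def build_submit_args(packages):
    """Return a PYSPARK_SUBMIT_ARGS value; PySpark expects it to end with 'pyspark-shell'."""
    return "--packages=" + ",".join(packages) + " pyspark-shell"


# Has an effect only if it runs before the first SparkSession/SparkContext starts the JVM.
os.environ["PYSPARK_SUBMIT_ARGS"] = build_submit_args(PACKAGES)
```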

In [4]:
environ['DEBUG'] = "1"

In [5]:
config = configparser.ConfigParser()
config.read(path.expanduser("~/.aws/credentials"))
access_id = config.get('default', "aws_access_key_id") 
access_key = config.get('default', "aws_secret_access_key")
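If `~/.aws/credentials` is missing, the cell above fails with a `NoSectionError`, because `configparser.read` silently skips files it cannot find. The following is a slightly more defensive sketch under the same file-format assumption. It reads a named profile, checks for the section explicitly, and prefers the standard `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY` environment variables when both are set. That precedence is a design choice made for this sketch, and `read_aws_credentials` is a hypothetical helper.

```python
import configparser
import os
from os import path


def read_aws_credentials(profile="default", credentials_file="~/.aws/credentials"):
    """Return (access_key_id, secret_access_key) for `profile`.

    Environment variables, when both are set, take precedence over the file.
    """
    env_id = os.environ.get("AWS_ACCESS_KEY_ID")
    env_secret = os.environ.get("AWS_SECRET_ACCESS_KEY")
    if env_id and env_secret:
        return env_id, env_secret

    config = configparser.ConfigParser()
    # read() ignores missing files, so check that the profile section really exists.
    config.read(path.expanduser(credentials_file))
    if not config.has_section(profile):
        raise KeyError(f"profile {profile!r} not found in {credentials_file}")
    return (
        config.get(profile, "aws_access_key_id"),
        config.get(profile, "aws_secret_access_key"),
    )
```

Usage: `access_id, access_key = read_aws_credentials()` replaces the four lines above.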

In [6]:
DIG_MUSIC_FILE_PATH = '../data/part-00009-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet'
DIG_MUSIC_S3 = "s3a://amazon-reviews-pds/parquet/product_category=Digital_Music_Purchase/*.snappy.parquet"

In [7]:
session, logger = utils.start_spark()

In [8]:
# https://hadoop.apache.org/docs/current3/hadoop-aws/tools/hadoop-aws/s3n.html#How_to_migrate_to_to_the_S3A_client
hadoop_conf = session._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", access_id)
hadoop_conf.set("fs.s3a.secret.key", access_key)

In [9]:
# alternatively, run the whole job end to end: job.run(session, logger)

In [10]:
digital_munisc_df = utils.extract_parquet_data(session, DIG_MUSIC_S3)

In [11]:
digital_munisc_df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- year: integer (nullable = true)



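We aggregate the following metrics per `customer_id`:
- `products`: total number of products reviewed
- `products_distinct`: number of distinct products
- `pp_counts`: number of distinct parent products
- `reviews`: number of reviews
- `verified_purchase`: number of reviews flagged as verified purchases
- `purchase_rate`: purchase (review) rate, i.e. verified purchases / reviews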
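`etl.to_stats_aggregation` is not shown in this notebook. To illustrate what a per-customer `groupBy("customer_id").agg(...)` of this kind computes, here is a Spark-free sketch over a list of dicts. In PySpark, the distinct counts would typically use `F.countDistinct` and the plain counts `F.count`. The exact definitions are assumptions inferred from the column names and are not taken from the `etl` module. In particular, the sketch assumes one row per review, no null ids, and `'Y'`/`'N'` values in `verified_purchase`. `to_stats_aggregation_sketch` is a hypothetical name.

```python
from collections import defaultdict


def to_stats_aggregation_sketch(reviews):
    """Group review rows by customer_id and compute per-customer metrics.

    Hypothetical stand-in for etl.to_stats_aggregation; assumes no null ids
    and that verified_purchase holds 'Y' / 'N' strings.
    """
    groups = defaultdict(list)
    for row in reviews:
        groups[row["customer_id"]].append(row)

    stats = {}
    for customer_id, rows in groups.items():
        review_count = len(rows)  # one row per review (assumed)
        verified = sum(1 for r in rows if r["verified_purchase"] == "Y")
        stats[customer_id] = {
            "products": len(rows),  # like F.count("product_id") with no nulls
            "products_distinct": len({r["product_id"] for r in rows}),  # like F.countDistinct
            "pp_counts": len({r["product_parent"] for r in rows}),  # distinct parent products
            "reviews": review_count,
            "verified_purchase": verified,
            "purchase_rate": verified / review_count,  # always in [0, 1]
        }
    return stats
```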

In [12]:
purchase_agg_df = etl.to_stats_aggregation(digital_munisc_df)

In [13]:
purchase_agg_df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- products: long (nullable = false)
 |-- products_distinct: long (nullable = false)
 |-- pp_counts: long (nullable = false)
 |-- reviews: long (nullable = false)
 |-- verified_purchase: long (nullable = false)
 |-- purchase_rate: float (nullable = true)



## explore and analyse the digital music stats using `describe`

In [14]:
music_purchase_agg_stats = purchase_agg_df.describe(['products_distinct', 'purchase_rate'])
music_purchase_agg_stats.show()

+-------+------------------+------------------+
|summary| products_distinct|     purchase_rate|
+-------+------------------+------------------+
|  count|            841870|            841870|
|   mean|2.0718056231959805|0.6784817806876001|
| stddev| 5.917138532090482|0.4465739418824318|
|    min|                 1|               0.0|
|    max|              1907|               1.0|
+-------+------------------+------------------+
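For each requested column, `describe` reports five rows: count, mean, stddev, min and max. These are computed by aggregate functions that skip nulls, and Spark's `stddev` is the sample standard deviation (divisor n - 1). The plain-Python sketch below produces the same five numbers for one column, which can help when sanity-checking the output on a small sample. `describe_sketch` is a hypothetical helper.

```python
import statistics


def describe_sketch(values):
    """Compute the five rows DataFrame.describe() shows for one numeric column."""
    present = [v for v in values if v is not None]  # aggregates skip nulls
    return {
        "count": len(present),
        "mean": statistics.mean(present) if present else None,
        # statistics.stdev divides by n - 1 (sample stddev), like Spark's stddev.
        "stddev": statistics.stdev(present) if len(present) > 1 else None,
        "min": min(present) if present else None,
        "max": max(present) if present else None,
    }
```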



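Based on the stats above, a customer reviewed about 2 distinct albums on average (mean `products_distinct` of about 2.07). The mean `purchase_rate` of about 0.68 indicates that, on average, most of a customer's reviews are backed by a verified purchase.
The distribution is heavily skewed. The minimum is 1 and the mean is close to it, so most customers probably reviewed only a single album. Yet the standard deviation is about 5.9, and at least one customer (likely a bot) reviewed 1,907 albums.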
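`describe` does not report a median, so "most customers reviewed one album" is an inference rather than something the table shows directly. `DataFrame.summary()` adds 25%/50%/75% percentiles, and `DataFrame.approxQuantile` is an alternative way to check the claim. The toy example below shows why the mean alone can hide this. The counts are made up purely for illustration: a few heavy reviewers pull the mean to about 2 while the median stays at 1.

```python
import statistics

# Made-up distinct-product counts for 97 hypothetical customers:
# 90 reviewed one album, 5 reviewed two, and 2 heavy reviewers reviewed 50 each.
counts = [1] * 90 + [2] * 5 + [50] * 2

mean = statistics.mean(counts)
median = statistics.median(counts)
share_single = sum(1 for c in counts if c == 1) / len(counts)
print(f"mean={mean:.2f} median={median} single-album share={share_single:.0%}")
# prints: mean=2.06 median=1 single-album share=93%
```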

To train our model properly, we exclude customers who reviewed only a single product. Their data points are too sparse to be useful for training.

Conversely, customers who reviewed over a thousand products are likely to be bots. Their records could skew the graph structure and lead to poor results, so we remove their reviews as well.

## clean up the review data

Remove sparse data points as well as "overactive" customers.

In [15]:
# keep customer_ids whose distinct product count is between 3 and 50 (both thresholds are arbitrary)
dense_customer_ids_df = etl.to_dense_user_ids(purchase_agg_df, 3, 50)
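`etl.to_dense_user_ids` is not shown here. In PySpark, a range filter of this kind could be written as `purchase_agg_df.filter(F.col("products_distinct").between(3, 50)).select("customer_id")`, where `between` includes both bounds. Whether the real helper uses inclusive bounds is an assumption. The same logic in plain Python, applied to a dict of per-customer stats, looks like this (`to_dense_user_ids_sketch` is a hypothetical name):

```python
def to_dense_user_ids_sketch(stats, min_products, max_products):
    """Return customer ids whose distinct-product count lies in [min, max] (inclusive, assumed)."""
    return {
        customer_id
        for customer_id, row in stats.items()
        if min_products <= row["products_distinct"] <= max_products
    }
```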

In [16]:
dense_customer_ids_df.printSchema()

root
 |-- customer_id: string (nullable = true)



In [17]:
# filter review records by customer_ids
filtered_review_df = etl.to_dense_reviews(digital_munisc_df, dense_customer_ids_df)

## review the physical execution plan before triggering an action

In [18]:
filtered_review_df.explain()

== Physical Plan ==
*(4) Project [customer_id#1, marketplace#0, review_id#2, product_id#3, product_parent#4, product_title#5, star_rating#6, helpful_votes#7, total_votes#8, vine#9, verified_purchase#10, review_headline#11, review_body#12, review_date#13, year#14]
+- *(4) BroadcastHashJoin [customer_id#1], [customer_id#232], Inner, BuildRight
   :- *(4) Project [marketplace#0, customer_id#1, review_id#2, product_id#3, product_parent#4, product_title#5, star_rating#6, helpful_votes#7, total_votes#8, vine#9, verified_purchase#10, review_headline#11, review_body#12, review_date#13, year#14]
   :  +- *(4) Filter isnotnull(customer_id#1)
   :     +- *(4) ColumnarToRow
   :        +- FileScan parquet [marketplace#0,customer_id#1,review_id#2,product_id#3,product_parent#4,product_title#5,star_rating#6,helpful_votes#7,total_votes#8,vine#9,verified_purchase#10,review_headline#11,review_body#12,review_date#13,year#14] Batched: true, DataFilters: [isnotnull(customer_id#1)], Format: Parquet, Locatio
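In the plan above, Spark chose a `BroadcastHashJoin` with `BuildRight`. The small right side (the dense customer ids) is broadcast to every executor and loaded into a hash table, and each review row on the left is probed against it. The `isnotnull(customer_id)` filter appears because an inner equi-join can never match a null key. The single-process sketch below illustrates the build/probe idea; `broadcast_hash_join_sketch` is a hypothetical helper, and no Spark is involved.

```python
def broadcast_hash_join_sketch(left_rows, right_rows, key):
    """Inner equi-join: build a hash table on the right (small) side, probe with the left."""
    # Build phase: index the small side by join key (the part Spark broadcasts).
    build = {}
    for row in right_rows:
        build.setdefault(row[key], []).append(row)

    # Probe phase: stream the large side; rows with a null key can never match.
    for row in left_rows:
        if row[key] is None:
            continue
        for match in build.get(row[key], []):
            yield {**match, **row}  # left-side columns win on name clashes
```

Because the right side has one row per customer (it comes out of a group-by), this inner join acts purely as a filter on the reviews. A `left_semi` join (`how="left_semi"` in `DataFrame.join`) would state that intent directly.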

## save final data frame to target S3 bucket

In [None]:
# utils.load_data_to_s3(filtered_review_df, 'target_s3_path')

In [19]:
session.stop()

## submit job from your local machine

Run `tox -e pack`. The releasable artifacts are generated in the `./dist` folder:
```
dist/
  ├── dist_files.zip
  └── main.py
```
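`dist_files.zip` is shipped with `--py-files`, which places it on the Python path of the driver and executors. This works because Python's import system can load packages directly from a zip archive on `sys.path`. The stdlib-only sketch below demonstrates the mechanism with a throwaway package named `demo_pkg`; that name is hypothetical, and the real archive would contain the project's `src` package instead.

```python
import os
import sys
import tempfile
import zipfile

# Build a tiny zip that mirrors the dist_files.zip layout (package dir + modules).
tmp_dir = tempfile.mkdtemp()
archive = os.path.join(tmp_dir, "dist_files.zip")
with zipfile.ZipFile(archive, "w") as zf:
    zf.writestr("demo_pkg/__init__.py", "")
    zf.writestr("demo_pkg/utils.py", "def greet():\n    return 'hello from the zip'\n")

# Putting the archive on sys.path is enough for the zip importer to find the package.
sys.path.insert(0, archive)
from demo_pkg import utils as demo_utils

print(demo_utils.greet())
```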

Change into the `dist` folder and invoke `spark-submit`:

```shell
spark-submit \
    --master "local[3]" \
    --deploy-mode client \
    --packages=com.amazonaws:aws-java-sdk:1.11.900,org.apache.hadoop:hadoop-aws:3.2.0 \
    --py-files ./dist_files.zip \
    main.py --job=review --category=Digital_Music_Purchase --local_run=1
```
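The flags after `main.py` (`--job`, `--category`, `--local_run`) are consumed by the packaged entry point, whose implementation is not shown here. The `argparse` sketch below shows how such an entry point might parse them before dispatching to something like `job.run(session, logger)`. `parse_args` and the help strings are hypothetical.

```python
import argparse


def parse_args(argv=None):
    """Parse the job flags passed after main.py on the spark-submit command line."""
    parser = argparse.ArgumentParser(description="Run an amazon-reviews ETL job.")
    parser.add_argument("--job", required=True, help="job name, e.g. 'review'")
    parser.add_argument("--category", required=True,
                        help="product_category partition to read")
    parser.add_argument("--local_run", type=int, choices=(0, 1), default=0,
                        help="1 to run against a local master")
    return parser.parse_args(argv)
```

Usage: called with no arguments inside `main.py`, `parse_args()` reads `sys.argv`. For example, `--job=review --category=Digital_Music_Purchase --local_run=1` yields `args.job == "review"` and `args.local_run == 1`.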