# Assignment 2 DSC 102 2020 WI

## Introduction

In this assignment, we will perform data engineering on the Amazon dataset. The extracted features will be used in your next assignment, where you will train one or more models to predict the rating a user gives a product.

We will use Apache Spark for this assignment. The default Spark API is the DataFrame API, which is now recommended over the RDD API. That said, feel free to switch back to the RDD API if it fits a task better: you can request the input data in RDD format to start with, and you can also switch between DataFrame and RDD within your solution.

Koalas, a newer API, is also available. However, it has constraints that make it unsuitable for most tasks; refer to the PA statement for details.


### Set the following parameters

In [1]:
PID = 'a14664383' # your pid, for instance: 'a43223333'
INPUT_FORMAT = 'dataframe' # choose a format of your input data, valid options: 'dataframe', 'rdd', 'koalas'

In [2]:
# Boilerplate, do not modify
%load_ext autoreload
%autoreload 2
import os
from pyspark.sql import SparkSession
from utilities import SEED
from utilities import PA2Test
from utilities import PA2Data
from utilities import data_cat
from pa2_main import PA2Executor
import time
if INPUT_FORMAT == 'dataframe':
    import pyspark.ml as M
    import pyspark.sql.functions as F
    import pyspark.sql.types as T
elif INPUT_FORMAT == 'koalas':
    import databricks.koalas as ks
elif INPUT_FORMAT == 'rdd':
    import pyspark.mllib as M
    from pyspark.mllib.feature import Word2Vec
    from pyspark.mllib.linalg import Vectors
    from pyspark.mllib.linalg.distributed import RowMatrix

os.environ['PYSPARK_SUBMIT_ARGS'] = '--py-files utilities.py,assignment2.py \
--deploy-mode client \
pyspark-shell'

class args:
    review_filename = data_cat.review_filename
    product_filename = data_cat.product_filename
    product_processed_filename = data_cat.product_processed_filename
    ml_features_train_filename = data_cat.ml_features_train_filename
    ml_features_test_filename = data_cat.ml_features_test_filename
    output_root = '/home/{}-pa2/test_results'.format(PID)
    test_results_root = data_cat.test_results_root
    pid = PID

pa2 = PA2Executor(args, input_format=INPUT_FORMAT)
data_io = pa2.data_io
data_dict = pa2.data_dict  
begin = time.time()

Loading datasets ...Done


In [3]:
# Import your own dependencies
from pyspark.ml.stat import Summarizer


#-----------------------------

# Part 1: Feature Engineering

In [4]:
# Bring the part_1 datasets to memory and de-cache part_2 datasets. 
# Execute this once before you start working on this Part
data_dict, _ = data_io.cache_switch(data_dict, 'part_1')

# Task 0: Warm-up
This task is provided to help you get familiar with the Spark API, using the DataFrame API for the demonstration. The solution is given to you, and this task will not be graded.

Refer to https://spark.apache.org/docs/latest/api/python/pyspark.sql.html for the API guide.

The task is to implement the function below. Given the ```product_data``` table:
1. Take and print five rows.

1. Select only the ```asin``` column, then print five rows of it.

1. Select the row where ```asin = B00I8KEOTM``` and print it.

1. Count the total number of rows.

1. Calculate the mean ```price```.

1. Perform the operations above, then extract some statistics from the generated columns and put them in a Python dictionary named ```res```. Its description and schema are as follows:
    ```
    res
     | -- count_total: int -- count of total rows of the entire table after your operations
     | -- mean_price: float -- mean value of column price
    ```
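To make the expected contents of `res` concrete, here is a minimal plain-Python sketch on a hypothetical toy table (the rows are made up, not taken from the dataset). It mirrors the Spark solution. `DataFrame.count()` counts every row, while `F.avg` skips null prices, so a table with missing prices still has a well-defined mean.

```python
from typing import Optional

# Hypothetical toy rows standing in for product_data; None plays the role of a null price
toy_products = [
    {"asin": "A1", "price": 27.99},
    {"asin": "A2", "price": None},
    {"asin": "A3", "price": 41.95},
    {"asin": "A4", "price": None},
]


def task_0_sketch(rows: list) -> dict:
    """Compute the two warm-up statistics the way the Spark solution does."""
    count_total = len(rows)  # like DataFrame.count(): every row counts
    prices = [r["price"] for r in rows if r["price"] is not None]
    # like F.avg: nulls are ignored; an all-null column yields None
    mean_price: Optional[float] = sum(prices) / len(prices) if prices else None
    return {"count_total": count_total, "mean_price": mean_price}


res = task_0_sketch(toy_products)
print(res)
```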

In [5]:
def task_0(data_io, product_data):
    # -----------------------------Column names--------------------------------
    # Inputs:
    asin_column = 'asin'
    price_column = 'price'
    # -------------------------------------------------------------------------

    # ---------------------- Your implementation begins------------------------

    # 1. Take and print five rows
    product_data.show(5)
    # 2. Select only the asin column and print five rows of it
    product_data.select(asin_column).show(5)
    # 3. Select and print the row with asin = B00I8KEOTM
    product_data.where(F.col(asin_column) == 'B00I8KEOTM').show()
    # 4. Count the total number of rows
    count_rows = product_data.count()
    # 5. Mean price; F.avg ignores null prices
    mean_price = product_data.select(F.avg(F.col(price_column))).head()[0]
    # -------------------------------------------------------------------------

    # ---------------------- Put results in res dict --------------------------
    # Calculate the values programmatically. Do not change the keys and do not
    # hard-code values in the dict. Your submission will be evaluated with
    # different inputs.
    # Modify the values of the following dictionary accordingly.
    res = {'count_total': None, 'mean_price': None}
    # Modify res:

    res['count_total'] = count_rows
    res['mean_price'] = mean_price

    # -------------------------------------------------------------------------

    # ----------------------------- Do not change -----------------------------
    return res
    # -------------------------------------------------------------------------

In [6]:
if INPUT_FORMAT == 'dataframe':
    res = task_0(data_io, data_dict['product'])
    pa2.tests.test(res, 'task_0')

+----------+--------------------+--------------------+--------------------+-----+--------------------+
|      asin|           salesRank|          categories|               title|price|             related|
+----------+--------------------+--------------------+--------------------+-----+--------------------+
|B00I8HVV6E|[Home &amp; Kitch...|[[Home & Kitchen,...|Intelligent Desig...|27.99|[also_viewed -> [...|
|B00I8KEOTM|                null|[[Apps for Androi...|                null| null|[also_viewed -> [...|
|B00I8KCW4G|[Clothing -> 2233...|[[Clothing, Shoes...|eShakti Women's P...|41.95|[also_viewed -> [...|
|B00I8JKCQW|[Clothing -> 1405...|[[Clothing, Shoes...|Lady Slimming Mid...| null|[also_viewed -> [...|
|B00I8JKI8E|[Home &amp; Kitch...|[[Clothing, Shoes...|3 Tier Bangle Bra...|24.99|[also_viewed -> [...|
+----------+--------------------+--------------------+--------------------+-----+--------------------+
only showing top 5 rows

+----------+
|      asin|
+----------+
|B00I8HVV

# Task 1

In [7]:
# %load -s task_1 assignment2.py
def task_1(data_io, review_data, product_data):
    # -----------------------------Column names--------------------------------
    # Inputs:
    asin_column = 'asin'
    overall_column = 'overall'
    # Outputs:
    mean_rating_column = 'meanRating'
    count_rating_column = 'countRating'
    # -------------------------------------------------------------------------

    # ---------------------- Your implementation begins------------------------
    
    # Joining only the asin column of product_data (instead of the full table)
    # cut this task's runtime from about 120 s to about 100 s in our runs.

    # Steps 1 & 2: per-product mean rating and rating count, then a left join so
    # that every product is kept (products without reviews get nulls)
    agg_df = (review_data
              .groupBy(asin_column)
              .agg(F.avg(overall_column).alias(mean_rating_column),
                   F.count(asin_column).alias(count_rating_column)))
    product_data = (product_data
                    .select(asin_column)
                    .join(agg_df, on=asin_column, how='left'))

    # Step 3: summary statistics (each call below triggers its own Spark job)
    count_row = product_data.count()

    mean_mr = product_data.select(F.avg(mean_rating_column)).head()[0]
    var_mr = product_data.select(F.var_samp(mean_rating_column)).head()[0]
    numNulls_mr = product_data.filter(F.col(mean_rating_column).isNull()).count()

    mean_cr = product_data.select(F.avg(count_rating_column)).head()[0]
    var_cr = product_data.select(F.var_samp(count_rating_column)).head()[0]
    numNulls_cr = product_data.filter(F.col(count_rating_column).isNull()).count()


    # -------------------------------------------------------------------------

    # ---------------------- Put results in res dict --------------------------
    # Calculate the values programmatically. Do not change the keys and do not
    # hard-code values in the dict. Your submission will be evaluated with
    # different inputs.
    # Modify the values of the following dictionary accordingly.
    res = {
        'count_total': None,
        'mean_meanRating': None,
        'variance_meanRating': None,
        'numNulls_meanRating': None,
        'mean_countRating': None,
        'variance_countRating': None,
        'numNulls_countRating': None
    }
    # Modify res:
    
    res['count_total'] = count_row
    res['mean_meanRating'] = mean_mr
    res['variance_meanRating'] = var_mr
    res['numNulls_meanRating'] = numNulls_mr
    res['mean_countRating'] = mean_cr
    res['variance_countRating'] = var_cr
    res['numNulls_countRating'] = numNulls_cr

    # Print the statistics for inspection
    print(res)
    
    # -------------------------------------------------------------------------

    # ----------------------------- Do not change -----------------------------
    data_io.save(res, 'task_1')
    return res
    # -------------------------------------------------------------------------


In [8]:
res = task_1(data_io, data_dict['review'], data_dict['product'])
pa2.tests.test(res, 'task_1')

{'count_total': 9430000, 'mean_meanRating': 4.1513144826254775, 'variance_meanRating': 1.1297362704403955, 'numNulls_meanRating': 1973780, 'mean_countRating': 7.444083463202534, 'variance_countRating': 2647.0014292689057, 'numNulls_countRating': 1973780}




tests for task_1 --------------------------------------------------------------
Test 1/7 : count_total ... Pass
Test 2/7 : mean_countRating ... Pass
Test 3/7 : mean_meanRating ... Pass
Test 4/7 : numNulls_countRating ... Pass
Test 5/7 : numNulls_meanRating ... Pass
Test 6/7 : variance_countRating ... Pass
Test 7/7 : variance_meanRating ... Pass
7/7 passed
-------------------------------------------------------------------------------


True
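The two null counts above are equal (both 1973780) because of the left join. A product with no reviews has no row in the aggregated table, so it gets null for both `meanRating` and `countRating`. A minimal plain-Python sketch on hypothetical toy data walks through the three pieces: the per-product aggregation, the left join, and the sample variance that `F.var_samp` computes. Python's `statistics.variance` also divides by n - 1.

```python
import statistics
from collections import defaultdict

# Hypothetical toy inputs (not from the dataset): (asin, overall) review pairs
reviews = [("A1", 5.0), ("A1", 3.0), ("A2", 4.0), ("A2", 4.0), ("A2", 1.0)]
product_asins = ["A1", "A2", "A3"]  # A3 has no reviews

# groupBy(asin) + agg(mean(overall), count(asin))
ratings = defaultdict(list)
for asin, overall in reviews:
    ratings[asin].append(overall)
agg = {asin: (statistics.mean(v), len(v)) for asin, v in ratings.items()}

# Left join onto the product asins: products without reviews get (None, None)
joined = [(asin, *agg.get(asin, (None, None))) for asin in product_asins]

mean_ratings = [m for _, m, _ in joined if m is not None]
count_ratings = [c for _, _, c in joined if c is not None]
res = {
    "count_total": len(joined),
    "mean_meanRating": statistics.mean(mean_ratings),
    # statistics.variance divides by n - 1, like Spark's var_samp
    "variance_meanRating": statistics.variance(mean_ratings),
    "numNulls_meanRating": sum(m is None for _, m, _ in joined),
    "mean_countRating": statistics.mean(count_ratings),
    "variance_countRating": statistics.variance(count_ratings),
    "numNulls_countRating": sum(c is None for _, _, c in joined),
}
print(res)
```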


# Task 2

In [9]:
# %load -s task_2 assignment2.py
def task_2(data_io, product_data):
    # -----------------------------Column names--------------------------------
    # Inputs:
    salesRank_column = 'salesRank'
    categories_column = 'categories'
    asin_column = 'asin'
    # Outputs:
    category_column = 'category'
    bestSalesCategory_column = 'bestSalesCategory'
    bestSalesRank_column = 'bestSalesRank'
    # -------------------------------------------------------------------------

    # ---------------------- Your implementation begins------------------------
    
    # DataFrames are immutable, so no alias or copy is needed before adding columns
    p = product_data

    # step 1
    p = p.withColumn('category', F.flatten(p.categories))
    p = p.withColumn('category', p['category'].getItem(0))
    p = p.replace({'': None}, subset=['category'])
    
    # step 2
    p = p.withColumn('bestSalesCategory', F.map_keys(F.col('salesRank')).getItem(0))
    p = p.withColumn('bestSalesRank', F.map_values(F.col('salesRank')).getItem(0))
    
    product_data = p
    
    # step 3
    cnt_total = product_data.count()
    mean_bsr = product_data.select(F.avg(F.col('bestSalesRank'))).head()[0]
    var_bsr = product_data.select(F.var_samp(F.col('bestSalesRank'))).head()[0]
    numNulls_cat = product_data.filter(product_data['category'].isNull()).count()
    cntDis_cat = product_data.select(F.countDistinct(product_data.category)).head()[0]
    numNulls_bsc = product_data.filter(product_data['bestSalesCategory'].isNull()).count()
    cntDis_bsc = product_data.select(F.countDistinct(product_data['bestSalesCategory'])).head()[0]
    

    # -------------------------------------------------------------------------

    # ---------------------- Put results in res dict --------------------------
    res = {
        'count_total': None,
        'mean_bestSalesRank': None,
        'variance_bestSalesRank': None,
        'numNulls_category': None,
        'countDistinct_category': None,
        'numNulls_bestSalesCategory': None,
        'countDistinct_bestSalesCategory': None
    }
    # Modify res:
    
    res['count_total'] = cnt_total
    res['mean_bestSalesRank'] = mean_bsr
    res['variance_bestSalesRank'] = var_bsr
    res['numNulls_category'] = numNulls_cat
    res['countDistinct_category'] = cntDis_cat
    res['numNulls_bestSalesCategory'] = numNulls_bsc
    res['countDistinct_bestSalesCategory'] = cntDis_bsc

    # Print the statistics for inspection
    print(res)
    
    # -------------------------------------------------------------------------

    # ----------------------------- Do not change -----------------------------
    data_io.save(res, 'task_2')
    return res
    # -------------------------------------------------------------------------


In [10]:
res = task_2(data_io, data_dict['product'])
pa2.tests.test(res, 'task_2')

{'count_total': 9430000, 'mean_bestSalesRank': 1102421.3114573301, 'variance_bestSalesRank': 3616424023407.04, 'numNulls_category': 270169, 'countDistinct_category': 82, 'numNulls_bestSalesCategory': 2480772, 'countDistinct_bestSalesCategory': 33}
tests for task_2 --------------------------------------------------------------
Test 1/7 : countDistinct_bestSalesCategory ... Pass
Test 2/7 : countDistinct_category ... Pass
Test 3/7 : count_total ... Pass
Test 4/7 : mean_bestSalesRank ... Pass
Test 5/7 : numNulls_bestSalesCategory ... Pass
Test 6/7 : numNulls_category ... Pass
Test 7/7 : variance_bestSalesRank ... Pass
7/7 passed
-------------------------------------------------------------------------------


True
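Steps 1 and 2 above keep only the first element of the flattened `categories` array and the first key/value pair of the `salesRank` map. A minimal plain-Python sketch of that per-row logic makes the null handling explicit; the rows are hypothetical. An empty string or a missing array becomes `None`. A missing or empty map leaves both sales columns `None`.

```python
from typing import Optional, Tuple


def extract_category(categories: Optional[list]) -> Optional[str]:
    """First entry of the flattened nested list; a missing list or '' gives None."""
    flat = [c for sub in (categories or []) for c in sub]
    if not flat or flat[0] == "":
        return None
    return flat[0]


def extract_best_sales(sales_rank: Optional[dict]) -> Tuple[Optional[str], Optional[int]]:
    """First (key, value) pair of the salesRank map; a missing/empty map gives (None, None)."""
    if not sales_rank:
        return None, None
    key = next(iter(sales_rank))  # dicts keep insertion order (Python 3.7+)
    return key, sales_rank[key]


# Hypothetical rows, not taken from the dataset
print(extract_category([["Clothing, Shoes & Jewelry", "Women"], ["Sports"]]))
print(extract_best_sales({"Toys & Games": 1500}))
```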

# Task 3





In [11]:
# %load -s task_3 assignment2.py
def task_3(data_io, product_data):
    # -----------------------------Column names--------------------------------
    # Inputs:
    asin_column = 'asin'
    price_column = 'price'
    attribute = 'also_viewed'
    related_column = 'related'
    # Outputs:
    meanPriceAlsoViewed_column = 'meanPriceAlsoViewed'
    countAlsoViewed_column = 'countAlsoViewed'
    # -------------------------------------------------------------------------

    # ---------------------- Your implementation begins------------------------
    
    price = product_data.select('asin', 'price')
    price = price.withColumnRenamed('asin', 'priceAsin')
    p = product_data.alias('p')
    p = p.select('asin', 'related')
    

    # step 1
    p = p.withColumn('also_viewed', p['related']['also_viewed'])
    expl = p.select('asin', F.explode_outer(p['also_viewed']))
    expl = expl.withColumnRenamed('col', 'avAsin')
    joined = expl.join(price, expl['avAsin'] == price['priceAsin'], how='left')
    
    gr = joined.groupBy(joined.asin)
    agg_df = gr.agg({'price':'mean'})
    p1 = agg_df.withColumnRenamed('avg(price)', 'meanPriceAlsoViewed')
    
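    # step 2: add length column. With Spark's default settings F.size returns -1
    # (not null) for a null array, so map -1 back to null.
    p2 = p.withColumn('countAlsoViewed', F.size(p['also_viewed']))
    p2 = p2.replace({-1: None}, subset=['countAlsoViewed'])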

    # step 3
    cnt_total = p1.count()
    mean_mpav = p1.select(F.avg(F.col('meanPriceAlsoViewed'))).head()[0]
    var_mpav = p1.select(F.var_samp(F.col('meanPriceAlsoViewed'))).head()[0]
    numNulls_mpav = p1.filter(p1['meanPriceAlsoViewed'].isNull()).count()
    mean_cav = p2.select(F.avg(F.col('countAlsoViewed'))).head()[0]
    var_cav = p2.select(F.var_samp(F.col('countAlsoViewed'))).head()[0]
    numNulls_cav = p2.filter(p2['countAlsoViewed'].isNull()).count()


    # -------------------------------------------------------------------------

    # ---------------------- Put results in res dict --------------------------
    res = {
        'count_total': None,
        'mean_meanPriceAlsoViewed': None,
        'variance_meanPriceAlsoViewed': None,
        'numNulls_meanPriceAlsoViewed': None,
        'mean_countAlsoViewed': None,
        'variance_countAlsoViewed': None,
        'numNulls_countAlsoViewed': None
    }
    # Modify res:
    
    res["count_total"] = cnt_total
    
    res["mean_meanPriceAlsoViewed"] =  mean_mpav
    res['variance_meanPriceAlsoViewed'] =  var_mpav
    res['numNulls_meanPriceAlsoViewed'] =  numNulls_mpav
    res['mean_countAlsoViewed'] = mean_cav
    res['variance_countAlsoViewed'] = var_cav
    res['numNulls_countAlsoViewed'] =  numNulls_cav
    
    # Print the statistics for inspection
    print(res)

    # -------------------------------------------------------------------------

    # ----------------------------- Do not change -----------------------------
    data_io.save(res, 'task_3')
    return res
    # -------------------------------------------------------------------------


In [12]:
res = task_3(data_io, data_dict['product'])
pa2.tests.test(res, 'task_3')

{'count_total': 9430000, 'mean_meanPriceAlsoViewed': 45.285018283030176, 'variance_meanPriceAlsoViewed': 5135.311790051084, 'numNulls_meanPriceAlsoViewed': 5835773, 'mean_countAlsoViewed': 31.561228904913023, 'variance_countAlsoViewed': 562.8028467039483, 'numNulls_countAlsoViewed': 5408566}
tests for task_3 --------------------------------------------------------------
Test 1/7 : count_total ... Pass
Test 2/7 : mean_countAlsoViewed ... Pass
Test 3/7 : mean_meanPriceAlsoViewed ... Pass
Test 4/7 : numNulls_countAlsoViewed ... Pass
Test 5/7 : numNulls_meanPriceAlsoViewed ... Pass
Test 6/7 : variance_countAlsoViewed ... Pass
Test 7/7 : variance_meanPriceAlsoViewed ... Pass
7/7 passed
-------------------------------------------------------------------------------


True
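The explode / left join / group-by pipeline in step 1 and the array length in step 2 can be hard to reason about in Spark terms. Here is a minimal plain-Python sketch of the same semantics for a single product, assuming a hypothetical price lookup dictionary. `meanPriceAlsoViewed` averages only the also-viewed products whose price is known. `countAlsoViewed` is the list length: 0 for an empty list and `None` when the list itself is missing.

```python
from typing import Dict, List, Optional, Tuple


def also_viewed_features(
    also_viewed: Optional[List[str]], prices: Dict[str, Optional[float]]
) -> Tuple[Optional[float], Optional[int]]:
    """Return (meanPriceAlsoViewed, countAlsoViewed) for one product."""
    # countAlsoViewed: array length, or None when the array itself is missing
    count = len(also_viewed) if also_viewed is not None else None
    # meanPriceAlsoViewed: mean over also-viewed products with a known price
    # (unknown asins and null prices drop out, as with the left join + avg)
    known = [prices[a] for a in (also_viewed or []) if prices.get(a) is not None]
    mean_price = sum(known) / len(known) if known else None
    return mean_price, count


# Hypothetical price lookup (asin -> price), not from the dataset
prices = {"B1": 10.0, "B2": None, "B3": 30.0}
print(also_viewed_features(["B1", "B2", "B3", "B9"], prices))
```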

# Task 4

In [13]:
# %load -s task_4 assignment2.py
def task_4(data_io, product_data):
    # -----------------------------Column names--------------------------------
    # Inputs:
    price_column = 'price'
    title_column = 'title'
    # Outputs:
    meanImputedPrice_column = 'meanImputedPrice'
    medianImputedPrice_column = 'medianImputedPrice'
    unknownImputedTitle_column = 'unknownImputedTitle'
    # -------------------------------------------------------------------------

    # ---------------------- Your implementation begins------------------------
    
    # DataFrames are immutable, so no alias or copy is needed before adding columns
    p = product_data
    
    # Step 1
    p = p.withColumn('price', p['price'].cast('float'))
    mean_price = p.select(F.avg(F.col('price'))).head()[0]
    p = p.withColumn('meanImputedPrice', p['price'])

    # Step 2
    median_price = p.approxQuantile('price', [0.5], 0.001)[0]
    p = p.withColumn('medianImputedPrice', p['price'])
    
    # Step 3
    p = p.withColumn('unknownImputedTitle', p.title)
    p = p.replace({'':'unknown'}, subset=['unknownImputedTitle'])

    # fill nulls
    p = p.fillna({'medianImputedPrice': median_price, 
                  'meanImputedPrice': mean_price,
                  'unknownImputedTitle': 'unknown'})
    
    product_data = p
    
    # Step 4
    cnt_total = product_data.count()
    mean_mip = product_data.select(F.avg(F.col('meanImputedPrice'))).head()[0]
    var_mip = product_data.select(F.var_samp(F.col('meanImputedPrice'))).head()[0]
    numNulls_mip = product_data.filter(product_data['meanImputedPrice'].isNull()).count()
    
    mean_meip = product_data.select(F.avg(F.col('medianImputedPrice'))).head()[0]
    var_meip = product_data.select(F.var_samp(F.col('medianImputedPrice'))).head()[0]
    numNulls_meip = product_data.filter(product_data['medianImputedPrice'].isNull()).count()
    numUnk = product_data.filter(product_data['unknownImputedTitle'] == 'unknown').count()
    

    # With relativeError 0, 0.001 and 0.0001, approxQuantile returned the same
    # median price (14.989999771118164, i.e. 14.99 after the float cast).

    # -------------------------------------------------------------------------

    # ---------------------- Put results in res dict --------------------------
    res = {
        'count_total': None,
        'mean_meanImputedPrice': None,
        'variance_meanImputedPrice': None,
        'numNulls_meanImputedPrice': None,
        'mean_medianImputedPrice': None,
        'variance_medianImputedPrice': None,
        'numNulls_medianImputedPrice': None,
        'numUnknowns_unknownImputedTitle': None
    }
    # Modify res:
    res['count_total'] = cnt_total
    res['mean_meanImputedPrice'] = mean_mip
    res['variance_meanImputedPrice'] = var_mip
    res['numNulls_meanImputedPrice'] = numNulls_mip
    res['mean_medianImputedPrice'] = mean_meip
    res['variance_medianImputedPrice'] = var_meip
    res['numNulls_medianImputedPrice'] = numNulls_meip
    res['numUnknowns_unknownImputedTitle'] = numUnk
    
    # Print the statistics for inspection
    print(res)

    # -------------------------------------------------------------------------

    # ----------------------------- Do not change -----------------------------
    data_io.save(res, 'task_4')
    return res
    # -------------------------------------------------------------------------


In [14]:
res = task_4(data_io, data_dict['product'])
pa2.tests.test(res, 'task_4')

{'count_total': 9430000, 'mean_meanImputedPrice': 34.93735571858629, 'variance_meanImputedPrice': 3265.3113437199545, 'numNulls_meanImputedPrice': 0, 'mean_medianImputedPrice': 27.81547087316232, 'variance_medianImputedPrice': 3356.6528865054042, 'numNulls_medianImputedPrice': 0, 'numUnknowns_unknownImputedTitle': 1432648}
tests for task_4 --------------------------------------------------------------
Test 1/8 : count_total ... Pass
Test 2/8 : mean_meanImputedPrice ... Pass
Test 3/8 : mean_medianImputedPrice ... Pass
Test 4/8 : numNulls_meanImputedPrice ... Pass
Test 5/8 : numNulls_medianImputedPrice ... Pass
Test 6/8 : numUnknowns_unknownImputedTitle ... Pass
Test 7/8 : variance_meanImputedPrice ... Pass
Test 8/8 : variance_medianImputedPrice ... Pass
8/8 passed
-------------------------------------------------------------------------------


True
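Steps 1 to 3 fill missing prices with the mean or the median of the observed prices and missing titles with `'unknown'`. A minimal plain-Python sketch with exact statistics on hypothetical prices shows the difference. With a right-skewed distribution, the median sits well below the mean, which matches `mean_medianImputedPrice` being lower than `mean_meanImputedPrice` in the output above. Spark's `approxQuantile` computes the exact quantile only when `relativeError` is 0; otherwise it returns an approximation.

```python
import statistics
from typing import List, Optional


def impute_prices(prices: List[Optional[float]], strategy: str) -> List[float]:
    """Fill None prices with the mean or exact median of the observed prices."""
    observed = [p for p in prices if p is not None]
    if strategy == "mean":
        fill = statistics.mean(observed)
    elif strategy == "median":
        fill = statistics.median(observed)
    else:
        raise ValueError(f"unknown strategy: {strategy!r}")
    return [fill if p is None else p for p in prices]


def impute_title(title: Optional[str]) -> str:
    """Null or empty titles become 'unknown', as in the unknownImputedTitle column."""
    return title if title else "unknown"


# Hypothetical prices with one large outlier
prices = [10.0, None, 12.0, 14.0, None, 100.0, 11.0]
print(impute_prices(prices, "mean"))
print(impute_prices(prices, "median"))
```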

# Task 5

In [15]:
# %load -s task_5 assignment2.py
def task_5(data_io, product_processed_data, word_0, word_1, word_2):
    # -----------------------------Column names--------------------------------
    # Inputs:
    title_column = 'title'
    # Outputs:
    titleArray_column = 'titleArray'
    titleVector_column = 'titleVector'
    # -------------------------------------------------------------------------

    # ---------------------- Your implementation begins------------------------
    
    p = product_processed_data.alias('p')
    
    # step 1
    p = p.withColumn('titleArray', F.lower(p['title']))
    # raw string avoids Python's invalid-escape warning for the regex \s
    p = p.withColumn('titleArray', F.split(p['titleArray'], r'\s'))
    
    # step 2
    w2v = M.feature.Word2Vec(minCount=100, vectorSize=16, seed=SEED, 
                             numPartitions=4, inputCol='titleArray', outputCol='vec')
    model = w2v.fit(p)
    
    
    product_processed_data_output = p


    # -------------------------------------------------------------------------

    # ---------------------- Put results in res dict --------------------------
    res = {
        'count_total': None,
        'size_vocabulary': None,
        'word_0_synonyms': [(None, None), ],
        'word_1_synonyms': [(None, None), ],
        'word_2_synonyms': [(None, None), ]
    }
    # Modify res:
    res['count_total'] = product_processed_data_output.count()
    res['size_vocabulary'] = model.getVectors().count()
    for name, word in zip(
        ['word_0_synonyms', 'word_1_synonyms', 'word_2_synonyms'],
        [word_0, word_1, word_2]
    ):
        res[name] = model.findSynonymsArray(word, 10)
    # Print the statistics for inspection
    print(res)
    # -------------------------------------------------------------------------

    # ----------------------------- Do not change -----------------------------
    data_io.save(res, 'task_5')
    return res
    # -------------------------------------------------------------------------


In [16]:
res = task_5(data_io, data_dict['product_processed'], 'piano', 'rice', 'laptop')
pa2.tests.test(res, 'task_5')

{'count_total': 9430000, 'size_vocabulary': 42457, 'word_0_synonyms': [('mozart:', 0.9652077555656433), ('suites', 0.9600690603256226), ('sonatas', 0.9555727243423462), ('oboe', 0.9510754346847534), ('orchestra', 0.9495903849601746), ('brahms', 0.9470160603523254), ('orchestral', 0.9435944557189941), ('concerto,', 0.9435223340988159), ('orchestra,', 0.9414055347442627), ('etudes', 0.9398797750473022)], 'word_1_synonyms': [('shrimp', 0.9786536693572998), ('yogurt', 0.978205680847168), ('seasoning', 0.9746150374412537), ('seasoned', 0.9745858311653137), ('spice', 0.9742987155914307), ('barley', 0.9725334048271179), ('bites,', 0.9722008109092712), ('roasted', 0.9718379974365234), ('gluten', 0.97142493724823), ('lentil', 0.9708153009414673)], 'word_2_synonyms': [('notebook', 0.9813675284385681), ('g570', 0.965702474117279), ('lenovo', 0.9624518752098083), ('ultrabook', 0.961560845375061), ('17.3-inch', 0.9614698886871338), ('netbook', 0.9583598375320435), ('aspire', 0.9573705792427063), ('

True
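Step 1 lower-cases each title and splits it on single whitespace characters. Punctuation therefore stays attached to words, which explains tokens such as `'mozart:'` and `'concerto,'` in the synonym lists above. `Word2Vec(minCount=100)` keeps only tokens that occur at least 100 times, and that set determines `size_vocabulary`. Here is a minimal plain-Python sketch of those two pieces using a small hypothetical `min_count` and made-up titles. It does not train embeddings.

```python
import re
from collections import Counter
from typing import List, Set


def tokenize(title: str) -> List[str]:
    """Lower-case and split on single whitespace characters (like F.lower + F.split on \\s)."""
    return re.split(r"\s", title.lower())


def vocabulary(titles: List[str], min_count: int) -> Set[str]:
    """Tokens occurring at least min_count times (the role of Word2Vec's minCount)."""
    counts = Counter(tok for t in titles for tok in tokenize(t))
    return {tok for tok, c in counts.items() if c >= min_count}


# Hypothetical titles, not from the dataset
titles = ["Piano Sonatas, Vol. 1", "Mozart: Piano Concerto", "Digital Piano"]
print(tokenize(titles[1]))
print(vocabulary(titles, min_count=2))
```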

# Task 6

In [17]:
# %load -s task_6 assignment2.py
def task_6(data_io, product_processed_data):
    # -----------------------------Column names--------------------------------
    # Inputs:
    category_column = 'category'
    # Outputs:
    categoryIndex_column = 'categoryIndex'
    categoryOneHot_column = 'categoryOneHot'
    categoryPCA_column = 'categoryPCA'
    # -------------------------------------------------------------------------    

    # ---------------------- Your implementation begins------------------------
    
    p = product_processed_data.alias('p')
    
    # step 1
    # convert category column to numerical indices
    indx = M.feature.StringIndexer(inputCol = 'category', outputCol='indexed')
    indx_model = indx.fit(p)
    p = indx_model.transform(p)
    # one-hot encoding (OneHotEncoderEstimator is the Spark 2.3/2.4 name;
    # Spark 3.x renamed it to OneHotEncoder)
    ohe = M.feature.OneHotEncoderEstimator(inputCols=['indexed'],
                                           outputCols=['categoryOneHot'], dropLast=False)
    ohe_model = ohe.fit(p)
    p = ohe_model.transform(p)
    
    # step 2
    pca = M.feature.PCA(inputCol='categoryOneHot', outputCol='categoryPCA', k=15)
    pca_model = pca.fit(p)
    p = pca_model.transform(p)
    
    # step 4
    cnt_total = p.count()
    summ = Summarizer.metrics('mean')
    mean_onehot = p.select(summ.summary(p.categoryOneHot)).head()[0][0]
    mean_onehot = mean_onehot.values.tolist()
    mean_pca = p.select(summ.summary(p.categoryPCA)).head()[0][0]
    mean_pca = mean_pca.values.tolist()
    
    
    
    # -------------------------------------------------------------------------

    # ---------------------- Put results in res dict --------------------------
    res = {
        'count_total': None,
        'meanVector_categoryOneHot': [None, ],
        'meanVector_categoryPCA': [None, ]
    }
    # Modify res:
    
    res['count_total'] = cnt_total
    res['meanVector_categoryOneHot'] = mean_onehot
    res['meanVector_categoryPCA'] = mean_pca
    
    # Print the statistics for inspection
    print(res)

    # -------------------------------------------------------------------------

    # ----------------------------- Do not change -----------------------------
    data_io.save(res, 'task_6')
    return res
    # -------------------------------------------------------------------------


In [18]:
res = task_6(data_io, data_dict['product_processed'])
pa2.tests.test(res, 'task_6')

{'count_total': 9430000, 'meanVector_categoryOneHot': [0.25131601272534465, 0.1522659597030753, 0.05620243902439025, 0.05254252386002121, 0.05214347826086956, 0.04633690349946978, 0.0367677624602333, 0.03557656415694592, 0.03510593849416755, 0.028671261930010603, 0.028649946977730645, 0.02846967126193001, 0.02781728525980912, 0.02748356309650053, 0.020665323435843055, 0.01821325556733828, 0.014239766702014846, 0.012449946977730646, 0.011721633085896077, 0.01154305408271474, 0.0075626723223753975, 0.007260021208907741, 0.006526405090137858, 0.005181760339342524, 0.005004878048780487, 0.003249946977730647, 0.002560445387062566, 0.001878897136797455, 0.0012358430540827148, 0.0010625662778366914, 0.0008027571580063626, 0.0007919406150583245, 0.0006977730646871687, 0.0006583244962884412, 0.0006481442205726405, 0.0005278897136797455, 0.0004906680805938494, 0.000471898197242842, 0.000436373276776246, 0.00041823966065747615, 0.00040943796394485684, 0.00039777306468716863, 0.0003738069989395546

True
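The one-hot mean vector above decreases monotonically for two reasons. By default, `StringIndexer` assigns indices by descending label frequency (`stringOrderType='frequencyDesc'`). With `dropLast=False`, the mean of the one-hot column is simply each category's share of the rows. Here is a minimal plain-Python sketch on hypothetical labels. Tie-breaking between equally frequent labels differs across Spark versions, so the alphabetical tie-break below is an assumption.

```python
from collections import Counter
from typing import Dict, List


def index_by_frequency(labels: List[str]) -> Dict[str, int]:
    """Most frequent label -> 0, next -> 1, ...; ties broken alphabetically (an assumption)."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda lab: (-counts[lab], lab))
    return {lab: i for i, lab in enumerate(ordered)}


def one_hot_mean(labels: List[str]) -> List[float]:
    """Mean of the one-hot vectors (dropLast=False), i.e. each category's row share."""
    index = index_by_frequency(labels)
    sums = [0.0] * len(index)
    for lab in labels:
        sums[index[lab]] += 1.0
    return [s / len(labels) for s in sums]


# Hypothetical category labels
labels = ["Books", "Books", "Toys", "Books", "Music", "Toys"]
print(index_by_frequency(labels))
print(one_hot_mean(labels))
```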

In [19]:
print("End to end time: {}".format(time.time()-begin))

End to end time: 860.2674803733826


# Part 2: Model Selection

In [20]:
# Bring the part_2 datasets to memory and de-cache part_1 datasets.
# Execute this once before you start working on this Part
data_dict, _ = data_io.cache_switch(data_dict, 'part_2')

# Task 7

In [21]:
def task_7(data_io, train_data, test_data):
    
    # ---------------------- Your implementation begins------------------------
    
    # step 1
    dt = M.regression.DecisionTreeRegressor(labelCol='overall', maxDepth=5)
    dt_model = dt.fit(train_data)
    
    # step 2
    test_data_pred = dt_model.transform(test_data)
    evaluator = M.evaluation.RegressionEvaluator(predictionCol='prediction',
                                                labelCol='overall', metricName='rmse')
    rmse = evaluator.evaluate(test_data_pred)
    
    
    
    
    
    # -------------------------------------------------------------------------
    
    
    # ---------------------- Put results in res dict --------------------------
    res = {
        'test_rmse': None
    }
    # Modify res:
    res['test_rmse'] = rmse
    
    # Print the statistics for inspection
    print(res)

    # -------------------------------------------------------------------------

    # ----------------------------- Do not change -----------------------------
    data_io.save(res, 'task_7')
    return res
    # -------------------------------------------------------------------------

In [22]:
res = task_7(data_io, data_dict['ml_features_train'], data_dict['ml_features_test'])
pa2.tests.test(res, 'task_7')

{'test_rmse': 0.8915823231982257}
tests for task_7 --------------------------------------------------------------
Test 1/1 : test_rmse ... Pass
1/1 passed
-------------------------------------------------------------------------------


True
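`RegressionEvaluator` with `metricName='rmse'` reports the root-mean-square error between the `prediction` and `overall` columns. That is the square root of the mean squared residual. A plain-Python version on hypothetical labels and predictions is included for reference.

```python
import math
from typing import Sequence


def rmse(labels: Sequence[float], predictions: Sequence[float]) -> float:
    """Root-mean-square error: sqrt(mean((label - prediction)^2))."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("need two non-empty sequences of equal length")
    squared = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared) / len(squared))


# Hypothetical star ratings and model predictions
print(rmse([5.0, 4.0, 1.0], [4.0, 4.0, 3.0]))
```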

# Task 8

In [6]:
def task_8(data_io, train_data, test_data):
    
    # ---------------------- Your implementation begins------------------------
    
    # Hold out 25% of the training data as a validation set
    train, valid = train_data.randomSplit([0.75, 0.25], seed=12345)
    para_array = [5, 7, 9, 12]  # candidate maxDepth values
    evaluator = M.evaluation.RegressionEvaluator(predictionCol='prediction',
                                                 labelCol='overall', metricName='rmse')
    rmse_array = []
    model_array = []
    for depth in para_array:
        dt = M.regression.DecisionTreeRegressor(labelCol='overall', maxDepth=depth)
        dt_model = dt.fit(train)
        rmse_array.append(evaluator.evaluate(dt_model.transform(valid)))
        model_array.append(dt_model)

    # Use the depth with the lowest validation RMSE and score it on the test set
    best_model = model_array[rmse_array.index(min(rmse_array))]
    rmse_best = evaluator.evaluate(best_model.transform(test_data))
    
    
    
    # -------------------------------------------------------------------------
    
    
    # ---------------------- Put results in res dict --------------------------
    res = {
        'test_rmse': None,
        'valid_rmse_depth_5': None,
        'valid_rmse_depth_7': None,
        'valid_rmse_depth_9': None,
        'valid_rmse_depth_12': None,
    }
    # Modify res:
    
    res["test_rmse"] =  rmse_best
    res["valid_rmse_depth_5"] = rmse_array[0]
    res["valid_rmse_depth_7"] = rmse_array[1]
    res["valid_rmse_depth_9"] = rmse_array[2]
    res["valid_rmse_depth_12"] = rmse_array[3]
    
    # Print the statistics for inspection
    print(res)

    # -------------------------------------------------------------------------

    # ----------------------------- Do not change -----------------------------
    data_io.save(res, 'task_8')
    return res
    # -------------------------------------------------------------------------

In [7]:
res = task_8(data_io, data_dict['ml_features_train'], data_dict['ml_features_test'])
pa2.tests.test(res, 'task_8')

{'test_rmse': 0.8657692186955247, 'valid_rmse_depth_5': 0.8891954350031208, 'valid_rmse_depth_7': 0.8734240833724545, 'valid_rmse_depth_9': 0.8677561873751738, 'valid_rmse_depth_12': 0.8654243192267065}




tests for task_8 --------------------------------------------------------------
Test 1/5 : test_rmse ... Pass
Test 2/5 : valid_rmse_depth_12 ... Pass
Test 3/5 : valid_rmse_depth_5 ... Pass
Test 4/5 : valid_rmse_depth_7 ... Pass
Test 5/5 : valid_rmse_depth_9 ... Pass
5/5 passed
-------------------------------------------------------------------------------


True
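Task 8 is hold-out validation. Each candidate depth is fit on 75% of the training data and scored on the remaining 25%. Only the depth with the lowest validation RMSE is then evaluated on the test set. Using the validation RMSEs printed above, the selection step reduces to an argmin over the grid.

```python
# Validation RMSEs copied from the task_8 output above
valid_rmse = {
    5: 0.8891954350031208,
    7: 0.8734240833724545,
    9: 0.8677561873751738,
    12: 0.8654243192267065,
}

# Pick the maxDepth whose validation RMSE is smallest
best_depth = min(valid_rmse, key=valid_rmse.get)
print(f"best maxDepth = {best_depth}")  # prints "best maxDepth = 12"
```

The validation RMSE is still decreasing at depth 12, so the largest depth in the grid is selected. Whether deeper trees would help further (or start to overfit) is not tested here.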

In [25]:
print("End to end time: {}".format(time.time()-begin))

End to end time: 1215.5420143604279
