## <font color='red'> INSTRUCTIONS </font>

<b> 
1. Write your code only in cells below the "WRITE CODE BELOW" title. Do not modify the code below the "DO NOT MODIFY" title. <br>
2. The expected data types of the output answers for each question are given in the last cell through assertion statements. Your answers must match these expected output data types. Hint: Many of the answers need to be a Python dictionary. Consider methods like to_dict() to convert a Pandas Series to a dictionary. <br>
3. The answers are then written to a JSON file named my_results_PA1.json. You can compare this with the provided expected output file "expected_results_PA1.json". <br>
4. After you finish writing your code, click "Kernel -> Restart Kernel and Run All Cells" on the top toolbar. There should NOT be any syntax/runtime errors; otherwise, points will be deducted. <br>
5. To submit your solution, first download your notebook by clicking "File -> Download". Rename the file to "&lt;TEAM_ID&gt;.ipynb" and upload it to Canvas.</b>


## <font color='red'> DO NOT MODIFY </font>

In [1]:
import time
import json
import dask
import dask.dataframe as dd
import pandas as pd
import ast
import re
from dask.distributed import Client
import ctypes
import numpy as np

def trim_memory() -> int:
    """
    Ask glibc to hand already-freed heap memory back to the operating system.

    Calls ``malloc_trim(0)`` from ``libc.so.6`` through ctypes, so it only works
    where that glibc shared library can be loaded (NOTE(review): presumably the
    Linux Dask workers). It releases memory that is already free inside the
    process; it does not repair genuine leaks.

    Returns:
        The int returned by ``malloc_trim`` (glibc documents 1 if memory was
        released, 0 otherwise).
    """
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)

# Connect to the Dask scheduler listening on 127.0.0.1:8786.
client = Client("127.0.0.1:8786")
# Run trim_memory on every worker of the cluster.
client.run(trim_memory)
# Restart the workers so the timed run starts from a clean state.
# NOTE(review): this cell's captured output is `None`, i.e. restart() returned
# None in this environment, so `client` is rebound to None, not to the Client.
client = client.restart()
print(client)

None


In [2]:
# Wall-clock timestamp marking the start of the timed answer cell.
start = time.time()

## <font color='blue'> WRITE CODE BELOW </font>

In [3]:
### read in the 'user_reviews.csv' and 'products.csv' files, perform your calculations and place the answers in variables ans1 - ans7.

# Column dtypes are spelled out so dask does not have to infer them from a sample
# of the first block of each file.
REVIEW_DTYPES = {
    "reviewerID": "object",
    "asin": "object",
    "reviewerName": "object",
    "helpful": "object",
    "reviewText": "object",
    "overall": "float64",
    "summary": "object",
    "unixReviewTime": "float64",
    "reviewTime": "object",
}
PRODUCT_DTYPES = {
    "asin": "object",
    "salesRank": "object",
    "imUrl": "object",
    "categories": "object",
    "title": "object",
    "description": "object",
    "price": "float64",
    "related": "object",
    "brand": "object",
}
CSV_BLOCKSIZE = "64MB"

df_reviews = dd.read_csv("user_reviews.csv", dtype=REVIEW_DTYPES, blocksize=CSV_BLOCKSIZE)
df_products = dd.read_csv("products.csv", dtype=PRODUCT_DTYPES, blocksize=CSV_BLOCKSIZE)

# Left join: every review is kept; `_merge` is 'left_only' for reviews whose
# asin found no product row, and `price` is NaN for them.
review_cols = df_reviews[['asin', 'overall']]
product_cols = df_products[['asin', 'price']]
df_merged = review_cols.merge(product_cols, on='asin', how='left', indicator=True)

# Q1 / Q2: share of missing values per column, as a percentage rounded to 2 decimals.
# Q4: summary statistics of product prices.
# All three are evaluated in a single dd.compute call so dask runs them as one graph.
reviews_missing_pct = (df_reviews.isnull().mean() * 100).round(2)
products_missing_pct = (df_products.isnull().mean() * 100).round(2)
price_summary = df_products['price'].describe()

reviews_missing_pct, products_missing_pct, price_summary = dd.compute(
    reviews_missing_pct, products_missing_pct, price_summary
)
ans1 = reviews_missing_pct.to_dict()
ans2 = products_missing_pct.to_dict()

desc = price_summary.loc[['mean', 'std', 'min', 'max', '50%']]
# describe() reports the median under the '50%' label; Q4 wants the key 'median'.
ans4_labels = (
    ('mean', 'mean'),
    ('min', 'min'),
    ('max', 'max'),
    ('std', 'std'),
    ('median', '50%'),
)
ans4 = {answer_key: desc[stat_label] for answer_key, stat_label in ans4_labels}

# Q3: Pearson correlation between product price and review rating, computed over
# the joined review rows that have both a price and a rating.
price_rate = df_merged[['price', 'overall']].dropna()

# Both the covariance and the two standard deviations must come from these same
# rows. The products-table statistics in `desc` weigh every product once rather
# than once per review, so they cannot normalise a review-level covariance.
# Series.corr computes Pearson's r consistently in a single pass.
price_rating_corr = price_rate['price'].corr(price_rate['overall'])
ans3 = round(float(price_rating_corr.compute()), 2)

def helper(part):
    """Return a copy of one products partition with a ``super_category`` column.

    Each non-null ``categories`` value is parsed with ``ast.literal_eval``. When
    the result is a non-empty list whose first element is itself a non-empty
    list, the first entry of that inner list becomes the super category;
    otherwise the value is ``None``. Rows whose ``categories`` is null get NaN
    (the parsed Series is aligned back onto the partition's index).

    The input partition is not modified: dask may hand the same partition object
    to other tasks, so it is treated as read-only.
    """
    def get_super(raw):
        parsed = ast.literal_eval(raw)
        # Check the inner list too: "[[]]" would otherwise raise IndexError.
        if isinstance(parsed, list) and parsed and isinstance(parsed[0], list) and parsed[0]:
            return parsed[0][0]
        return None

    super_category = part['categories'].dropna().apply(get_super)
    return part.assign(super_category=super_category)

# Q5: number of products (non-null asin) per super category, most frequent first.
# `meta_var` describes helper's output schema to dask: the original columns plus
# an object-dtype `super_category` column.
meta_var = df_products._meta.assign(super_category=pd.Series(dtype='object'))
df_products = df_products.map_partitions(helper, meta=meta_var)

per_category = df_products.groupby('super_category')['asin'].count(split_out=48)
ranked = per_category.compute().sort_values(ascending=False)
ans5 = ranked.to_dict()
# Drop the bucket for an empty-string super category, if there is one.
ans5.pop("", None)

# Only the `asin` and `related` columns are read from df_products after this point,
# so persist just those. Persisting everything else (salesRank, imUrl, title,
# description, ...) would pin large text columns in worker memory for no use.
df_products = df_products[['asin', 'related']].persist()

# Q6: does any review reference an asin that has no row in products.csv?
# The left join with indicator=True tags such reviews as 'left_only'.
dangling_exists = (df_merged['_merge'] == 'left_only').any().compute()

ans6 = int(dangling_exists)

# Q7: 1 if any product's `related` entry lists an asin that has no product row.
# The known asins are collected onto the client once. Persisting the column first
# only kept a second, never-reused copy in worker memory.
asin_set = set(df_products['asin'].dropna().compute())

related = df_products['related'].dropna().repartition(npartitions=48).persist()


def _has_dangling(related_strings, known_asins):
    """Return True as soon as one entry references an asin not in ``known_asins``.

    Each entry is parsed with ``ast.literal_eval`` and every value of the parsed
    mapping is iterated as a collection of asins (NOTE(review): assumes the
    stringified ``{kind: [asin, ...]}`` layout of products.csv - confirm).
    """
    for raw in related_strings:
        ids_by_kind = ast.literal_eval(raw)
        if any(pid not in known_asins for ids in ids_by_kind.values() for pid in ids):
            return True
    return False


# Partitions are fetched lazily and in order, so the scan stops at the first hit.
ans7 = int(any(_has_dangling(part.compute(), asin_set) for part in related.to_delayed()))


## <font color='red'> DO NOT MODIFY </font>

In [4]:
# Wall-clock timestamp closing the timed section that began at `start`.
end = time.time()

In [5]:
# Report the elapsed wall-clock time of the answer cell, in seconds.
print(f"execution time = {end-start}s")

execution time = 230.16935181617737s


In [6]:
# DO NOT MODIFY
# Check the type (or allowed values) of every answer before writing them out.
assert type(ans1) == dict, f"answer to question 1 must be a dictionary like {{'reviewerID':0.2, ..}}, got type = {type(ans1)}"
assert type(ans2) == dict, f"answer to question 2 must be a dictionary like {{'asin':0.2, ..}}, got type = {type(ans2)}"
assert type(ans3) == float, f"answer to question 3 must be a float like 0.8, got type = {type(ans3)}"
assert type(ans4) == dict, f"answer to question 4 must be a dictionary like {{'mean':0.4,'max':0.6,'median':0.6...}}, got type = {type(ans4)}"
assert type(ans5) == dict, f"answer to question 5 must be a dictionary, got type = {type(ans5)}"         
assert ans6 == 0 or ans6==1, f"answer to question 6 must be 0 or 1, got value = {ans6}" 
assert ans7 == 0 or ans7==1, f"answer to question 7 must be 0 or 1, got value = {ans7}" 

# Collect all answers plus the measured runtime and write them to my_results_PA1.json.
ans_dict = {
    "q1": ans1,
    "q2": ans2,
    "q3": ans3,
    "q4": ans4,
    "q5": ans5,
    "q6": ans6,
    "q7": ans7,
    "runtime": end-start
}
with open('my_results_PA1.json', 'w') as outfile: json.dump(ans_dict, outfile)         