## <font color='red'> INSTRUCTIONS </font>

<b> 
1. Write your code only in cells below the "WRITE CODE BELOW" title. Do not modify the code below the "DO NOT MODIFY" title. <br>
2. The expected data types of the output answers for each question are given in the last cell through assertion statements. Your answers must match these expected output data types. Hint: Many of the answers need to be a Python dictionary. Consider methods like to_dict() to convert a Pandas Series to a dictionary. <br>
3. The answers are then written to a JSON file named my_results_PA1.json. You can compare this with the provided expected output file "expected_results_PA1.json". <br>
4. After you complete writing your code, click "Kernel -> Restart Kernel and Run All Cells" on the top toolbar. There should NOT be any syntax/runtime errors, otherwise points will be deducted. <br>
5. For submitting your solution, first download your notebook by clicking "File -> Download". Rename the file as "&lt;TEAM_ID&gt;.ipynb" and upload to Canvas.</b>


## <font color='red'> DO NOT MODIFY </font>

In [1]:
import time
import json
import dask
import dask.dataframe as dd
import pandas as pd
import ast
import re
from dask.distributed import Client
import ctypes
import numpy as np

def trim_memory() -> int:
    """
    Call glibc's ``malloc_trim(0)`` in the current process.

    Loads "libc.so.6" through ctypes, so this only works where that shared
    library is available. Returns the integer result of ``malloc_trim``.
    """
    glibc = ctypes.CDLL("libc.so.6")
    trim_status = glibc.malloc_trim(0)
    return trim_status

# Connect to the Dask scheduler listening on the local address below.
client = Client("127.0.0.1:8786")
# Execute trim_memory through the client (Client.run targets the workers),
# then restart the cluster so the notebook starts from a clean state.
client.run(trim_memory)
client.restart()
# Show the client summary (scheduler address, workers, threads, memory).
print(client)

<Client: 'tcp://172.31.43.150:8786' processes=6 threads=6, memory=23.42 GiB>


In [2]:
# Wall-clock timestamp taken before the solution code runs; used later to
# report the total execution time.
start = time.time()

## <font color='blue'> WRITE CODE BELOW </font>

In [3]:
from dask.distributed import as_completed

def get_supercat(cat_str):
    """
    Return the first entry of the first sub-list encoded in ``cat_str``.

    ``cat_str`` is parsed with ``ast.literal_eval`` and is expected to be the
    string form of a nested list, e.g. "[['Books', 'Fiction']]" -> 'Books'.

    Returns None when the value cannot be parsed, when the parsed value is
    not a non-empty list whose first element is a non-empty list, or when
    that first entry is falsy (for example an empty string).
    """
    try:
        cat_list = ast.literal_eval(cat_str)
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
        # Only the failures literal_eval is documented to raise on malformed
        # input are treated as "no category"; KeyboardInterrupt and
        # SystemExit are no longer swallowed.
        return None

    if not (isinstance(cat_list, list) and cat_list):
        return None

    first_path = cat_list[0]
    if isinstance(first_path, list) and first_path and first_path[0]:
        return first_path[0]
    return None

def check_dangling(partition, products_subset):
    """
    Report whether any 'asin' in ``partition`` has no match in ``products_subset``.

    The partition's 'asin' column is left-joined onto ``products_subset`` with
    a merge indicator; rows flagged 'left_only' found no matching product.

    Returns 1 if at least one unmatched row exists, otherwise 0.
    """
    asin_only = partition[['asin']]
    joined = asin_only.merge(products_subset, how='left', on='asin', indicator=True)
    unmatched = joined['_merge'] == 'left_only'
    # The trailing .compute() means the inputs must be lazy Dask objects;
    # presumably a delayed partition and a Dask DataFrame -- verify against caller.
    found_dangling = unmatched.any().compute()
    return int(found_dangling)

def safe_extract(related_str):
    """
    Collect the product ids listed in a stringified 'related' dictionary.

    ``related_str`` is parsed with ``ast.literal_eval``. If the result is a
    dict, the list values stored under 'also_bought', 'also_viewed',
    'bought_together' and 'buy_after_viewing' are concatenated in that order.
    Keys that are missing or whose value is not a list are skipped.

    Returns a list of ids; the list is empty when the input cannot be parsed
    or does not evaluate to a dict.
    """
    try:
        d = ast.literal_eval(related_str)
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
        # Malformed or non-string input: nothing to extract. Only the errors
        # literal_eval is documented to raise are caught, so interrupts and
        # unrelated bugs still surface.
        return []

    # Explicit type check instead of relying on a swallowed AttributeError
    # from calling .get on a non-dict value.
    if not isinstance(d, dict):
        return []

    ids = []
    for k in ('also_bought', 'also_viewed', 'bought_together', 'buy_after_viewing'):
        val = d.get(k)
        if isinstance(val, list):
            ids.extend(val)
    return ids

def check_related_partition(partition, valid_asins_pd):
    """
    Report whether any id referenced in ``partition['related']`` is absent
    from ``valid_asins_pd``.

    Each non-null 'related' value is turned into a list of ids with
    ``safe_extract``, the lists are exploded to one id per row, and the ids
    are left-joined onto ``valid_asins_pd`` on 'asin' with a merge indicator.

    Returns 1 if at least one id is flagged 'left_only', otherwise 0.
    """
    id_lists = partition['related'].dropna().apply(safe_extract)
    # explode() yields NaN for empty lists, hence the second dropna().
    related_ids = id_lists.explode().dropna()
    candidates = related_ids.to_frame(name='asin')
    joined = candidates.merge(valid_asins_pd, how='left', on='asin', indicator=True)
    missing_mask = joined['_merge'] == 'left_only'
    # The trailing .compute() means the inputs must be lazy Dask objects --
    # verify against caller.
    return int(missing_mask.any().compute())

# Read each CSV as a Dask DataFrame with 'asin' forced to the object dtype,
# and persist it on the cluster so the questions below reuse the loaded data.
reviews = dd.read_csv('user_reviews.csv', dtype={'asin': 'object'}).persist()
products = dd.read_csv('products.csv', dtype={'asin': 'object'}).persist()

### Question 1
# Percentage of missing values in each column of the reviews table.
n_reviews = len(reviews)
q1 = reviews.isna().sum() / n_reviews * 100

### Question 2
# Percentage of missing values in each column of the products table.
n_products = len(products)
q2 = products.isna().sum() / n_products * 100

### Question 3
# Pearson correlation between rating and price over reviews joined to their
# product, keeping only rows where both values are present.
merged = dd.merge(
    reviews[['asin', 'overall']],
    products[['asin', 'price']],
    how='inner',
    on='asin',
)
merged = merged.dropna(subset=['overall', 'price'])
q3 = merged[['overall', 'price']].corr('pearson', numeric_only=True)

### Question 4
# Descriptive statistics of the non-missing product prices.
price = products['price'].dropna()
q4 = dict(
    mean=price.mean(),
    std=price.std(),
    min=price.min(),
    max=price.max(),
    median=price.quantile(0.5),
)

### Question 5
# Number of products per super-category (first entry of the first category list).
supercats = products['categories'].dropna().map(get_supercat, meta=('supercat', 'object'))
q5 = supercats.value_counts()

# Evaluate all five lazy results in a single pass over the task graph.
ans1_5 = dd.compute(q1, q2, q3, q4, q5)
# Bare expression kept from the original; it only displays when it is the
# last statement of a notebook cell.
ans1_5

### Question 6
# Send the product 'asin' column to the workers once so every task can reuse it.
product_asins_df_future = client.scatter(products[['asin']], broadcast=True)

# One task per reviews partition; each returns 1 if it contains a dangling asin.
delayed_parts = reviews.to_delayed()
futures = [
    client.submit(check_dangling, part, product_asins_df_future)
    for part in delayed_parts
]

# any() stops consuming results at the first truthy one, so the remaining
# tasks are not waited on; they are cancelled right after.
q6 = 1 if any(fut.result() for fut in as_completed(futures)) else 0
client.cancel(futures)

### Question 7
# One task per products partition (rows with a non-null 'related' value);
# each returns 1 if the partition references an id missing from the product
# asins scattered for Question 6.
delayed_parts = products[['related']].dropna().to_delayed()
futures = [client.submit(check_related_partition, part, product_asins_df_future) for part in delayed_parts]

q7 = 0
for fut in as_completed(futures):
    if fut.result():
        # A single dangling reference settles the answer; stop waiting.
        q7 = 1
        break
# Cancel whatever is still queued or running after the early exit, matching
# Question 6, so leftover tasks do not keep occupying the cluster.
client.cancel(futures)

In [4]:
### read in the 'user_reviews.csv' and 'products.csv' files, perform your calculations and place the answers in variables ans1 - ans7.


# substitute 'None' with the outputs from your calculations. 
# The expected output types can be seen in the assertion statements below
# Unpack the computed results in the order they were passed to dd.compute.
q1_res, q2_res, q3_res, q4_res, q5_res = ans1_5

# Q1/Q2: per-column missing-value percentages, rounded to two decimals.
ans1 = q1_res.round(2).to_dict()
ans2 = q2_res.round(2).to_dict()
# Q3: off-diagonal cell of the 2x2 correlation matrix (overall vs. price).
ans3 = float(q3_res.iloc[0, 1].round(2))
# Q4: each price statistic as a plain float rounded to two decimals.
ans4 = {stat: float(value.round(2)) for stat, value in q4_res.items()}
# Q5: super-category counts ordered from most to least frequent.
ans5 = q5_res.sort_values(ascending=False).to_dict()
# Q6/Q7: 0/1 flags computed above.
ans6, ans7 = q6, q7

## <font color='red'> DO NOT MODIFY </font>

In [5]:
# Wall-clock timestamp taken after the solution code has finished.
end = time.time()

In [6]:
# Report the elapsed wall-clock time between `start` and `end`, in seconds.
print(f"execution time = {end-start}s")

execution time = 241.45133423805237s


In [7]:
# DO NOT MODIFY
# Type checks on the answers before they are written out. The ans4 check had
# been commented out; it is restored so every answer is validated again.
assert type(ans1) == dict, f"answer to question 1 must be a dictionary like {{'reviewerID':0.2, ..}}, got type = {type(ans1)}"
assert type(ans2) == dict, f"answer to question 2 must be a dictionary like {{'asin':0.2, ..}}, got type = {type(ans2)}"
assert type(ans3) == float, f"answer to question 3 must be a float like 0.8, got type = {type(ans3)}"
assert type(ans4) == dict, f"answer to question 4 must be a dictionary like {{'mean':0.4,'max':0.6,'median':0.6...}}, got type = {type(ans4)}"
assert type(ans5) == dict, f"answer to question 5 must be a dictionary, got type = {type(ans5)}"
assert ans6 == 0 or ans6==1, f"answer to question 6 must be 0 or 1, got value = {ans6}"
assert ans7 == 0 or ans7==1, f"answer to question 7 must be 0 or 1, got value = {ans7}"

# Collect all answers plus the measured runtime into one JSON-serialisable dict.
ans_dict = {
    "q1": ans1,
    "q2": ans2,
    "q3": ans3,
    "q4": ans4,
    "q5": ans5,
    "q6": ans6,
    "q7": ans7,
    "runtime": end-start
}
# Write the results file that is compared against the expected output.
with open('my_results_PA1.json', 'w') as outfile: json.dump(ans_dict, outfile)