# 2. Data preparation

In [11]:
import pyspark.pandas as ps
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, BooleanType, ArrayType, LongType

# Build (or reuse) the notebook's Spark session: local master, 8g for the
# driver and executor, and off-heap memory enabled with a 28g budget.
spark = (
    SparkSession.builder
    .master('local[*]')
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "8g")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "28g")
    .getOrCreate()
)

## Join data and metadata

In [12]:
# Dataset categories; each name is interpolated into the per-category
# input (JSON) and output (Parquet) file paths further down.
categories = [
    'AMAZON_FASHION',
    'All_Beauty',
    'Appliances',
    'Arts_Crafts_and_Sewing',
    'Automotive',
    'Books',
    'CDs_and_Vinyl',
    'Cell_Phones_and_Accessories',
    'Clothing_Shoes_and_Jewelry',
    'Digital_Music',
    'Electronics',
    'Gift_Cards',
    'Grocery_and_Gourmet_Food',
    'Home_and_Kitchen',
    'Industrial_and_Scientific',
    'Kindle_Store',
    'Luxury_Beauty',
    'Magazine_Subscriptions',
    'Movies_and_TV',
    'Musical_Instruments',
    'Office_Products',
    'Patio_Lawn_and_Garden',
    'Pet_Supplies',
    'Prime_Pantry',
    'Software',
    'Sports_and_Outdoors',
    'Tools_and_Home_Improvement',
    'Toys_and_Games',
    'Video_Games',
]

In [3]:
# Explicit schema applied when the per-category review JSON is read, so Spark
# does not have to infer types. Every field uses StructField's default
# nullability. The nested 'style' struct holds only string attributes, so its
# fields are generated from the list of attribute names.
schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in [
        ('asin', StringType()),
        ('image', ArrayType(StringType())),
        ('overall', DoubleType()),
        ('reviewText', StringType()),
        ('reviewTime', StringType()),
        ('reviewerID', StringType()),
        ('reviewerName', StringType()),
        ('style', StructType([
            StructField(attribute, StringType())
            for attribute in (
                'Color',
                'Color Name',
                'Design',
                'Flavor',
                'Format',
                'Item Package Quantity',
                'Package Quantity',
                'Package Type',
                'Pattern',
                'Scent Name',
                'Size',
                'Size Name',
                'Style',
                'Style Name',
            )
        ])),
        ('summary', StringType()),
        ('unixReviewTime', LongType()),
        ('verified', BooleanType()),
        ('vote', StringType()),
    ]
])

In [None]:
# For every category: read its reviews and product metadata, join them on the
# product id ('asin'), tag the rows with the category name and write the result
# to one Parquet file per category, indexed by (reviewerID, asin).
for category in categories:
    print(f'Processing {category}')

    # Reviews are parsed with the explicit schema instead of schema inference.
    data = ps.DataFrame(spark.read.schema(schema).json(f'/data/json/{category}.json'))

    # Metadata is indexed by 'asin' so it can be joined against the reviews'
    # 'asin' column; four metadata columns are discarded before the join.
    meta = ps.read_json(f'/data/json/meta_{category}.json', index_col='asin')
    meta = meta.drop(['similar_item', 'details', 'tech1', 'tech2'], axis=1)

    # join(on='asin') hands back 'asin' as an ordinary column under a fresh
    # default index, so the frame can be re-indexed directly. Calling
    # reset_index() first would turn that meaningless default index into an
    # extra 'index' column that ends up in the Parquet output.
    df = data.join(meta, on='asin').set_index(['reviewerID', 'asin'])

    # NOTE(review): if the metadata already carries a 'category' column, this
    # assignment replaces it with the dataset name - confirm that is intended.
    df['category'] = category
    df.to_parquet(f'/data/{category}.parquet', index_col=['reviewerID', 'asin'])

    print(f'Finished {category}')

## Concatenate categories

In [13]:
# Load each per-category Parquet file, restoring the (reviewerID, asin) index.
df_list = [
    ps.read_parquet(
        f'/data/{category_name}.parquet',
        index_col=['reviewerID', 'asin'],
    )
    for category_name in categories
]

In [14]:
# Stack the per-category frames row-wise into a single frame; axis and join are
# spelled out at their default values.
df = ps.concat(df_list, axis=0, join='outer')

In [10]:
# Persist the combined frame, writing the two index levels out under the
# column names 'reviewerID' and 'asin'.
df.to_parquet(
    path='/data/data.parquet',
    index_col=['reviewerID', 'asin'],
)

                                                                                