In [85]:
from os import walk
import pandas as pd
import re
import pickle

from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
sql_context = SQLContext(sc)

In [4]:
# Directory of the .tsv.gz files (original dataset approx. 33G)
DATA_DIR = './data/'

The Amazon review dataset consists of set of tab separated csv files with the following columns:
 - marketplace (country id, 2 digits), 
 - customer_id (a unique reviewer identifier),
 - review_id,
 - product_id,
 - product_parent (a unique product identifier),
 - product_title,
 - product_category,
 - star_rating (0-5 scale),
 - helpful_votes (number of votes saying that review is useful)
 - total_votes (number of votes in total),
 - vine (true/false if the review was written as part of Vine program),
 - verfied_purchase (true/false),
 - review_headline (review title),
 - review_body (text),
 - review_date.
 
 Here comes some sample data:

# Sample data

In [26]:
sample_pd = pd.read_csv('sample_us.tsv', delimiter='\t', error_bad_lines=False)
sample_pd.head()

Unnamed: 0,marketplace,customer_id,review_id,product_id,product_parent,product_title,product_category,star_rating,helpful_votes,total_votes,vine,verified_purchase,review_headline,review_body,review_date
0,US,18778586,RDIJS7QYB6XNR,B00EDBY7X8,122952789,Monopoly Junior Board Game,Toys,5,0,0,N,Y,Five Stars,Excellent!!!,2015-08-31
1,US,24769659,R36ED1U38IELG8,B00D7JFOPC,952062646,56 Pieces of Wooden Train Track Compatible wit...,Toys,5,0,0,N,Y,Good quality track at excellent price,Great quality wooden track (better than some o...,2015-08-31
2,US,44331596,R1UE3RPRGCOLD,B002LHA74O,818126353,Super Jumbo Playing Cards by S&S Worldwide,Toys,2,1,1,N,Y,Two Stars,Cards are not as big as pictured.,2015-08-31
3,US,23310293,R298788GS6I901,B00ARPLCGY,261944918,Barbie Doll and Fashions Barbie Gift Set,Toys,5,0,0,N,Y,my daughter loved it and i liked the price and...,my daughter loved it and i liked the price and...,2015-08-31
4,US,38745832,RNX4EXOBBPN5,B00UZOPOFW,717410439,Emazing Lights eLite Flow Glow Sticks - Spinni...,Toys,1,1,1,N,Y,DONT BUY THESE!,Do not buy these! They break very fast I spun ...,2015-08-31


# Filter data out and save to parquet

As first step, we will gather all filenames in order to filter out the relevant information and then save resulting dataframes in more convenient parquet format. The parquet format can be read by either pyspark or pandas.

In [38]:
# Get list of all files in DATA_DIR
review_files = []
for (dirpath, dirnames, filenames) in walk(DATA_DIR):
    review_files.extend(filenames)
    break
    
# Put them in a DataFrame
df_all_reviews = pd.DataFrame({'filename' : review_files})
df_all_reviews.head()

Unnamed: 0,filename
0,amazon_reviews_us_Shoes_v1_00.tsv.gz
1,amazon_reviews_us_Mobile_Apps_v1_00.tsv.gz
2,amazon_reviews_us_Personal_Care_Appliances_v1_...
3,amazon_reviews_multilingual_US_v1_00.tsv.gz
4,amazon_reviews_us_Home_Improvement_v1_00.tsv.gz


In [54]:
# Extract basename (without the extension)
df_all_reviews['parquet'] = df_all_reviews['filename'].apply(lambda s: re.search(r'(.*?).tsv.gz', s).group(1))+'.parquet.gzip'
df_all_reviews.head()

Unnamed: 0,filename,parquet
0,amazon_reviews_us_Shoes_v1_00.tsv.gz,amazon_reviews_us_Shoes_v1_00.parquet.gzip
1,amazon_reviews_us_Mobile_Apps_v1_00.tsv.gz,amazon_reviews_us_Mobile_Apps_v1_00.parquet.gzip
2,amazon_reviews_us_Personal_Care_Appliances_v1_...,amazon_reviews_us_Personal_Care_Appliances_v1_...
3,amazon_reviews_multilingual_US_v1_00.tsv.gz,amazon_reviews_multilingual_US_v1_00.parquet.gzip
4,amazon_reviews_us_Home_Improvement_v1_00.tsv.gz,amazon_reviews_us_Home_Improvement_v1_00.parqu...


Now, let's choose columns we want to analyse and save resulting data frames in parquet format.

In [40]:
# Directory of the reduced datasets in parquet format (approx. 2.6 GB)
PARQUET_DIR = './parquet/'

In [67]:
# First, define a function that properly reads a tsv file to pyspark DF
def read_tsv_to_pyspark_DF(filename):
    schema = StructType([
        StructField('marketplace', StringType(), True), #2 letter country code
        StructField('customer_id', IntegerType(), True), #author identifier
        StructField('review_id', StringType(), True), #unique review ID
        StructField('product_id', StringType(), True), # unique product ID
        StructField('product_parent', IntegerType(), True), # product identifier to be used to aggregate reviews for a product
        StructField('product_title', StringType(), True),
        StructField('product_category', StringType(), True),
        StructField('star_rating', IntegerType(), True), # 1-5 star rating 
        StructField('helpful_votes', IntegerType(), True), # positive votes for the review
        StructField('total_votes', IntegerType(), True), # total votes for the review
        StructField('vine', StringType(), True), # review is part of Vine Program
        StructField('verfied_purchase', StringType(), True), # Review is on Verified Purchase
        StructField('review_headline', StringType(), True), # title of the review
        StructField('review_body', StringType(), True), # text
        StructField('review_date', DateType(), True)]) # date of review 

    return sql_context.read.option('sep', '\t').csv(filename, schema=schema, header=True)

In [42]:
# We choose columns here
columns = ['customer_id', 'review_id', 'product_parent',
          'product_category', 'star_rating', 'helpful_votes', 'total_votes', 'review_date']

In [43]:
# Iterate over rows and create parquets
for index, row in df_all_reviews.iterrows():
    # Read tsv file
    df = read_tsv_to_pyspark_DF(DATA_DIR + row['filename'])

    # Filter out columns
    df_filtered = df.select(columns).dropna()
    
    # Write to parquet
    df_filtered.write.parquet(PARQUET_DIR + row['parquet'], compression='gzip')

# Split multilingual and reviews by category

There are two subsets in the set: multilingual reviews and US reviews by categories. We split them in separate data frames.

In [55]:
multilingual_reviews = df_all_reviews[df_all_reviews.filename.str.contains("multilingual")].copy()
multilingual_reviews

Unnamed: 0,filename,parquet
3,amazon_reviews_multilingual_US_v1_00.tsv.gz,amazon_reviews_multilingual_US_v1_00.parquet.gzip
14,amazon_reviews_multilingual_FR_v1_00.tsv.gz,amazon_reviews_multilingual_FR_v1_00.parquet.gzip
27,amazon_reviews_multilingual_UK_v1_00.tsv.gz,amazon_reviews_multilingual_UK_v1_00.parquet.gzip
36,amazon_reviews_multilingual_JP_v1_00.tsv.gz,amazon_reviews_multilingual_JP_v1_00.parquet.gzip
39,amazon_reviews_multilingual_DE_v1_00.tsv.gz,amazon_reviews_multilingual_DE_v1_00.parquet.gzip


In [56]:
# Extract country info
multilingual_reviews['country'] = multilingual_reviews['filename'].apply(lambda s: re.search(r'multilingual_(.*?)_', s).group(1))
multilingual_reviews.head()

Unnamed: 0,filename,parquet,country
3,amazon_reviews_multilingual_US_v1_00.tsv.gz,amazon_reviews_multilingual_US_v1_00.parquet.gzip,US
14,amazon_reviews_multilingual_FR_v1_00.tsv.gz,amazon_reviews_multilingual_FR_v1_00.parquet.gzip,FR
27,amazon_reviews_multilingual_UK_v1_00.tsv.gz,amazon_reviews_multilingual_UK_v1_00.parquet.gzip,UK
36,amazon_reviews_multilingual_JP_v1_00.tsv.gz,amazon_reviews_multilingual_JP_v1_00.parquet.gzip,JP
39,amazon_reviews_multilingual_DE_v1_00.tsv.gz,amazon_reviews_multilingual_DE_v1_00.parquet.gzip,DE


In [58]:
reviews_categories = df_all_reviews.drop(multilingual_reviews.index).copy()
reviews_categories.head()

Unnamed: 0,filename,parquet
0,amazon_reviews_us_Shoes_v1_00.tsv.gz,amazon_reviews_us_Shoes_v1_00.parquet.gzip
1,amazon_reviews_us_Mobile_Apps_v1_00.tsv.gz,amazon_reviews_us_Mobile_Apps_v1_00.parquet.gzip
2,amazon_reviews_us_Personal_Care_Appliances_v1_...,amazon_reviews_us_Personal_Care_Appliances_v1_...
4,amazon_reviews_us_Home_Improvement_v1_00.tsv.gz,amazon_reviews_us_Home_Improvement_v1_00.parqu...
5,amazon_reviews_us_Digital_Video_Download_v1_00...,amazon_reviews_us_Digital_Video_Download_v1_00...


In [59]:
# Extract country info
reviews_categories['category'] = reviews_categories['filename'].apply(lambda s: re.search(r'us_(.*?)_v', s).group(1))
reviews_categories

Unnamed: 0,filename,parquet,category
0,amazon_reviews_us_Shoes_v1_00.tsv.gz,amazon_reviews_us_Shoes_v1_00.parquet.gzip,Shoes
1,amazon_reviews_us_Mobile_Apps_v1_00.tsv.gz,amazon_reviews_us_Mobile_Apps_v1_00.parquet.gzip,Mobile_Apps
2,amazon_reviews_us_Personal_Care_Appliances_v1_...,amazon_reviews_us_Personal_Care_Appliances_v1_...,Personal_Care_Appliances
4,amazon_reviews_us_Home_Improvement_v1_00.tsv.gz,amazon_reviews_us_Home_Improvement_v1_00.parqu...,Home_Improvement
5,amazon_reviews_us_Digital_Video_Download_v1_00...,amazon_reviews_us_Digital_Video_Download_v1_00...,Digital_Video_Download
6,amazon_reviews_us_Kitchen_v1_00.tsv.gz,amazon_reviews_us_Kitchen_v1_00.parquet.gzip,Kitchen
7,amazon_reviews_us_Electronics_v1_00.tsv.gz,amazon_reviews_us_Electronics_v1_00.parquet.gzip,Electronics
8,amazon_reviews_us_Digital_Music_Purchase_v1_00...,amazon_reviews_us_Digital_Music_Purchase_v1_00...,Digital_Music_Purchase
9,amazon_reviews_us_Video_v1_00.tsv.gz,amazon_reviews_us_Video_v1_00.parquet.gzip,Video
10,amazon_reviews_us_Musical_Instruments_v1_00.ts...,amazon_reviews_us_Musical_Instruments_v1_00.pa...,Musical_Instruments


We notice that in the case of reviews by category there are Books and Digital Ebook Purchase that contains more volumes than one. In the case of books there are three volumes in total. We will merge these.

In [70]:
duplicate_books = reviews_categories[reviews_categories.category == 'Books']
duplicate_books

Unnamed: 0,filename,parquet,category
21,amazon_reviews_us_Books_v1_00.tsv.gz,amazon_reviews_us_Books_v1_00.parquet.gzip,Books
40,amazon_reviews_us_Books_v1_02.tsv.gz,amazon_reviews_us_Books_v1_02.parquet.gzip,Books
46,amazon_reviews_us_Books_v1_01.tsv.gz,amazon_reviews_us_Books_v1_01.parquet.gzip,Books


In [71]:
duplicate_digital = reviews_categories[reviews_categories.category == 'Digital_Ebook_Purchase']
duplicate_digital

Unnamed: 0,filename,parquet,category
15,amazon_reviews_us_Digital_Ebook_Purchase_v1_00...,amazon_reviews_us_Digital_Ebook_Purchase_v1_00...,Digital_Ebook_Purchase
16,amazon_reviews_us_Digital_Ebook_Purchase_v1_01...,amazon_reviews_us_Digital_Ebook_Purchase_v1_01...,Digital_Ebook_Purchase


In [78]:
# Read books' parquets
books = []
for index, row in duplicate_books.iterrows():
    books.append(sql_context.read.parquet(PARQUET_DIR + row['parquet']))
# concatenate them
books_all = books[0].union(books[1].union(books[2]))
# and save to one big parquet
all_books_parquet = 'amazon_reviews_us_Books_All.parquet.gzip'
books_all.write.parquet(PARQUET_DIR+all_books_parquet , compression='gzip')

In [79]:
# Read Digital Ebook Purchase parquets
digital = []
for index, row in duplicate_digital.iterrows():
    digital.append(sql_context.read.parquet(PARQUET_DIR + row['parquet']))
# concatenate them
digital_all = digital[0].union(digital[1])
# and save to one big parquet
all_digital_parquet = 'amazon_reviews_us_Digital_Ebook_Purchase_All.parquet.gzip'
digital_all.write.parquet(PARQUET_DIR+ all_digital_parquet, compression='gzip')

In [88]:
reviews_categories_cleaned = reviews_categories.drop(duplicate_books.index).drop(duplicate_digital.index)
reviews_categories_cleaned = reviews_categories_cleaned.append([{'parquet': all_books_parquet, 'category': 'Books'},
                                  {'parquet': all_digital_parquet, 'category': 'Digital_Ebook_Purchase'}],
                                  ignore_index=True)

In [89]:
with open('category_reviews.pickle', 'wb') as file:
    pickle.dump(reviews_categories_cleaned, file)
    
with open('multilingual_reviews.pickle', 'wb') as file:
    pickle.dump(multilingual_reviews, file)