# Amazon Review Data Pipeline

The goal of this Jupyter notebook is to:
1. Load Amazon Review data acquired from [https://nijianmo.github.io/amazon/index.html](https://nijianmo.github.io/amazon/index.html)
2. Turn it into a Spark RDD
3. Create a Bag-of-Words with:
    - Single words (OneGrams)
    - Pairs of adjacent words (TwoGrams)
4. Visualize how frequently these words occur in the review data
5. Create an RDD of LabeledPoints, one per review, and save it as a LibSVM file.

## Preparing Data

The data cannot be read directly from its gzipped format into a Spark DataFrame. Doing so produces schema errors because there are a few naming collisions in the JSON data.

Instead, the data must be unzipped and a few of its errors must be corrected before it is loaded.

The following script achieves that. Specifically, it ensures consistent casing for the field names 'style' and 'styleName'. The cell is commented out; uncomment it to run it.

Note: both `gzip` and `sed` may need to be installed first.

In [1]:
# %%bash

# # Decompress Gzip
# gzip -dk $PWD/data/Musical_Instruments_5.json.gz

# # Replace problematic field names
# sed -i "s:style Name:styleName:" $PWD/data/Musical_Instruments_5.json
# sed -i "s:style name:styleName:" $PWD/data/Musical_Instruments_5.json
# sed -i "s:Style:style:" $PWD/data/Musical_Instruments_5.json
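
For readers without `sed`, the three substitutions above can be sketched in plain Python. This is only an illustration of what the script does to each line, not part of the original pipeline; `SUBSTITUTIONS` and `normalize_line` are hypothetical names. Because the `sed` commands use `:` as the delimiter and no `g` flag, each one replaces only the first match on a line, which `str.replace(old, new, 1)` reproduces for these literal patterns.

```python
# Hypothetical helper mirroring the three sed commands, applied in the same order.
SUBSTITUTIONS = [
    ("style Name", "styleName"),
    ("style name", "styleName"),
    ("Style", "style"),
]

def normalize_line(line: str) -> str:
    """Apply each substitution once per line, like sed's s:old:new: without g."""
    for old, new in SUBSTITUTIONS:
        line = line.replace(old, new, 1)
    return line

print(normalize_line('{"style": {"style name:": " Red"}}'))
```

Since only the first match on each line is replaced, a line containing several differently cased variants may not be fully normalized by a single pass.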

## Imports

In [2]:
from pyspark.sql import SparkSession
from pyspark.mllib.util import MLUtils
import os
import gzip
import json
import re

## Spark DataFrame

The goal of this section is to load the prepared JSON file into a Spark DataFrame, drop all columns except 'overall' (rating) and 'reviewText', and create an RDD from this data.

#### Creating Spark Session

In [3]:
spark = SparkSession.builder \
    .master("local[16]") \
    .appName("data_pipeline") \
    .getOrCreate()

22/12/15 19:09:58 WARN Utils: Your hostname, Zephyrus resolves to a loopback address: 127.0.1.1; using 172.19.222.122 instead (on interface eth0)
22/12/15 19:09:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/15 19:10:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/12/15 19:10:00 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


#### Creating Spark Dataframe

In [4]:
spark_df = spark.read.json('data/Musical_Instruments_5.json')
spark_df.printSchema()

root
 |-- asin: string (nullable = true)
 |-- image: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- style: struct (nullable = true)
 |    |-- Color Name:: string (nullable = true)
 |    |-- Color:: string (nullable = true)
 |    |-- Configuration:: string (nullable = true)
 |    |-- Edition:: string (nullable = true)
 |    |-- Format:: string (nullable = true)
 |    |-- Item Display Length:: string (nullable = true)
 |    |-- Item Package Quantity:: string (nullable = true)
 |    |-- Length:: string (nullable = true)
 |    |-- Model Number:: string (nullable = true)
 |    |-- Number of Items:: string (nullable = true)
 |    |-- Package Quantity:: string (nullable = true)
 |    |-- Package Type:: string (nullable = true)
 |    |-- Platform for Display:

                                                                                

In [5]:
print(f"There are {spark_df.count()} rows of data")

There are 231392 rows of data


## Cleaning Data

1. Drop all columns except `overall` and `reviewText`
2. Drop all rows with NA values
3. Convert to RDD
4. Convert to lowercase
5. Strip non-alphabetic/space chars
6. Split into vector of strings
7. Remove empty reviews
8. Simplify rating into 0 or 1
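
The per-review part of these steps (4 to 8) can be tried on a single record without Spark. The following is a minimal sketch that applies the same lowercase, strip, split and relabel logic to one `(overall, reviewText)` pair; `clean_review` is a hypothetical helper name, not something defined in this notebook.

```python
import re

def clean_review(overall, review_text):
    """Return (label, tokens) for one review, or None if no tokens remain."""
    text = review_text.lower()
    # Keep letters and spaces only; digits, punctuation and newlines are dropped
    text = re.sub(r"[^A-Za-z ]+", "", text)
    tokens = text.split()
    if not tokens:
        return None  # corresponds to filtering out empty reviews
    # Ratings of 4 or 5 are positive (1); everything else is negative (0)
    label = 1 if overall in (4, 5) else 0
    return (label, tokens)

print(clean_review(5.0, "Works GREAT, highly recommend!"))
print(clean_review(3.0, "Didn't work."))
```

Note that stripping punctuation turns contractions such as "Didn't" into "didnt", which is why tokens like `dont` and `didnt` appear in the counts later on.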

In [6]:
# Drop all columns except reviewText and overall
df = spark_df[['overall', 'reviewText']]
df.printSchema()

# Drop all rows with NA values
df = df.dropna()

# Convert to rdd
rdd = df.rdd

# Convert to lowercase
rdd = rdd.map(lambda x: (x[0], x[1].lower()))

# Strip everything except letters and spaces
rdd = rdd.map(lambda x: (x[0], re.sub(r'[^A-Za-z ]+', '', x[1])))

# Convert review to array of words
rdd = rdd.map(lambda x: (x[0], x[1].split()))

# Convert overall rating to 1 or 0
# 4-5 -> Positive
# 1-3 -> Negative
rdd = rdd.map(lambda x: (1, x[1]) if x[0] in [4,5] else (0, x[1]))

# Filter out empty reviews
basic_rdd = rdd.filter(lambda x: len(x[1]) > 0)

# Cache basic_rdd, then count it (the count materializes the cache)
basic_rdd.cache()
total_rows = basic_rdd.count()

root
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)


## Creating Bags-of-Words

### Method for Filtering Stop Words

For all $(k, v)$ where $v$ is a list of strings, remove all $e$ from $v$ where $e \in \text{stopwords}$. 

Stopwords sourced from [here](http://snowball.tartarus.org/algorithms/english/stop.txt).
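
The rule above can be sketched on plain Python tuples as follows. The stop-word set here is a tiny illustrative subset, and `filter_stopwords` is a hypothetical name; the notebook itself loads the full list from `data/stopwords.txt`.

```python
# Illustrative subset only; not the contents of data/stopwords.txt
example_stopwords = {"the", "is", "a", "don't"}

def filter_stopwords(record, stopword_set):
    """Remove every token of a (label, tokens) pair that is a stop word."""
    label, tokens = record
    return (label, [t for t in tokens if t not in stopword_set])

print(filter_stopwords((1, ["the", "guitar", "is", "great"]), example_stopwords))
print(filter_stopwords((0, ["dont", "know"]), example_stopwords))
```

One consequence worth keeping in mind: the cleaning step strips apostrophes, so if the stop-word file spells contractions with an apostrophe (as assumed in the subset above), a token such as `dont` will not match `don't` and will survive the filter.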

In [7]:
# Load the stop words into a set for fast membership tests
with open("data/stopwords.txt") as file:
    stopwords = set(file.read().splitlines())

### Creating Set of OneGrams

In [8]:
def remove_stopwords(x):
    """Drop every stop word from the token list of a (label, tokens) pair."""
    return (x[0], [s for s in x[1] if s not in stopwords])

In [9]:
positive_onegram_counts = basic_rdd \
    .filter(lambda x: x[0] == 1) \
    .map(remove_stopwords) \
    .flatMap(lambda x: x[1]) \
    .map(lambda x: (tuple([x]), 1)) \
    .reduceByKey(lambda a,b: a+b) \
    .filter(lambda x: x[1] > 1) \
    .sortBy(lambda x: -x[1])

negative_onegram_counts = basic_rdd \
    .filter(lambda x: x[0] == 0) \
    .map(remove_stopwords) \
    .flatMap(lambda x: x[1]) \
    .map(lambda x: (tuple([x]), 1)) \
    .reduceByKey(lambda a,b: a+b) \
    .filter(lambda x: x[1] > 1) \
    .sortBy(lambda x: -x[1])

                                                                                

In [10]:
# Cache both RDDs before counting so that the counts populate the cache
positive_onegram_counts.cache()
negative_onegram_counts.cache()

num_shared_pos_onegrams = positive_onegram_counts.count()
num_shared_neg_onegrams = negative_onegram_counts.count()

print(f"There are {num_shared_pos_onegrams} shared Positive OneGrams in the corpus")
print(f"There are {num_shared_neg_onegrams} shared Negative OneGrams in the corpus")

There are 53160 shared Positive OneGrams in the corpus
There are 21554 shared Negative OneGrams in the corpus


In [11]:
print("Positive OneGrams:")
for a in positive_onegram_counts.take(6):
    print('\t', end="")
    print(a)

print("Negative OneGrams:")
for a in negative_onegram_counts.take(6):
    print('\t', end="")
    print(a)

Positive OneGrams:
	(('great',), 74484)
	(('good',), 54966)
	(('guitar',), 51021)
	(('sound',), 47321)
	(('one',), 44627)
	(('like',), 44025)
Negative OneGrams:
	(('one',), 10232)
	(('just',), 9416)
	(('like',), 9228)
	(('good',), 8735)
	(('guitar',), 8474)
	(('sound',), 8006)


Number of distinct words:

### Creating Set of TwoGrams

In [12]:
n = 2

def ngram(x):
    result = []
    for i in range(len(x[1])-n+1):
        gram = []
        for j in range(i, i+n):
            gram.append(x[1][j])
        result.append(tuple(gram))
    return (x[0], result)

def ngram_remove_stopwords(x):
    """Drop every n-gram whose first or last word is a stop word."""
    return (x[0], [s for s in x[1]
                   if s[0] not in stopwords and s[-1] not in stopwords])
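
The same sliding-window idea can be written more compactly with `zip`. The sketch below is an equivalent formulation on a plain token list, not the code used in this notebook; `ngrams` and `keep_ngram` are hypothetical names and the stop-word set is an illustrative subset.

```python
def ngrams(tokens, n=2):
    """Return all runs of n consecutive tokens as tuples."""
    # zip stops at the shortest slice, so short inputs yield an empty list
    return list(zip(*(tokens[i:] for i in range(n))))

def keep_ngram(gram, stopword_set):
    """Keep an n-gram only if neither its first nor its last word is a stop word."""
    return gram[0] not in stopword_set and gram[-1] not in stopword_set

tokens = ["this", "guitar", "sounds", "great"]
grams = ngrams(tokens, 2)
print(grams)
print([g for g in grams if keep_ngram(g, {"this"})])
```

For TwoGrams, checking the first and last word covers the whole gram; for larger `n`, stop words in the middle of a gram would be kept under this rule.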

In [13]:
positive_twogram_counts = basic_rdd \
    .filter(lambda x: x[0] == 1) \
    .map(ngram) \
    .map(ngram_remove_stopwords) \
    .flatMap(lambda x: [(a, 1) for a in x[1]]) \
    .reduceByKey(lambda a,b: a+b) \
    .filter(lambda x: x[1] > 1) \
    .sortBy(lambda x: -x[1])

negative_twogram_counts = basic_rdd \
    .filter(lambda x: x[0] == 0) \
    .map(ngram) \
    .map(ngram_remove_stopwords) \
    .flatMap(lambda x: [(a, 1) for a in x[1]]) \
    .reduceByKey(lambda a,b: a+b) \
    .filter(lambda x: x[1] > 1) \
    .sortBy(lambda x: -x[1])

                                                                                

In [14]:
# Cache both RDDs before counting so that the counts populate the cache
positive_twogram_counts.cache()
negative_twogram_counts.cache()

num_shared_pos_twograms = positive_twogram_counts.count()
num_shared_neg_twograms = negative_twogram_counts.count()

print(f"There are {num_shared_pos_twograms} shared Positive TwoGrams in the corpus")
print(f"There are {num_shared_neg_twograms} shared Negative TwoGrams in the corpus")

There are 245943 shared Positive TwoGrams in the corpus
There are 59697 shared Negative TwoGrams in the corpus


In [15]:
print("Positive TwoGrams:")
for a in positive_twogram_counts.take(6):
    print('\t', end="")
    print(a)

print("Negative TwoGrams:")
for a in negative_twogram_counts.take(6):
    print('\t', end="")
    print(a)

Positive TwoGrams:
	(('works', 'great'), 7120)
	(('good', 'quality'), 3826)
	(('great', 'product'), 3631)
	(('great', 'price'), 3372)
	(('highly', 'recommend'), 2990)
	(('sounds', 'great'), 2702)
Negative TwoGrams:
	(('much', 'better'), 658)
	(('dont', 'know'), 628)
	(('can', 'get'), 451)
	(('power', 'supply'), 441)
	(('didnt', 'work'), 425)
	(('sound', 'quality'), 424)


### Determining Bag Sizes

In [47]:
onegrams = set(positive_onegram_counts.map(lambda x: x[0]).collect()).union(set(negative_onegram_counts.map(lambda x: x[0]).collect()))
twograms = set(positive_twogram_counts.map(lambda x: x[0]).collect()).union(set(negative_twogram_counts.map(lambda x: x[0]).collect()))

In [50]:
print(f"There are {len(onegrams)} unique shared onegrams")
print(f"There are {len(twograms)} unique shared twograms")

There are 56745 unique shared onegrams
There are 268549 unique shared twograms


In [92]:
bagsize_onegram = int(len(onegrams)/10)
bagsize_twogram = int(len(twograms)/10)

print(f"OneGram Bag Size: {bagsize_onegram}")
print(f"TwoGram Bag Size: {bagsize_twogram}")

OneGram Bag Size: 5674
TwoGram Bag Size: 26854


### Generating Bag of Top 10% of OneGrams

In [93]:
top_pos_onegrams = set(positive_onegram_counts.map(lambda x: x[0]).take(bagsize_onegram))
top_neg_onegrams = set(negative_onegram_counts.map(lambda x: x[0]).take(bagsize_onegram))
common_onegrams = top_pos_onegrams.intersection(top_neg_onegrams)

In [94]:
num_pos = int((bagsize_onegram - len(common_onegrams)) / 2)
num_neg = bagsize_onegram - len(common_onegrams) - num_pos

unique_pos_onegrams = set(positive_onegram_counts \
    .map(lambda x: x[0]) \
    .filter(lambda x: not x in common_onegrams) \
    .take(num_pos))

unique_neg_onegrams = set(negative_onegram_counts \
    .map(lambda x: x[0]) \
    .filter(lambda x: not x in common_onegrams) \
    .take(num_neg))

bag_onegram = common_onegrams.union(unique_neg_onegrams).union(unique_pos_onegrams)
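
The selection logic in the two cells above can be summarized on plain Python lists. The following is a minimal sketch, assuming each input list is already sorted by descending count; `build_bag` is a hypothetical name and the example words are made up.

```python
def build_bag(ranked_pos, ranked_neg, bag_size):
    """Combine the grams common to both top lists with an even split of the rest."""
    top_pos = set(ranked_pos[:bag_size])
    top_neg = set(ranked_neg[:bag_size])
    common = top_pos & top_neg

    # Split the remaining slots between the two classes (negative gets the odd one)
    remaining = bag_size - len(common)
    num_pos = remaining // 2
    num_neg = remaining - num_pos

    unique_pos = [g for g in ranked_pos if g not in common][:num_pos]
    unique_neg = [g for g in ranked_neg if g not in common][:num_neg]
    return common | set(unique_pos) | set(unique_neg)

ranked_pos = ["great", "good", "love", "nice"]
ranked_neg = ["good", "bad", "broke", "great"]
print(sorted(build_bag(ranked_pos, ranked_neg, 3)))
```

The bag reaches exactly `bag_size` entries only when the two "unique" selections do not overlap each other; a gram that ranks high for one class and low for the other could be picked from both lists and would then be counted once.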

In [95]:
print(f"There are {len(common_onegrams)} common OneGrams")
print(f"We will include {len(unique_pos_onegrams)} unique positive OneGrams")
print(f"We will include {len(unique_neg_onegrams)} unique negative OneGrams")
print(f"There are {len(bag_onegram)} OneGrams in total")

There are 4890 common OneGrams
We will include 392 unique positive OneGrams
We will include 392 unique negative OneGrams
There are 5674 OneGrams in total


### Generating Bag of Top 10% of TwoGrams

In [96]:
top_pos_twograms = set(positive_twogram_counts.map(lambda x: x[0]).take(bagsize_twogram))
top_neg_twograms = set(negative_twogram_counts.map(lambda x: x[0]).take(bagsize_twogram))
common_twograms = top_pos_twograms.intersection(top_neg_twograms)

In [97]:
num_pos_two = int((bagsize_twogram - len(common_twograms)) / 2)
num_neg_two = bagsize_twogram - len(common_twograms) - num_pos_two

unique_pos_twograms = set(positive_twogram_counts \
    .map(lambda x: x[0]) \
    .filter(lambda x: not x in common_twograms) \
    .take(num_pos_two))

unique_neg_twograms = set(negative_twogram_counts \
    .map(lambda x: x[0]) \
    .filter(lambda x: not x in common_twograms) \
    .take(num_neg_two))

bag_twogram = common_twograms.union(unique_neg_twograms).union(unique_pos_twograms)

In [98]:
print(f"There are {len(common_twograms)} common TwoGrams")
print(f"We will include {len(unique_pos_twograms)} unique Positive TwoGrams")
print(f"We will include {len(unique_neg_twograms)} unique Negative TwoGrams")
print(f"There are {len(bag_twogram)} TwoGrams in total")

There are 13879 common TwoGrams
We will include 6487 unique Positive TwoGrams
We will include 6488 unique Negative TwoGrams
There are 26854 TwoGrams in total


## Create Data Sets

### Generate OneGram RDD

In [99]:
onegram_tuples_rdd = basic_rdd \
    .map(lambda x: (x[0], [tuple([s]) for s in x[1]]))

### Generate TwoGram RDD

In [100]:
n = 2
def two_grams(x):
    """Map a (label, tokens) pair to (label, list of n-gram tuples)."""
    tokens = x[1]
    result = [tuple(tokens[i:i+n]) for i in range(len(tokens)-n+1)]
    return (x[0], result)

In [101]:
twogram_tuples_rdd = basic_rdd.map(two_grams)

### Filter Data

In [102]:
def bag_filter(x, bag):
    """Keep only the n-grams of a (label, ngrams) pair that appear in the bag."""
    return (x[0], [s for s in x[1] if s in bag])

In [103]:
data_onegram = onegram_tuples_rdd \
    .map(lambda x: bag_filter(x, bag_onegram)) \
    .filter(lambda x: len(x[1]) > 0) # Ensure there is at least one feature

data_twogram = twogram_tuples_rdd \
    .map(lambda x: bag_filter(x, bag_twogram)) \
    .filter(lambda x: len(x[1]) > 0) # Ensure there is at least one feature

num_remaining_one = data_onegram.count()
num_remaining_two = data_twogram.count()

                                                                                

In [104]:
print(f"{num_remaining_one/total_rows*100:.2f}% of rows remain after Bag_OneGram filter")
print(f"{num_remaining_two/total_rows*100:.2f}% of rows remain after Bag_TwoGram filter")

99.53% of rows remain after Bag_OneGram filter
84.54% of rows remain after Bag_TwoGram filter


## Prepare Data for Logistic Regression

### Convert to RDD\<LabeledPoint\>

In [105]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import SparseVector

In [106]:
def convert_to_lp_rdd(x, bag):
    feature_ids = []
    feature_vals = []
    for id,val in enumerate(bag):
        if val in x[1]:
            feature_ids.append(id)
            feature_vals.append(1)
    return LabeledPoint(x[0], SparseVector(len(bag), feature_ids, feature_vals))
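
The function above assigns feature ids by enumerating a Python set, so the ids depend on the set's iteration order, and each review is scanned once per bag entry. A possible alternative, shown here only as a sketch, is to fix the ids once by sorting the bag and then look each gram up in a dictionary. `build_index` and `to_sparse` are hypothetical names, and the returned tuple merely stands in for `LabeledPoint(label, SparseVector(size, ids, values))` so that the example runs without Spark.

```python
def build_index(bag):
    """Assign each gram a stable feature id by sorting the bag."""
    return {gram: i for i, gram in enumerate(sorted(bag))}

def to_sparse(record, index):
    """Map a (label, grams) pair to (label, size, sorted ids, values)."""
    label, grams = record
    # A set removes repeated grams; sorting keeps the ids in ascending order
    ids = sorted({index[g] for g in grams if g in index})
    return (label, len(index), ids, [1.0] * len(ids))

bag = {("great",), ("bad",), ("good",)}
index = build_index(bag)
print(to_sparse((1, [("great",), ("good",), ("great",)]), index))
```

Switching to this mapping would change which id each gram receives, so files written with the original function and files written with this one should not be mixed.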

In [108]:
data_A = data_onegram.map(lambda x: convert_to_lp_rdd(x, bag_onegram))
data_B = data_twogram.map(lambda x: convert_to_lp_rdd(x, bag_twogram))

if not os.path.isdir("data/data_A"):
    MLUtils.saveAsLibSVMFile(data_A, "data/data_A")
if not os.path.isdir("data/data_B"):
    MLUtils.saveAsLibSVMFile(data_B, "data/data_B")
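
For orientation, the LibSVM text format stores one example per line as `label index:value ...`, with one-based feature indices in ascending order. The sketch below only illustrates that layout for a single sparse example; it is not the implementation used by `MLUtils.saveAsLibSVMFile`, and `to_libsvm_line` is a hypothetical name.

```python
def to_libsvm_line(label, indices, values):
    """Format one sparse example; input indices are zero-based and ascending."""
    # LibSVM indices are one-based, so shift each zero-based index by one
    pairs = " ".join(f"{i + 1}:{v}" for i, v in zip(indices, values))
    return f"{float(label)} {pairs}"

print(to_libsvm_line(1, [0, 3], [1.0, 1.0]))
```

A review labeled positive whose features 0 and 3 are present would therefore be written as `1.0 1:1.0 4:1.0`.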

                                                                                