# Preprocessing

## Setup

In [2]:
import os
import random

import boto3

from itertools import chain
from collections import Counter
import numpy as np
import pyspark

import matplotlib.pyplot as plt

import pickle
import nltk
from nltk.stem.snowball import SnowballStemmer

from pyspark.sql.functions import lit, monotonically_increasing_id, rand, row_number, udf
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import CountVectorizer

from utils import process_review, build_freqs

In [3]:
# Build the Spark session: request 10 executor instances with 8g each and
# a 32g driver.
spark = (
    pyspark.sql.SparkSession.builder
    .config("spark.executor.instances", 10)
    .config("spark.driver.memory", "32g")
    .config("spark.executor.memory", "8g")
    .getOrCreate()
)
spark.conf.set("spark.sql.analyzer.failAmbiguousSelfJoin", "false")

BUCKET_NAME = 'amazon-reviews-pds'
LOCAL_DIR = os.path.expanduser('~') + '/SageMaker/amazon-reviews/data/'
s3_client = boto3.client('s3')
s3 = boto3.resource('s3')
my_bucket = s3.Bucket(BUCKET_NAME)

# Make sure the download target exists before the first file is written.
os.makedirs(LOCAL_DIR, exist_ok=True)

# Download every Electronics parquet part and union the parts into one DataFrame.
data = None
for object_summary in my_bucket.objects.filter(Prefix="parquet/product_category=Electronics/"):
    OBJECT_NAME = object_summary.key
    file_name = OBJECT_NAME.split("/")[-1]
    if not file_name:
        # A key ending in "/" has no file name; LOCAL_DIR + "" would be the directory itself.
        continue
    LOCAL_FILE = LOCAL_DIR + file_name
    s3_client.download_file(BUCKET_NAME, OBJECT_NAME, LOCAL_FILE)
    part = spark.read.parquet(LOCAL_FILE)
    data = part if data is None else data.union(part)

if data is None:
    # Fail here rather than with a NameError in a later cell.
    raise RuntimeError("No parquet objects found under parquet/product_category=Electronics/")

print("done")

done


In [4]:
# Inspect the column names of the combined Electronics reviews DataFrame.
data.columns

['marketplace',
 'customer_id',
 'review_id',
 'product_id',
 'product_parent',
 'product_title',
 'star_rating',
 'helpful_votes',
 'total_votes',
 'vine',
 'verified_purchase',
 'review_headline',
 'review_body',
 'review_date',
 'year']

In [5]:
# Keep only the review text and its star rating, then discard rows
# that have no review text.
data = data.select('review_body', 'star_rating')
data = data.where(data.review_body.isNotNull())

In [6]:
# Ratings above 3 are labelled positive (sentiment 1) and ratings below 3
# negative (sentiment 0). 3-star reviews match neither filter and are dropped.
positive_reviews = data.filter(data.star_rating > 3).withColumn("sentiment", lit(1))
negative_reviews = data.filter(data.star_rating < 3).withColumn("sentiment", lit(0))

# Class sizes are left disabled; uncomment to compute them.
#NUM_POSITIVE_REVIEWS = positive_reviews.count()
#print("Number of positive reviews: ", NUM_POSITIVE_REVIEWS)
#NUM_NEGATIVE_REVIEWS = negative_reviews.count()
#print("Number of negative reviews: ", NUM_NEGATIVE_REVIEWS)

In [7]:
# Rebind the name so the notebook no longer references the full DataFrame.
data = 0

In [8]:
# Stack the labelled positive and negative reviews into one DataFrame.
# Both inputs carry the same columns in the same order
# (review_body, star_rating, sentiment), so the rows line up.
reviews = positive_reviews.union(negative_reviews)


In [9]:
# Drop the references to the per-class DataFrames; `reviews` was built from both.
positive_reviews = negative_reviews = 0

######## Declare a figure with a custom size
fig = plt.figure(figsize=(5, 5))

######## labels for the two classes
labels = 'Positives', 'Negative'

######## Sizes for each slice
sizes = [NUM_POSITIVE_REVIEWS, NUM_NEGATIVE_REVIEWS] 

####### Declare pie chart, where the slices will be ordered and plotted counter-clockwise:
plt.pie(sizes, labels=labels, autopct='%1.1f%%',
        shadow=True, startangle=90)

######## Equal aspect ratio ensures that pie is drawn as a circle.
plt.axis('equal')  

######## Display the chart
plt.show()

## Preprocess raw text into tokens for sentiment analysis


In [10]:
# Tokenize `review_body` into a new `words` column.
tokenizer = Tokenizer(inputCol="review_body", outputCol="words")
reviews = tokenizer.transform(reviews)

In [11]:
# Strip stop words from `words`, writing the result to a new `tokens` column.
remover = StopWordsRemover().setInputCol("words").setOutputCol("tokens")
reviews = remover.transform(reviews)


In [12]:
# Stem every token with NLTK's English Snowball stemmer, then keep only the
# stemmed tokens and the sentiment label.
stemmer = SnowballStemmer(language='english')


def _stem_tokens(tokens):
    """Return the stemmed form of each token in `tokens`, in order."""
    return list(map(stemmer.stem, tokens))


stemmer_udf = udf(_stem_tokens, ArrayType(StringType()))
reviews = (
    reviews
    .withColumn("stemmed_tokens", stemmer_udf("tokens"))
    .select('stemmed_tokens', 'sentiment')
)


In [13]:
# Confirm that only the stemmed tokens and the sentiment label remain.
reviews.columns

['stemmed_tokens', 'sentiment']

# Split into train and test sets

In [14]:
# 80/20 split with a fixed seed so the partition into train and test is
# repeatable between runs instead of being re-drawn every time.
train, test = reviews.randomSplit([0.8, 0.2], seed=12)

## Process training set

Calculate the total frequency of each word across all observations and store the results in two dictionaries: 1. pos_freqs, 2. neg_freqs (computed after removing stopwords and stemming).

Reviews with sentiment 1:
"The product was very very good"
"The product was great"

Reviews with sentiment 0:
"The product sucks"
"very bad"

pos_freqs = {"product":2, "very":2, "good":1, "great":1}
neg_freqs = {"product":1, "very":1, "sucks":1, "bad":1}

We need to collect the Spark DataFrame column into a Python list before counting its tokens. The DataFrame is processed in chunks to prevent Out Of Memory errors. (There may be a better way of doing this!)

In [16]:
# Add a consecutive, 1-based `id` column to the training set.
# The Window import is kept from the original cell in case other cells use it.
from pyspark.sql.window import Window
from pyspark.sql.types import LongType, StructField, StructType

# row_number() over a window with no partitioning makes Spark move every row
# into a single partition. zipWithIndex hands out the same consecutive
# numbering while the rows stay in their existing partitions.
_schema_with_id = StructType(train.schema.fields + [StructField("id", LongType(), False)])
train = spark.createDataFrame(
    # zipWithIndex is 0-based; add 1 so ids start at 1 as row_number() did.
    train.rdd.zipWithIndex().map(lambda pair: tuple(pair[0]) + (pair[1] + 1,)),
    _schema_with_id,
)

In [17]:
# Hard-coded row count, used by store_counts in place of a Spark count().
REVIEW_COUNT = 1_870_213

In [18]:
import time


def _count_alpha_tokens(token_lists):
    """Count the purely alphabetic tokens across an iterable of token lists."""
    return Counter(
        token for token in chain.from_iterable(token_lists) if token.isalpha()
    )


def store_counts(sent_reviews, directory, review_count=None, chunk_size=50000):
    """Write per-chunk token frequency Counters for `sent_reviews` to disk.

    The id range 1..review_count is walked in chunks of `chunk_size`. For
    each chunk the `stemmed_tokens` of the matching rows are collected,
    non-alphabetic tokens are discarded, and the resulting Counter is
    pickled to ./temp/<directory>/counts_<start>.pickle, where <start> is
    the exclusive lower id bound of the chunk (0, chunk_size, ...).

    Args:
        sent_reviews: DataFrame with an `id` column and a `stemmed_tokens`
            column holding a list of strings per row.
        directory: sub-directory of ./temp/ to write into; created if missing.
        review_count: highest id to process. Defaults to the notebook-level
            REVIEW_COUNT.
        chunk_size: number of ids per chunk.

    Raises:
        ValueError: if `chunk_size` is not positive.
    """
    if review_count is None:
        review_count = REVIEW_COUNT
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")

    out_dir = './temp/' + directory
    os.makedirs(out_dir, exist_ok=True)

    for start in range(0, review_count, chunk_size):
        t1 = time.time()
        print(start)
        # Half-open id interval (start, end]: every id from 1 to review_count
        # lands in exactly one chunk, including the final partial chunk.
        end = min(start + chunk_size, review_count)
        in_chunk = (sent_reviews['id'] > start) & (sent_reviews['id'] <= end)
        rows = sent_reviews.filter(in_chunk).select('stemmed_tokens').collect()
        # Only one column was selected, so the token list is field 0 of each row.
        counts = _count_alpha_tokens(row[0] or () for row in rows)
        with open(out_dir + '/counts_' + str(start) + '.pickle', 'wb') as outputfile:
            pickle.dump(counts, outputfile)
        # Release the collected rows before the next chunk is pulled to the driver.
        del rows, counts
        t2 = time.time()
        print("Time elapsed: ", t2 - t1)

In [19]:
# Split the training set by label, then persist the token counts of each class.
pos_train = train.where(train.sentiment == 1)
neg_train = train.where(train.sentiment == 0)

store_counts(pos_train, 'pos_freqs')
store_counts(neg_train, 'neg_freqs')

0


----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 55664)
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/ec2-user/anaconda3/envs/mxnet_latest_p37/lib/python3.7/site-packages/py4j/java_gateway.py", line 1159, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ec2-user/anaconda3/envs/mxnet_latest_p37/lib/python3.7/site-packages/py4j/java_gateway.py", line 985, in send_command
    response = connection.send_command(command)
  File "/home/ec2-user/anaconda3/envs/mxnet_latest_p37/lib/python3.7/site-packages/py4j/java_gateway.py", line 1164, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
ERROR:py4j.java_gatew

Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:37199)

Traceback (most recent call last):
  File "/home/ec2-user/anaconda3/envs/mxnet_latest_p37/lib/python3.7/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/home/ec2-user/anaconda3/envs/mxnet_latest_p37/lib/python3.7/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/home/ec2-user/anaconda3/envs/mxnet_latest_p37/lib/python3.7/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/home/ec2-user/anaconda3/envs/mxnet_latest_p37/lib/python3.7/socketserver.py", line 720, in __init__
    self.handle()
  File "/home/ec2-user/anaconda3/envs/mxnet_latest_p37/lib/python3.7/site-packages/pyspark/accumulators.py", line 268, in handle
    poll(accum_updates)
  File "/home/ec2-user/anaconda3/envs/mxnet_latest_p37/lib/python3.7/site-packages/pyspark/accumulators.py", line 241, in poll
    if func():
  File "/home/ec2-user/anacon

In [None]:
# Number of negative (sentiment 0) reviews in the test split.
neg_test = test.where(test.sentiment == 0)
neg_test.count()

In [None]:
# Total number of reviews in the test split.
test.count()

In [None]:
def get_total_freqs(directory, end, step=10000):
    """Sum the per-chunk token Counters pickled under ./temp/<directory>/.

    Tries counts_<k>.pickle for every k in range(0, end, step). A missing
    file is reported and skipped; any other failure (for example a truncated
    or corrupt pickle) propagates, so it cannot silently produce incomplete
    frequencies.

    Args:
        directory: sub-directory of ./temp/ that holds the pickles.
        end: exclusive upper bound of the chunk numbers to try.
        step: distance between consecutive chunk numbers.

    Returns:
        A Counter with the summed token frequencies of all files found.
    """
    freqs = Counter()
    for count in range(0, end, step):
        path = './temp/' + directory + '/counts_' + str(count) + '.pickle'
        try:
            with open(path, 'rb') as inputfile:
                # NOTE: pickle.load can execute arbitrary code; only read
                # files that this notebook wrote itself.
                chunk_counts = pickle.load(inputfile)
        except FileNotFoundError:
            print("not found: ", count)
            continue
        # update() adds counts in place without rescanning the accumulator.
        freqs.update(chunk_counts)
    return freqs

In [None]:
# Merge the stored chunk counters into one frequency table per class.
pos_freqs = get_total_freqs(directory='pos_freqs', end=3_000_000)
neg_freqs = get_total_freqs(directory='neg_freqs', end=1_000_000)

In [None]:
def preprocess_reviews(sent_reviews, directory, review_count=None, chunk_size=50000,
                       pos_counts=None, neg_counts=None):
    """Write one [bias, positive, negative] feature row per review to CSV chunks.

    For every review, `positive` is the sum of the positive-class frequencies
    of its tokens and `negative` the sum of their negative-class frequencies;
    `bias` is always 1. The id range 1..review_count is processed in chunks
    of `chunk_size`, and each chunk is saved as ./temp/<directory>/<end>.csv,
    where <end> is the highest id the chunk may contain.

    Args:
        sent_reviews: DataFrame with an `id` column and a `stemmed_tokens`
            column holding a list of strings per row.
        directory: sub-directory of ./temp/ to write into; created if missing.
        review_count: highest id to process. Defaults to the largest `id`
            present in `sent_reviews`.
        chunk_size: number of ids per chunk.
        pos_counts: token -> frequency mapping for the positive class.
            Defaults to the notebook-level `pos_freqs`.
        neg_counts: token -> frequency mapping for the negative class.
            Defaults to the notebook-level `neg_freqs`.

    Raises:
        ValueError: if `chunk_size` is not positive.
    """
    import pandas as pd  # pandas is not among the notebook's top-level imports

    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    if pos_counts is None:
        pos_counts = pos_freqs
    if neg_counts is None:
        neg_counts = neg_freqs
    if review_count is None:
        # Use the highest id rather than the row count: a filtered frame can
        # contain ids well above its number of rows.
        review_count = sent_reviews.agg({'id': 'max'}).collect()[0][0] or 0

    out_dir = './temp/' + directory
    os.makedirs(out_dir, exist_ok=True)

    for start in range(0, review_count, chunk_size):
        # Half-open id interval (start, end] so no boundary id is skipped.
        end = min(start + chunk_size, review_count)
        print(end)
        in_chunk = (sent_reviews['id'] > start) & (sent_reviews['id'] <= end)
        subset = sent_reviews.filter(in_chunk).select('stemmed_tokens').collect()

        # row layout: bias, total positive sentiment, total negative sentiment
        rows = []
        for record in subset:
            row = [1, 0, 0]
            for word in record[0] or ():
                row[1] += pos_counts.get(word, 0)
                row[2] += neg_counts.get(word, 0)
            rows.append(row)

        df = pd.DataFrame(rows, columns=['bias', 'positive', 'negative'])
        df.to_csv(out_dir + '/' + str(end) + '.csv')
        # Release the chunk before the next one is collected.
        del subset, rows, df

## Split into training and test set

In [None]:
# Seed passed to the stratified sampling below.
seed = 12

In [None]:
# Show how many reviews fall into each sentiment class.
reviews.groupBy("sentiment").count().show()

In [None]:
# Sampling fraction per sentiment label: 0.8 for both classes.
fractions = dict.fromkeys((1, 0), 0.8)

In [None]:
# Stratified sample by sentiment, using the per-class fractions and the seed
# defined in the cells above.
train = reviews.sampleBy("sentiment", fractions=fractions, seed=seed)
train.show()

In [None]:
# The test set is every review that was not sampled into `train`.
# exceptAll removes one matching row from `reviews` per row of `train` and
# keeps the remaining duplicates; subtract() is EXCEPT DISTINCT and would
# collapse identical reviews.
test = reviews.exceptAll(train)

In [None]:
# Preview the first two rows of the test set.
test.show(2)

In [None]:
# Report filesystem disk usage in human-readable units.
!df -h