# AWS Customer Reviews - Preprocessing

In [2]:
import sys
import os
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import pyspark.sql.functions as F
from pyspark.sql import Row
import boto3
glueContext = GlueContext(SparkContext.getOrCreate())
from pyspark.sql.functions import col, udf
from datetime import datetime
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DateType
import nltk
from nltk.corpus import stopwords
import string
from pyspark.sql.types import StructType, ArrayType


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Import Data

In [3]:
reviews = spark.read.parquet("s3://amazon-reviews-pds/parquet")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

##### Set Constants

In [4]:
MIN_SENTENCE_LENGTH_IN_CHARS = 5
MAX_SENTENCE_LENGTH_IN_CHARS = 5000
ROW_LIMIT = 100

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Inspect Data

In [5]:
df = reviews \
  .distinct() \
  .filter("marketplace = 'US'") \
  .withColumn('body_len', F.length('review_body')) \
  .filter(F.col('body_len') > MIN_SENTENCE_LENGTH_IN_CHARS) \
  .filter(F.col('body_len') < MAX_SENTENCE_LENGTH_IN_CHARS) \

record_count = df.count()
print('Total Record Processing: {}'.format(record_count))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Total Record Processing: 148739853

### Pre-Process Review Content

#### Default Methods

In [6]:
#word tokenizer
def word_tokenize(x):
    lowerW = x.lower()
    words = lowerW.split()
    return words

def filter_punctuation(x):
    list_punct=list(string.punctuation)
    table = str.maketrans('', '', string.punctuation)
    stripped = [w.translate(table) for w in x]
    return stripped

##load stopwords - can't use NLTK to do this...
def remove_stopwords(x):
    stopwords = [",","'","\"","i", "me", "my", "myself", "we", "our", "ours", "ourselves", "you", "your", "yours", "yourself", "yourselves", "he", "him", "his", "himself", "she", "her", "hers", "herself", "it", "its", "itself", "they", "them", "their", "theirs", "themselves", "what", "which", "who", "whom", "this", "that", "these", "those", "am", "is", "are", "was", "were", "be", "been", "being", "have", "has", "had", "having", "do", "does", "did", "doing", "a", "an", "the", "and", "but", "if", "or", "because", "as", "until", "while", "of", "at", "by", "for", "with", "about", "against", "between", "into", "through", "during", "before", "after", "above", "below", "to", "from", "up", "down", "in", "out", "on", "off", "over", "under", "again", "further", "then", "once", "here", "there", "when", "where", "why", "how", "all", "any", "both", "each", "few", "more", "most", "other", "some", "such", "no", "nor", "not", "only", "own", "same", "so", "than", "too", "very", "s", "t", "can", "will", "just", "don", "should", "now"]
    stopW = [word for word in x if word not in stopwords and word !='']
    return stopW

# text = "testing this, because i am someone that need's food # $ %"
def preprocess_text_to_tokens(x):
    tokens = word_tokenize(x)
    tokens = filter_punctuation(tokens)
    tokens = remove_stopwords(tokens)
    return tokens


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Processing method

In [7]:
preprocess_text = udf(lambda row: preprocess_text_to_tokens(row))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
df = df.withColumn('review_body_processed', preprocess_text(col('review_body')))
df = df.withColumn('review_body_processed', regexp_replace('review_body_processed', '"', ''))
df = df.withColumn('review_body', regexp_replace('review_body', '"', ''))


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
name 'regexp_replace' is not defined
Traceback (most recent call last):
NameError: name 'regexp_replace' is not defined



### Add Month Column

In [9]:
#convert date to string with format yyyy-mm
func_to_str =  udf (lambda x: datetime.strftime(x, '%Y-%m'))

#apply the udf to the df
df = df.withColumn('review_date_str', func_to_str(col('review_date')))


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Partition Data

In [10]:
df = df \
  .repartition("review_date_str")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Save Data to S3

In [11]:
df \
  .write \
  .partitionBy('review_date_str') \
  .mode('overwrite') \
  .csv("s3://demos-amazon-reviews/preprocessed_reviews_csvs/", header=True)

from awsglue.dynamicframe import DynamicFrame

# dyf = DynamicFrame.fromDF(df, glueContext, "enriched")

# glueContext.write_dynamic_frame.from_options(
#        frame = dyf,
#        connection_type = "s3",
#        connection_options = {"path": "s3://demos-amazon-reviews/preprocessed_reviews_csvs/",  "partitionKeys": ["review_date_str"]},
#        format = "csv"
# )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### END OF NOTEBOOK