### Start a Spark Context

In [1]:
# Create the Spark handles once per kernel; re-running this cell keeps them.
if 'sc' not in locals():
    from pyspark.context import SparkContext  # kept so later cells can use the name
    from pyspark.sql.context import SQLContext
    from pyspark.sql.session import SparkSession

    # getOrCreate attaches to an already-running context instead of raising
    # "Cannot run multiple SparkContexts at once" as a bare SparkContext() does.
    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext
    # Legacy SQL entry point, explicitly bound to the same session.
    sqlContext = SQLContext(sc, sparkSession=spark)
    


In [2]:
from pyspark.sql.functions import regexp_replace
from pyspark.ml.feature import Tokenizer, StopWordsRemover

In [3]:
import ast
import gzip
import os
import sys

import pandas as pd

### Loading Data

In [4]:
### data path
path0 = 'Data/'
filename = 'reviews_Amazon_Instant_Video_5.json.gz'
path = path0 + filename

In [5]:
### function to extract and parse the data

def parse(path):
    """Yield one record per non-blank line of a gzip-compressed file.

    Each line is parsed as a Python literal (dict, list, str, number, ...),
    which also accepts JSON objects that contain no ``true``/``false``/``null``.

    ``ast.literal_eval`` is used instead of ``eval`` so that a malformed or
    malicious line cannot execute code; such a line raises ``ValueError`` or
    ``SyntaxError`` instead. The file handle is closed when the generator is
    exhausted or closed.

    Parameters
    ----------
    path : str
        Path to the ``.gz`` file.

    Yields
    ------
    object
        The literal parsed from each line.
    """
    with gzip.open(path, 'rb') as g:
        for raw_line in g:
            line = raw_line.decode('utf-8')
            if not line.strip():
                continue  # tolerate blank lines instead of failing on them
            yield ast.literal_eval(line)

def getDF(path):
    """Load every record yielded by ``parse(path)`` into a pandas DataFrame.

    Row labels are the 0-based positions of the records; the columns are the
    union of the record keys (a key missing from a record becomes NaN).
    """
    # {position: record} with orient='index' turns each record into one row.
    records_by_position = dict(enumerate(parse(path)))
    return pd.DataFrame.from_dict(records_by_position, orient='index')

df = getDF(path)

In [6]:
# (rows, columns) of the loaded reviews frame
len(df.index), len(df.columns)

(37126, 9)

In [7]:
# Preview the first five reviews.
# NOTE(review): this output shows reviewer IDs and names; clear it before sharing.
df.head(n=5)

Unnamed: 0,reviewerID,asin,reviewerName,helpful,reviewText,overall,summary,unixReviewTime,reviewTime
0,A11N155CW1UV02,B000H00VBQ,AdrianaM,"[0, 0]",I had big expectations because I love English ...,2.0,A little bit boring for me,1399075200,"05 3, 2014"
1,A3BC8O2KCL29V2,B000H00VBQ,Carol T,"[0, 0]",I highly recommend this series. It is a must f...,5.0,Excellent Grown Up TV,1346630400,"09 3, 2012"
2,A60D5HQFOTSOM,B000H00VBQ,"Daniel Cooper ""dancoopermedia""","[0, 1]",This one is a real snoozer. Don't believe anyt...,1.0,Way too boring for me,1381881600,"10 16, 2013"
3,A1RJPIGRSNX4PW,B000H00VBQ,"J. Kaplan ""JJ""","[0, 0]",Mysteries are interesting. The tension betwee...,4.0,Robson Green is mesmerizing,1383091200,"10 30, 2013"
4,A16XRPF40679KG,B000H00VBQ,Michael Dobey,"[1, 1]","This show always is excellent, as far as briti...",5.0,Robson green and great writing,1234310400,"02 11, 2009"


In [8]:
# Column dtypes and non-null counts
df.info(verbose=None)

<class 'pandas.core.frame.DataFrame'>
Int64Index: 37126 entries, 0 to 37125
Data columns (total 9 columns):
reviewerID        37126 non-null object
asin              37126 non-null object
reviewerName      36797 non-null object
helpful           37126 non-null object
reviewText        37126 non-null object
overall           37126 non-null float64
summary           37126 non-null object
unixReviewTime    37126 non-null int64
reviewTime        37126 non-null object
dtypes: float64(1), int64(1), object(7)
memory usage: 2.8+ MB


In [28]:
from contractions import CONTRACTION_MAP

import re
def expand_contractions(text, contraction_mapping=None):
    """Expand English contractions in ``text`` and drop leftover apostrophes.

    Parameters
    ----------
    text : str
        Input sentence or document.
    contraction_mapping : dict[str, str], optional
        Maps a contraction (e.g. ``"can't"``) to its expansion (``"cannot"``).
        When omitted, ``CONTRACTION_MAP`` from the ``contractions`` module is used.

    Returns
    -------
    str
        ``text`` with every whole-word contraction replaced. Matching is
        case-insensitive, and the case of the matched first letter is carried
        over to the expansion. All remaining ``'`` characters are then removed,
        which also turns possessives such as ``"John's"`` into ``"Johns"``.
    """
    if contraction_mapping is None:
        contraction_mapping = CONTRACTION_MAP

    expanded_text = text
    # Regex alternation is leftmost-first, so list longer keys first; otherwise
    # "can't" would win over "can't've" and leave "'ve" behind. Empty keys are
    # skipped because they would match everywhere.
    keys = sorted(filter(None, contraction_mapping), key=len, reverse=True)
    if keys:
        # Keys are literal text, so escape them. The look-arounds (instead of
        # \b) restrict matches to whole words while still allowing keys that
        # start with an apostrophe, such as "'cause".
        contractions_pattern = re.compile(
            r"(?<!\w)(?:{})(?!\w)".format('|'.join(map(re.escape, keys))),
            flags=re.IGNORECASE)
        # Fallback for case-insensitive hits whose exact spelling is not a key
        # (e.g. "i'd" when only "I'd" is mapped).
        lowercase_mapping = {key.lower(): value
                             for key, value in contraction_mapping.items()}

        def expand_match(contraction):
            match = contraction.group(0)
            expanded = contraction_mapping.get(match)
            if expanded is None:
                expanded = lowercase_mapping.get(match.lower(), match)
            if not expanded:
                return expanded
            # Transfer only the *case* of the first letter. Replacing the
            # character itself corrupts expansions that start with a different
            # letter ("ain't" -> "as not", "'cause" -> "'ecause").
            first_char = match[0]
            if first_char.isupper():
                return expanded[0].upper() + expanded[1:]
            if first_char.islower():
                return expanded[0].lower() + expanded[1:]
            return expanded

        expanded_text = contractions_pattern.sub(expand_match, text)
    return expanded_text.replace("'", "")

# Sanity check of the contraction expander on a sample sentence
expand_contractions(text="Y'all can't expand contractions I'd think")

'You all cannot expand contractions I would think'

In [23]:
# Create a Spark DataFrame from the pandas reviews frame.
# The explicit schema documents the expected column types and spares Spark the
# per-row type inference. Types mirror the pandas dtypes shown by df.info().
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import (ArrayType, DoubleType, FloatType, IntegerType,
                               LongType, StringType)

mySchema = StructType([
    StructField("reviewerID", StringType(), True),
    StructField("asin", StringType(), True),
    # reviewerName is intentionally excluded (the only column with nulls in df.info()).
    StructField("helpful", ArrayType(LongType()), True),   # list such as [0, 1]
    StructField("reviewText", StringType(), True),
    StructField("overall", DoubleType(), True),            # pandas float64
    StructField("summary", StringType(), True),
    StructField("unixReviewTime", LongType(), True),       # pandas int64
    StructField("reviewTime", StringType(), True),         # text such as "05 3, 2014"
])
# Select the pandas columns in schema order: the schema is applied by position.
# This also drops reviewerName.
sms = spark.createDataFrame(df[mySchema.fieldNames()], schema=mySchema)

In [33]:
from pyspark.sql.functions import trim

# Replace punctuation and digits with spaces so that neighbouring words stay separate.
wrangled = sms.withColumn('reviewText', regexp_replace(sms.reviewText, '[_():;,.!?\\-"/&#\'*]', ' '))
wrangled = wrangled.withColumn('reviewText', regexp_replace(wrangled.reviewText, r'\d', ' '))

# Collapse every whitespace run (spaces, tabs, newlines) to one space and strip
# the ends. Tokenizer splits on single whitespace characters, so leading spaces
# or doubled newlines would otherwise produce empty "" tokens.
wrangled = wrangled.withColumn('reviewText', trim(regexp_replace(wrangled.reviewText, r'\s+', ' ')))

# Remove Contraction (disabled).
# NOTE: if re-enabled, this must run *before* the punctuation step above, which
# already strips the apostrophes that expand_contractions matches on.
# from pyspark.sql.functions import explode
# from pyspark.sql.functions import udf, array
# expand_contractions_udf = udf(lambda z: expand_contractions(z), StringType())
# wrangled = wrangled.withColumn('reviewText', expand_contractions_udf(wrangled.reviewText))

# Split the text into lower-cased words
wrangled = Tokenizer(inputCol='reviewText', outputCol='words').transform(wrangled)

wrangled.show(4, truncate=False)

+--------------+----------+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+---------------------------+--------------+-----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|reviewerID    |asin      |helpful|reviewText                                                                                                                                                                           |overall|summary                    |unixReviewTime|reviewTime |words                                                                                                                                                                                                                |
+---------

In [12]:
from pyspark.ml.feature import StopWordsRemover, HashingTF, IDF

# Remove stop words (StopWordsRemover's default list) from the tokens.
# Any existing 'terms' column is dropped first, because Spark ML transformers
# refuse to write an output column that already exists. Without the drop,
# re-running this cell fails. drop() is a no-op when the column is absent.
wrangled = (StopWordsRemover(inputCol='words', outputCol='terms')
            .transform(wrangled.drop('terms')))

# # Apply the hashing trick
# wrangled = HashingTF(inputCol='terms', outputCol='hash', numFeatures=1024)\
#       .transform(wrangled)

# # Convert hashed symbols to TF-IDF
# tf_idf = IDF(inputCol='hash', outputCol='features')\
#       .fit(wrangled).transform(wrangled)

# tf_idf.select('terms', 'features').show(4, truncate=False)

In [13]:
from pyspark.sql.functions import explode

# Distinct vocabulary: one row per unique token (explode names its output "col").
(wrangled
    .select(explode("words"))
    .distinct()
    .show(n=1111))

+-------------------+
|                col|
+-------------------+
|              still|
|               hope|
|             biting|
|               some|
|          involving|
|              trail|
|                few|
|              those|
|               earl|
|           everyday|
|          standards|
|        requirement|
|          connected|
|             harder|
|              salma|
|            embrace|
|             outfit|
|        interaction|
|           tortured|
|             travel|
|          traveling|
|              ginzu|
|           brackets|
|            finales|
|         creativity|
|              oscar|
|            letdown|
|             poetry|
|             spared|
|              spoil|
|          sidelined|
|             doubts|
|        handicapped|
|              inner|
|                art|
|         lieutenant|
|          recognize|
|            jeebies|
|       terrifically|
|          viewpoint|
|             deftly|
|             online|
|         

In [36]:
import torch