In [42]:
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.sql.functions import isnan, when, count, col, concat, lit

In [2]:
# Defining our constants
DATA_NAME = 'reviews_Musical_Instruments_5.json'
APP_NAME = "Sentiment Analysis with Amazon Reviews Exercise"
SPARK_URL = "local[*]"
RANDOM_SEED = 1989
TRAINING_DATA_RATIO = 0.8
RFC_TREES = 10
TFIDF_FEAT = 1000

In [3]:
# Instantiate Spark Context
sc = SparkSession.builder.appName(APP_NAME).master(SPARK_URL).getOrCreate()

# SQL context for the Spark session
sqlcontext = SQLContext(sc)

In [4]:
# Read in the data
df = sqlcontext.read.json(DATA_NAME)

Previewing the data...

In [5]:
print(df.dtypes)
df.show(5)

[('asin', 'string'), ('helpful', 'array<bigint>'), ('overall', 'double'), ('reviewText', 'string'), ('reviewTime', 'string'), ('reviewerID', 'string'), ('reviewerName', 'string'), ('summary', 'string'), ('unixReviewTime', 'bigint')]
+----------+--------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+
|      asin| helpful|overall|          reviewText| reviewTime|    reviewerID|        reviewerName|             summary|unixReviewTime|
+----------+--------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+
|1384719342|  [0, 0]|    5.0|Not much to write...|02 28, 2014|A2IBPI20UZIR0U|cassandra tu "Yea...|                good|    1393545600|
|1384719342|[13, 14]|    5.0|The product does ...|03 16, 2013|A14VAT5EAX3D9S|                Jake|                Jake|    1363392000|
|1384719342|  [1, 1]|    5.0|The primary job o...|08 28, 2013|A195EZSQDW3E21|Rick Bennette "

In [6]:
df.printSchema()

root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)



Let's combine the summary and overall columns to generate one text column that we can perform NLP and subsequent modeling on.

In [7]:
fullText = concat(col("reviewText"), lit(" "), col("summary"))
df = df.withColumn("fullText",fullText)

In [8]:
df.columns

['asin',
 'helpful',
 'overall',
 'reviewText',
 'reviewTime',
 'reviewerID',
 'reviewerName',
 'summary',
 'unixReviewTime',
 'fullText']

Additionally, we should define what a "good" and "bad" review are. The star ratings are contained within the 'overall' column on a scale of 0 to 5.

In [9]:
df.groupBy('overall').count().show()

+-------+-----+
|overall|count|
+-------+-----+
|    1.0|  217|
|    4.0| 2084|
|    3.0|  772|
|    2.0|  250|
|    5.0| 6938|
+-------+-----+



In [38]:
df.groupBy('overall').count().collect()

[Row(overall=1.0, count=217),
 Row(overall=4.0, count=2084),
 Row(overall=3.0, count=772),
 Row(overall=2.0, count=250),
 Row(overall=5.0, count=6938)]

In [40]:
df.groupBy('overall').count().collect()[4][1]/df.count()

0.6761524217912485

Looks like we have some pretty severe class imbalance, with high reviews being much more common. Let's just use whether a review is 5 (about 2/3 of observations) vs. whether it's not as a measure of "good" or "bad." This is imperfect, but will make it a more straightforward problem to model.

We'll need to first create a new column that captures whether each review is good or bad based on this criteria.

In [10]:
df.withColumn('good', when(col('overall') == '5', 1).otherwise(0)).select('good','overall').show(10)

+----+-------+
|good|overall|
+----+-------+
|   1|    5.0|
|   1|    5.0|
|   1|    5.0|
|   1|    5.0|
|   1|    5.0|
|   1|    5.0|
|   1|    5.0|
|   0|    3.0|
|   1|    5.0|
|   1|    5.0|
+----+-------+
only showing top 10 rows



In [11]:
df = df.withColumn('good', when(col('overall') == '5', 1).otherwise(0))

In [12]:
df.columns

['asin',
 'helpful',
 'overall',
 'reviewText',
 'reviewTime',
 'reviewerID',
 'reviewerName',
 'summary',
 'unixReviewTime',
 'fullText',
 'good']

Let's vectorize our data using TFIDF. A word2vec solution might be preferable if we could import a fully pretrained model, but I suspect this method would be suboptimal if trained solely on our data. So let's stick with the simpler solution for now, TFIDF.

There's a useful example of this in the [Spark documentation](https://spark.apache.org/docs/2.2.0/ml-features.html#tf-idf)

NOTE: I'm vectorizing the entire dataset together, instead of separating out train and test. This is technically not the right way to do it, but I can't figure out how to do the TFIDF train and then transform on the test set. To discuss w/ Vincent...

In [13]:
# This part just provides a new column with an array for each word in the sentence of each
tokenizer = Tokenizer(inputCol="fullText", outputCol="words")
# This creates a new DF with an added column for the words separated out
wordsData = tokenizer.transform(df)

wordsData.groupby('words').mean().collect()[0:2]

[Row(words=['i', 'used', 'this', 'on', 'my', 'guitar', 'tone', 'and', 'volume', 'pots', 'and', 'it', 'clears', 'up', 'the', 'crackle.feels', 'a', 'little', 'oily,', 'but', 'gets', 'the', 'job', 'done', 'and', 'you', "don't", 'have', 'to', 'replacethe', 'potentiometers.', 'works', 'on', 'potentiometers'], avg(overall)=5.0, avg(unixReviewTime)=1306454400.0, avg(good)=1.0),
 Row(words=['quick', 'snap', 'on', 'and', 'off.', 'sturdy.', '', 'if', 'it', 'is', 'not', 'convenient,', 'it', "won't", 'be', 'used.', '', 'in', 'this', 'case,', 'it', 'is', 'very', 'convenient', 'and', 'easy', 'to', 'use,', 'hence,', 'your', 'prized', 'guitar', 'will', 'not', 'drop', 'to', 'the', 'floor', 'by', 'accident.', '', 'well', 'worth', 'the', 'money', 'spent!', "don't", 'let', 'your', 'guitar', 'fall'], avg(overall)=5.0, avg(unixReviewTime)=1358726400.0, avg(good)=1.0)]

In [14]:
# From the documentation:
# While applying HashingTF only needs a single pass to the data, applying IDF needs two passes:
# First to compute the IDF vector and second to scale the term frequencies by IDF.

# First pass, which I think is just the "frequency" part...
# I'm limiting the numfeatures just to keep things running (I was getting errors later on)
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures",numFeatures=TFIDF_FEAT)
# This then transforms the previous dataframe into a new one with the rawFeatures column
featurizedData = hashingTF.transform(wordsData)

featurizedData.groupby('rawFeatures').mean().collect()[0:2]

[Row(rawFeatures=SparseVector(1000, {36: 1.0, 48: 2.0, 83: 1.0, 90: 1.0, 157: 2.0, 237: 1.0, 281: 1.0, 299: 1.0, 329: 1.0, 330: 1.0, 333: 3.0, 338: 1.0, 365: 1.0, 373: 1.0, 388: 1.0, 402: 1.0, 403: 1.0, 420: 1.0, 450: 1.0, 461: 1.0, 489: 1.0, 493: 1.0, 527: 1.0, 646: 1.0, 656: 1.0, 710: 3.0, 725: 2.0, 765: 1.0, 770: 1.0, 839: 1.0, 860: 1.0, 906: 3.0, 916: 1.0, 956: 1.0}), avg(overall)=5.0, avg(unixReviewTime)=1373846400.0, avg(good)=1.0),
 Row(rawFeatures=SparseVector(1000, {13: 1.0, 44: 1.0, 55: 1.0, 82: 1.0, 84: 2.0, 133: 1.0, 170: 1.0, 187: 2.0, 197: 1.0, 210: 1.0, 248: 1.0, 253: 1.0, 260: 1.0, 276: 1.0, 281: 2.0, 333: 1.0, 343: 1.0, 368: 1.0, 372: 1.0, 373: 2.0, 388: 2.0, 425: 1.0, 471: 1.0, 495: 3.0, 497: 1.0, 504: 1.0, 527: 1.0, 533: 1.0, 543: 1.0, 594: 1.0, 600: 1.0, 650: 2.0, 665: 1.0, 692: 2.0, 703: 1.0, 710: 2.0, 744: 1.0, 763: 1.0, 823: 1.0, 904: 1.0, 942: 1.0, 959: 1.0, 964: 1.0, 986: 1.0}), avg(overall)=5.0, avg(unixReviewTime)=1380758400.0, avg(good)=1.0)]

In [15]:
# Second pass, which does the inverse document frequency calcs (idf)...
idf = IDF(inputCol="rawFeatures", outputCol="features")
# For some reason this needs to be fit, unlike the hashing mechanism
idfModel = idf.fit(featurizedData)
# Then, again, just creating a new df that's same as the previous one + the added tfidf column
# "Rescaled" in this case just means rescaled down from just bag of words (document frequency)
rescaledData = idfModel.transform(featurizedData)

rescaledData.groupby('features').mean().collect()[0:2]

[Row(features=SparseVector(1000, {11: 11.6673, 18: 1.0032, 28: 3.8113, 36: 0.8437, 76: 1.618, 82: 1.5393, 83: 1.5712, 95: 3.8891, 157: 1.3337, 170: 1.5813, 183: 2.4427, 187: 1.3118, 189: 3.0645, 193: 2.7334, 213: 1.6378, 223: 2.235, 234: 1.409, 237: 2.3067, 248: 2.5064, 260: 1.068, 263: 1.5051, 281: 3.7149, 284: 2.5731, 299: 1.7631, 309: 3.3013, 310: 1.1198, 313: 2.5503, 320: 4.6271, 328: 2.9373, 329: 1.0906, 333: 1.2335, 335: 2.6201, 341: 3.3811, 343: 1.2142, 346: 1.6165, 347: 3.6014, 348: 2.8715, 352: 2.3136, 364: 3.1771, 368: 2.2618, 372: 3.605, 373: 1.5462, 375: 2.3967, 384: 7.7405, 388: 1.1818, 400: 1.5662, 421: 2.1053, 431: 2.8871, 447: 3.0357, 448: 4.4404, 471: 2.0129, 489: 1.366, 493: 2.4173, 494: 2.7156, 495: 0.9216, 521: 3.6679, 524: 3.7119, 526: 1.5432, 541: 3.6794, 547: 2.4735, 553: 2.7215, 564: 2.235, 572: 1.1535, 590: 2.5478, 592: 3.8939, 597: 1.9337, 637: 8.9878, 650: 0.7542, 656: 1.2022, 666: 3.8327, 675: 2.0006, 690: 4.5765, 691: 2.2645, 710: 2.7993, 718: 2.9483, 723: 

In [16]:
rescaledData.columns

['asin',
 'helpful',
 'overall',
 'reviewText',
 'reviewTime',
 'reviewerID',
 'reviewerName',
 'summary',
 'unixReviewTime',
 'fullText',
 'good',
 'words',
 'rawFeatures',
 'features']

Sweet. The features column in our rescaled dataset is what we'll train our model on. Let's start with a simple RFC.

In [17]:
# Label indexing
labelIndexer = StringIndexer(inputCol='good', outputCol="indexedLabel").fit(rescaledData)

In [18]:
# now generate the indexed feature vector
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures",maxCategories=4).fit(rescaledData)

# Split the data into training and validation sets (20% held out for testing)
(trainingData, testData) = rescaledData.randomSplit([TRAINING_DATA_RATIO, 1 - TRAINING_DATA_RATIO])

In [19]:
# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="features", numTrees=RFC_TREES)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf])

In [20]:
# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

Cool! Now let's try to predict on the test set and evaluate how well it performs. The first step in that operation is 

In [21]:
# predict
predictions = model.transform(testData)

In [22]:
# eval
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print(f"Test Error = {(1.0 - accuracy):g}")
print(f"Accuracy = {accuracy:g}")

Test Error = 0.317933
Accuracy = 0.682067


The performance is pretty bad, considering that that's close to baseline accuracy, but I guess that's beside the point! Let's try logit regression just to experiment with how that works.

__NOTE: LRC doesn't seem to need to require labeled indexes for the inputs__

In [69]:
lrc = LogisticRegression(labelCol="good", featuresCol="features",maxIter=10, regParam=0.5, elasticNetParam=0.8)
lrcmodel = lrc.fit(trainingData)

In [70]:
# predict - note that this creates a new column called predictions in the test dataset
predictions = lrcmodel.transform(testData)

In [71]:
predictions.columns

['asin',
 'helpful',
 'overall',
 'reviewText',
 'reviewTime',
 'reviewerID',
 'reviewerName',
 'summary',
 'unixReviewTime',
 'fullText',
 'good',
 'words',
 'rawFeatures',
 'features',
 'rawPrediction',
 'probability',
 'prediction']

In [72]:
# eval
evaluator = MulticlassClassificationEvaluator(
    labelCol="good", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print(f"Test Error = {(1.0 - accuracy):g}")
print(f"Accuracy = {accuracy:g}")

Test Error = 0.320417
Accuracy = 0.679583


Not very good either! Oh well. Keep in mind that we're only using 1000 words, and didn't do any LSA, so it's to be expected that this didn't perform all that great.

### Work with Vincent on this: how would I get the model to just TFIDF vectorize on the training set, and then just apply that vectorization to the test set? Pyspark does not seem to have the same distinctions between transform and fit_transform that we rely on in sklearn

The start of what I would do...

In [23]:
# TFIDF for just the 
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures",numFeatures=20)
trainFeaturizedData = hashingTF.transform(trainWordsData)
trainFeaturizedData.groupby('rawFeatures').mean().collect()[0:2]
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(trainFeaturizedData)
trainRescaledData = idfModel.transform(trainFeaturizedData)
testWordsData = tokenizer.transform(testData)

# How do I do this part?
testFeaturizedData = hashingTF.transform(testWordsData)
testFeaturizedData.groupby('rawFeatures').mean().collect()[0:2]

NameError: name 'trainWordsData' is not defined