In [1]:
# Load the three review tables and stack them into a single DataFrame.
df1 = spark.sql("select * from default.video_games_5")
df2 = spark.sql("select * from default.home_and_kitchen_5_small")
df3 = spark.sql("select * from default.books_5_small")

# unionByName matches columns by name. Plain `union` matches them by position and
# would silently misalign data if any table stored its columns in a different order.
df = df1.unionByName(df2).unionByName(df3)

# Set to a fraction in (0, 1] (e.g. 0.005) to prototype on a random sample;
# None keeps every row.
SAMPLE_FRACTION = None
if SAMPLE_FRACTION is not None:
    df = df.sample(withReplacement=False, fraction=SAMPLE_FRACTION, seed=47)

# Cache so that repeated actions on this frame (and frames derived from it)
# do not re-read the source tables.
df = df.cache()

print((df.count(), len(df.columns)))

In [2]:
# Show the column names and types of the combined review DataFrame
# (before any columns are dropped in the next cell).
df.printSchema()

In [3]:
# For our initial modeling efforts, we are not going to use the following features.
drop_list = ['summary', 'asin', 'reviewID', 'reviewerID', 'unixReviewTime', 'reviewTime', 'image', 'style', 'reviewerName']
# DataFrame.drop is a no-op for names that are not in the schema, so listing a
# column that some table lacks is harmless.
df = df.drop(*drop_list)
# The model needs the review text, and training needs a label: drop rows missing either.
df = df.na.drop(subset=["reviewText", "label"])
df.show(5)
print((df.count(), len(df.columns)))

In [4]:
# In Spark's MLlib, it's considered good practice to combine all the preprocessing steps into a pipeline.
# That way, you can run the same steps on the training data, the testing data, and beyond (new data)
# without copying and pasting any code.

# It is possible to run all of these steps one-by-one, outside of a Pipeline, if desired. But that's
# not how I am going to do it here.

from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, VectorAssembler, IDF
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, NaiveBayes
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# We'll tokenize the text using a simple RegexTokenizer that splits on non-word characters.
# Empty tokens (from consecutive separators) are discarded by the default minTokenLength=1.
tokenizer = RegexTokenizer(inputCol="reviewText", outputCol="words", pattern="\\W")

# Remove standard (default English) stopwords.
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered")

# TODO: insert other cleaning steps here (and put them into the pipeline, of course!)
# E.g., n-grams? document length?


# Vectorize the reviews using a simple bag-of-words (BOW) method. Other methods are possible:
# https://spark.apache.org/docs/2.2.0/ml-features.html#feature-extractors
# vocabSize=2000: keep only the 2000 most frequent terms.
# maxDF=0.40: ignore terms that appear in more than 40% of the reviews.
tf = CountVectorizer(inputCol="filtered", outputCol="rawFeatures", vocabSize=2000, minTF=1, maxDF=0.40)

# Inverse document frequency weighting; terms seen in fewer than 100 training
# documents get an IDF of 0 (i.e. are effectively ignored).
idf = IDF(inputCol="rawFeatures", outputCol="idfFeatures", minDocFreq=100)

# Combine the metadata columns and the text features into one final "features" column.
assembler = VectorAssembler(inputCols=["verified", "overall", "idfFeatures"], outputCol="features")

# Machine learning algorithm: logistic regression with a pure L2 penalty (elasticNetParam=0.0).
ml_alg  = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.0)

pipeline = Pipeline(stages=[tokenizer, stopwordsRemover, tf, idf, assembler, ml_alg])

# Hyper-parameter grid for model selection. This cell only *builds* the grid:
# fitting `pipeline` directly trains the single configuration above. The grid
# takes effect once the pipeline is wrapped, e.g.
#   CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=..., numFolds=3)
# minDF (minimum number of reviews a term must occur in) is tuned instead of
# minTF: minTF is a per-review count threshold, so values like 100 or 1000 would
# likely discard almost every term of a review.
paramGrid = (
    ParamGridBuilder()
    .addGrid(ml_alg.regParam, [0.3, 0.5, 0.7])
    .addGrid(ml_alg.elasticNetParam, [0.0])
    .addGrid(tf.minDF, [1, 100, 1000])
    .addGrid(tf.vocabSize, [500, 1000, 2500, 5000])
    .build()
)


In [5]:
# Hold out 10% of the reviews for testing; the fixed seed makes the split reproducible.
trainingData, testData = df.randomSplit([0.9, 0.1], seed=47)
for split_label, split_frame in (("Training Dataset Count: ", trainingData),
                                 ("Test Dataset Count:     ", testData)):
    print(f"{split_label}{split_frame.count()}")

In [6]:
# Fit every pipeline stage (tokenizer, stopword removal, vocabulary, IDF weights,
# assembler, logistic regression) on the training split only.
pipelineFit = pipeline.fit(trainingData)

In [7]:
# Score the held-out reviews with the fitted pipeline, then show how many
# reviews land in each predicted class.
predictions = pipelineFit.transform(testData)
prediction_counts = predictions.groupBy("prediction").count()
prediction_counts.show()

In [8]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Held-out metrics. The multiclass evaluators compare the hard `prediction` column
# with `label`; the binary evaluators rank by the model's `rawPrediction` scores.
# Column names are spelled out on every evaluator (they equal Spark's defaults).
acc_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
pre_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
rec_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")
pr_evaluator  = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderPR")
auc_evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

# Only areaUnderROC is reported here. `predictions` is not cached, so each extra
# evaluate() call re-runs the fitted pipeline over testData. Call e.g.
# acc_evaluator.evaluate(predictions) on demand for the other metrics.
print("Test areaUnderROC   = %g" % (auc_evaluator.evaluate(predictions)))

In [9]:
# Score the Kaggle test reviews with the fitted pipeline.
test_df = spark.sql("select * from default.reviews_kaggle")
# Training rows with a null reviewText were dropped, but every Kaggle row needs a
# prediction, so treat a missing review as empty text instead: RegexTokenizer
# would fail on a null string.
# NOTE(review): VectorAssembler also raises on null 'verified'/'overall' by default
# (handleInvalid="error"); confirm the Kaggle table has no such nulls.
kaggle_pred = pipelineFit.transform(test_df.na.fill({"reviewText": ""}))
kaggle_pred.show(5)
kaggle_pred.groupBy("prediction").count().show()

In [10]:
# Download this and submit to Kaggle!
# NOTE(review): `prediction` is a double (rendered as 0.0 / 1.0); cast it if the
# competition expects integer labels.
submission = kaggle_pred.select("reviewID", "prediction")
display(submission)

reviewID,prediction
67000000,0.0
67000001,0.0
67000002,0.0
67000003,0.0
67000004,0.0
67000005,0.0
67000006,0.0
67000007,0.0
67000008,0.0
67000009,0.0
