In [0]:
# Read the three review tables and stack them into one DataFrame.
# NOTE(review): union() pairs columns by position - assumes the three tables
# share the same column order; confirm.
queries = (
    "select * from default.video_games_5",
    "select * from default.books_5_small",
    "select * from default.home_and_kitchen_5_small",
)
df1, df2, df3 = (spark.sql(query) for query in queries)
df = df1.union(df2).union(df3)
n_rows, n_cols = df.count(), len(df.columns)
print((n_rows, n_cols))

In [0]:
df.printSchema()

In [0]:
# Let's look at some quick summary statistics
df.describe().show()

In [0]:
# The count of each overall rating

from pyspark.sql.functions import col
df.groupBy("overall").count().orderBy(col("overall").asc()).show()

In [0]:
# The most common product IDs
df.groupBy("asin").count().orderBy(col("count").desc()).show(10)

In [0]:
# Count the null values in every column and bring the result to pandas
from pyspark.sql.functions import isnull, when, count, col
null_count_exprs = []
for column_name in df.columns:
    null_count_exprs.append(count(when(isnull(column_name), column_name)).alias(column_name))
nacounts = df.select(null_count_exprs).toPandas()
nacounts

Unnamed: 0,reviewID,overall,verified,reviewTime,reviewerID,asin,reviewerName,reviewText,summary,unixReviewTime,label
0,0,0,0,0,0,0,259,0,410,0,0


In [0]:
# Drop missing value in "reviewerName","summary"
df = df.na.drop(subset=["reviewerName","summary"])

In [0]:
# Check
nacounts = df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).toPandas()
nacounts

Unnamed: 0,reviewID,overall,verified,reviewTime,reviewerID,asin,reviewerName,reviewText,summary,unixReviewTime,label
0,0,0,0,0,0,0,0,0,0,0,0


In [0]:
# Import only the names that are needed instead of `import *`: the wildcard
# replaces Python built-ins such as sum/min/max/abs/round with Spark column
# functions for the rest of the notebook.
# `udf` is imported here because later cells call it without importing it
# themselves.
from pyspark.sql.functions import to_timestamp, udf

# Derive a calendar date column from `unixReviewTime`.
df = df.withColumn('ReviewDate', to_timestamp('unixReviewTime').cast('date'))

df.show()

In [0]:
from pyspark.sql.functions import year
from pyspark.sql.functions import to_date

df = df.withColumn("year",year(df.ReviewDate))
#df = df.withColumn("month",month(df.ReviewDate))

In [0]:
df.show()

In [0]:
from pyspark.sql.functions import col

In [0]:
major_df = df.filter(col("label") == 0)
minor_df = df.filter(col("label") == 1)
ratio = int(major_df.count()/minor_df.count())
print("ratio: {}".format(ratio))

In [0]:
sampled_majority_df = major_df.sample(False, 1/ratio)
combined_df = sampled_majority_df.unionAll(minor_df)
combined_df.show()

In [0]:
display(combined_df.groupby(["label"]).count().orderBy("label"))

label,count
0,714025
1,626071


In [0]:
combined_df.show()

In [0]:
# For our intitial modeling efforts, we are not going to use the following features
drop_list = [  'reviewID', 'reviewerID', 'reviewTime', 'image', 'style', 'reviewerName', 'ReviewDate']
combined_df = combined_df.select([column for column in combined_df.columns if column not in drop_list])
combined_df.show(5)
print((combined_df.count(), len(combined_df.columns)))

In [0]:
combined_df = combined_df.na.drop(subset=["reviewText", "label"])
combined_df.show(5)
print((combined_df.count(), len(combined_df.columns)))

In [0]:
# Schema of the data after dropping unused columns and null rows
combined_df.printSchema()

In [0]:
# Number of rows per label
combined_df.groupBy("label").count().show()

In [0]:
# Peek at the first five rows
combined_df.show(5)

In [0]:
import nltk
# Download the VADER lexicon resource used by the sentiment analyzer below
nltk.download('vader_lexicon')

In [0]:
from nltk.sentiment.vader import SentimentIntensityAnalyzer
# One shared analyzer instance; check_sentiment() below reads it as a global
sent = SentimentIntensityAnalyzer()

In [0]:
# Sanity check: a positive sentence
sent.polarity_scores("I am loving it.")

In [0]:
# Sanity check: a negative sentence
sent.polarity_scores("The food was horrible.")

In [0]:
# Sanity check: a sentence with mixed sentiment
sent.polarity_scores("The food was horrible overall but Pizza was amazing.")

In [0]:
def check_sentiment(x):
    """Return the 'compound' entry of the VADER polarity scores for text ``x``."""
    scores = sent.polarity_scores(x)
    return scores['compound']

In [0]:
# Declare the return type explicitly: without it udf() defaults to StringType,
# which makes the score column a string instead of a number.
# The function object is passed directly (rather than a lambda that looks the
# name up later) so this UDF stays tied to the VADER scorer even though
# `check_sentiment` is redefined further down the notebook.
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
checksentimentudf = udf(check_sentiment, DoubleType())

In [0]:
from pyspark.sql.functions import col
combined_df = combined_df.withColumn('Compound_score', checksentimentudf(col('reviewText')))

In [0]:
combined_df.show()

In [0]:
display(combined_df.groupBy("Compound_score").count().orderBy(col("count").desc()).head(50))

Compound_score,count
0.0,52289
0.6249,25918
0.4404,20932
0.5719,14927
0.6369,14647
0.4215,7897
0.3612,6562
0.6588,6225
0.5994,5554
0.2732,5293


In [0]:
from textblob import TextBlob

In [0]:
# Example blob for a positive sentence
tb = TextBlob("I am loving this.")

In [0]:
# Full sentiment result for the example blob
tb.sentiment

In [0]:
# Just the polarity component
tb.polarity

In [0]:
# Sanity check: a negative sentence
TextBlob("The food was horrible.").sentiment

In [0]:
# Sanity check: a sentence with mixed sentiment
TextBlob("The food was horrible overall but Pizza was amazing.").sentiment

In [0]:
def check_sentiment(x):
    """Return TextBlob's polarity for the text ``x``."""
    blob = TextBlob(x)
    return blob.polarity

In [0]:
# Declare the return type explicitly: without it udf() defaults to StringType,
# which makes the polarity column a string instead of a number.
# The function object is passed directly so the UDF is bound to the
# `check_sentiment` that is defined at this point in the notebook.
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
checksentimentudf = udf(check_sentiment, DoubleType())

In [0]:
from pyspark.sql.functions import col
combined_df = combined_df.withColumn('polarity_score', checksentimentudf(col('reviewText')))

In [0]:
# The most common product IDs
display(combined_df.groupBy("polarity_score").count().orderBy(col("count").desc()).head(50))

polarity_score,count
0.0,51442
0.5,29203
1.0,27545
0.8,22262
0.7,16847
0.25,11207
0.6,10915
0.2,9738
0.3,8748
0.4,8280


In [0]:
from pyspark.sql.types import *

In [0]:
# Cast `verified` to double
combined_df = combined_df.withColumn("verified",combined_df.verified.cast(DoubleType()))

In [0]:
# Cast `label` to double
combined_df = combined_df.withColumn("label",combined_df.label.cast(DoubleType()))

In [0]:
# NOTE(review): disabled. As written this would overwrite polarity_score with
# the `label` column cast to double, not cast polarity_score itself.
#combined_df = combined_df.withColumn("polarity_score",combined_df.label.cast(DoubleType()))

In [0]:
# NOTE(review): disabled. As written this would overwrite Compound_score with
# the `label` column cast to double, not cast Compound_score itself.
#combined_df = combined_df.withColumn("Compound_score",combined_df.label.cast(DoubleType()))

In [0]:
from pyspark.ml.feature import OneHotEncoder

In [0]:
# One-hot encode the `verified` column into `verified_vec`
onehotencoder_verified_vector = OneHotEncoder(inputCol="verified", outputCol="verified_vec")
combined_df_new = onehotencoder_verified_vector.fit(combined_df).transform(combined_df)


In [0]:
combined_df_new.show()

In [0]:
# Drop the encoded column again.
# NOTE(review): the sampling cell that follows reads `combined_df`, not
# `combined_df_new`, so the result of these cells is not used downstream.
drop_list = ['verified_vec']
combined_df_new = combined_df_new.select([column for column in combined_df_new.columns if column not in drop_list])
combined_df_new.show(5)

In [0]:
df_sample = combined_df.sample(False, 0.30, seed =0)

In [0]:
# set seed for reproducibility
(trainingData, testingData) = df_sample.randomSplit([0.8, 0.2], seed = 47)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testingData.count()))

In [0]:
# In Spark's MLlib it is considered good practice to combine all the preprocessing steps into a pipeline.
# That way the same steps can be run on the training data, the testing data and any new data
# without copying and pasting code.

# The steps could also be run one by one, outside of a Pipeline, but that is not done here.

from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer

# Disabled label encoder. An earlier cell casts `label` to double, so the
# label is already numeric.
# NOTE(review): the pipeline cell below still lists `label_encoder`, which is
# therefore undefined.
#label_encoder = StringIndexer(inputCol='label', outputCol='labelEncoded')

# Tokenize the review text by splitting on non-word characters
regexTokenizer = RegexTokenizer(inputCol="reviewText", outputCol="words", pattern="\\W")


# Remove standard stop words
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered")


# Vectorize the sentences using a simple bag-of-words method. Other methods are possible:
# https://spark.apache.org/docs/2.2.0/ml-features.html#feature-extractors
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)


# Term-frequency stage. It is defined here but not included in the pipeline below.
# NOTE(review): its input is "features", the CountVectorizer output column -
# HashingTF presumably expects the token column ("filtered"); confirm before using it.
hashingTF = HashingTF(inputCol="features", outputCol="rawFeatures")

# TF*IDF stage. It is defined here but not included in the pipeline below.
idf = IDF(inputCol="rawFeatures", outputCol="newfeatures")


#pipeline = Pipeline(stages=[label_encoder, regexTokenizer, stopwordsRemover, countVectors, hashingTF, idf])


In [0]:
# `label_encoder` only exists in a commented-out line, so listing it as a stage
# raised NameError on a fresh run. The `label` column is already cast to double
# earlier in the notebook, so no label-indexing stage is needed.
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors])

In [0]:
# Fit the preprocessing pipeline on the training rows only, then apply it to them.
pipelineFit = pipeline.fit(trainingData)
trainingDataTransformed = pipelineFit.transform(trainingData)
trainingDataTransformed.show(5)

In [0]:
#pipelineFit.save(f"file://dbfs/sourav/{now}_model")

In [0]:
from pyspark.ml.classification import LogisticRegression

# More classification docs: https://spark.apache.org/docs/latest/ml-classification-regression.html

# Hyperparameters of the logistic regression model
lr_params = dict(maxIter=10, regParam=0.3, elasticNetParam=0)
lr = LogisticRegression(**lr_params)
lrModel = lr.fit(trainingDataTransformed)

In [0]:
# Extract the summary from the returned LogisticRegressionModel instance trained
# in the earlier example
trainingSummary = lrModel.summary

print("Training Accuracy:  " + str(trainingSummary.accuracy))
print("Training Precision: " + str(trainingSummary.precisionByLabel))
print("Training Recall:    " + str(trainingSummary.recallByLabel))
print("Training FMeasure:  " + str(trainingSummary.fMeasureByLabel()))
print("Training AUC:       " + str(trainingSummary.areaUnderROC))

In [0]:
trainingSummary.roc.show()

In [0]:
# Obtain the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory
for objective in objectiveHistory:
    print(objective)

In [0]:
testingDataTransform = pipelineFit.transform(testingData)
testingDataTransform.show(5)

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

predictions = lrModel.transform(testingDataTransform)
predictions.show(5)

evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
print('Test Area Under ROC', evaluator.evaluate(predictions))

In [0]:
# Load the holdout reviews that are scored for the submission
test_df = spark.sql("select * from default.reviews_holdout")
test_df.show(5)
print((test_df.count(), len(test_df.columns)))

In [0]:
# Apply the pipeline fitted on the training data to the holdout rows.
# NOTE(review): the cleaning and sentiment steps applied to `df` are not applied
# here; the pipeline stages only read `reviewText`.
test_df_Transform = pipelineFit.transform(test_df)
test_df_Transform.show(5)

In [0]:
# Score the holdout rows; this reuses (overwrites) the `predictions` name from the test evaluation.
predictions = lrModel.transform(test_df_Transform)

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

probelement=udf(lambda v:float(v[1]),FloatType())
submission_data = predictions.select('reviewID', probelement('probability')).withColumnRenamed('<lambda>(probability)', 'label')

In [0]:
display(submission_data.select('reviewID', 'label'))

reviewID,label
67000000,0.46559766
67000001,0.46559766
67000002,0.46559766
67000003,0.46559766
67000004,0.46559766
67000005,0.46559766
67000006,0.46559766
67000007,0.46559766
67000008,0.46559766
67000009,0.46559766
