In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import *
#import wget
from pyspark.ml.feature import Bucketizer,RegexTokenizer,StopWordsRemover,CountVectorizer,IDF
from pyspark.sql.functions import *
from pyspark.ml.classification import LogisticRegression, RandomForestClassificationModel, RandomForestClassifier, GBTClassificationModel, GBTClassifier
from pyspark.ml import Pipeline,PipelineModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator


spark = SparkSession.builder.appName(
    "Opinion mining on Amazon Fashion product reviews"
).getOrCreate()

spark.sparkContext.setLogLevel("WARN")

In [None]:
# File location and type
file_location = "/FileStore/tables/All_Beauty_json.gz"
#file_type = "gz"


# The applied options are for CSV files. For other file types, these will be ignored.
Beauty = spark.read.json(file_location)

Beauty = Beauty.withColumn("text",concat(col("summary"), lit(" "),col("reviewText")))\
 .drop("helpful")\
 .drop("reviewerID")\
 .drop("reviewerName")\
 .drop("reviewTime")\
 .drop("style")


In [None]:
# File location and type
file_location = "/FileStore/tables/AMAZON_FASHION_json.gz"


# The applied options are for CSV files. For other file types, these will be ignored.
Fashion = spark.read.json(file_location)

Fashion.columns

Fashion = Fashion.withColumn("text",concat(col("summary"), lit(" "),col("reviewText")))\
 .drop("helpful")\
 .drop("reviewerID")\
 .drop("reviewerName")\
 .drop("reviewTime")\
 .drop("style")

In [None]:
Beauty.dtypes

Out[31]: [('asin', 'string'),
 ('image', 'array<string>'),
 ('overall', 'double'),
 ('reviewText', 'string'),
 ('summary', 'string'),
 ('unixReviewTime', 'bigint'),
 ('verified', 'boolean'),
 ('vote', 'string'),
 ('text', 'string')]

In [None]:
from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame
 
def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)
 
df_union  = unionAll(Beauty,Fashion)

In [None]:
display(df.columns)
display(df_union.columns)


In [None]:
# File location and type
file_location = "/FileStore/tables/reviews_Clothing_Shoes_and_Jewelry_5_json.gz"
file_type = "gz"

# CSV options
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
clsj = spark.read.json(file_location)

In [None]:
# File location and type
file_location = "/FileStore/tables/reviews_Beauty_5_json.gz"
file_type = "gz"

# CSV options
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
bt = spark.read.json(file_location)

In [None]:
df_union_2  = unionAll(bt,clsj)

In [None]:
df_union = df_union.drop("image")\
    .drop("verified")\
    .drop("vote")
df_union.columns

Out[76]: ['asin', 'overall', 'reviewText', 'summary', 'unixReviewTime', 'text']

In [None]:
df_union_2.printSchema()

root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)



In [None]:
df.printSchema()

root
 |-- asin: string (nullable = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)
 |-- text: string (nullable = true)



In [None]:
df_union_2 = df_union_2.withColumn("text",concat(col("summary"), lit(" "),col("reviewText")))\
 .drop("helpful")\
 .drop("reviewerID")\
 .drop("reviewerName")\
 .drop("reviewTime")
df_union_2.count()

Out[93]: 477179

In [None]:
df_union_2.describe("overall").show()

+-------+------------------+
|summary|           overall|
+-------+------------------+
|  count|            477179|
|   mean|4.2223610007984425|
| stddev|1.1306299814841152|
|    min|               1.0|
|    max|               5.0|
+-------+------------------+



In [None]:
#Bucketize data and create labels 0 if overall rating is in (1.0,2.0), otherwise 1
df1 = df_union_2.filter("overall !=3")

splits = [-float("inf"), 4.0, float("inf")]

bucketizer = Bucketizer(splits=splits, inputCol="overall", outputCol="label")

df2= bucketizer.transform(df1)

df2.groupBy("overall","label").count().show()

+-------+-----+------+
|overall|label| count|
+-------+-----+------+
|    2.0|  0.0| 26919|
|    5.0|  1.0|277771|
|    1.0|  0.0| 21718|
|    4.0|  1.0| 98098|
+-------+-----+------+



In [None]:
#take sample to create train and test dataset
fractions = {1.0 : .1, 0.0 : 1.0}
df3 = df2.stat.sampleBy("label", fractions, 36)
df3.groupBy("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|  0.0|48637|
|  1.0|37519|
+-----+-----+



In [None]:
#Split data as 80-20% Train and Test dataset
splitSeed = 5043
trainingData, testData = df3.randomSplit([0.8, 0.2], splitSeed)

In [None]:
#Tokenize the sentence based on the regex pattern 
tokenizer = RegexTokenizer(inputCol="text",outputCol="reviewTokensUf",pattern="\\s+|[,.()\"]")

#Remove Stop Words that do not contribute in any way to our analysis 
stopwords_remover = StopWordsRemover(stopWords=StopWordsRemover.loadDefaultStopWords("english"),inputCol="reviewTokensUf",outputCol="reviewTokens")

#converts word documents to vectors of token counts
cv = CountVectorizer(inputCol="reviewTokens",outputCol="cv",vocabSize=296337)

#IDF model
idf = IDF(inputCol="cv",outputCol="features")

#Logistic Boosted Classifier
lr = LogisticRegression(maxIter=100,regParam=0.02,elasticNetParam=0.3)


In [None]:
#Create a pipeline by combining all the functions we defined above - tokenizer , stopwords_remover, cv, idf, gbtc
steps =  [tokenizer, stopwords_remover, cv, idf, lr]
pipeline = Pipeline(stages=steps)

In [None]:
#fit the training dataset dataset into the pipeline 
model = pipeline.fit(trainingData)

In [None]:
#Obtain the predictions from the model 
predictions = model.transform(testData)

In [None]:
#Call the Binary Classification Evaluator function 
evaluator = BinaryClassificationEvaluator()  
areaUnderROC = evaluator.evaluate(predictions)

In [None]:
#model evaluation
lp = predictions.select("label", "prediction")
counttotal = predictions.count()
correct = lp.filter(col("label") == col("prediction")).count()
wrong = lp.filter(~(col("label") == col("prediction"))).count()
ratioWrong = float(wrong) / float(counttotal)
lp = predictions.select(  "prediction","label")
counttotal = float(predictions.count())
correct = lp.filter(col("label") == col("prediction")).count()
wrong = lp.filter("label != prediction").count()
ratioWrong=wrong/counttotal
ratioCorrect=correct/counttotal
trueneg =( lp.filter(col("label") == 0.0).filter(col("label") == col("prediction")).count()) /counttotal
truepos = (lp.filter(col("label") == 1.0).filter(col("label") == col("prediction")).count())/counttotal
falseneg = (lp.filter(col("label") == 0.0).filter(~(col("label") == col("prediction"))).count())/counttotal
falsepos = (lp.filter(col("label") == 1.0).filter(~(col("label") == col("prediction"))).count())/counttotal

precision= truepos / (truepos + falsepos)
recall= truepos / (truepos + falseneg)
#fmeasure= 2  precision  recall / (precision + recall)
accuracy=(truepos + trueneg) / (truepos + trueneg + falsepos + falseneg)

In [None]:
print('counttotal   :', counttotal     )
print('correct      :', correct        )
print('wrong        :', wrong          )
print('ratioWrong   :', ratioWrong     )
print('ratioCorrect :', ratioCorrect   )
print('truen        :', trueneg          )
print('truep        :', truepos          )
print('falsen       :', falseneg         )
print('falsep       :', falsepos         )
print('precision    :', precision      )
print('recall       :', recall         )
#print('fmeasure     :', fmeasure       )
print('accuracy     :', accuracy       )

counttotal   : 7598.0
correct      : 6597
wrong        : 1001
ratioWrong   : 0.13174519610423796
ratioCorrect : 0.868254803895762
truen        : 0.5461963674651223
truep        : 0.32205843643063964
falsen       : 0.04830218478546986
falsep       : 0.08344301131876809
precision    : 0.7942226549821487
recall       : 0.8695806680881307
accuracy     : 0.868254803895762


In [None]:
#Create a pipeline by combining all the functions we defined above - tokenizer , stopwords_remover, cv, idf, gbtc
gbtc = GBTClassifier(maxIter=20)
steps =  [tokenizer, stopwords_remover, cv, idf, gbtc]
pipeline = Pipeline(stages=steps)

In [None]:
model = pipeline.fit(trainingData)

In [None]:
predictions = model.transform(testData)


In [None]:
evaluator = BinaryClassificationEvaluator()  
areaUnderROC = evaluator.evaluate(predictions)
print('Test Area Under ROC', areaUnderROC)

In [None]:
#model evaluation
lp = predictions.select("label", "prediction")
counttotal = predictions.count()
correct = lp.filter(col("label") == col("prediction")).count()
wrong = lp.filter(~(col("label") == col("prediction"))).count()
ratioWrong = float(wrong) / float(counttotal)
lp = predictions.select(  "prediction","label")
counttotal = float(predictions.count())
correct = lp.filter(col("label") == col("prediction")).count()
wrong = lp.filter("label != prediction").count()
ratioWrong=wrong/counttotal
ratioCorrect=correct/counttotal
trueneg =( lp.filter(col("label") == 0.0).filter(col("label") == col("prediction")).count()) /counttotal
truepos = (lp.filter(col("label") == 1.0).filter(col("label") == col("prediction")).count())/counttotal
falseneg = (lp.filter(col("label") == 0.0).filter(~(col("label") == col("prediction"))).count())/counttotal
falsepos = (lp.filter(col("label") == 1.0).filter(~(col("label") == col("prediction"))).count())/counttotal

precision= truepos / (truepos + falsepos)
recall= truepos / (truepos + falseneg)
#fmeasure= 2  precision  recall / (precision + recall)
accuracy=(truepos + trueneg) / (truepos + trueneg + falsepos + falseneg)

In [None]:
print('counttotal   :', counttotal     )
print('correct      :', correct        )
print('wrong        :', wrong          )
print('ratioWrong   :', ratioWrong     )
print('ratioCorrect :', ratioCorrect   )
print('truen        :', trueneg          )
print('truep        :', truepos          )
print('falsen       :', falseneg         )
print('falsep       :', falsepos         )
print('precision    :', precision      )
print('recall       :', recall         )
#print('fmeasure     :', fmeasure       )
print('accuracy     :', accuracy       )

In [None]:
#Create a pipeline by combining all the functions we defined above - tokenizer , stopwords_remover, cv, idf, gbtc
bigram = NGram(inputCol = "tokens", OutpuCol = "bigrams", n = 2)
tf5  = HashingTF(inputCol="bigrams", outputCol="features")

steps =  [tokenizer, stopwords_remover, bigram, tfs, cv, idf, lr]
pipeline = Pipeline(stages=steps)