In [0]:
# Load Data
df = spark.sql("select * from default.reviews_train")
print((df.count(), len(df.columns)))

(3138710, 11)


In [0]:
# Drop duplicates

print("Before duplication removal: ", df.count())
df_distinct = df.dropDuplicates(['reviewText', "asin", 'label'])
print("After duplication removal: ", df_distinct.count())

Before duplication removal:  3138710
After duplication removal:  2910132


In [0]:
# convert Unix timestamp
from pyspark.sql import functions
from pyspark.sql.functions import *
from pyspark.sql.types import *

# convert timestamp
df_distinct_with_date = df_distinct.withColumn("reviewTime", to_date(from_unixtime(df_distinct.unixReviewTime))) \
                                                .drop("unixReviewTime")

# create a new column containing the length of review text
df_distinct_with_date = df_distinct_with_date.withColumn("length", length("reviewText"))

# create a new column with current_date
df_distinct_with_date.withColumn("current_date", current_date())

# returns the number of days between current date and review time
df_distinct_with_date = df_distinct_with_date.withColumn("datediff", datediff(col("current_date"), col("reviewTime")))

df_distinct_with_date = df_distinct_with_date.withColumn("verifiedInt", functions.col("verified").cast("int")).drop("verified")

In [0]:
df1 = df_distinct_with_date.select("reviewerID", "overall", "verifiedInt", "datediff", "asin", "length", "reviewText", "summary", "label")

In [0]:
df1=df1.toPandas()
df1.head

Out[11]: <bound method NDFrame.head of              reviewerID  ...  label
0        A2N75YXP8BT2QR  ...      0
1        A3AE7XLS3DUMRQ  ...      1
2        A1JMLEXBKC323A  ...      0
3        A1JMLEXBKC323A  ...      0
4        A1S4Y9XTBI4JEO  ...      0
...                 ...  ...    ...
2910127  A1AQV9U4HSHCZU  ...      0
2910128  A2E95A6QPQ4EDM  ...      0
2910129   A36JTJXJB7T2S  ...      1
2910130  A1W7TXSVG81XD0  ...      1
2910131  A2C2R0PRKU2U6J  ...      0

[2910132 rows x 9 columns]>

In [0]:
import nltk
from nltk.sentiment.vader import SentimentIntensityAnalyzer

sent = SentimentIntensityAnalyzer()

In [0]:
# define a function
def check_compound(x):
    return sent.polarity_scores(x)['compound']

In [0]:
df1['compound_score'] = df1['reviewText'].apply(lambda x: check_compound(x))

In [0]:
df1.head()

Unnamed: 0,reviewerID,overall,verifiedInt,datediff,asin,length,reviewText,summary,label,compound_score
0,A2N75YXP8BT2QR,5.0,1,3004,B001FB59WM,58,Perfect Transaction! Highly Recommended 5 ...,Five Stars,0,0.861
1,A3AE7XLS3DUMRQ,1.0,1,2530,B0012C6VYO,1391,Price Point: 1/5 Quality/Durability: 1/5 ...,Disappointed is an understatment. This was a H...,1,0.9413
2,A1JMLEXBKC323A,5.0,1,2419,0060263415,53,10 Stars HIGHLY RECOMMENDED Seeing IS Belie...,Five Stars,0,0.5928
3,A1JMLEXBKC323A,5.0,1,2419,B000RGVPTC,53,10 Stars HIGHLY RECOMMENDED Seeing IS Belie...,Five Stars,0,0.5928
4,A1S4Y9XTBI4JEO,5.0,1,1775,B0001XAO7A,54,Bowden's Masonry/Another Day Music/Designing ...,Five Stars,0,0.4215


In [0]:
from textblob import TextBlob

In [0]:
# define a function
def check_polarity(x):
    return TextBlob(x).polarity

In [0]:
df1['polarity_score'] = df1['reviewText'].apply(lambda x: check_polarity(x))

In [0]:
df1.head(5)

Unnamed: 0,reviewerID,overall,verifiedInt,datediff,asin,length,reviewText,summary,label,compound_score,polarity_score
0,A2N75YXP8BT2QR,5.0,1,3004,B001FB59WM,58,Perfect Transaction! Highly Recommended 5 ...,Five Stars,0,0.861,0.47
1,A3AE7XLS3DUMRQ,1.0,1,2530,B0012C6VYO,1391,Price Point: 1/5 Quality/Durability: 1/5 ...,Disappointed is an understatment. This was a H...,1,0.9413,0.070251
2,A1JMLEXBKC323A,5.0,1,2419,0060263415,53,10 Stars HIGHLY RECOMMENDED Seeing IS Belie...,Five Stars,0,0.5928,0.2
3,A1JMLEXBKC323A,5.0,1,2419,B000RGVPTC,53,10 Stars HIGHLY RECOMMENDED Seeing IS Belie...,Five Stars,0,0.5928,0.2
4,A1S4Y9XTBI4JEO,5.0,1,1775,B0001XAO7A,54,Bowden's Masonry/Another Day Music/Designing ...,Five Stars,0,0.4215,0.0


In [0]:
df1 = spark.createDataFrame(df1)

In [0]:
from sparknlp.base import *
from sparknlp.annotator import *

from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer, IDF, StringIndexer, VectorAssembler, OneHotEncoder, Imputer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# convert text column to spark nlp document
document_assembler = DocumentAssembler() \
    .setInputCol("reviewText") \
    .setOutputCol("document")

sentenceDetector = SentenceDetector() \
    .setInputCols(["document"]) \
    .setOutputCol("sentence")

# convert document to array of tokens
regexTokenizer = RegexTokenizer() \
    .setInputCols(["sentence"]) \
    .setOutputCol("regexToken") \
    .setToLowercase(True) \
    .setPattern("\\s+")
 
# clean tokens 
normalizer = Normalizer() \
    .setInputCols(["regexToken"]) \
    .setOutputCol("normalized")

# # remove stopwords
stopwords_cleaner = StopWordsCleaner()\
      .setInputCols(["normalized"])\
      .setOutputCol("cleanTokens")\
      .setCaseSensitive(False)

lemmatizer = LemmatizerModel.pretrained() \
.setInputCols(["cleanTokens"]) \
.setOutputCol("lemma")

# Convert custom document structure to array of tokens.
finisher = Finisher() \
    .setInputCols(["lemma"]) \
    .setOutputCols("token_features") \
    .setOutputAsArray(True) \
    .setCleanAnnotations(False)

countVectors = CountVectorizer(inputCol="token_features",
                               outputCol="features", 
                               vocabSize=10000, 
                               minTF=1,
                               minDF=50,
                               maxDF=0.40)

idf = IDF(inputCol=countVectors.getOutputCol(), 
          outputCol="idfFeatures") #IDF

#####################################################################################
# for summary
# convert text column to spark nlp document
document_assembler_s = DocumentAssembler() \
    .setInputCol("summary") \
    .setOutputCol("document_s")

sentenceDetector_s = SentenceDetector() \
    .setInputCols(["document_s"]) \
    .setOutputCol("sentence_s")

# convert document to array of tokens
regexTokenizer_s = RegexTokenizer() \
    .setInputCols(["sentence_s"]) \
    .setOutputCol("regexToken_s") \
    .setToLowercase(True) \
    .setPattern("\\s+")
 
# clean tokens 
normalizer_s = Normalizer() \
    .setInputCols(["regexToken_s"]) \
    .setOutputCol("normalized_s")

# # remove stopwords
stopwords_cleaner_s = StopWordsCleaner()\
      .setInputCols(["normalized_s"])\
      .setOutputCol("cleanTokens_s")\
      .setCaseSensitive(False)

lemmatizer_s = LemmatizerModel.pretrained() \
.setInputCols(["cleanTokens_s"]) \
.setOutputCol("lemma_s")

# Convert custom document structure to array of tokens.
finisher_s = Finisher() \
    .setInputCols(["lemma_s"]) \
    .setOutputCols("token_features_s") \
    .setOutputAsArray(True) \
    .setCleanAnnotations(False)

countVectors_s = CountVectorizer(inputCol="token_features_s",
                               outputCol="features_s", 
                               vocabSize=10000, 
                               minTF=1,
                               minDF=50,
                               maxDF=0.40)

idf_s = IDF(inputCol=countVectors_s.getOutputCol(), 
          outputCol="idfFeatures_s") #IDF

#####################################################################################

# impute missing values
imputer = Imputer(inputCols=["overall", "verifiedInt", "datediff"],
                  outputCols=["imputedOverall", "imputedVerifiedInt", "imputedDatediff"])

# Encode a string column to a label indices
indexer = StringIndexer(inputCols=["imputedOverall", "asin"],
                        outputCols=["overallIndex", "asinIndex"]).setHandleInvalid("keep")

# Encode asin (productID)
encoder = OneHotEncoder(inputCols=["overallIndex", "imputedVerifiedInt", "length", "asinIndex", "imputedDatediff"],
                        outputCols=["encodedOverall", "encodedVerified", "encodedLength", "encodedAsin", "encodedDatediff"])

# Combine all features into one final "features" column
assembler = VectorAssembler(inputCols=["encodedOverall","encodedVerified", "encodedLength", "encodedAsin", "encodedDatediff", "idfFeatures", "idfFeatures_s","compound_score", "polarity_score"],
                            outputCol="assembledFeatures")

# ML Algorithm
lr = LogisticRegression(featuresCol = "assembledFeatures",
                        maxIter=300, 
                        regParam=0.01, 
                        elasticNetParam=0.0)

# Pipeline
nlp_pipeline = Pipeline(
    stages=[document_assembler,
            sentenceDetector,
            regexTokenizer,
            normalizer,
            stopwords_cleaner,
            lemmatizer, 
            finisher,
            countVectors,
            idf,
            document_assembler_s,
            sentenceDetector_s,
            regexTokenizer_s,
            normalizer_s,
            stopwords_cleaner_s,
            lemmatizer_s, 
            finisher_s,
            countVectors_s,
            idf_s,
            imputer,
            indexer,
            encoder,
            assembler,
            lr])

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[ | ][OK!]
lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[ | ][OK!]


In [0]:
#  # train/test split
# (trainingData, testingData) = df1.randomSplit([0.85,0.15], seed=47)
# print("Training Dataset Count: " + str(trainingData.count()))
# print("Test Dataset Count: " + str(testingData.count()))

In [0]:
pipeline_model = nlp_pipeline.fit(df1)

In [0]:
# Load test datasets
test_df = spark.sql("select * from default.reviews_test")
# test_df.show(5)
print((test_df.count(), len(test_df.columns)))

(348621, 10)


In [0]:
test_df = test_df.withColumn("reviewTime", to_date(from_unixtime(test_df.unixReviewTime))) \
                                                .drop("unixReviewTime")

test_df = test_df.withColumn("length", length("reviewText"))

# create a new column with current_date
test_df.withColumn("current_date", current_date())

# returns the number of days between current date and review time
test_df = test_df.withColumn("datediff", datediff(col("current_date"), col("reviewTime")))

test_df = test_df.withColumn("verifiedInt", functions.col("verified").cast("int")).drop("verified")

In [0]:
test_df = test_df.select("reviewID", "overall","verifiedInt","datediff","asin","reviewerID","length","reviewText", "summary")

In [0]:
test_df=test_df.toPandas()
test_df['compound_score'] = test_df['reviewText'].apply(lambda x: check_compound(x))

In [0]:
test_df['polarity_score'] = test_df['reviewText'].apply(lambda x: check_polarity(x))

In [0]:
test_df = spark.createDataFrame(test_df)

In [0]:
kaggle_pred = pipeline_model.transform(test_df)

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

probelement=udf(lambda v:float(v[1]),FloatType())
submission_data = kaggle_pred.select('reviewID', probelement('probability')).withColumnRenamed('<lambda>(probability)', 'label')

In [0]:
display(submission_data.select('reviewID', 'label'))

reviewID,label
80000001,0.010426694
80000002,0.05640943
80000003,0.016121844
80000004,0.19560656
80000005,0.7740162
80000006,0.42030618
80000007,0.12573153
80000008,0.16405371
80000009,0.095654555
80000010,0.84044164
