# Spark NLP Application
## Working version for AWS EC2 cluster deployment

In [None]:
# Start Spark application
# Display the SparkContext. `sc` is not created in this notebook; it is expected to be
# provided by the pyspark-enabled kernel (NOTE(review): confirm for your cluster setup).

sc

In [None]:
#sc.stop() # use to stop the application

In [None]:
# !!!Assure that all libraries are installed on all computers in the cluster!!!

# General
import json
import re, string
import numpy as np
import pandas as pd
import emot
from collections import Counter
from scipy.stats import spearmanr

# Visual
import matplotlib.pyplot as plt
from wordcloud import WordCloud
import seaborn as sns

# NLP
import nltk
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from nltk.stem import WordNetLemmatizer

# AWS
import boto3

# Data preprocessing
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType, StringType, FloatType, ArrayType

# Machine Learning
from pyspark.ml import Pipeline
from pyspark.ml.feature import *
from pyspark.ml.classification import NaiveBayes
from pyspark.mllib.classification import SVMWithSGD, SVMModel  # RDD-based API; not available in pyspark.ml
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors as MLLibVectors
from pyspark.ml.clustering import LDA

In [None]:
# If you don't have these NLTK resources downloaded yet, uncomment and run the lines below.

#nltk.downloader.download('wordnet')
#nltk.downloader.download('vader_lexicon')

In [None]:
# If the Spark cluster is already configured with s3a credentials, leave the commented-out lines below as they are.
# Otherwise uncomment them. Read the keys from the environment; never hard-code credentials here.

sqlContext = SQLContext(sc)
s3 = boto3.resource('s3')
bucket = s3.Bucket('BUCKET_NAME')  # placeholder: replace with the bucket that holds the data
#import os
#sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"])
#sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])

# List the bucket once and reuse the result. The position of each object in this
# listing decides which file the next cells read as reviews (paths[0]) and metadata (paths[1]).
object_list = list(bucket.objects.all())
key_list = [obj.key for obj in object_list]
key_list

In [None]:
# Build an s3a:// URI for every listed object; the first one (by listing order) is read as the reviews file.
paths = ['s3a://{}/{}'.format(obj.bucket_name, obj.key) for obj in object_list]
path_reviews = paths[0]
data = sqlContext.read.format('json').load(path_reviews)
data.createOrReplaceTempView("data")

In [None]:
# The second listed object is read as the book metadata and exposed to Spark SQL as "metadata".
path_metadata = paths[1]
metadata = spark.read.format('json').load(path_metadata)
metadata.createOrReplaceTempView("metadata")

In [None]:
# Basic dataset statistics: total rows, exact duplicates, distinct users and distinct books.
# Columns are selected by name, so a missing column fails loudly instead of silently
# counting an empty projection.
n = data.count()
n_distinct = data.distinct().count()
users = data.select('reviewerID').distinct().count()
books = data.select('asin').distinct().count()

print('Number of observations: {0}'.format(n))
print('Number of unique observations: {0}'.format(n_distinct))
print('Number of duplicates: {0}'.format(n - n_distinct))
print('Number of users: {0}'.format(users))
print('Number of books: {0}'.format(books))

In [None]:
# Drop reviews that are identical in every column except 'asin' and 'reviewerID'.
# NOTE(review): presumably the same review text attached to several book IDs; confirm.
# This operates on `data`; the former `sample_data` name was never defined, and every
# later cell works on `data`.
data = data.dropDuplicates(subset=[c for c in data.columns if c not in ['asin', 'reviewerID']])
data.createOrReplaceTempView("data")

## Data Cleaning and Extraction

In [None]:
# Custom function to extract emoticons from the review text into an 'Emoji' column.

def emoji(string):
    """Return the distinct emoticons detected by `emot` in a review.

    Tokens containing 'oo', 'OO' or 'xp' are dropped before detection
    (NOTE(review): presumably to avoid false positives; confirm).

    Args:
        string: raw review text. None yields an empty list. The parameter name
            shadows the `string` module only inside this function.

    Returns:
        list of distinct emoticon strings (order unspecified), excluding a bare ')'.
    """
    if string is None:
        return []
    text = ' '.join(tok for tok in string.split(' ')
                    if not any(a in tok for a in ['oo', 'OO', 'xp']))
    found = emot.emoticons(text)
    # Depending on the installed emot version, emoticons() returns either a list of
    # dicts (each with a 'value' key) or one dict whose 'value' entry is a list.
    if isinstance(found, dict):
        values = found.get('value', [])
    else:
        values = [item['value'] for item in found if item]
    return list({v for v in values if v != ')'})

# Declare the real return type: the function returns a list, not a string.
emoji_detector = udf(emoji, ArrayType(StringType()))
data = data.withColumn('Emoji', emoji_detector(col('reviewText')))
data.createOrReplaceTempView("data")

In [None]:
# Concatenation of the short summary with the review text.
# concat_ws skips NULL inputs, whereas concat returns NULL when either side is NULL.
# A NULL `text` would make the tokenizer stage fail for the whole job.
sqlTransform = SQLTransformer(statement="SELECT *, concat_ws(' ', summary, reviewText) as text, concat(asin, ' ', reviewerID) as ID FROM __THIS__")

# Tokenizing the text: split on every character that is not a letter
tokenizer = RegexTokenizer(inputCol="text", outputCol="tokens", pattern="[^\\p{L}]")

# Removing stop words from the tokens
stopremover = StopWordsRemover(inputCol="tokens", outputCol="stop_tokens")

# Vectorization of the text (term counts)
count_vec = CountVectorizer(inputCol='stop_tokens',outputCol='rawFeatures')

# Calculation of inverse document frequency weights
idf = IDF(inputCol="rawFeatures", outputCol="features")

# Building the feature extraction pipeline
data_preproc = Pipeline(stages=[sqlTransform, tokenizer, stopremover, count_vec, idf])

# Extracting the features
cleaner = data_preproc.fit(data)
data = cleaner.transform(data)
data.createOrReplaceTempView("data")

In [None]:
# VADER sentiment analyzer application

# A single VADER analyzer instance; the `sentiment` and `score` UDFs defined later call its polarity_scores().
analyzer = SentimentIntensityAnalyzer()

def sent(scoring):
    """Map a VADER score dict to a coarse sentiment label.

    Args:
        scoring: dict with a 'compound' key, as returned by
            SentimentIntensityAnalyzer.polarity_scores.

    Returns:
        'positive' if compound > 0.5, 'negative' if compound <= -0.5,
        otherwise 'neutral' (so a compound of exactly 0.5 is neutral).
    """
    compound = scoring['compound']
    if compound > 0.5:
        return 'positive'
    # Only the lower bound is needed here: everything above 0.5 already returned,
    # so 0.5 itself is classified as neutral instead of falling through to negative.
    if compound > -0.5:
        return 'neutral'
    return 'negative'

# Lemmatization of words

def lemma(x):
    """Lemmatize each token of a token list with NLTK's WordNetLemmatizer.

    Args:
        x: list of string tokens, or None.

    Returns:
        list of lemmatized tokens in the same order as ``x``, or None when ``x``
        is None, so a missing token list stays missing instead of failing the Spark task.
    """
    if x is None:
        return None
    wordnet_lemmatizer = WordNetLemmatizer()
    return [wordnet_lemmatizer.lemmatize(t) for t in x]

# Additional features

# The review-text UDFs return None for a missing review instead of failing the Spark
# task; such rows are removed later by data.dropna().
sentiment = udf(lambda x: None if x is None else sent(analyzer.polarity_scores(x)), StringType())
# `helpful` is indexed as [helpful votes, total votes]. Zero total votes becomes None,
# so the later HelpfulVotes / TotalVotes ratio never divides by zero.
votes = udf(lambda h: None if h is None or h[1] == 0 else h[1], IntegerType())
helpful = udf(lambda h: None if h is None else h[0], IntegerType())
lemmatize = udf(lemma, ArrayType(StringType(), True))
score = udf(lambda x: None if x is None else analyzer.polarity_scores(x)['compound'], FloatType())

data = data.withColumn('ReviewDate', to_date(from_unixtime(col("unixReviewTime"), format='yyyy-MM-dd'))) \
           .withColumn('HelpfulVotes', helpful(col("helpful"))) \
           .withColumn('TotalVotes', votes(col("helpful"))) \
           .withColumn('RawReviewLength', length(col("reviewText"))) \
           .withColumn('TokensLength', size(col("tokens"))) \
           .withColumn('StopTokensLength', size(col("stop_tokens"))) \
           .withColumn('Lemmatized', lemmatize(col("stop_tokens"))) \
           .withColumn('Sentiment', sentiment(col("reviewText"))) \
           .withColumn('Compound_Score', score(col("reviewText"))) \
           .select('ID', 'asin', 'reviewerID', 'ReviewDate', 'TotalVotes', 'HelpfulVotes', 'overall', 'TokensLength', 'StopTokensLength',
                   'RawReviewLength', 'text', 'stop_tokens', 'Lemmatized', 'Sentiment', 'Compound_Score', 'features')

data.createOrReplaceTempView("data")
#data.cache() # Cache the dataframe if needed

## Data consolidation and outlier removal

In [None]:
# Fraction of NULL values per column: 1 - (non-null count / total row count).

data.agg(*[(1 - count(c) / count('*')).alias(c) for c in data.columns]) \
    .show()

In [None]:
# Remove rows with missing values, derive Helpfulness, and join the reviews with book metadata.

data = data.dropna()
# TotalVotes is never 0 here: zero totals were mapped to NULL and removed by dropna().
data = data.withColumn('Helpfulness', data['HelpfulVotes'] / data['TotalVotes'])
data = data.filter(data['Helpfulness'] <= 1)
data.createOrReplaceTempView("data")

metadata = spark.sql("""select asin, title, price, salesRank['Books'] as SalesRank
                            from metadata
                            where asin is not null and price is not null
                                  and salesRank['Books'] is not null""")
metadata.createOrReplaceTempView("working_meta")

# Join against the filtered view "working_meta". The SQL view "metadata" still holds
# the raw, unfiltered records, whose salesRank is the raw nested field rather than the
# extracted Books rank. Select only the needed metadata columns so 'asin' is not duplicated.
data_set = spark.sql("""select a.*, b.title, b.price, b.SalesRank
                        from data a
                        inner join working_meta b
                        on a.asin = b.asin""")
data_set.createOrReplaceTempView("data_set")

In [None]:
# Collect the plotting columns from the Spark DataFrame into a local pandas DataFrame.
# NOTE: toPandas() pulls every row to the driver; sample data_set first if it is large.

numerical_columns = ['TotalVotes', 'HelpfulVotes', 'overall', 'TokensLength', 'StopTokensLength',
                    'RawReviewLength', 'Compound_Score', 'Helpfulness', 'price', 'SalesRank', 'Sentiment']
# Alias referenced by later cells. 'Sentiment' is categorical and is kept for grouping and plots.
num_cols = numerical_columns
pdData = data_set.select(numerical_columns).toPandas()

In [None]:
# Countplot of the sentiment groups; each bar is annotated with its count and share of all reviews.

plt.clf()
sns.set(style="darkgrid")
hist = sns.countplot(x="Sentiment", data=pdData, palette="Greens_d")
total = float(len(pdData))
for p in hist.patches:
    height = p.get_height()
    # `total` is used for the share; bars exist only when total > 0.
    hist.text(p.get_x() + p.get_width() / 2.,
              height + 3,
              '{:,.0f} ({:.1%})'.format(height, height / total),
              ha="center")
hist.set_title('Reviews per sentiment group')
plt.show()

In [None]:
# Histogram + density estimate of the book price.

plt.clf()
sns.distplot(pdData['price'],
             color='lightseagreen')
plt.show()

In [None]:
# Hexbin joint plots of every length feature against Helpfulness; figures are kept in `d` by column.
# jointplot creates its own figure, so no plt.clf() (it only produced an extra empty figure).
sns.set(style="ticks")
len_cols = [i for i in pdData.columns if 'Length' in i]
d = {}
for c in len_cols:
    g = sns.jointplot(x=pdData[c], y=pdData["Helpfulness"], kind='hex', color="#4CB391")
    g.fig.suptitle('{} vs. Helpfulness'.format(c))
    g.fig.subplots_adjust(top=0.93)
    d[c] = g.fig

plt.show()

In [None]:
# Correlation and distribution for 'price' and 'helpfulness':
# quadratic regression fit in the joint panel, distributions in the margins.
# JointGrid creates its own figure, so plt.clf() (which only spawned an empty figure) is not needed.

g = sns.JointGrid(x="Helpfulness", y="price", data=pdData)
g.plot_joint(sns.regplot, order=2)
g.plot_marginals(sns.distplot)
g.fig.suptitle('Helpfulness vs. price')
g.fig.subplots_adjust(top=0.93)
plt.show()

In [None]:
# Lower-triangular correlation matrix for the numerical features

sns.set(style="white")

# Only numeric columns can be correlated: 'Sentiment' holds strings, and recent pandas
# versions raise on non-numeric input to DataFrame.corr().
corr = pdData.select_dtypes(include='number').corr()
# Hide the upper triangle including the diagonal. np.bool was removed in NumPy 1.24; use bool.
mask = np.zeros_like(corr, dtype=bool)
mask[np.triu_indices_from(mask)] = True

f, ax = plt.subplots(figsize=(9, 7))

cmap = sns.diverging_palette(200, 20, as_cmap=True)

sns.heatmap(corr, mask=mask, cmap=cmap, vmax=.5, center=0,
            square=True, linewidths=.65, cbar_kws={"shrink": .5}, ax=ax)
ax.set_title('Diagonal correlation matrix')
plt.show()

In [None]:
# Scatter plots of TotalVotes vs. StopTokensLength, one panel per sentiment group.
# The grid's own figure is resized. Calling plt.figure() after plotting only opened a
# new empty figure and left the grid at its default size.

g = sns.FacetGrid(pdData, col="Sentiment")
g.map(plt.scatter, "TotalVotes", "StopTokensLength", alpha=.5)
g.add_legend()
g.fig.set_size_inches(15, 9)
plt.show()

In [None]:
# Average review score and maximum price per sentiment group.
# Rename by column name rather than by position, so labels cannot be attached to the
# wrong aggregate (positional renaming had swapped them).
metrics_by_sentiment = (pdData.groupby('Sentiment')
                        .agg({"overall": "mean", "price": "max"})
                        .rename(columns={"overall": "Average Score", "price": "Maximum Price"}))
metrics_by_sentiment

## Outliers

In [None]:
# Keep data_set persisted: the next cells scan it repeatedly (approxQuantile per column, then select/join/filter).
data_set.persist()

In [None]:
# Tukey outlier rule: values outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR] are treated as outliers.

cols = ['price', 'RawReviewLength']
bounds = {}

# The loop variable must not be named `col`: at notebook level that rebinds the
# star-imported pyspark.sql.functions.col and breaks every later cell calling col(...).
for column in cols:
    # relativeError=0 requests exact quantiles (can be expensive on large data).
    q1, q3 = data_set.approxQuantile(column, [0.25, 0.75], 0)
    iqr = q3 - q1
    bounds[column] = [q1 - 1.5 * iqr, q3 + 1.5 * iqr]

print(bounds)

In [None]:
# Per review ID, flag whether each column lies strictly inside (0, upper Tukey bound).
# A True '<column>_out' flag means the row is KEPT; the lower Tukey bound is replaced by 0.

outliers = data_set.select(
    'ID',
    *[((data_set[c] > 0) & (data_set[c] < bounds[c][1])).alias(c + '_out') for c in cols]
)
#outliers.show()

In [None]:
# Attach the inlier flags directly to data_set and keep rows inside both bounds with
# fewer than 200 total votes. Flags are added with withColumn instead of joining a
# flag table back on 'ID': that join duplicates rows whenever an ID (asin + reviewerID)
# occurs more than once, and it costs an extra shuffle.
df_outliers = data_set
for c in cols:
    df_outliers = df_outliers.withColumn(
        c + '_out', (df_outliers[c] > 0) & (df_outliers[c] < bounds[c][1]))
df_out = df_outliers.filter('price_out').filter('RawReviewLength_out').filter(df_outliers.TotalVotes < 200) \
                    .drop('price_out').drop('RawReviewLength_out')

In [None]:
# Local pandas copy of the outlier-free data. numerical_columns includes 'Sentiment', which the plots below need.
pdData_out = df_out.select(numerical_columns).toPandas()

In [None]:
# Review length distribution per sentiment group after outlier removal.
# One explicit figure/axes pair; plt.clf() followed by plt.figure() left an extra empty figure.
fig, ax = plt.subplots(figsize=(15, 9))
sns.violinplot(x="Sentiment", y="RawReviewLength", data=pdData_out, palette="muted", ax=ax)
ax.set_title('Raw review length by sentiment (outliers removed)')
plt.show()

In [None]:
# Countplot of sentiment groups after outlier removal, annotated with count and share.
sns.set(style="darkgrid")
# The figure size must be set BEFORE plotting; afterwards plt.figure() only opened an empty figure.
plt.figure(figsize=(13, 10))
hist = sns.countplot(x="Sentiment", data=pdData_out, palette="Greens_d")
total = float(len(pdData_out))
for p in hist.patches:
    height = p.get_height()
    hist.text(p.get_x() + p.get_width() / 2.,
              height + 3,
              '{:,.0f} ({:.1%})'.format(height, height / total),
              ha="center")
hist.set_title('Reviews per sentiment group (outliers removed)')
plt.show()

In [None]:
# Hexbin joint plot of StopTokensLength vs. Compound_Score, annotated with Spearman's rank correlation.
sns.set(style="ticks")
g = sns.jointplot(x=pdData_out['StopTokensLength'], y=pdData_out["Compound_Score"], kind='hex', color="maroon")
# jointplot builds its own figure, so resize that one instead of calling plt.figure() beforehand.
g.fig.set_size_inches(13, 10)
# seaborn >= 0.11 removed jointplot(stat_func=...), so compute and annotate the statistic explicitly.
rho, p_value = spearmanr(pdData_out['StopTokensLength'], pdData_out['Compound_Score'])
g.ax_joint.text(0.02, 0.98, 'spearmanr = {:.2f}; p = {:.2g}'.format(rho, p_value),
                transform=g.ax_joint.transAxes, va='top')
g.fig.suptitle('StopTokensLength vs. Compound_Score')
g.fig.subplots_adjust(top=0.95)
plt.show()

In [None]:
# Wordcloud

def most_common_words(set, column, n = 100):
    """Return the ``n`` most frequent tokens longer than 2 characters in a token-list column.

    Args:
        set: Spark DataFrame (anything exposing ``select(column).toPandas()``). The name
            shadows the ``set`` builtin and is kept for backward compatibility.
        column: name of a column whose values are lists of strings (None rows are skipped).
        n: maximum number of words to return.

    Returns:
        dict mapping word -> count, most frequent first; it has fewer than ``n``
        entries when there are fewer distinct qualifying words.
    """
    reviews = set.select(column).toPandas()[column].tolist()
    counter = Counter()
    for tokens in reviews:
        if tokens is None:
            continue
        # Keep words as str: encoding to UTF-8 produced bytes keys on Python 3,
        # which WordCloud cannot render.
        counter.update(w for w in (t.strip() for t in tokens) if len(w) > 2)
    return dict(counter.most_common(n))

def show_wordcloud(set, backgroud = "white"):
    """Render a word cloud from a ``{word: frequency}`` mapping.

    Args:
        set: mapping of word -> frequency (e.g. the output of most_common_words).
            The name shadows the ``set`` builtin; kept for backward compatibility.
        backgroud: background colour passed to WordCloud (misspelled name kept for callers).
    """
    wc = WordCloud(background_color=backgroud, max_words=2000,
                   max_font_size=40, random_state=42)
    wc.generate_from_frequencies(set)
    plt.figure()
    plt.imshow(wc, interpolation="bilinear")
    plt.axis("off")
    # plt.show() matches the rest of the notebook. display(fig) followed by the inline
    # backend's end-of-cell flush rendered the same figure twice.
    plt.show()

# The 100 most frequent lemmas across data_set, rendered as a word cloud.
top_100 = most_common_words(data_set, 'Lemmatized', n=100)
show_wordcloud(top_100)

## Modeling

In [None]:
# Keep only clearly positive / negative reviews and the columns used for modeling.
# Filter on df_out's own column, not on a column object taken from data_set.
df_out = df_out.filter(df_out.Sentiment != 'neutral') \
               .select('ID', 'title', 'price', 'SalesRank', 'ReviewDate', 'overall', 'Lemmatized', 'TotalVotes', 'HelpfulVotes',
                       'Helpfulness', 'RawReviewLength', 'StopTokensLength', 'features', 'Compound_Score', 'Sentiment')

df_out.createOrReplaceTempView("df_out")

# By default StringIndexer orders labels by frequency, so the most frequent sentiment
# gets label 0.0; fitted_data.labels holds the index -> sentiment mapping.
indexer = StringIndexer(inputCol="Sentiment", outputCol="label")
fitted_data = indexer.fit(df_out)
indexed_data = fitted_data.transform(df_out)

In [None]:
# First 10 (Sentiment, label) pairs, printed without truncation.

indexed_data.select(['Sentiment', 'label']).show(10, False)

In [None]:
# Downsample the majority label so that both labels end up with roughly equal counts.
# Fractions are derived from the current label counts instead of hard-coded totals,
# which go stale whenever upstream filtering changes. A fixed seed makes the sample reproducible.
label_counts = {row['label']: row['count'] for row in indexed_data.groupBy('label').count().collect()}
# sorted()[0] instead of min(): the pyspark.sql.functions star import shadows the builtin min.
minority_count = sorted(label_counts.values())[0]
fractions = {lbl: float(minority_count) / cnt for lbl, cnt in label_counts.items()}
stratified_data = indexed_data.sampleBy('label', fractions=fractions, seed=123)
stratified_data.groupby('label').count().show()

In [None]:
stratified_data.show(10, truncate=False)  # first 10 rows of the balanced sample, untruncated

In [None]:
# 80/20 train/test split (fixed seed); both parts are cached for repeated model fitting.
splits = stratified_data.select('label', 'features').randomSplit([0.8, 0.2], seed=123)
train, test = (part.cache() for part in splits)

In [None]:
# Standardize the TF-IDF features to unit variance; withMean=False keeps sparse vectors sparse.
# pyspark.ml's StandardScaler is an Estimator over DataFrames: it cannot be fit on an RDD,
# and a Spark UDF cannot be called on Rows inside rdd.map, so scale on the DataFrame directly.
scaler = StandardScaler(inputCol='features', outputCol='scaled_features', withMean=False, withStd=True)
scaled = scaler.fit(stratified_data).transform(stratified_data)
data1 = scaled.select(scaled['label'].cast(FloatType()).alias('label'),
                      scaled['scaled_features'].alias('features'))
data1.show()

In [None]:
# Multinomial Naive Bayes (additive smoothing 3.0) on the TF-IDF features:
# fit on the training split, then score the held-out split.

nb = NaiveBayes(modelType="multinomial", smoothing=3.0)
model = nb.fit(train)
predictions = model.transform(test)

# Fraction of correctly classified test rows.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy",
                                              labelCol="label",
                                              predictionCol="prediction")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = {}".format(accuracy))
predictions.show()

In [None]:
# Confusion matrix of the Naive Bayes predictions via the RDD-based MulticlassMetrics,
# fed with (prediction, label) rows.
predictionAndLabel = predictions.select("prediction", "label").rdd
metrics = MulticlassMetrics(predictionAndLabel)
confMatrx = metrics.confusionMatrix().toArray().astype(int)
print(metrics.confusionMatrix())

In [None]:
# Heatmap of the Naive Bayes confusion matrix: rows are actual labels and columns are
# predicted labels, both in ascending label order, named via the StringIndexer mapping.
# One explicit figure/axes pair; plt.clf() before plt.figure() left an extra empty figure.
fig, ax = plt.subplots(figsize=(13, 10))
sns.heatmap(confMatrx, annot=True, fmt="d", cmap="YlGnBu",
            xticklabels=fitted_data.labels, yticklabels=fitted_data.labels, ax=ax)
ax.set_xlabel('Predicted')
ax.set_ylabel('Actual')
ax.set_title('Naive Bayes confusion matrix (test set)')
plt.show()

In [None]:
# Grid search over the Naive Bayes smoothing parameter with 5-fold cross-validation.

# smoothing = 0 gives zero-count terms a log-probability of -inf, so the grid starts at 0.1.
# linspace + round avoids the float-step surprises of np.arange.
smoothing = np.linspace(0.1, 1.0, 10).round(2).tolist()

paramGrid = ParamGridBuilder().addGrid(nb.smoothing, smoothing).build()
cvEvaluator = MulticlassClassificationEvaluator(metricName="accuracy")
# Select the model with the same metric that is reported afterwards; seed the fold assignment.
cv = CrossValidator(estimator=nb, estimatorParamMaps=paramGrid, evaluator=cvEvaluator,
                    numFolds=5, seed=123)
cvModel = cv.fit(train)

cvPredictions = cvModel.transform(test)
cvPredictions.select("label", "prediction", "probability").show()

In [None]:
print("Test set accuracy = {}".format(cvEvaluator.evaluate(cvPredictions)))  # accuracy of the best CV model

In [None]:
# Confusion matrix of the cross-validated model on the test split.
cvPredictions_val = cvPredictions.select("prediction", "label").rdd

metrics_cv = MulticlassMetrics(cvPredictions_val)
confMatrx_cv = metrics_cv.confusionMatrix().toArray().astype(int)
print(metrics_cv.confusionMatrix())

In [None]:
# Heatmap of the cross-validated model's confusion matrix: rows are actual labels and
# columns are predicted labels, both in ascending label order, named via the StringIndexer mapping.
# One explicit figure/axes pair; plt.clf() before plt.figure() left an extra empty figure.
fig, ax = plt.subplots(figsize=(13, 10))
sns.heatmap(confMatrx_cv, annot=True, fmt="d", cmap="YlGnBu",
            xticklabels=fitted_data.labels, yticklabels=fitted_data.labels, ax=ax)
ax.set_xlabel('Predicted')
ax.set_ylabel('Actual')
ax.set_title('Cross-validated Naive Bayes confusion matrix (test set)')
plt.show()

## Bigram (2-gram) model

### Add ngram column

In [None]:
# Build word n-grams (n = 2) from the lemmatized tokens.
n = 2
ngram = NGram(n=n, inputCol='Lemmatized', outputCol='ngram')
add_ngram = ngram.transform(stratified_data)

### Count vectorizer and tfidf

In [None]:
# Term-frequency vectors over the n-gram vocabulary.
cv_ngram = CountVectorizer().setInputCol('ngram').setOutputCol('tf_ngram')
cvModel_ngram = cv_ngram.fit(add_ngram)
cv_df_ngram = cvModel_ngram.transform(add_ngram)

### Create TF-IDF matrix

In [None]:
# Re-weight the n-gram term frequencies by inverse document frequency.
idf_ngram = IDF(inputCol='tf_ngram', outputCol='tfidf_ngram')
tfidfModel_ngram = idf_ngram.fit(cv_df_ngram)
tfidf_df_ngram = tfidfModel_ngram.transform(cv_df_ngram)

### Split into training & testing set

In [None]:
# 80/20 split of the n-gram TF-IDF features (fixed seed); both parts are cached.
splits_ngram = tfidf_df_ngram.select('tfidf_ngram', 'label').randomSplit([0.8, 0.2], seed=123)
train_ngram, test_ngram = (part.cache() for part in splits_ngram)

### Convert feature matrix to LabeledPoint vectors

In [None]:
# Convert DataFrame rows to MLlib LabeledPoints (required by the RDD-based SVMWithSGD).
# The test points must come from test_ngram; building them from train_ngram evaluated
# the model on its own training data.
train_lb_ngram = train_ngram.rdd.map(lambda row: LabeledPoint(row['label'], MLLibVectors.fromML(row['tfidf_ngram'])))
test_lb_ngram = test_ngram.rdd.map(lambda row: LabeledPoint(row['label'], MLLibVectors.fromML(row['tfidf_ngram'])))

### Fit an SVM model on bigram features

In [None]:
# Linear SVM trained with SGD on the bigram LabeledPoints.
# TODO: tune numIterations and regParam (e.g. on a held-out validation split).
numIterations = 50
regParam = 0.3
svm = SVMWithSGD.train(train_lb_ngram, iterations=numIterations, regParam=regParam)

### Evaluate the SVM model on the test set

In [None]:
# Score the test points with the SVM and report F1 and accuracy.
# The evaluator is configured for "f1" so the value matches the "F1 score" label it is printed under.
scoreAndLabels_test = test_lb_ngram.map(lambda x: (float(svm.predict(x.features)), x.label))
score_label_test = spark.createDataFrame(scoreAndLabels_test, ["prediction", "label"])
f1_eval = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
svm_f1 = f1_eval.evaluate(score_label_test)
svm_accuracy = f1_eval.evaluate(score_label_test, {f1_eval.metricName: "accuracy"})
print("F1 score: %.4f" % svm_f1)
print("Accuracy: %.4f" % svm_accuracy)

In [None]:
add_ngram.select(add_ngram['ngram']).show()  # peek at the generated n-grams

## LDA - topic extraction

In [None]:
# LDA topic model (10 topics) on the feature vectors of the training split.
# NOTE(review): `features` holds TF-IDF weights; LDA is usually fit on raw term counts.
# A fixed seed makes the topics reproducible between runs.
lda = LDA(k=10, maxIter=10, seed=123)
model = lda.fit(train)

ll = model.logLikelihood(train)
lp = model.logPerplexity(train)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))

# Describe topics.
topics = model.describeTopics()
print("The topics described by their top-weighted terms:")
topics.show(truncate=False)

# Topic distribution per test document; the sparse feature vectors are not printed.
transformed = model.transform(test)
transformed.select('label', 'topicDistribution').show(truncate=False)

In [None]:
# Vocabulary for decoding the LDA topic term indices. The LDA input `features` was built
# by the CountVectorizer stage of the fitted preprocessing pipeline `cleaner` (on
# stop_tokens), so its vocabulary must be used. A CountVectorizer refit on another
# column assigns different words to the same indices.
count_vec_model = next(stage for stage in cleaner.stages if isinstance(stage, CountVectorizerModel))

vocab = count_vec_model.vocabulary

In [None]:
transformed.show(n=20, truncate=True)  # default preview of the LDA output

In [None]:
# Decode each topic's term indices into words, then print one block per topic.
topics_words = [[vocab[term] for term in row['termIndices']] for row in topics.collect()]

for idx, topic in enumerate(topics_words):
    print("topic: ", idx)
    print("----------")
    for word in topic:
        print(word)
    print("----------")