# A Large Scale Sentiment Data Classification For Online Reviews Under Apache Spark

This dataset consists of a few million Amazon customer reviews (input text) and star ratings (output labels) for learning how to train fastText for sentiment analysis. The idea is that the dataset is more than a toy: it is real business data at a reasonable scale, yet a model can be trained on it in minutes on a modest laptop.

Team Members:

Seldi Kurnia T				13516042

I Putu Eka Surya A			13516061

Tanor Abaraham 				13516088

Gerardus Samudra S 			13516103

Kevin Andrian L 			13516118

Regi Arjuna Purba 			13516149

## Read from File and Convert to Dataframe

In [16]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer
from pyspark.ml import Pipeline
import re
from pyspark_dist_explore import hist
import matplotlib.pyplot as plt

spark = SparkSession \
    .builder \
    .appName("dataframe-spark") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

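def readandConverttoDF(path):
    # Columns are read in file order: label, title, text.
    # Labels: 1 = negative review, 2 = positive review.
    fields = StructType([
        StructField("label", IntegerType(), True),
        StructField("title", StringType(), True),
        StructField("text", StringType(), True),
    ])
    # Apply the explicit schema, then reorder the columns for display.
    df = spark.read.csv(path, schema=fields, sep=",").select("title", "text", "label")
    return df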

train_data = readandConverttoDF("./data/train.csv")
test_data = readandConverttoDF("./data/test.csv")

# Beware, these prints take approximately 10 minutes in total. You can stop the execution here if you want to examine other cells
train_data.show()
print("Total train data:", train_data.count())
print("Total train data with positive class:", train_data.filter(col('label') == 2).count())
print("Total train data with negative class:", train_data.filter(col('label') == 1).count())
test_data.show()
print("Total test data:", test_data.count())
print("Total test data with positive class:", test_data.filter(col('label') == 2).count())
print("Total test data with negative class:", test_data.filter(col('label') == 1).count())

+--------------------+--------------------+-----+
|               title|                text|label|
+--------------------+--------------------+-----+
|Stuning even for ...|This sound track ...|    2|
|The best soundtra...|I'm reading a lot...|    2|
|            Amazing!|"This soundtrack ...|    2|
|Excellent Soundtrack|I truly like this...|    2|
|Remember, Pull Yo...|If you've played ...|    2|
|an absolute maste...|I am quite sure a...|    2|
|        Buyer beware|"This is a self-p...|    1|
|      Glorious story|I loved Whisper o...|    2|
|    A FIVE STAR BOOK|I just finished r...|    2|
|Whispers of the W...|This was a easy t...|    2|
|          The Worst!|A complete waste ...|    1|
|          Great book|This was a great ...|    2|
|          Great Read|I thought this bo...|    2|
|           Oh please|I guess you have ...|    1|
|Awful beyond belief!|"I feel I have to...|    1|
|Don't try to fool...|It's glaringly ob...|    1|
|A romantic zen ba...|"When you hear fo...|    2|


## Exploratory Data Analysis

### 1. Checking Null Values

In [17]:
# Beware, these checks take approximately 15 minutes in total.
train_data.where(col("text").isNull()).show()
train_data.where(col("title").isNull()).show()
train_data.where(col("label").isNull()).show()

+--------------------+----+-----+
|               title|text|label|
+--------------------+----+-----+
|"It's just like e...|null|    1|
|"Green in a ""Par...|null|    2|
|"TOO HARD FOR ME ...|null|    1|
|"second the ""Try...|null|    1|
|"too bad ""fold y...|null|    2|
|"I have only ""On...|null|    2|
|"i got a defect :...|null|    1|
|"Not Impressed:-\...|null|    1|
|              """Out|null|    2|
|In the words of L...|null|    1|
|"Should be called...|null|    1|
|": ","Doesn't wor...|null|    1|
|"did not work:","...|null|    1|
+--------------------+----+-----+

+-----+--------------------+-----+
|title|                text|label|
+-----+--------------------+-----+
| null|"What separates t...|    1|
| null|"Falkenbach retur...|    2|
| null|"I am a Shakespea...|    2|
| null|Goes at quite a s...|    2|
| null|This Japanese ban...|    1|
| null|this cd isnt band...|    1|
| null|If your intereste...|    2|
| null|"Whether the lyri...|    1|
| null|This show lacks m...|    1|
|

In [18]:
test_data.where(col("text").isNull()).show()
test_data.where(col("title").isNull()).show()
test_data.where(col("label").isNull()).show()

+--------------------+----+-----+
|               title|text|label|
+--------------------+----+-----+
|"My brain hurts f...|null|    2|
+--------------------+----+-----+

+-----+--------------------+-----+
|title|                text|label|
+-----+--------------------+-----+
| null|I can't give you ...|    1|
| null|"Some good rockin...|    1|
| null|The Moon Under th...|    2|
| null|Very interesting ...|    2|
+-----+--------------------+-----+

+-----+----+-----+
|title|text|label|
+-----+----+-----+
+-----+----+-----+
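Several of the titles in the null-`text` rows above contain doubled quotes (`""`), which is the usual CSV convention for a literal quote inside a quoted field. The following is a minimal sketch, using only Python's standard `csv` module and a made-up sample row, of how such a row parses when doubled quotes are treated as escapes. Whether this is what causes the nulls here is an assumption; if it is, passing `escape='"'` to `spark.read.csv` may be worth trying.

```python
import csv
import io

# Hypothetical sample row in the same label,title,text layout as the data files.
raw = '2,"Green in a ""Paradise""","Lovely, calm read."\n'

# csv.reader treats a doubled quote inside a quoted field as one literal quote,
# so the row still splits into exactly three fields.
row = next(csv.reader(io.StringIO(raw)))
label, title, text = row

print(row)
```

With this convention the comma inside the quoted `text` field is also kept as part of the field instead of starting a new column.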



### 2. Word Count and Text Length

In [19]:
data_train = train_data.withColumn('wordCount', size(split(col('text'), ' '))).withColumn('textLength', length(col('text')))
data_test = test_data.withColumn('wordCount', size(split(col('text'), ' '))).withColumn('textLength', length(col('text')))
data_train.show()
data_test.show()

+--------------------+--------------------+-----+---------+----------+
|               title|                text|label|wordCount|textLength|
+--------------------+--------------------+-----+---------+----------+
|Stuning even for ...|This sound track ...|    2|       75|       394|
|The best soundtra...|I'm reading a lot...|    2|       91|       470|
|            Amazing!|"This soundtrack ...|    2|       63|       370|
|Excellent Soundtrack|I truly like this...|    2|      116|       721|
|Remember, Pull Yo...|If you've played ...|    2|       77|       425|
|an absolute maste...|I am quite sure a...|    2|      139|       800|
|        Buyer beware|"This is a self-p...|    1|      116|       631|
|      Glorious story|I loved Whisper o...|    2|      103|       506|
|    A FIVE STAR BOOK|I just finished r...|    2|       99|       506|
|Whispers of the W...|This was a easy t...|    2|       58|       270|
|          The Worst!|A complete waste ...|    1|       33|       204|
|     
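As a rough illustration of what the two derived columns measure, here is a plain-Python sketch of the same idea: count the pieces produced by splitting on a single space, and count the characters. The helper name `word_count_and_length` is hypothetical, and the sketch assumes Spark's `split` keeps empty strings between repeated spaces in the same way Python's `str.split(" ")` does.

```python
def word_count_and_length(text):
    # Splitting on a single space keeps empty strings between repeated spaces,
    # so runs of spaces inflate the "word" count.
    words = text.split(" ")
    return len(words), len(text)


single_spaced = word_count_and_length("A complete waste of time")
double_spaced = word_count_and_length("two  spaces")

print(single_spaced)
print(double_spaced)
```

Because of the repeated-space effect, `wordCount` is best read as an approximation of the number of words rather than an exact count.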

In [20]:
data = data_train.union(data_test)

In [None]:
def plot_to_hist(data):
    data_positive = data.filter(col('label') == 2)
    data_negative = data.filter(col('label') == 1)
    fig, ax = plt.subplots(nrows=2, ncols=2)
    fig.set_size_inches(20, 20)

    # Beware, these histograms take approximately 15 minutes in total
    hist(ax[0,0], data_negative.select("wordCount").alias("wordCountNegativeClass"), bins = 50)
    ax[0,0].set_title('wordCountNegativeClass')
    hist(ax[0,1], data_positive.select("wordCount").alias("wordCountPositiveClass"), bins = 50)
    ax[0,1].set_title('wordCountPositiveClass')
    hist(ax[1,0], data_negative.select("textLength").alias("textLengthNegativeClass"), bins = 50)
    ax[1,0].set_title('textLengthNegativeClass')
    hist(ax[1,1], data_positive.select("textLength").alias("textLengthPositiveClass"), bins = 50)
    ax[1,1].set_title('textLengthPositiveClass')

plot_to_hist(data_train)

## Data Preprocessing

### 1. Removing Null Reviews

In [22]:
train_data = train_data.filter(train_data.text.isNotNull())
print("Total train data after removing null reviews:", train_data.count())
test_data = test_data.filter(test_data.text.isNotNull())
print("Total test data after removing null reviews:", test_data.count())

Total train data after removing null reviews: 3599987
Total test data after removing null reviews: 399999


### 2. Noise Removal

In [23]:
# Keep only ASCII letters and whitespace; a raw string avoids Python's invalid-escape warning for \s
train_data = train_data.select(regexp_replace(col("text"), r"[^a-zA-Z\s]", "").alias("noiseRemoval"), "label")
train_data.show(truncate=False)
test_data = test_data.select(regexp_replace(col("text"), r"[^a-zA-Z\s]", "").alias("noiseRemoval"), "label")
test_data.show(truncate=False)

+------------+-----+
|noiseRemoval|label|

+------------+-----+
|noiseRemoval|label|
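The effect of the noise-removal pattern can be checked on a single string with Python's `re` module. This is a small sketch on a made-up sentence, not the Spark code itself; the helper name `remove_noise` is hypothetical.

```python
import re


def remove_noise(text):
    # Drop every character that is not an ASCII letter or whitespace.
    return re.sub(r"[^a-zA-Z\s]", "", text)


cleaned = remove_noise("I'm reading 2 books!")
print(cleaned)
```

Note that digits and apostrophes are removed without inserting a space, so "I'm" becomes "Im" and a removed number can leave two adjacent spaces behind.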

### 3. Tokenization

In [24]:
tokenizer = Tokenizer(inputCol="noiseRemoval", outputCol="tokens")

### 4. Stopwords Removal

In [25]:
remover = StopWordsRemover(inputCol="tokens", outputCol="tokensWithoutStopwords")
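To make the two stages concrete, here is a simplified plain-Python stand-in for tokenization and stop-word removal. It assumes tokenization amounts to lowercasing and splitting on whitespace, and it uses a deliberately tiny, hypothetical stop-word list (`STOP_WORDS`); Spark's default English list is much longer.

```python
# Hypothetical, deliberately tiny stop-word list for illustration only.
STOP_WORDS = {"i", "a", "the", "this", "is", "of", "to", "and"}


def tokenize(text):
    # Lowercase the text and split it on runs of whitespace.
    return text.lower().split()


def remove_stop_words(tokens):
    # Keep only the tokens that are not in the stop-word list.
    return [t for t in tokens if t not in STOP_WORDS]


tokens = tokenize("This sound track is beautiful")
filtered = remove_stop_words(tokens)

print(tokens)
print(filtered)
```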

## Feature Extraction

In [26]:
count_vectorizer = CountVectorizer(inputCol="tokensWithoutStopwords", outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features")
label_to_index = StringIndexer(inputCol = "label", outputCol = "target")

pipeline = Pipeline(stages=[tokenizer, remover, count_vectorizer, idf, label_to_index])

fitting = pipeline.fit(train_data)
train_data = fitting.transform(train_data)
test_data = fitting.transform(test_data)
train_data.show()
test_data.show()

+--------------------+-----+--------------------+----------------------+--------------------+--------------------+------+
|        noiseRemoval|label|              tokens|tokensWithoutStopwords|         rawFeatures|            features|target|
+--------------------+-----+--------------------+----------------------+--------------------+--------------------+------+
|This sound track ...|    2|[this, sound, tra...|  [sound, track, be...|(262144,[13,14,31...|(262144,[13,14,31...|   0.0|
|Im reading a lot ...|    2|[im, reading, a, ...|  [im, reading, lot...|(262144,[2,15,20,...|(262144,[2,15,20,...|   0.0|
|This soundtrack i...|    2|[this, soundtrack...|  [soundtrack, favo...|(262144,[0,3,8,42...|(262144,[0,3,8,42...|   0.0|
|I truly like this...|    2|[i, truly, like, ...|  [truly, like, sou...|(262144,[0,2,3,8,...|(262144,[0,2,3,8,...|   0.0|
|If youve played t...|    2|[if, youve, playe...|  [youve, played, g...|(262144,[4,30,31,...|(262144,[4,30,31,...|   0.0|
|I am quite sure a...|  
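The `rawFeatures` column holds term counts and the `features` column holds those counts reweighted by inverse document frequency. The sketch below shows the same idea in plain Python on two made-up documents, using the smoothed formula `log((N + 1) / (df + 1))` that Spark documents for its `IDF` stage. The function name `tf_idf` is hypothetical, and the sketch uses dictionaries where Spark uses sparse vectors over a fixed-size vocabulary.

```python
import math
from collections import Counter


def tf_idf(docs):
    """Return one {term: tf-idf weight} dict per tokenized document."""
    n_docs = len(docs)
    # Document frequency: the number of documents that contain each term.
    doc_freq = Counter(term for doc in docs for term in set(doc))
    # Smoothed inverse document frequency: log((N + 1) / (df + 1)).
    idf = {term: math.log((n_docs + 1) / (df + 1)) for term, df in doc_freq.items()}
    # Weight each raw term count by the term's IDF.
    return [
        {term: count * idf[term] for term, count in Counter(doc).items()}
        for doc in docs
    ]


docs = [["good", "book"], ["bad", "book"]]
weights = tf_idf(docs)

print(weights[0])
```

A term that appears in every document, such as "book" here, gets a weight of zero, so it contributes nothing to the downstream classifier.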

### Naive Bayes Algorithm

In [28]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# create the trainer and set its parameters
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

# train the model
model = nb.fit(train_data.select(col("features"), col("target").alias("label")))

# predict on the test set and display example rows
predictions = model.transform(test_data.select(col("features"), col("target").alias("label")))
predictions.show()

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(262144,[1,2,3,4,...|  0.0|[-2164.4309793119...|[1.0,1.6957215180...|       0.0|
|(262144,[2,3,5,13...|  0.0|[-2699.5954853610...|[1.0,1.6919322396...|       0.0|
|(262144,[0,6,13,1...|  1.0|[-1189.6424315314...|[4.60928361890479...|       1.0|
|(262144,[0,22,91,...|  0.0|[-1194.0128720608...|[1.33065926102642...|       1.0|
|(262144,[10,13,27...|  0.0|[-2021.8494276535...|[0.99999995224607...|       0.0|
|(262144,[2,7,13,1...|  1.0|[-1386.2800130818...|[4.00757576297259...|       1.0|
|(262144,[2,3,9,14...|  1.0|[-1133.6048473531...|[4.81895883807624...|       1.0|
|(262144,[67,94,10...|  1.0|[-1071.3197696635...|[3.32563981196525...|       1.0|
|(262144,[995,2266...|  0.0|[-533.00659254808...|[0.99999999998883...|       0.0|
|(262144,[1,5,26
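For intuition about what a multinomial Naive Bayes model with `smoothing=1.0` computes, here is a minimal plain-Python sketch on a made-up four-document corpus. It works on raw token counts rather than the TF-IDF features used above, and the function names (`train_multinomial_nb`, `predict`) and toy data are hypothetical; it is not Spark's implementation.

```python
import math
from collections import Counter, defaultdict


def train_multinomial_nb(docs, labels, smoothing=1.0):
    """Estimate log priors and smoothed log likelihoods from tokenized documents."""
    vocab = {term for doc in docs for term in doc}
    class_counts = Counter(labels)
    term_counts = defaultdict(Counter)
    for tokens, label in zip(docs, labels):
        term_counts[label].update(tokens)

    log_prior = {y: math.log(n / len(labels)) for y, n in class_counts.items()}
    log_likelihood = {}
    for y in class_counts:
        # Additive (Laplace) smoothing: add `smoothing` to every term count.
        total = sum(term_counts[y].values()) + smoothing * len(vocab)
        log_likelihood[y] = {
            term: math.log((term_counts[y][term] + smoothing) / total)
            for term in vocab
        }
    return log_prior, log_likelihood


def predict(model, tokens):
    """Return the class with the highest log posterior; unseen terms are ignored."""
    log_prior, log_likelihood = model
    scores = {
        y: log_prior[y] + sum(log_likelihood[y].get(t, 0.0) for t in tokens)
        for y in log_prior
    }
    return max(scores, key=scores.get)


# Toy data: 0 stands for a positive review, 1 for a negative one.
docs = [["great", "book"], ["loved", "it"], ["awful", "waste"], ["bad", "book"]]
labels = [0, 0, 1, 1]
model = train_multinomial_nb(docs, labels)

print(predict(model, ["great", "read"]))
print(predict(model, ["awful", "book"]))
```

The large negative values in the `rawPrediction` column above are log scores of this kind, one per class, and the `probability` column is their normalized form.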

### Support Vector Machine Algorithm

In [None]:
from pyspark.ml.classification import LinearSVC

lsvc = LinearSVC(maxIter=10, regParam=0.1, featuresCol='features', labelCol='label')

lsvcModel = lsvc.fit(train_data.select(col("features"), col("target").alias("label")))

# predict on the test set and display example rows
predictions = lsvcModel.transform(test_data.select(col("features"), col("target").alias("label")))
predictions.show()

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")