## Sentiment Analysis and Text Classification

## Dataset: Amazon Fine Food Reviews

## Steps
### 1. Importing Libraries
### 2. Creating SparkContext Object
 
### 3. Preparing Data
#### a. Understanding Data and removing unwanted columns
#### b. Assigning Positive and Negative Sentiment to Reviews based on Score
#### c. Filtering neutral reviews
#### d. Assigning Binary Rating as Target Variable (1: Positive, 0: Negative)
 
### 4. Text Pre-processing
#### a. Create UDFs for text processing: convert to lower case, remove punctuation and non-alphanumeric words, remove stop words
#### b. POS tagging
#### c. Text Lemmatization

### 5. Preparing Data For Modelling
#### a. Creating Final Dataset with apt columns
#### b. Dividing it into Training and Test Set
#### c. Tokenizing the Training set and creating TF-IDF matrix using HashingTF


### 6. Modelling the Data and Evaluating the Model
#### a. Logistic Regression Model
#### b. Naive Bayes Model







#### Importing findspark to run PySpark in a Jupyter notebook

In [1]:
import findspark
findspark.init()

### 1. Importing Libraries

In [2]:
import pyspark

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark import HiveContext
from pyspark.sql.functions import *
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

import re
import string
import nltk
from nltk.stem.wordnet import WordNetLemmatizer
from nltk.corpus import stopwords
from nltk import pos_tag

In [8]:
#!pip install vaderSentiment
# Importing libraries and setting the environment path
import os
import sys
# set the path

%matplotlib inline
import pandas as pd
import numpy as np
import nltk
import string
import matplotlib
import matplotlib.pyplot as plt

from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.feature_extraction.text import CountVectorizer
# sklearn.cross_validation was removed in scikit-learn 0.20; model_selection replaces it
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
from sklearn import metrics
from sklearn.metrics import roc_curve, auc
from nltk.stem.porter import PorterStemmer
from nltk.corpus import stopwords
from nltk import word_tokenize

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark import SQLContext
from pyspark import HiveContext

from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.classification import LogisticRegression, NaiveBayes, GBTClassifier
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel, LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from vaderSentiment import vaderSentiment
from pyspark.ml.feature import NGram
matplotlib.style.use('ggplot')

### 2. Creating SparkContext Object

In [4]:
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
sqlContext=HiveContext(sc)

### 3. Preparing Data

#### a. Understanding Data and removing unwanted columns

In [5]:
df1 =sqlContext.read.format('com.databricks.spark.csv')\
.options(header='true',inferschema='true')\
.load(r"C:\Users\singh\OneDrive\Documents\Spring-19\Big Data\Project\Food Review\Food_Sentiment.csv")

In [9]:
df1.count()

568454

In [10]:
df=df1

#### b. Assigning Positive and Negative Sentiment to Reviews based on Score

In [11]:
def condition(r):
    if (r <3):
        label="negative"
    elif(r>3):
        label="positive"
    else:
        label="neutral"
    return label
sentiment_udf = udf(lambda x: condition(x), StringType())

df = df.withColumn('Sentiment',sentiment_udf(df['Score']))
df.show(4,True)

+-----+--------------------+---------+
|Score|                Text|Sentiment|
+-----+--------------------+---------+
|    5|I have bought sev...| positive|
|    1|"Product arrived ...| negative|
|    4|"This is a confec...| positive|
|    2|If you are lookin...| negative|
+-----+--------------------+---------+
only showing top 4 rows



#### c. Filtering neutral reviews

In [12]:
import pyspark.sql.functions as f

df=df.filter((f.col('Score')!=3))
df.count()

525814

#### d. Assigning Binary Rating as Target Variable (1: Positive, 0: Negative)

In [13]:
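def toBinary(score):
    # Neutral reviews (Score == 3) were filtered out above, so this is equivalent to score > 3
    if score >= 3: return 1
    else: return 0
# Declared as StringType, so Target is stored as a string; it is cast to double before modelling
udfScoretoBinary=udf(toBinary, StringType())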

df = df.withColumn("Target", udfScoretoBinary("Score"))
df.show(4)


+-----+--------------------+---------+------+
|Score|                Text|Sentiment|Target|
+-----+--------------------+---------+------+
|    5|I have bought sev...| positive|     1|
|    1|"Product arrived ...| negative|     0|
|    4|"This is a confec...| positive|     1|
|    2|If you are lookin...| negative|     0|
+-----+--------------------+---------+------+
only showing top 4 rows
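The three labelling steps above (sentiment from score, dropping neutral reviews, binary target) can be sketched in plain Python without Spark. This is only an illustration of the rule: the sample reviews and the helper name `to_sentiment` are made up for the example and are not part of the notebook.

```python
def to_sentiment(score):
    """Map a 1-5 star score to a sentiment label, mirroring condition() above."""
    if score < 3:
        return "negative"
    if score > 3:
        return "positive"
    return "neutral"

# Hypothetical sample rows: (Score, Text)
reviews = [
    (5, "great taffy"),
    (1, "arrived stale"),
    (3, "it was okay"),
    (4, "tasty treat"),
    (2, "too sweet"),
]

# Step b: attach a sentiment label to every review
labelled = [(score, text, to_sentiment(score)) for score, text in reviews]

# Step c: drop neutral reviews (Score == 3)
kept = [row for row in labelled if row[2] != "neutral"]

# Step d: binary target, 1 for positive and 0 for negative
with_target = [
    (score, text, sentiment, 1 if sentiment == "positive" else 0)
    for score, text, sentiment in kept
]

for row in with_target:
    print(row)
```

Because the neutral rows are removed before the target is assigned, a threshold of `>= 3` and a threshold of `> 3` give the same target values.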



### 4. Text Pre-processing

#### a. Create UDFs for text processing: convert to lower case, remove punctuation and non-alphanumeric words, remove stop words
#### b. POS tagging
#### c. Text Lemmatization

In [14]:
#TEXT Pre-processing

## Convert to lower case
from pyspark.sql.functions import udf
from pyspark.sql.types import *

def lower(text):
    return text.lower()

lower_udf =udf(lower,StringType())


## Remove non-ASCII characters
def strip_non_ascii(data_str):
    """Return the string without non-ASCII characters."""
    stripped = (c for c in data_str if 0 < ord(c) < 127)
    return ''.join(stripped)
# set up the PySpark UDF
strip_non_ascii_udf = udf(strip_non_ascii, StringType())

## Fix abbreviations
def fix_abbreviation(data_str):
    data_str = data_str.lower()
    data_str = re.sub(r'\bthats\b', 'that is', data_str)
    data_str = re.sub(r'\bive\b', 'i have', data_str)
    data_str = re.sub(r'\bim\b', 'i am', data_str)
    data_str = re.sub(r'\bya\b', 'yeah', data_str)
    data_str = re.sub(r'\bcant\b', 'can not', data_str)
    data_str = re.sub(r'\bdont\b', 'do not', data_str)
    data_str = re.sub(r'\bwont\b', 'will not', data_str)
    data_str = re.sub(r'\bid\b', 'i would', data_str)
    data_str = re.sub(r'\bwtf\b', 'what the fuck', data_str)
    data_str = re.sub(r'\bwth\b', 'what the hell', data_str)
    data_str = re.sub(r'\br\b', 'are', data_str)
    data_str = re.sub(r'\bu\b', 'you', data_str)
    data_str = re.sub(r'\bk\b', 'OK', data_str)
    data_str = re.sub(r'\bsux\b', 'sucks', data_str)
    data_str = re.sub(r'\bno+\b', 'no', data_str)
    data_str = re.sub(r'\bcoo+\b', 'cool', data_str)
    # match "rt" only as a whole word; without the leading \b it also strips
    # the ending of words such as "dessert" or "yogurt"
    data_str = re.sub(r'\brt\b', '', data_str)
    data_str = data_str.strip()
    return data_str


## Remove URLs, mentions, punctuation, digits and non-alphanumeric words
def remove_features(data_str):
# compile regexes (raw strings, so \w and \d are not treated as string escapes)
    url_re = re.compile(r'https?://(www\.)?\w+\.\w+(/\w+)*/?')
    punc_re = re.compile('[%s]' % re.escape(string.punctuation))
    num_re = re.compile(r'(\d+)')
    mention_re = re.compile(r'@(\w+)')
    alpha_num_re = re.compile("^[a-z0-9_.]+$")
# convert to lowercase
    data_str = data_str.lower()
# remove hyperlinks
    data_str = url_re.sub(' ', data_str)
# remove @mentions
    data_str = mention_re.sub(' ', data_str)
# remove punctuation
    data_str = punc_re.sub(' ', data_str)
# remove numeric 'words'
    data_str = num_re.sub(' ', data_str)
# remove non a-z 0-9 characters and words shorter than 1 characters
    list_pos = 0
    cleaned_str = ''
    for word in data_str.split():
        if list_pos == 0:
            if alpha_num_re.match(word):
                cleaned_str = word
            else:
                cleaned_str = ' '
        else:
            if alpha_num_re.match(word):
                cleaned_str = cleaned_str + ' ' + word
            else:
                cleaned_str += ' '
        list_pos += 1
# remove unwanted space, *.split() will automatically split on
# whitespace and discard duplicates, the " ".join() joins the
# resulting list into one string.
    return " ".join(cleaned_str.split())
# the PySpark UDFs for these functions are created in the next cell



##Remove stop words
def remove_stops(data_str):
# expects a string
    stops = set(stopwords.words("english"))
    list_pos = 0
    cleaned_str = ''
    text = data_str.split()
    for word in text:
        if word not in stops:
# rebuild cleaned_str
            if list_pos == 0:
                cleaned_str = word
            else:
                cleaned_str = cleaned_str + ' ' + word
            list_pos += 1
    return cleaned_str


# Part-of-Speech Tagging
def tag_and_remove(data_str):
    cleaned_str = ' '
# noun tags
    nn_tags = ['NN', 'NNP', 'NNPS', 'NNS']
# adjectives
    jj_tags = ['JJ', 'JJR', 'JJS']
# verbs
    vb_tags = ['VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ']
    nltk_tags = nn_tags + jj_tags + vb_tags
# break string into 'words'
    text = data_str.split()
# tag the text and keep only those with the right tags
    tagged_text = pos_tag(text)
    for tagged_word in tagged_text:
        if tagged_word[1] in nltk_tags:
            cleaned_str += tagged_word[0] + ' '
    return cleaned_str


##Lemmatization
def lemmatize(data_str):
# expects a string
    list_pos = 0
    cleaned_str = ''
    lmtzr = WordNetLemmatizer()
    text = data_str.split()
    tagged_words = pos_tag(text)
    for word in tagged_words:
        if 'v' in word[1].lower():
            lemma = lmtzr.lemmatize(word[0], pos='v')
        else:
            lemma = lmtzr.lemmatize(word[0], pos='n')
        if list_pos == 0:
            cleaned_str = lemma
        else:
            cleaned_str = cleaned_str + ' ' + lemma
        list_pos += 1
    return cleaned_str



In [15]:
lower_udf =udf(lower,StringType())
strip_non_ascii_udf = udf(strip_non_ascii, StringType())
fix_abbreviation_udf = udf(fix_abbreviation, StringType())
remove_features_udf = udf(remove_features, StringType())
remove_stops_udf = udf(remove_stops, StringType())
tag_and_remove_udf = udf(tag_and_remove, StringType())
lemmatize_udf = udf(lemmatize, StringType())

In [16]:
df = df.withColumn("lower_text",lower_udf(df["Text"]))
df = df.withColumn("text_non_asci",strip_non_ascii_udf(df["lower_text"]))
df = df.withColumn("fixed_abbrev",fix_abbreviation_udf(df["text_non_asci"]))
df = df.withColumn('removed_features',remove_features_udf(df['fixed_abbrev']))
#df.show(5,True)

In [17]:
#df = df.withColumn('lemmatize_udf',remove_features_udf(df['tag_and_remove_udf']))


In [18]:
df_no_stop_words = df.withColumn("removed_stops", remove_stops_udf("removed_features")).select('Text','removed_stops','Target')
df_no_stop_words.show(5)


+--------------------+--------------------+------+
|                Text|       removed_stops|Target|
+--------------------+--------------------+------+
|I have bought sev...|bought several vi...|     1|
|"Product arrived ...|product arrived l...|     0|
|"This is a confec...|confection around...|     1|
|If you are lookin...|looking secret in...|     0|
|Great taffy at a ...|great taffy great...|     1|
+--------------------+--------------------+------+
only showing top 5 rows
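To see what the cleaning steps do to a single review, here is a minimal sketch that chains lower-casing, non-ASCII stripping, punctuation and digit removal, and stop-word removal on one string. It uses only the standard library: `SAMPLE_STOPS` is a tiny, hypothetical stop-word set standing in for NLTK's English list, and `clean` is an illustrative helper, not a function from the notebook.

```python
import re
import string

# Hypothetical stop-word set for illustration; the notebook uses
# nltk.corpus.stopwords.words("english") instead.
SAMPLE_STOPS = {"i", "have", "of", "the", "and", "them", "all", "to", "be", "a"}

PUNC_RE = re.compile("[%s]" % re.escape(string.punctuation))
NUM_RE = re.compile(r"\d+")

def clean(text, stops=SAMPLE_STOPS):
    """Apply the same kind of cleaning steps as the UDF chain, in order."""
    text = text.lower()                                   # lower case
    text = "".join(c for c in text if 0 < ord(c) < 127)   # drop non-ASCII
    text = PUNC_RE.sub(" ", text)                         # drop punctuation
    text = NUM_RE.sub(" ", text)                          # drop digits
    words = [w for w in text.split() if w not in stops]   # drop stop words
    return " ".join(words)

sample = "I have bought 3 of the Vitality canned dog food products!"
cleaned = clean(sample)
print(cleaned)  # bought vitality canned dog food products
```

Running the steps on a plain string like this is a quick way to check a regex change before applying the UDFs to the full DataFrame.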



In [19]:
df_pos_tagging=df_no_stop_words.withColumn("tag_and_remove_pos", tag_and_remove_udf("removed_stops")).select('Text','tag_and_remove_pos','Target')

In [20]:

#Tokenizing the document
tokenizer = Tokenizer(inputCol="tag_and_remove_pos", outputCol="words")
wordsDataFrame = tokenizer.transform(df_pos_tagging)
for words_label in wordsDataFrame.select("words", "Target").take(3):
    print(words_label)

df_text = df.withColumn("text_lower",lower_udf(df["Text"])).select('text_lower','Target')

Row(words=['', 'bought', 'several', 'vitality', 'canned', 'dog', 'food', 'products', 'found', 'good', 'quality', 'product', 'looks', 'stew', 'processed', 'meat', 'smells', 'labrador', 'finicky', 'appreciates', 'product'], Target='1')
Row(words=['', 'product', 'arrived', 'labeled', 'jumbo', 'salted', 'peanuts', 'peanuts', 'small', 'sized', 'unsalted', 'sure', 'error', 'vendor', 'intended', 'represent', 'product', 'jumbo'], Target='0')
Row(words=['', 'confection', 'centuries', 'light', 'pillowy', 'citrus', 'gelatin', 'nuts', 'case', 'filberts', 'cut', 'tiny', 'squares', 'coated', 'powdered', 'sugar', 'tiny', 'mouthful', 'heaven', 'chewy', 'flavorful', 'recommend', 'yummy', 'treat', 'familiar', 'story', 'c', 'lion'], Target='1')


In [21]:
remover = StopWordsRemover(inputCol="words", outputCol="words_filtered")
wordsDataFrame1 = remover.transform(wordsDataFrame).select("Target","words_filtered")
wordsDataFrame1.show(2)

+------+--------------------+
|Target|      words_filtered|
+------+--------------------+
|     1|[, bought, severa...|
|     0|[, product, arriv...|
+------+--------------------+
only showing top 2 rows



In [22]:
df_text_lemma=df_pos_tagging.withColumn("lemmatized_text",lemmatize_udf("tag_and_remove_pos")).select('Text','lemmatized_text','Target')
df_text_lemma.show(5)

+--------------------+--------------------+------+
|                Text|     lemmatized_text|Target|
+--------------------+--------------------+------+
|I have bought sev...|buy several vital...|     1|
|"Product arrived ...|product arrive la...|     0|
|"This is a confec...|confection centur...|     1|
|If you are lookin...|look secret ingre...|     0|
|Great taffy at a ...|great taffy great...|     1|
+--------------------+--------------------+------+
only showing top 5 rows



In [23]:

from pyspark.sql.functions import monotonically_increasing_id
# Create Unique ID
df_text_lemma = df_text_lemma.withColumn("uid", monotonically_increasing_id())
df_text_lemma.show(4)


+--------------------+--------------------+------+---+
|                Text|     lemmatized_text|Target|uid|
+--------------------+--------------------+------+---+
|I have bought sev...|buy several vital...|     1|  0|
|"Product arrived ...|product arrive la...|     0|  1|
|"This is a confec...|confection centur...|     1|  2|
|If you are lookin...|look secret ingre...|     0|  3|
+--------------------+--------------------+------+---+
only showing top 4 rows



### 5. Preparing Data For Modelling

#### a. Creating Final Dataset with apt columns

In [24]:
data = df_text_lemma.select('uid', 'lemmatized_text','Target')
#data=wordsDataFrame2
data.show(4)

+---+--------------------+------+
|uid|     lemmatized_text|Target|
+---+--------------------+------+
|  0|buy several vital...|     1|
|  1|product arrive la...|     0|
|  2|confection centur...|     1|
|  3|look secret ingre...|     0|
+---+--------------------+------+
only showing top 4 rows



In [25]:
#data_pd=data.toPandas()

In [27]:
#data_pd.to_csv("Sentiment_csv.csv")

In [25]:
data

DataFrame[uid: bigint, lemmatized_text: string, Target: string]

#### b. Dividing it into Training and Test Set

In [26]:
(trainingData, testData) = data.randomSplit([0.7, 0.3])
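`randomSplit` is called without a seed here, so each run produces a different 70/30 split and slightly different scores; passing `seed=` makes the split repeatable. The idea behind a weighted random split can be sketched in plain Python as below. This is a simplified illustration, not Spark's implementation: `random_split` is a hypothetical helper that draws one random number per row, which is also why the split sizes are only approximately 70/30.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split with probability proportional to its weight."""
    rng = random.Random(seed)
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. [0.7, 1.0] for weights [0.7, 0.3]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, upper in enumerate(bounds):
            # The last split catches any floating-point rounding at the top end
            if draw < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

rows = list(range(1000))
train_a, test_a = random_split(rows, [0.7, 0.3], seed=42)
train_b, test_b = random_split(rows, [0.7, 0.3], seed=42)

print(len(train_a), len(test_a))   # roughly 700 and 300
print(train_a == train_b)          # same seed, same split
```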

In [27]:
# Caching the training DataFrame
trainingData.cache()
#Renaming features for modeling
training = trainingData.selectExpr("lemmatized_text as text", "Target as label")
training = training.withColumn("label", training["label"].cast(DoubleType()))

In [28]:
# Caching the test DataFrame
testData.cache()
#Renaming features for modeling
test = testData.selectExpr("lemmatized_text as text", "Target as label")
test = test.withColumn("label", test["label"].cast(DoubleType()))

In [62]:
#training.take(2)

#### c. Tokenizing the Training set and creating TF-IDF matrix using HashingTF


In [37]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="hashing")
idf = IDF(inputCol=hashingTF.getOutputCol(), outputCol="features")
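The three stages above turn each review into a TF-IDF vector: `HashingTF` maps every token to a bucket index by hashing and counts occurrences per bucket, and `IDF` rescales each bucket by log((m + 1) / (df + 1)), where m is the number of documents and df the number of documents containing that bucket. The sketch below reproduces that arithmetic in plain Python under stated assumptions: `zlib.crc32` is a stand-in hash (Spark uses MurmurHash3, so bucket indices will differ), `NUM_FEATURES` is deliberately tiny (the `HashingTF` default is 2**18), and the three documents are made-up examples.

```python
import math
import zlib
from collections import Counter

NUM_FEATURES = 16  # tiny on purpose; HashingTF defaults to 2**18 buckets

def bucket(term, num_features=NUM_FEATURES):
    """Map a term to a bucket index. crc32 is a stand-in for Spark's MurmurHash3."""
    return zlib.crc32(term.encode("utf-8")) % num_features

def hashing_tf(tokens, num_features=NUM_FEATURES):
    """Sparse term-frequency vector: bucket index -> count."""
    return Counter(bucket(t, num_features) for t in tokens)

def idf_weights(tf_vectors):
    """IDF per bucket using log((m + 1) / (df + 1))."""
    m = len(tf_vectors)
    doc_freq = Counter()
    for vec in tf_vectors:
        doc_freq.update(vec.keys())  # each bucket counted once per document
    return {b: math.log((m + 1) / (df + 1)) for b, df in doc_freq.items()}

# Hypothetical lemmatized reviews
docs = [
    "buy several vitality dog food product",
    "product arrive label jumbo salt peanut",
    "great taffy great price product",
]

tf_vectors = [hashing_tf(d.split()) for d in docs]
idf_by_bucket = idf_weights(tf_vectors)
tfidf = [{b: c * idf_by_bucket[b] for b, c in vec.items()} for vec in tf_vectors]

# "product" occurs in every document, so its bucket gets weight log(4/4) = 0
print(idf_by_bucket[bucket("product")])
```

With so few buckets, different words can collide in the same bucket; a larger number of features makes collisions rarer at the cost of wider vectors.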


### 6. Modelling and Evaluating

#### a. Logistic Regression Model

In [None]:
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, idf, lr])
# Training the model
model = pipeline.fit(training)

In [30]:
# Predicting output
prediction = model.transform(test)

In [33]:
prediction.select("label", "prediction").show(10,False)


+-----+----------+
|label|prediction|
+-----+----------+
|1.0  |1.0       |
|1.0  |1.0       |
|1.0  |1.0       |
|1.0  |1.0       |
|1.0  |1.0       |
|1.0  |1.0       |
|1.0  |1.0       |
|1.0  |1.0       |
|1.0  |1.0       |
|1.0  |1.0       |
+-----+----------+
only showing top 10 rows



In [35]:
prediction.printSchema()

root
 |-- text: string (nullable = true)
 |-- label: double (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- hashing: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [40]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# metricName defaults to "f1", so the value below is a weighted F1 score, not accuracy
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(prediction)


0.914613708621185
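`MulticlassClassificationEvaluator` uses `metricName="f1"` unless told otherwise, so the score above is a support-weighted F1 rather than accuracy. The following plain-Python sketch shows how the two metrics are computed and that they can differ on imbalanced data. The label and prediction lists are invented for the example, and `weighted_f1` and `accuracy` are illustrative helpers, not Spark functions.

```python
from collections import Counter

def weighted_f1(labels, predictions):
    """Per-class F1, averaged with weights equal to each class's share of the labels."""
    classes = set(labels) | set(predictions)
    support = Counter(labels)
    total = len(labels)
    pairs = list(zip(labels, predictions))
    score = 0.0
    for c in classes:
        tp = sum(1 for y, p in pairs if y == c and p == c)
        fp = sum(1 for y, p in pairs if y != c and p == c)
        fn = sum(1 for y, p in pairs if y == c and p != c)
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        score += f1 * support[c] / total
    return score

def accuracy(labels, predictions):
    """Fraction of predictions that match the label."""
    return sum(1 for y, p in zip(labels, predictions) if y == p) / len(labels)

# Hypothetical, imbalanced example: four positive and two negative reviews
labels      = [1, 1, 1, 1, 0, 0]
predictions = [1, 1, 1, 1, 1, 0]

print(round(weighted_f1(labels, predictions), 4))  # 0.8148
print(round(accuracy(labels, predictions), 4))     # 0.8333
```

To report accuracy from the evaluator instead, `metricName="accuracy"` can be passed when constructing it.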

#### b. Naive Bayes Model

In [41]:
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
pipeline_nb = Pipeline(stages=[tokenizer, hashingTF, idf, nb])
# Training the model
model_nb = pipeline_nb.fit(training)
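For intuition about what `smoothing=1.0` and `modelType="multinomial"` mean, here is a small multinomial Naive Bayes written in plain Python. It is a simplified sketch, not Spark's implementation: it works on raw word counts rather than the TF-IDF features used in the pipeline, it applies smoothing only to the word likelihoods and not to the class priors, and the training sentences and the helper names `train_nb` and `predict_nb` are made up for the example.

```python
import math
from collections import Counter, defaultdict

def train_nb(docs, labels, smoothing=1.0):
    """Fit class priors and smoothed per-class word likelihoods (in log space)."""
    vocab = sorted({w for tokens in docs for w in tokens})
    class_counts = Counter(labels)
    word_counts = defaultdict(Counter)
    for tokens, y in zip(docs, labels):
        word_counts[y].update(tokens)
    log_prior = {y: math.log(n / len(labels)) for y, n in class_counts.items()}
    log_likelihood = {}
    for y in class_counts:
        # Additive (Laplace) smoothing: every vocabulary word gets `smoothing` extra counts
        denom = sum(word_counts[y].values()) + smoothing * len(vocab)
        log_likelihood[y] = {
            w: math.log((word_counts[y][w] + smoothing) / denom) for w in vocab
        }
    return log_prior, log_likelihood

def predict_nb(model, tokens):
    """Return the class with the highest log posterior; unseen words are ignored."""
    log_prior, log_likelihood = model
    scores = {}
    for y in log_prior:
        scores[y] = log_prior[y] + sum(
            log_likelihood[y][w] for w in tokens if w in log_likelihood[y]
        )
    return max(scores, key=scores.get)

# Hypothetical lemmatized reviews with binary targets
train_docs = [
    "great taffy great price".split(),
    "love tasty treat".split(),
    "stale bad taste".split(),
    "bad product arrive broken".split(),
]
train_labels = [1, 1, 0, 0]

nb_model = train_nb(train_docs, train_labels, smoothing=1.0)
print(predict_nb(nb_model, "great treat".split()))  # 1
print(predict_nb(nb_model, "bad stale".split()))    # 0
```

Without smoothing, a word that never appears in one class would have probability zero there and would veto that class on its own; the added counts keep every likelihood strictly positive.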

In [42]:
# Predicting output
prediction_nb = model_nb.transform(test)

In [44]:
prediction_nb.select("label", "prediction").show(10,False)

+-----+----------+
|label|prediction|
+-----+----------+
|1.0  |1.0       |
|1.0  |1.0       |
|1.0  |0.0       |
|1.0  |0.0       |
|1.0  |1.0       |
|1.0  |1.0       |
|1.0  |0.0       |
|1.0  |1.0       |
|1.0  |1.0       |
|1.0  |1.0       |
+-----+----------+
only showing top 10 rows



In [43]:
prediction_nb.printSchema()

root
 |-- text: string (nullable = true)
 |-- label: double (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- hashing: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [46]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# metricName defaults to "f1", so the value below is a weighted F1 score, not accuracy
evaluator_nb = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator_nb.evaluate(prediction_nb)

0.8780527561465762