# Pyspark Sentiment Analysis

## Text Blob

In [100]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark Sentiment Analysis example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [101]:
df = spark.read.format('com.databricks.spark.csv').\
                               options(header='true', \
                               inferschema='true').\
            load("df_first_10000.csv",header=True);

In [102]:
df1= df.select("review_body")

In [103]:
# Text Preprocessing
#remove non ASCII characters

In [104]:
from pyspark.sql.types import DoubleType, StringType, IntegerType
from pyspark.sql.functions import isnull, when, count, col
from pyspark.sql.functions import avg
from pyspark.sql.functions import col, countDistinct
import pandas as pd
import pyspark.sql as sparksql

In [105]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

from nltk.stem.wordnet import WordNetLemmatizer
from nltk.corpus import stopwords
from nltk import pos_tag
import string
import re

# remove non ASCII characters
def strip_non_ascii(data_str):
    ''' Returns the string without non ASCII characters'''
    stripped = (c for c in data_str if 0 < ord(c) < 127)
    return ''.join(stripped)
# setup pyspark udf function
strip_non_ascii_udf = udf(strip_non_ascii, StringType())

In [106]:
# fixed abbreviation
def fix_abbreviation(data_str):
    data_str = data_str.lower()
    data_str = re.sub(r'\bthats\b', 'that is', data_str)
    data_str = re.sub(r'\bive\b', 'i have', data_str)
    data_str = re.sub(r'\bim\b', 'i am', data_str)
    data_str = re.sub(r'\bya\b', 'yeah', data_str)
    data_str = re.sub(r'\bcant\b', 'can not', data_str)
    data_str = re.sub(r'\bdont\b', 'do not', data_str)
    data_str = re.sub(r'\bwont\b', 'will not', data_str)
    data_str = re.sub(r'\bid\b', 'i would', data_str)
    data_str = re.sub(r'wtf', 'what the fuck', data_str)
    data_str = re.sub(r'\bwth\b', 'what the hell', data_str)
    data_str = re.sub(r'\br\b', 'are', data_str)
    data_str = re.sub(r'\bu\b', 'you', data_str)
    data_str = re.sub(r'\bk\b', 'OK', data_str)
    data_str = re.sub(r'\bsux\b', 'sucks', data_str)
    data_str = re.sub(r'\bno+\b', 'no', data_str)
    data_str = re.sub(r'\bcoo+\b', 'cool', data_str)
    data_str = re.sub(r'rt\b', '', data_str)
    data_str = data_str.strip()
    return data_str

fix_abbreviation_udf = udf(fix_abbreviation, StringType())

In [107]:
#remove irrelevant features
def remove_features(data_str):
    # compile regex
    url_re = re.compile('https?://(www.)?\w+\.\w+(/\w+)*/?')
    punc_re = re.compile('[%s]' % re.escape(string.punctuation))
    num_re = re.compile('(\\d+)')
    mention_re = re.compile('@(\w+)')
    alpha_num_re = re.compile("^[a-z0-9_.]+$")
    # convert to lowercase
    data_str = data_str.lower()
    # remove hyperlinks
    data_str = url_re.sub(' ', data_str)
    # remove @mentions
    data_str = mention_re.sub(' ', data_str)
    # remove puncuation
    data_str = punc_re.sub(' ', data_str)
    # remove numeric 'words'
    data_str = num_re.sub(' ', data_str)
    # remove non a-z 0-9 characters and words shorter than 1 characters
    list_pos = 0
    cleaned_str = ''
    for word in data_str.split():
        if list_pos == 0:
            if alpha_num_re.match(word) and len(word) > 1:
                cleaned_str = word
            else:
                cleaned_str = ' '
        else:
            if alpha_num_re.match(word) and len(word) > 1:
                cleaned_str = cleaned_str + ' ' + word
            else:
                cleaned_str += ' '
        list_pos += 1
    # remove unwanted space, *.split() will automatically split on
    # whitespace and discard duplicates, the " ".join() joins the
    # resulting list into one string.
    return " ".join(cleaned_str.split())
# setup pyspark udf function
remove_features_udf = udf(remove_features, StringType())

In [108]:
df1 = df1.withColumn('text_non_asci',strip_non_ascii_udf(df1['review_body']))
df1.show(5,True)

+--------------------+--------------------+
|         review_body|       text_non_asci|
+--------------------+--------------------+
|What a great stov...|What a great stov...|
|        worked great|        worked great|
|Part exactly what...|Part exactly what...|
|Love my refrigera...|Love my refrigera...|
|No more running t...|No more running t...|
+--------------------+--------------------+
only showing top 5 rows



In [109]:
df1 = df1.withColumn('fixed_abbrev',fix_abbreviation_udf(df1['text_non_asci']))
df1.show(5,True)

+--------------------+--------------------+--------------------+
|         review_body|       text_non_asci|        fixed_abbrev|
+--------------------+--------------------+--------------------+
|What a great stov...|What a great stov...|what a great stov...|
|        worked great|        worked great|        worked great|
|Part exactly what...|Part exactly what...|pa exactly what i...|
|Love my refrigera...|Love my refrigera...|love my refrigera...|
|No more running t...|No more running t...|no more running t...|
+--------------------+--------------------+--------------------+
only showing top 5 rows



In [110]:
df1 = df1.withColumn('removed',remove_features_udf(df1['fixed_abbrev']))
df1.show(5,True)

+--------------------+--------------------+--------------------+--------------------+
|         review_body|       text_non_asci|        fixed_abbrev|             removed|
+--------------------+--------------------+--------------------+--------------------+
|What a great stov...|What a great stov...|what a great stov...|what great stove ...|
|        worked great|        worked great|        worked great|        worked great|
|Part exactly what...|Part exactly what...|pa exactly what i...|pa exactly what n...|
|Love my refrigera...|Love my refrigera...|love my refrigera...|love my refrigera...|
|No more running t...|No more running t...|no more running t...|no more running t...|
+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [111]:
!pip install textblob

[33mYou are using pip version 10.0.1, however version 20.0.2 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m


In [112]:
from pyspark.sql.types import FloatType

from textblob import TextBlob

def sentiment_analysis(text):
    return TextBlob(text).sentiment.polarity

sentiment_analysis_udf = udf(sentiment_analysis , FloatType())

In [113]:
df1  = df1.withColumn("sentiment_score", sentiment_analysis_udf( df1['removed'] ))
df1.show(10,True)

+--------------------+--------------------+--------------------+--------------------+---------------+
|         review_body|       text_non_asci|        fixed_abbrev|             removed|sentiment_score|
+--------------------+--------------------+--------------------+--------------------+---------------+
|What a great stov...|What a great stov...|what a great stov...|what great stove ...|     0.73333335|
|        worked great|        worked great|        worked great|        worked great|            0.8|
|Part exactly what...|Part exactly what...|pa exactly what i...|pa exactly what n...|           0.25|
|Love my refrigera...|Love my refrigera...|love my refrigera...|love my refrigera...|          -0.05|
|No more running t...|No more running t...|no more running t...|no more running t...|          0.375|
|It would not cool...|It would not cool...|it would not cool...|it would not cool...|         -0.175|
|Works awesome for...|Works awesome for...|works awesome for...|works awesome for.

In [114]:
def condition(r):
    if (r >=0.1):
        label = "positive"
    elif(r <= -0.1):
        label = "negative"
    else:
        label = "neutral"
    return label

sentiment_udf = udf(lambda x: condition(x), StringType())

In [115]:
df1  = df1.withColumn("sentiment_type", sentiment_udf( df1['sentiment_score'] ))

In [116]:
df1.show()

+--------------------+--------------------+--------------------+--------------------+---------------+--------------+
|         review_body|       text_non_asci|        fixed_abbrev|             removed|sentiment_score|sentiment_type|
+--------------------+--------------------+--------------------+--------------------+---------------+--------------+
|What a great stov...|What a great stov...|what a great stov...|what great stove ...|     0.73333335|      positive|
|        worked great|        worked great|        worked great|        worked great|            0.8|      positive|
|Part exactly what...|Part exactly what...|pa exactly what i...|pa exactly what n...|           0.25|      positive|
|Love my refrigera...|Love my refrigera...|love my refrigera...|love my refrigera...|          -0.05|       neutral|
|No more running t...|No more running t...|no more running t...|no more running t...|          0.375|      positive|
|It would not cool...|It would not cool...|it would not cool...|

In [117]:
print("Total data:", df1.count())

Total data: 10000


In [118]:
df1 = df1.drop('review_body','text_non_asci','fixed_abbrev','removed')

In [119]:
df1 = df1.withColumn('sentiment_score', df1['sentiment_score'].cast(DoubleType()))

In [120]:
type(df1)

pyspark.sql.dataframe.DataFrame

In [121]:
df1.show(500)

+--------------------+--------------+
|     sentiment_score|sentiment_type|
+--------------------+--------------+
|  0.7333333492279053|      positive|
|   0.800000011920929|      positive|
|                0.25|      positive|
|-0.05000000074505806|       neutral|
|               0.375|      positive|
|-0.17499999701976776|      negative|
|  0.4166666567325592|      positive|
|                0.25|      positive|
|                 0.0|       neutral|
| 0.11249999701976776|      positive|
|0.004999999888241291|       neutral|
|  0.6666666865348816|      positive|
|               0.375|      positive|
|  0.3166666626930237|      positive|
|  0.3333333432674408|      positive|
|                 0.0|       neutral|
|                0.25|      positive|
| 0.47129252552986145|      positive|
| 0.10000000149011612|       neutral|
|  0.4000000059604645|      positive|
|  0.3708333373069763|      positive|
| -0.4000000059604645|      negative|
|   0.800000011920929|      positive|
|  0.6000000

In [122]:
df1.count()

10000

In [125]:
model_data = df1.dropna()

In [126]:
model_data.show()

+--------------------+--------------+
|     sentiment_score|sentiment_type|
+--------------------+--------------+
|  0.7333333492279053|      positive|
|   0.800000011920929|      positive|
|                0.25|      positive|
|-0.05000000074505806|       neutral|
|               0.375|      positive|
|-0.17499999701976776|      negative|
|  0.4166666567325592|      positive|
|                0.25|      positive|
|                 0.0|       neutral|
| 0.11249999701976776|      positive|
|0.004999999888241291|       neutral|
|  0.6666666865348816|      positive|
|               0.375|      positive|
|  0.3166666626930237|      positive|
|  0.3333333432674408|      positive|
|                 0.0|       neutral|
|                0.25|      positive|
| 0.47129252552986145|      positive|
| 0.10000000149011612|       neutral|
|  0.4000000059604645|      positive|
+--------------------+--------------+
only showing top 20 rows

