In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-eu.apache.org/dist/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz 
!tar xvf spark-2.4.3-bin-hadoop2.7.tgz

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.3-bin-hadoop2.7"

# Start a SparkSession
This will start a local Spark session.

In [0]:
!pip install -q findspark

In [0]:
import findspark
findspark.init()

In [0]:
from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark import HiveContext,SQLContext

In [0]:
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]").set("spark.executor.memory", "10g"))

In [0]:
sqlContext = SQLContext(sc)

### Importing Required Packages

In [0]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import time
import os
import sys

### Creating Spark and Hive Context

In [0]:
from pyspark import SparkContext
from pyspark import HiveContext

sqlContext = HiveContext(sc)

### Reading Dataset

In [0]:
review_df =sqlContext.read.format('com.databricks.spark.csv')\
.options(header='true',inferschema='true')\
.load("reviews_data_cleaned.csv")

review_df = review_df.drop("date","listing_id","reviewer_id","reviewer_name","id","_c0")

In [15]:
review_df.show(4)

+--------------------+
|            comments|
+--------------------+
|         Great host |
|Nice room for the...|
|Very nice apt.  N...|
|Great place to st...|
+--------------------+
only showing top 4 rows



### Splitting Data into Train and Test Sets

In [0]:
splits = review_df.randomSplit([0.5, 0.5])
reviews_df = splits[0]
reviews_df_test = splits[1]

In [15]:
reviews_df.printSchema()

root
 |-- comments: string (nullable = true)



### Cleaning Data

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import regexp_replace, col

# Removing comments with only . (dots)
reviews_df = reviews_df.withColumn('comments',regexp_replace(reviews_df['comments'],'[\.,)]',' '))

# Dropping NA from Train and Test
reviews_df = reviews_df.dropna()
reviews_df_test = reviews_df_test.dropna()


# Function to Drop NAN and NULLS and Clean String
from pyspark.sql.functions import col, isnan, when, trim
def to_null(c):
    return when(~(col(c).isNull() | col(c).isin([" "]) |col(c).isin(["nan"]) | (trim(col(c)) == "")), col(c))


# Applying to_null Function
reviews_df=reviews_df.select([to_null(c).alias(c) for c in reviews_df.columns]).na.drop()
reviews_df_test = reviews_df_test.select([to_null(c).alias(c) for c in reviews_df_test.columns]).na.drop()

In [19]:
reviews_df.count()

552936

### Generating Text Score

In [0]:
from pyspark.sql.types import *
from textblob import TextBlob
from pyspark.sql.functions import udf

# Function to get sentiment score
def sentiment_analyzer(text):
    return TextBlob(text).sentiment.polarity

sentiment_analyzer_udf = udf(sentiment_analyzer, FloatType())


# Generating Score
df = reviews_df.withColumn("sentiment_score", sentiment_analyzer_udf(reviews_df["comments"]))
df_test = reviews_df_test.withColumn("sentiment_score", sentiment_analyzer_udf(reviews_df_test["comments"]))

In [21]:
df.show(10,True)

+------------------------------+---------------+
|                      comments|sentiment_score|
+------------------------------+---------------+
|          	Evelyn Badia's l...|      0.4577972|
|                   First of...|      0.2171875|
|                  My girlfr...|     0.14798659|
|                 I am a gra...|      0.3895009|
|       像这样和房东住在一套...|            0.0|
|                Santiago an...|      0.4224359|
|                       so cute|            0.5|
|               Allison is a...|          0.275|
|               Cynthia was ...|     0.24724358|
|               First off le...|     0.24210373|
+------------------------------+---------------+
only showing top 10 rows



In [0]:
# Categorizing Sentences

def sentiment(r): 
    if (r >= 0.1):
        label = 1
    elif(r <= 0):
        label = 0
    else:
        label = r
    return label

sentiments_udf = udf(sentiment, IntegerType())

### Handling String Data:

- Removing Non_Ascii
- Checking Blanks
- Fixing Abbreviations
- Removing Stop Words
- Lemmatizing Text

In [20]:
!pip install langid

Collecting langid
[?25l  Downloading https://files.pythonhosted.org/packages/ea/4c/0fb7d900d3b0b9c8703be316fbddffecdab23c64e1b46c7a83561d78bd43/langid-1.1.6.tar.gz (1.9MB)
[K     |████████████████████████████████| 1.9MB 3.4MB/s 
Building wheels for collected packages: langid
  Building wheel for langid (setup.py) ... [?25l[?25hdone
  Stored in directory: /root/.cache/pip/wheels/29/bc/61/50a93be85d1afe9436c3dc61f38da8ad7b637a38af4824e86e
Successfully built langid
Installing collected packages: langid
Successfully installed langid-1.1.6


In [0]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType, DoubleType, DateType
from nltk.stem.wordnet import WordNetLemmatizer 
from nltk.corpus import stopwords
from nltk import pos_tag
import langid
import string
import re

In [0]:
# Crating UDF To Strip Non_Ascii

def strip_non_ascii(data_str):
    ''' Returns the string without non ASCII characters''' 
    stripped = (c for c in data_str if 0 < ord(c) < 127) 
    return ''.join(stripped)

strip_non_ascii_udf = udf(strip_non_ascii, StringType())

df = df.withColumn('text_non_asci',strip_non_ascii_udf(df['comments']))
df_test = df_test.withColumn('text_non_asci',strip_non_ascii_udf(df_test['comments']))

In [23]:
df.show(5)

+--------------------+---------------+--------------------+
|            comments|sentiment_score|       text_non_asci|
+--------------------+---------------+--------------------+
|	Evelyn Badia's l...|      0.4577972|	Evelyn Badia's l...|
|        My girlfr...|     0.14798659|        My girlfr...|
|       I am a gra...|      0.3895009|       I am a gra...|
|       Robert  is...|     0.38421488|       Robert  is...|
|             so cute|            0.5|             so cute|
+--------------------+---------------+--------------------+
only showing top 5 rows



In [0]:
# Crating UDF To Check Blanks

def check_blanks(data_str):
    is_blank = str(data_str.isspace()) 
    return is_blank

In [25]:
# Crating UDF To Fix Abbrs

def fix_abbreviation(data_str):
    data_str = data_str.lower()
    data_str = re.sub(r'\bthats\b', 'that is', data_str) 
    data_str = re.sub(r'\bive\b', 'i have', data_str) 
    data_str = re.sub(r'\bim\b', 'i am', data_str)
    data_str = re.sub(r'\bya\b', 'yeah', data_str)
    data_str = re.sub(r'\bcant\b', 'can not', data_str) 
    data_str = re.sub(r'\bdont\b', 'do not', data_str) 
    data_str = re.sub(r'\bwont\b', 'will not', data_str) 
    data_str = re.sub(r'\bid\b', 'i would', data_str) 
    data_str = re.sub(r'wtf', 'what the fuck', data_str) 
    data_str = re.sub(r'\bwth\b', 'what the hell', data_str) 
    data_str = re.sub(r'\br\b', 'are', data_str)
    data_str = re.sub(r'\bu\b', 'you', data_str)
    data_str = re.sub(r'\bk\b', 'OK', data_str)
    data_str = re.sub(r'\bsux\b', 'sucks', data_str) 
    data_str = re.sub(r'\bno+\b', 'no', data_str)
    data_str = re.sub(r'\bcoo+\b', 'cool', data_str) 
    data_str = re.sub(r'rt\b', '', data_str)
    data_str = data_str.strip()
    return data_str


fix_abbr_udf = udf(fix_abbreviation, StringType()) 

df = df.withColumn('fix_abbr',fix_abbr_udf(df['text_non_asci']))
df_test = df_test.withColumn('fix_abbr',fix_abbr_udf(df_test['text_non_asci']))
df.show(5)

+--------------------+---------------+--------------------+--------------------+
|            comments|sentiment_score|       text_non_asci|            fix_abbr|
+--------------------+---------------+--------------------+--------------------+
|	Evelyn Badia's l...|      0.4577972|	Evelyn Badia's l...|evelyn badia's li...|
|        My girlfr...|     0.14798659|        My girlfr...|my girlfriend and...|
|       I am a gra...|      0.3895009|       I am a gra...|i am a graduate s...|
|       Robert  is...|     0.38421488|       Robert  is...|robe  is really s...|
|             so cute|            0.5|             so cute|             so cute|
+--------------------+---------------+--------------------+--------------------+
only showing top 5 rows



In [26]:
import nltk
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


True

In [27]:
# Crating UDF To Remove Stop Words

stops = set(stopwords.words("english"))
def remove_stops(data_str,stops=stops):
    # expects a string

    list_pos = 0
    cleaned_str = ''
    #data_str = data_str.strip()
    text = data_str.split()
    for word in text:
        if word not in stops:
            if list_pos == 0: 
                cleaned_str = word
            else:
                cleaned_str = cleaned_str + ' ' + word
        list_pos += 1 
    return cleaned_str

remove_stops_udf = udf(remove_stops, StringType())


df = df.withColumn('stop_text',remove_stops_udf(df['fix_abbr']))
df_test = df_test.withColumn('stop_text',remove_stops_udf(df_test['fix_abbr']))

df.show(5,True)

+--------------------+---------------+--------------------+--------------------+--------------------+
|            comments|sentiment_score|       text_non_asci|            fix_abbr|           stop_text|
+--------------------+---------------+--------------------+--------------------+--------------------+
|	Evelyn Badia's l...|      0.4577972|	Evelyn Badia's l...|evelyn badia's li...|evelyn badia's li...|
|        My girlfr...|     0.14798659|        My girlfr...|my girlfriend and...| girlfriend plann...|
|       I am a gra...|      0.3895009|       I am a gra...|i am a graduate s...| graduate student...|
|       Robert  is...|     0.38421488|       Robert  is...|robe  is really s...|robe really super...|
|             so cute|            0.5|             so cute|             so cute|                cute|
+--------------------+---------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [28]:
# Crating UDF To Remove Features

def remove_features(data_str): # compile regex
    url_re = re.compile('https?://(www.)?\w+\.\w+(/\w+)*/?') 
    punc_re = re.compile('[%s]' % re.escape(string.punctuation)) 
    num_re = re.compile('(\\d+)')
    mention_re = re.compile('@(\w+)')
    alpha_num_re = re.compile("^[a-z0-9_.]+$")
    # convert to lowercase
    data_str = data_str.lower()
    # remove hyperlinks
    data_str = url_re.sub(' ', data_str)
    # remove @mentions
    data_str = mention_re.sub(' ', data_str)
    # remove puncuation
    data_str = punc_re.sub(' ', data_str)
    # remove numeric 'words'
    data_str = num_re.sub(' ', data_str)
    # remove non a-z 0-9 characters and words shorter than 1 characters 
    list_pos = 0
    cleaned_str = ''
    for word in data_str.split():
        if list_pos == 0:
            if alpha_num_re.match(word) and len(word) > 1:
                cleaned_str = word 
            else:
                cleaned_str = ' '
        else:
            if alpha_num_re.match(word) and len(word) > 1:
                cleaned_str = cleaned_str + ' ' + word 
            else:
                cleaned_str += ' '
        list_pos += 1
    # remove unwanted space, *.split() will automatically split on 
    # whitespace and discard duplicates, the " ".join() joins the 
    # resulting list into one string.
    return " ".join(cleaned_str.split())
    # setup pyspark udf function
    
remove_features_udf = udf(remove_features, StringType())


df = df.withColumn('removed',remove_features_udf(df['stop_text']))
df_test = df_test.withColumn('removed',remove_features_udf(df_test['stop_text']))

df.show(5,True)

+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+
|            comments|sentiment_score|       text_non_asci|            fix_abbr|           stop_text|             removed|
+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+
|	Evelyn Badia's l...|      0.4577972|	Evelyn Badia's l...|evelyn badia's li...|evelyn badia's li...|evelyn badia livi...|
|        My girlfr...|     0.14798659|        My girlfr...|my girlfriend and...| girlfriend plann...|girlfriend planne...|
|       I am a gra...|      0.3895009|       I am a gra...|i am a graduate s...| graduate student...|graduate student ...|
|       Robert  is...|     0.38421488|       Robert  is...|robe  is really s...|robe really super...|robe really super...|
|             so cute|            0.5|             so cute|             so cute|                cute|                cute|
+---------------

In [29]:
nltk.download('averaged_perceptron_tagger')
nltk.download('wordnet')

[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Unzipping taggers/averaged_perceptron_tagger.zip.
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Unzipping corpora/wordnet.zip.


True

In [30]:
# lemmatization
def lemmatize(data_str):
    # expects a string
    list_pos = 0
    cleaned_str = ''
    lmtzr = WordNetLemmatizer() 
    text = data_str.split() 
    tagged_words = pos_tag(text) 
    for word in tagged_words:
        if 'v' in word[1].lower():
            lemma = lmtzr.lemmatize(word[0], pos='v')
        else:
            lemma = lmtzr.lemmatize(word[0], pos='n')
        if list_pos == 0: 
            cleaned_str = lemma
        else:
            cleaned_str = cleaned_str + ' ' + lemma
        list_pos += 1 
    return cleaned_str

lemmatize_udf = udf(lemmatize, StringType())


df = df.withColumn('lemmatized_text',lemmatize_udf(df['removed']))
df_test = df_test.withColumn('lemmatized_text',lemmatize_udf(df_test['removed']))


df.select("lemmatized_text").show(5)

+--------------------+
|     lemmatized_text|
+--------------------+
|evelyn badia livi...|
|girlfriend plan v...|
|graduate student ...|
|robe really super...|
|                cute|
+--------------------+
only showing top 5 rows



In [0]:
# Taking Only Sentiment Score and Lemmatized Text 

lemmat_text = df.select("sentiment_score","lemmatized_text")
lemmat_text_test = df_test.select("sentiment_score","lemmatized_text")

### Model Building

- Tokenizing
- Hashing
- IDF Matrix
- Naive Bayes
- Decision Tree

In [0]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes, RandomForestClassifier 
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer 
from pyspark.ml.feature import CountVectorizer

#### Tokenizing

In [0]:
tokenizer = Tokenizer(inputCol="lemmatized_text", outputCol="words")
tokenized = tokenizer.transform(lemmat_text)
tokenized_test = tokenizer.transform(lemmat_text_test)

In [34]:
tokenized_test.show(2)

+---------------+--------------------+--------------------+
|sentiment_score|     lemmatized_text|               words|
+---------------+--------------------+--------------------+
|     0.38333333|ouii great hostsh...|[ouii, great, hos...|
|      0.2171875|first all angelo ...|[first, all, ange...|
+---------------+--------------------+--------------------+
only showing top 2 rows



#### Hashing

In [0]:
# Hashing the tokenized document
hashingTF = HashingTF (inputCol="words", outputCol="rawFeatures")
hashtf = hashingTF.transform(tokenized)
hashtf_test = hashingTF.transform(tokenized_test)

In [51]:
hashtf.show(2)

+---------------+--------------------+--------------------+--------------------+
|sentiment_score|     lemmatized_text|               words|         rawFeatures|
+---------------+--------------------+--------------------+--------------------+
|      0.4577972|evelyn badia livi...|[evelyn, badia, l...|(262144,[6183,736...|
|      0.2171875|first angelo grea...|[first, angelo, g...|(262144,[14,3067,...|
+---------------+--------------------+--------------------+--------------------+
only showing top 2 rows



#### TF-IDF

In [0]:
## Creating TF-IDF Matrix
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=3)
idf = idf.fit(hashtf)
tfidf = idf.transform(hashtf)
tfidf_test = idf.transform(hashtf_test)

In [0]:
tfidf.count()

552909

In [53]:
tfidf.show(5)

+---------------+--------------------+--------------------+--------------------+--------------------+
|sentiment_score|     lemmatized_text|               words|         rawFeatures|            features|
+---------------+--------------------+--------------------+--------------------+--------------------+
|      0.4577972|evelyn badia livi...|[evelyn, badia, l...|(262144,[6183,736...|(262144,[6183,736...|
|      0.2171875|first angelo grea...|[first, angelo, g...|(262144,[14,3067,...|(262144,[14,3067,...|
|     0.14798659|girlfriend plan v...|[girlfriend, plan...|(262144,[14,991,1...|(262144,[14,991,1...|
|      0.3895009|graduate student ...|[graduate, studen...|(262144,[11018,13...|(262144,[11018,13...|
|            0.0|carlos carlos car...|[carlos, carlos, ...|(262144,[251381],...|(262144,[251381],...|
+---------------+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [0]:
# Selecting reuqired columns

tfidf_clean = tfidf.withColumn("label",sentiments_udf(tfidf["sentiment_score"]))
tfidf_clean_test = tfidf_test.withColumn("label",sentiments_udf(tfidf_test["sentiment_score"]))

tfidf_clean= tfidf_clean.select("features","label")
tfidf_clean_test= tfidf_clean_test.select("features","label")

In [55]:
tfidf_clean.show(3)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(262144,[6183,736...|    1|
|(262144,[14,3067,...|    1|
|(262144,[14,991,1...|    1|
+--------------------+-----+
only showing top 3 rows



In [38]:
tfidf_clean=tfidf_clean.filter(col("label").isin([1,0]))
tfidf_clean_test=tfidf_clean_test.filter(col("label").isin([1,0]))

tfidf_clean.show(2)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(262144,[6183,736...|    1|
|(262144,[14,991,1...|    1|
+--------------------+-----+
only showing top 2 rows



### Fitting Naive Bayes Model

In [0]:
# Define Classifier
nb = NaiveBayes(modelType="multinomial")

# Fittinf on Train Data
nb_fit=nb.fit(tfidf_clean)

# Predictions on Train 
predictions = nb_fit.transform(tfidf_clean)


In [0]:
# Evaluating on Train Set

from pyspark.ml.evaluation import MulticlassClassificationEvaluator 
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction") 
accuracy_nb_train = evaluator.evaluate(predictions)
accuracy_nb_train

0.9484898504777135

In [0]:
# Prediction on test
predictions_test = nb_fit.transform(tfidf_clean_test)

# Accuracy Evaluation test
accuracy_nb_test = evaluator.evaluate(predictions_test)
accuracy_nb_test

0.9471685247710722

### Fitting Decision Tree

In [0]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")

In [0]:
dt_fit=dt.fit(tfidf_clean)

In [0]:
predictions_dt_train = dt_fit.transform(tfidf_clean)

In [0]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction") 
evaluator.evaluate(predictions_dt_train)

0.9564995456968696

In [0]:
predictions_dt_test = dt_fit.transform(tfidf_clean_test)
accuracy_dt_test = evaluator.evaluate(predictions_dt_test)
accuracy_dt_test

0.9515064939642668