## Create spark session

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('nlp').getOrCreate()

In [2]:
df = spark.createDataFrame(
    [(1,'I really liked this movie'),
     (2,'I would recommend this movie to my friends'),
     (3,'movie was alright but acting was horrible'),
     (4,'I am never watching that movie ever again')],
    ['user_id','review'])
df.show(5,False)

+-------+------------------------------------------+
|user_id|review                                    |
+-------+------------------------------------------+
|1      |I really liked this movie                 |
|2      |I would recommend this movie to my friends|
|3      |movie was alright but acting was horrible |
|4      |I am never watching that movie ever again |
+-------+------------------------------------------+



## Tokenization

In [3]:
from pyspark.ml.feature import Tokenizer
tokenization = Tokenizer(inputCol='review',outputCol='tokens')
tokenized_df = tokenization.transform(df)
tokenized_df.show(4,False)

+-------+------------------------------------------+---------------------------------------------------+
|user_id|review                                    |tokens                                             |
+-------+------------------------------------------+---------------------------------------------------+
|1      |I really liked this movie                 |[i, really, liked, this, movie]                    |
|2      |I would recommend this movie to my friends|[i, would, recommend, this, movie, to, my, friends]|
|3      |movie was alright but acting was horrible |[movie, was, alright, but, acting, was, horrible]  |
|4      |I am never watching that movie ever again |[i, am, never, watching, that, movie, ever, again] |
+-------+------------------------------------------+---------------------------------------------------+



## stopwords removal 

In [4]:
from pyspark.ml.feature import StopWordsRemover
stopword_removal = StopWordsRemover(inputCol='tokens',outputCol='refined_tokens')
refined_df = stopword_removal.transform(tokenized_df)
refined_df.select(['user_id','tokens','refined_tokens']).show(10,False)

+-------+---------------------------------------------------+----------------------------------+
|user_id|tokens                                             |refined_tokens                    |
+-------+---------------------------------------------------+----------------------------------+
|1      |[i, really, liked, this, movie]                    |[really, liked, movie]            |
|2      |[i, would, recommend, this, movie, to, my, friends]|[recommend, movie, friends]       |
|3      |[movie, was, alright, but, acting, was, horrible]  |[movie, alright, acting, horrible]|
|4      |[i, am, never, watching, that, movie, ever, again] |[never, watching, movie, ever]    |
+-------+---------------------------------------------------+----------------------------------+



## Count Vectorizer

In [5]:
from pyspark.ml.feature import CountVectorizer
count_vec = CountVectorizer(inputCol='refined_tokens',outputCol='features')
cv_df = count_vec.fit(refined_df).transform(refined_df)
cv_df.select(['user_id','refined_tokens','features']).show(4,False)

+-------+----------------------------------+--------------------------------+
|user_id|refined_tokens                    |features                        |
+-------+----------------------------------+--------------------------------+
|1      |[really, liked, movie]            |(11,[0,4,7],[1.0,1.0,1.0])      |
|2      |[recommend, movie, friends]       |(11,[0,1,10],[1.0,1.0,1.0])     |
|3      |[movie, alright, acting, horrible]|(11,[0,5,6,9],[1.0,1.0,1.0,1.0])|
|4      |[never, watching, movie, ever]    |(11,[0,2,3,8],[1.0,1.0,1.0,1.0])|
+-------+----------------------------------+--------------------------------+



In [6]:
count_vec.fit(refined_df).vocabulary

['movie',
 'horrible',
 'liked',
 'really',
 'watching',
 'alright',
 'friends',
 'recommend',
 'ever',
 'never',
 'acting']

## Tf-idf

In [7]:
from pyspark.ml.feature import HashingTF,IDF
hashing_vec = HashingTF(inputCol='refined_tokens',outputCol='tf_features')
hashing_df = hashing_vec.transform(refined_df)
hashing_df.select(['user_id','refined_tokens','tf_features']).show(4,False)

+-------+----------------------------------+-------------------------------------------------------+
|user_id|refined_tokens                    |tf_features                                            |
+-------+----------------------------------+-------------------------------------------------------+
|1      |[really, liked, movie]            |(262144,[14,32675,155321],[1.0,1.0,1.0])               |
|2      |[recommend, movie, friends]       |(262144,[129613,155321,222394],[1.0,1.0,1.0])          |
|3      |[movie, alright, acting, horrible]|(262144,[80824,155321,236263,240286],[1.0,1.0,1.0,1.0])|
|4      |[never, watching, movie, ever]    |(262144,[63139,155321,203802,245806],[1.0,1.0,1.0,1.0])|
+-------+----------------------------------+-------------------------------------------------------+



In [8]:
tf_idf_vec = IDF(inputCol='tf_features',outputCol='tf_idf_features')
tf_idf_df=tf_idf_vec.fit(hashing_df).transform(hashing_df)
tf_idf_df.select(['user_id','tf_idf_features']).show(4,False)

+-------+----------------------------------------------------------------------------------------------------+
|user_id|tf_idf_features                                                                                     |
+-------+----------------------------------------------------------------------------------------------------+
|1      |(262144,[14,32675,155321],[0.9162907318741551,0.9162907318741551,0.0])                              |
|2      |(262144,[129613,155321,222394],[0.9162907318741551,0.0,0.9162907318741551])                         |
|3      |(262144,[80824,155321,236263,240286],[0.9162907318741551,0.0,0.9162907318741551,0.9162907318741551])|
|4      |(262144,[63139,155321,203802,245806],[0.9162907318741551,0.0,0.9162907318741551,0.9162907318741551])|
+-------+----------------------------------------------------------------------------------------------------+



## Classification 

In [9]:
text_df = spark.read.csv('data/Movie_reviews.csv', inferSchema=True, header=True, sep=',')
text_df.printSchema()

root
 |-- Review: string (nullable = true)
 |-- Sentiment: string (nullable = true)



In [10]:
text_df.count()

7087

In [11]:
from pyspark.sql.functions import rand 
text_df.orderBy(rand()).show(10,False)

+------------------------------------------------------------------------+---------+
|Review                                                                  |Sentiment|
+------------------------------------------------------------------------+---------+
|I think I hate Harry Potter because it outshines much better reading mat|0        |
|So as felicia's mom is cleaning the table, felicia grabs my keys and we |1        |
|He's like,'YEAH I GOT ACNE AND I LOVE BROKEBACK MOUNTAIN '..            |1        |
|This quiz sucks and Harry Potter sucks ok bye..                         |0        |
|The Da Vinci Code is awesome..                                          |1        |
|the people who are worth it know how much i love the da vinci code.     |1        |
|I am going to start reading the Harry Potter series again because that i|1        |
|I wanted desperately to love'The Da Vinci Code as a film.               |1        |
|I don't agree with homophobes, but they have every right to say 

In [12]:
text_df = text_df.filter(((text_df.Sentiment =='1') | (text_df.Sentiment =='0')))
text_df.count()

6990

In [13]:
text_df.groupBy('Sentiment').count().show()

+---------+-----+
|Sentiment|count|
+---------+-----+
|        0| 3081|
|        1| 3909|
+---------+-----+



In [14]:
text_df = text_df.withColumn("Label", text_df.Sentiment.cast('float')).drop('Sentiment')
text_df.orderBy(rand()).show(10,False)

+------------------------------------------------------------------------+-----+
|Review                                                                  |Label|
+------------------------------------------------------------------------+-----+
|Da Vinci Code sucks be...                                               |0.0  |
|Harry Potter is AWESOME I don't care if anyone says differently!..      |1.0  |
|we're gonna like watch Mission Impossible or Hoot.(                     |1.0  |
|i love being a sentry for mission impossible and a station for bonkers. |1.0  |
|i love being a sentry for mission impossible and a station for bonkers. |1.0  |
|MISSION IMPOSSIBLE 3 SUCKS!!..                                          |0.0  |
|I want to be here because I love Harry Potter, and I really want a place|1.0  |
|My dad's being stupid about brokeback mountain...                       |0.0  |
|I like Mission Impossible movies because you never know who's on the rig|1.0  |
|I, too, like Harry Potter..

In [15]:
from pyspark.sql.functions import length
text_df = text_df.withColumn('length',length(text_df['Review']))
text_df.orderBy(rand()).show(10,False)

+------------------------------------------------------------------------+-----+------+
|Review                                                                  |Label|length|
+------------------------------------------------------------------------+-----+------+
|The Da Vinci Code is awesome!!                                          |1.0  |30    |
|we're gonna like watch Mission Impossible or Hoot.(                     |1.0  |51    |
|I HATE Harry Potter.                                                    |0.0  |20    |
|Brokeback Mountain was boring.                                          |0.0  |30    |
|, she helped me bobbypin my insanely cool hat to my head, and she laughe|0.0  |72    |
|I hate Harry Potter, that daniel wotshisface needs a fucking slap...    |0.0  |68    |
|Da Vinci Code sucks.                                                    |0.0  |20    |
|And I like Harry Potter.                                                |1.0  |24    |
|The Da Vinci Code is awesome!! 

In [16]:
text_df.groupBy('Label').agg({'Length':'mean'}).show()

+-----+-----------------+
|Label|      avg(Length)|
+-----+-----------------+
|  1.0|47.61882834484523|
|  0.0|50.95845504706264|
+-----+-----------------+



In [17]:
tokenization = Tokenizer(inputCol='Review',outputCol='tokens')
tokenized_df = tokenization.transform(text_df)
tokenized_df.show()

+--------------------+-----+------+--------------------+
|              Review|Label|length|              tokens|
+--------------------+-----+------+--------------------+
|The Da Vinci Code...|  1.0|    39|[the, da, vinci, ...|
|this was the firs...|  1.0|    72|[this, was, the, ...|
|i liked the Da Vi...|  1.0|    32|[i, liked, the, d...|
|i liked the Da Vi...|  1.0|    32|[i, liked, the, d...|
|I liked the Da Vi...|  1.0|    72|[i, liked, the, d...|
|that's not even a...|  1.0|    72|[that's, not, eve...|
|I loved the Da Vi...|  1.0|    72|[i, loved, the, d...|
|i thought da vinc...|  1.0|    57|[i, thought, da, ...|
|The Da Vinci Code...|  1.0|    45|[the, da, vinci, ...|
|I thought the Da ...|  1.0|    51|[i, thought, the,...|
|The Da Vinci Code...|  1.0|    68|[the, da, vinci, ...|
|The Da Vinci Code...|  1.0|    62|[the, da, vinci, ...|
|then I turn on th...|  1.0|    66|[then, i, turn, o...|
|The Da Vinci Code...|  1.0|    34|[the, da, vinci, ...|
|i love da vinci c...|  1.0|   

In [18]:
stopword_removal = StopWordsRemover(inputCol='tokens',outputCol='refined_tokens')
refined_text_df = stopword_removal.transform(tokenized_df)
refined_text_df.show()

+--------------------+-----+------+--------------------+--------------------+
|              Review|Label|length|              tokens|      refined_tokens|
+--------------------+-----+------+--------------------+--------------------+
|The Da Vinci Code...|  1.0|    39|[the, da, vinci, ...|[da, vinci, code,...|
|this was the firs...|  1.0|    72|[this, was, the, ...|[first, clive, cu...|
|i liked the Da Vi...|  1.0|    32|[i, liked, the, d...|[liked, da, vinci...|
|i liked the Da Vi...|  1.0|    32|[i, liked, the, d...|[liked, da, vinci...|
|I liked the Da Vi...|  1.0|    72|[i, liked, the, d...|[liked, da, vinci...|
|that's not even a...|  1.0|    72|[that's, not, eve...|[even, exaggerati...|
|I loved the Da Vi...|  1.0|    72|[i, loved, the, d...|[loved, da, vinci...|
|i thought da vinc...|  1.0|    57|[i, thought, da, ...|[thought, da, vin...|
|The Da Vinci Code...|  1.0|    45|[the, da, vinci, ...|[da, vinci, code,...|
|I thought the Da ...|  1.0|    51|[i, thought, the,...|[thought

In [19]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import *

In [20]:
len_udf = udf(lambda s: len(s), IntegerType())
refined_text_df = refined_text_df.withColumn("token_count", len_udf(col('refined_tokens')))
refined_text_df.orderBy(rand()).show(10)

+--------------------+-----+------+--------------------+--------------------+-----------+
|              Review|Label|length|              tokens|      refined_tokens|token_count|
+--------------------+-----+------+--------------------+--------------------+-----------+
|I love Brokeback ...|  1.0|    26|[i, love, brokeba...|[love, brokeback,...|          3|
|I love Brokeback ...|  1.0|    26|[i, love, brokeba...|[love, brokeback,...|          3|
|da vinci code was...|  1.0|    37|[da, vinci, code,...|[da, vinci, code,...|          5|
|I want to be here...|  1.0|    72|[i, want, to, be,...|[want, love, harr...|          7|
|After I'd read Oo...|  1.0|    72|[after, i'd, read...|[read, ootp, sixt...|          7|
|Da Vinci Code suc...|  0.0|    22|[da, vinci, code,...|[da, vinci, code,...|          4|
|the people who ar...|  1.0|    67|[the, people, who...|[people, worth, k...|          8|
|I love Harry Potter.|  1.0|    20|[i, love, harry, ...|[love, harry, pot...|          3|
|The Da Vi

In [21]:
count_vec = CountVectorizer(inputCol='refined_tokens',outputCol='features')
cv_text_df = count_vec.fit(refined_text_df).transform(refined_text_df)
cv_text_df.select(['refined_tokens','token_count','features','Label']).show(10)

+--------------------+-----------+--------------------+-----+
|      refined_tokens|token_count|            features|Label|
+--------------------+-----------+--------------------+-----+
|[da, vinci, code,...|          5|(2302,[0,1,4,43,2...|  1.0|
|[first, clive, cu...|          9|(2302,[11,51,229,...|  1.0|
|[liked, da, vinci...|          5|(2302,[0,1,4,53,3...|  1.0|
|[liked, da, vinci...|          5|(2302,[0,1,4,53,3...|  1.0|
|[liked, da, vinci...|          8|(2302,[0,1,4,53,6...|  1.0|
|[even, exaggerati...|          6|(2302,[46,229,271...|  1.0|
|[loved, da, vinci...|          8|(2302,[0,1,22,30,...|  1.0|
|[thought, da, vin...|          7|(2302,[0,1,4,228,...|  1.0|
|[da, vinci, code,...|          6|(2302,[0,1,4,33,2...|  1.0|
|[thought, da, vin...|          7|(2302,[0,1,4,223,...|  1.0|
+--------------------+-----------+--------------------+-----+
only showing top 10 rows



In [22]:
model_text_df = cv_text_df.select(['features','token_count','Label'])
from pyspark.ml.feature import VectorAssembler
df_assembler = VectorAssembler(inputCols=['features','token_count'], outputCol='features_vec')
model_text_df = df_assembler.transform(model_text_df)
model_text_df.printSchema()

root
 |-- features: vector (nullable = true)
 |-- token_count: integer (nullable = true)
 |-- Label: float (nullable = true)
 |-- features_vec: vector (nullable = true)



In [23]:
from pyspark.ml.classification import LogisticRegression
training_df,test_df = model_text_df.randomSplit([0.75,0.25])
training_df.groupBy('Label').count().show()

+-----+-----+
|Label|count|
+-----+-----+
|  1.0| 2953|
|  0.0| 2278|
+-----+-----+



In [24]:
test_df.groupBy('Label').count().show()

+-----+-----+
|Label|count|
+-----+-----+
|  1.0|  956|
|  0.0|  803|
+-----+-----+



In [25]:
log_reg = LogisticRegression(featuresCol='features_vec', labelCol='Label').fit(training_df)
results = log_reg.evaluate(test_df).predictions
results.show()

+--------------------+-----------+-----+--------------------+--------------------+--------------------+----------+
|            features|token_count|Label|        features_vec|       rawPrediction|         probability|prediction|
+--------------------+-----------+-----+--------------------+--------------------+--------------------+----------+
|(2302,[0,1,4,5,65...|          5|  1.0|(2303,[0,1,4,5,65...|[-18.432535219019...|[9.88215502628804...|       1.0|
|(2302,[0,1,4,12,1...|         10|  1.0|(2303,[0,1,4,12,1...|[-27.159688839005...|[1.60212724915018...|       1.0|
|(2302,[0,1,4,12,1...|          5|  1.0|(2303,[0,1,4,12,1...|[-19.953084328213...|[2.16015830113407...|       1.0|
|(2302,[0,1,4,12,1...|          5|  1.0|(2303,[0,1,4,12,1...|[-19.953084328213...|[2.16015830113407...|       1.0|
|(2302,[0,1,4,12,3...|          5|  1.0|(2303,[0,1,4,12,3...|[-21.236799489720...|[5.98377380278856...|       1.0|
|(2302,[0,1,4,12,3...|          5|  1.0|(2303,[0,1,4,12,3...|[-21.236799489720..

In [26]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
#confusion matrix
true_postives = results[(results.Label == 1) & (results.prediction == 1)].count()
true_negatives = results[(results.Label == 0) & (results.prediction == 0)].count()
false_positives = results[(results.Label == 0) & (results.prediction == 1)].count()
false_negatives = results[(results.Label == 1) & (results.prediction == 0)].count()

recall = float(true_postives)/(true_postives + false_negatives)
print(recall)

precision = float(true_postives) / (true_postives + false_positives)
print(precision)

accuracy=float((true_postives+true_negatives) /(results.count()))
print(accuracy)

0.9801255230125523
0.9669762641898865
0.9710062535531552
