In [1]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('testing').getOrCreate()

In [3]:
from pyspark.sql.window import Window
import pyspark.sql.types as T
import pyspark.sql.functions as F

In [4]:
sparkdf = spark.read.csv("IMDB Dataset.txt", sep="\t", inferSchema=True, header=True)
sparkdf.show(2, True)

+--------------------+---------+
|              review|sentiment|
+--------------------+---------+
|One of the other ...| positive|
|"A wonderful litt...| positive|
+--------------------+---------+
only showing top 2 rows



In [5]:
print(sparkdf.count(),',', len(sparkdf.columns))

50000 , 2


In [6]:
sparkdf.groupBy('sentiment').count().show()

+--------------------+-----+
|           sentiment|count|
+--------------------+-----+
|            positive|24995|
|            negative|24995|
|""Rock Star,"" or...|    1|
|There is little m...|    1|
|This is a story o...|    1|
|This movie was sh...|    1|
|The film stars Gi...|    1|
|I would want to k...|    1|
|The film, like ma...|    1|
|The screenplay fo...|    1|
|""One Night at Mc...|    1|
|Wynorski's credit...|    1|
+--------------------+-----+



In [7]:
# Looks like there are some review sentences in the sentiment column

In [8]:
filtered_df = sparkdf.filter((sparkdf['sentiment']=='positive') | (sparkdf['sentiment']=='negative'))

In [9]:
filtered_df.groupBy('sentiment').count().show()

+---------+-----+
|sentiment|count|
+---------+-----+
| positive|24995|
| negative|24995|
+---------+-----+



In [10]:
# def labelling(target_tabel):
#     if target_label == 'positive':
#         return '1'
#     elif target_label == 'negative':
#         return '0'
#     else:
#         return '-99'

In [11]:
# label_udf = F.udf(labelling, T.StringType())

In [12]:
# filtered_df = filtered_df.withColumn('target_label', label_udf(filtered_df['sentiment']))
# # Didnt work

# # Need to look into it

In [13]:
# filtered_df.createOrReplaceTempView('filtered_df')

In [14]:
# new_filtered_df = spark.sql('select * from filtered_df')
# new_filtered_df.select('target_label').distinct().show()

In [15]:
# exp_udf = F.udf(lambda sentiment: '1' if sentiment == 'positive' else '0', T.StringType())

In [16]:
# filtered_df.withColumn('target_label', exp_udf(filtered_df['sentiment'])).show(2, True)

In [17]:
# !pip install sparknlp
# import sparknlp
# sparknlp.version()

In [18]:
df_clean = sparkdf.select('review', F.lower(F.regexp_replace('review', "[^a-zA-Z\\s]", "")).alias('clean_review'), 'sentiment')
df_clean.show(1, True)

+--------------------+--------------------+---------+
|              review|        clean_review|sentiment|
+--------------------+--------------------+---------+
|One of the other ...|one of the other ...| positive|
+--------------------+--------------------+---------+
only showing top 1 row



In [19]:
## df_clean.printSchema()

In [20]:
from pyspark.ml.feature import Tokenizer

In [21]:
tokenizer = Tokenizer(inputCol='clean_review', outputCol='word_tokens')

In [22]:
tokenized_df = tokenizer.transform(df_clean)
tokenized_df.show(2, True)

+--------------------+--------------------+---------+--------------------+
|              review|        clean_review|sentiment|         word_tokens|
+--------------------+--------------------+---------+--------------------+
|One of the other ...|one of the other ...| positive|[one, of, the, ot...|
|"A wonderful litt...|a wonderful littl...| positive|[a, wonderful, li...|
+--------------------+--------------------+---------+--------------------+
only showing top 2 rows



In [23]:
tokenized_df = tokenized_df.filter((tokenized_df['sentiment']=='positive') | (tokenized_df['sentiment']=='negative'))

In [24]:
tokenized_df.groupBy('sentiment').count().show()

+---------+-----+
|sentiment|count|
+---------+-----+
| positive|24995|
| negative|24995|
+---------+-----+



In [25]:
# Stopwords removal

from pyspark.ml.feature import StopWordsRemover
no_stopwords = StopWordsRemover(inputCol='word_tokens', outputCol='refined_word_tokens')

In [26]:
refined_df = no_stopwords.transform(tokenized_df)
refined_df.show(5, True)

+--------------------+--------------------+---------+--------------------+--------------------+
|              review|        clean_review|sentiment|         word_tokens| refined_word_tokens|
+--------------------+--------------------+---------+--------------------+--------------------+
|One of the other ...|one of the other ...| positive|[one, of, the, ot...|[one, reviewers, ...|
|"A wonderful litt...|a wonderful littl...| positive|[a, wonderful, li...|[wonderful, littl...|
|"I thought this w...|i thought this wa...| positive|[i, thought, this...|[thought, wonderf...|
|Basically there's...|basically theres ...| negative|[basically, there...|[basically, there...|
|"Petter Mattei's ...|petter matteis lo...| positive|[petter, matteis,...|[petter, matteis,...|
+--------------------+--------------------+---------+--------------------+--------------------+
only showing top 5 rows



In [27]:
refined_df = refined_df.withColumn('len_review', F.length('review'))
refined_df = refined_df.withColumn('len_clean_review', F.length('clean_review'))
refined_df = refined_df.withColumn('len_word_tokens', F.size('word_tokens'))
refined_df = refined_df.withColumn('len_refined_word_tokens', F.size('refined_word_tokens'))
refined_df.select('len_review', 'len_clean_review', 'len_word_tokens', 'len_refined_word_tokens').show(2, True)

+----------+----------------+---------------+-----------------------+
|len_review|len_clean_review|len_word_tokens|len_refined_word_tokens|
+----------+----------------+---------------+-----------------------+
|      1761|            1682|            307|                    170|
|      1002|             954|            162|                     90|
+----------+----------------+---------------+-----------------------+
only showing top 2 rows



In [28]:
refined_df.printSchema()

root
 |-- review: string (nullable = true)
 |-- clean_review: string (nullable = true)
 |-- sentiment: string (nullable = true)
 |-- word_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- refined_word_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- len_review: integer (nullable = true)
 |-- len_clean_review: integer (nullable = true)
 |-- len_word_tokens: integer (nullable = false)
 |-- len_refined_word_tokens: integer (nullable = false)



In [29]:
refined_df.groupBy('sentiment').max('len_review').show()
refined_df.groupBy('sentiment').max('len_clean_review').show()
refined_df.groupBy('sentiment').max('len_word_tokens').show()
refined_df.groupBy('sentiment').max('len_refined_word_tokens').show()

refined_df.groupBy('sentiment').avg('len_review').show()
refined_df.groupBy('sentiment').avg('len_clean_review').show()
refined_df.groupBy('sentiment').avg('len_word_tokens').show()
refined_df.groupBy('sentiment').avg('len_refined_word_tokens').show()

+---------+---------------+
|sentiment|max(len_review)|
+---------+---------------+
| positive|          13710|
| negative|           9001|
+---------+---------------+

+---------+---------------------+
|sentiment|max(len_clean_review)|
+---------+---------------------+
| positive|                13331|
| negative|                 8509|
+---------+---------------------+

+---------+--------------------+
|sentiment|max(len_word_tokens)|
+---------+--------------------+
| positive|                2469|
| negative|                1522|
+---------+--------------------+

+---------+----------------------------+
|sentiment|max(len_refined_word_tokens)|
+---------+----------------------------+
| positive|                        1439|
| negative|                         834|
+---------+----------------------------+

+---------+------------------+
|sentiment|   avg(len_review)|
+---------+------------------+
| positive|1327.7240648129625|
| negative|1297.5245849169835|
+---------+--------------

In [30]:
refined_df = refined_df.withColumn('id', F.monotonically_increasing_id())

In [31]:
# Always positive reviews had higher words
# But average remains similar in positive and negative reviews

In [32]:
from pyspark.ml.feature import CountVectorizer

In [33]:
cv = CountVectorizer(inputCol='refined_word_tokens', outputCol='cv_features')

In [34]:
cv_df = cv.fit(refined_df).transform(refined_df)

In [35]:
cv_df.columns

['review',
 'clean_review',
 'sentiment',
 'word_tokens',
 'refined_word_tokens',
 'len_review',
 'len_clean_review',
 'len_word_tokens',
 'len_refined_word_tokens',
 'id',
 'cv_features']

In [36]:
cv_df.select('cv_features').show(5)

+--------------------+
|         cv_features|
+--------------------+
|(175912,[0,1,4,13...|
|(175912,[0,4,9,10...|
|(175912,[0,1,4,7,...|
|(175912,[0,1,2,3,...|
|(175912,[0,2,3,4,...|
+--------------------+
only showing top 5 rows



In [37]:
print(cv.fit(refined_df).vocabulary[:25])

['br', '', 'movie', 'film', 'one', 'like', 'good', 'even', 'time', 'really', 'see', 'story', 'much', 'well', 'get', 'great', 'also', 'bad', 'people', 'first', 'dont', 'movies', 'made', 'make', 'films']


In [38]:
from pyspark.ml.feature import HashingTF, IDF

In [39]:
hashing_vec = HashingTF(inputCol='refined_word_tokens', outputCol='tf_features')

In [40]:
hashing_df = hashing_vec.transform(cv_df)

In [41]:
hashing_df.select('refined_word_tokens', 'tf_features').show(5)

+--------------------+--------------------+
| refined_word_tokens|         tf_features|
+--------------------+--------------------+
|[one, reviewers, ...|(262144,[2325,328...|
|[wonderful, littl...|(262144,[521,2574...|
|[thought, wonderf...|(262144,[1043,139...|
|[basically, there...|(262144,[6512,853...|
|[petter, matteis,...|(262144,[2751,392...|
+--------------------+--------------------+
only showing top 5 rows



In [42]:
hashing_df.columns

['review',
 'clean_review',
 'sentiment',
 'word_tokens',
 'refined_word_tokens',
 'len_review',
 'len_clean_review',
 'len_word_tokens',
 'len_refined_word_tokens',
 'id',
 'cv_features',
 'tf_features']

In [43]:
tf_idf_vec = IDF(inputCol='tf_features', outputCol='tf_idf_features')

In [44]:
tf_idf_df = tf_idf_vec.fit(hashing_df).transform(hashing_df)

In [45]:
tf_idf_df.printSchema()

root
 |-- review: string (nullable = true)
 |-- clean_review: string (nullable = true)
 |-- sentiment: string (nullable = true)
 |-- word_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- refined_word_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- len_review: integer (nullable = true)
 |-- len_clean_review: integer (nullable = true)
 |-- len_word_tokens: integer (nullable = false)
 |-- len_refined_word_tokens: integer (nullable = false)
 |-- id: long (nullable = false)
 |-- cv_features: vector (nullable = true)
 |-- tf_features: vector (nullable = true)
 |-- tf_idf_features: vector (nullable = true)



In [46]:
tf_idf_df.createOrReplaceTempView('tf_idf_df')

In [47]:
final_df = spark.sql('''SELECT *, 
                        CASE WHEN sentiment = 'positive' THEN 1 
                        WHEN sentiment = 'negative' THEN  0 
                        ELSE -99 END as target_label
                        FROM tf_idf_df
                        ''')

In [48]:
final_df.printSchema()

root
 |-- review: string (nullable = true)
 |-- clean_review: string (nullable = true)
 |-- sentiment: string (nullable = true)
 |-- word_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- refined_word_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- len_review: integer (nullable = true)
 |-- len_clean_review: integer (nullable = true)
 |-- len_word_tokens: integer (nullable = false)
 |-- len_refined_word_tokens: integer (nullable = false)
 |-- id: long (nullable = false)
 |-- cv_features: vector (nullable = true)
 |-- tf_features: vector (nullable = true)
 |-- tf_idf_features: vector (nullable = true)
 |-- target_label: integer (nullable = false)



In [49]:
model_text_df = final_df.select(['len_review', 'len_clean_review', 'len_word_tokens', 'len_refined_word_tokens'
                                , 'cv_features', 'tf_features', 'tf_idf_features', 'target_label'])

In [50]:
model_text_df.printSchema()

root
 |-- len_review: integer (nullable = true)
 |-- len_clean_review: integer (nullable = true)
 |-- len_word_tokens: integer (nullable = false)
 |-- len_refined_word_tokens: integer (nullable = false)
 |-- cv_features: vector (nullable = true)
 |-- tf_features: vector (nullable = true)
 |-- tf_idf_features: vector (nullable = true)
 |-- target_label: integer (nullable = false)



In [51]:
model_df = model_text_df.select(['len_review', 'tf_idf_features', 'target_label'])

In [52]:
model_df.printSchema()

root
 |-- len_review: integer (nullable = true)
 |-- tf_idf_features: vector (nullable = true)
 |-- target_label: integer (nullable = false)



In [53]:
from pyspark.ml.feature import VectorAssembler

In [54]:
df_assembler = VectorAssembler(inputCols=['len_review', 'tf_idf_features'], outputCol='features_vec')

In [55]:
model_df = df_assembler.transform(model_df)

In [56]:
model_df.printSchema()

root
 |-- len_review: integer (nullable = true)
 |-- tf_idf_features: vector (nullable = true)
 |-- target_label: integer (nullable = false)
 |-- features_vec: vector (nullable = true)



In [57]:
from pyspark.ml.classification import LogisticRegression

In [58]:
train_df, test_df = model_df.randomSplit([0.7, 0.3], seed=3)

In [59]:
train_df.groupBy('target_label').count().show()

Py4JJavaError: An error occurred while calling o375.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 55.0 failed 1 times, most recent failure: Lost task 1.0 in stage 55.0 (TID 183) (192.168.2.16 executor driver): java.lang.OutOfMemoryError: GC overhead limit exceeded
	at java.io.ObjectInputStream$HandleTable.markDependency(Unknown Source)
	at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
	at java.io.ObjectInputStream.readSerialData(Unknown Source)
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
	at java.io.ObjectInputStream.readSerialData(Unknown Source)
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.io.ObjectInputStream.readArray(Unknown Source)
	at java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
	at java.io.ObjectInputStream.readSerialData(Unknown Source)
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
	at java.io.ObjectInputStream.readSerialData(Unknown Source)
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
	at java.io.ObjectInputStream.readSerialData(Unknown Source)
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
	at java.io.ObjectInputStream.readSerialData(Unknown Source)
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
	at java.io.ObjectInputStream.readSerialData(Unknown Source)
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.io.ObjectInputStream.defaultReadFields(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
	at java.io.ObjectInputStream$HandleTable.markDependency(Unknown Source)
	at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
	at java.io.ObjectInputStream.readSerialData(Unknown Source)
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
	at java.io.ObjectInputStream.readSerialData(Unknown Source)
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.io.ObjectInputStream.readArray(Unknown Source)
	at java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
	at java.io.ObjectInputStream.readSerialData(Unknown Source)
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
	at java.io.ObjectInputStream.readSerialData(Unknown Source)
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
	at java.io.ObjectInputStream.readSerialData(Unknown Source)
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
	at java.io.ObjectInputStream.readSerialData(Unknown Source)
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
	at java.io.ObjectInputStream.readSerialData(Unknown Source)
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.io.ObjectInputStream.defaultReadFields(Unknown Source)


In [None]:
test_df.groupBy('target_label').count().show()