Installing and Setting Up Spark

In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz
!tar xf spark-3.3.0-bin-hadoop3.tgz
!pip install -q findspark

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "spark-3.3.0-bin-hadoop3"

In [4]:
import findspark
findspark.init()

In [5]:
findspark.find()

'spark-3.3.0-bin-hadoop3'

In [31]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext
sc

Connecting to Google Drive

In [9]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Reading Movie Review files into RDD

In [99]:
path = '/content/drive/MyDrive/moviereviews/moviereviews'

files = sc.wholeTextFiles(path)

print(len(files.collect()))

1999


Reading files into RDD tuples and removing Punctuations from the textfiles data

In [100]:
import string
all_files = files.map(lambda x:(x[0].split('/')[-1],x[1].translate(str.maketrans('', '', string.punctuation))))
data = all_files.collect()
all_files.take(2)

[('cv988_18740.txt',
  'i wish i could say that there is something more to the new star wars installment than what you see in the commercials  but there isnt  \ngeorge lucas is an expert at crafting triumphant  special effectsbound flicks and he has done it again  \nthis is the first star wars film ive seen on the big screen  so  of course  the impact was much greater than when i watched the three original episodes on video  \nwhat is compelling about the phantom menace is not its disposable story  but its amazing visuals  \nbasically  the films plot is centered around trade disputes between two planets  \nquigon  liam neeson  and his young apprentice obiwan kenobi  ewan mcgregor  set out with the animated jarjar binks to fix the mess that may result in war  \nwhen their ship breaks down and they are stranded  they find a young boy  anakin skywalker  jake lloyd  and his slave mother  \nquigon strongly believes that anakin is destined for jedi greatness and immediately takes him under h

Removing '\n' from the text data

In [102]:
new_all_files = all_files.map(lambda x:(x[0],x[1].replace('\n','')))
new_all_files.take(2)

[('cv988_18740.txt',
  'i wish i could say that there is something more to the new star wars installment than what you see in the commercials  but there isnt  george lucas is an expert at crafting triumphant  special effectsbound flicks and he has done it again  this is the first star wars film ive seen on the big screen  so  of course  the impact was much greater than when i watched the three original episodes on video  what is compelling about the phantom menace is not its disposable story  but its amazing visuals  basically  the films plot is centered around trade disputes between two planets  quigon  liam neeson  and his young apprentice obiwan kenobi  ewan mcgregor  set out with the animated jarjar binks to fix the mess that may result in war  when their ship breaks down and they are stranded  they find a young boy  anakin skywalker  jake lloyd  and his slave mother  quigon strongly believes that anakin is destined for jedi greatness and immediately takes him under his wing  anaki

Removing extra spaces in text data

In [103]:
f_files = new_all_files.map(lambda x:(x[0], x[1].replace('  ',' ')))
f_files.take(2)

[('cv988_18740.txt',
  'i wish i could say that there is something more to the new star wars installment than what you see in the commercials but there isnt george lucas is an expert at crafting triumphant special effectsbound flicks and he has done it again this is the first star wars film ive seen on the big screen so of course the impact was much greater than when i watched the three original episodes on video what is compelling about the phantom menace is not its disposable story but its amazing visuals basically the films plot is centered around trade disputes between two planets quigon liam neeson and his young apprentice obiwan kenobi ewan mcgregor set out with the animated jarjar binks to fix the mess that may result in war when their ship breaks down and they are stranded they find a young boy anakin skywalker jake lloyd and his slave mother quigon strongly believes that anakin is destined for jedi greatness and immediately takes him under his wing anakin participates in a hea

Splitting text data into list of words

In [None]:
nf_filesRDD = f_files.map(lambda x: (x[0],x[1].split()))
data = nf_filesRDD.collect()
nf_filesRDD.take(2)

Converting above RDD into DataFrame

In [105]:
nf_files_df = nf_filesRDD.toDF()
nf_files_df.show()

+---------------+--------------------+
|             _1|                  _2|
+---------------+--------------------+
|cv988_18740.txt|[i, wish, i, coul...|
| cv780_8467.txt|[sometimes, i, wo...|
|cv172_11131.txt|[a, standoff, a, ...|
|cv944_13521.txt|[meteor, threat, ...|
| cv038_9781.txt|[capsule, where, ...|
|cv367_24065.txt|[there, should, b...|
|cv945_13012.txt|[perhaps, best, r...|
| cv226_2618.txt|[i, had, a, chanc...|
|cv700_23163.txt|[the, beach, is, ...|
| cv522_5418.txt|[an, 18foothigh, ...|
|cv050_11175.txt|[national, lampoo...|
|cv575_21150.txt|[underrated, movi...|
|cv010_29063.txt|[best, remembered...|
|cv905_28965.txt|[in, the, year, 2...|
| cv937_9816.txt|[take, a, look, a...|
|cv812_19051.txt|[plot, set, in, t...|
|cv958_13020.txt|[in, times, of, c...|
|cv226_26692.txt|[not, a, great, t...|
|cv110_27788.txt|[plot, a, bunch, ...|
| cv754_7709.txt|[capsule, godawfu...|
+---------------+--------------------+
only showing top 20 rows



Converting above dataframe second column into RDD to get count

In [106]:
file_word_RDD = nf_files_df.select('_2').rdd.flatMap(list)

Reading positive words into RDD and converting into a list

In [107]:
positive_rdd = sc.textFile('/content/sample_data/pos.txt')
positive_list = positive_rdd.collect()
print(positive_list)

['abound', 'abounds', 'abundance', 'abundant', 'accessable', 'accessible', 'acclaim', 'acclaimed', 'acclamation', 'accolade', 'accolades', 'accommodative', 'accomodative', 'accomplish', 'accomplished', 'accomplishment', 'accomplishments', 'accurate', 'accurately', 'achievable', 'achievement', 'achievements', 'achievible', 'acumen', 'adaptable', 'adaptive', 'adequate', 'adjustable', 'admirable', 'admirably', 'admiration', 'admire', 'admirer', 'admiring', 'admiringly', 'adorable', 'adore', 'adored', 'adorer', 'adoring', 'adoringly', 'adroit', 'adroitly', 'adulate', 'adulation', 'adulatory', 'advanced', 'advantage', 'advantageous', 'advantageously', 'advantages', 'adventuresome', 'adventurous', 'advocate', 'advocated', 'advocates', 'affability', 'affable', 'affably', 'affectation', 'affection', 'affectionate', 'affinity', 'affirm', 'affirmation', 'affirmative', 'affluence', 'affluent', 'afford', 'affordable', 'affordably', 'afordable', 'agile', 'agilely', 'agility', 'agreeable', 'agreeabl

Reading negative words into RDD and converting into a list

In [108]:
negative_rdd = sc.textFile('/content/sample_data/neg.txt')
negative_list = negative_rdd.collect()
print(negative_list)



Counting positive words in each review into RDD and converting it into list

In [109]:
positive_count = file_word_RDD.map(lambda x :len([i for i in x if i in positive_list]))
positive_count_list = positive_count.collect()
positive_count.take(2)

[36, 14]

Counting negative words in each review into RDD and converting it into list

In [110]:
negative_count = file_word_RDD.map(lambda x: len([i for i in x if i in negative_list]))
negative_count_list = negative_count.collect()
negative_count.take(2)

[31, 19]

Getting sentiment of each review based on the count of positve and negative words

In [111]:
sentiment_list = []

for i in range(len(positive_count_list)):
  if positive_count_list[i] > negative_count_list[i]:
    sentiment_list.append('positive')
  else:
    sentiment_list.append('negative')

print(sentiment_list[:10])

['positive', 'negative', 'negative', 'negative', 'negative', 'negative', 'negative', 'positive', 'negative', 'negative']


Reading all file names into a RDD then converting it into a list

In [113]:
filename_list = all_files.map(lambda x: x[0].split('/')[-1]).collect()
print(filename_list[:10])

['cv988_18740.txt', 'cv780_8467.txt', 'cv172_11131.txt', 'cv944_13521.txt', 'cv038_9781.txt', 'cv367_24065.txt', 'cv945_13012.txt', 'cv226_2618.txt', 'cv700_23163.txt', 'cv522_5418.txt']


Giving column names for the final DataFrame

In [114]:
columns = ['filename','positivecounts','negativecounts','sentiment']

Creating DataFrame with above column names and showing final output

In [115]:
df = spark.createDataFrame(zip(filename_list,positive_count_list,negative_count_list,sentiment_list), columns)
df.show()

+---------------+--------------+--------------+---------+
|       filename|positivecounts|negativecounts|sentiment|
+---------------+--------------+--------------+---------+
|cv988_18740.txt|            36|            31| positive|
| cv780_8467.txt|            14|            19| negative|
|cv172_11131.txt|            15|            30| negative|
|cv944_13521.txt|            21|            27| negative|
| cv038_9781.txt|            24|            45| negative|
|cv367_24065.txt|            33|            47| negative|
|cv945_13012.txt|            15|            20| negative|
| cv226_2618.txt|            34|            22| positive|
|cv700_23163.txt|            18|            26| negative|
| cv522_5418.txt|             4|            16| negative|
|cv050_11175.txt|             7|            17| negative|
|cv575_21150.txt|            32|            24| positive|
|cv010_29063.txt|            34|            33| positive|
|cv905_28965.txt|            27|            23| positive|
| cv937_9816.t