 #### Setting up Apache Spark

In [1]:
# Install a headless Java 8 JDK quietly (-qq) and discard the command's stdout.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Download the Spark 3.2.0 / Hadoop 3.2 binary tarball from the Apache archive.
!wget -q https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
# Unpack the tarball into the current working directory.
!tar xf spark-3.2.0-bin-hadoop3.2.tgz
# Install findspark, which a later cell imports to locate the Spark installation.
!pip install -q findspark

In [11]:
import os
import string
# Tell downstream tooling where Java and Spark live. SPARK_HOME is a relative
# path, so it resolves against the notebook's current working directory.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "spark-3.2.0-bin-hadoop3.2",
})

In [5]:
import findspark
# Initialise findspark; presumably this picks up the SPARK_HOME set in the
# earlier cell and makes pyspark importable (confirm against findspark docs).
findspark.init()
# Last expression of the cell, so the notebook displays the Spark location
# that findspark resolved.
findspark.find()

'spark-3.2.0-bin-hadoop3.2'

#### Initializing the SparkSession

In [7]:
from pyspark.sql import SparkSession

# Build (or reuse) a session whose master is "local[*]".
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

# Keep a handle on the underlying SparkContext for the RDD-based cells below.
sc = spark.sparkContext
sc

#### Google Drive Mounting on Colab

In [8]:
from google.colab import drive
# Mount Google Drive at /content/drive; the dataset and lexicon paths used in
# later cells all live under this mount point.
drive.mount('/content/drive')

Mounted at /content/drive


#### Reading the dataset and extracting the filenames into a list

In [70]:
# Directory holding the movie review files.
data_path = '/content/drive/MyDrive/Analysis/data_moviereviews'

# One (path, content) pair per file in the directory.
files = sc.wholeTextFiles(data_path)

# Keep only the last path component of every file path and bring the names to the driver.
filename_list = (
    files.keys()
    .map(lambda full_path: full_path.rsplit('/', 1)[-1])
    .collect()
)
print(filename_list[:10])

['cv500_10251.txt', 'cv500_10722.txt', 'cv501_11657.txt', 'cv501_12675.txt', 'cv502_10406.txt', 'cv502_10970.txt', 'cv503_10558.txt', 'cv503_11196.txt', 'cv504_29120.txt', 'cv505_12090.txt']


#### Preprocessing of the Movie Reviews data

In [72]:
def preprocess_text(text):
    """Turn the raw text of a review into a list of words.

    Every character in ``string.punctuation`` is deleted (not replaced by a
    space, so "well-known" becomes "wellknown"), newlines become spaces, and
    the result is split on runs of whitespace. Letter case is left untouched.

    Args:
        text: Raw review text.

    Returns:
        The list of whitespace-separated tokens; empty for blank input.
    """
    punctuation_remover = str.maketrans('', '', string.punctuation)
    without_punctuation = text.translate(punctuation_remover)
    single_line = without_punctuation.replace('\n', ' ')
    return single_line.split()

# Pair each file's base name with its tokenised review text.
preprocessed_data = files.map(
    lambda name_and_text: (
        name_and_text[0].rsplit('/', 1)[-1],
        preprocess_text(name_and_text[1]),
    )
)

# Two-column DataFrame: the file name and the list of words in the review.
data_df = preprocessed_data.toDF(["Filename", "Review"])
data_df.show()

+---------------+--------------------+
|       Filename|              Review|
+---------------+--------------------+
|cv500_10251.txt|[losing, a, job, ...|
|cv500_10722.txt|[you, always, hav...|
|cv501_11657.txt|[as, much, as, i,...|
|cv501_12675.txt|[synopsis, easily...|
|cv502_10406.txt|[the, postman, de...|
|cv502_10970.txt|[the, blues, brot...|
|cv503_10558.txt|[there, are, cert...|
|cv503_11196.txt|[michael, crichto...|
|cv504_29120.txt|[american, pie, 2...|
|cv505_12090.txt|[some, critics, i...|
|cv504_29243.txt|[i, guess, its, a...|
|cv505_12926.txt|[well, what, are,...|
|cv506_15956.txt|[much, ballyhoo, ...|
|cv506_17521.txt|[this, film, is, ...|
| cv507_9220.txt|[capsule, sidespl...|
|cv508_16006.txt|[maybe, the, most...|
| cv507_9509.txt|[midway, through,...|
|cv509_15888.txt|[as, i, write, th...|
|cv508_17742.txt|[if, the, 70s, no...|
|cv509_17354.txt|[synopsis, nice, ...|
+---------------+--------------------+
only showing top 20 rows



In [73]:
# RDD with one element per review: the word list from the single 'Review' column.
review_RDD = data_df.select('Review').rdd.map(lambda row: row[0])

#### Reading Positive words into RDD

In [74]:
# One RDD element per line of the positive-word lexicon file.
pos_rdd = sc.textFile('/content/drive/MyDrive/Analysis/pos.txt')

# Bring the lexicon to the driver as a plain Python list.
pos_words = pos_rdd.collect()
print(pos_words[:10])

['abound', 'abounds', 'abundance', 'abundant', 'accessable', 'accessible', 'acclaim', 'acclaimed', 'acclamation', 'accolade']


#### Counting of Positive words in Reviews

In [75]:
# `pos_words` is a Python list, so `word in pos_words` scanned the whole lexicon
# for every word of every review. Build a frozenset once and broadcast it: each
# executor receives a single read-only copy and lookups become O(1).
pos_words_bc = sc.broadcast(frozenset(pos_words))

# One integer per review: how many of its words appear in the positive lexicon
# (repeated words are counted every time they occur).
pos_word_counts = review_RDD.map(
    lambda review: sum(1 for word in review if word in pos_words_bc.value)
)
pos_word_counts_list = pos_word_counts.collect()

# Preview from the list already collected instead of launching a second Spark
# job with take(10); the first ten values are the same.
pos_word_counts_list[:10]

[29, 21, 71, 26, 20, 25, 60, 15, 13, 44]

#### Reading Negative words into RDD

In [76]:
# One RDD element per line of the negative-word lexicon file.
neg_rdd = sc.textFile('/content/drive/MyDrive/Analysis/neg.txt')

# Bring the lexicon to the driver as a plain Python list.
neg_words = neg_rdd.collect()
print(neg_words[:10])

['abnormal', 'abolish', 'abominable', 'abominably', 'abominate', 'abomination', 'abort', 'aborted', 'aborts', 'abrade']


#### Counting of Negative words in Reviews

In [77]:
# `neg_words` is a Python list, so `word in neg_words` scanned the whole lexicon
# for every word of every review. Build a frozenset once and broadcast it: each
# executor receives a single read-only copy and lookups become O(1).
neg_words_bc = sc.broadcast(frozenset(neg_words))

# One integer per review: how many of its words appear in the negative lexicon
# (repeated words are counted every time they occur).
neg_word_counts = review_RDD.map(
    lambda review: sum(1 for word in review if word in neg_words_bc.value)
)
neg_word_counts_list = neg_word_counts.collect()

# Preview from the list already collected instead of launching a second Spark
# job with take(10); the first ten values are the same.
neg_word_counts_list[:10]

[35, 17, 73, 42, 33, 27, 34, 12, 17, 82]

#### Assigning a sentiment to each review based on whichever count (positive or negative) is larger; ties are labelled negative

In [78]:
# Label each review by comparing its two counts position by position.
# A review is 'positive' only when its positive count is strictly greater;
# equal counts fall through to 'negative'.
sentiment_list = [
    'positive' if pos > neg else 'negative'
    for pos, neg in zip(pos_word_counts_list, neg_word_counts_list)
]
print(sentiment_list[:10])

['negative', 'positive', 'negative', 'negative', 'negative', 'negative', 'positive', 'positive', 'negative', 'negative']


In [79]:
columns = ['filename', 'positive_count', 'negative_count', 'sentiment']

# The four driver-side lists are stitched together purely by position, so row i
# is correct only if every list was collected in the same file order.
# NOTE(review): nothing in this cell enforces that ordering - confirm upstream.
df = spark.createDataFrame(
    list(zip(filename_list, pos_word_counts_list, neg_word_counts_list, sentiment_list)),
    schema=columns,
)
df.show()

+---------------+--------------+--------------+---------+
|       filename|positive_count|negative_count|sentiment|
+---------------+--------------+--------------+---------+
|cv500_10251.txt|            29|            35| negative|
|cv500_10722.txt|            21|            17| positive|
|cv501_11657.txt|            71|            73| negative|
|cv501_12675.txt|            26|            42| negative|
|cv502_10406.txt|            20|            33| negative|
|cv502_10970.txt|            25|            27| negative|
|cv503_10558.txt|            60|            34| positive|
|cv503_11196.txt|            15|            12| positive|
|cv504_29120.txt|            13|            17| negative|
|cv505_12090.txt|            44|            82| negative|
|cv504_29243.txt|            28|            26| positive|
|cv505_12926.txt|            21|            29| negative|
|cv506_15956.txt|            66|            54| positive|
|cv506_17521.txt|             1|             2| negative|
| cv507_9220.t