In [1]:
# Sentiment Analysis
# Classify the sentence/comment/review/feedback into positive or negative

# Text Processing - NLP - Natural Language Processing
"""
1. Tokenization
2. Remove stopwords and punctuations
3. Lemmatization
4. Vectorization
"""

"""
Example - The product is very bad and it's a waste of money
Step - 1 : tokenization + lowercase
= ["the", "product", "is", "very", "bad", "and", "it's", "a", "waste", "of", "money"]

Step - 2 : remove stopwords (and, is, are, the, that, a, if, but) and punctuations (!,@,#,$,%,^,&,*)
= ["product", "very", "bad", "waste", "money"]

Step - 3 : lemmatization - play - playing, played... waste - wasting, wasted
= []

Step - 4 : Vectorization - Count Vectorization, TF-IDF, Hashing
TF - Term Frequency
IDF - Inverse Document Frequency
"""

from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, StringIndexer, HashingTF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [2]:
spark = SparkSession.builder.master("local[*]").appName("sentiment_analysis_app_2").config("spark.executor.memory", "8g").config("spark.driver.memory", "8g").config("spark.executor.cores", "4").getOrCreate()

24/10/25 14:22:19 WARN Utils: Your hostname, Ravikants-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.10.47 instead (on interface en0)
24/10/25 14:22:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/25 14:22:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/10/25 14:22:19 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [5]:
df = spark.read.csv("IMDB Dataset.csv", header=True, inferSchema=True)

In [6]:
spark

In [7]:
df.printSchema()

root
 |-- review: string (nullable = true)
 |-- sentiment: string (nullable = true)



In [11]:
df.show(5)

+--------------------+--------------------+
|              review|           sentiment|
+--------------------+--------------------+
|One of the other ...|            positive|
|"A wonderful litt...| not only is it w...|
|"I thought this w...| but spirited you...|
|Basically there's...|            negative|
|"Petter Mattei's ...| power and succes...|
+--------------------+--------------------+
only showing top 5 rows



In [13]:
# df.toPandas().head(5)

In [15]:
# Optional step - renaming columns
df = df.withColumnRenamed("sentiment", "label").withColumnRenamed("review", "text")

In [17]:
df.filter(df["label"].isNull()).show()

+--------------------+-----+
|                text|label|
+--------------------+-----+
|".... may seem fa...| NULL|
|"And that comes f...| NULL|
|"Sorry everyone,,...| NULL|
|"With a special t...| NULL|
|"I've seen a lot ...| NULL|
|"I happened to se...| NULL|
|"seriously what t...| NULL|
+--------------------+-----+



In [19]:
df = df.filter(df["label"].isNotNull()).withColumn("label", df["label"].cast("string"))

In [29]:
df = df.sample(fraction=0.5)
df = df.persist()

In [31]:
df.count()

15931

In [33]:
# Convert sentiment labels (positive and negative) to numeric format
label_indexer = StringIndexer(inputCol="label", outputCol="label_index")

In [35]:
# Tokenization
tokenizer = Tokenizer(inputCol="text", outputCol="words")

In [37]:
# Remove stopwords
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")

In [39]:
# CountVectorization - Convert words to numerical feature vectors
# vectorizer = CountVectorizer(inputCol="filtered_words", outputCol="features")

# Using hashing instead of CountVectorizer
vectorizer = HashingTF(inputCol="filtered_words", outputCol="features", numFeatures=2000)

In [41]:
logistic = LogisticRegression(featuresCol="features", labelCol="label_index")

In [43]:
# Build Pipeline - setup stages one by one
pipeline = Pipeline(stages=[label_indexer, tokenizer, stopwords_remover, vectorizer, logistic])

In [45]:
# split the data into training and testing
# we train on 80% of data and testing on 20% of data
train_df, test_df = df.randomSplit([0.7, 0.3])

In [None]:
# Train the model  - execute pipeline
model = pipeline.fit(train_df)

24/10/25 14:22:48 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/10/25 14:22:48 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
[Stage 117:=====>                                                 (1 + 10) / 11]