In [1]:
from pyspark.sql import SparkSession
import requests
import os
from pyspark.sql.functions import expr, udf
from pyspark.sql.types import StringType
import re
import nltk
from nltk.corpus import stopwords
import numpy as np
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, SimpleRNN, LSTM, Dense, SpatialDropout1D
from sklearn.model_selection import train_test_split

ModuleNotFoundError: No module named 'nltk'

In [None]:
# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Sentiment Analysis") \
    .getOrCreate()

# URL of the dataset
file_url = "https://drive.google.com/uc?id=1erMx3v_-yZUELUaXeQTowcfAvdSz0IhT&export=download"

# Path where the file will be saved in the current directory
local_file_path = os.path.join(os.getcwd(), "rt_reviews.csv")

# Download the file. The context manager releases the connection even if the
# download fails part-way, and the timeout keeps a stalled server from hanging
# the notebook forever.
with requests.get(file_url, stream=True, timeout=60) as response:
    # Fail loudly on 4xx/5xx instead of saving an error page as "rt_reviews.csv"
    response.raise_for_status()

    # A successful response that is HTML is not the CSV (e.g. an interstitial
    # or sign-in page); stop here rather than let Spark parse markup as reviews.
    content_type = response.headers.get("Content-Type", "")
    if content_type.lower().startswith("text/html"):
        raise RuntimeError(
            f"Expected a CSV download but received '{content_type}' from {file_url}"
        )

    with open(local_file_path, "wb") as file:
        for chunk in response.iter_content(chunk_size=1024):
            if chunk:
                file.write(chunk)

# Read the dataset into a Spark DataFrame.
# The file escapes quotes inside quoted fields by doubling them (""), whereas
# Spark's default escape character is a backslash; setting escape to '"' makes
# such fields parse into clean text instead of keeping the raw quote characters.
df = (
    spark.read
    .option("header", "true")
    .option("quote", "\"")
    .option("escape", "\"")
    .csv(local_file_path)
    .toDF("Label", "Text")
)

In [None]:
# Preview the freshly loaded rows (show()'s defaults spelled out: 20 rows, long cells truncated)
df.show(n=20, truncate=True)

+------+--------------------+
| Label|                Text|
+------+--------------------+
| fresh| Manakamana doesn...|
| fresh| Wilfully offensi...|
|rotten| It would be diff...|
|rotten| Despite the gust...|
|rotten| If there was a g...|
|rotten| Gleeson goes the...|
| fresh| It was the heigh...|
|rotten|" Everyone in ""T...|
|rotten| Actor encourages...|
| fresh| Slight, containe...|
| fresh| Bell's debut fea...|
| fresh| ... except for i...|
|rotten| This is not one ...|
| fresh| Son of Saul empl...|
|rotten| It isn't thrilli...|
|rotten| Hiddleston and S...|
| fresh| Yossi is a stron...|
| fresh| I feel like a Sp...|
|rotten| Long, unfocused ...|
| fresh| Kubo and the Two...|
+------+--------------------+
only showing top 20 rows



In [None]:
# Binarize the Label column with a SQL CASE expression:
# 'fresh' becomes 1, every other value (including NULL) falls through to 0.
label_case_sql = "CASE WHEN Label = 'fresh' THEN 1 ELSE 0 END"
df = df.withColumn("Label", expr(label_case_sql))

In [None]:
# Preview the rows again to confirm Label now holds 1/0 (20 rows, long cells truncated)
df.show(n=20, truncate=True)

+-----+--------------------+
|Label|                Text|
+-----+--------------------+
|    1| Manakamana doesn...|
|    1| Wilfully offensi...|
|    0| It would be diff...|
|    0| Despite the gust...|
|    0| If there was a g...|
|    0| Gleeson goes the...|
|    1| It was the heigh...|
|    0|" Everyone in ""T...|
|    0| Actor encourages...|
|    1| Slight, containe...|
|    1| Bell's debut fea...|
|    1| ... except for i...|
|    0| This is not one ...|
|    1| Son of Saul empl...|
|    0| It isn't thrilli...|
|    0| Hiddleston and S...|
|    1| Yossi is a stron...|
|    1| I feel like a Sp...|
|    0| Long, unfocused ...|
|    1| Kubo and the Two...|
+-----+--------------------+
only showing top 20 rows



In [None]:
# Define a UDF to clean text
def clean_text(text):
    """Normalize one review string before tokenization.

    Steps, in order: lowercase, drop anything that looks like an HTML tag,
    drop every character that is not an ASCII letter or whitespace, then
    collapse whitespace runs to a single space and trim both ends.

    Args:
        text: The raw review text, or None for a row with a missing value.

    Returns:
        The cleaned string. None yields "" so that a single null row neither
        crashes the Spark UDF nor leaks a None into the tokenizer downstream.
    """
    # Spark hands NULL column values to Python UDFs as None
    if text is None:
        return ""
    text = text.lower()
    # Non-greedy match so each <...> tag is removed individually
    text = re.sub(r'<.*?>', '', text)
    text = re.sub(r'[^a-zA-Z\s]', '', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text

# Wrap clean_text as a Spark UDF that returns a string column
clean_text_udf = udf(clean_text, StringType())

# Derive a "cleaned_text" column from Text, keeping the original column alongside it
df = df.withColumn("cleaned_text", clean_text_udf(df.Text))

In [None]:
# Display the first 20 rows at full width to compare Text with cleaned_text
df.show(n=20, truncate=False)

+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Label|Text                                                                                                                                                                                                                               |cleaned_text                                                                                                                                                                                                            |
+-----+---------------------------------------------------------------------------------------

In [None]:
# Collect the cleaned text and labels in ONE Spark job.
# Selecting both columns together means each review and its label come from
# the same Row, so reviews[i] and labels[i] are aligned by construction, and
# the cleaning UDF runs over the dataset once instead of once per column.
collected_rows = df.select("cleaned_text", "Label").collect()
reviews = [row["cleaned_text"] for row in collected_rows]
labels = [row["Label"] for row in collected_rows]

                                                                                

                                                                                

In [None]:
# Fixed length (in tokens) that every encoded review is padded or truncated to
max_sequence_length = 250

# Fit the vocabulary on the cleaned reviews; num_words caps how many of the
# most frequent words are kept when texts are encoded
tokenizer = Tokenizer(num_words=5000)
tokenizer.fit_on_texts(reviews)

# Encode each review as a list of word indices, then make them all the same length
sequences = tokenizer.texts_to_sequences(reviews)
padded_sequences = pad_sequences(sequences, maxlen=max_sequence_length)

# Keras and scikit-learn expect the targets as a numpy array rather than a list
labels = np.array(labels)

In [None]:
# Define the model: embedding -> dropout -> SimpleRNN -> LSTM -> sigmoid output
model = Sequential()
# input_dim is read from the tokenizer so the embedding table can never drift
# out of sync with the vocabulary cap used to encode the reviews.
# (Embedding's `input_length` argument is deprecated and ignored by newer Keras
# releases, so the input shape is supplied through model.build() below.)
model.add(Embedding(input_dim=tokenizer.num_words, output_dim=100))
model.add(SpatialDropout1D(0.2))
# return_sequences=True hands the full per-timestep output to the LSTM that follows
model.add(SimpleRNN(units=32, return_sequences=True, dropout=0.2, recurrent_dropout=0.2))
model.add(LSTM(100, dropout=0.2, recurrent_dropout=0.2))
# Single sigmoid unit: probability that the review is labelled 1 ("fresh")
model.add(Dense(1, activation='sigmoid'))

# Build with an explicit (batch, sequence_length) input shape so the weights
# exist and model.summary() can report output shapes and parameter counts
# before training starts.
model.build(input_shape=(None, max_sequence_length))

# Compile the model
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])



In [None]:
# Show the model summary: one line per layer added to `model` above
model.summary()

In [None]:
# Hold out 20% of the examples for validation; the fixed random_state makes
# the shuffle, and therefore the split, repeatable across runs
X_train, X_val, y_train, y_val = train_test_split(
    padded_sequences,
    labels,
    test_size=0.2,
    random_state=42,
)

# Fit for 10 epochs in mini-batches of 64, scoring the held-out split after
# every epoch; verbose=2 prints one summary line per epoch
history = model.fit(
    X_train,
    y_train,
    validation_data=(X_val, y_val),
    epochs=10,
    batch_size=64,
    verbose=2,
)

Epoch 1/10
6000/6000 - 3871s - 645ms/step - accuracy: 0.7467 - loss: 0.5062 - val_accuracy: 0.7858 - val_loss: 0.4539
Epoch 2/10
6000/6000 - 3281s - 547ms/step - accuracy: 0.7923 - loss: 0.4424 - val_accuracy: 0.8023 - val_loss: 0.4252
Epoch 3/10
6000/6000 - 2935s - 489ms/step - accuracy: 0.8017 - loss: 0.4239 - val_accuracy: 0.8089 - val_loss: 0.4122
Epoch 4/10
6000/6000 - 2898s - 483ms/step - accuracy: 0.8028 - loss: 0.4228 - val_accuracy: 0.8099 - val_loss: 0.4067
Epoch 5/10
6000/6000 - 2736s - 456ms/step - accuracy: 0.8094 - loss: 0.4124 - val_accuracy: 0.8126 - val_loss: 0.4039
Epoch 6/10
6000/6000 - 2729s - 455ms/step - accuracy: 0.7992 - loss: 0.4283 - val_accuracy: 0.7997 - val_loss: 0.4256
Epoch 7/10
6000/6000 - 2793s - 465ms/step - accuracy: 0.8049 - loss: 0.4197 - val_accuracy: 0.8107 - val_loss: 0.4044
Epoch 8/10
6000/6000 - 2796s - 466ms/step - accuracy: 0.8126 - loss: 0.4052 - val_accuracy: 0.8127 - val_loss: 0.4033
Epoch 9/10
6000/6000 - 2769s - 461ms/step - accuracy: 0.

In [None]:
# Print the model summary again (same call as the earlier summary cell)
model.summary()

In [None]:
# Score the model on the held-out split. evaluate() returns the loss followed
# by the metrics passed to compile(), here just accuracy.
validation_scores = model.evaluate(X_val, y_val, verbose=2)
loss, accuracy = validation_scores
print(f'Validation Loss: {loss}')
print(f'Validation Accuracy: {accuracy}')

3000/3000 - 281s - 94ms/step - accuracy: 0.8217 - loss: 0.3878
Validation Loss: 0.38777291774749756
Validation Accuracy: 0.8217499852180481


In [None]:
# Persist the trained model as an HDF5 file in the current working directory.
# NOTE(review): only the model is written here; the fitted `tokenizer` would
# also be needed to encode new text for inference. Confirm it is saved elsewhere.
model_filename = "sentiment_analysis_model.h5"
model_save_path = os.path.join(os.getcwd(), model_filename)
model.save(model_save_path)
print(f"Model saved to {model_save_path}")



Model saved to /Users/tejas/College/Summer 2024/CS 6350 Big data/Project/sentiment_analysis_model.h5
