In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('IMDB_reviews_BDT').getOrCreate()

In [2]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, BooleanType, ArrayType

In [3]:
columns = []
columns.append(StructField('review_id', StringType(), True))
columns.append(StructField('reviewer', StringType(), True))
columns.append(StructField('movie', StringType(), True))
columns.append(StructField('rating', IntegerType(), True))
columns.append(StructField('review_summary', StringType(), True))
columns.append(StructField('review_date', TimestampType(), True))
columns.append(StructField('spoiler_tag', BooleanType(), True))
columns.append(StructField('review_detail', StringType(), True))
columns.append(StructField('helpful', StringType(), True))

In [4]:
schema = StructType(fields=columns)

# Original CSV

In [None]:
df_original = spark.read.csv("reviews.csv", header=True, schema=schema)

In [None]:
df_original.printSchema()

In [None]:
df_original.show(3)

In [None]:
df_original.columns

In [None]:
df_original.count()

In [None]:
df_original.select('movie').show(5)

In [None]:
df_original.select(['reviewer', 'movie', 'rating']).show(7)

Drop rows where are missing values

In [None]:
df1 = df_original.na.drop(subset=['movie']) # if any of the columns in the subset is null, the row is dropped

In [None]:
df1.select('movie').show(5)

In [None]:
df1.count() # - 1,011,481 rows

In [None]:
df2 = df1.na.drop(subset=['review_detail']) # if any of the columns in the subset is null, the row is dropped

In [None]:
df2.count() # - 829,429 rows

In [None]:
df3 = df2.na.drop(subset=['review_summary']) # if any of the columns in the subset is null, the row is dropped

In [None]:
df3.count() # - 92 rows

In [None]:
df_drop = df3.na.drop(subset=['rating']) # if any of the columns in the subset is null, the row is dropped

In [None]:
df_drop.count() # - 878,822 rows

In [None]:
df_drop.orderBy(df_drop['reviewer'].desc()).show()

In [None]:
# Save the resulting DataFrame to a CSV file
output_path = 'review_drop_1.csv'
df_drop.write.csv(output_path, header=True, mode='overwrite')

# After Droping CSV


In [5]:
df_drop = spark.read.csv("review_drop.csv", header=True, schema=schema)

In [6]:
df_drop.orderBy(df_drop['reviewer'].desc()).show(10)

+---------+---------+--------------------+------+--------------------+-----------+-----------+--------------------+------------+
|review_id| reviewer|               movie|rating|      review_summary|review_date|spoiler_tag|       review_detail|     helpful|
+---------+---------+--------------------+------+--------------------+-----------+-----------+--------------------+------------+
|rw0420294|    ángel|The Horse Whisper...|     3|The kiss of the h...|       NULL|       NULL|"In this film a h...|  ['2', '7']|
|rw0473637|    ángel|The Thin Red Line...|     9|            the best|       NULL|       NULL|Such a simple thi...|  ['0', '0']|
|rw0438715|    ángel|The Truman Show (...|     7|God is a T.V. pro...|       NULL|       NULL|The beginning (fi...|  ['0', '0']|
|rw0351091|Øystein-3|      Timecop (1994)|    10|Van Damme as we w...|       NULL|       NULL|This isn't at all...|['13', '18']|
|rw0054257|Øystein-3|American Farmers ...|     5|American farmers ...|       NULL|       NULL|Thi


### Add Bool Ratings Column

In [7]:
from pyspark.sql import functions as F

In [8]:
df = df_drop.withColumn('ratingboolpos', F.when(df_drop.rating >= 7, 1).otherwise(0))

In [9]:
df.orderBy(df['reviewer'].desc()).show(10)

+---------+---------+--------------------+------+--------------------+-----------+-----------+--------------------+------------+-------------+
|review_id| reviewer|               movie|rating|      review_summary|review_date|spoiler_tag|       review_detail|     helpful|ratingboolpos|
+---------+---------+--------------------+------+--------------------+-----------+-----------+--------------------+------------+-------------+
|rw0420294|    ángel|The Horse Whisper...|     3|The kiss of the h...|       NULL|       NULL|"In this film a h...|  ['2', '7']|            0|
|rw0473637|    ángel|The Thin Red Line...|     9|            the best|       NULL|       NULL|Such a simple thi...|  ['0', '0']|            1|
|rw0438715|    ángel|The Truman Show (...|     7|God is a T.V. pro...|       NULL|       NULL|The beginning (fi...|  ['0', '0']|            1|
|rw0054257|Øystein-3|American Farmers ...|     5|American farmers ...|       NULL|       NULL|This movie is a q...|        NULL|            0|

# Text Preprocessing

In [10]:
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("TextPreprocessing") \
    .config("spark.executor.instances", "2") \
    .getOrCreate()

In [11]:
! pip install nltk

Collecting nltk
  Downloading nltk-3.8.1-py3-none-any.whl.metadata (2.8 kB)
Collecting regex>=2021.8.3 (from nltk)
  Downloading regex-2024.5.15-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (40 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m40.9/40.9 kB[0m [31m930.4 kB/s[0m eta [36m0:00:00[0m
Downloading nltk-3.8.1-py3-none-any.whl (1.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.5/1.5 MB[0m [31m12.8 MB/s[0m eta [36m0:00:00[0m00:01[0m0:01[0m
[?25hDownloading regex-2024.5.15-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (785 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m785.0/785.0 kB[0m [31m17.5 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: regex, nltk
Successfully installed nltk-3.8.1 regex-2024.5.15


In [12]:
import nltk
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')

[nltk_data] Downloading package punkt to /home/jovyan/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package stopwords to /home/jovyan/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package wordnet to /home/jovyan/nltk_data...


True

In [13]:
import re
import time
import string
from pyspark.sql.functions import col, udf, explode
from pyspark.sql.types import StringType, ArrayType
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer

In [14]:
# Initialize lemmatizer and stop words
lemmatizer = WordNetLemmatizer()
stop_words = set(stopwords.words('english'))

In [15]:
@udf(returnType=ArrayType(StringType()))
def tokenize_words(text):
    if text is not None:
        return word_tokenize(text)
    else:
        return []

@udf(returnType=ArrayType(StringType()))
def remove_punctuation(words):
    translator = str.maketrans('', '', string.punctuation)
    return [word.translate(translator) for word in words]
    
@udf(returnType=ArrayType(StringType()))
def to_lowercase(words):
    return [x.lower() for x in words]

@udf(returnType=ArrayType(StringType()))
def remove_numbers(words):
    return [x for x in words if not x.isdigit()]

@udf(returnType=ArrayType(StringType()))
def remove_stopwords(words):
    return [x for x in words if x not in stop_words]

@udf(returnType=ArrayType(StringType()))
def lemmatize_words(words):
    return [lemmatizer.lemmatize(x) for x in words]

@udf(returnType=StringType())
def join_words(words):
    return " ".join(words)

In [16]:
# Define UDF for removing frequent words
@udf(returnType=ArrayType(StringType()))
def remove_frequent_words(words, frequent_words):
    return [word for word in words if word not in frequent_words]


In [17]:
def preprocess_data(df_preprocess, suf):
    start_time = time.time()
    
    # Apply transformations to 'review_summary' column
    df_preprocess_tokenize = df_preprocess.withColumn(f"{suf}_tokenize", tokenize_words(col(f"{suf}")))
    df_preprocess_punctuation = df_preprocess_tokenize.withColumn(f"{suf}_punctuation", remove_punctuation(col(f"{suf}_tokenize")))
    df_preprocess_lower = df_preprocess_punctuation.withColumn(f"{suf}_lower", to_lowercase(col(f"{suf}_punctuation")))
    df_preprocess_number = df_preprocess_lower.withColumn(f"{suf}_number", remove_numbers(col(f"{suf}_lower")))
    df_preprocess_stopwords = df_preprocess_number.withColumn(f"{suf}_stopwords", remove_stopwords(col(f"{suf}_number")))
    df_preprocess_lemmatization = df_preprocess_stopwords.withColumn(f"{suf}_lemmatization", lemmatize_words(col(f"{suf}_stopwords")))
    df_preprocessed = df_preprocess_lemmatization.withColumn(f"{suf}_join", join_words(col(f"{suf}_lemmatization")))
    
    end_time = time.time()
    preprocess_time = end_time - start_time
    return preprocess_time, df_preprocessed


## review_summary preprocess

In [18]:
input_path = 'review_summary_preprocess.csv'
df_drop.select('review_summary').write.csv(input_path, header=True, mode='overwrite')


In [19]:
# Read the CSV file into DataFrame
df_preprocess = spark.read.csv(input_path, header=True, inferSchema=True)

In [20]:
suf = "review_summary"
preprocess_time, df_preprocessed = preprocess_data(df_preprocess, suf)
print("Time taken:", preprocess_time, "seconds")
# df_preprocessed.show(5)

Time taken: 0.17024946212768555 seconds


In [21]:
df_preprocessed.select('review_summary', 'review_summary_join').show(5)

+--------------------+--------------------+
|      review_summary| review_summary_join|
+--------------------+--------------------+
|I just love this ...|         love movie |
|So sad, but oh-so...|       sad  ohsotrue|
|A smart, dark, so...|smart  dark  soci...|
|When I Viewed thi...|viewed movie blow...|
|            my ideas|                idea|
+--------------------+--------------------+
only showing top 5 rows



## movie_detail preprocess

In [22]:
input_path = 'review_detail_preprocess.csv'
df_drop.select('review_detail').write.csv(input_path, header=True, mode='overwrite')

In [23]:
# Read the CSV file into DataFrame
df_preprocess_review = spark.read.csv(input_path, header=True, inferSchema=True)

In [None]:
suf = "review_detail"
preprocess_time, df_preprocess_review = preprocess_data(df_preprocess_review, suf)
print("Time taken:", preprocess_time, "seconds")
df_preprocessed.show(5)

In [None]:
df_preprocess_review.select('review_detail', 'review_detail_join').show(5)

# Logistic Regression

In [26]:
import time
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Start the timer
start_time = time.time()

# Convert 'review_summary_join' into TF-IDF features
tokenizer = Tokenizer(inputCol="review_summary_join", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
vectorizer = CountVectorizer(inputCol="filtered", outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features")

# Index the target column
indexer = StringIndexer(inputCol="ratingboolpos", outputCol="label")

# Logistic Regression model
lr = LogisticRegression(featuresCol="features", labelCol="label")

# Pipeline to streamline the process
pipeline = Pipeline(stages=[tokenizer, remover, vectorizer, idf, indexer, lr])

# Split the data into training and test sets
train_data, test_data = df_preprocessed.randomSplit([0.8, 0.2], seed=42)

# Fit the pipeline to the training data
model = pipeline.fit(train_data)

# Make predictions on the test data
predictions = model.transform(test_data)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

# Show some of the predictions
predictions.select("review_summary", "label", "prediction", "probability").show(5)

# Stop the timer and calculate the elapsed time
end_time = time.time()
elapsed_time = end_time - start_time
print("Time taken for sentiment analysis: {:.2f} seconds".format(elapsed_time))


Py4JJavaError: An error occurred while calling o244.fit.
: org.apache.spark.SparkException: Input column ratingboolpos does not exist.
	at org.apache.spark.ml.feature.StringIndexerBase.$anonfun$validateAndTransformSchema$2(StringIndexer.scala:128)
	at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
	at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
	at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:198)
	at org.apache.spark.ml.feature.StringIndexerBase.validateAndTransformSchema(StringIndexer.scala:123)
	at org.apache.spark.ml.feature.StringIndexerBase.validateAndTransformSchema$(StringIndexer.scala:115)
	at org.apache.spark.ml.feature.StringIndexer.validateAndTransformSchema(StringIndexer.scala:145)
	at org.apache.spark.ml.feature.StringIndexer.transformSchema(StringIndexer.scala:252)
	at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:71)
	at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:237)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
