In [0]:
# Download modules
%pip install nltk

Python interpreter will be restarted.
Collecting nltk
  Using cached nltk-3.8.1-py3-none-any.whl (1.5 MB)
Collecting tqdm
  Using cached tqdm-4.65.0-py3-none-any.whl (77 kB)
Collecting regex>=2021.8.3
  Using cached regex-2023.3.23-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (768 kB)
Installing collected packages: tqdm, regex, nltk
Successfully installed nltk-3.8.1 regex-2023.3.23 tqdm-4.65.0
Python interpreter will be restarted.


#Import modules

In [0]:
import nltk
from pyspark.sql import SparkSession
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')
from nltk.corpus import stopwords
stop = stopwords.words('english')
spark = SparkSession.builder.appName("612_Proj").config("spark.task.cpus", "2").getOrCreate()
from nltk.tokenize import word_tokenize
from pyspark.sql.functions import udf, split, col
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml.feature import CountVectorizer
from nltk.stem import WordNetLemmatizer
from pyspark.sql.functions import concat
from pyspark.sql.functions import concat_ws




[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!


In [0]:
#change the csv to a dataframe for databricks to use
def csv_to_df(fname):
    
    #Location of dbfs
    dbfs_loc = "dbfs:/FileStore/tables/"
    filename_complete = dbfs_loc + fname
    filetype="csv"
    
    #Options for loading
    inf_sch = "true"
    is_header = "true"
    delim = ","
    multiline = "true"
    escape = "\""
    
    # Load the data into a dataframe using options above
    df = spark.read.format(filetype)\
                .option("header", is_header)\
                .option("inferSchema", inf_sch)\
                .option("multiline", multiline)\
                .option("escape", escape).load(filename_complete)
    
    return df

In [0]:
# Load CSVs
all_reviews = csv_to_df("IMDB_dataset_rated_final.csv")

In [0]:
#concatenate review title and review content with space inbetween
all_reviews = all_reviews.withColumn("Complete_Content", concat_ws(" ", "Review Title", "Review Content"))

In [0]:
from nltk.stem import SnowballStemmer
# Cleans the text by removing non-alphabet characters or whitespace
def clean_text(text):
    text = ''.join(c for c in text if c.isalpha() or c.isspace())
    text = text.lower()
    return text

# Function to remove stop words
def remove_stopwords(text):
    stop = set(nltk.corpus.stopwords.words('english'))
    tokens = word_tokenize(text)
    filtered_tokens = [token for token in tokens if token.lower() not in stop]
    
    return filtered_tokens

# Function to lemmatize tokens with POS
def lemmatize_text_pos(filtered_tokens):
    lemmatizer = WordNetLemmatizer()
    pos_tagged = nltk.pos_tag(filtered_tokens)
    lemmatized_tokens = [lemmatizer.lemmatize(token, get_wordnet_pos(pos)) if get_wordnet_pos(pos) else token for token, pos in pos_tagged]
    return lemmatized_tokens
  
def stemmer_snowball(filtered_tokens):
    snowball = SnowballStemmer(language = 'english')
    print(filtered_tokens)
    stemmed_words = [snowball.stem(word) for word in filtered_tokens]
    return stemmed_words

# Helper function to get the WordNet POS tag from the NLTK POS tag
def get_wordnet_pos(nltk_pos_tag):
    if nltk_pos_tag.startswith('J'):
        return 'a'
    elif nltk_pos_tag.startswith('V'):
        return 'v'
    elif nltk_pos_tag.startswith('N'):
        return 'n'
    elif nltk_pos_tag.startswith('R'):
        return 'r'
    else:
        return None

# Function that applies the cleaning, stop word removal, and lemmatization functions
def preprocess_text(text):
#     if text is None or not isinstance(text, str):
#         return []
    cleaned = clean_text(text)
    filtered = remove_stopwords(cleaned)
    lemmatized = lemmatize_text_pos(filtered)
    print("this is sam")
    return lemmatized

#Function applies cleaning, stop word removal and snowballstemming
def preprocess_text_stemming(text):
#     if text is None or not isinstance(text, str):
#         return []
    cleaned = clean_text(text)
    filtered = remove_stopwords(cleaned)
    stemmed = stemmer_snowball(filtered)
    return stemmed

#default of preprocessing text using lemmatization
preprocess_text_udf = udf(preprocess_text, ArrayType(StringType()))

#preprocessing text using snowball stemming
preprocess_text_udf_stemmed = udf(preprocess_text_stemming, ArrayType(StringType()))

# Apply the user-defined function to the "Review Content" column and create a new column "lemmatized_text"
df = all_reviews.withColumn("lemmatized_text", preprocess_text_udf("Complete_Content"))
df = df.withColumn("stemmed_text", preprocess_text_udf_stemmed("Complete_Content"))

In [0]:
# Converts the lemmatized text to a vector and fits it to and IDF function
from pyspark.ml.feature import IDF, CountVectorizer,VectorAssembler
# Create a CountVectorizer object for the lemmatized words
cv = CountVectorizer(inputCol="lemmatized_text", outputCol="lemmatized_text_vector_idf")

# Fit the CountVectorizer model on the dataframe
cv_model = cv.fit(df)

# Transform the dataframe to add the count vectorized features column
count_vectorized_df = cv_model.transform(df)

# Define an IDF function to compute the inverse document frequency on the lemmatized text
idf = IDF(inputCol="lemmatized_text_vector_idf", outputCol="TFIDF_features_uni-gram_lemm")

# Fit the IDF function to the dataframe to compute the IDF values
idf_model = idf.fit(count_vectorized_df)

# Apply the IDF model to the dataframe to create a new column with the normalized features
df = idf_model.transform(count_vectorized_df)


In [0]:
# Converts the Stemmed text to a vector and fits it to and IDF function

# Create a CountVectorizer object for the lemmatized words
cv = CountVectorizer(inputCol="stemmed_text", outputCol="stemmed_text_vector_idf")

# Fit the CountVectorizer model on the dataframe
cv_model = cv.fit(df)

# Transform the dataframe to add the count vectorized features column
count_vectorized_df = cv_model.transform(df)

# define an IDF function to compute the inverse document frequency
idf = IDF(inputCol="stemmed_text_vector_idf", outputCol="TFIDF_features_uni-gram_stemmed")

# fit the IDF function to the dataframe to compute the IDF values
idf_model = idf.fit(count_vectorized_df)

# apply the IDF model to the dataframe to create a new column with the normalized features
df = idf_model.transform(count_vectorized_df)

In [0]:
# create a new column "Month" by splitting the "Review Date" column on dash and accessing the second element
df = df.withColumn("Month", split(col("Review Date"), "-")[1])

In [0]:
#seperate genre into new column
df = df.withColumn("genre_lemmatized", preprocess_text_udf("Movie Genre"))
df = df.withColumn("genre_stemmed", preprocess_text_udf_stemmed("Movie Genre"))

In [0]:
from pyspark.ml.feature import Tokenizer
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

#bi-gram functions
df = df.withColumn('new_lemmatized_text', concat_ws(' ', 'lemmatized_text'))
tokenizer = Tokenizer(inputCol="new_lemmatized_text", outputCol="words_2")

n = 2
df_words = tokenizer.transform(df)

def get_bigrams(words):
    ngrams = []
    for i in range(len(words) - n + 1):
        ngrams.append(' '.join(words[i:i+n]))
    return ngrams


get_ngrams_udf = udf(get_bigrams, ArrayType(StringType()))


df = df_words.withColumn("bi_grams_lemm", get_ngrams_udf("words_2"))

In [0]:
from pyspark.ml.feature import Tokenizer
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

#same as above
df = df.withColumn('new_stemmed_text', concat_ws(' ', 'stemmed_text'))
tokenizer = Tokenizer(inputCol="new_stemmed_text", outputCol="words_2s")

n = 2
df_words = tokenizer.transform(df)

def get_bigrams(words):
    ngrams = []
    for i in range(len(words) - n + 1):
        ngrams.append(' '.join(words[i:i+n]))
    return ngrams


get_ngrams_udf = udf(get_bigrams, ArrayType(StringType()))


df = df_words.withColumn("bi_grams_stem", get_ngrams_udf("words_2s"))

####Generating vector of bi-grams

In [0]:
#CV fOR bigram for lemmatized text
# Create a CountVectorizer object for the lemmatized words
cv = CountVectorizer(inputCol="bi_grams_lemm", outputCol="bi_grams_lemmatized_text_vector")

# Fit the CountVectorizer model on the dataframe
cv_model = cv.fit(df)

# Transform the dataframe to add the count vectorized features column
count_vectorized_df = cv_model.transform(df)


In [0]:
#CV fOR bigram for stemmed text
# Create a CountVectorizer object for the stemmed words
cv = CountVectorizer(inputCol="bi_grams_stem", outputCol="bi_grams_stemmed_text_vector")

# Fit the CountVectorizer model on the dataframe
cv_model = cv.fit(count_vectorized_df)

# Transform the dataframe to add the count vectorized features column
count_vectorized_df = cv_model.transform(count_vectorized_df)

####Generating vector of reviews (bag of words)

In [0]:
#CV fOR BoW for lemmatized text
# Create a CountVectorizer object for the lemmatized words
cv = CountVectorizer(inputCol="lemmatized_text", outputCol="lemmatized_text_vector")

# Fit the CountVectorizer model on the dataframe
cv_model = cv.fit(count_vectorized_df)

# Transform the dataframe to add the count vectorized features column
count_vectorized_df = cv_model.transform(count_vectorized_df)

In [0]:
#CV fOR BoW for stemmed text
from pyspark.ml.feature import IDF

# Create a CountVectorizer object for the stemmed words
cv = CountVectorizer(inputCol="stemmed_text", outputCol="stemmed_text_vector")

# Fit the CountVectorizer model on the dataframe
cv_model = cv.fit(count_vectorized_df)

# Transform the dataframe to add the count vectorized features column
count_vectorized_df = cv_model.transform(count_vectorized_df)

####Generating Vectors of Genres

In [0]:
# Create a CountVectorizer object for the genre
cv = CountVectorizer(inputCol="genre_lemmatized", outputCol="genre_lemmatized_vector")

# Fit the CountVectorizer model on the dataframe
cv_model = cv.fit(count_vectorized_df)

# Transform the dataframe to add the count vectorized features column
count_vectorized_df = cv_model.transform(count_vectorized_df)

In [0]:
# Create a CountVectorizer object for the genre using stemming
cv = CountVectorizer(inputCol="genre_stemmed", outputCol="genre_stemmed_vector")

# Fit the CountVectorizer model on the dataframe
cv_model = cv.fit(count_vectorized_df)

# Transform the dataframe to add the count vectorized features column
count_vectorized_df = cv_model.transform(count_vectorized_df)

In [0]:
count_vectorized_df = count_vectorized_df.withColumn("Month_Vector", split(count_vectorized_df["Month"], ","))

####Generating vectors for month

In [0]:
# Create a CountVectorizer object for the month
cv = CountVectorizer(inputCol="Month_Vector", outputCol="Month_vectorized")

# Fit the CountVectorizer model on the dataframe
cv_model = cv.fit(count_vectorized_df)

# Transform the dataframe to add the count vectorized features column
count_vectorized_df = cv_model.transform(count_vectorized_df)

####Combining vectors of bag of words, bi-gram, TF-IDF, month and genre

In [0]:
#combine all vectors into one vector for ML
from pyspark.ml.feature import VectorAssembler

# Assume you have a Spark DataFrame `df` with two vector columns `vector1` and `vector2`.

# Create a VectorAssembler object
assembler = VectorAssembler(inputCols=["bi_grams_lemmatized_text_vector", "Month_vectorized", "genre_lemmatized_vector", "lemmatized_text_vector", "TFIDF_features_uni-gram_lemm" ], outputCol="combined_vectors_lem")
assembler1 = VectorAssembler(inputCols=["bi_grams_stemmed_text_vector", "Month_vectorized", "genre_lemmatized_vector", "stemmed_text_vector", "TFIDF_features_uni-gram_stemmed"], outputCol="combined_vectors_stem")

# Apply the assembler to the DataFrame
assembled_df = assembler.transform(count_vectorized_df)
assembled_df = assembler1.transform(assembled_df)


In [0]:
df_test = assembled_df.select(assembled_df["combined_vectors_lem"],assembled_df["combined_vectors_stem"],assembled_df["Manual_Combined"])

In [0]:
from pyspark.sql.functions import col

def increment_column(df, column_name):
    # select the column and add 1 to each value
    incremented_col = col(column_name) + 1    
    # create a new dataframe with the incremented column
    new_df = df.withColumn(column_name, incremented_col)
    
    return new_df

In [0]:
df_test2 = increment_column(df_test, "Manual_Combined")


In [0]:
(training, testing) = df_test2.randomSplit([0.7, 0.3])

#Models

####Naive Bayes

#####using lemmatized text

In [0]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#updated NB
# Create initial Naïve Bayes model
nb = NaiveBayes(labelCol="Manual_Combined", featuresCol="combined_vectors_lem", modelType="multinomial")

# Evaluate model
nbevaluator = MulticlassClassificationEvaluator(labelCol="Manual_Combined", metricName="weightedFMeasure")

params_nb = [0.0001,  0.01, 1.0, 5.0]

f1_scores_nb =[]
recall_scores_nb = []
precision_scores_nb = []


for i in params_nb:
        # Create ParamGrid for Cross Validation
    nbparamGrid = (ParamGridBuilder()
                   .addGrid(nb.smoothing, [i])
                   .build())


    # Create 5-fold CrossValidator
    nbcv = CrossValidator(estimator = nb,
                        estimatorParamMaps = nbparamGrid,
                        evaluator = nbevaluator,
                        numFolds = 5)
    nbcvModel = nbcv.fit(training)

    prediction = nbcvModel.transform(testing).cache()


    evaluator_f1_nb = MulticlassClassificationEvaluator(labelCol="Manual_Combined", predictionCol="prediction", metricName="weightedFMeasure")
    f1_score = evaluator_f1_nb.evaluate(prediction)
    f1_scores_nb.append(f1_score)

    evaluator_recall_nb = MulticlassClassificationEvaluator(labelCol="Manual_Combined", predictionCol="prediction", metricName="weightedRecall")
    recall_score = evaluator_recall_nb.evaluate(prediction, {evaluator_recall_nb.metricName: "weightedRecall"})
    recall_scores_nb.append(recall_score)

    evaluator_precision_nb = MulticlassClassificationEvaluator(labelCol="Manual_Combined", predictionCol="prediction", metricName="weightedPrecision")
    precision_score = evaluator_precision_nb.evaluate(prediction, {evaluator_precision_nb.metricName: "weightedPrecision"})
    precision_scores_nb.append(precision_score)



In [0]:
#Printing NB to dataframe
nb_total_result = []
for i in range(len(params_nb)):
    templist = []
    templist.append(params_nb[i])
    templist.append(f1_scores_nb[i])
    templist.append(recall_scores_nb[i])
    templist.append(precision_scores_nb[i])
    nb_total_result.append(templist)

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("Smoothing", StringType(), True),
    StructField("f1_score", StringType(), True),
    StructField("recall_score", StringType(), True),
    StructField("precision_score", StringType(), True)
])

df_nb_results = spark.createDataFrame(data=nb_total_result, schema=schema)
df_nb_results.display()

Smoothing,f1_score,recall_score,precision_score
0.0001,0.5827081755902929,0.6181102362204725,0.5837107318106975
0.01,0.6295205843901643,0.6377952755905512,0.6241296419098143
1.0,0.6231347386466997,0.6653543307086615,0.6233490809931812
5.0,0.4403412153340887,0.5393700787401574,0.5531496062992126


#####using snowball stemmer

In [0]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#updated NB
# Create initial Naïve Bayes model
nb = NaiveBayes(labelCol="Manual_Combined", featuresCol="combined_vectors_stem", modelType="multinomial")

# Evaluate model
nbevaluator = MulticlassClassificationEvaluator(labelCol="Manual_Combined", metricName="weightedFMeasure")

params_nb = [0.0001,  0.01, 1.0, 5.0]

f1_scores_nb =[]
recall_scores_nb = []
precision_scores_nb = []


for i in params_nb:
        # Create ParamGrid for Cross Validation
    nbparamGrid = (ParamGridBuilder()
                   .addGrid(nb.smoothing, [i])
                   .build())


    # Create 5-fold CrossValidator
    nbcv = CrossValidator(estimator = nb,
                        estimatorParamMaps = nbparamGrid,
                        evaluator = nbevaluator,
                        numFolds = 5)
    nbcvModel = nbcv.fit(training)

    prediction = nbcvModel.transform(testing).cache()


    evaluator_f1_nb = MulticlassClassificationEvaluator(labelCol="Manual_Combined", predictionCol="prediction", metricName="weightedFMeasure")
    f1_score = evaluator_f1_nb.evaluate(prediction)
    f1_scores_nb.append(f1_score)

    evaluator_recall_nb = MulticlassClassificationEvaluator(labelCol="Manual_Combined", predictionCol="prediction", metricName="weightedRecall")
    recall_score = evaluator_recall_nb.evaluate(prediction, {evaluator_recall_nb.metricName: "weightedRecall"})
    recall_scores_nb.append(recall_score)

    evaluator_precision_nb = MulticlassClassificationEvaluator(labelCol="Manual_Combined", predictionCol="prediction", metricName="weightedPrecision")
    precision_score = evaluator_precision_nb.evaluate(prediction, {evaluator_precision_nb.metricName: "weightedPrecision"})
    precision_scores_nb.append(precision_score)

In [0]:
#Printing NB to dataframe
nb_total_result = []
for i in range(len(params_nb)):
    templist = []
    templist.append(params_nb[i])
    templist.append(f1_scores_nb[i])
    templist.append(recall_scores_nb[i])
    templist.append(precision_scores_nb[i])
    nb_total_result.append(templist)

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("Smoothing", StringType(), True),
    StructField("f1_score", StringType(), True),
    StructField("recall_score", StringType(), True),
    StructField("precision_score", StringType(), True)
])

df_nb_results = spark.createDataFrame(data=nb_total_result, schema=schema)
df_nb_results.display()

Smoothing,f1_score,recall_score,precision_score
0.0001,0.6132175172386799,0.6377952755905512,0.6206866282182955
0.01,0.6211514865044401,0.6220472440944882,0.620434442224475
1.0,0.6054139156727427,0.6496062992125984,0.6075974016247335
5.0,0.4465575435562189,0.5433070866141732,0.5679133858267716
