#### Import Warnings

In [2]:
import warnings
warnings.filterwarnings("ignore")

### Create Spark Session

In [3]:
from pyspark.sql import SparkSession

In [4]:
# create new SparkSession
spark = SparkSession.builder\
            .appName("model1")\
            .config("spark.driver.memory", "4g") \
            .config("spark.executor.memory", "4g") \
            .getOrCreate()

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.


### Load Data From HDFS/Local 

In [5]:
# create streaming dataframe -- source = files (format=json)
# dirPath = "file:///home/yogesh/Sunbeam_DBDA/0.Project/001.Big_Data/movie_data/"
dirPath="hdfs://localhost:9000/user/yogesh/sentiment_analysis/movies_data/movie_data"

df = spark.read\
            .format("json")\
            .option("nullValue", "")\
            .option("multiline","true")\
            .option("path", dirPath)\
            .load()

SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
SLF4J: Defaulting to no-operation MDCAdapter implementation.
SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.
                                                                                

In [6]:
#spark.stop()

In [5]:
df.printSchema()

root
 |-- movie_id: string (nullable = true)
 |-- movie_name: string (nullable = true)
 |-- movie_rating: string (nullable = true)
 |-- review_detailed_main: string (nullable = true)
 |-- review_rating: string (nullable = true)
 |-- review_title: string (nullable = true)
 |-- review_username: string (nullable = true)



In [6]:
df.show(truncate=False,n=2)

+---------+-------------------------------------------------+------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [7]:
print(df.count())



140596


                                                                                

In [None]:
#df.show(truncate=False, n=5)

+---------+-------------------------------------------------+------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [8]:
df.count()

140596

In [9]:
df.columns

['movie_id',
 'movie_name',
 'movie_rating',
 'review_detailed_main',
 'review_rating',
 'review_title',
 'review_username']

In [10]:
len(df.columns)

7

In [11]:
from pyspark.sql.functions import col, sum,when

#### Adding NA for ""

In [12]:
df = df.select([when(col(c) == "", None).otherwise(col(c)).alias(c) for c in df.columns])

#### Check for NA

In [13]:
df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()



+--------+----------+------------+--------------------+-------------+------------+---------------+
|movie_id|movie_name|movie_rating|review_detailed_main|review_rating|review_title|review_username|
+--------+----------+------------+--------------------+-------------+------------+---------------+
|       0|         0|           0|               36764|        10008|           0|              0|
+--------+----------+------------+--------------------+-------------+------------+---------------+



                                                                                

In [14]:
df = df.na.drop(subset=["review_rating"])

In [15]:
df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()



+--------+----------+------------+--------------------+-------------+------------+---------------+
|movie_id|movie_name|movie_rating|review_detailed_main|review_rating|review_title|review_username|
+--------+----------+------------+--------------------+-------------+------------+---------------+
|       0|         0|           0|               26756|            0|           0|              0|
+--------+----------+------------+--------------------+-------------+------------+---------------+



                                                                                

In [16]:
df = df.na.drop(subset=["review_title"])

In [17]:
df.printSchema()

root
 |-- movie_id: string (nullable = true)
 |-- movie_name: string (nullable = true)
 |-- movie_rating: string (nullable = true)
 |-- review_detailed_main: string (nullable = true)
 |-- review_rating: string (nullable = true)
 |-- review_title: string (nullable = true)
 |-- review_username: string (nullable = true)



In [18]:
from pyspark.sql.functions import count

#### Check for Duplicates

In [19]:
df.groupBy(df.columns).count().filter(col("count") > 1).show()



+---------+--------------------+------------+--------------------+-------------+--------------------+-------------------+-----+
| movie_id|          movie_name|movie_rating|review_detailed_main|review_rating|        review_title|    review_username|count|
+---------+--------------------+------------+--------------------+-------------+--------------------+-------------------+-----+
|tt0369226|   Alone in the Dark|         2.4|I'm speechless, p...|            1|I am scarred...fo...|              Asp87|    2|
|tt0369226|   Alone in the Dark|         2.4|                NULL|            3|Such potential, s...|        Fishman1966|    2|
|tt0327554|            Catwoman|         3.4|                NULL|            4|No logic to speak...|          edblask04|    2|
|tt1098327|Dragonball Evolution|         2.5|It's truly amazin...|            5|A true Hollywood ...|        supernick-3|    2|
|tt0466342|          Date Movie|         2.8|                NULL|            1|Absolutely horren...|   

                                                                                

#### Removing Duplicates

In [20]:
df = df.dropDuplicates()

In [21]:
df.groupBy(df.columns).count().filter(col("count") > 1).show()

                                                                                

+--------+----------+------------+--------------------+-------------+------------+---------------+-----+
|movie_id|movie_name|movie_rating|review_detailed_main|review_rating|review_title|review_username|count|
+--------+----------+------------+--------------------+-------------+------------+---------------+-----+
+--------+----------+------------+--------------------+-------------+------------+---------------+-----+



In [22]:
df.describe(["review_rating","movie_rating"]).show()

[Stage 26:>                                                         (0 + 8) / 9]

+-------+------------------+------------------+
|summary|     review_rating|      movie_rating|
+-------+------------------+------------------+
|  count|            114495|            114495|
|   mean| 5.632813659985152|5.5527944451724345|
| stddev|3.4682241791436024|2.1589945317927253|
|    min|                 1|               1.2|
|    max|                 9|               9.3|
+-------+------------------+------------------+



                                                                                

In [23]:
from pyspark.sql.functions import concat_ws

df = df.withColumn("Review", concat_ws(" : ", df["review_title"], df["review_detailed_main"]))


In [None]:
#df.show(truncate=False)



+---------+-----------------------+------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [24]:
df.select("Review").limit(1).show(truncate=False)



+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

#### Adding NA for ""

In [25]:
df = df.select([when(col(c) == "", None).otherwise(col(c)).alias(c) for c in df.columns])

#### Check for NA 

In [26]:
df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()



+--------+----------+------------+--------------------+-------------+------------+---------------+------+
|movie_id|movie_name|movie_rating|review_detailed_main|review_rating|review_title|review_username|Review|
+--------+----------+------------+--------------------+-------------+------------+---------------+------+
|       0|         0|           0|               23562|            0|           0|              0|     0|
+--------+----------+------------+--------------------+-------------+------------+---------------+------+



                                                                                

#### Data Labelling

In [27]:
df = df.withColumn(
    "sentiment",
    when(col("review_rating") >= 8, "Positive")
    .when(col("review_rating") >= 5, "Neutral")
    .when(col("review_rating") < 5, "Negative")
    .otherwise(None)
)

In [29]:
#df.show()

In [30]:
df = df.select([when(col(c) == "", None).otherwise(col(c)).alias(c) for c in df.columns])

In [31]:
df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()



+--------+----------+------------+--------------------+-------------+------------+---------------+------+---------+
|movie_id|movie_name|movie_rating|review_detailed_main|review_rating|review_title|review_username|Review|sentiment|
+--------+----------+------------+--------------------+-------------+------------+---------------+------+---------+
|       0|         0|           0|               23562|            0|           0|              0|     0|        0|
+--------+----------+------------+--------------------+-------------+------------+---------------+------+---------+



                                                                                

In [32]:
#df.show()

#### Sentiment Wise Count

In [33]:
df.groupBy("sentiment").count().show()



+---------+-----+
|sentiment|count|
+---------+-----+
|  Neutral|25714|
| Positive|43100|
| Negative|45681|
+---------+-----+



                                                                                

In [34]:
df.columns

['movie_id',
 'movie_name',
 'movie_rating',
 'review_detailed_main',
 'review_rating',
 'review_title',
 'review_username',
 'Review',
 'sentiment']

In [35]:
df = df.select("Review", "sentiment")

In [36]:
df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()



+------+---------+
|Review|sentiment|
+------+---------+
|     0|        0|
+------+---------+



                                                                                

In [37]:
df.show()



+--------------------+---------+
|              Review|sentiment|
+--------------------+---------+
|This ol' movie re...|  Neutral|
|This movie is ama...| Positive|
|25 years of hesit...|  Neutral|
|My Fav-or-ite mov...| Positive|
|Lightning in a Bo...| Positive|
|Wow!!!! : I saw t...| Negative|
|We've All Got Iss...|  Neutral|
|SILVER LININGS PL...| Positive|
|One of the best R...| Positive|
|Pretty okay Drama...|  Neutral|
|A stark portrayal...|  Neutral|
|Proof That Rating...| Negative|
|Excellent! My Fal...| Positive|
|Most wonderful 2n...| Positive|
|well done : One o...| Positive|
|Great, Great Movi...| Positive|
|Not bad entertain...|  Neutral|
|Could Have Been M...|  Neutral|
|Horrible : After ...| Negative|
|And you thought R...| Negative|
+--------------------+---------+
only showing top 20 rows



                                                                                

In [38]:
df.groupBy("sentiment").count().show()



+---------+-----+
|sentiment|count|
+---------+-----+
|  Neutral|25714|
| Positive|43100|
| Negative|45681|
+---------+-----+



                                                                                

In [39]:
df.show()



+--------------------+---------+
|              Review|sentiment|
+--------------------+---------+
|This ol' movie re...|  Neutral|
|This movie is ama...| Positive|
|25 years of hesit...|  Neutral|
|My Fav-or-ite mov...| Positive|
|Lightning in a Bo...| Positive|
|Wow!!!! : I saw t...| Negative|
|We've All Got Iss...|  Neutral|
|SILVER LININGS PL...| Positive|
|One of the best R...| Positive|
|Pretty okay Drama...|  Neutral|
|A stark portrayal...|  Neutral|
|Proof That Rating...| Negative|
|Excellent! My Fal...| Positive|
|Most wonderful 2n...| Positive|
|well done : One o...| Positive|
|Great, Great Movi...| Positive|
|Not bad entertain...|  Neutral|
|Could Have Been M...|  Neutral|
|Horrible : After ...| Negative|
|And you thought R...| Negative|
+--------------------+---------+
only showing top 20 rows



                                                                                

### Import Libraries

In [11]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import lower, regexp_replace,udf, explode
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer

In [41]:
df = df.withColumn("Review", lower(col("Review")))

In [42]:
df = df.withColumn("Review", regexp_replace(col("Review"), "[^a-zA-Z\s]", ""))

In [43]:
df.show(truncate=False,n=10)



+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

### Tokenization

In [44]:
tokenizer = Tokenizer(inputCol="Review", outputCol="words")
df_tokens = tokenizer.transform(df)

### Removing Stopwords

In [45]:
stopwords_list = stopwords.words("english")
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words", stopWords = stopwords_list)
df_filtered = remover.transform(df_tokens)

In [None]:
#df_filtered.show()



+--------------------+---------+--------------------+--------------------+
|              Review|sentiment|               words|      filtered_words|
+--------------------+---------+--------------------+--------------------+
|this ol movie rea...|  Neutral|[this, ol, movie,...|[ol, movie, reall...|
|this movie is ama...| Positive|[this, movie, is,...|[movie, amazingtr...|
| years of hesitat...|  Neutral|[, years, of, hes...|[, years, hesitat...|
|my favorite movie...| Positive|[my, favorite, mo...|[favorite, movie,...|
|lightning in a bo...| Positive|[lightning, in, a...|[lightning, bottl...|
|wow  i saw this i...| Negative|[wow, , i, saw, t...|[wow, , saw, thea...|
|weve all got issu...|  Neutral|[weve, all, got, ...|[weve, got, issue...|
|silver linings pl...| Positive|[silver, linings,...|[silver, linings,...|
|one of the best r...| Positive|[one, of, the, be...|[one, best, roman...|
|pretty okay drama...|  Neutral|[pretty, okay, dr...|[pretty, okay, dr...|
|a stark portrayal...|  N

                                                                                

### Lemmatization

In [46]:
from pyspark.sql.types import ArrayType, StringType
lemmatizer = WordNetLemmatizer()
lemmatize_udf = udf(lambda words: [lemmatizer.lemmatize(word) for word in words], ArrayType(StringType()))

In [None]:
#df_filtered.show()

+--------------------+---------+--------------------+--------------------+
|              Review|sentiment|               words|      filtered_words|
+--------------------+---------+--------------------+--------------------+
|this ol movie rea...|  Neutral|[this, ol, movie,...|[ol, movie, reall...|
|this movie is ama...| Positive|[this, movie, is,...|[movie, amazingtr...|
| years of hesitat...|  Neutral|[, years, of, hes...|[, years, hesitat...|
|my favorite movie...| Positive|[my, favorite, mo...|[favorite, movie,...|
|lightning in a bo...| Positive|[lightning, in, a...|[lightning, bottl...|
|wow  i saw this i...| Negative|[wow, , i, saw, t...|[wow, , saw, thea...|
|weve all got issu...|  Neutral|[weve, all, got, ...|[weve, got, issue...|
|silver linings pl...| Positive|[silver, linings,...|[silver, linings,...|
|one of the best r...| Positive|[one, of, the, be...|[one, best, roman...|
|pretty okay drama...|  Neutral|[pretty, okay, dr...|[pretty, okay, dr...|
|a stark portrayal...|  N

In [47]:
df_filtered.printSchema()

root
 |-- Review: string (nullable = true)
 |-- sentiment: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filtered_words: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [48]:
df_lemmatized = df_filtered.withColumn("filtered_words_lem", lemmatize_udf(col("filtered_words")))

In [None]:
#df_lemmatized.show()

[Stage 89:>                                                         (0 + 1) / 1]

+--------------------+---------+--------------------+--------------------+--------------------+
|              Review|sentiment|               words|      filtered_words|  filtered_words_lem|
+--------------------+---------+--------------------+--------------------+--------------------+
|this ol movie rea...|  Neutral|[this, ol, movie,...|[ol, movie, reall...|[ol, movie, reall...|
|this movie is ama...| Positive|[this, movie, is,...|[movie, amazingtr...|[movie, amazingtr...|
| years of hesitat...|  Neutral|[, years, of, hes...|[, years, hesitat...|[, year, hesitati...|
|my favorite movie...| Positive|[my, favorite, mo...|[favorite, movie,...|[favorite, movie,...|
|lightning in a bo...| Positive|[lightning, in, a...|[lightning, bottl...|[lightning, bottl...|
|wow  i saw this i...| Negative|[wow, , i, saw, t...|[wow, , saw, thea...|[wow, , saw, thea...|
|weve all got issu...|  Neutral|[weve, all, got, ...|[weve, got, issue...|[weve, got, issue...|
|silver linings pl...| Positive|[silver,

                                                                                

In [49]:
df_lemmatized.printSchema()

root
 |-- Review: string (nullable = true)
 |-- sentiment: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filtered_words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filtered_words_lem: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [50]:
df_lemmatized.write.mode("overwrite").parquet("df_lemmatized")

                                                                                

In [7]:
df_lemmatized = spark.read.parquet("hdfs://localhost:9000/user/yogesh/sentiment_analysis/df_lemmatized")

SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
SLF4J: Defaulting to no-operation MDCAdapter implementation.
SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.
                                                                                

In [8]:
df_lemmatized.show(n=5)

                                                                                

+--------------------+---------+--------------------+--------------------+--------------------+
|              Review|sentiment|               words|      filtered_words|  filtered_words_lem|
+--------------------+---------+--------------------+--------------------+--------------------+
|this ol movie rea...|  Neutral|[this, ol, movie,...|[ol, movie, reall...|[ol, movie, reall...|
|this movie is ama...| Positive|[this, movie, is,...|[movie, amazingtr...|[movie, amazingtr...|
| years of hesitat...|  Neutral|[, years, of, hes...|[, years, hesitat...|[, year, hesitati...|
|my favorite movie...| Positive|[my, favorite, mo...|[favorite, movie,...|[favorite, movie,...|
|lightning in a bo...| Positive|[lightning, in, a...|[lightning, bottl...|[lightning, bottl...|
+--------------------+---------+--------------------+--------------------+--------------------+
only showing top 5 rows



### Create Vocabulary

In [54]:
#vocabulary = df_lemmatized.select(explode(col("filtered_words_lem"))).distinct().rdd.flatMap(lambda x: x).collect()

In [None]:
#len(vocabulary)

155281

#### Save Vocabulary

In [None]:
# import pickle

# with open("vocabulary.pkl", "wb") as f:
#     pickle.dump(vocabulary, f)

#### Load Vocabulary

In [10]:
import pickle

with open("hdfs://localhost:9000/user/yogesh/sentiment_analysis/vocabulary.pkl", "rb") as f:
    vocabulary = pickle.load(f)


In [11]:
len(vocabulary)

155281

### Vectorizer

In [61]:
#vectorizer = CountVectorizer.from_voca(inputCol="words", outputCol="rawFeatures")

from pyspark.ml.feature import CountVectorizerModel

# Assuming 'vocabulary' is a Python list containing unique words
cv_model = CountVectorizerModel.from_vocabulary(vocabulary, inputCol="words", outputCol="rawFeatures")


#### Save Count Vectorizer

In [12]:
from pyspark.ml.feature import CountVectorizerModel
cv_model = CountVectorizerModel.load("hdfs://localhost:9000/user/yogesh/sentiment_analysis/count_vectorizer2")

#### Apply Count Vectorizer

In [13]:
#cv_model = vectorizer.fit(df_filtered)
df_tf = cv_model.transform(df_lemmatized)

#### Save Count Vectorizer

In [52]:
cv_model.write().overwrite().save("hdfs://localhost:9000/user/yogesh/sentiment_analysis/count_vectorizer2")

                                                                                

### TF-IDF for Textual to Numerical Conversion

In [16]:
idf = IDF(inputCol="rawFeatures", outputCol="features")

In [17]:
idf_model = idf.fit(df_tf)
df_tfidf = idf_model.transform(df_tf)

                                                                                

In [61]:
df_tfidf.show()

[Stage 103:>                                                        (0 + 1) / 1]

+--------------------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+
|              Review|sentiment|               words|      filtered_words|  filtered_words_lem|         rawFeatures|            features|
+--------------------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+
|this ol movie rea...|  Neutral|[this, ol, movie,...|[ol, movie, reall...|[ol, movie, reall...|(155281,[0,255,25...|(155281,[0,255,25...|
|this movie is ama...| Positive|[this, movie, is,...|[movie, amazingtr...|[movie, amazingtr...|(155281,[258,1010...|(155281,[258,1010...|
| years of hesitat...|  Neutral|[, years, of, hes...|[, years, hesitat...|[, year, hesitati...|(155281,[510,1239...|(155281,[510,1239...|
|my favorite movie...| Positive|[my, favorite, mo...|[favorite, movie,...|[favorite, movie,...|(155281,[1,510,12...|(155281,[1,510,12...|
|lightning in a bo...| Positive|[l

                                                                                

#### Save TF-IDF Model

In [56]:
idf_model.write().overwrite().save("hdfs://localhost:9000/user/yogesh/sentiment_analysis/idf_model")

####  Load TF-IDF

In [58]:
from pyspark.ml.feature import IDFModel
idf_model = IDFModel.load("hdfs://localhost:9000/user/yogesh/sentiment_analysis/idf_model")

#### Indexing or Encoding using StringIndexer

In [18]:
indexer = StringIndexer(inputCol="sentiment", outputCol="label")
df_final = indexer.fit(df_tfidf).transform(df_tfidf)
indexer_model = indexer.fit(df_tfidf)

                                                                                

#### Checking Labels for Indexer

In [19]:
indexer_model.labels

['Negative', 'Positive', 'Neutral']

#### Save LabelEncoder/StringIndexer

In [59]:
indexer_model.write().overwrite().save("hdfs://localhost:9000/user/yogesh/sentiment_analysis/indexer_main")

In [64]:
df_final.show()

[Stage 118:>                                                        (0 + 1) / 1]

+--------------------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|              Review|sentiment|               words|      filtered_words|  filtered_words_lem|         rawFeatures|            features|label|
+--------------------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|this ol movie rea...|  Neutral|[this, ol, movie,...|[ol, movie, reall...|[ol, movie, reall...|(155281,[0,255,25...|(155281,[0,255,25...|  2.0|
|this movie is ama...| Positive|[this, movie, is,...|[movie, amazingtr...|[movie, amazingtr...|(155281,[258,1010...|(155281,[258,1010...|  1.0|
| years of hesitat...|  Neutral|[, years, of, hes...|[, years, hesitat...|[, year, hesitati...|(155281,[510,1239...|(155281,[510,1239...|  2.0|
|my favorite movie...| Positive|[my, favorite, mo...|[favorite, movie,...|[favorite, movie,...|(155281,[1,510,12...|(155281,[1,510,12...

                                                                                

#### Save Final Dataframe

In [20]:
df_final.write.mode("overwrite").parquet("df_final")

                                                                                

#### Load Final Dataframe

In [5]:
df_final = spark.read.parquet("hdfs://localhost:9000/user/yogesh/sentiment_analysis/df_final")

SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
SLF4J: Defaulting to no-operation MDCAdapter implementation.
SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.
                                                                                

In [6]:
df_final.show(n=5)

                                                                                

+--------------------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|              Review|sentiment|               words|      filtered_words|  filtered_words_lem|         rawFeatures|            features|label|
+--------------------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|this ol movie rea...|  Neutral|[this, ol, movie,...|[ol, movie, reall...|[ol, movie, reall...|(155281,[0,255,25...|(155281,[0,255,25...|  2.0|
|this movie is ama...| Positive|[this, movie, is,...|[movie, amazingtr...|[movie, amazingtr...|(155281,[258,1010...|(155281,[258,1010...|  1.0|
| years of hesitat...|  Neutral|[, years, of, hes...|[, years, hesitat...|[, year, hesitati...|(155281,[510,1239...|(155281,[510,1239...|  2.0|
|my favorite movie...| Positive|[my, favorite, mo...|[favorite, movie,...|[favorite, movie,...|(155281,[1,510,12...|(155281,[1,510,12...

### Split DataFrame into Training and Testing

In [7]:
train_data, test_data = df_final.randomSplit([0.7, 0.3], seed=123456)

### Train Model Using Logistic Regression

In [None]:
lr = LogisticRegression(featuresCol="features", labelCol="label")
lr_model = lr.fit(train_data)



#### Save Logistic Regresion Model

In [63]:
lr_model.write().overwrite().save("hdfs://localhost:9000/user/yogesh/sentiment_analysis/logistic_reg_model")

                                                                                

#### Load Logistic Regression Model

In [8]:
from pyspark.ml.classification import LogisticRegressionModel
lr_model = LogisticRegressionModel.load("hdfs://localhost:9000/user/yogesh/sentiment_analysis/logistic_reg_model")

                                                                                

### Model Evaluation on Test Data

In [9]:
predictions = lr_model.transform(test_data)
predictions.select("Review", "sentiment", "prediction").show(10, truncate=False)

[Stage 5:>                                                          (0 + 1) / 1]

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [26]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", metricName="accuracy")

In [None]:
accuracy = evaluator.evaluate(predictions.limit(1000))

[Stage 20:>                                                         (0 + 8) / 9]

In [None]:
print(f"Test Accuracy: {accuracy:.4f}")

In [12]:
sampled_predictions = predictions.sample(fraction=0.05, seed=42)  # Small sample
sampled_predictions = sampled_predictions.select("label", "prediction").dropna()

evaluator = MulticlassClassificationEvaluator(labelCol="label", metricName="accuracy")
accuracy = evaluator.evaluate(sampled_predictions)

print(f"Test Accuracy: {accuracy:.4f}")


                                                                                

Test Accuracy: 0.8099
