# Sentiment Analysis of Customer Feedback - PySpark Starter Code
This notebook demonstrates how to process, train, and evaluate a machine learning model for sentiment analysis of customer feedback using PySpark.

In [1]:
import sys
IN_COLAB = 'google.colab' in sys.modules
print("Running in Colab:", IN_COLAB)

Running in Colab: True


In [2]:
!pip install pyspark gcsfs



In [3]:
from google.colab import auth
auth.authenticate_user()

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Sentiment Analysis") \
    .getOrCreate()

In [5]:
import gcsfs

bucket_name = "cis-415-project-jedwardr"
file_name = "sentiment_small_dataset.csv"

gcs_path = f"gs://{bucket_name}/{file_name}"
local_path = f"/content/{file_name}"

# Download file from GCS to local Colab environment
fs = gcsfs.GCSFileSystem()
with fs.open(gcs_path, 'rb') as f:
    with open(local_path, 'wb') as out_file:
        out_file.write(f.read())

# Load the local CSV file into Spark
df = spark.read.csv(local_path, header=True, inferSchema=True)
df.show(5)
df.printSchema()

+---------------+------------------+---------------+-------------+----------------+-------------------+--------------------+-------------------+-------+
|Sentiment_Score|     Feedback_Text|Feedback_Length|Response_Time|Customer_Segment|Interaction_Channel|           Survey_ID|         Time_Stamp|Country|
+---------------+------------------+---------------+-------------+----------------+-------------------+--------------------+-------------------+-------+
|              0|Very slow response|             18|        52.88|        Standard|              Phone|80167546-ff7f-46d...|2024-05-25 17:34:14|    USA|
|              2| Loved the support|             17|        15.88|           Basic|              Phone|7f1b3d6a-8e0d-4b7...|2024-05-30 22:08:48|Germany|
|              2|Very slow response|             18|        34.02|        Standard|              Phone|be3387a4-1f62-450...|2024-03-13 03:31:09|    USA|
|              1|        Quick help|             10|        29.41|           Basic

In [7]:
# Import required libraries
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, CountVectorizer, StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col

In [8]:
# Exploratory Data Analysis (EDA)
df.describe().show()
# Check for missing values
df.select([col(c).isNull().alias(c) for c in df.columns]).show()

+-------+------------------+------------------+------------------+------------------+----------------+-------------------+--------------------+-------+
|summary|   Sentiment_Score|     Feedback_Text|   Feedback_Length|     Response_Time|Customer_Segment|Interaction_Channel|           Survey_ID|Country|
+-------+------------------+------------------+------------------+------------------+----------------+-------------------+--------------------+-------+
|  count|             50000|             50000|             50000|             49495|           50000|              50000|               50000|  50000|
|   mean|           1.30844|              NULL|         119.20576|30.543744014546732|            NULL|               NULL|                NULL|   NULL|
| stddev|0.7797672262531541|              NULL|1004.0503902725961|17.076529732445334|            NULL|               NULL|                NULL|   NULL|
|    min|                 0|  Agent was polite|                10|               1.0|   

In [9]:
# Data Preprocessing
df = df.na.drop()  # Drop rows with null values
df = df.withColumn('Sentiment_Score', col('Sentiment_Score').cast('int'))  # Ensure target is int
# Tokenizing the 'Feedback_Text' column
tokenizer = Tokenizer(inputCol='Feedback_Text', outputCol='words')
# Vectorizing the words column
vectorizer = CountVectorizer(inputCol='words', outputCol='features')
# Indexing the target variable 'Sentiment_Score'
indexer = StringIndexer(inputCol='Sentiment_Score', outputCol='label')

In [10]:
# Train/Test Split
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
train_df.show(5)

+---------------+----------------+---------------+-------------+----------------+-------------------+--------------------+-------------------+-------+
|Sentiment_Score|   Feedback_Text|Feedback_Length|Response_Time|Customer_Segment|Interaction_Channel|           Survey_ID|         Time_Stamp|Country|
+---------------+----------------+---------------+-------------+----------------+-------------------+--------------------+-------------------+-------+
|              0|Agent was polite|             16|         1.09|        Standard|               Chat|10a3a255-8afc-4c9...|2024-04-15 10:09:10|  India|
|              0|Agent was polite|             16|          1.2|           Basic|              Phone|29c1ef1d-5c45-426...|2024-02-27 06:47:14|    USA|
|              0|Agent was polite|             16|         1.24|           Basic|              Email|637e6a3e-85c0-463...|2024-04-10 00:43:09| Canada|
|              0|Agent was polite|             16|         1.24|           Basic|             

In [11]:
# Train Logistic Regression Model
lr = LogisticRegression(maxIter=10, regParam=0.01)
lr_pipeline = Pipeline(stages=[tokenizer, vectorizer, indexer, lr])

lr_model = lr_pipeline.fit(train_df)
lr_predictions = lr_model.transform(test_df)

lr_predictions.select('Feedback_Text', 'Sentiment_Score', 'prediction').show(5)

+----------------+---------------+----------+
|   Feedback_Text|Sentiment_Score|prediction|
+----------------+---------------+----------+
|Agent was polite|              0|       0.0|
|Agent was polite|              0|       0.0|
|Agent was polite|              0|       0.0|
|Agent was polite|              0|       0.0|
|Agent was polite|              0|       0.0|
+----------------+---------------+----------+
only showing top 5 rows



In [12]:
# Train Naive Bayes Model
nb = NaiveBayes(modelType='multinomial', labelCol='label', featuresCol='features')
nb_pipeline = Pipeline(stages=[tokenizer, vectorizer, indexer, nb])

nb_model = nb_pipeline.fit(train_df)
nb_predictions = nb_model.transform(test_df)

In [13]:
# Model Evaluation (Accuracy + F1)

# Accuracy
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
lr_accuracy = evaluator.evaluate(lr_predictions)
nb_accuracy = evaluator.evaluate(nb_predictions)

print(f"Logistic Regression Accuracy: {lr_accuracy:.4f}")
print(f"Naive Bayes Accuracy: {nb_accuracy:.4f}")

# F1 Score
f1_eval = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='f1')
lr_f1 = f1_eval.evaluate(lr_predictions)
nb_f1 = f1_eval.evaluate(nb_predictions)

print(f"Logistic Regression F1 Score: {lr_f1:.4f}")
print(f"Naive Bayes F1 Score: {nb_f1:.4f}")

Logistic Regression Accuracy: 0.5113
Naive Bayes Accuracy: 0.5113
Logistic Regression F1 Score: 0.3460
Naive Bayes F1 Score: 0.3460


# Next Steps
Once the model is validated with the small dataset, you can scale this pipeline to the big dataset using Dataproc.