## Sentiment Analysis with PySpark
## Using TF-IDF and Logistic Regression
 

In [22]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

## 1. Initialize Spark Session

In [23]:
spark = SparkSession.builder \
    .appName("SentimentAnalysisTFIDF") \
    .getOrCreate()

## 2. Load and Preprocess Data

In [24]:
df_raw = spark.read.csv("data/Tweets.csv", header=False)
df_raw.show(5)

+---+----------+--------------------+--------+---------------+--------------------+
|_c0|       _c1|                 _c2|     _c3|            _c4|                 _c5|
+---+----------+--------------------+--------+---------------+--------------------+
|  0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|  0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|  0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|  0|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|  0|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
+---+----------+--------------------+--------+---------------+--------------------+
only showing top 5 rows


## Select label and tweet text

In [25]:
df = df_raw.select(col("_c0").alias("label"), col("_c5").alias("tweet"))

## Convert labels: 0 -> 0 (negative), 4 -> 1 (positive)

In [26]:
df = df.withColumn("label", when(col("label") == 4, 1).otherwise(0))

## 3. Text Processing Pipeline

In [27]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

In [28]:

# Tokenization
tokenizer = Tokenizer(inputCol="tweet", outputCol="words")

# Remove stopwords
remover = StopWordsRemover(inputCol="words", outputCol="filtered")

# Convert text to TF features
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)

# Compute IDF
idf = IDF(inputCol="rawFeatures", outputCol="features")

# Logistic Regression Classifier
lr = LogisticRegression(featuresCol="features", labelCol="label")

# Define ML pipeline
pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, lr])

## 4. Train-Test Split

In [29]:
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

## 5. Train the Model

In [30]:
model = pipeline.fit(train_df)

## 6. Make Predictions

In [31]:
predictions = model.transform(test_df)

## 7. Evaluate the Model

In [32]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction",
    labelCol="label"
)

accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy: {accuracy:.4f}")

Test Accuracy: 0.8127


### 8. Predict on Sample Tweets

In [None]:
predictions.select("tweet", "label", "prediction").show(truncate=False)

In [42]:
spark.stop()