In [1]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, VectorAssembler, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import col, when
import nltk
from nltk.corpus import stopwords

In [2]:
# Configure NLTK
nltk.download('stopwords')
stop_words = stopwords.words('english')

[nltk_data] Downloading package stopwords to /home/unamed/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [3]:
# Create Spark session
spark = SparkSession.builder \
    .appName("Twitter Sentiment Analysis") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/11 21:59:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Define schema for the CSV file
schema = StructType([
    StructField("Tweet ID", IntegerType(), True),
    StructField("Entity", StringType(), True),
    StructField("Sentiment", StringType(), True),
    StructField("Tweet content", StringType(), True)
])

In [5]:
# Load CSV data with specified schema
df = spark.read.csv("twitter_training.csv", header=True, schema=schema)

In [6]:
# Replace null values in the 'Tweet content' column with an empty string
df_cleaned = df.withColumn('Tweet content', when(col('Tweet content').isNull(), '').otherwise(col('Tweet content')))

In [7]:
# Define preprocessing stages
tokenizer = Tokenizer(inputCol="Tweet content", outputCol="words")
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
hashing_tf = HashingTF(inputCol="filtered_words", outputCol="raw_features")
idf = IDF(inputCol="raw_features", outputCol="features")

In [8]:
# Indexer for Sentiment column (in preprocessing)
sentiment_indexer = StringIndexer(inputCol="Sentiment", outputCol="label")

In [9]:
# Create the preprocessing pipeline (with indexer)
preprocessing_pipeline = Pipeline(stages=[tokenizer, stopwords_remover, hashing_tf, idf, sentiment_indexer])

In [10]:
# Fit and transform the preprocessing pipeline
preprocessed_data = preprocessing_pipeline.fit(df_cleaned).transform(df_cleaned)

24/05/11 22:00:19 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: im getting on borderlands and i will murder you all ,
 Schema: Tweet content
Expected: Tweet content but found: im getting on borderlands and i will murder you all ,
CSV file: file:///home/unamed/Projects/MST/bigData/twitter_training.csv
24/05/11 22:00:25 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Positive
 Schema: Sentiment
Expected: Sentiment but found: Positive
CSV file: file:///home/unamed/Projects/MST/bigData/twitter_training.csv
                                                                                

In [11]:
# Split data into training and test sets
train_data, test_data = preprocessed_data.randomSplit([0.8, 0.2], seed=123)

In [12]:
# Assemble features for the final pipeline (without indexer)
final_assembler = VectorAssembler(inputCols=["features"], outputCol="final_features")

In [13]:
# Logistic Regression model
lr = LogisticRegression(featuresCol='final_features', labelCol='label')

In [14]:
# Create the final pipeline (without indexer)
final_pipeline = Pipeline(stages=[final_assembler, lr])

In [None]:
# Train the final pipeline
pipeline_model = final_pipeline.fit(train_data)

24/05/11 22:00:32 WARN DAGScheduler: Broadcasting large task binary with size 18.8 MiB
24/05/11 22:00:33 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: 2401, Borderlands, Positive, im getting on borderlands and i will murder you all ,
 Schema: Tweet ID, Entity, Sentiment, Tweet content
Expected: Tweet ID but found: 2401
CSV file: file:///home/unamed/Projects/MST/bigData/twitter_training.csv
24/05/11 22:00:38 WARN DAGScheduler: Broadcasting large task binary with size 18.8 MiB
24/05/11 22:00:39 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: 2401, Borderlands, Positive, im getting on borderlands and i will murder you all ,
 Schema: Tweet ID, Entity, Sentiment, Tweet content
Expected: Tweet ID but found: 2401
CSV file: file:///home/unamed/Projects/MST/bigData/twitter_training.csv
24/05/11 22:00:42 WARN DAGScheduler: Broadcasting large task binary with size 18.8 MiB
24/05/11 22:00:44 WARN DAGScheduler: Broadcasting large task binary w

In [None]:

# Make predictions on the test data
predictions = pipeline_model.transform(test_data)

In [None]:
# Create evaluators for accuracy, F1 score, and recall
evaluator_accuracy = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="accuracy")
evaluator_f1 = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="f1")
evaluator_recall = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="weightedRecall")

In [None]:
# Evaluate the model
accuracy = evaluator_accuracy.evaluate(predictions)
f1_score = evaluator_f1.evaluate(predictions)
recall = evaluator_recall.evaluate(predictions)

In [None]:
# Display the results
print("Accuracy of Logistic Regression model:", accuracy)
print("F1 Score of Logistic Regression model:", f1_score)
print("Weighted Recall of Logistic Regression model:", recall)