In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from textblob import TextBlob

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover

In [2]:
# !pip install pyspark

In [3]:
# Create a Spark session
spark = SparkSession.builder.appName("Catch_tweets").getOrCreate()

# Load the CSV file into a Spark DataFrame
tweets_data = spark.read.csv("/content/tweets.csv", header=True, inferSchema=True)

# Define a user-defined function (UDF) for sentiment analysis using TextBlob
def analyze_sentiment(text):
    analysis = TextBlob(text)
    # Classify polarity as 'positive', 'negative', or 'neutral'
    if analysis.sentiment.polarity > 0:
        return 'positive'
    elif analysis.sentiment.polarity < 0:
        return 'negative'
    else:
        return 'neutral'

In [4]:
# Register the UDF with Spark
sentiment_udf = udf(analyze_sentiment, StringType())

In [5]:
tweets_data = tweets_data.withColumn("sentiments", sentiment_udf("tweets"))

In [6]:
# Display the resulting DataFrame
tweets_data.show()

+----------+-------------------+--------+---------------+--------------------+----------+
|        id|               date|    flag|       username|              tweets|sentiments|
+----------+-------------------+--------+---------------+--------------------+----------+
|1467810672|2009-04-06 22:19:49|NO_QUERY|  scotthamilton|is upset that he ...|   neutral|
|1467810917|2009-04-06 22:19:53|NO_QUERY|       mattycus|@Kenichan I dived...|  positive|
|1467811184|2009-04-06 22:19:57|NO_QUERY|        ElleCTF|my whole body fee...|  positive|
|1467811193|2009-04-06 22:19:57|NO_QUERY|         Karoli|@nationwideclass ...|  negative|
|1467811372|2009-04-06 22:20:00|NO_QUERY|       joy_wolf|@Kwesidei not the...|  positive|
|1467811592|2009-04-06 22:20:03|NO_QUERY|        mybirch|         Need a hug |   neutral|
|1467811594|2009-04-06 22:20:03|NO_QUERY|           coZZ|@LOLTrish hey  lo...|  positive|
|1467811795|2009-04-06 22:20:05|NO_QUERY|2Hood4Hollywood|@Tatiana_K nope t...|   neutral|
|146781202

In [7]:
col = ['tweets','sentiments']
data = tweets_data.select(col)
data.show()

+--------------------+----------+
|              tweets|sentiments|
+--------------------+----------+
|is upset that he ...|   neutral|
|@Kenichan I dived...|  positive|
|my whole body fee...|  positive|
|@nationwideclass ...|  negative|
|@Kwesidei not the...|  positive|
|         Need a hug |   neutral|
|@LOLTrish hey  lo...|  positive|
|@Tatiana_K nope t...|   neutral|
|@twittera que me ...|   neutral|
|spring break in p...|  negative|
|I just re-pierced...|   neutral|
|@caregiving I cou...|   neutral|
|@octolinz16 It it...|   neutral|
|@smarrison i woul...|  positive|
|@iamjazzyfizzle I...|   neutral|
|Hollis' death sce...|   neutral|
|about to file taxes |   neutral|
|@LettyA ahh ive a...|  positive|
|@FakerPattyPattz ...|   neutral|
|@alydesigns i was...|  positive|
+--------------------+----------+
only showing top 20 rows



In [8]:
# convert label into integer
from pyspark.sql.functions import when


sentiment_mapping = {"positive": 1, "negative": 2, "neutral": 0}

# Create a new column 'label' using when and otherwise
data = data.withColumn(
    "label",
    when(data["sentiments"] == "positive", sentiment_mapping["positive"])
    .when(data["sentiments"] == "negative", sentiment_mapping["negative"])
    .otherwise(sentiment_mapping["neutral"])
)


In [9]:
# Show the resulting DataFrame
data.select("tweets","sentiments", "label").show()

+--------------------+----------+-----+
|              tweets|sentiments|label|
+--------------------+----------+-----+
|is upset that he ...|   neutral|    0|
|@Kenichan I dived...|  positive|    1|
|my whole body fee...|  positive|    1|
|@nationwideclass ...|  negative|    2|
|@Kwesidei not the...|  positive|    1|
|         Need a hug |   neutral|    0|
|@LOLTrish hey  lo...|  positive|    1|
|@Tatiana_K nope t...|   neutral|    0|
|@twittera que me ...|   neutral|    0|
|spring break in p...|  negative|    2|
|I just re-pierced...|   neutral|    0|
|@caregiving I cou...|   neutral|    0|
|@octolinz16 It it...|   neutral|    0|
|@smarrison i woul...|  positive|    1|
|@iamjazzyfizzle I...|   neutral|    0|
|Hollis' death sce...|   neutral|    0|
|about to file taxes |   neutral|    0|
|@LettyA ahh ive a...|  positive|    1|
|@FakerPattyPattz ...|   neutral|    0|
|@alydesigns i was...|  positive|    1|
+--------------------+----------+-----+
only showing top 20 rows



In [25]:
col = ['tweets','label']
t_data = data.select(col)
t_data.show()

+--------------------+-----+
|              tweets|label|
+--------------------+-----+
|is upset that he ...|    0|
|@Kenichan I dived...|    1|
|my whole body fee...|    1|
|@nationwideclass ...|    2|
|@Kwesidei not the...|    1|
|         Need a hug |    0|
|@LOLTrish hey  lo...|    1|
|@Tatiana_K nope t...|    0|
|@twittera que me ...|    0|
|spring break in p...|    2|
|I just re-pierced...|    0|
|@caregiving I cou...|    0|
|@octolinz16 It it...|    0|
|@smarrison i woul...|    1|
|@iamjazzyfizzle I...|    0|
|Hollis' death sce...|    0|
|about to file taxes |    0|
|@LettyA ahh ive a...|    1|
|@FakerPattyPattz ...|    0|
|@alydesigns i was...|    1|
+--------------------+-----+
only showing top 20 rows



In [None]:
from pyspark.sql.functions import regexp_replace

def clean_tweet_text(text):
    # Remove URLs
    text = regexp_replace(text, r"http\S+|www\S+|https\S+", "")
    # Remove mentions
    text = regexp_replace(text, r"@\w+", "")
    # Remove special characters and numbers, keep only letters
    text = regexp_replace(text, "[^a-zA-Z\s]", "")
    return text

# Apply the cleaning function to the 'tweets' column
t_data = data.withColumn(clean_tweet_text("tweets"))

# Show the resulting DataFrame with cleaned tweets
t_data.select.show(truncate=False)

In [26]:
#Divide data into 70% for training, 30% for testing
dividedData = t_data.randomSplit([0.7, 0.3])
trainingData = dividedData[0] #index 0 = data training
testingData = dividedData[1] #index 1 = data testing
train_rows = trainingData.count()
test_rows = testingData.count()
print ("Training data rows:", train_rows, "; Testing data rows:", test_rows)

Training data rows: 1120077 ; Testing data rows: 479922


***Clean training data***

In [23]:
# from pyspark.sql.functions import regexp_replace

# def clean_tweet_text(text):
#     # Remove URLs
#     text = regexp_replace(text, r"http\S+|www\S+|https\S+", "")
#     # Remove mentions
#     text = regexp_replace(text, r"@\w+", "")
#     # Remove special characters and numbers, keep only letters
#     text = regexp_replace(text, "[^a-zA-Z\s]", "")
#     return text

# # Apply the cleaning function to the 'tweets' column
# tweets_data_cleaned = data.withColumn("cleaned_tweets", clean_tweet_text("tweets"))

# # Show the resulting DataFrame with cleaned tweets
# tweets_data_cleaned.select("tweets", "cleaned_tweets").show(truncate=False)

In [27]:
# Separate "tweets" into individual words using tokenizer
tokenizer = Tokenizer(inputCol="tweets", outputCol="tweetWords")
tokenizedTrain = tokenizer.transform(trainingData)
tokenizedTrain.show(truncate=False, n=5)

+-------------------------------------------------------------------------------------------------+-----+-------------------------------------------------------------------------------------------------------------------------+
|tweets                                                                                           |label|tweetWords                                                                                                               |
+-------------------------------------------------------------------------------------------------+-----+-------------------------------------------------------------------------------------------------------------------------+
|                                           exhausted                                             |2    |[, , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , exhausted]                        |
|                                     I miss her so much already...                     

In [28]:
# Removing stop words (unimportant words to be features)
swr = StopWordsRemover(inputCol=tokenizer.getOutputCol(),
                       outputCol="MeaningfulWords")
SwRemovedTrain = swr.transform(tokenizedTrain)
SwRemovedTrain.show(truncate=False, n=5)

+-------------------------------------------------------------------------------------------------+-----+-------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------+
|tweets                                                                                           |label|tweetWords                                                                                                               |MeaningfulWords                                                                                   |
+-------------------------------------------------------------------------------------------------+-----+-------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------+
|                  

In [None]:
# Converting words feature into numerical feature withHashingTF funtion for model training
hashTF = HashingTF(inputCol=swr.getOutputCol(), outputCol="features")
numericTrainData = hashTF.transform(SwRemovedTrain).select(
    'label', 'MeaningfulWords', 'features')
numericTrainData.show(truncate=False, n=3)

***Modeling***

In [29]:
# Train our classifier model using training data
lr = LogisticRegression(labelCol="label", featuresCol="features",
                        maxIter=10, regParam=0.01)
model = lr.fit(numericTrainData)
print ("Training is done!")

Training is done!


In [22]:
testingData.show( n=3)

+--------------------+----------+-----+--------------------+--------------------+
|              tweets|sentiments|label|      cleaned_tweets|      sentimentWords|
+--------------------+----------+-----+--------------------+--------------------+
|                 ...|  positive|    1|                 ...|[, , , , , , , , ...|
|            Miss ...|  positive|    1|            Miss ...|[, , , , , , , , ...|
|         or i jus...|  positive|    1|         or i jus...|[, , , , , , , , ...|
+--------------------+----------+-----+--------------------+--------------------+
only showing top 3 rows



In [30]:
# # Prepare testing data
tokenizedTest = tokenizer.transform(testingData)
SwRemovedTest = swr.transform(tokenizedTest)
numericTest = hashTF.transform(SwRemovedTest).select(
    'Label', 'MeaningfulWords', 'features')
numericTest.show(truncate=False, n=2)

+-----+---------------------------------------------------------------------+-------------------------------------------------------------------+
|Label|MeaningfulWords                                                      |features                                                           |
+-----+---------------------------------------------------------------------+-------------------------------------------------------------------+
|1    |[, , , , , , , , , , , , , , , , , , , missed, new, moon, trailer...]|(262144,[89833,165360,201103,244504,249180],[1.0,1.0,1.0,1.0,19.0])|
|0    |[, , , , , , , , , , , , , , , practising.....how, feel]             |(262144,[61899,231362,249180],[1.0,1.0,15.0])                      |
+-----+---------------------------------------------------------------------+-------------------------------------------------------------------+
only showing top 2 rows



In [31]:
# Predict testing data and calculate the accuracy model
prediction = model.transform(numericTest)
predictionFinal = prediction.select(
    "MeaningfulWords", "prediction", "Label")
predictionFinal.show(n=4, truncate = False)
correctPrediction = predictionFinal.filter(
    predictionFinal['prediction'] == predictionFinal['Label']).count()
totalData = predictionFinal.count()
print("correct prediction:", correctPrediction, ", total data:", totalData,
      ", accuracy:", correctPrediction/totalData)

+------------------------------------------------------------------------------------------------------------------------------+----------+-----+
|MeaningfulWords                                                                                                               |prediction|Label|
+------------------------------------------------------------------------------------------------------------------------------+----------+-----+
|[, , , , , , , , , , , , , , , , , , , missed, new, moon, trailer...]                                                         |1.0       |1    |
|[, , , , , , , , , , , , , , , practising.....how, feel]                                                                      |0.0       |0    |
|[, , , , , , , , , , , , miss, love, jamie]                                                                                   |1.0       |1    |
|[, , , , , , , , , , .., omgaga., im, sooo, , im, gunna, cry., dentist, since, 11.., suposed, 2, get, crown, put, (30mins).

In [None]:
# # Save the DataFrame to a CSV file
tweets_data.coalesce(1).write.csv('sentiments.csv', header=True, mode='overwrite')

In [None]:
df3 = pd.read_csv('/content/sentiments.csv/open.csv', error_bad_lines=False)

In [None]:
df3

In [None]:
# import csv
# field = ["id","date","flag","username","tweets","sentiments"]
# def spark_to_csv(data, file_path):
#     """ Converts spark dataframe to CSV file """
#     with open(file_path, "w") as f:
#         writer = csv.DictWriter(f, fieldnames= field)
#         writer.writerow(dict(zip(fieldnames, fieldnames)))
#         for row in data.toLocalIterator():
#             writer.writerow(row.asDict())

In [None]:
df3['date'] = pd.to_datetime(df3['date'].dt.strftime('%Y-%m-%d %H:%M:%S'))

In [None]:
df3