In [1]:
from pyspark.sql.types import *

In [2]:
# Explicit schema for the headerless tweets CSV; every field is declared nullable.
customSchema = (
    StructType()
    .add("Primary_Index", IntegerType(), True)
    .add("Tweet_Id", StringType(), True)
    .add("Date_Text", StringType(), True)
    .add("Flag", StringType(), True)
    .add("User", StringType(), True)
    .add("Tweet_Text", StringType(), True)
)

# Read the comma-separated file from HDFS using the schema above (no header row).
df = (
    spark.read
    .format("csv")
    .option("header", "False")
    .option("sep", ',')
    .schema(customSchema)
    .load('hdfs://localhost:9000/CA2/ProjectTweets.csv')
)

# NOTE(review): this keeps only one tweet, so every later cell operates on a
# single row. Presumably a development shortcut - confirm before a full run.
df = df.where(df["Tweet_Id"] == '1467811594')


In [3]:
## Remove commas from the Tweet_Text field
## First, test on a single example tweet

In [4]:
# Isolate the sample tweet, keep just its text, and print it without truncation.
filtered_df = df.where(df["Tweet_Id"] == '1467811594')
result = filtered_df.select(filtered_df["Tweet_Text"])
result.show(truncate=False)

[Stage 1:>                                                          (0 + 1) / 1]

+---------------------------------------------------------------------------------------------------+
|Tweet_Text                                                                                         |
+---------------------------------------------------------------------------------------------------+
|@LOLTrish hey  long time no see! Yes.. Rains a bit ,only a bit  LOL , I'm fine thanks , how's you ?|
+---------------------------------------------------------------------------------------------------+



                                                                                

In [5]:
## Then strip the commas out of that example

In [6]:
from pyspark.sql.functions import regexp_replace
# Take the sample tweet and delete every comma from its "Tweet_Text" value.
filtered_df = (
    df.where(df["Tweet_Id"] == '1467811594')
      .withColumn("Tweet_Text", regexp_replace("Tweet_Text", ",", ""))
)

# Keep only the text column and print it untruncated to check the result.
result = filtered_df.select("Tweet_Text")
result.show(truncate=False)

[Stage 3:>                                                          (0 + 1) / 1]

+------------------------------------------------------------------------------------------------+
|Tweet_Text                                                                                      |
+------------------------------------------------------------------------------------------------+
|@LOLTrish hey  long time no see! Yes.. Rains a bit only a bit  LOL  I'm fine thanks  how's you ?|
+------------------------------------------------------------------------------------------------+



                                                                                

In [7]:
## Now apply the same comma removal to every row in df

In [8]:
# Overwrite Tweet_Text with a copy that has every comma removed.
df = df.withColumn("Tweet_Text", regexp_replace("Tweet_Text", ",", ""))

In [9]:
# Re-check the sample tweet now that commas were removed on the whole DataFrame.
filtered_df = df.where(df["Tweet_Id"] == '1467811594')
result = filtered_df.select(filtered_df["Tweet_Text"])
result.show(truncate=False)

[Stage 5:>                                                          (0 + 1) / 1]

+------------------------------------------------------------------------------------------------+
|Tweet_Text                                                                                      |
+------------------------------------------------------------------------------------------------+
|@LOLTrish hey  long time no see! Yes.. Rains a bit only a bit  LOL  I'm fine thanks  how's you ?|
+------------------------------------------------------------------------------------------------+



                                                                                

In [10]:

# Calculate the max string length of each column.
# The Spark aggregate is imported under an alias so Python's builtin `max`
# is not replaced for the rest of the notebook.
from pyspark.sql.functions import length, max as spark_max

# One aggregation computes the maximum length of every column in a single
# Spark job; the result is one Row keyed by column name.
length_row = df.agg(
    *[spark_max(length(name)).alias(name) for name in df.columns]
).first()
max_lengths = [(name, length_row[name]) for name in df.columns]

# Display the results. The loop variable is deliberately not called `col`,
# which is the name of a pyspark function used elsewhere in this notebook.
for column_name, max_len in max_lengths:
    print(f"Max length of {column_name}: {max_len}")

[Stage 21:>                                                         (0 + 2) / 2]

Max length of Primary_Index: 1
Max length of Tweet_Id: 10
Max length of Date_Text: 28
Max length of Flag: 8
Max length of User: 4
Max length of Tweet_Text: 96




In [11]:
from pyspark.sql.functions import col, sum

# Count null values in each column (one filtered count per column).
null_counts = [
    (column_name, df.select(column_name).where(col(column_name).isNull()).count())
    for column_name in df.columns
]

# Display the results. The loop variables must not be named `col`: that would
# rebind the imported pyspark `col` function to a plain string and break any
# later cell that calls col(...).
for column_name, null_count in null_counts:
    print(f"Number of null values in {column_name}: {null_count}")

[Stage 39:>                                                         (0 + 2) / 2]

Number of null values in Primary_Index: 0
Number of null values in Tweet_Id: 0
Number of null values in Date_Text: 0
Number of null values in Flag: 0
Number of null values in User: 0
Number of null values in Tweet_Text: 0


                                                                                

In [12]:
from pyspark.sql.functions import countDistinct

# Calculate the number of distinct values in each column.
unique_counts = [
    (column_name, df.agg(countDistinct(column_name)).collect()[0][0])
    for column_name in df.columns
]

# Display the results. The loop variables must not be named `col`: that would
# rebind the pyspark `col` function to a plain string for later cells.
for column_name, distinct_count in unique_counts:
    print(f"Number of unique values in {column_name}: {distinct_count}")

[Stage 72:>                                                         (0 + 2) / 2]

Number of unique values in Primary_Index: 1
Number of unique values in Tweet_Id: 1
Number of unique values in Date_Text: 1
Number of unique values in Flag: 1
Number of unique values in User: 1
Number of unique values in Tweet_Text: 1




In [13]:
from pyspark.sql.functions import col
# Count rows per Tweet_Id and keep at most one id that occurs more than once.
tweet_id_counts = df.groupBy("Tweet_Id").count()
duplicate_tweet_id_example = tweet_id_counts.where(col("count") > 1).limit(1)

# Print the (Tweet_Id, count) pair; an empty table means no id is repeated.
duplicate_tweet_id_example.show()



+--------+-----+
|Tweet_Id|count|
+--------+-----+
+--------+-----+



In [14]:
# Pull every row whose Tweet_Id equals "1469531660" and print it in full.
matching_records = df.where(df["Tweet_Id"] == "1469531660")
matching_records.show(truncate=False)

[Stage 82:>                                                         (0 + 1) / 1]

+-------------+--------+---------+----+----+----------+
|Primary_Index|Tweet_Id|Date_Text|Flag|User|Tweet_Text|
+-------------+--------+---------+----+----+----------+
+-------------+--------+---------+----+----+----------+



                                                                                

In [15]:
# De-duplicate on every column except Primary_Index, so rows that differ only
# in their index value collapse into one.
unique_records_df = df.drop_duplicates(
    subset=["Tweet_Id", "Date_Text", "Flag", "User", "Tweet_Text"]
)


In [16]:
# Calculate the number of distinct values in each column of the de-duplicated
# DataFrame. Column names are taken from the frame being aggregated.
unique_counts = [
    (column_name, unique_records_df.agg(countDistinct(column_name)).collect()[0][0])
    for column_name in unique_records_df.columns
]

# Display the results. The loop variables must not be named `col`: that would
# rebind the pyspark `col` function to a plain string for later cells.
for column_name, distinct_count in unique_counts:
    print(f"Number of unique values in {column_name}: {distinct_count}")

[Stage 117:>                                                        (0 + 2) / 2]

Number of unique values in Primary_Index: 1
Number of unique values in Tweet_Id: 1
Number of unique values in Date_Text: 1
Number of unique values in Flag: 1
Number of unique values in User: 1
Number of unique values in Tweet_Text: 1




In [17]:
from pyspark.sql.functions import col
# Repeat the Tweet_Id "1469531660" lookup against the de-duplicated rows
# and print whatever matches in full.
matching_records = unique_records_df.where(
    unique_records_df["Tweet_Id"] == "1469531660"
)
matching_records.show(truncate=False)

[Stage 123:>                                                        (0 + 2) / 2]

+-------------+--------+---------+----+----+----------+
|Primary_Index|Tweet_Id|Date_Text|Flag|User|Tweet_Text|
+-------------+--------+---------+----+----+----------+
+-------------+--------+---------+----+----+----------+



                                                                                

In [18]:
# From here on, `df` refers to the de-duplicated rows.
df = unique_records_df

In [19]:
# List the distinct three-character codes found at 1-based positions 21-23 of
# Date_Text (the captured output for the sample row shows "PDT").
from pyspark.sql.functions import substring

date_text_code = substring("Date_Text", 21, 3).alias("Date_Text_Characters")
unique_characters = df.select(date_text_code).distinct()

# Print the distinct codes.
unique_characters.show()

[Stage 126:>                                                        (0 + 2) / 2]

+--------------------+
|Date_Text_Characters|
+--------------------+
|                 PDT|
+--------------------+





In [20]:

# Slice the fixed-width Date_Text string into its parts:
#   Year  = last four characters
#   Month = three characters starting at position 5
#   Day   = two characters starting at position 9
date_text = df["Date_Text"]
df = (
    df.withColumn("Year", date_text.substr(-4, 4))
      .withColumn("Month", date_text.substr(5, 3))
      .withColumn("Day", date_text.substr(9, 2))
)

In [21]:
# Preview up to 10 rows of the current DataFrame.
df.show(10)

[Stage 129:>                                                        (0 + 2) / 2]

+-------------+----------+--------------------+--------+----+--------------------+----+-----+---+
|Primary_Index|  Tweet_Id|           Date_Text|    Flag|User|          Tweet_Text|Year|Month|Day|
+-------------+----------+--------------------+--------+----+--------------------+----+-----+---+
|            7|1467811594|Mon Apr 06 22:20:...|NO_QUERY|coZZ|@LOLTrish hey  lo...|2009|  Apr| 06|
+-------------+----------+--------------------+--------+----+--------------------+----+-----+---+





In [22]:
# Collect each different value that appears in the "Month" column.
unique_months = df.select("Month").dropDuplicates()

# Print them.
unique_months.show()

[Stage 132:>                                                        (0 + 2) / 2]

+-----+
|Month|
+-----+
|  Apr|
+-----+





In [23]:
from pyspark.sql.functions import when, lit

# Replace three-letter month abbreviations with two-digit month numbers.
# All twelve months are mapped (previously only Apr/May/Jun), so a row from any
# other month no longer keeps its text abbreviation, which the later
# 'dd/MM/yyyy' date parse cannot read.
month_numbers = {
    "Jan": "01", "Feb": "02", "Mar": "03", "Apr": "04",
    "May": "05", "Jun": "06", "Jul": "07", "Aug": "08",
    "Sep": "09", "Oct": "10", "Nov": "11", "Dec": "12",
}

# Build one chained when(...).when(...) expression from the mapping.
month_expr = None
for month_abbr, month_number in month_numbers.items():
    is_month = df["Month"] == month_abbr
    if month_expr is None:
        month_expr = when(is_month, lit(month_number))
    else:
        month_expr = month_expr.when(is_month, lit(month_number))

# Values that match no abbreviation (including already-converted numbers when
# the cell is re-run) are left unchanged.
df = df.withColumn("Month", month_expr.otherwise(df["Month"]))

# Show the updated DataFrame
df.show(10)

[Stage 135:>                                                        (0 + 2) / 2]

+-------------+----------+--------------------+--------+----+--------------------+----+-----+---+
|Primary_Index|  Tweet_Id|           Date_Text|    Flag|User|          Tweet_Text|Year|Month|Day|
+-------------+----------+--------------------+--------+----+--------------------+----+-----+---+
|            7|1467811594|Mon Apr 06 22:20:...|NO_QUERY|coZZ|@LOLTrish hey  lo...|2009|   04| 06|
+-------------+----------+--------------------+--------+----+--------------------+----+-----+---+



                                                                                

In [24]:
from pyspark.sql.functions import concat_ws, expr
from pyspark.sql.types import StringType

# Make sure each date part is a string column before joining them.
for date_part in ("Year", "Month", "Day"):
    df = df.withColumn(date_part, df[date_part].cast(StringType()))

# Join the parts as "Day/Month/Year" ...
df = df.withColumn("Date", concat_ws("/", "Day", "Month", "Year"))

# ... then parse that text into a real date using the matching pattern.
df = df.withColumn("Date", expr("to_date(Date, 'dd/MM/yyyy')"))

# Preview the result.
df.show(10)

[Stage 138:>                                                        (0 + 2) / 2]

+-------------+----------+--------------------+--------+----+--------------------+----+-----+---+----------+
|Primary_Index|  Tweet_Id|           Date_Text|    Flag|User|          Tweet_Text|Year|Month|Day|      Date|
+-------------+----------+--------------------+--------+----+--------------------+----+-----+---+----------+
|            7|1467811594|Mon Apr 06 22:20:...|NO_QUERY|coZZ|@LOLTrish hey  lo...|2009|   04| 06|2009-04-06|
+-------------+----------+--------------------+--------+----+--------------------+----+-----+---+----------+



                                                                                

In [25]:
from pyspark.sql.functions import col, lower

# Lower-case the tweet text, then print up to 10 rows untruncated.
df = df.withColumn("Tweet_Text", lower(df["Tweet_Text"]))
df.show(10, truncate=False)

[Stage 141:>                                                        (0 + 2) / 2]

+-------------+----------+----------------------------+--------+----+------------------------------------------------------------------------------------------------+----+-----+---+----------+
|Primary_Index|Tweet_Id  |Date_Text                   |Flag    |User|Tweet_Text                                                                                      |Year|Month|Day|Date      |
+-------------+----------+----------------------------+--------+----+------------------------------------------------------------------------------------------------+----+-----+---+----------+
|7            |1467811594|Mon Apr 06 22:20:03 PDT 2009|NO_QUERY|coZZ|@loltrish hey  long time no see! yes.. rains a bit only a bit  lol  i'm fine thanks  how's you ?|2009|04   |06 |2009-04-06|
+-------------+----------+----------------------------+--------+----+------------------------------------------------------------------------------------------------+----+-----+---+----------+





In [26]:
from pyspark.sql.functions import col, regexp_replace
# Matches http(s):// links and bare www. links up to the next whitespace.
url_pattern = r'(https?://\S+|www\.\S+)'

# Delete every match from "Tweet_Text", then print up to 10 rows untruncated.
df = df.withColumn("Tweet_Text", regexp_replace("Tweet_Text", url_pattern, ''))
df.show(10, truncate=False)

                                                                                

+-------------+----------+----------------------------+--------+----+------------------------------------------------------------------------------------------------+----+-----+---+----------+
|Primary_Index|Tweet_Id  |Date_Text                   |Flag    |User|Tweet_Text                                                                                      |Year|Month|Day|Date      |
+-------------+----------+----------------------------+--------+----+------------------------------------------------------------------------------------------------+----+-----+---+----------+
|7            |1467811594|Mon Apr 06 22:20:03 PDT 2009|NO_QUERY|coZZ|@loltrish hey  long time no see! yes.. rains a bit only a bit  lol  i'm fine thanks  how's you ?|2009|04   |06 |2009-04-06|
+-------------+----------+----------------------------+--------+----+------------------------------------------------------------------------------------------------+----+-----+---+----------+



In [27]:
from pyspark.sql.functions import col, regexp_replace
# Matches "@" followed by one or more word characters (a user mention).
user_mention_pattern = r'@[\w]+'

# Delete every mention from "Tweet_Text", then print up to 10 rows untruncated.
df = df.withColumn(
    "Tweet_Text",
    regexp_replace(df["Tweet_Text"], user_mention_pattern, ''),
)
df.show(10, truncate=False)

[Stage 147:>                                                        (0 + 2) / 2]

+-------------+----------+----------------------------+--------+----+---------------------------------------------------------------------------------------+----+-----+---+----------+
|Primary_Index|Tweet_Id  |Date_Text                   |Flag    |User|Tweet_Text                                                                             |Year|Month|Day|Date      |
+-------------+----------+----------------------------+--------+----+---------------------------------------------------------------------------------------+----+-----+---+----------+
|7            |1467811594|Mon Apr 06 22:20:03 PDT 2009|NO_QUERY|coZZ| hey  long time no see! yes.. rains a bit only a bit  lol  i'm fine thanks  how's you ?|2009|04   |06 |2009-04-06|
+-------------+----------+----------------------------+--------+----+---------------------------------------------------------------------------------------+----+-----+---+----------+



                                                                                

In [28]:
import pandas as pd
import numpy as np
import seaborn as sns
import re
import string
from string import punctuation
import nltk
from nltk.corpus import stopwords
nltk.download('stopwords')


import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfTransformer

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Activation, Dropout
from tensorflow.keras.callbacks import EarlyStopping

[nltk_data] Downloading package stopwords to /home/hduser/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
2023-10-30 22:15:26.011232: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-10-30 22:15:26.490863: E tensorflow/compiler/xla/stream_executor/cuda/cuda_dnn.cc:9342] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2023-10-30 22:15:26.490908: E tensorflow/compiler/xla/stream_executor/cuda/cuda_fft.cc:609] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2023-10-30 22:15:26.493338: E tensorflow/compiler/xla/stream_executor/cuda/cuda_blas.cc:1518] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2023-10-30 22:15:26.603878: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on 

In [29]:
from functools import lru_cache

# Translation table that deletes every character in string.punctuation.
_PUNCTUATION_TABLE = str.maketrans('', '', string.punctuation)


@lru_cache(maxsize=1)
def _english_stopwords():
    """Return NLTK's English stop-word list as a frozenset, loaded once and cached."""
    return frozenset(stopwords.words('english'))


def get_text_processing(text, stop_words=None):
    """Strip punctuation and stop words from a piece of text.

    Parameters
    ----------
    text : str or None
        The text to clean. ``None`` is treated as missing and yields ``''``.
    stop_words : container of str, optional
        Lower-case words to drop. Defaults to NLTK's English stop-word list,
        which is loaded once and reused instead of being re-read on every call.

    Returns
    -------
    str
        The remaining words joined by single spaces, original casing kept.
    """
    if text is None:
        return ''
    if stop_words is None:
        stop_words = _english_stopwords()

    no_punctuation = text.translate(_PUNCTUATION_TABLE)
    # NOTE(review): punctuation is removed before the stop-word comparison, so
    # contractions such as "i'm" become "im" and no longer match the NLTK
    # entries that contain an apostrophe. Behaviour kept as in the original.
    return ' '.join(
        word for word in no_punctuation.split() if word.lower() not in stop_words
    )

In [30]:
# Keep only the columns needed for sentiment analysis, then collect them from
# Spark into a pandas DataFrame on the driver.
final_df = df.select('Tweet_Id', 'Date', 'Tweet_Text')
sentiment_df = final_df.toPandas()

                                                                                

In [31]:
# Preview up to the first 10 rows of the pandas copy.
sentiment_df.head(10)

Unnamed: 0,Tweet_Id,Date,Tweet_Text
0,1467811594,2009-04-06,hey long time no see! yes.. rains a bit only...


In [32]:
from datetime import datetime

# Progress checkpoint: capture the wall-clock time and print it as
# "YYYY-MM-DD HH:MM:SS".
current_datetime = datetime.now()
formatted_datetime = f"{current_datetime:%Y-%m-%d %H:%M:%S}"
print("Current date and time:", formatted_datetime)

Current date and time: 2023-10-30 22:15:35


In [33]:
# Run the punctuation / stop-word cleaner over every tweet and store the
# result in a new column, then preview the first rows.
sentiment_df['Tweet_Text_Cleaned'] = sentiment_df['Tweet_Text'].map(get_text_processing)
sentiment_df.head()

Unnamed: 0,Tweet_Id,Date,Tweet_Text,Tweet_Text_Cleaned
0,1467811594,2009-04-06,hey long time no see! yes.. rains a bit only...,hey long time see yes rains bit bit lol im fin...


In [34]:
from datetime import datetime

# Progress checkpoint: capture the wall-clock time and print it as
# "YYYY-MM-DD HH:MM:SS".
current_datetime = datetime.now()
formatted_datetime = f"{current_datetime:%Y-%m-%d %H:%M:%S}"
print("Current date and time:", formatted_datetime)

Current date and time: 2023-10-30 22:15:35


In [35]:
import pandas as pd
from textblob import TextBlob

# Score each cleaned tweet with TextBlob and store its polarity in the
# "Textblob_Sentiment" column of sentiment_df.
sentiment_df["Textblob_Sentiment"] = sentiment_df["Tweet_Text_Cleaned"].map(
    lambda cleaned_text: TextBlob(cleaned_text).sentiment.polarity
)


In [36]:
def classify_sentiment(polarity):
    """Map a polarity score to a label.

    Returns "Positive" for scores above zero, "Negative" for scores below
    zero, and "Neutral" for anything else.
    """
    if polarity > 0:
        return "Positive"
    if polarity < 0:
        return "Negative"
    return "Neutral"

# Label every TextBlob polarity score as Positive / Negative / Neutral.
sentiment_df["Textblob_Sentiment_Class"] = sentiment_df["Textblob_Sentiment"].map(classify_sentiment)

In [37]:
from datetime import datetime

# Progress checkpoint: capture the wall-clock time and print it as
# "YYYY-MM-DD HH:MM:SS".
current_datetime = datetime.now()
formatted_datetime = f"{current_datetime:%Y-%m-%d %H:%M:%S}"
print("Current date and time:", formatted_datetime)

Current date and time: 2023-10-30 22:15:35


In [42]:
import nltk
from nltk.sentiment.vader import SentimentIntensityAnalyzer

# VADER needs its lexicon on disk. Download it only when it is missing, so the
# cell works on a fresh environment without re-downloading on every run.
try:
    nltk.data.find('sentiment/vader_lexicon.zip')
except LookupError:
    nltk.download('vader_lexicon')

# Initialize the VADER sentiment analyzer
analyzer = SentimentIntensityAnalyzer()

# Score every cleaned tweet; each entry is a dict with the keys
# 'neg', 'neu', 'pos' and 'compound'.
sentiment_df["NLTK_Sentiment_Scores"] = sentiment_df["Tweet_Text_Cleaned"].apply(
    analyzer.polarity_scores
)

# Spread the individual scores into their own columns.
nltk_score_columns = {
    "NLTK_Positive_Score": "pos",
    "NLTK_Negative_Score": "neg",
    "NLTK_Neutral_Score": "neu",
    "NLTK_Compound_Score": "compound",
}
for score_column, score_key in nltk_score_columns.items():
    # `key=score_key` binds the current key at definition time.
    sentiment_df[score_column] = sentiment_df["NLTK_Sentiment_Scores"].apply(
        lambda scores, key=score_key: scores[key]
    )

# Classify sentiment from the compound score using the same > 0 / < 0 / else
# rule as the TextBlob labels (classify_sentiment is defined earlier in this notebook).
sentiment_df["NLTK_Sentiment_Class"] = sentiment_df["NLTK_Compound_Score"].apply(
    classify_sentiment
)



In [40]:
from datetime import datetime

# Progress checkpoint: capture the wall-clock time and print it as
# "YYYY-MM-DD HH:MM:SS".
current_datetime = datetime.now()
formatted_datetime = f"{current_datetime:%Y-%m-%d %H:%M:%S}"
print("Current date and time:", formatted_datetime)

Current date and time: 2023-10-30 22:19:17


In [41]:
# Preview the first rows of sentiment_df with all columns added so far.
sentiment_df.head()

Unnamed: 0,Tweet_Id,Date,Tweet_Text,Tweet_Text_Cleaned,Textblob_Sentiment,Textblob_Sentiment_Class,NLTK_Sentiment_Scores,NLTK_Positive_Score,NLTK_Negative_Score,NLTK_Neutral_Score,NLTK_Compound_Score,NLTK_Sentiment_Class
0,1467811594,2009-04-06,hey long time no see! yes.. rains a bit only...,hey long time see yes rains bit bit lol im fin...,0.341667,Positive,"{'neg': 0.0, 'neu': 0.469, 'pos': 0.531, 'comp...",0.531,0.0,0.469,0.8481,Positive
