In [1]:
# Importing PySpark related libraries
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import concat_ws, regexp_replace, col, lower, to_date, date_format
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, DoubleType

# Importing MongoDB related library
from pymongo import MongoClient

# Importing Dash and Plotly for data visualization
import dash
import dash_core_components as dcc
import dash_html_components as html
from dash import html
from dash.dependencies import Input, Output
import plotly.graph_objs as go
import plotly.express as px

# Additional PySpark functions and features
from pyspark.sql.functions import avg
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import avg, col

# Importing NLTK for natural language processing
import nltk
from nltk.stem import WordNetLemmatizer
from nltk.sentiment.vader import SentimentIntensityAnalyzer

# Downloading NLTK datasets if needed
# nltk.download('omw-1.4')
# nltk.download('wordnet')
# nltk.download('vader_lexicon')

# Importing datetime library
from datetime import timedelta, datetime, date

The dash_core_components package is deprecated. Please replace
`import dash_core_components as dcc` with `from dash import dcc`
  import dash_core_components as dcc
The dash_html_components package is deprecated. Please replace
`import dash_html_components as html` with `from dash import html`
  import dash_html_components as html


In [2]:
spark = SparkSession.builder.appName('ProjectTweets').getOrCreate()

# DATA PREPARATION

In [3]:
df = spark.read.csv('/user1/ProjectTweets.csv', header=True, inferSchema=True)

                                                                                

In [4]:
#df.printSchema()

In [5]:
#df.show(5, truncate=False)

In [6]:
new_cols = ['ids', 'date', 'flag', 'user', 'text']

for i, column_name in enumerate(new_cols):
    df = df.withColumnRenamed(df.columns[i + 1], column_name)

In [7]:
spark.conf.set('spark.sql.legacy.timeParserPolicy', 'LEGACY')

In [8]:
date_column = df.select('date')

In [9]:
df = df.withColumn('date', to_date(df['date'], 'EEE MMM dd HH:mm:ss zzz yyyy'))

In [10]:
df = df.withColumn('date', to_date(col('date'), 'dd/MM/yyyy'))

In [11]:
#df.show()

In [12]:
#df.printSchema()

In [13]:
# Convert text data to lowercase and clean unnecessary characters
df = df.withColumn("text", lower(regexp_replace(col("text"), "[^a-zA-Z0-9\\s]", " ")))

In [14]:
# Remove special symbols, and links from text data
df = df.withColumn("text", regexp_replace(col("text"), r'[@#]\w+|https?://\S+|\W', " "))

In [15]:
#df.show(1, truncate=False)

## Sentiment Analysis without Tokenization, Lemmatization and Removing Stopwords

In [16]:
df_for_sentiment_1 = df.select('date', 'text')

In [17]:
# Create Vader SentimentIntensityAnalyzer
sia = SentimentIntensityAnalyzer()

# Define a function for UDF
def analyze_sentiment(text):
    sentiment = sia.polarity_scores(text)
    return sentiment['compound']

# Save UDF
sentiment_udf = udf(analyze_sentiment, DoubleType())

In [18]:
# Apply Vader analysis and add the results to a new column
df_for_sentiment_1 = df_for_sentiment_1.withColumn("sentiment_score", sentiment_udf(df_for_sentiment_1["text"]))

#df_for_sentiment_1.show()

In [19]:
# Convert "date" column to 'yyyy-MM-dd' format
df_for_sentiment_1 = df_for_sentiment_1.withColumn("date", F.to_date(df_for_sentiment_1["date"]))

In [20]:
# Ensure you're using the correct column name in the aggregation
daily_sentiment_1 = df_for_sentiment_1.groupBy("date").agg(avg("sentiment_score").alias("avg_sentiment_score")).orderBy("date")

In [21]:
# Create a full date array to include the entire date range
min_date = daily_sentiment_1.selectExpr("min(date) as min_date").first().min_date
max_date = daily_sentiment_1.selectExpr("max(date) as max_date").first().max_date

                                                                                

In [22]:
# Create date array
date_range = [min_date + timedelta(days=x) for x in range((max_date - min_date).days + 1)]
date_range_df = spark.createDataFrame([(date,) for date in date_range], ["date"])

In [23]:
# Fill the gap in date column
daily_sentiment_1 = date_range_df.join(daily_sentiment_1, on=["date"], how="left").orderBy("date").fillna(0, subset=["avg_sentiment_score"])

In [24]:
daily_sentiment_1.printSchema()

root
 |-- date: date (nullable = true)
 |-- avg_sentiment_score: double (nullable = false)



In [25]:
# Get the column names
columns = daily_sentiment_1.columns

# Find and print the count of null values in each column
for column in columns:
    null_count = daily_sentiment_1.filter(daily_sentiment_1[column].isNull()).count()
    print(f"Column '{column}' contains {null_count} null values.")

                                                                                

Column 'date' contains 0 null values.
Column 'avg_sentiment_score' contains 0 null values.


### Linear Interpolation

In [27]:
# Define a window specification to order the data by date
window_spec = Window.orderBy("date")

# Calculate the next non-null value using the 'last' function
interpolated_df = daily_sentiment_1.withColumn("next_value", F.last("avg_sentiment_score", True).over(window_spec))

# Calculate the previous non-null value using the 'first' function
interpolated_df = interpolated_df.withColumn("prev_value", F.first("avg_sentiment_score", True).over(window_spec))

# Calculate the linearly interpolated value
interpolated_df = interpolated_df.withColumn(
    "interpolated_value",
    F.when(F.col("avg_sentiment_score") == 0, (F.col("next_value") + F.col("prev_value")) / 2).otherwise(F.col("avg_sentiment_score"))
)

# Drop the 'next_value' and 'prev_value' columns if not needed
interpolated_df = interpolated_df.drop("next_value", "prev_value")

In [28]:
interpolated_df.printSchema()

root
 |-- date: date (nullable = true)
 |-- avg_sentiment_score: double (nullable = false)
 |-- interpolated_value: double (nullable = true)



In [29]:
interpolated_df = interpolated_df.select('date', 'interpolated_value')
interpolated_df.show()

2023-11-08 23:37:03,599 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

+----------+-------------------+
|      date| interpolated_value|
+----------+-------------------+
|2009-04-07| 0.1638104692791461|
|2009-04-08|0.08190523463957305|
|2009-04-09|0.08190523463957305|
|2009-04-10|0.08190523463957305|
|2009-04-11|0.08190523463957305|
|2009-04-12|0.08190523463957305|
|2009-04-13|0.08190523463957305|
|2009-04-14|0.08190523463957305|
|2009-04-15|0.08190523463957305|
|2009-04-16|0.08190523463957305|
|2009-04-17|0.08190523463957305|
|2009-04-18|0.18738602157202913|
|2009-04-19|0.18894100089100915|
|2009-04-20|0.17782408521710552|
|2009-04-21|0.17327567762269075|
|2009-04-22|0.08190523463957305|
|2009-04-23|0.08190523463957305|
|2009-04-24|0.08190523463957305|
|2009-04-25|0.08190523463957305|
|2009-04-26|0.08190523463957305|
+----------+-------------------+
only showing top 20 rows



                                                                                

In [30]:
# Get the column names
columns = interpolated_df.columns

# Find and print the count of null values in each column
for column in columns:
    null_count = interpolated_df.filter(interpolated_df[column].isNull()).count()
    print(f"Column '{column}' contains {null_count} null values.")

                                                                                

Column 'date' contains 0 null values.


2023-11-08 23:42:34,536 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

Column 'interpolated_value' contains 0 null values.


                                                                                

In [31]:
# Start DASH
app = dash.Dash(__name__)

# Configure DASH
app.layout = html.Div([
    dcc.Graph(
        id='line-chart',
        figure=px.line(interpolated_df, x='date', y='interpolated_value', title='Daily Average Sentiment Score')
    )
])

if __name__ == '__main__':
    app.run_server(debug=True)

2023-11-08 23:46:43,875 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

In [None]:
asdfasgdfhdsh

# Lemmatization, Tokenization, StopWordsRemover

In [None]:
# Lemmatization using NLTK
lemmatizer = WordNetLemmatizer()

def lemmatize_text(text):
    words = text.split()
    lemmatized_words = [lemmatizer.lemmatize(word) for word in words]
    return ' '.join(lemmatized_words)

lemmatize_udf = udf(lemmatize_text, StringType())
df = df.withColumn("text", lemmatize_udf("text"))

In [None]:
# Tokenization
tokenizer = Tokenizer(inputCol="text", outputCol="filtered_words")
df = tokenizer.transform(df)

In [None]:
# Use StopWordsRemover on the "filtered_words" column in your example DataFrame
remover = StopWordsRemover(inputCol="filtered_words", outputCol="filtered_words_without_stopwords")
df = remover.transform(df)

# You can update the column name as per your needs
df = df.withColumnRenamed("filtered_words_without_stopwords", "filtered_words_final")

# Preparing the Dataset for the Databases

In [None]:
# Just pick the necessary columns
df = df.select('0', 'ids', 'date', 'flag', 'user', 'filtered_words_final')

# Rename the "0" column to "index"
df = df.withColumnRenamed("0", "tweet_index")

# Show the result
df.show(truncate=False)

In [None]:
df.dropna()

In [None]:
# Count the total number of values in the dataframe
total_count = df.count()

# Show the total count
print("Total count of values in the dataframe", total_count)

# MySQL

In [None]:
import pymysql

# Connect to the database
connection = pymysql.connect(
    host="localhost",
    user="root",
    password="password",
    database="ProjectTweets",
    charset='utf8mb4',
    cursorclass=pymysql.cursors.DictCursor
)

In [None]:
# # Create a cursor
cursor = connection.cursor()

# # Create a table
# create_table_sql = """
# CREATE TABLE Tweets (
#     tweet_index INT AUTO_INCREMENT PRIMARY KEY,
#     ids BIGINT,
#     date DATE,
#     flag VARCHAR(55),
#     user VARCHAR(255),
#     filtered_words_final TEXT
# );
# """

In [None]:
# Create a table
#cursor.execute(create_table_sql)

# Save changes
#connection.commit()

In [None]:
df.printSchema()

In [None]:
# Combine the column named 'filtered_words_final' into a comma-separated column of text.
df = df.withColumn('concatenated_words', concat_ws(",", df['filtered_words_final']))
df.printSchema()

In [None]:
df = df.select('tweet_index', 'ids', 'date', 'flag', 'user', 'concatenated_words')
df.show(1, truncate=False)

In [None]:
df.printSchema()

In [None]:
mysql_url = "jdbc:mysql://localhost:3306/ProjectTweets"
mysql_properties = {
    "user": "root",
    "password": "password",
}


In [None]:
# df.write.jdbc(url=mysql_url, table="Tweets", mode="overwrite", properties=mysql_properties)

In [None]:
# # Execute the ALTER TABLE query
# alter_table_sql = "ALTER TABLE Tweets ADD COLUMN YCSB_KEY VARCHAR(255);"
# cursor.execute(alter_table_sql)

In [None]:
connection.commit()

In [None]:
df_from_mysql = spark.read.jdbc(url=mysql_url, table="Tweets", properties=mysql_properties)

### Showing results from MySQL

In [None]:
# Checking the database after insertin the dataframe
df_from_mysql.show()

In [None]:
# # Create a cursor
# cursor = connection.cursor()

# # Create a table
# create_table_sql = """
# CREATE TABLE YCSB_TEST (
#     tweet_index INT AUTO_INCREMENT PRIMARY KEY,
#     ids BIGINT,
#     date DATE,
#     flag VARCHAR(55),
#     user VARCHAR(255),
#     filtered_words_final TEXT,
#     YCSB_KEY VARCHAR(255)
# );
# """



In [None]:
# #Create a table
# cursor.execute(create_table_sql)

# #Save changes
# connection.commit()

In [None]:
import subprocess

command = "/home/hduser/ycsb-0.17.0/bin/ycsb.sh load jdbc -P /home/hduser/ycsb-0.17.0/jdbc-binding/conf/db.properties -P /home/hduser/ycsb-0.17.0/workloads/workloada -p db.connection_properties=\"user=root&password=password&useSSL=false\" -p jdbc.url=jdbc:mysql://localhost:3306/ProjectTweets -p table=YCSB_TEST"

process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate()

if process.returncode == 0:
    print("YCSB operation completed successfully.")
    print("Output:")
    print(stdout.decode('utf-8'))
else:
    print("YCSB operation failed. Error message:")
    print(stderr.decode('utf-8'))


### Due to unidentified Issue YCSB did not work. I decided to use cProfile.

In [None]:
# Örnek bir sorgu
query = "SELECT * FROM Tweets WHERE concatenated_words"

In [None]:
import cProfile

def perform_query():
    cursor = connection.cursor()
    cursor.execute(query)
    results = cursor.fetchall()
    cursor.close()

if __name__ == '__main__':
    cProfile.run("perform_query()", sort="cumulative")

    Total calls: 1,131,181
    Total time: 3.526 seconds

Top time-consuming functions:

    {built-in method builtins.exec}: 3.526 seconds
    <string>:1(<module>): 3.526 seconds
    3302925674.py:3(perform_query): 3.524 seconds
    cursors.py:133(execute): 3.524 seconds
    cursors.py:319(_query): 3.524 seconds

# Hive

In [None]:
df.createOrReplaceTempView("temp_table")

In [None]:
spark.sql("CREATE DATABASE IF NOT EXISTS ProjectTweets")

In [None]:
create_table_sql = """
CREATE TABLE IF NOT EXISTS ProjectTweets.Tweets (
    tweet_index INT,
    ids BIGINT,
    date DATE,
    flag STRING,
    user STRING,
    concatenated_words STRING
)
STORED AS PARQUET
"""
spark.sql(create_table_sql)

In [None]:
hive_insert_data_sql = """
INSERT INTO ProjectTweets SELECT * FROM temp_table
"""

In [None]:
spark.sql(hive_insert_data_sql)

### Showing results from Hive

In [None]:
# Query
result = spark.sql("SELECT * FROM ProjectTweets")

# Show Result
result.show()

In [None]:
import subprocess

command = "/home/hduser/ycsb-0.17.0/bin/ycsb.sh load jdbc -P /home/hduser/ycsb-0.17.0/jdbc-binding/conf/db.properties -P /home/hduser/ycsb-0.17.0/workloads/workloada -p db.connection_properties=\"user=root&password=password\" -p jdbc.url=jdbc:hive2://hive_server:10000/ProjectTweets"

process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate()

if process.returncode == 0:
    print("YCSB operation completed successfully.")
    print("Output:")
    print(stdout.decode('utf-8'))
else:
    print("YCSB operation failed. Error message:")
    print(stderr.decode('utf-8'))

In [None]:
import pstats

In [None]:
# my_hive_script.py
def hive_query():
    query
    pass

if __name__ == "__main__":
    cProfile.run("hive_query()", sort="cumulative")
    
    # Show the results with pstat
    p = pstats.Stats()
    p.print_stats()

# SENTIMENT ANALYSIS AFTER TOKENIZATION, LEMMATIZATION AND STOPWORDS REMOVAL

In [None]:
df_for_sentiment_2 = df.select('date', 'concatenated_words')

In [None]:
# # Create Vader SentimentIntensityAnalyzer
# sia = SentimentIntensityAnalyzer()

# # Define a function for the UDF
# def analyze_sentiment(text):
#     sentiment = sia.polarity_scores(text)
#     return sentiment['compound']

# # Save UDF
# sentiment_udf = udf(analyze_sentiment, DoubleType())

In [None]:
# Apply Vader analysis and add the results to a new column
df_for_sentiment_2 = df_for_sentiment_2.withColumn("sentiment_score", sentiment_udf(df_for_sentiment_2["concatenated_words"]))

df_for_sentiment_2.show()

In [None]:
# Convert "date" column to 'yyyy-MM-dd' format
df_for_sentiment_2 = df_for_sentiment_2.withColumn("date", F.to_date(df_for_sentiment_2["date"]))

# Create a data frame for date and average sentiment scores
daily_sentiment_2 = df_for_sentiment_2.groupBy("date").agg(avg("sentiment_score").alias("avg_sentiment_score")).orderBy("date")

In [None]:
# Create a full date array to include the entire date range
min_date = daily_sentiment_2.selectExpr("min(date) as min_date").first().min_date
max_date = daily_sentiment_2.selectExpr("max(date) as max_date").first().max_date

In [None]:
# Create the date array
date_range = [min_date + timedelta(days=x) for x in range((max_date - min_date).days + 1)]
date_range_df = spark.createDataFrame([(date,) for date in date_range], ["date"])

In [None]:
# Fill in missing dates
daily_sentiment_2 = date_range_df.join(daily_sentiment_2, on=["date"], how="left").orderBy("date").fillna(0, subset=["avg_sentiment_score"])

In [None]:
daily_sentiment_2.show()

In [None]:
# Start the Dash application
app = dash.Dash(__name__)

In [None]:
# Create the layout of the application
app.layout = html.Div([
    dcc.Graph(
        id='sentiment-line-chart',
        figure=px.line(daily_sentiment_2, x='date', y='avg_sentiment_score', title='Daily Average Sentiment Score')
    )
])

if __name__ == '__main__':
    app.run_server(debug=True)

## This is why I chose the dataframe without lemmatization, tokenization, and stop word removal.
- Because there is not enough sentiment score it is almost 0.
- In this case removing special symbols and making the text letters smaller helped to keep the meaning of the text.

# TIME SERIES ANALYSIS

In [None]:
# Import necessary modules to work with PySpark
from pyspark.sql.functions import lag, lead, when, coalesce, expr, col
import pyspark.sql.functions as F
from pyspark.sql.window import Window
import matplotlib.pyplot as plt

In [None]:
# Keeping the original dataframe
first_daily_sentiment_1 = daily_sentiment_1

In [None]:
window_spec = Window.partitionBy("date").orderBy("date")

In [None]:
# 1. Apply Backward Fill interpolation
daily_sentiment_1 = daily_sentiment_1.withColumn("backward_fill", lag(daily_sentiment_1["avg_sentiment_score"]).over(window_spec))

In [None]:
# Extract data for MSE calculation
interpolation_1 = daily_sentiment_1.select("avg_sentiment_score", "backward_fill")

# Calculate the differences and MSE
differences_1 = interpolation_1.withColumn("difference", (col("avg_sentiment_score") - col("backward_fill").cast("double")) ** 2)
mse_score_1 = differences_1.agg({"difference": "mean"}).collect()[0][0]

print("Mean Squared Error (MSE) Score:", mse_score_1)

In [None]:
# 3. Apply Quadratic Interpolation
daily_sentiment_1 = daily_sentiment_1.withColumn("quadratic_fill", coalesce(
    (lag(daily_sentiment_1["avg_sentiment_score"]).over(window_spec) + 2 * daily_sentiment_1["avg_sentiment_score"] - lead(daily_sentiment_1["avg_sentiment_score"]).over(window_spec)),
    daily_sentiment_1["avg_sentiment_score"]
))

In [None]:
# Extract data for MSE calculation
interpolation_3 = daily_sentiment_1.select("avg_sentiment_score", "quadratic_fill")

# Calculate the differences and MSE
differences_3 = interpolation_3.withColumn("difference", (col("avg_sentiment_score") - col("quadratic_fill").cast("double")) ** 2)
mse_score_3 = differences_3.agg({"difference": "mean"}).collect()[0][0]

print("Mean Squared Error (MSE) Score:", mse_score_3)

In [None]:
# 4. Apply Mean of Nearest Neighbors interpolation
daily_sentiment_1 = daily_sentiment_1.withColumn("knn_mean", coalesce(
    (daily_sentiment_1["avg_sentiment_score"] + (lag(daily_sentiment_1["avg_sentiment_score"]).over(window_spec) + lead(daily_sentiment_1["avg_sentiment_score"]).over(window_spec)) / 2),
    daily_sentiment_1["avg_sentiment_score"]
))

In [None]:
# Extract data for MSE calculation
interpolation_4 = daily_sentiment_1.select("avg_sentiment_score", "knn_mean")

# Calculate the differences and MSE
differences_4 = interpolation_4.withColumn("difference", (col("avg_sentiment_score") - col("knn_mean").cast("double")) ** 2)
mse_score_4 = differences_4.agg({"difference": "mean"}).collect()[0][0]

print("Mean Squared Error (MSE) Score:", mse_score_4)

In [None]:
# 5. Apply Mean of Seasonal Counterparts interpolation
daily_sentiment_1 = daily_sentiment_1.withColumn("seasonal_mean", coalesce(
    (daily_sentiment_1["avg_sentiment_score"] + (lag(daily_sentiment_1["avg_sentiment_score"], 7).over(window_spec) + lead(daily_sentiment_1["avg_sentiment_score"], -7).over(window_spec)) / 2),
    daily_sentiment_1["avg_sentiment_score"]
))

In [None]:
# Extract data for MSE calculation
interpolation_5 = daily_sentiment_1.select("avg_sentiment_score", "seasonal_mean")

# Calculate the differences and MSE
differences_5 = interpolation_5.withColumn("difference", (col("avg_sentiment_score") - col("seasonal_mean").cast("double")) ** 2)
mse_score_5 = differences_5.agg({"difference": "mean"}).collect()[0][0]

print("Mean Squared Error (MSE) Score:", mse_score_5)

In [None]:
# Collect the results for visualization
interpolated_data = daily_sentiment_1.select("date", "avg_sentiment_score", "backward_fill", "quadratic_fill", "knn_mean", "seasonal_mean").collect()

In [None]:
# Extract data for plotting
dates = [row.date for row in interpolated_data]
original_scores = [row.avg_sentiment_score for row in interpolated_data]
interpolation_methods = ["backward_fill", "quadratic_fill", "knn_mean", "seasonal_mean"]

# Create a loop to plot each interpolation method
for method in interpolation_methods:
    plt.figure(figsize=(12, 6))
    plt.plot(dates, original_scores, label='Original')
    plt.plot(dates, [row[method] for row in interpolated_data], label=method.replace("_", " ").title())  # Use the method name as the label
    plt.xlabel('Date')
    plt.ylabel('Sentiment Score')
    plt.title(f'Original vs. {method.replace("_", " ").title()}')
    plt.legend()
    plt.show()

In [None]:
# Assuming you have mse_score values in these variables
mse_scores = [mse_score_1, mse_score_3, mse_score_4, mse_score_5]

# Filter the mse_scores that are between 0.0 and 0.5
filtered_mse_scores = [score for score in mse_scores if 0.0 <= score <= 0.5]

# Create a bar chart
plt.bar(range(len(filtered_mse_scores)), filtered_mse_scores)
plt.xlabel("MSE Score Index")
plt.ylabel("MSE Score")
plt.title("MSE Scores")
plt.xticks(range(len(filtered_mse_scores)))
plt.show()

- Backward fill MSE Score: 0.0096
- Quadratic fill MSE Score: 0.0263
- KNN Mean MSE Score: 0.0107
- Seasonal Mean MSE Score: 0.0140