In [1]:
# Importing PySpark related libraries
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import concat_ws, regexp_replace, col, lower, to_date, date_format
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, DoubleType

# Importing MongoDB related library
from pymongo import MongoClient

# Importing Dash and Plotly for data visualization
import dash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output
import plotly.graph_objs as go
import plotly.express as px

# Additional PySpark functions and features
from pyspark.sql.functions import avg
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import avg, col

# Importing NLTK for natural language processing
import nltk
from nltk.stem import WordNetLemmatizer
from nltk.sentiment.vader import SentimentIntensityAnalyzer

# Downloading NLTK datasets if needed
# nltk.download('omw-1.4')
# nltk.download('wordnet')
# nltk.download('vader_lexicon')

# Importing datetime library
from datetime import timedelta, datetime, date

The dash_core_components package is deprecated. Please replace
`import dash_core_components as dcc` with `from dash import dcc`
  import dash_core_components as dcc
The dash_html_components package is deprecated. Please replace
`import dash_html_components as html` with `from dash import html`
  import dash_html_components as html


In [2]:
spark = SparkSession.builder.appName('ProjectTweets').enableHiveSupport().getOrCreate()

# DATA PREPARATION

In [3]:
df = spark.read.csv('/user1/ProjectTweets.csv', header=True, inferSchema=True)

                                                                                

In [4]:
df.printSchema()

root
 |-- 0: integer (nullable = true)
 |-- 1467810369: long (nullable = true)
 |-- Mon Apr 06 22:19:45 PDT 2009: string (nullable = true)
 |-- NO_QUERY: string (nullable = true)
 |-- _TheSpecialOne_: string (nullable = true)
 |-- @switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D: string (nullable = true)



In [5]:
df.show(5, truncate=False)

+---+----------+----------------------------+--------+---------------+-------------------------------------------------------------------------------------------------------------------+
|0  |1467810369|Mon Apr 06 22:19:45 PDT 2009|NO_QUERY|_TheSpecialOne_|@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D|
+---+----------+----------------------------+--------+---------------+-------------------------------------------------------------------------------------------------------------------+
|1  |1467810672|Mon Apr 06 22:19:49 PDT 2009|NO_QUERY|scotthamilton  |is upset that he can't update his Facebook by texting it... and might cry as a result  School today also. Blah!    |
|2  |1467810917|Mon Apr 06 22:19:53 PDT 2009|NO_QUERY|mattycus       |@Kenichan I dived many times for the ball. Managed to save 50%  The rest go out of bounds                          |
|3  |1467811184|Mon Apr 06 22:19:57 PDT 2009|NO_QUERY|ElleCTF    

In [6]:
new_cols = ['ids', 'date', 'flag', 'user', 'text']

for i, column_name in enumerate(new_cols):
    df = df.withColumnRenamed(df.columns[i + 1], column_name)

In [7]:
spark.conf.set('spark.sql.legacy.timeParserPolicy', 'LEGACY')

In [8]:
date_column = df.select('date')

In [9]:
df = df.withColumn('date', to_date(df['date'], 'EEE MMM dd HH:mm:ss zzz yyyy'))

In [10]:
df = df.withColumn('date', to_date(col('date'), 'dd/MM/yyyy'))

In [11]:
df.show()

+---+----------+----------+--------+---------------+--------------------+
|  0|       ids|      date|    flag|           user|                text|
+---+----------+----------+--------+---------------+--------------------+
|  1|1467810672|2009-04-07|NO_QUERY|  scotthamilton|is upset that he ...|
|  2|1467810917|2009-04-07|NO_QUERY|       mattycus|@Kenichan I dived...|
|  3|1467811184|2009-04-07|NO_QUERY|        ElleCTF|my whole body fee...|
|  4|1467811193|2009-04-07|NO_QUERY|         Karoli|@nationwideclass ...|
|  5|1467811372|2009-04-07|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|  6|1467811592|2009-04-07|NO_QUERY|        mybirch|         Need a hug |
|  7|1467811594|2009-04-07|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|  8|1467811795|2009-04-07|NO_QUERY|2Hood4Hollywood|@Tatiana_K nope t...|
|  9|1467812025|2009-04-07|NO_QUERY|        mimismo|@twittera que me ...|
| 10|1467812416|2009-04-07|NO_QUERY| erinx3leannexo|spring break in p...|
| 11|1467812579|2009-04-07|NO_QUERY|  

In [12]:
df.printSchema()

root
 |-- 0: integer (nullable = true)
 |-- ids: long (nullable = true)
 |-- date: date (nullable = true)
 |-- flag: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)



In [13]:
# Convert text data to lowercase and clean unnecessary characters
df = df.withColumn("text", lower(regexp_replace(col("text"), "[^a-zA-Z0-9\\s]", " ")))

In [14]:
# Remove special symbols, and links from text data
df = df.withColumn("text", regexp_replace(col("text"), r'[@#]\w+|https?://\S+|\W', " "))

In [15]:
df.show(1, truncate=False)

+---+----------+----------+--------+-------------+---------------------------------------------------------------------------------------------------------------+
|0  |ids       |date      |flag    |user         |text                                                                                                           |
+---+----------+----------+--------+-------------+---------------------------------------------------------------------------------------------------------------+
|1  |1467810672|2009-04-07|NO_QUERY|scotthamilton|is upset that he can t update his facebook by texting it    and might cry as a result  school today also  blah |
+---+----------+----------+--------+-------------+---------------------------------------------------------------------------------------------------------------+
only showing top 1 row



## Sentiment Analysis without Tokenization, Lemmatization and Removing Stopwords

In [16]:
df_for_sentiment = df.select('date', 'text')

In [17]:
# Vader SentimentIntensityAnalyzer'ı oluşturun
sia = SentimentIntensityAnalyzer()

# UDF için bir işlev tanımlayın
def analyze_sentiment(text):
    sentiment = sia.polarity_scores(text)
    return sentiment['compound']

# UDF'yi kaydedin
sentiment_udf = udf(analyze_sentiment, DoubleType())

In [18]:
# Vader analizini uygulayın ve sonuçları yeni bir sütuna ekleyin
df_for_sentiment = df_for_sentiment.withColumn("sentiment_score", sentiment_udf(df_for_sentiment["text"]))

# Sonuçları göstermek için ilk birkaç satırı görüntüleyebilirsiniz
df_for_sentiment.show()

[Stage 5:>                                                          (0 + 1) / 1]

+----------+--------------------+---------------+
|      date|                text|sentiment_score|
+----------+--------------------+---------------+
|2009-04-07|is upset that he ...|        -0.7269|
|2009-04-07| kenichan i dived...|         0.4939|
|2009-04-07|my whole body fee...|          -0.25|
|2009-04-07| nationwideclass ...|        -0.6597|
|2009-04-07| kwesidei not the...|            0.0|
|2009-04-07|         need a hug |         0.4767|
|2009-04-07| loltrish hey  lo...|         0.7906|
|2009-04-07| tatiana k nope t...|            0.0|
|2009-04-07| twittera que me ...|            0.0|
|2009-04-07|spring break in p...|            0.0|
|2009-04-07|i just re pierced...|            0.0|
|2009-04-07| caregiving i cou...|        -0.5994|
|2009-04-07| octolinz16 it it...|        -0.1027|
|2009-04-07| smarrison i woul...|        -0.4767|
|2009-04-07| iamjazzyfizzle i...|         0.2732|
|2009-04-07|hollis  death sce...|        -0.9081|
|2009-04-07|about to file taxes |            0.0|


                                                                                

In [19]:
# "date" sütununu 'yyyy-MM-dd' formatına dönüştürün
df_for_sentiment = df_for_sentiment.withColumn("date", F.to_date(df_for_sentiment["date"]))

In [20]:
# Ensure you're using the correct column name in the aggregation
daily_sentiment = df_for_sentiment.groupBy("date").agg(avg("sentiment_score").alias("avg_sentiment_score")).orderBy("date")

In [21]:
daily_sentiment.show()

                                                                                

+----------+-------------------+
|      date|avg_sentiment_score|
+----------+-------------------+
|2009-04-07| 0.1638104692791461|
|2009-04-18|0.18738602157202913|
|2009-04-19|0.18894100089100915|
|2009-04-20|0.17782408521710552|
|2009-04-21|0.17327567762269075|
|2009-05-02|0.18204625353743725|
|2009-05-03| 0.1826210820523057|
|2009-05-04|0.17069686148275146|
|2009-05-10|0.21331721973947454|
|2009-05-11| 0.1945495898343243|
|2009-05-12|0.17012766364070722|
|2009-05-14|0.16996535817151087|
|2009-05-17|0.19635329935688794|
|2009-05-18|0.20330047123239273|
|2009-05-22|  0.205010983837318|
|2009-05-24|0.23156863905325445|
|2009-05-25|0.20778757396449699|
|2009-05-27|0.17629271882261635|
|2009-05-29|  0.195280581307947|
|2009-05-30|  0.197959609577845|
+----------+-------------------+
only showing top 20 rows



In [22]:
# Tüm tarih aralığını içerecek şekilde bir tam tarih dizisi oluşturun
min_date = daily_sentiment.selectExpr("min(date) as min_date").first().min_date
max_date = daily_sentiment.selectExpr("max(date) as max_date").first().max_date

                                                                                

In [23]:
# Tarih dizisini oluşturun (tüm günleri içerecek)
date_range = [min_date + timedelta(days=x) for x in range((max_date - min_date).days + 1)]
date_range_df = spark.createDataFrame([(date,) for date in date_range], ["date"])

In [24]:
# Eksik tarihleri doldurun
daily_sentiment = date_range_df.join(daily_sentiment, on=["date"], how="left").orderBy("date").fillna(0, subset=["avg_sentiment_score"])

In [25]:
# Sonuçları göstermek için ilk birkaç satırı görüntüleyebilirsiniz
daily_sentiment.show()

                                                                                

+----------+-------------------+
|      date|avg_sentiment_score|
+----------+-------------------+
|2009-04-07| 0.1638104692791461|
|2009-04-08|                0.0|
|2009-04-09|                0.0|
|2009-04-10|                0.0|
|2009-04-11|                0.0|
|2009-04-12|                0.0|
|2009-04-13|                0.0|
|2009-04-14|                0.0|
|2009-04-15|                0.0|
|2009-04-16|                0.0|
|2009-04-17|                0.0|
|2009-04-18|0.18738602157202913|
|2009-04-19|0.18894100089100915|
|2009-04-20|0.17782408521710552|
|2009-04-21|0.17327567762269075|
|2009-04-22|                0.0|
|2009-04-23|                0.0|
|2009-04-24|                0.0|
|2009-04-25|                0.0|
|2009-04-26|                0.0|
+----------+-------------------+
only showing top 20 rows



In [26]:
# Dash uygulamasını başlatın
app = dash.Dash(__name__)

In [27]:
# Uygulamanın düzenini oluşturun
app.layout = html.Div([
    dcc.Graph(
        id='sentiment-line-chart',
        figure=px.line(daily_sentiment, x='date', y='avg_sentiment_score', title='Daily Average Sentiment Score')
    )
])

if __name__ == '__main__':
    app.run_server(debug=True)

                                                                                

# Lemmatization, Tokenization, StopWordsRemover

In [28]:
# Lemmatization using NLTK
lemmatizer = WordNetLemmatizer()

def lemmatize_text(text):
    words = text.split()
    lemmatized_words = [lemmatizer.lemmatize(word) for word in words]
    return ' '.join(lemmatized_words)

lemmatize_udf = udf(lemmatize_text, StringType())
df = df.withColumn("text", lemmatize_udf("text"))

In [29]:
# Tokenization
tokenizer = Tokenizer(inputCol="text", outputCol="filtered_words")
df = tokenizer.transform(df)

In [30]:
# Use StopWordsRemover on the "filtered_words" column in your example DataFrame
remover = StopWordsRemover(inputCol="filtered_words", outputCol="filtered_words_without_stopwords")
df = remover.transform(df)

# You can update the column name as per your needs
df = df.withColumnRenamed("filtered_words_without_stopwords", "filtered_words_final")

# Preparing the Dataset for the Databases

In [31]:
# Just pick the necessary columns
df = df.select('0', 'ids', 'date', 'flag', 'user', 'filtered_words_final')

# Rename the "0" column to "index"
df = df.withColumnRenamed("0", "tweet_index")

# Show the result
df.show(truncate=False)

[Stage 24:>                                                         (0 + 1) / 1]

+-----------+----------+----------+--------+---------------+---------------------------------------------------------------------------------+
|tweet_index|ids       |date      |flag    |user           |filtered_words_final                                                             |
+-----------+----------+----------+--------+---------------+---------------------------------------------------------------------------------+
|1          |1467810672|2009-04-07|NO_QUERY|scotthamilton  |[upset, update, facebook, texting, might, cry, result, school, today, also, blah]|
|2          |1467810917|2009-04-07|NO_QUERY|mattycus       |[kenichan, dived, many, time, ball, managed, save, 50, rest, go, bound]          |
|3          |1467811184|2009-04-07|NO_QUERY|ElleCTF        |[whole, body, feel, itchy, like, fire]                                           |
|4          |1467811193|2009-04-07|NO_QUERY|Karoli         |[nationwideclass, behaving, m, mad, see]                                         |

                                                                                

In [32]:
df.dropna().show()

[Stage 25:>                                                         (0 + 1) / 1]

+-----------+----------+----------+--------+---------------+--------------------+
|tweet_index|       ids|      date|    flag|           user|filtered_words_final|
+-----------+----------+----------+--------+---------------+--------------------+
|          1|1467810672|2009-04-07|NO_QUERY|  scotthamilton|[upset, update, f...|
|          2|1467810917|2009-04-07|NO_QUERY|       mattycus|[kenichan, dived,...|
|          3|1467811184|2009-04-07|NO_QUERY|        ElleCTF|[whole, body, fee...|
|          4|1467811193|2009-04-07|NO_QUERY|         Karoli|[nationwideclass,...|
|          5|1467811372|2009-04-07|NO_QUERY|       joy_wolf|[kwesidei, whole,...|
|          6|1467811592|2009-04-07|NO_QUERY|        mybirch|         [need, hug]|
|          7|1467811594|2009-04-07|NO_QUERY|           coZZ|[loltrish, hey, l...|
|          8|1467811795|2009-04-07|NO_QUERY|2Hood4Hollywood|[tatiana, k, nope...|
|          9|1467812025|2009-04-07|NO_QUERY|        mimismo|[twittera, que, m...|
|         10|146

                                                                                

In [33]:
# Count the total number of values in the dataframe
total_count = df.count()

# Show the total count
print("Total count of values in the dataframe", total_count)



Total count of values in the dataframe 1599999


                                                                                

In [34]:
# # Checking the value inside flag column

# # Let's check flag column distribution
# def create_bar_chart(df):
#     flag_counts = df.groupBy("flag").count().orderBy("flag")
#     x = flag_counts.select("flag").rdd.flatMap(lambda x: x).collect()
#     y = flag_counts.select("count").rdd.flatMap(lambda x: x).collect()

#     data = [go.Bar(x=x, y=y)]

#     layout = go.Layout(title="Flag Distribution", xaxis=dict(title="Flag"), yaxis=dict(title="Count"))
#     fig = go.Figure(data=data, layout=layout)

#     return fig

# # Dash uygulamasını oluşturun
# app = dash.Dash(__name__)

# app.layout = html.Div([
#     dcc.Graph(id='flag-chart'),
# ])

# @app.callback(
#     Output('flag-chart', 'figure'),
#     [Input('flag-chart', 'relayoutData')]
# )
# def update_chart(relayoutData):
#     return create_bar_chart(df)

# if __name__ == '__main__':
#     app.run_server(debug=True)

# MySQL

In [35]:
import pymysql

# Connect to the database
connection = pymysql.connect(
    host="localhost",
    user="root",
    password="password",
    database="ProjectTweets",
    charset='utf8mb4',
    cursorclass=pymysql.cursors.DictCursor
)

In [36]:
# # Create a cursor
cursor = connection.cursor()

# # Create a table
# create_table_sql = """
# CREATE TABLE Tweets (
#     tweet_index INT AUTO_INCREMENT PRIMARY KEY,
#     ids BIGINT,
#     date DATE,
#     flag VARCHAR(55),
#     user VARCHAR(255),
#     filtered_words_final TEXT
# );
# """

In [37]:
# Create a table
#cursor.execute(create_table_sql)

# Save changes
#connection.commit()

In [38]:
df.printSchema()

root
 |-- tweet_index: integer (nullable = true)
 |-- ids: long (nullable = true)
 |-- date: date (nullable = true)
 |-- flag: string (nullable = true)
 |-- user: string (nullable = true)
 |-- filtered_words_final: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [39]:
# 'filtered_words_final' adındaki sütunu virgülle ayrılmış bir metin sütunu olarak birleştirin.
df = df.withColumn('concatenated_words', concat_ws(",", df['filtered_words_final']))
df.printSchema()

root
 |-- tweet_index: integer (nullable = true)
 |-- ids: long (nullable = true)
 |-- date: date (nullable = true)
 |-- flag: string (nullable = true)
 |-- user: string (nullable = true)
 |-- filtered_words_final: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- concatenated_words: string (nullable = false)



In [40]:
df = df.select('tweet_index', 'ids', 'date', 'flag', 'user', 'concatenated_words')
df.show(1, truncate=False)

[Stage 28:>                                                         (0 + 1) / 1]

+-----------+----------+----------+--------+-------------+---------------------------------------------------------------------+
|tweet_index|ids       |date      |flag    |user         |concatenated_words                                                   |
+-----------+----------+----------+--------+-------------+---------------------------------------------------------------------+
|1          |1467810672|2009-04-07|NO_QUERY|scotthamilton|upset,update,facebook,texting,might,cry,result,school,today,also,blah|
+-----------+----------+----------+--------+-------------+---------------------------------------------------------------------+
only showing top 1 row



                                                                                

In [41]:
df.printSchema()

root
 |-- tweet_index: integer (nullable = true)
 |-- ids: long (nullable = true)
 |-- date: date (nullable = true)
 |-- flag: string (nullable = true)
 |-- user: string (nullable = true)
 |-- concatenated_words: string (nullable = false)



In [42]:
mysql_url = "jdbc:mysql://localhost:3306/ProjectTweets"
mysql_properties = {
    "user": "root",
    "password": "password",
}


In [43]:
# df.write.jdbc(url=mysql_url, table="Tweets", mode="overwrite", properties=mysql_properties)

In [44]:
# # ALTER TABLE sorgusunu çalıştır
# alter_table_sql = "ALTER TABLE Tweets ADD COLUMN YCSB_KEY VARCHAR(255);"
# cursor.execute(alter_table_sql)

In [45]:
# Değişiklikleri kaydet
connection.commit()

In [46]:
df_from_mysql = spark.read.jdbc(url=mysql_url, table="Tweets", properties=mysql_properties)

In [47]:
df_from_mysql.show()

[Stage 29:>                                                         (0 + 1) / 1]

+-----------+----------+----------+--------+---------------+--------------------+--------+
|tweet_index|       ids|      date|    flag|           user|  concatenated_words|YCSB_KEY|
+-----------+----------+----------+--------+---------------+--------------------+--------+
|          1|1467810672|2009-04-07|NO_QUERY|  scotthamilton|upset,update,face...|    null|
|          2|1467810917|2009-04-07|NO_QUERY|       mattycus|kenichan,dived,ma...|    null|
|          3|1467811184|2009-04-07|NO_QUERY|        ElleCTF|whole,body,feel,i...|    null|
|          4|1467811193|2009-04-07|NO_QUERY|         Karoli|nationwideclass,b...|    null|
|          5|1467811372|2009-04-07|NO_QUERY|       joy_wolf| kwesidei,whole,crew|    null|
|          6|1467811592|2009-04-07|NO_QUERY|        mybirch|            need,hug|    null|
|          7|1467811594|2009-04-07|NO_QUERY|           coZZ|loltrish,hey,long...|    null|
|          8|1467811795|2009-04-07|NO_QUERY|2Hood4Hollywood| tatiana,k,nope,didn|    null|

                                                                                

In [48]:
# # Create a cursor
# cursor = connection.cursor()

# # Create a table
# create_table_sql = """
# CREATE TABLE YCSB_TEST (
#     tweet_index INT AUTO_INCREMENT PRIMARY KEY,
#     ids BIGINT,
#     date DATE,
#     flag VARCHAR(55),
#     user VARCHAR(255),
#     filtered_words_final TEXT,
#     YCSB_KEY VARCHAR(255)
# );
# """



In [49]:
# #Create a table
# cursor.execute(create_table_sql)

# #Save changes
# connection.commit()

In [50]:
import subprocess

command = "/home/hduser/ycsb-0.17.0/bin/ycsb.sh load jdbc -P /home/hduser/ycsb-0.17.0/jdbc-binding/conf/db.properties -P /home/hduser/ycsb-0.17.0/workloads/workloada -p db.connection_properties=\"user=root&password=password&useSSL=false\" -p jdbc.url=jdbc:mysql://localhost:3306/ProjectTweets -p table=YCSB_TEST"

process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate()

if process.returncode == 0:
    print("YCSB operation completed successfully.")
    print("Output:")
    print(stdout.decode('utf-8'))
else:
    print("YCSB operation failed. Error message:")
    print(stderr.decode('utf-8'))


YCSB operation completed successfully.
Output:
/usr/bin/java  -classpath /home/hduser/ycsb-0.17.0/conf:/home/hduser/ycsb-0.17.0/lib/HdrHistogram-2.1.4.jar:/home/hduser/ycsb-0.17.0/lib/core-0.17.0.jar:/home/hduser/ycsb-0.17.0/lib/htrace-core4-4.1.0-incubating.jar:/home/hduser/ycsb-0.17.0/lib/jackson-core-asl-1.9.4.jar:/home/hduser/ycsb-0.17.0/lib/jackson-mapper-asl-1.9.4.jar:/home/hduser/ycsb-0.17.0/jdbc-binding/conf:/home/hduser/ycsb-0.17.0/jdbc-binding/lib/commons-collections-3.2.1.jar:/home/hduser/ycsb-0.17.0/jdbc-binding/lib/commons-lang-2.4.jar:/home/hduser/ycsb-0.17.0/jdbc-binding/lib/commons-pool-1.5.4.jar:/home/hduser/ycsb-0.17.0/jdbc-binding/lib/geronimo-jms_1.1_spec-1.1.1.jar:/home/hduser/ycsb-0.17.0/jdbc-binding/lib/geronimo-jta_1.1_spec-1.1.1.jar:/home/hduser/ycsb-0.17.0/jdbc-binding/lib/jdbc-binding-0.17.0.jar:/home/hduser/ycsb-0.17.0/jdbc-binding/lib/mysql-connector-j-8.0.33.jar:/home/hduser/ycsb-0.17.0/jdbc-binding/lib/mysql-connector-java-8.0.30.jar:/home/hduser/ycsb-0.1

In [51]:
df_from_mysql.show()

[Stage 30:>                                                         (0 + 1) / 1]

+-----------+----------+----------+--------+---------------+--------------------+--------+
|tweet_index|       ids|      date|    flag|           user|  concatenated_words|YCSB_KEY|
+-----------+----------+----------+--------+---------------+--------------------+--------+
|          1|1467810672|2009-04-07|NO_QUERY|  scotthamilton|upset,update,face...|    null|
|          2|1467810917|2009-04-07|NO_QUERY|       mattycus|kenichan,dived,ma...|    null|
|          3|1467811184|2009-04-07|NO_QUERY|        ElleCTF|whole,body,feel,i...|    null|
|          4|1467811193|2009-04-07|NO_QUERY|         Karoli|nationwideclass,b...|    null|
|          5|1467811372|2009-04-07|NO_QUERY|       joy_wolf| kwesidei,whole,crew|    null|
|          6|1467811592|2009-04-07|NO_QUERY|        mybirch|            need,hug|    null|
|          7|1467811594|2009-04-07|NO_QUERY|           coZZ|loltrish,hey,long...|    null|
|          8|1467811795|2009-04-07|NO_QUERY|2Hood4Hollywood| tatiana,k,nope,didn|    null|

                                                                                

In [52]:
df_from_mysql.printSchema()

root
 |-- tweet_index: integer (nullable = true)
 |-- ids: long (nullable = true)
 |-- date: date (nullable = true)
 |-- flag: string (nullable = true)
 |-- user: string (nullable = true)
 |-- concatenated_words: string (nullable = true)
 |-- YCSB_KEY: string (nullable = true)



### Due to unidentified Issue I decided to use cProfile instead of YCSB

In [53]:
# Örnek bir sorgu
query = "SELECT * FROM Tweets WHERE concatenated_words"

In [54]:
import cProfile

def perform_query():
    cursor = connection.cursor()
    cursor.execute(query)
    results = cursor.fetchall()
    cursor.close()

if __name__ == '__main__':
    cProfile.run("perform_query()", sort="cumulative")

         1131333 function calls in 2.047 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000    2.047    2.047 {built-in method builtins.exec}
        1    0.001    0.001    2.047    2.047 <string>:1(<module>)
        1    0.000    0.000    2.046    2.046 3302925674.py:3(perform_query)
        1    0.000    0.000    2.046    2.046 cursors.py:133(execute)
        1    0.000    0.000    2.046    2.046 cursors.py:319(_query)
        1    0.000    0.000    1.994    1.994 connections.py:552(query)
        1    0.000    0.000    1.991    1.991 connections.py:810(_read_query_result)
        1    0.000    0.000    1.991    1.991 connections.py:1198(read)
        1    0.000    0.000    1.972    1.972 connections.py:1281(_read_result_packet)
        1    0.051    0.051    1.972    1.972 connections.py:1327(_read_rowdata_packet)
    16867    0.045    0.000    1.475    0.000 connections.py:730(_read_packet)
  

    Total calls: 1,131,181
    Total time: 3.526 seconds

Top time-consuming functions:

    {built-in method builtins.exec}: 3.526 seconds
    <string>:1(<module>): 3.526 seconds
    3302925674.py:3(perform_query): 3.524 seconds
    cursors.py:133(execute): 3.524 seconds
    cursors.py:319(_query): 3.524 seconds

# Hive

In [55]:
df.createOrReplaceTempView("temp_table")

In [56]:
# create_table_sql = """
# CREATE TABLE ProjectTweets (
#     tweet_index INT,
#     ids BIGINT,
#     date DATE,
#     flag STRING,
#     user STRING,
#     filtered_words_final STRING
# )
# STORED AS PARQUET
# """
# spark.sql(create_table_sql)

In [57]:
# hive_insert_data_sql = """
# INSERT INTO ProjectTweets SELECT * FROM temp_table
# """

In [58]:
# spark.sql(hive_insert_data_sql)

In [59]:
# Query
result = spark.sql("SELECT * FROM ProjectTweets")

# Show Result
result.show()

2023-11-06 13:03:24,623 WARN conf.HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
2023-11-06 13:03:24,633 WARN conf.HiveConf: HiveConf of name hive.stats.retries.wait does not exist
2023-11-06 13:03:28,698 WARN metastore.ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
2023-11-06 13:03:28,698 WARN metastore.ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore hduser@127.0.1.1
[Stage 31:>                                                         (0 + 1) / 1]

+-----------+----------+----------+--------+---------------+--------------------+
|tweet_index|       ids|      date|    flag|           user|filtered_words_final|
+-----------+----------+----------+--------+---------------+--------------------+
|          1|1467810672|2009-04-07|NO_QUERY|  scotthamilton|upset,update,face...|
|          2|1467810917|2009-04-07|NO_QUERY|       mattycus|kenichan,dived,ma...|
|          3|1467811184|2009-04-07|NO_QUERY|        ElleCTF|whole,body,feel,i...|
|          4|1467811193|2009-04-07|NO_QUERY|         Karoli|nationwideclass,b...|
|          5|1467811372|2009-04-07|NO_QUERY|       joy_wolf| kwesidei,whole,crew|
|          6|1467811592|2009-04-07|NO_QUERY|        mybirch|            need,hug|
|          7|1467811594|2009-04-07|NO_QUERY|           coZZ|loltrish,hey,long...|
|          8|1467811795|2009-04-07|NO_QUERY|2Hood4Hollywood| tatiana,k,nope,didn|
|          9|1467812025|2009-04-07|NO_QUERY|        mimismo|  twittera,que,muera|
|         10|146

                                                                                

In [60]:
import subprocess

command = "/home/hduser/ycsb-0.17.0/bin/ycsb.sh load jdbc -P /home/hduser/ycsb-0.17.0/jdbc-binding/conf/db.properties -P /home/hduser/ycsb-0.17.0/workloads/workloada -p db.connection_properties=\"user=root&password=password\" -p jdbc.url=jdbc:hive2://hive_server:10000/ProjectTweets"

process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate()

if process.returncode == 0:
    print("YCSB operation completed successfully.")
    print("Output:")
    print(stdout.decode('utf-8'))
else:
    print("YCSB operation failed. Error message:")
    print(stderr.decode('utf-8'))

YCSB operation completed successfully.
Output:
/usr/bin/java  -classpath /home/hduser/ycsb-0.17.0/conf:/home/hduser/ycsb-0.17.0/lib/HdrHistogram-2.1.4.jar:/home/hduser/ycsb-0.17.0/lib/core-0.17.0.jar:/home/hduser/ycsb-0.17.0/lib/htrace-core4-4.1.0-incubating.jar:/home/hduser/ycsb-0.17.0/lib/jackson-core-asl-1.9.4.jar:/home/hduser/ycsb-0.17.0/lib/jackson-mapper-asl-1.9.4.jar:/home/hduser/ycsb-0.17.0/jdbc-binding/conf:/home/hduser/ycsb-0.17.0/jdbc-binding/lib/commons-collections-3.2.1.jar:/home/hduser/ycsb-0.17.0/jdbc-binding/lib/commons-lang-2.4.jar:/home/hduser/ycsb-0.17.0/jdbc-binding/lib/commons-pool-1.5.4.jar:/home/hduser/ycsb-0.17.0/jdbc-binding/lib/geronimo-jms_1.1_spec-1.1.1.jar:/home/hduser/ycsb-0.17.0/jdbc-binding/lib/geronimo-jta_1.1_spec-1.1.1.jar:/home/hduser/ycsb-0.17.0/jdbc-binding/lib/jdbc-binding-0.17.0.jar:/home/hduser/ycsb-0.17.0/jdbc-binding/lib/mysql-connector-j-8.0.33.jar:/home/hduser/ycsb-0.17.0/jdbc-binding/lib/mysql-connector-java-8.0.30.jar:/home/hduser/ycsb-0.1

In [61]:
import pstats

In [62]:
# my_hive_script.py
def hive_query():
    query
    pass

if __name__ == "__main__":
    cProfile.run("hive_query()", sort="cumulative")
    
    # İşte çıktıyı görüntülemek için pstats modülünü kullanın:
    p = pstats.Stats()
    p.print_stats()

         4 function calls in 0.000 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000    0.000    0.000 {built-in method builtins.exec}
        1    0.000    0.000    0.000    0.000 <string>:1(<module>)
        1    0.000    0.000    0.000    0.000 2046835300.py:2(hive_query)
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}


         0 function calls in 0.000 seconds



# SENTIMENT ANALYSIS AFTER TOKENIZATION, LEMMATIZATION AND STOPWORDS REMOVAL

In [63]:
df_for_sentiment = df.select('date', 'concatenated_words')

In [64]:
# Vader SentimentIntensityAnalyzer'ı oluşturun
sia = SentimentIntensityAnalyzer()

# UDF için bir işlev tanımlayın
def analyze_sentiment(text):
    sentiment = sia.polarity_scores(text)
    return sentiment['compound']

# UDF'yi kaydedin
sentiment_udf = udf(analyze_sentiment, DoubleType())

In [65]:
# Vader analizini uygulayın ve sonuçları yeni bir sütuna ekleyin
df_for_sentiment = df_for_sentiment.withColumn("sentiment_score", sentiment_udf(df_for_sentiment["concatenated_words"]))

# Sonuçları göstermek için ilk birkaç satırı görüntüleyebilirsiniz
df_for_sentiment.show()

[Stage 32:>                                                         (0 + 1) / 1]

+----------+--------------------+---------------+
|      date|  concatenated_words|sentiment_score|
+----------+--------------------+---------------+
|2009-04-07|upset,update,face...|            0.0|
|2009-04-07|kenichan,dived,ma...|            0.0|
|2009-04-07|whole,body,feel,i...|            0.0|
|2009-04-07|nationwideclass,b...|            0.0|
|2009-04-07| kwesidei,whole,crew|            0.0|
|2009-04-07|            need,hug|            0.0|
|2009-04-07|loltrish,hey,long...|            0.0|
|2009-04-07| tatiana,k,nope,didn|            0.0|
|2009-04-07|  twittera,que,muera|            0.0|
|2009-04-07|spring,break,plai...|            0.0|
|2009-04-07|      re,pierced,ear|            0.0|
|2009-04-07|caregiving,couldn...|            0.0|
|2009-04-07|octolinz16,count,...|            0.0|
|2009-04-07|smarrison,ve,firs...|            0.0|
|2009-04-07|iamjazzyfizzle,wi...|            0.0|
|2009-04-07|hollis,death,scen...|            0.0|
|2009-04-07|            file,tax|            0.0|


                                                                                

In [66]:
# "date" sütununu 'yyyy-MM-dd' formatına dönüştürün
df_for_sentiment = df_for_sentiment.withColumn("date", F.to_date(df_for_sentiment["date"]))

# Tarih ve ortalama sentiment puanları için bir veri çerçevesi oluşturun
daily_sentiment = df_for_sentiment.groupBy("date").agg(avg("sentiment_score").alias("avg_sentiment_score")).orderBy("date")

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 643, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError


In [67]:
# Tüm tarih aralığını içerecek şekilde bir tam tarih dizisi oluşturun
min_date = daily_sentiment.selectExpr("min(date) as min_date").first().min_date
max_date = daily_sentiment.selectExpr("max(date) as max_date").first().max_date

                                                                                

In [68]:
# Tarih dizisini oluşturun (tüm günleri içerecek)
date_range = [min_date + timedelta(days=x) for x in range((max_date - min_date).days + 1)]
date_range_df = spark.createDataFrame([(date,) for date in date_range], ["date"])

In [69]:
# Eksik tarihleri doldurun
daily_sentiment = date_range_df.join(daily_sentiment, on=["date"], how="left").orderBy("date").fillna(0, subset=["avg_sentiment_score"])

In [70]:
daily_sentiment.show()



+----------+--------------------+
|      date| avg_sentiment_score|
+----------+--------------------+
|2009-04-07|-2.22104499274310...|
|2009-04-08|                 0.0|
|2009-04-09|                 0.0|
|2009-04-10|                 0.0|
|2009-04-11|                 0.0|
|2009-04-12|                 0.0|
|2009-04-13|                 0.0|
|2009-04-14|                 0.0|
|2009-04-15|                 0.0|
|2009-04-16|                 0.0|
|2009-04-17|                 0.0|
|2009-04-18|-2.23096950161170...|
|2009-04-19|-1.03626373626373...|
|2009-04-20|-7.62942483872716...|
|2009-04-21|3.864925709140026E-5|
|2009-04-22|                 0.0|
|2009-04-23|                 0.0|
|2009-04-24|                 0.0|
|2009-04-25|                 0.0|
|2009-04-26|                 0.0|
+----------+--------------------+
only showing top 20 rows





In [71]:
# Dash uygulamasını başlatın
#app = dash.Dash(__name__)

In [72]:
# Uygulamanın düzenini oluşturun
app.layout = html.Div([
    dcc.Graph(
        id='sentiment-line-chart',
        figure=px.line(daily_sentiment, x='date', y='avg_sentiment_score', title='Daily Average Sentiment Score')
    )
])

if __name__ == '__main__':
    app.run_server(debug=True)

                                                                                