In [88]:
sc

In [98]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("myApp") \
    .getOrCreate()

In [99]:
# Load the ProjectTweets.csv into hadoop in the named folder 'user1'
df = spark.read.csv('/user1/ProjectTweets.csv', header=False, inferSchema=True)

                                                                                

In [100]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: long (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)



In [101]:
df.show(5)

+---+----------+--------------------+--------+---------------+--------------------+
|_c0|       _c1|                 _c2|     _c3|            _c4|                 _c5|
+---+----------+--------------------+--------+---------------+--------------------+
|  0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|  1|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|  2|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|  3|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|  4|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
+---+----------+--------------------+--------+---------------+--------------------+
only showing top 5 rows



In [102]:
# Drop the _c1 column unused
df = df.drop('_c1')

In [103]:
# Named the columns of df
df = df.withColumnRenamed('_c0', 'id') \
       .withColumnRenamed('_c2', 'date') \
       .withColumnRenamed('_c3', 'flag') \
       .withColumnRenamed('_c4', 'user') \
       .withColumnRenamed('_c5', 'text')

In [104]:
# Display the structure of schema
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)



In [105]:
# Display the first 5 rows of the data
df.show(5)

+---+--------------------+--------+---------------+--------------------+
| id|                date|    flag|           user|                text|
+---+--------------------+--------+---------------+--------------------+
|  0|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|  1|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|  2|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|  3|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|  4|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
+---+--------------------+--------+---------------+--------------------+
only showing top 5 rows



In [9]:
df.write.format("mongodb") \
    .option("uri", "mongodb://127.0.0.1:27001/") \
    .option("database", "tweet_mongo") \
    .option("collection", "tweet_collection") \
    .mode("append").save()

                                                                                

In [36]:
# MySQL JDBC connection
jdbc_url = "jdbc:mysql://127.0.0.1:3306/tweet_mysql"

# MySQL user and pass
properties = {
    "user": "root",
    "password": "kalem",
    "driver": "com.mysql.cj.jdbc.Driver"
}

In [11]:
df.write.jdbc(url=jdbc_url, table="tweets", mode="overwrite", properties=properties)

                                                                                

In [12]:
# print of data shape
print('Shape of dataset:',(df.count(),len(df.columns)))

[Stage 6:>                                                          (0 + 2) / 2]

Shape of dataset: (1600000, 5)


                                                                                

In [13]:
# Display the summary statistics of the data
df.describe().show()



+-------+------------------+--------------------+--------+-------------------+--------------------+
|summary|                id|                date|    flag|               user|                text|
+-------+------------------+--------------------+--------+-------------------+--------------------+
|  count|           1600000|             1600000| 1600000|            1600000|             1600000|
|   mean|          799999.5|                null|    null|4.325887521835714E9|                null|
| stddev|461880.35968924535|                null|    null|5.16273321845489E10|                null|
|    min|                 0|Fri Apr 17 20:30:...|NO_QUERY|       000catnap000|                 ...|
|    max|           1599999|Wed May 27 07:27:...|NO_QUERY|         zzzzeus111|ï¿½ï¿½ï¿½ï¿½ï¿½ß§...|
+-------+------------------+--------------------+--------+-------------------+--------------------+



                                                                                

In [39]:
# YCSB test for MYSQL
!/home/hduser/ycsb-0.17.0/bin/ycsb.sh run jdbc -P /home/hduser/ycsb-0.17.0/workloads/workloada -p db.url=jdbc:mysql://localhost:3306/tweet_mysql -p db.user=root -p db.passwd=kalem -p db.driver=com.mysql.cj.jdbc.Driver

/usr/bin/java  -classpath /home/hduser/ycsb-0.17.0/conf:/home/hduser/ycsb-0.17.0/lib/HdrHistogram-2.1.4.jar:/home/hduser/ycsb-0.17.0/lib/core-0.17.0.jar:/home/hduser/ycsb-0.17.0/lib/htrace-core4-4.1.0-incubating.jar:/home/hduser/ycsb-0.17.0/lib/jackson-core-asl-1.9.4.jar:/home/hduser/ycsb-0.17.0/lib/jackson-mapper-asl-1.9.4.jar:/home/hduser/ycsb-0.17.0/jdbc-binding/conf:/home/hduser/ycsb-0.17.0/jdbc-binding/lib/commons-collections-3.2.1.jar:/home/hduser/ycsb-0.17.0/jdbc-binding/lib/commons-lang-2.4.jar:/home/hduser/ycsb-0.17.0/jdbc-binding/lib/commons-pool-1.5.4.jar:/home/hduser/ycsb-0.17.0/jdbc-binding/lib/geronimo-jms_1.1_spec-1.1.1.jar:/home/hduser/ycsb-0.17.0/jdbc-binding/lib/geronimo-jta_1.1_spec-1.1.1.jar:/home/hduser/ycsb-0.17.0/jdbc-binding/lib/jdbc-binding-0.17.0.jar:/home/hduser/ycsb-0.17.0/jdbc-binding/lib/mysql-connector-j-8.0.33.jar:/home/hduser/ycsb-0.17.0/jdbc-binding/lib/openjpa-jdbc-2.1.1.jar:/home/hduser/ycsb-0.17.0/jdbc-binding/lib/openjpa-kernel-2.1.1.jar:/home/hdus

In [42]:
# YCSB test for MONGO DB
!/home/hduser/ycsb-0.17.0/bin/ycsb.sh run mongodb -P /home/hduser/ycsb-0.17.0/workloads/workloada -p mongodb.url=mongodb://localhost:27017 -p mongodb.database=tweet_mongo 


/usr/bin/java  -classpath /home/hduser/ycsb-0.17.0/conf:/home/hduser/ycsb-0.17.0/lib/HdrHistogram-2.1.4.jar:/home/hduser/ycsb-0.17.0/lib/core-0.17.0.jar:/home/hduser/ycsb-0.17.0/lib/htrace-core4-4.1.0-incubating.jar:/home/hduser/ycsb-0.17.0/lib/jackson-core-asl-1.9.4.jar:/home/hduser/ycsb-0.17.0/lib/jackson-mapper-asl-1.9.4.jar:/home/hduser/ycsb-0.17.0/mongodb-binding/lib/logback-classic-1.1.2.jar:/home/hduser/ycsb-0.17.0/mongodb-binding/lib/logback-core-1.1.2.jar:/home/hduser/ycsb-0.17.0/mongodb-binding/lib/mongo-java-driver-3.8.0.jar:/home/hduser/ycsb-0.17.0/mongodb-binding/lib/mongodb-async-driver-2.0.1.jar:/home/hduser/ycsb-0.17.0/mongodb-binding/lib/mongodb-binding-0.17.0.jar:/home/hduser/ycsb-0.17.0/mongodb-binding/lib/slf4j-api-1.7.25.jar:/home/hduser/ycsb-0.17.0/mongodb-binding/lib/snappy-java-1.1.7.1.jar site.ycsb.Client -t -db site.ycsb.db.MongoDbClient -P /home/hduser/ycsb-0.17.0/workloads/workloada -p mongodb.url=mongodb://localhost:27017 -p mongodb.database=tweet_mongo
Com

Nothing updated for key user6184078644860972864
Nothing updated for key user2996251037375572516
Nothing updated for key user7460881851310751334
Nothing updated for key user4640687271668624146
Nothing updated for key user4152828024211893584
Nothing updated for key user1573987489603120213
Nothing updated for key user4626287080316522538
Nothing updated for key user176616418245585776
Nothing updated for key user2538312093647326066
Nothing updated for key user7345341813716561613
Nothing updated for key user6290028003198785340
Nothing updated for key user8502137115770724564
Nothing updated for key user5339761945212382530
Nothing updated for key user6400663543555865497
Nothing updated for key user4987430714527649991
Nothing updated for key user4278642031651057680
Nothing updated for key user3107446538244793121
Nothing updated for key user7123510772490260851
Nothing updated for key user6762367138279133398
Nothing updated for key user195921106834796948
Nothing updated for key user51182725492803

Nothing updated for key user2413276361938144301
Nothing updated for key user4153387984724034032
Nothing updated for key user6877907175873323119
Nothing updated for key user5817347222824138717
Nothing updated for key user1573987489603120213
Nothing updated for key user7934122592197538405
Nothing updated for key user6772299462829809208
Nothing updated for key user3121846729596894729
Nothing updated for key user1574547450115260661
Nothing updated for key user759249448388715426
[OVERALL], RunTime(ms), 718
[OVERALL], Throughput(ops/sec), 1392.757660167131
[TOTAL_GCS_PS_Scavenge], Count, 2
[TOTAL_GC_TIME_PS_Scavenge], Time(ms), 8
[TOTAL_GC_TIME_%_PS_Scavenge], Time(%), 1.1142061281337048
[TOTAL_GCS_PS_MarkSweep], Count, 0
[TOTAL_GC_TIME_PS_MarkSweep], Time(ms), 0
[TOTAL_GC_TIME_%_PS_MarkSweep], Time(%), 0.0
[TOTAL_GCs], Count, 2
[TOTAL_GC_TIME], Time(ms), 8
[TOTAL_GC_TIME_%], Time(%), 1.1142061281337048
[UPDATE-FAILED], Operations, 484
[UPDATE-FAILED], AverageLatency(us

In [106]:
import re

def clean_text_advanced(text):
    cleaned_text = re.sub(r'(http[s]?://\S+)|(www\.\S+)', '', text)
    cleaned_text = re.sub(r'@[A-Za-z0-9_]+', '', cleaned_text)
    cleaned_text = re.sub(r'[^a-zA-Z\s]', '', cleaned_text)
    cleaned_text = re.sub(r'\s+', ' ', cleaned_text)
    cleaned_text = cleaned_text.lower()
    
    return cleaned_text.strip() 

In [107]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

clean_text_udf = udf(lambda text: clean_text_advanced(text) if text else '', StringType())

df = df.withColumn('cleaned_text', clean_text_udf(df['text']))

df.show(5)


[Stage 41:>                                                         (0 + 1) / 1]

+---+--------------------+--------+---------------+--------------------+--------------------+
| id|                date|    flag|           user|                text|        cleaned_text|
+---+--------------------+--------+---------------+--------------------+--------------------+
|  0|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|awww thats a bumm...|
|  1|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|is upset that he ...|
|  2|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|i dived many time...|
|  3|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|my whole body fee...|
|  4|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|no its not behavi...|
+---+--------------------+--------+---------------+--------------------+--------------------+
only showing top 5 rows



                                                                                

In [108]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)
 |-- cleaned_text: string (nullable = true)



In [109]:
df.first()

Row(id=0, date='Mon Apr 06 22:19:45 PDT 2009', flag='NO_QUERY', user='_TheSpecialOne_', text="@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D", cleaned_text='awww thats a bummer you shoulda got david carr of third day to do it d')

In [110]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)
 |-- cleaned_text: string (nullable = true)



In [111]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
from textblob import TextBlob

# Duygu analizi fonksiyonu
def analyze_sentiment(text):
    blob = TextBlob(text)
    return blob.sentiment.polarity

# UDF (User Defined Function) tanımlama
sentiment_udf = udf(analyze_sentiment, FloatType())

# Duygu analizini uygula ve sonucu yeni bir sütuna ekle
df = df.withColumn('sentiment', sentiment_udf(df['cleaned_text']))

# İlk beş gözlemi göster
df.select('cleaned_text', 'sentiment').show(20)


[Stage 43:>                                                         (0 + 1) / 1]

+--------------------+-----------+
|        cleaned_text|  sentiment|
+--------------------+-----------+
|awww thats a bumm...|        0.2|
|is upset that he ...|        0.0|
|i dived many time...|        0.5|
|my whole body fee...|        0.2|
|no its not behavi...|     -0.625|
|  not the whole crew|        0.2|
|          need a hug|        0.0|
|hey long time no ...| 0.27333334|
|nope they didnt h...|        0.0|
|        que me muera|        0.0|
|spring break in p...|-0.21428572|
|i just repierced ...|        0.0|
|i couldnt bear to...|        0.0|
|it it counts idk ...|        0.0|
|i wouldve been th...|      0.075|
|i wish i got to w...|        0.0|
|hollis death scen...|        0.0|
| about to file taxes|        0.0|
|ahh ive always wa...|        0.5|
|oh dear were you ...|        0.0|
+--------------------+-----------+
only showing top 20 rows



                                                                                

In [112]:
from pyspark.sql.functions import when
from pyspark.sql.types import IntegerType

# Etiketleme işlemini gerçekleştiren fonksiyon
def label_sentiment(score):
    if score > 0:
        return 1
    elif score < 0:
        return -1
    else:
        return 0

# UDF (User Defined Function) tanımlama
label_udf = udf(label_sentiment, IntegerType())

# Sentiment skorlarına göre etiketleme yap
df = df.withColumn('sentiment_label', label_udf(df['sentiment']))

# İlk beş gözlemi göster
df.show(5)




+---+--------------------+--------+---------------+--------------------+--------------------+---------+---------------+
| id|                date|    flag|           user|                text|        cleaned_text|sentiment|sentiment_label|
+---+--------------------+--------+---------------+--------------------+--------------------+---------+---------------+
|  0|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|awww thats a bumm...|      0.2|              1|
|  1|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|is upset that he ...|      0.0|              0|
|  2|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|i dived many time...|      0.5|              1|
|  3|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|my whole body fee...|      0.2|              1|
|  4|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|no its not behavi...|   -0.625|             -1|
+---+--------------------+--------+-----

[Stage 44:>                                                         (0 + 1) / 1]                                                                                

In [113]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)
 |-- cleaned_text: string (nullable = true)
 |-- sentiment: float (nullable = true)
 |-- sentiment_label: integer (nullable = true)



In [114]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")


In [115]:
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import to_date, col

df = df.withColumn("date", to_date(col("date"), "EEE MMM dd HH:mm:ss z yyyy"))

In [116]:
df.first()



Row(id=0, date=datetime.date(2009, 4, 7), flag='NO_QUERY', user='_TheSpecialOne_', text="@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D", cleaned_text='awww thats a bummer you shoulda got david carr of third day to do it d', sentiment=0.20000000298023224, sentiment_label=1)

In [117]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- flag: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)
 |-- cleaned_text: string (nullable = true)
 |-- sentiment: float (nullable = true)
 |-- sentiment_label: integer (nullable = true)



In [118]:
df.write.jdbc(url=jdbc_url, table="tweets", mode="overwrite", properties=properties)

                                                                                

In [43]:
df = spark.read.jdbc(url=jdbc_url, table="tweets", properties=properties)

In [119]:
df = df.withColumn("date", col("date").cast("timestamp"))

df = df.orderBy("date")
min_date = df.agg({"date": "min"}).collect()[0][0]
max_date = df.agg({"date": "max"}).collect()[0][0]
print("Min Date:", min_date)
print("Max Date:", max_date)

[Stage 50:>                                                         (0 + 2) / 2]

En küçük tarih: 2009-04-07 00:00:00
En büyük tarih: 2009-06-25 00:00:00




In [46]:
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Temizlenmiş tweet verilerini alın
tweets = df.select('cleaned_text').rdd.flatMap(lambda x: x).collect()

# Tokenizer'ı oluşturun
tokenizer = Tokenizer()
tokenizer.fit_on_texts(tweets)

# Kelimeleri sayısal formata dönüştürün
sequences = tokenizer.texts_to_sequences(tweets)

# Tüm dizileri aynı uzunluğa getirin (dizi dolgusu)
padded_sequences = pad_sequences(sequences)


                                                                                

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, LSTM, Dense
from sklearn.model_selection import train_test_split
import numpy as np

# labels dizisi NumPy dizisine çevrildiği varsayılarak devam ediyor
labels = np.array(df.select('sentiment_label').rdd.flatMap(lambda x: x).collect())

# Veriyi eğitim ve test setlerine bölün
X_train, X_test, y_train, y_test = train_test_split(padded_sequences, labels, test_size=0.2, random_state=42)

# Modeli oluştur
model = Sequential()
model.add(Embedding(input_dim=len(tokenizer.word_index) + 1, output_dim=50, input_length=padded_sequences.shape[1]))
model.add(LSTM(100))
model.add(Dense(1, activation='sigmoid'))

# Modeli derle
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Modeli eğit
model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=5, batch_size=64)


                                                                                

In [47]:
from sklearn.model_selection import train_test_split
import numpy as np

# labels = df.select('sentiment').rdd.flatMap(lambda x: x).collect()
labels = np.array(df.select('sentiment_label').rdd.flatMap(lambda x: x).collect())

# Ardından eğitim ve test setlerini oluşturabilirsiniz
X_train, X_test, y_train, y_test = train_test_split(padded_sequences, labels, test_size=0.2, random_state=42)


                                                                                

In [49]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, LSTM, Dense

# Modeli oluştur
model = Sequential()
model.add(Embedding(input_dim=len(tokenizer.word_index) + 1, output_dim=50, input_length=padded_sequences.shape[1]))
model.add(LSTM(100))
model.add(Dense(1, activation='sigmoid'))

# Modeli derle
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Modeli eğit
model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=5, batch_size=64)


2023-11-12 20:37:05.438142: W tensorflow/tsl/framework/cpu_allocator_impl.cc:83] Allocation of 83776000 exceeds 10% of free system memory.
2023-11-12 20:37:05.716549: W tensorflow/tsl/framework/cpu_allocator_impl.cc:83] Allocation of 83776000 exceeds 10% of free system memory.
2023-11-12 20:37:06.350559: W tensorflow/tsl/framework/cpu_allocator_impl.cc:83] Allocation of 83776000 exceeds 10% of free system memory.


Epoch 1/5


2023-11-12 20:37:07.238256: W tensorflow/tsl/framework/cpu_allocator_impl.cc:83] Allocation of 204800000 exceeds 10% of free system memory.
2023-11-12 20:37:07.624459: W tensorflow/tsl/framework/cpu_allocator_impl.cc:83] Allocation of 10240000 exceeds 10% of free system memory.


 4064/20000 [=====>........................] - ETA: 35:05 - loss: -34.0944 - accuracy: 0.6754

KeyboardInterrupt: 

In [None]:
import plotly.express as px

# Weekly predictions
fig_week = px.line(predictions_week.toPandas(), x='date', y='prediction', title='Weekly Predictions')

# Monthly predictions
fig_month = px.line(predictions_month.toPandas(), x='date', y='prediction', title='Monthly Predictions')

# 3 Months predictions
fig_3months = px.line(predictions_3months.toPandas(), x='date', y='prediction', title='3-Month Predictions')

# Dashboard
import dash
from dash import dcc, html

app = dash.Dash(__name__)

app.layout = html.Div(children=[
    html.H1(children='Sentiment Analysis Dashboard'),
    
    html.Div(children='''
        Weekly Predictions
    '''),

    dcc.Graph(
        id='fig_week',
        figure=fig_week
    ),

    html.Div(children='''
        Monthly Predictions
    '''),

    dcc.Graph(
        id='fig_month',
        figure=fig_month
    ),

    html.Div(children='''
        3-Month Predictions
    '''),

    dcc.Graph(
        id='fig_3months',
        figure=fig_3months
    )
])

if __name__ == '__main__':
    app.run_server(debug=True)
