In [2]:
#import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover

## Load training data from Minio ##

In [10]:
from minio import Minio
client = Minio("minio:9000",access_key="ffADHWAu70PFl1IscfLH",secret_key="lvJ71G3aJLgPvA556Y3RByUUyhdVwtP0qQ9muFVF", secure=False)
obj = client.get_object("miniobucket", "sentiment_train_data.csv")

In [12]:
import pandas as pd
df=pd.read_csv(obj)

In [13]:
df.head()

Unnamed: 0,sentence,sentiment
0,awww that s a bummer you shoulda got david car...,0
1,is upset that he can t update his facebook by ...,0
2,i dived many times for the ball managed to sav...,0
3,my whole body feels itchy and like its on fire,0
4,no it s not behaving at all i m mad why am i h...,0


In [17]:
#save the csv to Jupyter container
sentiment=df.to_csv('sentiment_train-data.csv')

# Initializing Spark #

In [3]:
appName="Sentimen Analysis Spark"
spark=SparkSession.builder.master('local').appName(appName).getOrCreate()

In [4]:
tweet_csv=spark.read.csv("sentiment_train-data.csv", inferSchema=True, header=True)
tweet_csv.show(truncate=False, n=3)

+---+---------------------------------------------------------------------------------------------------------+---------+
|_c0|sentence                                                                                                 |sentiment|
+---+---------------------------------------------------------------------------------------------------------+---------+
|0  |awww that s a bummer you shoulda got david carr of third day to do it d                                  |0        |
|1  |is upset that he can t update his facebook by texting it and might cry as a result school today also blah|0        |
|2  |i dived many times for the ball managed to save the rest go out of bounds                                |0        |
+---+---------------------------------------------------------------------------------------------------------+---------+
only showing top 3 rows



In [6]:
data=tweet_csv.select(
    "sentence",col("sentiment").cast("Int").alias("label"))
data.show(truncate=False, n=5)

+---------------------------------------------------------------------------------------------------------+-----+
|sentence                                                                                                 |label|
+---------------------------------------------------------------------------------------------------------+-----+
|awww that s a bummer you shoulda got david carr of third day to do it d                                  |0    |
|is upset that he can t update his facebook by texting it and might cry as a result school today also blah|0    |
|i dived many times for the ball managed to save the rest go out of bounds                                |0    |
|my whole body feels itchy and like its on fire                                                           |0    |
|no it s not behaving at all i m mad why am i here because i can t see you all over there                 |0    |
+---------------------------------------------------------------------------------------

In [7]:
#Split training and Testing
split_data=data.randomSplit([0.7,0.3])
train=split_data[0]

#label in test renamed to true label
test=split_data[1].withColumnRenamed("label","true_label")
train_rows=train.count()
test_rows=test.count()

print("Total train :",train_rows)
print("Total test :", test_rows)

Total train : 1067018
Total test : 456957


In [8]:
#Prepare data
tokenizer = Tokenizer(inputCol="sentence", outputCol="SentimentWords")
tokenizedTrain = tokenizer.transform(train)
tokenizedTrain.show(truncate=False,n=5)

+--------+-----+--------------+
|sentence|label|SentimentWords|
+--------+-----+--------------+
|        |0    |[]            |
|        |1    |[]            |
|        |0    |[]            |
|        |1    |[]            |
|        |0    |[]            |
+--------+-----+--------------+
only showing top 5 rows



In [9]:
swr = StopWordsRemover(inputCol=tokenizer.getOutputCol(), 
                       outputCol="MeaningfulWords")
SwRemovedTrain = swr.transform(tokenizedTrain)
SwRemovedTrain.show(truncate=False, n=5)

+--------+-----+--------------+---------------+
|sentence|label|SentimentWords|MeaningfulWords|
+--------+-----+--------------+---------------+
|        |0    |[]            |[]             |
|        |1    |[]            |[]             |
|        |0    |[]            |[]             |
|        |1    |[]            |[]             |
|        |0    |[]            |[]             |
+--------+-----+--------------+---------------+
only showing top 5 rows



In [10]:
hashTF = HashingTF(inputCol=swr.getOutputCol(), outputCol="features")
numericTrain = hashTF.transform(SwRemovedTrain).select(
    'label', 'MeaningfulWords', 'features')
numericTrain.show(n=3)

+-----+---------------+--------------+
|label|MeaningfulWords|      features|
+-----+---------------+--------------+
|    0|             []|(262144,[],[])|
|    1|             []|(262144,[],[])|
|    0|             []|(262144,[],[])|
+-----+---------------+--------------+
only showing top 3 rows



In [11]:
#Training Model
lr = LogisticRegression(labelCol="label", featuresCol="features", 
                        maxIter=10, regParam=0.01)
model = lr.fit(numericTrain)
print ("Training Done")

Training Done


In [12]:
# Specify the path where you want to save the model
model_lr = "work/notebooks/lr"

# Save the model
model.save(model_lr)

### Loading Test data from Minio ###

In [13]:
from minio import Minio 
client = Minio("minio:9000",access_key="ffADHWAu70PFl1IscfLH",secret_key="lvJ71G3aJLgPvA556Y3RByUUyhdVwtP0qQ9muFVF", secure=False)
obj = client.get_object("miniobucket", "tweets_train.csv")

In [14]:
import pandas as pd
df = pd.read_csv(obj)
df.head(10)

Unnamed: 0,use ledger trezor monero desktop wallet. wouldn't xmr exchange
0,"""institutional investors"". turns out, bitcoin ..."
1,"i'm old remember buy entire bitcoin $19,000"
2,rt : parts world getting ready celebrate new y...
3,"i'm old remember buy entire bitcoin $19,000"
4,rt : parts world getting ready celebrate new y...
5,"i'm old remember buy entire bitcoin $19,000"
6,rt : parts world getting ready celebrate new y...
7,"i'm old remember buy entire bitcoin $19,000"
8,rt : parts world getting ready celebrate new y...
9,"i'm old remember buy entire bitcoin $19,000"


In [15]:
#Giving this dataset a header
columns = ['sentence']
df.columns=columns
df.head()

Unnamed: 0,sentence
0,"""institutional investors"". turns out, bitcoin ..."
1,"i'm old remember buy entire bitcoin $19,000"
2,rt : parts world getting ready celebrate new y...
3,"i'm old remember buy entire bitcoin $19,000"
4,rt : parts world getting ready celebrate new y...


In [18]:
tweets_test=df.to_csv('tweets_test.csv')

In [16]:
tweets_test=spark.read.csv("tweets_test.csv", inferSchema=True, header=True)
tweets_test.show(truncate=False, n=3)

+---+---------------------------------------------------------------------------------------+
|_c0|sentence                                                                               |
+---+---------------------------------------------------------------------------------------+
|0  |"""institutional investors"". turns out                                                |
|1  |i'm old remember buy entire bitcoin $19,000                                            |
|2  |rt : parts world getting ready celebrate new year, kept buying new bitcoin. looks like…|
+---+---------------------------------------------------------------------------------------+
only showing top 3 rows



In [17]:
#Prepare Testing data
tokenizedTest = tokenizer.transform(tweets_test)
SwRemovedTest = swr.transform(tokenizedTest)
numericTest = hashTF.transform(SwRemovedTest)
numericTest.show(truncate=False, n=2)

+---+-------------------------------------------+---------------------------------------------------+----------------------------------------------+----------------------------------------------------------------------------+
|_c0|sentence                                   |SentimentWords                                     |MeaningfulWords                               |features                                                                    |
+---+-------------------------------------------+---------------------------------------------------+----------------------------------------------+----------------------------------------------------------------------------+
|0  |"""institutional investors"". turns out    |["""institutional, investors""., turns, out]       |["""institutional, investors""., turns]       |(262144,[16108,166604,239666],[1.0,1.0,1.0])                                |
|1  |i'm old remember buy entire bitcoin $19,000|[i'm, old, remember, buy, entire, bitcoin, $19,

In [18]:
#Prediction
raw_prediction = model.transform(numericTest)
raw_prediction.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- sentence: string (nullable = true)
 |-- SentimentWords: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- MeaningfulWords: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [23]:
Final_prediction = raw_prediction.select("sentence", "prediction")
Final_prediction.show(n=4, truncate = False)

+---------------------------------------------------------------------------------------+----------+
|sentence                                                                               |prediction|
+---------------------------------------------------------------------------------------+----------+
|"""institutional investors"". turns out                                                |0.0       |
|i'm old remember buy entire bitcoin $19,000                                            |0.0       |
|rt : parts world getting ready celebrate new year, kept buying new bitcoin. looks like…|1.0       |
|i'm old remember buy entire bitcoin $19,000                                            |0.0       |
+---------------------------------------------------------------------------------------+----------+
only showing top 4 rows



In [25]:
#merge with original dataset
final=spark.read.csv("processed_tweets_sa.csv", inferSchema=True, header=True)
final.show(truncate=False, n=3)

+------+---------------------------------------------------------------------------------------+--------------------+------------+--------+----+-----+------------+----------+----------+-----+-----+------+------+
|5168.0|use ledger trezor monero desktop wallet. wouldn't xmr exchange                         |01/01/2021          |aantonop    |2       |52  |30930|(monero,xmr)|2021-01-01|XMR       |1.010|1.011|2021.0|NEG   |
+------+---------------------------------------------------------------------------------------+--------------------+------------+--------+----+-----+------------+----------+----------+-----+-----+------+------+
|5167.0|"""institutional investors"". turns out                                                | bitcoin criminals."|02/01/2021  |aantonop|3   |26   |30930       |(bitcoin) |2021-01-02|BTC  |2.0  |1.0   |2021.0|
|1722.0|i'm old remember buy entire bitcoin $19,000                                            |1/2/2021 15:06      |BarrySilbert|455     |7550|10818|(b

In [26]:
final.printSchema()

root
 |-- 5168.0: string (nullable = true)
 |-- use ledger trezor monero desktop wallet. wouldn't xmr exchange: string (nullable = true)
 |-- 01/01/2021: string (nullable = true)
 |-- aantonop: string (nullable = true)
 |-- 2: string (nullable = true)
 |-- 52: string (nullable = true)
 |-- 30930: string (nullable = true)
 |-- (monero,xmr): string (nullable = true)
 |-- 2021-01-01: string (nullable = true)
 |-- XMR: string (nullable = true)
 |-- 1.010: string (nullable = true)
 |-- 1.011: string (nullable = true)
 |-- 2021.0: string (nullable = true)
 |-- NEG: string (nullable = true)



In [46]:
# Provide column names
new_column_names = ['id', 'tweetText', 'time', 'TwitterName', 'Retweets', 'Favorites', 'likes', 'new_coin','dateNew','Symbol','Day','Month','Year','label']

# Rename columns
for i, column_name in enumerate(final.columns):
    final = final.withColumnRenamed(column_name, new_column_names[i])

# Display the DataFrame with renamed columns
final.show()

+-------+--------------------+--------------------+------------+--------+---------+-----+----------------+----------+----------+---+-----+------+------+
|     id|           tweetText|                time| TwitterName|Retweets|Favorites|likes|        new_coin|   dateNew|    Symbol|Day|Month|  Year| label|
+-------+--------------------+--------------------+------------+--------+---------+-----+----------------+----------+----------+---+-----+------+------+
| 5167.0|"""institutional ...| bitcoin criminals."|  02/01/2021|aantonop|        3|   26|           30930| (bitcoin)|2021-01-02|BTC|  2.0|   1.0|2021.0|
| 1722.0|i'm old remember ...|      1/2/2021 15:06|BarrySilbert|     455|     7550|10818|       (bitcoin)|2021-01-02|       BTC|2.0|  1.0|2021.0|   POS|
| 1723.0|rt : parts world ...|       1/2/2021 2:23|BarrySilbert|      91|        0|10818|       (bitcoin)|2021-01-02|       BTC|2.0|  1.0|2021.0|   NEU|
| 3592.0|i'm old remember ...|      1/2/2021 15:06|BarrySilbert|     455|     7550

In [49]:
final = final.withColumnRenamed("tweetText", "sentence")

In [50]:
final.show()

+-------+--------------------+--------------------+------------+--------+---------+-----+----------------+----------+----------+---+-----+------+------+
|     id|            sentence|                time| TwitterName|Retweets|Favorites|likes|        new_coin|   dateNew|    Symbol|Day|Month|  Year| label|
+-------+--------------------+--------------------+------------+--------+---------+-----+----------------+----------+----------+---+-----+------+------+
| 5167.0|"""institutional ...| bitcoin criminals."|  02/01/2021|aantonop|        3|   26|           30930| (bitcoin)|2021-01-02|BTC|  2.0|   1.0|2021.0|
| 1722.0|i'm old remember ...|      1/2/2021 15:06|BarrySilbert|     455|     7550|10818|       (bitcoin)|2021-01-02|       BTC|2.0|  1.0|2021.0|   POS|
| 1723.0|rt : parts world ...|       1/2/2021 2:23|BarrySilbert|      91|        0|10818|       (bitcoin)|2021-01-02|       BTC|2.0|  1.0|2021.0|   NEU|
| 3592.0|i'm old remember ...|      1/2/2021 15:06|BarrySilbert|     455|     7550

In [51]:
# Join the DataFrames based on the common column
joined_df = final.join(Final_prediction, on="sentence", how="inner")

# Show the joined DataFrame
joined_df.show()

+--------------------+------+--------------------+------------+--------+---------+-----+---------+----------+----------+---+-----+------+------+----------+
|            sentence|    id|                time| TwitterName|Retweets|Favorites|likes| new_coin|   dateNew|    Symbol|Day|Month|  Year| label|prediction|
+--------------------+------+--------------------+------------+--------+---------+-----+---------+----------+----------+---+-----+------+------+----------+
|"""institutional ...|5167.0| bitcoin criminals."|  02/01/2021|aantonop|        3|   26|    30930| (bitcoin)|2021-01-02|BTC|  2.0|   1.0|2021.0|       0.0|
|i'm old remember ...|1722.0|      1/2/2021 15:06|BarrySilbert|     455|     7550|10818|(bitcoin)|2021-01-02|       BTC|2.0|  1.0|2021.0|   POS|       0.0|
|i'm old remember ...|1722.0|      1/2/2021 15:06|BarrySilbert|     455|     7550|10818|(bitcoin)|2021-01-02|       BTC|2.0|  1.0|2021.0|   POS|       0.0|
|i'm old remember ...|1722.0|      1/2/2021 15:06|BarrySilbert| 

In [52]:
#Let's merge our original dataframe with the prediction results
# Select columns from df1 and df2 to create a new DataFrame
spark_sa_results = joined_df.select("id", "sentence","time", "TwitterName", "Retweets", "Favorites", "Likes", "dateNew", "Symbol", "Day", "Month", "Year", "prediction")

# Show the new DataFrame
spark_sa_results.show()

+------+--------------------+--------------------+------------+--------+---------+-----+----------+----------+---+-----+------+----------+
|    id|            sentence|                time| TwitterName|Retweets|Favorites|Likes|   dateNew|    Symbol|Day|Month|  Year|prediction|
+------+--------------------+--------------------+------------+--------+---------+-----+----------+----------+---+-----+------+----------+
|5167.0|"""institutional ...| bitcoin criminals."|  02/01/2021|aantonop|        3|   26| (bitcoin)|2021-01-02|BTC|  2.0|   1.0|       0.0|
|1722.0|i'm old remember ...|      1/2/2021 15:06|BarrySilbert|     455|     7550|10818|2021-01-02|       BTC|2.0|  1.0|2021.0|       0.0|
|1722.0|i'm old remember ...|      1/2/2021 15:06|BarrySilbert|     455|     7550|10818|2021-01-02|       BTC|2.0|  1.0|2021.0|       0.0|
|1722.0|i'm old remember ...|      1/2/2021 15:06|BarrySilbert|     455|     7550|10818|2021-01-02|       BTC|2.0|  1.0|2021.0|       0.0|
|1722.0|i'm old remember ..

In [54]:
spark_sa_results.write.csv('spark_sa_results.csv', header = True, mode = 'overwrite')

In [78]:
SparkSession.stop(spark)

# Load sentiment analysis to Postgres #

In [1]:
from pyspark.sql.functions import *
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType,array,ArrayType,DateType,TimestampType, FloatType
from pyspark.sql import functions as f
from pyspark.sql.functions import udf
import hashlib
import datetime
import urllib.request
import json
from datetime import timedelta, date
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark import SQLContext
from itertools import islice
from pyspark.sql.functions import col
import sys

In [2]:
# Spark session & context
spark = (SparkSession
         .builder
         .master("local")
         .appName("load-postgres")
         # Add postgres jar
         .config("spark.driver.extraClassPath", "/home/jovyan/work/jars/postgresql-42.4.0.jar")
         .getOrCreate())
sc = spark.sparkContext

In [3]:
spark_sa=spark.read.csv("spark_sa_results/spark_sentiment_analysis.csv", inferSchema=True, header=True)
spark_sa.show(truncate=True, n=20)

+------+--------------------+-------------------+------------+--------+---------+-----+----------+----------+---+-----+------+----------+
|    id|            sentence|               time| TwitterName|Retweets|Favorites|Likes|   dateNew|    Symbol|Day|Month|  Year|prediction|
+------+--------------------+-------------------+------------+--------+---------+-----+----------+----------+---+-----+------+----------+
|5167.0|"""institutional ...|bitcoin criminals."|  02/01/2021|aantonop|        3|   26| (bitcoin)|2021-01-02|BTC|  2.0|   1.0|       0.0|
|1722.0|i'm old remember ...|     1/2/2021 15:06|BarrySilbert|     455|     7550|10818|2021-01-02|       BTC|2.0|  1.0|2021.0|       0.0|
|1722.0|i'm old remember ...|     1/2/2021 15:06|BarrySilbert|     455|     7550|10818|2021-01-02|       BTC|2.0|  1.0|2021.0|       0.0|
|1722.0|i'm old remember ...|     1/2/2021 15:06|BarrySilbert|     455|     7550|10818|2021-01-02|       BTC|2.0|  1.0|2021.0|       0.0|
|1722.0|i'm old remember ...|     

In [4]:
spark_sa.write.format("jdbc").option("url", "jdbc:postgresql://host.docker.internal:5432/airflow").option("dbtable", "public.spark_sa").option("user", "airflow").option("password", "airflow").mode("append").save()