In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
!pip install tweet-preprocessor

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
!pip install vaderSentiment

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import pyspark as spark
import pandas as pd
import warnings
warnings.filterwarnings('ignore')
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.sql.functions import col,udf,monotonically_increasing_id,unix_timestamp,round,avg
import re
# Reuse the already-running SparkContext when this cell is executed again:
# calling the SparkContext() constructor a second time in the same kernel
# raises "Cannot run multiple SparkContexts at once".
sc = spark.SparkContext.getOrCreate()
# SQL entry point used below for createDataFrame() and sql() queries.
sql = spark.SQLContext(sc)

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; the CSV paths below are read from under this mount point.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Single place to change when the project folder moves; every input CSV
# below is read from this Drive directory.
DATA_DIR = "/content/drive/MyDrive/DS 340W"

# Tweets: a small sample and a larger one.
tw = pd.read_csv(f"{DATA_DIR}/tweet_data.csv")
large_tw = pd.read_csv(f"{DATA_DIR}/large_tweet_data_1.csv")
# Price tables, one CSV per coin.
btc = pd.read_csv(f"{DATA_DIR}/Bitstamp_BTCUSD_1h.csv")
eth = pd.read_csv(f"{DATA_DIR}/Bitstamp_ETHUSD_1h.csv")
ltc = pd.read_csv(f"{DATA_DIR}/Bitstamp_LTCUSD_1h.csv")
# Reddit table.
reddit = pd.read_csv(f"{DATA_DIR}/reddit_data_sentiment_bucketized.csv")

In [None]:
# Drop every tweet row that has a missing value in ANY column
# (how='any' is the dropna() default), not only rows that are entirely empty.
tw = tw.dropna(how='any')
large_tw = large_tw.dropna(how='any')

In [None]:
# Promote each pandas frame to a Spark DataFrame through the SQL context.

# Tweets (small and large sets)
twDF = sql.createDataFrame(tw)
large_twDF = sql.createDataFrame(large_tw)

# Price tables, one per coin
BtcDF = sql.createDataFrame(btc)
EthDF = sql.createDataFrame(eth)
LtcDF = sql.createDataFrame(ltc)

# Reddit table
RedditDF = sql.createDataFrame(reddit)

## Pre-process twitter dataframe

In [None]:
import preprocessor as p 
# Clean each tweet: strip retweet markers, URLs, emojis, @mentions and punctuation.
def function_udf(input_str):
    """Return a cleaned, whitespace-normalised version of a tweet.

    Steps:
      1. Remove standalone ``RT`` tokens.
      2. Let tweet-preprocessor remove URLs, emojis and mentions.
      3. Replace any leftover @handle, URL, or character that is not an
         ASCII letter, digit, space or tab with a space, then collapse
         runs of whitespace into single spaces.

    Args:
        input_str: raw tweet text, or None.

    Returns:
        The cleaned text, or None when ``input_str`` is None.
    """
    # Null cells reach a Python UDF as None; pass them through instead of
    # letting re.sub raise a TypeError.
    if input_str is None:
        return None
    # \b limits the match to the standalone token so words that merely
    # contain "RT" (e.g. "SMART") are left intact.
    input_str = re.sub(r'\bRT\b', '', input_str)
    # Set on every call; presumably so the options are also applied inside
    # Spark's Python worker processes -- TODO confirm.
    p.set_options(p.OPT.URL, p.OPT.EMOJI, p.OPT.MENTION)
    input_str = p.clean(input_str)
    # Raw string: same pattern as before without invalid-escape warnings.
    return ' '.join(re.sub(r"(@[A-Za-z0-9]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)", " ", input_str).split())
# Wrap the cleaner as a Spark UDF that yields a string column.
func_udf = udf(function_udf, StringType())

# Add the cleaned text to the small tweet set ...
CleanDF = twDF.withColumn('CleanedTweets', func_udf(twDF.Tweet))
# ... and to the large tweet set.
Clean_large_DF = large_twDF.withColumn('CleanedTweets', func_udf(large_twDF.Tweet))

# Preview the first three cleaned tweets of the small set.
CleanDF.show(n=3)

+--------------------+---------------+--------------------+--------------------+
|                Date|           User|               Tweet|       CleanedTweets|
+--------------------+---------------+--------------------+--------------------+
|2022-09-30 22:38:...|    nayibbukele|Shots fired! 👀\n...| Shots fired Bitcoin|
|2022-09-30 20:18:...|BitcoinMagazine|BREAKING - 🇸🇻 P...|BREAKING Presiden...|
|2022-09-30 19:10:...|BitcoinMagazine|JUST IN - Michael...|JUST IN Michael S...|
+--------------------+---------------+--------------------+--------------------+
only showing top 3 rows



## Sentiment analysis with the VADER package

In [None]:
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
# VADER analyser instance used by senti_score_udf below.
analyser = SentimentIntensityAnalyzer()


def senti_score_udf(sentence):
    """Return VADER's polarity scores for `sentence` as [neg, neu, pos, compound]."""
    scores = analyser.polarity_scores(sentence)
    return [scores[key] for key in ('neg', 'neu', 'pos', 'compound')]


# Spark UDF producing an array<float> column in the order listed above.
func_udf2 = udf(senti_score_udf, ArrayType(FloatType()))
# Output column -> position in the array returned by func_udf2
# ([neg, neu, pos, compound]).
SENTIMENT_COLUMNS = (('p_neg', 0), ('p_neu', 1), ('p_pos', 2), ('p_comp', 3))


def add_sentiment_columns(df):
    """Return `df` with p_neg / p_neu / p_pos / p_comp columns added.

    Each column is one element of the score array that func_udf2 computes
    from the 'CleanedTweets' column. Using one helper for both tweet sets
    keeps the array indices in a single place; the large set previously read
    index 0 (the negative score) for all four columns.
    """
    for name, position in SENTIMENT_COLUMNS:
        df = df.withColumn(name, func_udf2(df['CleanedTweets'])[position])
    return df


# sentiment small twitter dataframe
CleanDF = add_sentiment_columns(CleanDF)

# sentiment large twitter dataframe
Clean_large_DF = add_sentiment_columns(Clean_large_DF)

CleanDF.show(3)
Clean_large_DF.show(3)

+--------------------+---------------+--------------------+--------------------+-----+-----+-----+-------+
|                Date|           User|               Tweet|       CleanedTweets|p_neg|p_neu|p_pos| p_comp|
+--------------------+---------------+--------------------+--------------------+-----+-----+-----+-------+
|2022-09-30 22:38:...|    nayibbukele|Shots fired! 👀\n...| Shots fired Bitcoin|0.643|0.357|  0.0|-0.5574|
|2022-09-30 20:18:...|BitcoinMagazine|BREAKING - 🇸🇻 P...|BREAKING Presiden...|  0.0|0.807|0.193| 0.5789|
|2022-09-30 19:10:...|BitcoinMagazine|JUST IN - Michael...|JUST IN Michael S...|  0.0|  1.0|  0.0|    0.0|
+--------------------+---------------+--------------------+--------------------+-----+-----+-----+-------+
only showing top 3 rows

+--------------------+--------------+--------------------+--------------------+-----+-----+-----+------+
|                Date|          User|               Tweet|       CleanedTweets|p_neg|p_neu|p_pos|p_comp|
+------------------

In [None]:
# Small tweet set: cast Date to a timestamp and keep only the columns used downstream.
CleanDF = CleanDF.withColumn("Clean_date",CleanDF['Date'].cast(TimestampType()))
FinalTW = CleanDF.selectExpr("Clean_date as Date_Time", "CleanedTweets", "p_neg", "p_neu", "p_pos", "p_comp")
FinalTW.show(5)

# Large tweet set: same projection, taken from Clean_large_DF. It was
# previously taken from CleanDF, which made FinalLargeTW a copy of FinalTW.
Clean_large_DF = Clean_large_DF.withColumn("Clean_date",Clean_large_DF['Date'].cast(TimestampType()))
FinalLargeTW = Clean_large_DF.selectExpr("Clean_date as Date_Time", "CleanedTweets", "p_neg", "p_neu", "p_pos", "p_comp")
FinalLargeTW.show(5)


+-------------------+--------------------+-----+-----+-----+-------+
|          Date_Time|       CleanedTweets|p_neg|p_neu|p_pos| p_comp|
+-------------------+--------------------+-----+-----+-----+-------+
|2022-09-30 22:38:59| Shots fired Bitcoin|0.643|0.357|  0.0|-0.5574|
|2022-09-30 20:18:48|BREAKING Presiden...|  0.0|0.807|0.193| 0.5789|
|2022-09-30 19:10:02|JUST IN Michael S...|  0.0|  1.0|  0.0|    0.0|
|2022-09-30 18:56:38|The engineers at ...|  0.0|0.677|0.323| 0.9231|
|2022-09-30 17:58:08|People call Bitco...|  0.0|  1.0|  0.0|    0.0|
+-------------------+--------------------+-----+-----+-----+-------+
only showing top 5 rows

+-------------------+--------------------+-----+-----+-----+-------+
|          Date_Time|       CleanedTweets|p_neg|p_neu|p_pos| p_comp|
+-------------------+--------------------+-----+-----+-----+-------+
|2022-09-30 22:38:59| Shots fired Bitcoin|0.643|0.357|  0.0|-0.5574|
|2022-09-30 20:18:48|BREAKING Presiden...|  0.0|0.807|0.193| 0.5789|
|2022-09-

## Pre-process Crypto data

In [None]:
from datetime import datetime 
from dateutil import parser
def Time_format(input_str): #manipulating and casting the strings(DateTime) of dataframe to timestamps
    """Normalise a date string so it can later be cast to a timestamp.

    Replaces every '/' with '-' and appends ':00' (presumably the seconds
    field missing from the source strings -- TODO confirm against the CSVs).

    Args:
        input_str: date string, or None.

    Returns:
        The normalised string, or None when ``input_str`` is None so that a
        null cell stays null instead of failing the whole Spark job.
    """
    if input_str is None:
        return None
    return re.sub(r'/', '-', input_str) + ':00'
# Dedicated name for the date-normalising UDF so the tweet-cleaning `func_udf`
# defined earlier in the notebook is not overwritten (re-running the tweet
# cell after this one would otherwise apply Time_format to tweets).
time_format_udf = udf(Time_format, StringType())
FullDataBtc = BtcDF.withColumn('Cleaned_BTC_Time', time_format_udf(BtcDF['date']))
FullDataEth = EthDF.withColumn('Cleaned_ETH_Time', time_format_udf(EthDF['date']))
FullDataLtc = LtcDF.withColumn('Cleaned_LTC_Time', time_format_udf(LtcDF['date']))

# Limit the data to 2018-10-01 00:00 .. 2022-10-01 00:00 (UTC), inclusive,
# expressed in epoch seconds to match the `unix` column.
WINDOW_START_UNIX = 1538352000  # 2018-10-01 00:00:00 UTC
WINDOW_END_UNIX = 1664582400    # 2022-10-01 00:00:00 UTC
FullDataBtc = FullDataBtc.filter((FullDataBtc.unix >= WINDOW_START_UNIX) & (FullDataBtc.unix <= WINDOW_END_UNIX))
FullDataEth = FullDataEth.filter((FullDataEth.unix >= WINDOW_START_UNIX) & (FullDataEth.unix <= WINDOW_END_UNIX))
FullDataLtc = FullDataLtc.filter((FullDataLtc.unix >= WINDOW_START_UNIX) & (FullDataLtc.unix <= WINDOW_END_UNIX))

In [None]:
# Replace spaces in column names with underscores.
def _with_underscored_columns(df):
    """Return `df` with every space in its column names replaced by '_'.

    Implemented with a comprehension inside a function so that no loop
    variable named `col` is left at notebook scope, where it would shadow
    pyspark.sql.functions.col imported at the top of the notebook.
    """
    return df.toDF(*[name.replace(" ", "_") for name in df.columns])


FullDataBtc = _with_underscored_columns(FullDataBtc)
FullDataEth = _with_underscored_columns(FullDataEth)
FullDataLtc = _with_underscored_columns(FullDataLtc)

In [None]:
# Cast the cleaned time strings to timestamps and rename the columns per coin.

# Bitcoin: keep the full candle (open/high/low/close) plus volume.
CleandfBtc = FullDataBtc.withColumn("Cleaned_BTC_Time_New",FullDataBtc['Cleaned_BTC_Time'].cast(TimestampType()))
FinalBtc = CleandfBtc.selectExpr("Cleaned_BTC_Time_New as Date_Time", "open as open_BTCUSD", "high as high_BTCUSD", "low as low_BTCUSD", "close as close_BTCUSD", "Volume_BTC as volume_BTCUSD")
FinalBtc.show(5)

# Ethereum: close and volume only.
CleandfEth = FullDataEth.withColumn("Cleaned_ETH_Time_New",FullDataEth['Cleaned_ETH_Time'].cast(TimestampType()))
FinalEth = CleandfEth.selectExpr("Cleaned_ETH_Time_New as Date_Time", "close as close_ETHUSD", "Volume_ETH as volume_ETHUSD")
FinalEth.show(5)

# Litecoin: close and volume only.
CleandfLtc = FullDataLtc.withColumn("Cleaned_LTC_Time_New",FullDataLtc['Cleaned_LTC_Time'].cast(TimestampType()))
FinalLtc = CleandfLtc.selectExpr("Cleaned_LTC_Time_New as Date_Time", "close as close_LTCUSD", "Volume_LTC as volume_LTCUSD")
# Preview the Litecoin frame (this previously showed FinalEth a second time).
FinalLtc.show(5)

+-------------------+-----------+-----------+----------+------------+-------------+
|          Date_Time|open_BTCUSD|high_BTCUSD|low_BTCUSD|close_BTCUSD|volume_BTCUSD|
+-------------------+-----------+-----------+----------+------------+-------------+
|2022-10-01 00:00:00|    19419.0|    19484.0|   19358.0|     19399.0|  18.86122559|
|2022-09-30 23:00:00|    19387.0|    19475.0|   19362.0|     19425.0|   15.1325225|
|2022-09-30 22:00:00|    19356.0|    19400.0|   19243.0|     19387.0|  43.42625664|
|2022-09-30 21:00:00|    19411.0|    19419.0|   19295.0|     19361.0|  45.40571271|
|2022-09-30 20:00:00|    19486.0|    19535.0|   19364.0|     19422.0|  92.53103366|
+-------------------+-----------+-----------+----------+------------+-------------+
only showing top 5 rows

+-------------------+------------+-------------+
|          Date_Time|close_ETHUSD|volume_ETHUSD|
+-------------------+------------+-------------+
|2022-10-01 00:00:00|      1327.1|  153.2411798|
|2022-09-30 23:00:00|  

In [None]:
# Combine the Bitcoin, Ethereum and Litecoin tables on their shared timestamp
# column (join() without a `how` argument is an inner join).
FinalCrypto = (
    FinalBtc
    .join(FinalEth, 'Date_time')
    .join(FinalLtc, 'Date_time')
)

In [None]:
# Preview the joined price table, earliest timestamp first.
crypto_by_time = FinalCrypto.orderBy("Date_time")
crypto_by_time.show()

+-------------------+-----------+-----------+----------+------------+-------------+------------+-------------+------------+-------------+
|          Date_Time|open_BTCUSD|high_BTCUSD|low_BTCUSD|close_BTCUSD|volume_BTCUSD|close_ETHUSD|volume_ETHUSD|close_LTCUSD|volume_LTCUSD|
+-------------------+-----------+-----------+----------+------------+-------------+------------+-------------+------------+-------------+
|2018-10-01 00:00:00|    6597.81|    6624.79|   6597.77|     6623.02|   1195874.09|      232.42|    131905.06|       61.47|     55712.42|
|2018-10-01 01:00:00|    6623.02|    6638.15|   6600.31|     6606.85|    2005262.8|      232.62|     89554.37|       61.26|     79309.16|
|2018-10-01 02:00:00|    6606.85|     6618.9|    6599.2|      6605.1|    602412.47|      232.08|     96407.68|       61.44|      9026.95|
|2018-10-01 03:00:00|     6605.1|    6628.95|    6605.1|     6618.96|    480489.07|      232.01|    109914.02|        61.4|      44891.1|
|2018-10-01 04:00:00|    6618.96| 

## Truncate timestamps to hours and group by hour

In [None]:
def _bucket_to_hour(tweets):
    """Return `tweets` with Date_Time replaced by its nearest whole hour.

    The timestamp is converted to epoch seconds, divided by 3600, rounded,
    multiplied back and cast to a timestamp. Rows are ordered by the original
    epoch value before the projection.

    NOTE(review): round() snaps to the NEAREST hour (22:38 -> 23:00), while the
    section heading says "truncate" -- confirm which is intended.
    NOTE(review): Date_Time is already a timestamp at this point, so the
    'yyyy-MM-dd' pattern is presumably ignored -- confirm.
    """
    with_unix = tweets.withColumn('unix_time', unix_timestamp('Date_Time', 'yyyy-MM-dd')).orderBy('unix_time')
    hour_start = (round(with_unix.unix_time/3600)*3600).cast("timestamp")
    with_bucket = with_unix.withColumn('rounded_time', hour_start)
    return with_bucket.selectExpr("rounded_time as Date_Time","CleanedTweets","p_neg","p_neu","p_pos","p_comp")


# Small tweet set
FinalTW = _bucket_to_hour(FinalTW)
FinalTW.show(5)

# Large tweet set
FinalLargeTW = _bucket_to_hour(FinalLargeTW)
FinalLargeTW.show(5)

+-------------------+--------------------+-----+-----+-----+-------+
|          Date_Time|       CleanedTweets|p_neg|p_neu|p_pos| p_comp|
+-------------------+--------------------+-----+-----+-----+-------+
|2018-10-02 12:00:00|There will only b...|  0.0|  1.0|  0.0|    0.0|
|2018-10-03 01:00:00|Today the Federal...|  0.0|0.905|0.095| 0.4588|
|2018-10-05 11:00:00|This is really bi...|  0.0|0.786|0.214| 0.8399|
|2018-10-07 22:00:00|History will be u...|0.167|0.833|  0.0|-0.3818|
|2018-10-08 05:00:00|             Bitcoin|  0.0|  1.0|  0.0|    0.0|
+-------------------+--------------------+-----+-----+-----+-------+
only showing top 5 rows

+-------------------+--------------------+-----+-----+-----+-------+
|          Date_Time|       CleanedTweets|p_neg|p_neu|p_pos| p_comp|
+-------------------+--------------------+-----+-----+-----+-------+
|2018-10-02 12:00:00|There will only b...|  0.0|  1.0|  0.0|    0.0|
|2018-10-03 01:00:00|Today the Federal...|  0.0|0.905|0.095| 0.4588|
|2018-10-

In [None]:
# Expose the hour-bucketed tweets to Spark SQL under the view name "temp".
# createOrReplaceTempView is the supported replacement for the deprecated
# registerTempTable and is safe to re-run.
FinalTW.createOrReplaceTempView("temp")
# Average the four sentiment scores over all tweets that share an hour bucket.
FinalTW_avg = sql.sql("SELECT Date_Time As DateTime,AVG(p_neg) as P_Neg,AVG(p_neu) as P_Neu,AVG(p_pos) as P_Pos,AVG(p_comp) as P_Comp FROM temp GROUP BY Date_Time")
FinalTW_avg.orderBy("DateTime").show(20)

+-------------------+-------------------+------------------+-------------------+--------------------+
|           DateTime|              P_Neg|             P_Neu|              P_Pos|              P_Comp|
+-------------------+-------------------+------------------+-------------------+--------------------+
|2018-10-02 12:00:00|                0.0|               1.0|                0.0|                 0.0|
|2018-10-03 01:00:00|                0.0|0.9049999713897705| 0.0949999988079071| 0.45879998803138733|
|2018-10-05 11:00:00|                0.0|0.7860000133514404|0.21400000154972076|   0.839900016784668|
|2018-10-07 22:00:00|0.16699999570846558|0.8330000042915344|                0.0|-0.38179999589920044|
|2018-10-08 05:00:00|                0.0|               1.0|                0.0|                 0.0|
|2018-10-10 01:00:00|                0.0|0.5799999833106995|0.41999998688697815|  0.7002999782562256|
|2018-10-11 14:00:00|0.11999999731779099| 0.824999988079071| 0.0560000017285347| -

## Join Twitter dataframe and crypto dataframe by datetime

In [None]:
# Columns of the final table, in output order: timestamp, averaged tweet
# sentiment, price/volume columns, then the Reddit columns.
RESULT_COLUMNS = [
    "Date_Time",
    "P_Neg", "P_Neu", "P_Pos", "P_Comp",
    "open_BTCUSD", "high_BTCUSD", "low_BTCUSD", "close_BTCUSD", "volume_BTCUSD",
    "close_ETHUSD", "volume_ETHUSD",
    "close_LTCUSD", "volume_LTCUSD",
    "reddit_flair", "reddit_tb_polarity", "reddit_tb_subjectivity",
    "reddit_sid_pos", "reddit_sid_neg", "reddit_sid_neu", "reddit_sid_com",
]

# Left joins keep every price row even when no tweet / Reddit row matches its timestamp.
crypto_tw = FinalCrypto.join(FinalTW_avg, FinalCrypto.Date_Time == FinalTW_avg.DateTime, "left")
crypto_tw_reddit = crypto_tw.join(RedditDF, crypto_tw.Date_Time == RedditDF.timestamp, "left")

results = (
    crypto_tw_reddit
    .selectExpr(*RESULT_COLUMNS)
    .na.fill(0)            # unmatched rows get 0 in place of null
    .orderBy("Date_Time")
)

In [None]:
# Preview the first ten rows of the combined table.
results.show(n=10)

+-------------------+-----+-----+-----+------+-----------+-----------+----------+------------+-------------+------------+-------------+------------+-------------+-------------------+-------------------+----------------------+------------------+--------------+------------------+--------------+
|          Date_Time|P_Neg|P_Neu|P_Pos|P_Comp|open_BTCUSD|high_BTCUSD|low_BTCUSD|close_BTCUSD|volume_BTCUSD|close_ETHUSD|volume_ETHUSD|close_LTCUSD|volume_LTCUSD|       reddit_flair| reddit_tb_polarity|reddit_tb_subjectivity|    reddit_sid_pos|reddit_sid_neg|    reddit_sid_neu|reddit_sid_com|
+-------------------+-----+-----+-----+------+-----------+-----------+----------+------------+-------------+------------+-------------+------------+-------------+-------------------+-------------------+----------------------+------------------+--------------+------------------+--------------+
|2018-10-01 00:00:00|  0.0|  0.0|  0.0|   0.0|    6597.81|    6624.79|   6597.77|     6623.02|   1195874.09|      232.

In [None]:
# Collect the Spark result into pandas on the driver and export it as CSV
# (without the pandas index column).
output_path = "/content/drive/MyDrive/DS 340W/FinalData.csv"
Final = results.toPandas()
Final.to_csv(output_path, index=False)