In [6]:
from pyspark.sql.types import TimestampType, DecimalType, LongType, StructType, StructField, MapType, StringType, DoubleType
from pyspark.sql import functions as F
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from nltk.sentiment import SentimentIntensityAnalyzer
import json


PRICE_FILE_PATH = '/Users/salmanmukhi/Downloads/BTC-USD_2017-2022.csv'
TWEETS_FILE_PATH = '/Users/salmanmukhi/Downloads/BTC_tweets.csv'


def create_spark_session():
    """Create Spark Session"""

    return (SparkSession
            .builder
            .appName('test')
            .master("local[*]")
            .enableHiveSupport()
            .getOrCreate()
           )


def read_stock_price_data(spark: SparkSession, file_path: str) -> DataFrame:
    """Read stock price CSV file from Yahoo Finance"""

    schema = StructType([
        StructField("Date", TimestampType(), True),
        StructField("Open", DecimalType(10,2), True),
        StructField("High", DecimalType(10,2), True),
        StructField("Low", DecimalType(10,2), True),
        StructField("Close", DecimalType(10,2), True),
        StructField("Adj Close", DecimalType(10,2), True),
        StructField("Volume", LongType(), True),
    ])
    df = (spark.read.csv(PRICE_FILE_PATH, header=True, schema=schema)
        .withColumnRenamed("Adj Close", "Adj_Close")
        .withColumnRenamed("Date", "TradeDate")
        .withColumn("Ticker", F.lit("BTC"))
        )

    return df


def read_tweets_data(spark: SparkSession, file_path: str) -> DataFrame:
    """Read raw tweets CSV file"""

    schema = StructType([
        StructField("id",StringType(),True),
        StructField("user",StringType(),True),
        StructField("fullname",StringType(),True),
        StructField("url",StringType(),True),
        StructField("timestamp",TimestampType(),True),
        StructField("replies",StringType(),True),
        StructField("likes",LongType(),True),
        StructField("retweets",LongType(),True),
        StructField("text",StringType(),True)]
    )
    tweets_df = (spark
        .read
        .option('delimiter', ';')
        .csv(file_path, header=True, schema=schema))
    
    # TODO: Remove Limit on Dataframe when running on cluster
    tweets_df_subset = tweets_df.limit(50000)

    return (tweets_df_subset
        .withColumnRenamed("timestamp", "tweet_timestamp")
        .where('text IS NOT NULL'))


def transform_stock_price_data(df: DataFrame,
                               window: int = 7,
                               bollinger_window: int = 20,
                               bollinger_stdvs: int = 2) -> DataFrame:
    """Add rolling averages and Bollinger Bands to dataframe"""

    df.createOrReplaceTempView('btc')
    df_transformed = spark.sql(f"""
        WITH btc_transform1 AS
            (SELECT
            *,
            AVG(Open) OVER(
                PARTITION BY Ticker
                ORDER BY TradeDate ASC
                RANGE BETWEEN INTERVAL {window} DAYS PRECEDING AND CURRENT ROW) AS rolling_{window}_day_avg,
            AVG(Close) OVER(
                PARTITION BY Ticker
                ORDER BY TradeDate ASC
                RANGE BETWEEN INTERVAL {bollinger_window} DAYS PRECEDING AND CURRENT ROW) AS bollinger_rolling_avg,
            STDDEV(Close) OVER(
                PARTITION BY Ticker
                ORDER BY TradeDate ASC
                RANGE BETWEEN INTERVAL {bollinger_window} DAYS PRECEDING AND CURRENT ROW) AS bollinger_rolling_std
            FROM btc)

        SELECT
        *,
        (bollinger_rolling_avg - (bollinger_rolling_std * {bollinger_stdvs})) AS bollinger_band_lower,
        (bollinger_rolling_avg + (bollinger_rolling_std * {bollinger_stdvs})) AS bollinger_band_upper
        FROM btc_transform1
        """)
    
    return df_transformed


def analyze_tweet_sentiment(tweets_df: DataFrame) -> DataFrame:
    """
        Use NLTK's pretrained SentimentIntensityAnalyzer to measure the sentiment
        of each tweet. Filter out tweets such as those in different languages that will default to a score
        of neutral=1.0.
    """
    
    sia = SentimentIntensityAnalyzer()

    # Wrap function in UDF
    def get_tweet_sentiment(tweet: str) -> str:
        sentiment_score = sia.polarity_scores(tweet)
        return json.dumps(sentiment_score)

    sentiment_udf = F.udf(get_tweet_sentiment, StringType())
    # sentiment_udf = F.udf(get_tweet_sentiment, MapType(StringType(), DoubleType()))
    return (tweets_df
        .withColumn('tweets_analyzed', sentiment_udf('text'))
        .select("*",
            F.get_json_object('tweets_analyzed', '$.neg').cast(DecimalType(4,3)).alias('negative'),
            F.get_json_object('tweets_analyzed', '$.neu').cast(DecimalType(4,3)).alias('neutral'),
            F.get_json_object('tweets_analyzed', '$.pos').cast(DecimalType(4,3)).alias('positive'),
            F.get_json_object('tweets_analyzed', '$.compound').cast(DecimalType(4,3)).alias('compound')
        )
        .where('neutral != 1.0')
    )


def aggregate_tweet_sentiment(df):
    """Aggregate sentiment data from individual tweets to a daily level of granularity"""

    df.createOrReplaceTempView('tweet_sentiment_unagg')
    df_agg = spark.sql("""
    WITH tweet_sentiment_labaled AS
        (SELECT CAST(tweet_timestamp AS Date) AS tweet_date,
            CASE
                WHEN negative > positive
                    AND negative > neutral
                    THEN 'negative'
                WHEN positive > negative
                    AND positive > neutral
                    THEN 'positive'
                WHEN neutral > positive
                    AND neutral > negative
                    THEN 'neutral'
                END AS overall_sentiment
        FROM
            tweet_sentiment_unagg)
    
    SELECT
        tweet_date,
        AVG(CASE WHEN overall_sentiment = 'positive' THEN 1 ELSE 0 END) AS pct_positive,
        AVG(CASE WHEN overall_sentiment = 'negative' THEN 1 ELSE 0 END) AS pct_negative,
        AVG(CASE WHEN overall_sentiment = 'neutral' THEN 1 ELSE 0 END) AS pct_neutral,
        COUNT(*) AS tweet_volume
    FROM
        tweet_sentiment_labaled
    GROUP BY tweet_date
    """)
    
    return df_agg



if __name__ == "__main__":
    spark = create_spark_session()
    df_price_raw = read_stock_price_data(spark, PRICE_FILE_PATH)
    df_price = transform_stock_price_data(df_price_raw)
    price_cols = ["TradeDate",
                  "rolling_20_day_avg",
                  "bollinger_band_lower",
                  "bollinger_band_upper"]
    df_price.select(price_cols).show()
    df_tweets = read_tweets_data(spark, TWEETS_FILE_PATH)
    df_sentiment = analyze_tweet_sentiment(df_tweets)
    df_sentiment.printSchema()
    tweet_cols = ['tweet_timestamp',
                  'likes',
                  'retweets',
                  'negative',
                  'positive',
                  'neutral',
                  'compound']
    df_sentiment.select(tweet_cols).show()
    df_agg_sentiment = aggregate_tweet_sentiment(df_sentiment)
    df_agg_sentiment.show()
    
    df_joined = df_price.join(df_agg_sentiment, df_price.TradeDate == df_agg_sentiment.tweet_date, 'inner')
    df_joined.show()
    

AnalysisException: cannot resolve 'rolling_20_day_avg' given input columns: [btc_transform1.Adj_Close, btc_transform1.Close, btc_transform1.High, btc_transform1.Low, btc_transform1.Open, btc_transform1.Ticker, btc_transform1.TradeDate, btc_transform1.Volume, bollinger_band_lower, bollinger_band_upper, btc_transform1.bollinger_rolling_avg, btc_transform1.bollinger_rolling_std, btc_transform1.rolling_7_day_avg];
'Project [TradeDate#673, 'rolling_20_day_avg, bollinger_band_lower#690, bollinger_band_upper#691]
+- WithCTE
   :- CTERelationDef 3
   :  +- SubqueryAlias btc_transform1
   :     +- Project [TradeDate#673, Open#652, High#653, Low#654, Close#655, Adj_Close#665, Volume#657L, Ticker#681, rolling_7_day_avg#692, bollinger_rolling_avg#693, bollinger_rolling_std#694]
   :        +- Project [TradeDate#673, Open#652, High#653, Low#654, Close#655, Adj_Close#665, Volume#657L, Ticker#681, _w0#714, rolling_7_day_avg#692, bollinger_rolling_avg#693, bollinger_rolling_std#694, rolling_7_day_avg#692, bollinger_rolling_avg#693, bollinger_rolling_std#694]
   :           +- Window [avg(Open#652) windowspecdefinition(Ticker#681, TradeDate#673 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -INTERVAL '7' DAY, currentrow$())) AS rolling_7_day_avg#692, avg(Close#655) windowspecdefinition(Ticker#681, TradeDate#673 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -INTERVAL '20' DAY, currentrow$())) AS bollinger_rolling_avg#693, stddev(_w0#714) windowspecdefinition(Ticker#681, TradeDate#673 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -INTERVAL '20' DAY, currentrow$())) AS bollinger_rolling_std#694], [Ticker#681], [TradeDate#673 ASC NULLS FIRST]
   :              +- Project [TradeDate#673, Open#652, High#653, Low#654, Close#655, Adj_Close#665, Volume#657L, Ticker#681, cast(Close#655 as double) AS _w0#714]
   :                 +- SubqueryAlias btc
   :                    +- View (`btc`, [TradeDate#673,Open#652,High#653,Low#654,Close#655,Adj_Close#665,Volume#657L,Ticker#681])
   :                       +- Project [TradeDate#673, Open#652, High#653, Low#654, Close#655, Adj_Close#665, Volume#657L, BTC AS Ticker#681]
   :                          +- Project [Date#651 AS TradeDate#673, Open#652, High#653, Low#654, Close#655, Adj_Close#665, Volume#657L]
   :                             +- Project [Date#651, Open#652, High#653, Low#654, Close#655, Adj Close#656 AS Adj_Close#665, Volume#657L]
   :                                +- Relation [Date#651,Open#652,High#653,Low#654,Close#655,Adj Close#656,Volume#657L] csv
   +- Project [TradeDate#673, Open#652, High#653, Low#654, Close#655, Adj_Close#665, Volume#657L, Ticker#681, rolling_7_day_avg#692, bollinger_rolling_avg#693, bollinger_rolling_std#694, (cast(bollinger_rolling_avg#693 as double) - (bollinger_rolling_std#694 * cast(2 as double))) AS bollinger_band_lower#690, (cast(bollinger_rolling_avg#693 as double) + (bollinger_rolling_std#694 * cast(2 as double))) AS bollinger_band_upper#691]
      +- SubqueryAlias btc_transform1
         +- CTERelationRef 3, true, [TradeDate#673, Open#652, High#653, Low#654, Close#655, Adj_Close#665, Volume#657L, Ticker#681, rolling_7_day_avg#692, bollinger_rolling_avg#693, bollinger_rolling_std#694]


In [3]:
df_joined.toPandas()

                                                                                

Unnamed: 0,TradeDate,Open,High,Low,Close,Adj_Close,Volume,Ticker,rolling_7_day_avg,rolling_200_day_avg,rolling_20_day_avg,rolling_20_day_std,bollinger_band_lower,bollinger_band_upper,tweet_date,pct_positive,pct_negative,pct_neutral,tweet_volume
0,2019-05-08,5849.48,5989.98,5794.72,5982.46,5982.46,15320605300,BTC,5655.740000,4469.216716,4466.955473,996.010342,3470.945131,5462.965815,2019-05-08,0.025641,0.000000,0.974359,39
1,2018-09-01,7044.81,7242.29,7038.05,7193.25,7193.25,4116050000,BTC,6903.840000,7910.455075,7901.675124,1377.102412,6524.572712,9278.777536,2018-09-01,0.000000,0.000000,1.000000,1
2,2019-05-27,8674.07,8907.17,8668.71,8805.78,8805.78,27949839564,BTC,8051.415000,4575.811592,4587.443980,1284.345026,3303.098954,5871.789006,2019-05-27,0.062903,0.012903,0.919355,620
3,2019-04-28,5271.75,5326.23,5255.68,5285.14,5285.14,12819992056,BTC,5355.911250,4512.795373,4506.320995,1055.109045,3451.211950,5561.430040,2019-04-28,0.000000,0.000000,1.000000,1
4,2019-03-17,4047.72,4054.12,4006.41,4025.23,4025.23,8221625400,BTC,3947.572500,4905.297015,4890.617313,1359.438495,3531.178818,6250.055808,2019-03-17,0.000000,0.000000,1.000000,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
61,2019-03-31,4105.46,4113.02,4094.10,4105.40,4105.40,9045122443,BTC,4045.777500,4710.204776,4699.571393,1263.280727,3436.290666,5962.852120,2019-03-31,0.000000,0.000000,1.000000,1
62,2019-04-12,5061.20,5103.27,4955.85,5089.54,5089.54,13675206312,BTC,5137.817500,4617.933333,4610.129204,1183.201823,3426.927381,5793.331027,2019-04-12,0.500000,0.000000,0.500000,2
63,2019-05-16,8194.50,8320.82,7729.61,7884.91,7884.91,33167197581,BTC,7088.031250,4493.505323,4500.628308,1084.723988,3415.904320,5585.352296,2019-05-16,0.000000,0.000000,1.000000,3
64,2019-04-02,4156.92,4905.95,4155.32,4879.88,4879.88,21315047816,BTC,4071.153750,4688.269602,4680.511443,1251.787127,3428.724316,5932.298570,2019-04-02,0.000000,0.000000,1.000000,1


In [None]:
# from nltk.sentiment import SentimentIntensityAnalyzer

# sia.polarity_scores("this is awesome!")
# print(sia.polarity_scores("【毎日プレゼント企画】"))
# print(sia.polarity_scores('È appena uscito un nuovo video! LES CRYPTOMONNAIES QUI PULVÉRISENT BITCOIN EN 2019 https://t.co/yCsQMvRnyS'))

In [5]:
# Price to SMA Ratio
# MACD (moving average converging divergence)
# Bollinger Bands
# RSI (Relative Strength Index)
# Stochastic Oscillator