In [1]:
import pyspark
import pandas as pd

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *

In [3]:
spark=SparkSession.builder.appName('df').getOrCreate()

In [4]:
df_raw= spark.read.csv('btc.csv', header=True, inferSchema=True)

df_raw_etf = spark.read.csv('btc_etf.csv', header=True, inferSchema=True)

### Schema Inference

In [5]:
df_raw.printSchema()
df_raw.show(3, truncate=False)

df_raw_etf.printSchema()
df_raw_etf.show(3, truncate=False)

root
 |-- Date: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Vol: string (nullable = true)
 |-- Change %: string (nullable = true)

+----------+---------+---------+---------+---------+-------+--------+
|Date      |Price    |Open     |High     |Low      |Vol    |Change %|
+----------+---------+---------+---------+---------+-------+--------+
|12/31/2024|93,557.20|92,777.20|96,163.40|92,036.20|74.85K |0.84%   |
|12/30/2024|92,779.80|93,718.70|94,936.40|91,522.30|112.43K|-1.00%  |
|12/29/2024|93,716.30|95,282.60|95,315.40|93,026.70|47.71K |-1.65%  |
+----------+---------+---------+---------+---------+-------+--------+
only showing top 3 rows
root
 |-- Date: string (nullable = true)
 |-- IBIT: string (nullable = true)
 |-- FBTC: string (nullable = true)
 |-- BITB: string (nullable = true)
 |-- ARKB: string (nullable = true)
 |-- BTCO: string (nullable = true)
 |--

### Cleaned data

1) Idea is to introduce multiple ways to tackle date handlings (i.e padded/unpadded/month)


2) To introduce csv strings into casted types


3) Earliest dates first

In [6]:
df_clean = df_raw \
    .withColumn("Date_Original", col("Date")) \
    .withColumn(
        "full_date",
        coalesce( 
            to_date(col("Date"), "M/d/yyyy"),
            to_date(col("Date"), "MM/dd/yyyy"),
            to_date(col("Date"), "MMM d, yyyy"),
            to_date(col("Date"), "MMMM d, yyyy")
        )
    ) \
    .withColumn("Price_Clean", regexp_replace(col("Price"), "[,$]", "").cast("double")) \
    .withColumn("Open_Clean", regexp_replace(col("Open"), "[,$]", "").cast("double")) \
    .withColumn("High_Clean", regexp_replace(col("High"), "[,$]", "").cast("double")) \
    .withColumn("Low_Clean", regexp_replace(col("Low"), "[,$]", "").cast("double")) \
    .withColumn("Change_Pct", regexp_replace(col("Change %"), "%", "").cast("double")) \
    .withColumn(
        "Volume_Clean",
        when(col("Vol").endswith("K"), regexp_replace(col("Vol"), "K", "").cast("double") * 1000)
        .when(col("Vol").endswith("M"), regexp_replace(col("Vol"), "M", "").cast("double") * 1_000_000)
        .when(col("Vol").endswith("B"), regexp_replace(col("Vol"), "B", "").cast("double") * 1_000_000_000)
        .otherwise(regexp_replace(col("Vol"), "[,$]", "").cast("double"))
    ) \
    .filter(col("full_date").isNotNull()) \
    .orderBy(col("full_date").asc())

df_etf_fixed = df_raw_etf \
    .withColumn("Date_Fixed", 
        regexp_replace(
            regexp_replace(
                regexp_replace(
                    regexp_replace(col("Date"), "Sept", "Sep"),
                "July", "Jul"),
            "June", "Jun"),
        "March", "Mar")
    )

df_etf_clean = df_etf_fixed \
    .withColumn("Date_Original", col("Date")) \
    .withColumn(
        "full_date",
        coalesce( 
            to_date(col("Date_Fixed"), "d-MMM-yy"),
            to_date(col("Date_Fixed"), "dd-MMM-yy"),
            to_date(col("Date_Fixed"), "M/d/yyyy"),
            to_date(col("Date_Fixed"), "MM/dd/yyyy"),
            to_date(col("Date_Fixed"), "MMM d, yyyy"),
            to_date(col("Date_Fixed"), "MMMM d, yyyy")
        )
    ) \
    .withColumn("Total_Clean", regexp_replace(col("Total"), "[,$]", "").cast("double") * 1000000) \
    .filter(col("full_date").isNotNull()) \
    .orderBy(col("full_date").asc())

In [7]:
df_clean.select("full_date", "Price_Clean", "Open_Clean", "High_Clean", "Low_Clean", "Change_Pct").show(5)

df_etf_clean.select("full_date", "Total_Clean").show(5)

+----------+-----------+----------+----------+---------+----------+
| full_date|Price_Clean|Open_Clean|High_Clean|Low_Clean|Change_Pct|
+----------+-----------+----------+----------+---------+----------+
|2023-01-02|    16674.3|   16618.4|   16766.9|  16551.0|      0.34|
|2023-01-03|    16674.2|   16673.1|   16773.2|  16607.2|       0.0|
|2023-01-04|    16852.1|   16674.2|   16976.5|  16656.5|      1.07|
|2023-01-05|    16829.8|   16852.2|   16877.9|  16772.3|     -0.13|
|2023-01-06|    16950.9|   16829.8|   17012.3|  16707.6|      0.72|
+----------+-----------+----------+----------+---------+----------+
only showing top 5 rows
+----------+-----------+
| full_date|Total_Clean|
+----------+-----------+
|2024-01-11|    6.553E8|
|2024-01-12|     2.03E8|
|2024-01-15|        0.0|
|2024-01-16|    -5.27E7|
|2024-01-17|    4.538E8|
+----------+-----------+
only showing top 5 rows


### Star Schema 

Set-up tables for queries

In [8]:
fact_daily_price = df_clean.select(
    monotonically_increasing_id().alias("price_id"),
    col("full_date").alias("date_key"),
    col("Price_Clean").alias("closing_price"),
    col("Open_Clean").alias("opening_price"),
    col("High_Clean").alias("high_price"),
    col("Low_Clean").alias("low_price"),
    col("Volume_Clean").alias("volume"),
    col("Change_Pct").alias("change_percent")
)

In [9]:
dim_date = df_clean.select(
    col("full_date").alias("date_key"),
    year(col("full_date")).alias("year"),
    quarter(col("full_date")).alias("quarter"),
    month(col("full_date")).alias("month"),
    dayofmonth(col("full_date")).alias("day"),
    dayofweek(col("full_date")).alias("day_of_week"),
    weekofyear(col("full_date")).alias("week_of_year"),
    date_format(col("full_date"), "MMMM").alias("month_name"),
    date_format(col("full_date"), "EEEE").alias("day_name")
).distinct().orderBy("date_key")

##### Moving average indicator

Takes weekly, monthly and spiral root monthly averages

In [10]:
window_spec = Window.orderBy("date_key")

fact_with_ma = fact_daily_price \
    .withColumn("ma_7", avg("closing_price").over(window_spec.rowsBetween(-6, 0))) \
    .withColumn("ma_30", avg("closing_price").over(window_spec.rowsBetween(-29, 0))) \
    .withColumn("ma_200", avg("closing_price").over(window_spec.rowsBetween(-199, 0)))

In [11]:
trend = fact_with_ma.select(
    monotonically_increasing_id().alias("trend_id"),
    col("date_key"),
    col("ma_7"),
    col("ma_30"),
    col("ma_200"),
    when((col("ma_30") > col("ma_200")) & (col("closing_price") > col("ma_30")), "Bull Flag")
        .when((col("ma_30") < col("ma_200")) & (col("closing_price") < col("ma_30")), "Bear Flag")
        .otherwise("Chop").alias("trend_type"),
    when(col("ma_7") > col("ma_30"), "Bullish")
        .when(col("ma_7") < col("ma_30"), "Bearish")
        .otherwise("Neutral").alias("short_term_trend")
)

In [12]:
volatility = fact_with_ma.select(
    monotonically_increasing_id().alias("volatility_id"),
    col("date_key"),
    (col("high_price") - col("low_price")).alias("daily_range"),
    (((col("high_price") - col("low_price")) / col("low_price")) * 100).alias("daily_range_percent"),
    stddev("closing_price").over(window_spec.rowsBetween(-6, 0)).alias("volatility_7day")
).withColumn(
    "volatility_category",
    when(col("daily_range_percent") > 5, "High")
        .when(col("daily_range_percent") > 2, "Medium")
        .otherwise("Low")
)

In [13]:
#fact table
fact_daily_price.show(5)

#dimension tables
dim_date.show(5)
trend.select("date_key", "trend_type", "short_term_trend", "ma_7", "ma_30", "ma_200").show(10)
volatility.show(10)

+--------+----------+-------------+-------------+----------+---------+--------+--------------+
|price_id|  date_key|closing_price|opening_price|high_price|low_price|  volume|change_percent|
+--------+----------+-------------+-------------+----------+---------+--------+--------------+
|       0|2023-01-02|      16674.3|      16618.4|   16766.9|  16551.0|136030.0|          0.34|
|       1|2023-01-03|      16674.2|      16673.1|   16773.2|  16607.2|178730.0|           0.0|
|       2|2023-01-04|      16852.1|      16674.2|   16976.5|  16656.5|247390.0|          1.07|
|       3|2023-01-05|      16829.8|      16852.2|   16877.9|  16772.3|178960.0|         -0.13|
|       4|2023-01-06|      16950.9|      16829.8|   17012.3|  16707.6|233470.0|          0.72|
+--------+----------+-------------+-------------+----------+---------+--------+--------------+
only showing top 5 rows
+----------+----+-------+-----+---+-----------+------------+----------+---------+
|  date_key|year|quarter|month|day|day_

## Querying

#### Table registration

In [14]:
fact_daily_price.createOrReplaceTempView("fact_daily_price")
dim_date.createOrReplaceTempView("dim_date")
trend.createOrReplaceTempView("trend")
volatility.createOrReplaceTempView("volatility")

## Chop, bull and bear flags

We can see that generally bullish flags are more present than bearish flags. The trend for data in 2023 to 2024 remains significantly bullish. 
The key trend observed in 2023 and 2024 include the recovery of market from FTX collapse in 2023 due to significant macroeconomic conditions (rate cuts, pro-crypto president and ETF flows).

It can also be seen that despite uptrends being observed, there was significant downtrend or chop/sideways movement in prices due to war (Israel-gaza and Ukraine-Russia conflicts) and hawkish federal sentiments.

In [15]:
bull_bear = spark.sql("""
    SELECT 
        t.trend_type,
        COUNT(*) as days_count,
        ROUND(AVG(f.closing_price), 2) as avg_price,
        ROUND(MIN(f.closing_price), 2) as min_price,
        ROUND(MAX(f.closing_price), 2) as max_price
    FROM fact_daily_price f
    JOIN trend t ON f.date_key = t.date_key
    GROUP BY t.trend_type
    ORDER BY days_count DESC
""")

In [16]:
bull_bear.show()

+----------+----------+---------+---------+---------+
|trend_type|days_count|avg_price|min_price|max_price|
+----------+----------+---------+---------+---------+
| Bull Flag|       350| 52470.62|  22761.8| 106138.9|
|      Chop|       337| 42535.42|  16674.2|  97795.7|
| Bear Flag|        43| 45547.75|  25166.4|  61158.1|
+----------+----------+---------+---------+---------+



In [17]:
yearly_trend = spark.sql("""
    SELECT 
        d.year,
        t.trend_type,
        COUNT(*) as days_in_trend,
        ROUND(AVG(f.closing_price), 2) as avg_price,
        ROUND(MIN(f.closing_price), 2) as min_price,
        ROUND(MAX(f.closing_price), 2) as max_price
    FROM fact_daily_price f
    JOIN dim_date d ON f.date_key = d.date_key
    JOIN trend t ON f.date_key = t.date_key
    GROUP BY d.year, t.trend_type
    ORDER BY d.year, t.trend_type
""")

In [18]:
yearly_trend.show(20)

+----+----------+-------------+---------+---------+---------+
|year|trend_type|days_in_trend|avg_price|min_price|max_price|
+----+----------+-------------+---------+---------+---------+
|2023| Bear Flag|           17| 26134.79|  25166.4|  26876.6|
|2023| Bull Flag|          164| 32250.55|  22761.8|  44175.5|
|2023|      Chop|          183| 26124.97|  16674.2|  42272.5|
|2024| Bear Flag|           26| 58240.83|  53966.8|  61158.1|
|2024| Bull Flag|          186| 70299.07|  42697.2| 106138.9|
|2024|      Chop|          154| 62036.15|  39556.4|  97795.7|
+----+----------+-------------+---------+---------+---------+



In [19]:
volatility_analysis = spark.sql("""
    SELECT 
        t.trend_type,
        v.volatility_category,
        COUNT(*) as days,
        ROUND(AVG(v.daily_range_percent), 2) as avg_daily_range_pct
    FROM fact_daily_price f
    JOIN trend t ON f.date_key = t.date_key
    JOIN volatility v ON f.date_key = v.date_key
    GROUP BY t.trend_type, v.volatility_category
    ORDER BY t.trend_type, days DESC
""")

volatility_analysis.createOrReplaceTempView("volatility_analysis")
sum_volatility = spark.sql("""
    SELECT
        volatility_category,
        SUM(days)
    FROM volatility_analysis
    GROUP BY volatility_category
""")

sum_volatility.show()
volatility_analysis.show()

+-------------------+---------+
|volatility_category|sum(days)|
+-------------------+---------+
|                Low|      179|
|             Medium|      391|
|               High|      160|
+-------------------+---------+

+----------+-------------------+----+-------------------+
|trend_type|volatility_category|days|avg_daily_range_pct|
+----------+-------------------+----+-------------------+
| Bear Flag|             Medium|  26|               3.39|
| Bear Flag|                Low|  12|               1.26|
| Bear Flag|               High|   5|               6.68|
| Bull Flag|             Medium| 194|               3.23|
| Bull Flag|               High|  88|               6.97|
| Bull Flag|                Low|  68|               1.49|
|      Chop|             Medium| 171|               3.31|
|      Chop|                Low|  99|               1.37|
|      Chop|               High|  67|               7.42|
+----------+-------------------+----+-------------------+



### 2024 bull run deeper dive using ETF flows

We can do a comparison by joining similar date keys to fact table for BTC. 

Inflows will generally mean price movement upwards and like wise, outflows will mean price movement downwards.

However, it should be noted that flow data usually is published a day later when the inflow has already happened.

In [20]:
etf_flows = df_etf_clean.select(
    monotonically_increasing_id().alias("flow_id"),
    col("full_date").alias("date_key"),
    col("Total_Clean").alias("total_flow_usd")
)

etf_flows.createOrReplaceTempView("etf_flows")

In [21]:
## Combining tables: 
combined_etf = spark.sql("""
    SELECT 
        f.date_key,
        f.closing_price as btc_price,
        f.opening_price,
        f.high_price,
        f.low_price,
        f.volume as btc_volume,
        COALESCE(e.total_flow_usd, 0) as tot_flow_usd
    FROM fact_daily_price f
    LEFT JOIN etf_flows e ON f.date_key = e.date_key
    WHERE EXTRACT(YEAR FROM f.date_key) = 2024
    ORDER BY f.date_key
""")

In [22]:
combined_etf.createOrReplaceTempView("combined_etf")

## Heavy inflows coincide with key macroeconomic events

1) May - June priced in a rate cut and strong economic data
2) July - Dec priced in a pro-crypto president win as well as speculation on rate cuts.


We can also see from monthly aggregated data, Bitcoin prices increase substantially with greater inflows.

In [23]:
major_flow_events = spark.sql("""
    WITH monthly AS (
        SELECT
            EXTRACT(year FROM date_key) AS yr,
            EXTRACT(MONTH FROM date_key) AS mth,
            AVG(btc_price) AS btc_price,
            SUM(tot_flow_usd) AS total_flow_usd
        FROM combined_etf
        GROUP BY EXTRACT(year FROM date_key), EXTRACT(MONTH FROM date_key))
    SELECT
        yr, mth,
        btc_price,
        ROUND(total_flow_usd / 1000000, 2) AS flow_millions,
        CASE 
            WHEN total_flow_usd > 300000000 THEN 'MASSIVE INFLOW'
            WHEN total_flow_usd < -300000000 THEN 'MASSIVE OUTFLOW'
        END AS event_type
    FROM monthly
    ORDER BY yr, mth
""")

major_flow_events.show(50)

+----+---+------------------+-------------+---------------+
|  yr|mth|         btc_price|flow_millions|     event_type|
+----+---+------------------+-------------+---------------+
|2024|  1| 42921.76774193548|       1459.6| MASSIVE INFLOW|
|2024|  2|49866.606896551726|       6036.2| MASSIVE INFLOW|
|2024|  3| 67682.90322580645|       4636.6| MASSIVE INFLOW|
|2024|  4| 65877.34333333334|       -345.1|MASSIVE OUTFLOW|
|2024|  5|65278.819354838706|       2072.9| MASSIVE INFLOW|
|2024|  6| 65924.87333333332|        666.5| MASSIVE INFLOW|
|2024|  7|62802.716129032255|       3168.5| MASSIVE INFLOW|
|2024|  8| 59913.38064516128|        -92.2|           NULL|
|2024|  9| 60357.47666666667|       1262.8| MASSIVE INFLOW|
|2024| 10| 65609.49677419355|       5348.2| MASSIVE INFLOW|
|2024| 11| 86533.50000000001|       6465.3| MASSIVE INFLOW|
|2024| 12| 98259.53548387098|       4566.6| MASSIVE INFLOW|
+----+---+------------------+-------------+---------------+



#### This query indicates to us that there is a demand for BTC as a speculative asset. 
Inflows outweight outflows by a significant margin and this is one of the reasons why prices have risen in 2024.

In [24]:
flow_price_correlation = spark.sql("""
    SELECT 
        CASE 
            WHEN tot_flow_usd > 500000000 THEN 'Large Inflow (>$500M)'
            WHEN tot_flow_usd > 100000000 THEN 'Medium Inflow ($100M-$500M)'
            WHEN tot_flow_usd > 0 THEN 'Small Inflow (0-$100M)'
            WHEN tot_flow_usd > -100000000 THEN 'Small Outflow (0 to -$100M)'
            WHEN tot_flow_usd > -500000000 THEN 'Medium Outflow (-$100M to -$500M)'
            ELSE 'Large Outflow (<-$500M)'
        END as flow_category,
    COUNT(*) as days,
    ROUND(AVG(tot_flow_usd) / 1000000, 2) as avg_flow_millions
    FROM combined_etf
    WHERE tot_flow_usd IS NOT NULL
    GROUP BY flow_category
    ORDER BY avg_flow_millions DESC
""")

flow_price_correlation.show()

+--------------------+----+-----------------+
|       flow_category|days|avg_flow_millions|
+--------------------+----+-----------------+
|Large Inflow (>$5...|  28|           733.34|
|Medium Inflow ($1...|  89|           277.58|
|Small Inflow (0-$...|  49|            43.24|
|Small Outflow (0 ...| 156|           -11.88|
|Medium Outflow (-...|  41|          -206.87|
|Large Outflow (<-...|   3|          -592.23|
+--------------------+----+-----------------+



In [25]:
combined_etf.show()

+----------+---------+-------------+----------+---------+------------------+------------+
|  date_key|btc_price|opening_price|high_price|low_price|        btc_volume|tot_flow_usd|
+----------+---------+-------------+----------+---------+------------------+------------+
|2024-01-01|  44183.4|      42272.5|   44187.0|  42196.7|           36300.0|         0.0|
|2024-01-02|  44943.7|      44182.9|   45885.4|  44166.0|           97840.0|         0.0|
|2024-01-03|  42836.1|      44943.7|   45492.7|  40888.3|          117650.0|         0.0|
|2024-01-04|  44157.0|      42836.1|   44744.5|  42632.8|           68050.0|         0.0|
|2024-01-05|  44156.9|      44163.0|   44312.1|  42629.0|           68070.0|         0.0|
|2024-01-06|  43967.9|      44156.6|   44203.2|  43424.0|           24260.0|         0.0|
|2024-01-07|  43927.3|      43973.5|   44481.2|  43627.9|           29530.0|         0.0|
|2024-01-08|  46962.2|      43934.2|   47196.7|  43251.0|          103090.0|         0.0|
|2024-01-0

#### We can also check for consistency of price movement per inflow volume ($USD billion)

1) For months that have high flows AND high monthly price change %, we can infer that spot volume carried price increments
2) For months that have low flows (or outflows) AND high monthly price change % positively, we can infer that open interests carried price changes

In [26]:
flow_efficiency = spark.sql("""
    WITH monthly_metrics AS (
        SELECT 
            EXTRACT(MONTH FROM date_key) as mth,
            ROUND(SUM(tot_flow_usd) / 1000000000, 2) as monthly_flow_billions,
            ROUND(
                (MAX(btc_price) - MIN(btc_price)) / MIN(btc_price) * 100, 
                2
            ) as monthly_price_change_pct
        FROM combined_etf
        WHERE tot_flow_usd IS NOT NULL
        GROUP BY EXTRACT(MONTH FROM date_key)
    )
    
    SELECT 
        mth,
        monthly_flow_billions,
        monthly_price_change_pct,
        CASE 
            WHEN monthly_flow_billions <> 0 
            THEN ROUND(monthly_price_change_pct / monthly_flow_billions, 2)
            ELSE NULL 
        END as eff_ratio
    FROM monthly_metrics
    ORDER BY mth
""")

flow_efficiency.show(12, truncate=False)

+---+---------------------+------------------------+---------+
|mth|monthly_flow_billions|monthly_price_change_pct|eff_ratio|
+---+---------------------+------------------------+---------+
|1  |1.46                 |18.72                   |12.82    |
|2  |6.04                 |46.7                    |7.73     |
|3  |4.64                 |17.86                   |3.85     |
|4  |-0.35                |18.07                   |-51.63   |
|5  |2.07                 |22.44                   |10.84    |
|6  |0.67                 |17.9                    |26.72    |
|7  |3.17                 |22.19                   |7.0      |
|8  |-0.09                |21.11                   |-234.56  |
|9  |1.26                 |22.05                   |17.5     |
|10 |5.35                 |20.58                   |3.85     |
|11 |6.47                 |45.81                   |7.08     |
|12 |4.57                 |14.4                    |3.15     |
+---+---------------------+------------------------+---

### ML

In [27]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [28]:
combined_etf.show()

+----------+---------+-------------+----------+---------+------------------+------------+
|  date_key|btc_price|opening_price|high_price|low_price|        btc_volume|tot_flow_usd|
+----------+---------+-------------+----------+---------+------------------+------------+
|2024-01-01|  44183.4|      42272.5|   44187.0|  42196.7|           36300.0|         0.0|
|2024-01-02|  44943.7|      44182.9|   45885.4|  44166.0|           97840.0|         0.0|
|2024-01-03|  42836.1|      44943.7|   45492.7|  40888.3|          117650.0|         0.0|
|2024-01-04|  44157.0|      42836.1|   44744.5|  42632.8|           68050.0|         0.0|
|2024-01-05|  44156.9|      44163.0|   44312.1|  42629.0|           68070.0|         0.0|
|2024-01-06|  43967.9|      44156.6|   44203.2|  43424.0|           24260.0|         0.0|
|2024-01-07|  43927.3|      43973.5|   44481.2|  43627.9|           29530.0|         0.0|
|2024-01-08|  46962.2|      43934.2|   47196.7|  43251.0|          103090.0|         0.0|
|2024-01-0

In [29]:
ml_features = combined_etf.select(
    "date_key",
    "btc_price",
    "tot_flow_usd",
    "btc_volume"
).withColumn(
    "prev_price", 
    lag("btc_price", 1).over(window_spec)
).withColumn(
    "prev_flow", 
    lag("tot_flow_usd", 1).over(window_spec)
).withColumn(
    "flow_ma7", 
    avg("tot_flow_usd").over(window_spec.rowsBetween(-6, 0))
).withColumn(
    "price_momentum_7d",
    ((col("btc_price") - lag("btc_price", 7).over(window_spec)) / lag("btc_price", 7).over(window_spec)) * 100
).withColumn(
    "next_price",
    lead("btc_price", 1).over(window_spec)
).withColumn(
    "next_day_change_pct",
    ((lead("btc_price", 1).over(window_spec) - col("btc_price")) / col("btc_price")) * 100
).filter(
    col("prev_price").isNotNull()
)

In [30]:
regression_data = ml_features.select(
    "date_key",
    "btc_price",
    "prev_price",
    "prev_flow",
    "flow_ma7",
    "price_momentum_7d",
    "next_day_change_pct"
).na.drop()

regression_data.show()

+----------+---------+----------+---------+--------------------+--------------------+--------------------+
|  date_key|btc_price|prev_price|prev_flow|            flow_ma7|   price_momentum_7d| next_day_change_pct|
+----------+---------+----------+---------+--------------------+--------------------+--------------------+
|2024-01-08|  46962.2|   43927.3|      0.0|                 0.0|   6.289239850260494| -1.7741928614928542|
|2024-01-09|  46129.0|   46962.2|      0.0|                 0.0|  2.6372995547763156|  1.0845671920050357|
|2024-01-10|  46629.3|   46129.0|      0.0|                 0.0|   8.855147877607916| -0.6028398453333115|
|2024-01-11|  46348.2|   46629.3|      0.0| 9.361428571428572E7|   4.962293634078396|  -7.578072071838811|
|2024-01-12|  42835.9|   46348.2|  6.553E8|1.2261428571428572E8|  -2.991604936034912| 0.03595115312156732|
|2024-01-13|  42851.3|   42835.9|   2.03E8|1.2261428571428572E8|  -2.539580011781319| -2.5791516243381283|
|2024-01-14|  41746.1|   42851.3|    

##### Setup feature columns:

In [31]:
feature_cols = [
    "btc_price",
    "prev_price",
    "prev_flow",
    "flow_ma7",
    "price_momentum_7d"
]


assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
scale = StandardScaler(inputCol="features", outputCol = "scaled_data", withStd = True, withMean = True)

##### Train test split:

In [59]:
train_size = int(regression_data.count() * 0.8) # i do standard 0.8/0.2 --> 80% to learn, 20% to generalise
train_data = regression_data.limit(train_size)
test_data = regression_data.subtract(train_data)

DataFrame[date_key: date, btc_price: double, prev_price: double, prev_flow: double, flow_ma7: double, price_momentum_7d: double, next_day_change_pct: double]


#### For demonstration sake, to prove why charting cannot be linear through regression coefficients and products

Infact, it is suicide to use regressive techniques to calculate future price changes

In [45]:
result = LinearRegression(featuresCol="scaled_data", labelCol="next_day_change_pct", predictionCol="predicted_change_pct")

In [46]:
pipeline_res = Pipeline(stages=[assembler, scale, result])
model_res = pipeline_res.fit(train_data)
predictions_res = model_res.transform(test_data)

In [47]:
predictions_res.createOrReplaceTempView("prediction_res")

In [48]:
evaluator = RegressionEvaluator(
    labelCol="next_day_change_pct",
    predictionCol="predicted_change_pct"
)

print(f"RMSE: {evaluator.evaluate(predictions_res, {evaluator.metricName: 'rmse'}):.2f}")
print(f"MAE:  {evaluator.evaluate(predictions_res, {evaluator.metricName: 'mae'}):.2f}")
print(f"R^2: {evaluator.evaluate(predictions_res, {evaluator.metricName: 'r2'}):.2f}")

RMSE: 3.08
MAE:  2.30
R^2: -0.29


### Instead, we can try classification

Predict discrete label for given feature --> check up/down (prediction)

In [49]:
classification_data = ml_features.withColumn(
    "price_direction",
    when(col("next_day_change_pct") > 0, 1.0).otherwise(0.0)
).select(
    "date_key",
    "btc_price",
    "prev_price",
    "prev_flow",
    "flow_ma7",
    "price_momentum_7d",
    "price_direction"
).na.drop()

regression_data.show()
train_clf = classification_data.limit(train_size)
test_clf = classification_data.subtract(train_clf)

+----------+---------+----------+---------+--------------------+--------------------+--------------------+
|  date_key|btc_price|prev_price|prev_flow|            flow_ma7|   price_momentum_7d| next_day_change_pct|
+----------+---------+----------+---------+--------------------+--------------------+--------------------+
|2024-01-08|  46962.2|   43927.3|      0.0|                 0.0|   6.289239850260494| -1.7741928614928542|
|2024-01-09|  46129.0|   46962.2|      0.0|                 0.0|  2.6372995547763156|  1.0845671920050357|
|2024-01-10|  46629.3|   46129.0|      0.0|                 0.0|   8.855147877607916| -0.6028398453333115|
|2024-01-11|  46348.2|   46629.3|      0.0| 9.361428571428572E7|   4.962293634078396|  -7.578072071838811|
|2024-01-12|  42835.9|   46348.2|  6.553E8|1.2261428571428572E8|  -2.991604936034912| 0.03595115312156732|
|2024-01-13|  42851.3|   42835.9|   2.03E8|1.2261428571428572E8|  -2.539580011781319| -2.5791516243381283|
|2024-01-14|  41746.1|   42851.3|    

In [50]:
rf_clf = RandomForestClassifier(
    featuresCol="scaled_data",
    labelCol="price_direction",
    predictionCol="predicted_direction",
    numTrees=100,
    maxDepth=5
)

In [51]:
pipeline_clf = Pipeline(stages=[assembler, scale, rf_clf])
model_clf = pipeline_clf.fit(train_clf)
predictions_clf = model_clf.transform(test_clf)

### BTC price guess

There are also anomalous days to look out for, with flash crash and liquidation hunts to account for, which drives the volatility in the cryptocurrency market --> We cannot be sure that all data is sufficient to help predict prices but instead can only make BEST GUESSES (so I will surmise that >50% accuracy is a good enough bet)

In [58]:
clf_evaluator = BinaryClassificationEvaluator(labelCol="price_direction", rawPredictionCol="rawPrediction")

accuracy = predictions_clf.filter(col("price_direction") == col("predicted_direction")).count() / predictions_clf.count()

print(f"Accuracy: {accuracy:.2%}")
print(f"AUC-ROC: {clf_evaluator.evaluate(predictions_clf):.4f}")

predictions_clf.select(
    "date_key",
    when(col("price_direction") == 1, "UP").otherwise("DOWN").alias("actual"),
    when(col("predicted_direction") == 1, "UP").otherwise("DOWN").alias("predicted"),
    when(col("price_direction") == col("predicted_direction"), "ok").otherwise("wrong").alias("correct")
).show(10)

Accuracy: 50.68%
AUC-ROC: 0.4295
+----------+------+---------+-------+
|  date_key|actual|predicted|correct|
+----------+------+---------+-------+
|2024-10-20|  DOWN|     DOWN|     ok|
|2024-10-21|    UP|     DOWN|  wrong|
|2024-10-22|  DOWN|       UP|  wrong|
|2024-10-23|    UP|       UP|     ok|
|2024-10-24|  DOWN|       UP|  wrong|
|2024-10-25|    UP|       UP|     ok|
|2024-10-26|    UP|       UP|     ok|
|2024-10-27|    UP|       UP|     ok|
|2024-10-28|    UP|     DOWN|  wrong|
|2024-10-29|  DOWN|     DOWN|     ok|
+----------+------+---------+-------+
only showing top 10 rows
