In [1]:
from datetime import datetime, timedelta
from typing import List, Optional, Tuple

import matplotlib.pyplot as plt
from IPython.display import display, HTML
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import *
from pyspark.sql.window import Window

from main import download_data

In [2]:
# Ticker symbols of the coins handled by this notebook.
CRYPTOS = [
    'BTC',
    'VEN',
    'ETH',
    'EOS',
    'LTC',
    'XRP',
    'BCH',
    'XLM',
    'CVC',
]

In [3]:
# Download the data for every symbol in CRYPTOS, quoted in USD. Judging by the
# recorded output, the helper queries one symbol at a time and saves the combined
# result to /home/jovyan/data/data_raw.csv.
download_data(CRYPTOS, currency='USD')

Querying for BTC..................................
Querying for VEN...
Querying for ETH............
Querying for EOS....
Querying for LTC....................
Querying for XRP..............
Querying for BCH...
Querying for XLM.......
Querying for CVC....
Saving file to /home/jovyan/data/data_raw.csv


In [4]:
# Start (or reuse) the Spark session used throughout the notebook.
spark = (
    SparkSession.builder
    .appName("Crypto Data")
    .getOrCreate()
)
spark.sparkContext.setCheckpointDir('/tmp/checkpoint/')
spark.sparkContext.setLogLevel('INFO')

# Load the downloaded CSV: the first line is the header, column types are inferred.
df = spark.read.csv(
    "/home/jovyan/data/data_raw.csv",
    header=True,
    inferSchema=True,
    encoding='utf-8',
)
df.printSchema()

root
 |-- close: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- open: double (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- volumefrom: double (nullable = true)
 |-- volumeto: double (nullable = true)
 |-- symbol: string (nullable = true)



At this stage, we only use a naive ("stupid") average price to structure the data.
(Naive because we have no way to compute a weighted average of high, low, close and open.)

In [5]:
# Absolute difference between the two reported volume figures.
volume_gap = F.abs(df['volumefrom'] - df['volumeto'])
# Unweighted mean of the four price points of each row.
mean_price = (df['high'] + df['close'] + df['open'] + df['low']) / 4

# Add the derived columns, then drop the raw inputs that are no longer needed
# ('high' and 'low' are kept because the label is built from them later).
df = (
    df
    .withColumn("trade_volume", volume_gap)
    .withColumn("avg-price", mean_price)
    .drop('volumefrom', 'volumeto', 'close', 'open')
)

Check that the rows are still ordered by time: the latest timestamp must match the hour of the extract.
We will also need this date for later processing.

In [6]:
# Newest timestamp in the dataset; reused further down to select the rows to
# predict on. Aggregating inside Spark avoids pulling every row to the driver
# with collect() and does not depend on row order, which Spark does not guarantee.
latest_received_date = df.agg(F.max('time')).first()[0]
latest_received_date

datetime.datetime(2018, 6, 17, 19, 0)

Checking that the latest *date* lines up between all coins

In [7]:
# Newest timestamp per coin; every coin should report the same hour.
df.groupBy('symbol').agg(F.max('time')).show()

+------+-------------------+
|symbol|          max(time)|
+------+-------------------+
|   EOS|2018-06-17 19:00:00|
|   LTC|2018-06-17 19:00:00|
|   ETH|2018-06-17 19:00:00|
|   BCH|2018-06-17 19:00:00|
|   VEN|2018-06-17 19:00:00|
|   BTC|2018-06-17 19:00:00|
|   XLM|2018-06-17 19:00:00|
|   CVC|2018-06-17 19:00:00|
|   XRP|2018-06-17 19:00:00|
+------+-------------------+



Cols to be used as features and label for our model.

In [8]:
# Look-back distances (in hours, per the column names) for the price and volume
# features. Each one becomes a column named "<prefix>-<N>h" with offset -N.
_PRICE_LOOKBACK_H = (0, 1, 2, 4, 5, 6, 8, 10, 12, 24, 48, 96, 192, 384, 768)
_VOLUME_LOOKBACK_H = (0, 1, 2, 4, 6, 8, 10, 16, 24, 48, 96, 192, 384, 768)

# Column name -> offset relative to the current row, grouped by role.
DELTA_MATCH = {
    'label': {
        'price+24h-max': 24,
        # 'price+24h-min': 24,  # not in use for now.
    },
    'price': {'price-{}h'.format(hours): -hours for hours in _PRICE_LOOKBACK_H},
    'volume': {'vol-{}h'.format(hours): -hours for hours in _VOLUME_LOOKBACK_H},
}

Create and fill columns with the historic data, using the offsets (in hours) defined in the `DELTA_MATCH` dict.

In [9]:
# One window per coin, ordered chronologically. Because it is partitioned by
# symbol, a single pass builds the lagged columns for every coin at once; the
# former outer loop over CRYPTOS only re-created the same columns once per coin.
columnizer_win = Window.partitionBy("symbol").orderBy("time")

# F.lag counts rows, not hours, so the offsets match the "-Nh" column names only
# if each coin has exactly one row per hour -- TODO confirm there are no gaps.
# The stored offsets are negative (look-back), hence the sign flip.
for k, v in DELTA_MATCH['price'].items():
    df = df.withColumn(k, F.lag(df['avg-price'], -v).over(columnizer_win))
for k, v in DELTA_MATCH['volume'].items():
    df = df.withColumn(k, F.lag(df['trade_volume'], -v).over(columnizer_win))

The label will be based on the max / min price reached in the following 24h.

In [10]:
# Look-ahead frame: the 24 rows that follow the current one for the same coin
# (the current row itself is excluded).
label_window = (
    Window.partitionBy('symbol')
    .orderBy("time")
    .rowsBetween(1, 24)
)

# NOTE(review): near the end of each coin's history the frame holds fewer than
# 24 rows, so those extremes come from a shorter horizon -- confirm this is intended.
highest_ahead = F.max(df['high']).over(label_window)
lowest_ahead = F.min(df['low']).over(label_window)
df = (
    df
    .withColumn("price+24h-max", highest_ahead)
    .withColumn("price+24h-min", lowest_ahead)
)

In [11]:
# columnizer_win = Window.partitionBy("symbol").orderBy("time")
# for symb in CRYPTOS:
#     for k, v in DELTA_MATCH['price'].items():
#         df = df.withColumn(k, F.lag(df['avg-price'], v*-1).over(columnizer_win))
#     for k, v in DELTA_MATCH['volume'].items():
#         df = df.withColumn(k, F.lag(df['trade_volume'], v*-1).over(columnizer_win))
#     for k, v in DELTA_MATCH['label'].items():
#         df = df.withColumn(k, F.lag(df['trade_volume'], v*-1).over(columnizer_win))

Remove all the NULL values (expected) produced by the previous operation.
These correspond to the feature columns (all except the label) from the `DELTA_MATCH` dict we declared earlier.

We are wasting days from the initial time for each coin, but otherwise we wouldn't have so many features.

In [12]:
# Rows at the start of each coin's history lack some lagged values; keep only
# the rows where every price and volume feature is present.
feature_names = list(DELTA_MATCH['price']) + list(DELTA_MATCH['volume'])
for feature in feature_names:
    df = df.filter(df[feature].isNotNull())

Starting to clear out the columns we don't need.

In [13]:
# The lagged feature columns and the 24h max/min columns have been derived from
# these raw inputs, so they can go.
raw_inputs = ('avg-price', 'trade_volume', 'high', 'low')
df = df.drop(*raw_inputs)

In [14]:
# Keep only the BTC rows (there is no aggregate total-market series to use instead).
is_btc = df['symbol'] == "BTC"
df_btc = df.filter(is_btc)
# No cell shown above caches `df`, so this unpersist looks like a no-op; its
# return value (the DataFrame itself) is what the cell displays.
df.unpersist()

DataFrame[time: timestamp, symbol: string, price-0h: double, price-1h: double, price-2h: double, price-4h: double, price-5h: double, price-6h: double, price-8h: double, price-10h: double, price-12h: double, price-24h: double, price-48h: double, price-96h: double, price-192h: double, price-384h: double, price-768h: double, vol-0h: double, vol-1h: double, vol-2h: double, vol-4h: double, vol-6h: double, vol-8h: double, vol-10h: double, vol-16h: double, vol-24h: double, vol-48h: double, vol-96h: double, vol-192h: double, vol-384h: double, vol-768h: double, price+24h-max: double, price+24h-min: double]

From `df_btc`, drop the unneeded columns and rename the remaining ones in preparation for the join.

In [15]:
# The symbol and the look-ahead columns are not needed on the BTC side of the join.
df_btc = df_btc.drop('symbol', 'price+24h-max', 'price+24h-min')

# Prefix every remaining column with "btc-", except the join key `time`.
columns_to_prefix = [name for name in df_btc.columns if name != "time"]
for name in columns_to_prefix:
    df_btc = df_btc.withColumnRenamed(name, "btc-{}".format(name))

In [16]:
# Everything that is not BTC; these rows are joined with the BTC columns next.
not_btc = df['symbol'] != "BTC"
df_exbtc = df.filter(not_btc)

In [17]:
# Attach the BTC columns of the same timestamp to every non-BTC row so each coin
# carries BTC as a benchmark; the inner join keeps only timestamps present on both sides.
df_joined = df_exbtc.join(df_btc, on='time', how='inner')

In [18]:
# Inspect the joined schema: the coin's own columns followed by the "btc-" prefixed ones.
df_joined.printSchema()

root
 |-- time: timestamp (nullable = true)
 |-- symbol: string (nullable = true)
 |-- price-0h: double (nullable = true)
 |-- price-1h: double (nullable = true)
 |-- price-2h: double (nullable = true)
 |-- price-4h: double (nullable = true)
 |-- price-5h: double (nullable = true)
 |-- price-6h: double (nullable = true)
 |-- price-8h: double (nullable = true)
 |-- price-10h: double (nullable = true)
 |-- price-12h: double (nullable = true)
 |-- price-24h: double (nullable = true)
 |-- price-48h: double (nullable = true)
 |-- price-96h: double (nullable = true)
 |-- price-192h: double (nullable = true)
 |-- price-384h: double (nullable = true)
 |-- price-768h: double (nullable = true)
 |-- vol-0h: double (nullable = true)
 |-- vol-1h: double (nullable = true)
 |-- vol-2h: double (nullable = true)
 |-- vol-4h: double (nullable = true)
 |-- vol-6h: double (nullable = true)
 |-- vol-8h: double (nullable = true)
 |-- vol-10h: double (nullable = true)
 |-- vol-16h: double (nullable = true)
 

In [19]:
# Rows at the newest timestamp are kept aside as the prediction input.
at_latest_hour = df_joined['time'] == latest_received_date
df_predict = df_joined.filter(at_latest_hour).cache()

In [20]:
# Bring the prediction rows to the driver as a pandas DataFrame and write them to CSV
# (with to_csv defaults, the pandas index is written as the first column).
df_predict.toPandas().to_csv('/home/jovyan/data/data_predict.csv')

In [21]:
# Relative change from the current average price (price-0h) to the highest /
# lowest price in the look-ahead window; the max delta is used to build the label.
# Both columns are added before caching once: caching after each withColumn left
# an intermediate DataFrame marked for caching that was never unpersisted.
df_joined = (
    df_joined
    .withColumn('24h-max-delta', (df_joined['price+24h-max'] / df_joined['price-0h']) - 1)
    .withColumn('24h-min-delta', (df_joined['price+24h-min'] / df_joined['price-0h']) - 1)
    .cache()
)

In [22]:
# df_joined.select('symbol', 'time', 'price+24h-max', 'price-0h', '24h-max-delta').filter((df['24h-max-delta'] > 0. ).(.show()

In [23]:
def categorize_delta(delta) -> Optional[int]:
    """
    UDF helper that buckets the relative growth of a crypto currency.

    param: delta - relative change as a fraction (0.05 means +5%); may be None
    return: None when delta is missing (None or NaN), otherwise
        1 - change <= -5%
        2 - -5% < change <= 0%
        3 - 0% < change <= 5%
        4 - change > 5%
    """
    # NaN is the only value that is not equal to itself; treat it as missing
    # instead of letting it fall through to the last bucket.
    if delta is None or delta != delta:
        return None

    # Compare the percentage directly. Truncating with int() rounds toward zero,
    # which shifted the upper bounds: +0.5% landed in bucket 2 and +5.9% in bucket 3.
    pct = delta * 100
    if pct <= -5:
        return 1
    elif pct <= 0:
        return 2
    elif pct <= 5:
        return 3
    else:
        return 4
    
# Wrap the Python function as a Spark UDF that produces an integer column.
categorize_delta_udf = F.udf(categorize_delta, returnType=IntegerType())

In [24]:
# Derive the class label from the 24h max delta, then drop both helper delta columns.
growth_label = categorize_delta_udf(df_joined['24h-max-delta'])
df_categorized = (
    df_joined
    .withColumn('label', growth_label)
    .drop('24h-max-delta', '24h-min-delta')
    .cache()
)
# Release the cache held for the pre-label frame; the returned DataFrame is
# what the cell displays.
df_joined.unpersist()

DataFrame[time: timestamp, symbol: string, price-0h: double, price-1h: double, price-2h: double, price-4h: double, price-5h: double, price-6h: double, price-8h: double, price-10h: double, price-12h: double, price-24h: double, price-48h: double, price-96h: double, price-192h: double, price-384h: double, price-768h: double, vol-0h: double, vol-1h: double, vol-2h: double, vol-4h: double, vol-6h: double, vol-8h: double, vol-10h: double, vol-16h: double, vol-24h: double, vol-48h: double, vol-96h: double, vol-192h: double, vol-384h: double, vol-768h: double, price+24h-max: double, price+24h-min: double, btc-price-0h: double, btc-price-1h: double, btc-price-2h: double, btc-price-4h: double, btc-price-5h: double, btc-price-6h: double, btc-price-8h: double, btc-price-10h: double, btc-price-12h: double, btc-price-24h: double, btc-price-48h: double, btc-price-96h: double, btc-price-192h: double, btc-price-384h: double, btc-price-768h: double, btc-vol-0h: double, btc-vol-1h: double, btc-vol-2h: do

In [25]:
# NOTE(review): df_categorized was already created with .cache() in the previous
# cell, so this second call looks redundant -- confirm before removing.
df_categorized = df_categorized.cache()

In [26]:
# Drop the rows (not a column) whose label is null. categorize_delta returns None
# for a missing delta, e.g. for each coin's newest row, which has no following
# rows to take the 24h max over.
# NOTE(review): the earlier comment expected 192 such rows -- verify that count still holds.
has_label = df_categorized['label'].isNotNull()
df_categorized = df_categorized.filter(has_label).cache()

In [27]:
# Bring the labelled rows to the driver as a pandas DataFrame and write the
# train/test set to CSV (with to_csv defaults, the pandas index is written as the first column).
df_categorized.toPandas().to_csv('/home/jovyan/data/data_train_test.csv')

In [28]:
# Sanity check: list the label classes that actually occur in the data.
label_values = df_categorized.select('label').distinct()
label_values.show()

+-----+
|label|
+-----+
|    1|
|    3|
|    4|
|    2|
+-----+



In [29]:
# Release everything still cached in this Spark session now that the exports are done.
spark.catalog.clearCache()