<a href="https://colab.research.google.com/github/karipuf/crypto/blob/master/crypto_tools.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Setting stuff up

In [None]:
#@title Imports


from Historic_Crypto import HistoricalData,Cryptocurrencies,LiveCryptoData
import loguru,pylab as pl,mlflow,pyspark,lightgbm,sklearn,pandas as pd,\
numpy as np,pyspark.sql.functions as F,re
from pyspark.sql.window import Window
from itertools import count
from functools import reduce
from lightgbm import LGBMRegressor,LGBMClassifier
from loguru import logger

# Reuse the active Spark session if one exists, otherwise start one.
spark=pyspark.sql.SparkSession.builder.getOrCreate()
# Base folder for cached data, MLflow runs and logs (presumably a mounted
# Google Drive). The trailing "/" matters: paths below are built by plain
# string concatenation.
root_path="/content/drive/MyDrive/cryptooo/"

mlflow.set_tracking_uri(f"{root_path}mlflow")
mlflow.set_experiment("cryptooo")
# Pass the path (not an already-open file object) so loguru owns the handle:
# it creates a missing logs/ directory and closes the file when the handler
# is removed. mode="w" keeps the original truncate-on-start behaviour.
logger.add(f"{root_path}logs/log1",mode="w")


1

In [None]:
#@title Local definitions
# Local definitions

def rollback(l,n):
  """Return a trailing window over rows ordered by "time".

  The frame starts (l+n) rows before the current row and ends n rows before
  it. Both bounds are inclusive, so the frame spans l+1 rows and, for n>0,
  never includes the current row.

  Args:
    l: extra rows to look back beyond the lag.
    n: lag, i.e. how many rows before the current row the frame ends.
  """
  return Window.orderBy("time").rowsBetween(-(l+n),Window.currentRow-n)

def rollingRows(coins=['btc','eth','ltc'],winlens=[3,7],
                avg_kpis=["open","close","volume"],
                min_kpis=['low'],max_kpis=['high'],lag=3):
  """Build the rolling-window feature columns for a combined price frame.

  For every window length in `winlens`, and for every "<coin>_<kpi>" column,
  one aggregate column named "<coin>_<kpi>_rolling<l>" is produced over the
  window returned by rollback(l, lag): averages for `avg_kpis`, minima for
  `min_kpis` and maxima for `max_kpis`.

  Args:
    coins: coin prefixes used in the source column names.
    winlens: window lengths passed to rollback as `l`.
    avg_kpis: KPI suffixes aggregated with F.avg.
    min_kpis: KPI suffixes aggregated with F.min.
    max_kpis: KPI suffixes aggregated with F.max.
    lag: passed to rollback as `n`.

  Returns:
    A flat list of column expressions ordered by window length, then
    avg/min/max, then coin, then KPI. Empty when `winlens` is empty (the
    previous reduce-based version raised TypeError in that case).
  """
  # The default lists are only read, never mutated, so sharing them is safe.
  aggregations=[(F.avg,avg_kpis),(F.min,min_kpis),(F.max,max_kpis)]

  cols=[]
  for l in winlens:
    window=rollback(l,lag)  # same spec for every column of this length
    for agg,kpis in aggregations:
      for coin in coins:
        for kpi in kpis:
          source=f"{coin}_{kpi}"
          cols.append(agg(source).over(window).alias(f"{source}_rolling{l}"))

  return cols

## Get some data to play about with

In [None]:
# Generate list of currencies
pf=Cryptocurrencies(coin_search="USD").find_crypto_pairs()

# For each coin, read the cached CSV if present; otherwise download the
# history starting 2020-08-01 (3600 is presumably the candle size in seconds;
# the output further down is hourly) and cache it for the next run.
for fname,symb in [("btc.csv","BTC-USD"),
                   ("eth.csv","ETH-USD"),
                   ("ltc.csv","LTC-USD")]:
  # root_path already ends with "/", so no extra separator is added here
  csv_path=f"{root_path}data/{fname}"
  try:
    pf_ = pd.read_csv(csv_path)
  except FileNotFoundError:
    pf_ = HistoricalData(symb,3600,'2020-08-01-00-00').retrieve_data()
    pf_.to_csv(csv_path)
  # NOTE(review): a freshly downloaded frame and one re-read from CSV may
  # differ in index/dtypes (read_csv restores no index) - confirm both paths
  # give the same Spark schema.

  # Creating pandas and spark dataframes
  varname=re.findall(r"(\S+)\.csv",fname)[0]
  # Bind p<coin> (pandas) and d<coin> (Spark) as notebook globals without
  # exec-ing generated source; later cells refer to dbtc/deth/dltc by name.
  globals()[f"p{varname}"]=pf_
  globals()[f"d{varname}"]=spark.createDataFrame(pf_.reset_index())

  logger.info(f"Created p{varname} and d{varname}")

Connected to the CoinBase Pro API.
Found 133 instances containing the term USD.


2021-09-11 18:03:37.267 | INFO     | __main__:<module>:19 - Created pbtc and dbtc
2021-09-11 18:03:38.207 | INFO     | __main__:<module>:19 - Created peth and deth
2021-09-11 18:03:39.112 | INFO     | __main__:<module>:19 - Created pltc and dltc


In [None]:
# ETL

# Join the three per-coin Spark frames on "time", then flatten them into one
# "<coin>_<kpi>" column per (kpi, coin) pair.
joined=reduce(lambda left,right:left.join(right,"time"),
              [dbtc.alias("dbtc"),deth.alias("deth"),dltc.alias("dltc")])
kpi_columns=[F.expr(f"d{coin}.{kpi}").alias(f"{coin}_{kpi}")
             for kpi in ('open','close','low','high','volume')
             for coin in ('btc','eth','ltc')]
combined=joined.select("time",*kpi_columns).orderBy("time")

# Add the rolling features plus a rank-by-time counter used to trim the start.
with_rolling=combined.alias("combined").select("combined.*",*rollingRows())
ranked=with_rolling.withColumn("row",F.rank().over(Window.orderBy("time")))
df=ranked.where("row>10").orderBy("time") # Removing first n rows to allow for rolling windows

#display(df.limit(3).toPandas())

Unnamed: 0,time,btc_open,eth_open,ltc_open,btc_close,eth_close,ltc_close,btc_low,eth_low,ltc_low,btc_high,eth_high,ltc_high,btc_volume,eth_volume,ltc_volume,btc_open_rolling3,btc_close_rolling3,btc_volume_rolling3,eth_open_rolling3,eth_close_rolling3,eth_volume_rolling3,ltc_open_rolling3,ltc_close_rolling3,ltc_volume_rolling3,btc_low_rolling3,eth_low_rolling3,ltc_low_rolling3,btc_high_rolling3,eth_high_rolling3,ltc_high_rolling3,btc_open_rolling7,btc_close_rolling7,btc_volume_rolling7,eth_open_rolling7,eth_close_rolling7,eth_volume_rolling7,ltc_open_rolling7,ltc_close_rolling7,ltc_volume_rolling7,btc_low_rolling7,eth_low_rolling7,ltc_low_rolling7,btc_high_rolling7,eth_high_rolling7,ltc_high_rolling7,row
0,2020-08-01 11:00:00,11634.38,355.23,59.55,11667.58,356.53,59.92,11561.56,353.35,59.33,11667.58,356.79,59.93,425.599125,4008.054495,5826.619624,11582.455,11654.76,882.248475,355.5375,356.2675,8957.65722,59.9775,59.885,16414.317599,11379.7,353.85,58.38,11761.97,358.69,60.72,11448.11,11500.61125,693.499461,351.5125,353.1,9370.09811,59.2875,59.52625,19287.732063,11245.13,343.75,57.67,11761.97,358.69,60.72,11
1,2020-08-01 12:00:00,11667.58,356.53,59.92,11681.0,357.49,59.88,11645.11,355.89,59.63,11701.0,357.76,59.95,675.329904,6956.531368,7281.749042,11654.755,11678.5075,602.375838,356.255,357.1375,6839.781067,59.8825,59.8925,13641.727353,11575.61,354.13,58.38,11761.97,360.82,60.44,11500.59375,11554.355,687.864051,353.0925,354.835,9580.836733,59.525,59.805,19635.375762,11278.49,345.44,57.97,11761.97,360.82,60.72,12
2,2020-08-01 13:00:00,11681.0,357.35,59.91,11540.0,353.95,58.99,11490.63,352.95,58.74,11681.0,357.43,59.92,1194.913019,7488.993224,10820.4041,11678.505,11674.11,400.029166,357.1275,356.835,6980.833678,59.8825,59.765,11865.192451,11580.58,351.7,58.38,11738.0,360.82,60.8,11554.3375,11590.87375,660.237394,354.82625,355.865,9893.490802,59.7975,59.86375,17320.020219,11332.66,345.78,58.38,11761.97,360.82,60.8,13
