In [1]:
import polars as pl
import numpy as np
import os
import datetime
import urllib.request
from tqdm import tqdm
import zipfile
from datetime import date
import math

In [2]:
def get_features_df(trades_df):
    
    def weighted_avg_and_std(values, weights):
        try:
            average = np.average(values, weights=weights)
            variance = np.average((values-average)**2, weights=weights)
        except:
            average = np.average(values)
            variance = np.average((values-average)**2)
        return (average, math.sqrt(variance))
    
    def avg(values):
        try:
            return np.average(values)
        except Exception as e:
            print(f"AVG ERR: {e}")
            return 0
        
    def std(values):
        try:
            return np.std(values)
        except Exception as e:
            print(f"STD ERR: {e}")
            return 0

    def features(x):
        dt = x[0]
        price = np.array(x[1])
        volume = np.array(x[2])
        buy_or_sell = np.array(x[3])

        buy_price = price[buy_or_sell]
        buy_volume = volume[buy_or_sell]
        sell_price = price[np.invert(buy_or_sell)]
        sell_volume = volume[np.invert(buy_or_sell)]
        buy_cnt = len(buy_price)
        sell_cnt = len(sell_price)
        
        buy_volume_sum = sum(buy_volume)
        sell_volume_sum = sum(sell_volume)
        delta_volume_sum = buy_volume_sum - sell_volume_sum
        
        buy_volume_avg = avg(buy_volume)
        sell_volume_avg = avg(sell_volume)
        delta_volume_avg = buy_volume_avg - sell_volume_avg
        
        buy_volume_std = std(buy_volume)
        sell_volume_std = std(sell_volume)
        delta_volume_std = buy_volume_std - sell_volume_std
        
        buy_avg, buy_std = weighted_avg_and_std(buy_price, buy_volume)
        sell_avg, sell_std = weighted_avg_and_std(sell_price, sell_volume)
        
        df = (
            dt, 
            buy_cnt, 
            sell_cnt, 
            buy_avg, 
            sell_avg, 
            buy_std, 
            sell_std, 
            buy_volume_sum,
            sell_volume_sum ,
            delta_volume_sum,
            buy_volume_avg,
            sell_volume_avg,
            delta_volume_avg,
            buy_volume_std,
            sell_volume_std,
            delta_volume_std,
            )
        return df

    trades_df = trades_df.lazy().with_columns(pl.from_epoch("time", unit='ms'))
    trades_df = trades_df.lazy().with_columns(pl.from_epoch("time", unit='us')).collect()

    periods = ["5", "10", "30", "60"]
    features_df = trades_df.select(pl.col("time").dt.truncate("1m")).unique(maintain_order=True)
    features_df = features_df.with_columns(pl.col("time") + pl.duration(minutes=1))

    for window in periods:
        df = trades_df.groupby_dynamic("time", every="1m", period=f"{window}s", closed="left", offset=f"-{window}s", include_boundaries=True).agg(pl.col(["price", "qty", "is_buyer_maker"]))
        df = df.select(pl.col(["_upper_boundary", "price", "qty", "is_buyer_maker"]))
        
        df = df.rename({"_upper_boundary": "time"})
        df = df.apply(lambda x: features(x))
        df = df.rename({"column_0": "time", 
                        "column_1": f"buy_cnt_{window}", 
                        "column_2": f"sell_cnt_{window}", 
                        "column_3": f"buy_avg_{window}", 
                        "column_4": f"sell_avg_{window}", 
                        "column_5": f"buy_std_{window}", 
                        "column_6": f"sell_std_{window}",
                        "column_7": f"buy_volume_sum_{window}",
                        "column_8": f"sell_volume_sum_{window}",
                        "column_9": f"delta_volume_sum_{window}",
                        "column_10": f"buy_volume_avg_{window}",
                        "column_11": f"sell_volume_avg_{window}",
                        "column_12": f"delta_volume_avg_{window}",
                        "column_13": f"buy_volume_std_{window}",
                        "column_14": f"sell_volume_std_{window}",
                        "column_15": f"delta_volume_std_{window}"})
        features_df = features_df.join(df, on="time")
    return features_df

In [3]:
def get_agg_data(trading_pair: str, download_dir: str, date: datetime):
    trading_pair = trading_pair.upper()
    file = f"./{download_dir}/{trading_pair}-trades-{date.strftime('%Y-%m-%d')}.csv"
    agg_data = get_features_df(pl.read_csv(file))
    os.remove(file)
    return agg_data

In [4]:
def get_clusters_data(trading_pair: str, from_date: datetime, number_of_days: int, download_dir: str):
    trading_pair = trading_pair.upper()
    date_list = [from_date - datetime.timedelta(days=x) for x in range(1, number_of_days + 1)]
    os.makedirs(download_dir, exist_ok=True)
    data = pl.DataFrame()
    print(f"DOWNLOADING {number_of_days} FILES TO {download_dir} AND GENERATING CLUSTERS")
    for current_date in tqdm(date_list):
        try:
            zip_file_path = f"./{download_dir}/{trading_pair}_{current_date.strftime('%Y-%m-%d')}.zip"
            urllib.request.urlretrieve(f"https://data.binance.vision/data/futures/um/daily/trades/{trading_pair}/{trading_pair}-trades-{current_date.strftime('%Y-%m-%d')}.zip", zip_file_path)
            with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
                zip_ref.extractall(download_dir)
            os.remove(zip_file_path)
            agg_data = get_agg_data(trading_pair, download_dir, current_date)
            data = pl.concat([data, agg_data])
        except Exception as e:
            print(f"Something went wrong with {trading_pair} {current_date}: {e}")
    return data

In [5]:
trading_pair = "ETHUSDT"
trading_pair = trading_pair.upper()
from_date = datetime.datetime.strptime('23032023', "%d%m%Y").date()
download_dir = f"./{trading_pair}_daily_trades_data_{from_date.strftime('%Y-%m-%d')}"
x = get_clusters_data(trading_pair, from_date, 136, "ETHUSDT_trades")

DOWNLOADING 136 FILES TO ETHUSDT_trades AND GENERATING CLUSTERS


  avg = a.mean(axis, **keepdims_kw)
  ret = ret.dtype.type(ret / rcount)
  ret = _var(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
  arrmean = um.true_divide(arrmean, div, out=arrmean, casting='unsafe',
  ret = ret.dtype.type(ret / rcount)
100%|██████████| 136/136 [37:36<00:00, 16.59s/it] 


In [6]:
x.shape

(195820, 61)

In [7]:
x.head()

time,buy_cnt_5,sell_cnt_5,buy_avg_5,sell_avg_5,buy_std_5,sell_std_5,buy_volume_sum_5,sell_volume_sum_5,delta_volume_sum_5,buy_volume_avg_5,sell_volume_avg_5,delta_volume_avg_5,buy_volume_std_5,sell_volume_std_5,delta_volume_std_5,buy_cnt_10,sell_cnt_10,buy_avg_10,sell_avg_10,buy_std_10,sell_std_10,buy_volume_sum_10,sell_volume_sum_10,delta_volume_sum_10,buy_volume_avg_10,sell_volume_avg_10,delta_volume_avg_10,buy_volume_std_10,sell_volume_std_10,delta_volume_std_10,buy_cnt_30,sell_cnt_30,buy_avg_30,sell_avg_30,buy_std_30,sell_std_30,buy_volume_sum_30,sell_volume_sum_30,delta_volume_sum_30,buy_volume_avg_30,sell_volume_avg_30,delta_volume_avg_30,buy_volume_std_30,sell_volume_std_30,delta_volume_std_30,buy_cnt_60,sell_cnt_60,buy_avg_60,sell_avg_60,buy_std_60,sell_std_60,buy_volume_sum_60,sell_volume_sum_60,delta_volume_sum_60,buy_volume_avg_60,sell_volume_avg_60,delta_volume_avg_60,buy_volume_std_60,sell_volume_std_60,delta_volume_std_60
datetime[μs],i64,i64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,i64,i64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,i64,i64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,i64,i64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64
2023-03-22 00:01:00,67,60,1800.544099,1800.524901,0.056274,0.046173,81.222,84.937,-3.715,1.212269,1.415617,-0.203348,1.596876,2.588765,-0.991889,268,84,1800.73956,1800.564225,0.191023,0.123568,322.352,100.738,221.614,1.202806,1.199262,0.003544,2.348393,2.244956,0.103437,702,495,1800.806609,1800.806885,0.216716,0.246771,996.378,516.261,480.117,1.419342,1.042952,0.37639,2.378116,2.443398,-0.065282,1698,1346,1800.328165,1800.51863,0.533397,0.37928,2516.951,1826.836,690.115,1.482303,1.357233,0.12507,2.714384,3.227377,-0.512994
2023-03-22 00:02:00,69,135,1800.078015,1800.000037,0.142564,0.119596,45.214,110.33,-65.116,0.655275,0.817259,-0.161984,1.018594,1.613973,-0.595379,129,150,1799.996157,1799.996911,0.10347,0.115979,130.322,118.523,11.799,1.010248,0.790153,0.220095,2.259382,1.554,0.705381,589,295,1800.349381,1800.171108,0.32347,0.301491,1091.764,245.161,846.603,1.853589,0.831054,1.022535,3.664287,1.798239,1.866048,1252,901,1800.64112,1800.686928,0.42539,0.437909,2022.238,889.131,1133.107,1.615206,0.986827,0.628379,3.464173,2.182003,1.28217
2023-03-22 00:03:00,35,8,1800.1898,1800.088906,0.005172,0.056668,75.794,1.453,74.341,2.165543,0.181625,1.983918,3.513629,0.175934,3.337695,41,56,1800.189088,1800.026114,0.010225,0.04384,77.151,31.223,45.928,1.881732,0.557554,1.324178,3.319181,1.350108,1.969073,265,252,1800.194365,1800.230629,0.186173,0.237156,366.029,193.151,172.878,1.381242,0.766472,0.614769,2.828735,1.875911,0.952824,794,724,1800.095381,1800.258023,0.300812,0.342003,1417.074,596.13,820.944,1.784728,0.823384,0.961344,9.188564,1.786539,7.402025
2023-03-22 00:04:00,5,1,1800.53,1800.54,2.2737e-13,0.0,12.009,0.057,11.952,2.4018,0.057,2.3448,4.35632,0.0,4.35632,16,59,1800.470101,1800.446887,0.10094,0.091021,16.753,35.355,-18.602,1.0470625,0.599237,0.447825,2.618432,1.023813,1.594618,243,220,1800.639524,1800.654432,0.299018,0.275154,239.508,359.56,-120.052,0.98563,1.634364,-0.648734,2.275591,3.813512,-1.537921,409,502,1800.464266,1800.55455,0.312184,0.258209,416.569,705.516,-288.947,1.018506,1.40541,-0.386904,2.18445,5.576778,-3.392328
2023-03-22 00:05:00,11,9,1800.44,1800.45,0.0,2.2737e-13,10.945,3.22,7.725,0.995,0.357778,0.637222,1.57972,0.722845,0.856874,38,58,1800.384951,1800.15801,0.110464,0.201568,19.973,20.844,-0.871,0.525605,0.359379,0.166226,0.921473,0.752717,0.168755,211,170,1800.398836,1800.322505,0.260751,0.224136,142.519,61.368,81.151,0.675445,0.360988,0.314457,1.40948,0.799454,0.610026,552,444,1800.366757,1800.152589,0.184849,0.189314,449.728,449.707,0.021,0.814725,1.012854,-0.198129,1.946279,2.491117,-0.544838


In [8]:
x = x.sort("time")

In [9]:
x.write_parquet("./ETHUSDT_features_136_23032023.parquet")