# Crystal Ball
AI providing buy/sell/hold actions based on candlestick data.

## Binance
https://www.kaggle.com/code/lucasmorin/getting-all-1m-data-from-binance/notebook

https://developers.binance.com/docs/binance-spot-api-docs/rest-api/market-data-endpoints#klinecandlestick-data

## Reinforcement Learning
### Pong from Pixels
https://karpathy.github.io/2016/05/31/rl/

### Stable Baselines
https://stable-baselines3.readthedocs.io/en/master/
https://anaconda.org/conda-forge/stable-baselines3

### Policy Gradient Methods
https://youtu.be/5P7I-xPq8u8?si=hXVvvLb1S8XcWGfz

### Example
https://towardsdatascience.com/how-to-train-an-ai-to-play-any-game-f1489f3bc5c
https://github.com/guszejnovdavid/custom_game_reinforcement_learning/blob/main/custom_game_reinforcement_learning.ipynb


Installation
~~~shell
$ conda install pytorch torchvision torchaudio pytorch-cuda pandas -c pytorch -c nvidia
$ conda install pyarrow -c conda-forge
$ conda install conda-forge::stable-baselines3
~~~

In [1]:
# CUDA device setup
import torch
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
print('device:', device)

device: cuda:0


In [2]:
import json
import os
import random
import subprocess
import time
from datetime import date, datetime, timedelta
from datetime import date


import requests
import pandas as pd
import numpy as np

API_BASE = 'https://api.binance.com/api/v3/'

LABELS = [
    'open_time',
    'open',
    'high',
    'low',
    'close',
    'volume',
    'close_time',
    'quote_asset_volume',
    'number_of_trades',
    'taker_buy_base_asset_volume',
    'taker_buy_quote_asset_volume',
    'ignore'
]


In [22]:
def get_batch(symbol, interval='1m', start_time=0, limit=1000):
    """Use a GET request to retrieve a batch of candlesticks. Process the JSON into a pandas
    dataframe and return it. If not successful, return an empty dataframe.
    """

    params = {
        'symbol': symbol,
        'interval': interval,
        'startTime': start_time,
        'limit': limit
    }
    try:
        # timeout should also be given as a parameter to the function
        response = requests.get(f'{API_BASE}klines', params, timeout=30)
    except requests.exceptions.ConnectionError:
        print('Connection error, Cooling down for 5 mins...')
        time.sleep(5 * 60)
        return get_batch(symbol, interval, start_time, limit)
    
    except requests.exceptions.Timeout:
        print('Timeout, Cooling down for 5 min...')
        time.sleep(5 * 60)
        return get_batch(symbol, interval, start_time, limit)
    
    except requests.exceptions.ConnectionResetError:
        print('Connection reset by peer, Cooling down for 5 min...')
        time.sleep(5 * 60)
        return get_batch(symbol, interval, start_time, limit)

    if response.status_code == 200:
        return pd.DataFrame(response.json(), columns=LABELS)
    print(f'Got erroneous response back: {response}')
    return pd.DataFrame([])

# TODO: No new data is available on this channel?
def all_candles_to_csv(base, quote, interval='1m'):
    """Collect a list of candlestick batches with all candlesticks of a trading pair,
    concat into a dataframe and write it to CSV.
    """

    # see if there is any data saved on disk already
    try:
        batches = [pd.read_csv(f'data/{base}-{quote}.csv')]
        last_timestamp = batches[-1]['open_time'].max()
    except FileNotFoundError:
        batches = [pd.DataFrame([], columns=LABELS)]
        last_timestamp = 0
    old_lines = len(batches[-1].index)

    # gather all candlesticks available, starting from the last timestamp loaded from disk or 0
    # stop if the timestamp that comes back from the api is the same as the last one
    previous_timestamp = None

    while previous_timestamp != last_timestamp:
        # stop if we reached data from today
        if date.fromtimestamp(last_timestamp / 1000) >= date.today():
            break

        previous_timestamp = last_timestamp

        new_batch = get_batch(
            symbol=base+quote,
            interval=interval,
            start_time=last_timestamp+1
        )

        # requesting candles from the future returns empty
        # also stop in case response code was not 200
        if new_batch.empty:
            break

        last_timestamp = new_batch['open_time'].max()

        # sometimes no new trades took place yet on date.today();
        # in this case the batch is nothing new
        if previous_timestamp == last_timestamp:
            break

        batches.append(new_batch)
        last_datetime = datetime.fromtimestamp(last_timestamp / 1000)

        covering_spaces = 20 * ' '
        print(datetime.now(), base, quote, interval, str(last_datetime)+covering_spaces, end='\r', flush=True)

    # write clean version of csv to parquet
    parquet_name = f'{base}-{quote}.parquet'
    full_path = f'compressed/{parquet_name}'
    df = pd.concat(batches, ignore_index=True)
    df = quick_clean(df)
    write_raw_to_parquet(df, full_path)

    # in the case that new data was gathered write it to disk
    if len(batches) > 1:
        df.to_csv(f'data/{base}-{quote}.csv', index=False)
        return len(df.index) - old_lines
    return 0

def set_dtypes(df):
    """
    set datetimeindex and convert all columns in pd.df to their proper dtype
    assumes csv is read raw without modifications; pd.read_csv(csv_filename)"""

    df['open_time'] = pd.to_datetime(df['open_time'], unit='ms')
    df = df.set_index('open_time', drop=True)

    df = df.astype(dtype={
        'open': 'float64',
        'high': 'float64',
        'low': 'float64',
        'close': 'float64',
        'volume': 'float64',
        'close_time': 'datetime64[ms]',
        'quote_asset_volume': 'float64',
        'number_of_trades': 'int64',
        'taker_buy_base_asset_volume': 'float64',
        'taker_buy_quote_asset_volume': 'float64',
        'ignore': 'float64'
    })

    return df


def set_dtypes_compressed(df):
    """Create a `DatetimeIndex` and convert all critical columns in pd.df to a dtype with low
    memory profile. Assumes csv is read raw without modifications; `pd.read_csv(csv_filename)`."""

    df['open_time'] = pd.to_datetime(df['open_time'], unit='ms')
    df = df.set_index('open_time', drop=True)

    df = df.astype(dtype={
        'open': 'float32',
        'high': 'float32',
        'low': 'float32',
        'close': 'float32',
        'volume': 'float32',
        'number_of_trades': 'uint16',
        'quote_asset_volume': 'float32',
        'taker_buy_base_asset_volume': 'float32',
        'taker_buy_quote_asset_volume': 'float32'
    })

    return df


def assert_integrity(df):
    """make sure no rows have empty cells or duplicate timestamps exist"""

    assert df.isna().all(axis=1).any() == False
    assert df['open_time'].duplicated().any() == False


def quick_clean(df):
    """clean a raw dataframe"""

    # drop dupes
    dupes = df['open_time'].duplicated().sum()
    if dupes > 0:
        df = df[df['open_time'].duplicated() == False]

    # sort by timestamp, oldest first
    df.sort_values(by=['open_time'], ascending=False)

    # just a doublcheck
    assert_integrity(df)

    return df


def write_raw_to_parquet(df, full_path):
    """takes raw df and writes a parquet to disk"""

    # some candlesticks do not span a full minute
    # these points are not reliable and thus filtered
    df = df[~(df['open_time'] - df['close_time'] != -59999)]

    # `close_time` column has become redundant now, as is the column `ignore`
    df = df.drop(['close_time', 'ignore'], axis=1)

    df = set_dtypes_compressed(df)

    # give all pairs the same nice cut-off
    df = df[df.index < str(date.today())]

    df.to_parquet(full_path)


def groom_data(dirname='data'):
    """go through data folder and perform a quick clean on all csv files"""

    for filename in os.listdir(dirname):
        if filename.endswith('.csv'):
            full_path = f'{dirname}/{filename}'
            quick_clean(pd.read_csv(full_path)).to_csv(full_path)


def compress_data(dirname='data'):
    """go through data folder and rewrite csv files to parquets"""

    os.makedirs('compressed', exist_ok=True)
    for filename in os.listdir(dirname):
        if filename.endswith('.csv'):
            full_path = f'{dirname}/{filename}'

            df = pd.read_csv(full_path)

            new_filename = filename.replace('.csv', '.parquet')
            new_full_path = f'compressed/{new_filename}'
            write_raw_to_parquet(df, new_full_path)


In [33]:
all_symbols = pd.DataFrame(requests.get(f'{API_BASE}exchangeInfo').json()['symbols'])

In [30]:
dict_ticker = {
    'Bitcoin Cash':'BCH',
    'Binance Coin':'BNB',
    'Bitcoin':'BTC',
    'EOS.IO':'EOS',
    'Ethereum Classic':'ETC',
    'Ethereum':'ETH',
    'Litecoin':'LTC',
    'Monero':'XMR',
    'TRON':'TRX',
    'Stellar':'XLM',
    'Cardano':'ADA',
    'IOTA':'IOTA',
    'Maker':'MKR',
    'Dogecoin':'DOGE'
}

In [25]:
for a in dict_ticker:
    quoteAssetsa = all_symbols[all_symbols.baseAsset == dict_ticker[a]].quoteAsset.unique()
    USDquoteAssetsa = [qA for qA in quoteAssetsa if 'USD' in qA]
    print(USDquoteAssetsa)

['USDT', 'USDC', 'TUSD', 'BUSD', 'FDUSD']
['USDT', 'TUSD', 'USDC', 'USDS', 'BUSD', 'USDP', 'FDUSD']
['USDT', 'TUSD', 'USDC', 'USDS', 'BUSD', 'USDP', 'FDUSD']
['USDT', 'TUSD', 'USDC', 'BUSD', 'FDUSD']
['USDT', 'USDC', 'TUSD', 'BUSD', 'FDUSD']
['USDT', 'TUSD', 'USDC', 'BUSD', 'USDP', 'FDUSD']
['USDT', 'TUSD', 'USDC', 'BUSD', 'FDUSD']
['USDT', 'BUSD']
['USDT', 'TUSD', 'USDC', 'BUSD']
['USDT', 'TUSD', 'USDC', 'BUSD', 'FDUSD']
['USDT', 'TUSD', 'USDC', 'BUSD', 'FDUSD']
['USDT', 'BUSD', 'FDUSD']
['USDT', 'BUSD']
['USDT', 'USDC', 'BUSD', 'TUSD', 'FDUSD']


In [26]:
quote = 'BUSD'

In [27]:
all_pairs = [(dict_ticker[a],quote) for a in dict_ticker]

In [28]:
all_pairs

[('BCH', 'BUSD'),
 ('BNB', 'BUSD'),
 ('BTC', 'BUSD'),
 ('EOS', 'BUSD'),
 ('ETC', 'BUSD'),
 ('ETH', 'BUSD'),
 ('LTC', 'BUSD'),
 ('XMR', 'BUSD'),
 ('TRX', 'BUSD'),
 ('XLM', 'BUSD'),
 ('ADA', 'BUSD'),
 ('IOTA', 'BUSD'),
 ('MKR', 'BUSD'),
 ('DOGE', 'BUSD')]

In [None]:
# make sure data folders exist
os.makedirs('data', exist_ok=True)
os.makedirs('compressed', exist_ok=True)

# do a full update on all pairs
n_count = len(all_pairs)
for n, pair in enumerate(all_pairs, 1):
    base, quote = pair
    new_lines = all_candles_to_csv(base=base, quote=quote)
    if new_lines > 0:
        print(f'{datetime.now()} {n}/{n_count} Wrote {new_lines} new lines to file for {base}-{quote}')
    else:
        print(f'{datetime.now()} {n}/{n_count} Already up to date with {base}-{quote}')

2025-01-30 18:41:19.028663 1/14 Already up to date with BCH-BUSD
2025-01-30 18:41:21.847687 2/14 Already up to date with BNB-BUSD
2025-01-30 18:41:24.690953 3/14 Already up to date with BTC-BUSD
2025-01-30 18:41:27.250552 4/14 Already up to date with EOS-BUSD
2025-01-30 18:41:29.889487 5/14 Already up to date with ETC-BUSD
2025-01-30 18:41:32.782606 6/14 Already up to date with ETH-BUSD
2025-01-30 18:41:35.477090 7/14 Already up to date with LTC-BUSD
2025-01-30 18:41:37.897723 8/14 Already up to date with XMR-BUSD
2025-01-30 18:41:40.528792 9/14 Already up to date with TRX-BUSD
2025-01-30 18:41:43.071846 10/14 Already up to date with XLM-BUSD
2025-01-30 18:41:45.749013 11/14 Already up to date with ADA-BUSD
2025-01-30 18:41:47.838450 12/14 Already up to date with IOTA-BUSD
2025-01-30 18:41:50.058091 13/14 Already up to date with MKR-BUSD
2025-01-30 18:41:52.328712 14/14 Already up to date with DOGE-BUSD


In [11]:
# TODO: Checking out how to handle parquet data ...

#import pyarrow as pa
#import pyarrow.parquet as pq
import pyarrow.dataset as ds
#import pandas as pd

from torch.utils.data import Dataset, IterableDataset
#from torch.utils.data import get_worker_info
from torch.multiprocessing import Queue

class IterableParquetDataset(IterableDataset):
    def __init__(self, path, process_func=None):
        super().__init__()
        self.__process_func = process_func

        dataset = ds.dataset(path)
        self.__batches = Queue()
        [self.__batches.put(batch) for batch in dataset.to_batches()]

    def __iter__(self):
        while not self.__batches.empty():
            batch = self.__batches.get().to_pydict()
            if self.__process_func is not None:
                batch.update(self.__process_func(batch))
            yield batch
        self.__batches.close()

from typing import List

class BinanceDataset(Dataset):
    def __init__(self, path):
        super().__init__()
        self._dict = ds.dataset(path).to_pydict()

    def __len__(self):
         return len(self._dict)

    def __getitem__(self, index: List):
        return { key: self._dict[key][index] for key in self._dict.keys() }
        #x, y = torch.load(self.files[index])
        #return x, y

from typing import Any

import torch
import pyarrow.dataset
from torch.utils.data import Dataset

class CustomDataset(Dataset):
    def __init__(self, data: pyarrow.dataset.Dataset, label_column: str) -> None:
        self.data = data

    def __len__(self) -> None:
        return self.data.count_rows()

    def __getitem__(self, index: int) -> tuple[list[Any], Any]:
        row = self.data.take([index]).to_pylist()[0]
        x = [v for k, v in row.items() if k != self.label_column]
        y = row[self.label_column]

        return x, y

data = pyarrow.dataset.dataset('dataset_dir', format='parquet')
torch_dataset = CustomDataset(data=data, label_column='label')

In [13]:
# TODO: Just testing ...

def first(iterable, condition = lambda x: True):
    """
    Returns the first item in the `iterable` that
    satisfies the `condition`.

    If the condition is not given, returns the first item of
    the iterable.

    Raises `StopIteration` if no item satysfing the condition is found.

    >>> first( (1,2,3), condition=lambda x: x % 2 == 0)
    2
    >>> first(range(3, 100))
    3
    >>> first( () )
    Traceback (most recent call last):
    ...
    StopIteration
    """

    return next(x for x in iterable if condition(x))

# digging into the data
#dataset = IterableParquetDataset('compressed/BTC-BUSD.parquet')

# Print the keys in the dictionary
#print(first(dataset).keys())

#for batch in dataset:
#    print(len(batch['open']))

dataset2 = BinanceDataset('compressed/BTC-BUSD.parquet')
print(dataset2[0])

AttributeError: 'pyarrow._dataset.FileSystemDataset' object has no attribute 'to_pydict'