In [1]:


from binance.client import AsyncClient, HistoricalKlinesType
import pandas as pd

import uuid
from pathlib import Path
import os
from datetime import datetime, timedelta
import time
import dateutil.parser as dp

import asyncio


In [2]:
client = await AsyncClient.create()

class Binance_Batch_Klines_Downloader:
  def __init__(
      self,
      save_dir="../raw_data/",
      start_date=None,
      end_date=None,
      days_back=1,
      interval="15m",
      klines_type=HistoricalKlinesType.FUTURES,
      rate_limit_ps=2
    ):
    self.save_dir = save_dir
    
    self.start_date = start_date
    self.end_date = end_date
    self.days_back = days_back
    self.start_days_back = str(datetime.utcnow() - timedelta(days = self.days_back))
    
    self.interval = interval
    self.klines_type = klines_type
    self.symbols = None
    self.rate_limit_ps = rate_limit_ps
    
  def _bars_to_df(self, bars):
    df = pd.DataFrame(bars)
    df["Date"] = pd.to_datetime(df.iloc[:,0], unit = "ms")
    df.columns = ["Open Time", "Open", "High", "Low", "Close", "Volume",
                  "Close Time", "Quote Asset Volume", "Number of Trades",
                  "Taker Buy Base Asset Volume", "Taker Buy Quote Asset Volume", "Ignore", "Date"]
    df = df[["Date", "Open", "High", "Low", "Close", "Volume"]].copy()
    df.set_index("Date", inplace = True)
    for column in df.columns:
        df[column] = pd.to_numeric(df[column], errors = "coerce")
        
    return df
  
  async def _get_symbols(self, klines_type):
    exchange_info = None
    if klines_type == HistoricalKlinesType.FUTURES:
      exchange_info = await client.futures_exchange_info()
    if klines_type == HistoricalKlinesType.SPOT:
      exchange_info = await client.get_exchange_info()
    
    if exchange_info is not None:
      return list(map(lambda x: x['symbol'], exchange_info['symbols']))
  
  async def download__all_klines_ohlcv(self, refetch_dir=None):
    self.symbols = await self._get_symbols(self.klines_type)
    
    
    if refetch_dir:
      fetched_symbols = self._get_already_fetched_symbols_from_dir(refetch_dir, klines_type=self.klines_type)
      self.fetched_symbols = fetched_symbols
      self.symbols = list(set(self.symbols) - set(fetched_symbols))
    
    new_dir = "binance_historical_%s_%s_%s" % (self.interval, self.klines_type.name, str(uuid.uuid4()))
    save_path = self.save_dir + new_dir
    Path(save_path).mkdir(parents=True, exist_ok=True)
    
    chunks = []
    for idx, _ in enumerate(self.symbols):
      if idx % self.rate_limit_ps == 0:
        symbols_chunk = self.symbols[idx:idx+self.rate_limit_ps]
        ohlcv_chunk = []
        
        for symbol in symbols_chunk:
          if self.start_date:
            from_unix = int(dp.parse(self.start_date).timestamp()*1000)
            until_unix = int(dp.parse(self.end_date).timestamp()*1000)
            ohlcv_chunk.append(self._download_range_ohlcv(symbol=symbol, interval=self.interval,
                                                   from_unix=from_unix, until_unix=until_unix,
                                                   klines_type=self.klines_type))
          elif self.start_days_back:
            ohlcv_chunk.append(client.get_historical_klines(symbol=symbol, interval=self.interval,
                                          from_unix=from_unix, end_str=None, klines_type=self.klines_type))
          
        chunks.append(ohlcv_chunk)
        
    for index, chunk in enumerate(chunks):
      print("Fetching data for ↓ %s %s" % (self.klines_type.name, self.interval), "%i / %i" % (index, len(chunks)))
      results = await asyncio.gather(*chunk)
      
      for i, bars in enumerate(results):
        print(self.symbols[index*self.rate_limit_ps+i])
        try:
          df = self._bars_to_df(bars)
          df.to_csv("%s/%s_%s_%s.csv" % (save_path, self.symbols[index*self.rate_limit_ps+i], self.klines_type.name, self.interval))
        except:
          print("Couldn't construct DataFrame from raw data for %s" % self.symbols[index*self.rate_limit_ps+i])
          
  async def _download_range_ohlcv(
      self,
      symbol,
      interval,
      from_unix,
      until_unix=int(time.time())*1000-86400000,
      klines_type=HistoricalKlinesType.FUTURES,
      
    ):
    ohlcv = []
    current_unix = from_unix
    
    try:
      print("Fetching %s from %s to %s" % (symbol, current_unix, until_unix))
      ohlcv_chunk = await client.get_historical_klines(symbol=symbol, interval=interval,
                                          start_str=current_unix, end_str=until_unix, klines_type=klines_type)
      if type(ohlcv_chunk) == list:
        ohlcv.extend(ohlcv_chunk)
    except:
      print("Couldn't fetch OHLCV for %s %s" % (symbol, klines_type))
      return None
    
    if type(ohlcv[-1][0]) == int:
      try:
        while ohlcv[-1][0] < until_unix-86400000 and ohlcv[-1][0] != current_unix:
          current_unix = ohlcv[-1][0]
          print("Fetching %s from %s to %s" % (symbol, current_unix, until_unix))
          ohlcv_chunk = await client.get_historical_klines(symbol=symbol, interval=interval,
                                              start_str=current_unix, end_str=until_unix, klines_type=klines_type)
          ohlcv.extend(ohlcv_chunk)
      except:
        print("Couldn't recur historical data for %s" % symbol)
      
    return ohlcv
  
  def _get_already_fetched_symbols_from_dir(self, path, klines_type):
    symbols = []
    with os.scandir('../raw_data/%s' % path) as entries:
      for entry in entries:
        instrument = entry.name.split("_"+klines_type.name)[0]
        symbols.append(instrument)
    return symbols
    

<h2 style="color: orange">DEMO</h2>

In [3]:
downloader = Binance_Batch_Klines_Downloader(
  save_dir="../raw_data/BINANCE_DOWNLOADER_TESTS/", interval="3m",
  start_date='2018-01-01 00:00:00', end_date=str(datetime.utcnow() - timedelta(days = 1)),
  klines_type=HistoricalKlinesType.FUTURES, rate_limit_ps=4)

In [8]:
# refetch_dir="../raw_data/BINANCE_DOWNLOADER_TESTS/binance_historical_3m_FUTURES_fded11fd-145c-471c-b78e-34d1b917a265"
await downloader.download__all_klines_ohlcv(refetch_dir="../raw_data/BINANCE_DOWNLOADER_TESTS/binance_historical_3m_FUTURES_fded11fd-145c-471c-b78e-34d1b917a265")

Fetching data for ↓ FUTURES 3m 0 / 1
Fetching FLMUSDT from 1514761200000 to 1669581076671
Fetching LUNA2USDT from 1514761200000 to 1669581076671
Fetching STMXUSDT from 1514761200000 to 1669581076671
FLMUSDT
LUNA2USDT
STMXUSDT


<h3 style="color: yellow;">Observation length investigation</h3>

In [7]:
import os

In [13]:
dir_paths = ["BINANCE_DOWNLOADER_TESTS/binance_historical_30m_FUTURES_6b1f260d-1873-417a-ae21-1317557f5930"]
def closings_csv_to_df():
    # reading Close values and merging to one DF
    df_closings = pd.DataFrame()
    
    for path in dir_paths:
      with os.scandir('../raw_data/%s' % path) as entries:
          for entry in entries:
            instrument = "_".join(entry.name.split("_")[0:2])
            df = pd.read_csv('../raw_data/%s/%s' % (path, entry.name), index_col="Date")
            df = df[["Close"]].copy()
            df.columns = [instrument]
            df_closings = pd.concat([df_closings, df], axis=1)
    
    # filtering data based on amount of observations in DF
    df_observation_num = pd.DataFrame(columns=["observations"])
    for column in df_closings.columns:
      df_observation_num.loc[column] = len(df_closings[column].dropna())
      
    return df_observation_num

    # drop_columns = []
    # for _, row in df_observation_num.iterrows():
    #   # arbitrarily selected value based on bottom values from df_observation_num
    #   if row.observations < self.observations_low_pass:
    #     drop_columns.append(row.name)
        
    # # removing outliers from the original DF
    # df_closings.drop(columns=drop_columns, inplace=True)

    # # cleaning DF
    # df_closings.dropna(inplace=True)
            
    # self.df = df_closings
    df_closings = pd.DataFrame()
df_closings = pd.DataFrame()   
for path in dir_paths:
  with os.scandir('../raw_data/%s' % path) as entries:
      for entry in entries:
        instrument = "_".join(entry.name.split("_")[0:2])
        df = pd.read_csv('../raw_data/%s/%s' % (path, entry.name), index_col="Date")
        df = df[["Close"]].copy()
        df.columns = [instrument]
        df_closings = pd.concat([df_closings, df], axis=1)
    

In [9]:
observations_df = closings_csv_to_df()
observations_df["days"] = observations_df["observations"]/48
observations_df.sort_values(by="observations", ascending=False, inplace=True)
observations_df


Unnamed: 0,observations,days
BTCUSDT_FUTURES,56057,1167.854167
ETHUSDT_FUTURES,52237,1088.270833
BCHUSDT_FUTURES,51179,1066.229167
XRPUSDT_FUTURES,50316,1048.250000
EOSUSDT_FUTURES,50220,1046.250000
...,...,...
ICPUSDT_FUTURES,2567,53.479167
APTUSDT_FUTURES,1512,31.500000
QNTUSDT_FUTURES,1463,30.479167
APTBUSD_FUTURES,1223,25.479167


In [11]:
target_instruments = observations_df.loc[observations_df.days > 350]
target_instruments

Unnamed: 0,observations,days
BTCUSDT_FUTURES,56057,1167.854167
ETHUSDT_FUTURES,52237,1088.270833
BCHUSDT_FUTURES,51179,1066.229167
XRPUSDT_FUTURES,50316,1048.250000
EOSUSDT_FUTURES,50220,1046.250000
...,...,...
KLAYUSDT_FUTURES,19365,403.437500
ARPAUSDT_FUTURES,19029,396.437500
CTSIUSDT_FUTURES,18693,389.437500
LPTUSDT_FUTURES,17925,373.437500


In [12]:
target_instruments.index, len(target_instruments.index)

Index(['BTCUSDT_FUTURES', 'ETHUSDT_FUTURES', 'BCHUSDT_FUTURES',
       'XRPUSDT_FUTURES', 'EOSUSDT_FUTURES', 'LTCUSDT_FUTURES',
       'TRXUSDT_FUTURES', 'ETCUSDT_FUTURES', 'LINKUSDT_FUTURES',
       'XLMUSDT_FUTURES',
       ...
       'DYDXUSDT_FUTURES', '1000XECUSDT_FUTURES', 'GALAUSDT_FUTURES',
       'CELOUSDT_FUTURES', 'ARUSDT_FUTURES', 'KLAYUSDT_FUTURES',
       'ARPAUSDT_FUTURES', 'CTSIUSDT_FUTURES', 'LPTUSDT_FUTURES',
       'ENSUSDT_FUTURES'],
      dtype='object', length=133)

In [17]:
df_closings = df_closings.filter(items=target_instruments.index)
df_closings

Unnamed: 0_level_0,BTCUSDT_FUTURES,ETHUSDT_FUTURES,BCHUSDT_FUTURES,XRPUSDT_FUTURES,EOSUSDT_FUTURES,LTCUSDT_FUTURES,TRXUSDT_FUTURES,ETCUSDT_FUTURES,LINKUSDT_FUTURES,XLMUSDT_FUTURES,...,DYDXUSDT_FUTURES,1000XECUSDT_FUTURES,GALAUSDT_FUTURES,CELOUSDT_FUTURES,ARUSDT_FUTURES,KLAYUSDT_FUTURES,ARPAUSDT_FUTURES,CTSIUSDT_FUTURES,LPTUSDT_FUTURES,ENSUSDT_FUTURES
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2020-09-04 07:00:00,10253.99,383.69,223.20,0.2486,2.679,48.48,0.04274,5.114,11.495,0.07674,...,,,,,,,,,,
2020-09-04 07:30:00,10295.64,386.29,224.19,0.2493,2.712,48.76,0.04245,5.135,11.646,0.07702,...,,,,,,,,,,
2020-09-04 08:00:00,10394.81,393.71,228.75,0.2514,2.757,49.43,0.04289,5.271,11.988,0.07864,...,,,,,,,,,,
2020-09-04 08:30:00,10422.00,395.89,228.97,0.2523,2.793,49.65,0.04179,5.268,12.336,0.07868,...,,,,,,,,,,
2020-09-04 09:00:00,10434.85,396.68,230.09,0.2514,2.821,49.74,0.04113,5.261,12.353,0.07883,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2020-01-06 05:30:00,7529.72,139.07,236.40,,,,,,,,...,,,,,,,,,,
2020-01-06 06:00:00,7515.27,138.88,235.99,,,,,,,,...,,,,,,,,,,
2020-01-06 06:30:00,7517.40,138.83,236.01,,,,,,,,...,,,,,,,,,,
2020-01-06 07:00:00,7517.45,138.93,236.20,,,,,,,,...,,,,,,,,,,


In [8]:
path = "BINANCE_DOWNLOADER_TESTS/binance_historical_5m_FUTURES_cdcfc8f6-99fe-4124-acec-87acd320c321"
with os.scandir('../raw_data/%s' % path) as entries:
  for entry in entries:
    instrument = "_".join(entry.name.split("_")[0:1])
    print(instrument)

YFIUSDT
CRVUSDT
MATICUSDT
IOTAUSDT
SOLUSDT
ANTUSDT
1000LUNCUSDT
CVXUSDT
INJUSDT
KNCUSDT
OCEANUSDT
BNXUSDT
FOOTBALLUSDT
LDOUSDT
COMPUSDT
APEUSDT
DARUSDT
KLAYUSDT
ARUSDT
SKLUSDT
CVXBUSD
ICXUSDT
TOMOUSDT
LDOBUSD
ETCBUSD
MATICBUSD
ONEUSDT
STORJUSDT
ANCBUSD
ALPHAUSDT
SNXUSDT
1000LUNCBUSD
STGUSDT
DOTBUSD
SUSHIUSDT
1000SHIBBUSD
API3USDT
XRPUSDT
ONTUSDT
ALICEUSDT
GALUSDT
IMXUSDT
CELOUSDT
GALABUSD
FILUSDT
BALUSDT
SXPUSDT
PEOPLEUSDT
MANAUSDT
APTBUSD
ETHBUSD
TLMUSDT
KSMUSDT
ICPBUSD
BELUSDT
SPELLUSDT
1INCHUSDT
ICPUSDT
DOGEUSDT
BLUEBIRDUSDT
WOOUSDT
ETHUSDT
TLMBUSD
APTUSDT
GALBUSD
KAVAUSDT
CTSIUSDT
1000SHIBUSDT
QNTUSDT
FILBUSD
GALAUSDT
NEARUSDT
FLMUSDT
ETHUSDT
1000XECUSDT
UNIBUSD
GRTUSDT
RSRUSDT
BTCUSDT
OPUSDT
ADAUSDT
AVAXUSDT
RLCUSDT
BTCSTUSDT
SRMUSDT
JASMYUSDT
DEFIUSDT
ZRXUSDT
THETAUSDT
AVAXBUSD
CTKUSDT
FLOWUSDT
AXSUSDT
LEVERBUSD
RUNEUSDT
BTCUSDT
UNIUSDT
MKRUSDT
NEOUSDT
LUNA2BUSD
LTCBUSD
RENUSDT
EOSUSDT
ALGOUSDT
AMBBUSD
LRCUSDT
PHBBUSD
AUCTIONBUSD
LINKBUSD
FTMUSDT
TRXBUSD
ZILUSDT
LPTUSDT
BLZUSDT
S

In [3]:
Path("../processed_data/TEST_NEW/TEST_3").mkdir(parents=True, exist_ok=True)

In [6]:
os.path.exists("../processed_data/ebe")

False