In [4]:
import pandas as pd 
import numpy as np  
import json 
import os
from pathlib import Path
from os.path import join
from binance import Client, ThreadedWebsocketManager, ThreadedDepthCacheManager
from datetime import datetime, timedelta
import time

config_path = join(Path(os.getcwd()).parents[0].parents[0], 'config.json')

with open(config_path) as f:
    config = json.load(f)

# Binance REST API Guideline
# https://github.com/binance/binance-spot-api-docs/blob/master/rest-api.md

In [17]:
client = Client(config['key'], config['secret'])

# get all symbol prices
prices = client.get_all_tickers()

tickers = [x['symbol'] for x in prices]

In [3]:
portfolio_path = '/media/john/Data/Tensor_Invest_Fund/data/Tensor_Portfolio.csv'

In [4]:
df_portfolio = pd.read_csv(portfolio_path)

In [16]:
df_portfolio.sort_values(by = ['Weight'], ascending = False)

Unnamed: 0,Asset_ID,Weight,Asset_Name,Ticker
2,1,6.779922,Bitcoin,BTCUSDT
5,6,5.894403,Ethereum,ETHUSDT
10,3,4.406719,Cardano,ADAUSDT
1,0,4.304065,Binance Coin,BNBUSDT
13,4,3.555348,Dogecoin,DOGEUSDT
0,2,2.397895,Bitcoin Cash,BCHUSDT
6,9,2.397895,Litecoin,LTCUSDT
4,7,2.079442,Ethereum Classic,ETCUSDT
9,12,2.079442,Stellar,XLMUSDT
8,13,1.791759,TRON,TRXUSDT


In [17]:
for t in df_portfolio['Ticker'].unique():
    if (t in tickers):
        print(t, " available in Binance")
    else:
        print(t, " not in Binance")

BCHUSDT  available in Binance
BNBUSDT  available in Binance
BTCUSDT  available in Binance
EOSUSDT  available in Binance
ETCUSDT  available in Binance
ETHUSDT  available in Binance
LTCUSDT  available in Binance
XMRBTC  available in Binance
TRXUSDT  available in Binance
XLMUSDT  available in Binance
ADAUSDT  available in Binance
IOTAUSDT  available in Binance
MKRUSDT  available in Binance
DOGEUSDT  available in Binance


In [60]:
class ETL():

    def __init__(self, load_size_days, 
                    end_timestamp, 
                    start_time_stamp = None, 
                    total_days = None, ):

        self.load_size_days = load_size_days
        self.end_timestamp = end_timestamp
        self.start_time_stamp = start_time_stamp
        self.total_days = total_days
        self._import_timestamps = []

    @property
    def import_timestamps(self):
        return self._import_timestamps

    
    def create_import_timestamps_tuples(self):
        """Create import timestamps for Binance
        get historical klines function. 

        Args:
            total_days (int): Total days for which to get historical data
            load_size_days (int): number of days per request
            end_timestamp (datetime.datetime): last timestamp reference
        
        Returns:
            list: start and ending timestamps
        """
        
        self._import_timestamps = []

        if self.total_days is not None:

            print("for")
            
            for lot in range(1, int(self.total_days/self.load_size_days)):
                self._import_timestamps.append((self.end_timestamp - timedelta(days = lot * self.load_size_days), 
                                            self.end_timestamp - timedelta(days = (lot -1) * self.load_size_days)))
                print("adfadsfölkj")
        else:
            
            
            # To create daily updates for appending to historical data
            lot = 1
            end_time = self.start_time_stamp
            while(end_time < self.end_timestamp):

                start_time = self.start_time_stamp + timedelta(days = (lot -1)  * self.load_size_days)
                end_time =  self.start_time_stamp + timedelta(days = lot * self.load_size_days)
                
                if end_time > end_timestamp:
                    end_time = end_timestamp

                self._import_timestamps.append((start_time, 
                                                end_time))

                lot += 1

            
        return self._import_timestamps

    def retrieve_historical_data(self):
        raise NotImplementedError
        

    def update_data(lastest_update_date ):
        raise NotImplementedError



class ETL_Binance(ETL):

    def __init__(self, tickers, 
                        storage_folder,
                        load_size_days, 
                        end_timestamp,
                        start_time_stamp = None, 
                        total_days = None):

        super().__init__(load_size_days,
                            end_timestamp,
                            start_time_stamp,
                            total_days  )

        self._tickers = tickers
        self.storage_folder = storage_folder
        self._client = None


    @property     
    def tickers(self):
        return self._tickers

    @property     
    def client(self):
        return self._client

    def connect_API(self, config):
        try:
            self._client = Client(config['key'], config['secret'])
            print("Sucessful connection to Binance API")
        except Exception as e:
            print("Unsucessfull connection, ", e)

    def download_historical_data(self, interval, limit = 1000, verbose = 0):

        # Calculate Import time stamps
        self._import_timestamps = self.create_import_timestamps_tuples()

        print(self._import_timestamps)

        _df = pd.DataFrame()
        for ticker in self._tickers:

            print(ticker)

            if verbose > 0:
                print("Getting data for ticker", ticker)

            _df = self.retrieve_historical_data(ticker, interval, limit )

            if not _df.empty:
                self.store_historical_as_parquet(_df, ticker)

    
    def update_data(self, interval, limit = 1000, verbose = 0):
        """_summary_

        Args:
            interval (_type_): _description_
            limit (int, optional): _description_. Defaults to 1000.
            verbose (int, optional): _description_. Defaults to 0.
        """

       
        # TODO: adjust function to be able to update multiple
        # years at once, when year end to year begin
        for ticker in self._tickers:
            
            if verbose > 0:
                print("Updating data for ticker", ticker)

            df_old = self.read_latest_file(ticker)

            # Get latest update timestamp
            self.start_time_stamp = df_old['Date'].max()

            print("Latest update at", self.start_time_stamp )

            # Calculate Import time stamps
            self._import_timestamps = self.create_import_timestamps_tuples()

            if verbose > 0:
                print(" Updating data between", 
                    self._import_timestamps[0][0], 
                    " and ", self._import_timestamps[-1][1])

            df_new = self.retrieve_historical_data(ticker, interval, limit )

            if not df_new.empty:

                # Concanetate new data
                df_new = pd.concat([df_new, df_old], ignore_index= True).copy()

                self.store_update_as_parquet(df_new, ticker, year = max(df_new['Year'].unique()))

                if verbose > 0:
                    print("Sucessfully data update for", ticker, "New Update at", df_new['Date'].max())
                    print("\n")

            else:

                print("No data found to update")
                


    def retrieve_historical_data(self, ticker, interval, limit = 1000 ):
        """Get historical data from Binance API. 
        Avoiding request overload or account ban. 

        Args:
            import_timestamps (list): start and ending timestamps
            ticker (str): coinpair ticker
            interval (str): candle bars interval
            limit (int, optional): candle bars limit. Defaults to 1000.

        Returns:
            pd.DataFrame: historical data for selected ticker
        """


        dfs = []
        df_out = pd.DataFrame()

        number_of_requests = 0

        for start_str, end_str in self._import_timestamps:

            print("Getting data from ", start_str, " to , ", end_str)

            try:                
                bars = self._client.get_historical_klines(symbol = ticker, 
                                                    interval = interval,
                                                    start_str = str(start_str), 
                                                    end_str = str(end_str), 
                                                    limit = limit)
            except Exception as e:

                print(" Data could not be retrieved. ")
                print(" Error", e)
                print(" No more data will be imported. Closing Loop")
                break
                
            
            # TODO: Check if all data is in same timezone
            df = pd.DataFrame(bars)
            df["Date"] = pd.to_datetime(df.iloc[:,0], unit = "ms")
            df.columns = ["Open Time", "Open", "High", "Low", "Close", "Volume",
                            "Clos Time", "Quote Asset Volume", "Number of Trades",
                            "Taker Buy Base Asset Volume", "Taker Buy Quote Asset Volume", "Ignore", "Date"]

            
            dfs.append(df)

            number_of_requests += 1
            
            sleep_time_sec = np.random.randint(8,15)
            print("Waiting ", sleep_time_sec, " Sec...")

            # After 10 requests wait 60 sec for
            # next request
            if number_of_requests == 10:

                print(" Request number reached 10, waiting for 1 minute")

                sleep_time_sec = 60
                number_of_requests = 0
            
            time.sleep(sleep_time_sec)

        
        if dfs:

            #Concatenate all single results
            df_out = pd.concat(dfs, ignore_index = True)

            # Generate year Column
            df_out.loc[:,'Year'] = df_out['Date'].dt.year

            # Generate Ticker column
            df_out.loc[:,'Ticker'] = ticker

        return df_out

    def store_historical_as_parquet(self, df, ticker):
        """Store data as parquet.

        Args:
            df (pd.DataFrame): historical candle bar data
            ticker (str): coinpair ticker
        """

        ticker_dir = os.path.join(self.storage_folder, ticker)

        if not os.path.exists(ticker_dir):
            os.mkdir(ticker_dir)

        # Generate year Column
        df.loc[:,'Year'] = df['Date'].dt.year

        # Generate Ticker column
        df.loc[:,'Ticker'] = ticker

        for year in df['Year'].unique():

            file_name = f"{year}_{ticker}.parquet"

            file_storage_location = os.path.join(ticker_dir, file_name)

            df[df['Year'] == year].to_parquet(file_storage_location)

            print("File Store at ", file_storage_location)

    def store_update_as_parquet(self, df, ticker, year):
        """_summary_

        Args:
            df (_type_): _description_
            ticker (_type_): _description_
            year (_type_): _description_
        """

        ticker_dir = os.path.join(self.storage_folder, ticker)
        file_name = f"{year}_{ticker}.parquet"
        file_storage_location = os.path.join(ticker_dir, file_name)

        df.to_parquet(file_storage_location)

        print("File Store at ", file_storage_location)

        pass

    def select_latest_ticker_file(self, ticker):
        """Get storage location for most recent file
        given a ticker

        Args:
            ticker (str): coinpair ticker

        Returns:
            str: storage location for most recent file
        """

        ticker_dir = os.path.join(self.storage_folder, ticker)

        latest_update_year = max([x[:4] for x in os.listdir(ticker_dir)])

        lastest_file = os.path.join(self.storage_folder, 
                                ticker, 
                                f"{latest_update_year}_{ticker}.parquet")

        return lastest_file



    def read_latest_file(self, ticker):
        """Get most recent data for a ticker

        Args:
            ticker (str): coinpair ticker

        Returns:
            pd.DataFrame: data frame with historical data
        """

        lastest_file  = self.select_latest_ticker_file(ticker)

        _df = pd.read_parquet(lastest_file)

        return _df
    

In [62]:
load_size_days = 5
start_time_stamp = datetime(2022, 3, 20, 0, 0, 0)
start_time_stamp = None
end_timestamp = datetime.utcnow()
interval = Client.KLINE_INTERVAL_1MINUTE
storage_folder = '/media/john/Data/Tensor_Invest_Fund/data/Cryptos'
storage_folder = '/mnt/Data/Tensor_Invest_Fund/data/Cryptos/'
verbose = 1

SYMBOLS = ['BNBUSDT', 'BNBBTC', 'BTCUSDT', 'EOSUSDT', 'ETCUSDT',
       'LTCUSDT', 'XMRBTC', 'TRXUSDT', 'XLMUSDT', 'ADAUSDT', 'IOTAUSDT',
       'MKRUSDT', 'DOGEUSDT','ETHUSDT']

symbols = ['DOGEUSDT']

new_ETL = ETL_Binance(symbols, 
                     storage_folder,
                    load_size_days, 
                    end_timestamp, 
                    start_time_stamp, 
                    total_days = None)

new_ETL.connect_API(config)

Sucessful connection to Binance API


In [57]:
new_ETL.download_historical_data(interval, verbose = verbose)

In [63]:
new_ETL.update_data(interval, verbose = 1)

Updating data for ticker DOGEUSDT
Latest update at 2022-03-21 18:37:00
 Updating data between 2022-03-21 18:37:00  and  2022-03-22 06:17:58.551664
Getting data from  2022-03-21 18:37:00  to ,  2022-03-22 06:17:58.551664
Waiting  8  Sec...
File Store at  /mnt/Data/Tensor_Invest_Fund/data/Cryptos/DOGEUSDT/2022_DOGEUSDT.parquet
Sucessfully data update for DOGEUSDT New Update at 2022-03-22 06:17:00




In [18]:
ticker = "BNBBTC"
start_str = "2022-02-24 00:00:00"
end_str = "2022-03-01 00:00:00"
limit = 1000


bars = client.get_historical_klines(
                                symbol = ticker, 
                                interval = interval,
                                start_str = str(start_str), 
                                end_str = str(end_str), 
                                limit = limit)

In [None]:
import_timestamps = new_ETL.create_import_timestamps_tuples()

In [155]:
new_ETL.symbols

['BNBUSDT',
 'BTCUSDT',
 'EOSUSDT',
 'ETCUSDT',
 'LTCUSDT',
 'XMRBTC',
 'TRXUSDT',
 'XLMUSDT',
 'ADAUSDT',
 'IOTAUSDT',
 'MKRUSDT',
 'DOGEUSDT']

In [146]:
import_timestamps

[(datetime.datetime(2022, 3, 3, 22, 53, 28),
  datetime.datetime(2022, 3, 8, 22, 53, 28)),
 (datetime.datetime(2022, 3, 8, 22, 53, 28),
  datetime.datetime(2022, 3, 13, 22, 53, 28)),
 (datetime.datetime(2022, 3, 13, 22, 53, 28),
  datetime.datetime(2022, 3, 18, 22, 53, 28)),
 (datetime.datetime(2022, 3, 18, 22, 53, 28),
  datetime.datetime(2022, 3, 21, 17, 12, 0, 375642))]

# 1. Historical Data Retrieval Process

In [14]:
path = os.path.join( storage_folder, "BTCUSDT", "2022_BTCUSDT.parquet")

In [15]:
t = pd.read_parquet(path).head()

In [16]:
t['Date'].max()

Timestamp('2022-03-08 22:58:00')

In [7]:
df_portfolio['Ticker'].unique()

array(['BCHUSDT', 'BNBUSDT', 'BTCUSDT', 'EOSUSDT', 'ETCUSDT', 'ETHUSDT',
       'LTCUSDT', 'XMRBTC', 'TRXUSDT', 'XLMUSDT', 'ADAUSDT', 'IOTAUSDT',
       'MKRUSDT', 'DOGEUSDT'], dtype=object)

In [156]:


total_days = 400
load_size_days = 5
end_timestamp = datetime.utcnow()


In [9]:
import_timestamps = create_import_timestamps_tuples(load_size_days, end_timestamp, total_days )

In [135]:
for symbol in SYMBOLS:

    print("Getting historical data for ", symbol)

    _df = get_historical_data_binance_api(import_timestamps, symbol, interval, limit = 1000 )

    store_historical_candle_data_as_parquet(_df, storage_folder, symbol)

    print("  Waiting 120 sec to start data retrieval for next coin")

    time.sleep(120)

Getting historical data for  BNBUSDT
Getting data from  2022-03-08 22:53:28.076502  to ,  2022-03-13 22:53:28.076502
Waiting  12  Sec...
Getting data from  2022-03-03 22:53:28.076502  to ,  2022-03-08 22:53:28.076502
Waiting  9  Sec...
Getting data from  2022-02-26 22:53:28.076502  to ,  2022-03-03 22:53:28.076502
Waiting  13  Sec...
Getting data from  2022-02-21 22:53:28.076502  to ,  2022-02-26 22:53:28.076502
Waiting  13  Sec...
Getting data from  2022-02-16 22:53:28.076502  to ,  2022-02-21 22:53:28.076502
Waiting  8  Sec...
Getting data from  2022-02-11 22:53:28.076502  to ,  2022-02-16 22:53:28.076502
Waiting  9  Sec...
Getting data from  2022-02-06 22:53:28.076502  to ,  2022-02-11 22:53:28.076502
Waiting  12  Sec...
Getting data from  2022-02-01 22:53:28.076502  to ,  2022-02-06 22:53:28.076502
Waiting  9  Sec...
Getting data from  2022-01-27 22:53:28.076502  to ,  2022-02-01 22:53:28.076502
Waiting  14  Sec...
Getting data from  2022-01-22 22:53:28.076502  to ,  2022-01-27 22:

# 2. Additional Data Retrieval Functions

In [None]:
# place a test market buy order, to place an actual order use the create_order function
# order = client.create_test_order(
#     symbol='BNBBTC',
#    side=Client.SIDE_BUY,
#    type=Client.ORDER_TYPE_MARKET,
#    quantity=100)

In [None]:
# get market depth
depth = client.get_order_book(symbol='BNBBTC')

In [None]:
#days = 5
#now = datetime.utcnow()
#past_str = str(now - timedelta(days = days))
#end_str = str(now)
#limit = 1000

df = df[["Date", "Open", "High", "Low", "Close", "Volume"]].copy()
df.set_index("Date", inplace = True)
for column in df.columns:
    df[column] = pd.to_numeric(df[column], errors = "coerce")
df["Complete"] = [True for row in range(len(df)-1)] + [False]

In [None]:
# withdraw 100 ETH
# check docs for assumptions around withdrawals
from binance.exceptions import BinanceAPIException
try:
    result = client.withdraw(
        asset='ETH',
        address='<eth_address>',
        amount=100)
except BinanceAPIException as e:
    print(e)
else:
    print("Success")

# fetch list of withdrawals
withdraws = client.get_withdraw_history()

# fetch list of ETH withdrawals
eth_withdraws = client.get_withdraw_history(coin='ETH')

# get a deposit address for BTC
address = client.get_deposit_address(coin='BTC')

In [None]:

# fetch 30 minute klines for the last month of 2017
klines = client.get_historical_klines("ETHBTC", Client.KLINE_INTERVAL_30MINUTE, "1 Dec, 2017", "1 Jan, 2018")

# fetch weekly klines since it listed
klines = client.get_historical_klines("NEOBTC", Client.KLINE_INTERVAL_1WEEK, "1 Jan, 2017")

# socket manager using threads
twm = ThreadedWebsocketManager()
twm.start()

# depth cache manager using threads
dcm = ThreadedDepthCacheManager()
dcm.start()

def handle_socket_message(msg):
    print(f"message type: {msg['e']}")
    print(msg)

def handle_dcm_message(depth_cache):
    print(f"symbol {depth_cache.symbol}")
    print("top 5 bids")
    print(depth_cache.get_bids()[:5])
    print("top 5 asks")
    print(depth_cache.get_asks()[:5])
    print("last update time {}".format(depth_cache.update_time))

twm.start_kline_socket(callback=handle_socket_message, symbol='BNBBTC')

dcm.start_depth_cache(callback=handle_dcm_message, symbol='ETHBTC')

# replace with a current options symbol
options_symbol = 'BTC-210430-36000-C'
dcm.start_options_depth_cache(callback=handle_dcm_message, symbol=options_symbol)

# join the threaded managers to the main thread
twm.join()
dcm.join()