In [1]:
import concurrent.futures
import json
import logging
import math
import os.path
import time
from os import listdir

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from binance.client import Client
from binance.exceptions import BinanceAPIException, BinanceRequestException

# Prepare some helper functions and constants to explore the data.

In [2]:
# Folder, relative to the working directory, where the parquet files are stored.
PARQUETFOLDERNAME  = "dataset"

# One (column name, Arrow type) pair per value of a kline row, in positional order.
# Keeping name and type side by side prevents the two lists from drifting apart.
_kline_columns = [
    ("open_time", pa.timestamp("ms")),
    ("open", pa.float32()),
    ("high", pa.float32()),
    ("low", pa.float32()),
    ("close", pa.float32()),
    ("volume", pa.float32()),
    ("close_time", pa.timestamp("ms")),
    ("quote_asset_volume", pa.float32()),
    ("number_of_trades", pa.uint16()),
    ("taker_buy_base_asset_volume", pa.float32()),
    ("taker_buy_quote_asset_volume", pa.float32()),
    ("ignore", pa.float32()),
]

# Parallel views of the pairs above; later cells index these by position.
pa_type = [arrow_type for _name, arrow_type in _kline_columns]
pa_names = [name for name, _arrow_type in _kline_columns]

pa_field = [pa.field(name, arrow_type) for name, arrow_type in _kline_columns]
pa_schema = pa.schema(pa_field)

In [3]:
pa_schema

open_time: timestamp[ms]
open: float
high: float
low: float
close: float
volume: float
close_time: timestamp[ms]
quote_asset_volume: float
number_of_trades: uint16
taker_buy_base_asset_volume: float
taker_buy_quote_asset_volume: float
ignore: float

In [4]:
def _get_klines(self, sym, start, end=None):
    """Fetch up to 1000 one-minute klines for ``sym`` and return them as a pyarrow table.

    NOTE(review): this function takes ``self`` although it is defined at the
    top level of the notebook; presumably it was copied out of a class that
    provides the attributes listed below. Confirm before calling it.

    Args:
        self: Object exposing ``binance_client`` (its ``get_klines`` is called),
            ``LABELS`` (column names for the DataFrame) and ``DATATYPES``
            (passed to ``DataFrame.astype``).
        sym: Symbol string, forwarded to the API as ``symbol``.
        start: Forwarded to the API as ``startTime``.
        end: Optional; forwarded as ``endTime`` when not ``None``.

    Returns:
        A ``pyarrow.Table`` built from the response, without the
        ``close_time`` and ``ignore`` columns.
    """
    request = {
        "symbol": sym,
        "interval": Client.KLINE_INTERVAL_1MINUTE,
        "limit": 1000,
        "startTime": start,
    }
    # Only send endTime when the caller supplied one, so the default request
    # is exactly what it was before this parameter was honoured.
    if end is not None:
        request["endTime"] = end
    response = self.binance_client.get_klines(**request)

    updatedPart = pd.DataFrame(response, columns=self.LABELS)
    updatedPart = updatedPart.astype(self.DATATYPES)
    # Drop useless columns (non-mutating form instead of inplace=True).
    updatedPart = updatedPart.drop(columns=["close_time", "ignore"])
    # Convert to a pyarrow table and return.
    return pa.Table.from_pandas(updatedPart)


def transpose_list(ll):
    """Transpose a list of lists: a list of rows becomes a list of columns.

    Rows of unequal length are truncated to the shortest row, because the
    columns are produced by ``zip``. An empty input yields an empty list.
    """
    columns = []
    for column in zip(*ll):
        columns.append(list(column))
    return columns

def get_saved_parquet(sym_pair):
    """Return the saved parquet table for ``sym_pair``, creating an empty file if needed.

    Args:
        sym_pair: Two-element sequence; the file name is built as
            ``"<sym_pair[0]>-<sym_pair[1]>.parquet"`` inside ``PARQUETFOLDERNAME``.

    Returns:
        The ``pyarrow.Table`` read from disk. If the file is missing or cannot
        be read, an empty table is written to that path and returned instead.
    """
    filename = f"{sym_pair[0]}-{sym_pair[1]}.parquet"
    filepath = f"./{PARQUETFOLDERNAME}/{filename}"
    logging.info(f"Reading in parquet file: {filepath}")
    if os.path.exists(filepath):
        try:
            return pq.read_table(filepath)
        except (pa.ArrowException, OSError) as err:
            # Unable to read file, probably corrupted. Delete it and fall
            # through to re-create an empty one. Only read errors are caught,
            # so unrelated failures are no longer hidden.
            logging.warning(f"Unable to read {filepath} ({err}); deleting it.")
            os.remove(filepath)
    # Create an empty table with the notebook schema minus the two columns
    # that the notebook drops before saving, so it can be concatenated with
    # tables written by write_parquet.
    kept_fields = [
        field for field in pa_schema if field.name not in ("close_time", "ignore")
    ]
    tb = pa.schema(kept_fields).empty_table()
    # Make sure the target folder exists before the first write.
    os.makedirs(f"./{PARQUETFOLDERNAME}", exist_ok=True)
    write_parquet(sym_pair, tb, timestamp="ms")
    return tb


def write_parquet(sym_pair, pa_tb, append="", timestamp=None):
    """Save a pyarrow table to disk as a parquet file.

    Args:
        sym_pair: Two-element sequence; the file name is built as
            ``"<sym_pair[0]>-<sym_pair[1]><append>.parquet"``.
        pa_tb: Table passed to ``pq.write_table``.
        append: Optional suffix inserted before the ``.parquet`` extension.
        timestamp: Forwarded to ``pq.write_table`` as ``coerce_timestamps``.
    """
    filename = f"{sym_pair[0]}-{sym_pair[1]}{append}.parquet"
    logging.info("Saving as: " + filename)
    folder = f"./{PARQUETFOLDERNAME}"
    # A fresh checkout has no output folder yet; create it instead of failing.
    os.makedirs(folder, exist_ok=True)
    filepath = f"{folder}/{filename}"
    pq.write_table(pa_tb, filepath, coerce_timestamps=timestamp)

# Set up the data and client for the AAVE/BKRW pair

In [5]:
# Pair used for the rest of the notebook: as a tuple for file naming, and
# joined into one string for the API call.
sym_p = ("AAVE", "BKRW")
sym = "".join(sym_p)
# 631152000000 ms after the Unix epoch is 1990-01-01 00:00:00 UTC. The first
# candle returned further down is from 2020-10-15, so this start simply asks
# for the earliest data available.
start = 631_152_000_000
# No API key or secret is passed to the client.
binance_client = Client()

# Make a call to Binance using the python-binance package

In [6]:
# Request at most 1000 one-minute candles for `sym`, beginning at `start`.
kline_request = {
    "symbol": sym,
    "interval": Client.KLINE_INTERVAL_1MINUTE,
    "limit": 1000,
    "startTime": start,
}
response = binance_client.get_klines(**kline_request)

# Data
The data comes in as a list of rows. Each row is itself a list whose values correspond, by position, to the column labels in `pa_names`.

One row of data

In [7]:
response[0:1]

[[1602730800000,
  '57946.00000000',
  '66002.00000000',
  '57946.00000000',
  '66002.00000000',
  '2.05500000',
  1602730859999,
  '135472.99000000',
  2,
  '2.05500000',
  '135472.99000000',
  '0']]

The list of rows needs to be transposed into a list of columns, which can then be used to create a PyArrow table.

In [8]:
list_of_cols = transpose_list(response)

The same row of data, now spread across the columns (the first value of each column)

In [9]:
[col[0:1] for col in list_of_cols]

[[1602730800000],
 ['57946.00000000'],
 ['66002.00000000'],
 ['57946.00000000'],
 ['66002.00000000'],
 ['2.05500000'],
 [1602730859999],
 ['135472.99000000'],
 [2],
 ['2.05500000'],
 ['135472.99000000'],
 ['0']]

Take each column and create a PyArrow array from it, then build a table from those arrays. The price and volume values arrive as strings and need to be converted to numbers; converting every value to float first works for all columns.

In [10]:
# Build one Arrow array per schema column. Every value goes through float()
# first (the API delivers most of them as strings), then is cast to the
# column's Arrow type.
pa_cols = [
    pa.array([float(value) for value in list_of_cols[idx]], col_type)
    for idx, col_type in enumerate(pa_type)
]

In [11]:
pa_table = pa.table(pa_cols, schema=pa_schema)

In [12]:
# Drop the columns that are not needed. Only the ones still present are
# dropped, so re-running this cell is a no-op rather than an error, and no
# unrelated exception is silently swallowed.
cols_to_drop = [
    name for name in ("close_time", "ignore") if name in pa_table.column_names
]
if cols_to_drop:
    pa_table = pa_table.drop(cols_to_drop)

In [13]:
pa_table

pyarrow.Table
open_time: timestamp[ms]
open: float
high: float
low: float
close: float
volume: float
quote_asset_volume: float
number_of_trades: uint16
taker_buy_base_asset_volume: float
taker_buy_quote_asset_volume: float
----
open_time: [[2020-10-15 03:00:00.000,2020-10-15 03:01:00.000,2020-10-15 03:02:00.000,2020-10-15 03:03:00.000,2020-10-15 03:04:00.000,2020-10-15 03:05:00.000,2020-10-15 03:06:00.000,2020-10-15 03:07:00.000,2020-10-15 03:08:00.000,2020-10-15 03:09:00.000,...,2020-10-15 19:30:00.000,2020-10-15 19:31:00.000,2020-10-15 19:32:00.000,2020-10-15 19:33:00.000,2020-10-15 19:34:00.000,2020-10-15 19:35:00.000,2020-10-15 19:36:00.000,2020-10-15 19:37:00.000,2020-10-15 19:38:00.000,2020-10-15 19:39:00.000]]
open: [[57946,299997,299997,54110,21109,59215,59215,59215,59215,52917,...,55047,55047,55047,55047,55047,55047,55047,55047,55047,55047]]
high: [[66002,299997,299997,54110,21109,59215,59215,59215,59215,52917,...,55047,55047,55047,55047,55047,55047,55047,55047,55047,55047]]
l

# The table can then be concatenated with existing data read in from a file, and used to create graphs and charts.

In [14]:
write_parquet(sym_p, pa_table)

In [15]:
read_table = get_saved_parquet(sym_p)

In [16]:
read_table

pyarrow.Table
open_time: timestamp[ms]
open: float
high: float
low: float
close: float
volume: float
quote_asset_volume: float
number_of_trades: uint16
taker_buy_base_asset_volume: float
taker_buy_quote_asset_volume: float
----
open_time: [[2020-10-15 03:00:00.000,2020-10-15 03:01:00.000,2020-10-15 03:02:00.000,2020-10-15 03:03:00.000,2020-10-15 03:04:00.000,2020-10-15 03:05:00.000,2020-10-15 03:06:00.000,2020-10-15 03:07:00.000,2020-10-15 03:08:00.000,2020-10-15 03:09:00.000,...,2020-10-15 19:30:00.000,2020-10-15 19:31:00.000,2020-10-15 19:32:00.000,2020-10-15 19:33:00.000,2020-10-15 19:34:00.000,2020-10-15 19:35:00.000,2020-10-15 19:36:00.000,2020-10-15 19:37:00.000,2020-10-15 19:38:00.000,2020-10-15 19:39:00.000]]
open: [[57946,299997,299997,54110,21109,59215,59215,59215,59215,52917,...,55047,55047,55047,55047,55047,55047,55047,55047,55047,55047]]
high: [[66002,299997,299997,54110,21109,59215,59215,59215,59215,52917,...,55047,55047,55047,55047,55047,55047,55047,55047,55047,55047]]
l

# Conclusion

In total, the data for about 1800 symbol pairs comes to roughly 30 to 40 GB of parquet files. The same data could take two to three times as much space if stored in some other format.

The data is easily managed and split by symbol pair, with every file sharing the same kind of metadata, which can also be easily modified or cast into a different form. There is little need to normalize the data.

Other type of data like twitter mentions or types of trending data can be added in the future but going through twitter or google data might