In [1]:
import rx
from rx import operators as ops
from urllib3 import Retry
from datetime import datetime
from influxdb_client import InfluxDBClient, Point, WritePrecision, WriteOptions
from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS
from collections import OrderedDict
from csv import DictReader
from os import getcwd, listdir
from os.path import join, exists, isfile, isdir, abspath, pardir
from time import time

#### Connection Configuration(s)

In [2]:
token = "RtWgHEkMiipthr5IkT1RpRed4ZXRH-9Q8YuEzZ3pdkjgelVA83D1SZ5tF9V_NY3u7W5-lAl-GuVUC3n3mS1cCQ=="
org = "ulb"
bucket = "advanced_db"
port=8086
url="http://localhost:{}".format(port)

In [3]:
parent_dir = abspath(join(getcwd(), pardir))
data_dir = join(parent_dir, "data")
book_dir = join(data_dir, "data_books")
trade_dir = join(data_dir, "data_trades")

In [5]:
# days = [1, 3, 7, 14, 30, 90]
days = [1, 2, 3, 4, 5, 6, 7]
trades_btc_files = sorted([join(trade_dir, x) for x in listdir(trade_dir) if isfile(join(trade_dir, x)) and str(x).__contains__("btc") and len(str(x).split("_")) > 2 and int(str(x).split("_")[2]) in days])
trades_eth_files = sorted([join(trade_dir, x) for x in listdir(trade_dir) if isfile(join(trade_dir, x)) and str(x).__contains__("eth") and len(str(x).split("_")) > 2 and int(str(x).split("_")[2]) in days])
orderbook_btc_files = sorted([join(book_dir, x) for x in listdir(book_dir) if isfile(join(book_dir, x)) and str(x).__contains__("btc") and len(str(x).split("_")) > 2 and int(str(x).split("_")[2]) in days])

##### Helper function for parsing data

In [6]:
def parse_order_book_row(row: OrderedDict):
    return Point("order_book").tag("symbol", "{}".format(row['symbol'])) \
        .field("ask_price_5", float(row['ask_price_5'])) \
        .field("ask_volume_5", float(row['ask_volume_5'])) \
        .field("ask_price_4", float(row['ask_price_4'])) \
        .field("ask_volume_4", float(row['ask_volume_4'])) \
        .field("ask_price_3", float(row['ask_price_3'])) \
        .field("ask_volume_3", float(row['ask_volume_3'])) \
        .field("ask_price_2", float(row['ask_price_2'])) \
        .field("ask_volume_2", float(row['ask_volume_2'])) \
        .field("ask_price_1", float(row['ask_price_1'])) \
        .field("ask_volume_1", float(row['ask_volume_1'])) \
        .field("bid_price_1", float(row['bid_price_1'])) \
        .field("bid_volume_1", float(row['bid_volume_1'])) \
        .field("bid_price_2", float(row['bid_price_2'])) \
        .field("bid_volume_2", float(row['bid_volume_2'])) \
        .field("bid_price_3", float(row['bid_price_3'])) \
        .field("bid_volume_3", float(row['bid_volume_3'])) \
        .field("bid_price_4", float(row['bid_price_4'])) \
        .field("bid_volume_4", float(row['bid_volume_4'])) \
        .field("bid_price_5", float(row['bid_price_5'])) \
        .field("bid_volume_5", float(row['bid_volume_5'])) \
        .time(datetime.fromtimestamp(float(row['time_stamp'])))

def parse_trade_row(row: OrderedDict):
    return Point("trades").tag("symbol", "{}".format(row['symbol'])) \
        .field("side", 1 if row['side'] == 'buy' else -1) \
        .field("price", float(row['price'])) \
        .field("amount", float(row['amount'])) \
        .time(datetime.fromtimestamp(float(row['time_stamp'])))

##### Helper function to load the data in InfluxDB

In [7]:
def load_data_influxdb(files, parse_data_func):
    for file in files:
        data = list(map(parse_data_func, DictReader(open(file, 'r'))))
        with InfluxDBClient(url=url, token=token, org=org) as client:
            with client.write_api(write_options=ASYNCHRONOUS) as write_api:
                start_time = time()
                write_api.write(bucket=bucket, record=data)
                print("Took {} seconds to load {} records in {}".format( time() - start_time, len(data), file))

##### Helper function to remove the bucket

In [9]:
def remove_bucket(bucket_name):
    with InfluxDBClient(url=url, token=token, org=org) as client:
        buckets_api = client.buckets_api()
        start_time = time()
        buckets_api.delete_bucket(bucket_name)
        print("Took {} seconds to delete '{}' bucket".format( time() - start_time, bucket_name ))

##### Load LOB data into InfluxDB

In [8]:
load_data_influxdb(orderbook_btc_files, parse_order_book_row)

Took 12.782481908798218 seconds to load 345622 records in /Users/mohammadzainabbas/Masters/ULB/Advanced-Databases/advanced-database-project/data/data_books/orderbook_btc_1_day.csv
Took 25.623060941696167 seconds to load 691244 records in /Users/mohammadzainabbas/Masters/ULB/Advanced-Databases/advanced-database-project/data/data_books/orderbook_btc_2_day.csv
Took 38.982547998428345 seconds to load 1036863 records in /Users/mohammadzainabbas/Masters/ULB/Advanced-Databases/advanced-database-project/data/data_books/orderbook_btc_3_day.csv
Took 52.094724893569946 seconds to load 1382485 records in /Users/mohammadzainabbas/Masters/ULB/Advanced-Databases/advanced-database-project/data/data_books/orderbook_btc_4_day.csv
Took 65.81292605400085 seconds to load 1728107 records in /Users/mohammadzainabbas/Masters/ULB/Advanced-Databases/advanced-database-project/data/data_books/orderbook_btc_5_day.csv
Took 80.34209108352661 seconds to load 2073730 records in /Users/mohammadzainabbas/Masters/ULB/Adv

In [None]:
for day in days:
    