In [1]:
import os
import sys
import ccxt
import pandas as pd
import csv
from datetime import datetime
import time
# import pytz
import json
import numpy as np 
from pymongo import MongoClient
from bson.objectid import ObjectId
from pprint import pprint

import schedule


In [2]:
mongo_client = MongoClient()

In [3]:
def retry_fetch_ohlcv(exchange_id, max_retries, symbol, timeframe, since, limit):
    exchange = getattr(ccxt, exchange_id)({
        'enableRateLimit': True,  # required by the Manual
    })
    num_retries = 0
    try:
        num_retries += 1
        ohlcv = exchange.fetch_ohlcv(symbol, timeframe, since, limit)
        # print('Fetched', len(ohlcv), symbol, 'candles from', exchange.iso8601 (ohlcv[0][0]), 'to', exchange.iso8601 (ohlcv[-1][0]))
        return ohlcv
    except Exception:
        if num_retries > max_retries:
            raise  # Exception('Failed to fetch', timeframe, symbol, 'OHLCV in', max_retries, 'attempts')


def scrape_ohlcv(exchange_id, max_retries, symbol, timeframe, since, limit):
    exchange = getattr(ccxt, exchange_id)({
        'enableRateLimit': True,  # required by the Manual
    })
    timeframe_duration_in_seconds = exchange.parse_timeframe(timeframe)
    timeframe_duration_in_ms = timeframe_duration_in_seconds * 1000
    timedelta = limit * timeframe_duration_in_ms
    now = exchange.milliseconds()
    all_ohlcv = []
    fetch_since = since
    while fetch_since < now:
        ohlcv = retry_fetch_ohlcv(exchange_id, max_retries, symbol, timeframe, fetch_since, limit)
        fetch_since = (ohlcv[-1][0] + 1) if len(ohlcv) else (fetch_since + timedelta)
        all_ohlcv = all_ohlcv + ohlcv
        if len(all_ohlcv):
            print(len(all_ohlcv), 'candles in total from', exchange.iso8601(all_ohlcv[0][0]), 'to', exchange.iso8601(all_ohlcv[-1][0]))
        else:
            print(len(all_ohlcv), 'candles in total from', exchange.iso8601(fetch_since))
    return exchange.filter_by_since_limit(all_ohlcv, since, None, key=0)

def scrape_candles_to_csv(filename, exchange_id, max_retries, symbol, timeframe, since, limit):
    # instantiate the exchange by id
    exchange = getattr(ccxt, exchange_id)({
        'enableRateLimit': True,  # required by the Manual
    })
    # convert since from string to milliseconds integer if needed
    if isinstance(since, str):
        since = exchange.parse8601(since)
    # preload all markets from the exchange
    exchange.load_markets()
    # fetch all candles
    ohlcv = scrape_ohlcv(exchange, max_retries, symbol, timeframe, since, limit)
    # save them to csv file
    write_to_csv(filename, ohlcv)
    print('Saved', len(ohlcv), f'candles for {exchange_id}, {symbol}, {timeframe} from', exchange.iso8601(ohlcv[0][0]), 'to', exchange.iso8601(ohlcv[-1][0]), 'to', filename)


def scrape_candles_to_db(exchange_id, symbol, timeframe, since, to=None, max_retries=3, limit=100):
    # instantiate the exchange by id
    exchange = getattr(ccxt, exchange_id)({
        'enableRateLimit': True,  # required by the Manual
    })
    # convert since from string to milliseconds integer if needed
    if isinstance(since, str):
        since = exchange.parse8601(since)
    # preload all markets from the exchange
    exchange.load_markets()
    # fetch all candles
    ohlcv = scrape_ohlcv(exchange_id, max_retries, symbol, timeframe, since, limit)
    # ohlcv = ohlcv[0:-1]
    if len(ohlcv) > 0:
        df = pd.DataFrame(ohlcv)
        df.columns = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
        df['date'] = pd.to_datetime(df['timestamp'], unit='ms', utc=False)
        df = df[['timestamp', 'date', 'open', 'high', 'low', 'close', 'volume']]

        if not to is None:
            df = df[df.timestamp <= to]

        db = mongo_client[exchange_id]
        collection = db[f'{symbol}-{timeframe}']
        collection.delete_many({'timestamp': {'$gte': since}})
        collection.insert_many(df.to_dict("records"))

    print(f'Saved to DB {len(ohlcv)} candles for {exchange_id}, {symbol}, {timeframe}, {exchange.iso8601(ohlcv[0][0])} to {exchange.iso8601(ohlcv[-1][0])}')


In [4]:
tframe2msec = {
  '1m': 1000 * 60 * 1,
  '5m': 1000 * 60 * 5,
  '15m': 1000 * 60 * 15,
  '30m': 1000 * 60 * 30,
  '1h': 1000 * 60 * 60,
  '4h': 1000 * 60 * 60 * 4,
  '8h': 1000 * 60 * 60 * 8,
  '1d': 1000 * 60 * 60 * 24,
}

def get_now_btimestamp(timeframe):
  delta = tframe2msec[timeframe]
  now_stamp = int(datetime.now().timestamp() * 1000)
  now_bstamp = divmod(now_stamp, delta)[0] * delta
  return now_bstamp


def get_latest_to_db(exchange_id, symbol, timeframe):
  db = mongo_client[exchange_id]
  collection_name = f'{symbol}-{timeframe}'
  delta = tframe2msec[timeframe]
  now_bstamp = get_now_btimestamp(timeframe) 
  collection = db[collection_name]
  res = list(collection.find({'timestamp': {'$lt' : now_bstamp}}).sort([('timestamp', -1)]))
  since = now_bstamp
  if (len(res) == 0):
    since = now_bstamp - 5 * delta
  elif (res[0]['timestamp'] < now_bstamp - delta):
    since = res[0]['timestamp'] - delta

  if (since < now_bstamp):
    print('Scraping to db:', exchange_id, symbol, timeframe)
    scrape_candles_to_db(exchange_id, symbol, timeframe, since, to=now_bstamp-delta)


In [5]:

exchange_id = 'kucoinfutures'
symbol = 'ADAUSDTM'
timeframe = '5m'

get_latest_to_db(exchange_id, symbol, timeframe)

In [None]:
##################################
# update latest candles from exchange
exchange_ids = ['kucoinfutures']
symbols = ['XBTUSDTM', 'DOTUSDTM', 'ADAUSDTM', 'ALGOUSDTM', 'SOLUSDTM']
timeframes = ['5m', '1h', '8h', '1d']

def check_N_update():
  for exchange_id in exchange_ids:
    for symbol in symbols:
      for timeframe in timeframes:
        get_latest_to_db(exchange_id, symbol, timeframe)

schedule.every(5).seconds.do(check_N_update)
    
while True:
    schedule.run_pending()
    time.sleep(10)




In [6]:
import imp
dyn_updater_module_name = 'kline_dyn_updater'
updmodule = imp.new_module(dyn_updater_module_name)
# updmodule
sys.modules[dyn_updater_module_name] = updmodule
# sys.modules.setdefault('kline_dyn_updater', updmodule)

import kline_dyn_updater

kline_dyn_updater
     

<module 'kline_dyn_updater'>

In [7]:
import logging

logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.ERROR)


from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor


jobstores = {
    'mongo': MongoDBJobStore(),
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': ThreadPoolExecutor(20),
    # 'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 5
}


def get_latest_to_db_factory(exchange_id, symbol, timeframe):
  def f():
    get_latest_to_db(exchange_id, symbol, timeframe)
  return f


In [8]:

exchange_ids = ['kucoinfutures']
symbols = ['XBTUSDTM', 'DOTUSDTM', 'ADAUSDTM', 'ALGOUSDTM', 'SOLUSDTM']
timeframes = ['5m', '1h', '4h', '8h', '1d']


logging.getLogger('apscheduler').setLevel(logging.ERROR)

apscheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

for exchange_id in exchange_ids:
  for symbol in symbols:
    for timeframe in timeframes:
      method_name = f'{exchange_id}_{symbol}_{timeframe}'
      # print(f'->{method_name}')
      setattr(kline_dyn_updater, method_name, get_latest_to_db_factory(exchange_id, symbol, timeframe))
      apscheduler.add_job(f'kline_dyn_updater:{method_name}', 'interval', seconds=5, id=method_name)
      

if os.path.exists("jobs.sqlite"): os.remove("jobs.sqlite")
apscheduler.start()
# scheduler.shutdown()


ConflictingIdError: 'Job identifier (kucoinfutures_XBTUSDTM_5m) conflicts with an existing job'

In [None]:

logging.getLogger('apscheduler').setLevel(logging.INFO)
apscheduler.start()

In [None]:
# apscheduler.shutdown()
# apscheduler.shutdown(wait=False)

apscheduler.remove_all_jobs()
if os.path.exists("jobs.sqlite"): os.remove("jobs.sqlite")

In [None]:
# apscheduler.remove_all_jobs()
# apscheduler.get_jobs()
# for job in apscheduler.get_jobs(): 
