@@ -14,11 +14,10 @@
 import logging.handlers
 import time
-from sqlalchemy import Integer, String, Float
 import daemon
 import daemon.pidfile
-from coincharts import schema, config
+from coincharts import config, db
 # We're replacing the `config` module with a dict so that importing this file doesn't trigger reads from disk, etc.
 config = config.get_config()
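A note on the module-shadowing idiom above: after the assignment, the name `config` is a plain dict, so later lookups like `config['history_symbols']` never touch the filesystem again. A minimal sketch of what `coincharts.config.get_config` presumably does (file name, format, and caching are assumptions, not shown in this diff):

```python
# Hypothetical coincharts/config.py accessor, for illustration only.
import json
import os

_cache = None

def get_config(path=os.path.expanduser('~/.coincharts.json')):
    """Read the config file once, then serve the cached dict."""
    global _cache
    if _cache is None:
        with open(path) as f:
            _cache = json.load(f)  # e.g. {"history_symbols": ["BITSTAMP_SPOT_BTC_USD"]}
    return _cache
```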
@@ -45,17 +44,6 @@ class PriceSeries(object):
     # this is the beginning of time if we don't have any local data
     first_date = '2018-01-09T00:00:00.0000000Z'
-    _session = None
-
-    # the sqlalchemy session is a class singleton, so many symbols can share a connection.
-    @classmethod
-    def get_db_session(cls, dir_path):
-        if cls._session is not None:
-            return cls._session
-        db_path = os.path.join(os.path.abspath(dir_path), 'db.sqlite3')
-        cls._session = schema.get_sqlalchemy_session(db_path)
-        return cls._session
-
     @classmethod
     def validate_datetime_object(cls, dt):
         assert dt.tzname() == 'UTC', 'tzname==`{}`. Expected `UTC`'.format(dt.tzname())
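The session singleton can go because the Django ORM manages its own connections; only the UTC guard survives. As a quick illustration of what that guard accepts, assuming `parse_dt` is `dateutil.parser.parse` (consistent with the ISO-8601 strings used throughout this module):

```python
from dateutil.parser import parse as parse_dt

dt = parse_dt('2018-01-09T00:00:00.0000000Z')
assert dt.tzname() == 'UTC'      # trailing 'Z' parses as tzutc(), so this passes

naive = parse_dt('2018-01-09T00:00:00')
assert naive.tzname() is None    # a naive datetime would trip the assertion
```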
@@ -77,30 +65,12 @@ def round_up_hour(cls, dt):
         dt = dt.replace(minute=0, second=0, microsecond=0)
         return dt

-    def __init__(self, symbol_id, dir_path):
-        logger.debug('creating PriceSeries object for {}'.format(symbol_id))
-        self.symbol_id = symbol_id
+    def __init__(self, symbol_ids, dir_path):
+        self.symbol_ids = symbol_ids
         self.dir_path = dir_path
-        self.data = schema.get_db_table(symbol_id, 'sqlalchemy')
-
-    def get_prices_since(self, start_dt):
-        """Get prices for this PriceSeries where datetime is greater or equal to `start_dt`
-        """
-        # Since sqlite does not have native dates/times, we get the id for the row containing
-        # date (string) `start_dt` and then do a second SQL query for rows with that ID or greater.
-        try:
-            start_dt = parse_dt(start_dt)
-        except TypeError:
-            pass
-        start_dt = self.get_normalized_datetime(self.round_up_hour(start_dt))
-        kwargs = {schema.datetime_field: start_dt}
-        session = self.get_db_session(self.dir_path)
-        first = session.query(self.data).filter_by(**kwargs).first()
-        results = session.query(self.data).filter(id >= first).first()
-        return results
-
-    def get_url(self, query_data):
-        url_1 = ('https', 'rest.coinapi.io/v1', 'ohlcv/{}/history'.format(self.symbol_id))
+    def get_url(self, symbol_id, query_data):
+        url_beginning = ('https', 'rest.coinapi.io/v1', 'ohlcv/{}/history'.format(symbol_id))
         query = []
         for key, value in query_data.items():
             if not value:
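The `__init__` change in this hunk is the heart of the refactor: one `PriceSeries` instance now owns the full symbol list instead of one object per symbol. Hypothetical usage after the change (symbol ids and path are illustrative):

```python
series = PriceSeries(['BITSTAMP_SPOT_BTC_USD', 'BITSTAMP_SPOT_ETH_USD'],
                     '/var/lib/coincharts')
series.update()  # fetch + insert runs once per symbol_id
```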
@@ -110,17 +80,17 @@ def get_url(self, query_data):
                 value = self.get_normalized_datetime(value)
             query.append('{}={}'.format(key, value))
         query = '&'.join(query)
-        url_2 = ('', query, '')
-        url = urllib.parse.urlunparse(url_1 + url_2)
+        url_end = ('', query, '')
+        url = urllib.parse.urlunparse(url_beginning + url_end)
         return url

-    def fetch(self):
-        last_date = self.get_last_date_from_store()
+    def fetch(self, symbol_id):
+        last_date = self.get_last_date_from_store(symbol_id)
         if last_date is None:
-            logger.debug('last date for {} not found. using default of {}'.format(self.symbol_id, self.first_date))
+            logger.debug('last date for {} not found. using default of {}'.format(symbol_id, self.first_date))
             last_date = parse_dt(self.first_date)
         else:
-            logger.debug('date of last record for {} is {}'.format(self.symbol_id, last_date))
+            logger.debug('date of last record for {} is {}'.format(symbol_id, last_date))
         self.validate_datetime_object(last_date)
         now = datetime.datetime.now(tz=pytz.UTC)
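`urllib.parse.urlunparse` expects a 6-tuple of (scheme, netloc, path, params, query, fragment), which is exactly what `url_beginning + url_end` concatenates to. A standalone sketch of the same construction (the symbol id and query values are illustrative, not taken from `query_template`):

```python
import urllib.parse

symbol_id = 'BITSTAMP_SPOT_BTC_USD'  # illustrative
query = '&'.join(['period_id=6HRS', 'time_start=2018-01-09T00:00:00.0000000Z'])
url = urllib.parse.urlunparse(
    ('https', 'rest.coinapi.io/v1', 'ohlcv/{}/history'.format(symbol_id),
     '', query, ''))
print(url)
# https://rest.coinapi.io/v1/ohlcv/BITSTAMP_SPOT_BTC_USD/history?period_id=6HRS&time_start=2018-01-09T00:00:00.0000000Z
```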
@@ -133,36 +103,37 @@ def fetch(self):
         query_data = dict(self.query_template)
         query_data['time_start'] = first_fetch_date
         query_data['limit'] = 1500  # just over one year of records @6hrs
-        url = self.get_url(query_data)
+        url = self.get_url(symbol_id, query_data)
         logger.debug('getting url {}'.format(url))
         response = requests.get(url, headers=self.headers)
         if response.status_code != 200:
             logger.error('request {} failed: {}'.format(url, response.reason))
             return {}
         logger.info('account has {} more API requests for this time period'.format(
             response.headers['X-RateLimit-Remaining']))
-        return response.json()
-
-    def get_last_date_from_store(self):
-        session = self.get_db_session(self.dir_path)
-        obj = session.query(self.data).order_by(self.data.id.desc()).first()
-        if obj is None:
-            return parse_dt(self.first_date)
-        dt = getattr(obj, schema.datetime_field)
+        data = response.json()
+        if data:
+            # validate the FIRST date from the data returned. Not perfect, but will prevent future heartache.
+            self.validate_datetime_object(parse_dt(data[0]['time_period_end']))
+        return data
+
+    def get_last_date_from_store(self, symbol_id):
+        try:
+            obj = db.Prices.objects.filter(symbol_id=symbol_id).latest('id')
+        except db.Prices.DoesNotExist:
+            # no local rows yet; fetch() falls back to first_date
+            return None
+        dt = getattr(obj, 'time_period_end')
         return parse_dt(dt)

-    def insert(self, data):
-        logger.debug('inserting {} records into table {}'.format(len(data), self.symbol_id))
+    def insert(self, symbol_id, data):
+        logger.debug('inserting {} records for symbol_id {}'.format(len(data), symbol_id))
         insertions = []
         for row in data:
-            insertions.append(self.data(**row))
-        session = self.get_db_session(self.dir_path)
-        session.add_all(insertions)
-        session.commit()
+            insertions.append(db.Prices(symbol_id=symbol_id, **row))
+        # no per-object `.save()` needed: `bulk_create` issues a single bulk INSERT
+        db.Prices.objects.bulk_create(insertions)

     def update(self):
-        data = self.fetch()
-        self.insert(data)
+        # TODO: probably opportunities for parallelization
+        for symbol_id in self.symbol_ids:
+            data = self.fetch(symbol_id)
+            self.insert(symbol_id, data)


 def worker(dir_path, daemonize=True):
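This diff never shows `coincharts.db`, but the calls above (`objects.filter(...).latest('id')`, `bulk_create`, `db.Prices(symbol_id=..., **row)`) imply a Django model shaped roughly like the sketch below. Field names beyond `symbol_id` and `time_period_end` are guesses based on a typical OHLCV payload, not confirmed by the diff:

```python
# Hypothetical coincharts/db.py, inferred from the ORM calls in this diff.
from django.db import models

class Prices(models.Model):
    symbol_id = models.CharField(max_length=64, db_index=True)
    # dates are stored as ISO-8601 strings and parsed with parse_dt on read
    time_period_start = models.CharField(max_length=64)
    time_period_end = models.CharField(max_length=64)
    price_open = models.FloatField()
    price_high = models.FloatField()
    price_low = models.FloatField()
    price_close = models.FloatField()
```

One design note: `bulk_create` inserts all rows in a single query but does not run per-object `save()` hooks, which is fine for a plain record model like this.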
@@ -173,17 +144,15 @@ def worker(dir_path, daemonize=True):
     fh.setLevel(logging.DEBUG)
     logger.addHandler(fh)
     # TODO: SysLogHandler will not complain if socket not present. What do?
     sh = logging.handlers.SysLogHandler(address='/var/run/syslog')
     sh.setLevel(logging.DEBUG)
     logger.addHandler(sh)
-    series = []
-    for symbol_id in config['history_symbols']:
-        series.append(PriceSeries(symbol_id, dir_path))
+    series = PriceSeries(config['history_symbols'], dir_path)
     while True:
-        for ps in series:
-            ps.update()
+        series.update()
         logger.info('sleeping for 3600s')
         time.sleep(3600)
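On the TODO above: one defensive option (a sketch of a possible answer, not something this diff does) is to probe for the syslog socket before wiring the handler, and fall back gracefully when it's absent:

```python
import logging
import logging.handlers
import os

logger = logging.getLogger('coincharts')
SYSLOG_SOCKET = '/var/run/syslog'  # macOS path; Linux typically uses /dev/log

if os.path.exists(SYSLOG_SOCKET):
    sh = logging.handlers.SysLogHandler(address=SYSLOG_SOCKET)
    sh.setLevel(logging.DEBUG)
    logger.addHandler(sh)
else:
    logger.warning('syslog socket %s not found; skipping syslog handler', SYSLOG_SOCKET)
```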
@@ -221,7 +190,6 @@ def exception_handler(type_, value, tb):
     if daemonize:
         pid_file = os.path.join(dir_path, script_basename + '.pid')
         with daemon.DaemonContext(
                 working_directory=dir_path,
                 pidfile=daemon.pidfile.PIDLockFile(pid_file),
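A caveat worth noting around `DaemonContext`: python-daemon closes open file descriptors when the context is entered, so any log file opened beforehand must be listed in `files_preserve`. A self-contained sketch (paths are illustrative; whether this codebase needs it depends on where `worker` sets up its handlers relative to daemonization):

```python
import logging

import daemon
import daemon.pidfile

log_handler = logging.FileHandler('/tmp/coincharts.log')  # illustrative path

with daemon.DaemonContext(
        working_directory='/tmp',
        pidfile=daemon.pidfile.PIDLockFile('/tmp/coincharts.pid'),
        files_preserve=[log_handler.stream]):  # keep the log fd open
    logger = logging.getLogger('coincharts')
    logger.addHandler(log_handler)
    logger.warning('running daemonized')
```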
@@ -233,5 +201,4 @@ def exception_handler(type_, value, tb):
 if __name__ == '__main__':
     main()