Permalink
Browse files

Cleanup of daemon package:

* remove unuused imports other `pyflakes` fixes
* better logging: streamhandler, syslog
* neater handling of "global" data. no more global `DIR_PATH`
* don't bail out on non-200 status. log and return empty results
* use config directory `~/.coincharts` storing api auth and history symbols
  • Loading branch information...
stnbu committed Sep 1, 2018
1 parent 3991d60 commit 94026e142d083f7969acc60760b66bff355aa101
Showing with 79 additions and 33 deletions.
  1. +79 −33 coincharts/daemon/base.py
@@ -5,29 +5,33 @@
import os
import sys
# FIXME
# beautiful HACK until I get some stuff figured out.
sys.path.insert(0, '../mutils')
import datetime
import pytz
from dateutil.parser import parse as parse_dt
import json
import urllib
import requests
import logging
import logging.handlers
import time
import yaml
from sqlalchemy import Integer, String, Float
import daemon
import daemon.pidfile
from mutil import simple_alchemy
from mutils import simple_alchemy
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
DIR_PATH = None
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
logger.addHandler(ch)
CONFIG_DIR = os.path.expanduser('~/.coincharts')
API_KEY_FILE = os.path.join(CONFIG_DIR, 'API_KEY')
CONFIG_FILE = os.path.join(CONFIG_DIR, 'config.yaml')
class PriceSeries(object):
@@ -40,7 +44,7 @@ class PriceSeries(object):
)
date_format_template = '%Y-%m-%dT%H:%M:%S.%f0Z' # magic
headers = {'X-CoinAPI-Key':
open('API_KEY').read().strip()}
open(API_KEY_FILE).read().strip()}
schema = [
('time_period_start', 'TEXT'),
('time_period_end', 'TEXT'),
@@ -63,11 +67,12 @@ class PriceSeries(object):
_session = None
# the sqlalchemy is a class singleton, so many symbols can share a connection.
@classmethod
def get_db_session(cls):
def get_db_session(cls, dir_path):
if cls._session is not None:
return cls._session
db_path = os.path.join(os.path.abspath(DIR_PATH), 'db.sqlite3')
db_path = os.path.join(os.path.abspath(dir_path), 'db.sqlite3')
cls._session = simple_alchemy.get_session(db_path)
return cls._session
@@ -92,9 +97,10 @@ def round_up_hour(cls, dt):
dt = dt.replace(minute=0, second=0, microsecond=0)
return dt
def __init__(self, symbol_id):
def __init__(self, symbol_id, dir_path):
logger.debug('creating PriceSeries object for {}'.format(symbol_id))
self.symbol_id = symbol_id
self.dir_path = dir_path
schema = [
('time_period_start', String),
('time_period_end', String),
@@ -118,11 +124,11 @@ def get_prices_since(self, start_dt):
start_dt = parse_dt(start_dt)
except TypeError:
pass
start_dt = self.get_normalized_datetime(self.round_up(start_dt))
start_dt = self.get_normalized_datetime(self.round_up_hour(start_dt))
kwargs = {self.TIME: start_dt}
session = self.get_db_session()
session = self.get_db_session(self.dir_path)
first = session.query(self.data).filter_by(**kwargs).first()
results = session.query(self.data).filter(id >= start_dt).first()
results = session.query(self.data).filter(id >= first).first()
return results
def get_url(self, query_data):
@@ -162,12 +168,15 @@ def fetch(self):
url = self.get_url(query_data)
logger.debug('getting url {}'.format(url))
response = requests.get(url, headers=self.headers)
if response.status_code != 200:
logger.error('request {} failed: {}'.format(url, response.reason))
return {}
logger.info('account has {} more API requests for this time period'.format(
response.headers['X-RateLimit-Remaining']))
return response.json()
def get_last_date_from_store(self):
session = self.get_db_session()
session = self.get_db_session(self.dir_path)
obj = session.query(self.data).order_by(self.data.id.desc()).first()
if obj is None:
return parse_dt(self.first_date)
@@ -179,7 +188,7 @@ def insert(self, data):
insertions = []
for row in data:
insertions.append(self.data(**row))
session = self.get_db_session()
session = self.get_db_session(self.dir_path)
session.add_all(insertions)
session.commit()
@@ -188,29 +197,24 @@ def update(self):
self.insert(data)
def main(dir_path, daemon=True):
global DIR_PATH
DIR_PATH = dir_path
def worker(dir_path, daemonize=True):
fh = logging.FileHandler(os.path.join(dir_path, 'logs'))
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
fh.setLevel(logging.DEBUG)
logger.addHandler(fh)
symbols = [
'BITSTAMP_SPOT_BTC_USD',
'BITSTAMP_SPOT_XRP_USD',
'BITSTAMP_SPOT_ETH_USD',
'BITSTAMP_SPOT_LTC_USD',
'BITSTAMP_SPOT_EUR_USD',
'BITSTAMP_SPOT_BCH_USD'
]
sh = logging.handlers.SysLogHandler(address='/var/run/syslog')
sh.setLevel(logging.DEBUG)
logger.addHandler(sh)
with open(CONFIG_FILE) as f:
config = yaml.load(f)
series = []
for symbol_id in symbols:
series.append(PriceSeries(symbol_id))
for symbol_id in config['history_symbols']:
series.append(PriceSeries(symbol_id, dir_path))
while True:
for ps in series:
@@ -219,8 +223,50 @@ def main(dir_path, daemon=True):
time.sleep(3600)
if __name__ == '__main__':
def main():
script_basename, _ = os.path.splitext(os.path.basename(__file__))
# no need to involve argparse for something this simple
if len(sys.argv) == 1:
print('usage: {} [--daemon] <directory>'.format(script_basename))
sys.exit(1)
from mutils.simple_daemon import daemonize
daemonize = True
if '--daemon' in sys.argv:
script_name, _, dir_path = sys.argv
else:
script_name, dir_path = sys.argv
daemonize = False
dir_path = os.path.abspath(dir_path)
if daemonize:
# when I'm a daemon, log all exceptions
def exception_handler(type_, value, tb):
logger.exception('uncaught exception on line {}; {}: {}'.format(
tb.tb_lineno,
type_.__name__,
value,
))
sys.__excepthook__(type_, value, tb)
sys.excepthook = exception_handler
logger.debug('starting daemon {} using path {}'.format(script_name, dir_path))
if daemonize:
pid_file = os.path.join(dir_path, script_basename + '.pid')
with daemon.DaemonContext(
working_directory=dir_path,
pidfile=daemon.pidfile.PIDLockFile(pid_file),
):
worker(dir_path, daemonize=daemonize)
else:
worker(dir_path, daemonize=daemonize)
if __name__ == '__main__':
daemonize(main)
main()

0 comments on commit 94026e1

Please sign in to comment.