Skip to content
This repository has been archived by the owner on Nov 26, 2022. It is now read-only.

Commit

Permalink
WIP: Generic Data Bundles
Browse files (browse the repository at this point in the history)
  * Split up inheritance structure of Bundle classes
  • Loading branch information
cfromknecht committed Jul 13, 2017
1 parent fa6b07c commit b82f719
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 226 deletions.
Expand Up @@ -30,10 +30,7 @@
logbook.StderrHandler().push_application()
log = logbook.Logger(__name__)

class AbstractBundle(object):
    """Empty root class for bundles; it defines no attributes or behaviour."""

    def __init__(self):
        """Construct the bundle without initialising any state."""

class BaseBundle(object):
@lazyval
def name(self):
raise NotImplementedError()
Expand Down
56 changes: 56 additions & 0 deletions catalyst/data/bundles/base_pricing.py
@@ -0,0 +1,56 @@
#
# Copyright 2017 Enigma MPC, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from catalyst.data.bundles.bundle import BaseBundle
from catalyst.utils.memoize import lazyval

class BasePricingBundle(BaseBundle):
    """Bundle base class that declares the column layout of pricing data.

    Both properties return a list of ``(column_name, dtype_string)`` pairs.
    """

    @lazyval
    def md_dtypes(self):
        """Return the ``(column, dtype)`` pairs of the metadata columns.

        The layout is one ``object`` column named ``symbol`` followed by
        three ``datetime64[ns]`` date columns.
        """
        date_columns = ('start_date', 'end_date', 'ac_date')
        layout = [('symbol', 'object')]
        layout.extend((column, 'datetime64[ns]') for column in date_columns)
        return layout

    @lazyval
    def dtypes(self):
        """Return the ``(column, dtype)`` pairs of the pricing columns.

        The layout is a ``datetime64[ns]`` ``date`` column followed by the
        ``float64`` open/high/low/close/volume columns.
        """
        float_columns = ('open', 'high', 'low', 'close', 'volume')
        return [('date', 'datetime64[ns]')] + [
            (column, 'float64') for column in float_columns
        ]

class BaseCryptoPricingBundle(BasePricingBundle):
    """Pricing bundle base that uses the ``'OPEN'`` calendar and 1440-minute days."""

    @lazyval
    def calendar_name(self):
        """Return the calendar name, ``'OPEN'``."""
        return 'OPEN'

    @lazyval
    def minutes_per_day(self):
        """Return the number of minutes per day: 24 hours * 60 = 1440."""
        hours_per_day = 24
        return hours_per_day * 60

class BaseEquityPricingBundle(BasePricingBundle):
    """Pricing bundle base that uses the ``'NYSE'`` calendar and 390-minute days."""

    @lazyval
    def calendar_name(self):
        """Return the calendar name, ``'NYSE'``."""
        return 'NYSE'

    @lazyval
    def minutes_per_day(self):
        """Return the number of minutes per day: 6.5 hours = 390."""
        return 6 * 60 + 30
189 changes: 154 additions & 35 deletions catalyst/data/bundles/poloniex.py
@@ -1,37 +1,156 @@
import tarfile

from . import core as bundles

# Direct-download (``dl=1``) Dropbox link to the Poloniex bundle tar archive.
POLONIEX_BUNDLE_URL = (
'https://www.dropbox.com/s/9naqffawnq8o4r2/poloniex-bundle.tar?dl=1'
)

#@bundles.register(
# 'poloniex',
# create_writers=False,
# calendar_name='OPEN',
# minutes_per_day=1440)
def poloniex_bundle(environ,
asset_db_writer,
minute_bar_writer,
daily_bar_writer,
adjustment_writer,
calendar,
start_session,
end_session,
cache,
show_progress,
output_dir):
if show_progress:
data = bundles.download_with_progress(
POLONIEX_BUNDLE_URL,
chunk_size=bundles.ONE_MEGABYTE,
label="Downloading Bundle: poloniex",
#
# Copyright 2017 Enigma MPC, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from datetime import datetime

import pandas as pd

from six.moves.urllib.parse import urlencode

from catalyst.data.bundles.core import register_bundle
from catalyst.data.bundles.pricing_bundle import BaseCryptoPricingBundle
from catalyst.utils.memoize import lazyval

class PoloniexBundle(BaseCryptoPricingBundle):
    """Crypto pricing bundle backed by the Poloniex public HTTP API.

    Metadata is read from the ``returnTicker`` command and per-symbol bars
    from the ``returnChartData`` command; both responses are loaded with
    ``pandas.read_json``.
    """

    @lazyval
    def name(self):
        """Return the bundle name, ``'poloniex'``."""
        return 'poloniex'

    @lazyval
    def exchange(self):
        """Return the exchange code, ``'POLO'``."""
        return 'POLO'

    @lazyval
    def frequencies(self):
        """Return the set of data frequencies of this bundle (``'daily'``)."""
        return {'daily'}

    @lazyval
    def tar_url(self):
        """Return the Dropbox direct-download URL of the bundle tar archive."""
        return (
            'https://www.dropbox.com/s/9naqffawnq8o4r2/'
            'poloniex-bundle.tar?dl=1'
        )

    @lazyval
    def wait_time(self):
        """Return a 170 ms ``pd.Timedelta``.

        NOTE(review): presumably the pause between consecutive API
        requests -- confirm against the code that consumes it.
        """
        return pd.Timedelta(milliseconds=170)

    def fetch_raw_metadata_frame(self, api_key, page_number):
        """Fetch the ticker listing as a frame with a ``symbol`` column.

        Parameters
        ----------
        api_key
            Forwarded to ``_format_metadata_url``, which does not use it.
        page_number : int
            Only page 1 carries data; any larger page yields an empty frame.

        Returns
        -------
        pd.DataFrame
            The ``returnTicker`` response sorted by its index, with that
            index moved into a column named ``symbol``.
        """
        # The ticker endpoint is not paginated, so everything is on page 1.
        if page_number > 1:
            return pd.DataFrame([])

        raw = pd.read_json(
            self._format_metadata_url(
                api_key,
                page_number,
            ),
            orient='index',
        )

        raw = raw.sort_index().reset_index()
        raw.rename(
            columns={'index': 'symbol'},
            inplace=True,
        )

        return raw

    def post_process_symbol_metadata(self, metadata, data):
        """Derive the metadata tuple for one symbol from its price frame.

        Parameters
        ----------
        metadata
            Object exposing a ``symbol`` attribute.
        data : pd.DataFrame
            Non-empty frame whose index holds timestamps; the first and last
            index entries are taken as the start and end dates.

        Returns
        -------
        tuple
            ``(symbol, start_date, end_date, ac_date)`` where the dates are
            timezone-naive and ``ac_date`` is one day after ``end_date``.
        """
        start_date = data.index[0].tz_localize(None)
        end_date = data.index[-1].tz_localize(None)
        ac_date = end_date + pd.Timedelta(days=1)

        return (
            metadata.symbol,
            start_date,
            end_date,
            ac_date,
        )

    def fetch_raw_symbol_frame(self,
                               api_key,
                               symbol,
                               start_date,
                               end_date,
                               frequency):
        """Fetch chart data for ``symbol`` and return it indexed by ``date``.

        Parameters
        ----------
        api_key
            Forwarded to ``_format_data_url``, which does not use it.
        symbol : str
            Sent as the ``currencyPair`` query parameter.
        start_date, end_date
            Objects with a nanosecond ``value`` attribute (e.g.
            ``pd.Timestamp``) bounding the requested range.
        frequency : str
            One of ``'daily'``, ``'5-minute'`` or ``'minute'``.

        Returns
        -------
        pd.DataFrame
            Frame indexed by ``date`` whose open/high/low/close columns are
            divided by 1000 and whose volume column is multiplied by 1000.
        """
        raw = pd.read_json(
            self._format_data_url(
                api_key,
                symbol,
                start_date,
                end_date,
                frequency,
            ),
            orient='records',
        )

        raw.set_index('date', inplace=True)

        # Prices are scaled down and volume scaled up by the same factor.
        scale = 1000.0
        for column in ('open', 'high', 'low', 'close'):
            raw.loc[:, column] /= scale
        raw.loc[:, 'volume'] *= scale

        return raw

    # ------------------------------------------------------------------
    # Helper methods
    # ------------------------------------------------------------------

    def _format_metadata_url(self, api_key, page_number):
        """Build the ``returnTicker`` URL; both arguments are unused."""
        query_params = [
            ('command', 'returnTicker'),
        ]

        return self._format_polo_query(query_params)

    def _format_data_url(self,
                         api_key,
                         symbol,
                         start_date,
                         end_date,
                         data_frequency):
        """Build the ``returnChartData`` URL for one symbol and date range.

        Returns ``None`` when ``data_frequency`` is not one of ``'daily'``,
        ``'5-minute'`` or ``'minute'``. ``api_key`` is unused.
        """
        # Candle period in seconds for each supported frequency.
        period_map = {
            'daily': 86400,
            '5-minute': 300,
            'minute': 60,
        }

        period = period_map.get(data_frequency)
        if period is None:
            return None

        # ``value`` is in nanoseconds. Floor division keeps the epoch
        # seconds integral on Python 3, where ``/`` would produce a float
        # and put e.g. ``1500000000.0`` into the query string.
        query_params = [
            ('command', 'returnChartData'),
            ('currencyPair', symbol),
            ('start', start_date.value // 10**9),
            ('end', end_date.value // 10**9),
            ('period', period),
        ]

        return self._format_polo_query(query_params)

    def _format_polo_query(self, query_params):
        """Return the Poloniex public endpoint URL with ``query_params`` encoded."""
        return 'https://poloniex.com/public?{query}'.format(
            query=urlencode(query_params),
        )
else:
data = bundles.download_without_progress(POLONIEX_BUNDLE_URL)

with tarfile.open('r', fileobj=data) as tar:
if show_progress:
print("Writing data to %s." % output_dir)
tar.extractall(output_dir)
# Import-time side effect: hand the bundle class to ``register_bundle``.
register_bundle(PoloniexBundle)

0 comments on commit b82f719

Please sign in to comment.