Add initial_image as optional parameter on tickstore write() - #98
reasto committed Feb 2, 2016
1 parent 517d402 commit e9a1814
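
In short: `write()` gains an optional `initial_image`, a snapshot of state as of the start of the written chunk. A minimal usage sketch, assuming a local MongoDB and a hypothetical `test.TICK` library (symbol and tick fields are illustrative, not from this commit):

```python
from datetime import datetime as dt

from arctic import Arctic, TICK_STORE
from arctic.date import mktz

# Hypothetical setup: host and library name are placeholders.
store = Arctic('localhost')
store.initialize_library('test.TICK', lib_type=TICK_STORE)
lib = store['test.TICK']

ticks = [
    {'index': dt(2016, 1, 1, 10, 0, tzinfo=mktz('UTC')), 'bid': 10.0, 'ask': 10.1},
    {'index': dt(2016, 1, 1, 10, 1, tzinfo=mktz('UTC')), 'bid': 10.1, 'ask': 10.2},
]

# New in this commit: an optional image of the state at the start of the
# document; its 'index' entry, if present, is taken as the image's timestamp.
lib.write('SYM', ticks,
          initial_image={'index': dt(2016, 1, 1, 9, 59, tzinfo=mktz('UTC')),
                         'bid': 9.9, 'ask': 10.0})
```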
Showing 5 changed files with 144 additions and 77 deletions.
127 changes: 86 additions & 41 deletions arctic/tickstore/tickstore.py
@@ -10,6 +10,7 @@
from pandas.core.frame import _arrays_to_mgr
import pymongo
from pymongo.errors import OperationFailure
import copy

from ..date import DateRange, to_pandas_closed_closed, mktz, datetime_to_ms, CLOSED_CLOSED, to_dt
from ..decorators import mongo_retry
@@ -78,8 +79,6 @@

class TickStore(object):

chunk_size = 100000

@classmethod
def initialize_library(cls, arctic_lib, **kwargs):
TickStore(arctic_lib)._ensure_index()
@@ -91,7 +90,16 @@ def _ensure_index(self):
(START, pymongo.ASCENDING)], background=True)
collection.create_index([(START, pymongo.ASCENDING)], background=True)

def __init__(self, arctic_lib):
def __init__(self, arctic_lib, chunk_size=100000):
"""
Parameters
----------
arctic_lib : ArcticLibraryBinding
Arctic Library
chunk_size : int
Number of ticks to store in a document before splitting to another document.
If the library was obtained through get_library, then set it with: self._chunk_size = 10000
"""
self._arctic_lib = arctic_lib

# Do we allow reading from secondaries
@@ -100,6 +108,8 @@ def __init__(self, arctic_lib):
# The default collections
self._collection = arctic_lib.get_top_level_collection()

self._chunk_size = chunk_size

def __getstate__(self):
return {'arctic_lib': self._arctic_lib}

@@ -334,7 +344,7 @@ def _set_or_promote_dtype(self, column_dtypes, c, dtype):

def _prepend_image(self, document, im, rtn_length, column_dtypes, column_set, columns):
image = im[IMAGE]
first_dt = im['t']
first_dt = im[DTYPE]
if not first_dt.tzinfo:
first_dt = first_dt.replace(tzinfo=mktz('UTC'))
document[INDEX] = np.insert(document[INDEX], 0, np.uint64(datetime_to_ms(first_dt)))
@@ -354,7 +364,7 @@ def _prepend_image(self, document, im, rtn_length, column_dtypes, column_set, columns):
for field in set(document).difference(set(image)):
if field == INDEX:
continue
logger.debug("Field %s is missing from image!", field)
logger.debug("Field %s is missing from image!" % field)
if document[field] is not None:
val = np.nan
document[field] = np.insert(document[field], 0, document[field].dtype.type(val))
@@ -450,16 +460,21 @@ def _assert_nonoverlapping_data(self, symbol, start, end):
raise OverlappingDataException("Document already exists with start:{} end:{} in the range of our start:{} end:{}".format(
doc[START], doc[END], start, end))

def write(self, symbol, data):
def write(self, symbol, data, initial_image=None):
"""
Writes a list of market data events.
Parameters
----------
symbol : `str`
symbol name for the item
data : list of dicts
data : list of dicts or a pandas.DataFrame
List of ticks to store to the tick-store.
If a list of dicts, each dict must contain an 'index' datetime.
If a pandas.DataFrame, the index must be a Timestamp that can be converted to a datetime.
initial_image : dict
Dict of the initial image at the start of the document. If this contains an 'index' entry it is
assumed to be the timestamp of the image.
"""
pandas = False
# Check for overlapping data
@@ -475,38 +490,41 @@ def write(self, symbol, data):
self._assert_nonoverlapping_data(symbol, to_dt(start), to_dt(end))

if pandas:
buckets = self._pandas_to_buckets(data, symbol)
buckets = self._pandas_to_buckets(data, symbol, initial_image)
else:
buckets = self._to_buckets(data, symbol)
buckets = self._to_buckets(data, symbol, initial_image)
self._write(buckets)

def _write(self, buckets):
start = dt.now()
mongo_retry(self._collection.insert_many)(buckets)
t = (dt.now() - start).total_seconds()
ticks = len(buckets) * self.chunk_size
print("%d buckets in %s: approx %s ticks/sec" % (len(buckets), t, int(ticks / t)))
ticks = len(buckets) * self._chunk_size

def _pandas_to_buckets(self, x, symbol):
def _pandas_to_buckets(self, x, symbol, initial_image):
rtn = []
for i in range(0, len(x), self.chunk_size):
rtn.append(self._pandas_to_bucket(x[i:i + self.chunk_size], symbol))
for i in range(0, len(x), self._chunk_size):
bucket, initial_image = TickStore._pandas_to_bucket(x[i:i + self._chunk_size], symbol, initial_image)
rtn.append(bucket)
return rtn

def _to_buckets(self, x, symbol):
def _to_buckets(self, x, symbol, initial_image):
rtn = []
for i in range(0, len(x), self.chunk_size):
rtn.append(self._to_bucket(x[i:i + self.chunk_size], symbol))
for i in range(0, len(x), self._chunk_size):
bucket, initial_image = TickStore._to_bucket(x[i:i + self._chunk_size], symbol, initial_image)
rtn.append(bucket)
return rtn

def _to_ms(self, date):
@staticmethod
def _to_ms(date):
if isinstance(date, dt):
if not date.tzinfo:
logger.warning('WARNING: treating naive datetime as London in write path')
return datetime_to_ms(date)
return date

def _str_dtype(self, dtype):
@staticmethod
def _str_dtype(dtype):
"""
Represent dtypes without byte order, as earlier Java tickstore code doesn't support explicit byte order.
"""
@@ -522,8 +540,8 @@ def _str_dtype(self, dtype):
else:
raise UnhandledDtypeException("Bad dtype '%s'" % dtype)


def _ensure_supported_dtypes(self, array):
@staticmethod
def _ensure_supported_dtypes(array):
# We only support these types for now, as we need to read them in Java
if (array.dtype.kind) == 'i':
array = array.astype('<i8')
@@ -538,42 +556,68 @@ def _ensure_supported_dtypes(self, array):
array = array.astype(array.dtype.newbyteorder('<'))
return array

def _pandas_to_bucket(self, df, symbol):
start = to_dt(df.index[0].to_datetime())
@staticmethod
def _pandas_compute_final_image(df, image, end):
# Compute the final image with forward fill of df applied to the image
final_image = copy.copy(image)
last_values = df.ffill().tail(1).to_dict()
last_dict = {i: a.values()[0] for i, a in last_values.items()}
final_image.update(last_dict)
final_image['index'] = end
return final_image

@staticmethod
def _pandas_to_bucket(df, symbol, initial_image):
rtn = {SYMBOL: symbol, VERSION: CHUNK_VERSION_NUMBER, COLUMNS: {}, COUNT: len(df)}
end = to_dt(df.index[-1].to_datetime())
rtn = {START: start, END: end, SYMBOL: symbol}
rtn[VERSION] = CHUNK_VERSION_NUMBER
rtn[COUNT] = len(df)
rtn[COLUMNS] = {}
if initial_image:
if 'index' in initial_image:
start = min(to_dt(df.index[0].to_datetime()), initial_image['index'])
else:
start = to_dt(df.index[0].to_datetime())
image_start = initial_image.get('index', start)
image = {k: v for k, v in initial_image.items() if k != 'index'}
rtn[IMAGE_DOC] = {DTYPE: image_start, START: 0, IMAGE: initial_image}
final_image = TickStore._pandas_compute_final_image(df, initial_image, end)
else:
start = to_dt(df.index[0].to_datetime())
final_image = {}
rtn[END] = end
rtn[START] = start

logger.warning("NB treating all values as 'exists' - no longer sparse")
rowmask = Binary(lz4.compressHC(np.packbits(np.ones(len(df), dtype='uint8'))))

recs = df.to_records(convert_datetime64=False)
for col in df:
array = self._ensure_supported_dtypes(recs[col])
array = TickStore._ensure_supported_dtypes(recs[col])
col_data = {}
col_data[DATA] = Binary(lz4.compressHC(array.tostring()))
col_data[ROWMASK] = rowmask
col_data[DTYPE] = self._str_dtype(array.dtype)
col_data[DTYPE] = TickStore._str_dtype(array.dtype)
rtn[COLUMNS][col] = col_data
rtn[INDEX] = Binary(lz4.compressHC(np.concatenate(([recs['index'][0].astype('datetime64[ms]').view('uint64')],
np.diff(recs['index'].astype('datetime64[ms]').view('uint64')))
).tostring()))
return rtn
return rtn, final_image

def _to_bucket(self, ticks, symbol):
@staticmethod
def _to_bucket(ticks, symbol, initial_image):
rtn = {SYMBOL: symbol, VERSION: CHUNK_VERSION_NUMBER, COLUMNS: {}, COUNT: len(ticks)}
data = {}
rowmask = {}
start = to_dt(ticks[0]['index'])
end = to_dt(ticks[-1]['index'])
final_image = copy.copy(initial_image) if initial_image else {}
for i, t in enumerate(ticks):
if initial_image:
final_image.update(t)
for k, v in iteritems(t):
try:
if k != 'index':
rowmask[k][i] = 1
else:
v = self._to_ms(v)
v = TickStore._to_ms(v)
data[k].append(v)
except KeyError:
if k != 'index':
@@ -583,21 +627,22 @@ def _to_bucket(self, ticks, symbol):

rowmask = dict([(k, Binary(lz4.compressHC(np.packbits(v).tostring())))
for k, v in iteritems(rowmask)])

rtn = {START: start, END: end, SYMBOL: symbol}
rtn[VERSION] = CHUNK_VERSION_NUMBER
rtn[COUNT] = len(ticks)
rtn[COLUMNS] = {}
for k, v in iteritems(data):
if k != 'index':
v = np.array(v)
v = self._ensure_supported_dtypes(v)
v = TickStore._ensure_supported_dtypes(v)
rtn[COLUMNS][k] = {DATA: Binary(lz4.compressHC(v.tostring())),
DTYPE: self._str_dtype(v.dtype),
DTYPE: TickStore._str_dtype(v.dtype),
ROWMASK: rowmask[k]}

if initial_image:
image_start = initial_image.get('index', start)
start = min(start, image_start)
rtn[IMAGE_DOC] = {DTYPE: image_start, START: 0, IMAGE: initial_image}
rtn[END] = end
rtn[START] = start
rtn[INDEX] = Binary(lz4.compressHC(np.concatenate(([data['index'][0]], np.diff(data['index']))).tostring()))
return rtn
return rtn, final_image

def max_date(self, symbol):
"""
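
A note on the shape of the change above: `_to_bucket` and `_pandas_to_bucket` now return a `(bucket, final_image)` pair, and the `*_to_buckets` loops feed each chunk's `final_image` back in as the next chunk's `initial_image`. A self-contained sketch of that chaining (field names and the two-tick chunk size are illustrative):

```python
import copy

def to_bucket_sketch(ticks, initial_image):
    """Mimics TickStore._to_bucket's image handling (storage fields omitted)."""
    final_image = copy.copy(initial_image) if initial_image else {}
    for t in ticks:
        if initial_image:
            final_image.update(t)  # later ticks overwrite earlier fields
    return {'image_at_start': initial_image}, final_image

ticks = [{'index': 1, 'bid': 10.0}, {'index': 2, 'ask': 10.2},
         {'index': 3, 'bid': 10.1}]
image = {'index': 0, 'bid': 9.9, 'ask': 10.0}
buckets = []
for i in range(0, len(ticks), 2):  # pretend chunk_size = 2
    bucket, image = to_bucket_sketch(ticks[i:i + 2], image)
    buckets.append(bucket)
# The second bucket starts from {'index': 2, 'bid': 10.0, 'ask': 10.2}:
# the first chunk's final_image, exactly as _to_buckets threads it through.
```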
4 changes: 2 additions & 2 deletions tests/integration/tickstore/test_ts_delete.py
@@ -21,7 +21,7 @@ def test_delete(tickstore_lib):
'index': dt(2013, 1, 30, tzinfo=mktz('Europe/London'))
},
]
tickstore_lib.chunk_size = 1
tickstore_lib._chunk_size = 1
tickstore_lib.write('SYM', DUMMY_DATA)
tickstore_lib.delete('SYM')
with pytest.raises(NoDataFoundException):
@@ -45,7 +45,7 @@ def test_delete_daterange(tickstore_lib):
'index': dt(2013, 2, 1, tzinfo=mktz('Europe/London'))
},
]
tickstore_lib.chunk_size = 1
tickstore_lib._chunk_size = 1
tickstore_lib.write('SYM', DUMMY_DATA)

# Delete with a date-range
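
These tests now set the renamed private attribute `_chunk_size` directly. The same pattern works on any library handle obtained via `get_library`, per the new `__init__` docstring; a sketch, reusing the hypothetical `store` from the first example:

```python
lib = store['test.TICK']   # hypothetical library from the first sketch
lib._chunk_size = 1        # one tick per bucket document
lib.write('SYM2', ticks)   # fresh symbol avoids OverlappingDataException
```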
16 changes: 8 additions & 8 deletions tests/integration/tickstore/test_ts_read.py
@@ -112,7 +112,7 @@ def test_read_all_cols_all_dtypes(tickstore_lib, chunk_size):
'index': dt(1970, 1, 1, 0, 0, 1, tzinfo=mktz('UTC')),
},
]
tickstore_lib.chunk_size = chunk_size
tickstore_lib._chunk_size = chunk_size
tickstore_lib.write('sym', data)
df = tickstore_lib.read('sym', columns=None)

@@ -167,7 +167,7 @@ def test_date_range(tickstore_lib):
tickstore_lib.delete('SYM')

# Chunk every 3 symbols and lets have some fun
tickstore_lib.chunk_size = 3
tickstore_lib._chunk_size = 3
tickstore_lib.write('SYM', DUMMY_DATA)

with patch.object(tickstore_lib._collection, 'find', side_effect=tickstore_lib._collection.find) as f:
@@ -222,7 +222,7 @@ def test_date_range_end_not_in_range(tickstore_lib):
},
]

tickstore_lib.chunk_size = 1
tickstore_lib._chunk_size = 1
tickstore_lib.write('SYM', DUMMY_DATA)
with patch.object(tickstore_lib._collection, 'find', side_effect=tickstore_lib._collection.find) as f:
df = tickstore_lib.read('SYM', date_range=DateRange(20130101, dt(2013, 1, 2, 9, 0)), columns=None)
@@ -251,7 +251,7 @@ def test_date_range_default_timezone(tickstore_lib, tz_name):
]

with patch('arctic.date._mktz.DEFAULT_TIME_ZONE_NAME', tz_name):
tickstore_lib.chunk_size = 1
tickstore_lib._chunk_size = 1
tickstore_lib.write('SYM', DUMMY_DATA)
df = tickstore_lib.read('SYM', date_range=DateRange(20130101, 20130701), columns=None)
assert len(df) == 2
@@ -281,7 +281,7 @@ def test_date_range_no_bounds(tickstore_lib):
},
]

tickstore_lib.chunk_size = 1
tickstore_lib._chunk_size = 1
tickstore_lib.write('SYM', DUMMY_DATA)

# 1) No start, no end
@@ -315,7 +315,7 @@ def test_date_range_BST(tickstore_lib):
'index': dt(2013, 6, 1, 13, 00, tzinfo=mktz('Europe/London'))
},
]
tickstore_lib.chunk_size = 1
tickstore_lib._chunk_size = 1
tickstore_lib.write('SYM', DUMMY_DATA)

df = tickstore_lib.read('SYM', columns=None)
@@ -363,7 +363,7 @@ def test_read_out_of_order(tickstore_lib):
'index': dt(2013, 6, 1, 13, 00, tzinfo=mktz('UTC'))
},
]
tickstore_lib.chunk_size = 3
tickstore_lib._chunk_size = 3
tickstore_lib.write('SYM', DUMMY_DATA)
tickstore_lib.read('SYM', columns=None)
assert len(tickstore_lib.read('SYM', columns=None, date_range=DateRange(dt(2013, 6, 1, tzinfo=mktz('UTC')), dt(2013, 6, 2, tzinfo=mktz('UTC'))))) == 3
@@ -380,7 +380,7 @@ def test_read_longs(tickstore_lib):
'index': dt(2013, 6, 1, 13, 00, tzinfo=mktz('Europe/London'))
},
]
tickstore_lib.chunk_size = 3
tickstore_lib._chunk_size = 3
tickstore_lib.write('SYM', DUMMY_DATA)
tickstore_lib.read('SYM', columns=None)
read = tickstore_lib.read('SYM', columns=None, date_range=DateRange(dt(2013, 6, 1), dt(2013, 6, 2)))
25 changes: 0 additions & 25 deletions tests/integration/tickstore/test_ts_write.py
@@ -79,28 +79,3 @@ def test_ts_write_pandas(tickstore_lib):

read = tickstore_lib.read('SYM', columns=None)
assert_frame_equal(read, data, check_names=False)


def test_to_bucket(tickstore_lib):
bucket = tickstore_lib._to_bucket(DUMMY_DATA, 'SYM')
assert bucket[SYMBOL] == 'SYM'
assert bucket[START] == dt(2013, 1, 1, tzinfo=mktz('Europe/London'))
assert bucket[END] == dt(2013, 7, 5, tzinfo=mktz('Europe/London'))
assert bucket[COUNT] == 5


def test_pandas_to_bucket(tickstore_lib):
df = read_str_as_pandas(""" index | near
2012-09-08 17:06:11 | 1.0
2012-10-08 17:06:11 | 2.0
2012-10-09 17:06:11 | 2.5
2012-11-08 17:06:11 | 3.0""")
df = df.tz_localize('UTC')
bucket = tickstore_lib._pandas_to_bucket(df, 'SYM')
assert bucket[SYMBOL] == 'SYM'
assert bucket[START] == dt(2012, 9, 8, 17, 6, 11, tzinfo=mktz('UTC'))
assert bucket[END] == dt(2012, 11, 8, 17, 6, 11, tzinfo=mktz('UTC'))
assert bucket[COUNT] == 4
assert len(bucket[COLUMNS]) == 1
assert 'near' in bucket[COLUMNS]
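
The deleted tests above called the old instance-method signature. Rewritten against the new staticmethod they would presumably look like the sketch below (assuming `TickStore` is imported alongside the constants the old tests already used; the actual replacement tests appear to live in the fifth changed file, which did not render here):

```python
def test_to_bucket():
    # New signature: staticmethod with explicit initial_image, returning a
    # (bucket, final_image) tuple instead of the bucket alone.
    bucket, final_image = TickStore._to_bucket(DUMMY_DATA, 'SYM', None)
    assert bucket[SYMBOL] == 'SYM'
    assert bucket[START] == dt(2013, 1, 1, tzinfo=mktz('Europe/London'))
    assert bucket[END] == dt(2013, 7, 5, tzinfo=mktz('Europe/London'))
    assert bucket[COUNT] == 5
    assert final_image == {}  # empty because no initial_image was supplied
```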
