Merge branch 'master' into add-concat-flag

AdrianTeng committed May 31, 2017
2 parents 4505920 + 5621ce4 commit b52c8ab
Showing 12 changed files with 270 additions and 77 deletions.
9 changes: 7 additions & 2 deletions CHANGES.md
@@ -1,9 +1,14 @@
## Changelog

### 1.43
### 1.44
* Feature: Expose compressHC from internal arctic LZ4 and remove external LZ4 dependency
* Feature: Appending data older than what already exists in the library will raise. Use `concat=True` to append only
  the new rows (see the sketch below this diff)
* Bugfix: #350 remove deprecated pandas calls

### 1.43 (2017-05-30)
* Bugfix: #350 remove deprecated pandas calls
* Bugfix: #360 version incorrect in empty append in VersionStore
* Feature: #365 add generic BSON store

### 1.42 (2017-05-12)
* Bugfix: #346 fixed daterange subsetting error on very large dataframes in version store
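
A minimal sketch of the 1.44 append behaviour noted above. The `concat=True` keyword is taken from the changelog entry and the branch name; the exact signature on `VersionStore.append` is an assumption here, not confirmed by this diff:

```python
import pandas as pd
from arctic import Arctic

store = Arctic('localhost')            # assumes a local MongoDB
store.initialize_library('user.ts')
lib = store['user.ts']

lib.write('SYM', pd.DataFrame({'v': [1., 2.]},
                              index=pd.date_range('2017-05-01', periods=2)))

# Appending data that overlaps what is already stored now raises;
# concat=True (per the changelog) appends only the genuinely new rows.
overlap = pd.DataFrame({'v': [2., 3.]},
                       index=pd.date_range('2017-05-02', periods=2))
lib.append('SYM', overlap, concat=True)   # assumed keyword, see branch name
```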
35 changes: 14 additions & 21 deletions arctic/_compression.py
@@ -1,20 +1,14 @@
import lz4
import os
import logging
from . import _compress as clz4


logger = logging.getLogger(__name__)

# Switch to parallel LZ4 compression (and potentially other parallel operations). Default: True
ENABLE_PARALLEL = not os.environ.get('DISABLE_PARALLEL')
LZ4_N_PARALLEL = 50  # No. of elements above which LZ4 compression runs in parallel

try:
from . import _compress as clz4
except ImportError:
logger.warning("Couldn't import cython lz4")
import lz4 as clz4
ENABLE_PARALLEL = False


def enable_parallel_lz4(mode):
"""
@@ -33,12 +27,9 @@ def enable_parallel_lz4(mode):
def compress_array(str_list):
"""
Compress an array of strings
By default LZ4 mode is standard in interactive mode,
and high compression in applications/scripts
"""
if not ENABLE_PARALLEL:
return [lz4.compress(s) for s in str_list]
return [clz4.compress(s) for s in str_list]

# With fewer than 50 chunks it's quicker to compress sequentially.
if len(str_list) > LZ4_N_PARALLEL:
@@ -47,27 +38,21 @@ def compress_array(str_list):
return [clz4.compress(s) for s in str_list]


def _get_lib():
if ENABLE_PARALLEL:
return clz4
return lz4


def compress(_str):
"""
Compress a string
By default LZ4 mode is standard in interactive mode,
and high compression in applications/scripts
"""
return _get_lib().compress(_str)
return clz4.compress(_str)


def decompress(_str):
"""
Decompress a string
"""
return _get_lib().decompress(_str)
return clz4.decompress(_str)


def decompress_array(str_list):
@@ -76,4 +61,12 @@ def decompress_array(str_list):
"""
if ENABLE_PARALLEL:
return clz4.decompressarr(str_list)
return [lz4.decompress(chunk) for chunk in str_list]
return [clz4.decompress(chunk) for chunk in str_list]


def compressHC(_str):
"""
HC compression
"""
return clz4.compressHC(_str)
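
With this change `_compression` routes everything through the internal Cython bindings (`clz4`), including the newly exposed `compressHC`. A round-trip sketch of the resulting API, assuming the C extension that ships with the package is built:

```python
from arctic._compression import (compress, compressHC, decompress,
                                 compress_array, decompress_array)

payloads = [('tick-%d' % i).encode('ascii') for i in range(100)]

# compress_array takes the parallel code path once the input exceeds
# LZ4_N_PARALLEL (50) items and ENABLE_PARALLEL is set.
assert decompress_array(compress_array(payloads)) == payloads

# compressHC trades speed for ratio; plain LZ4 decompress reads both framings.
blob = b'x' * 10000
assert decompress(compressHC(blob)) == blob
assert decompress(compress(blob)) == blob
```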

5 changes: 3 additions & 2 deletions arctic/arctic.py
@@ -9,7 +9,7 @@
from .decorators import mongo_retry
from .exceptions import LibraryNotFoundException, ArcticException, QuotaExceededException
from .hooks import get_mongodb_uri
from .store import version_store
from .store import version_store, bson_store
from .tickstore import tickstore, toplevel
from .chunkstore import chunkstore
from six import string_types
@@ -27,7 +27,8 @@
LIBRARY_TYPES = {version_store.VERSION_STORE_TYPE: version_store.VersionStore,
tickstore.TICK_STORE_TYPE: tickstore.TickStore,
toplevel.TICK_STORE_TYPE: toplevel.TopLevelTickStore,
chunkstore.CHUNK_STORE_TYPE: chunkstore.ChunkStore
chunkstore.CHUNK_STORE_TYPE: chunkstore.ChunkStore,
bson_store.BSON_STORE_TYPE: bson_store.BSONStore
}


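With `BSON_STORE_TYPE` added to `LIBRARY_TYPES`, the new store is created and resolved like any other library type. A sketch, assuming a reachable MongoDB:

```python
from arctic import Arctic
from arctic.store.bson_store import BSON_STORE_TYPE

store = Arctic('localhost')
# lib_type selects the store class via the LIBRARY_TYPES mapping above.
store.initialize_library('research.docs', lib_type=BSON_STORE_TYPE)
docs = store['research.docs']   # resolves to a BSONStore instance
```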
107 changes: 107 additions & 0 deletions arctic/store/bson_store.py
@@ -0,0 +1,107 @@
import logging
from pymongo.errors import OperationFailure
from ..decorators import mongo_retry
from .._util import enable_sharding

logger = logging.getLogger(__name__)

BSON_STORE_TYPE = 'BSONStore'

class BSONStore(object):
"""
BSON Data Store. This stores any Python object that encodes correctly to BSON
and offers a vanilla pymongo interface. Note that strings must be valid UTF-8.
See: https://api.mongodb.com/python/3.4.0/api/bson/index.html
Note that this neither defines nor ensures any indices; they are left to the user
to create and manage according to the effective business schema applicable to their data.
Likewise, _id is left to the user to populate if they wish, and is exposed in documents. As
is normally the case with pymongo, _id is set to a unique ObjectId if left unspecified at
insert time.
"""

def __init__(self, arctic_lib):
self._arctic_lib = arctic_lib
self._collection = self._arctic_lib.get_top_level_collection()

@classmethod
def initialize_library(cls, arctic_lib, hashed=True, **kwargs):
logger.info("Trying to enable sharding...")
try:
enable_sharding(arctic_lib.arctic, arctic_lib.get_name(), hashed=hashed)
except OperationFailure as exception:
logger.warning(("Library created, but couldn't enable sharding: "
"%s. This is OK if you're not 'admin'"), exception)

@mongo_retry
def stats(self):
"""
Store stats, necessary for quota to work.
"""
res = {}
db = self._collection.database
res['dbstats'] = db.command('dbstats')
res['data'] = db.command('collstats', self._collection.name)
res['totals'] = {'count': res['data']['count'],
'size': res['data']['size']}
return res

@mongo_retry
def find(self, *args, **kwargs):
"""
See http://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.find
"""
return self._collection.find(*args, **kwargs)

@mongo_retry
def insert_one(self, value):
"""
See http://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.insert_one
"""
self._arctic_lib.check_quota()
return self._collection.insert_one(value)

@mongo_retry
def insert_many(self, values):
"""
See http://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.insert_many
"""
self._arctic_lib.check_quota()
return self._collection.insert_many(values)

@mongo_retry
def delete_one(self, query):
"""
See http://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.delete_one
"""
return self._collection.delete_one(query)

@mongo_retry
def delete_many(self, query):
"""
See http://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.delete_many
"""
return self._collection.delete_many(query)

@mongo_retry
def create_index(self, keys, **kwargs):
"""
See http://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.create_index
"""
return self._collection.create_index(keys, **kwargs)

@mongo_retry
def drop_index(self, index_or_name):
"""
See http://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.drop_index
"""
return self._collection.drop_index(index_or_name)

@mongo_retry
def index_information(self):
"""
See http://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.index_information
"""
return self._collection.index_information()
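
Every method above delegates directly to the underlying pymongo collection (with a quota check on the insert paths), so usage mirrors pymongo. A sketch, continuing from the `docs` library created in the earlier example:

```python
docs.insert_one({'ticker': 'VOD.L', 'note': 'earnings call'})
docs.insert_many([{'ticker': 'AAPL', 'seq': i} for i in range(3)])

# Indices are entirely user-managed, as the class docstring notes.
docs.create_index([('ticker', 1)])

print(list(docs.find({'ticker': 'AAPL'})))
print(docs.index_information())

docs.delete_many({'ticker': 'AAPL'})
docs.delete_one({'ticker': 'VOD.L'})
```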
2 changes: 1 addition & 1 deletion arctic/store/version_store.py
@@ -465,7 +465,7 @@ def append(self, symbol, data, metadata=None, prune_previous_version=True, upser
sort=[('version', pymongo.DESCENDING)])

if len(data) == 0 and previous_version is not None:
return VersionedItem(symbol=symbol, library=self._arctic_lib.get_name(), version=previous_version,
return VersionedItem(symbol=symbol, library=self._arctic_lib.get_name(), version=previous_version['version'],
metadata=version.pop('metadata', None), data=None)

if upsert and previous_version is None:
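The one-line fix above addresses #360: appending an empty frame is a no-op that should report the previous version number, not embed the whole previous-version document. A sketch of the corrected behaviour, using the `library` and `ts1` fixtures from the test shown further down:

```python
import pandas as pd

item = library.write('SYM', ts1)               # VersionedItem with version == 1
noop = library.append('SYM', pd.DataFrame())   # nothing to append
assert noop.version == item.version            # an int again, not a raw document
```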
25 changes: 12 additions & 13 deletions arctic/tickstore/tickstore.py
@@ -4,7 +4,6 @@
from bson.binary import Binary
import copy
from datetime import datetime as dt, timedelta
import lz4
import numpy as np
import pandas as pd
from pandas.core.frame import _arrays_to_mgr
@@ -17,7 +16,7 @@
from ..decorators import mongo_retry
from ..exceptions import OverlappingDataException, NoDataFoundException, UnorderedDataException, UnhandledDtypeException, ArcticException
from .._util import indent

from arctic._compression import compress, compressHC, decompress

logger = logging.getLogger(__name__)

@@ -413,7 +412,7 @@ def _read_bucket(self, doc, column_set, column_dtypes, include_symbol, include_i
rtn = {}
if doc[VERSION] != 3:
raise ArcticException("Unhandled document version: %s" % doc[VERSION])
rtn[INDEX] = np.cumsum(np.fromstring(lz4.decompress(doc[INDEX]), dtype='uint64'))
rtn[INDEX] = np.cumsum(np.fromstring(decompress(doc[INDEX]), dtype='uint64'))
doc_length = len(rtn[INDEX])
column_set.update(doc[COLUMNS].keys())

@@ -422,7 +421,7 @@ def _read_bucket(self, doc, column_set, column_dtypes, include_symbol, include_i
for c in column_set:
try:
coldata = doc[COLUMNS][c]
mask = np.fromstring(lz4.decompress(coldata[ROWMASK]), dtype='uint8')
mask = np.fromstring(decompress(coldata[ROWMASK]), dtype='uint8')
union_mask = union_mask | mask
except KeyError:
rtn[c] = None
@@ -438,11 +437,11 @@ def _read_bucket(self, doc, column_set, column_dtypes, include_symbol, include_i
try:
coldata = doc[COLUMNS][c]
dtype = np.dtype(coldata[DTYPE])
values = np.fromstring(lz4.decompress(coldata[DATA]), dtype=dtype)
values = np.fromstring(decompress(coldata[DATA]), dtype=dtype)
self._set_or_promote_dtype(column_dtypes, c, dtype)
rtn[c] = self._empty(rtn_length, dtype=column_dtypes[c])
rowmask = np.unpackbits(np.fromstring(lz4.decompress(coldata[ROWMASK]),
dtype='uint8'))[:doc_length].astype('bool')
rowmask = np.unpackbits(np.fromstring(decompress(coldata[ROWMASK]),
dtype='uint8'))[:doc_length].astype('bool')
rowmask = rowmask[union_mask]
rtn[c][rowmask] = values
except KeyError:
@@ -644,18 +643,18 @@ def _pandas_to_bucket(df, symbol, initial_image):
rtn[START] = start

logger.warning("NB treating all values as 'exists' - no longer sparse")
rowmask = Binary(lz4.compressHC(np.packbits(np.ones(len(df), dtype='uint8'))))
rowmask = Binary(compressHC(np.packbits(np.ones(len(df), dtype='uint8')).tostring()))

index_name = df.index.names[0] or "index"
recs = df.to_records(convert_datetime64=False)
for col in df:
array = TickStore._ensure_supported_dtypes(recs[col])
col_data = {}
col_data[DATA] = Binary(lz4.compressHC(array.tostring()))
col_data[DATA] = Binary(compressHC(array.tostring()))
col_data[ROWMASK] = rowmask
col_data[DTYPE] = TickStore._str_dtype(array.dtype)
rtn[COLUMNS][col] = col_data
rtn[INDEX] = Binary(lz4.compressHC(np.concatenate(([recs[index_name][0].astype('datetime64[ms]').view('uint64')],
rtn[INDEX] = Binary(compressHC(np.concatenate(([recs[index_name][0].astype('datetime64[ms]').view('uint64')],
np.diff(recs[index_name].astype('datetime64[ms]').view('uint64')))).tostring()))
return rtn, final_image

@@ -686,13 +685,13 @@ def _to_bucket(ticks, symbol, initial_image):
rowmask[k][i] = 1
data[k] = [v]

rowmask = dict([(k, Binary(lz4.compressHC(np.packbits(v).tostring())))
rowmask = dict([(k, Binary(compressHC(np.packbits(v).tostring())))
for k, v in iteritems(rowmask)])
for k, v in iteritems(data):
if k != 'index':
v = np.array(v)
v = TickStore._ensure_supported_dtypes(v)
rtn[COLUMNS][k] = {DATA: Binary(lz4.compressHC(v.tostring())),
rtn[COLUMNS][k] = {DATA: Binary(compressHC(v.tostring())),
DTYPE: TickStore._str_dtype(v.dtype),
ROWMASK: rowmask[k]}

@@ -705,7 +704,7 @@ def _to_bucket(ticks, symbol, initial_image):
rtn[IMAGE_DOC] = {IMAGE_TIME: image_start, IMAGE: initial_image}
rtn[END] = end
rtn[START] = start
rtn[INDEX] = Binary(lz4.compressHC(np.concatenate(([data['index'][0]], np.diff(data['index']))).tostring()))
rtn[INDEX] = Binary(compressHC(np.concatenate(([data['index'][0]], np.diff(data['index']))).tostring()))
return rtn, final_image

def max_date(self, symbol):
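Both bucket writers above delta-encode the index before compressing it: the first timestamp is stored absolutely and the rest as millisecond diffs, which `_read_bucket` reverses with `np.cumsum`. A standalone sketch of that round trip using the same `_compression` helpers:

```python
import numpy as np
from arctic._compression import compressHC, decompress

ts = np.array(['2017-05-31T09:30:00.000', '2017-05-31T09:30:00.250',
               '2017-05-31T09:30:01.000'], dtype='datetime64[ms]').view('uint64')

# Write path (as in _to_bucket): first value followed by diffs, HC-compressed.
packed = compressHC(np.concatenate(([ts[0]], np.diff(ts))).tostring())

# Read path (as in _read_bucket): decompress, then cumulative-sum back.
unpacked = np.cumsum(np.fromstring(decompress(packed), dtype='uint64'))
assert (unpacked == ts).all()
```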
4 changes: 2 additions & 2 deletions setup.py
@@ -77,7 +77,7 @@ def run_tests(self):

setup(
name="arctic",
version="1.43.0",
version="1.44.0",
author="Man AHL Technology",
author_email="ManAHLTech@ahl.com",
description=("AHL Research Versioned TimeSeries and Tick store"),
@@ -94,7 +94,6 @@ def run_tests(self):
],
install_requires=["decorator",
"enum34",
"lz4<=0.8.2",
"mockextras",
"pandas",
"pymongo>=3.0",
@@ -109,6 +108,7 @@
"pytest-server-fixtures",
"pytest-timeout",
"pytest-xdist",
"lz4"
],
entry_points={'console_scripts': [
'arctic_init_library = arctic.scripts.arctic_init_library:main',
6 changes: 4 additions & 2 deletions tests/integration/store/test_version_store.py
@@ -274,8 +274,10 @@ def test_append_should_overwrite_after_delete(library):


def test_append_empty_ts(library):
library.append(symbol, ts1, upsert=True)
library.append(symbol, pd.DataFrame(), upsert=True)
data = library.append(symbol, ts1, upsert=True)
assert(data.version == 1)
data = library.append(symbol, pd.DataFrame(), upsert=True)
assert(data.version == 1)
assert len(library.read(symbol).data) == len(ts1)

