Skip to content

Commit

Permalink
Code review comments
Browse files (browse the repository at this point in the history)
  • Loading branch information
bmoscon committed Aug 5, 2016
1 parent 605ca6d commit 20a4d9b
Showing 1 changed file with 14 additions and 30 deletions.
44 changes: 14 additions & 30 deletions arctic/chunkstore/chunkstore.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -24,7 +24,6 @@
ROWS = 'r'



class ChunkStore(object):
@classmethod
def initialize_library(cls, arctic_lib, **kwargs):
Expand Down Expand Up @@ -73,12 +72,11 @@ def __str__(self):
def __repr__(self):
return str(self)

def _checksum(self, symbol, doc):
def _checksum(self, doc):
"""
Checksum the passed in dictionary
"""
sha = hashlib.sha1()
sha.update(symbol.encode('ascii'))
sha.update(self.chunker.chunk_to_str(doc[START]).encode('ascii'))
sha.update(self.chunker.chunk_to_str(doc[END]).encode('ascii'))
for k in doc[DATA][COLUMNS]:
Expand Down Expand Up @@ -145,24 +143,11 @@ def rename(self, from_symbol, to_symbol):
if self._get_symbol_info(to_symbol) is not None:
raise Exception('Symbol %s already exists' % (from_symbol))

chunks = []
for x in self._collection.find({SYMBOL: from_symbol}, sort=[(START, pymongo.ASCENDING)],):
chunks.append(x)

bulk = self._collection.initialize_unordered_bulk_op()
for chunk in chunks:
chunk.pop('_id', None)
chunk[SYMBOL] = to_symbol
chunk[SHA] = self._checksum(to_symbol, chunk)
bulk.find({SYMBOL: from_symbol, START: chunk[START], END: chunk[END]},).upsert().update_one({'$set': chunk})
mongo_retry(self._collection.update_many)({SYMBOL: from_symbol},
{'$set': {SYMBOL: to_symbol}})

if len(chunks) > 0:
bulk.execute()
sym[SYMBOL] = to_symbol
sym.pop('_id', None)
mongo_retry(self._symbols.update_one)({SYMBOL: from_symbol},
{'$set': sym},
upsert=True)
{'$set': {SYMBOL: to_symbol}})

def read(self, symbol, chunk_range=None, columns=None, filter_data=True):
"""
Expand Down Expand Up @@ -250,7 +235,7 @@ def write(self, symbol, item, chunk_size):
chunk[START] = start
chunk[END] = end
chunk[SYMBOL] = symbol
chunk[SHA] = self._checksum(symbol, chunk)
chunk[SHA] = self._checksum(chunk)

if chunk[SHA] not in previous_shas:
op = True
Expand Down Expand Up @@ -286,13 +271,12 @@ def __update(self, symbol, item, combine_method=None):
sym = self._get_symbol_info(symbol)
if not sym:
raise NoDataFoundException("Symbol does not exist.")

if sym[TYPE] == 'series' and not isinstance(item, Series):
raise Exception("Cannot combine Series and DataFrame")
if sym[TYPE] == 'dataframe' and not isinstance(item, DataFrame):
raise Exception("Cannot combine DataFrame and Series")


bulk = self._collection.initialize_unordered_bulk_op()
op = False
for start, end, record in self.chunker.to_chunks(item, sym[CHUNK_SIZE]):
Expand All @@ -316,19 +300,19 @@ def __update(self, symbol, item, combine_method=None):
data = self.serializer.serialize(record)
op = True

segment = {DATA: data}
segment[TYPE] = 'dataframe' if isinstance(record, DataFrame) else 'series'
segment[START] = start
segment[END] = end
sha = self._checksum(symbol, segment)
segment[SHA] = sha
chunk = {DATA: data}
chunk[TYPE] = 'dataframe' if isinstance(record, DataFrame) else 'series'
chunk[START] = start
chunk[END] = end
sha = self._checksum(chunk)
chunk[SHA] = sha
if new_chunk:
# new chunk
bulk.find({SYMBOL: symbol, SHA: sha}
).upsert().update_one({'$set': segment})
).upsert().update_one({'$set': chunk})
else:
bulk.find({SYMBOL: symbol, START: start, END: end}
).update_one({'$set': segment})
).update_one({'$set': chunk})

if op:
bulk.execute()
Expand Down

0 comments on commit 20a4d9b

Please sign in to comment.