Skip to content

Commit

Permalink
Mongo 3.6+ no longer uses the index to sort when using multi-key indexes
Browse files Browse the repository at this point in the history
(SERVER-19402); therefore, we are now using the client to sort.
  • Loading branch information
rob256 committed Aug 24, 2018
1 parent 4d19167 commit cbddbb4
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 25 deletions.
10 changes: 4 additions & 6 deletions arctic/store/_ndarray_store.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import hashlib
import logging
import os
from operator import itemgetter


from bson.binary import Binary
Expand Down Expand Up @@ -269,7 +270,7 @@ def _do_read(self, collection, version, symbol, index_range=None):

data = bytearray()
i = -1
for i, x in enumerate(collection.find(spec, sort=[('segment', pymongo.ASCENDING)],)):
for i, x in enumerate(sorted(collection.find(spec), key=itemgetter('segment'))):
data.extend(decompress(x['data']) if x['compressed'] else x['data'])

# Check that the correct number of segments has been returned
Expand Down Expand Up @@ -409,11 +410,8 @@ def _concat_and_rewrite(self, collection, version, symbol, item, previous_versio
read_index_range = [0, None]
# The unchanged segments are the compressed ones (apart from the last compressed)
unchanged_segment_ids = []
for segment in collection.find(spec, projection={'_id':1,
'segment':1,
'compressed': 1
},
sort=[('segment', pymongo.ASCENDING)]):
for segment in sorted(collection.find(spec, projection={'_id': 1, 'segment': 1, 'compressed': 1}),
key=itemgetter('segment')):
# We want to stop iterating when we find the first uncompressed chunks
if not segment['compressed']:
# We include the last compressed chunk in the recompression
Expand Down
13 changes: 7 additions & 6 deletions arctic/store/_pickle_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
from bson.binary import Binary
from bson.errors import InvalidDocument
from operator import itemgetter
from six.moves import cPickle, xrange
import io
from .._compression import decompress, compress_array
Expand Down Expand Up @@ -37,14 +38,14 @@ def read(self, mongoose_lib, version, symbol, **kwargs):
if blob is not None:
if blob == _MAGIC_CHUNKEDV2:
collection = mongoose_lib.get_top_level_collection()
data = b''.join(decompress(x['data']) for x in collection.find({'symbol': symbol,
'parent': version_base_or_id(version)},
sort=[('segment', pymongo.ASCENDING)]))
data = b''.join(decompress(x['data']) for x in sorted(
collection.find({'symbol': symbol, 'parent': version_base_or_id(version)}),
key=itemgetter('segment')))
elif blob == _MAGIC_CHUNKED:
collection = mongoose_lib.get_top_level_collection()
data = b''.join(x['data'] for x in collection.find({'symbol': symbol,
'parent': version_base_or_id(version)},
sort=[('segment', pymongo.ASCENDING)]))
data = b''.join(x['data'] for x in sorted(
collection.find({'symbol': symbol, 'parent': version_base_or_id(version)}),
key=itemgetter('segment')))
data = decompress(data)
else:
if blob[:len(_MAGIC_CHUNKED)] == _MAGIC_CHUNKED:
Expand Down
1 change: 1 addition & 0 deletions arctic/store/_version_store_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import pickle
import six
from operator import itemgetter

import numpy as np
import pandas as pd
Expand Down
23 changes: 14 additions & 9 deletions tests/unit/store/test_ndarray_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ def test_concat_and_rewrite_checks_chunk_count():
symbol = sentinel.symbol
item = sentinel.item

collection.find.return_value = [{'compressed': True},
{'compressed': False}]
collection.find.return_value = [{'compressed': True, 'segment': 1},
{'compressed': False, 'segment': 2}]
with pytest.raises(DataIntegrityException) as e:
NdarrayStore._concat_and_rewrite(self, collection, version, symbol, item, previous_version)
assert str(e.value) == 'Symbol: sentinel.symbol:sentinel.version expected 1 segments but found 0'
Expand All @@ -108,9 +108,11 @@ def test_concat_and_rewrite_checks_written():

collection.find.return_value = [{'_id': sentinel.id,
'segment': 47, 'compressed': True},
{'compressed': True},
{'_id': sentinel.id_2, 'segment': 48, 'compressed': True},
# 3 appended items
{'compressed': False}, {'compressed': False}, {'compressed': False}]
{'_id': sentinel.id_3, 'segment': 49, 'compressed': False},
{'_id': sentinel.id_4, 'segment': 50, 'compressed': False},
{'_id': sentinel.id_5, 'segment': 51, 'compressed': False}]
collection.update_many.return_value = create_autospec(UpdateResult, matched_count=1)
NdarrayStore._concat_and_rewrite(self, collection, version, symbol, item, previous_version)
assert self.check_written.call_count == 1
Expand All @@ -131,8 +133,11 @@ def test_concat_and_rewrite_checks_different_id():
item = []

collection.find.side_effect = [
[{'_id': sentinel.id, 'segment' : 47, 'compressed': True}, {'compressed': True},
{'compressed': False}, {'compressed': False}, {'compressed': False}], # 3 appended items
[{'_id': sentinel.id, 'segment' : 47, 'compressed': True},
{'_id': sentinel.id_3, 'segment': 48, 'compressed': True},
{'_id': sentinel.id_4, 'segment': 49, 'compressed': False},
{'_id': sentinel.id_5, 'segment': 50, 'compressed': False},
{'_id': sentinel.id_6, 'segment': 51, 'compressed': False}], # 3 appended items
[{'_id': sentinel.id_2}] # the returned id is different after the update_many
]

Expand Down Expand Up @@ -163,9 +168,9 @@ def test_concat_and_rewrite_checks_fewer_updated():
[{'_id': sentinel.id_1, 'segment': 47, 'compressed': True},
{'_id': sentinel.id_2, 'segment': 48, 'compressed': True},
{'_id': sentinel.id_3, 'segment': 49, 'compressed': True},
{'compressed': False},
{'compressed': False},
{'compressed': False}], # 3 appended items
{'_id': sentinel.id_4, 'segment': 50, 'compressed': False},
{'_id': sentinel.id_5, 'segment': 51, 'compressed': False},
{'_id': sentinel.id_6, 'segment': 52, 'compressed': False}], # 3 appended items
[{'_id': sentinel.id_1}] # the returned id is different after the update_many
]

Expand Down
10 changes: 6 additions & 4 deletions tests/unit/store/test_pickle_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,13 @@ def test_read_object_2():
coll = Mock()
arctic_lib = Mock()
coll.find.return_value = [{'data': Binary(compressHC(cPickle.dumps(object))),
'symbol': 'sentinel.symbol'}
'symbol': 'sentinel.symbol',
'segment': 1}
]
arctic_lib.get_top_level_collection.return_value = coll

assert PickleStore.read(self, arctic_lib, version, sentinel.symbol) == object
assert coll.find.call_args_list == [call({'symbol': sentinel.symbol, 'parent': sentinel._id}, sort=[('segment', 1)])]
assert coll.find.call_args_list == [call({'symbol': sentinel.symbol, 'parent': sentinel._id})]


def test_read_with_base_version_id():
Expand All @@ -74,12 +75,13 @@ def test_read_with_base_version_id():
coll = Mock()
arctic_lib = Mock()
coll.find.return_value = [{'data': Binary(compressHC(cPickle.dumps(object))),
'symbol': 'sentinel.symbol'}
'symbol': 'sentinel.symbol',
'segment': 1}
]
arctic_lib.get_top_level_collection.return_value = coll

assert PickleStore.read(self, arctic_lib, version, sentinel.symbol) == object
assert coll.find.call_args_list == [call({'symbol': sentinel.symbol, 'parent': sentinel.base_version_id}, sort=[('segment', 1)])]
assert coll.find.call_args_list == [call({'symbol': sentinel.symbol, 'parent': sentinel.base_version_id})]


@pytest.mark.xfail(sys.version_info >= (3,),
Expand Down

0 comments on commit cbddbb4

Please sign in to comment.