Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow writing of None data in transactions #846

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 16 additions & 12 deletions arctic/scripts/arctic_copy_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,22 @@ def _copy_symbol(symbols):

if existing_data and splice:
original_data = dest.read(symbol).data
preserve_start = to_pandas_closed_closed(DateRange(None, new_data.index[0].to_pydatetime(),
interval=CLOSED_OPEN)).end
preserve_end = to_pandas_closed_closed(DateRange(new_data.index[-1].to_pydatetime(),
None,
interval=OPEN_CLOSED)).start
if not original_data.index.tz:
# No timezone on the original, should we even allow this?
preserve_start = preserve_start.replace(tzinfo=None)
preserve_end = preserve_end.replace(tzinfo=None)
before = original_data.loc[:preserve_start]
after = original_data[preserve_end:]
new_data = before.append(new_data).append(after)

if new_data is None or len(new_data) == 0:
new_data = original_data
else:
preserve_start = to_pandas_closed_closed(DateRange(None, new_data.index[0].to_pydatetime(),
interval=CLOSED_OPEN)).end
preserve_end = to_pandas_closed_closed(DateRange(new_data.index[-1].to_pydatetime(),
None,
interval=OPEN_CLOSED)).start
if not original_data.index.tz:
# No timezone on the original, should we even allow this?
preserve_start = preserve_start.replace(tzinfo=None)
preserve_end = preserve_end.replace(tzinfo=None)
before = original_data.loc[:preserve_start]
after = original_data[preserve_end:]
new_data = before.append(new_data).append(after)

mt.write(symbol, new_data, metadata=version.metadata)
return _copy_symbol
Expand Down
7 changes: 3 additions & 4 deletions arctic/store/audit.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,9 @@ def write(self, symbol, data, prune_previous_version=True, metadata=None, **kwar
Records a write request to be actioned on context exit. Takes exactly the same parameters as the regular
library write call.
"""
if data is not None:
# We only write data if existing data is None or the Timeseries data has changed or metadata has changed
if self.base_ts.data is None or not are_equals(data, self.base_ts.data) or metadata != self.base_ts.metadata:
self._do_write = True
# We only write data if existing data is None or the Timeseries data has changed or metadata has changed
if self.base_ts.data is None or not are_equals(data, self.base_ts.data) or metadata != self.base_ts.metadata:
self._do_write = True
self._write = partial(self._version_store.write, symbol, data, prune_previous_version=prune_previous_version,
metadata=metadata, **kwargs)

Expand Down
24 changes: 24 additions & 0 deletions tests/integration/scripts/test_copy_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,27 @@ def test_copy_data_doesnt_exist(arctic, mongo_host):
assert logger.info.call_args_list == [call('Copying data from %s -> %s' % (src_host, dest_host)),
call('Copying: 0 symbols')]
assert logger.warn.call_args_list == [call('No symbols found that matched those provided.')]

@pytest.mark.parametrize('source_ts', [read_str_as_pandas(""" times | near"""), None])
def test_copy_empty_data(arctic, mongo_host, source_ts):
    """Check that a spliced copy from a header-only or ``None`` source keeps the destination data.

    The source library holds ``source_ts`` under ``some_ts`` while the
    destination already holds ``ts`` under the same symbol.  After running the
    copy script with ``--splice`` the destination must still read back ``ts``
    and its audit log for the symbol must be empty.
    """
    source_lib = 'user.library'
    target_lib = 'user.library2'

    # Seed the source library with the parametrized series ...
    arctic[source_lib].write('some_ts', source_ts)
    # ... and the destination library with different data for the same symbol.
    arctic[target_lib].write('some_ts', ts)

    # Build the "<library>@<host>" addresses handed to the script.
    source_host = source_lib + '@' + mongo_host
    target_host = target_lib + '@' + mongo_host

    # Patch the script's logger so the emitted messages can be inspected.
    with patch('arctic.scripts.arctic_copy_data.logger') as logger:
        run_as_main(mcd.main,
                    '--src', source_host,
                    '--dest', target_host,
                    '--log', 'CR101',
                    '--splice',
                    'some_ts')

    # The destination must still hold the data it had before the copy.
    assert_frame_equal(ts, arctic[target_lib].read('some_ts').data)

    expected_info_calls = [
        call('Copying data from %s -> %s' % (source_host, target_host)),
        call('Copying: 1 symbols'),
    ]
    assert logger.info.call_args_list == expected_info_calls

    expected_warn_calls = [
        call('Symbol: some_ts already exists in destination, splicing in new data'),
    ]
    assert logger.warn.call_args_list == expected_warn_calls

    # As the destination data is unchanged, no writing takes place and the audit log is empty.
    assert len(arctic[target_lib].read_audit_log('some_ts')) == 0
12 changes: 6 additions & 6 deletions tests/unit/store/test_version_store_audit.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ def test_ArcticTransaction_writes_if_metadata_changed():


def test_ArcticTransaction_writes_if_base_data_corrupted():

vs = Mock(spec=VersionStore)
ts1 = pd.DataFrame(index=[1, 2], data={'a': [1.0, 2.0]})
vs.read.side_effect = OperationFailure('some failure')
Expand Down Expand Up @@ -109,7 +108,7 @@ def test_ArcticTransaction_writes_no_data_found():

assert vs.write.call_args_list == [call(sentinel.symbol, ANY, prune_previous_version=True, metadata={1: 2})]
assert vs.list_versions.call_args_list == [call(sentinel.symbol, latest_only=True),
call(sentinel.symbol)]
call(sentinel.symbol)]


def test_ArcticTransaction_writes_no_data_found_deleted():
Expand Down Expand Up @@ -146,19 +145,20 @@ def test_ArcticTransaction_does_nothing_when_data_not_modified():
assert not vs.write.called


def test_ArcticTransaction_does_nothing_when_data_is_None():
def test_ArcticTransaction_does_write_when_new_data_is_None():
vs = Mock(spec=VersionStore)
ts1 = pd.DataFrame(index=[1, 2], data={'a': [1.0, 2.0]})
vs.read.return_value = VersionedItem(symbol=sentinel.symbol, library=sentinel.library, version=1, metadata=None,
data=ts1, host=sentinel.host)
vs.write.return_value = VersionedItem(symbol=sentinel.symbol, library=sentinel.library, version=2,
metadata=None, data=None, host=sentinel.host)
vs.list_versions.return_value = [{'version': 1}, {'version': 2}]
vs.list_versions.return_value = [{'version': 2}, {'version': 1}]

with ArcticTransaction(vs, sentinel.symbol, sentinel.user, sentinel.log) as cwb:
pass
cwb.write(sentinel.symbol, None, metadata={1: 2})

assert not vs._delete_version.called
assert not vs.write.called
assert vs.write.call_args_list == [call(sentinel.symbol, None, prune_previous_version=True, metadata={1: 2})]


def test_ArcticTransaction_guards_against_inconsistent_ts():
Expand Down