feat(binarypb): delete on dump #2102

Merged · merged 1 commit on Mar 5, 2021
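Summary: `BinaryPbIndexer` gains a `delete_on_dump` flag (default `False`). When set, serializing the indexer compacts its on-disk files, dropping entries that were overwritten or deleted. A minimal usage sketch based on the tests in this PR; the `metas` dict is a placeholder for the `test_metas` fixture and the import paths are inferred from the repository layout, not stated in the diff:

```python
from jina.executors.indexers import BaseIndexer
from jina.executors.indexers.keyvalue import BinaryPbIndexer

# placeholder metas; in the test suite this comes from the `test_metas` fixture
metas = {'workspace': './workspace'}

with BinaryPbIndexer(metas=metas, delete_on_dump=True) as idxer:
    idxer.add(['1', '2'], [b'first', b'second'])
    idxer.save()
    save_abspath = idxer.save_abspath

with BaseIndexer.load(save_abspath) as idxer:
    idxer.update(['1'], [b'replacement'])
    idxer.save()  # compaction happens here: the stale b'first' entry is dropped
```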
96 changes: 78 additions & 18 deletions jina/executors/indexers/keyvalue.py
@@ -2,6 +2,7 @@
__license__ = "Apache-2.0"

import mmap
import os
from typing import Iterable, Optional

import numpy as np
@@ -13,14 +14,19 @@


class BinaryPbIndexer(BaseKVIndexer):
"""Simple Key-value indexer."""
"""Simple Key-value indexer that writes to disk

:param delete_on_dump: whether to delete the entries that were marked as 'deleted'
"""

class WriteHandler:
"""
Write file handler.

:param path: Path of the file.
:param mode: Writing mode. (e.g. 'ab', 'wb')
"""

def __init__(self, path, mode):
self.body = open(path, mode)
self.header = open(path + '.head', mode)
@@ -42,6 +48,7 @@ class ReadHandler:
:param path: Path of the file.
:param key_length: Length of key.
"""

def __init__(self, path, key_length):
with open(path + '.head', 'rb') as fp:
tmp = np.frombuffer(fp.read(),
@@ -56,6 +63,52 @@ def close(self):
"""Close the file."""
self._body.close()

def __getstate__(self):
# called on pickle save
if self.delete_on_dump:
self._delete_invalid_indices()
d = super().__getstate__()
return d

def _delete_invalid_indices(self):
if self.query_handler:
self.query_handler.close()
if self.write_handler:
self.write_handler.flush()
self.write_handler.close()

keys = []
vals = []
# we read the valid values and write them to the intermediary file
read_handler = self.ReadHandler(self.index_abspath, self.key_length)
for key in read_handler.header.keys():
pos_info = read_handler.header.get(key, None)
if pos_info:
p, r, l = pos_info
with mmap.mmap(read_handler.body, offset=p, length=l) as m:
keys.append(key)
vals.append(m[r:])
read_handler.close()
if len(keys) == 0:
return

# intermediary file
tmp_file = self.index_abspath + '-tmp'
self._start = 0
filtered_data_writer = self.WriteHandler(tmp_file, 'ab')
# reset size
self._size = 0
self._add(keys, vals, filtered_data_writer)
filtered_data_writer.close()

# replace orig. file
# and .head file
head_path = self.index_abspath + '.head'
os.remove(self.index_abspath)
os.remove(head_path)
os.rename(tmp_file, self.index_abspath)
os.rename(tmp_file + '.head', head_path)

def get_add_handler(self) -> 'WriteHandler':
"""
Get write file handler.
@@ -82,11 +135,14 @@ def get_query_handler(self) -> 'ReadHandler':
"""
return self.ReadHandler(self.index_abspath, self.key_length)

def __init__(self, *args, **kwargs):
def __init__(self,
delete_on_dump: bool = False,
*args,
**kwargs):
super().__init__(*args, **kwargs)
self._total_byte_len = 0
self._start = 0
self._page_size = mmap.ALLOCATIONGRANULARITY
self.delete_on_dump = delete_on_dump

def add(self, keys: Iterable[str], values: Iterable[bytes], *args, **kwargs) -> None:
"""Add the serialized documents to the index via document ids.
@@ -96,23 +152,10 @@ def add(self, keys: Iterable[str], values: Iterable[bytes], *args, **kwargs) ->
:param args: extra arguments
:param kwargs: keyword arguments
"""
if not keys:
if not any(keys):
return

for key, value in zip(keys, values):
l = len(value) #: the length
p = int(self._start / self._page_size) * self._page_size #: offset of the page
r = self._start % self._page_size #: the remainder, i.e. the start position given the offset
self.write_handler.header.write(
np.array(
(key, p, r, r + l),
dtype=[('', (np.str_, self.key_length)), ('', np.int64), ('', np.int64), ('', np.int64)]
).tobytes()
)
self._start += l
self.write_handler.body.write(value)
self._size += 1
self.write_handler.flush()
self._add(keys, values, writer=self.write_handler)

def query(self, key: str) -> Optional[bytes]:
"""Find the serialized document to the index via document id.
@@ -165,6 +208,23 @@ def delete(self, keys: Iterable[str], *args, **kwargs) -> None:
if keys:
self._delete(keys)

def _add(self, keys: Iterable[str], values: Iterable[bytes], writer: WriteHandler):
for key, value in zip(keys, values):
l = len(value) #: the length
p = int(self._start / self._page_size) * self._page_size #: offset of the page
r = self._start % self._page_size #: the remainder, i.e. the start position given the offset
# noinspection PyTypeChecker
writer.header.write(
np.array(
(key, p, r, r + l),
dtype=[('', (np.str_, self.key_length)), ('', np.int64), ('', np.int64), ('', np.int64)]
).tobytes()
)
self._start += l
writer.body.write(value)
self._size += 1
writer.flush()


class DataURIPbIndexer(BinaryPbIndexer):
"""Alias for BinaryPbIndexer"""
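Aside on the header arithmetic above: for each key, `_add` records a page-aligned offset `p`, an in-window start `r`, and an end `r + l`, so a read can mmap a window starting at `p` (a multiple of `mmap.ALLOCATIONGRANULARITY`, as mmap offsets must be) and slice the value out, exactly as `_delete_invalid_indices` does with `m[r:]`. A self-contained sketch of that bookkeeping; the file name is illustrative:

```python
import mmap

PAGE = mmap.ALLOCATIONGRANULARITY

def locate(start: int, length: int):
    """Mirror the (p, r, r + l) triple that _add writes per key."""
    p = (start // PAGE) * PAGE  # page-aligned window start (mmap requires this alignment)
    r = start % PAGE            # value start, relative to the window
    return p, r, r + length

# write two values back to back, recording each triple
values = [b'hello', b'world!']
triples, start = [], 0
with open('demo.bin', 'wb') as f:
    for v in values:
        triples.append(locate(start, len(v)))
        f.write(v)
        start += len(v)

# recover the second value through a page-aligned window, as ReadHandler does
p, r, end = triples[1]
with open('demo.bin', 'rb') as f:
    with mmap.mmap(f.fileno(), end, access=mmap.ACCESS_READ, offset=p) as m:
        assert m[r:end] == b'world!'
```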
79 changes: 62 additions & 17 deletions tests/unit/executors/indexers/test_binary_indexer.py
@@ -1,5 +1,6 @@
import copy
import os
import time

import numpy as np
import pytest
@@ -37,29 +38,36 @@ def validate(req):
validate_callback(mock, validate)


def test_binarypb_update1(test_metas):
with BinaryPbIndexer(metas=test_metas) as idxer:
@pytest.mark.parametrize('delete_on_dump', [True, False])
def test_binarypb_update1(test_metas, delete_on_dump):
with BinaryPbIndexer(metas=test_metas, delete_on_dump=delete_on_dump) as idxer:
idxer.add(['1', '2', '3'], [b'oldvalue', b'same', b'random'])
idxer.save()
assert idxer.size == 3
first_size = os.fstat(idxer.write_handler.body.fileno()).st_size
save_abspath = idxer.save_abspath

first_size = os.path.getsize(idxer.index_abspath)
save_abspath = idxer.save_abspath

with BaseIndexer.load(save_abspath) as idxer:
assert idxer.query('1') == b'oldvalue'

with BaseIndexer.load(save_abspath) as idxer:
assert idxer.query('1') == b'oldvalue'
second_size = os.fstat(idxer.query_handler._body.fileno()).st_size
assert second_size == first_size

second_size = os.path.getsize(idxer.index_abspath)
assert second_size == first_size

with BaseIndexer.load(save_abspath) as idxer:
# some new value
idxer.update(['1', '2'], [b'newvalue', b'same'])
idxer.save()
third_size = os.fstat(idxer.write_handler.body.fileno()).st_size

third_size = os.path.getsize(idxer.index_abspath)
if delete_on_dump:
assert third_size == first_size
else:
assert third_size > first_size
assert idxer.size == 3
assert idxer.size == 3

with BaseIndexer.load(save_abspath) as idxer:
assert idxer.query('1') == b'newvalue'
@@ -69,19 +77,27 @@ def test_binarypb_update1(test_metas):

with BaseIndexer.load(save_abspath) as idxer:
# partial update when missing keys encountered
idxer.update(['1', '2', '99'], [b'newvalue2', b'newvalue3', b'decoy'])
idxer.update(['1', '2', '99'], [b'abcvalue', b'abcd', b'WILL_BE_IGNORED'])
idxer.save()
assert idxer.size == 3

fourth_size = os.path.getsize(idxer.index_abspath)
if delete_on_dump:
assert fourth_size == first_size
else:
assert fourth_size > first_size
assert idxer.size == 3

with BaseIndexer.load(save_abspath) as idxer:
assert idxer.query('1') == b'newvalue2'
assert idxer.query('2') == b'newvalue3'
assert idxer.query('1') == b'abcvalue'
assert idxer.query('2') == b'abcd'
assert idxer.query('3') == b'random'
assert idxer.query('99') is None


def test_binarypb_add_and_update_not_working(test_metas):
with BinaryPbIndexer(metas=test_metas) as idxer:
@pytest.mark.parametrize('delete_on_dump', [True, False])
def test_binarypb_add_and_update_not_working(test_metas, delete_on_dump):
with BinaryPbIndexer(metas=test_metas, delete_on_dump=delete_on_dump) as idxer:
idxer.add(['11', '12'], [b'eleven', b'twelve'])
idxer.save()
# FIXME `add` and `update` won't work in the same context
@@ -104,14 +120,16 @@ def test_binarypb_add_and_update_not_working(test_metas):
assert idxer.size == 2


def test_binarypb_delete(test_metas):
with BinaryPbIndexer(metas=test_metas) as idxer:
@pytest.mark.parametrize('delete_on_dump', [True, False])
def test_binarypb_delete(test_metas, delete_on_dump):
with BinaryPbIndexer(metas=test_metas, delete_on_dump=delete_on_dump) as idxer:
idxer.add(['1', '2', '3'], [b'oldvalue', b'same', b'random'])
idxer.save()
assert idxer.size == 3
save_abspath = idxer.save_abspath

with BaseIndexer.load(save_abspath) as idxer:
assert idxer.size == 3
assert idxer.query('1') == b'oldvalue'

with BaseIndexer.load(save_abspath) as idxer:
@@ -125,9 +143,10 @@
assert idxer.query('3') == b'random'


def test_binarypb_update_twice(test_metas):
@pytest.mark.parametrize('delete_on_dump', [True, False])
def test_binarypb_update_twice(test_metas, delete_on_dump):
"""two updates in a row does work"""
with BinaryPbIndexer(metas=test_metas) as idxer:
with BinaryPbIndexer(metas=test_metas, delete_on_dump=delete_on_dump) as idxer:
idxer.add(['1', '2', '3'], [b'oldvalue', b'same', b'random'])
idxer.save()
assert idxer.size == 3
@@ -141,3 +160,29 @@
with BaseIndexer.load(save_abspath) as idxer:
assert idxer.query('1') == b'newvalue'
assert idxer.query('2') == b'othernewvalue'


# benchmark only
@pytest.mark.skipif('GITHUB_WORKFLOW' in os.environ, reason='skip the benchmark on github workflow')
@pytest.mark.parametrize('delete_on_dump', [True, False])
def test_binarypb_benchmark(test_metas, delete_on_dump):
entries = 100000
nr_to_update = 10000
keys = np.arange(entries)
values = np.random.randint(0, 10, size=entries).astype(bytes)

with BinaryPbIndexer(metas=test_metas, delete_on_dump=delete_on_dump) as idxer:
idxer.add(keys, values)
idxer.save()
assert idxer.size == entries
save_abspath = idxer.save_abspath

new_values = np.random.randint(0, 10, size=nr_to_update).astype(bytes)

with BaseIndexer.load(save_abspath) as idxer:
idxer.update(keys[:nr_to_update], new_values)
time_now = time.time()
idxer.save()

time_end = time.time()
print(f'delete_on_dump = {delete_on_dump}, entries={entries}. took {time_end - time_now} seconds')
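For reference, the compaction that `delete_on_dump` triggers on save reduces to: collect the still-valid entries via the header, rewrite them contiguously into a temporary file, and swap it in with `os.remove` plus `os.rename`, as `_delete_invalid_indices` does above. A simplified sketch of that rewrite-and-rename pattern, with a plain dict standing in for the numpy record header (and the separate `.head` file omitted):

```python
import os

def compact(data_path: str, header: dict) -> dict:
    """Rewrite only live entries into a tmp file, then replace the original.

    `header` maps key -> (offset, length) for live entries; deleted or
    superseded keys have already been dropped from it. Simplified stand-in
    for the numpy record header used in the PR.
    """
    tmp_path = data_path + '-tmp'
    new_header, start = {}, 0
    with open(data_path, 'rb') as src, open(tmp_path, 'wb') as dst:
        for key, (offset, length) in header.items():
            src.seek(offset)
            dst.write(src.read(length))    # keep only entries still referenced
            new_header[key] = (start, length)
            start += length
    os.remove(data_path)                   # as in the PR: drop the original, then rename
    os.rename(tmp_path, data_path)
    return new_header
```

The benchmark above measures exactly this trade-off: with `delete_on_dump=True`, `save()` pays the rewrite cost once so the file stays at its original size; with `False`, saves stay cheap but the file grows with every update.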