test: fix benchmark tests #173

Merged · 1 commit · Sep 20, 2021

5 changes: 4 additions & 1 deletion .gitignore
@@ -134,4 +134,7 @@ shell/jina-wizard.sh

# Local model downloads
.cache
- assets
+ assets
+ benchmark.txt
+ dump.txt
+ import.txt
8 changes: 6 additions & 2 deletions jinahub/indexers/searcher/FaissSearcher/faiss_searcher.py
@@ -1,5 +1,5 @@
__copyright__ = "Copyright (c) 2021 Jina AI Limited. All rights reserved."
__license__ = "Apache-2.0"
__copyright__ = 'Copyright (c) 2021 Jina AI Limited. All rights reserved.'
__license__ = 'Apache-2.0'

import gzip
import os
@@ -629,6 +629,10 @@ def _add_delta(self, delta: GENERATOR_DELTA):
Adding the delta data to the indexer
:param delta: a generator yielding (id, np.ndarray, last_updated)
"""
+ if delta is None:
+     self.logger.warning('No data received in Faiss._add_delta. Skipping...')
+     return

for doc_id, vec_array, _ in delta:
idx = self._doc_id_to_offset.get(doc_id)
if idx is None: # add new item
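For reference, the delta consumed by _add_delta is just a generator of (id, np.ndarray, last_updated) tuples, and it may legitimately be None when the storage side has nothing to export, which is exactly what the new guard handles. A minimal sketch under those assumptions (the make_delta helper and its sample row are illustrative, not from this PR):

import datetime

import numpy as np


# Sketch of the delta contract: a generator of (id, np.ndarray, last_updated)
# tuples, or None when there is no data to export.
def make_delta(rows):
    if not rows:
        # mirrors the storage returning None when there is nothing to export
        return None
    return (
        (doc_id, np.asarray(vec, dtype=np.float32), updated)
        for doc_id, vec, updated in rows
    )


delta = make_delta([('doc-1', [0.1, 0.2, 0.3], datetime.datetime.utcnow())])
if delta is not None:
    for doc_id, vec, last_updated in delta:
        print(doc_id, vec.shape, last_updated)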
@@ -1,5 +1,5 @@
__copyright__ = "Copyright (c) 2021 Jina AI Limited. All rights reserved."
__license__ = "Apache-2.0"
__copyright__ = 'Copyright (c) 2021 Jina AI Limited. All rights reserved.'
__license__ = 'Apache-2.0'

import copy
import datetime
@@ -48,6 +48,9 @@ def __init__(
'total_shards is None, rolling update '
'via PSQL import will not be possible.'
)
+ else:
+     # shards is passed as a str from Flow.add in YAML
+     self.total_shards = int(self.total_shards)

# when constructed from rolling update
# args are passed via runtime_args
@@ -68,7 +71,7 @@ def _init_executors(self, dump_path, kwargs, startup_sync_args):
def _init_executors(self, dump_path, kwargs, startup_sync_args):
# float32 because that's what faiss expects
kv_indexer = PostgreSQLStorage(dump_dtype=np.float32, **kwargs)
- vec_indexer = FaissSearcher(dump_path=dump_path, **kwargs)
+ vec_indexer = FaissSearcher(dump_path=dump_path, prefetch_size=16, **kwargs)

if dump_path is None and startup_sync_args is None:
name = getattr(self.metas, 'name', self.__class__.__name__)
@@ -99,7 +102,9 @@ def _sync_snapshot(self, use_delta):
self._kv_indexer.get_snapshot, total_shards=self.total_shards
)
timestamp = self._kv_indexer.last_snapshot_timestamp
- self._vec_indexer = FaissSearcher(dump_func=dump_func, **self._init_kwargs)
+ self._vec_indexer = FaissSearcher(
+     dump_func=dump_func, prefetch_size=12, **self._init_kwargs
+ )

if use_delta:
self.logger.info(f'Now adding delta from timestamp {timestamp}')
@@ -135,9 +140,11 @@ def _sync_only_delta(self, parameters, **kwargs):
dump_func = functools.partial(
self._kv_indexer._get_delta,
total_shards=self.total_shards,
- timestamp=datetime.datetime.min,
+ timestamp=timestamp,
)
- self._vec_indexer = FaissSearcher(dump_func=dump_func, **self._init_kwargs)
+ self._vec_indexer = FaissSearcher(
+     dump_func=dump_func, prefetch_size=12, **self._init_kwargs
+ )
else:
self.logger.warning(
'Syncing via delta method. This cannot guarantee consistency'
@@ -220,3 +227,11 @@ def delete(self, docs: Optional[DocumentArray], parameters: Dict, **kwargs):
parameters['soft_delete'] = True

self._kv_indexer.delete(docs, parameters, **kwargs)

+ @requests(on='/dump')
+ def dump(self, parameters: Dict, **kwargs):
+     """Dump the index
+
+     :param parameters: a dictionary containing the parameters for the dump
+     """
+     self._kv_indexer.dump(parameters)
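The new /dump handler simply forwards its parameters to the underlying PostgreSQLStorage indexer. A hedged sketch of how a client could trigger it once a Flow is up; the executor name and the parameter keys ('dump_path', 'shards') are assumptions for illustration, not defined by this diff:

from jina import Flow

# Sketch only: assumes FaissPostgresSearcher is importable by name and that the
# storage-side dump accepts 'dump_path' and 'shards' parameters (both assumptions).
f = Flow().add(name='indexer', uses='FaissPostgresSearcher')
with f:
    f.post(on='/dump', parameters={'dump_path': '/tmp/dump', 'shards': 1})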
@@ -1,5 +1,5 @@
psycopg2-binary==2.8.6
- git+https://github.com/jina-ai/executors
faiss-cpu==1.6.5
git+https://github.com/jina-ai/jina-commons
- git+https://github.com/jina-ai/jina.git@v2.0.23#egg=jina[hub]
+ git+https://github.com/jina-ai/jina.git@v2.1.1#egg=jina[standard]
+ git+https://github.com/jina-ai/executors # only required to start. DO NOT add real tests here, but in top-level integration
@@ -0,0 +1,7 @@
# tests need to import from the Faiss module
# and thus require PYTHONPATH
# the other option would be installing via requirements
# but that would always be a different version
# only required for CI. DO NOT add real tests here, but in top-level integration
def test_dummy():
pass

This file was deleted.

33 changes: 21 additions & 12 deletions jinahub/indexers/storage/PostgreSQLStorage/postgres_indexer.py
@@ -1,5 +1,5 @@
__copyright__ = "Copyright (c) 2021 Jina AI Limited. All rights reserved."
__license__ = "Apache-2.0"
__copyright__ = 'Copyright (c) 2021 Jina AI Limited. All rights reserved.'
__license__ = 'Apache-2.0'

from typing import Dict, List

@@ -225,12 +225,16 @@ def get_snapshot(self, shard_id: int, total_shards: int):
to this shard id, out of X total shards, based on the virtual
shards allocated.
"""
- shards_to_get = self._vshards_to_get(
-     shard_id, total_shards, self.virtual_shards
- )
+ if self.snapshot_size > 0:
+     shards_to_get = self._vshards_to_get(
+         shard_id, total_shards, self.virtual_shards
+     )

- with self.handler as postgres_handler:
-     return postgres_handler.get_snapshot(shards_to_get)
+     with self.handler as postgres_handler:
+         return postgres_handler.get_snapshot(shards_to_get)
+ else:
+     self.logger.warning('No data in PSQL db snapshot. Nothing to export...')
+     return None

@staticmethod
def _vshards_to_get(shard_id, total_shards, virtual_shards):
@@ -258,12 +262,17 @@ def _get_delta(self, shard_id, total_shards, timestamp):
"""
Get the rows that have changed since the last timestamp, per shard
"""
- shards_to_get = self._vshards_to_get(
-     shard_id, total_shards, self.virtual_shards
- )
+ if self.size > 0:

- with self.handler as postgres_handler:
-     return postgres_handler._get_delta(shards_to_get, timestamp)
+     shards_to_get = self._vshards_to_get(
+         shard_id, total_shards, self.virtual_shards
+     )
+
+     with self.handler as postgres_handler:
+         return postgres_handler._get_delta(shards_to_get, timestamp)
+ else:
+     self.logger.warning('No data in PSQL to export with _get_delta...')
+     return None

@property
def last_snapshot_timestamp(self):
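Both guarded methods above select rows via _vshards_to_get, whose body is outside this diff. A hedged sketch of what such a round-robin split over virtual shards could look like, purely for illustration:

from typing import List


# Illustrative sketch only; the real _vshards_to_get is not shown in this diff
# and may allocate virtual shards differently.
def vshards_to_get(shard_id: int, total_shards: int, virtual_shards: List[int]) -> List[int]:
    if shard_id >= total_shards:
        raise ValueError('shard_id must be smaller than total_shards')
    # hand out virtual shards round-robin across the real shards
    return [v for i, v in enumerate(virtual_shards) if i % total_shards == shard_id]


# e.g. 128 virtual shards spread over 3 real shards
print(len(vshards_to_get(0, 3, list(range(128)))))  # -> 43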
Empty file.
29 changes: 29 additions & 0 deletions tests/integration/faiss_psql/test_208.py
@@ -0,0 +1,29 @@
import os

import pytest
from jina import Document, Flow

from jinahub.indexers.searcher.compound.FaissPostgresSearcher import (
FaissPostgresSearcher,
)

cur_dir = os.path.dirname(os.path.abspath(__file__))
compose_yml = os.path.join(cur_dir, 'docker-compose.yml')

# fixes issue #208 https://github.com/jina-ai/executors/issues/208
@pytest.mark.parametrize('docker_compose', [compose_yml], indirect=['docker_compose'])
def test_shards_str(docker_compose):
with Flow().load_config(
"""
jtype: Flow
executors:
- name: text_indexer
shards: 1
uses: FaissPostgresSearcher
uses_with:
startup_sync_args:
only_delta: True
total_shards: 1
"""
) as f:
f.search([Document() for _ in range(20)])
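The same Flow can be built with the Python API instead of the YAML string used in test_shards_str; a sketch only, and placing total_shards at the uses_with level (next to startup_sync_args) is an assumption inferred from the compound searcher's __init__ rather than something stated in this diff:

from jina import Document, Flow

from jinahub.indexers.searcher.compound.FaissPostgresSearcher import (
    FaissPostgresSearcher,
)

# Programmatic sketch of the YAML Flow above; the uses_with keys are assumptions.
f = Flow().add(
    name='text_indexer',
    shards=1,
    uses=FaissPostgresSearcher,
    uses_with={
        'startup_sync_args': {'only_delta': True},
        'total_shards': 1,
    },
)
with f:
    f.search([Document() for _ in range(20)])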
@@ -1,25 +1,14 @@
import os
- import time

import pytest
from jina import Document, Flow

- cur_dir = os.path.dirname(os.path.abspath(__file__))
- compose_yml = os.path.join(cur_dir, '..', 'docker-compose.yml')

from jinahub.indexers.searcher.compound.FaissPostgresSearcher import (
FaissPostgresSearcher,
)

- @pytest.fixture()
- def docker_compose(request):
-     os.system(
-         f"docker-compose -f {request.param} --project-directory . up --build -d "
-         f"--remove-orphans"
-     )
-     time.sleep(5)
-     yield
-     os.system(
-         f"docker-compose -f {request.param} --project-directory . down "
-         f"--remove-orphans"
-     )
+ cur_dir = os.path.dirname(os.path.abspath(__file__))
+ compose_yml = os.path.join(cur_dir, 'docker-compose.yml')


@pytest.mark.parametrize('docker_compose', [compose_yml], indirect=['docker_compose'])
2 changes: 1 addition & 1 deletion tests/integration/psql_dump_reload/flow_storage.yml
@@ -6,6 +6,6 @@ executors:
jtype: PostgreSQLStorage
metas:
workspace: $STORAGE_WORKSPACE
- name: psql
+ name: compound_indexer
shards: $SHARDS
polling: any
24 changes: 11 additions & 13 deletions tests/integration/psql_dump_reload/test_dump_psql.py
@@ -17,12 +17,14 @@
@pytest.fixture()
def docker_compose(request):
os.system(
f"docker-compose -f {request.param} --project-directory . up --build -d --remove-orphans"
f'docker-compose -f {request.param} --project-directory . up --build -d '
f'--remove-orphans'
)
time.sleep(5)
yield
os.system(
f"docker-compose -f {request.param} --project-directory . down --remove-orphans"
f'docker-compose -f {request.param} --project-directory . down '
f'--remove-orphans'
)
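The fixture above relies on pytest's indirect parametrization: the compose file listed in the parametrize decorator reaches the fixture as request.param. A self-contained sketch of that pattern; the names here are illustrative, not tied to these tests:

import pytest


@pytest.fixture()
def docker_compose(request):
    # request.param carries the value supplied via parametrize(..., indirect=...)
    print(f'would run: docker-compose -f {request.param} up')
    yield
    print(f'would run: docker-compose -f {request.param} down')


@pytest.mark.parametrize(
    'docker_compose', ['docker-compose.yml'], indirect=['docker_compose']
)
def test_example(docker_compose):
    assert True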


@@ -31,9 +33,6 @@ def docker_compose(request):
from jinahub.indexers.searcher.compound.FaissPostgresSearcher import (
FaissPostgresSearcher,
)
- from jinahub.indexers.storage.PostgreSQLStorage.postgres_indexer import (
-     PostgreSQLStorage,
- )

# noinspection PyUnresolvedReferences
from jinahub.indexers.storage.PostgreSQLStorage.postgreshandler import (
@@ -116,7 +115,8 @@ def assert_dump_data(dump_path, docs, shards, pea_id):
docs_expected = docs[(pea_id) * size_shard : (pea_id + 1) * size_shard]
print(f'### pea {pea_id} has {len(docs_expected)} docs')

- # TODO these might fail if we implement any ordering of elements on dumping / reloading
+ # TODO these might fail if we implement any ordering of elements on dumping /
+ # reloading
ids_dump = list(ids_dump)
vectors_dump = list(vectors_dump)
np.testing.assert_equal(set(ids_dump), set([d.id for d in docs_expected]))
@@ -231,18 +231,16 @@ def _in_docker():
return False


+ # benchmark only
+ @pytest.mark.skipif(
+     _in_docker() or ('GITHUB_WORKFLOW' in os.environ),
+     reason='skip the benchmark test on github workflow or docker',
+ )
@pytest.mark.parametrize('docker_compose', [compose_yml], indirect=['docker_compose'])
def test_benchmark(tmpdir, docker_compose):
+ nr_docs = 100000
- # benchmark only
- nr_docs = 1000000
- if _in_docker() or ('GITHUB_WORKFLOW' in os.environ):
-     nr_docs = 1000
return test_dump_reload(
tmpdir,
nr_docs=nr_docs,
emb_size=128,
emb_size=256,
shards=3,
docker_compose=compose_yml,
benchmark=True,