From ebe7ea818ec5f4fc3f06aa427a1db3e4ea539cb2 Mon Sep 17 00:00:00 2001
From: Cristian
Date: Mon, 20 Sep 2021 09:28:40 +0200
Subject: [PATCH] fix: psql various fixes

---
 .gitignore                                    |   5 +-
 .../searcher/FaissSearcher/faiss_searcher.py  |   8 +-
 .../FaissPostgresSearcher/faisspsql.py        |  27 ++-
 .../FaissPostgresSearcher/requirements.txt    |   4 +-
 .../tests/integration/test_dummy.py           |   7 +
 .../FaissPostgresSearcher/tests/test_dummy.py |   3 -
 .../PostgreSQLStorage/postgres_indexer.py     |  33 ++--
 tests/integration/faiss_psql/__init__.py      |   0
 .../faiss_psql}/docker-compose.yml            |   0
 tests/integration/faiss_psql/test_208.py      |  29 ++++
 .../faiss_psql}/test_flow_integration.py      |  21 +--
 .../psql_dump_reload/flow_storage.yml         |   2 +-
 .../psql_dump_reload/test_dump_psql.py        |  24 ++-
 .../psql_import/test_import_psql.py           | 154 +++++++++++-------
 tests/requirements.txt                        |   2 +-
 15 files changed, 199 insertions(+), 120 deletions(-)
 create mode 100644 jinahub/indexers/searcher/compound/FaissPostgresSearcher/tests/integration/test_dummy.py
 delete mode 100644 jinahub/indexers/searcher/compound/FaissPostgresSearcher/tests/test_dummy.py
 create mode 100644 tests/integration/faiss_psql/__init__.py
 rename {jinahub/indexers/searcher/compound/FaissPostgresSearcher/tests => tests/integration/faiss_psql}/docker-compose.yml (100%)
 create mode 100644 tests/integration/faiss_psql/test_208.py
 rename {jinahub/indexers/searcher/compound/FaissPostgresSearcher/tests/integration => tests/integration/faiss_psql}/test_flow_integration.py (52%)

diff --git a/.gitignore b/.gitignore
index 20c01ebf..887981ed 100644
--- a/.gitignore
+++ b/.gitignore
@@ -134,4 +134,7 @@ shell/jina-wizard.sh
 
 # Local model downloads
 .cache
-assets
\ No newline at end of file
+assets
+benchmark.txt
+dump.txt
+import.txt
diff --git a/jinahub/indexers/searcher/FaissSearcher/faiss_searcher.py b/jinahub/indexers/searcher/FaissSearcher/faiss_searcher.py
index 2bbb9064..b81ab779 100644
--- a/jinahub/indexers/searcher/FaissSearcher/faiss_searcher.py
+++ b/jinahub/indexers/searcher/FaissSearcher/faiss_searcher.py
@@ -1,5 +1,5 @@
-__copyright__ = "Copyright (c) 2021 Jina AI Limited. All rights reserved."
-__license__ = "Apache-2.0"
+__copyright__ = 'Copyright (c) 2021 Jina AI Limited. All rights reserved.'
+__license__ = 'Apache-2.0'
 
 import gzip
 import os
@@ -629,6 +629,10 @@ def _add_delta(self, delta: GENERATOR_DELTA):
         Adding the delta data to the indexer
         :param delta: a generator yielding (id, np.ndarray, last_updated)
         """
+        if delta is None:
+            self.logger.warning('No data received in Faiss._add_delta. Skipping...')
+            return
+
         for doc_id, vec_array, _ in delta:
             idx = self._doc_id_to_offset.get(doc_id)
             if idx is None:  # add new item
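
Note: the guard above matters because `_add_delta` consumes a generator of
`(doc_id, vector, last_updated)` tuples, and the PSQL side (further down in this
patch) now returns `None` instead of a generator when there is nothing to export.
A minimal sketch of that contract; `fake_delta` is a hypothetical stand-in for
the PSQL-backed dump function, for illustration only:

from datetime import datetime
from typing import Generator, Optional, Tuple

import numpy as np

# the tuple shape _add_delta iterates over: (doc_id, vector, last_updated)
DeltaItem = Tuple[str, np.ndarray, datetime]


def fake_delta(n: int, emb_size: int) -> Optional[Generator[DeltaItem, None, None]]:
    """Hypothetical delta source; returns None when there is nothing to sync."""
    if n == 0:
        return None  # mirrors the storage returning None for an empty table
    return (
        (str(i), np.random.random(emb_size).astype(np.float32), datetime.utcnow())
        for i in range(n)
    )
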
diff --git a/jinahub/indexers/searcher/compound/FaissPostgresSearcher/faisspsql.py b/jinahub/indexers/searcher/compound/FaissPostgresSearcher/faisspsql.py
index 1f3a5dd7..8dd5edb0 100644
--- a/jinahub/indexers/searcher/compound/FaissPostgresSearcher/faisspsql.py
+++ b/jinahub/indexers/searcher/compound/FaissPostgresSearcher/faisspsql.py
@@ -1,5 +1,5 @@
-__copyright__ = "Copyright (c) 2021 Jina AI Limited. All rights reserved."
-__license__ = "Apache-2.0"
+__copyright__ = 'Copyright (c) 2021 Jina AI Limited. All rights reserved.'
+__license__ = 'Apache-2.0'
 
 import copy
 import datetime
@@ -48,6 +48,9 @@ def __init__(
                 'total_shards is None, rolling update '
                 'via PSQL import will not be possible.'
             )
+        else:
+            # shards is passed as str from Flow.add in yaml
+            self.total_shards = int(self.total_shards)
 
         # when constructed from rolling update
         # args are passed via runtime_args
@@ -68,7 +71,7 @@ def __init__(
     def _init_executors(self, dump_path, kwargs, startup_sync_args):
         # float32 because that's what faiss expects
         kv_indexer = PostgreSQLStorage(dump_dtype=np.float32, **kwargs)
-        vec_indexer = FaissSearcher(dump_path=dump_path, **kwargs)
+        vec_indexer = FaissSearcher(dump_path=dump_path, prefetch_size=16, **kwargs)
 
         if dump_path is None and startup_sync_args is None:
             name = getattr(self.metas, 'name', self.__class__.__name__)
@@ -99,7 +102,9 @@ def _sync_snapshot(self, use_delta):
             self._kv_indexer.get_snapshot, total_shards=self.total_shards
         )
         timestamp = self._kv_indexer.last_snapshot_timestamp
-        self._vec_indexer = FaissSearcher(dump_func=dump_func, **self._init_kwargs)
+        self._vec_indexer = FaissSearcher(
+            dump_func=dump_func, prefetch_size=12, **self._init_kwargs
+        )
 
         if use_delta:
             self.logger.info(f'Now adding delta from timestamp {timestamp}')
@@ -135,9 +140,11 @@ def _sync_only_delta(self, parameters, **kwargs):
             dump_func = functools.partial(
                 self._kv_indexer._get_delta,
                 total_shards=self.total_shards,
-                timestamp=datetime.datetime.min,
+                timestamp=timestamp,
+            )
+            self._vec_indexer = FaissSearcher(
+                dump_func=dump_func, prefetch_size=12, **self._init_kwargs
             )
-            self._vec_indexer = FaissSearcher(dump_func=dump_func, **self._init_kwargs)
         else:
             self.logger.warning(
                 'Syncing via delta method. This cannot guarantee consistency'
@@ -220,3 +227,11 @@ def delete(self, docs: Optional[DocumentArray], parameters: Dict, **kwargs):
             parameters['soft_delete'] = True
 
         self._kv_indexer.delete(docs, parameters, **kwargs)
+
+    @requests(on='/dump')
+    def dump(self, parameters: Dict, **kwargs):
+        """Dump the index
+
+        :param parameters: a dictionary containing the parameters for the dump
+        """
+        self._kv_indexer.dump(parameters)
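
Note: the new `/dump` route exposes the storage's dump directly through the
compound executor. A usage sketch — the `dump_path` and `shards` parameter keys
are assumptions about what `PostgreSQLStorage.dump` reads from `parameters`,
not verified against its implementation:

from jina import Flow

from jinahub.indexers.searcher.compound.FaissPostgresSearcher import (
    FaissPostgresSearcher,
)

f = Flow().add(uses=FaissPostgresSearcher, uses_with={'total_shards': 1})

with f:
    # forwarded to PostgreSQLStorage.dump via the new @requests(on='/dump') route
    f.post(on='/dump', parameters={'dump_path': '/tmp/dump', 'shards': 1})
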
diff --git a/jinahub/indexers/searcher/compound/FaissPostgresSearcher/requirements.txt b/jinahub/indexers/searcher/compound/FaissPostgresSearcher/requirements.txt
index ee6edc5e..e0ebfa78 100644
--- a/jinahub/indexers/searcher/compound/FaissPostgresSearcher/requirements.txt
+++ b/jinahub/indexers/searcher/compound/FaissPostgresSearcher/requirements.txt
@@ -1,5 +1,5 @@
 psycopg2-binary==2.8.6
-git+https://github.com/jina-ai/executors
 faiss-cpu==1.6.5
 git+https://github.com/jina-ai/jina-commons
-git+https://github.com/jina-ai/jina.git@v2.0.23#egg=jina[hub]
+git+https://github.com/jina-ai/jina.git@v2.1.1#egg=jina[standard]
+git+https://github.com/jina-ai/executors # only required to start. DO NOT add real tests here, but in top-level integration
diff --git a/jinahub/indexers/searcher/compound/FaissPostgresSearcher/tests/integration/test_dummy.py b/jinahub/indexers/searcher/compound/FaissPostgresSearcher/tests/integration/test_dummy.py
new file mode 100644
index 00000000..513d57e7
--- /dev/null
+++ b/jinahub/indexers/searcher/compound/FaissPostgresSearcher/tests/integration/test_dummy.py
@@ -0,0 +1,7 @@
+# tests need to import from the Faiss module
+# and thus require PYTHONPATH
+# the other option would be installing via requirements
+# but that would always be a different version
+# only required for CI. DO NOT add real tests here, but in top-level integration
+def test_dummy():
+    pass
diff --git a/jinahub/indexers/searcher/compound/FaissPostgresSearcher/tests/test_dummy.py b/jinahub/indexers/searcher/compound/FaissPostgresSearcher/tests/test_dummy.py
deleted file mode 100644
index b11235a3..00000000
--- a/jinahub/indexers/searcher/compound/FaissPostgresSearcher/tests/test_dummy.py
+++ /dev/null
@@ -1,3 +0,0 @@
-def test_dummy():
-    # required by CI, since the `tests` folder exists
-    pass
diff --git a/jinahub/indexers/storage/PostgreSQLStorage/postgres_indexer.py b/jinahub/indexers/storage/PostgreSQLStorage/postgres_indexer.py
index 228af567..0f8495e3 100644
--- a/jinahub/indexers/storage/PostgreSQLStorage/postgres_indexer.py
+++ b/jinahub/indexers/storage/PostgreSQLStorage/postgres_indexer.py
@@ -1,5 +1,5 @@
-__copyright__ = "Copyright (c) 2021 Jina AI Limited. All rights reserved."
-__license__ = "Apache-2.0"
+__copyright__ = 'Copyright (c) 2021 Jina AI Limited. All rights reserved.'
+__license__ = 'Apache-2.0'
 
 from typing import Dict, List
 
@@ -225,12 +225,16 @@ def get_snapshot(self, shard_id: int, total_shards: int):
         to this shard id, out of X total shards, based on the
         virtual shards allocated.
         """
-        shards_to_get = self._vshards_to_get(
-            shard_id, total_shards, self.virtual_shards
-        )
+        if self.snapshot_size > 0:
+            shards_to_get = self._vshards_to_get(
+                shard_id, total_shards, self.virtual_shards
+            )
 
-        with self.handler as postgres_handler:
-            return postgres_handler.get_snapshot(shards_to_get)
+            with self.handler as postgres_handler:
+                return postgres_handler.get_snapshot(shards_to_get)
+        else:
+            self.logger.warning('No data in PSQL db snapshot. Nothing to export...')
+            return None
 
     @staticmethod
     def _vshards_to_get(shard_id, total_shards, virtual_shards):
@@ -258,12 +262,17 @@ def _get_delta(self, shard_id, total_shards, timestamp):
         """
         Get the rows that have changed since the last timestamp, per shard
         """
-        shards_to_get = self._vshards_to_get(
-            shard_id, total_shards, self.virtual_shards
-        )
+        if self.size > 0:
 
-        with self.handler as postgres_handler:
-            return postgres_handler._get_delta(shards_to_get, timestamp)
+            shards_to_get = self._vshards_to_get(
+                shard_id, total_shards, self.virtual_shards
+            )
+
+            with self.handler as postgres_handler:
+                return postgres_handler._get_delta(shards_to_get, timestamp)
+        else:
+            self.logger.warning('No data in PSQL to export with _get_delta...')
+            return None
 
     @property
     def last_snapshot_timestamp(self):
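
Note: both new guards delegate to `_vshards_to_get`, whose body is not part of
this diff. For orientation, one plausible shape of such an allocation — an even,
contiguous split of the virtual shard ids, with the last real shard absorbing
the remainder. This is an assumption for illustration, not the verified
implementation:

def vshards_to_get(shard_id: int, total_shards: int, virtual_shards: int) -> list:
    """Assign each real shard a contiguous slice of the virtual shard ids."""
    if shard_id >= total_shards:
        raise ValueError('shard_id must be smaller than total_shards')
    per_shard = virtual_shards // total_shards
    start = shard_id * per_shard
    # the last shard absorbs the remainder of the integer division
    end = virtual_shards if shard_id == total_shards - 1 else start + per_shard
    return list(range(start, end))
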
diff --git a/tests/integration/faiss_psql/__init__.py b/tests/integration/faiss_psql/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/jinahub/indexers/searcher/compound/FaissPostgresSearcher/tests/docker-compose.yml b/tests/integration/faiss_psql/docker-compose.yml
similarity index 100%
rename from jinahub/indexers/searcher/compound/FaissPostgresSearcher/tests/docker-compose.yml
rename to tests/integration/faiss_psql/docker-compose.yml
diff --git a/tests/integration/faiss_psql/test_208.py b/tests/integration/faiss_psql/test_208.py
new file mode 100644
index 00000000..b413be32
--- /dev/null
+++ b/tests/integration/faiss_psql/test_208.py
@@ -0,0 +1,29 @@
+import os
+
+import pytest
+from jina import Document, Flow
+
+from jinahub.indexers.searcher.compound.FaissPostgresSearcher import (
+    FaissPostgresSearcher,
+)
+
+cur_dir = os.path.dirname(os.path.abspath(__file__))
+compose_yml = os.path.join(cur_dir, 'docker-compose.yml')
+
+
+# fixes issue #208 https://github.com/jina-ai/executors/issues/208
+@pytest.mark.parametrize('docker_compose', [compose_yml], indirect=['docker_compose'])
+def test_shards_str(docker_compose):
+    with Flow().load_config(
+        """
+        jtype: Flow
+        executors:
+          - name: text_indexer
+            shards: 1
+            uses: FaissPostgresSearcher
+            uses_with:
+              startup_sync_args:
+                only_delta: True
+              total_shards: 1
+        """
+    ) as f:
+        f.search([Document() for _ in range(20)])
diff --git a/jinahub/indexers/searcher/compound/FaissPostgresSearcher/tests/integration/test_flow_integration.py b/tests/integration/faiss_psql/test_flow_integration.py
similarity index 52%
rename from jinahub/indexers/searcher/compound/FaissPostgresSearcher/tests/integration/test_flow_integration.py
rename to tests/integration/faiss_psql/test_flow_integration.py
index e8056b45..b658187e 100644
--- a/jinahub/indexers/searcher/compound/FaissPostgresSearcher/tests/integration/test_flow_integration.py
+++ b/tests/integration/faiss_psql/test_flow_integration.py
@@ -1,25 +1,14 @@
 import os
-import time
 
 import pytest
 from jina import Document, Flow
 
-cur_dir = os.path.dirname(os.path.abspath(__file__))
-compose_yml = os.path.join(cur_dir, '..', 'docker-compose.yml')
-
+from jinahub.indexers.searcher.compound.FaissPostgresSearcher import (
+    FaissPostgresSearcher,
+)
 
-@pytest.fixture()
-def docker_compose(request):
-    os.system(
-        f"docker-compose -f {request.param} --project-directory . up --build -d "
-        f"--remove-orphans"
-    )
-    time.sleep(5)
-    yield
-    os.system(
-        f"docker-compose -f {request.param} --project-directory . down "
-        f"--remove-orphans"
-    )
+cur_dir = os.path.dirname(os.path.abspath(__file__))
+compose_yml = os.path.join(cur_dir, 'docker-compose.yml')
 
 
 @pytest.mark.parametrize('docker_compose', [compose_yml], indirect=['docker_compose'])
diff --git a/tests/integration/psql_dump_reload/flow_storage.yml b/tests/integration/psql_dump_reload/flow_storage.yml
index 66c245ab..08303475 100644
--- a/tests/integration/psql_dump_reload/flow_storage.yml
+++ b/tests/integration/psql_dump_reload/flow_storage.yml
@@ -6,6 +6,6 @@ executors:
       jtype: PostgreSQLStorage
       metas:
         workspace: $STORAGE_WORKSPACE
-        name: psql
+        name: compound_indexer
     shards: $SHARDS
     polling: any
diff --git a/tests/integration/psql_dump_reload/test_dump_psql.py b/tests/integration/psql_dump_reload/test_dump_psql.py
index fa42e3b7..15cbe4de 100644
--- a/tests/integration/psql_dump_reload/test_dump_psql.py
+++ b/tests/integration/psql_dump_reload/test_dump_psql.py
@@ -17,12 +17,14 @@
 @pytest.fixture()
 def docker_compose(request):
     os.system(
-        f"docker-compose -f {request.param} --project-directory . up --build -d --remove-orphans"
+        f'docker-compose -f {request.param} --project-directory . up --build -d '
+        f'--remove-orphans'
     )
     time.sleep(5)
     yield
     os.system(
-        f"docker-compose -f {request.param} --project-directory . down --remove-orphans"
+        f'docker-compose -f {request.param} --project-directory . down '
+        f'--remove-orphans'
     )
 
 
@@ -31,9 +33,6 @@
 from jinahub.indexers.searcher.compound.FaissPostgresSearcher import (
     FaissPostgresSearcher,
 )
-from jinahub.indexers.storage.PostgreSQLStorage.postgres_indexer import (
-    PostgreSQLStorage,
-)
 
 # noinspection PyUnresolvedReferences
 from jinahub.indexers.storage.PostgreSQLStorage.postgreshandler import (
@@ -116,7 +115,8 @@ def assert_dump_data(dump_path, docs, shards, pea_id):
     docs_expected = docs[(pea_id) * size_shard : (pea_id + 1) * size_shard]
     print(f'### pea {pea_id} has {len(docs_expected)} docs')
 
-    # TODO these might fail if we implement any ordering of elements on dumping / reloading
+    # TODO these might fail if we implement any ordering of elements on dumping /
+    # reloading
     ids_dump = list(ids_dump)
     vectors_dump = list(vectors_dump)
     np.testing.assert_equal(set(ids_dump), set([d.id for d in docs_expected]))
@@ -231,18 +231,16 @@ def _in_docker():
     return False
 
 
-# benchmark only
-@pytest.mark.skipif(
-    _in_docker() or ('GITHUB_WORKFLOW' in os.environ),
-    reason='skip the benchmark test on github workflow or docker',
-)
 @pytest.mark.parametrize('docker_compose', [compose_yml], indirect=['docker_compose'])
 def test_benchmark(tmpdir, docker_compose):
-    nr_docs = 100000
+    # benchmark only
+    nr_docs = 1000000
+    if _in_docker() or ('GITHUB_WORKFLOW' in os.environ):
+        nr_docs = 1000
     return test_dump_reload(
         tmpdir,
         nr_docs=nr_docs,
-        emb_size=128,
+        emb_size=256,
         shards=3,
         docker_compose=compose_yml,
         benchmark=True,
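
Note: the `docker_compose` fixture above is now duplicated verbatim in
`test_import_psql.py` below. It could live once in a shared `conftest.py`
(hypothetical location; not part of this patch) and still work with the
indirect parametrization used by all these tests:

# tests/integration/conftest.py (hypothetical)
import os
import time

import pytest


@pytest.fixture()
def docker_compose(request):
    # bring up the PSQL container defined in the parametrized compose file
    os.system(
        f'docker-compose -f {request.param} --project-directory . up --build -d '
        f'--remove-orphans'
    )
    time.sleep(5)
    yield
    os.system(
        f'docker-compose -f {request.param} --project-directory . down '
        f'--remove-orphans'
    )
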
diff --git a/tests/integration/psql_import/test_import_psql.py b/tests/integration/psql_import/test_import_psql.py
index f9160791..1cfef043 100644
--- a/tests/integration/psql_import/test_import_psql.py
+++ b/tests/integration/psql_import/test_import_psql.py
@@ -1,4 +1,5 @@
 import collections
+import datetime
 import os
 import time
 from collections import OrderedDict
@@ -24,17 +25,17 @@
 METRIC = 'l2'
 
 
-def _flow(uses_after, total_shards, startup_args):
+def _flow(uses_after, total_shards, startup_args, polling, replicas=1, name='indexer'):
     return Flow().add(
-        name='indexer',
+        name=name,
         uses=FaissPostgresSearcher,
         uses_with={
             'startup_sync_args': startup_args,
         },
-        uses_metas={'name': 'compound_indexer'},
+        uses_metas={'name': name},
         parallel=total_shards,
-        replicas=1,
-        polling='all',
+        replicas=replicas,
+        polling=polling,
         uses_after=uses_after,
     )
 
@@ -42,20 +43,19 @@
 @pytest.fixture()
 def docker_compose(request):
     os.system(
-        f"docker-compose -f {request.param} --project-directory . up --build -d "
-        f"--remove-orphans"
+        f'docker-compose -f {request.param} --project-directory . up --build -d '
+        f'--remove-orphans'
     )
     time.sleep(5)
     yield
     os.system(
-        f"docker-compose -f {request.param} --project-directory . down "
-        f"--remove-orphans"
+        f'docker-compose -f {request.param} --project-directory . down '
+        f'--remove-orphans'
     )
 
 
 cur_dir = os.path.dirname(os.path.abspath(__file__))
 compose_yml = os.path.join(cur_dir, 'docker-compose.yml')
-flow_yml = os.path.join(cur_dir, 'flow.yml')
 
 
 class Pass(Executor):
@@ -162,16 +162,18 @@ def flatten(it):
 
 
 # replicas w 1 shard doesn't work
+# TODO /sync doesn't work with replicas
 @pytest.mark.parametrize('shards', [1, 3, 7])
 @pytest.mark.parametrize('nr_docs', [100])
 @pytest.mark.parametrize('emb_size', [10])
+@pytest.mark.parametrize('replicas', [1])
 @pytest.mark.parametrize('docker_compose', [compose_yml], indirect=['docker_compose'])
 def test_psql_import(
-    tmpdir, nr_docs, emb_size, shards, docker_compose, benchmark=False
+    tmpdir, nr_docs, emb_size, shards, replicas, docker_compose, benchmark=False
 ):
     # for psql to start
     time.sleep(2)
-    top_k = 5
+    top_k = 50
     batch_size = min(1000, nr_docs)
     docs = get_batch_iterator(
         batches=nr_docs // batch_size, batch_size=batch_size, emb_size=emb_size
@@ -181,52 +183,69 @@ def test_psql_import(
     os.environ['STORAGE_WORKSPACE'] = os.path.join(str(tmpdir), 'index_ws')
     os.environ['SHARDS'] = str(shards)
 
-    if shards > 1:
-        uses_after = 'MatchMerger'
-    else:
-        uses_after = 'Pass'
 
     # we only need one Flow
-    with _flow(uses_after=uses_after, total_shards=shards, startup_args={}) as flow:
-        # necessary since PSQL instance might not have shutdown properly between tests
-        if not benchmark:
-            flow.post(on='/delete', inputs=docs)
+    # but we make two because
+    # of polling
+    storage_shards = shards
+    if benchmark:
+        storage_shards //= 2
 
-        with TimeContext(f'### indexing {nr_docs} docs'):
-            flow.post(on='/index', inputs=docs)
+    with _flow(
+        uses_after='Pass',
+        total_shards=storage_shards,
+        startup_args={},
+        polling='any',
+        name='indexer_storage',
+    ) as flow_storage:
+        with _flow(
+            uses_after='MatchMerger',
+            total_shards=shards,
+            startup_args={},
+            polling='all',
+            replicas=replicas,
+            name='indexer_query',
+        ) as flow_query:
+            # necessary since the PSQL instance might not have shut down properly
+            # between tests
+            if not benchmark:
+                flow_storage.post(on='/delete', inputs=docs)
+
+            with TimeContext(f'### indexing {nr_docs} docs'):
+                flow_storage.post(on='/index', inputs=docs)
+
+            results = flow_query.post(
+                on='/search',
+                inputs=get_documents(nr=1, index_start=0, emb_size=emb_size),
+                return_results=True,
+            )
+            assert len(results[0].docs[0].matches) == 0
 
-        results = flow.post(
-            on='/search',
-            inputs=get_documents(nr=1, index_start=0, emb_size=emb_size),
-            return_results=True,
-        )
-        assert len(results[0].docs[0].matches) == 0
+            with TimeContext(f'### snapshotting {nr_docs} docs'):
+                flow_storage.post(
+                    on='/snapshot',
+                )
 
-        with TimeContext(f'### snapshotting {nr_docs} docs'):
-            flow.post(
-                on='/snapshot',
-            )
+            with TimeContext(f'### importing {nr_docs} docs'):
+                flow_query.post(
+                    on='/sync',
+                )
 
-        with TimeContext(f'### importing {nr_docs} docs'):
-            flow.post(
-                on='/sync',
+            params = {'top_k': nr_docs}
+            if benchmark:
+                params = {'top_k': top_k}
+            results = flow_query.post(
+                on='/search',
+                inputs=get_documents(nr=3, index_start=0, emb_size=emb_size),
+                parameters=params,
+                return_results=True,
             )
-
-        params = {'top_k': nr_docs}
-        if benchmark:
-            params = {'top_k': top_k}
-        results = flow.post(
-            on='/search',
-            inputs=get_documents(nr=3, index_start=0, emb_size=emb_size),
-            parameters=params,
-            return_results=True,
-        )
-        if benchmark:
-            assert len(results[0].docs[0].matches) == top_k
-        else:
-            assert len(results[0].docs[0].matches) == nr_docs
-        # TODO score is not deterministic
-        assert results[0].docs[0].matches[0].scores[METRIC].value > 0.0
+            if benchmark:
+                assert len(results[0].docs[0].matches) == top_k
+            else:
+                assert len(results[0].docs[0].matches) == nr_docs
+            # TODO score is not deterministic
+            assert results[0].docs[0].matches[0].scores[METRIC].value > 0.0
 
     idx = PostgreSQLStorage()
     assert idx.size == nr_docs
@@ -241,19 +260,21 @@ def _in_docker():
     return False
 
 
-# benchmark only
-@pytest.mark.skipif(
-    _in_docker() or ('GITHUB_WORKFLOW' in os.environ),
-    reason='skip the benchmark test on github workflow or docker',
-)
 @pytest.mark.parametrize('docker_compose', [compose_yml], indirect=['docker_compose'])
 def test_benchmark(tmpdir, docker_compose):
-    nr_docs = 100000
+    # benchmark only
+    nr_docs = 1000000
+    if _in_docker() or ('GITHUB_WORKFLOW' in os.environ):
+        nr_docs = 1000
     return test_psql_import(
         tmpdir,
         nr_docs=nr_docs,
-        emb_size=128,
-        shards=3,
+        emb_size=256,
+        shards=2 * 2,  # to make up for replicas, for a fair comparison
+        # TODO make sync work with replicas.
+        # Replicas have polling `any`
+        # and sync needs `all`
+        replicas=1,
         docker_compose=compose_yml,
         benchmark=True,
     )
@@ -262,9 +283,11 @@
 @pytest.mark.parametrize('docker_compose', [compose_yml], indirect=['docker_compose'])
 def test_start_up(docker_compose):
     docs = list(get_batch_iterator(batches=10, batch_size=10, emb_size=7))
-    shards = 3
+    shards = 1
 
-    with _flow(uses_after='MatchMerger', total_shards=shards, startup_args={}) as flow:
+    with _flow(
+        uses_after='MatchMerger', total_shards=shards, startup_args={}, polling='any'
+    ) as flow:
         flow.post(on='/index', inputs=docs)
 
     # here we show how you can avoid having to do a snapshot
@@ -273,7 +296,10 @@ def test_start_up(docker_compose):
     # and a traditional load
     # instead sync from delta
    # WARNING: this cannot guarantee consistency if you do
     # any writes to the PSQL while the shards are loading
     with _flow(
-        uses_after='MatchMerger', total_shards=shards, startup_args={'only_delta': True}
+        uses_after='MatchMerger',
+        total_shards=shards,
+        startup_args={'only_delta': True},
+        polling='all',
     ) as flow:
         results = flow.post(
             on='/search',
@@ -310,7 +336,9 @@ def test_psql_sync_delta(
     os.environ['SHARDS'] = str(shards)
     uses_after = 'Pass'
 
-    with _flow(uses_after=uses_after, total_shards=shards, startup_args={}) as flow:
+    with _flow(
+        uses_after=uses_after, total_shards=shards, startup_args={}, polling='all'
+    ) as flow:
         flow.post(on='/index', inputs=docs_original)
 
     # we first sync by snapshot and then by delta
diff --git a/tests/requirements.txt b/tests/requirements.txt
index 7e3eeb3a..5b061848 100644
--- a/tests/requirements.txt
+++ b/tests/requirements.txt
@@ -1,4 +1,4 @@
-git+https://github.com/jina-ai/jina.git@v2.0.21#egg=jina[standard]
+git+https://github.com/jina-ai/jina.git@v2.1.1#egg=jina[standard]
 psycopg2-binary==2.8.6
 jina_commons @ git+https://github.com/jina-ai/jina-commons@v0.0.5
 lmdb==1.2.1
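
Note: taken together, the workflow these tests exercise — index into PSQL,
snapshot, then sync the snapshot (plus delta) into Faiss before searching —
condenses to roughly the following single-Flow sketch, assuming a reachable
PSQL instance with default connection settings:

import numpy as np
from jina import Document, Flow

from jinahub.indexers.searcher.compound.FaissPostgresSearcher import (
    FaissPostgresSearcher,
)

docs = [
    Document(embedding=np.random.random(10).astype(np.float32)) for _ in range(100)
]

f = Flow().add(uses=FaissPostgresSearcher, uses_with={'total_shards': 1})

with f:
    f.post(on='/index', inputs=docs)  # write-through into PSQL
    f.post(on='/snapshot')            # freeze a consistent view in PSQL
    f.post(on='/sync')                # load the snapshot (+ delta) into Faiss
    results = f.post(on='/search', inputs=docs[:1], return_results=True)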