Skip to content

Commit

Permalink
refactor(dump): type annotations and refactor (#2310)
Browse files Browse the repository at this point in the history
  • Loading branch information
cristianmtr committed Apr 15, 2021
1 parent 8403440 commit 523747e
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 17 deletions.
6 changes: 4 additions & 2 deletions jina/executors/indexers/dbms/keyvalue.py
@@ -1,5 +1,5 @@
import pickle
from typing import List
from typing import List, Tuple, Generator
import numpy as np

from jina.executors.indexers.dump import export_dump_streaming
Expand All @@ -10,7 +10,9 @@
class BinaryPbDBMSIndexer(BinaryPbWriterMixin, BaseDBMSIndexer):
"""A DBMS Indexer (no query method)"""

def _get_generator(self, ids: List[str]):
def _get_generator(
self, ids: List[str]
) -> Generator[Tuple[str, np.array, bytes], None, None]:
for id_ in ids:
vecs_metas_bytes = super()._query(id_)
vec, meta = pickle.loads(vecs_metas_bytes)
Expand Down
47 changes: 32 additions & 15 deletions jina/executors/indexers/dump.py
@@ -1,6 +1,6 @@
import os
import sys
from typing import Iterable, Tuple
from typing import Tuple, Generator, BinaryIO, TextIO

import numpy as np

Expand All @@ -11,7 +11,7 @@ def export_dump_streaming(
path: str,
shards: int,
size: int,
data: Iterable[Tuple[str, np.array, bytes]],
data: Generator[Tuple[str, np.array, bytes], None, None],
):
"""Export the data to a path, based on sharding,
Expand All @@ -23,7 +23,12 @@ def export_dump_streaming(
_handle_dump(data, path, shards, size)


def _handle_dump(data, path, shards, size):
def _handle_dump(
data: Generator[Tuple[str, np.array, bytes], None, None],
path: str,
shards: int,
size: int,
):
if os.path.exists(path):
raise Exception(f'path for dump {path} already exists. Not dumping...')
size_per_shard = size // shards
Expand All @@ -37,7 +42,12 @@ def _handle_dump(data, path, shards, size):
_write_shard_data(data, path, shard_id, size_this_shard)


def _write_shard_data(data, path, shard_id, size_this_shard):
def _write_shard_data(
data: Generator[Tuple[str, np.array, bytes], None, None],
path: str,
shard_id: int,
size_this_shard: int,
):
shard_path = os.path.join(path, str(shard_id))
shard_docs_written = 0
os.makedirs(shard_path)
Expand All @@ -46,16 +56,23 @@ def _write_shard_data(data, path, shard_id, size_this_shard):
ids_fp, 'w'
) as ids_fh:
while shard_docs_written < size_this_shard:
id_, vec, meta = next(data)
vec_bytes = vec.tobytes()
vectors_fh.write(
len(vec_bytes).to_bytes(BYTE_PADDING, sys.byteorder) + vec_bytes
)
metas_fh.write(len(meta).to_bytes(BYTE_PADDING, sys.byteorder) + meta)
ids_fh.write(id_ + '\n')
_write_shard_files(data, ids_fh, metas_fh, vectors_fh)
shard_docs_written += 1


def _write_shard_files(
    data: Generator[Tuple[str, np.array, bytes], None, None],
    ids_fh: TextIO,
    metas_fh: BinaryIO,
    vectors_fh: BinaryIO,
):
    """Consume one (id, vector, metadata) triple from *data* and append it to the shard files.

    The vector and the metadata are written length-prefixed: ``BYTE_PADDING``
    bytes (platform byte order) encoding the payload length, followed by the
    payload itself. The id is written as one text line.
    """
    doc_id, vector, metadata = next(data)
    serialized_vec = vector.tobytes()
    # length prefix lets readers recover variable-sized records from the stream
    vec_prefix = len(serialized_vec).to_bytes(BYTE_PADDING, sys.byteorder)
    vectors_fh.write(vec_prefix + serialized_vec)
    meta_prefix = len(metadata).to_bytes(BYTE_PADDING, sys.byteorder)
    metas_fh.write(meta_prefix + metadata)
    ids_fh.write(doc_id + '\n')


def import_vectors(path: str, pea_id: str):
"""Import id and vectors
Expand All @@ -82,13 +99,13 @@ def import_metas(path: str, pea_id: str):
return ids_gen, metas_gen


def _ids_gen(path):
def _ids_gen(path: str):
with open(os.path.join(path, 'ids'), 'r') as ids_fh:
for l in ids_fh:
yield l.strip()


def _vecs_gen(path):
def _vecs_gen(path: str):
with open(os.path.join(path, 'vectors'), 'rb') as vectors_fh:
while True:
next_size = vectors_fh.read(BYTE_PADDING)
Expand All @@ -103,7 +120,7 @@ def _vecs_gen(path):
break


def _metas_gen(path):
def _metas_gen(path: str):
with open(os.path.join(path, 'metas'), 'rb') as metas_fh:
while True:
next_size = metas_fh.read(BYTE_PADDING)
Expand All @@ -115,7 +132,7 @@ def _metas_gen(path):
break


def _get_file_paths(shard_path):
def _get_file_paths(shard_path: str):
vectors_fp = os.path.join(shard_path, 'vectors')
metas_fp = os.path.join(shard_path, 'metas')
ids_fp = os.path.join(shard_path, 'ids')
Expand Down

0 comments on commit 523747e

Please sign in to comment.