Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs: type annotation #2049

Merged
merged 12 commits into from
Mar 9, 2021
64 changes: 40 additions & 24 deletions jina/clients/sugary_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@
import json
import os
import random
from typing import List, Union, Iterator, Any, Iterable, Dict
from typing import List, Union, Iterator, Iterable, Dict, TextIO, Generator

import numpy as np

if False:
from jina import Document

# https://github.com/ndjson/ndjson.github.io/issues/1#issuecomment-109935996
_jsonl_ext = {'.jsonlines', '.ndjson', '.jsonl', '.jl', '.ldjson'}
_csv_ext = {'.csv', '.tcsv'}
Expand All @@ -23,54 +26,62 @@ def _sample(iterable, sampling_rate: float = None):
yield i


def _subsample(iterable, sampling_rate: float = None, size: int = None, **kwargs):
def _subsample(iterable, size: int = None, sampling_rate: float = None):
yield from it.islice(_sample(iterable, sampling_rate), size)


def _input_lines(
lines: Iterable[str] = None,
filepath: str = None,
florian-hoenicke marked this conversation as resolved.
Show resolved Hide resolved
read_mode: str = 'r',
line_format: str = 'json',
**kwargs,
) -> Iterator[Union[str, bytes]]:
"""Create a generator from either an Iterable of lines, or a file.
field_resolver: Dict[str, str] = None,
size: int = None,
sampling_rate: float = None,
) -> Generator[Union[str, 'Document'], None, None]:
"""Generator function for lines, json and sc. Yields documents or strings.

:param lines: a list of strings, each is considered as a document
:param filepath: a text file in which each line contains a document
:param read_mode: specifies the mode in which the file
is opened. 'r' for reading in text mode, 'rb' for reading in binary
:param line_format: the format of each line ``json`` or ``csv``
:param kwargs: additional key word arguments
:yield: data from input lines
:param field_resolver: a map from field names defined in ``document`` (JSON, dict) to the field
names defined in Protobuf. This is only used when the given ``document`` is
a JSON string or a Python dict.
:param size: the maximum number of the documents
:param sampling_rate: the sampling rate between [0, 1]
:yields: documents

.. note::
This function should not be directly used, use :meth:`Flow.index_lines`, :meth:`Flow.search_lines` instead
This function should not be directly used, use :meth:`Flow.index_lines`, :meth:`Flow.search_lines` instead
"""
if filepath:
file_type = os.path.splitext(filepath)[1]
with open(filepath, read_mode) as f:
with open(filepath, 'r') as f:
if file_type in _jsonl_ext:
yield from _input_ndjson(f, **kwargs)
florian-hoenicke marked this conversation as resolved.
Show resolved Hide resolved
yield from _input_ndjson(f)
elif file_type in _csv_ext:
yield from _input_csv(f, **kwargs)
yield from _input_csv(f, field_resolver, size, sampling_rate)
else:
yield from _subsample(f, **kwargs)
yield from _subsample(f, size, sampling_rate)
elif lines:
if line_format == 'json':
yield from _input_ndjson(lines, **kwargs)
yield from _input_ndjson(lines)
elif line_format == 'csv':
yield from _input_csv(lines, **kwargs)
yield from _input_csv(lines, field_resolver, size, sampling_rate)
else:
yield from _subsample(lines, **kwargs)
yield from _subsample(lines, size, sampling_rate)
else:
raise ValueError('"filepath" and "lines" can not be both empty')


def _input_ndjson(fp: Iterable[str], field_resolver: Dict[str, str] = None, **kwargs):
def _input_ndjson(
fp: Iterable[str],
field_resolver: Dict[str, str] = None,
florian-hoenicke marked this conversation as resolved.
Show resolved Hide resolved
size: int = None,
sampling_rate: float = None,
):
from jina import Document

for line in _subsample(fp, **kwargs):
for line in _subsample(fp, size, sampling_rate):
value = json.loads(line)
if 'groundtruth' in value and 'document' in value:
yield Document(value['document'], field_resolver), Document(
Expand All @@ -80,11 +91,16 @@ def _input_ndjson(fp: Iterable[str], field_resolver: Dict[str, str] = None, **kw
yield Document(value, field_resolver)


def _input_csv(fp: Iterable[str], field_resolver: Dict[str, str] = None, **kwargs):
def _input_csv(
fp: Iterable[str],
field_resolver: Dict[str, str] = None,
size: int = None,
sampling_rate: float = None,
):
from jina import Document

lines = csv.DictReader(fp)
for value in _subsample(lines, **kwargs):
for value in _subsample(lines, size, sampling_rate):
if 'groundtruth' in value and 'document' in value:
yield Document(value['document'], field_resolver), Document(
value['groundtruth'], field_resolver
Expand All @@ -110,7 +126,7 @@ def _input_files(
:param read_mode: specifies the mode in which the file is opened.
'r' for reading in text mode, 'rb' for reading in binary mode.
If `read_mode` is None, will iterate over filenames.
:yield: file paths or content
:yield: file paths or binary content

.. note::
This function should not be directly used, use :meth:`Flow.index_files`, :meth:`Flow.search_files` instead
Expand Down Expand Up @@ -138,7 +154,7 @@ def _iter_file_exts(ps):

def _input_ndarray(
array: 'np.ndarray', axis: int = 0, size: int = None, shuffle: bool = False
) -> Iterator[Any]:
) -> Generator['np.ndarray', None, None]:
"""Create a generator for a given dimension of a numpy array.

:param array: the numpy ndarray data source
Expand Down
Empty file.