Skip to content

Commit

Permalink
Merge 90386e9 into 7dd876d
Browse files Browse the repository at this point in the history
  • Loading branch information
florian-hoenicke committed Mar 7, 2021
2 parents 7dd876d + 90386e9 commit 8fe4638
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 28 deletions.
80 changes: 52 additions & 28 deletions jina/clients/sugary_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,69 +8,85 @@
import json
import os
import random
from typing import List, Union, Iterator, Any, Iterable, Dict
from typing import List, Union, Iterator, Iterable, Dict, Generator, Optional

import numpy as np

if False:
from jina import Document

# https://github.com/ndjson/ndjson.github.io/issues/1#issuecomment-109935996
_jsonl_ext = {'.jsonlines', '.ndjson', '.jsonl', '.jl', '.ldjson'}
_csv_ext = {'.csv', '.tcsv'}


def _sample(iterable, sampling_rate: float = None):
def _sample(iterable, sampling_rate: Optional[float] = None):
for i in iterable:
if sampling_rate is None or random.random() < sampling_rate:
yield i


def _subsample(iterable, sampling_rate: float = None, size: int = None, **kwargs):
def _subsample(
iterable, size: Optional[int] = None, sampling_rate: Optional[float] = None
):
yield from it.islice(_sample(iterable, sampling_rate), size)


def _input_lines(
lines: Iterable[str] = None,
filepath: str = None,
lines: Optional[Iterable[str]] = None,
filepath: Optional[str] = None,
read_mode: str = 'r',
line_format: str = 'json',
**kwargs,
) -> Iterator[Union[str, bytes]]:
"""Create a generator from either an Iterable of lines, or a file.
field_resolver: Optional[Dict[str, str]] = None,
size: Optional[int] = None,
sampling_rate: Optional[float] = None,
) -> Generator[Union[str, 'Document'], None, None]:
"""Generator function for lines, json and sc. Yields documents or strings.
:param lines: a list of strings, each is considered as a document
:param filepath: a text file that each line contains a document
:param read_mode: specifies the mode in which the file
is opened. 'r' for reading in text mode, 'rb' for reading in binary
is opened. 'r' for reading in text mode, 'rb' for reading in binary
:param line_format: the format of each line ``json`` or ``csv``
:param kwargs: additional key word arguments
:yield: data from input lines
:param field_resolver: a map from field names defined in ``document`` (JSON, dict) to the field
names defined in Protobuf. This is only used when the given ``document`` is
a JSON string or a Python dict.
:param size: the maximum number of the documents
:param sampling_rate: the sampling rate between [0, 1]
:yields: documents
.. note::
This function should not be directly used, use :meth:`Flow.index_lines`, :meth:`Flow.search_lines` instead
This function should not be directly used, use :meth:`Flow.index_files`, :meth:`Flow.search_files` instead
"""
if filepath:
file_type = os.path.splitext(filepath)[1]
with open(filepath, read_mode) as f:
if file_type in _jsonl_ext:
yield from _input_ndjson(f, **kwargs)
yield from _input_ndjson(f)
elif file_type in _csv_ext:
yield from _input_csv(f, **kwargs)
yield from _input_csv(f, field_resolver, size, sampling_rate)
else:
yield from _subsample(f, **kwargs)
yield from _subsample(f, size, sampling_rate)
elif lines:
if line_format == 'json':
yield from _input_ndjson(lines, **kwargs)
yield from _input_ndjson(lines)
elif line_format == 'csv':
yield from _input_csv(lines, **kwargs)
yield from _input_csv(lines, field_resolver, size, sampling_rate)
else:
yield from _subsample(lines, **kwargs)
yield from _subsample(lines, size, sampling_rate)
else:
raise ValueError('"filepath" and "lines" can not be both empty')


def _input_ndjson(fp: Iterable[str], field_resolver: Dict[str, str] = None, **kwargs):
def _input_ndjson(
fp: Iterable[str],
field_resolver: Optional[Dict[str, str]] = None,
size: Optional[int] = None,
sampling_rate: Optional[float] = None,
):
from jina import Document

for line in _subsample(fp, **kwargs):
for line in _subsample(fp, size, sampling_rate):
value = json.loads(line)
if 'groundtruth' in value and 'document' in value:
yield Document(value['document'], field_resolver), Document(
Expand All @@ -80,11 +96,16 @@ def _input_ndjson(fp: Iterable[str], field_resolver: Dict[str, str] = None, **kw
yield Document(value, field_resolver)


def _input_csv(fp: Iterable[str], field_resolver: Dict[str, str] = None, **kwargs):
def _input_csv(
fp: Iterable[str],
field_resolver: Optional[Dict[str, str]] = None,
size: Optional[int] = None,
sampling_rate: Optional[float] = None,
):
from jina import Document

lines = csv.DictReader(fp)
for value in _subsample(lines, **kwargs):
for value in _subsample(lines, size, sampling_rate):
if 'groundtruth' in value and 'document' in value:
yield Document(value['document'], field_resolver), Document(
value['groundtruth'], field_resolver
Expand All @@ -96,9 +117,9 @@ def _input_csv(fp: Iterable[str], field_resolver: Dict[str, str] = None, **kwarg
def _input_files(
patterns: Union[str, List[str]],
recursive: bool = True,
size: int = None,
sampling_rate: float = None,
read_mode: str = None,
size: Optional[int] = None,
sampling_rate: Optional[float] = None,
read_mode: Optional[str] = None,
) -> Iterator[Union[str, bytes]]:
"""Creates an iterator over a list of file path or the content of the files.
Expand All @@ -110,7 +131,7 @@ def _input_files(
:param read_mode: specifies the mode in which the file is opened.
'r' for reading in text mode, 'rb' for reading in binary mode.
If `read_mode` is None, will iterate over filenames.
:yield: file paths or content
:yield: file paths or binary content
.. note::
This function should not be directly used, use :meth:`Flow.index_files`, :meth:`Flow.search_files` instead
Expand All @@ -137,8 +158,11 @@ def _iter_file_exts(ps):


def _input_ndarray(
array: 'np.ndarray', axis: int = 0, size: int = None, shuffle: bool = False
) -> Iterator[Any]:
array: 'np.ndarray',
axis: int = 0,
size: Optional[int] = None,
shuffle: bool = False,
) -> Generator['np.ndarray', None, None]:
"""Create a generator for a given dimension of a numpy array.
:param array: the numpy ndarray data source
Expand Down
Empty file.

0 comments on commit 8fe4638

Please sign in to comment.