Skip to content

Commit

Permalink
Merge f9b57f5 into 1f4ded9
Browse files Browse the repository at this point in the history
  • Loading branch information
Yongxuanzhang committed Feb 24, 2021
2 parents 1f4ded9 + f9b57f5 commit ac7f7de
Show file tree
Hide file tree
Showing 15 changed files with 218 additions and 37 deletions.
29 changes: 26 additions & 3 deletions jina/clients/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,20 @@ def __init__(self, args: 'argparse.Namespace'):

@property
def mode(self) -> str:
"""The mode for this client (index, query etc.)."""
"""
Get the mode for this client (index, query etc.).
:return: Mode of the client.
"""
return self._mode

@mode.setter
def mode(self, value: RequestType) -> None:
"""
Set the mode.
:param value: Request type. (e.g. INDEX, SEARCH, DELETE, UPDATE, CONTROL, TRAIN)
"""
if isinstance(value, RequestType):
self._mode = value
self.args.mode = value
Expand All @@ -64,6 +73,7 @@ def check_input(input_fn: Optional[InputFnType] = None, **kwargs) -> None:
"""Validate the input_fn and print the first request if success.
:param input_fn: the input function
:param kwargs: keyword arguments
"""
if hasattr(input_fn, '__call__'):
input_fn = input_fn()
Expand All @@ -85,7 +95,12 @@ def check_input(input_fn: Optional[InputFnType] = None, **kwargs) -> None:
raise BadClientInput from ex

def _get_requests(self, **kwargs) -> Union[Iterator['Request'], AsyncIterator['Request']]:
"""Get request in generator."""
"""
Get request in generator.
:param kwargs: Keyword arguments.
:return: Iterator of request.
"""
_kwargs = vars(self.args)
_kwargs['data'] = self.input_fn
# override by the caller-specific kwargs
Expand All @@ -106,9 +121,12 @@ def _get_task_name(self, kwargs: Dict) -> str:

@property
def input_fn(self) -> InputFnType:
"""An iterator of bytes, each element represents a Document's raw content.
"""
An iterator of bytes, each element represents a Document's raw content.
``input_fn`` defined in the protobuf
:return: input function
"""
if self._input_fn is not None:
return self._input_fn
Expand All @@ -117,6 +135,11 @@ def input_fn(self) -> InputFnType:

@input_fn.setter
def input_fn(self, bytes_gen: InputFnType) -> None:
"""
Set the input data.
:param bytes_gen: input function type
"""
if hasattr(bytes_gen, '__call__'):
self._input_fn = bytes_gen()
else:
Expand Down
25 changes: 19 additions & 6 deletions jina/executors/indexers/keyvalue.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@


class BinaryPbIndexer(BaseKVIndexer):
"""Simple Key-value indexer."""
class WriteHandler:
"""
Write file handler.
Expand All @@ -21,7 +22,6 @@ class WriteHandler:
:param mode: Writing mode. (e.g. 'ab', 'wb')
"""
def __init__(self, path, mode):
"""Constructor."""
self.body = open(path, mode)
self.header = open(path + '.head', mode)

Expand All @@ -43,7 +43,6 @@ class ReadHandler:
:param key_length: Length of key.
"""
def __init__(self, path, key_length):
"""Constructor."""
with open(path + '.head', 'rb') as fp:
tmp = np.frombuffer(fp.read(),
dtype=[('', (np.str_, key_length)), ('', np.int64), ('', np.int64), ('', np.int64)])
Expand All @@ -58,24 +57,32 @@ def close(self):
self._body.close()

def get_add_handler(self) -> 'WriteHandler':
"""Get write file handler.
"""
Get write file handler.
:return: write handler
"""
# keep _start position as in pickle serialization
return self.WriteHandler(self.index_abspath, 'ab')

def get_create_handler(self) -> 'WriteHandler':
"""Get write file handler.
"""
Get write file handler.
:return: write handler.
"""
self._start = 0 # override _start position
return self.WriteHandler(self.index_abspath, 'wb')

def get_query_handler(self) -> 'ReadHandler':
"""Get read file handler.
"""
Get read file handler.
:return: read handler.
"""
return self.ReadHandler(self.index_abspath, self.key_length)

def __init__(self, *args, **kwargs):
"""Constructor."""
super().__init__(*args, **kwargs)
self._total_byte_len = 0
self._start = 0
Expand All @@ -86,6 +93,8 @@ def add(self, keys: Iterable[str], values: Iterable[bytes], *args, **kwargs) ->
:param keys: a list of ``id``, i.e. ``doc.id`` in protobuf
:param values: serialized documents
:param args: extra arguments
:param kwargs: keyword arguments
"""
if not keys:
return
Expand Down Expand Up @@ -122,6 +131,8 @@ def update(self, keys: Iterable[str], values: Iterable[bytes], *args, **kwargs)
:param keys: a list of ``id``, i.e. ``doc.id`` in protobuf
:param values: serialized documents
:param args: extra arguments
:param kwargs: keyword arguments
"""
keys, values = self._filter_nonexistent_keys_values(keys, values, self.query_handler.header.keys())
if keys:
Expand All @@ -147,6 +158,8 @@ def delete(self, keys: Iterable[str], *args, **kwargs) -> None:
"""Delete the serialized documents from the index via document ids.
:param keys: a list of ``id``, i.e. ``doc.id`` in protobuf
:param args: extra arguments
:param kwargs: keyword arguments
"""
keys = self._filter_nonexistent_keys(keys, self.query_handler.header.keys())
if keys:
Expand Down
2 changes: 1 addition & 1 deletion jina/flow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@


class FlowType(type(ExitStack), type(JAMLCompatible)):
"""Type of Flow, metaclass of :class:`BaseFlow`"""
pass


Expand Down Expand Up @@ -60,7 +61,6 @@ class BaseFlow(JAMLCompatible, ExitStack, metaclass=FlowType):
_cls_client = Client #: the type of the Client, can be changed to other class

def __init__(self, args: Optional['argparse.Namespace'] = None, env: Optional[Dict] = None, **kwargs):
"""Initialize a Flow object"""
super().__init__()
self._version = '1' #: YAML version number, this will be later overridden if YAML config says the other way
self._pod_nodes = OrderedDict() # type: Dict[str, 'BasePod']
Expand Down
5 changes: 5 additions & 0 deletions jina/helloworld/chatbot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@


def hello_world(args):
"""
Execute the chatbot example.
:param args: arguments passed from CLI
"""
Path(args.workdir).mkdir(parents=True, exist_ok=True)

with ImportExtensions(required=True, help_text='this demo requires Pytorch and Transformers to be installed, '
Expand Down
45 changes: 45 additions & 0 deletions jina/helloworld/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,28 @@ def _get_groundtruths(target, pseudo_match=True):


def index_generator(num_docs: int, target: dict):
"""
Generate the index data.
:param num_docs: Number of documents to be indexed.
:param target: Dictionary which stores the data paths
:yields: index data
"""
for internal_doc_id in range(num_docs):
d = Document(content=target['index']['data'][internal_doc_id])
d.tags['id'] = internal_doc_id
yield d


def query_generator(num_docs: int, target: dict, with_groundtruth: bool = True):
"""
Generate the query data.
:param num_docs: Number of documents to be queried
:param target: Dictionary which stores the data paths
:param with_groundtruth: True if want to include labels into query data
:yields: query data
"""
gts = _get_groundtruths(target)
for _ in range(num_docs):
num_data = len(target['query-labels']['data'])
Expand All @@ -67,6 +82,11 @@ def query_generator(num_docs: int, target: dict, with_groundtruth: bool = True):


def print_result(resp):
"""
Callback function to receive results.
:param resp: returned response with data
"""
global evaluation_value
global top_k
for d in resp.search.docs:
Expand All @@ -85,6 +105,11 @@ def print_result(resp):


def write_html(html_path):
"""
Method to present results in browser.
:param html_path: path of the written html
"""
global num_docs_evaluated
global evaluation_value

Expand Down Expand Up @@ -116,6 +141,13 @@ def write_html(html_path):


def download_data(targets, download_proxy=None, task_name='download fashion-mnist'):
"""
Download data.
:param targets: target path for data.
:param download_proxy: download proxy (e.g. 'http', 'https')
:param task_name: name of the task
"""
opener = urllib.request.build_opener()
opener.addheaders = [('User-agent', 'Mozilla/5.0')]
if download_proxy:
Expand All @@ -133,10 +165,23 @@ def download_data(targets, download_proxy=None, task_name='download fashion-mnis


def load_mnist(path):
"""
Load MNIST data
:param path: path of data
:return: MNIST data in np.array
"""

with gzip.open(path, 'rb') as fp:
return np.frombuffer(fp.read(), dtype=np.uint8, offset=16).reshape([-1, 784])


def load_labels(path: str):
"""
Load labels from path
:param path: path of labels
:return: labels in np.array
"""
with gzip.open(path, 'rb') as fp:
return np.frombuffer(fp.read(), dtype=np.uint8, offset=8).reshape([-1, 1])
5 changes: 5 additions & 0 deletions jina/helloworld/multimodal/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@


def hello_world(args):
"""
Execute the multimodal example.
:param args: arguments passed from CLI
"""
Path(args.workdir).mkdir(parents=True, exist_ok=True)

with ImportExtensions(required=True, help_text='this demo requires Pytorch and Transformers to be installed, '
Expand Down
35 changes: 18 additions & 17 deletions jina/logging/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def info(self, msg: str, **kwargs):
"""
Log info-level message.
:param kwargs: Keyword arguments.
:param msg: Context of log.
"""
if self.log_level <= LogVerbosity.INFO:
Expand All @@ -46,6 +47,7 @@ def critical(self, msg: str, **kwargs):
"""
Log critical-level message.
:param kwargs: Keyword arguments.
:param msg: Context of log.
"""
if self.log_level <= LogVerbosity.CRITICAL:
Expand All @@ -55,6 +57,7 @@ def debug(self, msg: str, **kwargs):
"""
Log debug-level message.
:param kwargs: Keyword arguments.
:param msg: Content of log.
"""
if self.log_level <= LogVerbosity.DEBUG:
Expand All @@ -64,6 +67,7 @@ def error(self, msg: str, **kwargs):
"""
Log error-level message.
:param kwargs: Keyword arguments.
:param msg: Context of log.
"""
if self.log_level <= LogVerbosity.ERROR:
Expand All @@ -73,6 +77,7 @@ def warning(self, msg: str, **kwargs):
"""
Log warning-level message.
:param kwargs: Keyword arguments.
:param msg: Context of log.
"""
if self.log_level <= LogVerbosity.WARNING:
Expand All @@ -83,13 +88,14 @@ def success(self, msg: str, **kwargs):
Log success-level message.
:param msg: Context of log.
:param kwargs: Keyword arguments.
"""
if self.log_level <= LogVerbosity.SUCCESS:
sys.stdout.write(f'{self.context}[S]:{self._planify(msg)}')


class PrintLogger(NTLogger):

"""Print the message."""
@staticmethod
def _planify(msg):
return msg
Expand All @@ -116,6 +122,16 @@ class SysLogHandlerWrapper(logging.handlers.SysLogHandler):


class JinaLogger:
"""
Build a logger for a context.
:param context: The context identifier of the class, module or method.
:param log_config: The configuration file for the logger.
:param identity: The id of the group the messages from this logger will belong, used by fluentd default
configuration to group logs by pod.
:param workspace_path: The workspace path where the log will be stored at (only apply to fluentd)
:returns: an executor object.
"""
supported = {'FileHandler', 'StreamHandler', 'SysLogHandler', 'FluentHandler'}

def __init__(self,
Expand All @@ -126,16 +142,6 @@ def __init__(self,
workspace_path: Optional[str] = None,
quiet: bool = False,
**kwargs):
"""
Build a logger for a context.
:param context: The context identifier of the class, module or method.
:param log_config: The configuration file for the logger.
:param identity: The id of the group the messages from this logger will belong, used by fluentd default
configuration to group logs by pod.
:param workspace_path: The workspace path where the log will be stored at (only apply to fluentd)
:returns: an executor object.
"""
from .. import __uptime__
if not log_config:
log_config = os.getenv('JINA_LOG_CONFIG',
Expand Down Expand Up @@ -199,11 +205,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
self.close()

def close(self):
"""
Close all the handlers.
:returns: None
"""
"""Close all the handlers."""
for handler in self.logger.handlers:
handler.close()

Expand All @@ -213,7 +215,6 @@ def add_handlers(self, config_path: str = None, **kwargs):
:param config_path: Path of config file.
:param kwargs: Extra parameters.
:returns: None
"""
self.logger.handlers = []

Expand Down

0 comments on commit ac7f7de

Please sign in to comment.