Skip to content

Commit

Permalink
Fixed issue with error handling on Py 3.8 (#500)
Browse files Browse the repository at this point in the history
  • Loading branch information
benknoll-umn committed Mar 20, 2024
1 parent d583454 commit 4464a9b
Show file tree
Hide file tree
Showing 12 changed files with 319 additions and 163 deletions.
20 changes: 13 additions & 7 deletions python/mtap/_event.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2019 Regents of the University of Minnesota.
# Copyright (c) Regents of the University of Minnesota.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -71,7 +71,7 @@ class Event(ContextManager['Event']):
"""
__slots__ = ('_event_id', '_client', '_lock', 'label_adapters',
'_documents_cache', '_metadata_cache', '_binaries_cache',
'_event_service_instance_id')
'_event_service_instance_id', '_leased')

label_adapters: Mapping[str, 'ProtoLabelAdapter']

Expand All @@ -81,6 +81,7 @@ def __init__(
only_create_new: bool = False,
label_adapters: Optional[Mapping[str, 'ProtoLabelAdapter']] = None,
event_service_instance_id: Optional[str] = None,
lease: bool = True
):
self._event_id = event_id or str(uuid.uuid4())
self._event_service_instance_id = event_service_instance_id
Expand All @@ -89,7 +90,8 @@ def __init__(
else:
self.label_adapters = {}
self._client = client
if self._client is not None:
self._leased = lease
if self._client is not None and self._leased:
self._event_service_instance_id = self._client.open_event(
event_service_instance_id,
self._event_id,
Expand Down Expand Up @@ -142,14 +144,17 @@ def binaries(self) -> MutableMapping[str, bytes]:
def created_indices(self) -> Dict[str, List[str]]:
"""A mapping of document names to a list of the names of all the label
indices that have been added to that document"""
return {document_name: document.created_indices
for document_name, document in self.documents.items()}
try:
return {document_name: document.created_indices
for document_name, document in self._documents_cache.data.items()}
except AttributeError:
return {}

def close(self):
"""Closes this event. Lets the event service know that we are done
with the event, allowing to clean up the event if no other clients
have open leases to it."""
if self.client is not None:
if self.client is not None and self._leased:
self.release_lease()

def create_document(self,
Expand Down Expand Up @@ -205,7 +210,8 @@ def add_document(self, document: Document):
cast(_Documents, self.documents).data[name] = document

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
if exc_type is not KeyboardInterrupt:
self.close()

def add_created_indices(self, created_indices):
for k, v in created_indices.items():
Expand Down
7 changes: 1 addition & 6 deletions python/mtap/_events_client_grpc.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2019 Regents of the University of Minnesota.
# Copyright (c) Regents of the University of Minnesota.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -96,11 +96,6 @@ def open_event(self, instance_id, event_id, only_create_new):
self.stub.OpenEvent(request)
return self.instance_id

def _conn_error(self):
return ConnectionError(
f"Failed to connect to events service on address: "
f"{self._address}")

def close_event(self, instance_id, event_id):
self._check_instance_id(instance_id)

Expand Down
16 changes: 16 additions & 0 deletions python/mtap/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,15 @@ class ProcessorDeployment:
startup_timeout: Optional[float] = None
"""Optional override startup timeout."""

mp: Optional[bool] = None
"""Whether to use the multiprocessing pool based processor server."""

mp_processes: Optional[int] = None
"""If using multiprocessing host, the number of worker processes."""

mp_start_method: Optional[str] = None
"""A multiprocessing.get_context start method to use."""

@staticmethod
def from_dict(conf: Dict) -> 'ProcessorDeployment':
"""Creates an MTAP processor deployment configuration from a
Expand Down Expand Up @@ -423,6 +432,13 @@ def _create_processor_call(depl, global_settings, port, service_deployment,
call.extend(['--events', ','.join(events_addresses)])
if depl.additional_args is not None:
call.extend(depl.additional_args)

if depl.mp:
call.extend(['--mp'])
if depl.mp_processes is not None:
call.extend([''])


if shared_config.additional_args is not None:
call.extend(shared_config.additional_args)
if (depl.implementation == 'java'
Expand Down
2 changes: 1 addition & 1 deletion python/mtap/events_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def __init__(self, host: str,
if config is None:
config = _config.Config()
options = config.get('grpc.events_options', {})
logger.info("Events service using options " + str(options))
logger.debug("Events service using options " + str(options))
server = grpc.server(thread_pool, options=list(options.items()))
servicer = EventsServicer(instance_id=self.sid)
events_pb2_grpc.add_EventsServicer_to_server(servicer, server)
Expand Down
66 changes: 45 additions & 21 deletions python/mtap/pipeline/_error_handling.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2023 Regents of the University of Minnesota.
# Copyright (c) Regents of the University of Minnesota.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,7 +33,7 @@
:class:`logging.Logger` object (name ``logging``).
* :class:`ErrorsDirectoryErrorHandler` which for all errors that occur
writes a set of files to disk containing the serialized event, the stack
trace, and the error information (name ``directory``).
trace, and the error information (name ``to_directory``).
* :class:`SuppressAllErrorsHandler` which suppresses all errors
(name ``suppress``).
Expand All @@ -43,11 +43,11 @@
An example of error_handlers configuration in the pipeline file is shown
below.
.. code-block:: text
. code-block:: text
error_handlers:
- name: logging
- name: directory
- name: to_directory
params:
output_directory: '/path/to/'
- name: termination
Expand All @@ -70,9 +70,10 @@
Dict,
Optional,
Union,
ClassVar, Type
ClassVar, Type, Tuple, Iterable
)

import mtap
from mtap._event import Event
from mtap.processing import ErrorInfo
from mtap.serialization import Serializer
Expand All @@ -82,6 +83,16 @@
ErrorHandlerFactory = Callable[..., 'ProcessingErrorHandler']


def handle_error(error_handlers: Iterable[Tuple['ProcessingErrorHandler', Dict]],
event: mtap.Event,
error: ErrorInfo):
for handler, state in error_handlers:
try:
handler.handle_error(event, error, state)
except SuppressError:
break


class StopProcessing(Exception):
"""Thrown by error handlers when the pipeline should immediately
terminate."""
Expand Down Expand Up @@ -179,14 +190,19 @@ def name(cls):

def handle_error(self, event, error_info, state):
from mtap.processing import ErrorOrigin
if error_info.error_repr == "ValueError('Cannot invoke RPC on closed channel!')":
return
if error_info.origin == ErrorOrigin.REMOTE:
print(
f"An error occurred while processing an event with id "
f"'{event.event_id}' through the remote component "
f"'{error_info.component_id}' at address "
f"'{error_info.address}': {error_info.error_repr}\n"
f"It had the following message: {error_info.localized_msg}\n"
f"{self.more_help}")
f"'{error_info.address}': {error_info.grpc_code}")
if error_info.localized_msg:
print(
f"It had the following message: {error_info.localized_msg}"
)
print(f"{self.more_help}", flush=True)
else:
print(
f"An error occurred while processing an event with id "
Expand Down Expand Up @@ -219,7 +235,7 @@ def handle_error(self, _1, _2, state):
state['failures'] += 1
except KeyError:
state['failures'] = 1
if state['failures'] >= self.max_failures:
if state['failures'] > self.max_failures:
print(f"Pipeline exceeded the maximum number "
f"of allowed failures ({self.max_failures}) "
f"and is terminating.")
Expand All @@ -237,26 +253,34 @@ class LoggingErrorHandler(ProcessingErrorHandler):
logger: The logger to use.
"""
logger: logging.Logger

def __init__(self, logger: Union[str, logging.Logger, None] = None):
def __init__(self, logger: Union[str, logging.Logger, None] = None, level='error'):
if isinstance(logger, str):
logger = logging.getLogger(logger)
self.logger = (logger or logging.getLogger('mtap.processing'))
self.logger = getattr(logger or logging.getLogger('mtap.processing'), level)

@classmethod
def name(cls):
return 'logging'

def handle_error(self, event, error_info, state):
self.logger.error(
f"An error occurred while processing an event with id "
f"'{event.event_id}' through the remote component "
f"'{error_info.component_id}' at address "
f"'{error_info.address}': {error_info.error_repr}\n"
f"It had the following message: {error_info.localized_msg}\n"
+ ''.join(error_info.stack_trace)
)
from mtap.processing import ErrorOrigin
if error_info.origin == ErrorOrigin.REMOTE:
msg = (f"An error occurred while processing an event with id "
f"'{event.event_id}' through the remote component "
f"'{error_info.component_id}' at address "
f"'{error_info.address}': {error_info.error_repr}")
if error_info.localized_msg:
msg += f"It had the following message: {error_info.localized_msg}"
msg += ''.join(error_info.stack_trace)
self.logger(msg)
else:
self.logger(
f"An error occurred while processing an event with id "
f"'{event.event_id}' through the component "
f"'{error_info.component_id}': '{error_info.error_repr}'\n"
f"It had the following message: {error_info.localized_msg}\n"
+ ''.join(error_info.stack_trace)
)


class ErrorsDirectoryErrorHandler(ProcessingErrorHandler):
Expand Down
5 changes: 1 addition & 4 deletions python/mtap/pipeline/_hosting.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import argparse
import logging
import threading
import traceback
import uuid
from argparse import ArgumentParser
from concurrent.futures import thread
Expand Down Expand Up @@ -172,13 +171,11 @@ def Process(self, request, context):
with dict_to_event(event_dict, client=self.events) as event:
params = {}
copy_struct_to_dict(request.params, params)
self.pool.wait_for_capacity()
res = self.pool.start_task(event, params)
_, result, error = res.get()
if error is not None:
exc = ProcessingException(error)
logger.error(str(exc))
logger.error(traceback.print_exception(exc))
logger.error(error)
context.abort_with_status(rpc_status.to_status(exc.to_rpc_status()))
return
response = pipeline_pb2.ProcessEventInPipelineResponse()
Expand Down

0 comments on commit 4464a9b

Please sign in to comment.