Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 24 additions & 12 deletions multicorn_das/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,26 +94,33 @@ def _raise_unavailable(das_name, das_type, das_url, table_name, cause=None):
raise MulticornException("Server unavailable", detail=json.dumps(error_struct))

@staticmethod
def _raise_unauthenticated(das_name, das_type, das_url, table_name, cause=None):
error_struct = {'code': 'UNAUTHENTICATED', 'das_name': das_name, 'das_type': das_type, 'das_url': das_url, 'table_name': table_name}
def _raise_unauthenticated(message, das_name, das_type, das_url, table_name, cause=None):
error_struct = {'code': 'UNAUTHENTICATED', 'message': message, 'das_name': das_name, 'das_type': das_type, 'das_url': das_url, 'table_name': table_name}
if cause:
error_struct['cause'] = str(cause)
raise MulticornException("Unauthenticated", detail=json.dumps(error_struct))

@staticmethod
def _raise_permission_denied(das_name, das_type, das_url, table_name, cause=None):
error_struct = {'code': 'PERMISSION_DENIED', 'das_name': das_name, 'das_type': das_type, 'das_url': das_url, 'table_name': table_name}
def _raise_permission_denied(message, das_name, das_type, das_url, table_name, cause=None):
error_struct = {'code': 'PERMISSION_DENIED', 'message': message, 'das_name': das_name, 'das_type': das_type, 'das_url': das_url, 'table_name': table_name}
if cause:
error_struct['cause'] = str(cause)
raise MulticornException("Permission denied", detail=json.dumps(error_struct))

@staticmethod
def _raise_invalid_argument(das_name, das_type, das_url, table_name, cause=None):
error_struct = {'code': 'INVALID_ARGUMENT', 'das_name': das_name, 'das_type': das_type, 'das_url': das_url, 'table_name': table_name}
def _raise_invalid_argument(message, das_name, das_type, das_url, table_name, cause=None):
error_struct = {'code': 'INVALID_ARGUMENT', 'message': message, 'das_name': das_name, 'das_type': das_type, 'das_url': das_url, 'table_name': table_name}
if cause:
error_struct['cause'] = str(cause)
raise MulticornException("Invalid argument", detail=json.dumps(error_struct))

@staticmethod
def _raise_unsupported_operation(message, das_name, das_type, das_url, table_name, cause=None):
error_struct = {'code': 'UNSUPPORTED_OPERATION', 'message': message, 'das_name': das_name, 'das_type': das_type, 'das_url': das_url, 'table_name': table_name}
if cause:
error_struct['cause'] = str(cause)
raise MulticornException("Unsupported operation", detail=json.dumps(error_struct))

@staticmethod
def _raise_internal_error(message, cause=None):
error_struct = {'code': 'INTERNAL', 'message': message}
Expand Down Expand Up @@ -220,13 +227,16 @@ def _grpc_call_internal(das_name, das_type, das_url, table_name,
)

if code == grpc.StatusCode.UNAUTHENTICATED:
DASFdw._raise_unauthenticated(das_name, das_type, das_url, table_name, cause=e)
DASFdw._raise_unauthenticated(e.details(), das_name, das_type, das_url, table_name, cause=e)

if code == grpc.StatusCode.PERMISSION_DENIED:
DASFdw._raise_permission_denied(das_name, das_type, das_url, table_name, cause=e)
DASFdw._raise_permission_denied(e.details(), das_name, das_type, das_url, table_name, cause=e)

if code == grpc.StatusCode.INVALID_ARGUMENT:
DASFdw._raise_invalid_argument(das_name, das_type, das_url, table_name, cause=e)
DASFdw._raise_invalid_argument(e.details(), das_name, das_type, das_url, table_name, cause=e)

if code == grpc.StatusCode.UNIMPLEMENTED:
DASFdw._raise_unsupported_operation(e.details(), das_name, das_type, das_url, table_name, cause=e)

# Anything else => generic error
DASFdw._raise_internal_error("gRPC error calling remote DAS server", cause=e)
Expand Down Expand Up @@ -1075,12 +1085,14 @@ def __next__(self):
code = e.code()
if code == grpc.StatusCode.UNAVAILABLE:
DASFdw._raise_unavailable(self._das_name, self._das_type, self._das_url, self._table_name, cause=e)
elif code == grpc.StatusCode.UNIMPLEMENTED:
DASFdw._raise_unsupported_operation(e.details(), self._das_name, self._das_type, self._das_url, self._table_name, cause=e)
elif code == grpc.StatusCode.UNAUTHENTICATED:
DASFdw._raise_unauthenticated(self._das_name, self._das_type, self._das_url, self._table_name, cause=e)
DASFdw._raise_unauthenticated(e.details(), self._das_name, self._das_type, self._das_url, self._table_name, cause=e)
elif code == grpc.StatusCode.PERMISSION_DENIED:
DASFdw._raise_permission_denied(self._das_name, self._das_type, self._das_url, self._table_name, cause=e)
DASFdw._raise_permission_denied(e.details(), self._das_name, self._das_type, self._das_url, self._table_name, cause=e)
elif code == grpc.StatusCode.INVALID_ARGUMENT:
DASFdw._raise_invalid_argument(self._das_name, self._das_type, self._das_url, self._table_name, cause=e)
DASFdw._raise_invalid_argument(e.details(), self._das_name, self._das_type, self._das_url, self._table_name, cause=e)
else:
# Notably, we do not handle NOT_FOUND here, as that should not happen in a stream.
# Instead, we expect it to be handled in a previous call to get_rel_size.
Expand Down