-
Notifications
You must be signed in to change notification settings - Fork 439
/
exceptions.py
226 lines (174 loc) · 7.95 KB
/
exceptions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
import asyncio
import functools
import sys
import traceback

from sparkmagic.utils.constants import EXPECTED_ERROR_MSG, INTERNAL_ERROR_MSG
# == EXCEPTIONS ==
class LivyClientLibException(Exception):
    """Root of the LivyClientLib exception hierarchy.

    Every exception raised explicitly by code in this package should derive
    from this class. To cover a new error condition, reuse one of the existing
    subclasses or add a new, descriptively named subclass to this file.

    Errors fall into two groups: "expected" errors, which a user is likely to
    run into during normal use, and "internal" errors, which are caused by a
    bug in the library. EXPECTED_EXCEPTIONS lists the exception types that
    are treated as "expected".
    """
class HttpClientException(LivyClientLibException):
    """Raised by the HTTP client when it fails to make a request."""
class LivyClientTimeoutException(LivyClientLibException):
    """Raised when an interaction with Livy times out."""
class DataFrameParseException(LivyClientLibException):
    """Internal error pointing at a faulty dataframe-from-JSON parser.

    Thrown when parsing the results returned by the Livy server produces a
    JSON parsing error, which suggests a bad implementation of dataframe
    parsing.
    """
class LivyUnexpectedStatusException(LivyClientLibException):
    """Shown when some unexpected error happens on the Livy side."""
class SessionManagementException(LivyClientLibException):
    """Raised by the Session Manager when a given session name is invalid in
    some way."""
class BadUserConfigurationException(LivyClientLibException):
    """Raised when configuration supplied by the user is invalid in some
    way."""
class BadUserDataException(LivyClientLibException):
    """Raised when data supplied by the user is invalid in some way."""
class SqlContextNotFoundException(LivyClientLibException):
    """Raised when the SQL context cannot be found."""
class SparkStatementException(LivyClientLibException):
    """Raised when an error occurs while parsing or executing Spark
    statements."""
class HttpSessionAdapterConfigException(LivyClientLibException):
    """Raised when the HTTP session adapter config is invalid."""
# It has to be a KeyboardInterrupt to interrupt the notebook
class SparkStatementCancelledException(KeyboardInterrupt):
    """Exception that is thrown when a Spark statement is cancelled.

    Creating an instance while an IPython shell is available replaces the
    shell's ``_showtraceback`` hook (only once) with ``_show_tb``, so that this
    exception is reported as a single message line on stderr instead of a
    traceback. Every other exception type is forwarded to the original hook.
    """

    # The shell's original ``_showtraceback``, saved the first time the hook
    # is patched; ``None`` means the hook has not been patched yet.
    _orig_show_tb = None

    def __init__(self, msg):
        """Store ``msg`` and, when possible, install the traceback hook.

        Args:
            msg: Message for the user; it is what ``str()`` returns.
        """
        super().__init__(msg)
        self.msg = msg

        # ``get_ipython`` is not imported by this module; the name only
        # resolves when IPython provides it. Outside IPython the lookup raises
        # NameError, which must not replace the exception being constructed.
        try:
            ipython = get_ipython()
        except NameError:
            ipython = None

        # Patch _showtraceback() to avoid printing the traceback
        cls = SparkStatementCancelledException
        if ipython is not None and cls._orig_show_tb is None:
            cls._orig_show_tb = ipython._showtraceback
            ipython._showtraceback = cls._show_tb

    def __str__(self):
        """Return the message given to the constructor."""
        return self.msg

    @staticmethod
    def _show_tb(exc_type, exc_val, tb):
        """Replacement for the shell's ``_showtraceback`` hook.

        Prints only ``exc_val`` to stderr when ``exc_type`` is exactly
        SparkStatementCancelledException; otherwise delegates to the saved
        original hook with the same arguments.
        """
        if exc_type is SparkStatementCancelledException:
            print(exc_val, file=sys.stderr)
        else:
            SparkStatementCancelledException._orig_show_tb(exc_type, exc_val, tb)
class SparkStatementCancellationFailedException(KeyboardInterrupt):
    """Raised when a Spark statement is interrupted but could not be
    cancelled in Livy."""
# == DECORATORS FOR EXCEPTION HANDLING ==
# Exception types that are reported to the user as "expected" errors.
# handle_expected_exceptions converts this list to a tuple when it decorates a
# function and catches exactly these types. DataFrameParseException is left
# out because it is documented as an internal error.
# NOTE(review): HttpSessionAdapterConfigException is not listed either --
# confirm whether that omission is intentional.
EXPECTED_EXCEPTIONS = [
    BadUserConfigurationException,
    BadUserDataException,
    LivyUnexpectedStatusException,
    SqlContextNotFoundException,
    HttpClientException,
    LivyClientTimeoutException,
    SessionManagementException,
    SparkStatementException,
]
def handle_expected_exceptions(f):
    """A decorator that handles expected exceptions.

    Self can be any object with an "ipython_display" attribute. When the
    wrapped method raises one of the types in EXPECTED_EXCEPTIONS, the error
    is sent to ``self.ipython_display.send_error`` and ``None`` is returned,
    unless ``conf.all_errors_are_fatal()`` is true, in which case the
    exception propagates. Any other exception always propagates.

    Args:
        f: The method to wrap; it is called as ``f(self, *args, **kwargs)``.

    Returns:
        The wrapping function, carrying the metadata of ``f``.

    Usage:
        @handle_expected_exceptions
        def fn(self, ...):
            etc...
    """
    from sparkmagic.utils import configuration as conf

    # Snapshot taken at decoration time: later changes to EXPECTED_EXCEPTIONS
    # do not affect functions that are already decorated.
    exceptions_to_handle = tuple(EXPECTED_EXCEPTIONS)

    # Notice that we're NOT handling e.DataFrameParseException here. That's because DataFrameParseException
    # is an internal error that suggests something is wrong with LivyClientLib's implementation.
    @functools.wraps(f)
    def wrapped(self, *args, **kwargs):
        try:
            out = f(self, *args, **kwargs)
        except exceptions_to_handle as err:
            if conf.all_errors_are_fatal():
                # Re-raise the active exception as-is.
                raise
            # Do not log! as some messages may contain private client information
            self.ipython_display.send_error(EXPECTED_ERROR_MSG.format(err))
            return None
        else:
            return out

    return wrapped
def wrap_unexpected_exceptions(f, execute_if_error=None):
    """A decorator that catches all exceptions from the function f and alerts the user about them.

    Self can be any object with a "logger" attribute and a "ipython_display" attribute.
    All exceptions are logged as "unexpected" exceptions, and a request is made to the user to file an issue
    at the Github repository. Only ``Exception`` subclasses are caught, so the
    KeyboardInterrupt-based exceptions in this module pass through untouched.
    If ``conf.all_errors_are_fatal()`` is true the exception is re-raised
    instead of being reported.

    Args:
        f: The method to wrap; it is called as ``f(self, *args, **kwargs)``.
        execute_if_error: Optional zero-argument callable that supplies the
            return value when an error was handled.

    Returns:
        The wrapping function. On error it returns None if execute_if_error
        is None, or else the output of the function execute_if_error.

    Usage:
        @wrap_unexpected_exceptions
        def fn(self, ...):
            ..etc
    """
    from sparkmagic.utils import configuration as conf

    def handle_exception(self, e):
        # Called from inside the ``except`` block below, so format_exc()
        # describes the exception that is currently being handled.
        self.logger.error(
            "ENCOUNTERED AN INTERNAL ERROR: {}\n\tTraceback:\n{}".format(
                e, traceback.format_exc()
            )
        )
        self.ipython_display.send_error(INTERNAL_ERROR_MSG.format(e))
        return None if execute_if_error is None else execute_if_error()

    @functools.wraps(f)
    def wrapped(self, *args, **kwargs):
        try:
            out = f(self, *args, **kwargs)
        except Exception as err:
            if conf.all_errors_are_fatal():
                # Re-raise the active exception as-is.
                raise
            return handle_exception(self, err)
        else:
            return out

    return wrapped
# async_wrap_unexpected_exceptions was created to handle async behavior ipykernel >=6
# It was safer to create a separate async wrapper than modify the original to accommodate both use-cases
def async_wrap_unexpected_exceptions(f, execute_if_error=None):
    """A decorator that catches all exceptions from the async function f and alerts the user about them.

    Self can be any object with a "logger" attribute and a "ipython_display" attribute.
    All exceptions are logged as "unexpected" exceptions, and a request is made to the user to file an issue
    at the Github repository. Only ``Exception`` subclasses are caught. If
    ``conf.all_errors_are_fatal()`` is true the exception is re-raised instead
    of being reported.

    Args:
        f: The function to wrap; it is called as ``f(self, *args, **kwargs)``.
            If the call returns a coroutine, the coroutine is awaited.
        execute_if_error: Optional zero-argument callable that supplies the
            return value when an error was handled. If it returns a coroutine,
            the coroutine is awaited.

    Returns:
        The wrapping coroutine function. On error it returns None if
        execute_if_error is None, or else the output of the function
        execute_if_error.

    Usage:
        @async_wrap_unexpected_exceptions
        async def fn(self, ...):
            ..etc
    """
    from sparkmagic.utils import configuration as conf

    async def handle_exception(self, e):
        # Called from inside the ``except`` block below, so format_exc()
        # describes the exception that is currently being handled.
        self.logger.error(
            "ENCOUNTERED AN INTERNAL ERROR: {}\n\tTraceback:\n{}".format(
                e, traceback.format_exc()
            )
        )
        self.ipython_display.send_error(INTERNAL_ERROR_MSG.format(e))
        if execute_if_error is None:
            return None
        result = execute_if_error()
        if asyncio.iscoroutine(result):
            return await result
        return result

    @functools.wraps(f)
    async def wrapped(self, *args, **kwargs):
        try:
            out = f(self, *args, **kwargs)
            # Awaiting happens inside the try so that errors raised while the
            # coroutine runs are handled too.
            if asyncio.iscoroutine(out):
                out = await out
        except Exception as err:
            if conf.all_errors_are_fatal():
                # Re-raise the active exception as-is.
                raise
            return await handle_exception(self, err)
        else:
            return out

    return wrapped