Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added one internal magic to enable retry of session creation. #716

Merged
merged 3 commits into from
Jun 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Changelog

## NEXT RELEASE
### Features
Added one internal magic to enable retry of session creation.

* New `%%pretty` magic for pretty printing a dataframe as an HTML table. Thanks @hegary
* Update Endpoint widget to shield passwords when entering them in the ipywidget. Thanks @J0rg3M3nd3z @jodom961
Expand Down
12 changes: 10 additions & 2 deletions sparkmagic/sparkmagic/kernels/kernelmagics.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def __init__(self, shell, data=None, spark_events=None):
self.language = u""
self.endpoint = None
self.fatal_error = False
self.allow_retry_fatal = False
self.fatal_error_message = u""
if spark_events is None:
spark_events = SparkEvents()
Expand Down Expand Up @@ -375,17 +376,19 @@ def _do_not_call_start_session(self, line, cell="", local_ns=None):
# No need to add the handle_expected_exceptions decorator to this since we manually catch all
# exceptions when starting the session.

if self.fatal_error:
if self.fatal_error and not self.allow_retry_fatal:
self.ipython_display.send_error(self.fatal_error_message)
return False

if not self.session_started:
skip = False
properties = conf.get_session_properties(self.language)
self.session_started = True

try:
self.spark_controller.add_session(self.session_name, self.endpoint, skip, properties)
self.session_started = True
self.fatal_error = False
self.fatal_error_message = u""
except Exception as e:
self.fatal_error = True
self.fatal_error_message = conf.fatal_error_suggestion().format(e)
Expand All @@ -399,6 +402,11 @@ def _do_not_call_start_session(self, line, cell="", local_ns=None):

return self.session_started

@cell_magic
def _do_not_call_allow_retry_fatal(self, line, cell="", local_ns=None):
# enable the flag to retry session creation
self.allow_retry_fatal = True

@cell_magic
@handle_expected_exceptions
def _do_not_call_delete_session(self, line, cell="", local_ns=None):
Expand Down
3 changes: 3 additions & 0 deletions sparkmagic/sparkmagic/livyclientlib/livysession.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,9 @@ def endpoint(self):
def is_final_status(status):
return status in constants.FINAL_STATUS

def is_posted(self):
return self.status != constants.NOT_STARTED_SESSION_STATUS

def delete(self):
session_id = self.id
self._spark_events.emit_session_deletion_start_event(self.guid, self.kind, session_id, self.status)
Expand Down
12 changes: 10 additions & 2 deletions sparkmagic/sparkmagic/livyclientlib/sparkcontroller.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,16 @@ def add_session(self, name, endpoint, skip_if_exists, properties):
return
http_client = self._http_client(endpoint)
session = self._livy_session(http_client, properties, self.ipython_display)
self.session_manager.add_session(name, session)
session.start()

try:
session.start()
except:
if session.is_posted():
session.delete()
raise
else:
self.session_manager.add_session(name, session)


def get_session_id_for_client(self, name):
return self.session_manager.get_session_id_for_client(name)
Expand Down
64 changes: 61 additions & 3 deletions sparkmagic/sparkmagic/tests/test_kernel_magics.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,15 @@ def test_start_session_times_out():
ret = magic._do_not_call_start_session(line)

assert not ret
assert magic.session_started
assert not magic.session_started
assert magic.fatal_error
assert_equals(ipython_display.send_error.call_count, 1)

# Call after fatal error
ipython_display.send_error.reset_mock()
ret = magic._do_not_call_start_session(line)
assert not ret
assert magic.session_started
assert not magic.session_started
assert_equals(ipython_display.send_error.call_count, 1)

@with_setup(_setup, _teardown)
Expand Down Expand Up @@ -923,14 +923,72 @@ def test_start_session_displays_fatal_error_when_session_throws():
magic.ipython_display = ipython_display
magic.ipython_display.send_error = MagicMock()

magic._do_not_call_start_session("Test Line")
magic._do_not_call_start_session("")

magic.spark_controller.add_session.assert_called_once_with(magic.session_name, magic.endpoint, False,
{"kind": constants.SESSION_KIND_SPARK})
assert magic.fatal_error
assert magic.fatal_error_message == conf.fatal_error_suggestion().format(str(e))


@with_setup(_setup, _teardown)
def test_start_session_when_retry_fatal_error_is_not_allowed_by_default():
e = ValueError("Failed to create the SqlContext.\nError, '{}'".format("Exception"))
magic.spark_controller.add_session = MagicMock(side_effect=e)
magic.language = constants.LANG_SCALA
magic.ipython_display = ipython_display
magic.ipython_display.send_error = MagicMock()

# first session creation
magic._do_not_call_start_session("")
# retry session creation
magic._do_not_call_start_session("")

# call add_session once and call send_error twice to show the error message
magic.spark_controller.add_session.assert_called_once_with(magic.session_name, magic.endpoint, False,
{"kind": constants.SESSION_KIND_SPARK})
assert_equals(magic.ipython_display.send_error.call_count, 2)


@with_setup(_setup, _teardown)
def test_retry_start_session_when_retry_fatal_error_is_allowed():
e = ValueError("Failed to create the SqlContext.\nError, '{}'".format("Exception"))
magic.spark_controller.add_session = MagicMock(side_effect=e)
magic.language = constants.LANG_SCALA
magic.ipython_display = ipython_display
magic.ipython_display.send_error = MagicMock()
magic.allow_retry_fatal = True

# first session creation - failed
session_created = magic._do_not_call_start_session("")
magic.spark_controller.add_session.assert_called_once_with(magic.session_name, magic.endpoint, False,
{"kind": constants.SESSION_KIND_SPARK})
assert not session_created
assert magic.fatal_error
assert magic.fatal_error_message == conf.fatal_error_suggestion().format(str(e))

# retry session creation - successful
magic.spark_controller.add_session = MagicMock()
session_created = magic._do_not_call_start_session("")
magic.spark_controller.add_session.assert_called_once_with(magic.session_name, magic.endpoint, False,
{"kind": constants.SESSION_KIND_SPARK})
print(session_created)
assert session_created
assert magic.session_started
assert not magic.fatal_error
assert magic.fatal_error_message == u''

# show error message only once
assert magic.ipython_display.send_error.call_count == 1


@with_setup(_setup, _teardown)
def test_allow_retry_fatal():
assert not magic.allow_retry_fatal
magic._do_not_call_allow_retry_fatal("")
assert magic.allow_retry_fatal


@with_setup(_setup, _teardown)
def test_kernel_magics_names():
"""The magics machinery in IPython depends on the docstrings and
Expand Down
10 changes: 10 additions & 0 deletions sparkmagic/sparkmagic/tests/test_livysession.py
Original file line number Diff line number Diff line change
Expand Up @@ -623,3 +623,13 @@ def test_spark_session_and_sql_context_unavailable(self):
self.http_client.get_statement.return_value = self.ready_statement_failed_json
session = self._create_session()
session.start()

def test_is_posted(self):
self.http_client.post_session.return_value = self.session_create_json
self.http_client.get_session.return_value = self.ready_sessions_json
self.http_client.get_statement.return_value = self.ready_statement_json

session = self._create_session()
assert not session.is_posted()
session.start()
assert session.is_posted()
74 changes: 69 additions & 5 deletions sparkmagic/sparkmagic/tests/test_sparkcontroller.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
from mock import MagicMock, patch
from nose.tools import with_setup, assert_equals, raises
import json

from sparkmagic.livyclientlib.sparkcontroller import SparkController
from mock import MagicMock
from nose.tools import with_setup, assert_equals, raises
from sparkmagic.livyclientlib.endpoint import Endpoint
from sparkmagic.livyclientlib.exceptions import SessionManagementException, HttpClientException
import sparkmagic.utils.configuration as conf

from sparkmagic.livyclientlib.sparkcontroller import SparkController

client_manager = None
controller = None
Expand Down Expand Up @@ -259,3 +257,69 @@ def test_add_session_throws_when_session_start_fails():
except ValueError as ex:
assert str(ex) == str(e)
session.start.assert_called_once()
controller.session_manager.add_session.assert_not_called

@with_setup(_setup, _teardown)
def test_add_session_cleanup_when_timeouts_and_session_posted_to_livy():
pass

@with_setup(_setup, _teardown)
def test_add_session_cleanup_when_timeouts_and_session_posted_to_livy():
_do_test_add_session_cleanup_when_timeouts(is_session_posted_to_livy=True)

@with_setup(_setup, _teardown)
def test_add_session_cleanup_when_timeouts_and_session_not_posted_to_livy():
_do_test_add_session_cleanup_when_timeouts(is_session_posted_to_livy=False)

def _do_test_add_session_cleanup_when_timeouts(is_session_posted_to_livy):
name = "name"
properties = {"kind": "spark"}
endpoint = Endpoint("http://location:port", None)
session = MagicMock()

controller._livy_session = MagicMock(return_value=session)
controller._http_client = MagicMock(return_value=MagicMock())
e = RuntimeError("Time out while post session to livy")
session.start = MagicMock(side_effect=e)

if is_session_posted_to_livy:
session.is_posted = MagicMock(return_value=True)
else:
session.is_posted = MagicMock(return_value=False)

try:
controller.add_session(name, endpoint, False, properties)
assert False
except RuntimeError as ex:
assert str(ex) == str(e)
session.start.assert_called_once()
controller.session_manager.add_session.assert_not_called

if is_session_posted_to_livy:
session.delete.assert_called_once()
else:
session.delete.assert_not_called()

@with_setup(_setup, _teardown)
def test_add_session_cleanup_when_session_delete_throws():
name = "name"
properties = {"kind": "spark"}
endpoint = Endpoint("http://location:port", None)
session = MagicMock()

controller._livy_session = MagicMock(return_value=session)
controller._http_client = MagicMock(return_value=MagicMock())
e = RuntimeError("Time out while post session to livy")
session.start = MagicMock(side_effect=e)
session.is_posted = MagicMock(return_value=True)

error_from_cleanup = RuntimeError("Error while clean up session")
session.delete = MagicMock(side_effect=error_from_cleanup)
try:
controller.add_session(name, endpoint, False, properties)
assert False
except Exception as ex:
# in the exception chain mechanism, original exception will be set as context.
assert str(ex) == str(error_from_cleanup)
assert str(ex.__context__) == str(e)
controller.session_manager.add_session.assert_not_called