Added one internal magic to enable retry of session creation. #716

Merged 3 commits on Jun 8, 2021
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -1,6 +1,8 @@
 # Changelog

 ## NEXT RELEASE
+### Features
+Added one internal magic to enable retry of session creation.

 ## 0.18.0

12 changes: 10 additions & 2 deletions sparkmagic/sparkmagic/kernels/kernelmagics.py
@@ -57,6 +57,7 @@ def __init__(self, shell, data=None, spark_events=None):
         self.language = u""
         self.endpoint = None
         self.fatal_error = False
+        self.allow_retry_fatal = False
         self.fatal_error_message = u""
         if spark_events is None:
             spark_events = SparkEvents()
@@ -347,17 +348,19 @@ def _do_not_call_start_session(self, line, cell="", local_ns=None):
         # No need to add the handle_expected_exceptions decorator to this since we manually catch all
         # exceptions when starting the session.

-        if self.fatal_error:
+        if self.fatal_error and not self.allow_retry_fatal:
             self.ipython_display.send_error(self.fatal_error_message)
             return False

         if not self.session_started:
             skip = False
             properties = conf.get_session_properties(self.language)
-            self.session_started = True

             try:
                 self.spark_controller.add_session(self.session_name, self.endpoint, skip, properties)
+                self.session_started = True
+                self.fatal_error = False
+                self.fatal_error_message = u""
             except Exception as e:
                 self.fatal_error = True
                 self.fatal_error_message = conf.fatal_error_suggestion().format(e)
@@ -371,6 +374,11 @@ def _do_not_call_start_session(self, line, cell="", local_ns=None):

         return self.session_started

+    @cell_magic
+    def _do_not_call_allow_retry_fatal(self, line, cell="", local_ns=None):
+        # enable the flag to retry session creation
+        self.allow_retry_fatal = True
+
     @cell_magic
     @handle_expected_exceptions
     def _do_not_call_delete_session(self, line, cell="", local_ns=None):
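The retry behavior introduced in `kernelmagics.py` can be illustrated with a small standalone sketch. It is not sparkmagic's actual class: `SessionStarter`, `create_session`, `errors_shown`, and the error message text are hypothetical stand-ins, and only the flag names (`fatal_error`, `allow_retry_fatal`, `session_started`) are taken from the diff.

```python
class SessionStarter:
    """Hypothetical stand-in for the kernel magics class (not sparkmagic's API)."""

    def __init__(self, create_session):
        self.create_session = create_session  # callable that may raise
        self.session_started = False
        self.fatal_error = False
        self.allow_retry_fatal = False
        self.fatal_error_message = ""
        self.errors_shown = []  # stands in for ipython_display.send_error

    def start_session(self):
        # A previous fatal error blocks further attempts unless retry is allowed.
        if self.fatal_error and not self.allow_retry_fatal:
            self.errors_shown.append(self.fatal_error_message)
            return False

        if not self.session_started:
            try:
                self.create_session()
                # Mark success only after creation returns without raising,
                # and clear any error state left by an earlier attempt.
                self.session_started = True
                self.fatal_error = False
                self.fatal_error_message = ""
            except Exception as e:
                self.fatal_error = True
                self.fatal_error_message = "Session creation failed: {}".format(e)
                self.errors_shown.append(self.fatal_error_message)

        return self.session_started


attempts = []


def flaky_create():
    """Fail on the first call only, to simulate a transient error."""
    attempts.append(1)
    if len(attempts) == 1:
        raise ValueError("timeout")


starter = SessionStarter(flaky_create)
starter.allow_retry_fatal = True
first = starter.start_session()   # first attempt fails
second = starter.start_session()  # retried because the flag is set
```

With `allow_retry_fatal` left at its default of `False`, the second call would return early and only repeat the stored error message, which matches the pre-existing behavior.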
2 changes: 1 addition & 1 deletion sparkmagic/sparkmagic/livyclientlib/sparkcontroller.py
@@ -87,8 +87,8 @@ def add_session(self, name, endpoint, skip_if_exists, properties):
             return
         http_client = self._http_client(endpoint)
         session = self._livy_session(http_client, properties, self.ipython_display)
-        self.session_manager.add_session(name, session)
         session.start()
+        self.session_manager.add_session(name, session)
Collaborator @devstein, Jun 5, 2021

@edwardps Can you think of any unintended side-effects of only adding the session after it's started? I agree this change makes sense, but it's been this way for many years and I don't know if there are historical reasons for the ordering

cc @itamarst

Contributor Author @edwardps, Jun 5, 2021

@devstein This is a fair callout. If there is a timeout issue [1], we might leak the session. We can clean up the session in that edge case (caused by, e.g., a timeout).
Any suggestions?

        try:
            session.start()
        except:
            if session.is_posted(): session.delete() # is_posted() is decided by self.status != constants.NOT_STARTED_SESSION_STATUS
            raise
        else:
            self.session_manager.add_session(name, session)

[1] - https://github.com/jupyter-incubator/sparkmagic/blob/master/sparkmagic/sparkmagic/livyclientlib/livysession.py#L151

Contributor Author

Hi Devin,

Made a code fix for the session leaking issue. Please let me know if you have any suggestions or comments.
edwardps@b6c89eb

Will update the PR after confirming the solution.

Thanks,
Ed

Collaborator

Hey Ed, thanks for the quick turnaround. The solution looks good to me. Can you add a comment to explain the logic (avoid leaking sessions)?

Also, cc @aggFTW as he was the last to edit this logic and refactored it. @aggFTW Do you see any issues with changing this logic?

Contributor Author

Sure thing. I will add more details in the comment when I update this PR.

Contributor Author

Added info about the handling of the session leaking edge case to the PR description and the git commit message.


     def get_session_id_for_client(self, name):
         return self.session_manager.get_session_id_for_client(name)
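The ordering discussed in the review thread (start the session first, register it only on success, and delete a session that was posted but never became usable) can be sketched as follows. This is a minimal illustration under stated assumptions, not the code that was merged: `FakeSession`, its status strings, and the dictionary `registry` are hypothetical, and `is_posted()` mirrors the helper proposed in the thread rather than a confirmed sparkmagic method.

```python
class FakeSession:
    """Hypothetical session object; not sparkmagic's LivySession."""

    def __init__(self, fail_after_post=False):
        self.status = "not_started"
        self.fail_after_post = fail_after_post
        self.deleted = False

    def start(self):
        self.status = "starting"  # the session now exists on the server
        if self.fail_after_post:
            raise TimeoutError("session did not become idle in time")
        self.status = "idle"

    def is_posted(self):
        # True once a creation request has been sent to the server.
        return self.status != "not_started"

    def delete(self):
        self.deleted = True
        self.status = "dead"


def add_session(registry, name, session):
    try:
        session.start()
    except Exception:
        # Avoid leaking a server-side session that was created
        # but never became usable (for example after a timeout).
        if session.is_posted():
            session.delete()
        raise
    else:
        # Register only sessions that started successfully.
        registry[name] = session


registry = {}
ok = FakeSession()
add_session(registry, "good", ok)

bad = FakeSession(fail_after_post=True)
try:
    add_session(registry, "bad", bad)
except TimeoutError:
    pass  # the error still propagates to the caller after cleanup
```

Because the failed session is never registered, a later retry under the same name does not collide with a stale entry.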
64 changes: 61 additions & 3 deletions sparkmagic/sparkmagic/tests/test_kernel_magics.py
@@ -84,15 +84,15 @@ def test_start_session_times_out():
     ret = magic._do_not_call_start_session(line)

     assert not ret
-    assert magic.session_started
+    assert not magic.session_started
     assert magic.fatal_error
     assert_equals(ipython_display.send_error.call_count, 1)

     # Call after fatal error
     ipython_display.send_error.reset_mock()
     ret = magic._do_not_call_start_session(line)
     assert not ret
-    assert magic.session_started
+    assert not magic.session_started
     assert_equals(ipython_display.send_error.call_count, 1)

 @with_setup(_setup, _teardown)
@@ -923,14 +923,72 @@ def test_start_session_displays_fatal_error_when_session_throws():
     magic.ipython_display = ipython_display
     magic.ipython_display.send_error = MagicMock()

-    magic._do_not_call_start_session("Test Line")
+    magic._do_not_call_start_session("")

     magic.spark_controller.add_session.assert_called_once_with(magic.session_name, magic.endpoint, False,
                                                                {"kind": constants.SESSION_KIND_SPARK})
     assert magic.fatal_error
     assert magic.fatal_error_message == conf.fatal_error_suggestion().format(str(e))


+@with_setup(_setup, _teardown)
+def test_start_session_when_retry_fatal_error_is_not_allowed_by_default():
+    e = ValueError("Failed to create the SqlContext.\nError, '{}'".format("Exception"))
+    magic.spark_controller.add_session = MagicMock(side_effect=e)
+    magic.language = constants.LANG_SCALA
+    magic.ipython_display = ipython_display
+    magic.ipython_display.send_error = MagicMock()
+
+    # first session creation
+    magic._do_not_call_start_session("")
+    # retry session creation
+    magic._do_not_call_start_session("")
+
+    # call add_session once and call send_error twice to show the error message
+    magic.spark_controller.add_session.assert_called_once_with(magic.session_name, magic.endpoint, False,
+                                                               {"kind": constants.SESSION_KIND_SPARK})
+    assert_equals(magic.ipython_display.send_error.call_count, 2)


+@with_setup(_setup, _teardown)
+def test_retry_start_session_when_retry_fatal_error_is_allowed():
+    e = ValueError("Failed to create the SqlContext.\nError, '{}'".format("Exception"))
+    magic.spark_controller.add_session = MagicMock(side_effect=e)
+    magic.language = constants.LANG_SCALA
+    magic.ipython_display = ipython_display
+    magic.ipython_display.send_error = MagicMock()
+    magic.allow_retry_fatal = True
+
+    # first session creation - failed
+    session_created = magic._do_not_call_start_session("")
+    magic.spark_controller.add_session.assert_called_once_with(magic.session_name, magic.endpoint, False,
+                                                               {"kind": constants.SESSION_KIND_SPARK})
+    assert not session_created
+    assert magic.fatal_error
+    assert magic.fatal_error_message == conf.fatal_error_suggestion().format(str(e))
+
+    # retry session creation - successful
+    magic.spark_controller.add_session = MagicMock()
+    session_created = magic._do_not_call_start_session("")
+    magic.spark_controller.add_session.assert_called_once_with(magic.session_name, magic.endpoint, False,
+                                                               {"kind": constants.SESSION_KIND_SPARK})
+    print(session_created)
+    assert session_created
+    assert magic.session_started
+    assert not magic.fatal_error
+    assert magic.fatal_error_message == u''
+
+    # show error message only once
+    assert magic.ipython_display.send_error.call_count == 1


+@with_setup(_setup, _teardown)
+def test_allow_retry_fatal():
+    assert not magic.allow_retry_fatal
+    magic._do_not_call_allow_retry_fatal("")
+    assert magic.allow_retry_fatal


 @with_setup(_setup, _teardown)
 def test_kernel_magics_names():
     """The magics machinery in IPython depends on the docstrings and
1 change: 1 addition & 0 deletions sparkmagic/sparkmagic/tests/test_sparkcontroller.py
@@ -259,3 +259,4 @@ def test_add_session_throws_when_session_start_fails():
     except ValueError as ex:
         assert str(ex) == str(e)
         session.start.assert_called_once()
+        controller.session_manager.add_session.assert_not_called()
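One detail worth noting for assertions like the one at the end of this hunk: with `unittest.mock`, `assert_not_called` only checks anything when it is invoked as a method. A bare attribute reference is evaluated and discarded, so the test passes regardless of how the mock was used. A minimal sketch, where `manager` is a hypothetical mock rather than sparkmagic's session manager:

```python
from unittest.mock import MagicMock

manager = MagicMock()
manager.add_session("name", object())  # the call we want to rule out

# Attribute access only: this evaluates to a bound method and is discarded,
# so nothing is verified and no error is raised.
manager.add_session.assert_not_called

# Calling the method performs the check; it raises AssertionError here
# because add_session was in fact called once.
try:
    manager.add_session.assert_not_called()
    raised = False
except AssertionError:
    raised = True
```

Writing the assertion with parentheses is what lets the test fail if a session is registered after a failed start.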