Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start session timeouts send fatal error and stop further executions #162

Merged
merged 8 commits into from
Feb 8, 2016
Merged
68 changes: 44 additions & 24 deletions remotespark/kernels/kernelmagics.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ def __init__(self, shell, data=None):
self.language = ""
self.url = None
self.connection_string = None
self.fatal_error = False
self.fatal_error_message = ""

@cell_magic
def help(self, line, cell="", local_ns=None):
Expand Down Expand Up @@ -96,7 +98,7 @@ def local(self, line, cell="", local_ns=None):
def info(self, line, cell="", local_ns=None):
self.ipython_display.writeln("Endpoint:\n\t{}\n".format(self.url))

self.ipython_display.writeln("Current session:\n\t{}\n".format(
self.ipython_display.writeln("Current session ID number:\n\t{}\n".format(
self.spark_controller.get_session_id_for_client(self.session_name)))

self.ipython_display.writeln("Session configs:\n\t{}\n".format(conf.get_session_properties(self.language)))
Expand Down Expand Up @@ -145,37 +147,40 @@ def configure(self, line, cell="", local_ns=None):

@cell_magic
def spark(self, line, cell="", local_ns=None):
self._do_not_call_start_session("")

(success, out) = self.spark_controller.run_cell(cell)
if success:
self.ipython_display.write(out)
if self._do_not_call_start_session(""):
(success, out) = self.spark_controller.run_cell(cell)
if success:
self.ipython_display.write(out)
else:
self.ipython_display.send_error(out)
else:
self.ipython_display.send_error(out)
return None

@magic_arguments()
@cell_magic
@needs_local_scope
@argument("-o", "--output", type=str, default=None, help="If present, query will be stored in variable of this "
"name.")
def sql(self, line, cell="", local_ns=None):
self._do_not_call_start_session("")

args = parse_argstring(self.sql, line)
return self.execute_against_context_that_returns_df(self.spark_controller.run_cell_sql, cell,
if self._do_not_call_start_session(""):
args = parse_argstring(self.sql, line)
return self.execute_against_context_that_returns_df(self.spark_controller.run_cell_sql, cell,
None, args.output)
else:
return None

@magic_arguments()
@cell_magic
@needs_local_scope
@argument("-o", "--output", type=str, default=None, help="If present, query will be stored in variable of this "
"name.")
def hive(self, line, cell="", local_ns=None):
self._do_not_call_start_session("")

args = parse_argstring(self.hive, line)
return self.execute_against_context_that_returns_df(self.spark_controller.run_cell_hive, cell,
None, args.output)
if self._do_not_call_start_session(""):
args = parse_argstring(self.hive, line)
return self.execute_against_context_that_returns_df(self.spark_controller.run_cell_hive, cell,
None, args.output)
else:
return None

@magic_arguments()
@cell_magic
Expand All @@ -190,7 +195,7 @@ def cleanup(self, line, cell="", local_ns=None):
self.ipython_display.send_error("When you clean up the endpoint, all sessions will be lost, including the "

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not related to this PR. It's better to set a string variable with this sentence then print it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To use a string and doing whatever you want to do with it, it's better to save the string first in a new variable then use it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe. I wouldn't say that as a blank statement.

"one used for this notebook. Include the -f parameter if that's your "
"intention.")
return
return None

@magic_arguments()
@cell_magic
Expand All @@ -206,23 +211,38 @@ def delete(self, line, cell="", local_ns=None):
self.ipython_display.send_error("Cannot delete this kernel's session ({}). Specify a different session,"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same applied here.

" shutdown the kernel to delete this session, or run %cleanup to "
"delete all sessions for this endpoint.".format(id))
return
return None

self.spark_controller.delete_session_by_id(self.connection_string, session)
else:
self.ipython_display.send_error("Include the -f parameter if you understand that all statements executed"
"in this session will be lost.")
return
return None

@cell_magic
def _do_not_call_start_session(self, line, cell="", local_ns=None):
if not self.session_started:
self.session_started = True
# Starts a session unless session is already created or a fatal error occurred. Returns True when session is
# created successfully.

if self.fatal_error:
self.ipython_display.send_error(self.fatal_error_message)
return False

if not self.session_started:
skip = False
properties = conf.get_session_properties(self.language)

self.spark_controller.add_session(self.session_name, self.connection_string, skip, properties)
try:
self.session_started = True
self.spark_controller.add_session(self.session_name, self.connection_string, skip, properties)
except Exception as e:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leave an extra line before this line for readability.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not PEP

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's "PEP"? Generally, this will help the code reader to identify whether this is a different statement separated from the try or not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you search for it? That's what indentation is for.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, that works.

self.fatal_error = True
self.fatal_error_message = conf.fatal_error_suggestion().format(e)
self.logger.error("Error creating session: {}".format(e))
self.ipython_display.send_error(self.fatal_error_message)
return False

return self.session_started

@cell_magic
def _do_not_call_delete_session(self, line, cell="", local_ns=None):
Expand All @@ -239,11 +259,11 @@ def _do_not_call_change_language(self, line, cell="", local_ns=None):

if language not in Constants.lang_supported:
self.ipython_display.send_error("'{}' language not supported in kernel magics.".format(language))
return
return None

if self.session_started:
self.ipython_display.send_error("Cannot change the language if a session has been started.")
return
return None

self.language = language
self.refresh_configuration()
Expand Down
2 changes: 1 addition & 1 deletion remotespark/livyclientlib/livysession.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ def wait_for_idle(self, seconds_to_wait):
return

if current_status in Constants.final_status:
error = "Session {} unexpectedly reached final status {}. See logs:\n{}"\
error = "Session {} unexpectedly reached final status '{}'. See logs:\n{}"\
.format(self.id, current_status, self.logs)
self.logger.error(error)
raise LivyUnexpectedStatusError(error)
Expand Down
75 changes: 71 additions & 4 deletions tests/test_kernel_magics.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
from nose.tools import with_setup, raises, assert_equals
from IPython.core.magic import magics_class

import remotespark.utils.configuration as conf
from remotespark.kernels.kernelmagics import KernelMagics
from remotespark.livyclientlib.livyclienttimeouterror import LivyClientTimeoutError

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re-arrange alphabetically. They should be:
from remotespark.kernels.kernelmagics import KernelMagics
from remotespark.livyclientlib.livyclienttimeouterror import LivyClientTimeoutError
from remotespark.utils.constants import Constants
import remotespark.utils.configuration as conf

from remotespark.utils.constants import Constants
import remotespark.utils.configuration as conf

magic = None
spark_controller = None
Expand Down Expand Up @@ -53,18 +54,41 @@ def test_start_session():
line = ""
assert not magic.session_started

magic._do_not_call_start_session(line)
ret = magic._do_not_call_start_session(line)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change the variable name to be more meaningful.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Meaningful enough. It's a unit test.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What "ret" stands for? Even if it's a unit test, best practice to have your test code is easy to update.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's short for whatever is returned. Another very popular option is ret_v. Usually used in places where it's not really that meaningful, like a unit test.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually I prefer to have a meaningful variable name even if it's in unit test, this will help any new person to this code section to get the intent of the test easily.


assert ret
assert magic.session_started
spark_controller.add_session.assert_called_once_with(magic.session_name, magic.connection_string, False,
{"kind": Constants.session_kind_pyspark})

# Call a second time
magic._do_not_call_start_session(line)
ret = magic._do_not_call_start_session(line)
assert ret
assert magic.session_started
assert spark_controller.add_session.call_count == 1


@with_setup(_setup, _teardown)
def test_start_session_times_out():
line = ""
spark_controller.add_session = MagicMock(side_effect=LivyClientTimeoutError)
assert not magic.session_started

ret = magic._do_not_call_start_session(line)

assert not ret
assert magic.session_started
assert magic.fatal_error
assert_equals(ipython_display.send_error.call_count, 1)

# Call after fatal error
ipython_display.send_error.reset_mock()
ret = magic._do_not_call_start_session(line)
assert not ret
assert magic.session_started
assert_equals(ipython_display.send_error.call_count, 1)


@with_setup(_setup, _teardown)
def test_delete_session():
line = ""
Expand Down Expand Up @@ -195,7 +219,7 @@ def test_get_session_settings():
assert magic.get_session_settings(" something", False) == "something"
assert magic.get_session_settings("-f something", True) == "something"
assert magic.get_session_settings("something -f", True) == "something"
assert magic.get_session_settings("something", True) == None
assert magic.get_session_settings("something", True) is None


@with_setup(_setup, _teardown)
Expand Down Expand Up @@ -225,6 +249,21 @@ def test_spark_error():
{"kind": Constants.session_kind_pyspark})
spark_controller.run_cell.assert_called_once_with(cell)


@with_setup(_setup, _teardown)
def test_spark_failed_session_start():
line = ""
cell = "some spark code"
magic._do_not_call_start_session = MagicMock(return_value=False)

ret = magic.spark(line, cell)

assert_equals(ret, None)
assert_equals(ipython_display.write.call_count, 0)
assert_equals(spark_controller.add_session.call_count, 0)
assert_equals(spark_controller.run_cell.call_count, 0)


@with_setup(_setup, _teardown)
def test_sql_without_output():
line = ""
Expand All @@ -238,6 +277,7 @@ def test_sql_without_output():
magic.execute_against_context_that_returns_df.assert_called_once_with(spark_controller.run_cell_sql, cell, None,
None)


@with_setup(_setup, _teardown)
def test_sql_with_output():
line = "-o my_var"
Expand All @@ -252,6 +292,19 @@ def test_sql_with_output():
"my_var")


@with_setup(_setup, _teardown)
def test_sql_failed_session_start():
line = ""
cell = "some spark code"
magic._do_not_call_start_session = MagicMock(return_value=False)

ret = magic.sql(line, cell)

assert_equals(ret, None)
assert_equals(spark_controller.add_session.call_count, 0)
assert_equals(spark_controller.execute_against_context_that_returns_df.call_count, 0)


@with_setup(_setup, _teardown)
def test_hive_without_output():
line = ""
Expand All @@ -265,6 +318,7 @@ def test_hive_without_output():
magic.execute_against_context_that_returns_df.assert_called_once_with(spark_controller.run_cell_hive, cell, None,
None)


@with_setup(_setup, _teardown)
def test_hive_with_output():
line = "-o my_var"
Expand All @@ -279,6 +333,19 @@ def test_hive_with_output():
"my_var")


@with_setup(_setup, _teardown)
def test_hive_failed_session_start():
line = ""
cell = "some spark code"
magic._do_not_call_start_session = MagicMock(return_value=False)

ret = magic.hive(line, cell)

assert_equals(ret, None)
assert_equals(spark_controller.add_session.call_count, 0)
assert_equals(spark_controller.execute_against_context_that_returns_df.call_count, 0)


@with_setup(_setup, _teardown)
def test_cleanup_without_force():
line = ""
Expand Down