From a692aace9bfc32ef49c648f7a1656dadec9b0977 Mon Sep 17 00:00:00 2001 From: Alejandro Guerrero Gonzalez Date: Mon, 8 Feb 2016 13:54:48 -0800 Subject: [PATCH 1/7] Start session failures send fatal error and stop further executions --- remotespark/kernels/kernelmagics.py | 56 +++++++++++++++------- tests/test_kernel_magics.py | 73 +++++++++++++++++++++++++++-- 2 files changed, 108 insertions(+), 21 deletions(-) diff --git a/remotespark/kernels/kernelmagics.py b/remotespark/kernels/kernelmagics.py index 5f1deaba0..ee80de0cc 100644 --- a/remotespark/kernels/kernelmagics.py +++ b/remotespark/kernels/kernelmagics.py @@ -14,6 +14,7 @@ from remotespark.utils.constants import Constants from remotespark.magics.sparkmagicsbase import SparkMagicBase from remotespark.utils.utils import get_connection_string +from remotespark.livyclientlib.livyclienttimeouterror import LivyClientTimeoutError @magics_class @@ -30,6 +31,8 @@ def __init__(self, shell, data=None): self.url = None self.connection_string = None + self.fatal_error = False + @cell_magic def help(self, line, cell="", local_ns=None): help_html = """ @@ -145,13 +148,14 @@ def configure(self, line, cell="", local_ns=None): @cell_magic def spark(self, line, cell="", local_ns=None): - self._do_not_call_start_session("") - - (success, out) = self.spark_controller.run_cell(cell) - if success: - self.ipython_display.write(out) + if self._do_not_call_start_session(""): + (success, out) = self.spark_controller.run_cell(cell) + if success: + self.ipython_display.write(out) + else: + self.ipython_display.send_error(out) else: - self.ipython_display.send_error(out) + return "" @magic_arguments() @cell_magic @@ -159,11 +163,12 @@ def spark(self, line, cell="", local_ns=None): @argument("-o", "--output", type=str, default=None, help="If present, query will be stored in variable of this " "name.") def sql(self, line, cell="", local_ns=None): - self._do_not_call_start_session("") - - args = parse_argstring(self.sql, line) - return self.execute_against_context_that_returns_df(self.spark_controller.run_cell_sql, cell, + if self._do_not_call_start_session(""): + args = parse_argstring(self.sql, line) + return self.execute_against_context_that_returns_df(self.spark_controller.run_cell_sql, cell, None, args.output) + else: + return "" @magic_arguments() @cell_magic @@ -171,11 +176,12 @@ def sql(self, line, cell="", local_ns=None): @argument("-o", "--output", type=str, default=None, help="If present, query will be stored in variable of this " "name.") def hive(self, line, cell="", local_ns=None): - self._do_not_call_start_session("") - - args = parse_argstring(self.hive, line) - return self.execute_against_context_that_returns_df(self.spark_controller.run_cell_hive, cell, - None, args.output) + if self._do_not_call_start_session(""): + args = parse_argstring(self.hive, line) + return self.execute_against_context_that_returns_df(self.spark_controller.run_cell_hive, cell, + None, args.output) + else: + return "" @magic_arguments() @cell_magic @@ -216,13 +222,27 @@ def delete(self, line, cell="", local_ns=None): @cell_magic def _do_not_call_start_session(self, line, cell="", local_ns=None): - if not self.session_started: - self.session_started = True + # Starts a session unless session is already created or a fatal error occurred. Returns True when session is + # created successfully. + if self.fatal_error: + self.ipython_display.send_error(conf.fatal_error_suggestion()) + return False + + if not self.session_started: skip = False properties = conf.get_session_properties(self.language) - self.spark_controller.add_session(self.session_name, self.connection_string, skip, properties) + try: + self.spark_controller.add_session(self.session_name, self.connection_string, skip, properties) + self.session_started = True + except LivyClientTimeoutError as e: + self.fatal_error = True + self.logger.error("Timeout creating session: {}".format(e)) + self.ipython_display.send_error(conf.fatal_error_suggestion()) + return False + + return self.session_started @cell_magic def _do_not_call_delete_session(self, line, cell="", local_ns=None): diff --git a/tests/test_kernel_magics.py b/tests/test_kernel_magics.py index 1835c725c..c4ba58dfc 100644 --- a/tests/test_kernel_magics.py +++ b/tests/test_kernel_magics.py @@ -5,6 +5,7 @@ import remotespark.utils.configuration as conf from remotespark.kernels.kernelmagics import KernelMagics from remotespark.utils.constants import Constants +from remotespark.livyclientlib.livyclienttimeouterror import LivyClientTimeoutError magic = None spark_controller = None @@ -53,18 +54,41 @@ def test_start_session(): line = "" assert not magic.session_started - magic._do_not_call_start_session(line) + ret = magic._do_not_call_start_session(line) + assert ret assert magic.session_started spark_controller.add_session.assert_called_once_with(magic.session_name, magic.connection_string, False, {"kind": Constants.session_kind_pyspark}) # Call a second time - magic._do_not_call_start_session(line) + ret = magic._do_not_call_start_session(line) + assert ret assert magic.session_started assert spark_controller.add_session.call_count == 1 +@with_setup(_setup, _teardown) +def test_start_session_times_out(): + line = "" + spark_controller.add_session = MagicMock(side_effect=LivyClientTimeoutError) + assert not magic.session_started + + ret = magic._do_not_call_start_session(line) + + assert not ret + assert not magic.session_started + assert magic.fatal_error + assert_equals(ipython_display.send_error.call_count, 1) + + # Call after fatal error + ipython_display.send_error.reset_mock() + ret = magic._do_not_call_start_session(line) + assert not ret + assert not magic.session_started + assert_equals(ipython_display.send_error.call_count, 1) + + @with_setup(_setup, _teardown) def test_delete_session(): line = "" @@ -195,7 +219,7 @@ def test_get_session_settings(): assert magic.get_session_settings(" something", False) == "something" assert magic.get_session_settings("-f something", True) == "something" assert magic.get_session_settings("something -f", True) == "something" - assert magic.get_session_settings("something", True) == None + assert magic.get_session_settings("something", True) is None @with_setup(_setup, _teardown) @@ -225,6 +249,21 @@ def test_spark_error(): {"kind": Constants.session_kind_pyspark}) spark_controller.run_cell.assert_called_once_with(cell) + +@with_setup(_setup, _teardown) +def test_spark_failed_session_start(): + line = "" + cell = "some spark code" + magic._do_not_call_start_session = MagicMock(return_value=False) + + ret = magic.spark(line, cell) + + assert_equals(ret, "") + assert_equals(ipython_display.write.call_count, 0) + assert_equals(spark_controller.add_session.call_count, 0) + assert_equals(spark_controller.run_cell.call_count, 0) + + @with_setup(_setup, _teardown) def test_sql_without_output(): line = "" @@ -238,6 +277,7 @@ def test_sql_without_output(): magic.execute_against_context_that_returns_df.assert_called_once_with(spark_controller.run_cell_sql, cell, None, None) + @with_setup(_setup, _teardown) def test_sql_with_output(): line = "-o my_var" @@ -252,6 +292,19 @@ def test_sql_with_output(): "my_var") +@with_setup(_setup, _teardown) +def test_sql_failed_session_start(): + line = "" + cell = "some spark code" + magic._do_not_call_start_session = MagicMock(return_value=False) + + ret = magic.sql(line, cell) + + assert_equals(ret, "") + assert_equals(spark_controller.add_session.call_count, 0) + assert_equals(spark_controller.execute_against_context_that_returns_df.call_count, 0) + + @with_setup(_setup, _teardown) def test_hive_without_output(): line = "" @@ -265,6 +318,7 @@ def test_hive_without_output(): magic.execute_against_context_that_returns_df.assert_called_once_with(spark_controller.run_cell_hive, cell, None, None) + @with_setup(_setup, _teardown) def test_hive_with_output(): line = "-o my_var" @@ -279,6 +333,19 @@ def test_hive_with_output(): "my_var") +@with_setup(_setup, _teardown) +def test_hive_failed_session_start(): + line = "" + cell = "some spark code" + magic._do_not_call_start_session = MagicMock(return_value=False) + + ret = magic.hive(line, cell) + + assert_equals(ret, "") + assert_equals(spark_controller.add_session.call_count, 0) + assert_equals(spark_controller.execute_against_context_that_returns_df.call_count, 0) + + @with_setup(_setup, _teardown) def test_cleanup_without_force(): line = "" From 7e95f6cb3fe18e985d7b3e96043bb3af4be8dda5 Mon Sep 17 00:00:00 2001 From: Alejandro Guerrero Gonzalez Date: Mon, 8 Feb 2016 13:57:14 -0800 Subject: [PATCH 2/7] Provide ID in info --- remotespark/kernels/kernelmagics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/remotespark/kernels/kernelmagics.py b/remotespark/kernels/kernelmagics.py index ee80de0cc..318d9333f 100644 --- a/remotespark/kernels/kernelmagics.py +++ b/remotespark/kernels/kernelmagics.py @@ -99,7 +99,7 @@ def local(self, line, cell="", local_ns=None): def info(self, line, cell="", local_ns=None): self.ipython_display.writeln("Endpoint:\n\t{}\n".format(self.url)) - self.ipython_display.writeln("Current session:\n\t{}\n".format( + self.ipython_display.writeln("Current session ID number:\n\t{}\n".format( self.spark_controller.get_session_id_for_client(self.session_name))) self.ipython_display.writeln("Session configs:\n\t{}\n".format(conf.get_session_properties(self.language))) From fdcbbbfe404a71806cece64e391da908979f14a0 Mon Sep 17 00:00:00 2001 From: Alejandro Guerrero Gonzalez Date: Mon, 8 Feb 2016 14:16:40 -0800 Subject: [PATCH 3/7] Catch all exceptions for session start problems --- remotespark/kernels/kernelmagics.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/remotespark/kernels/kernelmagics.py b/remotespark/kernels/kernelmagics.py index 318d9333f..a01e64670 100644 --- a/remotespark/kernels/kernelmagics.py +++ b/remotespark/kernels/kernelmagics.py @@ -14,7 +14,6 @@ from remotespark.utils.constants import Constants from remotespark.magics.sparkmagicsbase import SparkMagicBase from remotespark.utils.utils import get_connection_string -from remotespark.livyclientlib.livyclienttimeouterror import LivyClientTimeoutError @magics_class @@ -196,7 +195,7 @@ def cleanup(self, line, cell="", local_ns=None): self.ipython_display.send_error("When you clean up the endpoint, all sessions will be lost, including the " "one used for this notebook. Include the -f parameter if that's your " "intention.") - return + return "" @magic_arguments() @cell_magic @@ -212,13 +211,13 @@ def delete(self, line, cell="", local_ns=None): self.ipython_display.send_error("Cannot delete this kernel's session ({}). Specify a different session," " shutdown the kernel to delete this session, or run %cleanup to " "delete all sessions for this endpoint.".format(id)) - return + return "" self.spark_controller.delete_session_by_id(self.connection_string, session) else: self.ipython_display.send_error("Include the -f parameter if you understand that all statements executed" "in this session will be lost.") - return + return "" @cell_magic def _do_not_call_start_session(self, line, cell="", local_ns=None): @@ -226,7 +225,7 @@ def _do_not_call_start_session(self, line, cell="", local_ns=None): # created successfully. if self.fatal_error: - self.ipython_display.send_error(conf.fatal_error_suggestion()) + self.ipython_display.send_error(self.fatal_error_message) return False if not self.session_started: @@ -234,12 +233,13 @@ def _do_not_call_start_session(self, line, cell="", local_ns=None): properties = conf.get_session_properties(self.language) try: - self.spark_controller.add_session(self.session_name, self.connection_string, skip, properties) self.session_started = True - except LivyClientTimeoutError as e: + self.spark_controller.add_session(self.session_name, self.connection_string, skip, properties) + except Exception as e: self.fatal_error = True - self.logger.error("Timeout creating session: {}".format(e)) - self.ipython_display.send_error(conf.fatal_error_suggestion()) + self.fatal_error_message = conf.fatal_error_suggestion().format(e) + self.logger.error("Error creating session: {}".format(e)) + self.ipython_display.send_error(self.fatal_error_message) return False return self.session_started @@ -259,11 +259,11 @@ def _do_not_call_change_language(self, line, cell="", local_ns=None): if language not in Constants.lang_supported: self.ipython_display.send_error("'{}' language not supported in kernel magics.".format(language)) - return + return "" if self.session_started: self.ipython_display.send_error("Cannot change the language if a session has been started.") - return + return "" self.language = language self.refresh_configuration() From 1daf70dd5b624101439f68b5532ec9feec838701 Mon Sep 17 00:00:00 2001 From: Alejandro Guerrero Gonzalez Date: Mon, 8 Feb 2016 14:17:03 -0800 Subject: [PATCH 4/7] Add unit test change --- tests/test_kernel_magics.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_kernel_magics.py b/tests/test_kernel_magics.py index c4ba58dfc..d5fdf0233 100644 --- a/tests/test_kernel_magics.py +++ b/tests/test_kernel_magics.py @@ -77,7 +77,7 @@ def test_start_session_times_out(): ret = magic._do_not_call_start_session(line) assert not ret - assert not magic.session_started + assert magic.session_started assert magic.fatal_error assert_equals(ipython_display.send_error.call_count, 1) @@ -85,7 +85,7 @@ def test_start_session_times_out(): ipython_display.send_error.reset_mock() ret = magic._do_not_call_start_session(line) assert not ret - assert not magic.session_started + assert magic.session_started assert_equals(ipython_display.send_error.call_count, 1) From 2826654abc07416bfcbc6a105213cd0833badec1 Mon Sep 17 00:00:00 2001 From: Alejandro Guerrero Gonzalez Date: Mon, 8 Feb 2016 14:26:27 -0800 Subject: [PATCH 5/7] Return None instead of empty strings --- remotespark/kernels/kernelmagics.py | 16 ++++++++-------- tests/test_kernel_magics.py | 6 +++--- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/remotespark/kernels/kernelmagics.py b/remotespark/kernels/kernelmagics.py index a01e64670..1347d26dd 100644 --- a/remotespark/kernels/kernelmagics.py +++ b/remotespark/kernels/kernelmagics.py @@ -154,7 +154,7 @@ def spark(self, line, cell="", local_ns=None): else: self.ipython_display.send_error(out) else: - return "" + return None @magic_arguments() @cell_magic @@ -167,7 +167,7 @@ def sql(self, line, cell="", local_ns=None): return self.execute_against_context_that_returns_df(self.spark_controller.run_cell_sql, cell, None, args.output) else: - return "" + return None @magic_arguments() @cell_magic @@ -180,7 +180,7 @@ def hive(self, line, cell="", local_ns=None): return self.execute_against_context_that_returns_df(self.spark_controller.run_cell_hive, cell, None, args.output) else: - return "" + return None @magic_arguments() @cell_magic @@ -195,7 +195,7 @@ def cleanup(self, line, cell="", local_ns=None): self.ipython_display.send_error("When you clean up the endpoint, all sessions will be lost, including the " "one used for this notebook. Include the -f parameter if that's your " "intention.") - return "" + return None @magic_arguments() @cell_magic @@ -211,13 +211,13 @@ def delete(self, line, cell="", local_ns=None): self.ipython_display.send_error("Cannot delete this kernel's session ({}). Specify a different session," " shutdown the kernel to delete this session, or run %cleanup to " "delete all sessions for this endpoint.".format(id)) - return "" + return None self.spark_controller.delete_session_by_id(self.connection_string, session) else: self.ipython_display.send_error("Include the -f parameter if you understand that all statements executed" "in this session will be lost.") - return "" + return None @cell_magic def _do_not_call_start_session(self, line, cell="", local_ns=None): @@ -259,11 +259,11 @@ def _do_not_call_change_language(self, line, cell="", local_ns=None): if language not in Constants.lang_supported: self.ipython_display.send_error("'{}' language not supported in kernel magics.".format(language)) - return "" + return None if self.session_started: self.ipython_display.send_error("Cannot change the language if a session has been started.") - return "" + return None self.language = language self.refresh_configuration() diff --git a/tests/test_kernel_magics.py b/tests/test_kernel_magics.py index d5fdf0233..eece5315c 100644 --- a/tests/test_kernel_magics.py +++ b/tests/test_kernel_magics.py @@ -258,7 +258,7 @@ def test_spark_failed_session_start(): ret = magic.spark(line, cell) - assert_equals(ret, "") + assert_equals(ret, None) assert_equals(ipython_display.write.call_count, 0) assert_equals(spark_controller.add_session.call_count, 0) assert_equals(spark_controller.run_cell.call_count, 0) @@ -300,7 +300,7 @@ def test_sql_failed_session_start(): ret = magic.sql(line, cell) - assert_equals(ret, "") + assert_equals(ret, None) assert_equals(spark_controller.add_session.call_count, 0) assert_equals(spark_controller.execute_against_context_that_returns_df.call_count, 0) @@ -341,7 +341,7 @@ def test_hive_failed_session_start(): ret = magic.hive(line, cell) - assert_equals(ret, "") + assert_equals(ret, None) assert_equals(spark_controller.add_session.call_count, 0) assert_equals(spark_controller.execute_against_context_that_returns_df.call_count, 0) From d7ceac106a4bc36c247c585e1e4e36d9191b3cd5 Mon Sep 17 00:00:00 2001 From: Alejandro Guerrero Gonzalez Date: Mon, 8 Feb 2016 14:28:02 -0800 Subject: [PATCH 6/7] Quote status in unexpected status error message --- remotespark/livyclientlib/livysession.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/remotespark/livyclientlib/livysession.py b/remotespark/livyclientlib/livysession.py index a11c47b88..96d0e80e1 100644 --- a/remotespark/livyclientlib/livysession.py +++ b/remotespark/livyclientlib/livysession.py @@ -158,7 +158,7 @@ def wait_for_idle(self, seconds_to_wait): return if current_status in Constants.final_status: - error = "Session {} unexpectedly reached final status {}. See logs:\n{}"\ + error = "Session {} unexpectedly reached final status '{}'. See logs:\n{}"\ .format(self.id, current_status, self.logs) self.logger.error(error) raise LivyUnexpectedStatusError(error) From 20852280c1d29fffb8a8aacc796b85772fc7b58f Mon Sep 17 00:00:00 2001 From: Alejandro Guerrero Gonzalez Date: Mon, 8 Feb 2016 15:19:40 -0800 Subject: [PATCH 7/7] Added missing init value for member --- remotespark/kernels/kernelmagics.py | 2 +- tests/test_kernel_magics.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/remotespark/kernels/kernelmagics.py b/remotespark/kernels/kernelmagics.py index 1347d26dd..5d83a1a51 100644 --- a/remotespark/kernels/kernelmagics.py +++ b/remotespark/kernels/kernelmagics.py @@ -29,8 +29,8 @@ def __init__(self, shell, data=None): self.language = "" self.url = None self.connection_string = None - self.fatal_error = False + self.fatal_error_message = "" @cell_magic def help(self, line, cell="", local_ns=None): diff --git a/tests/test_kernel_magics.py b/tests/test_kernel_magics.py index eece5315c..741e88f29 100644 --- a/tests/test_kernel_magics.py +++ b/tests/test_kernel_magics.py @@ -2,10 +2,10 @@ from nose.tools import with_setup, raises, assert_equals from IPython.core.magic import magics_class -import remotespark.utils.configuration as conf from remotespark.kernels.kernelmagics import KernelMagics -from remotespark.utils.constants import Constants from remotespark.livyclientlib.livyclienttimeouterror import LivyClientTimeoutError +from remotespark.utils.constants import Constants +import remotespark.utils.configuration as conf magic = None spark_controller = None