From 6af747fd7491225551dcaad98a7748a667432533 Mon Sep 17 00:00:00 2001
From: Sean Gorsky
Date: Tue, 13 Sep 2022 13:50:23 -0400
Subject: [PATCH 1/8] Replaced some ValueErrors with SyncParserExceptions so
 errors will pass through to the user

---
 spark_log_parser/eventlog.py                  | 23 ++++--
 spark_log_parser/parsing_models/errors.py     | 64 ++++++++-------
 spark_log_parser/parsing_models/exceptions.py | 80 +++++++++++--------
 tests/test_bad_eventlog.py                    | 23 +++---
 4 files changed, 112 insertions(+), 78 deletions(-)

diff --git a/spark_log_parser/eventlog.py b/spark_log_parser/eventlog.py
index 0be9ae7..10215d5 100644
--- a/spark_log_parser/eventlog.py
+++ b/spark_log_parser/eventlog.py
@@ -4,6 +4,8 @@
 
 import pandas as pd
 
+from spark_log_parser.parsing_models.exceptions import LogSubmissionException
+
 
 class EventLogBuilder:
     def __init__(
@@ -15,7 +17,6 @@ def __init__(
         self.event_log_paths = self._validate_event_log_paths(event_log_paths)
         self.work_dir = self._validate_work_dir(work_dir)
 
-
     def _validate_event_log_paths(self, event_log_paths: list[Path] | list[str]) -> list[Path]:
         return [Path(x) for x in event_log_paths]
 
@@ -29,7 +30,9 @@ def _validate_work_dir(self, work_dir: Path | str) -> Path:
 
     def build(self) -> Path:
         if not self.event_log_paths:
-            raise ValueError("No files found")
+            raise LogSubmissionException(
+                error_message="No Spark eventlogs were found in submission"
+            )
 
         self.event_log = self._get_event_log(self.event_log_paths)
 
@@ -53,12 +56,16 @@ def _get_event_log(self, paths: list[Path]) -> Path:
 
         if rollover_dat:
             if len(log_files) > len(rollover_dat):
-                raise ValueError("No rollover properties found in log file")
+                raise LogSubmissionException(
+                    error_message="Multiple logs were discovered but not all had rollover properties"
+                )
 
             return self._concat(rollover_dat)
 
         if len(log_files) > 1:
-            raise ValueError("No rollover properties found in log file")
+            raise LogSubmissionException(
+                error_message="Multiple logs were discovered but not all had rollover properties"
+            )
 
         return log_files[0]
 
@@ -70,15 +77,17 @@ def _concat(self, rollover_dat: list[tuple[str, str, str]]) -> Path:
         )
 
         if not len(rollover_df.context_id.unique()) == 1:
-            raise ValueError("Not all rollover log files have the same Spark context ID")
+            raise LogSubmissionException(
+                error_message="Not all rollover log files have the same Spark context ID"
+            )
 
         diffs = rollover_df.rollover_index.diff()
 
         if any(diffs > 1) or rollover_df.rollover_index[0] > 0:
-            raise ValueError("Rollover log file appears to be missing")
+            raise LogSubmissionException(error_message="One or more rollover logs is missing")
 
         if any(diffs < 1):
-            raise ValueError("Duplicate rollover log file detected")
+            raise LogSubmissionException(error_message="Duplicate rollover log file detected")
 
         event_log = Path(tempfile.mkstemp(suffix="-concatenated.json", dir=str(self.work_dir))[1])
         with open(event_log, "w") as fobj:
diff --git a/spark_log_parser/parsing_models/errors.py b/spark_log_parser/parsing_models/errors.py
index e1c2a13..6db558c 100644
--- a/spark_log_parser/parsing_models/errors.py
+++ b/spark_log_parser/parsing_models/errors.py
@@ -2,52 +2,58 @@
 
 logger = logging.getLogger("ParserExceptionLogger")
 
-class ParserErrorMessages():
-    SPARK_CONFIG_GENERIC_MESSAGE = (
+class ParserErrorMessages:
-        "Some configurations were detected that are not yet supported by the Sync Autotuner. " +
-        "If you would like to try again, please make the following configuration changes and " +
-        "rerun your application:"
+
+    SPARK_CONFIG_GENERIC_MESSAGE = (
+        "Some configurations were detected that are not yet supported by the Sync Autotuner. "
+        + "If you would like to try again, please make the following configuration changes and "
+        + "rerun your application:"
     )
-    MISSING_EVENT_JOB_START = (
-        "Event SparkListenerJobStart was missing for the following jobs: ")
-    MISSING_EVENT_JOB_END = (
-        "Event SparkListenerJobEnd was missing for the following jobs: ")
+    MISSING_EVENT_JOB_START = "Event SparkListenerJobStart was missing for the following jobs: "
+    MISSING_EVENT_JOB_END = "Event SparkListenerJobEnd was missing for the following jobs: "
     MISSING_EVENT_STAGE_SUBMIT = (
-        "Event SparkListenerStageSubmitted was missing for the following stages: ")
+        "Event SparkListenerStageSubmitted was missing for the following stages: "
+    )
     MISSING_EVENT_STAGE_COMPLETE = (
-        "Event SparkListenerStageCompleted was missing for the following stages: ")
+        "Event SparkListenerStageCompleted was missing for the following stages: "
+    )
     MISSING_EVENT_GENERIC_MESSAGE = (
-        "Some Spark Listener Event data is missing from the eventlog related to: ")
+        "Some Spark Listener Event data is missing from the eventlog related to: "
+    )
     MISSING_EVENT_EXPLANATION = (
-        "This Event data is necessary for the Sync Autotuner. " +
-        "Events may be missing for a number of reasons including " +
-        "-- (1) The application did not complete successfully " +
-        "-- (2) If these are rollover logs, there may be one or more missing logs " +
-        "-- (3) Spark Listener communication failures. " +
-        "There are some steps that can help mitigate these issues " +
-        "-- (1) Ensure the SparkContext closes correctly at the end of your application, e.g. " +
-        "by using sc.stop() " +
-        "-- (2) If you submitted a rollover log set, ensure that all rollover logs for the " +
-        "application were submitted " +
-        "-- (3) Resubmit the next log produced with this application."
+        "This Event data is necessary for the Sync Autotuner. "
+        + "Events may be missing for a number of reasons including "
+        + "-- (1) The application did not complete successfully "
+        + "-- (2) If these are rollover logs, there may be one or more missing logs "
+        + "-- (3) Spark Listener communication failures. "
+        + "There are some steps that can help mitigate these issues "
+        + "-- (1) Ensure the SparkContext closes correctly at the end of your application, e.g. "
+        + "by using sc.stop() "
+        + "-- (2) If you submitted a rollover log set, ensure that all rollover logs for the "
+        + "application were submitted "
+        + "-- (3) Resubmit the next log produced with this application."
     )
     SUPPORT_MESSAGE = (
-        "If you have questions or would like assistance in resolving the issue " +
-        "please contact our support at support@synccomputing.com.")
+        "If you have questions or would like assistance in resolving the issue "
+        + "please contact our support at support@synccomputing.com."
+    )
 
-class ParserErrorTypes():
+
+class ParserErrorTypes:
     SPARK_CONFIG_ERROR = "Invalid Spark Configuration Error"
-    MISSING_EVENT_ERROR = 'Event missing from Spark eventlog'
+    MISSING_EVENT_ERROR = "Event missing from Spark eventlog"
+    LOG_SUBMISSION_ERROR = "Invalid log submission"
+
 
-class ParserErrorCodes():
+class ParserErrorCodes:
     SPARK_CONFIG_ERROR = 2001
-    SPARK_EVENT_ERROR = 2002
\ No newline at end of file
+    SPARK_EVENT_ERROR = 2002
+    LOG_SUBMISSION_ERROR = 2003
diff --git a/spark_log_parser/parsing_models/exceptions.py b/spark_log_parser/parsing_models/exceptions.py
index f68f989..78f5992 100644
--- a/spark_log_parser/parsing_models/exceptions.py
+++ b/spark_log_parser/parsing_models/exceptions.py
@@ -1,17 +1,19 @@
 import json
 import logging
 
-from .errors import ParserErrorMessages, ParserErrorTypes, ParserErrorCodes
+from .errors import ParserErrorCodes, ParserErrorMessages, ParserErrorTypes
 
 logger = logging.getLogger("ParserExceptionLogger")
 
+
 class SyncParserException(Exception):
     def __init__(
-            self,
-            error_type: str = None,
-            error_message: str = None,
-            status_code: int = None,
-            exception: Exception = None ):
+        self,
+        error_type: str = None,
+        error_message: str = None,
+        status_code: int = None,
+        exception: Exception = None,
+    ):
 
         super().__init__(error_message)
 
@@ -20,84 +22,96 @@ def __init__(
         self.status_code = status_code
 
         # 2022-03-25 RW: Format the information in the way it is expected by the backend code
-        self.error = {
-            "error" : error_type,
-            "message" : error_message
-        }
+        self.error = {"error": error_type, "message": error_message}
 
         logger.error(error_message)
         if exception:
             logger.exception(exception)
 
-
     def get_ui_return_value(self) -> dict:
         """
        A possible rendering of one set of return information as dict
         """
-        return {
-            "error": self.error_type,
-            "message": self.error_message
-        }
+        return {"error": self.error_type, "message": self.error_message}
 
     def get_ui_return_value_as_json(self) -> json:
         """
         A possible rendering of one set of return information as JSON
         """
-        return json.dumps( {
-            "error" : self.error_type,
-            "message" : self.error_message
-        } )
+        return json.dumps({"error": self.error_type, "message": self.error_message})
 
-class ConfigurationException(SyncParserException):
 
+class ConfigurationException(SyncParserException):
     def __init__(self, config_recs: str):
-
         error_message = ParserErrorMessages.SPARK_CONFIG_GENERIC_MESSAGE
 
         for idx, c in enumerate(config_recs):
-            count = idx+1
-            error_message += f' ({count}) {c}'
+            count = idx + 1
+            error_message += f" ({count}) {c}"
 
-        error_message += f'. {ParserErrorMessages.SUPPORT_MESSAGE}'
+        error_message += f". {ParserErrorMessages.SUPPORT_MESSAGE}"
 
         super().__init__(
             error_type=ParserErrorTypes.SPARK_CONFIG_ERROR,
             error_message=error_message,
             status_code=ParserErrorCodes.SPARK_CONFIG_ERROR,
-            exception=None)
+            exception=None,
+        )
+
 
 class LazyEventValidationException(SyncParserException):
     """
     This Exception is for missing event data that doesn't immediately kill the parser.
     All of the related missing events can be gathered and identified in the error message.
     """
+
     def __init__(self, error_message: str):
         error_message += (
-            f"{ParserErrorMessages.MISSING_EVENT_EXPLANATION} " +
-            f"{ParserErrorMessages.SUPPORT_MESSAGE}")
+            f"{ParserErrorMessages.MISSING_EVENT_EXPLANATION} "
+            + f"{ParserErrorMessages.SUPPORT_MESSAGE}"
+        )
 
         super().__init__(
             error_type=ParserErrorTypes.MISSING_EVENT_ERROR,
             error_message=error_message,
             status_code=ParserErrorCodes.SPARK_EVENT_ERROR,
-            exception=None)
+            exception=None,
+        )
 
 
 class UrgentEventValidationException(SyncParserException):
     """
     This Exception is for missing event data that stops the parser dead in its tracks.
     """
-    def __init__(self, missing_event: str = ''):
+
+    def __init__(self, missing_event: str = ""):
         error_message = (
-            f"{ParserErrorMessages.MISSING_EVENT_GENERIC_MESSAGE} '{missing_event}'. " +
-            f"{ParserErrorMessages.MISSING_EVENT_EXPLANATION} " +
-            f"{ParserErrorMessages.SUPPORT_MESSAGE}")
+            f"{ParserErrorMessages.MISSING_EVENT_GENERIC_MESSAGE} '{missing_event}'. "
+            + f"{ParserErrorMessages.MISSING_EVENT_EXPLANATION} "
+            + f"{ParserErrorMessages.SUPPORT_MESSAGE}"
+        )
 
         super().__init__(
             error_type=ParserErrorTypes.MISSING_EVENT_ERROR,
             error_message=error_message,
             status_code=ParserErrorCodes.SPARK_EVENT_ERROR,
-            exception=None)
+            exception=None,
+        )
+
+
+class LogSubmissionException(SyncParserException):
+    """
+    This Exception is for malformed log submission
+    """
+
+    def __init__(self, error_message: str):
+
+        super().__init__(
+            error_type=ParserErrorTypes.LOG_SUBMISSION_ERROR,
+            error_message=error_message,
+            status_code=ParserErrorCodes.LOG_SUBMISSION_ERROR,
+            exception=None,
+        )
diff --git a/tests/test_bad_eventlog.py b/tests/test_bad_eventlog.py
index 262dc1f..438ae3d 100644
--- a/tests/test_bad_eventlog.py
+++ b/tests/test_bad_eventlog.py
@@ -4,13 +4,14 @@
 from zipfile import ZipFile
 
 from spark_log_parser import eventlog, extractor
+from spark_log_parser.parsing_models.exceptions import LogSubmissionException
 
 
 class BadEventLog(unittest.TestCase):
-    def check_value_error(self, event_log_path, msg):
+    def check_sync_exceptions(self, event_log_path, msg):
         with tempfile.TemporaryDirectory() as temp_dir:
-            with self.assertRaises(ValueError) as cm:
+            with self.assertRaises(LogSubmissionException) as cm:
                 event_log_paths = extractor.Extractor(event_log_path.as_uri(), temp_dir).extract()
                 eventlog.EventLogBuilder(event_log_paths, temp_dir).build()
 
@@ -19,25 +20,27 @@ def check_value_error(self, event_log_path, msg):
 
     def test_multiple_context_ids(self):
         event_log = Path("tests", "logs", "bad", "non-unique-context-id.zip").resolve()
-        self.check_value_error(
+        self.check_sync_exceptions(
             event_log, "Not all rollover log files have the same Spark context ID"
         )
 
     def test_missing_dbc_event(self):
         event_log = Path("tests", "logs", "bad", "missing-dbc-event.zip").resolve()
-        self.check_value_error(event_log, "No rollover properties found in log file")
+        self.check_sync_exceptions(
+            event_log, "Multiple logs were discovered but not all had rollover properties"
+        )
 
     def test_duplicate_log_part(self):
         event_log = Path("tests", "logs", "bad", "duplicate-part.tgz").resolve()
-        self.check_value_error(event_log, "Duplicate rollover log file detected")
+        self.check_sync_exceptions(event_log, "Duplicate rollover log file detected")
 
     def test_missing_log_part(self):
         event_log = Path("tests", "logs", "bad", "missing-part.zip").resolve()
-        self.check_value_error(event_log, "Rollover log file appears to be missing")
+        self.check_sync_exceptions(event_log, "One or more rollover logs is missing")
 
     def test_missing_first_part(self):
         event_log = Path("tests", "logs", "bad", "missing-first-part.zip").resolve()
-        self.check_value_error(event_log, "Rollover log file appears to be missing")
+        self.check_sync_exceptions(event_log, "One or more rollover logs is missing")
 
     def test_only_non_first_part(self):
         with tempfile.TemporaryDirectory() as temp_dir:
@@ -46,8 +49,10 @@ def test_only_non_first_part(self):
                     [zinfo for zinfo in zfile.infolist() if not zinfo.is_dir()][0], temp_dir
                 )
 
-            self.check_value_error(Path(temp_dir), "Rollover log file appears to be missing")
+            self.check_sync_exceptions(Path(temp_dir), "One or more rollover logs is missing")
 
     def test_empty_log_dir(self):
         with tempfile.TemporaryDirectory() as temp_dir:
-            self.check_value_error(Path(temp_dir), "No files found")
+            self.check_sync_exceptions(
+                Path(temp_dir), "No Spark eventlogs were found in submission"
+            )

From 80dd20965670ca04c7a47d069336355ffb151e9a Mon Sep 17 00:00:00 2001
From: Sean Gorsky
Date: Tue, 13 Sep 2022 14:53:06 -0400
Subject: [PATCH 2/8] incremented version

---
 spark_log_parser/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/spark_log_parser/__init__.py b/spark_log_parser/__init__.py
index e7a49be..bfaf605 100644
--- a/spark_log_parser/__init__.py
+++ b/spark_log_parser/__init__.py
@@ -1,3 +1,3 @@
 """Tools for providing Spark event log"""
 
-__version__ = "0.1.3"
+__version__ = "0.1.5"

From eada47da0a8514683b911062a8e28ca2496d13c9 Mon Sep 17 00:00:00 2001
From: Sean Gorsky
Date: Mon, 19 Sep 2022 21:23:41 -0400
Subject: [PATCH 3/8] fixed new exception case

---
 spark_log_parser/eventlog.py | 2 +-
 tests/test_bad_eventlog.py   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/spark_log_parser/eventlog.py b/spark_log_parser/eventlog.py
index 6e0981f..0685399 100644
--- a/spark_log_parser/eventlog.py
+++ b/spark_log_parser/eventlog.py
@@ -66,7 +66,7 @@ def _get_event_log(self, paths: list[Path]) -> tuple[Path, bool]:
                 continue
 
         if len(log_files) > 1 and parsed:
-            raise ValueError("A parsed log file was submitted with other log files")
+            raise LogSubmissionException("A parsed log file was submitted with other log files")
 
         if rollover_dat:
             if len(log_files) > len(rollover_dat):
diff --git a/tests/test_bad_eventlog.py b/tests/test_bad_eventlog.py
index ce14870..0782977 100644
--- a/tests/test_bad_eventlog.py
+++ b/tests/test_bad_eventlog.py
@@ -44,7 +44,7 @@ def test_missing_first_part(self):
 
     def test_mixed_parsed(self):
         event_log = Path("tests", "logs", "bad", "mixed_parsed.zip").resolve()
-        self.check_value_error(event_log, "A parsed log file was submitted with other log files")
+        self.check_sync_exceptions(event_log, "A parsed log file was submitted with other log files")
 
     def test_only_non_first_part(self):
         with tempfile.TemporaryDirectory() as temp_dir:

From fca54597d969af644183dec0717e05a850ebd158 Mon Sep 17 00:00:00 2001
From: Sean Gorsky
Date: Mon, 19 Sep 2022 21:37:13 -0400
Subject: [PATCH 4/8] make tidy

---
 spark_log_parser/__init__.py | 2 +-
 spark_log_parser/eventlog.py | 4 ++--
 tests/test_bad_eventlog.py   | 8 +++++---
 3 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/spark_log_parser/__init__.py b/spark_log_parser/__init__.py
index 884acc0..6c0f8b6 100644
--- a/spark_log_parser/__init__.py
+++ b/spark_log_parser/__init__.py
@@ -1,2 +1,2 @@
 """Tools for providing Spark event log"""
-__version__ = "0.1.6"
\ No newline at end of file
+__version__ = "0.1.6"
diff --git a/spark_log_parser/eventlog.py b/spark_log_parser/eventlog.py
index 0685399..15e6653 100644
--- a/spark_log_parser/eventlog.py
+++ b/spark_log_parser/eventlog.py
@@ -71,7 +71,8 @@ def _get_event_log(self, paths: list[Path]) -> tuple[Path, bool]:
         if rollover_dat:
             if len(log_files) > len(rollover_dat):
                 raise LogSubmissionException(
-                    error_message="Rollover logs were detected, but not all files had rollover properties")
+                    error_message="Rollover logs were detected, but not all files had rollover properties"
+                )
 
             return self._concat(rollover_dat), False
 
@@ -80,7 +81,6 @@ def _get_event_log(self, paths: list[Path]) -> tuple[Path, bool]:
                 error_message="Multiple files detected without log rollover properties"
             )
 
-
         return log_files[0], parsed
 
     def _concat(self, rollover_dat: list[tuple[str, str, str]]) -> Path:
diff --git a/tests/test_bad_eventlog.py b/tests/test_bad_eventlog.py
index 0782977..81fdb40 100644
--- a/tests/test_bad_eventlog.py
+++ b/tests/test_bad_eventlog.py
@@ -27,8 +27,8 @@ def test_multiple_context_ids(self):
     def test_missing_dbc_event(self):
         event_log = Path("tests", "logs", "bad", "missing-dbc-event.zip").resolve()
         self.check_sync_exceptions(
-            event_log, "Rollover logs were detected, but not all files had rollover properties")
-
+            event_log, "Rollover logs were detected, but not all files had rollover properties"
+        )
 
     def test_duplicate_log_part(self):
         event_log = Path("tests", "logs", "bad", "duplicate-part.tgz").resolve()
@@ -44,7 +44,9 @@ def test_missing_first_part(self):
 
     def test_mixed_parsed(self):
         event_log = Path("tests", "logs", "bad", "mixed_parsed.zip").resolve()
-        self.check_sync_exceptions(event_log, "A parsed log file was submitted with other log files")
+        self.check_sync_exceptions(
+            event_log, "A parsed log file was submitted with other log files"
+        )
 
     def test_only_non_first_part(self):
         with tempfile.TemporaryDirectory() as temp_dir:

From e2313c76d4521aaf5cca9777c79d9744b27ac289 Mon Sep 17 00:00:00 2001
From: Sean Gorsky
Date: Mon, 19 Sep 2022 21:40:40 -0400
Subject: [PATCH 5/8] align SyncParserException with SyncException

---
 spark_log_parser/parsing_models/exceptions.py | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/spark_log_parser/parsing_models/exceptions.py b/spark_log_parser/parsing_models/exceptions.py
index 78f5992..ad74f43 100644
--- a/spark_log_parser/parsing_models/exceptions.py
+++ b/spark_log_parser/parsing_models/exceptions.py
@@ -40,6 +40,24 @@ def get_ui_return_value_as_json(self) -> json:
         """
         return json.dumps({"error": self.error_type, "message": self.error_message})
 
+    def get_exception_response(self) -> dict:
+        """
+        Used to get an example response, which will be used for
+        the swagger documentation.
+
+        Returns:
+            dict: error response
+        """
+        # Convert into a pydantic model in future iterations
+        # once flask is stripped away
+        return {
+            "error": {
+                "type": self.error_type,
+                "code": self.status_code,
+                "message": self.error_message,
+            }
+        }
+
 
 class ConfigurationException(SyncParserException):
     def __init__(self, config_recs: str):

From 2cbadfddec473c91b84ce6e467081bd802da0f98 Mon Sep 17 00:00:00 2001
From: Sean Gorsky
Date: Mon, 19 Sep 2022 22:06:23 -0400
Subject: [PATCH 6/8] Kept version at 0.1.5

---
 spark_log_parser/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/spark_log_parser/__init__.py b/spark_log_parser/__init__.py
index 6c0f8b6..075fd67 100644
--- a/spark_log_parser/__init__.py
+++ b/spark_log_parser/__init__.py
@@ -1,2 +1,2 @@
 """Tools for providing Spark event log"""
-__version__ = "0.1.6"
+__version__ = "0.1.5"

From bf78bdeaf92784fa15f9a168de4c8e57b3633fd6 Mon Sep 17 00:00:00 2001
From: Sean Gorsky
Date: Tue, 20 Sep 2022 13:23:03 -0400
Subject: [PATCH 7/8] More cleanup on exceptions

---
 spark_log_parser/parsing_models/exceptions.py | 9 ---------
 1 file changed, 9 deletions(-)

diff --git a/spark_log_parser/parsing_models/exceptions.py b/spark_log_parser/parsing_models/exceptions.py
index ad74f43..1c84cc9 100644
--- a/spark_log_parser/parsing_models/exceptions.py
+++ b/spark_log_parser/parsing_models/exceptions.py
@@ -12,7 +12,6 @@ def __init__(
         error_type: str = None,
         error_message: str = None,
         status_code: int = None,
-        exception: Exception = None,
     ):
 
         super().__init__(error_message)
@@ -24,10 +23,6 @@ def __init__(
         # 2022-03-25 RW: Format the information in the way it is expected by the backend code
         self.error = {"error": error_type, "message": error_message}
 
-        logger.error(error_message)
-        if exception:
-            logger.exception(exception)
-
     def get_ui_return_value(self) -> dict:
         """
         A possible rendering of one set of return information as dict
@@ -74,7 +69,6 @@ def __init__(self, config_recs: str):
             error_type=ParserErrorTypes.SPARK_CONFIG_ERROR,
             error_message=error_message,
             status_code=ParserErrorCodes.SPARK_CONFIG_ERROR,
-            exception=None,
         )
 
 
@@ -95,7 +89,6 @@ def __init__(self, error_message: str):
             error_type=ParserErrorTypes.MISSING_EVENT_ERROR,
             error_message=error_message,
             status_code=ParserErrorCodes.SPARK_EVENT_ERROR,
-            exception=None,
         )
 
 
@@ -116,7 +109,6 @@ def __init__(self, missing_event: str = ""):
             error_type=ParserErrorTypes.MISSING_EVENT_ERROR,
             error_message=error_message,
             status_code=ParserErrorCodes.SPARK_EVENT_ERROR,
-            exception=None,
         )
 
 
@@ -131,5 +123,4 @@ def __init__(self, error_message: str):
             error_type=ParserErrorTypes.LOG_SUBMISSION_ERROR,
             error_message=error_message,
             status_code=ParserErrorCodes.LOG_SUBMISSION_ERROR,
-            exception=None,
        )

From ea2a86620157e7d37ad4053e436f40b893d02936 Mon Sep 17 00:00:00 2001
From: Sean Gorsky
Date: Thu, 22 Sep 2022 09:42:39 -0400
Subject: [PATCH 8/8] Small PR changes

---
 spark_log_parser/parsing_models/exceptions.py | 20 +------------------
 1 file changed, 1 insertion(+), 19 deletions(-)

diff --git a/spark_log_parser/parsing_models/exceptions.py b/spark_log_parser/parsing_models/exceptions.py
index 1c84cc9..1320b2a 100644
--- a/spark_log_parser/parsing_models/exceptions.py
+++ b/spark_log_parser/parsing_models/exceptions.py
@@ -35,24 +35,6 @@ def get_ui_return_value_as_json(self) -> json:
         """
         return json.dumps({"error": self.error_type, "message": self.error_message})
 
-    def get_exception_response(self) -> dict:
-        """
-        Used to get an example response, which will be used for
-        the swagger documentation.
-
-        Returns:
-            dict: error response
-        """
-        # Convert into a pydantic model in future iterations
-        # once flask is stripped away
-        return {
-            "error": {
-                "type": self.error_type,
-                "code": self.status_code,
-                "message": self.error_message,
-            }
-        }
-
 
 class ConfigurationException(SyncParserException):
     def __init__(self, config_recs: str):
@@ -112,7 +94,7 @@ def __init__(self, missing_event: str = ""):
         )
 
 
-class LogSubmissionException(SyncParserException):
+class LogSubmissionException(SyncParserException, ValueError):
     """
     This Exception is for malformed log submission
     """
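
Usage note (not part of the patches): a minimal sketch of how a caller might consume the exception API this series introduces. The parse_submission helper and its URI argument are hypothetical, invented for illustration; Extractor, EventLogBuilder, LogSubmissionException, and the error_type / error_message / status_code / get_ui_return_value() members all come from the code above.

    import tempfile

    from spark_log_parser import eventlog, extractor
    from spark_log_parser.parsing_models.exceptions import LogSubmissionException


    def parse_submission(event_log_uri: str) -> None:
        # Hypothetical driver mirroring the flow in tests/test_bad_eventlog.py:
        # extract the uploaded archive, then build one event log from its contents.
        with tempfile.TemporaryDirectory() as temp_dir:
            try:
                paths = extractor.Extractor(event_log_uri, temp_dir).extract()
                eventlog.EventLogBuilder(paths, temp_dir).build()
            except LogSubmissionException as exc:
                # Structured payload added by PATCH 1/8, e.g.
                # {"error": "Invalid log submission", "message": "..."} with status_code 2003.
                print(exc.get_ui_return_value())
                print(exc.status_code)
                raise

Because PATCH 8/8 makes LogSubmissionException subclass ValueError as well, a pre-existing "except ValueError:" at any call site still catches these errors, which is what lets them pass through to the user without breaking older handlers.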