diff --git a/spark_log_parser/eventlog.py b/spark_log_parser/eventlog.py
index a354de0..15e6653 100644
--- a/spark_log_parser/eventlog.py
+++ b/spark_log_parser/eventlog.py
@@ -4,6 +4,8 @@
 
 import pandas as pd
 
+from spark_log_parser.parsing_models.exceptions import LogSubmissionException
+
 
 class EventLogBuilder:
     def __init__(
@@ -27,7 +29,9 @@ def _validate_work_dir(self, work_dir: Path | str) -> Path:
 
     def build(self) -> tuple[Path, bool]:
         if not self.event_log_paths:
-            raise ValueError("No files found")
+            raise LogSubmissionException(
+                error_message="No Spark eventlogs were found in the submission"
+            )
 
         self.event_log, self.parsed = self._get_event_log(self.event_log_paths)
 
@@ -62,18 +66,20 @@ def _get_event_log(self, paths: list[Path]) -> tuple[Path, bool]:
                 continue
 
         if len(log_files) > 1 and parsed:
-            raise ValueError("A parsed log file was submitted with other log files")
+            raise LogSubmissionException("A parsed log file was submitted with other log files")
 
         if rollover_dat:
             if len(log_files) > len(rollover_dat):
-                raise ValueError(
-                    "Rollover logs were detected, but not all files had rollover properties"
+                raise LogSubmissionException(
+                    error_message="Rollover logs were detected, but not all files had rollover properties"
                 )
             return self._concat(rollover_dat), False
 
         if len(log_files) > 1:
-            raise ValueError("Multiple files detected without log rollover properties")
+            raise LogSubmissionException(
+                error_message="Multiple files detected without log rollover properties"
+            )
 
         return log_files[0], parsed
 
@@ -85,15 +91,17 @@ def _concat(self, rollover_dat: list[tuple[str, str, str]]) -> Path:
         )
 
         if not len(rollover_df.context_id.unique()) == 1:
-            raise ValueError("Not all rollover log files have the same Spark context ID")
+            raise LogSubmissionException(
+                error_message="Not all rollover log files have the same Spark context ID"
+            )
 
         diffs = rollover_df.rollover_index.diff()
 
         if any(diffs > 1) or rollover_df.rollover_index[0] > 0:
-            raise ValueError("Rollover log file appears to be missing")
+            raise LogSubmissionException(error_message="One or more rollover logs are missing")
 
         if any(diffs < 1):
-            raise ValueError("Duplicate rollover log file detected")
+            raise LogSubmissionException(error_message="Duplicate rollover log file detected")
 
         event_log = Path(tempfile.mkstemp(suffix="-concatenated.json", dir=str(self.work_dir))[1])
         with open(event_log, "w") as fobj:
diff --git a/spark_log_parser/parsing_models/errors.py b/spark_log_parser/parsing_models/errors.py
index e1c2a13..6db558c 100644
--- a/spark_log_parser/parsing_models/errors.py
+++ b/spark_log_parser/parsing_models/errors.py
@@ -2,52 +2,58 @@
 
 logger = logging.getLogger("ParserExceptionLogger")
 
-class ParserErrorMessages():
-    SPARK_CONFIG_GENERIC_MESSAGE = (
-        "Some configurations were detected that are not yet supported by the Sync Autotuner. " +
-        "If you would like to try again, please make the following configuration changes and " +
-        "rerun your application:"
+class ParserErrorMessages:
+    SPARK_CONFIG_GENERIC_MESSAGE = (
+        "Some configurations were detected that are not yet supported by the Sync Autotuner. "
" + + "If you would like to try again, please make the following configuration changes and " + + "rerun your application:" ) - MISSING_EVENT_JOB_START = ( - "Event SparkListenerJobStart was missing for the following jobs: ") - MISSING_EVENT_JOB_END = ( - "Event SparkListenerJobEnd was missing for the following jobs: ") + MISSING_EVENT_JOB_START = "Event SparkListenerJobStart was missing for the following jobs: " + MISSING_EVENT_JOB_END = "Event SparkListenerJobEnd was missing for the following jobs: " MISSING_EVENT_STAGE_SUBMIT = ( - "Event SparkListenerStageSubmitted was missing for the following stages: ") + "Event SparkListenerStageSubmitted was missing for the following stages: " + ) MISSING_EVENT_STAGE_COMPLETE = ( - "Event SparkListenerStageCompleted was missing for the following stages: ") + "Event SparkListenerStageCompleted was missing for the following stages: " + ) MISSING_EVENT_GENERIC_MESSAGE = ( - "Some Spark Listener Event data is missing from the eventlog related to: ") + "Some Spark Listener Event data is missing from the eventlog related to: " + ) MISSING_EVENT_EXPLANATION = ( - "This Event data is necessary for the Sync Autotuner. " + - "Events may be missing for a number of reasons including " + - "-- (1) The application did not complete successfully " + - "-- (2) If these are rollover logs, there may be one or more missing logs " + - "-- (3) Spark Listener communication failures. " + - "There are some steps that can help mitigate these issues " + - "-- (1) Ensure the SparkContext closes correctly at the end of your application, e.g. " + - "by using sc.stop() " + - "-- (2) If you submitted a rollover log set, ensure that all rollover logs for the " + - "application were submitted " + - "-- (3) Resubmit the next log produced with this application." + "This Event data is necessary for the Sync Autotuner. " + + "Events may be missing for a number of reasons including " + + "-- (1) The application did not complete successfully " + + "-- (2) If these are rollover logs, there may be one or more missing logs " + + "-- (3) Spark Listener communication failures. " + + "There are some steps that can help mitigate these issues " + + "-- (1) Ensure the SparkContext closes correctly at the end of your application, e.g. " + + "by using sc.stop() " + + "-- (2) If you submitted a rollover log set, ensure that all rollover logs for the " + + "application were submitted " + + "-- (3) Resubmit the next log produced with this application." ) SUPPORT_MESSAGE = ( - "If you have questions or would like assistance in resolving the issue " + - "please contact our support at support@synccomputing.com.") + "If you have questions or would like assistance in resolving the issue " + + "please contact our support at support@synccomputing.com." 
+    )
 
-class ParserErrorTypes():
+
+class ParserErrorTypes:
     SPARK_CONFIG_ERROR = "Invalid Spark Configuration Error"
-    MISSING_EVENT_ERROR = 'Event missing from Spark eventlog'
+    MISSING_EVENT_ERROR = "Event missing from Spark eventlog"
+    LOG_SUBMISSION_ERROR = "Invalid log submission"
+
 
-class ParserErrorCodes():
+class ParserErrorCodes:
     SPARK_CONFIG_ERROR = 2001
-    SPARK_EVENT_ERROR = 2002
\ No newline at end of file
+    SPARK_EVENT_ERROR = 2002
+    LOG_SUBMISSION_ERROR = 2003
diff --git a/spark_log_parser/parsing_models/exceptions.py b/spark_log_parser/parsing_models/exceptions.py
index f68f989..1320b2a 100644
--- a/spark_log_parser/parsing_models/exceptions.py
+++ b/spark_log_parser/parsing_models/exceptions.py
@@ -1,17 +1,18 @@
 import json
 import logging
 
-from .errors import ParserErrorMessages, ParserErrorTypes, ParserErrorCodes
+from .errors import ParserErrorCodes, ParserErrorMessages, ParserErrorTypes
 
 logger = logging.getLogger("ParserExceptionLogger")
 
+
 class SyncParserException(Exception):
     def __init__(
-            self,
-            error_type: str = None,
-            error_message: str = None,
-            status_code: int = None,
-            exception: Exception = None ):
+        self,
+        error_type: str = None,
+        error_message: str = None,
+        status_code: int = None,
+    ):
 
         super().__init__(error_message)
 
@@ -20,84 +21,88 @@ def __init__(
         self.status_code = status_code
 
         # 2022-03-25 RW: Format the information in the way it is expected by the backend code
-        self.error = {
-            "error" : error_type,
-            "message" : error_message
-        }
-
-        logger.error(error_message)
-        if exception:
-            logger.exception(exception)
-
+        self.error = {"error": error_type, "message": error_message}
 
     def get_ui_return_value(self) -> dict:
         """
         A possible rendering of one set of return information as dict
         """
-        return {
-            "error": self.error_type,
-            "message": self.error_message
-        }
+        return {"error": self.error_type, "message": self.error_message}
 
     def get_ui_return_value_as_json(self) -> json:
         """
         A possible rendering of one set of return information as JSON
         """
-        return json.dumps( {
-            "error" : self.error_type,
-            "message" : self.error_message
-        } )
+        return json.dumps({"error": self.error_type, "message": self.error_message})
 
 
-class ConfigurationException(SyncParserException):
+class ConfigurationException(SyncParserException):
     def __init__(self, config_recs: str):
-
         error_message = ParserErrorMessages.SPARK_CONFIG_GENERIC_MESSAGE
 
         for idx, c in enumerate(config_recs):
-            count = idx+1
-            error_message += f' ({count}) {c}'
+            count = idx + 1
+            error_message += f" ({count}) {c}"
 
-        error_message += f'. {ParserErrorMessages.SUPPORT_MESSAGE}'
+        error_message += f". {ParserErrorMessages.SUPPORT_MESSAGE}"
 
         super().__init__(
             error_type=ParserErrorTypes.SPARK_CONFIG_ERROR,
             error_message=error_message,
             status_code=ParserErrorCodes.SPARK_CONFIG_ERROR,
-            exception=None)
+        )
+
 
 class LazyEventValidationException(SyncParserException):
     """
     This Exception is for missing event data that doesn't immediately kill the parser.
     All of the related missing events can be gathered and identified in the error message.
""" + def __init__(self, error_message: str): error_message += ( - f"{ParserErrorMessages.MISSING_EVENT_EXPLANATION} " + - f"{ParserErrorMessages.SUPPORT_MESSAGE}") + f"{ParserErrorMessages.MISSING_EVENT_EXPLANATION} " + + f"{ParserErrorMessages.SUPPORT_MESSAGE}" + ) super().__init__( error_type=ParserErrorTypes.MISSING_EVENT_ERROR, error_message=error_message, status_code=ParserErrorCodes.SPARK_EVENT_ERROR, - exception=None) + ) class UrgentEventValidationException(SyncParserException): """ This Exception is for missing event data that stops the parser dead in its tracks. """ - def __init__(self, missing_event: str = ''): + + def __init__(self, missing_event: str = ""): error_message = ( - f"{ParserErrorMessages.MISSING_EVENT_GENERIC_MESSAGE} '{missing_event}'. " + - f"{ParserErrorMessages.MISSING_EVENT_EXPLANATION} " + - f"{ParserErrorMessages.SUPPORT_MESSAGE}") + f"{ParserErrorMessages.MISSING_EVENT_GENERIC_MESSAGE} '{missing_event}'. " + + f"{ParserErrorMessages.MISSING_EVENT_EXPLANATION} " + + f"{ParserErrorMessages.SUPPORT_MESSAGE}" + ) super().__init__( error_type=ParserErrorTypes.MISSING_EVENT_ERROR, error_message=error_message, status_code=ParserErrorCodes.SPARK_EVENT_ERROR, - exception=None) + ) + + +class LogSubmissionException(SyncParserException, ValueError): + """ + This Exception is for malformed log submission + """ + + def __init__(self, error_message: str): + + super().__init__( + error_type=ParserErrorTypes.LOG_SUBMISSION_ERROR, + error_message=error_message, + status_code=ParserErrorCodes.LOG_SUBMISSION_ERROR, + ) diff --git a/tests/test_bad_eventlog.py b/tests/test_bad_eventlog.py index 61d098c..81fdb40 100644 --- a/tests/test_bad_eventlog.py +++ b/tests/test_bad_eventlog.py @@ -4,13 +4,14 @@ from zipfile import ZipFile from spark_log_parser import eventlog, extractor +from spark_log_parser.parsing_models.exceptions import LogSubmissionException class BadEventLog(unittest.TestCase): - def check_value_error(self, event_log_path, msg): + def check_sync_exceptions(self, event_log_path, msg): with tempfile.TemporaryDirectory() as temp_dir: - with self.assertRaises(ValueError) as cm: + with self.assertRaises(LogSubmissionException) as cm: event_log_paths = extractor.Extractor(event_log_path.as_uri(), temp_dir).extract() eventlog.EventLogBuilder(event_log_paths, temp_dir).build() @@ -19,31 +20,33 @@ def check_value_error(self, event_log_path, msg): def test_multiple_context_ids(self): event_log = Path("tests", "logs", "bad", "non-unique-context-id.zip").resolve() - self.check_value_error( + self.check_sync_exceptions( event_log, "Not all rollover log files have the same Spark context ID" ) def test_missing_dbc_event(self): event_log = Path("tests", "logs", "bad", "missing-dbc-event.zip").resolve() - self.check_value_error( + self.check_sync_exceptions( event_log, "Rollover logs were detected, but not all files had rollover properties" ) def test_duplicate_log_part(self): event_log = Path("tests", "logs", "bad", "duplicate-part.tgz").resolve() - self.check_value_error(event_log, "Duplicate rollover log file detected") + self.check_sync_exceptions(event_log, "Duplicate rollover log file detected") def test_missing_log_part(self): event_log = Path("tests", "logs", "bad", "missing-part.zip").resolve() - self.check_value_error(event_log, "Rollover log file appears to be missing") + self.check_sync_exceptions(event_log, "One or more rollover logs is missing") def test_missing_first_part(self): event_log = Path("tests", "logs", "bad", "missing-first-part.zip").resolve() - 
-        self.check_value_error(event_log, "Rollover log file appears to be missing")
+        self.check_sync_exceptions(event_log, "One or more rollover logs are missing")
 
     def test_mixed_parsed(self):
         event_log = Path("tests", "logs", "bad", "mixed_parsed.zip").resolve()
-        self.check_value_error(event_log, "A parsed log file was submitted with other log files")
+        self.check_sync_exceptions(
+            event_log, "A parsed log file was submitted with other log files"
+        )
 
     def test_only_non_first_part(self):
         with tempfile.TemporaryDirectory() as temp_dir:
@@ -52,8 +55,10 @@ def test_only_non_first_part(self):
                 [zinfo for zinfo in zfile.infolist() if not zinfo.is_dir()][0], temp_dir
             )
 
-            self.check_value_error(Path(temp_dir), "Rollover log file appears to be missing")
+            self.check_sync_exceptions(Path(temp_dir), "One or more rollover logs are missing")
 
     def test_empty_log_dir(self):
         with tempfile.TemporaryDirectory() as temp_dir:
-            self.check_value_error(Path(temp_dir), "No files found")
+            self.check_sync_exceptions(
+                Path(temp_dir), "No Spark eventlogs were found in the submission"
+            )
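Since `LogSubmissionException` subclasses both `SyncParserException` and `ValueError`, callers that predate this change and still catch `ValueError` keep working, while newer callers can read the structured payload that `SyncParserException` builds. A minimal sketch of how a caller might consume the new exception (the `handle_submission` helper and `submission_uri` argument are illustrative, not part of this diff; the extract-then-build flow mirrors tests/test_bad_eventlog.py):

```python
import tempfile

from spark_log_parser import eventlog, extractor
from spark_log_parser.parsing_models.exceptions import LogSubmissionException


def handle_submission(submission_uri: str) -> None:
    # Hypothetical driver: extract the submitted archive, then build the eventlog.
    with tempfile.TemporaryDirectory() as temp_dir:
        try:
            paths = extractor.Extractor(submission_uri, temp_dir).extract()
            eventlog.EventLogBuilder(paths, temp_dir).build()
        except LogSubmissionException as exc:
            # SyncParserException pre-formats the payload the backend expects.
            print(exc.error)        # {"error": "Invalid log submission", "message": "..."}
            print(exc.status_code)  # 2003 (ParserErrorCodes.LOG_SUBMISSION_ERROR)
```

Because the class also inherits from `ValueError`, any existing `except ValueError:` block continues to catch these failures, so the move away from bare `ValueError` is backward compatible for callers outside this repository.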