Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions spark_log_parser/eventlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import pandas as pd

from spark_log_parser.parsing_models.exceptions import LogSubmissionException


class EventLogBuilder:
def __init__(
Expand All @@ -27,7 +29,9 @@ def _validate_work_dir(self, work_dir: Path | str) -> Path:
def build(self) -> tuple[Path, bool]:

if not self.event_log_paths:
raise ValueError("No files found")
raise LogSubmissionException(
error_message="No Spark eventlogs were found in submission"
)

self.event_log, self.parsed = self._get_event_log(self.event_log_paths)

Expand Down Expand Up @@ -62,18 +66,20 @@ def _get_event_log(self, paths: list[Path]) -> tuple[Path, bool]:
continue

if len(log_files) > 1 and parsed:
raise ValueError("A parsed log file was submitted with other log files")
raise LogSubmissionException("A parsed log file was submitted with other log files")

if rollover_dat:
if len(log_files) > len(rollover_dat):
raise ValueError(
"Rollover logs were detected, but not all files had rollover properties"
raise LogSubmissionException(
error_message="Rollover logs were detected, but not all files had rollover properties"
)

return self._concat(rollover_dat), False

if len(log_files) > 1:
raise ValueError("Multiple files detected without log rollover properties")
raise LogSubmissionException(
error_message="Multiple files detected without log rollover properties"
)

return log_files[0], parsed

Expand All @@ -85,15 +91,17 @@ def _concat(self, rollover_dat: list[tuple[str, str, str]]) -> Path:
)

if not len(rollover_df.context_id.unique()) == 1:
raise ValueError("Not all rollover log files have the same Spark context ID")
raise LogSubmissionException(
error_message="Not all rollover log files have the same Spark context ID"
)

diffs = rollover_df.rollover_index.diff()

if any(diffs > 1) or rollover_df.rollover_index[0] > 0:
raise ValueError("Rollover log file appears to be missing")
raise LogSubmissionException(error_message="One or more rollover logs is missing")

if any(diffs < 1):
raise ValueError("Duplicate rollover log file detected")
raise LogSubmissionException(error_message="Duplicate rollover log file detected")

event_log = Path(tempfile.mkstemp(suffix="-concatenated.json", dir=str(self.work_dir))[1])
with open(event_log, "w") as fobj:
Expand Down
64 changes: 35 additions & 29 deletions spark_log_parser/parsing_models/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,52 +2,58 @@

logger = logging.getLogger("ParserExceptionLogger")

class ParserErrorMessages():

SPARK_CONFIG_GENERIC_MESSAGE = (
class ParserErrorMessages:

"Some configurations were detected that are not yet supported by the Sync Autotuner. " +
"If you would like to try again, please make the following configuration changes and " +
"rerun your application:"
SPARK_CONFIG_GENERIC_MESSAGE = (
"Some configurations were detected that are not yet supported by the Sync Autotuner. "
+ "If you would like to try again, please make the following configuration changes and "
+ "rerun your application:"
)

MISSING_EVENT_JOB_START = (
"Event SparkListenerJobStart was missing for the following jobs: ")
MISSING_EVENT_JOB_END = (
"Event SparkListenerJobEnd was missing for the following jobs: ")
MISSING_EVENT_JOB_START = "Event SparkListenerJobStart was missing for the following jobs: "
MISSING_EVENT_JOB_END = "Event SparkListenerJobEnd was missing for the following jobs: "

MISSING_EVENT_STAGE_SUBMIT = (
"Event SparkListenerStageSubmitted was missing for the following stages: ")
"Event SparkListenerStageSubmitted was missing for the following stages: "
)
MISSING_EVENT_STAGE_COMPLETE = (
"Event SparkListenerStageCompleted was missing for the following stages: ")
"Event SparkListenerStageCompleted was missing for the following stages: "
)

MISSING_EVENT_GENERIC_MESSAGE = (
"Some Spark Listener Event data is missing from the eventlog related to: ")
"Some Spark Listener Event data is missing from the eventlog related to: "
)

MISSING_EVENT_EXPLANATION = (
"This Event data is necessary for the Sync Autotuner. " +
"Events may be missing for a number of reasons including " +
"-- (1) The application did not complete successfully " +
"-- (2) If these are rollover logs, there may be one or more missing logs " +
"-- (3) Spark Listener communication failures. " +
"There are some steps that can help mitigate these issues " +
"-- (1) Ensure the SparkContext closes correctly at the end of your application, e.g. " +
"by using sc.stop() " +
"-- (2) If you submitted a rollover log set, ensure that all rollover logs for the " +
"application were submitted " +
"-- (3) Resubmit the next log produced with this application."
"This Event data is necessary for the Sync Autotuner. "
+ "Events may be missing for a number of reasons including "
+ "-- (1) The application did not complete successfully "
+ "-- (2) If these are rollover logs, there may be one or more missing logs "
+ "-- (3) Spark Listener communication failures. "
+ "There are some steps that can help mitigate these issues "
+ "-- (1) Ensure the SparkContext closes correctly at the end of your application, e.g. "
+ "by using sc.stop() "
+ "-- (2) If you submitted a rollover log set, ensure that all rollover logs for the "
+ "application were submitted "
+ "-- (3) Resubmit the next log produced with this application."
)

SUPPORT_MESSAGE = (
"If you have questions or would like assistance in resolving the issue " +
"please contact our support at support@synccomputing.com.")
"If you have questions or would like assistance in resolving the issue "
+ "please contact our support at support@synccomputing.com."
)

class ParserErrorTypes():

class ParserErrorTypes:

SPARK_CONFIG_ERROR = "Invalid Spark Configuration Error"
MISSING_EVENT_ERROR = 'Event missing from Spark eventlog'
MISSING_EVENT_ERROR = "Event missing from Spark eventlog"
LOG_SUBMISSION_ERROR = "Invalid log submission"


class ParserErrorCodes():
class ParserErrorCodes:

SPARK_CONFIG_ERROR = 2001
SPARK_EVENT_ERROR = 2002
SPARK_EVENT_ERROR = 2002
LOG_SUBMISSION_ERROR = 2003
79 changes: 42 additions & 37 deletions spark_log_parser/parsing_models/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import json
import logging

from .errors import ParserErrorMessages, ParserErrorTypes, ParserErrorCodes
from .errors import ParserErrorCodes, ParserErrorMessages, ParserErrorTypes

logger = logging.getLogger("ParserExceptionLogger")


class SyncParserException(Exception):
def __init__(
self,
error_type: str = None,
error_message: str = None,
status_code: int = None,
exception: Exception = None ):
self,
error_type: str = None,
error_message: str = None,
status_code: int = None,
):

super().__init__(error_message)

Expand All @@ -20,84 +21,88 @@ def __init__(
self.status_code = status_code

# 2022-03-25 RW: Format the information in the way it is expected by the backend code
self.error = {
"error" : error_type,
"message" : error_message
}

logger.error(error_message)
if exception:
logger.exception(exception)

self.error = {"error": error_type, "message": error_message}

def get_ui_return_value(self) -> dict:
"""
A possible rendering of one set of return information as dict
"""
return {
"error": self.error_type,
"message": self.error_message
}
return {"error": self.error_type, "message": self.error_message}

def get_ui_return_value_as_json(self) -> json:
"""
A possible rendering of one set of return information as JSON
"""
return json.dumps( {
"error" : self.error_type,
"message" : self.error_message
} )
return json.dumps({"error": self.error_type, "message": self.error_message})

class ConfigurationException(SyncParserException):

class ConfigurationException(SyncParserException):
def __init__(self, config_recs: str):


error_message = ParserErrorMessages.SPARK_CONFIG_GENERIC_MESSAGE

for idx, c in enumerate(config_recs):
count = idx+1
error_message += f' ({count}) {c}'
count = idx + 1
error_message += f" ({count}) {c}"

error_message += f'. {ParserErrorMessages.SUPPORT_MESSAGE}'
error_message += f". {ParserErrorMessages.SUPPORT_MESSAGE}"

super().__init__(
error_type=ParserErrorTypes.SPARK_CONFIG_ERROR,
error_message=error_message,
status_code=ParserErrorCodes.SPARK_CONFIG_ERROR,
exception=None)
)


class LazyEventValidationException(SyncParserException):
"""
This Exception is for missing event data that doesn't immediately kill the parser.
All of the related missing events can be gathered and identified in the error message.
"""

def __init__(self, error_message: str):

error_message += (
f"{ParserErrorMessages.MISSING_EVENT_EXPLANATION} " +
f"{ParserErrorMessages.SUPPORT_MESSAGE}")
f"{ParserErrorMessages.MISSING_EVENT_EXPLANATION} "
+ f"{ParserErrorMessages.SUPPORT_MESSAGE}"
)

super().__init__(
error_type=ParserErrorTypes.MISSING_EVENT_ERROR,
error_message=error_message,
status_code=ParserErrorCodes.SPARK_EVENT_ERROR,
exception=None)
)


class UrgentEventValidationException(SyncParserException):
"""
This Exception is for missing event data that stops the parser dead in its tracks.
"""
def __init__(self, missing_event: str = ''):

def __init__(self, missing_event: str = ""):

error_message = (
f"{ParserErrorMessages.MISSING_EVENT_GENERIC_MESSAGE} '{missing_event}'. " +
f"{ParserErrorMessages.MISSING_EVENT_EXPLANATION} " +
f"{ParserErrorMessages.SUPPORT_MESSAGE}")
f"{ParserErrorMessages.MISSING_EVENT_GENERIC_MESSAGE} '{missing_event}'. "
+ f"{ParserErrorMessages.MISSING_EVENT_EXPLANATION} "
+ f"{ParserErrorMessages.SUPPORT_MESSAGE}"
)

super().__init__(
error_type=ParserErrorTypes.MISSING_EVENT_ERROR,
error_message=error_message,
status_code=ParserErrorCodes.SPARK_EVENT_ERROR,
exception=None)
)


class LogSubmissionException(SyncParserException, ValueError):
"""
This Exception is for malformed log submission
"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


def __init__(self, error_message: str):

super().__init__(
error_type=ParserErrorTypes.LOG_SUBMISSION_ERROR,
error_message=error_message,
status_code=ParserErrorCodes.LOG_SUBMISSION_ERROR,
)
25 changes: 15 additions & 10 deletions tests/test_bad_eventlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
from zipfile import ZipFile

from spark_log_parser import eventlog, extractor
from spark_log_parser.parsing_models.exceptions import LogSubmissionException


class BadEventLog(unittest.TestCase):
def check_value_error(self, event_log_path, msg):
def check_sync_exceptions(self, event_log_path, msg):

with tempfile.TemporaryDirectory() as temp_dir:
with self.assertRaises(ValueError) as cm:
with self.assertRaises(LogSubmissionException) as cm:

event_log_paths = extractor.Extractor(event_log_path.as_uri(), temp_dir).extract()
eventlog.EventLogBuilder(event_log_paths, temp_dir).build()
Expand All @@ -19,31 +20,33 @@ def check_value_error(self, event_log_path, msg):

def test_multiple_context_ids(self):
event_log = Path("tests", "logs", "bad", "non-unique-context-id.zip").resolve()
self.check_value_error(
self.check_sync_exceptions(
event_log, "Not all rollover log files have the same Spark context ID"
)

def test_missing_dbc_event(self):
event_log = Path("tests", "logs", "bad", "missing-dbc-event.zip").resolve()
self.check_value_error(
self.check_sync_exceptions(
event_log, "Rollover logs were detected, but not all files had rollover properties"
)

def test_duplicate_log_part(self):
event_log = Path("tests", "logs", "bad", "duplicate-part.tgz").resolve()
self.check_value_error(event_log, "Duplicate rollover log file detected")
self.check_sync_exceptions(event_log, "Duplicate rollover log file detected")

def test_missing_log_part(self):
event_log = Path("tests", "logs", "bad", "missing-part.zip").resolve()
self.check_value_error(event_log, "Rollover log file appears to be missing")
self.check_sync_exceptions(event_log, "One or more rollover logs is missing")

def test_missing_first_part(self):
event_log = Path("tests", "logs", "bad", "missing-first-part.zip").resolve()
self.check_value_error(event_log, "Rollover log file appears to be missing")
self.check_sync_exceptions(event_log, "One or more rollover logs is missing")

def test_mixed_parsed(self):
event_log = Path("tests", "logs", "bad", "mixed_parsed.zip").resolve()
self.check_value_error(event_log, "A parsed log file was submitted with other log files")
self.check_sync_exceptions(
event_log, "A parsed log file was submitted with other log files"
)

def test_only_non_first_part(self):
with tempfile.TemporaryDirectory() as temp_dir:
Expand All @@ -52,8 +55,10 @@ def test_only_non_first_part(self):
[zinfo for zinfo in zfile.infolist() if not zinfo.is_dir()][0], temp_dir
)

self.check_value_error(Path(temp_dir), "Rollover log file appears to be missing")
self.check_sync_exceptions(Path(temp_dir), "One or more rollover logs is missing")

def test_empty_log_dir(self):
with tempfile.TemporaryDirectory() as temp_dir:
self.check_value_error(Path(temp_dir), "No files found")
self.check_sync_exceptions(
Path(temp_dir), "No Spark eventlogs were found in submission"
)