diff --git a/spark_log_parser/__init__.py b/spark_log_parser/__init__.py
index a49b965..e7a49be 100644
--- a/spark_log_parser/__init__.py
+++ b/spark_log_parser/__init__.py
@@ -1,3 +1,3 @@
 """Tools for providing Spark event log"""
 
-__version__ = "0.1.2"
+__version__ = "0.1.3"
diff --git a/spark_log_parser/cli.py b/spark_log_parser/cli.py
index 7df35e9..1bd3748 100644
--- a/spark_log_parser/cli.py
+++ b/spark_log_parser/cli.py
@@ -10,7 +10,7 @@
 logging.captureWarnings(True)
 
 from spark_log_parser.eventlog import EventLogBuilder  # noqa: E402
-from spark_log_parser.extractor import Extractor  # noqa: E402
+from spark_log_parser.extractor import Extractor, ExtractThresholds  # noqa: E402
 from spark_log_parser.parsing_models.application_model_v2 import sparkApplication  # noqa: E402
 
 logger = logging.getLogger("spark_log_parser")
@@ -42,8 +42,10 @@ def main():
 
 
     with tempfile.TemporaryDirectory() as work_dir:
-        log_path = unquote(args.log_file.resolve().as_uri())
-        event_log_paths = Extractor(log_path, work_dir).extract()
+        event_log_paths = Extractor(
+            unquote(args.log_file.resolve().as_uri()), work_dir, thresholds=ExtractThresholds(size=20000000000)
+        ).extract()
+
         event_log = EventLogBuilder(event_log_paths, work_dir).build()
 
         app = sparkApplication(eventlog=str(event_log))
diff --git a/spark_log_parser/eventlog.py b/spark_log_parser/eventlog.py
index 47fd867..0be9ae7 100644
--- a/spark_log_parser/eventlog.py
+++ b/spark_log_parser/eventlog.py
@@ -15,6 +15,7 @@ def __init__(
         self.event_log_paths = self._validate_event_log_paths(event_log_paths)
         self.work_dir = self._validate_work_dir(work_dir)
 
+
     def _validate_event_log_paths(self, event_log_paths: list[Path] | list[str]) -> list[Path]:
         return [Path(x) for x in event_log_paths]
 
diff --git a/spark_log_parser/parsing_models/application_model.py b/spark_log_parser/parsing_models/application_model.py
index 3e162f6..36dedf8 100644
--- a/spark_log_parser/parsing_models/application_model.py
+++ b/spark_log_parser/parsing_models/application_model.py
@@ -143,6 +143,12 @@ def __init__(self, eventlogpath, bucket=None, stdoutpath=None, debug=False):
                     # missing_event=f"Job Start for Stage {stage_id}"
                 )
 
+            if "Submission Time" not in json_data["Stage Info"]:
+                # PROD-426 Submission Time key may be missing from stages that
+                # don't get submitted. There is usually a StageCompleted event
+                # shortly after.
+                continue
+
             for job_id in self.jobs_for_stage[stage_id]:
                 self.jobs[job_id].stages[stage_id].submission_time = (
                     json_data["Stage Info"]["Submission Time"] / 1000
diff --git a/tests/test_eventlog.py b/tests/test_eventlog.py
index 320e3b1..90de8cf 100644
--- a/tests/test_eventlog.py
+++ b/tests/test_eventlog.py
@@ -23,8 +23,8 @@ def test_simple_emr_log():
     event_log_path = Path("tests", "logs", "emr.zip").resolve()
     event = get_event(event_log_path)
 
-    assert all(key in event for key in ["Event", "Spark Version"]), "All keys are present"
-    assert event["Event"] == "SparkListenerLogStart", "Expected first event is present"
+    assert all(key in event for key in ["Event", "Spark Version"]), "Not all keys are present"
+    assert event["Event"] == "SparkListenerLogStart", "First event is not as expected"
 
 
 def test_simple_databricks_log():
@@ -33,7 +33,7 @@ def test_simple_databricks_log():
     assert all(
         key in event
         for key in ["Event", "Spark Version", "Timestamp", "Rollover Number", "SparkContext Id"]
-    ), "All keys are present"
+    ), "Not all keys are present"
 
 
 def test_raw_databricks_log():
@@ -43,9 +43,10 @@ def test_raw_databricks_log():
     assert all(
         key in event
         for key in ["Event", "Spark Version", "Timestamp", "Rollover Number", "SparkContext Id"]
-    ), "All keys are present"
+    ), "Not all keys are present"
+
+    assert event["Event"] == "DBCEventLoggingListenerMetadata", "First event is not as expected"
 
-    assert event["Event"] == "DBCEventLoggingListenerMetadata", "Expected first event is present"
 
 
 def test_log_in_dir():
@@ -55,9 +56,9 @@
     assert all(
         key in event
         for key in ["Event", "Spark Version", "Timestamp", "Rollover Number", "SparkContext Id"]
-    ), "All keys are present"
+    ), "Not all keys are present"
 
-    assert event["Event"] == "DBCEventLoggingListenerMetadata", "Expected first event is present"
+    assert event["Event"] == "DBCEventLoggingListenerMetadata", "First event is not as expected"
 
 
 class RolloverLog(unittest.TestCase):
@@ -100,11 +101,11 @@ def validate_log(self, event_log_path: Path, log_file_total: int, log_entry_tota
                         "Rollover Number",
                         "SparkContext Id",
                     ]
-                ), "All keys are present"
+                ), "Not all keys are present"
 
                 assert (
                     rollover_count == event["Rollover Number"]
-                ), "Contiguous monotonically increasing IDs"
+                ), "Rollover IDs are not contiguous and monotonically increasing"
                 rollover_count += 1
-        assert rollover_count == log_file_total, "All log parts are present"
-        assert i + 1 == log_entry_total, "All events are present"
+        assert rollover_count == log_file_total, "Not all log parts are present"
+        assert i + 1 == log_entry_total, "Not all events are present"
diff --git a/tests/test_eventlog_s3.py b/tests/test_eventlog_s3.py
index 58ef4ac..692ce6a 100644
--- a/tests/test_eventlog_s3.py
+++ b/tests/test_eventlog_s3.py
@@ -71,9 +71,8 @@ def test_emr_log_from_s3(event_log_url, event_log_file_archive, event_log_s3_dir
     with open(event_log) as log_fobj:
         event = json.loads(log_fobj.readline())
 
-    assert all(key in event for key in ["Event", "Spark Version"]), "All keys are present"
-
-    assert event["Event"] == "SparkListenerLogStart", "Expected first event is present"
+    assert all(key in event for key in ["Event", "Spark Version"]), "Not all keys are present"
+    assert event["Event"] == "SparkListenerLogStart", "Expected first event is not present"
 
 
 @pytest.mark.parametrize(
@@ -158,13 +157,13 @@ def test_databricks_log_from_s3_dir(event_log_url, event_log_file_archive, event
                         "Rollover Number",
                         "SparkContext Id",
                     ]
-                ), "All keys are present"
+                ), "Not all keys are present"
 
                 assert (
                     rollover_count == event["Rollover Number"]
-                ), "Contiguous monotonically increasing IDs"
+                ), "Rollover IDs are not contiguous and monotonically increasing"
                 rollover_count += 1
             except Exception as exc:
                 raise ValueError("Problem with line %d: %s" % (i, event_str), exc)
-    assert rollover_count == 3, "All log parts are present"
-    assert i + 1 == 16945, "All events are present"
+    assert rollover_count == 3, "Not all log parts are present"
+    assert i + 1 == 16945, "Not all events are present"
diff --git a/tests/test_parse.py b/tests/test_parse.py
index 8b4a613..e4f5a14 100644
--- a/tests/test_parse.py
+++ b/tests/test_parse.py
@@ -37,7 +37,7 @@ def test_simple_databricks_log():
             "stageData",
             "taskData",
         ]
-    ), "All keys are present"
+    ), "Not all keys are present"
 
     assert (
         parsed["metadata"]["application_info"]["name"] == "Databricks Shell"
@@ -60,7 +60,7 @@ def test_simple_emr_log():
            "stageData",
            "taskData",
        ]
-    ), "All keys are present"
+    ), "Not all keys are present"
 
     assert (
         parsed["metadata"]["application_info"]["name"] == "Text Similarity"