diff --git a/spark_log_parser/__init__.py b/spark_log_parser/__init__.py
index 000be10..075fd67 100644
--- a/spark_log_parser/__init__.py
+++ b/spark_log_parser/__init__.py
@@ -1,3 +1,2 @@
 """Tools for providing Spark event log"""
-__version__ = "0.1.4"
-
+__version__ = "0.1.5"
diff --git a/spark_log_parser/cli.py b/spark_log_parser/cli.py
index 1bd3748..8a0ee8a 100644
--- a/spark_log_parser/cli.py
+++ b/spark_log_parser/cli.py
@@ -2,6 +2,8 @@
 import logging
 import sys
 import tempfile
+import json
+import shutil

 from pathlib import Path
 from urllib.parse import unquote
@@ -40,15 +42,6 @@ def main():
     print("\n" + "*" * 12 + " Running the Log Parser for Spark Predictor " + "*" * 12 + "\n")
     print("--Processing log file: " + str(args.log_file))

-    with tempfile.TemporaryDirectory() as work_dir:
-
-        event_log_paths = Extractor(
-            unquote(args.log_file.resolve().as_uri()), work_dir, thresholds=ExtractThresholds(size=20000000000)
-        ).extract()
-
-        event_log = EventLogBuilder(event_log_paths, work_dir).build()
-        app = sparkApplication(eventlog=str(event_log))
-
     if args.log_file.suffixes:
         result_path = args.result_dir.joinpath(
             "parsed-" + args.log_file.name[: -len("".join(args.log_file.suffixes))]
@@ -56,6 +49,21 @@ def main():
     else:
         result_path = args.result_dir.joinpath("parsed-" + args.log_file.name)

-    app.save(str(result_path))
+    with tempfile.TemporaryDirectory() as work_dir:
+
+        event_log_paths = Extractor(
+            unquote(args.log_file.resolve().as_uri()),
+            work_dir,
+            thresholds=ExtractThresholds(size=20000000000),
+        ).extract()
+
+        event_log, parsed = EventLogBuilder(event_log_paths, work_dir).build()
+
+        if not parsed:
+            app = sparkApplication(spark_eventlog_path=str(event_log))
+            app.save(str(result_path))
+        else:
+            print("--Input log was already parsed")
+            shutil.copyfile(event_log, str(result_path) + ".json")

     print(f"--Result saved to: {result_path}.json")
diff --git a/spark_log_parser/eventlog.py b/spark_log_parser/eventlog.py
index 0be9ae7..a354de0 100644
--- a/spark_log_parser/eventlog.py
+++ b/spark_log_parser/eventlog.py
@@ -11,11 +11,9 @@ def __init__(
         event_log_paths: list[Path] | list[str],
         work_dir: Path | str,
     ):
-
         self.event_log_paths = self._validate_event_log_paths(event_log_paths)
         self.work_dir = self._validate_work_dir(work_dir)

-
     def _validate_event_log_paths(self, event_log_paths: list[Path] | list[str]) -> list[Path]:
         return [Path(x) for x in event_log_paths]

@@ -26,41 +24,58 @@ def _validate_work_dir(self, work_dir: Path | str) -> Path:

         return work_dir_path

-    def build(self) -> Path:
+    def build(self) -> tuple[Path, bool]:
         if not self.event_log_paths:
             raise ValueError("No files found")

-        self.event_log = self._get_event_log(self.event_log_paths)
+        self.event_log, self.parsed = self._get_event_log(self.event_log_paths)

-        return self.event_log
+        return self.event_log, self.parsed
+
+    def _get_event_log(self, paths: list[Path]) -> tuple[Path, bool]:

-    def _get_event_log(self, paths: list[Path]) -> Path:
         log_files = []
         rollover_dat = []
+        parsed = False
         for path in paths:
-            with open(path) as fobj:
-                try:
+            try:  # Test if it is a raw log
+                with open(path) as fobj:
                     line = json.loads(fobj.readline())
+                    if "Event" in line:
+                        log_files.append(path)
+                        if line["Event"] == "DBCEventLoggingListenerMetadata":
+                            rollover_dat.append(
+                                (line["Rollover Number"], line["SparkContext Id"], path)
+                            )
+                    else:
+                        raise ValueError
+
+            except ValueError:
+                try:  # Test if it is a parsed log
+                    with open(path) as fobj:
+                        data = json.load(fobj)
+                        if "jobData" in data:
+                            log_files.append(path)
+                            parsed = True
                 except ValueError:
                     continue

-                if "Event" in line:
-                    log_files.append(path)
-                    if line["Event"] == "DBCEventLoggingListenerMetadata":
-                        rollover_dat.append(
-                            (line["Rollover Number"], line["SparkContext Id"], path)
-                        )
+
+        if len(log_files) > 1 and parsed:
+            raise ValueError("A parsed log file was submitted with other log files")

         if rollover_dat:
             if len(log_files) > len(rollover_dat):
-                raise ValueError("No rollover properties found in log file")
+                raise ValueError(
+                    "Rollover logs were detected, but not all files had rollover properties"
+                )

-            return self._concat(rollover_dat)
+            return self._concat(rollover_dat), False

         if len(log_files) > 1:
-            raise ValueError("No rollover properties found in log file")
+            raise ValueError("Multiple files detected without log rollover properties")

-        return log_files[0]
+        return log_files[0], parsed

     def _concat(self, rollover_dat: list[tuple[str, str, str]]) -> Path:
         rollover_df = (
diff --git a/spark_log_parser/parsing_models/application_model_v2.py b/spark_log_parser/parsing_models/application_model_v2.py
index dd69819..be44f95 100644
--- a/spark_log_parser/parsing_models/application_model_v2.py
+++ b/spark_log_parser/parsing_models/application_model_v2.py
@@ -17,42 +17,38 @@ class sparkApplication:
     def __init__(
         self,
-        objfile=None,  # Previously saved object. This is the fastest and best option
-        appobj=None,  # application_model object
-        eventlog=None,  # spark eventlog path,
+        spark_eventlog_parsed_path=None,
+        spark_eventlog_path=None,
         stdout=None,
         debug=False,
     ):

-        self.eventlog = eventlog
+        self.spark_eventlog_path = spark_eventlog_path
+        self.spark_eventlog_parsed_path = spark_eventlog_parsed_path
         self.existsSQL = False
         self.existsExecutors = False
-        # self.sparkMetadata = {}
         self.metadata = {}
         self.stdout = stdout
         self.debug = debug

-        if objfile is not None:  # Load a previously saved sparkApplication Model
-            self.load(filepath=objfile)
+        if self.spark_eventlog_parsed_path is not None:
+            self.load(filepath=self.spark_eventlog_parsed_path)

-        if (appobj is not None) or (eventlog is not None):  # Load an application_model or eventlog
+        elif self.spark_eventlog_path is not None:

-            if eventlog is not None:
-                t0 = time.time()
-                if "s3://" in eventlog:
-                    path = eventlog.replace("s3://", "").split("/")
-                    bucket = path[0]
-                    path = "/".join(path[1:])
-                else:
-                    path = eventlog
-                    bucket = None
-
-                appobj = ApplicationModel(
-                    eventlogpath=path, bucket=bucket, stdoutpath=stdout, debug=debug
-                )
-                logging.info("Loaded object from spark eventlog [%.2fs]" % (time.time() - t0))
+            t0 = time.time()
+            if "s3://" in self.spark_eventlog_path:
+                path = self.spark_eventlog_path.replace("s3://", "").split("/")
+                bucket = path[0]
+                path = "/".join(path[1:])
             else:
-                logging.info("Loaded object from ApplicationModel object")
+                path = self.spark_eventlog_path
+                bucket = None
+
+            appobj = ApplicationModel(
+                eventlogpath=path, bucket=bucket, stdoutpath=stdout, debug=debug
+            )
+            logging.info("Loaded object from spark eventlog [%.2fs]" % (time.time() - t0))

             self.validate_app(appobj, self.debug)
@@ -651,9 +647,11 @@ def save(self, filepath=None, compress=False):

     def save_to_local(self, saveDat, filepath, compress):
         if filepath is None:
-            if self.eventlog is None:
+            if self.spark_eventlog_path is None:
                 raise Exception('No input eventlog found. Must specify "filepath".')
-            inputFile = os.path.basename(os.path.normpath(self.eventlog)).replace(".gz", "")
+            inputFile = os.path.basename(os.path.normpath(self.spark_eventlog_path)).replace(
+                ".gz", ""
+            )
             filepath = inputFile + "-sync"

         if compress is False:
diff --git a/tests/logs/bad/mixed_parsed.zip b/tests/logs/bad/mixed_parsed.zip
new file mode 100644
index 0000000..081051f
Binary files /dev/null and b/tests/logs/bad/mixed_parsed.zip differ
diff --git a/tests/logs/similarity_parsed.json.gz b/tests/logs/similarity_parsed.json.gz
new file mode 100644
index 0000000..a4cd7b7
Binary files /dev/null and b/tests/logs/similarity_parsed.json.gz differ
diff --git a/tests/test_bad_eventlog.py b/tests/test_bad_eventlog.py
index 262dc1f..61d098c 100644
--- a/tests/test_bad_eventlog.py
+++ b/tests/test_bad_eventlog.py
@@ -25,7 +25,9 @@ def test_multiple_context_ids(self):

     def test_missing_dbc_event(self):
         event_log = Path("tests", "logs", "bad", "missing-dbc-event.zip").resolve()
-        self.check_value_error(event_log, "No rollover properties found in log file")
+        self.check_value_error(
+            event_log, "Rollover logs were detected, but not all files had rollover properties"
+        )

     def test_duplicate_log_part(self):
         event_log = Path("tests", "logs", "bad", "duplicate-part.tgz").resolve()
@@ -39,6 +41,10 @@ def test_missing_first_part(self):
         event_log = Path("tests", "logs", "bad", "missing-first-part.zip").resolve()
         self.check_value_error(event_log, "Rollover log file appears to be missing")

+    def test_mixed_parsed(self):
+        event_log = Path("tests", "logs", "bad", "mixed_parsed.zip").resolve()
+        self.check_value_error(event_log, "A parsed log file was submitted with other log files")
+
     def test_only_non_first_part(self):
         with tempfile.TemporaryDirectory() as temp_dir:
             with ZipFile(Path("tests", "logs", "bad", "missing-first-part.zip")) as zfile:
diff --git a/tests/test_eventlog.py b/tests/test_eventlog.py
index 90de8cf..de21a75 100644
--- a/tests/test_eventlog.py
+++ b/tests/test_eventlog.py
@@ -11,34 +11,37 @@ def get_event(event_log_path):
     with tempfile.TemporaryDirectory() as temp_dir:
         event_log_paths = extractor.Extractor(event_log_path.resolve().as_uri(), temp_dir).extract()

-        event_log = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()
+        event_log, parsed = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()

         with open(event_log) as log_fobj:
             event = json.loads(log_fobj.readline())

-    return event
+    return event, parsed


 def test_simple_emr_log():
     event_log_path = Path("tests", "logs", "emr.zip").resolve()

-    event = get_event(event_log_path)
+    event, parsed = get_event(event_log_path)

     assert all(key in event for key in ["Event", "Spark Version"]), "Not all keys are present"
     assert event["Event"] == "SparkListenerLogStart", "First event is not as expected"
+    assert not parsed


 def test_simple_databricks_log():
     event_log_path = Path("tests", "logs", "databricks.zip").resolve()

-    event = get_event(event_log_path)
+    event, parsed = get_event(event_log_path)
+
     assert all(
         key in event
         for key in ["Event", "Spark Version", "Timestamp", "Rollover Number", "SparkContext Id"]
     ), "Not all keys are present"
+    assert not parsed


 def test_raw_databricks_log():
     event_log_path = Path("tests", "logs", "databricks.json").resolve()

-    event = get_event(event_log_path)
+    event, parsed = get_event(event_log_path)

     assert all(
         key in event
@@ -46,12 +49,12 @@
     ), "Not all keys are present"

     assert event["Event"] == "DBCEventLoggingListenerMetadata", "First event is not as expected"
-
+    assert not parsed


 def test_log_in_dir():
     event_log_path = Path("tests", "logs", "log_in_dir", "databricks.json.gz").resolve()

-    event = get_event(event_log_path)
+    event, parsed = get_event(event_log_path)

     assert all(
         key in event
@@ -59,6 +62,7 @@
     ), "Not all keys are present"

     assert event["Event"] == "DBCEventLoggingListenerMetadata", "First event is not as expected"
+    assert not parsed


 class RolloverLog(unittest.TestCase):
@@ -82,7 +86,7 @@ def validate_log(self, event_log_path: Path, log_file_total: int, log_entry_tota
             event_log_paths = extractor.Extractor(
                 event_log_path.resolve().as_uri(), temp_dir
             ).extract()
-            event_log = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()
+            event_log, parsed = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()

             with open(event_log) as log_fobj:
                 event = json.loads(log_fobj.readline())
@@ -109,3 +113,9 @@ def validate_log(self, event_log_path: Path, log_file_total: int, log_entry_tota

         assert rollover_count == log_file_total, "Not all log parts are present"
         assert i + 1 == log_entry_total, "Not all events are present"
+        assert not parsed
+
+
+if __name__ == "__main__":
+
+    test_simple_emr_log()
diff --git a/tests/test_eventlog_s3.py b/tests/test_eventlog_s3.py
index 692ce6a..1090cb9 100644
--- a/tests/test_eventlog_s3.py
+++ b/tests/test_eventlog_s3.py
@@ -66,13 +66,14 @@ def test_emr_log_from_s3(event_log_url, event_log_file_archive, event_log_s3_dir
     with tempfile.TemporaryDirectory() as temp_dir, stubber:
         event_log_paths = extractor.Extractor(event_log_url, temp_dir, s3).extract()

-        event_log = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()
+        event_log, parsed = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()

         with open(event_log) as log_fobj:
             event = json.loads(log_fobj.readline())

     assert all(key in event for key in ["Event", "Spark Version"]), "Not all keys are present"
     assert event["Event"] == "SparkListenerLogStart", "Expected first event is not present"
+    assert not parsed


 @pytest.mark.parametrize(
@@ -135,7 +136,7 @@ def test_databricks_log_from_s3_dir(event_log_url, event_log_file_archive, event

     with stubber:
         event_log_paths = extractor.Extractor(event_log_url, temp_dir, s3).extract()
-        event_log = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()
+        event_log, _ = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()

         stubber.assert_no_pending_responses()
diff --git a/tests/test_parse.py b/tests/test_parse.py
index e4f5a14..4fa478f 100644
--- a/tests/test_parse.py
+++ b/tests/test_parse.py
@@ -5,15 +5,25 @@
 from spark_log_parser import eventlog, extractor
 from spark_log_parser.parsing_models.application_model_v2 import sparkApplication

+PARSED_KEYS = [
+    "accumData",
+    "executors",
+    "jobData",
+    "metadata",
+    "sqlData",
+    "stageData",
+    "taskData",
+]
+

 def get_parsed_log(event_log_path):
     with tempfile.TemporaryDirectory() as temp_dir:
         event_log_paths = extractor.Extractor(event_log_path.resolve().as_uri(), temp_dir).extract()
-        event_log = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()
+        event_log, _ = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()

         result_path = str(Path(temp_dir, "result"))
-        sparkApplication(eventlog=str(event_log)).save(result_path)
+        sparkApplication(spark_eventlog_path=str(event_log)).save(result_path)

         with open(result_path + ".json") as result_fobj:
             parsed = json.load(result_fobj)
@@ -26,19 +36,7 @@ def test_simple_databricks_log():

     parsed = get_parsed_log(event_log_path)

-    assert all(
-        key in parsed
-        for key in [
-            "accumData",
-            "executors",
-            "jobData",
-            "metadata",
-            "sqlData",
-            "stageData",
-            "taskData",
-        ]
-    ), "Not all keys are present"
-
+    assert all(key in parsed for key in PARSED_KEYS), "Not all keys are present"
     assert (
         parsed["metadata"]["application_info"]["name"] == "Databricks Shell"
     ), "Name is as expected"
@@ -49,30 +47,30 @@ def test_simple_emr_log():

     parsed = get_parsed_log(event_log_path)

-    assert all(
-        key in parsed
-        for key in [
-            "accumData",
-            "executors",
-            "jobData",
-            "metadata",
-            "sqlData",
-            "stageData",
-            "taskData",
-        ]
-    ), "Not all keys are present"
-
+    assert all(key in parsed for key in PARSED_KEYS), "Not all keys are present"
     assert (
         parsed["metadata"]["application_info"]["name"] == "Text Similarity"
     ), "Name is as expected"


+def test_parsed_log():
+    event_log_path = Path("tests", "logs", "similarity_parsed.json.gz").resolve()
+
+    with tempfile.TemporaryDirectory() as temp_dir:
+        event_log_paths = extractor.Extractor(event_log_path.resolve().as_uri(), temp_dir).extract()
+        event_log, parsed = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()
+        assert parsed
+
+        sparkApplication(spark_eventlog_parsed_path=str(event_log))
+
+
 def test_emr_missing_sql_events():
     event_log_path = Path("tests", "logs", "emr_missing_sql_events.zip").resolve()

     with tempfile.TemporaryDirectory() as temp_dir:
         event_log_paths = extractor.Extractor(event_log_path.resolve().as_uri(), temp_dir).extract()
-        event_log = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()
-        obj = sparkApplication(eventlog=str(event_log))
+        event_log, parsed = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()
+        assert not parsed

-        assert list(obj.sqlData.index.values) == [0, 2, 3, 5, 6, 7, 8]
+        obj = sparkApplication(spark_eventlog_path=str(event_log))
+        assert list(obj.sqlData.index.values) == [0, 2, 3, 5, 6, 7, 8]
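
For reference, a minimal usage sketch of the flow after this change, mirroring cli.py and tests/test_parse.py above: EventLogBuilder.build() now returns an (event_log, parsed) pair, and sparkApplication takes spark_eventlog_path for raw logs or spark_eventlog_parsed_path for already-parsed ones. The input archive below is just the sample shipped with the test suite; any supported log archive works the same way.

import tempfile
from pathlib import Path

from spark_log_parser import eventlog, extractor
from spark_log_parser.parsing_models.application_model_v2 import sparkApplication

# Placeholder input: the EMR sample archive used by the tests.
event_log_path = Path("tests", "logs", "emr.zip").resolve()

with tempfile.TemporaryDirectory() as temp_dir:
    event_log_paths = extractor.Extractor(event_log_path.as_uri(), temp_dir).extract()

    # build() now returns the event log path plus a flag that is True when the
    # submitted file is an already-parsed log (detected via its "jobData" key).
    event_log, parsed = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()

    if not parsed:
        # Raw Spark event log: run the parser and write "<result_path>.json".
        sparkApplication(spark_eventlog_path=str(event_log)).save(str(Path(temp_dir, "result")))
    else:
        # Already-parsed log: load it directly instead of re-parsing.
        sparkApplication(spark_eventlog_parsed_path=str(event_log))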