diff --git a/spark_log_parser/__main__.py b/spark_log_parser/__main__.py
index 1f6d748..9aaf01f 100644
--- a/spark_log_parser/__main__.py
+++ b/spark_log_parser/__main__.py
@@ -1,6 +1,5 @@
 import argparse
 import logging
-import os
 import sys
 import tempfile
 from pathlib import Path
@@ -12,17 +11,25 @@
 logger = logging.getLogger("spark_log_parser")
 
+
 parser = argparse.ArgumentParser("spark_log_parser")
-parser.add_argument("-l", "--log-file", required=True, type=Path, help="path to event log")
 parser.add_argument(
-    "-r", "--result-dir", required=True, help="path to directory in which to save parsed logs"
+    "-l", "--log-file", required=True, type=Path, help="path to event log file or directory"
+)
+parser.add_argument(
+    "-r",
+    "--result-dir",
+    required=True,
+    type=Path,
+    help="path to directory in which to save the parsed log",
 )
 args = parser.parse_args()
 
-if not os.path.isdir(args.result_dir):
+if not args.result_dir.is_dir():
     logger.error("%s is not a directory", args.result_dir)
     sys.exit(1)
 
+
 print("\n" + "*" * 12 + " Running the Log Parser for Spark Predictor " + "*" * 12 + "\n")
 
 print("--Processing log file: " + str(args.log_file))
 
@@ -31,12 +38,12 @@
 app = sparkApplication(eventlog=str(event_log))
 
 if args.log_file.suffixes:
-    result_path = os.path.join(
-        args.result_dir, "parsed-" + args.log_file.name[: -len("".join(args.log_file.suffixes))]
+    result_path = args.result_dir.joinpath(
+        "parsed-" + args.log_file.name[: -len("".join(args.log_file.suffixes))]
     )
 else:
-    result_path = os.path.join(args.result_dir, "parsed-" + args.log_file.name)
+    result_path = args.result_dir.joinpath("parsed-" + args.log_file.name)
 
-app.save(result_path)
+app.save(str(result_path))
 
 print(f"--Result saved to: {result_path}.json")
diff --git a/spark_log_parser/eventlog.py b/spark_log_parser/eventlog.py
index 926fe28..8a03951 100644
--- a/spark_log_parser/eventlog.py
+++ b/spark_log_parser/eventlog.py
@@ -23,51 +23,63 @@ def _validate_work_dir(self, work_dir: Path | str) -> Path:
         return work_dir_path
 
     def build(self) -> Path:
-        event_logs = self.extractor.extract()
+        paths = self.extractor.extract()
 
-        self.event_log = self._concat(event_logs)
+        if not paths:
+            raise ValueError("No files found")
 
-        return self.event_log
+        self.event_log = self._get_event_log(paths)
 
-    def _concat(self, event_logs: list[Path]) -> Path:
-        if len(event_logs) == 1:
-            return event_logs[0]
+        return self.event_log
 
-        dat = []
-        for log in event_logs:
-            with open(log) as log_file:
+    def _get_event_log(self, paths: list[Path]) -> Path:
+        log_files = []
+        rollover_dat = []
+        for path in paths:
+            with open(path) as fobj:
                 try:
-                    line = json.loads(log_file.readline())
+                    line = json.loads(fobj.readline())
                 except ValueError:
-                    continue  # Maybe a Databricks pricing file
-                if line["Event"] == "DBCEventLoggingListenerMetadata":
-                    dat.append((line["Rollover Number"], line["SparkContext Id"], log))
-                else:
-                    raise ValueError("Expected DBC event not found")
+                    continue
+                if "Event" in line:
+                    log_files.append(path)
+                    if line["Event"] == "DBCEventLoggingListenerMetadata":
+                        rollover_dat.append(
+                            (line["Rollover Number"], line["SparkContext Id"], path)
+                        )
+
+        if rollover_dat:
+            if len(log_files) > len(rollover_dat):
+                raise ValueError("No rollover properties found in log file")
+
+            return self._concat(rollover_dat)
+
+        if len(log_files) > 1:
+            raise ValueError("No rollover properties found in log file")
+
+        return log_files[0]
+
+    def _concat(self, rollover_dat: list[tuple[str, str, str]]) -> Path:
+        rollover_df = pd.DataFrame(
+            rollover_dat, columns=["rollover_index", "context_id", "path"]
+        ).sort_values("rollover_index")
+
+        if not len(rollover_df.context_id.unique()) == 1:
+            raise ValueError("Not all rollover log files have the same Spark context ID")
 
-        df = pd.DataFrame(dat, columns=["rollover_index", "context_id", "path"]).sort_values(
-            "rollover_index"
-        )
+        diffs = rollover_df.rollover_index.diff()
 
-        self._validate_rollover_logs(df)
+        if any(diffs > 1) or rollover_df.rollover_index[0] > 0:
+            raise ValueError("Rollover log file appears to be missing")
+
+        if any(diffs < 1):
+            raise ValueError("Duplicate rollover log file detected")
 
         event_log = Path(tempfile.mkstemp(suffix="-concatenated.json", dir=str(self.work_dir))[1])
         with open(event_log, "w") as fobj:
-            for path in df.path:
+            for path in rollover_df.path:
                 with open(path) as part_fobj:
                     for line in part_fobj:
                         fobj.write(line)
 
         return event_log
-
-    def _validate_rollover_logs(self, df: pd.DataFrame):
-        if not len(df.context_id.unique()) == 1:
-            raise ValueError("Not all rollover files have the same Spark context ID")
-
-        diffs = df.rollover_index.diff()[1:]
-
-        if any(diffs > 1) or df.rollover_index[0] > 0:
-            raise ValueError("Rollover file appears to be missing")
-
-        if any(diffs < 1):
-            raise ValueError("Duplicate rollover file detected")
diff --git a/tests/logs/bad/missing-first-part.zip b/tests/logs/bad/missing-first-part.zip
new file mode 100644
index 0000000..ab55b8b
Binary files /dev/null and b/tests/logs/bad/missing-first-part.zip differ
diff --git a/tests/test_bad_eventlog.py b/tests/test_bad_eventlog.py
index 236db54..446d2bc 100644
--- a/tests/test_bad_eventlog.py
+++ b/tests/test_bad_eventlog.py
@@ -1,6 +1,7 @@
 import tempfile
 import unittest
 from pathlib import Path
+from zipfile import ZipFile
 
 from spark_log_parser.eventlog import EventLogBuilder
 
@@ -42,3 +43,18 @@ def test_missing_first_part(self):
         with tempfile.TemporaryDirectory() as temp_dir:
             with self.assertRaises(ValueError, msg="Rollover file appears to be missing"):
                 EventLogBuilder(event_log.as_uri(), temp_dir).build()
+
+    def test_only_non_first_part(self):
+        with tempfile.TemporaryDirectory() as temp_dir:
+            with ZipFile(Path("tests", "logs", "bad", "missing-first-part.zip")) as zfile:
+                zfile.extract(
+                    [zinfo for zinfo in zfile.infolist() if not zinfo.is_dir()][0], temp_dir
+                )
+
+            with self.assertRaises(ValueError, msg="Rollover file appears to be missing"):
+                EventLogBuilder(Path(temp_dir).as_uri(), temp_dir).build()
+
+    def test_empty_log_dir(self):
+        with tempfile.TemporaryDirectory() as temp_dir:
+            with self.assertRaises(ValueError, msg="No log files found"):
+                EventLogBuilder(Path(temp_dir).as_uri(), temp_dir).build()
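
Note for reviewers: `_get_event_log` classifies each extracted file by its first JSON line. A Databricks rollover part opens with a metadata event shaped roughly like the sketch below; the three key names ("Event", "Rollover Number", "SparkContext Id") come from the patch, but the values here are made up.

    import json

    # Hypothetical first line of a Databricks rollover part file; only the
    # fields _get_event_log actually reads are shown, with made-up values.
    first_line = json.dumps(
        {
            "Event": "DBCEventLoggingListenerMetadata",
            "Rollover Number": 0,
            "SparkContext Id": 1234567890,
        }
    )

    line = json.loads(first_line)
    if line["Event"] == "DBCEventLoggingListenerMetadata":
        print(line["Rollover Number"], line["SparkContext Id"])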
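The rollover validation now inlined in `_concat` can be exercised on its own. A minimal sketch under the same pandas semantics, with made-up sample tuples; note it reads the first sorted row positionally with `.iloc[0]`:

    import pandas as pd

    # Made-up rollover metadata tuples: (rollover_index, context_id, path).
    rollover_dat = [
        (1, "ctx-a", "part-1.json"),
        (0, "ctx-a", "part-0.json"),
        (2, "ctx-a", "part-2.json"),
    ]

    rollover_df = pd.DataFrame(
        rollover_dat, columns=["rollover_index", "context_id", "path"]
    ).sort_values("rollover_index")

    # Every part must come from the same Spark context.
    assert len(rollover_df.context_id.unique()) == 1

    # After sorting, rollover numbers must start at 0 and step by exactly 1:
    # a gap (diff > 1) means a missing part, a repeat (diff < 1) a duplicate.
    diffs = rollover_df.rollover_index.diff()
    assert rollover_df.rollover_index.iloc[0] == 0 and not any(diffs > 1)
    assert not any(diffs < 1)

    print(list(rollover_df.path))  # concatenation order: part-0, part-1, part-2

One thing worth double-checking in review: the patch reads the first row as `rollover_df.rollover_index[0]`, which on a default integer index is a label lookup that still points at the row that was first *before* sorting, so `.iloc[0]` may be what is intended if the extractor can yield parts out of order.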
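For completeness, the entry point the new tests exercise, as a hypothetical local-directory run (the paths are made up; `build()` returns the path to a single event log, which for rollover parts is a "*-concatenated.json" temp file under the work directory):

    from pathlib import Path
    from spark_log_parser.eventlog import EventLogBuilder

    # Hypothetical inputs: a directory of rollover parts and a scratch directory.
    log_dir = Path("logs", "my-app").resolve()  # as_uri() requires an absolute path
    work_dir = "scratch"

    event_log = EventLogBuilder(log_dir.as_uri(), work_dir).build()
    print(event_log)

Separately, `assertRaises(ValueError, msg=...)` uses `msg` only as the failure message; it does not match the exception text. Switching these tests to `assertRaisesRegex` would pin down which ValueError fires (and would surface that `test_empty_log_dir` says "No log files found" while the code raises "No files found").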