From 9232866c62e030f2d170f9c39cb63231e36198cc Mon Sep 17 00:00:00 2001
From: Scott Romney
Date: Wed, 3 Aug 2022 20:12:28 -0700
Subject: [PATCH] [PROD-399] Handle edge case of single rollover file with index > 0

---
 spark_log_parser/__main__.py          | 23 +++++---
 spark_log_parser/eventlog.py          | 76 +++++++++++++++-----------
 tests/logs/bad/missing-first-part.zip | Bin 0 -> 1214 bytes
 tests/test_bad_eventlog.py            | 16 ++++++
 4 files changed, 75 insertions(+), 40 deletions(-)
 create mode 100644 tests/logs/bad/missing-first-part.zip

diff --git a/spark_log_parser/__main__.py b/spark_log_parser/__main__.py
index 1f6d748..9aaf01f 100644
--- a/spark_log_parser/__main__.py
+++ b/spark_log_parser/__main__.py
@@ -1,6 +1,5 @@
 import argparse
 import logging
-import os
 import sys
 import tempfile
 from pathlib import Path
@@ -12,17 +11,25 @@
 
 logger = logging.getLogger("spark_log_parser")
 
+
 parser = argparse.ArgumentParser("spark_log_parser")
-parser.add_argument("-l", "--log-file", required=True, type=Path, help="path to event log")
 parser.add_argument(
-    "-r", "--result-dir", required=True, help="path to directory in which to save parsed logs"
+    "-l", "--log-file", required=True, type=Path, help="path to event log file or directory"
+)
+parser.add_argument(
+    "-r",
+    "--result-dir",
+    required=True,
+    type=Path,
+    help="path to directory in which to save the parsed log",
 )
 args = parser.parse_args()
 
-if not os.path.isdir(args.result_dir):
+if not args.result_dir.is_dir():
     logger.error("%s is not a directory", args.result_dir)
     sys.exit(1)
 
+
 print("\n" + "*" * 12 + " Running the Log Parser for Spark Predictor " + "*" * 12 + "\n")
 print("--Processing log file: " + str(args.log_file))
 
@@ -31,12 +38,12 @@
 app = sparkApplication(eventlog=str(event_log))
 
 if args.log_file.suffixes:
-    result_path = os.path.join(
-        args.result_dir, "parsed-" + args.log_file.name[: -len("".join(args.log_file.suffixes))]
+    result_path = args.result_dir.joinpath(
+        "parsed-" + args.log_file.name[: -len("".join(args.log_file.suffixes))]
     )
 else:
-    result_path = os.path.join(args.result_dir, "parsed-" + args.log_file.name)
+    result_path = args.result_dir.joinpath("parsed-" + args.log_file.name)
 
-app.save(result_path)
+app.save(str(result_path))
 
 print(f"--Result saved to: {result_path}.json")
diff --git a/spark_log_parser/eventlog.py b/spark_log_parser/eventlog.py
index 926fe28..8a03951 100644
--- a/spark_log_parser/eventlog.py
+++ b/spark_log_parser/eventlog.py
@@ -23,51 +23,63 @@ def _validate_work_dir(self, work_dir: Path | str) -> Path:
         return work_dir_path
 
     def build(self) -> Path:
-        event_logs = self.extractor.extract()
+        paths = self.extractor.extract()
 
-        self.event_log = self._concat(event_logs)
+        if not paths:
+            raise ValueError("No files found")
 
-        return self.event_log
+        self.event_log = self._get_event_log(paths)
 
-    def _concat(self, event_logs: list[Path]) -> Path:
-        if len(event_logs) == 1:
-            return event_logs[0]
+        return self.event_log
 
-        dat = []
-        for log in event_logs:
-            with open(log) as log_file:
+    def _get_event_log(self, paths: list[Path]) -> Path:
+        log_files = []
+        rollover_dat = []
+        for path in paths:
+            with open(path) as fobj:
                 try:
-                    line = json.loads(log_file.readline())
+                    line = json.loads(fobj.readline())
                 except ValueError:
-                    continue  # Maybe a Databricks pricing file
-                if line["Event"] == "DBCEventLoggingListenerMetadata":
-                    dat.append((line["Rollover Number"], line["SparkContext Id"], log))
-                else:
-                    raise ValueError("Expected DBC event not found")
+                    continue
+                if "Event" in line:
+                    log_files.append(path)
+                    if line["Event"] == "DBCEventLoggingListenerMetadata":
line["Event"] == "DBCEventLoggingListenerMetadata": + rollover_dat.append( + (line["Rollover Number"], line["SparkContext Id"], path) + ) + + if rollover_dat: + if len(log_files) > len(rollover_dat): + raise ValueError("No rollover properties found in log file") + + return self._concat(rollover_dat) + + if len(log_files) > 1: + raise ValueError("No rollover properties found in log file") + + return log_files[0] + + def _concat(self, rollover_dat: list[tuple[str, str, str]]) -> Path: + rollover_df = pd.DataFrame( + rollover_dat, columns=["rollover_index", "context_id", "path"] + ).sort_values("rollover_index") + + if not len(rollover_df.context_id.unique()) == 1: + raise ValueError("Not all rollover log files have the same Spark context ID") - df = pd.DataFrame(dat, columns=["rollover_index", "context_id", "path"]).sort_values( - "rollover_index" - ) + diffs = rollover_df.rollover_index.diff() - self._validate_rollover_logs(df) + if any(diffs > 1) or rollover_df.rollover_index[0] > 0: + raise ValueError("Rollover log file appears to be missing") + + if any(diffs < 1): + raise ValueError("Duplicate rollover log file detected") event_log = Path(tempfile.mkstemp(suffix="-concatenated.json", dir=str(self.work_dir))[1]) with open(event_log, "w") as fobj: - for path in df.path: + for path in rollover_df.path: with open(path) as part_fobj: for line in part_fobj: fobj.write(line) return event_log - - def _validate_rollover_logs(self, df: pd.DataFrame): - if not len(df.context_id.unique()) == 1: - raise ValueError("Not all rollover files have the same Spark context ID") - - diffs = df.rollover_index.diff()[1:] - - if any(diffs > 1) or df.rollover_index[0] > 0: - raise ValueError("Rollover file appears to be missing") - - if any(diffs < 1): - raise ValueError("Duplicate rollover file detected") diff --git a/tests/logs/bad/missing-first-part.zip b/tests/logs/bad/missing-first-part.zip new file mode 100644 index 0000000000000000000000000000000000000000..ab55b8b0f6f393e47654efa5ce2b8b3bee97cbf8 GIT binary patch literal 1214 zcmWIWW@h1H0D(P$Orc-~ln`c+VaUxaF3!wL*G&Tu7LsA8U~TaNvoqUtkLZ`$aTm-fVE!r&CV%HJacX>zSQKZu75XdDoDjcOAK#3?v->ch#g%%*`u)x1y9|jkD|bR(YdKzR#w! zT&(wBbo=qa>iiq)B{+26%=*?IE5jz=c22*A@v`8_uYEF43TDPurI`D4_a|zfY7$TS z9B%*6DR0)nBHdOk(_|m+iHE_yO3M({ns;ZoO--Ejh=2{dcQD zf|b(EgOgfi^fvqUziG?-XqdyQ9}$@Q@0EJY2|r_rux_uS*y%mjt=fxaj_*i1<#+yN zA1BZM_+1hl@l#~k44hsEX&zM3pY6t}y7XTSV}Lg!lN>XyoFD;A!XO~P@YWGTW6c+= zkbHrWCqM?{$`ufUf%%7FNuw`zgYo4Npp~FJLcmJYl!