23 changes: 15 additions & 8 deletions spark_log_parser/__main__.py
@@ -1,6 +1,5 @@
import argparse
import logging
import os
import sys
import tempfile
from pathlib import Path
@@ -12,17 +11,25 @@

logger = logging.getLogger("spark_log_parser")


parser = argparse.ArgumentParser("spark_log_parser")
parser.add_argument("-l", "--log-file", required=True, type=Path, help="path to event log")
parser.add_argument(
"-r", "--result-dir", required=True, help="path to directory in which to save parsed logs"
"-l", "--log-file", required=True, type=Path, help="path to event log file or directory"
)
parser.add_argument(
"-r",
"--result-dir",
required=True,
type=Path,
help="path to directory in which to save the parsed log",
)
args = parser.parse_args()

if not os.path.isdir(args.result_dir):
if not args.result_dir.is_dir():
logger.error("%s is not a directory", args.result_dir)
sys.exit(1)
I think we should refrain from using sys.exit() here. If spark_log_parser is ever used in the backend or in a Celery task, it would terminate the whole process and likely cause unintended side effects.

Contributor Author
I think we're ok here. This code is only executed by users on the command line, e.g.

python -m spark_log_parser

Ok, makes sense. It caught my eye because in some places we raise a ValueError, but here we exit the process.
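
For illustration only, a minimal sketch of the raise-based alternative (hypothetical helper names, not part of this PR): the validation raises, and only the CLI entry point converts the exception into an exit code, so library or Celery callers are never terminated.

import logging
import sys
from pathlib import Path

logger = logging.getLogger("spark_log_parser")


def validate_result_dir(result_dir: Path) -> None:
    # Raise instead of exiting so importing code is not killed.
    if not result_dir.is_dir():
        raise ValueError(f"{result_dir} is not a directory")


def main() -> int:
    try:
        validate_result_dir(Path("results"))
    except ValueError as exc:
        logger.error("%s", exc)
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())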



print("\n" + "*" * 12 + " Running the Log Parser for Spark Predictor " + "*" * 12 + "\n")
print("--Processing log file: " + str(args.log_file))

@@ -31,12 +38,12 @@
app = sparkApplication(eventlog=str(event_log))

if args.log_file.suffixes:
result_path = os.path.join(
args.result_dir, "parsed-" + args.log_file.name[: -len("".join(args.log_file.suffixes))]
result_path = args.result_dir.joinpath(
"parsed-" + args.log_file.name[: -len("".join(args.log_file.suffixes))]
)
else:
result_path = os.path.join(args.result_dir, "parsed-" + args.log_file.name)
result_path = args.result_dir.joinpath("parsed-" + args.log_file.name)

app.save(result_path)
app.save(str(result_path))

print(f"--Result saved to: {result_path}.json")
76 changes: 44 additions & 32 deletions spark_log_parser/eventlog.py
@@ -23,51 +23,63 @@ def _validate_work_dir(self, work_dir: Path | str) -> Path:
return work_dir_path

def build(self) -> Path:
event_logs = self.extractor.extract()
paths = self.extractor.extract()

self.event_log = self._concat(event_logs)
if not paths:
raise ValueError("No files found")

return self.event_log
self.event_log = self._get_event_log(paths)

def _concat(self, event_logs: list[Path]) -> Path:
if len(event_logs) == 1:
return event_logs[0]
return self.event_log

dat = []
for log in event_logs:
with open(log) as log_file:
def _get_event_log(self, paths: list[Path]) -> Path:
log_files = []
rollover_dat = []
for path in paths:
with open(path) as fobj:
try:
line = json.loads(log_file.readline())
line = json.loads(fobj.readline())
except ValueError:
continue # Maybe a Databricks pricing file
if line["Event"] == "DBCEventLoggingListenerMetadata":
dat.append((line["Rollover Number"], line["SparkContext Id"], log))
else:
raise ValueError("Expected DBC event not found")
continue
if "Event" in line:
log_files.append(path)
if line["Event"] == "DBCEventLoggingListenerMetadata":
rollover_dat.append(
(line["Rollover Number"], line["SparkContext Id"], path)
)

if rollover_dat:
if len(log_files) > len(rollover_dat):
raise ValueError("No rollover properties found in log file")

return self._concat(rollover_dat)

if len(log_files) > 1:
raise ValueError("No rollover properties found in log file")

return log_files[0]

def _concat(self, rollover_dat: list[tuple[str, str, str]]) -> Path:
rollover_df = pd.DataFrame(
rollover_dat, columns=["rollover_index", "context_id", "path"]
).sort_values("rollover_index")

if not len(rollover_df.context_id.unique()) == 1:
raise ValueError("Not all rollover log files have the same Spark context ID")

df = pd.DataFrame(dat, columns=["rollover_index", "context_id", "path"]).sort_values(
"rollover_index"
)
diffs = rollover_df.rollover_index.diff()

self._validate_rollover_logs(df)
if any(diffs > 1) or rollover_df.rollover_index[0] > 0:
raise ValueError("Rollover log file appears to be missing")

if any(diffs < 1):
raise ValueError("Duplicate rollover log file detected")

event_log = Path(tempfile.mkstemp(suffix="-concatenated.json", dir=str(self.work_dir))[1])
with open(event_log, "w") as fobj:
for path in df.path:
for path in rollover_df.path:
with open(path) as part_fobj:
for line in part_fobj:
fobj.write(line)

return event_log

def _validate_rollover_logs(self, df: pd.DataFrame):
if not len(df.context_id.unique()) == 1:
raise ValueError("Not all rollover files have the same Spark context ID")

diffs = df.rollover_index.diff()[1:]

if any(diffs > 1) or df.rollover_index[0] > 0:
Contributor
I remember seeing a check like this in the last PR for the case this PR is addressing. Did this logic just not get run if there was only one log?

Contributor Author
Exactly. With only one log file, rollover validation was skipped.

raise ValueError("Rollover file appears to be missing")

if any(diffs < 1):
raise ValueError("Duplicate rollover file detected")
Binary file added tests/logs/bad/missing-first-part.zip
16 changes: 16 additions & 0 deletions tests/test_bad_eventlog.py
@@ -1,6 +1,7 @@
import tempfile
import unittest
from pathlib import Path
from zipfile import ZipFile

from spark_log_parser.eventlog import EventLogBuilder

@@ -42,3 +43,18 @@ def test_missing_first_part(self):
with tempfile.TemporaryDirectory() as temp_dir:
with self.assertRaises(ValueError, msg="Rollover file appears to be missing"):
EventLogBuilder(event_log.as_uri(), temp_dir).build()

def test_only_non_first_part(self):
with tempfile.TemporaryDirectory() as temp_dir:
with ZipFile(Path("tests", "logs", "bad", "missing-first-part.zip")) as zfile:
zfile.extract(
[zinfo for zinfo in zfile.infolist() if not zinfo.is_dir()][0], temp_dir
)

with self.assertRaises(ValueError, msg="Rollover file appears to be missing"):
EventLogBuilder(Path(temp_dir).as_uri(), temp_dir).build()

def test_empty_log_dir(self):
with tempfile.TemporaryDirectory() as temp_dir:
with self.assertRaises(ValueError, msg="No log files found"):
EventLogBuilder(Path(temp_dir).as_uri(), temp_dir).build()