21 changes: 21 additions & 0 deletions makefile
@@ -0,0 +1,21 @@
FILES := $(shell git diff --name-only --diff-filter=AM $$(git merge-base origin/main HEAD) -- \*.py)


.PHONY: test
test:
	pytest

.PHONY: lint
lint:
	flake8 --filename ./$(FILES) --max-complexity=10 --ignore=E501,W503

.PHONY: format
format:
ifneq ("$(FILES)"," ")
	black $(FILES)
	isort $(FILES)
endif

.PHONY: tidy
tidy: format lint

6 changes: 1 addition & 5 deletions parse.sh
@@ -13,13 +13,11 @@ function print_usage() {
}

results_dir="$HOME/results"
args=()

while getopts l:r:h name; do
case $name in
l)
event_log="$OPTARG"
args=("${args[@]}" -l "$OPTARG")
;;
r)
results_dir="$OPTARG"
@@ -41,6 +39,4 @@ if [[ -z $event_log ]]; then
exit 1
fi

args=("${args[@]}" -r "$results_dir")

exec python3 -m spark_log_parser "${args[@]}"
exec python3 -m spark_log_parser -l "$event_log" -r "$results_dir"
25 changes: 20 additions & 5 deletions pyproject.toml
@@ -6,7 +6,7 @@ build-backend = "flit.buildapi"
name = "spark_log_parser"
authors = [{"name" = "Sync Computing"}]
readme = "README.md"
requires-python = ">=3.6.1"
requires-python = ">=3.10.3"
classifiers = [
"Intended Audience :: Information Technology",
"Intended Audience :: System Administrators",
@@ -16,20 +16,26 @@ classifiers = [
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
]
dependencies = [
"boto3==1.20.24",
"numpy==1.21.4",
"pandas==1.3.5",
"ujson==5.3.0",
"pydantic==1.9.0",
"requests==2.26.0",
]
dynamic = ["version", "description"]

[project.optional-dependencies]
test = [
"pytest==7.1.2",
"black==22.6.0",
"isort==5.10.1",
"flake8==4.0.1",
]

[project.urls]
Home = "https://github.com/synccomputingcode/spark_log_parser"

@@ -38,3 +44,12 @@ line-length = 100

[tool.isort]
profile = "black"
line_length = 100

[tool.pytest.ini_options]
pythonpath = [
"."
]
filterwarnings = [
"ignore::UserWarning"
]
47 changes: 26 additions & 21 deletions spark_log_parser/__main__.py
@@ -1,37 +1,42 @@
import argparse
import logging
import os
import sys
import tempfile
from pathlib import Path

logging.captureWarnings(True)

from spark_log_parser.parsing_models.application_model_v2 import sparkApplication

import os
import argparse
import sys
from spark_log_parser.eventlog import EventLogBuilder # noqa: E402
from spark_log_parser.parsing_models.application_model_v2 import sparkApplication # noqa: E402

logger = logging.getLogger("spark_log_parser")

parser = argparse.ArgumentParser("spark_log_parser")
parser.add_argument("-l", "--log-file", required=True, help="path to event log")
parser.add_argument("-r", "--result-dir", required=True, help="path to directory in which to save parsed logs")
args = parser.parse_args()

print("\n" + "*" * 12 + " Running the Log Parser for Spark Predictor " + "*" * 12 + "\n")

log_path = os.path.abspath(args.log_file)
parser = argparse.ArgumentParser("spark_log_parser")
parser.add_argument("-l", "--log-file", required=True, type=Path, help="path to event log")
parser.add_argument(
    "-r", "--result-dir", required=True, help="path to directory in which to save parsed logs"
)
args = parser.parse_args()

if not os.path.isdir(args.result_dir):
    logger.error("%s is not a directory", args.result_dir)
    sys.exit(1)

print("\n--Processing log file: " + log_path)
print("\n" + "*" * 12 + " Running the Log Parser for Spark Predictor " + "*" * 12 + "\n")
print("--Processing log file: " + str(args.log_file))

log_name = os.path.basename(log_path)
result_path = os.path.join(args.result_dir, "parsed-" + log_name)
with tempfile.TemporaryDirectory() as work_dir:
    event_log = EventLogBuilder(args.log_file.resolve().as_uri(), work_dir).build()
    app = sparkApplication(eventlog=str(event_log))

if os.path.exists(result_path):
    os.remove(result_path)
if args.log_file.suffixes:
    result_path = os.path.join(
        args.result_dir, "parsed-" + args.log_file.name[: -len("".join(args.log_file.suffixes))]
    )
else:
    result_path = os.path.join(args.result_dir, "parsed-" + args.log_file.name)

appobj = sparkApplication(eventlog=log_path)
appobj.save(result_path)
app.save(result_path)

print(f"--Log directory saved to: {result_path}")
print(f"--Result saved to: {result_path}.json")
73 changes: 73 additions & 0 deletions spark_log_parser/eventlog.py
@@ -0,0 +1,73 @@
import json
import tempfile
from pathlib import Path
from urllib.parse import ParseResult

import pandas as pd

from spark_log_parser.extractor import Extractor


class EventLogBuilder:
    def __init__(self, source_url: ParseResult | str, work_dir: Path | str, s3_client=None):
        self.source_url = source_url
        self.work_dir = self._validate_work_dir(work_dir)
        self.s3_client = s3_client
        self.extractor = Extractor(self.source_url, self.work_dir, self.s3_client)

    def _validate_work_dir(self, work_dir: Path | str) -> Path:
        work_dir_path = work_dir if isinstance(work_dir, Path) else Path(work_dir)
        if not work_dir_path.is_dir():
            raise ValueError("Path is not a directory")

        return work_dir_path

    def build(self) -> Path:
        event_logs = self.extractor.extract()

        self.event_log = self._concat(event_logs)

        return self.event_log

    def _concat(self, event_logs: list[Path]) -> Path:
        if len(event_logs) == 1:
            return event_logs[0]

        dat = []
        for log in event_logs:
            with open(log) as log_file:
                try:
                    line = json.loads(log_file.readline())
                except ValueError:
                    continue  # Maybe a Databricks pricing file
                if line["Event"] == "DBCEventLoggingListenerMetadata":
                    dat.append((line["Rollover Number"], line["SparkContext Id"], log))
                else:
                    raise ValueError("Expected DBC event not found")

        df = pd.DataFrame(dat, columns=["rollover_index", "context_id", "path"]).sort_values(
            "rollover_index"
        )

        self._validate_rollover_logs(df)

        event_log = Path(tempfile.mkstemp(suffix="-concatenated.json", dir=str(self.work_dir))[1])
        with open(event_log, "w") as fobj:
            for path in df.path:
                with open(path) as part_fobj:
                    for line in part_fobj:
                        fobj.write(line)

        return event_log

    def _validate_rollover_logs(self, df: pd.DataFrame):
        if not len(df.context_id.unique()) == 1:
            raise ValueError("Not all rollover files have the same Spark context ID")

        diffs = df.rollover_index.diff()[1:]

        if any(diffs > 1) or df.rollover_index[0] > 0:
            raise ValueError("Rollover file appears to be missing")

        if any(diffs < 1):
            raise ValueError("Duplicate rollover file detected")