2 changes: 1 addition & 1 deletion README.md
@@ -19,7 +19,7 @@ If you have not already done so, complete the [instructions](https://github.com/
 log file destination with the -l flag.
 
 ```shell
-python3 -m spark_log_parser -l <log file location> -r <results directory>
+spark-log-parser -l <log file location> -r <results directory>
 ```
 
 The parsed file `parsed-<log file name>` will appear in the results directory.
42 changes: 0 additions & 42 deletions parse.sh

This file was deleted.

3 changes: 3 additions & 0 deletions pyproject.toml
@@ -36,6 +36,9 @@ test = [
     "flake8==4.0.1",
 ]
 
+[project.scripts]
+spark-log-parser = "spark_log_parser.cli:main"
+
 [project.urls]
 Home = "https://github.com/synccomputingcode/spark_log_parser"
 
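The `[project.scripts]` entry point is what replaces the deleted `parse.sh`: on install, the packaging tool generates a `spark-log-parser` executable that imports `spark_log_parser.cli` and calls `main()`. Roughly, the generated wrapper amounts to the sketch below (the real file varies by installer and platform):

```python
# Approximate shape of the console-script wrapper that pip/setuptools
# generate for "spark-log-parser = spark_log_parser.cli:main".
import sys

from spark_log_parser.cli import main

if __name__ == "__main__":
    sys.exit(main())
```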
2 changes: 1 addition & 1 deletion spark_log_parser/__init__.py
@@ -1,3 +1,3 @@
 """Tools for providing Spark event log"""
 
-__version__ = "0.0.1"
+__version__ = "0.1.0"
50 changes: 2 additions & 48 deletions spark_log_parser/__main__.py
@@ -1,49 +1,3 @@
-import argparse
-import logging
-import sys
-import tempfile
-from pathlib import Path
+from spark_log_parser.cli import main
 
-logging.captureWarnings(True)
-
-from spark_log_parser.eventlog import EventLogBuilder  # noqa: E402
-from spark_log_parser.parsing_models.application_model_v2 import sparkApplication  # noqa: E402
-
-logger = logging.getLogger("spark_log_parser")
-
-
-parser = argparse.ArgumentParser("spark_log_parser")
-parser.add_argument(
-    "-l", "--log-file", required=True, type=Path, help="path to event log file or directory"
-)
-parser.add_argument(
-    "-r",
-    "--result-dir",
-    required=True,
-    type=Path,
-    help="path to directory in which to save the parsed log",
-)
-args = parser.parse_args()
-
-if not args.result_dir.is_dir():
-    logger.error("%s is not a directory", args.result_dir)
-    sys.exit(1)
-
-
-print("\n" + "*" * 12 + " Running the Log Parser for Spark Predictor " + "*" * 12 + "\n")
-print("--Processing log file: " + str(args.log_file))
-
-with tempfile.TemporaryDirectory() as work_dir:
-    event_log = EventLogBuilder(args.log_file.resolve().as_uri(), work_dir).build()
-    app = sparkApplication(eventlog=str(event_log))
-
-    if args.log_file.suffixes:
-        result_path = args.result_dir.joinpath(
-            "parsed-" + args.log_file.name[: -len("".join(args.log_file.suffixes))]
-        )
-    else:
-        result_path = args.result_dir.joinpath("parsed-" + args.log_file.name)
-
-    app.save(str(result_path))
-
-    print(f"--Result saved to: {result_path}.json")
+main()
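With `__main__.py` reduced to a delegate, `python -m spark_log_parser` and the new `spark-log-parser` script share one code path, and `main()` becomes importable in tests. A hypothetical smoke test for the new `--version` flag, assuming pytest's `monkeypatch` and `capsys` fixtures (not part of this PR):

```python
import sys

import pytest

from spark_log_parser.cli import main


def test_version_flag(monkeypatch, capsys):
    # argparse's "version" action prints to stdout and raises SystemExit(0)
    monkeypatch.setattr(sys, "argv", ["spark-log-parser", "--version"])
    with pytest.raises(SystemExit) as exc_info:
        main()
    assert exc_info.value.code == 0
    assert "spark-log-parser 0.1.0" in capsys.readouterr().out
```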
54 changes: 54 additions & 0 deletions spark_log_parser/cli.py
@@ -0,0 +1,54 @@
+import argparse
+import logging
+import sys
+import tempfile
+from pathlib import Path
+
+import spark_log_parser
+
+logging.captureWarnings(True)
+
+from spark_log_parser.eventlog import EventLogBuilder  # noqa: E402
+from spark_log_parser.parsing_models.application_model_v2 import sparkApplication  # noqa: E402
+
+logger = logging.getLogger("spark_log_parser")
+
+
+def main():
+    parser = argparse.ArgumentParser("spark-log-parser")
+    parser.add_argument(
+        "-l", "--log-file", required=True, type=Path, help="path to event log file or directory"
+    )
+    parser.add_argument(
+        "-r",
+        "--result-dir",
+        required=True,
+        type=Path,
+        help="path to directory in which to save the parsed log",
+    )
+    parser.add_argument(
+        "--version", action="version", version="%(prog)s " + spark_log_parser.__version__
+    )
+    args = parser.parse_args()
+
+    if not args.result_dir.is_dir():
+        logger.error("%s is not a directory", args.result_dir)
+        sys.exit(1)
+
+    print("\n" + "*" * 12 + " Running the Log Parser for Spark Predictor " + "*" * 12 + "\n")
+    print("--Processing log file: " + str(args.log_file))
+
+    with tempfile.TemporaryDirectory() as work_dir:
+        event_log = EventLogBuilder(args.log_file.resolve().as_uri(), work_dir).build()
+        app = sparkApplication(eventlog=str(event_log))
+
+        if args.log_file.suffixes:
+            result_path = args.result_dir.joinpath(
+                "parsed-" + args.log_file.name[: -len("".join(args.log_file.suffixes))]
+            )
+        else:
+            result_path = args.result_dir.joinpath("parsed-" + args.log_file.name)
+
+        app.save(str(result_path))
+
+        print(f"--Result saved to: {result_path}.json")
14 changes: 11 additions & 3 deletions spark_log_parser/eventlog.py
@@ -5,15 +5,23 @@
 
 import pandas as pd
 
-from spark_log_parser.extractor import Extractor
+from spark_log_parser.extractor import Extractor, ExtractThresholds
 
 
 class EventLogBuilder:
-    def __init__(self, source_url: ParseResult | str, work_dir: Path | str, s3_client=None):
+    def __init__(
+        self,
+        source_url: ParseResult | str,
+        work_dir: Path | str,
+        s3_client=None,
+        extract_thresholds=ExtractThresholds(),
+    ):
         self.source_url = source_url
         self.work_dir = self._validate_work_dir(work_dir)
         self.s3_client = s3_client
-        self.extractor = Extractor(self.source_url, self.work_dir, self.s3_client)
+        self.extractor = Extractor(
+            self.source_url, self.work_dir, self.s3_client, extract_thresholds
+        )
 
     def _validate_work_dir(self, work_dir: Path | str) -> Path:
         work_dir_path = work_dir if isinstance(work_dir, Path) else Path(work_dir)
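The `extract_thresholds=ExtractThresholds()` default preserves the old behavior, while callers can now tune the limits per build. Note the default instance is created once at definition time and shared across calls, which is fine as long as nothing mutates it. A minimal sketch of the new parameter, with a hypothetical local archive and threshold value:

```python
import tempfile

from spark_log_parser.eventlog import EventLogBuilder
from spark_log_parser.extractor import ExtractThresholds

with tempfile.TemporaryDirectory() as work_dir:
    builder = EventLogBuilder(
        "file:///tmp/eventlog.zip",  # hypothetical source URL
        work_dir,
        extract_thresholds=ExtractThresholds(entries=500),  # allow more files
    )
    event_log = builder.build()
```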
31 changes: 21 additions & 10 deletions spark_log_parser/extractor.py
@@ -8,6 +8,13 @@
 from urllib.parse import ParseResult, urlparse
 
 import requests
+from pydantic import BaseModel
 
 
+class ExtractThresholds(BaseModel):
+    entries = 100
+    size = 5000000000
+    ratio = 100
+
+
 class Extractor:
@@ -18,17 +25,21 @@ class Extractor:
     ALLOWED_SCHEMES = {"https", "s3", "file"}
     FILE_SKIP_PATTERNS = [".DS_Store".lower(), "__MACOSX".lower(), "/."]
 
-    THRESHOLD_ENTRIES = 100
-    THRESHOLD_SIZE = 5000000000
-    THRESHOLD_RATIO = 100
-
-    def __init__(self, source_url: ParseResult | str, work_dir: Path | str, s3_client=None):
+    def __init__(
+        self,
+        source_url: ParseResult | str,
+        work_dir: Path | str,
+        s3_client=None,
+        thresholds=ExtractThresholds(),
+    ):
         self.source_url = self._validate_url(source_url)
         self.work_dir = self._validate_work_dir(work_dir)
         self.s3_client = self._validate_s3_client(s3_client)
         self.file_total = 0
         self.size_total = 0
 
+        self.thresholds = thresholds
+
     def _validate_url(self, url: ParseResult | str) -> ParseResult:
         parsed_url = url if isinstance(url, ParseResult) else urlparse(url)
         if parsed_url.scheme not in self.ALLOWED_SCHEMES:
@@ -204,13 +215,13 @@ def _add_to_stats_and_verify(self, size, count=1):
         self.file_total += count
 
         ratio = size / self.size_total
-        if ratio > self.THRESHOLD_RATIO:
+        if ratio > self.thresholds.ratio:
             raise AssertionError("Encountered suspicious compression ratio in the archive")
 
-        if self.size_total > self.THRESHOLD_SIZE:
+        if self.size_total > self.thresholds.size:
             raise AssertionError("The archive is too big")
 
-        if self.file_total > self.THRESHOLD_ENTRIES:
+        if self.file_total > self.thresholds.entries:
             raise AssertionError("Too many files in the archive")
 
     def _remove_from_stats(self, size, count=1):
@@ -236,9 +247,9 @@ def _download(self):
             for content in result["Contents"]:
                 s3_content_count += 1
                 s3_content_size += content["Size"]
-                if s3_content_count > self.THRESHOLD_ENTRIES:
+                if s3_content_count > self.thresholds.entries:
                     raise AssertionError("Too many objects at %s" % self.source_url)
-                if s3_content_size > self.THRESHOLD_SIZE:
+                if s3_content_size > self.thresholds.size:
                     raise AssertionError(
                         "Size limit exceeded while downloading from %s" % self.source_url
                     )
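One portability note on `ExtractThresholds`: bare assignments like `entries = 100` rely on pydantic v1's field inference and are rejected by pydantic v2. The annotated form below is equivalent on v1 and required on v2 (a sketch, not part of this PR):

```python
from pydantic import BaseModel


class ExtractThresholds(BaseModel):
    # Explicit annotations: equivalent under pydantic v1, required under v2
    entries: int = 100
    size: int = 5_000_000_000
    ratio: int = 100
```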