From 25c23fc01318dc00e9334bac4982b7a0df9f57b8 Mon Sep 17 00:00:00 2001
From: Scott Romney
Date: Mon, 15 Aug 2022 14:01:00 -0700
Subject: [PATCH 1/4] Added console_script, updated version and made
 extraction thresholds configurable

---
 README.md                     |  2 +-
 parse.sh                      | 11 +++++++++--
 pyproject.toml                |  3 +++
 spark_log_parser/__init__.py  |  2 +-
 spark_log_parser/__main__.py  |  5 +++++
 spark_log_parser/eventlog.py  | 14 +++++++++++---
 spark_log_parser/extractor.py | 31 +++++++++++++++++++++----------
 7 files changed, 51 insertions(+), 17 deletions(-)

diff --git a/README.md b/README.md
index 5626274..2552991 100644
--- a/README.md
+++ b/README.md
@@ -19,7 +19,7 @@ If you have not already done so, complete the [instructions](https://github.com
 log file destination with the -l flag.
 
 ```shell
-python3 -m spark_log_parser -l <event log> -r <results dir>
+spark_log_parser -l <event log> -r <results dir>
 ```
 
 The parsed file `parsed-<event log>` will appear in the results directory.
diff --git a/parse.sh b/parse.sh
index 9355963..61dd9b9 100755
--- a/parse.sh
+++ b/parse.sh
@@ -4,14 +4,21 @@ HOME="$( cd -- "$(dirname "$0")" &>/dev/null; pwd -P )"
 
 function print_usage() {
 	cat <<-EOD
-	Usage: $0 -l <event log> [-r <results dir>]
+	Usage:
+	  $0 -l <event log> [-r <results dir>]
+	  $0 --version
 	Options:
 	  -l <path>	path to event log file
 	  -r <path>	directory in which to save parsed logs
+	  --version	print version information and exit
 	  -h		show this helpful message and exit
 	EOD
 }
 
+if [[ $1 == "--version" ]]; then
+	exec spark_log_parser --version
+fi
+
 results_dir="$HOME/results"
 
 while getopts l:r:h name; do
@@ -39,4 +46,4 @@ if [[ -z $event_log ]]; then
 	exit 1
 fi
 
-exec python3 -m spark_log_parser -l "$event_log" -r "$results_dir"
+exec spark_log_parser -l "$event_log" -r "$results_dir"
diff --git a/pyproject.toml b/pyproject.toml
index d05b7f8..bcfc485 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -36,6 +36,9 @@ test = [
     "flake8==4.0.1",
 ]
 
+[project.scripts]
+parse = "spark_log_parser:main"
+
 [project.urls]
 Home = "https://github.com/synccomputingcode/spark_log_parser"
 
diff --git a/spark_log_parser/__init__.py b/spark_log_parser/__init__.py
index f84f915..bd390b2 100644
--- a/spark_log_parser/__init__.py
+++ b/spark_log_parser/__init__.py
@@ -1,3 +1,3 @@
 """Tools for providing Spark event log"""
 
-__version__ = "0.0.1"
+__version__ = "0.0.3"
diff --git a/spark_log_parser/__main__.py b/spark_log_parser/__main__.py
index 9aaf01f..bbf48d1 100644
--- a/spark_log_parser/__main__.py
+++ b/spark_log_parser/__main__.py
@@ -4,6 +4,8 @@
 import tempfile
 from pathlib import Path
 
+import spark_log_parser
+
 logging.captureWarnings(True)
 
 from spark_log_parser.eventlog import EventLogBuilder  # noqa: E402
@@ -23,6 +25,9 @@
     type=Path,
     help="path to directory in which to save the parsed log",
 )
+parser.add_argument(
+    "--version", action="version", version="%(prog)s " + spark_log_parser.__version__
+)
 args = parser.parse_args()
 
 if not args.result_dir.is_dir():
diff --git a/spark_log_parser/eventlog.py b/spark_log_parser/eventlog.py
index 8a03951..950266d 100644
--- a/spark_log_parser/eventlog.py
+++ b/spark_log_parser/eventlog.py
@@ -5,15 +5,23 @@
 
 import pandas as pd
 
-from spark_log_parser.extractor import Extractor
+from spark_log_parser.extractor import Extractor, ExtractThresholds
 
 
 class EventLogBuilder:
-    def __init__(self, source_url: ParseResult | str, work_dir: Path | str, s3_client=None):
+    def __init__(
+        self,
+        source_url: ParseResult | str,
+        work_dir: Path | str,
+        s3_client=None,
+        extract_thresholds=ExtractThresholds(),
+    ):
         self.source_url = source_url
         self.work_dir = self._validate_work_dir(work_dir)
         self.s3_client = s3_client
-        self.extractor = Extractor(self.source_url, self.work_dir, self.s3_client)
+        self.extractor = Extractor(
+            self.source_url, self.work_dir, self.s3_client, extract_thresholds
+        )
 
     def _validate_work_dir(self, work_dir: Path | str) -> Path:
         work_dir_path = work_dir if isinstance(work_dir, Path) else Path(work_dir)
diff --git a/spark_log_parser/extractor.py b/spark_log_parser/extractor.py
index afce1a7..d1792e7 100644
--- a/spark_log_parser/extractor.py
+++ b/spark_log_parser/extractor.py
@@ -8,6 +8,13 @@
 from urllib.parse import ParseResult, urlparse
 
 import requests
+from pydantic import BaseModel
+
+
+class ExtractThresholds(BaseModel):
+    entries = 100
+    size = 5000000000
+    ratio = 100
 
 
 class Extractor:
@@ -18,17 +25,21 @@ class Extractor:
     ALLOWED_SCHEMES = {"https", "s3", "file"}
     FILE_SKIP_PATTERNS = [".DS_Store".lower(), "__MACOSX".lower(), "/."]
 
-    THRESHOLD_ENTRIES = 100
-    THRESHOLD_SIZE = 5000000000
-    THRESHOLD_RATIO = 100
-
-    def __init__(self, source_url: ParseResult | str, work_dir: Path | str, s3_client=None):
+    def __init__(
+        self,
+        source_url: ParseResult | str,
+        work_dir: Path | str,
+        s3_client=None,
+        thresholds=ExtractThresholds(),
+    ):
         self.source_url = self._validate_url(source_url)
         self.work_dir = self._validate_work_dir(work_dir)
         self.s3_client = self._validate_s3_client(s3_client)
         self.file_total = 0
         self.size_total = 0
 
+        self.thresholds = thresholds
+
     def _validate_url(self, url: ParseResult | str) -> ParseResult:
         parsed_url = url if isinstance(url, ParseResult) else urlparse(url)
         if parsed_url.scheme not in self.ALLOWED_SCHEMES:
@@ -204,13 +215,13 @@ def _add_to_stats_and_verify(self, size, count=1):
         self.file_total += count
 
         ratio = size / self.size_total
-        if ratio > self.THRESHOLD_RATIO:
+        if ratio > self.thresholds.ratio:
             raise AssertionError("Encountered suspicious compression ratio in the archive")
 
-        if self.size_total > self.THRESHOLD_SIZE:
+        if self.size_total > self.thresholds.size:
             raise AssertionError("The archive is too big")
 
-        if self.file_total > self.THRESHOLD_ENTRIES:
+        if self.file_total > self.thresholds.entries:
             raise AssertionError("Too many files in the archive")
 
     def _remove_from_stats(self, size, count=1):
@@ -236,9 +247,9 @@ def _download(self):
             for content in result["Contents"]:
                 s3_content_count += 1
                 s3_content_size += content["Size"]
-                if s3_content_count > self.THRESHOLD_ENTRIES:
+                if s3_content_count > self.thresholds.entries:
                     raise AssertionError("Too many objects at %s" % self.source_url)
-                if s3_content_size > self.THRESHOLD_SIZE:
+                if s3_content_size > self.thresholds.size:
                     raise AssertionError(
                         "Size limit exceeded while downloading from %s" % self.source_url
                     )

From e1787e09ff594a98d268e5853623eeb1f88dee13 Mon Sep 17 00:00:00 2001
From: Scott Romney
Date: Mon, 15 Aug 2022 17:00:29 -0700
Subject: [PATCH 2/4] Update script and move main to the cli module

---
 parse.sh                     |  9 +++---
 pyproject.toml               |  2 +-
 spark_log_parser/__init__.py |  2 +-
 spark_log_parser/__main__.py | 55 ++----------------------------------
 spark_log_parser/cli.py      | 54 +++++++++++++++++++++++++++++++++++
 5 files changed, 62 insertions(+), 60 deletions(-)
 create mode 100644 spark_log_parser/cli.py

diff --git a/parse.sh b/parse.sh
index 61dd9b9..7c1c526 100755
--- a/parse.sh
+++ b/parse.sh
@@ -1,6 +1,7 @@
 #!/bin/bash
 
 HOME="$( cd -- "$(dirname "$0")" &>/dev/null; pwd -P )"
+results_dir="$HOME/results"
 
 function print_usage() {
 	cat <<-EOD
@@ -9,18 +10,16 @@ function print_usage() {
 	  $0 --version
 	Options:
 	  -l <path>	path to event log file
-	  -r <path>	directory in which to save parsed logs
+	  -r <path>	directory in which to save parsed logs (default $results_dir)
 	  --version	print version information and exit
 	  -h		show this helpful message and exit
 	EOD
 }
 
 if [[ $1 == "--version" ]]; then
-	exec spark_log_parser --version
+	exec spark-log-parser --version
 fi
 
-results_dir="$HOME/results"
-
 while getopts l:r:h name; do
 	case $name in
 	l)
@@ -46,4 +45,4 @@ if [[ -z $event_log ]]; then
 	exit 1
 fi
 
-exec spark_log_parser -l "$event_log" -r "$results_dir"
+exec spark-log-parser -l "$event_log" -r "$results_dir"
diff --git a/pyproject.toml b/pyproject.toml
index bcfc485..017f7f4 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -37,7 +37,7 @@ test = [
 ]
 
 [project.scripts]
-parse = "spark_log_parser:main"
+spark-log-parser = "spark_log_parser.cli:main"
 
 [project.urls]
 Home = "https://github.com/synccomputingcode/spark_log_parser"
diff --git a/spark_log_parser/__init__.py b/spark_log_parser/__init__.py
index bd390b2..72dd153 100644
--- a/spark_log_parser/__init__.py
+++ b/spark_log_parser/__init__.py
@@ -1,3 +1,3 @@
 """Tools for providing Spark event log"""
 
-__version__ = "0.0.3"
+__version__ = "0.1.0"
diff --git a/spark_log_parser/__main__.py b/spark_log_parser/__main__.py
index bbf48d1..1774d6d 100644
--- a/spark_log_parser/__main__.py
+++ b/spark_log_parser/__main__.py
@@ -1,54 +1,3 @@
-import argparse
-import logging
-import sys
-import tempfile
-from pathlib import Path
+from spark_log_parser.cli import main
 
-import spark_log_parser
-
-logging.captureWarnings(True)
-
-from spark_log_parser.eventlog import EventLogBuilder  # noqa: E402
-from spark_log_parser.parsing_models.application_model_v2 import sparkApplication  # noqa: E402
-
-logger = logging.getLogger("spark_log_parser")
-
-
-parser = argparse.ArgumentParser("spark_log_parser")
-parser.add_argument(
-    "-l", "--log-file", required=True, type=Path, help="path to event log file or directory"
-)
-parser.add_argument(
-    "-r",
-    "--result-dir",
-    required=True,
-    type=Path,
-    help="path to directory in which to save the parsed log",
-)
-parser.add_argument(
-    "--version", action="version", version="%(prog)s " + spark_log_parser.__version__
-)
-args = parser.parse_args()
-
-if not args.result_dir.is_dir():
-    logger.error("%s is not a directory", args.result_dir)
-    sys.exit(1)
-
-
-print("\n" + "*" * 12 + " Running the Log Parser for Spark Predictor " + "*" * 12 + "\n")
-print("--Processing log file: " + str(args.log_file))
-
-with tempfile.TemporaryDirectory() as work_dir:
-    event_log = EventLogBuilder(args.log_file.resolve().as_uri(), work_dir).build()
-    app = sparkApplication(eventlog=str(event_log))
-
-if args.log_file.suffixes:
-    result_path = args.result_dir.joinpath(
-        "parsed-" + args.log_file.name[: -len("".join(args.log_file.suffixes))]
-    )
-else:
-    result_path = args.result_dir.joinpath("parsed-" + args.log_file.name)
-
-app.save(str(result_path))
-
-print(f"--Result saved to: {result_path}.json")
+main()
diff --git a/spark_log_parser/cli.py b/spark_log_parser/cli.py
new file mode 100644
index 0000000..670bb1f
--- /dev/null
+++ b/spark_log_parser/cli.py
@@ -0,0 +1,54 @@
+import argparse
+import logging
+import sys
+import tempfile
+from pathlib import Path
+
+import spark_log_parser
+
+logging.captureWarnings(True)
+
+from spark_log_parser.eventlog import EventLogBuilder  # noqa: E402
+from spark_log_parser.parsing_models.application_model_v2 import sparkApplication  # noqa: E402
+
+logger = logging.getLogger("spark_log_parser")
+
+
+def main():
+    parser = argparse.ArgumentParser("spark-log-parser")
+    parser.add_argument(
+        "-l", "--log-file", required=True, type=Path, help="path to event log file or directory"
+    )
+    parser.add_argument(
+        "-r",
+        "--result-dir",
+        required=True,
+        type=Path,
+        help="path to directory in which to save the parsed log",
+    )
+    parser.add_argument(
+        "--version", action="version", version="%(prog)s " + spark_log_parser.__version__
+    )
+    args = parser.parse_args()
+
+    if not args.result_dir.is_dir():
+        logger.error("%s is not a directory", args.result_dir)
+        sys.exit(1)
+
+    print("\n" + "*" * 12 + " Running the Log Parser for Spark Predictor " + "*" * 12 + "\n")
+    print("--Processing log file: " + str(args.log_file))
+
+    with tempfile.TemporaryDirectory() as work_dir:
+        event_log = EventLogBuilder(args.log_file.resolve().as_uri(), work_dir).build()
+        app = sparkApplication(eventlog=str(event_log))
+
+    if args.log_file.suffixes:
+        result_path = args.result_dir.joinpath(
+            "parsed-" + args.log_file.name[: -len("".join(args.log_file.suffixes))]
+        )
+    else:
+        result_path = args.result_dir.joinpath("parsed-" + args.log_file.name)
+
+    app.save(str(result_path))
+
+    print(f"--Result saved to: {result_path}.json")

From c7dc1d3f60974da1e9af9c50a15c8e5b942e50c8 Mon Sep 17 00:00:00 2001
From: Scott Romney
Date: Mon, 15 Aug 2022 17:01:18 -0700
Subject: [PATCH 3/4] Remove the Bash script

---
 parse.sh | 48 ------------------------------------------------
 1 file changed, 48 deletions(-)
 delete mode 100755 parse.sh

diff --git a/parse.sh b/parse.sh
deleted file mode 100755
index 7c1c526..0000000
--- a/parse.sh
+++ /dev/null
@@ -1,48 +0,0 @@
-#!/bin/bash
-
-HOME="$( cd -- "$(dirname "$0")" &>/dev/null; pwd -P )"
-results_dir="$HOME/results"
-
-function print_usage() {
-	cat <<-EOD
-	Usage:
-	  $0 -l <event log> [-r <results dir>]
-	  $0 --version
-	Options:
-	  -l <path>	path to event log file
-	  -r <path>	directory in which to save parsed logs (default $results_dir)
-	  --version	print version information and exit
-	  -h		show this helpful message and exit
-	EOD
-}
-
-if [[ $1 == "--version" ]]; then
-	exec spark-log-parser --version
-fi
-
-while getopts l:r:h name; do
-	case $name in
-	l)
-		event_log="$OPTARG"
-		;;
-	r)
-		results_dir="$OPTARG"
-		;;
-	h)
-		print_usage
-		exit
-		;;
-	*)
-		>&2 print_usage
-		exit 1
-		;;
-	esac
-done
-
-if [[ -z $event_log ]]; then
-	>&2 echo "event log file path is required"
-	>&2 print_usage
-	exit 1
-fi
-
-exec spark-log-parser -l "$event_log" -r "$results_dir"

From b277e4e5b583e211d1ea95e5abaeb8c3016bc8f9 Mon Sep 17 00:00:00 2001
From: Scott Romney
Date: Mon, 15 Aug 2022 17:02:15 -0700
Subject: [PATCH 4/4] Update README

---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 2552991..0d67928 100644
--- a/README.md
+++ b/README.md
@@ -19,7 +19,7 @@ If you have not already done so, complete the [instructions](https://github.com
 log file destination with the -l flag.
 
 ```shell
-spark_log_parser -l <event log> -r <results dir>
+spark-log-parser -l <event log> -r <results dir>
 ```
 
 The parsed file `parsed-<event log>` will appear in the results directory.