Merged
2 changes: 1 addition & 1 deletion spark_log_parser/__init__.py
@@ -1,3 +1,3 @@
"""Tools for providing Spark event log"""

__version__ = "0.1.1"
__version__ = "0.1.2"
7 changes: 6 additions & 1 deletion spark_log_parser/cli.py
@@ -3,12 +3,14 @@
import sys
import tempfile
from pathlib import Path
from urllib.parse import unquote

import spark_log_parser

logging.captureWarnings(True)

from spark_log_parser.eventlog import EventLogBuilder # noqa: E402
from spark_log_parser.extractor import Extractor # noqa: E402
from spark_log_parser.parsing_models.application_model_v2 import sparkApplication # noqa: E402

logger = logging.getLogger("spark_log_parser")
@@ -39,7 +41,10 @@ def main():
print("--Processing log file: " + str(args.log_file))

with tempfile.TemporaryDirectory() as work_dir:
event_log = EventLogBuilder(args.log_file.resolve().as_uri(), work_dir).build()

log_path = unquote(args.log_file.resolve().as_uri())
event_log_paths = Extractor(log_path, work_dir).extract()
event_log = EventLogBuilder(event_log_paths, work_dir).build()
app = sparkApplication(eventlog=str(event_log))

if args.log_file.suffixes:
30 changes: 13 additions & 17 deletions spark_log_parser/eventlog.py
@@ -1,27 +1,22 @@
import json
import tempfile
from pathlib import Path
from urllib.parse import ParseResult

import pandas as pd

from spark_log_parser.extractor import Extractor, ExtractThresholds


class EventLogBuilder:
def __init__(
self,
source_url: ParseResult | str,
event_log_paths: list[Path] | list[str],
work_dir: Path | str,
s3_client=None,
extract_thresholds=ExtractThresholds(),
):
self.source_url = source_url

self.event_log_paths = self._validate_event_log_paths(event_log_paths)
self.work_dir = self._validate_work_dir(work_dir)
self.s3_client = s3_client
self.extractor = Extractor(
self.source_url, self.work_dir, self.s3_client, extract_thresholds
)

def _validate_event_log_paths(self, event_log_paths: list[Path] | list[str]) -> list[Path]:
return [Path(x) for x in event_log_paths]

def _validate_work_dir(self, work_dir: Path | str) -> Path:
work_dir_path = work_dir if isinstance(work_dir, Path) else Path(work_dir)
@@ -31,12 +26,11 @@ def _validate_work_dir(self, work_dir: Path | str) -> Path:
return work_dir_path

def build(self) -> Path:
paths = self.extractor.extract()

if not paths:
if not self.event_log_paths:
raise ValueError("No files found")

self.event_log = self._get_event_log(paths)
self.event_log = self._get_event_log(self.event_log_paths)

return self.event_log

@@ -68,9 +62,11 @@ def _get_event_log(self, paths: list[Path]) -> Path:
return log_files[0]

def _concat(self, rollover_dat: list[tuple[str, str, str]]) -> Path:
rollover_df = pd.DataFrame(
rollover_dat, columns=["rollover_index", "context_id", "path"]
).sort_values("rollover_index")
rollover_df = (
pd.DataFrame(rollover_dat, columns=["rollover_index", "context_id", "path"])
.sort_values("rollover_index")
.reset_index()
)

if not len(rollover_df.context_id.unique()) == 1:
raise ValueError("Not all rollover log files have the same Spark context ID")
Binary file added tests/logs/log_in_dir/databricks.json.gz
Binary file not shown.
47 changes: 20 additions & 27 deletions tests/test_bad_eventlog.py
@@ -3,46 +3,41 @@
from pathlib import Path
from zipfile import ZipFile

from spark_log_parser.eventlog import EventLogBuilder
from spark_log_parser import eventlog, extractor


class BadEventLog(unittest.TestCase):
def test_multiple_context_ids(self):
event_log = Path("tests", "logs", "bad", "non-unique-context-id.zip").resolve()
def check_value_error(self, event_log_path, msg):

with tempfile.TemporaryDirectory() as temp_dir:
with self.assertRaises(
ValueError, msg="Not all rollover files have the same Spark context ID"
):
EventLogBuilder(event_log.as_uri(), temp_dir).build()
with self.assertRaises(ValueError) as cm:

event_log_paths = extractor.Extractor(event_log_path.as_uri(), temp_dir).extract()
eventlog.EventLogBuilder(event_log_paths, temp_dir).build()
Contributor

Nice abstraction. I made an error when I coded the first version of this, thinking that assertRaises would assert that the exception's message matches the value of its "msg" argument. That's not the case: https://docs.python.org/3/library/unittest.html#unittest.TestCase.assertRaises

I should've done,

with self.assertRaises(ValueError) as cm:
    event_log_paths = extractor.Extractor(event_log_path.as_uri(), temp_dir).extract()
    eventlog.EventLogBuilder(event_log_paths, temp_dir).build()
assert str(cm.exception) == msg, "Exception message matches"
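
A related alternative, not part of the original suggestion: unittest's assertRaisesRegex checks the exception text itself, so the same check could be written roughly as below (re.escape because the second argument is treated as a regular expression):

import re

with self.assertRaisesRegex(ValueError, re.escape(msg)):
    event_log_paths = extractor.Extractor(event_log_path.as_uri(), temp_dir).extract()
    eventlog.EventLogBuilder(event_log_paths, temp_dir).build()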

Contributor Author

Done! Had to fix some of the error messages to match, too.

btw, curious why you went with unittest here instead of using pytest's exception testing capabilities.

Contributor

I don't recall having made a deliberate choice between the two, but I'm interested in any differences between them, or in examples with the pytest alternative. I do like that these related tests are grouped in a class, and that class offers assertRaises... felt right
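
For comparison, a minimal sketch of the pytest alternative mentioned above, assuming the same shape as the check_value_error helper; pytest.raises(..., match=...) does a regex search against the exception message, hence the re.escape:

import re
import tempfile

import pytest

from spark_log_parser import eventlog, extractor


def check_value_error(event_log_path, msg):
    # match= is applied as a regex search over str(exception), so escape literal messages
    with tempfile.TemporaryDirectory() as temp_dir:
        with pytest.raises(ValueError, match=re.escape(msg)):
            event_log_paths = extractor.Extractor(event_log_path.as_uri(), temp_dir).extract()
            eventlog.EventLogBuilder(event_log_paths, temp_dir).build()

The grouping could still be kept by putting these tests in a plain class whose name starts with Test, which pytest also collects.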

Contributor


assert str(cm.exception) == msg # , "Exception message matches"

def test_multiple_context_ids(self):
event_log = Path("tests", "logs", "bad", "non-unique-context-id.zip").resolve()
self.check_value_error(
event_log, "Not all rollover log files have the same Spark context ID"
)

def test_missing_dbc_event(self):
event_log = Path("tests", "logs", "bad", "missing-dbc-event.zip").resolve()

with tempfile.TemporaryDirectory() as temp_dir:
with self.assertRaises(ValueError, msg="Expected DBC event not found"):
EventLogBuilder(event_log.as_uri(), temp_dir).build()
self.check_value_error(event_log, "No rollover properties found in log file")

def test_duplicate_log_part(self):
event_log = Path("tests", "logs", "bad", "duplicate-part.tgz").resolve()

with tempfile.TemporaryDirectory() as temp_dir:
with self.assertRaises(ValueError, msg="Duplicate rollover file detected"):
EventLogBuilder(event_log.as_uri(), temp_dir).build()
self.check_value_error(event_log, "Duplicate rollover log file detected")

def test_missing_log_part(self):
event_log = Path("tests", "logs", "bad", "missing-part.zip").resolve()

with tempfile.TemporaryDirectory() as temp_dir:
with self.assertRaises(ValueError, msg="Rollover file appears to be missing"):
EventLogBuilder(event_log.as_uri(), temp_dir).build()
self.check_value_error(event_log, "Rollover log file appears to be missing")

def test_missing_first_part(self):
event_log = Path("tests", "logs", "bad", "missing-first-part.zip").resolve()

with tempfile.TemporaryDirectory() as temp_dir:
with self.assertRaises(ValueError, msg="Rollover file appears to be missing"):
EventLogBuilder(event_log.as_uri(), temp_dir).build()
self.check_value_error(event_log, "Rollover log file appears to be missing")

def test_only_non_first_part(self):
with tempfile.TemporaryDirectory() as temp_dir:
@@ -51,10 +46,8 @@ def test_only_non_first_part(self):
[zinfo for zinfo in zfile.infolist() if not zinfo.is_dir()][0], temp_dir
)

with self.assertRaises(ValueError, msg="Rollover file appears to be missing"):
EventLogBuilder(Path(temp_dir).as_uri(), temp_dir).build()
self.check_value_error(Path(temp_dir), "Rollover log file appears to be missing")

def test_empty_log_dir(self):
with tempfile.TemporaryDirectory() as temp_dir:
with self.assertRaises(ValueError, msg="No log files found"):
EventLogBuilder(Path(temp_dir).as_uri(), temp_dir).build()
self.check_value_error(Path(temp_dir), "No files found")
48 changes: 28 additions & 20 deletions tests/test_eventlog.py
@@ -4,34 +4,32 @@
import zipfile
from pathlib import Path

from spark_log_parser import eventlog
from spark_log_parser import eventlog, extractor


def test_simple_emr_log():
event_log_path = Path("tests", "logs", "emr.zip").resolve()
def get_event(event_log_path):

with tempfile.TemporaryDirectory() as temp_dir:
event_log = eventlog.EventLogBuilder(
source_url=event_log_path.as_uri(), work_dir=temp_dir
).build()
event_log_paths = extractor.Extractor(event_log_path.resolve().as_uri(), temp_dir).extract()
event_log = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()

with open(event_log) as log_fobj:
event = json.loads(log_fobj.readline())

assert all(key in event for key in ["Event", "Spark Version"]), "All keys are present"
return event


def test_simple_emr_log():
event_log_path = Path("tests", "logs", "emr.zip").resolve()
event = get_event(event_log_path)

assert all(key in event for key in ["Event", "Spark Version"]), "All keys are present"
assert event["Event"] == "SparkListenerLogStart", "Expected first event is present"


def test_simple_databricks_log():
event_log_path = Path("tests", "logs", "databricks.zip").resolve()

with tempfile.TemporaryDirectory() as temp_dir:
event_log = eventlog.EventLogBuilder(event_log_path.as_uri(), temp_dir).build()

with open(event_log) as log_fobj:
event = json.loads(log_fobj.readline())

event = get_event(event_log_path)
assert all(
key in event
for key in ["Event", "Spark Version", "Timestamp", "Rollover Number", "SparkContext Id"]
@@ -40,12 +38,19 @@ def test_simple_databricks_log():

def test_raw_databricks_log():
event_log_path = Path("tests", "logs", "databricks.json").resolve()
event = get_event(event_log_path)

with tempfile.TemporaryDirectory() as temp_dir:
event_log = eventlog.EventLogBuilder(event_log_path.as_uri(), temp_dir).build()
assert all(
key in event
for key in ["Event", "Spark Version", "Timestamp", "Rollover Number", "SparkContext Id"]
), "All keys are present"

with open(event_log) as log_fobj:
event = json.loads(log_fobj.readline())
assert event["Event"] == "DBCEventLoggingListenerMetadata", "Expected first event is present"


def test_log_in_dir():
event_log_path = Path("tests", "logs", "log_in_dir", "databricks.json.gz").resolve()
event = get_event(event_log_path)

assert all(
key in event
@@ -71,9 +76,12 @@ def test_databricks_messy_rollover_log_dir(self):
zfile.extractall(temp_dir_path)
self.validate_log(temp_dir_path, 3, 16945)

def validate_log(self, event_log: Path, log_file_total: int, log_entry_total: int):
def validate_log(self, event_log_path: Path, log_file_total: int, log_entry_total: int):
with tempfile.TemporaryDirectory() as temp_dir:
event_log = eventlog.EventLogBuilder(event_log.as_uri(), temp_dir).build()
event_log_paths = extractor.Extractor(
event_log_path.resolve().as_uri(), temp_dir
).extract()
event_log = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()

with open(event_log) as log_fobj:
event = json.loads(log_fobj.readline())
8 changes: 5 additions & 3 deletions tests/test_eventlog_s3.py
@@ -10,7 +10,7 @@
import pytest
from botocore.stub import ANY, Stubber

from spark_log_parser import eventlog
from spark_log_parser import eventlog, extractor


@pytest.mark.parametrize(
@@ -65,7 +65,8 @@ def test_emr_log_from_s3(event_log_url, event_log_file_archive, event_log_s3_dir
chunk = fobj.read(chunk_size)

with tempfile.TemporaryDirectory() as temp_dir, stubber:
event_log = eventlog.EventLogBuilder(event_log_url, temp_dir, s3).build()
event_log_paths = extractor.Extractor(event_log_url, temp_dir, s3).extract()
event_log = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()

with open(event_log) as log_fobj:
event = json.loads(log_fobj.readline())
@@ -134,7 +135,8 @@ def test_databricks_log_from_s3_dir(event_log_url, event_log_file_archive, event
chunk = zobj.read(chunk_size)

with stubber:
event_log = eventlog.EventLogBuilder(event_log_url, temp_dir, s3).build()
event_log_paths = extractor.Extractor(event_log_url, temp_dir, s3).extract()
event_log = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()

stubber.assert_no_pending_responses()

28 changes: 15 additions & 13 deletions tests/test_parse.py
@@ -2,22 +2,30 @@
import tempfile
from pathlib import Path

from spark_log_parser import eventlog
from spark_log_parser import eventlog, extractor
from spark_log_parser.parsing_models.application_model_v2 import sparkApplication


def test_simple_databricks_log():
event_log_path = Path("tests", "logs", "databricks.zip").resolve()
def get_parsed_log(event_log_path):

with tempfile.TemporaryDirectory() as temp_dir:
event_log = eventlog.EventLogBuilder(event_log_path.as_uri(), temp_dir).build()
event_log_paths = extractor.Extractor(event_log_path.resolve().as_uri(), temp_dir).extract()
event_log = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()

result_path = str(Path(temp_dir, "result"))
sparkApplication(eventlog=str(event_log)).save(result_path)

with open(result_path + ".json") as result_fobj:
parsed = json.load(result_fobj)

return parsed


def test_simple_databricks_log():
event_log_path = Path("tests", "logs", "databricks.zip").resolve()

parsed = get_parsed_log(event_log_path)

assert all(
key in parsed
for key in [
@@ -39,14 +47,7 @@ def test_simple_emr_log():
def test_simple_emr_log():
event_log_path = Path("tests", "logs", "emr.zip").resolve()

with tempfile.TemporaryDirectory() as temp_dir:
event_log = eventlog.EventLogBuilder(event_log_path.as_uri(), temp_dir).build()

result_path = str(Path(temp_dir, "result"))
sparkApplication(eventlog=str(event_log)).save(result_path)

with open(str(result_path) + ".json") as result_fobj:
parsed = json.load(result_fobj)
parsed = get_parsed_log(event_log_path)

assert all(
key in parsed
@@ -70,7 +71,8 @@ def test_emr_missing_sql_events():
event_log_path = Path("tests", "logs", "emr_missing_sql_events.zip").resolve()

with tempfile.TemporaryDirectory() as temp_dir:
event_log = eventlog.EventLogBuilder(event_log_path.as_uri(), temp_dir).build()
event_log_paths = extractor.Extractor(event_log_path.resolve().as_uri(), temp_dir).extract()
event_log = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()
obj = sparkApplication(eventlog=str(event_log))

assert list(obj.sqlData.index.values) == [0, 2, 3, 5, 6, 7, 8]