spark_log_parser/__init__.py (2 changes: 1 addition & 1 deletion)

@@ -1,3 +1,3 @@
 """Tools for providing Spark event log"""
-__version__ = "0.1.3"
 
+__version__ = "0.1.2"
spark_log_parser/cli.py (8 changes: 5 additions & 3 deletions)

@@ -10,7 +10,7 @@
 logging.captureWarnings(True)
 
 from spark_log_parser.eventlog import EventLogBuilder # noqa: E402
-from spark_log_parser.extractor import Extractor # noqa: E402
+from spark_log_parser.extractor import Extractor, ExtractThresholds # noqa: E402
 from spark_log_parser.parsing_models.application_model_v2 import sparkApplication # noqa: E402
 
 logger = logging.getLogger("spark_log_parser")
@@ -42,8 +42,10 @@ def main():
 
     with tempfile.TemporaryDirectory() as work_dir:
 
-        log_path = unquote(args.log_file.resolve().as_uri())
-        event_log_paths = Extractor(log_path, work_dir).extract()
+        event_log_paths = Extractor(
+            unquote(args.log_file.resolve().as_uri()), work_dir, thresholds=ExtractThresholds(size=20000000000)
+        ).extract()
 
         event_log = EventLogBuilder(event_log_paths, work_dir).build()
         app = sparkApplication(eventlog=str(event_log))
 
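A note on the new thresholds argument: it caps extraction at 20000000000 bytes (20 GB). The PR does not show extractor.py, so as a rough sketch only, ExtractThresholds could be a small dataclass the extractor consults before unpacking; the field set and the check_extract_size helper below are assumptions, not the library's actual API.

# Hypothetical sketch of a size-threshold check; not the actual extractor.py.
from dataclasses import dataclass
from pathlib import Path


@dataclass
class ExtractThresholds:
    # Maximum number of bytes the extractor will agree to unpack.
    size: int = 20_000_000_000


def check_extract_size(path: Path, thresholds: ExtractThresholds) -> None:
    """Refuse extraction when the archive exceeds the size threshold."""
    actual = path.stat().st_size
    if actual > thresholds.size:
        raise ValueError(
            f"{path} is {actual} bytes; extraction limit is {thresholds.size} bytes"
        )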
spark_log_parser/eventlog.py (1 change: 1 addition & 0 deletions)

@@ -15,6 +15,7 @@ def __init__(
         self.event_log_paths = self._validate_event_log_paths(event_log_paths)
         self.work_dir = self._validate_work_dir(work_dir)
 
+
     def _validate_event_log_paths(self, event_log_paths: list[Path] | list[str]) -> list[Path]:
         return [Path(x) for x in event_log_paths]
 
spark_log_parser/parsing_models/application_model.py (6 changes: 6 additions & 0 deletions)

@@ -143,6 +143,12 @@ def __init__(self, eventlogpath, bucket=None, stdoutpath=None, debug=False): #
                         missing_event=f"Job Start for Stage {stage_id}"
                     )
 
+            if "Submission Time" not in json_data["Stage Info"]:
+                # PROD-426 Submission Time key may be missing from stages that
+                # don't get submitted. There is usually a StageCompleted event
+                # shortly after.
+                continue
+
             for job_id in self.jobs_for_stage[stage_id]:
                 self.jobs[job_id].stages[stage_id].submission_time = (
                     json_data["Stage Info"]["Submission Time"] / 1000
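The new guard addresses a real quirk of Spark event logs: a stage event's "Stage Info" payload can lack the "Submission Time" key when the stage never actually gets submitted, and indexing it unconditionally raised a KeyError. A self-contained sketch of the same defensive pattern, using fabricated event payloads:

# Minimal illustration of skipping stage events that lack "Submission Time".
# The payloads below are fabricated examples, not real Spark log output.
events = [
    {"Stage Info": {"Stage ID": 1, "Submission Time": 1650000000000}},
    {"Stage Info": {"Stage ID": 2}},  # never submitted: no "Submission Time"
]

submission_times = {}
for event in events:
    stage_info = event["Stage Info"]
    if "Submission Time" not in stage_info:
        # Skip it; a StageCompleted event usually follows for such stages.
        continue
    # Spark timestamps are epoch milliseconds; convert to seconds.
    submission_times[stage_info["Stage ID"]] = stage_info["Submission Time"] / 1000

print(submission_times)  # {1: 1650000000.0}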
tests/test_eventlog.py (23 changes: 12 additions & 11 deletions)

@@ -23,8 +23,8 @@ def test_simple_emr_log():
     event_log_path = Path("tests", "logs", "emr.zip").resolve()
     event = get_event(event_log_path)
 
-    assert all(key in event for key in ["Event", "Spark Version"]), "All keys are present"
-    assert event["Event"] == "SparkListenerLogStart", "Expected first event is present"
+    assert all(key in event for key in ["Event", "Spark Version"]), "Not all keys are present"
+    assert event["Event"] == "SparkListenerLogStart", "First event is not as expected"
 
 
 def test_simple_databricks_log():
@@ -33,7 +33,7 @@ def test_simple_databricks_log():
     assert all(
         key in event
         for key in ["Event", "Spark Version", "Timestamp", "Rollover Number", "SparkContext Id"]
-    ), "All keys are present"
+    ), "Not all keys are present"
 
 
 def test_raw_databricks_log():
@@ -43,9 +43,10 @@ def test_raw_databricks_log():
     assert all(
         key in event
         for key in ["Event", "Spark Version", "Timestamp", "Rollover Number", "SparkContext Id"]
-    ), "All keys are present"
+    ), "Not all keys are present"
 
-    assert event["Event"] == "DBCEventLoggingListenerMetadata", "Expected first event is present"
+    assert event["Event"] == "DBCEventLoggingListenerMetadata", "First event is not as expected"
+
 
 
 def test_log_in_dir():
@@ -55,9 +56,9 @@ def test_log_in_dir():
     assert all(
         key in event
         for key in ["Event", "Spark Version", "Timestamp", "Rollover Number", "SparkContext Id"]
-    ), "All keys are present"
+    ), "Not all keys are present"
 
-    assert event["Event"] == "DBCEventLoggingListenerMetadata", "Expected first event is present"
+    assert event["Event"] == "DBCEventLoggingListenerMetadata", "First event is not as expected"
 
 
 class RolloverLog(unittest.TestCase):
@@ -100,11 +101,11 @@ def validate_log(self, event_log_path: Path, log_file_total: int, log_entry_tota
                     "Rollover Number",
                     "SparkContext Id",
                 ]
-            ), "All keys are present"
+            ), "Not all keys are present"
            assert (
                 rollover_count == event["Rollover Number"]
-            ), "Contiguous monotonically increasing IDs"
+            ), "Rollover IDs are not contiguous and monotonically increasing"
             rollover_count += 1
 
-        assert rollover_count == log_file_total, "All log parts are present"
-        assert i + 1 == log_entry_total, "All events are present"
+        assert rollover_count == log_file_total, "Not all log parts are present"
+        assert i + 1 == log_entry_total, "Not all events are present"
tests/test_eventlog_s3.py (13 changes: 6 additions & 7 deletions)

@@ -71,9 +71,8 @@ def test_emr_log_from_s3(event_log_url, event_log_file_archive, event_log_s3_dir
     with open(event_log) as log_fobj:
         event = json.loads(log_fobj.readline())
 
-    assert all(key in event for key in ["Event", "Spark Version"]), "All keys are present"
-
-    assert event["Event"] == "SparkListenerLogStart", "Expected first event is present"
+    assert all(key in event for key in ["Event", "Spark Version"]), "Not all keys are present"
+    assert event["Event"] == "SparkListenerLogStart", "Expected first event is not present"
 
 
 @pytest.mark.parametrize(
@@ -158,13 +157,13 @@ def test_databricks_log_from_s3_dir(event_log_url, event_log_file_archive, event
                     "Rollover Number",
                     "SparkContext Id",
                 ]
-            ), "All keys are present"
+            ), "Not all keys are present"
            assert (
                 rollover_count == event["Rollover Number"]
-            ), "Contiguous monotonically increasing IDs"
+            ), "Rollover IDs are not contiguous and monotonically increasing"
            rollover_count += 1
        except Exception as exc:
            raise ValueError("Problem with line %d: %s" % (i, event_str), exc)
 
-    assert rollover_count == 3, "All log parts are present"
-    assert i + 1 == 16945, "All events are present"
+    assert rollover_count == 3, "Not all log parts are present"
+    assert i + 1 == 16945, "Not all events are present"
tests/test_parse.py (4 changes: 2 additions & 2 deletions)

@@ -37,7 +37,7 @@ def test_simple_databricks_log():
            "stageData",
            "taskData",
        ]
-    ), "All keys are present"
+    ), "Not all keys are present"
 
    assert (
        parsed["metadata"]["application_info"]["name"] == "Databricks Shell"
@@ -60,7 +60,7 @@ def test_simple_emr_log():
            "stageData",
            "taskData",
        ]
-    ), "All keys are present"
+    ), "Not all keys are present"
 
    assert (
        parsed["metadata"]["application_info"]["name"] == "Text Similarity"
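Finally, a footnote on the all(key in ...) idiom these tests share: it yields only a boolean, so the failure message cannot say which key was absent. A set difference names the missing keys directly; the parsed dict below is a fabricated stand-in:

# Sketch: name exactly which expected keys are missing from parsed output.
parsed = {"metadata": {}, "stageData": [], "taskData": []}  # fabricated example
expected = {"metadata", "jobData", "stageData", "taskData"}

missing = expected - parsed.keys()
# Running this raises: AssertionError: Not all keys are present; missing: ['jobData']
assert not missing, f"Not all keys are present; missing: {sorted(missing)}"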