Merged
3 changes: 1 addition & 2 deletions spark_log_parser/__init__.py
@@ -1,3 +1,2 @@
 """Tools for providing Spark event log"""
-__version__ = "0.1.4"
-
+__version__ = "0.1.5"
28 changes: 18 additions & 10 deletions spark_log_parser/cli.py
@@ -2,6 +2,8 @@
 import logging
 import sys
 import tempfile
+import json
+import shutil
 from pathlib import Path
 from urllib.parse import unquote

@@ -40,22 +42,28 @@ def main():
     print("\n" + "*" * 12 + " Running the Log Parser for Spark Predictor " + "*" * 12 + "\n")
     print("--Processing log file: " + str(args.log_file))

-    with tempfile.TemporaryDirectory() as work_dir:
-
-        event_log_paths = Extractor(
-            unquote(args.log_file.resolve().as_uri()), work_dir, thresholds=ExtractThresholds(size=20000000000)
-        ).extract()
-
-        event_log = EventLogBuilder(event_log_paths, work_dir).build()
-        app = sparkApplication(eventlog=str(event_log))
-
     if args.log_file.suffixes:
         result_path = args.result_dir.joinpath(
             "parsed-" + args.log_file.name[: -len("".join(args.log_file.suffixes))]
         )
     else:
         result_path = args.result_dir.joinpath("parsed-" + args.log_file.name)

-    app.save(str(result_path))
+    with tempfile.TemporaryDirectory() as work_dir:
+
+        event_log_paths = Extractor(
+            unquote(args.log_file.resolve().as_uri()),
+            work_dir,
+            thresholds=ExtractThresholds(size=20000000000),
+        ).extract()
+
+        event_log, parsed = EventLogBuilder(event_log_paths, work_dir).build()
+
+        if not parsed:
+            app = sparkApplication(spark_eventlog_path=str(event_log))
+            app.save(str(result_path))
+        else:
+            print("--Input log was already parsed")
+            shutil.copyfile(event_log, str(result_path) + ".json")

     print(f"--Result saved to: {result_path}.json")
51 changes: 33 additions & 18 deletions spark_log_parser/eventlog.py
@@ -11,11 +11,9 @@ def __init__(
         event_log_paths: list[Path] | list[str],
         work_dir: Path | str,
     ):
-
         self.event_log_paths = self._validate_event_log_paths(event_log_paths)
         self.work_dir = self._validate_work_dir(work_dir)
-

     def _validate_event_log_paths(self, event_log_paths: list[Path] | list[str]) -> list[Path]:
         return [Path(x) for x in event_log_paths]

@@ -26,41 +24,58 @@ def _validate_work_dir(self, work_dir: Path | str) -> Path:

         return work_dir_path

-    def build(self) -> Path:
+    def build(self) -> tuple[Path, bool]:

         if not self.event_log_paths:
             raise ValueError("No files found")

-        self.event_log = self._get_event_log(self.event_log_paths)
+        self.event_log, self.parsed = self._get_event_log(self.event_log_paths)

-        return self.event_log
+        return self.event_log, self.parsed

-    def _get_event_log(self, paths: list[Path]) -> Path:
+    def _get_event_log(self, paths: list[Path]) -> tuple[Path, bool]:
+
         log_files = []
         rollover_dat = []
+        parsed = False
         for path in paths:
-            with open(path) as fobj:
-                try:
+            try:  # Test if it is a raw log
+                with open(path) as fobj:
                     line = json.loads(fobj.readline())
+                if "Event" in line:
+                    log_files.append(path)
+                    if line["Event"] == "DBCEventLoggingListenerMetadata":
+                        rollover_dat.append(
+                            (line["Rollover Number"], line["SparkContext Id"], path)
+                        )
+                else:
+                    raise ValueError
+
+            except ValueError:
+                try:  # Test if it is a parsed log
+                    with open(path) as fobj:
+                        data = json.load(fobj)
+                    if "jobData" in data:
Inline review thread on this line:

Contributor:

Ideally, sparkApplication should be making the determination of whether the object is a valid event log or not. It creates the parsed logs, after all.

Contributor Author:

In principle I agree, and in the future we can make that happen. For now, though, you'll see in the sync_backend sister PR that the parsed/unparsed judgement has to come before the log has been parsed, because it must be known before entering the spark_log_parser through SparkApplicationAdvanced, which inherits from sparkApplication. I know it's real janky right now, but it's getting better bit by bit. A separation of these classes will happen, and we can make this even cleaner then -- I don't want to get carried away in the one PR.

Contributor:

Just to be clear, I was thinking it could be a classmethod like:

from typing import Type, TypeVar

T = TypeVar('T', bound='sparkApplication')


class sparkApplication:
    @classmethod
    def is_parsed_log(cls: Type[T], eventlog: dict) -> bool:
        ...

(btw, I learned the type annotations for classmethods here: https://stackoverflow.com/questions/44640479/type-annotation-for-classmethod-returning-instance)

Being a classmethod allows us to call it before we create an instance of the class. That way we keep the parsed-log logic in the same module.

I totally agree that we are making good steady progress. It's a balance between improving the structure, not losing any of the knowledge in the code, and delivering features n' fixes.

Contributor:

...or a staticmethod, if we don't need that class variable.

Contributor Author:

Ohh, I see what you mean, that's nice. I'll keep this in mind for the next batch of log_parser work.

+                        log_files.append(path)
+                        parsed = True
                 except ValueError:
                     continue
-                if "Event" in line:
-                    log_files.append(path)
-                    if line["Event"] == "DBCEventLoggingListenerMetadata":
-                        rollover_dat.append(
-                            (line["Rollover Number"], line["SparkContext Id"], path)
-                        )

+        if len(log_files) > 1 and parsed:
+            raise ValueError("A parsed log file was submitted with other log files")
+
         if rollover_dat:
             if len(log_files) > len(rollover_dat):
-                raise ValueError("No rollover properties found in log file")
+                raise ValueError(
+                    "Rollover logs were detected, but not all files had rollover properties"
+                )

-            return self._concat(rollover_dat)
+            return self._concat(rollover_dat), False

         if len(log_files) > 1:
-            raise ValueError("No rollover properties found in log file")
+            raise ValueError("Multiple files detected without log rollover properties")

-        return log_files[0]
+        return log_files[0], parsed

     def _concat(self, rollover_dat: list[tuple[str, str, str]]) -> Path:
         rollover_df = (
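For readers skimming the diff: the new _get_event_log probes each file in two steps. It first treats the file as a raw Spark event log, i.e. its first line parses as JSON and carries an "Event" key; only if that fails does it load the whole file and treat it as an already-parsed log when a "jobData" key is present. A minimal standalone sketch of that heuristic follows (the classify_log helper and its return values are illustrative only, not part of this PR):

import json
from pathlib import Path


def classify_log(path: Path) -> str:
    """Classify a single file as 'raw', 'parsed', or 'unknown'.

    Mirrors the probe order in EventLogBuilder._get_event_log: a raw Spark
    event log has a JSON object with an "Event" key on its first line, while
    an already-parsed log is a single JSON document containing "jobData".
    """
    try:
        with open(path) as fobj:
            first_line = json.loads(fobj.readline())
        if isinstance(first_line, dict) and "Event" in first_line:
            return "raw"
        raise ValueError  # fall through to the parsed-log check
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        try:
            with open(path) as fobj:
                data = json.load(fobj)
            if isinstance(data, dict) and "jobData" in data:
                return "parsed"
        except ValueError:
            pass
    return "unknown"

The cheap first-line check runs first, and only its failure triggers the full json.load, which matches the order of the try/except blocks in the diff above.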
48 changes: 23 additions & 25 deletions spark_log_parser/parsing_models/application_model_v2.py
@@ -17,42 +17,38 @@
 class sparkApplication:
     def __init__(
         self,
-        objfile=None,  # Previously saved object. This is the fastest and best option
-        appobj=None,  # application_model object
-        eventlog=None,  # spark eventlog path,
+        spark_eventlog_parsed_path=None,
+        spark_eventlog_path=None,
         stdout=None,
         debug=False,
     ):

-        self.eventlog = eventlog
+        self.spark_eventlog_path = spark_eventlog_path
+        self.spark_eventlog_parsed_path = spark_eventlog_parsed_path
         self.existsSQL = False
         self.existsExecutors = False
         # self.sparkMetadata = {}
         self.metadata = {}
         self.stdout = stdout
         self.debug = debug

-        if objfile is not None:  # Load a previously saved sparkApplication Model
-            self.load(filepath=objfile)
+        if self.spark_eventlog_parsed_path is not None:
+            self.load(filepath=self.spark_eventlog_parsed_path)

-        if (appobj is not None) or (eventlog is not None):  # Load an application_model or eventlog
+        elif self.spark_eventlog_path is not None:

-            if eventlog is not None:
-                t0 = time.time()
-                if "s3://" in eventlog:
-                    path = eventlog.replace("s3://", "").split("/")
-                    bucket = path[0]
-                    path = "/".join(path[1:])
-                else:
-                    path = eventlog
-                    bucket = None
-
-                appobj = ApplicationModel(
-                    eventlogpath=path, bucket=bucket, stdoutpath=stdout, debug=debug
-                )
-                logging.info("Loaded object from spark eventlog [%.2fs]" % (time.time() - t0))
-            else:
-                logging.info("Loaded object from ApplicationModel object")
+            t0 = time.time()
+            if "s3://" in self.spark_eventlog_path:
+                path = self.spark_eventlog_path.replace("s3://", "").split("/")
+                bucket = path[0]
+                path = "/".join(path[1:])
+            else:
+                path = self.spark_eventlog_path
+                bucket = None
+
+            appobj = ApplicationModel(
+                eventlogpath=path, bucket=bucket, stdoutpath=stdout, debug=debug
+            )
+            logging.info("Loaded object from spark eventlog [%.2fs]" % (time.time() - t0))

         self.validate_app(appobj, self.debug)
@@ -651,9 +647,11 @@ def save(self, filepath=None, compress=False):

     def save_to_local(self, saveDat, filepath, compress):
         if filepath is None:
-            if self.eventlog is None:
+            if self.spark_eventlog_path is None:
                 raise Exception('No input eventlog found. Must specify "filepath".')
-            inputFile = os.path.basename(os.path.normpath(self.eventlog)).replace(".gz", "")
+            inputFile = os.path.basename(os.path.normpath(self.spark_eventlog_path)).replace(
+                ".gz", ""
+            )
             filepath = inputFile + "-sync"

         if compress is False:
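As an aside for reviewers, the S3 branch in the renamed constructor above is only a bucket/key split on the s3:// URL. A tiny standalone sketch of that behaviour (the split_s3_url helper is hypothetical, written here just to illustrate the logic the diff keeps):

def split_s3_url(url: str) -> tuple[str | None, str]:
    """Split "s3://bucket/prefix/log" into (bucket, key); local paths pass through as (None, url)."""
    if "s3://" in url:
        parts = url.replace("s3://", "").split("/")
        return parts[0], "/".join(parts[1:])
    return None, url


# e.g. split_s3_url("s3://my-bucket/logs/app-1234") == ("my-bucket", "logs/app-1234")
#      split_s3_url("/tmp/app-1234") == (None, "/tmp/app-1234")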
Binary file added tests/logs/bad/mixed_parsed.zip
Binary file added tests/logs/similarity_parsed.json.gz
8 changes: 7 additions & 1 deletion tests/test_bad_eventlog.py
@@ -25,7 +25,9 @@ def test_multiple_context_ids(self):

     def test_missing_dbc_event(self):
         event_log = Path("tests", "logs", "bad", "missing-dbc-event.zip").resolve()
-        self.check_value_error(event_log, "No rollover properties found in log file")
+        self.check_value_error(
+            event_log, "Rollover logs were detected, but not all files had rollover properties"
+        )

     def test_duplicate_log_part(self):
         event_log = Path("tests", "logs", "bad", "duplicate-part.tgz").resolve()
@@ -39,6 +41,10 @@ def test_missing_first_part(self):
         event_log = Path("tests", "logs", "bad", "missing-first-part.zip").resolve()
         self.check_value_error(event_log, "Rollover log file appears to be missing")

+    def test_mixed_parsed(self):
+        event_log = Path("tests", "logs", "bad", "mixed_parsed.zip").resolve()
+        self.check_value_error(event_log, "A parsed log file was submitted with other log files")
+
     def test_only_non_first_part(self):
         with tempfile.TemporaryDirectory() as temp_dir:
             with ZipFile(Path("tests", "logs", "bad", "missing-first-part.zip")) as zfile:
26 changes: 18 additions & 8 deletions tests/test_eventlog.py
@@ -11,54 +11,58 @@ def get_event(event_log_path):

     with tempfile.TemporaryDirectory() as temp_dir:
         event_log_paths = extractor.Extractor(event_log_path.resolve().as_uri(), temp_dir).extract()
-        event_log = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()
+        event_log, parsed = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()

         with open(event_log) as log_fobj:
             event = json.loads(log_fobj.readline())

-    return event
+    return event, parsed


 def test_simple_emr_log():
     event_log_path = Path("tests", "logs", "emr.zip").resolve()
-    event = get_event(event_log_path)
+    event, parsed = get_event(event_log_path)

     assert all(key in event for key in ["Event", "Spark Version"]), "Not all keys are present"
     assert event["Event"] == "SparkListenerLogStart", "First event is not as expected"
+    assert not parsed


 def test_simple_databricks_log():
     event_log_path = Path("tests", "logs", "databricks.zip").resolve()
-    event = get_event(event_log_path)
+    event, parsed = get_event(event_log_path)

     assert all(
         key in event
         for key in ["Event", "Spark Version", "Timestamp", "Rollover Number", "SparkContext Id"]
     ), "Not all keys are present"
+    assert not parsed


 def test_raw_databricks_log():
     event_log_path = Path("tests", "logs", "databricks.json").resolve()
-    event = get_event(event_log_path)
+    event, parsed = get_event(event_log_path)

     assert all(
         key in event
         for key in ["Event", "Spark Version", "Timestamp", "Rollover Number", "SparkContext Id"]
     ), "Not all keys are present"

     assert event["Event"] == "DBCEventLoggingListenerMetadata", "First event is not as expected"

+    assert not parsed


 def test_log_in_dir():
     event_log_path = Path("tests", "logs", "log_in_dir", "databricks.json.gz").resolve()
-    event = get_event(event_log_path)
+    event, parsed = get_event(event_log_path)

     assert all(
         key in event
         for key in ["Event", "Spark Version", "Timestamp", "Rollover Number", "SparkContext Id"]
     ), "Not all keys are present"

     assert event["Event"] == "DBCEventLoggingListenerMetadata", "First event is not as expected"
+    assert not parsed


 class RolloverLog(unittest.TestCase):
@@ -82,7 +86,7 @@ def validate_log(self, event_log_path: Path, log_file_total: int, log_entry_tota
             event_log_paths = extractor.Extractor(
                 event_log_path.resolve().as_uri(), temp_dir
             ).extract()
-            event_log = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()
+            event_log, parsed = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()

             with open(event_log) as log_fobj:
                 event = json.loads(log_fobj.readline())
@@ -109,3 +113,9 @@ def validate_log(self, event_log_path: Path, log_file_total: int, log_entry_tota

         assert rollover_count == log_file_total, "Not all log parts are present"
         assert i + 1 == log_entry_total, "Not all events are present"
+        assert not parsed
+
+
+if __name__ == "__main__":
+
+    test_simple_emr_log()
5 changes: 3 additions & 2 deletions tests/test_eventlog_s3.py
@@ -66,13 +66,14 @@ def test_emr_log_from_s3(event_log_url, event_log_file_archive, event_log_s3_dir

     with tempfile.TemporaryDirectory() as temp_dir, stubber:
         event_log_paths = extractor.Extractor(event_log_url, temp_dir, s3).extract()
-        event_log = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()
+        event_log, parsed = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()

         with open(event_log) as log_fobj:
             event = json.loads(log_fobj.readline())

     assert all(key in event for key in ["Event", "Spark Version"]), "Not all keys are present"
     assert event["Event"] == "SparkListenerLogStart", "Expected first event is not present"
+    assert not parsed


 @pytest.mark.parametrize(
@@ -135,7 +136,7 @@ def test_databricks_log_from_s3_dir(event_log_url, event_log_file_archive, event

     with stubber:
         event_log_paths = extractor.Extractor(event_log_url, temp_dir, s3).extract()
-        event_log = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()
+        event_log, _ = eventlog.EventLogBuilder(event_log_paths, temp_dir).build()

         stubber.assert_no_pending_responses()
