Merged
10 changes: 3 additions & 7 deletions README.md
@@ -15,15 +15,11 @@ pip3 install https://github.com/synccomputingcode/spark_log_parser/archive/main.
If you have not already done so, complete the [instructions](https://github.com/synccomputingcode/user_documentation/wiki#accessing-autotuner-input-data) to download the Apache Spark event log.

### Step 1: Parse the log to strip away sensitive information
-1. To process a log file, execute the parse.py script in the sync_parser folder, and provide a
-log file destination with the -l flag.
-
+1. To process a log file, execute the spark-log-parser command with a log file path and a directory in which to store the result, like so:
```shell
-spark-log-parser -l <log file location> -r <results directory>
+spark-log-parser -l <log file location> -r <result directory>
```
-
-The parsed file `parsed-<log file name>` will appear in the results directory.
-
+The parsed file `parsed-<log file name>` will appear in the result directory.

2. Send Sync Computing the parsed log

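For reference, the renamed command can also be driven from a script. The sketch below is illustrative only and not part of this change; the event log name and result directory are hypothetical placeholders, and only the `spark-log-parser` command, its `-l`/`-r` flags, and the `parsed-<log file name>` output naming come from the README.

```python
# Illustrative sketch only: the paths below are hypothetical, not part of the repo.
import subprocess

subprocess.run(
    [
        "spark-log-parser",
        "-l", "eventlogs/application_0001",  # hypothetical Spark event log
        "-r", "parsed",                      # hypothetical result directory
    ],
    check=True,  # raise if the parser exits with a non-zero status
)
# Per the README, the output appears as parsed/parsed-application_0001
```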
134 changes: 70 additions & 64 deletions spark_log_parser/parsing_models/validation_event_data.py
@@ -1,64 +1,70 @@
from .exceptions import LazyEventValidationException
from .exceptions import ParserErrorMessages as MSGS

import logging

logger = logging.getLogger("EventDataValidation")

class EventDataValidation():
"""
Validate the existence of certain Spark Listener Events
"""
def __init__(self, app=None, debug=False):

self.app = app
self.debug = debug # When 'True' disables exception raises for debugging
self.message = ''

def validate(self):
"""
Run the validation methods. If one or more errors exist then log the error, then throw
and exception. Logging is used here so that the problem is still indicated when in debug
mode.
"""
self.validate_job_events()
self.validate_stage_events()

if (len(self.message)>0):
logger.error(self.message)
if not self.debug:
raise LazyEventValidationException(error_message = self.message)

def validate_job_events(self):
"""
Look for missing job events.

4/20/2022: Currently only the JobComplete is detected, because a missing
JobStart event will result in a more urgent exception being thrown in SparkApplication
"""

miss_job_ends = []
for jid, job in self.app.jobs.items():
if not hasattr(job, 'completion_time'):
miss_job_ends.append(jid)

if len(miss_job_ends)>0:
msg = f'{MSGS.MISSING_EVENT_JOB_END}{miss_job_ends}. '
self.message += f'{MSGS.MISSING_EVENT_JOB_END}{miss_job_ends}. '

def validate_stage_events(self):
"""
Look for missing stage events.

4/20/2022: Currently only the StageSubmitted is detected, because a
missing StageCompleted event will result in a more urgent exception being thrown in
SparkApplication
"""
miss_stage_completes = []
for jid, job in self.app.jobs.items():
for sid, stage in job.stages.items():
if not hasattr(stage, 'completion_time'):
miss_stage_completes.append(sid)

if len(miss_stage_completes)>0:
self.message += f'{MSGS.MISSING_EVENT_STAGE_COMPLETE}{miss_stage_completes}. '
import logging

from .exceptions import LazyEventValidationException
from .exceptions import ParserErrorMessages as MSGS

logger = logging.getLogger("EventDataValidation")


class EventDataValidation:
"""
Validate the existence of certain Spark Listener Events
"""

def __init__(self, app=None, debug=False):

self.app = app
        self.debug = debug  # When True, problems are logged but no exception is raised (debugging aid)
self.message = ""

def validate(self):
"""
        Run the validation methods. If one or more errors exist, log the combined error message
        and then raise an exception. Logging is used here so that the problem is still indicated
        when in debug mode.
"""
if not self.app.finish_time:
self.message += (
MSGS.MISSING_EVENT_GENERIC_MESSAGE + "'Application / Stage / SQL Completion'. "
)

self.validate_job_events()
self.validate_stage_events()

if len(self.message) > 0:
logger.error(self.message)
if not self.debug:
raise LazyEventValidationException(error_message=self.message)

def validate_job_events(self):
"""
Look for missing job events.

4/20/2022: Currently only the JobComplete is detected, because a missing
JobStart event will result in a more urgent exception being thrown in SparkApplication
"""

miss_job_ends = []
for jid, job in self.app.jobs.items():
if not hasattr(job, "completion_time"):
miss_job_ends.append(jid)

if len(miss_job_ends) > 0:
self.message += f"{MSGS.MISSING_EVENT_JOB_END}{miss_job_ends}. "

def validate_stage_events(self):
"""
Look for missing stage events.

        4/20/2022: Currently only the StageCompleted is detected, because a
        missing StageSubmitted event will result in a more urgent exception being thrown in
        SparkApplication
"""
miss_stage_completes = []
for jid, job in self.app.jobs.items():
for sid, stage in job.stages.items():
if not hasattr(stage, "completion_time"):
miss_stage_completes.append(sid)

if len(miss_stage_completes) > 0:
self.message += f"{MSGS.MISSING_EVENT_STAGE_COMPLETE}{miss_stage_completes}. "
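
For reviewers who want to see the new `finish_time` guard in action, here is a minimal sketch; it is not part of the PR. The `SimpleNamespace` stand-in for the application object and the job id are invented for illustration, and the import path assumes the package layout shown in the file header above.

```python
# Minimal sketch with a stand-in application object (assumption: the module is
# importable under the path shown in the diff header above).
import logging
from types import SimpleNamespace

from spark_log_parser.parsing_models.validation_event_data import EventDataValidation

logging.basicConfig(level=logging.ERROR)

# A job whose JobEnd event was never seen: it has no completion_time attribute.
unfinished_job = SimpleNamespace(stages={})

app = SimpleNamespace(
    finish_time=None,          # triggers the new "Application / Stage / SQL Completion" message
    jobs={3: unfinished_job},  # job id 3 is reported as missing its JobEnd event
)

# debug=True logs the accumulated message instead of raising.
EventDataValidation(app=app, debug=True).validate()
```

With the default `debug=False`, the same run would raise `LazyEventValidationException` carrying the accumulated message.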