Commit

(#6): adding logs parser for Pegasus 5.0 (yaml)
rafaelfsilva committed Dec 20, 2020
1 parent 723fb3c commit b0b84f0
Showing 3 changed files with 173 additions and 44 deletions.
3 changes: 2 additions & 1 deletion requirements.txt
@@ -6,4 +6,5 @@ pygraphviz>=1.5
 python-dateutil>=2.8.1
 requests>=2.24.0
 scipy>=1.5.2
-setuptools~=49.3.1
+setuptools>=49.3.1
+pyyaml>=5.3.1
8 changes: 4 additions & 4 deletions setup.py
@@ -1,7 +1,7 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
 #
-# Copyright (c) 2020 The WorkflowHub Team.
+# Copyright (c) 2020-2021 The WorkflowHub Team.
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -36,13 +36,13 @@
         'python-dateutil',
         'requests',
         'scipy',
-        'setuptools'
+        'setuptools',
+        'pyyaml'
     ],
     classifiers=[
         'License :: OSI Approved :: GNU General Public License v3 (GPLv3)',
         'Operating System :: OS Independent',
         'Programming Language :: Python :: 3',
-        'Programming Language :: Python :: 3.5',
         'Programming Language :: Python :: 3.6',
         'Programming Language :: Python :: 3.7',
         'Programming Language :: Python :: 3.8',
@@ -54,5 +54,5 @@
         'Topic :: Documentation :: Sphinx',
         'Topic :: System :: Distributed Computing'
     ],
-    python_requires='>=3.5',
+    python_requires='>=3.6',
 )
206 changes: 167 additions & 39 deletions workflowhub/trace/logs/pegasus.py
@@ -11,6 +11,7 @@
 import fnmatch
 import json
 import os
+import yaml
 import xml.etree.ElementTree
 
 from datetime import datetime
@@ -30,6 +31,8 @@ class PegasusLogsParser(LogsParser):
     :param submit_dir: Pegasus submit directory.
     :type submit_dir: str
+    :param legacy: Whether the submit directory is from a Pegasus 4.x version.
+    :type legacy: bool
     :param description: Workflow trace description.
     :type description: str
     :param ignore_auxiliary: Ignore auxiliary jobs.
@@ -41,6 +44,7 @@ class PegasusLogsParser(LogsParser):
     def __init__(self, submit_dir: str,
                  description: Optional[str] = None,
                  ignore_auxiliary: Optional[bool] = True,
+                 legacy: Optional[bool] = False,
                  logger: Optional[Logger] = None) -> None:
         """Create an object of the pegasus log parser."""
         super().__init__(description, logger)
@@ -53,12 +57,12 @@ def __init__(self, submit_dir: str,
             self.logger.warning('Ignoring Pegasus auxiliary jobs.')
 
         self.submit_dir = submit_dir
+        self.legacy = legacy
         self.ignore_auxiliary = ignore_auxiliary
         self.workflow = None
         self.workflow_name = None
         self.schema_version = None
         self.wms_name = 'Pegasus'
-        self.wms_version = None
         self.wms_url = 'https://pegasus.isi.edu'
         self.executed_at = None
         self.makespan = None
@@ -74,48 +78,96 @@ def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow:
         :return: A workflow trace object.
         :rtype: Workflow
         """
-        # parse braindump.txt file
-        self._parse_braindump()
+        self.workflow_name = workflow_name
 
-        # create base workflow trace object
-        self.workflow = Workflow(name=workflow_name if workflow_name else self.workflow_name,
-                                 description=self.description,
-                                 wms_name=self.wms_name,
-                                 wms_version=self.wms_version,
-                                 wms_url=self.wms_url,
-                                 executed_at=self.executed_at)
-        # parse DAX file
-        self._parse_dax()
+        if self.legacy:
+            # parse braindump file
+            self._parse_braindump()
+            # parse DAX file
+            self._parse_dax()
+
+        else:
+            # parse workflow YAML
+            self._parse_workflow()
+
         # parse DAG file
         self._parse_dag()
 
         return self.workflow
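
A minimal usage sketch of the two code paths above (the submit-directory paths and workflow name are hypothetical):

    from workflowhub.trace.logs.pegasus import PegasusLogsParser

    # Pegasus 5.0 submit directory: workflow.yml is parsed
    parser = PegasusLogsParser(submit_dir='/path/to/submit/run0001')
    workflow = parser.build_workflow()

    # Pegasus 4.x submit directory: braindump.txt and the DAX file are parsed instead
    legacy_parser = PegasusLogsParser(submit_dir='/path/to/submit/run0002', legacy=True)
    legacy_workflow = legacy_parser.build_workflow('my-workflow')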

     def _parse_braindump(self):
         """Parse the Pegasus braindump.txt file"""
-        braindump_file = self.submit_dir + '/braindump.txt'
+        braindump_file = '{}/braindump.txt'.format(self.submit_dir)
 
         if not os.path.exists(braindump_file):
-            raise OSError('Unable to find braindump.txt file in:\n\t' + self.submit_dir)
+            raise OSError('Unable to find braindump file: {}'.format(braindump_file))
 
         with open(braindump_file) as f:
             for line in f:
                 if line.startswith('planner_version'):
-                    self.wms_version = line.split()[1]
+                    wms_version = line.split()[1]
                 elif line.startswith('pegasus_wf_name'):
                     self.trace_name = line.split()[1]
                 elif line.startswith('timestamp'):
-                    self.executed_at = line.split()[1]
+                    executed_at = line.split()[1]
 
         # sanity checks
-        if not self.wms_version:
+        if not wms_version:
             self.logger.warning('Unable to determine pegasus version.')
         if not self.trace_name:
-            self.logger.warning(
-                'Unable to determine trace name from "pegasus_wf_name" (braindump.txt).')
-        if not self.executed_at:
-            self.logger.warning(
-                'Unable to determine execution time from "timestamp" (braindump.txt).')
+            self.logger.warning('Unable to determine trace name from "pegasus_wf_name".')
+        if not executed_at:
+            self.logger.warning('Unable to determine execution time from "timestamp".')
+
+        # create base workflow trace object
+        self.workflow = Workflow(name=self.workflow_name,
+                                 description=self.description,
+                                 wms_name=self.wms_name,
+                                 wms_version=wms_version,
+                                 wms_url=self.wms_url,
+                                 executed_at=executed_at)
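
The braindump entries read above are whitespace-separated key/value lines, roughly of this shape (values are illustrative):

    planner_version 4.9.3
    pegasus_wf_name montage-0
    timestamp 2020-12-20T10:00:00-08:00

line.split()[1] then picks the value token of each matched line.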

+    def _parse_workflow(self):
+        """Parse the Workflow file."""
+        workflow_file = '{}/workflow.yml'.format(self.submit_dir)
+
+        if not os.path.exists(workflow_file):
+            raise OSError('Unable to find workflow file: {}'.format(workflow_file))
+
+        with open(workflow_file) as f:
+            data = yaml.load(f, Loader=yaml.SafeLoader)
+            self.logger.info('Processing Pegasus workflow file: {}'.format(os.path.basename(workflow_file)))
+
+        # create base workflow trace object
+        self.workflow = Workflow(name=self.workflow_name,
+                                 description=self.description,
+                                 wms_name=self.wms_name,
+                                 wms_version=data['pegasus'],
+                                 wms_url=self.wms_url,
+                                 executed_at=data['x-pegasus']['createdOn'])
+
+        for j in data['jobs']:
+            task_name = '{}_{}'.format(j['name'], j['id'])
+
+            list_files = [File(
+                name=f['lfn'],
+                size=0,
+                link=FileLink(f['type']),
+                logger=self.logger
+            ) for f in j['uses']]
+
+            self.workflow.add_node(
+                task_name,
+                task=Task(
+                    name=task_name,
+                    task_type=TaskType.COMPUTE,
+                    runtime=0,
+                    args=j['arguments'],
+                    cores=0,
+                    files=list_files,
+                    logger=self.logger
+                )
+            )
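
A trimmed sketch of the workflow.yml fields this method touches, wrapped in Python for clarity (job, file, and timestamp values are illustrative, not from a real run):

    import yaml

    data = yaml.safe_load('''
    pegasus: '5.0'
    x-pegasus:
      createdOn: '2020-12-20T10:00:00Z'
    jobs:
      - name: mProject
        id: ID0000001
        arguments: ['region.hdr', 'projected.fits']
        uses:
          - lfn: region.hdr
            type: input
          - lfn: projected.fits
            type: output
    ''')

    print(data['pegasus'])  # WMS version: '5.0'
    print('{}_{}'.format(data['jobs'][0]['name'], data['jobs'][0]['id']))  # task name: 'mProject_ID0000001'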

     def _parse_dax(self):
         """Parse the DAX file."""
@@ -304,7 +356,7 @@ def _parse_job_output(self, task):
         :param task: Task object.
         :type task: Task
         """
-        output_list = self._fetch_all_files("out.*", task.name)
+        output_list = self._fetch_all_files('out.*', task.name)
         if len(output_list) == 0:
             self.logger.warning('Job has no kickstart record. Skipping it.')
             if task.name.lower().startswith(('stage_', 'create_dir', 'cleanup', 'clean_up', 'register_')):
@@ -315,6 +367,97 @@ def _parse_job_output(self, task):
             self.logger.debug('Job "{}" has multiple runs. Parsing last attempt.'.format(task.name))
         output_file = output_list[-1]
 
+        # setting task type if transfer
+        if task.name.lower().startswith(('stage_in_', 'stage_out')):
+            task.type = TaskType.TRANSFER
+
+        # parsing job output file
+        self.logger.debug('Parsing Job output file: {}'.format(output_file))
+        if self.legacy:
+            self._parse_job_output_legacy(task, output_file)
+        else:
+            self._parse_job_output_latest(task, output_file)
+
+        # parsing meta file
+        self._parse_meta_file(task.name)
+
+        # parsing .sub file to get job priorities
+        sub_list = self._fetch_all_files("sub", task.name)
+        if not sub_list:
+            self.logger.warning('Job {} has no .sub record. Skipping it.'.format(task.name))
+        else:
+            with open(sub_list[0]) as f:
+                for line in f:
+                    if line.startswith('priority'):
+                        task.priority = int(line.split()[2])
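
The priority parsing above assumes the HTCondor submit file stores the value as a key = value line, e.g. (illustrative):

    priority = 10

line.split() then yields ['priority', '=', '10'], so index 2 is the value.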

+    def _parse_job_output_latest(self, task, output_file):
+        """
+        Parse the kickstart job output file in YAML format (e.g., .out.000).
+        :param task: Task object.
+        :type task: Task
+        :param output_file: Output file name.
+        :type output_file: str
+        """
+        tmp_file = '.pegasus-parser-tmp'
+        with open(tmp_file, 'w') as t:
+            with open(output_file) as f:
+                for line in f:
+                    if not line.startswith('---------------'):
+                        t.write(line)
+
+        with open(tmp_file) as f:
+            data = yaml.load(f, Loader=yaml.FullLoader)[0]
+
+        if data['transformation'].startswith('pegasus:') or task.name.lower().startswith('chmod_'):
+            task.type = TaskType.AUXILIARY
+
+        mainjob = data['mainjob']
+        task.runtime = float(mainjob['duration'])
+        task.memory = int(mainjob['usage']['maxrss'])
+        total_time = float(mainjob['usage']['utime']) + float(mainjob['usage']['stime'])
+        if total_time > 0:
+            task.avg_cpu = float('%.4f' % (100 * (total_time / task.runtime)))
+
+        bytes_read = 0
+        bytes_written = 0
+
+        # get job memory and I/O information
+        if mainjob['procs']:
+            for p in mainjob['procs']:
+                bytes_read += max(int(p['rbytes']), int(p['rchar']))
+                bytes_written += max(int(p['wbytes']), int(p['wchar']))
+        if bytes_read > 0:
+            task.bytes_read = bytes_read
+        if bytes_written > 0:
+            task.bytes_written = bytes_written
+
+        # machine
+        task.machine = Machine(
+            name=data['machine']['uname_nodename'],
+            cpu={
+                'count': data['machine']['cpu_count'],
+                'speed': data['machine']['cpu_speed'],
+                'vendor': data['machine']['cpu_vendor']
+            },
+            system=MachineSystem(data['machine']['uname_system']),
+            architecture=data['machine']['uname_machine'],
+            memory=data['machine']['ram_total'],
+            release=data['machine']['uname_release']
+        )
+
+        os.remove(tmp_file)
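
For reference, a trimmed kickstart YAML record with only the keys read above, wrapped in Python (values are illustrative; real records carry many more fields, and the parsed document is a list, hence the [0]):

    import yaml

    record = yaml.safe_load('''
    - transformation: mProject
      mainjob:
        duration: 12.34
        usage: {maxrss: 102400, utime: 10.1, stime: 1.2}
        procs:
          - {rbytes: 4096, rchar: 8192, wbytes: 2048, wchar: 2048}
      machine:
        uname_nodename: worker-01
        uname_system: linux
        uname_machine: x86_64
        uname_release: 3.10.0
        cpu_count: 8
        cpu_speed: 2600
        cpu_vendor: GenuineIntel
        ram_total: 16777216
    ''')[0]

    print(record['mainjob']['duration'])  # 12.34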

+    def _parse_job_output_legacy(self, task, output_file):
+        """
+        Parse the kickstart job output file in XML format (e.g., .out.000).
+        :param task: Task object.
+        :type task: Task
+        :param output_file: Output file name.
+        :type output_file: str
+        """
         runtime = 0
         total_time = 0
         bytes_read = 0
@@ -325,6 +468,7 @@ def _parse_job_output(self, task):
         # clean output file from PBS logs
         line_num = 0
         temp_file = None
+
         with open(output_file) as f:
             for line in f:
                 line_num += 1
@@ -349,9 +493,6 @@ def _parse_job_output(self, task):
             if e.get('transformation').startswith('pegasus:') or task.name.lower().startswith('chmod_'):
                 task.type = TaskType.AUXILIARY
 
-            if task.name.lower().startswith(('stage_in_', 'stage_out')):
-                task.type = TaskType.TRANSFER
-
             for mj in e.findall('{http://pegasus.isi.edu/schema/invocation}mainjob'):
                 runtime += float(mj.get('duration'))
 
@@ -432,16 +573,3 @@ def _parse_job_output(self, task):
         # cleaning temporary file
         if temp_file:
             os.remove(output_file)
-
-        # parsing meta file
-        self._parse_meta_file(task.name)
-
-        # parsing .sub file to get job priorities
-        sub_list = self._fetch_all_files("sub", task.name)
-        if not sub_list:
-            self.logger.warning('Job {} has no .sub record. Skipping it.'.format(task.name))
-        else:
-            with open(sub_list[0]) as f:
-                for line in f:
-                    if line.startswith('priority'):
-                        task.priority = int(line.split()[2])
