Skip to content

Commit

Permalink
Merge pull request #11 from lsst/tickets/DM-34964
Browse files Browse the repository at this point in the history
DM-34964: add cancel, restart and ping function in the bps panda plugin
  • Loading branch information
wguanicedew committed Jul 7, 2022
2 parents 0005c0f + be97783 commit 16c11f7
Show file tree
Hide file tree
Showing 4 changed files with 218 additions and 19 deletions.
1 change: 1 addition & 0 deletions doc/changes/DM-34964.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add cancel, restart, report and ping functions in bps panda plugin.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ build-backend = "setuptools.build_meta"

[tool.black]
line-length = 110
target-version = ["py38"]
target-version = ["py310"]

[tool.isort]
profile = "black"
Expand Down
4 changes: 2 additions & 2 deletions python/lsst/ctrl/bps/panda/idds_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def define_tasks(self):
if bps_node.number_of_retries:
task.max_attempt = bps_node.number_of_retries
else:
task.max_attempt = self.number_of_retries.get(task_name, 3)
task.max_attempt = self.number_of_retries.get(task_name, 5)
if bps_node.request_walltime:
task.max_walltime = bps_node.request_walltime
else:
Expand Down Expand Up @@ -319,7 +319,7 @@ def get_final_task(self):
if final_job.number_of_retries:
task.max_attempt = final_job.number_of_retries
else:
task.max_attempt = self.number_of_retries.get(task.name, 3)
task.max_attempt = self.number_of_retries.get(task.name, 5)
if final_job.request_walltime:
task.max_walltime = final_job.request_walltime
else:
Expand Down
230 changes: 214 additions & 16 deletions python/lsst/ctrl/bps/panda/panda_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import binascii
import concurrent.futures
import json
import logging
import os

Expand All @@ -33,8 +34,8 @@
from idds.doma.workflowv2.domapandawork import DomaPanDAWork
from idds.workflowv2.workflow import AndCondition
from idds.workflowv2.workflow import Workflow as IDDS_client_workflow
from lsst.ctrl.bps.bps_config import BpsConfig
from lsst.ctrl.bps.panda.idds_tasks import IDDSWorkflowGenerator
from lsst.ctrl.bps.panda.panda_auth_utils import panda_auth_update
from lsst.ctrl.bps.wms_service import BaseWmsService, BaseWmsWorkflow
from lsst.resources import ResourcePath

Expand Down Expand Up @@ -195,19 +196,16 @@ def submit(self, workflow):
conditions.append(work.is_terminated)
and_cond = AndCondition(conditions=conditions, true_works=[DAG_final_work])
idds_client_workflow.add_condition(and_cond)
_, idds_server = self.config.search("iddsServer", opt={"default": None})
c = pandaclient.idds_api.get_api(
idds_utils.json_dumps, idds_host=idds_server, compress=True, manager=True
)
ret = c.submit(idds_client_workflow, username=None, use_dataset_name=False)
_LOG.debug("iDDS client manager submit returned = %s", str(ret))
idds_client = self.get_idds_client()
ret = idds_client.submit(idds_client_workflow, username=None, use_dataset_name=False)
_LOG.debug("iDDS client manager submit returned = %s", ret)

# Check submission success
# https://panda-wms.readthedocs.io/en/latest/client/rest_idds.html
if ret[0] == 0 and ret[1][0]:
request_id = int(ret[1][-1])
status, result, error = self.get_idds_result(ret)
if status:
request_id = int(result)
else:
raise RuntimeError(f"Error submitting to PanDA service: {str(ret)}")
raise RuntimeError(f"Error submitting to PanDA service: {error}")

_LOG.info("Submitted into iDDs with request id=%s", request_id)
workflow.run_id = request_id
Expand Down Expand Up @@ -285,6 +283,97 @@ def copy_files_for_distribution(tasks, file_distribution_uri):

return files_plc_hldr, direct_IO_files

def get_idds_client(self):
"""Get the idds client
Returns
-------
idds_client: `idds.client.clientmanager.ClientManager`
iDDS ClientManager object.
"""
idds_server = None
if isinstance(self.config, BpsConfig):
_, idds_server = self.config.search("iddsServer", opt={"default": None})
elif isinstance(self.config, dict) and "iddsServer" in self.config:
idds_server = self.config["iddsServer"]
# if idds_server is None, a default value on the panda relay service
# will be used
idds_client = pandaclient.idds_api.get_api(
idds_utils.json_dumps, idds_host=idds_server, compress=True, manager=True
)
return idds_client

def get_idds_result(self, ret):
"""Parse the results returned from iDDS.
Parameters
----------
ret: `tuple` of (`int`, (`bool`, payload)).
The first part ret[0] is the status of PanDA relay service.
The part of ret[1][0] is the status of iDDS service.
The part of ret[1][1] is the returned payload.
If ret[1][0] is False, ret[1][1] can be error messages.
Returns
-------
status: `bool`
The status of iDDS calls.
result: `int` or `list` or `dict`
The result returned from iDDS.
error: `str`
Error messages.
"""
# https://panda-wms.readthedocs.io/en/latest/client/rest_idds.html
if not (isinstance(ret, tuple) or isinstance(ret, list)) or ret[0] != 0:
# Something wrong with the PanDA relay service.
# The call may not be delivered to iDDS.
status = False
result = None
error = "PanDA relay service returns errors: %s" % str(ret)
else:
if ret[1][0]:
status = True
result = ret[1][1]
error = None
else:
# iDDS returns errors
status = False
result = None
error = "iDDS returns errors: %s" % str(ret[1][1])
return status, result, error

def restart(self, wms_workflow_id):
"""Restart a workflow from the point of failure.
Parameters
----------
wms_workflow_id : `str`
Id that can be used by WMS service to identify workflow that
need to be restarted.
Returns
-------
wms_id : `str`
Id of the restarted workflow. If restart failed, it will be set
to `None`.
run_name : `str`
Name of the restarted workflow. If restart failed, it will be set
to `None`.
message : `str`
A message describing any issues encountered during the restart.
If there were no issue, an empty string is returned.
"""
idds_client = self.get_idds_client()
ret = idds_client.retry(request_id=wms_workflow_id)
_LOG.debug("Restart PanDA workflow returned = %s", ret)

status, result, error = self.get_idds_result(ret)
if status:
_LOG.info("Restarting PanDA workflow %s", result)
return wms_workflow_id, None, json.dumps(result)
else:
return None, None, "Error retry PanDA workflow: %s" % str(error)

def report(self, wms_workflow_id=None, user=None, hist=0, pass_thru=None, is_global=False):
"""Stub for future implementation of the report method
Expected to return run information based upon given constraints.
Expand Down Expand Up @@ -312,9 +401,117 @@ def report(self, wms_workflow_id=None, user=None, hist=0, pass_thru=None, is_glo
Extra message for report command to print. This could be
pointers to documentation or to WMS specific commands.
"""
message = ""
run_reports = None
return run_reports, message
raise NotImplementedError

def list_submitted_jobs(self, wms_id=None, user=None, require_bps=True, pass_thru=None, is_global=False):
"""Query WMS for list of submitted WMS workflows/jobs.
This should be a quick lookup function to create list of jobs for
other functions.
Parameters
----------
wms_id : `int` or `str`, optional
Id or path that can be used by WMS service to look up job.
user : `str`, optional
User whose submitted jobs should be listed.
require_bps : `bool`, optional
Whether to require jobs returned in list to be bps-submitted jobs.
pass_thru : `str`, optional
Information to pass through to WMS.
is_global : `bool`, optional
If set, all available job queues will be queried for job
information. Defaults to False which means that only a local job
queue will be queried for information.
Only applicable in the context of a WMS using distributed job
queues (e.g., HTCondor). A WMS with a centralized job queue
(e.g. PanDA) can safely ignore it.
Returns
-------
req_ids : `list` [`Any`]
Only job ids to be used by cancel and other functions. Typically
this means top-level jobs (i.e., not children jobs).
"""
if wms_id is None and user is not None:
raise RuntimeError(
"Error to get workflow status report: wms_id is required"
" and filtering workflows with 'user' is not supported."
)

idds_client = self.get_idds_client()
ret = idds_client.get_requests(request_id=wms_id)
_LOG.debug("PanDA get workflows returned = %s", ret)

status, result, error = self.get_idds_result(ret)
if status:
req_ids = [req["request_id"] for req in result]
return req_ids
else:
raise RuntimeError(f"Error list PanDA workflow requests: {error}")

def cancel(self, wms_id, pass_thru=None):
"""Cancel submitted workflows/jobs.
Parameters
----------
wms_id : `str`
ID or path of job that should be canceled.
pass_thru : `str`, optional
Information to pass through to WMS.
Returns
-------
deleted : `bool`
Whether successful deletion or not. Currently, if any doubt or any
individual jobs not deleted, return False.
message : `str`
Any message from WMS (e.g., error details).
"""
idds_client = self.get_idds_client()
ret = idds_client.abort(request_id=wms_id)
_LOG.debug("Abort PanDA workflow returned = %s", ret)

status, result, error = self.get_idds_result(ret)
if status:
_LOG.info("Aborting PanDA workflow %s", result)
return True, json.dumps(result)
else:
return False, "Error abort PanDA workflow: %s" % str(error)

def ping(self, pass_thru=None):
"""Checks whether PanDA WMS services are up, reachable,
and can authenticate if authentication is required.
The services to be checked are those needed for submit, report, cancel,
restart, but ping cannot guarantee whether jobs would actually run
successfully. Any messages should be sent directly to the logger.
Parameters
----------
pass_thru : `str`, optional
Information to pass through to WMS.
Returns
-------
status : `int`
0 for success, non-zero for failure
message : `str`
Any message from WMS (e.g., error details).
"""
idds_client = self.get_idds_client()
ret = idds_client.ping()
_LOG.debug("Ping PanDA service returned = %s", ret)

status, result, error = self.get_idds_result(ret)
if status:
if "Status" in result and result["Status"] == "OK":
return 0, None
else:
return -1, "Error ping PanDA service: %s" % str(result)
else:
return -1, "Error ping PanDA service: %s" % str(error)

def run_submission_checks(self):
"""Checks to run at start if running WMS specific submission steps.
Expand All @@ -326,8 +523,9 @@ def run_submission_checks(self):
if key not in os.environ:
raise OSError(f"Missing environment variable {key}")

_, idds_server = self.config.search("iddsServer", opt={"default": None})
panda_auth_update(idds_server, reset=False)
status, message = self.ping()
if status != 0:
raise RuntimeError(message)


class PandaBpsWmsWorkflow(BaseWmsWorkflow):
Expand Down

0 comments on commit 16c11f7

Please sign in to comment.