Skip to content

Commit

Permalink
DM-35296 ctrl_bps_panda report
Browse files Browse the repository at this point in the history
  • Loading branch information
zhaoyuyoung committed Nov 21, 2022
1 parent f367ebb commit 34c177a
Showing 1 changed file with 102 additions and 2 deletions.
104 changes: 102 additions & 2 deletions python/lsst/ctrl/bps/panda/panda_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import json
import logging
import os
import re

import idds.common.utils as idds_utils
import pandaclient.idds_api
Expand All @@ -36,7 +37,7 @@
from idds.workflowv2.workflow import Workflow as IDDS_client_workflow
from lsst.ctrl.bps.bps_config import BpsConfig
from lsst.ctrl.bps.panda.idds_tasks import IDDSWorkflowGenerator
from lsst.ctrl.bps.wms_service import BaseWmsService, BaseWmsWorkflow
from lsst.ctrl.bps.wms_service import BaseWmsService, BaseWmsWorkflow, WmsRunReport, WmsStates
from lsst.resources import ResourcePath

_LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -405,7 +406,106 @@ def report(self, wms_workflow_id=None, user=None, hist=0, pass_thru=None, is_glo
Extra message for report command to print. This could be
pointers to documentation or to WMS specific commands.
"""
raise NotImplementedError
message = ""
run_reports = []

c = self.get_idds_client()
ret = c.get_requests(request_id=wms_workflow_id, with_detail=True)
_LOG.debug("PanDA get workflow status returned = %s", str(ret))

success = False
if ret[0] == 0 and ret[1][1]:
head = ret[1][1][0]
wms_report = WmsRunReport(
wms_id=str(head["request_id"]),
operator=head["username"],
project="",
campaign="",
payload="",
run=head["name"],
state=WmsStates.UNKNOWN,
total_number_jobs=0,
job_state_counts={state: 0 for state in WmsStates},
job_summary={},
run_summary="",
)

# SubFinished tasks has jobs in
# output_processed_files: Finished
# output_failed_files: Failed
# output_missing_files: Missing
stateMap = {
"Finished": [WmsStates.SUCCEEDED],
"SubFinished": [
WmsStates.SUCCEEDED,
WmsStates.FAILED,
WmsStates.PRUNED,
],
"Transforming": [
WmsStates.RUNNING,
WmsStates.SUCCEEDED,
WmsStates.FAILED,
WmsStates.UNREADY,
WmsStates.PRUNED,
],
"Failed": [WmsStates.FAILED, WmsStates.PRUNED],
}

fileMap = {
WmsStates.SUCCEEDED: "output_processed_files",
WmsStates.RUNNING: "output_processing_files",
WmsStates.FAILED: "output_failed_files",
WmsStates.UNREADY: "input_new_files",
WmsStates.PRUNED: "output_missing_files",
}

# workflow status to report as SUCCEEDED
wfStatus = ["Finished", "SubFinished", "Transforming"]

wfsucceeded = False

tasks = ret[1][1]
tasks.sort(key=lambda x: x["transform_workload_id"])

for req in tasks:
totaljobs = req["output_total_files"]
wms_report.total_number_jobs += totaljobs
tasklabel = req["transform_name"]
tasklabel = re.sub(wms_report.run + "_", "", tasklabel)
status = req["transform_status"]["attributes"]["_name_"]
taskstatus = {}
for state in WmsStates:
njobs = 0
for mappedstate in stateMap[status]:
if state in fileMap and mappedstate == state:
njobs = req[fileMap[mappedstate]] if req[fileMap[mappedstate]] else 0
if state == WmsStates.RUNNING:
njobs += req["output_new_files"] - req["input_new_files"]
break
wms_report.job_state_counts[state] += njobs
taskstatus.update({state: njobs})
wms_report.job_summary.update({tasklabel: taskstatus})

# the EXPECTED column
if wms_report.run_summary:
wms_report.run_summary += ";"
wms_report.run_summary += tasklabel + ":" + str(totaljobs)

if status in wfStatus:
wfsucceeded = True
wms_report.state = stateMap[status][0]

# All tasks have failed, set the workflow FAILED
if not wfsucceeded:
wms_report.state = WmsStates.FAILED

run_reports.append(wms_report)
success = True

if not success:
raise RuntimeError(f"Error to get workflow status: {ret} for id: {wms_workflow_id}")

return run_reports, message

def list_submitted_jobs(self, wms_id=None, user=None, require_bps=True, pass_thru=None, is_global=False):
"""Query WMS for list of submitted WMS workflows/jobs.
Expand Down

0 comments on commit 34c177a

Please sign in to comment.