Skip to content

Commit

Permalink
WIP ctrl_bps_panda report
Browse files Browse the repository at this point in the history
  • Loading branch information
zhaoyuyoung committed Nov 10, 2022
1 parent f367ebb commit 7c37c44
Showing 1 changed file with 93 additions and 2 deletions.
95 changes: 93 additions & 2 deletions python/lsst/ctrl/bps/panda/panda_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import json
import logging
import os
import re

import idds.common.utils as idds_utils
import pandaclient.idds_api
Expand All @@ -36,7 +37,7 @@
from idds.workflowv2.workflow import Workflow as IDDS_client_workflow
from lsst.ctrl.bps.bps_config import BpsConfig
from lsst.ctrl.bps.panda.idds_tasks import IDDSWorkflowGenerator
from lsst.ctrl.bps.wms_service import BaseWmsService, BaseWmsWorkflow
from lsst.ctrl.bps.wms_service import BaseWmsService, BaseWmsWorkflow, WmsRunReport, WmsStates
from lsst.resources import ResourcePath

_LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -405,7 +406,97 @@ def report(self, wms_workflow_id=None, user=None, hist=0, pass_thru=None, is_glo
Extra message for report command to print. This could be
pointers to documentation or to WMS specific commands.
"""
raise NotImplementedError
message = ""
run_reports = []

c = self.get_idds_client()
ret = c.get_requests(request_id=wms_workflow_id, with_detail=True)
_LOG.debug("PanDA get workflow status returned = %s", str(ret))

success = False
if ret[0] == 0 and ret[1][1]:
head = ret[1][1][0]
wms_report = WmsRunReport(
wms_id = str(head['request_id']),
operator = head['username'],
project = "",
campaign = "",
payload = "",
run = head['name'],
state = WmsStates.UNKNOWN,
total_number_jobs = 0,
job_state_counts= {state: 0 for state in WmsStates},
job_summary = {},
run_summary = ""
)

# SubFinished tasks has jobs in
# output_processed_files: Finished
# output_failed_files: Failed
# output_missing_files: Missing
stateMap = {
"Finished": [WmsStates.SUCCEEDED],
"SubFinished": [WmsStates.SUCCEEDED, WmsStates.FAILED, WmsStates.MISFIT],
"Transforming": [WmsStates.RUNNING, WmsStates.SUCCEEDED, WmsStates.FAILED,
WmsStates.PENDING, WmsStates.MISFIT],
"Failed": [WmsStates.FAILED, WmsStates.MISFIT]
}

fileMap = {
WmsStates.SUCCEEDED: 'output_processed_files',
WmsStates.RUNNING: 'output_new_files',
WmsStates.FAILED: 'output_failed_files',
WmsStates.PENDING: 'output_processing_files',
WmsStates.MISFIT: 'output_missing_files'
}

# workflow status to report as SUCCEEDED
wfStatus = ["Finished", "SubFinished", "Transforming"]

wfsucceeded = False

tasks = ret[1][1]
tasks.sort(key = lambda x:x['transform_workload_id'])

for req in tasks:
totaljobs = req['output_total_files']
wms_report.total_number_jobs += totaljobs
taskid = req['transform_workload_id']
tasklabel = req['transform_name']
tasklabel = re.sub(wms_report.run + '_', '', tasklabel)
status = req['transform_status']['attributes']['_name_']
taskstatus = {}
for state in WmsStates:
njobs = 0
for mappedstate in stateMap[status]:
if state in fileMap and mappedstate == state:
njobs = req[fileMap[mappedstate]] if req[fileMap[mappedstate]] else 0
if state == WmsStates.RUNNING:
njobs -= req['input_new_files']
wms_report.job_state_counts[state] += njobs
taskstatus.update({state: njobs})
wms_report.job_summary.update({tasklabel : taskstatus})

# the EXPECTED column
if wms_report.run_summary:
wms_report.run_summary += ";"
wms_report.run_summary += tasklabel + ":" + str(totaljobs)

if status in wfStatus:
wfsucceeded = True
wms_report.state = stateMap[status][0]

# All tasks have failed, set the workflow FAILED
if not wfsucceeded:
wms_report.state = WmsStates.FAILED

run_reports.append(wms_report)
success = True

if not success:
raise RuntimeError(f"Error to get workflow status: {ret} for id: {wms_workflow_id}")

return run_reports, message

def list_submitted_jobs(self, wms_id=None, user=None, require_bps=True, pass_thru=None, is_global=False):
"""Query WMS for list of submitted WMS workflows/jobs.
Expand Down

0 comments on commit 7c37c44

Please sign in to comment.