# monitor_workflow_and_report_results

Top-level Notebook for monitoring and reporting results of workflow DRS data access scale tests.

In [None]:
# Temporarily uninstall/reinstall until versioning is implemented.
# The package is removed here and installed again by the next cell.
# `-y` answers pip's confirmation prompt automatically.
! pip3 uninstall -y terra-workflow-scale-test-tools

In [None]:
# Install the tools package directly from its GitHub repository.
# `--upgrade` and `--no-cache-dir` ask pip for a fresh copy instead of a cached one.
! pip3 install --upgrade --no-cache-dir git+https://github.com/mbaumann-broad/terra-workflow-scale-test-tools.git

# Imports

In [None]:
import importlib
import os
from datetime import datetime
from pathlib import Path

from terra_workflow_scale_test_tools.monitor_response_times import \
    start_monitoring_in_current_process, stop_monitoring_in_current_process
from terra_workflow_scale_test_tools.user_input import UserInputUI
from terra_workflow_scale_test_tools.workflow_status import WorkflowDAO, wait_for_workflow_to_complete

In [None]:
def get_resource_path(filename: str, package: str = "terra_workflow_scale_test_tools") -> str:
    """Return the filesystem path of a resource file bundled with a package.

    Args:
        filename: Name of the resource file inside ``package``.
        package: Importable package that contains the resource. Defaults to
            the scale test tools package, so existing callers are unaffected.

    Returns:
        The path of the resource, as a string.

    Note:
        ``importlib.resources.path`` is a context manager and may hand back a
        temporary file when the package is not stored as plain files (for
        example, when imported from a zip archive). The returned string is
        used after the context has exited, so it is only guaranteed to remain
        valid for packages installed on the filesystem.
    """
    # ``import importlib`` alone does not load the ``resources`` submodule, so
    # import it explicitly rather than relying on some other module having
    # already done so.
    import importlib.resources

    with importlib.resources.path(package, filename) as path:
        return str(path)

In [None]:
class StopExecution(Exception):
    """Exception raised to stop the current cell when the inputs are not usable."""

    def _render_traceback_(self):
        """Return ``None`` so that no traceback text is supplied for this exception.

        IPython consults this hook when it displays an uncaught exception.
        """
        return None

# Manual Input/Configuration

In [None]:
# Create the input form and display it in this cell's output.
# The values chosen here are read back through `ui` by the cells that follow.
ui = UserInputUI()
ui.display()

In [None]:
# TODO Add more validation of the selected inputs.
# Reject a missing, empty or whitespace-only Submission Id up front, because
# the cells below all depend on it.
if not (ui.get_submission_id() or "").strip():
    print("Please enter a valid Submission Id then run this notebook cell again.")
    # StopExecution ends this cell without printing a traceback.
    raise StopExecution

In [None]:
# Submission id entered in the input form.
# The bare name on the last line echoes the value as the cell's output.
WF_SUBMISSION_ID = ui.get_submission_id()
WF_SUBMISSION_ID

In [None]:
# Whether response times should be monitored; this flag gates the monitoring cell below.
monitor_response_time = ui.is_monitor_response_times()
monitor_response_time

In [None]:
# Data service selected in the form; later handed to the response time monitor.
PROJECT_TO_MONITOR = ui.get_data_service()
PROJECT_TO_MONITOR

In [None]:
# Terra deployment tier selected in the form; used by both the workflow DAO
# and the response time monitor.
TERRA_DEPLOYMENT_TIER=ui.get_terra_deployment_tier()
TERRA_DEPLOYMENT_TIER

In [None]:
# Whether the workflow logs should be copied to the local filesystem for analysis.
copy_workflow_logs_for_analysis = ui.is_copy_workflow_logs_for_analysis()
copy_workflow_logs_for_analysis

In [None]:
# Whether the timestamp extraction scripts should be run once the logs are available.
extract_timeseries_data = ui.is_extract_timeseries_data()
extract_timeseries_data

In [None]:
# Whether the graphing notebooks at the end of this notebook should be run.
display_timeseries_graphs = ui.is_display_timeseries_graphs()
display_timeseries_graphs

# Get Workflow Details

In [None]:
# Identify the current workspace from the process environment.
# Each value is None when the corresponding variable is not set.
WORKSPACE_NAMESPACE = os.environ.get('WORKSPACE_NAMESPACE')
WORKSPACE_NAME = os.environ.get('WORKSPACE_NAME')

In [None]:
# Create the data access object for the selected submission and print its summary.
# NOTE(review): the notice is printed after the DAO has been constructed, so
# presumably the slow step is building the summary string - confirm.
workflow_dao = WorkflowDAO(TERRA_DEPLOYMENT_TIER, WORKSPACE_NAMESPACE, WORKSPACE_NAME, WF_SUBMISSION_ID)
print("This may take a minute or more for large workflows ...")
print(f"\nSubmission Summary:\n{workflow_dao.get_workflow_summary_display_string()}")

In [None]:
# Submission time formatted as 'YYYY/MM/DD HH:MM:SS'.
# The same format string is used to parse it again when the monitoring output
# directory is named.
WF_START_TIME = workflow_dao.get_submission_time('%Y/%m/%d %H:%M:%S')
WF_START_TIME

# General Constants

In [None]:
# Workspace bucket location, read from the environment.
# Raises KeyError if WORKSPACE_BUCKET is not set.
WORKSPACE_BUCKET=os.environ['WORKSPACE_BUCKET']
WORKSPACE_BUCKET

In [None]:
# Location of this submission within the workspace bucket: "<bucket>/<submission id>".
# Used as the source when the workflow logs are copied.
WF_SUBMISSION_GS_URI= f"{WORKSPACE_BUCKET}/{WF_SUBMISSION_ID}"
WF_SUBMISSION_GS_URI

In [None]:
# Absolute, POSIX-style path of the directory that holds all test results.
TEST_RESULTS_DIR = Path('./test_results').resolve().as_posix()
# Create the directory with pathlib instead of shelling out to `mkdir -p`:
# a failure raises here rather than being printed and ignored, and no shell
# is needed. An already existing directory is not an error.
Path(TEST_RESULTS_DIR).mkdir(parents=True, exist_ok=True)
TEST_RESULTS_DIR

In [None]:
# Results for this submission are kept in their own subdirectory of the
# test results directory.
WF_TEST_RESULTS_DIR = os.path.join(TEST_RESULTS_DIR, f"submission_{WF_SUBMISSION_ID}")
# Use os.makedirs instead of a shell `mkdir -p` so that a failure raises
# rather than being printed and ignored. An existing directory is fine.
os.makedirs(WF_TEST_RESULTS_DIR, exist_ok=True)
WF_TEST_RESULTS_DIR

In [None]:
# Directory that receives the copy of the workflow logs for this submission.
WF_TEST_RESULTS_WORKFLOW_LOGS_DIR=os.path.join(WF_TEST_RESULTS_DIR, "workflow-logs")
# Do not create the directory here.
# It is created when the logs are copied, and its presence indicates that they have been copied.
WF_TEST_RESULTS_WORKFLOW_LOGS_DIR

In [None]:
# The monitoring output directory is named after the workflow start time,
# re-formatted from 'YYYY/MM/DD HH:MM:SS' to 'YYYYMMDD_HHMMSS'.
MONITORING_OUTPUT_DIR = os.path.join(
    WF_TEST_RESULTS_DIR,
    f"monitoring_data_{datetime.strptime(WF_START_TIME, '%Y/%m/%d %H:%M:%S').strftime('%Y%m%d_%H%M%S')}")
# Use os.makedirs instead of a shell `mkdir -p` so that a failure raises
# rather than being printed and ignored. An existing directory is fine.
os.makedirs(MONITORING_OUTPUT_DIR, exist_ok=True)
MONITORING_OUTPUT_DIR

# Monitor response times during workflow execution

In [None]:
# Monitor response times for as long as the workflow is running.
# When monitoring is disabled, this cell does not wait for the workflow.
if monitor_response_time:
    start_monitoring_in_current_process(
        TERRA_DEPLOYMENT_TIER, PROJECT_TO_MONITOR, MONITORING_OUTPUT_DIR)
    try:
        wait_for_workflow_to_complete(workflow_dao)
    finally:
        # Always stop monitoring, even if the wait raises or the cell is
        # interrupted; otherwise monitoring would be left running in this
        # process.
        stop_monitoring_in_current_process()

# Copy workflow logs from the workspace bucket to the local filesystem

In [None]:
workflow_logs_path = Path(WF_TEST_RESULTS_WORKFLOW_LOGS_DIR)
workflow_logs_previously_copied = workflow_logs_path.exists() and workflow_logs_path.is_dir()
if copy_workflow_logs_for_analysis:
    if not workflow_logs_previously_copied:
        workflow_logs_path.mkdir(parents=True, exist_ok=False)
        # Copy the logs - this can take a long time (tens of minutes to hours)
        ! "{get_resource_path('copy_workflow_logs_to_local_fs.sh')}" -s "{WF_SUBMISSION_GS_URI}" -d "{WF_TEST_RESULTS_WORKFLOW_LOGS_DIR}" > "{WF_TEST_RESULTS_WORKFLOW_LOGS_DIR}/copy_workflow_logs_to_local_fs.log" 2>&1
    else:
        print(f"The workflow-logs directory already exists: {WF_TEST_RESULTS_WORKFLOW_LOGS_DIR}")
        print("Skipping copy of the workflow logs.")
else:
    print("Currently configured to skip copying of workflow logs.")


In [None]:
workflow_logs_copied = workflow_logs_path.exists() and workflow_logs_path.is_dir()
if workflow_logs_copied and extract_timeseries_data:
    ! "{get_resource_path('extract_drs_localization_timestamps.sh')}" -d "{WF_TEST_RESULTS_DIR}"
    ! "{get_resource_path('extract_drs_localization_fallback_timestamps.sh')}" -d "{WF_TEST_RESULTS_DIR}"

# Display the results of the workflow run

## Workflow DRS localization rates

In [None]:
# Run the packaged graphing notebook only when the workflow-logs directory
# exists and graphs were requested.
if workflow_logs_copied and display_timeseries_graphs:
    %run "{get_resource_path('graph_drs_data_access_rates.ipynb')}"

In [None]:
# Run the packaged response time graphing notebook when graphs were requested.
# NOTE(review): unlike the DRS graphs cell, this is not gated on whether
# response time monitoring was actually run - confirm the graphing notebook
# copes with missing monitoring data.
if display_timeseries_graphs:
    %run "{get_resource_path('graph_response_time_data.ipynb')}"