# CROCUS Processing Workflow

## Imports Required for Workflow Generation

In [1]:
import os
import sys
import logging
from pathlib import Path
from argparse import ArgumentParser
from datetime import datetime
from datetime import timedelta
from Pegasus.api import *

## Class That Handles Workflow Generation

In [2]:
class CrocusWorkflow():
    """Build, write, plan and monitor the CROCUS ingest workflow with Pegasus.

    Typical use: construct an instance, call the ``create_*`` methods to
    populate the catalogs and the workflow, call :meth:`write` to serialize
    them, then :meth:`plan_and_submit`, :meth:`status` and :meth:`statistics`.
    """

    # Pegasus API objects; each is populated by the matching create_* method.
    wf = None
    sc = None
    tc = None
    rc = None
    props = None

    # Output file name and working directories; set in __init__.
    dagfile = None
    wf_dir = None
    shared_scratch_dir = None
    local_storage_dir = None
    wf_name = "crocus"

    # --- Init ---------------------------------------------------------------------
    def __init__(self, aqt_end_date, wxt_end_date, aqt_lag=1, wxt_lag=1, dagfile="workflow.yml"):
        """Store the date windows and derive the working directories.

        Args:
            aqt_end_date: ``datetime`` used as the (exclusive) end of the AQT
                window; jobs are created for the ``aqt_lag`` days before it.
            wxt_end_date: Same as ``aqt_end_date`` but for the WXT jobs.
            aqt_lag: Number of days before ``aqt_end_date`` to create jobs for.
            wxt_lag: Number of days before ``wxt_end_date`` to create jobs for.
            dagfile: File name the workflow is written to by :meth:`write`.
        """
        self.dagfile = dagfile

        # Scratch and output directories live under the current working directory.
        self.wf_dir = str(Path.cwd())
        self.shared_scratch_dir = os.path.join(self.wf_dir, "scratch")
        self.local_storage_dir = os.path.join(self.wf_dir, "output")

        self.aqt_end_date = aqt_end_date
        self.aqt_lag = aqt_lag
        self.wxt_end_date = wxt_end_date
        self.wxt_lag = wxt_lag

    # --- Write files in directory -------------------------------------------------
    def write(self):
        """Write the catalogs, the properties and the workflow to disk.

        All ``create_*`` methods must have been called first; otherwise the
        corresponding attribute is still ``None`` and this raises
        ``AttributeError``.
        """
        self.sc.write()
        self.props.write()
        self.rc.write()
        self.tc.write()
        # Honor the file name given to the constructor (default "workflow.yml").
        self.wf.write(self.dagfile)

    # --- Configuration (Pegasus Properties) ---------------------------------------
    def create_pegasus_properties(self):
        """Create an empty Pegasus ``Properties`` object."""
        self.props = Properties()

    # --- Site Catalog -------------------------------------------------------------
    def create_sites_catalog(self, exec_site_name="condorpool"):
        """Create the site catalog with a ``local`` site and an execution site.

        Args:
            exec_site_name: Name given to the HTCondor execution site.
        """
        self.sc = SiteCatalog()

        # The local site exposes the scratch and storage directories over file://.
        local = (Site("local")
                    .add_directories(
                        Directory(Directory.SHARED_SCRATCH, self.shared_scratch_dir)
                            .add_file_servers(FileServer("file://" + self.shared_scratch_dir, Operation.ALL)),
                        Directory(Directory.LOCAL_STORAGE, self.local_storage_dir)
                            .add_file_servers(FileServer("file://" + self.local_storage_dir, Operation.ALL))
                    )
                )

        exec_site = (Site(exec_site_name)
                        .add_condor_profile(universe="vanilla")
                        .add_pegasus_profile(
                            style="condor"
                        )
                    )

        self.sc.add_sites(local, exec_site)

    # --- Transformation Catalog (Executables and Containers) ----------------------
    def create_transformation_catalog(self, exec_site_name="condorpool"):
        """Register the container and the two ingest executables.

        Args:
            exec_site_name: Site the transformations are registered for.
        """
        self.tc = TransformationCatalog()

        crocus_container = Container("crocus_container",
            container_type = Container.SINGULARITY,
            image="docker://papajim/crocus:latest",
            image_site="docker_hub"
        )

        # Add the crocus processing; both scripts are staged from the
        # workflow directory and run inside the container.
        aqt_ingest = Transformation("aqt_ingest", site=exec_site_name, pfn=os.path.join(self.wf_dir, "executables/aqt-ingest.py"), is_stageable=True, container=crocus_container)
        wxt_ingest = Transformation("wxt_ingest", site=exec_site_name, pfn=os.path.join(self.wf_dir, "executables/wxt-ingest.py"), is_stageable=True, container=crocus_container)

        self.tc.add_containers(crocus_container)
        self.tc.add_transformations(aqt_ingest, wxt_ingest)

    # --- Replica Catalog ----------------------------------------------------------
    def create_replica_catalog(self):
        """Create an empty replica catalog."""
        self.rc = ReplicaCatalog()

    # --- Create Workflow ----------------------------------------------------------
    def create_workflow(self):
        """Create the workflow with one ingest job per sensor per day.

        AQT jobs are added first, then WXT jobs, each covering the ``lag``
        days immediately before the sensor's end date.
        """
        self.wf = Workflow(self.wf_name, infer_dependencies=True)

        self._add_ingest_jobs("aqt", self.aqt_end_date, self.aqt_lag)
        self._add_ingest_jobs("wxt", self.wxt_end_date, self.wxt_lag)

    @staticmethod
    def _lag_dates(end_date, lag):
        """Return the ``lag`` consecutive days before ``end_date``, oldest first.

        ``end_date`` itself is excluded. A ``lag`` of zero or less yields an
        empty list.
        """
        start_date = end_date - timedelta(days=lag)
        return [start_date + timedelta(days=offset) for offset in range(lag)]

    def _add_ingest_jobs(self, sensor, end_date, lag):
        """Add one ``<sensor>_ingest`` job to the workflow per day in the window.

        Args:
            sensor: Sensor tag used in the transformation name, job id and
                output file name (``"aqt"`` or ``"wxt"``).
            end_date: Exclusive end of the date window.
            lag: Number of days in the window.
        """
        for curr_date in self._lag_dates(end_date, lag):
            date_str = curr_date.strftime("%Y-%m-%d")
            job_label = f"crocus-neiu-{sensor}-{date_str}"
            output_file = curr_date.strftime(f"crocus-neiu-{sensor}-a1-%Y%m%d-%H%M%S.nc")
            ingest_job = (
                Job(f"{sensor}_ingest", _id=job_label, node_label=job_label)
                    .add_args("--date", date_str)
                    .add_outputs(output_file, register_replica=True, stage_out=True)
            )

            self.wf.add_jobs(ingest_job)

    # --- Plan and Submit the Workflow ---------------------------------------------
    def plan_and_submit(self):
        """Plan the workflow into ``./submit`` and submit it, staging outputs to ``local``.

        Errors are printed rather than raised.
        """
        try:
            self.wf.plan(
                submit=True,
                dir="./submit",
                output_sites=["local"]
            )
        # NOTE(review): broader than status()/statistics(), which catch only
        # PegasusClientError -- confirm whether this is intentional.
        except Exception as e:
            print(e)

    # --- Get status of the workflow -----------------------------------------------
    def status(self):
        """Show the long-form status of the workflow; print client errors."""
        try:
            self.wf.status(long=True)
        except PegasusClientError as e:
            print(e)

    # --- Get statistics of the workflow -------------------------------------------
    def statistics(self):
        """Show the statistics of the workflow; print client errors."""
        try:
            self.wf.statistics()
        except PegasusClientError as e:
            print(e)

## Generate The Workflow
The CrocusWorkflow class accepts an AQT end date, an AQT lag, a WXT end date and a WXT lag.
The end date itself is not included in the pulled data; instead, it serves as a reference point, and data is pulled for the LAG days preceding it.

In [3]:
# Both sensors use the same reference date and the same look-back window.
end_date = datetime.strptime("2024-03-15", "%Y-%m-%d")
lag_days = 3

workflow = CrocusWorkflow(
    aqt_end_date=end_date,
    wxt_end_date=end_date,
    aqt_lag=lag_days,
    wxt_lag=lag_days,
)

# Each build step is announced before it runs, in this fixed order.
build_steps = [
    ("Creating execution sites...", workflow.create_sites_catalog),
    ("Creating workflow properties...", workflow.create_pegasus_properties),
    ("Creating transformation catalog...", workflow.create_transformation_catalog),
    ("Creating replica catalog...", workflow.create_replica_catalog),
    ("Creating crocus workflow dag...", workflow.create_workflow),
]
for message, run_step in build_steps:
    print(message)
    run_step()

# Serialize the catalogs, properties and workflow to disk.
workflow.write()

Creating execution sites...
Creating workflow properties...
Creating transformation catalog...
Creating replica catalog...
Creating crocus workflow dag...
defaultdict(<class 'collections.OrderedDict'>, {})
defaultdict(<class 'collections.OrderedDict'>, {})


## Plan and Submit the Workflow
We will now plan and submit the workflow for execution. By default, jobs run on the `condorpool` site, i.e., the selected ACCESS resource.

In [4]:
# Plan the workflow into ./submit and submit it; any error is printed, not raised.
workflow.plan_and_submit()

2024.03.19 14:59:04.784 PDT:
2024.03.19 14:59:04.790 PDT:   -----------------------------------------------------------------------
2024.03.19 14:59:04.795 PDT:   File for submitting this DAG to HTCondor           : crocus-0.dag.condor.sub
2024.03.19 14:59:04.800 PDT:   Log of DAGMan debugging messages                 : crocus-0.dag.dagman.out
2024.03.19 14:59:04.806 PDT:   Log of HTCondor library output                     : crocus-0.dag.lib.out
2024.03.19 14:59:04.812 PDT:   Log of HTCondor library error messages             : crocus-0.dag.lib.err
2024.03.19 14:59:04.817 PDT:   Log of the life of condor_dagman itself          : crocus-0.dag.dagman.log
2024.03.19 14:59:04.823 PDT:
2024.03.19 14:59:04.828 PDT:   -no_submit given, not submitting DAG to HTCondor.  You can do this with:
2024.03.19 14:59:04.839 PDT:   -----------------------------------------------------------------------
2024.03.19 14:59:05.353 PDT:   Database version: '5.0.8dev' (sqlite:////home/georgpap/.pegasus/workflo

After the workflow has been successfully planned and submitted, you can use the Python Workflow object to monitor its status. The status output shows the number of jobs in each state, as well as whether each job is idle or running.

In [9]:
# Configure root logging at INFO level before querying the workflow status.
logging.basicConfig(level=logging.INFO)
# Show the long-form status; client errors are printed by the wrapper.
workflow.status()


##################
# pegasus-status #
##################
STAT  IN_STATE  JOB
Run      02:37  crocus-0 ( /home/georgpap/GitHub/papajim/pegasus-sage/crocus-processing/submit/georgpap/pegasus/crocus/run0001 )
Run      01:52   ┣━wxt_ingest_crocus-neiu-wxt-2024-03-13
Run      01:52   ┣━wxt_ingest_crocus-neiu-wxt-2024-03-14
Run      01:52   ┗━wxt_ingest_crocus-neiu-wxt-2024-03-12
Summary: 4 Condor jobs total (R:4)

UNRDY READY   PRE  IN_Q  POST  DONE  FAIL %DONE STATE   DAGNAME
6     0     0     3     0     5     0  35.7 Running *crocus-0.dag
Summary: 1 DAG total (Running:1)


In [10]:
# Wait on the underlying Pegasus Workflow object (shows a progress bar, per the
# output below). Kept as the cell's last expression so the notebook displays
# the returned Workflow.
workflow.wf.wait()

[[1;32m#########################[0m] 100.0% ..Success ([1;34mUnready: 0[0m, [1;32mCompleted: 14[0m, [1;33mQueued: 0[0m, [1;36mRunning: 0[0m, [1;31mFailed: 0[0m)


<Pegasus.api.workflow.Workflow at 0x7fc2b028dc10>

## Launch Pilot Jobs on ACCESS Resources
At this point you should have some idle jobs in the queue. They are idle because there are no resources yet to execute on. Resources can be brought in with the HTCondor Annex tool, by sending pilot jobs (also called glideins) to the ACCESS resource providers. These pilots have the following properties:

A pilot can run multiple user jobs - it stays active until no more user jobs are available or until end of life has been reached, whichever comes first.

A pilot is partitionable - job slots will dynamically be created based on the resource requirements in the user jobs. This means you can fit multiple user jobs on a compute node at the same time.

A pilot will only run jobs for the user who started it.

The process of starting pilots is described in the [ACCESS Pegasus Documentation](https://xsedetoaccess.ccs.uky.edu/confluence/redirect/ACCESS+Pegasus.html)

## Statistics
Depending on if the workflow finished successfully or not, you have options on what to do next.
If the workflow failed, you can use `workflow.analyze()` to get help finding out what went wrong.
If the workflow finished successfully, we can pull out some statistics from the provenance database:

In [11]:
# Show the workflow statistics; client errors are printed by the wrapper.
workflow.statistics()


######################
# pegasus-statistics #
######################
Database version: '5.0.8dev' (sqlite:////home/georgpap/GitHub/papajim/pegasus-sage/crocus-processing/submit/georgpap/pegasus/crocus/run0001/crocus-0.stampede.db)

#
# Pegasus Workflow Management System - http://pegasus.isi.edu
#
# Workflow summary:
#   Summary of the workflow execution. It shows total
#   tasks/jobs/sub workflows run, how many succeeded/failed etc.
#   In case of hierarchical workflow the calculation shows the
#   statistics across all the sub workflows.It shows the following
#   statistics about tasks, jobs and sub workflows.
#     * Succeeded - total count of succeeded tasks/jobs/sub workflows.
#     * Failed - total count of failed tasks/jobs/sub workflows.
#     * Incomplete - total count of tasks/jobs/sub workflows that are
#       not in succeeded or failed state. This includes all the jobs
#       that are not submitted, submitted but not completed etc. This
#       is calculated as  differenc