# Submitting jobs using the OGC API

## Summary

The [OGC ("Open Geospatial Consortium") Processes API ("Application Programming Interface")](https://ogcapi.ogc.org/processes/) is an international specification for executing remote processing of data, used here for Earth Science data. The user makes HTTP(S) requests to execute data processing on a remote server, passing all required information in JSON ("JavaScript Object Notation") format. The user can use the OGC API to register, execute, and interact with CWL ("Common Workflow Language") defined [Airflow DAGs ("Directed Acyclic Graphs")](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html) as OGC API processes.

This tutorial covers how to execute a CWL DAG using the `Unity-Py` Python client program which interfaces with the OGC Processes API. This can be used to execute any CWL workflow which invokes a science algorithm packaged as a Docker container.
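For orientation, the core OGC API Processes standard defines a small set of resource paths, which a client library such as Unity-Py presumably wraps. The sketch below only builds those URLs and sends nothing; the helper name `ogc_paths`, the example endpoint, and the identifiers are illustrative assumptions rather than part of Unity-Py.

```python
def ogc_paths(base_url: str, process_id: str, job_id: str) -> dict:
    """Map common OGC API Processes operations to (HTTP method, URL) pairs."""
    base = base_url.rstrip("/")  # tolerate a trailing slash on the endpoint
    return {
        "list_processes": ("GET", f"{base}/processes"),
        "describe_process": ("GET", f"{base}/processes/{process_id}"),
        "execute_process": ("POST", f"{base}/processes/{process_id}/execution"),
        "list_jobs": ("GET", f"{base}/jobs"),
        "job_status": ("GET", f"{base}/jobs/{job_id}"),
        "job_results": ("GET", f"{base}/jobs/{job_id}/results"),
        "dismiss_job": ("DELETE", f"{base}/jobs/{job_id}"),
    }


# Hypothetical endpoint and identifiers, for illustration only
example = ogc_paths("https://example.com/unity/dev/ogc/", "cwl_dag", "1234")
for name, (method, url) in example.items():
    print(f"{name:17} {method:6} {url}")
```

Each step in the rest of this tutorial corresponds to one of these operations, with Unity-Py handling the request details.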

**Note on process vs. job:** A process is equivalent to an Airflow DAG in the Unity system and describes an algorithm you wish to execute, while a job is a single execution of that process. A process may have many jobs executing at the same time, but each job belongs to exactly one process.

## Requirements

- Any compute environment: Local or in the cloud
- This tutorial assumes you have already deployed an OGC process to the OGC Processes API endpoint; registering (and deregistering) a process is covered briefly at the end
- This notebook uses the [Unity-Py](https://github.com/unity-sds/unity-monorepo/tree/main/libs/unity-py) library to interact with the OGC API
- Unity environment (e.g., DEV, TEST, PROD) and venue (e.g., unity-sbg-dev, unity-emit-dev, unity-asips-int) names. See [this documentation](https://unity-sds.gitbook.io/docs/system-docs/architecture/deployments-projects-and-venues/unity-owned-venues) for guidance
- Unity username and password
- A URL for the OGC API processes endpoint, something like: `https://unity-dev-httpd-alb-XXXXXXXXXX.us-west-2.elb.amazonaws.com:1234/unity/dev/ogc/`

## Learning Objectives

- List all processes that have been registered at the OGC endpoint URL
- Get details on a single process
- Execute a job
- Monitor a job
- Get all jobs executing for a process
- Delete a job

In [1]:
from datetime import datetime
import time

from IPython.display import JSON
from ipywidgets import widgets

import requests

from unity_sds_client.unity import Unity, UnityEnvironments    # Unity-Py Python client library
from unity_sds_client.unity_services import UnityServices
from unity_sds_client.resources.job_status import JobStatus

## Set up Unity-Py environment

The following code will set up the Unity-Py client so that it can interact with an OGC Processes API endpoint. Then you can query the API for process information.

In [2]:
# Point Unity-Py to DEV environment for testing and development work
unity = Unity(UnityEnvironments.DEV)    # Enter Unity username and password

Please enter your Unity username:  tebaldi
Please enter your Unity password:  ········


In [None]:
# Venue-level configuration
unity.set_venue_id("")   # Leave blank for now

# Define process service to interact with the Processes API
process_service = unity.client(UnityServices.PROCESS_SERVICE)
process_service.endpoint = input("Please enter the URL for the OGC Processes API: ")

## List available processes

In [4]:
# Retrieve all processes that have been registered at the OGC Processes API endpoint
processes = process_service.get_processes()

for process in processes:
    print(f"{process.id} details: {process}\n")

# Available Process object attributes
# id, title, job_control_options, keywords, process_version, abstract, inputs, outputs

cwl_dag details: unity_sds_client.resources.Process(
    id="cwl_dag",
    process_version="1.0.0"
    title="Generic CWL Process",
    abstract="This process executes any CWL workflow.",
    keywords="None"
)



In [5]:
# Retrieve a specific DAG that you would like to execute a job for
process = process_service.get_process("cwl_dag")
print(process)

unity_sds_client.resources.Process(
    id="cwl_dag",
    process_version="1.0.0"
    title="Generic CWL Process",
    abstract="This process executes any CWL workflow.",
    keywords="None"
)


## Execute a process (i.e. create a job)

The following code defines the inputs/outputs needed to execute a job from the process defined above.

In [6]:
# Set the necessary workflow inputs
data = {
  "inputs": {
    "cwl_workflow": "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.cwl",    # Raw GitHub link to the CWL definition
    "cwl_args": "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.dev.yml",    # YAML file that contains CWL workflow arguments
    "request_instance_type": "r7i.xlarge",    # Specify the EC2 instance type used to execute the job
    "request_storage": "10Gi"    # Specify how much ephemeral storage is allocated to the job
  },
  "outputs": {
    "result": {
      "transmissionMode": "reference"    # Indicate the return of data is in the form of a link rather than a raw value
    }
  }
}

job = process.execute(data)
print(job)   # Print initial details about the job

unity_sds_client.resources.Job(
    id="22ccd35f-fd72-4ffd-acbb-3de9c4424b69",
    process="cwl_dag",
    status="accepted",
    inputs=None
)
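If you submit many jobs, it can help to build the request body with a small function instead of editing the dictionary by hand. The following is a minimal sketch: `build_execute_request` is a hypothetical helper, not part of Unity-Py, and the default instance type and storage values simply mirror the example above.

```python
import json


def build_execute_request(cwl_workflow: str, cwl_args: str,
                          instance_type: str = "r7i.xlarge",
                          storage: str = "10Gi") -> dict:
    """Assemble an execute request shaped like the dictionary used above."""
    # Both workflow inputs are expected to be URLs, so catch obvious mistakes early
    for name, url in (("cwl_workflow", cwl_workflow), ("cwl_args", cwl_args)):
        if not url.startswith(("http://", "https://")):
            raise ValueError(f"{name} must be an HTTP(S) URL, got {url!r}")
    return {
        "inputs": {
            "cwl_workflow": cwl_workflow,
            "cwl_args": cwl_args,
            "request_instance_type": instance_type,
            "request_storage": storage,
        },
        "outputs": {"result": {"transmissionMode": "reference"}},
    }


# Placeholder URLs, for illustration only
payload = build_execute_request(
    "https://example.com/workflows/preprocess.cwl",
    "https://example.com/workflows/preprocess.dev.yml",
)
print(json.dumps(payload, indent=2))
```

The resulting dictionary can be passed to `process.execute(...)` in the same way as `data` above.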


## Monitor the job

In [7]:
# Retrieve the status and poll until the job is complete
status = job.get_status().status
while status in [JobStatus.ACCEPTED, JobStatus.RUNNING]:
    print(f"Job {job.id} is not complete; Currently in state: {status}")
    time.sleep(5)    # Wait 5 seconds and query status again
    status = job.get_status().status

print(f"Job {job.id} completed with status {status}")

Job 22ccd35f-fd72-4ffd-acbb-3de9c4424b69 is not complete; Currently in state: JobStatus.RUNNING
Job 22ccd35f-fd72-4ffd-acbb-3de9c4424b69 is not complete; Currently in state: JobStatus.RUNNING
Job 22ccd35f-fd72-4ffd-acbb-3de9c4424b69 is not complete; Currently in state: JobStatus.RUNNING
Job 22ccd35f-fd72-4ffd-acbb-3de9c4424b69 is not complete; Currently in state: JobStatus.RUNNING
Job 22ccd35f-fd72-4ffd-acbb-3de9c4424b69 is not complete; Currently in state: JobStatus.RUNNING
Job 22ccd35f-fd72-4ffd-acbb-3de9c4424b69 is not complete; Currently in state: JobStatus.RUNNING
Job 22ccd35f-fd72-4ffd-acbb-3de9c4424b69 is not complete; Currently in state: JobStatus.RUNNING
Job 22ccd35f-fd72-4ffd-acbb-3de9c4424b69 is not complete; Currently in state: JobStatus.RUNNING
Job 22ccd35f-fd72-4ffd-acbb-3de9c4424b69 is not complete; Currently in state: JobStatus.RUNNING
Job 22ccd35f-fd72-4ffd-acbb-3de9c4424b69 is not complete; Currently in state: JobStatus.RUNNING
Job 22ccd35f-fd72-4ffd-acbb-3de9c4424b69
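The loop above polls indefinitely, which can leave a notebook hanging if a job never leaves the running state. A hedged alternative is to poll with an overall timeout. The sketch below is library-agnostic: `wait_for_job` is a hypothetical helper that takes any callable returning the current status as a string, and the terminal status names follow the OGC status codes (`successful`, `failed`, `dismissed`).

```python
import time

# Status values after which a job will no longer change state
TERMINAL_STATUSES = {"successful", "failed", "dismissed"}


def wait_for_job(get_status, interval=5.0, timeout=3600.0,
                 sleep=time.sleep, clock=time.monotonic):
    """Poll get_status() until a terminal status is seen or the timeout expires."""
    deadline = clock() + timeout
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"Job still '{status}' after {timeout} seconds")
        sleep(interval)  # wait before querying the status again


# Simulated status sequence standing in for real API calls
simulated = iter(["accepted", "running", "running", "successful"])
final = wait_for_job(lambda: next(simulated), interval=0.01)
print(f"Job finished with status: {final}")
```

With Unity-Py, `get_status` would be a small function wrapping `job.get_status().status`, with the terminal set adapted to the corresponding `JobStatus` members.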

## Display all jobs for a process

In [8]:
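# Retrieve the jobs known to the process service
jobs = process_service.get_jobs()

print(f"Total jobs: {len(jobs)}")
print("Job details: ")
for listed_job in jobs:    # Use a separate name so the `job` created above is not overwritten
    print(listed_job)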

Total jobs: 1
Job details: 
unity_sds_client.resources.Job(
    id="22ccd35f-fd72-4ffd-acbb-3de9c4424b69",
    process="cwl_dag",
    status="successful",
    inputs=None
)
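Because `get_jobs()` is called above without a process argument, the listing may include jobs from more than one process once several are registered. The sketch below shows one way to narrow a job listing to a single process and summarize statuses. It works on plain dictionaries using the `processID`, `jobID`, and `status` fields from the OGC job status schema; the sample records are made up, and the attribute names on Unity-Py `Job` objects may differ.

```python
from collections import Counter


def jobs_for_process(jobs, process_id):
    """Keep only the job records that belong to the given process."""
    return [job for job in jobs if job.get("processID") == process_id]


def status_summary(jobs):
    """Count how many jobs are in each status."""
    return Counter(job["status"] for job in jobs)


# Made-up job records, for illustration only
sample_jobs = [
    {"jobID": "a1", "processID": "cwl_dag", "status": "successful"},
    {"jobID": "b2", "processID": "cwl_dag", "status": "running"},
    {"jobID": "c3", "processID": "other_dag", "status": "failed"},
]

selected = jobs_for_process(sample_jobs, "cwl_dag")
print(f"Jobs for cwl_dag: {[job['jobID'] for job in selected]}")
print(f"Status counts: {dict(status_summary(selected))}")
```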


## Delete a job

This deletes the job that was created and executed above, which is stored in the `job` variable. You can also run this while a job is still running to stop its execution and delete it.

In [9]:
status = job.dismiss()
print(status)

unity_sds_client.resources.Job(
    id="22ccd35f-fd72-4ffd-acbb-3de9c4424b69",
    process="cwl_dag",
    status="dismissed",
    inputs=None
)


In [10]:
# Display all jobs
jobs = process_service.get_jobs()
print(f"Total jobs: {len(jobs)}")

Total jobs: 0


## Registering a process (optional)

You must register a process before it can be executed. To register a new science algorithm, encode it as an Airflow DAG and submit a registration request. The DAG (a Python program) must be checked into the GitHub repository that was configured during the SPS deployment. In other words, the DAG author checks the latest version of the code into the specified folder on GitHub and then makes an HTTP request to the OGC Processes API to register that process for execution.

Let's consider the following DAG: https://github.com/unity-sds/unity-sps/blob/develop/airflow/dags/cwl_dag.py. We will assume that your SPS deployment has been configured to monitor the GitHub repository https://github.com/unity-sds/unity-sps at the path "airflow/dags" in the branch "main".

In order to register a process you will need to define a [process description](https://docs.ogc.org/is/18-062r2/18-062r2.html#toc37) which provides metadata that is used to register the process and execute jobs for that process.

### Define the process description as a Python dictionary

In [11]:
cwl_dag_process_description = {
    "executionUnit": {
        "image": "ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.5.1",
        "type": "docker"
    },
    "processDescription": {
        "description": "This process executes any CWL workflow.",
        "id": "cwl_dag",
        "inputs": {
            "cwl_args": {
                "description": "The URL of the CWL workflow's YAML parameters file",
                "maxOccurs": 1,
                "minOccurs": 1,
                "schema": {
                    "format": "uri",
                    "type": "string"
                },
                "title": "CWL Workflow Parameters URL"
            },
            "cwl_workflow": {
                "description": "The URL of the CWL workflow",
                "maxOccurs": 1,
                "minOccurs": 1,
                "schema": {
                    "format": "uri",
                    "type": "string"
                },
                "title": "CWL Workflow URL"
            },
            "request_instance_type": {
                "description": "The specific EC2 instance type requested for the job",
                "maxOccurs": 1,
                "minOccurs": 1,
                "schema": {
                    "type": "string"
                },
                "title": "Requested EC2 Type"
            },
            "request_storage": {
                "description": "The amount of storage requested for the job",
                "maxOccurs": 1,
                "minOccurs": 1,
                "schema": {
                    "type": "string"
                },
                "title": "Requested Storage"
            }
        },
        "jobControlOptions": [
            "async-execute"
        ],
        "outputs": {
            "result": {
                "description": "The result of the SBG Preprocess Workflow execution",
                "schema": {
                    "$ref": "some-ref"
                },
                "title": "Process Result"
            }
        },
        "title": "Generic CWL Process",
        "version": "1.0.0"
    }
}
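Every input in the description above declares `"minOccurs": 1`, meaning it must be supplied when a job is executed. As a sketch of how that metadata can be used, the hypothetical helper below compares an execute request against a process description and reports which required inputs are missing. This is a client-side convenience check only; how the server validates requests is not covered here. The description in the example is a trimmed-down stand-in for the full dictionary.

```python
def missing_inputs(process_description: dict, execute_request: dict) -> list:
    """Return the names of required inputs absent from an execute request."""
    declared = process_description["processDescription"]["inputs"]
    supplied = execute_request.get("inputs", {})
    return sorted(
        name
        for name, spec in declared.items()
        # An input is treated as required when minOccurs is at least 1
        # (the OGC default when the field is omitted)
        if spec.get("minOccurs", 1) >= 1 and name not in supplied
    )


# Trimmed-down stand-in for the process description above
description = {
    "processDescription": {
        "inputs": {
            "cwl_workflow": {"minOccurs": 1, "maxOccurs": 1},
            "cwl_args": {"minOccurs": 1, "maxOccurs": 1},
            "request_instance_type": {"minOccurs": 1, "maxOccurs": 1},
            "request_storage": {"minOccurs": 1, "maxOccurs": 1},
        }
    }
}

# Placeholder request that omits two required inputs
incomplete_request = {
    "inputs": {
        "cwl_workflow": "https://example.com/workflow.cwl",
        "request_storage": "10Gi",
    }
}

print(missing_inputs(description, incomplete_request))
# → ['cwl_args', 'request_instance_type']
```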
    

### Register the process

Note this will throw an error if you try to register a process that has already been registered.

In [None]:
# Set up the Unity venue
unity = Unity(UnityEnvironments.DEV)    # Enter Unity username and password
unity.set_venue_id("")   # Leave blank for now
process_service = unity.client(UnityServices.PROCESS_SERVICE)
process_service.endpoint = input("Please enter the URL for the OGC Processes API: ")

# Register the process
response = process_service.deploy_process(cwl_dag_process_description)
print(f"\nRegistered process response: {response}")

# List all processes
processes = process_service.get_processes()
print("\nAll registered processes: ")
for process in processes:
    print(process)

## Deregister a process (optional)

When you deregister a process, it is removed from the SPS deployment. The `Unity-Py` library does not currently provide a method for deregistering processes, so you will need to use the `requests` library to perform a `DELETE` request.

In [None]:
# Set up the Unity venue
unity = Unity(UnityEnvironments.DEV)    # Enter Unity username and password
unity.set_venue_id("")   # Leave blank for now
process_service = unity.client(UnityServices.PROCESS_SERVICE)
process_service.endpoint = input("Please enter the URL for the OGC Processes API: ")

# List all processes
processes = process_service.get_processes()
print("All registered processes: ")
for process in processes:
    print(process)

In [15]:
# Grab the process "id" from above and submit a DELETE request
endpoint = process_service.endpoint.rstrip("/")    # Avoid a double slash if the endpoint ends with "/"
response = requests.delete(f"{endpoint}/processes/cwl_dag")   # The process id from the details above is the last element in the URL
print(f"Deregistered process response: {response}")    # 204 indicates success

Deregistered process response: <Response [204]>
