# Lab 4 (inference): Running Nextflow pipelines with HealthOmics Workflows

In this notebook you will learn how to create, run, and debug Nextflow-based pipelines that process data from HealthOmics Storage and Amazon S3 using HealthOmics Workflows.

## Prerequisites
### Python requirements
* Python >= 3.8
* Packages:
  * boto3 >= 1.26.19
  * botocore >= 1.29.19

### AWS requirements

#### AWS CLI
You will need the AWS CLI installed and configured in your environment. Supported AWS CLI versions are:

* AWS CLI v2 >= 2.9.3 (Recommended)
* AWS CLI v1 >= 1.27.19

#### Output buckets
You will need a bucket **in the same region** you are running this tutorial in to store workflow outputs.

#### Input data
If you modify any of the workflows to retrieve input data (e.g. references or raw reads), that data **MUST reside in the same region**. AWS HealthOmics does not support cross-region read or write at this time.

## Step 1: Setup and Configuration

First, let's get our AWS account information and set up variables we'll use throughout the notebook.

In [2]:
import json
from datetime import datetime
import glob
import io
import os
from pprint import pprint
from textwrap import dedent
from time import sleep
from urllib.parse import urlparse
from zipfile import ZipFile, ZIP_DEFLATED

import boto3
import botocore.exceptions

# Get AWS account information
sts_client = boto3.client('sts')
account_id = sts_client.get_caller_identity()['Account']
region = boto3.Session().region_name

# Define S3 bucket and folder names
S3_BUCKET = 'nimbustx-boltz2'
LAB1_FOLDER = 'lab1-progen'
LAB2_FOLDER = 'lab2-amplify'
LAB3_FOLDER = 'lab3-esmfold'
LAB4_FOLDER = 'lab4-boltz'

print(f"Account ID: {account_id}")
print(f"Region: {region}")
print(f"S3 Bucket: {S3_BUCKET}")

Account ID: 648913559821
Region: us-east-1
S3 Bucket: nimbustx-boltz2
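
Because HealthOmics does not read or write across regions, it can be worth confirming where the bucket lives before going further. The following is a minimal sketch and not part of the original lab: `bucket_region` and `assert_same_region` are hypothetical helper names. The only AWS call used is the S3 `get_bucket_location` API, which reports `us-east-1` buckets with an empty `LocationConstraint`.

```python
def bucket_region(s3_client, bucket):
    """Return the region a bucket lives in, given a boto3 S3 client."""
    location = s3_client.get_bucket_location(Bucket=bucket).get("LocationConstraint")
    # S3 reports us-east-1 buckets with a null LocationConstraint,
    # and some older eu-west-1 buckets as "EU".
    if location is None:
        return "us-east-1"
    if location == "EU":
        return "eu-west-1"
    return location


def assert_same_region(s3_client, bucket, expected_region):
    """Raise ValueError if the bucket is not in the expected region (hypothetical helper)."""
    actual = bucket_region(s3_client, bucket)
    if actual != expected_region:
        raise ValueError(
            f"bucket {bucket} is in {actual}, but this notebook runs in {expected_region}"
        )
    return actual
```

With the variables from the setup cell, this could be called as `assert_same_region(boto3.client('s3'), S3_BUCKET, region)`.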


In [3]:
# Create local data folders for lab 4 
!mkdir -p data/$LAB4_FOLDER

### Upload Model file to S3

In [None]:
!aws s3 sync data/{LAB4_FOLDER}/model s3://{S3_BUCKET}/{LAB4_FOLDER}/model

### Modify Nextflow Configuration

In [None]:
filename = "boltz/definition/nextflow.config"

try:
    with open(filename, 'r') as file:
        file_content = file.read()

    modified_content = file_content.replace('ModelS3Location', f"s3://{S3_BUCKET}/{LAB4_FOLDER}/model/")
    modified_content = modified_content.replace('ECRImageURI', f"{account_id}.dkr.ecr.{region}.amazonaws.com/boltz2:latest")

    with open(filename, 'w') as file:
        file.write(modified_content)
except FileNotFoundError:
    print(f"Error: File '{filename}' not found.")
except Exception as e:
    print(f"An error occurred: {e}")
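
Note that `str.replace` does nothing when the placeholder is absent, for example if this cell has already been run once against the same file. A minimal sketch of a stricter substitution follows. `fill_placeholders` is a hypothetical helper, not part of the lab code, and the sample config text in the usage note is made up for illustration.

```python
def fill_placeholders(text, replacements):
    """Replace every placeholder in `text`, failing loudly if one is missing."""
    missing = [key for key in replacements if key not in text]
    if missing:
        # Typically means the file was already modified or the template changed.
        raise KeyError(f"placeholders not found: {missing}")
    for key, value in replacements.items():
        text = text.replace(key, value)
    return text
```

For example, `fill_placeholders("container = 'ECRImageURI'", {"ECRImageURI": "my-image:latest"})` returns `"container = 'my-image:latest'"`, while a second call on that result raises `KeyError`.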

### Find IAM Role to run job 

In [None]:
from utils.iam_helper import IamHelper

# IAM role for the Boltz2 workflow run
iam_helper = IamHelper()
job_role_arn = iam_helper.find_role_arn_by_pattern('OmicsWorkflowRole')

print(f'Job role ARN: {job_role_arn}') 

## Step 2: Using AWS HealthOmics Workflows - the basics
AWS HealthOmics Workflows allows you to perform bioinformatics compute - like genomics secondary analysis - at scale on AWS. These compute workloads are defined using workflow languages like WDL and Nextflow that specify multiple compute tasks and their input and output dependencies.

To run this workflow, we'll start by creating a client for the `omics` service.

In [None]:
omics = boto3.client('omics')

Now we need to bundle up the workflow as a zip file and call the `create_workflow` API for `omics`. We'll encapsulate these operations in a function for reuse later.

In [None]:
def create_workflow(
    workflow_root_dir, 
    parameters={"param_name":{"description": "param_desc"}}, 
    name=None, 
    description=None, 
    main=None):
    buffer = io.BytesIO()
    print("creating zip file:")
    with ZipFile(buffer, mode='w', compression=ZIP_DEFLATED) as zf:
        for file in glob.iglob(os.path.join(workflow_root_dir, '**/*'), recursive=True):
            if os.path.isfile(file):
                # store each file under its path relative to the workflow root
                arcname = os.path.relpath(file, workflow_root_dir)
                print(f".. adding: {file} -> {arcname}")
                zf.write(file, arcname=arcname)

    response = omics.create_workflow(
        name=name,
        description=description,
        definitionZip=buffer.getvalue(),  # this argument needs bytes
        main=main,
        parameterTemplate=parameters,
    )

    workflow_id = response['id']
    print(f"workflow {workflow_id} created, waiting for it to become ACTIVE")

    try:
        waiter = omics.get_waiter('workflow_active')
        waiter.wait(id=workflow_id)

        print(f"workflow {workflow_id} ready for use")
    except botocore.exceptions.WaiterError as e:
        print(f"workflow {workflow_id} FAILED:")
        print(e)

    workflow = omics.get_workflow(id=workflow_id)
    return workflow

There are a few things to notice:

* To avoid polluting the local filesystem the zip-file is created in-memory with a byte buffer. If your workflow has many files such that the resultant bundle is large, you should consider alternative means of creating the zip file.
* A `main.(ext)` file, where `ext` matches the type of the workflow (e.g. `wdl`, or `nf`) must be at the root level of the zip file. HealthOmics uses this file as the primary entry point for the workflow. This is relevant for more modular workflows that have multiple definition files. In the call below, we explicitly point to `main.nf`.
* The `definitionZip` argument takes a binary value and reads the byte buffer value directly.
* The `parameters` argument is a dictionary that is passed as the `parameterTemplate`. It maps each parameter name to its details, which for now are just a description of what the parameter is. Actual parameter values are provided when the workflow is "run" - more on this below.
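
The bundling points above can be sketched as a standalone helper. This is an illustrative variant under stated assumptions, not the lab's implementation: `zip_workflow_dir` is a hypothetical name, it walks the directory with `os.walk` instead of `glob`, and it checks locally that the entry point sits at the root of the archive before anything is sent to HealthOmics.

```python
import io
import os
from zipfile import ZipFile, ZIP_DEFLATED


def zip_workflow_dir(workflow_root_dir, main="main.nf"):
    """Zip a workflow directory in memory and return the archive as bytes."""
    buffer = io.BytesIO()
    names = []
    with ZipFile(buffer, mode="w", compression=ZIP_DEFLATED) as zf:
        for dirpath, _dirnames, filenames in os.walk(workflow_root_dir):
            for filename in sorted(filenames):
                path = os.path.join(dirpath, filename)
                # Archive names are relative to the workflow root, with "/" separators.
                arcname = os.path.relpath(path, workflow_root_dir).replace(os.sep, "/")
                zf.write(path, arcname=arcname)
                names.append(arcname)
    if main not in names:
        raise FileNotFoundError(f"{main} not found at the root of {workflow_root_dir}")
    return buffer.getvalue()
```

The returned bytes are what the `definitionZip` argument expects. Unlike the `glob` pattern used in `create_workflow`, `os.walk` also picks up dotfiles, which may or may not be wanted for a given workflow.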

We can now use this function to create a workflow in HealthOmics Workflows from the Nextflow definition in `boltz/definition`:

In [None]:
workflow = create_workflow(
    'boltz/definition', 
    parameters={"input_path": {"description": "Path to fasta or yaml input file."}},
    name="Boltz2",
    description="Sample Boltz2 workflow",
    main="main.nf"
)
pprint(workflow)

Now we can start a workflow run with some input data using the `start_run` API call.

Note the following:
* Here the parameter `input_path` is associated with an S3 URI. This is specific to this case. Workflow parameters will vary depending on the workflow definition they are associated with.

* We provide the ARN of the service role we looked up above. You can specify different roles as needed depending on what resources your workflow needs access to.

* We provide an `outputUri` with `start_run`. This is where the workflow will place **final** outputs as they are defined by the workflow definition (e.g. files that a Nextflow process publishes with `publishDir`, or values in the `workflow.output` block of a WDL workflow). All intermediate results are discarded once the workflow completes.

In the cell below, we're using `waiters` to check for when the run starts and completes. These will block the current execution thread.

It will take about **30 minutes** for this workflow to start (scale up resources), run, and stop (scale down resources). Because this workflow is simple, the time it spends in a `RUNNING` state is fairly short relative to the scale-up/down times. For more complex workflows, or ones that process large amounts of data, the `RUNNING` state will last much longer (e.g. several hours). In that case, check the run status asynchronously rather than blocking on a waiter.

In [None]:
!aws s3 cp boltz/input/example.fa s3://{S3_BUCKET}/{LAB4_FOLDER}/input/

In [None]:
input_uri = f"s3://{S3_BUCKET}/{LAB4_FOLDER}/input/example.fa"
output_uri = f"s3://{S3_BUCKET}/{LAB4_FOLDER}/results/"

run = omics.start_run(
    workflowId=workflow['id'],
    name="Sample workflow run",
    roleArn=job_role_arn,
    parameters={
        "input_path": input_uri
    },
    outputUri=output_uri,
)

print(f"running workflow {workflow['id']}, starting run {run['id']}")
try:
    waiter = omics.get_waiter('run_running')
    waiter.wait(id=run['id'], WaiterConfig={'Delay': 30, 'MaxAttempts': 60})

    print(f"run {run['id']} is running")

    waiter = omics.get_waiter('run_completed')
    waiter.wait(id=run['id'], WaiterConfig={'Delay': 60, 'MaxAttempts': 60})

    print(f"run {run['id']} completed")
except botocore.exceptions.WaiterError as e:
    print(e)
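
For longer runs, a non-blocking status check can be called from a later cell or a scheduled job instead of holding the kernel in a waiter. The following is a minimal sketch: `check_run` is a hypothetical helper name, and the set of terminal statuses is an assumption based on the run states HealthOmics reports (`COMPLETED`, `FAILED`, `CANCELLED`, `DELETED`).

```python
# Assumed terminal run states; a run in any other state is still in progress.
TERMINAL_STATUSES = frozenset({"COMPLETED", "FAILED", "CANCELLED", "DELETED"})


def check_run(omics_client, run_id):
    """Fetch the run once and summarize its state without blocking."""
    run = omics_client.get_run(id=run_id)
    status = run["status"]
    return {
        "status": status,
        "finished": status in TERMINAL_STATUSES,
        # statusMessage is typically only present when something went wrong.
        "message": run.get("statusMessage"),
    }
```

With the objects defined in this notebook, `check_run(omics, run['id'])` returns immediately, so it can be re-run whenever an update is needed.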

Once the run completes we can verify its status by either listing it:

In [None]:
[r for r in omics.list_runs()['items'] if r['id'] == run['id']]

or getting its full details:

In [None]:
omics.get_run(id=run['id'])
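
`list_runs` returns results one page at a time, so in an account with many runs the run of interest may not appear in the first response. A minimal sketch that searches every page with the boto3 `list_runs` paginator follows. `find_run` is a hypothetical helper name.

```python
def find_run(omics_client, run_id):
    """Search all pages of list_runs and return the matching item, or None."""
    paginator = omics_client.get_paginator("list_runs")
    for page in paginator.paginate():
        for item in page["items"]:
            if item["id"] == run_id:
                return item
    return None
```

In this notebook it could be called as `find_run(omics, run['id'])`.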

We can verify that the correct output was generated by listing the `outputUri` for the workflow run:

In [None]:
s3uri = urlparse(omics.get_run(id=run['id'])['outputUri'])
prefix = f"{s3uri.path[1:]}{run['id']}"
# 'Contents' is absent when no objects match the prefix, so default to an empty list
boto3.client('s3').list_objects_v2(
    Bucket=s3uri.netloc, 
    Prefix=prefix
).get('Contents', [])
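
The prefix arithmetic in the cell above only works when `outputUri` ends with a slash. A minimal sketch that normalizes the URI first is shown below. `split_s3_uri` and `run_output_prefix` are hypothetical helper names, and the layout `<outputUri>/<run id>/` is the same assumption the cell above makes.

```python
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split an s3:// URI into (bucket, key_prefix)."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an S3 URI: {uri}")
    return parsed.netloc, parsed.path.lstrip("/")


def run_output_prefix(output_uri, run_id):
    """Return (bucket, prefix) under which a run's outputs are assumed to be stored."""
    bucket, prefix = split_s3_uri(output_uri)
    # Make sure the base prefix ends with exactly one slash before appending the run id.
    if prefix and not prefix.endswith("/"):
        prefix += "/"
    # The trailing slash keeps run "123" from also matching run "1234".
    return bucket, f"{prefix}{run_id}/"
```

The resulting pair can be passed to `list_objects_v2` as `Bucket` and `Prefix`.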

Workflows typically have multiple tasks. We can list workflow tasks with:

In [None]:
tasks = omics.list_run_tasks(id=run['id'])
pprint(tasks['items'])

and get specific task details with:

In [None]:
task = omics.get_run_task(id=run['id'], taskId=tasks['items'][0]['taskId'])
pprint(task)

After running the cell above we should see that each task has an associated CloudWatch Logs LogStream. These capture any text generated by the workflow task that has been sent to either `STDOUT` or `STDERR`. These outputs are helpful for debugging any task failures and can be retrieved with:

In [None]:
events = boto3.client('logs').get_log_events(
    logGroupName="/aws/omics/WorkflowLog",
    logStreamName=f"run/{run['id']}/task/{task['taskId']}"
)
for event in events['events']:
    print(event['message'])
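
A single `get_log_events` call returns only as many events as fit in one response, so a long task log may come back truncated. A minimal sketch that reads the stream from the beginning and follows `nextForwardToken` until it stops changing is shown below. `iter_log_messages` is a hypothetical helper name.

```python
def iter_log_messages(logs_client, log_group, log_stream):
    """Yield every message in a log stream, oldest first, across all pages."""
    kwargs = {
        "logGroupName": log_group,
        "logStreamName": log_stream,
        "startFromHead": True,  # read from the start of the stream
    }
    token = None
    while True:
        if token is not None:
            kwargs["nextToken"] = token
        response = logs_client.get_log_events(**kwargs)
        for event in response["events"]:
            yield event["message"]
        next_token = response.get("nextForwardToken")
        # CloudWatch Logs returns the same token again once the end is reached.
        if next_token is None or next_token == token:
            break
        token = next_token
```

Using the names from the cell above, the messages could be printed with `for message in iter_log_messages(boto3.client('logs'), "/aws/omics/WorkflowLog", f"run/{run['id']}/task/{task['taskId']}"): print(message)`.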