# PYTHON SDK

In [1]:
import couler.argo as argo
from couler.core.templates.volume import Volume, VolumeMount
from couler.core.templates.volume_claim import VolumeClaimTemplate
from onepanel.couler import Submitter

## Workflow Parameters

In [2]:
# Volumes
# Mount point of the shared volume inside the step containers; the steps also
# exchange a `task.txt` file in this directory.
# Must start with '/' and must not end with '/'.
path = '/mnt/temp'
# Name of the volume claim; must contain lowercase alphanumeric characters only.
volume_claim_name = 'cvat'
# Requested size of the volume claim.
size = '20Gi'

# Workspace name. The last step rebuilds the workspace host name from this
# value (lower-cased, whitespace replaced by '-').
workspace_name = 'Auto CVAT SDK'
# Machine type (node pool) the workspace runs on.
workspace_node_pool = 'Standard_D4s_v3'
# Disk size in MB of the volume that stores raw input data and custom
# pre-annotation models.
share_volume_size = '20480'
# Disk size in MB of the volume that stores CVAT data; make this at least 50%
# larger than the share volume size.
data_volume_size = '30720'
# UID of the workspace template to launch.
workspace_template_uid = 'cvat'

# Destination directory for the downloaded data inside the CVAT workspace.
# NOTE(review): described as relative to `/share`, yet the value itself starts
# with 'share/' - confirm which base directory the file syncer applies.
workspace_data_path = 'share/data'
# Source directory (prefix) of the data in object storage.
object_storage_data_path = 'rush/data/potholes/'

# Name of the CVAT task to create.
cvat_task_name = 'CVAT Task'
# CVAT task labels as a JSON array; the task-creation script parses this
# string with json.loads.
cvat_task_labels = '''[{"name": "person"},{"name": "car"}]'''

## Launching a CVAT Workspace

In [3]:
def _launch_workspace():
    """Create an Onepanel workspace, wait until it runs, and record its UID.

    This function is not meant to be called locally: it is passed as the
    ``source`` of the ``launch-workspace`` step.  The quoted
    ``inputs.parameters.para-launch-workspace-N`` placeholders stand for the
    N-th entry of that step's ``args`` list and must be kept verbatim.
    NOTE(review): presumably only the function body is shipped to the step
    container, which is why every import lives inside the body - keep it
    that way and do not rely on module-level names here.

    Side effects:
        * installs ``onepanel-sdk==0.21.0`` with pip,
        * creates a workspace through ``WorkspaceServiceApi``,
        * writes the workspace UID to ``<output dir>/task.txt``.

    Raises:
        ApiException: re-raised after being printed, so that the step fails
            instead of finishing "successfully" without writing ``task.txt``.
    """
    import os
    import subprocess
    import sys
    from time import sleep

    # Install onepanel-sdk
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'onepanel-sdk==0.21.0'])

    import onepanel.core.api
    from onepanel.core.api.rest import ApiException
    from onepanel.core.api.models import Parameter

    # Get mounted service account token to use as API Key
    with open('/var/run/secrets/kubernetes.io/serviceaccount/token') as f:
        token = f.read()

    # Configure API key authorization: Bearer
    configuration = onepanel.core.api.Configuration(
        host=os.getenv('ONEPANEL_API_URL'),
        api_key={
            'authorization': token
        }
    )
    configuration.api_key_prefix['authorization'] = 'Bearer'

    # Enter a context with an instance of the API client
    with onepanel.core.api.ApiClient(configuration) as api_client:
        # Create an instance of the API class
        api_instance = onepanel.core.api.WorkspaceServiceApi(api_client)
        namespace = os.getenv('ONEPANEL_RESOURCE_NAMESPACE')
        params = [
            # Workspace name
            Parameter(name='sys-name', value='{{inputs.parameters.para-launch-workspace-0}}'),
            # Machine type
            Parameter(name='sys-node-pool', value='{{inputs.parameters.para-launch-workspace-1}}'),
            # Data volume size
            Parameter(name='sys-data-volume-size', value='{{inputs.parameters.para-launch-workspace-2}}'),
            # Shared volume size
            Parameter(name='sys-share-volume-size', value='{{inputs.parameters.para-launch-workspace-3}}'),
        ]

        body = onepanel.core.api.CreateWorkspaceBody(
            parameters=params,
            workspace_template_uid='{{inputs.parameters.para-launch-workspace-4}}',  # Workspace template UID
        )
        try:
            workspace = api_instance.create_workspace(namespace, body)
            uid = workspace.uid
            print('Waiting for workspace to launch...\n')
            # Sleep *before* each poll: no immediate re-fetch right after the
            # create call and no wasted wait once 'Running' has been observed.
            # NOTE(review): there is no timeout and no check for failure
            # phases; a workspace that never reaches 'Running' blocks here.
            while workspace.status.phase != 'Running':
                sleep(15)
                workspace = api_instance.get_workspace(namespace, uid)
            print('Workspace is running at %s.\n' % workspace.url)
        except ApiException as e:
            print("Exception when calling WorkspaceServiceApi->create_workspace: %s\n" % e)
            # Propagate the failure: later steps need the task.txt written below.
            raise

        output_dir = '{{inputs.parameters.para-launch-workspace-5}}'

        # Create output directories
        os.makedirs(output_dir, exist_ok=True)

        # Write the data to the file so we can use it as a parameter in the workflow
        with open(output_dir + "/task.txt", "w") as f:
            f.write(uid)


def launch_workspace(path, volume_claim_name, size):
    """Register the ``launch-workspace`` step and the volume claim it mounts.

    Args:
        path: Mount point of the volume inside the step container.  It is
            also declared as the step's local output artifact and passed to
            the script as its last argument (the directory of ``task.txt``).
        volume_claim_name: Name used for both the volume claim template and
            the volume mount.
        size: Requested size of the volume claim (e.g. ``'20Gi'``).

    The workspace settings (``workspace_name``, ``workspace_node_pool``,
    ``data_volume_size``, ``share_volume_size``, ``workspace_template_uid``)
    are read from module scope.
    """
    output = argo.create_local_artifact(
        path=path,
    )
    pvc = VolumeClaimTemplate(volume_claim_name, size=size)
    volume_mount = VolumeMount(volume_claim_name, path)
    argo.create_workflow_volume(pvc)  # VolumeClaims must be registered into the main workflow template

    argo.run_script(
        step_name="launch-workspace",
        image="python:3.7-alpine",
        command=['python', '-u'],
        output=output,
        # The position in this list is the N of the
        # para-launch-workspace-N placeholders used by _launch_workspace,
        # so the order must not change.
        args=[workspace_name,
              workspace_node_pool,
              data_volume_size,
              share_volume_size,
              workspace_template_uid,
              path],
        volume_mounts=[volume_mount],
        source=_launch_workspace
    )

## Downloading Data from Object Storage

In [4]:
def _download_data():
    """Ask the workspace's file syncer to download data from object storage.

    This function is passed as the ``source`` of the ``download-data`` step.
    The quoted ``inputs.parameters.para-download-data-N`` placeholders stand
    for the N-th entry of that step's ``args`` list and must be kept
    verbatim.  NOTE(review): presumably only the body is shipped to the step
    container, so imports must stay inside the function.

    The workspace UID is read from ``<output dir>/task.txt`` and used to
    build the in-cluster host name of the workspace (port 8888).

    Raises:
        RuntimeError: if the sync endpoint answers with a non-2xx status.
        ValueError: if the response body is not valid JSON.
    """
    import os
    import http.client
    import json

    # Get mounted service account token to use as API Key
    with open('/var/run/secrets/kubernetes.io/serviceaccount/token') as f:
        token = f.read()

    output_dir = '{{inputs.parameters.para-download-data-0}}'
    # The file is only read here, so read-only access is sufficient.
    with open(output_dir + "/task.txt", "r") as f:
        workspace_uid = f.read()

    endpoint = workspace_uid + '.' + os.getenv('ONEPANEL_RESOURCE_NAMESPACE') + '.svc.cluster.local:8888'
    headers = {
        'onepanel-auth-token': token,
        'onepanel-username': 'default',
        'Content-Type': 'application/json',
    }

    payload = {
        'action': 'download',
        'path': '{{inputs.parameters.para-download-data-1}}',
        'delete': False,
        'prefix': '{{inputs.parameters.para-download-data-2}}'
    }

    conn = http.client.HTTPConnection(endpoint)
    try:
        conn.request("POST", "/sys/filesyncer/api/sync", json.dumps(payload), headers)
        res = conn.getresponse()
        data = res.read()
    finally:
        conn.close()

    # Fail the step on a rejected request instead of printing 'done' anyway.
    if not 200 <= res.status < 300:
        raise RuntimeError(
            'File sync request failed with HTTP %s: %s'
            % (res.status, data.decode('utf-8', 'replace'))
        )

    # The parsed reply is not used; parsing only verifies that it is JSON.
    json.loads(data)

    print('done')


def download_data():
    """Register the ``download-data`` step, which runs ``_download_data``.

    Uses the module-level ``path``, ``workspace_data_path`` and
    ``object_storage_data_path`` settings.  The order of the script
    arguments matches the numbered para-download-data-N placeholders in
    ``_download_data`` and must be preserved.

    NOTE(review): unlike ``launch_workspace`` this step passes no
    ``volume_mounts``; confirm how ``task.txt`` under ``path`` reaches this
    step's container.
    """
    artifact = argo.create_local_artifact(path=path)
    script_args = [path, workspace_data_path, object_storage_data_path]

    argo.run_script(
        source=_download_data,
        step_name="download-data",
        image="python:3.7-alpine",
        command=['python', '-u'],
        args=script_args,
        output=artifact,
    )

## Creating a CVAT Task

In [5]:
def _create_cvat_task():
    """Create a CVAT task in the workspace and record the new task's id.

    This function is passed as the ``source`` of the ``create-cvat-task``
    step.  The quoted ``inputs.parameters.para-create-cvat-task-N``
    placeholders stand for the N-th entry of that step's ``args`` list and
    must be kept verbatim.  NOTE(review): presumably only the body is shipped
    to the step container, so imports must stay inside the function.

    The workspace UID is read from ``task.txt`` in the directory given by
    argument 0; the id of the created task is written to ``task.txt`` in the
    directory given by argument 3.  ``create_cvat_task`` passes the same
    directory for both, so the UID is overwritten by the task id.

    Raises:
        RuntimeError: if CVAT answers the create request with a non-2xx
            status (previously this surfaced as an opaque ``KeyError: 'id'``).
    """
    import os
    import http.client
    import json

    # Get mounted service account token to use as API Key
    with open('/var/run/secrets/kubernetes.io/serviceaccount/token') as f:
        token = f.read()

    output_dir = '{{inputs.parameters.para-create-cvat-task-0}}'
    # The file is only read here, so read-only access is sufficient.
    with open(output_dir + "/task.txt", "r") as f:
        workspace_uid = f.read()

    endpoint = workspace_uid + '.' + os.getenv('ONEPANEL_RESOURCE_NAMESPACE') + '.svc.cluster.local:8080'
    headers = {
        'onepanel-auth-token': token,
        'onepanel-username': 'default',
        'Content-Type': 'application/json',
    }

    task = {}
    task['name'] = '{{inputs.parameters.para-create-cvat-task-1}}'
    # The labels arrive as JSON text pasted between single quotes.
    # NOTE(review): a label containing a single quote or a backslash would
    # break this literal - confirm labels never contain them.
    task['labels'] = json.loads('{{inputs.parameters.para-create-cvat-task-2}}')

    conn = http.client.HTTPConnection(endpoint)
    try:
        conn.request("POST", "/api/v1/tasks", json.dumps(task), headers)
        res = conn.getresponse()
        data = res.read()
    finally:
        conn.close()

    if not 200 <= res.status < 300:
        raise RuntimeError(
            'CVAT task creation failed with HTTP %s: %s'
            % (res.status, data.decode('utf-8', 'replace'))
        )

    data_json = json.loads(data)

    output_dir = '{{inputs.parameters.para-create-cvat-task-3}}'

    # Write the data to the file so we can use it as a parameter in the workflow
    with open(output_dir + "/task.txt", "w") as f:
        f.write(str(data_json['id']))

    print('CVAT Task created.\n')


def create_cvat_task():
    """Register the ``create-cvat-task`` step, which runs ``_create_cvat_task``.

    Uses the module-level ``path``, ``cvat_task_name`` and
    ``cvat_task_labels`` settings.  ``path`` is passed twice on purpose: the
    script takes it once as the directory to read ``task.txt`` from and once
    as the directory to write it to.  The argument order matches the numbered
    para-create-cvat-task-N placeholders in the script and must be preserved.
    """
    artifact = argo.create_local_artifact(path=path)
    script_args = [
        path,              # directory holding the workspace UID file
        cvat_task_name,
        cvat_task_labels,
        path,              # directory receiving the task id file
    ]

    argo.run_script(
        source=_create_cvat_task,
        step_name="create-cvat-task",
        image="python:3.7-alpine",
        command=['python', '-u'],
        args=script_args,
        output=artifact,
    )

## Uploading Data into CVAT Task

In [6]:
def _set_cvat_task_data():
    """Attach the downloaded files to the CVAT task and wait for processing.

    This function is passed as the ``source`` of the ``set-cvat-task-data``
    step.  The quoted ``inputs.parameters.para-set-cvat-task-data-N``
    placeholders stand for the N-th entry of that step's ``args`` list and
    must be kept verbatim.  NOTE(review): presumably only the body is shipped
    to the step container, so imports and helpers must stay inside the
    function.

    Steps performed:
        1. Poll the file syncer (port 8888) until it reports a ``LastDownload``.
        2. Read the CVAT task id from ``<output dir>/task.txt``.
        3. POST the server-side directory to the task's data endpoint
           (port 8080).
        4. Poll the task status until it is ``Finished``.

    NOTE(review): neither polling loop has a timeout.

    Raises:
        RuntimeError: if the data upload request is rejected (non-2xx) or
            CVAT reports the state ``Failed`` for the task.
    """
    import os
    import http.client
    import json
    import urllib.parse
    import time

    def fetch_json(connection, url, request_headers):
        """GET ``url`` and return the decoded JSON body, or None if it is not JSON."""
        connection.request('GET', url, "", request_headers)
        raw = connection.getresponse().read()
        try:
            return json.loads(raw)
        except ValueError:
            # The service may not be answering with JSON yet; the caller retries.
            return None

    # Get mounted service account token to use as API Key
    with open('/var/run/secrets/kubernetes.io/serviceaccount/token') as f:
        token = f.read()

    print('Processing files...\n')

    # The host name is rebuilt from the workspace *name* (lower-cased,
    # whitespace replaced by '-').
    # NOTE(review): assumes this equals the workspace UID - TODO confirm.
    workspace_uid = "-".join('{{inputs.parameters.para-set-cvat-task-data-0}}'.lower().split())
    namespace = os.getenv('ONEPANEL_RESOURCE_NAMESPACE')

    json_headers = {
        'onepanel-auth-token': token,
        'onepanel-username': 'default',
        'Content-Type': 'application/json',
    }

    # 1. Wait until the file syncer reports a completed download.
    syncer = http.client.HTTPConnection(workspace_uid + '.' + namespace + '.svc.cluster.local:8888')
    try:
        while True:
            time.sleep(5)
            download_status = fetch_json(syncer, '/sys/filesyncer/api/status', json_headers)
            if isinstance(download_status, dict) and download_status.get('LastDownload') is not None:
                break
    finally:
        syncer.close()

    cvat = http.client.HTTPConnection(workspace_uid + '.' + namespace + '.svc.cluster.local:8080')
    try:
        # Listing the tasks only verifies that the CVAT API answers with
        # JSON; the result itself is not used.
        cvat.request("GET", "/api/v1/tasks", headers=json_headers)
        json.loads(cvat.getresponse().read())

        # 2. Task id written by the create-cvat-task step.
        output_dir = '{{inputs.parameters.para-set-cvat-task-data-1}}'
        with open(output_dir + "/task.txt", "r") as f:
            task_id = f.read()

        data_url = f'/api/v1/tasks/{task_id}/data'

        # Normalise the server-side directory to '/<dir>/' ('/' when empty).
        file_path = '/data/'
        file_path = file_path.strip('/')
        file_path = '/' + file_path + '/' if file_path else '/'

        # NOTE(review): CVAT's documented field name appears to be
        # 'use_zip_chunks' (underscores); the hyphenated key is kept as-is
        # because changing it would alter what the server receives - confirm.
        params = urllib.parse.urlencode({
            'image_quality': 70,
            'use-zip-chunks': True,
            'server_files[0]': file_path
        })

        form_headers = {
            'onepanel-auth-token': token,
            'onepanel-username': 'default',
            'Content-type': 'application/x-www-form-urlencoded',
        }

        # Sleep for a little bit to make sure cvat finishes any processing
        time.sleep(60)

        # 3. Attach the files to the task.
        cvat.request('POST', data_url, params, form_headers)
        res = cvat.getresponse()
        data = res.read()
        if not 200 <= res.status < 300:
            raise RuntimeError(
                'CVAT rejected the task data with HTTP %s: %s'
                % (res.status, data.decode('utf-8', 'replace'))
            )

        # 4. Wait until CVAT has processed the files.
        status_check_url = f'/api/v1/tasks/{task_id}/status?scheme=json'
        while True:
            time.sleep(5)
            status = fetch_json(cvat, status_check_url, form_headers)
            state = status.get('state') if isinstance(status, dict) else None
            if state == 'Finished':
                break
            if state == 'Failed':
                # Without this check a failed import would be polled forever.
                raise RuntimeError(
                    'CVAT failed to process the task data: %s' % status.get('message')
                )
    finally:
        cvat.close()

    print('Finished processing files\n')


def set_cvat_task_data():
    """Register the ``set-cvat-task-data`` step, which runs ``_set_cvat_task_data``.

    Passes the module-level ``workspace_name`` and ``path`` to the script, in
    that order; the order matches the numbered para-set-cvat-task-data-N
    placeholders in the script and must be preserved.  No output artifact is
    declared for this step.
    """
    script_args = [workspace_name, path]

    argo.run_script(
        source=_set_cvat_task_data,
        step_name="set-cvat-task-data",
        image="python:3.7-alpine",
        command=['python', '-u'],
        args=script_args,
    )

In [7]:
def dag():
    """Declare the four workflow steps as a linear dependency chain.

    Order: launch-workspace -> download-data -> create-cvat-task ->
    set-cvat-task-data.  Each ``dependencies`` entry is the ``step_name`` that
    the preceding step function passes to ``argo.run_script``.

    NOTE(review): each step function is wrapped in a lambda; couler may rely
    on that call shape when it registers steps, so confirm before replacing
    the lambdas with direct function references.
    """
    # First step: no upstream dependency.
    argo.set_dependencies(lambda: launch_workspace(path, volume_claim_name, size), dependencies=None)
    argo.set_dependencies(lambda: download_data(), dependencies=['launch-workspace'])
    argo.set_dependencies(lambda: create_cvat_task(), dependencies=['download-data'])
    argo.set_dependencies(lambda: set_cvat_task_data(), dependencies=['create-cvat-task'])

In [8]:
# Required to clear couler's global variables so that the DAG can be
# generated multiple times (e.g. when this cell is re-run).
argo._cleanup()
# Generate the DAG by registering every step and its dependencies.
dag()

In [9]:
# Submitter (from onepanel.couler) that is handed to argo.run below.
# NOTE(review): presumably 'auto-cvat-noks' is the name the workflow is
# registered under in Onepanel - confirm.
submitter = Submitter(workflow_name='auto-cvat-noks')

INFO:root:Onepanel configuration detected
INFO:root:Initialized


In [10]:
# Execute the workflow generated by dag() through the submitter created above.
argo.run(submitter)

INFO:root:Workflow Template updated
INFO:root:Workflow Executed
