# Submit Kubeflow Pipelines as `ScheduledWorkflow`s
This notebook shows examples of submitting Kubeflow [`ScheduledWorkflow`s](https://github.com/kubeflow/pipelines/blob/0.1.31/backend/src/crd/pkg/apis/scheduledworkflow/register.go) (SWFs) that run [Kubeflow Pipelines](https://www.kubeflow.org/docs/pipelines/) (KFPs).

It contains helper functions for fetching KFPs and wrapping them in SWFs, as well as [examples demonstrating these capabilities](#Examples).

## Wrapping a Kubeflow Pipeline in a `ScheduledWorkflow`
Conceptually, a "scheduled" Kubeflow Pipeline must take exactly one argument: the timestamp it was scheduled to run at. Any other parameters it normally accepts must be [partially-applied](https://en.wikipedia.org/wiki/Partial_application) away (i.e. assigned specific, concrete values), leaving just the timestamp parameter to be filled by the SWF machinery on each pipeline instantiation. 
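
The partial-application idea can be illustrated in plain Python. This is only a sketch and does not use the KFP SDK; `rollup_pipeline` and its `table` and `region` parameters are hypothetical stand-ins for a pipeline that normally takes extra arguments.

```python
from functools import partial

def rollup_pipeline(scheduled_time, table, region):
    """Hypothetical stand-in for a pipeline that takes extra parameters."""
    return "rollup of %s (%s) for %s" % (table, region, scheduled_time)

# Bind every parameter except the timestamp to a concrete value
scheduled_rollup = partial(rollup_pipeline, table="transactions", region="us")

# The scheduler now only has to supply the scheduled time
result = scheduled_rollup("2019-09-29T21:00:00Z")
print(result)
```

After binding, the only remaining degree of freedom is the timestamp, which is the shape a scheduled pipeline needs.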

The timestamp passed by the SWF controller to each KFP instance it spawns will generally be one of two types:
- a timestamp close to the current time (when the SWF is caught up to the present, and spawning KFPs according to a `crontab`-style schedule or fixed interval)
- a timestamp in the past (when the SWF is created with a "start" time in the past, and is back-filling KFP runs to catch up to the present)
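
The two kinds of timestamps can be modelled with a small generator. This is a conceptual sketch only, not the SWF controller's actual algorithm; the function name `scheduled_times` and the fixed "current time" are assumptions made for illustration.

```python
from datetime import datetime, timedelta, timezone

def scheduled_times(start, interval, now):
    """Yield every scheduled timestamp from `start` up to and including `now`."""
    t = start
    while t <= now:
        yield t
        t += interval

# Pretend "current time" and a "start" time three hours in the past
now = datetime(2019, 9, 29, 21, 0, 30, tzinfo=timezone.utc)
start = datetime(2019, 9, 29, 18, 0, 0, tzinfo=timezone.utc)

times = list(scheduled_times(start, timedelta(hours=1), now))
backfilled, latest = times[:-1], times[-1]
```

Here `backfilled` holds the past timestamps that a catching-up schedule would replay, while `latest` is the one close to the present.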


## TODOs
- [ ] factor out helpers
- tests:
  - example pipelines:
      - [ ] `gs://ml-pipeline-playground/coin.tar.gz`
      - [ ] [`https://storage.googleapis.com/ml-pipeline-playground/coin.tar.gz`](https://storage.googleapis.com/ml-pipeline-playground/coin.tar.gz)
      - [x] [`https://raw.githubusercontent.com/kubeflow/pipelines/0.1.31/samples/core/condition/condition.py`](https://raw.githubusercontent.com/kubeflow/pipelines/0.1.31/samples/core/condition/condition.py)
      - [ ] `component.yaml` example(s) (TODO: which one(s)?)
      - [ ] Bitcoin BigQuery monthly rollups
      - [ ] Current Weather (OpenWeather)
      - [ ] Stock movements (IEX API)
  - scheduling:
      - [ ] intervals
      - [x] start/end times
      - [ ] updating/clearing time-bounds
- [x] `!pip install` requirements
- [ ] (optionally) pass scheduled time to pipeline
- [ ] verify provided pipelines take no arguments (other than e.g. scheduled datetime)
- [ ] parameterize notebook w/ `papermill`
- [ ] support wrapping `component.yaml`s into Pipelines and running
  - [ ] verify no extra arguments to provided components
- [ ] publish runner container publicly
- [ ] publish SWF+KFP YAML template

## Install KFP SDK

In [None]:
!pip3 install kfp==0.1.31 --upgrade

In [479]:
from pkg_resources import get_distribution
kfp_version = get_distribution("kfp").version

# Helper Functions
Below are utilities for:
- [fetching/loading pipelines](#Pipeline-fetching/loading)
- [parsing their YAML definitions](#Pipeline-spec-accessors)
- [building `ScheduledWorkflow` resources](#ScheduledWorkflow-builders)
- [`import`ing remote `.py` files](#importing-remote-modules)

## Pipeline fetching/loading

In [470]:
def url_to_stream(url):
    """Return a file-like-object from a local or remote ("gs" or "https?") URL"""
    from urllib.parse import urlparse
    parsed = urlparse(url)
    scheme = parsed.scheme
    if not scheme:
        return open(url, 'rb')
    elif scheme == 'http' or scheme == 'https':
        from urllib.request import urlopen
        return urlopen(url)
    elif scheme == 'gs':
        from google.cloud import storage
        gcs = storage.Client()
        bucket_name = parsed.hostname
        bucket = gcs.get_bucket(bucket_name)
        path = parsed.path.lstrip('/')  # blob names do not begin with a slash
        blob = bucket.blob(path)
        # Download into memory: a NamedTemporaryFile would be deleted as soon as its `with` block exits
        from io import BytesIO
        buf = BytesIO()
        blob.download_to_file(buf)
        buf.seek(0)
        return buf
    else:
        raise Exception("Unsure how to handle URL scheme '%s' (%s)" % (scheme, url))

def url_to_bytes(url):
    """Return the contents of a local or remote ("gs" or "https?") URL"""
    with url_to_stream(url) as f:
        return f.read()

In [415]:
def try_extract_pipeline_tar(bytes):
    """Attempt to parse `bytes` as a TAR archive and extract a `pipeline.yaml`"""
    import tarfile
    from tarfile import TarError
    try:
        from io import BytesIO
        with tarfile.open(fileobj=BytesIO(bytes), mode='r') as f:
            names = f.getnames()
            if names == ['pipeline.yaml']:
                tar_info = f.getmember('pipeline.yaml')
                if tar_info.isfile():
                    return f.extractfile(tar_info).read()
                raise Exception('"pipeline.yaml" in TAR archive is not a regular file')
            raise Exception('Expected TAR archive to contain only a "pipeline.yaml"; found %s' % names)
    except TarError:
        return None
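
To see the archive handling in isolation, the following sketch builds a gzipped TAR in memory and reads `pipeline.yaml` back out with the same `tarfile` calls used above. The helper `make_pipeline_tar` and the one-line YAML payload are hypothetical and exist only for this example.

```python
import tarfile
from io import BytesIO

def make_pipeline_tar(yaml_bytes):
    """Build an in-memory gzipped TAR holding a single `pipeline.yaml`."""
    buf = BytesIO()
    with tarfile.open(fileobj=buf, mode='w:gz') as tar:
        info = tarfile.TarInfo(name='pipeline.yaml')
        info.size = len(yaml_bytes)
        tar.addfile(info, BytesIO(yaml_bytes))
    return buf.getvalue()

archive = make_pipeline_tar(b'kind: Workflow\n')

# mode='r' lets tarfile detect the compression transparently
with tarfile.open(fileobj=BytesIO(archive), mode='r') as tar:
    names = tar.getnames()
    extracted = tar.extractfile('pipeline.yaml').read()

# Bytes that are not a TAR archive raise a TarError subclass
try:
    tarfile.open(fileobj=BytesIO(b'not a tar'), mode='r')
    is_tar = True
except tarfile.TarError:
    is_tar = False
```

The final `try`/`except` is what allows a loader to fall through to other formats when the input is not a TAR archive.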

In [416]:
def try_extract_pipeline_zip(bytes):
    """Attempt to parse `bytes` as a ZIP archive and extract a `pipeline.yaml`"""
    from zipfile import BadZipFile, ZipFile
    try:
        from io import BytesIO
        with ZipFile(BytesIO(bytes), mode='r') as f:
            names = f.namelist()
            if names == ['pipeline.yaml']:
                return f.read('pipeline.yaml')
            raise Exception('Expected ZIP archive to contain only a "pipeline.yaml"; found %s' % names)
    except BadZipFile:
        return None

In [417]:
def is_pipeline_func(pipeline):
    return \
        hasattr(pipeline, '__call__') and \
        hasattr(pipeline, '_pipeline_name') and \
        hasattr(pipeline, '_pipeline_description')

In [428]:
def load_pipeline(pipeline):
    """Load a pipeline's spec as a dict
    
    Input pipeline can be:
    - a @dsl.pipeline function (in which case it is compiled to YAML which is then parsed)
    - a local or remote ("gs" and "https?" schemes supported) YAML file (or ZIP or TAR archive containing a pipeline.yaml)
    """
    import yaml
    if is_pipeline_func(pipeline):
        from kfp.compiler import Compiler
        compiler = Compiler()
        pipeline_yaml = compiler.compile(pipeline, package_path=None)
        return yaml.safe_load(pipeline_yaml)
    bytes = url_to_bytes(pipeline)
    pipeline_yaml = try_extract_pipeline_tar(bytes)
    if pipeline_yaml is None:
        pipeline_yaml = try_extract_pipeline_zip(bytes)
        if pipeline_yaml is None:
            pipeline_yaml = bytes
    return yaml.safe_load(pipeline_yaml)

In [469]:
def load_yaml(path):
    """Load YAML from a local or remote URL"""
    import yaml
    return yaml.safe_load(url_to_bytes(path))

## Pipeline-spec accessors
Utilities for pulling various fields out of a Kubeflow Pipeline's spec.

In [420]:
def get_metadata(pipeline):
    if 'metadata' not in pipeline:
        raise Exception('No "metadata" found in pipeline')
    return pipeline['metadata']

In [421]:
def get_annotations(pipeline):
    import json
    metadata = get_metadata(pipeline)
    if 'annotations' not in metadata:
        raise Exception('No "annotations" found in pipeline "metadata"')
    annotations = metadata['annotations']
    if 'pipelines.kubeflow.org/pipeline_spec' not in annotations:
        raise Exception('"pipelines.kubeflow.org/pipeline_spec" not found in pipeline metadata.annotations')
    annotations = json.loads(annotations['pipelines.kubeflow.org/pipeline_spec'])
    return annotations['name'], annotations['description']

In [422]:
def get_name(pipeline):
    metadata = get_metadata(pipeline)

    if 'generateName' not in metadata:
        raise Exception('No "generateName" found in pipeline metadata')

    name = metadata['generateName']
    if name[-1] == '-':
        name = name[:-1]

    return name

In [423]:
def get_description(pipeline):
    (_, description) = get_annotations(pipeline)
    return description
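
As a rough illustration of the fields these accessors read, here is a heavily trimmed, hand-written spec. It is an assumption about the minimal shape only (real compiled specs contain much more), and the `name` value in the annotation is illustrative.

```python
import json

# Hypothetical, trimmed pipeline spec containing only the fields the accessors touch
pipeline = {
    'metadata': {
        'generateName': 'conditional-execution-pipeline-',
        'annotations': {
            'pipelines.kubeflow.org/pipeline_spec': json.dumps({
                'name': 'Conditional execution pipeline',
                'description': 'Shows how to use dsl.Condition().',
            }),
        },
    },
}

metadata = pipeline['metadata']

# Same rule as get_name: drop a single trailing '-' from generateName
name = metadata['generateName']
if name.endswith('-'):
    name = name[:-1]

# Same rule as get_annotations: the annotation value is itself a JSON document
pipeline_spec = json.loads(metadata['annotations']['pipelines.kubeflow.org/pipeline_spec'])
description = pipeline_spec['description']
```

Note that the annotation is a JSON string nested inside the YAML document, which is why it needs a second parsing step.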

## `ScheduledWorkflow` builders

### Load template `ScheduledWorkflow`+KFP YAML
To construct `ScheduledWorkflow`s below, we start with this template and fill in a few missing fields (pipeline spec, name, and description, as well as any other parameter overrides the user provides).

In [448]:
SWF_TEMPLATE_PATH = '/Users/ryan/c/pipelines/backend/src/crd/samples/scheduledworkflow/kfp.yaml'
SWF_TEMPLATE = load_yaml(SWF_TEMPLATE_PATH)

### Generate `ScheduledWorkflow`s for running Kubeflow Pipelines
`make_swf_kfp` generates the spec for a `ScheduledWorkflow` resource that will run a given KFP on a desired schedule.

#### Parameters
| Name | Type | Description |
| :-:| :-:| :---|
| `pipeline` | function \| `str` | `dsl.pipeline` function or path to a file (or ZIP or TAR archive) containing the pipeline's YAML specification (paths can be to local files or `gs`- or `http`-schemed URLs). |
| `name` | `str` | Name of the ScheduledWorkflow resource to create; constructed from the underlying pipeline's name by default. |
| `description` | `str` | Description of the ScheduledWorkflow resource to create; constructed from the underlying pipeline's description by default. |
| `cron` | `str` | Crontab-formatted string specifying the schedule the pipeline should be run on; if neither `cron` nor `intervalSecond` is provided, the `DEFAULT_CRON_SCHEDULE` defined below is used. At most one of `cron` and `intervalSecond` should be provided. |
| `intervalSecond` | `int` | Interval, in seconds, at which to trigger runs of the provided pipeline. At most one of `cron` and `intervalSecond` should be provided. |
| `start` | `datetime` \| `str` | If provided, begin scheduling pipelines at this date+time (UTC is assumed if timezone is not made explicit) |
| `end` | `datetime` \| `str` | If provided, stop scheduling pipelines at this date+time (UTC is assumed if timezone is not made explicit) |
| `maxHistory` | `int` | Maximum number of pipeline runs' histories to store (default: 10) |
| `maxConcurrency` | `int` | Maximum number of concurrent runs of the pipeline (default: 10) |
| `enabled` | `bool` | Whether the generated ScheduledWorkflow should be enabled when it is created (default: True) |


In [462]:
DEFAULT_CRON_SCHEDULE = "1 * * * * *"
def make_swf_kfp(
    pipeline, 
    name=None, description=None, 
    cron=None, intervalSecond=None, 
    start=None, end=None,
    maxHistory=10, maxConcurrency=10, enabled=True
):
    """Create a ScheduledWorkflow resource that will run a given pipeline on a desired schedule.
    
    :param str pipeline @dsl.pipeline function or path to a file (or ZIP or TAR archive) containing the pipeline's YAML specification (paths can be to local files or "gs"- or "http"-schemed URLs).
    :param str name Name of the ScheduledWorkflow resource to create; constructed from the underlying pipeline's name by default.
    :param str description Description of the ScheduledWorkflow resource to create; constructed from the underlying pipeline's description by default.
    :param str cron Crontab-formatted string specifying the schedule the pipeline should be run on; if neither `cron` nor `intervalSecond` is provided, the `DEFAULT_CRON_SCHEDULE` above is used. At most one of `cron` and `intervalSecond` should be provided.
    :param int intervalSecond Interval at which to trigger runs of the provided pipeline. At most one of `cron` and `intervalSecond` should be provided.
    :param datetime|str start If provided, begin scheduling pipelines at this date+time (UTC is assumed if timezone is not made explicit)
    :param datetime|str end If provided, stop scheduling pipelines at this date+time (UTC is assumed if timezone is not made explicit)
    :param int maxHistory Maximum number of pipeline runs' histories to store (default: 10)
    :param int maxConcurrency Maximum number of concurrent runs of the pipeline (default: 10)
    :param bool enabled Whether the generated ScheduledWorkflow should be enabled when it is created (default: True)    
    """
    pipeline = load_pipeline(pipeline)
    
    from copy import deepcopy
    swf = deepcopy(SWF_TEMPLATE)
    
    spec = swf['spec']
    
    if name is None:
        name = get_name(pipeline)
    
    if description is None:
        description = get_description(pipeline)
    
    if (cron is not None) and (intervalSecond is not None):
        raise Exception('At most one of {"cron","intervalSecond"} should be provided; received cron %s, intervalSecond %s' % (cron, intervalSecond))
    
    if (cron is None) and (intervalSecond is None):
        cron = DEFAULT_CRON_SCHEDULE
        
    def set_date(key, dt):
        from dateutil.parser import parse
        if dt is not None:
            if isinstance(dt, str):
                dt = parse(dt)
            if dt.tzinfo is None:
                from pytz import utc
                dt = utc.localize(dt)
            schedule[key] = dt

    schedule = {}
    set_date('startTime', start)
    set_date('endTime', end)
    
    msg_parts = []  # accumulate pieces of scheduling metadata here (for inclusion in the ScheduledWorkflow's description)
    trigger = {}
    if cron is not None:
        schedule['cron'] = cron
        msg_parts.append('cron: %s' % cron)
        trigger['cronSchedule'] = schedule
    else:
        schedule['intervalSecond'] = intervalSecond
        msg_parts.append('interval: %ds' % intervalSecond)
        trigger['periodicSchedule'] = schedule
        
    spec['enabled'] = enabled
    spec['maxHistory'] = maxHistory
    spec['maxConcurrency'] = maxConcurrency
    spec['trigger'] = trigger

    if start is not None:
        msg_parts.append('start: %s' % str(start))
    if end is not None:
        msg_parts.append('end: %s' % str(end))

    spec['description'] = 'ScheduledWorkflow (%s): %s' % (', '.join(msg_parts), description)

    # Name for the SWF resource
    swf_name = 'swf-%s' % name
    spec['name'] = swf_name
    metadata = swf['metadata']
    metadata['name'] = swf_name

    workflow = spec['workflow']

    # Inline pipeline YAML into SWF YAML as a parameter to each run of the SWF, 
    # which will parse the pipeline and submit it
    parameters = workflow['parameters']
    import yaml
    parameters[1]['value'] = yaml.dump(pipeline)
    
    workflow_spec = workflow['spec']
    templates = workflow_spec['templates']
    template = templates[0]
    container = template['container']
    args = container['args']

    # populate "name" argument in template
    assert args[-2:] == [ '--name', '' ], "Unexpected final SWF args: %s" % args[-2:]
    args[-1] = name

    return swf
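
The trigger-building portion of `make_swf_kfp` can be exercised on its own. The sketch below is a simplified re-statement, not a drop-in replacement: `build_trigger` is a hypothetical helper, it accepts only `datetime` objects (not strings), and it serializes them to ISO-8601 strings so the result is easy to inspect.

```python
from datetime import datetime, timezone

def build_trigger(cron=None, intervalSecond=None, start=None, end=None):
    """Hypothetical helper mirroring the trigger logic of `make_swf_kfp`."""
    if cron is not None and intervalSecond is not None:
        raise ValueError('At most one of cron and intervalSecond may be provided')
    schedule = {}
    for key, dt in (('startTime', start), ('endTime', end)):
        if dt is not None:
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=timezone.utc)  # assume UTC for naive datetimes
            schedule[key] = dt.isoformat()
    if intervalSecond is not None:
        schedule['intervalSecond'] = intervalSecond
        return {'periodicSchedule': schedule}
    # Fall back to the same default cron string as DEFAULT_CRON_SCHEDULE
    schedule['cron'] = cron if cron is not None else '1 * * * * *'
    return {'cronSchedule': schedule}

hourly = build_trigger(cron='0 0 * * * *', start=datetime(2019, 9, 28))
every_five_minutes = build_trigger(intervalSecond=300)
```

A cron schedule ends up under `cronSchedule` and a fixed interval under `periodicSchedule`; the optional time bounds live inside whichever one is chosen.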

## `import`ing remote modules
ContextManager used for downloading (or moving) a file to a temporary location and importing from it

In [471]:
import sys  # module-level import: names bound in a class body are not visible inside its methods

class Import(object):
    """ContextManager used for downloading (or moving) a file to a temporary location and importing from it"""
    def __init__(self, url):
        self.url = url
        self.tmpdir = None
    
    def __enter__(self):
        from urllib.parse import urlparse
        url = self.url
        parsed = urlparse(url)
        scheme = parsed.scheme

        from tempfile import TemporaryDirectory
        self.tmpdir = TemporaryDirectory()

        from os.path import basename, join
        path = parsed.path
        name = basename(path)  # Preserve file's basename (for ease of importing from it)
        tmpdir = self.tmpdir.name
        dest = join(tmpdir, name)  # File to import from will be downloaded here
        
        if not scheme:
            # Local file: copy it to temporary directory (for consistent import-isolation semantics with remote "import"s)
            with open(url, 'rb') as src, open(dest, 'wb') as dst:
                from shutil import copyfileobj
                copyfileobj(src, dst)
        elif scheme == 'http' or scheme == 'https':
            from urllib.request import urlretrieve
            urlretrieve(url, dest)
        elif scheme == 'gs':
            from google.cloud import storage
            gcs = storage.Client()
            bucket_name = parsed.hostname
            bucket = gcs.get_bucket(bucket_name)
            key = parsed.path.lstrip('/')  # blob names do not begin with a slash
            blob = bucket.blob(key)
            blob.download_to_filename(dest)  # `dest` is a path, not a file object
        else:
            raise Exception("Unsure how to handle URL scheme '%s' (%s)" % (scheme, url))    

        # Add temporary directory to "import" path
        sys.path.append(tmpdir)

        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Close+Delete the temporary directory
        self.tmpdir.__exit__(exc_type, exc_val, exc_tb)
        tmpdir = self.tmpdir.name
        # Remove it from sys.path
        sys.path.remove(tmpdir)
        return False
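
The core mechanism (place a file in a temporary directory, put that directory on `sys.path`, import, then clean up) can be tried without any network access. This is a minimal sketch using only the standard library; the module name `demo_remote_module` and its contents are hypothetical.

```python
import importlib
import os
import sys
from tempfile import TemporaryDirectory

with TemporaryDirectory() as tmpdir:
    # Write a tiny module into the temporary directory
    with open(os.path.join(tmpdir, 'demo_remote_module.py'), 'w') as f:
        f.write('ANSWER = 42\n')

    sys.path.append(tmpdir)            # make the directory importable
    try:
        importlib.invalidate_caches()  # ensure the import system notices the new file
        demo = importlib.import_module('demo_remote_module')
    finally:
        sys.path.remove(tmpdir)        # always undo the sys.path change

answer = demo.ANSWER
still_on_path = tmpdir in sys.path
```

The imported module object stays usable after the directory is deleted, because its code has already been loaded into memory.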

# Examples
- ["Coin flip" sample](#coin-flip-sample) (via GitHub)
- ["Coin flip" sample](#coin.tar.gz) (via Google Cloud Storage)

## "Coin flip" sample
- wrap [the "coin flip" example from the Kubeflow Pipelines repo](https://github.com/kubeflow/pipelines/blob/0.1.31/samples/core/condition/condition.py) in a `ScheduledWorkflow`
- create it, let it run for a few minutes, update ("patch") it, and delete it.

### `import` pipeline definition from the GitHub release matching the installed KFP SDK version

In [379]:
coin_flip_sample_url = 'https://raw.githubusercontent.com/kubeflow/pipelines/%s/samples/core/condition/condition.py' % kfp_version
with Import(coin_flip_sample_url):
    # The condition.py has been downloaded to a temporary directory and placed on sys.path
    from condition import flipcoin_pipeline

### Make a `ScheduledWorkflow` wrapping the pipeline
Build a `ScheduledWorkflow` spec that runs the "coin flip" pipeline every minute, on the minute:

In [480]:
swf = make_swf_kfp(flipcoin_pipeline, cron="0 * * * * *")
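
The cron strings in this notebook have six fields rather than the five of a classic crontab. The sketch below labels them under the assumption that the first field is seconds, which is inferred from the schedules used here (`"0 * * * * *"` firing once a minute); the field names and the `describe_cron` helper are illustrative, not part of any KFP API.

```python
# Assumed field order for the six-field cron strings used in this notebook
CRON_FIELDS = ('second', 'minute', 'hour', 'day_of_month', 'month', 'day_of_week')

def describe_cron(expr):
    """Split a six-field cron expression into named fields."""
    fields = expr.split()
    if len(fields) != len(CRON_FIELDS):
        raise ValueError('Expected %d fields, found %d: %r' % (len(CRON_FIELDS), len(fields), expr))
    return dict(zip(CRON_FIELDS, fields))

every_minute = describe_cron('0 * * * * *')
every_hour = describe_cron('0 0 * * * *')
```

Read this way, `every_minute` pins only the seconds field to `0`, while `every_hour` pins both seconds and minutes.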

### Create, Read, Update, and Delete the `ScheduledWorkflow`

#### Initialize a k8s client

In [465]:
from kubernetes import client, config
config.load_kube_config()
api = client.CustomObjectsApi()
args = dict(
    group="kubeflow.org",
    version="v1beta1",
    namespace="default",
    plural="scheduledworkflows",
)

#### Create

In [457]:
obj = api.create_namespaced_custom_object(body=swf, **args)
name = obj['metadata']['name']
name

'swf-conditional-execution-pipeline'

#### Read

In [483]:
resource = api.get_namespaced_custom_object(name=name, **args)

In [484]:
resource.keys()

dict_keys(['apiVersion', 'kind', 'status', 'spec', 'metadata'])

In [487]:
history = resource['status']['workflowHistory']; history

{'completed': [{'createdAt': '2019-09-29T21:00:07Z',
   'finishedAt': '2019-09-29T21:00:48Z',
   'index': 48,
   'name': 'swf-conditional-execution-pipeline-48-1519533253',
   'namespace': 'default',
   'phase': 'Succeeded',
   'scheduledAt': '2019-09-29T21:00:00Z',
   'selfLink': '/apis/argoproj.io/v1alpha1/namespaces/default/workflows/swf-conditional-execution-pipeline-48-1519533253',
   'startedAt': '2019-09-29T21:00:07Z',
   'uid': '16baa7fa-b4d6-4a38-8809-aa3c7588d557'},
  {'createdAt': '2019-09-29T20:11:37Z',
   'finishedAt': '2019-09-29T20:12:11Z',
   'index': 47,
   'name': 'swf-conditional-execution-pipeline-47-1536310872',
   'namespace': 'default',
   'phase': 'Succeeded',
   'scheduledAt': '2019-09-29T20:00:00Z',
   'selfLink': '/apis/argoproj.io/v1alpha1/namespaces/default/workflows/swf-conditional-execution-pipeline-47-1536310872',
   'startedAt': '2019-09-29T20:11:37Z',
   'uid': '80f3cb60-481d-4716-94ee-735690c055f6'},
  {'createdAt': '2019-09-29T20:01:00Z',
   'finishe

In [491]:
completed = history['completed']

In [489]:
history.keys()

dict_keys(['completed'])

In [492]:
len(completed)

10
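
One use of the run history is to check how far behind schedule each run started. The sketch below works on two entries copied (and trimmed) from the `workflowHistory` output above rather than on a live cluster; `parse_ts` and `completed_sample` are names introduced only for this example.

```python
from datetime import datetime

# Two entries copied (and trimmed) from the `workflowHistory` output above
completed_sample = [
    {'index': 48, 'scheduledAt': '2019-09-29T21:00:00Z', 'startedAt': '2019-09-29T21:00:07Z'},
    {'index': 47, 'scheduledAt': '2019-09-29T20:00:00Z', 'startedAt': '2019-09-29T20:11:37Z'},
]

def parse_ts(ts):
    """Parse the 'YYYY-MM-DDTHH:MM:SSZ' timestamps found in the history entries."""
    return datetime.strptime(ts, '%Y-%m-%dT%H:%M:%SZ')

# Seconds between the scheduled time and the actual start of each run
delays = {
    run['index']: (parse_ts(run['startedAt']) - parse_ts(run['scheduledAt'])).total_seconds()
    for run in completed_sample
}
print(delays)
```

For these two entries the delays are 7 seconds and 697 seconds respectively.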

#### Update ("patch")

In [466]:
# Make any desired change to the ScheduledWorkflow body here:
swf = make_swf_kfp(flipcoin_pipeline, cron="0 0 * * * *", start='2019-09-28', end='2019-09-30 19:45')

In [467]:
# Submit new ScheduledWorkflow
api.patch_namespaced_custom_object(name=name, body=swf, **args)

{'apiVersion': 'kubeflow.org/v1beta1',
 'kind': 'ScheduledWorkflow',
 'metadata': {'creationTimestamp': '2019-09-29T19:40:27Z',
  'generation': 141,
  'labels': {'scheduledworkflows.kubeflow.org/enabled': 'true',
   'scheduledworkflows.kubeflow.org/status': 'Enabled'},
  'name': 'swf-conditional-execution-pipeline',
  'namespace': 'default',
  'resourceVersion': '231605',
  'selfLink': '/apis/kubeflow.org/v1beta1/namespaces/default/scheduledworkflows/swf-conditional-execution-pipeline',
  'uid': '384c2441-56ef-4b9f-883a-6c3b485dd8b4'},
 'spec': {'description': 'ScheduledWorkflow (cron: 0 0 * * * *, start: 2019-09-28, end: 2019-09-30 19:45): Shows how to use dsl.Condition().',
  'enabled': True,
  'maxConcurrency': 10,
  'maxHistory': 10,
  'name': 'swf-conditional-execution-pipeline',
  'trigger': {'cronSchedule': {'cron': '0 0 * * * *',
    'endTime': '2019-09-30T19:45:00+00:00',
    'startTime': '2019-09-28T00:00:00+00:00'}},
  'workflow': {'parameters': [{'name': 'datetime',
     'v

#### Delete

In [366]:
api.delete_namespaced_custom_object(name=name, body=client.V1DeleteOptions(), **args)

{'apiVersion': 'v1',
 'details': {'group': 'kubeflow.org',
  'kind': 'scheduledworkflows',
  'name': 'swf-conditional-execution-pipeline',
  'uid': '1d7b3943-795f-4b65-b67d-865c8ac10e3f'},
 'kind': 'Status',
 'metadata': {},
 'status': 'Success'}

## `coin.tar.gz`

### via `gs`-scheme URL

In [379]:
url = 'gs://ml-pipeline-playground/coin.tar.gz'

### via `storage.googleapis`

In [None]:
http = 'https://storage.googleapis.com/ml-pipeline-playground/coin.tar.gz'