In [None]:
# Workflow with artifacts

This notebook aims to show how data can be downloaded from Minio and shared between containers of the same workflow.
This notebook can be launched only after the upload of some data in Notebook 1 (Minio example).
The workflow created is composed of two steps:
- initialization: load data from S3 bucket in minio (and automatically decompress them in container)
- listings: receive the data shared by the initialization container and show the files
    
All artifacts are moved between the steps of the workflow by means of compression and storage in Minio.

In [1]:
# 1) Upload subject data into the S3 repository
# 2) The first container gets the data from S3 and unpacks it into a directory called /subjects/<subject_id>
# 3) The first container exports the data contained in /subjects
# 4) The next container uses the data from /subjects of the previous one

In [4]:
from minio import Minio
from minio.error import ResponseError

import os

# MinIO client used by the cells below to create the bucket and upload archives.
# Connection settings are taken from the environment when present, so credentials
# do not have to live in the notebook; the fallbacks are localhost:9000 with the
# minioadmin/minioadmin key pair.
minioClient = Minio(
    os.environ.get("MINIO_ENDPOINT", "localhost:9000"),
    access_key=os.environ.get("MINIO_ACCESS_KEY", "minioadmin"),
    secret_key=os.environ.get("MINIO_SECRET_KEY", "minioadmin"),
    secure=False,  # connect over plain HTTP rather than TLS
)

In [5]:
from minio.error import BucketAlreadyOwnedByYou, BucketAlreadyExists, ResponseError

# Create the "subjects-data" bucket through the make_bucket API call.
try:
    minioClient.make_bucket("subjects-data", location="us-east-1")
except BucketAlreadyOwnedByYou:
    # Raised when the bucket is already there and owned by us: nothing to do.
    pass
except BucketAlreadyExists:
    print("Bucket already exists")
# Any other ResponseError is left to propagate out of the cell.

In [6]:
import os

# Upload every subject archive into the 'subjects-data' bucket; each object is
# named after the archive file it is read from.
# The source directory can be overridden through SUBJECTS_DATA_DIR so the cell
# is not tied to a single machine's filesystem layout.
SUBJECTS_DATA_DIR = os.environ.get(
    "SUBJECTS_DATA_DIR", "/home/michele/Projects/rmi-pipelines/data"
)
SUBJECT_ARCHIVES = [
    "controllo-0040013.tar.gz",
    "controllo-0040014.tar.gz",
    "controllo-0040017.tar.gz",
    "controllo-0040018.tar.gz",
]

try:
    for archive_name in SUBJECT_ARCHIVES:
        minioClient.fput_object(
            "subjects-data",
            archive_name,
            os.path.join(SUBJECTS_DATA_DIR, archive_name),
        )
except ResponseError as err:
    # The try wraps the whole loop: the first failed upload is reported and the
    # remaining archives are not attempted.
    print(err)

In [7]:
# Create a workflow that downloads a subject into a local directory of a container.
# The container unpacks it and lists the files, then it shares the directory with
# the next container, which lists the files as well.

In [37]:
# Create a Workflow with two sequential steps: initialization, then listings

from argo.workflows.client.models import *

from kubernetes.client.models import V1SecretKeySelector
# Both S3 credentials are looked up in the "s3-secret" Kubernetes secret.
access_key_secret = V1SecretKeySelector(name="s3-secret", key="accessKey")
secret_key_secret = V1SecretKeySelector(name="s3-secret", key="secretKey")

# Input artifact of the "init" template: one subject archive from the bucket.
# NOTE(review): this path is relative, while the container command below uses
# /subjects/0040013 — confirm both resolve to the same directory.
subject_input = V1alpha1Artifact(
    name="subject",
    path="subjects/0040013",
    s3=V1alpha1S3Artifact(
        bucket="subjects-data",
        endpoint="minio.default:9000",
        insecure=True,
        access_key_secret=access_key_secret,
        secret_key_secret=secret_key_secret,
        key="controllo-0040013.tar.gz",
    ),
)

# Entrypoint template: two step groups, "initialization" followed by "listings".
# The second step receives the "message" output artifact of the first one.
entry_template = V1alpha1Template(
    name="steps",
    steps=[
        [V1alpha1WorkflowStep(name="initialization", template="init")],
        [
            V1alpha1WorkflowStep(
                name="listings",
                template="listing",
                arguments=V1alpha1Arguments(artifacts=[
                    V1alpha1Artifact(
                        name="message",
                        _from="{{steps.initialization.outputs.artifacts.message}}",
                    )
                ]),
            )
        ],
    ],
)

# "init": lists the subject directory, writes /test.txt, and exports /subjects
# as the output artifact named "message".
init_template = V1alpha1Template(
    name="init",
    container=V1Container(
        name="init",
        image="alpine",
        command=["sh", "-c", "cd /subjects/0040013 ; ls -lh ; echo 'michele' > /test.txt ; sleep 5 ; ls / ; cat /test.txt"],
        args=[],
    ),
    inputs=V1alpha1Inputs(artifacts=[subject_input]),
    outputs=V1alpha1Outputs(artifacts=[
        V1alpha1Artifact(name="message", path="/subjects")
    ]),
)

# "listing": takes the "message" artifact at /subjects and lists the subject files.
listing_template = V1alpha1Template(
    name="listing",
    container=V1Container(
        name="listing",
        image="alpine",
        command=["sh", "-c", "cd /subjects/0040013 ; ls -lh"],
        args=[],
    ),
    inputs=V1alpha1Inputs(artifacts=[
        V1alpha1Artifact(name="message", path="/subjects")
    ]),
)

# No volumes and no archive_location are set on the spec or on any template.
spec = V1alpha1WorkflowSpec(
    entrypoint="steps",
    templates=[entry_template, init_template, listing_template],
)

# Workflow resource; the final name is generated from the "s3-artifacts-" prefix.
workflow = V1alpha1Workflow(
    api_version="argoproj.io/v1alpha1",
    kind="Workflow",
    metadata=V1ObjectMeta(generate_name="s3-artifacts-"),
    spec=spec,
    status=V1alpha1WorkflowStatus(),
)

In [38]:
from argo.workflows.client import V1alpha1Api
from argo.workflows.config import load_kube_config

# Load the cluster connection settings from the local kubeconfig (~/.kube/config).
load_kube_config()

# Client for the Argo v1alpha1 workflow API.
v1alpha1 = V1alpha1Api()

# Submit the workflow to the "default" namespace; the created resource is the
# last expression, so it is displayed as the cell output.
v1alpha1.create_namespaced_workflow("default", workflow)

{'api_version': 'argoproj.io/v1alpha1',
 'kind': 'Workflow',
 'metadata': {'annotations': None,
              'cluster_name': None,
              'creation_timestamp': datetime.datetime(2019, 12, 26, 23, 54, 47, tzinfo=tzutc()),
              'deletion_grace_period_seconds': None,
              'deletion_timestamp': None,
              'finalizers': None,
              'generate_name': 's3-artifacts-',
              'generation': 1,
              'initializers': None,
              'labels': None,
              'managed_fields': None,
              'name': 's3-artifacts-k84dr',
              'namespace': 'default',
              'owner_references': None,
              'resource_version': '715554',
              'self_link': '/apis/argoproj.io/v1alpha1/namespaces/default/workflows/s3-artifacts-k84dr',
              'uid': '6147ffba-1512-4d0e-ba32-8c89cfe4754a'},
 'spec': {'active_deadline_seconds': None,
          'affinity': None,
          'arguments': None,
          'artifact_reposi