In [16]:
import kfp.dsl as dsl
import kfp.gcp as gcp
import kfp.compiler as compiler
from kfp import components
from kubernetes import client as k8s_client

In [34]:
def git_clone_op(repo_url, volume_op):
    #Volume for storing Git repository (PV)
    # volume_op = dsl.VolumeOp(
    #     name='create pipeline volume',
    #     resource_name='pipeline-pvc',
    #     modes=['ReadWriteMany'],
    #     size="1Gi"
    # )

    image = 'alpine/git:latest'

    commands = [
        f"git clone {repo_url} /workspace/vq-vae_1d",
        "pwd",
        'ls',
    ]

    # Mount Git repository in /workspace
    op = dsl.ContainerOp(
        name='git clone',
        image=image,
        command=['sh'],
        arguments=['-c', ' && '.join(commands)],
        container_kwargs={'image_pull_policy': 'IfNotPresent'},
        pvolumes={'/workspace': volume_op.volume}
    )

    return op

In [43]:
@dsl.pipeline(
    name='vq-vae pipeline',
    description='A pipeline to train vq-vae'
)
def vae_pipeline(data_type='gaze', f_name='vq_vae', num_hiddens=128, num_residual_hiddens=32,
                num_residual_layers=2, embedding_dim=8, num_embeddings=128,
                commitment_cost=0.25, decay=0.99, epoch=500,
                lr = 1e-3, batch_size=32,
                repo_url='http://OHMORI:house101@zaku.sys.es.osaka-u.ac.jp:10080/OHMORI/vq-vae_1d.git'):
    #secret setting for pipeline
    dsl.get_pipeline_conf().set_image_pull_secrets([k8s_client.V1LocalObjectReference(name="regcred")])

    # Volume for storing Git repository (PV)
    volume_op = dsl.VolumeOp(
        name='create pipeline volume',
        resource_name='pipeline-pvc',
        modes=['ReadWriteMany'],
        size="1Gi"
    )

    git_clone = git_clone_op(repo_url=repo_url, volume_op=volume_op)

    # commands = [
    #     "ls",
    #     # "pwd",
    #     # 'cd /home/jovyan/vq-vae_1d/data',
    #     # "ls",
    #     # "cd /home/jovyan/vq-vae_1d",
    #     # "ls",
    #     "pwd"
    # ]

    train=dsl.ContainerOp(
        name='VQ-VAE',
        image='zaku.sys.es.osaka-u.ac.jp:10081/ohmori/vq-vae_1d:v0.0.0',
        # command=['sh'],
        # arguments=['-c', ' && '.join(commands)],
        command=['python3', 'train.py'],
        arguments=[
            '--data_type', data_type,
            '--f_name', f_name,
            '--num_hiddens', num_hiddens,
            '--num_residual_hiddens', num_residual_hiddens,
            '--num_residual_layers', num_residual_layers,
            '--embedding_dim', embedding_dim,
            '--num_embeddings', num_embeddings,
            '--commitment_cost', commitment_cost,
            '--decay', decay,
            '--epoch', epoch,
            '--lr', lr,
            '--batch_size', batch_size
        ],
        pvolumes={'/home/jovyan': git_clone.pvolume},
        # output Tensorboard, 上手く動作しない
        file_outputs={
            'MLPipeline UI metadata': '/mlpipeline-ui-metadata.json'
        }
    ).set_gpu_limit(gpu=1, vendor='nvidia')

    # train.add_pvolumes({'/workspace': git_clone.pvolume})
    # train.add_volume_mount(volume_mount=k8s_client.V1VolumeMount(mount_path='/home/jovyan',name=git_clone.name))

    #set nfs information and mount nfs on Container(named train)
    nfs_volume_source = k8s_client.V1NFSVolumeSource(server="192.168.11.107", path="/data/share/data")
    train.add_volume(k8s_client.V1Volume(name='data', nfs=nfs_volume_source))
    train.add_volume_mount(k8s_client.V1VolumeMount(mount_path='/home/jovyan/vq-vae_1d/data',name='data'))

    nfs_volume_source = k8s_client.V1NFSVolumeSource(server="192.168.11.107", path="/data/share/pth")
    train.add_volume(k8s_client.V1Volume(name='pth', nfs=nfs_volume_source))
    train.add_volume_mount(k8s_client.V1VolumeMount(mount_path='/home/jovyan/vq-vae_1d/script/pth',name='pth'))

    nfs_volume_source = k8s_client.V1NFSVolumeSource(server="192.168.11.107", path="/data/share/logs")
    train.add_volume(k8s_client.V1Volume(name='logs', nfs=nfs_volume_source))
    train.add_volume_mount(k8s_client.V1VolumeMount(mount_path='/home/jovyan/vq-vae_1d/script/logs',name='logs'))

    train.add_volume(k8s_client.V1Volume(name="dshm", empty_dir=k8s_client.V1EmptyDirVolumeSource(medium="Memory")))
    train.add_volume_mount(k8s_client.V1VolumeMount(mount_path='/dev/shm',name='dshm'))

    #set workdirectory
    train.container.working_dir = "/home/jovyan/vq-vae_1d/script"
    
    print(train.pvolume)
    print(dir(train.pvolume))

    #delete PV?
    volume_op.delete().after(train)
    # dsl.ResourceOp(
    #     name='delete-volume',
    #     # k8s_resource=volume_op.k8s_resource,
    #     k8s_resource=train.pvolume.k8s_resource,
    #     action='delete'
    # ).after(train)


In [44]:
compiler.Compiler().compile(vae_pipeline, 'vq-vae-pipeline.zip')

{'aws_elastic_block_store': None,
 'azure_disk': None,
 'azure_file': None,
 'cephfs': None,
 'cinder': None,
 'config_map': None,
 'csi': None,
 'downward_api': None,
 'empty_dir': None,
 'fc': None,
 'flex_volume': None,
 'flocker': None,
 'gce_persistent_disk': None,
 'git_repo': None,
 'glusterfs': None,
 'host_path': None,
 'iscsi': None,
 'name': 'create-pipeline-volume',
 'nfs': None,
 'persistent_volume_claim': {'claim_name': '{{pipelineparam:op=create pipeline '
                                           'volume;name=name}}',
                             'read_only': None},
 'photon_persistent_disk': None,
 'portworx_volume': None,
 'projected': None,
 'quobyte': None,
 'rbd': None,
 'scale_io': None,
 'secret': None,
 'storageos': None,
 'vsphere_volume': None}
['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '_

In [33]:
dsl.VolumeOp.pvolume?

Object `dsl.VolumeOp.pvolume` not found.


In [28]:
dir(dsl.VolumeOp)

['__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_validate_memory_string',
 'add_affinity',
 'add_init_container',
 'add_node_selector_constraint',
 'add_pod_annotation',
 'add_pod_label',
 'add_sidecar',
 'add_toleration',
 'add_volume',
 'after',
 'apply',
 'attrs_with_pipelineparams',
 'delete',
 'inputs',
 'resource',
 'set_display_name',
 'set_retry',
 'set_timeout']

In [32]:
dsl.VolumeOp.delete?

[0;31mSignature:[0m [0mdsl[0m[0;34m.[0m[0mVolumeOp[0m[0;34m.[0m[0mdelete[0m[0;34m([0m[0mself[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m Returns a ResourceOp which deletes the resource.
[0;31mFile:[0m      /opt/conda/lib/python3.7/site-packages/kfp/dsl/_resource_op.py
[0;31mType:[0m      function


In [26]:
k8s_client.V1Volume?

[0;31mInit signature:[0m
[0mk8s_client[0m[0;34m.[0m[0mV1Volume[0m[0;34m([0m[0;34m[0m
[0;34m[0m    [0maws_elastic_block_store[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mazure_disk[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mazure_file[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mcephfs[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mcinder[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mconfig_map[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mcsi[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mdownward_api[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mempty_dir[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mfc[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mflex_volume[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0

In [25]:
dsl.VolumeOp.k8s_resource?

Object `dsl.VolumeOp.k8s_resource` not found.


In [17]:
dsl.ResourceOp?

[0;31mInit signature:[0m
[0mdsl[0m[0;34m.[0m[0mResourceOp[0m[0;34m([0m[0;34m[0m
[0;34m[0m    [0mk8s_resource[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0maction[0m[0;34m:[0m [0mstr[0m [0;34m=[0m [0;34m'create'[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mmerge_strategy[0m[0;34m:[0m [0mstr[0m [0;34m=[0m [0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0msuccess_condition[0m[0;34m:[0m [0mstr[0m [0;34m=[0m [0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mfailure_condition[0m[0;34m:[0m [0mstr[0m [0;34m=[0m [0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mattribute_outputs[0m[0;34m:[0m [0mDict[0m[0;34m[[0m[0mstr[0m[0;34m,[0m [0mstr[0m[0;34m][0m [0;34m=[0m [0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0;34m**[0m[0mkwargs[0m[0;34m,[0m[0;34m[0m
[0;34m[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m      Represents an op which will be translated in

In [18]:
dsl.ContainerOp.add_pvolumes?

[0;31mSignature:[0m
[0mdsl[0m[0;34m.[0m[0mContainerOp[0m[0;34m.[0m[0madd_pvolumes[0m[0;34m([0m[0;34m[0m
[0;34m[0m    [0mself[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mpvolumes[0m[0;34m:[0m [0mDict[0m[0;34m[[0m[0mstr[0m[0;34m,[0m [0mkubernetes[0m[0;34m.[0m[0mclient[0m[0;34m.[0m[0mmodels[0m[0;34m.[0m[0mv1_volume[0m[0;34m.[0m[0mV1Volume[0m[0;34m][0m [0;34m=[0m [0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m
Updates the existing pvolumes dict, extends volumes and volume_mounts
and redefines the pvolume attribute.

Args:
    pvolumes: Dictionary. Keys are mount paths, values are Kubernetes
              volumes or inherited types (e.g. PipelineVolumes).
[0;31mFile:[0m      /opt/conda/lib/python3.7/site-packages/kfp/dsl/_container_op.py
[0;31mType:[0m      function


In [20]:
dir(dsl.VolumeOp)

['__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_validate_memory_string',
 'add_affinity',
 'add_init_container',
 'add_node_selector_constraint',
 'add_pod_annotation',
 'add_pod_label',
 'add_sidecar',
 'add_toleration',
 'add_volume',
 'after',
 'apply',
 'attrs_with_pipelineparams',
 'delete',
 'inputs',
 'resource',
 'set_display_name',
 'set_retry',
 'set_timeout']

In [23]:
dir(dsl.VolumeOp.delete())

TypeError: delete() missing 1 required positional argument: 'self'