In [9]:
# --- Runtime configuration -------------------------------------------------
# Every setting is required: a missing environment variable raises KeyError
# here, before any pipeline object is built.

# Object-storage (MinIO) connection settings.
minio_url = os.environ["minio_url"]
minio_access_key = os.environ["minio_access_key"]
minio_secret_key = os.environ["minio_secret_key"]
minio_region = os.environ["minio_region"]
minio_bucket_name = os.environ["minio_bucket_name"]

# Name of the competition to work on.
competition_name = os.environ["competition_name"]

# Kaggle credentials.
kaggle_access_key = os.environ["kaggle_access_key"]
kaggle_secret_key = os.environ["kaggle_secret_key"]

# Persistent-volume settings. `pv_count` stays a string (as read from the
# environment); callers convert it with int() where a number is needed.
pv_mount_name = os.environ["pv_mount_name"]
pv_count = os.environ["pv_count"]

# Selector for the download source.
download_from = os.environ["download_from"]

In [10]:
import kfp
import kfp.dsl as dsl

from kubernetes import client as k8s_client

from istio_auth_with_client import kfp_client

In [11]:

@dsl.pipeline(
    name="mount_test",
    description="mount_test"
)
def global_warming_level_3_pipeline():
    """Define the level-3 pipeline: one download step, then one worker step per volume.

    The function takes no arguments; it reads the notebook-level settings
    (pv_count, pv_mount_name and the MinIO / Kaggle / competition values).

    Structure that is built:
      1. One VolumeOp per volume number in 1..int(pv_count), each mounted at
         "/var/source/" + pv_mount_name + <volume number>.
      2. step_1 ("download_and_extract"), which mounts every volume.
      3. One step_2 ("resize_reformat_zip_data_and_upload") per volume, which
         mounts only its own volume and is scheduled after step_1.

    Every step receives the same environment variables, except pv_count:
    step_1 gets the total count, while each step_2 gets its own volume number.
    """
    image = "escs34/global_warming:level_3_12gb_v10"

    # Parse the count once; a range can be iterated repeatedly.
    pv_numbers = range(1, int(pv_count) + 1)

    # Mount path -> volume, in creation order (volume 1 first). The key must be
    # built from pv_num (not pv_count); otherwise every entry collapses onto
    # the same key and only the last volume ends up attached.
    volume_dict = dict()
    for pv_num in pv_numbers:
        mount_path = "/var/source/" + pv_mount_name + str(pv_num)
        vop = dsl.VolumeOp(
            name="create_pvc_" + str(pv_num),
            resource_name='my-pvc-' + str(pv_num),
            size='35Gi',  # should be sized larger for real runs
            modes=dsl.VOLUME_MODE_RWO,  # RWO: read write once
            set_owner_reference=True,  # https://github.com/kubeflow/pipelines/issues/6649#issuecomment-938509228
        )
        volume_dict[mount_path] = vop.volume

    # step_1 mounts all volumes at once.
    step_1 = kfp.dsl.ContainerOp(
        name="download_and_extract",
        image=image,
        command=["python3"],
        arguments=[
            "/var/source/level_3_12gb_main_a.py"
        ],
        pvolumes=volume_dict
    )

    # One step_2 has to be created per volume; each mounts only its own volume.
    step_2_list = []
    for mount_path, volume in volume_dict.items():
        step_2 = kfp.dsl.ContainerOp(
            name="resize_reformat_zip_data_and_upload",
            image=image,
            command=["python3"],
            arguments=[
                "/var/source/level_3_12gb_main_b.py"
            ],
            pvolumes={mount_path: volume}
        )
        step_2_list.append(step_2)

    steps = [step_1] + step_2_list
    # Per-step value of the pv_count variable: the total for step_1, then the
    # volume number for each step_2 (same order as `steps`).
    pv_count_list = [pv_count] + [str(pv_num) for pv_num in pv_numbers]
    print("pv_count_list : ",pv_count_list)

    # NOTE(review): the access/secret keys are passed as literal env values, so
    # they presumably end up in plaintext in the compiled pipeline package --
    # consider sourcing them from Kubernetes Secrets instead.
    for step, step_pv_count in zip(steps, pv_count_list):
        env_values = [
            ('minio_url', minio_url),
            ('minio_access_key', minio_access_key),
            ('minio_secret_key', minio_secret_key),
            ('minio_region', minio_region),
            ('minio_bucket_name', minio_bucket_name),
            ('competition_name', competition_name),
            ('kaggle_access_key', kaggle_access_key),
            ('kaggle_secret_key', kaggle_secret_key),
            ('pv_mount_name', pv_mount_name),
            ('pv_count', step_pv_count),
            ('download_from', download_from),
        ]
        for env_name, env_value in env_values:
            step.add_env_variable(k8s_client.V1EnvVar(name=env_name, value=env_value))

    # Every per-volume step waits for the download step.
    for step_2 in step_2_list:
        step_2.after(step_1)

    # Delete pods 20 seconds after completion (intended so that the volumes are
    # released as well).
    # NOTE(review): the PVs were observed to remain even after the pods are
    # deleted -- still an open question.
    dsl.get_pipeline_conf().set_ttl_seconds_after_finished(20)

    

In [12]:
from kfp import compiler

In [13]:
# Create the "global_warming_level_3" experiment, then look it up again by name.
# NOTE(review): the second assignment overwrites the value returned by
# create_experiment, so one of the two calls is presumably redundant -- confirm
# against the kfp_client implementation before removing either.
exp = kfp_client.create_experiment(name="global_warming_level_3")
exp = kfp_client.get_experiment(experiment_name="global_warming_level_3")

In [14]:
# Compile the pipeline function into the package file "level_3.yaml".
compiler.Compiler().compile(
    pipeline_func=global_warming_level_3_pipeline,
    package_path="level_3.yaml",
)

pv_count_list :  ['8', '1', '2', '3', '4', '5', '6', '7', '8']


In [15]:
# Submit "level_3.yaml" as a run labelled "level_3", passing the id of the
# experiment held in `exp`.
# NOTE(review): presumably the positional arguments are (experiment id, job
# name, package path) as in kfp.Client.run_pipeline -- confirm for kfp_client.
run = kfp_client.run_pipeline(exp.id, "level_3", "level_3.yaml")