In [1]:
import os
import kfp
from kfp import dsl
from kfp import onprem
from kfp import components

In [2]:
def statistics_op(input_path, output_path):
    """Download a zipped dataset and write TFDV statistics for its train split.

    This function is turned into a KFP lightweight component further down, so
    only this function's source is shipped to the container: every import it
    needs must stay inside the function body.

    Args:
        input_path: URL of a zip archive. It is extracted into a scratch
            directory and must contain ``data/train/data.csv``.
        output_path: Directory under which the statistics are written; the
            statistics file itself is ``<output_path>/train``.
    """
    import os
    import shutil
    import tempfile
    # `import urllib` alone does not guarantee the `request` submodule is loaded.
    import urllib.request
    import zipfile
    import tensorflow_data_validation as tfdv

    base_dir = tempfile.mkdtemp()
    try:
        train_data = os.path.join(base_dir, 'data', 'train', 'data.csv')

        # Download into the scratch dir so the archive is removed with it.
        archive_path, _ = urllib.request.urlretrieve(
            input_path, os.path.join(base_dir, 'download.zip'))
        # Context manager guarantees the archive handle is closed.
        with zipfile.ZipFile(archive_path) as archive:
            archive.extractall(base_dir)

        stats_path = os.path.join(output_path, 'train')
        tfdv.generate_statistics_from_csv(data_location=train_data, output_path=stats_path)
        print("Saved statistics to", stats_path)
    finally:
        # Remove the downloaded archive and extracted CSVs even on failure.
        shutil.rmtree(base_dir, ignore_errors=True)


# Rebind `statistics_op` to a component factory: calling it inside a pipeline
# creates a step that runs the function above in the given container image.
statistics_op = components.func_to_container_op(
    statistics_op,
    base_image='kangwoo/pipeline-tfx:0.0.1',
)

In [3]:
@dsl.pipeline(
    name='TFDV pipeline',
    description='TFDV pipeline'
)
def tfdv_pipeline():
    """Compute TFDV statistics for the Chicago taxi sample dataset.

    Adds a single ``statistics_op`` step that mounts the ``pipeline-data-pvc``
    PersistentVolumeClaim at ``/mnt/data`` and writes statistics beneath
    ``/mnt/data/pipeline/chicago_taxi/1``.
    """
    # The paths below are used inside the container (POSIX-style '/mnt/data'),
    # so join them with '/' regardless of the OS the pipeline is compiled on;
    # os.path.join would insert '\\' when compiling on Windows.
    import posixpath

    pvc_name = "pipeline-data-pvc"
    volume_name = 'pipeline-data'
    volume_mount_path = '/mnt/data'
    dataset_url = 'https://storage.googleapis.com/artifacts.tfx-oss-public.appspot.com/datasets/chicago_data.zip'

    output_path = posixpath.join(volume_mount_path, 'pipeline/chicago_taxi', '1')

    # The output path lives on the PVC, so the volume must be mounted into the step.
    statistics_op(input_path=dataset_url, output_path=output_path).apply(
        onprem.mount_pvc(pvc_name, volume_name=volume_name, volume_mount_path=volume_mount_path)
    )



if __name__ == '__main__':
    # Compile the pipeline into a package file that is then uploaded as a run.
    package_path = 'tfdv-pipeline.zip'
    kfp.compiler.Compiler().compile(tfdv_pipeline, package_path)

    # Client with default connection settings; start the run under the
    # named experiment.
    client = kfp.Client()
    my_experiment = client.create_experiment(name='Visualization Experiment')
    my_run = client.run_pipeline(
        my_experiment.id,
        'TFDV Pipeline',
        package_path,
    )