In [1]:
import kfp
import kfp.components as comp


In [2]:
def download_ml25m_data(output_path: comp.OutputPath(str)):
    """Download the MovieLens 25M zip archive and save it to ``output_path``.

    The archive is streamed from files.grouplens.org in small chunks and
    written to disk while a tqdm progress bar reports the bytes received.

    Args:
        output_path: Local file path the downloaded zip archive is written to.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
    """
    import requests
    from tqdm import tqdm
    url = 'https://files.grouplens.org/datasets/movielens/ml-25m.zip'
    # NOTE(review): verify=False turns off TLS certificate verification, so the
    # download is not authenticated. Kept from the original; confirm whether the
    # runtime environment really needs it and remove it if not.
    with requests.get(url, stream=True, verify=False) as response:
        # Fail fast on an HTTP error instead of saving an error page as the zip.
        response.raise_for_status()
        # Content-Length may be missing; 0 makes tqdm show a bar without a total.
        file_size = int(response.headers.get("Content-Length", 0))
        print(output_path)
        with open(output_path, 'wb') as file, \
                tqdm(total=file_size, unit="B", unit_scale=True) as progress_bar:
            for chunk in response.iter_content(chunk_size=1024*2):
                # Advance the progress bar by the size of the downloaded chunk.
                progress_bar.update(len(chunk))
                file.write(chunk)
            
def unzip_data(input_path: comp.InputPath(str), ratings_output_path: comp.OutputPath(str), movies_output_path: comp.OutputPath(str)):
    """Extract the ratings and movies CSV files from the MovieLens archive.

    Args:
        input_path: Path of the ml-25m zip archive.
        ratings_output_path: File that receives the bytes of ml-25m/ratings.csv.
        movies_output_path: File that receives the bytes of ml-25m/movies.csv.
    """
    import shutil
    import zipfile

    members = (
        ('ml-25m/ratings.csv', ratings_output_path),
        ('ml-25m/movies.csv', movies_output_path),
    )
    with zipfile.ZipFile(input_path, 'r') as z:
        for member, destination in members:
            # Stream the member to disk in chunks instead of loading the whole
            # CSV into memory first.
            with z.open(member) as src, open(destination, 'wb') as dst:
                shutil.copyfileobj(src, dst)
    
def split_dataset(input_parquet: comp.InputPath(str), dataset_path: comp.OutputPath(str), random_state: int = 42):
    """Split a Parquet dataset into train, validation and test Parquet files.

    The rows are split 75% / 15% / 10% and written to ``dataset_path`` as
    train.parquet.gzip, val.parquet.gzip and test.parquet.gzip.

    Args:
        input_parquet: Path of the Parquet file to split.
        dataset_path: Directory that receives the three output files; it is
            created when it does not exist yet.
        random_state: Seed passed to both train_test_split calls.
    """
    from sklearn.model_selection import train_test_split
    import os
    import pandas as pd
    train_ratio = 0.75
    validation_ratio = 0.15
    test_ratio = 0.10
    ratings_df = pd.read_parquet(input_parquet)

    # First cut: train keeps 75% of the data, the other 25% is held out.
    train, holdout = train_test_split(
        ratings_df,
        test_size=1 - train_ratio,
        random_state=random_state)

    # Second cut of the held-out rows: test ends up with 10% and validation
    # with 15% of the initial data set.
    val, test = train_test_split(
        holdout,
        test_size=test_ratio / (test_ratio + validation_ratio),
        random_state=random_state)

    # makedirs(exist_ok=True) tolerates an already existing output directory
    # and creates missing parents, where os.mkdir would raise.
    os.makedirs(dataset_path, exist_ok=True)
    train.to_parquet(os.path.join(dataset_path, 'train.parquet.gzip'), compression='gzip')
    test.to_parquet(os.path.join(dataset_path, 'test.parquet.gzip'), compression='gzip')
    val.to_parquet(os.path.join(dataset_path, 'val.parquet.gzip'), compression='gzip')

def csv_to_parquet(inputFile: comp.InputPath(str), output_path: comp.OutputPath(str)):
    """Convert a CSV file into a gzip-compressed Parquet file.

    Args:
        inputFile: Path of the CSV file to read.
        output_path: Path the Parquet file is written to.
    """
    import pandas as pd
    # index_col=False keeps pandas from using the first CSV column as the index.
    pd.read_csv(inputFile, index_col=False).to_parquet(output_path, compression='gzip')
    
def put_to_minio(inputFile: comp.InputPath(str), upload_file_name:str='', bucket: str='datasets'):
    """Upload a file, or every entry of a directory, to MinIO under ``ml-25m/``.

    Args:
        inputFile: Path of a file or a directory. For a directory, each entry
            is uploaded under its own name.
        upload_file_name: Object name used for a single-file upload; when
            empty, the base name of ``inputFile`` is used. Ignored for
            directories.
        bucket: Target bucket; it is created first when it does not exist.
    """
    import boto3
    import os
    # NOTE(review): the endpoint and credentials are hardcoded; consider
    # supplying them through the environment or a secret instead.
    minio_client = boto3.client(
        's3',
        endpoint_url='http://minio-service.kubeflow:9000',
        aws_access_key_id='minio',
        aws_secret_access_key='minio123')
    try:
        minio_client.create_bucket(Bucket=bucket)
    except minio_client.exceptions.ClientError as err:
        # An already existing bucket is the only failure that is expected here;
        # anything else (bad credentials, invalid bucket name, ...) must surface
        # instead of being silently ignored.
        error_code = err.response.get('Error', {}).get('Code')
        if error_code not in ('BucketAlreadyOwnedByYou', 'BucketAlreadyExists'):
            raise

    # Object keys always use '/' as separator, independent of the local OS.
    prefix = 'ml-25m'
    if os.path.isdir(inputFile):
        for file in os.listdir(inputFile):
            minio_client.upload_file(os.path.join(inputFile, file), bucket, f'{prefix}/{file}')
    else:
        file = upload_file_name or os.path.basename(inputFile)
        minio_client.upload_file(inputFile, bucket, f'{prefix}/{file}')
        
def qa_data(bucket:str = 'datasets', dataset:str = 'ml-25m'):
    """Sanity-check the training split stored in MinIO.

    Reads ``{bucket}/{dataset}/train.parquet.gzip`` into a DataFrame and
    asserts that it has four columns and at least 75% of 25 million rows.

    Args:
        bucket: Bucket that holds the dataset.
        dataset: Key prefix of the dataset inside the bucket.
    """
    from pyarrow import fs, parquet
    print("Running QA")
    expected_columns = 4
    min_rows = 0.75 * 25 * 1e6
    s3 = fs.S3FileSystem(
        endpoint_override='http://minio-service.kubeflow:9000',
        access_key='minio',
        secret_key='minio123',
        scheme='http')
    train_file = s3.open_input_file(f'{bucket}/{dataset}/train.parquet.gzip')
    train_df = parquet.read_table(train_file).to_pandas()
    n_rows, n_cols = train_df.shape
    assert n_cols == expected_columns
    assert n_rows >= min_rows
    print('QA passed!')

In [3]:
# Wrap each Python function in a reusable component factory. Every call also
# writes the component definition to the given YAML file and lists the pip
# packages the function needs at run time.
download_op = comp.create_component_from_func(
    download_ml25m_data,
    output_component_file='download_ml25m_component.yaml',
    packages_to_install=["requests", "tqdm"],
)
unzip_op = comp.create_component_from_func(
    unzip_data,
    output_component_file='unizip_data.yaml',
)
csv_to_parquet_op = comp.create_component_from_func(
    csv_to_parquet,
    output_component_file='csv_to_paraquet.yaml',
    packages_to_install=["pandas", "fastparquet"],
)
split_dataset_op = comp.create_component_from_func(
    split_dataset,
    output_component_file='split_dataset.yaml',
    packages_to_install=["scikit-learn", "pandas", "fastparquet"],
)
upload_to_minio_op = comp.create_component_from_func(
    put_to_minio,
    output_component_file='put_to_minio.yaml',
    packages_to_install=["boto3"],
)
qa_component_op = comp.create_component_from_func(
    qa_data,
    output_component_file='qa_component.yaml',
    packages_to_install=["pyarrow", "pandas"],
)

In [4]:
import kfp.dsl as dsl
# Client used by the upload cell at the end of the notebook. It is created with
# kfp's default settings; change the arguments to match your deployment.
client = kfp.Client()
@dsl.pipeline(
    name='Data prep pipeline',
    description='A pipeline that retrieves data from movielens and ingests it into paraquet files on minio',
)
def dataprep_pipeline(minio_bucket:str='datasets', random_init:int=42):
    """Download MovieLens 25M, convert it to Parquet, split it and upload it.

    Args:
        minio_bucket: Bucket the Parquet files are uploaded to and read back
            from by the QA step.
        random_init: Seed forwarded to the dataset split step.
    """
    archive_task = download_op()
    extract_task = unzip_op(archive_task.output)

    # Both CSV files go through the same CSV -> Parquet component.
    ratings_task = csv_to_parquet_op(extract_task.outputs['ratings_output'])
    movies_task = csv_to_parquet_op(extract_task.outputs['movies_output'])

    # Only the ratings are split into train / validation / test.
    split_task = split_dataset_op(ratings_task.output, random_state=random_init)

    upload_to_minio_op(
        movies_task.output,
        upload_file_name='movies.parquet.gzip',
        bucket=minio_bucket)
    upload_splits_task = upload_to_minio_op(split_task.output, bucket=minio_bucket)

    # QA reads the uploaded train split, so it must wait for that upload.
    qa_component_op(bucket=minio_bucket).after(upload_splits_task)

# Compile the pipeline function into the package file dataPrep_pipeline.yaml.
# This step only writes the file; it does not use the client.
kfp.compiler.Compiler().compile(
    pipeline_func=dataprep_pipeline,
    package_path='dataPrep_pipeline.yaml')

In [None]:
# Look up a pipeline already registered as 'ml-25m-processing' and delete it
# first - presumably so the name is free for the upload below.
s = client.get_pipeline_id(name='ml-25m-processing')
if s:
    client.delete_pipeline(pipeline_id=s)
# Upload the compiled package file under that name.
pipeline = client.pipeline_uploads.upload_pipeline('dataPrep_pipeline.yaml', name='ml-25m-processing')