In [None]:
from google.cloud import aiplatform
from kfp import compiler, dsl, local
from kfp.dsl import Input, Output, Model, Metrics, component, Dataset

In [None]:
@component(
    base_image='python:3.11',
    packages_to_install=[
        'pandas>=2.2.3',
        'git+https://github.com/jindrvo1/blackfriday',
        'google-cloud-storage>=2.18.2',
        'fsspec>=2024.10.0',
        'gcsfs>=2024.10.0',
    ]
)
def prepare_data(
    gcs_train_data_path: str,
    gcs_test_data_path: str,
    X_train_output: Output[Dataset],
    y_train_output: Output[Dataset],
    X_val_output: Output[Dataset],
    y_val_output: Output[Dataset],
    X_test_output: Output[Dataset],
):
    """Preprocess the Black Friday data and export every split as a CSV artifact.

    The heavy lifting is delegated to ``tgmblackfriday.BlackFridayDataset``;
    this component only wires its results to the pipeline's output artifacts.

    Args:
        gcs_train_data_path: Path of the training data passed to ``BlackFridayDataset``.
        gcs_test_data_path: Path of the test data passed to ``BlackFridayDataset``.
        X_train_output: Artifact that receives the training features.
        y_train_output: Artifact that receives the training target.
        X_val_output: Artifact that receives the validation features.
        y_val_output: Artifact that receives the validation target.
        X_test_output: Artifact that receives the test features.
    """
    # Imported inside the body because the package is only installed in the
    # component's container (see packages_to_install).
    from tgmblackfriday import BlackFridayDataset

    bf_data = BlackFridayDataset(gcs_train_data_path, gcs_test_data_path)
    bf_data.preprocess_dfs(return_res=False)

    # presumably test_size=0.2 is the share held out for validation -- confirm
    # against the tgmblackfriday package.
    features_train, target_train, features_val, target_val, features_test = (
        bf_data.prepare_features_and_target(test_size=0.2, shuffle=True)
    )

    # Written in the same order as the artifacts appear in the signature.
    frames_and_artifacts = (
        (features_train, X_train_output),
        (target_train, y_train_output),
        (features_val, X_val_output),
        (target_val, y_val_output),
        (features_test, X_test_output),
    )
    for frame, artifact in frames_and_artifacts:
        frame.to_csv(artifact.path, index=False)

In [None]:
@component(
    base_image='python:3.11',
    packages_to_install=[
        'pandas>=2.2.3',
        'xgboost>=2.1.2',
        'scikit-learn>=1.5.2',
        # joblib is imported directly below, so it is declared explicitly
        # instead of relying on scikit-learn pulling it in transitively. The
        # constraint matches the one used by the `predict` component, which
        # loads the file written here.
        'joblib>=1.4.2',
    ]
)
def train_model(
    X_train_input: Input[Dataset],
    y_train_input: Input[Dataset],
    X_val_input: Input[Dataset],
    y_val_input: Input[Dataset],
    model_output: Output[Model],
):
    """Fit an XGBoost regressor on the training split and store it with joblib.

    Args:
        X_train_input: CSV artifact holding the training features.
        y_train_input: CSV artifact holding the training target.
        X_val_input: CSV artifact holding the validation features.
        y_val_input: CSV artifact holding the validation target.
        model_output: Artifact that receives the fitted model (joblib dump).
    """
    # Imports live inside the body: the component runs in its own container.
    import joblib
    from xgboost.sklearn import XGBRegressor
    import pandas as pd

    X_train = pd.read_csv(X_train_input.path)
    y_train = pd.read_csv(y_train_input.path)

    X_val = pd.read_csv(X_val_input.path)
    y_val = pd.read_csv(y_val_input.path)

    model = XGBRegressor(
        n_estimators=300,
        objective='reg:squarederror',
        eval_metric='rmse',
        learning_rate=0.1,
        max_depth=6,
        min_child_weight=1,
        seed=0
    )

    # The validation split is passed as eval_set so that the configured
    # eval_metric is evaluated on it during fitting.
    model = model.fit(
        X_train, y_train,
        eval_set=[(X_val, y_val)],
        verbose=50
    )

    joblib.dump(model, model_output.path)

In [None]:
@component(
    base_image='python:3.11',
    packages_to_install=[
        'pandas>=2.2.3',
        'xgboost>=2.1.2',
        'joblib>=1.4.2',
    ]
)
def predict(
    model_input: Input[Model],
    X_input: Input[Dataset],
    y_output: Output[Dataset],
):
    """Score a feature CSV with a joblib-stored model and write the predictions.

    Args:
        model_input: Artifact containing the model saved with ``joblib.dump``.
        X_input: CSV artifact with the features to score.
        y_output: Artifact that receives the predictions as a CSV without index.
    """
    import joblib
    import pandas as pd

    estimator = joblib.load(model_input.path)
    features = pd.read_csv(X_input.path)

    # Wrap the raw predictions in a DataFrame so they can be written as CSV.
    pd.DataFrame(estimator.predict(features)).to_csv(y_output.path, index=False)

In [None]:
@component(
    base_image='python:3.11',
    packages_to_install=[
        'pandas>=2.2.3',
        'scikit-learn>=1.5.2',
    ]
)
def calc_metrics(
    y_true_input: Input[Dataset],
    y_pred_input: Input[Dataset],
    metrics_output: Output[Metrics],
):
    """Log RMSE, MSE and MAE between a target CSV and a prediction CSV.

    Args:
        y_true_input: CSV artifact with the ground-truth values.
        y_pred_input: CSV artifact with the predicted values.
        metrics_output: Metrics artifact that receives 'rmse', 'mse' and 'mae'.
    """
    import pandas as pd
    from sklearn.metrics import root_mean_squared_error, mean_absolute_error, mean_squared_error

    actual = pd.read_csv(y_true_input.path)
    predicted = pd.read_csv(y_pred_input.path)

    # (metric name, scoring function) pairs, logged in this order.
    scorers = (
        ('rmse', root_mean_squared_error),
        ('mse', mean_squared_error),
        ('mae', mean_absolute_error),
    )
    for metric_name, scorer in scorers:
        metrics_output.log_metric(metric_name, scorer(actual, predicted))

In [None]:
@dsl.pipeline()
def blackfriday_pipeline(
    gcs_train_data_path: str,
    gcs_test_data_path: str,
):
    """Prepare the data, train a model, predict on every split and score two of them.

    Predictions are produced for the validation, training and test features;
    metrics are computed for the validation and training splits only.

    Args:
        gcs_train_data_path: Path of the training data forwarded to ``prepare_data``.
        gcs_test_data_path: Path of the test data forwarded to ``prepare_data``.
    """
    prep_task = prepare_data(
        gcs_train_data_path=gcs_train_data_path,
        gcs_test_data_path=gcs_test_data_path,
    )
    splits = prep_task.outputs

    train_features = splits['X_train_output']
    train_target = splits['y_train_output']
    val_features = splits['X_val_output']
    val_target = splits['y_val_output']
    test_features = splits['X_test_output']

    training_task = train_model(
        X_train_input=train_features,
        y_train_input=train_target,
        X_val_input=val_features,
        y_val_input=val_target,
    )
    trained_model = training_task.outputs['model_output']

    # One prediction task per split; display names keep them apart in the UI.
    val_prediction_task = predict(model_input=trained_model, X_input=val_features)
    val_prediction_task.set_display_name('predict-validation')

    train_prediction_task = predict(model_input=trained_model, X_input=train_features)
    train_prediction_task.set_display_name('predict-train')

    test_prediction_task = predict(model_input=trained_model, X_input=test_features)
    test_prediction_task.set_display_name('predict-test')

    # Metrics for the two splits that have a target available in this pipeline.
    val_metrics_task = calc_metrics(
        y_true_input=val_target,
        y_pred_input=val_prediction_task.outputs['y_output'],
    )
    val_metrics_task.set_display_name('validation-metrics')

    train_metrics_task = calc_metrics(
        y_true_input=train_target,
        y_pred_input=train_prediction_task.outputs['y_output'],
    )
    train_metrics_task.set_display_name('train-metrics')

In [None]:
def run_pipeline(
    gcs_train_data_path: str,
    gcs_test_data_path: str,
    project_id: str,
    pipeline_root: str,
    region: str = 'europe-west3',
    service_account: str = 'gcs-sa@ml-spec-demo2.iam.gserviceaccount.com',
):
    """Compile ``blackfriday_pipeline`` and run it as a Vertex AI pipeline job.

    Args:
        gcs_train_data_path: Value for the pipeline's ``gcs_train_data_path`` parameter.
        gcs_test_data_path: Value for the pipeline's ``gcs_test_data_path`` parameter.
        project_id: Google Cloud project passed to ``aiplatform.init``.
        pipeline_root: Root location handed to the pipeline job.
        region: Location passed to ``aiplatform.init``. The default is the
            full region name 'europe-west3'; the abbreviated 'eu-west3' is
            not a Google Cloud location.
        service_account: Service account the job runs as. Defaults to the
            account that was previously hard-coded in this function.
    """
    package_path = 'blackfriday_pipeline.json'

    # Compile the pipeline definition to a local file that the job reads back.
    compiler.Compiler().compile(
        pipeline_func=blackfriday_pipeline,
        package_path=package_path
    )

    aiplatform.init(
        project=project_id,
        location=region
    )

    pipeline = aiplatform.PipelineJob(
        # File name without its extension doubles as the display name.
        display_name=package_path.split('.')[0],
        template_path=package_path,
        pipeline_root=pipeline_root,
        parameter_values={
            'gcs_train_data_path': gcs_train_data_path,
            'gcs_test_data_path': gcs_test_data_path
        }
    )

    pipeline.run(service_account=service_account)

In [None]:
def run_pipeline_locally(
    gcs_train_data_path: str,
    gcs_test_data_path: str
):
    """Execute ``blackfriday_pipeline`` through KFP's local Docker runner.

    Args:
        gcs_train_data_path: Value for the pipeline's ``gcs_train_data_path`` parameter.
        gcs_test_data_path: Value for the pipeline's ``gcs_test_data_path`` parameter.
    """
    # Local execution has to be initialised before the pipeline is invoked.
    docker_runner = local.DockerRunner()
    local.init(runner=docker_runner)

    pipeline_args = {
        'gcs_train_data_path': gcs_train_data_path,
        'gcs_test_data_path': gcs_test_data_path,
    }
    blackfriday_pipeline(**pipeline_args)


In [None]:
# --- Vertex AI target ---
project_id = 'ml-spec-demo2'
region = 'europe-west3'

# --- Cloud Storage layout ---
bucket = 'gs://blackfridaydataset'
data_folder = 'source_data'
train_file = 'train.csv'
test_file = 'test.csv'

# --- Derived locations ---
pipeline_root = f'{bucket}/pipeline_root'
gcs_train_data_path = f'{bucket}/{data_folder}/{train_file}'
gcs_test_data_path = f'{bucket}/{data_folder}/{test_file}'

In [None]:
# Data locations forwarded to the pipeline parameters ...
pipeline_kwargs = {
    'gcs_train_data_path': gcs_train_data_path,
    'gcs_test_data_path': gcs_test_data_path,
}
# ... followed by the Vertex AI settings consumed by run_pipeline itself.
pipeline_kwargs.update({
    'project_id': project_id,
    'pipeline_root': pipeline_root,
    'region': region,
})

# Compiles the pipeline and runs it as a Vertex AI pipeline job.
run_pipeline(**pipeline_kwargs)

In [None]:
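# Alternative to the Vertex AI run: execute the pipeline through the KFP local
# Docker runner (see run_pipeline_locally). Uncomment the lines below to use it;
# note that doing so rebinds pipeline_kwargs.
# pipeline_kwargs = {
#     'gcs_train_data_path': gcs_train_data_path,
#     'gcs_test_data_path': gcs_test_data_path,
# }

# run_pipeline_locally(**pipeline_kwargs)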