In [9]:
from kfp import dsl

# Container component for the error-analysis step.
# Runs `python3 error_analysis.py` in the valeriy459/error-analysis image and
# forwards the three inputs as CLI flags. The script has to be present in the
# image already; nothing here copies it in.
@dsl.container_component
def error_analysis(bucket_name: str, data_path: str, out_path: str):
    entrypoint = ["python3", "error_analysis.py"]
    # Flag/value pairs, in the order the script receives them.
    cli_flags = [
        "--bucket_name", bucket_name,
        "--data_path", data_path,
        "--out_path", out_path,
    ]
    return dsl.ContainerSpec(
        image="valeriy459/error-analysis:latest",
        command=entrypoint,
        args=cli_flags,
    )

from kfp import dsl

# Container component for the data-preparation step.
# Runs `python3 prepare_data.py` in the valeriy459/prepare-data image, passing
# the bucket name and data path as CLI flags. The script has to be present in
# the image already.
@dsl.container_component
def prepare_data(bucket_name: str, data_path: str):
    entrypoint = ["python3", "prepare_data.py"]
    # Flag/value pairs, in the order the script receives them.
    cli_flags = [
        "--bucket_name", bucket_name,
        "--data_path", data_path,
    ]
    return dsl.ContainerSpec(
        image="valeriy459/prepare-data:latest",
        command=entrypoint,
        args=cli_flags,
    )
from kfp import dsl

# Container component for the training step.
# Runs `python3 train_model.py` in the valeriy459/train-model image, passing
# the bucket name and the two batch sizes as CLI flags. The script has to be
# present in the image already.
@dsl.container_component
def train_model(bucket_name: str, train_batch_size: int, eval_batch_size: int):
    entrypoint = ["python3", "train_model.py"]
    # Flag/value pairs, in the order the script receives them.
    cli_flags = ["--bucket_name", bucket_name]
    cli_flags += ["--train_batch_size", train_batch_size]
    cli_flags += ["--eval_batch_size", eval_batch_size]
    return dsl.ContainerSpec(
        image="valeriy459/train-model:latest",
        command=entrypoint,
        args=cli_flags,
    )

from kfp import dsl

# Container component for the inference step.
# Runs `python3 inference.py` in the valeriy459/inference image, passing the
# bucket name, model path and batch size as CLI flags. The script has to be
# present in the image already.
@dsl.container_component
def inference(bucket_name: str, model_path: str, batch_size: int):
    entrypoint = ["python3", "inference.py"]
    # Flag/value pairs, in the order the script receives them.
    cli_flags = ["--bucket_name", bucket_name]
    cli_flags += ["--model_path", model_path]
    cli_flags += ["--batch_size", batch_size]
    return dsl.ContainerSpec(
        image="valeriy459/inference:latest",
        command=entrypoint,
        args=cli_flags,
    )


In [10]:
from kfp import dsl

# Four-stage pipeline: prepare data -> train -> inference -> error analysis.
# No step consumes another step's outputs here, so the execution order is
# imposed purely through explicit `.after()` dependencies.
@dsl.pipeline(name="news-classification-pipeline")
def pipeline(
    bucket_name: str = "my-bucket",
    data_path: str = "data.json",
    model_path: str = "best_model.pt",
    train_batch_size: int = 32,
    eval_batch_size: int = 200,
    inference_batch_size: int = 64,
    error_analysis_out: str = "error_analysis"
):
    # Stage 1: data preparation (no upstream dependency).
    prep_step = prepare_data(bucket_name=bucket_name, data_path=data_path)

    # Stage 2: training, scheduled once preparation has finished.
    training_step = train_model(
        bucket_name=bucket_name,
        train_batch_size=train_batch_size,
        eval_batch_size=eval_batch_size,
    )
    training_step.after(prep_step)

    # Stage 3: inference, scheduled once training has finished.
    inference_step = inference(
        bucket_name=bucket_name,
        model_path=model_path,
        batch_size=inference_batch_size,
    )
    inference_step.after(training_step)

    # Stage 4: error analysis, scheduled once inference has finished.
    analysis_step = error_analysis(
        bucket_name=bucket_name,
        data_path=data_path,
        out_path=error_analysis_out,
    )
    analysis_step.after(inference_step)


In [11]:
from kfp import compiler
# Compile the pipeline function into a pipeline definition written to
# pipeline.yaml in the current working directory.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path="pipeline.yaml",
)