# Batch Prediction Pipeline Workaround for Reservations

Reference docs: https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-2.20.0/ 


This is a sample, some parameters may differ base on your implementation

## Create custom component that deploys a model to with a GPU reservation

In [None]:
import kfp
from kfp import dsl
from kfp.dsl import (
    Input,
    Output,
)  # Common artifact types
from google_cloud_pipeline_components.types.artifact_types import (
    VertexEndpoint,
    VertexModel,
)


@dsl.component(
    base_image="python:3.11",
    packages_to_install=["google-cloud-aiplatform", "google-cloud-pipeline-components"],
)
def create_endpoint_with_reservation(
    endpoint: Input[VertexEndpoint],
    model: str,
    deployed_name: str,
    machine_type: str,
    accelerator_type: str,
    accelerator_count: int,
    reservation_zone: str,
    project_id: str,
    reservation_name: str,
    min_replica: int,
    max_replica: int,
    location: str,
    deployed_model: Output[VertexModel],
) -> str:
    """
    Deploys a model to an existing Vertex AI endpoint with a specific reservation.

    Args:
        endpoint: The Vertex AI endpoint to deploy to.
        model: The model ID to deploy.
        deployed_name: The display name for the deployed model.
        machine_type: The machine type for the deployed model.
        accelerator_type: The accelerator type for the deployed model.
        accelerator_count: The number of accelerators for the deployed model.
        reservation_zone: The zone of the reservation.
        project_id: The project ID.
        reservation_name: The name of the reservation.
        min_replica: The minimum number of replicas.
        max_replica: The maximum number of replicas.
        location: The location of the endpoint and model.
        deployed_endpoint: The output artifact representing the deployed endpoint.
        deployed_model: The output artifact representing the deployed model.

    Returns:
        deployed_endpoint: Output[VertexEndpoint],
        deployed_model: Output[VertexModel],
    """
    
    from google.cloud import aiplatform

    aiplatform.init(
        project=project_id,
        location=location,
    )

    endpoint_fqn = endpoint.uri.split("v1/")[1]
    model_fqn = f"projects/{project_id}/locations/{location}/models/{model}"
    vertex_endpoint = aiplatform.Endpoint(endpoint_fqn)
    vertex_model = aiplatform.Model(model_name=model_fqn)

    vertex_endpoint.deploy(
        model=vertex_model,
        deployed_model_display_name=deployed_name,
        machine_type=machine_type,
        accelerator_type=accelerator_type,
        accelerator_count=accelerator_count,
        reservation_affinity_type="SPECIFIC_RESERVATION",
        reservation_affinity_key="compute.googleapis.com/reservation-name",
        reservation_affinity_values=[
            f"projects/{project_id}/zones/{reservation_zone}/reservations/{reservation_name}"
        ],
        min_replica_count=min_replica,
        max_replica_count=max_replica,
        sync=True,
    )
    # return types
    deployed_model.uri = f"https://{location}-aiplatform.googleapis.com/v1/{model_fqn}"


## Build another custom component that does the batch prediction from a gcs location
Note this is has some specific data manipulation to this model and may be different for other implementations

```bash
curl -L -o ~/Downloads/cifar10-python-in-csv.zip\
  https://www.kaggle.com/api/v1/datasets/download/fedesoriano/cifar10-python-in-csv
```

Unzip the file then upload the test file to the bucket.

```bash
gsutil cp test.csv gs://model_experimentation_2025/prediction_data/test.csv
```

#### Create a custom Dataproc serverless component

In [None]:
@dsl.component(
    base_image="python:3.11",
    packages_to_install=[
        "dataproc-spark-connect",
        "google-cloud-aiplatform",
        "google-cloud-pipeline-components",
    ],
)
def custom_batch_predict(
    project_id: str,
    location: str,
    endpoint: Input[VertexEndpoint],
    bucket: str,
    prediction_blob: str,
    destination_blob: str,
    batch_size: int,
    dataproc_serverless_template: str,
):
    """
    Custom batch prediction component using Dataproc Serverless.

    Args:
        project_id: Project ID of the Google Cloud project.
        location: Location of the Google Cloud project.
        endpoint: Vertex AI Endpoint resource.
        bucket: GCS bucket to read the input data from and write the predictions to.
        prediction_blob: GCS blob path for the input data.
        destination_blob: GCS blob path to write the predictions to.
        batch_size: Number of rows to process per batch.
        dataproc_serverless_template: Dataproc Serverless session template name.

    """
    import pandas as pd
    from pyspark.sql.connect.types import FloatType, ArrayType
    import logging
    import pyspark.sql.connect.functions as F
    from google.cloud.dataproc_v1 import Session

    session_config = Session()
    session_config.session_template = dataproc_serverless_template

    from google.cloud.dataproc_spark_connect import DataprocSparkSession

    spark = (
        DataprocSparkSession.builder.projectId(project_id)
        .location(location)
        .dataprocSessionConfig(session_config)
        .getOrCreate()
    )
    # set the batch size for number of rows handled per `predict` request
    spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", f"{batch_size}")
    # sc = pyspark.SparkContext.getOrCreate()

    df = spark.read.option("header", True).csv(f"gs://{bucket}/{prediction_blob}")

    prediction_data = df.drop("label")

    cols = prediction_data.columns

    predictions_formatted_data = prediction_data.withColumn(
        "data", F.array(*cols)
    ).drop(*cols)

    endpoint_id = endpoint.uri.split("/")[-1]

    bp_args = {
        "project": project_id,
        "location": location,
        "endpoint_name": endpoint_id,
    }

    # this will vary by models - follow guidance here for prediction formats: https://cloud.google.com/vertex-ai/docs/tabular-data/classification-regression/get-online-predictions
    @F.pandas_udf(ArrayType(FloatType()))
    def make_vertex_batch_predict_fn(input_data: pd.Series) -> pd.Series:
        from google.cloud import aiplatform

        # establish a client for each map worker
        aiplatform.init(project=bp_args["project"], location=bp_args["location"])
        logging.info("aiplatform client established")

        model = aiplatform.Endpoint(bp_args["endpoint_name"])
        logging.info(f"Endpoint established: {model}")
        if input_data is None:
            return None
        float_input = input_data.apply(
            lambda string_features: string_features.astype(float)
        )  # cast to float
        reshaped_input = float_input.apply(
            lambda features: features.reshape(32, 32, 3) / 255.0
        )  # reshape and scape per the training
        list_typed_inputs = reshaped_input.apply(
            lambda reshaped_arrays: reshaped_arrays.tolist()
        )  # array to list for each element
        response = model.predict(list_typed_inputs.values.tolist())
        return pd.Series(response.predictions)

    spark.udf.register("batch_predict", make_vertex_batch_predict_fn)

    predictions_formatted_data.createOrReplaceTempView("predictions_formatted_data")

    # TODO: Error handling for quota/429s with backoff strategies

    predictions_df = spark.sql(
        """SELECT
    batch_predict(data) as predictions, data
    from predictions_formatted_data
    """
    )
    top2_qa_result = predictions_df.show(2)

    logging.info(f"top two results: \n{top2_qa_result}")

    # write the predictions to gcs
    predictions_df.write.option("lineSep", "\n").json(
        f"gs://{bucket}/{destination_blob}"
    )

## Important - make sure you enable `roles/compute.viewer` permissions for the Vertex Service Account

This is important so the tenant project running the pipeline can view and utilize the reservation.

In [53]:
#### Bind SA to the reservation
PROJECT_NUMBER = 679926387543

! gcloud compute reservations add-iam-policy-binding \
    a100-custom-image-reservation \
    --zone=us-central1-b \
    --member="serviceAccount:service-$PROJECT_NUMBER@gcp-sa-aiplatform.iam.gserviceaccount.com" \
    --role="roles/compute.viewer" \
    --project=wortz-project-352116

Updated IAM policy for reservation [a100-custom-image-reservation].
bindings:
- members:
  - serviceAccount:vertex-sa@wortz-project-352116.iam.gserviceaccount.com
  role: roles/compute.admin
- members:
  - serviceAccount:vertex-sa@wortz-project-352116.iam.gserviceaccount.com
  role: roles/compute.futureReservationAdmin
- members:
  - serviceAccount:vertex-sa@wortz-project-352116.iam.gserviceaccount.com
  role: roles/compute.instanceAdmin
- members:
  - serviceAccount:679926387543-compute@developer.gserviceaccount.com
  - serviceAccount:service-679926387543@gcp-sa-aiplatform.iam.gserviceaccount.com
  - serviceAccount:vertex-sa@wortz-project-352116.iam.gserviceaccount.com
  role: roles/compute.viewer
etag: BwY2yUa0B5c=
version: 1


## Pipeline with standard components integrated into the custom reservation deploy and batch predict

**Important**: create a Dataproc Template that uses default compute service account: `$PROJECT_NUMBER-compute@developer.gserviceaccount.com`

<img src='img/serverless_template.png' width=300px/>

In [None]:
from kfp import dsl
from google_cloud_pipeline_components.v1.endpoint import (
    EndpointCreateOp,
    EndpointDeleteOp,
    ModelUndeployOp,
)

bucket = ("model_experimentation_2025",)


@dsl.pipeline(
    name="deploy-model-with-reserved-gpu",
    description="Deploys a model to an endpoint using a reserved GPU.",
)
def deploy_model_pipeline(
    project_id: str,
    model: str,
    region: str,
    zone: str,
    reservation_name: str,
    endpoint_display_name: str,
    deployed_model_display_name: str,
    machine_type: str,
    accelerator_type: str,
    bucket: str,
    prediction_input_blob: str,
    prediction_output_blob: str,
    dataproc_serverless_template: str,
    batch_size: int,
    accelerator_count: int,
    min_replica: int,
    max_replica: int,
):

    # 1. Create an endpoint
    create_endpoint_op = EndpointCreateOp(
        project=project_id,
        location=region,
        display_name=endpoint_display_name,
    ).set_display_name("Create an Endpoint")

    # 2. Deploy the model to the endpoint with reserved GPU
    model_deploy_op = create_endpoint_with_reservation(
        endpoint=create_endpoint_op.outputs["endpoint"],
        model=model,
        location=region,
        deployed_name=deployed_model_display_name,
        machine_type=machine_type,
        accelerator_type=accelerator_type,
        accelerator_count=accelerator_count,
        reservation_zone=zone,
        project_id=project_id,
        reservation_name=reservation_name,
        min_replica=min_replica,
        max_replica=max_replica,
    ).set_display_name("Deploy with GPU Reservation")

    # 3. Dataproc spark-based batch prediction job here
    batch_predict_op = (
        custom_batch_predict(
            project_id=project_id,
            location=region,
            endpoint=create_endpoint_op.outputs["endpoint"],
            bucket=bucket,
            prediction_blob=prediction_input_blob,
            destination_blob=prediction_output_blob,
            batch_size=batch_size,
            dataproc_serverless_template=dataproc_serverless_template,
        )
        .after(model_deploy_op)
        .set_display_name("Batch Predict With Dataproc Serverless")
    )

    # 4. Teardown of resources post-prediction
    model_undeploy_op = (
        ModelUndeployOp(
            endpoint=create_endpoint_op.outputs["endpoint"],
            model=model_deploy_op.outputs["deployed_model"],
            # traffic_split={"0": 100} # Optional: to ensure all traffic is removed from this model_id
            # If this is the only model, it will be removed.
        )
        .set_display_name("Undeploy Model")
        .after(batch_predict_op)
    )

    EndpointDeleteOp(
        endpoint=create_endpoint_op.outputs[
            "endpoint"
        ],  # Use the same endpoint from deploy op
    ).set_display_name("Undeploy Endpoint").after(
        model_undeploy_op
    )  # Explicitly set dependency

## Compile the pipeline

In [66]:
kfp.compiler.Compiler().compile(
    pipeline_func=deploy_model_pipeline,
    package_path="predict_w_reservations.json",
)

#### Set the pipeline parameters dictionary

In [None]:
# Replace with your project ID, region, etc.
import time

epoch_time = time.time()
pipeline_params = dict(
    project_id="wortz-project-352116",
    model="3416616934593003520",
    region="us-central1",
    zone="us-central1-b",
    reservation_name="a100-custom-image-reservation",
    endpoint_display_name="Reservation_Endpoint",
    deployed_model_display_name="My_deployed_model",
    accelerator_type="NVIDIA_TESLA_A100",  # find these settings here https://cloud.google.com/compute/docs/accelerator-optimized-machines
    machine_type="a2-highgpu-1g",
    bucket="model_experimentation_2025",
    prediction_input_blob="prediction_data/test.csv",
    # prediction_output_blob=f"output_data/predictions_{epoch_time}.jsonl",
    prediction_output_blob=f"output_data/predictions_1749091255.759079.jsonl",
    dataproc_serverless_template="projects/wortz-project-352116/locations/us-central1/sessionTemplates/spark-connect-2-2",
    batch_size=30,  # set batch size to fit in .predict's 1.5 Mb limit
    accelerator_count=1,
    min_replica=1,
    max_replica=2,
)

# Run the pipeline with the compute SA 
The compute SA is chosen because it was used for the Dataproc Serverless Template, other SAs when configuring new templates

In [68]:
from google.cloud import aiplatform

aiplatform.init(
    project=pipeline_params["project_id"],
    location=pipeline_params["region"],
)
job = aiplatform.PipelineJob(
    display_name=f"Predictions with GPU Reservations",
    template_path="predict_w_reservations.json",
    parameter_values=pipeline_params,
    project=pipeline_params["project_id"],
    location=pipeline_params["region"],
    enable_caching=True,
)

job.submit(service_account="679926387543-compute@developer.gserviceaccount.com") 

Creating PipelineJob
PipelineJob created. Resource name: projects/679926387543/locations/us-central1/pipelineJobs/deploy-model-with-reserved-gpu-20250604214058
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/679926387543/locations/us-central1/pipelineJobs/deploy-model-with-reserved-gpu-20250604214058')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/deploy-model-with-reserved-gpu-20250604214058?project=679926387543


## Validate the output predictions

In [64]:
bucket = pipeline_params["bucket"]
blob = pipeline_params["prediction_output_blob"]
! gsutil cat -r 1-100 gs://$bucket/$blob/part-0000*

"data":["158","159","165","166","160","156","162","159","158","159","161","160","161","166","169","1"data":["203","206","210","211","208","204","202","199","199","206","208","208","208","205","207","2"data":["158","165","168","174","170","173","187","201","204","209","213","217","220","222","222","2"data":["160","154","145","160","103","82","115","133","151","141","140","93","93","103","146","99","data":["156","238","255","255","255","255","249","249","181","140","141","137","108","139","147","1"data":["75","89","102","75","63","107","161","174","127","160","174","172","220","197","181","213","data":["131","135","136","139","140","141","143","146","147","142","117","105","130","170","159","1"data":["226","223","223","225","226","227","228","227","227","227","227","227","227","227","227","2

In [61]:
! echo gs://$pipeline_params["bucket"]$pipeline_params["prediction_output_blob"]/part-0000*

gs://{project_id: wortz-project-352116, model: 3416616934593003520, region: us-central1, zone: us-central1-b, reservation_name: a100-custom-image-reservation, endpoint_display_name: Reservation_Endpoint, deployed_model_display_name: My_deployed_model, accelerator_type: NVIDIA_TESLA_A100, machine_type: a2-highgpu-1g, bucket: model_experimentation_2025, prediction_input_blob: prediction_data/test.csv, prediction_output_blob: output_data/predictions_1749087963.064943.jsonl, dataproc_serverless_template: projects/wortz-project-352116/locations/us-central1/sessionTemplates/spark-connect-2-2, batch_size: 30, accelerator_count: 1, min_replica: 1, max_replica: 2}[bucket]{project_id: wortz-project-352116, model: 3416616934593003520, region: us-central1, zone: us-central1-b, reservation_name: a100-custom-image-reservation, endpoint_display_name: Reservation_Endpoint, deployed_model_display_name: My_deployed_model, accelerator_type: NVIDIA_TESLA_A100, machine_type: a2-highgpu-1g, bucket: model_ex