# 03 Train SAM on Vertex AI

Custom image classification with a custom training container

SERVE_DOCKER_URI## Overview

This notebook demonstrates how to train a custom image classification model by creating a custom training SAM with Pytorch and the Vertex AI SDK. The notebook also demonstrates how to deploys the trained model to Vertex AI and generate predictions from it.

Learn more about [Migrate to Vertex AI](https://cloud.google.com/vertex-ai/docs/start/migrating-to-vertex-ai) and [Custom training](https://cloud.google.com/vertex-ai/docs/training/custom-training).

### Objective

In this tutorial, you learn how to train SAM using a custom container and Vertex AI training. After training, you also deploy the model to Vertex AI using a pre-built container and generate both batch and online predictions on it. 

This tutorial uses the following Google Cloud ML services and resources:

- *Vertex AI Training*
- *Vertex AI Model Registry*
- *Vertex AI Batch Predictions*
- *Vertex AI Endpoints*


The steps performed include:

- *Package the training code into a python application.*
- *Containerize the training application using Cloud Build and Artifact Registry.*
- *Create a custom container training job in Vertex AI and run it.*
- *Evaluate the model generated from the training job.*
- *Create a model resource for the trained model in Vertex AI Model Registry.*
- *Run a Vertex AI batch prediction job.*
- *Deploy the model resource to a Vertex AI Endpoint.*
- *Run a online prediction job on the model resource.*
- *Clean up the resources created.*

### Costs 

This tutorial uses billable components of Google Cloud:

* Vertex AI
* Cloud Storage
* Cloud Build
* Artifact Registry

Learn about [Vertex AI
pricing](https://cloud.google.com/vertex-ai/pricing), [Cloud Storage
pricing](https://cloud.google.com/storage/pricing), [Cloud Build pricing](https://cloud.google.com/build/pricing), [Artifact Registry pricing](https://cloud.google.com/artifact-registry/pricing) and use the [Pricing
Calculator](https://cloud.google.com/products/calculator/)
to generate a cost estimate based on your projected usage.

## Installation

Install the packages required for executing this notebook.

In [1]:
! pip3 install --upgrade google-cloud-aiplatform \
                         google-cloud-storage --user

! pip3 install rasterio gcsfs google-auth

# if os.getenv("IS_TESTING"):
#     ! apt-get update && apt-get install -y python3-opencv-headless --user
#     ! apt-get install -y libgl1-mesa-dev --user
#     ! pip3 install --upgrade opencv-python-headless --user

Collecting rasterio
  Obtaining dependency information for rasterio from https://files.pythonhosted.org/packages/5e/19/4617aaaf3166b06c520db50de38108bf069e63512712a7edda6710f4687b/rasterio-1.3.8.post2-cp310-cp310-manylinux2014_x86_64.whl.metadata
  Using cached rasterio-1.3.8.post2-cp310-cp310-manylinux2014_x86_64.whl.metadata (14 kB)
Collecting affine (from rasterio)
  Using cached affine-2.4.0-py3-none-any.whl (15 kB)
Collecting cligj>=0.5 (from rasterio)
  Using cached cligj-0.7.2-py3-none-any.whl (7.1 kB)
Collecting snuggs>=1.4.1 (from rasterio)
  Using cached snuggs-1.4.7-py3-none-any.whl (5.4 kB)
Collecting click-plugins (from rasterio)
  Using cached click_plugins-1.1.1-py2.py3-none-any.whl (7.5 kB)
Using cached rasterio-1.3.8.post2-cp310-cp310-manylinux2014_x86_64.whl (20.6 MB)
Installing collected packages: snuggs, cligj, click-plugins, affine, rasterio
Successfully installed affine-2.4.0 click-plugins-1.1.1 cligj-0.7.2 rasterio-1.3.8.post2 snuggs-1.4.7


#### Set your project ID

**If you don't know your project ID**, try the following:
* Run `gcloud config list`.
* Run `gcloud projects list`.
* See the support page: [Locate the project ID](https://support.google.com/googleapi/answer/7014113)

In [2]:
# PROJECT_ID = "[your-project-id]"  # @param {type:"string"}
PROJECT_ID = "imposing-mind-398223"  # @param {type:"string"}

# Set the project id
! gcloud config set project {PROJECT_ID}

Updated property [core/project].


#### Set the region

**Optional**: Update the 'REGION' variable to specify the region that you want to use. Learn more about [Vertex AI regions](https://cloud.google.com/vertex-ai/docs/general/locations).

In [3]:
REGION = "us-central1"  # @param {type: "string"}

### Create a Cloud Storage bucket

Create a storage bucket to store intermediate artifacts such as datasets.

In [4]:
# BUCKET_URI = f"gs://your-bucket-name-{PROJECT_ID}-unique"  # @param {type:"string"}
BUCKET_URI = f"gs://meter-sam-imposing-mind-398223-unique"  # @param {type:"string"}

**Only if your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket.

In [5]:
# ! gsutil mb -l $REGION -p $PROJECT_ID $BUCKET_URI

### Import libraries

Load the Vertex AI SDK and other libraries for Python used in this notebook.

In [6]:
import base64
import json

import numpy as np
from google.cloud import aiplatform
import os
# import cv2

### Initialize Vertex AI SDK for Python

Initialize the Vertex AI SDK for Python for your project and corresponding bucket.

In [7]:
aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)

## Configure containers for training and prediction

In this step, you set the configuration parameters used while creating training and serving containers like number and type of GPUs needed(by default CPU is used) and machine-type need for serving.


### Set hardware accelerators

You can set hardware accelerators for training and prediction.

Set the variables `TRAIN_GPU/TRAIN_NGPU` and `DEPLOY_GPU/DEPLOY_NGPU` to use a container image supporting a GPU and the number of GPUs allocated to the virtual machine (VM) instance. For example, to use a GPU container image with 4 Nvidia Telsa K80 GPUs allocated to each VM, you would specify:

    (aiplatform.gapic.AcceleratorType.NVIDIA_TESLA_K80, 4)


Otherwise specify `(None, None)` to use a container image to run on a CPU.

Learn more [here](https://cloud.google.com/vertex-ai/docs/general/locations#accelerators) hardware accelerator support for your region

In [8]:
TRAIN_GPU, TRAIN_NGPU = (None, None)
DEPLOY_GPU, DEPLOY_NGPU = (None, None)

### Set machine type

Next, set the machine type to use for training and prediction.

- Set the variables `TRAIN_COMPUTE` and `DEPLOY_COMPUTE` to configure  the compute resources for the VMs you use for for training and prediction.
 - `machine type`
     - `n1-standard`: 3.75GB of memory per vCPU.
     - `n1-highmem`: 6.5GB of memory per vCPU
     - `n1-highcpu`: 0.9 GB of memory per vCPU
 - `vCPUs`: number of \[2, 4, 8, 16, 32, 64, 96 \]

*Note: The following is not supported for training:*

 - `standard`: 2 vCPUs
 - `highcpu`: 2, 4 and 8 vCPUs

*Note: You may also use n2 and e2 machine types for training and deployment, but they do not support GPUs*.

In [9]:
TRAIN_MACHINE_TYPE = "n1-standard"
TRAIN_VCPU = "4"

TRAIN_COMPUTE = TRAIN_MACHINE_TYPE + "-" + TRAIN_VCPU
print("Train machine type", TRAIN_COMPUTE)

if os.getenv("IS_TESTING_DEPLOY_MACHINE"):
    MACHINE_TYPE = os.getenv("IS_TESTING_DEPLOY_MACHINE")
else:
    MACHINE_TYPE = "n1-standard"

DEPLOY_MACHINE_TYPE = "n1-standard"
DEPLOY_VCPU = "2"

DEPLOY_COMPUTE = DEPLOY_MACHINE_TYPE + "-" + DEPLOY_VCPU
print("Deploy machine type", DEPLOY_COMPUTE)

Train machine type n1-standard-4
Deploy machine type n1-standard-2


## Package training application

In this step, you package the code for training SAM using your own custom container.

To use your own custom container, you build a Docker file. First, you create a directory for the container components.

### Package layout

Before you start the training, you look at how a Python package is assembled for a custom training job. When unarchived, the package contains the following directory/file layout.

- PKG-INFO
- README.md
- setup.cfg
- setup.py
- trainer
  - \_\_init\_\_.py
  - train.py

The files `setup.cfg` and `setup.py` are the instructions for installing the package into the operating environment of the Docker image.

The file `trainer/task.py` is the Python script for executing the custom training job. 

*Note: When the file is referred in the worker pool specification, the directory slash is replaced with a dot (`trainer.train`) and dropped the file suffix (`.py`).*

### Package Assembly

In the following cells, you assemble the training package.

In [10]:
# Make folder for Python training and serving script
! rm -rf custom
! mkdir custom
! mkdir custom/train
! mkdir custom/serve

# Add package information
! touch custom/train/README.md
! touch custom/serve/README.md

# Add required libary information
requirements_train = """git+https://github.com/facebookresearch/segment-anything.git
transformers
datasets
monai
rasterio
segment_anything
"""
! echo "$requirements_train" > custom/train/requirements.txt

requirements_serve = """transformers
torch
numpy
Pillow
rasterio
gcsfs
fastapi
uvicorn
"""
! echo "$requirements_serve" > custom/serve/requirements.txt

# setup_cfg = "[egg_info]\n\ntag_build =\n\ntag_date = 0"
# ! echo "$setup_cfg" > custom/setup.cfg

# setup_py = "import setuptools\n\nsetuptools.setup(\n\n    install_requires=[\n\n        'tensorflow_datasets==1.3.0',\n\n    ],\n\n    packages=setuptools.find_packages())"
# ! echo "$setup_py" > custom/setup.py

# pkg_info = "Metadata-Version: 1.0\n\nName: CIFAR10 image classification\n\nVersion: 0.0.0\n\nSummary: Demostration training script\n\nHome-page: www.google.com\n\nAuthor: Google\n\nAuthor-email: aferlitsch@google.com\n\nLicense: Public\n\nDescription: Demo\n\nPlatform: Vertex"
# ! echo "$pkg_info" > custom/PKG-INFO

### Contents of trainer task

In the next cell, you write the contents of the training script `task.py`. The script's details are not discussed in depth here.

In [11]:
%%writefile custom/train/trainer.py
import sys
import os
import io
import argparse
import numpy as np
import rasterio
from datasets import Dataset
from PIL import Image
from torch.utils.data import Dataset as TorchDataset, DataLoader
from transformers import SamProcessor
from transformers import SamModel
import torch
from segment_anything import sam_model_registry
from tqdm import tqdm
from statistics import mean
import time
import monai
import gcsfs
# from google.cloud import storage
# from io import BytesIO

def parse_arguments():
    """Parse command-line arguments."""
    parser = argparse.ArgumentParser(description="Train the SAM on Vertex AI.")

    # Directories and File Paths
    parser.add_argument('--input-dir', type=str, default='gs://meter-sam/train/stack/', help='Input data directory path.')
    parser.add_argument('--output-dir', type=str, default='gs://meter-sam/model/', help='Output data directory path.')

    # Hyperparameters
    parser.add_argument('--num-epochs', type=int, default=10, help='Number of training epochs.')
    parser.add_argument('--batch-size', type=int, default=4, help='Batch size for training.')
    parser.add_argument('--learning-rate', type=float, default=1e-5, help='Learning rate for optimizer.')
    parser.add_argument('--weight-decay', type=float, default=0, help='Weight decay for optimizer.')
    
    return parser.parse_args()

def read_tiff(file_path):
    """Read TIFF file and return its content."""
    try:
        with rasterio.open(file_path) as src:
            return src.read()
    except Exception as e:
        print(f"Error reading file: {file_path}. Error: {e}")
        return None

def get_bounding_box(ground_truth_map):
    """Compute bounding box for the given ground truth map."""
    y_indices, x_indices = np.where(ground_truth_map > 0)
    x_min, x_max = np.min(x_indices), np.max(x_indices)
    y_min, y_max = np.min(y_indices), np.max(y_indices)
    H, W = ground_truth_map.shape
    x_min = max(0, x_min - np.random.randint(0, 20))
    x_max = min(W, x_max + np.random.randint(0, 20))
    y_min = max(0, y_min - np.random.randint(0, 20))
    y_max = min(H, y_max + np.random.randint(0, 20))
    return [x_min, y_min, x_max, y_max]

class SAMDataset(TorchDataset):
    """
    This class is used to create a dataset that serves input images and masks.
    It takes a dataset and a processor as input and overrides the __len__ and __getitem__ methods of the Dataset class.
    """
    def __init__(self, dataset, processor):
        self.dataset = dataset
        self.processor = processor

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        item = self.dataset[idx]
        image = item["image"]
        ground_truth_mask = np.array(item["label"])
        prompt = get_bounding_box(ground_truth_mask)
        inputs = self.processor(image, input_boxes=[[prompt]], return_tensors="pt")
        inputs = {k: v.squeeze(0) for k, v in inputs.items()}
        inputs["ground_truth_mask"] = ground_truth_mask
        return inputs

def prepare_datasets(args):
    """Prepare datasets for training."""
    images = read_tiff(args.input_dir + 'images.tif')
    masks = read_tiff(args.input_dir + 'masks.tif')
    # Convert the NumPy arrays to Pillow images and store them in a dictionary
    dataset_dict = {
        "image": [Image.fromarray(img) for img in images],
        "label": [Image.fromarray(mask) for mask in masks],
    }
    # Create the dataset using the datasets.Dataset class
    dataset = Dataset.from_dict(dataset_dict)
    # Initialize the processor
    processor = SamProcessor.from_pretrained("facebook/sam-vit-base")
    # Create an instance of the SAMDataset
    train_dataset = SAMDataset(dataset=dataset, processor=processor)
    return train_dataset, processor

def load_model():
    """Load SAM model."""
    model = SamModel.from_pretrained("facebook/sam-vit-base")
    for name, param in model.named_parameters():
        if name.startswith("vision_encoder") or name.startswith("prompt_encoder"):
            param.requires_grad_(False)
    return model

def train_sam(train_dataset, model, processor, args):
    """Train the SAM model."""
    # Create a DataLoader instance for the training dataset
    train_dataloader = DataLoader(train_dataset, batch_size=args.batch_size, shuffle=True, drop_last=False, num_workers=4, pin_memory=True)
    optimizer = torch.optim.Adam(model.mask_decoder.parameters(), lr=args.learning_rate, weight_decay=args.weight_decay)
    seg_loss = monai.losses.DiceCELoss(sigmoid=True, squared_pred=True, reduction='mean')
    # Training device
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model.to(device)
    # Start training
    start_time = time.time()
    model.train()
    for epoch in range(args.num_epochs):
        total_start_time = time.time()
        epoch_losses = []
        for batch in tqdm(train_dataloader):
            # forward passFar
            outputs = model(
                pixel_values=batch["pixel_values"].to(device),
                input_boxes=batch["input_boxes"].to(device),
                multimask_output=False)
            # compute loss
            predicted_masks = outputs.pred_masks.squeeze(1)
            ground_truth_masks = batch["ground_truth_mask"].float().to(device)
            loss = seg_loss(predicted_masks, ground_truth_masks.unsqueeze(1))
            # backward pass (compute gradients of parameters w.r.t. loss)
            optimizer.zero_grad()
            loss.backward()
            # optimize
            optimizer.step()
            epoch_losses.append(loss.item())
        elapsed_time = (time.time() - start_time) / 60
        print(f'EPOCH: {epoch}')
        print(f'Mean loss: {mean(epoch_losses)}')
        print(f'Time taken for the epoch: {elapsed_time:.2f} minutes\n')
    # Calculate total training time
    total_training_time = (time.time() - total_start_time) / 60
    print(f'Total training time: {total_training_time:.2f} minutes')   
    return model

def save_model(model, output_dir):
    """Save the trained model."""
    fs = gcsfs.GCSFileSystem(project='imposing-mind-398223')
    with fs.open(output_dir+"model.pth", 'wb') as f:
        torch.save(model.state_dict(), f)

def main():
    """Main function to orchestrate model training."""
    args = parse_arguments()
    train_dataset, processor = prepare_datasets(args)
    model = load_model()
    model = train_sam(train_dataset, model, processor, args)
    save_model(model, args.output_dir)

if __name__ == "__main__":
    main()

# # Load SAM from Google Cloud Service
    
# def load_model_from_gcs(bucket_name, blob_path):
#     """Load SAM from Google Cloud Storage."""
#     # Initialize a GCS client
#     client = storage.Client()
#     bucket = client.get_bucket(bucket_name)
#     blob = bucket.blob(blob_path)

#     # Download the blob contents into a BytesIO object
#     buffer = io.BytesIO()
#     blob.download_to_file(buffer)
#     buffer.seek(0)

#     # Load the model directly from the BytesIO object
#     state_dict = torch.load(buffer)
#     model = sam_model_registry["vit_h"]()
#     model.load_state_dict(state_dict)
#     return model

# # Hyperparameter tuning for Pytorch

# $pip install hypertune
# import hypertune

# # Define metric
# hp_metric = history.history['val_accuracy'][-1]

# # Report the metric to hypertune
# hpt = hypertune.HyperTune()
# hpt.report_hyperparameter_tuning_metric(
#     hyperparameter_metric_tag='accuracy', # metric name
#     metric_value=hp_metric, #metric value
#     global_step=num_epochs)

Writing custom/train/trainer.py


### Contents of sever task

In the next cell, you write the contents of the serving script `app.py`. The script's details are not discussed in depth here.

In [12]:
%%writefile custom/serve/server.py
import os
import numpy as np
import uvicorn
import logging
import torch
from fastapi import FastAPI, Request, Response, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from transformers import SamConfig, SamProcessor, SamModel
from PIL import Image
import gcsfs
from io import BytesIO
import base64
from typing import List, Dict

# Configuration
AIP_HEALTH_ROUTE = os.environ.get('AIP_HEALTH_ROUTE', '/health')
AIP_PREDICT_ROUTE = os.environ.get('AIP_PREDICT_ROUTE', '/predict')

# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialization of model, processor, etc.
fs = gcsfs.GCSFileSystem(project='imposing-mind-398223')
model_path = "gs://meter-sam/model/model.pth"
model_config = SamConfig.from_pretrained("facebook/sam-vit-base")
processor = SamProcessor.from_pretrained("facebook/sam-vit-base")
model = SamModel(config=model_config)

# with fs.open(model_path, 'rb') as f:
#     model_state_dict = torch.load(f)
#     model.load_state_dict(model_state_dict)

# If you are running on a CPU-only machine, please use torch.load with map_location=torch.device('cpu') to map your storages to the CPU.
with fs.open(model_path, 'rb') as f:
    model_state_dict = torch.load(f, map_location=torch.device('cpu'))
    model.load_state_dict(model_state_dict)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
model.eval()

app = FastAPI(title="SAM Vertex AI")

class Prediction(BaseModel):
    predictions: List[Dict]
    
@app.get(AIP_HEALTH_ROUTE, status_code=200)
async def health():
    return {'health': 'ok'}

@app.post(AIP_PREDICT_ROUTE, 
          response_model=Prediction,
          response_model_exclude_unset=True)

async def predict(request: Request):
    try:
        body = await request.json()
        print(f"Received request body: {body}")
        instances = body.get('instances')
        print(f"Received request instances: {instances}")
        image_str = [x['image'] for x in body['instances']][0]
        print(f"Received request image: {image_str}")
        
        if not image_str:
            error_msg = "Key 'image' not found in the request body."
            logger.warning(error_msg)
            return JSONResponse(content={"error": error_msg}, status_code=400)
        
        # Decode the base64 string and convert the bytes back to an image using PIL
        image = Image.open(BytesIO(base64.b64decode(image_str)))
        array = np.array(image)
        print(f"array: {array}")

        # Define the size of the array
        array_size = array.shape[1]

        # Define the size of the grid
        grid_size = 10

        # Generate the grid points
        x = np.linspace(0, array_size-1, grid_size)
        y = np.linspace(0, array_size-1, grid_size)

        # Generate a grid of coordinates
        xv, yv = np.meshgrid(x, y)

        # Convert the numpy arrays to lists
        xv_list = xv.tolist()
        yv_list = yv.tolist()

        # Combine the x and y coordinates into a list of list of lists
        input_points = [[[int(x), int(y)] for x, y in zip(x_row, y_row)] for x_row, y_row in zip(xv_list, yv_list)]

        # Reshape the grid to the expected shape of the input_points tensor
        input_points = torch.tensor(input_points).view(1, 1, grid_size*grid_size, 2)

        # Process the image
        patch = Image.fromarray(array)
        inputs = processor(patch, input_points=input_points, return_tensors="pt")
        inputs = {k: v.to(device) for k, v in inputs.items()}

        # Forward pass
        with torch.no_grad():
            outputs = model(**inputs, multimask_output=False)

        # Apply sigmoid
        patch_prob = torch.sigmoid(outputs.pred_masks.squeeze(1))
        patch_prob = patch_prob.cpu().numpy().squeeze()
        patch_predict = (patch_prob > 0.5).astype(np.uint8)

        # Convert predictions to bytes and then to base64
        base64_patch_prob = base64.b64encode(patch_prob.astype(np.float32).tobytes()).decode('utf-8')
        base64_patch_predict = base64.b64encode(patch_predict.astype(np.float32).tobytes()).decode('utf-8')
        print(f"base64_patch_prob: {base64_patch_prob}")
        print(f"base64_patch_predict: {base64_patch_predict}")
        
        result={
            'mask_probability': base64_patch_prob,
            'mask_prediction': base64_patch_predict
        }
        print(f"result: {result}")
        
        return Prediction(predictions=[result])
    
    except ValueError as ve:
        logger.warning(f"Value error: {ve}")
        raise HTTPException(status_code=400, detail=str(ve))
    except Exception as e:
        logger.error(f"Error during prediction: {e}")
        raise HTTPException(status_code=500, detail="Internal Server Error")

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8080)

Writing custom/serve/server.py


### Write the Dockerfile content

Your first step in containerizing your code is to create Dockerfile. In the Dockerfile, you include all the commands needed to run your container image. During the build process, all your packages are installed and an entry point is set for your training and serving code.

In [13]:
# Write the Dokerfile for trainer

In [14]:
%%writefile custom/train/Dockerfile
# Use the specified PyTorch GPU base image
FROM gcr.io/deeplearning-platform-release/pytorch-gpu.1-13

# Set working directory in the container
WORKDIR /train

# Install required libraries
COPY requirements.txt /train/
RUN pip install -r requirements.txt

# Copy the trainer code to the docker image
COPY trainer.py /train/

# Set up the entry point to invoke the trainer
ENTRYPOINT ["python", "trainer.py"]

Writing custom/train/Dockerfile


In [15]:
# Write the Dokerfile for Server

In [16]:
%%writefile custom/serve/Dockerfile
# Use the specified PyTorch GPU base image
FROM gcr.io/deeplearning-platform-release/pytorch-gpu.1-13

# Set the working directory in the container
WORKDIR /serve

# Install required libraries
COPY requirements.txt /serve/
RUN pip install -r requirements.txt

# Copy the server code to the docker image
COPY server.py /serve/

# Update and install system-level dependencies if needed (like gcc, g++, etc. for certain packages)
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*

# Run server.py when the container launches
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8080"]

Writing custom/serve/Dockerfile


## Enable Artifact Registry API

You must enable the Artifact Registry API service for your project.

Learn more about [Enabling service](https://cloud.google.com/artifact-registry/docs/enable-service).

In [17]:
! gcloud services enable artifactregistry.googleapis.com

if os.getenv("IS_TESTING"):
    ! sudo apt-get update --yes && sudo apt-get --only-upgrade --yes install google-cloud-sdk-cloud-run-proxy google-cloud-sdk-harbourbridge google-cloud-sdk-cbt google-cloud-sdk-gke-gcloud-auth-plugin google-cloud-sdk-kpt google-cloud-sdk-local-extract google-cloud-sdk-minikube google-cloud-sdk-app-engine-java google-cloud-sdk-app-engine-go google-cloud-sdk-app-engine-python google-cloud-sdk-spanner-emulator google-cloud-sdk-bigtable-emulator google-cloud-sdk-nomos google-cloud-sdk-package-go-module google-cloud-sdk-firestore-emulator kubectl google-cloud-sdk-datastore-emulator google-cloud-sdk-app-engine-python-extras google-cloud-sdk-cloud-build-local google-cloud-sdk-kubectl-oidc google-cloud-sdk-anthos-auth google-cloud-sdk-app-engine-grpc google-cloud-sdk-pubsub-emulator google-cloud-sdk-datalab google-cloud-sdk-skaffold google-cloud-sdk google-cloud-sdk-terraform-tools google-cloud-sdk-config-connector
    ! gcloud components update --quiet

## Create a private Docker repository

Your first step is to create your own Docker repository in Google Artifact Registry.

1. Run the `gcloud artifacts repositories create` command to create a new Docker repository with your region with the description "docker repository".

2. Run the `gcloud artifacts repositories list` command to verify that your repository was created.

In [18]:
REPOSITORY = "meter-sam"

! gcloud artifacts repositories create {REPOSITORY} --repository-format=docker --location={REGION} --description="Docker repository"

! gcloud artifacts repositories list

[1;31mERROR:[0m (gcloud.artifacts.repositories.create) ALREADY_EXISTS: the repository already exists
Listing items under project imposing-mind-398223, across all locations.

                                                                   ARTIFACT_REGISTRY
REPOSITORY  FORMAT  MODE                 DESCRIPTION        LOCATION     LABELS  ENCRYPTION          CREATE_TIME          UPDATE_TIME          SIZE (MB)
meter-sam   DOCKER  STANDARD_REPOSITORY  Docker repository  us-central1          Google-managed key  2023-09-16T18:04:00  2023-10-13T17:21:06  14939.828


## Build the training container

### Create a repository

Next, you create a repository in the Artifact Registry where you store your training image.

Set names for your repository and custom container below.

In [13]:
CONTAINER_NAME = "landfill-test-train"

TAG = "latest"
TRAIN_IMAGE = (
    f"{REGION}-docker.pkg.dev/{PROJECT_ID}/{REPOSITORY}/{CONTAINER_NAME}:{TAG}"
)

In [14]:
TRAIN_IMAGE

'us-central1-docker.pkg.dev/imposing-mind-398223/meter-sam/landfill-test-train:latest'

Create the repository in Artifact Registry.

### Build the custom container for training

Next, you build and push a docker image to the created repository using Cloud Build. 

Learn more about the process of [Building and pushing a Docker image with Cloud Build](https://cloud.google.com/build/docs/build-push-docker-image).

In [21]:
%cd custom/train
!gcloud builds submit --quiet --region={REGION} --tag=$TRAIN_IMAGE
%cd ..

/home/jupyter/meter-sam/custom/train
Creating temporary tarball archive of 4 file(s) totalling 7.7 KiB before compression.
Uploading tarball of [.] to [gs://imposing-mind-398223_cloudbuild/source/1696897867.942236-14965ac74fde4f04b91c17a060563f88.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/imposing-mind-398223/locations/us-central1/builds/a8b53f58-8b18-4dca-a16b-c72aba23df55].
Logs are available at [ https://console.cloud.google.com/cloud-build/builds;region=us-central1/a8b53f58-8b18-4dca-a16b-c72aba23df55?project=78123506305 ].
----------------------------- REMOTE BUILD OUTPUT ------------------------------
starting build "a8b53f58-8b18-4dca-a16b-c72aba23df55"

FETCHSOURCE
Fetching storage object: gs://imposing-mind-398223_cloudbuild/source/1696897867.942236-14965ac74fde4f04b91c17a060563f88.tgz#1696897868208207
Copying gs://imposing-mind-398223_cloudbuild/source/1696897867.942236-14965ac74fde4f04b91c17a060563f88.tgz#1696897868208207...
/ [1 files][  3.2 KiB/  3.2 KiB] 

## Create and run custom training job


To train a custom model, you perform two steps: 1) create a custom training job, and 2) run the job.

For more details, see [Custom containers overview]([training.containers-overview](https://cloud.google.com/vertex-ai/docs/training/containers-overview).

### Create custom training job

A custom training job is created with the `CustomTrainingJob` class, with the following parameters:

- `display_name`: The human readable name for the custom training job.
- `container_uri`: The training container image.

In [15]:
job = aiplatform.CustomContainerTrainingJob(
    display_name="test-35000-base", container_uri=TRAIN_IMAGE
)

print(job)

<google.cloud.aiplatform.training_jobs.CustomContainerTrainingJob object at 0x7f7ca23ea790>


### Run the custom training job

Next, you run the custom job to start the training job by invoking the method `run`, with the following parameters:

- `args`: The command-line arguments to pass to the training script.
- `replica_count`: The number of compute instances for training (replica_count = 1 is single node training).
- `machine_type`: The machine type for the compute instances.
- `accelerator_type`: The hardware accelerator type.
- `accelerator_count`: The number of accelerators to attach to a worker replica.
- `base_output_dir`: The Cloud Storage location to write the model artifacts to.
- `sync`: Whether to block until completion of the job.

In [18]:
INPUT_DIR = "gs://meter-sam/test/stack/"
OUTPUT_DIR = "gs://meter-sam/test/model/"

EPOCHS = 5
BATCH_SIZE = 2

CMDARGS = [
    "--input-dir=" + INPUT_DIR,
    "--output-dir=" + OUTPUT_DIR,
    "--num-epochs=" + str(EPOCHS),
    "--batch-size=" + str(BATCH_SIZE),
]


# MODEL_DIR = "{}/{}".format(BUCKET_URI, UUID)
# DIRECT = True
# if DIRECT:
#     CMDARGS = [
#         "--model-dir=" + MODEL_DIR,
#         "--epochs=" + str(EPOCHS),
#         "--steps=" + str(STEPS),
#     ]
# else:
#     CMDARGS = [
#         "--epochs=" + str(EPOCHS),
#         "--steps=" + str(STEPS),
#     ]

if TRAIN_GPU:
    job.run(
        base_output_dir=OUTPUT_DIR,
        service_account='78123506305-compute@developer.gserviceaccount.com',
        args=CMDARGS,
        replica_count=1,
        machine_type=TRAIN_COMPUTE,
        accelerator_type=TRAIN_GPU.name,
        accelerator_count=TRAIN_NGPU,
        sync=True,
    )
else:
    job.run(
        base_output_dir=OUTPUT_DIR,
        service_account='78123506305-compute@developer.gserviceaccount.com',
        args=CMDARGS,
        replica_count=1,
        machine_type=TRAIN_COMPUTE,
        sync=False,
    )

model_path_to_deploy = OUTPUT_DIR

Training Output directory:
gs://meter-sam/test/model/ 
View Training:
https://console.cloud.google.com/ai/platform/locations/us-central1/training/7748952576371982336?project=78123506305


RuntimeError: Training failed with:
code: 8
message: "The following quota metrics exceed quota limits: aiplatform.googleapis.com/custom_model_training_nvidia_t4_gpus"


### Wait for completion of custom training job

Next, wait for the custom training job to complete. Alternatively, one can set the parameter `sync` to `True` in the `run()` method to block until the custom training job is completed.

## Build the serving container

### Create a repository

Next, you create a repository in the Artifact Registry where you store your training image.

Set names for your repository and custom container below.

In [19]:
CONTAINER_NAME = "landfill-test-serve"

TAG = "latest"
SERVER_IMAGE = (
    f"{REGION}-docker.pkg.dev/{PROJECT_ID}/{REPOSITORY}/{CONTAINER_NAME}:{TAG}"
)

In [20]:
SERVER_IMAGE

'us-central1-docker.pkg.dev/imposing-mind-398223/meter-sam/landfill-test-serve:latest'

Create the repository in Artifact Registry.

### Build the custom container for serving

Next, you build and push a docker image to the created repository using Cloud Build. 

Learn more about the process of [Building and pushing a Docker image with Cloud Build](https://cloud.google.com/build/docs/build-push-docker-image).

In [21]:
%cd custom/serve
!gcloud builds submit --quiet --region={REGION} --tag=$SERVER_IMAGE
%cd ..

/home/jupyter/custom/serve
Creating temporary tarball archive of 4 file(s) totalling 5.5 KiB before compression.
Uploading tarball of [.] to [gs://imposing-mind-398223_cloudbuild/source/1697563778.821047-e9ae6d97c7cc4dcd8973d5ddbb5afee4.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/imposing-mind-398223/locations/us-central1/builds/6223885e-8185-4271-bc88-a8efa34941e1].
Logs are available at [ https://console.cloud.google.com/cloud-build/builds;region=us-central1/6223885e-8185-4271-bc88-a8efa34941e1?project=78123506305 ].
----------------------------- REMOTE BUILD OUTPUT ------------------------------
starting build "6223885e-8185-4271-bc88-a8efa34941e1"

FETCHSOURCE
Fetching storage object: gs://imposing-mind-398223_cloudbuild/source/1697563778.821047-e9ae6d97c7cc4dcd8973d5ddbb5afee4.tgz#1697563779027702
Copying gs://imposing-mind-398223_cloudbuild/source/1697563778.821047-e9ae6d97c7cc4dcd8973d5ddbb5afee4.tgz#1697563779027702...
/ [1 files][  2.6 KiB/  2.6 KiB]           

### Store script on your Cloud Storage bucket(optional)

Next, you package the serving folder into a compressed tar ball, and then store it in your Cloud Storage bucket.

In [23]:
! rm -f custom.tar custom.tar.gz
! tar cvf custom.tar custom
! gzip custom.tar
! gsutil cp custom.tar.gz gs://meter-sam/trainer/landfill_test.tar.gz

tar: custom: Cannot stat: No such file or directory
tar: Exiting with failure status due to previous errors
Copying file://custom.tar.gz [Content-Type=application/x-tar]...
/ [1 files][   56.0 B/   56.0 B]                                                
Operation completed over 1 objects/56.0 B.                                       


## Upload the model

Next, upload your model to a `Model` resource using `Model.upload()` method, with the following parameters:

- `display_name`: The human readable name for the `Model` resource.
- `artifact`: The Cloud Storage location of the trained model artifacts.
- `serving_container_image_uri`: The serving container image.
- `sync`: Whether to execute the upload asynchronously or synchronously.

If the `upload()` method is run asynchronously, you can subsequently block until completion with the `wait()` method.

Learn more about [Importing models to Vertex AI](https://cloud.google.com/vertex-ai/docs/general/import-model).

In [22]:
# The exported model directory
OUTPUT_DIR = "gs://meter-sam/test/model/"

# The serving port.
SERVE_PORT = 8080

# The task name
task="serving-sam"

# The model ID
model_id="facebook/sam-vit-base"

# The serving environment
serving_env = {
    "MODEL_ID": model_id,
    "TASK": task,
}

In [23]:
model = aiplatform.Model.upload(
    serving_container_image_uri=SERVER_IMAGE,
    artifact_uri=OUTPUT_DIR,
    serving_container_predict_route="/predict",
    serving_container_health_route="/health",
    serving_container_ports=[SERVE_PORT],
    serving_container_environment_variables=serving_env,
    display_name=task,
    sync=False,
)

## Make online predictions

You must deploy a model to an endpoint before that model can be used to serve online predictions. Deploying a model associates physical resources with the model so it can serve online predictions with low latency.

For more details, see [Overview of getting predictions on Vertex AI](https://cloud.google.com/vertex-ai/docs/predictions/deploy-model-api).

### Deploy the model

Next, deploy your model for online prediction. To deploy the model, you invoke the `deploy` method, with the following parameters:

- `deployed_model_display_name`: A human readable name for the deployed model.
- `traffic_split`: Percent of traffic at the endpoint that goes to this model, which is specified as a dictionary of one or more key/value pairs.
If only one model, then specify as { "0": 100 }, where "0" refers to this model being uploaded and 100 means 100% of the traffic.
If there are existing models on the endpoint, for which the traffic is split, then use model_id to specify as { "0": percent, model_id: percent, ... }, where model_id is the model id of an existing model to the deployed endpoint. The percents must add up to 100.
- `machine_type`: The type of machine to use for training.
- `accelerator_type`: The hardware accelerator type.
- `accelerator_count`: The number of accelerators to attach to a worker replica.
- `starting_replica_count`: The number of compute instances to initially provision.
- `max_replica_count`: The maximum number of compute instances to scale to. In this tutorial, only one instance is provisioned.

In [26]:
# Create the endpoint

In [24]:
DEPLOYED_NAME = "sam-serving"

TRAFFIC_SPLIT = {"0": 100}

MIN_NODES = 1
MAX_NODES = 1

endpoint = aiplatform.Endpoint.create(display_name=f"{DEPLOYED_NAME}-endpoint")

Creating Endpoint
Create Endpoint backing LRO: projects/78123506305/locations/us-central1/endpoints/1520055034191020032/operations/3214101667444162560
Creating Model
Create Model backing LRO: projects/78123506305/locations/us-central1/models/3256619301053923328/operations/2124230557620502528
Endpoint created. Resource name: projects/78123506305/locations/us-central1/endpoints/1520055034191020032
To use this Endpoint in another session:
endpoint = aiplatform.Endpoint('projects/78123506305/locations/us-central1/endpoints/1520055034191020032')


In [25]:
# Deploy the model to the endpoint

In [None]:
if DEPLOY_GPU:
    endpoint = model.deploy(
        endpoint=endpoint,
        deployed_model_display_name=DEPLOYED_NAME,
        traffic_split=TRAFFIC_SPLIT,
        machine_type=DEPLOY_COMPUTE,
        min_replica_count=MIN_NODES,
        max_replica_count=MAX_NODES,
        accelerator_type=DEPLOY_GPU,
        accelerator_count=DEPLOY_NGPU,
        service_account='78123506305-compute@developer.gserviceaccount.com',
        deploy_request_timeout=1800,
    )      
else:
    endpoint = model.deploy(
        endpoint=endpoint,
        deployed_model_display_name=DEPLOYED_NAME,
        traffic_split=TRAFFIC_SPLIT,
        machine_type=DEPLOY_COMPUTE,
        min_replica_count=MIN_NODES,
        max_replica_count=MAX_NODES,
        accelerator_type=DEPLOY_GPU,
        accelerator_count=0,
        service_account='78123506305-compute@developer.gserviceaccount.com',
        deploy_request_timeout=1800,
    )

Model created. Resource name: projects/78123506305/locations/us-central1/models/3256619301053923328@1
To use this Model in another session:
model = aiplatform.Model('projects/78123506305/locations/us-central1/models/3256619301053923328@1')
Deploying model to Endpoint : projects/78123506305/locations/us-central1/endpoints/1520055034191020032
Deploy Endpoint model backing LRO: projects/78123506305/locations/us-central1/endpoints/1520055034191020032/operations/445513796518150144


### Get test item

You use an example out of the test (holdout) portion of the dataset as a test item.

In [None]:
#Import libraries
from PIL import Image
import numpy as np
from io import BytesIO
import rasterio
import gcsfs

In [None]:
# Define the GCS path
gcs_path = 'gs://meter-sam/test/'
file_name='-104.884360899962_38.8362801837095.tif'

In [None]:
# Open the file using gcsfs and read the GeoTIFF with rasterio
fs = gcsfs.GCSFileSystem(project='imposing-mind-398223')
with fs.open(gcs_path+'image/'+file_name, 'rb') as f, rasterio.open(f) as src:
    transform = src.transform
    crs = src.crs

    # Read raster data
    array = src.read()
    
    # Squeeze out the first dimension to get (256, 256)
    array = array.squeeze()
    
    # Convert to PNG
    img = Image.fromarray(array.astype(np.uint8))  # Make sure to cast to uint8
    buffer = BytesIO()
    img.save(buffer, format="PNG")
    png_data = buffer.getvalue()

# Encode the PNG data to base64 and decode to string
image_str = base64.b64encode(png_data).decode('utf-8')

In [None]:
# import requests

# response = requests.post('projects/78123506305/locations/us-central1/models/1309111529378938880@1/predict', json=request)
# print(response.json())

### Prepare the request content
#### Request

Since in this example your test item is in a Cloud Storage bucket, you open and read the contents of the image using `tf.io.gfile.Gfile()`. To pass the test data to the prediction service, you encode the bytes into base64., which makes the content safe from modification while transmitting binary data over the network.

The format of each instance is:

    { serving_input: { 'b64': base64_encoded_bytes } }

Since the `predict()` method can take multiple items (instances), send your single test item as a list of one test item.

In [None]:
# # Prepare your request data
instances = [
    {"image": image_str}
]

In [None]:
# instance = {"image": image_str}

### Make the prediction

Now that your `Model` resource is deployed to an `Endpoint` resource, you can do online predictions by sending prediction requests to the Endpoint resource.

#### Response

The response from the `predict()` call is a Python dictionary with the following entries:

In [None]:
# The format of each instance should conform to the deployed model's prediction input schema.
Prediction = endpoint.predict(instances=instances).predictions

In [None]:
# Prediction

In [None]:
from typing import Dict, List, Union

from google.cloud import aiplatform
from google.protobuf import json_format
from google.protobuf.struct_pb2 import Value


def predict_custom_trained_model_sample(
    project: str,
    endpoint_id: str,
    instances: Union[Dict, List[Dict]],
    location: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
):
    """
    `instances` can be either single instance of type dict or a list
    of instances.
    """
    # The AI Platform services require regional API endpoints.
    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    # This client only needs to be created once, and can be reused for multiple requests.
    client = aiplatform.gapic.PredictionServiceClient(client_options=client_options)
    # The format of each instance should conform to the deployed model's prediction input schema.
    instances = instances if isinstance(instances, list) else [instances]
    instances = [
        json_format.ParseDict(instance_dict, Value()) for instance_dict in instances
    ]
    parameters_dict = {}
    parameters = json_format.ParseDict(parameters_dict, Value())
    endpoint = client.endpoint_path(
        project=project, location=location, endpoint=endpoint_id
    )
    response = client.predict(
        endpoint=endpoint, instances=instances, parameters=parameters
    )
    print("response")
    print(" deployed_model_id:", response.deployed_model_id)
    # The predictions are a google.protobuf.Value representation of the model's predictions.
    predictions = response.predictions
    for prediction in predictions:
        # print(" prediction:", dict(prediction))
        return dict(prediction)

In [None]:
# Gerenate prediction
prediction = predict_custom_trained_model_sample(
    project="imposing-mind-398223",
    endpoint_id="1520055034191020032",
    location="us-central1",
    instances=instances
)

In [None]:
# prediction

In [None]:
def base64_to_np_array(encoded_data):
    decoded_data = base64.b64decode(encoded_data)
    array_data = np.frombuffer(decoded_data, dtype=np.float32)
    return array_data

# Decode your results
mask_prob_array = base64_to_np_array(prediction["mask_probability"])
mask_predict_array = base64_to_np_array(prediction["mask_prediction"])

### Visulize and compare masks

In [None]:
import matplotlib.pyplot as plt

# Reshape arrays
mask_prob_array = mask_prob_array.reshape(256, 256)
mask_predict_array = mask_predict_array.reshape(256, 256)

# Load the ground truth mask:
with fs.open(gcs_path+'mask/'+file_name, 'rb') as f, rasterio.open(f) as src:
    ground_truth_array = src.read(1)

fig, ax = plt.subplots(1, 3, figsize=(18, 6))  # Change to 3 subplots

# Display mask probability
cax1 = ax[0].imshow(mask_prob_array, cmap='viridis')
fig.colorbar(cax1, ax=ax[0])
ax[0].set_title('Mask Probability')

# Display predicted mask
cax2 = ax[1].imshow(mask_predict_array, cmap='gray')
ax[1].set_title('Predicted Mask')

# Display ground truth mask
cax3 = ax[2].imshow(ground_truth_array, cmap='gray')
ax[2].set_title('Ground Truth Mask')

plt.tight_layout()
plt.show()

### Calculate prediction accuracy

In [None]:
correct_predictions = np.sum(ground_truth_array == mask_predict_array)
total_predictions = ground_truth_array.size

accuracy = correct_predictions / total_predictions
print(f"Accuracy: {accuracy*100:.2f}%")

### Convert and save masks to geotiff files on Google Cloud

In [64]:
def save_as_geotiff(data_array, file_name, transform, crs):
    with rasterio.open(
            file_name, 
            'w', 
            driver='GTiff', 
            height=data_array.shape[0],
            width=data_array.shape[1], 
            count=1, 
            dtype=data_array.dtype,
            crs=crs,
            transform=transform) as dst:
        dst.write(data_array, 1)

# Save the mask probability and prediction
predict_path='satellite_images/predict/'

save_as_geotiff(mask_prob_array, predict_path+'probability/-104.884360899962_38.8362801837095.tif', transform, crs)
save_as_geotiff(mask_predict_array, predict_path+'mask/-104.884360899962_38.8362801837095.tif', transform, crs)

RasterioIOError: Attempt to create new tiff file 'satellite_images/predict/probability/-104.884360899962_38.8362801837095.tif' failed: No such file or directory

In [None]:
# Upload Images to the Google Cloud Bucket
!gsutil -m cp -r {'satellite_images/predict'} gs://meter-sam/test

## Undeploy the model

When you are done doing predictions, you undeploy the model from the `Endpoint` resouce. This deprovisions all compute resources and ends billing for the deployed model.

In [None]:
endpoint.undeploy_all()

## Cleaning up

To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud
project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.

Otherwise, you can delete the individual resources you created in this tutorial:

Set `delete_bucket` to **True** to delete the Cloud Storage bucket.

In [None]:
delete_bucket = False

# Delete the model using the Vertex model object
model.delete()

# Delete the endpoint using the Vertex endpoint object
endpoint.delete()

# Delete the custom trainig job
job.delete()

# Delete artifact repository
! gcloud artifacts repositories delete $REPOSITORY --location=$REGION --quiet

if delete_bucket or os.getenv("IS_TESTING"):
    ! gsutil rm -r $BUCKET_URI