# SKY Assignment

All of the steps below are performed in the notebook:
1. Update the model specification to include an additional convolutional layer containing 16 filters with a kernel size of 3.
2. Add a max pooling layer after each of the convolutional layers to reduce the dimensionality of the outputs passed to the next layer.
3. Add a callback to save a model checkpoint after every training epoch.
4. Define a simple Vertex AI pipeline that:
    1. Trains the model as specified in part (3).
    2. Uploads the trained model to Vertex AI as a model resource.
    3. Deploys the model resource to an endpoint resource.
5. Provide code to compile and run the pipeline.

## Introduction
This notebook outlines the process to train a deep learning model on the CIFAR-10 dataset using TensorFlow. The model will then be deployed to Vertex AI.


### Prerequisites
- Ensure you have access to a Google Cloud project with the Vertex AI, Artifact Registry, and Cloud Storage APIs enabled.

## Setup
- First, we import the necessary libraries and set the environment variables. Google Cloud credentials must be configured correctly for this notebook to interact with Google Cloud resources.
- To begin, I created a service account in IAM with the Owner role and downloaded its JSON key file.
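
As a minimal sketch of the credential setup described above, the helper below checks that a key file exists before pointing `GOOGLE_APPLICATION_CREDENTIALS` at it. The function name `use_service_account_key` is hypothetical and not part of any Google library; only the environment variable name comes from this notebook.

```python
import os
from pathlib import Path


def use_service_account_key(key_file: str) -> str:
    """Point GOOGLE_APPLICATION_CREDENTIALS at key_file if the file exists."""
    path = Path(key_file).expanduser().resolve()
    if not path.is_file():
        # Fail early with a clear message instead of a later authentication error.
        raise FileNotFoundError(f"Service account key not found: {path}")
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = str(path)
    return str(path)
```

Calling it with the path of the downloaded key file fails immediately if the file is missing, which is usually easier to diagnose than an authentication error raised by the first client call.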

In [8]:
from kfp import dsl
from kfp.dsl import (component, 
                     Metrics,
                     OutputPath,
                     Output)
import google.cloud.aiplatform as aiplatform
import os

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "myproject-220424-39ec352fe30c.json"


In [9]:
from google.cloud import storage

storage_client = storage.Client()
# List buckets
for bucket in storage_client.list_buckets():
    print(bucket.name)


cloud-ai-platform-973f4c90-d1ae-41e0-908a-311b0ed228b9
sky-assign-bucket-final-23042024
sky-project-bucket-22042024
sky_assign_bucket_23042024


## Define Parameters
- Here we define the project name, bucket name, REGION, PIPELINE_ROOT, and BASE_IMAGE. Update these to match your Google Cloud setup.
- I have built the BASE_IMAGE image and pushed it to Google Artifact Registry. The code to build and push the image is in the "docker_build.sh" and "docker_push_gcr.sh" files, and the dependencies are listed in requirements.txt.
  - Create a repository in Google Artifact Registry to store the Docker images.
  - Make sure to edit parameters such as PROJECT_ID, REGION, REPOSITORY, IMAGE, and IMAGE_TAG.
  - Run "docker_build.sh", then run "docker_push_gcr.sh".
- Once the Docker image is pushed, define the base image path (given in docker_push_gcr.sh) below.
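
The base image path follows the Artifact Registry naming pattern used for BASE_IMAGE in this notebook. The sketch below composes it from its parts; `build_image_uri` is a hypothetical helper, and the argument values are the ones this notebook uses.

```python
def build_image_uri(region: str, project_id: str, repository: str,
                    image: str, tag: str = "latest") -> str:
    """Compose an Artifact Registry Docker image URI from its parts."""
    return f"{region}-docker.pkg.dev/{project_id}/{repository}/{image}:{tag}"


print(build_image_uri("europe-west2", "myproject-220424", "sky-assign", "training"))
# europe-west2-docker.pkg.dev/myproject-220424/sky-assign/training:latest
```

Building the string from named parts makes it easier to keep the notebook in step with the values set in the build and push scripts.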


In [10]:
BUCKET_NAME = "sky-assign-bucket-final-23042024"
PROJECT_NAME = "myproject-220424"
REGION = "europe-west2"
BASE_IMAGE = f"{REGION}-docker.pkg.dev/{PROJECT_NAME}/sky-assign/training:latest"
PIPELINE_ROOT = f"gs://{BUCKET_NAME}/pipeline_root/"

## Define Training Component
- This component is responsible for training the model using TensorFlow.
- Create a folder named 'checkpoints' in the same bucket.
- It involves:
    - Data loading
    - Preprocessing
    - Model Training
    - Checkpoint Callback to save a model checkpoint after every training epoch
    - Model Saving
    - Returning the model path, which is then used by the deploy component

- This training script not only trains the model but also logs training and validation metrics, evaluates the model on a test dataset, and saves the model to Google Cloud Storage.
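
How much the pooling layers shrink the tensors can be checked with a little arithmetic. The sketch below traces the spatial size through the two convolution and pooling stages of the model in this notebook, assuming the Keras defaults of unpadded ('valid') convolutions with stride 1 and pooling with a stride equal to the pool size. The helper names are hypothetical.

```python
def conv2d_out(size: int, kernel: int, stride: int = 1) -> int:
    """Output size of a 'valid' (unpadded) convolution along one dimension."""
    return (size - kernel) // stride + 1


def maxpool_out(size: int, pool: int) -> int:
    """Output size of 'valid' max pooling whose stride equals the pool size."""
    return (size - pool) // pool + 1


def trace_shapes(size: int = 32):
    """Spatial size and channel count after each conv/pool layer of the CNN."""
    shapes = []
    channels = 3  # RGB input
    for filters in (32, 16):  # the two Conv2D layers
        size = conv2d_out(size, kernel=3)
        channels = filters
        shapes.append(("conv", size, channels))
        size = maxpool_out(size, pool=2)
        shapes.append(("pool", size, channels))
    flat = size * size * channels  # input width of the final Dense layer
    return shapes, flat


shapes, flat = trace_shapes()
for name, size, channels in shapes:
    print(f"{name}: {size}x{size}x{channels}")
print("flattened features:", flat)
```

For the 32x32x3 CIFAR-10 input this prints 30x30x32, 15x15x32, 13x13x16 and 6x6x16, so the final dense layer receives 576 features rather than the 2,704 it would receive without pooling after the second convolution.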

In [13]:
@component(base_image=BASE_IMAGE)
def train_model(project: str, bucket_name: str, model_path: OutputPath(str), metrics: Output[Metrics]):
    import tensorflow_datasets as tfds
    import tensorflow as tf
    import os
    
    def scale(image, label):
        image = tf.cast(image, tf.float32)
        image /= 255.0
        return image, label

    BUFFER_SIZE = 10000
    BATCH_SIZE = 64
    VAL_SPLIT = 0.2 

    model = tf.keras.Sequential([
        tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(32, 32, 3)),
        tf.keras.layers.MaxPooling2D(2),
        tf.keras.layers.Conv2D(16, 3, activation='relu'),
        tf.keras.layers.MaxPooling2D(2),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(10, activation='softmax')
    ])
    model.compile(
        loss=tf.keras.losses.sparse_categorical_crossentropy,
        optimizer=tf.keras.optimizers.SGD(learning_rate=0.01),
        metrics=['accuracy'])    
    
    # Model checkpoint: save the model after every training epoch
    checkpoint_path = f'gs://{bucket_name}/checkpoints'
    checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
        filepath=os.path.join(checkpoint_path, 'epoch_{epoch}'),
        save_best_only=False)
    
    # Load the dataset 
    datasets, info = tfds.load(name='cifar10', with_info=True, as_supervised=True)
    
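    # Split the dataset into train, validation and test sets.
    # reshuffle_each_iteration=False keeps the shuffled order fixed, so the
    # take/skip split below selects the same examples on every pass and
    # validation examples do not leak into training.
    train_data = datasets['train'].map(scale).shuffle(
        BUFFER_SIZE, reshuffle_each_iteration=False)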
    val_size = int(VAL_SPLIT * info.splits['train'].num_examples)
    train_size = info.splits['train'].num_examples - val_size
    train_dataset = train_data.take(train_size).repeat().batch(BATCH_SIZE)
    val_dataset = train_data.skip(train_size).batch(BATCH_SIZE)
    test_dataset = datasets['test'].map(scale).batch(BATCH_SIZE)
    
    # Model training
    history = model.fit(
        x=train_dataset,
        epochs=10,
        steps_per_epoch=train_size // BATCH_SIZE,
        validation_data=val_dataset,
        callbacks=[checkpoint_callback])
    
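    # Log the final-epoch training and validation metrics
    for metric, values in history.history.items():
        # Validation metrics already carry a 'val_' prefix; prefix the rest with 'train_'
        name = metric if metric.startswith('val_') else 'train_' + metric
        metrics.log_metric(name, float(values[-1]))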
    
    # Evaluate the model on the test set and log the test metrics
    test_loss, test_accuracy = model.evaluate(test_dataset)
    metrics.log_metric("test_loss", float(test_loss))
    metrics.log_metric("test_accuracy", float(test_accuracy))
    
    # Save the model in the GCP storage bucket
    model_dir = f"gs://{bucket_name}/models/"
    model_path_str = os.path.join(model_dir, "my_model")
    model.save(model_path_str)

    # Write the model path to the output path
    with open(model_path, 'w') as f:
        f.write(model_path_str)
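
The checkpoint callback writes to a path template containing `{epoch}`, which Keras fills in with the 1-based epoch number when it saves. A minimal sketch of the paths this produces for the ten training epochs follows; `checkpoint_paths` is a hypothetical helper, and `posixpath` is used so that the `gs://` path is joined with forward slashes on any operating system.

```python
import posixpath


def checkpoint_paths(bucket_name: str, epochs: int) -> list:
    """Paths that the 'epoch_{epoch}' template expands to, one per epoch."""
    template = posixpath.join(f"gs://{bucket_name}/checkpoints", "epoch_{epoch}")
    # Keras numbers epochs from 1 when it formats the checkpoint path.
    return [template.format(epoch=e) for e in range(1, epochs + 1)]


for path in checkpoint_paths("sky-assign-bucket-final-23042024", epochs=10):
    print(path)
```

Because `save_best_only=False`, every one of these paths should be written, which makes it possible to resume from or compare any epoch.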
    


## Define Deployment Component
- After training, this component deploys the trained model to Vertex AI to serve predictions.
    - The model is uploaded as a Vertex AI Model resource.
    - An endpoint is created and the model resource is deployed to it, making it ready for predictions.

In [14]:
@component(base_image=BASE_IMAGE)
def deploy_model(project: str, location: str, model_path: str):
    from google.cloud.aiplatform import Model
    from google.cloud.aiplatform import Endpoint
    from datetime import datetime
    
    TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
    # Upload the trained model to Vertex AI as a model resource
    model = Model.upload(
        display_name="cifar10_model",
        artifact_uri=model_path,
        serving_container_image_uri="europe-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-10:latest"
    )
    # Create an endpoint and deploy the model resource to it
    endpoint = Endpoint.create(display_name="cifar10_endpoint")
    endpoint.deploy(
        model=model,
        deployed_model_display_name=f"deployed_cifar10_model_{TIMESTAMP}",
        machine_type="n1-standard-4")
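
Once the model is deployed, prediction requests are sent to the endpoint as a list of instances. The sketch below prepares such a list under the assumption that the served model expects 32x32x3 images scaled to [0, 1], as in training. `to_instances` is a hypothetical helper, and the `predict` call is shown only as a comment because it needs a live endpoint.

```python
def to_instances(images):
    """Scale uint8 pixel values to [0, 1], matching scale() used in training."""
    return [
        [[[channel / 255.0 for channel in pixel] for pixel in row] for row in image]
        for image in images
    ]


# One all-white 32x32 RGB image as nested Python lists.
white = [[[255, 255, 255] for _ in range(32)] for _ in range(32)]
instances = to_instances([white])

# With a live endpoint (not run here), the request would look like:
# predictions = endpoint.predict(instances=instances).predictions
```

Each prediction would then be a list of ten class probabilities, one per CIFAR-10 class, since the model ends in a 10-unit softmax layer.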

## Define and Compile the Pipeline
- This pipeline orchestrates the training and deployment process: the model is first trained and then deployed.
- The code below defines a Kubeflow pipeline for the CIFAR-10 model and compiles it. The compiled pipeline can be submitted to Vertex AI Pipelines.

In [15]:
from kfp import compiler

@dsl.pipeline(name="cifar10-training-deployment",
              pipeline_root=PIPELINE_ROOT)
def pipeline(project: str, bucket_name: str):
    # Use the pipeline parameters so that values passed at submission time take effect
    train_op = train_model(project=project, bucket_name=bucket_name)
    # Reuse REGION: "eu-west2" is not a valid Google Cloud region name
    deploy_model(project=project, location=REGION, model_path=train_op.outputs['model_path'])

# Compile the pipeline
compiler.Compiler().compile(pipeline_func=pipeline, package_path="cifar10_pipeline.json")
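
To confirm that the compiled file encodes the train-then-deploy order, the task dependencies can be read back from it. The sketch below assumes the compiled specification lists tasks under `root.dag.tasks`, each naming its upstream tasks in `dependentTasks`. That layout and the sample dictionary are assumptions for illustration, so check them against your own `cifar10_pipeline.json`. `task_order` is a hypothetical helper and needs Python 3.9 or later for `graphlib`.

```python
from graphlib import TopologicalSorter


def task_order(spec: dict) -> list:
    """Return task names in an order that respects their dependencies."""
    tasks = spec["root"]["dag"]["tasks"]
    # Map each task to the set of tasks that must finish before it starts.
    graph = {name: set(task.get("dependentTasks", [])) for name, task in tasks.items()}
    return list(TopologicalSorter(graph).static_order())


# Hand-written stand-in for the relevant part of a compiled spec (assumed layout).
sample_spec = {
    "root": {"dag": {"tasks": {
        "deploy-model": {"dependentTasks": ["train-model"]},
        "train-model": {},
    }}}
}
print(task_order(sample_spec))  # ['train-model', 'deploy-model']
```

For the real file, load it with `json.load` and pass the resulting dictionary to `task_order`.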

## Execute the Pipeline
- Finally, execute the compiled pipeline on Vertex AI. This step requires specifying the project, region, and other necessary parameters.
- This command initiates the pipeline run on Vertex AI, where it executes according to the defined tasks—training the model and then deploying it.
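
When the run is submitted, the log reports a resource name of the form `projects/.../locations/.../pipelineJobs/...` together with a console link. The sketch below rebuilds that link from the resource name, which is convenient when only the name has been saved. The URL pattern is copied from the log output in this notebook rather than from a documented contract, and `console_url` is a hypothetical helper.

```python
import re

_JOB_NAME = re.compile(
    r"^projects/(?P<project>[^/]+)/locations/(?P<location>[^/]+)"
    r"/pipelineJobs/(?P<job_id>[^/]+)$"
)


def console_url(resource_name: str) -> str:
    """Build the console link for a PipelineJob resource name."""
    match = _JOB_NAME.match(resource_name)
    if match is None:
        raise ValueError(f"Not a PipelineJob resource name: {resource_name}")
    return (
        "https://console.cloud.google.com/vertex-ai/locations/"
        f"{match['location']}/pipelines/runs/{match['job_id']}"
        f"?project={match['project']}"
    )
```

The same resource name can also be passed to `aiplatform.PipelineJob.get` to pick the job up again in another session, as the run log itself suggests.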

In [16]:
from google.cloud.aiplatform import PipelineJob

# Parameters
project_id = PROJECT_NAME
bucket_name = BUCKET_NAME
region = REGION

pipeline_job = PipelineJob(
    display_name="cifar10_pipeline",
    template_path="cifar10_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={
        'project': project_id,
        'bucket_name': bucket_name
    },
    location=region,
)

pipeline_job.run()


Creating PipelineJob
PipelineJob created. Resource name: projects/1009435750450/locations/europe-west2/pipelineJobs/cifar10-training-deployment-20240424120326
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/1009435750450/locations/europe-west2/pipelineJobs/cifar10-training-deployment-20240424120326')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/europe-west2/pipelines/runs/cifar10-training-deployment-20240424120326?project=1009435750450
PipelineJob projects/1009435750450/locations/europe-west2/pipelineJobs/cifar10-training-deployment-20240424120326 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/1009435750450/locations/europe-west2/pipelineJobs/cifar10-training-deployment-20240424120326 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/1009435750450/locations/europe-west2/pipelineJobs/cifar10-training-deployment-20240424120326 current state:
PipelineState.PIPELINE_S