## **Setting Up a Pipeline Job with the Kubeflow Pipelines SDK (Component-Based Approach)**

In [36]:
from kfp.v2.dsl import component, pipeline
from kfp.v2.dsl import Dataset, Output, Input, Metrics, Markdown, Artifact
from kfp.v2 import compiler

### Batch Prediction Component

In [37]:
@component(
    base_image="gcr.io/ml-ops-segment-anything/sam:latest",
    packages_to_install=[
        'imantics',
        'scipy',
        'matplotlib'
    ]
)
def batch_prediction(
    bucket_name: str,
    input_dir: str,
    output_dir: str,
    prompt_json_file: str,
    max_mask: int,
    visualization: Output[Markdown],
):
    """
        Segment every image under ``input_dir`` with SAM and export the results.

        For each image, masks are predicted either from the prompt points found
        in the prompt JSON (when the image file name is a key of that JSON) or
        with the automatic mask generator. Mask overlays and a ``result.json``
        with polygon vertices are uploaded under ``output_dir``, and a Markdown
        page linking to the uploaded images is written to ``visualization``.

        Parameters
        ----------
        bucket_name : str
            Cloud Storage bucket holding the checkpoint
            (``sam-checkpoint/sam_vit_b_01ec64.pth``), the inputs and the outputs.
        input_dir : str
            Prefix under which ``.png`` / ``.jpg`` / ``.jpeg`` blobs are listed.
        output_dir : str
            Prefix under which results are uploaded, one folder per image file name.
        prompt_json_file : str
            Path, relative to ``input_dir``, of a JSON object mapping an image
            file name to ``{label: [point, ...]}``. Points are scaled by the
            resize ratio, so they are expected in original-image coordinates.
            If the blob does not exist, every image is segmented automatically.
        max_mask : int
            Maximum number of masks kept per image.
        visualization : Output[Markdown]
            Output artifact that receives the static Markdown overview.

        Returns
        -------
        None
    """
    # Imports are done inside the function body so that they are resolved when
    # the component itself runs.
    import torch
    from typing import Dict, List
    from segment_anything import sam_model_registry, SamPredictor, SamAutomaticMaskGenerator
    import base64
    import numpy as np
    import cv2
    import logging
    from google.cloud import storage
    import json
    import time
    from imantics import Polygons, Mask
    from scipy.ndimage import median_filter
    import matplotlib.pyplot as plt

    def show_points(coords, labels, ax, marker_size=375):
        """
            Show marker points on the image for the co-ordinates.

            Parameters
            ----------
            coords : np.ndarray
                prompt co-ordinates, one (x, y) row per point
            labels : np.ndarray
                one label per point; 1 is drawn green, 0 is drawn red
            ax : subplot
                object exposing ``scatter`` (the export loop passes ``plt``)
            marker_size: int
                Optional
            Returns
            -------
            None
        """
        pos_points = coords[labels==1]
        neg_points = coords[labels==0]
        ax.scatter(pos_points[:, 0], pos_points[:, 1], color='green', marker='*', s=marker_size, edgecolor='white', linewidth=1.25)
        ax.scatter(neg_points[:, 0], neg_points[:, 1], color='red', marker='*', s=marker_size, edgecolor='white', linewidth=1.25)

    def show_mask(mask, ax, random_color=False):
        """
            Applies mask for an object in the image.

            Parameters
            ----------
            mask : np.ndarray
                object mask; its last two dimensions are taken as height and width
            ax : subplot
                object exposing ``imshow`` (the export loop passes ``plt``)
            random_color: Boolean
                Optional; use a random RGB colour instead of the fixed blue

            Returns
            -------
            None
        """
        if random_color:
            color = np.concatenate([np.random.random(3), np.array([0.6])], axis=0)
        else:
            color = np.array([30/255, 144/255, 255/255, 0.6])
        h, w = mask.shape[-2:]
        # Broadcast the (h, w, 1) mask against the RGBA colour to get an overlay.
        mask_image = mask.reshape(h, w, 1) * color.reshape(1, 1, -1)
        ax.imshow(mask_image)

    def reshape_image(image, size=512):
        """
            Downscale an image so that its longest side is at most ``size``.

            Images whose height and width are both below ``size`` keep their
            dimensions (ratio 1).

            Parameters
            ----------
            image : np.ndarray
                input image
            size : int
                maximum side length of the resized image

            Returns
            -------
            ratio, resized_image

        """
        # Ratio for showing up in Markdown
        if image.shape[0] < size and image.shape[1] < size:
            ratio = 1
        else:
            ratio = size / max(image.shape[0], image.shape[1])

        width = int(image.shape[1] * ratio)  # resized image width
        height = int(image.shape[0] * ratio)  # resized image height
        resized_image = cv2.resize(image, (width, height))  # resized image
        return ratio, resized_image

    def postprocess_mask(mask):
        """
            Smooth a mask with a median filter and extract its polygons.

            Parameters
            ----------
            mask : np.ndarray
                mask to post-process

            Returns
            -------
            filtered_mask, polygon_vertices
        """
        # Filter size is 1/20th of the smallest dimension, bumped to an odd number.
        median_filter_size = int(min(mask.shape)//20)
        if median_filter_size % 2 == 0:
            median_filter_size += 1
        filtered_mask = median_filter(mask, median_filter_size)

        # Polygon representation of the filtered mask
        polygons = Mask(filtered_mask).polygons()
        return filtered_mask, list(map(lambda x: x.tolist(), polygons.points))

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)

    # Get Model artifact from cloud storage
    blob = bucket.blob('sam-checkpoint/sam_vit_b_01ec64.pth')
    blob.download_to_filename('sam_vit_b_01ec64.pth')

    # Load the model
    sam = sam_model_registry["vit_b"](checkpoint="sam_vit_b_01ec64.pth")
    print(torch.cuda.is_available())
    sam.to("cuda")

    # Automatic mask generation
    mask_generator = SamAutomaticMaskGenerator(sam)

    # SAM Masking with prompts
    predictor = SamPredictor(sam)

    # Initialize Images
    blobs = bucket.list_blobs(prefix=f"{input_dir}/")
    image_extensions = ('.png', '.jpg', '.jpeg')
    image_blobs = [blob for blob in blobs if blob.name.lower().endswith(image_extensions)]

    # Load the Prompt JSON
    blob = bucket.blob(f'{input_dir}/{prompt_json_file}')
    if blob.exists():
        prompt_json = json.loads(blob.download_as_text())  # returns prompts dictionary
    else:
        prompt_json = None
    print("prompt_json")
    print(prompt_json)

    # Prediction for all images
    start_time_pred = time.time()
    filenames, results = [], []

    for image_blob in image_blobs:
        filename = image_blob.name.split('/')[-1]
        start_time = time.time()
        # Downloading the image
        image_blob.download_to_filename(filename)
        print(f"Image Downloading: {time.time() - start_time}s")

        # cv2.imread returns None for files it cannot decode; skip those
        # instead of letting one bad image abort the whole batch.
        raw_image = cv2.imread(filename)
        if raw_image is None:
            print(f"Skipping {filename}: the file could not be decoded as an image")
            continue
        filenames.append(filename)

        # Reshape the image
        ratio, image = reshape_image(cv2.cvtColor(raw_image, cv2.COLOR_BGR2RGB))

        if prompt_json and filename in prompt_json:
            # Predicting with prompt inputs
            prompt_info = prompt_json[filename]
            prompt_input, label_input = [], []

            # Adjusting prompt co-ordinates for the new resized image based on the ratio
            for label, points in prompt_info.items():
                for point in points:
                    point = [int(c * ratio) for c in point]
                    prompt_input.append(point)
                    label_input.append(int(label))

            predictor.set_image(image)
            start_time = time.time()

            # Predicting with prompts
            masks, scores, logits = predictor.predict(
                                        point_coords=np.array(prompt_input),
                                        point_labels=np.array(label_input),
                                        multimask_output=False
                                    )
            print(f"Prediction with Prompt: {time.time() - start_time}s")  # Time taken to predict
            start_time = time.time()

            result = {
                "filename": filename,
                "image": image,
                "prediction_type" : "Predicting with Prompts",
                "prompt_input": prompt_input,
                "label_input": label_input,
                "ratio": ratio
            }
            result["masks"], result["polygon_vertices"] = {}, {}
            for idx, mask in enumerate(masks[:max_mask]):
                filtered_mask, vertices = postprocess_mask(mask)
                result["masks"][f'mask_{idx}'] = filtered_mask
                result["polygon_vertices"][f'mask_{idx}'] = vertices
            print(f"Postprocessing {time.time() - start_time}s")

        else:
            start_time = time.time()

            # Predicting without prompts / Automatic segmentation, segmenting all the objects in the image
            masks = mask_generator.generate(image)
            print(f"Prediction without Prompt: {time.time() - start_time}s")
            result = {
                "filename": filename,
                "image": image,
                "prediction_type" : "Predicting without Prompts",
                "ratio": ratio
            }
            # Largest masks first, so the `max_mask` cut keeps the biggest objects.
            sorted_masks = sorted(masks, key=(lambda x: x['area']), reverse=True)
            result["masks"], result["polygon_vertices"] = {}, {}
            start_time = time.time()

            for idx, mask_info in enumerate(sorted_masks[:max_mask]):
                filtered_mask, vertices = postprocess_mask(mask_info['segmentation'])
                result["masks"][f'mask_{idx}'] = filtered_mask
                result["polygon_vertices"][f'mask_{idx}'] = vertices
            print(f"Postprocessing {time.time() - start_time}s")

        results.append(result)

        # Clearing Cache to avoid out of memory issue
        torch.cuda.empty_cache()

    print(f"Time Taken for predictions, including preprocessing and postprocessing: {time.time() - start_time_pred}s")

    # Export Images with masks to Output DIR in Cloud Storage
    start_time = time.time()
    alpha = 0.5
    for filename, result in zip(filenames, results):
        # Saving the original image (the local copy downloaded above)
        original_image_blob = bucket.blob(f"{output_dir}/{filename}/original.jpg")
        original_image_blob.upload_from_filename(filename)
        for mask_name, mask in result["masks"].items():
            mask = np.array(mask)
            plt.figure(figsize=(10,10))
            plt.imshow(result['image'])

            # Adding mask upon the image; `plt` stands in for the axes here,
            # so the helpers draw on the current figure.
            show_mask(mask, plt)

            # Showing markers on the object
            if "prompt_input" in result:
                input_point = np.array(result['prompt_input'])
                input_label = np.array(result['label_input'])
                show_points(input_point, input_label, plt)
            plt.axis('off')

            # Saving the image
            plt.savefig(f"{mask_name}.jpg")
            plt.close()

            # Upload the saved image to cloud storage
            bucket.blob(f"{output_dir}/{filename}/{mask_name}.jpg").upload_from_filename(f"{mask_name}.jpg")

    print(f"Time Taken for image exportations: {time.time() - start_time}s")

    # Export Results in JSON format to Output DIR
    start_time = time.time()
    for result in results:
        output_result = {}
        output_result["filename"] = result["filename"]
        output_result["prediction_type"] = result["prediction_type"]
        output_result["polygon_vertices"] = result["polygon_vertices"]
        output_result["ratio"] = result["ratio"]
        json_object = json.dumps(output_result, indent=4)
        with open("result.json", "w") as outfile:
            outfile.write(json_object)
        bucket.blob(f"{output_dir}/{result['filename']}/result.json").upload_from_filename("result.json")
    print(f"Time Taken for json exportations: {time.time() - start_time}s")

    # Static Visualization / Adding all the images with masks to view in a markdown
    with open(visualization.path, 'w') as f:
        for result in results:
            f.write(f"# {result['filename']} \n")
            f.write(f"## Prediction Type: {result['prediction_type']} \n")
            f.write("<table>")
            f.write("<tr><td>")
            f.write(f'<img src="https://storage.cloud.google.com/{bucket_name}/{output_dir}/{result["filename"]}/original.jpg">')
            f.write("</td>")
            f.write("<td></td><td></td><td></td></tr>")
            # Masks are laid out four per table row.
            mask_names = list(result["masks"].keys())
            for idx, mask_name in enumerate(mask_names):
                if idx % 4 == 0:
                    f.write("<tr>")
                f.write(f'<td><img src="https://storage.cloud.google.com/{bucket_name}/{output_dir}/{result["filename"]}/{mask_name}.jpg"></td>')
                if idx % 4 == 3:
                    f.write("</tr>")
            # Close a trailing partial row. Deciding from the mask count (not
            # the loop variable) keeps this correct when an image has no masks.
            if len(mask_names) % 4 != 0:
                f.write("</tr>")
            f.write("</table>\n\n")

### **Refer to [Pipeline Data Prep Notebook](./pipeline_exp_data_prep.ipynb) before proceeding further** 

### **Pipeline Initialization**

In [38]:
""" Set "bucket_name", "input_dir", "output_dir" and "prompt_json_file" to what is used in the pipeline used in the pipeline """ 
@pipeline(
    name="sam-pipeline-test-batch-5",  # Name of the pipeline
    pipeline_root=f"gs://{bucket_name}/pipeline",
)
def sam_pipeline(
    bucket_name: str = "{bucket_name}",
    input_dir: str = "{input_dir}",  # Input directory with all the images
    output_dir: str = "{output_dir}",  # Output directory where all the results are stored
    prompt_json_file: str = "json_prompts/{prompt_json_file}",  # Prompts JSON file
    max_mask: int = 10  # Setting maximum number of masks per image
):
    # Single-step pipeline: forward every pipeline parameter to the batch
    # prediction component.
    prediction_task = batch_prediction(
        bucket_name=bucket_name,
        input_dir=input_dir,
        output_dir=output_dir,
        prompt_json_file=prompt_json_file,
        max_mask=max_mask,
    )

    # Resource requests for the step: 8 CPUs, 64G of memory and one
    # NVIDIA_TESLA_T4 accelerator.
    prediction_task = prediction_task.set_cpu_limit("8")
    prediction_task = prediction_task.set_memory_limit("64G")
    prediction_task = prediction_task.add_node_selector_constraint(
        "cloud.google.com/gke-accelerator", "NVIDIA_TESLA_T4"
    )
    prediction_task = prediction_task.set_gpu_limit(1)

In [39]:
# Compile the pipeline into a JSON job spec. The file name has to match the
# `template_path` given to `aiplatform.PipelineJob` when the job is created
# ('sam_pipe_test.json'); the previous '.jsonl' name did not.
compiler.Compiler().compile(
    pipeline_func=sam_pipeline,
    package_path='sam_pipe_test.json',
)



In [100]:
""" Set the "input_directory" and "bucket_name" to the input directory and the bucket used in the pipeline"""
!gsutil cp sam_pipe_test.jsonl gs://f{bucket_name}/f{input_directory}

Copying file://sam_pipe_test.json [Content-Type=application/json]...
/ [1 files][ 14.0 KiB/ 14.0 KiB]                                                
Operation completed over 1 objects/14.0 KiB.                                     


## Running the Pipeline Job from the Notebook (this can also be done from the Vertex AI console)

In [40]:
from google.cloud import aiplatform

# Create the Vertex AI pipeline job from the compiled spec file. Caching is
# disabled, so the component is executed again on every submission.
job = aiplatform.PipelineJob(
    display_name='sam_pipeline_test_5',
    template_path='sam_pipe_test.json',
    enable_caching=False,
    # failure_policy = "slow",
    project="ml-ops-segment-anything",
    location="us-west1",
)

# Submit the job to Vertex AI.
job.submit()

Creating PipelineJob
PipelineJob created. Resource name: projects/633534855904/locations/us-west1/pipelineJobs/sam-pipeline-test-batch-5-20230802044816
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/633534855904/locations/us-west1/pipelineJobs/sam-pipeline-test-batch-5-20230802044816')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-west1/pipelines/runs/sam-pipeline-test-batch-5-20230802044816?project=633534855904


In [1]:
""" MAKE SURE TO CHECK KUBEFLOW VERSION IF FACING ISSUES RUNNING THE PIPELINE / v. 1.8.22 WORKS FINE"""
#!pip install kfp==1.8.22

' MAKE SURE TO CHECK KUBEFLOW VERSION IF FACING ISSUES RUNNING THE PIPELINE / v. 1.8.22 WORKS FINE'

In [42]:
#!pip list | grep "kfp"