In [None]:
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Vertex AI: Medical imaging pipeline with AutoML Vision and custom steps

## Overview

This notebook shows how to use the components defined in `google_cloud_pipeline_components` to build an AutoML Vision workflow with Vertex AI. It also demonstrates a custom data pre-processing step.

### Objective

In this example, you'll learn how to use components from `google_cloud_pipeline_components` to:
- create a _Dataset_
- train an AutoML Vision model
- deploy the trained model to an _endpoint_ for serving

You'll also learn how to create a custom component and include it in a pipeline.

### Usage
The pneumonia detection model in this notebook is intended for **demonstration purposes only**. This model is not intended for use in clinical diagnosis or clinical decision-making or for any other clinical use, and the performance of the model for clinical use has not been established.

### Dataset

The data used for this example comes from the [RSNA 2018 Pneumonia Detection Challenge](https://www.rsna.org/education/ai-resources-and-training/ai-image-challenge/RSNA-Pneumonia-Detection-Challenge-2018). The data is also available in a [Kaggle dataset](https://www.kaggle.com/c/rsna-pneumonia-detection-challenge/data). The dataset was originally drawn from [NIH Chest X-ray Dataset](https://nihcc.app.box.com/v/ChestXray-NIHCC)

### Costs 

This tutorial uses billable components of Google Cloud:

* Vertex AutoML
* Vertex AI Pipeline
* Cloud Storage

Learn about [AI Platform (Unified)
pricing](https://cloud.google.com/ai-platform-unified/pricing) and [Cloud Storage
pricing](https://cloud.google.com/storage/pricing), and use the [Pricing
Calculator](https://cloud.google.com/products/calculator/)
to generate a cost estimate based on your projected usage.

### Resources
* Pipeline adapted from this sample: [AutoML Images pipelines using google-cloud-pipeline-components](https://github.com/GoogleCloudPlatform/ai-platform-samples/blob/master/ai-platform-unified/notebooks/official/pipelines/google-cloud-pipeline-components_automl_images.ipynb)
* Prediction code adapted from this sample: [predict_image_classification_sample.py](https://github.com/googleapis/python-aiplatform/blob/HEAD/samples/snippets/predict_image_classification_sample.py)


### References
* Xiaosong Wang, Yifan Peng, Le Lu, Zhiyong Lu, Mohammadhadi Bagheri, Ronald Summers, ChestX-ray8: Hospital-scale Chest X-ray Database and Benchmarks on Weakly-Supervised Classification and Localization of Common Thorax Diseases, IEEE CVPR, pp. 3462-3471, 2017
* George Shih , Carol C. Wu, Safwan S. Halabi, Marc D. Kohli, Luciano M. Prevedello, Tessa S. Cook, Arjun Sharma, Judith K. Amorosa, Veronica Arteaga, Maya Galperin-Aizenberg, Ritu R. Gill, Myrna C.B. Godoy, Stephen Hobbs, Jean Jeudy, Archana Laroia, Palmi N. Shah, Dharshan Vummidi, Kavitha Yaddanapudi, Anouk Stein, Augmenting the National Institutes of Health Chest Radiograph Dataset with Expert Annotations of Possible Pneumonia, Radiology: AI, January 30, 2019, https://doi.org/10.1148/ryai.2019180041


## Set up your local development environment

**If you are using Colab or AI Platform Notebooks**, your environment already meets
all the requirements to run this notebook. You can skip this step.

**Otherwise**, make sure your environment meets this notebook's requirements.
You need the following:

* The Google Cloud SDK
* Git
* Python 3
* virtualenv
* Jupyter notebook running in a virtual environment with Python 3

The Google Cloud guide to [Setting up a Python development
environment](https://cloud.google.com/python/setup) and the [Jupyter
installation guide](https://jupyter.org/install) provide detailed instructions
for meeting these requirements. The following steps provide a condensed set of
instructions:

1. [Install and initialize the Cloud SDK.](https://cloud.google.com/sdk/docs/)

1. [Install Python 3.](https://cloud.google.com/python/setup#installing_python)

1. [Install
   virtualenv](https://cloud.google.com/python/setup#installing_and_using_virtualenv)
   and create a virtual environment that uses Python 3. Activate the virtual environment.

1. To install Jupyter, run `pip install jupyter` on the
command-line in a terminal shell.

1. To launch Jupyter, run `jupyter notebook` on the command-line in a terminal shell.

1. Open this notebook in the Jupyter Notebook Dashboard.

## Before you begin

This notebook does not require a GPU runtime.

### Install additional packages


Set project properties:

In [None]:
PROJECT_ID = 'YOUR-PROJECT'  # <---CHANGE THIS @param {type:"string"}

In [None]:
!gcloud config set project {PROJECT_ID}

Check which notebook environment is being used, and authenticate if using Colab:

In [None]:
import sys

if 'google.colab' in sys.modules:
  USER_FLAG = ''
  from google.colab import auth
  auth.authenticate_user()
else:
  USER_FLAG = '--user'

Install libraries:

In [None]:
!pip3 install -U {USER_FLAG} kfp google-cloud-aiplatform google-cloud-pipeline-components

### Restart the kernel

After you install the additional packages, you need to restart the notebook kernel so it can find the packages.

In [None]:
# Automatically restart kernel after installs
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

Check the versions of the packages you installed.  The KFP SDK version should be >=1.6.

In [None]:
!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"


## Set up your Google Cloud project

**The following steps are required, regardless of your notebook environment.**

1. [Select or create a Google Cloud project](https://console.cloud.google.com/cloud-resource-manager). When you first create an account, you get a $300 free credit towards your compute/storage costs.

1. [Make sure that billing is enabled for your project](https://cloud.google.com/billing/docs/how-to/modify-project).

1. [Enable the AI Platform (Unified) API and Compute Engine API](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com,compute_component). {TODO: Update the APIs needed for your tutorial. Edit the API names, and update the link to append the API IDs, separating each one with a comma. For example, container.googleapis.com,cloudbuild.googleapis.com}

1. If you are running this notebook locally, you will need to install the [Cloud SDK](https://cloud.google.com/sdk).

1. Enter your project ID in the cell below. Then run the cell to make sure the
Cloud SDK uses the right project for all the commands in this notebook.

**Note**: Jupyter runs lines prefixed with `!` as shell commands, and it interpolates Python variables prefixed with `$` into these commands.

#### Set your project ID

**If you don't know your project ID**, you may be able to get your project ID using `gcloud`.

In [None]:
import os

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

Otherwise, set it here:

In [None]:
if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "python-docs-samples-tests"  # @param {type:"string"}

In [None]:
!gcloud config set project {PROJECT_ID}

#### Timestamp

If you are in a live tutorial session, you might be using a shared test account or project. To avoid name collisions between users on resources created, you create a timestamp for each instance session, and append it onto the name of resources you create in this tutorial.

In [None]:
from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

### Authenticate your Google Cloud account

**If you are using AI Platform Notebooks**, your environment is already
authenticated. Skip this step.

**If you are using Colab**, run the cell below and follow the instructions
when prompted to authenticate your account via oAuth.

**Otherwise**, follow these steps:

1. In the Cloud Console, go to the [**Create service account key**
   page](https://console.cloud.google.com/apis/credentials/serviceaccountkey).

2. Click **Create service account**.

3. In the **Service account name** field, enter a name, and
   click **Create**.

4. In the **Grant this service account access to project** section, click the **Role** drop-down list. Type "AI Platform"
into the filter box, and select
   **AI Platform Administrator**. Type "Storage Object Admin" into the filter box, and select **Storage Object Admin**.

5. Click *Create*. A JSON file that contains your key downloads to your
local environment.

6. Enter the path to your service account key as the
`GOOGLE_APPLICATION_CREDENTIALS` variable in the cell below and run the cell.

In [None]:
import os
import sys

# If you are running this notebook in Colab, run this cell and follow the
# instructions to authenticate your GCP account. This provides access to your
# Cloud Storage bucket and lets you submit training jobs and prediction
# requests.

# If on AI Platform, then don't execute this code
if not os.path.exists("/opt/deeplearning/metadata/env_version"):
    if "google.colab" in sys.modules:
        from google.colab import auth as google_auth

        google_auth.authenticate_user()

    # If you are running this notebook locally, replace the string below with the
    # path to your service account key and run this cell to authenticate your GCP
    # account.
    elif not os.getenv("IS_TESTING"):
        %env GOOGLE_APPLICATION_CREDENTIALS ''

### Create a Cloud Storage bucket as necessary

You will need a Cloud Storage bucket for this example.  If you don't have one that you want to use, you can make one now.


Set the name of your Cloud Storage bucket below. It must be unique across all
Cloud Storage buckets.

You may also change the `REGION` variable, which is used for operations
throughout the rest of this notebook. Make sure to [choose a region where AI Platform (Unified) services are
available](https://cloud.google.com/ai-platform-unified/docs/general/locations#available_regions). You may
not use a Multi-Regional Storage bucket for training with AI Platform.

In [None]:
BUCKET_NAME = "YOUR-REGIONAL-BUCKET"  # @param {type:"string"}
REGION = "us-central1"  # @param {type:"string"}

In [None]:
if BUCKET_NAME == "" or BUCKET_NAME is None or BUCKET_NAME == "gs://[your-bucket-name]":
    BUCKET_NAME = "gs://" + PROJECT_ID + "aip-" + TIMESTAMP

**Only if your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket.

In [None]:
! gsutil mb -l $REGION $BUCKET_NAME

Finally, validate access to your Cloud Storage bucket by examining its contents:

In [None]:
! gsutil ls -al $BUCKET_NAME

### Import libraries and define constants

In [None]:
# Environment settings

PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin

_ = !gcloud auth list --filter=status:ACTIVE --format="value(account)"    
USER = _[0].split('-')[0]

PIPELINE_ROOT = '{}/pipeline_root/{}'.format(BUCKET_NAME, USER)

PIPELINE_ROOT

In [None]:
# Pipeline settings

# Change this to the location in your bucket where you've stored in the images
BUCKET_PATH = 'data/pneumonia'

INPUT_IMAGES_URI=f'{BUCKET_NAME}/{BUCKET_PATH}/stage_2_train_images'
INPUT_LABELS_URI=f'{BUCKET_NAME}/{BUCKET_PATH}/stage_2_train_labels.csv'
OUTPUT_IMAGE_FORMAT='.png'
OUTPUT_IMAGES_URI=f'{BUCKET_NAME}/{BUCKET_PATH}/stage_2_train_images_converted'
OUTPUT_ANNOTATION_SET_URI=f'{BUCKET_NAME}/{BUCKET_PATH}/stage_2_train/classification.csv'

Do some imports:

In [None]:
import base64
import uuid
from typing import NamedTuple
from urllib.parse import urlparse

import kfp
from aiplatform.pipelines import client
from google.cloud import aiplatform, storage
from google.cloud.aiplatform.gapic.schema import predict
from google_cloud_pipeline_components import aiplatform as gcc_aip
from IPython.display import Image
from kfp.v2 import compiler, dsl
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, component)
from kfp.v2.google.client import AIPlatformClient

## Define custom component for data pre-processing


In [None]:
# Base image to used for custom component. Additional packages will be specified with packages_to_install parameter
CONTAINER_GCR_URI='gcr.io/deeplearning-platform-release/tf2-cpu.2-5'

# Component definition
@component(output_component_file="preproc.yaml", base_image=CONTAINER_GCR_URI, packages_to_install=['fsspec', 'gcsfs', 'pandas', 'pillow', 'pydicom'])
def preprocess_data(
    project: str,
    input_images_uri: str,
    input_labels_uri: str,
    output_image_format: str,
    output_images_uri: str,
    output_annotation_set_uri: str,    
) -> NamedTuple(
    "Outputs",
    [
        ("images_uri", str),
        ("annotation_set_uri", str),
    ],
):
    import os
    from glob import glob
    from pathlib import Path
    from urllib.parse import urlparse

    import pandas as pd
    import pydicom
    from google.cloud import storage
    from PIL import Image

    # Temporary locations in container for converting images
    INPUT_IMAGES_DIR = '/var/tmp/input_images'
    OUTPUT_IMAGES_DIR = '/var/tmp/output_images'
    OUTPUT_ANNOTATION_PATH = '/var/tmp/'
    
    storage_client = storage.Client(project=project)

    # Extract bucket name and path from gs:// URI
    def _parse_uri(uri):
        parsed_uri = urlparse(uri)
        bucket_name = parsed_uri.netloc
        path = parsed_uri.path[1:]
        return (bucket_name, path)

    # Download source images
    def _download_images(input_images_uri, test_mode=False):
        Path(INPUT_IMAGES_DIR).mkdir(parents=True, exist_ok=True)
        bucket_name, path = _parse_uri(input_images_uri)
        bucket = storage_client.get_bucket(bucket_name)
        blobs = bucket.list_blobs(prefix=path)

        # The image conversion process takes a long time.
        # To see if the pipeline works, set test_mode to True the first time
        if test_mode:
            blob = next(iter(blobs))
            filename = os.path.basename(blob.name)
            blob.download_to_filename(os.path.join(INPUT_IMAGES_DIR, filename))
        else:
            for blob in blobs:
                filename = os.path.basename(blob.name)
                blob.download_to_filename(os.path.join(INPUT_IMAGES_DIR, filename))

    # Convert images from DICOM to new image format (PNG is recommended lossless format)
    def _convert_images(output_image_format):
        Path(OUTPUT_IMAGES_DIR).mkdir(parents=True, exist_ok=True)
        files = [os.path.basename(x) for x in glob(os.path.join(INPUT_IMAGES_DIR, "*.dcm"))]

        # Convert images
        for f in files:
            ds = pydicom.read_file(os.path.join(INPUT_IMAGES_DIR, f))
            img = Image.fromarray(ds.pixel_array)
            img.save(os.path.join(OUTPUT_IMAGES_DIR, f.replace(".dcm", output_image_format)))

    # Create annotation set which contains image locations and labels in format AutoML expects
    def _create_annotation_set(input_labels_uri, output_images_uri, output_image_format, output_annotation_set_uri):
        _, path = _parse_uri(output_annotation_set_uri)
        output_annotation_file = os.path.basename(path)
        df_in = pd.read_csv(input_labels_uri)
        df_out = pd.DataFrame()
        df_out["set"] = ""
        df_out["file"] = output_images_uri + '/' + df_in.patientId + output_image_format
        df_out["target"] = df_in.Target
        df_out.drop_duplicates(inplace=True)
        df_out.to_csv(os.path.join(OUTPUT_ANNOTATION_PATH, output_annotation_file), index=False)

    # Upload converted images to Cloud Storage
    def _upload_images(output_images_uri):
        bucket_name, path = _parse_uri(output_images_uri)
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(path)
        files = [os.path.basename(x) for x in glob(os.path.join(OUTPUT_IMAGES_DIR, "*"))]
        for f in files:
            blob = bucket.blob(f'{path}/{f}')
            blob.upload_from_filename(os.path.join(OUTPUT_IMAGES_DIR, f))

    # Upload annotation set file to Cloud Storage
    def _upload_annotation_set(output_annotation_set_uri):
        bucket_name, path = _parse_uri(output_annotation_set_uri)
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(path)
        filename = os.path.basename(path)
        blob.upload_from_filename(os.path.join(OUTPUT_ANNOTATION_PATH, filename))

    _download_images(input_images_uri)
    _convert_images(output_image_format)
    _create_annotation_set(input_labels_uri, output_images_uri, output_image_format, output_annotation_set_uri)
    _upload_images(output_images_uri)
    _upload_annotation_set(output_annotation_set_uri)
    return (output_images_uri, output_annotation_set_uri)

## Define the pipeline

Define pipeline with each step implemented by an operator.

This pipeline is using one custom component defined earlier, `preprocess_data`,
and 3 components included in the `google_cloud_pipeline_components` package.

In [None]:
@kfp.dsl.pipeline(name="pneumonia-detection")
def pipeline(project:str, input_images_uri:str, input_labels_uri:str, output_image_format:str,
             output_images_uri:str, output_annotation_set_uri:str):

    # Preprocess data
    preprocess_data_op = preprocess_data(
        project=PROJECT_ID,
        input_images_uri=input_images_uri,
        input_labels_uri=input_labels_uri,
        output_image_format=output_image_format,
        output_images_uri = output_images_uri,
        output_annotation_set_uri=output_annotation_set_uri
    )

    # Create dataset
    dataset_create_op = gcc_aip.ImageDatasetCreateOp(
        project=PROJECT_ID,
        display_name="pneumonia",
        gcs_source=preprocess_data_op.outputs['annotation_set_uri'],
        import_schema_uri=aiplatform.schema.dataset.ioformat.image.single_label_classification
    )

    # Train model
    training_op = gcc_aip.AutoMLImageTrainingJobRunOp(
        project=PROJECT_ID,
        display_name='train-pneumonia-detection_1',
        prediction_type='classification',
        model_type='CLOUD',
        dataset=dataset_create_op.outputs['dataset'],
        budget_milli_node_hours=8000
    )

    # Deploy model
    deploy_op = gcc_aip.ModelDeployOp(
        model=training_op.outputs["model"],
        project=PROJECT_ID
    )

## Compile and run the pipeline

Now, you're ready to compile the pipeline:

In [None]:
JOB_SPEC = 'pneumonia_detection_pipeline.json'

compiler.Compiler().compile(pipeline_func=pipeline,
        package_path=JOB_SPEC)

The pipeline compilation generates the `penumonia_detection_pipeline.json` job spec file.

Next, you'll instantiate an API client object:

In [None]:
api_client = AIPlatformClient(
    project_id=PROJECT_ID,
    region=REGION,
)

Then, you run the defined pipeline like this: 

In [None]:
response = api_client.create_run_from_job_spec(JOB_SPEC,
                                               parameter_values={
                                                   'project': PROJECT_ID,
                                                   'input_images_uri': INPUT_IMAGES_URI,
                                                   'input_labels_uri': INPUT_LABELS_URI,
                                                   'output_image_format': OUTPUT_IMAGE_FORMAT,
                                                   'output_images_uri': OUTPUT_IMAGES_URI,
                                                   'output_annotation_set_uri': OUTPUT_ANNOTATION_SET_URI
                                                                },
                                               pipeline_root=PIPELINE_ROOT)

Click on the generated link to see your run in the Cloud Console.  It should look something like this as it is running:

<a href="https://storage.googleapis.com/amy-jo/images/mp/automl_tabular_classif.png" target="_blank"><img src="https://storage.googleapis.com/amy-jo/images/mp/automl_tabular_classif.png" width="40%"/></a>

## Make prediction

### Constants

In [None]:
LOCAL_IMAGES_PATH = 'images'
PNEUMONIA_IMAGE = 'ce8e0d85-44a8-4b05-b421-cc5c510aa7a5.png'
NO_PNEUMONIA_IMAGE = '4c52f9c5-6ce0-4e28-ae5d-ddb37f374f31.png'

### Copy files into local directory for prediction

In [None]:
!mkdir -p '{LOCAL_IMAGES_PATH}'
!gsutil cp '{OUTPUT_IMAGES_URI}/{PNEUMONIA_IMAGE}' '{LOCAL_IMAGES_PATH}'
!gsutil cp '{OUTPUT_IMAGES_URI}/{NO_PNEUMONIA_IMAGE}' '{LOCAL_IMAGES_PATH}'

### Helper functions

In [None]:
def predict_image_classification_automl(
    project: str,
    endpoint_id: str,
    filename: str,
    location: str = REGION,
    api_endpoint: str = f"{REGION}-aiplatform.googleapis.com",
):
    # The AI Platform services require regional API endpoints.
    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    # This client only needs to be created once, and can be reused for multiple requests.
    prediction_client = aiplatform.gapic.PredictionServiceClient(client_options=client_options)
    
    with open(filename, "rb") as f:
        file_content = f.read()    
        
    # The format of each instance should conform to the deployed model's prediction input schema.
    encoded_content = base64.b64encode(file_content).decode("utf-8")
    instance = predict.instance.ImageClassificationPredictionInstance(
        content=encoded_content).to_value()
    instances = [instance]
    # See gs://google-cloud-aiplatform/schema/predict/params/image_classification_1.0.0.yaml for the format of the parameters.
    parameters = predict.params.ImageClassificationPredictionParams(
        confidence_threshold=0.5, max_predictions=5,
    ).to_value()
    endpoint = prediction_client.endpoint_path(
        project=project, location=location, endpoint=endpoint_id
    )
    response = prediction_client.predict(
        endpoint=endpoint, instances=instances, parameters=parameters
    )
    print("response")
    print(" deployed_model_id:", response.deployed_model_id)
    # See gs://google-cloud-aiplatform/schema/predict/prediction/classification.yaml for the format of the predictions.
    predictions = response.predictions
    for prediction in predictions:
        print(" prediction:", dict(prediction))

### Make predictions

Access endpoint ID which will be used to make prediction 

In [None]:
try:
    endpoint_name = endpoint.name
except:
    # Use last endpoint if no endpoint
    endpoint_name = aiplatform.Endpoint.list()[0].name

Predict on an image without pneumonia:

In [None]:
display(Image(f'{LOCAL_IMAGES_PATH}/{NO_PNEUMONIA_IMAGE}', width=384, height=384))
predict_image_classification_automl(project=PROJECT_ID, endpoint_id=endpoint_name, location=REGION, filename=f'{LOCAL_IMAGES_PATH}/{NO_PNEUMONIA_IMAGE}')

Predict on an image with pneumonia:

In [None]:
display(Image(f'{LOCAL_IMAGES_PATH}/{PNEUMONIA_IMAGE}', width=384, height=384))
predict_image_classification_automl(project=PROJECT_ID, endpoint_id=endpoint_name, location=REGION, filename=f'{LOCAL_IMAGES_PATH}/{PNEUMONIA_IMAGE}')

## Cleaning up

To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud
project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.

Otherwise, you can delete the individual resources you created in this tutorial:

{**TODO**: Include commands to delete individual resources below. Include deletion of scheduled job.}

In [None]:
# Delete Cloud Storage objects that were created
! gsutil -m rm -r $PIPELINE_ROOT