## Training on AI Platform

Since we have a large number of files, we load the data through a CSV-driven input pipeline rather than copying all of it to the VM. This requires a preprocessing step: we create a CSV file with two columns, the path of each image and its label.

In [None]:
%%bash
# Write one "<gcs_path>,NORMAL" row per object in the NORMAL folder.
# This cell creates labels.csv with `>` (truncating) so that re-running the
# notebook does not pile up duplicate rows; the PNEUMONIA cell appends to it.
# Reading line by line keeps paths containing whitespace in one piece.
# NOTE(review): this lists chest_xray/test/ although the text below talks about
# splitting the training data — confirm the intended folder.
gsutil ls gs://open_project/chest_xray/test/NORMAL | while IFS= read -r c
do echo "$c,NORMAL";
done > labels.csv;

In [None]:
%%bash
# Append one "<gcs_path>,PNEUMONIA" row per object in the PNEUMONIA folder.
# `>>` is intentional: labels.csv already holds the NORMAL rows at this point.
# Reading line by line keeps paths containing whitespace in one piece.
gsutil ls gs://open_project/chest_xray/test/PNEUMONIA | while IFS= read -r c
do echo "$c,PNEUMONIA";
done >> labels.csv;

Split the training data into an 80% training set and a 20% validation set.

In [28]:
import pandas as pd

# labels.csv is produced by the %%bash cells above in the working directory and
# has no header row. Name the columns explicitly; otherwise pandas would consume
# the first image row as column names and that row would leak into both splits.
labels_raw = pd.read_csv('labels.csv', header=None, names=['path', 'label'])

# Shuffle with a fixed seed so the split below is reproducible across runs.
df = labels_raw.sample(frac=1, random_state=123).reset_index(drop=True)

# 80% of the rows go to training; everything not sampled becomes validation.
train = df.sample(frac=0.8, random_state=123)
validation = df.drop(train.index)
assert len(train) + len(validation) == len(df)

# The trainer reads these files line by line as "<path>,<label>" without
# skipping anything, so they are written without header or index.
train.to_csv('train.csv', index=False, header=False)
validation.to_csv('validation.csv', index=False, header=False)

In [29]:
# Upload the training split to the bucket location that the training commands
# later pass as --train_data_path.
!gsutil cp train.csv gs://open_project/chest_xray/train.csv

Copying file://train.csv [Content-Type=text/csv]...
/ [1 files][326.6 KiB/326.6 KiB]                                                
Operation completed over 1 objects/326.6 KiB.                                    


In [30]:
# Upload the validation split; note it is stored as val.csv in the bucket,
# which is the name the training commands pass as --eval_data_path.
!gsutil cp validation.csv gs://open_project/chest_xray/val.csv

Copying file://validation.csv [Content-Type=text/csv]...
/ [1 files][ 81.9 KiB/ 81.9 KiB]                                                
Operation completed over 1 objects/81.9 KiB.                                     


### Loading data

In [37]:
%%writefile trainer/task.py
import argparse
import json
import os

from trainer import model

import tensorflow as tf

if __name__ == "__main__":
    # Command-line entry point: parse the job flags and hand them to
    # model.train_and_evaluate as a plain dict keyed by flag name.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--job-dir",
        help="this model ignores this field, but it is required by gcloud",
        default="junk"
    )
    parser.add_argument(
        "--train_data_path",
        help="GCS location of training csv file",
        required=True
    )
    parser.add_argument(
        "--eval_data_path",
        help="GCS location of evaluation csv file",
        required=True
    )
    parser.add_argument(
        "--output_dir",
        help="GCS location to write checkpoints and export models",
        required=True
    )
    parser.add_argument(
        "--batch_size",
        help="Number of examples to compute gradient over.",
        type=int,
        default=32
    )
    parser.add_argument(
        "--num_epochs",
        help="Number of epochs to train the model.",
        type=int,
        default=10
    )
    parser.add_argument(
        "--eval_steps",
        help="""Positive number of steps for which to evaluate model. Default
        to None, which means to evaluate until input_fn raises an end-of-input
        exception""",
        type=int,
        default=None
    )

    # Parse all arguments into a mutable dict.
    args = parser.parse_args()
    arguments = vars(args)

    # Unused args provided by service. argparse stores "--job-dir" under the
    # key "job_dir"; the second pop is a harmless safeguard.
    arguments.pop("job_dir", None)
    arguments.pop("job-dir", None)

    # The former `arguments["train_examples"] *= 1000` line was removed: no
    # --train_examples flag is defined above, so it raised KeyError before
    # training could start.

    # Append trial_id to path if we are doing hptuning
    # This code can be removed if you are not using hyperparameter tuning
    # arguments["output_dir"] = os.path.join(
    #     arguments["output_dir"],
    #     json.loads(
    #         os.environ.get("TF_CONFIG", "{}")
    #     ).get("task", {}).get("trial", "")
    # )

    # Run the training job
    model.train_and_evaluate(arguments)

Overwriting trainer/task.py


In [55]:
%%writefile trainer/model.py
import tensorflow as tf
import pathlib
import os
import datetime
import numpy as np
from tensorflow.keras import Sequential
from tensorflow.keras.callbacks import ModelCheckpoint, TensorBoard
from tensorflow.keras import layers

from tensorflow.keras.layers import Dense, Flatten, Softmax

# Target image size (pixels) and channel count used when decoding and resizing.
IMG_HEIGHT = 224
IMG_WIDTH = 224
IMG_CHANNELS = 3

# Label vocabulary; decode_csv compares each row's label string against this list.
CLASS_NAMES = ['NORMAL','PNEUMONIA']

# Passed as the prefetch buffer size so tf.data picks the value at runtime.
AUTOTUNE = tf.data.experimental.AUTOTUNE

# NOTE(review): not referenced anywhere in the visible part of this file —
# presumably the size of the validation set; confirm or remove.
VALIDATION_IMAGES = 370

def decode_img(img, reshape_dims):
    """Turn JPEG-encoded bytes into a resized float32 image tensor.

    Args:
        img: scalar string tensor holding the encoded JPEG.
        reshape_dims: target size, forwarded unchanged to `tf.image.resize`.

    Returns:
        The decoded image with IMG_CHANNELS channels, converted to float32
        and resized to `reshape_dims`.
    """
    decoded = tf.image.decode_jpeg(img, channels=IMG_CHANNELS)
    # convert_image_dtype maps the uint8 pixel values onto floats in [0, 1].
    as_float = tf.image.convert_image_dtype(decoded, tf.float32)
    return tf.image.resize(as_float, reshape_dims)


def decode_csv(csv_row):
    """Parse one "<path>,<label>" CSV line and load the referenced file.

    Args:
        csv_row: scalar string tensor with a single CSV line.

    Returns:
        A `(image_bytes, label)` pair: the raw file contents read from the
        path column, and a boolean vector with one entry per CLASS_NAMES
        element that is True where the label column matches.
    """
    # Both columns are strings; the defaults only fix the column types.
    columns = tf.io.decode_csv(csv_row, ["path", "label"])
    image_path, label_name = columns
    raw_bytes = tf.io.read_file(filename=image_path)
    class_mask = tf.math.equal(CLASS_NAMES, label_name)
    return raw_bytes, class_mask

def read_and_preprocess(image_bytes, label, random_augment=False):
    """Decode and resize one image, passing its label through unchanged.

    Args:
        image_bytes: scalar string tensor with the encoded image.
        label: label tensor for the image; returned as is.
        random_augment: accepted for interface compatibility; no augmentation
            is implemented, so the value is currently ignored.

    Returns:
        A `(image, label)` pair where `image` is the decoded, resized tensor.
    """
    # tf.image.resize expects the size as [height, width]. The previous
    # [width, height] order only worked because both constants are equal.
    img = decode_img(image_bytes, [IMG_HEIGHT, IMG_WIDTH])
    return img, label

def load_dataset(csv_of_filenames, batch_size, training=True):
    """Build a batched tf.data pipeline from a CSV of "<path>,<label>" lines.

    Args:
        csv_of_filenames: path (or paths) of the CSV file(s) to read.
        batch_size: number of examples per batch.
        training: when True the data is shuffled and repeated indefinitely;
            otherwise every example is yielded exactly once.

    Returns:
        A dataset of `(image, label)` batches with prefetching enabled.
    """
    csv_lines = tf.data.TextLineDataset(filenames=csv_of_filenames)
    # cache() sits directly after decode_csv, so the file contents and labels
    # it produces are what get cached; decoding/resizing runs on every pass.
    cached_rows = csv_lines.map(decode_csv).cache()
    examples = cached_rows.map(read_and_preprocess)

    if training:
        # Shuffle within a window of ten batches, then repeat indefinitely.
        examples = examples.shuffle(10*batch_size).repeat(count=None)
    else:
        # Each photo used once.
        examples = examples.repeat(count=1)

    # Prefetch prepares the next set of batches while current batch is in use.
    batched = examples.batch(batch_size=batch_size)
    return batched.prefetch(buffer_size=AUTOTUNE)

def build_model():
    """Create and compile the baseline classifier.

    The network flattens the image and applies a single dense layer followed
    by a softmax over the classes in CLASS_NAMES.

    Returns:
        A compiled `tf.keras.Sequential` model.
    """
    num_classes = len(CLASS_NAMES)

    # No Rescaling layer here: decode_img already converts pixels to floats in
    # [0, 1] via convert_image_dtype, so dividing by 255 again would squash
    # the inputs into [0, 1/255]. Dropping it also avoids depending on
    # layers.experimental.preprocessing, which older TF runtimes do not ship.
    model = Sequential([
        Flatten(),
        Dense(num_classes),
        Softmax()
    ])

    # The model outputs probabilities (softmax) and the labels are one-hot
    # style vectors, hence categorical cross-entropy.
    model.compile(
      optimizer='adam',
      loss='categorical_crossentropy',
      metrics=['accuracy'])

    return model

def train_and_evaluate(args):
    """Train the model, then export it as a SavedModel.

    Args:
        args: dict of settings. Keys read here: "train_data_path",
            "eval_data_path", "batch_size", "num_epochs", "output_dir" and
            the optional "eval_steps" (number of evaluation batches to keep).

    Side effects:
        Writes a SavedModel to a timestamped sub-directory of
        `args["output_dir"]` and prints that location.
    """
    model = build_model()
    train_ds = load_dataset(
        args["train_data_path"],
        args["batch_size"])

    eval_ds = load_dataset(
        args["eval_data_path"], args["batch_size"], training=False)
    # Fixed: this used to assign to and read from the undefined name `evalds`,
    # which failed whenever --eval_steps was supplied.
    if args.get("eval_steps"):
        eval_ds = eval_ds.take(count=args["eval_steps"])

    checkpoint_path = os.path.join(args["output_dir"], "checkpoints/pneumonia")
    # Checkpointing is prepared but disabled; uncomment `callbacks` below to
    # enable it.
    cp_callback = tf.keras.callbacks.ModelCheckpoint(
        filepath=checkpoint_path, verbose=1, save_weights_only=True)

    # NOTE(review): steps_per_epoch=5 and validation_steps=2 look like
    # smoke-test values (only a handful of batches per epoch) — confirm before
    # running a real training job.
    history = model.fit(
        train_ds,
        validation_data=eval_ds,
        steps_per_epoch=5,
        epochs=args["num_epochs"],
        validation_steps=2,
        # callbacks=[cp_callback]
    )

    EXPORT_PATH = os.path.join(
        args["output_dir"], datetime.datetime.now().strftime("%Y%m%d%H%M%S"))
    tf.saved_model.save(
        obj=model, export_dir=EXPORT_PATH)  # with default serving function

#     hp_metric = history.history['val_rmse'][-1]

#     hpt = hypertune.HyperTune()
#     hpt.report_hyperparameter_tuning_metric(
#         hyperparameter_metric_tag='rmse',
#         metric_value=hp_metric,
#         global_step=args['num_epochs'])

    print("Exported trained model to {}".format(EXPORT_PATH))



Overwriting trainer/model.py


In [56]:
%%bash
# Local smoke test: run the trainer package as a module for a single epoch.
BUCKET='open_project'
OUTDIR=pneumonia_trained
# Start from a clean local output directory on every run.
rm -rf ${OUTDIR}
# Make the `trainer` package in the current directory importable.
export PYTHONPATH=${PYTHONPATH}:${PWD}/
# The CSVs are read from the bucket; the exported model is written locally.
python3 -m trainer.task \
    --job-dir=./tmp \
    --train_data_path=gs://${BUCKET}/chest_xray/train.csv  \
    --eval_data_path=gs://${BUCKET}/chest_xray/val.csv \
    --output_dir=${OUTDIR} \
    --batch_size=32 \
    --num_epochs=1 

Exported trained model to pneumonia_trained/20210624013539


2021-06-24 01:35:27.078838: E tensorflow/core/lib/monitoring/collection_registry.cc:77] Cannot register 2 metrics with the same name: /tensorflow/core/saved_model/write/count
2021-06-24 01:35:27.078899: E tensorflow/core/lib/monitoring/collection_registry.cc:77] Cannot register 2 metrics with the same name: /tensorflow/core/saved_model/read/count
2021-06-24 01:35:27.078907: E tensorflow/core/lib/monitoring/collection_registry.cc:77] Cannot register 2 metrics with the same name: /tensorflow/core/saved_model/write/api
2021-06-24 01:35:27.078914: E tensorflow/core/lib/monitoring/collection_registry.cc:77] Cannot register 2 metrics with the same name: /tensorflow/core/saved_model/read/api
2021-06-24 01:35:28.415218: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-06-24 01:35:28.416196: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937

In [58]:
import os

# Project settings; they are exported as environment variables so that later
# %%bash cells can read them.
# NOTE(review): confirm that the TF runtime version requested below provides
# every Keras API used by trainer/model.py.
PROJECT = "qwiklabs-gcp-00-8478c1b0a0c1"  # Replace with your PROJECT
BUCKET = PROJECT   # defaults to PROJECT
REGION = "us-central1"  # Replace with your REGION

os.environ.update({
    "PROJECT": PROJECT,
    "BUCKET": BUCKET,
    "REGION": REGION,
    "TFVERSION": "2.1",
    "PYTHONVERSION": "3.7",
})

In [61]:
%%bash

# Submit the trainer package as an AI Platform training job.
# REGION, TFVERSION and PYTHONVERSION are read from the environment set by the
# preceding Python cell; BUCKET is overridden here and no longer equals PROJECT.
BUCKET='open_project'
OUTDIR=gs://open_project/chest_xray/trained_model
# Timestamped job id (UTC).
JOBID=pneumonia_$(date -u +%y%m%d_%H%M%S)

# Flags before the bare `--` configure the job itself; the flags after it are
# the ones defined by trainer/task.py.
gcloud ai-platform jobs submit training ${JOBID} \
    --region=${REGION} \
    --module-name=trainer.task \
    --package-path=$(pwd)/trainer \
    --job-dir=${OUTDIR} \
    --staging-bucket=gs://${BUCKET} \
    --master-machine-type=n1-standard-8 \
    --scale-tier=CUSTOM \
    --runtime-version=${TFVERSION} \
    --python-version=${PYTHONVERSION} \
    -- \
    --train_data_path=gs://${BUCKET}/chest_xray/train.csv  \
    --eval_data_path=gs://${BUCKET}/chest_xray/val.csv \
    --output_dir=${OUTDIR} \
    --num_epochs=10 \
    --eval_steps=100 \
    --batch_size=32 

jobId: pneumonia_210624_014829
state: QUEUED


Job [pneumonia_210624_014829] submitted successfully.
Your job is still active. You may view the status of your job with the command

  $ gcloud ai-platform jobs describe pneumonia_210624_014829

or continue streaming the logs with the command

  $ gcloud ai-platform jobs stream-logs pneumonia_210624_014829
