## Fruit and Vegetable Disease (Healthy vs Rotten) - Kaggle + Vertex AI Training (Custom Job) Example

* Kaggle page: https://www.kaggle.com/datasets/muhammad0subhan/fruit-and-vegetable-disease-healthy-vs-rotten
* dataset: https://www.kaggle.com/datasets/muhammad0subhan/fruit-and-vegetable-disease-healthy-vs-rotten/data
* notebook: https://www.kaggle.com/code/osamaabobakr/fruit-and-vegetable-disease-healthy-vs-rotten

by: Justin Marciszewski | justinjm@google.com | AI/ML Specialist CE

## Setup



### Install packages

In [None]:
packages = [
    # (module to import, pip package that provides it)
    # Built-ins such as os, re, random, and json ship with Python and need no install.
    ('numpy', 'numpy'),
    ('pandas', 'pandas'),
    ('PIL', 'pillow'),
    ('cv2', 'opencv-python'),
    ('matplotlib.pyplot', 'matplotlib'),
    ('seaborn', 'seaborn'),
    ('kaggle.api.kaggle_api_extended', 'kaggle'),
    ('sklearn.model_selection', 'scikit-learn'),
    ('sklearn.utils', 'scikit-learn'),
    ('gcsfs', 'gcsfs'),  # lets pandas read gs:// paths, used by pd.read_csv(DATASET_CSV)
    ('google.cloud.storage', 'google-cloud-storage'),
    ('google.cloud.aiplatform', 'google-cloud-aiplatform'),
    ('keras', 'keras'),
    ('tensorflow.keras', 'tensorflow'),
    ('tensorflow.keras.layers', 'tensorflow'),
    ('tensorflow.keras.models', 'tensorflow'),
    ('tensorflow.keras.applications', 'tensorflow'),
    ('tensorflow.keras.preprocessing.image', 'tensorflow')
]

import importlib
install = False
for package in packages:
    try:
        importlib.import_module(package[0])
    except ImportError:
        print(f'installing package {package[1]}')
        install = True
        !pip install {package[1]} -U -q --user

if install:
    print("Installation of missing packages complete. Please run the next cell to restart the kernel before proceeding.")

### Restart Kernel (If Installs Occurred)
After the kernel restarts, continue from the cell after this one.

In [None]:
if install:
    import IPython
    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

## Setup 

### Set constants

In [None]:
project = !gcloud config get-value project
PROJECT_ID = project[0]
PROJECT_ID

In [None]:
LOCATION = "us-central1"  
REGION = 'us-central1' 

EXPERIMENT = "03"
SERIES = "fruit-and-vegetable-image-model"

BUCKET_NAME = PROJECT_ID 

# data pre-processing
DATASET_VALIDATION_SPLIT = 0.2  # 20% for validation

## custom container
REPO_NAME = "fruit-and-vegetable-image-model-repo"
IMAGE_NAME = "tf_training"
IMAGE_TAG = "latest"
IMAGE_URI = f"{REGION}-docker.pkg.dev/{PROJECT_ID}/{REPO_NAME}/{IMAGE_NAME}:{IMAGE_TAG}"

## model training 
DESIRED_LABELS = [
    'Apple__Healthy', 'Apple__Rotten',
    'Banana__Healthy', 'Banana__Rotten',
    'Bellpepper__Healthy', 'Bellpepper__Rotten'
]
NUM_CLASSES = len(DESIRED_LABELS)

## Vertex AI custom job
MACHINE_TYPE = 'n1-standard-8'
MODEL_URI = "https://tfhub.dev/google/imagenet/resnet_v2_50/classification/5" 

### Packages

In [None]:
# Data Ingestion
from datetime import datetime
import os
from pathlib import Path
import subprocess
import time
import json
import re
import random
import tempfile
import threading
import pandas as pd

from google.cloud import storage
from google.cloud.exceptions import NotFound

from kaggle.api.kaggle_api_extended import KaggleApi

# Data pre-processing
from PIL import Image  # For image loading and preprocessing
from sklearn.model_selection import train_test_split

# Modeling 
import tensorflow as tf
from google.cloud import aiplatform
from concurrent.futures import ThreadPoolExecutor
import numpy as np

### Parameters

In [None]:
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
URI = f"gs://{BUCKET_NAME}/{SERIES}/{EXPERIMENT}" # custom job -> base_output_dir = f"{URI}/models/{TIMESTAMP}",
DIR = f"temp/{EXPERIMENT}"

LOCAL_DATA_DIR = f"{DIR}/data"
LOCAL_CSV_IMAGE_DATA_PATH = f"{LOCAL_DATA_DIR}/labels.csv"

DATASET_CSV = f"{URI}/{TIMESTAMP}/labels.csv"
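The Parameters cell fixes the GCS layout that later cells rely on. As a minimal sketch, the hypothetical `gcs_layout()` helper below collects those paths in one place, reproducing the f-strings this notebook uses for the uploaded images, `labels.csv`, the TFRecords, `label_map.json`, and the custom job's `AIP_OUTPUT_PATH`. It is an illustration only and is not called anywhere else in the notebook.

```python
def gcs_layout(bucket, series, experiment, timestamp):
    """Return the GCS URIs this notebook reads and writes for one run (hypothetical helper)."""
    uri = f"gs://{bucket}/{series}/{experiment}"      # same as URI
    run = f"{uri}/{timestamp}"                        # per-run folder for data artifacts
    return {
        "images": f"{uri}/data/",                     # one sub-folder per label
        "labels_csv": f"{run}/labels.csv",            # same as DATASET_CSV
        "train_tfrecord": f"{run}/train.tfrecord",
        "val_tfrecord": f"{run}/val.tfrecord",
        "label_map": f"{run}/label_map.json",
        "model": f"{uri}/models/{timestamp}/output",  # base_output_dir + "/output"
    }

# Example with placeholder values
for name, path in gcs_layout("my-project", "fruit-and-vegetable-image-model", "03", "20240101120000").items():
    print(f"{name:15} {path}")
```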

### Experiment Tracking 

In [None]:
FRAMEWORK = 'tf'
TASK = 'classification'
MODEL_TYPE = 'tl'
EXPERIMENT_NAME = f'experiment-{SERIES}-{EXPERIMENT}-{FRAMEWORK}-{TASK}-{MODEL_TYPE}'
RUN_NAME = f'run-{TIMESTAMP}'

### Create local directories for staging files

* data files from creating labels.csv
* build files for creating custom container and running a custom job 
* model training output files and example input images for local inference

In [None]:
! rm -rf $LOCAL_DATA_DIR
! mkdir -p $LOCAL_DATA_DIR

In [None]:
if not os.path.exists(f"{DIR}/build"):
    os.makedirs(f"{DIR}/build")

In [None]:
if not os.path.exists(f"{DIR}/output"):
    os.makedirs(f"{DIR}/output")

## Clients 

In [None]:
# Google Cloud Storage client and Vertex AI SDK initialization
storage_client = storage.Client(project=PROJECT_ID)
aiplatform.init(project=PROJECT_ID, location=REGION)

## Create Storage Bucket

In [None]:
def check_and_create_bucket(bucket_name, location):
    try:
        storage_client.get_bucket(bucket_name)
        print(f"Bucket {bucket_name} already exists.")
    except NotFound:
        bucket = storage_client.create_bucket(bucket_or_name=bucket_name, location=location)
        print(f"Bucket {bucket_name} created.")

In [None]:
check_and_create_bucket(BUCKET_NAME, LOCATION)

## Get Data from Kaggle

### Setup Kaggle credentials

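You need a Kaggle account and a `kaggle.json` API credentials file in the directory `/home/jupyter/.config/kaggle`.

Steps:

* Download your credentials file manually from kaggle.com (Settings -> API -> Create New Token).
* In a terminal, from the directory that contains the downloaded file, move it to the expected location and make it readable only by you: `mkdir -p ~/.config/kaggle && mv kaggle.json ~/.config/kaggle/kaggle.json && chmod 600 ~/.config/kaggle/kaggle.json`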
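The terminal steps above can also be done from Python. This is a minimal sketch, assuming the path of the downloaded file is passed in; `install_kaggle_credentials()` is a hypothetical helper, not part of the Kaggle package. The Kaggle client warns when `kaggle.json` is readable by other users, which is why the sketch sets owner-only (`0o600`) permissions; the permission check in the test assumes Linux or macOS.

```python
import shutil
import stat
from pathlib import Path

def install_kaggle_credentials(src, config_dir=None):
    """Move a downloaded kaggle.json into config_dir and restrict it to owner read/write."""
    # Default to the directory used in this notebook: ~/.config/kaggle
    config_dir = Path(config_dir) if config_dir else Path.home() / ".config" / "kaggle"
    config_dir.mkdir(parents=True, exist_ok=True)   # equivalent of `mkdir -p`
    dest = config_dir / "kaggle.json"
    shutil.move(str(src), str(dest))                # equivalent of `mv`
    dest.chmod(stat.S_IRUSR | stat.S_IWUSR)         # equivalent of `chmod 600`
    return dest
```

In the notebook this would be called as `install_kaggle_credentials("kaggle.json")` from the directory holding the downloaded file.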


### Download images 

In [None]:
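# Optional alternative to kaggle.json: supply credentials through environment variables.
# The Kaggle client checks these before kaggle.json, so either replace the placeholders
# with your real values and uncomment them, or leave them commented out.
# os.environ['KAGGLE_USERNAME'] = 'YOUR_KAGGLE_USERNAME'
# os.environ['KAGGLE_KEY'] = 'YOUR_KAGGLE_API_KEY'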

# Initialize the Kaggle API
api = KaggleApi()
api.authenticate()

# Specify the dataset you want to download
dataset_slug = 'muhammad0subhan/fruit-and-vegetable-disease-healthy-vs-rotten'

# Download the dataset
api.dataset_download_files(dataset_slug, path=LOCAL_DATA_DIR, unzip=True)

### Convert images

In [None]:
def convert_image_to_rgb_and_jpeg(image_path):
    """Converts and saves an image to RGB JPEG format, overwriting the original."""
    try:
        img = Image.open(image_path)

        if img.mode != 'RGB':
            img = img.convert('RGB')

        img.save(image_path, format='JPEG')  # Overwrite the original
        # print(f'Converted and saved: {image_path}')

    except Exception as e:
        print(f'Error processing {image_path}: {e}')

def process_directory(root_dir, subdirs_to_convert, max_workers=None):
    """Processes images within specified subdirectories using multithreading."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for root, dirs, files in os.walk(root_dir):
            # Filter directories based on the provided list
            dirs[:] = [d for d in dirs if d in subdirs_to_convert]

            for file in files:
                if file.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif')):  # Add more extensions if needed
                    image_path = Path(root) / file
                    executor.submit(convert_image_to_rgb_and_jpeg, image_path)
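`process_directory()` prunes the walk with `dirs[:] = [...]`. In its default top-down mode, `os.walk` descends into whatever remains in the `dirs` list it just yielded, so the list has to be modified in place; rebinding `dirs` to a new list would not change the traversal. A minimal sketch of the same pattern; `walk_selected()` is a hypothetical helper. As in the original, files sitting directly in the root directory are still yielded.

```python
import os
from pathlib import Path

def walk_selected(root_dir, keep):
    """Yield file paths under root_dir, descending only into sub-directories named in keep."""
    for root, dirs, files in os.walk(root_dir):
        # In-place slice assignment changes which directories os.walk visits next;
        # `dirs = [...]` would only rebind the local name and prune nothing.
        dirs[:] = [d for d in dirs if d in keep]
        for name in files:
            yield Path(root) / name
```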

In [None]:
root_directory = f"{LOCAL_DATA_DIR}/Fruit And Vegetable Diseases Dataset"
subdirectories_to_convert = DESIRED_LABELS

process_directory(root_directory, subdirectories_to_convert)

## Load to GCS

In [None]:
# Loop over each subdirectory (label) and copy the contents using gsutil
for subdir in DESIRED_LABELS:
    source = f'"{LOCAL_DATA_DIR}/Fruit And Vegetable Diseases Dataset/{subdir}/*"'
    destination = f"{URI}/data/{subdir}/"
    print(destination)
    command = f"gsutil -m cp -r {source} {destination} > /dev/null 2>&1"
    
    # Execute the command using subprocess
    process = subprocess.run(command, shell=True)
    
    if process.returncode == 0:
        print(f"Successfully copied {subdir}")
    else:
        print(f"Failed to copy {subdir}")

## Prepare data 

### Create CSV labels file and upload it for model training

Create a CSV file called `labels.csv` with one row per image in the form `gs://<bucket>/<path>/<filename>.jpg,<label>`.

The file has no header row and is stored in GCS.
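A minimal pure-Python sketch of the row format, assuming the bucket layout created by the Load to GCS step (`.../data/<label>/<file>`); `label_from_gcs_uri()` and `labels_csv()` are hypothetical helpers. Unlike the regex filter in `create_dataframe()`, this sketch compares the parent folder name exactly against the wanted labels.

```python
import csv
import io

def label_from_gcs_uri(uri):
    """The label is the image's parent folder: gs://<bucket>/.../<label>/<file>.jpg."""
    return uri.split("/")[-2]

def labels_csv(uris, keep_labels):
    """Build header-less `uri,label` CSV text for .jpg files whose label is in keep_labels."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    for uri in uris:
        label = label_from_gcs_uri(uri)
        if uri.endswith(".jpg") and label in keep_labels:
            writer.writerow([uri, label])
    return buf.getvalue()
```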

In [None]:
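def get_file_list(bucket_name, prefix=None):
    """Returns gs:// URIs for the objects in the bucket, optionally limited to a prefix."""
    bucket = storage_client.bucket(bucket_name)
    blobs = bucket.list_blobs(prefix=prefix)
    file_list = ['gs://' + bucket_name + '/' + blob.name for blob in blobs]
    
    return file_list

In [None]:
# list only this experiment's images so copies under other experiments in the bucket are not picked up
file_list = get_file_list(BUCKET_NAME, prefix=f"{SERIES}/{EXPERIMENT}/data/")
file_list[:10]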

In [None]:
def create_dataframe(file_list, filter_pattern):
    # keep only .jpg files
    image_files = [file for file in file_list if file.endswith('.jpg')]
    df = pd.DataFrame(image_files, columns=['filename'])
    
    ## keep only the labels in DESIRED_LABELS (3 foods, healthy vs rotten) for demonstration purposes
    df = df[df['filename'].str.contains(filter_pattern, regex=True)].copy()
    
    # the label is the image's parent folder: gs://<bucket>/<SERIES>/<EXPERIMENT>/data/<label>/<file>.jpg
    df['label'] = df['filename'].apply(lambda x: x.split('/')[-2])
    
    return df

In [None]:
pd.options.display.max_colwidth = 100 # set option to view long strings 

df_labels = create_dataframe(file_list, 
                             filter_pattern = '|'.join(DESIRED_LABELS))
df_labels.head()

In [None]:
df_labels.shape[0]

In [None]:
df_labels['label'].value_counts()

### Save labels.csv

Save `labels.csv` locally and upload it to the GCS bucket for use in Vertex AI training in the next steps.

In [None]:
## DEV - use only 100 images for dev
# df_labels.head(100).to_csv(LOCAL_CSV_IMAGE_DATA_PATH, index=False, header=False)
df_labels.to_csv(LOCAL_CSV_IMAGE_DATA_PATH, index=False, header=False)

In [None]:
bucket = storage_client.bucket(BUCKET_NAME)
blob = bucket.blob(f"{SERIES}/{EXPERIMENT}/{TIMESTAMP}/labels.csv")  # same object as DATASET_CSV
blob.upload_from_filename(LOCAL_CSV_IMAGE_DATA_PATH)

## Model Training

### Data pre-processing

In [None]:
# --- Preprocessing ---
def _bytes_feature(value):
    """Returns a bytes_list from a string / byte."""
    if isinstance(value, type(tf.constant(0))):
        value = value.numpy()
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def _int64_feature(value):
    """Returns an int64_list from a bool / enum / int / uint."""
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

def create_tfrecord_from_gcs(gcs_path, label, tfrecord_writer, storage_client, label_map):
    """Creates a TFRecord example from an image in GCS and writes it to the writer.

    Errors are raised to the caller, which records them so a partial file is not uploaded.
    """
    bucket_name, blob_name = gcs_path[5:].split('/', 1)  # strip "gs://", split bucket from object path
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    
    with tempfile.NamedTemporaryFile() as temp_file:
        blob.download_to_filename(temp_file.name)
        with open(temp_file.name, 'rb') as image_file:
            image_string = image_file.read()

    # Convert label to integer using the label map
    label_int = label_map[label]

    feature = {
        'image/encoded': _bytes_feature(image_string),
        'image/class/label': _int64_feature(label_int)
    }
    example = tf.train.Example(features=tf.train.Features(feature=feature))
    tfrecord_writer.write(example.SerializeToString())


def create_and_upload_tfrecord(split_name, df, bucket, storage_client, label_map):
    """Creates a TFRecord file from the DataFrame and uploads it to GCS."""
    blob_name = f"{SERIES}/{EXPERIMENT}/{TIMESTAMP}/{split_name}.tfrecord"
    blob = bucket.blob(blob_name)
    writer_lock = threading.Lock()
    error_occurred = False  # Flag to track errors

    with tempfile.NamedTemporaryFile() as temp_file:
        writer = tf.io.TFRecordWriter(temp_file.name)
        for row in df.itertuples():
            try:
                create_tfrecord_from_gcs(row.image_path, row.label, writer, storage_client, label_map)
            except Exception as e:
                print(f"Error processing {row.image_path}: {e}")
                error_occurred = True  # Set the flag if an error occurs

        writer.close()

        # Check if any errors occurred before uploading
        if not error_occurred:
            blob.upload_from_filename(temp_file.name, timeout=600) # to avoid any connection timeout issues
            print(f"Uploaded {blob_name} to GCS.")
        else:
            print(f"Not uploading {blob_name} due to errors during TFRecord creation.")

# --- Main Preprocessing Function ---
def preprocess_data():
    """Reads CSV, splits data, creates label map, and creates/uploads TFRecords."""
    df = pd.read_csv(DATASET_CSV, header=None, names=['image_path', 'label'])
    train_df, val_df = train_test_split(df, test_size=DATASET_VALIDATION_SPLIT, random_state=42)

    # Create a label map (dictionary mapping labels to integer IDs)
    unique_labels = df['label'].unique()
    label_map = {label: i for i, label in enumerate(unique_labels)}

    storage_client = storage.Client(project=PROJECT_ID)
    bucket = storage_client.bucket(BUCKET_NAME)

    # save label map to GCS for use in prediction later 
    blob = bucket.blob(f"{SERIES}/{EXPERIMENT}/{TIMESTAMP}/label_map.json")  # Specify GCS path for label_map.json here
    blob.upload_from_string(json.dumps(label_map), content_type='application/json')

    create_and_upload_tfrecord('train', train_df, bucket, storage_client, label_map)
    create_and_upload_tfrecord('val', val_df, bucket, storage_client, label_map)
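`preprocess_data()` assigns integer IDs in the order `df['label'].unique()` returns them, that is, by first appearance in `labels.csv`. A minimal alternative sketch, not the notebook's method: sort the labels first so the same set of labels always gets the same IDs, then confirm the mapping survives the JSON round trip used for `label_map.json`. `build_label_map()` is a hypothetical name.

```python
import json

def build_label_map(labels):
    """Map each distinct label to an integer ID in sorted order, independent of row order."""
    return {label: i for i, label in enumerate(sorted(set(labels)))}

label_map = build_label_map(["Banana__Rotten", "Apple__Healthy", "Banana__Rotten", "Apple__Rotten"])
print(label_map)

# JSON keeps string keys and integer values, so the map reloads unchanged
restored = json.loads(json.dumps(label_map))
```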

### Execute data pre-processing 

Create the `train` and `val` TFRecord files (and `label_map.json`) in the GCS bucket.

In [None]:
preprocess_data() 

## Create Training Script


In [None]:
with open(f"{DIR}/build/train.py", 'w') as f:
    f.write("""import os
import tensorflow as tf
import tensorflow_hub as hub

# Get Environment Variables
TFRECORD_PATH = os.environ['AIP_TFRECORD_PATH']
VALIDATION_PATH = os.environ['AIP_VALIDATION_PATH']
OUTPUT_PATH = os.environ['AIP_OUTPUT_PATH']
MODEL_URI = os.environ['MODEL_URI']
NUM_CLASSES = int(os.environ['NUM_CLASSES'])  # env var values are strings; Dense() needs an int

# sanity check TF version
print(f"TensorFlow version: {tf.__version__}")

# Load the TFRecords
def parse_tfrecord(example):
    feature_description = {
        'image/encoded': tf.io.FixedLenFeature([], tf.string),
        'image/class/label': tf.io.FixedLenFeature([], tf.int64),
    }
    example = tf.io.parse_single_example(example, feature_description)
    image = tf.io.decode_jpeg(example['image/encoded'], channels=3)
    image = tf.image.resize(image, [224, 224])  # Resize to match model input
    image = tf.cast(image, tf.float32) / 255.0  # Normalize to [0, 1]
    label = tf.cast(example['image/class/label'], tf.int32)
    return image, label


def get_dataset(filename):
    return tf.data.TFRecordDataset(filename).map(parse_tfrecord).shuffle(1000).batch(32).prefetch(1)

train_dataset = get_dataset(TFRECORD_PATH)
val_dataset = get_dataset(VALIDATION_PATH)

# Load the TensorFlow Hub model
model = hub.KerasLayer(MODEL_URI, trainable=False)

# Add your custom classification head
# CHECK - ensure this is same number of classes in your dataset ###########################
print(f"NUM_CLASSES: {NUM_CLASSES}")

model = tf.keras.Sequential([
    model,
    tf.keras.layers.Dense(NUM_CLASSES, activation='softmax')
])

# Compile the model
model.compile(
    optimizer='adam',
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),
    metrics=['accuracy']
)

# Train the model
model.fit(train_dataset, epochs=10, validation_data=val_dataset)

# Save the model
model.save(OUTPUT_PATH)
""")

### Create Dockerfile

Refs:

* https://cloud.google.com/vertex-ai/docs/training/pre-built-containers

In [None]:
with open(f"{DIR}/build/Dockerfile", 'w') as f:
    f.write("""FROM us-docker.pkg.dev/vertex-ai/training/tf-cpu.2-11:latest

# Set the working directory
WORKDIR /root

# Install dependencies
RUN pip install tensorflow-hub==0.12.0

# Copy the training script
COPY train.py /root/train.py

# Define the entry point
ENTRYPOINT ["python", "train.py"]
""")

## Build Container Image

### Create docker repository


In [None]:
!gcloud artifacts repositories create {REPO_NAME} --repository-format=docker --location={REGION} --description="Docker repository for fruit and vegetable image model training on Vertex AI"

### Configure auth for Docker

Before you push or pull container images, configure Docker to use the `gcloud` command-line tool to authenticate requests to `Artifact Registry` for your region.

In [None]:
!gcloud auth configure-docker {REGION}-docker.pkg.dev --quiet

### Build container image 

In [None]:
!gcloud builds submit --region={REGION} --tag={IMAGE_URI} --timeout=1h ./{DIR}/build 

## Custom Job Definition 

Refs 

* [Create custom training jobs  |  Vertex AI  |  Google Cloud](https://cloud.google.com/vertex-ai/docs/training/create-custom-job#create_custom_job-python_vertex_ai_sdk)
* customJob - https://cloud.google.com/vertex-ai/docs/reference/rest/v1/projects.locations.customJobs#CustomJob
    * customJobSpec - `baseOutputDirectory` https://cloud.google.com/vertex-ai/docs/reference/rest/v1/CustomJobSpec
* other example: https://github.com/statmike/vertex-ai-mlops/blob/main/05%20-%20TensorFlow/05c%20-%20Vertex%20AI%20Custom%20Model%20-%20TensorFlow%20-%20Custom%20Job%20With%20Custom%20Container.ipynb
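The `worker_pool_specs` passed to `aiplatform.CustomJob` below is a list of plain dictionaries, and each container environment variable is a `{"name": ..., "value": ...}` pair whose value is a string. A minimal sketch of assembling that structure from an ordinary dict; `to_env_list()` and `worker_pool_spec()` are hypothetical helpers that mirror the fields used in `create_custom_job()`, not Vertex AI SDK functions, and the image URI is a placeholder.

```python
def to_env_list(env):
    """Convert {"NAME": value} to the [{"name": ..., "value": ...}] list used in container_spec."""
    # Environment variable values are strings, so numbers such as NUM_CLASSES are cast with str()
    return [{"name": name, "value": str(value)} for name, value in env.items()]

def worker_pool_spec(image_uri, machine_type, env, replica_count=1):
    """Build a single-pool worker_pool_specs list with the same fields as create_custom_job()."""
    return [{
        "machine_spec": {"machine_type": machine_type},
        "replica_count": replica_count,
        "container_spec": {"image_uri": image_uri, "env": to_env_list(env)},
    }]

spec = worker_pool_spec(
    image_uri="us-central1-docker.pkg.dev/my-project/my-repo/tf_training:latest",  # placeholder
    machine_type="n1-standard-8",
    env={"MODEL_URI": "https://tfhub.dev/google/imagenet/resnet_v2_50/classification/5", "NUM_CLASSES": 6},
)
print(spec[0]["container_spec"]["env"])
```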

In [None]:
def create_custom_job(project_id, 
                      region, 
                      display_name, 
                      bucket_name,
                      base_output_dir,
                      machine_type,
                      labels
                     ):
    
    worker_pool_specs = [
        {
            "machine_spec": {
                "machine_type": machine_type,
            },
            "replica_count": 1,
            "container_spec": {
                "image_uri": IMAGE_URI,
                "env": [  
                    {"name": "AIP_TFRECORD_PATH", "value": f"{URI}/{TIMESTAMP}/train.tfrecord"},
                    {"name": "AIP_VALIDATION_PATH", "value": f"{URI}/{TIMESTAMP}/val.tfrecord"},
                    {"name": "AIP_OUTPUT_PATH", "value": f"{base_output_dir}/output"},
                    {"name": "AIP_MODEL_DIR", "value": f"{base_output_dir}/output"},  # <- important! the model output location
                    {"name": "TFHUB_CACHE_DIR", "value": f"{base_output_dir}/tfhub_cache"}, 
                    {"name": "MODEL_URI", "value": MODEL_URI},
                    {"name": "NUM_CLASSES", "value": str(NUM_CLASSES)},
                ],
            },
        }
    ]

    custom_job = aiplatform.CustomJob(
        project=project_id,
        location=region,
        display_name=display_name,
        staging_bucket=bucket_name,
        base_output_dir=base_output_dir,
        worker_pool_specs=worker_pool_specs,
        labels=labels
    )
    
    return custom_job

### Submit Job

In [None]:
custom_job = create_custom_job(
    project_id=PROJECT_ID,
    region=REGION,
    display_name = f'{SERIES}_{EXPERIMENT}_{TIMESTAMP}',
    bucket_name = BUCKET_NAME,  # aka - staging_bucket 
    base_output_dir = f"{URI}/models/{TIMESTAMP}", 
    machine_type=MACHINE_TYPE,
    labels = {'series' : f'{SERIES}', 'experiment' : f'{EXPERIMENT}', 'run_name' : f'{RUN_NAME}'}
)

In [None]:
# Submit the custom job to Vertex AI
custom_job.run()

## Download Model for local inference

### 1. Helper functions

Helpers for downloading the trained model and a random sample image, and for preprocessing the image before prediction.
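`preprocess_image()` has to match the preprocessing in `train.py`: RGB, 224x224, float32 scaled to [0, 1]. The model also expects a leading batch axis, which the notebook adds with `np.expand_dims` just before prediction. A minimal NumPy-only sketch of the scaling and batching step, assuming the image has already been resized to an `(H, W, 3)` uint8 array; `to_model_batch()` is a hypothetical helper. Resizing is left out because PIL and `tf.image.resize` may use different interpolation, which can shift pixel values slightly.

```python
import numpy as np

def to_model_batch(rgb_uint8):
    """Scale an (H, W, 3) uint8 RGB array to [0, 1] float32 and add a leading batch axis."""
    image = np.asarray(rgb_uint8, dtype=np.float32) / 255.0  # same scaling as parse_tfrecord in train.py
    return np.expand_dims(image, axis=0)                     # shape (1, H, W, 3)
```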

In [None]:
def download_blobs_with_prefix(bucket_name, prefix, local_directory):
    
    bucket = storage_client.bucket(bucket_name)
    blobs = bucket.list_blobs(prefix=prefix)

    for blob in blobs:
        # Skip "directory" objects
        if blob.name.endswith("/"):
            continue

        # Calculate the relative path within the prefix
        relative_path = blob.name[len(prefix):] 

        # Create the local directory for the relative path
        local_file_directory = os.path.join(local_directory, os.path.dirname(relative_path))
        os.makedirs(local_file_directory, exist_ok=True)

        # Download the blob
        local_file_path = os.path.join(local_directory, relative_path)
        blob.download_to_filename(local_file_path)
        print(f"Blob {blob.name} downloaded to {local_file_path}.")

        
def download_random_jpg(bucket_name, pattern):

    bucket = storage_client.bucket(bucket_name)
    # Get list of blobs (files) with the pattern
    blobs = [blob for blob in bucket.list_blobs() if re.search(pattern, blob.name)]
    
    if not blobs:
        print("No files found with the pattern:", pattern)
        return None
    
    # Choose a random blob
    random_blob = random.choice(blobs)

    # Download the blob
    local_filename = random_blob.name 
    local_directory = os.path.dirname(local_filename)
    os.makedirs(local_directory, exist_ok=True)  # Ensure directory exists
    
    random_blob.download_to_filename(local_filename)
    print(f"Downloaded {local_filename} from bucket {bucket_name}")

    return local_filename


def preprocess_image(image_path, target_size=(224, 224)):
    """Preprocesses an image for model prediction."""
    img = Image.open(image_path).convert('RGB')  # Ensure RGB format
    img = img.resize(target_size)
    img_array = np.array(img, dtype=np.float32) / 255.0  # Normalize & set to float32
    return img_array  # shape (224, 224, 3); the batch dimension is added just before prediction

### 2. Download the model

In [None]:
download_blobs_with_prefix(bucket_name=BUCKET_NAME, 
                           prefix=f"{SERIES}/{EXPERIMENT}/models/{TIMESTAMP}/output/", 
                           local_directory=f"{DIR}/output")

### 3. Load the model

In [None]:
model = tf.saved_model.load(f"{DIR}/output")

### 4. Prepare an image for prediction

#### download a random image 

Filter to only 3 foods for demonstration purposes 
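The pattern passed to `download_random_jpg()` decides which objects can be sampled. A negative lookahead placed right after the label, such as `(?!\.png$)`, only inspects the characters immediately following the label (the `/` of the folder path), so it never rejects a `.png` file. Anchoring the pattern on the file name instead restricts matches to `.jpg` files directly inside a label folder, consistent with the `.jpg`-only filter used when building `labels.csv`. A small check with Python's `re` module, using a shortened, illustrative label list:

```python
import re

LABELS = ["Apple__Healthy", "Apple__Rotten", "Banana__Healthy"]  # illustrative subset

# Lookahead right after the label: sees "/x.png", not ".png", so it always passes
lookahead = re.compile(rf'({"|".join(LABELS)})(?!\.png$)')
# Anchored on the file name: label folder, then a file name ending in .jpg
jpg_only = re.compile(rf'({"|".join(map(re.escape, LABELS))})/[^/]+\.jpg$')

for path in ["data/Apple__Healthy/x.png", "data/Apple__Healthy/x.jpg"]:
    print(path, bool(lookahead.search(path)), bool(jpg_only.search(path)))
```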

In [None]:
# same set of labels as before
downloaded_file = download_random_jpg(
    bucket_name=BUCKET_NAME, 
    pattern=rf'({"|".join(DESIRED_LABELS)})/[^/]+\.jpg$')  # only .jpg files directly inside a label folder

In [None]:
## display image to sanity check
display(Image.open(downloaded_file))

In [None]:
## and pre-process image for prediction
preprocessed_image = preprocess_image(downloaded_file)
# preprocessed_image

### 5. Make a prediction

In [None]:
# Add batch dimension
preprocessed_image = np.expand_dims(preprocessed_image, axis=0) 

# Get predictions; the model's last layer is already a softmax, so its outputs are class probabilities
class_probabilities = model(preprocessed_image)
predicted_class = tf.argmax(class_probabilities, axis=-1).numpy()[0]  # index of the highest probability

print(f"Predicted class: {predicted_class}")
print(f"Class probabilities: {class_probabilities}")

#### Get predicted class 

Finally, download `label_map.json` to look up the name of the predicted class, so the output is human-readable.
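The next cell finds the class name by scanning `label_map` with a list comprehension. A minimal alternative sketch: invert the map once (ID to name) and take the argmax of one probability row. `predicted_label()` is a hypothetical helper and assumes `label_map` is the dict loaded from `label_map.json` (label names mapped to integer IDs).

```python
def predicted_label(probabilities, label_map):
    """Return (label, probability) for the highest-scoring class in one probability row."""
    index_to_label = {index: label for label, index in label_map.items()}  # invert once
    best = max(range(len(probabilities)), key=lambda i: probabilities[i])   # argmax
    return index_to_label[best], probabilities[best]
```

In the notebook it could be called as `predicted_label(class_probabilities.numpy()[0], label_map)`.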

In [None]:
bucket = storage_client.bucket(BUCKET_NAME)
blob = bucket.blob(f"{SERIES}/{EXPERIMENT}/{TIMESTAMP}/label_map.json")
label_map_json_string = blob.download_as_string()
label_map = json.loads(label_map_json_string)

predicted_class_index = tf.argmax(class_probabilities, axis=-1).numpy()[0]  # Extract scalar value
# Get the predicted class name (assuming you have predicted_class_index)
predicted_class_name = [label for label, index in label_map.items() if index == predicted_class_index][0]
print(f"Predicted Class Name: {predicted_class_name}")

### Cleanup downloaded image

Delete the downloaded image file to keep the local directory clean.

In [None]:
if os.path.exists(downloaded_file):  # Check if the file exists
    os.remove(downloaded_file)
    print(f"Deleted downloaded image file: {downloaded_file}")
else:
    print(f"Downloaded image file not found: {downloaded_file}")