In [9]:
import digitalhub as dh

!pip install -r requirements.txt

Collecting git+https://github.com/fpaissan/micromind (from -r requirements.txt (line 4))
  Cloning https://github.com/fpaissan/micromind to /tmp/pip-req-build-ign29buv
  Running command git clone --filter=blob:none --quiet https://github.com/fpaissan/micromind /tmp/pip-req-build-ign29buv
  Resolved https://github.com/fpaissan/micromind to commit 8d60ec970ccf19db2b802d96916897b6e7f76b34
  Installing build dependencies ... [?25ldone
[?25h  Getting requirements to build wheel ... [?25ldone
[?25h  Preparing metadata (pyproject.toml) ... [?25ldone


In [2]:
# Handle to the digitalhub project named "object_detection", obtained through
# get_or_create_project; later cells register and run functions on it.
project_object_detection = dh.get_or_create_project(name="object_detection")

# Simple YOLO Model training

In [12]:
%%writefile functions/train_model.py

from ultralytics import YOLO

def _resolve_trained_weights(model, fallback):
    """Return the checkpoint produced by training, or ``fallback`` if none exists.

    Looks at ``model.trainer.best`` and then ``model.trainer.last`` and returns
    the first one that is present on disk, as a string.
    NOTE(review): assumes the ultralytics model exposes a ``trainer`` with
    ``best``/``last`` checkpoint paths after ``train()`` - confirm for the
    pinned ultralytics version; ``fallback`` is used when they are missing.
    """
    import os

    trainer = getattr(model, "trainer", None)
    for attr in ("best", "last"):
        checkpoint = getattr(trainer, attr, None)
        if checkpoint is not None and os.path.exists(checkpoint):
            return str(checkpoint)
    return fallback


def train_yolo_model(project, context, params:dict):
    """Fine-tune a YOLO model and log the resulting weights to the project.

    Arguments
    ---------
    project :
        Object exposing ``log_model``; receives the trained model entry.
    context :
        Not used by this function.
    params : dict
        Must contain ``"base_model"`` (weights passed to ``YOLO``), ``"data"``,
        ``"epochs"`` and ``"imgsz"`` (forwarded to ``model.train``).

    Raises
    ------
    KeyError
        If one of the required keys is missing from ``params``.
    """
    # Load a pretrained model
    model = YOLO(params["base_model"])

    # Train the model
    model.train(data=params["data"], epochs=params["epochs"], imgsz=params["imgsz"])

    # Log the weights produced by this run rather than the pretrained file;
    # fall back to the base model only when no checkpoint can be located.
    trained_weights = _resolve_trained_weights(model, params["base_model"])
    project.log_model(
            name="yolo_object_detection",
            kind="model",
            framework="ultralytics",
            algorithm="YOLO",
            source=trained_weights,
            metrics={}
    )

Overwriting functions/train_model.py


In [13]:
# Register the handler `train_yolo_model` from functions/train_model.py as a
# Python function on the project, together with the packages it needs.
# NOTE(review): the training run in this notebook passes
# base_model="yolo11n.pt"; YOLO11 weights appear to require a newer ultralytics
# release than the 8.0.215 pinned below - confirm that the pin and the base
# model are compatible before running.
model_fit_fn = project_object_detection.new_function(name="object_detection_yolo",
                                kind="python",
                                python_version="PYTHON3_10",
                                code_src="functions/train_model.py",
                                handler="train_yolo_model",
                                requirements=["requests==2.25.1", "opencv-python", "ultralytics==8.0.215", "git+https://github.com/fpaissan/micromind"])

In [15]:
# Arguments handed to the training handler: dataset config, number of epochs,
# input image size and the pretrained weights to start from.
params = dict(
    data='Argoverse.yaml',
    epochs=1,
    imgsz=640,
    base_model="yolo11n.pt",
)

# Execute the registered function as a job with those arguments.
train_run = model_fit_fn.run(parameters=params, action="job")

# Data Drift Detector

In [2]:
import numpy as np
import torch
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
from alibi_detect.cd import MMDDrift
from alibi_detect.utils.saving import save_detector, load_detector
from PIL import Image
import os
import shutil

# Load CIFAR-10 dataset as the reference dataset
def load_cifar10_data(root='./data', n_samples=1000, seed=None):
    """Draw one shuffled batch of CIFAR-10 training images as a numpy array.

    Arguments
    ---------
    root : str
        Directory where the dataset is stored (downloaded there if missing).
    n_samples : int
        Batch size of the loader, i.e. the maximum number of images returned.
    seed : int or None
        When given, seeds the shuffling so the same images are drawn on every
        call; ``None`` keeps the original unseeded behaviour.

    Returns
    -------
    X_ref : numpy.ndarray
        Images of the first batch produced by the loader.
    """
    transform = transforms.Compose([
        transforms.Resize((32, 32)),  # Resize images to a consistent size
        transforms.ToTensor(),        # Convert images to PyTorch tensors
    ])
    dataset = CIFAR10(root=root, train=True, download=True, transform=transform)

    # A dedicated generator keeps the draw reproducible without touching the
    # global torch RNG state.
    generator = None
    if seed is not None:
        generator = torch.Generator()
        generator.manual_seed(seed)

    dataloader = DataLoader(
        dataset, batch_size=n_samples, shuffle=True, generator=generator
    )
    X_ref = next(iter(dataloader))[0].numpy()  # Extract the first batch of images
    return X_ref

# Load the reference sample the detector is fitted on
X_ref = load_cifar10_data()  # Reference dataset (CIFAR-10)

# Initialize the MMD drift detector
mmd_detector = MMDDrift(
    X_ref,
    p_val=0.05,              # Significance level for drift detection
    backend='tensorflow',    # TensorFlow backend. NOTE(review): this comment previously said PyTorch - confirm which backend is intended
    n_permutations=100       # Number of permutations for p-value calculation
)
# Save the detector to a folder for future use
folder_detector = "mmd_drift_detector" 
save_detector(mmd_detector, folder_detector)
output_zip = "mmd_drift_detector_backup"

# Create a zip archive of the saved detector folder; make_archive returns the
# archive path, which becomes this cell's output.
shutil.make_archive(output_zip, 'zip', folder_detector)
# NOTE(review): the disabled call below points at a ts_clustering JSON file and
# uses `project` and `n_clusters`, neither of which is defined in this cell -
# update the source to the archive created above before enabling it.
#project.log_artifact(name="drift_detector",
#                    kind="artifact",
#                    source=f"model/ts_clustering_{n_clusters}_clusters.json")

'/home/acelepija/aipc-model/aipc_examples/object_detection_aipc/implementation/src/mmd_drift_detector_backup.zip'

In [None]:
%%writefile functions/data_drift_monitor.py

import os
import shutil

import torch
import torchvision.transforms as transforms
from alibi_detect.utils.saving import load_detector
from PIL import Image

# Load COCO8 dataset as the test dataset
def load_coco8_data(coco8_path):
    """Load every image in a folder as a 32x32 RGB array batch.

    Files are visited in sorted name order so the result is deterministic, and
    entries without a known image extension are skipped.

    Arguments
    ---------
    coco8_path : str
        Directory containing the image files.

    Returns
    -------
    X_test : numpy.ndarray
        Stack of the transformed images (expected shape ``(N, 3, 32, 32)``).

    Raises
    ------
    RuntimeError
        If the directory contains no image files.
    """
    image_suffixes = (".bmp", ".jpeg", ".jpg", ".png", ".tif", ".tiff", ".webp")
    transform = transforms.Compose([
        transforms.Resize((32, 32)),  # Resize images to match CIFAR-10 size
        transforms.ToTensor(),        # Convert images to PyTorch tensors
    ])
    images = []
    for img_name in sorted(os.listdir(coco8_path)):
        if not img_name.lower().endswith(image_suffixes):
            continue  # ignore labels, caches and other non-image entries
        img_path = os.path.join(coco8_path, img_name)
        # The context manager closes the underlying file once it is converted.
        with Image.open(img_path) as img:
            images.append(transform(img.convert('RGB')))  # Ensure 3 channels (RGB)
    if not images:
        raise RuntimeError(f"No image files found in {coco8_path!r}")
    X_test = torch.stack(images).numpy()  # Convert list of tensors to numpy array
    return X_test


def check_data_drift(X_test=None, zip_file="mmd_drift_detector_backup.zip",
                     extract_to="mmd_drift_detector",
                     coco8_path='./data/coco8/images/train'):
    """Unpack the saved drift detector and test a batch of images for drift.

    Arguments
    ---------
    X_test : array-like or None
        Data to test. When ``None`` the images in ``coco8_path`` are loaded
        with ``load_coco8_data``.
    zip_file : str
        Archive containing the saved detector.
    extract_to : str
        Directory the archive is unpacked into and the detector is loaded from.
    coco8_path : str
        Image folder used when ``X_test`` is not supplied.

    Returns
    -------
    The ``is_drift`` entry of the detector's prediction.
    """
    # Extract the zip file
    shutil.unpack_archive(zip_file, extract_to)

    # Only fall back to the bundled image folder when the caller gave no data.
    if X_test is None:
        X_test = load_coco8_data(coco8_path)

    # Load the detector
    loaded_detector = load_detector(extract_to)
    # Check for drift on the test data with the detector that was just loaded
    preds = loaded_detector.predict(X_test)

    # Output the results
    print(f"Drift detected: {preds['data']['is_drift']}")
    print(f"p-value: {preds['data']['p_val']}")
    print(f"MMD statistic: {preds['data']['distance']}")
    print(preds)
    return preds['data']['is_drift']

# PhiNet YOLO Model training

In [None]:
%% writefile function/train_model_phinet.py

import torch
import torch.nn as nn
from prepare_data import create_loaders, setup_mixup
from timm.loss import (
    BinaryCrossEntropy,
    LabelSmoothingCrossEntropy,
    SoftTargetCrossEntropy,
)

import micromind as mm
from micromind.networks import PhiNet, XiNet
from micromind.utils import parse_configuration
import sys


class ImageClassification(mm.MicroMind):
    """Image classifier backed by a PhiNet or XiNet network.

    Mixup/cutmix augmentation and the choice of training loss (including the
    timm loss functions) are controlled through the hyper-parameters."""

    def __init__(self, hparams, *args, **kwargs):
        super().__init__(hparams, *args, **kwargs)

        backbone = hparams.model
        if backbone == "phinet":
            network = PhiNet(
                input_shape=hparams.input_shape,
                alpha=hparams.alpha,
                num_layers=hparams.num_layers,
                beta=hparams.beta,
                t_zero=hparams.t_zero,
                compatibility=False,
                divisor=hparams.divisor,
                downsampling_layers=hparams.downsampling_layers,
                return_layers=hparams.return_layers,
                # head used for classification
                include_top=True,
                num_classes=hparams.num_classes,
            )
            self.modules["classifier"] = network
        elif backbone == "xinet":
            network = XiNet(
                input_shape=hparams.input_shape,
                alpha=hparams.alpha,
                gamma=hparams.gamma,
                num_layers=hparams.num_layers,
                return_layers=hparams.return_layers,
                # head used for classification
                include_top=True,
                num_classes=hparams.num_classes,
            )
            self.modules["classifier"] = network

        # Only the first value returned by setup_mixup is kept.
        self.mixup_fn, _ = setup_mixup(hparams)

        print("Number of parameters for each module:")
        print(self.compute_params())

        print("Number of MAC for each module:")
        print(self.compute_macs(hparams.input_shape))

    def setup_criterion(self):
        """Pick the training loss that matches the augmentation settings."""
        hp = self.hparams

        mixing_enabled = (
            hp.mixup > 0 or hp.cutmix > 0.0 or hp.cutmix_minmax is not None
        )
        if mixing_enabled:
            # With mixup/cutmix the target transform already yields sparse,
            # soft targets, so smoothing is not applied again here.
            if hp.bce_loss:
                return BinaryCrossEntropy(target_threshold=hp.bce_target_thresh)
            return SoftTargetCrossEntropy()

        if hp.smoothing:
            if hp.bce_loss:
                return BinaryCrossEntropy(
                    smoothing=hp.smoothing,
                    target_threshold=hp.bce_target_thresh,
                )
            return LabelSmoothingCrossEntropy(smoothing=hp.smoothing)

        return nn.CrossEntropyLoss()

    def forward(self, batch):
        """Run the classifier on one batch.

        Arguments
        ---------
        batch : List[torch.Tensor, torch.Tensor]
            Images and their labels.

        Returns
        -------
        Classifier output and (possibly mixed) target. :
            Tuple[torch.Tensor, torch.Tensor]
        """
        images, labels = batch
        if not self.hparams.prefetcher:
            # Without a prefetcher, device placement and mixup happen here.
            images, labels = images.to(self.device), labels.to(self.device)
            if self.mixup_fn is not None:
                images, labels = self.mixup_fn(images, labels)

        logits = self.modules["classifier"](images)
        return (logits, labels)

    def compute_loss(self, pred, batch):
        """Build the criterion and evaluate it on the forward output.

        Arguments
        ---------
        pred : Tuple[torch.Tensor, torch.Tensor]
            Classifier output and target, as returned by the forward step.
        batch : List[torch.Tensor, torch.Tensor]
            The batch that was given to the forward step.

        Returns
        -------
        Loss value. : torch.Tensor
        """
        self.criterion = self.setup_criterion()

        # The target is read from pred, not batch, since it may have been
        # altered by the augmentation in forward.
        logits, target = pred[0], pred[1]
        return self.criterion(logits, target)

    def configure_optimizers(self):
        """Create the Adam optimizer over all module parameters."""
        return torch.optim.Adam(
            self.modules.parameters(), lr=3e-4, weight_decay=0.0005
        )


def top_k_accuracy(k=1):
    """
    Builds a metric function computing the top-K accuracy.

    Arguments
    ---------
    k : int
       Number of top elements to consider for accuracy. Values larger than
       the number of classes are clamped to the number of classes.

    Returns
    -------
        accuracy : Callable
            Function ``acc(pred, batch)`` returning the top-K accuracy as a
            one-element tensor on the device of ``pred[0]``.
    """

    def acc(pred, batch):
        scores, target = pred[0], pred[1]
        # Two-dimensional targets (one row of class weights per sample) are
        # reduced to the index of their largest entry.
        if target.ndim == 2:
            target = target.argmax(1)
        # torch.topk raises when k exceeds the class dimension, so clamp it;
        # with k >= number of classes every sample counts as correct.
        top = min(k, scores.size(1))
        _, indices = torch.topk(scores, top, dim=1)
        correct = torch.sum(indices == target.view(-1, 1))
        accuracy = correct.item() / target.size(0)

        return torch.Tensor([accuracy]).to(scores.device)

    return acc

def train_model_phinet(project, cfg_file):
    """Train an ImageClassification model from a configuration file, then test it.

    Arguments
    ---------
    project :
        Not used by this function.
    cfg_file :
        Configuration passed to ``parse_configuration`` to obtain the
        hyper-parameters.
    """
    hparams = parse_configuration(cfg_file)

    loaders = create_loaders(hparams)
    train_loader, val_loader = loaders

    # Experiment folder and checkpointer (keyed on "loss") for this run.
    ckpt_utils = mm.utils.checkpointer
    run_folder = ckpt_utils.create_experiment_folder(
        hparams.output_folder, hparams.experiment_name
    )
    checkpointer = ckpt_utils.Checkpointer(run_folder, hparams=hparams, key="loss")

    classifier = ImageClassification(hparams=hparams)

    # Accuracy metrics, flagged eval_only.
    top1 = mm.Metric("top1_acc", top_k_accuracy(k=1), eval_only=True)
    top5 = mm.Metric("top5_acc", top_k_accuracy(k=5), eval_only=True)

    classifier.train(
        epochs=hparams.epochs,
        datasets=dict(train=train_loader, val=val_loader),
        metrics=[top5, top1],
        checkpointer=checkpointer,
        debug=hparams.debug,
    )

    # The validation loader doubles as the test set.
    classifier.test(datasets=dict(test=val_loader), metrics=[top1, top5])
