In [None]:
################################################################################
# COMPLETE Enhanced YOLOv12n Training Script with Custom Enhancements, QAT,
#  and Results Processing
################################################################################

In [None]:
# 1. Environment Setup
# --------------------
import subprocess
import sys

print("Phase 1: Environment Setup")
print("==========================")

# Mounting Drive is best effort: any failure (including the import failing
# outside Colab) is reported and the cell carries on.
try:
    from google.colab import drive
    drive.mount('/content/drive')
    print("Google Drive mounted successfully.")
except Exception as e:
    print("Google Drive mount not required or failed:", e)

print("Installing/Updating Ultralytics and other necessary packages...")
# Run pip through the interpreter that backs this kernel. A bare "pip" is
# resolved via PATH and may install into a different Python environment.
pip_install = [sys.executable, "-m", "pip", "install"]
try:
    # Ensure ultralytics is up-to-date for latest QAT features if any
    subprocess.run(pip_install + ["--upgrade", "ultralytics", "torchvision", "torchaudio"], check=True)
    # scipy is included for k-means
    subprocess.run(
        pip_install + ["roboflow", "tqdm", "pandas", "seaborn", "matplotlib", "pyyaml", "scipy"],
        check=True,
    )
    print("Packages installed/updated successfully.")
except subprocess.CalledProcessError as e:
    # check=True turns a non-zero pip exit status into this exception;
    # report it and re-raise so the notebook stops here.
    print(f"Error installing packages: {e}")
    raise

Phase 1: Environment Setup
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Google Drive mounted successfully.
Installing/Updating Ultralytics and other necessary packages...
Packages installed/updated successfully.


In [None]:
# %pip (rather than !pip) installs into the environment of the running kernel.
%pip install torch torchvision torchaudio ultralytics tqdm



In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import os
import yaml
import numpy as np
from scipy.cluster.vq import kmeans
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from ultralytics import YOLO
from torch.ao.quantization import QuantStub, DeQuantStub
from ultralytics.nn.modules import Conv, C2f, Detect, Concat # Import building blocks
from tqdm import tqdm as tqdm_iterator # Renamed to avoid conflict with YOLO's internal tqdm
from IPython.display import Image, display # For displaying plots in Colab
from ultralytics.utils.plotting import plot_results as ultralytics_plot_results # Alias for clarity
from torch.ao.quantization import QuantStub, DeQuantStub


print("All initial imports and setup completed.\n")

All initial imports and setup completed.



In [None]:
# Load YOLO12n model
model = YOLO('yolo12n.pt')

# Create dummy input tensor
dummy_input = torch.randn(1, 3, 640, 640)

# Every layer except the final Detect layer, plus the Detect layer itself
layers = model.model.model[:-1]
detect_layer = model.model.model[-1]

# Replay the layer graph by hand. Each layer's `f` attribute names its input:
# -1 means "previous layer's output", another int means "output of that layer",
# and a list means "collect these outputs" (e.g. for Concat).
outputs = []
x = dummy_input
with torch.no_grad():  # shape inspection only; no gradients needed
    for layer in layers:
        src = layer.f
        if isinstance(src, int):
            if src != -1:
                x = outputs[src]
        else:
            x = [x if j == -1 else outputs[j] for j in src]
        x = layer(x)
        outputs.append(x)

# Backbone feature maps. The last three entries of `outputs` are the final neck
# layers (they all share one spatial size), not P3/P4/P5, so index explicitly
# with the same tap indices ModifiedYOLOBackbone uses for P3/P4/P5.
backbone_tap_indices = (4, 6, 8)
p3, p4, p5 = (outputs[i] for i in backbone_tap_indices)

print("✅ Backbone Feature Maps:")
print(f"P3 shape: {p3.shape}")
print(f"P4 shape: {p4.shape}")
print(f"P5 shape: {p5.shape}")

# Feature maps that actually feed the Detect layer, taken from its own `f` list
detect_sources = getattr(detect_layer, 'f', None)
if isinstance(detect_sources, (list, tuple)):
    print("\n✅ Detect Head Input Feature Maps:")
    for idx in detect_sources:
        print(f"Layer {idx} shape: {outputs[idx].shape}")

# Inspect Detection Head configuration robustly
print("\n🔎 Detection Head Configuration:")
anchors = getattr(detect_layer, 'anchors', None)
print("Anchors:", anchors if anchors is not None else "Anchor-free")

strides = getattr(detect_layer, 'stride', None)
print("Strides:", strides.tolist() if strides is not None else "Not found")

nl = getattr(detect_layer, 'nl', len(strides) if strides is not None else "Unknown")
print("Number of detection layers (nl):", nl)

print(model)

✅ Backbone Feature Maps:
P3 shape: torch.Size([1, 128, 20, 20])
P4 shape: torch.Size([1, 384, 20, 20])
P5 shape: torch.Size([1, 256, 20, 20])

🔎 Detection Head Configuration:
Anchors: tensor([[ 0.5000,  1.5000,  2.5000,  ...,  8.5000,  9.5000, 10.5000],
        [ 0.5000,  0.5000,  0.5000,  ..., 20.5000, 20.5000, 20.5000]])
Strides: [8.0, 16.0, 32.0]
Number of detection layers (nl): 3
YOLO(
  (model): DetectionModel(
    (model): Sequential(
      (0): Conv(
        (conv): Conv2d(3, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        (bn): BatchNorm2d(16, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
        (act): SiLU(inplace=True)
      )
      (1): Conv(
        (conv): Conv2d(16, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        (bn): BatchNorm2d(32, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
        (act): SiLU(inplace=True)
      )
      (2): C3k2(
        (cv1): Conv(
          (conv): Conv

In [None]:
# 2. Define Custom Enhancement Modules
# ------------------------------------
# Section banner: title line followed by its underline.
print(
    "\nPhase 2: Defining Custom Enhancement Modules",
    "==========================================",
    sep="\n",
)
class CBAM(nn.Module):
    """Channel attention followed by spatial attention on a (B, C, H, W) feature map.

    Channel attention: global average pooling -> 1x1 conv bottleneck -> ReLU ->
    1x1 conv -> sigmoid, multiplied onto the input per channel.
    Spatial attention: the channel-wise max and mean maps of the channel-refined
    tensor are concatenated, passed through a 7x7 conv and a sigmoid, and
    multiplied onto the tensor per spatial location.

    Args:
        in_channels: Number of channels of the input feature map.
        reduction_ratio: Divisor used to size the bottleneck of the channel MLP.

    Returns (from ``forward``):
        A tensor with the same shape as the input.
    """

    def __init__(self, in_channels, reduction_ratio=16):
        super().__init__()
        # Integer division yields 0 when in_channels < reduction_ratio, which
        # would create a zero-width bottleneck; keep at least one channel.
        hidden_channels = max(1, in_channels // reduction_ratio)
        self.channel_mlp = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Conv2d(in_channels, hidden_channels, 1, bias=False),
            nn.ReLU(inplace=True),
            nn.Conv2d(hidden_channels, in_channels, 1, bias=False)
        )
        self.sigmoid_channel = nn.Sigmoid()
        # 2 input channels: the per-pixel max map and mean map.
        self.spatial_conv = nn.Conv2d(2, 1, kernel_size=7, padding=3, bias=False)
        self.sigmoid_spatial = nn.Sigmoid()

    def forward(self, x):
        # Channel attention: (B, C, 1, 1) weights broadcast over H and W.
        ca_weights = self.sigmoid_channel(self.channel_mlp(x))
        x_ca = ca_weights * x
        # Spatial attention: (B, 1, H, W) weights broadcast over channels.
        sa_input = torch.cat([torch.max(x_ca, dim=1, keepdim=True)[0], torch.mean(x_ca, dim=1, keepdim=True)], dim=1)
        sa_weights = self.sigmoid_spatial(self.spatial_conv(sa_input))
        x_sa = sa_weights * x_ca
        return x_sa

class TransformerEncoderBlock(nn.Module):
    """Self-attention over the spatial positions of a (B, C, H, W) feature map.

    The map is flattened to a sequence of H*W tokens of width C, passed through
    an ``nn.TransformerEncoder`` and folded back to (B, C, H, W). The block is
    wrapped by a DeQuantStub on entry and a QuantStub on exit, in that order.

    Args:
        channels: Channel count of the input; used as the transformer ``d_model``.
        num_heads: Number of attention heads.
        num_layers: Number of stacked encoder layers.
        dim_feedforward: Width of the feed-forward sublayer; defaults to
            ``2 * channels`` when ``None``.
        dropout: Dropout probability inside the encoder layer.
    """

    def __init__(self, channels, num_heads=4, num_layers=1, dim_feedforward=None, dropout=0.1):
        super().__init__()
        if dim_feedforward is None:
            dim_feedforward = channels * 2
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=channels,
            nhead=num_heads,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            batch_first=True,  # Critical for QAT compatibility
            activation=F.relu
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.quant = torch.quantization.QuantStub()
        self.dequant = torch.quantization.DeQuantStub()

    def forward(self, x):
        # Dequantize before Transformer
        x = self.dequant(x)

        b, c, h, w = x.size()
        # (B, C, H, W) -> (B, H*W, C) for batch_first=True. flatten/reshape copy
        # when needed, so inputs whose H and W dims are not jointly contiguous
        # (e.g. a transposed view) no longer raise as they did with .view().
        seq = x.flatten(2).transpose(1, 2)
        seq_out = self.transformer_encoder(seq)
        x_out = seq_out.transpose(1, 2).reshape(b, c, h, w)

        # Requantize after Transformer
        x_out = self.quant(x_out)
        return x_out

class SmallObjectFeatures(nn.Module):
    """Two stacked 3x3 ``Conv`` blocks applied to a feature map.

    The first block maps ``in_channels`` to ``feature_channels``; the second
    keeps ``feature_channels``.
    """

    def __init__(self, in_channels, feature_channels=128):
        super().__init__()
        widen = Conv(in_channels, feature_channels, k=3, p=1)
        refine = Conv(feature_channels, feature_channels, k=3, p=1)
        self.conv_block = nn.Sequential(widen, refine)

    def forward(self, x):
        features = self.conv_block(x)
        return features

class BiFPN_FusionNode(nn.Module):
    """Fuse two feature maps by channel concatenation followed by a 1x1 ``Conv``.

    The second input is resized (nearest neighbour) to the spatial size of the
    first whenever the two sizes differ, whichever of the two is larger.
    """

    def __init__(self, in_channels_high_res, in_channels_low_res, out_channels):
        super().__init__()
        # The 1x1 conv sees both inputs stacked along the channel axis.
        stacked_channels = in_channels_high_res + in_channels_low_res
        self.merge_conv = Conv(stacked_channels, out_channels, k=1, p=0)

    def forward(self, x_high_res, x_low_res):
        target_hw = x_high_res.shape[2:]
        if x_low_res.shape[2:] != target_hw:
            x_low_res = F.interpolate(x_low_res, size=target_hw, mode='nearest')
        return self.merge_conv(torch.cat((x_high_res, x_low_res), dim=1))


print("Custom enhancement modules defined.\n")


Phase 2: Defining Custom Enhancement Modules
Custom enhancement modules defined.



In [None]:
# 3. Define the Enhanced YOLO Model Structure
# ---------------------------------------------
# Section banner: title line followed by its underline.
print(
    "\nPhase 3: Defining Custom Enhanced YOLO Model Structure",
    "======================================================",
    sep="\n",
)

class ModifiedYOLOBackbone(nn.Module):
    """Split a layer sequence into four consecutive stages and expose each stage's output.

    The children of ``original_model_sequential`` are cut after indices 2, 4, 6
    and 8 (0-based). ``forward`` runs the stages one after another and returns
    the four intermediate tensors as ``(p2, p3, p4, p5)``. Children past index 8
    are not part of any stage.
    """

    def __init__(self, original_model_sequential):
        super().__init__()
        self.layers = [*original_model_sequential.children()]
        # Tap points deduced from yolo12n.pt structure (0-indexed)
        self.p2_tap_idx, self.p3_tap_idx, self.p4_tap_idx, self.p5_tap_idx = 2, 4, 6, 8

        # Build stage1..stage4 so that each one ends on its tap index.
        taps = (self.p2_tap_idx, self.p3_tap_idx, self.p4_tap_idx, self.p5_tap_idx)
        begin = 0
        for stage_no, tap in enumerate(taps, start=1):
            setattr(self, f"stage{stage_no}", nn.Sequential(*self.layers[begin:tap + 1]))
            begin = tap + 1

        # All original layers wrapped again in one Sequential. It is not used by
        # forward(); kept because the original marked it as required for export
        # compatibility.
        self.f = nn.Sequential(*self.layers)

    def forward(self, x):
        taps_out = []
        for stage in (self.stage1, self.stage2, self.stage3, self.stage4):
            x = stage(x)
            taps_out.append(x)
        return tuple(taps_out)


class CustomEnhancedYOLOModel(nn.Module):
    """YOLO backbone with CBAM + transformer on P4, a four-node fusion neck and a Detect head.

    Data flow of ``forward``: QuantStub -> ``ModifiedYOLOBackbone`` (p2..p5) ->
    CBAM and transformer on p4 -> 1x1 projections of p3/p4/p5 to a common width
    -> two top-down fusion nodes -> two further fusion nodes -> DeQuantStub on
    the three fused maps -> ``Detect``.

    Args:
        base_model_instance: Object whose ``.model.model`` holds the layer
            sequence handed to ``ModifiedYOLOBackbone``.
        nc: Number of classes for the ``Detect`` head.
    """

    def __init__(self, base_model_instance, nc):
        super().__init__()
        self.nc = nc

        # Channel widths of the backbone taps used below (P3, P4, P5).
        self.ch_p3, self.ch_p4, self.ch_p5 = 128, 128, 256

        # Unified BiFPN channel width
        bifpn_channels = 256

        # Backbone built from the layers of the provided base model
        self.modified_backbone = ModifiedYOLOBackbone(base_model_instance.model.model)

        # 1x1 projections from each tap width to the common neck width
        self.p3_proj = Conv(self.ch_p3, bifpn_channels, k=1)  # 128 -> 256
        self.p4_proj = Conv(self.ch_p4, bifpn_channels, k=1)  # 128 -> 256
        self.p5_proj = Conv(self.ch_p5, bifpn_channels, k=1)  # 256 -> 256

        # Small-object branch sized for a 64-channel P2 input. forward() does not
        # call these two modules; they are kept so the module's parameter set
        # stays as before.
        self.p2_feat_gen = SmallObjectFeatures(64, feature_channels=bifpn_channels)
        self.p2_neck_conv = Conv(bifpn_channels, bifpn_channels, k=1)

        # CBAM + Transformer operate on the raw P4 tap (ch_p4 channels)
        self.cbam_p4 = CBAM(in_channels=self.ch_p4)
        self.transformer_p4 = TransformerEncoderBlock(self.ch_p4, num_heads=4)

        # Fusion nodes: both inputs and the output use the common neck width
        self.bifpn_p4_td = BiFPN_FusionNode(bifpn_channels, bifpn_channels, bifpn_channels)
        self.bifpn_p3_td = BiFPN_FusionNode(bifpn_channels, bifpn_channels, bifpn_channels)
        self.bifpn_p4_out = BiFPN_FusionNode(bifpn_channels, bifpn_channels, bifpn_channels)
        self.bifpn_p5_out = BiFPN_FusionNode(bifpn_channels, bifpn_channels, bifpn_channels)

        # Detection head fed by three maps of the common neck width
        self.detect_head = Detect(nc=self.nc, ch=[bifpn_channels]*3)
        self.detect_head.stride = torch.tensor([8.0, 16.0, 32.0])
        self.stride = self.detect_head.stride

        # Quantization stubs
        self.quant = QuantStub()
        self.dequant = DeQuantStub()

        # Ultralytics compatibility
        self.layers = nn.ModuleList([self.modified_backbone, self.detect_head])

    def forward(self, x, targets=None):
        """Run the network.

        Args:
            x: Input image tensor.
            targets: Optional training targets forwarded to ``self.compute_loss``.

        Returns:
            In eval mode, the output of the ``Detect`` head.
            In training mode with ``targets``, ``(loss, loss_dict)`` from
            ``self.compute_loss``; without ``targets``, a zero tensor and an
            empty dict.
        """
        x = self.quant(x)

        # Backbone taps (p2 is returned but not used further here)
        p2, p3, p4, p5 = self.modified_backbone(x)

        # CBAM & Transformer are applied BEFORE projection
        p4_enh = self.cbam_p4(p4)
        p4_enh = self.transformer_p4(p4_enh)

        # Projections to the common neck width
        p3_proj = self.p3_proj(p3)
        p4_proj = self.p4_proj(p4_enh)
        p5_proj = self.p5_proj(p5)

        # Top-down pass: P5 into P4, then P4 into P3
        p4_td = self.bifpn_p4_td(p4_proj, p5_proj)
        p3_td = self.bifpn_p3_td(p3_proj, p4_td)

        # Second pass: P3 back into P4, then P4 into P5
        p4_out = self.bifpn_p4_out(p4_td, p3_td)
        p5_out = self.bifpn_p5_out(p5_proj, p4_out)

        # Dequantize the three maps handed to the head
        features = [self.dequant(f) for f in [p3_td, p4_out, p5_out]]

        detections = self.detect_head(features)

        if self.training:
            if targets is not None:
                # NOTE(review): compute_loss is not defined in this class; it must
                # be attached to the instance before training with targets.
                loss, loss_dict = self.compute_loss(detections, targets)
                return loss, loss_dict
            else:
                return torch.zeros(1, device=x.device), {}
        else:
            return detections




Phase 3: Defining Custom Enhanced YOLO Model Structure


In [None]:
# --index-url replaces PyPI for the whole command, so ultralytics must be
# installed in a separate command that uses the default index.
%pip install --upgrade torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
%pip install --upgrade ultralytics

Looking in indexes: https://download.pytorch.org/whl/cu118


In [None]:
'''
import os
import yaml
import torch
from ultralytics import YOLO
from ultralytics.models.yolo.detect import DetectionTrainer
from ultralytics.utils.torch_utils import ModelEMA

torch._dynamo.config.suppress_errors = True  # Suppress potential Dynamo-related warnings
torch._dynamo.disable()

# --- STEP 1: Define Paths ---
drive_base_path = '/content/drive/My Drive/'
dataset_root_in_drive = 'SemesterProjectDatas/CombinedData'
project_root_in_drive = 'SemesterProjectDatas/Model/EnhancedYolo12nNew'

combined_dataset_path = os.path.join(drive_base_path, dataset_root_in_drive)
data_yaml = os.path.join(combined_dataset_path, 'data.yaml')
base_model_name = 'yolo12n.pt'
project_dir_enhanced_model = os.path.join(drive_base_path, project_root_in_drive)
experiment_name = 'run_yolo12n_enhanced_qat_final'

os.makedirs(project_dir_enhanced_model, exist_ok=True)

with open(data_yaml, 'r') as f:
    data_dict = yaml.safe_load(f)

nc_from_yaml = data_dict['nc']
class_names_from_yaml = data_dict['names']

# --- STEP 2: Load base YOLO model explicitly ---
base_yolo_for_weights = YOLO(base_model_name)

# --- STEP 3: Instantiate Custom Enhanced YOLO Model ---
custom_pytorch_model = CustomEnhancedYOLOModel(base_yolo_for_weights, nc=nc_from_yaml)
custom_pytorch_model.load_state_dict(base_yolo_for_weights.model.state_dict(), strict=False)
custom_pytorch_model.nc = nc_from_yaml
custom_pytorch_model.names = dict(enumerate(class_names_from_yaml))
custom_pytorch_model.yaml = base_yolo_for_weights.model.yaml
custom_pytorch_model.yaml['nc'] = nc_from_yaml
custom_pytorch_model.yaml['names'] = class_names_from_yaml

# --- STEP 4: Freeze backbone explicitly ---
for param in custom_pytorch_model.modified_backbone.parameters():
    param.requires_grad = False

# --- STEP 5: Forward patch for QAT ---
original_forward = custom_pytorch_model.forward


def qat_forward(x, targets=None):
    if isinstance(x, dict):
        for key in ['img', 'images', 'x']:
            if key in x:
                x = x[key]
                break
        else:
            raise ValueError("Dictionary input missing image keys.")

    if custom_pytorch_model.training:
        x = x.requires_grad_(True)

    preds = original_forward(x, targets)

    if custom_pytorch_model.training:
        if isinstance(preds, tuple):
            loss = preds[0]
            if isinstance(loss, dict):
                loss = sum(v for v in loss.values() if isinstance(v, torch.Tensor))
            if not loss.requires_grad:
                raise RuntimeError("Loss does not require grad; check forward implementation.")
            return loss
        return preds if torch.is_tensor(preds) else preds[0]

    return preds

custom_pytorch_model.forward = qat_forward

# --- STEP 6: QAT Preparation ---
custom_pytorch_model.train()
custom_pytorch_model.qconfig = torch.quantization.get_default_qat_qconfig('fbgemm')
excluded_modules = (TransformerEncoderBlock, CBAM, BiFPN_FusionNode)
for name, module in custom_pytorch_model.named_modules():
    if isinstance(module, excluded_modules):
        module.qconfig = None

torch.quantization.prepare_qat(custom_pytorch_model, inplace=True)

# --- STEP 7: Patch EMA for QAT compatibility ---
def safe_ema_update(self, model):
    with torch.no_grad():
        msd = model.state_dict()
        for k, ema_v in self.ema.state_dict().items():
            if k in msd and ema_v.shape == msd[k].shape:
                if ema_v.dtype.is_floating_point:
                    ema_v.copy_(ema_v * self.decay + msd[k].detach() * (1 - self.decay))

ModelEMA.update = safe_ema_update

# --- STEP 8: Disable EMA completely in YOLO ---
class CustomTrainer(DetectionTrainer):
    def get_model(self, cfg=None, weights=None, verbose=True):
        model = custom_pytorch_model.to(self.device)
        model.nc = self.data['nc']
        model.names = self.data['names']
        model.stride = base_yolo_for_weights.model.stride
        return model

    def build_ema(self, model):
        return None

# --- STEP 9: Assign model to YOLO explicitly ---
managed_model = YOLO()
managed_model.model = custom_pytorch_model
managed_model.overrides['model'] = None

# --- STEP 10: Training ---
train_args = {
    'data': data_yaml,
    'epochs': 100,
    'imgsz': 640,
    'batch': 16,
    'project': project_dir_enhanced_model,
    'name': experiment_name,
    'exist_ok': True,
    'device': 'cuda',
    'patience': 25,
    'optimizer': 'AdamW',
    'lr0': 0.001,
    'lrf': 0.01,
    'amp': False,
    'trainer': CustomTrainer
}

results = managed_model.train(**train_args)

# --- STEP 11: Save final model ---
# Ensure the save directory exists
model_save_dir = os.path.join(project_dir_enhanced_model, experiment_name)
os.makedirs(model_save_dir, exist_ok=True)

# Define the full path explicitly
custom_model_save_path = os.path.join(model_save_dir, 'custom_enhanced_model.pth')

# Save the model state dict
torch.save(custom_pytorch_model.state_dict(), custom_model_save_path)

print(f"✅ Custom Enhanced YOLOv12n QAT model saved at: {custom_model_save_path}")

# Save quantized model for deployment (optional)
quantized_model_path = os.path.join(project_dir_enhanced_model, experiment_name, 'quantized_model.pt')
# Convert QAT model to quantized model
quantized_model = torch.quantization.convert(custom_pytorch_model.eval(), inplace=False)
torch.save(quantized_model.state_dict(), quantized_model_path)
print(f"✅ Quantized model saved at: {quantized_model_path}")


'''
''''

import os
import yaml
import torch
from ultralytics import YOLO
from ultralytics.models.yolo.detect import DetectionTrainer
from ultralytics.utils.torch_utils import ModelEMA

# Suppress Dynamo warnings
torch._dynamo.config.suppress_errors = True
torch._dynamo.disable()

# --- STEP 1: Define Paths ---
drive_base_path = '/content/drive/My Drive/'
dataset_root_in_drive = 'SemesterProjectDatas/CombinedData'
project_root_in_drive = 'SemesterProjectDatas/Model/EnhancedYolo12nNew'

combined_dataset_path = os.path.join(drive_base_path, dataset_root_in_drive)
data_yaml = os.path.join(combined_dataset_path, 'data.yaml')
base_model_name = 'yolo12n.pt'
project_dir_enhanced_model = os.path.join(drive_base_path, project_root_in_drive)
experiment_name = 'run_yolo12n_enhanced_qat_final'

os.makedirs(project_dir_enhanced_model, exist_ok=True)

with open(data_yaml, 'r') as f:
    data_dict = yaml.safe_load(f)

nc_from_yaml = data_dict['nc']
class_names_from_yaml = data_dict['names']

# --- STEP 2: Load base YOLO model explicitly ---
base_yolo_for_weights = YOLO(base_model_name)

# --- STEP 3: Instantiate Custom Enhanced YOLO Model ---
custom_pytorch_model = CustomEnhancedYOLOModel(base_yolo_for_weights, nc=nc_from_yaml)
custom_pytorch_model.load_state_dict(base_yolo_for_weights.model.state_dict(), strict=False)
custom_pytorch_model.nc = nc_from_yaml
custom_pytorch_model.names = dict(enumerate(class_names_from_yaml))
custom_pytorch_model.yaml = base_yolo_for_weights.model.yaml
custom_pytorch_model.yaml['nc'] = nc_from_yaml
custom_pytorch_model.yaml['names'] = class_names_from_yaml

# --- STEP 4: Freeze backbone explicitly ---
for param in custom_pytorch_model.modified_backbone.parameters():
    param.requires_grad = False

# --- STEP 5: Forward patch for QAT with detailed debug statements ---
original_forward = custom_pytorch_model.forward

def qat_forward(x, targets=None):
    if isinstance(x, dict):
        for key in ['img', 'images', 'x']:
            if key in x:
                x = x[key]
                break
        else:
            raise ValueError("Dictionary input missing image keys.")

    if custom_pytorch_model.training:
        x = x.requires_grad_(True)

    preds = original_forward(x, targets)
    print("preds returned from original_forward:", preds)

    if custom_pytorch_model.training:
        if isinstance(preds, tuple):
            loss = preds[0]
            print("Initial loss from tuple:", loss)
            if isinstance(loss, dict):
                loss = sum(v for v in loss.values() if isinstance(v, torch.Tensor))
                print("Summed loss from dict:", loss)

            print("loss.requires_grad:", loss.requires_grad)
            if not loss.requires_grad:
                print("Problematic loss:", loss)
                raise RuntimeError("Loss does not require grad; check forward implementation.")
            return loss

        print("Pred tensor requires_grad:", preds.requires_grad if torch.is_tensor(preds) else preds[0].requires_grad)
        return preds if torch.is_tensor(preds) else preds[0]

    return preds

custom_pytorch_model.forward = qat_forward

# --- STEP 6: QAT Preparation ---
custom_pytorch_model.train()
custom_pytorch_model.qconfig = torch.quantization.get_default_qat_qconfig('fbgemm')
excluded_modules = (TransformerEncoderBlock, CBAM, BiFPN_FusionNode)
for name, module in custom_pytorch_model.named_modules():
    if isinstance(module, excluded_modules):
        module.qconfig = None

torch.quantization.prepare_qat(custom_pytorch_model, inplace=True)

# --- STEP 7: Patch EMA for QAT compatibility ---
def safe_ema_update(self, model):
    with torch.no_grad():
        msd = model.state_dict()
        for k, ema_v in self.ema.state_dict().items():
            if k in msd and ema_v.shape == msd[k].shape:
                if ema_v.dtype.is_floating_point:
                    ema_v.copy_(ema_v * self.decay + msd[k].detach() * (1 - self.decay))

ModelEMA.update = safe_ema_update

# --- STEP 8: Disable EMA completely in YOLO ---
class CustomTrainer(DetectionTrainer):
    def get_model(self, cfg=None, weights=None, verbose=True):
        model = custom_pytorch_model.to(self.device)
        model.nc = self.data['nc']
        model.names = self.data['names']
        model.stride = base_yolo_for_weights.model.stride
        return model

    def build_ema(self, model):
        return None

# --- STEP 9: Assign model to YOLO explicitly ---
managed_model = YOLO()
managed_model.model = custom_pytorch_model
managed_model.overrides['model'] = None

# --- STEP 10: Training ---
train_args = {
    'data': data_yaml,
    'epochs': 100,
    'imgsz': 640,
    'batch': 16,
    'project': project_dir_enhanced_model,
    'name': experiment_name,
    'exist_ok': True,
    'device': 'cuda',
    'patience': 25,
    'optimizer': 'AdamW',
    'lr0': 0.001,
    'lrf': 0.01,
    'amp': False,
    'trainer': CustomTrainer
}

custom_pytorch_model.train()
results = managed_model.train(**train_args)

# --- STEP 11: Save final model ---
model_save_dir = os.path.join(project_dir_enhanced_model, experiment_name)
os.makedirs(model_save_dir, exist_ok=True)

custom_model_save_path = os.path.join(model_save_dir, 'custom_enhanced_model.pth')
torch.save(custom_pytorch_model.state_dict(), custom_model_save_path)
print(f"✅ Custom Enhanced YOLOv12n QAT model saved at: {custom_model_save_path}")
'''

import os
import yaml
import torch
from ultralytics import YOLO
from ultralytics.models.yolo.detect import DetectionTrainer
from ultralytics.nn.modules import Conv, Detect
import torch.nn as nn
import torch.nn.functional as F
from torch.quantization import QuantStub, DeQuantStub


# Suppress Dynamo warnings
torch._dynamo.config.suppress_errors = True
# NOTE(review): disable() is called without a target function and its return
# value is discarded; confirm this has the intended global effect.
torch._dynamo.disable()


# 1. Paths Setup
# All locations are built from the mounted Drive root below.
base_path = '/content/drive/My Drive/'
data_path = os.path.join(base_path, 'SemesterProjectDatas/CombinedData')
model_path = os.path.join(base_path, 'SemesterProjectDatas/Model/EnhancedYolo12nNew')
data_yaml = os.path.join(data_path, 'data.yaml')
base_model_name = 'yolo12n.pt'

# Read the dataset description; only its 'nc' entry is used in this cell.
with open(data_yaml, 'r') as f:
    data_dict = yaml.safe_load(f)

nc = data_dict['nc']

# 2. Load YOLO base
# base_yolo is handed to CustomEnhancedYOLOModel further down in this cell.
base_yolo = YOLO(base_model_name)



class CustomEnhancedYOLOModel(nn.Module):
    """Enhanced YOLO model (CBAM + transformer on P4, fusion neck, Detect head) for QAT.

    ``forward`` accepts either an image tensor or a batch dict. The Ultralytics
    trainer calls the model with the whole batch dict during training (the
    tensor-only version failed there with "argument 'input' must be Tensor, not
    dict"), so dict inputs are routed to ``loss``.

    Args:
        base_model: Object whose ``.model.model`` holds the layer sequence for
            ``ModifiedYOLOBackbone`` and whose ``.yaml`` is copied to ``self.yaml``.
        nc: Number of classes for the ``Detect`` head.
    """

    def __init__(self, base_model, nc):
        super().__init__()
        self.nc = nc
        bifpn_channels = 256

        self.modified_backbone = ModifiedYOLOBackbone(base_model.model.model)

        # 1x1 projections of the P3/P4/P5 taps to the common neck width
        self.p3_proj = Conv(128, bifpn_channels, k=1)
        self.p4_proj = Conv(128, bifpn_channels, k=1)
        self.p5_proj = Conv(256, bifpn_channels, k=1)

        self.cbam_p4 = CBAM(128)
        self.transformer_p4 = TransformerEncoderBlock(128)

        self.bifpn_p4_td = BiFPN_FusionNode(bifpn_channels, bifpn_channels, bifpn_channels)
        self.bifpn_p3_td = BiFPN_FusionNode(bifpn_channels, bifpn_channels, bifpn_channels)
        self.bifpn_p4_out = BiFPN_FusionNode(bifpn_channels, bifpn_channels, bifpn_channels)
        self.bifpn_p5_out = BiFPN_FusionNode(bifpn_channels, bifpn_channels, bifpn_channels)

        self.detect_head = Detect(nc=self.nc, ch=[bifpn_channels]*3)
        self.detect_head.stride = torch.tensor([8.0, 16.0, 32.0])

        self.stride = self.detect_head.stride

        self.quant = QuantStub()
        self.dequant = DeQuantStub()

        self.yaml = base_model.yaml

        # Loss object, built on first use by loss(). It is a plain attribute,
        # not a submodule, so it adds nothing to the state dict.
        self.criterion = None

    def forward(self, x, *args, **kwargs):
        """Dispatch on input type.

        Args:
            x: Image tensor, or a batch dict containing the images under ``"img"``.
            *args, **kwargs: Forwarded to ``loss`` for dict input; ignored for
                tensor input so that callers passing extra inference options
                do not fail.

        Returns:
            For a dict: whatever ``loss`` returns. For a tensor: the output of
            the ``Detect`` head.
        """
        if isinstance(x, dict):
            return self.loss(x, *args, **kwargs)
        return self.predict(x)

    def predict(self, x):
        """Run quant stub, backbone, P4 enhancement, fusion neck, dequant stub and Detect head."""
        x = self.quant(x)
        # p2 is returned by the backbone but not used by this model.
        p2, p3, p4, p5 = self.modified_backbone(x)

        p4_enh = self.cbam_p4(p4)
        p4_enh = self.transformer_p4(p4_enh)

        p3_proj, p4_proj, p5_proj = self.p3_proj(p3), self.p4_proj(p4_enh), self.p5_proj(p5)

        p4_td = self.bifpn_p4_td(p4_proj, p5_proj)
        p3_td = self.bifpn_p3_td(p3_proj, p4_td)
        p4_out = self.bifpn_p4_out(p4_td, p3_td)
        p5_out = self.bifpn_p5_out(p5_proj, p4_out)

        feats = [self.dequant(f) for f in [p3_td, p4_out, p5_out]]
        return self.detect_head(feats)

    def loss(self, batch, preds=None):
        """Compute the detection loss for a batch dict.

        Args:
            batch: Batch dict; images are read from ``batch["img"]`` when
                ``preds`` is not supplied.
            preds: Optional precomputed head outputs.

        Returns:
            The value returned by the criterion.
            NOTE(review): expected to be ``(loss, loss_items)`` as the
            Ultralytics trainer unpacks it — confirm against the installed version.
        """
        if getattr(self, "criterion", None) is None:
            self.criterion = self.init_criterion()
        if preds is None:
            preds = self.predict(batch["img"])
        return self.criterion(preds, batch)

    def init_criterion(self):
        """Build the Ultralytics detection loss for this model's Detect head.

        Raises:
            AttributeError: If no ``args`` (training hyper-parameters) have been
                attached to the model yet.
        """
        from types import SimpleNamespace

        from ultralytics.utils.loss import v8DetectionLoss

        hyp = getattr(self, "args", None)
        if hyp is None:
            raise AttributeError(
                "CustomEnhancedYOLOModel.args is not set; the trainer must attach its "
                "hyper-parameters to the model before the loss can be built."
            )
        # NOTE(review): v8DetectionLoss is assumed to read only `.parameters()`,
        # `.args` and `.model[-1]` (the Detect module) from the object it is
        # given — confirm against the installed Ultralytics version. The shim
        # supplies exactly those without registering the head a second time.
        loss_view = SimpleNamespace(
            args=hyp,
            model=[self.detect_head],
            parameters=self.parameters,
        )
        return v8DetectionLoss(loss_view)

# Instantiate custom model
# The model is moved to the GPU right away, so this cell needs CUDA.
custom_model = CustomEnhancedYOLOModel(base_yolo, nc).cuda()

# Freeze backbone
# Every parameter reachable from modified_backbone stops receiving gradients.
for param in custom_model.modified_backbone.parameters():
    param.requires_grad = False

# 4. Proper QAT Setup
# prepare_qat is called on a model in training mode with the 'fbgemm' QAT qconfig.
custom_model.train()
custom_model.qconfig = torch.quantization.get_default_qat_qconfig('fbgemm')
# Opt the attention and fusion blocks out of quantization by clearing their qconfig.
for module in custom_model.modules():
    if isinstance(module, (TransformerEncoderBlock, CBAM, BiFPN_FusionNode)):
        module.qconfig = None

# inplace=False returns a prepared copy; the name is rebound to that copy.
custom_model = torch.quantization.prepare_qat(custom_model, inplace=False).cuda()

# 5. Trainer definition
class QATTrainer(DetectionTrainer):
    """DetectionTrainer variant that trains the notebook-global, QAT-prepared ``custom_model``."""

    def get_model(self, cfg=None, weights=None, verbose=True):
        """Return the global ``custom_model``; ``cfg``, ``weights`` and ``verbose`` are ignored."""
        return custom_model

    def build_ema(self, model):
        """Return ``None`` in place of an EMA object.

        NOTE(review): confirm that the installed Ultralytics trainer actually
        calls a ``build_ema`` hook; if it does not, this override has no effect.
        """
        return None

# 6. Start Training
train_args = {
    'data': data_yaml,
    'epochs': 100,
    'imgsz': 640,
    'batch': 16,
    'project': model_path,
    'name': 'final_qat_yolo',
    'exist_ok': True,
    'device': 'cuda',
    'optimizer': 'AdamW',
    'lr0': 0.001,
    'lrf': 0.01,
    'amp': False,
    'patience': 25,
    'trainer': QATTrainer
}

# Wrap the custom module in a YOLO object so training goes through QATTrainer.
managed_model = YOLO()
managed_model.model = custom_model
managed_model.train(**train_args)

# 7. Convert & Save
# Conversion is done on CPU in eval mode; .cpu() and .eval() act in place.
custom_model.cpu().eval()
quantized_model = torch.quantization.convert(custom_model, inplace=False)

# Reuse the run name from train_args so the save location cannot drift from
# the training run directory, and make sure that directory exists first.
save_path = os.path.join(model_path, train_args['name'], 'quantized_model.pth')
os.makedirs(os.path.dirname(save_path), exist_ok=True)
torch.save(quantized_model.state_dict(), save_path)

print(f"✅ Quantized model saved at: {save_path}")



Ultralytics 8.3.135 🚀 Python-3.11.12 torch-2.7.0+cu118 CUDA:0 (NVIDIA A100-SXM4-40GB, 40507MiB)
[34m[1mengine/trainer: [0magnostic_nms=False, amp=False, augment=False, auto_augment=randaugment, batch=16, bgr=0.0, box=7.5, cache=False, cfg=None, classes=None, close_mosaic=10, cls=0.5, conf=None, copy_paste=0.0, copy_paste_mode=flip, cos_lr=False, cutmix=0.0, data=/content/drive/My Drive/SemesterProjectDatas/CombinedData/data.yaml, degrees=0.0, deterministic=True, device=0, dfl=1.5, dnn=False, dropout=0.0, dynamic=False, embed=None, epochs=100, erasing=0.4, exist_ok=True, fliplr=0.5, flipud=0.0, format=torchscript, fraction=1.0, freeze=None, half=False, hsv_h=0.015, hsv_s=0.7, hsv_v=0.4, imgsz=640, int8=False, iou=0.7, keras=False, kobj=1.0, line_width=None, lr0=0.001, lrf=0.01, mask_ratio=4, max_det=300, mixup=0.0, mode=train, model=yolo11n.pt, momentum=0.937, mosaic=1.0, multi_scale=False, name=final_qat_yolo, nbs=64, nms=False, opset=None, optimize=False, optimizer=AdamW, overlap_m

[34m[1mtrain: [0mScanning /content/drive/MyDrive/SemesterProjectDatas/CombinedData/train/labels.cache... 6527 images, 1141 backgrounds, 0 corrupt: 100%|██████████| 6527/6527 [00:00<?, ?it/s]






[34m[1malbumentations: [0mBlur(p=0.01, blur_limit=(3, 7)), MedianBlur(p=0.01, blur_limit=(3, 7)), ToGray(p=0.01, method='weighted_average', num_output_channels=3), CLAHE(p=0.01, clip_limit=(1.0, 4.0), tile_grid_size=(8, 8))
[34m[1mval: [0mFast image access ✅ (ping: 0.8±0.3 ms, read: 16.8±6.9 MB/s, size: 108.3 KB)


[34m[1mval: [0mScanning /content/drive/MyDrive/SemesterProjectDatas/CombinedData/valid/labels.cache... 591 images, 96 backgrounds, 0 corrupt: 100%|██████████| 591/591 [00:00<?, ?it/s]






Plotting labels to /content/drive/My Drive/SemesterProjectDatas/Model/EnhancedYolo12nNew/final_qat_yolo/labels.jpg... 
[34m[1moptimizer:[0m AdamW(lr=0.001, momentum=0.937) with parameter groups 140 weight(decay=0.0), 159 weight(decay=0.0005), 164 bias(decay=0.0)
Image sizes 640 train, 640 val
Using 8 dataloader workers
Logging results to [1m/content/drive/My Drive/SemesterProjectDatas/Model/EnhancedYolo12nNew/final_qat_yolo[0m
Starting training for 100 epochs...

      Epoch    GPU_mem   box_loss   cls_loss   dfl_loss  Instances       Size


  0%|          | 0/408 [00:00<?, ?it/s]


TypeError: fused_moving_avg_obs_fake_quant(): argument 'input' (position 1) must be Tensor, not dict

In [None]:

# --- STEP 12: Post-training quantization and export ---
# Builds an INT8 post-training-quantized (PTQ) copy of the best checkpoint and
# exports both the QAT weights and the PTQ weights to TFLite for comparison.
import shutil


def _export_int8_tflite(source_state_dict, target_tflite_path, label):
    """Load a state dict into a fresh YOLO model, export it to INT8 TFLite and copy the file.

    Args:
        source_state_dict: State dict loaded (non-strictly) into ``YOLO(base_model_name).model``.
        target_tflite_path: Destination path the exported ``.tflite`` file is copied to.
        label: Short tag (e.g. "QAT" or "PTQ") used in the log messages.

    Returns:
        ``target_tflite_path`` if the exported file was found and copied, otherwise ``None``.
    """
    export_model = YOLO(base_model_name)

    # strict=False skips every key that does not match, so surface how much was
    # actually skipped instead of silently exporting (partly) untouched weights.
    # NOTE(review): the source state dict comes from a converted custom model, so
    # few keys may match the stock architecture - confirm the export is meaningful.
    load_report = export_model.model.load_state_dict(source_state_dict, strict=False)
    if load_report.missing_keys or load_report.unexpected_keys:
        print(
            f"⚠️ {label}: state dict only partially matched the export model "
            f"({len(load_report.missing_keys)} missing, "
            f"{len(load_report.unexpected_keys)} unexpected keys)."
        )

    # export() returns the path of the exported artifact; use it directly rather
    # than guessing a location (the YOLO wrapper has no `export_dir` attribute).
    exported_path = export_model.export(
        format='tflite',
        imgsz=640,  # explicitly match training image size
        int8=True,
        data=data_yaml,
        device='cpu'
    )

    os.makedirs(os.path.dirname(target_tflite_path), exist_ok=True)

    if isinstance(exported_path, (str, os.PathLike)) and os.path.isfile(exported_path):
        shutil.copy2(exported_path, target_tflite_path)  # Copy instead of move
        print(f"✅ TFLite {label}-INT8 model exported successfully: {target_tflite_path}")
        return target_tflite_path

    print(f"❌ {label} TFLite export failed; file not found at: {exported_path}")
    return None


step12_run_dir = os.path.join(project_dir_enhanced_model, experiment_name)
best_model_path = os.path.join(step12_run_dir, 'weights', 'best.pt')

# Training counts as successful only if it produced a best.pt checkpoint.
training_successful = os.path.exists(best_model_path)

if training_successful:
    print(f"✅ Found best model from training at: {best_model_path}")

    # Load the checkpoint on CPU and pull a plain state dict out of it, whatever
    # container format it was saved in.
    loaded_weights = torch.load(best_model_path, map_location='cpu')

    if 'model' in loaded_weights:
        model_state_dict = loaded_weights['model'].state_dict()
    elif 'state_dict' in loaded_weights:
        model_state_dict = loaded_weights['state_dict']
    else:
        model_state_dict = loaded_weights

    # Instantiate a fresh custom model for post-training quantization
    # (separate from the QAT model we've been working with)
    quant_base_model = YOLO(base_model_name)
    ptq_model = CustomEnhancedYOLOModel(
        base_model_instance=quant_base_model,
        nc=nc_from_yaml
    )
    ptq_model.load_state_dict(model_state_dict, strict=False)
    ptq_model.cpu().eval()

    # Configure post-training quantization
    ptq_model.qconfig = torch.quantization.get_default_qconfig('fbgemm')

    # Exclude incompatible modules from quantization
    for name, module in ptq_model.named_modules():
        if isinstance(module, (TransformerEncoderBlock, CBAM, BiFPN_FusionNode)):
            module.qconfig = None
            print(f"🚩 Excluded '{name}' ({module.__class__.__name__}) from PTQ quantization.")

    # Prepare and convert to INT8 (post-training quantization).
    # NOTE(review): no calibration forward passes run between prepare() and
    # convert(), so the observers have seen no data when the model is converted.
    # Consider feeding a few representative batches through ptq_model here.
    torch.quantization.prepare(ptq_model, inplace=True)
    ptq_quantized_model = torch.quantization.convert(ptq_model, inplace=True)

    # Save the INT8 post-training quantized PyTorch state dict
    ptq_model_path = os.path.join(step12_run_dir, 'ptq_quantized_model_int8.pth')
    os.makedirs(os.path.dirname(ptq_model_path), exist_ok=True)
    torch.save(ptq_quantized_model.state_dict(), ptq_model_path)
    print(f"✅ INT8 post-training quantized PyTorch model saved at: {ptq_model_path}")

    # Export both the QAT model and the PTQ model to TFLite for comparison.

    # 1. Export QAT model to TFLite
    print("\n--- Exporting QAT model to TFLite ---")
    qat_target_tflite_path = os.path.join(step12_run_dir, 'model_qat_int8.tflite')
    _export_int8_tflite(quantized_model.state_dict(), qat_target_tflite_path, 'QAT')

    # 2. Export PTQ model to TFLite
    print("\n--- Exporting PTQ model to TFLite ---")
    ptq_target_tflite_path = os.path.join(step12_run_dir, 'model_ptq_int8.tflite')
    _export_int8_tflite(ptq_quantized_model.state_dict(), ptq_target_tflite_path, 'PTQ')

    print("\n--- Quantization and Export Summary ---")
    print("1. QAT model: Trained with quantization awareness throughout training")
    print("2. PTQ model: Best model quantized after training")
    print("Compare these models to see which approach gives better results for your use case.")
else:
    print("\n❌ Training was unsuccessful or incomplete. Post-training INT8 conversion skipped.")
    print("However, the QAT model has still been saved and can be used for inference.")


In [None]:
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image
from IPython.display import display, Image as IPImage
from ultralytics.utils.plotting import plot_results as ultralytics_plot_results

import shutil

print("\n\nPhase 6: Post-Training Metrics, Plotting, and Export")
print("========================================================")


def _scalar_metric(box_metrics, attr_name):
    """Return ``box_metrics.<attr_name>`` as a Python float, or None if the attribute is absent."""
    return float(getattr(box_metrics, attr_name)) if hasattr(box_metrics, attr_name) else None


def _array_metric(box_metrics, attr_name):
    """Return ``box_metrics.<attr_name>`` converted with ``.tolist()``, or None if absent/None."""
    values = getattr(box_metrics, attr_name, None)
    return values.tolist() if values is not None else None


# Training counts as successful only if it produced a best.pt checkpoint.
current_run_dir = os.path.join(project_dir_enhanced_model, experiment_name)
best_weights_path = os.path.join(current_run_dir, 'weights', 'best.pt')
path_to_best_model_pt = best_weights_path
training_successful = os.path.exists(best_weights_path)

if training_successful:
    print(f"Best QAT-trained model found: {path_to_best_model_pt}")
    eval_model = YOLO(path_to_best_model_pt)  # Load the best QAT model for evaluation

    print("\nEvaluating QAT-trained model on validation set...")
    # Validation runs at half the training batch size (at least 1) on the 'val' split.
    val_metrics = eval_model.val(
        data=data_yaml,
        imgsz=image_size_for_training,
        batch=max(1, batch_size // 2),
        device=device_to_use,
        plots=True,
        split='val',
    )

    print("\n--- Processing and Saving Metrics (as per user request) ---")
    # Convert numpy types to plain Python types so the dict is JSON-serialisable.
    box_metrics = val_metrics.box
    metrics_dict = {
        'mAP50-95': _scalar_metric(box_metrics, 'map'),
        'mAP50': _scalar_metric(box_metrics, 'map50'),
        'mAP75': _scalar_metric(box_metrics, 'map75'),
        'Mean_Precision': _scalar_metric(box_metrics, 'mp'),
        'Mean_Recall': _scalar_metric(box_metrics, 'mr'),
        'Per_class_AP': _array_metric(box_metrics, 'maps'),
        'Per_class_precision': _array_metric(box_metrics, 'p'),
        'Per_class_recall': _array_metric(box_metrics, 'r'),
        'F1_scores': _array_metric(box_metrics, 'f1'),
    }
    if hasattr(val_metrics, 'speed') and isinstance(val_metrics.speed, dict):
        metrics_dict['Inference_speed(ms)'] = {
            'preprocessing': float(val_metrics.speed.get('preprocess', 0)),
            'inference': float(val_metrics.speed.get('inference', 0)),
            'postprocessing': float(val_metrics.speed.get('postprocess', 0))
        }
    elif hasattr(val_metrics, 'speed'):  # speed reported as a single value
        metrics_dict['Inference_speed(ms)'] = float(val_metrics.speed)
    else:
        metrics_dict['Inference_speed(ms)'] = None

    # Save metrics to JSON in the specific run directory
    metrics_json_path = os.path.join(current_run_dir, 'evaluation_metrics.json')
    with open(metrics_json_path, 'w') as f:
        json.dump(metrics_dict, f, indent=4)
    print(f"Detailed evaluation metrics saved to: {metrics_json_path}")

    # Plot YOLO results (standard Ultralytics plots)
    results_csv_path = os.path.join(current_run_dir, 'results.csv')
    print(f"Using results CSV for plotting: {results_csv_path}")

    if os.path.exists(results_csv_path):
        try:
            # plot_results writes 'results.png' next to the CSV and closes its own
            # figure, so calling plt.savefig() afterwards would store a blank image.
            # Copy the file it produced instead.
            ultralytics_plot_results(file=results_csv_path)
            results_png_path = os.path.join(current_run_dir, 'results.png')
            progress_png_path = os.path.join(current_run_dir, 'training_progress_curves.png')
            if os.path.exists(results_png_path):
                shutil.copy2(results_png_path, progress_png_path)
                print(f"Ultralytics plot_results saved to {progress_png_path}")
            else:
                print(f"Ultralytics plot_results did not produce {results_png_path}")
        except Exception as e_plot:
            print(f"Ultralytics plot_results failed: {e_plot}")

        # Custom loss plots built from the results CSV
        df = pd.read_csv(results_csv_path)
        df.columns = df.columns.str.strip()  # Clean column names

        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        plot_specs = [
            ('train/box_loss', 'Box Loss', 'blue', (0, 0)),
            ('train/cls_loss', 'Classification Loss', 'orange', (0, 1)),
            ('train/dfl_loss', 'DFL Loss', 'green', (1, 0))
        ]
        for col, title, color, pos in plot_specs:
            if col in df.columns:
                axes[pos].plot(df['epoch'], df[col], label=title, color=color)
                axes[pos].set_title(f'Training {title}')
                axes[pos].set_xlabel('Epoch')
                axes[pos].set_ylabel('Loss')
                axes[pos].legend()
                axes[pos].grid(True)
            else:
                print(f"Column {col} not found for plotting.")
                fig.delaxes(axes[pos])  # Remove unused subplot axis

        # Validation mAP plot. The "(B)" suffix in the Ultralytics column names
        # denotes box metrics, so the legend says "Box" rather than "Best".
        ax_map = axes[1, 1]
        map_specs = [
            ('metrics/mAP50(B)', 'mAP50 (Box)', 'red'),
            ('metrics/mAP50-95(B)', 'mAP50-95 (Box)', 'purple'),
        ]
        plotted_map = False
        for col, label, color in map_specs:
            if col in df.columns:
                ax_map.plot(df['epoch'], df[col], label=label, color=color)
                plotted_map = True

        if plotted_map:
            ax_map.set_title('Validation mAP')
            ax_map.set_xlabel('Epoch')
            ax_map.set_ylabel('mAP')
            ax_map.legend()
            ax_map.grid(True)
        else:
            print("mAP columns not found for plotting.")
            fig.delaxes(ax_map)

        fig.tight_layout()
        custom_loss_plot_path = os.path.join(current_run_dir, 'custom_loss_and_map_plots.png')
        fig.savefig(custom_loss_plot_path)
        plt.show()
        plt.close(fig)
        print(f"Custom loss and mAP plots saved to: {custom_loss_plot_path}")

        # Combined Training Losses Plot
        combined_fig, combined_ax = plt.subplots(figsize=(12, 8))
        combined_specs = [
            ('train/box_loss', 'Box Loss'),
            ('train/cls_loss', 'Classification Loss'),
            ('train/dfl_loss', 'DFL Loss'),
        ]
        plotted_any_combined_loss = False
        for col, label in combined_specs:
            if col in df.columns:
                combined_ax.plot(df['epoch'], df[col], label=label, linewidth=2)
                plotted_any_combined_loss = True

        if plotted_any_combined_loss:
            combined_ax.set_title('Training Losses over Epochs', fontsize=16)
            combined_ax.set_xlabel('Epoch', fontsize=14)
            combined_ax.set_ylabel('Loss', fontsize=14)
            combined_ax.legend(fontsize=12)
            combined_ax.grid(True, alpha=0.3)
            combined_losses_path = os.path.join(current_run_dir, 'combined_training_losses.png')
            combined_fig.savefig(combined_losses_path, dpi=300)
            plt.show()
            print(f"Combined training losses plot saved to: {combined_losses_path}")
        else:
            print("No training loss columns found for combined plot.")
        plt.close(combined_fig)
    else:
        print(f"Results CSV not found at {results_csv_path}, skipping custom plots.")

    # Display YOLO-generated plots (these are typically saved in the run directory)
    print("\n--- Displaying YOLO-generated Plots (if any) ---")
    yolo_plots_to_display = ['results.png', 'confusion_matrix.png', 'labels.jpg', 'PR_curve.png', 'F1_curve.png']
    # Also add the custom plots we saved
    custom_saved_plots = ['training_progress_curves.png', 'custom_loss_and_map_plots.png', 'combined_training_losses.png']

    for plot_name in yolo_plots_to_display + custom_saved_plots:
        plot_path = os.path.join(current_run_dir, plot_name)
        if not os.path.exists(plot_path):
            print(f"Plot {plot_name} not found in {current_run_dir}")
            continue
        print(f"\nDisplaying {plot_name}:")
        try:
            display(IPImage(filename=plot_path))
        except Exception as img_e:
            print(f"Could not display image {plot_path}: {img_e}")

    # Export to TFLite INT8
    try:
        print("\nExporting QAT-trained model to TFLite INT8...")
        # nms=True asks the exporter to include NMS in the exported model.
        tflite_path = eval_model.export(format='tflite', imgsz=image_size_for_training, int8=True, data=data_yaml, nms=True)
        print(f"TFLite INT8 model exported to: {tflite_path}")
    except Exception as e_export:
        print(f"Error exporting to TFLite INT8: {e_export}")
        print("Ensure the export format and parameters are compatible with your trained model and Ultralytics version.")
else:
    print("\nTraining was not successful or was interrupted. Skipping post-training steps.")

In [None]:
import json
print("\n\nPhase 7: Evaluation and Metrics using INT8 Quantized Model")
print("===========================================================")

# Relies on `training_successful` set by an earlier cell (best.pt exists).
if training_successful:
    current_run_dir = os.path.join(project_dir_enhanced_model, experiment_name)

    # Load the INT8-converted state dict instead of 'best.pt'
    quantized_model_path = os.path.join(current_run_dir, 'quantized_model.pt')

    if os.path.exists(quantized_model_path):
        print(f"INT8 Quantized model found: {quantized_model_path}")

        # Rebuild the custom architecture so the quantized state dict has a
        # matching module structure to load into.
        int8_model = CustomEnhancedYOLOModel(
            base_model_instance=YOLO(base_model_name),  # fresh base model
            nc=nc_from_yaml
        )

        # Prepare and convert for QAT (INT8).
        # NOTE(review): Step 12 of this notebook sets qconfig=None on
        # TransformerEncoderBlock / CBAM / BiFPN_FusionNode before preparing;
        # this cell does not. If quantized_model.pt was produced with such
        # exclusions, the converted structure here may not match - confirm.
        int8_model.qconfig = torch.quantization.get_default_qat_qconfig('fbgemm')
        int8_model.train()  # prepare_qat requires the model to be in training mode
        torch.quantization.prepare_qat(int8_model, inplace=True)
        int8_model = torch.quantization.convert(int8_model, inplace=True)

        # Load INT8 weights; map to CPU because this evaluation runs on CPU only.
        int8_model.load_state_dict(torch.load(quantized_model_path, map_location='cpu'))
        int8_model.eval()

        # Wrap the INT8 model in a YOLO container and copy over the metadata
        # attributes that are set on the wrapped model below.
        eval_model = YOLO(base_model_name)  # fresh YOLO instance
        eval_model.model = int8_model
        eval_model.model.names = dict(enumerate(class_names_from_yaml))
        eval_model.model.nc = nc_from_yaml
        eval_model.model.stride = int8_model.stride
        eval_model.model.yaml = custom_pytorch_model.yaml  # Copy YAML config for compatibility

        print("\nEvaluating INT8 Quantized model on validation set...")
        val_metrics_int8 = eval_model.val(
            data=data_yaml,
            imgsz=image_size_for_training,
            batch=max(1, batch_size // 2),
            device='cpu',  # INT8 quantized model evaluation on CPU only
            plots=True,
            split='val'
        )

        print("\n--- Saving INT8 Quantized Model Metrics ---")

        # INT8 metrics, converted to plain Python types for JSON.
        int8_box = val_metrics_int8.box
        metrics_dict_int8 = {
            'mAP50-95': float(int8_box.map),
            'mAP50': float(int8_box.map50),
            'mAP75': float(int8_box.map75),
            'Mean_Precision': float(int8_box.mp),
            'Mean_Recall': float(int8_box.mr),
            'Per_class_AP': int8_box.maps.tolist(),
            'Per_class_precision': int8_box.p.tolist(),
            'Per_class_recall': int8_box.r.tolist(),
            'F1_scores': int8_box.f1.tolist(),
        }

        if hasattr(val_metrics_int8, 'speed') and isinstance(val_metrics_int8.speed, dict):
            metrics_dict_int8['Inference_speed(ms)'] = {
                'preprocessing': float(val_metrics_int8.speed.get('preprocess', 0)),
                'inference': float(val_metrics_int8.speed.get('inference', 0)),
                'postprocessing': float(val_metrics_int8.speed.get('postprocess', 0))
            }
        elif hasattr(val_metrics_int8, 'speed'):
            metrics_dict_int8['Inference_speed(ms)'] = float(val_metrics_int8.speed)
        else:
            metrics_dict_int8['Inference_speed(ms)'] = None

        # Save metrics to a JSON file labelled for the INT8 model
        metrics_int8_json_path = os.path.join(current_run_dir, 'evaluation_metrics_INT8.json')
        with open(metrics_int8_json_path, 'w') as f:
            json.dump(metrics_dict_int8, f, indent=4)

        print(f"INT8 evaluation metrics saved to: {metrics_int8_json_path}")

        # Export the INT8 quantized model to TFLite
        try:
            import shutil

            print("\nExporting INT8 Quantized model to TFLite INT8 format...")
            tflite_int8_path = eval_model.export(
                format='tflite', imgsz=image_size_for_training,
                int8=True, data=data_yaml, nms=True, device='cpu'
            )
            print(f"✅ TFLite INT8 model exported to: {tflite_int8_path}")

            # Copy to a standard location inside the run directory
            tflite_target_path = os.path.join(current_run_dir, 'model_int8.tflite')
            if isinstance(tflite_int8_path, (str, os.PathLike)) and os.path.exists(tflite_int8_path):
                shutil.copy2(tflite_int8_path, tflite_target_path)
                print(f"✅ TFLite INT8 model copied to: {tflite_target_path}")
            else:
                # Previously a missing export artifact was ignored without any message.
                print(f"❌ Exported TFLite file not found at: {tflite_int8_path}; nothing copied.")
        except Exception as e_export:
            print(f"❌ Error exporting to TFLite INT8: {e_export}")
            print("Ensure compatibility between INT8 model and Ultralytics export methods.")

    else:
        print(f"❌ INT8 Quantized model not found at {quantized_model_path}. Skipping evaluation.")
else:
    print("\n❌ Training failed or was interrupted. Skipping INT8 evaluation and plots.")

print("\n✅ Phase 7 execution completed.")

In [None]:
def process_video_from_drive(
    model_path,
    input_video_path,
    output_video_path=None,
    conf_threshold=0.25,
    fps_target=1
):
    """
    Process a video file from Google Drive using the quantized model,
    save the detection results as a new video.

    Args:
        model_path: Path to the quantized model (.pth or .tflite)
        input_video_path: Path to the input video file in Google Drive
        output_video_path: Path to save the output video (if None, will generate automatically)
        conf_threshold: Confidence threshold for detections (0-1)
        fps_target: Target frames per second for processing (default: 1)

    Returns:
        output_path: Path to the saved output video
    """
    import cv2
    import time
    import numpy as np
    from datetime import datetime

    # Validate input video path
    if not os.path.exists(input_video_path):
        raise FileNotFoundError(f"Input video not found at: {input_video_path}")

    # Generate output path if not provided
    if output_video_path is None:
        video_name = os.path.basename(input_video_path)
        video_name_no_ext = os.path.splitext(video_name)[0]
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_video_path = os.path.join(
            os.path.dirname(input_video_path),
            f"{video_name_no_ext}_detected_{timestamp}.mp4"
        )

    # Ensure output directory exists
    os.makedirs(os.path.dirname(output_video_path), exist_ok=True)

    print(f"Loading model