<a href="https://colab.research.google.com/github/DATAGEEKN/AI_SPRINT_2025_SmartVision_Counter_PaliGemma.ipynb/blob/main/AI_SPRINT_2025_SmartVision_Counter_PaliGemma.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Hugging Face login cell: this is the first code cell of the notebook and
# only handles authentication; no model or data is touched here.
from huggingface_hub import notebook_login
print("--- Starting Hugging Face Authentication ---")
# Starts the interactive login flow; per the messages below, the user is
# expected to paste an access token into the widget that appears.
notebook_login()
print("--- Hugging Face Authentication Widget Displayed. Please enter your token. ---")
# The printed instruction asks for a runtime restart after a successful login.
print("After entering token and seeing 'Login successful', RESTART RUNTIME and then run the main code cell.")

In [None]:

# --- 1. Installations ---
# Notebook-only cell: the `!pip` line below is IPython shell magic and is not
# valid in a plain Python script.
print("\n--- 1. Performing Installations ---")
# Install the libraries imported by the later cells. `kagglehub` was removed
# because it caused NameErrors and is not needed by this notebook.
# `-q` keeps pip's output quiet.
!pip install -q transformers peft accelerate bitsandbytes torch torchvision pillow supervision
print("Installations complete.")

In [None]:
# --- 2. Imports ---
print("\n--- 2. Performing Imports ---")
import os
import torch
import json
import shutil
import re
import numpy as np # For dummy image generation
from tqdm.notebook import tqdm
from sklearn.model_selection import train_test_split
from transformers import PaliGemmaProcessor, PaliGemmaForConditionalGeneration
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training
from torch.utils.data import Dataset, DataLoader
from transformers import Trainer, TrainingArguments
from PIL import Image, ImageDraw # For drawing on dummy images
import supervision as sv

In [None]:
# --- 3. API Key & Device Setup ---
# Selects the compute device, copies credentials from Colab Secrets into
# environment variables, and creates the working directories used later.
print("\n--- 3. Setting up API Keys and Device ---")
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {DEVICE}")

kaggle_credentials_loaded = False
# Stays None when the notebook is not running inside Colab, so the Roboflow
# lookup below can tell "no secrets backend" apart from "secret missing".
userdata = None
try:
    from google.colab import userdata
    kaggle_username = userdata.get('KAGGLE_USERNAME')
    kaggle_key = userdata.get('KAGGLE_KEY')
    if kaggle_username and kaggle_key:
        os.environ["KAGGLE_USERNAME"] = kaggle_username
        os.environ["KAGGLE_KEY"] = kaggle_key
        kaggle_credentials_loaded = True
        print("Kaggle API credentials loaded successfully from Colab Secrets.")
    else:
        print("ERROR: Could not load Kaggle credentials: secret is empty. Check Colab Secrets.")
except Exception as e:
    # Broad on purpose: covers a missing google.colab module as well as the
    # Colab-specific errors raised for absent or inaccessible secrets.
    print(f"ERROR: Could not load Kaggle credentials: {e}. Check Colab Secrets.")

# The Roboflow API key is optional in this dummy-dataset version of the notebook.
roboflow_api_key_loaded = True # Assume true if not critical for this specific code path
if userdata is not None:
    try:
        roboflow_api_key = userdata.get('ROBOFLOW_API_KEY')
    except Exception as e:
        # Optional secret: report why it is unavailable instead of hiding it.
        roboflow_api_key = None
        print(f"Roboflow API key not available ({e}); continuing without it.")
    if roboflow_api_key:
        os.environ["ROBOFLOW_API_KEY"] = roboflow_api_key
        print("Roboflow API key found (though not critical for this dummy dataset version).")

# NOTE(review): the model in the later cells is loaded by its Hugging Face id,
# so it is unclear that Kaggle credentials are still needed here — confirm
# before relaxing this hard failure.
if not kaggle_credentials_loaded:
    raise RuntimeError("CRITICAL: Kaggle API key is NOT loaded. Please fix in Colab Secrets and RESTART RUNTIME.")

os.makedirs("datasets", exist_ok=True)
os.makedirs("output_images", exist_ok=True)
print("API keys and device setup complete.")


In [None]:

# --- 4. Load PaliGemma Model and Configure LoRA ---
# Loads the processor and base model, quantizes the base weights to 4-bit when
# a GPU is present, and wraps the model with LoRA adapters for fine-tuning.
print("\n--- 4. Loading PaliGemma Model and Configuring LoRA ---")
from transformers import BitsAndBytesConfig  # local import: only this cell needs it

model_id = "google/paligemma-3b-mix-224"
processor = PaliGemmaProcessor.from_pretrained(model_id)

model_load_kwargs = {"torch_dtype": torch.bfloat16, "device_map": "auto"}
if torch.cuda.is_available():
    # Actually quantize the base weights: the k-bit preparation step below is
    # meant for a quantized model, and without this config it would be applied
    # to the full bf16 model instead.
    model_load_kwargs["quantization_config"] = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16,
    )
else:
    # The 4-bit path is only requested on GPU; without one, fall back to the
    # unquantized load.
    print("WARNING: CUDA not available; loading the model without 4-bit quantization.")
model = PaliGemmaForConditionalGeneration.from_pretrained(model_id, **model_load_kwargs)

# Prepare the (quantized) model for LoRA fine-tuning.
model = prepare_model_for_kbit_training(model)
lora_config = LoraConfig(
    r=8, # LoRA rank
    target_modules=["q_proj", "o_proj", "k_proj", "v_proj", "gate_proj", "up_proj", "down_proj"],
    task_type="CAUSAL_LM"
)
model = get_peft_model(model, lora_config)

# --- Ensure LoRA adapters have requires_grad=True (Fix for grad_fn error) ---
# LoRA weights and normalization layers are trained; everything else is frozen.
# Only floating-point tensors may require gradients, so packed quantized
# weights are always left frozen.
# NOTE(review): PEFT's save_pretrained presumably stores adapter weights only,
# so the norm-layer updates may not be persisted with the adapter — confirm.
for name, param in model.named_parameters():
    wants_grad = ("lora" in name) or ("norm" in name)
    param.requires_grad = wants_grad and param.is_floating_point()
# --------------------------------------------------------------------------

model.print_trainable_parameters()
print("PaliGemma model loaded and LoRA configured.")



In [None]:
# --- 5. Generate DUMMY Dataset ---
print("\n--- 5. Generating DUMMY Dataset ---")

# Root folder of the synthetic dataset; each split keeps its pictures under
# <root>/<split>/images.
DUMMY_DATASET_DIR = "datasets/dummy_fruit_data"
for split_name in ('train', 'val', 'test'):
    os.makedirs(os.path.join(DUMMY_DATASET_DIR, split_name, 'images'), exist_ok=True)

# Class names and the fill colour drawn for each one (same order).
ALL_CLASSES = ["apple", "banana", "orange"]
CLASS_COLORS = [
    (255, 0, 0),    # Red
    (255, 255, 0),  # Yellow
    (255, 165, 0),  # Orange
]

def generate_dummy_image_and_annotation(image_id, dataset_type, classes, img_size=512, num_objects=3):
    """Create one synthetic image with coloured boxes and its PaliGemma annotation.

    Draws ``num_objects`` random rectangles (one random class each) on a dark
    square canvas, saves it as a JPEG under
    ``DUMMY_DATASET_DIR/<dataset_type>/images`` and returns the matching
    JSONL record.

    Args:
        image_id: Integer used to build the file name (zero-padded to 3 digits).
        dataset_type: Split name used for the sub-folder and file prefix.
        classes: Non-empty sequence of class names to sample from.
        img_size: Side length of the square image in pixels; must exceed 50
            so that every box fits inside the canvas.
        num_objects: Number of rectangles to draw.

    Returns:
        dict with keys ``image`` (path relative to the dataset root),
        ``prefix`` ("detect a ; b" over the classes present) and ``suffix``
        ("<loc....> name ; ..." sorted by class name).

    Raises:
        ValueError: if ``classes`` is empty or ``img_size`` is too small.

    Relies on the module-level ``CLASS_COLORS`` and ``DUMMY_DATASET_DIR``.
    """
    if not classes:
        raise ValueError("classes must contain at least one class name")
    if img_size <= 50:
        raise ValueError(f"img_size must be greater than 50, got {img_size}")

    # Dark grey background.
    canvas = Image.fromarray(np.full((img_size, img_size, 3), 50, dtype=np.uint8))
    draw = ImageDraw.Draw(canvas)

    def to_loc(pixel):
        # Map a pixel coordinate onto the 0-1023 grid used by the <locNNNN> tokens.
        return int((pixel / img_size) * 1023)

    annotations = []
    for _ in range(num_objects):
        # The order of the random draws is kept stable so seeded runs are reproducible.
        class_idx = np.random.randint(0, len(classes))
        class_name = classes[class_idx]
        # Wrap around so more classes than colours cannot raise IndexError.
        color = CLASS_COLORS[class_idx % len(CLASS_COLORS)]

        # Random bounding box: at least 20 px per side, fully inside the canvas.
        x_min = np.random.randint(0, img_size - 50)
        y_min = np.random.randint(0, img_size - 50)
        x_max = np.random.randint(x_min + 20, min(img_size, x_min + 100))
        y_max = np.random.randint(y_min + 20, min(img_size, y_min + 100))

        draw.rectangle([x_min, y_min, x_max, y_max], fill=color, outline=(255, 255, 255))
        draw.text((x_min + 5, y_min + 5), class_name, fill=(255, 255, 255))

        # Location tokens are emitted in (y1, x1, y2, x2) order.
        corners = (to_loc(y_min), to_loc(x_min), to_loc(y_max), to_loc(x_max))
        loc_tokens = "".join(f"<loc{value:04d}>" for value in corners)
        annotations.append({"loc_tokens": loc_tokens, "class_name": class_name})

    img_filename = f"{dataset_type}_{image_id:03d}.jpg"
    canvas.save(os.path.join(DUMMY_DATASET_DIR, dataset_type, 'images', img_filename))

    # Build the prefix/suffix pair for the JSONL record.
    annotations.sort(key=lambda item: item["class_name"])
    unique_classes_in_image = sorted({item["class_name"] for item in annotations})
    prefix = "detect " + " ; ".join(unique_classes_in_image)
    suffix = " ; ".join(f"{item['loc_tokens']} {item['class_name']}" for item in annotations)

    return {
        "image": os.path.join(dataset_type, 'images', img_filename),
        "prefix": prefix,
        "suffix": suffix
    }

# Generate dummy data for train, val, test splits
num_train_samples = 50
num_val_samples = 10
num_test_samples = 10

# One JSONL record per generated image; the images are written to disk as a
# side effect of the generator call.
dummy_train_entries = [
    generate_dummy_image_and_annotation(sample_idx, 'train', ALL_CLASSES)
    for sample_idx in tqdm(range(num_train_samples), desc="Generating Train Data")
]
dummy_val_entries = [
    generate_dummy_image_and_annotation(sample_idx, 'val', ALL_CLASSES)
    for sample_idx in tqdm(range(num_val_samples), desc="Generating Val Data")
]
dummy_test_entries = [
    generate_dummy_image_and_annotation(sample_idx, 'test', ALL_CLASSES)
    for sample_idx in tqdm(range(num_test_samples), desc="Generating Test Data")
]

# Save dummy data as JSONL files (one JSON object per line)
TRAIN_JSONL_PATH = os.path.join(DUMMY_DATASET_DIR, '_annotations.train.jsonl')
VAL_JSONL_PATH = os.path.join(DUMMY_DATASET_DIR, '_annotations.val.jsonl')
TEST_JSONL_PATH = os.path.join(DUMMY_DATASET_DIR, '_annotations.test.jsonl')

for jsonl_path, split_entries in (
    (TRAIN_JSONL_PATH, dummy_train_entries),
    (VAL_JSONL_PATH, dummy_val_entries),
    (TEST_JSONL_PATH, dummy_test_entries),
):
    with open(jsonl_path, 'w') as jsonl_file:
        jsonl_file.writelines(json.dumps(record) + '\n' for record in split_entries)

# Define paths consistent with dataset loaders
TRAIN_IMAGES_DIR = os.path.join(DUMMY_DATASET_DIR, 'train', 'images')
VAL_IMAGES_DIR = os.path.join(DUMMY_DATASET_DIR, 'val', 'images')
TEST_IMAGES_DIR = os.path.join(DUMMY_DATASET_DIR, 'test', 'images')
ROBOFLOW_DATA_ROOT = DUMMY_DATASET_DIR # Used as image_directory_root in dataset class

print("Dummy dataset generation complete.")


In [None]:
# --- 6. Define Custom Dataset Class and Instantiate DataLoaders ---
print("\n--- 6. Initializing Custom Dataset Loaders ---")

class PaliGemmaODDataset(Dataset):
    """Torch dataset that turns PaliGemma-style JSONL detection records into model inputs.

    Each JSONL line is expected to hold ``image`` (path relative to
    ``image_directory_root``), ``prefix`` (the "detect ..." prompt) and
    ``suffix`` (the target location/class string).
    """

    def __init__(self, jsonl_file_path: str, image_directory_root: str, processor):
        """Load the records from ``jsonl_file_path``; images are opened lazily."""
        self.entries = self._load_entries(jsonl_file_path)
        self.image_directory_root = image_directory_root
        self.processor = processor
        # Maximum length passed to the processor for padding/truncation.
        self.max_length = 256

    def _load_entries(self, file_path: str):
        """Return the parsed JSONL records, or an empty list if the file is missing."""
        if not os.path.exists(file_path):
            print(f"WARNING: JSONL file not found at {file_path}. Dataset will be empty.")
            return []
        with open(file_path, 'r', encoding='utf-8') as file:
            # Blank lines (e.g. a trailing newline) are not records; skip them.
            return [json.loads(line) for line in file if line.strip()]

    def __len__(self):
        return len(self.entries)

    def __getitem__(self, idx):
        """Return the processed tensors for record ``idx``.

        If the image of that record is missing, the next record with an
        existing image is used instead (wrapping around once).

        Raises:
            IndexError: if ``idx`` is out of range.
            FileNotFoundError: if no record has an image on disk.
        """
        # Index directly first so an out-of-range idx still raises IndexError.
        entry = self.entries[idx]
        total = len(self.entries)
        # Bounded scan instead of recursion: visits every record at most once.
        for offset in range(total):
            candidate_idx = (idx + offset) % total
            entry = self.entries[candidate_idx]
            image_path = os.path.join(self.image_directory_root, entry['image'])
            if os.path.exists(image_path):
                break
            print(f"WARNING: Image file not found for entry {candidate_idx}: {image_path}. Skipping.")
        else:
            raise FileNotFoundError(
                f"None of the {total} dataset entries has an image under {self.image_directory_root}."
            )
        image = Image.open(image_path).convert("RGB")

        # Pass the target separately via `suffix` so the processor keeps the
        # prompt and the answer apart; per the PaliGemma processor docs this is
        # the fine-tuning path and yields `labels` limited to the suffix.
        inputs = self.processor(
            text="<image>" + entry['prefix'],
            images=image,
            suffix=entry['suffix'],
            return_tensors="pt",
            padding="max_length",
            truncation=True,
            max_length=self.max_length
        )
        if 'labels' not in inputs:
            # Fallback if the processor did not build labels: train on all
            # tokens except padding.
            labels = inputs['input_ids'].clone()
            labels[inputs['attention_mask'] == 0] = -100
            inputs['labels'] = labels
        # Drop the batch dimension the processor adds for a single sample.
        return {k: v.squeeze(0) for k, v in inputs.items()}

# All three splits share one root; the JSONL records store image paths
# relative to it.
DATASET_ROOT = DUMMY_DATASET_DIR # Now points to the dummy data
train_dataset = PaliGemmaODDataset(TRAIN_JSONL_PATH, DATASET_ROOT, processor)

# Validation and test splits are optional: they stay None when their
# annotation file is absent.
val_dataset = None
if VAL_JSONL_PATH and os.path.exists(VAL_JSONL_PATH):
    val_dataset = PaliGemmaODDataset(VAL_JSONL_PATH, DATASET_ROOT, processor)

test_dataset = None
if os.path.exists(TEST_JSONL_PATH):
    test_dataset = PaliGemmaODDataset(TEST_JSONL_PATH, DATASET_ROOT, processor)

print(f"Train dataset size: {len(train_dataset)} samples.")
# Truthiness goes through __len__, so an empty split is not reported.
if val_dataset:
    print(f"Validation dataset size: {len(val_dataset)} samples.")
if test_dataset:
    print(f"Test dataset size: {len(test_dataset)} samples.")
print("Custom Dataset Loaders Initialized.")


In [None]:

# --- 7. Fine-tuning Loop ---
# Configures the Hugging Face Trainer, runs fine-tuning, and saves the
# resulting model and processor to `output_dir_path`.
print("\n--- 7. Starting Fine-tuning Process ---")
output_dir_path = "./paligemma_fine_tuned_fruit_counter"
os.makedirs(output_dir_path, exist_ok=True)

# An empty or missing validation split disables evaluation entirely.
has_validation_data = bool(val_dataset)

# With batch size 1 and 16 accumulation steps, the small dataset used here
# yields only a handful of optimizer steps per epoch. Step-based intervals in
# the hundreds would never trigger, so logging, evaluation and checkpointing
# are tied to steps/epochs that are actually reached.
training_args = TrainingArguments(
    output_dir=output_dir_path,
    num_train_epochs=5,
    per_device_train_batch_size=1, # Reduced for memory (1 image per GPU batch)
    per_device_eval_batch_size=1, # Keep evaluation within the same memory budget
    gradient_accumulation_steps=16, # Effective batch size 1 * 16 = 16
    learning_rate=2e-5,
    fp16=torch.cuda.is_available(),
    gradient_checkpointing=True, # Trades speed for memory
    logging_dir="./logs",
    logging_steps=1, # Log every optimizer step; the run is short
    save_strategy="epoch",
    save_total_limit=2,
    eval_strategy="epoch" if has_validation_data else "no",
    # Best-model reloading needs matching save/eval cadence (both per epoch).
    load_best_model_at_end=has_validation_data,
    metric_for_best_model="eval_loss" if has_validation_data else None,
    greater_is_better=False,
    push_to_hub=False,
    report_to="tensorboard",
)
trainer = Trainer(model=model, args=training_args, train_dataset=train_dataset, eval_dataset=val_dataset, tokenizer=processor)

if len(train_dataset) == 0:
    print("ERROR: No training data available. Cannot start training.")
else:
    trainer.train()
    print("Fine-tuning training complete.")
    model.save_pretrained(output_dir_path)
    processor.save_pretrained(output_dir_path)
    print(f"Fine-tuned model and processor saved to: {output_dir_path}")
print("Fine-tuning Process Complete.")


In [None]:
# --- 8. Inference and Object Counting ---
# Loads the model used for inference: the fine-tuned LoRA adapter if one was
# saved, a full fine-tuned checkpoint if present, otherwise the base model.
print("\n--- 8. Starting Inference and Object Counting ---")
fine_tuned_model_path = "./paligemma_fine_tuned_fruit_counter"
model_id = "google/paligemma-3b-mix-224"
adapter_config_path = os.path.join(fine_tuned_model_path, 'adapter_config.json')
full_config_path = os.path.join(fine_tuned_model_path, 'config.json')

if os.path.exists(adapter_config_path):
    # A PEFT-wrapped model is saved as an adapter (adapter_config.json), not
    # as a full checkpoint with config.json. Rebuild it as base + adapter.
    from peft import PeftModel  # local import: only this branch needs it
    print(f"Loading base model '{model_id}' with LoRA adapter from '{fine_tuned_model_path}'.")
    processor = PaliGemmaProcessor.from_pretrained(fine_tuned_model_path)
    base_model = PaliGemmaForConditionalGeneration.from_pretrained(model_id, torch_dtype=torch.bfloat16, device_map="auto")
    model = PeftModel.from_pretrained(base_model, fine_tuned_model_path)
elif os.path.exists(full_config_path):
    # A complete (non-adapter) checkpoint was saved in the output directory.
    processor = PaliGemmaProcessor.from_pretrained(fine_tuned_model_path)
    model = PaliGemmaForConditionalGeneration.from_pretrained(fine_tuned_model_path, torch_dtype=torch.bfloat16, device_map="auto")
else:
    print(f"WARNING: Fine-tuned model not found at '{fine_tuned_model_path}'. Loading base model.")
    # Reuse the processor from the earlier cell when it exists; otherwise load
    # it so this cell also works in a fresh runtime.
    if 'processor' not in globals():
        processor = PaliGemmaProcessor.from_pretrained(model_id)
    model = PaliGemmaForConditionalGeneration.from_pretrained(model_id, torch_dtype=torch.bfloat16, device_map="auto")
model.eval()

if 'ALL_CLASSES' not in globals() or not ALL_CLASSES:
    print("WARNING: ALL_CLASSES list is empty or not found. Defaulting to ['apple', 'banana', 'orange'].")
    ALL_CLASSES = ["apple", "banana", "orange"]
print(f"Classes for detection: {ALL_CLASSES}")

def predict_and_count(image_path: str, classes_to_detect: list, model, processor, device):
    """Run detection on one image, count objects per class and save an annotated copy.

    Args:
        image_path: Path of the image to analyse.
        classes_to_detect: Class names to put in the "detect ..." prompt; the
            index in this list is the class id used for counting and labels.
        model: Model exposing ``generate``.
        processor: Processor used to build the inputs and decode the output.
        device: Device the processed inputs are moved to.

    Returns:
        ``(object_counts, output_image_path)`` where ``object_counts`` maps
        class name to number of detections. Returns ``({}, None)`` when the
        image does not exist.
    """
    if not os.path.exists(image_path):
        print(f"Error: Image not found at {image_path}.")
        return {}, None
    image = Image.open(image_path).convert("RGB")

    # Use the same " ; " separator as the training prefixes so the prompt
    # seen at inference matches the prompt format the model was tuned on.
    prompt = f"detect {' ; '.join(classes_to_detect)}"
    # The <image> token is prepended for inference as well.
    inputs = processor(text="<image>" + prompt, images=image, return_tensors="pt").to(device)
    prompt_token_count = inputs["input_ids"].shape[-1]
    with torch.no_grad():
        output_ids = model.generate(**inputs, max_new_tokens=128, do_sample=False)
    # generate() returns prompt + continuation; decode only the new tokens
    # rather than stripping the prompt text from the decoded string.
    generated_text = processor.decode(output_ids[0][prompt_token_count:], skip_special_tokens=True)
    clean_generated_text = generated_text.replace("<image>", "").strip()

    print(f"\n--- Processing Image: {os.path.basename(image_path)} ---")
    print(f"Input Prompt: '{prompt}'")
    print(f"Cleaned Detection String: '{clean_generated_text}'")

    detections = sv.Detections.empty()
    try:
        detections = sv.Detections.from_lmm(lmm='paligemma', result=clean_generated_text, resolution_wh=image.size, classes=classes_to_detect)
    except Exception as e:
        # Best effort: a malformed model output should not abort the whole run.
        print(f"WARNING: Error parsing detections for {image_path} with supervision: {e}. Detections will be empty.")

    # Count per class and build exactly one label per detection, so the label
    # list always has the same length as the detections.
    object_counts = {}
    labels = []
    class_ids = detections.class_id if detections.class_id is not None else []
    for class_id in class_ids:
        class_id = int(class_id)
        if 0 <= class_id < len(classes_to_detect):
            class_name = classes_to_detect[class_id]
            object_counts[class_name] = object_counts.get(class_name, 0) + 1
            labels.append(class_name)
        else:
            print(f"WARNING: Detected class_id {class_id} out of bounds. Skipping count.")
            labels.append(f"class_{class_id}")

    print("\nDetected Objects and Counts:")
    if not object_counts:
        print("No objects detected.")
    else:
        for obj, count in object_counts.items():
            print(f"- {obj}: {count}")

    # Boxes and labels are drawn by separate annotators.
    box_annotator = sv.BoxAnnotator()
    label_annotator = sv.LabelAnnotator()
    annotated_frame = box_annotator.annotate(scene=image.copy(), detections=detections)
    annotated_frame = label_annotator.annotate(scene=annotated_frame, detections=detections, labels=labels)

    # The annotators may hand back either a PIL image or an array; accept both.
    if isinstance(annotated_frame, Image.Image):
        annotated_frame_pil = annotated_frame
    else:
        annotated_frame_pil = Image.fromarray(annotated_frame)

    os.makedirs("output_images", exist_ok=True)
    output_image_filename = f"annotated_{os.path.basename(image_path)}"
    output_image_path = os.path.join("output_images", output_image_filename)
    annotated_frame_pil.save(output_image_path)
    print(f"Annotated image saved to: {output_image_path}")
    return object_counts, output_image_path

print("\n--- Starting Inference Examples ---")
import random
inference_image_paths = []
num_inference_examples = 5

# Prefer images from the test split; fall back to the training split when the
# test split is missing or empty.
if test_dataset and len(test_dataset) > 0:
    sample_size = min(num_inference_examples, len(test_dataset))
    print(f"Selecting {sample_size} random images from TEST dataset.")
    sample_entries = random.sample(test_dataset.entries, sample_size)
    inference_image_paths = [
        os.path.join(test_dataset.image_directory_root, record['image'])
        for record in sample_entries
    ]
elif train_dataset and len(train_dataset) > 0:
    sample_size = min(num_inference_examples, len(train_dataset))
    print(f"WARNING: Test dataset not available/empty. Selecting {sample_size} random images from TRAIN dataset.")
    sample_entries = random.sample(train_dataset.entries, sample_size)
    inference_image_paths = [
        os.path.join(train_dataset.image_directory_root, record['image'])
        for record in sample_entries
    ]
else:
    print("ERROR: No training or test data found. Cannot run inference examples.")

# Run detection and counting on every selected image; results are printed and
# the annotated images are saved by predict_and_count itself.
for img_path in inference_image_paths:
    counts, _ = predict_and_count(img_path, ALL_CLASSES, model, processor, DEVICE)
print("Inference and Object Counting Complete.")

