# Qwen2-VL-2B Training on HAM10000 Dataset

**Streamlined training pipeline for Kaggle environment**


In [None]:
# Install dependencies
%pip install transformers accelerate peft tqdm pillow datasets pandas bitsandbytes --q

In [None]:
# Import libraries and configuration
import torch
from torch.utils.data import DataLoader
from transformers import (
    Qwen2VLForConditionalGeneration,
    Qwen2VLProcessor,
    BitsAndBytesConfig,
    TrainingArguments,
    Trainer
)
from peft import LoraConfig, get_peft_model, TaskType
from datasets import Dataset as HFDataset
import pandas as pd
from PIL import Image
import os
from pathlib import Path

# Central run configuration shared by every cell below.
CONFIG = dict(
    # Model checkpoint and filesystem locations
    model_name="Qwen/Qwen2-VL-2B-Instruct",
    train_image_dir="/kaggle/input/small-isic",
    spatial_dataset_file="./ham10000_with_spatial_data.json",
    output_dir="./qwen2_5_vl_trained",

    # Optimisation schedule
    num_train_epochs=2,
    learning_rate=5e-5,
    per_device_train_batch_size=1,
    gradient_accumulation_steps=1,
    max_length=512,

    # LoRA hyper-parameters
    lora_r=32,
    lora_alpha=64,
    lora_dropout=0.05,

    # Spatial awareness: whether location prompts are generated at all, and
    # the probability threshold compared against random.random() per sample
    include_spatial_descriptions=True,
    spatial_description_ratio=0.2,

    # Stability / reproducibility knobs
    warmup_steps=100,
    weight_decay=0.01,
    max_grad_norm=1.0,
    seed=42,
)

print("Configuration loaded ✓")


In [None]:
# Load the base model and its processor.
# NOTE: 4-bit quantization is currently DISABLED. The BitsAndBytesConfig below
# and the `quantization_config` argument are commented out, so the weights are
# loaded in plain bfloat16.
# bnb_config = BitsAndBytesConfig(
#     load_in_4bit=True,
#     bnb_4bit_use_double_quant=True,
#     bnb_4bit_quant_type="nf4",
#     bnb_4bit_compute_dtype=torch.bfloat16
# )

model = Qwen2VLForConditionalGeneration.from_pretrained(
    CONFIG["model_name"],
    device_map="auto",
    torch_dtype=torch.bfloat16,
    # quantization_config=bnb_config,
    trust_remote_code=True
)

# The processor is loaded from the same checkpoint name as the model.
processor = Qwen2VLProcessor.from_pretrained(
    CONFIG["model_name"],
    trust_remote_code=True
)

# Padding is requested later when batching, so make sure a pad token exists;
# fall back to the EOS token when the tokenizer does not define one.
if processor.tokenizer.pad_token is None:
    processor.tokenizer.pad_token = processor.tokenizer.eos_token
    processor.tokenizer.pad_token_id = processor.tokenizer.eos_token_id

# LoRA / QLoRA adapter setup (currently DISABLED).
# NOTE: get_peft_model is not called while this is commented out, so `model`
# stays the plain from_pretrained result and the lora_* entries in CONFIG are
# not used by this cell.
# peft_config = LoraConfig(
#     lora_alpha=CONFIG["lora_alpha"],
#     lora_dropout=CONFIG["lora_dropout"],
#     r=CONFIG["lora_r"],
#     bias="none",
#     target_modules=["q_proj", "v_proj", "k_proj", "o_proj"],
#     task_type=TaskType.CAUSAL_LM,
# )

# model = get_peft_model(model, peft_config)
# model.print_trainable_parameters()

# Force-enable gradients on LoRA parameters (only relevant when the adapter
# block above is re-enabled).
# for name, param in model.named_parameters():
#     if 'lora' in name:
#         param.requires_grad = True

print("Model loaded ✓")

In [None]:
# Load spatial dataset
# HAM10000 short diagnosis codes -> the full names written into responses.
_DX_NAMES = {
    'akiec': 'actinic keratosis',
    'bcc': 'basal cell carcinoma',
    'bkl': 'benign keratosis-like lesion',
    'df': 'dermatofibroma',
    'mel': 'melanoma',
    'nv': 'melanocytic nevus',
    'vasc': 'vascular lesion'
}

# File extensions probed, in order, when resolving an image id to a file.
_IMAGE_EXTENSIONS = (".jpg", ".jpeg", ".png")


def _find_image_path(image_dir, image_id):
    """Return the first existing ``<image_id><ext>`` under image_dir (as str), or None."""
    for ext in _IMAGE_EXTENSIONS:
        candidate = image_dir / f"{image_id}{ext}"
        if candidate.exists():
            return str(candidate)
    return None


def load_spatial_dataset(limit=None, config=None, rng=None):
    """Build chat-style training samples from the annotated spatial JSON file.

    Each JSON item must provide ``image_id`` and ``dx``; ``lesion_id``,
    ``mask_available``, ``spatial_description``, ``bbox`` and
    ``area_coverage`` are optional. Items whose image file cannot be found
    in the image directory are skipped.

    Args:
        limit: Stop after this many samples have been collected. ``None``
            (or any falsy value) means no limit.
        config: Mapping providing ``spatial_dataset_file``,
            ``train_image_dir``, ``include_spatial_descriptions`` and
            ``spatial_description_ratio``. Defaults to the module-level
            ``CONFIG``.
        rng: Object with a ``random()`` method used to decide which eligible
            samples get a location prompt. Defaults to the ``random`` module;
            pass ``random.Random(seed)`` for a reproducible selection.

    Returns:
        A list of dicts with keys ``image_path``, ``conversation`` (one user
        turn and one assistant turn) and ``metadata``.
    """
    import json
    import random

    cfg = CONFIG if config is None else config
    rand = random if rng is None else rng

    # Load annotated data with spatial information
    with open(cfg["spatial_dataset_file"], 'r', encoding="utf-8") as f:
        spatial_data = json.load(f)

    image_dir = Path(cfg["train_image_dir"])
    conversations = []

    for item in spatial_data:
        image_path = _find_image_path(image_dir, item['image_id'])
        if not image_path:
            continue

        diagnosis = item['dx']
        # Unknown codes fall back to the raw code itself.
        diagnosis_full = _DX_NAMES.get(diagnosis, diagnosis)

        # A sample is only "spatial" when a description actually exists, so
        # the has_spatial metadata always agrees with the prompt/response
        # built below. The random draw comes last so it is only consumed for
        # eligible samples.
        description = item.get('spatial_description')
        use_spatial = bool(
            cfg["include_spatial_descriptions"]
            and item.get('mask_available', False)
            and description
            and rand.random() < cfg["spatial_description_ratio"]
        )

        if use_spatial:
            user_prompt = "Analyze this skin lesion, provide a diagnosis, and describe its location."
            spatial_desc = description.replace('lesion located in', 'The lesion is located in the')
            assistant_response = f"This appears to be {diagnosis_full}. {spatial_desc}."
        else:
            user_prompt = "Analyze this skin lesion and provide a diagnosis."
            assistant_response = f"This appears to be {diagnosis_full}."

        conversations.append({
            "image_path": image_path,
            "conversation": [
                {
                    "role": "user",
                    "content": [
                        {"type": "image"},
                        {"type": "text", "text": user_prompt}
                    ]
                },
                {
                    "role": "assistant",
                    "content": [
                        {"type": "text", "text": assistant_response}
                    ]
                }
            ],
            "metadata": {
                "lesion_id": item.get('lesion_id'),
                "diagnosis": diagnosis,
                "has_spatial": use_spatial,
                "bbox": item.get('bbox'),
                "area_coverage": item.get('area_coverage'),
                "mask_available": item.get('mask_available', False)
            }
        })

        if limit and len(conversations) >= limit:
            break

    return conversations

# Build the working dataset (capped at 1000 samples) and report how many of
# the samples were given a spatial prompt.
dataset = load_spatial_dataset(limit=1000)
spatial_count = len([sample for sample in dataset if sample['metadata']['has_spatial']])
print(f"Dataset loaded ✓ ({len(dataset)} samples, {spatial_count} with spatial descriptions)")


In [None]:
# Enhanced data collator for spatial-aware dataset
def _load_example_image(example):
    """Return the RGB image for one example.

    Examples with an ``image_path`` are read from disk. Otherwise the first
    ``{"type": "image"}`` entry found in a user message is expected to carry
    the already-loaded image under its ``"image"`` key.

    Raises:
        ValueError: If ``image_path`` is None and no user message contains
            an image entry.
    """
    if example["image_path"] is not None:
        # Local dataset: convert() loads the pixel data, so the file handle
        # can be released as soon as the block exits.
        with Image.open(example["image_path"]) as img:
            return img.convert('RGB')

    # In-memory dataset: image is already loaded in the conversation
    for msg in example["conversation"]:
        if msg["role"] != "user":
            continue
        for content in msg["content"]:
            if content["type"] == "image":
                return content["image"].convert('RGB')

    raise ValueError("Example has no image_path and no image entry in its user messages")


def collate_fn(examples):
    """Collate conversation examples into a padded model batch with labels.

    Args:
        examples: Iterable of dicts with ``image_path`` and ``conversation``
            keys, in the layout produced by ``load_spatial_dataset``.

    Returns:
        The processor output for the batch with an extra ``labels`` tensor:
        a copy of ``input_ids`` in which padding and image placeholder tokens
        are replaced by -100.
    """
    texts = []
    images = []

    for example in examples:
        images.append(_load_example_image(example))
        texts.append(
            processor.apply_chat_template(
                example["conversation"],
                tokenize=False,
                add_generation_prompt=False
            )
        )

    # NOTE(review): truncation to max_length could presumably cut into the
    # expanded image placeholder tokens for large images — confirm that
    # max_length is large enough for the image sizes in use.
    batch = processor(
        text=texts,
        images=images,
        return_tensors="pt",
        padding=True,
        truncation=True,
        max_length=CONFIG["max_length"]
    )

    labels = batch["input_ids"].clone()
    labels[labels == processor.tokenizer.pad_token_id] = -100

    # Mask image tokens in labels to prevent learning image embeddings as text.
    # Processors that do not expose `image_token_id` may still expose the
    # token string; resolve the id from it so the mask is not silently skipped.
    image_token_id = getattr(processor, 'image_token_id', None)
    if image_token_id is None and getattr(processor, 'image_token', None) is not None:
        image_token_id = processor.tokenizer.convert_tokens_to_ids(processor.image_token)
    if image_token_id is not None:
        labels[labels == image_token_id] = -100

    batch["labels"] = labels
    return batch


In [None]:
# Shuffle the samples reproducibly and carve out a held-out evaluation slice
import random

random.seed(CONFIG["seed"])

# Work on a shallow copy so the order of `dataset` itself is left untouched
shuffled_dataset = list(dataset)
random.shuffle(shuffled_dataset)

# 80/20 split: the first 80% of the shuffled samples train, the rest evaluate
train_size = int(0.8 * len(shuffled_dataset))
train_data, eval_data = shuffled_dataset[:train_size], shuffled_dataset[train_size:]

# Wrap both plain lists as Hugging Face datasets
train_dataset, eval_dataset = (
    HFDataset.from_list(part) for part in (train_data, eval_data)
)

print(f"Training: {len(train_data)} | Evaluation: {len(eval_data)}")

In [None]:
# Trainer configuration
# Hyper-parameters forwarded from CONFIG under exactly the same names.
_config_keys = (
    "output_dir",
    "per_device_train_batch_size",
    "gradient_accumulation_steps",
    "num_train_epochs",
    "learning_rate",
    "warmup_steps",
    "weight_decay",
    "max_grad_norm",
)

training_args = TrainingArguments(
    **{key: CONFIG[key] for key in _config_keys},

    # Cosine decay for the learning rate
    lr_scheduler_type="cosine",

    # Logging / checkpoint cadence (a single checkpoint is retained).
    # NOTE(review): no evaluation strategy is passed here, so eval_steps
    # presumably has no effect — confirm against the TrainingArguments
    # defaults of the installed transformers version.
    logging_steps=50,
    save_strategy="steps",
    save_steps=1000,
    save_total_limit=1,
    eval_steps=1000,

    # Keep every dataset column: the collator reads "image_path" and
    # "conversation" straight from the raw examples.
    remove_unused_columns=False,
    bf16=True,
    report_to="none",
)

trainer = Trainer(
    model=model,
    args=training_args,
    data_collator=collate_fn,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    tokenizer=processor.tokenizer,
)

In [None]:
# Run the fine-tuning loop
print("Starting training...")
trainer.train()
print("Training complete ✓")

# Write the model and the processor into the same output directory
save_dir = CONFIG["output_dir"]
trainer.save_model(save_dir)
processor.save_pretrained(save_dir)
print("Model saved")

In [None]:
# Test the trained model inference

# Full diagnosis names (as written in the training responses) -> short codes.
_DIAGNOSIS_KEYWORDS = {
    'actinic keratosis': 'akiec',
    'basal cell carcinoma': 'bcc',
    'benign keratosis-like lesion': 'bkl',
    'dermatofibroma': 'df',
    'melanoma': 'mel',
    'melanocytic nevus': 'nv',
    'vascular lesion': 'vasc'
}


def _match_diagnosis(text):
    """Return the short code of the last listed diagnosis name found in text, or None."""
    lowered = text.lower()
    found = None
    for full_name, short_code in _DIAGNOSIS_KEYWORDS.items():
        if full_name in lowered:
            found = short_code
    return found


def _extract_location(text):
    """Return the text after the last 'located in' (any casing), or '' when absent."""
    import re

    pieces = re.split("located in", text, flags=re.IGNORECASE)
    if len(pieces) < 2:
        return ""
    return pieces[-1].strip().rstrip(".")


def test_model_inference(model, processor, test_samples, num_tests=50):
    """Run generation on sample images and score diagnosis / location output.

    Args:
        model: Model exposing ``generate`` and ``device``.
        processor: Processor exposing ``apply_chat_template``,
            ``batch_decode``, ``tokenizer`` and being callable on
            text + images.
        test_samples: Sequence of sample dicts in the layout produced by
            ``load_spatial_dataset`` (``image_path``, ``conversation``,
            ``metadata``).
        num_tests: Maximum number of samples to evaluate.

    Returns:
        One result dict per evaluated sample. Every dict — including those
        for samples that raised an error — carries ``image``,
        ``ground_truth``, ``prediction``, ``diagnosis_correct``,
        ``spatial_correct``, ``has_spatial``, ``bbox`` and ``success``;
        failed samples additionally carry ``error``.
    """
    results = []
    selected = test_samples[:num_tests]
    total = len(selected)

    for i, sample in enumerate(selected):
        print(f"\n--- Test {i+1}/{total} ---")

        # Per-sample defaults so the error record below never reuses values
        # left over from a previous iteration.
        image_name = f"test_{i+1}"
        has_spatial_data = False
        bbox_gt = None

        try:
            # Load test image (convert() reads the pixels, so the file can
            # be closed right away)
            image_path = sample["image_path"]
            image_name = os.path.basename(image_path)
            with Image.open(image_path) as img:
                test_image = img.convert('RGB')
            print(f"Image: {image_name}")

            # Use the same prompt wording the sample was built with
            has_spatial_data = sample["metadata"]["has_spatial"]
            if has_spatial_data:
                user_prompt = "Analyze this skin lesion, provide a diagnosis, and describe its location."
            else:
                user_prompt = "Analyze this skin lesion and provide a diagnosis."

            conversation = [
                {
                    "role": "user",
                    "content": [
                        {"type": "image"},
                        {"type": "text", "text": user_prompt}
                    ]
                }
            ]

            # Process input
            text_prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)
            inputs = processor(
                text=[text_prompt],
                images=[test_image],
                return_tensors="pt"
            ).to(model.device)

            # Generate response.
            # NOTE: sampling is enabled, so repeated runs can produce
            # different outputs and therefore different scores.
            with torch.no_grad():
                generated_ids = model.generate(
                    **inputs,
                    max_new_tokens=150,
                    temperature=0.7,
                    do_sample=True,
                    pad_token_id=processor.tokenizer.eos_token_id
                )

            # Decode only the newly generated tokens (skip the prompt part)
            response = processor.batch_decode(
                generated_ids[:, inputs["input_ids"].shape[1]:],
                skip_special_tokens=True
            )[0].strip()

            # Ground truth is the assistant turn of the stored conversation
            ground_truth = sample["conversation"][1]["content"][0]["text"]
            bbox_gt = sample["metadata"].get("bbox")

            # Debug: Print what's in metadata
            if i == 0:  # Only for first test
                print(f"DEBUG - Metadata keys: {list(sample['metadata'].keys())}")
                print(f"DEBUG - Has bbox: {bbox_gt}")
                print(f"DEBUG - Has spatial: {sample['metadata'].get('has_spatial')}")

            # Compare diagnoses by the keyword found in each text
            gt_dx = _match_diagnosis(ground_truth)
            pred_dx = _match_diagnosis(response)
            diagnosis_correct = gt_dx == pred_dx and gt_dx is not None

            # Location is only scored for samples prompted for it
            spatial_correct = True  # Default
            gt_spatial = ""
            pred_spatial = ""

            if has_spatial_data:
                gt_spatial = _extract_location(ground_truth)
                pred_spatial = _extract_location(response)

                if gt_spatial and pred_spatial:
                    spatial_correct = gt_spatial.lower() == pred_spatial.lower()
                elif gt_spatial or pred_spatial:
                    # Exactly one side mentions a location
                    spatial_correct = False

            is_correct = diagnosis_correct and spatial_correct

            print(f"Ground Truth: {ground_truth}")
            print(f"Model Output: {response}")
            if has_spatial_data and (gt_spatial or pred_spatial):
                print(f"Spatial GT: '{gt_spatial}' | Pred: '{pred_spatial}'")
                if bbox_gt and len(bbox_gt) == 4:
                    print(f"Bbox: [{bbox_gt[0]:.0f}, {bbox_gt[1]:.0f}, {bbox_gt[2]:.0f}, {bbox_gt[3]:.0f}]")
                    area_cov = sample["metadata"].get("area_coverage")
                    if area_cov:
                        print(f"Area Coverage: {area_cov:.2%}")
                elif sample["metadata"].get("mask_available"):
                    print(f"Bbox: Available but not loaded properly")
                else:
                    print(f"Bbox: Not available for this sample")

            # Show N/A whenever location was not part of the task
            if has_spatial_data:
                spatial_mark = '✓' if spatial_correct else '❌'
            else:
                spatial_mark = 'N/A'
            print(f"Diagnosis: {'✓' if diagnosis_correct else '❌'} | Spatial: {spatial_mark}")
            print(f"Overall: {'✓ CORRECT' if is_correct else '❌ INCORRECT'}")

            results.append({
                "image": image_name,
                "ground_truth": ground_truth,
                "prediction": response,
                "diagnosis_correct": diagnosis_correct,
                "spatial_correct": spatial_correct,
                "has_spatial": has_spatial_data,
                "bbox": bbox_gt,
                "success": is_correct
            })

        except Exception as e:
            # Best-effort evaluation: record the failure with the same keys
            # as a normal result so downstream summaries can index safely.
            print(f"❌ Error processing test {i+1}: {e}")
            results.append({
                "image": image_name,
                "ground_truth": "",
                "prediction": "",
                "diagnosis_correct": False,
                "spatial_correct": False,
                "has_spatial": has_spatial_data,
                "bbox": bbox_gt,
                "error": str(e),
                "success": False
            })

    return results

# Reload the saved model and processor from disk, then run inference tests
print("Running inference tests on trained model...")
model = Qwen2VLForConditionalGeneration.from_pretrained(
    CONFIG["output_dir"],
    device_map="auto",
    torch_dtype=torch.bfloat16,
    trust_remote_code=True,
)
processor = Qwen2VLProcessor.from_pretrained(
    CONFIG["output_dir"],
    trust_remote_code=True,
)

# Evaluate on the held-out split so no test sample was seen during training.
# Reloading the dataset instead would start from the same first items that
# went into the train split and would re-draw which samples get a location
# prompt.
try:
    test_dataset = eval_data
except NameError:
    # Cell run without the split cell (e.g. fresh kernel): fall back to the
    # full dataset, which may overlap with the training samples.
    print("Held-out split not found; falling back to the full dataset (may overlap training data)")
    test_dataset = load_spatial_dataset()
test_results = test_model_inference(model, processor, test_dataset, num_tests=50)

# Summary with diagnosis and spatial accuracy. `.get` is used throughout
# because result dicts for samples that raised an error may lack the
# scoring keys.
successful_tests = sum(1 for r in test_results if r.get("success"))
diagnosis_correct = sum(1 for r in test_results if r.get("diagnosis_correct"))
spatial_tests = [r for r in test_results if r.get("has_spatial")]
spatial_correct = sum(1 for r in spatial_tests if r.get("spatial_correct"))

total_tests = len(test_results)
overall_accuracy = (successful_tests / total_tests) * 100 if total_tests > 0 else 0
diagnosis_accuracy = (diagnosis_correct / total_tests) * 100 if total_tests > 0 else 0
spatial_accuracy = (spatial_correct / len(spatial_tests)) * 100 if spatial_tests else 0

print(f"\n{'='*60}")
print(f"ENHANCED TEST SUMMARY")
print(f"{'='*60}")
print(f"Overall Accuracy:    {successful_tests}/{total_tests} ({overall_accuracy:.1f}%)")
print(f"Diagnosis Accuracy:  {diagnosis_correct}/{total_tests} ({diagnosis_accuracy:.1f}%)")
print(f"Spatial Accuracy:    {spatial_correct}/{len(spatial_tests)} ({spatial_accuracy:.1f}%) [{len(spatial_tests)} spatial samples]")

# Breakdown of failures (samples that errored count as failures)
diagnosis_fails = [r for r in test_results if not r.get("diagnosis_correct")]
spatial_fails = [r for r in spatial_tests if not r.get("spatial_correct")]

print(f"\nFailure Breakdown:")
print(f"• Diagnosis errors: {len(diagnosis_fails)}")
print(f"• Spatial errors: {len(spatial_fails)}")

# Show diagnosis failures (first sentence of each text, at most three)
if diagnosis_fails:
    print(f"\nDiagnosis Failures ({len(diagnosis_fails)} total):")
    for fail in diagnosis_fails[:3]:
        gt_diag = fail.get('ground_truth', '').split('.')[0].strip()
        pred_diag = fail.get('prediction', '').split('.')[0].strip()
        print(f"  {fail['image']}: {gt_diag} → {pred_diag}")
    if len(diagnosis_fails) > 3:
        print(f"  ... and {len(diagnosis_fails) - 3} more")

# Show spatial failures (at most three)
if spatial_fails:
    print(f"\nSpatial Failures ({len(spatial_fails)} total):")
    for fail in spatial_fails[:3]:
        print(f"  {fail['image']}: GT vs Pred spatial mismatch")
    if len(spatial_fails) > 3:
        print(f"  ... and {len(spatial_fails) - 3} more")

print(f"{'='*60}")
 