In [28]:
!pip install rouge nltk
!pip install -q transformers==4.35.2

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m123.5/123.5 kB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.9/7.9 MB[0m [31m56.1 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m566.1/566.1 kB[0m [31m34.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.6/3.6 MB[0m [31m65.5 MB/s[0m eta [36m0:00:00[0m:00:01[0m
[?25h[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
datasets 4.1.1 requires pyarrow>=21.0.0, but you have pyarrow 19.0.1 which is incompatible.
sentence-transformers 4.1.0 requires transformers<5.0.0,>=4.41.0, but you have transformers 4.35.2 which is incompatible.
gradio 5.38.1 requires pydantic<2.12,>=2.0, but you have pydantic 2.12.0a1 which is inc

## Imports and Device Setup

In [26]:
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
from PIL import Image
import xml.etree.ElementTree as ET
from transformers import RobertaTokenizer, RobertaModel, RobertaConfig
from sklearn.model_selection import train_test_split
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from rouge import Rouge
import warnings

import sys

# Patch transformers BEFORE importing
import importlib.util
spec = importlib.util.find_spec("transformers")
if spec:
    import transformers.utils.hub
    transformers.utils.hub.list_repo_templates = lambda *args, **kwargs: []
    print("✅ Transformers patched successfully")
warnings.filterwarnings('ignore')

# Set device and memory optimization for Kaggle
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

✅ Transformers patched successfully
Using device: cuda


### Kaggle Dataset paths

In [4]:
KAGGLE_INPUT_PATH = '/kaggle/input/chest-xrays-indiana-university'
IMAGES_PATH = os.path.join(KAGGLE_INPUT_PATH, 'images', 'images_normalized')
REPORTS_PATH = os.path.join(KAGGLE_INPUT_PATH, 'indiana_reports.csv')
PROJECTIONS_PATH = os.path.join(KAGGLE_INPUT_PATH, 'indiana_projections.csv')

print(KAGGLE_INPUT_PATH) 
print(IMAGES_PATH)
print(REPORTS_PATH)
print(PROJECTIONS_PATH)

/kaggle/input/chest-xrays-indiana-university
/kaggle/input/chest-xrays-indiana-university/images/images_normalized
/kaggle/input/chest-xrays-indiana-university/indiana_reports.csv
/kaggle/input/chest-xrays-indiana-university/indiana_projections.csv


## Hyperparameters

In [5]:
BATCH_SIZE = 16
EMBED_SIZE = 512  # Reduced from 768
HIDDEN_SIZE = 512
MAX_LENGTH = 128  # Reduced from 256
NUM_EPOCHS = 10
LEARNING_RATE = 1e-4
USE_LORA = True  # Use LoRA for memory efficiency

## LoRA Implementation

In [6]:
class LoRALayer(nn.Module):
    """Low-Rank Adaptation layer for efficient fine-tuning"""
    
    def __init__(self, in_features, out_features, rank=16, alpha=32):
        super().__init__()
        self.rank = rank
        self.alpha = alpha
        self.scaling = alpha / rank
        
        # LoRA parameters
        self.lora_A = nn.Parameter(torch.randn(in_features, rank) * 0.01)
        self.lora_B = nn.Parameter(torch.zeros(rank, out_features))
        
        # Freeze original weights
        self.weight = nn.Parameter(torch.randn(out_features, in_features), requires_grad=False)
        
    def forward(self, x):
        # Original transformation
        result = torch.matmul(x, self.weight.t())
        
        # Add LoRA adaptation
        lora_output = torch.matmul(torch.matmul(x, self.lora_A), self.lora_B)
        result += lora_output * self.scaling
        
        return result

## Dataset Setup

In [7]:
class XRayDataset(Dataset):
    """Custom dataset for X-ray images and reports"""
    
    def __init__(self, image_paths, reports, tokenizer, max_length=256, transform=None):
        self.image_paths = image_paths
        self.reports = reports
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.transform = transform
        
    def __len__(self):
        return len(self.image_paths)
    
    def __getitem__(self, idx):
        # Load and process image
        image_path = self.image_paths[idx]
        try:
            image = Image.open(image_path).convert('RGB')
            if self.transform:
                image = self.transform(image)
        except:
            # Return a black image if loading fails
            image = torch.zeros(3, 224, 224)
        
        # Process text
        report = str(self.reports[idx])
        encoded = self.tokenizer(
            report,
            padding='max_length',
            truncation=True,
            max_length=self.max_length,
            return_tensors='pt'
        )
        
        return {
            'image': image,
            'input_ids': encoded['input_ids'].squeeze(),
            'attention_mask': encoded['attention_mask'].squeeze(),
            'report': report
        }

## Feature Extraction ResNet(CNN)

In [8]:
class CNNEncoder(nn.Module):
    """CNN Encoder using ResNet for feature extraction"""
    
    def __init__(self, embed_size=768, use_lora=False):
        super(CNNEncoder, self).__init__()
        # Use ResNet18 for memory efficiency on Kaggle
        resnet = models.resnet18(pretrained=True)
        modules = list(resnet.children())[:-1]
        self.resnet = nn.Sequential(*modules)
        
        if use_lora:
            self.linear = LoRALayer(resnet.fc.in_features, embed_size)
        else:
            self.linear = nn.Linear(resnet.fc.in_features, embed_size)
            
        self.bn = nn.BatchNorm1d(embed_size, momentum=0.01)
        
    def forward(self, images):
        with torch.no_grad():
            features = self.resnet(images)
        features = features.reshape(features.size(0), -1)
        features = self.linear(features)
        features = self.bn(features)
        return features

## Adding RoBERTa

In [9]:
class XRayReportGenerator(nn.Module):
    """Main model combining CNN and RoBERTa with optional LoRA"""
    
    def __init__(self, vocab_size, embed_size=512, hidden_size=512, use_lora=False):
        super(XRayReportGenerator, self).__init__()
        
        self.use_lora = use_lora
        
        # Image encoder
        self.encoder = CNNEncoder(embed_size, use_lora=use_lora)
        
        # Text decoder using RoBERTa
        config = RobertaConfig(
            vocab_size=vocab_size,
            hidden_size=hidden_size,
            num_hidden_layers=3,  # Reduced for memory
            num_attention_heads=8,
            intermediate_size=1024,
            max_position_embeddings=128
        )
        self.roberta = RobertaModel(config)
        
        # Projection layers with optional LoRA
        if use_lora:
            self.image_projection = LoRALayer(embed_size, hidden_size)
            self.output_projection = LoRALayer(hidden_size, vocab_size)
        else:
            self.image_projection = nn.Linear(embed_size, hidden_size)
            self.output_projection = nn.Linear(hidden_size, vocab_size)
            
        self.dropout = nn.Dropout(0.1)
        
        # Freeze RoBERTa layers if using LoRA
        if use_lora:
            for param in self.roberta.parameters():
                param.requires_grad = False
        
    def forward(self, images, input_ids=None, attention_mask=None):
        # Encode images
        image_features = self.encoder(images)
        image_features = self.image_projection(image_features)
        image_features = image_features.unsqueeze(1)  # Add sequence dimension
        
        if input_ids is not None:
            # Training mode: use teacher forcing
            text_embeddings = self.roberta.embeddings(input_ids)
            
            # Concatenate image features with text embeddings
            combined_embeddings = torch.cat([image_features, text_embeddings], dim=1)
            
            # Create attention mask for combined sequence
            batch_size = images.size(0)
            image_mask = torch.ones(batch_size, 1).to(images.device)
            if attention_mask is not None:
                combined_mask = torch.cat([image_mask, attention_mask], dim=1)
            else:
                combined_mask = image_mask
            
            # Pass through RoBERTa
            outputs = self.roberta(
                inputs_embeds=combined_embeddings,
                attention_mask=combined_mask
            )
            
            # Project to vocabulary
            hidden_states = outputs.last_hidden_state
            hidden_states = self.dropout(hidden_states)
            logits = self.output_projection(hidden_states)
            
            return logits[:, 1:, :]  # Remove image feature from output
        else:
            # Inference mode
            return image_features

    def generate_report(self, image, tokenizer, max_length=100, temperature=0.7):
        """Generate report from a single image"""
        self.eval()
        with torch.no_grad():
            if image.dim() == 3:
                image = image.unsqueeze(0)
            image = image.to(device)
            
            # Start with BOS token
            generated = torch.tensor([[tokenizer.bos_token_id]]).to(device)
            
            for _ in range(max_length):
                attention_mask = torch.ones_like(generated)
                
                logits = self.forward(image, generated, attention_mask)
                next_token_logits = logits[0, -1, :] / temperature
                
                # Greedy decoding
                next_token = torch.argmax(next_token_logits).unsqueeze(0).unsqueeze(0)
                
                generated = torch.cat([generated, next_token], dim=1)
                
                if next_token.item() == tokenizer.eos_token_id:
                    break
            
            report = tokenizer.decode(generated[0], skip_special_tokens=True)
            return report

## Data preparation

In [10]:
def prepare_data():
    print("Loading data...")
    reports_df = pd.read_csv(REPORTS_PATH)
    projections_df = pd.read_csv(PROJECTIONS_PATH)
    
    # Merge and clean
    merged_df = projections_df.merge(reports_df, on='uid', how='inner')
    merged_df = merged_df.dropna(subset=['findings', 'filename'])
    
    # Filter for frontal views only
    merged_df = merged_df[merged_df['projection'].isin(['Frontal', 'AP', 'PA'])]
    
    # Create full image paths
    merged_df['image_path'] = merged_df['filename'].apply(
        lambda x: os.path.join(IMAGES_PATH, x)
    )
    
    # Keep only existing images
    merged_df = merged_df[merged_df['image_path'].apply(os.path.exists)]
    
    print(f"Total samples found: {len(merged_df)}")
    
    # Limit dataset for faster training on Kaggle
    merged_df = merged_df.head(1000)
    print(f"Using {len(merged_df)} samples for training")
    
    return merged_df

## Training function

In [11]:
def train_epoch(model, dataloader, optimizer, criterion, tokenizer, epoch):
    model.train()
    total_loss = 0
    
    pbar = tqdm(dataloader, desc=f"Epoch {epoch}")
    for batch_idx, batch in enumerate(pbar):
        images = batch['image'].to(device)
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        
        optimizer.zero_grad()
        
        # Forward pass
        logits = model(images, input_ids, attention_mask)
        
        # Shift for next token prediction
        logits = logits[:, :-1, :].contiguous()
        targets = input_ids[:, 1:].contiguous()
        
        # Calculate loss
        loss = criterion(logits.view(-1, logits.size(-1)), targets.view(-1))
        
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        
        total_loss += loss.item()
        pbar.set_postfix({'loss': f'{loss.item():.4f}'})
        
        # Clear cache periodically
        if batch_idx % 10 == 0:
            torch.cuda.empty_cache()
    
    return total_loss / len(dataloader)

## Evaluation function

In [12]:
def evaluate_model(model, dataloader, tokenizer, num_samples=20):
    """Evaluate on limited samples for speed"""
    model.eval()
    bleu_scores = []
    rouge_scores = {'rouge-1': [], 'rouge-2': [], 'rouge-l': []}
    rouge_evaluator = Rouge()
    smoothing = SmoothingFunction().method1
    
    sample_count = 0
    
    with torch.no_grad():
        for batch in tqdm(dataloader, desc="Evaluating"):
            images = batch['image'].to(device)
            reference_reports = batch['report']
            
            for i, image in enumerate(images):
                if sample_count >= num_samples:
                    break
                    
                generated_report = model.generate_report(image, tokenizer, max_length=50)
                reference = reference_reports[i]
                
                # BLEU score
                reference_tokens = reference.lower().split()
                generated_tokens = generated_report.lower().split()
                
                if len(generated_tokens) > 0:
                    bleu = sentence_bleu([reference_tokens], generated_tokens, 
                                        smoothing_function=smoothing)
                    bleu_scores.append(bleu)
                
                # ROUGE scores
                try:
                    if generated_report.strip() and reference.strip():
                        rouge_result = rouge_evaluator.get_scores(generated_report, reference)[0]
                        rouge_scores['rouge-1'].append(rouge_result['rouge-1']['f'])
                        rouge_scores['rouge-2'].append(rouge_result['rouge-2']['f'])
                        rouge_scores['rouge-l'].append(rouge_result['rouge-l']['f'])
                except:
                    pass
                
                sample_count += 1
            
            if sample_count >= num_samples:
                break
    
    avg_bleu = np.mean(bleu_scores) if bleu_scores else 0
    avg_rouge1 = np.mean(rouge_scores['rouge-1']) if rouge_scores['rouge-1'] else 0
    avg_rouge2 = np.mean(rouge_scores['rouge-2']) if rouge_scores['rouge-2'] else 0
    avg_rougel = np.mean(rouge_scores['rouge-l']) if rouge_scores['rouge-l'] else 0
    
    return {
        'BLEU': avg_bleu,
        'ROUGE-1': avg_rouge1,
        'ROUGE-2': avg_rouge2,
        'ROUGE-L': avg_rougel
    }

## Pipeline

In [21]:
def main():
    print("="*60)
    print("X-Ray Report Generator - Training Pipeline")
    print("="*60)
    
    # Prepare data
    df = prepare_data()
    
    # Split data
    train_df, val_df = train_test_split(df, test_size=0.15, random_state=42)
    
    print(f"\n📊 Dataset Split:")
    print(f"   Train samples: {len(train_df)}")
    print(f"   Val samples:   {len(val_df)}")
    
    # Initialize tokenizer - WORKAROUND for chat template bug
    print("\n🔧 Loading tokenizer...")
    from transformers import AutoTokenizer, RobertaTokenizerFast
    import transformers
    
    # Monkey patch to bypass the chat template bug
    original_list_repo_templates = None
    try:
        from transformers.utils.hub import list_repo_templates
        original_list_repo_templates = list_repo_templates
        # Replace with a function that returns empty list
        transformers.utils.hub.list_repo_templates = lambda *args, **kwargs: []
    except:
        pass
    
    tokenizer = None
    
    # Method 1: Try FacebookAI/roberta-base with patch
    try:
        tokenizer = RobertaTokenizerFast.from_pretrained('FacebookAI/roberta-base')
        print("   ✅ Loaded FacebookAI/roberta-base")
    except Exception as e1:
        print(f"   Method 1 failed: {str(e1)[:80]}...")
    
    # Method 2: Try roberta-base with patch
    if tokenizer is None:
        try:
            tokenizer = AutoTokenizer.from_pretrained('roberta-base', use_fast=True)
            print("   ✅ Loaded roberta-base")
        except Exception as e2:
            print(f"   Method 2 failed: {str(e2)[:80]}...")
    
    # Method 3: Try distilroberta-base with patch
    if tokenizer is None:
        try:
            tokenizer = AutoTokenizer.from_pretrained('distilroberta-base', use_fast=True)
            print("   ✅ Loaded distilroberta-base")
        except Exception as e3:
            print(f"   Method 3 failed: {str(e3)[:80]}...")
    
    # Method 4: Load from local cache if available
    if tokenizer is None:
        try:
            tokenizer = AutoTokenizer.from_pretrained('roberta-base', local_files_only=True)
            print("   ✅ Loaded from local cache")
        except Exception as e4:
            print(f"   Method 4 failed: {str(e4)[:80]}...")
    
    # Restore original function
    if original_list_repo_templates is not None:
        transformers.utils.hub.list_repo_templates = original_list_repo_templates
    
    if tokenizer is None:
        raise Exception("All tokenizer loading methods failed! Try upgrading transformers: pip install --upgrade transformers")
    
    print(f"   Vocab size: {tokenizer.vocab_size}")
    
    # Data transforms
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                           std=[0.229, 0.224, 0.225])
    ])
    
    # Create datasets
    print("\n📦 Creating datasets...")
    train_dataset = XRayDataset(
        train_df['image_path'].tolist(),
        train_df['findings'].tolist(),
        tokenizer,
        max_length=MAX_LENGTH,
        transform=transform
    )
    
    val_dataset = XRayDataset(
        val_df['image_path'].tolist(),
        val_df['findings'].tolist(),
        tokenizer,
        max_length=MAX_LENGTH,
        transform=transform
    )
    
    # Create dataloaders
    train_loader = DataLoader(
        train_dataset, 
        batch_size=BATCH_SIZE, 
        shuffle=True, 
        num_workers=2,
        pin_memory=True
    )
    val_loader = DataLoader(
        val_dataset, 
        batch_size=4, 
        shuffle=False, 
        num_workers=2,
        pin_memory=True
    )
    
    # Initialize model
    print("\n🏗️  Building model...")
    model = XRayReportGenerator(
        vocab_size=tokenizer.vocab_size,
        embed_size=EMBED_SIZE,
        hidden_size=HIDDEN_SIZE,
        use_lora=USE_LORA
    ).to(device)
    
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    total_params = sum(p.numel() for p in model.parameters())
    
    print(f"   Total parameters:     {total_params:,}")
    print(f"   Trainable parameters: {trainable_params:,}")
    print(f"   Percentage trainable: {100 * trainable_params / total_params:.2f}%")
    
    # Training setup
    criterion = nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)
    optimizer = optim.AdamW(model.parameters(), lr=LEARNING_RATE, weight_decay=0.01)
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=NUM_EPOCHS)
    
    # Training loop
    print("\n" + "="*60)
    print("Starting Training")
    print("="*60)
    
    best_bleu = 0
    best_epoch = 0
    
    for epoch in range(1, NUM_EPOCHS + 1):
        print(f"\n📈 Epoch {epoch}/{NUM_EPOCHS}")
        print("-" * 60)
        
        # Train
        train_loss = train_epoch(model, train_loader, optimizer, criterion, tokenizer, epoch)
        print(f"   Average Train Loss: {train_loss:.4f}")
        
        # Evaluate
        print("\n   Evaluating model...")
        metrics = evaluate_model(model, val_loader, tokenizer, num_samples=20)
        
        print(f"\n   📊 Validation Metrics:")
        print(f"      BLEU Score:  {metrics['BLEU']:.4f}")
        print(f"      ROUGE-1:     {metrics['ROUGE-1']:.4f}")
        print(f"      ROUGE-2:     {metrics['ROUGE-2']:.4f}")
        print(f"      ROUGE-L:     {metrics['ROUGE-L']:.4f}")
        
        # Save best model
        if metrics['BLEU'] > best_bleu:
            best_bleu = metrics['BLEU']
            best_epoch = epoch
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'bleu': best_bleu,
                'metrics': metrics
            }, 'best_xray_model.pth')
            print(f"      ✅ Best model saved! (BLEU: {best_bleu:.4f})")
        
        scheduler.step()
        
        # Memory cleanup
        gc.collect()
        torch.cuda.empty_cache()
    
    print("\n" + "="*60)
    print("✨ Training Complete!")
    print("="*60)
    print(f"Best BLEU Score: {best_bleu:.4f} (Epoch {best_epoch})")
    
    return model, tokenizer, val_df

## Inference

In [22]:
def generate_report_from_path(image_path, model, tokenizer, transform):
    """Generate report from image path"""
    model.eval()
    
    # Load and preprocess image
    image = Image.open(image_path).convert('RGB')
    image = transform(image)
    
    # Generate report
    report = model.generate_report(image, tokenizer, max_length=80)
    
    return report

## Demo Function

In [23]:
def demo_report_generation(model, tokenizer, val_df, transform, num_examples=3):
    """Show example predictions"""
    print("\n" + "="*60)
    print("🔍 Demo: Generating Reports for Sample Images")
    print("="*60)
    
    model.eval()
    
    for i in range(min(num_examples, len(val_df))):
        print(f"\n--- Example {i+1} ---")
        
        image_path = val_df.iloc[i]['image_path']
        actual_report = val_df.iloc[i]['findings']
        
        print(f"Image: {os.path.basename(image_path)}")
        print(f"\n📄 Actual Report:\n{actual_report[:200]}...")
        
        generated_report = generate_report_from_path(image_path, model, tokenizer, transform)
        print(f"\n🤖 Generated Report:\n{generated_report}")
        print("-" * 60)

## Run

In [29]:
if __name__ == "__main__":
    # Train the model
    model, tokenizer, val_df = main()
    
    # Create transform for inference
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                           std=[0.229, 0.224, 0.225])
    ])
    
    # Show demo predictions
    demo_report_generation(model, tokenizer, val_df, transform, num_examples=3)
    
    print("\n" + "="*60)
    print("✅ Pipeline Complete!")
    print("="*60)
    print("\n💡 To generate a report for a new image, use:")
    print("   report = generate_report_from_path(image_path, model, tokenizer, transform)")
    print("\n📁 Best model saved as: best_xray_model.pth")
    print("="*60)

X-Ray Report Generator - Training Pipeline
Loading data...
Total samples found: 3307
Using 1000 samples for training

📊 Dataset Split:
   Train samples: 850
   Val samples:   150

🔧 Loading tokenizer...
   Method 1 failed: 404 Client Error. (Request ID: Root=1-68fa95b9-4a7b2eeb391bb44d5efd722d;7de6670f...
   Method 2 failed: 404 Client Error. (Request ID: Root=1-68fa95b9-7de5e2181fa5ed2a2d214848;3afa9685...
   Method 3 failed: 404 Client Error. (Request ID: Root=1-68fa95ba-73ccd714553496ef1b6f0127;03ba54a7...
   Method 4 failed: expected str, bytes or os.PathLike object, not NoneType...


Exception: All tokenizer loading methods failed! Try upgrading transformers: pip install --upgrade transformers