In [None]:
import pandas as pd
import numpy as np
from sklearn.utils import resample
import matplotlib.pyplot as plt
import torch
from PIL import Image
from transformers import CLIPProcessor, CLIPModel, AutoTokenizer
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split
import torch.nn.functional as F
from tqdm.auto import tqdm
import numpy as np

In [None]:
# Load the annotated paintings table and collapse the per-food columns into a
# single boolean flag. Every column from the third one onward is summed per
# row (presumably one indicator/count column per food term — confirm against
# the CSV); a positive total means the painting shows at least one food item.
df = pd.read_csv('data/paintings_with_food_nlp.csv')
df = df[['item', 'image_path']].assign(
    has_food=df.iloc[:, 2:].sum(axis=1) > 0
)
df

In [None]:

# Class-balance overview of `df` (needs the boolean `has_food` column).
# The flag column is summed once, vectorised, instead of iterating the Series
# element by element with the built-in sum() for every printed line.
n_total = len(df)
n_food = int(df['has_food'].sum())
print("Total samples:", n_total)
print("With food:", n_food)
print("Without food:", n_total - n_food)
# Note: despite its label, this value is the share of food paintings among
# ALL samples (food / total), not food divided by no-food.
print("Ratio food/no-food:", n_food / n_total)
  


In [None]:


def prepare_input(image_path, has_food, processor):
    """Turn one painting into a model-ready sample (image + prompt + label).

    The text side is always the same positive prompt; ``has_food`` only
    determines the training target. Choosing the prompt wording from the
    label would leak the answer into the model input and would make the
    image/text similarity meaningless at inference time, when the label is
    not known.

    Parameters
    ----------
    image_path : str or path-like
        Location of the image file; it is opened with PIL and converted to RGB.
    has_food : bool
        Ground-truth flag, stored as ``1.0`` / ``0.0``.
    processor : callable
        Called as ``processor(images=..., text=..., return_tensors="pt", ...)``
        (e.g. a ``CLIPProcessor``); its return value must support item
        assignment.

    Returns
    -------
    The processor output with an additional ``'labels'`` entry: a float
    tensor of shape ``(1,)``.
    """
    # Image.open keeps the file open until the image object is released; the
    # context manager closes it as soon as the pixels have been copied by
    # convert(), which matters when thousands of images are preprocessed.
    with Image.open(image_path) as raw_image:
        image = raw_image.convert('RGB')

    # Process image and text; every sample is padded/truncated to 77 tokens
    # so the tokenised prompts can be stacked into a batch later.
    inputs = processor(
        images=image,
        text=['a painting containing food'],
        return_tensors="pt",
        padding="max_length",
        max_length=77,
        truncation=True
    )

    # Add label
    inputs['labels'] = torch.tensor([float(has_food)])
    return inputs

def create_batch(samples):
    """Collate per-sample dictionaries into one batch of tensors.

    Parameters
    ----------
    samples : sequence of mappings
        Each mapping holds ``'pixel_values'``, ``'input_ids'`` and
        ``'attention_mask'`` with a leading dimension of size one (only index
        0 is used) plus a one-element ``'labels'`` tensor. An optional
        ``'weights'`` entry (scalar or one-element tensor) is collated as
        well when every sample provides it.

    Returns
    -------
    dict
        Stacked ``'pixel_values'``, ``'input_ids'``, ``'attention_mask'``;
        ``'labels'`` as a float tensor of shape ``(batch,)``; and
        ``'weights'`` of shape ``(batch,)`` when present in the samples.
    """
    batch = {
        key: torch.stack([sample[key][0] for sample in samples])
        for key in ('pixel_values', 'input_ids', 'attention_mask')
    }

    # Labels are flattened to shape (batch,). Stacking the per-sample (1,)
    # tensors would give (batch, 1), which does not match the (batch,)
    # similarity scores and makes binary_cross_entropy_with_logits raise.
    batch['labels'] = torch.cat([
        torch.as_tensor(sample['labels'], dtype=torch.float32).reshape(-1)
        for sample in samples
    ])

    # Per-sample loss weights are optional; forward them only when every
    # sample carries one so weighted training loops can read batch['weights'].
    if all('weights' in sample for sample in samples):
        batch['weights'] = torch.cat([
            torch.as_tensor(sample['weights'], dtype=torch.float32).reshape(-1)
            for sample in samples
        ])

    return batch

def train_epoch(model, train_loader, optimizer, device):
    """Run one optimisation pass over ``train_loader``.

    For every batch the model is called with ``input_ids``,
    ``attention_mask`` and ``pixel_values``; the row-wise dot product of the
    returned ``image_embeds`` and ``text_embeds`` is used as the logit of a
    binary cross-entropy loss against ``batch['labels']``.

    NOTE(review): the raw dot product is used as the logit, no temperature /
    logit scale is applied — confirm this is intended.

    Parameters
    ----------
    model : module returning an object with ``image_embeds`` / ``text_embeds``.
    train_loader : iterable of dict batches of tensors.
    optimizer : optimiser updating ``model``'s parameters.
    device : device every batch tensor is moved to.

    Returns
    -------
    float
        Mean of the per-batch losses.

    Raises
    ------
    ValueError
        If ``train_loader`` yields no batches.
    """
    model.train()
    running_loss = 0.0
    num_batches = 0
    progress_bar = tqdm(train_loader, desc="Training")

    for batch in progress_bar:
        # Move batch to device
        batch = {k: v.to(device) for k, v in batch.items()}

        # Forward pass
        outputs = model(
            input_ids=batch['input_ids'],
            attention_mask=batch['attention_mask'],
            pixel_values=batch['pixel_values']
        )

        # One similarity score per sample: shape (batch,)
        similarity = torch.sum(outputs.image_embeds * outputs.text_embeds, dim=-1)

        # The loss requires target and logit shapes to be identical; labels
        # may arrive as (batch, 1) from the collate function, so align them.
        targets = batch['labels'].reshape(similarity.shape).to(similarity.dtype)
        loss = F.binary_cross_entropy_with_logits(similarity, targets)

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        running_loss += batch_loss
        num_batches += 1
        progress_bar.set_postfix({'loss': batch_loss})

    if num_batches == 0:
        raise ValueError("train_loader produced no batches")
    return running_loss / num_batches

def evaluate(model, eval_loader, device):
    """Compute mean loss and accuracy of ``model`` over ``eval_loader``.

    The logit of each sample is the dot product of its ``image_embeds`` and
    ``text_embeds``; a sample is predicted positive when the sigmoid of that
    logit exceeds 0.5.

    Parameters
    ----------
    model : module returning an object with ``image_embeds`` / ``text_embeds``.
    eval_loader : iterable of dict batches of tensors.
    device : device every batch tensor is moved to.

    Returns
    -------
    (float, float)
        Mean per-batch loss and the fraction of correctly classified samples.

    Raises
    ------
    ValueError
        If ``eval_loader`` yields no batches.
    """
    model.eval()
    running_loss = 0.0
    num_batches = 0
    probabilities = []
    targets_seen = []

    with torch.no_grad():
        for batch in tqdm(eval_loader, desc="Evaluating"):
            batch = {k: v.to(device) for k, v in batch.items()}

            outputs = model(
                input_ids=batch['input_ids'],
                attention_mask=batch['attention_mask'],
                pixel_values=batch['pixel_values']
            )

            similarity = torch.sum(outputs.image_embeds * outputs.text_embeds, dim=-1)

            # Align labels with the (batch,) logits: the loss rejects a
            # (batch, 1) target for a (batch,) input.
            targets = batch['labels'].reshape(similarity.shape).to(similarity.dtype)

            running_loss += F.binary_cross_entropy_with_logits(similarity, targets).item()
            num_batches += 1

            # Keep both sides flat so the comparison below is element-wise;
            # comparing (N,) predictions with (N, 1) labels would broadcast
            # to an (N, N) matrix and yield a meaningless accuracy.
            probabilities.append(torch.sigmoid(similarity).reshape(-1).cpu())
            targets_seen.append(targets.reshape(-1).cpu())

    if num_batches == 0:
        raise ValueError("eval_loader produced no batches")

    predicted = torch.cat(probabilities).numpy() > 0.5
    actual = torch.cat(targets_seen).numpy() > 0.5
    accuracy = (predicted == actual).mean()

    return running_loss / num_batches, accuracy

def train_model(df, num_epochs=3, batch_size=16, learning_rate=5e-5,
                checkpoint_path='best_food_detector.pth'):
    """Fine-tune ``openai/clip-vit-base-patch32`` as a food / no-food detector.

    The frame is split 80/20 into train and validation parts, every image is
    preprocessed up front with ``prepare_input`` (so all samples are held in
    memory), and the model is trained with AdamW. After each epoch the
    validation loss and accuracy are printed, and the weights are written to
    ``checkpoint_path`` whenever the validation accuracy improves.

    Parameters
    ----------
    df : pandas.DataFrame
        Needs the columns ``'image_path'`` and ``'has_food'``.
    num_epochs : int
        Number of passes over the training split.
    batch_size : int
        Batch size of both data loaders.
    learning_rate : float
        AdamW learning rate.
    checkpoint_path : str
        Where the best-scoring ``state_dict`` is saved.

    Returns
    -------
    (model, processor)
        The model carries the weights of the LAST epoch; the best-scoring
        weights are the ones stored at ``checkpoint_path``.
    """
    # Set device: Apple MPS when available, CPU otherwise.
    device = torch.device('mps' if torch.backends.mps.is_available() else 'cpu')
    print(f"Using device: {device}")

    # Load model and processor
    model_name = "openai/clip-vit-base-patch32"
    processor = CLIPProcessor.from_pretrained(model_name)
    model = CLIPModel.from_pretrained(model_name).to(device)

    # Split data. Stratifying keeps the food / no-food ratio identical in both
    # parts, which matters for an imbalanced label. Stratification is rejected
    # with a ValueError when a class is too small for the split; in that case
    # fall back to the plain random split.
    try:
        train_df, val_df = train_test_split(
            df, test_size=0.2, random_state=42, stratify=df['has_food']
        )
    except ValueError:
        train_df, val_df = train_test_split(df, test_size=0.2, random_state=42)

    def _encode(frame, desc):
        """Preprocess every row of ``frame`` into a model-ready sample."""
        # iterrows() has no length, so pass the total to get a real progress bar.
        return [
            prepare_input(row['image_path'], row['has_food'], processor)
            for _, row in tqdm(frame.iterrows(), total=len(frame), desc=desc)
        ]

    # Process datasets
    train_samples = _encode(train_df, "Processing train data")
    val_samples = _encode(val_df, "Processing val data")

    # Create data loaders
    train_loader = DataLoader(
        train_samples,
        batch_size=batch_size,
        shuffle=True,
        collate_fn=create_batch
    )
    val_loader = DataLoader(
        val_samples,
        batch_size=batch_size,
        shuffle=False,
        collate_fn=create_batch
    )

    # Initialize optimizer
    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

    # Training loop. Starting below any reachable accuracy guarantees that a
    # checkpoint is written after the first epoch, even if it scores 0.0.
    best_accuracy = float('-inf')
    for epoch in range(num_epochs):
        print(f"\nEpoch {epoch + 1}/{num_epochs}")

        # Train
        train_loss = train_epoch(model, train_loader, optimizer, device)

        # Evaluate
        val_loss, accuracy = evaluate(model, val_loader, device)

        print(f"Train Loss: {train_loss:.4f}")
        print(f"Val Loss: {val_loss:.4f}")
        print(f"Accuracy: {accuracy:.4f}")

        # Save best model
        if accuracy > best_accuracy:
            best_accuracy = accuracy
            torch.save(model.state_dict(), checkpoint_path)

    return model, processor

def predict(image_path, model, processor, device):
    """Classify a single image as containing food or not.

    Parameters
    ----------
    image_path : path of the image to classify.
    model : module returning an object with ``image_embeds`` / ``text_embeds``.
    processor : processor handed through to ``prepare_input``.
    device : device the inputs are moved to; must be the device ``model``
        lives on.

    Returns
    -------
    (bool-like, float-like)
        Whether the probability exceeds 0.5, and the probability itself
        (sigmoid of the image/text dot product).
    """
    # has_food=True requests the POSITIVE prompt so the score measures the
    # similarity to "a painting containing food". Passing False would score
    # the image against the negative prompt wherever the prompt wording
    # depends on that flag. The label tensor it also creates is not used here.
    inputs = prepare_input(image_path, True, processor)

    # Inference must not depend on whatever mode the model was left in.
    model.eval()

    with torch.no_grad():
        outputs = model(
            input_ids=inputs['input_ids'].to(device),
            attention_mask=inputs['attention_mask'].to(device),
            pixel_values=inputs['pixel_values'].to(device)
        )
        similarity = torch.sum(outputs.image_embeds * outputs.text_embeds, dim=-1)

    # Single image -> take the only element of the (1,) score vector.
    probability = torch.sigmoid(similarity).cpu().numpy()[0]
    return probability > 0.5, probability



In [None]:


def prepare_balanced_data(df, balance_strategy='undersample'):
    """
    Balance the dataset using different strategies

    Parameters:
    - df: DataFrame with a boolean-like 'has_food' column
    - balance_strategy: 'undersample', 'oversample', or 'weighted'
        * 'undersample': the larger class is sampled down (without
          replacement) to the size of the smaller one.
        * 'oversample': the smaller class is sampled up (with replacement)
          to the size of the larger one.
        * 'weighted': the frame is returned unchanged together with one
          weight per row, inversely proportional to the class frequency.

    Returns:
    - Balanced DataFrame (food rows first, then no-food rows), or
      (DataFrame, sample_weights) for 'weighted'

    Raises:
    - ValueError for an unknown strategy, or when one of the two classes has
      no samples (there is nothing to balance against).

    Note: with 'oversample' the result contains duplicated rows; if it is
    split into train/validation parts afterwards, copies of the same row can
    land on both sides of the split.
    """
    # Fail loudly on typos instead of silently returning None.
    valid_strategies = ('undersample', 'oversample', 'weighted')
    if balance_strategy not in valid_strategies:
        raise ValueError(
            f"Unknown balance_strategy {balance_strategy!r}; "
            f"expected one of {valid_strategies}"
        )

    # An explicit boolean mask also accepts 0/1 encoded flags.
    is_food = df['has_food'].astype(bool)
    food_samples = df[is_food]
    no_food_samples = df[~is_food]
    n_food, n_no_food = len(food_samples), len(no_food_samples)

    print(f"Original distribution:")
    print(f"Food samples: {n_food}")
    print(f"No food samples: {n_no_food}")

    if n_food == 0 or n_no_food == 0:
        raise ValueError(
            "Both classes need at least one sample to balance the dataset"
        )

    # Do not assume which class is the rare one: resampling the wrong side
    # without replacement would ask for more rows than exist and fail.
    food_is_minority = n_food <= n_no_food

    if balance_strategy == 'undersample':
        # Undersample majority class
        if food_is_minority:
            no_food_samples = resample(
                no_food_samples, replace=False, n_samples=n_food, random_state=42
            )
        else:
            food_samples = resample(
                food_samples, replace=False, n_samples=n_no_food, random_state=42
            )
        balanced_df = pd.concat([food_samples, no_food_samples])
        print(f"\nAfter undersampling:")
        print(f"Total samples: {len(balanced_df)}")
        return balanced_df

    if balance_strategy == 'oversample':
        # Oversample minority class
        if food_is_minority:
            food_samples = resample(
                food_samples, replace=True, n_samples=n_no_food, random_state=42
            )
        else:
            no_food_samples = resample(
                no_food_samples, replace=True, n_samples=n_food, random_state=42
            )
        balanced_df = pd.concat([food_samples, no_food_samples])
        print(f"\nAfter oversampling:")
        print(f"Total samples: {len(balanced_df)}")
        return balanced_df

    # 'weighted': each class contributes half of the total weight.
    total_samples = len(df)
    weight_for_0 = (1 / n_no_food) * (total_samples / 2)
    weight_for_1 = (1 / n_food) * (total_samples / 2)

    sample_weights = np.where(is_food, weight_for_1, weight_for_0)
    print("\nUsing weighted sampling")
    print(f"Weight for no food: {weight_for_0:.3f}")
    print(f"Weight for food: {weight_for_1:.3f}")
    return df, sample_weights

def modify_train_function_for_weights(train_model_fn):
    """
    Build a per-epoch training routine that applies per-sample loss weights.

    Parameters:
    - train_model_fn: accepted for interface compatibility; it is not used
      when building the returned routine.

    Returns:
    - ``train_epoch_weighted(model, train_loader, optimizer, device)``, which
      trains for one epoch and returns the mean per-batch loss. Every batch
      must contain a ``'weights'`` tensor with one weight per sample in
      addition to the usual inputs and ``'labels'``.
    """
    def weighted_loss(predictions, targets, weights):
        """BCE-with-logits where each sample's loss is scaled by its weight,
        then averaged over the batch."""
        return F.binary_cross_entropy_with_logits(
            predictions,
            targets,
            weight=weights,
            reduction='mean'
        )

    def train_epoch_weighted(model, train_loader, optimizer, device):
        """Weighted counterpart of the plain training epoch.

        Raises KeyError when a batch has no 'weights' entry and ValueError
        when the loader yields no batches.
        """
        model.train()
        running_loss = 0.0
        num_batches = 0
        progress_bar = tqdm(train_loader, desc="Training")

        for batch in progress_bar:
            if 'weights' not in batch:
                raise KeyError(
                    "weighted training needs a 'weights' entry in every batch"
                )

            # Moves labels and weights along with the model inputs.
            batch = {k: v.to(device) for k, v in batch.items()}

            outputs = model(
                input_ids=batch['input_ids'],
                attention_mask=batch['attention_mask'],
                pixel_values=batch['pixel_values']
            )

            # One similarity score per sample: shape (batch,)
            similarity = torch.sum(outputs.image_embeds * outputs.text_embeds, dim=-1)

            # Targets must have exactly the logits' shape, and the weights
            # are aligned the same way so each one scales its own sample.
            targets = batch['labels'].reshape(similarity.shape).to(similarity.dtype)
            weights = batch['weights'].reshape(similarity.shape).to(similarity.dtype)

            loss = weighted_loss(similarity, targets, weights)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            batch_loss = loss.item()
            running_loss += batch_loss
            num_batches += 1
            progress_bar.set_postfix({'loss': batch_loss})

        if num_batches == 0:
            raise ValueError("train_loader produced no batches")
        return running_loss / num_batches

    return train_epoch_weighted

# Example usage
if __name__ == "__main__":
    # Choose one of these approaches:

    # 1. Undersampling approach (active)
    balanced_df = prepare_balanced_data(df, 'undersample')

    # 2. Oversampling approach
    # balanced_df = prepare_balanced_data(df, 'oversample')

    # 3. Weighted approach
    # df, sample_weights = prepare_balanced_data(df, 'weighted')
    # You'll need to modify the train_model function to use weights

    # Train model on the BALANCED frame. Passing the original `df` here would
    # silently discard the balancing step performed above.
    model, processor = train_model(balanced_df)

    # Example prediction
    image_path = "test_image.jpg"
    # Use the device the model actually lives on so that the inputs and the
    # weights can never end up on different devices.
    has_food, confidence = predict(image_path, model, processor,
                                   next(model.parameters()).device)
    print(f"Contains food: {has_food} (confidence: {confidence:.2f})")
    

  

In [None]:



# Choose one of these approaches:

# 1. Undersampling approach (active)
balanced_df = prepare_balanced_data(df, 'undersample')

# 2. Oversampling approach
# balanced_df = prepare_balanced_data(df, 'oversample')

# 3. Weighted approach
# df, sample_weights = prepare_balanced_data(df, 'weighted')
# You'll need to modify the train_model function to use weights

# Train model on the BALANCED frame. Passing the original `df` here would
# silently discard the balancing step performed above.
model, processor = train_model(balanced_df)

# Example prediction
image_path = "test_image.jpg"

# Use the device the model actually lives on so that the inputs and the
# weights can never end up on different devices.
has_food, confidence = predict(image_path, model, processor, next(model.parameters()).device)

print(f"Contains food: {has_food} (confidence: {confidence:.2f})")


