In [2]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import models
import torchvision.transforms.v2 as transforms
import pandas as pd
import numpy as np
from PIL import Image
import requests
from io import BytesIO
import firebase_admin
from firebase_admin import credentials, firestore, storage
from tqdm import tqdm
from sklearn.metrics import f1_score, precision_score, recall_score

In [3]:
#Prefer the GPU when CUDA is available; otherwise fall back to the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [9]:
class RecipeImageDataset(Dataset):
    """Multi-label dataset pairing recipe images (fetched by URL) with ingredient vectors.

    Each recipe is a mapping with an 'image_url' entry and an 'ingredients'
    entry. Labels are multi-hot float vectors of length
    ``len(ingredient_mapping)`` and are built once in the constructor, so
    label memory grows as ``len(recipes_data) * len(ingredient_mapping)``.

    Args:
        recipes_data: Indexable sequence of recipe mappings.
        ingredient_mapping: Dict mapping ingredient -> column index.
        transform: Optional callable applied to the decoded PIL image.
        timeout: Seconds to wait for the image download before giving up.
    """

    def __init__(self, recipes_data, ingredient_mapping, transform=None, timeout=10):
        self.recipes = recipes_data
        self.transform = transform
        self.timeout = timeout
        self.ingredient_to_idx = ingredient_mapping
        self.idx_to_ingredient = {idx: ingredient for ingredient, idx in self.ingredient_to_idx.items()}

        #Pre-compute labels to avoid doing this work during training
        num_labels = len(self.ingredient_to_idx)
        self.labels = []
        for recipe in self.recipes:
            ingredients = recipe['ingredients']
            #A bare string would otherwise be iterated character by character
            if isinstance(ingredients, str):
                ingredients = [ingredients]

            ingredient_vector = torch.zeros(num_labels)
            for ingredient in ingredients:
                #Ingredients missing from the mapping are ignored
                if ingredient in self.ingredient_to_idx:
                    ingredient_vector[self.ingredient_to_idx[ingredient]] = 1.0

            self.labels.append(ingredient_vector)

        print(f"Dataset created with {len(self.recipes)} recipes")
        print(f"Total unique ingredients: {len(self.ingredient_to_idx)}")

    def __len__(self):
        """Return the number of recipes."""
        return len(self.recipes)

    def __getitem__(self, idx):
        """Download and decode the image for recipe ``idx``.

        Returns:
            ``(image, label)``. The image is the transformed RGB image (or the
            raw PIL image when no transform was given). If anything in the
            download/decode/transform path fails, the error is printed and an
            all-zero tensor of shape (3, 224, 224) is returned instead.
        """
        recipe = self.recipes[idx]
        image_url = recipe['image_url']
        label = self.labels[idx]

        try:
            #Load image from URL; the timeout keeps a stalled server from
            #blocking the calling worker indefinitely
            response = requests.get(image_url, timeout=self.timeout)
            #Report HTTP errors as such instead of handing an error page to PIL
            response.raise_for_status()
            img = Image.open(BytesIO(response.content)).convert('RGB')

            if self.transform:
                img = self.transform(img)

            return img, label

        except Exception as e:
            #Best effort: keep the batch alive with a placeholder image
            print(f"Error loading image {image_url}: {e}")
            placeholder = torch.zeros(3, 224, 224)
            return placeholder, label

In [10]:
def prepare_data_from_firebase(firestore_client, test_size=0.2, seed=42):
    """Load usable recipes from Firestore and split them into train/test sets.

    Reads every document of the 'recipes' collection and keeps those with a
    truthy 'image_url' and a truthy 'ingredients' field. The ingredient
    vocabulary is the sorted set of ingredients found in list-valued
    'ingredients' fields.

    Args:
        firestore_client: Object exposing ``collection(name).stream()`` whose
            items provide ``to_dict()``.
        test_size: Fraction of recipes (between 0 and 1) held out for testing.
        seed: Seed for the shuffle. The default reproduces the split obtained
            from ``np.random.seed(42)`` followed by ``np.random.shuffle``.

    Returns:
        ``(train_recipes, test_recipes, ingredient_to_idx)``.

    Raises:
        ValueError: If ``test_size`` is outside ``[0, 1]``.
    """
    if not 0.0 <= test_size <= 1.0:
        raise ValueError(f"test_size must be between 0 and 1, got {test_size}")

    recipes = []
    unique_ingredients = set()

    #Extract recipe data and collect unique ingredients
    for recipe_doc in firestore_client.collection('recipes').stream():
        recipe = recipe_doc.to_dict()
        if recipe.get('image_url') and recipe.get('ingredients'):
            recipes.append(recipe)

            #NOTE(review): recipes whose 'ingredients' is not a list are kept
            #but add nothing to the vocabulary - confirm this is intended
            if isinstance(recipe['ingredients'], list):
                unique_ingredients.update(recipe['ingredients'])

    #Create ingredient mapping (sorted so indices are stable across runs)
    ingredient_to_idx = {ingredient: idx for idx, ingredient in enumerate(sorted(unique_ingredients))}

    #Train-test split with a local generator, leaving NumPy's global RNG untouched
    n_recipes = len(recipes)
    indices = np.arange(n_recipes)
    np.random.RandomState(seed).shuffle(indices)

    split_idx = int(n_recipes * (1 - test_size))
    train_recipes = [recipes[i] for i in indices[:split_idx]]
    test_recipes = [recipes[i] for i in indices[split_idx:]]

    print(f"Loaded {len(train_recipes)} recipes for training")
    print(f"Loaded {len(test_recipes)} recipes for testing")
    print(f"Total unique ingredients: {len(unique_ingredients)}")

    return train_recipes, test_recipes, ingredient_to_idx


In [11]:
class IngredientRecognition(nn.Module):
    """Pre-trained EfficientNet-B3 with a multi-label ingredient head.

    The forward pass returns one sigmoid-activated score per ingredient.
    """

    def __init__(self, num_ingredients):
        super().__init__()

        #Start from torchvision's default pre-trained EfficientNet-B3 weights
        self.backbone = models.efficientnet_b3(weights=models.EfficientNet_B3_Weights.DEFAULT)

        #Freeze everything except the last 30 parameter tensors
        pretrained_params = list(self.backbone.parameters())
        for frozen in pretrained_params[:-30]:
            frozen.requires_grad_(False)

        #Replace the classifier with a multi-label head of the same input width
        in_features = self.backbone.classifier[1].in_features
        head_layers = [
            nn.Dropout(p=0.3, inplace=True),
            nn.Linear(in_features, num_ingredients),
            nn.Sigmoid(),  #Sigmoid for multi-label classification
        ]
        self.backbone.classifier = nn.Sequential(*head_layers)

    def forward(self, x):
        """Run ``x`` through the backbone and return the per-ingredient scores."""
        scores = self.backbone(x)
        return scores

In [12]:
def train_model(model, train_loader, val_loader, criterion, optimizer, 
                num_epochs=10, device='cuda'):
    """Train ``model`` and restore the weights of the best validation epoch.

    Each epoch runs one optimisation pass over ``train_loader`` followed by an
    evaluation pass over ``val_loader``. Validation outputs are thresholded at
    0.5 (this assumes the model emits probabilities) and scored with macro
    F1/precision/recall computed over the flattened (sample, label) matrix,
    i.e. as a single binary problem.

    Args:
        model: Network to optimise, already placed on ``device``.
        train_loader: Iterable of ``(inputs, labels)`` batches exposing ``.dataset``.
        val_loader: Iterable of ``(inputs, labels)`` batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimiser bound to the model's parameters.
        num_epochs: Number of epochs to run.
        device: Device that batches are moved to.

    Returns:
        The same ``model`` object. Its weights are those of the epoch with the
        highest validation F1, or the current weights if no epoch scored
        above 0.
    """
    best_val_f1 = 0.0
    best_model_weights = None

    for epoch in range(num_epochs):
        #Training phase
        model.train()
        running_loss = 0.0

        for inputs, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} - Training"):
            inputs = inputs.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()

            with torch.set_grad_enabled(True):
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

            #Weight the batch-mean loss by batch size so the epoch figure is per sample
            running_loss += loss.item() * inputs.size(0)

        epoch_loss = running_loss / len(train_loader.dataset)

        #Validation phase
        model.eval()
        val_preds = []
        val_targets = []

        for inputs, labels in tqdm(val_loader, desc=f"Epoch {epoch+1}/{num_epochs} - Validation"):
            inputs = inputs.to(device)
            labels = labels.to(device)

            with torch.no_grad():
                outputs = model(inputs)

            #Threshold predictions
            threshold_preds = (outputs > 0.5).float()

            val_preds.extend(threshold_preds.cpu().numpy())
            val_targets.extend(labels.cpu().numpy())

        #Flatten once and reuse the arrays for all three metrics
        y_true = np.array(val_targets).flatten()
        y_pred = np.array(val_preds).flatten()

        #Calculate metrics
        val_f1 = f1_score(y_true, y_pred, average='macro', zero_division=0)
        val_precision = precision_score(y_true, y_pred, average='macro', zero_division=0)
        val_recall = recall_score(y_true, y_pred, average='macro', zero_division=0)

        print(f"Epoch {epoch+1}/{num_epochs} - Loss: {epoch_loss:.4f}, "
              f"Val F1: {val_f1:.4f}, Precision: {val_precision:.4f}, Recall: {val_recall:.4f}")

        #Save best model. state_dict() hands back the live tensors, so each one
        #is cloned; a shallow dict copy would be overwritten by later epochs
        if val_f1 > best_val_f1:
            best_val_f1 = val_f1
            best_model_weights = {
                name: tensor.detach().clone()
                for name, tensor in model.state_dict().items()
            }

    #Load best model weights (nothing to restore if no epoch improved on 0)
    if best_model_weights is not None:
        model.load_state_dict(best_model_weights)
    return model

In [13]:
def get_recipe_by_ingredients(firestore_client, ingredients, top_k=3):
    """Return the recipes that cover the most of the given ingredients.

    Every document in the 'recipes' collection with a truthy 'ingredients'
    field is scored by coverage: the number of requested ingredients it
    contains divided by the number of distinct requested ingredients.

    Args:
        firestore_client: Object exposing ``collection(name).stream()`` whose
            items provide ``to_dict()``.
        ingredients: Iterable of ingredients to look for (may be one-shot).
        top_k: Maximum number of recipes to return.

    Returns:
        Up to ``top_k`` recipe dicts, highest coverage first. Ties keep the
        order in which the collection was streamed.
    """
    #Built once up front so one-shot iterables (e.g. generators) are not
    #exhausted after the first recipe
    input_ingredients = set(ingredients)
    n_inputs = len(input_ingredients)

    scored = []
    for recipe_doc in firestore_client.collection('recipes').stream():
        recipe = recipe_doc.to_dict()
        if 'ingredients' in recipe and recipe['ingredients']:
            #Calculate how many ingredients match
            recipe_ingredients = set(recipe['ingredients'])
            common = len(recipe_ingredients.intersection(input_ingredients))
            coverage = common / n_inputs if n_inputs else 0
            scored.append((coverage, recipe))

    #Sort by coverage (higher is better); the sort is stable, so ties keep stream order
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [recipe for _, recipe in scored[:top_k]]

In [15]:
def main():
    """Run the whole pipeline: Firebase setup, data loading, training, checkpointing.

    Trains an IngredientRecognition model on the recipes read from Firestore
    and writes the best weights plus both ingredient lookups to
    'food_recognition_model.pth'.
    """
    #Initialize Firebase
    cred_path = "recipegen.json"
    storage_bucket = "recipegen-710d0.firebasestorage.app"

    #A ValueError from get_app() is taken to mean no app has been initialised yet
    try:
        firebase_admin.get_app()
    except ValueError:
        firebase_admin.initialize_app(
            credentials.Certificate(cred_path),
            {"storageBucket": storage_bucket},
        )

    firestore_client = firestore.client()

    train_recipes, val_recipes, ingredient_to_idx = prepare_data_from_firebase(
        firestore_client, test_size=0.2
    )

    #Data transforms
    def to_normalized_tensor():
        #Shared tail of both pipelines: image -> scaled float tensor -> normalised
        return [
            transforms.ToImage(),
            transforms.ToDtype(torch.float32, scale=True),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ]

    #Training adds random crop/flip/rotation/colour augmentation
    train_transform = transforms.Compose([
        transforms.Resize((256, 256)),
        transforms.RandomCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(15),
        transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1, hue=0.1),
        *to_normalized_tensor(),
    ])

    #Validation only resizes
    val_transform = transforms.Compose([
        transforms.Resize((224, 224)),
        *to_normalized_tensor(),
    ])

    #Create datasets
    train_dataset = RecipeImageDataset(
        recipes_data=train_recipes,
        ingredient_mapping=ingredient_to_idx,
        transform=train_transform,
    )
    val_dataset = RecipeImageDataset(
        recipes_data=val_recipes,
        ingredient_mapping=ingredient_to_idx,
        transform=val_transform,
    )

    #Create dataloaders; only the training loader shuffles
    loader_options = dict(batch_size=32, num_workers=4, pin_memory=True)
    train_loader = DataLoader(train_dataset, shuffle=True, **loader_options)
    val_loader = DataLoader(val_dataset, shuffle=False, **loader_options)

    #Create model
    num_ingredients = len(ingredient_to_idx)
    model = IngredientRecognition(num_ingredients).to(device)

    #Define loss and optimizer
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    #Train model
    trained_model = train_model(
        model,
        train_loader,
        val_loader,
        criterion,
        optimizer,
        num_epochs=15,
        device=device,
    )

    #Save model together with both directions of the ingredient lookup
    idx_to_ingredient = {idx: ingredient for ingredient, idx in ingredient_to_idx.items()}
    checkpoint = {
        'model_state_dict': trained_model.state_dict(),
        'ingredient_to_idx': ingredient_to_idx,
        'idx_to_ingredient': idx_to_ingredient,
    }
    torch.save(checkpoint, 'food_recognition_model.pth')

    print("Model training complete and saved to food_recognition_model.pth")

In [None]:
#Run the full pipeline only when this code is executed as the entry point
if __name__ == "__main__":
    main()

Loaded 9768 recipes for training
Loaded 2443 recipes for testing
Total unique ingredients: 67176
Dataset created with 9768 recipes
Total unique ingredients: 67176
Dataset created with 2443 recipes
Total unique ingredients: 67176


Epoch 1/15 - Training:   0%|          | 0/306 [00:00<?, ?it/s]