<a href="https://colab.research.google.com/github/thales20266/Projeto-Transfer-Learning/blob/main/recomenda%C3%A7%C3%A3o.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#!/usr/bin/env python3
"""
image_recommender.py

A single script that covers:
- Transfer-learning training (ResNet18)
- Embedding extraction
- Local search for similar images (NearestNeighbors)
- Optional integration with Bing Image Search to look for images on the web

Requirements:
pip install torch torchvision scikit-learn pillow requests tqdm

Usage (examples):
# Train
python image_recommender.py --mode train --data_dir ./data/train --model_out model.pth --epochs 5

# Build an embedding index from a folder of images (the reference dataset)
python image_recommender.py --mode build_index --ref_images ./data/ref_images --index_out index.npz --model_in model.pth

# Recommend locally for a query image
python image_recommender.py --mode recommend_local --model_in model.pth --index_in index.npz --query_image query.jpg --top_k 5

# Recommend from web images (requires a Bing Image Search API key, passed via --bing_key)
python image_recommender.py --mode recommend_web --model_in model.pth --query_image query.jpg --bing_key YOUR_KEY --top_k 5
"""

import os
import io
import sys
import json
import time
import argparse
from pathlib import Path
from typing import List, Tuple

import requests
from PIL import Image
from tqdm import tqdm

import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torchvision import transforms, datasets, models
from sklearn.neighbors import NearestNeighbors

# ---------------------
# Config / Transforms
# ---------------------
# Compute device: the CUDA GPU when one is available, otherwise the CPU.
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Square side (in pixels) every image is resized to, and the default mini-batch size.
IMG_SIZE = 224
BATCH_SIZE = 32

# Per-channel mean / std handed to Normalize by both pipelines below.
_NORM_MEAN = [0.485, 0.456, 0.406]
_NORM_STD = [0.229, 0.224, 0.225]


def _make_transform(augment: bool):
    """Build the preprocessing pipeline; `augment` inserts a random horizontal flip."""
    steps = [transforms.Resize((IMG_SIZE, IMG_SIZE))]
    if augment:
        steps.append(transforms.RandomHorizontalFlip())
    steps.append(transforms.ToTensor())
    steps.append(transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD))
    return transforms.Compose(steps)


# Training pipeline: resize, random horizontal flip, tensor conversion, normalization.
transform_train = _make_transform(augment=True)

# Evaluation pipeline: identical to training except that the random flip is left out.
transform_eval = _make_transform(augment=False)

# ---------------------
# Model helpers
# ---------------------
def build_model(num_classes: int, pretrained=True, embedding_dim=512):
    """
    Create a ResNet18 whose final fully connected layer outputs `num_classes` logits.

    Args:
        num_classes: number of output units of the replacement `fc` layer.
        pretrained: forwarded to `models.resnet18(pretrained=...)`.
        embedding_dim: accepted for interface compatibility; not used by this function.

    Returns:
        The ResNet18 module with its `fc` layer replaced by a fresh `nn.Linear`.
    """
    net = models.resnet18(pretrained=pretrained)
    # Swap the stock classification head for one sized to our label set,
    # keeping the input width the backbone already produces.
    net.fc = nn.Linear(net.fc.in_features, num_classes)
    return net

class EmbeddingModel(nn.Module):
    """
    Feature extractor built from a ResNet-style model: everything up to and
    including `avgpool`, with the final `fc` layer left out.

    Attributes:
        features: the backbone stages chained in an `nn.Sequential`.
        flatten: `nn.Flatten` applied to the pooled feature map.
        final_in_features: `in_features` of the base model's `fc` layer,
            which is the width of the returned embedding for a torchvision ResNet.
    """

    # Attribute names read from the base model, in the order they are applied.
    _STAGES = (
        "conv1",
        "bn1",
        "relu",
        "maxpool",
        "layer1",
        "layer2",
        "layer3",
        "layer4",
        "avgpool",
    )

    def __init__(self, base_model: nn.Module):
        super().__init__()
        stages = [getattr(base_model, stage) for stage in self._STAGES]
        self.features = nn.Sequential(*stages)
        self.flatten = nn.Flatten()
        # The fc layer itself is not part of this module; only its input width is recorded.
        self.final_in_features = base_model.fc.in_features

    def forward(self, x):
        """Run the backbone on `x` and return the flattened pooled features."""
        return self.flatten(self.features(x))

# ---------------------
# Training
# ---------------------
def train(
    data_dir: str,
    model_out: str,
    epochs: int = 5,
    lr: float = 1e-3,
    weight_decay: float = 1e-4,
):
    """
    Fine-tune a ResNet18 classifier on an ImageFolder dataset and save it.

    Args:
        data_dir: root folder read with `datasets.ImageFolder` (one sub-folder per class).
        model_out: path the checkpoint is written to with `torch.save`.
        epochs: number of passes over the dataset.
        lr: learning rate for the Adam optimizer.
        weight_decay: weight decay for the Adam optimizer.

    The checkpoint is a dict with the keys "model_state" (the state dict)
    and "classes" (the list of class names from the dataset).
    """
    dataset = datasets.ImageFolder(root=data_dir, transform=transform_train)
    class_names = dataset.classes
    n_images = len(dataset)
    print(f"Found {n_images} images, {len(class_names)} classes.")

    loader = DataLoader(
        dataset,
        batch_size=BATCH_SIZE,
        shuffle=True,
        num_workers=4,
        pin_memory=True,
    )

    net = build_model(num_classes=len(class_names), pretrained=True).to(DEVICE)
    loss_fn = nn.CrossEntropyLoss()
    opt = torch.optim.Adam(net.parameters(), lr=lr, weight_decay=weight_decay)
    # Learning rate is multiplied by 0.1 every 3 epochs.
    lr_schedule = torch.optim.lr_scheduler.StepLR(opt, step_size=3, gamma=0.1)

    net.train()
    for epoch_no in range(1, epochs + 1):
        loss_sum = 0.0
        n_correct = 0
        n_seen = 0
        progress = tqdm(loader, desc=f"Epoch {epoch_no}/{epochs}")
        for batch_imgs, batch_labels in progress:
            batch_imgs = batch_imgs.to(DEVICE)
            batch_labels = batch_labels.to(DEVICE)

            opt.zero_grad()
            logits = net(batch_imgs)
            batch_loss = loss_fn(logits, batch_labels)
            batch_loss.backward()
            opt.step()

            # The criterion returns a batch mean; weight it by the batch size
            # so the running total is a per-sample sum.
            loss_sum += batch_loss.item() * batch_imgs.size(0)
            n_correct += (logits.argmax(dim=1) == batch_labels).sum().item()
            n_seen += batch_labels.size(0)
            progress.set_postfix(loss=loss_sum / n_seen, acc=100. * n_correct / n_seen)

        lr_schedule.step()
        epoch_loss = loss_sum / n_images
        epoch_acc = 100. * n_correct / n_images
        print(f"Epoch {epoch_no} finished: loss={epoch_loss:.4f}, acc={epoch_acc:.2f}%")

    # Persist the weights together with the class names needed at inference time.
    torch.save({"model_state": net.state_dict(), "classes": class_names}, model_out)
    print(f"Model saved to {model_out}")

# ---------------------
# Embedding & Index
# ---------------------
def load_model_for_embedding(model_path: str, device=DEVICE):
    """
    Load a saved checkpoint and return an embedding extractor plus the class names.

    The checkpoint is either a dict holding the state dict under "model_state"
    or the state dict itself. Weights are copied into a fresh ResNet18 only
    where both the key and the tensor shape match; everything else (for
    example an `fc` layer of a different size) keeps its freshly initialized
    value, which does not matter because `EmbeddingModel` leaves `fc` out.

    Args:
        model_path: path of the checkpoint file.
        device: device the checkpoint is mapped to and the extractor is moved to.

    Returns:
        (emb_model, classes): the `EmbeddingModel` in eval mode and the value
        stored under "classes" in the checkpoint (None when absent).
    """
    # NOTE(review): torch.load unpickles the file, so only load checkpoints from trusted sources.
    ckpt = torch.load(model_path, map_location=device)
    saved_weights = ckpt.get("model_state", ckpt)

    backbone = models.resnet18(pretrained=False)
    merged = backbone.state_dict()
    for key, tensor in saved_weights.items():
        # Permissive load: skip entries the plain ResNet18 lacks or whose shape differs.
        if key in merged and tensor.shape == merged[key].shape:
            merged[key] = tensor
    backbone.load_state_dict(merged)

    extractor = EmbeddingModel(backbone).to(device)
    extractor.eval()
    return extractor, ckpt.get("classes", None)

def image_to_tensor(img_path: str, transform=transform_eval):
    """
    Load an image file and return it as a transformed tensor with a leading batch axis.

    Args:
        img_path: path of the image file to open.
        transform: callable applied to the RGB PIL image; its result must
            support `.unsqueeze(0)`.

    Returns:
        The transformed image with a batch dimension of size 1 prepended.
    """
    # Image.open keeps the file handle open; convert() produces an independent
    # in-memory copy, so the handle can be released as soon as it returns.
    with Image.open(img_path) as img:
        rgb = img.convert("RGB")
    return transform(rgb).unsqueeze(0)  # batch dim

def compute_embeddings_for_folder(folder: str, model: nn.Module, batch_size=32) -> Tuple[np.ndarray, List[str]]:
    """
    Walk `folder` recursively, embed every image found and return (embeddings, paths).

    Files are selected by extension (.jpg, .jpeg, .png, .webp, .bmp), compared
    case-insensitively so that names such as `IMG_001.JPG` are included too.

    Args:
        folder: root directory to scan.
        model: module mapping an image batch to a 2-D (batch, dim) tensor.
        batch_size: number of images sent through the model at once.

    Returns:
        embeddings: array with one row per image, in the same order as the paths.
            When no image is found it has shape (0, D), where D is
            `model.final_in_features` if the model defines it and 0 otherwise.
        image_paths: sorted list of the image paths as strings.
    """
    image_exts = (".jpg", ".jpeg", ".png", ".webp", ".bmp")
    image_paths = sorted(
        str(p)
        for p in Path(folder).rglob("*")
        if p.is_file() and p.name.lower().endswith(image_exts)
    )
    print(f"Found {len(image_paths)} images in reference folder.")

    model.eval()
    if not image_paths:
        # Not every nn.Module exposes final_in_features; fall back to width 0.
        return np.zeros((0, getattr(model, "final_in_features", 0))), image_paths

    # The model's device does not change while embedding, so look it up once.
    device = next(model.parameters()).device
    emb_list = []
    with torch.no_grad():
        for start in range(0, len(image_paths), batch_size):
            batch_paths = image_paths[start:start + batch_size]
            batch = torch.cat([image_to_tensor(p) for p in batch_paths], dim=0).to(device)
            emb_list.append(model(batch).cpu().numpy())  # (B, D)
    return np.vstack(emb_list), image_paths

def build_and_save_index(ref_folder: str, model_path: str, index_out: str):
    """
    Embed every image under `ref_folder` and store the result as a compressed .npz file.

    The file holds two arrays: "embeddings" (one row per image) and "paths"
    (the matching file paths). No nearest-neighbour structure is stored;
    `load_index` re-fits one from the embeddings.

    Args:
        ref_folder: folder with the reference images.
        model_path: checkpoint used to build the embedding model.
        index_out: destination file; NumPy appends ".npz" when the name lacks it.
    """
    emb_model, _ = load_model_for_embedding(model_path)
    embeddings, paths = compute_embeddings_for_folder(ref_folder, emb_model, batch_size=BATCH_SIZE)
    if len(embeddings) == 0:
        print("No embeddings found — abort.")
        return

    np.savez_compressed(index_out, embeddings=embeddings, paths=np.array(paths))
    # Report the name that actually ends up on disk (savez adds the extension if missing).
    saved_as = index_out if str(index_out).endswith(".npz") else f"{index_out}.npz"
    print(f"Index saved to {saved_as} (embeddings shape {embeddings.shape})")

def load_index(index_in: str):
    """
    Load an index written by `build_and_save_index` and fit a cosine nearest-neighbour search on it.

    Args:
        index_in: path of the .npz file holding the "embeddings" and "paths" arrays.

    Returns:
        (nbrs, embeddings, paths): the fitted `NearestNeighbors` object, the
        embedding matrix and the list of image paths.

    Raises:
        ValueError: from NumPy if an array in the file can only be read by
            unpickling (the files this script writes never need that).
    """
    # The index stores plain numeric / string arrays, so pickle support is not
    # needed; keeping it off means an untrusted index file cannot execute code.
    with np.load(index_in, allow_pickle=False) as data:
        embeddings = data["embeddings"]
        paths = data["paths"].tolist()
    nbrs = NearestNeighbors(n_neighbors=10, algorithm="auto", metric="cosine")
    nbrs.fit(embeddings)
    return nbrs, embeddings, paths

def recommend_local(query_image_path: str, model_path: str, index_in: str, top_k: int = 5):
    """
    Return the indexed images closest to the query image by cosine distance.

    Args:
        query_image_path: image to find neighbours for.
        model_path: checkpoint used to build the embedding model.
        index_in: index file produced by `build_and_save_index`.
        top_k: maximum number of results; capped at the number of indexed images.

    Returns:
        A list of {"path": str, "score": float} dicts, nearest first, where
        score is 1 - cosine distance. Empty when `top_k` is below 1 or the
        index holds no images.
    """
    emb_model, _ = load_model_for_embedding(model_path)
    nbrs, _, paths = load_index(index_in)

    # kneighbors raises when asked for more neighbours than there are samples.
    k = min(top_k, len(paths))
    if k < 1:
        return []

    device = next(emb_model.parameters()).device
    with torch.no_grad():
        q_emb = emb_model(image_to_tensor(query_image_path).to(device)).cpu().numpy()
    dists, idxs = nbrs.kneighbors(q_emb, n_neighbors=k)
    return [
        {"path": paths[idx], "score": float(1.0 - dist)}  # cosine distance -> similarity
        for dist, idx in zip(dists[0], idxs[0])
    ]

# ---------------------
# Web image search (Bing) + comparison
# ---------------------
def bing_image_search(query: str, bing_api_key: str, count: int = 10) -> List[str]:
    """
    Query the Bing Image Search v7 REST endpoint and return the image URLs it reports.

    Args:
        query: search text sent as the `q` parameter.
        bing_api_key: subscription key sent in the `Ocp-Apim-Subscription-Key` header.
        count: value sent as the `count` parameter.

    Returns:
        The non-empty `contentUrl` values of the entries under "value" in the
        JSON response, in response order.

    Raises:
        AssertionError: if `bing_api_key` is empty (checked with `assert`).
        requests.HTTPError: if the response status indicates an error.

    Docs: https://learn.microsoft.com/en-us/bing/search-apis/bing-image-search/overview
    """
    assert bing_api_key, "bing_api_key is required"
    response = requests.get(
        "https://api.bing.microsoft.com/v7.0/images/search",
        headers={"Ocp-Apim-Subscription-Key": bing_api_key},
        params={"q": query, "count": count},
        timeout=10,
    )
    response.raise_for_status()
    hits = response.json().get("value", [])
    # Entries without a usable contentUrl are dropped.
    candidate_urls = (hit.get("contentUrl") for hit in hits)
    return [link for link in candidate_urls if link]

def download_image_to_pil(url: str, timeout=8):
    """
    Fetch `url` and decode the body as an RGB PIL image.

    Args:
        url: address of the image to download.
        timeout: timeout passed to `requests.get`.

    Returns:
        The decoded image converted to RGB, or None if anything fails
        (request error, bad status, undecodable data).
    """
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        payload = io.BytesIO(response.content)
        picture = Image.open(payload)
        return picture.convert("RGB")
    except Exception:
        # Best effort on purpose: a candidate that cannot be fetched or decoded is skipped.
        return None

def _load_classifier_for_prediction(checkpoint):
    """Rebuild the ResNet18 classifier stored in `checkpoint`, sizing its head from the saved weights."""
    saved = checkpoint.get("model_state", checkpoint)
    net = models.resnet18(pretrained=False)
    # A stock ResNet18 has a 1000-way head. Resize it to the trained head first;
    # otherwise the shape filter below would drop the trained fc weights and
    # predictions would come from a randomly initialized layer.
    head_weight = saved.get("fc.weight")
    if head_weight is not None and head_weight.shape[0] != net.fc.out_features:
        net.fc = nn.Linear(net.fc.in_features, head_weight.shape[0])
    merged = net.state_dict()
    merged.update({k: v for k, v in saved.items() if k in merged and v.shape == merged[k].shape})
    net.load_state_dict(merged)
    net = net.to(DEVICE)
    net.eval()
    return net


def recommend_web(query_image_path: str, model_path: str, bing_key: str, top_k: int = 5, bing_count: int = 20):
    """
    Search the web (Bing) for images similar to the query image.

    The query image is classified with the trained model, the predicted class
    name is used as the Bing search text, and the returned images are ranked
    by cosine similarity between their embedding and the query embedding.
    Possible improvement: generate a richer query with a captioning model.

    Args:
        query_image_path: image to find matches for.
        model_path: checkpoint with the trained classifier.
        bing_key: Bing Image Search API key.
        top_k: number of results to keep.
        bing_count: number of candidate images requested from Bing.

    Returns:
        {"predicted_label": str, "results": [{"url": str, "similarity": float}, ...]}
        with the results sorted by decreasing similarity.
    """
    checkpoint = torch.load(model_path, map_location=DEVICE)
    classes = checkpoint.get("classes", None)
    classifier = _load_classifier_for_prediction(checkpoint)

    # Predict a label for the query image.
    with torch.no_grad():
        out = classifier(image_to_tensor(query_image_path).to(DEVICE))
    pred_idx = int(out.argmax(dim=1).item())
    # Fall back to the bare index when class names are missing or too short.
    pred_label = classes[pred_idx] if classes is not None and pred_idx < len(classes) else str(pred_idx)

    # Use the label as the Bing query.
    query = pred_label
    print(f"Predicted label for query image: {pred_label}. Using it to search Bing for similar images.")

    urls = bing_image_search(query, bing_api_key=bing_key, count=bing_count)
    print(f"Got {len(urls)} candidate urls from Bing. Downloading and embedding (may take a while).")

    emb_model, _ = load_model_for_embedding(model_path)
    emb_device = next(emb_model.parameters()).device
    # Must run under no_grad: .numpy() raises on a tensor that requires grad.
    with torch.no_grad():
        query_emb = emb_model(image_to_tensor(query_image_path).to(emb_device)).cpu().numpy()
    q_vec = query_emb[0]
    q_norm = np.linalg.norm(q_vec)  # loop-invariant, computed once

    # Download the candidates and score them one by one.
    candidates = []
    for url in urls:
        pil = download_image_to_pil(url)
        if pil is None:
            continue
        img_t = transform_eval(pil).unsqueeze(0).to(emb_device)
        with torch.no_grad():
            c_vec = emb_model(img_t).cpu().numpy()[0]
        c_norm = np.linalg.norm(c_vec)
        # Cosine similarity; defined as 0 when either vector has zero length.
        if q_norm == 0 or c_norm == 0:
            sim = 0.0
        else:
            sim = float(np.dot(q_vec, c_vec) / (q_norm * c_norm))
        candidates.append({"url": url, "similarity": sim})

    candidates = sorted(candidates, key=lambda x: x["similarity"], reverse=True)[:top_k]
    return {"predicted_label": pred_label, "results": candidates}

# ---------------------
# CLI
# ---------------------
def parse_args():
    """
    Parse the command line (`sys.argv`) and return the resulting namespace.

    `--mode` is required and must be one of train, build_index,
    recommend_local or recommend_web. All other options are optional here;
    `--epochs` and `--top_k` are integers that default to 5.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--mode",
        choices=["train", "build_index", "recommend_local", "recommend_web"],
        required=True,
    )
    # (flag, extra keyword arguments) for every remaining option, in help order.
    option_specs = (
        ("--data_dir", {}),
        ("--model_out", {}),
        ("--model_in", {}),
        ("--epochs", {"type": int, "default": 5}),
        ("--ref_images", {}),
        ("--index_out", {}),
        ("--index_in", {}),
        ("--query_image", {}),
        ("--top_k", {"type": int, "default": 5}),
        ("--bing_key", {"help": "Bing API key for web image search"}),
    )
    for flag, extra in option_specs:
        parser.add_argument(flag, **extra)
    return parser.parse_args()

def _require_options(args, *names):
    """Exit with a readable message if any of the named CLI options is missing or empty."""
    missing = [f"--{name}" for name in names if not getattr(args, name)]
    if missing:
        raise SystemExit(f"--mode {args.mode} requires {', '.join(missing)}")


def main():
    """
    Command-line entry point: parse the arguments and run the selected mode.

    Each mode needs its own set of options. They are checked explicitly
    (rather than with `assert`, which is removed under `python -O`), and the
    program exits with an error message naming the missing options.
    """
    args = parse_args()
    if args.mode == "train":
        _require_options(args, "data_dir", "model_out")
        train(args.data_dir, args.model_out, epochs=args.epochs)
    elif args.mode == "build_index":
        _require_options(args, "ref_images", "model_in", "index_out")
        build_and_save_index(args.ref_images, args.model_in, args.index_out)
    elif args.mode == "recommend_local":
        _require_options(args, "model_in", "index_in", "query_image")
        results = recommend_local(args.query_image, args.model_in, args.index_in, top_k=args.top_k)
        print("Top local recommendations:")
        print(json.dumps(results, indent=2))
    elif args.mode == "recommend_web":
        _require_options(args, "model_in", "query_image", "bing_key")
        results = recommend_web(args.query_image, args.model_in, args.bing_key, top_k=args.top_k)
        print("Top web recommendations:")
        print(json.dumps(results, indent=2))
    else:
        # Defensive only: argparse already restricts --mode to the four choices above.
        print("Unknown mode")

# Run the CLI only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()