In [None]:
import os
import json
import cv2
import numpy as np
import torch
import torchvision.transforms as transforms
from torchvision import models
from torch.utils.data import DataLoader, Dataset
from torchvision.models import resnet50
from PIL import Image
import faiss

In [16]:
dataset_dir = "./dataset/DeepFashion2/"
image_dirs = {
    "train": os.path.join(dir, "deepfashion2_original_images/train/image"),
    "validation": os.path.join(dir, "deepfashion2_original_images/validation/image")
}
annotation_dirs = {
    "train": os.path.join(dir, "deepfashion2_original_images/train/annos"),
    "validation": os.path.join(dir, "deepfashion2_original_images/validation/annos")
}

In [17]:
def load_annotations(annotation_dir, image_dir, dataset_type="train"):
    """
    Load annotations, extract bounding boxes, and crop clothing images.
    """
    cropped_images = []
    for filename in tqdm(os.listdir(annotation_dir), desc=f"Processing {dataset_type} annotations"):
        if filename.endswith(".json"):
            json_path = os.path.join(annotation_dir, filename)
            image_path = os.path.join(image_dir, filename.replace(".json", ".jpg"))

            if not os.path.exists(image_path):
                continue

            with open(json_path, "r") as f:
                annotation = json.load(f)

            for item_key, item in annotation.items():
                if not isinstance(item, dict) or "bounding_box" not in item:
                    continue
                
                bbox = item["bounding_box"]  # [x1, y1, x2, y2]
                category_id = item["category_id"]

                # Read image
                img = cv2.imread(image_path)
                if img is None:
                    continue

                # Crop clothing item
                x1, y1, x2, y2 = map(int, bbox)
                cropped_img = img[y1:y2, x1:x2]

                # Save cropped image
                cropped_filename = f"{dataset_type}_{filename.replace('.json', '')}_{category_id}.jpg"
                cropped_path = os.path.join(output_dir, cropped_filename)
                cv2.imwrite(cropped_path, cropped_img)

                cropped_images.append(cropped_path)

    return cropped_images

output_dir = "./cropped_clothing"
os.makedirs(output_dir, exist_ok=True)
train_images = load_annotations(annotation_dirs["train"], image_dirs["train"], "train")
val_images = load_annotations(annotation_dirs["validation"], image_dirs["validation"], "validation")

Processing train annotations:  53%|█████▎    | 101337/191961 [28:40<25:38, 58.90it/s] 


error: OpenCV(4.11.0) D:\a\opencv-python\opencv-python\opencv\modules\imgcodecs\src\loadsave.cpp:929: error: (-215:Assertion failed) !_img.empty() in function 'cv::imwrite'


In [None]:
BATCH_SIZE = 32
EPOCHS = 10
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {DEVICE}")

In [None]:
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

In [None]:
class FashionDataset(Dataset):
    def __init__(self, image_paths, transform=None):
        self.image_paths = image_paths
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        image = Image.open(img_path).convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image

In [None]:
train_dataset = FashionDataset(train_images, transform=transform)
val_dataset = FashionDataset(val_images, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [None]:
model = resnet50(pretrained=True)
model.fc = torch.nn.Identity()  # Remove classification layer
model = model.to(DEVICE)

optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
criterion = torch.nn.MSELoss()

In [None]:
def train_feature_extractor():
    model.train()
    for epoch in range(EPOCHS):
        for images in train_loader:
            images = images.to(DEVICE)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, torch.zeros_like(outputs))  # Dummy loss for feature learning
            loss.backward()
            optimizer.step()

        print(f"Epoch [{epoch+1}/{EPOCHS}], Loss: {loss.item()}")

train_feature_extractor()

In [None]:
torch.save(model.state_dict(), "fashion_feature_extractor.pth")

approach 1

In [None]:
model.load_state_dict(torch.load("fashion_feature_extractor.pth"))
model.eval()

In [None]:
d = 2048
faiss_index = faiss.IndexFlatL2(d)

In [None]:
def extract_features(image_paths):
    features = []
    for img_path in tqdm(image_paths, desc="Extracting Features"):
        image = Image.open(img_path).convert("RGB")
        image = transform(image).unsqueeze(0).to(DEVICE)

        with torch.no_grad():
            feature = model(image).cpu().numpy()
        
        features.append(feature)

    return np.vstack(features)

feature_vectors = extract_features(train_images + val_images)
faiss_index.add(feature_vectors)
faiss.write_index(faiss_index, "fashion_faiss.index")

In [None]:
user_images_dir = "/path/to/user/uploads"
user_images = [os.path.join(user_images_dir, f) for f in os.listdir(user_images_dir) if f.endswith(".jpg")]
user_features = extract_features(user_images)
faiss_index.add(user_features)
faiss.write_index(faiss_index, "fashion_faiss.index")

approach 2

In [None]:
import os
import torch
import numpy as np
import faiss
from tqdm import tqdm
from PIL import Image
import torchvision.transforms as transforms
from torchvision.models import resnet50

In [None]:
user_images_dir = "./dataset/DeepFashion2/deepfashion2_original_images/test/test/image"

In [None]:
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print("Using device:", DEVICE)
model = resnet50(pretrained=True)
model.fc = torch.nn.Identity()
model.load_state_dict(torch.load("fashion_feature_extractor.pth"))
model = model.to(DEVICE)
model.eval()

In [None]:
d = 2048
faiss_index = faiss.IndexFlatL2(d)

transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

user_images = [os.path.join(user_images_dir, img) for img in os.listdir(user_images_dir) if img.endswith(('.jpg', '.png'))]

In [None]:
def extract_features(image_paths):
    features = []
    for img_path in tqdm(image_paths, desc="Extracting Features from User Images"):
        image = Image.open(img_path).convert("RGB")
        image = transform(image).unsqueeze(0).to(DEVICE)

        with torch.no_grad():
            feature = model(image).cpu().numpy()

        features.append(feature)

    return np.vstack(features)

In [None]:
if user_images:
    user_feature_vectors = extract_features(user_images)
    faiss_index.add(user_feature_vectors)

    faiss.write_index(faiss_index, "user_images_faiss.index")

print(f"Stored {len(user_images)} user-uploaded images in FAISS.")

In [None]:
def find_similar_images(uploaded_image_path, top_k=5):
    image = Image.open(uploaded_image_path).convert("RGB")
    image = transform(image).unsqueeze(0).to(DEVICE)

    with torch.no_grad():
        feature = model(image).cpu().numpy()

    faiss_index = faiss.read_index("user_images_faiss.index")

    distances, indices = faiss_index.search(feature, top_k)

    return [user_images[i] for i in indices[0]]

uploaded_image = "/path/to/new_user_uploaded_image.jpg"
similar_images = find_similar_images(uploaded_image)

print("Top Similar User-Uploaded Images:", similar_images)