### Импорты

In [44]:
import torch
import torch.nn as nn
from torchvision import models, transforms
from pathlib import Path
import numpy as np
from annoy import AnnoyIndex
import pickle
import os
from PIL import Image
import pandas as pd
from tqdm import tqdm
from collections import defaultdict
from transformers import BlipProcessor, BlipForConditionalGeneration
from sentence_transformers import SentenceTransformer
import json

### Image Encoder

In [45]:
class ImageEncoder:
    """Turns images into unit-length feature vectors with a ConvNeXt-Large backbone.

    The torchvision model is loaded with pretrained weights and its last child
    module is removed (presumably the classification head — confirm against the
    installed torchvision version), so a forward pass yields features instead
    of class scores.
    """

    def __init__(self):
        """Load the backbone, move it to GPU when available and build the preprocessing."""
        print("Loading ConvNeXT model...")
        backbone = models.convnext_large(pretrained=True)
        # Keep every child module except the last one.
        headless = nn.Sequential(*list(backbone.children())[:-1])
        headless.eval()

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = headless.to(self.device)

        preprocessing_steps = [
            transforms.Resize(236, interpolation=transforms.InterpolationMode.BICUBIC),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225]),
        ]
        self.transform = transforms.Compose(preprocessing_steps)
        print(f"Model loaded successfully on {self.device}!")

    def get_embedding(self, image_path):
        """Return the L2-normalised visual embedding of one image.

        Args:
            image_path: path of the image file to open.

        Returns:
            A numpy array, or None when anything goes wrong (the error is
            printed rather than raised).
        """
        try:
            rgb_image = Image.open(image_path).convert('RGB')
            # Add a batch dimension of size one before sending to the model.
            batch = self.transform(rgb_image).unsqueeze(0).to(self.device)

            with torch.no_grad():
                features = self.model(batch)

            vector = features.squeeze().cpu().numpy()
            return vector / np.linalg.norm(vector)
        except Exception as e:
            print(f"Error processing {image_path}: {str(e)}")
            return None

    def get_combined_embedding(self, image_path, description, text_model):
        """Return the normalised concatenation of the visual and text embeddings.

        Args:
            image_path: path of the image file.
            description: caption text passed to ``text_model.encode``.
            text_model: object exposing ``encode(text, convert_to_tensor=...,
                normalize_embeddings=...)`` that returns a tensor.

        Returns:
            A numpy array of unit length, or None when the visual embedding
            could not be computed.
        """
        visual_part = self.get_embedding(image_path)
        if visual_part is None:
            return None

        text_tensor = text_model.encode(description, convert_to_tensor=True, normalize_embeddings=True)
        text_part = text_tensor.cpu().numpy()

        # Concatenate both modalities, then renormalise the joint vector.
        joined = np.concatenate((visual_part, text_part))
        return joined / np.linalg.norm(joined)

### Создание экземпляров энкодера и моделей

In [46]:
# Create the image encoder instance (loads ConvNeXt and picks the device)
encoder = ImageEncoder()

# Load the BLIP model used to generate image captions
print("Loading BLIP model for image captioning...")
processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
caption_model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")
# Keep the captioning model on the same device as the image encoder
caption_model = caption_model.to(encoder.device)
print("BLIP model loaded successfully!")

# Load the SentenceTransformer model used for text embeddings
print("Loading SentenceTransformer model for text embeddings...")
text_model = SentenceTransformer('all-MiniLM-L6-v2', device=encoder.device)
print("SentenceTransformer model loaded successfully!")


Loading ConvNeXT model...




Model loaded successfully on cuda!
Loading BLIP model for image captioning...
BLIP model loaded successfully!
Loading SentenceTransformer model for text embeddings...
SentenceTransformer model loaded successfully!


### Генерация описаний

In [47]:
def generate_description(image_path, processor, model):
    """Generate a text caption for a single image.

    Args:
        image_path: path of the image file to caption.
        processor: callable that turns a PIL image into model inputs
            (``processor(image, return_tensors="pt")``) and decodes generated
            token ids (``processor.decode``).
        model: captioning model exposing ``device`` and ``generate(**inputs)``.

    Returns:
        The decoded caption string, or None when loading, preprocessing or
        generation fails (the error is printed, not raised).
    """
    try:
        image = Image.open(image_path).convert('RGB')
        # Send the inputs to the device the captioning model itself lives on,
        # instead of depending on a global `encoder` object being defined and
        # happening to share that device.
        inputs = processor(image, return_tensors="pt").to(model.device)
        with torch.no_grad():
            outputs = model.generate(**inputs)
        return processor.decode(outputs[0], skip_special_tokens=True)
    except Exception as e:
        print(f"Error generating description for {image_path}: {str(e)}")
        return None

### Прогон описаний для датасета

In [78]:
def process_image_folder(folder_path, processor, model, output_file):
    """Caption every image in a folder and its direct sub-folders, then save as JSON.

    Images directly inside ``folder_path`` are handled first, followed by the
    images of each immediate sub-directory (with a progress bar over the
    directory entries). Captions are stored under the bare file name, so two
    files with the same name in different sub-folders overwrite one another.

    Args:
        folder_path: root folder to scan.
        processor: BLIP-style processor forwarded to ``generate_description``.
        model: captioning model forwarded to ``generate_description``.
        output_file: path of the JSON file that receives ``{file_name: caption}``.
    """
    image_suffixes = ('.jpg', '.jpeg', '.png', '.bmp')
    root = Path(folder_path)
    captions = {}

    def caption_images_in(directory):
        # Caption each matching file; files whose caption is empty/None are left out.
        for candidate in directory.glob("*.*"):
            if candidate.suffix.lower() not in image_suffixes:
                continue
            caption = generate_description(str(candidate), processor, model)
            if caption:
                captions[candidate.name] = caption  # key is the bare file name

    print("Generating descriptions for images...")

    # Images lying directly in the root folder.
    caption_images_in(root)

    # Images inside each immediate sub-directory.
    for entry in tqdm(list(root.iterdir())):
        if entry.is_dir():
            caption_images_in(entry)

    # Persist the captions as JSON.
    with open(output_file, 'w') as out:
        json.dump(captions, out)
    print(f"\nAll descriptions saved to {output_file}")



# Caption the whole training set and store the captions next to the notebook.
# NOTE(review): the dataset location is an absolute, machine-specific path —
# adjust it before running elsewhere.
dataset_path = "/home/moo/Downloads/train_dataset_train_data_rkn/train_data_rkn/train"
descriptions_file = "./descriptions.json"
process_image_folder(dataset_path, processor, caption_model, output_file=descriptions_file)

Generating descriptions for images...


100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 105/105 [21:51<00:00, 12.49s/it]


All descriptions saved to ./descriptions.json





### Обработка и получение эмбеддингов

In [None]:
def process_dataset(dataset_path, encoder, text_model, descriptions_file, save_dir="./data"):
    """Embed every captioned training image and persist the results.

    The dataset is expected to contain one sub-directory per class. For each
    ``.jpg``/``.jpeg``/``.png`` file with a known caption, a combined
    image+text embedding is computed and stored under a running integer id.

    Args:
        dataset_path: root folder with one sub-folder per class.
        encoder: object exposing ``get_combined_embedding(path, text, text_model)``.
        text_model: text embedding model forwarded to the encoder.
        descriptions_file: JSON file mapping image file names to captions.
        save_dir: folder (created if missing) that receives
            ``processed_data_combined.pkl``.

    Returns:
        Tuple ``(embeddings_dict, file_mapping, class_mapping,
        reverse_class_mapping)`` where the first three are keyed by the integer
        id and the last one maps file stem -> class name.
    """
    dataset_path = Path(dataset_path)

    # Load the captions produced earlier.
    with open(descriptions_file, 'r') as f:
        descriptions_dict = json.load(f)

    embeddings_dict = {}
    file_mapping = {}
    class_mapping = {}
    reverse_class_mapping = {}  # quick class lookup by file stem
    class_stats = defaultdict(int)

    idx = 0
    print("Processing dataset with combined embeddings...")

    for class_dir in tqdm(list(dataset_path.iterdir())):
        if not class_dir.is_dir():
            continue
        class_name = class_dir.name
        for image_file in class_dir.glob("*.*"):
            if image_file.suffix.lower() not in ['.jpg', '.jpeg', '.png']:
                continue
            image_path = str(image_file)
            # Captions are written keyed by bare file name, so look them up
            # that way; the full path is kept as a fallback for caption files
            # that were keyed by path.
            description = descriptions_dict.get(image_file.name)
            if description is None:
                description = descriptions_dict.get(image_path)
            if description is None:
                print(f"No description found for {image_path}, skipping.")
                continue
            embedding = encoder.get_combined_embedding(image_path, description, text_model)
            if embedding is not None:
                embeddings_dict[idx] = embedding
                file_mapping[idx] = image_file.stem
                class_mapping[idx] = class_name
                reverse_class_mapping[image_file.stem] = class_name
                class_stats[class_name] += 1
                idx += 1

    print(f"\nTotal images processed: {idx}")
    print("\nClass distribution:")
    for class_name, count in class_stats.items():
        print(f"{class_name}: {count} images")

    os.makedirs(save_dir, exist_ok=True)
    with open(f"{save_dir}/processed_data_combined.pkl", "wb") as f:
        pickle.dump({
            'embeddings': embeddings_dict,
            'file_mapping': file_mapping,
            'class_mapping': class_mapping,
            'reverse_class_mapping': reverse_class_mapping,
            'class_stats': dict(class_stats)
        }, f)

    return embeddings_dict, file_mapping, class_mapping, reverse_class_mapping

# Embed the training dataset (uses the captions saved in descriptions_file)
embeddings_dict, file_mapping, class_mapping, reverse_class_mapping = process_dataset(
    dataset_path, encoder, text_model, descriptions_file)

### Создание и сохранение индекса

In [None]:
def build_index(embeddings_dict, save_dir="./data_s2"):
    """Build an angular Annoy index over the embeddings and save it to disk.

    Args:
        embeddings_dict: mapping of integer item id -> embedding vector; all
            vectors are expected to share the length of the first one.
        save_dir: folder (created if missing) that receives ``image_index.ann``.

    Returns:
        The built ``AnnoyIndex``.

    Raises:
        ValueError: if ``embeddings_dict`` is empty, since the vector size
            cannot be determined and there is nothing to index.
    """
    if not embeddings_dict:
        raise ValueError("embeddings_dict is empty: nothing to index")

    # The index dimensionality is taken from the first stored vector.
    first_embedding = next(iter(embeddings_dict.values()))
    embedding_dim = len(first_embedding)

    index = AnnoyIndex(embedding_dim, 'angular')

    print("Building index...")
    for idx, embedding in embeddings_dict.items():
        index.add_item(idx, embedding)

    print("Building index with 100 trees...")
    index.build(100)

    # Saving fails when the target folder does not exist, so create it first.
    os.makedirs(save_dir, exist_ok=True)
    index.save(f"{save_dir}/image_index.ann")

    return index

# Build the nearest-neighbour index from the embeddings computed above
index = build_index(embeddings_dict)

### Поиск похожих изображений

In [None]:
def find_similar(query_image_path, description, index, encoder, text_model, file_mapping, class_mapping, n_results=10):
    """Return up to ``n_results`` distinct training images similar to the query.

    Up to ``3 * n_results`` nearest neighbours are fetched from the index. The
    result is filled in two passes over that ranked list:

    1. the best-ranked candidate of every not-yet-seen class;
    2. if slots remain, the best-ranked candidates not chosen in pass 1.

    Args:
        query_image_path: path of the query image.
        description: caption of the query image.
        index: object exposing ``get_nns_by_vector(vector, n, include_distances=True)``.
        encoder: object exposing ``get_combined_embedding``.
        text_model: text embedding model forwarded to the encoder.
        file_mapping: item id -> file identifier returned to the caller.
        class_mapping: item id -> class name.
        n_results: maximum number of results.

    Returns:
        List of file identifiers without duplicates; empty when the query
        embedding cannot be computed.
    """
    query_embedding = encoder.get_combined_embedding(query_image_path, description, text_model)
    if query_embedding is None:
        return []

    n_candidates = min(n_results * 3, len(file_mapping))
    similar_idx, _distances = index.get_nns_by_vector(
        query_embedding, n_candidates, include_distances=True)

    selected = []
    seen_classes = set()

    # Pass 1: one representative per class, in ranking order.
    for idx in similar_idx:
        if len(selected) >= n_results:
            break
        class_name = class_mapping[idx]
        if class_name not in seen_classes:
            selected.append(idx)
            seen_classes.add(class_name)

    # Pass 2: top up with the remaining candidates. Candidates already picked
    # are skipped so the same image is never recommended twice.
    if len(selected) < n_results:
        already_selected = set(selected)
        for idx in similar_idx:
            if len(selected) >= n_results:
                break
            if idx not in already_selected:
                selected.append(idx)
                already_selected.add(idx)

    return [file_mapping[idx] for idx in selected]

### map10

In [None]:
def calculate_map10(predictions, dataset_path, test_class_mapping):
    """Compute class-based MAP@10 for a set of recommendations.

    A recommended image counts as relevant when its class (the name of its
    folder under ``dataset_path``) equals the class of the query image. For
    each query, AP@10 is the mean of precision@k over the relevant positions
    among the first 10 recommendations, or 0 when none is relevant.

    Args:
        predictions: dict ``{query_image: [recommended_images]}``. Recommended
            images may be given either as file names (``img.jpg``) or as
            stems (``img``).
        dataset_path: path of the training dataset (one sub-folder per class).
        test_class_mapping: dict ``{image_name: class_name}`` for the queries.

    Returns:
        Tuple ``(map10, class_map)``: the overall mean AP and a dict of mean AP
        per query class. Queries without a known class are skipped.
    """
    dataset_path = Path(dataset_path)
    ap_scores = []
    class_performances = defaultdict(list)

    # Class lookup for training images. Recommendations produced from stored
    # file stems carry no extension, so index by stem as well as by full name;
    # an exact file-name match always takes priority.
    train_class_mapping = {}
    train_stem_mapping = {}
    for class_dir in dataset_path.iterdir():
        if class_dir.is_dir():
            class_name = class_dir.name
            for img_path in class_dir.glob("*.*"):
                if img_path.suffix.lower() in ['.jpg', '.jpeg', '.png']:
                    train_class_mapping[img_path.name] = class_name
                    train_stem_mapping.setdefault(img_path.stem, class_name)

    # Warn when either mapping is empty.
    if not train_class_mapping:
        print("Ошибка: train_class_mapping пуст.")
    if not test_class_mapping:
        print("Ошибка: test_class_mapping пуст.")

    for query_image, recommended_images in predictions.items():
        query_class = test_class_mapping.get(query_image)
        if query_class is None:
            print(f"Класс не найден для тестового изображения: {query_image}")
            continue

        # Relevance flag for each of the first 10 positions.
        relevance = []
        for rec_image in recommended_images[:10]:
            rec_class = train_class_mapping.get(rec_image)
            if rec_class is None:
                rec_class = train_stem_mapping.get(rec_image)
            relevance.append(1 if rec_class == query_class else 0)

        # AP@10: average precision@k over the relevant positions.
        precision_at_k = []
        num_relevant = 0
        for k, is_relevant in enumerate(relevance, 1):
            if is_relevant:
                num_relevant += 1
                precision_at_k.append(num_relevant / k)

        ap = sum(precision_at_k) / num_relevant if num_relevant > 0 else 0
        ap_scores.append(ap)
        class_performances[query_class].append(ap)

    # Overall MAP@10.
    map10 = np.mean(ap_scores) if ap_scores else 0

    # MAP@10 per query class.
    class_map = {cls: np.mean(scores) for cls, scores in class_performances.items()}

    return map10, class_map

import csv

def load_test_class_mapping(mapping_file):
    """Read the image -> class mapping of the validation images from a CSV file.

    Args:
        mapping_file: path of a CSV file with a header row containing the
            columns ``image_name`` and ``class_name``.

    Returns:
        Dict ``{image_name: class_name}``; later rows win on duplicate names.
    """
    # newline='' lets the csv module do its own line-ending handling, as the
    # csv documentation requires for files passed to a reader.
    with open(mapping_file, 'r', newline='') as csvfile:
        return {row['image_name']: row['class_name'] for row in csv.DictReader(csvfile)}

# Load the image -> class mapping for the validation images
# NOTE(review): absolute, machine-specific path — adjust before running elsewhere.
valid_mapping_file = '/home/moo/Downloads/train_dataset_train_data_rkn/train_data_rkn/valid_mapping.csv'
test_class_mapping = load_test_class_mapping(valid_mapping_file)

In [None]:
def evaluate_recommendations(test_dir, index, encoder, text_model, descriptions_file, file_mapping, class_mapping, dataset_path, test_class_mapping, output_file="submission_s2.csv"):
    """Recommend training images for every test image, save them and score them.

    Only ``.jpg``/``.jpeg``/``.png`` files lying directly in ``test_dir`` are
    used as queries; a query without a caption in ``descriptions_file`` is
    skipped. Results are written to ``output_file`` as CSV (columns ``image``
    and ``recs``) and evaluated with ``calculate_map10``.

    Args:
        test_dir: folder holding the query images.
        index, encoder, text_model, file_mapping, class_mapping: forwarded to
            ``find_similar``.
        descriptions_file: JSON file mapping query file names to captions.
        dataset_path, test_class_mapping: forwarded to ``calculate_map10``.
        output_file: destination of the submission CSV.

    Returns:
        Tuple ``(predictions, map10, class_map)`` where ``predictions`` maps
        each query file name to its list of recommendations.
    """
    # Captions of the test images, keyed by file name.
    with open(descriptions_file, 'r') as f:
        captions_by_name = json.load(f)

    query_root = Path(test_dir)
    submission_rows = []
    predictions = {}
    query_suffixes = ('.jpg', '.jpeg', '.png')

    print("Generating recommendations for test images...")
    for query_file in tqdm(list(query_root.glob("*.*"))):
        if query_file.suffix.lower() not in query_suffixes:
            continue

        query_name = query_file.name
        caption = captions_by_name.get(query_name)
        if caption is None:
            print(f"No description found for {query_name}, skipping.")
            continue

        neighbours = find_similar(
            str(query_file), caption, index, encoder, text_model, file_mapping,
            class_mapping, n_results=10
        )
        # NOTE(review): prints one line per query — looks like leftover debug output.
        print(neighbours)
        if not neighbours:
            continue

        joined = ",".join(neighbours)
        # NOTE(review): the value is wrapped in literal double quotes before
        # pandas applies its own CSV quoting — confirm the submission format
        # really expects that.
        submission_rows.append({
            'image': query_name,
            'recs': f'"{joined}"'
        })
        predictions[query_name] = neighbours

    # Write the submission file.
    pd.DataFrame(submission_rows).to_csv(output_file, index=False)
    print(f"\nSubmission saved to {output_file}")

    # Score the recommendations.
    map10, class_map = calculate_map10(predictions, dataset_path, test_class_mapping)

    print(f"\nOverall MAP@10: {map10:.4f}")
    print("\nMAP@10 by class:")
    ranked_classes = sorted(class_map.items(), key=lambda item: item[1], reverse=True)
    for class_name, class_score in ranked_classes:
        print(f"{class_name}: {class_score:.4f}")

    return predictions, map10, class_map


In [None]:
def analyze_failure_cases(predictions, dataset_path, test_path, class_map, test_class_mapping=None):
    """Print one example query for each of the five classes with the lowest MAP@10.

    For every such class the first matching query in ``predictions`` is shown
    together with its top five recommendations and their classes.

    Args:
        predictions: dict ``{query_image: [recommended_images]}``; identifiers
            may be file names (``img.jpg``) or stems (``img``).
        dataset_path: training dataset root (one sub-folder per class).
        test_path: test dataset root. It is scanned for class sub-folders only
            when ``test_class_mapping`` is not given.
        class_map: dict ``{class_name: MAP@10}``.
        test_class_mapping: optional dict ``{query_image: class_name}``. Supply
            it when the test images are not organised in class sub-folders.
    """
    worst_classes = sorted(class_map.items(), key=lambda x: x[1])[:5]
    print("\nAnalyzing worst performing classes:")

    dataset_path = Path(dataset_path)
    test_path = Path(test_path)

    def scan_class_folders(root):
        # Map every image under root/<class>/ to its class, indexed by both the
        # full file name and the stem so either identifier form can be found.
        mapping = {}
        for class_dir in root.iterdir():
            if class_dir.is_dir():
                for img_path in class_dir.glob("*.*"):
                    if img_path.suffix.lower() in ['.jpg', '.jpeg', '.png']:
                        mapping[img_path.name] = class_dir.name
                        mapping.setdefault(img_path.stem, class_dir.name)
        return mapping

    def class_of(mapping, image_id, default=None):
        # Exact identifier first, then the identifier with its extension removed.
        if image_id in mapping:
            return mapping[image_id]
        return mapping.get(Path(image_id).stem, default)

    # Class lookup for training images.
    train_class_mapping = scan_class_folders(dataset_path)

    # Class lookup for test images.
    if test_class_mapping is None:
        test_class_mapping = scan_class_folders(test_path)

    for class_name, score in worst_classes:
        print(f"\nClass: {class_name}, MAP@10: {score:.4f}")

        # Find an example query belonging to this class.
        for query_image, recs in predictions.items():
            query_class = class_of(test_class_mapping, query_image)
            if query_class == class_name:
                print(f"\nQuery image: {query_image}")
                print("Top 5 recommendations and their classes:")
                for i, rec in enumerate(recs[:5], 1):
                    rec_class = class_of(train_class_mapping, rec, "unknown")
                    print(f"{i}. {rec} (class: {rec_class})")
                # Show only one example per class.
                break

In [None]:
# Locations used by the evaluation cells below.
# NOTE(review): absolute, machine-specific paths; also confirm the folder is
# really named "validd" and that this is not a typo.
test_dir = "/home/moo/Downloads/train_dataset_train_data_rkn/train_data_rkn/validd"
dataset_path = "/home/moo/Downloads/train_dataset_train_data_rkn/train_data_rkn/train"
test_descriptions_file = "./test_descriptions.json"

In [None]:
# Generate captions for the test images
process_image_folder(test_dir, processor, caption_model, output_file=test_descriptions_file)

In [None]:
# Produce recommendations for every test image, write the submission file
# and compute MAP@10 overall and per class.
predictions, map10, class_map = evaluate_recommendations(
    test_dir=test_dir,
    index=index,
    encoder=encoder,
    text_model=text_model,
    descriptions_file=test_descriptions_file,
    file_mapping=file_mapping,
    class_mapping=class_mapping,
    dataset_path=dataset_path,
    test_class_mapping=test_class_mapping
)

# Inspect the worst-performing classes
analyze_failure_cases(predictions, dataset_path, test_dir, class_map)

# Plot the distribution of MAP@10 across classes
# NOTE(review): these imports sit mid-notebook rather than in the imports cell.
import matplotlib.pyplot as plt
import seaborn as sns

plt.figure(figsize=(12, 6))
sns.histplot(list(class_map.values()), bins=20, kde=False)
plt.title('Distribution of MAP@10 across classes')
plt.xlabel('MAP@10')
plt.ylabel('Number of classes')
plt.show()