# Visualise dataset




In [None]:
import sys

sys.path.append("/workspaces/Face-Embeddings")
import random
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
from torch import Tensor

from src.data.dataset import TripletDataset
from src.data.transform import get_train_transforms


def tensor2array(img: Tensor):
    img = img.numpy().transpose(1, 2, 0)
    img = (img - img.min()) / (img.max() - img.min())
    img = (img * 255).astype(np.uint8)
    return img


def visualize_triplet(anchor, positive, negative, figsize=(15, 5)):
    """
    Visualize a triplet (anchor, positive, negative) in a grid.

    Args:
        anchor: PIL Image of the anchor
        positive: PIL Image of the positive sample
        negative: PIL Image of the negative sample
        figsize: Size of the figure (width, height)
    """
    fig, axes = plt.subplots(1, 3, figsize=figsize)

    # Display images
    axes[0].imshow(anchor)
    axes[0].set_title("Anchor")
    axes[0].axis("off")

    axes[1].imshow(positive)
    axes[1].set_title("Positive (Same Identity)")
    axes[1].axis("off")

    axes[2].imshow(negative)
    axes[2].set_title("Negative (Different Identity)")
    axes[2].axis("off")

    plt.tight_layout()
    return fig

In [None]:
dataset = TripletDataset(
    data_dir=Path("../data/processed/train"),
    transforms=get_train_transforms(224, 224, use_face_swap=False),
    triplet_cache_size=10000,  # Опционально
)

In [None]:
n = random.randint(0, 100)
anchor = Image.fromarray(tensor2array(dataset[n]["anchor"]))
positive = Image.fromarray(tensor2array(dataset[n]["positive"]))
negative = Image.fromarray(tensor2array(dataset[n]["negative"]))
# Visualize the triplet
fig = visualize_triplet(anchor, positive, negative)

# Split


In [None]:
import json
import random
import shutil
from pathlib import Path

from tqdm import tqdm

# Загрузка метаданных
with open(Path("data/original/train/meta.json"), "r") as f:
    meta = json.load(f)

# Создание директорий для разделенных данных
base_dir = Path("data/processed")
for split in ["train", "val", "test"]:
    (base_dir / split / "images").mkdir(parents=True, exist_ok=True)

# Получение всех уникальных идентификаторов лиц
face_ids = set()
for path in meta.keys():
    face_id = path.split("/")[0]
    face_ids.add(face_id)

# Преобразование в список для перемешивания
face_ids = list(face_ids)
random.shuffle(face_ids)

# Разделение на train (70%), val (15%), test (15%)
total = len(face_ids)
train_ids = set(face_ids[: int(0.7 * total)])
val_ids = set(face_ids[int(0.7 * total) : int(0.85 * total)])
test_ids = set(face_ids[int(0.85 * total) :])

# Создание отдельных meta.json для каждого сплита
train_meta = {}
val_meta = {}
test_meta = {}

# Копирование файлов в соответствующие директории
src_dir = Path("data/original/train")

for path, is_fake in tqdm(meta.items()):
    face_id = path.split("/")[0]

    # Определение целевого сплита и соответствующего meta-словаря
    if face_id in train_ids:
        split = "train"
        split_meta = train_meta
    elif face_id in val_ids:
        split = "val"
        split_meta = val_meta
    else:
        split = "test"
        split_meta = test_meta

    # Добавление записи в соответствующий meta.json
    split_meta[path] = is_fake

    # Создание директории для лица, если её нет
    face_dir = base_dir / split / "images" / face_id
    face_dir.mkdir(exist_ok=True)

    # Исходный и целевой пути
    src_path = src_dir / "images" / path
    dst_path = base_dir / split / "images" / path

    # Копирование файла
    shutil.copy(src_path, dst_path)

# Сохранение meta.json для каждого сплита
for split, split_meta in [
    ("train", train_meta),
    ("val", val_meta),
    ("test", test_meta),
]:
    with open(base_dir / split / "meta.json", "w") as f:
        json.dump(split_meta, f, indent=4)

print(
    f"Разделение завершено. Обучающая выборка: {len(train_ids)} лиц, "
    f"Валидационная выборка: {len(val_ids)} лиц, "
    f"Тестовая выборка: {len(test_ids)} лиц"
)

In [None]:
import json
from collections import Counter
from pathlib import Path

import pandas as pd
import seaborn as sns

# Путь к обработанным данным
processed_dir = Path("data/processed")


# Функция для анализа сплита
def analyze_split(split_name):
    split_dir = processed_dir / split_name
    meta_path = split_dir / "meta.json"

    # Загрузка метаданных
    with open(meta_path, "r") as f:
        meta = json.load(f)

    # Подсчет количества изображений
    total_images = len(meta)

    # Подсчет количества реальных и поддельных изображений
    fake_count = sum(1 for label in meta.values() if label == 1)
    real_count = total_images - fake_count

    # Подсчет количества уникальных лиц
    face_ids = set(path.split("/")[0] for path in meta.keys())
    total_faces = len(face_ids)

    # Подсчет изображений для каждого лица
    face_image_counts = Counter(path.split("/")[0] for path in meta.keys())

    # Подсчет реальных и поддельных изображений для каждого лица
    face_label_counts = {}
    for path, is_fake in meta.items():
        face_id = path.split("/")[0]
        if face_id not in face_label_counts:
            face_label_counts[face_id] = {"real": 0, "fake": 0}

        if is_fake == 1:
            face_label_counts[face_id]["fake"] += 1
        else:
            face_label_counts[face_id]["real"] += 1

    return {
        "split": split_name,
        "total_images": total_images,
        "real_images": real_count,
        "fake_images": fake_count,
        "total_faces": total_faces,
        "face_image_counts": face_image_counts,
        "face_label_counts": face_label_counts,
    }


# Анализ всех сплитов
splits = ["train", "val", "test"]
stats = {}

for split in splits:
    stats[split] = analyze_split(split)

# Вывод основной статистики
summary = []
for split in splits:
    s = stats[split]
    summary.append(
        {
            "Split": split,
            "Total Images": s["total_images"],
            "Real Images": s["real_images"],
            "Fake Images": s["fake_images"],
            "Real %": round(s["real_images"] / s["total_images"] * 100, 2),
            "Fake %": round(s["fake_images"] / s["total_images"] * 100, 2),
            "Total Faces": s["total_faces"],
            "Avg Images per Face": round(s["total_images"] / s["total_faces"], 2),
        }
    )

# Создание DataFrame для удобного отображения
summary_df = pd.DataFrame(summary)
print("Общая статистика по выборкам:")
print(summary_df)

# Визуализация распределения классов
plt.figure(figsize=(12, 6))
sns.barplot(
    x="Split",
    y="Total Images",
    hue="Class",
    data=pd.DataFrame(
        [
            {"Split": s["Split"], "Total Images": s["Real Images"], "Class": "Real"}
            for s in summary
        ]
        + [
            {"Split": s["Split"], "Total Images": s["Fake Images"], "Class": "Fake"}
            for s in summary
        ]
    ),
)
plt.title("Распределение классов по выборкам")
plt.savefig(processed_dir / "class_distribution.png")

# Анализ распределения количества изображений на лицо
plt.figure(figsize=(15, 5))
for i, split in enumerate(splits):
    plt.subplot(1, 3, i + 1)
    counts = list(stats[split]["face_image_counts"].values())
    sns.histplot(counts, kde=True)
    plt.title(f"{split.capitalize()}: Изображений на лицо")
    plt.xlabel("Количество изображений")
    plt.ylabel("Количество лиц")
plt.tight_layout()
plt.savefig(processed_dir / "images_per_face.png")

print("\nСтатистика сохранена в виде графиков в директории processed")

# Проверка физического наличия файлов
for split in splits:
    split_dir = processed_dir / split
    meta_path = split_dir / "meta.json"

    with open(meta_path, "r") as f:
        meta = json.load(f)

    missing_files = []
    for path in meta.keys():
        file_path = split_dir / "images" / path
        if not file_path.exists():
            missing_files.append(str(file_path))

    if missing_files:
        print(f"\nВ выборке {split} отсутствуют следующие файлы:")
        for path in missing_files[:10]:  # Показываем только первые 10
            print(f"  - {path}")
        if len(missing_files) > 10:
            print(f"  ... и еще {len(missing_files) - 10} файлов")
    else:
        print(f"\nВсе файлы в выборке {split} присутствуют физически")