
# Демонстрационная версия поиска изображений по запросу

**Задача:**

Для демонстрационной версии нужно обучить модель, которая получит векторное представление изображения, векторное представление текста, а на выходе выдаст число от 0 до 1 — покажет, насколько текст и картинка подходят друг другу.

## Описание данных

В файле `train_dataset.csv` находится информация, необходимая для обучения: имя файла изображения, идентификатор описания и текст описания. Для одной картинки может быть доступно до 5 описаний. Идентификатор описания имеет формат `<имя файла изображения>#<порядковый номер описания>`.

В папке `train_images` содержатся изображения для тренировки модели.

В файле `CrowdAnnotations.tsv` — данные по соответствию изображения и описания, полученные с помощью краудсорсинга. Номера колонок и соответствующий тип данных:

1. Имя файла изображения.
2. Идентификатор описания.
3. Доля людей, подтвердивших, что описание соответствует изображению.
4. Количество человек, подтвердивших, что описание соответствует изображению.
5. Количество человек, подтвердивших, что описание не соответствует изображению.

В файле `ExpertAnnotations.tsv` содержатся данные по соответствию изображения и описания, полученные в результате опроса экспертов. Номера колонок и соответствующий тип данных:

1. Имя файла изображения.
2. Идентификатор описания.

3, 4, 5 — оценки трёх экспертов.

Эксперты ставят оценки по шкале от 1 до 4, где 1 — изображение и запрос совершенно не соответствуют друг другу, 2 — запрос содержит элементы описания изображения, но в целом запрос тексту не соответствует, 3 — запрос и текст соответствуют с точностью до некоторых деталей, 4 — запрос и текст соответствуют полностью.

В файле `test_queries.csv` находится информация, необходимая для тестирования: идентификатор запроса, текст запроса и релевантное изображение. Для одной картинки может быть доступно до 5 описаний. Идентификатор описания имеет формат `<имя файла изображения>#<порядковый номер описания>`.

В папке `test_images` содержатся изображения для тестирования модели.

## Загрузка данных

In [1]:
# База и графики
import os
import re
from PIL import Image
import pandas as pd
import numpy as np
from math import sqrt
import matplotlib.pyplot as plt
from tqdm import notebook, tqdm
import warnings
warnings.filterwarnings('ignore')

# Текст
import spacy
import nltk
# from nltk.corpus import stopwords as nltk_stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk import pos_tag


# Нейросети
import torch
import transformers
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from torchvision.models import ResNet18_Weights, resnet18
import torchvision.transforms as transforms
from torch.autograd import Variable

from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, make_scorer
from sklearn.model_selection import GroupShuffleSplit
from sklearn.model_selection import cross_val_score
from sklearn.dummy import DummyRegressor
from sklearn.linear_model import LinearRegression


In [2]:
# Проверка на доступность GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Используемое устройство: {device}")
print(torch.version.cuda)

Используемое устройство: cuda
11.8


In [3]:
BASE_PATH = './data/'
TRAIN_IMAGES = os.path.join(BASE_PATH, 'train_images')
TEST_IMAGES = os.path.join(BASE_PATH, 'test_images')
RS = 42
LETTERS = r'[^a-zA-Z\s]'
SPACES = r'([ ])\1+'

nltk.download('punkt_tab', quiet=True)
nltk.download('stopwords', quiet=True)
nltk.download('wordnet', quiet=True)
nltk.download('averaged_perceptron_tagger_eng', quiet=True)

True

In [4]:
files = os.listdir(BASE_PATH)

for file in files:
    print(file)

FileNotFoundError: [WinError 3] Системе не удается найти указанный путь: './data/'

In [None]:
# загрузка информации для обучения:
try:
    train_df = pd.read_csv(os.path.join(BASE_PATH, 'train_dataset.csv'))
    print('Информация для обучения:')
    display(train_df.head(2))
    display(train_df.info())
except Exception as e:
    print('Информация для обучения недоступна:', e)

# загрузка информации для тестирования:
try:
    test_df = pd.read_csv(os.path.join(BASE_PATH, 'test_queries.csv'), sep='|', index_col='Unnamed: 0')
    print('Информация для тестирования:')
    display(test_df.head(2))
    display(test_df.info())
except Exception as e:
    print('Информация недоступна:', e)


# оценка изображений на краудсорсинге
try:
    crowd_annotat = pd.read_csv(os.path.join(BASE_PATH, 'CrowdAnnotations.tsv'), sep='\t',
                                names=['image', 'query_id', 'fraction', 'conf_cnt', 'not_conf_cnt'])
    print('Оценка изображений на краудсорсинге:')
    display(crowd_annotat.head(2))
    display(crowd_annotat.info())
except Exception as e:
    print('Информация недоступна:', e)


# оценка изображений экспертами
try:
    expert_annotat = pd.read_csv(os.path.join(BASE_PATH, 'ExpertAnnotations.tsv'), sep='\t',
                                names=['image', 'query_id', 'exp_1', 'exp_2', 'exp_3'])
    print('Оценка изображений экспертами:')
    display(expert_annotat.head(2))
    display(expert_annotat.info())
except Exception as e:
    print('Информация недоступна:', e)

In [None]:
table = [train_df, test_df, crowd_annotat, expert_annotat]
print('Посмотрим на пропуски:')
for i in table:
    display(i.isnull().mean().sort_values())


print('Посмотрим на дубликаты:')
for i in table:
     display(i.duplicated().sum())

**Выводы:**

1. Пропусков нет.
2. Дубликатов полных нет.
3. Названия колонок соотвествуют стандартам.
4. Типы данных соответствуют данным.
5. Текст на английском.
6. На первый взгляд не видно критичных вопросов.

## Исследовательский анализ данных

Датасет содержит экспертные и краудсорсинговые оценки соответствия текста и изображения.

В файле с экспертными мнениями для каждой пары изображение-текст имеются оценки от трёх специалистов. 
В файле с краудсорсинговыми оценками информация расположена в таком порядке:

1. Доля исполнителей, подтвердивших, что текст **соответствует** картинке.
2. Количество исполнителей, подтвердивших, что текст **соответствует** картинке.
3. Количество исполнителей, подтвердивших, что текст **не соответствует** картинке.

Модель должна возвращать на выходе вероятность соответствия изображения тексту, поэтому целевая переменная должна иметь значения от 0 до 1.


In [None]:
def e_d_a(variable, name, table):
    print(f'Смотрим статистику: {name}')
    # Статистика
    display(table[variable].describe())

    # Гистограмма
    plt.figure(figsize=(15, 6))
    table[variable].hist(bins=30, range=(max(0, table[variable].min()), table[variable].max()))
    plt.title(f'Распределение {name}')
    plt.xlabel(f'{name}')
    plt.ylabel('Частота')
    plt.show()

    # Диаграмма с усами
    plt.figure(figsize=(15, 4))
    table.boxplot(column=variable, vert=False, color='green', widths=0.6)
    plt.title(f'Диаграмма межквартильного размаха для {name}')
    plt.xlabel(f'{name}')
    plt.yticks([])
    plt.grid(True, linestyle='--', alpha=0.2, linewidth=1.5)
    plt.show()


    # Описание границ
    print('\n', f'{name} находится в диапазоне от {table[variable].min():.2f}',
          f'до {table[variable].max():.2f}, посмотрим как распределяются данные, видим на графике выше:'
         )

    # Вывод 75% и 98% квантилей
    print(f'75% объектов находятся в диапазоне до {table[variable].quantile(0.75):.1f}',
          f'\n \n Всего значений отличных от 0: {len(table[table[variable]>0]):.1f}.',
          f'\n 1Q = {table[variable].quantile(0.25):.1f},',
          f'\n 3Q = {table[variable].quantile(0.75):.1f},',
          f'\n Межквартильный размах = {(table[variable].quantile(0.75) - table[variable].quantile(0.25)):.1f},',
         )

def e_d_a_categorical(variable, name, table):
    counts = table[variable].value_counts().sort_values(ascending=False)
    print(f'Смотрим статистику: {name}')
    display(table[variable].describe())
    print('Лидеров и аутсайдеров: ')
    display(counts)
    if len(counts)>20:
        print('Посмотрим на графике ТОП-10: ')
        plt.bar(counts.head(10).index, counts.head(10).values)
        plt.xlabel(name)
        plt.ylabel('Количество')
        plt.title(f'Распределение признака {name}')
        plt.xticks(rotation=45, ha='right')
        plt.ylim(counts.min()*0.9, counts.max()*1.05)
        plt.show()
    else:
        print('Посмотрим на графике: ')
        plt.bar(counts.head(10).index, counts.head(10).values)
        plt.xlabel(name)
        plt.ylabel('Количество')
        plt.title(f'Распределение признака {name}')
        plt.xticks(rotation=45, ha='right')
        plt.ylim(counts.min()*0.9, counts.max()*1.05)
        plt.show()

In [None]:
def type_col(df):
    numeric_columns = df.select_dtypes(include='number').columns
    text_columns = df.select_dtypes(include='object').columns
    boolean_columns = df.select_dtypes(include='bool').columns
    print(f'Числовые признаки: {numeric_columns.tolist()}\n',
          f'Логические признаки: {boolean_columns.tolist()} \n',
          f'Строковые признаки: {text_columns.tolist()} \n',)
    try:
        display(df[numeric_columns.tolist()].describe())
    except:
        print('\n')
    try:
        display(df[boolean_columns.tolist()].describe())
    except:
        print('\n')
    try:
        display(df[text_columns.tolist()].describe())
    except:
        print('\n')



for i in table:
    type_col(i)

In [None]:
train_df.head(2)

In [None]:
num_images = 3

selected_rows = train_df.sample(n=num_images, random_state=RS)

print('Вывод изображений и описаний к ним:')


fig, axes = plt.subplots(num_images, 1, figsize=(12, 4 * num_images))

for i, (index, row) in enumerate(selected_rows.iterrows()):
    image_filename = row['image']
    image_text = train_df[train_df.image == image_filename]['query_text'].tolist()
#     print(image_text)
    image_path = os.path.join(TRAIN_IMAGES, image_filename)

    img = Image.open(image_path)
    axes[i].imshow(img)
    axes[i].axis('off')
    axes[i].set_title('\n'.join(image_text), fontsize=10)
    
plt.tight_layout() 
plt.show()

In [None]:
for i in table:
    numeric_columns = i.select_dtypes(include='number').columns
    text_columns = i.select_dtypes(include='object').columns
    for j in i.columns:
        if j in numeric_columns:
            e_d_a(j, j, i)
        if j in text_columns:
            e_d_a_categorical(j, j, i)

In [None]:
crowd_annotat['total_votes'] = crowd_annotat['conf_cnt'] + crowd_annotat['not_conf_cnt']

plt.figure(figsize=(4, 3))
crowd_annotat['total_votes'].hist()
plt.title("Общее кол-во голосов")
plt.xlabel("Количество голосов")
plt.ylabel("Частота")
plt.show()

In [None]:
fig, axes = plt.subplots(1, 3, figsize=(18, 5))

crowd_annotat['fraction'].hist(ax=axes[0])
axes[0].set_title("Доля подтвердивших корректность описания")

crowd_annotat['conf_cnt'].hist(ax=axes[1])
axes[1].set_title("Количество подтв. корректность")

crowd_annotat['not_conf_cnt'].hist(ax=axes[2])
axes[2].set_title("Кол-во человек указавших некорректность")

plt.show()

**Выводы:**
1. Отзывы экспертов распределяются достаточно равномерно, почти 70% находятся в диапазоне от 1 до 2, по 4х бальной шкале.
2. По оценкам краудсорсинга ситуация обстоит по другому: больше оценок, но при этом больше разброс.
3. Боле 40 тысяч изображений содержит оценку 3 голосов. Большее количество голосов - уже исключение.
4. Как видим большое количество некорректно идентифицированных аннотаций к изображениям - оправдано. Некорректных аннотаций достаточно много. 

In [None]:
def mean_exp(x):
    return np.mean([x['exp_1'], x['exp_2'], x['exp_3']])

expert_annotat['mean_exp'] = expert_annotat.apply(mean_exp, axis=1)

train = train_df.copy()
train_df = train_df.merge(expert_annotat, on=['image', 'query_id'], how='outer')[['image', 'mean_exp', 'query_text', 'query_id']]
# перевод экспертные оценки из шкалы 1-4 в шкалу 0-1.
scaler = MinMaxScaler()
train_df['mean_exp'] = scaler.fit_transform(train_df[['mean_exp']])

In [None]:
train_df.sample(3)

**Важно:**

Опираться будем на объединенный набор данных оценками экспертов и краудсорсинговые оценки, что бы оставить как можно больше данных, пусть и не совсем чистых.

## Проверка данных

Например, если в некоторых странах, где работает компания, действуют ограничения по обработке изображений: поисковым сервисам и сервисам, предоставляющим возможность поиска, запрещено без разрешения родителей или законных представителей предоставлять любую информацию, в том числе, но не исключительно тексты, изображения, видео и аудио, содержащие описание, изображение или запись голоса детей. Ребёнком считается любой человек, не достигший 16 лет.

Поэтому при попытке посмотреть изображения, запрещённые законодательством, вместо картинок показывается дисклеймер:

> This image is unavailable in your country in compliance with local laws
>

У нас нет возможности воспользоваться данным функционалом. Поэтому все изображения, которые нарушают данный закон, нужно удалить из обучающей выборки.

In [None]:

def lemma_clear(text):
    lemm = nlp(text)
    lemm = " ".join([token.lemma_ for token in lemm])


    return " ".join(lemm.split())

Пишем функцию для лемматизации входного текста, что позволяет преобразовать слова в их базовые формы. Это упрощает анализ текста, так как разные формы одного и того же слова будут представлены одинаково. Кроме того, функция очищает результат от лишних пробелов, обеспечивая аккуратный вывод. Что должно повысить точность определения таких изображений.

Собираем список таких слов, для анализа:

In [None]:
nlp = spacy.load("en_core_web_sm")


In [None]:
child_dict = ['child', 'baby', 'boy', 'girl', 'teenager', 'schoolboy', 'youth', 'newborn']


def filter_child_words(text):
    doc = nlp(text)
    filtered_words = []
    
    for token in doc:
        if token.pos_ == 'NOUN' and (token.lemma_ in child_dict):
            filtered_words.append(token.text.lower())
    
    return filtered_words


corpus = train_df['query_text'].apply(filter_child_words)

Применяем фильтр, что бы убрать изображения нарушающие законодательство.

In [None]:
def is_ban_word(text):
    if isinstance(text, list):  
        text = ' '.join(text) 
    for s in child_dict:
        if text.find(s) > -1:
            return False
    return True

In [None]:
train_df['lem_query_text'] = corpus
train_df['is_in_law'] = corpus.apply(is_ban_word)

In [None]:
def spl(text):
    return text[:text.find('#')]

Список изображений нарушающих закон.

In [None]:
child_images = list(train_df[train_df['is_in_law']==False]['query_id'].apply(spl).unique())

In [None]:
print('Смотрим количество изображений нарушающих закон и общую длину df:')
print(len(child_images))
print(len(train_df))

In [None]:
def is_not_forb(text):
    if text in child_images:
        return False
    else:
        return True

train_df_clear = train_df[train_df['is_in_law']]
train_df = train_df_clear[train_df_clear['image'].apply(is_not_forb)]

In [None]:
print('Итого после фильтра осталось записей: ', len(train_df))

**Выводы:**
Были обработаны описания к фотографиям, если встречались слова которые указывали на нарушение закона, такие изображения были исключены из выборки.

## Векторизация текстов

In [None]:
tqdm.pandas()

# инициализация токенизатор, конфигурацию и модель BERT
tokenizer = transformers.BertTokenizer.from_pretrained('bert-base-uncased')
config = transformers.BertConfig.from_pretrained('bert-base-uncased')
model_emb_txt = transformers.BertModel.from_pretrained('bert-base-uncased', config=config).to(device)


tokenized = train_df['query_text'].progress_apply(lambda x:
                                           tokenizer.encode(x, max_length=512,
                                                            truncation=True, add_special_tokens=True))

padded = pad_sequence([torch.as_tensor(seq) for seq in tokenized], batch_first=True)

attention_mask = padded > 0
attention_mask = attention_mask.type(torch.LongTensor).to(device)

In [None]:
len(train_df['query_text'])

In [None]:
padded.shape[0]

In [None]:
batch_size = 100
embeddings = []

for i in tqdm(range(padded.shape[0] // batch_size)):
    batch = torch.LongTensor(padded[batch_size * i:batch_size * (i + 1)]).to(device)
    attention_mask_batch = torch.Tensor(attention_mask[batch_size * i:batch_size * (i + 1)]).to(device)

    with torch.no_grad():
        batch_embeddings = model_emb_txt(batch, attention_mask=attention_mask_batch)

    embeddings.append(batch_embeddings[0][:, 0, :].cpu().numpy())

# Обработка остатка
if padded.shape[0] % batch_size != 0:
    last_batch = torch.LongTensor(padded[(padded.shape[0] // batch_size) * batch_size:]).to(device)
    attention_mask_last_batch = torch.Tensor(attention_mask[(padded.shape[0] // batch_size) * batch_size:]).to(device)

    with torch.no_grad():
        last_batch_embeddings = model_emb_txt(last_batch, attention_mask=attention_mask_last_batch)

    embeddings.append(last_batch_embeddings[0][:, 0, :].cpu().numpy())

text_features = np.concatenate(embeddings)

In [None]:
len(text_features)

In [None]:
# дополнение датасета эмбедингами
train_df['text_embeddings'] = text_features.tolist()

## Векторизация изображений

Перейдём к векторизации изображений.

In [None]:
weights = ResNet18_Weights.DEFAULT
resnet = resnet18(weights=weights).to(device)

for param in resnet.parameters():
    param.requires_grad_(False)

# оставим только свёрточные слои
modules = list(resnet.children())[:-1]
resnet = nn.Sequential(*modules).to(device)

resnet.eval()



preprocess = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

train_images_list = train_df['image'].unique()

In [None]:
vektors = []


for image_name in tqdm(train_images_list):
    image = Image.open(os.path.join(TRAIN_IMAGES, image_name)).convert('RGB')
    image = preprocess(image)
    image = image.unsqueeze(0).to(device) 

    vektor = resnet(image).cpu().flatten().numpy()
    vektors.append(vektor)


In [None]:
images_vektors = np.array(vektors)
data = pd.DataFrame({'image': train_images_list, 'image_vector': images_vektors.tolist()})

train_df = train_df.merge(data, on='image', how='left')

## Объединение векторов

Подготовим данные для обучения: объединим векторы изображений и векторы текстов с целевой переменной.

In [None]:
train_df.head(2)

In [None]:
for i in range(2):
    print(f'''
    Размерность {i+1}-й пары:
    text_embeddings: {len(train_df.iloc[i][-2])}
    image_vector: {len(train_df.iloc[i][-1])}
    ''')

In [None]:
train_df['concatenated_vector'] = train_df.apply(lambda row: np.concatenate([row['text_embeddings'], row['image_vector']], axis=None), axis=1)
concatenated_vector = train_df.iloc[0, -1]

if isinstance(concatenated_vector, (np.ndarray, list)):
    print(f'Размерность объединенного вектора: {len(concatenated_vector)}')
else:
    print('Объединенный вектор не является массивом или списком.')

In [None]:
train_df.head(2)

## Обучение модели предсказания соответствия

Для обучения разделим датасет на тренировочную и тестовую выборки. Простое случайное разбиение не подходит: нужно исключить попадание изображения и в обучающую, и в тестовую выборки.


Для оценки качества буду использовать RMSE потому, что ее значения имеют ту же размерность, что и исходные данные, а это легче интерпретировать и более понятно.

In [None]:
def rmse_score(values, predict):
    return sqrt(mean_squared_error(values, predict))

rmse_scorer = make_scorer(rmse_score)

# функция для вывода результатов
def output_results(trial):
    print('Результаты подборы параметров:')
    print('  RMSE:', round(trial.value, 3))
    print('  Params: ')
    for key, value in trial.params.items():
        print("    {}: {}".format(key, value))


In [None]:
final_df = train_df.copy()
gss = GroupShuffleSplit(n_splits=1, train_size=.8, random_state=RS)
train_indices, test_indices = next(gss.split(X=final_df.drop(columns=['mean_exp']), y=final_df['mean_exp'],
                                             groups=final_df['image']))
train_df, valid_df = final_df.loc[train_indices], final_df.loc[test_indices]

X_train = train_df['concatenated_vector'].apply(pd.Series).values
y_train = train_df['mean_exp']

X_valid = valid_df['concatenated_vector'].apply(pd.Series).values
y_valid = valid_df['mean_exp']

### Baseline model

константная модель, которая предсказывает среднее значение

In [None]:
dummy_model = DummyRegressor(strategy="mean")

dummy_model.fit(X_train, y_train)

dummy_predictions = dummy_model.predict(X_valid)

rmse = rmse_score(y_valid, dummy_predictions)
print(f'RMSE(Dummy Model): {rmse}')

### LinearRegression

**Регрессия** со скалированием

In [None]:
final_df_sc = train_df.copy()
final_df_sc = final_df_sc.reset_index(drop=True)

scaler = MinMaxScaler()
final_df_sc['mean_exp'] = scaler.fit_transform(final_df_sc[['mean_exp']])


gss = GroupShuffleSplit(n_splits=1, train_size=.8, random_state=RS)
train_indices_sc, test_indices_sc = next(gss.split(X=final_df_sc.drop(columns=['mean_exp']), y=final_df_sc['mean_exp'],
                                                   groups=final_df_sc['image']))




train_df_sc, valid_df_sc = final_df_sc.loc[train_indices_sc], final_df_sc.loc[test_indices_sc]

X_train_sc = train_df_sc['concatenated_vector'].apply(pd.Series).values
y_train_sc = train_df_sc['mean_exp']

X_valid_sc = valid_df_sc['concatenated_vector'].apply(pd.Series).values
y_valid_sc = valid_df_sc['mean_exp']

In [None]:
model = LinearRegression()
model.fit(X_train_sc, y_train_sc)

# прогнозирование на тестовых данных
y_pred_sc = model.predict(X_valid_sc)

# оценка модели
print(f'RMSE: {rmse_score(y_valid_sc, y_pred_sc)}')

**Регрессия** без скаллера

In [None]:
final_df = train_df.copy()
final_df = final_df.reset_index(drop=True)
gss = GroupShuffleSplit(n_splits=1, train_size=.8, random_state=RS)
train_indices, test_indices = next(gss.split(X=final_df.drop(columns=['mean_exp']), y=final_df['mean_exp'],
                                             groups=final_df['image']))
train_df, valid_df = final_df.loc[train_indices], final_df.loc[test_indices]

X_train = train_df['concatenated_vector'].apply(pd.Series).values
y_train = train_df['mean_exp']

X_valid = valid_df['concatenated_vector'].apply(pd.Series).values
y_valid = valid_df['mean_exp']

In [None]:
model = LinearRegression()
model.fit(X_train, y_train)

# прогнозирование на тестовых данных
y_pred = model.predict(X_valid)

# оценка модели
print(f'RMSE: {rmse_score(y_valid, y_pred)}')

### Нейросеть

In [None]:

hidden_size_1 = 4096
hidden_size_2 = 2048
hidden_size_3 = 1024
hidden_size_4 = 512
output_size = 1

activation_1 = nn.ReLU()
activation_2 = nn.Tanh()
activation_3 = nn.ReLU()
activation_4 = nn.LeakyReLU()

drop_1 = 0.1
drop_2 = 0.0
drop_3 = 0.0
drop_4 = 0.0

learning_rate = 0.0001  
n_epochs = 1000 
batch_size = 128  

In [None]:
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
X_valid_tensor = torch.tensor(X_valid, dtype=torch.float32)
y_valid_np = np.array(y_valid, dtype=np.float32)
y_valid_tensor = torch.tensor(y_valid_np, dtype=torch.float32)
input_size = X_train_tensor.shape[1]

In [None]:
class Batch(Dataset):
    def __init__(self, data, labels):
        self.labels = labels
        self.data = data

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        label = self.labels[idx]
        data = self.data[idx]
        sample = {'data': data, 'mean_exp': label}
        return sample
    
dataset_train = Batch(X_train_tensor, y_train_tensor)
dataset_test = Batch(X_valid_tensor, y_valid_tensor)

class Baseline(nn.Module):
    def __init__(self, input_size, hidden_size_1, hidden_size_2, hidden_size_3, hidden_size_4, output_size,
                 drop_1, drop_2, drop_3, drop_4,
                 activation_1, activation_2, activation_3, activation_4):
        super(Baseline, self).__init__()

        self.fc1 = nn.Linear(input_size, hidden_size_1)
        self.act1 = activation_1
        self.drop1 = nn.Dropout(drop_1)

        self.fc2 = nn.Linear(hidden_size_1, hidden_size_2)
        self.act2 = activation_2
        self.drop2 = nn.Dropout(drop_2)

        self.fc3 = nn.Linear(hidden_size_2, hidden_size_3)
        self.act3 = activation_3
        self.drop3 = nn.Dropout(drop_3)

        self.fc4 = nn.Linear(hidden_size_3, hidden_size_4)
        self.act4 = activation_4
        self.drop4 = nn.Dropout(drop_4)

        self.fc5 = nn.Linear(hidden_size_4, output_size)

        self.init_weights()

    def forward(self, x):
        x = self.drop1(self.act1(self.fc1(x)))
        x = self.drop2(self.act2(self.fc2(x)))
        x = self.drop3(self.act3(self.fc3(x)))
        x = self.drop4(self.act4(self.fc4(x)))
        x = self.fc5(x)
        return x

    def init_weights(m):
        if isinstance(m, nn.Linear):
            torch.nn.init.kaiming_normal_(m.weight)
            m.bias.data.fill_(0.01)
            
class CustomEarlyStopping():
    def __init__(self, patience=5, min_delta=0):
        self.patience = patience #сколько эпох ждать
        self.min_delta = min_delta #разница функций потерь для активации
        self.counter = 0
        self.best_loss = None
        self.early_stop = False

    def __call__(self, val_loss):
        if self.best_loss is None:
            self.best_loss = val_loss
        elif self.best_loss - val_loss > self.min_delta:  
            self.best_loss = val_loss
            self.counter = 0
        elif self.best_loss - val_loss < self.min_delta: 
            self.counter += 1
            if self.counter >= self.patience:
                print(f'INFO: Ранняя остановка. Счетчик: {self.counter}/{self.patience}')
                self.early_stop = True 

In [None]:
def train(model, train_dataloader, test_dataloader, optimizer, loss_fn, n_epochs, patience, min_delta):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    result = {
        'rmse_train': [],
        'rmse_test': [],
        'best_epoch': None,
        'stopping_epoch': None,
        'best_model': None
    }

    early_stopping = CustomEarlyStopping(patience, min_delta)
    best_model_state = None  

    for epoch in range(n_epochs):
        # === Тренировка ===
        model.train()
        train_loss = 0.0

        for batch in train_dataloader:
            data = batch['data'].to(device)
            label_train = batch['mean_exp'].to(device)

            optimizer.zero_grad()
            predictions = model(data).flatten()
            loss_value = loss_fn(predictions, label_train)
            train_loss += loss_value.item()

            loss_value.backward()
            optimizer.step()

        rmse_train = sqrt(train_loss / len(train_dataloader))
        result['rmse_train'].append(rmse_train)

        # === Тестирование ===
        model.eval()
        test_loss = 0.0

        with torch.no_grad():
            for batch in test_dataloader:
                data = batch['data'].to(device)
                label_test = batch['mean_exp'].to(device)

                predictions = model(data).flatten()
                loss_value = loss_fn(predictions, label_test)
                test_loss += loss_value.item()

        rmse_test = sqrt(test_loss / len(test_dataloader))
        result['rmse_test'].append(rmse_test)

        # === Логирование каждые 10 эпох ===
        if epoch % 10 == 0 or epoch == n_epochs - 1:
            print(f"Epoch: {epoch} RMSE_train: {rmse_train:.4f} RMSE_test: {rmse_test:.4f}")

        # === Ранняя остановка ===
        early_stopping(rmse_test)
        if early_stopping.counter == 1: 
            best_model_state = model.state_dict()

        if early_stopping.early_stop:
            result['best_epoch'] = epoch - patience
            result['stopping_epoch'] = epoch
            print(f"Early stopping at epoch {epoch}. Best RMSE_test: {min(result['rmse_test']):.4f}")
            break

    # Сохраняем лучшую модель
    if best_model_state is not None:
        model.load_state_dict(best_model_state)
    result['best_model'] = model

    if best_model_state is not None:
        model.load_state_dict(best_model_state)  # Перезагружаем лучшее состояние
    result['best_model'] = model

    # Обновляем лучший регрессор
    best_regressor = model

    return result, best_regressor

In [None]:
model = Baseline(input_size, hidden_size_1, hidden_size_2, hidden_size_3, hidden_size_4, output_size,
                 drop_1, drop_2, drop_3, drop_4,
                 activation_1, activation_2, activation_3, activation_4)

loss = nn.MSELoss()

optimizer = optim.Adam(model.parameters(), lr=learning_rate)

train_dataloader = DataLoader(dataset_train, batch_size=batch_size, shuffle=True,
                              num_workers=0)
test_dataloader = DataLoader(dataset_test, batch_size=batch_size, num_workers=0)

In [None]:
result, best_regressor = train(model=model,
               train_dataloader=train_dataloader,
               test_dataloader=test_dataloader,
               optimizer=optimizer,
               loss_fn=loss,
               n_epochs=n_epochs,
               patience=20,
               min_delta=0)

In [None]:
plt.figure(figsize=(8, 4))
plt.plot(result['rmse_train'], color='green', label='Train RMSE')
plt.plot(result['rmse_test'], color='blue', label='Test RMSE')


plt.axvline(x=result['best_epoch'], color='y', linestyle='--', label='Best epoch')


plt.axvline(x=result['stopping_epoch'], color='red', linestyle='--', label='Stopping fit')

plt.title('Визуализация процесса обучения:')
plt.xlabel('Epochs')
plt.ylabel('RMSE')
plt.legend(loc='lower left')
plt.show()

## Тестирование модели

In [None]:
test_images_list = test_df['image'].unique()

test_vektors = []

for image_name in tqdm(test_images_list):
    image = Image.open(os.path.join(TEST_IMAGES, image_name)).convert('RGB')
    image = preprocess(image)
    image = Variable(image.unsqueeze(0)).to(device)

    vektor = resnet(image).cpu().flatten().numpy() # преобразование в одномерный массив
    test_vektors.append(vektor)

In [None]:
images_test_vektors = np.array(test_vektors)
test = pd.DataFrame({'image': test_images_list, 'image_vector': images_test_vektors.tolist()})

In [None]:
 # Замена нескольких пробелов одним и нежелательные символы
def clean_text(txt):
    txt = txt.lower()
    txt = re.sub(LETTERS, ' ', txt)  
    txt = re.sub(SPACES, r'\1', txt) 
    return txt

def stopwords_tokenize(x):
    tokens = word_tokenize(x)  # Токенизация текста
    tokenization = [word for word in tokens if word not in english_stopwords]
    return ' '.join(tokenization)

def clean_tokenize_text(text):
    text = clean_text(text)
    text = stopwords_tokenize(text)
    return text.split()

# Генерация текстового вектора
def get_text_embedding(input_text):
    tokenized = tokenizer.encode(input_text, max_length=512, truncation=True, add_special_tokens=True)
    padded = pad_sequence([torch.as_tensor(tokenized)], batch_first=True)
    attention_mask = padded > 0
    attention_mask = attention_mask.type(torch.LongTensor).to(device)

    with torch.no_grad():
        text_embedding = model_emb_txt(padded.to(device), attention_mask=attention_mask)[0][:, 0, :].cpu().numpy()
    return text_embedding[0].tolist()

# Функция тестирования
def imag_test(query_text):

    text_embedding = get_text_embedding(query_text)

    test['vector'] = test['image_vector'].apply(lambda x: text_embedding + x)
    test['vector'] = test['vector'].apply(lambda x: np.array(x, dtype=np.float32))

    test_vectors = np.stack(test['vector'].to_numpy())
    X_test_tensor = torch.tensor(test_vectors, dtype=torch.float32).to(device)

    # Предсказания модели
    with torch.no_grad():
        test['pred'] = best_regressor(X_test_tensor).detach().cpu().numpy()

    max_score = test['pred'].max()
    path = test[test['pred'] == max_score]['image'].values[0]
    return max_score, path

# Функция вывода изображения
def display_image_with_caption(query_text):
    text = clean_tokenize_text(query_text)

    # Проверка на запрещённые слова
    if any(i in text for i in child_dict):
        print(query_text)
        print('Изображение не доступно в данном регионе')
    else:
        max_score, image_path = imag_test(query_text)
        fig, ax = plt.subplots(figsize=(6, 6))
        image_path = os.path.join(TEST_IMAGES, image_path)
        img = Image.open(image_path)
        ax.imshow(img)
        ax.set_title(query_text, fontsize=12)
        ax.axis('off')
        plt.show()
        print(f'Мера соответствия изображения составляет: {max_score}')


In [None]:
# from nltk.corpus import stopwords
# nltk.download('stopwords')

english_stopwords = stopwords.words('english')

display_image_with_caption('Women and coctail')

In [None]:
display_image_with_caption('Dog and cat')

In [None]:
display_image_with_caption('a child on a swing')

## Выводы

Код работает, детей фильтрует, а вот качество страдает. Нужно подобрать модель которая лучше будет работать с изображениями.