# Классификация спутниковых снимков

Свойство | Значение
-|-
Источник данных | [Kaggle (planets-dataset)](https://www.kaggle.com/datasets/nikitarom/planets-dataset/data)
Характер данных | Спутниковые снимки Амазонки
Задача | Мульти-классовая классификация
Инструменты | Python, Pandas, PyTorch, Scikit-learn, ResNet50

## Содержание

- [Подготовка окружения](#подготовка-окружения)
- [EDA](#eda)
    - [Наборы данных](#наборы-данных)
    - [Анализ](#анализ)
- [Обучение](#обучение)
- [Тестирование](#тестирование)

---

## Подготовка окружения

Установка библиотек:

In [None]:
!pip install pandas numpy opencv-python matplotlib pathlib torch torchvision torchsummary scikit-learn

Импорт библиотек:

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path
from typing import Tuple, List, Union
from tqdm.notebook import tqdm
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.model_selection import train_test_split
from sklearn.metrics import fbeta_score
from torchvision import transforms as T
from torchvision import models
from torchsummary import summary
from torch.utils.data import DataLoader, Dataset
from torch.optim import Adam
from torch.optim.lr_scheduler import StepLR
import torch
import torch.nn as nn
import cv2 as cv
import os
import random

Фиксация случайных чисел:

In [None]:
SEED = 12345

random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

Определение текущего устройства и количества доступных ядер:

In [None]:
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f'Current device: {DEVICE}')

---

## EDA

Объявление пути до папки с набором данных:

In [None]:
PATH = Path('../dataset')

Выведение содержимого папки с набором данных:

In [None]:
for root, dirs, files in os.walk(str(PATH)):
    level = root.replace(str(PATH), '').count(os.sep)
    indent = ' ' * 4 * level
    subindent = ' ' * 4 * (level + 1)

    print(f'{indent}{os.path.basename(root)}/')

    counter = 0
    counter_max = 3
    counter_stop = False

    if len(files) > 0:
        print(f'{indent}[{len(files)} files]')

    for file in files:
        if file.lower().endswith('.jpg'):
            if counter < counter_max:
                print(f'{subindent}{file}')
                counter += 1
            elif not counter_stop:
                print(f'{subindent}...')
                counter_stop = True
        else:
            print(f'{subindent}{file}')

Объявление путей до наборов данных:

In [None]:
PATH_TRAIN = PATH / 'planet' / 'planet' / 'train-jpg'
PATH_TEST = PATH / 'planet' / 'planet' / 'test-jpg'
PATH_TEST_ADDITIONAL = PATH / 'test-jpg-additional' / 'test-jpg-additional'

### Наборы данных

Объявление функции, выводящей информацию о данных:

In [None]:
def print_data_info(data: pd.DataFrame):    
    display(data.head())

    print(f'Data shape: {data.shape}\n')

    for column in data.columns:
        print(column)
        indent = ' ' * 4

        print(f'{indent}Uniques: {len(data[column].unique())}')
        print(f'{indent}Nans: {data[column].isna().sum()}')

        if column == 'image_name':
            print(f"{indent}Variety of names: {data[column].str.split('_').apply(lambda x: x[0]).unique()}")

Загрузка тестового наборов данных:

In [None]:
data_test_full = pd.read_csv(PATH / 'planet' / 'planet' / 'sample_submission.csv')
print_data_info(data_test_full)

>**NB-1**
>
>Столбец `image_name` содержит два вида названий файлов: `test_*` и `file_*`. Поскольку в директории `test-jpg-additional` располагаются файлы с названиями вида `file_*`, необходимо проверить, соответствует ли: 
>* Количество строк вида `test_*` количеству файлов в директории `test-jpg`
>* Количество строк в `file_*` количеству файлов в директории `test-jpg-additional`
>
>**NB-2**
>
>Столбец `tags` содержит разделённые пробелами значения классов. Поскольку каждое изображение может одновременно относиться к нескольким категориям, необходимо преобразовать строковые значения к списку отдельных меток.

Сравнение количества строк с количеством файлов:

In [None]:
assert data_test_full[data_test_full.image_name.str.contains('test')].shape[0] == len(os.listdir(PATH_TEST))
assert data_test_full[data_test_full.image_name.str.contains('file')].shape[0] == len(os.listdir(PATH_TEST_ADDITIONAL))

>* Количество строк вида `test_*` соответствует количеству файлов в директории `test-jpg`
>* Количество строк в `file_*` соответствует количеству файлов в директории `test-jpg-additional`

Преобразование строковых значений к списку меток:

In [None]:
data_test_full['tags'] = data_test_full.tags.str.split()
data_test_full.head()

Разделение тестового набора данных по названиям файлов в разных папках:

In [None]:
data_test = data_test_full[data_test_full.image_name.str.contains('test')]
data_test_additional = data_test_full[data_test_full.image_name.str.contains('file')]

Загрузка обучающего набора данных:

In [None]:
data_train = pd.read_csv(PATH / 'planet' / 'planet' / 'train_classes.csv')
print_data_info(data_train)

Преобразование строковых значений к списку меток:

In [None]:
data_train['tags'] = data_train.tags.str.split()
data_train.head()

### Анализ

Построение графика распределения классов по обучающему набору данных:

In [None]:
tags_in_data_train_classes = (
    pd.Series([
        tag 
        for row in data_train.tags.values 
        for tag in row
    ])
    .value_counts()
    .reset_index()
    .sort_values(by='count', ascending=True)
)

tags_in_data_train_classes.columns = ['tags', 'count']

print(f'Total tags: {tags_in_data_train_classes.shape[0]}')

tags_in_data_train_classes.plot(
    x='tags',
    y='count',
    kind='barh',
    figsize=(7, 5),
    title='Tags distribution\n',
    xlabel='count',
    ylabel='tags',
    grid=True,
    legend=False
)
plt.show()

>**NB**
>
>Набор данных несбалансирован по классам.

Объявление списков классов и редких классов:

In [None]:
TAGS = tags_in_data_train_classes.tags.values
TAGS_RARE = tags_in_data_train_classes.query('count < 2000').tags.values

TAGS_RARE

Проверка наличия в тестовом наборе данных значений классов, которые отсутствуют в обучающем наборе данных:

In [None]:
for tag in data_test.tags.values[0]:
    if tag not in tags_in_data_train_classes.tags.values:
        print(f'Tag {tag} does not exist in train tags')

>Во всех наборах данных представлены одни и те же классы.

Объявление функции, выводящий образцы изображений по каждому из классов:

In [None]:
def display_images(labels: pd.Series, data: pd.DataFrame, path: Path, cols: int = 5, height: float = 0.7):
    images = []

    for tag in labels.values:
        data_with_tag = data[data.tags.apply(lambda x: tag in x)]

        if data_with_tag.shape[0] > 0:
            images.append((tag, data_with_tag))

    n_tags = len(images)

    rows = (n_tags + cols - 1) // cols

    fig, axes = plt.subplots(rows, cols, figsize=(10, n_tags * height))
    axes = axes.flatten()

    plt.suptitle('Images with tags\n', fontsize=14)

    for index, (tag, data_with_tag) in enumerate(images):        
        index_random = np.random.randint(data_with_tag.shape[0])
        image_file_name = data_with_tag.image_name.values[index_random]

        image = cv.imread(str(path / f'{image_file_name}.jpg'))
        image = cv.cvtColor(image, cv.COLOR_BGR2RGB)

        image_tags = data_with_tag.tags.values[index_random]
        
        axes[index].text(
            x=0.5, 
            y=1.15 + 0.09 * len(image_tags), 
            s=tag, 
            fontweight='bold',
            fontsize=10,
            ha='center', 
            transform=axes[index].transAxes
        )

        axes[index].text(
            x=0.5, 
            y=1.15, 
            s='\n'.join(image_tags),
            fontsize=8,
            ha='center', 
            transform=axes[index].transAxes
        )

        axes[index].text(
            x=0.5, 
            y=1.05, 
            s=image.shape,
            fontsize=8,
            ha='center', 
            transform=axes[index].transAxes
        )

        axes[index].imshow(image)
        axes[index].axis('off')

    for i in range(n_tags, len(axes)):
        axes[i].axis('off')

    plt.show()

Выведение изображений по каждому классу в тестовом наборе данных:

In [None]:
display_images(
    labels=tags_in_data_train_classes.tags, 
    data=data_test[
        data_test.image_name.str.contains('test')
    ], 
    path=PATH_TEST,
    height=0.8
)

Выведение изображений по каждому классу в обучающем наборе данных:

In [None]:
display_images(
    labels=tags_in_data_train_classes.tags, 
    data=data_train, 
    path=PATH_TRAIN
)

## Обучение

Разделение тренировочного набора данных на обучающую и валидационную выборки:

In [None]:
X_train, X_valid = train_test_split(data_train, test_size=0.2)

X_train = X_train.reset_index(drop=True)
X_valid = X_valid.reset_index(drop=True)

print(f'X train shape: {X_train.shape}')
print(f'X valid shape: {X_valid.shape}')

Объявление значений на основе датасета Imagenet:

In [None]:
IMAGENET = {
    'size': 224,
    'mean': [0.485, 0.456, 0.406],
    'std': [0.229, 0.224, 0.225]
}

Объявление класса, формирующего кастомный датасет:

In [None]:
class PlanetDataset(Dataset):

    def __init__(self, data: pd.DataFrame, labels: List[List[int]], path: Path, transform: T.Compose = None):
        super().__init__()
        self.data = data
        self.labels = labels
        self.path = path
        self.transform = transform
    

    def __len__(self):
        return len(self.data)


    def __getitem__(self, index: int) -> Tuple[np.ndarray, str]:
        path_image = f'{self.data.image_name[index]}.jpg'
        label = torch.tensor(self.labels[index]).float()

        image = cv.imread(str(self.path / path_image))
        image = cv.cvtColor(image, cv.COLOR_BGR2RGB)

        if self.transform:
            image = self.transform(image)
        
        return image, label
    

    def _untransform(self, image: Union[np.ndarray | torch.Tensor]):
        image_untensored = image.clone().permute(1, 2, 0).numpy()  # CHW -> HWC
        image_unnormalized = image_untensored * IMAGENET.get('std') + IMAGENET.get('mean')
        return np.clip(image_unnormalized, 0, 1)
    

    def display_image(self, index: int):
        image, label = self[index]
        label = f'{self.data.tags[index]}\n{label}'

        if self.transform:
            image = self._untransform(image)

        plt.figure(figsize=(4, 4))
        plt.title(f'{label}\n')
        plt.imshow(image)
        plt.axis('off')
        plt.show()


Объявление сущностей для обучения модели:

In [None]:
encoder = MultiLabelBinarizer()
ohe_tags_train = encoder.fit_transform(X_train.tags.values)
ohe_tags_valid = encoder.transform(X_valid.tags.values)

transform_train = T.Compose([
    T.ToPILImage(),
    T.Resize(IMAGENET.get('size')),
    T.RandomHorizontalFlip(),
    T.RandomVerticalFlip(),
    T.RandomRotation(degrees=45),
    T.ColorJitter(
        brightness=0.2, 
        contrast=0.2, 
        saturation=0.2, 
        hue=0.05
    ),
    T.ToTensor(),
    T.Normalize(
        mean=IMAGENET.get('mean'),
        std=IMAGENET.get('std')
    )
])

transform_valid = T.Compose([
    T.ToPILImage(),
    T.Resize(IMAGENET.get('size')),
    T.ToTensor(),
    T.Normalize(
        mean=IMAGENET.get('mean'),
        std=IMAGENET.get('std')
    )
])

dataset_train = PlanetDataset(
    data=X_train, 
    labels=ohe_tags_train, 
    path=PATH_TRAIN, 
    transform=transform_train
)

dataset_valid = PlanetDataset(
    data=X_valid, 
    labels=ohe_tags_valid, 
    path=PATH_TRAIN, 
    transform=transform_valid
)

BATCH_SIZE = 64

dataloader_train = DataLoader(
    dataset_train,
    batch_size=BATCH_SIZE,
    shuffle=True
)

dataloader_valid = DataLoader(
    dataset_valid,
    batch_size=BATCH_SIZE,
    shuffle=False
)

Выведение одного изображения из обучающего датасета:

In [None]:
dataset_train.display_image(1)

Компонент | Реализация | Описание
-|-|-
Тип задачи | | Мульти-классовая классификация
Модель | `torchvision.models.resnet50` | - Предобученная нейросеть<br>- С замороженными параметрами<br>- С переопределённым классификатором
Метрика | $ F_{\beta} = \frac{(1 + \beta^2)  \cdot precision \cdot recall}{(\beta^2 \cdot precision) + recall} $<br><br>`sklearn.metrics.fbeta_score` | Гармоническое среднее (с акцентом на Recall) между:<br>- Precision (точность) - доля правильно предсказанных меток $ \frac{TP}{TP + FP} $<br>- Recall (полнота) - доля предсказанных меток, которые действительно правильные $ \frac{TP}{TP + FN} $
Оптимизатор | `torch.optim.Adam` | - Адаптирует learning rate индивидуально для каждого параметра<br>- Используется только для обучаемых слоёв
Функция потерь | `torch.nn.BCEWithLogitsLoss` | Численно стабильна. Объединяет:<br>- `torch.nn.Sigmoid` - преобразовывает выходные значения в диапазон `[0, 1]`<br>- `torch.nn.BCELoss` - измеряет разницу между истинными вероятностями и предсказанными 
Планировщик learning rate | `torch.optim.lr_scheduler.StepLR` | Стабилизация обучения, не позволяющая модели переучиваться

Объявление модели:

In [None]:
model = models.resnet50(pretrained=True)

for param in model.parameters():
    param.requires_grad = False

model.avgpool = nn.AdaptiveAvgPool2d(output_size=(1, 1))
model.fc = nn.Sequential(
    nn.Flatten(),
    nn.Linear(2048, 512),
    nn.ReLU(inplace=True),
    nn.Dropout(0.6),
    nn.Linear(512, len(TAGS))
)
model.to(DEVICE)

for param in model.fc.parameters():
    param.requires_grad = True

LEARNING_RATE = 1e-4

optimizer = Adam(
    params=filter(lambda x: x.requires_grad, model.parameters()), 
    lr=LEARNING_RATE
)

lr_scheduler = StepLR(optimizer, step_size=3, gamma=0.1)

tags_count = ohe_tags_train.sum(axis=0)
pos_weight = torch.tensor(
    data=(len(ohe_tags_train) - tags_count) / (tags_count + 1e-6),
    dtype=torch.float32
).to(DEVICE)

loss_fn = nn.BCEWithLogitsLoss()

Выведение сводки слоёв модели:

In [None]:
summary(model, input_size=(3, IMAGENET.get('size'), IMAGENET.get('size')), device=DEVICE)

Объявление функции, вычисляющей метрику:

In [None]:
def f2_score(y_true: np.array, y_pred: np.array, threshold: float):
    y_pred_binary = (y_pred > threshold).astype(int)
    return fbeta_score(y_true, y_pred_binary, beta=2, average='samples')

Обучение модели:

In [None]:
loss_train, loss_valid = [], []
score_train, score_valid = [], []

loss_best, score_best = np.inf, 0.0
epoch_best, threshold_best = 0, 0.0
pred_best, y_best = None, None
model_best = None

EPOCHS = 3
THRESHOLDS = np.linspace(0.1, 0.5, 9)

for epoch in tqdm(range(EPOCHS), desc='Training'):
    model.train()

    loss_train_epoch, loss_valid_epoch = [], []
    pred_train_epoch, pred_valid_epoch = [], []
    y_train_epoch, y_valid_epoch = [], []

    for X, y in tqdm(dataloader_train, leave=False, desc=f'Epoch {epoch} TRAIN'):
        X, y = X.to(DEVICE), y.to(DEVICE)

        optimizer.zero_grad()
        pred_logits = model(X)
        loss = loss_fn(pred_logits, y.float())
        loss.backward()
        optimizer.step()

        with torch.no_grad():
            pred_probs = torch.sigmoid(pred_logits)
            loss_train_epoch.append(loss.item())
            pred_train_epoch.extend(pred_probs.cpu().numpy())
            y_train_epoch.extend(y.cpu().numpy())
    
    model.eval()
    
    with torch.no_grad():
        for X, y in tqdm(dataloader_valid, leave=False, desc=f'Epoch {epoch} VALID'):
            X, y = X.to(DEVICE), y.to(DEVICE)

            pred_logits = model(X)
            loss = loss_fn(pred_logits, y.float())

            pred_probs = torch.sigmoid(pred_logits)
            loss_valid_epoch.append(loss.item())
            pred_valid_epoch.extend(pred_probs.cpu().numpy())
            y_valid_epoch.extend(y.cpu().numpy())
    
    loss_train_avg = np.mean(loss_train_epoch)
    loss_valid_avg = np.mean(loss_valid_epoch)

    score_valid_epoch_thresholds = [
        f2_score(np.array(y_valid_epoch), np.array(pred_valid_epoch), threshold)
        for threshold in THRESHOLDS
    ]

    threshold_best_index = int(np.argmax(score_valid_epoch_thresholds))
    best_threshold = THRESHOLDS[threshold_best_index]
    score_valid_epoch = score_valid_epoch_thresholds[threshold_best_index]
    score_train_epoch = f2_score(np.array(y_train_epoch), np.array(pred_train_epoch), best_threshold)

    loss_train.append(loss_train_avg)
    loss_valid.append(loss_valid_avg)
    score_train.append(score_train_epoch)
    score_valid.append(score_valid_epoch)

    if score_valid_epoch > score_best:
        score_best = score_valid_epoch
        loss_best = loss_valid_epoch
        pred_best = np.array(pred_valid_epoch)
        y_best = np.array(y_valid_epoch)
        epoch_best = epoch
        model_best = model

    print(
        f'Epoch: {epoch}/{EPOCHS} | '\
        f'train loss: {round(loss_train_avg, 3)} | '\
        f'valid loss: {round(loss_valid_avg, 3)} | ' \
        f'train F2: {round(score_train_epoch, 3)} | ' \
        f'valid F2: {round(score_valid_epoch, 3)}'
    )

    lr_scheduler.step()

result = {
    'loss_train': loss_train,
    'loss_valid': loss_valid,
    'score_train': score_train,
    'score_valid': score_valid,
    'pred_best': pred_best,
    'y_best': y_best,
    'threshold_best': threshold_best,
    'epoch_best': epoch_best,
    'model_best': model_best
}

## Тестирование

Объявление сущностей для тестирования модели:

In [None]:
ohe_tags_test = np.zeros((data_test.shape[0], 17))

dataset_test = PlanetDataset(
    data=data_test,
    labels=ohe_tags_test,
    path=PATH_TEST,
    transform=transform_valid
)

dataloader_test = DataLoader(
    dataset=dataset_test,
    batch_size=BATCH_SIZE,
    shuffle=False
)

Тестирование модели:

In [None]:
image_test, pred_test, y_test = [], [], []

model.eval()
with torch.no_grad():
    for X, y in tqdm(dataloader_test, leave=False, desc=f'Testing'):
        X, y = X.to(DEVICE), y.to(DEVICE)

        pred_logits = model(X)
        pred_probs = torch.sigmoid(pred_logits)
        
        image_test.extend(X.cpu())
        pred_test.extend(pred_probs.cpu().numpy())
        y_test.extend(y.cpu().numpy())

Выведение изображений с предсказанными значениями:

In [None]:
fig, axes = plt.subplots(1, 5, figsize=(10, 4))
axes = axes.flatten()

plt.suptitle('Test results\n', fontsize=14)

for index in range(5):
    image = image_test[index].cpu().permute(1, 2, 0).numpy()
    image = image * IMAGENET.get('std') + IMAGENET.get('mean')
    image = np.clip(image, 0, 1)

    pred_labels = (pred_test[index] > 0.2).astype(int)
    pred_tags = [TAGS[i] for i, v in enumerate(pred_labels) if v == 1]

    axes[index].text(
        x=0.5, 
        y=1.08, 
        s='\n'.join(pred_tags),
        fontsize=8,
        ha='center', 
        transform=axes[index].transAxes
    )

    axes[index].imshow(image)
    axes[index].axis('off')

plt.show()

<div style="text-align: center; font-size: 20px; padding: 15px 0;">
    <a href="#классификация-спутниковых-снимков" style="text-decoration: none; color: #296eaa; border: 2px dashed #296eaa; opacity: 0.8; border-radius: 3px; padding: 10px 80px;">
        Наверх ↑
    </a>
</div>