In [62]:
import zipfile
import os
import torch
from torch import nn, optim
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image

### Домашнее задание 1, часть 2

Классификация изображений (пролив/не пролив)

In [28]:
if 'dataset' not in os.listdir():
    with zipfile.ZipFile('dataset.zip') as zipfile:
        zipfile.extractall()

#### Загрузка данных для обучения

In [29]:
class ImageDataset(Dataset):
    def __init__(self, img_dir, transform=None, target_size=(28, 28)):
        self.img_dir = img_dir
        self.transform = transform
        self.target_size = target_size
        self.image_files = [f for f in os.listdir(img_dir) if f.endswith(('jpg',))]
    
    def __len__(self):
        return len(self.image_files)
    
    def __getitem__(self, idx):
        img_path = os.path.join(self.img_dir, self.image_files[idx])
        image = Image.open(img_path)
        image = transforms.functional.resize(image, self.target_size)
        
        if self.transform:
            image = self.transform(image)
            
        return image

In [30]:
tensor_transform = transforms.ToTensor()
train_dataset = ImageDataset(
    img_dir='dataset/train',
    transform=tensor_transform,
    target_size=(28, 28)
)

test_positive_dataset = ImageDataset(
    img_dir='dataset/proliv',
    transform=tensor_transform,
    target_size=(28, 28)
)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True)
test_positive_loader = torch.utils.data.DataLoader(test_positive_dataset, batch_size=32, shuffle=False)

#### Класс модели

In [31]:
class AutoEncoder(nn.Module):
    def __init__(self):
        super(AutoEncoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(28 * 28 * 3, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 36),
            nn.ReLU(),
            nn.Linear(36, 18),
            nn.ReLU(),
            nn.Linear(18, 9)
        )
        self.decoder = nn.Sequential(
            nn.Linear(9, 18),
            nn.ReLU(),
            nn.Linear(18, 36),
            nn.ReLU(),
            nn.Linear(36, 64),
            nn.ReLU(),
            nn.Linear(64, 128),
            nn.ReLU(),
            nn.Linear(128, 28 * 28 * 3),
            nn.Sigmoid()
        )

    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded


#### Обучение

In [40]:
def train_autoencoder(autoencoder, train_loader, num_epochs):
    optimizer = torch.optim.Adam(autoencoder.parameters())
    criterion = nn.MSELoss()
    
    for epoch in range(num_epochs):
        for data in train_loader:
            data = data.view(data.size(0), -1)
            
            outputs = autoencoder(data)
            loss = criterion(outputs, data)
            
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

In [None]:
autoencoder = AutoEncoder()
train_autoencoder(autoencoder, train_loader, num_epochs=20)

Epoch [1/10], Loss: 0.0017
Epoch [2/10], Loss: 0.0011
Epoch [3/10], Loss: 0.0009
Epoch [4/10], Loss: 0.0012
Epoch [5/10], Loss: 0.0009
Epoch [6/10], Loss: 0.0009
Epoch [7/10], Loss: 0.0008
Epoch [8/10], Loss: 0.0010
Epoch [9/10], Loss: 0.0011
Epoch [10/10], Loss: 0.0012


#### Оценка на изображениях с проливами

In [58]:
def evaluate_threshold(autoencoder, train_loader, test_loader):

    autoencoder.eval()
    criterion = torch.nn.MSELoss(reduction='none')
    
    train_losses = []
    with torch.no_grad():
        for data in train_loader:
            data = data.view(data.size(0), -1)
            outputs = autoencoder(data)
            loss = criterion(outputs, data)
            train_losses.extend(loss.mean(dim=1).tolist())

    test_losses = []
    with torch.no_grad():
        for data in test_loader:
            data = data.view(data.size(0), -1)
            outputs = autoencoder(data)
            loss = criterion(outputs, data)
            test_losses.extend(loss.mean(dim=1).tolist())
    
    threshold = np.mean(train_losses) + 3 * np.std(train_losses)

    true_positives = sum(l > threshold for l in test_losses)
    false_negatives = len(test_losses) - true_positives
    false_positives = sum(l > threshold for l in train_losses)
    
    print(f"Suggested Threshold: {threshold:.4f}")
    print(f'False negatives: {false_negatives}/{len(test_losses)}')
    print(f"Positive samples detected as anomalies: {true_positives}/{len(test_losses)}")
    print(f"Negative samples detected as anomalies: {false_positives}/{len(train_losses)}")
    
    return threshold, train_losses, test_losses

In [59]:
threshold, train_losses, test_losses = evaluate_threshold(
    autoencoder, 
    train_loader,
    test_positive_loader
)

Suggested Threshold: 0.0030
False negatives: 0/154
Positive samples detected as anomalies: 154/154
Negative samples detected as anomalies: 108/10000


#### Оценка на тестовом датасете с отметками

In [60]:
class LabeledImageDataset(Dataset):
    def __init__(self, img_dir, label_dict, transform=None, target_size=(28, 28)):
        self.img_dir = img_dir
        self.transform = transform
        self.target_size = target_size
        self.label_dict = label_dict
        self.image_files = [f for f in os.listdir(img_dir) if f.endswith(('jpg', 'png')) and f in label_dict]
    
    def __len__(self):
        return len(self.image_files)
    
    def __getitem__(self, idx):
        img_path = os.path.join(self.img_dir, self.image_files[idx])
        image = Image.open(img_path)
        image = transforms.functional.resize(image, self.target_size)
        
        if self.transform:
            image = self.transform(image)
            
        label = self.label_dict[self.image_files[idx]]
        return image, label

In [61]:
def load_labels(label_file):
    labels = {}
    with open(label_file, 'r') as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) == 2:
                filename, label = parts
                labels[filename] = int(label)
    return labels

In [65]:
def evaluate_model(autoencoder, test_img_dir, label_file, threshold):

    autoencoder.eval()
    transform = transforms.ToTensor()

    label_dict = load_labels(label_file)
    
    test_dataset = LabeledImageDataset(
        img_dir=test_img_dir,
        label_dict=label_dict,
        transform=transform,
        target_size=(28, 28)
    )
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
    
    criterion = torch.nn.MSELoss(reduction='none')
    
    true_positives = 0
    true_negatives = 0
    false_positives = 0
    false_negatives = 0
    
    with torch.no_grad():
        for data, labels in test_loader:
            data = data.view(data.size(0), -1)
            outputs = autoencoder(data)
            loss = criterion(outputs, data).mean(dim=1)
            
            predictions = (loss > threshold).cpu().numpy()
            
            for pred, label in zip(predictions, labels):
                if label == 1:
                    if pred:
                        true_positives += 1
                    else:
                        false_negatives += 1
                else:
                    if not pred:
                        true_negatives += 1
                    else:
                        false_positives += 1
    
    total_positives = true_positives + false_negatives
    total_negatives = true_negatives + false_positives
    
    tpr = true_positives / total_positives if total_positives > 0 else 0
    tnr = true_negatives / total_negatives if total_negatives > 0 else 0
    
    print(f"Threshold: {threshold:.4f}")
    print(f"True Positives (TP): {true_positives}")
    print(f"False Negatives (FN): {false_negatives}")
    print(f"True Negatives (TN): {true_negatives}")
    print(f"False Positives (FP): {false_positives}")
    print(f"True Positive Rate: {tpr:.4f}")
    print(f"True Negative Rate: {tnr:.4f}")
    
    return {
        'TP': true_positives,
        'FN': false_negatives,
        'TN': true_negatives,
        'FP': false_positives,
        'TPR': tpr,
        'TNR': tnr,
    }

In [66]:
results = evaluate_model(autoencoder, "dataset/test/imgs", "dataset/test/test_annotation.txt", threshold)

Threshold: 0.0030
True Positives (TP): 100
False Negatives (FN): 29
True Negatives (TN): 3067
False Positives (FP): 598
True Positive Rate: 0.7752
True Negative Rate: 0.8368
