### Это эксперимент по обучению модели определять угол наклона изображения. Синтетически поворачиваем фотографию и обучаем модель предсказывать угол поворота.

## Модель чему-то научилась, но в целом результат не очень хороший. Возможно, нужно пересмотреть аугментацию, взять чистые данные или что-то изменить в модели.

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision
from torchvision.models import mobilenet_v2
from torchvision import transforms
from torchvision.transforms import InterpolationMode
import cv2
import random
import numpy as np
import matplotlib.pyplot as plt
import glob

# Choose the compute device: prefer Apple MPS, then CUDA, and fall back to CPU.
if torch.backends.mps.is_available():
    _device_name = "mps"
elif torch.cuda.is_available():
    _device_name = "cuda"
else:
    _device_name = "cpu"
device = torch.device(_device_name)

In [None]:
import torchvision.transforms as transforms
import torchvision.transforms.functional as TF
import random
import cv2
from torch.utils.data import Dataset, DataLoader
import torch
import glob
import torchvision.models as models
import torch.nn as nn
import torch.optim as optim


class RotationDataset(Dataset):
    """Images paired with a synthetic rotation angle used as a regression target.

    In training mode (``is_val=False``) every image is rotated by a random angle
    drawn uniformly from ``[-angle_max, angle_max]`` degrees, center-cropped to
    ``scale_factor * target_size`` pixels and resized to ``target_size``.
    In validation mode the image is only resized and the returned angle is 0.

    Each item is a tuple ``(image_tensor, angle_tensor, image_path)``.

    Args:
        image_paths: List of image file paths forming the full pool.
        epoch_size: Maximum number of images exposed per epoch; a fresh random
            subset is drawn by :meth:`shuffle_and_select`.
        target_size: Output size passed to the torchvision resize/crop calls.
        is_val: If true, skip the random rotation and report angle 0.
        angle_max: Largest absolute rotation (degrees) applied in training mode.
        scale_factor: How much larger than ``target_size`` the intermediate
            center crop is before it is resized down.
    """

    def __init__(
        self,
        image_paths,
        epoch_size=512,
        target_size=(224, 224),
        is_val=False,
        angle_max=60,
        scale_factor=1.3,
    ):
        self.all_image_paths = image_paths
        self.epoch_size = epoch_size
        self.image_paths = []
        self.target_size = target_size
        self.is_val = is_val
        self.angle_max = angle_max
        self.scale_factor = scale_factor
        self.shuffle_and_select()

        # Both modes share tensor conversion + normalization; validation images
        # additionally need a resize because they skip the crop/resize done
        # during augmentation.
        steps = [
            transforms.ToTensor(),
            transforms.Normalize(
                mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]
            ),
        ]
        if self.is_val:
            steps.insert(0, transforms.Resize(self.target_size))
        self.transform = transforms.Compose(steps)

    def __len__(self):
        """Return the number of images selected for the current epoch."""
        return len(self.image_paths)

    def shuffle_and_select(self):
        """Draw a new shuffled subset of at most ``epoch_size`` images.

        ``random.sample`` with ``k == len(pool)`` yields a shuffled copy, so the
        order changes between calls even when the pool is not larger than
        ``epoch_size``, and ``all_image_paths`` is never aliased or mutated.
        """
        subset_size = min(len(self.all_image_paths), self.epoch_size)
        self.image_paths = random.sample(self.all_image_paths, subset_size)

    def __getitem__(self, index):
        """Load one image and return ``(tensor, angle, path)``.

        Raises:
            OSError: If the image file cannot be read or decoded.
        """
        img_path = self.image_paths[index]
        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising;
            # fail here with the offending path rather than inside cvtColor.
            raise OSError(f"Could not read image: {img_path}")
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img_pil = TF.to_pil_image(img)

        if self.is_val:
            angle = 0
        else:
            angle = random.uniform(-self.angle_max, self.angle_max)
            # expand=True enlarges the canvas so the rotated image is not clipped.
            img_rotated = TF.rotate(img_pil, angle, expand=True)

            # NOTE: the crop size depends only on target_size and scale_factor,
            # not on the resolution of the source image.
            larger_crop_size = (
                int(self.target_size[0] * self.scale_factor),
                int(self.target_size[1] * self.scale_factor),
            )
            img_cropped = TF.center_crop(img_rotated, larger_crop_size)
            img_pil = TF.resize(img_cropped, self.target_size)

        img_transformed = self.transform(img_pil)

        return img_transformed, torch.tensor(angle, dtype=torch.float32), img_path


# Collect every JPEG from the real-estate image folder.
image_paths = glob.glob("../data/real_estate_images/*.jpg")

# Training-mode dataset (default arguments) wrapped in a shuffling loader.
dataset = RotationDataset(image_paths)
dataloader = DataLoader(dataset=dataset, batch_size=32, shuffle=True)

# Pretrained MobileNetV2 whose final classifier layer is replaced by a
# single-output linear layer, so the network regresses one scalar per image.
model = models.mobilenet_v2(weights=models.MobileNet_V2_Weights.DEFAULT)
model.classifier[1] = nn.Linear(in_features=model.last_channel, out_features=1)
model = model.to(device)

# Mean-squared-error objective optimized with Adam.
criterion = nn.MSELoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-4)

In [None]:
def imshow(ax, inp, title=None):
    """Draw a normalized CHW image tensor on the given matplotlib axes.

    The tensor is moved to channel-last layout, the mean/std normalization is
    undone, and values are clipped to ``[0, 1]`` before display.

    Args:
        ax: Axes-like object providing ``imshow``, ``set_title`` and ``axis``.
        inp: Image tensor laid out as (channels, height, width).
        title: Optional title shown above the image.
    """
    # detach()/cpu() make this work for tensors that track gradients or live
    # on an accelerator; a bare .numpy() raises for both.
    inp = inp.detach().cpu().numpy().transpose((1, 2, 0))
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    inp = std * inp + mean  # denormalize
    inp = np.clip(inp, 0, 1)
    ax.imshow(inp)
    if title is not None:
        ax.set_title(title)
    ax.axis("off")


# Build a training-mode dataset and pull one shuffled batch to inspect the
# augmentation visually.
dataset = RotationDataset(image_paths, is_val=False)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)
images, angles, paths = next(iter(dataloader))

# Show up to nine samples on a 3x3 grid, each titled with its rotation angle.
fig, axes = plt.subplots(nrows=3, ncols=3, figsize=(9, 9))
num_available = len(images)
for i, ax in enumerate(axes.flatten()):
    if i >= num_available:
        # Fewer images than grid cells: leave the cell blank.
        ax.axis("off")
        continue
    img, angle = images[i], angles[i]
    imshow(ax, img, title=f"Angle: {angle:.2f}")
plt.tight_layout()
plt.show()

## train model

In [None]:
num_epochs = 50
dataset = RotationDataset(epoch_size=4 * 1024, image_paths=image_paths)
for epoch in range(num_epochs):
    # Draw this epoch's subset of images before building the loader.
    dataset.shuffle_and_select()
    dataloader = DataLoader(dataset, batch_size=32, shuffle=False)

    model.train()
    squared_error_sum = 0.0
    abs_error_sum = 0.0
    num_samples = 0
    for images, angles, paths in dataloader:
        images = images.to(device)
        angles = angles.to(device)

        optimizer.zero_grad()
        # Squeeze only the feature dimension: a bare squeeze() would collapse a
        # final batch of size 1 to a 0-d tensor that no longer matches `angles`.
        outputs = model(images).squeeze(1)
        loss = criterion(outputs, angles)
        loss.backward()
        optimizer.step()

        # The criterion returns a per-batch mean; weight it by the batch size so
        # a shorter last batch does not skew the epoch average.
        batch_size = angles.size(0)
        squared_error_sum += loss.item() * batch_size
        abs_error_sum += (outputs.detach() - angles).abs().sum().item()
        num_samples += batch_size

    # No separate validation pass is needed here.

    epoch_loss = squared_error_sum / num_samples
    # True mean absolute error in degrees (sqrt(MSE) would be RMSE, not MAE).
    mae = abs_error_sum / num_samples

    print(
        f"Epoch [{epoch+1}/{num_epochs}] Loss: {epoch_loss:.4f}, MAE: {mae:.2f} degrees"
    )

## Detect defects in real images

In [None]:
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
from torchvision.transforms import ToPILImage

# Validation-mode dataset (no synthetic rotation) capped at 5000 images;
# the subset is re-drawn once more and the model is switched to eval mode.
dataset = RotationDataset(image_paths=image_paths, epoch_size=5000, is_val=True)
dataset.shuffle_and_select()
model = model.eval()


def denormalize(img_tensor):
    """Undo the per-channel mean/std normalization of a 3-channel image tensor.

    The statistics are shaped (3, 1, 1) so they broadcast over the trailing
    height and width dimensions of ``img_tensor``.
    """
    channel_mean = torch.tensor([0.485, 0.456, 0.406])[:, None, None]
    channel_std = torch.tensor([0.229, 0.224, 0.225])[:, None, None]
    rescaled = img_tensor * channel_std
    return rescaled + channel_mean


def _draw_tilt_cross(ax, angle_deg, img_width, img_height):
    """Overlay a thin red cross tilted by ``angle_deg`` degrees on ``ax``."""
    angle_rad = np.deg2rad(angle_deg)

    # Line spanning the full image width, passing through the vertical center.
    half_rise = img_width / 2 * np.tan(angle_rad)
    ax.plot(
        [0, img_width],
        [int(img_height / 2 - half_rise), int(img_height / 2 + half_rise)],
        "r-",
        linewidth=1,
    )

    # Perpendicular line through the image center completes the cross.
    perp_angle_rad = angle_rad + np.pi / 2
    center_x, center_y = img_width / 2, img_height / 2
    half_length = min(img_width, img_height) / 2
    dx = half_length * np.cos(perp_angle_rad)
    dy = half_length * np.sin(perp_angle_rad)
    ax.plot(
        [center_x - dx, center_x + dx],
        [center_y - dy, center_y + dy],
        "r-",
        linewidth=1,
    )


def visualize_predictions(dataset, model, num_images=25, threshold=20):
    """Show images whose predicted tilt magnitude exceeds ``threshold``.

    Iterates over ``dataset`` in index order, runs ``model`` on each image
    individually, and plots up to ``num_images`` images for which
    ``abs(predicted_angle) > threshold``. Each shown image is titled with the
    prediction and overlaid with a red cross tilted by the predicted angle.

    The model is used as-is; the caller should put it in eval mode beforehand.

    Args:
        dataset: Indexable dataset yielding ``(image_tensor, _, _)`` items.
        model: Network mapping a batch of images to one angle per image.
        num_images: Maximum number of images to display; also sizes the grid.
        threshold: Minimum absolute predicted angle (degrees) to display.

    Raises:
        ValueError: If ``num_images`` is less than 1.
    """
    if num_images < 1:
        raise ValueError("num_images must be at least 1")

    # Size the grid from num_images (5x5 for the default 25) instead of a fixed
    # 5x5 layout, which overflowed for larger values.
    n_cols = int(np.ceil(np.sqrt(num_images)))
    n_rows = int(np.ceil(num_images / n_cols))
    fig, axes = plt.subplots(
        n_rows, n_cols, figsize=(3 * n_cols, 3 * n_rows), squeeze=False
    )
    axes = axes.flatten()
    # Hide every axis up front so cells that stay unused render blank.
    for ax in axes:
        ax.axis("off")

    count = 0
    # Index-based access: the dataset is only required to support len/getitem.
    for i in range(len(dataset)):
        img, _, _ = dataset[i]

        with torch.no_grad():
            predicted_angle = model(img.unsqueeze(0).to(device)).item()

        if abs(predicted_angle) <= threshold:
            continue

        pixels = denormalize(img).permute(1, 2, 0).numpy()
        # Clip before the uint8 cast so rounding overshoot cannot wrap around.
        pixels = (np.clip(pixels, 0, 1) * 255).astype(np.uint8)
        img_height, img_width, _ = pixels.shape

        ax = axes[count]
        ax.imshow(pixels)
        ax.set_title(f"Predicted: {predicted_angle:.2f}°")
        ax.axis("off")
        _draw_tilt_cross(ax, predicted_angle, img_width, img_height)

        count += 1
        if count == num_images:
            break

    plt.tight_layout()
    plt.show()


# Display images whose predicted tilt magnitude exceeds 10 degrees.
visualize_predictions(dataset, model, threshold=10)

## simple benchmark

Но у нас модели будут оцениваться в скрипте через докер и прочее, чтобы быть ближе к реальным условиям.

In [None]:
# import time
# import torch
# from torch.utils.data import DataLoader

# dataset = RotationDataset(image_paths, epoch_size=10000, is_val=False)


# def benchmark_model(model, dataset, num_batches=10, batch_size=32):
#     dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False)
#     model.eval()
#     device = next(model.parameters()).device  # Get the device model is on

#     times = []

#     with torch.no_grad():
#         for i, (images, labels, paths) in enumerate(dataloader):
#             if i >= num_batches:
#                 break
#             start_time = time.time()
#             images = images.to(device)
#             outputs = model(images)
#             predictions = torch.sigmoid(outputs)
#             end_time = time.time()

#             times.append(end_time - start_time)

#     # Calculate the average time per batch and images per second
#     avg_time_per_batch = sum(times) / len(times)
#     images_per_second = batch_size / avg_time_per_batch

#     print(f"Average time per batch: {avg_time_per_batch:.3f} seconds")
#     print(f"Images processed per second: {images_per_second:.2f}")

#     return images_per_second


# # Example usage
# images_per_second = benchmark_model(model, dataset, num_batches=10, batch_size=32)

In [None]:
# model_save_path = "mobilenet_v2_tilting.pth"

# torch.save(model.state_dict(), model_save_path)

In [None]:
# model_loaded = models.mobilenet_v2(weights=models.MobileNet_V2_Weights.DEFAULT)
# model_loaded.classifier[1] = nn.Linear(model_loaded.last_channel, 1)
# model_loaded.to(device)

# model_weights_path = 'mobilenet_v2_tilting.pth'
# model_loaded.load_state_dict(torch.load(model_weights_path))
# model_loaded.eval()  # Set the model to evaluation mode