In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import models, transforms
from torch.utils.data import Dataset
import torch.nn.functional as F
from PIL import Image
import os

# Side length in pixels that every input image is resized to (square);
# also passed to the model constructor as img_size.
center_size = 300
# Number of samples per mini-batch handed to the DataLoader.
batch_size = 32

class CustomDataset(Dataset):
    """Image / coordinate-label pairs read from two parallel folders.

    Files are paired by position after sorting both directory listings, so
    the i-th image (by name) is matched with the i-th label file (by name).
    Each label file is whitespace-separated text whose first two fields are
    parsed as the floats ``x`` and ``y``.

    Args:
        image_folder: Directory containing the image files.
        label_folder: Directory containing one label text file per image.
        transform: Optional callable applied to the loaded RGB PIL image.

    Raises:
        ValueError: If the two folders hold a different number of entries;
            positional pairing would otherwise mismatch samples silently.
    """

    def __init__(self, image_folder, label_folder, transform):
        self.image_folder = image_folder
        self.label_folder = label_folder
        self.transform = transform

        self.image_files = sorted(os.listdir(image_folder))
        self.label_files = sorted(os.listdir(label_folder))

        if len(self.image_files) != len(self.label_files):
            raise ValueError(
                f"image/label count mismatch: {len(self.image_files)} entries in "
                f"{image_folder!r} vs {len(self.label_files)} in {label_folder!r}"
            )

    def __len__(self):
        """Return the number of (image, label) pairs."""
        return len(self.image_files)

    def __getitem__(self, idx):
        """Return ``(image, target)`` for sample ``idx``.

        ``target`` is a float32 tensor ``[x, y]``.  ``image`` is the result of
        ``transform`` when one was given, otherwise the RGB PIL image.
        """
        img_path = os.path.join(self.image_folder, self.image_files[idx])
        label_path = os.path.join(self.label_folder, self.label_files[idx])

        # Image.open is lazy and keeps the file handle open.  convert() forces
        # the decode while the handle is still valid, and the with-block then
        # closes it.  Converting to RGB also guarantees three channels for
        # grayscale / RGBA / palette files.
        with Image.open(img_path) as raw:
            img = raw.convert("RGB")

        if self.transform:
            img = self.transform(img)

        with open(label_path, 'r') as label_file:
            fields = label_file.read().split()
        x = float(fields[0])
        y = float(fields[1])

        return img, torch.tensor([x, y], dtype=torch.float32)

class PatchEmbedding(nn.Module):
    """Split an image into non-overlapping patches and embed each one.

    A convolution whose kernel size and stride both equal ``patch_size``
    projects every ``patch_size x patch_size`` patch to an ``emb_dim`` vector.

    Args:
        img_size: Nominal input side length (stored, not enforced).
        patch_size: Side length of each square patch.
        in_chans: Number of input channels.
        emb_dim: Size of the embedding produced per patch.
    """

    def __init__(self, img_size:int, patch_size:int, in_chans:int=3, emb_dim:int=48):
        super().__init__()
        self.img_size = img_size
        self.patch_size = patch_size

        self.proj = nn.Conv2d(
            in_chans,
            emb_dim,
            kernel_size=patch_size,
            stride=patch_size
        )

    def forward(self, x):
        """Map ``(batch, in_chans, H, W)`` to ``(batch, n_patches, emb_dim)``.

        ``n_patches`` is ``(H // patch_size) * (W // patch_size)``.
        """
        # No torch.no_grad() here: the projection is a trainable layer, and
        # disabling autograd would cut every downstream gradient off from it.
        x = self.proj(x)          # (batch, emb_dim, H/p, W/p)
        x = x.flatten(2)          # (batch, emb_dim, n_patches)
        x = x.transpose(1, 2)     # (batch, n_patches, emb_dim)
        return x

class MultiHeadSelfAttention(nn.Module):
    """Multi-head scaled dot-product self-attention over a token sequence.

    Args:
        dim: Feature size of each input/output token.
        n_heads: Number of attention heads; ``dim`` must be divisible by it.
        qkv_bias: Whether the fused query/key/value projection has a bias.
        attn_p: Dropout probability applied to the attention weights.
        proj_p: Dropout probability applied after the output projection.
    """

    def __init__(self, dim:int, n_heads:int=8, qkv_bias:bool=True, attn_p:float=0.01, proj_p:float=0.01):
        super().__init__()
        self.n_heads = n_heads
        self.dim = dim
        self.head_dim = dim // n_heads
        self.scale = self.head_dim ** -0.5

        self.qkv = nn.Linear(dim, dim*3, bias=qkv_bias)
        self.attn_drop = nn.Dropout(attn_p)
        self.proj = nn.Linear(dim, dim)
        self.proj_drop = nn.Dropout(proj_p)

    def forward(self, x):
        """Apply self-attention.

        Args:
            x: Tensor of shape ``(batch, n_tokens, dim)``.

        Returns:
            Tensor of shape ``(batch, n_tokens, dim)``.

        Raises:
            ValueError: If the last dimension of ``x`` is not ``dim`` or if
                ``dim`` is not divisible by ``n_heads``.
        """
        n_batch, n_tokens, x_dim = x.shape

        if x_dim != self.dim:
            raise ValueError(f"expected last dimension {self.dim}, got {x_dim}")
        if self.dim != self.head_dim * self.n_heads:
            raise ValueError(
                f"dim ({self.dim}) must be divisible by n_heads ({self.n_heads})"
            )

        qkv = self.qkv(x)
        qkv = qkv.reshape(n_batch, n_tokens, 3, self.n_heads, self.head_dim)
        # -> (3, batch, heads, tokens, head_dim); every axis appears exactly once.
        qkv = qkv.permute(2, 0, 3, 1, 4)
        q, k, v = qkv[0], qkv[1], qkv[2]

        scores = (q @ k.transpose(-2, -1)) * self.scale   # (batch, heads, tokens, tokens)
        attn = scores.softmax(dim=-1)
        attn = self.attn_drop(attn)

        merged = (attn @ v).transpose(1, 2).flatten(2)    # (batch, tokens, dim)
        out = self.proj(merged)
        out = self.proj_drop(out)

        return out

class MultiHeadSelfAttentionLayer(nn.Module):
    """Multi-head scaled dot-product attention with optional masking.

    With only ``x`` given this is self-attention.  When ``key`` / ``value``
    are supplied, queries come from ``x`` and keys/values from those tensors,
    using the matching slices of the fused ``qkv`` projection.

    Args:
        dim: Feature size of each token.
        n_heads: Number of attention heads; ``dim`` must be divisible by it.
        qkv_bias: Whether the fused query/key/value projection has a bias.
        attn_p: Dropout probability applied to the attention weights.
        proj_p: Dropout probability applied after the output projection.
    """

    def __init__(self, dim:int, n_heads:int=8, qkv_bias:bool=True, attn_p:float=0.01, proj_p:float=0.01):
        super().__init__()
        self.n_heads = n_heads
        self.dim = dim
        self.head_dim = dim // n_heads
        self.scale = self.head_dim ** -0.5

        self.qkv = nn.Linear(dim, dim*3, bias=qkv_bias)
        self.attn_drop = nn.Dropout(attn_p)
        self.proj = nn.Linear(dim, dim)
        self.proj_drop = nn.Dropout(proj_p)

    def _split_heads(self, t):
        """Reshape ``(batch, tokens, dim)`` to ``(batch, heads, tokens, head_dim)``."""
        n_batch, n_tokens, _ = t.shape
        return t.reshape(n_batch, n_tokens, self.n_heads, self.head_dim).transpose(1, 2)

    def _project_qkv(self, query, key, value):
        """Return per-head q, k, v projected from the given source tensors."""
        if key is query and value is query:
            # Self-attention: one fused matmul, then split into q / k / v.
            fused = self.qkv(query)
            n_batch, n_tokens, _ = fused.shape
            fused = fused.reshape(n_batch, n_tokens, 3, self.n_heads, self.head_dim)
            fused = fused.permute(2, 0, 3, 1, 4)
            return fused[0], fused[1], fused[2]

        # Different sources: apply the q / k / v slices of the fused weights
        # separately.  The slice order matches the reshape used above.
        w_q, w_k, w_v = self.qkv.weight.chunk(3, dim=0)
        if self.qkv.bias is None:
            b_q = b_k = b_v = None
        else:
            b_q, b_k, b_v = self.qkv.bias.chunk(3, dim=0)
        linear = nn.functional.linear
        return (
            self._split_heads(linear(query, w_q, b_q)),
            self._split_heads(linear(key, w_k, b_k)),
            self._split_heads(linear(value, w_v, b_v)),
        )

    def forward(self, x, key=None, value=None, mask=None):
        """Apply attention.

        Args:
            x: Query source, shape ``(batch, n_query, dim)``.
            key: Optional key source ``(batch, n_key, dim)``; defaults to ``x``.
            value: Optional value source ``(batch, n_key, dim)``; defaults to
                ``key``.
            mask: Optional boolean mask where ``True`` marks positions that
                must NOT be attended to.  Accepted shapes:
                ``(n_query, n_key)``, ``(batch, n_query, n_key)`` or
                ``(batch, heads, n_query, n_key)``.  A row that is entirely
                ``True`` produces NaNs, as with any softmax over ``-inf``.

        Returns:
            ``(output, attn)``: ``output`` has shape ``(batch, n_query, dim)``
            and ``attn`` is the (post-dropout) attention weights of shape
            ``(batch, heads, n_query, n_key)``.

        Raises:
            ValueError: On a feature-size mismatch, an unsupported mask rank,
                or when ``dim`` is not divisible by ``n_heads``.
        """
        # Backward compatibility: the mask used to be the second positional
        # parameter, so a lone boolean tensor in that slot is a mask.
        if mask is None and value is None and key is not None and key.dtype == torch.bool:
            mask, key = key, None

        if key is None:
            key = x
        if value is None:
            value = key

        for label, tensor in (("x", x), ("key", key), ("value", value)):
            if tensor.shape[-1] != self.dim:
                raise ValueError(
                    f"{label}: expected last dimension {self.dim}, got {tensor.shape[-1]}"
                )
        if self.dim != self.head_dim * self.n_heads:
            raise ValueError(
                f"dim ({self.dim}) must be divisible by n_heads ({self.n_heads})"
            )

        q, k, v = self._project_qkv(x, key, value)
        scores = (q @ k.transpose(-2, -1)) * self.scale   # (batch, heads, n_query, n_key)

        if mask is not None:
            mask = mask.to(torch.bool)
            if mask.dim() == 2:
                mask = mask[None, None, :, :]
            elif mask.dim() == 3:
                mask = mask.unsqueeze(1)
            elif mask.dim() != 4:
                raise ValueError(f"mask must have 2, 3 or 4 dimensions, got {mask.dim()}")
            # Mask the logits BEFORE softmax so blocked positions get exactly
            # zero weight and each row still sums to one.
            scores = scores.masked_fill(mask, float('-inf'))

        attn = scores.softmax(dim=-1)
        attn = self.attn_drop(attn)

        merged = (attn @ v).transpose(1, 2).flatten(2)    # (batch, n_query, dim)
        out = self.proj(merged)
        out = self.proj_drop(out)

        return out, attn

class PositionwiseFeedforwardLayer(nn.Module):
    """Two-layer feed-forward network applied along the last dimension.

    The input is expanded from ``hidden_dim`` to ``pf_dim``, passed through
    GELU and dropout, then projected back to ``hidden_dim``.

    Args:
        hidden_dim: Feature size of the input and output.
        pf_dim: Feature size of the intermediate layer.
        dropout_ratio: Dropout probability applied after the activation.
    """

    def __init__(self, hidden_dim, pf_dim, dropout_ratio):
        super().__init__()

        # Sub-modules are registered in the same order as before so parameter
        # initialisation and state-dict layout stay the same.
        self.fc1 = nn.Linear(in_features=hidden_dim, out_features=pf_dim)
        self.fc2 = nn.Linear(in_features=pf_dim, out_features=hidden_dim)
        self.gelu = nn.GELU()
        self.dropout = nn.Dropout(p=dropout_ratio)

    def forward(self, x):
        """Return ``fc2(dropout(gelu(fc1(x))))``."""
        expanded = self.fc1(x)
        activated = self.gelu(expanded)
        regularised = self.dropout(activated)
        return self.fc2(regularised)

class EncoderLayer(nn.Module):
    """Post-norm transformer encoder layer: self-attention then feed-forward.

    Each sub-layer output goes through dropout, is added to its input and is
    then layer-normalised.

    Args:
        hidden_dim: Feature size of each token.
        n_heads: Number of attention heads.
        pf_dim: Hidden size of the position-wise feed-forward network.
        dropout_ratio: Dropout probability used for the attention weights,
            the feed-forward network and both residual branches.
    """

    def __init__(self, hidden_dim, n_heads, pf_dim, dropout_ratio):
        super().__init__()

        self.self_attn_layer_norm = nn.LayerNorm(hidden_dim)
        self.ff_layer_norm = nn.LayerNorm(hidden_dim)
        # Pass the dropout by keyword: the attention constructor's third
        # positional parameter is qkv_bias, not a dropout probability.
        self.self_attention = MultiHeadSelfAttentionLayer(hidden_dim, n_heads, attn_p=dropout_ratio)
        self.positionwise_feedforward = PositionwiseFeedforwardLayer(hidden_dim, pf_dim, dropout_ratio)
        self.dropout = nn.Dropout(dropout_ratio)

    def forward(self, src, src_mask):
        """Encode ``src``.

        Args:
            src: Token tensor; the last dimension must be ``hidden_dim``.
            src_mask: Mask forwarded to the self-attention module, or ``None``.

        Returns:
            Tensor with the same shape as ``src``.
        """
        # Self-attention sub-layer (the attention weights are discarded).
        _src, _ = self.self_attention(src, mask=src_mask)
        src = self.self_attn_layer_norm(src + self.dropout(_src))
        # Position-wise feed-forward sub-layer.
        _src = self.positionwise_feedforward(src)
        src = self.ff_layer_norm(src + self.dropout(_src))
        return src

class DecoderLayer(nn.Module):
    """Post-norm transformer decoder layer.

    Runs, in order: self-attention over the target, attention against the
    encoder output, and a position-wise feed-forward network.  Each sub-layer
    output goes through dropout, is added to its input and is then
    layer-normalised.

    Args:
        hidden_dim: Feature size of each token.
        n_heads: Number of attention heads.
        pf_dim: Hidden size of the position-wise feed-forward network.
        dropout_ratio: Dropout probability used for the attention weights,
            the feed-forward network and all residual branches.
    """

    def __init__(self, hidden_dim, n_heads, pf_dim, dropout_ratio):
        super().__init__()

        self.self_attn_layer_norm = nn.LayerNorm(hidden_dim)
        self.enc_attn_layer_norm = nn.LayerNorm(hidden_dim)
        self.ff_layer_norm = nn.LayerNorm(hidden_dim)
        # Pass the dropout by keyword: the attention constructor's third
        # positional parameter is qkv_bias, not a dropout probability.
        self.self_attention = MultiHeadSelfAttentionLayer(hidden_dim, n_heads, attn_p=dropout_ratio)
        self.encoder_attention = MultiHeadSelfAttentionLayer(hidden_dim, n_heads, attn_p=dropout_ratio)
        self.positionwise_feedforward = PositionwiseFeedforwardLayer(hidden_dim, pf_dim, dropout_ratio)
        self.dropout = nn.Dropout(dropout_ratio)

    def forward(self, trg, enc_src, trg_mask, mask):
        """Decode ``trg`` against ``enc_src``.

        Args:
            trg: Target token tensor; the last dimension must be ``hidden_dim``.
            enc_src: Encoder output used as key and value source.
            trg_mask: Mask forwarded to the target self-attention.
            mask: Mask forwarded to the encoder attention.

        Returns:
            ``(trg, attention)``: the decoded tensor and the attention weights
            returned by the encoder-attention module.
        """
        _trg, _ = self.self_attention(trg, mask=trg_mask)
        trg = self.self_attn_layer_norm(trg + self.dropout(_trg))
        # NOTE(review): query, key and value are passed positionally here;
        # confirm the attention module's forward accepts that calling form.
        _trg, attention = self.encoder_attention(trg, enc_src, enc_src, mask=mask)
        trg = self.enc_attn_layer_norm(trg + self.dropout(_trg))
        _trg = self.positionwise_feedforward(trg)
        trg = self.ff_layer_norm(trg + self.dropout(_trg))
        return trg, attention

class TransformerEncoder(nn.Module):
    """Attention + MLP stage followed by one ``EncoderLayer``.

    Args:
        input_dim: Feature size of each token.
        mlp_hidden_dim: Hidden size of the MLP and of the encoder layer's
            feed-forward network.
        num_head: Number of attention heads.
        dropout: Dropout probability handed to the ``EncoderLayer`` only.
    """

    def __init__(self, input_dim:int, mlp_hidden_dim:int, num_head:int=8, dropout:float=0.):
        super().__init__()
        # Sub-modules are created in the original order so parameter
        # initialisation and state-dict layout are unchanged.
        self.norm1 = nn.LayerNorm(input_dim)
        self.msa = MultiHeadSelfAttentionLayer(input_dim, n_heads=num_head)
        self.norm2 = nn.LayerNorm(input_dim)
        mlp_stages = [
            nn.Linear(input_dim, mlp_hidden_dim),
            nn.GELU(),
            nn.Linear(mlp_hidden_dim, input_dim),
            nn.GELU(),
        ]
        self.mlp = nn.Sequential(*mlp_stages)
        self.encoder_layer = EncoderLayer(
            input_dim, num_head, mlp_hidden_dim, dropout
        )

    def forward(self, x):
        """Encode ``x`` and return a tensor produced by the encoder layer."""
        normed = self.norm1(x)
        # NOTE(review): the attention output replaces x instead of being added
        # to it (no residual around this step) -- confirm this is intended.
        attended, _ = self.msa(normed)
        mixed = self.mlp(self.norm2(attended)) + attended
        return self.encoder_layer(mixed, src_mask=None)

class TransformerDecoder(nn.Module):
    """Decoder stage: self-attention, encoder attention, then an MLP.

    Every sub-layer output is added to its input and layer-normalised.

    Args:
        input_dim: Feature size of each token.
        mlp_hidden_dim: Hidden size of the MLP.
        num_head: Number of attention heads.
        dropout: Dropout probability handed to the ``DecoderLayer``.

    Note:
        ``decoder_layer`` is constructed (so its parameters are registered)
        but ``forward`` does not call it.
    """

    def __init__(self, input_dim:int, mlp_hidden_dim:int, num_head:int=8, dropout:float=0.):
        super().__init__()
        # Sub-modules are created in the original order so parameter
        # initialisation and state-dict layout are unchanged.
        self.norm1 = nn.LayerNorm(input_dim)
        self.norm2 = nn.LayerNorm(input_dim)
        self.norm3 = nn.LayerNorm(input_dim)

        self.self_attention = MultiHeadSelfAttentionLayer(input_dim, n_heads=num_head)
        self.encoder_attention = MultiHeadSelfAttentionLayer(input_dim, n_heads=num_head)

        mlp_stages = [
            nn.Linear(input_dim, mlp_hidden_dim),
            nn.GELU(),
            nn.Linear(mlp_hidden_dim, input_dim),
            nn.GELU(),
        ]
        self.mlp = nn.Sequential(*mlp_stages)
        self.decoder_layer = DecoderLayer(
            input_dim, num_head, mlp_hidden_dim, dropout
        )

    def forward(self, x, enc_src, trg_mask, mask):
        """Decode ``x`` against ``enc_src``.

        Returns:
            ``(trg, attn_dec, attn_enc)``: the decoded tensor, the attention
            returned by the self-attention step and the attention returned by
            the encoder-attention step.
        """
        # Masked self-attention with residual + norm.
        self_out, attn_dec = self.self_attention(x, mask=trg_mask)
        trg = self.norm1(x + self_out)

        # Attention against the encoder output with residual + norm.
        cross_out, attn_enc = self.encoder_attention(trg, enc_src, enc_src, mask=mask)
        trg = self.norm2(trg + cross_out)

        # MLP with residual + norm.
        trg = self.norm3(trg + self.mlp(trg))

        return trg, attn_dec, attn_enc

class TransformerModel(nn.Module):
    """Patch embedding followed by a ``TransformerDecoder``.

    Args:
        img_size: Nominal input image side length (given to the patch embedding).
        patch_size: Side length of each square patch.
        emb_dim: Embedding size per patch token.
        mlp_hidden_dim: Hidden size of the decoder MLP.
        num_heads: Number of attention heads.
        dropout: Dropout probability handed to the decoder.
    """

    def __init__(self, img_size, patch_size, emb_dim, mlp_hidden_dim, num_heads, dropout):
        super().__init__()
        self.patch_embedding = PatchEmbedding(img_size, patch_size, 3, emb_dim)
        self.transformer_decoder = TransformerDecoder(emb_dim, mlp_hidden_dim, num_heads, dropout)
        self.n_heads = num_heads

    def forward(self, x):
        """Embed ``x`` into patch tokens and run the decoder over them.

        Args:
            x: Image batch accepted by the patch embedding.

        Returns:
            Whatever ``TransformerDecoder.forward`` returns, i.e. the tuple
            ``(trg, attn_dec, attn_enc)``.
            NOTE(review): this is a tuple of token-level tensors, not a
            ``(batch, 2)`` coordinate tensor; the model defines no regression
            head, so callers that index the result like a tensor need one.
        """
        enc_out = self.patch_embedding(x)

        # The masks must be square in the number of tokens actually produced
        # by the patch embedding, not in the number of patches per image side.
        n_tokens = enc_out.shape[1]
        # True above the diagonal: token i may not attend to tokens j > i.
        trg_mask = torch.ones(n_tokens, n_tokens, device=x.device).triu(1).bool()
        mask = trg_mask.unsqueeze(0).expand(x.shape[0], -1, -1).contiguous()

        out = self.transformer_decoder(enc_out, enc_out, trg_mask, mask)
        return out

# Training-time preprocessing: convert to a tensor, resize to a
# center_size x center_size square, then normalise each of the three channels
# with mean 0.5 / std 0.5 (i.e. (v - 0.5) / 0.5).
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((center_size, center_size)),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]) 
])

# Dataset locations (presumably a mounted Google Drive in Colab -- confirm).
image_path = "/content/drive/MyDrive/images"
label_path = "/content/drive/MyDrive/labels"

# Prefer the GPU when one is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

train_dataset = CustomDataset(image_folder=image_path, label_folder=label_path, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Model, loss (mean squared error on the coordinates) and optimiser.
model = TransformerModel(img_size=center_size, patch_size=16, emb_dim=48, mlp_hidden_dim=100, num_heads=8, dropout=0.1)
model.to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop: 100 epochs over train_loader, minimising the sum of the MSE
# on the x coordinate and the MSE on the y coordinate.
for epoch in range(100):
    total_loss = 0.0
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)

        # NOTE(review): outputs is indexed like a 2-D tensor whose columns 0
        # and 1 are the predicted x and y -- confirm the model returns that.
        predicted_x, predicted_y = outputs[:, 0], outputs[:, 1]
        true_x, true_y = labels[:, 0], labels[:, 1]

        loss_x = criterion(predicted_x, true_x)
        loss_y = criterion(predicted_y, true_y)
        loss = loss_x + loss_y

        total_loss += loss.item()

        loss.backward()
        optimizer.step()

    # Per-epoch report: mean batch loss, plus the first sample of the last
    # batch processed in this epoch as a sanity check.
    average_loss = total_loss / len(train_loader)
    print(f'Epoch [{epoch+1}/{100}], Loss: {average_loss}')
    print(f'Predicted X: {predicted_x[0].item()}, Predicted Y: {predicted_y[0].item()}')
    print(f'True X: {true_x[0].item()}, True Y: {true_y[0].item()}')

# Persist the trained weights, then print a completion message.
torch.save(model.state_dict(), "./drive/MyDrive/model_transformer_ver2.pth")
print("학습 완료")

테스트(영상)

In [None]:
import cv2

center_size = 300
batch_size = 32

# Rebuild the model, load saved weights and switch to inference mode.
model = TransformerModel(img_size=center_size, patch_size=16, emb_dim=48, mlp_hidden_dim=100, num_heads=8, dropout=0.1)
model.load_state_dict(torch.load("model_transformer.pth", map_location = torch.device('cpu')))
# The weights are loaded onto the CPU; move the model to the device the input
# frames are sent to, otherwise the forward pass fails on a CUDA machine.
model.to(device)
model.eval()

# Built once instead of once per frame.
# NOTE(review): the training transform also applies Normalize(mean=0.5,
# std=0.5); this one does not.  Confirm which preprocessing the loaded
# checkpoint was trained with.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((center_size, center_size)),
])

readvideo = cv2.VideoCapture("center_screen_recording_alone.avi")
cv2.namedWindow('win')

try:
    while True:
        ret, frame = readvideo.read()

        # End of stream (or the video could not be read).
        if not ret:
            break

        # OpenCV delivers BGR frames; the model pipeline works on RGB images.
        pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        input_image = transform(pil_image).unsqueeze(0)

        with torch.no_grad():
            input_image = input_image.to(device)
            outputs = model(input_image)
            print(outputs)
            outputs = outputs.squeeze().tolist()

        # Draw the predicted point, scaling the two outputs by 300.
        cv2.circle(frame, (round(outputs[0] * 300), round(outputs[1] * 300)), 5, (0, 0, 255), 2)
        cv2.imshow('win', frame)

        key = cv2.waitKey(1)
        if key == ord('q'):
            break
finally:
    # Release resources even if a frame fails to process.
    readvideo.release()
    cv2.destroyAllWindows()