In [39]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
from PIL import Image
import os
import numpy as np
import time
import cv2
import json
from datetime import datetime
from torch.utils.data import  DataLoader
from torch.utils.data import TensorDataset
from torch.utils.data import random_split

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [40]:
def image_to_patches(image, patch_size):
    # 将图像转换为PyTorch张量
    image = np.array(image)
    image = torch.tensor(image).permute(2, 0, 1)  # 转换为 (C, H, W)
    # 获取图像尺寸
    C, H, W = image.shape
    # 计算水平和垂直方向上的分块数量
    num_patches_horizontal = W // patch_size
    num_patches_vertical = H // patch_size
    # 分割图像
    patches = image.unfold(1, patch_size, patch_size).unfold(2, patch_size, patch_size)
    patches = patches.contiguous().view(C, -1, patch_size, patch_size)
    patches = patches.permute(1, 0, 2, 3)  # 转换为 (N, C, patch_size, patch_size)
    return patches

def flatten_patches(patches):
    # 展平每个块
    flat_patches = patches.reshape(patches.size(0), -1)
    return flat_patches

def calculate_number_of_patches(image_size, patch_size):
    # 图像尺寸：(高度, 宽度)
    height, width = image_size[0], image_size[1]
    # 计算水平和垂直方向上的分块数量
    num_patches_horizontal = width // patch_size
    num_patches_vertical = height // patch_size
    # 总的块数
    total_patches = num_patches_horizontal * num_patches_vertical
    return total_patches



patch_size = 32
image_count = 0
data_x, data_y = [], []

image_path = "../Attachment/Attachment 1/"
for path in os.listdir(image_path):
    image_count += 1
    image = Image.open(image_path + path)
    image = list(np.array(image))
    #patches = image_to_patches(image, patch_size)
    #flat_patches = flatten_patches(patches) # 展平分割后的块
    #print(np.array(image).shape) # (185, 270, 3)
    #print(patches.shape) # torch.Size([40, 3, 32, 32])
    #print(flat_patches.shape) # torch.Size([40, 3072])
    data_x.append(image)

annonation_path = "../Attachment/Attachment 1-Annotation/"
for path in os.listdir(annonation_path):
    try:
        num_of_apples = 0
        f = open(annonation_path + path)
        data = json.load(f)
        for i in data['shapes']:
            num_of_apples += 1
        data_y.append(num_of_apples)
    except:
        data_y.append(0)

data_x = np.array(data_x)
data_y = np.array(data_y)
print(data_x.shape)
print(data_y.shape)

# 计算块的数量
#number_of_patches = flat_patches.shape[0] * image_count
#print(number_of_patches)

(200, 185, 270, 3)
(200,)


In [41]:
class MultiHeadAttention(nn.Module):
    def __init__(self, embed_size, heads):
        super(MultiHeadAttention, self).__init__()
        self.embed_size = embed_size
        self.heads = heads
        self.head_dim = embed_size // heads

        assert (
            self.head_dim * heads == embed_size
        ), "Embedding size needs to be divisible by heads"

        self.values = nn.Linear(self.head_dim, self.head_dim, bias=False)
        self.keys = nn.Linear(self.head_dim, self.head_dim, bias=False)
        self.queries = nn.Linear(self.head_dim, self.head_dim, bias=False)
        self.fc_out = nn.Linear(heads * self.head_dim, embed_size)

    def forward(self, values, keys, query, mask):
        N = query.shape[0]
        value_len, key_len, query_len = values.shape[1], keys.shape[1], query.shape[1]

        # Split the embedding into self.heads different pieces
        values = values.reshape(N, value_len, self.heads, self.head_dim)
        keys = keys.reshape(N, key_len, self.heads, self.head_dim)
        queries = query.reshape(N, query_len, self.heads, self.head_dim)

        values = self.values(values)
        keys = self.keys(keys)
        queries = self.queries(queries)

        # Einsum does matrix multiplication for query*keys for each training example
        # with every other training example, don't be confused by einsum
        # it's just a way to do batch matrix multiplication
        energy = torch.einsum("nqhd,nkhd->nhqk", [queries, keys])
        if mask is not None:
            energy = energy.masked_fill(mask == 0, float("-1e20"))

        attention = torch.softmax(energy / (self.embed_size ** (1 / 2)), dim=3)

        out = torch.einsum("nhql,nlhd->nqhd", [attention, values]).reshape(
            N, query_len, self.heads * self.head_dim
        )

        out = self.fc_out(out)
        return out


In [42]:
class FeedForwardNetwork(nn.Module):
    def __init__(self, embed_size, ff_hidden_size):
        super(FeedForwardNetwork, self).__init__()
        self.fc1 = nn.Linear(embed_size, ff_hidden_size)
        self.fc2 = nn.Linear(ff_hidden_size, embed_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x


In [43]:
class EncoderLayer(nn.Module):
    def __init__(self, embed_size, heads, ff_hidden_size, dropout):
        super(EncoderLayer, self).__init__()
        self.attention = MultiHeadAttention(embed_size, heads)
        self.norm1 = nn.LayerNorm(embed_size)
        self.norm2 = nn.LayerNorm(embed_size)
        self.ff = FeedForwardNetwork(embed_size, ff_hidden_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        attention = self.attention(x, x, x, mask)
        x = self.norm1(attention + x)
        forward = self.ff(x)
        out = self.norm2(forward + x)
        return self.dropout(out)


In [63]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()

        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)
    
def create_no_mask(seq_length):
    return torch.zeros((seq_length, seq_length), dtype=torch.bool)

In [59]:
class CountingHead(nn.Module):
    def __init__(self, embed_size):
        super(CountingHead, self).__init__()
        self.fc = nn.Linear(embed_size, 1)

    def forward(self, x):
        x = x.mean(dim=1)
        out = self.fc(x)
        return out


In [75]:
class Transformer(nn.Module):
    def __init__(self, src_vocab_size, embed_size, num_layers, heads, ff_hidden_size, dropout, max_length):
        super(Transformer, self).__init__()
        self.encoder = nn.Embedding(src_vocab_size, embed_size)
        self.positional_encoding = nn.Parameter(torch.zeros(1, max_length, embed_size))
        self.layers = nn.ModuleList(
            [EncoderLayer(embed_size, heads, ff_hidden_size, dropout) for _ in range(num_layers)]
        )
        #self.fc_out = nn.Linear(embed_size, num_classes)
        self.dropout = nn.Dropout(dropout)
        self.embed_size = embed_size
         
        # 添加任务特定的头部
        self.counting_head = CountingHead(embed_size)

    def forward(self, x, mask):
        N, H, W, C = x.shape  # N: 批次大小, H: 高度, W: 宽度, C: 通道数
        #positions = torch.arange(0, seq_length).expand(N, seq_length).to(device)
        seq_length = H * W * C  # 序列长度
        if seq_length > self.positional_encoding.size(1):
            # 如果需要，扩展位置编码
            self.positional_encoding = nn.Parameter(torch.zeros(1, seq_length, self.embed_size)).to(x.device)

        out = self.dropout(self.encoder(x) + self.positional_encoding[:, :seq_length, :])

        for layer in self.layers:
            out = layer(out, mask)

        #out = self.fc_out(out)
        out = self.counting_head(out)
        return out


In [76]:
def train(model, data_loader_train, loss_function, optimizer, epoch, train_size):
    '''
    完成一个 epoch 的训练
    '''
    sum_true = 0
    sum_loss = 0.0
    max_valid_acc = 0
    model.train()
    index = 0
    total_data = len(data_loader_train)
    for data in data_loader_train:
        # 可视化训练过程
        index += 1
        print('Training batch {}/{}'.format(index,total_data),end='\r')
        # 选取对应批次数据的输入和标签
        batch_x, batch_y = data[0].to(device), data[1].to(device)

        # 模型预测
        mask = create_no_mask(train_size).to(device)
        y_hat = model(batch_x, mask)
        loss = loss_function(y_hat, batch_y)

        optimizer.zero_grad()   # 梯度清零
        loss.backward()         # 计算梯度
        optimizer.step()        # 更新参数

        y_hat = torch.tensor([torch.argmax(_) for _ in y_hat]).to(device)
        sum_true += torch.sum(y_hat == batch_y).float()
        sum_loss += loss.item()

    train_acc = sum_true / train_size
    train_loss = sum_loss / train_size

    ''' valid_acc = valid()
    if valid_acc > max_valid_acc:
        torch.save(model, "checkpoint.pt")'''

    print(f"epoch: {epoch}, train loss: {train_loss:.4f}, train accuracy: {train_acc*100:.2f}%, valid accuracy: {valid_acc*100:.2f}%, time: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) }")


In [77]:
vocab_size = 2000  #（如果按像素处理）或分割块的数量
embed_size = 512
num_layers = 4
heads = 8
ff_hidden_size = 2048
dropout = 0.1
max_length = vocab_size * 4
lr = 5e-5
epochs = 10
batch_size = 16
model = Transformer(vocab_size,embed_size,num_layers,heads,ff_hidden_size,dropout,max_length).to(device)
loss_function = nn.CrossEntropyLoss()                                       # 设置损失函数
optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=5e-4)  # 设置优化器

data_x = torch.tensor(np.array(data_x))       
data_y = torch.tensor(np.array(data_y)).long()    
dataset = TensorDataset(data_x, data_y)
dataset_size = data_x.size(dim = 0)
train_size = int(dataset_size * 0.8)
test_size = int(dataset_size * 0.2)
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])
data_loader_train = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
#data_loader_valid = DataLoader(dataset=dataset.valid, batch_size=batch_size, shuffle=False)
data_loader_test = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)
# 进行训练
for epoch in range(epochs):
    train(model, data_loader_train, loss_function, optimizer, epoch, train_size)

# 对测试集进行预测
#predict()

Training batch 1/10

RuntimeError: [enforce fail at alloc_cpu.cpp:80] data. DefaultCPUAllocator: not enough memory: you tried to allocate 4910284800 bytes.

In [None]:
data_x

tensor([[[[252, 251, 246],
          [253, 254, 248],
          [250, 249, 245],
          ...,
          [ 55,  69,  43],
          [ 61,  74,  44],
          [ 78,  91,  63]],

         [[252, 248, 245],
          [255, 255, 255],
          [255, 255, 251],
          ...,
          [ 42,  58,  29],
          [ 42,  57,  24],
          [ 54,  67,  37]],

         [[252, 248, 249],
          [255, 255, 253],
          [253, 249, 246],
          ...,
          [ 43,  56,  26],
          [ 41,  54,  26],
          [ 51,  64,  38]],

         ...,

         [[ 63,  64,  68],
          [ 53,  54,  59],
          [ 57,  55,  66],
          ...,
          [ 24,  38,  21],
          [ 24,  38,  21],
          [ 40,  52,  38]],

         [[ 65,  68,  73],
          [ 65,  66,  71],
          [ 70,  68,  79],
          ...,
          [ 25,  39,  22],
          [ 24,  38,  21],
          [ 35,  46,  30]],

         [[ 88,  91,  98],
          [ 80,  83,  92],
          [ 94,  94, 104],
         