In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset

In [None]:
# ----------------------------------------------------------------
#  Допустим, у нас уже есть класс Dataset, который отдаёт:
#    - image (тензор [C, H, W])
#    - tokens (тензор [T])
# ----------------------------------------------------------------
class FormulaDataset(Dataset):
    def __init__(self, images, token_sequences, transform=None):
        """
        images: список или массив (B, H, W, C) либо пути к изображениям
        token_sequences: список тензоров или списков индексов (B, T)
        transform: torchvision transforms для картинок, если нужно
        """
        self.images = images
        self.token_sequences = token_sequences
        self.transform = transform
    
    def __len__(self):
        return len(self.images)
    
    def __getitem__(self, idx):
        image = self.images[idx]  # здесь может быть путь к файлу, нужно загрузить + применить transform
        tokens = self.token_sequences[idx]
        
        if self.transform:
            image = self.transform(image)  # например, Resize, ToTensor, нормализация
        
        # image: тензор [C, H, W]
        # tokens: тензор [T]
        return image, tokens

# ----------------------------------------------------------------
#  Коллатер (необязательно), если нужно паддить последовательности
# ----------------------------------------------------------------
def collate_fn(batch):
    """
    batch = [(image1, tokens1), (image2, tokens2), ...]
    Надо привести к одинаковой длине последовательности токенов, 
    если они разной длины, плюс сформировать batch тензоров для картинок.
    """
    images = []
    tokens_list = []
    lengths = []
    
    for (img, tok) in batch:
        images.append(img)
        tokens_list.append(tok)
        lengths.append(tok.size(0))
    
    # Допустим, хотим паддить по максимальной длине в батче
    max_len = max(lengths)
    
    # Паддим последовательности нулями (или <PAD>-токеном, если у вас в словаре есть такой ID)
    padded_tokens = []
    for tok in tokens_list:
        pad_size = max_len - tok.size(0)
        if pad_size > 0:
            # дополним нулями в конце
            pad = torch.zeros(pad_size, dtype=tok.dtype)
            # или tok.new_full((pad_size,), fill_value=PAD_ID)
            tok = torch.cat([tok, pad], dim=0)
        padded_tokens.append(tok.unsqueeze(0))
    
    padded_tokens = torch.cat(padded_tokens, dim=0)  # [B, max_len]
    
    # Склеим изображения в один тензор [B, C, H, W]
    images = torch.stack(images, dim=0)
    
    return images, padded_tokens, lengths

# ----------------------------------------------------------------
#  Модель (упрощённый вариант, можно взять тот, что выше)
# ----------------------------------------------------------------
class CNNEncoder(nn.Module):
    def __init__(self, input_channels=1, feature_dim=256):
        super(CNNEncoder, self).__init__()
        self.conv_layers = nn.Sequential(
            nn.Conv2d(input_channels, 64, kernel_size=3, padding=1), nn.ReLU(),
            nn.MaxPool2d(2),
            
            nn.Conv2d(64, 128, kernel_size=3, padding=1), nn.ReLU(),
            nn.MaxPool2d(2),
            
            nn.Conv2d(128, feature_dim, kernel_size=3, padding=1), nn.ReLU(),
        )
    
    def forward(self, images):
        # images: [B, C, H, W]
        features = self.conv_layers(images)  # [B, feature_dim, H/4, W/4]
        # Просто возьмём global average pooling как пример
        # (или можно оставить spatial map, чтобы потом использовать attention)
        pooled = features.mean(dim=[2, 3])  # [B, feature_dim]
        return pooled


class RNNDecoder(nn.Module):
    def __init__(self, vocab_size, embed_dim=256, hidden_dim=256):
        super(RNNDecoder, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.lstm = nn.LSTM(embed_dim + hidden_dim, hidden_dim, batch_first=True)
        
        self.init_h = nn.Linear(hidden_dim, hidden_dim)
        self.init_c = nn.Linear(hidden_dim, hidden_dim)
        
        self.fc_out = nn.Linear(hidden_dim, vocab_size)
    
    def forward(self, encoder_out, tokens):
        """
        encoder_out: [B, hidden_dim] – вектор контекста от CNN
        tokens: [B, T] – входные токены (teacher forcing)
        """
        B, T = tokens.shape
        
        # начальные h, c
        h0 = self.init_h(encoder_out).unsqueeze(0)  # [1, B, hidden_dim]
        c0 = self.init_c(encoder_out).unsqueeze(0)  # [1, B, hidden_dim]
        
        # эмбеддинги токенов [B, T, embed_dim]
        emb = self.embedding(tokens)
        
        # на каждом шаге конкатенируем (emb[t], encoder_out) по фичам
        # для этого "растянем" encoder_out по длине T
        context_expanded = encoder_out.unsqueeze(1).expand(-1, T, -1)  # [B, T, hidden_dim]
        lstm_input = torch.cat([emb, context_expanded], dim=2)  # [B, T, embed_dim+hidden_dim]
        
        outputs, (hn, cn) = self.lstm(lstm_input, (h0, c0))  # [B, T, hidden_dim]
        logits = self.fc_out(outputs)  # [B, T, vocab_size]
        
        return logits


class Image2LatexModel(nn.Module):
    def __init__(self, vocab_size, 
                 cnn_input_channels=1, cnn_feature_dim=256,
                 embed_dim=256, hidden_dim=256):
        super().__init__()
        self.encoder = CNNEncoder(cnn_input_channels, cnn_feature_dim)
        self.decoder = RNNDecoder(vocab_size, embed_dim, hidden_dim)
    
    def forward(self, images, tokens):
        encoder_out = self.encoder(images)     # [B, feature_dim]
        logits = self.decoder(encoder_out, tokens)  # [B, T, vocab_size]
        return logits

# ----------------------------------------------------------------
#  Пример обучения
# ----------------------------------------------------------------
def train_one_epoch(model, dataloader, optimizer, criterion, device):
    model.train()
    total_loss = 0.0
    for images, tokens, lengths in dataloader:
        images = images.to(device)
        tokens = tokens.to(device)
        
        # Forward
        logits = model(images, tokens)  # [B, T, vocab_size]
        
        # Нам надо сравнить logits с "истинными" токенами. 
        # Предположим, что в tokens как раз записаны "таргеты".
        # Но учтите, что иногда токен на входе — это shift-на-1 (без последнего),
        # а таргет — сам реальный. Для упрощённого примера используем один и тот же.
        
        # logits: [B, T, vocab_size]
        # tokens: [B, T]
        
        # Перегоняем всё в 2D для CrossEntropyLoss:
        B, T, V = logits.shape
        logits_2d = logits.view(B*T, V)       # [B*T, V]
        targets_2d = tokens.view(B*T)         # [B*T]
        
        loss = criterion(logits_2d, targets_2d)
        
        # Обратный проход
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
    
    return total_loss / len(dataloader)

def validate_one_epoch(model, dataloader, criterion, device):
    model.eval()
    total_loss = 0.0
    with torch.no_grad():
        for images, tokens, lengths in dataloader:
            images = images.to(device)
            tokens = tokens.to(device)
            
            logits = model(images, tokens)
            B, T, V = logits.shape
            logits_2d = logits.view(B*T, V)
            targets_2d = tokens.view(B*T)
            
            loss = criterion(logits_2d, targets_2d)
            total_loss += loss.item()
    return total_loss / len(dataloader)

In [None]:
if __name__ == "__main__":
    # Параметры
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    vocab_size = 100  # допустим, у нас словарь из 100 токенов
    batch_size = 8
    num_epochs = 5
    
    # Сформируем фиктивный датасет (для примера)
    # На практике вы используете свои изображения + токены
    train_images = [torch.randn(1, 64, 64) for _ in range(100)]  # 1-канальные изображения
    train_tokens = [torch.randint(0, vocab_size, (10,)) for _ in range(100)]
    
    val_images = [torch.randn(1, 64, 64) for _ in range(20)]
    val_tokens = [torch.randint(0, vocab_size, (10,)) for _ in range(20)]
    
    train_dataset = FormulaDataset(train_images, train_tokens)
    val_dataset = FormulaDataset(val_images, val_tokens)
    
    # DataLoader
    train_loader = DataLoader(train_dataset, batch_size=batch_size,
                              shuffle=True, collate_fn=collate_fn)
    val_loader = DataLoader(val_dataset, batch_size=batch_size,
                            shuffle=False, collate_fn=collate_fn)
    
    # Модель
    model = Image2LatexModel(vocab_size=vocab_size, 
                             cnn_input_channels=1,
                             cnn_feature_dim=256,
                             embed_dim=256,
                             hidden_dim=256)
    model = model.to(device)
    
    # Оптимизатор и функция потерь
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    criterion = nn.CrossEntropyLoss(ignore_index=0)  # допустим, 0 = PAD токен
    
    for epoch in range(num_epochs):
        train_loss = train_one_epoch(model, train_loader, optimizer, criterion, device)
        val_loss = validate_one_epoch(model, val_loader, criterion, device)
        
        print(f"Epoch {epoch+1}/{num_epochs}, "
              f"Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")

