In [2]:
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader

class InfixToPostfixDataset(Dataset):
    """
    PyTorch Dataset of unique (infix, postfix) arithmetic expression pairs.

    Expressions are random binary trees over the digits 1-9 and the operators
    '+', '-' and '*'. The infix form is fully parenthesised, e.g. "(3 + (4 * 5))",
    and the postfix form is space separated, e.g. "3 4 5 * +".

    Samples are stored in the order they were generated, so for a given seed
    the content AND the ordering of ``self.data`` are the same on every run
    (a plain ``set`` of strings would be iterated in a hash-dependent order
    that changes between interpreter sessions).

    Parameters:
    - num_samples: Number of unique sequences to generate.
    - min_depth: Minimum depth of the expression tree.
    - max_depth: Maximum depth of the expression tree.
    - seed: Random seed for reproducibility. Note that this seeds NumPy's
      global random state.

    Raises:
    - ValueError: if generation keeps producing already-seen expressions,
      which indicates that ``num_samples`` exceeds the number of distinct
      expressions reachable with the given depths.
    """

    # Number of consecutive duplicate draws after which generation gives up
    # instead of looping forever on an exhausted expression space.
    _MAX_CONSECUTIVE_DUPLICATES = 100_000

    def __init__(self, num_samples, min_depth=1, max_depth=3, seed=42):
        self.num_samples = num_samples
        self.min_depth = min_depth
        self.max_depth = max_depth
        self.seed = seed
        np.random.seed(self.seed)
        self.data = self._generate_data()

    def _generate_data(self):
        """Generate ``num_samples`` pairs with pairwise distinct infix strings.

        Returns a list of (infix, postfix) tuples in generation order.
        """
        # dict gives O(1) membership tests on the infix string and keeps
        # insertion order, which makes the resulting list deterministic.
        unique_pairs = {}
        consecutive_duplicates = 0
        while len(unique_pairs) < self.num_samples:
            infix, postfix = self._generate_random_expression()
            if infix in unique_pairs:
                consecutive_duplicates += 1
                if consecutive_duplicates >= self._MAX_CONSECUTIVE_DUPLICATES:
                    raise ValueError(
                        f"Could not generate {self.num_samples} unique expressions "
                        f"with min_depth={self.min_depth} and max_depth={self.max_depth}; "
                        f"only {len(unique_pairs)} were found."
                    )
                continue
            consecutive_duplicates = 0
            unique_pairs[infix] = postfix
        return list(unique_pairs.items())

    def _generate_random_expression(self):
        """Generate a random infix and postfix expression pair."""
        def random_digit():
            # Operands are single digits in 1..9 (upper bound is exclusive).
            return str(np.random.randint(1, 10))

        def random_operator():
            return np.random.choice(['+', '-', '*'])

        def generate_expression(depth=0):
            # A leaf is forced at max_depth; once min_depth is reached a leaf
            # is chosen whenever the uniform draw exceeds 0.7.
            if depth >= self.max_depth or (depth >= self.min_depth and np.random.random() > 0.7):
                num = random_digit()
                return num, num
            # Draw order (left subtree, right subtree, operator) is kept
            # fixed so that a given seed always yields the same expressions.
            left_infix, left_postfix = generate_expression(depth + 1)
            right_infix, right_postfix = generate_expression(depth + 1)
            operator = random_operator()
            infix = f"({left_infix} {operator} {right_infix})"
            postfix = f"{left_postfix} {right_postfix} {operator}"
            return infix, postfix

        return generate_expression()

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Return the sample at ``idx`` as a dict with 'infix' and 'postfix' strings."""
        infix, postfix = self.data[idx]
        return {
            'infix': infix,
            'postfix': postfix
        }

# Build the full dataset: 10,000 unique expressions with tree depth between 1 and 4.
# The constructor's default seed (42) is used for generation.
dataset = InfixToPostfixDataset(num_samples=10000, min_depth=1, max_depth=4)

from torch.utils.data import random_split

# Seed torch's global RNG so that random_split below picks the same indices on every run
torch.manual_seed(42)

# Define the split ratio
# NOTE: only train_ratio is used below; the test set takes whatever remains.
train_ratio = 0.8
test_ratio = 0.2

# Calculate the sizes of each set (test size is the remainder, so the two always sum to len(dataset))
train_size = int(train_ratio * len(dataset))
test_size = len(dataset) - train_size

# Split the dataset into non-overlapping random subsets
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

print(f"Train dataset size: {len(train_dataset)}")
print(f"Test dataset size: {len(test_dataset)}")
# Special tokens: padding, beginning-of-sequence and end-of-sequence markers.
PAD_TOKEN = '<PAD>'
BOS_TOKEN = '<BOS>'
EOS_TOKEN = '<EOS>'
# Character-level vocabulary: the special tokens first (so PAD gets id 0),
# followed by the digits, the three operators and the two parentheses.
# Spaces are deliberately absent; they are not part of the vocabulary.
tokens = [PAD_TOKEN, BOS_TOKEN, EOS_TOKEN] + list('0123456789+-*()')
# Map every token to its integer id (its position in `tokens`).
vocab = {token: i for i, token in enumerate(tokens)}

print("Tokens:", tokens)
print("Vocabulary size:", len(vocab))
print("Vocabulary:", vocab)

def pad_batch(batch):
    """
    Collate function: turn a list of samples into two padded id tensors.

    Each expression string is tokenized character by character through the
    module-level `vocab`; space characters are skipped. Within each of the two
    groups (infix / postfix) every sequence is right-padded with the PAD id up
    to the longest sequence of that group in the batch.

    Parameters:
    - batch: List of dictionaries with 'infix' and 'postfix' keys.

    Returns:
    - padded_infix: LongTensor of shape (len(batch), longest infix length).
    - padded_postfix: LongTensor of shape (len(batch), longest postfix length).
    """
    # NOTE(review): BOS_TOKEN / EOS_TOKEN exist in the vocabulary but are not
    # inserted here - confirm whether downstream training expects them.

    def encode(text):
        # Map every non-space character to its vocabulary id.
        ids = []
        for symbol in text:
            if symbol == ' ':
                continue
            ids.append(vocab[symbol])
        return ids

    def to_padded_tensor(rows):
        # Right-pad every row to the width of the longest row.
        width = max(map(len, rows))
        filled = []
        for row in rows:
            filled.append(row + [vocab[PAD_TOKEN]] * (width - len(row)))
        return torch.tensor(filled, dtype=torch.long)

    # Pull the raw strings out of the sample dictionaries
    infix_texts = [sample['infix'] for sample in batch]
    postfix_texts = [sample['postfix'] for sample in batch]

    # Tokenize both groups before any padding takes place
    infix_ids = [encode(text) for text in infix_texts]
    postfix_ids = [encode(text) for text in postfix_texts]

    return to_padded_tensor(infix_ids), to_padded_tensor(postfix_ids)


# Batches of 100 samples, padded per batch by pad_batch.
# Training batches are reshuffled on every pass; test batches keep a fixed order.
train_loader = DataLoader(train_dataset, batch_size=100, shuffle=True, collate_fn=pad_batch)
test_loader = DataLoader(test_dataset, batch_size=100, shuffle=False, collate_fn=pad_batch)

# Sanity check: draw one training batch; x holds the padded infix ids,
# labels the padded postfix ids. The printed shape is (batch size, longest infix in this batch).
x, labels = next(iter(train_loader))
print(x.shape)

Train dataset size: 8000
Test dataset size: 2000
Tokens: ['<PAD>', '<BOS>', '<EOS>', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '+', '-', '*', '(', ')']
Vocabulary size: 18
Vocabulary: {'<PAD>': 0, '<BOS>': 1, '<EOS>': 2, '0': 3, '1': 4, '2': 5, '3': 6, '4': 7, '5': 8, '6': 9, '7': 10, '8': 11, '9': 12, '+': 13, '-': 14, '*': 15, '(': 16, ')': 17}
torch.Size([100, 57])


In [3]:
# Draw another batch from a fresh iterator. Because the training loader shuffles
# and padding is per batch, the second dimension can differ from the previous draw.
x, labels = next(iter(train_loader))
print(x.shape)

torch.Size([100, 61])


In [None]:
import torch
import torch.nn as nn

class TransformerInfixToPostfix(nn.Module):
    """
    Encoder-decoder Transformer mapping infix token ids to postfix token ids.

    Source and target share a single embedding table. Fixed sinusoidal
    positional encodings are added to the embeddings before they are passed
    to ``nn.Transformer`` (configured with ``batch_first=True``).

    Parameters:
    - vocab_size: Number of tokens in the shared vocabulary.
    - embed_size: Embedding / model dimension.
    - num_heads: Number of attention heads.
    - num_encoder_layers: Number of encoder layers.
    - num_decoder_layers: Number of decoder layers.
    - ff_hidden_dim: Hidden size of the feed-forward sub-layers.
    - max_len: Number of positions precomputed in the positional table.
      Longer sequences are still accepted; their encoding is computed on demand.
    - dropout: Dropout probability inside the Transformer.
    - pad_idx: Token id treated as padding when building padding masks.
    """

    def __init__(self, vocab_size, embed_size=128, num_heads=8, num_encoder_layers=3,
                 num_decoder_layers=3, ff_hidden_dim=512, max_len=50, dropout=0.1, pad_idx=0):
        super(TransformerInfixToPostfix, self).__init__()
        self.vocab_size = vocab_size
        self.embed_size = embed_size
        self.pad_idx = pad_idx

        # Embedding layer shared by input and output tokens
        self.embedding = nn.Embedding(vocab_size, embed_size)
        # Registered as a buffer so it follows the module across .to(device);
        # non-persistent so the state_dict keys stay exactly as before.
        self.register_buffer(
            "positional_encoding",
            self._create_positional_encoding(embed_size, max_len),
            persistent=False,
        )

        # Transformer with batch_first=True
        self.transformer = nn.Transformer(
            d_model=embed_size,
            nhead=num_heads,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            dim_feedforward=ff_hidden_dim,
            dropout=dropout,
            batch_first=True  # Use batch_first=True to simplify input dimensions
        )

        # Output projection
        self.fc_out = nn.Linear(embed_size, vocab_size)

    def forward(self, src, tgt):
        """
        Compute next-token logits for every target position.

        Parameters:
        - src: LongTensor (batch_size, src_len) of source token ids.
        - tgt: LongTensor (batch_size, tgt_len) of target token ids fed to the decoder.

        Returns:
        - Tensor (batch_size, tgt_len, vocab_size) of unnormalised logits.
        """
        # Embed source and target sequences and add position information
        src_emb = self.embedding(src) + self._positional_slice(src.size(1))
        tgt_emb = self.embedding(tgt) + self._positional_slice(tgt.size(1))

        # Generate masks
        src_mask = None  # No causal masking for the source
        tgt_mask = self._generate_square_subsequent_mask(tgt.size(1)).to(src.device)
        src_padding_mask = self._generate_padding_mask(src).to(src.device)
        tgt_padding_mask = self._generate_padding_mask(tgt).to(src.device)

        # Pass through Transformer (no need to permute dimensions).
        # The source padding mask is also applied to the encoder memory so the
        # decoder's cross-attention ignores padded source positions.
        output = self.transformer(
            src_emb, tgt_emb, src_mask=src_mask, tgt_mask=tgt_mask,
            src_key_padding_mask=src_padding_mask, tgt_key_padding_mask=tgt_padding_mask,
            memory_key_padding_mask=src_padding_mask
        )

        # Project to vocabulary size
        output = self.fc_out(output)  # (batch_size, seq_len, vocab_size)
        return output

    def _positional_slice(self, seq_len):
        """Return positional encodings of shape (1, seq_len, embed_size)."""
        table = self.positional_encoding
        if seq_len > table.size(1):
            # The sinusoidal encoding is a fixed function of the position, so
            # a longer table is an exact extension of the precomputed one.
            table = self._create_positional_encoding(self.embed_size, seq_len).to(
                device=table.device, dtype=table.dtype
            )
        # Dimension 0 is the broadcast batch axis; positions live on dimension 1.
        return table[:, :seq_len, :]

    def _create_positional_encoding(self, embed_size, max_len):
        """Generate sinusoidal positional encoding of shape (1, max_len, embed_size)."""
        pos_enc = torch.zeros(max_len, embed_size)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, embed_size, 2).float() * (-torch.log(torch.tensor(10000.0)) / embed_size))
        pos_enc[:, 0::2] = torch.sin(position * div_term)
        pos_enc[:, 1::2] = torch.cos(position * div_term)
        return pos_enc.unsqueeze(0)  # Add batch dimension

    def _generate_square_subsequent_mask(self, size):
        """Generate a subsequent mask for the decoder to prevent attention to future tokens.

        Returns a float (size, size) tensor: entry [i, j] is 0.0 when position i
        may attend to position j (j <= i) and -inf when j lies in the future.
        """
        # Keep -inf strictly above the diagonal; everything else becomes 0.
        return torch.triu(torch.full((size, size), float('-inf')), diagonal=1)

    def _generate_padding_mask(self, seq):
        """Create a boolean padding mask that is True where `seq` holds the pad id."""
        return seq == self.pad_idx


In [None]:
def train_model(model, dataloader, optimizer, criterion, num_epochs, device):
    """
    Train `model` with teacher forcing and print the mean batch loss per epoch.

    Every batch yields a (source, target) pair. The decoder is fed the target
    without its last position and is scored against the target without its
    first position, i.e. each step is trained to predict the following token.

    Parameters:
    - model: Module called as model(src, decoder_input) that returns logits
      whose last dimension is the vocabulary size.
    - dataloader: Iterable of (src, tgt) tensor pairs; must support len().
    - optimizer: Optimizer updating the model parameters.
    - criterion: Loss called as criterion(flat_logits, flat_targets).
    - num_epochs: Number of passes over `dataloader`.
    - device: Device the model and every batch are moved to.

    Returns:
    - None. Progress is reported through print().
    """
    # NOTE(review): presumably tgt is (batch, seq_len) token ids that start with
    # a start-of-sequence token - confirm against the collate function in use.
    model.to(device)

    for epoch_idx in range(num_epochs):
        model.train()
        running_loss = 0

        for source_batch, target_batch in dataloader:
            source_batch = source_batch.to(device)
            target_batch = target_batch.to(device)

            # Shift by one position: inputs drop the last step, labels drop the first
            decoder_in, decoder_gold = target_batch[:, :-1], target_batch[:, 1:]

            optimizer.zero_grad()
            scores = model(source_batch, decoder_in)

            # Flatten batch and time so the criterion sees one row per token
            flat_scores = scores.reshape(-1, scores.size(-1))
            flat_gold = decoder_gold.reshape(-1)
            batch_loss = criterion(flat_scores, flat_gold)

            batch_loss.backward()
            optimizer.step()

            running_loss += batch_loss.item()

        mean_loss = running_loss / len(dataloader)
        print(f"Epoch [{epoch_idx + 1}/{num_epochs}], Loss: {mean_loss}")
