In [41]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

import random

In [40]:
# Function to generate a sequence with varying probabilities for 1s
def generate_sequence(length, p1, p2, special_regions):
    sequence = torch.zeros(length)
    global_prob = torch.rand(length)
    sequence[global_prob < p1] = 1

    for region in special_regions:
        a, b = region
        if 0 <= a < b <= length:
            special_prob = torch.rand(b - a)
            sequence[a:b][special_prob < p2] = 1

    return sequence


# Custom Dataset class
class SequenceDataset(Dataset):
    def __init__(self, seq_len, p1, p2, special_regions, num_samples):
        """
        Initializes the dataset.

        Args:
        - seq_len: Length of each sequence.
        - p1: Global probability for 1s.
        - p2: Probability for 1s in special regions.
        - special_regions: List of regions with higher probability.
        - num_samples: Number of sequences in the dataset.
        """
        self.seq_len = seq_len
        self.p1 = p1
        self.p2 = p2
        self.special_regions = special_regions
        self.num_samples = num_samples

    def __len__(self):
        # Total number of samples in the dataset
        return self.num_samples

    def __getitem__(self, idx):
        # Generates a sequence
        if random.random() < 0.5:
            sequence = generate_sequence(self.seq_len, self.p1, self.p1, self.special_regions)
            return sequence, 0
        else:
            sequence = generate_sequence(self.seq_len, self.p1, self.p2, self.special_regions)
            return sequence, 1

In [42]:
# Define the Transformer model
class TransformerClassifier(nn.Module):
    def __init__(self, input_dim, num_classes, nhead, num_encoder_layers, dim_feedforward, dropout=0.1):
        super(TransformerClassifier, self).__init__()
        
        # Input dimension must match the input feature size
        self.embedding = nn.Embedding(input_dim, input_dim)  # For categorical data, use nn.Linear for continuous
        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=input_dim, nhead=nhead, dim_feedforward=dim_feedforward, dropout=dropout),
            num_layers=num_encoder_layers
        )
        self.fc = nn.Linear(input_dim, num_classes)  # Output layer for classification

    def forward(self, x):
        # x shape: (seq_length, batch_size) for nn.Transformer
        x = self.embedding(x)  # Get embeddings for inputs
        x = self.transformer(x)  # Pass through the transformer
        x = x.mean(dim=0)  # Average pooling over sequence length
        x = self.fc(x)  # Classify
        return x

In [44]:
# Create dataset and dataloader
seq_len = 100  # Length of each sequence
p1 = 0.1  # Global probability for 1s
p2 = 0.5  # Probability for 1s in special regions
special_regions = [(10, 20), (50, 70)]  # Regions with higher probability
num_samples = 100  # Number of sequences to generate

# Create the dataset and the dataloader
dataset = SequenceDataset(seq_len, p1, p2, special_regions, num_samples)
dataloader = DataLoader(dataset, batch_size=20, shuffle=True)

test_dataset = SequenceDataset(seq_len, p1, p2, special_regions, 10)
test_dataloader = DataLoader(dataset, batch_size=20, shuffle=True)

model = TransformerClassifier(input_dim=2,
                              num_classes=2,
                              nhead=2,
                              num_encoder_layers=2,
                              dim_feedforward=2,
                              dropout=0.1)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 10

In [37]:
# Training loop
for epoch in range(num_epochs):
    model.train()  # Set the model to training mode
    total_loss = 0
    for batch, labels in dataloader:
        optimizer.zero_grad()  # Zero the gradients
        output = model(batch.T)  # Transpose batch for the transformer input
        loss = criterion(output, labels)
        loss.backward()  # Backpropagation
        optimizer.step()  # Update weights
        total_loss += loss.item()
    
    print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {total_loss / len(dataloader)}')

NameError: name 'num_epochs' is not defined

In [33]:
batch

[tensor([[0., 1., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 1., 0.,  ..., 0., 0., 1.],
         ...,
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.]]),
 tensor([1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 1, 0, 1])]

In [34]:
x = batch[0]

In [35]:
x

tensor([[0., 1., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 1., 0.,  ..., 0., 0., 1.],
        ...,
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.]])

In [36]:
for i in range(x.shape[0]):
    print(sum(x[i]))

tensor(21.)
tensor(20.)
tensor(19.)
tensor(32.)
tensor(7.)
tensor(15.)
tensor(12.)
tensor(6.)
tensor(14.)
tensor(9.)
tensor(12.)
tensor(23.)
tensor(11.)
tensor(13.)
tensor(22.)
tensor(27.)
tensor(10.)
tensor(21.)


In [15]:
x.shape[0]

18