In [None]:
import numpy as np
import matplotlib.pyplot as plt

from mini_torch import Tensor, nn, optim, F, schedulers

## Create Synthetic Text Dataset

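We'll create a simple synthetic dataset for binary sentiment classification. Each sample is a fixed-length sequence of token IDs: "positive" sequences draw most of their tokens from the upper half of the vocabulary, and "negative" sequences draw most of theirs from the lower half.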

In [None]:
# Create a simple synthetic text classification dataset
# Seed NumPy's global RNG first: the data generator below and the model's
# embedding / positional tables (initialised with np.random.randn) all draw
# from it, so a fixed seed makes a fresh top-to-bottom run repeatable.
np.random.seed(42)

# Parameters (shared by the data generator and the model constructor)
vocab_size = 1000  # Vocabulary size; token ids are drawn from range(vocab_size)
max_seq_len = 50   # Maximum sequence length; also the number of rows in the positional table
embed_dim = 64     # Embedding dimension
n_classes = 2      # Binary classification (positive/negative sentiment)

# Generate synthetic data
# Positive samples: contain more tokens from upper half of vocabulary
# Negative samples: contain more tokens from lower half of vocabulary

def generate_sequence(is_positive, seq_len=50, n_vocab=None):
    """Generate one synthetic token sequence of exactly ``seq_len`` tokens.

    About 70% of the tokens are "signal" tokens drawn uniformly from the
    class-specific half of the vocabulary (upper half for positive samples,
    lower half for negative samples). The remaining tokens are noise drawn
    uniformly from the whole vocabulary. The result is shuffled, so token
    position carries no class information.

    Args:
        is_positive: If truthy, signal tokens come from the upper half of the
            vocabulary; otherwise from the lower half.
        seq_len: Number of tokens in the returned sequence.
        n_vocab: Vocabulary size. When omitted, the notebook-level
            ``vocab_size`` is used.

    Returns:
        1-D NumPy integer array of length ``seq_len`` with values in
        ``[0, n_vocab)``.

    Raises:
        ValueError: If the vocabulary has fewer than two tokens, so it cannot
            be split into two non-empty halves.
    """
    if n_vocab is None:
        n_vocab = vocab_size
    if n_vocab < 2:
        raise ValueError(f"n_vocab must be at least 2, got {n_vocab}")
    midpoint = n_vocab // 2

    n_signal = int(seq_len * 0.7)
    # Noise takes whatever is left. Truncating both fractions independently
    # (int(0.7 * n) + int(0.3 * n)) can come up short of seq_len, e.g. n = 5.
    n_noise = seq_len - n_signal

    low, high = (midpoint, n_vocab) if is_positive else (0, midpoint)

    # Signal is drawn before noise; keep this order so a given seed keeps
    # producing the same sequences.
    signal = np.random.choice(np.arange(low, high), size=n_signal)
    noise = np.random.choice(np.arange(n_vocab), size=n_noise)

    seq = np.concatenate([signal, noise])
    np.random.shuffle(seq)
    return seq

def _balanced_split(n_samples, seq_len):
    """Build ``n_samples // 2`` (positive, negative) pairs of sequences.

    Samples are emitted in alternating order: positive (label 1) first, then
    negative (label 0). Returns ``(sequences, labels)`` as NumPy arrays.
    """
    sequences, labels = [], []
    for _ in range(n_samples // 2):
        for label in (1, 0):
            sequences.append(generate_sequence(is_positive=(label == 1), seq_len=seq_len))
            labels.append(label)
    return np.array(sequences), np.array(labels)


# Training split (2000 samples), then test split (500 samples).
# Both splits are generated before either is shuffled, so the sequence of
# draws from the global RNG stays: train data, test data, train permutation,
# test permutation.
n_train = 2000
X_train, y_train = _balanced_split(n_train, max_seq_len)

n_test = 500
X_test, y_test = _balanced_split(n_test, max_seq_len)

# Break up the strict positive/negative alternation within each split
train_idx = np.random.permutation(len(X_train))
X_train, y_train = X_train[train_idx], y_train[train_idx]

test_idx = np.random.permutation(len(X_test))
X_test, y_test = X_test[test_idx], y_test[test_idx]

# Quick sanity report on what was built
print(f"Training set: {X_train.shape}")
print(f"Test set: {X_test.shape}")
print(f"Vocabulary size: {vocab_size}")
print(f"Max sequence length: {max_seq_len}")
print(f"\nSample sequence (first 10 tokens): {X_train[0][:10]}")
print(f"Sample label: {y_train[0]} ({'Positive' if y_train[0] == 1 else 'Negative'})")

## Define Attention-Based Classification Model

In [None]:
class AttentionClassifier(nn.Module):
    """
    Attention-based sequence classifier.

    Architecture:
    1. Embedding layer
    2. Positional encoding (simple learned)
    3. Multi-head attention layers (a stack of ``nn.TransformerBlock``)
    4. Global average pooling
    5. Classification head

    Args:
        vocab_size: Number of rows in the embedding table.
        embed_dim: Width of token embeddings and of every transformer block.
        num_heads: Passed to each ``nn.TransformerBlock``.
        num_layers: Number of transformer blocks applied in sequence.
        max_seq_len: Number of rows in the positional table; the longest
            sequence ``forward`` accepts.
        num_classes: Number of output logits per sample.
        dropout: Dropout rate passed to the transformer blocks and to the
            classification head.
    """

    def __init__(self, vocab_size, embed_dim, num_heads, num_layers,
                 max_seq_len, num_classes, dropout=0.1):
        super().__init__()

        self.embed_dim = embed_dim
        self.max_seq_len = max_seq_len

        # Embedding layer (simplified - using random embeddings)
        # In practice, this would be learned
        self.embedding = Tensor(np.random.randn(vocab_size, embed_dim).astype(np.float32) * 0.01)

        # Positional encoding (learned)
        self.pos_encoding = Tensor(np.random.randn(max_seq_len, embed_dim).astype(np.float32) * 0.01)

        # Transformer blocks; the feed-forward width is 4x the embedding width
        self.transformer_blocks = [
            nn.TransformerBlock(
                embed_dim=embed_dim,
                num_heads=num_heads,
                ff_dim=embed_dim * 4,
                dropout=dropout
            )
            for _ in range(num_layers)
        ]

        # Classification head
        self.classifier = nn.Sequential(
            nn.Linear(embed_dim, embed_dim // 2),
            nn.Dropout(dropout),
            nn.Linear(embed_dim // 2, num_classes)
        )

        # Collect parameters by hand: the blocks live in a plain Python list
        # and the two tables are bare Tensors, so they are gathered explicitly.
        self._parameters = [self.embedding, self.pos_encoding]
        for block in self.transformer_blocks:
            self._parameters.extend(block.parameters())
        self._parameters.extend(self.classifier.parameters())

    def forward(self, x):
        """
        Forward pass.

        Args:
            x: Input tensor of token indices, shape (batch_size, seq_len).
               Values are truncated to integers before the lookup.

        Returns:
            Logits of shape (batch_size, num_classes)

        Raises:
            ValueError: If seq_len exceeds the ``max_seq_len`` the positional
                table was built for.
        """
        _, seq_len = x.shape
        if seq_len > self.max_seq_len:
            raise ValueError(
                f"Sequence length {seq_len} exceeds max_seq_len {self.max_seq_len}"
            )

        # Embedding lookup: (batch, seq_len) -> (batch, seq_len, embed_dim).
        # One fancy-index gathers every row at once instead of looping over
        # each (batch, position) pair in Python.
        token_ids = np.asarray(x.data).astype(np.int64)
        # NOTE(review): the lookup reads the raw `.data` array and wraps the
        # result in a fresh Tensor, so presumably no gradient reaches
        # `self.embedding` even though it is listed in `_parameters` --
        # confirm against mini_torch's autograd.
        embedded = Tensor(self.embedding.data[token_ids])

        # Add positional encoding
        pos_enc = self.pos_encoding[:seq_len].unsqueeze(0)  # (1, seq_len, embed_dim)
        x = embedded + pos_enc  # Broadcasting over batch

        # Pass through transformer blocks
        for block in self.transformer_blocks:
            x = block(x)

        # Global average pooling over sequence dimension
        x = x.mean(axis=1)  # (batch, embed_dim)

        # Classification: the Sequential's layers are applied one by one so a
        # ReLU can be inserted after the first linear layer.
        logits = self.classifier.modules_list[0](x)  # First linear
        logits = logits.relu()
        logits = self.classifier.modules_list[1](logits)  # Dropout
        logits = self.classifier.modules_list[2](logits)  # Final linear

        return logits

# Create model
# Name the architecture sizes once so the constructor call and the summary
# printed below cannot drift apart.
num_heads = 4
num_layers = 2

print("Creating Attention-based Classifier...")
model = AttentionClassifier(
    vocab_size=vocab_size,
    embed_dim=embed_dim,
    num_heads=num_heads,
    num_layers=num_layers,
    max_seq_len=max_seq_len,
    num_classes=n_classes,
    dropout=0.1
)

print(f"Model created with {len(model.parameters())} parameter tensors")
print(f"Embedding dimension: {embed_dim}")
print(f"Number of attention heads: {num_heads}")
print(f"Number of transformer layers: {num_layers}")

## Training and Evaluation Functions

In [None]:
def train_epoch(model, X_train, y_train, optimizer, batch_size=32):
    """Run one pass over the training data and return the average batch loss.

    The sample order is reshuffled on every call. Each mini-batch goes through
    a forward pass, a cross-entropy loss, a backward pass and one optimizer
    step. The returned value is the unweighted mean of the per-batch losses.
    """
    model.train()

    n_samples = X_train.shape[0]
    order = np.random.permutation(n_samples)

    running_loss = 0.0
    batches_seen = 0

    for start in range(0, n_samples, batch_size):
        picked = order[start:start + batch_size]

        # Token ids and labels are both handed to the model as float32 tensors
        inputs = Tensor(X_train[picked].astype(np.float32))
        targets = Tensor(y_train[picked].astype(np.float32), requires_grad=False)

        loss = F.cross_entropy(model(inputs), targets)

        # Clear stale gradients, backpropagate, then apply the update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        batches_seen += 1

    return running_loss / batches_seen


def evaluate(model, X_test, y_test, batch_size=32):
    """Score the model on a dataset.

    Batches are taken in the order given (no shuffling). Returns a tuple
    ``(accuracy, mean_loss)`` where accuracy is correct / total samples and
    mean_loss is the unweighted mean of the per-batch cross-entropy losses.
    """
    model.eval()
    n_samples = X_test.shape[0]

    n_correct = 0
    n_seen = 0
    loss_sum = 0.0
    batches_seen = 0

    for start in range(0, n_samples, batch_size):
        stop = start + batch_size
        labels = y_test[start:stop]

        inputs = Tensor(X_test[start:stop].astype(np.float32))
        targets = Tensor(labels.astype(np.float32), requires_grad=False)

        logits = model(inputs)

        loss_sum += F.cross_entropy(logits, targets).item()
        batches_seen += 1

        # Predicted class = index of the largest logit in each row
        predicted = np.argmax(logits.data, axis=1)
        n_correct += (predicted == labels).sum()
        n_seen += len(labels)

    return n_correct / n_seen, loss_sum / batches_seen

## Train the Model

In [None]:
def _print_banner(message, width=70):
    """Print `message` framed by two rules of '=' characters, preceded by a blank line."""
    print("\n" + "=" * width)
    print(message)
    print("=" * width)


# Hyperparameters for this training run
n_epochs = 20
batch_size = 32
learning_rate = 0.001

# Adam over every parameter tensor the model exposes
optimizer = optim.Adam(model.parameters(), learning_rate=learning_rate)

# Plateau scheduler: constructed with factor=0.5, patience=3 and mode='min'
scheduler = schedulers.LRSchedulerOnPlateau(
    optimizer,
    initial_lr=learning_rate,
    patience=3,
    factor=0.5,
    mode='min'
)

# Per-epoch history, consumed by the plotting cell further down
train_losses, test_losses, test_accuracies = [], [], []

_print_banner("Starting Attention Model Training")

for epoch in range(n_epochs):
    print(f"\nEpoch {epoch+1}/{n_epochs}")
    print("-"*70)

    # One optimisation pass, then a full pass over the held-out set
    train_loss = train_epoch(model, X_train, y_train, optimizer, batch_size)
    test_acc, test_loss = evaluate(model, X_test, y_test, batch_size)

    train_losses.append(train_loss)
    test_losses.append(test_loss)
    test_accuracies.append(test_acc)

    # The scheduler is driven by the held-out (test) loss
    scheduler.step(test_loss)

    print(f"Train Loss: {train_loss:.4f}")
    print(f"Test Loss: {test_loss:.4f}")
    print(f"Test Accuracy: {test_acc:.4f}")
    print(f"Learning Rate: {optimizer.learning_rate:.6f}")

_print_banner("Training completed!")

## Visualize Results

In [None]:
def _finish_axis(ax, ylabel, title):
    """Apply the labelling, grid and legend shared by both panels."""
    ax.set_xlabel('Epoch', fontsize=12)
    ax.set_ylabel(ylabel, fontsize=12)
    ax.set_title(title, fontsize=14, fontweight='bold')
    ax.grid(True, alpha=0.3)
    ax.legend()


# Two side-by-side panels: losses on the left, accuracy on the right
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))

# Left panel: train vs. test loss per epoch
ax1.plot(train_losses, marker='o', label='Train Loss', linewidth=2)
ax1.plot(test_losses, marker='s', label='Test Loss', linewidth=2)
_finish_axis(ax1, 'Loss', 'Training and Test Loss')

# Right panel: test accuracy, with the 50% chance level for reference
ax2.plot(test_accuracies, marker='o', color='green', linewidth=2)
ax2.axhline(y=0.5, color='r', linestyle='--', label='Random Baseline', alpha=0.5)
_finish_axis(ax2, 'Accuracy', 'Test Accuracy')

plt.tight_layout()
plt.show()

print(f"\nBest Test Accuracy: {max(test_accuracies):.4f}")
print(f"Final Test Accuracy: {test_accuracies[-1]:.4f}")

## Analyze Predictions

In [None]:
# Get predictions on test set
n_analyze = 100   # how many leading test samples to score
n_show = 20       # how many of those to list in the table

# Switch to eval mode explicitly, as evaluate() does, so this cell does not
# depend on which function touched the model last.
model.eval()

X_tensor = Tensor(X_test[:n_analyze].astype(np.float32))
logits = model(X_tensor)
predictions = np.argmax(logits.data, axis=1)
true_labels = y_test[:n_analyze]

# Analyze predictions
correct = predictions == true_labels
accuracy = correct.sum() / len(correct)

# Softmax over the class axis, computed once for all rows instead of once
# per printed row.
probs = F.softmax(Tensor(logits.data), axis=1)

print(f"Prediction Analysis (first {len(correct)} samples):")
print(f"Accuracy: {accuracy:.2%}")
print(f"Correct: {correct.sum()}/{len(correct)}")

# Show some examples
print("\nSample Predictions:")
print("-" * 60)
print(f"{'True Label':<15} {'Predicted':<15} {'Confidence':<15} {'Correct'}")
print("-" * 60)

# Never index past the number of samples actually scored
for i in range(min(n_show, len(correct))):
    true_label = 'Positive' if true_labels[i] == 1 else 'Negative'
    pred_label = 'Positive' if predictions[i] == 1 else 'Negative'

    # Confidence = softmax probability assigned to the predicted class
    confidence = probs.data[i, predictions[i]]

    is_correct = '✓' if correct[i] else '✗'

    print(f"{true_label:<15} {pred_label:<15} {confidence:<15.3f} {is_correct}")

## Confusion Matrix

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns

# Get all predictions
# Switch to eval mode explicitly, as evaluate() does, so the predictions do
# not depend on which function touched the model last.
model.eval()

# A cell-local batch size, so the training cell's `batch_size` is not rebound
eval_batch_size = 32
all_predictions = []
for start in range(0, len(X_test), eval_batch_size):
    batch_inputs = Tensor(X_test[start:start + eval_batch_size].astype(np.float32))
    batch_logits = model(batch_inputs)
    all_predictions.extend(np.argmax(batch_logits.data, axis=1))

# Compute confusion matrix.
# Pin the label order so the matrix is always 2x2 (rows = true class, columns
# = predicted class, in the order [0, 1]); the 4-way unpacking below relies on it.
class_names = ['Negative', 'Positive']
cm = confusion_matrix(y_test, all_predictions, labels=[0, 1])

# Plot confusion matrix
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=class_names,
            yticklabels=class_names)
plt.title('Confusion Matrix', fontsize=16, fontweight='bold')
plt.ylabel('True Label', fontsize=12)
plt.xlabel('Predicted Label', fontsize=12)
plt.tight_layout()
plt.show()

# Calculate metrics (class 1 / 'Positive' is treated as the positive class)
tn, fp, fn, tp = cm.ravel()
precision = tp / (tp + fp) if (tp + fp) > 0 else 0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0
f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

print("\nPerformance Metrics:")
print("="*40)
print(f"Accuracy:  {(tp + tn) / (tp + tn + fp + fn):.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall:    {recall:.4f}")
print(f"F1 Score:  {f1:.4f}")

## Summary

This notebook demonstrates:
1. **Multi-Head Attention**, implemented from scratch in `mini_torch` and used here through `nn.TransformerBlock`
2. **Transformer blocks** with attention and feed-forward networks
3. **Sequence classification** using attention mechanisms
4. **Positional encoding** for sequence ordering
5. Training with **Adam optimizer** and **learning rate scheduling**

The attention mechanism allows the model to focus on different parts of the input sequence when making predictions, which is crucial for understanding context in sequential data.