# üß™ M2.5‚Ä≤: HDC Semantic Header

**Hypothesis:** HDC vector can improve model inference by providing semantic context as pseudo-tokens.

**Architecture:**
```
[Text] ‚Üí HDC Encoder ‚Üí [10,000 ternary]
                            ‚Üì
                     MLP (10,000 ‚Üí k √ó hidden_dim)
                            ‚Üì
                    [k pseudo-tokens]
                            ‚Üì
         [pseudo-tokens] + [text embeddings] ‚Üí LLM ‚Üí output
```

**Task:** Sentiment classification (SST-2)

---

## Step 1: Setup

In [None]:
!pip install -q transformers datasets accelerate
!pip install -q sentence-transformers
!pip install -q tqdm numpy matplotlib

In [None]:
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import DataLoader, Dataset
from transformers import AutoModelForCausalLM, AutoTokenizer
from sentence_transformers import SentenceTransformer
from datasets import load_dataset
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')

print(f"PyTorch: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

## Step 2: Load Model First (to get HIDDEN_SIZE)

In [None]:
# Use smaller model for faster loading
MODEL_NAME = "facebook/opt-350m"

print(f"Loading {MODEL_NAME}...")

tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = 'left'

# Load without quantization - model is small enough
model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    torch_dtype=torch.float16,
    device_map="auto",
    trust_remote_code=True
)

# Get hidden size
HIDDEN_SIZE = model.config.hidden_size
print(f"Model loaded! Hidden size: {HIDDEN_SIZE}")

## Step 3: HDC Encoder

In [None]:
class TernaryHDCEncoder:
    """Encode text to ternary HDC vectors via SentenceTransformer + projection"""
    
    def __init__(self, hd_dim=10000, sparsity=0.7, seed=42):
        self.hd_dim = hd_dim
        self.sparsity = sparsity
        self.st_model = SentenceTransformer('all-MiniLM-L6-v2')
        self.st_dim = 384
        
        # Fixed random projection (float32)
        np.random.seed(seed)
        self.projection = torch.tensor(
            np.random.randn(self.st_dim, hd_dim).astype(np.float32) / np.sqrt(self.st_dim),
            dtype=torch.float32
        )
    
    def encode(self, texts):
        """Encode list of texts to ternary HDC vectors"""
        # Get ST embeddings
        st_emb = self.st_model.encode(texts, convert_to_tensor=True, show_progress_bar=False)
        
        # Ensure float32
        st_emb = st_emb.float()
        projection = self.projection.to(st_emb.device).float()
        
        # Project to HD space
        projected = st_emb @ projection
        
        # Ternarize
        ternary = torch.zeros_like(projected)
        for i in range(len(projected)):
            vec = projected[i]
            threshold = torch.quantile(torch.abs(vec), self.sparsity)
            ternary[i] = torch.where(vec > threshold, torch.ones_like(vec),
                                      torch.where(vec < -threshold, -torch.ones_like(vec),
                                                  torch.zeros_like(vec)))
        
        return ternary

# Test
hdc_encoder = TernaryHDCEncoder()
test_hdc = hdc_encoder.encode(["This is a test sentence."])
print(f"HDC shape: {test_hdc.shape}")
print(f"Sparsity: {(test_hdc == 0).float().mean():.1%}")

## Step 4: Semantic Header Module

In [None]:
class SemanticHeader(nn.Module):
    """
    Converts HDC vector to k pseudo-tokens that can be prepended to input embeddings.
    
    HDC (10,000) ‚Üí MLP ‚Üí k √ó hidden_dim
    """
    
    def __init__(self, hdc_dim=10000, hidden_dim=512, n_tokens=4):
        super().__init__()
        self.n_tokens = n_tokens
        self.hidden_dim = hidden_dim
        
        # MLP: HDC ‚Üí intermediate ‚Üí n_tokens √ó hidden_dim
        intermediate_dim = min(2048, hidden_dim * 4)
        self.mlp = nn.Sequential(
            nn.Linear(hdc_dim, intermediate_dim),
            nn.GELU(),
            nn.Dropout(0.1),
            nn.Linear(intermediate_dim, n_tokens * hidden_dim)
        )
    
    def forward(self, hdc_vectors):
        """
        Args:
            hdc_vectors: (batch_size, hdc_dim)
        
        Returns:
            pseudo_tokens: (batch_size, n_tokens, hidden_dim)
        """
        batch_size = hdc_vectors.shape[0]
        output = self.mlp(hdc_vectors)
        pseudo_tokens = output.view(batch_size, self.n_tokens, self.hidden_dim)
        return pseudo_tokens

# Test with actual HIDDEN_SIZE
print(f"Model hidden size: {HIDDEN_SIZE}")
header = SemanticHeader(hdc_dim=10000, hidden_dim=HIDDEN_SIZE, n_tokens=4).to(device)
test_tokens = header(test_hdc.float().to(device))
print(f"Pseudo-tokens shape: {test_tokens.shape}")

## Step 5: Load Dataset

In [None]:
# Load SST-2
print("Loading SST-2 dataset...")
dataset = load_dataset("glue", "sst2")

# Limit for speed
MAX_TRAIN = 2000
MAX_VAL = 500

train_texts = dataset['train']['sentence'][:MAX_TRAIN]
train_labels = dataset['train']['label'][:MAX_TRAIN]
val_texts = dataset['validation']['sentence'][:MAX_VAL]
val_labels = dataset['validation']['label'][:MAX_VAL]

print(f"Train: {len(train_texts)} samples")
print(f"Val: {len(val_texts)} samples")
print(f"Labels: 0=negative, 1=positive")

In [None]:
# Pre-compute HDC vectors for all texts
print("\nEncoding texts to HDC...")

def batch_encode_hdc(texts, encoder, batch_size=64):
    all_vectors = []
    for i in tqdm(range(0, len(texts), batch_size)):
        batch = texts[i:i+batch_size]
        vectors = encoder.encode(batch)
        all_vectors.append(vectors.cpu())
    return torch.cat(all_vectors, dim=0)

train_hdc = batch_encode_hdc(train_texts, hdc_encoder)
val_hdc = batch_encode_hdc(val_texts, hdc_encoder)

print(f"Train HDC shape: {train_hdc.shape}")
print(f"Val HDC shape: {val_hdc.shape}")

## Step 6: Custom Dataset & Model

In [None]:
class SentimentDataset(Dataset):
    def __init__(self, texts, labels, hdc_vectors, tokenizer, max_length=128):
        self.texts = texts
        self.labels = labels
        self.hdc_vectors = hdc_vectors
        self.tokenizer = tokenizer
        self.max_length = max_length
    
    def __len__(self):
        return len(self.texts)
    
    def __getitem__(self, idx):
        text = f"Classify the sentiment of this text as positive or negative.\nText: {self.texts[idx]}\nSentiment:"
        
        encoding = self.tokenizer(
            text,
            truncation=True,
            max_length=self.max_length,
            padding='max_length',
            return_tensors='pt'
        )
        
        return {
            'input_ids': encoding['input_ids'].squeeze(),
            'attention_mask': encoding['attention_mask'].squeeze(),
            'hdc_vector': self.hdc_vectors[idx],
            'label': self.labels[idx]
        }

# Create datasets
train_dataset = SentimentDataset(train_texts, train_labels, train_hdc, tokenizer)
val_dataset = SentimentDataset(val_texts, val_labels, val_hdc, tokenizer)

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False)

print(f"Train batches: {len(train_loader)}")
print(f"Val batches: {len(val_loader)}")

In [None]:
class SentimentClassifierWithHeader(nn.Module):
    """
    Sentiment classifier with optional HDC semantic header.
    """
    
    def __init__(self, base_model, hidden_size, use_header=True, n_tokens=4):
        super().__init__()
        self.base_model = base_model
        self.use_header = use_header
        self.n_tokens = n_tokens
        self.hidden_size = hidden_size
        
        if use_header:
            self.semantic_header = SemanticHeader(
                hdc_dim=10000,
                hidden_dim=hidden_size,
                n_tokens=n_tokens
            )
        
        # Classification head
        self.classifier = nn.Sequential(
            nn.Linear(hidden_size, 256),
            nn.GELU(),
            nn.Dropout(0.1),
            nn.Linear(256, 2)
        )
    
    def forward(self, input_ids, attention_mask, hdc_vector=None):
        # Get input embeddings
        inputs_embeds = self.base_model.get_input_embeddings()(input_ids)
        
        if self.use_header and hdc_vector is not None:
            # Get pseudo-tokens from HDC
            pseudo_tokens = self.semantic_header(hdc_vector.float())
            pseudo_tokens = pseudo_tokens.to(inputs_embeds.dtype)
            
            # Prepend pseudo-tokens
            inputs_embeds = torch.cat([pseudo_tokens, inputs_embeds], dim=1)
            
            # Extend attention mask
            batch_size = attention_mask.shape[0]
            header_mask = torch.ones(batch_size, self.n_tokens, device=attention_mask.device)
            attention_mask = torch.cat([header_mask, attention_mask], dim=1)
        
        # Forward through LLM
        outputs = self.base_model(
            inputs_embeds=inputs_embeds,
            attention_mask=attention_mask,
            output_hidden_states=True
        )
        
        # Get last hidden state
        last_hidden = outputs.hidden_states[-1][:, -1, :]
        last_hidden = last_hidden.float()
        
        # Classify
        logits = self.classifier(last_hidden)
        
        return logits

## Step 7: Training Functions

In [None]:
def train_epoch(model, loader, optimizer, device):
    model.train()
    total_loss = 0
    correct = 0
    total = 0
    
    criterion = nn.CrossEntropyLoss()
    
    for batch in tqdm(loader, desc="Training", leave=False):
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        hdc_vector = batch['hdc_vector'].to(device)
        labels = batch['label'].to(device)
        
        optimizer.zero_grad()
        
        logits = model(input_ids, attention_mask, hdc_vector)
        loss = criterion(logits, labels)
        
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
        preds = logits.argmax(dim=1)
        correct += (preds == labels).sum().item()
        total += labels.size(0)
    
    return total_loss / len(loader), correct / total


def evaluate(model, loader, device):
    model.eval()
    correct = 0
    total = 0
    
    with torch.no_grad():
        for batch in tqdm(loader, desc="Evaluating", leave=False):
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            hdc_vector = batch['hdc_vector'].to(device)
            labels = batch['label'].to(device)
            
            logits = model(input_ids, attention_mask, hdc_vector)
            preds = logits.argmax(dim=1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)
    
    return correct / total

In [None]:
def run_experiment(use_header, n_tokens=4, epochs=3, lr=1e-4):
    """
    Run full training experiment.
    """
    name = f"HDC Header (k={n_tokens})" if use_header else "Baseline (no header)"
    print(f"\n{'='*60}")
    print(f"Experiment: {name}")
    print(f"{'='*60}")
    
    # Create model
    classifier = SentimentClassifierWithHeader(
        base_model=model,
        hidden_size=HIDDEN_SIZE,
        use_header=use_header,
        n_tokens=n_tokens
    ).to(device)
    
    # Freeze base model
    for param in classifier.base_model.parameters():
        param.requires_grad = False
    
    # Count trainable params
    trainable = sum(p.numel() for p in classifier.parameters() if p.requires_grad)
    print(f"Trainable parameters: {trainable:,}")
    
    # Optimizer
    optimizer = torch.optim.AdamW(
        filter(lambda p: p.requires_grad, classifier.parameters()),
        lr=lr
    )
    
    # Training loop
    history = {'train_loss': [], 'train_acc': [], 'val_acc': []}
    best_val_acc = 0
    
    for epoch in range(epochs):
        print(f"\nEpoch {epoch+1}/{epochs}")
        
        train_loss, train_acc = train_epoch(classifier, train_loader, optimizer, device)
        val_acc = evaluate(classifier, val_loader, device)
        
        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)
        history['val_acc'].append(val_acc)
        
        if val_acc > best_val_acc:
            best_val_acc = val_acc
        
        print(f"  Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}")
    
    print(f"\n‚úÖ Best Val Accuracy: {best_val_acc:.4f}")
    
    # Cleanup
    del classifier
    torch.cuda.empty_cache()
    
    return {
        'name': name,
        'use_header': use_header,
        'n_tokens': n_tokens if use_header else 0,
        'best_val_acc': best_val_acc,
        'final_val_acc': history['val_acc'][-1],
        'history': history,
        'trainable_params': trainable
    }

## Step 8: Run Experiments

In [None]:
results = {}

# 1. Baseline (no header)
results['baseline'] = run_experiment(use_header=False, epochs=3)

# 2. HDC Header with k=2
results['header_k2'] = run_experiment(use_header=True, n_tokens=2, epochs=3)

# 3. HDC Header with k=4
results['header_k4'] = run_experiment(use_header=True, n_tokens=4, epochs=3)

# 4. HDC Header with k=8
results['header_k8'] = run_experiment(use_header=True, n_tokens=8, epochs=3)

## Step 9: Results

In [None]:
import matplotlib.pyplot as plt

# Plot training curves
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

colors = {'baseline': 'gray', 'header_k2': 'green', 'header_k4': 'blue', 'header_k8': 'red'}

# Validation accuracy
ax = axes[0]
for name, data in results.items():
    ax.plot(data['history']['val_acc'], label=f"{data['name']} (best: {data['best_val_acc']:.4f})",
            color=colors[name], linewidth=2, marker='o')
ax.set_xlabel('Epoch')
ax.set_ylabel('Validation Accuracy')
ax.set_title('Validation Accuracy by Epoch')
ax.legend()
ax.grid(True, alpha=0.3)

# Training loss
ax = axes[1]
for name, data in results.items():
    ax.plot(data['history']['train_loss'], label=data['name'],
            color=colors[name], linewidth=2, marker='o')
ax.set_xlabel('Epoch')
ax.set_ylabel('Training Loss')
ax.set_title('Training Loss by Epoch')
ax.legend()
ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig('m2.5_prime_results.png', dpi=150)
plt.show()

In [None]:
# Summary table
print("\n" + "="*70)
print("üìä M2.5‚Ä≤ RESULTS: HDC SEMANTIC HEADER")
print("="*70)

baseline_acc = results['baseline']['best_val_acc']

print(f"\n{'Config':<30} {'Val Accuracy':>15} {'vs Baseline':>15} {'Params':>15}")
print("-" * 75)

sorted_results = sorted(results.items(), key=lambda x: x[1]['best_val_acc'], reverse=True)

for name, data in sorted_results:
    acc = data['best_val_acc']
    vs_baseline = ((acc - baseline_acc) / baseline_acc) * 100
    status = "üèÜ" if acc > baseline_acc else ""
    print(f"{data['name']:<30} {acc:>15.4f} {vs_baseline:>+14.2f}% {data['trainable_params']:>15,} {status}")

# Find best
best_name, best_data = sorted_results[0]

print(f"\n" + "="*70)
print("üî¨ ANALYSIS")
print("="*70)

if best_data['use_header']:
    improvement = ((best_data['best_val_acc'] - baseline_acc) / baseline_acc) * 100
    print(f"\n‚úÖ HDC Semantic Header IMPROVES accuracy by {improvement:.2f}%")
    print(f"   Best config: {best_data['name']}")
    print(f"   Accuracy: {best_data['best_val_acc']:.4f} vs baseline {baseline_acc:.4f}")
else:
    print(f"\n‚ö†Ô∏è HDC Semantic Header did NOT improve over baseline")
    print(f"   Baseline accuracy: {baseline_acc:.4f}")

# Verdict
print(f"\n" + "="*70)
print("üìã VERDICT")
print("="*70)

header_results = [r for r in results.values() if r['use_header']]
best_header = max(header_results, key=lambda x: x['best_val_acc'])

if best_header['best_val_acc'] > baseline_acc * 1.01:  # >1% improvement
    print("\n‚úÖ SUCCESS: HDC Semantic Header provides meaningful improvement")
    verdict = "SUCCESS"
elif best_header['best_val_acc'] >= baseline_acc * 0.995:  # Within 0.5%
    print("\n‚ö†Ô∏è PARTIAL: HDC Semantic Header matches baseline (no degradation)")
    verdict = "PARTIAL"
else:
    print("\n‚ùå FAILURE: HDC Semantic Header hurts performance")
    verdict = "FAILURE"

In [None]:
# Save results
import json

output = {
    "phase": "M2.5_prime",
    "experiment": "HDC Semantic Header",
    "hypothesis": "HDC vector as pseudo-tokens improves classification",
    "model": MODEL_NAME,
    "hidden_size": HIDDEN_SIZE,
    "dataset": "SST-2",
    "train_samples": MAX_TRAIN,
    "val_samples": MAX_VAL,
    "results": {
        name: {
            "name": data['name'],
            "use_header": data['use_header'],
            "n_tokens": data['n_tokens'],
            "best_val_acc": float(data['best_val_acc']),
            "final_val_acc": float(data['final_val_acc']),
            "trainable_params": data['trainable_params'],
            "vs_baseline_pct": float(((data['best_val_acc'] - baseline_acc) / baseline_acc) * 100)
        }
        for name, data in results.items()
    },
    "baseline_accuracy": float(baseline_acc),
    "best_header_accuracy": float(best_header['best_val_acc']),
    "best_header_config": best_header['name'],
    "improvement_pct": float(((best_header['best_val_acc'] - baseline_acc) / baseline_acc) * 100),
    "verdict": verdict
}

with open('phase_m2.5_prime_results.json', 'w') as f:
    json.dump(output, f, indent=2)

print("\nüìÅ Results saved to phase_m2.5_prime_results.json")
print("\n" + json.dumps(output, indent=2))

In [None]:
# Download
from google.colab import files
files.download('phase_m2.5_prime_results.json')
files.download('m2.5_prime_results.png')