# 🔬 Phase 4: Learned Compression & Segmented Hypervectors

## Expert Recommendations
1. **Learned Compression:** 16384d → 4096d via trainable W_pair
2. **Segmented:** Split 4096d into [P:1536 | H:1536 | Interaction:1024]
3. **Role-Filler:** Add role vectors for asymmetry

## Goal
Achieve ~65% accuracy in **4096d** (vs current 65.6% in 16384d)

---

In [None]:
# Install the third-party packages imported below. The leading "!" is an
# IPython/Jupyter shell escape, so this line only works inside a notebook.
!pip install -q sentence-transformers datasets
print("‚úÖ Dependencies installed")

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
from datetime import datetime
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt
import json

from datasets import load_dataset
from sentence_transformers import SentenceTransformer

# Report the PyTorch version and select the compute device: CUDA when
# torch reports it as available, otherwise CPU. `device` is read later by
# the training code.
print(f"PyTorch: {torch.__version__}")
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Device: {device}")
print(f"\nüî¨ Phase 4: Learned Compression & Segmented")

In [None]:
# Load the sentence encoder (noted as the best encoder from Phase 2) and
# record its embedding width in SEMANTIC_DIM.
ENCODER_NAME = 'sentence-transformers/nli-mpnet-base-v2'
print(f"Loading {ENCODER_NAME}...")
encoder = SentenceTransformer(ENCODER_NAME)
SEMANTIC_DIM = encoder.get_sentence_embedding_dimension()
print(f"‚úÖ Encoder loaded: {SEMANTIC_DIM}d")

In [None]:
# Load the GLUE/MNLI dataset and carve out small train/test subsets.
print("\nLoading MNLI...")
dataset = load_dataset("glue", "mnli")

# Subset sizes used for this experiment.
TRAIN_SIZE = 5000
TEST_SIZE = 500

# Train subset: shuffled with a fixed seed, then the first TRAIN_SIZE rows.
# Test subset: the first TEST_SIZE rows of 'validation_matched', NOT shuffled.
train_data = dataset['train'].shuffle(seed=42).select(range(TRAIN_SIZE))
test_data = dataset['validation_matched'].select(range(TEST_SIZE))

# Labels as NumPy arrays (converted to torch tensors in a later cell).
train_labels = np.array(train_data['label'])
test_labels = np.array(test_data['label'])

print(f"‚úÖ Train: {TRAIN_SIZE}, Test: {TEST_SIZE}")

In [None]:
# Encode premises and hypotheses once up front so the training loop works on
# fixed embeddings and never has to call the sentence encoder.
print("\nüîÑ Computing embeddings...")

train_p = encoder.encode(list(train_data['premise']), show_progress_bar=True)
train_h = encoder.encode(list(train_data['hypothesis']), show_progress_bar=True)
test_p = encoder.encode(list(test_data['premise']), show_progress_bar=True)
test_h = encoder.encode(list(test_data['hypothesis']), show_progress_bar=True)

# Convert to torch tensors: float32 for embeddings, int64 (long) for labels,
# the target dtype nn.CrossEntropyLoss is used with later in this notebook.
train_p_t = torch.tensor(train_p, dtype=torch.float32)
train_h_t = torch.tensor(train_h, dtype=torch.float32)
test_p_t = torch.tensor(test_p, dtype=torch.float32)
test_h_t = torch.tensor(test_h, dtype=torch.float32)
train_labels_t = torch.tensor(train_labels, dtype=torch.long)
test_labels_t = torch.tensor(test_labels, dtype=torch.long)

print(f"‚úÖ Embeddings: {train_p.shape}")

---
## Approach 1: Learned Compression (16384 → 4096)
---

In [None]:
class LearnedCompressionModel(nn.Module):
    """Pair classifier that learns to compress the four-way feature vector.

    Pipeline: frozen random projection of both sentence embeddings ->
    concatenated pair features [p, h, p - h, p * h] (4 * hdc_dim wide) ->
    bias-free linear compressor (hdc_dim wide) -> ternary quantization ->
    MLP classifier.

    Args:
        semantic_dim: Width of the incoming sentence embeddings.
        hdc_dim: Width of the projected and of the compressed vectors.
        hidden_dim: Width of the first classifier layer; the second layer
            uses ``hidden_dim // 2``.
        num_classes: Number of output logits.
        dropout: Dropout probability used after each hidden layer.
    """

    def __init__(self, semantic_dim=768, hdc_dim=4096, hidden_dim=512,
                 num_classes=3, dropout=0.3):
        super().__init__()

        self.hdc_dim = hdc_dim

        # Frozen random projection whose columns are scaled to unit L2 norm.
        # Registered as a buffer: it is part of the module state but is not
        # a trainable parameter.
        raw_projection = torch.randn(semantic_dim, hdc_dim)
        column_norms = raw_projection.norm(dim=0, keepdim=True)
        self.register_buffer('projection', raw_projection / column_norms)

        # Trainable compression from the 4 * hdc_dim pair features to hdc_dim.
        self.compressor = nn.Linear(4 * hdc_dim, hdc_dim, bias=False)

        # Three-layer MLP head.
        half_hidden = hidden_dim // 2
        head_layers = [
            nn.Linear(hdc_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, half_hidden),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(half_hidden, num_classes),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def ternary_quantize(self, x, threshold_factor=0.3):
        """Quantize ``x`` towards {-1, 0, +1}.

        In training mode a smooth surrogate ``tanh(3 * x)`` is returned so
        gradients can flow. In eval mode each row is hard-thresholded at
        ``threshold_factor`` times that row's standard deviation.
        """
        if self.training:
            return torch.tanh(x * 3)

        thr = threshold_factor * x.std(dim=1, keepdim=True)
        # Start from zeros, mark negatives first, then positives, so the
        # positive test takes priority exactly like a nested torch.where.
        quantized = torch.zeros_like(x).masked_fill(x < -thr, -1.0)
        return quantized.masked_fill(x > thr, 1.0)

    def forward(self, p_emb, h_emb, return_hdc=False):
        """Return class logits, or the quantized vector if ``return_hdc``."""
        # Project both sentences with the same frozen matrix.
        p_hdc, h_hdc = (emb @ self.projection for emb in (p_emb, h_emb))

        # Pair features: both vectors, their difference and their product.
        pair_features = torch.cat(
            [p_hdc, h_hdc, p_hdc - h_hdc, p_hdc * h_hdc], dim=1
        )

        hdc_vec = self.ternary_quantize(self.compressor(pair_features))
        return hdc_vec if return_hdc else self.classifier(hdc_vec)

print("‚úÖ LearnedCompressionModel defined")

---
## Approach 2: Segmented Hypervectors
---

In [None]:
class SegmentedHDCModel(nn.Module):
    """Pair classifier over a segmented vector [P_block | H_block | Interaction_block].

    Premise and hypothesis each occupy their own slice of the output vector,
    so they are never summed into the same coordinates. The remaining
    coordinates hold an interaction block computed from the raw embeddings.

    Args:
        semantic_dim: Width of the incoming sentence embeddings.
        total_dim: Width of the full segmented vector.
        hidden_dim: Width of the first classifier layer; the second layer
            uses ``hidden_dim // 2``.
        num_classes: Number of output logits.
        dropout: Dropout probability used after each hidden layer.
        segment_dim: Width of the premise block and of the hypothesis block.
            Both blocks are produced by the same projection matrix, so they
            always share one width. Defaults to 1536.

    Raises:
        ValueError: If ``segment_dim`` is not positive or if ``total_dim``
            leaves no room for the interaction block.
    """

    def __init__(self, semantic_dim=768, total_dim=4096, hidden_dim=512,
                 num_classes=3, dropout=0.3, segment_dim=1536):
        super().__init__()

        interaction_dim = total_dim - 2 * segment_dim
        if segment_dim <= 0 or interaction_dim <= 0:
            # Fail early with a readable message instead of an empty
            # interaction block or a negative-dimension error from torch.
            raise ValueError(
                f"total_dim ({total_dim}) must exceed 2 * segment_dim "
                f"({2 * segment_dim}) with segment_dim > 0 so that the "
                f"interaction block is non-empty"
            )

        # Segment sizes
        self.p_dim = segment_dim
        self.h_dim = segment_dim
        self.i_dim = interaction_dim
        self.total_dim = total_dim

        # One frozen projection shared by P and H (for comparability);
        # columns are scaled to unit L2 norm.
        proj_main = torch.randn(semantic_dim, self.p_dim)
        proj_main = proj_main / proj_main.norm(dim=0, keepdim=True)
        self.register_buffer('proj_main', proj_main)

        # Separate frozen projection for the interaction block.
        proj_inter = torch.randn(semantic_dim, self.i_dim)
        proj_inter = proj_inter / proj_inter.norm(dim=0, keepdim=True)
        self.register_buffer('proj_inter', proj_inter)

        # Classifier
        self.classifier = nn.Sequential(
            nn.Linear(total_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim // 2, num_classes)
        )

    def ternary_quantize(self, x, threshold_factor=0.3):
        """Quantize ``x`` towards {-1, 0, +1}.

        Training mode returns the smooth surrogate ``tanh(3 * x)``; eval mode
        hard-thresholds each row at ``threshold_factor`` times its standard
        deviation.
        """
        if self.training:
            return torch.tanh(x * 3)
        else:
            std = x.std(dim=1, keepdim=True)
            thr = threshold_factor * std
            return torch.where(x > thr, torch.ones_like(x),
                              torch.where(x < -thr, -torch.ones_like(x),
                                         torch.zeros_like(x)))

    def forward(self, p_emb, h_emb, return_hdc=False):
        """Return class logits, or the quantized vector if ``return_hdc``."""
        # Block 1: Premise
        p_block = p_emb @ self.proj_main

        # Block 2: Hypothesis (same projection as the premise)
        h_block = h_emb @ self.proj_main

        # Block 3: Interaction (computed in semantic space, then projected)
        diff_emb = p_emb - h_emb
        prod_emb = p_emb * h_emb
        i_block = (diff_emb + prod_emb) @ self.proj_inter

        # Concatenate segments into one total_dim-wide vector
        full_vec = torch.cat([p_block, h_block, i_block], dim=1)

        # Ternary quantization
        hdc_vec = self.ternary_quantize(full_vec)

        if return_hdc:
            return hdc_vec

        # Classify
        logits = self.classifier(hdc_vec)
        return logits

print("‚úÖ SegmentedHDCModel defined")

---
## Approach 3: Role-Filler Binding
---

In [None]:
class RoleFillerModel(nn.Module):
    """Pair classifier using role-filler binding (HDC/VSA style).

    The pair vector is ``P * R_prem + H * R_hyp + (P - H)``, where ``P`` and
    ``H`` are the randomly projected sentence embeddings and ``R_prem`` /
    ``R_hyp`` are fixed sparse ternary role vectors. The result is ternary
    quantized and fed to an MLP classifier.

    Args:
        semantic_dim: Width of the incoming sentence embeddings.
        hdc_dim: Width of the projected and of the bound vectors.
        hidden_dim: Width of the first classifier layer; the second layer
            uses ``hidden_dim // 2``.
        num_classes: Number of output logits.
        dropout: Dropout probability used after each hidden layer.
    """

    # Seed of the private generator that produces the role vectors.
    _ROLE_SEED = 42

    def __init__(self, semantic_dim=768, hdc_dim=4096, hidden_dim=512,
                 num_classes=3, dropout=0.3):
        super().__init__()

        self.hdc_dim = hdc_dim

        # Frozen random projection with unit-norm columns.
        projection = torch.randn(semantic_dim, hdc_dim)
        projection = projection / projection.norm(dim=0, keepdim=True)
        self.register_buffer('projection', projection)

        # Role vectors (fixed, random ternary). They are drawn from a private
        # generator rather than by reseeding the global RNG: a global
        # torch.manual_seed() here would also pin the classifier
        # initialisation below and reset every later random draw in the
        # process (data shuffling, dropout, subsequently built models).
        role_rng = torch.Generator().manual_seed(self._ROLE_SEED)
        self.register_buffer(
            'r_prem', self._sparse_ternary(hdc_dim, role_rng))
        self.register_buffer(
            'r_hyp', self._sparse_ternary(hdc_dim, role_rng))

        # Classifier
        self.classifier = nn.Sequential(
            nn.Linear(hdc_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim // 2, num_classes)
        )

    @staticmethod
    def _sparse_ternary(dim, generator):
        """Draw a sparse ternary vector: ~25% -1, ~50% 0, ~25% +1."""
        draws = torch.rand(dim, generator=generator)
        vec = torch.zeros(dim)
        vec[draws < 0.25] = -1
        vec[draws > 0.75] = 1
        return vec

    def ternary_quantize(self, x, threshold_factor=0.3):
        """Quantize ``x`` towards {-1, 0, +1}.

        Training mode returns the smooth surrogate ``tanh(3 * x)``; eval mode
        hard-thresholds each row at ``threshold_factor`` times its standard
        deviation.
        """
        if self.training:
            return torch.tanh(x * 3)
        else:
            std = x.std(dim=1, keepdim=True)
            thr = threshold_factor * std
            return torch.where(x > thr, torch.ones_like(x),
                              torch.where(x < -thr, -torch.ones_like(x),
                                         torch.zeros_like(x)))

    def forward(self, p_emb, h_emb, return_hdc=False):
        """Return class logits, or the quantized vector if ``return_hdc``."""
        # Project to HDC
        p_hdc = p_emb @ self.projection
        h_hdc = h_emb @ self.projection

        # Bind with role vectors (element-wise product)
        p_bound = p_hdc * self.r_prem
        h_bound = h_hdc * self.r_hyp

        # Bundle + difference for directionality
        diff = p_hdc - h_hdc
        pair_hv = p_bound + h_bound + diff

        # Ternary quantization
        hdc_vec = self.ternary_quantize(pair_hv)

        if return_hdc:
            return hdc_vec

        # Classify
        logits = self.classifier(hdc_vec)
        return logits

print("‚úÖ RoleFillerModel defined")

---
## Baseline: Two-Vector (16384d) for comparison
---

In [None]:
class TwoVectorBaseline(nn.Module):
    """Uncompressed Two-Vector baseline (from Phase 3).

    Both sentence embeddings are mapped through one frozen random projection;
    the classifier then sees the quantized concatenation
    [p, h, p - h, p * h], which is ``4 * hdc_dim`` wide.

    Args:
        semantic_dim: Width of the incoming sentence embeddings.
        hdc_dim: Width of each projected vector.
        hidden_dim: Width of the first classifier layer; the second layer
            uses ``hidden_dim // 2``.
        num_classes: Number of output logits.
        dropout: Dropout probability used after each hidden layer.
    """

    def __init__(self, semantic_dim=768, hdc_dim=4096, hidden_dim=1024,
                 num_classes=3, dropout=0.3):
        super().__init__()

        self.hdc_dim = hdc_dim

        # Frozen random projection, columns scaled to unit L2 norm.
        random_matrix = torch.randn(semantic_dim, hdc_dim)
        self.register_buffer(
            'projection',
            random_matrix / random_matrix.norm(dim=0, keepdim=True),
        )

        # MLP head sized for the 4 * hdc_dim wide input.
        feature_dim = 4 * hdc_dim
        bottleneck = hidden_dim // 2
        self.classifier = nn.Sequential(*[
            nn.Linear(feature_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, bottleneck),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(bottleneck, num_classes),
        ])

    def ternary_quantize(self, x, threshold_factor=0.3):
        """Quantize ``x`` towards {-1, 0, +1}.

        Training mode: smooth surrogate ``tanh(3 * x)``. Eval mode: per-row
        hard threshold at ``threshold_factor`` times the row's standard
        deviation.
        """
        if self.training:
            return torch.tanh(x * 3)

        cutoff = threshold_factor * x.std(dim=1, keepdim=True)
        # Negatives are filled first and positives last so that the
        # positive test wins, as in a nested torch.where.
        hard = torch.zeros_like(x).masked_fill(x < -cutoff, -1.0)
        return hard.masked_fill(x > cutoff, 1.0)

    def forward(self, p_emb, h_emb, return_hdc=False):
        """Return class logits, or the quantized features if ``return_hdc``."""
        premise_hv = p_emb @ self.projection
        hypothesis_hv = h_emb @ self.projection

        stacked = torch.cat(
            [
                premise_hv,
                hypothesis_hv,
                premise_hv - hypothesis_hv,
                premise_hv * hypothesis_hv,
            ],
            dim=1,
        )
        hdc_vec = self.ternary_quantize(stacked)

        return hdc_vec if return_hdc else self.classifier(hdc_vec)

print("‚úÖ TwoVectorBaseline defined")

In [None]:
# Dataset
class PairDataset(Dataset):
    """Index-aligned view over premise embeddings, hypothesis embeddings and labels.

    Item ``i`` is the tuple ``(p_emb[i], h_emb[i], labels[i])``; the dataset
    length is ``len(labels)``.
    """

    def __init__(self, p_emb, h_emb, labels):
        self.p_emb, self.h_emb, self.labels = p_emb, h_emb, labels

    def __len__(self):
        # The label container defines how many pairs there are.
        return len(self.labels)

    def __getitem__(self, idx):
        premise = self.p_emb[idx]
        hypothesis = self.h_emb[idx]
        label = self.labels[idx]
        return premise, hypothesis, label

# Wrap the precomputed embedding/label tensors in datasets.
train_dataset = PairDataset(train_p_t, train_h_t, train_labels_t)
test_dataset = PairDataset(test_p_t, test_h_t, test_labels_t)

# Training batches are reshuffled every epoch; test batches keep a fixed order.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

print("‚úÖ DataLoaders ready")

In [None]:
def train_model(model, train_loader, test_loader, num_epochs=30, lr=1e-3):
    """Train ``model`` and evaluate it on ``test_loader`` after every epoch.

    Uses cross-entropy loss, Adam, and a cosine-annealed learning rate that
    is stepped once per epoch. Every tenth epoch the current test accuracy
    is printed.

    Args:
        model: Module called as ``model(p_emb, h_emb)`` and returning logits.
        train_loader: Iterable of ``(p_emb, h_emb, labels)`` training batches.
        test_loader: Iterable of ``(p_emb, h_emb, labels)`` evaluation batches.
        num_epochs: Number of passes over ``train_loader``.
        lr: Initial learning rate for Adam.

    Returns:
        ``(best_acc, history)``: the highest per-epoch test accuracy seen and
        the list of test accuracies, one entry per epoch.
    """
    model = model.to(device)
    loss_fn = nn.CrossEntropyLoss()
    adam = optim.Adam(model.parameters(), lr=lr)
    lr_schedule = optim.lr_scheduler.CosineAnnealingLR(adam, T_max=num_epochs)

    history = []
    best_acc = 0

    for epoch in range(num_epochs):
        # ---- optimisation pass ----
        model.train()
        for premise, hypothesis, target in train_loader:
            premise, hypothesis, target = (
                batch.to(device) for batch in (premise, hypothesis, target)
            )

            adam.zero_grad()
            batch_loss = loss_fn(model(premise, hypothesis), target)
            batch_loss.backward()
            adam.step()

        lr_schedule.step()

        # ---- evaluation pass ----
        model.eval()
        predicted, expected = [], []
        with torch.no_grad():
            for premise, hypothesis, target in test_loader:
                scores = model(premise.to(device), hypothesis.to(device))
                predicted.extend(torch.argmax(scores, dim=1).cpu().numpy())
                # Targets are never moved to the device, so they can be
                # converted to NumPy directly.
                expected.extend(target.numpy())

        acc = accuracy_score(expected, predicted)
        history.append(acc)
        best_acc = max(best_acc, acc)

        if (epoch + 1) % 10 == 0:
            print(f"   Epoch {epoch+1}: {acc:.1%}")

    return best_acc, history

In [None]:
# Train and evaluate every approach in turn, collecting results by name.
print("\n" + "="*60)
print("üî¨ RUNNING PHASE 4 EXPERIMENTS")
print("="*60)

# name -> {'accuracy': best test accuracy, 'dim': int, 'history': per-epoch accuracies}
results = {}

# Models to test: name -> (model class, dimensionality used for reporting).
# The dimensionality is only a label here; it is not passed to the constructor.
models = {
    '0_baseline_16384d': (TwoVectorBaseline, 16384),
    '1_learned_compression_4096d': (LearnedCompressionModel, 4096),
    '2_segmented_4096d': (SegmentedHDCModel, 4096),
    '3_role_filler_4096d': (RoleFillerModel, 4096),
}

for name, (ModelClass, dim) in models.items():
    print(f"\nüìä Testing: {name} ({dim}d)")
    
    # Every model is built with its constructor defaults.
    model = ModelClass()
    # `acc` is the best per-epoch test accuracy returned by train_model.
    acc, history = train_model(model, train_loader, test_loader, num_epochs=30)
    
    results[name] = {
        'accuracy': acc,
        'dim': dim,
        'history': history
    }
    
    print(f"   ‚úÖ Best: {acc:.1%}")
    
    # Drop the model and release cached GPU memory before the next run.
    del model
    torch.cuda.empty_cache()

In [None]:
# Print a table of all approaches, best accuracy first, with each row's
# difference to the 16384d baseline in percentage points.
print("\n" + "="*60)
print("üìä RESULTS SUMMARY")
print("="*60)

baseline_acc = results['0_baseline_16384d']['accuracy']
phase2_best = 0.638  # From Phase 2 (hard-coded reference value)

print(f"\n{'Approach':<35} {'Dim':<8} {'Accuracy':<10} {'vs Baseline'}")
print("-" * 65)

# (name, data) pairs ordered by descending accuracy; reused by later cells.
sorted_results = sorted(results.items(), key=lambda x: x[1]['accuracy'], reverse=True)

for name, data in sorted_results:
    diff = (data['accuracy'] - baseline_acc) * 100
    # Every row that ties the top accuracy gets the marker.
    marker = "üèÜ" if data['accuracy'] == sorted_results[0][1]['accuracy'] else "  "
    print(f"{marker} {name:<33} {data['dim']:<8} {data['accuracy']:.1%}      {diff:+.1f}%")

print(f"\nüìà Phase 2 best (for reference): {phase2_best:.1%}")

In [None]:
# Key analysis: pick the overall winner and the best 4096d approach, then
# derive a verdict string that is stored with the saved results.
best_name = sorted_results[0][0]
best_acc = sorted_results[0][1]['accuracy']
best_dim = sorted_results[0][1]['dim']

# Find best 4096d approach as a (name, data) pair.
best_4096 = max([(n, d) for n, d in results.items() if d['dim'] == 4096],
                key=lambda x: x[1]['accuracy'])

print("\n" + "="*60)
print("üéØ KEY FINDINGS")
print("="*60)

print(f"\nüèÜ Overall best: {best_name}")
print(f"   Accuracy: {best_acc:.1%}, Dim: {best_dim}")

print(f"\nüéØ Best 4096d approach: {best_4096[0]}")
print(f"   Accuracy: {best_4096[1]['accuracy']:.1%}")
print(f"   vs 16384d baseline: {(best_4096[1]['accuracy'] - baseline_acc)*100:+.1f}%")

# Verdict thresholds:
#   success - best 4096d accuracy is within 0.02 (absolute) of the baseline
#   partial - otherwise, at least as good as the Phase 2 reference
#   else    - neither of the above
if best_4096[1]['accuracy'] >= baseline_acc - 0.02:
    print(f"\n‚úÖ SUCCESS: 4096d achieves similar accuracy to 16384d!")
    print(f"   Can use 4x smaller vectors for protocol.")
    verdict = "COMPRESSION_SUCCESS"
elif best_4096[1]['accuracy'] >= phase2_best:
    print(f"\nüìà PARTIAL: 4096d better than Phase 2, but below 16384d baseline")
    verdict = "PARTIAL_SUCCESS"
else:
    print(f"\n‚ö†Ô∏è 4096d approaches underperform")
    verdict = "NEEDS_MORE_WORK"

In [None]:
# Visualize: a bar chart of best accuracies (left) and per-epoch test
# accuracy curves (right); the figure is written to a PNG and then shown.
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Bar chart, in the same best-first order as the summary table.
ax = axes[0]
# Underscores become newlines so the long names wrap under the bars.
names = [n.replace('_', '\n') for n, _ in sorted_results]
accs = [d['accuracy'] for _, d in sorted_results]
dims = [d['dim'] for _, d in sorted_results]
# 4096d approaches in green, everything else in blue.
colors = ['lightgreen' if d == 4096 else 'lightblue' for d in dims]

bars = ax.bar(names, accs, color=colors, edgecolor='black')
# Reference lines: 16384d baseline, Phase 2 best, and a faint line at 0.33.
ax.axhline(y=baseline_acc, color='blue', linestyle='--', alpha=0.7, 
           label=f'16384d baseline ({baseline_acc:.1%})')
ax.axhline(y=phase2_best, color='red', linestyle=':', alpha=0.7,
           label=f'Phase 2 best ({phase2_best:.1%})')
ax.axhline(y=0.33, color='gray', linestyle=':', alpha=0.3)
ax.set_ylabel('Accuracy')
ax.set_title('Phase 4: Compression Approaches')
ax.legend(loc='lower right')
ax.set_ylim(0.3, max(accs) + 0.05)

# Annotate each bar with its accuracy and dimensionality.
for bar, acc, dim in zip(bars, accs, dims):
    ax.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
            f'{acc:.1%}\n({dim}d)', ha='center', fontsize=9, fontweight='bold')

# Learning curves
ax = axes[1]
for name, data in results.items():
    # Legend label is the second underscore-separated token of the name.
    short_name = name.split('_')[1] if '_' in name else name
    ax.plot(data['history'], label=f"{short_name}: {data['accuracy']:.1%}", linewidth=2)

ax.axhline(y=baseline_acc, color='blue', linestyle='--', alpha=0.5)
ax.set_xlabel('Epoch')
ax.set_ylabel('Test Accuracy')
ax.set_title('Learning Curves')
ax.legend(loc='lower right')
ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig('phase4_compression_results.png', dpi=150, bbox_inches='tight')
plt.show()

In [None]:
# Save results: assemble a JSON summary (per-approach accuracy and dim only;
# the per-epoch histories are not written), write it to disk and echo it.
output = {
    'experiment': 'Phase 4: Learned Compression & Segmented',
    'dataset': 'MNLI',
    'encoder': ENCODER_NAME,
    'train_size': TRAIN_SIZE,
    'test_size': TEST_SIZE,
    'results': {k: {'accuracy': v['accuracy'], 'dim': v['dim']} 
                for k, v in results.items()},
    'best_overall': best_name,
    'best_4096d': best_4096[0],
    'best_4096d_accuracy': best_4096[1]['accuracy'],
    'baseline_16384d': baseline_acc,
    # Positive when the best 4096d approach is below the baseline.
    'compression_loss': baseline_acc - best_4096[1]['accuracy'],
    'verdict': verdict,
    'timestamp': datetime.now().isoformat()
}

with open('phase4_compression_results.json', 'w') as f:
    json.dump(output, f, indent=2)

print("\n‚úÖ Results saved!")
print(json.dumps(output, indent=2))

In [None]:
# Print a human-readable wrap-up built from the values computed in the
# analysis cells (baseline_acc and best_4096).
print("\n" + "="*60)
print("üìã CONCLUSIONS & NEXT STEPS")
print("="*60)

print(f"""
Phase 4 Results:
- Baseline (16384d): {baseline_acc:.1%}
- Best 4096d: {best_4096[1]['accuracy']:.1%} ({best_4096[0]})
- Compression loss: {(baseline_acc - best_4096[1]['accuracy'])*100:.1f}%

For Resonance Protocol:
- If 4096d is acceptable: Use {best_4096[0]}
- If need maximum accuracy: Use 16384d Two-Vector
- Trade-off: {(baseline_acc - best_4096[1]['accuracy'])*100:.1f}% accuracy for 4x compression

Remaining options to try:
1. Learned sentence‚ÜíHDC projection (W_sent)
2. Larger HDC dim (8192d) as middle ground
3. More training data
4. Fine-tune NLI encoder for HDC
""")

In [None]:
from google.colab import files
files.download('phase4_compression_results.json')
files.download('phase4_compression_results.png')