# Laminet Training with Field-Based Text Generation

This notebook implements the Laminet (Lamina Networks) architecture with a field-based text generation component. Unlike the previous version which only retrieved responses, this model generates new text using field evolution principles without relying on transformer-based language models.

## Setup and Environment

In [None]:
# Install required dependencies
!pip install -q torch torchvision matplotlib numpy tqdm scikit-learn ipywidgets transformers

In [None]:
import os
import json
import time
import torch
import numpy as np
import matplotlib.pyplot as plt
from torch import nn, optim
from torch.utils.data import Dataset, DataLoader
from torch.nn import functional as F
from transformers import AutoTokenizer, AutoModel, BertTokenizer
from tqdm.notebook import tqdm
from matplotlib.animation import FuncAnimation
from IPython.display import HTML, display
from sklearn.manifold import TSNE
import random
import math
from torch.cuda.amp import autocast, GradScaler  # For mixed precision training

# Set random seeds for reproducibility
torch.manual_seed(42)
np.random.seed(42)
random.seed(42)

# Check if GPU is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"Memory Allocated: {torch.cuda.memory_allocated(0)/1024**2:.2f} MB")
    print(f"Memory Cached: {torch.cuda.memory_reserved(0)/1024**2:.2f} MB")

## Laminet Base Architecture

First, we'll implement the core Laminet components that evolve field points under semantic forces.

In [None]:
class FieldPoint(nn.Module):
    """Represents a point in the semantic field with position, velocity, mass, and charge."""
    def __init__(self, embed_dim, init_position=None, init_mass=1.0, init_charge=0.0, init_decay=0.1):
        super().__init__()
        # Initialize position or use provided position
        if init_position is not None:
            self.position = nn.Parameter(init_position.clone().detach())
        else:
            self.position = nn.Parameter(torch.randn(embed_dim) * 0.02)
            
        # Initialize velocity with zeros
        self.velocity = nn.Parameter(torch.zeros(embed_dim))
        
        # Mass and charge parameters
        self.log_mass = nn.Parameter(torch.tensor(math.log(init_mass)))
        self.charge = nn.Parameter(torch.tensor(init_charge))
        self.decay = nn.Parameter(torch.tensor(init_decay))
        
    @property
    def mass(self):
        # Mass is always positive
        return torch.exp(self.log_mass)
    
    def reset_velocity(self):
        # Reset velocity to zero (useful between batches)
        with torch.no_grad():
            self.velocity.zero_()
            
    def __repr__(self):
        return f"FieldPoint(pos={self.position.norm():.2f}, vel={self.velocity.norm():.2f}, mass={self.mass.item():.2f}, charge={self.charge.item():.2f})"

In [None]:
class EvolutionEngine(nn.Module):
    """Evolves field points based on semantic forces."""
    def __init__(self, epsilon=1e-6, min_distance=0.1, max_force=10.0):
        super().__init__()
        self.epsilon = epsilon  # Prevent division by zero
        self.min_distance = min_distance  # Minimum distance to prevent excessive forces
        self.max_force = max_force  # Maximum force magnitude
        
    def compute_forces(self, positions, charges, masses):
        """Optimized force computation with batch operations."""
        n_points = positions.shape[0]
        
        # Vectorized operations for pairwise calculations
        # Compute all distances at once
        diffs = positions.unsqueeze(0) - positions.unsqueeze(1)  # [n, n, dim]
        squared_dists = torch.sum(diffs**2, dim=-1)  # [n, n]
        squared_dists = torch.clamp(squared_dists, min=self.min_distance**2) + self.epsilon
        
        # Compute charge products efficiently
        charge_prods = charges.unsqueeze(0) * charges.unsqueeze(1)  # [n, n]
        
        # Compute force magnitudes
        force_mags = charge_prods / squared_dists  # [n, n]
        force_mags = torch.clamp(force_mags, min=-self.max_force, max=self.max_force)
        
        # Mask out self-interactions
        mask = 1.0 - torch.eye(n_points, device=positions.device)
        force_mags = force_mags * mask
        
        # Normalize directions and compute forces
        dist = torch.sqrt(squared_dists).unsqueeze(-1)  # [n, n, 1]
        norm_diffs = diffs / (dist + self.epsilon)  # [n, n, dim]
        
        # Apply forces
        forces = torch.sum(norm_diffs * force_mags.unsqueeze(-1), dim=1)  # [n, dim]
        
        return forces
        
    def forward(self, field_points, delta_t=0.1, steps=5):
        """Evolve field points over time."""
        # Extract field point properties
        positions = torch.stack([p.position for p in field_points])
        velocities = torch.stack([p.velocity for p in field_points])
        masses = torch.stack([p.mass for p in field_points])
        charges = torch.stack([p.charge for p in field_points])
        decays = torch.stack([p.decay for p in field_points])
        
        # Store evolution history for visualization
        position_history = [positions.clone().detach()]
        
        # Evolve the field for multiple steps
        for step in range(steps):
            # Compute forces
            forces = self.compute_forces(positions, charges, masses)
            
            # Update velocities (F = ma -> a = F/m)
            accelerations = forces / masses.unsqueeze(1)
            
            # Apply velocity decay (damping)
            velocity_decay = (1.0 - decays * delta_t).unsqueeze(1)
            velocities = velocity_decay * velocities + accelerations * delta_t
            
            # Update positions
            positions = positions + velocities * delta_t
            
            # Store position history
            position_history.append(positions.clone().detach())
        
        # Update field points with new positions and velocities
        for i, point in enumerate(field_points):
            point.position.data = positions[i].data
            point.velocity.data = velocities[i].data
        
        # Calculate potential energy of the system (simplified for speed)
        potential_energy = 0.0
        n_points = len(field_points)
        # Calculate potential energy for a subset of pairs
        sample_rate = 0.5  # Only calculate half of all pairs
        for i in range(n_points):
            for j in range(i+1, n_points):
                if random.random() < sample_rate:
                    dist = torch.norm(field_points[i].position - field_points[j].position)
                    potential_energy += (field_points[i].charge * field_points[j].charge) / (dist + self.epsilon)
        potential_energy = potential_energy / sample_rate  # Scale to account for sampling
        
        return position_history, potential_energy

## Field-Based Text Generator

Now we'll implement a text generator that uses field evolution principles to generate text, avoiding conventional transformer architectures.

In [None]:
class FieldBasedGenerator(nn.Module):
    """Generates text directly from field embeddings using field evolution principles."""
    def __init__(self, field_dim, vocab_size, max_seq_len=50, hidden_dim=64):
        super().__init__()
        self.field_dim = field_dim
        self.vocab_size = vocab_size
        self.max_seq_len = max_seq_len
        self.hidden_dim = hidden_dim
        
        # Special tokens
        self.pad_token_id = 0
        self.bos_token_id = 101  # [CLS] token in BERT
        self.eos_token_id = 102  # [SEP] token in BERT
        
        # Word embeddings representing semantic points in the field
        self.word_embeddings = nn.Embedding(vocab_size, field_dim)
        
        # Field-based sequence generator - create attractor points for the sequence field
        self.sequence_field = nn.ModuleList([
            FieldPoint(field_dim, init_charge=0.5) 
            for _ in range(hidden_dim)
        ])
        
        # Sequence evolution engine with smaller min_distance for finer control
        self.sequence_engine = EvolutionEngine(epsilon=1e-6, min_distance=0.05, max_force=5.0)
        
        # Output projection from field to vocabulary
        self.output_projection = nn.Sequential(
            nn.Linear(field_dim, field_dim * 2),
            nn.GELU(),
            nn.Linear(field_dim * 2, vocab_size)
        )
        
        # Sequence position encoder
        self.position_encoder = nn.Embedding(max_seq_len, field_dim)
        
    def forward(self, field_embedding, max_length=30, temperature=0.8, top_k=50):
        """Generate text from a field embedding."""
        batch_size = field_embedding.size(0)
        generated_ids = torch.full((batch_size, 1), self.bos_token_id, dtype=torch.long, device=field_embedding.device)
        
        # Create initial sequence point from field embedding
        sequence_points = []
        for b in range(batch_size):
            sequence_points.append(
                FieldPoint(
                    self.field_dim,
                    init_position=field_embedding[b],
                    init_mass=0.5,
                    init_charge=-1.0
                )
            )
        
        # Generate sequence token by token
        for i in range(max_length):
            new_sequence_points = []
            next_token_ids = []
            
            for b, sequence_point in enumerate(sequence_points):
                # Skip if already generated EOS
                if generated_ids[b, -1].item() == self.eos_token_id:
                    new_sequence_points.append(sequence_point)  # Keep as is
                    next_token_ids.append(self.pad_token_id)  # Add padding token
                    continue
                
                # Evolve sequence point with sequence field
                all_points = [sequence_point] + list(self.sequence_field)
                
                # Add position encoding
                pos_encoding = self.position_encoder(torch.tensor([i], device=device))[0]
                sequence_point.position.data = sequence_point.position.data + 0.1 * pos_encoding
                
                # Evolve the field
                position_history, _ = self.sequence_engine(all_points, steps=3)
                
                # Get evolved position
                evolved_position = position_history[-1][0]
                
                # Project to vocabulary
                logits = self.output_projection(evolved_position)
                
                # Apply temperature and sample
                logits = logits / temperature
                
                # Apply top-k sampling
                top_k_logits, top_k_indices = torch.topk(logits, top_k)
                probs = F.softmax(top_k_logits, dim=-1)
                
                # Sample from the top-k distribution
                top_k_idx = torch.multinomial(probs, 1).item()
                next_token_id = top_k_indices[top_k_idx].item()
                next_token_ids.append(next_token_id)
                
                # Get word embedding for the next token
                word_embedding = self.word_embeddings(torch.tensor([next_token_id], device=evolved_position.device))[0]
                
                # Create new sequence point combining evolved position and word embedding
                new_sequence_point = FieldPoint(
                    self.field_dim,
                    init_position=(evolved_position + word_embedding) / 2,  # Blend the two
                    init_mass=0.5,
                    init_charge=-1.0
                )
                
                new_sequence_points.append(new_sequence_point)
            
            # Update sequence points
            sequence_points = new_sequence_points
            
            # Add new tokens to generated_ids
            next_token_tensor = torch.tensor(next_token_ids, device=generated_ids.device).unsqueeze(1)
            generated_ids = torch.cat([generated_ids, next_token_tensor], dim=1)
            
            # Check if all sequences have EOS token
            if (generated_ids == self.eos_token_id).sum(dim=1).bool().all():
                break
        
        return generated_ids
    
    def train_step(self, field_embeddings, target_ids, teacher_forcing_ratio=0.5):
        """Train the generator on target sequences."""
        batch_size = field_embeddings.size(0)
        seq_len = target_ids.size(1)
        
        # Initialize loss
        loss = 0.0
        
        # Create initial sequence points from field embeddings
        sequence_points = []
        for b in range(batch_size):
            sequence_points.append(
                FieldPoint(
                    self.field_dim,
                    init_position=field_embeddings[b],
                    init_mass=0.5,
                    init_charge=-1.0
                )
            )
        
        # Loop through sequence
        for i in range(1, seq_len):  # Start from 1 to predict after BOS token
            new_sequence_points = []
            step_loss = 0.0
            
            for b, sequence_point in enumerate(sequence_points):
                # Evolve sequence point with sequence field
                all_points = [sequence_point] + list(self.sequence_field)
                
                # Add position encoding
                pos_encoding = self.position_encoder(torch.tensor([i-1], device=device))[0]
                sequence_point.position.data = sequence_point.position.data + 0.1 * pos_encoding
                
                # Evolve the field
                position_history, _ = self.sequence_engine(all_points, steps=3)
                
                # Get evolved position
                evolved_position = position_history[-1][0]
                
                # Project to vocabulary
                logits = self.output_projection(evolved_position)
                
                # Compute loss against target token
                target = target_ids[b, i]
                token_loss = F.cross_entropy(logits.unsqueeze(0), target.unsqueeze(0))
                step_loss += token_loss
                
                # Get next token (either from target for teacher forcing or from predicted token)
                use_teacher_forcing = random.random() < teacher_forcing_ratio
                
                if use_teacher_forcing:
                    next_token_id = target.item()
                else:
                    # Sample from predicted distribution
                    probs = F.softmax(logits, dim=-1)
                    next_token_id = torch.multinomial(probs, 1).item()
                
                # Get word embedding for the next token
                word_embedding = self.word_embeddings(torch.tensor([next_token_id], device=evolved_position.device))[0]
                
                # Create new sequence point combining evolved position and word embedding
                new_sequence_point = FieldPoint(
                    self.field_dim,
                    init_position=(evolved_position + word_embedding) / 2,
                    init_mass=0.5,
                    init_charge=-1.0
                )
                
                new_sequence_points.append(new_sequence_point)
            
            # Update sequence points
            sequence_points = new_sequence_points
            
            # Add step loss to total loss
            loss += step_loss / batch_size
        
        # Return average loss
        return loss / (seq_len - 1)

## Enhanced Laminet Model with Text Generation

Now we'll combine the base Laminet model with the field-based text generator.

In [None]:
class GenerativeLaminet(nn.Module):
    """Enhanced Laminet model with field-based text generation capabilities."""
    def __init__(self, 
                 encoder_model_name='sentence-transformers/all-MiniLM-L6-v2', 
                 field_dim=64,
                 num_attractor_points=20,
                 num_evolution_steps=5,
                 vocab_size=30522,  # BERT vocab size
                 delta_t=0.1):
        super().__init__()
        
        # Text tokenizer (BERT tokenizer)
        self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
        
        # Encoder - use a pretrained sentence transformer
        self.encoder_tokenizer = AutoTokenizer.from_pretrained(encoder_model_name)
        self.encoder = AutoModel.from_pretrained(encoder_model_name)
        
        # Get the encoder output dimension
        self.embed_dim = self.encoder.config.hidden_size
        
        # Project encoder output to field space if dimensions don't match
        self.field_dim = field_dim
        if self.embed_dim != self.field_dim:
            self.projector = nn.Linear(self.embed_dim, self.field_dim)
        else:
            self.projector = nn.Identity()
        
        # Memory field - attractor points in the field
        self.attractor_points = nn.ModuleList([
            FieldPoint(field_dim, init_charge=1.0) 
            for _ in range(num_attractor_points)
        ])
        
        # Evolution engine
        self.evolution_engine = EvolutionEngine()
        self.num_evolution_steps = num_evolution_steps
        self.delta_t = delta_t
        
        # Decoder - transforms evolved field back to embedding space
        self.decoder = nn.Sequential(
            nn.Linear(field_dim, field_dim*2),
            nn.LeakyReLU(),
            nn.Linear(field_dim*2, field_dim),
        )
        
        # Field-based text generator
        self.text_generator = FieldBasedGenerator(
            field_dim=field_dim,
            vocab_size=vocab_size,
            max_seq_len=50,
            hidden_dim=64
        )
        
        # Store the last field evolution for visualization
        self.last_field_history = None
        
    def encode_text(self, texts):
        """Encode texts to embeddings."""
        # Tokenize texts
        inputs = self.encoder_tokenizer(texts, padding=True, truncation=True, return_tensors="pt").to(device)
        
        # Get embeddings
        with torch.no_grad():
            outputs = self.encoder(**inputs)
            # Use CLS token or mean pooling
            embeddings = outputs.last_hidden_state[:, 0]  # CLS token
            # Project to field dimension if needed
            field_embeddings = self.projector(embeddings)
            
        return field_embeddings
    
    def create_query_point(self, embedding):
        """Create a query point from input embedding."""
        return FieldPoint(
            self.field_dim,
            init_position=embedding,
            init_mass=0.5,  # Lower mass to be more influenced by attractors
            init_charge=-1.0  # Opposite charge to be attracted to memory points
        )
    
    def evolve_field(self, query_point):
        """Evolve the field with query and attractor points."""
        # Combine query and attractor points
        all_points = [query_point] + list(self.attractor_points)
        
        # Evolve field
        position_history, potential_energy = self.evolution_engine(
            all_points, 
            delta_t=self.delta_t, 
            steps=self.num_evolution_steps
        )
        
        # Store history for visualization
        self.last_field_history = position_history
        
        # Return evolved query point position
        return query_point.position, potential_energy
    
    def forward(self, source_texts):
        """Process input texts through the Laminet model."""
        # Encode source texts
        source_embeddings = self.encode_text(source_texts)
        
        # Process each source embedding
        evolved_embeddings = []
        potential_energies = []
        
        for embedding in source_embeddings:
            # Create query point
            query_point = self.create_query_point(embedding)
            
            # Evolve field
            evolved_embedding, potential_energy = self.evolve_field(query_point)
            
            evolved_embeddings.append(evolved_embedding)
            potential_energies.append(potential_energy)
            
        # Stack evolved embeddings
        evolved_embeddings = torch.stack(evolved_embeddings)
        potential_energies = torch.stack(potential_energies)
        
        # Decode evolved embeddings
        decoded_embeddings = self.decoder(evolved_embeddings)
        
        return decoded_embeddings, potential_energies
    
    def generate_text(self, source_texts, max_length=30, temperature=0.8, top_k=50):
        """Generate text responses using field-based generation."""
        # Get evolved field embeddings
        evolved_embeddings, _ = self.forward(source_texts)
        
        # Use text generator to produce token IDs
        token_ids = self.text_generator(evolved_embeddings, max_length, temperature, top_k)
        
        # Convert token IDs to text
        texts = []
        for ids in token_ids:
            # Remove padding, BOS, and EOS tokens
            text = self.tokenizer.decode(ids, skip_special_tokens=True)
            texts.append(text)
            
        return texts

## Dataset Preparation

We'll prepare the dataset for training with the text generation component.

In [None]:
class LaminetDataset(Dataset):
    """Dataset for Laminet training with text generation."""
    def __init__(self, samples_path, tokenizer):
        """Initialize dataset from samples JSON file."""
        with open(samples_path, 'r') as f:
            self.samples = json.load(f)
        self.tokenizer = tokenizer
        print(f"Loaded {len(self.samples)} samples")
            
    def __len__(self):
        return len(self.samples)
    
    def __getitem__(self, idx):
        """Get a sample by index."""
        sample = self.samples[idx]
        
        # Tokenize target text for the text generator
        target_encoding = self.tokenizer(
            sample['target_text'],
            padding='max_length',
            truncation=True,
            max_length=50,
            return_tensors='pt'
        )
        
        return {
            'sample_id': sample['sample_id'],
            'source_text': sample['source_text'],
            'target_text': sample['target_text'],
            'source_space': sample['source_space'],
            'source_concept': sample['source_concept'],
            'target_space': sample['target_space'],
            'target_concept': sample['target_concept'],
            'transition_pattern': sample['transition_pattern'],
            'target_input_ids': target_encoding['input_ids'][0],
            'target_attention_mask': target_encoding['attention_mask'][0]
        }
    
    def get_spaces_and_concepts(self):
        """Get unique spaces and concepts for visualization."""
        spaces = set()
        concepts = {}
        
        for sample in self.samples:
            spaces.add(sample['source_space'])
            spaces.add(sample['target_space'])
            
            source_space = sample['source_space']
            target_space = sample['target_space']
            source_concept = sample['source_concept']
            target_concept = sample['target_concept']
            
            if source_space not in concepts:
                concepts[source_space] = set()
            if target_space not in concepts:
                concepts[target_space] = set()
                
            concepts[source_space].add(source_concept)
            concepts[target_space].add(target_concept)
            
        return spaces, concepts

In [None]:
# Initialize model
model = GenerativeLaminet(
    encoder_model_name='sentence-transformers/all-MiniLM-L6-v2',
    field_dim=64,
    num_attractor_points=20,
    num_evolution_steps=5
).to(device)

# Upload dataset to Colab if needed
from google.colab import files
import os

# Check if dataset exists
dataset_path = '/content/laminet_samples_10k.json'

if not os.path.exists(dataset_path):
    print("Please upload the dataset file:")
    uploaded = files.upload()
    dataset_path = list(uploaded.keys())[0]
    # If it's uploaded to a different path, move it to the expected path
    if dataset_path != 'laminet_samples_10k.json':
        !mv "{dataset_path}" "/content/laminet_samples_10k.json"
        dataset_path = '/content/laminet_samples_10k.json'
    print(f"Dataset uploaded to {dataset_path}")
else:
    print(f"Dataset already exists at {dataset_path}")

In [None]:
# Load the dataset
dataset = LaminetDataset(dataset_path, model.tokenizer)

# Split into train and validation sets (90/10 split)
train_size = int(0.9 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])

print(f"Training samples: {len(train_dataset)}")
print(f"Validation samples: {len(val_dataset)}")

# Create data loaders with larger batch size
batch_size = 32  # Slightly smaller due to higher memory requirements of generator
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=batch_size, num_workers=2)

## Training with Text Generation

We'll train the model with both field evolution and text generation objectives.

In [None]:
# Define loss functions
cosine_loss = nn.CosineEmbeddingLoss()
mse_loss = nn.MSELoss()

# Define optimizer with learning rate scheduler
learning_rate = 1e-3
optimizer = optim.AdamW(
    [{'params': model.parameters(), 'lr': learning_rate}],
    weight_decay=1e-4
)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, 
    mode='min', 
    factor=0.5, 
    patience=1,
    verbose=True
)

# Initialize grad scaler for mixed precision training
scaler = GradScaler()

In [None]:
def train_epoch(model, train_loader, optimizer, epoch, scaler):
    """Train for one epoch with mixed precision."""
    model.train()
    total_loss = 0
    total_field_loss = 0
    total_gen_loss = 0
    total_samples = 0
    
    progress_bar = tqdm(train_loader, desc=f"Epoch {epoch}")
    
    for batch in progress_bar:
        # Get source and target texts
        source_texts = batch['source_text']
        target_texts = batch['target_text']
        target_input_ids = batch['target_input_ids'].to(device)
        
        # Reset optimizer
        optimizer.zero_grad(set_to_none=True)  # More efficient than zero_grad()
        
        # Forward pass with mixed precision
        with autocast():
            # Get evolved embeddings
            source_evolved, potential_energy = model(source_texts)
            
            # Get target embeddings
            with torch.no_grad():
                target_embeddings = model.encode_text(target_texts)
            
            # Compute cosine similarity loss for field evolution
            target_ones = torch.ones(source_evolved.size(0)).to(device)
            field_loss = cosine_loss(source_evolved, target_embeddings, target_ones)
            
            # Compute field coherence loss (regularization)
            coherence_loss = torch.mean(potential_energy)
            
            # Compute text generation loss
            gen_loss = model.text_generator.train_step(source_evolved, target_input_ids)
            
            # Total loss (weighted sum)
            loss = field_loss + 0.1 * coherence_loss + 0.5 * gen_loss
        
        # Backward pass with gradient scaling
        scaler.scale(loss).backward()
        
        # Unscale before gradient clipping
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        
        # Optimizer step with gradient scaling
        scaler.step(optimizer)
        scaler.update()
        
        # Update statistics
        total_loss += loss.item() * len(source_texts)
        total_field_loss += field_loss.item() * len(source_texts)
        total_gen_loss += gen_loss.item() * len(source_texts)
        total_samples += len(source_texts)
        
        # Update progress bar
        avg_loss = total_loss / total_samples
        avg_field_loss = total_field_loss / total_samples
        avg_gen_loss = total_gen_loss / total_samples
        
        progress_bar.set_postfix({
            'loss': f"{avg_loss:.4f}", 
            'field_loss': f"{avg_field_loss:.4f}", 
            'gen_loss': f"{avg_gen_loss:.4f}"
        })
        
    return total_loss / total_samples, total_field_loss / total_samples, total_gen_loss / total_samples

def validate(model, val_loader, epoch):
    """Validate the model."""
    model.eval()
    total_loss = 0
    total_field_loss = 0
    total_gen_loss = 0
    total_samples = 0
    
    progress_bar = tqdm(val_loader, desc=f"Validation {epoch}")
    
    with torch.no_grad():
        for batch in progress_bar:
            # Get source and target texts
            source_texts = batch['source_text']
            target_texts = batch['target_text']
            target_input_ids = batch['target_input_ids'].to(device)
            
            # Get evolved embeddings
            source_evolved, potential_energy = model(source_texts)
            
            # Get target embeddings
            target_embeddings = model.encode_text(target_texts)
            
            # Compute losses
            target_ones = torch.ones(source_evolved.size(0)).to(device)
            field_loss = cosine_loss(source_evolved, target_embeddings, target_ones)
            
            # Simple estimate for generation loss (without updating model)
            logits = model.text_generator.output_projection(source_evolved)
            gen_loss = F.cross_entropy(logits, target_input_ids[:, 1])
            
            # Total loss
            loss = field_loss + 0.5 * gen_loss
            
            # Update statistics
            total_loss += loss.item() * len(source_texts)
            total_field_loss += field_loss.item() * len(source_texts)
            total_gen_loss += gen_loss.item() * len(source_texts)
            total_samples += len(source_texts)
            
            # Update progress bar
            avg_loss = total_loss / total_samples
            progress_bar.set_postfix({'val_loss': f"{avg_loss:.4f}"})
    
    return total_loss / total_samples, total_field_loss / total_samples, total_gen_loss / total_samples

In [None]:
# Create directory for checkpoints
os.makedirs('/content/checkpoints', exist_ok=True)

# Training loop
num_epochs = 8
best_val_loss = float('inf')

# Training history
train_losses = []
val_losses = []
field_losses = []
gen_losses = []

# Enable cuDNN benchmark for faster training
torch.backends.cudnn.benchmark = True

# Start timing
start_time = time.time()

for epoch in range(1, num_epochs + 1):
    # Train
    train_loss, train_field_loss, train_gen_loss = train_epoch(model, train_loader, optimizer, epoch, scaler)
    train_losses.append(train_loss)
    
    # Validate
    val_loss, val_field_loss, val_gen_loss = validate(model, val_loader, epoch)
    val_losses.append(val_loss)
    field_losses.append(val_field_loss)
    gen_losses.append(val_gen_loss)
    
    # Update learning rate scheduler
    scheduler.step(val_loss)
    
    # Save checkpoint if validation loss improved
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        checkpoint_path = f"/content/checkpoints/laminet_generative_epoch_{epoch}_loss_{val_loss:.4f}.pt"
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'scheduler_state_dict': scheduler.state_dict(),
            'train_loss': train_loss,
            'val_loss': val_loss,
        }, checkpoint_path)
        print(f"Saved checkpoint to {checkpoint_path}")
    
    # Report time elapsed
    elapsed = time.time() - start_time
    print(f"Epoch {epoch}: Train Loss = {train_loss:.4f}, Val Loss = {val_loss:.4f}, Time elapsed: {elapsed/60:.2f} minutes")
    
# Report total training time
total_time = time.time() - start_time
print(f"\nTotal training time: {total_time/60:.2f} minutes")

In [None]:
# Plot training and validation loss
plt.figure(figsize=(12, 6))

plt.subplot(1, 2, 1)
plt.plot(range(1, num_epochs + 1), train_losses, label='Train Loss')
plt.plot(range(1, num_epochs + 1), val_losses, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Total Training and Validation Loss')
plt.legend()
plt.grid(True)

plt.subplot(1, 2, 2)
plt.plot(range(1, num_epochs + 1), field_losses, label='Field Loss')
plt.plot(range(1, num_epochs + 1), gen_losses, label='Generation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Component Losses')
plt.legend()
plt.grid(True)

plt.tight_layout()
plt.savefig('/content/loss_curve_generative.png')
plt.show()

## Save Final Model

In [None]:
# Save the final model
final_model_path = "/content/laminet_generative_final.pt"
torch.save({
    'model_state_dict': model.state_dict(),
    'field_dim': model.field_dim,
    'num_attractor_points': len(model.attractor_points),
    'num_evolution_steps': model.num_evolution_steps,
    'encoder_model_name': 'sentence-transformers/all-MiniLM-L6-v2',
}, final_model_path)
print(f"Saved final model to {final_model_path}")

## Testing Text Generation

Let's test the model's ability to generate text from semantic fields.

In [None]:
def test_generation(model, test_texts, temperature=0.8, top_k=50):
    """Test text generation from input texts."""
    model.eval()
    
    print("\nGenerating responses:")
    for text in test_texts:
        print(f"\nInput: {text}")
        
        # Generate response
        with torch.no_grad():
            generated_texts = model.generate_text([text], temperature=temperature, top_k=top_k)
            response = generated_texts[0]
            
        print(f"Generated: {response}")
        
        # For comparison, find closest concept
        with torch.no_grad():
            input_embedding = model.encode_text([text])[0]
            query_point = model.create_query_point(input_embedding)
            evolved_embedding, _ = model.evolve_field(query_point)
            
            # Find closest target text in dataset
            closest_texts = []
            closest_similarities = []
            
            # Get sample target texts
            sample_texts = [sample['target_text'] for sample in random.sample(dataset.samples, min(100, len(dataset)))]
            
            # Get embeddings for sample texts
            sample_embeddings = model.encode_text(sample_texts)
            
            # Compute similarities
            similarities = F.cosine_similarity(evolved_embedding.unsqueeze(0), sample_embeddings)
            
            # Get top 3 closest texts
            top_indices = similarities.argsort(descending=True)[:3]
            for idx in top_indices:
                closest_texts.append(sample_texts[idx])
                closest_similarities.append(similarities[idx].item())
            
        # Print closest texts for comparison
        print("Closest dataset examples:")
        for i, (text, sim) in enumerate(zip(closest_texts, closest_similarities)):
            print(f"{i+1}. (sim: {sim:.4f}) {text}")

In [None]:
# Test with some example texts
test_texts = [
    "The temperature outside was freezing cold.",
    "She felt sad looking at the old photographs.",
    "The problem was extremely complex with many variables.",
    "The artist's creativity allowed her to see unique solutions."
]

test_generation(model, test_texts)

In [None]:
# Test with different temperatures
print("\nTesting with different temperatures:")
input_text = "The building was very complex with many intricate details."
print(f"\nInput: {input_text}")

for temp in [0.5, 0.7, 1.0, 1.3]:
    with torch.no_grad():
        generated_texts = model.generate_text([input_text], temperature=temp, top_k=50)
        response = generated_texts[0]
    
    print(f"\nTemperature {temp}: {response}")

## Generative Chatbot Interface

Create a chatbot that generates new text responses rather than just retrieving existing ones.

In [None]:
class GenerativeLaminetChatbot:
    """Chatbot interface for the GenerativeLaminet model."""
    def __init__(self, model, memory_size=5):
        self.model = model
        self.memory_size = memory_size
        self.memory = []  # Store recent interactions
        self.temperature = 0.8  # Default temperature for generation
    
    def chat(self, user_input):
        """Process user input and generate a response."""
        # Add user input to memory
        self.memory.append({'role': 'user', 'text': user_input})
        
        # Generate response using field-based generation
        with torch.no_grad():
            generated_texts = self.model.generate_text(
                [user_input], 
                temperature=self.temperature, 
                top_k=50
            )
            response = generated_texts[0]
        
        # Add response to memory
        self.memory.append({'role': 'assistant', 'text': response})
        
        # Trim memory if too large
        if len(self.memory) > self.memory_size * 2:
            self.memory = self.memory[-self.memory_size * 2:]
        
        return response
    
    def get_conversation_history(self):
        """Get the conversation history."""
        return self.memory
    
    def reset(self):
        """Reset the conversation history."""
        self.memory = []
        return "Conversation history cleared."
    
    def set_temperature(self, temperature):
        """Set the temperature for text generation."""
        self.temperature = temperature
        return f"Temperature set to {temperature}."

In [None]:
# Initialize chatbot
chatbot = GenerativeLaminetChatbot(model)

# Interactive chat interface using IPython widgets
from ipywidgets import widgets
from IPython.display import display, clear_output

# Chat history display
chat_history = widgets.HTML(value="")

# Text input for user
text_input = widgets.Text(
    placeholder='Type your message here...',
    description='You:',
    style={'description_width': 'initial'},
    layout=widgets.Layout(width='80%')
)

# Send button
send_button = widgets.Button(
    description='Send',
    button_style='primary',
    layout=widgets.Layout(width='15%')
)

# Reset button
reset_button = widgets.Button(
    description='Reset Chat',
    button_style='danger',
    layout=widgets.Layout(width='15%')
)

# Temperature slider
temperature_slider = widgets.FloatSlider(
    value=0.8,
    min=0.1,
    max=1.5,
    step=0.1,
    description='Temperature:',
    disabled=False,
    continuous_update=False,
    orientation='horizontal',
    readout=True,
    readout_format='.1f',
)

def update_chat_display():
    """Update the chat display with current conversation history."""
    history = chatbot.get_conversation_history()
    html = """
    <style>
        .chat-container { font-family: Arial, sans-serif; }
        .user-message { background-color: #e6f7ff; padding: 10px; border-radius: 10px; margin: 5px 0; text-align: right; }
        .assistant-message { background-color: #f1f1f1; padding: 10px; border-radius: 10px; margin: 5px 0; }
    </style>
    <div class="chat-container">
    """
    
    for message in history:
        if message['role'] == 'user':
            html += f"<div class='user-message'><strong>You:</strong> {message['text']}</div>"
        else:
            html += f"<div class='assistant-message'><strong>Laminet:</strong> {message['text']}</div>"
    
    html += "</div>"
    chat_history.value = html

def on_send_clicked(b):
    """Handle send button click."""
    user_input = text_input.value
    if not user_input.strip():
        return
    
    # Clear input field
    text_input.value = ""
    
    # Update temperature
    chatbot.set_temperature(temperature_slider.value)
    
    # Process input and get response
    response = chatbot.chat(user_input)
    
    # Update chat display
    update_chat_display()

def on_reset_clicked(b):
    """Handle reset button click."""
    chatbot.reset()
    update_chat_display()
    print("Chat history cleared.")

# Add event handlers
send_button.on_click(on_send_clicked)
reset_button.on_click(on_reset_clicked)

# Handle Enter key in text input
def on_enter(sender):
    on_send_clicked(None)

text_input.on_submit(on_enter)

# Layout
input_box = widgets.HBox([text_input, send_button])
chat_interface = widgets.VBox([chat_history, input_box, temperature_slider, reset_button])

# Display interface
display(chat_interface)

# Initial update
update_chat_display()

## Conclusion

This notebook demonstrates how to build a generative Laminet model that uses field evolution principles for text generation, rather than relying on transformer-based language models.

Key components include:

1. **Field-Based Generator**: Generates text by evolving semantic field points rather than using attention mechanisms
2. **Semantic Field Evolution**: Uses physical-like forces to model semantic relationships
3. **Dynamic Generation**: Creates new text that reflects semantic transitions in the field

Unlike the original Laminet model which only retrieved existing texts, this model generates entirely new responses based on the semantic field evolution. This approach aligns with the Laminet philosophy of moving away from transformer architectures towards continuous field evolution models.