In [None]:
import torch
import matplotlib.pyplot as plt

# Set the mean value for mixture components
mu = 10

# Number of samples per component
n_samples = 5000

# Means for two components in 2D
means = torch.tensor([[mu, mu], [-mu, -mu]], dtype=torch.float)

# Covariance (identity for both)
cov = torch.eye(2)

# Sample standard normal, then shift by mean
def sample_gaussian(mean, cov, n):
    return torch.randn(n, 2) @ cov.sqrt() + mean

# Generate samples for each component
samples_1 = sample_gaussian(means[0], cov, n_samples)
samples_2 = sample_gaussian(means[1], cov, n_samples)

# Stack together
mixture_samples = torch.cat([samples_1, samples_2], dim=0)

# Visualize the samples
plt.figure(figsize=(6, 6))
plt.scatter(samples_1[:, 0], samples_1[:, 1], alpha=0.6, label='Component 1')
plt.scatter(samples_2[:, 0], samples_2[:, 1], alpha=0.6, label='Component 2')
plt.scatter(mixture_samples[:, 0], mixture_samples[:, 1], s=5, color='gray', alpha=0.2, label='Mixture (all)')
plt.xlabel('x')
plt.ylabel('y')
plt.title('Samples from 2D Gaussian Mixture ($\mu={}$, $-\mu={}$)'.format(mu, -mu))
plt.legend()
plt.axis('equal')
plt.grid(True)
plt.show()




In [None]:

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from tqdm import tqdm

# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Data and configs
data = mixture_samples.to(device)
n, d = data.shape
T = 1000  # Number of diffusion steps

# Define the two-layer model with Sharp Sigmoid activation
class TwoLayerDiffusionModel(nn.Module):
    def __init__(self, d, sigmoid_sharpness=10.0):
        super().__init__()
        self.linear1 = nn.Linear(d, 1, bias=False)
        self.sigmoid = nn.Sigmoid()
        self.linear2 = nn.Linear(1, d, bias=True)  # Added bias
        self.sharpness = sigmoid_sharpness  # Controls sigmoid steepness

    def forward(self, x):
        out = self.linear1(x)
        out = self.sigmoid(self.sharpness * out)  # Sharp sigmoid
        out = self.linear2(out)
        return out

model = TwoLayerDiffusionModel(d).to(device)

# Diffusion (DDPM-style) parameters
beta_start = 1e-4
beta_end = 0.02
betas = torch.linspace(beta_start, beta_end, T).to(device)
alphas = 1. - betas
alpha_bars = torch.cumprod(alphas, dim=0)

# Loss: predict the added noise
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# Training configuration
epochs = 100  # number of passes through the entire dataset
batch_size = 256

# Create DataLoader for proper batching
dataset = TensorDataset(data)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

print(f"Total samples: {n}")
print(f"Batch size: {batch_size}")
print(f"Batches per epoch: {len(dataloader)}")
print(f"Total training steps: {epochs * len(dataloader)}")

for epoch in range(epochs):
    epoch_loss = 0.0
    num_batches = 0
    
    for (x0_batch,) in tqdm(dataloader, desc=f"Epoch {epoch+1}/{epochs}", leave=False):
        x0 = x0_batch.to(device)  # [batch_size, d]
        current_batch_size = x0.shape[0]
        
        # Sample random timesteps for this batch
        t = torch.randint(0, T, (current_batch_size,), device=device).long()
        noise = torch.randn_like(x0)

        alpha_bar_t = alpha_bars[t].view(-1, 1)
        xt = (alpha_bar_t.sqrt()) * x0 + (1 - alpha_bar_t).sqrt() * noise

        pred_noise = xt/alpha_bar_t.sqrt() - model(xt)
        loss = ((pred_noise - noise) ** 2).mean()

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        epoch_loss += loss.item()
        num_batches += 1
    
    avg_loss = epoch_loss / num_batches
    if epoch % 10 == 0 or epoch == epochs - 1:
        print(f"Epoch {epoch+1}/{epochs} | Average Loss: {avg_loss:.4f}")

# Generation function (DDPM sampling)
@torch.no_grad()
def sample_ddpm(model, n_samples, d, T):
    x = torch.randn(n_samples, d).to(device)
    for t in reversed(range(T)):
        beta_t = betas[t]
        alpha_t = alphas[t]
        alpha_bar_t = alpha_bars[t]
        # Model predicts x0 (clean sample), convert to noise prediction
        pred_x0 = model(x)
        pred_noise = x / alpha_bar_t.sqrt() - pred_x0
        coef_one = 1 / alpha_t.sqrt()
        coef_two = (1 - alpha_t) / (1 - alpha_bar_t).sqrt()
        # Update
        x = coef_one * (x - coef_two * pred_noise)
        if t > 0:
            noise = torch.randn_like(x)
            x = x + beta_t.sqrt() * noise  # add stochasticity for t > 0
    return x.cpu()

# Generate samples
gen_samples = sample_ddpm(model, 1000, d, T)

# Visualization
plt.figure(figsize=(6,6))
plt.scatter(gen_samples[:,0], gen_samples[:,1], alpha=0.5, label='Generated', color='red', s=8)
plt.scatter(data[:,0].cpu(), data[:,1].cpu(), alpha=0.15, label='Training (Mixture)', color='gray', s=8)
plt.xlabel('x')
plt.ylabel('y')
plt.title('Generated samples from trained diffusion model')
plt.legend()
plt.axis('equal')
plt.grid(True)
plt.show()



In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from tqdm import tqdm

# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Data and configs
data = mixture_samples.to(device)
n, d = data.shape
T = 1000  # Number of diffusion steps

# Define the two-layer model with custom initialization and Sharp Sigmoid activation
class TwoLayerDiffusionModel(nn.Module):
    def __init__(self, d, init_direction=None, sigmoid_sharpness=10.0):
        super().__init__()
        self.linear1 = nn.Linear(d, 1, bias=False)
        self.sigmoid = nn.Sigmoid()
        self.linear2 = nn.Linear(1, d, bias=True)  # Added bias
        self.sharpness = sigmoid_sharpness  # Controls sigmoid steepness
        
        # Initialize with direction if provided
        if init_direction is not None:
            with torch.no_grad():
                # linear1.weight shape: (1, d) - project d-dimensional input to 1D
                # Initialize as normalized direction (unit vector)
                self.linear1.weight.copy_(init_direction.view(1, -1))
                
                # linear2.weight shape: (d, 1) - project 1D back to d dimensions
                # Initialize as normalized direction transposed
                self.linear2.weight.copy_(init_direction.view(-1, 1))
                self.linear2.bias.zero_()
                
                print(f"Initialized linear layers with direction: {init_direction}")
                print(f"||init_direction||: {init_direction.norm().item():.4f}")

    def forward(self, x):
        out = self.linear1(x)
        out = self.sigmoid(self.sharpness * out)  # Sharp sigmoid
        out = self.linear2(out)
        return out

# Compute μ/||μ|| from the first mixture component mean
mu_vector = means[0]  # [mu, mu] in 2D
mu_normalized = mu_vector / mu_vector.norm()
print(f"Original μ: {mu_vector}")
print(f"Normalized μ/||μ||: {mu_normalized}")

# Create model with normalized mu initialization
model = TwoLayerDiffusionModel(d, init_direction=mu_normalized).to(device)

# Diffusion (DDPM-style) parameters
beta_start = 1e-4
beta_end = 0.02
betas = torch.linspace(beta_start, beta_end, T).to(device)
alphas = 1. - betas
alpha_bars = torch.cumprod(alphas, dim=0)

# Loss: predict the added noise
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# Training configuration
epochs = 100  # number of passes through the entire dataset
batch_size = 256

# Create DataLoader for proper batching
dataset = TensorDataset(data)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

print(f"\nTotal samples: {n}")
print(f"Batch size: {batch_size}")
print(f"Batches per epoch: {len(dataloader)}")
print(f"Total training steps: {epochs * len(dataloader)}\n")

for epoch in range(epochs):
    epoch_loss = 0.0
    num_batches = 0
    
    for (x0_batch,) in tqdm(dataloader, desc=f"Epoch {epoch+1}/{epochs}", leave=False):
        x0 = x0_batch.to(device)  # [batch_size, d]
        current_batch_size = x0.shape[0]
        
        # Sample random timesteps for this batch
        t = torch.randint(0, T, (current_batch_size,), device=device).long()
        noise = torch.randn_like(x0)

        alpha_bar_t = alpha_bars[t].view(-1, 1)
        xt = (alpha_bar_t.sqrt()) * x0 + (1 - alpha_bar_t).sqrt() * noise

        pred_noise = xt/alpha_bar_t.sqrt() - model(xt)
        loss = ((pred_noise - noise) ** 2).mean()

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        epoch_loss += loss.item()
        num_batches += 1
    
    avg_loss = epoch_loss / num_batches
    if epoch % 10 == 0 or epoch == epochs - 1:
        print(f"Epoch {epoch+1}/{epochs} | Average Loss: {avg_loss:.4f}")

# Generation function (DDPM sampling)
@torch.no_grad()
def sample_ddpm(model, n_samples, d, T):
    x = torch.randn(n_samples, d).to(device)
    for t in reversed(range(T)):
        beta_t = betas[t]
        alpha_t = alphas[t]
        alpha_bar_t = alpha_bars[t]
        # Model predicts x0 (clean sample), convert to noise prediction
        pred_x0 = model(x)
        pred_noise = x / alpha_bar_t.sqrt() - pred_x0
        coef_one = 1 / alpha_t.sqrt()
        coef_two = (1 - alpha_t) / (1 - alpha_bar_t).sqrt()
        # Update
        x = coef_one * (x - coef_two * pred_noise)
        if t > 0:
            noise = torch.randn_like(x)
            x = x + beta_t.sqrt() * noise  # add stochasticity for t > 0
    return x.cpu()

# Generate samples
gen_samples = sample_ddpm(model, 1000, d, T)

# Visualization
plt.figure(figsize=(6,6))
plt.scatter(gen_samples[:,0], gen_samples[:,1], alpha=0.5, label='Generated', color='red', s=8)
plt.scatter(data[:,0].cpu(), data[:,1].cpu(), alpha=0.15, label='Training (Mixture)', color='gray', s=8)
plt.xlabel('x')
plt.ylabel('y')
plt.title('Generated samples from trained diffusion model\n(Initialized with μ/||μ||)')
plt.legend()
plt.axis('equal')
plt.grid(True)
plt.show()



In [None]:
import torch
import matplotlib.pyplot as plt
import numpy as np

# Generate pure noise samples
n_noise_samples = 10000
pure_noise = torch.randn(n_noise_samples, d).to(device)

# Apply the model to get sigmoid activations
with torch.no_grad():
    # Get the output of the first linear layer
    linear1_output = model.linear1(pure_noise)  # Shape: (n_noise_samples, 1)
    
    # Apply sharp sigmoid activation
    sigmoid_output = model.sigmoid(model.sharpness * linear1_output)  # Shape: (n_noise_samples, 1)
    
    # Convert to numpy for plotting
    sigmoid_values = sigmoid_output.cpu().numpy().flatten()
    linear1_values = linear1_output.cpu().numpy().flatten()
    scaled_linear1_values = (model.sharpness * linear1_output).cpu().numpy().flatten()

# Create figure with three subplots
fig, axes = plt.subplots(1, 3, figsize=(18, 5))

# Plot 1: Histogram of linear layer output (before scaling)
axes[0].hist(linear1_values, bins=50, alpha=0.7, color='blue', edgecolor='black', density=True)
axes[0].set_xlabel('Linear Layer Output (w^T x)', fontsize=12)
axes[0].set_ylabel('Density', fontsize=12)
axes[0].set_title('Distribution of Linear Layer Output\n(Before Scaling)', fontsize=14)
axes[0].grid(True, alpha=0.3)
axes[0].axvline(0, color='red', linestyle='--', linewidth=2, label='Zero')
axes[0].legend()

mean_linear = linear1_values.mean()
std_linear = linear1_values.std()
axes[0].text(0.02, 0.98, f'Mean: {mean_linear:.3f}\nStd: {std_linear:.3f}', 
             transform=axes[0].transAxes, verticalalignment='top',
             bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))

# Plot 2: Histogram of scaled linear output (after sharp scaling)
axes[1].hist(scaled_linear1_values, bins=50, alpha=0.7, color='purple', edgecolor='black', density=True)
axes[1].set_xlabel(f'Scaled Output (k·w^T x), k={model.sharpness}', fontsize=12)
axes[1].set_ylabel('Density', fontsize=12)
axes[1].set_title(f'Distribution After Sharp Scaling\n(k={model.sharpness})', fontsize=14)
axes[1].grid(True, alpha=0.3)
axes[1].axvline(0, color='red', linestyle='--', linewidth=2, label='Zero')
axes[1].legend()

mean_scaled = scaled_linear1_values.mean()
std_scaled = scaled_linear1_values.std()
axes[1].text(0.02, 0.98, f'Mean: {mean_scaled:.3f}\nStd: {std_scaled:.3f}', 
             transform=axes[1].transAxes, verticalalignment='top',
             bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))

# Plot 3: Histogram of sigmoid output
axes[2].hist(sigmoid_values, bins=50, alpha=0.7, color='green', edgecolor='black', density=True)
axes[2].set_xlabel('Sharp Sigmoid Output σ(k·w^T x)', fontsize=12)
axes[2].set_ylabel('Density', fontsize=12)
axes[2].set_title(f'Distribution of Sharp Sigmoid Output\n(Applied to Pure Noise)', fontsize=14)
axes[2].grid(True, alpha=0.3)
axes[2].axvline(0.5, color='red', linestyle='--', linewidth=2, label='0.5')
axes[2].legend()
axes[2].set_xlim([0, 1])

mean_sigmoid = sigmoid_values.mean()
std_sigmoid = sigmoid_values.std()
axes[2].text(0.02, 0.98, f'Mean: {mean_sigmoid:.3f}\nStd: {std_sigmoid:.3f}', 
             transform=axes[2].transAxes, verticalalignment='top',
             bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))

plt.tight_layout()
plt.show()

# Print detailed statistics
print(f"\n{'='*70}")
print(f"Statistics for {n_noise_samples} pure noise samples:")
print(f"{'='*70}")
print(f"\nLinear Layer Output (w^T x):")
print(f"  Mean: {mean_linear:.4f}")
print(f"  Std:  {std_linear:.4f}")
print(f"  Min:  {linear1_values.min():.4f}")
print(f"  Max:  {linear1_values.max():.4f}")
print(f"\nScaled Output (k·w^T x) with k={model.sharpness}:")
print(f"  Mean: {mean_scaled:.4f}")
print(f"  Std:  {std_scaled:.4f}")
print(f"  Min:  {scaled_linear1_values.min():.4f}")
print(f"  Max:  {scaled_linear1_values.max():.4f}")
print(f"\nSharp Sigmoid Output σ(k·w^T x):")
print(f"  Mean: {mean_sigmoid:.4f}")
print(f"  Std:  {std_sigmoid:.4f}")
print(f"  Min:  {sigmoid_values.min():.4f}")
print(f"  Max:  {sigmoid_values.max():.4f}")

# Analyze the learned weights
with torch.no_grad():
    w1 = model.linear1.weight.cpu().numpy().flatten()
    w1_norm = np.linalg.norm(w1)
    
    w2 = model.linear2.weight.cpu().numpy().flatten()
    w2_norm = np.linalg.norm(w2)
    
    b2 = model.linear2.bias.cpu().numpy().flatten()
    
print(f"\n{'='*70}")
print(f"Learned Weights and Biases:")
print(f"{'='*70}")
print(f"\nLinear1 (w from input layer):")
print(f"  w1: {w1}")
print(f"  ||w1||: {w1_norm:.4f}")
print(f"  Direction: {w1 / (w1_norm + 1e-12)}")
print(f"\nLinear2 (w from output layer):")
print(f"  w2: {w2}")
print(f"  ||w2||: {w2_norm:.4f}")
print(f"  Direction: {w2 / (w2_norm + 1e-12)}")
print(f"\nLinear2 Bias (b2):")
print(f"  b2: {b2}")
print(f"  ||b2||: {np.linalg.norm(b2):.4f}")
print(f"\nSigmoid Sharpness Parameter:")
print(f"  k: {model.sharpness}")
print(f"{'='*70}\n")

