In [1]:
import numpy as np
import pylab as pl
import math as m

n_data = 10**5  # Total 100K samples
x_min, x_max = -0.95, 0.95
y_min, y_max = -0.95, 0.95

np.random.seed(42)  # Consistency

# STRATIFIED SAMPLING - Phase 2 Implementation
# 70% uniform samples
n_uniform = int(0.7 * n_data)
x_uniform = np.random.uniform(x_min, x_max, size=n_uniform)
y_uniform = np.random.uniform(y_min, y_max, size=n_uniform)

# 30% boundary-biased samples using Beta(0.5, 0.5)
# Beta(0.5, 0.5) creates U-shaped distribution favoring extremes
n_boundary = n_data - n_uniform
beta_samples_x = np.random.beta(0.5, 0.5, size=n_boundary)
beta_samples_y = np.random.beta(0.5, 0.5, size=n_boundary)

# Scale Beta samples from [0,1] to [x_min, x_max]
x_boundary = (x_max - x_min) * beta_samples_x + x_min
y_boundary = (y_max - y_min) * beta_samples_y + y_min

# Combine uniform and boundary-biased samples
x_list = np.concatenate([x_uniform, x_boundary])
y_list = np.concatenate([y_uniform, y_boundary])

# Shuffle to mix uniform and boundary samples
shuffle_indices = np.random.permutation(n_data)
x_list = x_list[shuffle_indices]
y_list = y_list[shuffle_indices]

print(f"Generated {n_uniform} uniform samples + {n_boundary} boundary-biased samples = {n_data} total")
print(f"Boundary sample range: x ∈ [{x_boundary.min():.3f}, {x_boundary.max():.3f}]")

x_list.shape, y_list.shape

Generated 70000 uniform samples + 30000 boundary-biased samples = 100000 total
Boundary sample range: x ∈ [-0.950, 0.950]


((100000,), (100000,))

In [2]:
# Now make them Torch tensor
import torch
X_Input = torch.tensor(x_list, dtype=torch.float32)
Y_Input = torch.tensor(y_list, dtype=torch.float32)
XY_Input = X_Input*Y_Input

X_Input, Y_Input, XY_Input = X_Input.unsqueeze(1), Y_Input.unsqueeze(1), XY_Input.unsqueeze(1),
X_Input.shape, Y_Input.shape, XY_Input.shape

(torch.Size([100000, 1]), torch.Size([100000, 1]), torch.Size([100000, 1]))

In [3]:
from torch.utils.data import DataLoader, TensorDataset, random_split

# Combine input and target tensors into a dataset
Total_Data = TensorDataset(X_Input, Y_Input, XY_Input)

# Specify the size of the validation set
validation_ratio = 0.20
validation_size = int(validation_ratio * len(Total_Data))
training_size = len(Total_Data) - validation_size

# Split the dataset into training and validation sets
Training_Set, Validation_Set = random_split(Total_Data, [training_size, validation_size])

BATCH_SIZE = 512 

# Create data loaders
Train_Loader = DataLoader(
    Training_Set, 
    batch_size=BATCH_SIZE, 
    shuffle=True,
    num_workers=0  # Keep at 0 for MPS device
)

Valid_Loader = DataLoader(
    Validation_Set, 
    batch_size=BATCH_SIZE,
    num_workers=0  # Keep at 0 for MPS device
)

print(f"Training batches: {len(Train_Loader)}")
print(f"Validation batches: {len(Valid_Loader)}")
len(Total_Data), Total_Data[0]

Training batches: 157
Validation batches: 40


(100000, (tensor([0.0454]), tensor([0.9315]), tensor([0.0423])))

In [4]:
import torch.nn as nn

# Set seeds for reproducibility
torch.manual_seed(42)

# Set device to MPS (Mac GPU)
device = torch.device('mps')

# Create the Network - Optimized for M4 Pro

class Network(nn.Module):
    
    def __init__(self):
        super(Network, self).__init__()
        
        self.fc1 = nn.Linear(1, 128)    # Input layer to first hidden layer
        self.fc2 = nn.Linear(128, 128)  # First hidden layer to second hidden layer  
        self.fc3 = nn.Linear(128, 1)    # Output layer
        self.activation = nn.Tanh()
        
        # Learnable output scale - helps fix magnitude collapse
        self.output_scale = nn.Parameter(torch.tensor([180.0]))

    def forward(self, x):
        x = self.activation(self.fc1(x))  
        x = self.activation(self.fc2(x))
        x = self.fc3(x)
        x = x * self.output_scale  # Scale the output
        return x
    
Transform_NN = Network().to(device)

print(f"Network parameters: {sum(p.numel() for p in Transform_NN.parameters()):,}")
Transform_NN

Network parameters: 16,898


Network(
  (fc1): Linear(in_features=1, out_features=128, bias=True)
  (fc2): Linear(in_features=128, out_features=128, bias=True)
  (fc3): Linear(in_features=128, out_features=1, bias=True)
  (activation): Tanh()
)

In [5]:
# Thanks Aiqing Zhu
def Get_Grad(y, x, create_graph=True, keepdim=False):
    '''
    y: [N, Ny] or [Ny]
    x: [N, Nx] or [Nx]
    Return dy/dx ([N, Ny, Nx] or [Ny, Nx]).
    '''
    N = y.size(0) if len(y.size()) == 2 else 1
    Ny = y.size(-1)
    Nx = x.size(-1)
    z = torch.ones_like(y[..., 0])
    dy = []
    for i in range(Ny):
        dy.append(torch.autograd.grad(y[..., i], x, grad_outputs=z, create_graph=create_graph)[0])
    shape = np.array([N, Ny])[2-len(y.size()):]
    shape = list(shape) if keepdim else list(shape[shape > 1])
    return torch.cat(dy, dim=-1).view(shape + [Nx])

def Custom_Loss(model,X,Y,XY):

    # L1 is F(X)+F(Y)-F(XY)=0
    FX, FY, FXY = model(X), model(Y), model((X+Y)/(1+XY))
    L1 = torch.mean((FX+FY-FXY)**2)

    # Get the device from the model's parameters
    device = next(model.parameters()).device
    Xc = torch.tensor([0], dtype=torch.float32, device=device).requires_grad_(True)
    FXc = model(Xc)
    dFdXc = Get_Grad(FXc, Xc, create_graph=True, keepdim=False)

    # L2 is fixing the value at xc=0 to be f(xc)=0
    L2 = torch.mean(FXc**2)

    # L3 is fixing the gradient at xc=0 to be df/dx=1
    L3 = torch.mean((dFdXc-1)**2)
    
    return L1, L2, L3

In [None]:
from torch import optim

# Enable faster matrix operations
torch.set_float32_matmul_precision('high')

# define loss function and optimization method
parameters = list(Transform_NN.parameters())
optimizer = optim.Adam(parameters, lr=1e-3)


scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=2000, eta_min=1e-6)

# Loss weights: Emphasize boundary conditions (L2, L3) 100x more than physics equation (L1)
c1, c2, c3 = 1, 100, 100

# Define the number of epochs - increased from 1000 to 2000 for better convergence
N_EPOCHS = 2000
Epoch_List = np.arange(N_EPOCHS)+1
Train_Loss_List = np.zeros((N_EPOCHS))
Valid_Loss_List = np.zeros((N_EPOCHS))

Train_L1_List = np.zeros((N_EPOCHS))
Train_L2_List = np.zeros((N_EPOCHS))
Train_L3_List = np.zeros((N_EPOCHS))

# How frequent should it report answer
N_REP = 100

print("Starting training...")
for epoch in range(N_EPOCHS):
    
    Train_Loss, Train_L1, Train_L2, Train_L3 = 0.0, 0.0, 0.0, 0.0
    
    # Training loop
    Transform_NN.train()
    for id_batch, (X, Y, XY) in enumerate(Train_Loader):
        # Ensure data is on the correct device with correct dtype
        X = X.to(device, dtype=torch.float32)
        Y = Y.to(device, dtype=torch.float32)
        XY = XY.to(device, dtype=torch.float32)
        
        # Clear gradients
        optimizer.zero_grad()
        
        # Calculate loss components
        L1, L2, L3 = Custom_Loss(Transform_NN, X, Y, XY)
        Loss = c1*L1 + c2*L2 + c3*L3
        
        # Backward pass
        Loss.backward()
        optimizer.step()
        
        # Accumulate loss
        Train_Loss += Loss.item()
        Train_L1 += L1.item()
        Train_L2 += L2.item()
        Train_L3 += L3.item()
        
    # Average training losses
    Train_Loss_List[epoch] = Train_Loss / len(Train_Loader)
    Train_L1_List[epoch] = Train_L1 / len(Train_Loader)
    Train_L2_List[epoch] = Train_L2 / len(Train_Loader)
    Train_L3_List[epoch] = Train_L3 / len(Train_Loader)

    # Validation loop - NOTE: We need gradients for Custom_Loss (to compute f'(0))
    # So we DON'T use torch.no_grad() here
    Transform_NN.eval()
    Valid_Loss = 0.0
    for X, Y, XY in Valid_Loader:
        # Ensure data is on correct device
        X = X.to(device, dtype=torch.float32)
        Y = Y.to(device, dtype=torch.float32)
        XY = XY.to(device, dtype=torch.float32)
        
        # Calculate loss (gradients needed for L3 = f'(0) term)
        L1, L2, L3 = Custom_Loss(Transform_NN, X, Y, XY)
        Loss = c1*L1 + c2*L2 + c3*L3
        Valid_Loss += Loss.item()
        
    Valid_Loss_List[epoch] = Valid_Loss / len(Valid_Loader)

    # Step the learning rate scheduler
    scheduler.step()

    # Print progress (including current learning rate)
    if epoch % N_REP == 0:
        current_lr = scheduler.get_last_lr()[0]
        print(f'Epoch {epoch+1:4d} | Train Loss: {Train_Loss_List[epoch]:.6f} | Valid Loss: {Valid_Loss_List[epoch]:.6f} | LR: {current_lr:.2e}')

print("Training complete!")

Starting training...
Epoch    1 | Train Loss: 5876.682586 | Valid Loss: 0.815652 | LR: 1.00e-03
Epoch  101 | Train Loss: 0.198729 | Valid Loss: 0.061554 | LR: 9.94e-04
Epoch  201 | Train Loss: 2.293318 | Valid Loss: 0.075255 | LR: 9.75e-04
Epoch  301 | Train Loss: 0.050263 | Valid Loss: 0.046740 | LR: 9.45e-04


In [None]:
import pylab as pl

f1 = pl.figure(figsize=(24,8))
sf1_1, sf1_2, sf1_3 = f1.add_subplot(1,3,1), f1.add_subplot(1,3,2), f1.add_subplot(1,3,3)

sf1_1.plot(Epoch_List,Train_Loss_List, linewidth=3, label='Training Loss')
sf1_1.plot(Epoch_List,Valid_Loss_List, linewidth=3, label='Validation Loss')

sf1_1.set_title('Learning Progress')
sf1_1.set_xlabel('Epochs')
sf1_1.set_ylabel('Loss')
sf1_1.set_yscale('log')
sf1_1.legend()

sf1_2.plot(Epoch_List,Train_L1_List, linewidth=3, label='Train L1')
sf1_2.plot(Epoch_List,Train_L2_List, linewidth=3, label='Train L2')
sf1_2.plot(Epoch_List,Train_L3_List, linewidth=3, label='Train L3')
sf1_2.set_title('Learning Progress')
sf1_2.set_xlabel('Epochs')
sf1_2.set_ylabel('Loss')
sf1_2.set_yscale('log')
sf1_2.legend()

sf1_3.plot(Epoch_List,c1*Train_L1_List, linewidth=3, label='Train c1L1')
sf1_3.plot(Epoch_List,c2*Train_L2_List, linewidth=3, label='Train c2L2')
sf1_3.plot(Epoch_List,c3*Train_L3_List, linewidth=3, label='Train c3L3')
sf1_3.set_title('Learning Progress')
sf1_3.set_xlabel('Epochs')
sf1_3.set_ylabel('Loss')
sf1_3.set_yscale('log')
sf1_3.legend()

pl.show()

In [None]:
## Plot the Learnt
n_val = 10**4
#z_min, z_max = x_min*y_min, x_max*y_max
z_min, z_max = -0.8, 0.8

Z_Input = torch.linspace(z_min, z_max, steps=n_val).unsqueeze(1).to(device)
FNNZ = Transform_NN(Z_Input)

Z_Input.shape, FNNZ.shape

Z_numpy, FNNZ_numpy = Z_Input.cpu().detach().numpy(), FNNZ.cpu().detach().numpy()
LogZ_numpy = np.arctanh(Z_numpy)

f2 = pl.figure(figsize=(12,6))
sf2_1 = f2.add_subplot(1,1,1)

sf2_1.plot(Z_numpy, FNNZ_numpy, linestyle='-', color=(0,1,0), linewidth=4, label='Neural Network model')
sf2_1.plot(Z_numpy, LogZ_numpy, linestyle=':',color=(0,0,1), linewidth=4, label="Rapidity")
sf2_1.set_xlabel('Values')
sf2_1.set_ylabel('The Transformation')
sf2_1.legend()

pl.show()