# VER 2.0 - Peak Deconvolution Model Training

**Objective:** Train a 1D U-Net to locate Gaussian peaks in summed signals by predicting a heatmap of peak-center probabilities.

**Input:** Summed signal (500 points)  
**Output:** Peak heatmap (probability of peak centers)

---

## Setup

In [None]:
# Mount Google Drive at /content/drive so the training data stored there can
# be read by the cells below.
from google.colab import drive
drive.mount('/content/drive')

# Path to the training dataset (.npz) inside the "Colab Notebooks" folder of
# the mounted Drive; edit this if your file lives somewhere else.
DATA_PATH = '/content/drive/MyDrive/Colab Notebooks/training_data.npz'

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt

# Report the TensorFlow version, then the GPUs TensorFlow can see
# (an empty list means no GPU is visible to TensorFlow).
print(f'TensorFlow: {tf.__version__}')
gpu_devices = tf.config.list_physical_devices("GPU")
print(f'GPU: {gpu_devices}')

## Load Training Data

In [None]:
# Load the generated dataset.
# The .npz archive is opened in a `with` block so its file handle is released
# as soon as the arrays have been read; indexing the archive materializes each
# array in memory, so the arrays remain valid after the archive is closed.
with np.load(DATA_PATH) as data:
    signals = data['signals']            # model inputs (summed signals)
    heatmaps = data['heatmaps']          # training targets (peak heatmaps)
    voltage_grid = data['voltage_grid']  # x-axis values used when plotting

print(f'Signals shape: {signals.shape}')
print(f'Heatmaps shape: {heatmaps.shape}')
print(f'Voltage grid: {voltage_grid.min():.2f} to {voltage_grid.max():.2f} V')

In [None]:
# Prepare model inputs/targets: split, normalize, and reshape for Conv1D.
n_points = 500  # samples per signal; must match the model's input length

# Train/validation split point: the first 90% of samples (in stored order)
# are used for training and the remainder for validation.
raw_3d = signals.reshape(-1, n_points, 1)
split_idx = int(0.9 * len(raw_3d))

# Normalize signals (important for neural net training).
# The statistics are taken from the training portion only, so no information
# about the validation signals leaks into preprocessing. The same two
# constants are applied to every signal.
signal_mean = raw_3d[:split_idx].mean()
signal_std = raw_3d[:split_idx].std()
if not np.isfinite(signal_std) or signal_std == 0:
    # An empty training slice gives NaN; constant signals give 0. Either
    # would silently turn every input into NaN/inf, so fail loudly instead.
    raise ValueError(
        'Cannot normalize signals: the training split is empty or its '
        f'standard deviation is not a positive finite number ({signal_std}).'
    )
signals_norm = (signals - signal_mean) / signal_std

print(f'Normalized signals: mean={signals_norm.mean():.4f}, std={signals_norm.std():.4f}')

# Reshape for Conv1D: (samples, timesteps, features)
X = signals_norm.reshape(-1, n_points, 1)
Y = heatmaps.reshape(-1, n_points, 1)

# Apply the split computed above
X_train, X_val = X[:split_idx], X[split_idx:]
Y_train, Y_val = Y[:split_idx], Y[split_idx:]

print(f'Training: {len(X_train)}, Validation: {len(X_val)}')

## Build 1D U-Net Model

In [None]:
def _double_conv(x, filters, kernel_size):
    """Apply two same-padded, ReLU-activated Conv1D layers with equal settings."""
    x = layers.Conv1D(filters, kernel_size, padding='same', activation='relu')(x)
    return layers.Conv1D(filters, kernel_size, padding='same', activation='relu')(x)


def build_unet_1d(input_shape=(500, 1)):
    """
    Build a 1D U-Net for peak heatmap prediction.

    Encoder-decoder with skip connections: three encoder blocks
    (32/64/128 filters, kernel sizes 7/5/3) are downsampled by factors
    2, 2 and 5, followed by a 256-filter bottleneck and a mirrored decoder
    that upsamples by 5, 2 and 2, concatenating the matching encoder output
    at each level. A final 1x1 convolution with a sigmoid yields one value
    in [0, 1] per input position.

    Args:
        input_shape: ``(length, channels)`` of one input signal. Because the
            total downsampling factor is 2 * 2 * 5 = 20, a fixed ``length``
            must be a positive multiple of 20 so the upsampled tensors match
            the skip connections. ``None`` (variable length) is passed
            through to Keras unchecked.

    Returns:
        An uncompiled ``keras.Model`` named ``'UNet1D_PeakDetector'``.

    Raises:
        ValueError: If ``length`` is fixed and not a positive multiple of 20.
    """
    # Fail early with a clear message rather than letting a Concatenate
    # layer report a shape mismatch deep inside the decoder.
    length = input_shape[0]
    if length is not None and (length <= 0 or length % 20 != 0):
        raise ValueError(
            'build_unet_1d: input length must be a positive multiple of 20 '
            f'(pooling factors 2*2*5) so skip connections line up; got {length}.'
        )

    inputs = keras.Input(shape=input_shape)

    # --- ENCODER ---
    # Block 1
    c1 = _double_conv(inputs, 32, 7)
    p1 = layers.MaxPooling1D(2)(c1)  # length / 2

    # Block 2
    c2 = _double_conv(p1, 64, 5)
    p2 = layers.MaxPooling1D(2)(c2)  # length / 4

    # Block 3
    c3 = _double_conv(p2, 128, 3)
    p3 = layers.MaxPooling1D(5)(c3)  # length / 20

    # --- BOTTLENECK ---
    c4 = _double_conv(p3, 256, 3)

    # --- DECODER ---
    # Block 3
    u3 = layers.UpSampling1D(5)(c4)  # back to length / 4
    u3 = layers.Concatenate()([u3, c3])
    d3 = _double_conv(u3, 128, 3)

    # Block 2
    u2 = layers.UpSampling1D(2)(d3)  # back to length / 2
    u2 = layers.Concatenate()([u2, c2])
    d2 = _double_conv(u2, 64, 5)

    # Block 1
    u1 = layers.UpSampling1D(2)(d2)  # back to full length
    u1 = layers.Concatenate()([u1, c1])
    d1 = _double_conv(u1, 32, 7)

    # Output: peak probability heatmap
    outputs = layers.Conv1D(1, 1, activation='sigmoid')(d1)

    model = keras.Model(inputs, outputs, name='UNet1D_PeakDetector')
    return model

# Instantiate the network with the default input shape and print its
# layer-by-layer summary.
model = build_unet_1d()
model.summary()

In [None]:
# Configure training: Adam with a 1e-3 learning rate, binary cross-entropy as
# the loss, and mean absolute error reported as an additional metric.
adam = keras.optimizers.Adam(learning_rate=1e-3)
model.compile(optimizer=adam, loss='binary_crossentropy', metrics=['mae'])

## Train Model

In [None]:
# Training callbacks (each uses its default monitored quantity):
#   - stop after 10 epochs without improvement and restore the best weights
#   - halve the learning rate after 5 epochs without improvement
early_stop = keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True)
lr_decay = keras.callbacks.ReduceLROnPlateau(factor=0.5, patience=5)
callbacks = [early_stop, lr_decay]

# Fit for at most 50 epochs in batches of 64, validating on the held-out split.
fit_options = dict(
    validation_data=(X_val, Y_val),
    epochs=50,
    batch_size=64,
    callbacks=callbacks,
)
history = model.fit(X_train, Y_train, **fit_options)

In [None]:
# Draw the loss and MAE curves side by side, train vs. validation in each.
fig, axes = plt.subplots(1, 2, figsize=(12, 4))

# (train key, validation key, panel title) for each panel, left to right.
panels = [
    ('loss', 'val_loss', 'Loss'),
    ('mae', 'val_mae', 'MAE'),
]
for ax, (train_key, val_key, title) in zip(axes, panels):
    ax.plot(history.history[train_key], label='Train')
    ax.plot(history.history[val_key], label='Val')
    ax.set_title(title)
    ax.legend()

plt.tight_layout()
plt.show()

## Evaluate on Validation Samples

In [None]:
# Show input signal, true heatmap and predicted heatmap for up to five
# randomly chosen validation samples (one sample per row).
n_examples = min(5, len(X_val))
if n_examples == 0:
    raise ValueError('Validation set is empty; there is nothing to plot.')

# Draw the indices without replacement so the same sample never appears twice,
# and run all selected samples through the model in a single predict call
# instead of one call per loop iteration.
sample_indices = np.random.choice(len(X_val), size=n_examples, replace=False)
pred_heatmaps = model.predict(X_val[sample_indices], verbose=0)

# 3.2 in of height per row reproduces the 15x16 figure for five rows;
# squeeze=False keeps `axes` two-dimensional even when only one row is drawn.
fig, axes = plt.subplots(n_examples, 3, figsize=(15, 3.2 * n_examples), squeeze=False)

for row, idx in enumerate(sample_indices):
    true_heatmap = Y_val[idx].squeeze()
    pred_heatmap = pred_heatmaps[row].squeeze()

    # Denormalize signal for display
    signal_display = X_val[idx].squeeze() * signal_std + signal_mean

    # Plot signal
    axes[row, 0].plot(voltage_grid, signal_display, 'b-')
    axes[row, 0].set_title('Input Signal')
    axes[row, 0].set_xlabel('V')
    axes[row, 0].grid(True, alpha=0.3)

    # Plot true heatmap
    axes[row, 1].fill_between(voltage_grid, true_heatmap, alpha=0.5, color='green')
    axes[row, 1].set_title('True Heatmap')
    axes[row, 1].set_ylim(0, 1.1)
    axes[row, 1].grid(True, alpha=0.3)

    # Plot predicted heatmap
    axes[row, 2].fill_between(voltage_grid, pred_heatmap, alpha=0.5, color='orange')
    axes[row, 2].set_title('Predicted Heatmap')
    axes[row, 2].set_ylim(0, 1.1)
    axes[row, 2].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## Save Model

In [None]:
# Store the values that inference needs alongside the model: the
# normalization constants and the voltage grid.
normalization = {
    'signal_mean': signal_mean,
    'signal_std': signal_std,
    'voltage_grid': voltage_grid,
}
np.savez('normalization_params.npz', **normalization)

# Write the trained model in the native Keras format.
model.save('peak_detector.keras')
print('Model saved!')

# Hand both files to the Colab download helper, model first.
from google.colab import files
for artifact in ('peak_detector.keras', 'normalization_params.npz'):
    files.download(artifact)