**Urban Chemical Safety Challenge: PINN Baseline Notebook**

This notebook demonstrates solving the Convection-Diffusion equation using Physics-Informed Neural Networks (PINNs).
The solution includes:
1. Problem Setup
2. PINN Model Definition
3. Training Loop
4. Submission File Creation

In [None]:
# Install FastVPINNs dependencies and library
!mkdir fastvpinns
!cd fastvpinns
!git init
!git remote add origin https://github.com/cmgcds/fastvpinns.git
!git fetch --depth=1 origin main
!git checkout main

[33mhint: Using 'master' as the name for the initial branch. This default branch name[m
[33mhint: is subject to change. To configure the initial branch name to use in all[m
[33mhint: [m
[33mhint: 	git config --global init.defaultBranch <name>[m
[33mhint: [m
[33mhint: Names commonly chosen instead of 'master' are 'main', 'trunk' and[m
[33mhint: 'development'. The just-created branch can be renamed via this command:[m
[33mhint: [m
[33mhint: 	git branch -m <name>[m
Initialized empty Git repository in /.git/
remote: Enumerating objects: 334, done.[K
remote: Counting objects: 100% (334/334), done.[K
remote: Compressing objects: 100% (301/301), done.[K
error: RPC failed; curl 56 GnuTLS recv error (-9): Error decoding the received TLS packet.
error: 6696 bytes of body are still expected
fetch-pack: unexpected disconnect while reading sideband packet
fatal: early EOF
fatal: fetch-pack: invalid index-pack output
error: pathspec 'main' did not match any file(s) known to git


In [None]:
# Mount Google Drive at /content/drive. `google.colab` is only importable
# inside a Colab runtime.
# NOTE(review): the paths used in the cells shown ('test.csv', 'submissions/')
# do not point into this mount -- confirm whether it is still needed.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# --- IMPORTS ---
from fastvpinns import FastVPINN
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import os
import subprocess

ModuleNotFoundError: No module named 'fastvpinns'

In [None]:
# --- CONFIGURABLE PARAMETERS ---
# Domain boundaries (unit square)
x_min, x_max = 0.0, 1.0
y_min, y_max = 0.0, 1.0
# Diffusion coefficient; read as a module-level global by f() and passed
# explicitly to pde_loss() through train().
eps = 1e-4

# Network architecture
layers = [2, 50, 50, 50, 1]  # Layer widths; the model cell builds one Dense layer per entry of layers[1:-1]
activation = 'tanh'

# Training parameters
lr_initial = 0.001
# NOTE(review): train() as written uses a constant Adam learning rate, so the
# two decay settings below are not applied anywhere in the cells shown.
lr_decay_rate = 0.99
lr_decay_steps = 2000
epochs = 10000

# Loss weights
# NOTE(review): train() computes `loss_pde + beta * loss_bc`; it has no PDE or
# near-boundary weight. bc_weight is presumably the value meant for `beta` --
# confirm.
pde_weight = 1.0
bc_weight = 10.0
near_boundary_weight = 10.0

# Sampling parameters
N_interior = 15000  # Number of uniformly sampled interior collocation points
N_boundary = 1000  # Total boundary points, split across the four edges
N_near_boundary = 500  # NOTE(review): no near-boundary sampler exists in the cells shown

# Early stopping
# NOTE(review): train() always runs for `epochs` iterations; these two values
# are not read by it.
patience = 200
min_delta = 1e-6

# Architecture toggles
# NOTE(review): the model cell builds a plain stack of Dense layers and does
# not consult either flag.
use_layer_norm = False
residual_connections = False


In [None]:
def set_device():
    """
    Configure and select the computational device (GPU/CPU).

    When at least one GPU is visible, memory growth is requested on every
    visible GPU and '/GPU:0' is returned; otherwise '/CPU:0' is returned.
    The chosen device is also announced on stdout.

    Returns:
        str: Device identifier ('/GPU:0' or '/CPU:0')
    """
    gpus = tf.config.list_physical_devices('GPU')
    if not gpus:
        print("Using CPU")
        return '/CPU:0'

    print("Using GPU")
    for gpu in gpus:
        try:
            tf.config.experimental.set_memory_growth(gpu, True)
        except RuntimeError as err:
            # TensorFlow only allows memory growth to be changed before the
            # GPU has been initialised. Re-running this cell in a live kernel
            # hits that case; the device is still usable, so report and go on.
            print(f"Could not enable memory growth on {gpu}: {err}")
    return '/GPU:0'

In [None]:
# Pick the device once; the model cell wraps model construction and training
# in `tf.device(device)`.
device = set_device()

Using GPU


In [None]:
def generate_boundary_points(xmin, xmax, ymin, ymax, num_points):
    """
    Generate training points along the domain boundaries.

    The requested total is split as evenly as possible over the four edges
    (left, right, bottom, top, in that order). When `num_points` is not a
    multiple of four, the first `num_points % 4` edges receive one extra
    point, so exactly `num_points` points are always returned.

    Args:
        xmin, xmax (float): x-coordinate bounds
        ymin, ymax (float): y-coordinate bounds
        num_points (int): Total number of boundary points to generate

    Returns:
        tuple: Arrays of x and y coordinates for boundary points, each of
        shape (num_points, 1)

    Raises:
        ValueError: If `num_points` is negative.
    """
    if num_points < 0:
        raise ValueError(f"num_points must be non-negative, got {num_points}")

    # Spread the remainder over the leading edges instead of dropping it.
    per_edge, leftover = divmod(num_points, 4)
    n_left, n_right, n_bottom, n_top = (
        per_edge + (1 if edge < leftover else 0) for edge in range(4)
    )

    # Generate points for each boundary edge: the fixed coordinate is constant,
    # the free coordinate is sampled uniformly along the edge.
    x_left = np.full((n_left, 1), xmin)
    y_left = np.random.uniform(ymin, ymax, (n_left, 1))
    x_right = np.full((n_right, 1), xmax)
    y_right = np.random.uniform(ymin, ymax, (n_right, 1))
    x_bottom = np.random.uniform(xmin, xmax, (n_bottom, 1))
    y_bottom = np.full((n_bottom, 1), ymin)
    x_top = np.random.uniform(xmin, xmax, (n_top, 1))
    y_top = np.full((n_top, 1), ymax)

    # Stack all boundary points
    x_boundary = np.vstack([x_left, x_right, x_bottom, x_top])
    y_boundary = np.vstack([y_left, y_right, y_bottom, y_top])
    return x_boundary, y_boundary

In [None]:
def u_bc(x, y):
    """
    Homogeneous Dirichlet boundary data: the prescribed value is zero everywhere.

    Args:
        x: x-coordinates of boundary points (array-like); only used as the
            template for the output.
        y: y-coordinates of boundary points; accepted for a uniform
            (x, y) signature but not read.

    Returns:
        Array of zeros as produced by ``np.zeros_like(x)``.
    """
    boundary_values = np.zeros_like(x)
    return boundary_values

In [None]:
def f(x, y):
    """
    Evaluate the right-hand side (forcing term) of the PDE at the given points.

    The diffusion coefficient is taken from the module-level ``eps``; it is
    not a parameter of this function.

    Args:
        x: x-coordinates (array-like)
        y: y-coordinates (array-like), broadcastable against ``x``

    Returns:
        Values of the forcing function at the given points.
    """
    # The same exponentials occur in several terms; evaluate each one once.
    exp_x = np.exp(2*(x - 1)/eps)
    exp_y = np.exp(3*(y - 1)/eps)
    exp_xy = np.exp((2*x + 3*y - 5)/eps)
    y_sq = y**2

    # Terms are accumulated in the same left-to-right order as the closed form.
    return (
        2*eps*(-x + exp_x)
        + x*y_sq
        + 6*x*y
        - x*exp_y
        - y_sq*exp_x
        + 2*y_sq
        - 6*y*exp_x
        - 2*exp_y
        + exp_xy
    )


In [None]:
def pde_loss(model, train_interior, eps):
    """
    Mean squared residual of the PDE at the interior collocation points.

    The residual evaluated is
        -eps * (u_xx + u_yy) + 2 * u_x + 3 * u_y - f(x, y)
    with all derivatives obtained by automatic differentiation of the model.
    Note that ``f`` reads the module-level ``eps``, not the ``eps`` argument
    given here.

    Args:
        model (tf.keras.Model): Neural network model
        train_interior (tf.Tensor): Interior training points; column 0 is x,
            column 1 is y
        eps (float): Diffusion coefficient

    Returns:
        tf.Tensor: Mean squared PDE residual
    """
    # Keep each coordinate as its own (N, 1) column so the tapes can
    # differentiate with respect to x and y separately.
    x_col = train_interior[:, 0:1]
    y_col = train_interior[:, 1:2]

    # Nested tapes: the inner one yields first derivatives, the outer one
    # differentiates those again. Both are persistent because each is queried
    # twice (once per coordinate).
    with tf.GradientTape(persistent=True) as outer_tape:
        outer_tape.watch(x_col)
        outer_tape.watch(y_col)
        with tf.GradientTape(persistent=True) as inner_tape:
            inner_tape.watch(x_col)
            inner_tape.watch(y_col)
            u = model(tf.concat([x_col, y_col], axis=1))
        u_x = inner_tape.gradient(u, x_col)
        u_y = inner_tape.gradient(u, y_col)
    u_xx = outer_tape.gradient(u_x, x_col)
    u_yy = outer_tape.gradient(u_y, y_col)

    diffusion = -eps * (u_xx + u_yy)
    residual = diffusion + 2 * u_x + 3 * u_y - f(x_col, y_col)
    return tf.reduce_mean(tf.square(residual))

In [None]:
def bc_loss(model, x_bd):
    """
    Mean squared mismatch between the model and the boundary data.

    Args:
        model (tf.keras.Model): Neural network model
        x_bd (tf.Tensor): Boundary points; column 0 is x, column 1 is y

    Returns:
        tf.Tensor: Mean squared error at boundary points
    """
    predicted = model(x_bd)

    # Evaluate the prescribed boundary values on (N, 1) coordinate columns.
    x_col = x_bd[:, 0:1]
    y_col = x_bd[:, 1:2]
    target = u_bc(x_col, y_col)

    mismatch = predicted - target
    return tf.reduce_mean(tf.square(mismatch))

In [None]:
def train(model, epochs, train_interior, train_boundary, eps, beta, lr_initial):
    """
    Fit the PINN by full-batch Adam on ``pde_loss + beta * bc_loss``.

    Every epoch uses all interior and boundary points at once. Progress is
    printed every 100 epochs and once more on the final epoch.

    Args:
        model (tf.keras.Model): Neural network model
        epochs (int): Number of training epochs
        train_interior (tf.Tensor): Interior training points
        train_boundary (tf.Tensor): Boundary training points
        eps (float): Diffusion coefficient
        beta (float): Weight for boundary condition loss
        lr_initial (float): Learning rate handed to Adam (held constant)
    """
    adam = tf.optimizers.Adam(learning_rate=lr_initial)
    final_epoch = epochs - 1

    for epoch in range(epochs):
        # Forward pass under the tape so the weighted loss can be differentiated.
        with tf.GradientTape() as tape:
            loss_pde = pde_loss(model, train_interior, eps)
            loss_bc = bc_loss(model, train_boundary)
            total_loss = loss_pde + beta * loss_bc

        # Backward pass and parameter update.
        weights = model.trainable_variables
        adam.apply_gradients(zip(tape.gradient(total_loss, weights), weights))

        # Periodic progress report.
        should_log = epoch == final_epoch or epoch % 100 == 0
        if should_log:
            print(f"Epoch {epoch}, Total Loss: {total_loss.numpy()}, PDE Loss: {loss_pde.numpy()}, BC Loss: {loss_bc.numpy()}")

In [None]:
def create_submission(model, test_path, output_path):
    """
    Predict ``u`` for every test point and write the result as a CSV file.

    The test CSV must contain ``x`` and ``y`` columns. The written file holds
    all original columns plus a new ``u`` column, without the index.

    Note: a later cell defines ``create_submission`` again and treats
    ``output_path`` as a directory; once both cells have run, that later
    definition is the one in effect.

    Args:
        model (tf.keras.Model): Trained neural network model
        test_path (str): Path to test data CSV file
        output_path (str): Path of the CSV file to write
    """
    frame = pd.read_csv(test_path)
    coords = frame[['x', 'y']].values
    frame['u'] = model.predict(coords)
    frame.to_csv(output_path, index=False)

In [None]:
# --- MAIN EXECUTION ---
# Seed both RNGs so that Restart & Run All reproduces the same collocation
# points and the same initial network weights.
SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)

# Generate interior training points (uniform samples over the domain)
x_train_interior = np.random.uniform(x_min, x_max, (N_interior, 1))
y_train_interior = np.random.uniform(y_min, y_max, (N_interior, 1))
train_interior = tf.convert_to_tensor(np.hstack([x_train_interior, y_train_interior]), dtype=tf.float32)

# Generate boundary training points
x_boundary, y_boundary = generate_boundary_points(x_min, x_max, y_min, y_max, N_boundary)
train_boundary = tf.convert_to_tensor(np.hstack([x_boundary, y_boundary]), dtype=tf.float32)

# Create and train model on specified device
with tf.device(device):
    # Define neural network architecture from the config cell: layers[0]
    # inputs, one hidden Dense layer per entry of layers[1:-1], and a linear
    # output layer of width layers[-1].
    model = tf.keras.Sequential([
        tf.keras.layers.InputLayer(input_shape=(layers[0],)),
        *[tf.keras.layers.Dense(units, activation=activation) for units in layers[1:-1]],
        tf.keras.layers.Dense(layers[-1])
    ])

    # Train the model. The boundary-loss weight comes from the config cell's
    # `bc_weight`; no variable named `beta` is assigned in any cell shown, so
    # passing `beta` here fails with NameError on a fresh kernel.
    # NOTE(review): the losses logged beneath this cell work out to a boundary
    # weight of 5.0, so they came from a different setting -- re-run to refresh.
    train(model, epochs, train_interior, train_boundary, eps, bc_weight, lr_initial)



Epoch 0, Total Loss: 6.146156311035156, PDE Loss: 5.951121807098389, BC Loss: 0.03900687396526337
Epoch 100, Total Loss: 0.6030737161636353, PDE Loss: 0.2394435554742813, BC Loss: 0.07272603362798691
Epoch 200, Total Loss: 0.48991960287094116, PDE Loss: 0.1201016753911972, BC Loss: 0.07396358251571655
Epoch 300, Total Loss: 0.4117259383201599, PDE Loss: 0.07314594089984894, BC Loss: 0.06771599501371384
Epoch 400, Total Loss: 0.39483001828193665, PDE Loss: 0.04224011301994324, BC Loss: 0.07051797956228256
Epoch 500, Total Loss: 0.3783321678638458, PDE Loss: 0.04328598454594612, BC Loss: 0.06700923293828964
Epoch 600, Total Loss: 0.37165939807891846, PDE Loss: 0.04616701230406761, BC Loss: 0.06509847939014435
Epoch 700, Total Loss: 0.36633598804473877, PDE Loss: 0.03229809179902077, BC Loss: 0.0668075829744339
Epoch 800, Total Loss: 0.3643454313278198, PDE Loss: 0.036483291536569595, BC Loss: 0.06557242572307587
Epoch 900, Total Loss: 0.3622533082962036, PDE Loss: 0.028831951320171356, B

In [None]:
def create_submission(model, test_path, output_path, filename='submission3.csv'):
    """
    Create submission file with model predictions.

    Reads the test points, predicts ``u`` for each (x, y) pair and writes all
    original columns plus the new ``u`` column to
    ``<output_path>/<filename>`` (index not written).

    Args:
        model (tf.keras.Model): Trained neural network model
        test_path (str): Path to test data CSV file; must contain ``x`` and
            ``y`` columns
        output_path (str): Directory that receives the submission file; it is
            created if it does not exist. An empty string means the current
            working directory.
        filename (str): Name of the CSV file written inside ``output_path``
    """
    test_data = pd.read_csv(test_path)
    xy = test_data[['x', 'y']].values

    # The model returns one value per point as an (N, 1) array; flatten it so
    # the column assignment always receives a plain 1-D vector.
    predictions = np.asarray(model.predict(xy)).reshape(-1)
    test_data['u'] = predictions

    # Make sure the target directory exists so the function works on its own.
    if output_path:
        os.makedirs(output_path, exist_ok=True)
    output_file = os.path.join(output_path, filename)
    test_data.to_csv(output_file, index=False)

In [None]:
# Both paths are relative to the kernel's current working directory.
test_path = 'test.csv'
submission_path = 'submissions/'

os.makedirs(submission_path, exist_ok=True)  # Create the output directory if it doesn't exist

# Uses the create_submission defined in the cell directly above, which treats
# its third argument as a directory and writes 'submission3.csv' inside it.
create_submission(model, test_path, submission_path)  # Generating csv file for submission

[1m5000/5000[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 1ms/step
