# Lecture 2 - Python, Jupyter and APIs

## 2.1 Jupyter notebooks

In [None]:
# Compare the interpreter running this kernel with the python3 found on the shell PATH.
import sys
print(sys.executable)
!which python3

In [None]:
# `!pip` runs whatever pip the shell finds; `%pip` is the IPython magic variant.
!pip --version
%pip --version

In [None]:
# Install Flask with the %pip magic, then import it to confirm the kernel can see it.
%pip install flask
import flask

In [None]:
import site
# Print the site-packages directories, then list the contents of the first one.
print(site.getsitepackages())
!ls -l {site.getsitepackages()[0]}

In [None]:
%lsmagic

## 2.3 API requests using `requests`

In [None]:
%pip install flask

In [None]:
from flask import Flask, abort, jsonify, request
import threading

app = Flask(__name__)

# In-memory task store keyed by task id.
tasks = {}
# Id given to the next created task. This is a notebook-level global, so any
# later cell that assigns to `i` also changes the ids handed out here.
i = 1

@app.post("/tasks")
def create():
    """Store the JSON request body as a new task and return it with HTTP 201.

    Responds with HTTP 400 when the parsed JSON body is empty/falsy.
    """
    global i
    if not request.json:
        abort(400)
    t = {"id": i, "state": "created", "data": request.json}
    tasks[i] = t
    i += 1
    return jsonify(t), 201

# Calling `app.run()` directly would block this cell: the server loop runs in
# the calling thread and subsequent cells could not run until Flask stops.
#app.run()

# Inside a Jupyter notebook, run Flask in a background thread instead.
# `use_reloader=False` is mandatory in a Jupyter notebook.
def run():
    """Serve `app` on 127.0.0.1:5000 (blocks the calling thread)."""
    app.run(host="127.0.0.1", port=5000, use_reloader=False)

# daemon=True: the server thread does not keep the kernel process alive.
threading.Thread(target=run, daemon=True).start()

In [None]:
# Basic test using curl: POST a JSON task to the server started in the previous cell.
!curl -X POST http://127.0.0.1:5000/tasks -H "Content-Type: application/json" -d '{"type": "demo", "params": {"x": 1}}'

In [None]:
import requests

# POST a demo task as JSON, then show the HTTP status and the decoded reply.
demo_task = {"type": "demo", "params": {"x": 1}}
r = requests.post("http://127.0.0.1:5000/tasks", json=demo_task)
print(r.status_code, r.json())

In [None]:
from flask import Flask, jsonify, abort, request
import threading

# A fresh application object; routes registered on the earlier `app` are not carried over.
app = Flask(__name__)

tasks = {}  # task id -> task record, in memory only

i = 1  # id given to the next created task (notebook-level global)


@app.post("/tasks")
def create():
    """Store the posted JSON as a new task and answer 201 with the stored record."""
    global i
    payload = request.json
    if not payload:
        abort(400)
    t = {"id": i, "state": "created", "data": payload}
    tasks[i] = t
    i += 1
    return jsonify(t), 201


@app.get("/tasks")
def list_tasks():
    """Return every stored task as a JSON list."""
    return jsonify([*tasks.values()])


@app.get("/tasks/<int:i>")
def get_task(i):
    """Return the task with id `i`, or answer 404 when no such task exists."""
    # The parameter `i` is the URL variable and shadows the global counter here.
    if i not in tasks:
        abort(404)
    return jsonify(tasks[i])

In [None]:
# Serve the current `app` from a daemon thread so the notebook stays interactive.
# NOTE(review): an earlier cell already started a daemon thread serving on
# 127.0.0.1:5000; this bind presumably fails with "address already in use"
# unless the kernel was restarted in between - confirm, or use another port.
def run():
    app.run(host="127.0.0.1", port=5000, use_reloader=False)
threading.Thread(target=run, daemon=True).start()

In [None]:
import requests
# GET the task list and show the HTTP status and the decoded JSON body.
r = requests.get("http://127.0.0.1:5000/tasks")
print(r.status_code, r.json())

In [None]:
import requests

# Create one demo task and print the status code with the decoded reply.
r = requests.post("http://127.0.0.1:5000/tasks", json={"type": "demo", "params": {"x": 1}})
print(r.status_code, r.json())

In [None]:
# Last expression of the cell: the notebook displays the Response object itself.
requests.post(
    "http://127.0.0.1:5000/tasks",
    json={"type": "demo", "params": {"x": 1}}
)

In [None]:
# Same request, but only the HTTP status code is displayed.
requests.post(
    "http://127.0.0.1:5000/tasks",
    json={"type": "demo", "params": {"x": 1}}
).status_code

In [None]:
# Display the decoded JSON body of the task list.
requests.get("http://127.0.0.1:5000/tasks").json()

In [None]:
# Fetch the task with id 2 and display its decoded JSON body.
# NOTE(review): `.json()` presumably raises if the reply is a non-JSON error page
# (e.g. no task with id 2 exists yet) - confirm.
requests.get("http://127.0.0.1:5000/tasks/2").json()

# Lecture 3 - Visualising Fields and Observations

## 3.2 ecCodes

In [None]:
import eccodes

In [None]:
!grib_ls -V

In [None]:
print(eccodes.codes_get_api_version())

### Read GRIB2 file

In [None]:
!find .. -name "*.grib2"

In [None]:
grib_file = "../e-ai_ml2/course/code/code03/ifs_2t.grib2"

In [None]:
# Print shortName, level and number of values for every message in the file.
with open(grib_file, "rb") as f:
    while True:
        gid = eccodes.codes_grib_new_from_file(f)
        if gid is None:
            # No further message in the file.
            break

        try:
            short = eccodes.codes_get(gid, "shortName")
            level = eccodes.codes_get(gid, "level")
            size  = eccodes.codes_get_size(gid, "values")

            print(short, level, size)
        finally:
            # Release the handle even when one of the key lookups raises.
            eccodes.codes_release(gid)

### Download GRIB2 file from ECMWF

In [None]:
from ecmwf.opendata import Client

# Open-data client configured with source "ecmwf" and model "ifs".
client = Client(source="ecmwf", model="ifs")

# Request parameters "2t" and "msl" (type "fc", time 0, step 24); the result is
# written to ifs_2t.grib2 relative to the current working directory.
client.retrieve(time=0, type="fc", step=24, param=["2t", "msl"], target="ifs_2t.grib2")

In [None]:
!ls *.grib2

### Download from DWD

In [None]:
import datetime

base_url = "http://opendata.dwd.de/weather/nwp/icon/grib/00/t_2m/"
now = datetime.datetime.now(datetime.UTC)
filename = f"icon_global_icosahedral_single-level_{now:%Y%m%d}00_000_T_2M.grib2.bz2"
url = base_url + filename
grib_filename = filename[:-4]

In [None]:
import wget
# Download `url` and store it locally under `filename`.
wget.download(url, filename)

In [None]:
import bz2
import shutil

# Decompress the downloaded archive into `grib_filename`. The data is copied in
# chunks instead of reading the whole decompressed file into memory at once.
with bz2.open(filename, "rb") as f_in, open(grib_filename, "wb") as f_out:
    shutil.copyfileobj(f_in, f_out)

In [None]:
!ls *.grib2*

In [None]:
import eccodes

# Walk through the messages of the decompressed ICON file and print a
# one-line summary (shortName, level, number of values) for each.
with open(grib_filename, "rb") as f:
    while (gid := eccodes.codes_grib_new_from_file(f)) is not None:
        short, level = (eccodes.codes_get(gid, key) for key in ("shortName", "level"))
        size = eccodes.codes_get_size(gid, "values")

        print(short, level, size)

        eccodes.codes_release(gid)

Extract and list metadata keys from a GRIB file:

In [None]:
import eccodes

# Keys whose values are not read as strings and therefore not listed.
skipped_keys = ('section2Padding', 'codedValues', 'values')

with open(grib_filename, "rb") as f:
    while True:
        gid = eccodes.codes_grib_new_from_file(f)
        if gid is None: break

        keys = []
        key_iterator = eccodes.codes_keys_iterator_new(gid)
        try:
            while eccodes.codes_keys_iterator_next(key_iterator):
                keyname = eccodes.codes_keys_iterator_get_name(key_iterator)
                if keyname in skipped_keys:
                    # Skip entirely, so that a skipped key is never listed
                    # with the value of the key that preceded it.
                    continue
                keys.append((keyname, eccodes.codes_get_string(gid, keyname)))
        finally:
            # Free the iterator and the message handle, also on errors.
            eccodes.codes_keys_iterator_delete(key_iterator)
            eccodes.codes_release(gid)

        for key, value in keys:
            print(f"Key: {key:40} Value: {value}")

In [None]:
import eccodes

# Keys whose values are not read as strings and therefore not listed.
skipped_keys = ('section2Padding', 'codedValues', 'values')

with open(grib_file, "rb") as f:
    while True:
        gid = eccodes.codes_grib_new_from_file(f)
        if gid is None: break

        keys = []
        key_iterator = eccodes.codes_keys_iterator_new(gid)
        try:
            while eccodes.codes_keys_iterator_next(key_iterator):
                keyname = eccodes.codes_keys_iterator_get_name(key_iterator)
                if keyname in skipped_keys:
                    # Skip entirely, so that a skipped key is never listed
                    # with the value of the key that preceded it.
                    continue
                keys.append((keyname, eccodes.codes_get_string(gid, keyname)))
        finally:
            # Free the iterator and the message handle, also on errors.
            eccodes.codes_keys_iterator_delete(key_iterator)
            eccodes.codes_release(gid)

        for key, value in keys:
            print(f"Key: {key:40} Value: {value}")

In [None]:
%pip install cartopy
import numpy as np
import matplotlib.pyplot as plt
import cartopy.crs as ccrs
import cartopy.feature as cfeature

In [None]:
with open(grib_file, "rb") as f:
    # First message is pressure
    gid = eccodes.codes_grib_new_from_file(f)

nx = eccodes.codes_get(gid, "Ni")
ny = eccodes.codes_get(gid, "Nj")
values = eccodes.codes_get_array(gid, "values")
#field = values.reshape(ny, nx)
field = values.reshape(ny, nx) / 100.0  # Pa → hPa


plt.figure(figsize=(7, 3.5))
#plt.imshow(field)
im = plt.imshow(field)
plt.title("IFS Mean Sea Level Pressure (hPa)")
plt.colorbar(im, label="Pressure (hPa)")
plt.tight_layout()
plt.axis("off")

#plt.show()

import os
out_dir = "../assets/images"
os.makedirs(out_dir, exist_ok=True)
out_path = os.path.join(out_dir, "grib_plot_with_eccodes_ifs_pressure.png")
plt.savefig(out_path, dpi=300, bbox_inches="tight", pad_inches=0.1)
!ls -ltr {out_dir}

In [None]:
with open(grib_file, "rb") as f:
    # Run twice to get the second message (T2m)
    gid = eccodes.codes_grib_new_from_file(f)
    gid = eccodes.codes_grib_new_from_file(f)

nx = eccodes.codes_get(gid, "Ni")
ny = eccodes.codes_get(gid, "Nj")
values = eccodes.codes_get_array(gid, "values")
field = values.reshape(ny, nx)

plt.figure(figsize=(7, 3.5))
#plt.imshow(field)
im = plt.imshow(field)
plt.title("IFS 2m Temperature (K)")
plt.colorbar(im, label="K")
plt.tight_layout()
plt.axis("off")
#plt.show()

import os
out_dir = "../assets/images"
os.makedirs(out_dir, exist_ok=True)
out_path = os.path.join(out_dir, "grib_plot_with_eccodes_ifs_t2m.png")
plt.savefig(out_path, dpi=300, bbox_inches="tight", pad_inches=0.1)
!ls -ltr {out_dir}

In [None]:
# Uses `gid`, `nx`, `ny` and `field` as left behind by the previous cell.
lats, lons = (eccodes.codes_get_array(gid, key) for key in ("latitudes", "longitudes"))
lat, lon = lats.reshape(ny, nx), lons.reshape(ny, nx)

# Plate-carrée map with coastlines and country borders as background.
fig, ax = plt.subplots(figsize=(10,5), subplot_kw={"projection": ccrs.PlateCarree()})
ax.coastlines()
ax.add_feature(cfeature.BORDERS)

ax.pcolormesh(lon, lat, field, transform=ccrs.PlateCarree(), cmap="jet")

In [None]:
#%pip install scipy
from scipy.interpolate import griddata


def load_grib(file, var):
    """Return the `values` array of the first message whose shortName equals `var`.

    Messages are scanned in file order and every handle is released after
    inspection. Returns None when no message matches.
    """
    found = None
    with open(file, 'rb') as f:
        while found is None:
            gid = eccodes.codes_grib_new_from_file(f)
            if gid is None:
                break
            if eccodes.codes_get(gid, "shortName") == var:
                found = eccodes.codes_get_array(gid, "values")
            eccodes.codes_release(gid)
    return found

def interpolate_to_grid(lat, lon, t2m, bbox, grid_res=0.25):
    """Interpolate scattered values onto a regular lat/lon grid (cubic `griddata`).

    Args:
        lat, lon, t2m: arrays of equal size; they are flattened before use.
        bbox: (latmin, latmax, lonmin, lonmax).
        grid_res: spacing of the target grid; the upper bounds are excluded.

    Returns:
        (lon_grid, lat_grid, t2m_grid), three 2-D arrays of identical shape.
    """
    lat_lo, lat_hi, lon_lo, lon_hi = bbox

    # Regular target grid, longitudes along axis 1 and latitudes along axis 0.
    target_lon, target_lat = np.meshgrid(
        np.arange(lon_lo, lon_hi, grid_res),
        np.arange(lat_lo, lat_hi, grid_res),
    )

    source_xy = np.stack([lon.ravel(), lat.ravel()], axis=1)
    target_xy = np.stack([target_lon.ravel(), target_lat.ravel()], axis=1)
    interpolated = griddata(source_xy, t2m.ravel(), target_xy, method='cubic')

    return target_lon, target_lat, interpolated.reshape(target_lon.shape)


def plot_t2m_grid(lat, lon, t2m, bbox, title, fname):
    """Plots interpolated 2m temperature as a smooth heatmap."""
    lon_grid, lat_grid, t2m_grid = interpolate_to_grid(lat, lon, t2m, bbox)

    # Set reasonable aspect ratio based on bounding box size
    lon_range = bbox[3] - bbox[2]
    lat_range = bbox[1] - bbox[0]
    aspect_ratio = lon_range / lat_range
    figsize = (10, max(5, 10 / aspect_ratio))  # Maintain consistent width & prevent extreme height

    plt.figure(figsize=figsize)
    ax = plt.axes(projection=ccrs.PlateCarree())
    ax.set_extent([bbox[2], bbox[3], bbox[0], bbox[1]])
    ax.add_feature(cfeature.LAND, edgecolor='black')
    ax.add_feature(cfeature.COASTLINE)
    ax.add_feature(cfeature.BORDERS, linestyle=':')

    # Use smooth interpolation and correct aspect ratio
    img = ax.imshow(t2m_grid, extent=[bbox[2], bbox[3], bbox[0], bbox[1]], origin='lower',
                    cmap='jet', transform=ccrs.PlateCarree(), aspect='auto', interpolation='bicubic')

    plt.colorbar(img, label="Temperature (K)")
    plt.title(title)
    plt.savefig(out_path, dpi=200, bbox_inches='tight')  # Reduce DPI for smaller file size
    #plt.show()

import os
out_dir = "../assets/images"
os.makedirs(out_dir, exist_ok=True)

# Load data
lat = load_grib("../e-ai_ml2/course/code/code03/icon_lat.grib", "tlat")
lon = load_grib("../e-ai_ml2/course/code/code03/icon_lon.grib", "tlon")
t2m = load_grib("../e-ai_ml2/course/code/code03/icon_t2m.grib", "2t")

# Plot interpolated global and Germany views
out_path = os.path.join(out_dir, "grib_plot_with_eccodes_icon_t2m_global_interp.png")
plot_t2m_grid(lat, lon, t2m, (-90, 90, -180, 180), "ICON Interpolated Global 2m Temperature", "icon_t2m_global_interp.png")
out_path = os.path.join(out_dir, "grib_plot_with_eccodes_icon_t2m_germany_interp.png")
plot_t2m_grid(lat, lon, t2m, (47, 55, 5, 15), "ICON Interpolated 2m Temperature over Germany", "icon_t2m_germany_interp.png")

!ls -ltr {out_dir}

## 3.3 Accessing SYNOP observation files from NetCDF

In [None]:
!find ../e-ai_ml2 -name "*.nc"

In [None]:
%pip install netCDF4
from netCDF4 import Dataset

In [None]:
import numpy as np

filename = "../e-ai_ml2/course/code/code03/synop.nc"

ncfile = Dataset(filename, "r")

# Read the three variables used below (stored as lats, lons, temps) and convert
# each to a plain NumPy array while the file is still open.
lats, lons, temps = (
    np.array(ncfile.variables[name][:]) for name in ("MLAH", "MLOH", "MTDBT")
)

ncfile.close()

In [None]:
threshold=1e+20

import cartopy.crs as ccrs
#projections = [[ccrs.PlateCarree(), "PlateCarree"]]
projections=[[ccrs.PlateCarree(), "PlateCarree"], 
                                  [ccrs.TransverseMercator(), "TransverseMercator"],
                                  [ccrs.Mercator(), "Mercator"],
                                  [ccrs.EuroPP(), "EuroPP"],
                                  [ccrs.Geostationary(), "Geostationary"],
                                  [ccrs.Stereographic(), "Stereographic"]]
# Filter out large missing values
valid_mask = (temps < threshold) & np.isfinite(temps)
lats, lons, temps = lats[valid_mask], lons[valid_mask], temps[valid_mask]

import cartopy.feature as cfeature

for projection in projections:
        fig, ax = plt.subplots(figsize=(10, 6), subplot_kw={'projection': projection[0]})
        scatter = ax.scatter(lons, lats, c=temps, cmap='jet', s=5, alpha=0.7, transform=ccrs.PlateCarree())

        # Add map features
        ax.coastlines()
        ax.add_feature(cfeature.BORDERS, edgecolor='gray')
        ax.gridlines(draw_labels=True, linewidth=0.5, color='gray', alpha=0.5, linestyle='--')

        # Add colorbar with better spacing
        cbar = plt.colorbar(scatter, ax=ax, fraction=0.04, pad=0.08)  
        cbar.set_label("Temperature (K)")

        # Set title
        plt.title("Temperature Observations on Map in Projection " + projection[1])

        # Save and show the plot
        #plt.show()
        import os
        out_dir = "../assets/images"
        os.makedirs(out_dir, exist_ok=True)
        out_path = os.path.join(out_dir, f"synop_temp_{projection[1]}.png")
        plt.savefig(out_path, dpi=300, bbox_inches="tight", pad_inches=0.1)
        !ls -ltr {out_dir}

## 3.4 AIREP feedback files in NetCDF

In [None]:
airep_file = "../e-ai_ml2/course/code/code03/monAIREP.nc"

ncfile = Dataset(airep_file, "r")

# One table row per variable: running number, name, up to two dimension
# lengths and the `longname` attribute ("N/A" when the attribute is absent).
for nc, (varname, var) in enumerate(ncfile.variables.items(), start=1):
    description = getattr(var, "longname", "N/A")
    dims = [len(ncfile.dimensions[dim]) for dim in var.dimensions]
    # Pad with empty strings so variables with fewer than two dimensions still format.
    shape1, shape2 = (dims + ["", ""])[:2]
    print ("{:<4} {:40} {:>10} {:>10} {:30}".format(nc, varname, shape1, shape2, description))
    # Separator line after every tenth row.
    if nc % 10 == 0:
        print("-" * 110)

ncfile.close()

In [None]:
# Read header-level variables
ncfile = Dataset(airep_file, "r")
lat = ncfile.variables["lat"][:]
lon = ncfile.variables["lon"][:]

# Body-level variables
varno_all = ncfile.variables["varno"][:]
obs_all = ncfile.variables["obs"][:]
l_body = ncfile.variables["l_body"][:]

# Expand lat/lon to match body-level observations
ni = len(l_body)
ie = np.repeat(range(0, ni), l_body)  # Map each body entry to its header index

# varno == 2 is upper air temperature
idx = np.where(varno_all == 2)[0]

# Filter lat, lon, obs
lat_filtered = lat[ie[idx]]
lon_filtered = lon[ie[idx]]
obs_filtered = obs_all[idx]

var = "level"
var_data = ncfile.variables[var][:]

print(var_data.shape[0], len(varno_all))

extra_data = var_data[idx]
lats, lons, obs = lat_filtered, lon_filtered, obs_filtered
heights = extra_data

threshold=1e+20

print(len(lats), "Latitudes:", lats[:5])
print(len(lons), "Longitudes:", lons[:5])
print(len(obs), "Observations:", obs[:5])
if heights is not None:
    print(len(heights), "Heights:", heights[:5])

valid_mask = (obs < threshold) & np.isfinite(obs)
lats, lons, obs = lats[valid_mask], lons[valid_mask], obs[valid_mask]

# Keep only temperatures between -30°C and 40°C (243.15K to 313.15K)
temp_min, temp_max = 180, 320
physical_mask = (obs >= temp_min) & (obs <= temp_max)

lats_filtered, lons_filtered, obs_filtered = lats[physical_mask], lons[physical_mask], obs[physical_mask]

fig, ax = plt.subplots(figsize=(10, 6), subplot_kw={'projection': ccrs.PlateCarree()})

scatter = ax.scatter(lons_filtered, lats_filtered, c=obs_filtered, cmap='jet', s=2, alpha=0.7, transform=ccrs.PlateCarree())

ax.coastlines()
ax.add_feature(cfeature.BORDERS, edgecolor='gray')
ax.gridlines(draw_labels=True, linewidth=0.5, color='gray', alpha=0.5, linestyle='--')

# Ensure the colorbar does not exceed figure height
cbar = fig.colorbar(scatter, ax=ax, orientation='vertical', fraction=0.04, pad=0.08, shrink=0.8)
cbar.set_label("Temperature (K)")

plt.title("AIREP Observations")
#plt.show()
import os
out_dir = "../assets/images"
os.makedirs(out_dir, exist_ok=True)
out_path = os.path.join(out_dir, f"airep.png")
plt.savefig(out_path, dpi=300, bbox_inches="tight", pad_inches=0.1)
!ls -ltr {out_dir}

## GPU access in practice

In [None]:
import torch

In [None]:
print(torch.cuda.is_available())

In [None]:
print(torch.backends.mps.is_available())

In [None]:
import time

# Time one 4000 x 4000 matrix product on the "mps" device.
d = torch.device("mps")

x = torch.rand((4000, 4000),device=d)

t0 = time.time()
y = torch.matmul(x, x)
# Wait for the MPS work to complete before the clock is read.
torch.mps.synchronize()
print("Time = ", round(time.time()-t0, 3))

In [None]:
# CPU baseline: two independent n x n products, timed together.
# NOTE(review): assuming the default float32 dtype, each 30000 x 30000 matrix
# is about 3.6 GB and four of them are alive here - confirm the machine has
# enough memory or reduce n.
n = 30000
x0 = torch.rand((n, n), device="cpu")
x1 = torch.rand((n, n), device="cpu")
t0 = time.time()
y0 = torch.matmul(x0, x0)
y1 = torch.matmul(x1, x1)
print("Time = ", round(time.time() - t0, 3))

In [None]:
# The same two products, each placed on its own device index.
# NOTE(review): "mps:1" presumably requires a second MPS device - confirm that
# the target hardware exposes one.
d0 = torch.device("mps:0")
d1 = torch.device("mps:1")
x0 = torch.rand((n, n), device=d0)
x1 = torch.rand((n, n), device=d1)

t0 = time.time()
y0 = torch.matmul(x0, x0)
y1 = torch.matmul(x1, x1)
# Wait for the queued work before reading the clock.
torch.mps.synchronize()

print("Time = ", round(time.time() - t0, 3))

In [None]:
# Rebind the names of the earlier large tensors so they no longer reference them.
x0, x1, y0, y1, x, y = 0, 0, 0, 0, 0, 0

# Two (n//2) x n blocks, one per device; the two products below together
# have n rows.
A0 = torch.rand((n//2,n), device=d0)
A1 = torch.rand((n//2,n), device=d1)

B = torch.rand((n,n), device=d0)

t0 = time.time()

C0 = A0 @ B
# B is created on d0, so it is copied to d1 inside the timed region.
C1 = A1 @ B.to(d1)

torch.mps.synchronize()

print("Time = ", round(time.time() - t0, 3))

### Mixed precision

In [None]:
d = torch.device("mps")

def doit(d, dtype=None, batch=20000, width=1024, hidden=4096):
    """Time a two-layer GELU MLP forward pass on device `d`.

    Args:
        d: device (or device string) on which the tensors are created.
        dtype: tensor dtype; None uses torch's current default dtype.
        batch, width, hidden: problem size; the defaults reproduce the
            original 20000 x 1024 input with a 4096-wide hidden layer.

    Returns:
        Elapsed wall-clock seconds for the two matmuls and the GELU,
        rounded to 3 decimals.
    """
    # Only call the MPS synchronisation when the work actually runs on MPS.
    on_mps = torch.device(d).type == "mps"

    x = torch.randn((batch, width), device=d, dtype=dtype)
    W1 = torch.randn((width, hidden), device=d, dtype=dtype)
    W2 = torch.randn((hidden, width), device=d, dtype=dtype)
    if on_mps:
        # Make sure the setup work has finished before the timer starts.
        torch.mps.synchronize()

    t0 = time.time()
    y = torch.nn.functional.gelu(x @ W1)
    z = y @ W2
    if on_mps:
        # Wait for the queued kernels so the measurement covers the compute.
        torch.mps.synchronize()
    return round(time.time() - t0, 3)

# Time the same workload in float32 and float16. The global default dtype is
# restored afterwards so that later cells do not silently run in float16.
previous_dtype = torch.get_default_dtype()
try:
    for dt in [torch.float32, torch.float16]:
        torch.set_default_dtype(dt)
        print(f"{dt} time: {doit(d)}")
finally:
    torch.set_default_dtype(previous_dtype)

# Lecture 4 - AI and ML

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader

## 4.1 Core AI and ML concepts

In [None]:
# ReLU activation function

x = torch.linspace(-10, 10, 200)
y = torch.relu(x)

plt.plot(x.numpy(), y.numpy(), label='ReLU', color='blue')
plt.title('ReLU Activation Function')
plt.xlabel('Input')
plt.ylabel('Output')
plt.grid(True)
plt.legend()
plt.savefig("../assets/images/relu_function.png")

### Learning a linear function

In [None]:
# Learning linear weights

# Create structured data
X = torch.zeros(100, 10, dtype=torch.float32)
for i in range(100):
    for j in range(10):
        X[i, j] = (i + 1) + (j / 10)

# Normalise to prevent exploding gradients
X = (X - X.min()) / (X.max() - X.min())

# Linear: y = 1, ..., 100
y = torch.arange(1, 101, dtype=torch.float32).reshape(-1, 1)

plt.plot(X[:,0], y)

In [None]:
# Dataset and DataLoader
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=8, shuffle=True)

In [None]:
# Model definition
model = nn.Linear(10, 1)

loss_fn = nn.MSELoss()
opt = optim.SGD(model.parameters(), lr=0.01)

In [None]:
# Training
num_epochs = 100
loss_history = []
prediction_history = {}
for epoch in range(num_epochs):
    epoch_loss = 0.0
    
    for X_batch, y_batch in loader:
        opt.zero_grad()
        y_pred = model(X_batch)
        loss = loss_fn(y_pred, y_batch)
        loss.backward()
        opt.step()
        epoch_loss += loss.item()

    loss_history.append(epoch_loss)

    if epoch%10 == 0 or epoch == (num_epochs-1):
        with torch.no_grad():
            prediction_history[epoch] = model(X).detach().squeeze().numpy()
        print(f"Epoch {epoch+1}: Loss = {epoch_loss:.4f}")

In [None]:
# Loss curve of the linear model, saved as an image.
ax = plt.gca()
ax.plot(loss_history)
ax.set_title("Training Loss over Epochs")
ax.set_xlabel("Epoch")
ax.set_ylabel("Loss")
ax.grid(True)
plt.tight_layout()
plt.savefig("../assets/images/training_loss_learning_linear_weights.png")

In [None]:
# Target curve plus one curve per stored prediction snapshot, all against the
# first input feature.
true_y = y.squeeze().numpy()
ax = plt.gca()
ax.plot(X[:,0], true_y, label="True y", color="black", linewidth=2)
for epoch, pred in prediction_history.items():
    ax.plot(X[:,0], pred, label=f"Epoch {epoch}")
ax.set_title("Evolution of Learned Function")
ax.set_xlabel("X")
ax.set_ylabel("Predicted y")
ax.legend()
ax.grid(True)
plt.tight_layout()
plt.savefig("../assets/images/model_predictions_over_training.png")

In [None]:
# Evaluation

with torch.no_grad():
    y_pred = model(X)
    for i in range(5):
        pred = y_pred[i].item()
        true = y[i].item()
        print(f"y_pred = {pred:.2f}, y_true = {true:.2f}")

print("Weights:", *model.weight.data.numpy()[0])
print("Bias:", model.bias.data)

### Learning a non-linear function

In [None]:
# Create structured data
n = 20
X = torch.zeros(100, n, dtype=torch.float32)
for i in range(100):
    for j in range(n):
        X[i, j] = (i + 1) + (j / 10)

# Normalise to prevent exploding gradients
X = (X - X.min()) / (X.max() - X.min())

# Non-linear: y = sqrt(100x + 1) * sin(2πx)
y = torch.arange(1, 101, dtype=torch.float32)
y = torch.sqrt(y)
y = y*torch.sin(2*torch.pi*torch.arange(100, dtype=torch.float32)/100)
y = y.reshape(-1,1)
plt.plot(X[:,0], y)

In [None]:
# Wrap the new X / y in a dataset served in shuffled mini-batches of 8.
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=8, shuffle=True)

In [None]:
# Define non-linear model
m = 1
mm = 128
if m == 2:
    # Non-linear model
    model = nn.Sequential(
        nn.Linear(n, mm),
        nn.SiLU(),
        nn.Linear(mm, 1)
    )
else:
    # Linear
    model = nn.Sequential(
        nn.Linear(n, 1),
    )

loss_fn = nn.MSELoss()
opt = optim.Adam(model.parameters(), lr=0.01)

loss_history = []
prediction_history = {}

# Training
num_epochs = 300
for epoch in range(num_epochs):

    epoch_loss = 0.0
    for X_batch, y_batch in loader:
        opt.zero_grad()
        y_pred = model(X_batch)
        loss = loss_fn(y_pred, y_batch)
        loss.backward()
        opt.step()
        epoch_loss += loss.item()

    loss_history.append(epoch_loss)

    if epoch % 30 == 0 or epoch == (num_epochs-1):
        with torch.no_grad():
            prediction_history[epoch] = model(X).detach().squeeze().numpy()
        print(f"Epoch {epoch+1} loss: {epoch_loss:.4f}")

plt.figure(figsize=(8, 4))
plt.plot(loss_history)
plt.title("Training Loss over Epochs (Non-linear Function)")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.grid(True)
plt.tight_layout()
#plt.savefig("../assets/images/training_loss_learning_nonlinear_weights.png")

In [None]:
# Target curve plus one curve per stored prediction snapshot.
plt.figure(figsize=(8, 4))
ax = plt.gca()
true_y = y.squeeze().numpy()
ax.plot(X[:,0], true_y, label="True target", color="black", linewidth=2)

for epoch, pred in prediction_history.items():
    ax.plot(X[:,0], pred, label=f"Epoch {epoch}")

ax.set_title("Model Predictions Over Training (Nonlinear Model)")
ax.set_xlabel("X")
ax.set_ylabel("Predicted y")
ax.legend()
ax.grid(True)
plt.tight_layout()
#plt.savefig("../assets/images/model_predictions_over_training_nonlinear.png")

## 4.2 Torch tensors

In [None]:
# y = x0^2 + x1^2, so backward() fills x.grad with (2*x0, 2*x1).
x = torch.tensor([2., 3.], requires_grad=True)
y = (x ** 2).sum()
y.backward()
print(x.grad)

In [None]:
import torch.nn as nn

# nn.Module is the base class for models and layers
# Holds parameters (weights and biases)
class SimpleNN(nn.Module):
    """Two-layer fully connected network: 1 -> 16 -> ReLU -> 1."""

    def __init__(self):
        super().__init__()

        # Layer 1: 1 -> 16
        self.fc1 = nn.Linear(1,16)

        # Non-linear activation function (ReLU in this case).
        # The class is spelled `nn.ReLU`; `nn.reLU` does not exist.
        self.relu = nn.ReLU()

        # Layer 2: 16 -> 1
        self.fc2 = nn.Linear(16,1)

    # Calling `model(x)` runs the model's `forward()` method
    # Forward pass computes predictions from inputs (x)
    # Builds the autograd graph (if gradients are enabled)
    def forward(self, x):
        """Return the prediction for `x`, whose last dimension must have size 1."""
        x = self.fc1(x)
        x = self.relu(x)
        return self.fc2(x)

### Learning a sine function

In [None]:
# Sample input values x
x = np.linspace(0, 2*np.pi, 1000)

# Compute labels y = sin(x)
y = np.sin(x)

plt.plot(x, y)
plt.show()

In [None]:
# Dataset construction

from torch.utils.data import TensorDataset, DataLoader

# Float32 column vectors of shape (N, 1) built from the NumPy samples.
x_t = torch.tensor(x, dtype=torch.float32).unsqueeze(1)
y_t = torch.tensor(y, dtype=torch.float32).unsqueeze(1)

# Shuffled mini-batches of 32 (input, label) pairs.
data = TensorDataset(x_t, y_t)
loader = DataLoader(data, batch_size=32, shuffle=True)

In [None]:
# Model and training loop

# Learn non-linear mapping x -> \hat{y}
# Input: scalar x
# Output: scalar \hat{y}

# Model
# Network: scalar in -> two hidden layers of width 16 with ReLU -> scalar out
layers = [
    nn.Linear(1,16), nn.ReLU(),
    nn.Linear(16,16), nn.ReLU(),
    nn.Linear(16,1),
]
model = nn.Sequential(*layers)

# Mean squared error between prediction and label
loss_fn = nn.MSELoss()

# Adam optimiser over all model parameters
opt = torch.optim.Adam(model.parameters(), lr=0.01)

# One pass over the loader (a single epoch). Per mini-batch:
#     1. clear the gradients left over from the previous batch
#     2. forward pass -> predictions
#     3. loss between predictions and labels
#     4. backpropagation -> gradients of the loss w.r.t. the parameters
#     5. optimiser step -> in-place update of weights and biases
for x_b, y_b in loader:
    opt.zero_grad()
    y_p = model(x_b)
    loss = loss_fn(y_p, y_b)
    loss.backward()
    opt.step()

## 4.3 PyTorch fundamentals

In [None]:
# Define nonlinear function
def f(x):
    """Evaluate the quartic x**4 - 3*x**3 + 2 - 0.2*x (works element-wise on arrays/tensors)."""
    quartic = x**4
    cubic = 3*x**3
    linear = 0.2*x
    return quartic - cubic + 2 - linear

# Sample f on [-1, 3] and draw the curve.
xx = np.linspace(-1, 3, num=400)
yy = f(xx)

plt.figure(figsize=(7,4))
ax = plt.gca()
ax.plot(xx, yy, label="f(x)")
ax.set_xlabel("x")
ax.set_ylabel("f(x)")
ax.set_title("Gradient Descent on a Scalar Nonlinear Function")
ax.legend()
ax.grid()

In [None]:
# Initial value
x = torch.tensor([-0.5], requires_grad=True)

# Optimiser
opt = optim.SGD([x], lr=0.01)

# Trajectory
x_history = []
y_history = []

n_steps = 250

for step in range(n_steps):
    opt.zero_grad()
    y = f(x)
    y.backward()
    opt.step()

    x_history.append(x.item())
    y_history.append(y.item())

In [None]:
# Curve of f with the optimisation path on top, coloured by iteration number.
plt.figure(figsize=(7,4))
plt.plot(xx, yy, label="f(x)")
iteration = range(len(x_history))
path = plt.scatter(x_history, y_history, c=iteration, cmap="viridis", s=30,
                   label="Optimisation path")
plt.xlabel("x")
plt.ylabel("f(x)")
plt.title("Gradient Descent on a Scalar Nonlinear Function")
plt.legend()
plt.colorbar(path, label="Iteration")
plt.grid()
plt.savefig("../assets/images/minimisation_visualisation.png")

## 4.5 Gradient field and decision boundary

In [None]:
torch.manual_seed(42)
np.random.seed(42)

# Number of samples
N = 900

# Uniform samples in the square [-2, 2) x [-2, 2)
X = torch.rand(N, 2) * 4 - 2


def _inside_rotated_ellipse(points, centre, a, b, theta):
    """Return a boolean tensor: True where a point lies strictly inside the ellipse.

    The ellipse has semi-axes `a` and `b`, is centred at `centre` and rotated
    by `theta` radians. Points are shifted to the centre and rotated into the
    ellipse frame before the standard (x/a)^2 + (y/b)^2 < 1 test.
    """
    shifted = points - centre
    cos_t, sin_t = np.cos(theta), np.sin(theta)
    x_rot = shifted[:, 0] * cos_t + shifted[:, 1] * sin_t
    y_rot = -shifted[:, 0] * sin_t + shifted[:, 1] * cos_t
    return ((x_rot / a) ** 2 + (y_rot / b) ** 2) < 1


# Parameters for ellipses
a1, b1 = 1.0, 0.5
a2, b2 = 0.6, 0.9
theta1 = np.radians(30)
theta2 = np.radians(-45)
centre1 = torch.tensor([0.9, 0.9])
centre2 = torch.tensor([-1.1, -0.2])

# Both ellipses use the same rotation formula. The second one previously
# computed its rotated x with sin and cos swapped, which only coincides with
# a rotation (up to sign) when |sin| == |cos|, i.e. at +/-45 degrees.
inside_ellipse1 = _inside_rotated_ellipse(X, centre1, a1, b1, theta1)
inside_ellipse2 = _inside_rotated_ellipse(X, centre2, a2, b2, theta2)

# Label 1.0 for points inside either ellipse, 0.0 otherwise; shape (N, 1).
labels = (inside_ellipse1 | inside_ellipse2).float().unsqueeze(1).numpy()

plt.figure(figsize=(7, 5))
plt.scatter(X[:, 0], X[:, 1], c=labels.squeeze(), cmap="bwr", alpha=1, edgecolors="white")
plt.xlabel("Feature 1")
plt.ylabel("Feature 2")
plt.title("Labels Defined by Two Ellipses")
plt.xlim(-2, 2)
plt.ylim(-2, 2)
plt.grid()
plt.colorbar()
plt.savefig("../assets/images/decision_boundary_labels.png")

In [None]:
# Simple classifier

class BetterClassifier(nn.Module):
    """Binary classifier: 2 inputs -> 32 hidden units (ReLU) -> 1 sigmoid output."""

    def __init__(self):
        super().__init__()
        stages = [
            nn.Linear(2, 32),
            nn.ReLU(),
            nn.Linear(32, 1),
            nn.Sigmoid(),
        ]
        self.net = nn.Sequential(*stages)

    def forward(self, x):
        """Return one value in (0, 1) per input row."""
        return self.net(x)


model = BetterClassifier()

In [None]:
# Training

criterion = nn.BCELoss()
opt = optim.Adam(model.parameters(), lr=0.01)

# The labels never change, so convert them to a tensor once instead of once
# per epoch.
label_tensor = torch.tensor(labels, dtype=torch.float32)

# Full-batch training: every step uses all of X.
num_epochs = 1000
for epoch in range(num_epochs):
    opt.zero_grad()
    y_pred = model(X)
    loss = criterion(y_pred, label_tensor)
    loss.backward()
    opt.step()

    if (epoch + 1) % 200 == 0:
        print(f"Epoch {epoch + 1}/{num_epochs} loss: {loss.item():.4f}")

In [None]:
# Display classification and gradients

# Plot range: the data extent padded by 0.5 on every side.
x_min, x_max = X[:, 0].min() - 0.5, X[:, 0].max() + 0.5
y_min, y_max = X[:, 1].min() - 0.5, X[:, 1].max() + 0.5

# 50 x 50 evaluation grid; with indexing='ij' the first axis follows x.
xx, yy = torch.meshgrid(torch.linspace(x_min, x_max, 50),
                        torch.linspace(y_min, y_max, 50),
                        indexing='ij')

print(xx.shape)
print(xx.flatten().shape)

# One row per grid node: (x, y)
grid_points = torch.stack([xx.flatten(), yy.flatten()], dim=1)
print(grid_points.shape)

# Track gradients with respect to the inputs themselves.
grid_points.requires_grad = True

grid_preds = model(grid_points)
print(grid_preds.shape)

# Backpropagate a weight of one for every output; the resulting
# `grid_points.grad` holds one gradient vector per grid node.
grid_preds.backward(torch.ones_like(grid_preds))

grid_preds_np = grid_preds.detach().numpy().reshape(xx.shape)
print(grid_preds_np.shape)

# Gradients
grid_grads = grid_points.grad.detach().numpy()
grad_magnitudes = np.linalg.norm(grid_grads, axis=1, keepdims=True)
# Divisor is clipped to [1, 1000]: vectors with norm <= 1 keep their length,
# those with norm between 1 and 1000 are scaled to unit length.
grad_magnitudes = np.clip(grad_magnitudes, 1, 1000)
# In-place division of the array obtained from `grid_points.grad`.
grid_grads /= grad_magnitudes
grid_grads_x = grid_grads[:, 0].reshape(xx.shape)
grid_grads_y = grid_grads[:, 1].reshape(xx.shape)

# Filled contours of the predictions with the gradient arrows on top.
plt.figure(figsize=(7, 5))
plt.contourf(xx, yy, grid_preds_np, alpha=1, cmap="bwr")
plt.quiver(xx, yy, grid_grads_x, grid_grads_y, color="black", scale=50)
plt.xlabel("Feature 1")
plt.ylabel("Feature 2")
plt.title("Normalized Gradient Field and Decision Boundary")
# The view is limited to [-2, 2] although the grid extends 0.5 further.
plt.xlim(-2, 2)
plt.ylim(-2, 2)
plt.grid()
plt.savefig("../assets/images/points_classified_with_gradients.png")

# Lecture 5 - Neural Network Architectures

## 5.1 Feed Forward Network

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt

In [None]:
class FeedForwardNN(nn.Module):
    """Fully connected network with two ReLU hidden layers.

    Layer sizes: input_size -> hidden_size1 -> hidden_size2 -> output_size.
    """

    def __init__(self, input_size, hidden_size1, hidden_size2, output_size):
        super().__init__()
        self.fc1 = nn.Linear(input_size, hidden_size1)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size1, hidden_size2)
        self.relu2 = nn.ReLU()
        self.fc3 = nn.Linear(hidden_size2, output_size)

    def forward(self, x):
        """Apply the five stages in order and return the output of the last layer."""
        for stage in (self.fc1, self.relu1, self.fc2, self.relu2, self.fc3):
            x = stage(x)
        return x

In [None]:
# Instantiate a 1 -> 8 -> 6 -> 1 network and print its layer summary.
input_size, hidden_size1, hidden_size2, output_size = 1, 8, 6, 1
model = FeedForwardNN(input_size, hidden_size1, hidden_size2, output_size)
print(model)

In [None]:
torch.manual_seed(42)
np.random.seed(42)

# Target function: a steep logistic curve sampled at 500 points on [-2, 2]
x = np.linspace(-2, 2, num=500)
y = 1 / (1 + np.exp(-5 * x))

# Float32 column vectors of shape (500, 1)
x_tensor = torch.tensor(x, dtype=torch.float32).unsqueeze(1)
y_tensor = torch.tensor(y, dtype=torch.float32).unsqueeze(1)


# --- Define model ----

class DeepFFNN(nn.Module):
    """Fully connected 1 -> 8 -> 6 -> 1 network with ReLU after the first two layers."""

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(1, 8)
        self.fc2 = nn.Linear(8, 6)
        self.fc3 = nn.Linear(6, 1)

    def forward(self, x):
        """Return the network output for inputs of shape (..., 1)."""
        hidden = torch.relu(self.fc1(x))
        hidden = torch.relu(self.fc2(hidden))
        return self.fc3(hidden)


model = DeepFFNN().to(torch.float32)

# Loss function
criterion = nn.MSELoss()

# Optimiser
optimiser = optim.Adam(model.parameters(), lr=0.01)

In [None]:
# Training

loss_history = []

# Full-batch training for 2000 steps; progress is printed every 500 epochs.
for epoch in range(2000):
    optimiser.zero_grad()
    y_pred = model(x_tensor)
    loss = criterion(y_pred, y_tensor)
    loss.backward()
    optimiser.step()

    current_loss = loss.item()
    loss_history.append(current_loss)
    if (epoch + 1) % 500 == 0:
        print(f"Epoch {epoch+1:4d}, Loss: {current_loss:.6f}")

In [None]:
# Inference

# Evaluate the trained model without gradient tracking and convert the result to NumPy.
with torch.no_grad():
    y_pred_np = model(x_tensor).numpy()

In [None]:
# Left panel: target vs. network output. Right panel: loss on a log scale.
fig, axes = plt.subplots(1, 2, figsize=(10, 3))
fit_ax, loss_ax = axes

fit_ax.plot(x, y, label="True", linewidth=2)
fit_ax.plot(x, y_pred_np, "r--", label="NN Approx.", linewidth=2)
fit_ax.set(title="Function Approximation", xlabel="x", ylabel="f(x)")
fit_ax.legend()
fit_ax.grid()

loss_ax.semilogy(loss_history, "r", label="Loss")
loss_ax.set(title="Loss Curve", xlabel="Epochs", ylabel="MSE")
loss_ax.legend()
loss_ax.grid()

plt.savefig("../assets/images/deep_nn_results.png")
#plt.show()

### Depth vs size

In [None]:
# Seed both generators: NumPy for any NumPy sampling, and PyTorch because the
# nn.Linear layers created below draw their initial weights from the torch RNG.
# Seeding only NumPy leaves the model initialisation dependent on prior cells.
np.random.seed(0)
torch.manual_seed(0)

# Target function: a triply nested sine sampled at 100 points in [-4, 4]
x = np.linspace(-4, 4, 100)
y = np.sin(np.sin(np.sin(x)))

# Column tensors of shape (100, 1) in single precision
x_t = torch.tensor(x, dtype=torch.float32).unsqueeze(1)
y_t = torch.tensor(y, dtype=torch.float32).unsqueeze(1)

# Hidden-layer widths: N0 for the single wide layer, N1 for each narrow layer
N0 = 64
N1 = 7

class Shallow(nn.Module):
    """Single hidden layer of width N0 with tanh activation (1 -> N0 -> 1)."""

    def __init__(self):
        super().__init__()
        hidden = nn.Linear(1, N0)
        readout = nn.Linear(N0, 1)
        self.net = nn.Sequential(hidden, nn.Tanh(), readout)

    def forward(self, x):
        """Apply the sequential stack to `x`."""
        return self.net(x)

class Deep(nn.Module):
    """Four hidden layers of width N1 with tanh activations (1 -> N1 x4 -> 1)."""

    def __init__(self):
        super().__init__()
        # Input layer, then three further hidden layers, then the linear read-out;
        # module order (and therefore Sequential indices 0..8) is unchanged.
        stack = [nn.Linear(1, N1), nn.Tanh()]
        for _ in range(3):
            stack.extend([nn.Linear(N1, N1), nn.Tanh()])
        stack.append(nn.Linear(N1, 1))
        self.net = nn.Sequential(*stack)

    def forward(self, x):
        """Apply the sequential stack to `x`."""
        return self.net(x)

# --- Parameter counting function -----------------------------

def count_params(model):
    """Return the number of scalar parameters in `model` that require gradients."""
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total

In [None]:
def train(model, epochs=2000, x=None, y=None, lr=0.01):
    """Fit `model` with full-batch Adam on an MSE objective.

    Parameters
    ----------
    model : nn.Module
        Network to optimise in place.
    epochs : int
        Number of optimiser steps (one per epoch, full batch).
    x, y : torch.Tensor, optional
        Inputs and targets. When omitted, the notebook-level tensors
        `x_t` and `y_t` are used, which is the original behaviour.
    lr : float
        Adam learning rate.

    Returns
    -------
    list of float
        The loss value recorded at every epoch.
    """
    # Fall back to the notebook globals only when no data is passed explicitly
    inputs = x_t if x is None else x
    targets = y_t if y is None else y

    opt = torch.optim.Adam(model.parameters(), lr=lr)
    loss_fn = nn.MSELoss()
    losses = []

    for _ in range(epochs):
        opt.zero_grad()
        loss = loss_fn(model(inputs), targets)
        loss.backward()
        opt.step()
        losses.append(loss.item())

    return losses

In [None]:
# One instance of each architecture, in the order [shallow, deep]
models = [Shallow(), Deep()]

# Trainable-parameter count of each model, in the same order as `models`
params = [count_params(m) for m in models]
print(params)

# Training: one list of per-epoch losses per model, in the same order as `models`
losses = [train(m) for m in models]

In [None]:
# Inference

In [None]:
with torch.no_grad():
    yf = [m(x_t).numpy() for m in models]

In [None]:
# Inference result

plt.figure(figsize=(10,4))

# One panel per model: the true curve overlaid with that model's prediction
for position, (tag, heading) in enumerate(
    [("shallow", "Shallow network"), ("deep", "Deep network")]
):
    plt.subplot(1, 2, position + 1)
    plt.plot(x, y, label="true", lw=2)
    plt.plot(x, yf[position], "--", label=tag)
    plt.title(heading)
    plt.legend()
    plt.grid()

plt.savefig("../assets/images/shallow_vs_deep.png")
plt.show()

In [None]:
# Training loss

# Both loss histories on a shared logarithmic axis
for position, tag in enumerate(("shallow", "deep")):
    plt.semilogy(losses[position], label=tag)
plt.legend()
plt.grid()
plt.title("Training loss")
plt.savefig("../assets/images/shallow_vs_deep_loss.png")
plt.show()

In [None]:
dy_true = np.gradient(y, x)
dy_s = np.gradient(yf[0].squeeze(), x)
dy_d = np.gradient(yf[1].squeeze(), x)

In [None]:
plt.plot(x, dy_true, label="true", linewidth=2)
plt.plot(x, dy_s, "--", label="shallow")
plt.plot(x, dy_d, "--", label="deep")
plt.title("Derivatives")
plt.legend(); plt.grid()
plt.savefig("../assets/images/shallow_vs_deep_gradients.png")

## 5.2 Graph Neural Network

In [None]:
import torch.nn.functional as F

In [None]:
%pip install torch-geometric
from torch_geometric.nn import GCNConv
from torch_geometric.data import Data, DataLoader

In [None]:
# GNN with two hidden layers
class GNNModel(nn.Module):
    """Two GCNConv message-passing layers followed by a two-layer MLP head.

    `hidden_channels` is indexed at positions 0 and 1: the first graph
    convolution maps `num_features` to `hidden_channels[0]`, the second maps to
    `hidden_channels[1]`, and the MLP head maps back through
    `hidden_channels[0]` to `num_feats_y` outputs per node.
    """

    def __init__(self, num_features, hidden_channels, num_feats_y):
        super().__init__()
        width_first, width_second = hidden_channels[0], hidden_channels[1]

        # Graph convolutional layers (message passing)
        self.conv1 = GCNConv(num_features, width_first)
        self.conv2 = GCNConv(width_first, width_second)

        # Fully connected layers (MLP head)
        self.fc1 = nn.Linear(width_second, width_first)
        self.fc2 = nn.Linear(width_first, num_feats_y)

    def forward(self, x, edge_index):
        """Return the model output for node features `x` on the graph `edge_index`."""
        # Message passing: each convolution is followed by a leaky ReLU
        for conv in (self.conv1, self.conv2):
            x = F.leaky_relu(conv(x, edge_index))

        # MLP head: one hidden layer with leaky ReLU, then a linear read-out
        return self.fc2(F.leaky_relu(self.fc1(x)))

In [None]:
nx = 25
xa = 10
x_grid = torch.linspace(0, xa, nx)
x_grid

In [None]:
# Graph configuration - p1 and p2 are the parametric coordinates of points on the unit circle
p1 = torch.sin(2 * torch.pi * x_grid / xa)
p2 = torch.cos(2 * torch.pi * x_grid / xa)

In [None]:
plt.plot(x_grid, p1)
plt.plot(x_grid, p2)
plt.show()

In [None]:
plt.plot(p1, p2)
plt.gca().set_aspect('equal')

In [None]:
# Pairwise chord distance between every pair of points on the circle.
# This is a distance matrix; entry [i, j] is the distance between points i and j.
delta_p1 = p1.unsqueeze(1) - p1.unsqueeze(0)
delta_p2 = p2.unsqueeze(1) - p2.unsqueeze(0)
diff = torch.sqrt(delta_p1**2 + delta_p2**2)
diff.shape

In [None]:
# Edge index: all (i, j) pairs closer than the threshold, stored as a (2, num_edges) tensor
threshold = 0.5
edge_index = torch.nonzero(diff < threshold).t().contiguous()
edge_index

In [None]:
# Plot the connectivity

# Node coordinates: cos(θ) on the horizontal axis, sin(θ) on the vertical axis
x, y = p2.numpy(), p1.numpy()

# Plot edges: one thin line per (source, target) column of edge_index
edge_index_np = edge_index.numpy()
for i, j in zip(edge_index_np[0], edge_index_np[1]):
    plt.plot([x[i], x[j]], [y[i], y[j]], 'b-', alpha=0.3, linewidth=0.5)

# Plot nodes: all in red, then nodes 0 and 1 highlighted on top
plt.scatter(x, y, c='red', s=50, zorder=5)
for node, colour in ((0, 'blue'), (1, 'green')):
    plt.scatter(x[node], y[node], c=colour, s=50, zorder=5)

plt.gca().set_aspect('equal')
plt.title(f'Graph connectivity (threshold = {threshold})')
plt.show()

In [None]:
# Node features (x) and node labels (y)

# x is a matrix, each row is the coordinates of one point
# y are random binary labels (0 or 1), the target for classification

data = Data(
    x = torch.cat((p1.unsqueeze(1), p2.unsqueeze(1)), dim=1),
    y = torch.randint(0, 2, (nx, 1)).float(),
    edge_index = edge_index
)

In [None]:
# Model

model = GNNModel(num_features=2, hidden_channels=[8, 16], num_feats_y=1)
print(model)

In [None]:
# Count edges: edge_index holds one column per directed (source, target) pair
num_edges = edge_index.shape[1]

# Degree of each node: how often each node index appears in the source row.
# NOTE(review): if edge_index was built from `diff < threshold`, the zero diagonal
# makes every node its own neighbour, so these counts include a self-loop — confirm.
degree_per_node = torch.bincount(edge_index[0])

print("Num edges               = ", num_edges)
print("Average degree per node = ", round(degree_per_node.float().mean().item(), 2))
print("Max degree per node     = ", degree_per_node.max().item())

# Print degree of first few nodes
for i in range(min(10, nx)):  # Print up to 10 nodes
    print(f"Node {i} has {degree_per_node[i].item()} neighbors")

In [None]:
%pip install torchviz
from torchviz import make_dot

In [None]:
# Forward pass to generate graph visualisation
y_pred = model(data.x, data.edge_index)

In [None]:
dot = make_dot(y_pred,
               params={**dict(model.named_parameters()), 'Input features': data.x},
               show_attrs = True,
               show_saved = True
              )

In [None]:
dot.render("../assets/images/gnn_graph", format="png", cleanup = True)

In [None]:
dot

In [None]:
%pip install networkx

In [None]:
import networkx as nx

In [None]:
# No. of nodes
nx_nodes = 12

# Ellipse parameters
a, b = 10, 4

# Adjacency matrix for 4-neighbour connectivity on a periodic ring:
# every node is linked to its first and second neighbour on each side
adjm = torch.zeros((nx_nodes, nx_nodes), dtype = torch.float)

# Populate: the modulo wraps the neighbour index around the ends of the ring
for i in range(nx_nodes):
    for offset in (-1, 1, -2, 2):
        adjm[i, (i + offset) % nx_nodes] = 1

print(adjm)

In [None]:
edge_index = adjm.nonzero(as_tuple=False).t().contiguous()
edge_index

In [None]:
# Viz

G = nx.Graph()

# Add edges
edges = edge_index.t().tolist()
G.add_edges_from(edges)

# Generate node positions
tau_values = np.linspace(0, 2*np.pi, nx_nodes, endpoint=False)
x_positions = a * np.sin(tau_values)
y_positions = b * np.cos(tau_values)

plt.plot(x_positions, y_positions)
plt.gca().set_aspect("equal")
plt.show()

In [None]:
# Node index -> (x, y) drawing position on the ellipse
pos = {i: (x_positions[i], y_positions[i]) for i in range(nx_nodes)}

# One rainbow colour per node
node_colors = plt.cm.rainbow(np.linspace(0, 1, nx_nodes))

plt.figure(figsize=(a, b))
nx.draw_networkx_nodes(G, pos, node_color=node_colors, node_size=300, alpha=0.9)

# Split the edges in a single pass:
#   straight_edges - direct neighbours, plus the periodic edge between node 0 and the last node
#   curved_edges   - longer jumps (drawn dashed below)
straight_edges, curved_edges = [], []
last_node = nx_nodes - 1
for u, v in G.edges():
    is_periodic = (u == 0 and v == last_node)
    gap = abs(u - v)
    if gap == 1 or is_periodic:
        straight_edges.append((u, v))
    elif gap > 1:
        curved_edges.append((u, v))

# Draw the two edge groups separately so the longer jumps can use a dashed style
nx.draw_networkx_edges(G, pos, edgelist=straight_edges, edge_color="gray", width=1.5, alpha=0.7)
nx.draw_networkx_edges(G, pos, edgelist=curved_edges, edge_color="gray", width=1.5, alpha=0.7, style="dashed")

# Annotate nodes
labels = {i: f"N{i}" for i in range(nx_nodes)}
nx.draw_networkx_labels(G, pos, labels, font_size=9, font_weight="bold")

# Add edge labels (showing node connections)
edge_labels = {(u, v): f"{u}-{v}" for u, v in edges}
nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels, font_size=7, font_color="black")

plt.title("Graph Structure for GNN")
plt.axis("off")
plt.savefig("../assets/images/gnn_graph_connectivity.png", bbox_inches="tight")

In [None]:
import torch_geometric.data as geom_data
import torch_geometric.nn as geom_nn

# Set random seed
torch.manual_seed(0)

# Define parameters
# NOTE: this rebinds `nx` from the networkx alias to the number of grid nodes.
# The networkx cells above will fail if re-run after this cell without re-importing.
xa, nx, nt, v = 10, 25, 15, 0.6

# Create grid and function data
# Row j of z is a sine wave on the periodic grid, phase-shifted by v * j
x_grid = np.linspace(0, xa, nx + 1)[:-1]
z = np.zeros([nt, nx])
for j in range(nt):
    z[j, :] = np.sin((2 * np.pi / xa) * x_grid - v * j)

# Create adjacency matrix
# Grid points are mapped onto the unit circle; two nodes are connected when their
# chord distance is below 0.5 (the zero diagonal means every node has a self-loop)
p1 = np.sin(2 * np.pi * x_grid / xa)
p2 = np.cos(2 * np.pi * x_grid / xa)
p1m, p2m = np.tile(p1, (nx, 1)).T, np.tile(p2, (nx, 1)).T
diff = np.sqrt((p1m - p1m.T) ** 2 + (p2m - p2m.T) ** 2)
adjm = (diff < 0.5).astype(int)
edge_index = torch.tensor(np.array(np.nonzero(adjm)), dtype=torch.long)

# Input/target pairs: the target of time step j is the wave at time step j + 1
# NOTE(review): the "test" arrays are the same slices as the training arrays, so the
# test loss computed from them is not a held-out metric. Replace with a real split
# if a generalisation estimate is wanted.
X_train, Y_train = z[:-1], z[1:]
X_test, Y_test = z[:-1], z[1:]

# Second node feature: normalised node index in (0, 1], shared by all samples
features_tmp2 = torch.tensor(np.arange(1, nx + 1) / nx, dtype=torch.float).unsqueeze(1)


def _make_graph_samples(inputs, targets, node_position, graph_edges):
    """Build one Data object per (input row, target row) pair.

    Each node gets two features (its function value and `node_position`) and
    one label (the function value at the next time step).
    """
    samples = []
    for input_row, target_row in zip(inputs, targets):
        values = torch.tensor(input_row, dtype=torch.float).unsqueeze(1)
        node_features = torch.cat((values, node_position), dim=1)
        node_labels = torch.tensor(target_row, dtype=torch.float).unsqueeze(1)
        samples.append(geom_data.Data(x=node_features, y=node_labels, edge_index=graph_edges))
    return samples


train_list = _make_graph_samples(X_train, Y_train, features_tmp2, edge_index)
test_list = _make_graph_samples(X_test, Y_test, features_tmp2, edge_index)

# Create DataLoaders for training and testing
from torch_geometric.data import Data
from torch_geometric.loader import DataLoader  # new import location

train_loader = DataLoader(train_list, batch_size=1, shuffle=True)
test_loader = DataLoader(test_list, batch_size=1, shuffle=False)

# Define the GNN model
class GNNModel(nn.Module):
    """Four GCNConv layers followed by a three-layer MLP head.

    `hidden_channels` is indexed at positions 0..3. This definition replaces
    any earlier class named `GNNModel` in the notebook namespace.
    """

    def __init__(self, num_features, hidden_channels, num_feats_y):
        super().__init__()
        h0, h1, h2, h3 = (hidden_channels[k] for k in range(4))

        # Message-passing stack
        self.conv1 = geom_nn.GCNConv(num_features, h0)
        self.conv2 = geom_nn.GCNConv(h0, h1)
        self.conv3 = geom_nn.GCNConv(h1, h2)
        self.conv4 = geom_nn.GCNConv(h2, h3)

        # MLP head: h3 -> h2 -> h0 -> num_feats_y
        self.fc1 = nn.Linear(h3, h2)
        self.fc2 = nn.Linear(h2, h0)
        self.fc3 = nn.Linear(h0, num_feats_y)

    def forward(self, x, edge_index):
        """Return the model output for node features `x` on the graph `edge_index`."""
        for conv in (self.conv1, self.conv2, self.conv3, self.conv4):
            x = F.leaky_relu(conv(x, edge_index))
        for dense in (self.fc1, self.fc2):
            x = F.leaky_relu(dense(x))
        return self.fc3(x)

# Initialize model, optimizer, and criterion
model = GNNModel(num_features=2, hidden_channels=[4 * nt, 4 * nt, 4 * nt, 4 * nt], num_feats_y=1)
optimizer = torch.optim.AdamW(model.parameters(), lr=0.0005, weight_decay=0)
criterion = nn.MSELoss()

# Training loop
epochs = 1500
train_mse, test_mse = [], []
for epoch in range(epochs):
    # --- Optimisation pass over the training graphs ---
    model.train()
    train_mse_tmp = []
    for batch in train_loader:
        optimizer.zero_grad()
        output = model(batch.x, batch.edge_index)
        loss = criterion(output, batch.y)
        train_mse_tmp.append(loss.item())
        loss.backward()
        optimizer.step()
    train_mse.append(np.mean(train_mse_tmp))

    # --- Evaluation pass ---
    # Gradients are not needed here; no_grad avoids building an autograd graph
    # for every evaluation batch.
    model.eval()
    test_mse_tmp = []
    with torch.no_grad():
        for batch in test_loader:
            y_pred = model(batch.x, batch.edge_index)
            test_loss = criterion(y_pred, batch.y)
            test_mse_tmp.append(test_loss.item())
    test_mse.append(np.mean(test_mse_tmp))

    # Progress report every 100 epochs (epochs are printed 1-based)
    if epoch % 100 == 0:
        print(f'Epoch {epoch + 1}, Train Loss: {train_mse[epoch]}, Test Loss: {test_mse[epoch]}')

# Plot training and test MSE
plt.plot(np.arange(epochs), train_mse, '*', label='Train Loss')
plt.plot(np.arange(epochs), test_mse, '*', label='Test Loss')
plt.legend()
plt.title("Training and Test Loss")
plt.savefig("../assets/images/gnn_loss_curve.png")

In [None]:
model.eval()

In [None]:
# Per-sample MSE values collected in the loop below
test_mse_tmp = []

# Counter for images (used in the saved file names)
ni = 1

# Select two distinct sample indices at random.
# NOTE(review): the samples are drawn from X_train / train_list, i.e. data the model
# was trained on. The last index is excluded because X_train[idx + 1] is read below.
test_cases = np.random.choice(range(len(X_train) - 1), size=2, replace=False)

for idx in test_cases:

    # Input curve, the next curve in the sequence, and the matching graph features
    original_func = X_train[idx]
    translated_func = X_train[idx + 1]
    input_features = train_list[idx].x

    # Predict with the model (no gradients needed); flatten to a 1-D NumPy array
    with torch.no_grad():
        predicted_func = model(input_features, train_list[idx].edge_index).numpy().flatten()

    # Compute MSE between the next curve and the prediction
    mse = np.mean((translated_func - predicted_func) **2)
    test_mse_tmp.append(mse)

    # Plot comparison for this sample (Original, Translated, and Predicted)
    plt.figure(figsize=(10, 5))
    plt.plot(original_func, label="Original Function", linestyle='-', marker='o', color='blue')
    plt.plot(translated_func, label="Translated Function", linestyle='-', marker='x', color='green')
    plt.plot(predicted_func, label="Predicted Translated Function", linestyle='--', marker='s', color='red')
    plt.title(f"Function {idx} - MSE: {mse:.4f}")
    plt.legend()
    plt.xlabel('Node index')
    plt.ylabel('Function value')
    plt.savefig(f"../assets/images/gnn_test_{ni}.png")
    ni+=1

## 5.3 CNN Classifier

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt

def generate_function_data(num_samples=5000, num_points=50, err=0.02):
    """Generate noisy 1-D curves from three function families.

    Each sample is drawn from one of three families, chosen at random, and
    evaluated at `num_points` evenly spaced points in [-1, 1]:

    * label 0, 'sine-cosine': amp * sin(freq * pi * x + phase)
    * label 1, 'gaussian':    amp * exp(-(x - mu)^2 / (2 sigma^2))
    * label 2, 'polynomial':  a x^3 + b x^2 + c x + d

    Gaussian noise scaled by `err` is added to every curve. Randomness comes
    from the global NumPy generator.

    Returns
    -------
    (torch.Tensor, torch.Tensor)
        Curves of shape (num_samples, 1, num_points) as float32, and labels of
        shape (num_samples,) as int64.
    """
    grid = np.linspace(-1, 1, num_points)

    def noise():
        return err * np.random.randn(num_points)

    def sine_curve():
        freq = np.random.uniform(1, 5)
        phase = np.random.uniform(0, 2 * np.pi)
        amp = np.random.uniform(0.5, 2)
        return amp * np.sin(freq * np.pi * grid + phase) + noise()

    def gaussian_curve():
        mu = np.random.uniform(-0.5, 0.5)
        sigma = np.random.uniform(0.2, 0.5)
        amp = np.random.uniform(0.5, 2)
        return amp * np.exp(-((grid - mu) ** 2) / (2 * sigma ** 2)) + noise()

    def cubic_curve():
        a = np.random.uniform(-2, 2)
        b = np.random.uniform(-2, 2)
        c = np.random.uniform(-3, 3)
        d = np.random.uniform(-0.5, 0.5)
        return a * grid**3 + b * grid**2 + c * grid + d + noise()

    # Family name -> (integer label, sampler); insertion order fixes the choice order
    families = {
        'sine-cosine': (0, sine_curve),
        'gaussian': (1, gaussian_curve),
        'polynomial': (2, cubic_curve),
    }
    functions = list(families)

    curves, labels = [], []
    for _ in range(num_samples):
        label, sampler = families[np.random.choice(functions)]
        curves.append(sampler())
        labels.append(label)

    # Add the channel dimension expected by Conv1d: (N, 1, num_points)
    stacked = np.array(curves).reshape(-1, 1, num_points)
    return (
        torch.tensor(stacked, dtype=torch.float32),
        torch.tensor(np.array(labels), dtype=torch.long),
    )

# Generate a large training and test dataset with adjustable noise
X_train, y_train = generate_function_data(num_samples=10000, err=0.05)  # Low noise in training
X_test, y_test = generate_function_data(num_samples=2000, err=0.2)  # Higher noise in test set

print(f"Train Data Shape: {X_train.shape}, Train Labels Shape: {y_train.shape}")
print(f"Test Data Shape: {X_test.shape}, Test Labels Shape: {y_test.shape}")

plt.figure(figsize=(12, 3))
for i, idx in enumerate(torch.randperm(len(X_train))[:6]):
    plt.subplot(1, 6, i + 1)
    plt.plot(X_train[idx][0].cpu().numpy())
    plt.title(['sine-cosine', 'gaussian', 'polynomial'][y_train[idx].item()])
    plt.xticks([]), plt.yticks([])

plt.tight_layout()
plt.show()

In [None]:
# Number of function families produced by the data generator
# (sine-cosine, gaussian, polynomial -> labels 0, 1, 2). A larger value would add
# output units that can never be a correct class but can still be predicted.
num_categories = 3

class FunctionClassifierCNN(nn.Module):
    """1-D CNN that classifies a sampled curve into one of the function families.

    Parameters
    ----------
    num_classes : int, optional
        Number of output logits. Defaults to the notebook-level `num_categories`.
    num_points : int
        Number of samples per input curve. The convolutions preserve the length
        (kernel 5, padding 2), so the flattened feature size is 32 * num_points.

    Input shape: (batch, 1, num_points). Output shape: (batch, num_classes),
    raw logits suitable for nn.CrossEntropyLoss.
    """

    def __init__(self, num_classes=None, num_points=50):
        super().__init__()

        if num_classes is None:
            num_classes = num_categories

        self.conv1 = nn.Conv1d(in_channels=1,  out_channels=16, kernel_size=5, stride=1, padding=2)
        self.conv2 = nn.Conv1d(in_channels=16, out_channels=32, kernel_size=5, stride=1, padding=2)

        self.fc1 = nn.Linear(32 * num_points, 128)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        """Return class logits for a batch of curves."""
        x = torch.relu(self.conv1(x))
        x = torch.relu(self.conv2(x))

        # Flatten (batch, channels, length) -> (batch, channels * length)
        x = x.view(x.shape[0], -1)

        x = torch.relu(self.fc1(x))
        return self.fc2(x)

In [None]:
# Initialise model
model = FunctionClassifierCNN()
model.eval()

In [None]:
# --- Training ---

# Setup
# torch.backends.mps.is_available() is the documented availability check for
# Apple-silicon GPUs; torch.mps.is_available() does not exist in older PyTorch releases.
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
print("device", device)
model.to(device)

criterion = nn.CrossEntropyLoss()
opt = optim.Adam(model.parameters(), lr = 0.001)

num_epochs = 20
batch_size = 32

# Convert dataset into DataLoader: each item is a (curve, label) pair
train_loader = torch.utils.data.DataLoader(
    list(zip(X_train, y_train)),
    batch_size=batch_size,
    shuffle = True
)

# Loss as a function of epochs
loss_history = []

# The model was put in eval mode after construction; switch back for training
model.train()

for epoch in range(num_epochs):
    total_loss = 0
    for batch_X, batch_y in train_loader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)
        opt.zero_grad()
        loss = criterion(model(batch_X), batch_y)
        loss.backward()
        opt.step()
        total_loss += loss.item()
    # Save epoch loss (mean over mini-batches)
    loss_history.append(total_loss / len(train_loader))
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {loss_history[-1]:.4f}")

In [None]:
fig=plt.figure(figsize=(10,5))
plt.plot(loss_history)
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("Training Loss")
plt.savefig("../assets/images/cnn_training_loss.png", dpi=300)

In [None]:
# Evaluation
model.eval()

# Accuracy does not depend on sample order, so the test set is not shuffled
# (this also avoids consuming random numbers during evaluation)
test_loader = torch.utils.data.DataLoader(
    list(zip(X_test, y_test)),
    batch_size = batch_size,
    shuffle = False
)

correct = 0
total = 0

with torch.no_grad():
    for batch_X, batch_y in test_loader:

        batch_X, batch_y = batch_X.to(device), batch_y.to(device)

        outputs = model(batch_X)

        # Predicted class = index of the largest logit
        predicted = outputs.argmax(dim=1)

        total += batch_y.size(0)
        correct += (predicted == batch_y).sum().item()

# Percentage of correctly classified test curves
accuracy =  100 * correct/total
print("accuracy", accuracy)

In [None]:
# Inference

import random

num_examples = 12

X_new, y_new = generate_function_data(num_samples=num_examples)
X_new = X_new.to(device)

model.eval()
with torch.no_grad():
    predictions = model(X_new)
    _, predicted_labels = torch.max(predictions, 1)

# Bring everything back to the CPU as plain Python / NumPy values for plotting
predicted_ids = predicted_labels.cpu().tolist()
true_ids = y_new.cpu().tolist()
curves = X_new.cpu().numpy().squeeze(1)          # (num_examples, num_points)
x_axis = np.linspace(-1, 1, curves.shape[1])     # same grid the generator samples on

func_names = ['Sine-Cosine', 'Gaussian', 'Polynomial']


def _class_name(class_id):
    """Readable name for a class id; ids without a name are shown as 'Class <id>'."""
    if 0 <= class_id < len(func_names):
        return func_names[class_id]
    return f"Class {class_id}"


# Plot the results, 4 per row (ceil division so a partial last row still fits)
cols = 4
rows = -(-num_examples // cols)
plt.figure(figsize=(12, 3 * rows))

for i in range(num_examples):
    correct = predicted_ids[i] == true_ids[i]  # Check if prediction is correct
    color = 'blue' if correct else 'red'  # Blue for correct, red for incorrect

    plt.subplot(rows, cols, i + 1)
    plt.plot(x_axis, curves[i], color=color, label=f"Pred: {_class_name(predicted_ids[i])}")
    plt.legend()
    plt.title(f"True: {_class_name(true_ids[i])}", color=color)  # Color title for extra clarity
    plt.xticks([])
    plt.yticks([])

plt.tight_layout()
plt.savefig("../assets/images/cnn_test_predictions.png", dpi=300)

## 5.4 LSTM Sensor Data

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import math
import matplotlib.pyplot as plt
import numpy as np

In [None]:
# Generate normal sine wave data with random phase shift
def generate_sensor_data(num_samples=100, seq_length=50, anomaly_ratio=0.1):
    """Generate sine-wave sequences, a fraction of which are corrupted.

    Every sequence is one period of a sine with a random phase shift plus
    uniform noise in [0, 0.1). With probability `anomaly_ratio` a sequence
    additionally receives uniform noise in [-2, 2) at every time step and is
    labelled 1; all other sequences are labelled 0. Randomness comes from the
    global NumPy generator.

    Returns
    -------
    (np.ndarray, np.ndarray)
        Sequences of shape (num_samples, seq_length) and integer labels of
        shape (num_samples,).
    """
    base_phase = np.linspace(0, 2*np.pi, seq_length)
    sequences, flags = [], []

    for _ in range(num_samples):
        shift = np.random.uniform(0, 2*np.pi)
        signal = np.sin(base_phase + shift) + 0.1 * np.random.rand(seq_length)

        # Inject anomalies: large random offsets on every time step
        is_anomaly = np.random.rand() < anomaly_ratio
        if is_anomaly:
            signal = signal + np.random.uniform(-2, 2, size=seq_length)

        sequences.append(signal)
        flags.append(1 if is_anomaly else 0)

    return np.array(sequences), np.array(flags)

# Training/test data
num_samples = 2000
train_frac = 0.8

# Index of the first test sample, derived from the two settings above so that
# changing either of them changes the split
bndry = math.floor(train_frac * num_samples)

X, y = generate_sensor_data(num_samples=num_samples)
X_train, X_test = torch.tensor(X[:bndry], dtype=torch.float32), torch.tensor(X[bndry:], dtype=torch.float32)

# Labels stay as NumPy arrays; they are only used for plotting and scoring
y_train, y_test = y[:bndry], y[bndry:]

# Reshape for LSTM input: (samples, seq_length) -> (samples, seq_length, 1)
X_train = X_train.unsqueeze(-1)
X_test = X_test.unsqueeze(-1)

print(f"Train Data Shape: {X_train.shape}, Test Data Shape: {X_test.shape}")

In [None]:
normal_indices = np.where(y_train == 0)[0][:3]
anomaly_indices = np.where(y_train == 1)[0][:3]

In [None]:
plt.figure(figsize=(12, 4))

# Top row: normal sequences (panels 1-3); bottom row: anomalous sequences (panels 4-6)
panel_groups = (
    (normal_indices, 1, "Normal", "blue", "Normal Sensor Data"),
    (anomaly_indices, 4, "Anomaly", "red", "Anomalous Sensor Data"),
)
for group, first_panel, tag, colour, heading in panel_groups:
    for i, idx in enumerate(group):
        plt.subplot(2, 3, i + first_panel)
        plt.plot(X_train[idx].squeeze().cpu().numpy(), label=tag, color=colour)
        plt.title(heading)
        plt.xticks([])
        plt.yticks([])

plt.tight_layout()
plt.savefig("../assets/images/lstm_sensor_data_samples.png", dpi=300)

In [None]:
# torch.backends.mps.is_available() is the documented availability check for
# Apple-silicon GPUs; torch.mps.is_available() does not exist in older PyTorch releases.
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")

class LSTMAutoencoder(nn.Module):
    """Sequence autoencoder built from an encoder LSTM and a decoder LSTM.

    The encoder reads the input sequence; its final hidden and cell states
    initialise the decoder, which is fed an all-zero sequence of length
    `seq_length`. A linear layer maps every decoder output back to `input_dim`
    features.

    Input shape: (batch, time, input_dim).
    Output shape: (batch, seq_length, input_dim).
    """

    def __init__(self, input_dim=1, hidden_dim=32, num_layers=2, seq_length=50):
        super().__init__()

        self.input_dim = input_dim
        self.seq_length = seq_length
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

        # LSTM layers
        self.encoder = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)
        self.decoder = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)

        # Final layer to reconstruct output
        self.output_layer = nn.Linear(hidden_dim, input_dim)

    def forward(self, x):
        """Encode `x` and return its reconstruction."""
        batch_size = x.size(0)

        # Encode input; only the final (hidden, cell) state is kept
        _, (hidden, cell) = self.encoder(x)

        # Decoder input is all zeros, created with the device and dtype of x and
        # with `input_dim` features so that it matches the decoder's input size
        decoder_input = x.new_zeros(batch_size, self.seq_length, self.input_dim)

        # Decode starting from the encoder's final state
        decoder_output, _ = self.decoder(decoder_input, (hidden, cell))

        x_reconstructed = self.output_layer(decoder_output)

        return x_reconstructed

In [None]:
# Initialise model with correct sequence length
model = LSTMAutoencoder(seq_length=50).to(device)

In [None]:
# Training

# Reconstruction objective: mean squared error between output and input sequence
criterion = nn.MSELoss()
opt = optim.Adam(model.parameters(), lr = 0.010)

num_epochs = 20
batch_size = 32

# Batches contain sequences only; no labels are used during training
train_loader = torch.utils.data.DataLoader(X_train, batch_size=batch_size, shuffle=True)

# Track loss history (one mean value per epoch)
loss_history = []

# Training loop
for epoch in range(num_epochs):
    total_loss = 0
    for batch in train_loader:
        batch = batch.to(device)
        opt.zero_grad()
        outputs = model(batch)
        loss = criterion(outputs, batch)   # the input itself is the target
        loss.backward()
        opt.step()
        total_loss += loss.item()

    # Mean of the per-batch losses in this epoch
    epoch_loss = total_loss / len(train_loader)
    loss_history.append(epoch_loss)
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}")

In [None]:
plt.plot(loss_history, label="Loss")
plt.xlabel("Epochs"), plt.ylabel("Loss"), plt.title("LSTM Training Loss")
plt.legend(), plt.grid(True)
plt.show()

In [None]:
# Compute reconstruction error on test data
model.eval()

# X_test is rebound to its copy on `device`, so later cells see the moved tensor
X_test = X_test.to(device)
with torch.no_grad():
    X_reconstructed = model(X_test)

# One error value per test sequence: squared error averaged over time and features
reconstruction_errors = torch.mean((X_test - X_reconstructed)**2, dim=(1, 2)).cpu().numpy()

# Set anomaly threshold at the 95th percentile of the test errors.
# NOTE: with a percentile threshold, roughly 5% of the test sequences are flagged
# regardless of how many anomalies the test set actually contains.
threshold = np.percentile(reconstruction_errors, 95)
y_pred = (reconstruction_errors > threshold).astype(int)

# Percentage of test sequences whose predicted flag equals the label
accuracy = np.mean(y_pred == y_test) * 100
print("accuracy", accuracy)

In [None]:
plt.figure(figsize=(8, 3))

# Plot normal example: the first test sequence whose label is 0. Sample 0 is not
# guaranteed to be normal, so it must not be used blindly. If no labelled-normal
# sample exists, fall back to the best-reconstructed one.
normal_candidates = np.flatnonzero(np.asarray(y_test) == 0)
if normal_candidates.size:
    normal_idx = int(normal_candidates[0])
else:
    normal_idx = int(np.argmin(reconstruction_errors))

plt.subplot(1, 2, 1)
plt.plot(X_test[normal_idx].cpu().numpy(), label="Original")
plt.plot(X_reconstructed[normal_idx].cpu().numpy(), label="Reconstructed", linestyle="dashed")
plt.title("Normal Sequence")
plt.legend()

# Plot anomaly example
anomaly_idx = np.argmax(reconstruction_errors)  # Most anomalous sample
plt.subplot(1, 2, 2)
plt.plot(X_test[anomaly_idx].cpu().numpy(), label="Original")
plt.plot(X_reconstructed[anomaly_idx].cpu().numpy(), label="Reconstructed", linestyle="dashed", color="red")
plt.title("Anomalous Sequence")
plt.legend()

plt.tight_layout()
plt.savefig("../assets/images/lstm_anomaly_detection.png", dpi=300)

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Select 12 random test samples (distinct indices)
num_samples = 12
indices = np.random.choice(len(X_test), num_samples, replace=False)

# Compute reconstruction errors
model.eval()
with torch.no_grad():
    X_reconstructed = model(X_test.to(device))

# NOTE(review): the subtraction assumes X_test already lives on `device`
# (the model output does) — confirm when running this cell on its own.
reconstruction_errors = torch.mean((X_test - X_reconstructed) ** 2, dim=(1, 2)).cpu().numpy()

# Detect anomalies based on threshold
# NOTE(review): this uses the 90th percentile; the accuracy cell uses the 95th.
# y_pred is overwritten here — confirm the difference is intended.
threshold = np.percentile(reconstruction_errors, 90)
y_pred = (reconstruction_errors > threshold).astype(int)  # 1 = Anomaly, 0 = Normal

# Plot the selected samples on a 3 x 4 grid; colour encodes the predicted class
plt.figure(figsize=(12, 6))
for i, idx in enumerate(indices):
    color = 'red' if y_pred[idx] == 1 else 'blue'
    
    plt.subplot(3, 4, i + 1)
    plt.plot(X_test[idx].cpu().numpy(), color=color, label="Original")
    plt.plot(X_reconstructed[idx].cpu().numpy(), linestyle="dashed", color="black", label="Reconstructed")
    plt.title(f"{'Anomaly' if y_pred[idx] == 1 else 'Normal'}", color=color)
    plt.xticks([]), plt.yticks([])
    plt.legend(fontsize=8, loc="upper right")

plt.tight_layout()
plt.savefig("../assets/images/lstm_anomaly_detection_samples.png", dpi=300)

# Lecture 6 - LLMs

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
import numpy as np
from IPython.display import HTML, display
%pip install seaborn
import seaborn as sns

In [None]:
vocab = {
    0: "",              # padding token (ignored in loss)
    1: "I", 2: "am", 3: "you", 4: "is", 5: "we", 6: "are",
    7: "a", 8: "an", 9: "the",
    10: "simple", 11: "example", 12: "with",
    13: "and", 14: "but", 15: "or",
    16: "not", 17: "only", 18: "also",
    19: "how", 20: "what", 21: "why",
    22: "can", 23: "must", 24: "should",
    25: "want", 26: "has", 27: "have", 28: "had",
    29: "to", 30: "home", 31: "play", 32: "in",
    33: "garden", 34: "weather", 35: "nice",
    36: "drives", 37: "Berlin", 38: "reads", 39: "book",
    40: "she", 41: "he", 42: "go",
    43: "hungry", 44: "tired", 45: "happy", 46: "sad",
    47: "it", 48: "good", 49: "this", 50: "bad",
    51: "eat", 52: "drink", 53: "come",
    54: "they", 55: "was"
}

In [None]:
vocab_size = len(vocab)
vocab_size

In [None]:
sentences = [
    "I am hungry",
    "you are tired",
    "we are happy",
    "they are sad",
    "it is simple",
    "the weather is nice",
    "this is bad",
    "this was good",
    "we want to eat",
    "they want to drink",
    "you can come",
    "we go home",
    "they play in the garden",
    "the weather is nice",
    "he drives to Berlin",
    "she reads a book"
]

In [None]:
len(set(" ".join(sentences).split()))

In [None]:
# Vocabulary entries that never occur in the example sentences
words_in_sentences = set(" ".join(sentences).split())
[word for word in vocab.values() if word not in words_in_sentences]

In [None]:
# Just the raw, untrained embeddings

d_model = 32     # embedding dimension

# Initialises a matrix of size (vocab_size, d_model) with random numbers
embedding_layer = nn.Embedding(
    num_embeddings=vocab_size,
    embedding_dim=d_model
)

# Single input "I am hungry"
input_tokens = torch.tensor([1, 2, 43])

# Forward pass
# Each integer token is replaced with a vector of length d_model
with torch.no_grad():
    output_vectors = embedding_layer(input_tokens)

print(f"Input shape:  {input_tokens.shape}")
print(f"Output shape: {output_vectors.shape}")

plt.figure(figsize=(10, 3))
sns.heatmap(output_vectors, annot=False, cmap="viridis", cbar=True)
plt.title("Visualizing Embeddings for: 'I', 'am', 'hungry'")
plt.ylabel("Token Position")
plt.xlabel("Embedding Dimension (0-31)")
plt.yticks([0.5, 1.5, 2.5], labels=["I (ID 1)", "am (ID 2)", "hungry (ID 43)"], rotation=0)
plt.savefig("../assets/images/embeddings_structure.png")

In [None]:
# Train a pure embeddings model

# Simplify the vocab
vocab = {
    0: "", 1: "I", 2: "am", 3: "you", 4: "is", 5: "we", 6: "are", 36: "drives",
    37: "Berlin", 41: "he", 42: "go", 43: "hungry", 44: "tired", 45: "happy", 46: "sad",
    54: "they", 30: "home"
}
# Inverse vocab for printing
inv_vocab = {v: k for k, v in vocab.items()}

# Train on subject+verb -> adjective
# These pairs teach the model that hungry/tired/happy/sad fill the same "slot"
training_data = [
    (["we", "are"], "hungry"),
    (["we", "are"], "tired"),
    (["we", "are"], "happy"),
    (["we", "are"], "sad"),
    (["we", "go"],  "home"),        # "home" is a location
    (["we", "go"],  "Berlin"),  # "Berlin" is a location (using "he drives" conceptually)
]

# Helper function
def encode(words):
    """Convert a sequence of words to a 1-D int64 tensor of token ids via `inv_vocab`."""
    token_ids = []
    for word in words:
        token_ids.append(inv_vocab[word])
    return torch.tensor(token_ids, dtype=torch.long)

# Continuous bag of words model
class SimpleCBOW(nn.Module):
    """Continuous bag-of-words model: averaged context embeddings -> vocabulary logits."""

    def __init__(self, vocab_size, d_model):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.linear = nn.Linear(d_model, vocab_size)

    def forward(self, inputs):
        """Return logits of shape [batch_size, vocab_size].

        `inputs` holds token ids with shape [batch_size, context_len].
        """
        # Look up the embeddings and average them over the context positions,
        # giving one vector per context fragment
        pooled = self.embedding(inputs).mean(dim=1)

        # Score every vocabulary entry as the target word
        return self.linear(pooled)

d_model = 2  # easy to visualise
vocab_size = 60

In [None]:
# Training

model = SimpleCBOW(vocab_size, d_model)

opt = optim.Adam(model.parameters(), lr=0.05)
criterion = nn.CrossEntropyLoss()

for epoch in range(250):
    # Sum of the per-example losses in this epoch
    total_loss = 0

    # One optimiser step per (context, target) pair
    for context, target in training_data:

        x = encode(context).unsqueeze(0)   # add a batch dimension: [1, context_len]
        y = encode([target])               # target token id, shape [1]

        opt.zero_grad()
        logits = model(x)
        loss = criterion(logits, y)
        loss.backward()
        opt.step()
        total_loss += loss.item()

    # Progress report every 50 epochs
    if epoch % 50 == 0:
        print(f"Epoch {epoch:3d} loss: {total_loss:.4f}")

In [None]:
# Look at learned vectors for specific words

target_words = ["hungry", "tired", "happy", "sad", "Berlin", "home"]
vectors = {}

#
output_weights = model.linear.weight.detach().numpy()


for w in target_words:
    token_id = inv_vocab[w]
    vec = output_weights[token_id]
    vectors[w] = vec

def dist(w1, w2):
    """Euclidean distance between the stored vectors of words `w1` and `w2`."""
    difference = vectors[w1] - vectors[w2]
    return np.linalg.norm(difference)

# All three distances use 4 decimal places. The last two previously used ":4f",
# which sets a minimum width of 4, not a precision.
print(f"Distance happy <-> sad    = {dist('happy', 'sad'):.4f}")
print(f"Distance happy <-> Berlin = {dist('happy', 'Berlin'):.4f}")
print(f"Distance home  <-> Berlin = {dist('home', 'Berlin'):.4f}")

plt.figure(figsize=(8, 6))
for w, vec in vectors.items():
    plt.scatter(vec[0], vec[1], s=100)
    plt.text(vec[0]+0.05, vec[1]+0.05, w, fontsize=12)

plt.title("Learned Word Embeddings (2D Space)")
plt.grid(True)
plt.axhline(0, color='black', linewidth=0.5)
plt.axvline(0, color='black', linewidth=0.5)
#plt.savefig("../assets/images/learned_word_embeddings_2d.png")

In [None]:
class PositionalEncoding(nn.Module):
    """Fixed sinusoidal positional encoding that is added to its input.

    Args:
        d_model: size of the last axis of the tensors passed to `forward`
            (odd values are supported).
        max_len: number of positions to precompute; `forward` can add
            encodings to at most this many positions.

    The table is stored in the buffer `pe` with shape [1, max_len, d_model].
    """

    def __init__(self, d_model, max_len=100):
        super().__init__()

        # Column vector of positions: [max_len, 1]
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)

        # Divisor term controls frequency of sine/cosine curves;
        # one entry per even column index, i.e. ceil(d_model / 2) entries
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() *
            (-math.log(10000.0) / d_model)
        )
        angles = position * div_term

        # Matrix of size [max_len, d_model]
        pe = torch.zeros(max_len, d_model)

        # Apply sin to even indices, cos to odd
        pe[:, 0::2] = torch.sin(angles)
        # When d_model is odd there is one fewer odd column than even columns,
        # so drop the surplus frequency instead of failing on a shape mismatch
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Add a leading 'batch' axis so the table broadcasts over the batch
        pe = pe.unsqueeze(0)

        # Register as a buffer
        # Saved with model, but not trainable
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return `x` plus the encodings of its first `x.size(1)` positions."""
        return x + self.pe[:, :x.size(1), :]

In [None]:
class PositionAwareCBOW(nn.Module):
    """CBOW-style predictor that keeps word order via positional encodings.

    Args:
        vocab_size: number of token ids (embedding rows and output logits).
        d_model: embedding size.
        max_len: number of positions precomputed by the positional encoder.
        context_len: number of tokens in every input fragment. `forward`
            flattens the fragment to context_len * d_model features, so the
            output layer is sized for exactly this length. The default of 1
            reproduces the original layer size.
    """

    def __init__(self, vocab_size, d_model, max_len=10, context_len=1):
        super().__init__()

        self.context_len = context_len

        self.embedding = nn.Embedding(vocab_size, d_model)

        # New: positional encoding
        self.pos_encoder = PositionalEncoding(d_model, max_len)

        # Final projection to the vocabulary; its input is the flattened
        # fragment, i.e. context_len vectors of size d_model side by side
        self.linear = nn.Linear(context_len * d_model, vocab_size)

    def forward(self, inputs):
        """Map token ids [batch, context_len] to logits [batch, vocab_size]."""

        # Look up token embeddings
        x = self.embedding(inputs)

        # Add positional info
        x = self.pos_encoder(x)

        # Flatten rather than average: averaging (embedding + position) over
        # the fragment gives the same vector for any word order, whereas
        # concatenation keeps every position in its own slice.
        # (Replaced with Self-Attention in the Transformer.)
        combined_vector = x.view(x.size(0), -1)

        logits = self.linear(combined_vector)

        return logits

In [None]:
# Order-awareness check: a plain CBOW treats "we are" and "are we" alike;
# with positional encodings the two flattened vectors should differ

# Toy vocabulary (id -> word) and its inverse (word -> id)
vocab = {0: "", 1: "we", 2: "are", 3: "happy"}
inv_vocab = {word: idx for idx, word in vocab.items()}
d_model = 8
vocab_size = 10

def encode(words):
    """Turn a list of words into a [1, len(words)] tensor of token ids."""
    ids = [inv_vocab[w] for w in words]
    return torch.tensor([ids], dtype=torch.long)

torch.manual_seed(42) # The "God Mode" switch that makes randomness predictable

# Model is created after seeding so its random weights are repeatable
model = PositionAwareCBOW(vocab_size, d_model)

# Same two words, opposite order
input_1 = encode(["we", "are"])
input_2 = encode(["are", "we"])

model.eval()
with torch.no_grad():
    # Walk through forward() by hand, stopping before the output layer
    emb1, emb2 = model.embedding(input_1), model.embedding(input_2)
    pos1, pos2 = model.pos_encoder(emb1), model.pos_encoder(emb2)
    vec1, vec2 = pos1.flatten(start_dim=1), pos2.flatten(start_dim=1)

diff = (vec1 - vec2).norm().item()

print(f"Vector: {vec1.numpy()[0][:4]}...") # Print first 4 dims
print(f"Vector: {vec2.numpy()[0][:4]}...")
print("-" * 30)
print(f"Euclidean Distance: {diff:.6f}")

In [None]:
# Build a large positional-encoding table purely for plotting
max_len_viz = 100   # 100 positions (sequence length)
d_model_viz = 128   # Large dimension to see the gradient
pe_layer = PositionalEncoding(d_model_viz, max_len_viz)

# Drop the leading 'batch' axis of the buffer: [1, 100, 128] -> [100, 128]
pe_matrix = pe_layer.pe.squeeze(0).numpy()

In [None]:
# Heatmap of the whole table: one row per position, one column per dimension
fig, ax = plt.subplots(figsize=(12, 6))
im = ax.imshow(pe_matrix, aspect='auto', cmap='RdBu', origin='lower')

ax.set_title("Positional Encoding Matrix")
ax.set_xlabel("Embedding Dimension (Frequency)")
ax.set_ylabel("Position in Sequence (Time)")
fig.colorbar(im, ax=ax, label="Value (-1 to +1)")
fig.savefig("../assets/images/positional_encoding_matrix.png")

In [None]:
# Encoding value along the sequence for three columns of the table
plt.figure(figsize=(12, 4))
positions = np.arange(100)

# (column index, legend label): low columns oscillate fastest
curves = (
    (0, "Dim 0 (High Freq)"),
    (40, "Dim 40 (Med Freq)"),
    (80, "Dim 80 (Low Freq)"),
)
for dim, label in curves:
    plt.plot(positions, pe_matrix[:, dim], label=label, alpha=0.9)

plt.title("Sine/Cosine Waves at Different Dimensions")
plt.xlabel("Position in Sequence")
plt.ylabel("Encoding Value")
plt.legend(loc="upper right")
plt.grid(True, alpha=0.3)
plt.savefig("../assets/images/positional_encoding_curves.png")

In [None]:
class SingleHeadAttention(nn.Module):
    """Scaled dot-product self-attention with a single head.

    Queries, keys and values are all linear projections of the same input,
    each mapping d_model features to d_model features.
    """

    def __init__(self, d_model):
        super().__init__()

        self.d_model = d_model

        # Learned projections for queries, keys and values
        self.q_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)

    def forward(self, x):
        """Return, per position, a softmax-weighted sum of the value vectors."""
        queries, keys, values = self.q_linear(x), self.k_linear(x), self.v_linear(x)

        # Relevance of every position to every other, scaled by sqrt(d_model)
        relevance = (queries @ keys.transpose(-2, -1)) / math.sqrt(self.d_model)

        # Normalise over the key axis so each row of weights sums to 1
        attn = F.softmax(relevance, dim=-1)

        return attn @ values

In [None]:
class AttentionModel(nn.Module):
    """Embedding + positional encoding + single-head attention + mean pooling.

    Args:
        vocab_size: number of token ids (embedding rows and output logits).
        d_model: embedding size.
        max_len: number of positions handed to the positional encoder. It is
            an explicit argument so the class no longer depends on a global
            `max_len` being defined before it is instantiated.
    """

    def __init__(self, vocab_size, d_model, max_len=10):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model, max_len)
        self.attention = SingleHeadAttention(d_model)
        self.linear = nn.Linear(d_model, vocab_size)

    def forward(self, inputs):
        """Map token ids [batch, seq_len] to logits [batch, vocab_size]."""

        # Embed and position
        x = self.embedding(inputs)
        x = self.pos_encoder(x)

        # Self-attention
        x = self.attention(x)

        # Aggregate: average over the sequence axis
        x = x.mean(dim=1)

        return self.linear(x)

In [None]:
# Repeat the order-awareness experiment with the attention model

# Toy vocabulary (id -> word) and its inverse (word -> id)
vocab = {0: "", 1: "we", 2: "are"}
inv_vocab = {word: idx for idx, word in vocab.items()}
d_model = 8
vocab_size = 10
max_len = 10

def encode(words):
    """Turn a list of words into a [1, len(words)] tensor of token ids."""
    ids = [inv_vocab[w] for w in words]
    return torch.tensor([ids], dtype=torch.long)

model = AttentionModel(vocab_size, d_model)

# Same two words, opposite order
input_1 = encode(["we", "are"])
input_2 = encode(["are", "we"])

model.eval()
with torch.no_grad():

    # Forward pass up to (but not including) the output layer
    vec1 = model.attention(model.pos_encoder(model.embedding(input_1)))
    vec1_pooled = vec1.mean(dim=1)

    vec2 = model.attention(model.pos_encoder(model.embedding(input_2)))
    vec2_pooled = vec2.mean(dim=1)

diff = (vec1_pooled - vec2_pooled).norm().item()

print(f"Vector 'we are': {vec1_pooled.numpy()[0][:4]}...")
print(f"Vector 'are we': {vec2_pooled.numpy()[0][:4]}...")
print("-" * 30)
print(f"Euclidean Distance: {diff:.6f}")

In [None]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product self-attention.

    d_model is split evenly across num_heads heads of size d_k; the heads
    attend independently and a final linear layer mixes their outputs.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()

        assert d_model % num_heads == 0, "d_model not divisible by num_heads"

        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads

        # Each projection maps d_model -> num_heads * d_k = d_model
        self.q_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)

        self.out_linear = nn.Linear(d_model, d_model)

    def _split_heads(self, projected, batch_size):
        # [batch, seq_len, d_model] -> [batch, seq_len, num_heads, d_k]
        # -> [batch, num_heads, seq_len, d_k], so heads act like extra batches
        return projected.view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)

    def forward(self, x):
        """Apply self-attention to x of shape [batch, seq_len, d_model]."""
        batch_size = x.size(0)

        q, k, v = (
            self._split_heads(proj(x), batch_size)
            for proj in (self.q_linear, self.k_linear, self.v_linear)
        )

        # [batch, heads, seq, d_k] x [batch, heads, d_k, seq] -> [batch, heads, seq, seq]
        scores = (q @ k.transpose(-2, -1)) / math.sqrt(self.d_k)
        weights = F.softmax(scores, dim=-1)

        # Weighted sum of values per head: [batch, heads, seq, d_k]
        per_head = weights @ v

        # Concatenate the heads again: back to [batch, seq_len, d_model]
        merged = per_head.transpose(1, 2).reshape(batch_size, -1, self.d_model)

        # Final layer (mix heads)
        return self.out_linear(merged)

In [None]:
# Visualise the attention pattern of every head on random input

# Setup
d_model = 16
num_heads = 2
seq_len = 3

model = MultiHeadAttention(d_model, num_heads)

x = torch.randn(1, seq_len, d_model)

# Forward Pass manually to catch the weights
with torch.no_grad():
    q = model.q_linear(x).view(1, -1, num_heads, d_model//num_heads).transpose(1, 2)
    k = model.k_linear(x).view(1, -1, num_heads, d_model//num_heads).transpose(1, 2)
    scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(d_model//num_heads)
    # Normalise over the key axis (last), exactly as MultiHeadAttention.forward
    # does; dim=1 would normalise across heads and the rows of each heatmap
    # would no longer sum to 1
    weights = F.softmax(scores, dim=-1)

    # [batch, heads, seq, seq]
    print(f"Weights shape: {weights.shape}")

# Plotting
# One label per position and one panel per head, derived from the setup above
labels = [f"Word {chr(ord('A') + i)}" for i in range(seq_len)]
fig, axes = plt.subplots(1, num_heads, figsize=(6 * num_heads, 5), squeeze=False)

for h in range(num_heads):
    # Extract heatmap for head 'h'
    w_matrix = weights[0, h].numpy()

    # squeeze=False keeps `axes` two-dimensional even for a single head
    sns.heatmap(w_matrix, annot=True, cmap="Blues", ax=axes[0, h],
                xticklabels=labels, yticklabels=labels)
    axes[0, h].set_title(f"Head {h+1} Attention Pattern")

plt.show()

In [None]:
# FFN / MLP
# Position-wise feed-forward network: the same two-layer MLP acts on the
# last axis of the input, so every token is transformed independently
class FeedForward(nn.Module):
    """Linear(d_model -> d_ff), ReLU, Linear(d_ff -> d_model)."""

    def __init__(self, d_model, d_ff=2048):
        super().__init__()
        # Expand to the hidden width, then project back
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        """Return a tensor with the same shape as `x`."""
        hidden = F.relu(self.linear1(x))
        return self.linear2(hidden)

# Transformer block
class TransformerBlock(nn.Module):
    """Attention and feed-forward sub-layers, each followed by add-and-norm.

    Both sub-layers use a residual connection and a LayerNorm applied after
    the addition.
    """

    def __init__(self, d_model, num_heads, d_ff):
        super().__init__()

        # Sub-layer 1 - attention, with its own normalisation
        self.attention = MultiHeadAttention(d_model, num_heads)
        self.norm1 = nn.LayerNorm(d_model)

        # Sub-layer 2 - feed-forward, with its own normalisation
        self.ff = FeedForward(d_model, d_ff)
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, x):
        """Return the block output; same shape as `x`."""
        # Residual around attention, then normalise
        attended = self.norm1(x + self.attention(x))

        # Residual around the feed-forward network, then normalise
        return self.norm2(attended + self.ff(attended))

In [None]:
# Smoke test: a transformer block must preserve the shape of its input
d_model, num_heads = 16, 2
d_ff, seq_len = 64, 5

block = TransformerBlock(d_model, num_heads, d_ff)

# Random stand-in for one sequence of already-embedded tokens,
# pushed straight through the block
input_tensor = torch.randn(1, seq_len, d_model)
output_tensor = block(input_tensor)

print(f"Input Shape:  {input_tensor.shape}")
print(f"Output Shape: {output_tensor.shape}")

In [None]:
# Norm of (input - output) over all elements: a non-zero value shows that the
# block changed the token vectors
diff = torch.norm(input_tensor - output_tensor).item()
print(f"Change Magnitude: {diff:.4f} (Vectors were updated)")

In [None]:
# Check LayerNorm working
# LayerNorm forces mean of last dimension to be ~0 and std to ~1.
mean = output_tensor[0, 0].mean().item()
# LayerNorm normalises with the biased (divide-by-N) variance, whereas
# Tensor.std() applies Bessel's correction by default; request the biased
# estimator so the printed value is directly comparable to 1
std  = output_tensor[0, 0].std(unbiased=False).item()
print(f"Output Token 0 Stats -> Mean: {mean:.4f}, Std: {std:.4f}")

In [None]:
# New Multi Head with causal mask

class MultiHeadAttention(nn.Module):
    """Multi-head self-attention with an optional attention mask.

    Args:
        d_model: feature size of the input; must be divisible by num_heads.
        num_heads: number of heads; each works on d_model // num_heads features.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()

        # Same guard as the unmasked version: fail at construction time with a
        # clear message rather than later inside view()
        assert d_model % num_heads == 0, "d_model not divisible by num_heads"

        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads

        self.q_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)

        self.out_linear = nn.Linear(d_model, d_model)

    def forward(self, x, mask=None):
        """Apply self-attention to x of shape [batch, seq_len, d_model].

        mask: optional tensor broadcastable to [batch, heads, seq, seq].
            Positions where it equals 0 get a score of -inf and therefore
            zero attention weight. With mask=None every position may attend
            to every other.
        """

        batch_size = x.size(0)

        # Projections and split heads: [batch, num_heads, seq_len, d_k]
        q = self.q_linear(x).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        k = self.k_linear(x).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        v = self.v_linear(x).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)

        # Scores: [batch, num_heads, seq_len, seq_len]
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.d_k)

        # New mask
        # NOTE: a row that is masked everywhere would turn into NaN after the
        # softmax; a lower-triangular causal mask always keeps the diagonal
        if mask is not None:
            scores = scores.masked_fill(mask == 0, float('-inf'))

        weights = F.softmax(scores, dim=-1)
        output = torch.matmul(weights, v)

        # Merge heads back to [batch, seq_len, d_model]
        output = output.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)

        return self.out_linear(output)

In [None]:
class SimpleTransformer(nn.Module):
    """Decoder-style stack: embedding, positions, masked blocks, output head.

    Requires a TransformerBlock whose forward accepts `(x, mask)`.
    """

    def __init__(self, vocab_size, d_model, num_heads, num_layers, max_len=100):
        super().__init__()

        # Token embedding followed by positional encoding
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model, max_len)

        # num_layers blocks, each with a feed-forward width of 4 * d_model
        layers = [
            TransformerBlock(d_model, num_heads, d_ff=d_model*4)
            for _ in range(num_layers)
        ]
        self.blocks = nn.ModuleList(layers)

        # Output head: projects d_model features back to vocab_size logits
        self.fc_out = nn.Linear(d_model, vocab_size)

    def forward(self, x):
        """Map token ids [batch, seq_len] to logits [batch, seq_len, vocab_size]."""
        seq_len = x.size(1)

        # Create causal mask on the input's device: ones on and below the
        # diagonal (present and past), zeros above it (future)
        mask = torch.ones(seq_len, seq_len, device=x.device).tril()

        hidden = self.pos_encoder(self.embedding(x))

        # Every block receives the same mask
        for block in self.blocks:
            hidden = block(hidden, mask)

        return self.fc_out(hidden)

In [None]:
class TransformerBlock(nn.Module):
    """Transformer block whose attention sub-layer accepts an optional mask.

    Replaces the earlier definition of the same name. `mask` defaults to None
    (as in MultiHeadAttention.forward), so the unmasked call `block(x)` used
    earlier in the notebook keeps working after this redefinition.
    """

    def __init__(self, d_model, num_heads, d_ff):
        super().__init__()

        self.attention = MultiHeadAttention(d_model, num_heads)
        self.norm1 = nn.LayerNorm(d_model)
        self.ff = FeedForward(d_model, d_ff)
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, x, mask=None):
        """Return the block output; `mask` is passed through to the attention."""
        # Sub-layer 1: (masked) attention, residual add, normalise
        attn_out = self.attention(x, mask)
        x = self.norm1(x + attn_out)
        # Sub-layer 2: feed-forward, residual add, normalise
        ff_out = self.ff(x)
        x = self.norm2(x + ff_out)
        return x

In [None]:
# Train the tiny transformer on a single next-token example

# Toy vocabulary (id -> word) and its inverse (word -> id)
vocab = {0: "", 1: "we", 2: "are", 3: "happy"}
inv_vocab = {word: idx for idx, word in vocab.items()}

d_model, num_heads = 16, 4
num_layers, vocab_size = 2, 4

model = SimpleTransformer(vocab_size, d_model, num_heads, num_layers)
opt = optim.Adam(model.parameters(), lr=0.01)
criterion = nn.CrossEntropyLoss()

# Data: the target is the input shifted by one token
#   input  [we, are]
#   output [are, happy]
x = torch.tensor([[1, 2]])
y = torch.tensor([[2, 3]])

# Training
for epoch in range(200):
    opt.zero_grad()

    # Forward pass
    logits = model(x)  # [1, 2, vocab_size]

    # Flatten batch and sequence axes for the loss:
    # [batch*seq, vocab_size] vs. [batch*seq]
    loss = criterion(logits.reshape(-1, vocab_size), y.reshape(-1))

    loss.backward()
    opt.step()

    # Report progress on every 50th epoch
    if not epoch % 50:
        print(f"Epoch {epoch} loss = {loss.item():.4f}")

In [None]:
# Inference: greedy prediction of the next token for two prompts
model.eval()

with torch.no_grad():

    # Complete "We ..."
    test_input = torch.tensor([[1]])
    logits = model(test_input)

    # Highest-scoring token at the last position
    pred_id = logits[0, -1].argmax().item()

    print(f"Prompt: 'we'")
    print(f"Prediction ID: {pred_id} ({vocab[pred_id]})")

    # Complete "We are ..."
    test_input2 = torch.tensor([[1, 2]])
    logits2 = model(test_input2)

    pred_id2 = logits2[0, -1].argmax().item()

    print(f"Prompt: 'we are'")
    print(f"Prediction ID: {pred_id2} ({vocab[pred_id2]})")

In [None]:
def generate_text(model, start_words, max_tokens=5):
    """Greedily extend a prompt one token at a time and print the result.

    Args:
        model: callable mapping token ids [1, seq_len] to logits
            [1, seq_len, vocab_size]; `model.eval()` is called first.
        start_words: prompt as a whitespace-separated string. Every word must
            be a key of the global `inv_vocab`, otherwise a KeyError is raised.
        max_tokens: maximum number of tokens to append.

    Generation stops early when the predicted token decodes to the empty
    string. Progress and the final sentence are printed; nothing is returned.
    """
    model.eval()

    # Initial context
    context_ids = [inv_vocab[w] for w in start_words.split()]
    input_tensor = torch.tensor(context_ids).unsqueeze(0)

    # f-string with the real variable name, so the prompt itself is shown
    print(f"Starting with '{start_words}'")

    # Generation loop
    for _ in range(max_tokens):
        with torch.no_grad():

            # Forward pass over the whole context so far
            logits = model(input_tensor)

        # Only the logits at the last position predict the next token
        next_token_logits = logits[0, -1, :]

        # Greedy decoding
        # (Temperature and random sampling would go here)
        next_token_id = torch.argmax(next_token_logits).item()

        # Decode
        next_word = vocab[next_token_id]

        # The empty string acts as the stop token
        if next_word == "": break

        print(f" -> Generated: '{next_word}'")

        # Append the new token to the context for the next step
        next_tensor = torch.tensor([[next_token_id]])
        input_tensor = torch.cat([input_tensor, next_tensor], dim=1)

    final_sentence = " ".join([vocab[idx.item()] for idx in input_tensor[0]])

    print(f"Final = {final_sentence}")

# Greedily generate at most two tokens after the prompt "we"
generate_text(model, "we", max_tokens=2)

# Lecture 7 - RAG

# Lecture 8 - Multimodal LLMs

# Lecture 9 - Diffusion and Graph Networks

In [None]:
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt

In [None]:
# Define a 1D distribution p(x)

# Seed both PyTorch's and NumPy's global generators so the draws are repeatable
torch.manual_seed(0)
np.random.seed(0)

# Target distribution: mixture of three equally likely Gaussians
def sample_target(n):
    """Draw n samples from the mixture; returns a tensor of shape [n, 1]."""
    component_means = torch.tensor([-2.0, 0.5, 2.5])
    component_stds = torch.tensor([0.3, 0.2, 0.4])

    # Pick a component uniformly for every sample, then draw standard normal
    # noise and shift/scale it with that component's parameters
    which = torch.randint(0, 3, (n,))
    noise = torch.randn(n)
    samples = component_means[which] + component_stds[which] * noise

    return samples.unsqueeze(1)

# Draw reference samples
# 20,000 draws converted to a NumPy array of shape (20000, 1); reused later
# as the "Target" histogram
x_ref = sample_target(20_000).numpy()

# density=True normalises the histogram so it approximates p(x)
plt.hist(x_ref, bins=200, density=True)
plt.title("Target distribution p(x)")

In [None]:
# Define the NN - a straightforward FFNN / MLP

class Generator(nn.Module):
    """MLP mapping one scalar per sample to one scalar per sample.

    Layout: Linear(1, 64) -> ReLU -> Linear(64, 64) -> ReLU -> Linear(64, 1).
    """

    def __init__(self):
        super().__init__()

        layers = [
            nn.Linear(1, 64),
            nn.ReLU(),
            nn.Linear(64,64),
            nn.ReLU(),
            nn.Linear(64, 1),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Push `x` (last axis of size 1) through the network."""
        return self.net(x)

In [None]:
# Define the loss function
# It is used like any other loss in the training loop, but it compares the
# *distribution* of the generated and the true samples rather than paired
# points x, \hat{y}.

# NB "sliced" Wasserstein in 1D is just Wasserstein.
# Slicing matters in higher dimensions, where the true Wasserstein distance
# is expensive, so samples are projected onto random 1D slices instead.

def sliced_wasserstein_1d(x_fake, x_real):
    """Mean squared difference between the sorted values of two sample sets.

    Both inputs are flattened and sorted, then compared element by element,
    so they must contain the same number of values. Returns a scalar tensor.
    """
    fake_sorted = x_fake.view(-1).sort().values
    real_sorted = x_real.view(-1).sort().values
    # i-th smallest generated value is matched with i-th smallest real value
    return (fake_sorted - real_sorted).pow(2).mean()

In [None]:
# Training loop: fit the generator so that its outputs follow p(x)

G = Generator()
optimizer = torch.optim.Adam(G.parameters(), lr=1e-3)

# Number of generated and of real samples compared in every step
n_samples = 4096

for epoch in range(3001):
    optimizer.zero_grad()

    # Fresh standard normal noise, pushed through the generator
    z = torch.randn(n_samples, 1)
    x_fake = G(z)

    # Fresh real samples from the known target distribution
    x_real = sample_target(n_samples)

    # Distribution-level loss, then one optimiser step
    loss = sliced_wasserstein_1d(x_fake, x_real)
    loss.backward()
    optimizer.step()

    # Report progress on every 300th epoch
    if not epoch % 300:
        print(f"Epoch {epoch:4d} | loss = {loss.item():.6f}")

In [None]:
# Inference

# No gradients are needed for sampling, which also allows the direct
# conversion to NumPy
with torch.no_grad():
    # 50,000 noise values in, 50,000 generated samples out: shape (50000, 1)
    z = torch.randn(50_000, 1)
    x_gen = G(z).numpy()

In [None]:
# Overlay the normalised histograms of the reference and generated samples
plt.figure(figsize=(8,4))
for samples, label in ((x_ref, "Target"), (x_gen, "Generated")):
    plt.hist(samples, bins=200, density=True, alpha=0.5, label=label)
plt.legend()
plt.title("True distribution vs learned sampler")
plt.savefig("../assets/images/1d_distribution_sampling.png")

# MLOps

In [None]:
%pip install git+https://github.com/seppe-intelliprove/face-detection-onnx

In [None]:
from fdlite import FaceDetection, FaceDetectionModel
from fdlite.render import Colors, detections_to_render_data, render_to_image
import PIL
from IPython.display import display

In [None]:
def detect_faces(image: "PIL.Image.Image"):
    """Run the back-camera face detection model on an image.

    Prints the number of detections and returns whatever the detector
    returned for `image`.

    The annotation is a string naming the Image class; the previous
    `PIL.Image` referred to the module and was looked up at definition time.
    """
    # Named `detector` so the local no longer shadows this function's name.
    # A new detector is constructed on every call.
    detector = FaceDetection(model_type=FaceDetectionModel.BACK_CAMERA)
    faces = detector(image)
    print(f"Found {len(faces)} faces")
    return faces


def mark_faces(image_filename):
    """Mark all faces recognized in the image and display the result.

    Args:
        image_filename: path of the image file to open.
    """
    # Import the submodule explicitly: a bare `import PIL` does not load
    # `PIL.Image`, so `PIL.Image.open` only worked when another import had
    # already pulled it in
    from PIL import Image

    image = Image.open(image_filename)

    faces = detect_faces(image)

    # Draw faces
    render_data = detections_to_render_data(
        faces, bounds_color=Colors.GREEN, line_width=3
    )
    # The return value is ignored and `image` itself is displayed below.
    # NOTE(review): presumably render_to_image draws onto `image` in place -
    # confirm against the fdlite documentation.
    render_to_image(render_data, image)

    display(image)

In [None]:
!wget https://upload.wikimedia.org/wikipedia/commons/3/3d/Apollo_11_Crew.jpg
mark_faces("Apollo_11_Crew.jpg")

In [None]:
!curl -L -A "Mozilla/5.0" "https://upload.wikimedia.org/wikipedia/commons/thumb/0/07/Isabella_L%C3%B6vin_signing_climate_law_referral.jpg/1024px-Isabella_L%C3%B6vin_signing_climate_law_referral.jpg" -o IL.jpg
mark_faces("IL.jpg")

In [None]:
!curl -L -A "Mozilla/5.0" "https://upload.wikimedia.org/wikipedia/commons/thumb/6/6d/20180610_FIFA_Friendly_Match_Austria_vs._Brazil_Miranda_850_0051.jpg/1024px-20180610_FIFA_Friendly_Match_Austria_vs._Brazil_Miranda_850_0051.jpg" -o FIFA.jpg
mark_faces("FIFA.jpg")

# Model emulator; AIFS and AICON

# AI Data Assimilation

## Modulated sine background with 1 sample

In [None]:
import numpy as np
import matplotlib.pyplot as plt

In [None]:
# Seeded NumPy generator so the random draws in this section are repeatable
rng = np.random.default_rng(7)

In [None]:
# Number of grid points
n = 256
# n evenly spaced points on [0, 1); the right endpoint is excluded
x_grid = np.linspace(0.0, 1.0, n, endpoint=False)

In [None]:
# ----------------------------
# "True" state: modulated sine
#   y(x) = A(x) * sin(2π k x + phase) + trend
# ----------------------------
k = 3.0        # wavenumber of the carrier sine
phase = 0.4    # phase offset of the carrier
A0 = 1.0       # mean amplitude
A1 = 0.35      # strength of the amplitude modulation
A_mod_k = 1.0  # modulation wavenumber

# Amplitude envelope: A0 plus a slow sine of size A1
A = A0 + A1 * np.sin(2*np.pi*A_mod_k * x_grid + 0.7)
# Linear trend that is zero at x = 0.5
trend = 0.15 * (x_grid - 0.5)
x_true = A * np.sin(2*np.pi*k * x_grid + phase) + trend

In [None]:
import matplotlib.pyplot as plt

In [None]:
# True state on the grid
plt.plot(x_grid, x_true)

In [None]:
# ----------------------------
# Background xb: shifted + smoothed + biased + small noise
# ----------------------------
bias = 0.10
shift = 4  # grid points, periodic shift
# np.roll wraps around, so the shift is periodic
x_shifted = np.roll(x_true, shift)

# simple smoothing via convolution (periodic padding)
sigma_pts = 2.0  # kernel width in grid points
# Truncate the kernel at 4 sigma on either side
radius = int(np.ceil(4 * sigma_pts))
t = np.arange(-radius, radius + 1)
# Gaussian weights, normalised to sum to 1
ker = np.exp(-(t**2) / (2 * sigma_pts**2))
ker /= ker.sum()

# Wrap `radius` points from each end around to the other side, convolve, then
# crop the padding again so the result has n points
x_pad = np.r_[x_shifted[-radius:], x_shifted, x_shifted[:radius]]
x_smooth = np.convolve(x_pad, ker, mode="same")[radius:-radius]

# Add the constant bias and Gaussian noise with standard deviation 0.03
xb = x_smooth + bias + 0.03 * rng.standard_normal(n)

In [None]:
plt.plot(x_grid, x_true)
plt.plot(x_grid, xb)