# Drone Simulation

Example run. This run just uses a simple PD control.

In [2]:
from itertools import cycle
import time
import pybullet as p
from drone_world import World, QuadPDController


pybullet build time: May 17 2025 21:07:23


## Note for MacOS users

The simulation software package that I use does not play nicely with Macos
and causes a lot of "hanging processes".

After running a GUI visuallization, you will need to restart the Jupyter 
Notebook Kernel to spawn a new "working" GUI session.

Skip this next cell if you just want to see the training.

In [1]:

world = World(controller_type=QuadPDController, connection_type=p.GUI)

waypoints = cycle([
    (0, 0, 1),
    (1, 0, 2),
    (1, 1, 1),
    (0, 1, 2),
    (1, 0, 1),
    (1, 1, 2),
])

try:
    for k in range(30 * 240):
        # change target when waypoint is reached
        if world.reached():
            waypoint = next(waypoints)
            print("going to waypoint", waypoint)
            world.set_target(waypoint)

        world.step()
finally: 
    world.end()

NameError: name 'World' is not defined

# Can can use this to log our controls

This can create example data for our model.

In [7]:

import numpy as np


# Create new world using a "direct" connection to skip GUI.
# Record controls to use as training data
world = World(controller_type=QuadPDController, connection_type=p.DIRECT, record_controls=True)

# 500 points, 3 coords: x,y ~ [-5,5), z ~ [1,2)
xs = np.random.rand(500) * 10 - 5
ys = np.random.rand(500) * 10 - 5
zs = np.random.uniform(1, 6, size=500)

waypoints = cycle(zip(xs, ys, zs))

for k in range(30 * 240):
    # change target when waypoint is reached
    if world.reached():
        waypoint = next(waypoints)
        print("going to waypoint", waypoint)
        world.set_target(waypoint)

    world.step()
world.end()

print(world.controls[0])
# We get a big list of controls!

going to waypoint (np.float64(-4.8879130344595065), np.float64(2.2586541864531373), np.float64(2.8457950654744115))
going to waypoint (np.float64(-1.2685771739160359), np.float64(-3.5197611157193096), np.float64(3.1707308744493545))
going to waypoint (np.float64(-0.8925231780835148), np.float64(0.3920072911638126), np.float64(5.049610221900777))
going to waypoint (np.float64(3.4101849997123175), np.float64(2.4322267205175176), np.float64(1.4994974478135301))
going to waypoint (np.float64(3.5311901354553434), np.float64(0.4821239236161565), np.float64(3.383563663547471))
going to waypoint (np.float64(-0.48561885022913476), np.float64(-3.9268252705963045), np.float64(3.0210521216390815))
going to waypoint (np.float64(-0.4186889680690067), np.float64(3.782653556744819), np.float64(3.8897902964360926))
going to waypoint (np.float64(-0.13380784453951833), np.float64(4.098502888317396), np.float64(2.7739952772160006))
going to waypoint (np.float64(-4.762250133759682), np.float64(2.3962309619

# Define a NN to learn from our controls

In [8]:
import numpy as np
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader, random_split

# ---------- 1) Feature extraction ----------
def sample_to_xy(sample):
    s, t = sample["state"], sample["target"]
    # state: pos(3), vel(3), rpy(3), R(3x3), w_body(3) = 21
    pos   = np.asarray(s["pos"], dtype=np.float64).ravel()
    vel   = np.asarray(s["vel"], dtype=np.float64).ravel()
    rpy   = np.asarray(s["rpy"], dtype=np.float64).ravel()
    R     = np.asarray(s["R"], dtype=np.float64).ravel()        # 9
    w     = np.asarray(s["w_body"], dtype=np.float64).ravel()
    # target: pos(3) + yaw(1) = 4
    tpos  = np.asarray(t["pos"], dtype=np.float64).ravel()
    yaw   = np.array([t["yaw"]], dtype=np.float64)

    x = np.concatenate([pos, vel, rpy, R, w, tpos, yaw])  # shape (25,)
    y = np.asarray(sample["thrusts"], dtype=np.float64).ravel()  # shape (4,)
    return x, y

# ---------- 2) Dataset ----------
class ThrustDataset(Dataset):
    def __init__(self, data_list):
        X, Y = zip(*(sample_to_xy(s) for s in data_list))
        self.X = np.stack(X).astype(np.float32)  # (N, 25)
        self.Y = np.stack(Y).astype(np.float32)  # (N, 4)

        # simple standardization (save stats for inference)
        self.x_mean = self.X.mean(axis=0, keepdims=True)
        self.x_std  = self.X.std(axis=0, keepdims=True) + 1e-8
        self.y_mean = self.Y.mean(axis=0, keepdims=True)
        self.y_std  = self.Y.std(axis=0, keepdims=True) + 1e-8

        self.Xn = (self.X - self.x_mean) / self.x_std
        self.Yn = (self.Y - self.y_mean) / self.y_std

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        x = torch.from_numpy(self.Xn[idx])   # (25,)
        y = torch.from_numpy(self.Yn[idx])   # (4,)
        return x, y

# ---------- 3) Model (tanh MLP) ----------
class TanhMLP(nn.Module):
    def __init__(self, in_dim=25, out_dim=4, hidden=(128, 128)):
        super().__init__()
        layers = []
        last = in_dim
        for h in hidden:
            layers += [nn.Linear(last, h), nn.Tanh()]
            last = h
        layers += [nn.Linear(last, out_dim)]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        return self.net(x)

# ---------- 4) Training utility ----------
def train_model(data_list, epochs=100, batch_size=128, lr=1e-3, val_split=0.2, hidden=(128,128), device="cpu"):
    ds = ThrustDataset(data_list)
    n_val = int(len(ds) * val_split)
    n_tr  = len(ds) - n_val
    tr_ds, val_ds = random_split(ds, [n_tr, n_val], generator=torch.Generator().manual_seed(0))

    tr_loader  = DataLoader(tr_ds, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=False)

    model = TanhMLP(in_dim=ds.X.shape[1], out_dim=ds.Y.shape[1], hidden=hidden).to(device)
    opt = torch.optim.Adam(model.parameters(), lr=lr)
    loss_fn = nn.MSELoss()

    def run_epoch(loader, train=True):
        model.train(train)
        total = 0.0
        with torch.set_grad_enabled(train):
            for xb, yb in loader:
                xb, yb = xb.to(device), yb.to(device)
                pred = model(xb)
                loss = loss_fn(pred, yb)
                if train:
                    opt.zero_grad()
                    loss.backward()
                    opt.step()
                total += loss.item() * xb.size(0)
        return total / len(loader.dataset)

    for ep in range(1, epochs+1):
        tr_loss = run_epoch(tr_loader, train=True)
        val_loss = run_epoch(val_loader, train=False) if n_val > 0 else float('nan')
        if ep % 5 == 0 or ep == 1 or ep == epochs:
            print(f"Epoch {ep:3d} | train MSE(norm): {tr_loss:.5f}" + (f" | val: {val_loss:.5f}" if n_val > 0 else ""))

    # package normalization for inference
    norm = {
        "x_mean": torch.from_numpy(ds.x_mean.astype(np.float32)),
        "x_std":  torch.from_numpy(ds.x_std.astype(np.float32)),
        "y_mean": torch.from_numpy(ds.y_mean.astype(np.float32)),
        "y_std":  torch.from_numpy(ds.y_std.astype(np.float32)),
    }
    return model, norm

# ---------- 5) Inference helper (denormalize output) ----------
@torch.no_grad()
def predict_thrusts(model, norm, samples):
    xs = np.stack([sample_to_xy(s)[0] for s in samples]).astype(np.float32)  # (B,25)
    Xn = (xs - norm["x_mean"].numpy()) / norm["x_std"].numpy()
    Xn = torch.from_numpy(Xn)
    yn = model(Xn)
    y  = yn * norm["y_std"] + norm["y_mean"]
    return y.numpy()  # (B,4)

# Suppose `data` is your list of dicts like the one you showed
model, norm = train_model(world.controls, epochs=150, batch_size=256, lr=1e-3, hidden=(128,128))

# Predict on a few samples
pred = predict_thrusts(model, norm, world.controls[:5])  # shape (5,4)
print(pred)

Epoch   1 | train MSE(norm): 0.79060 | val: 0.57507
Epoch   5 | train MSE(norm): 0.08123 | val: 0.07575
Epoch  10 | train MSE(norm): 0.03289 | val: 0.03292
Epoch  15 | train MSE(norm): 0.01647 | val: 0.01648
Epoch  20 | train MSE(norm): 0.00961 | val: 0.00966
Epoch  25 | train MSE(norm): 0.00653 | val: 0.00645
Epoch  30 | train MSE(norm): 0.00441 | val: 0.00448
Epoch  35 | train MSE(norm): 0.00342 | val: 0.00353
Epoch  40 | train MSE(norm): 0.00263 | val: 0.00247
Epoch  45 | train MSE(norm): 0.00200 | val: 0.00198
Epoch  50 | train MSE(norm): 0.00163 | val: 0.00175
Epoch  55 | train MSE(norm): 0.00137 | val: 0.00155
Epoch  60 | train MSE(norm): 0.00118 | val: 0.00129
Epoch  65 | train MSE(norm): 0.00108 | val: 0.00114
Epoch  70 | train MSE(norm): 0.00089 | val: 0.00114
Epoch  75 | train MSE(norm): 0.00077 | val: 0.00086
Epoch  80 | train MSE(norm): 0.00071 | val: 0.00072
Epoch  85 | train MSE(norm): 0.00066 | val: 0.00078
Epoch  90 | train MSE(norm): 0.00068 | val: 0.00071
Epoch  95 | 

# Now we have a simple model that learned to copy our PD Controller!

Lets visuallize its outputs!

## But first, we need to define a controller class that uses our trained model

In [9]:
from drone_world import Controller, Drone

def build_features(state, target) -> np.ndarray:
    s, t = state, target
    pos   = np.asarray(s["pos"], dtype=np.float64).ravel()
    vel   = np.asarray(s["vel"], dtype=np.float64).ravel()
    rpy   = np.asarray(s["rpy"], dtype=np.float64).ravel()
    R     = np.asarray(s["R"], dtype=np.float64).ravel()          # (9,)
    w     = np.asarray(s["w_body"], dtype=np.float64).ravel()

    tpos  = np.asarray(t["pos"], dtype=np.float64).ravel()
    yaw   = np.array([t["yaw"]], dtype=np.float64)

    x = np.concatenate([pos, vel, rpy, R, w, tpos, yaw]).astype(np.float32)  # (25,)
    return x


class LearnedController(Controller):
    def __init__(self, drone: Drone) -> None:
        self.x_mean = norm["x_mean"].float().cpu().numpy()  # (1,25)
        self.x_std  = norm["x_std"].float().cpu().numpy()   # (1,25)
        self.y_mean = norm["y_mean"].float().cpu().numpy()  # (1,4)
        self.y_std  = norm["y_std"].float().cpu().numpy()   # (1,4)


    def update(self, state, target) -> tuple:
        x = build_features(state, target)                       # (25,)
        xn = (x - self.x_mean.squeeze(0)) / self.x_std.squeeze(0)

        # 2) model forward (normalized space)
        xb = torch.from_numpy(xn[None, :])

        with torch.no_grad():
            yn = model(xb).cpu().numpy()[0]
            y = yn * self.y_std.squeeze(0) + self.y_mean.squeeze(0)  # (4,)
            return tuple(float(v) for v in y), (0, 0, 0, 0) 

In [None]:
world = World(controller_type=LearnedController, connection_type=p.GUI)

waypoints = cycle([
    (0, 0, 1),
    (1, 0, 2),
    (1, 1, 1),
    (0, 1, 2),
    (1, 0, 1),
    (1, 1, 2),
])

try:
    for k in range(30 * 240):
        # change target when waypoint is reached
        if world.reached():
            waypoint = next(waypoints)
            print("going to waypoint", waypoint)
            world.set_target(waypoint)

        world.step()
finally: 
    world.end()

Version = 4.1 Metal - 89.4
Vendor = Apple
Renderer = Apple M1
going to waypoint (0, 0, 1)
b3Printf: Selected demo: Physics Server
startThreads creating 1 threads.
starting thread 0
started thread 0 
MotionThreadFunc thread started
going to waypoint (1, 0, 2)


We can see this model doesn't work very well, but this is just a demonstration
of our *workflow*, not any of the models we actually intend to train.