In [1]:
import os
import torch
import torch.nn as nn
import math
import numpy as np

from torch.distributions import Bernoulli
from delivery_drone.game.socket_client import DroneGameClient, DroneState

pygame 2.6.1 (SDL 2.28.4, Python 3.12.9)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
# Create the game client with its default arguments and open the connection.
# The same `client` object is reused by every later cell that resets, steps,
# or inspects a game instance.
client = DroneGameClient()
client.connect()

Connecting to localhost:5555...
Connected to server at localhost:5555
Server has 1 game instance(s)


### Going from `state` -> `reward`

In [3]:
def linear_scaler(value, scaler):
    """Scale ``value`` linearly, i.e. return ``value * scaler``."""
    scaled = value * scaler
    return scaled

def gausian_scaler(value, sigma=0.1, scaler=1):
    """Gaussian bump centred on zero.

    Returns ``scaler * exp(-value**2 / (2 * sigma**2))``: equal to ``scaler``
    at ``value == 0`` and falling off symmetrically, with ``sigma`` setting
    the width.
    """
    exponent = -(value ** 2) / (2 * sigma ** 2)
    return scaler * math.exp(exponent)

def exponential_decay(value, decay=10, scaler=1):
    """Exponential fall-off in the magnitude of ``value``.

    Returns ``scaler * exp(-decay * |value|)``, so the result is ``scaler``
    at zero and shrinks as ``value`` moves away from zero in either direction.
    """
    magnitude = abs(value)
    return scaler * math.exp(-decay * magnitude)

def inverse_quadratic(value, decay=10, scaler=1):
    """Inverse-quadratic fall-off: ``scaler / (1 + decay * value**2)``.

    Peaks at ``scaler`` when ``value`` is zero and decays with the square of
    ``value``.
    """
    denominator = 1 + (decay * value ** 2)
    return scaler * (1 / denominator)

def inverse_linear(value, decay=10, scaler=1):
    """Inverse-linear fall-off: ``scaler / (1 + decay * |value|)``.

    Peaks at ``scaler`` when ``value`` is zero and decays with the magnitude
    of ``value``.
    """
    denominator = 1 + (decay * abs(value))
    return scaler * (1 / denominator)

def scaled_shifted_negative_sigmoid(value, sigma=10, scaler=1):
    """Negative sigmoid-shaped penalty centred on ``value == 0.5``.

    Computes ``-scaler / (1 + exp(sigma * (value - 0.5)))``. With positive
    ``sigma`` the result approaches ``-scaler`` for small ``value``, equals
    ``-scaler / 2`` at ``value == 0.5`` and approaches ``0`` for large
    ``value``.

    The exponent is evaluated in a numerically stable way: a naive
    ``math.exp(sigma * (value - 0.5))`` raises ``OverflowError`` once the
    argument exceeds roughly 709, whereas this form only ever exponentiates a
    non-positive number.
    """
    z = sigma * (value - 0.5)
    if z > 0:
        # 1 / (1 + e^z) == e^-z / (1 + e^-z); e^-z underflows to 0.0 instead of overflowing.
        shrink = math.exp(-z)
        fraction = shrink / (1 + shrink)
    else:
        fraction = 1 / (1 + math.exp(z))
    return scaler * -1 * fraction

In [4]:
def calc_reward(state: DroneState):
    """Compute the shaped per-step reward for a single drone state.

    The reward is a sum of independent terms, applied in this order:

    1. a flat time penalty of 0.25 per step;
    2. a velocity-alignment term based on the dot product of the velocity
       with the offset to the platform (bonus when positive, smaller penalty
       otherwise);
    3. quadratic penalties on tilt angle and angular velocity;
    4. a horizontal-alignment bonus/penalty when within 0.2 of the platform;
    5. a quadratic over-speed penalty when within 0.15 of the platform;
    6. an anti-hovering penalty whenever speed is below 0.15;
    7. terminal terms for ``landed`` (plus a fuel bonus) and ``crashed``.

    Args:
        state: Object exposing the DroneState fields read below.

    Returns:
        The scalar reward for this state.
    """
    # 1. Time penalty - every step costs a little, so hovering is expensive.
    total = -0.25

    # 2. Velocity alignment: positive when moving towards the platform offset.
    approach = (state.drone_vx * state.dx_to_platform +
                state.drone_vy * state.dy_to_platform)
    if approach > 0:
        total = total + approach * 2.0
    else:
        total = total - abs(approach) * 0.5

    # 3. Tilt penalty - target upright (landing requires |angle| < 20° = 0.111 normalized).
    total += -(abs(state.drone_angle) ** 2) * 0.3

    # 4. Spin penalty.
    total += -(abs(state.drone_angular_vel) ** 2) * 0.3

    # 5. Horizontal alignment near the platform.
    # Platform width is 100px = 0.125 normalized, so must be within ±0.0625.
    if state.distance_to_platform < 0.2:
        offset = abs(state.dx_to_platform)
        if not offset < 0.06:
            # Close to the platform but not above it.
            total -= 2.0
        else:
            # Above the platform: bonus grows as the offset shrinks (up to +3.0).
            total += (0.06 - offset) * 50.0
            upright = abs(state.drone_angle) < 0.11
            if upright and state.speed < 0.3:
                # Aligned, upright and slow enough: ready to land.
                total += 5.0

    # 6. Speed control - landing requires speed < 0.3 (normalized).
    if state.distance_to_platform < 0.15 and state.speed > 0.3:
        excess = state.speed - 0.3
        total += -(excess ** 2) * 10.0

    # 7. Strong anti-hovering penalty.
    if state.speed < 0.15:
        total -= 2.5

    # 8. Terminal rewards.
    if state.landed:
        total += 500.0
        total += state.drone_fuel * 100.0
    if state.crashed:
        total -= 30.0

    return total


In [5]:
# Sanity check: reset game 0 and score the state it returns.
state = client.reset(0)
calc_reward(state)

-2.75

### Going from `reward` -> `loss`

There are several approaches to convert RL rewards into a loss function for neural networks:
1. Policy Gradient Methods (REINFORCE, PPO, A3C)
Maximize expected reward by minimizing negative log-likelihood weighted by returns:
```py
loss = -log_prob(action) * reward
# Or with advantage:
loss = -log_prob(action) * advantage
```
2. Q-Learning / DQN
Minimize TD (Temporal Difference) error:
```py
# Predict Q-value for action taken
q_predicted = model(state)[action]

# Target Q-value (Bellman equation)
q_target = reward + gamma * max(model(next_state))

# MSE loss
loss = (q_predicted - q_target) ** 2
```
3. Actor-Critic Methods (A2C, SAC)
Two separate losses:
```py
# Actor loss (policy)
actor_loss = -log_prob(action) * advantage

# Critic loss (value function)
critic_loss = (value_predicted - value_target) ** 2
```

In [6]:
def neg_log_prob_loss(action_probs, action_index, reward, eps=1e-12):
    """
    Computes the negative log-probability loss for policy gradient methods.

    The selected probability is clamped to at least ``eps`` before taking the
    logarithm. Without the clamp a probability of exactly 0 gives ``log(0) =
    -inf``, which turns the loss into ``inf`` (or ``nan`` when the reward is
    0) and poisons the gradients. Probabilities at or above ``eps`` are
    unaffected.

    Args:
        action_probs (torch.Tensor): Tensor of probabilities for each action.
        action_index (int): Index of the action taken.
        reward (float): Scalar reward. Can be replaced by advantage for advantage-based methods.
        eps (float): Lower bound applied to the selected probability before the log.

    Returns:
        torch.Tensor: The computed loss value.
    """
    chosen_prob = torch.clamp(action_probs[action_index], min=eps)
    loss = -torch.log(chosen_prob) * reward
    return loss

In [7]:
def Q_loss(q_predicted, reward, gamma, q_next_state, done):
    """
    Computes the Q-learning loss using TD error.

    The target ``reward + gamma * max(q_next_state)`` is treated as a
    constant: no gradient flows through it. For terminal transitions the
    bootstrap term is dropped, so the target is just ``reward``.

    Args:
        q_predicted (torch.Tensor): Predicted Q-value for the taken action
        reward (float): Immediate reward received
        gamma (float): Discount factor for future rewards
        q_next_state (torch.Tensor): Predicted Q-values for next state
        done (bool): Whether the episode has ended

    Returns:
        torch.Tensor: The computed TD error loss
    """
    # If done, next state value is 0, otherwise it's the max Q-value of next state
    next_value = 0 if done else torch.max(q_next_state)

    # Compute target Q-value using Bellman equation
    q_target = reward + gamma * next_value

    # With done=True and a plain-float reward the target is a Python number,
    # which has no .detach(); only detach when it really is a tensor.
    if isinstance(q_target, torch.Tensor):
        q_target = q_target.detach()

    # Compute squared TD error
    loss = (q_predicted - q_target) ** 2

    return loss

In [8]:
# Fetch the current state of game 0 without stepping it, show every field,
# and score it with the reward function.
state = client.get_state(0)
display(state.__dict__)

calc_reward(state)

{'drone_x': 0.5,
 'drone_y': 0.16666666666666666,
 'drone_vx': 0.0,
 'drone_vy': 0.0,
 'drone_angle': 0.0,
 'drone_angular_vel': 0.0,
 'drone_fuel': 1.0,
 'platform_x': 0.36,
 'platform_y': 0.81,
 'distance_to_platform': 0.502400487658999,
 'dx_to_platform': -0.14,
 'dy_to_platform': 0.6433333333333333,
 'speed': 0.0,
 'landed': False,
 'crashed': False}

-2.75

## Let's Create a Policy Network now

In [9]:
def state_to_array(state):
    """Flatten a DroneState into a 1-D float32 torch tensor of 15 features.

    The feature order is fixed: the thirteen numeric fields listed below,
    followed by the ``landed`` and ``crashed`` flags encoded as 0.0 / 1.0.
    """
    numeric_fields = (
        'drone_x',
        'drone_y',
        'drone_vx',
        'drone_vy',
        'drone_angle',
        'drone_angular_vel',
        'drone_fuel',
        'platform_x',
        'platform_y',
        'distance_to_platform',
        'dx_to_platform',
        'dy_to_platform',
        'speed',
    )
    features = [getattr(state, field) for field in numeric_fields]
    # Boolean flags go last, converted to floats.
    features.extend((float(state.landed), float(state.crashed)))

    return torch.tensor(np.array(features), dtype=torch.float32)

In [10]:
class DroneGamerBoi(nn.Module):
    """MLP policy that maps a drone state to three independent probabilities.

    Architecture: three hidden stages of ``Linear -> LayerNorm -> ReLU`` with
    widths 128, 128 and 64, followed by a 3-unit linear head squashed by a
    sigmoid. Each output therefore lies in (0, 1).
    """

    def __init__(self, state_dim=15):
        super().__init__()

        # Build the stack stage by stage; the resulting module order (and so
        # the state_dict keys) is Linear, LayerNorm, ReLU per hidden stage.
        layers = []
        in_features = state_dim
        for width in (128, 128, 64):
            layers.append(nn.Linear(in_features, width))
            layers.append(nn.LayerNorm(width))
            layers.append(nn.ReLU())
            in_features = width

        # Output head: one sigmoid-activated unit per action component.
        layers.append(nn.Linear(in_features, 3))
        layers.append(nn.Sigmoid())

        self.network = nn.Sequential(*layers)

    def forward(self, state):
        """Run the network on a tensor, converting a DroneState first if given one."""
        features = state_to_array(state) if isinstance(state, DroneState) else state
        return self.network(features)

## How do I train my policy:

1. simple approach - Online Learning (Naive):
```py
state = Reset_game()
for _ in range(training_steps):
    probs = policy(state)
    action = sample_from(probs)
    state = game_update(action)
    reward = calc_reward(state)
    loss = loss_fn(reward)
    gradient_step(policy, loss)
```
**Problems**:
- Too much variance
- Our policy will learn to do erratic movements

(Not going to implement this BS)

2. Episodes - (Less, but still, naive):
**Core Idea**: _Take one episode, i.e, let the policy sample till the episode ends, which means either the drone crashed or landed._

```py
for _ in range(num_training_episode):
    # Collect full episode
    states, actions, rewards = [], [], []
    state = env.reset()
    while not done:
        probs = policy(state)
        action = sample_from(probs)
        state = game_update(action)

        reward = calc_reward(state)
        
        states.append(state)
        actions.append(action)
        rewards.append(reward)
        
    total_loss = sum([loss_fn(reward) for reward in rewards])
    gradient_step(policy, total_loss)
```

-> This is way better than the previous one, because we will at least optimize for winning (or, negatively, for losing) the game

**Problems**: _Still high variance: the policy may learn how to win, but it may also reinforce erratic behaviour._

---

My point is that there are many ways to do this!

#### Policy Gradient with Baseline

**Core Idea**:

1. Collect multiple episodes
2. Calculate the mean of all the returns (called _Baseline_)
3. Calculate the advantage of each episode (i.e., subtract the baseline from each episode's return)
4. Compute Loss of the actions weighted against the advantage.
5. Gradient descent

In [11]:
import time

from tqdm.notebook import trange, tqdm

In [None]:
# Fresh, randomly initialised policy network (default 15 input features).
policy = DroneGamerBoi() # initialize our policy
# AdamW over all policy parameters with a learning rate of 1e-3.
optimizer = torch.optim.AdamW(policy.parameters(), lr=1e-3)

In [12]:
# training configurations
# Number of policy updates; also the length of the max-steps schedule below.
num_iterations = 1000
# One episode is collected per game instance exposed by the server.
# NOTE(review): only referenced by a commented-out progress bar in the training cell.
num_episodes = client.num_games

# Discount factor passed to compute_returns.
bellman_gamma = 0.99

In [13]:
def collect_episodes(client: DroneGameClient, policy: DroneGamerBoi, max_steps=300):
    """
    Roll out one episode per game instance, batching the policy forward pass.

    All games are reset, then stepped in lock-step. Each round the states of
    the still-running games are stacked into one batch, the policy produces
    per-component probabilities, and a Bernoulli sample per component is the
    action. A game stops when the server reports it done or when it reaches
    ``max_steps``; a timeout without landing costs an extra 75 on the final
    reward.

    Args:
        client: Game client providing ``num_games``, ``reset`` and ``step``.
        policy: Callable mapping a (batch, features) tensor to a (batch, 3)
            tensor of probabilities.
        max_steps: Maximum steps per episode (default: 300)

    Returns:
        One ``(states, actions, log_probs, rewards)`` tuple per game, each
        element being a list with one entry per step taken.
    """
    game_count = client.num_games

    # Per-game trajectory buffers plus a flag marking finished games.
    trajectories = [
        {'states': [], 'actions': [], 'log_probs': [], 'rewards': [], 'done': False}
        for _ in range(game_count)
    ]

    # Reset every game and start counting its steps from zero.
    current_states = [client.reset(gid) for gid in range(game_count)]
    steps_taken = [0 for _ in range(game_count)]

    while True:
        # Only games that have not finished take part in this round.
        running = [gid for gid in range(game_count) if not trajectories[gid]['done']]
        if not running:
            break

        observations = [state_to_array(current_states[gid]) for gid in running]

        # One forward pass for all running games.
        probs = policy(torch.stack(observations))
        dist = Bernoulli(probs=probs)
        sampled = dist.sample()
        # Joint log-probability of the action = sum over its components.
        joint_log_probs = dist.log_prob(sampled).sum(dim=1)

        for row, gid in enumerate(running):
            record = trajectories[gid]
            action = sampled[row]

            # The reward and info returned by the server are ignored; the
            # reward is recomputed locally from the resulting state.
            next_state, _, finished, _ = client.step({
                "main_thrust": int(action[0]),
                "left_thrust": int(action[1]),
                "right_thrust": int(action[2])
            }, gid)

            record['states'].append(observations[row])
            record['actions'].append(action)
            record['log_probs'].append(joint_log_probs[row])
            record['rewards'].append(calc_reward(next_state))

            current_states[gid] = next_state
            steps_taken[gid] += 1

            timed_out = steps_taken[gid] >= max_steps
            if finished or timed_out:
                # Timeout penalty applies only when the step cap was hit without landing.
                if timed_out and not next_state.landed:
                    record['rewards'][-1] -= 75
                record['done'] = True

    return [
        (record['states'], record['actions'], record['log_probs'], record['rewards'])
        for record in trajectories
    ]

In [14]:
def compute_returns(rewards, gamma=0.99):
    """
    Compute discounted returns (G_t) for each timestep.

    G_t = r_t + γ*r_{t+1} + γ²*r_{t+2} + ...

    The returns are accumulated in a single backward pass using the
    recurrence G_t = r_t + γ*G_{t+1}.

    Args:
        rewards: Sequence of per-step rewards, in time order.
        gamma: Discount factor.

    Returns:
        list: Discounted return for every timestep, same length and order as
        ``rewards``. Empty input gives an empty list.
    """
    returns = []
    G = 0

    # Walk backwards, appending as we go, then flip once at the end.
    # (Inserting at the front on every step would make the loop quadratic.)
    for r in reversed(rewards):
        G = r + gamma * G
        returns.append(G)
    returns.reverse()

    return returns

In [15]:
# Schedule for the per-episode step cap used by the training loop:
# cap(x) = start + (end - start) * x**steepness, with x running from 0 to 1
# over the training iterations. An exponent below 1 makes the cap rise fastest
# early on and flatten out towards `end`.
steepness = 0.65
start = 75
end = 250

# One schedule entry per training iteration, rounded to whole steps.
x = np.linspace(0, 1, num=num_iterations)
step_schedule = np.round(start + (end - start) * x**steepness).astype(np.int32)

In [None]:
# training loop
# Each iteration: roll out one episode per game with the current policy, turn
# the rewards into discounted returns, and take a single policy-gradient step
# over all collected timesteps.
tqdm_iterations = trange(num_iterations, desc='', total=num_iterations)
# tqdm_episodes = trange(num_episodes, desc=f'Episode (0/{num_episodes})', total=num_episodes)

# NOTE(review): assigned but not used anywhere in this cell.
test_round_at = 5

for iteration in tqdm_iterations:
    # Collect from all games once (batched)
    # The step cap for this iteration comes from the precomputed schedule.
    max_steps = step_schedule[iteration]
    
    episodes = collect_episodes(client, policy, max_steps=max_steps)
    
    # Flattened over every timestep of every episode in this iteration.
    batch_log_probs = []
    batch_returns = []
    # NOTE(review): total_reward is accumulated but never reported below.
    total_reward = 0
    episode_lengths = []  # Track this!
    num_successes = 0  # Track this!
    
    # Process all episodes
    for states, actions, log_probs, rewards in episodes:
        returns = compute_returns(rewards, gamma=bellman_gamma)
        batch_log_probs.extend(log_probs)
        batch_returns.extend(returns)
        total_reward += sum(rewards)
        episode_lengths.append(len(rewards))
        # Check if episode succeeded
        # NOTE(review): heuristic — assumes only a landing makes the final
        # step's reward positive; the landed flag itself is not available here.
        if rewards[-1] > 0:  # Last reward was positive (landed)
            num_successes += 1
    
    # Train
    # Baseline = mean return over all timesteps; advantages are then standardised.
    returns_tensor = torch.tensor(batch_returns, dtype=torch.float32)
    baseline = returns_tensor.mean()
    advantages = (returns_tensor - baseline)
    # The mean subtraction here is redundant up to rounding (the baseline is
    # already the mean); the division rescales to unit standard deviation.
    # NOTE(review): std() of a single element is NaN — TODO confirm a batch can
    # never contain just one timestep.
    advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
    
    # REINFORCE objective: raise the log-probability of actions with positive advantage.
    log_probs_tensor = torch.stack(batch_log_probs)
    loss = -(log_probs_tensor * advantages).mean()
    
    optimizer.zero_grad()
    loss.backward()
    # Clip the global gradient norm to 0.5 before the update.
    torch.nn.utils.clip_grad_norm_(policy.parameters(), max_norm=0.5)
    optimizer.step()
    
    tqdm_iterations.set_description(
        f'Success: {num_successes}/{len(episodes)} | '  # Success rate!
        f'Baseline: {baseline.item():.1f} | '
        f'Reward Std: {returns_tensor.std():.1f} | '  # Variance in returns
        f'Avg Len: {sum(episode_lengths)/len(episode_lengths):.1f} | '
        f'Loss: {loss.item():.4f} | '
        f'Max Steps: {max_steps}'
    )

In [None]:
# torch.save does not create missing parent directories, so make sure the
# target folder exists before writing either file.
os.makedirs('models', exist_ok=True)

# Save the model's state dictionary
torch.save(policy.state_dict(), 'models/drone_policy.pth')

# Optionally, save the entire model (includes architecture)
torch.save(policy, 'models/drone_policy_full.pt')

In [None]:
# Option 1: Load state dict into a new model instance
# NOTE(review): the save cell writes to 'models/drone_policy.pth' but this cell
# reads from 'delivery_drone/models/drone_policy.pth'. Unless the file is moved
# or the working directory differs between the two cells, these paths do not
# match — confirm which location is intended.
policy_from_state = DroneGamerBoi()
policy_from_state.load_state_dict(torch.load('delivery_drone/models/drone_policy.pth'))
policy_from_state.eval()  # Set to evaluation mode

# # Option 2: Load complete model
# policy_from_file = torch.load('drone_policy_full.pt')
# policy_from_file.eval()  # Set to evaluation mode

# # Use policy_from_file or policy_from_state as your loaded model
# policy = policy_from_file  # Choose which loaded version to use

FileNotFoundError: [Errno 2] No such file or directory: 'models/drone_policy.pth'

In [None]:
# Play five episodes with the loaded policy, keeping the trajectory of game 0
# from the last one. Nothing is optimised here, so the rollouts run under
# torch.no_grad() to avoid building an autograd graph for every step.
with torch.no_grad():
    for _ in range(5):
        states, actions, log_probs, rewards = collect_episodes(client, policy_from_state, max_steps=300)[0]

In [None]:
# Inspect where game 0 ended up after the evaluation rollouts and score that state.
state = client.get_state(0)
display(state.__dict__)
calc_reward(state)