In [33]:
import rasterio
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import colormaps
from scipy.interpolate import griddata
from mpl_toolkits.mplot3d import Axes3D
from scipy.ndimage import gaussian_filter
import matplotlib
matplotlib.use('qt5agg')  # Or 'tkagg'

In [35]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data_utils

In [37]:
if torch.cuda.is_available():
    device = torch.device("cuda")
    print(f"Using GPU: {torch.cuda.get_device_name(0)}")
else:
    device = torch.device("cpu")
    print("Using CPU")

Using GPU: NVIDIA GeForce RTX 4050 Laptop GPU


In [39]:
# Constants
k = 1.0  # Attractive potential scaling constant
m = 1.0  # Repulsive potential scaling constant
rho_0 = 10.0  # Obstacle influence radius
epsilon = 8.8e-12  # Small constant to prevent division by zero
step_size = 5.0  # Movement step size for the UAV
max_steps = 500  # Maximum number of steps

DEBUG_LIMIT = 5  # Limit the number of debug prints

# Start and Target positions
uav_x, uav_y, uav_z = 50, 400, 750
endpoint_x, endpoint_y, endpoint_z = 350, 1, 2100
UAV_position_initial = ([uav_x, uav_y, uav_z]) 
UAV_position = np.array([uav_x, uav_y, uav_z])  # Start position
goal_position = np.array([endpoint_x, endpoint_y, endpoint_z])  # Goal position
obstacle_positions = [(100.0, 200.0, 1000.0), (280.0, 70.0, 1500.0)]
no_fly_zones = [
    {"x_min": 100, "x_max": 150, "y_min": 200, "y_max": 250, "z_min": 900, "z_max": 1100},
    {"x_min": 250, "x_max": 300, "y_min": 100, "y_max": 150, "z_min": 1300, "z_max": 1600},
]

# UAV Dynamics Class
class UAVDynamics:
    def __init__(self, weight, sref, thrust_available):
        self.weight = weight  # UAV weight in lb
        self.sref = sref      # Reference wing area in ft²
        self.thrust_available = thrust_available  # Available thrust in lb
        self.cd = 0.05  # Drag coefficient (assumed constant)

    def compute_climb_rate(self, rho, velocity):
        q = 0.5 * rho * velocity**2  # Dynamic pressure
        drag = q * self.sref * self.cd  # Drag force
        climb_rate = (self.thrust_available - drag) / self.weight  # ft/s
        return climb_rate

# Parameters
weight = 43  # UAV weight in lb
sref = 5.67  # Reference wing area in ft²
thrust_available = 113  # Maximum thrust available in lb
rho = 0.0023769  # Air density at sea level in slugs/ft³
velocity = 38.2  # Airspeed in ft/s
safety_buffer = 100  # 10 ft above terrain as a safety margin

# Create UAV Dynamics Instance
uav = UAVDynamics(weight, sref, thrust_available)
climb_rate = uav.compute_climb_rate(rho, velocity)
print(f"Maximum Climb Rate: {climb_rate:.2f} ft/s")

################# initilize q(s,a) ##########################
actions = {
    0: (0, 0, 1),   # Up
    1: (0, 0, -1),  # Down
    2: (-1, 0, 0),  # Left
    3: (1, 0, 0),   # Right
    4: (0, 1, 0),   # Forward
    5: (0, -1, 0),  # Backward
    6: (0, 1, 1),   # Diagonal Up-Forward
    7: (0, -1, 1),  # Diagonal Up-Backward
    8: (-1, 0, 1),  # Diagonal Up-Left
    9: (1, 0, 1),  # Diagonal Up-Right
    10: (0, 1, -1), # Diagonal Down-Forward
    11: (0, -1, -1),# Diagonal Down-Backward
    12: (-1, 0, -1),# Diagonal Down-Left
    13: (1, 0, -1), # Diagonal Down-Right
    14: (-1, 1, 0), # Diagonal Left-Forward
    15: (-1, -1, 0),# Diagonal Left-Backward
    16: (1, 1, 0),  # Diagonal Right-Forward
    17: (1, -1, 0)  # Diagonal Right-Backward
}

# Define state and action space
num_actions = 18  # Total number of actions
initial_probabilities = {action: 1.0 / num_actions for action in actions}  # Uniform distribution

states = [(x, y, z) for x in range(50) for y in range(50) for z in range(10)]  # Example 3D grid

# Initialize Q-table
Q = {}

# Set initial values for all states and actions
initial_value = 0.0
for s in states:
    Q[s] = {a: initial_value for a in actions}
    
# Define terminal state
terminal_state = (endpoint_x, endpoint_y, endpoint_z)  # Example terminal state

# Set Q(terminal-state, a) = 0 for all actions
Q[terminal_state] = {a: 0.0 for a in actions}

Maximum Climb Rate: 2.62 ft/s


In [41]:
def get_terrain_elevation(x, y):
    x_idx = np.clip(int(x), 0, smoothed_elevation.shape[1] - 1)
    y_idx = np.clip(int(y), 0, smoothed_elevation.shape[0] - 1)
    return smoothed_elevation[y_idx, x_idx]

def is_in_no_fly_zone(position, no_fly_zones):
    for zone in no_fly_zones:
        if (zone["x_min"] <= position[0] <= zone["x_max"] and
            zone["y_min"] <= position[1] <= zone["y_max"] and
            zone["z_min"] <= position[2] <= zone["z_max"]):
            return True
    return False

# Function to compute the attractive potential
def compute_attractive_potential(position, goal):
    d_goal = np.linalg.norm(position - goal) + epsilon
    return 0.5 * k * d_goal**2

# Function to compute the attractive force
def compute_attractive_force(position, goal):
    return k * (goal - position)

# Function to compute the repulsive potential
def compute_repulsive_potential(position, obstacles):
    U_rep = 0.0
    for obs in obstacles:
        d_barrier = np.linalg.norm(position - obs)
        if d_barrier <= rho_0:
            d_barrier = max(d_barrier, epsilon)  # Avoid division by zero
            U_rep += 0.5 * m * (1 / d_barrier - 1 / rho_0)**2
    return U_rep

# Function to compute the repulsive force
def compute_repulsive_force(position, obstacles):
    F_rep = np.array([0.0, 0.0, 0.0])
    for obs in obstacles:
        d_barrier = np.linalg.norm(position - obs)
        if d_barrier <= rho_0:
            d_barrier = max(d_barrier, epsilon)  # Avoid division by zero
            F_rep += m * (1 / d_barrier - 1 / rho_0) * (1 / d_barrier**2) * (position - obs) / d_barrier
    return F_rep

# Function to compute individual repulsive forces
def compute_repulsive_force_separate(position, obstacles):
    F_rep_all = []
    for i, obs in enumerate(obstacles):
        d_barrier = np.linalg.norm(position - obs)
        F_rep = np.array([0.0, 0.0, 0.0])
        if d_barrier <= rho_0:
            d_barrier = max(d_barrier, epsilon)
            F_rep = m * (1 / d_barrier - 1 / rho_0) * (1 / d_barrier**2) * (position - obs) / d_barrier
        F_rep_all.append(np.linalg.norm(F_rep))
        # Correctly indented debug statement
#        print(f"Obstacle {i+1}: Distance = {d_barrier:.2f}, Force = {np.linalg.norm(F_rep):.2e}")
    return F_rep_all

# Function to calculate reward
def calculate_reward2(d_goal, d_barrier, d_max, D_cz, D_mz):
    reward = 0.0
    if d_goal >= D_cz and d_barrier >= D_cz:
        R_a = np.tanh((d_max - d_goal) / d_max) * d_goal
        reward += R_a
    elif D_mz <= d_barrier <= D_cz or D_mz <= d_goal <= D_cz:
        R_cz_goal = np.tanh((D_cz - d_goal) / D_cz) * d_goal
        R_cz_obstacle = np.tanh((D_cz - d_barrier) / D_cz) * d_barrier
        reward += R_cz_goal + R_cz_obstacle
    if d_barrier < D_mz:
        R_mz = np.tanh((D_mz - d_barrier) / D_mz) * d_barrier
        reward += R_mz
    return reward

# Function to update the UAV position
def update_position(position, force, step_size):
    return position + step_size * force / np.linalg.norm(force)

# Function to check if the UAV has reached the goal
def reached_goal(position, goal, tolerance=10.0):
    return np.linalg.norm(position - goal) <= tolerance

def predict_future_positions(position, force, step_size, steps_ahead=2):
    future_positions = []
    current_position = position.copy()
    normalized_force = force / np.linalg.norm(force)
    for _ in range(steps_ahead):
        current_position = current_position + step_size * normalized_force
        future_positions.append(current_position.copy())
    return future_positions

def get_terrain_elevations_for_future_positions(future_positions, raster_data):
    elevations = []
    for pos in future_positions:
        terrain_z = get_terrain_elevation(pos[0], pos[1])
        elevations.append(terrain_z)
    return elevations

def calculate_reward(d_goal_t, d_barrier_t, d_max, D_cz, D_mz, X_t, X_g, a_t, obstacles, 
                     Ra=1.0, Rcz=1.0, Rmz=1.0, critical_distance=10, obstacle_penalty=2.0, goal_weight=2.0):
    X_t_plus1 = X_t + a_t
    d_goal_t_plus1 = np.linalg.norm(X_t_plus1 - X_g)
    d_barrier_t_plus1 = min(np.linalg.norm(X_t_plus1 - obs) for obs in obstacles)
    
    Δdistance_to_goal = d_goal_t - d_goal_t_plus1
    Δdistance_to_obstacle = d_barrier_t_plus1 - d_barrier_t

    direction_goal = Δdistance_to_goal / abs(Δdistance_to_goal) if Δdistance_to_goal != 0 else 0
    direction_barrier = Δdistance_to_obstacle / abs(Δdistance_to_obstacle) if Δdistance_to_obstacle != 0 else 0

    reward = 0.0
    if Δdistance_to_goal < 0:  # Moving farther from the goal
        reward -= 10  # Strong penalty for moving away
    else:  # Moving closer to the goal
        reward += goal_weight * Δdistance_to_goal
    if d_barrier_t < critical_distance:  # Within critical obstacle zone
        penalty = obstacle_penalty * (critical_distance - d_barrier_t) / critical_distance
        reward -= penalty
    if d_goal_t >= D_cz and d_barrier_t >= D_cz:
        R_a = np.tanh((d_max - d_goal_t) / d_max) * direction_goal
        reward += Ra * R_a
    if D_mz <= d_barrier_t <= D_cz or D_mz <= d_goal_t <= D_cz:
        R_cz_goal = np.tanh((D_cz - d_goal_t) / D_cz) * direction_goal
        R_cz_obstacle = np.tanh((D_cz - d_barrier_t) / D_cz) * direction_barrier
        reward += Rcz * (R_cz_goal + R_cz_obstacle)
    if d_barrier_t < D_mz:
        R_mz = -np.tanh((D_mz - d_barrier_t) / D_mz) * direction_barrier
        reward += Rmz * R_mz
    return reward

def select_action_epsilon_greedy(Q, actions, epsilon):
    if np.random.rand() < epsilon:
        return random.choice(actions)  # Random exploration
    else:
        return actions[np.argmax(Q)]  # Exploitation

def select_action_softmax(Q, current_state, actions, tau):
    Q_values = np.array([Q[current_state, action] for action in actions])

    # Prevent overflow in exponential
    Q_clipped = np.clip(Q, -100, 100)  # Clip to avoid overflow
    exp_Q = np.exp(Q_clipped / tau)  # Compute exponentials safely
    probs = exp_Q / np.sum(exp_Q)  # Normalize to get probabilities

    # Check if probabilities contain NaN
    if np.any(np.isnan(probs)):
        print(f"Original Q-values: {Q}")
        print(f"Clipped Q-values: {Q_clipped}")
        print(f"Rescaled Q-values: {Q_rescaled}")
        print(f"Exponential values: {exp_Q}")
        print(f"Probabilities: {probs}")
        raise ValueError("Softmax probabilities contain NaN values")
    action_idx = np.random.choice(range(len(actions)), p=probs)  # Select based on probabilities
    return actions[action_idx], action_idx


def update_action_probabilities(P, Q, epsilon=0.1):
    num_actions = len(Q)
    # Identify indices of actions with the maximum Q-value
    best_action_indices = np.argwhere(Q == np.max(Q)).flatten()
    m = len(best_action_indices)  # Number of optimal actions
    # Start with uniform distribution
    updated_P = np.full(num_actions, epsilon / num_actions)
    # Distribute remaining probability among optimal actions
    remaining_probability = 1.0 - epsilon
    for i in best_action_indices:
        updated_P[i] += remaining_probability / m
    return updated_P

tau = 1.0
# Initialize uniform probabilities based on Eq. (19)
def initialize_uniform_probabilities(actions):
    num_actions = len(actions)  # |A_s| - total actions in the action set
    uniform_prob = 1.0 / num_actions  # Equal probability for each action
    return {action: uniform_prob for action in actions}

In [43]:
class QNetworkWithCNN(nn.Module):
    def __init__(self, state_size, action_size):
        super(QNetworkWithCNN, self).__init__()
        self.conv1 = nn.Conv1d(1, 16, kernel_size=3, stride=1, padding=1)  # Conv1D for 1D input
        self.bn1 = nn.BatchNorm1d(16)
        self.conv2 = nn.Conv1d(16, 32, kernel_size=3, stride=1, padding=1)  # Second Conv1D
        self.bn2 = nn.BatchNorm1d(32)

        # Adaptive pooling
        self.pool = nn.AdaptiveMaxPool1d(1)
        
        # Fully connected layers
        self.fc1 = nn.Linear(32, 64)  # First fully connected layer
        self.fc2 = nn.Linear(64, 128)  # Second fully connected layer
        self.fc3 = nn.Linear(128, action_size)  # Final layer outputs a single value... action_size = 18

        self.dropout = nn.Dropout(p=0.5)

    def forward(self, state):
        x = self.bn1(torch.relu(self.conv1(state)))
        x = self.pool(x)
        x = self.bn2(torch.relu(self.conv2(x)))
        x = self.pool(x).squeeze(-1)  # Flatten the pooled features
        x = torch.relu(self.fc1(x))
        x = self.dropout(x)
        x = torch.relu(self.fc2(x))
        x = self.dropout(x)
        return self.fc3(x)  # Output Q-values for each action

class DuelingDQN(nn.Module):
    def __init__(self, state_size, action_size):
        super(DuelingDQN, self).__init__()
        self.feature = nn.Sequential(
            nn.Linear(state_size, 128),
            nn.ReLU()
        )
        # Value and advantage streams
        self.value = nn.Sequential(
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 1)  # Outputs the value of the state
        )
        self.advantage = nn.Sequential(
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, action_size)  # Outputs the advantages of each action
        )

    def forward(self, state):
        features = self.feature(state)
        value = self.value(features)
        advantage = self.advantage(features)
        q_values = value + advantage - advantage.mean(dim=1, keepdim=True)
        return q_values

import torchvision.models as models
class ResNetDQN(nn.Module):
    def __init__(self, action_size):
        super(ResNetDQN, self).__init__()
        # Use a pretrained ResNet backbone
        self.resnet = models.resnet18(pretrained=True)
        self.resnet.fc = nn.Identity()  # Remove classification layer
        
        # Add a fully connected layer for action-value prediction
        self.fc = nn.Linear(512, action_size)

    def forward(self, state):
        features = self.resnet(state)
        q_values = self.fc(features)
        return q_values


class RNNQNetwork(nn.Module):
    def __init__(self, state_size, action_size):
        super(RNNQNetwork, self).__init__()
        self.rnn = nn.LSTM(input_size=state_size, hidden_size=128, batch_first=True)
        self.fc = nn.Linear(128, action_size)

    def forward(self, state):
        rnn_out, _ = self.rnn(state)
        q_values = self.fc(rnn_out[:, -1, :])  # Use the last time step's output
        return q_values


class ActorCritic(nn.Module):
    def __init__(self, state_size, action_size):
        super(ActorCritic, self).__init__()
        self.shared = nn.Sequential(
            nn.Linear(state_size, 128),
            nn.ReLU()
        )
        self.actor = nn.Sequential(
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, action_size),  # Outputs action probabilities
            nn.Softmax(dim=-1)
        )
        self.critic = nn.Sequential(
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 1)  # Outputs state value
        )

    def forward(self, state):
        shared_out = self.shared(state)
        action_probs = self.actor(shared_out)
        state_value = self.critic(shared_out)
        return action_probs, state_value


from torch.nn import TransformerEncoder, TransformerEncoderLayer
class TransformerQNetwork(nn.Module):
    def __init__(self, state_size, action_size):
        super(TransformerQNetwork, self).__init__()
        encoder_layer = TransformerEncoderLayer(d_model=state_size, nhead=4)
        self.transformer = TransformerEncoder(encoder_layer, num_layers=2)
        self.fc = nn.Linear(state_size, action_size)

    def forward(self, state):
        state = state.permute(1, 0, 2)  # Transformer expects [seq_len, batch_size, features]
        transformer_out = self.transformer(state)
        q_values = self.fc(transformer_out[-1])  # Use the last time step
        return q_values

In [45]:
# Example for one state
state_action_probabilities = initialize_uniform_probabilities(actions)
print("Initial Action Probabilities for the State:", state_action_probabilities)

# Load raster data
raster = rasterio.open(r'C:\Users\joshp\OneDrive\Desktop\CPP Fall 24\eVTOL\python code\evironment data\USGS_san_gab_mtns.tif')
elevation_data = raster.read(1)  # Read the elevation data

# Downsample the data by a factor of 25
downsample_factor = 25
elevation_data_downsampled = elevation_data[::downsample_factor, ::downsample_factor]

# Apply Gaussian smoothing to the downsampled data
smoothed_elevation = gaussian_filter(elevation_data_downsampled, sigma=5)

# Normalize the elevation data to range 0 to 3.5
normalized_elevation = smoothed_elevation / 1000  # Divide by 1000

# Define X and Y coordinates for the downsampled data
x = np.arange(normalized_elevation.shape[1])
y = np.arange(normalized_elevation.shape[0])
x, y = np.meshgrid(x, y)

# Define finer grid for interpolation
x_fine = np.linspace(0, normalized_elevation.shape[1] - 1, normalized_elevation.shape[1] * 2)
y_fine = np.linspace(0, normalized_elevation.shape[0] - 1, normalized_elevation.shape[0] * 2)
x_fine, y_fine = np.meshgrid(x_fine, y_fine)

# Interpolate normalized elevation data
interpolated_elevation = griddata(
    (x.ravel(), y.ravel()), normalized_elevation.ravel(), (x_fine, y_fine), method='cubic'
)

# Navigation loop
trajectory = [UAV_position.copy()]  # Track the UAV's path
F_rep_values = []  # Repulsive forces
distance_to_first_obstacle = []  # Distance to the first obstacle
distance_to_second_obstacle = []  # Distance to the second obstacle
F_rep_individual = [[], []]  # One list per obstacle
steps = 0

# REWARD
# Initialize reward tracking
rewards = []  # List to store rewards over time

# Define constants for reward calculation
D_cz = 1000  # Example value for critical zone threshold
D_mz = 500  # Example value for minimum zone threshold
d_max = np.linalg.norm(UAV_position - goal_position)  # Initial max distance

Initial Action Probabilities for the State: {0: 0.05555555555555555, 1: 0.05555555555555555, 2: 0.05555555555555555, 3: 0.05555555555555555, 4: 0.05555555555555555, 5: 0.05555555555555555, 6: 0.05555555555555555, 7: 0.05555555555555555, 8: 0.05555555555555555, 9: 0.05555555555555555, 10: 0.05555555555555555, 11: 0.05555555555555555, 12: 0.05555555555555555, 13: 0.05555555555555555, 14: 0.05555555555555555, 15: 0.05555555555555555, 16: 0.05555555555555555, 17: 0.05555555555555555}


In [46]:
# Initialize data storage
data = []
for state, actions in Q.items():
    if not isinstance(state, (list, tuple, np.ndarray)):
        raise TypeError(f"State must be a list, tuple, or numpy array, got {type(state)}")
    state = np.array(state)
    for action, q_value in actions.items():
        if not isinstance(action, (int, tuple)):
            raise TypeError(f"Action must be an int or tuple, got {type(action)}")
        if isinstance(action, int):
            action = (action,)  # Convert integer action into tuple
        data.append((state, np.array(action), q_value))
        
print(f"Data size: {len(data)} samples")

Data size: 450018 samples


In [49]:
# Prepare input and target arrays
X = []  # Input features
y = []  # Target Q-values

for state, action, q_value in data:
    state_action = np.concatenate((state, action))  # Combine state and action
    X.append(state_action)
    y.append(q_value)

X = np.array(X)
y = np.array(y)

print(f"Feature matrix X shape: {X.shape}")
print(f"Target vector y shape: {y.shape}")

Feature matrix X shape: (450018, 4)
Target vector y shape: (450018,)


In [51]:
# Convert data to tensors
X_tensor = torch.tensor(X, dtype=torch.float32)
y_tensor = torch.tensor(y, dtype=torch.float32)

# Create DataLoader
dataset = data_utils.TensorDataset(X_tensor, y_tensor)
loader = data_utils.DataLoader(dataset, batch_size=64, shuffle=True)

print(f"DataLoader created with batch size: 64")

DataLoader created with batch size: 64


In [53]:
# Initialize network and optimizer
state_size = len(next(iter(Q.keys())))  # Assume all states have the same structure

first_action = next(iter(Q.values())).keys().__iter__().__next__()
if isinstance(first_action, int):  # Single integer action
    action_size = 1
elif isinstance(first_action, tuple):  # Multi-dimensional tuple action
    action_size = len(first_action)
else:
    raise TypeError(f"Unsupported action type: {type(first_action)}")

q_network = QNetworkWithCNN(state_size + action_size, 1)  # Network for Q(s,a)
optimizer = optim.Adam(q_network.parameters(), lr=0.001)
criterion = nn.MSELoss()

print(f"Initialized Q-network with input size {state_size + action_size}")

Initialized Q-network with input size 4


In [55]:
max_epochs = 10
min_loss_threshold = 8.8e-6
accumulation_steps = 4

# Training loop
print("Starting training...")
# Ensure PyTorch uses the correct device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using GPU: {torch.cuda.get_device_name(0)}")

# Move the model to the device
q_network = QNetworkWithCNN(state_size, action_size).to(device)

# Define the optimizer and loss function
optimizer = optim.Adam(q_network.parameters(), lr=0.001)
criterion = nn.MSELoss()

# Training loop
for epoch in range(max_epochs):
    print(f"\nEpoch {epoch + 1}/{max_epochs}")
    q_network.train()  # Set the model to training mode (important for batch normalization)
    epoch_loss = 0.0  # Track loss for the epoch
    
    for batch_idx, (batch_x, batch_y) in enumerate(loader):
        # Move batch to the device
        batch_x, batch_y = batch_x.to(device), batch_y.to(device)

        # Reshape input to match [batch_size, channels, sequence_length]
        batch_x = batch_x.unsqueeze(1)  # Add channel dimension

        # Forward pass
        predictions = q_network(batch_x)
        predictions = predictions.squeeze(-1)
        loss = criterion(predictions, batch_y) / accumulation_steps

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if (batch_idx + 1) % accumulation_steps == 0 or (batch_idx + 1) == len(loader):
            optimizer.step()
            optimizer.zero_grad()
        
        # Accumulate batch loss
        epoch_loss += loss.item() * accumulation_steps

    # Calculate average epoch loss
    avg_loss = epoch_loss / len(loader)
    print(f"Epoch {epoch + 1} completed. Average Loss: {avg_loss}")

    # Check stopping condition
    if avg_loss < min_loss_threshold:
        print("Stopping early due to low loss threshold.")
        break

print("Training completed.")


Starting training...
Using GPU: NVIDIA GeForce RTX 4050 Laptop GPU

Epoch 1/10
Epoch 1 completed. Average Loss: 9.805987023458077e-05

Epoch 2/10
Epoch 2 completed. Average Loss: 1.013640275107724e-07
Stopping early due to low loss threshold.
Training completed.


In [59]:
q_network.eval()  # Switch to evaluation mode

# Evaluate the model on a small batch of data
sample_states = X_tensor[:5].to(device)  # Move to the same device as the model

# Ensure input has the correct shape
sample_states = sample_states.unsqueeze(1)  # Add channel dimension if required

# Run the forward pass
q_values = q_network(sample_states)
print(f"Q-values for sample states: {q_values}")

# Extract best actions (index of the maximum Q-value for each state)
best_actions = torch.argmax(q_values, dim=1)
print(f"Best actions: {best_actions}")
print(f"Min reward: {torch.min(y_tensor)}, Max reward: {torch.max(y_tensor)}")
print(f"Final training loss: {avg_loss}")

Q-values for sample states: tensor([[-0.0007],
        [-0.0007],
        [-0.0007],
        [-0.0007],
        [-0.0007]], device='cuda:0', grad_fn=<AddmmBackward0>)
Best actions: tensor([0, 0, 0, 0, 0], device='cuda:0')
Min reward: 0.0, Max reward: 0.0
Final training loss: 1.013640275107724e-07


In [1]:
action_mapping = actions 
terrain_z = 0
dt = float(0.1) 

while not reached_goal(UAV_position, goal_position) and steps < max_steps:

    # Step 1: Current state and Q-values
    current_state = UAV_position.copy()
    current_state_tensor = torch.tensor(current_state, dtype=torch.float32).to(device).unsqueeze(0).unsqueeze(1)
    q_values = q_network(current_state_tensor)
    best_action_idx = torch.argmax(q_values, dim=1).item() + 1
    best_action = actions.get(best_action_idx, None)
    
    if best_action is None:
        raise ValueError(f"Invalid action index: {best_action_idx}")

    print(f"Q-values: {q_values}")
    print(f"Selected action index: {best_action_idx}")
    print(f"Selected action: {best_action}")
    print(f"Updated UAV position: {UAV_position}")

    # Step 2: Compute distances
    d_goal = np.linalg.norm(UAV_position - goal_position)
    d_barrier = min([np.linalg.norm(UAV_position - obs) for obs in obstacle_positions])

    d_goal_t = np.linalg.norm(UAV_position - goal_position)
    d_barrier_t = min([np.linalg.norm(UAV_position - obs) for obs in obstacle_positions])

    # Step 3: Compute forces
    F_att = compute_attractive_force(UAV_position, goal_position)
    F_rep = compute_repulsive_force(UAV_position, obstacle_positions)
    F_rep_all = compute_repulsive_force_separate(UAV_position, obstacle_positions)

    # Add noise to encourage exploration
    force_noise = np.random.uniform(-0.1, 0.1, size=F_att.shape)
    total_force = F_att + F_rep + force_noise

    reward = calculate_reward(
    d_goal_t, d_barrier_t, d_max, D_cz, D_mz, 
    UAV_position, goal_position, total_force, obstacle_positions)

    rewards.append(reward)

    # Step 4: Terrain and no-fly zone forces
    terrain_z = float(get_terrain_elevation(UAV_position[0], UAV_position[1]))
    print(f"Terrain elevation: {terrain_z}, type: {type(terrain_z)}")
    if not isinstance(terrain_z, (int, float)):
        raise TypeError(f"Invalid type for terrain_z: {type(terrain_z)}. Expected int or float.")

    F_rep_terrain = np.array([0.0, 0.0, 0.0])
    if UAV_position[2] < terrain_z + safety_buffer:
        F_rep_terrain = np.array([0, 0, 1]) * (terrain_z + safety_buffer - UAV_position[2])

    # No-fly zone repulsion force
    F_rep_no_fly = np.array([0.0, 0.0, 0.0])
    if is_in_no_fly_zone(UAV_position, no_fly_zones):
        F_rep_no_fly = np.array([0, 0, 1]) * 100  # Strong upward repulsion
        reward -= 50  # Penalize no-fly zone entry
    
    F_total = F_att + F_rep + F_rep_terrain + F_rep_no_fly
    
    # Step 5: Update UAV position based on action
    UAV_position = UAV_position.astype(np.float64)
    acceleration = F_total / weight  # F = ma
    velocity += acceleration * dt
    UAV_position += velocity * dt

    # Ensure UAV doesn't collide with terrain
    if UAV_position[2] < terrain_z + safety_buffer:
        UAV_position[2] = terrain_z + safety_buffer  # Correct altitude
        reward -= 50  # Penalize collision with terrain

    # Step 6: Calculate reward
    reward = calculate_reward(
        d_goal, d_barrier, d_max, D_cz, D_mz, 
        UAV_position, goal_position, total_force, obstacle_positions
    )
    rewards.append(reward)

    # Step 7: Record variables for debugging/analysis
    trajectory.append(UAV_position.copy())
    F_rep_values.append(np.linalg.norm(F_rep))
    distance_to_first_obstacle.append(np.linalg.norm(UAV_position - obstacle_positions[0]))
    distance_to_second_obstacle.append(np.linalg.norm(UAV_position - obstacle_positions[1]))

    # Logging
    print(f"Step: {steps}, Position: {UAV_position}, Reward: {reward}")
    print(f"Force components - Attractive: {F_att}, Repulsive: {F_rep}, Terrain: {F_rep_terrain}, No-fly: {F_rep_no_fly}")

    UAV_position += velocity * dt  # Update UAV position
    X_t = UAV_position.copy()      # Update X_t to reflect new position

    
    steps += 1

NameError: name 'actions' is not defined

In [31]:
#torch.cuda.set_per_process_memory_fraction(0.5, 0)

# Define X and Y coordinates for the smoothed elevation grid
x_smooth = np.arange(smoothed_elevation.shape[1])
y_smooth = np.arange(smoothed_elevation.shape[0])
x_smooth, y_smooth = np.meshgrid(x_smooth, y_smooth)

trajectory = np.array(trajectory)
fig = plt.figure(figsize=(12, 8))
ax = fig.add_subplot(111, projection="3d")
# Add No-Fly Zones to the plot
for zone in no_fly_zones:
    x_range = [zone["x_min"], zone["x_max"], zone["x_max"], zone["x_min"], zone["x_min"]]
    y_range = [zone["y_min"], zone["y_min"], zone["y_max"], zone["y_max"], zone["y_min"]]
    z_range = [zone["z_min"]] * 5

    ax.plot(x_range, y_range, z_range, color='purple', label='No-Fly Zone', alpha=0.5)

    z_top_range = [zone["z_max"]] * 5
    ax.plot(x_range, y_range, z_top_range, color='purple', alpha=0.5)

    # Connect top and bottom corners
    for i in range(4):
        ax.plot(
            [x_range[i], x_range[i]],
            [y_range[i], y_range[i]],
            [zone["z_min"], zone["z_max"]],
            color='purple', alpha=0.5
        )
ax.plot_surface(x_smooth, y_smooth, smoothed_elevation,
    cmap='terrain', edgecolor='none', alpha=0.7, label='Terrain')
ax.plot(trajectory[:, 0], trajectory[:, 1], trajectory[:, 2],
    label="UAV Trajectory", color="blue")

# Plot the UAV's trajectory
ax.plot(
    trajectory[:, 0], trajectory[:, 1], trajectory[:, 2],
    label="UAV Trajectory", color="blue"
)

# Add gray 'x' markers every 50 steps
for i in range(0, len(trajectory), 50):
    ax.scatter(
        trajectory[i, 0], trajectory[i, 1], trajectory[i, 2],
        color='gray', s=50, marker='x', label='Step Marker' if i == 0 else None
    )


ax.scatter(uav_x, uav_y, uav_z, color='red', s=100, label='Start Point', marker='o')
ax.scatter(*goal_position, color="blue", s=100, label="Goal", marker='^')

cmap = colormaps.get_cmap("tab10")  # Retrieves the "tab10" colormap
for i, obs in enumerate(obstacle_positions):
    color = cmap(i)  # Get a unique color from the colormap
    ax.scatter(*obs, color=color, label=f"Obstacle {i+1}", s=100)

# Add labels and title
ax.set_xlabel("X Dimension")
ax.set_ylabel("Y Dimension")
ax.set_zlabel("Elevation (km)")
ax.set_title("UAV Trajectory Over Smoothed Terrain")
ax.legend()

# Set viewing angle
ax.view_init(elev=30, azim=80)

# Show the plot
plt.show()

# Plot Repulsive Forces for Each Obstacle
plt.figure(figsize=(10, 6))
for i, obs in enumerate(obstacle_positions):
    plt.plot(range(steps), F_rep_individual[i], label=f"Repulsive Force from Obstacle {i+1}")
plt.xlabel("Steps")
plt.ylabel("Repulsive Force Magnitude")
plt.title("Repulsive Forces from Each Obstacle")
plt.legend()
plt.grid()
plt.show()

# Plot Rewards Over Steps
plt.figure(figsize=(10, 6))
plt.plot(range(len(rewards)), rewards, label="Reward", color="purple")
plt.xlabel("Steps")
plt.ylabel("Reward")
plt.title("Reward vs Steps")
plt.legend()
plt.grid()
plt.show()

ValueError: x and y must have same first dimension, but have shapes (500,) and (0,)