In [1]:
import numpy as np

import torch 
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GATConv
from torch_geometric.data import Data


In [9]:
import torch
import torch.nn.functional as F

# Let's say our traffic_action is 1 (class indices are zero-based, so this
# selects the second of the two classes encoded below).
# NOTE(review): an earlier version of these comments described a 4-state action
# space where action 2 meant "allow vehicular traffic through North-East and
# South-West direction"; the code below encodes only 2 classes -- confirm which
# action space is intended.
traffic_action = torch.tensor([1])

# num_classes is 2, so each action is encoded as a vector of length 2
one_hot_action = F.one_hot(traffic_action, num_classes=2)

print(one_hot_action)
# Output: tensor([[0, 1]])

# If we remove the extra dimension:
print(one_hot_action.squeeze())
# Output: tensor([0, 1])


tensor([[0, 1]])
tensor([0, 1])


### Task: Propose a variable number of tuples using GAT in an RL setup

In [2]:
# https://github.com/pyg-team/pytorch_geometric/blob/master/examples/gat.py
# In the original model, expected input: 
# 1. x: Node feature matrix with shape [num_nodes, in_channels]
    # num_nodes: number of nodes in the graph
    # in_channels: number of node features

# 2. edge_index: Edge indices with shape [2, num_edges]
    # num_edges: number of edges in the graph
    # example: edge_index = torch.tensor([[0, 1, 2, 3], [1, 2, 3, 0]], dtype=torch.long)
    # edge exists from node 0 to node 1, 1 to 2, 2 to 3, and 3 to 0

# Expected output:
# 1. x: Updated node feature matrix with shape [num_nodes, out_channels]
    # num_nodes: number of nodes in the graph
    # out_channels: number of output features

# Note: edge_index is an input only and is NOT part of the output.
    # forward() returns just the updated x; the graph connectivity is left unchanged.

# So the GAT model is learning to create new, more informative representations of each node based on its own features and the features of its neighbors.
# Learn better representations based on the graph structure.
# The core idea about GATs is to update node representations by aggregating information from their neighbors.
# Capture local graph structure and node features.
# The attention in GAT refers to the model's ability to weigh the importance of different neighbors differently. Not all neighbors are equally important.

# With 2 GAT layers, the model can incorporate information from node's 2-hop neighborhood. 

class GAT(torch.nn.Module):
    """Two-layer graph attention network producing one embedding per node.

    The first layer is built with `heads` attention heads; the second layer
    consumes `hidden_channels * heads` features and uses a single head with
    ``concat=False``, so it emits `out_channels` features per node.

    Args:
        in_channels: number of input features per node.
        hidden_channels: per-head output size of the first GAT layer.
        out_channels: number of output features per node.
        heads: number of attention heads in the first GAT layer.
        dropout: dropout probability applied to the layer inputs in
            `forward` and passed to both `GATConv` layers via their
            `dropout` argument. Defaults to 0.6, the value that was
            previously hard-coded.
    """

    def __init__(self, in_channels, hidden_channels, out_channels, heads, dropout=0.6):
        super().__init__()

        # Kept on the instance so forward() uses the same probability as the layers.
        self.dropout = dropout

        self.conv1 = GATConv(in_channels, hidden_channels, heads, dropout=dropout)
        # On the Pubmed dataset, use `heads` output heads in `conv2`.

        # Output layer has one head and does not concatenate head outputs.
        self.conv2 = GATConv(hidden_channels * heads, out_channels, heads=1,
                             concat=False, dropout=dropout)

    def forward(self, x, edge_index):
        """Compute node embeddings.

        Args:
            x: node feature matrix, expected shape [num_nodes, in_channels].
            edge_index: edge indices, expected shape [2, num_edges].

        Returns:
            The output of the second GAT layer (no final activation).
        """
        # F.dropout is a no-op unless the module is in training mode.
        x = F.dropout(x, p=self.dropout, training=self.training)
        x = F.elu(self.conv1(x, edge_index))

        x = F.dropout(x, p=self.dropout, training=self.training)
        x = self.conv2(x, edge_index)

        return x

In [3]:
# Modified GAT for the RL setup.
# Based on the input state, propose a variable number of tuples.

# Input: 
# 1. Node feature matrix with shape [num_nodes, in_channels]
    # num_nodes: number of nodes in the graph
    # in_channels: number of node features

# 2. Edge indices with shape [2, num_edges]
    # num_edges: number of edges in the graph

# Output:
# 1. N Tuples of (locations, thickness)

### Setup below: 

- Model structure: 
    - 2 GAT layers
    - Separate linear layers for predicting location parameters, thickness parameters, and the number of proposals

- Proposal generation:
    - Core to the policy, generate stochastic proposals. 



Using a GAT allows us to naturally take variable sized input.
However, we are still operating under the purview of RL, which asks us to have fixed sized input. 

To resolve that we can use (this is a common practice):
- a maximum number of proposals to define the action space, and then pad the slots that are not proposed.
- a maximum number of nodes, edges to define the state space and pad accordingly.

Make sure that the PPO algorithm is aware of the masking so that it does not try to learn from or act on the padded entries.

Also, this uses GATv2 (`GATv2Conv` from PyTorch Geometric) instead of the original `GATConv`.


In [147]:
import torch
import torch.nn.functional as F
from torch_geometric.nn import GATv2Conv

class EdgeFeatureGATConv(GATv2Conv):
    """GATv2 convolution whose edge features are mandatory.

    A thin wrapper over `GATv2Conv`: `edge_dim` is a required constructor
    argument and `edge_attr` is a required argument of `forward`, so every
    call supplies edge features to the attention layer.
    """

    def __init__(self, in_channels, out_channels, edge_dim, heads=1, **kwargs):
        # Fold the explicit arguments into the keyword options handed to the parent.
        conv_options = dict(kwargs, heads=heads, edge_dim=edge_dim)
        super().__init__(in_channels, out_channels, **conv_options)

    def forward(self, x, edge_index, edge_attr):
        """Run the parent convolution, always forwarding the edge features."""
        attended = super().forward(x, edge_index, edge_attr=edge_attr)
        return attended

class GATCrosswalkPolicy(torch.nn.Module):
    """Stochastic policy that proposes (location, thickness) tuples from a graph.

    Two `EdgeFeatureGATConv` layers turn node and edge features into node
    embeddings. Three linear heads then predict, respectively, the location
    distribution parameters per node, the thickness distribution parameters
    per node, and the logits over how many proposals to emit.

    Args:
        in_channels: number of input features per node.
        edge_dim: number of features per edge.
        hidden_channels: per-head size of the first GAT layer and output size
            of the second.
        heads: number of attention heads in the first GAT layer.
        max_proposals: size of the fixed, padded output (must be >= 1).
        min_thickness: lower bound for sampled thickness values.
        max_thickness: upper bound for sampled thickness values.
        dropout: dropout probability used in `forward` and passed to both GAT
            layers. Defaults to 0.6, the value that was previously hard-coded.

    Raises:
        ValueError: if `max_proposals < 1` or `min_thickness > max_thickness`.
    """

    def __init__(self, in_channels, edge_dim, hidden_channels, heads, max_proposals=10,
                 min_thickness=0.1, max_thickness=10.0, dropout=0.6):
        super().__init__()

        if max_proposals < 1:
            raise ValueError(f"max_proposals must be >= 1, got {max_proposals}")
        if min_thickness > max_thickness:
            raise ValueError(
                f"min_thickness ({min_thickness}) must not exceed max_thickness ({max_thickness})"
            )

        # First Graph Attention layer with edge features
        self.conv1 = EdgeFeatureGATConv(in_channels, hidden_channels, edge_dim=edge_dim,
                                        heads=heads, dropout=dropout)

        # Second Graph Attention layer with edge features (single head, no concatenation)
        self.conv2 = EdgeFeatureGATConv(hidden_channels * heads, hidden_channels, edge_dim=edge_dim,
                                        heads=1, concat=False, dropout=dropout)

        # Linear layer for predicting location parameters (mean and log std)
        self.location_layer = torch.nn.Linear(hidden_channels, 2)

        # Linear layer for predicting thickness parameters (mean and log std)
        self.thickness_layer = torch.nn.Linear(hidden_channels, 2)

        # Linear layer for predicting the number of proposals
        self.num_proposals_layer = torch.nn.Linear(hidden_channels, max_proposals)

        # Store initialization parameters as instance variables
        self.max_proposals = max_proposals
        self.min_thickness = min_thickness
        self.max_thickness = max_thickness
        self.dropout = dropout

    def forward(self, x, edge_index, edge_attr):
        """Return node embeddings from the two GAT layers.

        Dropout is applied before each layer and is only active in training
        mode. No activation follows the second layer.
        """
        x = F.dropout(x, p=self.dropout, training=self.training)
        x = F.elu(self.conv1(x, edge_index, edge_attr))
        x = F.dropout(x, p=self.dropout, training=self.training)
        return self.conv2(x, edge_index, edge_attr)

    @staticmethod
    def _sample_bounded(params, low, high):
        """Sample one value per row from a normal distribution, clamped to [low, high].

        Column 0 of `params` is squashed by a sigmoid and rescaled to
        [low, high] to give the mean; column 1 is the log standard deviation,
        clamped to [-20, 2] for numerical stability.
        """
        means = torch.sigmoid(params[:, 0]) * (high - low) + low
        stds = torch.exp(params[:, 1].clamp(-20, 2))
        return torch.clamp(torch.normal(means, stds), low, high)

    def propose_crosswalks(self, x, edge_index, edge_attr):
        """Sample a padded set of (location, thickness) proposals.

        The number of proposals is sampled from a categorical distribution
        over 1..max_proposals and then capped at the number of nodes, because
        each proposal is read from a distinct, randomly chosen node. Rows that
        hold no proposal are filled with -1.0.

        Note that this calls the module itself, so dropout is active whenever
        the module is in training mode.

        Returns:
            A tuple `(output, num_actual_proposals)` where `output` is a tensor
            of shape [max_proposals, 2] (column 0: location in [0, 1],
            column 1: thickness in [min_thickness, max_thickness]) and
            `num_actual_proposals` is the number of leading rows that are real
            proposals. For a graph with no nodes this is all padding and 0.
        """
        output = torch.full((self.max_proposals, 2), -1.0)

        num_nodes = x.size(0)
        if num_nodes == 0:
            # Nothing to propose from; the mean over zero nodes would be NaN.
            return output, 0

        # Get node embeddings by passing input through GAT layers
        node_embeddings = self(x, edge_index, edge_attr)

        # Sample the number of proposals (add 1 to ensure at least 1 proposal)
        num_proposals_logits = self.num_proposals_layer(node_embeddings.mean(dim=0))
        num_proposals_probs = F.softmax(num_proposals_logits, dim=0)
        sampled_count = torch.multinomial(num_proposals_probs, 1).item() + 1

        # Each proposal comes from a distinct node, so there cannot be more
        # proposals than nodes; without this cap the assignment below fails
        # with a shape mismatch on small graphs.
        num_actual_proposals = min(sampled_count, num_nodes)

        # Sample per-node locations in [0, 1] and thicknesses in the configured range
        locations = self._sample_bounded(self.location_layer(node_embeddings), 0, 1)
        thicknesses = self._sample_bounded(
            self.thickness_layer(node_embeddings), self.min_thickness, self.max_thickness
        )

        # Pick distinct random nodes and copy their samples into the padded output
        indices = torch.randperm(num_nodes)[:num_actual_proposals]
        output[:num_actual_proposals, 0] = locations[indices]
        output[:num_actual_proposals, 1] = thicknesses[indices]

        return output, num_actual_proposals

# Example usage: build a small random graph and a policy to run on it.

# Model hyperparameters
in_channels = 2       # features per node (e.g., x and y coordinates)
edge_dim = 2          # features per edge
hidden_channels = 16  # hidden units in the GAT layers
heads = 4             # attention heads in the first GAT layer

# Size of the demo graph
num_nodes = 10  # nodes in the graph (potential crosswalk locations)
num_edges = 20  # edges in the graph

# Build the policy from the hyperparameters above
model = GATCrosswalkPolicy(
    in_channels,
    edge_dim,
    hidden_channels,
    heads,
    max_proposals=10,
    min_thickness=0.1,
    max_thickness=10.0,
)

# Dummy inputs: uniform random features and uniformly random edge endpoints
x = torch.rand(num_nodes, in_channels)
edge_index = torch.randint(num_nodes, (2, num_edges))
edge_attr = torch.rand(num_edges, edge_dim)

# Summarise the sizes, then dump the tensors themselves
print(f"Number of nodes: {num_nodes}, Number of edges: {num_edges}")
print(f"Number of node features: {in_channels}, Number of edge features: {edge_dim}")

print("Input data: ")
for description in (
    f"Node features: {x}",
    f"Edge indices: {edge_index}",
    f"Edge features: {edge_attr}",
):
    print(description)

Number of nodes: 10, Number of edges: 20
Number of node features: 2, Number of edge features: 2
Input data: 
Node features: tensor([[0.8292, 0.2036],
        [0.7067, 0.1631],
        [0.3717, 0.7307],
        [0.8826, 0.5376],
        [0.3398, 0.3567],
        [0.7639, 0.0156],
        [0.2888, 0.3420],
        [0.1353, 0.2934],
        [0.7088, 0.6415],
        [0.0506, 0.2979]])
Edge indices: tensor([[3, 3, 8, 9, 8, 7, 8, 3, 3, 7, 7, 0, 5, 0, 2, 0, 6, 1, 8, 8],
        [5, 4, 0, 4, 3, 1, 7, 5, 7, 3, 2, 6, 8, 2, 7, 9, 8, 5, 0, 9]])
Edge features: tensor([[0.7337, 0.3222],
        [0.1443, 0.4561],
        [0.4512, 0.8670],
        [0.4517, 0.9976],
        [0.1814, 0.4896],
        [0.5131, 0.4759],
        [0.5964, 0.9999],
        [0.1735, 0.9213],
        [0.7353, 0.6507],
        [0.1911, 0.3615],
        [0.4897, 0.4718],
        [0.9423, 0.9385],
        [0.5673, 0.5662],
        [0.1150, 0.3849],
        [0.4931, 0.0195],
        [0.3387, 0.3835],
        [0.2155, 0.4800],
   

In [None]:

# Sample one padded set of crosswalk proposals from the policy
proposed_crosswalks, num_actual_proposals = model.propose_crosswalks(
    x, edge_index, edge_attr
)

# Report every row, labelling rows past the actual count as padding
print("Proposed crosswalks:")
for i, (location, thickness) in enumerate(proposed_crosswalks):
    is_real = i < num_actual_proposals
    line = (
        f"Location: {location:.4f}, Thickness: {thickness:.2f}"
        if is_real
        else f"Padded proposal: {location:.4f}, {thickness:.2f}"
    )
    print(line)
print(f"Number of actual proposals: {num_actual_proposals}")
print(f"Total number of proposals (including padding): {len(proposed_crosswalks)}")