In [None]:
!pip install osmnx


In [None]:
!pip install gym

In [None]:
!pip install tensorflow

In [None]:
!pip install keras

In [3]:
import gym
from gym import spaces
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import random
from collections import deque
import time
from typing import List, Dict, Any, Tuple, Optional

# Ensure TensorFlow doesn't consume all GPU memory
physical_devices = tf.config.list_physical_devices('GPU')
if physical_devices:
    tf.config.experimental.set_memory_growth(physical_devices[0], True)


In [4]:
def distance_between(graph, u, v):
    """Compute the length of the edge between u and v."""
    edge_data = graph.get_edge_data(u, v)
    if edge_data and 'length' in edge_data:
        return edge_data['length']
    return float('inf')

#GraphEnv

In [5]:

class GraphEnv(gym.Env):
    def __init__(self, graph: nx.Graph, max_steps: int = 100,
                 battery_start: float = 100.0, battery_max: float = 100.0,
                 efficiency: float = 1.0, charging_nodes=[]):
        super(GraphEnv, self).__init__()
        self.start_node = None
        self.goal_node = None
        self.graph = graph
        self.n_nodes = self.graph.number_of_nodes()
        self.max_degree = max(dict(graph.degree()).values())
        self.max_steps = max_steps
        self.battery_start = battery_start
        self.battery_max = battery_max
        self.efficiency = efficiency
        self.step_count = 0

        # Charging nodes (binary array)
        self.charging_nodes = np.zeros(self.n_nodes, dtype=np.int8)
        for node in charging_nodes:
            if 0 <= node < self.n_nodes:
                self.charging_nodes[node] = 1

        self.max_visited = self.n_nodes

        # Observation space
        self.observation_space = spaces.Dict({
            "current_node": spaces.Discrete(self.n_nodes),
            "goal_node": spaces.Discrete(self.n_nodes),
            "battery_level": spaces.Box(low=0.0, high=self.battery_max, shape=(), dtype=np.float32),
            "neighbor_ids": spaces.MultiDiscrete([self.n_nodes] * self.max_degree),
            "neighbor_distances": spaces.Box(low=0.0, high=np.inf, shape=(self.max_degree,), dtype=np.float32),
            "neighbor_is_charging": spaces.MultiBinary(self.max_degree),
            "visited_nodes": spaces.MultiBinary(self.n_nodes),
            "neighbor_mask": spaces.MultiBinary(self.max_degree)
        })

        self.action_space = spaces.Discrete(self.max_degree)

        self.reset()

        # For visualization
        self.node_positions = None

    def _get_reachable_neighbors(self, node):
        neighbors = list(self.graph.neighbors(node))
        neighbor_ids = [self.current_node] * self.max_degree
        neighbor_dists = [0.0] * self.max_degree
        neighbor_mask = [0] * self.max_degree
        neighbor_is_charging = [0] * self.max_degree

        # Debugging prints
        #print(f"Current node: {node}, Battery: {self.battery_level}")
        #print(f"Neighbors: {neighbors}")

        valid_idx = 0
        for neighbor in neighbors:
            dist = distance_between(self.graph, self.current_node, neighbor)
            consumption = dist * self.efficiency
            #print(f"  Neighbor: {neighbor}, Distance: {dist}, Consumption: {consumption}, Reachable: {self.battery_level - consumption >= 0}")
            if self.battery_level - dist * self.efficiency >= 0:
                neighbor_ids[valid_idx] = neighbor
                neighbor_dists[valid_idx] = dist
                neighbor_mask[valid_idx] = 1
                neighbor_is_charging[valid_idx] = self.charging_nodes[neighbor]
                valid_idx += 1

        #print(f"Valid neighbors: {[neighbor_ids[i] for i in range(valid_idx)]}")
        return neighbor_ids, neighbor_dists, neighbor_mask, neighbor_is_charging

    def _get_obs(self):
        neighbor_ids, neighbor_dists, neighbor_mask, neighbor_is_charging = self._get_reachable_neighbors(self.current_node)
        return {
            "current_node": self.current_node,
            "goal_node": self.goal_node,
            "battery_level": np.array(self.battery_level, dtype=np.float32),
            "neighbor_ids": np.array(neighbor_ids, dtype=np.int32),
            "neighbor_distances": np.array(neighbor_dists, dtype=np.float32),
            "neighbor_is_charging": np.array(neighbor_is_charging, dtype=np.int8),
            "visited_nodes": self.visited_nodes,
            "neighbor_mask": np.array(neighbor_mask, dtype=np.int8)
        }

    def _visit_current_node(self):
        self.visited_nodes[self.current_node] = 1
        self.step_count += 1

    def reset(self, start_node=None, goal_node=None):
        if start_node is not None:
            self.start_node = start_node
        if goal_node is not None:
            self.goal_node = goal_node

        # Ensure start node has valid actions
        max_attempts = 10
        for _ in range(max_attempts):
            self.current_node = self.start_node if self.start_node is not None else np.random.randint(self.n_nodes)
            self.goal_node = self.goal_node if self.goal_node is not None else np.random.randint(self.n_nodes)
            while self.goal_node == self.current_node:
                self.goal_node = np.random.randint(self.n_nodes)

            self.step_count = 0
            self.battery_level = self.battery_start
            self.visited_nodes = np.zeros(self.n_nodes, dtype=np.int8)
            self._visit_current_node()

            # Check if there are valid actions
            neighbor_ids, neighbor_dists, neighbor_mask, _ = self._get_reachable_neighbors(self.current_node)
            valid_actions = [i for i, mask in enumerate(neighbor_mask) if mask == 1]
            if valid_actions:
                break
        else:
            print("Warning: Could not find a start node with valid actions after max attempts")
            self.current_node = self.start_node if self.start_node is not None else np.random.randint(self.n_nodes)
            self.goal_node = self.goal_node if self.goal_node is not None else np.random.randint(self.n_nodes)
            while self.goal_node == self.current_node:
                self.goal_node = np.random.randint(self.n_nodes)
            self.step_count = 0
            self.battery_level = self.battery_start
            self.visited_nodes = np.zeros(self.n_nodes, dtype=np.int8)
            self._visit_current_node()

        return self._get_obs()

    def step(self, action):
        obs = self._get_obs()

        neighbor_ids = obs["neighbor_ids"]
        neighbor_dists = obs["neighbor_distances"]
        neighbor_mask = obs["neighbor_mask"]
        neighbor_is_charging = obs["neighbor_is_charging"]

        done = False
        info = {}
        reward = 0

        # Validate the action
        if (
            action < 0 or
            action >= self.max_degree or
            neighbor_mask[action] == 0 or
            neighbor_ids[action] == self.current_node ):
            reward = -1000
            info["reason"] = "invalid_action"
            return obs, reward, done, info

        next_node = neighbor_ids[action]
        dist = neighbor_dists[action]

        if ((self.battery_level - dist * self.efficiency < 0) or
            ((self.charging_nodes[neighbor_ids[action]] == 0 or
             neighbor_ids[action] != self.goal_node ) and
            (self.battery_level ==  dist * self.efficiency))) :
            reward = -10000
            info["reason"] = "battery_insufficient_for_move"
            return obs, reward, done, info

        # Battery cost
        self.battery_level -= dist * self.efficiency

        # Move to next node
        self.current_node = next_node

        # Base reward (movement cost)
        reward = -60

        # Penalize if revisiting a node
        if self.visited_nodes[self.current_node] == 1:
            reward -= 100
            info["revisit_penalty"] = True

        # Recharge if on a charging node
        if self.charging_nodes[self.current_node]:
            self.battery_level = self.battery_max
            reward += 10

        self._visit_current_node()

        # Check terminal conditions
        if self.current_node == self.goal_node:
            reward += 1000
            done = True
            info["reason"] = "goal_reached"

        elif self.step_count >= self.max_steps:
            reward -= 60
            done = True
            info["reason"] = "max_steps_reached"

        elif self.battery_level <= 0:
            reward -= 1000
            done = False
            info["reason"] = "battery_depleted"

        new_obs = self._get_obs()
        if self.step_count == self.max_steps:
            done = True
        return new_obs, reward, done, info
    def render(self, mode: str = 'human') -> None:
        if self.node_positions is None:
            try:
                self.node_positions = {node: (data['x'], data['y'])
                                     for node, data in self.graph.nodes(data=True)}
            except KeyError:
                self.node_positions = nx.spring_layout(self.graph)

        plt.figure(figsize=(12, 10))
        nx.draw_networkx_edges(self.graph, self.node_positions, alpha=0.3, edge_color='gray')
        charging_nodes_list = [node for node in range(len(self.charging_nodes)) if self.charging_nodes[node] == 1]
        nx.draw_networkx_nodes(self.graph, self.node_positions, nodelist=charging_nodes_list,
                              node_color='green', node_size=100, label='Charging Station')
        current_node = self.current_node
        nx.draw_networkx_nodes(self.graph, self.node_positions, nodelist=[current_node],
                              node_color='blue', node_size=200, label='Current Position')
        goal_node = self.goal_node
        nx.draw_networkx_nodes(self.graph, self.node_positions, nodelist=[goal_node],
                              node_color='red', node_size=200, label='Goal')
        visited_nodes = [node for node in range(len(self.visited_nodes))
                        if self.visited_nodes[node] == 1 and node != current_node]
        nx.draw_networkx_nodes(self.graph, self.node_positions, nodelist=visited_nodes,
                              node_color='lightblue', node_size=50, label='Visited')
        plt.title(f"Step: {self.step_count}, Battery: {self.battery_level:.1f}")
        plt.legend()
        plt.tight_layout()
        plt.show()


# DQANAgent

In [6]:
class DQNAgent:
    def __init__(self, state_size, action_size, graph_size, max_battery):
        self.state_size = state_size
        self.action_size = action_size
        self.graph_size = graph_size
        self.max_battery = max_battery
        self.gamma = 0.9
        self.epsilon = 1.0
        self.epsilon_min = 0.1
        self.epsilon_decay = 0.995
        self.learning_rate = 0.01
        self.update_target_freq = 100
        self.batch_size = 32
        self.memory = deque(maxlen=2000)
        self.model = self._build_model()
        self.target_model = self._build_model()
        self.update_target_model()
        self.train_step_counter = 0

    def _build_model(self):
        current_node_input = keras.Input(shape=(1,), name='current_node')
        goal_node_input = keras.Input(shape=(1,), name='goal_node')
        battery_input = keras.Input(shape=(1,), name='battery_level')
        visited_nodes_input = keras.Input(shape=(self.graph_size,), name='visited_nodes')
        embedding_dim = min(64, self.graph_size // 4)
        current_embedding = layers.Embedding(self.graph_size, embedding_dim)(current_node_input)
        goal_embedding = layers.Embedding(self.graph_size, embedding_dim)(goal_node_input)
        current_flat = layers.Flatten()(current_embedding)
        goal_flat = layers.Flatten()(goal_embedding)
        battery_normalized = layers.Lambda(lambda x: x / self.max_battery)(battery_input)
        visited_expanded = layers.Reshape((self.graph_size, 1))(visited_nodes_input)
        visited_features = layers.Conv1D(16, 3, padding='same', activation='relu')(visited_expanded)
        visited_features = layers.MaxPooling1D(2)(visited_features)
        visited_features = layers.Conv1D(32, 3, padding='same', activation='relu')(visited_features)
        visited_features = layers.GlobalAveragePooling1D()(visited_features)
        combined = layers.Concatenate()([current_flat, goal_flat, battery_normalized, visited_features])
        x = layers.Dense(256, activation='relu')(combined)
        x = layers.Dense(128, activation='relu')(x)
        x = layers.Dense(64, activation='relu')(x)
        output = layers.Dense(self.action_size, activation='linear')(x)
        model = keras.Model(inputs=[current_node_input, goal_node_input, battery_input, visited_nodes_input], outputs=output)
        model.compile(optimizer=keras.optimizers.Adam(learning_rate=self.learning_rate), loss='mse')
        return model

    def update_target_model(self):
        self.target_model.set_weights(self.model.get_weights())

    def memorize(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state, valid_actions):
        if len(valid_actions) == 0:
            return -1
        if np.random.rand() <= self.epsilon:
            return np.random.choice(valid_actions)
        q_values = self.predict(state)
        mask = np.ones(self.action_size) * -np.inf
        mask[valid_actions] = 0
        q_values = q_values + mask
        return np.argmax(q_values)

    def predict(self, state):
        current_node = np.array([[state['current_node']]])
        goal_node = np.array([[state['goal_node']]])
        battery_level = np.array([[state['battery_level']]])
        visited_nodes = np.array([state['visited_nodes']])
        return self.model.predict([current_node, goal_node, battery_level, visited_nodes], verbose=0)[0]

    def replay(self, batch_size=None):
        if batch_size is None:
            batch_size = self.batch_size
        if len(self.memory) < batch_size:
            return
        minibatch = random.sample(self.memory, batch_size)
        current_states_current_node = []
        current_states_goal_node = []
        current_states_battery = []
        current_states_visited = []
        next_states_current_node = []
        next_states_goal_node = []
        next_states_battery = []
        next_states_visited = []
        actions = []
        rewards = []
        dones = []
        for state, action, reward, next_state, done in minibatch:
            current_states_current_node.append(state['current_node'])
            current_states_goal_node.append(state['goal_node'])
            current_states_battery.append(state['battery_level'])
            current_states_visited.append(state['visited_nodes'])
            next_states_current_node.append(next_state['current_node'])
            next_states_goal_node.append(next_state['goal_node'])
            next_states_battery.append(next_state['battery_level'])
            next_states_visited.append(next_state['visited_nodes'])
            actions.append(action)
            rewards.append(reward)
            dones.append(done)
        current_states_current_node = np.array(current_states_current_node)
        current_states_goal_node = np.array(current_states_goal_node)
        current_states_battery = np.array(current_states_battery)
        current_states_visited = np.array(current_states_visited)
        next_states_current_node = np.array(next_states_current_node)
        next_states_goal_node = np.array(next_states_goal_node)
        next_states_battery = np.array(next_states_battery)
        next_states_visited = np.array(next_states_visited)
        actions = np.array(actions)
        rewards = np.array(rewards)
        dones = np.array(dones)
        current_q_values = self.model.predict([
            current_states_current_node.reshape(-1, 1),
            current_states_goal_node.reshape(-1, 1),
            current_states_battery.reshape(-1, 1),
            current_states_visited
        ], verbose=0)
        next_q_values = self.target_model.predict([
            next_states_current_node.reshape(-1, 1),
            next_states_goal_node.reshape(-1, 1),
            next_states_battery.reshape(-1, 1),
            next_states_visited
        ], verbose=0)
        for i in range(batch_size):
            if dones[i]:
                current_q_values[i, actions[i]] = rewards[i]
            else:
                current_q_values[i, actions[i]] = rewards[i] + self.gamma * np.max(next_q_values[i])
        self.model.fit([
            current_states_current_node.reshape(-1, 1),
            current_states_goal_node.reshape(-1, 1),
            current_states_battery.reshape(-1, 1),
            current_states_visited
        ], current_q_values, epochs=1, verbose=0)
        self.train_step_counter += 1
        if self.train_step_counter % self.update_target_freq == 0:
            self.update_target_model()
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay


# Trainning Loop

In [7]:
def create_test_graph():
    G = nx.Graph()

    # Step 1: Add 100 nodes
    for i in range(100):
        G.add_node(i)

    # Step 2: Add random edges (each node connects to a few others)
    for i in range(100):
        connections = random.sample(range(100), k=random.randint(2, 5))  # Connect to 2–5 random nodes
        for j in connections:
            if i != j and not G.has_edge(i, j):  # Avoid loops and duplicates
                length = round(random.uniform(5.0, 20.0), 1)  # Random length between 5 and 20
                G.add_edge(i, j, length=length)

    # Debug: Print sample edges
    print("Sample of graph edges and attributes:")
    for i, (u, v, data) in enumerate(G.edges(data=True)):
        print(f"Edge ({u}, {v}): {data}")
        if i > 20:  # Just print a few for readability
            break

    # Step 3: Choose 10 random charging stations
    charging_nodes = random.sample(range(100), 10)

    return G, charging_nodes
def run_dqn_episodes(num_episodes=100, render_every=2):
    G, charging_nodes = create_test_graph()
    env = GraphEnv(
        graph=G,
        charging_nodes=charging_nodes,
        battery_max=100.0,
        battery_start=50.0,
        efficiency=0.1,
        max_steps=100
    )
    print(f"Graph has {G.number_of_nodes()} nodes and {G.number_of_edges()} edges")
    print(f"Number of charging stations: {len(charging_nodes)}")
    print(f"Max degree: {env.max_degree}")
    state_size = 2 + 1 + G.number_of_nodes()
    action_size = env.max_degree
    agent = DQNAgent(
        state_size=state_size,
        action_size=action_size,
        graph_size=G.number_of_nodes(),
        max_battery=env.battery_max
    )
    episode_rewards = []
    episode_lengths = []
    goal_reached = []
    for episode in range(num_episodes):
        print(f"\nEpisode {episode+1}/{num_episodes}")
        state = env.reset()
        print(f"Start node: {state['current_node']}, Goal node: {state['goal_node']}")
        total_reward = 0
        steps = 0
        done = False
        info = {}
        while not done:
            neighbor_ids = state['neighbor_ids']
            neighbor_mask = state['neighbor_mask']
            valid_actions = [i for i, mask in enumerate(neighbor_mask) if mask == 1 and neighbor_ids[i] != state['current_node']]
            if not valid_actions:
                print("No valid actions available. Terminating episode.")
                info = {"reason": "no_valid_actions"}
                break
            action = agent.act(state, valid_actions)
            next_state, reward, done, info = env.step(action)
            agent.memorize(state, action, reward, next_state, done)
            agent.replay()
            state = next_state
            total_reward += reward
            steps += 1
            if steps % 5 == 0:
                print(f"Step {steps}: Current node: {state['current_node']}, Battery: {state['battery_level']:.1f}, Reward: {total_reward}")
            if episode % render_every == 0 and steps % 10 == 0:
                env.render()
            if done or steps >= env.max_steps:
                break
        episode_rewards.append(total_reward)
        episode_lengths.append(steps)
        goal_reached.append(True if info.get('reason') == 'goal_reached' else False)
        print(f"Episode {episode+1} completed:")
        print(f"  Steps: {steps}")
        print(f"  Total reward: {total_reward}")
        print(f"  Reached goal: {'Yes' if goal_reached[-1] else 'No'}")
        if 'reason' in info:
            print(f"  Reason: {info['reason']}")
    print("\nTraining Summary:")
    print(f"Episodes run: {num_episodes}")
    print(f"Goal reached: {sum(goal_reached)}/{num_episodes} episodes")
    print(f"Average reward: {np.mean(episode_rewards):.2f}")
    print(f"Average episode length: {np.mean(episode_lengths):.2f} steps")
    plt.figure(figsize=(15, 5))
    plt.subplot(1, 2, 1)
    plt.plot(episode_rewards)
    plt.title('Episode Rewards')
    plt.xlabel('Episode')
    plt.ylabel('Total Reward')
    plt.subplot(1, 2, 2)
    plt.plot(episode_lengths)
    plt.title('Episode Lengths')
    plt.xlabel('Episode')
    plt.ylabel('Steps')
    plt.tight_layout()
    plt.show()
    return agent, env




In [None]:
if __name__ == "__main__":
    run_dqn_episodes(num_episodes=50, render_every=2)
