In [1]:
import logging
import math 
import random
SCALAR = 2 / (2 * math.sqrt(2.0))
EXPAND_NODE = 0


In [2]:
class Node:
    def __init__(self, state, parent=None):
        # Initialize a Node with the given state and optional parent.
        self.visits = 1
        self.reward = 0.0
        self.state = state
        self.children = []  # List to store child nodes
        self.parent = parent  # Reference to the parent node

    def add_child(self, child_state):
        # Create a new child node with the given child_state and add it to the children list.
        child = Node(child_state, self)
        self.children.append(child)

    def update(self, reward):
        # Update the total reward and visit count of the node.
        self.reward += reward
        self.visits += 1

    def fully_expanded(self):
        # Check if the node is fully expanded, i.e., it has as many children as possible moves.
        if len(self.children) == self.state.num_moves:
            return True
        return False

    def __repr__(self):
        # Return a string representation of the node, including state, children count, visits, total reward,
        # and the exploit value (reward per visit).
        s = "Node: %s\n\tChildren: %d; visits: %d; reward: %f, exploit: %f" % (
            self.state,
            len(self.children),
            self.visits,
            self.reward,
            self.reward / self.visits,
        )
        return s


In [3]:
def uct_search(budget, root):
    # UCT (Upper Confidence Bound for Trees) search algorithm.
    for iteration in range(int(budget)):
        if iteration % 100 == 0:
            logging.debug("simulation: %d" % iteration)
            logging.debug(root)
        front = tree_policy(root)
        reward = default_policy(front.state)  # Perform default policy simulation
        backpropagation(front, reward)
        try:
            if best_child(root, 0).visits / budget > 0.5:
                # Break if a certain condition is met (e.g., a threshold on child visits).
                break
        except:
            logging.debug("No best child in iteration: %d" % iteration)
    return best_child(root, 0)


def default_policy(state):
    # Default policy for simulation: iteratively advance the state until a terminal state is reached.
    itr = 100
    while not state.terminal() and itr > 0:
        state = state.next_state()
        itr -= 1
    return state.reward()


def tree_policy(node):
    # Tree policy for node selection during the search.
    # A hack to force 'exploitation' in a game where there are many options,
    # and you may never/not want to fully expand first.
    while node and not node.state.terminal():
        if len(node.children) == 0:
            return expand(node)
        elif random.uniform(0, 1) < 0.6:
            node = best_child(node, SCALAR)
        else:
            if not node.fully_expanded():
                return expand(node)
            else:
                node = best_child(node, SCALAR)
    return node


def expand(node):
    # Expand the given node by adding a new child node with the next state.
    new_state = node.state.next_state()
    node.add_child(new_state)
    global EXPAND_NODE
    EXPAND_NODE += 1
    return node.children[-1]


def best_child(node, scalar):
    # Select the child node with the best UCT score.
    best_score = 0.0
    best_children = []
    for child in node.children:
        exploit = child.reward / child.visits
        explore = math.sqrt(2.0 * math.log(node.visits) / float(child.visits))
        score = exploit + scalar * explore
        if score == best_score:
            best_children.append(child)
        if score > best_score:
            best_children = [child]
            best_score = score
    if len(best_children) == 0:
        logging.warning("OOPS: no best child found, probably fatal")
        return None
    return random.choice(best_children)


def backpropagation(node, reward):
    # Backpropagate the reward information up the tree.
    while node is not None:
        node.visits += 1
        node.reward += reward
        node = node.parent
    return


In [4]:
# A* 1/sqrt(depth) + B* traffic_mean

import csv, random
from copy import deepcopy
import numpy as np

class RoadNetwork:

    def __init__(self, file_path, file_path_2):
        # Initialize the RoadNetwork with data from two CSV files.
        # The first file contains information about the road network.
        with open(file_path, 'r') as file:
            csv_reader = csv.reader(file)
            first_line = next(csv_reader)
            max_nodes = int(first_line[0])
            # Initialize the network with maximum nodes and set initial values to infinity.
            self.network = (np.full((max_nodes, max_nodes, 2), float('inf'))).tolist()

            # Populate the network with edge weights from the CSV file.
            for row in csv_reader:
                self.network[int(row[0])][int(row[1])][0] = float(row[2])
                self.network[int(row[0])][int(row[1])][1] = 0  # Traffic initially set to 0
                
                self.network[int(row[1])][int(row[0])][0] = float(row[2])
                self.network[int(row[1])][int(row[0])][1] = 0

            # Set diagonal elements to 0 (distance from a node to itself).
            for i in range(max_nodes):
                self.network[i][i][0] = 0
            self.total_vertices = max_nodes

        # The second file contains information about drivers and their source-destination pairs.
        with open(file_path_2, 'r') as file:
            csv_reader = csv.reader(file)
            first_line = next(csv_reader)
            self.total_drivers = int(first_line[0])
            self.driver_src_dst = []
            for row in csv_reader:
                self.driver_src_dst.append([int(row[0]), int(row[1])])

        # Create a distance matrix from the network for shortest path calculations.
        self.dist = []
        for row in self.network:
            ls = []
            for val in row:
                ls.append(val[0])
            self.dist.append(ls)

        # Iterate through all vertices and find the shortest path using Floyd-Warshall algorithm.
        for k in range(self.total_vertices):
            for i in range(self.total_vertices):
                for j in range(self.total_vertices):
                    self.dist[i][j] = min(self.dist[i][j], self.dist[i][k] + self.dist[k][j])

# Example usage:
# road_network = RoadNetwork('network.csv', 'drivers.csv')
# # Access attributes like road_network.total_vertices, road_network.total_drivers, etc.


In [5]:
class State:
    
    def __init__(self, oldstate, net):
        # Initialize a State object.
        self.A = 34
        self.B = 122
        self.net = net
        self.network = deepcopy(self.net.network)
        self.oldstate = oldstate
        self.num_moves = 10

        if oldstate is None:
            # Initialize state for the first iteration.
            self.cur_locs = [driver_info[0] for driver_info in self.net.driver_src_dst] 
            self.depth = 0
            mask = ~np.isinf(np.array(self.network)[:,:,1])
            self.traffic_qual = 1 / (np.var(np.array(self.network)[:,:,1][mask]) + 1)
        else:
            # Update state based on the previous state.
            self.cur_locs = oldstate.cur_locs[:]
            self.depth = oldstate.depth + 1
            mask = ~np.isinf(np.array(self.network)[:,:,1])
            self.traffic_qual = oldstate.traffic_qual + 1 / (np.var(np.array(self.network)[:,:,1][mask]) + 1)
        self.best_poss = []

    def build(self):
        # Build the best possible moves for each driver based on neighbors.
        for i in range(self.net.total_drivers):
            src = self.cur_locs[i]
            src_neigh = []
            for j in range(len(self.net.network[src])):
                if self.net.network[src][j][0] != float('inf'):
                    src_neigh.append([self.net.dist[j][self.net.driver_src_dst[i][1]], j])
            src_neigh = sorted(src_neigh, key=lambda x: x[0])

            if len(src_neigh) == 2:
                src_neigh.append(src_neigh[0])
            elif len(src_neigh) == 1:
                src_neigh.append(src_neigh[0])
                src_neigh.append(src_neigh[0])
            src_neigh = src_neigh[:3]
            src_neigh = [row[1] for row in src_neigh]

            # Randomly select a move with a probability of 60%, else choose the best move.
            if random.uniform(0, 1) < 0.6:
                for x in range(3):
                    if self.net.dist[self.cur_locs[i]][self.net.driver_src_dst[i][1]] < \
                            self.net.dist[src_neigh[x]][self.net.driver_src_dst[i][1]]:
                        src_neigh[x] = src_neigh[0]
            self.best_poss.append(src_neigh)
            self.explored = set()

    def next_state(self):
        # Generate the next state based on random moves and exploration.
        next_st = None
        while next_st is None:
            l = []
            for i in range(self.net.total_drivers):
                val = random.randint(0, 9)
                if val < 5:
                    l.append(0)
                elif val < 8:
                    l.append(1)
                else:
                    l.append(2)
            l = tuple(l)
            if l not in self.explored:
                next_st = State(self, self.net)
                for i in range(self.net.total_drivers):
                    if self.cur_locs[i] != self.net.driver_src_dst[i]:
                        next_st.cur_locs[i] = self.best_poss[i][l[i]]
                        next_st.network[self.cur_locs[i]][next_st.cur_locs[i]][1] += 1
                    else:
                        next_st.cur_locs[i] = self.cur_locs[i]
                next_st.build()
                
        return next_st

    def reward(self):
        # Calculate the reward for the current state.
        return self.A * (1 / (self.depth * self.depth)) + self.B * (self.traffic_qual / self.depth)

    def terminal(self):
        # Check if the state is terminal (all drivers reached their destinations).
        for i in range(self.net.total_drivers):
            if self.net.driver_src_dst[i][1] != self.cur_locs[i]:
                return False
        return True


In [6]:
# Create a road network and initialize the first state.
network = RoadNetwork("input2_data.csv", "input2_driver.csv")
first_state = State(None, network)
first_state.build()

# Create the root node for the search tree using the initial state.
root = Node(first_state)

val = 1
# Continue the search until the terminal condition is met (all drivers reach their destinations).
while not root.state.terminal():
    # Perform UCT search to find the best child node.
    child = uct_search(200, root)
    
    # Print the action number.
    print(f"Action {val}")
    print()

    # Print the moves for each driver from the current state to the next state.
    for i in range(network.total_drivers):
        print(f"Move Driver {i} from:{root.state.cur_locs[i]} to:{child.state.cur_locs[i]} ")
    print()
    print()

    # Update the root to the best child node for the next iteration.
    root = child
    val += 1


Action 1

Move Driver 0 from:1 to:6 
Move Driver 1 from:0 to:5 
Move Driver 2 from:2 to:1 
Move Driver 3 from:4 to:4 
Move Driver 4 from:7 to:3 


Action 2

Move Driver 0 from:6 to:6 
Move Driver 1 from:5 to:4 
Move Driver 2 from:1 to:6 
Move Driver 3 from:4 to:4 
Move Driver 4 from:3 to:7 


Action 3

Move Driver 0 from:6 to:6 
Move Driver 1 from:4 to:3 
Move Driver 2 from:6 to:0 
Move Driver 3 from:4 to:5 
Move Driver 4 from:7 to:8 


Action 4

Move Driver 0 from:6 to:6 
Move Driver 1 from:3 to:8 
Move Driver 2 from:0 to:5 
Move Driver 3 from:5 to:0 
Move Driver 4 from:8 to:8 


Action 5

Move Driver 0 from:6 to:6 
Move Driver 1 from:8 to:8 
Move Driver 2 from:5 to:5 
Move Driver 3 from:0 to:6 
Move Driver 4 from:8 to:8 


