In [2]:
import min_heap
import random
import matplotlib.pyplot as plt

class DirectedWeightedGraph:

    def __init__(self):
        self.adj = {}
        self.weights = {}

    def are_connected(self, node1, node2):
        for neighbour in self.adj[node1]:
            if neighbour == node2:
                return True
        return False

    def adjacent_nodes(self, node):
        return self.adj[node]

    def add_node(self, node):
        self.adj[node] = []

    def add_edge(self, node1, node2, weight):
        if node2 not in self.adj[node1]:
            self.adj[node1].append(node2)
        self.weights[(node1, node2)] = weight

    def w(self, node1, node2):
        if self.are_connected(node1, node2):
            return self.weights[(node1, node2)]

    def number_of_nodes(self):
        return len(self.adj)


def dijkstra(G, source):
    pred = {} #Predecessor dictionary. Isn't returned, but here for your understanding
    dist = {} #Distance dictionary
    Q = min_heap.MinHeap([])
    nodes = list(G.adj.keys())

    #Initialize priority queue/heap and distances
    for node in nodes:
        Q.insert(min_heap.Element(node, float("inf")))
        dist[node] = float("inf")
    Q.decrease_key(source, 0)

    #Meat of the algorithm
    while not Q.is_empty():
        current_element = Q.extract_min()
        current_node = current_element.value
        dist[current_node] = current_element.key
        for neighbour in G.adj[current_node]:
            if dist[current_node] + G.w(current_node, neighbour) < dist[neighbour]:
                Q.decrease_key(neighbour, dist[current_node] + G.w(current_node, neighbour))
                dist[neighbour] = dist[current_node] + G.w(current_node, neighbour)
                pred[neighbour] = current_node
    return dist


def bellman_ford(G, source):
    pred = {} #Predecessor dictionary. Isn't returned, but here for your understanding
    dist = {} #Distance dictionary
    nodes = list(G.adj.keys())

    #Initialize distances
    for node in nodes:
        dist[node] = float("inf")
    dist[source] = 0

    #Meat of the algorithm
    for _ in range(G.number_of_nodes()):
        for node in nodes:
            for neighbour in G.adj[node]:
                if dist[neighbour] > dist[node] + G.w(node, neighbour):
                    dist[neighbour] = dist[node] + G.w(node, neighbour)
                    pred[neighbour] = node
    return dist


def total_dist(dist):
    total = 0
    for key in dist.keys():
        total += dist[key]
    return total

def create_random_complete_graph(x,y):
    G = DirectedWeightedGraph()
    for i in range(x):
        G.add_node(i)
    for i in range(x):
        for j in range(x):
            if i != j:
                G.add_edge(i, j, random.randint(1, y))
    return G


#Assumes G represents its nodes as integers 0,1,...,(n-1)
def mystery(G):
    n = G.number_of_nodes()
    d = init_d(G)
    for k in range(n):
        for i in range(n):
            for j in range(n):
                if d[i][j] > d[i][k] + d[k][j]: 
                    d[i][j] = d[i][k] + d[k][j]
    return d

def init_d(G):
    n = G.number_of_nodes()
    d = [[float("inf") for j in range(n)] for i in range(n)]
    for i in range(n):
        for j in range(n):
            if G.are_connected(i, j):
                d[i][j] = G.w(i, j)
        d[i][i] = 0
    return d
def path_from_pred(pred,start_node,end_node):
    path = []
    current_node = end_node
    while current_node != start_node:
        path.append(current_node)
        current_node = pred[current_node]
    path.append(current_node)
    path.reverse()
    return path



In [8]:
import csv
station_line = {}
direct_dis = {}
path_length = {}


def add_different_value(list,v1,v2):
    # This function check if v1 and v2 are in the list, if not then add them to list.
    if not v1 in list:
        list.append(v1)
    if not v2 in list:
        list.append(v2)

with open('london_stations.csv', mode ='r')as file:
    ls_csv = csv.reader(file)
    for line in ls_csv:
        if line[0] == "id":
            continue
        direct_dis[line[0]] = (line[1],line[2],line[6])

with open("london_connections.csv", mode = 'r') as file:
    lc_csv = csv.reader(file)
    
    for line in lc_csv:
        if line[0] == "station1":
            continue
        if line[2] not in station_line.keys():
            station_line[line[2]] = []
        add_different_value(station_line[line[2]],line[0],line[1])
        path_length[(line[0],line[1])] = line[3]
        path_length[(line[1],line[0])] = line[3]


def h(s,d):
    return ((direct_dis[s][0] - direct_dis[d][0])**2 + (direct_dis[s][1] - direct_dis[d][1])**2)**(1/2)

def a_star(G, s, d, h):
    pred = {} #Predecessor dictionary. Isn't returned, but here for your understanding
    dist = {} #Distance dictionary
    Q = min_heap.MinHeap([])
    nodes = list(G.adj.keys())
    for node in nodes:
        Q.insert(min_heap.Element(node, float("inf")))
        dist[node] = float("inf")
    Q.decrease_key(s, h(s,d))   
    while not Q.is_empty():
        current_element = Q.extract_min()
        if current_element == d:
            break
        current_node = current_element.value
        dist[current_node] = current_element.key - h(current_node,d)
        for neighbour in G.adj[current_node]:
            if dist[current_node] + G.w(current_node, neighbour) < dist[neighbour]:
                Q.decrease_key(neighbour, dist[current_node] + G.w(current_node, neighbour) + h(neighbour,d) )
                dist[neighbour] = dist[current_node] + G.w(current_node, neighbour)
                pred[neighbour] = current_node
    return (pred, path_from_pred(pred,s,d))

#depand on the 2 csv file create a directed weighted graph
def create_london_graph():
    london_graph = DirectedWeightedGraph()
    for key in direct_dis.keys():
        london_graph.add_node(key)
    for key in path_length.keys():
        london_graph.add_edge(key[0],key[1],path_length[key])
    return london_graph







In [9]:
G=DirectedWeightedGraph()
G.add_node(1)
G.add_node(2)
G.add_node(3)
G.add_node(4)
G.add_node(5)
G.add_node(6)
G.add_edge(1,2,5)
G.add_edge(2,3,5)
G.add_edge(3,4,3)
G.add_edge(4,5,3)
G.add_edge(1,5,7)
G.add_edge(5,6,1)
def get_h(s,des):
    h = {1:2,2:4,3:3,4:2,5:1,6:0}
    return h[s]
d=a_star(G,1,6,get_h)
print(d)

({2: 1, 5: 1, 6: 5, 3: 2, 4: 3}, [1, 5, 6])


In [None]:
# Experient to test the performance of a* and dijkstra on the same station line
def sameLineExp():
    

In [10]:
# 当启发函数h(n)始终为0
# 如果h(n)始终小于等于节点n到终点的代价
# 如果h(n)的值比节点n到终点的代价要大
def h_zero(s, d):
    return 0
def h_lesser(s, d):
    return h(s, d)/2
def h_greater(s, d):
    return h(s, d) * 2

def test_h():
    london_graph = create_london_graph()
    print("h_zero:")
    print(a_star(london_graph, s, d, h_zero))
    print("h_lesser:")
    print(a_star(london_graph, s, d, h_lesser))
    print("h_greater:")
    print(a_star(london_graph, s, d, h_greater))

test_h()

h_zero:


NameError: name 's' is not defined