# The London Railway Network

The cell below defines the abstract class whose API you will need to impement. Do NOT modify it.

In [None]:
# DO NOT MODIFY THIS CELL

from abc import ABC, abstractmethod  

class AbstractLondonRailwayMapper(ABC):
    
    # constructor
    @abstractmethod
    def __init__(self):
        pass           
        
    # data initialisation
    @abstractmethod
    def loadStationsAndLines(self):
        pass

    # returns the minimum number of stops to connect station "fromS" to station  "toS"
    # fromS : str
    # toS : str
    # numStops : int
    @abstractmethod
    def minStops(self, fromS, toS):     
        numStops = -1
        return numStops    
    
    # returns the minimum distance in miles to connect station "fromS" to station  "toS"
    # fromS : str
    # toS : str
    # minDistance : float
    @abstractmethod
    def minDistance(self, fromS, toS):
        minDistance = -1.0
        return minDistance
    
    # given an unordered list of station names, returns a new railway line 
    # (represented as a list of adjacent station names), connecting all such stations 
    # and such that the sum of the distances (in miles) between adjacent stations is minimised
    # inputList : set<str>
    # outputList : list<str>
    @abstractmethod
    def newRailwayLine(self, inputList):
        outputList = []
        return outputList

Use the cell below to define any data structure and auxiliary python function you may need. Leave the implementation of the main API to the next code cell instead.

In [None]:
import csv
from math import sin, cos, sqrt, atan2, radians, inf
import random

class Edge():
    def __init__(self, v, w, weight):
        self.v = v
        self.w = w
        self.weight = weight

    def endPoint(self):
        return self.v
    
    def otherEndPoint(self, vertex):
        if vertex == self.v: return self.w
        else: return self.v

    def showValues(self):
        return self.v, self.w, self.weight

class EdgeWeightedGraph():
    def __init__(self, V):
        self.V = V
        self.adj = []
        for _ in range(0, V):
            self.adj.append([])

    def addWeightedEdge(self, e):
        v = e.endPoint()
        w = e.otherEndPoint(v)
        self.adj[v].append(e)
        self.adj[w].append(e)

    def adja(self, v):
        return self.adj[v]

class BFS():
    def __init__(self, G):
        self.distToSource = [-1 for _ in range(0, G.V)]
        self.edgeTo = [-1 for _ in range(0, G.V)]


    def bfs(self, G, s):
        queue = []
        queue.append(s)
        self.distToSource[s] = 0
        while queue != []:
            x = queue.pop(0)
            for w in G.adj[x]:

                #retrieving index from station objects
                if w.endPoint() == x:
                    w = w.otherEndPoint(x)
                else:
                    w = w.endPoint()

                if self.distToSource[w] == -1:
                    queue.append(w)
                    self.distToSource[w] = self.distToSource[x] + 1
                    self.edgeTo[w] = x

    def hasPathTo(self, v):
        return self.distToSource[v] != -1

    def pathLengthTo(self, v):
        return self.distToSource[v]

    def pathTo(self, v, s):
        if self.hasPathTo(v) == False:
            return None
        
        stack = []
        x = v
        while x != s:
            stack.append(x)
            x = self.edgeTo[x]
        stack.append(s)
        return stack

class Station():
    def __init__(self, name, index, lat, lon):
        self.name = name
        self.index = index
        self.lat = lat
        self.lon = lon

    def show(self):
        return self.name, self.index, self.lat, self.lon

class Node():
    def __init__(self, vertex, distTo):
        self.vertex = vertex
        self.distTo = distTo

class BinaryHeapMinPQ():
    def __init__(self):
        self.size = [Node(-1, 0)]
        self.index = -1

    def length(self):
        return len(self.size) - 1

    def isEmpty(self):
        return len(self.size) == 1

    def contains(self, key):
        i = 0
        for node in self.size:
            if key == node.vertex:
                self.index = i 
                return True
            i += 1
        return False

    def insert(self, node):
        self.size.append(node)
        self.shiftUp(self.length())

    def shiftUp(self, index):

        while index > 0:
            if self.size[index].distTo < self.size[index//2].distTo:
                self.size[index], self.size[index//2] = self.size[index//2], self.size[index]
            index = index // 2
            
    def extractMin(self):
        minValue = self.size[1]
       
        self.size[1], self.size[self.length()] = self.size[self.length()], self.size[1]
        e = self.size.pop()

        self.shiftDown()
        return e

    def shiftDown(self):
        index = 1
        while index * 2 <= self.length():

            i = self.minChild(index)
            if self.size[index].distTo > self.size[i].distTo:
                self.size[index], self.size[i] = self.size[i], self.size[index]
            
            index = i

    def minChild(self, index):
        if index*2 == self.length():
            return index*2
        if self.size[index*2].distTo > self.size[index*2 + 1].distTo:
            return index*2 + 1
        else:
            return index*2

    def decreaseKey(self, key, value):

        self.size[self.index].distTo = value
        self.shiftUp(self.index)

class DijkstraShortestPath():
    def __init__(self, G, s):
        self.distTo = [inf for _ in range(0, G.V)]
        self.edgeTo = [False for _ in range(0, G.V)]
        self.distTo[s] = 0

        self.pq = BinaryHeapMinPQ()
        self.pq.insert(Node(s, 0))

        while self.pq.isEmpty() is False:

            v = self.pq.extractMin().vertex

            for e in G.adj[v]:
                self.relax(e)

    def relax(self, e):
        v = e.endPoint()
        w = e.otherEndPoint(v)

        if self.distTo[w] > self.distTo[v] + e.weight:
            self.distTo[w] = self.distTo[v] + e.weight
            self.edgeTo[w] = e
            if self.pq.contains(w):
                self.pq.decreaseKey(w, self.distTo[w])
            else:
                self.pq.insert(Node(w, self.distTo[w]))


        if self.distTo[v] > self.distTo[w] + e.weight:
            self.distTo[v] = self.distTo[w] + e.weight
            self.edgeTo[v] = e
            if self.pq.contains(v):
                self.pq.decreaseKey(v, self.distTo[v])
            else:
                self.pq.insert(Node(v, self.distTo[v]))

def calculateDist(lat1, lon1, lat2, lon2):
        earthRadius = 3958.76 # in miles

        lat1, lon1, lat2, lon2 = float(lat1), float(lon1), float(lat2), float(lon2)
        # converting from deg to rad
        lat1, lon1, lat2, lon2 = radians(lat1), radians(lon1), radians(lat2), radians(lon2)
        dlon = lon2 - lon1
        dlat = lat2 - lat1

        # Haversine formula for calculating distance between 2 points on a sphere
        a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
        c = 2 * atan2(sqrt(a), sqrt(1 - a))

        distance = earthRadius * c
        
        return distance

""" BRUTE FORCE ALGORITHM """

#generate all possible permutations of stations route and selects the shortest path
def bruteForce_tsp(stations):
    return shortest_tour(alltours(stations))

#returns a list of stations, each a permutation of the routes, but each starting with the same station
def alltours(stations):
    tours = []
    for station in stations:
        for rest in all_perms(list(stations - {station})):
            tours.append([station] + rest)
    return tours

def all_perms(elements):
    if len(elements) <=1:
        yield elements
    else:
        for perm in all_perms(elements[1:]):
            for i in range(len(elements)):
                # nb elements[0:1] works in both string and list contexts
                yield perm[:i] + elements[0:1] + perm[i:]

""" BASE NN TSP ALGORITHM """
#returns the first station in the set
def first(stations):
    return next(iter(stations))

def shortest_tour(tours):
    return min(tours, key= tour_length)

def tour_length(tour):

    #place all stations and edges onto the graph for each tour 
    distSum = 0
    for i in range(1, len(tour)):

        station1 = tour[i-1]
        station2 = tour[i]
        distTo = calculateDist(station1.lat, station1.lon, station2.lat, station2.lon)
        distSum += distTo

    return distSum

#nearest neighbour tsp algorithm
def nn_tsp(stations, start= None):
    if start is None:
        start = first(stations)
    start = first(stations)
    tour = [start]
    unvisited = set(stations - {start})
    while unvisited:
        s = nearest_neighbour(tour[-1], unvisited)
        tour.append(s)
        unvisited.remove(s)

    return tour

#finds the station that is the closest to the previous city by distance 
def nearest_neighbour(station, stations):
    minValue1 = inf
    for stat in stations:
        distance = calculateDist(stat.lat, stat.lon, station.lat, station.lon)
        if distance < minValue1:
            minValue1 = distance
            closest = stat
    return closest

""" REPEATED NN TSP ALGORITHM """
def repeated_nn_tsp(stations, repetitions= 100):
    return shortest_tour(nn_tsp(stations, start) for start in sample(stations, repetitions))

def sample(population, k, seed= 42):
    if k is None or k > len(population):
        return population
    random.seed(len(population) * seed * k)
    return random.sample(population, k)

def repeat_10_nn_tsp(stations): return repeated_nn_tsp(stations, 10)
def repeat_100_nn_tsp(stations): return repeated_nn_tsp(stations, 100)

""" ALTERED NN TSP ALGORTIHM """

#if reversing the tour shortens it, then execute.
def reverse_if_better(tour, i, j):
    A, B, C, D = tour[i-1], tour[i], tour[j-1], tour[j % len(tour)]
    distAB = calculateDist(A.lat, A.lon, B.lat, B.lon)
    distCD = calculateDist(C.lat, C.lon, D.lat, D.lon)
    distAC = calculateDist(A.lat, A.lon, C.lat, C.lon)
    distBD = calculateDist(B.lat, B.lon, D.lat, D.lon)

    if distAB + distCD > distAC + distBD:
        tour[i:j] = reversed(tour[i:j])

#alter tour by reversing segments
def alter_tour(tour):
    original_length = tour_length(tour)
    for (start, end) in all_segments(len(tour)):
        reverse_if_better(tour, start, end)
    
    if tour_length(tour) < original_length:
        return alter_tour(tour)
    return tour

#return pairs of indexes that form the tour segments
def all_segments(N):
    return [(start, start + length) 
            for length in range(N, 1, -1)
            for start in range(N - length + 1)]

#run altered nearest neightbour TSP algorithm, and altering the results
def altered_nn_tsp(stations):
    return alter_tour(nn_tsp(stations))


""" ALTERED & REPEATED NN TSP """
def repeated_altered_nn_tsp(stations, repetitions= 30):
    repetitions = len(stations)
    return shortest_tour(alter_tour(nn_tsp(stations, start)) 
                        for start in sample(stations, repetitions))


Use the cell below to implement the requested API.

In [None]:
import csv

class LondonRailwayMapper(AbstractLondonRailwayMapper):
    
    def __init__(self):
        self.lines = {}
        self.stations = {}
        self.graph = None
                   
        
    def loadStationsAndLines(self):
        try:
            with open('londonstations.csv', newline='') as stationsFile, open('londonrailwaylines.csv', newline='') as railwayLinesFile:

                stationsReader = csv.reader(stationsFile, delimiter= ',')
                linesReader = csv.reader(railwayLinesFile, delimiter= ',')

                # storing stations and coordinates in a hash table
                next(stationsReader)
                index = 0
                for station in stationsReader:
                    self.stations[station[0]] = Station(station[0], index, station[1], station[2])
                    index += 1
                
                vertices = len(self.stations)
                self.graph = EdgeWeightedGraph(vertices)

                # storing lines and stations in a hash table
                next(linesReader)
                for row in linesReader:
                    
                    station1 = self.stations.get(row[1])
                    station2 = self.stations.get(row[2])
                    lat1, lon1, lat2, lon2 = float(station1.lat), float(station1.lon), float(station2.lat), float(station2.lon)
                    
                    dist = calculateDist(lat1, lon1, lat2, lon2) #in miles

                    # storing the vertices using indexes of the stations
                    self.graph.addWeightedEdge(Edge(station1.index, station2.index, dist))

                    #check if line is in table. If not, append it.
                    if row[0] in self.lines:
                        if row[1] not in self.lines[row[0]]:
                            self.lines[row[0]].append(station1)

                        if row[2] not in self.lines[row[0]]:
                            self.lines[row[0]].append(station2)

                    else:
                        newline = []
                        newline.append(station1)
                        newline.append(station2)
                        self.lines[row[0]] = newline
                
        except IOError as e:
            print("Error. 'londonstations.csv' or 'londonrailwaylines.csv' is not found. Please try again.")      
        
    
    def minStops(self, fromS, toS):     
        numStops = -1

        #obtain index of station object from hashTable
        fromStation = self.stations.get(fromS).index
        toStation = self.stations.get(toS).index

        #create BFS object for shortest route without weights
        search = BFS(self.graph)
        search.bfs(self.graph, fromStation)

        numStops = search.pathLengthTo(toStation)

        return numStops    
    
    
    
    def minDistance(self, fromS, toS):
        minDistance = -1.0

        #obtain index of station object from hashTable
        fromStation = self.stations.get(fromS).index
        toStation = self.stations.get(toS).index

        #initialise Dijkstra's algorithm
        shortestPath = DijkstraShortestPath(self.graph, fromStation)
        minDistance = shortestPath.distTo[toStation]
        
        return minDistance
    
    def newRailwayLine(self, inputList):
        outputList = []
        inputStations = set()
        for station in inputList:
            station = self.stations.get(station)
            inputStations.add(station)
        
        if len(inputList) < 8:
            line = bruteForce_tsp(inputStations)

        else:
            line = repeated_altered_nn_tsp(inputStations)

        #add edges between stations of new line
        for i in range(0, len(line) - 1):
            edgeDist = calculateDist(line[i].lat, line[i].lon, line[i+1].lat, line[i+1].lon)
            self.graph.addWeightedEdge(Edge(line[i].index, line[i+1].index, edgeDist))
            outputList.append(line[i].name)
        
        return outputList

Use the cell below for all python code needed to test the `LondonRailwayMapper` class above.

In [None]:
import timeit
from matplotlib import pyplot as plt

""" VISUALISING THE TRACK"""  

#sets the seed to ensure that all tests are on the same seed
def Stations(n, stationsDict, seed =69):
    random.seed(seed * n)
    return frozenset(random.choice(list(stationsDict.values())) for _ in range(n))


#plot the stations as nodes and the tour as lines between them
def plot_tour(tour):
    start = tour[0]
    plot_lines(list(tour))
    plot_lines([start], 'rs')

#plot lines to connect a series of points
def plot_lines(points, style= 'bo-'):
    x_coords, y_coords = [], []
    for point in points:
    
        x_coords.append(float(point.lat)) 
        y_coords.append(float(point.lon))
        
    plt.plot(x_coords, y_coords, style)
    plt.axis('scaled')
    plt.axis('off')

#plot the graph for the corresponding algorithm and outputs the distance and time taken
def plot_tsp(algorithm, stations):
    time0 = timeit.default_timer()
    tour = algorithm(stations)
    time1 = timeit.default_timer()
    assert valid_tour(tour, stations)
    plot_tour(tour); plt.show()
    print("{} city tour with length {:.1f} in {:.3f} secs for {}".format(len(tour), tour_length(tour), time1 - time0, algorithm.__name__))

#checks that the tour is valid 
def valid_tour(tour, stations):
    return set(tour) == set(stations) and len(tour) == len(stations)

""" PLOTTING TIME AGAINST NUMBER OF STATIONS"""

def test_tsp_algorithms(stations, algorithm, time_taken):
    timeSum = 0
    for n in range(10):
        time0 = timeit.default_timer()
        tour = algorithm(stations)
        time1 = timeit.default_timer()
        timet = time1 - time0
        timeSum += timet
    time_avg = timeSum / 10
    assert valid_tour(tour, stations)
    time_taken.append(time_avg)
    print("{} station tour with length {:.1f} in {:.3f} secs for {}".format(len(tour), tour_length(tour), time1 - time0, algorithm.__name__))

def plot_tsp_test_graphs(x_axis, y_axis, label, color):
    x = numpy.array(x_axis)
    y = numpy.array(y_axis)
    m, b = numpy.polyfit(x, y, 1)

    plt.plot(x, y, 'o', color= color, label= label)

def run_tsp_test(numOfStations, algorithm, label, color, start=3):
    numOfStations_xaxis = [x for x in range(start, numOfStations)]
    time_taken = []
    for n in range(start, numOfStations):
        stations = Stations(n)
        test_tsp_algorithms(stations, algorithm, time_taken)

    plot_tsp_test_graphs(numOfStations_xaxis, time_taken, label, color)

""" RATIO OF BEST ROUTE LENGTH PRODUCED BY TWO ALGORITHMS """

def optimal_ratio(numOfStations, algorithm1, algorithm2):
    numOfStations_xaxis = [x for x in range(3, numOfStations)]
    ratios = []
    for n in range(3, numOfStations):
        tour1Sum, tour2Sum = 0, 0
        for i in range(50):
            stations = Stations(n)
            tour1 = algorithm1(stations)
            length1 = tour_length(tour1)
            tour1Sum += length1 
            tour2 = algorithm2(stations)
            length2 = tour_length(tour2)
            tour2Sum += length2

        length1 = tour1Sum / 50
        length2 = tour2Sum / 50
        ratio = length2 / length1
        ratios.append(ratio)
        print("{} station tour with length {:.1f} for {} and length {:.1f} for {}".format(n, length1, algorithm1.__name__, length2, algorithm2.__name__))
    
    x = numpy.array(numOfStations_xaxis)
    y = numpy.array(ratios)
    m, b = numpy.polyfit(x, y, 1)

    plt.plot(x, y, 'o')
    plt.plot(x, m * x + b)

""" TEST FINAL CODE  """
def run_tsp_final(numStations):
    if numStations <= 8:
        run_tsp_test(numStations, bruteForce_tsp, "brute_force_tsp", 'red')
    
    else:
        run_tsp_test(8, bruteForce_tsp, "brute_force_tsp", 'red')
        run_tsp_test(numStations, repeated_improved_nn_tsp, "repeated_altered_nn_tsp", 'green', 8)

""" TESTS """
m = LondonRailwayMapper()
m.loadStationsAndLines()

#test code to visualise the route (change the number to number of stations as desired)

# plot_tsp(bruteForce_tsp, Stations(8, m.stations))
# plot_tsp(nn_tsp, Stations(100, m.stations))
# plot_tsp(altered_nn_tsp, Stations(100, m.stations))
# plot_tsp(repeated_altered_nn_tsp, Stations(100, m.stations))

#test code to compare time complexities between algorithms (change n to number of stations accordingly)

# run_tsp_test(n, bruteForce_tsp, "brute_force_tsp", 'red')
# run_tsp_test(n, nn_tsp, "nn_tsp", 'green')
# run_tsp_test(n, repeated_improved_nn_tsp, "repeated_improved_nn_tsp", 'blue')
# plt.xlabel("Number of stations", loc= 'center')
# plt.ylabel("Time taken (s)", loc= 'center')
# plt.legend(loc= 'best')
# plt.show()

#test code to compare the best route length between algorithms (change n to number of stations, and algorithms accordingly)

# optimal_ratio(n, nn_tsp, repeated_improved_nn_tsp)
# plt.xlabel("Number of stations", loc= 'center')
# plt.ylabel("ratio of repeated_altered_nn_tsp length to nn_tsp length", loc= 'center')
# plt.show()

#test code to check time complexity of final algorithm
# run_tsp_final(n)

The cell below exemplifies the test code I will invoke on your submission. Do NOT modify it. 

In [None]:
# DO NOT MODIFY THIS CELL

import timeit

testMapper = LondonRailwayMapper()

#
# testing the loadStationsAndLines() API 
#
starttime = timeit.default_timer()
testMapper.loadStationsAndLines()
endtime = timeit.default_timer()
print("\nExecution time to load:", round(endtime-starttime,3))

#
# testing the minStops() and minStops() API on a sample of from/to station pairs  
#
fromList = ["Baker Street", "Epping", "Canonbury", "Vauxhall"]
toList = ["North Wembley", "Belsize Park", "Balham", "Leytonstone"]

for i in range(len(fromList)):
    starttime = timeit.default_timer()
    stops = testMapper.minStops(fromList[i], toList[i])
    endtime = timeit.default_timer()
    print("\nExecution time minStops:", round(endtime-starttime,3))

    starttime = timeit.default_timer()
    dist = testMapper.minStops(fromList[i], toList[i])
    endtime = timeit.default_timer()
    print("Execution time minDistance:", round(endtime-starttime,3))

    print("From", fromList[i], "to", toList[i], "in", stops, "stops and", dist, "miles")  
    
#
# testing the newRailwayLine() API on a small list of stations  
#
stationsList = ["Queens Park", "Chigwell", "Moorgate", "Swiss Cottage", "Liverpool Street", "Highgate"]

starttime = timeit.default_timer()
newLine = testMapper.newRailwayLine(stationsList)
endtime = timeit.default_timer()

print("\n\nStation list", stationsList)
print("New station line", newLine)
print("Total track length from", newLine[0], "to", newLine[len(newLine)-1], ":", testMapper.minDistance(newLine[0], newLine[len(newLine)-1]), "miles")
print("Execution time newLine:", round(endtime-starttime,3))

#
# testing the newRailwayLine() API on a big list of stations  
#
stationsList = ["Abbey Road", "Barbican", "Bethnal Green", "Cambridge Heath", "Covent Garden", "Dollis Hill", "East Finchley", "Finchley Road and Frognal", "Great Portland Street", "Hackney Wick", "Isleworth", "Kentish Town West", "Leyton", "Marble Arch", "North Wembley", "Old Street", "Pimlico", "Queens Park", "Richmond", "Shepherds Bush", "Tottenham Hale", "Uxbridge", "Vauxhall", "Wapping"]

starttime = timeit.default_timer()
newLine = testMapper.newRailwayLine(stationsList)
endtime = timeit.default_timer()

print("\n\nStation list", stationsList)
print("New station line", newLine)
print("Total track length from", newLine[0], "to", newLine[len(newLine)-1], ":", testMapper.minDistance(newLine[0], newLine[len(newLine)-1]), "miles")
print("Execution time newLine:", round(endtime-starttime,3))