# RTS Simulation Engine
#### Development Notes:
Please add **"TBI"** as comment to the sections under development (e.g. _# To be randomized (TBI)_)

In [1]:
# Libraries
import math
import time
import random
import numpy as np
from matplotlib import pyplot as plt

In [2]:
# Global variables
logFileName = "sim_log.txt"
roadXLength = 0

In [3]:
# Curve functions
def arclength(f, a, b, tol=1e-6):
    nSteps = 1                                       # number of steps to compute
    oldLength = 1.0e20
    length = 1.0e10
    while abs(oldLength - length) >= tol:
        nSteps *= 2
        y = f(a)
        xDelta = (b - a) / nSteps                    # space between x-values
        oldLength = length
        length = 0
        for i in range(1, nSteps + 1):
            yPrev = y                                # previous function value
            y = f(a + i * (b - a) / nSteps)          # new function value
            length += math.hypot(xDelta, y - yPrev)  # length of small line segment
    return length

def findNextX(f, currentX, targetLength, step, errorTol):
    nextXold =  nextX = currentX
    calculatedLength = 0

    while(targetLength >= calculatedLength):
        nextX += step*2
        calculatedLength = arclength(f, currentX, nextX, errorTol)

    while abs(targetLength-calculatedLength) > errorTol:
        middle = (nextX+nextXold)/2
        calculatedLength = arclength(f, currentX, middle, errorTol)
        if(calculatedLength < targetLength):
            nextXold = middle
        else:
            nextX = middle

    return nextX

In [4]:
class Logger:
    def __init__(self, road, interval):
        self.cars = list()
        self.road = road
        self.interval = interval
    
    def addCar(self, car):
        self.cars.append(car)
    
    def outputToFile(self):
        f = open(logFileName, "w")
        
        # Log road
        roadLocationList = list()
        
        roadEquation = self.road.lanes[1].equation
        x_values = np.arange(0,roadXLength,self.interval).tolist()+[roadXLength]
        
        for x in x_values:
            roadLocationList.append([x,round(roadEquation(x),2),0])
        
        tripleStrList = list()
        for triple in roadLocationList:
            tripleStr = str(triple[0]) + "," + str(triple[1]) + ",0," + str(triple[2])
            tripleStrList.append(tripleStr)
        roadStr = self.road.name + ' '  + ' '.join(tripleStrList)
        f.write(roadStr + "\n")
        
        # Log lanes
        laneCount = len(self.road.lanes)
        f.write(str(laneCount) + " " + "0,{},0".format(self.road.lane_diff) + "\n")
        
        # Log lights
        lightStr = ' '.join(tripleStrList)
        f.write(lightStr + "\n")
    
        # Log cars with locations and radian angles
        for car in self.cars:
            locationStrList = list()
            for loc in car.location_log:
                locationStrList.append(str(loc[0]) + "," + str(loc[1]) + ",0," + str(loc[2]))
            
            line = str(car.car_id) + " "      
            line += " ".join(locationStrList) + "\n"
            f.write(line)
            
        f.close()

In [5]:
class TrafficLight:
    pass

In [6]:
class Vehicle:
    pass
    # Super-class of car, TBI (to be implemented)

In [7]:
class Lane:    
    def __init__(self, equation):
        self.parent_road = None
        self.equation = equation
        self.cars = []
        # Adjacent lanes
        self.left_lane = None
        self.right_lane = None
        
    def set_left_lane(self, lane):
        self.left_lane = lane
        
    def set_right_lane(self, lane):
        self.right_lane = lane
    
    def set_road(self, road):
        self.parent_road = road
    
    def addToLane(self, car):
        self.cars.append(car)
        car.set_lane(self)
        
    def remove_car(self, car):
        self.cars.remove(car)
        
    def isEmpty(self):
        return True if not self.cars else False
                
    def __str__(self):
        strprt = ""
        for car in self.cars:
            strprt += car + ":" + str(car.location) + "  "
        return strprt

In [8]:
class Road:    
    def __init__(self, name, lanes):
        self.name = name
        self.lanes = lanes
        self.collided_cars = []
    
#     @classmethod
#     def generateLanes(self, laneCount):
#         for i in range(0,laneCount):
#             self.lanes.append(Lane(laneLength, self))
    
    def isEmpty(self):
        for lane in self.lanes:
            if not lane.isEmpty():
                return False
        return True

In [9]:
class Car:
    max_speed = 15.0
    min_speed = 0.0
    max_acc = max_speed / 10
    max_brake = 5
    size = 2.0
    
    def __init__(self, car_id, car_type, driver, initialSpeed, initialLocation): # initloc last radian
        self.car_id = car_id
        self.car_type = car_type
        self.driver = driver
        self.speed = initialSpeed
        self.location = initialLocation
        
        self.location_log = []
        self.combo = 0.0 # This variable is used to simulate smooth braking
        self.lane = None
        self.is_collided = False
        self.mode = 0    # 1 is for lane-changing, 0 is for driving forward
        self.switch_lane = None
        self.switch_iter = 0
            
    def set_lane(self, lane):
        self.lane = lane
        
    def accelerate(self):
        self.combo = 0  # Reset the brake combo
        if self.speed >= self.max_speed:
            self.speed = self.max_speed
        else:
            self.speed += self.max_acc * driver.acc_coef
        self.__move()
        
    def decelerate(self):       
        speed_reduction = driver.dec_coef * Car.max_brake + self.combo
        self.combo += driver.combo_increment
        if speed_reduction > Car.max_brake:
            speed_reduction = Car.max_brake
        self.speed -= speed_reduction
        
        if self.speed < self.min_speed:
            self.speed = self.min_speed
            self.combo = 0
        self.__move()
    
    def __move(self):
        current_x = self.location[0]
        self.location[0] = findNextX(self.lane.equation, current_x, self.speed, 1, 1e-2)
        self.location[1] = self.lane.equation(self.location[0])
        self.location[2] = self.calculate_angle('radian')
        self.logPosition()
    
    def __move_lane(self, shift):
        current_x = self.location[0]
        self.location[0] = findNextX(self.lane.equation, current_x, self.speed, 1, 1e-2)
        self.location[1] = self.lane.equation(self.location[0]) + shift
        self.location[2] = self.calculate_angle('radian')
        self.logPosition()
        
    def logPosition(self):
        self.location_log.append(list(self.location))
    
    def search_front_car(self):        
        front_cars = [car for car in self.lane.cars if car.location[0] > self.location[0]]
        for car in front_cars:
            self_x = self.location[0]
            self_y = self.location[1]
            car_x = car.location[0]
            car_y = car.location[1]
            euclidian_dist = math.dist([self_x, self_y], [car_x, car_y])
            if euclidian_dist <= self.driver.vision: #and self.calc_relative_speed(car) > 0: # This should not be zero, should be tuned regarding to driver type etc.
                return True
        return False
        
    def calc_relative_speed(self, car):
        return self.speed - car.speed
    
    def check_mirrors(self):
        lanes = [self.lane.left_lane, self.lane.right_lane]
        for lane in lanes:
            if lane is not None:
                #back_cars = [car for car in lane.cars if car.location[0] < self.location[0]]
                check = True
                for car in lane.cars:
                    self_x = self.location[0]
                    self_y = self.location[1]
                    car_x = car.location[0]
                    car_y = car.location[1]
                    euclidian_dist = math.dist([self_x, self_y], [car_x, car_y])
                    rel_speed = self.calc_relative_speed(car)
                    x_diff = abs(self_x - car_x)
                    if euclidian_dist <= self.driver.vision:
                        # back control
                        if car.location[0] < self.location[0]:                       
                            if rel_speed < 0 and abs(rel_speed)*5 >= x_diff:
                                check = False
                        else:
                            # front control
                            if rel_speed > 0 and abs(rel_speed)*5 <= x_diff:
                                check = False 
                        #if self.calc_relative_speed(car) < 0:# This should not be zero it should be tuned regarding to driver type etc.
                         #   check = False
                if check:
                    return lane
        return None
        
        
    def decide(self):
        self.check_collision()
        if not self.is_collided:
            # Lane changing mode.
            if self.mode:
                shift = self.switch_lane.parent_road.lane_diff
                if self.switch_lane == self.lane.right_lane:
                    shift *= -1

                self.switch_iter += 1
                self.__move_lane(shift / 5 * self.switch_iter)
                if self.switch_iter == 5:                
                    if self.switch_lane == self.lane.left_lane:
                        self.lane.remove_car(self)
                        self.lane.left_lane.addToLane(self)
                    else:                    
                        self.lane.remove_car(self)
                        self.lane.right_lane.addToLane(self)
                    self.mode = 0
                    self.switch_iter = 0
                    self.switch_lane = None
            else:
                # Check any car nearby in front of self car.
                if self.search_front_car():
                    lane_to_switch = self.check_mirrors()

                    # Check whether any lane is available. If there is no available 
                    # lane then decelerate.
                    if lane_to_switch is not None:
                        self.mode = 1
                        self.switch_lane = lane_to_switch               
                    else:
                        self.decelerate()

                self.accelerate()
                    

    def check_collision(self):
        
        
        left_boundary = self.location[1] - Car.size / 4
        right_boundary = self.location[1] + Car.size / 4
        up_boundary = self.location[0] + Car.size / 2
        down_boundary = self.location[0] - Car.size / 2
        
        current_road = self.lane.parent_road
        for lane in current_road.lanes:
            for car in lane.cars:
                if car.car_id != self.car_id and (down_boundary <= car.location[0] <= up_boundary and left_boundary <= car.location[1] <= right_boundary):
                    self.is_collided = True
                    car.is_collided = True
                    self.speed = 0
                    car.speed = 0
                    current_road.collided_cars.append(self)
                    current_road.collided_cars.append(car)
                    return
        self.is_collided = False
        
        
    def calculate_angle(self, measure):
        derivative_eq = np.polyder(self.lane.equation, 1)
        slope = derivative_eq(self.location[0])
        radian_angle = math.atan(slope)
        if measure == 'radian':
            return radian_angle
        degree_angle = math.degrees(radian_angle)
        return degree_angle

In [10]:
#from abc import ABC, abstractmethod
# Should Driver class be abstract class?

class Driver():
    # new parameter of subclasses ? dec coef brake coef ? self.coef?
    def __init__(self, age):
        self.age = age
        self.vision = self.__set_vision()
        
    def __set_vision(self):
        avg_vision = 90
        return (avg_vision * 0.7 + avg_vision * 0.3 * (1 / (self.age - age_range[0]))) * self.vision_coef
        
    # Adds more functionality to Car class objects or vice versa, TBI

In [11]:
class RookieDriver(Driver):
    dec_coef = 0.9
    combo_increment = 0.8
    acc_coef = 0.3
    vision_coef = 0.5
    def __init__(self, age):
        super().__init__(age)

In [12]:
class HastyDriver(Driver):
    dec_coef = 0.7
    combo_increment = 0.7
    acc_coef = 0.9
    vision_coef = 0.6
    def __init__(self, age):
        super().__init__(age)

In [13]:
class ProfessionalDriver(Driver):
    dec_coef = 0.5
    combo_increment = 0.1
    acc_coef = 0.5
    vision_coef = 1
    def __init__(self, age):
        super().__init__(age)

In [14]:
class SimulationEngine:
    __instance = None
    def __init__(self):
        if SimulationEngine.__instance == None:
            SimulationEngine.__instance = self
            
    @staticmethod
    def get_instance():
        if SimulationEngine.__instance == None:
            SimulationEngine()
        return SimulationEngine.__instance
        
    def set_road(self, road):
        self.road = road
        
    def move_cars(self):
        done_cars = 0
        while done_cars < car_count:
            done_cars = 0
            for lane in self.road.lanes:
                for car in lane.cars:
                    car.decide()
                    
                    # Car remove condition
                    if (car.location[0] > roadXLength and car in lane.cars) or car.is_collided:
                        done_cars += 1
    

In [15]:
if __name__ == '__main__':
    
    # Simulation parameters
    driver_types = ["Rookie", "Professional", "Hasty"]
    car_types = ["Sedan"]
    age_range = list(range(18, 50))
    curve_approx_degree = 5  # degree parameter for polyfit functions

    # Road coordinates
    x = [0.0, 62.75, 89.764, 110.18, 173.75, 229.75, 278.452, 404.829, 463.786, 526.106, 557.173, 622.152, 675.735, 711.27, 765.562, 895.002, 1010.035]
    y = [42.21, 32.21, 27.296, 20.831, 6.085, 2.21, 0.0, 2.983, 13.454, 32.089, 48.138, 85.288, 115.431, 134.645, 165.849, 212.408, 182.864]
 
    y = [-y_val for y_val in y]
    roadXLength = x[-1]

    # Forming Lanes
    lane_distance = 2
    lane_count = 3
    base_lane_equation = np.poly1d(np.polyfit(x, y, curve_approx_degree))
    
    myderivation_model = np.polyder(base_lane_equation)
    x_der = [myderivation_model(x_val) for x_val in x]  # 1st-degree derivatives of pivot points on the base curve
    x_perp = [x + math.pi/2 for x in x_der]             # angles perpendicular to the derivative values
    
    lane_equations = list()  # [0] is the right-most (bottom) lane
    for i in range(lane_count):
        if i == 0:
            lane_equations.append(base_lane_equation)
        else:
            x_adj = list()
            y_adj = list()
            for j, x_val in enumerate(x):
                x_delt = i*lane_distance*math.cos(x_perp[j])
                y_delt = i*lane_distance*math.sin(x_perp[j])
                x_adj.append(x_val+x_delt)
                y_adj.append(y[j]+y_delt)
            lane_equations.append(np.poly1d(np.polyfit(x_adj, y_adj, curve_approx_degree)))
        
    lanes = list()
    for lane_eq in lane_equations:
        lanes.append(Lane(lane_eq))
    
    # Setting up lane relations
    lanes[0].set_left_lane(lanes[1])
    lanes[1].set_left_lane(lanes[2])
    lanes[1].set_right_lane(lanes[0])
    lanes[2].set_right_lane(lanes[1])

    # Setting up the road
    road = Road("Road", lanes)
    for lane in lanes:
        lane.set_road(road)

    # Setting up cars
    cars = []
    car_count = 1
    for i in range(car_count):
        driver_type = driver_types[random.randint(0, len(driver_types) - 1)]
        age = 30           # To be randomized (TBI)
        car_type = 'Sedan' # To be randomized (TBI)

        driver = None
        if driver_type == "Rookie":
            driver = RookieDriver(age)
        elif driver_type == "Professional":
            driver = ProfessionalDriver(age)
        elif driver_type == "Hasty":
            driver = HastyDriver(age)
        
        car_lane = round(random.random()*(lane_count-1))
        speed = random.random() * 10
        x_loc = random.uniform(x[0], x[-1]/10*(i/car_count))
        y_loc = lanes[car_lane].equation(x_loc)
        radian_angle = 0.0
        car_name = f"{car_type}{i + 1}"

        car = Car(car_name, car_type, driver, speed, [x_loc, y_loc, radian_angle])
        lanes[car_lane].addToLane(car)
        cars.append(car)

    # Simulation
    print('• SIMULATION HAS STARTED •')
    start_time = time.time()
    
    simulation_engine = SimulationEngine.get_instance()
    simulation_engine.set_road(road)
    simulation_engine.move_cars()

    # Logging
    logger = Logger(road, 25)

    for car in cars:
        logger.addCar(car)

    logger.outputToFile()
    print('• SIMULATION HAS ENDED •')
    print('Elapsed run time:', time.time() - start_time)

• SIMULATION HAS STARTED •


AttributeError: 'Road' object has no attribute 'lane_diff'