In [44]:
import agentpy as ap
import numpy as np
import math
import matplotlib.pyplot as plt
import IPython

In [45]:
class TrafficLightAgent(ap.Agent):

    def setup(self, **kwargs):
        # Declaracion de variables
        self.state = 2
        self.state_time = 0
        self.green = self.p['traffic lights']['time green']
        self.yellow = self.p['traffic lights']['time yellow']
        self.direction = kwargs['direction']

    def calculate_traffic(self):
        traffic = 0
        start = sum(self.model.cars_per_dir[0:self.id - 1])
        end = start + self.model.cars_per_dir[self.id -1]

        # Iterate through neighbor cars in range.
        for car in self.model.cars[start:end]:

            # car is in the light's lane and before the light.
            if car._get_traffic_light_dist(self) != -1 and car.speed == 0:
                traffic += 1

        return traffic

    def update(self):
        # Actualizar el semaforo
        self.state_time += self.p['step time']

        if self.state == 0: # Luz verde
            if self.state_time >= self.green:
                self.state = 1
                self.state_time = 0
        elif self.state == 1: # Luz amarilla
            if self.state_time >= self.yellow:
                self.state = 2
                self.state_time = 0

    def green_light(self): #Poner el semaforo de color verde
        self.state = 0
        self.state_time = 0
    
    def yellow_light(self): #Poner el semaforo de color amarillo
        self.state = 1
        self.state_time = 0
    
    def red_light(self): #Poner el semaforo de color rojo
        self.state = 2
        self.state_time = 0

In [46]:
class CarAgent (ap.Agent):

    def setup(self, **kwargs):
        self.state = 0
        self.speed = 0.0
        self.direction = kwargs['direction']
        self.max_speed = self.model.random.uniform(
            self.p['car']['max vel range'][0],
            self.p['car']['max vel range'][1]
        )
        self.length = self.model.random.uniform(
            self.p['car']['length range'][0],
            self.p['car']['length range'][1]
        )
        self.front_min_dist = self.model.random.uniform(
           self.p['car']['min dist range'][0],
           self.p['car']['min dist range'][1]
        )

    def update_speed(self):
        front_dist = self.p['a']
        traffic_light_dist = self.p['a']
        traffic_light_state = 0

        for car in self.model.cars:
            dist = self._get_front_dsit(car)

            if dist != -1 and dist < front_dist:
                front_dist = dist

        for traffic_light in self.model.traffic_lights:
            dist = self._get_traffic_light_dist(traffic_light)

            if dist != -1 and dist < traffic_light_dist:
                traffic_light_dist = dist
                traffic_light_state = traffic_light.state

        # Actualiza la velocidad del auto
        if front_dist < 1:
            self.speed = 0
            self.state = 1
        elif front_dist < self.front_min_dist:
              self.speed = np.maximum(self.speed - 200*self.p['step time'], 0)

        elif front_dist < self.front_min_dist * 1.2:
              self.speed = np.maximum(self.speed - 100*self.p['step time'], 0)

        elif front_dist < self.front_min_dist * 1.5:
              self.speed = np.maximum(self.speed - 80*self.p['step time'], 0)

        elif traffic_light_state == 1 and traffic_light_dist < self.p['b'] * 2 + 10:
            self.speed = np.minimum(self.speed + 5*self.p['step time'], self.max_speed)

        elif traffic_light_state == 1 and traffic_light_dist < self.p['b'] * 2 + 50:
            self.speed = np.maximum(self.speed - 2*self.p['step time'], 0)
        
        elif traffic_light_state == 2 and traffic_light_dist < self.p['b'] * 2 + 10:
            self.speed = np.maximum(self.speed - 2*self.p['step time'], 0)

        elif traffic_light_state == 2 and traffic_light_dist < self.p['b'] * 2 + 50:
            self.speed = np.maximum(self.speed - 10*self.p['step time'], 0)

        else:
            self.speed = np.minimum(self.speed + 5 * self.p['step time'], self.max_speed)

    def update_position(self):
        self.model.intersection.move_by(
            self,
            (self.direction[0] * self.speed, self.direction[1] * self.speed)
        )

    def _get_front_dsit(self, target):
        c1 = self.model.intersection.positions[self]
        c2 = self.model.intersection.positions[target]
        
        # Checks neighbor is in the same lane.
        dp_direction = self._dot_product(
            self.direction,
            target.direction
        )

        # Checks neighbor is in front.
        dp_place = self._dot_product(
            (
                c2[0] - c1[0],
                c2[1] - c1[1]
            ),
            self.direction
        )

        if dp_direction > 0 and dp_place > 0:
            return math.sqrt((c1[0] - c2[0])**2 + (c1[1] - c2[1])**2) - ((target.length + self.length) / 2)

        # Invalid target.
        return -1

    def _get_traffic_light_dist(self, target):
        c = self.model.intersection.positions[self]
        t = self.model.intersection.positions[target]

        dp_direction = self._dot_product(
            target.direction,
            self.direction
        )

        dp_place = self._dot_product(
            (
                t[0] - c[0],
                t[1] - c[1]
            ),
            self.direction
        )

        if dp_direction < 0 and dp_place > 0:
            return math.sqrt((c[0] - t[0])**2 + (c[1] - t[1])**2)

        # Invalid target.
        return -1  

    def _accelerate(self):
        if self.velocity < self.max_velocity:
            self.velocity += 1

    def _deaccelerate(self):
        if self.velocity > 0:
            self.velocity -= 1

    def _dot_product(self, v1, v2):
        return v1[0] * v2[0] + v1[1] * v2[1]
        

In [47]:
class TrafficModel (ap.Model):

    def setup(self):
        self.dead_time = 0.0
        self.cars_per_dir = self._calc_cars_per_direction(4)

        cars_direction = ap.AttrIter(
            # upstream.
            ([(0, 1)] * self.cars_per_dir[0]) +\
            # downstream.
            ([(0, -1)] * self.cars_per_dir[1]) +\
            # leftstream.
            ([(-1, 0)] * self.cars_per_dir[2]) +\
            # rightstream.
            ([(1, 0)] * self.cars_per_dir[3])
        )

        traffic_lights_direction = ap.AttrIter(
            [
                (0, -1),    # controls upstream.
                (0, 1),     # controls downstream.
                (1, 0),     # controls leftstream.
                (-1, 0)     # controls rightstream.
            ]
        )

        # Generates traffic lights agents
        self.traffic_lights = ap.AgentList(
            self,
            self.p['traffic lights']['amount'],
            TrafficLightAgent,
            direction=traffic_lights_direction
        )

        # Generate car agents.
        self.cars = ap.AgentList(
            self,
            self.p["car"]["amount"],
            CarAgent,
            direction=cars_direction
        )

        # Generate space.
        self.intersection = ap.Space(
            self,
            (self.p['a'], self.p['a']),
            torus=True
        )

        self.intersection.add_agents(self.cars, self._place_cars())
        self.intersection.add_agents(self.traffic_lights, self._place_traffic_lights())

        self.next_green()

    def step(self):
        self.traffic_lights.update()

        if self._all_red():
            self.next_green()
            
        self.cars.update_position()
        self.cars.update_speed()

    def update(self):
        for i in range(self.p['car']['amount']):
            self.record(
                i,
                self.cars[i].speed
            )

    def end(self):
        self.report(
            ['crashed'],
            [self.cars[i].state for i in range(self.p['car']['amount'])]
        )

    def next_green(self):
        traffic = self.traffic_lights.calculate_traffic()

        if self.dead_time < 0.0:
            # More traffic in vertical lanes.
            if traffic[0] + traffic[1] > traffic[2] + traffic[3]:
                self.traffic_lights[0].green_light()
                self.traffic_lights[1].green_light()

            # More traffic in horizontal lanes.
            elif traffic[2] + traffic[3] > traffic[0] + traffic[1]:
                self.traffic_lights[2].green_light()
                self.traffic_lights[3].green_light()

            # Same traffic in vertical and horizontal lanes.
            else:
                chosen = self.random.choices([(0,1), (2,3)], [0.5, 0.5])[0]

                self.traffic_lights[chosen[0]].green_light()
                self.traffic_lights[chosen[1]].green_light()

            self.dead_time = self.p['dead time']

        else:
            self.dead_time -= self.p['step time']

    def _calc_cars_per_direction(self, num_directions):
        leftover = 0
        cars_per_dir = []

        for i in range(num_directions):
            cars_in_dir = self.p['car']['amount'] * self.p['density'][i]

            leftover += cars_in_dir - int(cars_in_dir)

            cars_per_dir.append(int(cars_in_dir))

        for _ in range(int(leftover)):
            cars_per_dir[self.random.randint(0, num_directions - 1)] += 1

        return cars_per_dir

    def _place_cars(self):
        # Next car's y cordenate in upstream lane.
        next_up_lane = 0.5 * self.p['a'] - self.p['b']
        # Next car's y cordenate in downstream lane.
        next_down_lane = 0.5 * self.p['a'] + self.p['b']
        # Next car's x cordenate in leftstream lane.
        next_left_lane = 0.5 * self.p['a'] + self.p['b']
        # Next car's x cordenate in rightstream lane.
        next_right_lane = 0.5 * self.p['a'] - self.p['b']
        # index to start iterating the agent list.
        start = 0
        # index to stop iterating the agnet list.
        end = self.cars_per_dir[0]
        positions = []

        # Cars spwaned in upstream lane.
        for next_car in self.cars[start:end]:
            positions.append(
                (
                    0.5 * (self.p['a'] + self.p['l']),  # X cordenate.
                    next_up_lane - next_car.length      # Y cordenate.
                )
            )

            next_up_lane -= next_car.length + next_car.front_min_dist

        start = end
        end = start + self.cars_per_dir[1]

        # Cars spawned in downstrem lane.
        for next_car in self.cars[start:end]:
            positions.append(
                (
                    0.5 * (self.p['a'] - self.p['l']),  # X cordenate.
                    next_down_lane + next_car.length    # Y cordenate.
                )
            )

            next_down_lane += next_car.length + next_car.front_min_dist

        start = end
        end = start + self.cars_per_dir[2]

        # Cars spawned in leftstream lane.
        for next_car in self.cars[start:end]:
            positions.append(
                (
                    next_left_lane + next_car.length,   # X cordenate.
                    0.5 * (self.p['a']  + self.p['l'])  # Y cordenate.
                )
            )

            next_left_lane += next_car.length + next_car.front_min_dist

        start = end

        # Cars spawned in rightstream lane.
        for next_car in self.cars[start:]:
            positions.append(
                (
                    next_right_lane - next_car.length,   # X cordenate.
                    0.5 * (self.p['a']  - self.p['l'])   # Y cordenate.
                )
            )
            
            next_right_lane -= next_car.length + next_car.front_min_dist
       
        return positions

    def _place_traffic_lights(self):
        return [
            (   # Light controling upstream lane.
                0.5 * (self.p['a'] + self.p['l']),  # X cordenate.
                0.5 * self.p['a'] + self.p['b']     # Y cordenate.
            ),
            (   # Light controling downstream lane.
                0.5 * (self.p['a'] - self.p['l']),  # X cordenate.
                0.5 * self.p['a'] - self.p['b']     # Y cordenate
            ),
            (   # Light controling leftstream lane.
                0.5 * self.p['a'] - self.p['b'],    # X cordenate.
                0.5 * (self.p['a'] + self.p['l'])   # Y cordenate. 
            ),
            (   # Light controling rightstream lane.
                0.5 * self.p['a'] + self.p['b'],    # X cordenate.
                0.5 * (self.p['a'] - self.p['l'])   # Y cordenate.
            )
        ]

    def _all_red(self):
        for light in self.traffic_lights:
            if light.state != 2:
                return False

        return True


In [48]:
def animation_plot_single(m, ax):    
    ax.set_title(f"Avenida t={m.t*m.p['step time']:.2f}")
    
    colors = ["green", "yellow", "red"]
    
    pos_s1 = m.intersection.positions[m.traffic_lights[0]]
    ax.scatter(*pos_s1, s=20, c=colors[m.traffic_lights[0].state])
    
    pos_s2 = m.intersection.positions[m.traffic_lights[1]]
    ax.scatter(*pos_s2, s=20, c=colors[m.traffic_lights[1].state])

    pos_s3 = m.intersection.positions[m.traffic_lights[2]]
    ax.scatter(*pos_s3, s=20, c=colors[m.traffic_lights[2].state])
    
    pos_s4 = m.intersection.positions[m.traffic_lights[3]]
    ax.scatter(*pos_s4, s=20, c=colors[m.traffic_lights[3].state])
    
    ax.set_xlim(0, m.intersection.shape[0])
    ax.set_ylim(0, m.intersection.shape[1])
    
    for car in m.cars:
        pos_c = m.intersection.positions[car]    
        ax.scatter(*pos_c, s=20, c="black")
    
    ax.set_axis_off()
    ax.set_aspect('equal', 'box')
        
def animation_plot(m, p):    
    fig = plt.figure(figsize=(10, 10))
    ax = fig.add_subplot(111)
    animation = ap.animate(m(p), fig, ax, animation_plot_single)
    return IPython.display.HTML(animation.to_jshtml(fps=20))

In [49]:
param = {
    "car": {
        "amount": 10,
        "length range": (2.5, 4.5),
        "max vel range": (1.0, 2.0),
        "min dist range": (3.0, 5.0)
    },
    'traffic lights': {
        'amount': 4,
        'time yellow': 0.5,
        'time green': 1
    },
    'a': 200,
    'b': 5,
    'l': 3.6,
    'steps': 1000, 
    'step time': 0.01,
    'density': (0.20, 0.42, 0.23, 0.15),
    'dead time': 0.3
}

In [50]:
model = TrafficModel(param)
result = model.run()

Completed: 1000 steps
Run time: 0:00:00.677586
Simulation finished


In [51]:
print(result.reporters['crashed'][0])

[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]


In [52]:
animation_plot(TrafficModel, param)