# Simulación avenida con dos carriles

## Librerías

In [99]:
import agentpy as ap
import numpy as np
import matplotlib.pyplot as plt
import math
import IPython
import json

## Agentes (semaforo y auto)

In [100]:
class Semaphore(ap.Agent):
    
    def setup(self):
        self.step_time = 0.1        # Tiempo que dura cada paso de la simulación

        self.direction = [0, 1]     # Direccion

        self.state = 0              # Estado 0 = verde, 1 = amarillo, 2 = rojo
        self.state_time = 0         # Duracion del estado actual

        self.green_duration = 5     # Duracion de luz verde
        self.yellow_duration = 3    # Duracion de luz amarillo
        self.red_duration = 10      # Duracion de luz rojo        

    # Actualizar estado de acuerdo a su duracion
    def update(self):
        self.state_time += self.step_time #?

        if self.state == 0: # Esta en verde
            if self.state_time >= self.green_duration: #Si se supero el tiempo de duracion de la luz
                self.state = 1 # Se cambia a amarillo
                self.state_time = 0 # Se inicializa duracion de amarillo a 0
        elif self.state == 1: # Esta en amarillo
            if self.state_time >= self.yellow_duration:  #Si se supero el tiempo de duracion de la luz
                self.state = 2 # Se cambia a rojo
                self.state_time = 0  # Se inicializa duracion de rojo a 0
        elif self.state == 2: # Esta en rojo
            if self.state_time >= self.red_duration:  #Si se supero el tiempo de duracion de la luz
                self.state = 0 # Se cambia a verde
                self.state_time = 0 # Se inicializa duracion de verde a 0



In [101]:
class Car(ap.Agent):
    def setup(self):
        self.step_time = 0.1    # Tiempo que dura cada paso de la simulación

        self.direction = [1, 0] # Dirección
        self.speed = 0.0        # Velocidad inicial en m/s
        self.max_speed = 2      # Máxima velocidad en m/s
        self.state = 1          # Car state: 1 = en movimiento, 0 = detenido

    def update_position(self):
        if self.state == 0: # Si el auto se ha detenido
            return

         # Actualiza la posición según la velocidad actual
        self.model.avenue.move_by(self, [self.speed*self.direction[0], self.speed*self.direction[1]])

    def update_speed(self):
        if self.state == 0: # Si el auto se ha detenido
            return
        
        # Obten la distancia más pequeña a uno de los autos que vaya en la misma dirección        
        p = self.model.avenue.positions[self]

        min_car_distance = 100 
        for car in self.model.cars:
            if car != self:
                # Verifica si el carro va en la misma dirección
                dot_p1 = self.direction[0]*car.direction[0] + self.direction[1]*car.direction[1]                
                
                # Verifica si el carro está atrás o adelante
                p2 = self.model.avenue.positions[car]
                dot_p2 = (p2[0]-p[0])*self.direction[0] + (p2[1]-p[1])*self.direction[1]

                if dot_p1 > 0 and dot_p2 > 0: #?                    
                    d = math.sqrt((p[0]-p2[0])**2 + (p[1]-p2[1])**2)                    
                    
                    if min_car_distance > d:
                        min_car_distance = d
       
        # Obten la distancia al próximo semáforo
        min_semaphore_distance = 100
        semaphore_state = 0
        for semaphore in self.model.semaphores:

            # Verifica si el semáforo apunta hacia el vehículo
            dot_p1 = semaphore.direction[0]*self.direction[0] + semaphore.direction[1]*self.direction[1]
            
            # Verifica si el semáforo está adelante o atrás del vehículo
            p2 = self.model.avenue.positions[semaphore]
            dot_p2 = (p2[0]-p[0])*self.direction[0] + (p2[1]-p[1])*self.direction[1]

            if dot_p1 < 0 and dot_p2 > 0: #?                            
                d = math.sqrt((p[0]-p2[0])**2 + (p[1]-p2[1])**2)  
                
                if min_semaphore_distance > d:
                    min_semaphore_distance = d
                    semaphore_state = semaphore.state
        
        # Estado del semáforo 0 = verde, 1 = amarillo, 2 = rojo
        # Actualiza la velocidad del auto
        if min_car_distance < 2:
            self.speed = 0
            self.state = 1

        elif min_car_distance < 5: #20
              self.speed = np.maximum(self.speed - 200*self.step_time, 0)

        elif min_car_distance < 30: #50
              self.speed = np.maximum(self.speed - 80*self.step_time, 0)
                
        elif min_semaphore_distance < 20 and semaphore_state == 1: #40
            self.speed = np.minimum(self.speed + 5*self.step_time, self.max_speed)

        elif min_semaphore_distance < 30 and semaphore_state == 1: #50
            self.speed = np.maximum(self.speed - 20*self.step_time, 0)
            
        elif min_semaphore_distance < 80 and semaphore_state == 2: #100
            self.speed = np.maximum(self.speed - 80*self.step_time, 0)

        else:
            self.speed = np.minimum(self.speed + 5*self.step_time, self.max_speed)
                    


## Modelo (avenida)

In [102]:
class AvenueModel(ap.Model):
    
    def setup(self):
        # Inicializa los autos     
        self.cars = ap.AgentList(self, self.p.cars, Car)
        self.cars.step_time =  self.p.step_time
        
        self.count = 0
        
        c_north = int(self.p.cars/2)
        c_south = self.p.cars - c_north

        for k in range(c_north):
            self.cars[k].direction = [0,1]

        for k in range(c_south):
            self.cars[k+c_north].direction = [0,-1]

        # Inicializa los semaforos  
        self.semaphores = ap.AgentList(self,2, Semaphore)
        self.semaphores.step_time =  self.p.step_time
        self.semaphores.green_duration = self.p.green
        self.semaphores.yellow_duration = self.p.yellow
        self.semaphores.red_duration = self.p.red
        self.semaphores[0].direction = [0, 1]
        self.semaphores[1].direction = [0, -1]

        # Inicializa el entorno
        self.avenue = ap.Space(self, shape=[60, self.p.size], torus = True)
        
        # Agrega los autos al entorno
        self.avenue.add_agents(self.cars, random=True)
        for k in range(c_north):
            self.avenue.move_to(self.cars[k], [40, 10*(k+1)])
        
        for k in range(c_south):
            self.avenue.move_to(self.cars[k+c_north], [20, self.p.size - (k+1)*10])
                
        # Agrega los semáforos al entorno
        self.avenue.add_agents(self.semaphores, random=True)
        self.avenue.move_to(self.semaphores[0], [0, self.p.size*0.5 + 5])  #10
        self.avenue.move_to(self.semaphores[1], [20, self.p.size*0.5 - 5]) #50

        # Guardar datos iniciales en archivo json
        self.data = {}
        self.data['cars'] = []
        self.data['semaphores'] = []
        self.data['frames'] = []
        with open('data.json', 'w') as f:
            for semaphore in self.semaphores:
                self.data['semaphores'].append({'id': semaphore.id, 'dir': semaphore.direction[1]})
                
            for car in self.cars:
                pos_c = self.avenue.positions[car]   
                self.data['cars'].append({'id': car.id, 'dir': car.direction[1], 'x': pos_c[0], 'z': pos_c[1]})
        
            json.dump(self.data, f)
    

    def step(self):
        # Actualziar agentes
        self.semaphores.update()
        self.cars.update_position()
        self.cars.update_speed()
        
        # Agregar informacion del frame a archivo json
        cars = []
        semaphores = []
        semaphores.append({'id': self.semaphores[0].id, 'state': self.semaphores[0].state})
        semaphores.append({'id': self.semaphores[1].id, 'state': self.semaphores[1].state})
        
        for car in self.cars:
            pos_c = self.avenue.positions[car]   
            cars.append({'id': car.id, 'x': pos_c[0], 'z': pos_c[1]})
        
        self.data['frames'].append({'frame': self.count, 'cars': cars, 'semaphores': semaphores, }) 
        
        with open('data.json', 'w') as f:
            json.dump(self.data, f)
        
        self.count += 1
        
    def end(self):
        crashed_cars = len(self.cars.select(self.cars.state == 1))
        self.report('Number of crashed cars ',crashed_cars)
        
        

## Parametros iniciales

In [103]:
parameters = {
    'step_time': 0.1,    
    'size': 500,        # Tamaño de la avenida (m)
    'green': 5,         # Duración de la luz verde
    'yellow': 3,        # Duración de la luz amarilla
    'red': 10,          # Duración de la luz roja
    'cars': 7,          # Número de autos en la simulación
    'steps': 1000,      # Número de pasos de la simulación
}

In [104]:
model = AvenueModel(parameters)
results = model.run()
print(results.reporters)

Completed: 1000 steps
Run time: 0:01:36.105831
Simulation finished
                                      seed  Number of crashed cars 
0  212210823493672200240660004144598320180                        7
