In [3]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from matplotlib.patches import Rectangle, Circle
import random
from deap import base, creator, tools, algorithms
import seaborn as sns
from typing import List, Tuple
from dataclasses import dataclass

@dataclass
class TrafficLight:
    state: str = 'red'
    timer: int = 0

    def update(self, timing: int) -> None:
        self.timer += 1
        if self.timer >= timing:
            self.state = 'green' if self.state == 'red' else 'red'
            self.timer = 0

@dataclass
class Car:
    x: int
    y: int
    direction: Tuple[int, int]

    def move(self, city: 'City') -> None:
        nx, ny = (self.x + self.direction[0]) % city.size, (self.y + self.direction[1]) % city.size

        if (nx, ny) in city.intersections:
            intersection_index = city.intersections.index((nx, ny))
            if city.traffic_lights[intersection_index].state == 'green':
                self.x, self.y = nx, ny
        else:
            self.x, self.y = nx, ny

        if random.random() < 0.1:
            self.direction = random.choice([(0, 1), (0, -1), (1, 0), (-1, 0)])

class City:
    def __init__(self, size: int, num_intersections: int):
        self.size = size
        self.num_intersections = num_intersections
        self.grid = np.zeros((size, size))
        self.intersections = self._place_intersections()
        self.traffic_lights = [TrafficLight() for _ in range(num_intersections)]
        self.cars = self._initialize_traffic()

    def _place_intersections(self) -> List[Tuple[int, int]]:
        spacing = self.size // (int(np.sqrt(self.num_intersections)) + 1)
        return [(x, y) for x in range(spacing, self.size, spacing)
                for y in range(spacing, self.size, spacing)][:self.num_intersections]

    def _initialize_traffic(self) -> List[Car]:
        return [self._create_random_car() for _ in range(self.size * 2)]

    def _create_random_car(self) -> Car:
        return Car(
            x=random.randint(0, self.size - 1),
            y=random.randint(0, self.size - 1),
            direction=random.choice([(0, 1), (0, -1), (1, 0), (-1, 0)])
        )

    def update(self, traffic_light_timings: List[int]) -> None:
        for light, timing in zip(self.traffic_lights, traffic_light_timings):
            light.update(timing)

        for car in self.cars:
            car.move(self)

        if random.random() < 0.1:
            self.cars.append(self._create_random_car())

def evaluate(individual: List[int], city: City) -> Tuple[float, float]:
    city_copy = City(city.size, city.num_intersections)
    city_copy.intersections = city.intersections.copy()
    city_copy.cars = [Car(car.x, car.y, car.direction) for car in city.cars]

    total_movement = 0
    congestion = 0

    for _ in range(50):
        city_copy.update(individual)
        total_movement += sum(1 for car in city_copy.cars if (car.x, car.y) not in city_copy.intersections)
        congestion += sum(sum(1 for car in city_copy.cars if car.x == x and car.y == y)
                          for x, y in city_copy.intersections)

    return total_movement, -congestion

def create_toolbox(num_intersections: int):
    creator.create("FitnessMulti", base.Fitness, weights=(1.0, 1.0))
    creator.create("Individual", list, fitness=creator.FitnessMulti)

    toolbox = base.Toolbox()
    toolbox.register("attr_timing", random.randint, 20, 80)
    toolbox.register("individual", tools.initRepeat, creator.Individual, toolbox.attr_timing, n=num_intersections)
    toolbox.register("population", tools.initRepeat, list, toolbox.individual)
    return toolbox

def run_ga(city: City, num_generations: int = 50, population_size: int = 50) -> Tuple[List[int], tools.Logbook]:
    toolbox = create_toolbox(city.num_intersections)
    toolbox.register("evaluate", evaluate, city=city)
    toolbox.register("mate", tools.cxTwoPoint)
    toolbox.register("mutate", tools.mutUniformInt, low=20, up=80, indpb=0.1)
    toolbox.register("select", tools.selNSGA2)

    population = toolbox.population(n=population_size)
    stats = tools.Statistics(lambda ind: ind.fitness.values)
    stats.register("avg", np.mean, axis=0)
    stats.register("std", np.std, axis=0)
    stats.register("min", np.min, axis=0)
    stats.register("max", np.max, axis=0)

    logbook = tools.Logbook()
    logbook.header = ['gen', 'nevals'] + stats.fields

    fitnesses = list(map(toolbox.evaluate, population))
    for ind, fit in zip(population, fitnesses):
        ind.fitness.values = fit

    print_header()

    for gen in range(num_generations):
        offspring = algorithms.varAnd(population, toolbox, cxpb=0.7, mutpb=0.3)
        fits = toolbox.map(toolbox.evaluate, offspring)
        for fit, ind in zip(fits, offspring):
            ind.fitness.values = fit
        population = toolbox.select(population + offspring, k=len(population))
        record = stats.compile(population)
        logbook.record(gen=gen, nevals=len(offspring), **record)

        print_generation_stats(gen, record)

    return tools.selBest(population, k=1)[0], logbook

def print_header():
    print("\n╔═══════════════════════════════════════════════════════════════════════════════════════════════════╗")
    print("║                                 Genetic Algorithm Optimization                                    ║")
    print("╠═══════════╦═══════════════════╦═══════════════════╦═════════════════════╦═══════════════════╣")
    print("║ Generation ║ Avg Traffic Flow ║  Avg Congestion   ║  Best Traffic Flow  ║  Best Congestion  ║")
    print("╠═══════════╬═══════════════════╬═══════════════════╬═════════════════════╬═══════════════════╣")

def print_generation_stats(gen: int, record: dict):
    print(f"║ {gen:^9d} ║ {record['avg'][0]:^17.2f} ║ {-record['avg'][1]:^17.2f} ║ {record['max'][0]:^19.2f} ║ {-record['min'][1]:^17.2f} ║")

def print_footer():
    print("╚═══════════╩═══════════════════╩═══════════════════╩═════════════════════╩═══════════════════╝")

def animate_city(city, timings) -> FuncAnimation:
    fig, ax = plt.subplots(figsize=(10, 10))
    ax.set_xlim(0, city.size)
    ax.set_ylim(0, city.size)

    road_color = '#555555'
    for i in range(city.size):
        ax.add_patch(Rectangle((i, 0), 1, city.size, facecolor=road_color))
        ax.add_patch(Rectangle((0, i), city.size, 1, facecolor=road_color))

    intersection_patches = [Rectangle((x-0.5, y-0.5), 1, 1, facecolor='#777777')
                            for x, y in city.intersections]
    for patch in intersection_patches:
        ax.add_patch(patch)

    traffic_lights = [Circle((x, y), 0.2, facecolor='red') for x, y in city.intersections]
    for light in traffic_lights:
        ax.add_patch(light)

    car_patches = [Circle((car.x, car.y), 0.3, facecolor='blue') for car in city.cars]
    for patch in car_patches:
        ax.add_patch(patch)

    def update(frame):
        city.update(timings)

        for light, (x, y), timing in zip(traffic_lights, city.intersections, timings):
            light.center = (x, y)
            light.set_facecolor('green' if frame % timing < timing // 2 else 'red')

        for patch, car in zip(car_patches, city.cars):
            patch.center = (car.x, car.y)

        return traffic_lights + car_patches

    anim = FuncAnimation(fig, update, frames=200, interval=100, blit=True)
    plt.close(fig)
    return anim


def create_visualizations(logbook: tools.Logbook, best_solution: List[int]):
    gen = logbook.select("gen")
    fit_avgs = logbook.select("avg")
    fit_mins = logbook.select("min")
    fit_maxs = logbook.select("max")
    fit_stds = logbook.select("std")

    plt.figure(figsize=(12, 6))
    plt.plot(gen, [avg[0] for avg in fit_avgs], label="Average", color='blue')
    plt.plot(gen, [max_[0] for max_ in fit_maxs], label="Best", color='green')
    plt.fill_between(gen,
                     [avg[0] - std[0] for avg, std in zip(fit_avgs, fit_stds)],
                     [avg[0] + std[0] for avg, std in zip(fit_avgs, fit_stds)],
                     alpha=0.2, color='blue')
    plt.title("Traffic Flow Improvement Over Generations")
    plt.xlabel("Generation")
    plt.ylabel("Total Movement")
    plt.legend()
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.savefig('traffic_flow_improvement.png')
    plt.close()

    plt.figure(figsize=(12, 6))
    plt.plot(gen, [-avg[1] for avg in fit_avgs], label="Average", color='red')
    plt.plot(gen, [-min_[1] for min_ in fit_mins], label="Best", color='green')
    plt.fill_between(gen,
                     [-avg[1] - std[1] for avg, std in zip(fit_avgs, fit_stds)],
                     [-avg[1] + std[1] for avg, std in zip(fit_avgs, fit_stds)],
                     alpha=0.2, color='red')
    plt.title("Congestion Reduction Over Generations")
    plt.xlabel("Generation")
    plt.ylabel("Congestion")
    plt.legend()
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.savefig('congestion_reduction.png')
    plt.close()

    plt.figure(figsize=(10, 8))
    traffic_flow = [ind[0] for ind in fit_maxs]
    congestion = [-ind[1] for ind in fit_mins]
    plt.scatter(traffic_flow, congestion, c=gen, cmap='viridis')
    plt.colorbar(label='Generation')
    plt.title("Pareto Front: Traffic Flow vs. Congestion")
    plt.xlabel("Traffic Flow (Total Movement)")
    plt.ylabel("Congestion")
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.savefig('pareto_front.png')
    plt.close()

    plt.figure(figsize=(10, 6))
    sns.heatmap(np.array(best_solution).reshape(1, -1), annot=True, fmt='d', cmap='YlGnBu')
    plt.title("Best Traffic Light Timings")
    plt.xlabel("Intersection")
    plt.ylabel("Timing (seconds)")
    plt.savefig('best_solution_heatmap.png')
    plt.close()

    plt.figure(figsize=(12, 6))
    traffic_flow_data = [avg[0] for avg in fit_avgs]
    congestion_data = [-avg[1] for avg in fit_avgs]
    plt.boxplot([traffic_flow_data, congestion_data], labels=['Traffic Flow', 'Congestion'])
    plt.title("Distribution of Traffic Flow and Congestion")
    plt.ylabel("Value")
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.savefig('distribution_boxplot.png')
    plt.close()

    print("Visualizations have been created and saved.")

def print_detailed_statistics(logbook: tools.Logbook, best_solution: List[int]):
    initial_fitness = logbook.select("avg")[0]
    final_fitness = logbook.select("avg")[-1]
    best_fitness = logbook.select("max")[-1]

    print("\n╔═══════════════════════════════════════════════════════════════════════════════╗")
    print("║                             Detailed Statistics                               ║")
    print("╠═══════════════════════════════════╦═══════════════════════════════════════════╣")
    print(f"║ Initial average traffic flow      ║ {initial_fitness[0]:^41.2f} ║")
    print(f"║ Final average traffic flow        ║ {final_fitness[0]:^41.2f} ║")
    print(f"║ Best traffic flow                 ║ {best_fitness[0]:^41.2f} ║")
    print(f"║ Improvement in average traffic    ║ {(final_fitness[0] - initial_fitness[0]) / initial_fitness[0] * 100:^41.2f}% ║")
    print(f"║ Improvement in best traffic       ║ {(best_fitness[0] - initial_fitness[0]) / initial_fitness[0] * 100:^41.2f}% ║")
    print("╠═══════════════════════════════════╬═══════════════════════════════════════════╣")
    print(f"║ Initial average congestion        ║ {-initial_fitness[1]:^41.2f} ║")
    print(f"║ Final average congestion          ║ {-final_fitness[1]:^41.2f} ║")
    print(f"║ Best congestion                   ║ {-logbook.select('min')[-1][1]:^41.2f} ║")
    print(f"║ Reduction in average congestion   ║ {(-final_fitness[1] - (-initial_fitness[1])) / (-initial_fitness[1]) * 100:^41.2f}% ║")
    print(f"║ Reduction in best congestion      ║ {(-logbook.select('min')[-1][1] - (-initial_fitness[1])) / (-initial_fitness[1]) * 100:^41.2f}% ║")
    print("╠═══════════════════════════════════╩═══════════════════════════════════════════╣")
    print(f"║ Best traffic light timings: {best_solution}")
    print("╚═══════════════════════════════════════════════════════════════════════════════╝")

def main(city_size: int = 20, num_intersections: int = 4, num_generations: int = 60, population_size: int = 50):
    city = City(size=city_size, num_intersections=num_intersections)

    best_solution, logbook = run_ga(city, num_generations=num_generations, population_size=population_size)

    print_footer()
    print("\nCreating visualizations...")

    create_visualizations(logbook, best_solution)
    print_detailed_statistics(logbook, best_solution)


In [4]:
main(num_generations=60)


╔═══════════════════════════════════════════════════════════════════════════════════════════════════╗
║                                 Genetic Algorithm Optimization                                    ║
╠═══════════╦═══════════════════╦═══════════════════╦═════════════════════╦═══════════════════╣
║ Generation ║ Avg Traffic Flow ║  Avg Congestion   ║  Best Traffic Flow  ║  Best Congestion  ║
╠═══════════╬═══════════════════╬═══════════════════╬═════════════════════╬═══════════════════╣
║     0     ║      2161.66      ║       3.14        ║       2295.00       ║       12.00       ║
║     1     ║      2187.58      ║       3.06        ║       2348.00       ║       12.00       ║
║     2     ║      2200.66      ║       2.78        ║       2348.00       ║       12.00       ║
║     3     ║      2212.90      ║       2.52        ║       2348.00       ║       12.00       ║
║     4     ║      2221.12      ║       2.24        ║       2348.00       ║       9.00        ║
║     5     ║      2226.74 