In [83]:
import libsumo

# Number of traffic-light phases being tuned: loop bound in fitness() and
# n_var of SumoProblem.
PHASES_COUNT = 8
# SUMO configuration file handed to `sumo -c` whenever a simulation is started.
CONFIG_PATH = "../sumo_conf/scenario1/osm.sumocfg"
# Lower / upper bound applied to every phase duration (xl / xu of SumoProblem).
# NOTE(review): presumably expressed in seconds — confirm against the SUMO config.
MIN_TIME = 5
MAX_TIME = 60


class SumoContext:
    """Context manager that starts a SUMO run on entry and closes it on exit.

    The ``with`` target is the ``libsumo`` module itself, so callers issue
    simulation commands through the bound name.
    """

    def __enter__(self):
        """Launch SUMO with the configured scenario and hand back ``libsumo``."""
        command = ["sumo", "-c", CONFIG_PATH]
        libsumo.start(command)
        return libsumo

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the running simulation; exceptions from the body propagate."""
        libsumo.close()
        return None


def base_program_logic():
    """Read the first traffic light and its first program logic from SUMO.

    A fresh simulation is started (and closed again) just to query the
    network, so every call returns a newly fetched logic object.

    Returns:
        tuple: ``(junction_tl_id, logic)`` where ``junction_tl_id`` is the
        first id reported by ``trafficlight.getIDList()`` and ``logic`` is the
        first entry of ``trafficlight.getAllProgramLogics(junction_tl_id)``.

    Raises:
        IndexError: if the scenario has no traffic light, or the traffic
            light has no program logic.
    """
    with SumoContext() as sumo:
        # Use the `trafficlight` domain object (as fitness() does) rather than
        # the flattened `trafficlight_*` module-level aliases, which are a
        # SWIG artefact and are not present in every libsumo build.
        junction_tl_id = sumo.trafficlight.getIDList()[0]
        logic = sumo.trafficlight.getAllProgramLogics(junction_tl_id)[0]
        return junction_tl_id, logic


# Fetch the tuned junction's id and its program logic once; fitness() reuses both.
# NOTE(review): fitness() assigns to the phases returned by LOGIC.getPhases(),
# so LOGIC presumably stops reflecting the scenario defaults after the first
# fitness() call — confirm before reading default durations from it.
JUNCTION_TL_ID, LOGIC = base_program_logic()

In [84]:
def fitness(durations: list[int], steps: int = 3600) -> float:
    """Simulate the scenario with the given phase durations and score it.

    The first ``PHASES_COUNT`` phases of the module-level ``LOGIC`` get their
    ``duration``, ``minDur`` and ``maxDur`` set to the matching entry of
    ``durations`` (so the phase length is fixed), the logic is installed on
    ``JUNCTION_TL_ID`` and the simulation is stepped ``steps`` times.

    Args:
        durations: indexable of at least ``PHASES_COUNT`` numbers; each entry
            is converted with ``float()``.
        steps: number of simulation steps to run. Defaults to 3600, the
            horizon that was previously hard-coded.

    Returns:
        The number of (vehicle, step) pairs in which a vehicle's speed was
        below 0.1, divided by the total number of vehicles that arrived.
        ``float("inf")`` is returned when no vehicle arrived, so that such a
        run is ranked worst by a minimiser instead of raising
        ``ZeroDivisionError``.

    Raises:
        IndexError: if ``durations`` or the logic has fewer than
            ``PHASES_COUNT`` entries.
    """
    with SumoContext() as sumo:
        phases = LOGIC.getPhases()
        for phase_idx in range(PHASES_COUNT):
            phase_time = float(durations[phase_idx])
            # Pin min/max to the same value so the phase length cannot vary.
            phases[phase_idx].duration = phase_time
            phases[phase_idx].minDur = phase_time
            phases[phase_idx].maxDur = phase_time
        sumo.trafficlight.setProgramLogic(JUNCTION_TL_ID, LOGIC)

        halted_steps = 0
        arrived = 0
        for _ in range(steps):
            sumo.simulationStep()
            # Count every vehicle that is (nearly) standing still in this step.
            halted_steps += sum(
                1
                for veh_id in sumo.vehicle.getIDList()
                if sumo.vehicle.getSpeed(veh_id) < 0.1
            )
            arrived += sumo.simulation.getArrivedNumber()

    if arrived == 0:
        # Nothing reached its destination: treat as the worst possible score.
        return float("inf")
    return halted_steps / arrived

In [104]:
# Baseline: score the program logic reported by a freshly started simulation.
default_logic = base_program_logic()[1]
phases_duration = [phase.duration for phase in default_logic.getPhases()]
default_res = fitness(phases_duration)
print("Default solution: \nX = %s\nF = %s" % (phases_duration, default_res))

In [86]:
from pymoo.core.problem import ElementwiseProblem
from pymoo.algorithms.soo.nonconvex.ga import GA
from pymoo.algorithms.soo.nonconvex.de import DE
from pymoo.algorithms.soo.nonconvex.nelder import NelderMead
from pymoo.algorithms.soo.nonconvex.cmaes import CMAES
from pymoo.algorithms.soo.nonconvex.isres import ISRES
from pymoo.optimize import minimize
from pymoo.core.evaluator import Evaluator
from pymoo.core.population import Population
from random import randint
import numpy as np


class SumoProblem(ElementwiseProblem):
    """Single-objective pymoo problem over the traffic-light phase durations.

    There is one variable per phase (``PHASES_COUNT`` in total), each bounded
    by ``MIN_TIME`` and ``MAX_TIME``; the objective is ``fitness(x)``.
    """

    def __init__(self):
        lower_bounds = np.full(PHASES_COUNT, MIN_TIME)
        upper_bounds = np.full(PHASES_COUNT, MAX_TIME)
        super().__init__(
            n_var=PHASES_COUNT,
            n_obj=1,
            xl=lower_bounds,
            xu=upper_bounds,
        )

    def _evaluate(self, x, out, *args, **kwargs):
        """Store the simulation score of the candidate ``x`` under ``out["F"]``."""
        score = fitness(x)
        out["F"] = score


# Single problem instance shared by every optimiser run below.
problem = SumoProblem()
# Size of the initial sample and of the population-based algorithms' populations.
pop_size = 30

In [87]:
# Initial population: the scenario's own phase durations plus a random integer
# offset in [0, 5] per phase.
# The base durations are fetched once; calling base_program_logic() per
# individual would start and stop SUMO pop_size times for the same answer.
base_durations = np.array(
    [phase.duration for phase in base_program_logic()[1].getPhases()], dtype=float
)
# Seeded generator so the initial population is reproducible across runs.
init_rng = np.random.default_rng(1)
# `integers` excludes the upper bound, so 6 yields offsets 0..5 inclusive.
X = base_durations + init_rng.integers(0, 6, size=(pop_size, base_durations.size))
# NOTE(review): the rows are not checked against MIN_TIME / MAX_TIME — confirm
# that the scenario's default durations lie inside the problem bounds.
pop = Population.new("X", X)
Evaluator().eval(problem, pop)

In [88]:
# Genetic algorithm, started from the pre-evaluated initial population.
ga = GA(sampling=pop, pop_size=pop_size, eliminate_duplicates=True)
ga_res = minimize(
    algorithm=ga,
    problem=problem,
    termination=("n_eval", 2500),
    verbose=True,
    seed=1,
)
print("Best solution found for GA: \nX = %s\nF = %s" % (ga_res.X, ga_res.F))

In [89]:
# Differential evolution, started from the pre-evaluated initial population.
de = DE(sampling=pop, pop_size=pop_size)
de_res = minimize(
    algorithm=de,
    problem=problem,
    termination=("n_eval", 2500),
    verbose=True,
    seed=1,
)
print("Best solution found for DE: \nX = %s\nF = %s" % (de_res.X, de_res.F))

In [90]:
# Nelder-Mead search, given the pre-evaluated initial population as sampling.
nel = NelderMead(sampling=pop)
nel_res = minimize(
    algorithm=nel,
    problem=problem,
    termination=("n_eval", 2500),
    verbose=True,
    seed=1,
)
print("Best solution found for NelderMead: \nX = %s\nF = %s" % (nel_res.X, nel_res.F))

In [91]:
# CMA-ES; unlike the other runs it is not handed the initial population.
cmaes = CMAES(pop_size=pop_size)
cmaes_res = minimize(
    algorithm=cmaes,
    problem=problem,
    termination=("n_eval", 2500),
    verbose=True,
    seed=1,
)
print("Best solution found for CMAES: \nX = %s\nF = %s" % (cmaes_res.X, cmaes_res.F))

In [92]:
# ISRES, started from the pre-evaluated initial population.
isres = ISRES(sampling=pop, pop_size=pop_size)
isres_res = minimize(
    algorithm=isres,
    problem=problem,
    termination=("n_eval", 2500),
    verbose=True,
    seed=1,
)
print("Best solution found for ISRES: \nX = %s\nF = %s" % (isres_res.X, isres_res.F))

In [105]:
# Bar chart comparing the baseline score with the best score of each optimiser.
import matplotlib.pyplot as plt

x = ["DEFAULT", "GA", "DE", "NelderMead", "CMAES", "ISRES"]
y = [
    default_res,
    ga_res.F[0],
    de_res.F[0],
    nel_res.F[0],
    cmaes_res.F[0],
    isres_res.F[0],
]
# Explicit figure/axes so the chart carries its own labels and title.
fig, ax = plt.subplots()
ax.bar(x, y)
ax.set_xlabel("Algorithm")
ax.set_ylabel("Halted vehicle-steps per arrived vehicle")
ax.set_title("Fitness of the best solution per algorithm (lower is better)")
# Render the figure instead of leaving a BarContainer repr as the cell output.
plt.show()