In [8]:
import numpy as np
import pandas as pd
import plotly.express as px
from pymoo.algorithms.moo.nsga2 import NSGA2, binary_tournament
from pymoo.core.callback import Callback
from pymoo.core.problem import Problem
from pymoo.factory import *
from pymoo.optimize import minimize


In [9]:
class SaveHistoryEach(Callback):

    def __init__(self, check_progress_each):
        super().__init__()
        self.check_progress_each = check_progress_each
        self.data["history"] = pd.DataFrame()
        

    def notify(self, algorithm):
        n_gen = algorithm.n_gen
        if n_gen % self.check_progress_each == 0:

            tmp_df = pd.DataFrame(algorithm.pop.get("F"))
            tmp_df.columns = ["F_" + str(col) for col in tmp_df.columns]
            tmp_df["n_gen"] = n_gen
            self.data["history"] = self.data["history"].append(tmp_df)

class MultiObjective:

    def __init__(self, problem, algorithm, callbacks, termination):
        self.problem = problem
        self.algorithm = algorithm
        self.callbacks = callbacks
        self.termination = termination

        self.res = minimize(
            problem=self.problem,
            algorithm=self.algorithm,
            termination=self.termination,
            callback=self.callbacks,
            seed=1
        )

        self.history_df = self.res.algorithm.callback.data["history"]
        self.history_df["n_gen"] = self.history_df["n_gen"].astype("category")

        # 最終世代の採用変数とその目的関数の値をDataFrame化
        for i, individual in enumerate(self.res.pop):

            vars = individual.X
            objs = individual.F

            value_list = []
            value_list.extend(vars)
            value_list.extend(objs)

            if i == 0:
                cols = [f"var_{num}" for num in range(len(vars))]
                cols.extend([f"F_{num}" for num in range(len(objs))])
                df = pd.DataFrame(columns=cols)

            df.loc[i] = value_list

        # 結果のDataFrame化
        self.result_df = df
        
        # 目的変数の数
        self.F_num = len(objs)

        # 最終結果の各値のMax値×2を基準点とする
        F_max = df[[col for col in df.columns if "F_" in col]].max().to_numpy()
        self.hv = get_performance_indicator("hv", ref_point=F_max*1.5)
        
        

    def visulualize_result(self):

        if self.F_num == 2:
            fig = px.scatter(self.result_df, x="F_0", y="F_1")

        if self.F_num == 3:
            fig = px.scatter_3d(self.result_df, x="F_0", y="F_1", z="F_2")
            fig.update_traces(marker_size=2)
        fig.show()

    def visulualize_F_progress(self):
        
        if self.F_num == 2:
            fig = px.scatter(self.history_df, x="F_0", y="F_1", color="n_gen")

        if self.F_num == 3:
            fig = px.scatter_3d(self.history_df, x="F_0", y="F_1", z="F_2", color="n_gen")
            fig.update_traces(marker_size=2)

        fig.show()

    def visulualize_hypervolume_progress(self):
        hypervolume_df = pd.DataFrame(columns=["hypervolume"])

        for n_gen, df in self.history_df.groupby("n_gen"):
            hypervolume = self.hv.do(df.drop("n_gen", axis=1).to_numpy())
            hypervolume_df.loc[n_gen] = hypervolume

        hypervolume_df.index.name = "n_gen"
        hypervolume_df.reset_index(inplace=True)

        fig = px.line(hypervolume_df, x="n_gen", y="hypervolume")
        fig.show()

In [11]:
problem = get_problem("dascmop7", 12)

algorithm = NSGA2(
    pop_size=100,
    sampling=get_sampling("real_random"),
    selection = get_selection("tournament", binary_tournament),
    crossover=get_crossover("real_sbx", prob=0.9, eta=15),
    mutation=get_mutation("real_pm", prob=None, eta=20),
    eliminate_duplicates=True
)

termination = get_termination("n_gen", 1000)

In [13]:
nsga2 = MultiObjective(
    problem,
    algorithm,
    SaveHistoryEach(100),
    termination
)

nsga2.visulualize_hypervolume_progress()
nsga2.visulualize_F_progress()
nsga2.visulualize_result()