In [1]:
# src/pipelines/pipeline_layout_opt.py
import ast
import random
from typing import Dict, List, Optional, Tuple

import numpy as np
import optuna
import pandas as pd

from src.models.affinity import AffinityBuilder
from src.models.ga_optimizer import GlobalLayoutOptimizer, TwoZoneLayoutOptimizer
from src.models.greedy import GreedyLayout
from src.models.forbidden_pairs import get_forbidden_pairs

from src.plots import LayoutVisualizer
from src.config import PROCESSED_DATA_DIR, INTERIM_DATA_DIR
from src.preprocess import DataLoader


# =============== Pipeline ===============
class LayoutOptimizationPipeline:
    """Tune and run a GA-based category layout optimisation.

    Typical use: :meth:`tune` searches affinity, fitness and GA
    hyper-parameters with Optuna, :meth:`run_final` re-runs the GA with the
    best parameters and writes the resulting layout to ``output_path``, and
    :meth:`plot_all` draws the diagnostic plots.
    """

    # Number of GA generations evaluated inside each Optuna trial.
    _TRIAL_NGEN = 30
    # Lower-cased category names that mark the entrance slot.
    _ENTRANCE_NAMES = ("entrance", "entry", "door", "cửa vào", "lối vào")

    def __init__(
        self,
        data: "DataLoader",
        n_trials: int = 30,
        n_gen_final: int = 80,
        use_optuna: bool = True,
        output_path: Optional[str] = None,
        selection: str = "tournament",
        crossover: str = "PMX",
        mutation: str = "shuffle",
        adaptive: bool = True,
        seed: int = 42,
    ):
        """Store the configuration and build the affinity builder.

        Args:
            data: Loader exposing ``all_items``, ``refrig_cats``,
                ``assoc_rules``, ``freq_itemsets``, ``margin_matrix``,
                ``layout_real``, ``positions`` and ``sorted_slots_xy()``.
            n_trials: Number of Optuna trials run by :meth:`tune`.
            n_gen_final: Number of GA generations used by :meth:`run_final`.
            use_optuna: Stored on the instance; not consulted by the methods
                of this class.
            output_path: Destination CSV of the optimised layout. Defaults to
                ``PROCESSED_DATA_DIR / "layout_real_mapped.csv"``.
            selection: Selection operator name forwarded to the optimiser.
            crossover: Crossover operator name forwarded to the optimiser.
            mutation: Mutation operator name forwarded to the optimiser.
            adaptive: Forwarded to the optimiser's ``run``.
            seed: Seed for NumPy, ``random``, the Optuna sampler and the GA.
        """
        self.data = data
        self.n_trials = n_trials
        self.n_gen_final = n_gen_final
        self.use_optuna = use_optuna
        self.output_path = output_path or (
            PROCESSED_DATA_DIR / "layout_real_mapped.csv"
        )
        self.selection, self.crossover, self.mutation = selection, crossover, mutation
        self.adaptive = adaptive
        self.seed = seed
        # Seeds the process-wide generators, not only this instance.
        np.random.seed(seed)
        random.seed(seed)

        self.all_items = data.all_items
        self.refrig_cats = data.refrig_cats

        self.affinity_builder = AffinityBuilder(
            self.data.assoc_rules,
            self.data.freq_itemsets,
            self.data.all_items,
            self.data.margin_matrix,
        )

        # Holders filled by run_final() and tune() respectively.
        self.ga_logbook: Optional[pd.DataFrame] = None
        self.best_logbook: Optional[pd.DataFrame] = None

    # ---- helpers ----
    @staticmethod
    def _first_center(rows: pd.DataFrame) -> Tuple[float, float]:
        """Return the centre of the first row when ordered by ``(y, x)``.

        Missing ``width``/``height`` columns count as zero.
        """
        row = rows.sort_values(["y", "x"]).iloc[0]
        cx = float(row["x"]) + float(row.get("width", 0)) * 0.5
        cy = float(row["y"]) + float(row.get("height", 0)) * 0.5
        return cx, cy

    def _coords_and_entrance(
        self, override_entr_xy: Optional[Tuple[float, float]] = None
    ):
        """Return the slot centres and the entrance coordinate.

        The entrance is resolved in this order: ``override_entr_xy``; rows of
        ``layout_real`` with a non-zero ``is_entrance`` flag; rows whose
        lower-cased ``Category`` is a known entrance name; the first row of
        ``layout_real`` ordered by ``(y, x)``.

        Returns:
            ``(coords, (ex, ey))`` where ``coords`` is the list of slot
            centres in ``sorted_slots_xy()`` order.
        """
        slots = self.data.sorted_slots_xy()
        coords = list(
            zip(slots["x"] + slots["width"] * 0.5, slots["y"] + slots["height"] * 0.5)
        )
        if override_entr_xy is not None:
            return coords, tuple(override_entr_xy)

        df = self.data.layout_real
        if "is_entrance" in df.columns:
            # Test and select with the same mask; the previous `.any()` check
            # followed by `== 1` raised IndexError for flags such as 2.
            flagged = df["is_entrance"].fillna(0).astype(int) != 0
            if flagged.any():
                return coords, self._first_center(df.loc[flagged])

        names = df["Category"].astype(str).str.lower()
        hits = names.isin(self._ENTRANCE_NAMES)
        if hits.any():
            return coords, self._first_center(df.loc[hits])

        return coords, self._first_center(df)

    @staticmethod
    def _parse_items(raw) -> list:
        """Turn one ``items`` cell into a list of item names.

        Accepts a Python-literal string (``"['a', 'b']"``), the
        ``"frozenset({...})"`` text form, a single quoted name, or an
        already-parsed collection. Anything unparseable yields ``[]``.
        """
        if isinstance(raw, str):
            text = raw.strip()
            if text.startswith("frozenset(") and text.endswith(")"):
                # literal_eval cannot evaluate calls; keep only the argument.
                text = text[len("frozenset("):-1] or "()"
            try:
                raw = ast.literal_eval(text)
            except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
                return []
        if isinstance(raw, str):
            # A single name must not be iterated character by character.
            return [raw]
        try:
            return list(raw)
        except TypeError:
            return []

    def _cat_support(self) -> Dict[str, float]:
        """Map each category to the largest support of an itemset containing it.

        Categories that appear in no itemset keep ``0.0``. Returns all zeros
        when ``freq_itemsets`` lacks an ``items`` or ``support`` column.
        """
        cs = {c: 0.0 for c in self.all_items}
        df = self.data.freq_itemsets
        if "items" not in df.columns or "support" not in df.columns:
            return cs
        for raw_items, raw_support in zip(df["items"], df["support"]):
            items = self._parse_items(raw_items)
            if not items:
                continue
            sup = float(raw_support)
            for it in items:
                if it in cs:
                    cs[it] = max(cs[it], sup)
        return cs

    def _pairs_list(self, affinity: pd.DataFrame, threshold: float):
        """List ``(a, b, weight)`` for pairs with affinity >= ``threshold``.

        Each unordered pair is reported once with ``a < b``, in the iteration
        order of ``all_items``.
        """
        pairs = []
        for a in self.all_items:
            for b in self.all_items:
                if a >= b:
                    continue
                w = float(affinity.loc[a, b])
                if w >= threshold:
                    pairs.append((a, b, w))
        return pairs

    def _seed_layout_real(self) -> List[str]:
        """Return the real layout's categories, in slot order, that are known items."""
        known = set(self.all_items)
        categories = self.data.sorted_slots_xy()["Category"].astype(str)
        return [c for c in categories if c in known]

    def _filter_to_slot_len(self, layout: List[str]) -> List[str]:
        """Truncate ``layout`` to the number of available positions."""
        return layout[: len(self.data.positions)]

    @staticmethod
    def _normalize_weights(
        w_lift: float, w_conf: float, w_margin: float
    ) -> Tuple[float, float, float]:
        """Scale the three affinity weights so they sum to one.

        The weights are returned unchanged when their sum is not positive.
        """
        total = w_lift + w_conf + w_margin
        if total <= 0:
            return w_lift, w_conf, w_margin
        return w_lift / total, w_conf / total, w_margin / total

    def _build_affinity(self, p: dict) -> pd.DataFrame:
        """Build, normalise and kernelise the affinity matrix from ``p``.

        Shared by tuning and the final run so both use the same normalised
        weights. ``run_final`` previously passed the raw weights, so it did
        not rebuild the affinity that was tuned.
        """
        w_lift, w_conf, w_margin = self._normalize_weights(
            p["w_lift"], p["w_conf"], p["w_margin"]
        )
        affinity = self.affinity_builder.build_affinity(
            lift_threshold=p["lift_threshold"],
            w_lift=w_lift,
            w_conf=w_conf,
            w_margin=w_margin,
        )
        affinity = self.affinity_builder.normalize(affinity)
        return self.affinity_builder.kernelize(affinity, gamma=p["gamma"])

    def _greedy_layout(self, affinity: pd.DataFrame):
        """Return the greedy layout refined by local search for ``affinity``."""
        greedy = GreedyLayout(self.all_items)
        return greedy.local_search(greedy.init_layout(affinity), affinity)

    def _make_optimizer(self, affinity: pd.DataFrame, p: dict):
        """Create the GA engine for ``affinity`` and parameter mapping ``p``.

        Uses the two-zone optimiser when refrigerated categories exist and
        the global optimiser otherwise.
        """
        coords, entr_xy = self._coords_and_entrance()
        shared = dict(
            all_items=self.all_items,
            affinity_matrix=affinity,
            forbidden_pairs=get_forbidden_pairs(affinity, self.all_items),
            penalty_forbidden=p["penalty"],
            hard_rules={},
            coords=coords,
            entr_xy=entr_xy,
            cat_support=self._cat_support(),
            pairs_list=self._pairs_list(affinity, threshold=p["pair_threshold"]),
            w_aff=p["w_aff"],
            w_pair=p["w_pair"],
            w_entr=p["w_entr"],
            gamma_support=p["gamma_support"],
            selection=self.selection,
            crossover=self.crossover,
            mutation=self.mutation,
        )
        if len(self.refrig_cats) > 0:
            # anchor_start=None is passed explicitly, as the tuning objective
            # always did, so the final run uses the same setting.
            return TwoZoneLayoutOptimizer(
                refrig_cats=self.refrig_cats, anchor_start=None, **shared
            )
        return GlobalLayoutOptimizer(**shared)

    def _run_ga(self, ga, p: dict, ngen: int, baseline, log_csv_path=None):
        """Run ``ga`` and return ``(best_layout, best_fitness, logbook)``."""
        return ga.run(
            ngen=ngen,
            pop_size=p["pop_size"],
            seed=self.seed,
            elite_ratio=p["elite_ratio"],
            adaptive=self.adaptive,
            return_all=False,
            baseline=baseline,
            log_csv_path=log_csv_path,
            as_dataframe=True,
        )

    @staticmethod
    def _layout_frame(best_layout: List[str], slots: pd.DataFrame) -> pd.DataFrame:
        """Pair the first ``n`` categories with the first ``n`` slots.

        ``n`` is the shorter of the two lengths. Rows are taken by position
        (``iloc``), so the result does not depend on the index labels of
        ``slots``. Adds the slot centres as ``cx``/``cy``.
        """
        n = min(len(best_layout), len(slots))
        head = slots.iloc[:n]
        frame = pd.DataFrame(
            {
                "Category": list(best_layout[:n]),
                "x": head["x"].to_list(),
                "y": head["y"].to_list(),
                "width": head["width"].to_list(),
                "height": head["height"].to_list(),
            }
        )
        frame["cx"] = frame["x"] + frame["width"] / 2.0
        frame["cy"] = frame["y"] + frame["height"] / 2.0
        return frame

    # ---- Optuna objective ----
    def _objective(self, trial: "optuna.trial.Trial") -> float:
        """Sample one parameter set, run a short GA and return its best fitness.

        The best layout and, when possible, the GA logbook are stored as
        user attributes on ``trial``.
        """
        # The dict literal is evaluated top to bottom, so the parameters are
        # suggested in this fixed order.
        p = {
            # Affinity params
            "lift_threshold": trial.suggest_float("lift_threshold", 0.0, 2.0),
            "w_lift": trial.suggest_float("w_lift", 0.1, 1.0),
            "w_conf": trial.suggest_float("w_conf", 0.0, 1.0),
            "w_margin": trial.suggest_float("w_margin", 0.0, 1.0),
            "gamma": trial.suggest_float("gamma", 0.5, 4.0),
            # Mixed fitness
            "w_aff": trial.suggest_float("w_aff", 0.5, 2.0),
            "w_pair": trial.suggest_float("w_pair", 0.0, 2.0),
            "w_entr": trial.suggest_float("w_entr", 0.0, 2.0),
            "gamma_support": trial.suggest_float("gamma_support", 0.5, 1.5),
            "pair_threshold": trial.suggest_float("pair_threshold", 0.5, 0.9),
            # GA params
            "pop_size": trial.suggest_int("pop_size", 120, 500, step=40),
            "elite_ratio": trial.suggest_float("elite_ratio", 0.04, 0.1),
            "penalty": trial.suggest_int("penalty", 20, 150),
        }

        affinity = self._build_affinity(p)

        # Seeds & inputs
        layout_greedy = self._greedy_layout(affinity)
        ga = self._make_optimizer(affinity, p)
        baseline = self._seed_layout_real()

        best_layout, best_fitness, logbook = self._run_ga(
            ga,
            p,
            ngen=self._TRIAL_NGEN,
            baseline=baseline or layout_greedy,
            log_csv_path=None,
        )

        # Attach the logbook to the trial for later inspection. This is best
        # effort: a failure is reported but must not fail the trial.
        log_df = getattr(ga, "logbook_df", None)
        try:
            if log_df is not None:
                trial.set_user_attr("logbook", log_df.to_dict("records"))
            else:
                trial.set_user_attr("logbook", logbook)
        except Exception as exc:
            print(f"Warning: could not attach GA logbook to trial: {exc!r}")

        trial.set_user_attr("best_layout", self._filter_to_slot_len(best_layout))
        return float(best_fitness)

    # ---- Public API ----
    def tune(self):
        """Run the Optuna study and keep its best parameters and layout.

        Sets ``study``, ``best_params``, ``best_layout`` and ``best_logbook``
        on the instance.

        Returns:
            The completed Optuna study.
        """
        sampler = optuna.samplers.TPESampler(seed=self.seed)
        study = optuna.create_study(direction="maximize", sampler=sampler)
        study.optimize(self._objective, n_trials=self.n_trials)
        self.study = study
        self.best_params = study.best_params
        self.best_layout = study.best_trial.user_attrs["best_layout"]
        # Keep the best trial's logbook, if one was attached.
        try:
            self.best_logbook = pd.DataFrame(
                study.best_trial.user_attrs.get("logbook", [])
            )
        except Exception:
            self.best_logbook = None

        print("Best params:", self.best_params)
        print("Best layout (from Optuna):", self.best_layout)
        return study

    def run_final(self):
        """Run the GA with the tuned parameters and export the layout.

        Writes the optimised layout to ``output_path`` and the GA logbook to
        ``PROCESSED_DATA_DIR / "ga_logbook_final.csv"``. Sets ``affinity``,
        ``layout_opt``, ``best_fitness`` and ``ga_logbook`` on the instance.

        Returns:
            ``(layout_opt, best_fitness)``.

        Raises:
            RuntimeError: If :meth:`tune` has not been called yet.
        """
        if not hasattr(self, "best_params"):
            raise RuntimeError("Hãy gọi tune() trước.")

        p = self.best_params
        affinity = self._build_affinity(p)
        ga = self._make_optimizer(affinity, p)

        # Same seeding rule as the tuning objective: use the real layout, or
        # the greedy layout when the real one has no known category.
        baseline = self._seed_layout_real() or self._greedy_layout(affinity)

        # Persist the final logbook as CSV.
        log_csv = PROCESSED_DATA_DIR / "ga_logbook_final.csv"

        best_layout, best_fitness, logbook = self._run_ga(
            ga,
            p,
            ngen=self.n_gen_final,
            baseline=baseline,
            log_csv_path=str(log_csv),
        )

        # Keep the logbook for plotting.
        self.ga_logbook = getattr(ga, "logbook_df", None)
        if self.ga_logbook is None:
            try:
                self.ga_logbook = pd.DataFrame(logbook)
            except Exception:
                self.ga_logbook = None

        # Export one row per slot, in (y, x) slot order.
        best_layout = [str(c) for c in self._filter_to_slot_len(best_layout)]
        layout_opt = self._layout_frame(best_layout, self.data.sorted_slots_xy())

        layout_opt.to_csv(self.output_path, index=False)
        self.affinity = affinity
        self.layout_opt = layout_opt
        self.best_fitness = best_fitness

        print("\nBest layout:", best_layout)
        print(f"Best fitness: {float(best_fitness):.4f}")
        return layout_opt, best_fitness

    def plot_all(self):
        """Draw the affinity and convergence plots; requires :meth:`run_final`."""
        if not hasattr(self, "layout_opt"):
            print("Hãy chạy run_final() trước khi plot.")
            return
        LayoutVisualizer.plot_affinity_heatmap(self.affinity)
        LayoutVisualizer.plot_affinity_bar(self.affinity)
        # Prefer the final logbook; otherwise use the best tuning logbook.
        log_df = self.ga_logbook if self.ga_logbook is not None else self.best_logbook
        if log_df is not None and not log_df.empty:
            LayoutVisualizer.plot_ga_convergence(log_df)
        LayoutVisualizer.plot_spring_layout(self.affinity, threshold=0.8)


# =============== Example usage ===============
if __name__ == "__main__":
    # Inputs: mined association rules and frequent itemsets, plus the real
    # floor layout. No margin matrix is supplied.
    df = DataLoader(
        margin_matrix_path=None,
        layout_real_path=INTERIM_DATA_DIR / "layout.csv",
        freq_itemsets_path=PROCESSED_DATA_DIR / "frequent_itemsets.csv",
        assoc_rules_path=PROCESSED_DATA_DIR / "association_rules.csv",
    )

    # 20 tuning trials, then a 50-generation final run.
    pipeline = LayoutOptimizationPipeline(
        df,
        seed=42,
        adaptive=True,
        mutation="shuffle",
        crossover="PMX",
        selection="tournament",
        n_gen_final=50,
        n_trials=20,
    )

    # Order matters: run_final() needs tune(), plot_all() needs run_final().
    pipeline.tune()
    pipeline.run_final()
    pipeline.plot_all()

  from .autonotebook import tqdm as notebook_tqdm


ModuleNotFoundError: No module named 'deap'

In [None]:
# src/plots_ga.py — CLASS VERSION
from pathlib import Path
from typing import Dict, Optional, Tuple
import math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors


class GAPlotter:
    """Rasterise rectangular layouts onto a grid and render them as images.

    Each layout frame is expected to carry the columns ``Category``, ``x``,
    ``y``, ``width`` and ``height``. Grid value ``0`` is background; every
    category gets a positive integer id.
    """

    def __init__(
        self, padding_ratio: float = 0.06, dpi: int = 220, cmap_name: str = "tab20"
    ):
        """Store rendering options.

        Args:
            padding_ratio: Fraction of the layout extent added as margin on
                each side of the grid.
            dpi: Resolution used when saving figures.
            cmap_name: Name of the Matplotlib colormap used for categories.
        """
        self.padding_ratio = padding_ratio
        self.dpi = dpi
        self.cmap_name = cmap_name

    # ---------- public API ----------
    def visualize_layout(
        self,
        df_layout: pd.DataFrame,
        out_png: Path,
        cell_size: Optional[float] = None,
        title: str = "Layout (GA) — preview",
        label_fontsize: int = 4,
        show_labels: bool = True,
    ) -> None:
        """Render a single layout to ``out_png``.

        ``cell_size`` is estimated from the layout when omitted (or falsy).
        Labels are skipped when ``show_labels`` is false.
        """
        cs = cell_size or self._estimate_cell_size(df_layout)
        grid, id2name = self._rasterize_grid(df_layout, cs)
        self._draw_grid(
            grid, id2name, title, out_png, label_fontsize if show_labels else 0
        )

    def compare_layouts(
        self,
        df_before: pd.DataFrame,
        df_after: pd.DataFrame,
        out_png: Path,
        titles: Tuple[str, str] = ("Trước", "Sau"),
        cell_size_before: Optional[float] = None,
        cell_size_after: Optional[float] = None,
        label_mode: str = "both",  # "left" | "right" | "both" | "none"
        label_fontsize: int = 4,
    ) -> None:
        """Render two layouts side by side with a shared category colouring.

        Both panels use one name-to-id mapping, so a category has the same
        colour on the left and on the right. ``label_mode`` selects which
        panels get category labels.
        """
        shared_map = self._build_shared_mapping(df_before, df_after)

        cs_a = cell_size_before or self._estimate_cell_size(df_before)
        grid_a, id2name = self._rasterize_grid(df_before, cs_a, name2id=shared_map)

        cs_b = cell_size_after or self._estimate_cell_size(df_after)
        grid_b, _ = self._rasterize_grid(df_after, cs_b, name2id=shared_map)

        H1, W1 = grid_a.shape
        H2, W2 = grid_b.shape
        max_id = int(max(np.max(grid_a, initial=0), np.max(grid_b, initial=0)))
        cmap, norm = self._make_cmap(max_id)

        fig_w = min(30, 18 * ((W1 + W2) / max(1, max(H1, H2))))
        fig_h = min(30, 18)
        fig, axs = plt.subplots(1, 2, figsize=(fig_w, fig_h))
        try:
            for ax, grid, panel_title in (
                (axs[0], grid_a, titles[0]),
                (axs[1], grid_b, titles[1]),
            ):
                ax.imshow(grid, cmap=cmap, norm=norm, interpolation="none")
                ax.set_title(panel_title, fontsize=6)
                self._axes_off(ax)

            if label_mode in ("left", "both"):
                self._draw_labels(axs[0], grid_a, id2name, fontsize=label_fontsize)
            if label_mode in ("right", "both"):
                self._draw_labels(axs[1], grid_b, id2name, fontsize=label_fontsize)

            # Save this figure explicitly instead of the implicit current one.
            fig.savefig(out_png, dpi=self.dpi, bbox_inches="tight")
        finally:
            # Release the figure even when drawing or saving fails.
            plt.close(fig)

    # ---------- internals ----------
    def _estimate_cell_size(self, df: pd.DataFrame) -> float:
        """Return a grid cell size derived from the layout's rectangles.

        The result is a quarter of the median of each rectangle's smaller
        side, truncated to an integer and at least 1. Returns 5 when the
        frame is empty or has no positive dimension.
        """
        if df.empty:
            return 5
        min_dim = np.minimum(df["width"].to_numpy(), df["height"].to_numpy())
        pos = min_dim[min_dim > 0]
        if pos.size == 0:
            return 5
        return max(1, int(np.median(pos) / 4))

    def _build_shared_mapping(
        self, df_a: pd.DataFrame, df_b: Optional[pd.DataFrame] = None
    ) -> Dict[str, int]:
        """Assign ids starting at 1 to the categories of one or two layouts."""
        cats_a = pd.Index(df_a["Category"].astype(str).unique())
        cats = (
            cats_a.union(pd.Index(df_b["Category"].astype(str).unique()))
            if df_b is not None
            else cats_a
        )
        return {c: i + 1 for i, c in enumerate(cats)}

    def _make_cmap(self, max_id: int):
        """Return ``(cmap, norm)`` mapping id 0 to background and 1..max_id to colours."""
        # pyplot.get_cmap replaces matplotlib.cm.get_cmap, which was removed
        # in Matplotlib 3.9. Look the palette up once, not once per colour.
        palette = plt.get_cmap(self.cmap_name, max(1, max_id))
        colors = ["#F9F9F9"] + [palette(i) for i in range(max_id)]
        cmap = mcolors.ListedColormap(colors)
        norm = mcolors.BoundaryNorm(list(range(0, max_id + 2)), cmap.N)
        return cmap, norm

    def _rasterize_grid(
        self,
        df: pd.DataFrame,
        cell_size: float,
        name2id: Optional[Dict[str, int]] = None,
    ):
        """Paint every rectangle of ``df`` onto an integer grid.

        Args:
            df: Layout frame; rows with missing geometry are ignored.
            cell_size: Side length of one grid cell in layout units.
            name2id: Category-to-id mapping; built from ``df`` when omitted.

        Returns:
            ``(grid, id2name)``. ``grid`` is an ``int32`` array of shape
            ``(H, W)``; a 1x1 background grid is returned when there is
            nothing to draw.

        Raises:
            ValueError: If ``cell_size`` is not positive.
            KeyError: If a category of ``df`` is missing from ``name2id``.
        """
        if not cell_size > 0:
            raise ValueError(f"cell_size must be positive, got {cell_size!r}")
        if name2id is None:
            name2id = self._build_shared_mapping(df)
        id2name = {v: k for k, v in name2id.items()}

        # Rows without complete geometry cannot be converted to grid indices.
        geom = df.dropna(subset=["x", "y", "width", "height"])
        if geom.empty:
            return np.zeros((1, 1), dtype=np.int32), id2name

        x0, y0 = geom["x"].min(), geom["y"].min()
        x1 = (geom["x"] + geom["width"]).max()
        y1 = (geom["y"] + geom["height"]).max()
        pad_x = int((x1 - x0) * self.padding_ratio)
        pad_y = int((y1 - y0) * self.padding_ratio)
        min_x, min_y = x0 - pad_x, y0 - pad_y
        max_x, max_y = x1 + pad_x, y1 + pad_y

        # Keep at least one cell so a zero-sized layout still renders.
        W = max(1, int(math.ceil((max_x - min_x) / cell_size)))
        H = max(1, int(math.ceil((max_y - min_y) / cell_size)))
        if W * H > 10_000_000:  # RAM guard: coarsen the grid
            scale = ((W * H) / 10_000_000) ** 0.5
            cell_size *= scale
            W = max(1, int(math.ceil((max_x - min_x) / cell_size)))
            H = max(1, int(math.ceil((max_y - min_y) / cell_size)))

        grid = np.zeros((H, W), dtype=np.int32)
        rows = zip(
            geom["Category"].astype(str),
            geom["x"],
            geom["y"],
            geom["width"],
            geom["height"],
        )
        for name, x, y, w, h in rows:
            did = name2id[name]
            gx0 = int((x - min_x) // cell_size)
            gx1 = int(math.ceil((x + w - min_x) / cell_size))
            gy0 = int((y - min_y) // cell_size)
            gy1 = int(math.ceil((y + h - min_y) / cell_size))
            # Later rows overwrite earlier ones where rectangles overlap.
            grid[gy0:gy1, gx0:gx1] = did
        return grid, id2name

    def _draw_grid(
        self,
        grid: np.ndarray,
        id2name: Dict[int, str],
        title: str,
        out_png: Path,
        label_fontsize: int,
    ):
        """Draw one grid and save it to ``out_png``.

        Labels are drawn only when ``label_fontsize`` is positive.
        """
        H, W = grid.shape
        max_id = int(np.max(grid, initial=0))
        cmap, norm = self._make_cmap(max_id)

        fig_w = min(30, 18 * (W / max(1, H)))
        fig_h = min(30, 18)
        fig, ax = plt.subplots(figsize=(fig_w, fig_h))
        try:
            ax.imshow(grid, cmap=cmap, norm=norm, interpolation="none")
            if label_fontsize > 0:
                self._draw_labels(ax, grid, id2name, fontsize=label_fontsize)

            ax.set_title(title, fontsize=6)
            self._axes_off(ax)
            fig.savefig(out_png, dpi=self.dpi, bbox_inches="tight")
        finally:
            # Release the figure even when drawing or saving fails.
            plt.close(fig)

    @staticmethod
    def _draw_labels(ax, grid: np.ndarray, id2name: Dict[int, str], fontsize: int = 4):
        """Write each category name at the centroid of its grid cells."""
        for did in np.unique(grid[grid > 0]):
            ys, xs = np.where(grid == did)
            if xs.size == 0:
                continue
            cx, cy = xs.mean(), ys.mean()
            name = id2name.get(int(did), f"ID {did}")
            ax.text(cx, cy, name, va="center", ha="center", fontsize=fontsize)

    @staticmethod
    def _axes_off(ax):
        """Hide the grid, ticks and tick labels of ``ax``."""
        ax.grid(False)
        ax.tick_params(
            axis="both",
            which="both",
            bottom=False,
            left=False,
            labelbottom=False,
            labelleft=False,
        )


from src.config import OUTPUT_DATA_DIR

# Render the optimised layout: a single preview plus a before/after comparison.
# Requires `pipeline` from the earlier cell, after run_final() has set
# `pipeline.layout_opt`.

# savefig does not create missing folders, so make sure the target exists.
Path(OUTPUT_DATA_DIR).mkdir(parents=True, exist_ok=True)

plotter = GAPlotter(padding_ratio=0.06, dpi=220)
df_before = pipeline.data.layout_real.copy()
df_after = pipeline.layout_opt.copy()

# Preview of the optimised layout alone.
plotter.visualize_layout(df_after, OUTPUT_DATA_DIR / "ga_preview.png", show_labels=True)

# Real layout (left) against optimised layout (right), labelled on both sides.
plotter.compare_layouts(
    df_before,
    df_after,
    OUTPUT_DATA_DIR / "ga_compare.png",
    titles=("Trước", "Sau"),
    label_mode="both",
    label_fontsize=5,
)