In [None]:
import numpy as np
import matplotlib.pyplot as plt
from pymoo.core.problem import Problem
from pymoo.algorithms.soo.nonconvex.ga import GA
from pymoo.termination.default import DefaultSingleObjectiveTermination
from pymoo.optimize import minimize
from pymoo.operators.crossover.ox import OrderCrossover
from pymoo.operators.crossover.ux import UniformCrossover
from pymoo.core.mutation import Mutation
from pymoo.core.crossover import Crossover
from pymoo.core.sampling import Sampling

class CustomHybridMutation(Mutation):
    def __init__(self, prob=0.1):
        super().__init__()
        self.prob = prob

    def _do(self, problem, X, **kwargs):
        n_individuals, n_var = X.shape
        seq_length = problem.seq_length
        n_stages = problem.n_stages
        Y = X.copy()

        for i in range(n_individuals):
            if np.random.random() < self.prob:
                idx1, idx2 = np.random.choice(seq_length, size=2, replace=False)
                Y[i, :seq_length][[idx1, idx2]] = Y[i, :seq_length][[idx2, idx1]]

            machine_choices = Y[i, seq_length:].reshape(seq_length, n_stages)
            for j in range(seq_length):
                for s in range(n_stages):
                    if np.random.random() < self.prob:
                        machine_choices[j, s] = np.random.randint(0, problem.machines_per_stage[s])
            Y[i, seq_length:] = machine_choices.flatten()

        return Y.astype(int)

class CustomHybridCrossover(Crossover):
    def __init__(self, prob=0.5):
        super().__init__(2, 2)
        self.prob = prob
        self.order_cx = OrderCrossover()
        self.uniform_cx = UniformCrossover(prob=prob)

    def _do(self, problem, X, **kwargs):
        n_parents, n_matings, n_var = X.shape
        seq_length = problem.seq_length
        Y = np.zeros_like(X)

        for k in range(n_matings):
            a, b = X[:, k, :]
            perm_a, perm_b = self.order_cx._do(problem, np.array([[a[:seq_length]], [b[:seq_length]]]))[:, 0, :]
            machine_a, machine_b = self.uniform_cx._do(problem, np.array([[a[seq_length:]], [b[seq_length:]]]))[:, 0, :]
            Y[0, k, :seq_length] = perm_a
            Y[1, k, :seq_length] = perm_b
            Y[0, k, seq_length:] = machine_a
            Y[1, k, seq_length:] = machine_b

        return Y.astype(int)

class CustomHybridSampling(Sampling):
    def _do(self, problem, n_samples, **kwargs):
        seq_length = problem.seq_length
        n_stages = problem.n_stages
        X = np.zeros((n_samples, problem.n_var), dtype=int)

        for i in range(n_samples):
            X[i, :seq_length] = np.random.permutation(seq_length)
            machine_choices = np.zeros((seq_length, n_stages), dtype=int)
            for j in range(seq_length):
                for s in range(n_stages):
                    machine_choices[j, s] = np.random.randint(0, problem.machines_per_stage[s])
            X[i, seq_length:] = machine_choices.flatten()

        return X

class BakeryHybridSchedulingProblem(Problem):
    def __init__(self, user_sequence, recipe_ids, recipe_id_to_index, machines_per_stage, processing_times, changeover_times, batch_sizes, tact_times, recipe_required_stages, debug=False):
        self.seq_length = len(user_sequence)
        self.user_sequence = np.array(user_sequence)
        self.recipe_ids = recipe_ids  # Store recipe_ids for mapping
        self.recipe_id_to_index = recipe_id_to_index
        self.n_stages = len(machines_per_stage)
        self.machines_per_stage = machines_per_stage
        self.max_makespan = 0
        self.max_machines = max(machines_per_stage)
        self.debug = debug
        self.recipe_required_stages = recipe_required_stages
        n_var = self.seq_length + (self.seq_length * self.n_stages)
        super().__init__(
            n_var=n_var,
            n_obj=1,
            n_constr=0,
            xl=[0] * n_var,
            xu=[self.seq_length - 1] * self.seq_length + [m - 1 for m in machines_per_stage for _ in range(self.seq_length)],
            type_var=int
        )
        self.processing_times = processing_times
        self.changeover_times = changeover_times
        self.batch_sizes = batch_sizes
        self.tact_times = tact_times

    def _evaluate(self, X, out, *args, **kwargs):
        makespans = np.array([self.calculate_makespan(x, store_best=False) for x in X])
        out["F"] = makespans

    def calculate_makespan(self, x, store_best=True):
        perm = x[:self.seq_length]
        machine_choices = x[self.seq_length:].reshape(self.seq_length, self.n_stages)
        seq = self.user_sequence[perm]
        start_times = np.zeros((self.seq_length, self.n_stages))
        end_times = np.zeros((self.seq_length, self.n_stages))
        changeover_times_array = np.zeros((self.seq_length, self.n_stages))
        machine_free_times = np.zeros((self.n_stages, self.max_machines))

        if self.debug:
            print(f"Perm: {perm}, Sequence: {[self.recipe_ids[idx] for idx in seq]}")
            print(f"Machine choices:\n{machine_choices}")

        for i in range(self.seq_length):
            recipe_idx = seq[i]
            recipe = self.recipe_ids[recipe_idx]  # Map index to recipe ID
            required_stages = self.recipe_required_stages[recipe]
            prev_end = 0

            for s in required_stages:
                m = machine_choices[i, s]
                changeover = 0
                if i > 0 and s in self.recipe_required_stages[self.recipe_ids[seq[i-1]]]:
                    prev_recipe = self.recipe_ids[seq[i-1]]
                    if prev_recipe != recipe:
                        changeover = self.changeover_times[s, m, self.recipe_id_to_index[prev_recipe], self.recipe_id_to_index[recipe]]
                changeover_times_array[i, s] = changeover

                start_times[i, s] = max(machine_free_times[s, m], prev_end) + changeover
                processing_duration = self.processing_times[s, m, self.recipe_id_to_index[recipe]]
                batch_delay = (self.batch_sizes[i] - 1) * self.tact_times[self.recipe_id_to_index[recipe]]
                end_times[i, s] = start_times[i, s] + processing_duration + batch_delay

                machine_free_times[s, m] = end_times[i, s]
                prev_end = end_times[i, s]

                if self.debug:
                    print(f"Recipe {i} (ID {recipe}): Stage {s}, Machine {m}: Start = {start_times[i, s]}, Changeover = {changeover}, Processing = {processing_duration}, Batch Delay = {batch_delay}, End = {end_times[i, s]}, Machine free at = {machine_free_times[s, m]}")

        makespan = np.max(end_times)
        if self.debug:
            print(f"Calculated makespan in calculate_makespan: {makespan}")

        if store_best:
            self.best_start_times = start_times
            self.best_end_times = end_times
            self.best_machine_choices = machine_choices
            self.best_sequence_ids = [self.recipe_ids[idx] for idx in seq]
            self.best_changeover_times = changeover_times_array
            self.max_makespan = makespan

        return makespan

    def plot_gantt_chart(self):
        fig, ax = plt.subplots(figsize=(12, 6))
        stage_labels = [f"Stage {s} (Machine {m})" for s in range(self.n_stages) for m in range(self.machines_per_stage[s])]
        y_positions = np.arange(len(stage_labels))
        colors = {recipe_id: plt.cm.Set3(i) for i, recipe_id in enumerate(np.unique(self.best_sequence_ids))}

        for i in range(self.seq_length):
            recipe = self.best_sequence_ids[i]
            required_stages = self.recipe_required_stages[recipe]
            for s in required_stages:
                m = self.best_machine_choices[i, s]
                stage_idx = sum(self.machines_per_stage[:s]) + m
                if self.best_start_times[i, s] < self.best_end_times[i, s]:
                    if self.best_changeover_times[i, s] > 0:
                        ax.barh(y_positions[stage_idx], self.best_changeover_times[i, s],
                                left=self.best_start_times[i, s], height=0.5, color='red', edgecolor='black', alpha=0.7,
                                label='Changeover' if i == 0 and s == 0 else "")
                        process_start = self.best_start_times[i, s] + self.best_changeover_times[i, s]
                    else:
                        process_start = self.best_start_times[i, s]
                    process_duration = self.best_end_times[i, s] - process_start
                    ax.barh(y_positions[stage_idx], process_duration,
                            left=process_start, height=0.5, color=colors[recipe], edgecolor='black')
                    ax.text(process_start + process_duration / 2, y_positions[stage_idx], f"{recipe}",
                            ha='center', va='center')

        for s in range(self.n_stages):
            for m in range(self.machines_per_stage[s]):
                stage_idx = sum(self.machines_per_stage[:s]) + m
                ax.axhspan(stage_idx - 0.5, stage_idx + 0.5, color='beige' if s % 2 == 0 else 'white', alpha=0.3)

        ax.set_yticks(y_positions)
        ax.set_yticklabels(stage_labels)
        ax.set_xlabel('Time')
        ax.set_title('Data Production Scheduling - Multi-stage Workshop Optimization')
        ax.legend()
        plt.show()

# Example setup
recipe_ids = [1, 3, 4, 5, 8, 14]
recipe_id_to_index = {1: 0, 3: 1, 4: 2, 5: 3, 8: 4, 14: 5}

# User's sequence (8 positions)
user_sequence_ids = [3, 1, 5, 8, 14, 3, 5, 5]
user_sequence_idx = [1, 0, 3, 4, 5, 1, 3, 3]

# Problem parameters
n_stages = 5
n_recipes = len(recipe_ids)
machines_per_stage = [2, 3, 2, 2, 3]
max_machines = max(machines_per_stage)

# Sample data
np.random.seed(42)
processing_times = np.random.randint(5, 20, size=(n_stages, max_machines, n_recipes))
changeover_times = np.random.randint(0, 5, size=(n_stages, max_machines, n_recipes, n_recipes))
for s in range(n_stages):
    for m in range(max_machines):
        for r in range(n_recipes):
            changeover_times[s, m, r, r] = 0
batch_sizes = [10, 20, 15, 5, 30, 15, 25, 25]
tact_times = np.random.uniform(0.5, 2.0, size=n_recipes)

# Define required stages per recipe (stage 0 is mandatory)
recipe_required_stages = {
    1: [0, 1, 3],
    3: [0, 2, 4],
    4: [0, 1, 2, 4],
    5: [0, 1, 2, 3, 4],
    8: [0, 3, 4],
    14: [0, 1, 2]
}

# Initialize the problem
problem = BakeryHybridSchedulingProblem(
    user_sequence=user_sequence_idx,
    recipe_ids=recipe_ids,  # Pass recipe_ids
    recipe_id_to_index=recipe_id_to_index,
    machines_per_stage=machines_per_stage,
    processing_times=processing_times,
    changeover_times=changeover_times,
    batch_sizes=batch_sizes,
    tact_times=tact_times,
    recipe_required_stages=recipe_required_stages,
    debug=True
)

# Set up the genetic algorithm
algorithm = GA(
    pop_size=50,
    sampling=CustomHybridSampling(),
    crossover=CustomHybridCrossover(prob=0.5),
    mutation=CustomHybridMutation(prob=0.1),
    eliminate_duplicates=True
)

# Run the optimization
res = minimize(
    problem,
    algorithm,
    termination=DefaultSingleObjectiveTermination(
        xtol=1e-6, cvtol=1e-6, ftol=1e-6, period=20, n_max_gen=100
    ),
    verbose=True
)

# Extract and display results
best_perm = res.X[:problem.seq_length]
best_machine_choices = res.X[problem.seq_length:].reshape(problem.seq_length, problem.n_stages)
best_sequence_ids = [recipe_ids[problem.user_sequence[i]] for i in best_perm]

print("Original sequence of recipe IDs:", [recipe_ids[idx] for idx in user_sequence_idx])
print("Best permutation of indices:", best_perm)
print("Best sequence of recipe IDs:", best_sequence_ids)
print("Best machine choices per recipe and stage:\n", best_machine_choices)

# Recompute makespan with the best solution
max_makespan = problem.calculate_makespan(res.X)
unique_machines_used = sum(len(np.unique(best_machine_choices[:, s])) for s in range(problem.n_stages))
penalty_weight = 5.0
penalized_makespan = max_makespan + penalty_weight * unique_machines_used

print(f"Unique machines used: {unique_machines_used}, Penalty: {penalty_weight * unique_machines_used}, Penalized makespan: {penalized_makespan}")
print(f"Best makespan: {max_makespan}")

# Plot Gantt chart
problem.plot_gantt_chart()

ImportError: cannot import name 'RandomSampling' from 'pymoo.operators.sampling.rnd' (c:\Users\NHPHUC\Desktop\schantt-model\.venv\lib\site-packages\pymoo\operators\sampling\rnd.py)