In [1]:
from dataclasses import dataclass
from pandas import DataFrame
import pygad
import numpy
import pandas as pd
from typing import Callable, Dict, Any, List, Optional
from icecream import ic

from order_optimization.container import ModelInterface

from ordplan_project.settings import MIN_TRIM,PENALTY_VALUE, ROLL_PAPER


@dataclass
class GA():
    orders: DataFrame
    size: float = 66
    num_generations: int = 50
    out_range: int = 6
    showOutput: bool = False
    save_solutions: bool = False
    showZero: bool = False
    selector: Dict[str, Any] | None = None
    set_progress: Callable | None = None
    current_generation: int = 0
    _penalty:int = 0 
    _penalty_value:int = PENALTY_VALUE
    blade:Optional[int] = None
    
    def __post_init__(self):
        if self.orders is None:
            raise ValueError("Orders is empty!")
        self.orders = self.orders[self.orders['quantity'] > 0].reset_index(drop=True)
        self._paper_size  = self.size

        self.model = pygad.GA(
            num_generations=self.num_generations,
            num_parents_mating=60,
            fitness_func=self.fitness_function,
            sol_per_pop=120,
            num_genes=len(self.orders),
            parent_selection_type="tournament",
            gene_type=int,
            init_range_low=0,
            init_range_high=self.out_range,
            crossover_type="uniform",
            mutation_type="random",
            mutation_percent_genes=10,
            on_generation=self.on_gen,
            save_solutions=self.save_solutions,
        )

        
    def paper_type_logic(self, solution):
        init_type = None
        orders = self.orders
        match orders["edge_type"][self.get_first_solution(solution)]:
            case "X":
                init_type = 1
            case "N":
                init_type = 2
            case "W":
                init_type = 2

        if init_type is not None:
            for index, out in enumerate(solution):
                if out >= 1:
                    match init_type:
                        case 1:
                            if orders["edge_type"][index] not in [
                                "X",
                                "Y",
                            ]:  # Changed OR to AND condition
                                self._penalty += self._penalty_value
                        case 2:
                            if orders["edge_type"][index] == "X":
                                self._penalty += self._penalty_value

    def least_order_logic(self, solution):
        init_order = None
        orders = self.orders

        init_order = orders["quantity"][self.get_first_solution(solution)]

        for index, out in enumerate(solution):
            if out >= 1 and orders["quantity"][index] < init_order:
                self._penalty += self._penalty_value

    @staticmethod
    def get_first_solution(solution) -> int:
        for index, out in enumerate(solution):
            if out >= 1:
                return index
        return 0

    def paper_out_logic(self, solution):
        if sum(solution) > 5:
            if sum(solution) <= 6:
                init = 0
                for index, out in enumerate(solution):
                    if out>=1:
                        if self.orders['edge_type'][index]=='X' and init==0:
                            init = 1
                            continue
                        if self.orders['edge_type'][index]=='Y' and init==1:
                            return                
            
            self._penalty += self._penalty_value * sum(solution)  # ยิ่งเกิน ยิ่ง _penaltyเยอะ
        
        
        order_length = 0
        for index, out in enumerate(solution):
            if out >= 1:
                order_length += 1
        if order_length > 2:
            self._penalty += self._penalty_value * order_length  # ยิ่งเกิน ยิ่ง _penaltyเยอะ

    def paper_size_logic(self, _output):
        if _output > self._paper_size :  # ถ้าผลรวมมีค่ามากกว่า roll กำหนดขึ้น _penalty
            self._penalty += self._penalty_value * (
                _output - self._paper_size 
            )  # ยิ่งเกิน ยิ่ง _penaltyเยอะ

    def paper_trim_logic(self, _fitness_values):
        if abs(_fitness_values) <= MIN_TRIM:  # ถ้าผลรวมมีค่าน้อยกว่า _penalty > เงื่อนไขบริษัท
            self._penalty += self._penalty_value

    def selector_logic(self, solution: List[int])->List[int]:
        if self.selector is None:
            return solution
        
        try:
            solution[0] = self.selector["out"] #lock the first to be out (the first order is also the selector, manage by ORD)
        except KeyError:
            pass

        if solution[0] == 0: 
            solution[0] += 1

        return solution

    def fitness_function(self, ga_instance, solution, solution_idx):
        self._penalty = 0

        solution = self.selector_logic(solution)

        self.paper_type_logic(solution)

        self.least_order_logic(solution)

        self.paper_out_logic(solution)

        _output = numpy.sum(solution * self.orders["width"])  # ผลรวมของตัดกว้างทั้งหมด
        self.paper_size_logic(_output)

        _fitness_values = -self._paper_size  + _output  # ผลต่างของกระดาษที่มีกับออเดอร์ ยิ่งเยอะยิ่งดี
        self.paper_trim_logic(_fitness_values)
        return _fitness_values - self._penalty  # ลบด้วย _penalty

    def on_gen(self, ga_instance):

        self.current_generation += 1
        if self.set_progress:
            progress = (self.current_generation / self.num_generations) * 100
            self.set_progress(progress)

        orders = self.orders

        solution = ga_instance.best_solution()[0]

        _output = pd.DataFrame(
            {   
                "id": orders['id'].unique(),
                "blade": 0,
                "order_number": orders["order_number"],
                "num_orders": orders["quantity"],
                "component_type": orders["component_type"],
                "cut_width": orders["width"],
                "cut_len": orders["length"],
                "type": orders["edge_type"],
                "deadline": orders["due_date"],
                "out": solution,
            }
        )

        if not self.showZero:
            _output = _output[_output["out"] >= 1]
        _output = _output.reset_index(drop=True)


        _output = self.blade_logic(_output)

        self._fitness_values = ga_instance.best_solution()[1]
        self._output = _output

        if self.showOutput:
            self.show(ga_instance, _output)

    def blade_logic(self, output: DataFrame) -> DataFrame:
        blade_list: List[Dict[str,int]] = []
        for idx in output.index:
            blade_val = idx+1
            if self.blade is not None:
                blade_val = self.blade
            blade_list.append({"blade": blade_val})

        blade_df = pd.DataFrame(blade_list)
        output = pd.concat([output, blade_df], axis=1)
        return output

    def show(self, ga_instance, _output):
        _paper_size  = self._paper_size 
        print("Generation : ", ga_instance.generations_completed)
        print("Solution :")

        with pd.option_context(
            "display.max_columns",
            None,
            "display.width",
            None,
            "display.colheader_justify",
            "left",
        ):
            print(_output.to_string(index=False))

        print("Roll :", _paper_size )
        print("Used :", _paper_size  + self._fitness_values)
        print("Trim :", abs(self._fitness_values))
        print("\n")

    @property
    def run(self) -> Callable:
        return self.model.run


In [2]:
test_data = pd.DataFrame(
        {
            "order_number": [1, 2, 3, 4, 5],
            "quantity": [100, 200, 1500, 500, 250],
            "component_type": ["A", "B", "C", "D", "E"],
            "width": [66.04, 66.04, 66.04, 66.04, 80],
            "length": [200.0, 200.0, 200.0, 200.0, 200.0],
            "edge_type": ["X", "N", "W", "X", "Y"],
            "due_date": ["08/01/23", "08/01/23", "08/05/23", "08/10/23", "08/15/23"],
            "front_sheet": ["P1", "P2", "P3", "P4", "P5"],
            "c_wave": ["C1", "C2", "C3", "C4", "C5"],
            "middle_sheet": ["M1", "M2", "M3", "M4", "M5"],
            "b_wave": ["B1", "B2", "B3", "B4", "B5"],
            "back_sheet": ["B1", "B2", "B3", "B4", "B5"],
            "level": [1, 2, 3, 1, 2],
            "left_edge_cut": [0, 1, 0, 1, 0],
            "middle_edge_cut": [1, 0, 1, 0, 1],
            "right_edge_cut": [0, 1, 0, 1, 0],
            "id": [1,2,3,4,5]
        }
    )

In [3]:
ga_instance = GA(test_data,showOutput=True)
ga_instance.run()

If you do not want to mutate any gene, please set mutation_type=None.


Generation :  1
Solution :
 id  blade  order_number  num_orders component_type  cut_width  cut_len type deadline  out  blade
4   0      4             500         D              66.04      200.0    X    08/10/23 2    1     
Roll : 66
Used : -65947.92000000001
Trim : 66013.92000000001


Generation :  2
Solution :
 id  blade  order_number  num_orders component_type  cut_width  cut_len type deadline  out  blade
2   0      2             200         B              66.04      200.0    N    08/01/23 1    1     
Roll : 66
Used : -973.9600000000064
Trim : 1039.9600000000064


Generation :  3
Solution :
Empty DataFrame
Columns: [id, blade, order_number, num_orders, component_type, cut_width, cut_len, type, deadline, out]
Index: []
Roll : 66
Used : 0.0
Trim : 66.0


Generation :  4
Solution :
Empty DataFrame
Columns: [id, blade, order_number, num_orders, component_type, cut_width, cut_len, type, deadline, out]
Index: []
Roll : 66
Used : 0.0
Trim : 66.0


Generation :  5
Solution :
Empty DataFrame
