In [5]:
# Full GA with monorail (5 devices), 3 mills, 2 hoists, 32 ovens, and detailed constraints.
# Retry execution with slightly reduced GA size to avoid timeouts.
import random, time
from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple
import pandas as pd

# ---------- Configurable parameters (replace with your real values) ----------
HORIZON = 24 * 360  # minutes
CURING_TIME = 300  # fixed oven time in seconds
NUM_OVENS = 32
OVENS_PER_ROW = 16
NUM_HOISTS = 2
NUM_MILLS = 3
NUM_DEVICES = 5  # rolling devices on monorail
HOIST_OP_DURATION = 6  # load+unload time

# Rolling times per size (minutes) r1,r2,r3
ROLLING_TIME = {
    'S': (23, 18, 26),
    'M': (28, 29, 36),
    'L': (30, 42, 47)
}

# Monorail segment times (minutes) - placeholders; adjust to your layout distances
SEG_TIME = {
    'M1_M2': 35,
    'M2_M3': 32,
    'M3_HA': 20,
    'M3_HB': 26,
    'HA_M1': 15,
    'HB_M1': 15
}

MOVE_INTERNAL = {
    'after_m1': 1,
    'after_m2': 1,
    'after_m3': 1
}

OVENS_BY_PID = {
    'A': list(range(0, 10)),
    'B': list(range(10, 20)),
    'C': list(range(20, 32))
}

BASE_MOVE_TO_OVEN = 3
def move_time_to_oven(oven_id: int) -> int:
    pos = oven_id % OVENS_PER_ROW
    return BASE_MOVE_TO_OVEN + (pos % 4)

def hoist_for_oven(oven_id: int) -> int:
    return 0 if oven_id < OVENS_PER_ROW else 1

@dataclass
class Job:
    id: int
    pid: str
    size: str

@dataclass
class Interval:
    start: int
    end: int  # exclusive

class BusyCalendar:
    def __init__(self):
        self.intervals: List[Interval] = []

    def is_free(self, start: int, duration: int) -> bool:
        end = start + duration
        if start < 0 or end > HORIZON:
            return False
        for it in self.intervals:
            if it.start >= end:
                break
            if it.end <= start:
                continue
            return False
        return True

    def occupy(self, start: int, duration: int):
        end = start + duration
        new = Interval(start, end)
        res = []
        placed = False
        for it in self.intervals:
            if it.end < new.start:
                res.append(it)
            elif it.start > new.end:
                if not placed:
                    res.append(new)
                    placed = True
                res.append(it)
            else:
                new.start = min(new.start, it.start)
                new.end = max(new.end, it.end)
        if not placed:
            res.append(new)
        self.intervals = sorted(res, key=lambda x: x.start)

    def earliest_free(self, earliest_start: int, duration: int) -> Optional[int]:
        if duration <= 0:
            return earliest_start if earliest_start <= HORIZON else None
        if earliest_start < 0: earliest_start = 0
        if not self.intervals:
            if earliest_start + duration <= HORIZON:
                return earliest_start
            return None
        if earliest_start + duration <= self.intervals[0].start:
            return earliest_start
        for i in range(len(self.intervals)-1):
            gap_start = max(earliest_start, self.intervals[i].end)
            gap_end = self.intervals[i+1].start
            if gap_start + duration <= gap_end:
                return gap_start
        gap_start = max(earliest_start, self.intervals[-1].end)
        if gap_start + duration <= HORIZON:
            return gap_start
        return None

def decode_chromosome(chromosome: List[int], jobs: Dict[int, Job]) -> Tuple[int, Dict[int, Dict]]:
    mills = [BusyCalendar() for _ in range(NUM_MILLS)]
    segments = {name: BusyCalendar() for name in SEG_TIME.keys()}
    hoists = [BusyCalendar() for _ in range(NUM_HOISTS)]
    ovens = [BusyCalendar() for _ in range(NUM_OVENS)]

    device_time = [0 for _ in range(NUM_DEVICES)]
    next_job_ptr = 0
    total_jobs = len(chromosome)
    schedule = {jid: {'assigned': False} for jid in jobs.keys()}
    completed = 0

    while next_job_ptr < total_jobs:
        d_idx = min(range(NUM_DEVICES), key=lambda d: device_time[d])
        current_t = device_time[d_idx]
        if current_t >= HORIZON:
            break
        job_id = chromosome[next_job_ptr]
        job = jobs[job_id]
        t = current_t
        r1, r2, r3 = ROLLING_TIME[job.size]

        # find t1
        t1 = mills[0].earliest_free(t, r1)
        if t1 is None:
            schedule[job_id]['assigned'] = False
            next_job_ptr += 1
            next_free = mills[0].earliest_free(t, 1)
            device_time[d_idx] = next_free if next_free is not None else HORIZON
            continue
        t1_end = t1 + r1
        mills[0].occupy(t1, r1)

        # seg M1_M2
        seg = 'M1_M2'; seg_time = SEG_TIME[seg]
        seg_start = segments[seg].earliest_free(t1_end + MOVE_INTERNAL['after_m1'], seg_time)
        if seg_start is None:
            # rollback mill1
            if mills[0].intervals and mills[0].intervals[-1].start == t1:
                mills[0].intervals.pop()
            schedule[job_id]['assigned'] = False
            next_job_ptr += 1
            device_time[d_idx] = t1_end + 1
            continue
        seg_end = seg_start + seg_time
        segments[seg].occupy(seg_start, seg_time)

        # mill2
        t2 = mills[1].earliest_free(seg_end + MOVE_INTERNAL['after_m2'], r2)
        if t2 is None:
            # rollback
            if segments[seg].intervals and segments[seg].intervals[-1].start == seg_start:
                segments[seg].intervals.pop()
            if mills[0].intervals and mills[0].intervals[-1].start == t1:
                mills[0].intervals.pop()
            schedule[job_id]['assigned'] = False
            next_job_ptr += 1
            device_time[d_idx] = seg_end + 1
            continue
        t2_end = t2 + r2
        mills[1].occupy(t2, r2)

        # seg M2_M3
        seg2 = 'M2_M3'; seg2_time = SEG_TIME[seg2]
        seg2_start = segments[seg2].earliest_free(t2_end + MOVE_INTERNAL['after_m2'], seg2_time)
        if seg2_start is None:
            # rollback mills and seg
            if mills[1].intervals and mills[1].intervals[-1].start == t2:
                mills[1].intervals.pop()
            if segments[seg].intervals and segments[seg].intervals[-1].start == seg_start:
                segments[seg].intervals.pop()
            if mills[0].intervals and mills[0].intervals[-1].start == t1:
                mills[0].intervals.pop()
            schedule[job_id]['assigned'] = False
            next_job_ptr += 1
            device_time[d_idx] = t2_end + 1
            continue
        seg2_end = seg2_start + seg2_time
        segments[seg2].occupy(seg2_start, seg2_time)

        # mill3
        t3 = mills[2].earliest_free(seg2_end + MOVE_INTERNAL['after_m3'], r3)
        if t3 is None:
            # rollback
            if segments[seg2].intervals and segments[seg2].intervals[-1].start == seg2_start:
                segments[seg2].intervals.pop()
            if mills[1].intervals and mills[1].intervals[-1].start == t2:
                mills[1].intervals.pop()
            if segments[seg].intervals and segments[seg].intervals[-1].start == seg_start:
                segments[seg].intervals.pop()
            if mills[0].intervals and mills[0].intervals[-1].start == t1:
                mills[0].intervals.pop()
            schedule[job_id]['assigned'] = False
            next_job_ptr += 1
            device_time[d_idx] = seg2_end + 1
            continue
        t3_end = t3 + r3
        mills[2].occupy(t3, r3)

        # choose oven among PID ovens
        candidate_ovens = OVENS_BY_PID.get(job.pid, [])
        chosen = None
        for oven_id in candidate_ovens:
            hoist_idx = hoist_for_oven(oven_id)
            seg_name = 'M3_HA' if hoist_idx == 0 else 'M3_HB'
            seg3_time = SEG_TIME[seg_name]
            seg3_start = segments[seg_name].earliest_free(t3_end + MOVE_INTERNAL['after_m3'], seg3_time)
            if seg3_start is None:
                continue
            seg3_end = seg3_start + seg3_time
            hoist_ready = hoists[hoist_idx].earliest_free(seg3_end, HOIST_OP_DURATION)
            if hoist_ready is None:
                continue
            move_to_oven = move_time_to_oven(oven_id)
            oven_start_candidate = max(seg3_end + move_to_oven, hoist_ready + HOIST_OP_DURATION)
            if oven_start_candidate + CURING_TIME <= HORIZON and ovens[oven_id].is_free(oven_start_candidate, CURING_TIME):
                chosen = (oven_id, seg_name, seg3_start, hoist_idx, hoist_ready, oven_start_candidate)
                break

        if chosen is None:
            # rollback mills and segments
            if mills[2].intervals and mills[2].intervals[-1].start == t3:
                mills[2].intervals.pop()
            if segments['M2_M3'].intervals and segments['M2_M3'].intervals[-1].start == seg2_start:
                segments['M2_M3'].intervals.pop()
            if mills[1].intervals and mills[1].intervals[-1].start == t2:
                mills[1].intervals.pop()
            if segments['M1_M2'].intervals and segments['M1_M2'].intervals[-1].start == seg_start:
                segments['M1_M2'].intervals.pop()
            if mills[0].intervals and mills[0].intervals[-1].start == t1:
                mills[0].intervals.pop()
            schedule[job_id]['assigned'] = False
            next_job_ptr += 1
            device_time[d_idx] = t3_end + 1
            continue

        oven_id, seg3_name, seg3_start, hoist_idx, hoist_ready, oven_start = chosen
        seg3_dur = SEG_TIME[seg3_name]
        segments[seg3_name].occupy(seg3_start, seg3_dur)
        hoists[hoist_idx].occupy(hoist_ready, HOIST_OP_DURATION)
        ovens[oven_id].occupy(oven_start, CURING_TIME)

        # return segment from hoist to M1
        ret_seg = 'HA_M1' if hoist_idx==0 else 'HB_M1'
        ret_dur = SEG_TIME[ret_seg]
        ret_start = segments[ret_seg].earliest_free(hoist_ready + HOIST_OP_DURATION, ret_dur)
        if ret_start is None:
            # schedule at earliest possible (use earliest_free again)
            ret_start = segments[ret_seg].earliest_free(hoist_ready + HOIST_OP_DURATION, ret_dur)
            if ret_start is None:
                device_time[d_idx] = HORIZON
            else:
                segments[ret_seg].occupy(ret_start, ret_dur)
                device_time[d_idx] = ret_start + ret_dur
        else:
            segments[ret_seg].occupy(ret_start, ret_dur)
            device_time[d_idx] = ret_start + ret_dur

        schedule[job_id] = {
            'assigned': True, 'device': d_idx,
            'mill1_start': t1, 'mill1_end': t1_end,
            'mill2_start': t2, 'mill2_end': t2_end,
            'mill3_start': t3, 'mill3_end': t3_end,
            'seg_M1_M2_start': seg_start, 'seg_M1_M2_end': seg_end,
            'seg_M2_M3_start': seg2_start, 'seg_M2_M3_end': seg2_end,
            'seg_M3_H_start': seg3_start, 'seg_M3_H_end': seg3_start+seg3_dur,
            'hoist_idx': hoist_idx, 'hoist_start': hoist_ready,
            'oven_id': oven_id, 'oven_start': oven_start, 'oven_end': oven_start + CURING_TIME
        }

        if schedule[job_id]['oven_end'] <= HORIZON:
            completed += 1
        next_job_ptr += 1

    return completed, schedule

# ---------- GA functions ----------
def generate_jobs(n: int, pids: List[str]=None) -> Dict[int, Job]:
    if pids is None: pids = list(OVENS_BY_PID.keys())
    sizes = list(ROLLING_TIME.keys())
    jobs = {}
    for i in range(n):
        pid = random.choice(pids)
        size = random.choices(sizes, weights=[0.4,0.4,0.2])[0]
        jobs[i] = Job(i, pid, size)
    return jobs

def initial_population(job_ids: List[int], pop_size: int):
    pop = []
    sorted_ids = sorted(job_ids, key=lambda j: sum(ROLLING_TIME[jobs[j].size]))
    pop.append(sorted_ids.copy())
    for _ in range(pop_size-1):
        indiv = job_ids.copy(); random.shuffle(indiv); pop.append(indiv)
    return pop

def tournament_select(pop, fits, k=3):
    idxs = random.sample(range(len(pop)), k)
    best = max(idxs, key=lambda i: fits[i])
    return pop[best].copy()

def pmx(p1,p2):
    n=len(p1); a,b=sorted(random.sample(range(n),2))
    child=[None]*n; child[a:b+1]=p1[a:b+1]
    for i in range(a,b+1):
        if p2[i] not in child:
            val=p2[i]; pos=i
            while True:
                val_in_p1=p1[pos]; pos=p2.index(val_in_p1)
                if child[pos] is None:
                    child[pos]=p2[i]; break
    for i in range(n):
        if child[i] is None: child[i]=p2[i]
    return child

def swap_mut(indiv, rate=0.2):
    if random.random() < rate:
        i,j=random.sample(range(len(indiv)),2); indiv[i],indiv[j]=indiv[j],indiv[i]
    return indiv

def run_ga(jobs: Dict[int, Job], pop_size=30, generations=40, cx_rate=0.8, mut_rate=0.2):
    job_ids=list(jobs.keys())
    pop=initial_population(job_ids, pop_size)
    best_fit=-1; best_ind=None; best_sched=None; history=[]
    for gen in range(generations):
        fits=[]; scheds=[]
        for indiv in pop:
            fit,sched = decode_chromosome(indiv, jobs)
            fits.append(fit); scheds.append(sched)
            if fit>best_fit:
                best_fit=fit; best_ind=indiv.copy(); best_sched=sched
        history.append(max(fits))
        new_pop=[pop[max(range(len(pop)), key=lambda i: fits[i])].copy()]
        while len(new_pop)<pop_size:
            p1=tournament_select(pop,fits); p2=tournament_select(pop,fits)
            if random.random()<cx_rate:
                c1=pmx(p1,p2); c2=pmx(p2,p1)
            else:
                c1=p1.copy(); c2=p2.copy()
            c1=swap_mut(c1,mut_rate); c2=swap_mut(c2,mut_rate)
            new_pop.extend([c1,c2])
        pop=new_pop[:pop_size]
        if (gen+1)%10==0 or gen==0:
            print(f"Gen {gen+1} best_completed={best_fit}")
    return best_fit, best_ind, best_sched, history

# ---------- Demo ----------
random.seed(1234)
NUM_JOBS = 500
jobs = generate_jobs(NUM_JOBS)
start=time.time()
best_fit,best_ind,best_sched,history = run_ga(jobs, pop_size=30, generations=40, cx_rate=0.85, mut_rate=0.18)
elapsed=time.time()-start
print(f"\nGA done in {elapsed:.1f}s. Best completed tires: {best_fit} / {len(jobs)}")

# # Prepare hourly summary and sample scheduled jobs
# completed_ends = []
# for jid,info in best_sched.items():
#     if info.get('assigned') and info.get('oven_end') is not None and info.get('oven_end')<=HORIZON:
#         completed_ends.append(info['oven_end'])
# hourly=[0]*24
# for t in completed_ends:
#     h=min(23,t//60); hourly[h]+=1
# df=pd.DataFrame({'hour':list(range(24)),'completed':hourly,'cumulative':pd.Series(hourly).cumsum()})
# import caas_jupyter_tools as cjt; cjt.display_dataframe_to_user("Hourly Completed Tires", df)

# sched_list=[]
# for jid,info in best_sched.items():
#     if info.get('assigned') and info.get('oven_end') is not None:
#         sched_list.append({'job_id':jid,'pid':jobs[jid].pid,'size':jobs[jid].size,'device':info['device'],'oven_id':info['oven_id'],'oven_start':info['oven_start'],'oven_end':info['oven_end']})
# sched_df=pd.DataFrame(sched_list).sort_values('oven_start').head(60)
# cjt.display_dataframe_to_user("Sample Scheduled Jobs", sched_df)

# print("\nReplace timing parameters and OVENS_BY_PID with your real data to re-run.")


Gen 1 best_completed=219
Gen 10 best_completed=219
Gen 20 best_completed=219
Gen 30 best_completed=219
Gen 40 best_completed=219

GA done in 71.6s. Best completed tires: 219 / 500
