In [1]:
import numpy as np

NUM_SLOTS = 48   # 12 hours, 15-min slots
START_HOUR = 10  # schedule starts at 10:00

def hhmm_to_slot(hhmm: str) -> int:
    """
    Map a clock time written as 'hhmm' onto a 15-minute slot index.

    Slot 0 starts at START_HOUR:00. Minutes are floored to the quarter hour,
    and the result is clamped into [0, NUM_SLOTS - 1], so times before the
    schedule start map to 0 and times at or past the end map to the last slot.
    """
    hours, minutes = divmod(int(hhmm), 100)
    raw_index = 4 * (hours - START_HOUR) + minutes // 15

    # Clamp to the valid slot range.
    if raw_index < 0:
        return 0
    last_slot = NUM_SLOTS - 1
    if raw_index > last_slot:
        return last_slot
    return raw_index

def parse_availability(avail_str: str, inclusive_end: bool = True) -> np.ndarray:
    """
    Convert an availability string (e.g., '1000-1200,2030-2200')
    into a binary numpy array of length NUM_SLOTS.

    Args:
        avail_str: Comma-separated 'hhmm-hhmm' ranges. Empty pieces (an empty
            string, a trailing comma, stray whitespace) are ignored, so an
            officer with no availability yields an all-zero row.
        inclusive_end: When True (default), the slot that the end time maps to
            is also marked as working. When False, the end slot is exclusive.

    Returns:
        np.ndarray of ints, 1 for a working slot and 0 otherwise.

    Note:
        hhmm_to_slot already clamps to the last valid slot, so no further
        clamping of the end slot is needed before slicing.
    """
    schedule = np.zeros(NUM_SLOTS, dtype=int)

    for rng in avail_str.split(','):
        rng = rng.strip()
        if not rng:
            continue  # nothing to mark for an empty piece

        start, end = rng.split('-')
        start_slot = hhmm_to_slot(start)
        end_slot = hhmm_to_slot(end)

        # Slice stop is exclusive, so add one to include the end slot itself.
        stop = end_slot + 1 if inclusive_end else end_slot
        schedule[start_slot:stop] = 1

    return schedule


def build_officer_schedules(input_avail):
    """
    Turn a list of availability strings into officer IDs plus a schedule matrix.

    Officers are named 'O1', 'O2', ... in input order, and each availability
    string is parsed with parse_availability into one matrix row.

    Returns:
        (officer_names, base_schedules) where base_schedules is a 2D numpy
        array with one row per officer.
    """
    officer_names = []
    rows = []
    for position, avail in enumerate(input_avail, start=1):
        officer_names.append(f"O{position}")
        rows.append(parse_availability(avail))

    return officer_names, np.vstack(rows)


# === Example usage ===
# NOTE(review): this first roster (34 availability strings) is a dead store:
# `input_avail` is reassigned by the list right below before anything reads it.
input_avail = [
    '1000-1200','2000-2200','1300-1430,2030-2200','1300-1430,2030-2200',
    '1300-1430,2030-2200,1000-1130','1000-1600','1000-1600','1030-1900',
    '1030-1900','1030-1900','1030-1900','1030-1900','1100-2200','1100-2200',
    '1100-2200','1200-2200','1200-2200','1145-1830','1145-2200','1200-2200',
    '1145-2200','1145-2200','1230-1400','1130-1300','1300-1430','1230-1630',
    '1600-1830','1600-1830','1400-1830','1400-1830','1000-1200','2000-2200',
    '1800-2030','1700-2200'
]

# Effective roster: 28 availability strings, one per officer, each a
# comma-separated list of 'hhmm-hhmm' ranges.
input_avail = [
    '1000-1200','2000-2200','1300-1430,2030-2200','1300-1430,2030-2200',
    '1300-1430,2030-2200,1000-1130','1000-1600','1000-1600','1030-1900',
    '1030-1900','1030-1900','1030-1900','1030-1900','1100-2200','1100-2200',
    '1100-2200','1145-1830','1230-1400','1130-1300','1300-1430','1230-1630',
    '1600-1830','1600-1830','1400-1830','1400-1830','1000-1200','2000-2200',
    '1800-2030','1700-2200'
]


# Parse the roster into officer IDs and a (num_officers, NUM_SLOTS) 0/1 matrix.
officer_names, base_schedules = build_officer_schedules(input_avail)

# Quick sanity check of the parsed result.
print("Officer names:", officer_names[:5])
print("Base schedules shape:", base_schedules.shape)
print("Officer_1 schedule:", base_schedules[0])


Officer names: ['O1', 'O2', 'O3', 'O4', 'O5']
Base schedules shape: (28, 48)
Officer_1 schedule: [1 1 1 1 1 1 1 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0]


In [3]:
import numpy as np

def generate_break_schedules(base_schedules, officer_names):
    """
    Enumerate candidate break schedules for every officer.

    For each officer, the row `base_schedules[idx]` (1 = working) is split into
    stretches of consecutive working slots. If no stretch is longer than 10
    slots, the row is kept unchanged as the only candidate. Otherwise breaks
    (slots set to 0) are placed recursively inside the long stretches and every
    distinct result that has no run of more than 10 consecutive 1s is collected.

    Args:
        base_schedules: 2D array indexed as base_schedules[idx] per officer.
        officer_names: officer IDs, in the same order as the rows.

    Returns:
        dict mapping officer ID -> list of candidate schedules (numpy arrays).
        The list always has at least one entry: when no valid candidate is
        found, the unmodified base row is stored instead.
    """
    def sliding_window_ok(schedule):
        """Check that no more than 10 consecutive 1s exist."""
        consec = 0
        for x in schedule:
            if x == 1:
                consec += 1
                if consec > 10:
                    return False
            else:
                consec = 0
        return True

    all_schedules = {}

    for idx, officer in enumerate(officer_names):
        base = base_schedules[idx].copy()
        work_slots = np.where(base == 1)[0]

        # Officer never works: the all-non-working row is the only candidate.
        if len(work_slots) == 0:
            all_schedules[officer] = [base.copy()]
            #print(f"[{officer}] No working slots. Stored as-is: {base}")
            continue

        # Build original consecutive working stretches
        # (each stretch is a list of adjacent slot indices).
        stretches = []
        cur = [work_slots[0]]
        for s in work_slots[1:]:
            if s == cur[-1] + 1:
                cur.append(s)
            else:
                stretches.append(cur)
                cur = [s]
        stretches.append(cur)

        # If all stretches ≤10, store schedule as valid directly
        if all(len(stretch) <= 10 for stretch in stretches):
            all_schedules[officer] = [base.copy()]
            #print(f"[{officer}] All stretches ≤10 slots. Stored as valid: {base}")
            continue

        #print(f"[{officer}] Some stretches >10 slots. Executing mandatory-break placement.")

        # Accepted candidates for this officer; `seen_schedules` holds the raw
        # bytes of each accepted array so duplicates are stored only once.
        valid_schedules = []
        seen_schedules = set()

        def finalize_schedule(schedule, last_break_end, last_break_len):
            """Try 1-slot breaks if sliding-window violated after mandatory breaks."""
            # Already valid: record it (deduplicated) and stop.
            if sliding_window_ok(schedule):
                sig = schedule.tobytes()
                if sig not in seen_schedules:
                    seen_schedules.add(sig)
                    valid_schedules.append(schedule)
                    #print(f"[{officer}] Schedule valid after mandatory breaks: {schedule}")
                return

            # Try inserting a 1-slot break in all working intervals.
            # Only a single extra 1-slot break is tried per schedule; combinations
            # of several extra breaks are not explored.
            for s in range(len(schedule)):
                if schedule[s] != 1:
                    continue

                # Determine current working interval dynamically
                # (scan right, then left, from s until a non-working slot).
                next_break_index = s
                while next_break_index < len(schedule) and schedule[next_break_index] == 1:
                    next_break_index += 1
                interval_end = next_break_index - 1

                prev_break_index = s
                while prev_break_index >= 0 and schedule[prev_break_index] == 1:
                    prev_break_index -= 1
                interval_start = prev_break_index + 1

                # First/last 4 slots rule
                # NOTE(review): with <= and >= this skips offsets 0..4 from each
                # end of the run, i.e. five slots rather than four — confirm intent.
                if s <= interval_start + 4 or s >= interval_end - 4:
                    continue

                # Spacing rule: required gap after the last mandatory break is
                # min(2 * its length, 4) slots; no requirement if none was placed.
                required_gap = min(2 * last_break_len, 4) if last_break_end >= 0 else 0
                if s - last_break_end - 1 < required_gap:
                    continue

                cand = schedule.copy()
                cand[s] = 0
                if sliding_window_ok(cand):
                    sig = cand.tobytes()
                    if sig not in seen_schedules:
                        seen_schedules.add(sig)
                        valid_schedules.append(cand)
                        #print(f"[{officer}] 1-slot break placed at {s} → schedule OK: {cand}")

            #if not valid_schedules:
                #print(f"[{officer}] No feasible 1-slot break placement, rejecting schedule: {schedule}")

        def place_breaks(schedule, stretch_idx=0, last_break_end=-1, last_break_len=0):
            """Recursive placement of mandatory breaks."""
            # All stretches processed: validate / patch the finished schedule.
            if stretch_idx >= len(stretches):
                finalize_schedule(schedule, last_break_end, last_break_len)
                return

            stretch = stretches[stretch_idx]
            min_slot, max_slot = stretch[0], stretch[-1]
            stretch_len = len(stretch)

            # Skip small stretches ≤10
            if stretch_len <= 10:
                place_breaks(schedule, stretch_idx + 1, last_break_end, last_break_len)
                return

            # Determine mandatory break pattern (list of break lengths, in slots,
            # placed left to right within this stretch).
            if stretch_len >= 36:
                pattern = [2, 3, 3]
            elif stretch_len >= 20:
                pattern = [2,3]
            elif stretch_len >=10:
                pattern = [2]
            else:  # NOTE(review): unreachable — stretch_len is > 10 here (see guard above)
                pattern = [0]

            def recurse(schedule, blens, last_break_end, last_break_len):
                # No break lengths left for this stretch: move to the next stretch.
                if not blens:
                    place_breaks(schedule, stretch_idx + 1, last_break_end, last_break_len)
                    return

                blen = blens[0]

                # Determine the start of the current working interval
                # NOTE(review): last_break_end is also carried over from earlier
                # stretches, so for the first break of a later stretch this is
                # based on the previous stretch's break, not on min_slot — confirm.
                interval_start = min_slot
                if last_break_end >= 0:
                    interval_start = last_break_end + 1

                # Maximum allowed start to ensure no >10 consecutive slots
                max_consec_start = interval_start + 10
                # max_slot - blen - 3 keeps 4 working slots after the break ends.
                max_allowed = min(max_consec_start, max_slot - blen - 3)
                # also respect last 4 slots

                for s in range(interval_start + 4, max_allowed + 1):  # respect first 4 slots
                    # Spacing rule
                    required_gap = min(2 * last_break_len, 4) if last_break_end >= 0 else 0
                    if s - last_break_end - 1 < required_gap:
                        continue

                    # Only place break if all slots are working
                    if not np.all(schedule[s:s + blen] == 1):
                        continue

                    new_sched = schedule.copy()
                    new_sched[s:s + blen] = 0
                    #print(f"[{officer}] Placing mandatory break {blen} at {s}-{s + blen - 1}")
                    #print(f"Partial schedule: {new_sched}")

                    # s + blen is the index of the first slot after this break.
                    recurse(new_sched, blens[1:], s + blen, blen)


            recurse(schedule, pattern, last_break_end, last_break_len)

        # Run recursion
        place_breaks(base.copy())

        # Fall back to the unmodified base row when nothing valid was found.
        all_schedules[officer] = valid_schedules if valid_schedules else [base.copy()]
        #print(f"[{officer}] Finished. Number of valid schedules: {len(valid_schedules)}\n")

    return all_schedules

# all_break_schedules = generate_break_schedules(base_schedules, officer_names)


In [10]:
import numpy as np

def greedy_smooth_schedule(schedule_matrix, all_break_schedule):
    """
    Pick one candidate break schedule per officer, greedily, in officer order.

    schedule_matrix: I x L, 1 = working, 0 = outside work hours
    all_break_schedule: dict {officer_id: list of candidate schedules},
        keyed 'O1', 'O2', ... matching the row order of schedule_matrix

    For each officer, the candidate that yields the fewest changes between
    adjacent entries of the per-slot head count is applied; ties keep the
    earliest candidate. Officers without candidates are skipped.

    Returns:
        chosen_schedule_indices: chosen candidate index per officer (None if skipped)
        work_count: per-slot head count after the chosen breaks are applied
        min_penalty: number of adjacent head-count changes in work_count
    """
    num_officers, _num_slots = schedule_matrix.shape
    work_count = schedule_matrix.sum(axis=0)
    chosen_schedule_indices = [None] * num_officers

    def slope_changes(counts):
        # Number of positions where neighbouring head counts differ.
        return np.sum(np.diff(counts) != 0)

    min_penalty = slope_changes(work_count)

    for officer in range(num_officers):
        candidates = all_break_schedule.get(f'O{officer + 1}', [])
        if not candidates:
            continue

        on_duty = schedule_matrix[officer] == 1
        best_idx = 0
        best_removed = np.where(on_duty & (candidates[0] == 0))[0]
        officer_min_penalty = float('inf')

        for idx, candidate in enumerate(candidates):
            # Slots the officer would give up under this candidate.
            removed = np.where(on_duty & (candidate == 0))[0]

            if len(removed) == 0:
                penalty = min_penalty  # head count unchanged
            else:
                trial = work_count.copy()
                trial[removed] -= 1
                penalty = slope_changes(trial)

            if penalty < officer_min_penalty:
                officer_min_penalty = penalty
                best_idx = idx
                best_removed = removed

        # Commit the winner before moving on to the next officer.
        chosen_schedule_indices[officer] = best_idx
        work_count[best_removed] -= 1
        min_penalty = officer_min_penalty

    return chosen_schedule_indices, work_count, min_penalty

# Compute the inputs and the greedy selection in this cell so it runs on a
# fresh kernel: the names printed below are otherwise only assigned by later cells.
all_break_schedules = generate_break_schedules(base_schedules, officer_names)
chosen_schedule_indices, best_work_count, min_penalty = greedy_smooth_schedule(
    base_schedules, all_break_schedules
)
print(chosen_schedule_indices)
print(best_work_count)
print(min_penalty)

[0, 0, 0, 0, 0, 2, 2, 161, 2, 49, 161, 2, 114, 426, 286, 11, 0, 0, 0, 6, 0, 0, 6, 6, 0, 0, 0, 3]
[ 5  5 10 10 11 11 12 12 12 10 12 11 15 13 13 13 16 16 16 12 12 12 13 10
 10 10 10 10 12 12 14 14 14 13 13  9  8  2  5  5  6  6  8  9  9  9  9  9]
22


In [12]:
import numpy as np
import heapq
from copy import deepcopy

class SegmentTree:
    """
    Holds a private copy of a per-slot head-count array.

    Despite the name, this is a plain array wrapper: updates touch the listed
    indices one by one and the penalty is recomputed from scratch each time.
    """

    def __init__(self, work_count):
        # Copy so that changes never leak back into the caller's array.
        self.work_count = work_count.copy()

    def update_delta(self, delta_indices, delta):
        """Add `delta` to the head count at each index in `delta_indices`."""
        counts = self.work_count
        for position in delta_indices:
            counts[position] += delta

    def compute_penalty(self):
        """Return, as a plain int, how many adjacent head counts differ."""
        changed = np.diff(self.work_count) != 0
        return int(changed.sum())

def greedy_smooth_schedule_beam(schedule_matrix, all_break_schedule, beam_width=10):
    """
    Beam-search variant of the greedy break-schedule selection.

    Officers are processed in row order. Every beam state is expanded with each
    of the officer's candidates, and only the `beam_width` states with the
    lowest penalty are kept (ties keep the earlier state). Officers without
    candidates extend every state with None and do not trigger pruning.

    Args:
        schedule_matrix: 2D array, one row per officer, 1 = working.
        all_break_schedule: dict keyed 'O1', 'O2', ... -> list of candidates.
        beam_width: number of partial solutions kept after each officer.

    Returns:
        (chosen_indices, best_work_count, min_penalty) of the best final state.
    """
    num_officers, _num_slots = schedule_matrix.shape
    initial_work_count = schedule_matrix.sum(axis=0)

    def by_penalty(state):
        return state[1]

    # Each beam state is (SegmentTree, penalty, chosen_indices).
    start_penalty = np.sum(np.diff(initial_work_count) != 0)
    beam = [(SegmentTree(initial_work_count), start_penalty, [])]

    for officer in range(num_officers):
        candidates = all_break_schedule.get(f'O{officer+1}', [])

        if not candidates:
            # Nothing to choose for this officer: carry every state forward.
            beam = [(tree, pen, picks + [None]) for tree, pen, picks in beam]
            continue

        # The slots removed by a candidate do not depend on the beam state.
        on_duty = schedule_matrix[officer] == 1
        removed_per_candidate = [
            np.where(on_duty & (candidate == 0))[0] for candidate in candidates
        ]

        expanded = []
        for tree, _pen, picks in beam:
            for idx, removed in enumerate(removed_per_candidate):
                child = deepcopy(tree)
                if len(removed) > 0:
                    child.update_delta(removed, -1)
                expanded.append((child, child.compute_penalty(), picks + [idx]))

        # Stable sort, then keep the best `beam_width` states.
        expanded.sort(key=by_penalty)
        beam = expanded[:beam_width]

    best_tree, min_penalty, chosen_indices = min(beam, key=by_penalty)
    return chosen_indices, best_tree.work_count, min_penalty

# Run the beam search over all officers. This reads `base_schedules` and
# `all_break_schedules` from the kernel, so both must already be defined.
chosen_schedule_indices, best_work_count, min_penalty = greedy_smooth_schedule_beam(
    base_schedules,all_break_schedules,beam_width=20  # tune beam width
)

# Chosen candidate index per officer, resulting per-slot head count, and penalty.
print(chosen_schedule_indices)
print(best_work_count)
print(min_penalty)


[0, 0, 0, 0, 0, 13, 24, 161, 75, 133, 123, 124, 364, 656, 558, 18, 0, 0, 0, 3, 0, 0, 8, 0, 0, 0, 1, 0]
[ 5  5 10 10 13 13 13 13 13 10 10 10 13 13 16 16 16 15 15 11 10 10 11 11
 11 11 11 11 10 10 13 13 12 12 13 10 10  4  4  4  4  6  9  9  9  9  9  9]
17


In [13]:
import numpy as np

def generate_schedule_matrix(saved_indices, all_break_schedules, officer_names):
    """
    Assemble the chosen candidate schedule of every officer into one matrix.

    Args:
        saved_indices (list of int): Selected candidate index per officer.
        all_break_schedules (dict): officer -> list of 0/1 np.arrays (schedules).
        officer_names (list of str): Officer names, in the same order as saved_indices.

    Returns:
        np.ndarray: int array with len(saved_indices) rows; row i is the
        candidate saved_indices[i] of officer_names[i]. The column count is
        taken from the first officer's first candidate.
    """
    # All candidates are assumed to share the first candidate's length.
    first_candidate = next(iter(all_break_schedules[officer_names[0]]))
    matrix = np.zeros((len(saved_indices), len(first_candidate)), dtype=int)

    for row, officer in enumerate(officer_names):
        picked = saved_indices[row]
        matrix[row, :] = all_break_schedules[officer][picked]

    return matrix

# Materialise the selected candidate of every officer as one 0/1 matrix.
# Reads `chosen_schedule_indices` and `all_break_schedules` from the kernel.
schedule_matrix = generate_schedule_matrix(chosen_schedule_indices, all_break_schedules, officer_names)


In [16]:
def get_intervals_from_schedule(schedule_matrix):
    """
    List every run of consecutive 1s in the matrix as a (start, end) pair.

    Rows are scanned top to bottom and left to right; `end` is exclusive.
    Returns a flat list, so the row a run came from is not recorded.
    Note: a later cell redefines this name with a different return value.
    """
    grid = np.array(schedule_matrix)  # accept nested lists as well
    intervals = []

    for row in grid:
        run_start = None
        for pos, value in enumerate(row):
            if value == 1:
                if run_start is None:
                    run_start = pos
            elif run_start is not None:
                intervals.append((run_start, pos))
                run_start = None
        # A run that reaches the end of the row closes at len(row).
        if run_start is not None:
            intervals.append((run_start, len(row)))

    return intervals

# Flat list of (start, end) working runs, using the list-returning version of
# get_intervals_from_schedule defined in this cell.
schedule_intervals = get_intervals_from_schedule(schedule_matrix)


In [21]:
import numpy as np
from collections import defaultdict

def get_intervals_from_schedule(schedule_matrix):
    """
    Group the runs of consecutive 1s in the matrix by their (start, end) span.

    Returns:
        (interval_to_rows, schedule_intervals) where
        - interval_to_rows maps (start, end) -> list of row indices having
          exactly that run (`end` is exclusive), in first-seen order;
        - schedule_intervals lists each interval once per row that has it,
          grouped in the same first-seen order.
    """
    grid = np.array(schedule_matrix)  # accept nested lists as well
    interval_to_rows = {}

    for row_idx, row in enumerate(grid):
        run_start = None
        for pos, value in enumerate(row):
            if value == 1:
                if run_start is None:
                    run_start = pos
            elif run_start is not None:
                interval_to_rows.setdefault((run_start, pos), []).append(row_idx)
                run_start = None
        # A run that reaches the end of the row closes at len(row).
        if run_start is not None:
            interval_to_rows.setdefault((run_start, len(row)), []).append(row_idx)

    schedule_intervals = [
        interval for interval, rows in interval_to_rows.items() for _ in rows
    ]
    return interval_to_rows, schedule_intervals

In [19]:
from collections import defaultdict
import random

def greedy_longest_partition(intervals):
    """
    Split intervals into chains where each interval starts where the previous ends.

    Repeatedly extracts the chain with the most intervals (the first one found
    wins ties), removes its intervals from the pool, and continues until the
    pool is empty. The input list is not modified.
    """
    pool = intervals[:]  # work on a copy
    paths = []

    def longest_chain(available):
        # Index the available intervals by their start point.
        successors = {}
        for start, end in available:
            successors.setdefault(start, []).append((start, end))

        champion = []

        def extend(chain, on_chain):
            nonlocal champion
            if len(chain) > len(champion):
                champion = list(chain)

            # Continue with any unused interval starting at the chain's end.
            for candidate in successors.get(chain[-1][1], ()):
                if candidate in on_chain:
                    continue
                on_chain.add(candidate)
                chain.append(candidate)
                extend(chain, on_chain)
                chain.pop()
                on_chain.remove(candidate)

        # Every interval gets a turn as the head of a chain.
        for seed in available:
            extend([seed], {seed})

        return champion

    while pool:
        longest = longest_chain(pool)
        if not longest:  # defensive: nothing could be extracted
            break
        paths.append(longest)
        for used_interval in longest:
            pool.remove(used_interval)

    return paths


In [23]:
def max_coverage_paths(chains):
    """
    Combine chains into paths, each time taking the path that reaches furthest.

    Args:
        chains: list of chains, each chain = list of intervals [(start, end), ...]

    A path is a sequence of chains in which every chain starts at or after the
    end of the previous one (gaps are allowed); the first chain must start at
    or after 0. In each round the path whose last interval ends latest is
    committed (the first one found wins ties) and its chains are removed.

    Returns:
        list of paths, each flattened into a single list of intervals.

    If a round cannot place any remaining chain (e.g. negative start slots),
    each leftover chain is emitted as its own path instead of looping forever.
    """

    remaining_chains = set(range(len(chains)))
    all_paths = []

    while remaining_chains:
        best_path = []
        best_coverage = -1

        def dfs(path, coverage_end, used_chains):
            nonlocal best_path, best_coverage

            # Update best path if current coverage is better
            if coverage_end > best_coverage:
                best_coverage = coverage_end
                best_path = path[:]

            for idx in list(remaining_chains):
                if idx in used_chains:
                    continue
                chain = chains[idx]
                chain_start = chain[0][0]
                if chain_start >= coverage_end:  # gaps allowed
                    dfs(path + [chain], chain[-1][1], used_chains | {idx})

        # Run DFS starting with empty path
        dfs([], 0, set())

        if not best_path:
            # No progress is possible: without this guard nothing would ever be
            # removed from remaining_chains and the loop would never end.
            all_paths.extend([chains[i]] for i in sorted(remaining_chains))
            break

        # Commit the best path found
        all_paths.append(best_path)
        for chain in best_path:
            # Equal chains are interchangeable, so drop the first match.
            for i in remaining_chains:
                if chains[i] == chain:
                    remaining_chains.remove(i)
                    break

    # Flatten each path (a list of chains) into one list of intervals.
    return [[interval for chain in path for interval in chain] for path in all_paths]


# # Example usage
# chains = [
#     [(0, 6), (6, 12), (12, 19), (19, 28), (28, 33), (33, 39), (39, 43), (43, 48)],
#     [(0, 4), (4, 12), (12, 19), (19, 25), (25, 30), (30, 35), (35, 40), (40, 48)],
#     [(2, 10), (10, 15), (15, 25), (25, 30), (30, 35)],
#     [(2, 11), (11, 16), (16, 20), (20, 30), (30, 35)],
#     [(2, 8), (8, 16), (16, 20), (20, 28), (28, 37)],
#     [(6, 13), (13, 21), (21, 29), (29, 37)],
#     [(14, 22), (22, 29), (29, 37)],
#     [(4, 12), (12, 18), (18, 27)],
#     [(0, 7), (7, 17)],
#     [(33, 40), (40, 48)],
#     [(24, 28), (28, 35)],
#     [(24, 30), (30, 35)],
#     [(32, 40), (40, 48)],
#     [(12, 19), (19, 27)],
#     [(14, 22), (22, 27)],
#     [(4, 10), (10, 17)],
#     [(2, 10), (10, 17)],
#     [(12, 19), (19, 27)],
#     [(2, 9)],
#     [(42, 48)],
#     [(32, 37)],
#     [(0, 9)],
#     [(31, 37)],
#     [(12, 17)],
#     [(41, 48)],
#     [(24, 28)],
#     [(31, 37)],
#     [(12, 17)],
#     [(0, 9)],
#     [(41, 48)],
#     [(42, 48)],
#     [(42, 48)]
# ]




In [20]:
def split_full_partial_paths(paths, target_length=48):
    """
    Separate paths by whether their intervals add up to exactly target_length.

    Args:
        paths (list of list of tuples): Each path is a list of (start, end) intervals.
        target_length (int): Summed interval length that counts as "full".

    Returns:
        tuple: (full_paths, partial_paths), each preserving the input order.
        Any path whose summed length differs from target_length is partial.
    """
    full_paths, partial_paths = [], []

    for path in paths:
        covered = 0
        for begin, stop in path:
            covered += stop - begin

        bucket = full_paths if covered == target_length else partial_paths
        bucket.append(path)

    return full_paths, partial_paths


#### data inputs

In [15]:
# Priority order over the numbers 1..41: 41 first, then 40, 30, 20, 10,
# then 39, 29, 19, 9, and so on down to 31, 21, 11, 1.
counter_priority_list = [41]
for offset in range(10):
    counter_priority_list.extend(range(40 - offset, 0, -10))


#### data pipeline

In [25]:
# Rebuild the availability matrix and every officer's candidate break schedules.
officer_names, base_schedules = build_officer_schedules(input_avail)
all_break_schedules = generate_break_schedules(base_schedules, officer_names)
# NOTE(review): the beam-search call is disabled here, so `chosen_schedule_indices`
# must already exist in the kernel (it is assigned by the beam-search cell earlier
# in the notebook). If `input_avail` changes, those indices become stale.
# chosen_schedule_indices, best_work_count, min_penalty = greedy_smooth_schedule_beam(
#     base_schedules,all_break_schedules,beam_width=20  # tune beam width)
# Selected schedules -> working runs -> chains of back-to-back runs -> paths.
schedule_matrix = generate_schedule_matrix(chosen_schedule_indices, all_break_schedules, officer_names)
schedule_intervals_to_officers, schedule_intervals = get_intervals_from_schedule(schedule_matrix)
chains = greedy_longest_partition(schedule_intervals)
paths = max_coverage_paths(chains)
# "Full" paths have intervals summing to the default target length (48 slots).
full_paths, partial_paths = split_full_partial_paths(paths)

for i, path in enumerate(full_paths, 1):
    print(f"Path {i}: {path}")

print('===partial paths===')

for i, path in enumerate(partial_paths, 1):
    print(f"Path {i}: {path}")


Path 1: [(0, 7), (7, 12), (12, 19), (19, 25), (25, 30), (30, 35), (35, 42), (42, 48)]
Path 2: [(0, 6), (6, 13), (13, 19), (19, 28), (28, 35), (35, 40), (40, 48)]
===partial paths===
Path 1: [(0, 8), (8, 14), (14, 24), (24, 28), (28, 32), (32, 37), (42, 48)]
Path 2: [(2, 12), (12, 19), (19, 27), (27, 32), (32, 37), (42, 48)]
Path 3: [(2, 11), (11, 16), (16, 25), (25, 31), (31, 37), (42, 48)]
Path 4: [(2, 10), (10, 17), (17, 25), (25, 35), (41, 48)]
Path 5: [(4, 12), (12, 22), (22, 29), (29, 37), (41, 48)]
Path 6: [(2, 10), (10, 16), (16, 20), (34, 39), (39, 43), (43, 48)]
Path 7: [(14, 24), (24, 28), (30, 40), (40, 48)]
Path 8: [(4, 12), (12, 22), (22, 27), (27, 37)]
Path 9: [(4, 10), (10, 17), (30, 37)]
Path 10: [(14, 22), (22, 27), (27, 35)]
Path 11: [(0, 9), (12, 19), (30, 35)]
Path 12: [(0, 9), (12, 19), (22, 32)]
Path 13: [(12, 19)]
Path 14: [(2, 9), (14, 19)]


#### optional branch

In [None]:
# import numpy as np

# def understaff_slots(base_schedules: np.ndarray):
#     """
#     Find slots with the minimum and second minimum number of officers (before breaks),
#     including the last slot.
    
#     Returns:
#         min_val, min_slots: minimum value and its slot indices
#         second_min_val, second_min_slots: second minimum value and its slot indices
#     """
#     manning = base_schedules.sum(axis=0)

#     # Find minimum
#     min_val = int(manning.min())
#     min_slots = np.where(manning == min_val)[0].tolist()
    
#     # Mask out the minimum to find the second minimum using a large int
#     masked = manning.copy()
#     masked[manning == min_val] = np.iinfo(manning.dtype).max
#     second_min_val = int(masked.min())
#     second_min_slots = np.where(masked == second_min_val)[0].tolist()
    
#     return min_val, min_slots, second_min_val, second_min_slots

# # Example usage
# min_val, min_slots_index, second_min_val, second_min_slots_index = understaff_slots(base_schedules)

# print("Minimum value:", min_val, "at slots:", min_slots_index)
# print("Second minimum value:", second_min_val, "at slots:", second_min_slots_index)
