In [None]:
import pandas as pd
import random
import numpy as np
import math
import time
from datetime import datetime
from collections import defaultdict
import os
import zipfile

# Parsing and Structuring Schedule Data from Text Files
#### Extracts and organizes the schedule-related information from multiple text files (schedule length, employee count, shift details, and constraints), combines the parsed data into a Pandas DataFrame for easy analysis, and saves the result as a CSV file for further use.

In [None]:
# Function to parse a single file
def parse_file(file_content):
    """Parse one schedule instance file into a dictionary.

    The file is scanned line by line; whenever a known section header is
    found, the value line(s) that follow it are consumed and stored.

    Args:
        file_content: Iterable of lines, either ``bytes`` (e.g. a file opened
            from a zip archive) or ``str``. Blank lines are ignored.

    Returns:
        dict | None: Dictionary with whichever of these keys were found:
        ``schedule_length``, ``num_employees``, ``num_shifts``,
        ``temporal_requirements`` (list of int rows), ``shift_info`` (list of
        token lists), ``days_off_min``/``days_off_max``,
        ``work_blocks_min``/``work_blocks_max``, ``nr_seq_2``/``nr_seq_3`` and
        ``not_allowed_sequences`` (list of token lists). ``None`` is returned
        (after printing a message) when the content cannot be parsed.
    """
    # File-like objects expose ``name``; plain iterables do not. The original
    # handler referenced an undefined ``file_path`` and crashed with NameError.
    source_name = getattr(file_content, "name", "<unknown>")

    try:
        lines = []
        for raw in file_content:
            text = raw.decode('utf-8') if isinstance(raw, (bytes, bytearray)) else raw
            text = text.strip()
            if text:
                lines.append(text)

        data = {}
        i = 0

        while i < len(lines):
            line = lines[i]

            if "Length of the schedule" in line:
                i += 1
                data["schedule_length"] = int(lines[i])
            elif "Number of Employees" in line:
                i += 1
                data["num_employees"] = int(lines[i])
            elif "Number of Shifts" in line:
                i += 1
                if i < len(lines):
                    data["num_shifts"] = int(lines[i])
            elif "# Temporal Requirements Matrix" in line:
                # One row of integers per shift; needs num_shifts to be known already.
                matrix = []
                for _ in range(data.get("num_shifts", 0)):
                    i += 1
                    if i < len(lines):
                        matrix.append([int(token) for token in lines[i].split()])
                data["temporal_requirements"] = matrix
            elif "#ShiftName" in line:
                # One whitespace-separated record per shift, kept as raw tokens.
                shift_info = []
                for _ in range(data.get("num_shifts", 0)):
                    i += 1
                    if i < len(lines):
                        shift_info.append(lines[i].split())
                data["shift_info"] = shift_info
            elif "# Minimum and maximum length of days-off blocks" in line:
                i += 1
                if i < len(lines):
                    tokens = [int(token) for token in lines[i].split()]
                    data["days_off_min"], data["days_off_max"] = tokens
            elif "# Minimum and maximum length of work blocks" in line:
                i += 1
                if i < len(lines):
                    tokens = [int(token) for token in lines[i].split()]
                    data["work_blocks_min"], data["work_blocks_max"] = tokens
            elif "# Number of not allowed shift sequences" in line:
                i += 1
                if i < len(lines):
                    tokens = [int(token) for token in lines[i].split()]
                    data["nr_seq_2"], data["nr_seq_3"] = tokens
            elif "# Not allowed shift sequences" in line:
                # Length-2 sequences come first, followed by the length-3 ones.
                not_allowed = []
                expected = data.get("nr_seq_2", 0) + data.get("nr_seq_3", 0)
                for _ in range(expected):
                    i += 1
                    if i < len(lines):
                        not_allowed.append(lines[i].split())
                data["not_allowed_sequences"] = not_allowed

            i += 1

        return data

    except Exception as e:
        # Best-effort parsing: report the problem and let the caller skip the file.
        print(f"Error parsing file {source_name}: {e}")
        return None

# Parse every .txt member of the archive; members that fail to parse
# (parse_file returned None or an empty dict) are skipped.
all_data = []

with zipfile.ZipFile('/content/data.zip') as archive:
    for member in archive.namelist():
        if not member.endswith(".txt"):
            continue
        with archive.open(member) as handle:
            parsed = parse_file(handle)
        if parsed:
            all_data.append({
                "filename": os.path.basename(member),
                "parsed_data": parsed,
            })

# One DataFrame row per instance; fields missing from a file become None.
columns = (
    "schedule_length",
    "num_employees",
    "num_shifts",
    "temporal_requirements",
    "shift_info",
    "days_off_min",
    "days_off_max",
    "work_blocks_min",
    "work_blocks_max",
    "nr_seq_2",
    "nr_seq_3",
    "not_allowed_sequences",
)
df = pd.DataFrame([
    {
        "filename": entry["filename"],
        **{col: entry["parsed_data"].get(col, None) for col in columns},
    }
    for entry in all_data
])


# Write the table to CSV without the index column
df.to_csv('parsed_schedule_20_examples.csv', index=False)


##  Exploration of Initialization and Weighting Strategies in Scheduling
#### Explores different initialization methods (random and greedy) and penalty weighting schemes for scheduling problems. Using a fixed heuristic (Min-Conflicts with Simulated Annealing, SA), it evaluates the impact of each strategy on conflict reduction. Only the first instance of the dataset is used for testing the initialization and weighting methods. Results are saved in CSV and text formats to analyze the performance across combinations.
##### Weighting Method 1: 4 - temporal requirements, 3 - off days, 2 - not allowed sequences, 1 - shift blocks and work blocks
##### Weighting Method 2: 2 - temporal requirements and off days, 1 - everything else
##### Weighting Method 3: 1000 - hard constraints, 200 - moderate constraints, 10 - soft constraints



In [None]:
# 1. PARSE FUNCTIONS

def parse_temporal_requirements(temporal_requirements_str):
    """Turn the stringified requirements matrix from the CSV back into a Python object."""
    # NOTE(review): eval() executes arbitrary expressions; acceptable only while
    # the CSV is produced by this notebook. Consider ast.literal_eval.
    matrix = eval(temporal_requirements_str)
    return matrix

def parse_shift_info(shift_info_str):
    """Convert the stringified shift table into a list of shift dictionaries.

    Each record contributes its first five fields: name, start, length and the
    minimum/maximum block length; the four numeric fields are cast to int.
    """
    # NOTE(review): eval() executes arbitrary expressions; acceptable only while
    # the CSV is produced by this notebook.
    shifts = []
    for record in eval(shift_info_str):
        shifts.append({
            'shift_name': record[0],
            'start': int(record[1]),
            'length': int(record[2]),
            'min_blocks': int(record[3]),
            'max_blocks': int(record[4]),
        })
    return shifts

def parse_not_allowed_sequences(not_allowed_sequences_str):
    """Return the forbidden shift sequences as a list of tuples."""
    # NOTE(review): eval() executes arbitrary expressions; acceptable only while
    # the CSV is produced by this notebook.
    return list(map(tuple, eval(not_allowed_sequences_str)))

# 2. INITIALIZATION METHODS

def generate_random_schedule(num_employees, schedule_length):
    """Build a schedule where every cell is drawn uniformly from D, A, N or '-'.

    Returns a dict mapping employee id (0..num_employees-1) to a list with one
    shift symbol per day.
    """
    shift_pool = ['D', 'A', 'N', '-']
    return {
        emp_id: [random.choice(shift_pool) for _ in range(schedule_length)]
        for emp_id in range(num_employees)
    }

def generate_greedy_schedule(row_data):
    """Build an initial schedule by filling each day's demand greedily.

    Everyone starts off ('-'). For each day the staffed shifts D, A and N are
    handled in order of decreasing demand, and each shift is handed to the
    still-free employees who have worked that shift the fewest times so far.

    Args:
        row_data: Mapping with "num_employees", "schedule_length" and the
            stringified "temporal_requirements" matrix (rows ordered D, A, N).

    Returns:
        dict: employee id -> list of shift symbols, one per day.
    """
    staffed_shifts = ['D', 'A', 'N']  # ONLY the staffed shifts
    row_of = {name: pos for pos, name in enumerate(staffed_shifts)}
    num_employees = int(row_data["num_employees"])
    schedule_length = int(row_data["schedule_length"])
    demand = parse_temporal_requirements(row_data["temporal_requirements"])

    schedule = {}
    for emp_id in range(num_employees):
        schedule[emp_id] = ['-'] * schedule_length

    for day in range(schedule_length):
        # Highest demand first; the stable sort keeps D, A, N order on ties.
        by_demand = sorted(
            staffed_shifts,
            key=lambda name: demand[row_of[name]][day],
            reverse=True,
        )

        for shift in by_demand:
            needed = demand[row_of[shift]][day]
            # Employees with the fewest assignments of this shift come first.
            ranked = sorted(schedule, key=lambda emp: schedule[emp].count(shift))

            filled = 0
            for emp_id in ranked:
                if filled >= needed:
                    break
                if schedule[emp_id][day] == '-':
                    schedule[emp_id][day] = shift
                    filled += 1

    return schedule

# 3. CONFLICT EVALUATION

def evaluate_conflicts(schedule, row_data, method=1):
    """Return the weighted number of constraint violations of ``schedule``.

    Checked constraints:
      * temporal requirements: absolute staffing deviation per shift and day;
      * days-off blocks: each run of '-' must lie in [days_off_min, days_off_max];
      * shift blocks: each run of one shift must lie in that shift's
        [min_blocks, max_blocks];
      * work blocks: each run of consecutive working days (any shift) must lie
        in [work_blocks_min, work_blocks_max]. These limits were read but never
        enforced before; they are charged with the "blocks" weight;
      * not allowed sequences of two or three consecutive shifts.

    Args:
        schedule: dict mapping employee id to a list of shift symbols per day.
        row_data: Mapping with the instance fields (stringified
            "temporal_requirements", "shift_info", "not_allowed_sequences" plus
            the numeric limits and "schedule_length").
        method: Weighting scheme, one of 1, 2 or 3.

    Returns:
        The accumulated weighted penalty (0 means no violation was found).

    Raises:
        KeyError: If ``method`` is not 1, 2 or 3 or a field is missing.
    """
    # Parse the data
    temporal_req = parse_temporal_requirements(row_data["temporal_requirements"])
    shift_defs = parse_shift_info(row_data["shift_info"])
    # A set gives constant-time membership tests in the per-day sequence checks.
    not_allowed_seq = set(parse_not_allowed_sequences(row_data["not_allowed_sequences"]))
    nr_seq_2 = row_data["nr_seq_2"]
    nr_seq_3 = row_data["nr_seq_3"]
    schedule_len = row_data["schedule_length"]

    days_off_min = row_data["days_off_min"]
    days_off_max = row_data["days_off_max"]
    work_blocks_min = row_data["work_blocks_min"]
    work_blocks_max = row_data["work_blocks_max"]

    shift_block_limits = {
        sd['shift_name']: (sd['min_blocks'], sd['max_blocks']) for sd in shift_defs
    }

    # Weights based on method
    weights = {1: {"temporal": 4, "off_days": 3, "sequences": 2, "blocks": 1},
               2: {"temporal": 2, "off_days": 2, "sequences": 1, "blocks": 1},
               3: {"temporal": 1000, "off_days": 200, "sequences": 1000, "blocks": 10}}[method]

    def work_block_penalty(run_length):
        # A run of 0 days means "no work block just ended" and costs nothing.
        if run_length and (run_length < work_blocks_min or run_length > work_blocks_max):
            return weights["blocks"]
        return 0

    conflicts = 0

    # Temporal requirements
    for day in range(schedule_len):
        shift_count = defaultdict(int)
        for emp_schedule in schedule.values():
            shift = emp_schedule[day]
            if shift != '-':
                shift_count[shift] += 1

        for shift_id, shift_info in enumerate(shift_defs):
            assigned = shift_count.get(shift_info['shift_name'], 0)
            required = temporal_req[shift_id][day]
            conflicts += weights["temporal"] * abs(assigned - required)

    # Employee-specific constraints
    for emp_sched in schedule.values():
        # Collapse the row into (symbol, run length) segments.
        segments = []
        curr_shift = emp_sched[0]
        curr_len = 1
        for d in range(1, len(emp_sched)):
            if emp_sched[d] == curr_shift:
                curr_len += 1
            else:
                segments.append((curr_shift, curr_len))
                curr_shift = emp_sched[d]
                curr_len = 1
        segments.append((curr_shift, curr_len))

        work_run = 0  # consecutive working days accumulated across shift segments
        for seg_type, seg_len in segments:
            if seg_type == '-':
                if seg_len < days_off_min or seg_len > days_off_max:
                    conflicts += weights["off_days"]
                # An off segment closes the preceding work block, if any.
                conflicts += work_block_penalty(work_run)
                work_run = 0
            else:
                work_run += seg_len
                if seg_type in shift_block_limits:
                    min_b, max_b = shift_block_limits[seg_type]
                    if seg_len < min_b or seg_len > max_b:
                        conflicts += weights["blocks"]
        # The row may end in the middle of a work block.
        conflicts += work_block_penalty(work_run)

        # Not allowed sequences
        for day in range(1, len(emp_sched)):
            prev_shift = emp_sched[day - 1]
            curr_shift = emp_sched[day]
            if nr_seq_2 > 0 and (prev_shift, curr_shift) in not_allowed_seq:
                conflicts += weights["sequences"]
            if day > 1 and nr_seq_3 > 0:
                prev_prev = emp_sched[day - 2]
                if (prev_prev, prev_shift, curr_shift) in not_allowed_seq:
                    conflicts += weights["sequences"]

    return conflicts

# 4. LOCAL SEARCH AND SIMULATED ANNEALING

def min_conflicts_step(schedule, row_data, method=1):
    """Return a copy of ``schedule`` with one random cell set to its best shift.

    A random employee/day cell is picked and each of D, A, N and '-' is tried
    there; the symbol giving the lowest ``evaluate_conflicts`` score is kept
    (the first one tried wins ties). The input schedule is not modified.
    """
    candidates = ['D', 'A', 'N', '-']
    trial = {emp: days[:] for emp, days in schedule.items()}  # per-employee copies
    horizon = len(next(iter(trial.values())))

    emp_id = random.choice(list(trial.keys()))
    day = random.randint(0, horizon - 1)
    row = trial[emp_id]

    chosen = row[day]
    lowest = float('inf')
    for option in candidates:
        row[day] = option
        cost = evaluate_conflicts(trial, row_data, method=method)
        if cost < lowest:
            lowest = cost
            chosen = option

    row[day] = chosen
    return trial

def simulated_annealing(schedule, row_data, method=1, max_evaluations=10000, temperature=1000, cooling_rate=0.995):
    """Improve ``schedule`` by simulated annealing over min-conflicts moves.

    Each iteration proposes a neighbor via ``min_conflicts_step``. It is
    accepted when it is cheaper than the current solution, otherwise with
    probability exp((current - neighbor) / temperature). The temperature is
    multiplied by ``cooling_rate`` after every iteration; the loop ends after
    ``max_evaluations`` iterations or once the temperature is no longer above 1.

    Returns:
        tuple: (best schedule seen, its conflict score).
    """
    current_solution = schedule
    current_cost = evaluate_conflicts(current_solution, row_data, method=method)
    best_solution, best_cost = current_solution, current_cost

    evaluations = 0
    while evaluations < max_evaluations and temperature > 1:
        neighbor = min_conflicts_step(current_solution, row_data, method=method)
        neighbor_cost = evaluate_conflicts(neighbor, row_data, method=method)

        improvement = current_cost - neighbor_cost
        # The random draw only happens when the neighbor is not strictly better.
        accepted = improvement > 0 or random.random() < math.exp(improvement / temperature)
        if accepted:
            current_solution, current_cost = neighbor, neighbor_cost
            if current_cost < best_cost:
                best_solution, best_cost = current_solution, current_cost

        temperature *= cooling_rate
        evaluations += 1

    return best_solution, best_cost

# 5. RUN ALL COMBINATIONS & SAVE RESULTS

def run_scheduling(input_csv_path, output_csv_path="results_testing_with_SA.csv",
                   output_txt_path="results_testing_with_SA.txt", max_instances=1):
    """Run every initialization/weighting combination and save the results.

    For each processed instance, both initialization modes (RANDOM, GREEDY)
    are combined with the three weighting methods; each combination is
    optimized with ``simulated_annealing`` and its time, conflict score and
    schedule are written to a text report and a CSV file.

    Args:
        input_csv_path: CSV with one parsed instance per row.
        output_csv_path: Destination of the tabular results.
        output_txt_path: Destination of the human-readable report.
        max_instances: Number of leading rows to process; ``None`` processes
            every row. The default of 1 matches the stated intent of testing
            on the first instance only (the previous ``if idx > 0: break``
            guard let the second row run as well).
    """
    data = pd.read_csv(input_csv_path)
    if max_instances is not None:
        data = data.head(max_instances)
    results = []

    init_modes = ["RANDOM", "GREEDY"]
    weight_methods = [1, 2, 3]
    row_keys = (
        "temporal_requirements",
        "shift_info",
        "days_off_min",
        "days_off_max",
        "work_blocks_min",
        "work_blocks_max",
        "not_allowed_sequences",
        "nr_seq_2",
        "nr_seq_3",
        "schedule_length",
        "num_employees",
    )

    with open(output_txt_path, "w") as txt_file:
        for idx, row in data.iterrows():
            row_data = {key: row[key] for key in row_keys}

            for init_mode in init_modes:
                for method in weight_methods:
                    if init_mode == "RANDOM":
                        schedule_init = generate_random_schedule(
                            num_employees=row_data["num_employees"],
                            schedule_length=row_data["schedule_length"]
                        )
                    else:
                        schedule_init = generate_greedy_schedule(row_data)

                    # Only the optimization is timed, not the initialization.
                    start_time = time.time()
                    best_solution, best_cost = simulated_annealing(
                        schedule_init, row_data, method=method
                    )
                    elapsed = time.time() - start_time

                    formatted_schedule = "\n".join(
                        f"Emp {k}: {' '.join(v)}" for k, v in best_solution.items()
                    )
                    results.append({
                        "row_index": idx,
                        "init_mode": init_mode,
                        "method": method,
                        "time_in_sec": round(elapsed, 4),
                        "conflicts": best_cost,
                        "schedule": formatted_schedule
                    })

                    # Write to text file
                    txt_file.write(f"Row: {idx}, Init Mode: {init_mode}, Method: {method}\n")
                    txt_file.write(f"Time (s): {round(elapsed, 4)}, Conflicts: {best_cost}\n")
                    txt_file.write(f"Schedule:\n{formatted_schedule}\n")
                    txt_file.write("=" * 80 + "\n")

    # Save results to CSV
    results_df = pd.DataFrame(results)
    results_df.to_csv(output_csv_path, index=False)
    print(f"Results saved to {output_csv_path} and {output_txt_path}.")

# 6. MAIN ENTRY POINT

# Run the initialization/weighting experiment with the default output paths.
# NOTE(review): presumably this is the CSV written by the parsing cell above
# (saved under a relative path) - confirm the working directory is /content.
run_scheduling(input_csv_path = "/content/parsed_schedule_20_examples.csv")


Results saved to results_testing_with_SA.csv and results_testing_with_SA.txt.


## Comparison of Heuristics for Scheduling Optimization
#### Building on insights from the first experiment, this implementation fixes the weighting scheme (Method 2) and uses greedy initialization. It compares three heuristic approaches (Min-Conflicts with Simulated Annealing, Min-Conflicts with Tabu Search, and Constraint-Guided Simulated Annealing) to determine the most effective method. Results are saved for performance analysis.

In [None]:
# 1. PARSE FUNCTIONS

def parse_temporal_requirements(temporal_requirements_str):
    """Turn the stringified requirements matrix back into a Python object.

    Raises:
        ValueError: If the string cannot be evaluated; the underlying error is
            attached as ``__cause__``.
    """
    try:
        # NOTE(review): eval() executes arbitrary expressions; acceptable only
        # while the CSV is produced by this notebook. Consider ast.literal_eval.
        return eval(temporal_requirements_str)
    except Exception as e:
        raise ValueError(f"Invalid temporal_requirements format: {e}") from e

def parse_shift_info(shift_info_str):
    """Convert the stringified shift table into a list of shift dictionaries.

    Each record contributes its first five fields: name, start, length and the
    minimum/maximum block length; the four numeric fields are cast to int.

    Raises:
        ValueError: If the string cannot be evaluated or a record is malformed;
            the underlying error is attached as ``__cause__``.
    """
    try:
        # NOTE(review): eval() executes arbitrary expressions; acceptable only
        # while the CSV is produced by this notebook. Consider ast.literal_eval.
        shift_list = eval(shift_info_str)
        return [
            {
                'shift_name': s[0],
                'start': int(s[1]),
                'length': int(s[2]),
                'min_blocks': int(s[3]),
                'max_blocks': int(s[4])
            }
            for s in shift_list
        ]
    except Exception as e:
        raise ValueError(f"Invalid shift_info format: {e}") from e

def parse_not_allowed_sequences(not_allowed_sequences_str):
    """Return the forbidden shift sequences as a list of tuples.

    Raises:
        ValueError: If the string cannot be evaluated or an entry is not
            iterable; the underlying error is attached as ``__cause__``.
    """
    try:
        # NOTE(review): eval() executes arbitrary expressions; acceptable only
        # while the CSV is produced by this notebook. Consider ast.literal_eval.
        return [tuple(seq) for seq in eval(not_allowed_sequences_str)]
    except Exception as e:
        raise ValueError(f"Invalid not_allowed_sequences format: {e}") from e

# 2. VALID SHIFTS

def get_valid_shifts(shift_defs):
    """Return the shift names of ``shift_defs`` in their original order."""
    names = []
    for definition in shift_defs:
        names.append(definition['shift_name'])
    return names

def get_possible_shifts_with_off_day(shift_defs):
    """Return all shift names followed by the day-off symbol '-'."""
    options = [definition['shift_name'] for definition in shift_defs]
    options.append('-')
    return options

# 3. GREEDY INITIALIZATION

def generate_greedy_schedule(row_data):
    """Build an initial schedule by filling each day's demand greedily.

    Everyone starts off ('-'). For each day the shifts from ``shift_info`` are
    handled in order of decreasing demand, and each shift is handed to the
    still-free employees who have worked that shift the fewest times so far.

    Args:
        row_data: Mapping with "temporal_requirements" and "shift_info" (both
            stringified), "schedule_length" and "num_employees".

    Returns:
        dict: employee id -> list of shift symbols, one per day.

    Raises:
        ValueError: If the requirements matrix does not have one row per shift
            or a row whose length differs from the schedule length.
    """
    demand = parse_temporal_requirements(row_data["temporal_requirements"])
    shift_defs = parse_shift_info(row_data["shift_info"])
    schedule_length = int(row_data["schedule_length"])
    num_employees = int(row_data["num_employees"])

    valid_shifts = get_valid_shifts(shift_defs)

    # Validate the matrix shape before using it
    if len(demand) != len(valid_shifts):
        raise ValueError(
            f"Mismatch: temporal_requirements has {len(demand)} shifts, "
            f"but expected {len(valid_shifts)} shifts based on shift_info."
        )
    row_lengths = [len(req) for req in demand]
    if any(length != schedule_length for length in row_lengths):
        raise ValueError(
            f"Temporal requirements sublists must have length {schedule_length}. "
            f"Found {row_lengths}."
        )

    # Matrix row of each shift name; the first occurrence wins, like list.index
    row_of = {}
    for pos, name in enumerate(valid_shifts):
        row_of.setdefault(name, pos)

    schedule = {}
    for emp_id in range(num_employees):
        schedule[emp_id] = ['-'] * schedule_length

    for day in range(schedule_length):
        # Highest demand first; the stable sort keeps definition order on ties
        by_demand = sorted(
            valid_shifts,
            key=lambda name: demand[row_of[name]][day],
            reverse=True,
        )

        for shift in by_demand:
            needed = demand[row_of[shift]][day]
            if needed <= 0:
                continue  # Nothing to staff for this shift today

            # Employees with the fewest assignments of this shift come first
            ranked = sorted(schedule, key=lambda emp: schedule[emp].count(shift))

            filled = 0
            for emp_id in ranked:
                if filled >= needed:
                    break
                if schedule[emp_id][day] == '-':
                    schedule[emp_id][day] = shift
                    filled += 1

    return schedule

# 4. CONFLICT EVALUATION

def evaluate_conflicts(schedule, row_data):
    """Return the weighted penalty of ``schedule`` under the fixed weights.

    Penalties (weights: temporal 2, off_days 2, sequences 1, blocks 1):
      * staffing deviation per shift and day;
      * for every run of '-' the number of days it falls short of
        ``days_off_min`` or exceeds ``days_off_max``;
      * for every run of one shift the number of days it falls outside that
        shift's [min_blocks, max_blocks];
      * one unit per occurrence of a not allowed 2- or 3-shift sequence.

    Args:
        schedule: dict mapping employee id to a list of shift symbols per day.
        row_data: Mapping with the stringified "temporal_requirements",
            "shift_info" and "not_allowed_sequences" plus "schedule_length",
            "days_off_min" and "days_off_max".
    """
    def excess(length, lower, upper):
        # How far ``length`` lies outside [lower, upper]; 0 when inside.
        if length < lower:
            return lower - length
        if length > upper:
            return length - upper
        return 0

    temporal_req = parse_temporal_requirements(row_data["temporal_requirements"])
    shift_defs = parse_shift_info(row_data["shift_info"])
    forbidden = parse_not_allowed_sequences(row_data["not_allowed_sequences"])
    horizon = int(row_data["schedule_length"])

    off_min = int(row_data["days_off_min"])
    off_max = int(row_data["days_off_max"])

    weights = {"temporal": 2, "off_days": 2, "sequences": 1, "blocks": 1}

    valid_shifts = get_valid_shifts(shift_defs)
    # Shift name -> position in shift_defs / row of the requirements matrix
    row_of = {shift['shift_name']: idx for idx, shift in enumerate(shift_defs)}

    total = 0

    # (1) Temporal requirements
    for day in range(horizon):
        staffed = defaultdict(int)
        for roster in schedule.values():
            symbol = roster[day]
            if symbol in valid_shifts:
                staffed[symbol] += 1

        for name in valid_shifts:
            assigned = staffed.get(name, 0)
            required = temporal_req[row_of[name]][day]
            total += weights["temporal"] * abs(assigned - required)

    # (2) Employee-specific constraints
    for roster in schedule.values():
        # Collapse the row into (symbol, run length) pairs
        runs = []
        run_symbol, run_length = roster[0], 1
        for d in range(1, horizon):
            if roster[d] == run_symbol:
                run_length += 1
            else:
                runs.append((run_symbol, run_length))
                run_symbol, run_length = roster[d], 1
        runs.append((run_symbol, run_length))

        # Days-off and per-shift block lengths
        for symbol, length in runs:
            if symbol == '-':
                total += weights["off_days"] * excess(length, off_min, off_max)
            elif symbol in valid_shifts:
                limits = shift_defs[row_of[symbol]]
                total += weights["blocks"] * excess(
                    length, limits['min_blocks'], limits['max_blocks']
                )

        # Not allowed sequences of length two and three ending on ``day``
        for day in range(1, horizon):
            if (roster[day - 1], roster[day]) in forbidden:
                total += weights["sequences"]
            if day > 1 and (roster[day - 2], roster[day - 1], roster[day]) in forbidden:
                total += weights["sequences"]

    return total

# 5. TABU SEARCH

def generate_neighbors(schedule, row_data):
    """Create five neighbors of ``schedule``, each differing in one random cell.

    Returns:
        list: (neighbor schedule, move) pairs, where move is the tuple
        (employee id, day, new shift symbol). The input schedule is not modified.
    """
    shift_defs = parse_shift_info(row_data["shift_info"])
    options = get_valid_shifts(shift_defs) + ['-']

    produced = []
    while len(produced) < 5:  # Generate 5 neighbors
        clone = {emp: days[:] for emp, days in schedule.items()}  # per-employee copies
        emp_id = random.choice(list(clone.keys()))
        roster = clone[emp_id]
        day = random.randint(0, len(roster) - 1)
        # The new symbol must differ from the one currently in the cell.
        replacement = random.choice([s for s in options if s != roster[day]])
        roster[day] = replacement
        produced.append((clone, (emp_id, day, replacement)))
    return produced

def tabu_search(schedule, row_data, max_iterations=1000, tabu_tenure=10):
    """Improve ``schedule`` with a tabu search over random single-cell moves.

    Every iteration draws neighbors from ``generate_neighbors``, skips those
    whose move is in the tabu list and moves to the cheapest remaining one,
    even if it is worse than the current solution. The chosen move is then
    remembered for ``tabu_tenure`` iterations. The search stops after
    ``max_iterations`` iterations or when all drawn neighbors are tabu.

    Returns:
        tuple: (best schedule seen, its conflict score).
    """
    current = schedule
    best_solution = schedule
    best_cost = evaluate_conflicts(best_solution, row_data)

    recent_moves = []
    completed = 0

    while completed < max_iterations:
        winner, winner_cost, winner_move = None, float('inf'), None

        for candidate, move in generate_neighbors(current, row_data):
            if move in recent_moves:
                continue
            candidate_cost = evaluate_conflicts(candidate, row_data)
            if candidate_cost < winner_cost:
                winner, winner_cost, winner_move = candidate, candidate_cost, move

        if winner is None:
            break  # No valid neighbors

        if winner_cost < best_cost:
            best_solution, best_cost = winner, winner_cost

        current = winner
        recent_moves.append(winner_move)
        if len(recent_moves) > tabu_tenure:
            del recent_moves[0]  # forget the oldest move
        completed += 1

    return best_solution, best_cost

# 6. SIMULATED ANNEALING WITH CONSTRAINT-GUIDED STEP

def constraint_guided_step(schedule, row_data):
    """Return a copy of ``schedule`` with one random cell set to its best value.

    A random employee/day cell is picked and every shift from ``shift_info``
    plus '-' is tried there; the symbol with the lowest ``evaluate_conflicts``
    score is kept (the first one tried wins ties). The input is not modified.
    """
    shift_defs = parse_shift_info(row_data["shift_info"])
    options = get_valid_shifts(shift_defs) + ['-']

    trial = {emp: days[:] for emp, days in schedule.items()}
    emp_id = random.choice(list(trial.keys()))
    roster = trial[emp_id]
    day = random.randint(0, len(roster) - 1)

    chosen = roster[day]
    fewest = float('inf')
    for option in options:
        roster[day] = option
        violations = evaluate_conflicts(trial, row_data)
        if violations < fewest:
            fewest = violations
            chosen = option

    roster[day] = chosen
    return trial

def simulated_annealing(schedule, row_data, max_evaluations=10000, temperature=1000, cooling_rate=0.995):
    """Improve ``schedule`` by simulated annealing over constraint-guided moves.

    Each iteration proposes a neighbor via ``constraint_guided_step``. It is
    accepted when it is cheaper than the *current* solution, otherwise with
    probability exp((current - neighbor) / temperature). The previous version
    compared against the best cost found so far, which distorts the acceptance
    probability as soon as the current solution is worse than the best one.

    The temperature is multiplied by ``cooling_rate`` after every iteration;
    the loop ends after ``max_evaluations`` iterations or once the temperature
    is no longer above 1.

    Returns:
        tuple: (best schedule seen, its conflict score).
    """
    current_solution = schedule
    current_cost = evaluate_conflicts(current_solution, row_data)
    best_solution = current_solution
    best_cost = current_cost
    evaluations = 0

    while evaluations < max_evaluations and temperature > 1:
        neighbor = constraint_guided_step(current_solution, row_data)
        neighbor_cost = evaluate_conflicts(neighbor, row_data)
        # Positive when the neighbor improves on the solution we are standing on.
        cost_diff = current_cost - neighbor_cost

        if cost_diff > 0 or random.random() < math.exp(cost_diff / temperature):
            current_solution = neighbor
            current_cost = neighbor_cost
            if current_cost < best_cost:
                best_solution = current_solution
                best_cost = current_cost

        temperature *= cooling_rate
        evaluations += 1

    return best_solution, best_cost

# 7. RUN COMPARISONS AND SAVE RESULTS

def run_scheduling(input_csv_path, output_csv_path="results_method_comparison.csv",
                   output_txt_path="results_method_comparison.txt"):
    """Compare the heuristics on every instance and save the results.

    For each CSV row a greedy initial schedule is built and handed to the
    three labelled runs; time, conflict score and schedule of each run are
    written to a text report and collected into a CSV file. A row that fails
    is reported (printed and written to the text file) and skipped.

    Args:
        input_csv_path: CSV with one parsed instance per row.
        output_csv_path: Destination of the tabular results.
        output_txt_path: Destination of the human-readable report.

    Raises:
        FileNotFoundError: If the input CSV cannot be read (any read error is
            reported with this type; the original error is chained).
        IOError: If the results cannot be saved (original error is chained).
    """
    try:
        data = pd.read_csv(input_csv_path)
    except Exception as e:
        raise FileNotFoundError(f"Error reading input CSV file: {e}") from e

    # NOTE(review): "SA + Min Conflict" and "Constraint-Guided + SA" both call
    # simulated_annealing with identical arguments, so the two rows differ only
    # through randomness - confirm which algorithm each label should run.
    solvers = [
        ("SA + Min Conflict", simulated_annealing),
        ("Tabu + Min Conflict", tabu_search),
        ("Constraint-Guided + SA", simulated_annealing),
    ]
    row_keys = (
        "temporal_requirements",
        "shift_info",
        "days_off_min",
        "days_off_max",
        "schedule_length",
        "num_employees",
        "not_allowed_sequences",
    )

    results = []

    with open(output_txt_path, "w") as txt_file:
        for idx, row in data.iterrows():
            try:
                row_data = {key: row[key] for key in row_keys}

                # Greedy initialization, shared by all runs
                initial_schedule = generate_greedy_schedule(row_data)

                # Run every solver first so that a failing row records nothing.
                outcomes = []
                for method, solver in solvers:
                    start_time = time.time()
                    schedule, cost = solver(initial_schedule, row_data)
                    outcomes.append((method, cost, time.time() - start_time, schedule))

                # Save results for each method
                for method, cost, time_taken, schedule in outcomes:
                    formatted_schedule = "\n".join(
                        f"Emp {k}: {' '.join(v)}" for k, v in schedule.items()
                    )
                    results.append({
                        "instance_name": row["filename"],
                        "method": method,
                        "time_in_sec": round(time_taken, 4),
                        "conflicts": cost,
                        "best_schedule": formatted_schedule
                    })

                    txt_file.write(f"Instance: {row['filename']}, Method: {method}\n")
                    txt_file.write(f"Time: {round(time_taken, 4)} sec, Conflicts: {cost}\n")
                    txt_file.write(f"Best Schedule:\n{formatted_schedule}\n")
                    txt_file.write("=" * 80 + "\n")

            except Exception as e:
                # Best effort: report the failing row and continue with the next one.
                print(f"Error processing row {idx}: {e}")
                txt_file.write(f"Error processing row {idx}: {e}\n")
                txt_file.write("=" * 80 + "\n")

    # Save results to CSV
    try:
        results_df = pd.DataFrame(results)
        results_df.to_csv(output_csv_path, index=False)
        print(f"Results saved to {output_csv_path} and {output_txt_path}.")
    except Exception as e:
        raise IOError(f"Error saving results: {e}") from e

# 8. MAIN FUNCTION

# Run the heuristic comparison with the default output paths.
# NOTE(review): presumably this is the CSV written by the parsing cell
# (saved under a relative path) - confirm the working directory is /content.
run_scheduling(input_csv_path="/content/parsed_schedule_20_examples.csv")


Results saved to results_method_comparison.csv and results_method_comparison.txt.
