# In [69]:
# %%
import pandas as pd
from datetime import datetime, timedelta
from ortools.sat.python import cp_model
import json


# Rota dimensions
num_people = 30
num_days = 90  # length of the rota horizon, in days

# Every shift label that can be assigned to a person on a given day
shifts = [
    '7am-3pm (weekday)',
    '10am-6pm (weekday)',
    '11am-7pm (weekday)',
    '7.30am-3.30pm (weekday)',
    '12pm-8pm (weekday)',
    '7am-3pm (weekend)',
    '12pm-8pm (weekend)',
    '9.00am-5.30pm',
    '7.30am-4pm',
    '8am-4pm (weekday)',
]

# Subset of weekday shifts that the balancing / staffing rules treat specially
unique_shifts = [
    "7am-3pm (weekday)",
    "7.30am-3.30pm (weekday)",
    "10am-6pm (weekday)",
    "11am-7pm (weekday)",
    "12pm-8pm (weekday)",
]

# Rosters: regular staff, the two exception staff, and the combinations used below
exception_people = ['Exception_Person1', 'Exception_Person2']
excep_1 = exception_people[:1]
people = [f'Person {number}' for number in range(1, num_people + 1)]
all_people = [*people, *exception_people]
people_new = [*people, *excep_1]


# Calculate the date of the next Monday
def get_next_monday(today=None):
    """
    Return the Monday on or after a reference day.

    Args:
        today (datetime.date, optional): Reference day. Defaults to the
            current local date, which is what existing callers rely on.

    Returns:
        datetime.date: ``today`` itself when it is already a Monday,
        otherwise the first Monday after it.
    """
    if today is None:
        today = datetime.now().date()
    # date.weekday() numbers Monday as 0, so the modulo leaves a Monday unchanged
    days_until_monday = (7 - today.weekday()) % 7
    return today + timedelta(days=days_until_monday)


# Convert JSON input to holidays DataFrame
def convert_json_to_holidays_df(json_data):
    """
    Build the holidays DataFrame from the request payload.

    The payload is expected to look like:
    {
        "holidays": [
            {"Person": "Person 1", "Holiday_date": "2024-12-04"},
            {"Person": "Person 2", "Holiday_date": "2024-12-05"}
        ]
    }

    Returns a DataFrame with "Person" and "Holiday_date" columns. When the
    "holidays" entry is absent or empty the frame has those columns and no
    rows; otherwise "Holiday_date" is parsed with pd.to_datetime.
    """
    records = json_data.get("holidays", [])
    if records:
        frame = pd.DataFrame(records)
        # Parse the date strings so later code can call .date() on each value
        frame["Holiday_date"] = pd.to_datetime(frame["Holiday_date"])
        return frame

    # Nothing supplied: hand back an empty frame with the expected columns
    return pd.DataFrame(columns=["Person", "Holiday_date"])


# Convert holidays DataFrame to offsets
def convert_holidays_df_to_int(df, all_people, rota_start_date):
    """
    Translate holiday dates into day offsets from the rota start.

    Returns a dict mapping each name in ``all_people`` to a set of integer
    offsets (``holiday - rota_start_date`` in days). People without holidays
    map to an empty set.
    """
    offsets_by_person = {name: set() for name in all_people}
    if df.empty:
        return offsets_by_person

    def to_offset(stamp):
        # Each value is a timestamp; compare on calendar dates only
        return (stamp.date() - rota_start_date).days

    for name, rows in df.groupby('Person'):
        offsets_by_person[name] = set(map(to_offset, rows['Holiday_date']))
    return offsets_by_person


# Solve the Model
def solve_model(json_input):
    """
    Accept JSON input for holidays, generate the rota, and return the result.

    Args:
        json_input (dict): Payload whose optional "holidays" entry is a list of
            {"Person": ..., "Holiday_date": ...} records.

    Returns:
        pandas.DataFrame: One row per emitted (person, date) with the columns
        'Person', 'Date' and 'Shift'.

    Raises:
        Exception: If the solver finishes without an optimal or feasible solution.
    """
    # Parse JSON holidays into a DataFrame, then into per-person day offsets
    holidays_data = json_input.get("holidays", [])
    holidays_df = convert_json_to_holidays_df({"holidays": holidays_data})
    # The rota always starts on the Monday returned by get_next_monday()
    start_date = get_next_monday()
    date_range = [start_date + timedelta(days=i) for i in range(num_days)]
    holidays_per_person = convert_holidays_df_to_int(holidays_df, all_people, start_date)


    # Initialize the CP-SAT model
    model = cp_model.CpModel()

    # Decision variables, keyed by (person, date, shift) and (person, date)
    shift_assignments = {}
    rdo_assignments = {}
    # Create variables for shifts and RDOs
    for person in all_people:  # Include both general and exception people
        for date in date_range:
            for shift in shifts:
                shift_assignments[(person, date, shift)] = model.NewBoolVar(f'{person}_{date}_{shift}')
            rdo_assignments[(person, date)] = model.NewBoolVar(f'{person}_{date}_RDO')
    



### Soft constraints start


    all_penalties = []
    # Shared list that every soft-constraint helper below extends with its
    # penalty variables; debug_penalties() reads it after solving.

    def minimize_shift_time_changes():
        """
        Soft constraint to minimize drastic shift time changes between consecutive working days.

        For each person in ``people_new`` and each pair of consecutive dates, a
        0/1 penalty variable is created for every ordered (shift, shift)
        combination whose start times differ by more than two hours. The
        penalty is forced to 1 when both shifts are assigned.

        Combinations inside the threshold can never be penalised, so no
        variable is created for them; this leaves the objective unchanged
        while keeping the model far smaller.

        Returns:
            list: The penalty variables (also appended to ``all_penalties``).
        """
        # Start of each shift in minutes after midnight
        shift_start_times = {
            '7am-3pm (weekday)': 7 * 60,
            '7.30am-3.30pm (weekday)': 7 * 60 + 30,
            '8am-4pm (weekday)': 8 * 60,
            '9.00am-5.30pm': 9 * 60,
            '10am-6pm (weekday)': 10 * 60,
            '11am-7pm (weekday)': 11 * 60,
            '12pm-8pm (weekday)': 12 * 60,
            '7am-3pm (weekend)': 7 * 60,
            '12pm-8pm (weekend)': 12 * 60,
            '7.30am-4pm': 7 * 60 + 30
        }
        max_start_gap = 120  # minutes

        # The only ordered pairs that can ever incur a penalty, computed once
        jarring_pairs = [
            (shift1, shift2)
            for shift1 in shifts
            for shift2 in shifts
            if abs(shift_start_times[shift1] - shift_start_times[shift2]) > max_start_gap
        ]

        working_flags = {}

        def working_flag(person, date):
            """Return the cached 'person works some shift on date' indicator."""
            key = (person, date)
            if key not in working_flags:
                flag = model.NewBoolVar(f"{person}_working_{date}")
                model.Add(flag == sum(shift_assignments[(person, date, shift)] for shift in shifts))
                working_flags[key] = flag
            return working_flags[key]

        penalties = []

        for person in people_new:
            for date1, date2 in zip(date_range, date_range[1:]):
                is_working_day1 = working_flag(person, date1)
                is_working_day2 = working_flag(person, date2)

                for shift1, shift2 in jarring_pairs:
                    first = shift_assignments[(person, date1, shift1)]
                    second = shift_assignments[(person, date2, shift2)]
                    penalty = model.NewIntVar(0, 1, f"penaltyscn_{person}_{date1}_{shift1}_{date2}_{shift2}")

                    # Both shifts assigned on consecutive days -> pay the penalty
                    model.Add(penalty == 1).OnlyEnforceIf([
                        first,
                        second,
                        is_working_day1,
                        is_working_day2
                    ])
                    # Neither assigned -> explicitly zero; mixed cases are left
                    # to the minimising objective
                    model.Add(penalty == 0).OnlyEnforceIf([
                        first.Not(),
                        second.Not()
                    ])

                    penalties.append(penalty)

        all_penalties.extend(penalties)
        return penalties

    def limit_consecutive_unique_shifts():
        """
        Adds a soft constraint to penalize employees who are assigned more than 2 consecutive unique shifts.

        For each person in ``people_new`` and each window of three consecutive
        dates, a boolean penalty is forced to 1 when a shift from
        ``unique_shifts`` is assigned on all three days.

        The per-date "has a unique shift" indicator is created once per
        (person, date) and shared by the overlapping windows.

        Returns:
            list: The penalty variables (also appended to ``all_penalties``).
        """
        unique_flags = {}

        def unique_flag(person, date):
            """Return the cached 'person holds a unique shift on date' indicator."""
            key = (person, date)
            if key not in unique_flags:
                flag = model.NewBoolVar(f"{person}_unique_shift_{date}")
                model.Add(flag == sum(shift_assignments[(person, date, shift)] for shift in unique_shifts))
                unique_flags[key] = flag
            return unique_flags[key]

        consecutive_shift_penalties = []

        for person in people_new:  # Regular staff plus Exception_Person1
            # zip stops at the shortest slice, so windows never run off the end
            for date1, date2, date3 in zip(date_range, date_range[1:], date_range[2:]):
                window = [unique_flag(person, date) for date in (date1, date2, date3)]

                # Penalty if all 3 days have unique shifts
                penalty = model.NewBoolVar(f"penalty_consecutive_unique_{person}_{date1}_{date2}_{date3}")
                model.Add(penalty == 1).OnlyEnforceIf(window)
                # Only pinned to 0 when none of the three days is unique;
                # the mixed cases are left to the minimising objective
                model.Add(penalty == 0).OnlyEnforceIf([flag.Not() for flag in window])

                consecutive_shift_penalties.append(penalty)

        all_penalties.extend(consecutive_shift_penalties)  # Add to the global penalty list
        return consecutive_shift_penalties


    

### Soft constraint 2

    # Balancing unique shifts    
    def add_weekday_shift_balancing_constraint():
        """
        Generate penalties for balancing unique weekday shifts across all staff.
        Scales the average to 60% of the normal average for Exception_Person1 due to working 3 days/week.

        Returns:
            tuple: ``(deviations, tracked)`` where ``deviations`` is the list of
            deviation variables and ``tracked`` is a list of
            ``(person, shift, deviation)`` triples for the same variables.
        """
        weekday_shift_pens = []  # List to store penalties
        deviation_penalties = []  # List for weekday shift deviation penalties

        # Monday-Friday dates. date.weekday() is used instead of comparing
        # strftime("%A") with English day names, which depends on the locale.
        weekday_dates = [date for date in date_range if date.weekday() < 5]

        # Step 1: Calculate shift counts for all people and unique shifts
        shift_counts = {
            (person, shift): model.NewIntVar(0, num_days, f"{person}_{shift}_count")
            for person in people_new
            for shift in unique_shifts
        }

        # Populate shift counts for all people
        for person in people_new:
            for shift in unique_shifts:
                model.Add(
                    shift_counts[(person, shift)] == sum(
                        shift_assignments[(person, date, shift)]
                        for date in weekday_dates
                    )
                )

        # Step 2: Calculate avg_shifts_per_shift for all staff
        avg_shifts_per_shift = {}
        for shift in unique_shifts:
            # Total shifts assigned across all people for the current shift type
            total_shifts = model.NewIntVar(0, num_days * len(people_new), f"total_{shift}")
            model.Add(total_shifts == sum(shift_counts[(person, shift)] for person in people_new))

            # Calculate the average shifts per person for this shift type
            avg_shifts_per_shift[shift] = model.NewIntVar(0, num_days, f"avg_{shift}")
            model.AddDivisionEquality(avg_shifts_per_shift[shift], total_shifts, len(people_new))

        # Step 3: Add penalties for deviations
        for person in people_new:
            for shift in unique_shifts:
                if person == 'Exception_Person1':
                    # Scaled average for Exception_Person1: avg * 3 / 5 (60%)
                    scaled_avg = model.NewIntVar(0, num_days, f"Exception_Person1_scaled_avg_{shift}")
                    scaled_avg_calc = model.NewIntVar(0, num_days * 3, f"Exception_Person1_scaled_avg_calc_{shift}")
                    model.Add(scaled_avg_calc == avg_shifts_per_shift[shift] * 3)
                    model.AddDivisionEquality(scaled_avg, scaled_avg_calc, 5)

                    # Add penalties for deviations from the scaled average
                    deviation = model.NewIntVar(0, num_days, f"Exception_Person1_{shift}_deviation")
                    model.AddAbsEquality(
                        deviation,
                        shift_counts[(person, shift)] - scaled_avg
                    )
                else:
                    # Regular average for everyone else
                    deviation = model.NewIntVar(0, num_days, f"{person}_{shift}_shift_balancing_deviation")
                    model.AddAbsEquality(
                        deviation,
                        shift_counts[(person, shift)] - avg_shifts_per_shift[shift]
                    )

                weekday_shift_pens.append(deviation)  # Collect penalties for aggregation
                deviation_penalties.append((person, shift, deviation))  # Track individual deviations

        # Return both penalties for aggregation and tracking
        all_penalties.extend(weekday_shift_pens)  # Add to global list for debugging
        return weekday_shift_pens, deviation_penalties



    def add_maximum_penalty_constraint_ws(deviation_penalties):
        """
        Bound the largest weekday-shift deviation with a single variable.

        Every deviation that does not belong to Exception_Person1 is constrained
        to be at most ``max_penalty``. The variable is returned in a
        one-element list and also appended to ``all_penalties``.
        """
        max_penalty = model.NewIntVar(0, num_days, "max_penalty")

        for person, _shift, deviation in deviation_penalties:
            if person == "Exception_Person1":
                continue  # the exception person is left out of the bound
            model.Add(deviation <= max_penalty)

        weekday_shift_max_pens = [max_penalty]
        all_penalties.extend(weekday_shift_max_pens)
        return weekday_shift_max_pens



    def add_fairness_constraint_ws(deviation_penalties):
        """
        Generate penalties to balance weekday shifts across all staff.

        Each (person, shift) deviation, excluding Exception_Person1, is
        compared with the mean of those deviations; the absolute gaps are the
        returned penalty variables.

        Args:
            deviation_penalties (list): ``(person, shift, deviation_var)``
                triples, as returned by add_weekday_shift_balancing_constraint().

        Returns:
            list: One gap variable per non-exception entry (also appended to
            ``all_penalties``). Empty when there is nothing to compare.
        """
        weekday_shift_dev_pens = []

        # Exclude Exception_Person1 from global penalty calculation
        filtered_penalties = [
            penalty_var for person, _, penalty_var in deviation_penalties if person != "Exception_Person1"
        ]
        if not filtered_penalties:
            # Nothing to average; also avoids a division by zero below
            return weekday_shift_dev_pens

        global_penalty = model.NewIntVar(0, num_days * len(filtered_penalties), "global_penalty")
        model.Add(global_penalty == sum(filtered_penalties))

        avg_penalty = model.NewIntVar(0, num_days, "average_penalty")
        # The mean must divide by the number of summed terms. `people` never
        # contains Exception_Person1, so `len(people) - 1` was not that count.
        model.AddDivisionEquality(avg_penalty, global_penalty, len(filtered_penalties))

        for person, shift, penalty_var in deviation_penalties:
            if person != "Exception_Person1":  # Ignore Exception_Person1
                person_penalty_deviation = model.NewIntVar(0, num_days, f"{person}_{shift}_balancing_fairness_deviation")
                model.AddAbsEquality(person_penalty_deviation, penalty_var - avg_penalty)
                weekday_shift_dev_pens.append(person_penalty_deviation)

        all_penalties.extend(weekday_shift_dev_pens)  # Add to global list for debugging
        return weekday_shift_dev_pens



### HARD CONSTRAINTS START

    # Minimum 22 people in at any time
    def add_minimum_people_in_constraint_v2():
        """
        Hard constraint: on every weekday at least 22 people count as working.

        A person counts as working only if they hold one of the listed weekday
        shifts, and never on a day recorded as their holiday.
        """
        valid_shifts = [
            "7am-3pm (weekday)",
            "7.30am-3.30pm (weekday)",
            "9.00am-5.30pm",
            "10am-6pm (weekday)",
            "11am-7pm (weekday)",
            "12pm-8pm (weekday)",
            "8am-4pm (weekday)"
        ]

        for date_offset in range(num_days):
            day_date = start_date + timedelta(days=date_offset)
            if day_date.weekday() >= 5:
                continue  # Saturday/Sunday are not subject to this rule

            workers = []
            for person in all_people:
                is_working = model.NewBoolVar(f"{person}_{day_date}_is_working")

                # The flag may only be set when a valid shift is actually held
                held_valid_shifts = sum(
                    shift_assignments[(person, day_date, shift)] for shift in valid_shifts
                )
                model.Add(held_valid_shifts >= 1).OnlyEnforceIf(is_working)

                # Holidays are stored as day offsets from the rota start
                if date_offset in holidays_per_person.get(person, ()):
                    model.Add(is_working == 0)

                workers.append(is_working)

            model.Add(sum(workers) >= 22)

    # Assign weekend shift
    def assign_weekend_shift(sub_group, weekend_dates, shift):
        """
        Assign a specific shift to a sub-group for a given weekend.
        Ensure exactly 3 people are assigned to the shift.

        Args:
            sub_group (list): Names eligible for this shift on this weekend.
            weekend_dates (list): The weekend's dates, Saturday first.
            shift (str): Shift label to staff.
        """
        for date in weekend_dates:
            # Ensure exactly 3 people are assigned to the given shift
            model.Add(
                sum(shift_assignments[(person, date, shift)] for person in sub_group) == 3
            )

        # Ensure the same people are assigned to both days of the weekend.
        # This does not depend on the date loop above, so it is added once
        # rather than once per date.
        if len(weekend_dates) >= 2:
            first_day, second_day = weekend_dates[0], weekend_dates[1]
            for person in sub_group:
                model.Add(
                    shift_assignments[(person, first_day, shift)] ==
                    shift_assignments[(person, second_day, shift)]
                )

    # Stagger weekend shifts
    def staggered_weekend_shifts():
        """
        Rotate weekend duty through five fixed groups of six on a 5-week cycle.

        In week k of a cycle, group k is on duty. Its first three members share
        one weekend shift and its last three the other; which half takes
        '7am-3pm (weekend)' flips on every cycle. Everyone in ``people`` outside
        the on-duty group is barred from both weekend shifts that weekend.
        Weekends whose Sunday falls outside the rota are skipped.
        """
        early_shift = "7am-3pm (weekend)"
        late_shift = "12pm-8pm (weekend)"

        # 'Person 1'..'Person 30' split into consecutive groups of six
        groups = [
            [f'Person {number}' for number in range(first, first + 6)]
            for first in range(1, 31, 6)
        ]
        weeks_per_cycle = 5
        cycle_length = 7 * weeks_per_cycle
        total_days = len(date_range)

        for current_cycle, cycle_start in enumerate(range(0, total_days, cycle_length)):
            for week_idx in range(weeks_per_cycle):
                group = groups[week_idx % len(groups)]
                midpoint = len(group) // 2
                sub_group_a, sub_group_b = group[:midpoint], group[midpoint:]

                # Day 5 / day 6 of the week are Saturday / Sunday
                sunday_idx = cycle_start + week_idx * 7 + 6
                if sunday_idx >= total_days:
                    continue
                weekend = [date_range[sunday_idx - 1], date_range[sunday_idx]]

                # Even cycles: A early / B late; odd cycles: the reverse
                if current_cycle % 2 == 0:
                    shift_for_a, shift_for_b = early_shift, late_shift
                else:
                    shift_for_a, shift_for_b = late_shift, early_shift
                assign_weekend_shift(sub_group_a, weekend, shift_for_a)
                assign_weekend_shift(sub_group_b, weekend, shift_for_b)

                # Nobody outside the on-duty group may take a weekend shift
                for date in weekend:
                    for person in people:
                        if person in group:
                            continue
                        for shift in (early_shift, late_shift):
                            model.Add(shift_assignments[(person, date, shift)] == 0)

    # Unique shift requirements
    def shift_requirements():
        """
        Fix the daily headcount of each unique weekday shift.

        On Monday-Friday the number of people in ``people`` assigned to each
        listed shift must equal the required count; on Saturday and Sunday
        those shifts are forbidden for everyone in ``people``.
        """
        # Define weekday-specific shift requirements
        weekday_requirements = {
            "7am-3pm (weekday)": 1,
            "7.30am-3.30pm (weekday)": 1,
            "10am-6pm (weekday)": 1,
            "11am-7pm (weekday)": 2,
            "12pm-8pm (weekday)": 2
        }

        for date in date_range:
            # date.weekday() (Mon=0 .. Sun=6) is locale-independent, unlike
            # matching strftime("%A") against English day names.
            if date.weekday() < 5:  # Weekdays only
                # Add weekday shift requirements
                for shift, required_count in weekday_requirements.items():
                    shift_count = sum(
                        shift_assignments[(person, date, shift)]
                        for person in people
                    )
                    model.Add(shift_count == required_count)
            else:
                # Explicitly block weekday shifts on weekends
                for shift in weekday_requirements:
                    for person in people:
                        model.Add(shift_assignments[(person, date, shift)] == 0)
    
    # Block weekday shifts on weekends
    def block_weekday_shifts_on_weekends():
        """
        Explicitly block all weekday shifts on weekends for every person in
        ``people`` and every weekend date.
        """
        weekday_shifts = [
            '7am-3pm (weekday)',
            '7.30am-3.30pm (weekday)',
            '10am-6pm (weekday)',
            '11am-7pm (weekday)',
            '12pm-8pm (weekday)',
            '8am-4pm (weekday)'
        ]

        # Saturday/Sunday via date.weekday() (5 and 6); this does not depend on
        # the locale the way strftime("%A") day names do.
        weekend_dates = [date for date in date_range if date.weekday() >= 5]

        for date in weekend_dates:
            for person in people:
                for shift in weekday_shifts:
                    # Ensure weekday shifts cannot be assigned on weekends
                    model.Add(shift_assignments[(person, date, shift)] == 0)
    # People exceptions
    def exception1_constraints():
        """
        Ensure Exception_Person1 works any shift on Tuesday, Wednesday, and Thursday,
        like a regular person, but must be assigned exactly one shift on these days.
        Every other day is a fixed RDO with no shift.
        """
        person = 'Exception_Person1'
        # Tuesday, Wednesday, Thursday as date.weekday() numbers (Mon=0);
        # locale-independent, unlike strftime("%A") day names.
        working_weekdays = (1, 2, 3)

        for date in date_range:
            shifts_today = sum(shift_assignments[(person, date, shift)] for shift in shifts)

            if date.weekday() in working_weekdays:
                # Ensure Exception_Person1 is not on an RDO on their working days
                model.Add(rdo_assignments[(person, date)] == 0)

                # Assign exactly one shift, like regular people
                model.Add(shifts_today == 1)

            else:
                # Ensure Exception_Person1 is on an RDO for non-working days
                model.Add(rdo_assignments[(person, date)] == 1)
                # Ensure no shifts are assigned on RDO days
                model.Add(shifts_today == 0)


    # People exceptions
    def exception2_constraints():
        """
        Ensure Exception_Person2 works specific shifts on specific days.

        Sunday is fixed to '7.30am-4pm', Monday and Tuesday to '9.00am-5.30pm'.
        Every other day is an RDO with no shift assigned.
        """
        person = 'Exception_Person2'
        # Keyed by date.weekday() (Mon=0 .. Sun=6), which does not depend on
        # the locale the way strftime("%A") day names do.
        fixed_shift_by_weekday = {
            6: '7.30am-4pm',      # Sunday
            0: '9.00am-5.30pm',   # Monday
            1: '9.00am-5.30pm',   # Tuesday
        }

        for date in date_range:
            required_shift = fixed_shift_by_weekday.get(date.weekday())
            if required_shift is not None:
                # Only the required shift is allowed, and it must be assigned
                model.Add(sum(shift_assignments[(person, date, shift)] for shift in shifts if shift != required_shift) == 0)
                model.Add(shift_assignments[(person, date, required_shift)] == 1)
            else:
                # Ensure Exception_Person2 is on an RDO for non-working days
                model.Add(rdo_assignments[(person, date)] == 1)
                # Ensure no shifts are assigned on RDO days
                model.Add(sum(shift_assignments[(person, date, shift)] for shift in shifts) == 0)


    # Exactly 2 RDOs per week
    def Rdos():
        """
        Require exactly two RDOs and at most five worked days in each 7-day block.

        Applies to the regular staff in ``people``. Blocks start at day 0; a
        final block shorter than seven days is held to the same limits.
        """
        for person in people:
            for week_start in range(0, num_days, 7):
                week_end = min(week_start + 7, num_days)
                week_dates = [date_range[day_idx] for day_idx in range(week_start, week_end)]

                # Exactly 2 rostered days off in the block
                model.Add(sum(rdo_assignments[(person, date)] for date in week_dates) == 2)

                # No more than 5 shift assignments in the block
                worked = sum(
                    shift_assignments[(person, date, shift)]
                    for date in week_dates
                    for shift in shifts
                )
                model.Add(worked <= 5)


    # One shift per day
    def oneshiftperday():
        """Allow each person in ``all_people`` at most one shift on any date."""
        for person in all_people:
            for date in date_range:
                todays_assignments = [shift_assignments[(person, date, shift)] for shift in shifts]
                model.Add(sum(todays_assignments) <= 1)


    # Make sure a shift is assigned
    def assign_shift():
        """
        Ensure that each person is assigned exactly one valid shift per day, unless it's an RDO.

        "Valid" means a weekend shift on Saturday/Sunday and a weekday shift
        otherwise. Holidays are not consulted by this constraint.
        """
        # Built once instead of on every (person, date) iteration
        weekend_shifts = ['7am-3pm (weekend)', '12pm-8pm (weekend)']
        weekday_shifts = [
            '7am-3pm (weekday)', '10am-6pm (weekday)', '11am-7pm (weekday)',
            '7.30am-3.30pm (weekday)', '12pm-8pm (weekday)', '8am-4pm (weekday)'
        ]

        for person in all_people:
            for date in date_range:
                # date.weekday() >= 5 means Saturday/Sunday; locale-independent,
                # unlike matching strftime("%A") against English day names.
                is_weekend = date.weekday() >= 5
                valid_shifts = weekend_shifts if is_weekend else weekday_shifts

                # Ensure exactly one shift is assigned per day (only enforce if no RDO)
                model.Add(sum(shift_assignments[(person, date, shift)] for shift in valid_shifts) == 1).OnlyEnforceIf(
                    rdo_assignments[(person, date)].Not()
                )

### Debuggers


    def debug_penalties(solver):
        """
        Print every penalty variable with a positive value, then their total.

        Args:
            solver (cp_model.CpSolver): The solver instance after solving.
        """
        print("\n--- Debugging All Penalties ---")
        total_penalty = 0

        for penalty_var in all_penalties:
            penalty_value = solver.Value(penalty_var)
            if penalty_value <= 0:
                continue  # only report penalties that were actually incurred
            print(f"Penalty Variable: {penalty_var.Name()} = {penalty_value}")
            total_penalty += penalty_value

        print(f"Total Penalty: {total_penalty}\n")


    # Hard constraints: register every rule on the shared model
    oneshiftperday()
    Rdos()
    staggered_weekend_shifts()
    shift_requirements()
    add_minimum_people_in_constraint_v2()
    block_weekday_shifts_on_weekends()
    assign_shift()
    
    # Hard constraints for the two exception people
    exception1_constraints()
    exception2_constraints()

    # Soft constraints

    # Weight applied to each group of penalty variables in the objective
    soft_constraint_weights = {
        "weekday_balancing": 100,
        "fairness_weekday_shifts": 50,
        "max_penalty_ws": 50,
        "time_diff": 1,
        "limit_consecutive_unique_shifts": 1  # New soft constraint to avoid consecutive unique shifts
    }
    
    # Collect penalties

    # Balancing unique shifts across people; the fairness and max-penalty
    # helpers both consume the (person, shift, deviation) triples
    weekday_balancing_penalties, weekday_shift_deviation_penalties = add_weekday_shift_balancing_constraint()
    weekday_fairness_penalties = add_fairness_constraint_ws(weekday_shift_deviation_penalties)
    weekday_max_penalty = add_maximum_penalty_constraint_ws(weekday_shift_deviation_penalties)

    # Minimise shift time changes
    min_time_diff = minimize_shift_time_changes()

    # Limit consecutive unique shifts
    consecutive_unique_shift_penalties = limit_consecutive_unique_shifts()

    # Objective: weighted sum of all penalty groups
    model.Minimize(
        soft_constraint_weights["weekday_balancing"] * sum(weekday_balancing_penalties) +
        soft_constraint_weights["fairness_weekday_shifts"] * sum(weekday_fairness_penalties) +
        soft_constraint_weights["max_penalty_ws"] * sum(weekday_max_penalty) +
        
        soft_constraint_weights["time_diff"] * sum(min_time_diff) +
        
        soft_constraint_weights["limit_consecutive_unique_shifts"] * sum(consecutive_unique_shift_penalties)  # New penalty
    )

    # Solve the model, giving up after 600 seconds
    solver = cp_model.CpSolver()
    solver.parameters.max_time_in_seconds = 600
    status = solver.Solve(model)

    if status not in [cp_model.OPTIMAL, cp_model.FEASIBLE]:
        raise Exception("No feasible solution found.")

    debug_penalties(solver)

    results = []

    for person in people + exception_people:
        for date_idx, date in enumerate(date_range):
            date_offset = (date - start_date).days  # Use start_date here
            # A recorded holiday is reported as 'Holiday' without consulting
            # the solver's values for that day
            if date_offset in holidays_per_person[person]:
                results.append({
                    'Person': person,
                    'Date': date.strftime('%Y-%m-%d'),  # Format date here
                    'Shift': 'Holiday'
                })
            else:
                # Report the first assigned shift, if any
                shift_assigned = False
                for shift in shifts:
                    if solver.Value(shift_assignments[(person, date, shift)]) == 1:
                        results.append({
                            'Person': person,
                            'Date': date.strftime('%Y-%m-%d'),  # Format date here
                            'Shift': shift
                        })
                        shift_assigned = True
                        break

                # No shift: report an RDO when the RDO variable is set.
                # NOTE(review): a day with neither a shift nor an RDO produces
                # no row at all - confirm that is intended.
                if not shift_assigned and solver.Value(rdo_assignments[(person, date)]) == 1:
                    results.append({
                        'Person': person,
                        'Date': date.strftime('%Y-%m-%d'),  # Format date here
                        'Shift': 'RDO'
                    })


    # Convert results to a DataFrame
    results_df = pd.DataFrame(results)

    return results_df


# In [70]:
import random
import json
from datetime import datetime, timedelta

def generate_holidays(people, num_days, start_date, avg_per_person):
    """
    Generates random holidays for a list of people over a specified date range (working days only).

    Parameters:
        people (list): List of people (names).
        num_days (int): Total number of days in the date range.
        start_date (datetime.date or str): First day of the range, either a
            date object or a string in the format 'YYYY-MM-DD'.
        avg_per_person (int): Average number of holidays per person.

    Returns:
        list: List of dictionaries containing the holidays for each person,
        each shaped like {"Person": name, "Holiday_date": "YYYY-MM-DD"}.
    """
    # The date arithmetic below needs a date object, so parse a string first
    if isinstance(start_date, str):
        start_date = datetime.strptime(start_date, "%Y-%m-%d").date()

    # Generate the date range excluding weekends (weekday() 5 and 6)
    all_days = (start_date + timedelta(days=i) for i in range(num_days))
    date_range = [day for day in all_days if day.weekday() < 5]

    # List to store holidays in the desired format
    holidays_list = []

    # Assign holidays to each person
    for person in people:
        # Number of holidays: normal draw around the average (sigma = half the
        # average), truncated to an int and floored at zero
        num_holidays = max(0, int(random.gauss(avg_per_person, avg_per_person * 0.5)))

        # Sample distinct dates, never more than the range offers
        person_holidays = random.sample(date_range, min(num_holidays, len(date_range)))
        holidays_list.extend(
            {"Person": person, "Holiday_date": holiday.strftime("%Y-%m-%d")}
            for holiday in person_holidays
        )

    # Return the holidays as a list of dictionaries
    return holidays_list


# Example usage: build the holiday input for a 30-person, 90-day rota
num_people = 30
num_days = 90
avg_per_person = 0  # mean number of holidays per person
people = [f'Person {number}' for number in range(1, num_people + 1)]
start_date = get_next_monday()

# Generate holidays and show them
output = generate_holidays(people, num_days, start_date, avg_per_person)
print(output)

# Out: []


# In [73]:
# Request payload in the shape solve_model() reads: a "holidays" list
json_req = {"holidays": output}


# In [None]:
# Build and solve the rota; solve_model() caps the solver at 600 seconds and
# returns a DataFrame with 'Person', 'Date' and 'Shift' columns.
data = solve_model(json_req)

# In [67]:
# Reshape the long result into a Person x Date grid of shift labels
assigned_pivot_df = data.pivot(values='Shift', index='Person', columns='Date')

import pandas as pd
import numpy as np

# The column labels are 'YYYY-MM-DD' strings; parse them so they can be
# reformatted. errors='coerce' turns anything unparseable into NaT.
assigned_pivot_df.columns = pd.to_datetime(assigned_pivot_df.columns, errors='coerce')

# A NaT column label means a date failed to parse - stop rather than mislabel
if assigned_pivot_df.columns.isnull().any():
    raise ValueError("Some columns could not be converted to datetime format.")

# Relabel the columns as 'ddd dd-mm-yy'
formatted_dates = assigned_pivot_df.columns.strftime('%a %d-%m-%y')
assigned_pivot_df.columns = formatted_dates

# Cell styling: map each rota label (shift, RDO, holiday, off) to a background colour
def highlight_shifts(s):
    """Return one CSS string per value in `s`.

    Known rota labels get a `background-color` declaration; any other value
    (including missing cells) gets an empty string, i.e. no styling.
    """
    palette = {
        "RDO": "#b3b3ff",                      # rostered days off
        "Holiday": "#ffd9b3",                  # holidays
        "7am-3pm (weekday)": "#ccffcc",
        "7.30am-3.30pm (weekday)": "#ccff99",
        "12pm-8pm (weekday)": "#ffcc99",
        "10am-6pm (weekday)": "#ffffcc",
        "11am-7pm (weekday)": "#ffb3ff",
        "8am-4pm (weekday)": "#cce6ff",
        "7am-3pm (weekend)": "#ff9999",
        "12pm-8pm (weekend)": "#ff6666",
        "Off": "white",                        # off days
    }
    css_by_label = {
        label: f"background-color: {colour};" for label, colour in palette.items()
    }

    styles = []
    for value in s:
        # Labels without a palette entry fall back to no colour
        styles.append(css_by_label.get(value, ''))
    return styles

# Per-date head-counts that are appended as extra rows beneath the rota grid
summary_rows = {}
# Labels counted in each date column.
# NOTE(review): "7.30-4pm (weekday)" does not match any entry in the `shifts` list at the top of
# the file ('7.30am-4pm') and is dropped again by `summary_order` below; confirm the intended label.
shift_types = [
    "7am-3pm (weekday)", "7.30am-3.30pm (weekday)", "7.30-4pm (weekday)", 
    "8am-4pm (weekday)", "10am-6pm (weekday)", "11am-7pm (weekday)", 
    "12pm-8pm (weekday)", "7am-3pm (weekend)", "12pm-8pm (weekend)", 
    "Holiday", "RDO"
]

total_people = len(assigned_pivot_df)
for col in assigned_pivot_df.columns:
    day_labels = assigned_pivot_df[col]

    # How many people carry each label on this date
    col_summary = {label: (day_labels == label).sum() for label in shift_types}

    # "Total in" is everyone who is neither on holiday nor on an RDO
    absent = col_summary.get("Holiday", 0) + col_summary.get("RDO", 0)
    col_summary["Total in"] = total_people - absent

    summary_rows[col] = col_summary

# One row per date, one column per label
summary_df = pd.DataFrame(summary_rows).T

# Display order of the summary rows
summary_order = [
    "7am-3pm (weekday)", "7.30am-3.30pm (weekday)", "8am-4pm (weekday)", 
    "10am-6pm (weekday)", "11am-7pm (weekday)", "12pm-8pm (weekday)", 
    "7am-3pm (weekend)", "12pm-8pm (weekend)", "Holiday", "RDO",  "Total in"
]

# Keep only the ordered labels that are actually present
existing_columns = [col for col in summary_order if col in summary_df.columns]
summary_df = summary_df[existing_columns]

# Stack the summary (labels as rows, dates as columns) under the rota grid
assigned_with_summary = pd.concat([assigned_pivot_df, summary_df.T])

# Colour every row using the label-to-colour mapping
styled_assigned_df = assigned_with_summary.style.apply(highlight_shifts, axis=1)

### Second Sheet (per-person totals, including weekend days worked)
rota_summary = pd.DataFrame(index=assigned_pivot_df.index)

# Labels that mark a day away from work rather than a shift
special_days = ["Holiday", "RDO"]
# Real working shifts only: shift_types also lists the special-day labels,
# so they must be excluded before anything is counted as "worked".
worked_shift_types = [shift for shift in shift_types if shift not in special_days]

# Count each label in shift_types for each person
for shift in shift_types:
    rota_summary[shift] = (assigned_pivot_df == shift).sum(axis=1)

# Count holidays and RDOs for each person (kept so these columns exist even if
# shift_types stops listing them)
for day_type in special_days:
    rota_summary[day_type] = (assigned_pivot_df == day_type).sum(axis=1)

# Weekend days worked per person: Saturday/Sunday cells holding a real shift label.
# Vectorised so that a rota with no weekend columns yields zeros rather than failing.
weekend_columns = [col for col in assigned_pivot_df.columns if col.startswith(('Sat', 'Sun'))]
assigned_pivot_weekends = assigned_pivot_df[weekend_columns]
rota_summary["Weekends Worked"] = assigned_pivot_weekends.isin(worked_shift_types).sum(axis=1)

# Total Shifts: sum of the working-shift counts only. Summing over all of
# shift_types would count Holiday and RDO days as shifts.
rota_summary["Total Shifts"] = rota_summary[worked_shift_types].sum(axis=1)

### Penalties Section



### Write both sheets into a single Excel workbook
output_file = r"C:\Users\olivia\Downloads\rota.xlsx"

# Sheet name -> object to export: the colour-coded rota first, then the per-person summary
sheets = (
    ('Color_Coded_Rota', styled_assigned_df),
    ('Rota_Summary', rota_summary),
)

with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
    for sheet_name, frame in sheets:
        frame.to_excel(writer, sheet_name=sheet_name, index=True)


print("Combined Rota workbook has been created successfully.")




Combined Rota workbook has been created successfully.
