### Modelling the problem

Modelling a small example where we have $X$ employees, and $Y$ days.

Let $x \in X$ and $y \in Y$.

$\forall x \in X$ and $\forall y \in Y$

If employee $x$ has day $y$ off, $L (x,y) = 1$

Therefore $L$ is an $X$ by $Y$ matrix.

$$
L_{x,y} = 
\begin{pmatrix}
a_{1,1} & a_{1,2} & \cdots & a_{1,y} \\
a_{2,1} & a_{2,2} & \cdots & a_{2,y} \\
\vdots & \vdots & \ddots & \vdots \\
a_{x,1} & a_{x,2} & \cdots & a_{x,y} 
\end{pmatrix}
$$

Let us limit this to only a week for testing purposes, and only use a small number of staff members.

Let $X = 10$ and $Y = 5$.

We also have an employee quota ( $q$ ) of $75\%$ meaning that this percentage of employees must be present at all times. This is a hard constraint.

Therefore, 
$$\forall y \in Y,  \sum_{x=0}^X L(x,y) \geq (|X| * .75)$$

Finally, there is a staff leave allowance ( $a$ ) of maximum 2 days per person. This is a hard constraint. In a real model, this would be equal to the staff member's remaining holiday entitlement.

Therefore, 
$$\forall x \in X, \sum_{y=0}^Y L(x,y) \leq 2  $$

The preference of leave assignments for all employees in range $X$ is given by an $X * Y$ matrix $P$, where if $P(x,y) = 1$, the employee has requested this day off.

If an employee does not request a day off, then we do not want the algorithm to assign a day off to them.

Therefore,

$$ L(x,y) \leq P(x,y)  \quad \forall x \in X , \forall y \in Y $$



In [836]:
from ortools.sat.python import cp_model
import numpy as np
import pandas as pd


In [837]:
# num_staff = 5
# num_days = 5
# daily_quotas = [2, 2, 2, 2, 2]
# leave_allowance = [1,1,1,1,1]
# preference_matrix = [[1,1,1,0,0],[0,0,0,0,0],[0,0,0,0,0],[1,0,0,1,0],[0,0,0,0,1]]

import random
random.seed(30)


num_staff = 50
num_days = 20
daily_quotas = [random.randint(1, 15) for d in range(num_days)]
leave_allowance = [random.randint(1, 20) for e in range(num_staff)]

preference_matrix = [[1 if random.randint(0,100) < 20 else 0 for d in range(num_days)]
    for e in range(num_staff)]


p = preference_matrix



# num_ones = int(num_staff * num_days * 0.9)
# num_zeros = (num_staff * num_days) - num_ones
# array = np.array([1] * num_ones + [0] * num_zeros)
# np.random.shuffle(array)

# p = array.reshape(num_staff,num_days)


print(f"Preference Matrix: \n {p}")



Preference Matrix: 
 [[0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1], [0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0], [0, 1, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 1, 0], [0, 1, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0], [0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1], [0, 0, 0, 0, 1, 1, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 1, 0, 1, 0], [0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0], [0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 0, 0, 0, 0, 0], [0, 1, 1, 1, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 1, 0, 0, 1, 0, 1, 0, 0, 1, 1, 1, 0, 0, 0, 0, 1], [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 1], [0, 0, 0, 1, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0], [0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 1, 1, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0,

In [838]:
model = cp_model.CpModel()

If employee $x$ has day $y$ off, $L (x,y) = 1$

In [839]:
l = {}
for e in range(num_staff):
    for d in range(num_days):
        l[(e, d)] = model.new_bool_var(f"L_{e}_y{d}")

Ensuring holiday entitlement is not exceeded (2 day limit).

$\forall x \in X, \sum_{y=0}^Y L(x,y) \leq 2  $


In [840]:
for e in range(num_staff):
    model.Add(sum(l[e, d] for d in range(num_days)) <= leave_allowance[e])

Ensuring that minimum staff coverage is satisfied (75% minimum)

$\forall y \in Y,  \sum_{x=0}^X L(x,y) \geq (|X| * .75)$

In [841]:


for d in range(num_days):
    model.Add(sum(l[e, d] for e in range(num_staff)) <= daily_quotas[d])



Objective function - maximise the leave which matches a persons preference, and minimise leave that doesn't match a preference.

The line below aims to maximise $P( x , y ) \times L( x , y )$ for each staff member 
$ x \in X $ and for each day $ y \in Y $ .

If a staff member hasn't asked for a day off ( meaning $ P(x,y) = 0 $ ), the algorithm shouldn't give them a day off, as $ P( x , y ) \times L( x , y ) = 0 \times 1 = 0 $

Likewise, if a staff member has asked for a day off ( meaning $ P(x,y) = 1 $ ), the algorithm should try to give them a day off, as $ P( x , y ) \times L( x , y ) = 1 \times 1 = 1 $




In [842]:
model.Maximize(sum(p[e][d] * l[e, d] for e in range(num_staff) for d in range(num_days)))

In [843]:

class SolutionPrinter(cp_model.CpSolverSolutionCallback):

    def __init__(self, num_staff, num_days, l, limit):
        cp_model.CpSolverSolutionCallback.__init__(self)
        self._num_staff = num_staff
        self._num_days = num_days
        self._leave = l
        self._solution_count = 0
        self._solution_limit = limit


    def on_solution_callback(self):

        self._solution_count += 1

        print(f"Solution {self._solution_count}")

        for d in range(self._num_days):
            print(f"Day {d+1}")
            for s in range(self._num_staff):
                is_working = False
                if self.value(self._leave[(s, d)]):
                    is_working = True
                    print(f"  Employee {s+1} granted leave")
                if not is_working:
                    print(f"  Employee {s+1} does not work")

        if self._solution_count >= self._solution_limit:
            print(f"Stop search after {self._solution_limit} solutions")
            self.stop_search()



    def solutionCount(self):
        return self._solution_count



In [844]:
# Solve model

solver = cp_model.CpSolver()
# solver.parameters.linearization_level = 0
solver.parameters.enumerate_all_solutions = True
# solver.parameters.num_search_workers = 8
solver.parameters.max_time_in_seconds = 30.0
solver.parameters.log_search_progress = True
solver.parameters.enumerate_all_solutions = True
solver.log_callback = print  # (str)->None
solver.parameters.log_to_stdout = False
solution_limit = 10


solution_printer = SolutionPrinter(num_staff, num_days, l, solution_limit)


status = solver.Solve(model, solution_printer)


Starting CP-SAT solver v9.11.4210
Parameters: max_time_in_seconds: 30 log_search_progress: true enumerate_all_solutions: true log_to_stdout: false

Initial optimization model '': (model_fingerprint: 0xc41ec2762d8379c6)
#Variables: 1'000 (#bools: 209 in objective)
  - 1'000 Booleans in [0,1]
#kLinearN: 70 (#terms: 2'000)

Starting presolve at 0.00s
  8.96e-04s  0.00e+00d  [operations_research::sat::CpModelPresolver::PresolveToFixPoint] #num_loops=1 
  1.08e-05s  0.00e+00d  [operations_research::sat::CpModelPresolver::ExtractEncodingFromLinear] #potential_supersets=7 
  8.43e-05s  0.00e+00d  [operations_research::sat::CpModelPresolver::DetectDuplicateConstraintsWithDifferentEnforcements] 
  4.83e-03s  1.03e-03d  [operations_research::sat::CpModelPresolver::Probe] #probed=2'000 
  1.46e-04s  0.00e+00d  [MaxClique] 
  5.39e-04s  0.00e+00d  [operations_research::sat::CpModelPresolver::PresolveToFixPoint] #num_loops=1 
  5.30e-04s  0.00e+00d  [operations_research::sat::CpModelPresolver::Pro

In [845]:

def highlight_cells(val):
    return f'color: #00ff15' if val == 1 else  ''

# function taken from stack overflow
def highlight_diff(data, other, color='#ff616b'):
    attr = f'background-color: {color}'
    return pd.DataFrame(np.where(data.ne(other), attr, ''),
                        index=data.index, columns=data.columns)

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:

    solutionArray = [[solver.Value(l[s,d]) for d in range(num_days)] for s in range(num_staff)]

    df = pd.DataFrame(solutionArray, columns=[f'Day {d+1}' for d in range(num_days)], index=[f'Employee {x+1}' for x in range(num_staff)])

    df2 = pd.DataFrame(p, columns=[f'Day {d+1}' for d in range(num_days)], index=[f'Employee {x+1}' for x in range(num_staff)])

    df_styled = df.style.apply(highlight_diff, axis=None, other=df2).applymap(highlight_cells)

    diffCount = (df!=df2).sum().sum()
    display(df_styled)

    preference_leave_days_count = df2.sum().sum()
    assigned_leave_days_count = df.sum().sum()
    print(f"Preference Leave Days Count: {preference_leave_days_count}")
    print(f"Assigned Leave Days Count: {assigned_leave_days_count}")


    print(f"Difference Count: {diffCount} ")

    print(f"Similarity Percentage of all days: {(1-(diffCount / (num_staff * num_days))) * 100:.2f}%")

    print(f"Percentage of leave granted: {(assigned_leave_days_count / preference_leave_days_count) * 100:.2f}%")

    print(f"Runtime (Wall Time): {solver.WallTime()} seconds")


    # print("Preference Matrix")
    # df2_styled = df2.style.applymap(highlight_cells)
    # display(df2_styled)

else:
    print("No feasible solution found.")



  df_styled = df.style.apply(highlight_diff, axis=None, other=df2).applymap(highlight_cells)


Unnamed: 0,Day 1,Day 2,Day 3,Day 4,Day 5,Day 6,Day 7,Day 8,Day 9,Day 10,Day 11,Day 12,Day 13,Day 14,Day 15,Day 16,Day 17,Day 18,Day 19,Day 20
Employee 1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
Employee 2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1
Employee 3,0,0,0,0,0,1,0,0,0,0,0,0,1,0,0,0,0,0,0,0
Employee 4,0,1,0,0,0,0,1,0,0,0,0,0,0,0,1,0,0,0,1,0
Employee 5,0,1,0,0,0,0,1,1,0,1,0,0,0,0,0,0,0,0,1,0
Employee 6,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,1
Employee 7,0,0,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,0
Employee 8,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,1,0,1,0
Employee 9,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,1,0,0,0
Employee 10,0,1,0,0,0,0,0,0,0,0,1,0,1,0,0,0,0,0,0,0


Preference Leave Days Count: 209
Assigned Leave Days Count: 127
Difference Count: 82 
Similarity Percentage of all days: 91.80%
Percentage of leave granted: 60.77%
Runtime (Wall Time): 0.038790700000000004 seconds


In [846]:
# solution_limit = 5
# solution_printer = SolutionPrinter(num_staff, num_days, l, solution_limit)

# solver = cp_model.CpSolver()
# # solver.parameters.linearization_level = 0
# solver.parameters.log_search_progress = True
# solver.parameters.enumerate_all_solutions = True
# solver.log_callback = print  # (str)->None
# solver.parameters.log_to_stdout = False

# solver.Solve(model, solution_printer)

# # Statistics.
# print("\nStatistics")
# print(f"  - conflicts: {solver.num_conflicts}")
# print(f"  - branches : {solver.num_branches}")
# print(f"  - wall time: {solver.wall_time}s")