# Integer Program Formulation
This notebook outlines an integer programming perspective on the Covid-19 scheduling problem. In its current iteration, this model schedules people on a week-by-week basis, ensuring that people take $F_i = 2$ tests per week (if feasible, i.e. available at least 2 days) and attempting to keep the number of tests on any given day below a reasonable bound.

The optimized variable is a $assignment$, a $(n\_people, n\_days)$ binary matrix where each row is the weekly schedule for one person, where 0 means 'no test' and 1 means 'test'. We can incorporate $avail$, which is a $(n\_people, n\_days)$ binary matrix that marks on which days people are on campus (0 is not on campus, 1 is on campus and available to test). This way, multiplying $avail$ by $assignment$ elementwise and summing across rows gives a $ n\_people $ vector with the counts of well-scheduled tests that week for each person. Moreover, summing $ assignment$ across columns gives the total number of tests per day, which we want to keep fairly even.

In [1]:
import numpy as np
import cvxpy as cp
import matplotlib.pyplot as plt
from matplotlib import colors

# setup

In [2]:
n_days = 7
n_people = 20
test_interval = 3.5 # corresponds to Qs_i
n_tests = n_days / test_interval # corresponds to F_i
buffer = .05

In [3]:
def runSetup():
    # create random availability matrix
    avail = np.random.randint(0,2, size=(n_people, n_days))
    
    # Identify people who have infeasible availabilities
    people_total_availability = np.sum(avail, 1)
    infeasible_people = np.where(people_total_availability < n_tests)
    
    # If people have 2 or fewer available days, their schedule can't be shifted
    # to account for optimal spacing
    unshiftable_people = np.where(people_total_availability <= n_tests)

    # beq tracks how many tests people should take — 2 if possible, fewer if they're less available
    beq = np.multiply(tests_per_week, np.ones((n_people,1)))
    for i in range(len(infeasible_people[0])):
        beq[infeasible_people[0][i]][0] = people_total_availability[infeasible_people[0][i]]
        
    # Setup for the optimal spacing constraint — see optimization constraints below
    spaces = np.ones((n_people,))
    for i in range(len(unshiftable_people[0])):
        spaces[unshiftable_people[0][i]] += 1 

    # Calculate the average number of tests per day
    b_value = (n_people*n_tests/n_days)
    b = np.multiply(b_value, np.ones((n_days,1)))
    
    return avail, b, beq, spaces

# cvxpy optimization

In [4]:
avail, b, beq, spaces = runSetup()

def runOpt():
    assignment = cp.Variable((n_people, n_days), boolean=True)

    # Working on an objective function to track schedule spacing
    # (not working right now, not implemented)
    def objective():
        expr = []
        for i in range(n_days):
            for j in range(i + 1, n_days):
                penalty = ((j - i) - test_interval)**2
                expr.append(penalty * cp.sum(assignment[:, i], assignment[:, j]))
        expr.append(cp.sum(expr))
        
    # Helper matrix to identify consecutive tests
    adjacencies = np.zeros((n_days-1, n_days))
    for i in range(n_days-1):
        adjacencies[i][i] = 1
        adjacencies[i][i+1] = 1

    constraints = [
        cp.sum(assignment, axis=0) <= np.ceil(b.transpose()[0] * (1 + buffer)), # set load upper bound
        cp.sum(assignment, axis=0) >= np.floor(b.transpose()[0] * (1 - buffer)), # set load lower bound
        cp.sum(assignment, axis=1) == beq.transpose()[0], # enforce taking F_i tests
        cp.sum(cp.multiply(assignment, avail), axis=1) == beq.transpose()[0] # align with availability
    ]
    
    # Ensure each person has no consecutive days scheduled, unless that's all they can do
    for i in range(6):
        consec = adjacencies[i]*np.ones((n_people,1))
        constraints.append(cp.sum(cp.multiply(assignment, consec), axis=1) <= spaces)

#     obj = cp.Minimize(objective()) # Spread objective function (not working)
    obj = cp.Maximize(cp.sum(assignment)) # Dummy objective function

    prob = cp.Problem(obj, constraints=constraints)
    prob.solve()

    return assignment

def analyze(assignment):
    # Compute spread score (lower is better)
    score = 0
    for week in assignment.value:
        scheduled_days = []
        for i in range(n_days):
            if week[i] == 1:
                scheduled_days.append(i)
        if len(scheduled_days) == 2:
            score += ((scheduled_days[1] - scheduled_days[0]) - test_interval) ** 2
    spreadScore = score / n_people
    
    # Compute daily demands, deviations
    daily_demands = np.sum(assignment.value, axis=0)
    exp_supply_per_day = n_people * n_tests / n_days
    max_deviation = max(daily_demands.max() - exp_supply_per_day,
                        exp_supply_per_day - daily_demands.min()) / exp_supply_per_day # MIN, right?
    
    return spreadScore, daily_demands, max_deviation

assignment = runOpt()

if assignment.value is not None:
    spreadScore, daily_demands, max_deviation = analyze(assignment)
else:
    print("Situation infeasible")

NameError: name 'tests_per_week' is not defined

# analysis

In [None]:
print("Spread score is: %.02f" % spreadScore)
print("Max deviation %% is: %.01f" % (100 * max_deviation))
print('Daily demand (min):', daily_demands.min())
print('Daily demand (max):', daily_demands.max())
print('All daily demands:', daily_demands)

# Display Calendar
scaled_assignment = np.multiply(assignment.value, 0.5)
data = np.add(scaled_assignment, avail)

outputColors = colors.ListedColormap(['gray', 'red', 'yellow', 'green'])
availColors = colors.ListedColormap(['gray', 'yellow'])

plt.title("Output Vector", fontsize=16)
plt.imshow(data, cmap=outputColors, aspect='auto', extent=[0, n_days, n_people, 0])
plt.grid(which='major', axis='both', linestyle='-', color='k', linewidth=1.2)
plt.xticks(np.arange(0,n_days, 1))
plt.yticks(np.arange(0, n_people, 1))

plt.xlabel("Day", fontsize=12)
plt.ylabel("Person", fontsize=12)
plt.savefig('output.png', dpi=300, bbox_inches='tight')
plt.show()

# running many trials
to check the distributions of spread scores, load balancing deviations

In [None]:
%%time

spreads = []
devs = []
infeasible = 0
for t in range(1000):
#     if (t%100 == 0):
#         print(t)
    avail, b, beq, spaces = runSetup()
    assignment = runOpt()
    if assignment.value is not None:
        spreadScore, daily_demands, max_deviation = analyze(assignment)
        spreads.append(spreadScore)
        devs.append(100 * max_deviation)
    else:
        infeasible += 1
print("There were %d infeasible situations" % infeasible)

In [None]:
plt.title("Distribution of Spread Scores")
plt.hist(spreads)
plt.savefig('spreads.png', dpi=300, bbox_inches='tight')
plt.show()

plt.title("Distribution of Max Deviations (%)")
plt.hist(devs)
plt.savefig('maxDevs.png', dpi=300, bbox_inches='tight')
plt.show()