In [32]:
import numpy as np
from collections import namedtuple
from ortools.sat.python import cp_model
import time

In [2]:
''' Algorithmic Approach : use schedule_algo(intervals, num_resources) '''

# Proof: http://www.cs.toronto.edu/~milad/csc373/lectures/T1.pdf
def schedule_algo(intervals, num_resources):
  """Greedily place intervals on ``num_resources`` resources, earliest end time first.

  Args:
    intervals: iterable of indexable items whose element 0 is the start time
      and element 1 is the end time.
    num_resources: number of resources (one schedule list is created per resource).

  Returns:
    A tuple ``(count, schedules)`` where ``schedules`` holds one list of placed
    intervals per resource and ``count`` is the total number of placed intervals.
  """
  schedules = [[] for _ in range(num_resources)]

  # Visit candidates by increasing end time (the sort is the n*log(n) step)
  by_end_time = sorted(intervals, key=lambda candidate: candidate[1])

  for candidate in by_end_time:
    # Ask for the resource whose last end time is the latest one still not after this start
    slot = assign_schedule(candidate[0], schedules)
    if slot < 0:
      # No resource can take this interval; it is dropped
      continue
    schedules[slot].append(candidate)

  scheduled_count = sum(len(assigned) for assigned in schedules)
  return scheduled_count, schedules

def assign_schedule(start_time, schedules):
  """Pick the resource that should receive an interval starting at ``start_time``.

  A resource is eligible when it is empty or when its last interval ends no
  later than ``start_time``. Among eligible resources the one whose last
  interval ends latest is chosen; empty resources rank below every occupied
  one, and ties go to the lowest index.

  Args:
    start_time: start time of the interval to place (may be negative).
    schedules: list of per-resource lists; each entry's element 1 is its end time.

  Returns:
    The index of the chosen resource, or -1 when no resource is eligible.
  """
  index = -1
  latest_end = None  # None until the first eligible resource is seen
  for i, schedule in enumerate(schedules):
    # An empty resource is always free, whatever the time origin is, so give it
    # the lowest possible end time instead of a fixed 0.
    schedule_end_time = float('-inf') if len(schedule) < 1 else schedule[-1][1]
    if schedule_end_time > start_time:
      continue
    if latest_end is None or schedule_end_time > latest_end:
      latest_end = schedule_end_time
      index = i
  return index

In [3]:
''' SMT Approach : use solve_schedule(intervals, num_resources) '''

def solve_schedule(intervals, num_resources):
  """Maximise the number of scheduled intervals with a CP-SAT model.

  Args:
    intervals: sequence of items whose element 0 is the start and element 1 the end.
    num_resources: number of resources an interval may be assigned to.

  Returns:
    A tuple ``(objective_value, solution)``. ``solution`` has one list of
    intervals per resource, or is empty when the solver reports neither
    OPTIMAL nor FEASIBLE (a message is printed in that case).
  """
  num_intervals = len(intervals)
  model = cp_model.CpModel()

  # Boolean table: assign_resource[r][i] is true when interval i runs on resource r
  assign_resource = np.empty((num_resources, num_intervals), dtype=cp_model.IntVar)
  for i in range(num_intervals):
    for r in range(num_resources):
      assign_resource[r][i] = model.NewBoolVar(f'assign_i{i+1}_r{r+1}')

  # Optional interval table: schedules[r][i] is present only when assign_resource[r][i] is true
  schedules = np.empty((num_resources, num_intervals), dtype=cp_model.IntervalVar)
  for i, interval in enumerate(intervals):
    begin, finish = interval[0], interval[1]
    for r in range(num_resources):
      schedules[r][i] = model.NewOptionalIntervalVar(
          begin, finish - begin, finish,
          assign_resource[r][i], f'interval_i{i+1}_r{r+1}')

  # Intervals sharing a resource must not overlap
  for resource_row in schedules:
    model.AddNoOverlap(resource_row)

  # An interval is placed on at most one resource
  for interval_column in assign_resource.T:
    model.AddAtMostOne(interval_column)

  # Objective: as many assignments as possible
  total_assigned = sum(assign_resource.flatten())
  model.Maximize(total_assigned)

  solver = cp_model.CpSolver()
  status = solver.Solve(model)

  # Collect, per resource, the intervals whose assignment variable is set
  solution = []
  if status not in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    print(f'Solver Failed with Error Code: {solver.StatusName()}')
  else:
    for r in range(num_resources):
      solution.append([intervals[i] for i in range(num_intervals)
                       if solver.Value(assign_resource[r][i])])

  return solver.ObjectiveValue(), solution

In [36]:
''' SMT Approach + Resource Optimization : use solve_opt_schedule(intervals, num_resources) '''

def solve_opt_schedule(intervals, num_resources):
  """Two-stage solve: fix the maximum task count, then maximise scheduled duration.

  Stage one calls ``solve_schedule`` to obtain the best achievable number of
  scheduled intervals. Stage two rebuilds the model, forces exactly that many
  assignments and maximises the duration-weighted sum of assignments.

  Args:
    intervals: sequence of items whose element 0 is the start and element 1 the end.
    num_resources: number of resources an interval may be assigned to.

  Returns:
    A tuple ``(task_count, objective_value, solution)`` where ``task_count`` is
    the stage-one optimum, ``objective_value`` is the stage-two objective and
    ``solution`` lists the intervals per resource (empty when the stage-two
    solver reports neither OPTIMAL nor FEASIBLE).
  """
  # Stage one: best achievable number of scheduled intervals
  best_count, _ = solve_schedule(intervals, num_resources)
  obj = int(best_count)

  num_intervals = len(intervals)
  model = cp_model.CpModel()

  # Boolean table: assign_resource[r][i] is true when interval i runs on resource r
  assign_resource = np.empty((num_resources, num_intervals), dtype=cp_model.IntVar)
  for i in range(num_intervals):
    for r in range(num_resources):
      assign_resource[r][i] = model.NewBoolVar(f'assign_i{i+1}_r{r+1}')

  # durations[r][i] is the length of interval i; schedules[r][i] its optional interval on r
  durations = np.zeros((num_resources, num_intervals))
  schedules = np.empty((num_resources, num_intervals), dtype=cp_model.IntervalVar)
  for i, interval in enumerate(intervals):
    begin, finish = interval[0], interval[1]
    length = finish - begin
    durations[:, i] = length
    for r in range(num_resources):
      schedules[r][i] = model.NewOptionalIntervalVar(
          begin, length, finish,
          assign_resource[r][i], f'interval_i{i+1}_r{r+1}')

  # Intervals sharing a resource must not overlap
  for resource_row in schedules:
    model.AddNoOverlap(resource_row)

  # An interval is placed on at most one resource
  for interval_column in assign_resource.T:
    model.AddAtMostOne(interval_column)

  # Stage two: keep the stage-one count and maximise total scheduled duration
  flat_assignments = assign_resource.flatten()
  model.Add(sum(flat_assignments) == obj)
  model.Maximize(cp_model.LinearExpr.WeightedSum(assign_resource.flatten(), durations.flatten()))

  solver = cp_model.CpSolver()
  status = solver.Solve(model)

  # Collect, per resource, the intervals whose assignment variable is set
  solution = []
  if status not in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    print(f'Solver Failed with Error Code: {solver.StatusName()}')
  else:
    for r in range(num_resources):
      solution.append([intervals[i] for i in range(num_intervals)
                       if solver.Value(assign_resource[r][i])])

  return obj, solver.ObjectiveValue(), solution

In [40]:
''' Multiple-Objective Optimization Static Weighted : solve_static_weighted(intervals, num_resources) '''

def solve_static_weighted(intervals, num_resources, task_weight=10000):
  """Schedule intervals with one weighted objective: task count first, busy time second.

  The CP-SAT objective is
  ``task_weight * (number of scheduled intervals) + (sum of scheduled durations)``.

  Args:
    intervals: sequence of items whose element 0 is the start and element 1 the end.
    num_resources: number of resources an interval may be assigned to.
    task_weight: reward per scheduled interval. Defaults to 10000, the value
      that used to be hard-coded. The task count only takes strict priority
      over duration while ``task_weight`` is larger than the total duration
      that can be scheduled, so pass a larger value for long time horizons.

  Returns:
    A tuple ``(num_scheduled, objective_value, solution)``. ``solution`` has
    one list of intervals per resource; it is empty and ``num_scheduled`` is 0
    when the solver reports neither OPTIMAL nor FEASIBLE.
  """
  # Declare Model
  model = cp_model.CpModel()

  # Define Variables
  num_intervals = len(intervals)

  # 2D boolean table for interval-resource assignments
  # row represents resource | column represents interval
  assign_resource = np.empty((num_resources, num_intervals), dtype=cp_model.IntVar)
  for i in range(num_intervals):
    for r in range(num_resources):
      assign_resource[r][i] = model.NewBoolVar(f'assign_i{i+1}_r{r+1}')

  # 2D array for schedules | row represents a resource
  # durations[r][i] mirrors the length of interval i for the weighted objective
  durations = np.zeros((num_resources, num_intervals))
  schedules = np.empty((num_resources, num_intervals), dtype=cp_model.IntervalVar)
  for i in range(num_intervals):
    interval = intervals[i]
    for r in range(num_resources):
      durations[r][i] = interval[1]-interval[0]
      schedules[r][i] = model.NewOptionalIntervalVar(interval[0], interval[1]-interval[0], 
                        interval[1], assign_resource[r][i], f'interval_i{i+1}_r{r+1}')

  # No overlapping intervals
  for r in range(num_resources):
    model.AddNoOverlap(schedules[r])

  # Each task scheduled at most once
  for i in range(num_intervals): 
    model.AddAtMostOne(assign_resource[:,i])

  # Weighted Sum for # of tasks + resource utilization time
  model.Maximize(task_weight*sum(assign_resource.flatten())+cp_model.LinearExpr.WeightedSum(assign_resource.flatten(), durations.flatten()))

  # Invoke Solver
  solver = cp_model.CpSolver()
  status = solver.Solve(model)

  # Reformat Solution 
  solution = []
  num_scheduled = 0
  if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    for r in range(num_resources):
      resource_schedule = []
      for i in range(num_intervals):
        is_scheduled = solver.Value(assign_resource[r][i])
        num_scheduled += is_scheduled
        if is_scheduled: resource_schedule.append(intervals[i])
      solution.append(resource_schedule)
  else: 
    print(f'Solver Failed with Error Code: {solver.StatusName()}')

  return num_scheduled, solver.ObjectiveValue(), solution

In [44]:
''' Multiple-Objective Optimization Dynamic Weighted : solve_dynamic_weighted(intervals, num_resources) '''

def solve_dynamic_weighted(intervals, num_resources):
  """Schedule intervals with a weighted objective whose task weight is derived from the input.

  The per-task weight is the span between the smallest and the largest
  endpoint found in ``intervals``. The CP-SAT objective is
  ``task_weights * (number of scheduled intervals) + (sum of scheduled durations)``.

  Args:
    intervals: sequence of items (tuples or lists) whose element 0 is the
      start and element 1 the end.
    num_resources: number of resources an interval may be assigned to.

  Returns:
    A tuple ``(num_scheduled, objective_value, solution)``. ``solution`` has
    one list of intervals per resource; it is empty and ``num_scheduled`` is 0
    when the solver reports neither OPTIMAL nor FEASIBLE. For empty
    ``intervals`` the result is ``(0, 0.0, [[] for each resource])`` and no
    model is built.
  """
  # Define Variables
  num_intervals = len(intervals)

  # Nothing to schedule: max()/min() below would raise on an empty sequence
  if num_intervals == 0:
    return 0, 0.0, [[] for _ in range(num_resources)]

  # Declare Model
  model = cp_model.CpModel()

  # Grab Maximum Range of Intervals
  # (flattened with a comprehension so list intervals work as well as tuples)
  endpoints = [point for interval in intervals for point in interval]
  task_weights = max(endpoints) - min(endpoints)

  # 2D boolean table for interval-resource assignments
  # row represents resource | column represents interval
  assign_resource = np.empty((num_resources, num_intervals), dtype=cp_model.IntVar)
  for i in range(num_intervals):
    for r in range(num_resources):
      assign_resource[r][i] = model.NewBoolVar(f'assign_i{i+1}_r{r+1}')

  # 2D array for schedules | row represents a resource
  # durations[r][i] mirrors the length of interval i for the weighted objective
  durations = np.zeros((num_resources, num_intervals))
  schedules = np.empty((num_resources, num_intervals), dtype=cp_model.IntervalVar)
  for i in range(num_intervals):
    interval = intervals[i]
    for r in range(num_resources):
      durations[r][i] = interval[1]-interval[0]
      schedules[r][i] = model.NewOptionalIntervalVar(interval[0], interval[1]-interval[0], 
                        interval[1], assign_resource[r][i], f'interval_i{i+1}_r{r+1}')

  # No overlapping intervals
  for r in range(num_resources):
    model.AddNoOverlap(schedules[r])

  # Each task scheduled at most once
  for i in range(num_intervals): 
    model.AddAtMostOne(assign_resource[:,i])

  # Weighted Sum for # of tasks + resource utilization time
  model.Maximize(task_weights*sum(assign_resource.flatten())+cp_model.LinearExpr.WeightedSum(assign_resource.flatten(), durations.flatten()))

  # Invoke Solver
  solver = cp_model.CpSolver()
  status = solver.Solve(model)

  # Reformat Solution 
  solution = []
  num_scheduled = 0
  if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    for r in range(num_resources):
      resource_schedule = []
      for i in range(num_intervals):
        is_scheduled = solver.Value(assign_resource[r][i])
        num_scheduled += is_scheduled
        if is_scheduled: resource_schedule.append(intervals[i])
      solution.append(resource_schedule)
  else: 
    print(f'Solver Failed with Error Code: {solver.StatusName()}')

  return num_scheduled, solver.ObjectiveValue(), solution

In [12]:
def cumulative_time(schedules):
  """Return the summed length (end minus start) of every interval in every schedule.

  Args:
    schedules: iterable of per-resource interval collections; each interval's
      element 0 is its start and element 1 its end.

  Returns:
    The total of ``end - start`` over all intervals, or 0 when there are none.
  """
  return sum(span[1] - span[0] for assigned in schedules for span in assigned)

def display_results(schedules):
  """Print one line per resource, numbered from 1, followed by its schedule.

  Args:
    schedules: iterable of per-resource schedules; each is printed as-is.
  """
  for i, assigned in enumerate(schedules):
    label = f'Resource {i+1}:'
    print(label, assigned)

In [None]:
# Demo input: six (start, end) intervals to be placed on two resources
num_resources = 2
intervals = [
  (1,3), (4,11), (2,5),
  (6,8), (9,12), (7,10),
]

# Solver to run on the demo input
approach = solve_dynamic_weighted


In [None]:
# Time one run of the selected approach.
# The `time` module has no now(); perf_counter() is its clock for measuring elapsed time.
start = time.perf_counter()
approach(intervals, num_resources)
end = time.perf_counter()

# Elapsed wall-clock time in seconds
runtime = end-start