https://developers.google.com/optimization/scheduling/job_shop

In [1]:
import collections
from ortools.sat.python import cp_model

In [2]:
# Problem instance: each job is an ordered list of tasks, and each task is a
# (machine_id, processing_time) pair.
jobs_data = [
    [(0, 3), (1, 2), (2, 2)],  # Job0
    [(0, 2), (2, 1), (1, 4)],  # Job1
    [(1, 4), (2, 3)],  # Job2
]

# Machine ids are 0-based, so the machine count is the largest id plus one.
machines_count = max(machine_id for job in jobs_data for machine_id, _ in job) + 1
all_machines = range(machines_count)
# Horizon = total processing time of all tasks; it is used as the upper bound
# for every start/end variable and for the makespan.
horizon = sum(duration for job in jobs_data for _, duration in job)

CP = Constraint Programming.
`cp_model.CpModel` is the object that holds the whole CP problem: its variables, constraints, and objective.

In [3]:
# Create the CP-SAT model; the following cells add variables, constraints and
# the objective to this single object.
model = cp_model.CpModel()

In [4]:
# Record types: task_type groups the solver variables created for one task;
# assigned_task_type describes one solved task when reporting the schedule.
task_type = collections.namedtuple("task_type", "start end interval")
assigned_task_type = collections.namedtuple(
    "assigned_task_type", "start job index duration"
)

# all_tasks maps (job_id, position of the task within its job) -> task_type.
# machine_to_intervals maps machine id -> interval variables running on it.
all_tasks = {}
machine_to_intervals = collections.defaultdict(list)

for job_id, job in enumerate(jobs_data):
    for task_id, (machine, duration) in enumerate(job):
        suffix = f"_{job_id}_{task_id}"
        start_var = model.new_int_var(0, horizon, f"start{suffix}")
        end_var = model.new_int_var(0, horizon, f"end{suffix}")
        # The interval links start and end through the fixed task duration.
        interval_var = model.new_interval_var(
            start_var, duration, end_var, f"interval{suffix}"
        )
        all_tasks[job_id, task_id] = task_type(start_var, end_var, interval_var)
        machine_to_intervals[machine].append(interval_var)

# Inspect what was built; keys of all_tasks are (job_id, order within the job).
print(all_tasks)
print(dict(machine_to_intervals))

{(0, 0): task_type(start=start_0_0(0..21), end=end_0_0(0..21), interval=interval_0_0(start = start_0_0, size = 3, end = end_0_0)), (0, 1): task_type(start=start_0_1(0..21), end=end_0_1(0..21), interval=interval_0_1(start = start_0_1, size = 2, end = end_0_1)), (0, 2): task_type(start=start_0_2(0..21), end=end_0_2(0..21), interval=interval_0_2(start = start_0_2, size = 2, end = end_0_2)), (1, 0): task_type(start=start_1_0(0..21), end=end_1_0(0..21), interval=interval_1_0(start = start_1_0, size = 2, end = end_1_0)), (1, 1): task_type(start=start_1_1(0..21), end=end_1_1(0..21), interval=interval_1_1(start = start_1_1, size = 1, end = end_1_1)), (1, 2): task_type(start=start_1_2(0..21), end=end_1_2(0..21), interval=interval_1_2(start = start_1_2, size = 4, end = end_1_2)), (2, 0): task_type(start=start_2_0(0..21), end=end_2_0(0..21), interval=interval_2_0(start = start_2_0, size = 4, end = end_2_0)), (2, 1): task_type(start=start_2_1(0..21), end=end_2_1(0..21), interval=interval_2_1(start

In [5]:
# Disjunctive constraints: the intervals assigned to one machine must not overlap.
for machine in all_machines:
    model.add_no_overlap(machine_to_intervals[machine])

# Precedence constraints: within a job, each task starts no earlier than the
# end of the task right before it.
for job_id, job in enumerate(jobs_data):
    for task_id in range(len(job) - 1):
        predecessor = all_tasks[job_id, task_id]
        successor = all_tasks[job_id, task_id + 1]
        model.add(successor.start >= predecessor.end)

In [6]:
# Makespan objective.
obj_var = model.new_int_var(0, horizon, "makespan")
model.add_max_equality(
    obj_var,
    [all_tasks[job_id, len(job) - 1].end for job_id, job in enumerate(jobs_data)],
)
model.minimize(obj_var)

In [7]:
# Solve the model. `status` is the solver's result code, checked against
# cp_model.OPTIMAL / cp_model.FEASIBLE before the solution is read.
solver = cp_model.CpSolver()
status = solver.solve(model)

In [8]:
# Report the schedule when the solver found one. FEASIBLE means a valid
# schedule exists but was not proven to be the shortest possible.
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    print("Solution:")
    # Create one list of assigned tasks per machine.
    assigned_jobs = collections.defaultdict(list)
    for job_id, job in enumerate(jobs_data):
        for task_id, task in enumerate(job):
            machine, duration = task
            assigned_jobs[machine].append(
                assigned_task_type(
                    start=solver.value(all_tasks[job_id, task_id].start),
                    job=job_id,
                    index=task_id,
                    duration=duration,
                )
            )

    # Build two text rows per machine: task names, then matching [start,end] spans.
    output_rows = []
    for machine in all_machines:
        # Named tuples compare field by field, so this orders by start time first.
        assigned_jobs[machine].sort()
        names_row = f"Machine {machine}: "
        spans_row = "           "

        for assigned_task in assigned_jobs[machine]:
            name = f"job_{assigned_task.job}_task_{assigned_task.index}"
            # Pad every cell to 15 characters so both rows stay column-aligned.
            names_row += f"{name:15}"

            start = assigned_task.start
            span = f"[{start},{start + assigned_task.duration}]"
            spans_row += f"{span:15}"

        output_rows += [names_row, spans_row]
    output = "".join(row + "\n" for row in output_rows)

    # Only claim optimality when the solver actually proved it; a FEASIBLE
    # result is labelled as such instead of being reported as optimal.
    schedule_quality = "Optimal" if status == cp_model.OPTIMAL else "Feasible"
    print(f"{schedule_quality} Schedule Length: {solver.objective_value}")
    print(output)
else:
    print("No solution found.")

Solution:
Optimal Schedule Length: 11.0
Machine 0: job_1_task_0   job_0_task_0   
           [0,2]          [2,5]          
Machine 1: job_2_task_0   job_0_task_1   job_1_task_2   
           [0,4]          [5,7]          [7,11]         
Machine 2: job_1_task_1   job_2_task_1   job_0_task_2   
           [2,3]          [4,7]          [7,9]          

