In [1]:
"""Minimal jobshop example."""
import collections
from ortools.sat.python import cp_model

In [2]:
"""Minimal jobshop problem."""
# Data.
jobs_data = [  # task = (machine_id, processing_time).
    [(0, 3), (1, 2), (2, 2)],  # Job0
    [(0, 2), (2, 1), (1, 4)],  # Job1
    [(1, 4), (2, 3)]  # Job2
]

print(jobs_data)

[[(0, 3), (1, 2), (2, 2)], [(0, 2), (2, 1), (1, 4)], [(1, 4), (2, 3)]]


In [3]:
machines_count = 1 + max(task[0] for job in jobs_data for task in job)
print("machines_count:", machines_count)

all_machines = range(machines_count)
print("all_machines:", all_machines)

# Computes horizon dynamically as the sum of all durations.
horizon = sum(task[1] for job in jobs_data for task in job)
print("horizon:", horizon)

machines_count: 3
all_machines: range(0, 3)
horizon: 21


In [4]:
# Create the model.
model = cp_model.CpModel()

# Named tuple to store information about created variables.
task_type = collections.namedtuple('task_type', 'start end interval')

# Named tuple to manipulate solution information.
assigned_task_type = collections.namedtuple('assigned_task_type','start job index duration')

# Creates job intervals and add to the corresponding machine lists.
all_tasks = {}
machine_to_intervals = collections.defaultdict(list)

In [5]:
for job_id, job in enumerate(jobs_data):
    for task_id, task in enumerate(job):
        machine = task[0]
        duration = task[1]
        suffix = '_job%i_task%i' % (job_id, task_id)
        start_var = model.NewIntVar(0, horizon, 'start' + suffix)
        end_var = model.NewIntVar(0, horizon, 'end' + suffix)
        interval_var = model.NewIntervalVar(start_var, duration, end_var,'interval' + suffix)
        all_tasks[job_id, task_id] = task_type(start=start_var,end=end_var,interval=interval_var)
        machine_to_intervals[machine].append(interval_var)

In [6]:
print(machine_to_intervals[0])
# print(all_machines)

[interval_job0_task0(start = start_job0_task0, size = 3, end = end_job0_task0), interval_job1_task0(start = start_job1_task0, size = 2, end = end_job1_task0)]


In [7]:
# Create and add disjunctive constraints. bir makinede aynı zamanda 1 den fazla çalışma olmamalı
for machine in all_machines:
    model.AddNoOverlap(machine_to_intervals[machine])

In [8]:
# Precedences inside a job. task start bir onceki task bitince başlamalı
for job_id, job in enumerate(jobs_data):
    for task_id in range(len(job) - 1):
        model.Add(all_tasks[job_id, task_id + 1].start >= all_tasks[job_id, task_id].end)

In [9]:
for key, value in all_tasks.items() :
    print (key, value)

(0, 0) task_type(start=start_job0_task0(0..21), end=end_job0_task0(0..21), interval=interval_job0_task0(start = start_job0_task0, size = 3, end = end_job0_task0))
(0, 1) task_type(start=start_job0_task1(0..21), end=end_job0_task1(0..21), interval=interval_job0_task1(start = start_job0_task1, size = 2, end = end_job0_task1))
(0, 2) task_type(start=start_job0_task2(0..21), end=end_job0_task2(0..21), interval=interval_job0_task2(start = start_job0_task2, size = 2, end = end_job0_task2))
(1, 0) task_type(start=start_job1_task0(0..21), end=end_job1_task0(0..21), interval=interval_job1_task0(start = start_job1_task0, size = 2, end = end_job1_task0))
(1, 1) task_type(start=start_job1_task1(0..21), end=end_job1_task1(0..21), interval=interval_job1_task1(start = start_job1_task1, size = 1, end = end_job1_task1))
(1, 2) task_type(start=start_job1_task2(0..21), end=end_job1_task2(0..21), interval=interval_job1_task2(start = start_job1_task2, size = 4, end = end_job1_task2))
(2, 0) task_type(start

In [10]:
# Makespan objective.
obj_var = model.NewIntVar(0, horizon, 'makespan')
model.AddMaxEquality(obj_var, [all_tasks[job_id, len(job) - 1].end for job_id, job in enumerate(jobs_data)])
model.Minimize(obj_var)

In [11]:
# Creates the solver and solve.
solver = cp_model.CpSolver()
status = solver.Solve(model)


In [12]:
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    print('Solution:')
    # Create one list of assigned tasks per machine.
    assigned_jobs = collections.defaultdict(list)
    for job_id, job in enumerate(jobs_data):
        for task_id, task in enumerate(job):
            machine = task[0]
            assigned_jobs[machine].append(
                assigned_task_type(start=solver.Value(all_tasks[job_id, task_id].start),job=job_id,index=task_id,duration=task[1]))

    # Create per machine output lines.
    output = ''
    for machine in all_machines:
        # Sort by starting time.
        assigned_jobs[machine].sort()
        sol_line_tasks = 'Machine ' + str(machine) + ': '
        sol_line = '           '

        for assigned_task in assigned_jobs[machine]:
            name = 'job_%i_task_%i' % (assigned_task.job,assigned_task.index)
            # Add spaces to output to align columns.
            sol_line_tasks += '%-15s' % name

            start = assigned_task.start
            duration = assigned_task.duration
            sol_tmp = '[%i,%i]' % (start, start + duration)
            # Add spaces to output to align columns.
            sol_line += '%-15s' % sol_tmp

        sol_line += '\n'
        sol_line_tasks += '\n'
        output += sol_line_tasks
        output += sol_line

    # Finally print the solution found.
    print(f'Optimal Schedule Length: {solver.ObjectiveValue()}')
    print(output)
else:
    print('No solution found.')

# Statistics.
print('\nStatistics')
print('  - conflicts: %i' % solver.NumConflicts())
print('  - branches : %i' % solver.NumBranches())
print('  - wall time: %f s' % solver.WallTime())


Solution:
Optimal Schedule Length: 11.0
Machine 0: job_1_task_0   job_0_task_0   
           [0,2]          [2,5]          
Machine 1: job_2_task_0   job_0_task_1   job_1_task_2   
           [0,4]          [5,7]          [7,11]         
Machine 2: job_1_task_1   job_2_task_1   job_0_task_2   
           [2,3]          [4,7]          [7,9]          


Statistics
  - conflicts: 0
  - branches : 0
  - wall time: 0.018869 s
