In [1]:
"""Minimal jobshop example."""
import collections
from ortools.sat.python import cp_model

In [2]:
# Data.
task_data = [  # (task_id, machine_id, processing_time)
    (0, 0, 3), (0, 1, 2), (0, 2, 2),  # Task0
    (1, 0, 2), (1, 1, 5), (1, 2, 2),  # Task1
    (2, 0, 6), (2, 1, 1),  # Task2
]

# Encode job data

NameError: name 'all_machines' is not defined

# Create CP model

In [11]:
# Create the model.

class UnrelatedMachinesScheduling:
    def __init__(self, task_data):
        self.task_data = task_data
        self.__define_model()
        self.__define_constraints()
        self.__define_objective()

    def __define_model(self):
        self.model = cp_model.CpModel()
        # Named tuple to store information about created variables.
        self.task_type = collections.namedtuple("task_type", "start end interval")
        # Named tuple to manipulate solution information.
        self.assigned_task_type = collections.namedtuple(
            "assigned_task_type", "start job index duration"
        )
        # Creates job intervals and add to the corresponding machine lists.
        self.all_tasks = {}
        self.task_assigned_machines = collections.defaultdict(list)
        self.machine_to_intervals = collections.defaultdict(list)
        self.horizon = sum(task[2] for task in self.task_data)
    
    
    def __define_constraints(self):
        #Define decision variable
        for task, machine, processing_time in self.task_data:
            suffix = f"_{task}_{machine}"
            assigned_var = self.model.NewIntVar(0, 1, 'assigned' + suffix)
            self.task_assigned_machines[task].append(assigned_var)
            interval_var = self.model.NewIntervalVar(
                start    = self.model.NewIntVar(0, self.horizon, 'start' + suffix),
                end      = self.model.NewIntVar(0, self.horizon, 'end' + suffix),
                size = processing_time,
                name = 'interval'+suffix
            )
            self.machine_to_intervals[machine].append(interval_var)

        #Constraint 1: No machine may work on more than one task simultaneously
        machines_count = 1 + max(task[1] for task in self.task_data)
        all_machines = range(machines_count)
        for machine in all_machines:
            self.model.AddNoOverlap(self.machine_to_intervals[machine])

        #Constraint 2: All tasks must be assigned to exactly one machine
        tasks_count = 1+ max(task[0] for task in self.task_data)
        all_tasks = range(tasks_count)
        for task in all_tasks:
            self.model.Add(cp_model.LinearExpr.Sum(self.task_assigned_machines[task]) == 1)
        
    def __define_objective(self):
        # Makespan objective.
        obj_var = self.model.NewIntVar(0, self.horizon, "makespan")
        self.model.AddMaxEquality(
            obj_var,
            [self.all_tasks[job_id, len(job) - 1].end for job_id, job in enumerate(jobs_data)],
        )
        self.model.Minimize(obj_var)

    def solve(self, solver=cp_model.CpSolver()):
        status = solver.Solve(self.model)
        return (solver, status)


model = UnrelatedMachinesScheduling(task_data)

NameError: name 'jobs_data' is not defined

In [None]:
# Creates the solver and solve.
solver, status = model.solve()

print('feasible:', status == cp_model.FEASIBLE, ', optimal:', status == cp_model.OPTIMAL)

In [None]:
for job_id, job in enumerate(model.jobs_data):
    for task_id, task in enumerate(job):
        print(job_id, task_id, task)
        print(solver.Value(model.all_tasks[job_id, task_id].start))

In [None]:
# Named tuple to store information about created variables.
task_type = collections.namedtuple("task_type", "start end interval")
# Named tuple to manipulate solution information.
assigned_task_type = collections.namedtuple(
    "assigned_task_type", "start job index duration"
)
all_tasks = {}
machine_to_intervals = collections.defaultdict(list)
    

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    print("Solution:")
    # Create one list of assigned tasks per machine.
    assigned_jobs = collections.defaultdict(list)
    for job_id, job in enumerate(jobs_data):
        for task_id, task in enumerate(job):
            machine = task[0]
            assigned_jobs[machine].append(
                assigned_task_type(
                    start=solver.Value(all_tasks[job_id, task_id].start),
                    job=job_id,
                    index=task_id,
                    duration=task[1],
                )
            )

    # Create per machine output lines.
    output = ""
    for machine in all_machines:
        # Sort by starting time.
        assigned_jobs[machine].sort()
        sol_line_tasks = "Machine " + str(machine) + ": "
        sol_line = "           "

        for assigned_task in assigned_jobs[machine]:
            name = f"job_{assigned_task.job}_task_{assigned_task.index}"
            # Add spaces to output to align columns.
            sol_line_tasks += f"{name:15}"

            start = assigned_task.start
            duration = assigned_task.duration
            sol_tmp = f"[{start},{start + duration}]"
            # Add spaces to output to align columns.
            sol_line += f"{sol_tmp:15}"

        sol_line += "\n"
        sol_line_tasks += "\n"
        output += sol_line_tasks
        output += sol_line

    # Finally print the solution found.
    print(f"Optimal Schedule Length: {solver.ObjectiveValue()}")
    print(output)
else:
    print("No solution found.")

# Statistics.
print("\nStatistics")
print(f"  - conflicts: {solver.NumConflicts()}")
print(f"  - branches : {solver.NumBranches()}")
print(f"  - wall time: {solver.WallTime()}s")