# The Job Shop Problem

One common scheduling problem is the job shop, in which multiple jobs are processed on several machines.

Each job consists of a sequence of tasks, which must be performed in a given order, and each task must be processed on a specific machine. For example, the job could be the manufacture of a single consumer item, such as an automobile. The problem is to schedule the tasks on the machines so as to minimize the length of the schedule—the time it takes for all the jobs to be completed.

There are several constraints for the job shop problem:

- No task for a job can be started until the previous task for that job is completed.
- A machine can only work on one task at a time.
- A task, once started, must run to completion.


In [2]:
# !pip install ortools

Collecting ortools
  Downloading ortools-9.10.4067-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (2.9 kB)
Collecting protobuf>=5.26.1 (from ortools)
  Downloading protobuf-5.27.0-cp38-abi3-manylinux2014_x86_64.whl.metadata (592 bytes)
Collecting immutabledict>=3.0.0 (from ortools)
  Downloading immutabledict-4.2.0-py3-none-any.whl.metadata (3.4 kB)
Downloading ortools-9.10.4067-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (26.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m26.7/26.7 MB[0m [31m61.0 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[?25hDownloading immutabledict-4.2.0-py3-none-any.whl (4.7 kB)
Downloading protobuf-5.27.0-cp38-abi3-manylinux2014_x86_64.whl (309 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m309.2/309.2 kB[0m [31m26.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: protobuf, immutabledict, ortools
  Attempting uninstall: protobuf
    Found existing installatio

In [1]:
import collections
from ortools.sat.python import cp_model

## Define the data
Next, the program defines the data for the problem.

In [4]:
jobs_data = [  # task = (machine_id, processing_time).
    [(0, 3), (1, 2), (2, 2)],  # Job0
    [(0, 2), (2, 1), (1, 4)],  # Job1
    [(1, 4), (2, 3)],  # Job2
]

machines_count = 1 + max(task[0] for job in jobs_data for task in job)
all_machines = range(machines_count)
# Computes horizon dynamically as the sum of all durations.
horizon = sum(task[1] for job in jobs_data for task in job)

21


## Declare the model

The following code declares the model for the problem.

For each job and task, the program uses the model's `NewIntVar/new_int_var/newIntVar` method to create the variables:

- `start_var`: Start time of the task.
- `end_var`: End time of the task.
The upper bound for start_var and end_var is horizon, the sum of the processing times for all tasks in all jobs. horizon is sufficiently large to complete all tasks for the following reason: if you schedule the tasks - in non-overlapping time intervals (a non-optimal solution), the total length of the schedule is exactly horizon. So the duration of the optimal solution can't be any greater than horizon.

Next, the program uses the `NewIntervalVar/new_interval_var/newIntervalVar` method to create an **interval variable** —whose value is a variable time interval — for the task. The inputs to this method are:

- The start time of the task.
- The length of the time interval for the task.
- The end time of the task.
- The name for the interval variable.

In any solution, `end_var` minus `start_var` must equal `duration`.



In [5]:
model = cp_model.CpModel()

# Named tuple to store information about created variables.
task_type = collections.namedtuple("task_type", "start end interval")
# Named tuple to manipulate solution information.
assigned_task_type = collections.namedtuple(
    "assigned_task_type", "start job index duration"
)

# Creates job intervals and add to the corresponding machine lists.
all_tasks = {}
machine_to_intervals = collections.defaultdict(list)

for job_id, job in enumerate(jobs_data):
    for task_id, task in enumerate(job):
        machine, duration = task
        suffix = f"_{job_id}_{task_id}"
        start_var = model.new_int_var(0, horizon, "start" + suffix)
        end_var = model.new_int_var(0, horizon, "end" + suffix)
        interval_var = model.new_interval_var(
            start_var, duration, end_var, "interval" + suffix
        )
        all_tasks[job_id, task_id] = task_type(
            start=start_var, end=end_var, interval=interval_var
        )
        machine_to_intervals[machine].append(interval_var)

print(machine_to_intervals)

defaultdict(<class 'list'>, {0: [interval_0_0(start = start_0_0, size = 3, end = end_0_0), interval_1_0(start = start_1_0, size = 2, end = end_1_0)], 1: [interval_0_1(start = start_0_1, size = 2, end = end_0_1), interval_1_2(start = start_1_2, size = 4, end = end_1_2), interval_2_0(start = start_2_0, size = 4, end = end_2_0)], 2: [interval_0_2(start = start_0_2, size = 2, end = end_0_2), interval_1_1(start = start_1_1, size = 1, end = end_1_1), interval_2_1(start = start_2_1, size = 3, end = end_2_1)]})


## Define the constraints

The program uses the model's `AddNoOverlap/add_no_overlap/addNoOverlap` method to create the no overlap constraints, which prevent tasks for the same machine from overlapping in time.

Next, the program adds the precedence constraints, which prevent consecutive tasks for the same job from overlapping in time. For each job and each task in the job, a linear constraint is added to specify that the end time of a task to occur before the start time of the next task in the job.

In [6]:
# Create and add disjunctive constraints.
for machine in all_machines:
    model.add_no_overlap(machine_to_intervals[machine])

# Precedences inside a job.
for job_id, job in enumerate(jobs_data):
    for task_id in range(len(job) - 1):
        model.add(
            all_tasks[job_id, task_id + 1].start >= all_tasks[job_id, task_id].end
        )

## Define the objectives

In [7]:
# Makespan objective.
obj_var = model.new_int_var(0, horizon, "makespan")
model.add_max_equality(
    obj_var,
    [all_tasks[job_id, len(job) - 1].end for job_id, job in enumerate(jobs_data)],
)
model.minimize(obj_var)

## Invoke the solver

In [8]:
solver = cp_model.CpSolver()
status = solver.solve(model)

In [10]:
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    print("Solution:")
    # Create one list of assigned tasks per machine.
    assigned_jobs = collections.defaultdict(list)
    for job_id, job in enumerate(jobs_data):
        for task_id, task in enumerate(job):
            machine = task[0]
            assigned_jobs[machine].append(
                assigned_task_type(
                    start=solver.value(all_tasks[job_id, task_id].start),
                    job=job_id,
                    index=task_id,
                    duration=task[1],
                )
            )

    # Create per machine output lines.
    output = ""
    for machine in all_machines:
        # Sort by starting time.
        assigned_jobs[machine].sort()
        sol_line_tasks = "Machine " + str(machine) + ": "
        sol_line = "           "

        for assigned_task in assigned_jobs[machine]:
            name = f"job_{assigned_task.job}_task_{assigned_task.index}"
            # add spaces to output to align columns.
            sol_line_tasks += f"{name:15}"

            start = assigned_task.start
            duration = assigned_task.duration
            sol_tmp = f"[{start},{start + duration}]"
            # add spaces to output to align columns.
            sol_line += f"{sol_tmp:15}"

        sol_line += "\n"
        sol_line_tasks += "\n"
        output += sol_line_tasks
        output += sol_line

    # Finally print the solution found.
    print(f"Optimal Schedule Length: {solver.objective_value}")
    print(output)
else:
    print("No solution found.")

Solution:
Optimal Schedule Length: 11.0
Machine 0: job_1_task_0   job_0_task_0   
           [0,2]          [2,5]          
Machine 1: job_2_task_0   job_0_task_1   job_1_task_2   
           [0,4]          [5,7]          [7,11]         
Machine 2: job_1_task_1   job_2_task_1   job_0_task_2   
           [2,3]          [4,7]          [7,9]          



## Code from copilot

In [None]:
# Import the required libraries
from ortools.sat.python import cp_model

def main():
    # Create a CP-SAT solver
    model = cp_model.CpModel()

    # Define the variables
    num_machines = 3
    num_manpower = 3
    machines = range(num_machines)
    manpower = range(num_manpower)

    # Create task variables
    tasks = {}
    for m in machines:
        for p in manpower:
            tasks[(m, p)] = model.NewBoolVar(f"Task_{m}_{p}")

    # Each machine can handle only one task
    for m in machines:
        model.Add(sum(tasks[(m, p)] for p in manpower) == 1)

    # Each task requires 1 man and 3 hours on a machine
    for p in manpower:
        model.Add(sum(tasks[(m, p)] for m in machines) == 1)

    # Define the objective function (minimize makespan)
    makespan = model.NewIntVar(0, 100, "Makespan")
    for m in machines:
        for p in manpower:
            model.Add(makespan >= (m + 1) * tasks[(m, p)])

    model.Minimize(makespan)

    # Create a solver and solve the problem
    solver = cp_model.CpSolver()
    status = solver.Solve(model)

    if status == cp_model.OPTIMAL:
        print("Optimal solution found:")
        for m in machines:
            for p in manpower:
                if solver.Value(tasks[(m, p)]):
                    print(f"Task {p} assigned to Machine {m}")
    else:
        print("No optimal solution found.")

if __name__ == "__main__":
    main()
