In [1]:
import numpy as np
import json
from random import shuffle

In [2]:
class Job:
    def __init__(self, jid, Sj, rj, dj, wj):
        self.id = jid # job id
        self.S = Sj   # sequence of id of Tasks (list)
        self.r = rj   # release date
        self.d = dj   # due date
        self.w = wj   # weight
    
    def __str__(self):
        return f"id={self.id} release:{self.r} due:{self.d}, weight={self.w}"
    
    def B(self):
        return tasks[self.S[0]].B # beginning date
    
    def C(self):
        return tasks[self.S[-1]].C # completion date

class Task:
    def __init__(self, tid, jid, pi, workers):
        self.id = tid          # Task id
        self.jid = jid         # Job id of corresponding Job
        self.p = pi            # processing time
        self.workers = workers # possible Workers for this Task (list)
        self.B = None          # beginning date
        self.C = None          # completion date
        self.running = False
        self.done = False
        self.mid = None        # id of Machine on which task runs
        self.oid = None        # id of Operator doing this task
        
    def __str__(self):
        return f"id={self.id} job={self.jid} begin:{self.B} complete:{self.C} (time:{self.p}) machine:{self.mid} operator:{self.oid}"
        
class Worker:
    def __init__(self, mid, oid):
        self.mid = mid # id of Machine used
        self.oid = oid # id of Operator working
        
    def __str__(self):
        return f"machine={self.mid} operator:{self.oid}"

In [3]:
def parser_job(job):
    jid = job['job']
    Sj  = job['sequence']
    rj  = job['release_date']
    dj  = job['due_date']
    wj  = job['weight']
    return Job(jid, Sj, rj, dj, wj)

def parser_task(task, jid):
    tid = task['task']
    pi  = task['processing_time']
    workers = []
    for machine in task['machines']:
        mid = machine['machine']
        for operator in machine['operators']:
            oid = operator
            workers.append(Worker(mid, oid))
    return Task(tid, jid, pi, workers)

def parser_inst(inst):
    J = inst['parameters']['size']['nb_jobs']
    I = inst['parameters']['size']['nb_tasks']
    M = inst['parameters']['size']['nb_machines']
    O = inst['parameters']['size']['nb_operators']
    alpha = inst['parameters']['costs']['unit_penalty']
    beta = inst['parameters']['costs']['tardiness']
    
    machines  = {} # storing machines' availability by id
    operators = {} # storing operators' availability by id
    jobs  = (J+1) * [None]                          # list of Jobs
    jobs[0] = Job(jid=0, Sj=[], rj=-1, dj=-1, wj=0) # dummy element 0
    tasks = (I+1) * [None]                          # list of Tasks
    tasks[0] = Task(tid=0, jid=0, pi=0, workers=[]) # dummy element 0
    for job in inst['jobs']:
        jid = job['job']
        jobs[jid] = parser_job(job) # add Job in list of Jobs
        for task in inst['tasks']:
            tid = task['task']
            tasks[tid] = parser_task(task, jid) # add task in list of Tasks
            for machine in task['machines']:
                mid = machine['machine']
                for operator in machine['operators']:
                    oid = operator
                    machines[mid]  = True # set machine to available
                    operators[oid] = True # set operator to available
    
    return (J, I, M, O, alpha, beta, jobs, tasks, machines, operators)

In [4]:
def optim():
    T = 0
    while (np.sum([t.done for t in tasks]) < I):
        for j in jobs:
            if T >= j.r: # it is past the release date of this Job
                for (t_idx, tid) in enumerate(j.S): # loop over Tasks for this Job
                    t = tasks[tid]      # get Task from its id
                    for w in t.workers: # look for a Worker (machine, operator) to execute Task
                        if operators[w.oid] \
                        and machines[w.mid] \
                        and not t.running \
                        and not t.done \
                        and np.all([tasks[tid2].done for tid2 in j.S[:t_idx]]): # all Job's previous Tasks are done
                            t.running = True # set task to running
                            t.B = T          # set beginning time
                            t.mid = w.mid    # set machine id for task
                            t.oid = w.oid    # set operator id for task
                            machines[w.mid]  = False   # set machine to busy
                            operators[w.oid] = False   # set operator to busy

        for t in tasks:
            if t.running:
                if (T - t.B >= t.p):
                    t.running = False # task ends
                    t.done = True     # task is done
                    machines[t.mid]  = True # free machine
                    operators[t.oid] = True # free operator
                    t.C = T           # set completion time

        T += 1 # time flows

In [5]:
def cost(jobs):
    s = 0
    for job in jobs[1:]:
        C = job.C()
        T = max(C - job.d, 0)
        U = 1 if T > 0 else 0
        s += job.w * (C + alpha*U + beta*T)
    return s

In [6]:
def jsonify(tasks):
    res = []
    for task in tasks[1:]:
        sub_res = {}
        sub_res['task'], sub_res['start'], sub_res['machine'], sub_res['operator'] = task.id, task.B, task.mid, task.oid
        res.append(sub_res)
    return res

---

In [13]:
in_filename = "instances/medium.json"
with open(in_filename, 'rb') as f:
    inst = json.load(f)

In [14]:
(J, I, M, O, alpha, beta, jobs, tasks, machines, operators) = parser_inst(inst)
print(f"J={J}\nI={I}\nM={M}\nO={O}\nα={alpha}\nβ={beta}")

J=20
I=150
M=10
O=15
α=6
β=1


In [15]:
(J, I, M, O, alpha, beta, jobs, tasks, machines, operators) = parser_inst(inst)
optim()

In [16]:
print(jobs[5])
print(tasks[20])

id=5 release:5 due:11, weight=5
id=20 job=20 begin:1 complete:3 (time:2) machine:2 operator:3


In [17]:
cost(jobs)

8317

In [18]:
out_filename = "KIRO.json"
with open(out_filename, 'w') as f:
    json.dump(jsonify(tasks), f)