<a href="https://colab.research.google.com/github/j54854/myColab/blob/main/pom2_9.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 生産管理技術2_9の補助資料

この補助資料では，生産管理技術2の第9回講義で扱ったオンラインジョブショップスケジューリングの事例で，ディスパッチングルールの性能比較のために用いた離散事象シミュレータについて簡単に説明する．

## 想定するジョブショップ

ここでは，ランダムに到着してくるJ個のジョブ（うちIJ個はシミュレーション開始時に到着済み）を，M台の機械で処理していくジョブショップを想定する．開始後に到着してくるジョブの到着間隔は，平均30の指数分布に従うものとしよう．

各ジョブのオペレーション数は[2, M]の範囲でランダムに設定し，加工経路は同じ機械を複数回通ることがないようにランダムに定める．また，各オペレーションの処理時間は[5, 65]の一様乱数で与え，各ジョブの納期は到着時刻に総加工時間と[100, 1000]の一様乱数を加えた時点に設定することにする．


比較対象のディスパッチングルールは，FIFO，LIFO，SPT，LPT，EDD，MS，MWKR，LRM，SPT/TMKRの9つである．

## コーディング

最初に，上の情報を格納しておくための大域変数を用意し，後で利用するモジュール（random，math，matplotlib）を読み込んでおく．

In [None]:
import random, math
import matplotlib.pyplot as plt

M = 4  # number of machines
J = 2200  # total number of jobs
IJ = 10  # number of initial jobs
LMD = 1 /30  # lambda (1 /mean time interval) of job arrival process
RPT = (5, 65)  # range of possible processing time
RIS = (100, 1000)  # range of possible initial slack
RULE = 'spt'  # choose from 'fifo', 'lifo', 'spt', 'lpt', 'edd', 'ms', 'mwkr', 'lrm', 'spt_tmkr'

### イベントとカレンダー

離散事象シミュレーションでは，イベントの発生をトリガーにしてシステムの状態を変化させていく．そのため，将来生起するイベントを，生起時間の順に並べたリストとして保持しておくと便利である（この
リストは，イベントカレンダーと呼ばれる）．

ここで，イベントとイベントカレンダーのクラスを作成しておこう．

In [None]:
class Event:
    def __init__(self, time, kind, job=None):
        self.time = time  # occurrence time
        self.kind = kind  # event type
        self.job = job  # target job (if exists)

class Calendar:
    def __init__(self, horizon=None):
        self.queue = []  # list of events
        if horizon:  # if time horizon is specified
            self.queue.append(Event(horizon, 'end'))

    def is_empty(self):  # is this calendar empty?
        return False if self.queue else True

    def append(self, e):  # add a new event
        self.queue.append(e)
        self.queue.sort(key=lambda x: x.time)  # sort events chronologically

    def trigger(self):  # trigger first event
        e = self.queue.pop(0)
        return e

イベントクラスには，生起時刻（time），タイプ（kind），そして必要に応じて対象ジョブ（job）の情報を格納しておくためのフィールドが用意されている．

カレンダークラスは，イベントのリスト（queue）を保持しており，インスタンス化の際に引数にhorizonを指定すると，そのhorizonの値を生起時刻とするシミュレーション終了イベント（'end'）が1つ追加されるようになっている（が，他の方法でシミュレーションを終了させる場合は必ずしもこの終了イベントを追加する必要はない）．trigger()メソッドは，queueの先頭から要素を取り出す．また，append()メソッドで要素を追加するたびに，生起時刻の昇順でのソート処理が走るようになっている．is_empty()は，queueが空かどうかを判定するメソッドである．

### ジョブとオペレーション

続いて，ジョブとそれを構成するオペレーションを表すクラスを定義しよう．

In [None]:
class Operation:
    def __init__(self):
        self.pt = random.randint(*RPT)  # processing time
        self.st = None  # starting time
        self.ct = None  # completion time

    def is_done(self):  # is this operation done?
        return True if self.ct else False

上のように，オペレーションクラスには，処理時間（pt），開始時刻（st），終了時刻（ct）の3つのフィールドが用意されている．処理時間は大域変数RPTで指定された範囲内の一様乱数で設定され，開始時刻と終了時刻はNoneで初期化されるようになっている．また，is_done()メソッドは，そのオペレーションが終了済みかどうかを判定するものである．

このように，オペレーションクラスはできるだけ単純にとどめ，加工経路などの情報や，開始や終了の処理に対応するメソッドはジョブクラスの方に持たせてある．ジョブクラスには，下のように，初期化処理によって8つのフィールドが用意される．

まず，jは，ジョブを区別するための連番であり，後述のモデルクラスをインスタンス化する際に到着順に振られるようになっている．modelは，そのモデルインスタンスを必要に応じて逆参照するためのものである．Kはそのジョブを構成するオペレーションの個数であり，上で述べた範囲内でランダムに設定されている．opsは，そのK個のオペレーションに対応する（オペレーションクラスの）インスタンスのリストであり，routeには，加工経路の情報が，後で定義するマシンクラスのインスタンスをランダムに並べたリストとして格納されている．

next_kは，次に処理する（もしくは，処理中で未完了の）オペレーションのインデックスであり，その値は，先頭のオペレーションに対応する0に初期化されている．この値を利用して，next_op()とnext_mac()のメソッドがそれぞれ，次のオペレーションとそれを処理するマシンを取得している．また，is_done()メソッドは，このnext_kの値をKと比較することで，このジョブの終了判定を返す．

to_dueは，到着時刻から納期までの時間であり，上述のルールに則ってランダムに設定されている．なお，その初期化処理で呼ばれているworkload()メソッドは，未完了のオペレーションの処理時間の総和を返す（ので，この時点では，オペレーションの処理時間の総和を返している）．

atは到着時刻（初期化の時点ではNone）であり，arrive()メソッドが呼ばれた際に，その時点のシミュレーション時刻（後述のモデルクラスのclockフィールド）の値がセットされる．そうすると，due()メソッドで納期の値を取得できるようになる．なお，arrive()メソッドは，それに加えて，後述のマシンのreceive()メソッドを呼ぶとともに，（もしmodelインスタンスの初期化処理中でなければ）次のジョブの到着イベントをカレンダーに追加している．

In [None]:
class Job:
    def __init__(self, j, model):
        self.j = j  # job index
        self.model = model  # pointer to model
        self.K = random.randint(2, M)  # number of operations (2 <= K <= M)
        self.ops = [Operation() for _ in range(self.K)]  # list of operations
        self.route = [self.model.macs[m] for m in random.sample(range(M), self.K)]  # processing route (list of machines)
        self.next_k = 0  # index of next operation (updated when completed)
        self.to_due = self.work_remaining() + random.randint(*RIS)  # initial slack (time from arrival to due)
        self.at = None  # arrival time (release date)

    def next_op(self):  # next operation
        return None if self.is_done() else self.ops[self.next_k]

    def next_mac(self):  # next machine
        return None if self.is_done() else self.route[self.next_k]

    def workload(self):  # total remaining workload
        return sum([op.pt for op in self.ops if not op.is_done()])

    def is_done(self):  # are all operations done?
        return False if self.next_k < self.K else True

    def due(self):  # due date
        return self.at +self.to_due

    def arrive(self, init=False):  # arrive at shop floor
        self.at = self.model.clock  # arrival time of job
        self.next_mac().receive(self, init)  # received by next machine
        if not init:  # except for model initialization
            self.model.add_arrival()  # register next arrival event

    def start(self):  # start next operation
        self.next_op().st = self.model.clock  # starting time of current operation
        self.model.add_completion(self)  # register its completion event

    def complete(self):  # complete current (still next) operation
        self.next_op().ct = self.model.clock  # completion time of current operation
        self.next_mac().release()  # released by current machine
        self.next_k += 1  # focus shifted to next operation
        if not self.is_done():  # if next operation exists
            self.next_mac().receive(self)  # received by next machine

    def criterion(self):  # criterion for dispatching rule
        if RULE == 'spt':
            return self.next_op().pt
        elif RULE == 'lpt':
            return -self.next_op().pt
        elif RULE == 'edd':
            return self.due()
        elif RULE == 'ms':
            return self.due() -self.workload() -self.model.clock
        elif RULE == 'mwkr':
            return -self.workload()
        elif RULE == 'lrm':
            return self.next_op().pt -self.workload()
        else:  # elif RULE == 'spt_tmkr':
            return self.next_op().pt /self.workload()

    def ct(self):  # completion time of job
        return self.ops[-1].ct

    def lateness(self):
        return self.ct() -self.due()

    def flow_time(self):
        return self.ct() -self.at

    def idle_time(self):
        return self.flow_time() -sum([op.pt for op in self.ops])


start()メソッドを呼ぶと，次のオペレーション（next_op()）が開始されるとともに，そのオペレーションの完了イベントがカレンダーに追加される．complete()メソッドは，処理中のオペレーション（next_op()）を完了させるものである．オペレーションを実行していたマシンのrelease()メソッドを呼ぶとともに，next_kの値をインクリメントさせ，もし後続のオペレーションがあれば，それを担当するマシンのreceive()メソッドを呼んでいることがわかる．

criterion()メソッドは，到着順以外の規準で優先順位付けを行うディスパッチングルールのために，優先度を計算するものである．ct()は，ジョブの完了時刻を返し，lateness()，flow_time()，idle_time()は，事後評価の際などに，各ジョブの納期ずれ，滞留時間，アイドル時間をそれぞれ計算するために利用できる．

### マシン

次にマシンのクラスを作成する．フィールドのｍは，マシンを区別するための連番であり，modelはジョブの場合と同じく，モデルインスタンスを必要に応じて逆参照するためのものである．bufferは，マシンの前の待ち行列であり，（オペレーションではなく）ジョブのリストになっている．また，Noneで初期化されているwipは，もし現在処理中のオペレーションがあれば，それに対応するジョブが格納される．

In [None]:
class Machine:
    def __init__(self, m, model):
        self.m = m  # machine index
        self.model = model  # pointer to model
        self.buffer = []  # entrance buffer (list of operations)
        self.wip = None  # operation currently under processing

    def receive(self, job, init=False):  # receive a new job
        self.buffer.append(job)  # put it in buffer
        if not init:  # except for model initialization
            self.dispatch()

    def release(self):  # release wip from machine
        self.wip = None
        self.dispatch()

    def dispatch(self):
        if not self.wip and self.buffer:  # if machine is idle and buffer is not empty
            self.wip = self.get_next()
            self.wip.start()

    def get_next(self):
        if RULE == 'lifo':
            return self.buffer.pop(-1)  # -1 is not necessary
        elif not RULE == 'fifo':  # if fifo, no need to reorder
            self.buffer.sort(key=lambda x: x.criterion())  # reorder buffer according to dispatching criterion
        return self.buffer.pop(0)

rceive()とreliese()の2つのメソッドは上で出てきたものである．前者は指定されたジョブをbufferに追加したあと（もしmodelインスタンスの初期化処理中でなければ），後者はwipをNoneに戻したあと，それぞれdispatch()メソッドを呼んでいる．dispatch()メソッドは，（wipがNoneでbufferが空でなければ）bufferから次のジョブを取り出して，それをwipに格納している．そして，この次のジョブを取り出すメソッドがget_next()である．

### モデル

最後に，上で作成した部品を組み合わせて，シミュレーションモデル本体のクラスを作成する．フィールドのclockはシミュレション時刻を表す変数，calendarはイベントカレンダー，macsはマシンのリスト，jobsはジョブのリスト，next_jは次に到着するジョブのインデックスである．初期化処理中に，IJ個のジョブを到着させたあと，各マシンのdispatch()メソッドを呼ぶとともに，次に到着するジョブの到着イベントをカレンダに追加している．

add_arrival()はジョブの到着イベントを，add_completion()はオペレーションの完了イベントを，それぞれイベントカレンダーに追加するメソッドである．print_state()メソッドは，それが呼ばれた時点でのシミュレーション時刻とシステム状態を画面に打ち出す．

In [None]:
class Model:
    def __init__(self):
        self.clock = 0  # simulation time
        self.calendar = Calendar()  # event calendar
        self.macs = [Machine(m, self) for m in range(M)]  # list of machines
        self.jobs = [Job(j, self) for j in range(J)]  # list of jobs
        for job in self.jobs[:IJ]:  # load initial jobs
            job.arrive(init=True)
        for mac in self.macs:  # dispatch initial jobs
            mac.dispatch()
        self.next_j = IJ  # index of next job
        self.add_arrival()  # register next arrival event

    def add_arrival(self):  # create arrival event and added it to calendar
        if self.next_j < J:  # only |J| jobs may arrive
            interval = math.ceil(random.expovariate(LMD))  # time to next arrival
            e = Event(self.clock +interval, 'arrival', self.jobs[self.next_j])
            self.calendar.append(e)
            self.next_j += 1

    def add_completion(self, job):  # create completion event and added it to calendar
        e = Event(self.clock +job.next_op().pt, 'completion', job)
        self.calendar.append(e)

    def print_state(self):
        print('---------- time = {} ----------'.format(self.clock))
        for mac in self.macs:
            print('M{}:({})'.format(mac.m, mac.wip.j if mac.wip else '_'), end=' [')
            for j, job in enumerate(mac.buffer):
                print('{}'.format(job.j), end='')
                if j < len(mac.buffer) -1:
                    print(', ', end='')
            print(']')

    def run(self):
        self.print_state()
        while not self.calendar.is_empty():
            e = self.calendar.trigger() # trigger next event
            self.clock = e.time  # advance simulation time
            if e.kind == 'arrival':  # in case of arrival event
                e.job.arrive()
            elif e.kind == 'completion':  # in case of completion event
                e.job.complete()
            else:  # if e.kind == 'end':
                break
            self.print_state()

最後のrun()がシミュレーションを実行するメソッドである．ここに，離散事象シミュレーションの流れが凝縮されている．イベントカレンダーが空でない限り回り続けるwhileループがあり，その中では，毎度，イベントカレンダーの先頭のイベントが取り出され，そのタイプに応じて，システム状態の変更を担う適切なメソッドが呼ばれる，という流れである．

## 実行

作成したシミュレータを動かしてみよう．モデルをインスタンス化して，そのrun()メソッドを呼べばよい．

randomモジュールで生成される疑似乱数を使用しているので，実行するたびに結果が異なることになるが，同じシード値を指定してrandom.seed()を呼んでおくと，同一の乱数系列を使用（して，同じ結果を再現）することができる．

In [None]:
# random.seed(999)  # if seed is specified, the same random number series can be used

# RULE = 'edd'  # you can specify another rule here, if necessary

model = Model()
model.run()

# print('rule = ' +RULE)  # to make sure if simulation is run with intended rule

### 結果の描画例

シミュレーション中に処理されたジョブ（に対応するJobインスタンス）が格納されたリストがmodel.jobsで参照できるので，例えば，下記のようにして，ジョブの属性（例えば，納期ずれならlateness()メソッドで取得できる）のヒストグラムを描くことができる．

In [None]:
x = [job.lateness() for job in model.jobs]
plt.hist(x, log=True)  # 'log=True' makes y axis expressed in log scale
plt.show()