In [1]:
import numpy as np

In [2]:
## Constants
# Durations are expressed in simulation ticks; the simulation loop treats one
# tick as one minute.

# Per-service value used as the scale (mean) of the exponential distribution
# that customer service times are drawn from.
service_times = {
    "contract": 30,
    "complain": 25,
    "confirm": 10,
    "request": 5,
    "review": 10,
}

# One entry per worker type:
#   "services"     - the jobs this worker type can perform
#   "matrix"       - row i is the probability vector (in "services" order)
#                    used to draw the next job when the current job is
#                    services[i]
#   "time_between" - number of ticks between two consecutive draws
transition_matrices = {
    "A": {
        "services": ["complain", "contract"],
        "matrix": [
            [0.9, 0.1],
            [0.2, 0.8],
        ],
        "time_between": 5,
    },
    "B": {
        "services": ["review", "request"],
        "matrix": [
            [0.95, 0.05],
            [0.15, 0.85],
        ],
        "time_between": 7,
    },
    "C": {
        "services": ["review", "request", "confirm"],
        "matrix": [
            [0.75, 0.15, 0.1],
            [0.1, 0.8, 0.1],
            [0.05, 0.05, 0.9],
        ],
        "time_between": 10,
    },
}

# Per-service arrival process, laid out as [unit, distribution, *params].
# As consumed by ServiceType.get_interarrival_time:
#   "normal"      - params[0] is the mean, params[1] is squared-rooted to get
#                   the standard deviation
#   "exponential" - params[0] is inverted to get the scale
#   "gamma"       - scale is 1 / (2 * params[0]), shape is params[1]
# Samples whose unit is "hour" are multiplied by 60 before rounding.
interarrival_times = {
    "contract": ["minute", "normal", 40, 36],
    "complain": ["hour", "exponential", 0.5],
    "confirm": ["hour", "gamma", 1, 2],
    "request": ["minute", "exponential", 0.06],
    "review": ["minute", "normal", 15, 36],
}

# Queue discipline per service, as interpreted by Service.dequeue:
# "fifo" pops the head, "spt" pops the smallest remaining time,
# "siro" pops a uniformly random position.
queue_policies = {
    "contract": "spt",
    "complain": "fifo",
    "confirm": "fifo",
    "request": "spt",
    "review": "siro",
}



In [3]:
class ServiceType:
    """Static description of one service and the random draws attached to it.

    The mean service time and queue policy are looked up by ``name`` in the
    module-level ``service_times`` and ``queue_policies`` tables; the arrival
    process is looked up in ``interarrival_times`` on every draw.
    """

    def __init__(self, name):
        """Store ``name`` and cache its service time and queue policy.

        Raises:
            KeyError: if ``name`` is missing from ``service_times`` or
                ``queue_policies``.
        """
        self.name = name
        self.service_time = service_times[name]
        self.queue_policy = queue_policies[name]

    @staticmethod
    def _to_ticks(sample):
        """Round a sampled duration up to whole ticks, never below 1.

        The simulation counts durations down by one per tick and reacts when
        the counter hits zero, so a duration of 0 or less would never trigger.
        A normal draw can be negative, hence the clamp.
        """
        # np.ceil replaces the removed ``np.math.ceil`` alias; int() keeps the
        # return type a plain Python int as before.
        return max(1, int(np.ceil(sample)))

    def get_service_time(self):
        """Draw a service duration in ticks from Exponential(scale=service_time).

        Returns:
            int: the draw rounded up, at least 1.
        """
        return self._to_ticks(np.random.exponential(scale=self.service_time))

    def get_interarrival_time(self):
        """Draw the number of ticks until the next arrival for this service.

        The configuration entry is ``[unit, distribution, *params]``. The raw
        sample is converted to minutes ("hour" multiplies by 60, "second"
        divides by 60, anything else is left unchanged) and rounded up.

        Returns:
            int: ticks until the next arrival, at least 1.

        Raises:
            KeyError: if the service has no entry in ``interarrival_times``.
            ValueError: if the configured distribution name is not one of
                "normal", "exponential" or "gamma".
        """
        config = interarrival_times[self.name]
        scale, distr, params = config[0], config[1], config[2:]

        if distr == "normal":
            # params = (mean, variance); numpy expects the standard deviation.
            sample = np.random.normal(loc=params[0], scale=params[1] ** 0.5)
        elif distr == "exponential":
            sample = np.random.exponential(scale=1 / params[0])
        elif distr == "gamma":
            sample = np.random.gamma(scale=1 / (2 * params[0]), shape=params[1])
        else:
            # Previously an unknown name silently produced a 0-tick interval.
            raise ValueError(
                f"unknown interarrival distribution {distr!r} for service {self.name!r}"
            )

        if scale == "hour":
            sample *= 60
        elif scale == "second":
            sample /= 60

        return self._to_ticks(sample)

class WorkerType:
    """A category of worker: the jobs it can do and how it switches between them.

    Built from the ``transition_matrices`` entry named ``name``.
    """

    def __init__(self, name):
        """Load the job list, transition rows and switching period for ``name``."""
        self.name = name
        spec = transition_matrices[name]
        job_names = spec["services"]
        # job -> probability vector used to draw the job that follows it
        self.trans_dict = {
            job: spec["matrix"][row] for row, job in enumerate(job_names)
        }
        self.transition_time = spec["time_between"]

    def get_first_job(self):
        """Pick the starting job uniformly at random among the known jobs."""
        return np.random.choice(self.get_jobs())

    def get_next_job(self, job):
        """Draw the job following ``job`` using its transition row.

        Returns:
            The drawn job name, or ``None`` when the draw equals ``job``
            (i.e. the worker stays where it is).
        """
        candidate = np.random.choice(self.get_jobs(), p=self.trans_dict[job])
        return None if candidate == job else candidate

    def get_jobs(self):
        """Return the job names this worker type can perform, as a new list."""
        return list(self.trans_dict)
          
class Worker:
    """A single worker that serves one customer at a time and may switch jobs."""

    def __init__(self, type: WorkerType):
        """Start on a randomly chosen job with no customer and a full move timer."""
        self.type = type
        self.current_job = type.get_first_job()
        self.current_customer = None
        self.time_to_move = type.transition_time
        # job name -> ticks counted by needs_customer() while holding a customer
        self.works_on_jobs = {job: 0 for job in type.get_jobs()}

    def customer_finished(self, i):
        """Advance the current customer by one tick.

        Returns:
            True if the customer completed on this tick (and was released),
            False otherwise, including when there is no customer.
        """
        customer = self.current_customer
        if not customer:
            return False
        customer.proceed_work(i)
        if not customer.is_finished():
            return False
        self.current_customer = None
        return True

    def maybe_move(self):
        """Count down the move timer and possibly switch to another job.

        When the timer expires it is reset and a next job is drawn. If the
        draw yields a different job, the worker switches to it and drops its
        current customer.

        Returns:
            The dropped customer (which may be ``None`` if the worker was
            idle) when a switch happened; ``None`` otherwise.
        """
        self.time_to_move -= 1
        if self.time_to_move != 0:
            return None

        self.time_to_move = self.type.transition_time
        next_job = self.type.get_next_job(self.current_job)
        if not next_job:
            return None

        released = self.current_customer
        self.current_customer = None
        self.current_job = next_job
        return released

    def needs_customer(self):
        """Report which job the worker wants a customer for.

        Returns:
            The current job name if the worker is idle. If the worker is
            busy, the busy-tick counter of the current job is incremented
            and ``False`` is returned.
        """
        if self.current_customer is None:
            return self.current_job
        self.works_on_jobs[self.current_job] += 1
        return False

    def take_customer(self, customer):
        """Start serving ``customer``."""
        self.current_customer = customer

    def __str__(self):
        return f"{self.type.name}:{self.current_job} => {self.current_customer}"



class Customer:
    """One customer of a service, carrying its own sampled service demand."""

    def __init__(self, type: ServiceType, enter_time):
        """Record the arrival tick and draw the required service time from ``type``."""
        self.type = type
        self.enter_time = enter_time
        # -1 marks "not finished yet"
        self.finish_time = -1
        self.service_time = type.get_service_time()
        # Ticks of work still required; starts at the full service time.
        self.remainig_time = self.service_time

    def is_finished(self):
        """Return True once no work remains."""
        return self.remainig_time == 0

    def proceed_work(self, i):
        """Perform one tick of work; stamp ``finish_time`` with ``i`` on completion."""
        self.remainig_time -= 1
        if not self.is_finished():
            return
        self.finish_time = i

    def __str__(self):
        return f"{self.type.name}"



class Service:
    """Arrival process, waiting queue and statistics for one service.

    Attributes:
        queue: customers currently waiting.
        customers: every customer that ever arrived, in arrival order.
        next_customer: ticks left until the next arrival.
        working_customers: in-service counter; maintained by the caller.
        A: every sampled interarrival time, including the first one.
        L_Q: queue length recorded at the start of each tick.
        L: ``L_Q`` plus ``working_customers`` at the start of each tick.
    """

    def __init__(self, type: ServiceType):
        """Create an empty service and sample the time of the first arrival."""
        self.type = type
        self.queue = []
        self.customers = []
        self.next_customer = self.type.get_interarrival_time()
        self.working_customers = 0
        self.A = [self.next_customer]
        self.L_Q = []
        self.L = []

    def customer_arrived(self, i):
        """Advance the arrival process by one tick.

        Records the current queue/system sizes, counts the arrival timer
        down, and when it expires creates a customer stamped with tick ``i``,
        enqueues it and samples the next interarrival time.
        """
        self.L_Q.append(len(self.queue))
        self.L.append(self.L_Q[-1] + self.working_customers)
        self.next_customer -= 1
        # ``<= 0`` rather than ``== 0``: a sampled interarrival time of zero or
        # less (possible with normally distributed gaps) would otherwise step
        # past zero and no customer would ever arrive again.
        if self.next_customer <= 0:
            self.next_customer = self.type.get_interarrival_time()
            self.A.append(self.next_customer)
            customer = Customer(self.type, i)
            self.customers.append(customer)
            self.enqueue(customer)

    def enqueue(self, customer):
        """Append ``customer`` to the end of the waiting queue."""
        self.queue.append(customer)

    def dequeue(self):
        """Remove and return the next customer according to the queue policy.

        Policies: 'fifo' takes the head, 'spt' takes the customer with the
        smallest remaining time (earliest-queued wins ties), 'siro' takes a
        uniformly random position.

        Returns:
            The selected customer, or ``None`` if the queue is empty.

        Raises:
            ValueError: if the policy is not one of the three above; the
                previous silent ``None`` left queued customers unserved.
        """
        if not self.queue:
            return None
        policy = self.type.queue_policy
        if policy == 'fifo':
            return self.queue.pop(0)
        if policy == 'spt':
            # min() returns the first minimal index, which is the same
            # customer a stable sort followed by pop(0) would pick, without
            # reordering the queue or paying for a full sort.
            shortest = min(
                range(len(self.queue)),
                key=lambda idx: self.queue[idx].remainig_time,
            )
            return self.queue.pop(shortest)
        if policy == 'siro':
            return self.queue.pop(np.random.randint(0, len(self.queue)))
        raise ValueError(f"unknown queue policy {policy!r}")

    def get_service_times(self):
        """Return, per customer, the ticks of service received so far."""
        return [c.service_time - c.remainig_time for c in self.customers]

    def finish(self, i):
        """Stamp every still-unfinished customer with finish time ``i``."""
        for customer in self.customers:
            if customer.finish_time == -1:
                customer.finish_time = i

    def get_w_times(self):
        """Return, per customer, ``finish_time - enter_time``.

        Customers whose ``finish_time`` is still -1 yield a negative value,
        so ``finish`` should be called first.
        """
        return [c.finish_time - c.enter_time for c in self.customers]




        

In [4]:
def simulate(workers_count, simulation_time, seed=None):
    """Run the tick-based service simulation.

    One ``Service`` is created per entry of ``service_times`` and
    ``workers_count`` workers are created per entry of
    ``transition_matrices``. Every tick (one minute) runs four phases in
    order: arrivals, service progress, worker moves, and assignment of
    waiting customers to idle workers.

    Args:
        workers_count: number of workers to create for each worker type.
        simulation_time: number of ticks to simulate.
        seed: optional seed for NumPy's global random state; when given, the
            run is reproducible. ``None`` leaves the random state untouched.

    Returns:
        tuple: ``(services, workers)`` where ``services`` maps service name
        to its ``Service`` and ``workers`` is the list of all ``Worker``
        objects.
    """
    if seed is not None:
        np.random.seed(seed)

    services = {name: Service(ServiceType(name)) for name in service_times}

    workers = []
    for type_name in transition_matrices:
        worker_type = WorkerType(type_name)
        for _ in range(workers_count):
            workers.append(Worker(worker_type))

    for minute in range(simulation_time):  # each iteration is 1 minute
        # add new customers
        for service in services.values():
            service.customer_arrived(minute)

        # remove finished customers
        for worker in workers:
            if worker.customer_finished(minute):
                services[worker.current_job].working_customers -= 1

        # move workers; a customer dropped by a moving worker goes back to
        # the queue of its own service
        for worker in workers:
            customer = worker.maybe_move()
            if customer:
                home = services[customer.type.name]
                # The customer is no longer being served. Without this
                # decrement it is counted both as waiting and as in service,
                # and the counter grows again when it is picked up later.
                home.working_customers -= 1
                home.enqueue(customer)

        # update queues: hand waiting customers to idle workers
        for worker in workers:
            job = worker.needs_customer()
            if job:
                customer = services[job].dequeue()
                if customer:
                    worker.take_customer(customer)
                    services[job].working_customers += 1

    # Stamp unfinished customers with the last simulated tick. Computed
    # explicitly instead of reusing the loop variable, which is undefined
    # (or stale) when simulation_time is 0.
    last_minute = simulation_time - 1
    for service in services.values():
        service.finish(last_minute)

    return services, workers
                






In [5]:
services, workers = simulate(1, 1000)

# Per-service series, keyed by service name:
#   A   - sampled interarrival times
#   S   - ticks of service each customer received
#   W   - finish tick minus arrival tick of each customer
#   W_Q - W minus S for each customer
#   L   - queue length plus in-service count, one value per tick
#   L_Q - queue length, one value per tick
A, S, W, W_Q, L, L_Q = {}, {}, {}, {}, {}, {}

# Per-worker dict of busy-tick counters, one entry per job.
workers_times = [w.works_on_jobs for w in workers]

for service, svc in services.items():
    A[service] = svc.A
    S[service] = svc.get_service_times()
    W[service] = svc.get_w_times()
    W_Q[service] = [total - served for total, served in zip(W[service], S[service])]
    L[service] = svc.L
    L_Q[service] = svc.L_Q

print(A, S, W, W_Q, L, L_Q, sep="\n")

print(workers_times)

worker moved: complain -> contract
worker moved: request -> confirm
worker moved: contract -> complain
worker moved: complain -> contract
worker moved: contract -> complain
worker moved: complain -> contract
worker moved: contract -> complain
worker moved: complain -> contract
worker moved: contract -> complain
worker moved: confirm -> review
worker moved: review -> request
worker moved: review -> request
worker moved: complain -> contract
worker moved: contract -> complain
worker moved: request -> review
worker moved: request -> confirm
worker moved: confirm -> request
worker moved: review -> request
worker moved: complain -> contract
worker moved: request -> confirm
worker moved: request -> review
worker moved: confirm -> request
worker moved: contract -> complain
worker moved: request -> confirm
worker moved: complain -> contract
worker moved: review -> request
worker moved: contract -> complain
worker moved: request -> review
worker moved: confirm -> review
worker moved: complain -