In [6]:
import simpy
import numpy as np
from collections import deque

In [7]:
# Resource units reserved on a physical machine by one container of each size profile.
CONTAINER_PROFILE_DIRECTORY = {
    "S" : 1,
    "M" : 2,
    "L" : 4,
    "XL": 8,
}

# Simulated time units spent tearing down one container.
CONTAINER_DESTRUCTION_OVERHEAD = 1
# Backward-compatible alias for the original misspelled name, which existing code still references.
CONTAINER_DESTRUCTION_OVHERHEAD = CONTAINER_DESTRUCTION_OVERHEAD
# Simulated time units spent creating one container.
CONTAINER_CREATION_OVERHEAD = 5

In [73]:
class Datacenter(object):
    """Intended to encapsulate a cluster, its cluster monitor, and job scheduler.

    Only configuration is stored today; ``cluster`` and ``job_queue`` must be supplied
    (or attached later) before ``state_vector`` can be used.
    """

    def __init__(self, env, n_pms, pm_resources, job_profiles, weights, cluster=None, job_queue=None):
        self.env = env
        self.n_pms = n_pms
        self.pm_resources = pm_resources
        self.job_profiles = job_profiles
        self.weights = weights
        # state_vector() reads these; the original class never assigned them, so every call
        # failed with AttributeError. They are optional so existing callers keep working.
        self.cluster = cluster
        self.job_queue = job_queue

    def state_vector(self):
        """Return the cluster's vector followed by the job queue's vector as one flat array.

        Raises:
            RuntimeError: if no cluster or job queue has been attached.
        """
        if self.cluster is None or self.job_queue is None:
            raise RuntimeError("Datacenter.state_vector() requires both a cluster and a job_queue")
        return np.hstack([self.cluster.vectorize(), self.job_queue.vectorize()])

    def action(self, index):
        """Placeholder for applying the action identified by ``index``; currently a no-op returning None."""
        # TODO: not implemented yet.
        pass

In [9]:
class Cluster(object):
    """A pool of ``N`` identical physical machines (labelled ``pm-0`` .. ``pm-{N-1}``) plus the job generator feeding them."""

    def __init__(self, env, N, pm_capacity, job_generator):
        # Keep the environment: schedule_job previously relied on a notebook-global `env`.
        self.env = env
        self.total_resources = N * pm_capacity
        labels = ("pm-" + str(i) for i in range(N))
        self.physical_machines = {label: PhysicalMachine(env, pm_capacity) for label in labels}
        self.job_generator = job_generator

    def resource_utilization(self):
        """Fraction of total cluster resources currently used by executing jobs (0.0 for an empty cluster)."""
        if not self.total_resources:
            return 0.0
        resources_utilized = sum(pm.utilized_resources.level for pm in self.physical_machines.values())
        return resources_utilized / self.total_resources

    def schedule_job(self, job_index, pm, container_profile):
        """Simpy process: run the job in queue slot ``job_index`` on machine ``pm``, then refill that slot.

        Args:
            job_index: index into ``job_generator.job_queue.buffer``.
            pm: label of the target physical machine (e.g. ``"pm-0"``).
            container_profile: container size key to run the job in.
        """
        job = self.job_generator.job_queue.buffer[job_index]
        machine = self.physical_machines[pm]
        yield self.env.process(machine.schedule_job(job, container_profile))
        # NOTE(review): the slot is refilled even if the machine rejected the job
        # (no idle container) — confirm that dropping the job is intended.
        self.job_generator.enqueue_job(job_index)

    def vectorize(self):
        """Concatenate every machine's vector, in label order; empty array when there are no machines."""
        if not self.physical_machines:
            return np.array([])
        return np.hstack([pm.vectorize() for pm in self.physical_machines.values()])

In [20]:
class JobProfile(object):
    """Template for a kind of job: the resources it needs and the simulated time it runs for."""

    def __init__(self, resources, time):
        self.resources, self.time = resources, time

In [63]:
class JobQueue(object):
    """Fixed-size buffer of pending jobs; every slot starts as ``None`` until a job is stored in it."""

    def __init__(self, env, buffer_size):
        # env is kept for consistency with the other simulation classes; it is not used here.
        self.env = env
        self.buffer_size = buffer_size
        self.buffer = [None] * buffer_size

    def vectorize(self):
        """Concatenate each buffered job's ``vectorize()`` output into one flat array.

        Returns an empty array for a zero-size buffer (np.hstack raised on that case).
        Every slot must hold a job; a ``None`` slot raises AttributeError.
        """
        if not self.buffer:
            return np.array([])
        return np.concatenate([np.ravel(job.vectorize()) for job in self.buffer])

        
class JobGenerator(object):
    """Creates jobs from weighted profiles and keeps a fixed-size JobQueue filled.

    Args:
        env: simulation environment (stored and forwarded to the JobQueue).
        job_queue_size: number of slots in the job queue.
        job_profiles: sequence of objects exposing ``resources`` and ``time``.
        weights: selection probabilities for ``job_profiles`` (numpy's ``p`` argument).
        log: sink passed to every created Job.
    """
    def __init__(self, env, job_queue_size, job_profiles, weights, log):
        self.env = env
        self.job_profiles = job_profiles
        self.weights = weights
        self.job_queue_size = job_queue_size
        self.job_queue = JobQueue(env, job_queue_size)
        self.log = log
        # Fill every slot up front so the queue holds real jobs from the start.
        # NOTE(review): these initial jobs are not enqueue()d here — confirm how their start time is set.
        for i in range(job_queue_size):
            self.job_queue.buffer[i] = self.generate_job()

    def generate_job(self):
        """Draw a profile according to ``weights`` and return a new Job built from it."""
        # Sample an index rather than the profile objects so numpy never tries to coerce the
        # profiles into an array (which breaks if a profile happens to be sequence-like).
        profile_index = np.random.choice(len(self.job_profiles), p=self.weights)
        job_profile = self.job_profiles[profile_index]
        # Bug fix: the original built the Job but never returned it, leaving None in the queue.
        return Job(job_profile.resources, job_profile.time, self.log)

    def enqueue_job(self, job_index):
        """Replace queue slot ``job_index`` with a freshly generated job that has been enqueued."""
        new_job = self.generate_job()
        new_job.enqueue()
        self.job_queue.buffer[job_index] = new_job

        
            

In [64]:
class Log(simpy.Store):
    """simpy Store subclass that Job.execute writes job slowdown values into."""

In [65]:
class Job(object):
    """A unit of work with a resource demand and run time that reports its slowdown on completion.

    Args:
        resources: resource units the job consumes while executing.
        time: simulated run time of the job.
        log: object with a ``put`` method that receives the job's slowdown.
        env: simulation environment providing ``now``. When omitted, the environment
            is taken from ``log`` if it is a simpy resource/store.
    """

    def __init__(self, resources, time, log, env=None):
        self.resources = resources
        self.time = time
        self.log = log
        # The original class read self.env without ever setting it. Fall back to the env held by
        # the log — NOTE: this relies on simpy storing it in the private `_env` attribute.
        self.env = env if env is not None else getattr(log, "_env", None)
        # Timestamp creation so jobs that are never explicitly enqueued still have a start time.
        self.start = self.env.now if self.env is not None else None

    def enqueue(self):
        """Record the current simulation time as the moment the job started waiting."""
        self.start = self.env.now

    def execute(self):
        """Compute slowdown = (waiting delay + run time) / run time and put it on the log.

        Raises:
            RuntimeError: if the job has no start time (no env and never enqueued).
        """
        if self.start is None:
            raise RuntimeError("job executed before it was enqueued")
        delay = self.env.now - self.start
        job_slowdown = (delay + self.time) / self.time
        self.log.put(job_slowdown)

    def vectorize(self):
        """Return ``[resources, time]`` as a numpy array."""
        # Bug fix: the original read the non-existent attribute `self.resource`.
        return np.array([self.resources, self.time])

In [66]:
class PhysicalMachine(object):
    """A single host whose resource pool can be carved into fixed-size containers that run jobs.

    Bookkeeping uses simpy Containers:
      * ``available_resources``: capacity not yet reserved by any container;
      * ``containers[profile]``: number of idle containers of each profile;
      * ``utilized_resources``: resources consumed by jobs currently executing.
    """

    def __init__(self, env, total_resource_capacity):
        self.env = env
        self.total_resources = total_resource_capacity
        self.available_resources = simpy.Container(env, capacity=total_resource_capacity, init=total_resource_capacity)
        self.containers = {}
        for container_label, resource_requirement in CONTAINER_PROFILE_DIRECTORY.items():
            max_containers = total_resource_capacity // resource_requirement
            # simpy.Container rejects a non-positive capacity, so a machine smaller than a profile
            # (e.g. capacity 4 vs "XL"=8) used to crash here. A capacity of 1 is harmless because
            # add_container refuses any profile that does not fit in available_resources.
            self.containers[container_label] = simpy.Container(env, capacity=max(max_containers, 1), init=0)

        # utilized resources is only incremented when a machine executes a job
        self.utilized_resources = simpy.Container(env, capacity=total_resource_capacity, init=0)
        # TODO: fill machine randomly with VMs

    def add_container(self, profile):
        """Simpy process: reserve resources for one ``profile`` container and make it idle after the creation overhead."""
        print("adding container")
        resources_requested = CONTAINER_PROFILE_DIRECTORY[profile]
        # `<=` (was `<`): a container may use exactly the remaining capacity.
        if resources_requested <= self.available_resources.level:
            # Reserve immediately; the level check above means this get should be satisfied at once.
            self.available_resources.get(resources_requested)
            yield self.env.timeout(CONTAINER_CREATION_OVERHEAD)
            self.containers[profile].put(1)
        else:
            print("not enough resources")

    def destroy_container(self, profile):
        """Simpy process: tear down one idle ``profile`` container and return its resources after the destruction overhead."""
        if self.containers[profile].level > 0:
            print("destroying container")
            resources_to_free = CONTAINER_PROFILE_DIRECTORY[profile]
            # Claim the container BEFORE the delay. Previously it was taken afterwards, so a concurrent
            # destroy/schedule could grab the same idle container and the resources were freed anyway.
            self.containers[profile].get(1)
            yield self.env.timeout(CONTAINER_DESTRUCTION_OVHERHEAD)
            self.available_resources.put(resources_to_free)
        else:
            print("no container available")

    def schedule_job(self, job, container_profile):
        """Simpy process: run ``job`` in an idle ``container_profile`` container for ``job.time`` units."""
        if self.containers[container_profile].level > 0:
            print("scheduling job")
            # Occupy the container for the job's duration.
            self.containers[container_profile].get(1)
            # NOTE(review): job.resources is not checked against the container's size — confirm
            # whether a job larger than its container should be rejected.
            self.utilized_resources.put(job.resources)
            yield self.env.timeout(job.time)
            job.execute()
            print("completing job")
            self.containers[container_profile].put(1)
            self.utilized_resources.get(job.resources)
        else:
            print("no available machines")

    def vectorize(self):
        """Return the idle-container count for each profile, in CONTAINER_PROFILE_DIRECTORY order."""
        levels = [container.level for container in self.containers.values()]
        return np.array(levels)

In [67]:
# def resource_utilization_monitor(env, cluster, interval=10):
#     while True:
#         yield env.timeout(interval)
#         print(env.now, cluster.resource_utilization())

In [68]:
class ClusterMonitor(object):
    """Periodically samples cluster utilization and tracks a sliding window of job slowdowns.

    Slowdown values are consumed from ``self.job_reports``.
    NOTE(review): nothing in this class puts values into ``job_reports`` (Job writes to its own log) — confirm wiring.
    """

    def __init__(self, env, cluster):
        self.env = env
        self.cluster = cluster
        # Create monitoring state eagerly; the original created it inside the generators, so the
        # getters raised AttributeError until the processes had started.
        self.resource_utilization = 0
        self.job_reports = simpy.Store(env)
        self.job_records = deque(maxlen=10)
        self.total_jobs = 0
        self.average_job_slowdown = 1.0
        env.process(self.monitor_resource_utilization())
        env.process(self.monitor_job_slowdown())

    def monitor_resource_utilization(self, interval=10):
        """Simpy process: every ``interval`` units, store and print utilization and average slowdown."""
        # Bug fix: uses self.env / self.cluster instead of notebook globals.
        while True:
            yield self.env.timeout(interval)
            self.resource_utilization = self.cluster.resource_utilization()
            print(self.env.now, self.resource_utilization, self.get_average_job_slowdown())

    def monitor_job_slowdown(self, window=10):
        """Simpy process: consume slowdown reports, keeping the last ``window`` of them."""
        # Resize the window while keeping any records already collected.
        self.job_records = deque(self.job_records, maxlen=window)
        while True:
            job_slowdown = yield self.job_reports.get()
            self.job_records.append(job_slowdown)
            self.total_jobs += 1
            self.average_job_slowdown = self.get_average_job_slowdown()

    def get_resource_utilization(self):
        """Current fraction of cluster resources in use."""
        return self.cluster.resource_utilization()

    def get_average_job_slowdown(self):
        """Mean slowdown over the window; 1.0 (no slowdown) before any job has been reported."""
        if not self.job_records:
            # np.average of an empty window returns nan with a RuntimeWarning.
            return 1.0
        return np.average(self.job_records)

    def vectorize(self):
        """Return ``[resource_utilization, average_job_slowdown]`` as a numpy array."""
        # Bug fix: the original called the misspelled `get_resource_utilizaiton`.
        return np.array([self.get_resource_utilization(), self.get_average_job_slowdown()])

In [69]:
def test(env, cluster, cm, jq):
    """Scripted scenario on ``pm-1``: create S and XL containers, run two hand-made jobs, then destroy both containers.

    Args:
        env: simpy environment.
        cluster: Cluster whose ``job_generator.job_queue.buffer`` has at least two slots.
        cm: cluster monitor (unused here).
        jq: passed to ``Job`` as its log, i.e. where slowdowns are written.
    """
    pm = cluster.physical_machines['pm-1']
    yield env.timeout(1)
    env.process(pm.add_container('S'))
    yield env.timeout(1)
    env.process(pm.add_container('XL'))
    j1 = Job(5, 95, jq)
    j2 = Job(1, 45, jq)
    yield env.timeout(25)

    # Cluster.schedule_job takes an index into the job queue buffer, not a Job object (the
    # original passed j1/j2 directly, which raises TypeError). Place the hand-made jobs into
    # distinct slots and schedule them by index.
    buffer = cluster.job_generator.job_queue.buffer
    buffer[0] = j1
    env.process(cluster.schedule_job(0, 'pm-1', 'XL'))
    yield env.timeout(35)
    buffer[1] = j2
    env.process(cluster.schedule_job(1, 'pm-1', 'S'))
    yield env.timeout(100)
    env.process(pm.destroy_container('XL'))
    yield env.timeout(10)
    env.process(pm.destroy_container('S'))
    yield env.timeout(100)



In [70]:

# env = simpy.Environment()
# log = Log(env)
# SIM_DURATION = 300
# cluster = Cluster(env, 2, 32)
# cluster_monitor = ClusterMonitor(env, cluster)
# env.process(test(env, cluster, cluster_monitor, log))
# env.run(until=SIM_DURATION)

In [72]:
env = simpy.Environment()

# JobGenerator draws job profiles with np.random.choice (numpy's global RNG), so seed it
# explicitly to make the simulation reproducible.
SEED = 42
np.random.seed(SEED)

log = Log(env)
SIM_DURATION = 300

# Cluster shape and queue size used for this run.
N_PMS = 2
PM_CAPACITY = 32
JOB_QUEUE_SIZE = 16

jp = JobProfile(3, 12)
job_generator = JobGenerator(env, JOB_QUEUE_SIZE, [jp], [1.0], log)
cluster = Cluster(env, N_PMS, PM_CAPACITY, job_generator)
cluster_monitor = ClusterMonitor(env, cluster)


def test(env, cluster, cluster_monitor, job_generator, log, pm_label='pm-0'):
    """Simpy process: create one container of every profile on a single PM and wait until all are built.

    NOTE(review): this redefines (and shadows) the earlier ``test``. The original body used an
    undefined ``pm`` and contained no ``yield``, so it could not run as a simpy process. The target
    machine is now chosen by ``pm_label`` — confirm which PM was intended.
    """
    pm = cluster.physical_machines[pm_label]
    creations = [env.process(pm.add_container(profile)) for profile in ('S', 'M', 'L', 'XL')]
    # Yielding makes this a generator (required by env.process) and waits for every creation.
    yield env.all_of(creations)
    
    







