In [1]:
from random import *
from math import *
from copy import *
from statistics import mean 
from IPython.display import Image
import cairocffi as cairo
import pandas as pd
import numpy as np

## Visualisation tools

In [2]:
from dataclasses import dataclass

import cairocffi as cairo

@dataclass
class Vector2i():
    x: int = 0
    y: int = 0

class Visualizer():
    def __init__(self,
                 width=800,
                 height=800,
                 x_offset=0.1,
                 y_offset=0.1):
        self.height = height
        self.width = width
        self.x_offset = x_offset
        self.y_offset = y_offset
        self.surface = cairo.ImageSurface(cairo.FORMAT_ARGB32, height, width)
        self.context = cairo.Context(self.surface)

    def draw_gantt(self, scheduler, filename, grid_draw = False):
        self._setup_canvas()
        max_time = self._setup_xaxis(scheduler.work_duration())
        num_servers = self._setup_yaxis(scheduler.server_count)
        width_scaling = self._scale_axis(self.x_offset, max_time)
        height_scaling = self._scale_axis(self.y_offset, num_servers)

        jobs_colors = {
            j_id: self._generate_color()
            for j_id in scheduler.job_ids()
        }

        ids = scheduler.server_ids()
        servers_ypos = {
            id: ypos
            for id, ypos in zip(ids, self._server_yposition(ids))
        }

        for task in scheduler.tasks:
            if type(task) is Power_off:
                r_col = 0
                g_col = 0
                b_col = 0
            else:
                j_id = task.job_id
                r_col = jobs_colors[j_id][0]
                g_col = jobs_colors[j_id][1]
                b_col = jobs_colors[j_id][2]
            t_time = task.end_time - task.start_time
            for server in task.servers:
                alpha = 0.5 if isinstance(task, Reconfiguration) else 1.0
                tl = Vector2i(self.x_offset + task.start_time * width_scaling,
                              servers_ypos[server.id])
                size = Vector2i(width_scaling * t_time, height_scaling)
                self._draw_rectangle(tl=tl,
                                     size=size,
                                     r=r_col,
                                     g=g_col,
                                     b=b_col,
                                     alpha=alpha)

        if (grid_draw):
            self._draw_grid(scheduler.work_duration(), num_servers)

        self.context.set_font_size(0.05)
        self._set_labels(xlabel="Time",
                         xlabel_pos=Vector2i(0.8, 0.97),
                         ylabel="Servers",
                         ylabel_pos=Vector2i(0, 0.07))
        self._save_to(filename)

    def _setup_canvas(self, height=800, width=800):
        self._normalize_canvas()
        self._draw_rectangle()

    def _normalize_canvas(self):
        self.context.scale(self.height, self.width)

    def _setup_xaxis(self, xmax):
        return 1.01 * xmax

    def _setup_yaxis(self, ymax):
        return ymax

    def _scale_axis(self, offset, max_value):
        return (1 - offset * 2) / max_value

    def _generate_color(self):
        return [random() for j in range(3)]

    def _server_yposition(self, servers):
        y_positions = []
        server_count = len(servers)
        height_scaling = self._scale_axis(self.y_offset, server_count)
        for i in range(server_count):
            y_positions.append(self.y_offset + i * height_scaling)
        return y_positions

    def _draw_rectangle(self,
                        tl=Vector2i(),
                        size=Vector2i(1,1),
                        r=1,
                        g=1,
                        b=1,
                        alpha=1.0):
        self.context.rectangle(tl.x, tl.y, size.x, size.y)
        self._set_color(r, g, b, alpha)
        self.context.fill()

    def _set_color(self, r=0, g=0, b=0, alpha=1.0):
        self.context.set_source_rgba(r, g, b, alpha)

    def _draw_grid(self, xmax, ymax):
        scaled_x = self._setup_xaxis(xmax)
        width_scaling = self._scale_axis(self.x_offset, scaled_x)
        height_scaling = self._scale_axis(self.y_offset, ymax)

        for i in range(ceil((xmax + 1)/1000)):
            pos = Vector2i(self.x_offset + (i * width_scaling)*1000, self.y_offset)
            self._move_cursor(pos)
            pos.y = 1 - pos.y
            self._draw_line(pos)

        for j in range(ymax + 1):
            pos = Vector2i(self.x_offset, self.y_offset + j * height_scaling)
            self._move_cursor(pos)
            pos.x += xmax
            self._draw_line(pos)

    def _move_cursor(self, position):
        self.context.move_to(position.x, position.y)

    def _draw_line(self, position, line_width=0.001, r=0, g=0, b=0, alpha=0.9):
        self.context.line_to(position.x, position.y)
        self._set_color(r, g, b, alpha)
        self.context.set_line_width(line_width)
        self.context.stroke()

    def _set_labels(self, xlabel, xlabel_pos, ylabel, ylabel_pos):
        self._move_cursor(xlabel_pos)
        self.context.show_text(xlabel)

        self._move_cursor(ylabel_pos)
        self.context.show_text(ylabel)

    def _save_to(self, filename):
        self.surface.write_to_png(filename)


## Objective:

minimize(max $\frac{t_{termination} - t_{submission}}{mass})$

## Job class
Initial parameters: <br>
- name : unique id of the job<br>
- sub_time: submission time<br>
- alpha : speed up factor (can be thought of as the communication time), should be somewhere between 0.5 and 1
- data: data to be processed, required for the reconfiguration time (see equation below)
- mass: mass of computation : area if run with one server 
- min_num_servers : minimum number of servers required for the job to be run
- max_num_servers : maximum number of servers the job should be run on
<br>



### Jobs and tasks
A job is broken into smaller tasks that can be processed on a different amount of servers. The number of servers used will impact the processing time according to the equation shown below.


### Processing time of a task
$time = \frac{mass}{(num\_servers)^{alpha}}$<br>
or<br>
$mass= time \times (num\_servers)^{alpha}$


### Reconfiguration time
Time required to reconfigure the data if the number of servers is either increased or decreased. <br>
if $n\geq m$:<br>
$T_{n\rightarrow m} = \frac{D}{n}(\lceil \frac{n}{m}\rceil - 1)$<br>
if $n\leq m$:<br>
$T_{n\rightarrow m} = \frac{D}{m}(\lceil \frac{m}{n}\rceil - 1)$<br>

where $D$ is the data amount in the application, $n$ number of servers in the previous process and $m$ the number of servers in the new process

In [3]:
class Job():
    def __init__(self, id, sub_time, alpha, data, mass, min_num_servers,
                 max_num_servers):
        self.id = id
        self.sub_time = sub_time
        self.alpha = alpha
        self.data = data
        self.mass = mass
        self.min_num_servers = min_num_servers
        self.max_num_servers = max_num_servers

    def __eq__(self, other):
        return self.id == other.id


## Task class
Initial parameters: <br>
- job_name: The job to which the task belongs
- start_time
- end_time
- mass_executed
- servers : list of servers the task is executed on

In [4]:
class Task(object):
    def __init__(self, job_id, mass_executed, servers, start_time, end_time):
        self.job_id = job_id
        self.mass_executed = mass_executed
        self.servers = servers
        self.start_time = start_time
        self.end_time = end_time

    def __repr__(self):
        return "name: {}, mass_exec: {}, #servers: {} start_time: {}, end_time: {}".format(
            self.job_id, self.mass_executed, len(self.servers),
            self.start_time, self.end_time)

    def from_job(self, job):
        return self.job_id == job.id



class Reconfiguration(Task):
    def __init__(self, job_id, servers, start_time, end_time):
        Task.__init__(self, job_id, 0, servers, start_time, end_time)

    def __str__(self):
        return "name: {}, reconfiguration, mass_exec: {}, #servers: {} start_time: {}, end_time: {}".format(
            self.job_id, self.mass_executed, len(self.servers),
            self.start_time, self.end_time)

class Power_off(Task):
    def __init__(self, servers, start_time, end_time):
        Task.__init__(self, "power off", 0, servers, start_time, end_time)
    def __str__(self):
        return "Power off, #servers: {} start_time: {}, end_time: {}".format(
            len(self.servers), self.start_time, self.end_time)

## Server

A server is a computation unit 

In [5]:
class Server():
    def __init__(self, id):
        self.id = id

    def __eq__(self, other):
        return self.id == other.id


## Energy


values from Poquet 2017, batsim phd thesis <br>
Energy notes:<br>
Energy is saved by turning off and back on computer compared to idle state if the time off is at least 230s, meaning a total time to switch off + off time + starting time is at least 362s


In [6]:
class Energy():
    def power_idle(self):
        return 95
    
    def energy_idle(self, time):
        return self.power_idle()*time
    
    def energy_computing(self,time):
        return 191*time
    
    def power_off_on(self):
        return 125
    
    def time_off_on(self):
        return 151
    
    def power_on_off(self):
        return 101
    
    def time_on_off(self):
        return 6
    
    def power_off(self):
        return 10
    
    def energy_off(self, time):
        off_time = time - self.time_off_on() - self.time_on_off()
        energy = self.power_off_on() * self.time_off_on() + self.power_on_off() * self.time_on_off() 
        energy = energy + off_time * self.power_off()
        return energy

# Scheduler - state of whole thing

Manager should be able to access objects and query.<br>

## Tuning parameters:
### Power off parameters
**server_threshold**<br>
Maximum ratio of servers busy to consider shutting down. For example a value of 0.9 would mean that powering off will be considered only if 90% of the servers are busy.<br>
Range: 0 to 1, should probably be above 0.5

**ratio_almost_finished_jobs** <br>
Max ratio of jobs that needs to be almost finished for a shut down to be considered. <br>
Range: 0 to 1

**time_remaining_for_power_off** = 370<br>
Time that defines what is an almost finished jobs. <br>
Should probably be at least 370 and less than shut_down_time


**shut_down_time** = 800<br>
How long the servers will be shut down for, should be at least 370 as under there is no energy gain in shutting down the server.

### Reconfiguration parameters
**estimated_improv_threshold** <br>
Fraction of the remaining estimated time of the task that the reconfiguration would take. For example a value of 0.9 would mean that a reconfiguration would take place only if the reconfigured task would take 90% or less time than the estimated task without reconfiguration.<br>
Range: 0 to 1, should probably be above 0.5


### Servers allocation parameters
The number of servers allocated depends on the number of servers available, the minimum and maximum number of servers requested as well as the alpha value. A low alpha value implies that the increasing the number of servers would provide only a small improvement. To deal with that the alpha values are placed in three ranges and then according the number of possible servers allocated is calculated as a ratio of the servers available.<br>

**alpha_lower** and **alpha_mid** should both be between 0.5 and 1 with alpha_lower<alpha_mid<br>
alpha_mid = 0.75<br>

**alpha_min_server_lower_range**,
**alpha_min_server_mid_range** and 
**alpha_min_server_upper_range** should all be between be between 0.1 and 1 with alpha_min_server_lower_range < alpha_min_server_mid_range < alpha_min_server_upper_range<br>


In [69]:
class Scheduler(object):
    def __init__(self, servers):
        self.servers = servers
        self.jobs = []
        self.tasks = []
        self.job_queued = []
        
        # tuning parameters
        self.server_threshold = 0.7
        self.ratio_almost_finished_jobs = 0.8
        
        self.time_remaining_for_power_off = 370
        self.shut_down_time = 800
        
        self.estimated_improv_threshold = 0.9 #ratio of the remaining time of the reconfigured to the original time
        
        self.alpha_min_server_lower_range = 0.4
        self.alpha_min_server_mid_range = 0.6
        self.alpha_min_server_upper_range = 1
        
        self.alpha_lower = 0.65
        self.alpha_mid = 0.75
        
        #Cost function weights
        self.stretch_time_weight = 1
        self.energy_weight = 1
        
    
    def set_tuning_params(self, server_threshold,ratio_almost_finished_jobs, time_remaining_for_power_off, shut_down_time,
                         estimated_improv_threshold, alpha_min_server_lower_range, alpha_min_server_mid_range, 
                         alpha_min_server_upper_range, alpha_lower, alpha_mid):
        self.server_threshold = server_threshold
        self.ratio_almost_finished_jobs = ratio_almost_finished_jobs     
        self.time_remaining_for_power_off = time_remaining_for_power_off
        self.shut_down_time = shut_down_time        
        self.estimated_improv_threshold = estimated_improv_threshold        
        self.alpha_min_server_lower_range = alpha_min_server_lower_range
        self.alpha_min_server_mid_range = alpha_min_server_mid_range
        self.alpha_min_server_upper_range = alpha_min_server_upper_range  
        self.alpha_lower = alpha_lower
        self.alpha_mid = alpha_mid
    
    def schedule1(self, job):
        self.jobs.append(job)
        self.job_queued.append(job)
        self.update_schedule1(job.sub_time)
    
    def update_schedule1(self, time):
        if self._is_ratio_available_servers_above_threshold(time):
            while self.job_queued:
                self._order_queue_by_sub_time()
                job_to_sched = self.job_queued[0]
                num_servers_to_use = self._num_server_alloc(job_to_sched, time)
                if num_servers_to_use > 0:
                    available_servers = self._available_servers(time)
                    self._schedule_task_given_num_servers(num_servers_to_use, available_servers, job_to_sched, time)
                    self.job_queued.remove(job_to_sched)
                else:
                    break
                if self._is_ratio_available_servers_above_threshold(time):
                    break
        
        if self._is_ratio_available_servers_above_threshold(time):     
            #order job by remaining mass
            jobs_by_mass = self._jobs_by_mass_remaining(time)
            while jobs_by_mass:
                job_reconfig = jobs_by_mass[0]
                if self._is_job_reconfigurable(job_reconfig, time):
                    task_to_reconfig = self._current_task_of_job(job_reconfig, time)
                    self._reconfigure_task(task_to_reconfig, time)
                    jobs_by_mass.remove(job_reconfig)
                if self._is_ratio_available_servers_above_threshold(time):
                    break
                
        if  self._num_available_servers(time)!=0 and not self._is_ratio_available_servers_above_threshold(time):
            num_jobs_currently_executed = 0
            num_jobs_finishing_under_threshold_time = 0
            for job in self.jobs:
                job_termination_time = self._job_termination_time(job)
                if job_termination_time > time:
                    num_jobs_currently_executed += 1
                    time_remaining = job_termination_time - time
                    if time_remaining < self.time_remaining_for_power_off:
                        num_jobs_finishing_under_threshold_time += 1
            if num_jobs_currently_executed > 0 and num_jobs_finishing_under_threshold_time/num_jobs_currently_executed <= self.ratio_almost_finished_jobs:
                available_servers = self._available_servers(time)
                self.turn_off_servers(available_servers, time)
                    
            
    ############################################
    # Reconfigure a job if possible
    
    def schedule_simple(self, job):
        self.jobs.append(job)

        #schedule the first job into one task
        if len(self.tasks) == 0:
            self._schedule_task(self.servers, job, job.sub_time)
        #Schedule the jobs
        else:
            #Get the servers available at submission time
            available_servers = self._available_servers(job.sub_time)
            if (len(available_servers) < job.min_num_servers):
                self.job_queued.append(job)
                return
            #Schedule task
            else:
                self._schedule_task(available_servers, job, job.sub_time)

    def update_schedule_simple(self, time):
        #get the free servers
        available_servers = self._available_servers(time + 0.01)
        num_available_servers = len(available_servers)

        #print('Number of available servers at update: ', num_available_servers)

        #Check if there is job queued
        if (self.job_queued):
            #Try to schedule each job
            for job in self.job_queued:
                if (num_available_servers > job.min_num_servers):
                    self._schedule_task(available_servers, job, time)
                    #remove job from queue
                    self.job_queued.remove(job)
                    #update list of servers available
                    available_servers = self._available_servers(time + 0.01)
                    num_available_servers = len(available_servers)

        #find tasks that could be reconfigured
        #List of tasks for which the possible change of the number of servers is greater than 0
        tasks_candidates = [
            t for t in self.tasks if
            self._task_possible_inc_num_ser(t, time, num_available_servers) > 0
        ]

        if (len(tasks_candidates) == 0):
            return

        task_to_reconfig = tasks_candidates[0]
        self._reconfigure_task(task_to_reconfig, time)

    def _reconfigure_task(self, task, time):
            available_servers = self._available_servers(time + 0.01)
            num_available_servers = len(available_servers)
            # Return the list of new servers to execute the task
            def reallocate_task_servers(task):
                extra_srv_count = self._task_possible_inc_num_ser(
                    task, time, num_available_servers)
                task_servers = [s for s in task.servers]
                for i in range(extra_srv_count):
                    task_servers.append(available_servers[i])
                return task_servers

            # Reconfigure and update the task
            # Update the task end_time and mass_executed.
            def interrupt_task(task, job):
                task.end_time = time
                exec_time = task.end_time - task.start_time
                task.mass_executed = self._mass_exec(job.alpha,
                                                     len(task.servers),
                                                     exec_time)

        #Create a new task for reconfiguration

            def make_reconfiguration(job, servers):
                reconfig_time = self._reconfig_time(job.data,
                                                    len(task.servers),
                                                    len(servers))
                reconfig_end_time = time + reconfig_time
                reconfig = Reconfiguration(job.id, servers, time,
                                           reconfig_end_time)
                self.tasks.append(reconfig)
                return reconfig

            # Create a task to finish job after reconfig
            def reschedule_interrupted(job, reconfig, servers):
                mass_left = job.mass - self._mass_executed(job, time)
                exec_time = self._exec_time(mass_left, job.alpha, len(servers))
                start_time = reconfig.end_time
                end_time = reconfig.end_time + exec_time
                mass_executed = mass_left
                self.tasks.append(
                    Task(job.id, mass_executed, servers, start_time, end_time))

            task_job = self._task_job(task)
            task_servers = reallocate_task_servers(task)
            interrupt_task(task, task_job)
            reconfig = make_reconfiguration(task_job, task_servers)
            reschedule_interrupted(task_job, reconfig, task_servers)
    
    def turn_off_servers(self, servers, time):
        self.tasks.append(Power_off(servers, time, time+self.shut_down_time))
            
    #################################################################
    def _is_job_reconfigurable(self, job, time):
        #print(job.id)
        if abs(job.sub_time - time) < 0.001:
            return False      
        remaining_time = self._job_termination_time(job) - time
        #task to reconfigure
        task = self._current_task_of_job(job, time)
        if task is None or type(task) is Reconfiguration:
            return False
        #estimate exec time if task is reconfigured
        current_srv_count = len(task.servers)
        extra_srv_count = self._task_possible_inc_num_ser(
                    task, time, self._num_available_servers(time))
        new_srv_count = current_srv_count + extra_srv_count
        # Reconfiguration time
        reconfig_time = self._reconfig_time(job.data,
                                                    current_srv_count,
                                                    new_srv_count)
        
        #Mass remaining to execute until time
        mass_left = task.mass_executed - self._mass_exec(job.alpha,len(task.servers), time - task.start_time)
        
        #execution time on new srv count
        exec_time = self._exec_time(mass_left, job.alpha, new_srv_count)
        if (reconfig_time + exec_time) / remaining_time < self.estimated_improv_threshold :
            return True
        else:
            return False
        
    def _current_task_of_job(self, job, time):
        for t in self.tasks:
            if time > t.start_time and time < t.end_time and t.job_id == job.id:
                return t
    
    def _job_termination_time(self, job):
        term_time = -1
        for t in self.tasks:
            if t.job_id == job.id and t.end_time > term_time:
                term_time = t.end_time
        return term_time
    
    def _job_start_time(self, job):
        start_time = inf
        for t in self.tasks:
            if t.job_id == job.id and t.start_time < start_time:
                start_time = t.start_time
        return start_time
    
    ######## Can be probably tuned as well #################
    def _num_server_alloc(self, job, time):
        num_available_servers = self._num_available_servers(time)
        
        if job.alpha > self.alpha_mid :
            min_servers = ceil(self.alpha_min_server_upper_range * len(self.servers))
            return min(min_servers, job.max_num_servers, num_available_servers)
        elif job.alpha > self.alpha_lower:
            min_servers = ceil(self.alpha_min_server_mid_range * len(self.servers))
            return min(min_servers, job.max_num_servers, num_available_servers)
        else:
            min_servers = ceil(self.alpha_min_server_lower_range * len(self.servers))
            return min(min_servers,job.max_num_servers, num_available_servers)
    
    def _order_queue_by_sub_time(self):
        self.job_queued = sorted(self.job_queued, key=lambda k: k.sub_time) 
    
    
    def _is_ratio_available_servers_above_threshold(self, time):
        if self._ratio_available_servers(time) > self.server_threshold:
            return True
        else:
            return False
    
    def _ratio_available_servers(self, time):
        return len(self._available_servers(time))/len(self.servers)
    
    def _num_available_servers(self,time):
        return len(self._available_servers(time))
    
    # returns a list of servers not utilized at a given time
    def _available_servers(self, time):
        #Start with all servers as potential servers
        candidate_servers = [s for s in self.servers]
        #remove servers that are busy at the given time
        for t in self.tasks:
            if not (t.start_time <= time and t.end_time > time):
                continue
            for s in t.servers:
                if s in candidate_servers:
                    candidate_servers.remove(s)
        return candidate_servers
    
    def _schedule_task_given_num_servers(self, num_servers, servers, job, time):
        servers_selec = sample(servers, k=num_servers)
        exec_time = self._exec_time(job.mass, job.alpha, num_servers)
        self.tasks.append(
            Task(job.id, job.mass, servers_selec, time, time + exec_time))
    
    def _schedule_task(self, servers, job, time):
        num_servers = min(job.max_num_servers, len(servers))
        servers = sample(servers, k=num_servers)
        exec_time = self._exec_time(job.mass, job.alpha, num_servers)
        self.tasks.append(
            Task(job.id, job.mass, servers, time, time + exec_time))

    # Returns the possible increase in the number of servers for a task
    # given that num_servers are not busy
    def _task_possible_inc_num_ser(self, task, time, num_servers):
        #job = list(filter(lambda j: (j.id == task.job_id), self.jobs))[0]
        if (task.end_time <= time):
            return 0
        job = next(j for j in self.jobs if (j.id == task.job_id))
        task_num_servers = len(task.servers)
        if (task_num_servers == job.max_num_servers):
            return 0
        elif (task_num_servers + num_servers > job.max_num_servers):
            return job.max_num_servers - task_num_servers
        else:
            return num_servers

    # Formula for communication time
    def _mass_exec(self, alpha, num_serv, exec_time):
        return exec_time * (num_serv)**alpha

    def _exec_time(self, mass, alpha, num_serv):
        return mass / (num_serv)**alpha

    #Calculates the reconfiguration time
    def _reconfig_time(self, data, init_servers, final_servers):
        if init_servers > final_servers:
            return data / init_servers * (ceil(init_servers / final_servers) -
                                          1)
        return data / final_servers * (ceil(final_servers / init_servers) - 1)

    
    ############################################################
    def _jobs_by_mass_remaining(self, time):
        jobs_by_mass = []
        for j in self.jobs:
            if j.sub_time < time:
                m = self._mass_remaining(j, time)
                if m > 0:
                    jobs_by_mass.append([j,m])
        jobs_by_mass = sorted(jobs_by_mass, key=lambda k: k[1], reverse=True) 
        return [j[0] for j in jobs_by_mass]
    
    def _mass_remaining(self, job, time):
        return job.mass - self._mass_executed_at_time(job, time)
    
    #Finds how much mass has been executing of job at time t
    def _mass_executed_at_time(self, job, time):
        mass_ex = 0
        for t in self.tasks:
            if t.job_id == job.id:
                if t.end_time <= time:
                    mass_ex += t.mass_executed
                else:
                    mass_ex += t.mass_executed - self._mass_exec(job.alpha,len(t.servers), time - t.start_time)
        return mass_ex

    #Finds how much mass has been executing of job
    def _mass_executed(self, job, time):
        mass_ex = 0
        for t in self.tasks:
            if t.job_id == job.id:
                mass_ex += t.mass_executed
        return mass_ex

    # Returns the makespan
    def work_duration(self):
        min_time = min([t.start_time for t in self.tasks])
        max_time = max([t.end_time for t in self.tasks])
        return max_time - min_time

    def job_ids(self):
        return [j.id for j in self.jobs]

    def server_ids(self):  # server_id_list
        return [s.id for s in self.servers]

    @property
    def server_count(self):
        return len(self.servers)

    #Returns the job that a task is executing
    def _task_job(self, task):
        return next(j for j in self.jobs if task.from_job(j))
    
    def stretch_time(self, job):
        #find the termination time from the tasks of that job and subtime, divide the difference by the mass
        term_time = -1
        for t in self.tasks:
            if t.job_id == job.id and t.end_time > term_time:
                term_time = t.end_time
        return (term_time - job.sub_time) / job.mass

    def stretch_times(self):
        #array of all the stretch times
        return [self.stretch_time(j) for j in self.jobs]

    def average_stretch_time(self):
        return mean(self.stretch_times())

    def max_stretch_time(self):
        return max(self.stretch_times())
    
    def average_power(self):
        work_duration = self.work_duration()
        serv_count = len(self.servers)
        total_energy = 0
        area = 0
        energy_calc = Energy()
        for task in self.tasks:
            task_duration = task.end_time - task.start_time
            task_num_servers = len(task.servers)
            if type(task) is Power_off:
                total_energy = total_energy + energy_calc.energy_off(task_duration) * task_num_servers
            else:
                total_energy = total_energy + energy_calc.energy_computing(task_duration) * task_num_servers
            area = area + task_duration * task_num_servers
        #adding idle time
        energy_idle = (work_duration * serv_count - area) * energy_calc.power_idle()
        total_energy = total_energy + energy_idle
        return total_energy
    
    def normalized_average_power(self):
        work_duration = self.work_duration()
        serv_count = len(self.servers)
        energy_calc = Energy()
        idle_power = work_duration * serv_count * energy_calc.power_idle()
        return self.average_power() / idle_power
    
    def num_reconfig_task(self):
        num = 0
        for t in self.tasks:
            if type(t) is Reconfiguration:
                num = num + 1
        return num
    
    def num_power_off(self):
        num = 0
        for t in self.tasks:
            if type(t) is Power_off:
                num = num + 1
        return num
    
    def cost_function(self):
        return self.stretch_time_weight * self.average_stretch_time() + self.energy_weight * self.normalized_average_power()
    
    def summary(self):
        print("Number of servers: {}".format(len(self.servers)))
        print("Number of jobs scheduled: {}".format(len(self.jobs)))
        print("Number of reconfigurations: {}".format(self.num_reconfig_task()))
        print("Number of power offs: {}".format(self.num_power_off()))
        print("Total work time: {}".format(self.work_duration()))
        print("Cost function value: {}".format(self.cost_function()))
        
    def stats(self):
        #num reconfig, num power off, min stretch, max stretch, mean stretch, std stretch, av power, cost function
        stretch_times = np.array(self.stretch_times())
        stats = np.array([self.num_reconfig_task(), self.num_power_off(), np.min(stretch_times), np.max(stretch_times), np.mean(stretch_times), np.std(stretch_times), self.normalized_average_power(), self.cost_function()])
        return stats

Initialize the scheduler

In [94]:
class Experiments():
    def __init__(self, num_expts, seeds=2):
        self.seeds = seeds
        self.num_expts = num_expts  
        self.cols = ["num reconfig", "num power off", "min stretch", "max stretch", "mean stretch", "std stretch", "av power", "cost function"]
        self.stats = pd.DataFrame(columns=self.cols)
        self.tuning_params = []
        self.scheduler = self.generate_scheduler()
    
    def generate_job(self, num):  
        seed(self.seeds)
        sub_time = uniform(0,3000)
        alpha = uniform(0.5,1)
        data = uniform(10,1000)
        mass = uniform(10, 50000)
        min_num_servers = randrange(1,9)
        max_num_servers = randrange(min_num_servers,10)
        return Job('job' + str(num), sub_time, alpha, data, mass, min_num_servers, max_num_servers)

    def generate_jobs(self):
        jobs = [self.generate_job(i) for i in range(30)]
        jobs = sorted(jobs, key=lambda k: k.sub_time) 
        return jobs
    
    def generate_servers(self):
        servers = [Server('server'+str(i)) for i in range(10)]
        return servers
    
    def generate_scheduler(self):
        scheduler = Scheduler(self.generate_servers())
        return scheduler
    
    '''server_threshold,ratio_almost_finished_jobs, time_remaining_for_power_off, shut_down_time,
                         estimated_improv_threshold, alpha_min_server_lower_range, alpha_min_server_mid_range, 
                         alpha_min_server_upper_range, alpha_lower, alpha_mid'''
    def set_tuning_params(self,tuning_params):
        self.tuning_params = tuning_params
        self.scheduler.set_tuning_params(tuning_params[0], tuning_params[1], tuning_params[2], tuning_params[3],
                                        tuning_params[4], tuning_params[5], tuning_params[6], tuning_params[7], 
                                         tuning_params[8], tuning_params[9])
        
    
    def run_expt(self):
        jobs = self.generate_jobs()
        
        for t in range(18000):
            time = t * 10
            for job in jobs:
                if job.sub_time<=time:
                    self.scheduler.schedule1(job)
                    jobs.remove(job)
                else:
                    break
            self.scheduler.update_schedule1(time)
        expt_stats = pd.DataFrame([self.scheduler.stats()], columns=self.cols)
        self.stats = pd.concat([self.stats, expt_stats])
    
    def run_expts(self):
        for i in range (self.num_expts):
            self.run_expt()
    
    def get_stats(self):
        return self.stats
    
    def get_mean_cost(self):
        return self.stats['cost function'].mean()
    
    def get_tuning_params(self):
        return self.tuning_params

In [95]:
def generate_random_tuning_params(s):
    seed(s)
    server_threshold = uniform(0.5,0.91)
    ratio_almost_finished_jobs = uniform(0.5,0.91)
    time_remaining_for_power_off = uniform(0.5,0.91)
    shut_down_time = uniform(370,600)
    estimated_improv_threshold = uniform(shut_down_time,shut_down_time*2)
    alpha_min_server_lower_range = uniform(0.01,0.4) 
    alpha_min_server_mid_range = uniform(alpha_min_server_lower_range,alpha_min_server_lower_range*2) 
    alpha_min_server_upper_range =uniform(alpha_min_server_mid_range,1) 
    alpha_lower = uniform(0.5,0.7)
    alpha_mid = uniform(alpha_lower,0.9)
    return [server_threshold,ratio_almost_finished_jobs, time_remaining_for_power_off, shut_down_time,
                         estimated_improv_threshold, alpha_min_server_lower_range, alpha_min_server_mid_range, 
                         alpha_min_server_upper_range, alpha_lower, alpha_mid]

In [80]:
#Running an experiment
experiments = Experiments(2)
experiments.set_tuning_params(generate_random_tuning_params())
experiments.run_expts()
print(experiments.get_stats())
print(experiments.get_mean_cost())
print(experiments.get_tuning_params())

10.334170557573383
[0.6472683462787974, 0.7621073231118541, 0.6309313042244274, 407.1070632175762, 549.5257451286685, 0.056621632558840486, 0.0893316139796527, 0.41481627698860823, 0.613457604656789, 0.763975951395836]


In [100]:
#Find parameters through random search
params_cost = []
for i in range (30):
    experiments = Experiments(10)
    experiments.set_tuning_params(generate_random_tuning_params(i))
    experiments.run_expts()
    params_cost.append([experiments.get_tuning_params(), experiments.get_mean_cost()])

In [101]:
params_cost

[[[0.8462129591252698,
   0.810761305205524,
   0.6724343481406465,
   429.5508525673816,
   649.1693450274179,
   0.1679243136056616,
   0.29954315367441187,
   0.5120006292340744,
   0.5953193908304711,
   0.7730645859901921],
  22.612643098422033],
 [[0.5550893400860845,
   0.8474478321442654,
   0.8131475937804118,
   428.665875920067,
   641.0419914898685,
   0.18530151526760788,
   0.3060426804508609,
   0.853383023070676,
   0.518771917354847,
   0.5295787714771599],
  21.447701634744583],
 [[0.8919740514745923,
   0.8886092696943333,
   0.5231860607679916,
   389.52055888655195,
   714.9645488446223,
   0.2970282957367241,
   0.495957175479585,
   0.6512711459005323,
   0.6211888331356925,
   0.7903719325473793],
  28.166105197229523],
 [[0.5975654971076755,
   0.7231339823713403,
   0.6516816182847125,
   508.9016088771247,
   827.3316783447972,
   0.03555625510352711,
   0.036024459570453304,
   0.8433241705774247,
   0.5518708028656015,
   0.63344825219852],
  21.85493430972

In [104]:
params_cost = sorted(params_cost, key=lambda k: k[1]) 

In [105]:
print("Best parameters are : {}".format(params_cost[0]))

Best parameters are : [[0.6898330167041589, 0.6530578918719673, 0.5568011591309266, 569.3092254968585, 572.9727611518132, 0.20608501122036127, 0.39121075845362097, 0.4404098462179984, 0.6108540936356572, 0.7891559291370227], 18.07112499611286]


In [106]:
print([p[1] for p in params_cost])

[18.07112499611286, 18.094107787561235, 18.121809094965393, 20.45789270319069, 20.45881532291546, 21.23407965833021, 21.37693605775913, 21.398914522551088, 21.447701634744583, 21.57012638706725, 21.845196067889486, 21.854934309722008, 21.863067573217272, 21.87231694802399, 21.937764927618353, 22.612643098422033, 22.75652274186627, 23.137910921383945, 23.43136640826962, 23.68064499023452, 23.736029406848765, 25.898646678665433, 26.05236312055043, 26.054557189265363, 26.37643288959469, 27.667987098816116, 28.166105197229523, 28.38938180121779, 30.15259478664556, 30.333004788753136]
