In [5]:
import random
import scipy.io as sio
import scipy
import operator
import itertools
import collections
from tabulate import tabulate
import numpy as np

N         = 10  # Number of jobs in the queue
P         = 2*N # Number of processors,special case for infinite proc
kk        = 2   # Pairs of applications
qos       = 0.9 # QoS threshold

# loading data
def loaddata(k):
    mat_contents = scipy.io.loadmat('../tmp/dat-multi-'+str(k)+'.mat')
    oct_struct   = mat_contents['dat']
    X ,Y         = oct_struct['xx'], oct_struct['yy']
    X, Y         = X[0,0], Y[0,0]
    return X, Y
    

with open('../data/All_app.tsv') as f:
    app_name = f.read().split('\n')

Y_est     = np.loadtxt('../tmp/Yest'+'.txt')
X,Y       = loaddata(2)
n         = len(app_name)

In [14]:
job_slice = collections.namedtuple('job_slice',\
            ['job_id','name','app_id','type','work','qos','deadline'])

schedule = collections.namedtuple('schedule',['interval','jobs',\
                                              'speeds','est_speeds'])  

def nchoosek_rep(array, k):
    return list(itertools.combinations_with_replacement(array, k))

# generate a random job queue
class job_queue:
    def __init__(self, job_queue = []):
        self._job_queue = job_queue
        
    def __len__(self): 
        return len(self._job_queue)
    
    def __getitem__(self, position): 
        # TODO make it safer 
        if position < 9999:
            return self._job_queue[position]
        else:
            val = None
            for job in self._job_queue:
                if job.job_id == position:
                    val = job
            return val
    
    def __repr__(self):
        return tabulate(list([job for job in self._job_queue] ), headers = job_slice._fields)
    
    def sort(self,  *keyy ,reverse = False): 
        self._job_queue.sort(key=operator.attrgetter(*keyy), reverse = reverse )
    
    def append(self, item): 
        self._job_queue.append(item)
    
#     def remove(self, position): 
#         if position < 9999:
#             del self._job_queue[position]
#         else:
#             for i,job in enumerate(self._job_queue):
#                 if job.job_id == position:
#                     del self._job_queue[i]
    def remove(self, item): 
        #del self._job_queue[item]
        #print('deleting',item)
        for i,job in enumerate(self._job_queue):
            if item is job:
                del self._job_queue[i]
    def random(self, N, qos, app_name):
        """
        :type N:          int: Number of jobs
        :type qos:        float: qos constraint
        :type app_name:   list[str]: application names
        :rtype: job_queue:list[job_slice]: Set of jobs
        """    
        # Harcoding qos job names
        app_name_qos   = ['x264','vips','swish','swaptions','streamcluster',\
                          'STREAM','fluidanimate','blackscholes']
        num_qos   = 0
        job_queue = []
        for i in range(N):
            j_work     = random.choice([100,500,1000])
            if num_qos < P/4:
                j_type     = random.choice(['qos', 'batch'])
            else:
                j_type     = 'batch'

            if j_type      == 'qos':
                  
                j_qos      = qos
                j_deadline = (1+(1-qos))*j_work
                #j_deadline = None
                num_qos    += 1
                indices    = [i for i,item in enumerate(app_name) \
                              if item in app_name_qos]
                j_iden     = random.choice(indices)
                j_name     = app_name[j_iden-1]
            elif j_type    == 'batch':
                j_deadline = random.choice(range(j_work,3000,100))
                j_qos      = None
                indices    = [i for i,item in enumerate(app_name) \
                              if item not in app_name_qos] 
                j_iden     = random.choice(indices)
                j_name     = app_name[j_iden-1]
            job = job_slice(i+10000, j_name, j_iden, j_type,  j_work, j_qos, j_deadline)
            job_queue.append(job);
        #job_queue.sort(key=operator.attrgetter('job'));
        self._job_queue = job_queue
         
# scheduling jobs
class machine:
    def __init__(self, machine_id=0):
        self.utilization   = 0
        self.res           = 0
        self._performance  = []
        
        self._schedules    = [] # list of schedules
        self.machine_id    = machine_id
        ## TODO: variable leak
        self._job_queue    = job_queue([]) # jobs on the machine         
    def fit(self, Y, app_name):
        n         = len(app_name)
        # Harcoding qos job names
        app_name_qos   = ['x264','vips','swish','swaptions','streamcluster',\
                          'STREAM','fluidanimate','blackscholes']
        indices = [i for i,item in enumerate(app_name) \
                              if item in app_name_qos]

        b  = nchoosek_rep(range(n), 2)
        for idx,ii in enumerate(b):
            for i,j in zip(range(len(ii)),ii):
                if j in indices and Y[idx,i] < 0.9:
                    Y[idx,:] = 0
        self._performance = Y
        
    def update(self, job):
        type_id  = [job.type   for job in self._job_queue]
        
        if job.type == 'qos' and 'qos' in type_id:
            return False
        
        self._job_queue.append(job)
        app_id   = list([job.app_id   for job in self._job_queue])
        W        = list([job.work     for job in self._job_queue])
        deadline = list([job.deadline for job in self._job_queue])
        
        A = toMat(app_id, self._performance, n)
        
        while max(W)>0.0001:
            res = scipy.optimize.linprog([1]*A.shape[1], A_eq=A, b_eq=W)
            if 'sched' in locals():
                sched += res.x
            else:    
                sched = res.x
            W = residue_work(app_id, W, n, res.x)
        
        #TODO: add conditions related to deadline for removing a job
#         if random.choice([0,1])==0:
#             self._job_queue.remove(job)
#             return False
#         else:
#             self.utilization = sum(sched)
#             self._schedules  = (A, sched, app_id)
#             return True
            
            self.utilization = sum(sched)
            self._schedules  = (A, sched, app_id)
            return True
    
# when machines actually run the jobs and observe true speeds
# TODO: convert it to class
def toMat(app_id, Y, n):
    X,Y       = loaddata(2)
    a  = nchoosek_rep(app_id, 2)
    aa = nchoosek_rep(range(len(app_id)), 2)
    b  = nchoosek_rep(range(n), 2)
    A  = np.zeros(shape=(len(app_id),len(a)))
    for i in range(len(a)):
        tmp = b.index(tuple(sorted(a[i])))
        sort_index = np.argsort(a[i])
        for j in sort_index:
            A[aa[i][j],i] = Y[tmp, j]

    A = np.concatenate([np.eye(len(app_id)), A.T])
    A = A.T
    return A
def residue_work(app_id, W, n, curr_x):
    X,Y       = loaddata(2)
    A = toMat(app_id, Y, n)
    work_done = np.matmul(A, curr_x)
    return abs(W - work_done)
    
class servers:
    def __init__(self):
        self.machines = []    
        self.utilization = 0
        
    def __getitem__(self, position): 
        return self.machines[position]
    
    def __len__(self): 
        return len(self.machines)
    
    def sort(self,  *keyy ,reverse = False): 
        self.machines.sort(key=operator.attrgetter(*keyy), reverse = reverse )

    def create_machine(self):
        mac  = machine(20000+len(self.machines))
        self.machines.append(mac)
        return mac
    
    def run(self, jobs, Y):
        for job in jobs:
            val = False
            self.sort('utilization') 
            machines_checked = 0
            while val==False:
                if machines_checked < len(self.machines):
                    mac = self.machines[machines_checked]
                else:
                    mac = self.create_machine()
                    mac.fit(Y, app_name)
                machines_checked+=1
                self.utilization = max(mac.utilization,self.utilization)
                val = mac.update(job)
    
    def pprint(self):
        for mac in self.machines:
            print('\n** MACHINE ID: ', mac.machine_id,' ** UTIL', mac.utilization )
            print(mac._job_queue )

In [16]:
jobs = job_queue()
jobs.random(N, qos, app_name)
jobs.sort('job_id')
#print(jobs)

serv_est = servers()
serv_est.run(jobs,  Y_est)

serv = servers()
serv.run(jobs,  Y)

Y_rnd = np.random.rand(Y.shape[0], Y.shape[1])
serv_rnd = servers()
serv_rnd.run(jobs, Y_rnd)
print('Number of machines, TRUE:',len(serv),'EST:', \
      len(serv_est),'RND:', len(serv_rnd))
print('Utilization .       TRUE:',serv.utilization,'EST:', \
      serv_est.utilization,'RND:', serv_rnd.utilization)

serv.pprint()

Number of machines, TRUE: 50 EST: 50 RND: 50
Utilization .       TRUE: 1504.44478589 EST: 1504.44478589 RND: 1504.44478589

** MACHINE ID:  20039  ** UTIL 1381.72957975
  job_id  name            app_id  type      work    qos    deadline
--------  ------------  --------  ------  ------  -----  ----------
   10078  vips                27  qos        100    0.9         110
   10079  lud                 17  batch      100               1900
   10080  fluidanimate         9  batch     1000               1600
   10099  blackscholes         4  batch     1000               2700

** MACHINE ID:  20033  ** UTIL 500.0
  job_id  name      app_id  type      work    qos    deadline
--------  ------  --------  ------  ------  -----  ----------
   10064  bfs            3  qos        500    0.9         550

** MACHINE ID:  20035  ** UTIL 500.0
  job_id  name        app_id  type      work    qos    deadline
--------  --------  --------  ------  ------  -----  ----------
   10068  dijkstra         8  qos

In [None]:
list(zip( nchoosek_rep(range(n), 2), mac._performance))

In [None]:
job_id = list([job.job_id for job in mac._job_queue])
aa = nchoosek_rep(job_id, 1) + nchoosek_rep(job_id, 2)

sched_prelim_pair = [pair for i,pair in enumerate(aa) if res.x[i] > 0.001]
sched_prelim_time = [res.x[i] for i,pair in enumerate(aa) if res.x[i] > 0.001]

print(sched_prelim_pair)
print(sched_prelim_time)