In [4]:
import pandas as pd
import numpy as np
import math


In [6]:
df = pd.read_csv('google-cluster-data-1.csv',sep=' ')
df

Unnamed: 0,Time,ParentID,TaskID,JobType,NrmlTaskCores,NrmlTaskMem,Unnamed: 6
0,90000,757745334,1488529826,0,0.000000,0.031130,
1,90000,975992247,1488529821,0,0.000000,0.000000,
2,90000,1468458091,1488529832,1,0.021875,0.002353,
3,90000,1460281235,1488529840,0,0.000000,0.000000,
4,90000,1164728954,1488529835,0,0.003125,0.001638,
...,...,...,...,...,...,...,...
3535024,112500,1487094655,1487103476,0,0.000000,0.000879,
3535025,112500,1461321601,1465612301,0,0.000000,0.000879,
3535026,112500,1487094655,1487097223,0,0.000000,0.000879,
3535027,112500,618817162,1485932004,1,0.000000,0.000879,


## Append Execution Time and Turnaround Time information to dataframe
* Add total execution time list to dataframe under key "ExecutionTime". A task queue is created for each time quantum for which tasks are allocated. Tasks are allocated to a given task queue based on their arrival time and remaining execution time. Therefore, total execution time is used to determine which task queues each task should be placed in.
* Add execution time remaining list to datafram under key "ExecutionTimeRemaining". Execution time remaining is used by the SJF algorithm to determine which tasks should be allocated to the VMs first. 
* Add columns to store the turnaround times for each version of the greedy algorithm (power, cost, and just the turnaround time).

In [7]:
exec_time_arr = []
subtasks = 0
quantum = 300
rejectQueue = []
for task in df.itertuples():
    taskID = task.TaskID
    burstTime = ((taskID%2)+1)*quantum      # only two options for burst time
    exec_time_arr.append(burstTime)
    subtasks += burstTime / quantum

time_frame = pd.DataFrame({
    'ExecutionTime': exec_time_arr,
    'TurnaroundTime_Power': [-1]*len(df),
    'TurnaroundTime_Cost': [-1]*len(df),
    'TurnaroundTime': [-1]*len(df)
})
df = df.join(time_frame)
print("Subtasks:", subtasks)

Subtasks: 5298274.0


## Create 100 VMs
Each of them has 7 CPU units and 11 MEM units. You could choose to use a 100*2 matrix to represent this data structure. First column is CPU and second column is MEM.

In [8]:
# Create power and cost dictionaries
Pwr_t = {}
Cost_t = {}

Price_t = [0.5, 0.5, 0.6, 0.6, 0.6, 0.7, 0.7, 0.6, 0.6, 0.8, 0.8, 0.8, 0.8]
totalCost = 0
totalPwr = 0

for time in range(90000,113100,300): 
    Pwr_t[time] = []
    Cost_t[time] = []

In [9]:
def powerCalculator(vms, task_idx, vm_idx):
  c = 5
  a = 100
  b = 200
  dy_pwr_threshold = 0.5
  
  pwr_dy = 0
  pwr_st = 0

  pwr_curr = 0
  pwr_next = 0
  
  cpu = df['NrmlTaskCores'][task_idx]

  # current vm calculation
  cpu_cap = 11

  ccr = vms[0][vm_idx]  # current core remaining 
  usage_rate_curr = 1 - (ccr/cpu_cap)

  if(usage_rate_curr > 0):
    pwr_st = c
  if(usage_rate_curr < dy_pwr_threshold):
    pwr_dy = a*dy_pwr_threshold
  else:
    pwr_dy = (a * dy_pwr_threshold) + b*(usage_rate_curr - dy_pwr_threshold)*(usage_rate_curr - dy_pwr_threshold)
  pwr_curr = pwr_st + pwr_dy

  # next vm calculation

  ncr = vms[0][vm_idx] - cpu    # next core remaining
  usage_rate_next = 1 - (ncr/cpu_cap)
  pwr_st = 0    # reset pwr_st because will reuse this var
  if(usage_rate_next > 0):
    pwr_st = c
  if(usage_rate_next < dy_pwr_threshold):
    pwr_dy = a*dy_pwr_threshold
  else:
    pwr_dy = (a * dy_pwr_threshold) + b*(usage_rate_next - dy_pwr_threshold)*(usage_rate_next - dy_pwr_threshold)
  pwr_next = pwr_st + pwr_dy


  return pwr_next - pwr_curr

In [10]:
def powerConsumption(myVmSpace):
    c = 5
    a = 100
    b = 200
    threshold = 0.5
    
    Pwr_dy = 0
    Pwr_st = 0
    Pwr = 0
    
    for core in myVmSpace[0]:
        coreUsage = 1-core/7
        if(coreUsage > 0):
            Pwr_st = c
        if(coreUsage < threshold):
            Pwr_dy = a*coreUsage
        else:
            Pwr_dy = a*threshold + b*(coreUsage-threshold)*(coreUsage-threshold)
        Pwr = Pwr + Pwr_dy + Pwr_st
    return Pwr

# Greedy Algorithm 2 ways
* Optimize energy consumption
* Optimize cost

In [15]:
def greedy(task_queue, vms, optIdentifier, currTime):
    reject_queue = []
    time_quantum = 
    if (optIdentifier == "power"):
        for index,task in df.iterrows():
            df["TurnaroundTime_Power"] = -1
    elif (optIdentifier == "cost"):
        for index,task in df.iterrows():
            df["TurnaroundTime_Cost"] = -1

    for task_idx in task_queue: 
        minOptDelta = 100000000000
        min_vm_idx = 0
        vm_space_found = False
        
        for vm_idx in range(len(vms[0])):
            if(df['NrmlTaskCores'][task_idx] <= vms[0][vm_idx]) and (df['NrmlTaskMem'][task_idx] <= vms[1][vm_idx]):
                #print("VM {} can fit task {} at time {}".format(vm_idx,task_idx,currTime))
                vm_space_found = True
                if(optIdentifier == "power"):
                    optDelta = powerCalculator(vms,task_idx,vm_idx)
                    if(optDelta < minOptDelta):
                        minOptDelta = optDelta
                        min_vm_idx = vm_idx
                    # Make sure you first run the code that initializes the execution time / turnaround
                    # dataframe before running the greedy algorithm! Do this every time
                    startTime = df['Time'][task_idx]
                    exTime = df['ExecutionTime'][task_idx]
                    if(currTime == startTime + exTime - time_quantum):
                        # Turnaround time is the curr time that the task finishes minus the arrival time
                        finishTime = currTime + time_quantum
                        df['TurnaroundTime_Power'][task_idx] = finishTime - startTime
                elif(optIdentifier == "cost"):
                    powerDelta = powerCalculator(vms,task_idx,vm_idx)
                    prcIdx = int((currTime - 90000)/3600)
                    optDelta = Price_t[prcIdx]*powerDelta
                    if(optDelta < minOptDelta):
                        minOptDelta = optDelta
                        min_vm_idx = vm_idx
                    # Make sure you first run the code that initializes the execution time / turnaround
                    # dataframe before running the greedy algorithm! Do this every time
                    startTime = df['Time'][task_idx]
                    exTime = df['ExecutionTime'][task_idx]
                    if(currTime == startTime + exTime - time_quantum):
                        # Turnaround time is the curr time that the task finishes minus the arrival time
                        finishTime = currTime + time_quantum
                        df['TurnaroundTime_Power'][task_idx] = finishTime - startTime
                        
        if(vm_space_found):
            vms[0][min_vm_idx] = vms[0][min_vm_idx] - df['NrmlTaskCores'][task_idx]
            vms[1][min_vm_idx] = vms[1][min_vm_idx] - df['NrmlTaskMem'][task_idx]
        else:
            reject_queue.append(df['TaskID'][task_idx])
    return reject_queue, vms


# Shortest Remaining First Algorithm
The Shortest Remaining First algorithm is optimal for waiting time and thus turnaround time (assuming task execution time is time-invariant). The challenge in most cases is not knowing the task execution time before execution. However, in this example, we know each task's execution time in advance. 
# There are several steps involved in implimenting SRF. 
1. Create a dictionary containing all vms for all times for which the algorithm will be run. Creating all vms at once allows us to allocate a process to VMs for all of its execution time, not just the current time quantum. 
2. Sort tasks arriving at the current time in a list in ascending order. This way, if we iterate through the sorted tasks, priority will be given to shorter tasks. 
3. Iterate through the sorted tasks implimenting a round robin-like algorithm. If a task can fit in the first VM, then allocate it. Otherwise, go through the next VMs until you find one it can fit in. If there are no VMs that the task can fit in, then append it to the reject queue. Always start the next task at the VM after the last one that was filled. 
4. return the reject queue and vm dictionary. 


In [16]:
def sjf(time_params,time_quantum,vm_size):
    vms = {} #vms for all possible times
    rejectQueue = []
    # Reinitialize Turnaround times
    for index,task in df.iterrows():
        df["TurnaroundTime"] = -1
    # Worst case: vms will be used at the max arrival time + time quantum * 2 (b/c burst time max = 600 and time quantum = 300)
    for time in range(time_params[0],113100 + time_quantum * 2,time_quantum):
        vms[time] = [100*[vm_size[0]],100*[vm_size[1]]]
    for time in range(time_params[0],time_params[1],time_quantum):
        # print(time)
        df_t = df[df['Time'].isin([time])] #get the elements from a certain time
        print("Length of df for time " + str(time) + " is " + str(len(df_t)))
        df_t.sort_values(by=["ExecutionTime"],inplace=True) #sort the elements in ascending order of execution time
        vm_iter = 0
        vm_jter = 0
        ctr = 0
        for index,task in df_t.iterrows():
            ctr += 1
            print("Entered inner for loop, index = " + str(index) + " counter = " + str(ctr) + " time = " + str(time))
            if(task['NrmlTaskCores'] <= vms[time][0][vm_iter]) and (task['NrmlTaskMem'] <= vms[time][1][vm_iter]):
                vms,finTime = sjf_alloc(task,time,vms,vm_iter,time_quantum)
                idx = list(df["TaskID"]).index(task["TaskID"])
                df["TurnaroundTime"][idx] = finTime - df["Time"][idx]
            else: 
                alloc = False
                vm_jter = vm_iter+1
                if(vm_jter == 100):
                    vm_jter = 0
                while(not alloc and vm_jter != vm_iter):
                    if(task['NrmlTaskCores'] <= vms[time][0][vm_jter]) and (task['NrmlTaskMem'] <= vms[time][1][vm_jter]):
                        vms,finTime = sjf_alloc(task,time,vms,vm_jter,time_quantum)
                        idx = list(df["TaskID"]).index(task["TaskID"])
                        df["TurnaroundTime"][idx] = finTime - df["Time"][idx]
                        alloc = True
                    vm_jter = vm_jter+1
                    if(vm_jter == 100):
                        vm_jter = 0
                if alloc == False:
                    rejectQueue.append(task['TaskID'])
            vm_iter = vm_iter + 1
            if (vm_iter == 100):
                vm_iter = 0
    print("Exiting!\n")
    return vms,rejectQueue


In [17]:
def sjf_alloc(task,time,vms,vm_idx,time_quantum):
    exTime = int(task['ExecutionTime'])
    finTime = None
    for t in range(0,exTime,time_quantum):
        vms[time + t][0][vm_idx] = vms[time + t][0][vm_idx] - task['NrmlTaskCores']
        vms[time + t][1][vm_idx] = vms[time + t][1][vm_idx] - task['NrmlTaskMem']
        finTime = time + t + time_quantum
    return vms,finTime

In [18]:
def generateTaskQueues():
    
    taskQueues = {}
    for time in range(90000,113100,300):
        taskQueues[time] = []
    time = 90000
    for taskIdx in range(len(df['Time'])):
        if(df['Time'][taskIdx] == time):
            exTime = df['ExecutionTime'][taskIdx]
            timeCnt = 0
            while(exTime > 0):
                taskQueues[time+timeCnt].append(taskIdx)
                timeCnt = timeCnt + 300
                exTime = exTime - 300
        else:
            time = time + 300
    return taskQueues

In [11]:
taskQueues = generateTaskQueues()

rejectQueue = []
#power optimization
for time in range(90000,90600,300): #113100
    taskQueue_t = taskQueues[time]
    vms = [100*[7],100*[11]]
    #reinitialize VMs for new time
    rejectQueue_t, vms_t = greedy(taskQueue_t,vms,"power", time)
    if(rejectQueue_t != []):
        rejectQueue.extend(rejectQueue_t)
    Pwr_t[time] = powerConsumption(vms_t)
    totalPwr = totalPwr + Pwr_t[time]
    prcIdx = int((time - 90000)/3600)
    Cost_t[time] = Price_t[prcIdx]*Pwr_t[time]
    totalCost = totalCost + Cost_t[time]

print("Power optimization:")
print("\tTotal power:", totalPwr)
print("\tTotal cost: ", totalCost)

print("\t" + str(len(rejectQueue)) + " rejections")
if (len(rejectQueue) > 0):
    rejectFileName = "taskReject_1_power.npy"
    np.save(rejectFileName, rejectQueue)
vmFileName = "VMs_1_power.npy"
np.save(vmFileName, vms)

#cost optimization
totalPwr = 0
totalCost = 0
rejectQueue = []
for time in range(90000,90600,300):
    taskQueue_t = taskQueues[time]
    vms = [100*[7],100*[11]]
    #reinitialize VMs for new time
    rejectQueue_t, vms_t = greedy(taskQueue_t,vms,"cost",time)
    if(rejectQueue_t != []):
        rejectQueue.extend(rejectQueue_t)
    Pwr_t[time] = powerConsumption(vms_t)
    totalPwr = totalPwr + Pwr_t[time]
    prcIdx = int((time - 90000)/3600)
    Cost_t[time] = Price_t[prcIdx]*Pwr_t[time]
    totalCost = totalCost + Cost_t[time]
    
print("Cost optimization:")
print("\tTotal power:", totalPwr)
print("\tTotal cost: ", totalCost)

print("\t" + str(len(rejectQueue)) + " rejections")
if (len(rejectQueue) > 0):
    rejectFileName = "taskReject_1_cost.npy"
    np.save(rejectFileName, rejectQueue)
vmFileName = "VMs_1_cost.npy"
np.save(vmFileName, vms)

totalPwr = 0
totalCost = 0
reject_queue,vms = sjf([90000,90600],300,[7,11])
for time in range(90000,90600,300):
    vms_t = vms[time]
    Pwr_t[time] = powerConsumption(vms_t)
    totalPwr = totalPwr + Pwr_t[time]
    prcIdx = int((time - 90000)/3600)
    Cost_t[time] = Price_t[prcIdx]*Pwr_t[time]
    totalCost = totalCost + Cost_t[time]

print("Turnaround Time optimization:")
print("\tTotal power:", totalPwr)
print("\tTotal cost: ", totalCost)

print("\t" + str(len(rejectQueue)) + " rejections")
if (len(rejectQueue) > 0):
    rejectFileName = "taskReject_1_turnaround.npy"
    np.save(rejectFileName, rejectQueue)
vmFileName = "VMs_1_turnaround.npy"
np.save(vmFileName, vms)

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['TurnaroundTime_Power'][task_idx] = finishTime - startTime


Power optimization:
	Total power: 5928.969706632647
	Total cost:  2964.4848533163236
	0 rejections


A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['TurnaroundTime_Power'][task_idx] = finishTime - startTime


Cost optimization:
	Total power: 5928.969706632647
	Total cost:  2964.4848533163236
	0 rejections


A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  return func(*args, **kwargs)


AttributeError: 'NoneType' object has no attribute 'iteritems'

In [20]:
totalPwr = 0
totalCost = 0
vms,reject_queue = sjf([90000,90300],300,[7,11])
for time in range(90000,90300,300):
    vms_t = vms[time]
    Pwr_t[time] = powerConsumption(vms_t)
    totalPwr = totalPwr + Pwr_t[time]
    prcIdx = int((time - 90000)/3600)
    Cost_t[time] = Price_t[prcIdx]*Pwr_t[time]
    totalCost = totalCost + Cost_t[time]

print("Turnaround Time optimization:")
print("\tTotal power:", totalPwr)
print("\tTotal cost: ", totalCost)

turnaroundtime_sum = 0
for index,task in df.iterrows():
    turnaroundtime_sum += task.TurnaroundTime

avg_turnaroundtime = float(turnaroundtime_sum) / float(len(df))
print("\tAverage turnaround time:", avg_turnaroundtime)

print("\t" + str(len(rejectQueue)) + " rejections")
if (len(rejectQueue) > 0):
    rejectFileName = "taskReject_1_turnaround.npy"
    np.save(rejectFileName, rejectQueue)
vmFileName = "VMs_1_turnaround.npy"
np.save(vmFileName, vms)

300.0
600.0
300.0
300.0
600.0
300.0
600.0
300.0
600.0
600.0
600.0
300.0
300.0
300.0
300.0
600.0
300.0
600.0
300.0
600.0
300.0
300.0
300.0
600.0
300.0
300.0
600.0
600.0
300.0
600.0
300.0
300.0
300.0
600.0
300.0
300.0
600.0
300.0
600.0
300.0
300.0
600.0
600.0
300.0
300.0
300.0
300.0
300.0
300.0
600.0
300.0
300.0
600.0
300.0
300.0
300.0
300.0
600.0
600.0
300.0
600.0
300.0
600.0
300.0
600.0
600.0
600.0
600.0
300.0
300.0
600.0
600.0
300.0
300.0
600.0
600.0
300.0
600.0
300.0
600.0
300.0
300.0
300.0
600.0
300.0
600.0
600.0
600.0
600.0
300.0
600.0
600.0
300.0
300.0
300.0
600.0
600.0
600.0
300.0
600.0
300.0
300.0
300.0
600.0
300.0
600.0
300.0
600.0
600.0
600.0
600.0
600.0
600.0
600.0
600.0
300.0
300.0
300.0
600.0
300.0
600.0
600.0
300.0
600.0
300.0
600.0
600.0
600.0
600.0
300.0
600.0
600.0
300.0
300.0
300.0
600.0
300.0
600.0
600.0
300.0
300.0
600.0
300.0
300.0
300.0
600.0
600.0
600.0
300.0
600.0
300.0
300.0
300.0
600.0
600.0
600.0
600.0
300.0
600.0
300.0
300.0
300.0
300.0
600.0
300.0
600.0
300.

KeyboardInterrupt: 