In [1]:
import json
import datetime
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')
import time

### Creating DataFrames

In [137]:
def cluster_df(data):
    '''
    function to create dataframe for clusters data

    Each entry of ``data["clusters"]`` becomes one row. Top-level cluster fields and
    the fields of the nested ``workflowInfo`` dict are flattened into columns; when a
    key exists at both levels the top-level value wins. Clusters whose
    ``workflowInfo`` is None get NaN in the workflow columns.

    The column set is taken from the first cluster (top-level keys) and from the
    first cluster that has a non-None ``workflowInfo`` (nested keys).

    input:
        data (dict): parsed JSON containing a ``"clusters"`` mapping
    output:
        clusters_pd (pandas dataframe): one row per cluster, in input order, without
            the 'authenticated', 'clusterUsageTypes' and 'runType' columns
    raises:
        ValueError: if ``data["clusters"]`` is empty
        KeyError: if a cluster lacks a top-level key of the first cluster, or one of
            the dropped columns is missing
    '''
    nested_cols = ['workflowInfo']
    clusters = data["clusters"]
    if not clusters:
        raise ValueError('data["clusters"] is empty')

    col_names1 = list(next(iter(clusters.values())).keys())
    # the first cluster may have workflowInfo=None, so take the nested schema from the
    # first cluster that actually has one
    first_workflow = next(
        (c.get('workflowInfo') for c in clusters.values() if c.get('workflowInfo') is not None),
        {},
    )
    col_names2 = list(first_workflow.keys())

    # dict.fromkeys de-duplicates while keeping first-seen order; a set() gives a
    # column order that can change from one run to the next
    col_names = [c for c in dict.fromkeys(col_names1 + col_names2) if c not in nested_cols]

    # collect plain dicts and build the frame once (row-by-row .loc assignment is quadratic)
    records = []
    for cluster in clusters.values():
        row = {col: cluster[col] for col in col_names1 if col not in nested_cols}
        workflow = cluster.get('workflowInfo')
        if workflow is not None:
            for col in col_names2:
                # top-level fields take precedence over same-named workflow fields
                if col not in nested_cols and col not in row:
                    row[col] = workflow[col]
        records.append(row)

    clusters_pd = pd.DataFrame(records, columns=col_names)

    # dropping unnecessary columns
    clusters_pd = clusters_pd.drop(columns=['authenticated', 'clusterUsageTypes', 'runType'])

    return clusters_pd

In [138]:
def contentionTimelines_df(data):
    '''
    function to create dataframe for contentionTimeline

    Every entry of a timeline's ``clusterSegments`` becomes one row carrying the
    segment's own fields plus the timeline's top-level fields and its
    ``idleCostSegment`` fields (timeline fields overwrite same-named segment fields).
    A timeline with no segments still yields one row whose segment columns are
    None/NaN.

    input:
        data (dict): parsed JSON containing a ``"contentionTimelines"`` list
    output:
        contentionTimelines_pd (pandas dataframe): one row per segment (or per
            segment-less timeline); columns are the union of all row keys in
            first-seen order. An empty list yields an empty dataframe.
    '''
    timelines = data["contentionTimelines"]
    if len(timelines) == 0:
        return pd.DataFrame()

    nested_cols = ['idleCostSegment', 'clusterSegments']
    col_names1 = list(timelines[0].keys())
    col_names2 = list(timelines[0]['idleCostSegment'].keys())
    # segment schema comes from the first non-empty clusterSegments list; the first
    # timeline's list may be empty (previously IndexError, or NameError on level2_keys)
    first_segment = next((seg for t in timelines for seg in t['clusterSegments'] or []), {})
    col_names3 = list(first_segment.keys())

    records = []
    for input_dict in timelines:
        # fields shared by every row produced from this timeline
        shared = {}
        for col1 in col_names1:
            if col1 not in nested_cols:
                shared[col1] = input_dict[col1]
        for col2 in col_names2:
            if col2 not in nested_cols:
                shared[col2] = input_dict['idleCostSegment'][col2]

        # a segment-less timeline still gets one placeholder row with None segment values
        segments = input_dict['clusterSegments'] or [dict.fromkeys(col_names3)]
        for segment in segments:
            row = dict(segment)
            row.update(shared)
            records.append(row)

    # union of keys in first-seen order (previously only the keys of row 1, which
    # raised KeyError when there was a single row)
    columns = list(dict.fromkeys(key for row in records for key in row))
    contentionTimelines_pd = pd.DataFrame(records, columns=columns)

    return contentionTimelines_pd


## Step 1: Extracting job start and end times

In [139]:
def cluster_timeline_df(cluster_contention_jc_pd):
    '''
    Collapse per-segment rows into one activity window per cluster (Step 1).

    input:
        cluster_contention_jc_pd (pandas dataframe): rows with 'clusterId',
            'startUnixTime' and 'endUnixTime' columns
    output:
        (pandas dataframe): one row per clusterId with its earliest startUnixTime
            and latest endUnixTime, sorted by startUnixTime
    '''
    cluster_timelines_pd = (
        cluster_contention_jc_pd
        .groupby(['clusterId'])
        # string aggregation names: passing the builtins min/max is deprecated in pandas 2.1+
        .agg({'startUnixTime': 'min',  # first activity of the cluster
              'endUnixTime': 'max'})   # last activity of the cluster
        .reset_index()
    )
    return cluster_timelines_pd.sort_values('startUnixTime')

## Step 2: Grouping jobs based on their timeline overlap

In [140]:
# grouper function has minor updates - Oct 31
# this is the updated grouper function
def grouper(cluster_timelines_pd, auto_shutdown):
    '''
    Group clusters whose activity windows overlap (Step 2).

    Rows are visited in their current order, which is expected to be ascending
    startUnixTime (as returned by ``cluster_timeline_df``). Each cluster's end is
    extended by ``auto_shutdown`` minutes. A cluster joins the current (latest)
    group when its start or its end falls inside that group's window; otherwise
    it opens a new group.

    input:
        cluster_timelines_pd (pandas dataframe): one row per cluster with 'clusterId',
            'startUnixTime' and 'endUnixTime' (clusterIds assumed unique)
        auto_shutdown (number): minutes added to each cluster's end time
    output:
        cluster_groups (dict): 1-based group number -> dict with
            'startUnixTime_group', 'endUnixTime_group' and 'cluster_ids_group' (list)
    '''
    shutdown_sec = 60 * auto_shutdown
    cluster_groups = {}
    cnt = 0

    # iterate the rows directly instead of re-filtering the frame for every clusterId (O(n^2))
    rows = zip(cluster_timelines_pd['clusterId'].values,
               cluster_timelines_pd['startUnixTime'].values,
               cluster_timelines_pd['endUnixTime'].values)

    for cluster_id, start, end in rows:
        if cnt > 0:
            group = cluster_groups[cnt]
            group_starttime = group['startUnixTime_group']
            group_endtime = group['endUnixTime_group']
            if group_starttime <= start <= group_endtime or group_starttime <= end <= group_endtime:
                # updating the same group
                group['cluster_ids_group'].append(cluster_id)
                group['startUnixTime_group'] = min(group_starttime, start)
                group['endUnixTime_group'] = max(group_endtime, end + shutdown_sec)
                continue

        # creating new group (also the first one)
        cnt += 1
        cluster_groups[cnt] = {
            'startUnixTime_group': start,
            'endUnixTime_group': end + shutdown_sec,
            'cluster_ids_group': [cluster_id],
        }

    return cluster_groups

## Step 3: Calculating time intervals between groups

In [141]:
def group_intervals(cluster_groups, duration_include=1*24*3600, pool_24_7_on=False):
    '''
    Compute each group's duration and the idle gap to the next group (Step 3).

    Adds 'group_duration' and 'group_distance' (seconds from this group's end to the
    next group's start; None for the last group) to every group, IN PLACE. Groups
    that start more than ``duration_include`` seconds after group 1 are deleted from
    ``cluster_groups`` and the gaps leading into them are not counted.

    input:
        cluster_groups (dict): output of ``grouper`` (keys 1..n, ascending start time)
        duration_include (number): analysis window in seconds measured from the start of
            group 1 (default one day; better to be a multiple of days)
        pool_24_7_on (bool): if True the pool is 24/7 on, so the remainder of the window
            after the last kept group is added to the total as well
    output:
        total_groups_distance (number): summed gaps between the kept groups
            (plus the window remainder when ``pool_24_7_on``)
        cluster_groups (dict): the same dict, updated
    '''
    n_groups = len(cluster_groups)
    total_groups_distance = 0
    group_to_exclude = []

    for key in cluster_groups.keys():
        group = cluster_groups[key]
        group["group_duration"] = group['endUnixTime_group'] - group['startUnixTime_group']

        # calculating groups time intervals
        if key < n_groups:
            group["group_distance"] = cluster_groups[key + 1]['startUnixTime_group'] - group['endUnixTime_group']
            total_groups_distance += group["group_distance"]
        else:
            group["group_distance"] = None

        # if the group falls out of the intended duration (duration_include), remove the gap
        # leading into it; group starts are ascending, so every later group is excluded too
        # and its own gap is removed the same way on the next iteration
        if (group['startUnixTime_group'] - cluster_groups[1]['startUnixTime_group']) > duration_include:
            group_to_exclude.append(key)
            total_groups_distance -= cluster_groups[key - 1]["group_distance"]

    for key in group_to_exclude:
        del cluster_groups[key]

    if group_to_exclude and cluster_groups:
        # the new last group no longer has a successor (its gap was subtracted above)
        cluster_groups[max(cluster_groups)]["group_distance"] = None

    if pool_24_7_on and cluster_groups:
        first_start = cluster_groups[min(cluster_groups)]['startUnixTime_group']
        last_end = cluster_groups[max(cluster_groups)]['endUnixTime_group']
        total_groups_distance += duration_include - (last_end - first_start)

    return total_groups_distance, cluster_groups

## Adding group numbers to `cluster_timelines_pd` (from Step 1) for use in Step 4, part B

In [142]:
def cluster_timeline_update(cluster_timelines_pd, cluster_groups):
    '''
    function to update the cluster_timeline_pd dataframe to include group numbers

    Writes a 'group_number' column into ``cluster_timelines_pd`` IN PLACE (0 for
    clusters that are not in any group), then returns only the grouped clusters
    sorted by startUnixTime.

    input:
        cluster_timelines_pd (pandas dataframe): needs 'clusterId' and 'startUnixTime'
        cluster_groups (dict): group number -> dict with a 'cluster_ids_group' list
    output:
        (pandas dataframe): rows with group_number > 0, sorted by startUnixTime
    '''
    # a single clusterId -> group lookup instead of one masked assignment per cluster;
    # if an id were listed in several groups the later group wins, as before
    group_of = {
        cluster_id: key
        for key, group in cluster_groups.items()
        for cluster_id in group['cluster_ids_group']
    }
    cluster_timelines_pd['group_number'] = (
        cluster_timelines_pd['clusterId'].map(group_of).fillna(0).astype('int64')
    )

    # Note: if group_number==0 the cluster falls out of the intended duration (duration_include)
    return cluster_timelines_pd[cluster_timelines_pd.group_number > 0].sort_values('startUnixTime')

## Step 4: Calculating the idle-VM-seconds (ivm-sec) index of each group

In [48]:
def idle_vm_find2(input_df):
    '''
    function to find vms as soon as they become idle at cluster level
    (earlier variant of ``idle_vm_find``: the released count is measured against the
    MAXIMUM vmCount of the remaining timeline instead of its first segment)

    input:
        input_df (pandas dataframe): non-empty segments of one cluster with 'vmCount',
            'startUnixTime', 'endUnixTime' and 'clusterId'
    outputs:
        idle_vm_: info of vms and the time they become idle
                  ('idle_vm_count', 'availability_startUinxTime', 'clusterId')
        release_phase: a part of dataframe after initial idle vm release
                       which has potential of more idle vms ([] when nothing is left)
    '''
    peak_vm = input_df['vmCount'].max()
    df_max = input_df[input_df.vmCount == peak_vm]
    # the latest-ending segment at peak vm count
    df_max_latest = df_max[df_max.endUnixTime == df_max['endUnixTime'].max()]
    peak_end = df_max_latest['endUnixTime'].values[0]

    # segments starting at/after the peak ends; the peak rows themselves are excluded so a
    # zero-length peak (start == end) is not handed back to the caller's loop forever
    is_peak = (input_df.vmCount == peak_vm) & (input_df.endUnixTime == peak_end)
    release_phase = input_df[(input_df.startUnixTime >= peak_end) & ~is_peak]

    if len(release_phase) > 0:
        # build the row explicitly: assigning scalars to an empty DataFrame creates
        # zero-row columns, which silently lost these idle vms
        idle_vm_ = pd.DataFrame({
            'idle_vm_count': [df_max_latest['vmCount'].values[0] - release_phase['vmCount'].max()],
            'availability_startUinxTime': [peak_end],
            'clusterId': [df_max_latest['clusterId'].values[0]],
        })
    else:
        release_phase = []
        # If no nodes after => all the vms will be idle
        idle_vm_ = pd.DataFrame({
            'idle_vm_count': df_max_latest['vmCount'],
            'availability_startUinxTime': df_max_latest['endUnixTime'],
            'clusterId': df_max_latest['clusterId'],
        })

    return idle_vm_, release_phase

In [145]:
def idle_vm_find(input_df):
    '''
    function to find vms as soon as they become idle at cluster level

    The vms released after the latest peak are the peak count minus the vmCount of
    the segment(s) that start first after the peak (one output row per such segment).

    input:
        input_df (pandas dataframe): non-empty segments of one cluster with 'vmCount',
            'startUnixTime', 'endUnixTime' and 'clusterId'
    outputs:
        idle_vm_: info of vms and the time they become idle
                  ('idle_vm_count', 'availability_startUinxTime', 'clusterId')
        release_phase: a part of dataframe after initial idle vm release
                       which has potential of more idle vms ([] when nothing is left)
    '''
    peak_vm = input_df['vmCount'].max()
    df_max = input_df[input_df.vmCount == peak_vm]
    # the latest-ending segment at peak vm count
    df_max_latest = df_max[df_max.endUnixTime == df_max['endUnixTime'].max()]
    peak_end = df_max_latest['endUnixTime'].values[0]

    # segments starting at/after the peak ends; the peak rows themselves are excluded so a
    # zero-length peak (start == end) is not handed back to the caller's loop forever
    is_peak = (input_df.vmCount == peak_vm) & (input_df.endUnixTime == peak_end)
    release_phase = input_df[(input_df.startUnixTime >= peak_end) & ~is_peak]

    if len(release_phase) > 0:
        first_start = release_phase['startUnixTime'].min()
        next_counts = release_phase.loc[release_phase.startUnixTime == first_start, 'vmCount']
        idle_vm_ = pd.DataFrame({'idle_vm_count': df_max_latest['vmCount'].values[0] - next_counts})
        idle_vm_['availability_startUinxTime'] = peak_end
        idle_vm_['clusterId'] = df_max_latest['clusterId'].values[0]
    else:
        release_phase = []
        # If no nodes after => all the vms will be idle
        idle_vm_ = pd.DataFrame({
            'idle_vm_count': df_max_latest['vmCount'],
            'availability_startUinxTime': df_max_latest['endUnixTime'],
            'clusterId': df_max_latest['clusterId'],
        })

    return idle_vm_, release_phase

In [49]:
def idle_df2(cluster_groups, cluster_contention_jc_pd):
    '''
    Collect every idle-vm release of every grouped cluster using ``idle_vm_find2``.

    For each cluster of each group, ``idle_vm_find2`` is applied repeatedly: each call
    reports the vms released after the current peak and returns the remaining
    segments, until nothing is left.

    input:
        cluster_groups (dict): group number -> dict with a 'cluster_ids_group' list
        cluster_contention_jc_pd (pandas dataframe): segments with 'clusterId',
            'startUnixTime', 'endUnixTime' and 'vmCount'
    output:
        idle_vm_df (pandas dataframe): 'idle_vm_count', 'availability_startUinxTime',
            'clusterId', 'group_number'; empty (with those columns) if nothing was found
    '''
    appended_data = []

    for key, group in cluster_groups.items():
        for cluster_id in group['cluster_ids_group']:
            remaining = (
                cluster_contention_jc_pd[cluster_contention_jc_pd.clusterId == cluster_id]
                .sort_values('startUnixTime'))

            # peel off one release at a time until no segments remain
            while len(remaining) > 0:
                idle_vm_, remaining = idle_vm_find2(remaining)
                idle_vm_['group_number'] = [key] * len(idle_vm_)
                appended_data.append(idle_vm_)

    if not appended_data:
        # pd.concat([]) raises "No objects to concatenate"
        return pd.DataFrame(columns=['idle_vm_count', 'availability_startUinxTime',
                                     'clusterId', 'group_number'])

    return pd.concat(appended_data).reset_index(drop=True)

In [146]:
def idle_df(cluster_groups, cluster_contention_jc_pd):
    '''
    Collect every idle-vm release of every grouped cluster using ``idle_vm_find``.

    For each cluster of each group, ``idle_vm_find`` is applied repeatedly: each call
    reports the vms released after the current peak and returns the remaining
    segments, until nothing is left.

    input:
        cluster_groups (dict): group number -> dict with a 'cluster_ids_group' list
        cluster_contention_jc_pd (pandas dataframe): segments with 'clusterId',
            'startUnixTime', 'endUnixTime' and 'vmCount'
    output:
        idle_vm_df (pandas dataframe): 'idle_vm_count', 'availability_startUinxTime',
            'clusterId', 'group_number'; empty (with those columns) if nothing was found
    '''
    appended_data = []

    for key, group in cluster_groups.items():
        for cluster_id in group['cluster_ids_group']:
            remaining = (
                cluster_contention_jc_pd[cluster_contention_jc_pd.clusterId == cluster_id]
                .sort_values('startUnixTime'))

            # peel off one release at a time until no segments remain
            while len(remaining) > 0:
                idle_vm_, remaining = idle_vm_find(remaining)
                idle_vm_['group_number'] = [key] * len(idle_vm_)
                appended_data.append(idle_vm_)

    if not appended_data:
        # pd.concat([]) raises "No objects to concatenate"
        return pd.DataFrame(columns=['idle_vm_count', 'availability_startUinxTime',
                                     'clusterId', 'group_number'])

    return pd.concat(appended_data).reset_index(drop=True)

## Creating the idle-VM DataFrame with one idle VM per row (needed in the following steps)

In [1]:
# updated on Nov 12
def idle_one_df(idle_vm_df, auto_shutdown=None):
    '''
    Expand the idle-vm events into one row per idle vm (needed for Step 4 part B).

    Each row of ``idle_vm_df`` is repeated ``idle_vm_count`` times (truncated to int;
    zero or negative counts produce no rows), with 'idle_vm_count' set to 1. The
    function then adds:
      * 'availability_endUinxTime': availability start + auto_shutdown minutes,
        i.e. by default the vm stays on until auto-shutdown unless it is reused,
      * 'local_vm_id': 11, 12, ... in expansion order (11 is just a random starter id),
      * 'vm_reused': False.
    The result is sorted by 'availability_startUinxTime'. The sort is stable, so ties
    keep expansion order.

    input:
        idle_vm_df (pandas dataframe): needs 'idle_vm_count' and
            'availability_startUinxTime' (e.g. the output of ``idle_df``)
        auto_shutdown (number, optional): idle minutes before shutdown. When omitted,
            the notebook-level global ``auto_shutdown`` is used, which is what this
            function previously did implicitly.
    output:
        idle_vm_one_df (pandas dataframe)
    raises:
        NameError: if auto_shutdown is neither passed nor defined globally
    '''
    if auto_shutdown is None:
        if 'auto_shutdown' not in globals():
            raise NameError("auto_shutdown was not passed and no global 'auto_shutdown' is defined")
        auto_shutdown = globals()['auto_shutdown']

    # positional repeat (not label-based), so duplicate index labels cannot multiply rows;
    # building the frame at once replaces the quadratic row-by-row .loc growth
    counts = idle_vm_df['idle_vm_count'].astype(int).clip(lower=0).to_numpy()
    positions = np.repeat(np.arange(len(idle_vm_df)), counts)
    idle_vm_one_df = idle_vm_df.iloc[positions].reset_index(drop=True)
    idle_vm_one_df['idle_vm_count'] = 1

    # by default the idle vm will stay on till auto_shutdown time (defined in step 2) unless reused
    idle_vm_one_df['availability_endUinxTime'] = idle_vm_one_df['availability_startUinxTime'] + auto_shutdown*60

    # adding local vm id
    idle_vm_one_df['local_vm_id'] = idle_vm_one_df.index + 11  # (11 is just a random starter id)

    # adding vm reused status
    idle_vm_one_df['vm_reused'] = [False] * len(idle_vm_one_df)

    # sorting by availability time
    return idle_vm_one_df.sort_values('availability_startUinxTime', kind='stable')

## Step 4 - part B

In [7]:
def vm_usage(cluster_groups, cluster_timelines_pd, idle_vm_one_df, cluster_contention_jc_pd):
    '''
    Step 4 part B: replay each group's clusters and classify every vm start as warm
    (an idle vm released by another cluster is reused) or cold.

    For each group, clusters are visited in startUnixTime order, and each cluster's
    segments in startUnixTime order. Whenever a segment needs more vms than the
    cluster has started so far (n_vm_c_aloc only ever grows), each extra vm is taken
    from the group's idle vms that (a) were released by a different cluster,
    (b) are not yet reused and (c) satisfy
    availability_startUinxTime < segment start < availability_endUinxTime.
    If no such vm is left, the start is counted as cold. Every cold start after the
    first one within the same segment is also counted in cold_start_cnt_parallel.

    input:
        cluster_groups (dict): group numbers as keys (values are not read here)
        cluster_timelines_pd (pandas dataframe): 'clusterId', 'startUnixTime', 'group_number'
        idle_vm_one_df (pandas dataframe): one idle vm per row (see ``idle_one_df``).
            MODIFIED IN PLACE: reused vms get vm_reused=True and their
            availability_endUinxTime set to the reuse time; an 'idle_duration'
            column is added at the end
        cluster_contention_jc_pd (pandas dataframe): segments with 'clusterId',
            'startUnixTime' and 'vmCount'
    output:
        vm_usage_classes (dict): group number -> {'cold_start_cnt', 'warm_start_cnt',
            'cold_start_cnt_parallel'}
        idle_vm_one_df (pandas dataframe): the same (mutated) frame

    Prints each group number and the elapsed wall time.
    '''

    t1 = time.time()

    vm_usage_classes = {}

    for key in cluster_groups.keys():

        # group level parameters
        warm_start_cnt_g_ = 0
        cold_start_cnt_g_ = 0
        cold_start_cnt_parallel_g_ = 0
        vm_usage_classes[key] = {}
        print(key)

        # idle vms of this group not reused yet; reuse is also tracked on this filtered copy
        idle_vm_df_g = idle_vm_one_df[(idle_vm_one_df.group_number==key) & (idle_vm_one_df.vm_reused==False)]

        # NOTE(review): boolean indexing returns a copy, so the in-place sort only affects this local frame
        cluster_timelines_df_g = cluster_timelines_pd[cluster_timelines_pd.group_number==key]
        cluster_timelines_df_g.sort_values('startUnixTime', inplace=True)

        for cluster_id in cluster_timelines_df_g['clusterId'].values:

            # idle vms won't be used by the same cluster from where they are released
            idle_vm_df_c = idle_vm_df_g[(idle_vm_df_g.clusterId!=cluster_id) & (idle_vm_df_g.vm_reused==False)]

            cluster_df = (
                cluster_contention_jc_pd[cluster_contention_jc_pd.clusterId==cluster_id]
                .sort_values('startUnixTime'))

            # number of vms already allocated to the current cluster (never decreases)
            n_vm_c_aloc = 0

            for i in range(len(cluster_df)):

                # number of vms needed at current cluster at current time
                n_vm_c_need = int(cluster_df.iloc[i].vmCount)

                # look if there is any idle vms available: not reused yet and available
                # strictly before/after this segment's start
                idle_vm_df_c = idle_vm_df_c[idle_vm_df_c.vm_reused == False]
                idles = []  # NOTE(review): immediately overwritten on the next line
                idles = idle_vm_df_c[(idle_vm_df_c.availability_startUinxTime < cluster_df.iloc[i].startUnixTime) &
                              (cluster_df.iloc[i].startUnixTime < idle_vm_df_c.availability_endUinxTime)]

                # becomes True after the first cold start in this segment
                parallel = False
                # only if we have less allocated vms than required
                if n_vm_c_need > n_vm_c_aloc:
                    for v in range(n_vm_c_need - n_vm_c_aloc):
                        if v < len(idles):
                            # warm start: reuse the v-th eligible idle vm and end its idle period now
                            idle_ind = idle_vm_one_df[idle_vm_one_df.local_vm_id == idles.iloc[v]['local_vm_id']].index
                            idle_vm_one_df.loc[idle_ind, 'availability_endUinxTime'] = cluster_df.iloc[i].startUnixTime
                            idle_vm_df_g.loc[idle_vm_df_g.local_vm_id == idles.iloc[v]['local_vm_id'],
                                             'vm_reused'] = True
                            idle_vm_df_c.loc[idle_vm_df_c.local_vm_id == idles.iloc[v]['local_vm_id'],
                                             'vm_reused'] = True
                            idle_vm_one_df.loc[idle_ind, 'vm_reused'] = True
                            warm_start_cnt_g_ += 1
                        else:
                            if parallel:
                                cold_start_cnt_parallel_g_ += 1
                            # when there is no idle vm or all available idle vms are already re-allocated
                            cold_start_cnt_g_ += 1
                            parallel = True

                        n_vm_c_aloc +=1
                    # Note: somewhere here you should count the total active vms in the pool for the
                    # effect of max vm, but think about it more

                    # Note: if n_vm_c_need < n_vm_c_aloc then we will get idle vm here.
                    # In the next version you can do the idle vm detection at this step for speed-ups
                    # and to detect idle vms that stay idle briefly but will be reused by the same cluster later
                    # (this part should be added only if needed, i.e. when we know that a finished vm is
                    # immediately released to the pool even though it will be reused later by the same
                    # cluster; Q1 on page B24)

        # updating group level parameters
        vm_usage_classes[key]['cold_start_cnt'] = cold_start_cnt_g_
        vm_usage_classes[key]['warm_start_cnt'] = warm_start_cnt_g_
        vm_usage_classes[key]['cold_start_cnt_parallel'] = cold_start_cnt_parallel_g_

    t2 = time.time()
    print('duration is: ', t2-t1)

    ## Part e: calculating idle vm second (ivm-sec) index of each group
    idle_vm_one_df['idle_duration'] = (idle_vm_one_df['availability_endUinxTime']
                                       - idle_vm_one_df['availability_startUinxTime'])

    return vm_usage_classes, idle_vm_one_df

In [None]:
def vm_usage_v3(cluster_groups, cluster_timelines_pd, idle_vm_one_df, cluster_contention_jc_pd):
    '''
    Variant of ``vm_usage`` (Step 4 part B) that identifies idle vms by local_vm_id and
    skips segments that need no additional vms.

    Warm/cold classification is the same as in ``vm_usage``: each extra vm a segment
    needs is taken from the group's not-yet-reused idle vms of OTHER clusters with
    availability_startUinxTime < segment start < availability_endUinxTime, otherwise
    it is a cold start. Every cold start after the first in a segment also counts as
    cold_start_cnt_parallel.

    input:
        cluster_groups (dict): group numbers as keys (values are not read here)
        cluster_timelines_pd (pandas dataframe): 'clusterId', 'startUnixTime', 'group_number'
        idle_vm_one_df (pandas dataframe): one idle vm per row; MODIFIED IN PLACE
            (vm_reused, availability_endUinxTime, and a new 'idle_duration' column)
        cluster_contention_jc_pd (pandas dataframe): segments with 'clusterId',
            'startUnixTime' and 'vmCount'
    output:
        vm_usage_classes (dict): group number -> {'cold_start_cnt', 'warm_start_cnt',
            'cold_start_cnt_parallel'}
        idle_vm_one_df (pandas dataframe): the same (mutated) frame

    Prints a header per group, the max vmCount per cluster and the elapsed wall time.
    '''

    t1 = time.time()

    vm_usage_classes = {}

    for key in cluster_groups.keys():

        # group level parameters
        warm_start_cnt_g_ = 0
        cold_start_cnt_g_ = 0
        cold_start_cnt_parallel_g_ = 0
        vm_usage_classes[key] = {}
        print('**************** group number is', key)

        # idle vms of this group not reused yet; reuse is also tracked on this filtered copy
        idle_vm_df_g = idle_vm_one_df[(idle_vm_one_df.group_number==key) & (idle_vm_one_df.vm_reused==False)]

        # NOTE(review): boolean indexing returns a copy, so the in-place sort only affects this local frame
        cluster_timelines_df_g = cluster_timelines_pd[cluster_timelines_pd.group_number==key]
        cluster_timelines_df_g.sort_values('startUnixTime', inplace=True)

        for cluster_id in cluster_timelines_df_g['clusterId'].values:

            # idle vms won't be used by the same cluster from where they are released
            idle_vm_df_c = idle_vm_df_g[(idle_vm_df_g.clusterId!=cluster_id) & (idle_vm_df_g.vm_reused==False)]

            cluster_df = (
                cluster_contention_jc_pd[cluster_contention_jc_pd.clusterId==cluster_id]
                .sort_values('startUnixTime'))

            # number of vms already allocated to the current cluster (never decreases)
            n_vm_c_aloc = 0

            # max number of vms needed for the current cluster
            n_vm_max = cluster_df['vmCount'].max()
            print(n_vm_max)

            for i in range(len(cluster_df)):

                # number of vms needed at current cluster at current time
                n_vm_c_need = int(cluster_df.iloc[i].vmCount)

                # when all the vms for current cluster are already started
                if n_vm_c_need <= n_vm_c_aloc:
                    continue

                # when max number of vms are already allocated
                # NOTE(review): n_vm_c_need <= n_vm_max always holds, so whenever
                # n_vm_c_aloc == n_vm_max the `continue` above already fired; this break
                # is never reached
                if n_vm_max == n_vm_c_aloc:
                    break

                # look if there is any idle vms available
                idle_vm_df_c = idle_vm_df_c[idle_vm_df_c.vm_reused == False]
                idles = []  # NOTE(review): immediately overwritten on the next line
                idles = idle_vm_df_c[(idle_vm_df_c.availability_startUinxTime < cluster_df.iloc[i].startUnixTime) &
                              (cluster_df.iloc[i].startUnixTime < idle_vm_df_c.availability_endUinxTime)]

                # becomes True after the first cold start in this segment
                parallel = False
                # only if we have less allocated vms than required (always true after the guard above)
                if n_vm_c_need > n_vm_c_aloc:
                    for v in range(n_vm_c_need - n_vm_c_aloc):
                        if v < len(idles):
                            # warm start: reuse the v-th eligible idle vm and end its idle period now
                            local_vm_id = idles.iloc[v]['local_vm_id']
                            idle_vm_one_df.loc[idle_vm_one_df.local_vm_id==local_vm_id, 'availability_endUinxTime'] = cluster_df.iloc[i].startUnixTime
                            idle_vm_df_g.loc[idle_vm_df_g.local_vm_id == local_vm_id, 'vm_reused'] = True
                            idle_vm_df_c.loc[idle_vm_df_c.local_vm_id == local_vm_id, 'vm_reused'] = True
                            idle_vm_one_df.loc[idle_vm_one_df.local_vm_id == local_vm_id, 'vm_reused'] = True
                            warm_start_cnt_g_ += 1
                        else:
                            if parallel:
                                cold_start_cnt_parallel_g_ += 1
                            # when there is no idle vm or all available idle vms are already re-allocated
                            cold_start_cnt_g_ += 1
                            parallel = True

                        n_vm_c_aloc +=1

        # updating group level parameters
        vm_usage_classes[key]['cold_start_cnt'] = cold_start_cnt_g_
        vm_usage_classes[key]['warm_start_cnt'] = warm_start_cnt_g_
        vm_usage_classes[key]['cold_start_cnt_parallel'] = cold_start_cnt_parallel_g_

    t2 = time.time()
    print('duration is: ', t2-t1)

    ## Part e: calculating idle vm second (ivm-sec) index of each group
    idle_vm_one_df['idle_duration'] = (idle_vm_one_df['availability_endUinxTime']
                                       - idle_vm_one_df['availability_startUinxTime'])

    return vm_usage_classes, idle_vm_one_df

In [8]:
def vm_usage_v2(cluster_groups, cluster_timelines_pd, idle_vm_one_df, cluster_contention_jc_pd):
    '''
    Variant of ``vm_usage`` (Step 4 part B) that identifies reused idle vms by
    local_vm_id instead of by index.

    Each extra vm a segment needs is taken from the group's not-yet-reused idle vms of
    OTHER clusters with availability_startUinxTime < segment start <
    availability_endUinxTime, otherwise it is a cold start. Every cold start after the
    first in a segment also counts as cold_start_cnt_parallel.

    input:
        cluster_groups (dict): group numbers as keys (values are not read here)
        cluster_timelines_pd (pandas dataframe): 'clusterId', 'startUnixTime', 'group_number'
        idle_vm_one_df (pandas dataframe): one idle vm per row; MODIFIED IN PLACE
            (vm_reused, availability_endUinxTime, and a new 'idle_duration' column)
        cluster_contention_jc_pd (pandas dataframe): segments with 'clusterId',
            'startUnixTime' and 'vmCount'
    output:
        vm_usage_classes (dict): group number -> {'cold_start_cnt', 'warm_start_cnt',
            'cold_start_cnt_parallel'}
        idle_vm_one_df (pandas dataframe): the same (mutated) frame

    Prints a header per group and the elapsed wall time.
    '''

    t1 = time.time()

    vm_usage_classes = {}

    for key in cluster_groups.keys():

        # group level parameters
        warm_start_cnt_g_ = 0
        cold_start_cnt_g_ = 0
        cold_start_cnt_parallel_g_ = 0
        vm_usage_classes[key] = {}
        print('**************** group number is', key)

        # idle vms of this group not reused yet; reuse is also tracked on this filtered copy
        idle_vm_df_g = idle_vm_one_df[(idle_vm_one_df.group_number==key) & (idle_vm_one_df.vm_reused==False)]

        # NOTE(review): boolean indexing returns a copy, so the in-place sort only affects this local frame
        cluster_timelines_df_g = cluster_timelines_pd[cluster_timelines_pd.group_number==key]
        cluster_timelines_df_g.sort_values('startUnixTime', inplace=True)

        for cluster_id in cluster_timelines_df_g['clusterId'].values:

            # idle vms won't be used by the same cluster from where they are released
            idle_vm_df_c = idle_vm_df_g[(idle_vm_df_g.clusterId!=cluster_id) & (idle_vm_df_g.vm_reused==False)]

            cluster_df = (
                cluster_contention_jc_pd[cluster_contention_jc_pd.clusterId==cluster_id]
                .sort_values('startUnixTime'))

            # number of vms already allocated to the current cluster (never decreases)
            n_vm_c_aloc = 0

            for i in range(len(cluster_df)):

                # number of vms needed at current cluster at current time
                n_vm_c_need = int(cluster_df.iloc[i].vmCount)

                # look if there is any idle vms available
                idle_vm_df_c = idle_vm_df_c[idle_vm_df_c.vm_reused == False]
                idles = []  # NOTE(review): immediately overwritten on the next line
                idles = idle_vm_df_c[(idle_vm_df_c.availability_startUinxTime < cluster_df.iloc[i].startUnixTime) &
                              (cluster_df.iloc[i].startUnixTime < idle_vm_df_c.availability_endUinxTime)]

                # becomes True after the first cold start in this segment
                parallel = False
                # only if we have less allocated vms than required
                if n_vm_c_need > n_vm_c_aloc:
                    for v in range(n_vm_c_need - n_vm_c_aloc):
                        if v < len(idles):
                            # warm start: reuse the v-th eligible idle vm and end its idle period now
                            local_vm_id = idles.iloc[v]['local_vm_id']
                            idle_vm_one_df.loc[idle_vm_one_df.local_vm_id==local_vm_id, 'availability_endUinxTime'] = cluster_df.iloc[i].startUnixTime
                            idle_vm_df_g.loc[idle_vm_df_g.local_vm_id == local_vm_id, 'vm_reused'] = True
                            idle_vm_df_c.loc[idle_vm_df_c.local_vm_id == local_vm_id, 'vm_reused'] = True
                            idle_vm_one_df.loc[idle_vm_one_df.local_vm_id == local_vm_id, 'vm_reused'] = True
                            warm_start_cnt_g_ += 1
                        else:
                            if parallel:
                                cold_start_cnt_parallel_g_ += 1
                            # when there is no idle vm or all available idle vms are already re-allocated
                            cold_start_cnt_g_ += 1
                            parallel = True

                        n_vm_c_aloc +=1

        # updating group level parameters
        vm_usage_classes[key]['cold_start_cnt'] = cold_start_cnt_g_
        vm_usage_classes[key]['warm_start_cnt'] = warm_start_cnt_g_
        vm_usage_classes[key]['cold_start_cnt_parallel'] = cold_start_cnt_parallel_g_

    t2 = time.time()
    print('duration is: ', t2-t1)

    ## Part e: calculating idle vm second (ivm-sec) index of each group
    idle_vm_one_df['idle_duration'] = (idle_vm_one_df['availability_endUinxTime']
                                       - idle_vm_one_df['availability_startUinxTime'])

    return vm_usage_classes, idle_vm_one_df

In [None]:
def vm_usage_v4(cluster_groups, cluster_timelines_pd, cluster_contention_jc_pd, auto_shutdown, verbose=False):
    '''
    Vectorised warm/cold start and idle-time estimate per group (Step 4, v4).

    Within each group, segments are sorted by (clusterId, startUnixTime) and a signed
    ``vm_scale`` is derived for each row:
      * positive -> vms released at that row's endUnixTime (they become idle),
      * negative -> vms that must be started for that row (cold start by default).
    Each release then warms up, in row order, the still-uncovered negative rows whose
    endUnixTime falls strictly inside (release time, release time + auto_shutdown min).

    input:
        cluster_groups (dict): group numbers as keys (values are not read)
        cluster_timelines_pd (pandas dataframe): 'clusterId' and 'group_number'
        cluster_contention_jc_pd (pandas dataframe): segments with 'clusterId',
            'startUnixTime', 'endUnixTime', 'vmCount' and the columns dropped below
            ('clusterType', 'costActive', 'activeNodes', 'idleNodes', 'maxCapacity',
            'cost'); not modified
        auto_shutdown (number): idle minutes before a vm is shut down
        verbose (bool): print a per-group summary
    output:
        vm_usage_classes (dict): group number -> {'cold_start_cnt', 'warm_start_cnt',
            'idle_duration' (vm-seconds), 'idle_vm_count'}
    '''
    cluster_group_pd = cluster_timelines_pd[['clusterId', 'group_number']]
    cluster_contention_jc_pd = pd.merge(cluster_contention_jc_pd, cluster_group_pd, on='clusterId')
    cluster_contention_jc_pd = cluster_contention_jc_pd.drop(
        columns=['clusterType', 'costActive', 'activeNodes', 'idleNodes', 'maxCapacity', 'cost'])

    vm_usage_classes = {}

    for key in cluster_groups.keys():

        cluster_df = (
            cluster_contention_jc_pd[cluster_contention_jc_pd.group_number == key]
            .sort_values(['clusterId', 'startUnixTime']))

        # finding drops in the number of vms (meaning those vms becoming idle)
        cluster_df['vm_scale'] = cluster_df['vmCount'] - cluster_df['vmCount'].shift(-1)
        cluster_df['time_diff_fw'] = cluster_df['startUnixTime'] - cluster_df['endUnixTime'].shift(+1)
        cluster_df['time_diff_bw'] = cluster_df['endUnixTime'] - cluster_df['startUnixTime'].shift(-1)

        # at the start of each cluster your vm_scale is equal to the vmCount (new Vms count needed)
        # time_diff_fw: for cluster starts >> vm_scale will be needed to cold start (negative)(by default)
        # time_diff_bw: for cluster ends >> vm_scale will be idle (positive)
        # Note: Sequence of the next two lines matters. first bw then fw
        # (.loc instead of chained `cluster_df['vm_scale'][mask] = ...`, which does not
        # write back to cluster_df under pandas copy-on-write)
        cluster_df.loc[cluster_df.time_diff_bw != 0, 'vm_scale'] = cluster_df['vmCount']
        cluster_df.loc[cluster_df.time_diff_fw != 0, 'vm_scale'] = -cluster_df['vmCount']

        # explicit copies: both frames are modified below
        vm_pos = cluster_df[cluster_df.vm_scale > 0].copy()
        vm_neg = cluster_df[cluster_df.vm_scale < 0].copy()
        idle_vm_count_g_ = vm_pos['vm_scale'].sum()

        # NOTE: endUnixTime is when the change happens due to how diff was made
        vm_pos['start_availability_time'] = vm_pos['endUnixTime']
        vm_pos['end_availability_time'] = vm_pos['endUnixTime'] + 60*auto_shutdown
        vm_pos['idle_duration'] = [0]*len(vm_pos)
        vm_pos['vm_reused_cnt'] = [0]*len(vm_pos)
        vm_pos.reset_index(inplace=True)

        # adding local id
        vm_neg.reset_index(inplace=True)
        vm_neg['local_id'] = vm_neg.index + 11  # (11 is just a random starter id)

        for i in range(len(vm_pos)):
            release = vm_pos.iloc[i]
            vm_p_cnt = release['vm_scale']
            # cold starts still (partly) uncovered whose endUnixTime falls strictly inside
            # this release's availability window
            cold_starts = vm_neg.loc[(vm_neg.vm_scale < 0)
                                     & (release.start_availability_time < vm_neg.endUnixTime)
                                     & (vm_neg.endUnixTime < release.end_availability_time)]
            local_ids = cold_starts['local_id'].values

            # hand the released vms to those cold starts in row order until none are left
            vm_warm_cnt = 0
            for cnt, local_id in enumerate(local_ids):
                if vm_warm_cnt >= vm_p_cnt:
                    break
                true_vm = min(abs(cold_starts.iloc[cnt]['vm_scale']), vm_p_cnt - vm_warm_cnt)
                # the reused vms sat idle from the release until this row's endUnixTime
                wait = cold_starts.iloc[cnt]['endUnixTime'] - release.start_availability_time
                vm_pos.loc[vm_pos.index == i, 'idle_duration'] += wait*true_vm
                vm_pos.loc[vm_pos.index == i, 'vm_reused_cnt'] += true_vm
                vm_warm_cnt += true_vm
                vm_neg.loc[vm_neg.local_id == local_id, 'vm_scale'] += true_vm

        cold_start_cnt_g_ = abs(vm_neg['vm_scale'].sum())
        warm_start_cnt_g_ = vm_pos['vm_reused_cnt'].sum()

        # calculating idle_duration: released vms that were never reused stay idle for the
        # whole auto_shutdown period
        vm_pos['idle_duration'] += (vm_pos['vm_scale'] - vm_pos['vm_reused_cnt'])*auto_shutdown*60
        idle_duration_g_ = vm_pos['idle_duration'].sum()
        # when there is only one record for the current group >> start and end
        # of the cluster happens at the same row
        if len(cluster_df) == 1:
            idle_duration_g_ = cold_start_cnt_g_*auto_shutdown*60

        if verbose:
            print(f'***Group Number is {key}')
            print('total cold starts:', cold_start_cnt_g_)
            print('total warm starts:', warm_start_cnt_g_)
            print('total idle duration is:', idle_duration_g_)

        # updating group level parameters
        vm_usage_classes[key] = {
            'cold_start_cnt': cold_start_cnt_g_,
            'warm_start_cnt': warm_start_cnt_g_,
            'idle_duration': idle_duration_g_,
            'idle_vm_count': idle_vm_count_g_,
        }

    return vm_usage_classes

In [2]:
def groups_stats(idle_vm_one_df, vm_usage_classes):
    '''
    function to provide groups summary and stats

    input:
        idle_vm_one_df (pandas dataframe): one idle vm per row with 'group_number',
            'idle_duration', 'idle_vm_count' and 'vm_reused'
        vm_usage_classes (dict): group number -> dict of per-group counters
    output:
        groups_summary (pandas dataframe): per-group sums of idle_duration,
            idle_vm_count and vm_reused (number of reused vms), inner-joined with
            the counters on group_number
    '''
    # string aggregation names: passing the builtin sum is deprecated in pandas 2.1+
    idle_vm_one_df_summary = (
        idle_vm_one_df
        .groupby('group_number')
        .agg({'idle_duration': 'sum', 'idle_vm_count': 'sum', 'vm_reused': 'sum'})
        .reset_index()
    )

    vm_usage_classes_df = pd.DataFrame(vm_usage_classes).transpose()
    vm_usage_classes_df['group_number'] = vm_usage_classes_df.index

    return idle_vm_one_df_summary.merge(vm_usage_classes_df, on='group_number', how='inner')

In [None]:
def groups_stats_v2(vm_usage_classes):
    '''
    Turn the per-group counters into a summary table.

    input:
        vm_usage_classes (dict): group number -> dict of counters
    output:
        groups_summary (pandas dataframe): one row per group, one column per counter,
            plus a trailing 'group_number' column copied from the index
    '''
    summary = pd.DataFrame(vm_usage_classes).T
    return summary.assign(group_number=summary.index)


## Part g: add effects of min_idle_vm

In [150]:
def min_idle_vm_effect(min_idle_vm, total_idle_time, total_groups_distance, groups_summary):
    '''
    Adjust the totals for a pool that keeps ``min_idle_vm`` vms warm (Part g).

    The kept vms sit idle during every gap between groups, which adds
    ``total_groups_distance * min_idle_vm`` to the idle time. In every group,
    ``min_idle_vm`` starts move from cold to warm, and the cold-start counters are
    floored at 0.
    Note: min_idle_vm only affects the first cluster of each group.

    input:
        min_idle_vm (int): number of vms kept warm
        total_idle_time (number): idle time in seconds before the adjustment
        total_groups_distance (number): summed gaps between groups, in seconds
        groups_summary (pandas dataframe): needs 'vm_reused', 'cold_start_cnt',
            'warm_start_cnt' and 'cold_start_cnt_parallel'; NOT modified
    output:
        total_idle_time (number), groups_summary (pandas dataframe): adjusted values
    '''
    # work on a copy: updating the caller's frame in place made repeated calls
    # (e.g. while sweeping min_idle_vm values) accumulate the adjustment
    groups_summary = groups_summary.copy()
    if min_idle_vm > 0:

        # effects of min_idle_vm on total idle time
        total_idle_time += total_groups_distance * min_idle_vm
        print(f'total_idle_time is {total_idle_time} sec')

        # effects of min_idle_vm on vm start types counts and groups_summary df
        groups_summary['vm_reused'] += min_idle_vm
        groups_summary['cold_start_cnt'] -= min_idle_vm
        groups_summary['warm_start_cnt'] += min_idle_vm
        if min_idle_vm > 1:
            groups_summary['cold_start_cnt_parallel'] -= min_idle_vm - 1

        # a group cannot have a negative number of cold starts
        for col in ('cold_start_cnt', 'cold_start_cnt_parallel'):
            groups_summary.loc[groups_summary[col] < 0, col] = 0

    return total_idle_time, groups_summary

In [None]:
def min_idle_vm_effect_v2(min_idle_vm, total_idle_time_org, total_groups_distance, groups_summary_org):
    '''
    Adjust the totals for a pool that keeps ``min_idle_vm`` vms warm, without
    modifying the inputs.

    Adds ``total_groups_distance * min_idle_vm`` to the idle time. In every group,
    ``min_idle_vm`` starts move from cold to warm, and cold_start_cnt is floored at 0.
    Note: min_idle_vm only affects the first cluster of each group.

    input:
        min_idle_vm (int): number of vms kept warm
        total_idle_time_org (number): idle time before the adjustment
        total_groups_distance (number): summed gaps between groups
        groups_summary_org (pandas dataframe): needs 'cold_start_cnt' and 'warm_start_cnt'
    output:
        total_idle_time (number), groups_summary (pandas dataframe): adjusted copies
    '''
    groups_summary = groups_summary_org.copy()
    total_idle_time = total_idle_time_org
    if min_idle_vm > 0:
        # effects of min_idle_vm on total idle time
        total_idle_time += total_groups_distance * min_idle_vm

        # effects of min_idle_vm on vm start types counts and groups_summary df
        groups_summary['cold_start_cnt'] -= min_idle_vm
        groups_summary['warm_start_cnt'] += min_idle_vm
        # .loc instead of chained indexing, which silently does nothing under copy-on-write
        groups_summary.loc[groups_summary['cold_start_cnt'] < 0, 'cold_start_cnt'] = 0

    return total_idle_time, groups_summary

In [151]:
def improvements(greedy_df, base_pool_config):
    '''
    Percent reduction in penalization of the best configuration vs. the base one.

    input:
        greedy_df (pandas dataframe): one row per configuration with 'auto_shutdown',
            'min_idle_vm' and 'penalization' (the latter is added by ``optimal_config``)
        base_pool_config (dict): 'auto_shutdown' and 'min_idle_vm' of the base configuration
    output:
        (float): 100 * (base - best) / base penalization, rounded to one decimal
    raises:
        ValueError: if no row of greedy_df matches base_pool_config
    '''
    best_penalization = greedy_df['penalization'].min()
    base_rows = greedy_df[(greedy_df.auto_shutdown == base_pool_config['auto_shutdown'])
                          & (greedy_df.min_idle_vm == base_pool_config['min_idle_vm'])]
    if base_rows.empty:
        raise ValueError(f'base_pool_config {base_pool_config} not found in greedy_df')
    base_penalization = base_rows['penalization'].values[0]

    # percent improvements
    percent_improvements = (base_penalization - best_penalization) / base_penalization

    return np.round(100 * percent_improvements, 1)

In [152]:
def optimal_config(greedy_df, max_setup_time=300):
    '''
    Pick the configuration with the lowest penalization.

    penalization = (cold_start_cnt * max_setup_time + total_idle_time) / idle_vm_count,
    i.e. every cold start is charged ``max_setup_time`` (default 300, five minutes).
    The 'penalization' column is written into ``greedy_df`` in place because
    ``improvements`` reads it afterwards.

    input:
        greedy_df (pandas dataframe): presumably one row per candidate configuration, with
            'auto_shutdown', 'min_idle_vm', 'cold_start_cnt', 'total_idle_time' and
            'idle_vm_count'
        max_setup_time (number): cost charged per cold start
    output:
        opt_config (dict): 'auto_shutdown' and 'min_idle_vm' of the first row with the
            minimum penalization
    '''
    greedy_df['penalization'] = (
        (greedy_df.cold_start_cnt * max_setup_time + greedy_df.total_idle_time)
        / greedy_df.idle_vm_count
    )
    is_best = greedy_df['penalization'] == greedy_df['penalization'].min()
    return {key: greedy_df.loc[is_best, key].values[0] for key in ('auto_shutdown', 'min_idle_vm')}