# Results of simulation of different fairness policies

These experiments use accumulated deficits to try to ensure that applications always receive their computed allocation of GPU time, even as new jobs arrive and old jobs finish. This is done by keeping track of the difference between the GPU time each application should have received and the GPU time it actually received. Jobs are allocated to GPUs in rounds: in each round, every GPU is instructed to run its assigned job for a fixed interval of time.

# Import statements

In [1]:
# Imports for plotting.
from matplotlib import pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
import matplotlib.lines as mlines
import matplotlib.patches as mpatches
from matplotlib.ticker import MultipleLocator
from pylab import *
import seaborn as sns
from matplotlib import rc
# Global matplotlib/seaborn configuration shared by every figure below.
# The calls are order-dependent (later calls override earlier settings).

# Render all figure text through LaTeX.
rc('text',
   usetex=True)
# Start from seaborn's 'ticks' style, then pass the font settings through a
# second set_style call.
sns.set_style('ticks')
font = {
    'font.family':'Times New Roman',
    'font.weight': 200,
    'font.size': 10,
}
sns.set_style(font)
# Custom color cycle.
# NOTE(review): '#8EBA42' appears twice (3rd and 7th entries) — confirm intended.
flatui = ['#002A5E', '#FD151B', '#8EBA42', '#348ABD', '#988ED5', '#BDB76B', '#8EBA42', '#FFB5B8']
sns.set_palette(flatui)
# rc overrides applied together with the "paper" context and font_scale=2.
paper_rc = {
    'lines.linewidth': 2,
    'lines.markersize': 10,
    'mathtext.fontset': 'custom',
    'mathtext.rm': 'Times New Roman',
    'mathtext.bf': 'Times New Roman:bold',
}
sns.set_context("paper", font_scale=2,  rc=paper_rc)
# Snapshot of the active palette; not referenced again in the visible part of
# this notebook.
current_palette = sns.color_palette()

In [2]:
# Other imports.
import json
import os
import re

# Get logfile paths

In [3]:
def get_logfile_paths_helper(directory_name):
    """Return the path of every file found anywhere below `directory_name`.

    The tree is traversed with `os.walk`; each returned path is the walked
    directory joined with the file name. Directories that contain no files
    contribute nothing, so an empty or non-existent directory yields `[]`.
    """
    return [
        os.path.join(parent_dir, base_name)
        for parent_dir, _subdirs, base_names in os.walk(directory_name)
        for base_name in base_names
    ]

def get_logfile_paths(directory_name):
    """Collect and parse every simulation logfile below `directory_name`.

    A path is recognised when it looks like
    `...v100=<int>.p100=<int>.k80=<int>/<policy>/seed=<int>/lambda=<float>.log`.

    Args:
        directory_name: Root directory to search recursively.

    Returns:
        A list of `(v100s, p100s, k80s, policy, seed, lambda, logfile_path)`
        tuples, one per recognised logfile, in the order the files were
        discovered.
    """
    # Both fragments are raw strings: a non-raw '\d' is an invalid escape
    # sequence that newer Python versions warn about.
    path_pattern = re.compile(
        r'.*v100=(\d+)\.p100=(\d+)\.k80=(\d+)/(.*)/seed=(\d+)/'
        r'lambda=(\d+\.\d+)\.log')

    logfile_paths = []
    for logfile_path in get_logfile_paths_helper(directory_name):
        m = path_pattern.match(logfile_path)
        if m is None:
            # Not a simulation logfile (e.g. an editor backup or a stray
            # file in the tree); ignore it instead of crashing on m.group().
            continue
        v100s, p100s, k80s, policy, seed, lam = m.groups()
        logfile_paths.append((int(v100s), int(p100s), int(k80s), policy,
                              int(seed), float(lam), logfile_path))
    return logfile_paths

# Plotting functions

In [5]:
def prune(logfile_paths, v100s, p100s, k80s, policy, seed=None):
    """Select the logfile entries recorded for one cluster shape and policy.

    Each entry of `logfile_paths` is indexed as
    `(v100s, p100s, k80s, policy, seed, lambda, path)`.

    Returns a sorted list of `(lambda, path, seed)` tuples when `seed` is
    None, covering every seed; otherwise a sorted list of `(lambda, path)`
    tuples restricted to the given seed.
    """
    def on_requested_config(entry):
        return (entry[0] == v100s and entry[1] == p100s and
                entry[2] == k80s and entry[3] == policy)

    candidates = filter(on_requested_config, logfile_paths)
    if seed is not None:
        selected = [(entry[5], entry[6]) for entry in candidates
                    if entry[4] == seed]
    else:
        selected = [(entry[5], entry[6], entry[4]) for entry in candidates]
    selected.sort()
    return selected

In [6]:
# Display name for each scheduling-policy identifier; looked up by the
# plotting code to label each policy's data.
labels = dict(
    fifo="FIFO",
    fifo_perf="FIFO+perf",
    fifo_packed="FIFO+perf+packed",
    max_min_fairness="MMF",
    max_min_fairness_perf="MMF+perf",
    max_min_fairness_packed="MMF+perf+packed",
)
def plot_metric_vs_inverse_lambda(v100s, p100s, k80s,
                                  policies, metric_fn,
                                  metric_label,
                                  xmax=None,
                                  ymax=None,
                                  output_filename=None):
    """Plot a per-logfile metric against input job rate, one line per policy.

    Reads the module-level `logfile_paths` list and `labels` mapping.

    Args:
        v100s, p100s, k80s: Cluster shape used to select logfiles.
        policies: Iterable of policy identifiers (keys of `labels`).
        metric_fn: Callable taking a logfile path and returning the value
            plotted on the y-axis.
        metric_label: Y-axis label.
        xmax, ymax: Optional upper axis limits; the lower limits are 0.
        output_filename: If given, the figure is also saved to this PDF.
    """
    plt.figure(figsize=(8, 3.5))
    ax = plt.subplot2grid((1, 1), (0, 0), colspan=1)

    data = {"input_job_rate": [], "metric": [], "seed": [],
            "policy": []}
    for policy in policies:
        relevant_logfile_paths = list(reversed(prune(
            logfile_paths, v100s, p100s, k80s, policy)))
        # Each entry is (lambda, logfile path, seed). The job rate is
        # 3600 / lambda (presumably lambda is the inter-arrival time in
        # seconds, given the jobs/hr axis label — TODO confirm).
        input_job_rates = [3600.0 / lam
                           for lam, _, _ in relevant_logfile_paths]
        metrics = [metric_fn(logfile_path)
                   for _, logfile_path, _ in relevant_logfile_paths]
        seeds = [seed for _, _, seed in relevant_logfile_paths]
        # Kept under its own name so the `policies` argument being iterated
        # is not rebound from inside its own loop.
        policy_labels = [labels[policy]] * len(metrics)

        data["input_job_rate"] += input_job_rates
        data["metric"] += metrics
        data["seed"] += seeds
        data["policy"] += policy_labels

    sns.lineplot(x='input_job_rate', y='metric', style='policy',
                 hue='policy',
                 data=data, ci='sd',
                 markers=True)

    ax.set_xlabel("Input job rate (jobs/hr)")
    ax.set_ylabel(metric_label)
    ax.set_xlim([0, xmax])
    ax.set_ylim([0, ymax])
    sns.despine()

    plt.legend(bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.)

    if output_filename is not None:
        with PdfPages(output_filename) as pdf:
            pdf.savefig(bbox_inches='tight')

    plt.show()

# Plot per-worker timelines

In [7]:
# Log-line patterns, compiled once. Every fragment is a raw string so that
# regex escapes such as \d, \( and \, are not treated as (invalid) Python
# string escapes.
_SCHEDULED_JOB_RE = re.compile(
    r'(\d+\.\d+).*scheduled.*Job ID: (\d+)\t'
    r'Worker type: (.*)\tWorker ID[\(s\)]*: ([\d+\,]*\d+).*')
_SCHEDULED_JOB_PAIR_RE = re.compile(
    r'(\d+\.\d+).*scheduled.*Job ID: \((\d+), (\d+)\)\t'
    r'Worker type: (.*)\tWorker ID[\(s\)]*: ([\d+\,]*\d+).*')
_SUCCEEDED_JOB_RE = re.compile(
    r'(\d+\.\d+).*succeeded.*Job ID: (\d+)\t'
    r'Worker type: (.*)\tWorker ID: (\d+).*')
_SUCCEEDED_JOB_PAIR_RE = re.compile(
    r'(\d+\.\d+).*succeeded.*Job ID: \((\d+), (\d+)\)\t'
    r'Worker type: (.*)\tWorker ID: (\d+).*')
_CLUSTER_UTILIZATION_RE = re.compile(r'Cluster utilization: (\d+\.\d+)')


def _record_job_start(events, job_id, worker_type, worker_ids_field,
                      start_timestamp):
    """Append an open event for `job_id` to every worker listed in the field."""
    worker_ids = [int(x) for x in worker_ids_field.split(',')]
    for worker_id in worker_ids:
        events.setdefault(worker_id, []).append(
            [job_id, worker_type, start_timestamp, None])


def _record_job_end(events, job_id, worker_type, worker_id, end_timestamp):
    """Close the most recent event of `worker_id`, which must match the job."""
    last_event = events[worker_id][-1]
    assert (last_event[0] == job_id and
            last_event[1] == worker_type and
            last_event[3] is None), (
        "completion of job %r on worker %d does not match the last "
        "scheduled event %r" % (job_id, worker_id, last_event))
    last_event[3] = end_timestamp


def get_events(logfile_path):
    """Parse a simulation logfile into per-worker scheduling events.

    Args:
        logfile_path: Path of the logfile to read.

    Returns:
        A tuple `(events, utilization)`:

        * `events` maps each worker ID to a list of
          `[job_id, worker_type, start_timestamp, end_timestamp]` lists in
          log order. `job_id` is an int, or an `(int, int)` tuple when the
          log line names a pair of jobs. `end_timestamp` stays None if no
          matching "succeeded" line was seen.
        * `utilization` is the value of the last "Cluster utilization" line
          multiplied by 100, or None if the log has no such line.

    Raises:
        AssertionError: If a "succeeded" line does not match the most recent
            event recorded for its worker.
    """
    events = {}
    utilization = None
    with open(logfile_path, 'r') as f:
        # Stream the file instead of loading every line into memory.
        for line in f:
            m = _SCHEDULED_JOB_RE.match(line)
            if m is not None:
                _record_job_start(events, int(m.group(2)), m.group(3),
                                  m.group(4), float(m.group(1)))

            m = _SCHEDULED_JOB_PAIR_RE.match(line)
            if m is not None:
                _record_job_start(events,
                                  (int(m.group(2)), int(m.group(3))),
                                  m.group(4), m.group(5),
                                  float(m.group(1)))

            m = _SUCCEEDED_JOB_RE.match(line)
            if m is not None:
                worker_id = int(m.group(4))
                if worker_id not in events:
                    # Completion on a worker with no recorded start: skip.
                    continue
                _record_job_end(events, int(m.group(2)), m.group(3),
                                worker_id, float(m.group(1)))

            m = _SUCCEEDED_JOB_PAIR_RE.match(line)
            if m is not None:
                worker_id = int(m.group(5))
                if worker_id not in events:
                    continue
                _record_job_end(events,
                                (int(m.group(2)), int(m.group(3))),
                                m.group(4), worker_id, float(m.group(1)))

            m = _CLUSTER_UTILIZATION_RE.match(line)
            if m is not None:
                utilization = float(m.group(1)) * 100.

    return events, utilization

def plot_timeline(v100s, p100s, k80s, policy, end, seed,
                  output_filename=None):
    """Draw a per-worker timeline of scheduled jobs for one policy and seed.

    Reads the module-level `logfile_paths` list. One figure is produced for
    every other matching logfile, going from the largest lambda down.
    Logfiles without a "Cluster utilization" line are skipped.

    Args:
        v100s, p100s, k80s: Cluster shape used to select logfiles.
        policy: Policy identifier used to select logfiles.
        end: Right edge of the timeline; events starting at or after `end`
            are not drawn and drawn bars are clipped to `end`.
        seed: Seed used to select logfiles.
        output_filename: If given, each figure is also saved to this PDF.
            NOTE(review): the same path is reopened for every figure, so
            presumably only the last one survives — confirm intended.
    """
    relevant_logfile_paths = list(reversed(prune(
        logfile_paths, v100s, p100s, k80s, policy, seed)))
    # Only every other lambda value is plotted.
    for lamb, relevant_logfile_path in relevant_logfile_paths[::2]:
        events, utilization = get_events(relevant_logfile_path)

        if utilization is None:
            # No figure is created for this log, so there is no axes to
            # label or save; previously this path hit an undefined (or
            # stale) `ax`.
            continue

        plt.figure(figsize=(25, 6))
        ax = plt.subplot2grid((1, 1), (0, 0), colspan=1)
        print("Lambda = %.2f" % lamb)

        worker_id_to_type_mapping = {}
        for worker_id, worker_events in events.items():
            for (job_id, worker_type, start_timestamp,
                 end_timestamp) in worker_events:
                # Remember the first worker type seen for each worker.
                worker_id_to_type_mapping.setdefault(worker_id, worker_type)
                if start_timestamp >= end:
                    continue
                # A job with no recorded completion has end_timestamp None;
                # draw it up to the edge of the window rather than failing
                # in min(None, end).
                if end_timestamp is None:
                    stop_timestamp = end
                else:
                    stop_timestamp = min(end_timestamp, end)
                # Job pairs are drawn in yellow; single jobs cycle through
                # ten palette colors by job ID.
                if isinstance(job_id, tuple):
                    color = "yellow"
                else:
                    color = "C%d" % (job_id % 10)
                ax.plot([start_timestamp, stop_timestamp],
                        [worker_id, worker_id],
                        linewidth=10, c=color)
        print(worker_id_to_type_mapping)
        print("Utilization: %.3f%%" % utilization)

        ax.set_xlabel("Time")
        ax.set_ylabel("Worker ID")
        worker_ids = list(events.keys())

        ax.set_yticks(worker_ids)
        ax.set_yticklabels([str(worker_id) for worker_id in worker_ids])
        sns.despine()

        if output_filename is not None:
            with PdfPages(output_filename) as pdf:
                pdf.savefig(bbox_inches='tight')

        plt.show()

In [8]:
# Integer code for each worker type name.
# NOTE(review): the codes presumably rank GPU generations from oldest to
# newest — confirm; the visible code only compares them for inequality.
worker_type_map = {
    worker_type: code
    for code, worker_type in enumerate(('k80', 'p100', 'v100'))
}
def get_num_promotions_per_job(logfile_path):
    """Count, for every job in a logfile, how often its worker type changed.

    The scheduling events returned by `get_events` are regrouped per job
    (a job pair contributes to both of its jobs) and ordered by start time.
    Every pair of consecutive events whose worker types differ counts as
    one promotion, whichever direction the change goes.

    Returns:
        A dict mapping job ID to its number of worker-type changes.
    """
    events, _ = get_events(logfile_path)

    # job ID -> [(worker type code, start time), ...]
    timelines = {}
    for worker_events in events.values():
        for job_id, worker_type, start_time, _ in worker_events:
            type_code = worker_type_map[worker_type]
            member_ids = job_id if type(job_id) is tuple else (job_id,)
            for member_id in member_ids:
                timelines.setdefault(member_id, []).append(
                    (type_code, start_time))

    num_promotions_per_job = {}
    for job_id, timeline in timelines.items():
        timeline.sort(key=lambda entry: entry[1])
        type_codes = [type_code for type_code, _ in timeline]
        num_promotions_per_job[job_id] = sum(
            1 for previous, current in zip(type_codes, type_codes[1:])
            if current != previous)
    return num_promotions_per_job

In [15]:
def sweep_get_num_promotions_per_job(relevant_logfile_paths):
    """Print promotion statistics for every logfile in a sweep.

    `relevant_logfile_paths` is an iterable of `(lambda, logfile_path, seed)`
    tuples. For each one, the mean and standard deviation of the per-job
    promotion counts are printed on a single line.
    """
    report_format = ('Lambda=%f, seed=%d: Average number of promotions: %f '
                     '(stddev %f)')
    for lam, logfile_path, seed in relevant_logfile_paths:
        promotion_counts = list(
            get_num_promotions_per_job(logfile_path).values())
        mean_promotions = np.mean(promotion_counts)
        stddev_promotions = np.std(promotion_counts)
        print(report_format % (lam, seed, mean_promotions,
                               stddev_promotions))

In [16]:
# Index every logfile under this directory (presumably the single-GPU-job
# runs, going by the directory name). The plotting functions read this
# module-level list.
logfile_paths = sorted(get_logfile_paths(
    "/lfs/1/deepak/gpusched/scheduler/logs/multigpu_support_singlegpu/"))

In [17]:
# Promotion statistics for the 'fifo' policy on a cluster of 8 V100s,
# 8 P100s and 8 K80s. prune() returns entries sorted by ascending lambda,
# so reversing reports the largest lambda first.
relevant_logfile_paths = prune(logfile_paths, 8, 8, 8, 'fifo')
relevant_logfile_paths.reverse()
sweep_get_num_promotions_per_job(relevant_logfile_paths)

Lambda=86400.000000, seed=0: Average number of promotions: 0.000000 (stddev 0.000000)
Lambda=43200.000000, seed=0: Average number of promotions: 0.019400 (stddev 0.297697)
Lambda=28800.000000, seed=0: Average number of promotions: 0.188200 (stddev 1.172681)
Lambda=21600.000000, seed=0: Average number of promotions: 0.559888 (stddev 2.148753)
Lambda=17280.000000, seed=0: Average number of promotions: 1.250350 (stddev 3.666936)
Lambda=14400.000000, seed=0: Average number of promotions: 2.010180 (stddev 5.068089)
Lambda=12342.857143, seed=0: Average number of promotions: 2.795613 (stddev 6.485224)
Lambda=10800.000000, seed=0: Average number of promotions: 3.237423 (stddev 7.684556)
Lambda=9600.000000, seed=0: Average number of promotions: 2.021961 (stddev 6.363122)
Lambda=8640.000000, seed=0: Average number of promotions: 0.202824 (stddev 1.851756)


In [18]:
# Promotion statistics for the 'fifo_perf' policy on a cluster of 8 V100s,
# 8 P100s and 8 K80s. prune() returns entries sorted by ascending lambda,
# so reversing reports the largest lambda first.
relevant_logfile_paths = prune(logfile_paths, 8, 8, 8, 'fifo_perf')
relevant_logfile_paths.reverse()
sweep_get_num_promotions_per_job(relevant_logfile_paths)

Lambda=86400.000000, seed=0: Average number of promotions: 0.000000 (stddev 0.000000)
Lambda=43200.000000, seed=0: Average number of promotions: 0.007600 (stddev 0.086846)
Lambda=28800.000000, seed=0: Average number of promotions: 0.076200 (stddev 0.265318)
Lambda=21600.000000, seed=0: Average number of promotions: 0.201960 (stddev 0.410330)
Lambda=17280.000000, seed=0: Average number of promotions: 0.386813 (stddev 0.514938)
Lambda=14400.000000, seed=0: Average number of promotions: 0.561302 (stddev 0.576285)
Lambda=12342.857143, seed=0: Average number of promotions: 0.715653 (stddev 0.636623)
Lambda=10800.000000, seed=0: Average number of promotions: 0.906070 (stddev 0.717606)
Lambda=9600.000000, seed=0: Average number of promotions: 1.052423 (stddev 0.757037)
Lambda=8640.000000, seed=0: Average number of promotions: 1.128899 (stddev 0.788392)


In [19]:
# Rebind the module-level logfile index to a different log directory
# (presumably the multi-GPU-job runs, going by the directory name). Cells
# below this point operate on these logs.
logfile_paths = sorted(get_logfile_paths(
    "/lfs/1/deepak/gpusched/scheduler/logs/multigpu_support_multigpu/"))

In [20]:
# Promotion statistics for the 'fifo' policy (8 V100s, 8 P100s, 8 K80s)
# using whatever `logfile_paths` currently holds, largest lambda first.
relevant_logfile_paths = prune(logfile_paths, 8, 8, 8, 'fifo')
relevant_logfile_paths.reverse()
sweep_get_num_promotions_per_job(relevant_logfile_paths)

Lambda=50400.000000, seed=0: Average number of promotions: 0.357186 (stddev 1.355964)
Lambda=25200.000000, seed=0: Average number of promotions: 2.606296 (stddev 7.821046)
Lambda=16800.000000, seed=0: Average number of promotions: 8.559072 (stddev 16.669405)
Lambda=12600.000000, seed=0: Average number of promotions: 4.936767 (stddev 5.001636)
Lambda=10080.000000, seed=0: Average number of promotions: 4.555312 (stddev 3.691781)
Lambda=8400.000000, seed=0: Average number of promotions: 4.345918 (stddev 2.939424)


In [21]:
# Promotion statistics for the 'fifo_perf' policy (8 V100s, 8 P100s,
# 8 K80s) using whatever `logfile_paths` currently holds, largest lambda
# first.
relevant_logfile_paths = prune(logfile_paths, 8, 8, 8, 'fifo_perf')
relevant_logfile_paths.reverse()
sweep_get_num_promotions_per_job(relevant_logfile_paths)

Lambda=50400.000000, seed=0: Average number of promotions: 0.159936 (stddev 0.462952)
Lambda=25200.000000, seed=0: Average number of promotions: 0.590066 (stddev 0.896230)
Lambda=16800.000000, seed=0: Average number of promotions: 1.378150 (stddev 1.389823)
Lambda=12600.000000, seed=0: Average number of promotions: 1.997406 (stddev 1.692403)
Lambda=10080.000000, seed=0: Average number of promotions: 1.683371 (stddev 1.761532)
Lambda=8400.000000, seed=0: Average number of promotions: 1.476298 (stddev 1.760175)
