# Distributed Scheduling
This lablet introduces the performance gains and overheads of distributed scheduling policies in various scenarios. Distributed scheduling can be implemented with many goals in mind, such as load balancing or efficient job placement. In this lablet we focus on load balancing and evaluate the performance of the sender-initiated and receiver-initiated policies described below. A distributed scheduling policy has the following components:
* **Transfer policy:** when to transfer a process?
* **Selection policy:** which process to transfer?
* **Location policy:** where to transfer the process?

We explore two policies on a set of servers, each with a task queue where the jobs wait until they can run:
### Sender-Initiated
In this policy, overloaded servers (servers whose task queue length is above a specified threshold) try to decrease job waiting time by transferring jobs to other servers. The job at the end of the queue is selected for transfer, and a server is chosen at random to poll. The polled server accepts the new load (job) if its queue length is below the threshold; otherwise, another server is polled, up to the search limit.
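A minimal sketch of a single sender-initiated transfer attempt, mapping the transfer, selection, and location policies onto code. It assumes queues are plain Python lists indexed by server ID and counts only poll requests as messages; the function `sender_initiated_transfer` is hypothetical and is not the `src.simulate` implementation.

```python
import random


def sender_initiated_transfer(queues, sender, threshold, search_limit, rng=random):
    """Try to push one job from an overloaded `sender` to another server.

    queues: list of per-server job lists (hypothetical representation).
    Returns (messages_sent, migrated) for this attempt.
    """
    # Transfer policy: act only when the sender's queue is above the threshold.
    if len(queues[sender]) <= threshold:
        return 0, False

    # Location policy: poll other servers in random order, at most `search_limit`.
    candidates = [s for s in range(len(queues)) if s != sender]
    rng.shuffle(candidates)
    messages = 0
    for target in candidates[:search_limit]:
        messages += 1  # one poll request per probed server (replies not counted here)
        if len(queues[target]) < threshold:
            # Selection policy: move the job at the end of the sender's queue.
            queues[target].append(queues[sender].pop())
            return messages, True
    return messages, False
```

Passing a seeded `random.Random` as `rng` makes an attempt reproducible, which is handy when comparing policies on identical inputs.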

### Receiver-Initiated
In this policy, underloaded servers (servers whose task queue length is below a specified threshold) try to increase their own load by borrowing jobs from other servers. A server is chosen at random and polled to ask whether it has a job to send. The polled server sends a job if its queue length is above the threshold; otherwise, another server is polled, up to the search limit.
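The receiver-initiated counterpart can be sketched the same way. Again the list-based queues, the message counting, and the name `receiver_initiated_transfer` are hypothetical; the sketch also assumes the polled server hands over the job at the end of its queue, which the description above does not specify.

```python
import random


def receiver_initiated_transfer(queues, receiver, threshold, search_limit, rng=random):
    """Try to pull one job into an underloaded `receiver`.

    Returns (messages_sent, migrated) for this attempt.
    """
    # Transfer policy: act only when the receiver's queue is below the threshold.
    if len(queues[receiver]) >= threshold:
        return 0, False

    # Location policy: poll other servers in random order, at most `search_limit`.
    candidates = [s for s in range(len(queues)) if s != receiver]
    rng.shuffle(candidates)
    messages = 0
    for source in candidates[:search_limit]:
        messages += 1  # one poll request per probed server
        if len(queues[source]) > threshold:
            # Selection policy (assumed): take the job at the end of the source queue.
            queues[receiver].append(queues[source].pop())
            return messages, True
    return messages, False
```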



## Scenario
Consider a cluster of $N$ servers connected through a network. We explore the effect of different policies on the total number of messages sent between servers and the total number of job migrations.
- **Part 0**: Describe the Cluster Simulation Input and Output
- **Part 1**: Evaluate the effect of policy under different utilizations
- **Part 2**: Evaluate the effect of queue threshold
- **Part 3**: Evaluate the effect of search limit
- **Part 4**: Feedback forms

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from src.simulate import *

## Part 0: Cluster Simulator

### Simulator Components:
- **Cluster**: A cluster is a collection of servers. In this setting, we assume that the cluster's servers are homogeneous and fully connected.

- **Server**: A server represents a processing entity. Each server follows a policy and has a queue threshold. It holds a queue to which tasks are submitted so as to produce the expected utilization.
- **Task**: A task is a schedulable entity. A task can be in one of three states (QUEUED, RUN, and DONE). Tasks can be moved between servers.
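The task life cycle above can be sketched as a small state machine. The `Task` and `TaskState` classes here are hypothetical illustrations, not the simulator's classes, and the rule that only QUEUED tasks may migrate is an assumption (both policies move jobs out of a waiting queue).

```python
from enum import Enum, auto


class TaskState(Enum):
    QUEUED = auto()
    RUN = auto()
    DONE = auto()


# Allowed transitions in this sketch: a task waits, runs, then finishes.
TRANSITIONS = {
    TaskState.QUEUED: {TaskState.RUN},
    TaskState.RUN: {TaskState.DONE},
    TaskState.DONE: set(),
}


class Task:
    def __init__(self, task_id, server):
        self.task_id = task_id
        self.server = server  # ID of the server currently holding the task
        self.state = TaskState.QUEUED

    def advance(self, new_state):
        if new_state not in TRANSITIONS[self.state]:
            raise ValueError(f"illegal transition {self.state.name} -> {new_state.name}")
        self.state = new_state

    def migrate(self, new_server):
        # Assumption: only tasks still waiting in a queue can move between servers.
        if self.state is not TaskState.QUEUED:
            raise ValueError("only QUEUED tasks can be migrated")
        self.server = new_server
```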

### Starting a simulation instance
> run_simulation(servers_number, simulation_time, utilization, policy, search_limit, queue_threshold)
- **servers_number (+ve int):** The cluster size (number of servers).
- **simulation_time (+ve int, > 300):** The experiment length in seconds (the experiment time should be long enough to reach steady state).
- **utilization (float (0, 1)):** Expected utilization value, i.e., the average utilization of the nodes in the cluster.
- **policy (str (sender|receiver)):** The policy to apply.
- **search_limit (int):** Maximum number of servers probed per transfer attempt.
- **queue_threshold (int):** Queue length that triggers the policy (try values no larger than 6).
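The documented ranges can be checked before launching a long run. `validate_simulation_args` is a hypothetical helper, not part of `src.simulate`; the lower bounds for `search_limit` (1) and `queue_threshold` (0) are taken from the ranges explored in Parts 2 and 3.

```python
import warnings


def validate_simulation_args(servers_number, simulation_time, utilization,
                             policy, search_limit, queue_threshold):
    """Hypothetical pre-flight check mirroring the documented parameter ranges."""
    if not isinstance(servers_number, int) or servers_number <= 0:
        raise ValueError("servers_number must be a positive int")
    if not isinstance(simulation_time, int) or simulation_time <= 300:
        raise ValueError("simulation_time must be an int greater than 300")
    if not 0 < utilization < 1:
        raise ValueError("utilization must be in the open interval (0, 1)")
    if policy not in ("sender", "receiver"):
        raise ValueError("policy must be 'sender' or 'receiver'")
    if not isinstance(search_limit, int) or search_limit < 1:
        raise ValueError("search_limit must be an int >= 1")
    if not isinstance(queue_threshold, int) or queue_threshold < 0:
        raise ValueError("queue_threshold must be a non-negative int")
    if queue_threshold > 6:
        # Allowed, but outside the recommended range.
        warnings.warn("queue_threshold values above 6 are not recommended")
```

For example, `validate_simulation_args(5, 1000, 0.8, "sender", 3, 2)` returns `None`, while passing `"random"` as the policy raises `ValueError`.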

### Output
The simulation returns a Pandas Dataframe with the following columns:
* **server**: Server ID
* **total_tasks**: Total number of tasks processed by the server
* **total_time**: Total processing time of the server
* **messages**: Total number of messages sent by the server
* **migrations**: Total number of migrations initiated by the server
* **queue_length**: Average queue length
* **utilization**: Expected utilization configured for the experiment
* **policy**: Policy used in the experiment
* **effective_utlization**: Actual utilization (total_time/simulation_time)
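Per-server rows are typically summed into cluster-wide figures before plotting. A minimal stdlib sketch, assuming each row is a plain dict keyed by the column names above; `summarize` is a hypothetical helper. With the real DataFrame, `df['messages'].sum()` and `df['total_time'] / simulation_time` do the same job.

```python
def summarize(rows, simulation_time):
    """Aggregate per-server result rows into cluster-wide figures.

    rows: iterable of dicts with 'messages', 'migrations' and 'total_time'
    keys (a stand-in for the rows of the returned DataFrame).
    """
    rows = list(rows)
    total_messages = sum(r["messages"] for r in rows)
    total_migrations = sum(r["migrations"] for r in rows)
    # Per-server effective utilization = busy time / simulated time.
    effective = [r["total_time"] / simulation_time for r in rows]
    return {
        "total_messages": total_messages,
        "total_migrations": total_migrations,
        "mean_effective_utilization": sum(effective) / len(effective),
    }
```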

### Example:

In [None]:
run_simulation(5, 1000, 0.8, "sender", 3, 2)

__NOTE: Once you run a simulation or set of simulations, please don't stop the kernel/notebook, as it will crash and you will need to re-run everything from the beginning.__

## Running Experiments
We provide a function called `run_experiment()`, a wrapper around `run_simulation()` that caches results so that a repeated experiment is not re-run. You will use this function for the rest of the lablet.

In [None]:
# common configurations. Please don't change.
servers_number = 5
simulation_time = 1000

In [None]:
# List of colors used for plotting. Please don't change.
colors = ['tab:blue', 'tab:orange', 'tab:green', 'tab:red', 'tab:purple', 'tab:brown', 'tab:pink', 'tab:gray', 'tab:olive', 'tab:cyan']

In [None]:
cache = {}

In [None]:
def run_experiment(servers_number, simulation_time, utilization, policy, search_limit, queue_threshold):
    # Memoize results: an identical configuration reuses the cached DataFrame.
    key = (servers_number, simulation_time, utilization, policy, search_limit, queue_threshold)
    if key not in cache:
        cache[key] = run_simulation(*key)
    return cache[key]

### Plotting function

In [None]:
def plot(x, sender_results, receiver_results, utilization, ax0, ax1, color):
    # Cluster-wide totals per experiment: sum the per-server counters of each DataFrame.
    sender_total_messages = [df['messages'].sum() for df in sender_results[:len(x)]]
    receiver_total_messages = [df['messages'].sum() for df in receiver_results[:len(x)]]
    sender_total_migrations = [df['migrations'].sum() for df in sender_results[:len(x)]]
    receiver_total_migrations = [df['migrations'].sum() for df in receiver_results[:len(x)]]

    if utilization is None:
        sender_label = 'Sender-initiated'
        receiver_label = 'Receiver-initiated'
    else:
        sender_label = f'Sender-initiated, u = {utilization}'
        receiver_label = f'Receiver-initiated, u = {utilization}'

    # Solid lines: sender-initiated; dashed lines: receiver-initiated.
    ax0.plot(x, sender_total_messages, label=sender_label, color=color, linestyle='solid')
    ax0.plot(x, receiver_total_messages, label=receiver_label, color=color, linestyle='dashed')

    ax1.plot(x, sender_total_migrations, label=sender_label, color=color, linestyle='solid')
    ax1.plot(x, receiver_total_migrations, label=receiver_label, color=color, linestyle='dashed')

## Part 1: Effect of utilization ratio
This scenario evaluates the effect of utilization under the sender-initiated and receiver-initiated policies. It introduces the trade-offs between the two policies and shows how the system behaves as the load changes.

> For this part you will be given some tested configurations. Just run the code cell.

The __utilization ratio__ denotes the expected average utilization of the nodes in the cluster. For instance, a utilization value of 0.5 means that a server's CPU is busy 50% of the time and idle the other 50%.

The __search limit__ denotes the maximum number of other servers a node may poll when trying to send (sender-initiated policy) or receive (receiver-initiated policy) a job.

The __queue threshold__ denotes the number of jobs in the queue above or below which the policies are triggered (see the description of these policies at the beginning of the notebook).
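The utilization ratio defined above relates to load through the utilization law, $U = \lambda \cdot E[S]$, where $\lambda$ is the task arrival rate and $E[S]$ the mean service time. Assuming the simulator's arrivals and service times are stationary (how `src.simulate` actually generates load is not documented here), a server's `total_time` over a run should, averaged over the cluster, come out near `utilization * simulation_time`, which is what the effective utilization column measures. Both helper names below are hypothetical.

```python
def arrival_rate_for(utilization, mean_service_time):
    """Arrivals per second needed to reach a target utilization (U = lambda * E[S])."""
    if not 0 < utilization < 1:
        raise ValueError("utilization must be in (0, 1)")
    return utilization / mean_service_time


def expected_busy_time(utilization, simulation_time):
    """Expected busy time (total_time) of an average server over the whole run."""
    return utilization * simulation_time
```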

In [None]:
sender_utilization_results = []
receiver_utilization_results = []

#################################
# Configurations
utilizations = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.99]
search_limit = 5
queue_threshold = 1
#################################

total_experiments = len(utilizations)
exp = 1
for utilization in utilizations:
    print(f'Experiment {exp} of {total_experiments}, utilization ratio: {utilization}')
    df = run_experiment(servers_number=servers_number, simulation_time=simulation_time, utilization=utilization,
                             policy="sender", search_limit=search_limit, queue_threshold=queue_threshold)
    sender_utilization_results.append(df)
    
    df = run_experiment(servers_number=servers_number, simulation_time=simulation_time, utilization=utilization,
                             policy="receiver", search_limit=search_limit, queue_threshold=queue_threshold)
    receiver_utilization_results.append(df)
    
    exp += 1

In [None]:
fig, (ax0, ax1) = plt.subplots(1, 2, figsize=(15, 5))
plot(utilizations, sender_utilization_results, receiver_utilization_results,
     None, ax0, ax1, 'tab:blue')
ax0.set_xlabel('Utilization Ratio')
ax0.set_ylabel('Total Messages')
ax0.legend()

ax1.set_xlabel('Utilization Ratio')
ax1.set_ylabel('Total Migrations')
ax1.legend()

## Part 1 Questions (25 Points):

#### Answer for both sender-initiated and receiver-initiated policies.

#### What is the effect of increasing the utilization ratio from 0 to 1 on the total number of messages across the cluster? Why?
*Your answer here.*

#### What is the effect of increasing the utilization ratio from 0 to 1 on the total migrations across the cluster?  Why?
*Your answer here.*

## Part 2: Effect of queue threshold

This scenario evaluates the effect of the queue threshold under the sender-initiated and receiver-initiated policies across utilizations.


In [None]:
sender_qt_results = {}
receiver_qt_results = {}
#################################
# Test with different values and see the effect in behavior
utilizations = []    #  <- fill in with utilization values (try not giving more than 10 values)
thresholds = []      #  <- fill in with queue threshold values
search_limit =       #  <- fill in with an integer value for search limit
#################################

total_experiments = len(utilizations) * len(thresholds)
exp = 1
for utilization in utilizations:
    sender_qt_results[utilization] = []
    receiver_qt_results[utilization] = []
    for threshold in thresholds:
        print(f'Experiment {exp} of {total_experiments}, queue threshold: {threshold}, '
              f'utilization ratio: {utilization}')
        df = run_experiment(servers_number=servers_number, simulation_time=simulation_time, utilization=utilization,
                                 policy="sender", search_limit=search_limit, queue_threshold=threshold)
        sender_qt_results[utilization].append(df)

        df = run_experiment(servers_number=servers_number, simulation_time=simulation_time, utilization=utilization,
                                 policy="receiver", search_limit=search_limit, queue_threshold=threshold)
        receiver_qt_results[utilization].append(df)
        
        exp += 1

In [None]:
# You don't need to change this part
fig, (ax0, ax1) = plt.subplots(1, 2, figsize=(15, 5))
colors_slice = colors[0:len(utilizations)]  # one color per utilization value
for i in range(len(utilizations)):
    plot(thresholds, sender_qt_results[utilizations[i]], receiver_qt_results[utilizations[i]],
         utilizations[i], ax0, ax1, colors_slice[i])
    
ax0.set_xlabel('Queue Threshold')
ax0.set_ylabel('Total Messages')
ax0.legend()

ax1.set_xlabel('Queue Threshold')
ax1.set_ylabel('Total Migrations')
ax1.legend()

## Part 2 Questions (25 Points):

#### Answer for both sender-initiated and receiver-initiated policies.

#### What is the effect of increasing the queue threshold from 0 upwards on the total number of messages across the cluster? Why?
*Your answer here.*

#### What is the effect of increasing the queue threshold from 0 upwards on the total migrations across the cluster?  Why?
*Your answer here.*

## Part 3: Effect of search limit

This scenario evaluates the effect of the search limit under the sender-initiated and receiver-initiated policies across utilizations.

In [None]:
sender_sl_results = {}
receiver_sl_results = {}
#################################
# Test with different values and see the effect in behavior
utilizations = []    #  <- fill in with utilization values (try not giving more than 10 values)
search_limits = []   #  <- fill in with search limit values
queue_threshold =    #  <- fill in with an integer value for queue threshold
#################################

total_experiments = len(utilizations) * len(search_limits)
exp = 1
for utilization in utilizations:
    sender_sl_results[utilization] = []
    receiver_sl_results[utilization] = []
    for limit in search_limits:
        print(f'Experiment {exp} of {total_experiments}, search limit: {limit}, '
              f'utilization ratio: {utilization}')
        df = run_experiment(servers_number=servers_number, simulation_time=simulation_time, utilization=utilization,
                                 policy="sender", search_limit=limit, queue_threshold=queue_threshold)
        sender_sl_results[utilization].append(df)

        df = run_experiment(servers_number=servers_number, simulation_time=simulation_time, utilization=utilization,
                                 policy="receiver", search_limit=limit, queue_threshold=queue_threshold)
        receiver_sl_results[utilization].append(df)
        
        exp += 1

In [None]:
# You don't need to change this part
fig, (ax0, ax1) = plt.subplots(1, 2, figsize=(15, 5))
colors_slice = colors[0:len(utilizations)]
for i in range(len(utilizations)):
    plot(search_limits, sender_sl_results[utilizations[i]], receiver_sl_results[utilizations[i]],
         utilizations[i], ax0, ax1, colors_slice[i])
    
ax0.set_xlabel('Search Limit')
ax0.set_ylabel('Total Messages')
ax0.legend()

ax1.set_xlabel('Search Limit')
ax1.set_ylabel('Total Migrations')
ax1.legend()

## Part 3 Questions (25 Points):

#### Answer for both sender-initiated and receiver-initiated policies.

#### What is the effect of increasing the search limit from 1 upwards on the total number of messages across the cluster? Why?
*Your answer here.*

#### What is the effect of increasing the search limit from 1 upwards on total migrations across the cluster? Why?
*Your answer here.*

## Part 4: Feedback (25 Points):

Please fill out the feedback form at https://docs.google.com/forms/d/1QALcmsoD73nB5WKGAXqJ4Omv1mCPPhonH2tfIsGF7NU