In [1]:
import os
from typing import List
import numpy as np

In [2]:
# Datacentre kinds (values stored in datacentre.dc_type).
DC_CLOUD = 0
DC_EDGE = 1

# Dataset visibility (values stored in dataset.ds_type).
DS_PUBLIC = 0
DS_PRIVATE = 1

# Dataset origin (values stored in dataset.input_ds_type):
# an initial input dataset vs. one generated by a task.
DS_INIT = 0
DS_GEN = 1

# Problem dimensions of the example workflow.
# NUM_DATASETS is the particle length used when generating the population;
# NUM_TOTAL_DATASETS sizes the dataset -> datacentre map in place_tasks and
# matches the six datasets created in initialize_ds (five initial + one generated).
NUM_DATACENTRES = 3
NUM_DATASETS = 5
NUM_TOTAL_DATASETS = 6
NUM_TASKS = 5

In [3]:
class datacentre:
    """A storage site described by its kind, capacity and identifier.

    Attributes:
        dc_type:  kind of datacentre, stored as given.
        capacity: storage capacity, stored as given.
        index:    identifier of the datacentre, stored as given.
    """

    def __init__(self, dc_type: int, capacity: int, index: int):
        self.dc_type, self.capacity, self.index = dc_type, capacity, index

    def __str__(self):
        # The textual form is only the capacity.
        return "{}".format(self.capacity)

class dataset:
    """A dataset together with its placement and classification.

    All constructor arguments are stored unchanged on the instance.
    In this notebook, datasets without a fixed location are created with
    ``lc``/``flc`` set to -1 and ``gt`` set to None.
    """

    def __init__(self, size: int, gt: int, lc: datacentre, flc: datacentre,
                 ds_type: int, input_ds_type: int, index: int):
        self.index = index
        self.size = size
        self.ds_type = ds_type              # public or private
        self.input_ds_type = input_ds_type  # input or generated dataset
        self.gt = gt                        # generating task
        self.lc = lc                        # datacentre location
        self.flc = flc                      # datacentre final location

    def __str__(self):
        # Size immediately followed by the visibility flag, no separator.
        return "{}{}".format(self.size, self.ds_type)
        
        
class task:
    """A workflow task with its input datasets, output datasets and identifier."""

    def __init__(self, ids: List[int] , ods: List[int], index: int):
        self.index = index
        self.ids = ids  # input datasets
        self.ods = ods  # output datasets

    def __str__(self):
        # Inputs and outputs, separated by a single space.
        return " ".join([str(self.ids), str(self.ods)])
            

In [213]:
# Module-level state shared by the functions in this notebook.
# datacentres, datasets and tasks are appended to by initialize_dc/ds/tasks;
# task_dependencies and bandwidth are rebound (via `global`) by
# initialize_task_dep and initialize_bandwidth.
datacentres = []
datasets = []
tasks = []
task_dependencies = []
bandwidth = []

def initialize_dc():
    """Append the three example datacentres to the global ``datacentres`` list.

    One cloud datacentre (capacity 10000, index 1) followed by two edge
    datacentres (capacity 20, indices 2 and 3).
    """
    specs = (
        (DC_CLOUD, 10000, 1),
        (DC_EDGE, 20, 2),
        (DC_EDGE, 20, 3),
    )
    datacentres.extend([datacentre(kind, capacity, number)
                        for kind, capacity, number in specs])
    
def initialize_ds():
    """Append the six example datasets to the global ``datasets`` list.

    Datasets 1-3 are public initial datasets with no fixed location (-1),
    datasets 4 and 5 are private and pinned to ``datacentres[1]`` and
    ``datacentres[2]`` respectively, and dataset 6 is a public generated
    dataset. ``datacentres`` must already be populated.
    """
    pinned_a = datacentres[1]
    pinned_b = datacentres[2]
    # Each row is the positional argument list of dataset(...):
    # size, gt, lc, flc, ds_type, input_ds_type, index
    rows = [
        (3, None, -1, -1, DS_PUBLIC, DS_INIT, 1),
        (5, None, -1, -1, DS_PUBLIC, DS_INIT, 2),
        (3, None, -1, -1, DS_PUBLIC, DS_INIT, 3),
        (3, None, pinned_a, pinned_a, DS_PRIVATE, DS_INIT, 4),
        (5, None, pinned_b, pinned_b, DS_PRIVATE, DS_INIT, 5),
        (8, None, -1, -1, DS_PUBLIC, DS_GEN, 6),
    ]
    datasets.extend([dataset(*row) for row in rows])
    

def initialize_tasks():
    """Append the five example tasks to the global ``tasks`` list.

    Tasks are numbered 1..5 in order. Only task 5 has an output
    (``datasets[5]``), which tasks 2, 3 and 4 consume as an input.
    ``datasets`` must already be populated.
    """
    ds = datasets
    # (input datasets, output datasets) per task, in task-number order.
    wiring = [
        ([ds[0], ds[1]], []),
        ([ds[0], ds[1], ds[5]], []),
        ([ds[0], ds[1], ds[2], ds[5]], []),
        ([ds[2], ds[3], ds[5]], []),
        ([ds[4]], [ds[5]]),
    ]
    tasks.extend([task(inputs, outputs, number)
                  for number, (inputs, outputs) in enumerate(wiring, start=1)])
    

def initialize_task_dep():
    """Rebuild the global ``task_dependencies`` adjacency list.

    One empty list per task (0-based), then entry 4 is given the
    successors 1, 2 and 3.
    """
    global task_dependencies
    adjacency = []
    for _ in range(NUM_TASKS):
        adjacency.append([])
    task_dependencies = adjacency
    task_dependencies[4] += [1, 2, 3]
    
    
def initialize_bandwidth():
    """Rebuild the global ``bandwidth`` matrix (0-based, symmetric).

    The matrix is NUM_DATACENTRES x NUM_DATACENTRES, zero-filled, with the
    three pairwise link values written in both directions.
    """
    global bandwidth
    bandwidth = [[0 for _ in range(NUM_DATACENTRES)]
                 for _ in range(NUM_DATACENTRES)]
    links = (
        (0, 1, 10),
        (0, 2, 20),
        (1, 2, 150),
    )
    for src, dst, rate in links:
        bandwidth[src][dst] = rate
        bandwidth[dst][src] = rate
    
# Build the example problem. Order matters for the first three calls:
# initialize_ds reads datacentres[1] and datacentres[2], and
# initialize_tasks reads entries of datasets.
initialize_dc()
initialize_ds()
initialize_tasks()
initialize_task_dep()
initialize_bandwidth()

In [160]:
# Topologically sort the tasks
# Place each task in the datacentre with minimal transmission time
# After placing a task, place its output datasets in the same datacentre
# Calculate Ttotal

    
# A recursive function used by topologicalSort
def topologicalSortUtil(v,visited,stack):
    """Depth-first visit of node ``v`` for the topological sort.

    Marks ``v`` as visited, prints and recurses into every not-yet-visited
    entry of ``task_dependencies[v]``, then puts ``v`` at the front of
    ``stack``. Both ``visited`` and ``stack`` are modified in place.
    """
    visited[v] = True

    for successor in task_dependencies[v]:
        print(successor)
        if not visited[successor]:
            topologicalSortUtil(successor, visited, stack)

    # Prepend, so a node ends up ahead of everything reached from it.
    stack[:0] = [v]

def topologicalSort():
    """Return the task indices (0-based) in topological order.

    Starts a depth-first visit from every task not reached so far, prints
    the resulting order and returns it.
    """
    seen = [False for _ in range(NUM_TASKS)]
    order = []

    for node in range(NUM_TASKS):
        if seen[node]:
            continue
        topologicalSortUtil(node, seen, order)

    print(order)
    return order
        
# 0-based task indices in topological order; place_tasks walks tasks in this order.
sorted_tasks = topologicalSort()

1
2
3
[4, 3, 2, 1, 0]


In [179]:
def task_placement_feasibility(task, task_datacentre, datacentre_usage):
    """Charge a task's datasets to a datacentre and check its capacity.

    Walks the task's input datasets and then its output datasets, adding
    each size to ``datacentre_usage[task_datacentre.index - 1]`` (updated in
    place). Stops at the first dataset that would push usage above
    ``task_datacentre.capacity``.

    Returns:
        ``(True, None)`` when everything fits, otherwise
        ``(False, task_datacentre)``. Usage added before a failure is kept.
    """
    for ds in list(task.ids) + list(task.ods):
        slot = task_datacentre.index - 1
        print("Dataset is " + str(ds.index) + " and size is " + str(ds.size))

        projected = datacentre_usage[slot] + ds.size
        if projected > task_datacentre.capacity:
            return False, task_datacentre

        datacentre_usage[slot] += ds.size
        print("Usage of data centre " + str(task_datacentre.index) + " increased to "
              + str(datacentre_usage[slot]))

    return True, None

In [180]:
def place_tasks(particle, datacentre_usage):
    """Assign every task to a datacentre and record its transmission time.

    Tasks are processed in the order given by the global ``sorted_tasks``.
    A task with a private input dataset is placed in that dataset's
    datacentre (``lc``); if several inputs are private, the last one wins.
    Any other task is placed in the datacentre with the smallest total
    transmission time for its inputs. The task's output datasets are then
    recorded as residing in the chosen datacentre, and the placement is
    checked against the storage capacity with ``task_placement_feasibility``.

    Args:
        particle: datacentre index (1-indexed) for each initial dataset;
            ``particle[i]`` is the location of the dataset with index i+1.
        datacentre_usage: per-datacentre usage counters (0-based list),
            updated in place by ``task_placement_feasibility``.

    Returns:
        ``(True, transmission_times, task_datacentres, None)`` on success,
        where both lists are indexed by ``task.index - 1``; or
        ``(False, -1, [], invalid_datacentre)`` as soon as a placement
        violates a datacentre's capacity.
    """
    #Map from dataset to datacentre index (1 indexed)
    dataset_to_datacenter = [0] * NUM_TOTAL_DATASETS

    #Copy initial particle dataset to datacentre map to this map
    for i in range(len(particle)):
        dataset_to_datacenter[i] = particle[i]
    print(dataset_to_datacenter)

    task_datacentres = [0] * NUM_TASKS
    transmission_times = [0] * NUM_TASKS

    #Iterate through tasks in topologically sorted order
    for i in range(NUM_TASKS):
        current_task = tasks[sorted_tasks[i]]
        chosen_dc = None

        #Check if the task requires any private dataset
        has_private_input = False
        for ids in current_task.ids:
            if ids.ds_type == DS_PRIVATE:
                chosen_dc = ids.lc
                has_private_input = True
                print("Private ids " + str(ids.index) + " found in datacenter " +
                      str(dataset_to_datacenter[ids.index-1]))

        if has_private_input:
            #Private dataset required for this task, hence the task is placed in
            # the datacentre in which the private dataset is present
            transmission_time_i = 0

            #Every input that resides elsewhere has to be transmitted here
            for ids in current_task.ids:
                print("Input dataset " + str(ids.index))
                idc = dataset_to_datacenter[ids.index-1]
                print("Input dataset resides in " + str(idc))
                # NOTE(review): idc is 0 for a dataset that has not been placed
                # yet; idc-1 would then wrap to the last bandwidth row.
                if idc != chosen_dc.index:
                    transmission_time_i += ids.size * 1024 / (bandwidth[idc-1][chosen_dc.index-1])
                    print("Transmitting dataset " + str(ids.index) +  " to datacentre " + str(chosen_dc.index))

        else:
            #No private dataset among the inputs: try every datacentre and keep
            # the one with minimal transmission time. Starting from infinity
            # guarantees a datacentre is selected however large the times are
            # (a fixed cap left chosen_dc as None when every time exceeded it).
            transmission_time_i = float("inf")

            for j in range(NUM_DATACENTRES):
                task_dc = datacentres[j]
                transmission_time_dcj = 0
                print("Placing task in datacentre " + str(task_dc.index))

                #Every input that resides elsewhere has to be transmitted here
                for ids in current_task.ids:
                    print("Input dataset " + str(ids.index))
                    idc = dataset_to_datacenter[ids.index-1]
                    print("Input dataset resides in " + str(idc))
                    if idc != task_dc.index:
                        transfer = ids.size * 1024 / (bandwidth[idc-1][task_dc.index-1])
                        transmission_time_dcj += transfer
                        print("Transmitting dataset " + str(ids.index) +  " to datacentre " + str(task_dc.index)
                              + " in " + str(transfer))

                print("Trans time for task " + str(current_task.index) + " in datacentre " + str(j+1) + " is "
                      + str(transmission_time_dcj))

                #Record the datacentre which produces minimal transmission time
                if transmission_time_dcj < transmission_time_i:
                    print("Here " + str(task_dc.index))
                    transmission_time_i = transmission_time_dcj
                    chosen_dc = task_dc

        #Output datasets live in the chosen datacentre. The map is 1-indexed in
        # both branches, so the datacentre index is stored unchanged.
        for ods in current_task.ods:
            dataset_to_datacenter[ods.index-1] = chosen_dc.index
            print("Adding output dataset " + str(ods.index) + " to datacentre " + str(chosen_dc.index))

        if not has_private_input:
            print("Appending " + str(transmission_time_i) + " " + str(chosen_dc.index))
        transmission_times[current_task.index-1] = transmission_time_i
        task_datacentres[current_task.index-1] = chosen_dc.index

        #Check whether bringing in the input datasets and generating the output
        # datasets violates the datacentre storage constraints
        feasibility, invalid_datacentre = task_placement_feasibility(current_task, chosen_dc, datacentre_usage)
        if not feasibility:
            return False, -1, [], invalid_datacentre

    print(transmission_times)
    print(task_datacentres)

    return True, transmission_times, task_datacentres, None

In [181]:
def initial_particle_feasibility(particle):
    """Check that the initial dataset placement respects every capacity.

    ``particle[i]`` is the 1-indexed datacentre of ``datasets[i]``. Sizes are
    accumulated per datacentre in a local usage list.

    Returns:
        ``(True, None)`` if all datasets fit, otherwise ``(False, dc)`` where
        ``dc`` is the first datacentre whose capacity would be exceeded.
    """
    usage = [0 for _ in range(NUM_DATACENTRES)]

    for position, dc_number in enumerate(particle):
        slot = dc_number - 1
        needed = datasets[position].size
        if usage[slot] + needed > datacentres[slot].capacity:
            return False, datacentres[slot]
        usage[slot] += needed
        print("Data centre usage of data centre " + str(datacentres[slot].index) + " is " + str(usage[slot]))

    return True, None

def fitness(particle):
    """Score a particle.

    Returns a triple ``(flag, score, invalid_datacentre)``:
      * ``(1, 10000, dc)`` when either the initial placement or the task
        placement violates the capacity of datacentre ``dc``;
      * ``(0, total, None)`` otherwise, where ``total`` is the sum of the
        per-task transmission times returned by ``place_tasks``.

    ``place_tasks`` is given a fresh zeroed usage list; the usage tallied by
    ``initial_particle_feasibility`` is not carried over.
    """
    fits, offender = initial_particle_feasibility(particle)
    if not fits:
        return 1, 10000, offender

    fresh_usage = [0] * NUM_DATACENTRES
    fits, per_task_times, _placement, offender = place_tasks(particle, fresh_usage)
    if not fits:
        return 1, 10000, offender

    return 0, np.sum(per_task_times), None
    

In [184]:
# Score an example particle: datasets 1-3 in datacentre 1, dataset 4 in
# datacentre 2 and dataset 5 in datacentre 3.
feasibility, fitness_score, invalid_datacentre = fitness([1,1,1,2,3])

# fitness() returns 0 as its first value for a feasible particle, 1 otherwise.
if(feasibility == 0):
    print("Fitness score is " + str(fitness_score))
else:
    print("Data centre " + str(invalid_datacentre.index) + " violating storage constraint") 

Data centre usage of data centre 1 is 3
Data centre usage of data centre 1 is 8
Data centre usage of data centre 1 is 11
Data centre usage of data centre 2 is 3
Data centre usage of data centre 3 is 5
[1, 1, 1, 2, 3, 0]
Private ids 5 found in datacenter 3
Input dataset 5
Input dataset resides in 3
Adding output dataset 6 to datacentre 3
Dataset is 5 and size is 5
Usage of data centre 3 increased to 5
Dataset is 6 and size is 8
Data centre 3 violating storage constraint


In [176]:
# Pep8 standards - https://www.python.org/dev/peps/pep-0008/#id39 
# Enum -https://docs.python.org/3/library/enum.html#creating-an-enum 
# Keyword arguments for Dataset (any class with large number of args)
# Pass arrays to function instead of global
# Timesheets

In [186]:
def get_public_dataset_positions(datasets):
    """Return the 0-based positions of the public, initial datasets.

    A dataset qualifies when its ``ds_type`` is DS_PUBLIC and its
    ``input_ds_type`` is DS_INIT.
    """
    return [
        position
        for position, ds in enumerate(datasets)
        if ds.ds_type == DS_PUBLIC and ds.input_ds_type == DS_INIT
    ]

In [187]:
# 0-based positions of the public initial datasets in the global datasets list.
public_datasets_positions = get_public_dataset_positions(datasets)
print(public_datasets_positions)

[0, 1, 2]


In [188]:
import random

In [241]:
def random_integer(low, high, K = -1):
    """Return a random integer in ``[low, high]`` (inclusive) other than ``K``.

    Values are drawn with ``random.randint`` and redrawn while they equal
    ``K``.

    Args:
        low: lower bound (inclusive).
        high: upper bound (inclusive).
        K: value to exclude; the default -1 excludes nothing for
            non-negative ranges.

    Raises:
        ValueError: if ``K`` is the only value in the range
            (``low == high == K``), which would otherwise loop forever.
    """
    if low == high == K:
        raise ValueError(
            "no integer in [" + str(low) + ", " + str(high) + "] differs from " + str(K))

    r = random.randint(low, high)
    while r == K:
        r = random.randint(low, high)
    return r

In [242]:
def mutation(particle, w, public_dataset_positions):
    """Mutate ``particle`` in place with probability ``w`` and return it.

    A uniform random number in [0, 1] is drawn; if it exceeds ``w`` the
    particle is returned untouched. Otherwise the particle is scored with
    ``fitness``:

      * feasible particle: one position chosen at random from
        ``public_dataset_positions`` is moved to a different datacentre;
      * infeasible particle: every public dataset currently located in the
        datacentre that violates its capacity is moved to a different
        datacentre.

    Args:
        particle: mutable sequence of 1-indexed datacentre numbers.
        w: mutation probability threshold.
        public_dataset_positions: 0-based positions eligible for mutation
            in the feasible case.

    Returns:
        The same ``particle`` object (mutated or not).
    """
    r = random.uniform(0, 1)
    print(r)
    if r > w:
        return particle

    feasibility, fitness_score, invalid_datacentre = fitness(particle)
    print("In mutation")
    if feasibility == 0:
        print("Fitness score is " + str(fitness_score))
        mutation_index = random.choice(public_dataset_positions)
        print("Mutation index is " + str(mutation_index))
        particle[mutation_index] = random_integer(1, NUM_DATACENTRES, particle[mutation_index])

    else:
        print("Data centre " + str(invalid_datacentre.index) + " violating storage constraint") 
        for i in range(len(particle)):
            # particle holds datacentre numbers, so compare against the
            # offending datacentre's index, not the datacentre object itself.
            if particle[i] == invalid_datacentre.index and datasets[i].ds_type == DS_PUBLIC:
                particle[i] = random_integer(1, NUM_DATACENTRES, particle[i])

    return particle

In [243]:
# Try a mutation on an example particle; mutation() edits the list in place,
# so the particle is printed afterwards to show the result.
particle = [1,1,1,2,3]
mutation(particle, 0.9, [0,1,2])
print(particle)

0.3796421845601786
Data centre usage of data centre 1 is 3
Data centre usage of data centre 1 is 8
Data centre usage of data centre 1 is 11
Data centre usage of data centre 2 is 3
Data centre usage of data centre 3 is 5
[1, 1, 1, 2, 3, 0]
Private ids 5 found in datacenter 3
Input dataset 5
Input dataset resides in 3
Adding output dataset 6 to datacentre 3
Dataset is 5 and size is 5
Usage of data centre 3 increased to 5
Dataset is 6 and size is 8
Usage of data centre 3 increased to 13
Private ids 4 found in datacenter 2
Input dataset 3
Input dataset resides in 1
Transmitting dataset 3 to datacentre 2
Input dataset 4
Input dataset resides in 2
Input dataset 6
Input dataset resides in 3
Transmitting dataset 6 to datacentre 2
Dataset is 3 and size is 3
Usage of data centre 2 increased to 3
Dataset is 4 and size is 3
Usage of data centre 2 increased to 6
Dataset is 6 and size is 8
Usage of data centre 2 increased to 14
Placing task in datacentre 1
Input dataset 1
Input dataset resides in 1


In [244]:
def fix_private_dataset_locations(particle):
    """Pin every private dataset's entry of ``particle`` to its own datacentre.

    For each position whose dataset (same position in the global
    ``datasets``) is private, the entry is overwritten in place with the
    index of that dataset's ``lc`` datacentre. Returns nothing.
    """
    for position in range(len(particle)):
        ds = datasets[position]
        if ds.ds_type != DS_PRIVATE:
            continue
        particle[position] = ds.lc.index

In [259]:
def crossover(particle1, particle2, c):
    """Two-point crossover of ``particle1`` with ``particle2``.

    A uniform random number in [0, 1] is drawn; if it exceeds ``c`` no
    crossover takes place and ``particle1`` itself is returned. Otherwise
    two distinct cut points are drawn and a new list is built that takes the
    inclusive segment between them from ``particle2`` and everything else
    from ``particle1``.

    Args:
        particle1: the particle being updated.
        particle2: the particle donating the middle segment; must be at
            least as long as ``particle1``.
        c: crossover probability threshold.

    Returns:
        ``particle1`` (same object) when no crossover happens or when it has
        fewer than two entries (two distinct cut points cannot be drawn);
        otherwise a new list with the same length as ``particle1``.
    """
    r = random.uniform(0, 1)
    print(r)
    length = len(particle1)
    if r > c or length < 2:
        return particle1

    index1 = random_integer(0, length - 1)
    index2 = random_integer(0, length - 1, index1)

    print("Indexes are " + str(index1) + " " + str(index2))

    if index1 > index2:
        index1, index2 = index2, index1

    new_particle = [
        particle2[i] if index1 <= i <= index2 else particle1[i]
        for i in range(length)
    ]

    #fix_private_dataset_locations(new_particle)

    return new_particle

In [260]:
# Cross two example particles with crossover threshold c = 0.8.
new_particle = crossover([1,1,1,2,3], [2,3,3,2,3], 0.8)
print(new_particle)

0.09699816827703278
Indexes are 0 4
[2, 3, 3, 2, 3]


In [261]:
# Number of particles created by initialize_population.
INITIAL_POPULATION = 10

# NOTE(review): the constants below are not used in the visible cells.
# Presumably MAX_W/MIN_W bound the mutation probability `w` and the
# C1/C2 start/end values bound the crossover probability `c` over the
# course of the search -- TODO confirm against the main loop.
MAX_W = 0.9
MIN_W = 0.4

C1_START = 0.9
C1_END = 0.4

C2_START = 0.9
C2_END = 0.4

In [262]:
# Population of candidate placements; filled by initialize_population().
particles = []


def initialize_population():
    """Create INITIAL_POPULATION random particles and append them to ``particles``.

    Each particle is a NumPy array of NUM_DATASETS datacentre numbers drawn
    from 1..NUM_DATACENTRES, with the private datasets' entries pinned by
    ``fix_private_dataset_locations``. Every particle is printed.
    """
    for _ in range(INITIAL_POPULATION):
        candidate = np.random.randint(1, NUM_DATACENTRES + 1, NUM_DATASETS)
        fix_private_dataset_locations(candidate)
        particles.append(candidate)
        print(candidate)


initialize_population()

[2 3 2 2 3]
[3 1 1 2 3]
[2 2 2 2 3]
[2 3 1 2 3]
[1 2 3 2 3]
[3 2 1 2 3]
[1 3 3 2 3]
[3 1 3 2 3]
[3 1 2 2 3]
[1 1 3 2 3]
