In [35]:
def quicksort(arr, compare):
    if len(arr) <= 1:
        return arr
    pivot = arr[len(arr) // 2]
    left = [x for x in arr if compare(x, pivot) == 1]
    middle = [x for x in arr if compare(x, pivot) == 2]
    right = [x for x in arr if compare(x, pivot) == 0]
    return quicksort(left, compare) + middle + quicksort(right, compare)

In [36]:
import random

In [37]:
import xml.etree.ElementTree as ET
import numpy as np
import re

def extractNumberFromString(string):
    numbers = re.findall(r'\d+', string)
    if numbers:
        return int(numbers[0])  

class task:
    def __init__(self, sources, destinations, dataSize):
        self.sources = sources
        self.destinations = destinations
        self.dataSize = dataSize

class processor:
    def __init__(self, cu, bw, cost):
        self.cu = cu
        self.bw = bw
        self.cost = cost

class workFlow:
    def __init__(self, filePath, deadline):
        self.numberOfTasks = extractNumberFromString(filePath)
        self.tasks = [task([], [], 0)] * self.numberOfTasks
        self.DAG = self.getDAGFromXml(filePath)
        self.deadline = deadline
        degree = self.getDegree()
        self.inDegree = degree[0]
        self.zeroInDegree = degree[1]
       

    def getDAGFromXml(self, filePath):
        tree = ET.parse(filePath)
        root = tree.getroot()
        namespace = {'dax': 'http://pegasus.isi.edu/schema/DAX'}

        DAG = [[0 for _ in range(self.numberOfTasks)] for _ in range(self.numberOfTasks)]
        jobs = {}
        inData = [[]] * self.numberOfTasks
        outData = [[]] * self.numberOfTasks

        tasks = []
        for i in range(self.numberOfTasks):
            tasks.append(task([], [], 0))
        
        for job in root.findall('.//dax:job', namespace):
            job_id = extractNumberFromString(job.attrib['id'])
            uses = job.findall('.uses', namespace)
            inputs = []
            outputs = []
            dataSize = 0

            for use in job.findall('.//dax:uses', namespace):
                file = use.attrib['file']
                link = use.attrib['link']
                data = int(use.attrib['size'])
                if link == 'input':
                    inputs.append((file, data))
                    
                elif link == 'output':
                    outputs.append((file, data))


            inData[job_id] = inputs
            outData[job_id] = outputs

            #!__________________________________________________
            tasks[job_id].dataSize = float(job.attrib['runtime'])
        
        for dependencies in root.findall('.//dax:child', namespace):
            childId = extractNumberFromString(dependencies.attrib['ref'])
            for parents in dependencies.findall('.//dax:parent', namespace):
                parentId = extractNumberFromString(parents.attrib['ref'])
                tasks[childId].sources.append(parentId)
                tasks[parentId].destinations.append(childId)

                transferData = 0
                #______________________________________________!
                '''for dataOut in outData[parentId]:
                    for dataIn in inData[childId]:
                        if dataOut[0] == dataIn[0]:
                            transferData += dataOut[1]'''

                
                DAG[parentId][childId] = outData[parentId][0][1]

        for i in range(self.numberOfTasks):
            self.tasks[i] = tasks[i]
        return DAG
    def getDegree(self):
        inDegree = [0] * self.numberOfTasks
        zeroInDegree = []
        for i in range(0, self.numberOfTasks):
            for j in range(0, self.numberOfTasks):
                if self.DAG[i][j] != 0:
                    inDegree[j] += 1
        for i in range(0, self.numberOfTasks):
            if inDegree[i] == 0:
                zeroInDegree.append(i)
        return (inDegree, zeroInDegree)

In [38]:
numberOfProcessors = 18
listProcessors = [
    processor(1.70, 39231600, 0.06),
    processor(2, 40000000, 0),
    processor(2, 40000000, 0),
    processor(2, 40000000, 0),
    processor(3, 60000000, 0),
    processor(3, 60000000, 0),
    processor(3, 60000000, 0),
    processor(3, 60000000, 0),
    processor(3, 60000000, 0),
    processor(3.75, 85196800, 0.113),
    processor(3.75, 85196800, 0.12),
    processor(5, 80000000, 0),
    processor(5, 80000000, 0),
    processor(7.50, 85196800, 0.225),
    processor(7.50, 85196800, 0.24),
    processor(15.00, 131072000, 0.45),
    processor(15.00, 131072000, 0.48),
    processor(30.00, 131072000, 0.9),
]

In [39]:
def isBetter(a, b):
    if a.completionTime == b.completionTime and a.cost == b.cost:
        return 2
    if a.completionTime <= deadline and b.completionTime <= deadline:
        return a.cost < b.cost
    if a.completionTime <= deadline and b.completionTime > deadline:
        return 1
    if a.completionTime > deadline and b.completionTime > deadline:
        return a.completionTime < b.completionTime
    if a.completionTime > deadline and b.completionTime <= deadline:
        return 0

In [40]:
import copy

def localSearch(chromosome):
    find = True
    Neighbors = []
    while find == True:
        find = False
        FT = [0] * numberOfProcessors
        FTofTasks = [0] * workflow.numberOfTasks
        process = [0] * len(chromosome.tasks)
        cost = 0
        for i in range(len(chromosome.tasks)):      
            process[chromosome.tasks[i]] = chromosome.processors[i]
        for i in range(len(chromosome.processors)):
            prs = chromosome.processors[i]
            task = chromosome.tasks[i]

            newChromo = copy.deepcopy(chromosome)
            for j in range(numberOfProcessors):
                newChromo.processors[i] = j
                newChromo.cal_fitness()
                if isBetter(newChromo, chromosome):
                    Neighbors.append(newChromo)
        
        #Find best neighbors
        for i in range(len(Neighbors)):
            if isBetter(Neighbors[i], chromosome):
                find = True
                chromosome = Neighbors[i]
                
        chromosome.isHillClimbing = True
        return chromosome

In [90]:
class Chromosome:
    def __init__(self, processors, tasks):
        self.processors = processors
        self.tasks = tasks
        self.completionTime = 0
        self.cost = 0.0
        self.isHillClimbing = False
        self.cal_fitness()

    @staticmethod
    def crossover(chromosome1, chromosome2, crossover_rate):
    
        numTasks = len(chromosome1.tasks)
        Child1 = Chromosome([-1] * numTasks, [-1] * numTasks)
        Child2 = Chromosome([-1] * numTasks, [-1] * numTasks)

        if random.random() > crossover_rate:
            return None

        #get position to crossover
        pos = random.randint(1, numTasks)
        dxC1 = [False] * (numTasks + 1)
        dxC2 = [False] * (numTasks + 1)
        #take left part from parents
        for i in range(pos):
            Child1.tasks[i] = chromosome1.tasks[i]
            Child1.processors[i] = chromosome1.processors[i]
            dxC1[chromosome1.tasks[i]] = True
            Child2.tasks[i] = chromosome2.tasks[i]
            Child2.processors[i] = chromosome2.processors[i]
            dxC2[chromosome2.tasks[i]] = True

        #take right part from parents
        ps = pos
        for i in range(numTasks):
            if dxC1[chromosome2.tasks[i]] == False:
                Child1.processors[ps] = chromosome2.processors[i]
                Child1.tasks[ps] = chromosome2.tasks[i]
                dxC1[chromosome2.tasks[i]] = True
                ps += 1
        ps = pos
        for i in range(numTasks):
            if dxC2[chromosome1.tasks[i]] == False:
                Child2.processors[ps] = chromosome1.processors[i]
                Child2.tasks[ps] = chromosome1.tasks[i]
                dxC2[chromosome1.tasks[i]] = True
                ps += 1
        
        return Child1, Child2
    
    def mutate(self, mutation_rate):
        mutated_processors = self.processors[:]  # Create a copy of distributing
        
        # Choose a random position to mutate
        mutate_position = random.randint(0, len(mutated_processors) - 1)
        
        # Mutate the value at the chosen position based on the mutation rate
        if random.random() < mutation_rate:
            mutated_processors[mutate_position] = random.randint(0, numberOfProcessors - 1) 
        
        # Return a new Chromosome object with the mutated distributing and the original scheduling_parts
        return Chromosome(mutated_processors, self.tasks)

    def cal_fitness(self):
        FT = [0] * numberOfProcessors
        FTofTasks = [0] * numberOfTasks
        processor = [0] * numberOfTasks
        
        completionTime = 0.0
        cost = 0
        for i in range(len(self.tasks)):
            prc = self.processors[i]
            task = self.tasks[i]
            processor[task] = prc

            DAT_processor = FT[prc]
            for j in range(len(workflow.tasks[task].sources)):
                parent = workflow.tasks[task].sources[j]
                #fixxxx
                #DAT_processor = max(DAT_processor, FTofTasks[parent] + (DAG[parent][task] / min(listProcessors[prc].bw, listProcessors[processor[parent]].bw) if processor[task] != processor[parent] else 0))
                DAT_processor = max(DAT_processor, FTofTasks[parent] + (DAG[parent][task] / (20 * 1024*1024) ) if processor[task] != processor[parent] else 0)
            FT[prc] = DAT_processor + computationTime[task][prc]
            cost += computationCost[task][prc]
            FTofTasks[task] = FT[prc]
            completionTime = max(completionTime, FT[prc])

        self.cost = cost
        self.completionTime = completionTime
        

In [73]:
def tournamentSelection(tournament):
    winner = Chromosome([0] * workflow.numberOfTasks, [0] * workflow.numberOfProcessors)
    for i in range(len(tournament)):
        if isBetter(tournament[i], winner):
            winner = tournament[i]
    return winner

In [74]:
def selectBest(population, size, percentage):
    size = int(size * percentage)
    sortarr = quicksort(population, isBetter)
    newGen = []
    for i in range(size):
        newGen.append(sortarr[i])
    return newGen

In [75]:
def chooseProcessor(typed, task):
    if typed == 0:
        return random.randint(0, numberOfProcessors - 1)
    else:
        total_time = sum(computationTime[task])
        if total_time == 0:
            total_time = 0.0000001
        positions = []
        for i, time in enumerate(computationTime[task]):
            # Tính xác suất của mỗi vị trí dựa trên thời gian tính toán
            probability = 1 - (time / total_time)
            positions.extend([i] * int(probability * numberOfTasks))  # Nhân với 100 để tăng độ chính xác
        # Chọn ngẫu nhiên một vị trí từ danh sách
        random_position = random.choice(positions)
        return random_position

def createRandomChromosome(inDegree, zeroInDegree, typed):
    inDegree_copy = inDegree[:]
    zeroInDegree_copy = zeroInDegree[:]
    chromosome = Chromosome([0] * numberOfTasks, [0] * numberOfTasks)
    i = 0
    while(len(zeroInDegree_copy) > 0):
        index_task = random.randint(0, len(zeroInDegree_copy) - 1)
        task = zeroInDegree_copy[index_task]
        processor = chooseProcessor(typed, task)
        chromosome.tasks[i] = task
        chromosome.processors[i] = processor
        zeroInDegree_copy.pop(index_task)
        for j in range(len(workflow.tasks[task].destinations)):
            adj = workflow.tasks[task].destinations[j]
            inDegree_copy[adj] -= 1
            if inDegree_copy[adj] == 0:
                zeroInDegree_copy.append(adj)
        i += 1

    #multi thread
    chromosome.cal_fitness()
    return chromosome

In [76]:
def generateRandomPopulation(size, percentage, inDegree, zeroInDegree):
    randomPopulation = []
            
    #generate Ramdom Select population
    size = int(size * percentage)
    percentageOfRandom = 0.50
    percentageOfRandomBasedOnComputeTime = 0.25
    randomType = 0
    basedOnComputeTime = 1
    sizeR = int(percentageOfRandom * size)
    sizeB = int(percentageOfRandomBasedOnComputeTime * size)
    for i in range(sizeR):
        randomPopulation.append(createRandomChromosome(inDegree, zeroInDegree, randomType))
    for i in range(sizeB):
        randomPopulation.append(createRandomChromosome(inDegree, zeroInDegree, basedOnComputeTime))
    return randomPopulation

In [77]:
def createHeuristicChromosome(inDegree, zeroInDegree):
    inDegree_copy = inDegree[:]
    zeroInDegree_copy = zeroInDegree[:]
    chromosome = Chromosome([0] * numberOfTasks, [0] * numberOfTasks)
    FT = [0] * numberOfProcessors
    process = [0] * numberOfTasks
    FTofTasks = [0] * numberOfTasks
    i = 0
    while(len(zeroInDegree_copy) > 0):
        index_task = random.randint(0, len(zeroInDegree_copy) - 1)
        task = zeroInDegree_copy[index_task]

        #choose processor
        minFT = min(FT)
        FT = [x - minFT for x in FT]
        FT_thu = {}
        for j in range(len(FT)):
            if FT[j] == 0:
                FT_thu[j] = 0
                for k in range(len(workflow.tasks[task].sources)):
                    parentTask = workflow.tasks[task].sources[k]
                    FT_thu[j] = max(FT_thu[j], max(FT[j], FTofTasks[parentTask] + (DAG[parentTask][task] / (20 * 1024*1024) if process[parentTask] != j else 0)))
                FT_thu[j] += computationTime[task][j]
        total_value = sum(FT_thu.values())
        # Tính tỷ lệ xác suất cho mỗi phần tử
        probabilities = [value / total_value for value in FT_thu.values()]
        # Chọn ngẫu nhiên một phần tử dựa trên xác suất
        chosen = random.choices(list(FT_thu.keys()), weights=probabilities, k=1)[0]
        
        chromosome.tasks[i] = task
        chromosome.processors[i] = chosen
        process[task] = chosen
        FT[chosen] = FT_thu[chosen]
        FTofTasks[task] = FT_thu[chosen]

        zeroInDegree_copy.pop(index_task)
        for j in range(len(workflow.tasks[task].destinations)):
            adj = workflow.tasks[task].destinations[j]
            inDegree_copy[adj] -= 1
            if inDegree_copy[adj] == 0:
                zeroInDegree_copy.append(adj)
        i += 1

    #multi thread
    chromosome.cal_fitness()
    return chromosome

In [78]:
def generateHeuristicPopulation(size, percentage, inDegree, zeroInDegree):
    heuristicPopulation = []
    size = int(percentage * size)
    for i in range(size):
        chro = createHeuristicChromosome(inDegree, zeroInDegree)
        #print(chro.processors, chro.tasks, "cc")
        heuristicPopulation.append(chro)
    return heuristicPopulation

In [79]:
class GA:
    def __init__(self, workFlow, populationSize, perRandom, perHeuristic, perElitics, perHC, penalty, p_c, p_m, maxGeneration, threshHold):
        self.populationSize = populationSize
        self.worfFlow = workFlow
        self.penalty = penalty
        self.p_c = p_c
        self.p_m = p_m
        self.maxGeneration = maxGeneration
        self.threshHold = threshHold
        self.perRandom = perRandom
        self.perHeuristic = perHeuristic
        self.perElitics = perElitics
        self.perHC = perHC
        self.bestGene = 0

        self.population = generateRandomPopulation(populationSize, perRandom, workFlow.inDegree, workFlow.zeroInDegree) + generateHeuristicPopulation(populationSize, perHeuristic, workFlow.inDegree, workFlow.zeroInDegree) 
    
    
    def find_solution(self):
        best = Chromosome([0] * numberOfTasks, [0] * numberOfTasks)
        best.completionTime = 1e9
        best.cost = 1e9
        notImprove = 0
        for i in range(self.maxGeneration):
            '''print(len(self.population))
            for j in range(len(self.population)):
                print(self.population[j].completionTime, self.population[j].cost)
            print("_______________________________________")'''
            nextGen = selectBest(self.population, self.populationSize, self.perElitics) #get parent population

            '''print(len(nextGen))
            for j in range(len(nextGen)):
                print(nextGen[j].completionTime, nextGen[j].cost)
            print("______________nextTo_________________________")'''
            
            
            while (len(nextGen) < self.populationSize):
                parents = random.sample(self.population, 2)
                child = Chromosome.crossover(parents[0], parents[1], p_c)
                if child == None:
                    continue
                child0 = child[0].mutate(p_m)
                child1 = child[1].mutate(p_m)

                #multi thread
                child0.cal_fitness()
                child1.cal_fitness()
                nextGen.append(child0)
                nextGen.append(child1)

            nextGen = quicksort(nextGen, isBetter)
            total_elements = len(nextGen)
            num_elements_to_select = int(self.perHC * total_elements)
            # Tạo danh sách các chỉ mục có thể được chọn, với xác suất cao hơn ở đầu
            indices_with_bias = list(range(num_elements_to_select))
            ss = int(perHC * populationSize)
            for _ in range(ss):
                # Lấy ngẫu nhiên chỉ mục từ danh sách với xác suất cao hơn ở đầu
                selected_index = random.choices(indices_with_bias, weights=[2 ** i for i in range(num_elements_to_select)], k=1)[0]
                # Lấy phần tử tương ứng với chỉ mục đã chọn
                if nextGen[selected_index].isHillClimbing:
                    continue
                else:
                    nextGen[selected_index] = localSearch(nextGen[selected_index])
            nextGen = quicksort(nextGen, isBetter)
            
            if isBetter(nextGen[0], best) == 1:
                self.bestGene = i
                notImprove = 0
                best = nextGen[0]
                self.population = nextGen
                continue
            else:
                notImprove += 1
                if notImprove >= self.threshHold:
                    return best
        return best

In [80]:
filepath = "D:/WorkFlowTest/dax/CyberShake_30.xml"

In [94]:
workflow = workFlow(filepath, 23.524546274509802)

computationTime = []
computationCost = []
for i in range(workflow.numberOfTasks):
    computationTime.append([]) 
    computationCost.append([])

for i in range(workflow.numberOfTasks):
    for j in range(numberOfProcessors):
        computationTime[i].append(workflow.tasks[i].dataSize / (listProcessors[j].cu))
        computationCost[i].append(computationTime[i][j] * listProcessors[j].cost)

In [92]:
numberOfTasks = workflow.numberOfTasks
DAG = workflow.DAG
deadline = workflow.deadline

populationSize = 100
perRandom = 0.5
perHeuristic = 0.5
perElitics = 0.4
perHC = 0.2
penalty = 1
p_c = 0.95
p_m = 0.02
maxGeneration = 500
threshHold = 50

In [93]:
import random
import time

for i in range(30):
    current_time = int(time.time())
    random.seed(current_time)

    start_time = time.time()
    ga = GA(workflow, populationSize, perRandom, perHeuristic, perElitics, perHC, penalty, p_c, p_m, maxGeneration, threshHold)
    sol = ga.find_solution()
    end_time = time.time()
    elapsed_time = end_time - start_time
    
    print("makespan: ", sol.completionTime, "cost: ", sol.cost, "generation: ", ga.bestGene, "runtime: ", elapsed_time)

makespan:  39.875153575261436 cost:  13.630375058823526 generation:  94 runtime:  6.65248441696167
makespan:  40.34449720827738 cost:  10.642999999999999 generation:  90 runtime:  6.387208700180054
makespan:  40.427005292256666 cost:  12.289043999999999 generation:  82 runtime:  6.233654499053955
makespan:  40.39768613243103 cost:  14.70397168627451 generation:  34 runtime:  3.730250835418701
makespan:  40.03352754974365 cost:  12.296373333333333 generation:  92 runtime:  6.431041479110718


KeyboardInterrupt: 