In [20]:
from collections import deque
import numpy as np
from myheapq import _heapify_max, _heappop_max, _siftdown_max

INFINITY = np.iinfo(np.int32).max

In [21]:
def heap_push(max_heap, item):
    max_heap.append(item)
    _siftdown_max(max_heap, 0, len(max_heap) - 1)

def heap_pop(arr):
    return _heappop_max(arr)  # Converts array to max_heap

In [22]:
class Processor:
    def __init__(self):
        self.task_allocation_queue = deque()  # <rank_u, criticality, task number, function number, AST, AFT, processor>
        self.avail = 0


In [24]:
class Function:
    def __init__(self, processors: list, n, criticality, communication_cost, computation_time, edges, arrival_time, deadline, lower_bound):
        self.processors = processors
        self.communication_cost = communication_cost
        self.computation_cost = computation_time
        self.edges = edges
        self.task_priority_queue = []  # <rank_u, criticality, task number, function number, AST, AFT, processor>
        self.criticality = criticality
        self.upward_rank = np.zeros(n)
        self.arrival_time = arrival_time
        self.deadline = deadline
        self.lower_bound = lower_bound
        self.task_lower_bound = [(0, 0) for _ in range(n)]
        self.AFT = np.zeros(n)

    def get_EFT(self, task, k):
        return self.get_EST(task, k) + self.computation_cost[task][k]

    def get_EST(self, task, k):
        return max(self.processors[k].avail, self.find_max_AFT_plus_comcost(task))

    def find_max_AFT_plus_comcost(self, task):
        candidate_max = 0
        for pred_task in self.get_predecessor(task):
            if candidate_max < self.AFT[pred_task] + self.communication_cost[pred_task][task]:
                candidate_max = self.AFT[pred_task] + self.communication_cost[pred_task][task]
        return candidate_max

    def get_abs_deadline(self):
        return self.arrival_time + self.deadline

    def get_task_abs_deadline(self, task):
        return self.arrival_time + self.calculate_lower_bound(task)[0] + self.get_deadline_slack()

    def get_deadline_slack(self):
        return self.deadline - self.lower_bound

    def calculate_lower_bound(self, task):
        if self.task_lower_bound[task][0] != 0:
            return self.task_lower_bound[task]
        if task == 0:
            self.task_lower_bound[task] = (np.min(self.computation_cost[task]), np.argmin(self.computation_cost[task]))
            return self.task_lower_bound[task]
        previous_task_lower_bound = self.calculate_lower_bound(task - 1)
        min_next_task = min(self.computation_cost[task][previous_task_lower_bound[1]], self.communication_cost[task - 1][task] + np.min(self.computation_cost[task]))
        if min_next_task == self.computation_cost[task][previous_task_lower_bound[1]]:
            min_processor = previous_task_lower_bound[1]
        else:
            min_processor = np.argmin(self.computation_cost[task])
        self.task_lower_bound[task] = (previous_task_lower_bound[0] + min_next_task, min_processor)
        return self.task_lower_bound[task]

    def get_successor(self, task):
        return list(np.where(self.edges[task] == 1)[0])

    def get_predecessor(self, task):
        return list(np.where(np.transpose(self.edges)[task] == 1)[0])

    def get_average_computation_time(self, task):
        avg = 0
        count = 0
        for wcet in self.computation_cost[task]:
            if wcet != INFINITY:
                avg = (avg * count + wcet) / (count + 1)
                count += 1
        return avg

    def calculate_rank(self, task):
        if self.upward_rank[task] != 0:
            return self.upward_rank[task]

        _max = self.find_max_successor_computation_time(task)

        self.upward_rank[task] = self.get_average_computation_time(task) + _max
        return self.upward_rank[task]

    def find_max_successor_computation_time(self, task):
        _max = 0
        for successor_task in self.get_successor(task):
            candidate_max = self.communication_cost[task][successor_task] + self.calculate_rank(successor_task)
            if candidate_max > _max:
                _max = candidate_max
        return _max

    def add_to_priority_queue(self, function_num):
        for task_num in range(len(self.upward_rank)):
            heap_push(self.task_priority_queue, (self.upward_rank[task_num], self.criticality, task_num, function_num, None, None, None))
            # self.task_priority_queue.put((self.upward_rank[task_num], self.criticality, task_num, function_num))

    @staticmethod
    def get_n_max(criticality_slack):
        return criticality_slack + 1

In [25]:
class MS:
    def __init__(self, functions: list, criticality):
        self.functions = functions
        self.common_ready_queue = []  # <rank_u, criticality, task number, function number, AST, AFT, processor>
        self.criticality = criticality

        function_num = 0
        for function in functions:
            self.add_function(function, function_num)
            function_num += 1

    def add_function(self, function, function_num):
        function.calculate_rank(0)
        function.add_to_priority_queue(function_num)
        function.calculate_lower_bound(len(function.upward_rank) - 1)
        self.functions.append(function)

    def is_fd_empty(self, fd):
        for common_ready_record in self.common_ready_queue:
            if common_ready_record[3] == fd:
                return False
        return len(self.functions[fd].task_priority_queue) == 0



In [26]:
processor_1 = Processor()
processor_2 = Processor()
processor_3 = Processor()
processors_list = [processor_1, processor_2, processor_3]

In [45]:
communication_cost_1 = np.array([[0, 11, 8],
                                [11, 0, 6],
                                [8, 6, 0]])
computation_cost_1 = np.array([[INFINITY, 6, 11],
                              [17, 12, 6],
                              [13, 9, 10]])
edges_1 = np.array([[0, 1, 1],
                    [0, 0, 1],
                    [0, 0, 0]])

function_1 = Function(processors_list, n=3, criticality=3, communication_cost=communication_cost_1,
                      computation_time=computation_cost_1, edges=edges_1, arrival_time=0, lower_bound=27, deadline=34)


In [46]:
communication_cost_2 = np.array([[0, 14, INFINITY, 8],
                                [14, 0, 19, 6],
                                [INFINITY, 19, 0, 5],
                                [8, 6, 5, 0]])
computation_cost_2 = np.array([[14, 15, 8],
                              [16, 7, 15],
                              [7, 7, 5],
                               [6, 11, 13]])
edges_2 = np.array([[0, 1, 0, 1],
                    [0, 0, 1, 1],
                    [0, 0, 0, 1],
                    [0, 0, 0, 0]])

function_2 = Function(processors_list, n=4, criticality=0, communication_cost=communication_cost_2,
                      computation_time=computation_cost_2, edges=edges_2, arrival_time=0, lower_bound=39, deadline=46)


In [47]:
communication_cost_3 = np.array([[0, 3, 7],
                                [3, 0, 2],
                                [7, 2, 0]])
computation_cost_3 = np.array([[12, 18, 9],
                              [13, 10, INFINITY],
                              [INFINITY, 18, 7]])
edges_3 = np.array([[0, 1, 1],
                    [0, 0, 1],
                    [0, 0, 0]])

function_3 = Function(processors_list, n=3, criticality=1, communication_cost=communication_cost_3,
                      computation_time=computation_cost_3, edges=edges_3, arrival_time=10, lower_bound=31, deadline=46)


In [48]:
communication_cost_4 = np.array([[0, 2, 11],
                                [2, 0, 2],
                                [11, 2, 0]])
computation_cost_4 = np.array([[8, 13, 18],
                              [20, 7, 11],
                              [20, 15, 8]])
edges_4 = np.array([[0, 1, 1],
                    [0, 0, 1],
                    [0, 0, 0]])

function_4 = Function(processors_list, n=3, criticality=2, communication_cost=communication_cost_4,
                      computation_time=computation_cost_4, edges=edges_4, arrival_time=20, lower_bound=27, deadline=34)

In [49]:
functions_list = [function_1, function_2, function_3, function_4]
# new_functions_list = [function_3]
# new_new_functions_list = [function_4]
ms = MS(functions=[], criticality=0)

In [38]:
functions_list[0].task_priority_queue

[]

In [43]:
ms.add_function(function_1, 0)
ms.add_function(function_4, 3)

print(function_1.task_priority_queue)
print(function_4.upward_rank)


# print(function_1.communication_cost[0][1])
# print(function_1.task_lower_bound)
# print(function_2.task_lower_bound)
# print(function_2.get_task_abs_deadline(0))
# print(function_1.get_predecessor(2))
# np.min(function_1.computation_cost[1])
# while function_1.task_priority_queue:
#     print(heap_pop(function_1.task_priority_queue))

[(47.83333333333333, 3, 0, 0, None, None, None), (47.83333333333333, 3, 0, 0, None, None, None), (47.83333333333333, 3, 0, 0, None, None, None), (28.33333333333333, 3, 1, 0, None, None, None), (28.33333333333333, 3, 1, 0, None, None, None), (10.666666666666666, 3, 2, 0, None, None, None), (10.666666666666666, 3, 2, 0, None, None, None), (28.33333333333333, 3, 1, 0, None, None, None), (10.666666666666666, 3, 2, 0, None, None, None)]
[44.         29.         14.33333333]


In [63]:
def task_priority_queues_is_empty(task_priority_queues: list):
    for task_priority_queue in task_priority_queues:
        if len(task_priority_queue) != 0:
            return False
    return True

In [22]:
# task_priority_queues = [function_obj.task_priority_queue for function_obj in functions_list]
# task_priority_queues

[[(47.83333333333333, 3, 0, 0, None, None, None),
  (28.33333333333333, 3, 1, 0, None, None, None),
  (10.666666666666666, 3, 2, 0, None, None, None)],
 [(42.0, 1, 0, 1, None, None, None),
  (26.0, 1, 1, 1, None, None, None),
  (12.5, 1, 2, 1, None, None, None)]]

In [23]:
def remove_back_to_task_priority_queues(task_set: list):
    while len(task_set) != 0:
        popped_record = task_set.pop()
        function_num = popped_record[3]
        heap_push(functions_list[function_num].task_priority_queue, popped_record)


In [34]:
f_d = -1
previous_round = []
current_round = []
system_time = 0
step = 5

In [35]:
def find_new_arrived_functions(functions_list):
    global system_time, step
    new_arrived_functions = []
    for function_record in functions_list:
        if 0 <= system_time - function_record.arrival_time < step:
            new_arrived_functions.append(function_record)
    return new_arrived_functions

In [36]:
def has_lower_criticality_tasks(max_criticality, processor_list: list):
    for pr in processor_list:
        pr: Processor
        for item in pr.task_allocation_queue:
            if item[1] < max_criticality:
                return True
    return False

In [56]:
while True:
    print(f'Time is {system_time}')
    previous_round = current_round.copy()
    current_round = []

################ fill common ready queue ##################

    for function_obj in ms.functions:
        if function_obj.criticality < ms.criticality:
            continue
        criticality_slack = function_obj.criticality - ms.criticality
        n_max = Function.get_n_max(criticality_slack)
        cnt = n_max
        while cnt != 0 and len(function_obj.task_priority_queue) != 0:
            cnt -= 1
            tuple_record = heap_pop(function_obj.task_priority_queue)
            print(f'pop task priority queue: {tuple_record}')
            heap_push(ms.common_ready_queue, tuple_record)
            print(f'ms.common_ready_queue: {ms.common_ready_queue}')

############# handle new function ###############

    new_arrived_function = find_new_arrived_functions(functions_list)
    max_criticality_in_new_arrived_function = max([func.criticality for func in new_arrived_function])
    for function_obj in new_arrived_function:
        print(f"Added new function with computation_cost {function_obj.computation_cost}")
        ms.add_function(function_obj, functions_list.index(function_obj))

############# remove lower critical tasks from task_allocation_queue and common ready queue #############

    if len(new_arrived_function) != 0:
        if has_lower_criticality_tasks(max_criticality_in_new_arrived_function, processors_list):
            for pr in processors_list:
                pr: Processor
                print(f"processor task_allocation_queue is:\n"
                      f"{pr.task_allocation_queue}")
                candidate_items = [task for task in pr.task_allocation_queue if task[1] < max_criticality_in_new_arrived_function]
                print(f"candidate items from task_allocation_queue of processor {processors_list.index(pr)} to be removed is:\n"
                      f"{candidate_items}")
                for item in candidate_items:
                    pr.task_allocation_queue.remove(item)
                    item[4], item[5], item[6] = None, None, None
                    ms.functions[item[3]].AFT[item[2]] = None
                    heap_push(ms.functions[item[3]].task_priority_queue, item)
                print(f"final processor task_allocation_queue is:\n"
                      f"{pr.task_allocation_queue}")
            while len(ms.common_ready_queue) != 0:
                common_ready_queue_record = heap_pop(ms.common_ready_queue)
                print(f"pop common_ready_queue record {common_ready_queue_record} in handle new arrived")
                heap_push(ms.functions[common_ready_queue_record[3]].task_priority_queue, common_ready_queue_record)

############# handle common ready queue ##############

    while len(ms.common_ready_queue) != 0:
        tuple_record = heap_pop(ms.common_ready_queue)
        task_num, function_num = tuple_record[2], tuple_record[3]
        min_EFT, suitable_processor = (min([ms.functions[function_num].get_EFT(task_num, k) for k in range(len(processors_list))]),
                                       np.argmin([ms.functions[function_num].get_EFT(task_num, k) for k in range(len(processors_list))]))
        print(f'trying to assign common_ready_queue record {tuple_record} to processor {suitable_processor}'
              f'function_number {function_num}, task_number {task_num}, min_EFT {min_EFT}, suitable_processor {suitable_processor}')
        if min_EFT > ms.functions[function_num].get_task_abs_deadline(task_num) and ms.functions[function_num].criticality > ms.criticality:
            ms.criticality = ms.functions[function_num].criticality
            f_d = function_num
            print(f'function that increases system criticality: {f_d}')

############ remove from common ready queue and task allocation queue in current and previous round ##############

            while len(ms.common_ready_queue) != 0:
                common_ready_queue_record = heap_pop(ms.common_ready_queue)
                print(f"pop common_ready_queue record {common_ready_queue_record} in deadline miss event")
                heap_push(ms.functions[common_ready_queue_record[3]].task_priority_queue, common_ready_queue_record)
            for round_record in previous_round + current_round:
                if system_time < round_record[4]:
                    print(f"pop {round_record} from task_allocation_queue of {round_record[6]} in deadline miss event")
                    processors_list[round_record[6]].task_allocation_queue.remove(round_record)
                    ms.functions[round_record[3]].AFT[round_record[2]] = None
                    round_record[4], round_record[5], round_record[6] = None, None, None
                    heap_push(ms.functions[round_record[3]].task_priority_queue, round_record)
                    print(f"push {round_record} into task_priority_queue of {round_record[3]} in deadline miss event")
            print(f"failed to assign common_ready_queue record {tuple_record} to processor {suitable_processor}")

############ assign task to processor with min EFT ###############

        else:
            tuple_record[6] = suitable_processor # processor
            tuple_record[5] = min_EFT  # AFT
            tuple_record[4] = min_EFT - ms.functions[function_num].computation_cost[task_num][suitable_processor] # AST
            processors_list[suitable_processor].task_allocation_queue.append(tuple_record)
            ms.functions[tuple_record[3]].AFT[tuple_record[2]] = min_EFT
            current_round.append(tuple_record)

            print(f"successfully assign common_ready_queue record {tuple_record} to processor {suitable_processor}")
            if ms.is_fd_empty(f_d):
                print(f"function {f_d} which cause system criticality to rise is finished")
                f_d = -1
                ms.criticality = 0

        # if functions_list[function_num].AFT[task_num] > functions_list[function_num].get_task_abs_deadline(task_num) and functions_list[function_num].criticality > ms.criticality:
        #     f_d = function_num
        #     ms.criticality = functions_list[function_num].criticality
        #     canceled_task_set = []
        #     while len(ms.common_ready_queue) != 0:
        #         record = heap_pop(ms.common_ready_queue)
        #         canceled_task_set.append(record)
        #         print(f'pop from common ready queue: {record}')
        #
        #     for processor in processors_list:
        #         while len(processor.task_allocation_queue) != 0:
        #             record = heap_pop(processor.task_allocation_queue)
        #             canceled_task_set.append(record)
        #             print(f'pop from task allocation queue: {record}')
        #
        #     remove_back_to_task_priority_queues(canceled_task_set)

        # TODO: second if

    system_time += step


Time is 0
pop task priority queue: (47.83333333333333, 3, 0, 0, None, None, None)
ms.common_ready_queue: [(47.83333333333333, 3, 0, 0, None, None, None)]
pop task priority queue: (28.33333333333333, 3, 1, 0, None, None, None)
ms.common_ready_queue: [(47.83333333333333, 3, 0, 0, None, None, None), (28.33333333333333, 3, 1, 0, None, None, None)]
pop task priority queue: (10.666666666666666, 3, 2, 0, None, None, None)
ms.common_ready_queue: [(47.83333333333333, 3, 0, 0, None, None, None), (28.33333333333333, 3, 1, 0, None, None, None), (10.666666666666666, 3, 2, 0, None, None, None)]
Time is 5
Time is 10


KeyboardInterrupt: 