In [41]:
import json
import random
import matplotlib.pyplot as plt
import numpy as np
from itertools import chain, repeat
import task_generation.taskset_generation as tg
import uuid
from enum import Enum

In [83]:
class PTask:
    """A periodic task described by its period and its execution time."""

    def __init__(self, period, execution_time):
        self.period = period
        self.execution_time = execution_time

    @property
    def utilization(self):
        """Execution time divided by the period."""
        return self.execution_time / self.period

    def __str__(self):
        """Render the task as ``(period, execution_time)``."""
        return "({}, {})".format(self.period, self.execution_time)

    # The debug representation is the same text as the string form.
    __repr__ = __str__


class Task:
    """
    A single task instance with an arrival time, a deadline and an execution time.

    Every instance receives a fresh random UUID as ``id``. The scheduling
    bookkeeping fields (start, finish, waiting, response and slack time) all
    start at zero.
    """

    def __init__(self, arrival_time, deadline, execution_time):
        self.id = uuid.uuid4()
        self.arrival_time = arrival_time
        self.deadline = deadline
        self.execution_time = execution_time
        # Bookkeeping fields, zeroed at construction time.
        self.start_time = self.finish_time = 0
        self.waiting_time = self.response_time = self.slack_time = 0

    @property
    def utilization(self):
        """Execution time divided by the length of the arrival-to-deadline window."""
        window = self.deadline - self.arrival_time
        return self.execution_time / window

    def __str__(self):
        """Render as ``(arrival_time, deadline, execution_time)``."""
        return "({}, {}, {})".format(
            self.arrival_time, self.deadline, self.execution_time
        )

    def __repr__(self):
        """Render like ``__str__`` but prefixed with the task id."""
        return "({}, {}, {}, {})".format(
            self.id, self.arrival_time, self.deadline, self.execution_time
        )

In [0]:
class Scheduler:
    """
    Skeleton of a genetic-algorithm based scheduler.

    The constructor records the GA parameters and sets up empty result
    holders. Every other method is a placeholder that does nothing and
    returns ``None``; subclasses are expected to provide the behaviour.
    """

    def __init__(self, num_tasks, mutation_rate, crossover_rate, max_iter):
        """
        Store the GA configuration.

        Args:
            num_tasks: Number of tasks handled by the scheduler.
            mutation_rate: Mutation rate of the GA.
            crossover_rate: Crossover rate of the GA.
            max_iter: Maximum number of iterations of the GA.
        """
        self.num_tasks = num_tasks
        self.mutation_rate = mutation_rate
        # These two were accepted but silently discarded before; keep them so
        # the object reflects everything it was configured with.
        self.crossover_rate = crossover_rate
        self.max_iter = max_iter
        self.population = []
        self.best_fitness = 0
        self.best_chromosome = []

    def initialize(self):
        """Placeholder: initialize the population with random chromosomes."""
        pass

    def crossover(self):
        """Placeholder: perform crossover between parent chromosomes."""
        pass

    def mutate(self):
        """Placeholder: perform mutation on child chromosomes."""
        pass

    def fitness(self):
        """Placeholder: calculate the fitness of each chromosome in the population."""
        pass

    def selection(self):
        """Placeholder: select the fittest chromosomes for the next generation."""
        pass

    def evolve(self, num_generations):
        """Placeholder: run the GA for the given number of generations."""
        pass

    def run(self, tasks):
        """Placeholder: run the scheduler on the given tasks using the GA."""
        pass


class GA_Scheduler(Scheduler):
    """
    Genetic-algorithm task scheduler built on ``Scheduler``.

    Only population initialization and the fitness formula are implemented.
    ``evaluate``, ``selection``, ``crossover``, ``mutate``, ``evolve`` and
    ``run`` are still TODO stubs that return ``None``. Because ``evaluate``
    returns ``None``, ``calculate_fitness`` cannot unpack its result for a
    non-empty population until ``evaluate`` is implemented.
    """

    def __init__(self, num_tasks, mutation_rate, crossover_rate, max_iter):
        """
        Record the GA parameters, then run the base-class constructor.

        Args:
            num_tasks (int): Number of tasks; also used as the population size.
            mutation_rate: Mutation rate.
            crossover_rate: Crossover rate.
            max_iter: Maximum number of iterations.
        """
        # The population size is taken directly from the number of tasks.
        self.pop_size = num_tasks
        self.mutation_rate = mutation_rate
        self.crossover_rate = crossover_rate
        self.max_iter = max_iter
        super().__init__(num_tasks, mutation_rate, crossover_rate, max_iter)

    def initialize_population(self, num_tasks):
        """
        Build ``self.pop_size`` random permutations of the task indices.

        Args:
            num_tasks (int): Number of tasks; each chromosome is a shuffled
                copy of ``0 .. num_tasks - 1``.

        Returns:
            list: The generated chromosomes (lists of task indices).
        """
        def random_order():
            order = list(range(num_tasks))
            random.shuffle(order)
            return order

        return [random_order() for _ in range(self.pop_size)]

    def calculate_fitness(self, population, tasks):
        """
        Score every chromosome as ``1 / (1 + slack_time)``.

        Args:
            population (list): Chromosomes (task orders) to score.
            tasks (list): Task objects handed to ``evaluate``.

        Returns:
            list: ``(chromosome, fitness)`` tuples in population order.
        """
        scored = []
        for candidate in population:
            # evaluate() is expected to yield exactly five values; only the
            # last one (slack time) enters the fitness.
            _full, _final, _wait, _resp, slack = self.evaluate(candidate, tasks)
            scored.append((candidate, 1 / (1 + slack)))
        return scored

    def evaluate(self, chromosome, tasks):
        """
        Evaluate a chromosome by simulating the execution of the tasks.

        Args:
            chromosome (list): Order in which the tasks are executed.
            tasks (list): A list of Task objects.

        Returns:
            tuple: Intended to be (full time, final time, wait time,
            response time, slack time); currently ``None``.
        """
        # TODO: Implement the task execution simulation.
        return None

    def selection(self, fitness_scores):
        """
        Select two parent chromosomes using tournament selection.

        Args:
            fitness_scores (list): ``(chromosome, fitness)`` tuples.

        Returns:
            tuple: Intended to be the two parents; currently ``None``.
        """
        # TODO: Implement tournament selection.
        return None

    def crossover(self, parent1, parent2):
        """
        Perform crossover between two parent chromosomes.

        Args:
            parent1 (list): The first parent chromosome.
            parent2 (list): The second parent chromosome.

        Returns:
            list: Intended to be the child chromosome; currently ``None``.
        """
        # TODO: Implement crossover.
        return None

    def mutate(self, chromosome):
        """
        Mutate a chromosome.

        Args:
            chromosome (list): The chromosome to mutate.

        Returns:
            list: Intended to be the mutated chromosome; currently ``None``.
        """
        # TODO: Implement mutation.
        return None

    def evolve(self, population, fitness_scores):
        """
        Produce the next population through selection, crossover and mutation.

        Args:
            population (list): Current chromosomes (task orders).
            fitness_scores (list): ``(chromosome, fitness)`` tuples.

        Returns:
            list: Intended to be the new population; currently ``None``.
        """
        # TODO: Implement evolution.
        return None

    def run(self, tasks):
        """
        Run the GA on the given tasks (not implemented; returns ``None``).
        """
        return None

In [15]:
def generate_tasks(num_tasks):
    """
    Placeholder for generating ``num_tasks`` random tasks.

    Not implemented: the call does nothing and returns ``None``. A later
    definition of ``generate_tasks`` in this file rebinds the name.
    """
    return None


def read_tasks_from_file(file_path):
    """
    Placeholder for reading tasks from ``file_path`` and creating Task objects.

    Not implemented: no file is opened and the call returns ``None``.
    """
    return None


def write_tasks_to_file(tasks):
    """
    Placeholder for writing ``tasks`` to a JSON file.

    Not implemented: nothing is written and the call returns ``None``.
    """
    return None


def write_results_to_file(results):
    """
    Placeholder for writing ``results`` to a JSON file.

    Not implemented: nothing is written and the call returns ``None``.
    """
    return None


def plot_results(num_tasks_list, ga_results, improved_ga_results):
    """
    Placeholder for a bar chart of average times per task count for both algorithms.

    Not implemented: nothing is plotted and the call returns ``None``.
    """
    return None

In [98]:
def generate_ptasks(num_tasks=100, utilization=.5, available_periods=(1, 2, 4, 8, 16)):
    """
    Generate one set of periodic tasks through the ``tg`` generation helpers.

    Utilizations come from ``tg.generate_uunifastdiscard`` and periods from
    ``tg.generate_random_periods_discrete`` (drawn from ``available_periods``);
    ``tg.generate_tasksets`` combines them. A single set is requested and its
    ``period`` / ``cost`` fields are wrapped in ``PTask`` objects.

    NOTE(review): 'test.csv' and 'tset.csv' are passed straight to ``tg`` and
    are presumably output file names - confirm against that module.

    Returns:
        list: ``PTask`` objects of the first (only) generated task set.
    """
    set_count = 1
    utilizations = np.array(
        tg.generate_uunifastdiscard(set_count, utilization, num_tasks, 'test.csv')
    )
    periods = np.array(
        tg.generate_random_periods_discrete(num_tasks, set_count, available_periods)
    )
    record_type = [('cost', 'float64'), ('period', 'int32')]
    generated = tg.generate_tasksets(utilizations, periods, 'tset.csv')
    # Only one set was requested, so keep just the first entry.
    first_set = np.array(generated, dtype=record_type)[0]
    return [
        PTask(period, cost)
        for period, cost in zip(first_set['period'], first_set['cost'])
    ]


def generate_tasks(tset):
    """
    Expand periodic tasks into their individual instances over one hyperperiod.

    The hyperperiod is the least common multiple of all periods. A task with
    period ``p`` contributes one ``Task`` per multiple of ``p`` below the
    hyperperiod, arriving at that multiple with its deadline one period later
    and the periodic task's execution time. Instances are grouped by periodic
    task (in ``tset`` order) and ordered by arrival time within each group.

    Returns:
        list: The generated ``Task`` objects; empty when ``tset`` is empty.
    """
    if not tset:
        return []
    hyperperiod = np.lcm.reduce([ptask.period for ptask in tset])
    jobs = []
    for ptask in tset:
        step = ptask.period
        # Pair each arrival with the deadline exactly one period later.
        arrivals = range(0, hyperperiod, step)
        deadlines = range(step, hyperperiod + 1, step)
        for arrival, deadline in zip(arrivals, deadlines):
            jobs.append(Task(arrival, deadline, ptask.execution_time))
    return jobs

In [100]:
class MAPPING_ALGORITHM(Enum):
    """
    An enum representing the mapping algorithm to use.

    The integer values are significant. ``map_tasks_to_cores`` sorts tasks by
    decreasing utilization for values 1-3 and by increasing utilization for
    values 4-6, and picks the placement rule from ``value % 3``
    (1 = first fit, 2 = best fit, 0 = worst fit). Keep this numbering scheme
    intact when changing or adding members.
    """
    # Values 1-3: tasks are processed from highest to lowest utilization.
    FIRST_FIT_DECREASING = 1
    BEST_FIT_DECREASING = 2
    WORST_FIT_DECREASING = 3
    # Values 4-6: tasks are processed from lowest to highest utilization.
    FIRST_FIT_INCREASING = 4
    BEST_FIT_INCREASING = 5
    WORST_FIT_INCREASING = 6


def map_tasks_to_cores(ptasks, num_cores, mapping_algorithm):
    """
    Maps tasks to cores using the specified mapping algorithm.

    Tasks are processed in order of utilization (decreasing for algorithm
    values 1-3, increasing for 4-6) and each is placed on a core that still
    has enough remaining utilization:

    * first fit  (``value % 3 == 1``): the lowest-indexed core that fits;
    * best fit   (``value % 3 == 2``): the fitting core with the least
      remaining utilization;
    * worst fit  (``value % 3 == 0``): the fitting core with the most
      remaining utilization.

    Ties in best/worst fit go to the lowest-indexed core.

    Args:
        ptasks: Iterable of tasks exposing a ``utilization`` attribute. It is
            not modified; sorting is done on a copy.
        num_cores (int): Number of cores to distribute the tasks over.
        mapping_algorithm: A ``MAPPING_ALGORITHM`` member (only ``.value`` is
            read).

    Returns:
        list: ``num_cores`` lists; entry ``i`` holds the tasks assigned to
        core ``i`` in placement order.

    Raises:
        Exception: If some task does not fit on any core.
    """
    # Sort a copy so the caller's list keeps its original order.
    ordered = sorted(
        ptasks,
        key=lambda task: task.utilization,
        reverse=mapping_algorithm.value <= 3,
    )
    strategy = mapping_algorithm.value % 3

    # NOTE(review): each core starts with a budget of 1 / num_cores, so the
    # total budget over all cores is 1.0 - confirm this is intended rather
    # than a budget of 1.0 per core.
    remaining = [1 / num_cores for _ in range(num_cores)]
    core_tasks = [[] for _ in range(num_cores)]

    for task in ordered:
        demand = task.utilization
        candidates = [core for core in range(num_cores) if remaining[core] >= demand]
        if not candidates:
            raise Exception('No core has enough remaining utilization to map task.')

        if strategy == 1:
            # first fit
            chosen = candidates[0]
        elif strategy == 2:
            # best fit: min() returns the first minimal item, i.e. lowest index on ties
            chosen = min(candidates, key=remaining.__getitem__)
        else:
            # worst fit: max() returns the first maximal item, i.e. lowest index on ties
            chosen = max(candidates, key=remaining.__getitem__)

        core_tasks[chosen].append(task)
        remaining[chosen] -= demand

    return core_tasks

In [114]:
# Generate 100 periodic tasks, spread them over 4 cores with worst fit in
# increasing-utilization order, then expand each core's task set into its
# individual task instances.
ptasks = generate_ptasks(num_tasks=100)
mapped_tasks = map_tasks_to_cores(
    ptasks,
    num_cores=4,
    mapping_algorithm=MAPPING_ALGORITHM.WORST_FIT_INCREASING,
)
tasks = list(map(generate_tasks, mapped_tasks))