In [None]:
# Importing EdgeSimPy components
from edge_sim_py import *

# Importing Pymoo components
from pymoo.util.display import Display
from pymoo.core.problem import Problem
from pymoo.optimize import minimize
from pymoo.factory import get_sampling, get_crossover, get_mutation
from pymoo.algorithms.moo.nsga2 import NSGA2

# Importing Python libraries
import networkx as nx

import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import itertools
import random
import json


# Fixing the seed of every random number generator used in the notebook (reproducibility)
seed = 10
random.seed(seed)
np.random.seed(seed)

# Widening the Pandas display limits so that the result tables are not truncated
for pandas_option, option_value in (
    ("display.max_rows", 500),
    ("display.max_columns", 500),
    ("display.width", 1000),
):
    pd.set_option(pandas_option, option_value)

# %matplotlib inline

# Flag read by `evaluate_placement` (fixed power values per CPU capacity) and `nsgaii` (verbose report)
SAMPLE_TEST = True


## Helper methods

Let's define some helper methods that will be used in the rest of the notebook.

In [None]:
def find_shortest_path(origin_network_switch: object, target_network_switch: object) -> list:
    """Finds the shortest path (delay used as weight) between two network switches (origin and target).

    Paths are memoized in a `delay_shortest_paths` dictionary attached to the topology object, so NetworkX is
    only invoked the first time a given (origin, target) pair is requested.

    Args:
        origin_network_switch (object): Network switch where the path starts.
        target_network_switch (object): Network switch where the path ends.

    Returns:
        list: Sequence of nodes, as returned by `nx.shortest_path`, connecting the origin to the target.
    """
    topology = origin_network_switch.model.topology

    # Lazily creating the cache as an attribute of the topology object
    if not hasattr(topology, "delay_shortest_paths"):
        topology.delay_shortest_paths = {}

    key = (origin_network_switch, target_network_switch)

    # Computing the path only on a cache miss
    if key not in topology.delay_shortest_paths:
        topology.delay_shortest_paths[key] = nx.shortest_path(
            G=topology, source=origin_network_switch, target=target_network_switch, weight="delay"
        )

    return topology.delay_shortest_paths[key]


In [None]:
def provision(user: object, application: object, service: object, edge_server: object):
    """Places a service on an edge server and refreshes the user's communication path.

    Args:
        user (object): User that accesses the application.
        application (object): Application the service belongs to.
        service (object): Service being provisioned.
        edge_server (object): Edge server chosen to host the service.
    """
    # Adding the service's memory and CPU demand to the host
    edge_server.memory_demand += service.memory_demand
    edge_server.cpu_demand += service.cpu_demand

    # Linking the service and its host to each other
    service.server = edge_server
    edge_server.services.append(service)

    # Creating, on the host, one layer object for each layer the host reports as uncached for this service
    uncached_layers = edge_server._get_uncached_layers(service=service)
    for metadata in uncached_layers:
        new_layer = ContainerLayer(digest=metadata.digest, size=metadata.size, instruction=metadata.instruction)

        # Linking the new layer and the host to each other
        new_layer.server = edge_server
        edge_server.container_layers.append(new_layer)

        # Accounting for the disk space taken by the new layer
        edge_server.disk_demand += new_layer.size

    # Recomputing the path that connects the user to the application's services
    user.set_communication_path(app=application)


In [None]:
def get_delay(origin_network_switch: object, target_network_switch: object) -> int:
    """Gets the delay of the shortest path that connects two network switches (origin and target).

    Args:
        origin_network_switch (object): Network switch where the path starts.
        target_network_switch (object): Network switch where the path ends.

    Returns:
        int: Value computed by the topology's `calculate_path_delay` for the shortest path found.
    """
    shortest_path = find_shortest_path(
        origin_network_switch=origin_network_switch,
        target_network_switch=target_network_switch,
    )

    return origin_network_switch.model.topology.calculate_path_delay(path=shortest_path)


In [None]:
def evaluate_placement() -> tuple:
    """Scores the placement that is currently applied to the infrastructure.

    Returns:
        tuple: `(fitness_values, penalties)`, where `fitness_values` is the tuple (number of delay SLA violations,
            number of privacy SLA violations, overall edge server power consumption) and `penalties` is the number
            of overloaded edge servers.
    """
    # Power attributed, in the sample scenario, to an edge server in use according to its CPU capacity
    sample_power_by_cpu_capacity = {8: 250, 12: 550}

    delay_violations = 0
    privacy_violations = 0

    for user in User.all():
        for app in user.applications:
            # Refreshing the communication path before measuring the delay
            user.set_communication_path(app=app)
            accepted_delay = user.delay_slas[str(app.id)]
            measured_delay = user._compute_delay(app=app, metric="latency")

            # Objective #1: Number of Delay SLA Violations
            delay_violations += int(measured_delay > accepted_delay)

            # Objective #2: Number of Privacy SLA Violations (services hosted by providers not trusted enough)
            privacy_violations += sum(
                1
                for service in app.services
                if service.privacy_requirement > user.providers_trust[str(service.server.infrastructure_provider)]
            )

    power_consumption = 0
    overloaded_servers = 0

    for edge_server in EdgeServer.all():
        # Objective #3: Infrastructure's Power Consumption
        if not SAMPLE_TEST:
            power_consumption += edge_server.get_power_consumption()
        elif edge_server.cpu_demand > 0:
            power_consumption += sample_power_by_cpu_capacity.get(edge_server.cpu, 0)

        # Penalty #1: Number of overloaded edge servers (any resource with demand above capacity)
        free_resources = (
            edge_server.cpu - edge_server.cpu_demand,
            edge_server.memory - edge_server.memory_demand,
            edge_server.disk - edge_server.disk_demand,
        )
        if any(free_amount < 0 for free_amount in free_resources):
            overloaded_servers += 1

    return ((delay_violations, privacy_violations, power_consumption), overloaded_servers)


In this work, we need to customize the `User` class of the `EdgeSimPy` package. To this end, we create a custom function called `optimized_set_communication_path` and use it to overwrite the original `set_communication_path` method of the `User` class.

In [None]:
def optimized_set_communication_path(self, app: object, communication_path: list = []) -> list:
    """Updates the set of links used during the communication of user and its application.

    The shortest paths are obtained through `find_shortest_path`, which caches them on the topology.

    Args:
        self (object): User whose communication path is being updated.
        app (object): Application accessed by the user.
        communication_path (list, optional): Communication path to be stored as is. When empty (default), the
            path is derived from the base stations of the user and of the servers hosting the app's services.
            The default list is never mutated nor stored by this function.

    Returns:
        list: Communication path stored for the application (one list of network switch IDs per chain hop).
    """
    topology = Topology.first()
    app_key = str(app.id)

    # Releasing links used in the past to connect the user with its application. The paths are stored under the
    # stringified application ID, so that is the key that must be checked (checking the app object never matches)
    if app_key in self.communication_paths and len(self.communication_paths[app_key]) > 0:
        path = [[NetworkSwitch.find_by_id(i) for i in p] for p in self.communication_paths[app_key]]
        topology._release_communication_path(communication_path=path, app=app)

    # Defining communication path
    if len(communication_path) > 0:
        self.communication_paths[app_key] = communication_path
    else:
        self.communication_paths[app_key] = []

        # Services that have no host yet are left out of the chain
        service_hosts_base_stations = [service.server.base_station for service in app.services if service.server]
        communication_chain = [self.base_station] + service_hosts_base_stations

        # Defining a set of links to connect the items in the application's service chain
        for origin, target in zip(communication_chain, communication_chain[1:]):
            # Finding the best communication path between the origin and target nodes
            if origin == target:
                path = []
            else:
                path = find_shortest_path(
                    origin_network_switch=origin.network_switch, target_network_switch=target.network_switch
                )

            # Adding the best path found to the communication path
            self.communication_paths[app_key].append([network_switch.id for network_switch in path])

        # Computing the new demand of chosen links. This is done once, after all segments are known, so that
        # earlier segments are not submitted again for every additional hop of the chain
        if len(self.communication_paths[app_key]) > 0:
            path = [[NetworkSwitch.find_by_id(i) for i in p] for p in self.communication_paths[app_key]]
            topology._allocate_communication_path(communication_path=path, app=app)

    # Computing application's delay
    self._compute_delay(app=app, metric="latency")

    return self.communication_paths[app_key]


# Overwriting the User's method so that every `user.set_communication_path(...)` call made in this notebook
# (e.g., in `provision` and `evaluate_placement`) runs `optimized_set_communication_path` instead
User.set_communication_path = optimized_set_communication_path


## Evaluation

### Faticanti et al. 2020

An adapted version of the heuristic proposed by Faticanti et al. [1], which focuses on finding host servers for microservice-based applications in Edge Computing scenarios with multiple infrastructure providers. This heuristic was originally designed to define the services' deployment (i.e., initial placement). The code below is an adapted version that uses the heuristic's original reasoning to perform service migration when it detects that an application's SLAs are violated.

In [None]:
def faticanti2020(parameters: dict = {}):
    """Greedy placement heuristic adapted from Faticanti et al. [1].

    Services are visited according to their position in their application's service chain. Each service goes to the
    first edge server with enough capacity, considering servers ranked by provider trust (most trusted first),
    delay to the user's base station (lowest first), and free CPU (least free first).

    NOTE(review): the original comment also mentioned ordering services by their application's network demand;
    only the chain position is used as sorting key here -- confirm whether that is intended.

    Args:
        parameters (dict, optional): Not read by this function.
    """

    def chain_position(service: object) -> int:
        """Position of the service inside its application's service chain."""
        return service.application.services.index(service)

    for service in sorted(Service.all(), key=chain_position):
        app = service.application
        user = app.users[0]

        def host_ranking(candidate: object) -> tuple:
            """Sorting key: trustworthiness, distance from user (in terms of delay), and free resources."""
            trust = user.providers_trust[str(candidate.infrastructure_provider)]
            delay = get_delay(
                origin_network_switch=user.base_station.network_switch,
                target_network_switch=candidate.network_switch,
            )
            free_cpu = candidate.cpu - candidate.cpu_demand
            return (-trust, delay, free_cpu)

        ranked_edge_servers = sorted(EdgeServer.all(), key=host_ranking)

        # Picking the best-ranked edge server able to host the service (if there is one)
        host = next((candidate for candidate in ranked_edge_servers if candidate.has_capacity_to_host(service)), None)
        if host is not None:
            provision(user=user, application=app, service=service, edge_server=host)


### Argos

***TODO Argos description***

In [None]:
def argos(parameters: dict = {}):
    """Privacy-aware service migration strategy for edge computing environments.

    Applications are handled from the tightest to the loosest delay SLA of their first user. Within an application,
    services with higher privacy requirement (then higher CPU demand) are placed first, each one on the first
    candidate server with enough capacity, with candidates ranked by the user's trust (highest first) and delay.

    Args:
        parameters (dict, optional): Not read by this function.
    """

    def delay_sla(app: object):
        """Delay SLA that the application's first user defined for it."""
        return app.users[0].delay_slas[str(app.id)]

    for app in sorted(Application.all(), key=delay_sla):
        user = app.users[0]

        # Most privacy-sensitive and CPU-hungry services are placed first
        prioritized_services = sorted(app.services, key=lambda s: (-s.privacy_requirement, -s.cpu_demand))

        # Most trusted and closest edge servers are tried first
        candidates = get_host_candidates(user=user)
        candidates.sort(key=lambda s: (-s["is_trusted"], s["delay"]))

        for service in prioritized_services:
            # Picking the best-ranked edge server able to host the service (if there is one)
            host = next(
                (
                    candidate["object"]
                    for candidate in candidates
                    if candidate["object"].has_capacity_to_host(service)
                ),
                None,
            )
            if host is not None:
                provision(user=user, application=app, service=service, edge_server=host)

        user.set_communication_path(app=app)


def get_host_candidates(user: object) -> list:
    """Describes every edge server as a host candidate for the services of a given user.

    Args:
        user (object): User whose services need a host.

    Returns:
        list: One dictionary per edge server with the keys "object" (the edge server), "delay" (delay between the
            user's base station and the server), "trusted_users" (trust that each of the other users has in the
            server's infrastructure provider), and "is_trusted" (trust that the given user has in that provider;
            a trust value rather than a 0/1 flag).
    """
    return [
        {
            "object": edge_server,
            "delay": get_delay(
                origin_network_switch=user.base_station.network_switch,
                target_network_switch=edge_server.network_switch,
            ),
            "trusted_users": [
                other_user.providers_trust[str(edge_server.infrastructure_provider)]
                for other_user in User.all()
                if other_user != user
            ],
            "is_trusted": user.providers_trust[str(edge_server.infrastructure_provider)],
        }
        for edge_server in EdgeServer.all()
    ]


### NSGA-II

***TODO NSGA-II description***

Below we create a custom pymoo `Display` class, called `TheaDisplay`, that reports our objectives.

In [None]:
class TheaDisplay(Display):
    """Reports how the genetic algorithm is evolving throughout the generations."""

    def _do(self, problem: object, evaluator: object, algorithm: object):
        """Adds to the default output the lowest value of each objective in the current population.

        Args:
            problem (object): Problem being solved.
            evaluator (object): Evaluator used by the algorithm.
            algorithm (object): Algorithm whose population is inspected.
        """
        super()._do(problem, evaluator, algorithm)

        column_titles = ("Delay Viol.", "Priv. Viol.", "Power Cons.")

        # Each column of "F" holds the values of one objective for the whole population
        population_fitness = algorithm.pop.get("F")
        lowest_values = [int(np.min(population_fitness[:, column])) for column in range(len(column_titles))]

        for title, lowest_value in zip(column_titles, lowest_values):
            self.output.append(title, lowest_value)


Let's create the placement problem `PlacementProblem` by extending the `Problem` class from pymoo. In the `PlacementProblem._evaluate` method, we evaluate the solution proposed by each chromosome (i.e., the placement of the services in the infrastructure). Initially, we apply the placement scheme with the `apply_placement` function. Then, we evaluate the solution using the `evaluate_placement` function, which returns the values of the objectives and constraints. Finally, we reset the placement scheme with the `reset_placement` function to clean the infrastructure for the next evaluation.

In [None]:
def apply_placement(solution: list):
    """Provisions every service on the edge server assigned to it by the chromosome.

    Args:
        solution (list): Chromosome in which the i-th gene (starting at zero) is the ID of the edge server that
            hosts the service whose ID is i + 1.
    """
    for gene_position, edge_server_id in enumerate(solution):
        service = Service.find_by_id(gene_position + 1)
        edge_server = EdgeServer.find_by_id(edge_server_id)
        application = service.application

        provision(user=application.users[0], application=application, service=service, edge_server=edge_server)

def reset_placement():
    """Undoes the placement applied to the infrastructure so that another chromosome can be evaluated.

    NOTE(review): the demand of every edge server is zeroed, including servers that host container registries
    (whose layers are kept) -- confirm that those servers are expected to start with no demand.
    """
    for edge_server in EdgeServer.all():
        # Zeroing the resource demand
        edge_server.cpu_demand = edge_server.memory_demand = edge_server.disk_demand = 0

        # Detaching the hosted services
        for hosted_service in edge_server.services:
            hosted_service.server = None
        edge_server.services = []

        # Layers are only discarded from edge servers that do not host container registries
        if len(edge_server.container_registries) > 0:
            continue

        discarded_layers = list(edge_server.container_layers)
        edge_server.container_layers = []
        for discarded_layer in discarded_layers:
            discarded_layer.server = None
            ContainerLayer.remove(discarded_layer)

    # Clearing the delay and the communication path of every application accessed by the users
    for user in User.all():
        for app in user.applications:
            app_key = str(app.id)
            user.delays[app_key] = 0
            user.communication_paths[app_key] = []

class PlacementProblem(Problem):
    """Models the service placement as an optimization problem with three objectives and one constraint."""

    def __init__(self, **kwargs):
        """Creates a problem with one integer gene per service, ranging over the edge server IDs (1..count)."""
        number_of_services = Service.count()
        number_of_edge_servers = EdgeServer.count()

        super().__init__(
            n_var=number_of_services,
            n_obj=3,
            n_constr=1,
            xl=1,
            xu=number_of_edge_servers,
            type_var=int,
            **kwargs,
        )

    def _evaluate(self, x, out, *args, **kwargs):
        """Fills `out["F"]` (objectives) and `out["G"]` (constraint) for every chromosome in `x`."""
        fitness_scores = []
        penalties = []

        for chromosome in x:
            evaluation = self.get_fitness_score_and_constraints(solution=chromosome)
            fitness_scores.append(evaluation[0])
            penalties.append(evaluation[1])

        out["F"] = np.array(fitness_scores)
        out["G"] = np.array(penalties)

    def get_fitness_score_and_constraints(self, solution: list) -> tuple:
        """Applies a chromosome's placement, scores it, and cleans the infrastructure afterwards.

        Args:
            solution (list): Chromosome to be evaluated.

        Returns:
            tuple: Output of `evaluate_placement` for the chromosome's placement.
        """
        apply_placement(solution=solution)
        evaluation = evaluate_placement()
        reset_placement()

        return evaluation


With the `PlacementProblem` and `TheaDisplay` classes defined, we can create the `nsgaii` function, which uses the `NSGA2` algorithm from pymoo to solve the placement problem. After the metaheuristic is executed, we compute the score of the Pareto front solutions (i.e., a set of non-dominated solutions) through the geometric mean of the objective values. Finally, we use the best solution to perform the placement in the infrastructure.


In [None]:
def nsgaii(parameters: dict = {}):
    """Places the services according to the best solution found by the NSGA-II genetic algorithm.

    The algorithm searches for placements that minimize delay SLA violations, privacy SLA violations, and power
    consumption. The solution of the resulting set with the lowest geometric mean of the three objectives is
    applied to the infrastructure.

    Args:
        parameters (dict, optional): Algorithm settings. Recognized keys (and their defaults) are "seed" (1),
            "n_gen" (100), "pop_size" (100), "sampling" ("int_random"), "crossover" ("int_ux"),
            "crossover_prob" (1), "mutation" ("int_pm"), and "mutation_prob" (0.2). Other keys are ignored.

    Raises:
        RuntimeError: If NSGA-II finishes without any feasible solution to report.
    """

    # Algorithm parameters
    seed = parameters.get("seed", 1)
    n_gen = parameters.get("n_gen", 100)
    pop_size = parameters.get("pop_size", 100)
    sampling = parameters.get("sampling", "int_random")
    crossover = parameters.get("crossover", "int_ux")
    crossover_prob = parameters.get("crossover_prob", 1)
    mutation = parameters.get("mutation", "int_pm")
    mutation_prob = parameters.get("mutation_prob", 0.2)

    # Defining the NSGA-II attributes
    algorithm = NSGA2(
        pop_size=pop_size,
        sampling=get_sampling(sampling),
        crossover=get_crossover(crossover, prob=crossover_prob),
        mutation=get_mutation(mutation, prob=mutation_prob),
        eliminate_duplicates=True,
    )

    # Running the NSGA-II algorithm
    problem = PlacementProblem()
    res = minimize(problem, algorithm, termination=("n_gen", n_gen), seed=seed, verbose=True, display=TheaDisplay())

    # pymoo reports no solution (X is None) when nothing in the final set satisfies the constraint, which here is
    # the number of overloaded edge servers. Failing with a clear message instead of an opaque TypeError below
    if res.X is None:
        raise RuntimeError(
            "NSGA-II did not find any feasible placement (every solution overloads at least one edge server)."
        )

    # Parsing the NSGA-II's output
    solutions = []
    for chromosome, objectives, constraint_violation in zip(res.X, res.F, res.CV):
        solutions.append(
            {
                "placement": chromosome.tolist(),
                "fitness": {
                    "Delay Violations": objectives[0],
                    "Privacy Violations": objectives[1],
                    "Power Consumption": objectives[2],
                },
                "overloaded_servers": constraint_violation[0].tolist(),
            }
        )

    def geometric_mean(solution: dict) -> float:
        """Geometric mean of the three objective values of a solution."""
        fitness = solution["fitness"]
        product = fitness["Delay Violations"] * fitness["Privacy Violations"] * fitness["Power Consumption"]
        return product ** (1 / 3)

    # Applying the a placement scheme found by the NSGA-II algorithm
    # best_solution = [7, 8, 1, 1, 7, 4, 4, 1, 5, 5]  # This is the solution manually calculated in the whiteboard
    # NOTE(review): any solution with a zero in one objective scores 0, so all such solutions tie and the first
    # one in the result order is chosen -- confirm that this tie-breaking is acceptable
    best_solution = min(solutions, key=geometric_mean)["placement"]

    print(f"best_solution => {best_solution}")

    apply_placement(solution=best_solution)
    result = evaluate_placement()

    if SAMPLE_TEST:
        print("==== SOLUTIONS ====")
        for solution in solutions:
            print(f"    {solution}")
        print("")

        print(f"\nBEST SOLUTION: {best_solution}")

        print(f"OUTPUT: {result}")
        print(f"    Delay SLA Violations: {result[0][0]}")
        print(f"    Privacy SLA Violations: {result[0][1]}")
        print(f"    Power Consumption: {result[0][2]}")
        print(f"    Overloaded Edge Servers: {result[1]}")


### Thea

## Simulation

Now, we are ready to run the simulation. To this end, we create an `algorithms` dictionary to specify which algorithms will be evaluated. Since some of them have parameters that could be tuned, we specify the parameter values that we want to evaluate for each algorithm.

In [None]:
# Registry of the algorithms to be evaluated. Each entry maps a label to the function handed to the simulator
# ("method") and, optionally, to the candidate values of each of its parameters ("parameters")
algorithms = {
    "faticanti2020": {
        "method": faticanti2020,
    },
    "argos": {
        "method": argos,
    },
    "nsgaii": {
        "method": nsgaii,
        # Every combination of the values below is simulated (3 x 3 x 3 = 27 NSGA-II executions)
        "parameters": {
            "n_gen": [100, 200, 300],
            "pop_size": [100, 200, 300],
            "sampling": ["int_random"],
            "crossover": ["int_ux"],
            "crossover_prob": [0.25, 0.5, 1],
            "mutation": ["int_pm"],
            "mutation_prob": [0.2],
        },
    },
    # "thea": {
    #     "method": thea,
    # },
}


Let's define the dataset that will be used in the simulation. Our dataset is composed of XX users, YY services, ZZ infrastructure providers, and AA edge servers.

In [None]:
dataset_filename = "datasets/sample_dataset1.json"

# Parsing the JSON dataset into a dictionary
with open(dataset_filename, "r") as f:
    dataset = json.load(f)

# Summarizing the size of the scenario
print(f"Number of edge servers: {len(dataset['edge_servers'])}")
print(f"Number of applications: {len(dataset['applications'])}")
print(f"Number of services: {len(dataset['services'])}")
print(f"Number of users: {len(dataset['users'])}")

# Distinct infrastructure providers that own the edge servers
infrastructure_providers = {
    edge_server["attributes"]["infrastructure_provider"] for edge_server in dataset["edge_servers"]
}
print(f"Number of infrastructure providers: {len(infrastructure_providers)}")


Now, let's execute the simulation for each algorithm and each combination of parameter values. We also create a `results` list to store the results of each simulation.

In [None]:
results = []
for algorithm, specification in algorithms.items():

    # Expanding the parameter grid (if any) into the list of combinations to be simulated
    if "parameters" in specification:
        combinations = list(itertools.product(*specification["parameters"].values()))
    else:
        combinations = [None]

    for combination in combinations:

        # Each execution gets its own Simulator object, which stops after a single step
        simulator = Simulator(
            tick_duration=1,
            tick_unit="seconds",
            stopping_criterion=lambda model: model.schedule.steps == 1,
            resource_management_algorithm=specification["method"],
            dump_interval=float("inf"),
        )

        result = {"algorithm": algorithm}

        if not combination:
            print(f"==> Executing algorithm: {algorithm}")

        else:
            # Pairing each parameter name with the value chosen for it in this combination
            parameters = dict(zip(specification["parameters"].keys(), combination))
            result.update(parameters)

            simulator.resource_management_algorithm_parameters = {
                "current_step": simulator.schedule.steps + 1,
                **parameters,
            }

            print(f"==> Executing algorithm: {algorithm} with parameters: {parameters}")

        # Loading the dataset and executing the simulation
        simulator.initialize(input_file=dataset)
        simulator.run_model()

        # Evaluating the placement produced by the algorithm (the penalties are not recorded)
        metrics = evaluate_placement()[0]
        result.update(
            {
                "delay_sla_violations": metrics[0],
                "privacy_sla_violations": metrics[1],
                "overall_edge_server_power_consumption": metrics[2],
            }
        )

        results.append(result)

        print("\n")


## Results

Let's check the results for each algorithm and each parameter value.

In [None]:
# Creating a Pandas DataFrame from the results. `reindex` is used to order the columns because it fills the
# columns that no result provides (e.g., NSGA-II parameters when only heuristics were executed) with NaN
# instead of raising a KeyError
results_df = pd.DataFrame(results).reindex(
    columns=[
        "algorithm",
        "n_gen",
        "pop_size",
        "sampling",
        "crossover",
        "crossover_prob",
        "mutation",
        "mutation_prob",
        "delay_sla_violations",
        "privacy_sla_violations",
        "overall_edge_server_power_consumption",
    ]
)
results_df


In [None]:
# Persisting the results table (without the index column) as a CSV file in the working directory
results_df.to_csv("results.csv", index=False)

## References

- [1] Faticanti, Francescomaria, et al. "Deployment of Application Microservices in Multi-Domain Federated Fog Environments." International Conference on Omni-layer Intelligent Systems (COINS). IEEE, 2020.