# Creating a Placement Algorithm

This tutorial demonstrates how we can create a simple placement algorithm on EdgeSimPy.

Let's start by installing EdgeSimPy and checking that its modules can be imported:

In [4]:
# EdgeSimPy Import Debugging Script

# Explicit dependency installation
!pip install networkx==2.6.2
!pip install matplotlib pandas numpy
!pip install git+https://github.com/EdgeSimPy/EdgeSimPy.git@v1.1.0

# Python and package information
!python --version
!pip list | grep -E "networkx|edge_sim_py"

# Comprehensive import and debugging script
import sys
import os
import importlib

def print_module_structure(module_name):
    """
    Print diagnostic information about an importable module.

    Imports ``module_name`` and prints its file path, the directory that
    contains it, the type of every attribute listed by ``dir()``, and the
    entries of that directory when it exists on disk. Nothing is returned and
    nothing is raised: import failures and any other error are reported on
    stdout, so the function can be pointed at modules that may be missing.

    Args:
        module_name: Dotted name of the module to import and inspect.
    """
    print(f"\n--- Module Structure for {module_name} ---")
    try:
        # Import the module
        module = importlib.import_module(module_name)

        # __file__ may be absent (built-in modules) or present but None
        # (e.g. namespace packages). Both cases mean "no file on disk", and
        # os.path.dirname() must never be handed None.
        module_file = getattr(module, '__file__', None)
        print(f"Module file path: {module_file or 'No __file__ attribute'}")

        # Get the module's directory
        module_dir = os.path.dirname(module_file) if module_file else 'Unknown'
        print(f"Module directory: {module_dir}")

        # List all attributes and their types
        print("\nModule Contents:")
        for attr_name in dir(module):
            try:
                attr = getattr(module, attr_name)
                print(f"  {attr_name}: {type(attr)}")
            except Exception as attr_err:
                # A name listed by dir() can still fail on access; report it and keep going
                print(f"  {attr_name}: Could not retrieve (Error: {attr_err})")

        # List files in the module directory ('Unknown' is simply not a directory)
        if os.path.isdir(module_dir):
            print("\nFiles in module directory:")
            try:
                for item in os.listdir(module_dir):
                    print(f"  {item}")
            except Exception as list_err:
                print(f"  Could not list directory contents: {list_err}")

    except ImportError as e:
        print(f"Could not import {module_name}: {e}")
    except Exception as e:
        # Deliberate best-effort: this is a debugging aid and must not crash the cell
        print(f"Unexpected error examining {module_name}: {e}")

# Show the interpreter's module search path so path problems are easy to spot
print("--- Python Path ---")
print(sys.path)

# Dump everything we can learn about the top-level EdgeSimPy package
print_module_structure('edge_sim_py')

# Probe the package root plus a few candidate submodule names (some may not exist)
print("\n--- Alternative Import Attempts ---")
import_attempts = ['edge_sim_py'] + [
    f'edge_sim_py.{submodule}'
    for submodule in ('core', 'components', 'device', 'server')
]

for attempt in import_attempts:
    print(f"\nTrying to import {attempt}")
    try:
        module = importlib.import_module(attempt)
    except ImportError as e:
        print(f"Import failed: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")
    else:
        # Only reached when the import went through
        print(f"Successfully imported {attempt}")
        print(f"Module file: {getattr(module, '__file__', 'No file attribute')}")

# Report where each package of interest was loaded from
print("\n--- Installed Packages Paths ---")
for package_name in ('edge_sim_py', 'networkx', 'numpy', 'pandas'):
    try:
        package = importlib.import_module(package_name)
        print(f"{package_name}: {package.__file__}")
    except ImportError:
        print(f"{package_name}: Not found")
    except Exception as e:
        print(f"{package_name}: Error - {e}")


Collecting git+https://github.com/EdgeSimPy/EdgeSimPy.git@v1.1.0
  Cloning https://github.com/EdgeSimPy/EdgeSimPy.git (to revision v1.1.0) to /tmp/pip-req-build-k7brjlxd
  Running command git clone --filter=blob:none --quiet https://github.com/EdgeSimPy/EdgeSimPy.git /tmp/pip-req-build-k7brjlxd
  Running command git checkout -q 5ea400b39390490b25dabf8be711fe559cb2cbff
  Resolved https://github.com/EdgeSimPy/EdgeSimPy.git to commit 5ea400b39390490b25dabf8be711fe559cb2cbff
  Installing build dependencies ... done
  Getting requirements to build wheel ... done
  Preparing metadata (pyproject.toml) ... done
Python 3.11.11
edge_sim_py                        1.1.0
networkx                           2.6.2
--- Python Path ---
['/content', '/env/python', '/usr/lib/python311.zip', '/usr/lib/python3.11', '/usr/lib/python3.11/lib-dynload', '', '/usr/local/lib/python3.11/dist-packages', '/usr/lib/python3/dist-packages', '/usr/local/lib/python3.11/dist

## Implementing the Placement Algorithm

In this example, we are going to create a simple placement algorithm that works according to the well-known First-Fit heuristic. In a nutshell, our algorithm will provision each service on the first edge server with enough available resources to host it.

In [5]:
def my_algorithm(parameters):
    """
    First-Fit placement: provision every unplaced service on the first edge
    server that reports enough capacity for it.

    Services that already have a host, or whose provisioning is in progress,
    are left untouched. A service for which no server has capacity is skipped
    and will be considered again the next time this function is called.

    Args:
        parameters: Argument handed in by the caller (presumably the
            simulator); this algorithm does not read it.
    """
    # We can always call the 'all()' method to get a list with all created instances of a given class
    for service in Service.all():
        # We don't want to touch services that are already hosted or are already being provisioned
        if service.server is not None or service.being_provisioned:
            continue

        # Let's iterate over the list of edge servers to find a suitable host for our service
        for edge_server in EdgeServer.all():

            # We must check if the edge server has enough resources to host the service
            if edge_server.has_capacity_to_host(service=service):

                # Start provisioning the service in the edge server
                service.provision(target_server=edge_server)

                # First fit found: stop scanning servers and move on to the next service
                break

## Running the Simulation

As we're creating a placement algorithm, we must instruct EdgeSimPy that it needs to continue the simulation until all services are provisioned within the infrastructure.

To do so, let's create a simple function that will be used as the simulation's stopping criterion. EdgeSimPy will run that function at the end of each time step, halting the simulation as soon as it returns `True`.

In [6]:
def stopping_criterion(model: object):
    """
    Tell the simulator whether every service has been placed.

    Args:
        model: The simulation model passed in by the caller; not read here.

    Returns:
        True when the number of services whose ``server`` attribute is set
        equals ``Service.count()``, False otherwise.
    """
    # Services start with no host (their "server" attribute is None); once that
    # attribute is set, the service counts as provisioned inside an edge server.
    provisioned_services = sum(1 for service in Service.all() if service.server is not None)

    # The simulation halts as soon as this expression is True, i.e. when the number
    # of provisioned services equals the number of services in the simulation.
    return provisioned_services == Service.count()

## FCFS Algorithm Logic


In [None]:
from typing import List, Dict
import numpy as np

class ResourceType:
    """
    Namespace of plain string constants naming the kinds of compute node.

    The values are ordinary ``str`` objects, so they compare equal to their
    literal text and print as-is.
    """
    # Edge-side node kinds
    EDGE_RASPBERRY: str = "Edge_Raspberry"
    EDGE_SMARTPHONE: str = "Edge_Smartphone"
    # Cloud-side node kind
    CLOUD: str = "Cloud"

class Task:
    """
    A unit of work described by how much data it moves and how much CPU it needs.

    The timing and placement fields start at zero / None; they are meant to be
    filled in by whatever schedules the task.
    """
    def __init__(self,
                 task_id: int,
                 data_size: float,     # in MB
                 cpu_required: float):  # in MI (Million Instructions)
        # Static description of the work
        self.id = task_id
        self.data_size = data_size
        self.cpu_required = cpu_required

        # Placement and timing bookkeeping, all unset to begin with
        self.assigned_resource = None
        self.arrival_time = self.start_time = self.completion_time = 0
        self.execution_time = self.transfer_time = 0

    def calculate_execution_time(self, resource) -> float:
        """
        Return the compute time on ``resource``: CPU demand divided by its ``cpu_rating``.
        """
        mi_per_second = resource.cpu_rating
        return self.cpu_required / mi_per_second  # MI / (MI/s) = seconds

    def calculate_transfer_time(self, resource) -> float:
        """
        Return the data transfer time to ``resource``: data size divided by its ``bandwidth``.
        """
        mb_per_second = resource.bandwidth
        return self.data_size / mb_per_second  # MB / (MB/s) = seconds

class Resource:
    """
    A compute node (edge or cloud) with fixed capacity figures and task lists.

    ``available_cpu`` and ``available_memory`` start equal to the node's full
    ``cpu_rating`` and ``memory``; both task lists start empty.
    """
    def __init__(self,
                 resource_id: int,
                 resource_type: str,
                 cpu_rating: int,    # in MI/s (Million Instructions per Second)
                 memory: int,        # in GB
                 bandwidth: int):    # in MB/s
        # Identity
        self.id, self.type = resource_id, resource_type

        # Nominal capacity
        self.cpu_rating = self.available_cpu = cpu_rating
        self.memory = self.available_memory = memory
        self.bandwidth = bandwidth

        # Separate lists for tasks in progress and tasks already finished
        self.current_tasks, self.completed_tasks = [], []

class FCFSScheduler:
    """
    First Come First Serve (FCFS) scheduler implementation.

    Tasks wait in a queue ordered by arrival time. Each ``schedule`` call hands
    at most one queued task to each offered resource, choosing for every task
    the free resource with the smallest execution + transfer time. ``update``
    advances the clock and retires tasks whose completion time has passed, and
    ``get_metrics`` summarises the retired tasks.
    """
    def __init__(self, max_processing_time: float = 600):
        """
        Args:
            max_processing_time: Upper bound, in seconds, on execution +
                transfer time for a resource to be considered able to handle
                a task. Defaults to 600 (10 minutes).
        """
        self.queue: List["Task"] = []
        self.current_time = 0
        self.scheduled_tasks: Dict[int, List["Task"]] = {}
        self.completed_tasks = []
        # Every resource that has been offered to schedule(); update() walks
        # this list to find tasks that have finished.
        self.resources: List["Resource"] = []
        self.max_processing_time = max_processing_time

    def add_task(self, task: "Task") -> None:
        """
        Add a new task to the scheduling queue, stamping it with the current time.
        """
        task.arrival_time = self.current_time
        self.queue.append(task)

    def schedule(self, available_resources: List["Resource"]) -> Dict["Resource", List["Task"]]:
        """
        Schedule queued tasks onto the given resources using FCFS order.

        Args:
            available_resources: Resources that may receive a task in this round.

        Returns:
            A dict mapping every offered resource to the list of tasks assigned
            to it in this call (possibly empty), or an empty dict when there is
            nothing queued or no resource was offered. Tasks that could not be
            placed stay in the queue.
        """
        if not self.queue or not available_resources:
            return {}

        # Remember the resources so update() can later retire their tasks
        for resource in available_resources:
            if resource not in self.resources:
                self.resources.append(resource)

        # Sort tasks by arrival time
        self.queue.sort(key=lambda x: x.arrival_time)

        assignments = {resource: [] for resource in available_resources}
        # Each resource takes at most one task per scheduling round
        resources_available = {resource: True for resource in available_resources}

        # Iterate over a copy because placed tasks are removed from the queue
        for task in self.queue[:]:
            best_resource = None
            best_total_time = float('inf')

            # Find the best resource for the task
            for resource in available_resources:
                if resources_available[resource] and self._can_resource_handle_task(resource, task):
                    total_time = self._calculate_total_processing_time(resource, task)
                    if total_time < best_total_time:
                        best_total_time = total_time
                        best_resource = resource

            if best_resource is not None:
                # Assign task to the best resource
                task.assigned_resource = best_resource
                task.start_time = self.current_time
                task.execution_time = task.calculate_execution_time(best_resource)
                task.transfer_time = task.calculate_transfer_time(best_resource)
                task.completion_time = self.current_time + task.execution_time + task.transfer_time

                assignments[best_resource].append(task)
                resources_available[best_resource] = False
                self.queue.remove(task)
                best_resource.current_tasks.append(task)
                # Reserve the CPU now; update() hands it back when the task
                # completes, keeping available_cpu balanced over time.
                best_resource.available_cpu -= task.cpu_required

        return assignments

    def _can_resource_handle_task(self, resource: "Resource", task: "Task") -> bool:
        """
        Check if a resource can handle a given task.

        A resource qualifies when the task's CPU requirement does not exceed
        the resource's available CPU and the total processing time stays within
        ``max_processing_time``.
        """
        # Check CPU requirements
        if task.cpu_required > resource.available_cpu:
            return False

        # Reject placements that would take longer than the configured threshold
        total_time = self._calculate_total_processing_time(resource, task)
        return total_time <= self.max_processing_time

    def _calculate_total_processing_time(self, resource: "Resource", task: "Task") -> float:
        """
        Calculate total processing time including execution and transfer time
        """
        execution_time = task.calculate_execution_time(resource)
        transfer_time = task.calculate_transfer_time(resource)
        return execution_time + transfer_time

    def update(self, current_time: float) -> None:
        """
        Advance the scheduler clock and retire tasks that have completed.

        Every task on a known resource whose completion time is at or before
        ``current_time`` is moved from the resource's current list to its
        completed list, recorded on the scheduler, and its CPU reservation is
        released.
        """
        self.current_time = current_time

        # Process completed tasks
        for resource in self.resources:
            completed = [task for task in resource.current_tasks
                        if task.completion_time <= self.current_time]

            for task in completed:
                resource.current_tasks.remove(task)
                resource.completed_tasks.append(task)
                self.completed_tasks.append(task)
                resource.available_cpu += task.cpu_required

    def get_metrics(self) -> Dict:
        """
        Calculate and return scheduling metrics.

        Returns:
            A dict with the average waiting, turnaround, execution and transfer
            times of the completed tasks, the throughput (completed tasks per
            unit of scheduler time) and the current queue length. The averages
            and throughput are 0 while no task has completed; throughput is
            also 0 while the clock is still at 0.
        """
        metrics = {
            'average_waiting_time': 0,
            'average_turnaround_time': 0,
            'average_execution_time': 0,
            'average_transfer_time': 0,
            'throughput': 0,
            'queue_length': len(self.queue)
        }

        if self.completed_tasks:
            done = self.completed_tasks
            metrics['average_waiting_time'] = np.mean(
                [task.start_time - task.arrival_time for task in done])
            metrics['average_turnaround_time'] = np.mean(
                [task.completion_time - task.arrival_time for task in done])
            metrics['average_execution_time'] = np.mean(
                [task.execution_time for task in done])
            metrics['average_transfer_time'] = np.mean(
                [task.transfer_time for task in done])
            # No elapsed time means no meaningful rate; avoid dividing by zero
            if self.current_time > 0:
                metrics['throughput'] = len(done) / self.current_time

        return metrics

def create_edge_cloud_resources():
    """
    Build the three sample nodes: one Raspberry Pi edge node, one smartphone
    edge node and one cloud node.

    Returns:
        A list of three ``Resource`` objects with ids 1, 2 and 3, in that order.
    """
    # One row per node:
    # (id, type, CPU rating in MI/s, memory in GB, bandwidth in MB/s)
    node_specs = (
        (1, ResourceType.EDGE_RASPBERRY, 80000, 1, 5),
        (2, ResourceType.EDGE_SMARTPHONE, 400000, 4, 20),
        (3, ResourceType.CLOUD, 1000000, 32, 80),
    )

    return [
        Resource(
            resource_id=node_id,
            resource_type=node_type,
            cpu_rating=mi_per_second,
            memory=gigabytes,
            bandwidth=mb_per_second,
        )
        for node_id, node_type, mi_per_second, gigabytes, mb_per_second in node_specs
    ]

# Example usage
def main():
    """
    Run a small end-to-end demo: queue three sample tasks, schedule them onto
    the sample resources once, then print the assignments and the metrics.
    """
    fcfs = FCFSScheduler()
    nodes = create_edge_cloud_resources()

    # (data size in MB, CPU demand in MI) for each sample task; ids count from 1
    workload_specs = [(100, 50000), (200, 100000), (150, 75000)]
    for number, (megabytes, instructions) in enumerate(workload_specs, start=1):
        fcfs.add_task(Task(task_id=number, data_size=megabytes, cpu_required=instructions))

    # A single scheduling round
    placement = fcfs.schedule(nodes)

    # Report which task landed on which resource
    print("\nTask Assignments:")
    print("-----------------")
    for node, jobs in placement.items():
        print(f"\nResource {node.id} ({node.type}):")
        for job in jobs:
            total_seconds = job.execution_time + job.transfer_time
            print(f"  Task {job.id}:")
            print(f"    Data Size: {job.data_size} MB")
            print(f"    CPU Required: {job.cpu_required} MI")
            print(f"    Execution Time: {job.execution_time:.2f} seconds")
            print(f"    Transfer Time: {job.transfer_time:.2f} seconds")
            print(f"    Total Time: {total_seconds:.2f} seconds")

    # Report the scheduler's summary figures
    print("\nScheduling Metrics:")
    print("------------------")
    for label, figure in fcfs.get_metrics().items():
        print(f"{label}: {figure:.2f}")

if __name__ == "__main__":
    main()


Once we have our stopping criterion, we can finally run our simulation by creating an instance of the `Simulator` class, loading a dataset, and calling the `run_model()` method.

In [7]:

# Bring the EdgeSimPy names used by this notebook into scope. The setup cell only
# probes the package through importlib and never binds these names, so on a fresh
# kernel this cell would otherwise stop with a NameError. EdgeServer is imported
# for my_algorithm, which looks it up as a global when the simulator calls it.
from edge_sim_py import EdgeServer, Service, Simulator

# Creating a Simulator object: one-second ticks, our stopping criterion to decide
# when to halt, and our First-Fit function as the resource management algorithm
simulator = Simulator(
    tick_duration=1,
    tick_unit="seconds",
    stopping_criterion=stopping_criterion,
    resource_management_algorithm=my_algorithm,
)

# Loading a sample dataset from GitHub (requires network access)
simulator.initialize(input_file="https://raw.githubusercontent.com/EdgeSimPy/edgesimpy-tutorials/master/datasets/sample_dataset2.json")

# Executing the simulation
simulator.run_model()

# Checking the placement output: one line per service with the server hosting it
for service in Service.all():
    print(f"{service}. Host: {service.server}")

Service_1. Host: EdgeServer_1
Service_2. Host: EdgeServer_1
Service_3. Host: EdgeServer_1
Service_4. Host: EdgeServer_1
Service_5. Host: EdgeServer_1
Service_6. Host: EdgeServer_1
