In [None]:
import numpy as np

class EGT_GWO:
    def __init__(self, num_mds, num_ess, min_task_size, max_task_size, min_speed, max_speed, min_capacity, max_capacity, omega1, omega2, p_k, max_iter):
        self.NUM_MDS = num_mds
        self.NUM_ESS = num_ess
        self.MIN_TASK_SIZE = min_task_size
        self.MAX_TASK_SIZE = max_task_size
        self.MIN_SPEED = min_speed
        self.MAX_SPEED = max_speed
        self.MIN_CAPACITY = min_capacity
        self.MAX_CAPACITY = max_capacity
        self.omega1 = omega1
        self.omega2 = omega2
        self.P_k = p_k
        self.max_iter = max_iter

    def generate_synthetic_data(self):
        """Generate synthetic data for task sizes, mobile device speeds, and edge server capacities."""
        self.task_sizes = np.random.uniform(self.MIN_TASK_SIZE, self.MAX_TASK_SIZE, size=self.NUM_MDS)
        self.md_speeds = np.random.uniform(self.MIN_SPEED, self.MAX_SPEED, size=self.NUM_MDS)
        self.es_capacities = np.random.uniform(self.MIN_CAPACITY, self.MAX_CAPACITY, size=self.NUM_ESS)
        return self.task_sizes, self.md_speeds, self.es_capacities

    def total_edge_time(self, task_size, allocation):
        """Calculate latency for edge server execution."""
        return task_size / allocation

    def local_execution_time(self, task_size, speed):
        """Calculate latency for local execution."""
        return task_size / speed

    def edge_energy(self, task_size, allocation):
        """Calculate energy consumption for edge server execution."""
        return task_size * allocation

    def local_energy(self, task_size, speed):
        """Calculate energy consumption for local execution."""
        return task_size * speed

    def calculate_load_balancing(self, assignments, tasks, es_capacities):
        """Calculate load balancing across edge servers."""
        utilization = np.zeros(self.NUM_ESS)
        for es_idx in range(self.NUM_ESS):
            total_load = np.sum(tasks[assignments == es_idx])
            utilization[es_idx] = total_load / es_capacities[es_idx]
        return utilization

    def gwo_fitness(self, wolf, assignments, tasks):
        """Calculate fitness for a wolf (solution)."""
        total_cost = 0
        for i, assign in enumerate(assignments):
            if assign < self.NUM_ESS:  # Task is offloaded to an edge server
                latency = self.total_edge_time(tasks[i], wolf[assign])
                energy = self.edge_energy(tasks[i], wolf[assign])
            else:  # Task is executed locally
                latency = self.local_execution_time(tasks[i], self.md_speeds[i])
                energy = self.local_energy(tasks[i], self.md_speeds[i])
            total_cost += self.omega1 * latency + self.omega2 * energy

        # Overload penalty
        overload_penalty = 0
        for k in range(self.NUM_ESS):
            tasks_assigned = np.sum(assignments == k)
            overload = max(0, tasks_assigned - self.es_capacities[k])
            overload_penalty += self.P_k * overload

        fitness = total_cost + overload_penalty
        return fitness

    def logit_dynamics(self, md_speeds, es_allocations, tasks, beta, current_assignments):
        """Simulate task assignment decisions using Logit Dynamics."""
        probabilities = []
        new_assignments = []
        for md_idx in range(self.NUM_MDS):
            task = tasks[md_idx]
            payoffs = []
            for es_idx in range(self.NUM_ESS):
                effective_allocation = es_allocations[es_idx] / (np.sum(current_assignments == es_idx) + 1e-10)
                latency = self.total_edge_time(task, effective_allocation)
                energy = self.edge_energy(task, effective_allocation)
                payoffs.append(- (self.omega1 * latency + self.omega2 * energy))
            local_latency = self.local_execution_time(task, md_speeds[md_idx])
            local_energy_val = self.local_energy(task, md_speeds[md_idx])
            payoffs.append(- (self.omega1 * local_latency + self.omega2 * local_energy_val))
            max_payoff = np.max(payoffs)
            exp_payoffs = np.exp(beta * (np.array(payoffs) - max_payoff))
            exp_sum = np.sum(exp_payoffs)
            if exp_sum == 0:
                probabilities.append(np.ones(len(payoffs)) / len(payoffs))
            else:
                probabilities.append(exp_payoffs / exp_sum)
            new_assignments.append(np.random.choice(len(payoffs), p=probabilities[-1]))
        return np.array(probabilities), np.array(new_assignments)

    def update_position(self, wolf, alpha, beta_wolf, delta, a, A, C, search_space):
        """Update the position of a wolf in GWO."""
        for j in range(len(wolf)):
            D_alpha = abs(C * alpha[j] - wolf[j])
            D_beta = abs(C * beta_wolf[j] - wolf[j])
            D_delta = abs(C * delta[j] - wolf[j])
            X1 = alpha[j] - A * D_alpha
            X2 = beta_wolf[j] - A * D_beta
            X3 = delta[j] - A * D_delta
            wolf[j] = (X1 + X2 + X3) / 3
            wolf[j] = np.clip(wolf[j], search_space[j][0], search_space[j][1])
        return wolf

    def egt_gwo(self, tasks, md_speeds, es_capacities):
        """Run the EGT-GWO optimization algorithm."""
        num_wolves = 10
        dim = self.NUM_ESS
        search_space = [(0.1 * cap, cap) for cap in es_capacities]
        wolves = np.random.uniform([low for (low, high) in search_space], [high for (low, high) in search_space], (num_wolves, dim))
        alpha = wolves[0].copy()
        beta_wolf = wolves[1].copy()
        delta = wolves[2].copy()
        alpha_score = np.inf
        assignments = np.random.randint(0, self.NUM_ESS + 1, size=self.NUM_MDS)
        fitness_values = []
        task_assignments_history = np.zeros((self.max_iter, self.NUM_ESS + 1))
        utilization_history = np.zeros((self.max_iter, self.NUM_ESS))

        for iter in range(self.max_iter):
            iter_fitness = [self.gwo_fitness(wolf, assignments, tasks) for wolf in wolves]
            fitness_values.append(min(iter_fitness))
            utilization_history[iter] = self.calculate_load_balancing(assignments, tasks, es_capacities)
            task_assignments_history[iter] = np.histogram(assignments, bins=np.arange(self.NUM_ESS + 2))[0]
            a = 2 - 2 * (iter / self.max_iter)
            A = 2 * a * np.random.random() - a
            C = 2 * np.random.random()
            for i in range(num_wolves):
                current_fitness = self.gwo_fitness(wolves[i], assignments, tasks)
                if current_fitness < alpha_score:
                    alpha_score = current_fitness
                    alpha = wolves[i].copy()
                if current_fitness < self.gwo_fitness(beta_wolf, assignments, tasks):
                    beta_wolf = wolves[i].copy()
                elif current_fitness < self.gwo_fitness(delta, assignments, tasks):
                    delta = wolves[i].copy()
                wolves[i] = self.update_position(wolves[i], alpha, beta_wolf, delta, a, A, C, search_space)
            probabilities, assignments = self.logit_dynamics(md_speeds, es_capacities, tasks, beta=0.4, current_assignments=assignments)
        return alpha, assignments, fitness_values, task_assignments_history, utilization_history