# In [None]:

import wandb
import numpy as np
import random
import optuna
import ray
from collections import deque
from ray import tune
from ray.tune.integration.wandb import WandbLoggerCallback
from ray.tune.schedulers import PopulationBasedTraining
from typing import List, Tuple
from pydantic import BaseModel, Field
from mlflow import log_metric, log_param, start_run, end_run
import hydra
from omegaconf import DictConfig
import ipyvolume as ipv
import ipywidgets as widgets
from netCDF4 import Dataset

class Config(BaseModel):
    """Typed default parameters for the task / virtual-machine simulation.

    NOTE(review): ``main`` reads its settings from the Hydra ``DictConfig``
    rather than from this model; presumably these defaults mirror the Hydra
    ``config`` file — confirm the two are kept in sync.
    """

    # Number of tasks in the synthetic workload.
    task_count: int = 8_000
    # Number of VMs; the default per-VM lists below each have this many entries.
    vm_count: int = 10
    task_arrival_rate: float = 40.0
    task_compute_mean: int = 200
    task_compute_std: int = 20
    # Per-VM capacities: two at 1000, four at 1100, four at 1200.
    vm_capacity: List[int] = Field(
        default_factory=lambda: [1000] * 2 + [1100] * 4 + [1200] * 4
    )
    # Per-VM attribute flags: the first five VMs are 1, the last five are 0.
    vm_attribute: List[int] = Field(default_factory=lambda: [1] * 5 + [0] * 5)

@hydra.main(config_path=".", config_name="config")
def main(cfg: DictConfig):
    """Build a synthetic workload, tune a DPSO task scheduler with Ray Tune, and log results.

    Steps:
      1. Draw ``cfg.task_count`` tasks with exponential inter-arrival gaps
         (rate ``cfg.task_arrival_rate``), normally distributed compute
         requirements and an attribute, and write them to ``task_data.nc``.
      2. Build ``cfg.vm_count`` virtual machines from ``cfg.vm_capacity`` and
         ``cfg.vm_attribute`` and write them to ``vm_data.nc``.
      3. Run Ray Tune with a Population Based Training scheduler over the DPSO
         hyperparameters. Per-trial metrics go to W&B via ``WandbLoggerCallback``;
         the best configuration is logged to the driver's W&B and MLflow runs.

    Args:
        cfg: Hydra configuration. Keys read: ``task_count``, ``task_arrival_rate``,
            ``task_compute_mean``, ``task_compute_std``, ``vm_count``,
            ``vm_capacity``, ``vm_attribute``.

    Raises:
        ValueError: if ``cfg.vm_count`` < 1 or ``cfg.vm_capacity`` /
            ``cfg.vm_attribute`` have fewer than ``cfg.vm_count`` entries.
    """
    vm_count = int(cfg.vm_count)
    vm_capacities = [int(c) for c in list(cfg.vm_capacity)[:vm_count]]
    vm_attributes = [int(a) for a in list(cfg.vm_attribute)[:vm_count]]
    if vm_count < 1 or len(vm_capacities) < vm_count or len(vm_attributes) < vm_count:
        raise ValueError(
            f"Need vm_count >= 1 and at least vm_count entries in vm_capacity/vm_attribute; "
            f"got vm_count={vm_count}, {len(cfg.vm_capacity)} capacities, "
            f"{len(cfg.vm_attribute)} attributes"
        )

    # --- Workload -----------------------------------------------------------
    task_count = int(cfg.task_count)
    # Cumulative exponential gaps give Poisson arrivals at rate task_arrival_rate.
    task_arrival_times = np.cumsum(np.random.exponential(1.0 / cfg.task_arrival_rate, task_count))
    # Compute requirements ~ Normal(mean, std), rounded and floored at 1 so every
    # task has a positive execution time.
    task_compute_reqs = np.maximum(
        1, np.rint(np.random.normal(cfg.task_compute_mean, cfg.task_compute_std, task_count))
    ).astype(int)
    # NOTE(review): the original never built the task list the scheduler consumes.
    # Attributes are drawn uniformly from the attribute values present on the VMs —
    # confirm the intended distribution.
    task_attributes = np.random.choice(sorted(set(vm_attributes)), size=task_count)
    # Each task is (arrival_time, compute_req, attribute), the layout VirtualMachine.add_task takes.
    tasks = [
        (float(t), int(c), int(a))
        for t, c, a in zip(task_arrival_times, task_compute_reqs, task_attributes)
    ]

    try:
        with Dataset('task_data.nc', 'w', format='NETCDF4') as ncfile:
            # A dimension must be declared before any variable can use it.
            ncfile.createDimension('tasks', task_count)
            arrival_var = ncfile.createVariable('task_arrival_times', np.float32, ('tasks',))
            arrival_var[:] = task_arrival_times
            compute_var = ncfile.createVariable('task_compute_req', np.int32, ('tasks',))
            compute_var[:] = task_compute_reqs
            attribute_var = ncfile.createVariable('task_attribute', np.int32, ('tasks',))
            attribute_var[:] = task_attributes
    except Exception as e:
        # Best effort: the experiment still runs if the dataset cannot be written.
        print(f"Error handling netCDF4 file: {str(e)}")

    # --- Virtual machines ---------------------------------------------------
    class VirtualMachine:
        """A VM that runs its assigned tasks back to back, in assignment order."""

        def __init__(self, capacity: int, attribute: int):
            self.capacity = capacity    # compute units processed per unit time
            self.attribute = attribute  # tasks with a different attribute run 2x slower
            self.queue = deque()        # history of (task, start_time, end_time)
            self.available_time = 0     # time at which the VM next becomes idle

        def add_task(self, task: Tuple[float, int, int], current_time: float) -> Tuple[float, float]:
            """Append ``task`` = (arrival_time, compute_req, attribute); return its (start, end) time.

            The task starts once both the VM is idle and ``current_time`` is reached.
            """
            start_time = max(self.available_time, current_time)
            exec_time = (task[1] / self.capacity) * (2 if task[2] != self.attribute else 1)
            self.available_time = start_time + exec_time
            self.queue.append((task, start_time, self.available_time))
            return start_time, self.available_time

    try:
        with Dataset('vm_data.nc', 'w', format='NETCDF4') as ncfile:
            ncfile.createDimension('vms', vm_count)
            vm_capacity_var = ncfile.createVariable('vm_capacity', np.int32, ('vms',))
            vm_attribute_var = ncfile.createVariable('vm_attribute', np.int32, ('vms',))
            vm_capacity_var[:] = vm_capacities
            vm_attribute_var[:] = vm_attributes
    except Exception as e:
        print(f"Error handling netCDF4 file: {str(e)}")

    vms = [VirtualMachine(cap, attr) for cap, attr in zip(vm_capacities, vm_attributes)]

    # --- Scheduler ----------------------------------------------------------
    class Scheduler:
        """Assigns tasks to VMs with discrete particle swarm optimisation (DPSO)."""

        def __init__(self, vms: List[VirtualMachine]):
            self.vms = vms
            self.iteration_scores = []    # best score of each DPSO iteration, across all runs
            self.global_best_scores = []  # global best after each DPSO iteration, across all runs
            self.swarm_positions = []

        def reset(self):
            """Clear every VM's queue and make it idle from time 0."""
            for vm in self.vms:
                vm.queue.clear()
                vm.available_time = 0

        def dpso_scheduler(self, tasks, iterations=100, swarm_size=20, w=0.5, c1=2.5, c2=2.5,
                           visualize=True) -> float:
            """Search for a task->VM assignment minimising mean completion time.

            Each particle is an integer array mapping task i to a VM index. Velocities
            are steps in {-1, 0, 1}; positions wrap modulo the number of VMs.

            Args:
                tasks: sequence of (arrival_time, compute_req, attribute) tuples.
                iterations: number of swarm update rounds.
                swarm_size: number of particles.
                w, c1, c2: inertia, cognitive and social coefficients.
                visualize: if True, show and live-update an ipyvolume scatter of
                    per-particle (mean VM index, std VM index, global best score).
                    Requires a notebook frontend.

            Returns:
                Mean (end_time - arrival_time) of the best schedule found. On return
                the VMs hold that schedule's queues.
            """
            n_vms = len(self.vms)
            n_tasks = len(tasks)
            swarm = [np.random.randint(0, n_vms, size=n_tasks) for _ in range(swarm_size)]
            # Independent copies so later in-place updates can never alias the swarm.
            personal_best = [particle.copy() for particle in swarm]
            personal_best_scores = [self.evaluate_schedule(tasks, particle) for particle in swarm]
            best_idx = int(np.argmin(personal_best_scores))
            global_best = personal_best[best_idx].copy()
            global_best_score = personal_best_scores[best_idx]
            velocities = [np.random.randint(-1, 2, size=n_tasks) for _ in range(swarm_size)]

            scatter = None
            if visualize:
                # Create and display the figure once; trait updates below redraw it in place.
                ipv.figure()
                zeros = np.zeros(swarm_size)
                scatter = ipv.scatter(zeros, zeros, zeros, marker="sphere", size=1)
                ipv.show()

            run_scores = []
            for _ in range(iterations):
                iteration_best_score = float('inf')

                for i in range(swarm_size):
                    r1, r2 = random.random(), random.random()
                    inertia = w * velocities[i]
                    cognitive = c1 * r1 * (personal_best[i] - swarm[i])
                    social = c2 * r2 * (global_best - swarm[i])
                    # Round the continuous PSO update to an integer step in {-1, 0, 1};
                    # float positions could not be used to index self.vms.
                    velocities[i] = np.rint(np.clip(inertia + cognitive + social, -1, 1)).astype(int)

                    swarm[i] = (swarm[i] + velocities[i]) % n_vms
                    score = self.evaluate_schedule(tasks, swarm[i])
                    iteration_best_score = min(iteration_best_score, score)

                    if score < personal_best_scores[i]:
                        personal_best[i] = swarm[i].copy()
                        personal_best_scores[i] = score

                    if score < global_best_score:
                        global_best = swarm[i].copy()
                        global_best_score = score

                run_scores.append(iteration_best_score)
                self.iteration_scores.append(iteration_best_score)
                self.global_best_scores.append(global_best_score)

                if scatter is not None:
                    scatter.x = np.array([particle.mean() for particle in swarm], dtype=float)
                    scatter.y = np.array([particle.std() for particle in swarm], dtype=float)
                    scatter.z = np.full(swarm_size, global_best_score, dtype=float)

            self._write_scores(run_scores)
            # Re-simulate the winner so the VMs end up holding the best schedule.
            return self.evaluate_schedule(tasks, global_best)

        @staticmethod
        def _write_scores(scores):
            """Best-effort write of this run's per-iteration best scores to ``dpso_scores.nc``."""
            if not scores:
                return
            try:
                with Dataset('dpso_scores.nc', 'w', format='NETCDF4') as ncfile:
                    ncfile.createDimension('iterations', len(scores))
                    score_var = ncfile.createVariable('iteration_scores', np.float32, ('iterations',))
                    score_var[:] = np.asarray(scores, dtype=np.float32)
            except Exception as e:
                print(f"Error writing to netCDF4 file: {str(e)}")

        def evaluate_schedule(self, tasks, schedule) -> float:
            """Simulate ``schedule`` (task i -> VM ``schedule[i]``) from idle VMs.

            Returns:
                Mean of (end_time - arrival_time) over all tasks; 0.0 for no tasks.
            """
            self.reset()
            if not tasks:
                return 0.0
            total_time = 0.0
            for task, vm_index in zip(tasks, schedule):
                arrival_time = task[0]
                _, end_time = self.vms[vm_index].add_task(task, arrival_time)
                total_time += end_time - arrival_time
            return total_time / len(tasks)

    scheduler = Scheduler(vms)

    # --- Hyperparameter search ---------------------------------------------
    def objective(config):
        """Ray Tune function trainable: run one DPSO search with ``config`` and report it.

        Tune passes a plain dict of sampled values (not an Optuna trial). It runs
        this in Ray worker processes, where the driver's ``wandb.init`` and a
        notebook frontend are unavailable. So metrics are reported to Tune, and
        ``WandbLoggerCallback`` forwards them to W&B.
        """
        avg_completion_time = scheduler.dpso_scheduler(
            tasks,
            iterations=int(config["iterations"]),
            swarm_size=int(config["swarm_size"]),
            w=config["w"],
            c1=config["c1"],
            c2=config["c2"],
            visualize=False,
        )
        tune.report(avg_completion_time=avg_completion_time)

    # tune.randint's upper bound is exclusive: these match the inclusive ranges
    # [100, 500] and [100, 200].
    search_space = {
        "iterations": tune.randint(100, 501),
        "swarm_size": tune.randint(100, 201),
        "w": tune.uniform(0.1, 1.5),
        "c1": tune.uniform(0.5, 5.0),
        "c2": tune.uniform(0.5, 5.0),
    }

    # NOTE(review): objective reports once per trial, so training_iteration never
    # reaches perturbation_interval=5 and PBT never exploits/explores. For PBT to
    # take effect, report (and checkpoint) once per DPSO iteration.
    pbt = PopulationBasedTraining(
        time_attr="training_iteration",
        metric="avg_completion_time",
        mode="min",
        perturbation_interval=5,
        hyperparam_mutations={
            "w": tune.uniform(0.1, 1.5),
            "c1": tune.uniform(0.5, 5.0),
            "c2": tune.uniform(0.5, 5.0)
        }
    )

    wandb.init(project="dpso-advanced", name="DPSO with PBT and MLflow")
    start_run()
    ray.init(ignore_reinit_error=True)
    try:
        analysis = tune.run(
            objective,
            name="dpso_pbt_optuna",
            config=search_space,
            scheduler=pbt,
            num_samples=50,
            callbacks=[WandbLoggerCallback(project="advanced-dpso-pbt")],
        )

        # Record the winning configuration in the driver's own W&B and MLflow runs.
        best_trial = analysis.get_best_trial(metric="avg_completion_time", mode="min")
        if best_trial is not None:
            best_score = best_trial.last_result["avg_completion_time"]
            for param_name, param_value in best_trial.config.items():
                log_param(param_name, param_value)
            log_metric("avg_completion_time", best_score)
            wandb.log({**best_trial.config, "avg_completion_time": best_score})
    finally:
        # Close tracking runs and the Ray session even if tuning fails.
        end_run()
        wandb.finish()
        ray.shutdown()

if __name__ == "__main__":
    main()
