# In [1]:
import numpy as np
from multiprocessing import Pool
from numpy.typing import NDArray
from dco import Optimizer, LocalObjective


def dco_task(
    algorithm: str,
    u_i: NDArray[np.float64],
    v_i: NDArray[np.float64],
    name: str,
    dim_i: int,
    rho_i: float,
    alpha: int | float,
    gamma: int | float,
    max_iter: int,
    server_address: str,
) -> None:
    """Build one node's local objective and run its optimizer.

    The local objective evaluated at ``x`` is
    ``(u_i @ x - v_i) ** 2 + (rho_i * x) @ x``. It is wrapped in a
    ``LocalObjective`` of dimension ``dim_i`` and handed to an optimizer
    obtained from ``Optimizer.create``; ``solve_sync(max_iter)`` is then
    called on that optimizer.

    Args:
        algorithm: Passed as the first argument of ``Optimizer.create``.
        u_i: Data vector used in the squared-residual term.
        v_i: Target value subtracted from ``u_i @ x``.
        name: Passed to ``Optimizer.create`` as the second argument.
        dim_i: Dimension given to ``LocalObjective``.
        rho_i: Weight of the ``x @ x`` penalty term.
        alpha: Forwarded positionally to ``Optimizer.create``.
            NOTE(review): meaning is defined by ``dco`` -- confirm there.
        gamma: Forwarded positionally to ``Optimizer.create``; the script
            in this file passes a per-algorithm step size here.
        max_iter: Argument of ``solve_sync``.
        server_address: Forwarded as the ``server_address`` keyword.
    """

    def ridge_loss(var: NDArray[np.float64]) -> NDArray[np.float64]:
        # Squared residual of the linear model at ``var``.
        residual = u_i @ var - v_i
        # Same grouping as ``rho_i * var @ var``: scale first, then dot.
        penalty = (rho_i * var) @ var
        return residual**2 + penalty

    objective = LocalObjective(dim_i, ridge_loss)
    solver = Optimizer.create(
        algorithm,
        name,
        objective,
        alpha,
        gamma,
        server_address=server_address,
    )
    solver.solve_sync(max_iter)


if __name__ == "__main__":
    # Node identifiers for the simple graph: "1" through "4".
    node_names = [str(k) for k in range(1, 5)]

    # Parameters for ridge regression.
    dim = 10
    rho = 0.01

    # Fixed seed so the synthetic data below is the same on every run.
    np.random.seed(0)

    # Draw order is significant under a fixed seed: every ``u`` vector is
    # sampled first, and only then the scalar noise terms.
    u = {}
    for node in node_names:
        u[node] = np.random.uniform(-1, 1, dim)

    # Constant vector per node: 0.0 for "1", 0.1 for "2", and so on.
    x_tilde = {node: np.full(dim, 0.1 * (int(node) - 1)) for node in node_names}

    epsilon = {}
    for node in node_names:
        epsilon[node] = np.random.normal(0, 5)

    # Noisy scalar observation for each node.
    v = {node: u[node] @ x_tilde[node] + epsilon[node] for node in node_names}

    # Distributed optimization: keyword arguments shared by every task.
    shared_params = {
        "dim_i": dim,
        "rho_i": rho,
        "max_iter": 2000,
        "alpha": 0.2,
        "server_address": "localhost:5555",
    }
    # Step size per algorithm name; only the selected algorithm is used.
    stepsizes = {
        "EXTRA": 0.16,
        "NIDS": 0.21,
        "DIGing": 0.11,
        "AugDGM": 0.31,
        "WE": 0.17,
        "RGT": 0.11,
    }

    # One worker process per node.
    with Pool(processes=len(node_names)) as pool:
        algorithm = "EXTRA"
        step_size = stepsizes[algorithm]

        kwds = {**shared_params, "gamma": step_size}

        tasks = []
        for node in node_names:
            handle = pool.apply_async(
                dco_task,
                args=(algorithm, u[node], v[node], node),
                kwds=kwds,
            )
            tasks.append(handle)

        # ``get`` blocks until each task is done and re-raises any error
        # from the worker; ``dco_task`` itself returns ``None``.
        results = [handle.get() for handle in tasks]