In [5]:
# !pip install flwr[simulation] torch pandas matplotlib scikit-learn
!pip install -q  torch torchvision matplotlib

In [7]:
import torch
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from torch import nn
from torch.utils.data import Dataset, DataLoader
from collections import OrderedDict
from typing import List, Tuple

import flwr as fl
from flwr.client import NumPyClient
from flwr.server.strategy import FedAvg
from flwr.simulation import run_simulation
from flwr.common import Metrics, Context
from flwr.client import ClientApp
from flwr.server import ServerApp, ServerAppComponents, ServerConfig

# Run on CUDA when PyTorch reports it is available, otherwise on the CPU.
if torch.cuda.is_available():
    DEVICE = torch.device("cuda")
else:
    DEVICE = torch.device("cpu")
print(f"Using device: {DEVICE}")


Using device: cpu


In [None]:
# Number of simulated Flower clients. Each client loads its own CSV partition
# (see load_partitioned_dataset), and this value is also passed as num_supernodes.
NUM_CLIENTS = 10

class EcommerceDataset(Dataset):
    """In-memory dataset yielding ``{"features": ..., "label": ...}`` dicts (the batch format train/evaluate read)."""

    def __init__(self, X, y):
        self.X = torch.tensor(X)
        self.y = torch.tensor(y)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return {"features": self.X[idx], "label": self.y[idx]}


def load_partitioned_dataset(
    partition_id: int,
    categories=None,
    data_dir: str = "./docker_clients/data",
    batch_size: int = 32,
    train_fraction: float = 0.8,
    seed=42,
):
    """Load one client's CSV partition and return ``(train_loader, val_loader)``.

    Reads ``{data_dir}/products_{partition_id}.csv``. The columns ``rating``,
    ``no_of_ratings``, ``discount_price`` and ``actual_price`` become float32 features,
    and ``main_category`` becomes the int64 class label. The rows are then split into
    train and validation sets.

    Args:
        partition_id: Index of the partition file to read.
        categories: Global, ordered list of category names shared by ALL clients, so that
            label ``i`` means ``categories[i]`` on every client. If None, codes are built
            only from the categories present in this partition. The same code can then
            mean different categories on different clients, so pass an explicit list for
            federated training.
        data_dir: Directory holding the partition CSVs. The default is a placeholder path.
        batch_size: Batch size for both loaders.
        train_fraction: Share of rows used for training; the remainder is validation.
        seed: Seed for the train/validation split. Repeated calls for the same partition
            (the simulation may rebuild clients more than once) get the same split, so
            validation rows never leak into training. ``None`` gives an unseeded split.

    Returns:
        ``(train_loader, val_loader)``. The training loader shuffles; the validation
        loader does not.

    Raises:
        ValueError: If ``categories`` is given and the partition has a label not in it.
    """
    feature_columns = ['rating', 'no_of_ratings', 'discount_price', 'actual_price']
    label_column = 'main_category'

    df = pd.read_csv(f"{data_dir}/products_{partition_id}.csv")
    # Drop rows with missing values up front. A NaN feature poisons the loss, and a
    # NaN label would be encoded as code -1, which CrossEntropyLoss rejects.
    df = df.dropna(subset=feature_columns + [label_column])

    X = df[feature_columns].values.astype(np.float32)
    if categories is None:
        # Partition-local encoding: only consistent across clients if every partition
        # contains exactly the same set of categories.
        codes = df[label_column].astype('category').cat.codes.to_numpy()
    else:
        codes = pd.Categorical(df[label_column], categories=list(categories)).codes
        unknown_mask = codes < 0
        if unknown_mask.any():
            unknown = sorted({str(v) for v in df[label_column].to_numpy()[unknown_mask]})
            raise ValueError(
                f"Partition {partition_id} has categories not in `categories`: {unknown}"
            )
    y = codes.astype(np.int64)

    dataset = EcommerceDataset(X, y)
    train_size = int(train_fraction * len(dataset))
    val_size = len(dataset) - train_size
    split_kwargs = {} if seed is None else {"generator": torch.Generator().manual_seed(seed)}
    train_dataset, val_dataset = torch.utils.data.random_split(
        dataset, [train_size, val_size], **split_kwargs
    )
    return (
        DataLoader(train_dataset, batch_size=batch_size, shuffle=True),
        DataLoader(val_dataset, batch_size=batch_size),
    )


In [16]:
class MLPNet(nn.Module):
    """Two-layer perceptron: Linear -> ReLU -> Linear.

    Args:
        input_size: Number of input features per example.
        hidden_size: Width of the hidden layer.
        num_classes: Number of output logits. Defaults to 10, the value that was
            previously hard-coded. It must be at least the number of label codes
            produced by the data loader.
    """

    def __init__(self, input_size, hidden_size=64, num_classes=10):
        super().__init__()
        # Module layout (and therefore state_dict keys) is unchanged, so parameters
        # exchanged with the federated server keep the same order and names.
        self.net = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, num_classes),
        )

    def forward(self, x):
        """Map features ``(..., input_size)`` to raw logits ``(..., num_classes)``.

        No softmax is applied, because CrossEntropyLoss expects raw logits.
        """
        return self.net(x)

def train(model, dataloader, epochs=1, lr=0.001):
    """Train ``model`` in place with Adam and cross-entropy loss.

    Args:
        model: The network to train. Batches are moved to the device of its first
            parameter, so the model and data always agree without relying on a global
            device setting.
        dataloader: Iterable of dicts with ``"features"`` and ``"label"`` tensors.
        epochs: Number of full passes over ``dataloader``.
        lr: Adam learning rate.

    A fresh optimizer is created on every call, so no Adam state carries over between
    calls (e.g. between federated rounds).

    Returns:
        Mean per-example training loss over the final epoch, or ``nan`` if no examples
        were seen.
    """
    model.train()
    device = next(model.parameters()).device
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()
    epoch_loss, epoch_examples = 0.0, 0
    for _ in range(epochs):
        epoch_loss, epoch_examples = 0.0, 0
        for batch in dataloader:
            X = batch['features'].to(device)
            y = batch['label'].to(device)
            optimizer.zero_grad()
            loss = criterion(model(X), y)
            loss.backward()
            optimizer.step()
            # The criterion returns a batch mean; re-weight it by batch size for a per-example mean.
            epoch_loss += loss.item() * y.size(0)
            epoch_examples += y.size(0)
    return epoch_loss / epoch_examples if epoch_examples else float("nan")

def evaluate(model, dataloader):
    """Compute the mean per-example cross-entropy loss and accuracy of ``model``.

    Args:
        model: The network to evaluate. Batches are moved to the device of its first
            parameter.
        dataloader: Iterable of dicts with ``"features"`` and ``"label"`` tensors.

    Returns:
        ``(loss, accuracy)`` averaged over all examples. Both are ``0.0`` when the loader
        yields no examples: the caller reports 0 examples in that case, so the pair
        carries no weight in aggregation, and it avoids ZeroDivisionError.
    """
    model.eval()
    device = next(model.parameters()).device
    # Sum per batch, then divide by the example count. With the default 'mean' reduction,
    # summing batch means and dividing by the example count shrinks the loss by ~batch size.
    criterion = nn.CrossEntropyLoss(reduction="sum")
    correct = 0
    total = 0
    loss = 0.0
    with torch.no_grad():
        for batch in dataloader:
            X, y = batch['features'].to(device), batch['label'].to(device)
            outputs = model(X)
            loss += criterion(outputs, y).item()
            correct += (outputs.argmax(1) == y).sum().item()
            total += y.size(0)
    if total == 0:
        return 0.0, 0.0
    return loss / total, correct / total

def get_parameters(net):
    """Return every ``state_dict`` entry of ``net`` as a CPU NumPy array.

    Arrays are listed in state_dict order, which is the order set_parameters pairs
    them back with.
    """
    state = net.state_dict()
    arrays = []
    for tensor in state.values():
        arrays.append(tensor.cpu().numpy())
    return arrays

def set_parameters(net, parameters):
    """Load ``parameters`` into ``net`` in place.

    ``parameters`` are array-likes ordered like ``net.state_dict()``, and each array is
    paired with a state_dict key by position. If too few arrays are given, keys are left
    missing, which ``strict=True`` makes ``load_state_dict`` reject. Extra trailing
    arrays are silently dropped by ``zip``.
    """
    names = list(net.state_dict().keys())
    new_state = OrderedDict()
    for name, array in zip(names, parameters):
        new_state[name] = torch.tensor(array)
    net.load_state_dict(new_state, strict=True)


In [17]:
class EcommerceClient(NumPyClient):
    """Flower NumPyClient wrapping one local model and its train/validation loaders."""

    def __init__(self, model, trainloader, valloader):
        self.model = model
        self.trainloader = trainloader
        self.valloader = valloader

    def get_parameters(self, config):
        """Return the local model weights as a list of NumPy arrays."""
        return get_parameters(self.model)

    def fit(self, parameters, config):
        """Load the global weights, train locally, and return the updated weights.

        If the strategy sends ``config["local_epochs"]``, it sets the number of local
        epochs; otherwise 1 epoch is run.

        Returns:
            ``(weights, number of training examples, {})``.
        """
        set_parameters(self.model, parameters)
        local_epochs = int(config.get("local_epochs", 1))
        train(self.model, self.trainloader, epochs=local_epochs)
        return get_parameters(self.model), len(self.trainloader.dataset), {}

    def evaluate(self, parameters, config):
        """Load the global weights and evaluate them on the local validation split.

        Returns:
            ``(loss, number of validation examples, {"accuracy": accuracy})``.
        """
        set_parameters(self.model, parameters)
        loss, accuracy = evaluate(self.model, self.valloader)
        return float(loss), len(self.valloader.dataset), {"accuracy": float(accuracy)}


In [18]:
def client_fn(context: Context):
    """Build the Flower client for the simulated node described by ``context``.

    The node's ``partition-id`` selects which CSV partition the client trains and
    validates on.
    """
    node_partition = context.node_config["partition-id"]
    # 4 inputs = the four numeric feature columns selected in load_partitioned_dataset.
    net = MLPNet(input_size=4).to(DEVICE)
    train_loader, val_loader = load_partitioned_dataset(node_partition)
    numpy_client = EcommerceClient(net, train_loader, val_loader)
    return numpy_client.to_client()


client = ClientApp(client_fn=client_fn)

def weighted_average(metrics: "List[Tuple[int, Metrics]]") -> "Metrics":
    """Aggregate client accuracies into a single example-weighted mean.

    Args:
        metrics: ``(num_examples, metrics_dict)`` pairs, one per client. Each dict must
            contain an ``"accuracy"`` entry.

    Returns:
        ``{"accuracy": weighted mean}``. Returns an empty dict when the total number of
        examples is zero (including an empty ``metrics`` list) instead of raising
        ZeroDivisionError.
    """
    total = sum(num_examples for num_examples, _ in metrics)
    if total == 0:
        return {}
    acc = sum(num_examples * m["accuracy"] for num_examples, m in metrics)
    return {"accuracy": acc / total}

def server_fn(context: Context):
    """Build the FedAvg strategy and a 5-round server config for the simulation.

    All NUM_CLIENTS clients must be available, and all of them train every round.
    Half of them are sampled for evaluation, and their accuracies are combined with
    weighted_average.
    """
    fraction_evaluate = 0.5
    # Derive the evaluation minimum from NUM_CLIENTS rather than hard-coding 5. A fixed
    # minimum larger than the number of simulated clients could stall evaluation waiting
    # for clients that never appear.
    min_evaluate_clients = max(1, int(NUM_CLIENTS * fraction_evaluate))
    strategy = FedAvg(
        fraction_fit=1.0,
        fraction_evaluate=fraction_evaluate,
        min_fit_clients=NUM_CLIENTS,
        min_evaluate_clients=min_evaluate_clients,
        min_available_clients=NUM_CLIENTS,
        evaluate_metrics_aggregation_fn=weighted_average,
    )
    config = ServerConfig(num_rounds=5)
    return ServerAppComponents(strategy=strategy, config=config)


server = ServerApp(server_fn=server_fn)


In [19]:
# One CPU core per simulated client, plus a whole GPU each when running on CUDA.
client_resources = {
    "num_cpus": 1,
    "num_gpus": 1.0 if DEVICE.type == "cuda" else 0.0,
}
backend_config = {"client_resources": client_resources}

# The simulation backend used here is Ray, which is only installed with
# `pip install -U "flwr[simulation]"`. Without it the run fails with KeyError: 'ray'.
run_simulation(
    server_app=server,
    client_app=client,
    num_supernodes=NUM_CLIENTS,
    backend_config=backend_config,
)


[92mINFO [0m:      Starting Flower ServerApp, config: num_rounds=5, no round_timeout
[91mERROR [0m:     Backend `ray`, is not supported. Use any of [] or add support for a new backend.


[92mINFO [0m:      
[91mERROR [0m:     Unable to import module `ray`.

    To install the necessary dependencies, install `flwr` with the `simulation` extra:

        pip install -U "flwr[simulation]"
    
[92mINFO [0m:      [INIT]
[91mERROR [0m:     An exception occurred !! 'ray'
[92mINFO [0m:      Requesting initial parameters from one random client
[91mERROR [0m:     Traceback (most recent call last):
  File "/Users/sathwik/opt/anaconda3/envs/flec_env/lib/python3.9/site-packages/flwr/simulation/run_simulation.py", line 370, in _main_loop
    vce.start_vce(
  File "/Users/sathwik/opt/anaconda3/envs/flec_env/lib/python3.9/site-packages/flwr/server/superlink/fleet/vce/vce_api.py", line 331, in start_vce
    raise ex
  File "/Users/sathwik/opt/anaconda3/envs/flec_env/lib/python3.9/site-packages/flwr/server/superlink/fleet/vce/vce_api.py", line 319, in start_vce
    backend_type = supported_backends[backend_name]
KeyError: 'ray'



KeyboardInterrupt: 