In [2]:
from openfl.interface.interactive_api.federation import Federation

In [32]:
# !fx director start --disable-tls -c director/director.yaml # start the director

In [33]:
#! sudo apt install gfortran cmake libopenblas-base libopenblas-dev libatlas-base-dev
#! ENVOY_NAME=envoy_example_name fx envoy start \
#     -n "$ENVOY_NAME" \
#     --disable-tls \
#     --envoy-config-path envoy_config.yaml \
#     -dh localhost \
#     -dp 50051

In [34]:
# Describe the connection to the director; TLS is disabled, so no
# certificate material is supplied.
federation = Federation(
    client_id='client_id',
    director_node_fqdn='localhost',
    director_port='50051',
    tls=False,
    cert_chain=None,
    api_cert=None,
    api_private_key=None,
)

In [35]:
# Display the director's shard registry (one entry per envoy).
federation.get_shard_registry()

{'envoy-one': {'shard_info': node_info {
    name: "envoy-one"
  }
  sample_shape: "28"
  sample_shape: "28"
  target_shape: "10",
  'is_online': True,
  'is_experiment_running': False,
  'last_updated': '2023-02-28 03:22:15',
  'current_time': '2023-02-28 03:22:37',
  'valid_duration': seconds: 120,
  'experiment_name': 'ExperimentName Mock'},
 'envoy-two': {'shard_info': node_info {
    name: "envoy-two"
  }
  sample_shape: "28"
  sample_shape: "28"
  target_shape: "10",
  'is_online': True,
  'is_experiment_running': False,
  'last_updated': '2023-02-28 03:22:19',
  'current_time': '2023-02-28 03:22:37',
  'valid_duration': seconds: 120,
  'experiment_name': 'ExperimentName Mock'},
 'envoy-three': {'shard_info': node_info {
    name: "envoy-three"
  }
  sample_shape: "28"
  sample_shape: "28"
  target_shape: "10",
  'is_online': True,
  'is_experiment_running': False,
  'last_updated': '2023-02-28 03:22:30',
  'current_time': '2023-02-28 03:22:37',
  'valid_duration': seconds: 120,


In [36]:
from openfl.interface.interactive_api.experiment import FLExperiment
from openfl.interface.interactive_api.experiment import TaskInterface, DataInterface, ModelInterface

In [37]:
# Experiment handle tied to the federation object created earlier.
fl_experiment = FLExperiment(
    federation=federation,
    experiment_name="pranav's test",
)

In [38]:
from torch import nn
import torch as t
import torch.nn.functional as F
import torch.optim as optim

class Net(nn.Module):
    """Fully connected classifier: flattened 28*28 input -> 128 -> 64 -> 10."""

    def __init__(self):
        super().__init__()
        # Layer widths from the flattened input through the hidden layers.
        widths = (28 * 28, 128, 64)
        stack = [nn.Flatten()]
        # Each hidden stage is Linear -> ReLU -> Dropout(0.1); the module order
        # (and therefore the state-dict keys) is the same as a literal listing.
        for fan_in, fan_out in zip(widths, widths[1:]):
            stack.extend([nn.Linear(fan_in, fan_out), nn.ReLU(), nn.Dropout(0.1)])
        # Final projection to the 10 output units (no activation).
        stack.append(nn.Linear(widths[-1], 10))
        self.model = nn.Sequential(*stack)

    def forward(self, x):
        """Return the output of the sequential stack for input ``x``."""
        logits = self.model(x)
        return logits

# Notebook-level model and loss; both are referenced by the task functions
# defined in later cells.
model = Net()
loss_func = nn.CrossEntropyLoss()

# Disabled alternative: a small two-layer XOR network with an MSE loss.
# Uncommenting the class and the two assignments below swaps it in for Net.
# class XOR(nn.Module):
#     def __init__(self, input_dim = 2, output_dim=1):
#         super(XOR, self).__init__()
#         self.lin1 = nn.Linear(input_dim, 4)
#         self.lin2 = nn.Linear(4, output_dim)

#     def forward(self, x):
#         x = self.lin1(x)
#         x = F.sigmoid(x)
#         x = self.lin2(x)
#         return x


# model = XOR()
# loss_func = nn.MSELoss()

# Adam over the parameters of whichever model is active above.
optimizer = optim.Adam(model.parameters(), lr=1e-4)

In [39]:
from openfl.interface.interactive_api.experiment import ModelInterface
# Bundle the model and optimizer for the experiment, using the PyTorch
# framework adapter plugin.
MI = ModelInterface(
    model,
    optimizer,
    framework_plugin='openfl.plugins.frameworks_adapters.pytorch_adapter.FrameworkAdapterPlugin',
)

In [40]:
from torch.utils.data import DataLoader


class CustomDataInterface(DataInterface):
    """Data interface that serves the shard's 'train' and 'val' datasets.

    The datasets are pulled from the shard descriptor when it is assigned;
    the loader and size methods must not be used before that happens.
    """

    def __init__(self, **kwargs):
        # Forward all keyword arguments to the base class unchanged.
        super().__init__(**kwargs)

    @property
    def shard_descriptor(self):
        """Return the shard descriptor currently attached to this interface."""
        return self._shard_descriptor

    @shard_descriptor.setter
    def shard_descriptor(self, shard_descriptor):
        """
        Describe per-collaborator procedures or sharding.

        This method will be called during a collaborator initialization.
        Local shard_descriptor will be set by Envoy.
        """
        self._shard_descriptor = shard_descriptor

        # Resolve both splits eagerly so the loaders and size getters below
        # only need to read attributes.
        self.train_set = self._shard_descriptor.get_dataset('train')
        self.valid_set = self._shard_descriptor.get_dataset('val')

    def get_train_loader(self, **kwargs):
        """
        Output of this method will be provided to tasks with optimizer in contract
        """
        return DataLoader(
            self.train_set, batch_size=32, shuffle=True
        )

    def get_valid_loader(self, **kwargs):
        """
        Output of this method will be provided to tasks without optimizer in contract
        """
        # Evaluation does not depend on sample order, so keep the loader
        # deterministic instead of reshuffling on every pass.
        return DataLoader(
            self.valid_set, batch_size=32, shuffle=False
        )

    def get_train_data_size(self):
        """
        Information for aggregation: number of samples in the train split.
        """
        return len(self.train_set)

    def get_valid_data_size(self):
        """
        Information for aggregation: number of samples in the validation split.
        """
        return len(self.valid_set)

# Data interface instance later passed to fl_experiment.start(data_loader=DI);
# no extra keyword arguments are supplied.
DI = CustomDataInterface()

In [41]:
## FEDCURV

import numpy as np
import tqdm
from openfl.utilities.fedcurv.torch import FedCurv
from openfl.interface.aggregation_functions import FedCurvWeightedAverage

# New task registry for the FedCurv variant. Rebinding TI means only tasks
# registered after this line live on the object now named TI.
TI = TaskInterface()

# Extra keyword arguments attached to the train task through TI.add_kwargs.
task_settings = {
    'batch_size': 32,
}
# FedCurv helper built from the model held by MI; train() calls its
# on_train_begin / on_train_end hooks.
# NOTE(review): `importance` presumably scales the FedCurv penalty term - confirm.
fedcurv = FedCurv(MI.provide_model(), importance=1e7)

@TI.add_kwargs(**task_settings)
@TI.register_fl_task(model='model_', data_loader='train_loader',
                     device='device', optimizer='optim')
@TI.set_aggregation_function(FedCurvWeightedAverage())
def train(model_, train_loader, optim, device, batch_size):
    """Run one pass over ``train_loader`` with the FedCurv regulariser.

    Args:
        model_: model instance handed to the task; it is the one trained here.
        train_loader: iterable yielding ``(data, target)`` batches.
        optim: optimizer handed to the task (presumably bound to ``model_``'s
            parameters - confirm against the framework).
        device: device the model and every batch are moved to.
        batch_size: supplied through ``task_settings``; not used in the body.

    Returns:
        dict with key ``'train_loss'``: mean of the per-batch losses
        (penalty included).
    """
    # Move first so the FedCurv hook sees the model on the device it will
    # be trained on.
    model_.to(device)
    model_.train()
    fedcurv.on_train_begin(model_)

    losses = []

    for data, target in tqdm.tqdm(train_loader, desc="train"):
        data = data.to(device, dtype=t.float)
        target = target.to(device)
        optim.zero_grad()
        # Use the model passed to the task, not the notebook-level `model`.
        y_hat = model_(data)
        # FedCurv only takes effect when its penalty is part of the loss.
        loss = loss_func(y_hat, target) + fedcurv.get_penalty(model_)
        loss.backward()
        optim.step()
        losses.append(loss.item())

    # Hand over the plain loader rather than a progress-bar wrapper.
    fedcurv.on_train_end(model_, train_loader, device)

    return {'train_loss': np.mean(losses),}

@TI.register_fl_task(model='model_', data_loader='val_loader', device='device')
def validate(model_, val_loader, device):
    """Compute classification accuracy of ``model_`` over ``val_loader``.

    Args:
        model_: model instance handed to the task.
        val_loader: iterable yielding ``(data, target)`` batches.
        device: device the model and every batch are moved to.

    Returns:
        dict with key ``'acc'``: correct predictions divided by sample count.

    Raises:
        ZeroDivisionError: if ``val_loader`` yields no samples.
    """
    model_.eval()
    model_.to(device)

    correct = 0
    total_samples = 0

    with t.no_grad():
        for data, target in tqdm.tqdm(val_loader, desc="validate"):
            # Keep inputs, labels and model on the same device.
            data = data.to(device, dtype=t.float)
            target = target.to(device)
            pred = model_(data).argmax(dim=1)
            correct += pred.eq(target).sum().item()
            total_samples += target.shape[0]

    return {'acc': correct / total_samples,}

In [42]:
## FEDOPT

import numpy as np
import tqdm
from openfl.interface.aggregation_functions import AdagradAdaptiveAggregation

# New task registry for the FedOpt variant. Rebinding TI means only tasks
# registered after this line live on the object now named TI.
TI = TaskInterface()

# Extra keyword arguments attached to the train task through TI.add_kwargs.
task_settings = {
    'batch_size': 32,
}
# Adagrad-based adaptive aggregation function; attached to train() below via
# TI.set_aggregation_function.
agg_fn = AdagradAdaptiveAggregation(model_interface=MI,
                                    learning_rate=0.4)

@TI.add_kwargs(**task_settings)
@TI.register_fl_task(model='model_', data_loader='train_loader',
                     device='device', optimizer='optim')
@TI.set_aggregation_function(agg_fn)
def train(model_, train_loader, optim, device, batch_size):
    """Run one pass over ``train_loader`` (aggregated with ``agg_fn``).

    Args:
        model_: model instance handed to the task; it is the one trained here.
        train_loader: iterable yielding ``(data, target)`` batches.
        optim: optimizer handed to the task (presumably bound to ``model_``'s
            parameters - confirm against the framework).
        device: device the model and every batch are moved to.
        batch_size: supplied through ``task_settings``; not used in the body.

    Returns:
        dict with key ``'train_loss'``: mean of the per-batch losses.
    """
    model_.train()
    model_.to(device)

    losses = []

    for data, target in tqdm.tqdm(train_loader, desc="train"):
        data = data.to(device, dtype=t.float)
        target = target.to(device)
        optim.zero_grad()
        # Use the model passed to the task, not the notebook-level `model`.
        y_hat = model_(data)
        loss = loss_func(y_hat, target)
        loss.backward()
        optim.step()
        losses.append(loss.item())

    return {'train_loss': np.mean(losses),}

@TI.register_fl_task(model='model_', data_loader='val_loader', device='device')
def validate(model_, val_loader, device):
    """Compute classification accuracy of ``model_`` over ``val_loader``.

    Args:
        model_: model instance handed to the task.
        val_loader: iterable yielding ``(data, target)`` batches.
        device: device the model and every batch are moved to.

    Returns:
        dict with key ``'acc'``: correct predictions divided by sample count.

    Raises:
        ZeroDivisionError: if ``val_loader`` yields no samples.
    """
    model_.eval()
    model_.to(device)

    correct = 0
    total_samples = 0

    with t.no_grad():
        for data, target in tqdm.tqdm(val_loader, desc="validate"):
            # Keep inputs, labels and model on the same device.
            data = data.to(device, dtype=t.float)
            target = target.to(device)
            pred = model_(data).argmax(dim=1)
            correct += pred.eq(target).sum().item()
            total_samples += target.shape[0]

    return {'acc': correct / total_samples,}

In [44]:
## FEDAVG

import numpy as np
import tqdm
from openfl.utilities.fedcurv.torch import FedCurv
from openfl.interface.aggregation_functions import FedCurvWeightedAverage

# New task registry for the FedAvg variant. Rebinding TI means only tasks
# registered after this line live on the object now named TI. No aggregation
# function is set on these tasks.
TI = TaskInterface()

# Extra keyword arguments attached to the train task through TI.add_kwargs.
task_settings = {
    'batch_size': 32,
}
@TI.add_kwargs(**task_settings)
@TI.register_fl_task(model='model_', data_loader='train_loader',
                     device='device', optimizer='optim')
def train(model_, train_loader, optim, device, batch_size):
    """Run one pass over ``train_loader`` (no aggregation function set here).

    Args:
        model_: model instance handed to the task; it is the one trained here.
        train_loader: iterable yielding ``(data, target)`` batches.
        optim: optimizer handed to the task (presumably bound to ``model_``'s
            parameters - confirm against the framework).
        device: device the model and every batch are moved to.
        batch_size: supplied through ``task_settings``; not used in the body.

    Returns:
        dict with key ``'train_loss'``: mean of the per-batch losses.
    """
    model_.train()
    model_.to(device)

    losses = []

    for data, target in tqdm.tqdm(train_loader, desc="train"):
        data = data.to(device, dtype=t.float)
        target = target.to(device)
        optim.zero_grad()
        # Use the model passed to the task, not the notebook-level `model`.
        y_hat = model_(data)
        loss = loss_func(y_hat, target)
        loss.backward()
        optim.step()
        losses.append(loss.item())

    return {'train_loss': np.mean(losses),}

@TI.register_fl_task(model='model_', data_loader='val_loader', device='device')
def validate(model_, val_loader, device):
    """Compute classification accuracy of ``model_`` over ``val_loader``.

    Args:
        model_: model instance handed to the task.
        val_loader: iterable yielding ``(data, target)`` batches.
        device: device the model and every batch are moved to.

    Returns:
        dict with key ``'acc'``: correct predictions divided by sample count.

    Raises:
        ZeroDivisionError: if ``val_loader`` yields no samples.
    """
    model_.eval()
    model_.to(device)

    correct = 0
    total_samples = 0

    with t.no_grad():
        for data, target in tqdm.tqdm(val_loader, desc="validate"):
            # Keep inputs, labels and model on the same device.
            data = data.to(device, dtype=t.float)
            target = target.to(device)
            pred = model_(data).argmax(dim=1)
            correct += pred.eq(target).sum().item()
            total_samples += target.shape[0]

    return {'acc': correct / total_samples,}

In [45]:
# Launch the experiment for three rounds with the interfaces built above.
# TI is whichever TaskInterface was bound most recently, so the last executed
# task cell decides which train/validate pair is used.
# NOTE(review): 'CONTINUE_GLOBAL' presumably selects how optimizer state is
# carried between rounds - confirm against the OpenFL documentation.
fl_experiment.start(
    model_provider=MI,
    data_loader=DI,
    task_keeper=TI,
    rounds_to_train=3,
    opt_treatment='CONTINUE_GLOBAL'
)

In [46]:
# Stream metrics from the running experiment. The captured output below shows
# this call ending with a gRPC UNAVAILABLE ("Socket closed") error from the
# peer on port 50051.
fl_experiment.stream_metrics()

_MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "Socket closed"
	debug_error_string = "{"created":"@1677534956.916695728","description":"Error received from peer ipv6:[::1]:50051","file":"src/core/lib/surface/call.cc","file_line":903,"grpc_message":"Socket closed","grpc_status":14}"
>

In [96]:
# Remove the data associated with this experiment.
# NOTE(review): exact scope of what is deleted is not visible here - confirm.
fl_experiment.remove_experiment_data()

In [95]:
# fl_experiment.get_last_model()

Net(
  (model): Sequential(
    (0): Flatten(start_dim=1, end_dim=-1)
    (1): Linear(in_features=784, out_features=128, bias=True)
    (2): ReLU()
    (3): Dropout(p=0.1, inplace=False)
    (4): Linear(in_features=128, out_features=64, bias=True)
    (5): ReLU()
    (6): Dropout(p=0.1, inplace=False)
    (7): Linear(in_features=64, out_features=10, bias=True)
  )
)