In [None]:
!pip install nvflare torch torchvision tensorboard opacus

In [2]:
import torchvision

# Shared on-disk location for CIFAR-10; the client script written later in this
# notebook uses the same path.
DATASET_PATH = "/tmp/nvflare/data"

# Download the dataset up front. The dataset object is left as the cell's last
# expression so its summary is displayed.
torchvision.datasets.CIFAR10(download=True, root=DATASET_PATH)

Dataset CIFAR10
    Number of datapoints: 50000
    Root location: /tmp/nvflare/data
    Split: Train

In [3]:
!mkdir src

mkdir: cannot create directory ‘src’: File exists


In [4]:
%%writefile src/net.py
import torch
import torch.nn as nn
import torch.nn.functional as F


class Net(nn.Module):
    """Small convolutional classifier with two conv layers and three linear layers.

    The first linear layer expects ``16 * 5 * 5`` flattened features, which is
    what the two conv(5x5) + 2x2 max-pool stages produce for 3x32x32 inputs.
    The network returns raw scores for 10 classes (no softmax is applied).
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)
        # Dropout holds no parameters or buffers, so naming this attribute
        # `dropout` (previously misspelled `dopout`) leaves state_dict keys unchanged.
        self.dropout = nn.Dropout(0.1)

    def forward(self, x):
        """Map a batch of images ``x`` to a ``(batch, 10)`` tensor of class scores."""
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = torch.flatten(x, 1)  # flatten all dimensions except batch
        # Dropout is applied before each of the three linear layers.
        x = self.dropout(x)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = F.relu(self.fc2(x))
        x = self.dropout(x)
        x = self.fc3(x)
        return x

Overwriting src/net.py


In [7]:
from nvflare.app_common.workflows.fedavg import FedAvg
from nvflare.app_opt.pt.job_config.base_fed_job import BaseFedJob
from nvflare.job_config.script_runner import ScriptRunner
from src.net import Net

# Create the federated job named "cifar10_fedavg", passing a freshly
# constructed Net as its initial model.
job = BaseFedJob(initial_model=Net(), name="cifar10_fedavg")

In [8]:
# Number of client sites; later cells reuse this when attaching runners.
n_clients = 2

# FedAvg workflow, assigned to the server.
# Only 3 rounds are configured here; the original note says about 30 rounds
# should converge.
controller = FedAvg(num_rounds=3, num_clients=n_clients)
job.to(controller, "server")

In [9]:

%%writefile src/cifar10_fl.py
# Copyright (c) 2025, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse

import torch
import torch.nn as nn
import torch.nn.functional as F # Added for Net class
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from net import Net # Removed this import
from opacus import PrivacyEngine

# (1) import nvflare client API
import nvflare.client as flare

# (optional) metrics
from nvflare.client.tracking import SummaryWriter


# (optional) use a fixed location so the dataset is not downloaded every time
DATASET_PATH = "/tmp/nvflare/data"

# Use the GPU when one is available to speed things up; otherwise use the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"
print(f"Running on device {DEVICE}")


def main(target_epsilon, max_grad_norm):
    """Run the CIFAR-10 client training loop driven by the NVFlare Client API.

    Each round the client receives the global model, trains it locally for
    ``epochs`` epochs, evaluates the *received* global weights on the CIFAR-10
    test split, and sends the locally trained weights plus that accuracy back.

    Args:
        target_epsilon: Target epsilon for Opacus DP training. When falsy
            (``None`` or ``0``), training runs without the privacy engine.
        max_grad_norm: Per-sample gradient clipping norm handed to Opacus.
            Only used when ``target_epsilon`` is set.

    Raises:
        RuntimeError: Propagated from ``load_state_dict`` when the received
            weights do not match the model, so the client never silently
            trains on stale weights.
    """
    transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

    batch_size = 32
    epochs = 1
    target_delta = 1e-5
    # Used for the DP budget only if the server does not report total_rounds.
    fallback_total_rounds = 10
    # Key prefix that appears when a state_dict is taken from a wrapper that
    # stores the real model under the attribute `_module`.
    wrapper_prefix = "_module."

    trainset = torchvision.datasets.CIFAR10(root=DATASET_PATH, train=True, download=True, transform=transform)
    trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=2)

    testset = torchvision.datasets.CIFAR10(root=DATASET_PATH, train=False, download=True, transform=transform)
    testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size, shuffle=False, num_workers=2)

    net = Net()

    # (2) initializes NVFlare client API
    flare.init()

    # (Optional) compute a per-client seed from the client name.
    # The modulo keeps the value inside the range torch.manual_seed accepts;
    # names of up to 8 bytes produce exactly the same seed as without it.
    client_name = flare.get_site_name()
    seed = int.from_bytes(client_name.encode(), "big") % (2**64)
    torch.manual_seed(seed)

    # Define loss and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(net.parameters(), lr=0.01, momentum=0.9)

    def unwrap(model):
        """Return the model stored under ``_module`` if present, else ``model`` itself."""
        return model._module if hasattr(model, "_module") else model

    def evaluate(input_weights):
        """Return the accuracy (in percent) of ``input_weights`` on the test split."""
        eval_net = Net()
        # Accept weights with or without the wrapper prefix on their keys.
        plain_weights = {
            (k[len(wrapper_prefix):] if k.startswith(wrapper_prefix) else k): v for k, v in input_weights.items()
        }
        eval_net.load_state_dict(plain_weights)
        eval_net.to(DEVICE)
        eval_net.eval()

        correct = 0
        total = 0
        with torch.no_grad():
            for data in testloader:
                images, labels = data[0].to(DEVICE), data[1].to(DEVICE)
                outputs = eval_net(images)
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        accuracy = 100 * correct / total
        print(f"Accuracy of the network on the 10000 test images: {accuracy:.2f} %")
        return accuracy

    # The privacy engine is attached lazily in the first round, because the
    # number of rounds (needed for the DP budget) is only known after receive().
    privacy_engine = None
    summary_writer = SummaryWriter()

    while flare.is_running():
        # (3) receives FLModel from NVFlare
        input_model = flare.receive()
        print(f"current_round={input_model.current_round}, total_rounds={input_model.total_rounds}")

        # Optionally add the DP engine, spreading the budget over every local
        # epoch of the whole job (epochs per round * number of rounds).
        if target_epsilon and privacy_engine is None:
            total_rounds = input_model.total_rounds or fallback_total_rounds
            print(f"Adding privacy engine with epsilon={target_epsilon}, delta={target_delta}")
            privacy_engine = PrivacyEngine()
            net, optimizer, trainloader = privacy_engine.make_private_with_epsilon(
                module=net,
                optimizer=optimizer,
                data_loader=trainloader,
                target_epsilon=target_epsilon,
                target_delta=target_delta,
                epochs=epochs * total_rounds,
                max_grad_norm=max_grad_norm,
            )

        # (4) loads model from NVFlare
        # The received keys carry no wrapper prefix, so load into the underlying
        # module; this mirrors how the weights are read back before sending.
        # A mismatch raises instead of being swallowed.
        unwrap(net).load_state_dict(input_model.params)

        net.to(DEVICE)
        net.train()
        batches_per_epoch = len(trainloader)
        steps = epochs * batches_per_epoch
        for epoch in range(epochs):

            running_loss = 0.0
            for i, data in enumerate(trainloader, 0):
                inputs, labels = data[0].to(DEVICE), data[1].to(DEVICE)

                optimizer.zero_grad()

                outputs = net(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                running_loss += loss.item()
                if i % 100 == 99:
                    # Report the mean loss of the last 100 mini-batches.
                    print(f"[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 100:.3f}")
                    global_step = input_model.current_round * steps + epoch * batches_per_epoch + i

                    summary_writer.add_scalar(
                        tag="loss_for_each_batch", scalar=running_loss / 100, global_step=global_step
                    )
                    running_loss = 0.0

                    if privacy_engine is not None:
                        epsilon = privacy_engine.get_epsilon(target_delta)
                        print(f"Training with privacy (ε = {epsilon:.2f}, δ = {target_delta})")

        print("Finished Training")

        PATH = "./cifar_net.pth"
        torch.save(net.state_dict(), PATH)

        # Evaluate the global model received this round (not the local update).
        accuracy = evaluate(input_model.params)
        summary_writer.add_scalar(tag="global_model_accuracy", scalar=accuracy, global_step=input_model.current_round)

        # Move the model to the CPU, then take the underlying module's
        # state_dict so the keys carry no wrapper prefix.
        net.cpu()
        local_params = unwrap(net).state_dict()

        output_model = flare.FLModel(
            params=local_params,
            metrics={"accuracy": accuracy},
            meta={"NUM_STEPS_CURRENT_ROUND": steps},
        )
        flare.send(output_model)


if __name__ == "__main__":
    # Command-line entry point: parse the optional DP settings and start the
    # client loop. No directories are created here; the script only needs
    # the dataset path and its own working directory.
    parser = argparse.ArgumentParser()
    parser.add_argument("--target_epsilon", type=float, default=None, help="Target epsilon for DP training. If None, DP is disabled.")
    parser.add_argument("--max_grad_norm", type=float, default=1.0, help="Max grad norm for DP training.")
    args = parser.parse_args()

    main(target_epsilon=args.target_epsilon, max_grad_norm=args.max_grad_norm)

Overwriting src/cifar10_fl.py


In [10]:
# Attach one ScriptRunner per client site ("site-1", "site-2", ...), each
# executing the client training script written above.
for i in range(n_clients):
    site_name = f"site-{i+1}"
    runner = ScriptRunner(script="src/cifar10_fl.py")
    job.to(runner, site_name)

In [11]:
# Run the job in the simulator, using a workspace directory named after the job.
workspace = f"/tmp/nvflare/{job.name}"
job.simulator_run(workspace)

[38m2025-10-26 17:43:29,269 - INFO - model selection weights control: {}[0m


2025-10-26 17:43:32.480406: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1761500612.530859     289 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1761500612.548070     289 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/tensorboard/compat/__init__.py", line 42, in tf
    from tensorboard.compat import notf  # noqa: F401
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ImportError: cannot import name 'notf' from 'tensorboard.compat' (/usr/local/lib/python3.11/dist-packages/tensorboard/compat/__init__.py)

During handling of the above exception, another exception occurred:

Attr

[38m2025-10-26 17:43:37,666 - INFO - Tensorboard records can be found in /tmp/nvflare/cifar10_fedavg/server/simulate_job/tb_events you can view it using `tensorboard --logdir=/tmp/nvflare/cifar10_fedavg/server/simulate_job/tb_events`[0m
[38m2025-10-26 17:43:37,669 - INFO - Initializing BaseModelController workflow.[0m
[38m2025-10-26 17:43:37,670 - INFO - Beginning model controller run.[0m
[38m2025-10-26 17:43:37,671 - INFO - 
                                 Start FedAvg.                                  
[0m
[38m2025-10-26 17:43:37,671 - INFO - loading initial model from persistor[0m
[38m2025-10-26 17:43:37,672 - INFO - Both source_ckpt_file_full_name and ckpt_preload_path are not provided. Using the default model weights initialized on the persistor side.[0m
[38m2025-10-26 17:43:37,673 - INFO - 
--------------------------------------------------------------------------------
                                Round 0 started.                                
----------------

## Add DP as an NVFlare Filter

In [13]:
from nvflare import FilterType
from nvflare.client.config import TransferType
from nvflare.app_common.filters import SVTPrivacy

# Build a second job, "cifar10_fedavg_dp", again starting from a fresh Net.
job = BaseFedJob(initial_model=Net(), name="cifar10_fedavg_dp")

# FedAvg workflow on the server.
# Only 3 rounds are configured here; the original note says about 100 rounds
# should converge.
controller = FedAvg(num_rounds=3, num_clients=n_clients)
job.to_server(controller)

# For every client site: attach the training script (configured with the DIFF
# transfer type) and an SVTPrivacy filter on the results of the "train" task.
for i in range(n_clients):
    site_name = f"site-{i+1}"

    runner = ScriptRunner(params_transfer_type=TransferType.DIFF, script="src/cifar10_fl.py")
    job.to(runner, site_name)

    # Each site gets its own filter instance.
    dp_filter = SVTPrivacy(fraction=0.9, epsilon=0.1, noise_var=0.1, gamma=1e-5)
    job.to(dp_filter, site_name, tasks=["train"], filter_type=FilterType.TASK_RESULT)

# Optionally export the configuration
job.export_job("job_configs")

In [14]:
# Run the DP-filter job in the simulator, using a workspace directory named
# after the job.
workspace = f"/tmp/nvflare/{job.name}"
job.simulator_run(workspace)

[38m2025-10-26 18:01:42,583 - INFO - model selection weights control: {}[0m


2025-10-26 18:01:45.850046: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1761501705.898844     589 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1761501705.914501     589 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/tensorboard/compat/__init__.py", line 42, in tf
    from tensorboard.compat import notf  # noqa: F401
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ImportError: cannot import name 'notf' from 'tensorboard.compat' (/usr/local/lib/python3.11/dist-packages/tensorboard/compat/__init__.py)

During handling of the above exception, another exception occurred:

Attr

[38m2025-10-26 18:01:51,004 - INFO - Tensorboard records can be found in /tmp/nvflare/cifar10_fedavg_dp/server/simulate_job/tb_events you can view it using `tensorboard --logdir=/tmp/nvflare/cifar10_fedavg_dp/server/simulate_job/tb_events`[0m
[38m2025-10-26 18:01:51,006 - INFO - Initializing BaseModelController workflow.[0m
[38m2025-10-26 18:01:51,007 - INFO - Beginning model controller run.[0m
[38m2025-10-26 18:01:51,008 - INFO - 
                                 Start FedAvg.                                  
[0m
[38m2025-10-26 18:01:51,008 - INFO - loading initial model from persistor[0m
[38m2025-10-26 18:01:51,009 - INFO - Both source_ckpt_file_full_name and ckpt_preload_path are not provided. Using the default model weights initialized on the persistor side.[0m
[38m2025-10-26 18:01:51,010 - INFO - 
--------------------------------------------------------------------------------
                                Round 0 started.                                
----------