<a href="https://colab.research.google.com/github/jkim2260/Practice/blob/master/Federated_Learning_Avg.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>



---
*   Number of clients: 100 (K=100)  -hyperparameter
*   Fraction of sampled clients: 0.1 (C=0.1) -hyperparameter
*   Number of rounds: 500 (R = 500)
*   Number of local epochs: 10 (E= 10) - hyperparameter
*   Mini Batch size: 10 (B=10) - hyperparameter
*   Optimizer: torch.optim.SGD
*   Criterion: torch.nn.CrossEntropyLoss
*   Learning rate: 0.01
*   Momentum: 0.9
*   Initialization: Xavier


Client.py

In [None]:
#Federated Averaging(FedAvg)
#Reference: Communication-Efficient Learning of Deep Networks for Decentralized Data
#multiprocessing of client update and client evaluation.
#log tracking: Support tensorboard

import gc
import pickle
import logging

import torch
import torch.nn as nn

from torch.utils.data import DataLoader

logger = logging.getLogger(__name__)

# 모델을 학습하기 위한 자체(비공개) 데이터 및 리소스가 있는 클라이언트 개체에 대한 클래스
# 참여클라이언트에는 일반적으로 다른 클라이언트에 비해 non-iid 자체 데이터 세트
#각 클라이언트는 학습된 매겨변수 또는 전역접으로 집계된 매개변수를 사용하여 글로벌 서버와 통신함
#속성: 클라이언트의 id를 나타내는 정수 ;#데이터: 로컬 데이터를 포함하는 torch,utils.data.Dataset 인스턴스
#device_훈련기계 표시기(예: "cpu", "cuda") #model:로컬모델로서의 torch.nn 인스턴스
    #Attributes:
        #id: Integer indicating client's id.
        #data: torch.utils.data.Dataset instance containing local data.
        #device: Training machine indicator (e.g. "cpu", "cuda").
        #__model: torch.nn instance as a local model.
    #"""
class Client(object):

   def __init__(self, client_id, local_data, device):
     #클라이언트 개체는 센터 서버에 의해 시작됨
       self.id = client_id
       self.data = local_data
       self.device = device
       self.__model = None

    @property #특성
    def model(self):
      #매개변수 집계를 위한 로컬 모델 getter
        return self.__model
    
    @model.setter
    def model(self, model):
    #전역적으로 집계뙨 모델 매개변수를 전달하기 위한 로컬 모델 setter
        self.__model = model

    def __len__(self):
    #클라이언트 로컬 데이터의 총 크기를 반환함
        return len(self.data)

    def setup(self, **client_config):
    #각 클라이언트의 공통 구성을 설정함; 센터 서버에서 호출함
        self.dataloader = DataLoader(self.data, batch_size=client_config["batch_size"], shuffle=True)
        self.local_epoch = client_config["num_local_epochs"]
        self.criterion = client_config["criterion"]
        self.optimizer = client_config["optimizer"]
        self.optim_config = client_config["optim_config"]

    def client_update(self):
    #로컬 데이터셋을 사용하여 로컬 모델을 업데이트함
        self.model.train()
        self.model.to(self,device)

        optimizer = eval(self.optimizer)(self.model.parameters(), **self.optim_config)
        for e in range(self.local_epoch):
            for data, labels in self.dataloader:
                data, labels = data.float().to(self.device), labels.long().to(self.device)

                optimizer.zero_grad()
                outputs = self.model(data)
                loss = eval(self.criterion)()(outputs, labels)

                loss.backward()
                optimizer.step()

                if self.device == "cuda": torch.cuda.empty_cache()
        self.model.to("cpu")

    def client_evaluate(self):
    # 로컬 데이터 세트를 사용하여 로컬 모델을 평가함(편의상 훈련세트와 동일함) 
        self.model.eval()
        self.model.to(self.device)

        test_loss, correct = 0, 0
        with torch.no_grad():
            for data, labels in self.dataloader:
                data, labels = data.float().to(self_device), labels.long().to(self.device)
                outputs = self.model(data)
                test_loss += eval(self.criterion)()(outputs, lables).item()

                predicted = outputs.argmax(dim=1. keepdim=True)
                correct += predicted.eq(labels.view_as(predicted)).sum().item()

                if self.device == "cuda": torch.cuda.empty_cache()
        self.model.to("cpu")

        test_loss = test_loss / len(self.dataloader)
        test_accuracy = correct / len(self.data)

        message = f"\t[Client {str(self.id).zfill(4)}]...finished evaluation!\
            \n\t=> Test loss: {test_loss:.4f}\
            \n\t=> Test accuracy: {100. * test_accuracy:.2f}%\n"
        print(message, flush=True); logging.info(message)
        del message; gc.collect()

        return test_loss, test_accuracy"

In [None]:
# model.py

import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F

#################################
# Models for federated learning #
#################################
# McMahan et al., 2016; 199,210 parameters

class TwoNN(nn.Module):
    def __init__(self, name, in_features, num_hiddens, num_classes):
        super(TwoNN, self).__init__()
        self.name = name
        self.activation = nn.ReLU(True)

        self.fc1 = nn.Linear(in_features=in_features, out_features=num_hiddens, bias=True)
        self.fc2 = nn.Linear(in_features=num_hiddens, out_features=num_hiddens, bias=True)
        self.fc3 = nn.Linear(in_features=num_hiddens, out_features=num_classes, bias=True)

    def forward(self, x):
        if x.ndim == 4:
            x = x.view(x.size(0), -1)  ##무슨의미?
        
        x = self.activation(self.fc1(x))
        x = self.activation(self.fc2(x))
        x = self.fc3(x)

        return x

# McMahan et al., 2016; 1,663,370 parameters
class CNN(nn.Module):
    def __init__(self, name, in_channels, hidden_channels, num_hiddens, num_classes):
        super(CNN, self).__init__()
        self.name = name
        self.activation = nn.ReLU(True)

        self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=hidden_channels,
                               kernel_size=(5,5), padding=1, stride=1, bias = False)
        self.conv2 = nn.Conv2d(in_channels=hidden_channels, out_channels=hidden_channels*2,
                               kernel_size=(5,5), padding=1, stride=1, bias= False)
        
        self.maxpool1 = nn.MaxPool2d(kernel_size=(2,2), padding=1)
        self.maxpool2 = nn.MaxPool2d(kernel_size=(2,2), padding=1)
        self.flatten = nn.Flatten()

        self.fc1 = nn.Linear(in_features=(hidden_channels *2) * (7*7),
                             out_features = num_hiddens, bias=False)
        self.fc2= nn.Linear(in_features=num_hiddens, out_features=num_classes, bias=False)

    def forward(self,x):
        x = self.activation(self.conv1(x))
        x = self.maxpool1(x)

        x = self.activation(self.conv2(x))
        x = self.maxpool2(x)
        x = self.flatten(x)

        x = self.activation(self.fc1(x))
        x = self.fc2(x)

        return x

# for CIFAR10
class CNN2(nn.Module):
    def __init__(self, name, in_channels, hidden_channels, num_hiddens, num_classes):
        super(CNN2, self).__init__()
        self.name = name
        self.activation = nn.ReLU(True)

        self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=hidden_channels, kernel_size=(5, 5), padding=1, stride=1, bias=False)
        self.conv2 = nn.Conv2d(in_channels=hidden_channels, out_channels=hidden_channels * 2, kernel_size=(5, 5), padding=1, stride=1, bias=False)
        
        self.maxpool1 = nn.MaxPool2d(kernel_size=(2, 2), padding=1)
        self.maxpool2 = nn.MaxPool2d(kernel_size=(2, 2), padding=1)
        self.flatten = nn.Flatten()

        self.fc1 = nn.Linear(in_features=(hidden_channels * 2) * (8 * 8), out_features=num_hiddens, bias=False)
        self.fc2 = nn.Linear(in_features=num_hiddens, out_features=num_classes, bias=False)

    def forward(self, x):
        x = self.activation(self.conv1(x))
        x = self.maxpool1(x)

        x = self.activation(self.conv2(x))
        x = self.maxpool2(x)
        x = self.flatten(x)
    
        x = self.activation(self.fc1(x))
        x = self.fc2(x)
        
        return x        

In [None]:
#server.py

import copy
import gc
import logging

import numpy as np
import torch
import torch.nn as nn

from multiprocessing import pool, cpu_count
from torch.utils.data import DataLoader
from tqdm.auto import tqdm
from collections import OrderedDict

from .models import *
from .utils import *
from .client import Client

logger = logging.getLogger(__name__)

#연합학습의 전체 프로세스를 조율하는 센터 서버 구현을 위한 클래스
#처음에 중앙서버는 모델 골격을 모든 참여 클라이언트의 구성과함께 배포함
#연합학습을 진행하는 동안 센터서버는 클라이언트의 일부를 샘플링하고
#로컬에서 업데이트된 매개변수를 수신하여 글로벌 매개변수(모델)로 평균화하여 글로벌 모델에 적용함

#다음라운드에서 새로 선택된 고객은 업데이트된 글로벌 모델을 로컬모델로 받게됨


#Arribute
#clients: 연합학습에 참여하는 클라이언트 인스턴스를 포함하는 목록임
#__round: 현재 연합 라운드를 나타내는 Int.
#writer: 메트릭 및 글로벌 모델의 손실을 추적하는 summarywriter 인스턴스임
#모델: 전역모델에 대한 torch.nn 인스턴스
#seed: random seed에 대한 초기값
#device: training machine indicator(e.g. "cpu", "cuda")
#mp_flag: "client_update" 및 "client_evaluate" 방법에 대한 다중처리 사용의 boolean indicator
#Boolean : True or Flase
#data_path: 데이터를 읽는 경로
#dataset_name: 데이터셋트의 이름
#shards: 파편수
#num_shards: non-iid 데이터 분할 시뮬레이션을 위한 샤드수('iid=False'인 경우에만 유효함)
#iid 데이터세트(IID or non-iid)를 분할하는 방법을 나타내는 boolean 표시기
#init_config: 모델초기화를 위한 kwargs
#fraction: 각 연합 라운드에서 선택된 클라이언트 수의 비율임
#num_clients: 총 참여 클라이언트 수(local model)
#local_epochs: 클라이언트 모델업데이트에 필요한 epoch
#batch_size: 클라이언트/글로벌 모델을 업데이트/평가하기 위한 배치크기임
#criterion: loss계산을 위한 torch.nn instance
#optimzer: 매개변수 업데이트를 위한 torch.optim instance
#optim_config: 옵티마이저를 위해 제공되는 kwargs
#python *args, **kwargs 의미와 예제
# *args: arguments 여러개의 인수를 함수로 받을 때 사용하는 표시임 (튜플형태)
# **kwargs : keyword argument 키워드 인수를 받을 때 사용하는 표시임 (딕셔너리 형태) {'키워드':'특정 값}
#함수의 파라미터순서: 일반 변수, *변수, **변수
#*변수 --> 여러개가 아규먼트로 들어올떄, 함수내부에서는 해당변수를 '튜플'로 처리함
#**변수 --> 키워드=''로 입력할 경우에 그것을 각각 키와 값으로 가져오는 '딕셔너리'로 처리함

class Server(object):
    def __init__(self, writier, model_config={}, global_config={}, data_config={}, init_config={},
                 fed_config={}, optim_config={}):
        self.clients = None
        self._round = 0
        self.writer =writer

        self.model = eval(model_config["name"])(**model_config)

        self.seed = global_config["seed"]
        self.device = global_config["device"]
        self.mp_flag = global_config["is_mp"]

        self.data_path = data_config["data_path"]
        self.dataset_name = data_config["dataset_name"]
        self.num_shards = data_config["num_shards"]
        self.iid = data_config["iid"]

        self.init_config = init_config

        self.fraction = fed_config["C"]
        self.num_clients = fed_config["k"]
        self.num_rounds = fed_config["R"]
        self.local_epochs = fed_config["E"]
        self.batch_size = fed_config["B"]

        self.criterion = fed_config["criterion"]
        self.optimizer = fed_config["optimizer"]
        self.optim_config = optim_config

    def setup(self, **init_kwargs):
      #연합학습을 위한 모든 구성요소를 설정함
      #첫 번째 라운드 이전에만 유효함(초기값)
      assert self._round == 0

      #모델의 초기 값 weights
      torch.manual_seed(self.seed)
      init_net(self.model, **self.init_config)

      message = f"[Round: {str(self._round).zfill(4)}] ...successfully initialized model (# parameters: {str(sum(p.numel() for p in self.model.parameters()))})!"
      print(message); logging.info(message)
      del message; gc.collect()

      #각클라이언트에 대한 로컬 데이터 세트 분할 
      local_datasets, test_dataset = create_datasets(self.data_path, self.dataset_name,
                                                     self.num_clients, self.num_shards, self.iid)
      
      # 각 클라이언트에 데이터셋 할당
      self.clients = self.create_clients(local_datasets)

      #평가를 위한 홀드아웃 데이터 세트 준비
      #홀드아웃 데이터는 학습에 사용하지 않은 데이터에 대한 모델의 일반화 능력을 평가하는 데 도움
      self.data = test_Dataset
      self.dataloader = DataLoader(test_dataset, batch_size=self.batch_size, shuffle=False)


      # 클라이언트 업데이트에 대한 세부 설정을 구성하고
      self.setup_clients(batch_size = self.batch_size,
                         criterion=serlf.criterion, num_local_epochs=self.local_epochs,
                         optimizer = self.optimizer, optim_config = self.optim_config)
      
      #모델 스켈레톤을 모든 클라이언트에게 보냄
      self.transmit_model()

      #각 클라이언트 인스턴스를 초기화함
      def create_clients(self, local_datasets):
          clients = [ ]
          for k, dataset in tqdm(enumerate(local_datasets), leave=False):
              cleint = Client(client_id=k, local_data=dataset, device=self.device)
              clients.append(client)

          message = f"[Round: {str(self._round_.zfill(4)}] ... succefully created all {str(self.num_clients)} clients!"
          print(message); logging.info(message)
          del message; gc.collect()
          return clients
      
      #각 클라이언트를 설정함
      def setup_clients(self, **client_config):
          for k, client in tqdm(enumerate(self.clients), leave=False):
              client.setup(**client_config)

         message = f"[Round: {str(self._round).zfill(4)}] ...successfully finished setup of all {str(self.num_clients)} clients!"
        print(message); logging.info(message)
        del message; gc.collect()

      #업데이트된 글로벌 모델을 선택한/모든 클라이언트에게 전송함
      def transmit_model(self, sampled_client_indices=None):
          #첫번째 연합라운드 이전과 마지막 연합 라운드 이후에 모든 클라이언트에게 글로벌 모델을 보냄
          if sampled_client_indicies is None:

              assert (self._round == 0) or (self.round == self.num_rounds)

              for client in tqdum(self.clients, leave=False):
                  client.model = copy.deepcopy(self.model)

            message = f"[Round: {str(self._round).zfill(4)}] ...successfully transmitted models to all {str(self.num_clients)} clients!"
            print(message); logging.info(message)
            del message; gc.collect()
        else:
          #선택한 클라이언트에게 글로벌 모델을 보냄
          assert self._round != 0

          for idx in tqdm(sampled_client_indices, leave=False):
              self.clients[idx].model = copy.deepcopy(self.model)

            message = f"[Round: {str(self._round).zfill(4)}] ...successfully transmitted models to {str(len(sampled_client_indices))} selected clients!"
            print(message); logging.info(message)
            del message; gc.collect()

        #전체 클라이언트 중 일부를 선택함
        # 무작위로 샘플 클라이언트
      def sample_clients(self):
            message = f"[Round: {str(self._round).zfill(4)}] Select clients...!"
            print(message); logging.info(message)
            del message; gc.collect()

        num_sampled_clients = max(int(self.fraction * self.num_clients), 1)
        sampled_client_indices = sorted(np.random.choice(a=[i for i in range(self.num_clients)], size=num_sampled_clients, replace=False).tolist())

        return sampled_client_indices

        #선택한 각 클라이언트의 client updat함수를 호출함
        #선택된 클라이언트 업데이트
           
      def update_selected_clients(self, sampled_client_indices):

          message = f"[Round: {str(self._round).zfill(4)}] Start updating selected {len(sampled_client_indices)} clients...!"
          print(message); logging.info(message)
          del message; gc.collect()

          selected_total_size = 0
          for idx in tqdm(sampled_client_indices, leave=False):
            self.clients[idx].client_update()
            selected_total_size += len(self.clients[idx])

          message = f"[Round: {str(self._round).zfill(4)}] ...{len(sampled_client_indices)} clients are selected and updated (with total sample size: {str(selected_total_size)})!"
          print(message); logging.info(message)
          del message; gc.collect()

          return selected_total_size

          #선정된 클라이언트 업데이트 방식의 멀티프로세싱 적용 버전임
          #선택한 클라이언트 업데이트

    def mp_update_selected_clients(self, selected_index):

        message = f"[Round: {str(self._round).zfill(4)}] Start updating selected client {str(self.clients[selected_index].id).zfill(4)}...!"
        print(message, flush=True); logging.info(message)
        del message; gc.collect()

        self.clients[selected_index].client_update()
        client_size = len(self.clients[selected_index])

        message = f"[Round: {str(self._round).zfill(4)}] ...client {str(self.clients[selected_index].id).zfill(4)} is selected and updated (with total sample size: {str(client_size)})!"
        print(message, flush=True); logging.info(message)
        del message; gc.collect()

        return client_size
        
      #선택한 각 클라이언트에서 업데이트 및 전송된 매개변수의 평균
    def average_model(self, sampled_client_indices, coefficients):
        message = f"[Round: {str(self._round).zfill(4)}] Aggregate updated weights of {len(sampled_client_indices)} clients...!"
        print(message); logging.info(message)
        del message; gc.collect()

        averaged_weights = OrderedDict()
        for it, idx in tqdm(enumerate(sampled_client_indices), leave=False):
            local_weights = self.clients[idx].model.state_dict()
            for key in self.model.state_dict().keys():
                if it == 0:
                    averaged_weights[key] = coefficients[it] * local_weights[key]
                else:
                    averaged_weights[key] += coefficients[it] * local_weights[key]
        self.model.load_state_dict(averaged_weights)

        message = f"[Round: {str(self._round).zfill(4)}] ...updated weights of {len(sampled_client_indices)} clients are successfully averaged!"
        print(message); logging.info(message)
        del message; gc.collect()
    
    #선택한 각 클라이언트의 "client_evaluate" 함수를 호출함
    def evaluate_selected_models(self, sampled_client_indices):
        message = f"[Round: {str(self._round).zfill(4)}] Evaluate selected {str(len(sampled_client_indices))} clients' models...!"
        print(message); logging.info(message)
        del message; gc.collect()

        for idx in sampled_client_indices:
            self.clients[idx].client_evaluate()

        message = f"[Round: {str(self._round).zfill(4)}] ...finished evaluation of {str(len(sampled_client_indices))} selected clients!"
        print(message); logging.info(message)
        del message; gc.collect()

    #evaluate 선정된 모델 방식의 멀티프로세싱 적용 버전임
    def mp_evaluate_selected_models(self, selected_index):
        self.clients[selected_index].client_evaluate()
        return True


    #연합 훈련을 함
    #클라이언트의 미리 정의된 부분을 임의로 선택함
    def train_federated_model(self):
        sampled_client_indices = self.sample_clients()

        #선택한 클라이언트에게 글로벌 모델을 보냄
        self.transmit_model(sampled_client_indices)

        #로컬 데이터 세트로 선택한 클라이언트를 업데이트함
        if self.mp_flag:
            with pool.ThreadPool(processes=cpu_count() - 1) as workhorse:
                selected_total_size = workhorse.map(self.mp_update_selected_clients, sampled_client_indices)
            selected_total_size = sum(selected_total_size)
        else:
            selected_total_size = self.update_selected_clients(sampled_client_indices)

        #로컬 데이터 세트로 선택한 클라이언트 평가(로컬 업데이트에 사용된 것과 동일)
        if self.mp_flag:
            message = f"[Round: {str(self._round).zfill(4)}] Evaluate selected {str(len(sampled_client_indices))} clients' models...!"
            print(message); logging.info(message)
            del message; gc.collect()

            with pool.ThreadPool(processes=cpu_count() - 1) as workhorse:
                workhorse.map(self.mp_evaluate_selected_models, sampled_client_indices)
        else:
            self.evaluate_selected_models(sampled_client_indices)

        #가중치의 평균 계수 계산
        mixing_coefficients = [len(self.clients[idx]) / selected_total_size for idx in sampled_client_indices]

        #선택한 클라이언트의 업데이트된 각 모델 매개변수의 평균을 내고 전역 모델을 업데이트 함
        self.average_model(sampled_client_indices, mixing_coefficients)
    #"""글로벌 홀드아웃 데이터 세트(self.data)를 사용하여 글로벌 모델을 평가합니다."""    
    def evaluate_global_model(self):
        self.model.eval()
        self.model.to(self.device)

        test_loss, correct = 0, 0
        with torch.no_grad():
            for data, labels in self.dataloader:
                data, labels = data.float().to(self.device), labels.long().to(self.device)
                outputs = self.model(data)
                test_loss += eval(self.criterion)()(outputs, labels).item()
                
                predicted = outputs.argmax(dim=1, keepdim=True)
                correct += predicted.eq(labels.view_as(predicted)).sum().item()
                
                if self.device == "cuda": torch.cuda.empty_cache()
        self.model.to("cpu")

        test_loss = test_loss / len(self.dataloader)
        test_accuracy = correct / len(self.data)
        return test_loss, test_accuracy

    """연합 학습의 전 과정을 실행합니다."""
    def fit(self):
        self.results = {"loss": [], "accuracy": []}
        for r in range(self.num_rounds):
            self._round = r + 1
            
            self.train_federated_model()
            test_loss, test_accuracy = self.evaluate_global_model()
            
            self.results['loss'].append(test_loss)
            self.results['accuracy'].append(test_accuracy)

            self.writer.add_scalars(
                'Loss',
                {f"[{self.dataset_name}]_{self.model.name} C_{self.fraction}, E_{self.local_epochs}, B_{self.batch_size}, IID_{self.iid}": test_loss},
                self._round
                )
            self.writer.add_scalars(
                'Accuracy', 
                {f"[{self.dataset_name}]_{self.model.name} C_{self.fraction}, E_{self.local_epochs}, B_{self.batch_size}, IID_{self.iid}": test_accuracy},
                self._round
                )

            message = f"[Round: {str(self._round).zfill(4)}] Evaluate global model's performance...!\
                \n\t[Server] ...finished evaluation!\
                \n\t=> Loss: {test_loss:.4f}\
                \n\t=> Accuracy: {100. * test_accuracy:.2f}%\n"            
            print(message); logging.info(message)
            del message; gc.collect()
        self.transmit_model()

In [None]:
#utils.py


import os
import logging

import numpy as np
import torch
import torch.nn as nn
import torch.nn.init as init ##??

from torch.utils.data import Dataset, TensorDataset, ConcatDataset
from torchvision import datasets, transforms

logger = logging.getLogger(__name__)

#######################
# TensorBaord setting #
#######################
def launch_tensor_board(log_path, port, host):
    #Tensor Board를 초기화하는 기능임 
    
    #인수
        #log_path: 로그가 저장되는 경로
        #port : TensorBoard를 시작하는데 사용되는 포트번호임
        #host: TensorBoard를 시작하는데 사용되는 주소임

  
    os.system(f"tensorboard --logdir={log_path} --port={port} --host={host}")
    return True

#########################
# Weight initialization #
#########################

#가중치 초기화

def init_weights(model, init_type, init_gain):
    #네트워크 가중치를 초기화하는 기능임
    
    #인수:
        #모델:초기화할 torch.nn 인스턴스
        #init_type: 초기화 방법의 이름(normal | xavier | kaiming | orthogonal).
        #init_gain: (normal | xavier | orthogonal)에 대한 배율 인수.
    
    #Reference:
        #https://github.com/DS3Lab/forest-prediction/blob/master/pix2pix/models/networks.py

    def init_func(m):
        classname = m.__class__.__name__
        if hasattr(m, 'weight') and (classname.find('Conv') != -1 or classname.find('Linear') != -1):
            if init_type == 'normal':
                init.normal_(m.weight.data, 0.0, init_gain)
            elif init_type == 'xavier':
                init.xavier_normal_(m.weight.data, gain=init_gain)
            elif init_type == 'kaiming':
                init.kaiming_normal_(m.weight.data, a=0, mode='fan_in')
            else:
                raise NotImplementedError(f'[ERROR] ...initialization method [{init_type}] is not implemented!')
            if hasattr(m, 'bias') and m.bias is not None:
                init.constant_(m.bias.data, 0.0)
        
        elif classname.find('BatchNorm2d') != -1 or classname.find('InstanceNorm2d') != -1:
            init.normal_(m.weight.data, 1.0, init_gain)
            init.constant_(m.bias.data, 0.0)   
    model.apply(init_func)

def init_net(model, init_type, init_gain, gpu_ids):
    #네트워크 가중치를 초기화하는 기능임
    
    #인수:
        #모델:초기화할 torch.nn.Module
        #init_type: 초기화 방법의 이름(normal | xavier | kaiming | orthogonal)l
        #init_gain: (normal | xavier | orthogonal)에 대한 배율 인수.
        #gpu_ids: 네트워크가 실행되는 GPU를 나타내는 목록 또는 정수. (예: [0, 1, 2], 0)
    
    #Returns:
        #초기화된 torch.nn.Module 인스턴스.

    if len(gpu_ids) > 0:
        assert(torch.cuda.is_available())
        model.to(gpu_ids[0])
        model = nn.DataParallel(model, gpu_ids)
    init_weights(model, init_type, init_gain)
    return model

#################
# Dataset split #
#################
class CustomTensorDataset(Dataset):
    #변환을 지원하는 TensorDataset
    def __init__(self, tensors, transform=None):
        assert all(tensors[0].size(0) == tensor.size(0) for tensor in tensors)
        self.tensors = tensors
        self.transform = transform

    def __getitem__(self, index):
        x = self.tensors[0][index]
        y = self.tensors[1][index]
        if self.transform:
            x = self.transform(x.numpy().astype(np.uint8))
        return x, y

    def __len__(self):
        return self.tensors[0].size(0)

def create_datasets(data_path, dataset_name, num_clients, num_shards, iid):
    #클라이언트에 배포하기 위해 전체 데이터 세트를 IID 또는 비 IID 방식으로 분할합니다."""
    dataset_name = dataset_name.upper()
    ## 존재하는 경우 torchvision.datasets에서 데이터 세트 가져오기
    if hasattr(torchvision.datasets, dataset_name):
        ## 데이터 세트마다 변형을 다르게 설정
        if dataset_name in ["CIFAR10"]:
            transform = torchvision.transforms.Compose(
                [
                    torchvision.transforms.ToTensor(),
                    torchvision.transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
                ]
            )
        elif dataset_name in ["MNIST"]:
            transform = torchvision.transforms.ToTensor()

        # prepare raw training & test datasets
        training_dataset = torchvision.datasets.__dict__[dataset_name](
            root=data_path,
            train=True,
            download=True,
            transform=transform
        )
        test_dataset = torchvision.datasets.__dict__[dataset_name](
            root=data_path,
            train=False,
            download=True,
            transform=transform
        )
    else:
        # 데이터세트를 찾을 수 없음 예외
        error_message = f"...dataset \"{dataset_name}\" is not supported or cannot be found in TorchVision Datasets!"
        raise AttributeError(error_message)

    # 그레이스케일 이미지 데이터셋을 위한 unsqueeze 채널 차원
    if training_dataset.data.ndim == 3: # convert to NxHxW -> NxHxWx1 # NxHxW -> NxHxWx1로 변환
        training_dataset.data.unsqueeze_(3)
    num_categories = np.unique(training_dataset.targets).shape[0]
    
    if "ndarray" not in str(type(training_dataset.data)):
        training_dataset.data = np.asarray(training_dataset.data)
    if "list" not in str(type(training_dataset.targets)):
        training_dataset.targets = training_dataset.targets.tolist()
    
    # iid 플래그에 따라 데이터셋 분할
    if iid:
        # 데이터 섞기
        shuffled_indices = torch.randperm(len(training_dataset))
        training_inputs = training_dataset.data[shuffled_indices]
        training_labels = torch.Tensor(training_dataset.targets)[shuffled_indices]

        # 데이터를 num_clients로 분할
        split_size = len(training_dataset) // num_clients
        split_datasets = list(
            zip(
                torch.split(torch.Tensor(training_inputs), split_size),
                torch.split(torch.Tensor(training_labels), split_size)
            )
        )

        # 로컬 데이터 세트 묶음을 마무리합니다.
        local_datasets = [
            CustomTensorDataset(local_dataset, transform=transform)
            for local_dataset in split_datasets
            ]
    else:
        # 레이블로 데이터 정렬
        sorted_indices = torch.argsort(torch.Tensor(training_dataset.targets))
        training_inputs = training_dataset.data[sorted_indices]
        training_labels = torch.Tensor(training_dataset.targets)[sorted_indices]

        # 먼저 데이터를 샤드로 분할
        shard_size = len(training_dataset) // num_shards #300
        shard_inputs = list(torch.split(torch.Tensor(training_inputs), shard_size))
        shard_labels = list(torch.split(torch.Tensor(training_labels), shard_size))

        # 목록을 정렬하여 두 개 이상의 클래스에서 각 클라이언트에 샘플을 편리하게 할당
        shard_inputs_sorted, shard_labels_sorted = [], []
        for i in range(num_shards // num_categories):
            for j in range(0, ((num_shards // num_categories) * num_categories), (num_shards // num_categories)):
                shard_inputs_sorted.append(shard_inputs[i + j])
                shard_labels_sorted.append(shard_labels[i + j])
                
        # 각 클라이언트에 샤드를 할당하여 로컬 데이터 세트를 마무리합니다.
        shards_per_clients = num_shards // num_clients
        local_datasets = [
            CustomTensorDataset(
                (
                    torch.cat(shard_inputs_sorted[i:i + shards_per_clients]),
                    torch.cat(shard_labels_sorted[i:i + shards_per_clients]).long()
                ),
                transform=transform
            ) 
            for i in range(0, len(shard_inputs_sorted), shards_per_clients)
        ]
    return local_datasets, test_dataset

In [None]:
#main.py

import os #?
import time #?
import datetime #?
import pickle #?
import yaml #?
import threading #?
import logging #?

import torch.nn as nn
from torch.utils.tensorboard import SummaryWriter

from src.server import Server
from src.utils import launch_tensor_board

if __name__ == "__main__":
    # read configuration file(yaml file)
    with open('./config.yaml') as c:
        configs = list(yaml.load_all(c, Loader=yaml.FullLoader))
    global_config = configs[0]["global_config"]
    data_config = configs[1]["data_config"]
    fed_config = configs[2]["fed_config"]
    optim_config = configs[3]["optim_config"]
    init_config = configs[4]["init_config"]
    model_config = configs[5]["model_config"]
    log_config = configs[6]["log_config"]

    #현재시간을 포함하도록 log_path 수정
    log_config["log_path"] = os.path.join(log_config["log_path"],
                                          str(datetime.datetime.now().strftime("%Y-%m-%d_%H:%M:%S")))

    #손실 및 메트릭 추적을 위해 TensorBoard 시작
    writer = SummaryWriter(log_dir=log_config["log_path"], filename_suffix="FL")
    tb_thread = threading.Thread(target=launch_tensor_board,
                                 args=([log_config["log_path"], log_config["tb_port"], log_config["tb_host"]])
                                 ).start()
    time.sleep(3.0)

    #글로벌 logger의 구성 설정
    logger = logging.getLogger(__name__)
    logging.basicConfig(
        filename=os.path.join(log_config["log_path"], log_config["log_name"]),
        level = logging.INFO,
        fromat = "[%(levelname)s]($(asctime)s) %(message)s",
        datefmt = "%Y/%m/%d/ %I:%M:%S %p")
    
    #디스플레이 및 로그실험 구성
    message = "\n[WELCOME] Unfolding configurations...!"
    print(message); logging.info(message)    

    for config in configs:
        print(config); logging.info(config)
    print()

    #연합학습 초기화
    central_server = Server(writer, model_config, global_config, data_config, init_config,
                            fed_config, optim_config)
    central_server.setup()

    #do federated learning
    central_sever.fit()

    #save resulting losses and metrics
    with open(os.path.join(log_config["log_path"], "result.pkl"), "wb") as f:
        picke.dump(central_server.results, f)

    # bye !
    message = "...done all learning process!\n...exit program!"
    print(message); logging.info(message)
    time.sleep(3); exit()