In [None]:
from typing import Any, Dict, List, Optional, Tuple
import copy
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader

from data import MNISTDataset, FederatedSampler
from models import CNN, MLP
from utils import arg_parser, average_weights, Logger

#FedAvg 알고리즘 클래스 정의, 연합학습의 대표 알고리즘인 FedAvg 논문 기반
class FedAvg:
    """Implementation of FedAvg
    http://proceedings.mlr.press/v54/mcmahan17a/mcmahan17a.pdf
    """
    #외부에서 하이퍼파라미터 등 설정값들을 args로 전달받음
    def __init__(self, args: Dict[str, Any]):
        #GPU 사용 가능 시 지정된 device 번호로 CUDA 사용, 그렇지 않으면 CPU 사용
        self.args = args
        self.device = torch.device(
            f"cuda:{args.device}" if torch.cuda.is_available() else "cpu"
        )
        #결과 기록용 Logger 객체 초기화
        self.logger = Logger(args)
        #학습용 데이터 및 테스트 데이터를 FederatedSampler를 이용해 로딩
        self.train_loader, self.test_loader = self._get_data(
            root=self.args.data_root,
            n_clients=self.args.n_clients,
            n_shards=self.args.n_shards,
            non_iid=self.args.non_iid,
        )
        #MLP 또는 CNN 중 하나를 선택해서 서버 초기 모델로 설정
        if self.args.model_name == "mlp":
            self.root_model = MLP(input_size=784, hidden_size=128, n_classes=10).to(
                self.device
            )
            self.target_acc = 0.97
        elif self.args.model_name == "cnn":
            self.root_model = CNN(n_channels=1, n_classes=10).to(self.device)
            self.target_acc = 0.99
        else:
            raise ValueError(f"Invalid model name, {self.args.model_name}")
        #목표 정확도 설정
        self.reached_target_at = None  # type: int

    ## [참여 skew 전략 삽입 위치] 클라이언트 참여 이력 저장 구조 초기화 필요
    ## self.client_last_participated = [...]


    ## [라벨 skew 전략 삽입 위치] 클라이언트 라벨 커버리지 계산 및 저장 필요
    ## self.client_label_coverage = self._compute_label_coverage()

    #연합학습용 데이터 분할 함수 정의
    def _get_data(
        self, root: str, n_clients: int, n_shards: int, non_iid: int
    ) -> Tuple[DataLoader, DataLoader]:
        """
        Args:
            root (str): path to the dataset.
            n_clients (int): number of clients.
            n_shards (int): number of shards.
            non_iid (int): 0: IID, 1: Non-IID

        Returns:
            Tuple[DataLoader, DataLoader]: train_loader, test_loader
        """
        #MNIST 데이터셋을 학습/테스트용으로 불러옴
        train_set = MNISTDataset(root=root, train=True)
        test_set = MNISTDataset(root=root, train=False)
        #클라이언트 수, shard 수, non-IID 여부에 따라 federated 샘플링 전략 적용
        sampler = FederatedSampler(
            train_set, non_iid=non_iid, n_clients=n_clients, n_shards=n_shards
        )
        #미니배치 생성
        train_loader = DataLoader(train_set, batch_size=128, sampler=sampler)
        test_loader = DataLoader(test_set, batch_size=128)

        return train_loader, test_loader
    #서버 모델을 복사해 클라이언트에서 학습 수행
    def _train_client(
        self, root_model: nn.Module, train_loader: DataLoader, client_idx: int
    ) -> Tuple[nn.Module, float]:
        """Train a client model.

        Args:
            root_model (nn.Module): server model.
            train_loader (DataLoader): client data loader.
            client_idx (int): client index.

        Returns:
            Tuple[nn.Module, float]: client model, average client loss.
        """
        #서버 모델을 클라이언트용으로 깊은 복사 후 학습 모드로 전환
        model = copy.deepcopy(root_model)
        model.train()
        #SGD 옵티마이저 설정
        optimizer = torch.optim.SGD(
            model.parameters(), lr=self.args.lr, momentum=self.args.momentum
        )
        #클라이언트 local epoch 수만큼 반복
        for epoch in range(self.args.n_client_epochs):
            epoch_loss = 0.0
            epoch_correct = 0
            epoch_samples = 0
            #각 미니배치에 대해 forward → 손실 계산 → 역전파(backward) → 파라미터 업데이트 수행
            for idx, (data, target) in enumerate(train_loader):
                data, target = data.to(self.device), target.to(self.device)
                optimizer.zero_grad()

                logits = model(data)
                loss = F.nll_loss(logits, target)
                loss.backward()
                optimizer.step()

                epoch_loss += loss.item()
                epoch_correct += (logits.argmax(dim=1) == target).sum().item()
                epoch_samples += data.size(0)

            # Calculate average accuracy and loss
            epoch_loss /= idx
            epoch_acc = epoch_correct / epoch_samples

            print(
                f"Client #{client_idx} | Epoch: {epoch}/{self.args.n_client_epochs} | Loss: {epoch_loss} | Acc: {epoch_acc}",
                end="\r",
            )
        #학습된 클라이언트 모델과 평균 손실 반환
        return model, epoch_loss / self.args.n_client_epochs
    #서버 입장에서 전체 FedAvg 프로세스 실행
    def train(self) -> None:
        """Train a server model."""
        train_losses = []
        #전체 연합 라운드 반복
        for epoch in range(self.args.n_epochs):
            clients_models = []
            clients_losses = []


  ##[참여 skew 전략 삽입 위치] 참여 기록 기반 확률적 클라이언트 샘플링으로 개선 가능


            #매 라운드에서 참여할 클라이언트 랜덤 선택
            # Randomly select clients
            m = max(int(self.args.frac * self.args.n_clients), 1)
            idx_clients = np.random.choice(range(self.args.n_clients), m, replace=False)

            # Train clients
            self.root_model.train()

            for client_idx in idx_clients:
                # Set client in the sampler
                self.train_loader.sampler.set_client(client_idx)

  ## [참여 skew 전략 삽입 위치] 참여 간격 기반 보정 계수 계산 가능

                # Train client
                client_model, client_loss = self._train_client(
                    root_model=self.root_model,
                    train_loader=self.train_loader,
                    client_idx=client_idx,
                )
                clients_models.append(client_model.state_dict())
                clients_losses.append(client_loss)

  ## [client size skew 전략 삽입 위치] 클라이언트별 샘플 수 측정 및 저장 필요


  ## [label distribution skew 전략 삽입 위치] 클라이언트의 라벨 다양성(coverage) 확인 및 저장 필요


  ## [통합 가중치 계산 삽입 위치] size × coverage × participation 등을 조합하여 가중치 생성


  ## [평균화 전략 삽입 위치] average_weights 호출 시 위에서 계산한 가중치 활용하도록 수정 필요

            #클라이언트 모델 파라미터들을 평균내어 서버 모델 업데이트
            # Update server model based on clients models
            updated_weights = average_weights(clients_models)
            self.root_model.load_state_dict(updated_weights)

            # Update average loss of this round
            avg_loss = sum(clients_losses) / len(clients_losses)
            train_losses.append(avg_loss)

            #일정 라운드마다 테스트 수행 및 결과 기록, 목표 정확도 달성 시 조기 종료
            if (epoch + 1) % self.args.log_every == 0:
                # Test server model
                total_loss, total_acc = self.test()
                avg_train_loss = sum(train_losses) / len(train_losses)

                # Log results
                logs = {
                    "train/loss": avg_train_loss,
                    "test/loss": total_loss,
                    "test/acc": total_acc,
                    "round": epoch,
                }
                if total_acc >= self.target_acc and self.reached_target_at is None:
                    self.reached_target_at = epoch
                    logs["reached_target_at"] = self.reached_target_at
                    print(
                        f"\n -----> Target accuracy {self.target_acc} reached at round {epoch}! <----- \n"
                    )

                self.logger.log(logs)

                # Print results to CLI
                print(f"\n\nResults after {epoch + 1} rounds of training:")
                print(f"---> Avg Training Loss: {avg_train_loss}")
                print(
                    f"---> Avg Test Loss: {total_loss} | Avg Test Accuracy: {total_acc}\n"
                )

                # Early stopping
                if self.args.early_stopping and self.reached_target_at is not None:
                    print(f"\nEarly stopping at round #{epoch}...")
                    break
    #서버 모델로 전체 테스트셋 평가 수행
    def test(self) -> Tuple[float, float]:
        """Test the server model.

        Returns:
            Tuple[float, float]: average loss, average accuracy.
        """
        self.root_model.eval()

        total_loss = 0.0
        total_correct = 0.0
        total_samples = 0
        #
        for idx, (data, target) in enumerate(self.test_loader):
            data, target = data.to(self.device), target.to(self.device)

            logits = self.root_model(data)
            loss = F.nll_loss(logits, target)

            total_loss += loss.item()
            total_correct += (logits.argmax(dim=1) == target).sum().item()
            total_samples += data.size(0)

        #테스트셋을 통해 손실과 정확도 계산
        # calculate average accuracy and loss
        total_loss /= idx
        total_acc = total_correct / total_samples

        return total_loss, total_acc

#메인 실행부/ 외부 실행 시: 하이퍼파라미터를 파싱하여 FedAvg 인스턴스 생성 및 학습 실행
if __name__ == "__main__":
    args = arg_parser()
    fed_avg = FedAvg(args)
    fed_avg.train()