# CLIENT

In [2]:
from torch.optim import SGD

class Client():
    """One federated-learning participant holding local FEMNIST train/test data.

    The model is not taken from the constructor: it is assigned afterwards through
    the ``model`` property (the server assigns a deep copy of its global model).
    ``setup()`` must be called after a model has been assigned.
    """

    def __init__(self, client_id:str, model:nn.Module, data_info:dict=None, device:str=cfg.DEVICE):
        """Create a client.

        Args:
            client_id: Identifier used in log messages.
            model: Currently ignored (kept for interface compatibility); the model
                is set later through the ``model`` property.
            data_info: Dict with ``'train'`` and ``'test'`` entries, each passed to
                ``FEMNIST``. Required despite the ``None`` default.
            device: Device string used for training/evaluation (e.g. ``'cuda'``).
        """
        self.id = client_id

        self.__model = None  # assigned later through the `model` setter
        self.device = device

        # TODO: move this unpacking into a helper function.
        self.train_info, self.test_info = data_info['train'], data_info['test']
        self.trainset, self.testset = FEMNIST(self.train_info), FEMNIST(self.test_info)

    @property
    def model(self):
        """The local model instance this client trains and evaluates."""
        return self.__model

    @model.setter
    def model(self, model):
        self.__model = model

    def __len__(self):
        """Number of local training samples (used as the aggregation weight)."""
        return len(self.trainset)

    def setup(self):
        """Build data loaders, optimizer, loss and the number of local epochs."""
        self.train_loader = DataLoader(self.trainset, batch_size=16, shuffle=True)
        self.test_loader = DataLoader(self.testset, batch_size=16, shuffle=False)
        self.optimizer = SGD(self.model.parameters(), lr=0.01)  # TODO: utils.get_optimizer(cfg['optim']:str)
        self.criterion = nn.CrossEntropyLoss()  # TODO: utils.get_loss(cfg['loss']:str)
        self.epochs = 10

    def local_train(self)->None:
        """Train the current model for ``self.epochs`` epochs, then report train metrics."""
        proc = os.getpid()
        self.model.train()
        self.model.to(self.device)
        # Rebuild the optimizer against the *current* model: the model may have been
        # replaced through the setter since setup(), and an optimizer created earlier
        # would keep stepping the discarded model's parameters. Plain SGD (as built in
        # setup) carries no state, so rebuilding is otherwise a no-op.
        lr = self.optimizer.param_groups[0]['lr']
        self.optimizer = SGD(self.model.parameters(), lr=lr)

        for epoch in range(self.epochs):
            for X, Y in self.train_loader:
                self.optimizer.zero_grad()
                X, Y = X.to(self.device), Y.to(self.device)
                loss = self.criterion(self.model(X), Y)
                loss.backward()
                self.optimizer.step()
                if "cuda" in self.device : torch.cuda.empty_cache()

        # Evaluate on the *training* set to report training accuracy/loss.
        train_acc, train_loss = self._evaluate(self.train_loader)

        print(f'=== Process ID: {proc} | Client {self.id} Finished Training {len(self)} samples ===')
        print(f'client:{self.id} | Train Acc:{train_acc*100:.2f} | Train Loss:{train_loss:.4f}')

    def local_test(self):
        """Evaluate the current model on the local test set and print the metrics."""
        test_acc, test_loss = self._evaluate(self.test_loader)
        print(f'client:{self.id} | Test Acc:{test_acc*100:.2f} | Test Loss:{test_loss:.4f}')

    def _evaluate(self, loader):
        """Return (accuracy, mean per-batch loss) of the model over ``loader``.

        Runs in eval mode without gradients and moves the model back to CPU
        afterwards. Returns (nan, nan) for an empty loader.
        """
        self.model.eval()
        self.model.to(self.device)
        losses, preds, labels = [], [], []
        with torch.no_grad():
            for X, Y in loader:
                X, Y = X.to(self.device), Y.to(self.device)
                out = self.model(X)
                losses.append(self.criterion(out, Y).item())
                preds.append(np.argmax(out.cpu().numpy(), axis=1).reshape(-1))
                labels.append(Y.cpu().numpy().reshape(-1))
                if "cuda" in self.device : torch.cuda.empty_cache()
        self.model.to('cpu')
        if not labels:
            return float('nan'), float('nan')
        # Concatenate once instead of re-stacking on every batch (quadratic).
        acc = metrics.accuracy_score(y_true=np.concatenate(labels), y_pred=np.concatenate(preds))
        return acc, np.average(losses)

# SERVER

In [3]:
class Server():
    """Central coordinator of a FedAvg-style federated training loop.

    Creates one ``Client`` per selected user, distributes copies of the global
    model, trains/tests a random subset of clients each round, and replaces the
    global weights with a sample-size-weighted average of the clients' weights.
    """
    def __init__(self, DM_dict:dict, algorithm:str=None):
        """
        Args:
            DM_dict: ``{'train': <train data manager>, 'test': <test data manager>}``.
            algorithm: Currently unused; ``update_model`` always uses ``FedAvg.FedAvg``.
        """
        self.train_DM = DM_dict['train']
        self.test_DM = DM_dict['test']

        self.clients = None
        self.device = cfg.DEVICE

        self.global_model = Net()

        self.criterion = nn.CrossEntropyLoss()  # TODO: utils.get_loss(cfg['loss']:str)

        self.Algorithm = FedAvg.FedAvg  # aggregation method such as FedAvg; TODO: write utils.get_algorithm()
        self.received_models = None  # results of Client.upload_model() are collected here

        self.mp_flag = True  # train/test clients on a thread pool

    def setup(self):
        """Create clients, build the global test loader, and prepare every client."""
        self.clients = self.create_clients()
        self.data = FEMNIST(self.test_DM.get_global_testset())
        self.dataloader = DataLoader(self.data, batch_size=256, shuffle=False)

        self.transmit_model()
        self.setup_clients()

    def create_clients(self, n_users:int=100):
        """Randomly choose up to ``n_users`` users and build a ``Client`` for each.

        If fewer users exist than requested, all of them are used instead of failing.
        """
        population = self.test_DM.users
        n_users = min(n_users, len(population))
        self.user_ids = np.random.choice(population, n_users, replace=False)
        clients = {}
        for user in self.user_ids:
            data_info = {'train': self.train_DM.get_user_info(user),
                         'test': self.test_DM.get_user_info(user)}
            clients[user] = Client(client_id=user, model=self.global_model, data_info=data_info)
        return clients

    def setup_clients(self)->None:
        """Call ``setup()`` on every client."""
        for client in tqdm(self.clients.values(), leave=False):
            client.setup()

    def transmit_model(self, sampled_clients:list=None)->None:
        """Give each target client (all clients if ``None``) a deep copy of the global model."""
        # `is None` (not `== None`): sampled_clients may be a NumPy array, whose
        # `== None` is element-wise and ambiguous in a boolean context.
        targets = self.clients if sampled_clients is None else sampled_clients
        for client in tqdm(targets, leave=False):
            self.clients[client].model = copy.deepcopy(self.global_model)

    def sample_clients(self, n_participant:int=50)->np.ndarray:
        """Sample ``n_participant`` distinct user ids for this round."""
        assert n_participant <= len(self.user_ids), "Check 'n_participant <= len(self.clients)'"
        return np.random.choice(self.user_ids, n_participant, replace=False)

    def train_selected_clients(self, sampled_clients:list)->int:
        """Sequentially train the given clients; return their total sample count."""
        total_sample = 0
        for client in tqdm(sampled_clients, leave=False):
            self.clients[client].local_train()
            total_sample += len(self.clients[client])
        return total_sample

    def mp_train_selected_clients(self, client:str)->int:
        """Train a single client (thread-pool worker); return its sample count."""
        self.clients[client].local_train()
        return len(self.clients[client])

    def test_selected_models(self, sampled_clients):
        """Sequentially run local tests on the given clients."""
        for client in sampled_clients:
            self.clients[client].local_test()

    def mp_test_selected_models(self, client):
        """Run the local test of a single client (thread-pool worker)."""
        self.clients[client].local_test()

    def average_model(self, sampled_clients, coefficients):
        """Load sum_i coefficients[i] * state_dict(client_i) into the global model."""
        averaged_weights = OrderedDict()
        for it, client in tqdm(enumerate(sampled_clients), leave=False):
            local_weights = self.clients[client].model.state_dict()
            for key in self.global_model.state_dict().keys():
                if it == 0:
                    averaged_weights[key] = coefficients[it] * local_weights[key]
                else:
                    averaged_weights[key] += coefficients[it] * local_weights[key]
        self.global_model.load_state_dict(averaged_weights)

    def update_model(self, train_result:list, layers:list=None):
        """Aggregate uploaded results (dicts with 'model' and 'num_sample') via ``self.Algorithm``."""
        self.received_models, num_samples = [], []
        for result in train_result:
            self.received_models.append(result['model'])
            num_samples.append(result['num_sample'])
        state = self.Algorithm(self.received_models, num_samples, layers)
        self.global_model.load_state_dict(state)

    def train_federated_model(self):
        """Run one round: sample, distribute, train, test, and aggregate."""
        sampled_clients = self.sample_clients()

        # Send the current global weights to this round's participants, and rebuild
        # their optimizers/loaders so they train the freshly received copy.
        self.transmit_model(sampled_clients)
        for client in sampled_clients:
            self.clients[client].setup()

        if self.mp_flag:
            # ThreadPool requires at least one worker (fails on single-CPU hosts otherwise).
            n_workers = max(1, cpu_count() - 1)
            with pool.ThreadPool(processes=n_workers) as workhorse:
                selected_total_size = sum(workhorse.map(self.mp_train_selected_clients, sampled_clients))
            with pool.ThreadPool(processes=n_workers) as workhorse:
                workhorse.map(self.mp_test_selected_models, sampled_clients)
        else:
            selected_total_size = self.train_selected_clients(sampled_clients)
            self.test_selected_models(sampled_clients)

        # Weight each client by its share of this round's training samples.
        mixing_coefficients = [len(self.clients[client]) / selected_total_size for client in sampled_clients]

        self.average_model(sampled_clients, mixing_coefficients)

    def global_test(self):
        """Evaluate the global model on the pooled test set; store ``acc``/``test_loss``."""
        self.global_model.eval()
        self.global_model.to(self.device)

        loss_trace, result_pred, result_anno = [], [], []
        with torch.no_grad():
            for X, Y in self.dataloader:
                X, Y = X.to(self.device), Y.to(self.device)
                pred = self.global_model(X)
                loss_trace.append(self.criterion(pred, Y).item())
                result_pred.append(np.argmax(pred.cpu().numpy(), axis=1).reshape(-1))
                result_anno.append(Y.cpu().numpy().reshape(-1))
        # Park the model on CPU like the clients do, freeing device memory between rounds.
        self.global_model.to('cpu')

        self.acc = metrics.accuracy_score(y_true=np.concatenate(result_anno), y_pred=np.concatenate(result_pred))
        self.test_loss = np.average(loss_trace)
        print(f'Global Test Result | Acc:{self.acc*100:.2f}, Loss:{self.test_loss:.4f}')

In [4]:
# Locate the FEMNIST train/test JSON files and wrap each split in a DataManager.
PATH = cfg.DATAPATH['femnist']
file_dict = get_files(PATH)

TRAIN_DM, TEST_DM = (
    DataManager(file_dict['train'], is_train=True),
    DataManager(file_dict['test'], is_train=False),
)
DM_dict = dict(train=TRAIN_DM, test=TEST_DM)
print("DATA READY")

DATA READY


In [None]:
# Build the server (setup() creates and prepares the clients), then run 100
# federated rounds, evaluating the global model after each one.
server = Server(DM_dict)
server.setup()
for round_idx in range(100):
    server.train_federated_model()
    server.global_test()


In [None]:
# Peek at the raw JSON content of the first training file.
file = server.train_DM.files[0]
with open(file, encoding='utf-8') as f:  # JSON text is UTF-8; don't depend on the locale
    data = json.load(f)
data


In [41]:
np.asarray(data['user_data'][data['users'][0]]['x']).shape  # shape of the first user's x samples

(141, 784)

In [67]:
class DataManager():
    """Load FEMNIST JSON files and pool the samples contained in each file.

    Each file is read as JSON with a ``users`` list and a ``user_data`` mapping
    from user to ``{'x': [...], 'y': [...]}``. Every file becomes one entry in
    ``self.data`` keyed by its position in ``files`` as a string ('0', '1', ...),
    holding the concatenated samples of all users in that file; these keys are
    also what ``self.users`` lists.
    """

    def __init__(self, files:list, is_train:bool=True):
        self.files = files
        self.is_train = is_train
        self.users, self.data = [], {}
        if not self.is_train:
            # Test split: additionally pool every sample for global evaluation.
            self.global_test_data = {'x':[], 'y':[]}

        for idx, file in enumerate(self.files):
            key = str(idx)
            with open(file, encoding='utf-8') as f:
                data = json.load(f)

            # extend() instead of `list + list`, which re-copies the whole
            # accumulated list for every user (quadratic).
            bucket = {'x': [], 'y': []}
            for user in data['users']:
                samples = data['user_data'][user]
                bucket['x'].extend(samples['x'])
                bucket['y'].extend(samples['y'])
                if not self.is_train:
                    self.global_test_data['x'].extend(samples['x'])
                    self.global_test_data['y'].extend(samples['y'])

            self.data[key] = bucket
            self.users.append(key)

    def get_user_info(self, user:str) -> dict:
        """Return the pooled ``{'x', 'y'}`` samples stored under ``user`` (a file-index key)."""
        return self.data[user]

    def get_global_testset(self) -> dict:
        """Return all pooled samples; only available when ``is_train`` is False.

        Raises:
            ValueError: if this manager was built with ``is_train=True``.
        """
        if self.is_train:
            raise ValueError("global test set is only collected when is_train=False")
        return self.global_test_data

class FEMNIST(Dataset):
    """Map-style torch Dataset over a ``{'x': ..., 'y': ...}`` dict of samples.

    Item ``idx`` is ``(float tensor of x[idx], long tensor of y[idx])``.
    """
    def __init__(self, data:dict):
        # Shallow-copy so the array conversion below does not mutate the caller's
        # dict (e.g. the lists a DataManager keeps for re-use).
        self.data = dict(data)
        self.data['x'] = np.array(data['x'])
        self.data['y'] = np.array(data['y'])

    def __getitem__(self, idx):
        # Samples are returned flat; reshape to (1, 28, 28) here if the model
        # expects image-shaped input.
        X = torch.tensor(self.data['x'][idx,:]).float()
        Y = torch.tensor(self.data['y'][idx]).long()
        return X, Y

    def __len__(self):
        return len(self.data['y'])

In [53]:
# Exploration: load every JSON file of the raw training split through DataManager
# (note: built with is_train=False, so the pooled global set is also collected).
PATH = '../leaf/data/femnist/data/train'
files = sorted(
    os.path.join(PATH, name) for name in os.listdir(PATH) if name.endswith('.json')
)

DM = DataManager(files, is_train=False)

In [49]:
np.asarray(DM.data['1']['x']).shape  # sample-matrix shape of file index '1'

(351, 784)

In [66]:
np.asarray(DM.data['0']['x']).shape  # sample-matrix shape of file index '0'

(2754, 784)

In [68]:
dataset = FEMNIST(data=DM.data['0'])  # wrap file '0' samples as a torch Dataset

In [69]:
dataset[0]  # display the first (X, Y) tensor pair returned by FEMNIST.__getitem__

(tensor([1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
         1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
         1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
         1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
         1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
         1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
         1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
         1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
         1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
         1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
         1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
         1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
         1.0000, 1.0000, 1.0