> Uh I guess I should import the code from clientbase and maybe from NB202 or the server to set up a few clients, load in their data, print it, compare it to doing it manually like in torch_linregr.py

In [1]:
import copy
import torch
import torch.nn as nn
import numpy as np
import random
import os
import torch.nn.functional as F
from torch.utils.data import DataLoader
from sklearn.preprocessing import label_binarize
from sklearn import metrics

from sklearn.decomposition import PCA

from flcore.pflniid_utils.data_utils import read_client_data
from utils.custom_loss_class import CPHSLoss
from utils.emg_dataset_class import *

import time
from flcore.pflniid_utils.privacy import *

from flcore.clients.clientbase import Client
from flcore.clients.clientavg import clientAVG
from flcore.servers.serverbase import Server
from flcore.servers.serveravg import FedAvg
from flcore.servers.serverlocal import Local

In [2]:
update_ix = [0,  1200,  2402,  3604,  4806,  6008,  7210,  8412,  9614, 10816, 12018, 13220, 14422, 15624, 16826, 18028, 19230, 20432, 20769]

In [3]:
import argparse
parser = argparse.ArgumentParser()

# general
parser.add_argument('-go', "--goal", type=str, default="test", 
                    help="The goal for this experiment")
parser.add_argument('-dev', "--device", type=str, default="cpu",  # KAI: Changed the default to cpu
                    choices=["cpu", "cuda"])
parser.add_argument('-did', "--device_id", type=str, default="0")
parser.add_argument('-data', "--dataset", type=str, default="cphs")  # KAI: Changed the default to cphs (from mnist)
#parser.add_argument('-nb', "--num_classes", type=int, default=10)  # Not doing classification...
parser.add_argument('-m', "--model", type=str, default="LinearRegression")  # KAI: Changed the default to Linear Regression
parser.add_argument('-lbs', "--batch_size", type=int, default=1200)  # Setting it to a full update would be 1300ish... how many batches does it run? In one epoch? Not even sure where that is set
# The 1300 and the batch size are 2 separate things...
# I want to restrict the given dataset to just the 1300, but then iterate in batches... or do I since we don't have that much data and can probably just use all the data at once? Make batch size match the update size? ...
parser.add_argument('-lr', "--local_learning_rate", type=float, default=0.005,
                    help="Local learning rate")
parser.add_argument('-ld', "--learning_rate_decay", type=bool, default=False)
parser.add_argument('-ldg', "--learning_rate_decay_gamma", type=float, default=0.99)
parser.add_argument('-gr', "--global_rounds", type=int, default=250)  # KAI: Switched to 250 down from 2000
parser.add_argument('-ls', "--local_epochs", type=int, default=1, 
                    help="Multiple update steps in one local epoch.")  # KAI: I think it was 1 originally.  I'm gonna keep it there.  Does this mean I can set batchsize to 1300 and cook?Is my setup capable or running multiple epochs? Implicitly I was doing 1 epoch before, using the full update data I believe...
parser.add_argument('-algo', "--algorithm", type=str, default="FedAvg")
parser.add_argument('-jr', "--join_ratio", type=float, default=0.2,
                    help="Ratio of clients per round")
parser.add_argument('-rjr', "--random_join_ratio", type=bool, default=False,
                    help="Random ratio of clients per round")
parser.add_argument('-nc', "--num_clients", type=int, default=14,
                    help="Total number of clients")
parser.add_argument('-dp', "--privacy", type=bool, default=False,
                    help="differential privacy")
parser.add_argument('-dps', "--dp_sigma", type=float, default=0.0)
parser.add_argument('-sfn', "--save_folder_name", type=str, default='items')

# SECTION: practical
parser.add_argument('-cdr', "--client_drop_rate", type=float, default=0.0,
                    help="Rate for clients that train but drop out")
parser.add_argument('-tsr', "--train_slow_rate", type=float, default=0.0,
                    help="The rate for slow clients when training locally")
parser.add_argument('-ssr', "--send_slow_rate", type=float, default=0.0,
                    help="The rate for slow clients when sending global model")
parser.add_argument('-ts', "--time_select", type=bool, default=False,
                    help="Whether to group and select clients at each round according to time cost")
parser.add_argument('-tth', "--time_threthold", type=float, default=10000,
                    help="The threthold for droping slow clients")

# SECTION: Kai's additional args
parser.add_argument('-pca_channels', "--pca_channels", type=int, default=64,
                    help="Number of principal components. 64 means do not use any PCA")
parser.add_argument('-lambdaF', "--lambdaF", type=float, default=0.0,
                    help="Penalty term for user EMG input (user effort)")
parser.add_argument('-lambdaD', "--lambdaD", type=float, default=1e-3,
                    help="Penalty term for the decoder norm (interface effort)")
parser.add_argument('-lambdaE', "--lambdaE", type=float, default=1e-4,
                    help="Penalty term on performance error norm")
parser.add_argument('-starting_update', "--starting_update", type=int, default=0,
                    help="Which update to start on (for CPHS Simulation). Use 0 or 10.")
parser.add_argument('-test_split_fraction', "--test_split_fraction", type=float, default=0.2,
                    help="Fraction of data to use for testing")
parser.add_argument('-device_channels', "--device_channels", type=int, default=64,
                    help="Number of recording channels with the used EMG device")
parser.add_argument('-dt', "--dt", type=float, default=1/60,
                    help="Delta time, amount of time (sec?) between measurements")
parser.add_argument('-normalize_emg', "--normalize_emg", type=bool, default=False,
                    help="Normalize the input EMG signals")
parser.add_argument('-normalize_V', "--normalize_V", type=bool, default=False,
                    help="Normalize the V term in the cost function")
parser.add_argument('-local_round_threshold', "--local_round_threshold", type=int, default=50,
                    help="Number of communication rounds per client until a client will advance to the next batch of streamed data")
parser.add_argument('-debug_mode', "--debug_mode", type=bool, default=False,
                    help="In debug mode, the code is run to minimize overhead time in order to debug as fast as possible.  Namely, the data is held at the server to decrease init time, and communication delays are ignored.")
parser.add_argument('-condition_number', "--condition_number", type=int, default=1,
                    help="Which condition number (trial) to train on")
parser.add_argument('-test_split_each_update', "--test_split_each_update", type=bool, default=False,
                    help="Implement train/test split within each update or on the entire dataset")
parser.add_argument('-verbose', "--verbose", type=bool, default=False,
                    help="Print out a bunch of extra stuff")
parser.add_argument('-slow_clients_bool', "--slow_clients_bool", type=bool, default=False,
                    help="Control whether or not to have ANY slow clients")
parser.add_argument('-return_cost_func_comps', "--return_cost_func_comps", type=bool, default=False, #True
                    help="Return Loss, Error, DTerm, FTerm from loss class")
parser.add_argument('-test_split_users', "--test_split_users", type=bool, default=False,
                    help="Split testing data by holding out some users (fraction held out determined by test_split_fraction)")
    
parser.add_argument('-t', "--times", type=int, default=1,
                    help="Running times")
parser.add_argument('-ab', "--auto_break", type=bool, default=False)
parser.add_argument('-dlg', "--dlg_eval", type=bool, default=False)
parser.add_argument('-dlgg', "--dlg_gap", type=int, default=100)
parser.add_argument('-bnpc', "--batch_num_per_client", type=int, default=2)  # Only used with DLG
parser.add_argument('-eg', "--eval_gap", type=int, default=1,
                    help="Rounds gap for evaluation")
parser.add_argument('-nnc', "--num_new_clients", type=int, default=0)

# This one for sure breaks it
#parser.add_argument('-fte', "--fine_tuning_epoch", type=int, default=0)

#args = parser.parse_args()
args = parser.parse_known_args()

args = args[0]
args.fine_tuning_epoch=0
dataset = 'cphs'

In [4]:
time_list = []
#reporter = MemReporter()
model_str = args.model

# Switched args.prev to 0 since it wasn't working
#for i in range(0, args.times):
print(f"\n============= Running time: {0}th =============")
print("Creating server and clients ...")
start = time.time()

# Generate args.model
args.model = torch.nn.Linear(args.pca_channels, 2)  #input_size, output_size

print(args.model)

# select algorithm
if args.algorithm == "FedAvg":
    server = FedAvg(args, 0)
elif args.algorithm == "Local":
    server = Local(args, 0)
else:
    raise NotImplementedError

#server.train()

#time_list.append(time.time()-start)
#print(f"\nAverage time cost: {round(np.average(time_list), 2)}s.")

server.selected_clients = server.clients
with torch.no_grad():
    # subscript global_model with [0] if it is sequential instead of linear model --> does that return just the first layer then?
    server.global_model.weight.fill_(0)

#for i in range(self.global_rounds+1):
if 0%server.eval_gap == 0:
    print(f"\n-------------Round number: {0}-------------")
    if 0!=0:
        print("\nEvaluate personalized models")
        server.evaluate()

        #print(f"len: {len(self.rs_train_loss[-1])}")
        if type(server.rs_train_loss[-1]) in [int, float]:
            print(f"rs_train_loss: {server.rs_train_loss[-1]}")
        else:
            print(f"len: {len(server.rs_train_loss[-1])}")
        print()

server.selected_clients = server.select_clients()
print(f"Selected client IDs: {[client.ID for client in server.selected_clients]}")


Creating server and clients ...
Linear(in_features=64, out_features=2, bias=True)
Serveravg init(): set_slow_clients()
Serveravg init(): set_clients()
SBSC: iter 0
SBSC: iter 1
SBSC: iter 2
SBSC: iter 3
SBSC: iter 4
SBSC: iter 5
SBSC: iter 6
SBSC: iter 7
SBSC: iter 8
SBSC: iter 9
SBSC: iter 10
SBSC: iter 11
SBSC: iter 12
SBSC: iter 13

Join ratio / total clients: 0.2 / 14
Finished creating server and clients.

-------------Round number: 0-------------
Selected client IDs: [9, 12]


In [5]:
#print("CLIENT TRAINING")
#for client in server.selected_clients:
#    client.train()
#    print(f"Client{client.ID} loss: {client.loss_log[-1]:0,.3f}")

my_client = server.selected_clients[0]
print(f"my_client: {my_client}")

my_client: <flcore.clients.clientavg.clientAVG object at 0x000001CB2BA90700>


In [6]:
#def train(self):
trainloader = my_client.load_train_data()
# self.model.to(self.device)
my_client.model.train()

# differential privacy
#if self.privacy:
#    self.model, self.optimizer, trainloader, privacy_engine = \
#        initialize_dp(self.model, self.optimizer, trainloader, self.dp_sigma)

start_time = time.time()

max_local_steps = my_client.local_epochs
#if self.train_slow:
#    max_local_steps = np.random.randint(1, max_local_steps // 2)

In [7]:
for i, (x, y) in enumerate(trainloader):
    print(f"Batch {i}: x has size {x.size()}; y has size {y.size()}")

Batch 0: x has size torch.Size([1200, 64]); y has size torch.Size([1200, 2])


In [None]:
# FROM CLIENTBASE.PY

'''
def simulate_data_streaming(self, dl, test_data=False):
    it = iter(dl)
    s0 = it.__next__()
    # self.max_training_update_upbound
    # I think this is wrong, implies it never advances the update? Didnt I handle that somewhere else tho...
    # Uhhh this is super wrong, should only be testing one update at a time???
    #s_temp = s0[0][0:self.update_ix[1],:]
    #p_reference = torch.transpose(s0[1][0:self.update_ix[1],:], 0, 1)
    lb = self.update_ix[self.current_update]
    ub = self.update_ix[self.current_update+1]
    s_temp = s0[0][lb:ub,:]
    p_reference = torch.transpose(s0[1][lb:ub,:], 0, 1)

    # First, normalize the entire s matrix
    if self.normalize_emg:
        s_normed = s_temp / torch.linalg.norm(s_temp, ord='fro')
        assert (torch.linalg.norm(s_normed, ord='fro')<1.2) and (torch.linalg.norm(s_normed, ord='fro')>0.8)
    else:
        s_normed = s_temp
    # Apply PCA if applicable
    if self.pca_channels!=self.device_channels:  # 64 is the number of channels present on the recording armband
        pca = PCA(n_components=self.pca_channels)
        s = torch.transpose(torch.tensor(pca.fit_transform(s_normed), dtype=torch.float32), 0, 1)
    else:
        s = torch.transpose(s_normed, 0, 1)

    self.F = s[:,:-1]
    v_actual =  torch.matmul(self.model.weight, s)
    p_actual = torch.cumsum(v_actual, dim=1)*self.dt  # Numerical integration of v_actual to get p_actual
    self.V = (p_reference - p_actual)*self.dt
    if self.normalize_V:
        self.V = self.V/torch.linalg.norm(self.V, ord='fro')
        assert (torch.linalg.norm(self.V, ord='fro')<1.2) and (torch.linalg.norm(self.V, ord='fro')>0.8)
    self.Y = p_reference[:, :-1]  # To match the input
'''
0

#my_client.simulate_data_streaming(trainloader)
it = iter(dl)
s0 = it.__next__()
# self.max_training_update_upbound
# I think this is wrong, implies it never advances the update? Didnt I handle that somewhere else tho...
# Uhhh this is super wrong, should only be testing one update at a time???
#s_temp = s0[0][0:self.update_ix[1],:]
#p_reference = torch.transpose(s0[1][0:self.update_ix[1],:], 0, 1)
lb = update_ix[10]
ub = update_ix[11]
s_temp = s0[0][lb:ub,:]
p_reference = torch.transpose(s0[1][lb:ub,:], 0, 1)

# First, normalize the entire s matrix
s_normed = s_temp
# Apply PCA if applicable
s = torch.transpose(s_normed, 0, 1)

F = s[:,:-1]
# Hmmm I think this line is doing my output=model(x)...
v_actual =  torch.matmul(my_client.model.weight, s)
p_actual = torch.cumsum(v_actual, dim=1)/60  # Numerical integration of v_actual to get p_actual
V = (p_reference - p_actual)/60
y = p_reference[:, :-1]  # To match the input

In [None]:
#my_client.simulate_data_streaming(trainloader)
it = iter(dl)
s0 = it.__next__()
# self.max_training_update_upbound
# I think this is wrong, implies it never advances the update? Didnt I handle that somewhere else tho...
# Uhhh this is super wrong, should only be testing one update at a time???
#s_temp = s0[0][0:self.update_ix[1],:]
#p_reference = torch.transpose(s0[1][0:self.update_ix[1],:], 0, 1)
lb = update_ix[10]
ub = update_ix[11]
s_temp = s0[0][lb:ub,:]
p_reference = torch.transpose(s0[1][lb:ub,:], 0, 1)

# First, normalize the entire s matrix
s_normed = s_temp
# Apply PCA if applicable
s = torch.transpose(s_normed, 0, 1)

F = s[:,:-1]
# Hmmm I think this line is doing my output=model(x)...
v_actual =  my_client.model(torch.transpose(F, 0, 1))
p_actual = torch.cumsum(v_actual, dim=1)/60  # Numerical integration of v_actual to get p_actual
V = (p_reference - p_actual)/60
y = p_reference[:, :-1]  # To match the input

In [None]:
loss_func = CPHSLoss2(lambdaF=my_client.lambdaF, lambdaD=my_client.lambdaD, lambdaE=my_client.lambdaE)
if v_actual.shape[0]!=y.shape[0]:
    ty_pred = torch.transpose(y_pred, 0, 1)
else:
    ty_pred = y_pred
t2 = lambdasFDE[1]*(torch.linalg.matrix_norm((my_client.model.weight))**2)
t3 = lambdasFDE[0]*(torch.linalg.matrix_norm((F))**2)
loss = loss_func(ty_pred, y) + t2 + t3
print(f"t2: {}")
print(f"t3: {}")
print(f"{}")