In [12]:
import pandas as pd
import pathlib
import numpy as np
import flwr as fl
from flwr.common import NDArrays, Scalar, Metrics
from typing import Dict, Tuple, List
from sklearn.metrics import classification_report
from pprint import pprint
from minisom import MiniSom

SOM_DIM = 20
SOM_NUM_ITER = 100
SUBJECTS_NUMBER = 30
FED_ROUNDS = 5

prefix_train = pathlib.Path("UCI HAR Dataset split/train")
prefix_test = pathlib.Path("UCI HAR Dataset split/test")

In [13]:
def som_predict(som, Xtrain, ytrain, Xtest, ytest):
    # Test the SOM

    print( Xtrain.shape, ytrain.shape, Xtest.shape, ytest.shape)
    
    # Find the association between neurons and class
    winmap = som.labels_map(Xtrain , ytrain.flatten()) 
    default_class = np.sum( list (winmap.values())).most_common()[0][0] # The most common class in the dataset (when we are undecided)

    # Test
    prediction = []
    for test_sample in Xtest:
        win_position = som.winner(test_sample)
        if win_position in winmap:
            prediction.append( winmap [ win_position ].most_common()[0][0])
        else:
            prediction.append(default_class) #FIXME take the neighboring

    report = classification_report(
            ytest.flatten(),
            prediction,
            zero_division=0.0,
            output_dict=True,
    )

    # Eval Accuracy
    pprint ( report )

    return {"accuracy": float(report["accuracy"])}

In [14]:
class SomClient(fl.client.NumPyClient):
    def __init__(self, som, Xtrain, ytrain, Xtest, ytest, train_iter, cid):
        self.som = som
        self.Xtrain = Xtrain
        self.ytrain = ytrain.flatten()
        self.train_iter = train_iter
        self.Xtest = Xtest
        self.ytest = ytest.flatten()
        self.cid = cid

    # Return the current local model parameters
    def get_parameters(self, config) -> NDArrays:
        return [self.som.get_weights()]
    
    def set_parameters(self, parameters):
        self.som._weights = parameters
    
    # Receive model parameters from the server, 
    # train the model parameters on the local data, 
    # and return the (updated) model parameters to the server
    def fit(self, parameters, config):
        self.set_parameters(parameters)
        self.som.train_random(self.Xtrain, self.train_iter, verbose=False)
        return self.get_parameters(), len(self.Xtrain), {}
    
    # Receive model parameters from the server,
    # evaluate the model parameters on the local data, 
    # and return the evaluation result to the server
    def evaluate(self, parameters, config) -> Tuple[float, int, Dict[str, Scalar]]:
        self.set_parameters(parameters)

        return float(0), len(self.Xtest), som_predict(self.som, self.Xtrain, self.ytrain, self.Xtest, self.ytest)

In [15]:
def load_df(glob : str, path : pathlib.Path, subject = -1, avoid = -1) -> pd.DataFrame: # idx must be a int TODO
    _df = pd.DataFrame()
    for idx, fn in enumerate(path.glob(f'**/{glob}.csv')):
            if subject in (-1, idx) and idx != avoid: 
                _df = pd.concat([_df, pd.read_csv(fn, sep=' ',header = None)], ignore_index = True)
    return _df

In [16]:
train_x_df = load_df('Xtrain', prefix_train, subject = -1, avoid = 23).to_numpy()
train_y_df = load_df('ytrain', prefix_train, subject = -1, avoid = 23).to_numpy()

test_x_df = load_df('Xtest', prefix_test, subject = 23).to_numpy()
test_y_df = load_df('ytest', prefix_test, subject = 23).to_numpy()

In [17]:
def weighted_simple_average(metrics: List[Tuple[int, Metrics]]) -> Metrics:
    # Multiply accuracy of each client by number of examples used
    w_accuracies = [num_examples * m["accuracy"] for num_examples, m in metrics]
    examples = [num_examples for num_examples, _ in metrics]
    s_accuracies = [m["accuracy"] for _, m in metrics]
    clients_num = len(metrics)
    # Aggregate and return custom metric (weighted average)
    return {"w_accuracy": sum(w_accuracies) / sum(examples), "s_accuracy": sum(s_accuracies)/clients_num}

def simple_average(metrics: List[Tuple[int, Metrics]]) -> Metrics:
    s_accuracies = [m["accuracy"] for _, m in metrics]
    clients_num = len(metrics)
    # Aggregate and return custom metric (weighted average)
    return {"accuracy": sum(s_accuracies)/clients_num}

In [18]:
def spawn_client(cid) -> SomClient:

    # prendo il dataset corrispondente al cid(client id)
    _Xtrain = load_df('Xtrain', prefix_train, subject = cid).to_numpy()
    _ytrain = load_df('ytrain', prefix_train, subject = cid).to_numpy()

    _Xtest = load_df('Xtest', prefix_test, subject = cid).to_numpy()
    _ytest = load_df('ytest', prefix_test, subject = cid).to_numpy()

    som = MiniSom(
            SOM_DIM,
            SOM_DIM,
            _Xtrain.shape[1],
            sigma=5,
            learning_rate=0.1,
            neighborhood_function="gaussian",
            activation_distance="manhattan",
        )

    return SomClient(som, _Xtrain, _ytrain, _Xtest, _ytest, SOM_NUM_ITER, cid)

In [19]:
def train_federated(num_rounds):

   strategy = fl.server.strategy.FedAvg(
       fraction_fit=1.0,
       fraction_evaluate=1.0,
       min_fit_clients=int(SUBJECTS_NUMBER),
       min_evaluate_clients=int(SUBJECTS_NUMBER),
       min_available_clients=int(SUBJECTS_NUMBER),
       evaluate_metrics_aggregation_fn=simple_average,
   )
   client_resources = None
   hist = fl.simulation.start_simulation(
       client_fn = spawn_client,
       num_clients = int(SUBJECTS_NUMBER),
       config = fl.server.ServerConfig(num_rounds=num_rounds),
       strategy = strategy,
       client_resources = client_resources,
   )
   #    ray_init_args = {"num_cpus": 2, "num_gpus":0.0}
   return hist.metrics_distributed

In [20]:
# Train the SOM


som = MiniSom(
        SOM_DIM,
        SOM_DIM,
        train_x_df.shape[1], #Number of Features
        sigma = 5,
        learning_rate = 0.1,
        neighborhood_function = "gaussian",
        activation_distance = "manhattan",
    )

som.random_weights_init(train_x_df)
som.train_random(train_x_df, SOM_NUM_ITER, verbose=False)  # random training

In [21]:
# Test the SOM
som_predict(som, train_x_df, train_y_df, test_x_df, test_y_df)

TypeError: 'tuple' object is not callable

In [None]:
accs_fed = train_federated(5)
accs_fed

INFO flwr 2023-11-07 17:34:00,332 | app.py:175 | Starting Flower simulation, config: ServerConfig(num_rounds=5, round_timeout=None)
2023-11-07 17:34:02,413	INFO worker.py:1673 -- Started a local Ray instance.
INFO flwr 2023-11-07 17:34:04,708 | app.py:210 | Flower VCE: Ray initialized with resources: {'CPU': 8.0, 'object_store_memory': 1797567283.0, 'memory': 3595134567.0, 'node:__internal_head__': 1.0, 'node:130.136.2.167': 1.0}
INFO flwr 2023-11-07 17:34:04,709 | app.py:218 | No `client_resources` specified. Using minimal resources for clients.
INFO flwr 2023-11-07 17:34:04,710 | app.py:224 | Flower VCE: Resources for each Virtual Client: {'num_cpus': 1, 'num_gpus': 0.0}
INFO flwr 2023-11-07 17:34:04,736 | app.py:270 | Flower VCE: Creating VirtualClientEngineActorPool with 8 actors
INFO flwr 2023-11-07 17:34:04,738 | server.py:89 | Initializing global parameters
INFO flwr 2023-11-07 17:34:04,740 | server.py:276 | Requesting initial parameters from one random client
INFO flwr 2023-11-

{}