In [1]:
# Copyright 2019-2020, ETH Zurich, Media Technology Center
#
# This file is part of Federated Learning Project at MTC.
#
# Federated Learning is a free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Federated Learning is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser Public License for more details.
#
# You should have received a copy of the GNU Lesser Public License
# along with Federated Learning.  If not, see <https://www.gnu.org/licenses/>.
"""
The Python implementation of the MTC Federated Learning Example Operator.

This is an example of how to use the Operator class.

Look at operator_class_db.py for all callable functions.

The example creates a model and runs 10 rounds of send_model_to_nodes->training_on_nodes->
send_back_to_server->aggregate_gradients_to_get_new_global_models

"""
import os

import sys
# Run from the repository root: if the current directory is not named
# "Federated-Learning", step two directory levels up. os.path.basename is used
# instead of splitting on '/', so the check does not depend on the path separator.
if os.path.basename(os.getcwd()) != "Federated-Learning":
    os.chdir('../..')
print(os.getcwd())
# Put the working directory on the import path for the project imports below.
sys.path.append(os.getcwd())
# NOTE(review): these variables are set before the project modules are imported,
# presumably because those modules read them - confirm before reordering.
os.environ['STATIC_VARIABLES_FILE_PATH'] = "globalserver/static_variables.json"

os.environ['PATH_TO_GLOBALSERVER'] = "globalserver/api/"
import tensorflow as tf
import xgboost as xgb
import pandas as pd
import logging
from globalserver.operator_.operator_class_db import Operator
from testing.test_class import Testing
from globalserver.operator_.utils import operator_utils
import json

# Reset Keras' global state so leftovers from an earlier run do not interfere.
tf.keras.backend.clear_session()
# Names of the participating clients; reused by the later cells.
clients = ['c0', 'c1']
# The log output recorded for this cell shows running servers being killed and
# the global server and the node servers being started.
TestSetup = Testing(clients, start_servers=True, clear_logs=True, clear_db=False, interface=False)



/home/schneebi/PycharmProjects/federated-learning/Federated-Learning


2020-05-13 13:18:22,402 [MainProcess ] [INFO ] [OPERATOR] [test_class.py / kill_servers / 194] KILLING SERVERS
2020-05-13 13:18:22,403 [MainProcess ] [INFO ] [OPERATOR] [test_class.py / kill_global_server / 156] KILLING GLOBAL SERVER
2020-05-13 13:18:22,404 [MainProcess ] [INFO ] [OPERATOR] [test_class.py / kill_client_interface_node / 163] KILLING CLIENT INTERFACE NODE
2020-05-13 13:18:22,404 [MainProcess ] [INFO ] [OPERATOR] [test_class.py / kill_client_interface / 173] KILLING CLIENT INTERFACE
2020-05-13 13:18:22,405 [MainProcess ] [INFO ] [OPERATOR] [test_class.py / kill_node_servers / 183] KILLINGCLIENTS
2020-05-13 13:18:25,553 [MainProcess ] [INFO ] [OPERATOR] [test_class.py / start_global_server / 337] STARTING UP GLOBAL SERVER
2020-05-13 13:18:25,569 [MainProcess ] [INFO ] [OPERATOR] [test_class.py / start_node_servers / 225] STARTING UP NODES


the commandline is ['/home/schneebi/PycharmProjects/federated-learning/venv/bin/python', 'api/globalserver_task_controller.py', 'C88A33B946']
the commandline is ['/home/schneebi/PycharmProjects/federated-learning/venv/bin/python', 'node_task_controller.py', 'c0', 'C88A33B946']
the commandline is ['/home/schneebi/PycharmProjects/federated-learning/venv/bin/python', 'node_task_controller.py', 'c1', 'C88A33B946']


In [2]:

def get_compiled_model_P2P(param_dict):
    """Build an XGBoost booster from the given parameter dictionary.

    Args:
        param_dict: Mapping with the entries ``'params'`` (forwarded as the
            booster parameters) and ``'example'`` (forwarded, wrapped in a
            one-element list, as the second positional argument).

    Returns:
        The newly created ``xgb.Booster``.
    """
    booster_params = param_dict['params']
    cached_matrices = [param_dict['example']]
    return xgb.Booster(booster_params, cached_matrices)

# Take the first record of datasets/test_c0.jsonl as a template sample: every
# value except the last is used as a feature, the last one as the label.
with open('datasets/test_c0.jsonl') as f:
    line = [*json.loads(f.readline()).values()]

# The same sample is used twice to form a two-row example matrix.
X_example = [line[:-1] for _ in range(2)]
y_example = [line[-1] for _ in range(2)]
example = xgb.DMatrix(X_example, label=y_example)

# Booster parameters plus the example matrix, in the shape expected by
# get_compiled_model_P2P.
param_dict = {
    'params': {
        'max_depth': 8,
        'subsample': 0.5,
        'eta': 0.01,
        'max_delta_step': 0,
        'scale_pos_weight': 1.5,
        'objective': 'binary:logitraw',
        'tree_method': 'hist',
        'max_bin': 250,
        'colsample_bytree': 1,
    },
    'example': example,
}


def preprocessing(batch):
    """Convert all columns of ``batch`` to numeric values, in place.

    The ``'label'`` column is first cast to ``int``. Afterwards every column
    (the label column included) is rewritten: entries equal to the string
    ``'(not_set)'`` become ``0`` and all other entries are cast to ``float``.
    The labels therefore end up as whole-number floats.

    Args:
        batch: Mapping from column name to a list of raw values. It must
            contain a ``'label'`` key.

    Returns:
        The same ``batch`` object, with its lists replaced.
    """
    batch['label'] = list(map(int, batch['label']))
    for column in batch:
        converted = []
        for entry in batch[column]:
            if entry == '(not_set)':
                converted.append(0)
            else:
                converted.append(float(entry))
        batch[column] = converted

    return batch



# Rounds are specified a bit differently here: the clients have to perform their
# tasks one after another, so the round is the concatenation of each client's
# task sequence, each followed by an aggregation step.
# NOTE: the name `round` shadows the builtin of the same name; it is kept
# because the experiment setup refers to it.
client_tasks = ("fetch_model", "train_model", "send_model", "send_validation_loss")
round = []
for client in clients:
    round += [(task, client) for task in client_tasks]
    round.append(("aggregate", 0))






In [5]:
# Complete experiment definition that is handed to the operator.
setup_dict = {
    # Model factory and the argument dictionary that belongs to it.
    "model_function": {
        "function": get_compiled_model_P2P,
        "parameters": param_dict,
    },
    "git_version": "e9339081b76ad3a89b1862bd38d8af26f0541f1c",
    "protocol": "P2P",
    "model_name": "test_model",
    "model_description": "this model is just to test the db",
    "testing": True,
    "training_config": {
        "verbosity": 1,
        "epochs": 1,
        "batch_size": 100,
        "nthread": -1,
        "client_steps_per_round": 1,
        "nr_clients": len(clients),
        "skmetrics": ["f1_score", "accuracy_score"],
        "tfmetrics": ["AUC", "Accuracy"],
        "dataset": "",
    },
    "rounds": 2,
    # Task list of one round, as built from the client list.
    "round": round,
    "final_round": [],
    "clients": clients,
    "experiment_name": "kkbox",
    "experiment_description": "desc if nice experiment",
    "preprocessing": {
        "noise": {
            "epsilon": 1,
            "delta": 0.2,
        },
        "cast_to_float": "",
        # Selects the columns named column5 ... column49.
        "feature_selection": [f"column{index}" for index in range(5, 50)],
        "preprocessing_function": preprocessing,
    },
}

In [6]:

# Operator is already imported in the first cell; this repeated import is
# redundant when the cells are run in order.
from globalserver.operator_.operator_class_db import Operator

# Create the operator and submit the experiment definition. The call is left as
# the last expression so that its return value is shown as the cell output; the
# recorded output is a tuple of an ObjectId and a dict with the keys
# 'aggregated_metric' and 'additional_metrics'.
operator = Operator()
operator.define_and_start_experiment(setup_dict)

2020-05-13 13:19:04,357 [MainProcess ] [DEBUG] [OPERATOR] [cmd.py / execute / 719] Popen(['git', 'cat-file', '--batch-check'], cwd=/home/schneebi/PycharmProjects/federated-learning/Federated-Learning, universal_newlines=False, shell=None, istream=<valid stream>)
2020-05-13 13:19:04,405 [MainProcess ] [INFO ] [OPERATOR] [operator_class_db.py / start_experiment / 171] 5ebbd7a871ead263b014c879
2020-05-13 13:19:04,407 [MainProcess ] [DEBUG] [OPERATOR] [operator_utils.py / get_current_task / 96] Working on {'task_id': ObjectId('5ebbd7a871ead263b014c87a'), 'task_order': 0, 'task_name': 'fetch_model', 'task_status': 'not_scheduled'}
2020-05-13 13:19:19,426 [MainProcess ] [DEBUG] [OPERATOR] [operator_utils.py / get_current_task / 96] Working on {'task_id': ObjectId('5ebbd7a871ead263b014c87b'), 'task_order': 1, 'task_name': 'train_model', 'task_status': 'scheduled'}


(ObjectId('5ebbd7a871ead263b014c879'),
 {'aggregated_metric': [], 'additional_metrics': []})