In [1]:
from datetime import datetime
from typing import List
import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import RobustScaler
from sklearn.model_selection import train_test_split, cross_val_score, ShuffleSplit
import tensorflow as tf
from tensorflow import keras
from tensorflow_addons.metrics import RSquare
import matplotlib.pyplot as plt
import tensorflow_federated as tff

from umlaut_lte import write_result, get_data_csv, get_data_parquet, plot_loss

# folder = 'projects/fl_crowd/'
folder = ''

result_file = folder + 'results.txt'
input_data_file = folder + 'data/df_fl_excerpt.parquet'
#input_data_file = folder + 'data/lte.csv'
dnn_loss_fig_path = folder + 'dnn-loss-fig.png'
logdir = folder + 'tf-logs'  # für tensorboard logs

# to be evaluated (depends on data availability and time frame):
THRESHOLD_MIN_DAYS_PER_USER = 0
THRESHOLD_MIN_DATPOINTS_PER_DAY = 10
TEST_SPLIT_SIZE =0.2
VALIDATION_SPLIT_SIZE=0.2
DNN_BATCH_SIZE = 2

# Federated Learning
FEDERATED_TRAINING_ROUNDS = 50
FEDERATED_LR_CLIENTS = 0.8
FEDERATED_LR_SERVER = 3
BUFFER_SIZE = 100

2023-01-20 15:34:23.379481: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2023-01-20 15:34:23.379528: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
2023-01-20 15:34:23.410980: E tensorflow/stream_executor/cuda/cuda_blas.cc:2981] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2023-01-20 15:34:25.533270: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory
2023-01-20 15:34:25.533375: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: ca

In [2]:
def create_keras_model(input_size):
    """Creates a DNN keras model.

    :param input_size: The size of the input of the model
    :type input_size: int
    :return: The not yet compiled model
    :rtype: keras.Model
    """
    # The original model, the actual model below is created by Umlaut
    # model = keras.models.Sequential([
    #     keras.layers.InputLayer(input_shape=(input_size)),
    #     keras.layers.Dense(40, activation='relu', kernel_initializer='zeros'),
    #     keras.layers.Dense(10, activation='relu', kernel_initializer='zeros'),
    #     keras.layers.Dense(1, kernel_initializer='zeros'),
    # ])

    print(f'####### input-size {input_size}')
    model = keras.models.Sequential([
        keras.layers.InputLayer(input_shape=(input_size)),
        keras.layers.Dense(1),
    ])
    # print(model.summary())
    return model

In [3]:
def run_tf_dnn(X_train, X_test, y_train, y_test, results=result_file,
               validation_split_size=VALIDATION_SPLIT_SIZE, batch_size=DNN_BATCH_SIZE):
    """Creates and trains a model on the given data and writes the results to the given file.

    :param X_train: The data to train the model with
    :type X_train: pandas.DataFrame
    :param X_test: The data to test the model with
    :type X_test: pandas.DataFrame
    :param y_train: The labels of the training data
    :type y_train: pandas.DataFrame
    :param y_test: The labels of the data to test the model
    :type y_test: pandas.DataFrame
    :param results: Path to file where to save the results
    :type results: str, optional
    :param validation_split_size: The relative part of the data to be used for validation
    :type validation_split_size: float, optional
    :param batch_size: The size for each bach
    :type batch_size: int, optional
    """
    model = create_keras_model(X_train.shape[1])

    model.compile(
        optimizer="adam",
        loss="mse",
        metrics=['mean_absolute_error', RSquare()],
    )

    history = model.fit(X_train,
                        y_train,
                        batch_size=batch_size,
                        shuffle=True,
                        validation_split=validation_split_size,
                        epochs=200,
                        verbose=2)

    plot_loss(history)
    scores = model.evaluate(X_test, y_test)
    write_result('Testing tf DNN (mae_loss, mse, r_square):', result_file=results)
    write_result(str(scores), result_file=results)

In [4]:
def fed_model_fn():
    """A function for TFF to create a model during federated learning and return it is the correct type.

    :return: The LSTM model to be used as federated models
    :rtype: tff.learning.model
    """
    # We _must_ create a new model here, and _not_ capture it from an external
    # scope. TFF will call this within different graph contexts.
    keras_model = create_keras_model(X_train.shape[1])
    return tff.learning.from_keras_model(
        keras_model,
        input_spec=federated_training_data[0].element_spec,
        loss=tf.keras.losses.MeanAbsoluteError(),
        metrics=[
            tf.keras.metrics.MeanAbsoluteError()
        ])

In [5]:
def run_scikit_lin_reg(X_train, X_test, y_train, y_test, result="./results.txt", validation_split_size=0.2):
    """Creates and trains a model on the given data and writes the results to the given file

    :param X_train: The data to train the model with
    :type X_train: pandas.DataFrame
    :param X_test: The data to test the model with
    :type X_test: pandas.DataFrame
    :param y_train: The labels of the training data
    :type y_train: pandas.DataFrame
    :param y_test: The labels of the data to test the model
    :type y_test: pandas.DataFrame
    :param results: Path to file where to save the results
    :type results: str, optional
    :param validation_split_size: The relative part of the data to be used for validation
    :type validation_split_size: float, optional
    :param batch_size: The size for each bach
    :type batch_size: int, optional
    """
    cv = ShuffleSplit(n_splits=5, test_size=validation_split_size, random_state=0)
    clf_lr = LinearRegression()
    scores = cross_val_score(clf_lr, X_train, y_train.values.ravel(), cv=cv)
    write_result("scikit LINEAR REGRESSION cross validation: %0.5f mean R^2 with a standard deviation of %0.5f" % (scores.mean(), scores.std()))
    clf_lr.fit(X_train, y_train.values.ravel())
    test_score = clf_lr.score(X_test, y_test.values.ravel())
    write_result(f'scikit LINEAR REGRESSION test score: {test_score}', result_file=result)


In [13]:
def run_dnn_federated(federated_training_data, X_test, y_test, fed_train_rounds=FEDERATED_TRAINING_ROUNDS,
                      result_file=result_file, logdir=logdir):
    """Create and train a federated DNN model on the given data and writes the result to the result file

    :param federated_training_data: The data in a federated format to be trained on
    :type federated_training_data: typing.List
    :param X_test: The data to test the resulting model with
    :type X_test: pandas.DataFrame
    :param y_test: The labels to check the test data
    :type y_test: pandas.DataFrame
    :param result: The file to write the results to
    :type result: str, optional
    :param logdir: The directory to save the logfiles to
    :type logdir: str, optional
    """
    logfile = f'{logdir}/{datetime.now()}'  # für tensorboard log-Dateien

    # see if TFF works:
    # tff.federated_computation(lambda: 'Initialized!')()
    iterative_process = tff.learning.algorithms.build_weighted_fed_avg(
        fed_model_fn,
        client_optimizer_fn=lambda: tf.keras.optimizers.Adam(),
        server_optimizer_fn=lambda: tf.keras.optimizers.Adam(learning_rate=0.01))

    write_result(iterative_process.initialize.type_signature.formatted_representation(), result_file=result_file)
    print("Test0")
    state = iterative_process.initialize()

    print("Test1")
    summary_writer = tf.summary.create_file_writer(logfile)
    with summary_writer.as_default():
        for round_num in range(1, fed_train_rounds):
            result = iterative_process.next(state, federated_training_data)
            state = result.state
            metrics = result.metrics
            for name, value in metrics['client_work']['train'].items():
                tf.summary.scalar(name, value, step=round_num)

    write_result(f'FINISHED federated training, logfile: {logfile}', result_file=result_file)

    # Test resulting model
    model = create_keras_model(X_test.shape[1])
    model_weights = iterative_process.get_model_weights(state)
    model_weights.assign_weights_to(model)
    model.compile(
        loss=tf.losses.mse,
        optimizer='adam',
        metrics=['mean_absolute_error', RSquare()]
    )
    scores = model.evaluate(X_test, y_test)
    write_result('Testing tf FEDERATED DNN (mae_loss, mse, r_square):', result_file=result_file)
    write_result(str(scores), result_file=result_file)


In [7]:
df = pd.read_parquet(input_data_file)
df.keys()

Index(['Id', 'Calendar_week', 'Weekday', 'Month', 'Weekend', 'Date',
       'Count_data_points', 'Velocity_mean', 'Rsrp_mean', 'Rsrq_mean',
       'Rssnr_mean', 'Rssi_mean', 'Rsrp_variance', 'Rsrq_variance',
       'Rssnr_variance', 'Rssi_variance', 'Brand', 'Wifi_data_points_share',
       'Radius_activity_user', 'Radius_activity_state'],
      dtype='object')

In [8]:
# Load the data
write_result('Loading data...')
X_train, X_test, y_train, y_test = get_data_parquet(input_data_file=input_data_file,
                                                    threshold_min_days_per_user=THRESHOLD_MIN_DAYS_PER_USER,
                                                    test_split_size=TEST_SPLIT_SIZE,
                                                    threshold_min_datpoints_per_day=THRESHOLD_MIN_DATPOINTS_PER_DAY)
X_train.Weekend = X_train.Weekend.astype("float")
X_test.Weekend = X_test.Weekend.astype("float")

Loading data...
running with threshold 0,data point threshold 10,and test split 0.2...
target variable: Radius_activity_user
Ignoring velocity? True
before filtering data: 50 rows in data set.
Index(['Id', 'Calendar_week', 'Weekday', 'Month', 'Weekend', 'Date',
       'Count_data_points', 'Velocity_mean', 'Rsrp_mean', 'Rsrq_mean',
       'Rssnr_mean', 'Rssi_mean', 'Rsrp_variance', 'Rsrq_variance',
       'Rssnr_variance', 'Rssi_variance', 'Brand', 'Wifi_data_points_share',
       'Radius_activity_user', 'Radius_activity_state'],
      dtype='object')
after filtering data: 36 rows in data set.


In [9]:
# This needs to be filled with one dataset per client
# e.g. a list of tf.data.Dataset.from_tensor_slices((tf.convert_to_tensor(X_train), tf.convert_to_tensor(y_train)))
# important: these data sets need to be batched, e.g.
# ds.shuffle(buffer_size=BUFFER_SIZE, seed=1).batch(DNN_BATCH_SIZE)
federated_training_data: List[tf.data.Dataset] = []
for brand in X_train.Brand.unique():
    df_X = X_train[X_train.Brand == brand].drop(columns=["Brand"])
    df_y = y_train[X_train.Brand == brand]
    df = tf.data.Dataset.from_tensor_slices((tf.convert_to_tensor(df_X), tf.convert_to_tensor(df_y)))
    federated_training_data.append(df.shuffle(buffer_size=BUFFER_SIZE, seed=1).batch(DNN_BATCH_SIZE))

2023-01-20 15:34:45.549850: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2023-01-20 15:34:45.549881: W tensorflow/stream_executor/cuda/cuda_driver.cc:263] failed call to cuInit: UNKNOWN ERROR (303)
2023-01-20 15:34:45.549902: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (svsram): /proc/driver/nvidia/version does not exist


In [10]:
X_train.drop(columns=["Brand"], inplace=True)
X_test.drop(columns=["Brand"], inplace=True)

In [11]:
import tensorflow_datasets as tfa
for dff in federated_training_data:
    dffa = tfa.as_numpy(dff.take(1))
    i = 0
    for el in dffa:
        print(el[0].shape)
    #print(i)

(2, 14)
(2, 14)
(2, 14)


In [14]:
write_result('Starting FEDERATED DNN...')
run_dnn_federated(federated_training_data, X_test, y_test)
write_result('DONE (FEDERATED DNN)')

write_result('DONE\n')

Starting FEDERATED DNN...
####### input-size 14
####### input-size 14
####### input-size 14
( -> <
  global_model_weights=<
    trainable=<
      float32[14,1],
      float32[1]
    >,
    non_trainable=<>
  >,
  distributor=<>,
  client_work=<>,
  aggregator=<
    value_sum_process=<>,
    weight_sum_process=<>
  >,
  finalizer=<
    int64,
    float32[14,1],
    float32[1],
    float32[14,1],
    float32[1]
  >
>@SERVER)
Test0
Test1
FINISHED federated training, logfile: tf-logs/2023-01-20 15:37:07.787194
####### input-size 14
Testing tf FEDERATED DNN (mae_loss, mse, r_square):
[223910080.0, 9387.7734375, -0.64928138256073]
DONE (FEDERATED DNN)
DONE

