In [2]:
import collections
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler, MinMaxScaler
import sys
import warnings
import argparse
import math
import numpy as np
import pandas as pd
import dp_accounting
import tensorflow as tf
import tensorflow_federated as tff
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import EarlyStopping, TensorBoard
import nest_asyncio
# Patch asyncio to allow re-entrant (nested) event loops; presumably needed so the TFF
# computations below can run inside Jupyter's already-running event loop.
nest_asyncio.apply()

In [None]:
# Input CSVs for the train and test splits (relative to the notebook's working directory).
train_path = "train.csv"
trian_path = train_path  # backward-compatible alias for the original misspelled name
test_path = "test.csv"

# Column holding the series to forecast (per its name: lane 1 flow, vehicles per 5 minutes).
attr = 'Lane 1 Flow (Veh/5 Minutes)'
# Number of past time steps per input sequence; each sample window holds `lags` inputs
# followed by one target value.
lags = 12
# NOTE(review): fillna(0) treats missing readings as zero flow — confirm this is intended
# rather than e.g. interpolating.
df_train = pd.read_csv(train_path, encoding='utf-8').fillna(0)
df_test = pd.read_csv(test_path, encoding='utf-8').fillna(0)


In [None]:
# Fit the scaler on the training split only, then apply the same transform to the test
# split, so no test statistics leak into preprocessing.
scaler = MinMaxScaler(feature_range=(0, 1)).fit(df_train[attr].values.reshape(-1, 1))
flow1 = scaler.transform(df_train[attr].values.reshape(-1, 1)).ravel()
flow2 = scaler.transform(df_test[attr].values.reshape(-1, 1)).ravel()


def _make_windows(series, n_lags):
    """Return every contiguous window of ``n_lags + 1`` values from a 1-D series.

    Row ``k`` equals ``series[k : k + n_lags + 1]``: the first ``n_lags`` values are the
    model input and the last one is the one-step-ahead target.

    Returns:
        float32 array of shape ``(max(len(series) - n_lags, 0), n_lags + 1, 1)``. A series
        too short to form a window yields an empty array that still has the expected
        trailing dimensions, so the input/target slicing below stays well-defined.
    """
    window = n_lags + 1
    n_windows = max(len(series) - n_lags, 0)
    # Index matrix: row k holds the positions k, k+1, ..., k+n_lags.
    index = np.arange(n_windows)[:, None] + np.arange(window)[None, :]
    return np.asarray(series, dtype=np.float32)[index][..., None]


train = _make_windows(flow1, lags)
test = _make_windows(flow2, lags)

# Shuffle training windows along the first axis before they are sharded across edge
# nodes; the fixed seed makes the shard contents reproducible across runs.
SHUFFLE_SEED = 42
np.random.default_rng(SHUFFLE_SEED).shuffle(train)

# Inputs: the first `lags` steps of each window; target: the final step.
X_train = train[:, :-1]
y_train = train[:, -1]
X_test = test[:, :-1]
y_test = test[:, -1]

In [None]:
# Expected (num_windows, lags, 1): one input sequence of `lags` steps per row.
np.shape(X_train)

In [None]:
# Expected (num_windows, 1): the one-step-ahead target for each input window.
np.shape(y_train)

In [None]:
# Federated-learning configuration.
EDGE_NUM, LOCAL_EPOCHS, BATCH_SIZE, GLOBAL_EPOCHS = (
    50,  # number of simulated edge nodes (clients) the training data is split across
    10,  # passes each edge node makes over its shard per round (via Dataset.repeat)
    32,  # batch size for client training and evaluation datasets
    5,   # number of federated training rounds
)

In [None]:
#分发数据
train_data, test_data, val_data = [], [], []
for edge_ids in range(EDGE_NUM):
    data_length = X_train.shape[0] // EDGE_NUM #边缘节点数据长度
    temp_data = X_train[data_length*edge_ids:data_length*edge_ids+data_length]
    tf.expand_dims(temp_data,axis=-1)
    temp_label = y_train[data_length*edge_ids:data_length*edge_ids+data_length]
    temp_dataset = tf.data.Dataset.from_tensor_slices((temp_data, temp_label)).repeat(LOCAL_EPOCHS).batch(BATCH_SIZE)
    train_data.append(temp_dataset)
temp_dataset = tf.data.Dataset.from_tensor_slices((X_test, y_test))
val_data.append(temp_dataset.batch(BATCH_SIZE))
temp_dataset = tf.data.Dataset.from_tensor_slices((X_test, y_test))
test_data.append(temp_dataset.batch(BATCH_SIZE))

In [None]:
# Display the first edge node's training dataset as a quick sanity check of its structure.
train_data[0]

In [None]:
def input_spec(window=None):
    """Return the (features, label) TensorSpec pair describing one client batch.

    Args:
        window: number of past time steps per input sequence. Defaults to the
            notebook's ``lags`` setting, read at call time, so the spec stays in sync
            with the windows built during preprocessing.

    Returns:
        Tuple ``(x_spec, y_spec)`` of float32 specs with shapes ``[None, window, 1]``
        and ``[None, 1]``; the batch dimension is left unspecified.
    """
    if window is None:
        window = lags
    return (
        tf.TensorSpec([None, window, 1], tf.float32),
        tf.TensorSpec([None, 1], tf.float32),
    )


def model_fn(window=None):
    """Build a fresh stacked-LSTM regressor wrapped as a TFF model.

    This is passed as a zero-argument callable to the TFF builders; a new Keras model
    is constructed on every call rather than reused.

    Args:
        window: input sequence length; defaults to ``lags`` (read at call time).

    Returns:
        The result of ``tff.learning.from_keras_model`` with MSE loss and an RMSE
        metric named ``'rmse'``.
    """
    if window is None:
        window = lags
    model = tf.keras.models.Sequential([
        tf.keras.layers.LSTM(64, input_shape=(window, 1), return_sequences=True),
        tf.keras.layers.LSTM(64),
        tf.keras.layers.Dropout(0.2),
        # Sigmoid bounds predictions to (0, 1), matching the MinMax-scaled training
        # targets. NOTE(review): scaled test values outside [0, 1] cannot be predicted.
        tf.keras.layers.Dense(1, activation='sigmoid'),
    ])

    return tff.learning.from_keras_model(
        model,
        input_spec=input_spec(window),
        loss=tf.keras.losses.MeanSquaredError(),
        metrics=[tf.keras.metrics.RootMeanSquaredError(name='rmse')])

In [None]:
# Federated evaluation computation for models built by model_fn. It is called below as
# evaluator(model_weights, client_datasets), and its result is read for 'loss' and 'rmse'.
# NOTE(review): newer TFF releases deprecate this builder — confirm against the pinned
# TFF version before upgrading.
evaluator = tff.learning.build_federated_evaluation(model_fn)

In [None]:
# Federated Averaging: each round, every edge node trains locally with Adam on its
# shard. The server then applies the aggregated client update with
# SGD(learning_rate=1), i.e. it applies the averaged model delta in full.
trainer = tff.learning.build_federated_averaging_process(
    model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.Adam(),
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1),
)

state = trainer.initialize()
train_hist = []   # raw per-round metrics returned by trainer.next
losses = []       # per-round training loss
accs = []         # per-round training RMSE (holds RMSE despite the name)

val_losses = []   # per-round loss on val_data
val_accs = []     # per-round RMSE on val_data (holds RMSE despite the name)

# TODO: communication cost (broadcast / aggregated bits) is not measured; add a sizing
# environment here if that needs to be reported.
for i in range(GLOBAL_EPOCHS):
    # One federated round over every edge node's dataset.
    state, metrics = trainer.next(state, train_data)
    train_metrics = metrics['train']
    train_hist.append(metrics)
    losses.append(train_metrics['loss'])
    accs.append(train_metrics['rmse'])

    val_metrics = evaluator(state.model, val_data)
    val_losses.append(val_metrics['loss'])
    val_accs.append(val_metrics['rmse'])

    print(f"\rRun {i+1}/{GLOBAL_EPOCHS} _ loss={train_metrics['loss']} _ "
          f"RMSE={train_metrics['rmse']}_valLoss={val_metrics['loss']}_"
          f"valRmse={val_metrics['rmse']}_")