In [2]:
#!pip install --upgrade tensorflow_federated_nightly
#!pip install --upgrade nest_asyncio

import nest_asyncio
nest_asyncio.apply()

In [3]:
import collections
import functools
import os
import time

import numpy as np
import tensorflow as tf
import tensorflow_federated as tff
import pandas as pd

np.random.seed(0)

from tensorflow import feature_column
from tensorflow.keras import layers
from sklearn.model_selection import train_test_split

import os
import time
import sys

from tqdm import tqdm

# Test the TFF is working:
tff.federated_computation(lambda: 'Hello, World!')()

b'Hello, World!'

# Dataset

In [4]:
# read the dataset from Drive
df = pd.read_csv("./ma_results/trips_with_zones_final.csv")
df = df.head(10000000)
df.head()

Unnamed: 0,medallion,pickup_week_day,pickup_hour,pickup_day,pickup_month,dropoff_week_day,dropoff_hour,dropoff_day,dropoff_month,pickup_location_id,dropoff_location_id
0,00005007A9F30E289E760362F69E4EAD,1,0,1,1,1,0,1,1,162.0,262.0
1,00005007A9F30E289E760362F69E4EAD,1,0,1,1,1,0,1,1,262.0,239.0
2,00005007A9F30E289E760362F69E4EAD,1,0,1,1,1,1,1,1,239.0,236.0
3,00005007A9F30E289E760362F69E4EAD,1,1,1,1,1,1,1,1,236.0,41.0
4,00005007A9F30E289E760362F69E4EAD,1,1,1,1,1,1,1,1,41.0,211.0


In [5]:
# Check dtypes of the attributes
df.dtypes

medallion               object
pickup_week_day          int64
pickup_hour              int64
pickup_day               int64
pickup_month             int64
dropoff_week_day         int64
dropoff_hour             int64
dropoff_day              int64
dropoff_month            int64
pickup_location_id     float64
dropoff_location_id    float64
dtype: object

In [6]:
# Cast the columns type to int32
dictionary = {'pickup_week_day': 'int32', 'pickup_hour': 'int32', 'pickup_day': 'int32', 'pickup_month': 'int32', 'dropoff_week_day': 'int32', 'dropoff_hour': 'int32', 'dropoff_day': 'int32', 'dropoff_month': 'int32', 'pickup_location_id':'int32', 'dropoff_location_id':'int32'}
df = df.astype(dictionary, copy=True)
df.dtypes

medallion              object
pickup_week_day         int32
pickup_hour             int32
pickup_day              int32
pickup_month            int32
dropoff_week_day        int32
dropoff_hour            int32
dropoff_day             int32
dropoff_month           int32
pickup_location_id      int32
dropoff_location_id     int32
dtype: object

In [7]:
df.medallion.value_counts().loc[df.medallion.value_counts().index[100]]

1765

Because there are too many taxis (over 9000) it is better to take the 100 taxi with the major number of records

In [8]:
# Pick taxis with at least 1000 records
count = df.medallion.value_counts()

medallions = count.loc[count.index[:100]].index # count >= 1000
test_medallions = count.loc[count.index[100:105]].index
val_medallions = count.loc[count.index[105:110]].index

df_test = df.loc[df.medallion.isin(test_medallions)].copy()
df_val = df.loc[df.medallion.isin(val_medallions)].copy()
df = df.loc[df.medallion.isin(medallions)]

We can use the other taxis to create a local test and validation sets

In [9]:
# function to remove duplicates
def create_sequence(locations): 
  # Flatten the list of places
  sequence = np.reshape(locations.values, [-1])

  # Create a temporary array of the same lenght of the sequece of locations
  copy = np.zeros(sequence.shape[0], dtype=np.int32)

  # Copy the sequence of location in the copy array but shifted right by 1 position
  # The last location does not need to be copied, it can't be a duplicate
  copy[1:] = sequence[:sequence.shape[0]-1]

  # Where we get 0 it can be a possible duplicated
  duplicated = sequence - copy

  # indices where the subtraction gives 0
  idx = np.where(duplicated == 0)[0]

  # Find where the position of the zeros are even
  even = idx%2 == 0

  # List the indices where the position is even and the subtraction gave 0
  to_drop = idx[even]

  # Remove the duplicates
  clean_sequence = np.delete(sequence, to_drop)
  return clean_sequence, to_drop

Now we need to create the location sequence for each user

In [10]:
def df_to_location_sequence(df):
  
  # take just the columns we need
  locations = df[['pickup_location_id','dropoff_location_id']].copy()
  locations = locations.astype('int32')


  # define the indices to keep trace of the locations
  x = np.arange(0, locations.values.shape[0])

  pos = np.array([x,x]).T
  pos = np.reshape(pos, [-1])

  # Represent whether the location is a pickup or a dropoff
  pick = np.zeros(locations.values.shape[0], dtype=int)
  drop = np.ones(locations.values.shape[0], dtype=int)

  loc = np.array([pick,drop]).T
  loc = np.reshape(loc, [-1])

  # Generate the sequence of places
  sequence, duplicates = create_sequence(locations)

  # We use now the indices of the duplicated locations to clean also the array of rows and the array of location types
  pos = np.delete(pos, duplicates)
  loc = np.delete(loc, duplicates)

  # Select the indices of records we want the pickup location
  pick_pos = pos[pos[loc == 0]]

  # Select the indices of records we want the dropoff location
  drop_pos = pos[pos[loc == 1]]

  
  records_pick = df.iloc[pick_pos][['medallion', 'pickup_location_id', 'pickup_week_day',	'pickup_hour',	'pickup_day',	'pickup_month']]
  records_pick = records_pick.rename(columns={'pickup_location_id': 'location_id', 'pickup_week_day':'week_day' ,	'pickup_hour':'hour' ,	'pickup_day':	'day' ,	'pickup_month':'month' })

  idx_drop = np.nonzero(loc == 0)[0]
  records_drop = df.iloc[drop_pos][['medallion', 'dropoff_location_id', 'dropoff_week_day',	'dropoff_hour',	'dropoff_day',	'dropoff_month']]
  records_drop = records_drop.rename(columns={'dropoff_location_id': 'location_id', 'dropoff_week_day':'week_day' ,	'dropoff_hour':'hour' ,	'dropoff_day':	'day' ,	'dropoff_month':'month' })

  locations_sequence = pd.concat([records_pick, records_drop])

  # reset the index
  locations_sequence.reset_index(inplace=True)

  # From hour to sin-cos representation
  locations_sequence['hour_sin'] = np.sin(locations_sequence.hour*(2.*np.pi/24))
  locations_sequence['hour_cos'] = np.cos(locations_sequence.hour*(2.*np.pi/24))

  locations_sequence['week_day_sin'] = np.sin(locations_sequence.week_day*(2.*np.pi/7))
  locations_sequence['week_day_cos'] = np.cos(locations_sequence.week_day*(2.*np.pi/7))


  # Drop the original column
  locations_sequence.drop(['hour'], axis=1, inplace=True)
  

  # Helper function to encode the day_type
  def is_weekend(days):
    weekends = np.zeros(len(days))
    weekends[((days == 5) | (days == 6))] = 1
    return weekends

  # Apply the helper function to all the records
  locations_sequence['weekend'] = is_weekend(locations_sequence['week_day'])

  # the column is not needed anymore
  locations_sequence.drop(['week_day'], axis=1, inplace=True)

  # Correct the weekend feature type
  dictionary = {'weekend': 'int32'}
  locations_sequence = locations_sequence.astype(dictionary, copy=True)
  
  return locations_sequence, pos, loc

# Call the function
locations_sequence, pos, loc = df_to_location_sequence(df)

print(locations_sequence)

          index                         medallion  location_id  day  month  \
0         18752  003EEA559FA61800874D4F6805C4A084          170    1      1   
1         18752  003EEA559FA61800874D4F6805C4A084          170    1      1   
2         18755  003EEA559FA61800874D4F6805C4A084          141    1      1   
3         18757  003EEA559FA61800874D4F6805C4A084          239    1      1   
4         18758  003EEA559FA61800874D4F6805C4A084           41    1      1   
...         ...                               ...          ...  ...    ...   
249801  7268314  7C7F7C78F1ECB5625E1F611E711DB449          233   24      1   
249802  7268315  7C7F7C78F1ECB5625E1F611E711DB449           79   24      1   
249803  7268315  7C7F7C78F1ECB5625E1F611E711DB449           79   24      1   
249804  7268316  7C7F7C78F1ECB5625E1F611E711DB449          162   24      1   
249805  7268317  7C7F7C78F1ECB5625E1F611E711DB449           43   24      1   

        hour_sin      hour_cos  week_day_sin  week_day_cos  wee

In [11]:
# List the df for each user
users_locations = []

# For each user
for medallion in tqdm(medallions):
  # Call the function
  locations_sequence, pos, loc = df_to_location_sequence(df.loc[df.medallion == medallion].copy())
  # Add the sequence df of the user to the list
  users_locations.append(locations_sequence)


100%|██████████| 100/100 [00:01<00:00, 79.18it/s]


In [12]:
test_locations_sequence, pos, loc = df_to_location_sequence(df_test)
val_locations_sequence, pos, loc = df_to_location_sequence(df_val)

test_locations_sequence.drop(['index', 'day', 'month'], axis=1, inplace=True)
val_locations_sequence.drop(['index', 'day', 'month'], axis=1, inplace=True)

# Split the data into chunks
N = 17

# Test
# Get a list of dataframes of length n records 
list_test = [test_locations_sequence[i:i+N] for i in range(0, test_locations_sequence.shape[0], N)]

# Test
# Get a list of dataframes of length n records 
list_val = [val_locations_sequence[i:i+N] for i in range(0, val_locations_sequence.shape[0], N)]
list_test[0]

if len(list_val[-1]) < N:
  diff_val = 1
else:
  diff_val = 0

if len(list_test[-1]) < N:
  diff_test = 1
else:
  diff_test = 0


# Define the input features of the  dataset
val_input_dict = {
  'start_place':np.array([list_val[i]['location_id'].values[:-1] for i in range(0, len(list_val)-diff_val)]), 
  'start_hour_sin':np.array([list_val[i]['hour_sin'].values[:-1] for i in range(0, len(list_val)-diff_val)]),
  'start_hour_cos':np.array([list_val[i]['hour_cos'].values[:-1] for i in range(0, len(list_val)-diff_val)]), 
  'weekend':np.array([list_val[i]['weekend'].values[:-1] for i in range(0, len(list_val)-diff_val)]),
  'week_day_sin':np.array([list_val[i]['week_day_sin'].values[:-1] for i in range(0, len(list_val)-diff_val)]),
  'week_day_cos':np.array([list_val[i]['week_day_cos'].values[:-1] for i in range(0, len(list_val)-diff_val)]),
}

# Define the input features of the  dataset
test_input_dict = {
  'start_place':np.array([list_test[i]['location_id'].values[:-1] for i in range(0, len(list_test)-diff_test)]), 
  'start_hour_sin':np.array([list_test[i]['hour_sin'].values[:-1] for i in range(0, len(list_test)-diff_test)]),
  'start_hour_cos':np.array([list_test[i]['hour_cos'].values[:-1] for i in range(0, len(list_test)-diff_test)]), 
  'weekend':np.array([list_test[i]['weekend'].values[:-1] for i in range(0, len(list_test)-diff_test)]),
  'week_day_sin':np.array([list_test[i]['week_day_sin'].values[:-1] for i in range(0, len(list_test)-diff_test)]),
  'week_day_cos':np.array([list_test[i]['week_day_cos'].values[:-1] for i in range(0, len(list_test)-diff_test)]),
}

# Create training examples / targets, we are going to predict the next location
trips_dataset_val = tf.data.Dataset.from_tensor_slices((val_input_dict, np.array([list_val[i]['location_id'].values[1:] for i in range(0, len(list_val)-diff_val)])) )
trips_dataset_test = tf.data.Dataset.from_tensor_slices((test_input_dict, np.array([list_test[i]['location_id'].values[1:] for i in range(0, len(list_test)-diff_test)])) )

# Batch size
BATCH_SIZE = 16

# Buffer size to shuffle the dataset
# (TF data is designed to work with possibly infinite sequences,
# so it doesn't attempt to shuffle the entire sequence in memory. Instead,
# it maintains a buffer in which it shuffles elements).
BUFFER_SIZE = 10000

# Create the dataset by creating batches
# Uncomment the shuffle function in case we want to shuffle the sequences
val_dataset = trips_dataset_val.batch(BATCH_SIZE, drop_remainder=True) #.shuffle(BUFFER_SIZE)
test_dataset = trips_dataset_test.batch(BATCH_SIZE, drop_remainder=True) #.shuffle(BUFFER_SIZE)

In [13]:
sizes = []
# Number of locations for each user
for user_df in users_locations:
  sizes.append(user_df.shape[0])

print('Mean number of locations: ', np.mean(np.array(sizes)))
print('Max number of locations: ', np.max(np.array(sizes)))
print('Min number of locations: ', np.min(np.array(sizes)))

Mean number of locations:  2498.07
Max number of locations:  2924
Min number of locations:  2306


Create the validation and test sets for each user

In [14]:
# List the dfs fo train, val and test for each user
users_locations_train = []
users_locations_val = []
users_locations_test = []

for user_df in users_locations:
  # Split in train, test and validation
  train, test = train_test_split(user_df, test_size=0.2, shuffle=False)
  train, val = train_test_split(train, test_size=0.2, shuffle=False)

  # Append the sets
  users_locations_train.append(train)
  users_locations_val.append(val)
  users_locations_test.append(test)


In [15]:
sizes = []
# Number of locations for each user in the validation set
for user_df in users_locations_val:
  sizes.append(user_df.shape[0])

print('Mean number of locations: ', np.mean(np.array(sizes)))
print('Max number of locations: ', np.max(np.array(sizes)))
print('Min number of locations: ', np.min(np.array(sizes)))

Mean number of locations:  400.01
Max number of locations:  468
Min number of locations:  369


Create sequences for each client

In [16]:
# Merge back the dataframes
df_train = pd.concat(users_locations_train)
df_train.drop(['index', 'day', 'month'], axis=1, inplace=True)

# Merge back the dataframes
df_val = pd.concat(users_locations_val)
df_val.drop(['index', 'day', 'month'], axis=1, inplace=True)

# Merge back the dataframes
df_test = pd.concat(users_locations_test)
df_test.drop(['index', 'day', 'month'], axis=1, inplace=True)

In [17]:
# list of unique medallions
medallions_list = df_train.medallion.unique()

# number of unique medallions
medallions_num = len(medallions_list)
print(medallions_num)

100


In [18]:
# Split the data into chunks of N+1
N = 17

# dictionary of list of df 
df_dictionary = {}

for medallion in tqdm(medallions_list):

  # Get the records of the user
  user_df_train = df_train.loc[df_train.medallion == medallion].copy()
  user_df_val = df_val.loc[df_val.medallion == medallion].copy()
  user_df_test = df_test.loc[df_test.medallion == medallion].copy()

  # Get a list of dataframes of length N records 
  user_list_train = [user_df_train[i:i+N] for i in range(0, user_df_train.shape[0], N)]
  user_list_val = [user_df_val[i:i+N] for i in range(0, user_df_val.shape[0], N)]
  user_list_test = [user_df_test[i:i+N] for i in range(0, user_df_test.shape[0], N)]

  # Save the list of dataframes into a dictionary
  df_dictionary[medallion] = {
      'train': user_list_train,
      'val': user_list_val,
      'test': user_list_test
  }

'''
# Validation
# Get a list of dataframes of length n records 
list_val = [val[i:i+N] for i in range(0, val.shape[0], N)]

# Test
# Get a list of dataframes of length n records 
list_test = [test[i:i+N] for i in range(0, test.shape[0], N)]
list_test[0]'''

100%|██████████| 100/100 [00:01<00:00, 78.00it/s]


'\n# Validation\n# Get a list of dataframes of length n records \nlist_val = [val[i:i+N] for i in range(0, val.shape[0], N)]\n\n# Test\n# Get a list of dataframes of length n records \nlist_test = [test[i:i+N] for i in range(0, test.shape[0], N)]\nlist_test[0]'

In [19]:
df_train.columns.values

array(['medallion', 'location_id', 'hour_sin', 'hour_cos', 'week_day_sin',
       'week_day_cos', 'weekend'], dtype=object)

In [20]:
# Create the dictionary to create a clientData
columns_names = df_train.columns.values[1:]

# Takes a dictionary with train, validation an test sets and the desired set type
def create_clients_dict(df_dictionary, set_type):
  
  dataset_dict = {}

  for medallion in tqdm(medallions_list):

    c_data = collections.OrderedDict()
    values = df_dictionary[medallion][set_type]

    # If the last dataframe of the list is not complete
    if len(values[-1]) < N:
      diff = 1
    else:
      diff = 0

    if len(values) > 0:
      for header in columns_names:
        #c_data[header] = values[header].values.tolist()
        c_data[header] = [values[i][header].values for i in range(0, len(values)-diff)] #[:-1]
        #c_data['y'] = values['dropoff_location_id'].values.tolist()
      dataset_dict[medallion] = c_data
      
  return dataset_dict


In [21]:
# Generate the dictionaries for each set
clients_train_dict = create_clients_dict(df_dictionary, 'train')
clients_val_dict = create_clients_dict(df_dictionary, 'val')
clients_test_dict = create_clients_dict(df_dictionary, 'test')

100%|██████████| 100/100 [00:01<00:00, 86.36it/s]
100%|██████████| 100/100 [00:00<00:00, 490.20it/s]
100%|██████████| 100/100 [00:00<00:00, 241.55it/s]


In [22]:
# Convert the dictionary to a dataset
client_train_data = tff.simulation.FromTensorSlicesClientData(clients_train_dict)
client_val_data = tff.simulation.FromTensorSlicesClientData(clients_val_dict)
client_test_data = tff.simulation.FromTensorSlicesClientData(clients_test_dict)

In [23]:
client_train_data.create_tf_dataset_for_client(medallions_list[0]).element_spec

OrderedDict([('location_id',
              TensorSpec(shape=(17,), dtype=tf.int32, name=None)),
             ('hour_sin',
              TensorSpec(shape=(17,), dtype=tf.float64, name=None)),
             ('hour_cos',
              TensorSpec(shape=(17,), dtype=tf.float64, name=None)),
             ('week_day_sin',
              TensorSpec(shape=(17,), dtype=tf.float64, name=None)),
             ('week_day_cos',
              TensorSpec(shape=(17,), dtype=tf.float64, name=None)),
             ('weekend', TensorSpec(shape=(17,), dtype=tf.int32, name=None))])

Retrieve and example dataset from client_data to take a look at its structure

In [24]:
example_dataset = client_train_data.create_tf_dataset_for_client(
    client_train_data.client_ids[1])

example_element = next(iter(example_dataset))
example_element

OrderedDict([('location_id',
              <tf.Tensor: shape=(17,), dtype=int32, numpy=
              array([249, 262, 133, 244, 141, 141, 186, 186, 237, 237, 143, 229, 230,
                      79, 137, 233, 114])>),
             ('hour_sin',
              <tf.Tensor: shape=(17,), dtype=float64, numpy=
              array([ 0.00000000e+00,  0.00000000e+00,  7.07106781e-01,  8.66025404e-01,
                      5.00000000e-01,  5.00000000e-01,  5.00000000e-01,  5.00000000e-01,
                      2.58819045e-01,  2.58819045e-01,  2.58819045e-01,  1.22464680e-16,
                      1.22464680e-16,  1.22464680e-16,  1.22464680e-16, -2.58819045e-01,
                     -2.58819045e-01])>),
             ('hour_cos',
              <tf.Tensor: shape=(17,), dtype=float64, numpy=
              array([ 1.        ,  1.        ,  0.70710678,  0.5       , -0.8660254 ,
                     -0.8660254 , -0.8660254 , -0.8660254 , -0.96592583, -0.96592583,
                     -0.96592583, -1.

*Shuffle the individual examples, organize them into batches and renames the target feature from `dropoff_location_id ` to y for use with Keras. We also throw in a repeat over the data set to run several epochs.*

Because `tff.learning.from_keras_model` wants as input_spec a dictionary of 2 elements (x,y) and we have multiple inputs, we have to make also x a ditionary.

In this way we will be able to process each feature separately in the model

In [25]:
NUM_CLIENTS = medallions_num
NUM_EPOCHS = 4
BATCH_SIZE = 16
SHUFFLE_BUFFER = 100
PREFETCH_BUFFER = 5

def preprocess(dataset):
  def batch_format_fn(element):
    """Flatten a batch `pixels` and return the features as an `OrderedDict`."""
    return collections.OrderedDict(
        x=collections.OrderedDict(
          start_place=tf.reshape(element['location_id'][:, :-1], [-1, N-1]),
          start_hour_sin=tf.reshape(element['hour_sin'][:, :-1], [-1, N-1]),
          start_hour_cos=tf.reshape(element['hour_cos'][:, :-1], [-1, N-1]),
          week_day_sin=tf.reshape(element['week_day_sin'][:, :-1], [-1, N-1]),
          week_day_cos=tf.reshape(element['week_day_cos'][:, :-1], [-1, N-1]),
          weekend=tf.reshape(element['weekend'][:, :-1], [-1, N-1])
          ),
        y=tf.reshape(element['location_id'][:, 1:], [-1, N-1]))
  return dataset.repeat(NUM_EPOCHS).batch(BATCH_SIZE, drop_remainder=True).map(batch_format_fn).prefetch(PREFETCH_BUFFER) # .shuffle(SHUFFLE_BUFFER)

Test the preprocessing on a single client dataset


---



In [26]:
preprocessed_example_dataset = preprocess(example_dataset)
sample_batch = tf.nest.map_structure(lambda x: x.numpy(),
                                     next(iter(preprocessed_example_dataset)))

sample_batch['x']['start_place'].shape

(16, 16)

In [27]:
preprocessed_example_dataset

<PrefetchDataset shapes: OrderedDict([(x, OrderedDict([(start_place, (16, 16)), (start_hour_sin, (16, 16)), (start_hour_cos, (16, 16)), (week_day_sin, (16, 16)), (week_day_cos, (16, 16)), (weekend, (16, 16))])), (y, (16, 16))]), types: OrderedDict([(x, OrderedDict([(start_place, tf.int32), (start_hour_sin, tf.float64), (start_hour_cos, tf.float64), (week_day_sin, tf.float64), (week_day_cos, tf.float64), (weekend, tf.int32)])), (y, tf.int32)])>

In [28]:
preprocessed_example_dataset.element_spec

OrderedDict([('x',
              OrderedDict([('start_place',
                            TensorSpec(shape=(16, 16), dtype=tf.int32, name=None)),
                           ('start_hour_sin',
                            TensorSpec(shape=(16, 16), dtype=tf.float64, name=None)),
                           ('start_hour_cos',
                            TensorSpec(shape=(16, 16), dtype=tf.float64, name=None)),
                           ('week_day_sin',
                            TensorSpec(shape=(16, 16), dtype=tf.float64, name=None)),
                           ('week_day_cos',
                            TensorSpec(shape=(16, 16), dtype=tf.float64, name=None)),
                           ('weekend',
                            TensorSpec(shape=(16, 16), dtype=tf.int32, name=None))])),
             ('y', TensorSpec(shape=(16, 16), dtype=tf.int32, name=None))])

The ways to feed federated data to TFF in a simulation is simply as a Python list, with each element of the list holding the data of an individual user, whether as a list or as a tf.data.Dataset.

In [29]:
def make_federated_data(client_data, client_ids):
  return [
      preprocess(client_data.create_tf_dataset_for_client(x))
      for x in tqdm(client_ids)
  ]

Of course, we are in a simulation environment, and all the data is locally available. Typically then, when running simulations, we would simply sample a random subset of the clients to be involved in each round of training, generally different in each round.

That said, as you can find out by studying the paper on the Federated Averaging algorithm, achieving convergence in a system with randomly sampled subsets of clients in each round can take a while, and it would be impractical to have to run hundreds of rounds in this interactive tutorial.

What we'll do instead is sample the set of clients once, and reuse the same set across rounds to speed up convergence (intentionally over-fitting to these few user's data). We leave it as an exercise for the reader to modify this tutorial to simulate random sampling - it is fairly easy to do (once you do, keep in mind that getting the model to converge may take a while).

In [30]:
# Select the clients
sample_clients = client_train_data.client_ids[0:NUM_CLIENTS]

# Federate the clients datasets
federated_train_data = make_federated_data(client_train_data, sample_clients)
federated_val_data = make_federated_data(client_val_data, sample_clients)
federated_test_data = make_federated_data(client_test_data, sample_clients)


print('\nNumber of client datasets: {l}'.format(l=len(federated_train_data)))
print('First dataset: {d}'.format(d=federated_train_data[0]))

100%|██████████| 100/100 [00:02<00:00, 48.88it/s]
100%|██████████| 100/100 [00:01<00:00, 54.79it/s]
100%|██████████| 100/100 [00:01<00:00, 60.53it/s]


Number of client datasets: 100
First dataset: <PrefetchDataset shapes: OrderedDict([(x, OrderedDict([(start_place, (16, 16)), (start_hour_sin, (16, 16)), (start_hour_cos, (16, 16)), (week_day_sin, (16, 16)), (week_day_cos, (16, 16)), (weekend, (16, 16))])), (y, (16, 16))]), types: OrderedDict([(x, OrderedDict([(start_place, tf.int32), (start_hour_sin, tf.float64), (start_hour_cos, tf.float64), (week_day_sin, tf.float64), (week_day_cos, tf.float64), (weekend, tf.int32)])), (y, tf.int32)])>





# Federated Model

In [31]:
# All the different places in the dataset
indices = np.concatenate((df.pickup_location_id.values, df.dropoff_location_id.values))

# Length of the vocabulary of places (e.g. 11)
vocab_size = int(np.max(indices) + 1) # + 1 because of 0

# The embedding dimension
embedding_dim = 256

# Number of RNN units
rnn_units = 256

# List of numerical column names
numerical_column_names = ['start_hour_sin', 'start_hour_cos', 'weekend', 'week_day']

# Number of different places
number_of_places =  max(locations_sequence.location_id.max(), locations_sequence.location_id.max()) + 1

Define the model

In [32]:
# Create a model
def create_keras_model(number_of_places, batch_size):
  
	# Shortcut to the layers package
  l = tf.keras.layers
	
  # List of numeric feature columns to pass to the DenseLayer
  numeric_feature_columns = []


  # Handling numerical columns 
  for header in numerical_column_names:
		# Append all the numerical columns defined into the list
    numeric_feature_columns.append(feature_column.numeric_column(header, shape=N-1))

  # Now we need to define an input dictionary.
	# Where the keys are the column names
	# This is a model with multiple inputs, so we need to declare and input layer for each feature
  feature_inputs = {
    'start_hour_sin': tf.keras.Input((N-1, ), batch_size=batch_size, name='start_hour_sin'),
    'start_hour_cos': tf.keras.Input((N-1, ), batch_size=batch_size, name='start_hour_cos'),
    'weekend': tf.keras.Input((N-1, ), batch_size=batch_size, name='weekend'),
    'week_day_sin': tf.keras.Input((N-1, ), batch_size=batch_size, name='week_day_sin'),
    'week_day_cos': tf.keras.Input((N-1, ), batch_size=batch_size, name='week_day_cos'),
  }

  # We declare two DenseFeature layers, one for the numeric columns which do not require\ 
	# Any training, and one for the categorical. It is easier to do it like this
  '''numerical_features = l.DenseFeatures(numeric_feature_columns)(feature_inputs)'''
  
  # We cannot use anarray of features as always because we have sequences and we cannot match the shape otherwise
  # We have to do one by one
  start_hour_sin = feature_column.numeric_column("start_hour_sin", shape=(N-1))
  hour_sin_feature = l.DenseFeatures(start_hour_sin)(feature_inputs)

  start_hour_cos = feature_column.numeric_column("start_hour_cos", shape=(N-1))
  hour_cos_feature = l.DenseFeatures(start_hour_cos)(feature_inputs)

  weekend = feature_column.numeric_column("weekend", shape=(N-1))
  weekend_feature = l.DenseFeatures(weekend)(feature_inputs)

  week_day_sin = feature_column.numeric_column("week_day_sin", shape=(N-1))
  week_day_sin_feature = l.DenseFeatures(week_day_sin)(feature_inputs)

  week_day_cos = feature_column.numeric_column("week_day_cos", shape=(N-1))
  week_day_cos_feature = l.DenseFeatures(week_day_cos)(feature_inputs)
  
	# We have also to add a dimension to then concatenate
  hour_sin_feature = tf.expand_dims(hour_sin_feature, -1)
  hour_cos_feature = tf.expand_dims(hour_cos_feature, -1)
  weekend_feature = tf.expand_dims(weekend_feature, -1)
  week_day_sin_feature = tf.expand_dims(week_day_sin_feature, -1)
  week_day_cos_feature = tf.expand_dims(week_day_cos_feature, -1)

  # Declare the dictionary for the places sequence as before
  sequence_input = {
      'start_place': tf.keras.Input((N-1,), batch_size=batch_size, dtype=tf.dtypes.int32, name='start_place') # add batch_size=batch_size in case of stateful GRU
  }


  # Handling the categorical feature sequence using one-hot
  places_one_hot = feature_column.sequence_categorical_column_with_vocabulary_list(
      'start_place', [i for i in range(number_of_places)])
  
  # Embed the one-hot encoding
  places_embed = feature_column.embedding_column(places_one_hot, embedding_dim)


  # With an input sequence we can't use the DenseFeature layer, we need to use the SequenceFeatures
  sequence_features, sequence_length = tf.keras.experimental.SequenceFeatures(places_embed)(sequence_input)

  input_sequence = l.Concatenate(axis=2)([ sequence_features, hour_sin_feature, hour_cos_feature, weekend_feature, week_day_sin_feature, week_day_cos_feature])

  # Rnn
  recurrent = l.GRU(rnn_units,
                        batch_size=batch_size, #in case of stateful
                        dropout=0.3,
                        return_sequences=True,
                        stateful=True,
                        recurrent_initializer='glorot_uniform')(input_sequence)


	# Last layer with an output for each places
  dense_1 = layers.Dense(number_of_places)(recurrent)

	# Softmax output layer
  output = l.Softmax()(dense_1)
	
	# To return the Model, we need to define its inputs and outputs
	# In out case, we need to list all the input layers we have defined 
  inputs = list(feature_inputs.values()) + list(sequence_input.values())

	# Return the Model
  return tf.keras.Model(inputs=inputs, outputs=output)

Function to evaluate the federated model on the server.
With and without round number.

In [33]:
def keras_evaluate(state, round_num, dataset, tb=0):
  # Take our global model weights and push them back into a Keras model to
  # use its standard `.evaluate()` method.
  keras_model = create_keras_model(number_of_places, batch_size=BATCH_SIZE)
  keras_model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

	# Load state server parameters into the Keras model
  state.model.assign_weights_to(keras_model)
  loss, accuracy = keras_model.evaluate(dataset)
  if tb == 1:
    with eval_summary_writer.as_default():
        for name, value in dict(val_metrics).items():
          tf.summary.scalar('epoch_loss', loss, step=round_num)
          tf.summary.scalar('epoch_sparse_categorical_accuracy', accuracy, step=round_num)
  print('\tEVAL: loss={l:.3f}, accuracy={a:.3f}'.format(l=loss, a=accuracy))

In [34]:
def keras_evaluate(state, dataset, tb=0):
  # Take our global model weights and push them back into a Keras model to
  # use its standard `.evaluate()` method.
  keras_model = create_keras_model(number_of_places, batch_size=BATCH_SIZE)
  keras_model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

	# Load state server parameters into the Keras model
  state.model.assign_weights_to(keras_model)
  loss, accuracy = keras_model.evaluate(dataset)
  print('\tEVAL: loss={l:.3f}, accuracy={a:.3f}'.format(l=loss, a=accuracy))

Compile the model

In [35]:
keras_model = create_keras_model(number_of_places, batch_size=BATCH_SIZE)
keras_model.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),
    metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
keras_model.summary()

Model: "functional_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
start_hour_cos (InputLayer)     [(16, 16)]           0                                            
__________________________________________________________________________________________________
start_hour_sin (InputLayer)     [(16, 16)]           0                                            
__________________________________________________________________________________________________
week_day_cos (InputLayer)       [(16, 16)]           0                                            
__________________________________________________________________________________________________
week_day_sin (InputLayer)       [(16, 16)]           0                                            
_______________________________________________________________________________________

TFF serializes all TensorFlow computations so they can potentially be run in a non-Python environment (even though at the moment, only a simulation runtime implemented in Python is available). Even though we are running in eager mode, (TF 2.0), currently TFF serializes TensorFlow computations by constructing the necessary ops inside the context of a "with tf.Graph.as_default()" statement. Thus, we need to provide a function that TFF can use to introduce our model into a graph it controls. We do this as follows:

In [36]:
# Clone the keras_model inside `create_tff_model()`, which TFF will
# call to produce a new copy of the model inside the graph that it will 
# serialize. Note: we want to construct all the necessary objects we'll need 
# _inside_ this method.
def create_tff_model():
  # TFF uses an `input_spec` so it knows the types and shapes
  # that your model expects.
  input_spec = preprocessed_example_dataset.element_spec
  keras_model_clone = create_keras_model(number_of_places, batch_size=BATCH_SIZE)
  return tff.learning.from_keras_model(
      keras_model_clone,
      input_spec=input_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

In [37]:
preprocessed_example_dataset.element_spec

OrderedDict([('x',
              OrderedDict([('start_place',
                            TensorSpec(shape=(16, 16), dtype=tf.int32, name=None)),
                           ('start_hour_sin',
                            TensorSpec(shape=(16, 16), dtype=tf.float64, name=None)),
                           ('start_hour_cos',
                            TensorSpec(shape=(16, 16), dtype=tf.float64, name=None)),
                           ('week_day_sin',
                            TensorSpec(shape=(16, 16), dtype=tf.float64, name=None)),
                           ('week_day_cos',
                            TensorSpec(shape=(16, 16), dtype=tf.float64, name=None)),
                           ('weekend',
                            TensorSpec(shape=(16, 16), dtype=tf.int32, name=None))])),
             ('y', TensorSpec(shape=(16, 16), dtype=tf.int32, name=None))])

We use a compiled Keras model to perform standard (non-federated) evaluation after each round of federated training. This is useful for research purposes when doing simulated federated learning and there is a standard test dataset.

In a realistic production setting this same technique might be used to take models trained with federated learning and evaluate them on a centralized benchmark dataset for testing or quality assurance purposes.

In [38]:
# This command builds all the TensorFlow graphs and serializes them: 
fed_avg = tff.learning.build_federated_averaging_process(
    model_fn=create_tff_model,
    client_optimizer_fn=lambda: tf.keras.optimizers.Adam(learning_rate=0.002),
    server_optimizer_fn=lambda: tf.keras.optimizers.Adam(learning_rate=0.06))

In [39]:
# EMBEDDED
str(fed_avg.initialize.type_signature)

'( -> <model=<trainable=<float32[264,256],float32[261,768],float32[256,768],float32[2,768],float32[256,264],float32[264]>,non_trainable=<>>,optimizer_state=<int64,float32[264,256],float32[261,768],float32[256,768],float32[2,768],float32[256,264],float32[264],float32[264,256],float32[261,768],float32[256,768],float32[2,768],float32[256,264],float32[264]>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER)'

In [40]:
state = fed_avg.initialize()

In [41]:
# State Embedded
str(state)

"ServerState(model=ModelWeights(trainable=[array([[-0.03832111,  0.06345753,  0.02792387, ..., -0.06383753,\n        -0.07725416, -0.04139237],\n       [-0.04978357, -0.0059065 , -0.06005683, ...,  0.02992821,\n        -0.09823437, -0.10912035],\n       [ 0.02096535,  0.08714084,  0.03277419, ...,  0.00409269,\n        -0.04394473,  0.01000651],\n       ...,\n       [-0.04548226,  0.01850693, -0.04152045, ..., -0.07707153,\n        -0.00912718, -0.01960481],\n       [ 0.11825164,  0.01977342,  0.05901024, ...,  0.06455778,\n         0.00617709,  0.03220361],\n       [-0.01983552, -0.01471853, -0.03114764, ..., -0.01729118,\n         0.06285027, -0.08649164]], dtype=float32), array([[-0.05133128, -0.00804715, -0.06648964, ..., -0.05323223,\n         0.07372577, -0.05255451],\n       [-0.01055259, -0.070721  , -0.02611122, ..., -0.0069702 ,\n        -0.05364567,  0.0629574 ],\n       [ 0.00867224, -0.00455649, -0.06494258, ..., -0.02242834,\n         0.0377905 ,  0.0453207 ],\n       ...

In [42]:
evaluation = tff.learning.build_federated_evaluation(model_fn=create_tff_model)

Tensorboard

In [43]:
#@test {"skip": true}
# Log directory where we want to save the logs
train_logdir = './FL/tb_final/fl_rnn/train'
val_logdir = './FL/tb_final/fl_rnn/val'
#eval_logdir = baseURL + 'tb_final/fl_rnn/eval'

# Summary writer to save the logs
train_summary_writer = tf.summary.create_file_writer(train_logdir)
val_summary_writer = tf.summary.create_file_writer(val_logdir)
#eval_summary_writer = tf.summary.create_file_writer(eval_logdir)

In [44]:
# Run this cell to clean your directory of old output for future graphs from this directory.
#!rm -R '/content/gdrive/My Drive/tb/fl_rnn/'

# Training

Then we just need to wrap our training loop with the summary_writer:

In [45]:
NUM_ROUNDS = 15
#Plot the relevant scalar metrics with the same summary writer.
#@test {"skip": true}
with train_summary_writer.as_default():
  for round_num in range(1, NUM_ROUNDS + 1):
    print('Round {r}'.format(r=round_num))

    # Uncomment to simulate sparse availabily of clients
    # data_for_this_round = sample(federated_train_data)

    state, metrics = fed_avg.next(state, federated_train_data)

    # Federated train
    train_metrics = metrics['train']
    print('\tTrain: loss={l:.3f}, accuracy={a:.3f}'.format(l=train_metrics['loss'], a=train_metrics['sparse_categorical_accuracy']))

    # Federated evaluation
    val_metrics = evaluation(state.model, federated_val_data)
    print('\tValidation: loss={l:.3f}, accuracy={a:.3f}'.format( l=val_metrics['loss'], a=val_metrics['sparse_categorical_accuracy']))
    
    # Centralized Evaluation
    #keras_evaluate(state, round_num, val_dataset, tb=1)
    #print(' ')

    print('\twriting..')
    # Iterate across the train metrics and write their data
    for name, value in dict(train_metrics).items():
      # print('\tname: {}, value:{}, step={}'.format(name,value,round_num))
      tf.summary.scalar('epoch_'+name, value, step=round_num)
      
    # Validation metrics
    with val_summary_writer.as_default():
      for name, value in dict(val_metrics).items():
        # print('\twriting..')
         #print('\tname: {}, value:{}, step={}'.format(name,value,round_num))
        tf.summary.scalar('epoch_'+name, value, step=round_num)

train_summary_writer.close()
val_summary_writer.close()
#eval_summary_writer.close()

Round 1
	Train: loss=4.529, accuracy=0.061
	Validation: loss=4.949, accuracy=0.154
	writing..
Round 2
	Train: loss=4.348, accuracy=0.153
	Validation: loss=5.648, accuracy=0.096
	writing..
Round 3
	Train: loss=3.990, accuracy=0.184
	Validation: loss=4.307, accuracy=0.170
	writing..
Round 4
	Train: loss=3.729, accuracy=0.195
	Validation: loss=4.090, accuracy=0.130
	writing..
Round 5
	Train: loss=3.683, accuracy=0.179
	Validation: loss=3.965, accuracy=0.161
	writing..
Round 6
	Train: loss=3.663, accuracy=0.193
	Validation: loss=3.800, accuracy=0.198
	writing..
Round 7
	Train: loss=3.604, accuracy=0.203
	Validation: loss=3.941, accuracy=0.231
	writing..
Round 8
	Train: loss=3.640, accuracy=0.205
	Validation: loss=3.707, accuracy=0.247
	writing..
Round 9
	Train: loss=3.570, accuracy=0.206
	Validation: loss=3.699, accuracy=0.249
	writing..
Round 10
	Train: loss=3.560, accuracy=0.208
	Validation: loss=3.674, accuracy=0.249
	writing..
Round 11
	Train: loss=3.541, accuracy=0.209
	Validation: lo

Training with early-stopping

In [46]:
# Local model for evaluation
'''keras_model = create_keras_model(number_of_places, batch_size=BATCH_SIZE)
keras_model.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),
    metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])'''

tolerance = 7
best_state = 0
lowest_loss = 100.00
stop = tolerance

NUM_ROUNDS = 40
with train_summary_writer.as_default():
  for round_num in range(1, NUM_ROUNDS + 1):
    print('Round {r}'.format(r=round_num))

    # Uncomment to simulate sparse availabily of clients
    # train_data_for_this_round, val_data_for_this_round = sample((federated_train_data, federated_val_data), 20, NUM_CLIENTS)

    state, metrics = fed_avg.next(state, federated_train_data)

    train_metrics = metrics['train']
    print('\tTrain: loss={l:.3f}, accuracy={a:.3f}'.format(l=train_metrics['loss'], a=train_metrics['sparse_categorical_accuracy']))

    val_metrics = evaluation(state.model, federated_val_data)
    print('\tValidation: loss={l:.3f}, accuracy={a:.3f}'.format( l=val_metrics['loss'], a=val_metrics['sparse_categorical_accuracy']))
    
    # Check for decreasing validation loss
    if lowest_loss > val_metrics['loss']:
      print('\tSaving best model..')
      lowest_loss = val_metrics['loss']
      best_state = state
      stop = tolerance - 1 
    else:
      stop = stop - 1
      if stop <= 0:
        print('\tEarly stopping...')
        break;
    
    # keras_evaluate(state, round_num, val_dataset)
    # Evaluation
    '''state.model.assign_weights_to(keras_model)
    loss, accuracy = keras_model.evaluate(val_dataset)
    print('\tEVAL: loss={l:.3f}, accuracy={a:.3f}'.format(l=loss, a=accuracy))'''
    print(' ')
    print('\twriting..')

    # Iterate across the metrics and write their data
    for name, value in dict(train_metrics).items():
      # print('\tname: {}, value:{}, step={}'.format(name,value,round_num))
      tf.summary.scalar('epoch_'+name, value, step=round_num)

    with val_summary_writer.as_default():
      for name, value in dict(val_metrics).items():
        # print('\twriting..')
        # print('\tname: {}, value:{}, step={}'.format(name,value,round_num))
        tf.summary.scalar('epoch_'+name, value, step=round_num)

train_summary_writer.close()
val_summary_writer.close()

RuntimeError: SummaryWriter is already closed

Test best saved model

In [None]:
# Test the model
test_metrics = evaluation(best_state.model, federated_test_data)
print('\tEvaluation: loss={l:.3f}, accuracy={a:.3f}'.format( l=test_metrics['loss'], a=test_metrics['sparse_categorical_accuracy']))

In [None]:
# Centralized test
keras_evaluate(best_state, dataset=test_dataset)

Test last model

In [None]:
# Test the model
test_metrics = evaluation(state.model, federated_test_data)
print('\tEvaluation: loss={l:.3f}, accuracy={a:.3f}'.format( l=test_metrics['loss'], a=test_metrics['sparse_categorical_accuracy']))

In [None]:
# Centralized test
keras_evaluate(state, dataset=test_dataset)

Tensorboard

In [None]:
import datetime, os
#!kill 719 # If you want to kill the process with the PID
#%reload_ext tensorboard

# We cannot use the logdir variable, we need to define it manually
#%tensorboard --logdir '/content/gdrive/My Drive/NYC Dataset/tb/'

### Training by sampling clients at each round

In [None]:
# sample a subset of clients
# federated_data is a tuple with the train and the validation data
def sample(federate_data, n, n_clients):
  client_ids = np.random.choice(n_clients, n, replace=False).astype(int)
  return [federate_data[0][i] for i in client_ids], [federate_data[1][i] for i in client_ids]

Tensorboard

In [None]:
#@test {"skip": true}
# Log directory where we want to save the logs
train_logdir = './FL/tb/fl_rnn_sampling_new/train'
val_logdir = './FL/tb/fl_rnn_sampling_new/val'
eval_logdir = './FL/tb/fl_rnn_sampling_new/eval'

# Summary writer to save the logs
train_summary_writer = tf.summary.create_file_writer(train_logdir)
val_summary_writer = tf.summary.create_file_writer(val_logdir)
eval_summary_writer = tf.summary.create_file_writer(eval_logdir)

In [None]:
NUM_ROUNDS = 10
with train_summary_writer.as_default():
  for round_num in range(1, NUM_ROUNDS + 1):
    print('Round {r}'.format(r=round_num))
    train_data_for_this_round, val_data_for_this_round = sample((federated_train_data, federated_val_data), 50, NUM_CLIENTS)
    state, metrics = fed_avg.next(state, train_data_for_this_round)
    train_metrics = metrics['train']
    print('\tTrain: loss={l:.3f}, accuracy={a:.3f}'.format(l=train_metrics['loss'], a=train_metrics['sparse_categorical_accuracy']))
    val_metrics = evaluation(state.model, federated_val_data)
    print('\tValidation: loss={l:.3f}, accuracy={a:.3f}'.format( l=val_metrics['loss'], a=val_metrics['sparse_categorical_accuracy']))
    #keras_evaluate(state, round_num, val_dataset, tb=1)
    print(' ')

    print('\twriting..')  
    # Iterate across the metrics and write their data
    for name, value in dict(train_metrics).items():
      
       #print('\tname: {}, value:{}, step={}'.format(name,value,round_num))
      tf.summary.scalar('epoch_'+name, value, step=round_num)

    with val_summary_writer.as_default():
      for name, value in dict(val_metrics).items():
        #print('\twriting..')
        #print('\tname: {}, value:{}, step={}'.format(name,value,round_num))
        tf.summary.scalar('epoch_'+name, value, step=round_num)
        
train_summary_writer.close()
val_summary_writer.close()
eval_summary_writer.close()

In [None]:
# test the model
test_metrics = evaluation(state.model, federated_test_data)
print('\tEvaluation: loss={l:.3f}, accuracy={a:.3f}'.format( l=test_metrics['loss'], a=test_metrics['sparse_categorical_accuracy']))

In [None]:
# Centralized test
keras_evaluate(state, dataset=test_dataset)