Enable compression options (#63)
* Fixes to delta calculation and lossy compression flow. Added new workspace that enables lossy compression by default

* WIP

* Fixed bug in sparsity calculation. Working now for single collaborator with 90% sparsity

* Cleaning up code and adding documentation

* Removing comments

* Fixing flake8 linting

* Fix F541

* Fixed collaborator unit tests

* Store additional rounds for tf_cnn_histology

Co-authored-by: Alexey Gruzdev <alexey.gruzdev@intel.com>
psfoley and alexey-gruzdev committed May 5, 2021
1 parent 704dfd5 commit 0412c82
Showing 31 changed files with 440 additions and 74 deletions.
1 change: 1 addition & 0 deletions docs/advanced_topics.rst
@@ -11,5 +11,6 @@ Advanced Topics
:maxdepth: 4

multiple_plans
compression_settings


27 changes: 27 additions & 0 deletions docs/compression_settings.rst
@@ -0,0 +1,27 @@
.. # Copyright (C) 2021 Intel Corporation
.. # Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you.
.. _compression_settings:

********************
Compression Settings
********************

Federated Learning can enable tens to thousands of participants to work together on the same model, but with this scaling comes increased communication cost, a problem that large models exacerbate. For this reason, compression is a core capability of |productName|, and our framework supports several lossless and lossy compression pipelines out of the box. In general, model weights are not robust to information loss, so no compression is applied by default to the model weights sent in either direction; however, the deltas between the model weights for each round are inherently more sparse and better suited for lossy compression. The following is the list of compression pipelines that |productName| currently supports:

* ``NoCompressionPipeline``: This is the default option applied to model weights
* ``RandomShiftPipeline``: A **lossless** pipeline that randomly shifts the weights during transport
* ``STCPipeline``: A **lossy** pipeline consisting of three transformations: a *Sparsity Transform* (p_sparsity=0.1), which by default retains only the top (p*100)% of values by absolute magnitude; a *Ternary Transform*, which discretizes the sparse array into three buckets; and finally a *GZIP Transform*. The sparsity step is illustrated in the sketch after this list.
* ``SKCPipeline``: A **lossy** pipeline consisting of three transformations: a *Sparsity Transform* (p=0.1), which by default retains only the top (p*100)% of values by absolute magnitude; a *KMeans Transform* (k=6), which applies the KMeans algorithm to the sparse array with *k* centroids; and finally a *GZIP Transform*.
* ``KCPipeline``: A **lossy** pipeline consisting of two transformations: a *KMeans Transform* (k=6), which applies the KMeans algorithm to the original weight array with *k* centroids, followed by a *GZIP Transform*.

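The sparsity step above can be made concrete with a short NumPy sketch. This is an illustration of the idea only, not |productName|'s actual implementation, and the helper name ``sparsify`` is hypothetical:

.. code-block:: python

    import numpy as np

    def sparsify(weights, p=0.1):
        """Keep only the top (p*100)% of values by absolute magnitude.

        Hypothetical helper for illustration only.
        """
        flat = weights.flatten()
        n_keep = max(1, int(np.ceil(p * flat.size)))
        # indices of the n_keep entries with the largest absolute values
        top_idx = np.argsort(np.abs(flat))[-n_keep:]
        sparse = np.zeros_like(flat)
        sparse[top_idx] = flat[top_idx]
        return sparse.reshape(weights.shape)
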
We provide an example template, **keras_cnn_with_compression**, that utilizes *KCPipeline* with 6 centroids for KMeans. To gain a better understanding of how experiments perform with more or fewer centroids, you can modify the *n_clusters* parameter in the template's plan.yaml:

.. code-block:: yaml

    compression_pipeline :
      defaults : plan/defaults/compression_pipeline.yaml
      template : openfl.pipelines.KCPipeline
      settings :
        n_clusters : 6
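
For intuition, the k-means quantization at the heart of *KCPipeline* can be sketched as follows: cluster the flattened weights into *n_clusters* centroids and replace each weight with its nearest centroid, leaving only *k* distinct values that compress well under GZIP. This is a simplified, hypothetical sketch assuming scikit-learn is available, not the ``openfl.pipelines.KCPipeline`` source (the real pipeline also includes the GZIP stage and operates on deltas):

.. code-block:: python

    import numpy as np
    from sklearn.cluster import KMeans

    def kmeans_quantize(weights, n_clusters=6):
        """Replace each weight with the centroid of its k-means cluster.

        Hypothetical helper for illustration only.
        """
        flat = weights.reshape(-1, 1)
        km = KMeans(n_clusters=n_clusters, n_init=10).fit(flat)
        # map every weight to the value of its assigned centroid
        return km.cluster_centers_[km.labels_].reshape(weights.shape)
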
3 changes: 3 additions & 0 deletions openfl-workspace/default/plan/plan.yaml
@@ -31,3 +31,6 @@ assigner :

tasks :
defaults : plan/defaults/tasks_fast_estimator.yaml

compression_pipeline :
defaults : plan/defaults/compression_pipeline.yaml
3 changes: 3 additions & 0 deletions openfl-workspace/fe_tf_adversarial_cifar/plan/plan.yaml
@@ -37,3 +37,6 @@ assigner :

tasks :
defaults : plan/defaults/tasks_fast_estimator.yaml

compression_pipeline :
defaults : plan/defaults/compression_pipeline.yaml
3 changes: 3 additions & 0 deletions openfl-workspace/fe_torch_adversarial_cifar/plan/plan.yaml
@@ -37,3 +37,6 @@ assigner :

tasks :
defaults : plan/defaults/tasks_fast_estimator.yaml

compression_pipeline :
defaults : plan/defaults/compression_pipeline.yaml
3 changes: 3 additions & 0 deletions openfl-workspace/keras_cnn_mnist/plan/plan.yaml
@@ -37,3 +37,6 @@ assigner :

tasks :
defaults : plan/defaults/tasks_keras.yaml

compression_pipeline :
defaults : plan/defaults/compression_pipeline.yaml
2 changes: 2 additions & 0 deletions openfl-workspace/keras_cnn_with_compression/.workspace
@@ -0,0 +1,2 @@
current_plan_name: default

3 changes: 3 additions & 0 deletions openfl-workspace/keras_cnn_with_compression/code/__init__.py
@@ -0,0 +1,3 @@
# Copyright (C) 2020-2021 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
"""You may copy this file as the starting point of your own model."""
85 changes: 85 additions & 0 deletions openfl-workspace/keras_cnn_with_compression/code/keras_cnn.py
@@ -0,0 +1,85 @@
# Copyright (C) 2020-2021 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

"""You may copy this file as the starting point of your own model."""

import tensorflow.keras as ke

from tensorflow.keras import Sequential
from tensorflow.keras.layers import Conv2D, Flatten, Dense

from openfl.federated import KerasTaskRunner


class KerasCNN(KerasTaskRunner):
"""A basic convolutional neural network model."""

def __init__(self, **kwargs):
"""
Initialize.
Args:
**kwargs: Additional parameters to pass to the function
"""
super().__init__(**kwargs)

self.model = self.build_model(self.feature_shape, self.data_loader.num_classes, **kwargs)

self.initialize_tensorkeys_for_functions()

self.model.summary(print_fn=self.logger.info)

if self.data_loader is not None:
self.logger.info(f'Train Set Size : {self.get_train_data_size()}')
self.logger.info(f'Valid Set Size : {self.get_valid_data_size()}')

def build_model(self,
input_shape,
num_classes,
conv_kernel_size=(4, 4),
conv_strides=(2, 2),
conv1_channels_out=16,
conv2_channels_out=32,
final_dense_inputsize=100,
**kwargs):
"""
Define the model architecture.
Args:
input_shape (numpy.ndarray): The shape of the data
num_classes (int): The number of classes of the dataset
Returns:
tensorflow.python.keras.engine.sequential.Sequential: The model defined in Keras
"""
model = Sequential()

model.add(Conv2D(conv1_channels_out,
kernel_size=conv_kernel_size,
strides=conv_strides,
activation='relu',
input_shape=input_shape))

model.add(Conv2D(conv2_channels_out,
kernel_size=conv_kernel_size,
strides=conv_strides,
activation='relu'))

model.add(Flatten())

model.add(Dense(final_dense_inputsize, activation='relu'))

model.add(Dense(num_classes, activation='softmax'))

model.compile(loss=ke.losses.categorical_crossentropy,
optimizer=ke.optimizers.Adam(),
metrics=['accuracy'])

# initialize the optimizer variables
opt_vars = model.optimizer.variables()

for v in opt_vars:
v.initializer.run(session=self.sess)

return model
118 changes: 118 additions & 0 deletions openfl-workspace/keras_cnn_with_compression/code/mnist_utils.py
@@ -0,0 +1,118 @@
# Copyright (C) 2020-2021 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

"""You may copy this file as the starting point of your own model."""

import numpy as np

from logging import getLogger
from tensorflow.python.keras.utils.data_utils import get_file

logger = getLogger(__name__)


def one_hot(labels, classes):
"""
One Hot encode a vector.
Args:
labels (list): List of labels to onehot encode
classes (int): Total number of categorical classes
Returns:
np.array: Matrix of one-hot encoded labels
"""
return np.eye(classes)[labels]


def _load_raw_datashards(shard_num, collaborator_count):
"""
Load the raw data by shard.
Returns tuples of the dataset shard divided into training and validation.
Args:
shard_num (int): The shard number to use
collaborator_count (int): The number of collaborators in the federation
Returns:
2 tuples: (image, label) of the training, validation dataset
"""
origin_folder = 'https://storage.googleapis.com/tensorflow/tf-keras-datasets/'
path = get_file('mnist.npz',
origin=origin_folder + 'mnist.npz',
file_hash='731c5ac602752760c8e48fbffcf8c3b850d9dc2a2aedcf2cc48468fc17b673d1')

with np.load(path) as f:
# get all of mnist
X_train_tot = f['x_train']
y_train_tot = f['y_train']

X_valid_tot = f['x_test']
y_valid_tot = f['y_test']

# create the shards
shard_num = int(shard_num)
X_train = X_train_tot[shard_num::collaborator_count]
y_train = y_train_tot[shard_num::collaborator_count]

X_valid = X_valid_tot[shard_num::collaborator_count]
y_valid = y_valid_tot[shard_num::collaborator_count]

return (X_train, y_train), (X_valid, y_valid)


def load_mnist_shard(shard_num, collaborator_count, categorical=True,
channels_last=True, **kwargs):
"""
Load the MNIST dataset.
Args:
shard_num (int): The shard to use from the dataset
collaborator_count (int): The number of collaborators in the federation
categorical (bool): True = convert the labels to one-hot encoded
vectors (Default = True)
channels_last (bool): True = The input images have the channels
last (Default = True)
**kwargs: Additional parameters to pass to the function
Returns:
list: The input shape
int: The number of classes
numpy.ndarray: The training data
numpy.ndarray: The training labels
numpy.ndarray: The validation data
numpy.ndarray: The validation labels
"""
img_rows, img_cols = 28, 28
num_classes = 10

(X_train, y_train), (X_valid, y_valid) = _load_raw_datashards(
shard_num, collaborator_count
)

if channels_last:
X_train = X_train.reshape(X_train.shape[0], img_rows, img_cols, 1)
X_valid = X_valid.reshape(X_valid.shape[0], img_rows, img_cols, 1)
input_shape = (img_rows, img_cols, 1)
else:
X_train = X_train.reshape(X_train.shape[0], 1, img_rows, img_cols)
X_valid = X_valid.reshape(X_valid.shape[0], 1, img_rows, img_cols)
input_shape = (1, img_rows, img_cols)

X_train = X_train.astype('float32')
X_valid = X_valid.astype('float32')
X_train /= 255
X_valid /= 255

logger.info(f'MNIST > X_train Shape : {X_train.shape}')
logger.info(f'MNIST > y_train Shape : {y_train.shape}')
logger.info(f'MNIST > Train Samples : {X_train.shape[0]}')
logger.info(f'MNIST > Valid Samples : {X_valid.shape[0]}')

if categorical:
# convert class vectors to binary class matrices
y_train = one_hot(y_train, num_classes)
y_valid = one_hot(y_valid, num_classes)

return input_shape, num_classes, X_train, y_train, X_valid, y_valid
40 changes: 40 additions & 0 deletions openfl-workspace/keras_cnn_with_compression/code/tfmnist_inmemory.py
@@ -0,0 +1,40 @@
# Copyright (C) 2020-2021 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

"""You may copy this file as the starting point of your own model."""

from openfl.federated import TensorFlowDataLoader

from .mnist_utils import load_mnist_shard


class TensorFlowMNISTInMemory(TensorFlowDataLoader):
"""TensorFlow Data Loader for MNIST Dataset."""

def __init__(self, data_path, batch_size, **kwargs):
"""
Initialize.
Args:
data_path: File path for the dataset
batch_size (int): The batch size for the data loader
**kwargs: Additional arguments, passed to super init and load_mnist_shard
"""
super().__init__(batch_size, **kwargs)

# TODO: We should be downloading the dataset shard into a directory
# TODO: There needs to be a method to ask how many collaborators and
# what index/rank is this collaborator.
# Then we have a way to automatically shard based on rank and size of
# collaborator list.

_, num_classes, X_train, y_train, X_valid, y_valid = load_mnist_shard(
shard_num=int(data_path), **kwargs
)

self.X_train = X_train
self.y_train = y_train
self.X_valid = X_valid
self.y_valid = y_valid

self.num_classes = num_classes
5 changes: 5 additions & 0 deletions openfl-workspace/keras_cnn_with_compression/plan/cols.yaml
@@ -0,0 +1,5 @@
# Copyright (C) 2020 Intel Corporation
# Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you.

collaborators:

7 changes: 7 additions & 0 deletions openfl-workspace/keras_cnn_with_compression/plan/data.yaml
@@ -0,0 +1,7 @@
# Copyright (C) 2020 Intel Corporation
# Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you.

# collaborator_name,data_directory_path
one,1


2 changes: 2 additions & 0 deletions openfl-workspace/keras_cnn_with_compression/plan/defaults
@@ -0,0 +1,2 @@
../../workspace/plan/defaults

47 changes: 47 additions & 0 deletions openfl-workspace/keras_cnn_with_compression/plan/plan.yaml
@@ -0,0 +1,47 @@
# Copyright (C) 2020 Intel Corporation
# Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you.

aggregator :
defaults : plan/defaults/aggregator.yaml
template : openfl.component.Aggregator
settings :
init_state_path : save/keras_cnn_mnist_init.pbuf
best_state_path : save/keras_cnn_mnist_best.pbuf
last_state_path : save/keras_cnn_mnist_last.pbuf
db_store_rounds: 2
rounds_to_train : 10

collaborator :
defaults : plan/defaults/collaborator.yaml
template : openfl.component.Collaborator
settings :
db_store_rounds: 2
delta_updates : true
opt_treatment : RESET

data_loader :
defaults : plan/defaults/data_loader.yaml
template : code.tfmnist_inmemory.TensorFlowMNISTInMemory
settings :
collaborator_count : 2
data_group_name : mnist
batch_size : 256

task_runner :
defaults : plan/defaults/task_runner.yaml
template : code.keras_cnn.KerasCNN

network :
defaults : plan/defaults/network.yaml

assigner :
defaults : plan/defaults/assigner.yaml

tasks :
defaults : plan/defaults/tasks_keras.yaml

compression_pipeline :
defaults : plan/defaults/compression_pipeline.yaml
template : openfl.pipelines.KCPipeline
settings :
n_clusters : 6
1 change: 1 addition & 0 deletions openfl-workspace/keras_cnn_with_compression/requirements.txt
@@ -0,0 +1 @@
tensorflow==2.3.1
3 changes: 3 additions & 0 deletions openfl-workspace/keras_nlp/plan/plan.yaml
@@ -42,3 +42,6 @@ assigner :

tasks :
defaults : plan/defaults/tasks_keras.yaml

compression_pipeline :
defaults : plan/defaults/compression_pipeline.yaml
3 changes: 3 additions & 0 deletions openfl-workspace/tf_2dunet/plan/plan.yaml
@@ -38,3 +38,6 @@ assigner :

tasks :
defaults : plan/defaults/tasks_tensorflow.yaml

compression_pipeline :
defaults : plan/defaults/compression_pipeline.yaml
