# Federated learning: Video classification with Keras
The present experiment tackles **federated video classification** using the Sherpa.FL federated learning (FL) framework with Keras ([this tutorial](https://www.pyimagesearch.com/2019/07/15/video-classification-with-keras-and-deep-learning/) has been used as a reference).

## Dataset 
If not done already, we first need to download the dataset.
It is a collection of labeled images of 12 different sports.
For simplicity and to save computational resources, we only train on three sports, i.e. `tennis`, `football` and `weight_lifting`. More categories can be used by modifying the `LABELS` object set below.
We also set the input parameters, such as the paths used to save and load objects and the number of federated learning rounds to run. The boolean parameter `train_network` selects whether to train the federated neural network or to load a pre-trained model:

In [1]:
import shfl
import os
import subprocess

# Set the input parameters for the experiment:
args = {"data_path":"../data/video_classification/sports", 
        "output_path": "../data/video_classification/output",
        "model_name":"activity.model", 
        "label_bin": "lb.pickle", 
        "federated_rounds":3,
        "size_averaging": 1, 
        "train_network": True}

LABELS = set(["weight_lifting", "tennis", "football"])
print("[INFO] training for labels: " + str(LABELS))

if not os.path.exists(args["data_path"]):
    print("[INFO] creating data folders and cloning dataset repo ...")
    subprocess.run(["git", "clone", "https://github.com/jurjsorinliviu/Sports-Type-Classifier", args["data_path"]], check=True)
os.makedirs(args["output_path"], exist_ok=True)
    
import matplotlib
matplotlib.use("Agg")
from sklearn.preprocessing import LabelBinarizer
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from imutils import paths
import matplotlib.pyplot as plt
import numpy as np
import argparse
import pickle
import cv2

print("[INFO] loading images...")
imagePaths = list(paths.list_images(os.path.join(args["data_path"], "data/")))
data = []
labels = []

# loop over the image paths
for imagePath in imagePaths:
    # extract the class label from the filename
    label = imagePath.split(os.path.sep)[-2]

    # if the label of the current image is not one of the labels
    # we are interested in, then ignore the image
    if label not in LABELS:
        continue

    # load the image, convert it to RGB channel ordering, and resize
    # it to be a fixed 224x224 pixels, ignoring aspect ratio
    image = cv2.imread(imagePath)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = cv2.resize(image, (224, 224))

    # update the data and labels lists, respectively
    data.append(image)
    labels.append(label)

# convert the data and labels to NumPy arrays
data = np.array(data)
labels = np.array(labels)

# perform one-hot encoding on the labels
lb = LabelBinarizer()
labels = lb.fit_transform(labels)

[INFO] training for labels: {'football', 'tennis', 'weight_lifting'}
[INFO] creating data folders and cloning dataset repo ...
[INFO] loading images...
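The loop above infers each label from the name of the folder that contains the image, then one-hot encodes the labels with `LabelBinarizer`. The following stdlib-only sketch reproduces both steps on a few hypothetical paths (the `hockey` folder is made up to show the filtering). For three or more classes, `LabelBinarizer` likewise orders `classes_` alphabetically and emits one column per class.

```python
import os

LABELS = {"weight_lifting", "tennis", "football"}


def label_from_path(image_path):
    # the class label is the name of the directory containing the image,
    # e.g. data/tennis/001.jpg -> "tennis"
    return image_path.split(os.path.sep)[-2]


def one_hot(labels):
    # sorted class list, matching the ordering of LabelBinarizer.classes_
    classes = sorted(set(labels))
    index = {c: i for i, c in enumerate(classes)}
    encoded = []
    for label in labels:
        row = [0] * len(classes)
        row[index[label]] = 1
        encoded.append(row)
    return classes, encoded


# hypothetical image paths; "hockey" is not in LABELS and gets filtered out
paths = [
    os.path.join("data", "tennis", "001.jpg"),
    os.path.join("data", "football", "002.jpg"),
    os.path.join("data", "weight_lifting", "003.jpg"),
    os.path.join("data", "hockey", "004.jpg"),
]
kept = [label_from_path(p) for p in paths if label_from_path(p) in LABELS]
classes, encoded = one_hot(kept)
print(classes)  # ['football', 'tennis', 'weight_lifting']
print(encoded)  # [[0, 1, 0], [1, 0, 0], [0, 0, 1]]
```

One caveat: with only two classes, `LabelBinarizer` returns a single 0/1 column rather than two one-hot columns, which would not match a two-unit softmax head; keep at least three entries in `LABELS` or change the encoding.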


Once the dataset is created, it is encapsulated in a Sherpa.FL `LabeledDatabase`, and the train set is distributed IID (independent and identically distributed) across a network of five nodes:

In [2]:
# Convert to a labeled database for shfl:
database = shfl.data_base.LabeledDatabase(data, labels)
train_data, train_labels, test_data, test_labels = database.load_data()

print("[INFO] Number of train images: " + str(len(train_data)))
print("[INFO] Number of test images: " + str(len(test_data)))

# Distribute data over the federated network of nodes:
print("[INFO] Distributing the train set across the nodes...")
iid_distribution = shfl.data_distribution.IidDataDistribution(database)
federated_data, test_data, test_label = iid_distribution.get_federated_data(num_nodes=5, percent=100)

[INFO] Number of train images: 1656
[INFO] Number of test images: 414
[INFO] Distributing the train set across the nodes...
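Sherpa.FL's `IidDataDistribution` performs the split internally. As an illustration only, here is a minimal sketch of an IID split (shuffle the sample indices, then deal them into near-equal shards) applied to the 1656 training images and 5 nodes of this experiment. How Sherpa.FL handles a remainder when the sample count is not divisible by the number of nodes is an assumption here: this sketch gives the extra samples to the first nodes.

```python
import random


def iid_split(num_samples, num_nodes, seed=0):
    # shuffle all sample indices so that every shard follows the same distribution
    indices = list(range(num_samples))
    random.Random(seed).shuffle(indices)
    # near-equal shard sizes; the first `extra` nodes get one more sample
    base, extra = divmod(num_samples, num_nodes)
    shards, start = [], 0
    for node in range(num_nodes):
        size = base + (1 if node < extra else 0)
        shards.append(indices[start:start + size])
        start += size
    return shards


nodes = iid_split(1656, 5)
print([len(n) for n in nodes])  # [332, 331, 331, 331, 331]
```

Each node therefore trains on roughly one fifth of the training images, which is why local training per round is cheaper than the centralized run.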


## Training the network
At this point, we can create our Keras model.
We start from a ResNet-50 network pre-trained on ImageNet and only train the last layers (fine-tuning).
Data augmentation is applied to the train set.
We define the learning model class and a model builder:

In [3]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.layers import AveragePooling2D
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.layers import Dropout
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Input
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.models import load_model

# initialize the training data augmentation object
trainAug = ImageDataGenerator(
    rotation_range=30,
    zoom_range=0.15,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.15,
    horizontal_flip=True,
    fill_mode="nearest")

# initialize the validation/testing data augmentation object
valAug = ImageDataGenerator()

# define the ImageNet mean (in RGB order) and attach it to both data
# augmentation objects. Note: Keras only subtracts `mean` when the
# generator is created with featurewise_center=True, so as written these
# assignments have no effect and the network sees raw pixel values
mean = np.array([123.68, 116.779, 103.939], dtype="float32")
trainAug.mean = mean
valAug.mean = mean


class DeepLearningModelAug(shfl.model.DeepLearningModel):
    def train(self, data, labels):
        self._check_data(data)
        self._check_labels(labels)

        # validation uses the global test set (test_data, test_labels)
        self._model.fit(x=trainAug.flow(data, labels, batch_size=32),
            steps_per_epoch=len(data) // 32,
            validation_data=valAug.flow(test_data, test_labels),
            validation_steps=len(test_data) // 32,
            epochs=self._epochs)

def model_builder():
    
    # load the ResNet-50 network, where head FC layer sets are left off
    baseModel = ResNet50(weights="imagenet", include_top=False,
        input_tensor=Input(shape=(224, 224, 3)))

    # construct the head of the model that will be placed on top of the
    # the base model
    headModel = baseModel.output
    headModel = AveragePooling2D(pool_size=(7, 7))(headModel)
    headModel = Flatten(name="flatten")(headModel)
    headModel = Dense(512, activation="relu")(headModel)
    headModel = Dropout(0.5)(headModel)
    headModel = Dense(len(lb.classes_), activation="softmax")(headModel)

    # place the head FC model on top of the base model (this will become
    # the actual model we will train)
    model = Model(inputs=baseModel.input, outputs=headModel)

    # freeze pre-trained base model layers so they will
    # *not* be updated during the training process
    for layer in baseModel.layers:
        layer.trainable = False

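    # `lr` is a deprecated alias of `learning_rate`; note that the newer
    # Keras optimizers (TF >= 2.11) no longer accept the `decay` argument
    opt = SGD(learning_rate=1e-4, momentum=0.9, decay=1e-4 / args["federated_rounds"])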
    model.compile(loss="categorical_crossentropy", optimizer=opt, metrics=["accuracy"])

    return DeepLearningModelAug(model, epochs=epochs_per_FL_round)
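The `AveragePooling2D(pool_size=(7, 7))` in the head relies on ResNet-50 turning a 224x224 input into a 7x7x2048 feature map (an overall stride of 32). A small arithmetic sketch of the head's shapes and of its trainable parameter count, assuming three classes as in `LABELS`; since the base layers are frozen, these are the only weights that change during local training.

```python
RESNET50_STRIDE = 32      # overall downsampling factor of ResNet-50
RESNET50_CHANNELS = 2048  # depth of ResNet-50's last convolutional block


def dense_params(n_in, n_out):
    # one weight per input/output pair plus one bias per output unit
    return n_in * n_out + n_out


input_size = 224
num_classes = 3  # len(LABELS) in this notebook

# spatial size of baseModel.output: 224 // 32 = 7, hence pool_size=(7, 7)
spatial = input_size // RESNET50_STRIDE
# after 7x7 average pooling the map is 1x1, so Flatten yields 2048 features
flattened = (spatial // 7) * (spatial // 7) * RESNET50_CHANNELS
# trainable parameters: Dense(512) followed by Dense(num_classes)
head_params = dense_params(flattened, 512) + dense_params(512, num_classes)

print(spatial, flattened, head_params)  # 7 2048 1050627
```

With a different input size the pooling window would have to change accordingly, otherwise Flatten would produce a different feature count.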

Now we're ready to run the federated learning experiment. 
We set the aggregator to `FedAvgAggregator()`, so that the global model is the average of the locally trained models; we then create the federated government and run the FL rounds.
Each client trains a local model on its own private data for a single epoch (`epochs_per_FL_round=1`); the local models are then aggregated into the global model, which completes one FL round. Note that this step is computationally demanding and may take several minutes, depending on the machine used:
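The aggregation step can be illustrated with a minimal sketch of federated averaging. It is not Sherpa.FL's code: it assumes each client's parameters are a list of layers, each flattened into a list of floats, and takes the plain (unweighted) mean per layer. The original FedAvg weights each client by its number of samples; with IID shards of nearly equal size, as here, the two give almost the same result.

```python
def fed_avg(client_params):
    # client_params: one entry per client, each a list of layers,
    # each layer a flat list of floats (a simplification of Keras weight arrays)
    num_clients = len(client_params)
    num_layers = len(client_params[0])
    aggregated = []
    for layer in range(num_layers):
        layer_values = [client[layer] for client in client_params]
        # element-wise mean of this layer across all clients
        aggregated.append([sum(vals) / num_clients for vals in zip(*layer_values)])
    return aggregated


clients = [
    [[1.0, 2.0], [0.0]],  # client A: two layers
    [[3.0, 4.0], [1.0]],  # client B: same architecture
]
print(fed_avg(clients))  # [[2.0, 3.0], [0.5]]
```

The averaged weights are then sent back to every client as the starting point of the next round.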

In [4]:
# Train the network:
epochs_per_FL_round=1
aggregator = shfl.federated_aggregator.FedAvgAggregator()
federated_government = shfl.federated_government.FederatedGovernment(model_builder, federated_data, aggregator)
if args["train_network"]:
    federated_government.run_rounds(args["federated_rounds"], test_data, test_label)
else:
    print("[INFO] loading pre-computed model ...")
    model_path = os.path.join(args["output_path"], args["model_name"])
    federated_government.global_model._model = load_model(model_path)
    with open(os.path.join(args["output_path"], args["label_bin"]), "rb") as f:
        lb = pickle.load(f)
    
# evaluate the federated network
print("[INFO] evaluating federated network...")
predictions = federated_government.global_model.predict(data=test_data.astype("float32"))
print(classification_report(test_labels.argmax(axis=1),
    predictions, target_names=lb.classes_))

Accuracy round 0
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x7f559c0e7358>: [0.6679731011390686, 0.7149758338928223]
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x7f559c0e7278>: [0.6620099544525146, 0.729468584060669]
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x7f559c0e72e8>: [0.584438145160675, 0.8140096664428711]
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x7f559c0e7668>: [0.874295711517334, 0.5410627722740173]
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x7f559c0e7748>: [0.603131890296936, 0.7657004594802856]
Global model test performance : [0.6145201325416565, 0.7415459156036377]



Accuracy round 1
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x7f559c0e7358>: [0.40514668822288513, 0.8574879169464111]
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x7f559c0e7278>: [0.4501892924308777, 0.8285024166107178]
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x7f559c0e72e8>: [0.5425381660461426, 0.748792290687561]
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x7f559c0e7668>: [0.433407723903656, 0.8550724387168884]
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x7f559c0e7748>: [0.5610058307647705, 0.7536231875419617]
Global model test performance : [0.4516211748123169, 0.8429951667785645]



Accuracy round 2
Test performance client <shfl.private.federated_operation.FederatedDataNode object at 0x7f559c0

We evaluate the performance of the trained network on the test set.
The federated model is compared with a centralized model which, for a fair comparison, is trained for as many epochs as there are FL rounds (training the centralized model is faster than the federated case, since only a single model is trained).
The two models exhibit similar performance: the federated global model reaches a test accuracy of about 0.84 in round 1 of the log above, against 0.83 for the centralized model. This suggests that the FL approach can produce an accurate model while the clients never share their raw data.

In [5]:
# Comparison with Centralized experiment: 
if args["train_network"]:
    epochs_per_FL_round=args["federated_rounds"]
    centralized_model = model_builder()
    centralized_model.train(train_data, train_labels)

    # evaluate the centralized network
    print("[INFO] evaluating centralized network...")
    predictions_centralized = centralized_model.predict(data=test_data.astype("float32"))
    print(classification_report(test_labels.argmax(axis=1),
        predictions_centralized, target_names=lb.classes_))

Epoch 1/3
Epoch 2/3
Epoch 3/3
[INFO] evaluating centralized network...
                precision    recall  f1-score   support

      football       0.84      0.87      0.85       155
        tennis       0.79      0.83      0.81       134
weight_lifting       0.86      0.78      0.82       125

      accuracy                           0.83       414
     macro avg       0.83      0.83      0.83       414
  weighted avg       0.83      0.83      0.83       414
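`classification_report` compares the argmax of the one-hot test labels with the predicted class indices. The per-class figures follow the standard precision, recall and F1 definitions, which this small stdlib sketch computes on a hypothetical four-sample example (a score is set to 0 when its denominator is 0).

```python
def argmax(row):
    return max(range(len(row)), key=row.__getitem__)


def per_class_report(y_true, y_pred, classes):
    report = {}
    for k, name in enumerate(classes):
        tp = sum(1 for t, p in zip(y_true, y_pred) if t == k and p == k)
        fp = sum(1 for t, p in zip(y_true, y_pred) if t != k and p == k)
        fn = sum(1 for t, p in zip(y_true, y_pred) if t == k and p != k)
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        # support = number of true samples of this class
        report[name] = (precision, recall, f1, tp + fn)
    accuracy = sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)
    return report, accuracy


# hypothetical one-hot labels and predicted class indices
one_hot_labels = [[1, 0, 0], [0, 1, 0], [0, 0, 1], [1, 0, 0]]
y_true = [argmax(r) for r in one_hot_labels]  # [0, 1, 2, 0]
y_pred = [0, 1, 0, 0]
report, acc = per_class_report(y_true, y_pred, ["football", "tennis", "weight_lifting"])
print(acc)                # 0.75
print(report["tennis"])   # (1.0, 1.0, 1.0, 1)
```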



Training the federated model can be time-consuming since each client has to train its own local model, even though each one uses less data.
We can serialize the trained model to disk for later use in video classification:

In [9]:
# serialize the model and the label binarizer to disk
if args["train_network"]:
    model_path = os.path.join(args["output_path"], args["model_name"])
    federated_government.global_model._model.save(model_path, save_format="h5")
    with open(os.path.join(args["output_path"], args["label_bin"]), "wb") as f:
        pickle.dump(lb, f)
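The `train_network=False` branch earlier in the notebook reads back exactly the files written here. A minimal stdlib round trip of the label-binarizer pickle, using a hypothetical dictionary as a stand-in for the fitted `LabelBinarizer` and a temporary directory in place of `output_path`:

```python
import os
import pickle
import tempfile

# hypothetical stand-in for the fitted LabelBinarizer; any picklable object behaves the same
lb_stand_in = {"classes_": ["football", "tennis", "weight_lifting"]}

with tempfile.TemporaryDirectory() as output_path:
    label_bin = os.path.join(output_path, "lb.pickle")
    # write, as in the serialization cell
    with open(label_bin, "wb") as f:
        pickle.dump(lb_stand_in, f)
    # read back, as in the train_network=False branch
    with open(label_bin, "rb") as f:
        restored = pickle.load(f)

print(restored == lb_stand_in)  # True
```

Since unpickling can execute arbitrary code, only load `lb.pickle` files you produced yourself.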

## Prediction: classification of videos
We can now check the classification of some sample videos.
Three videos are used, one each for tennis, weight lifting, and football.
In the present approach, each frame is classified on its own.
To exploit the temporal nature of the sequence, each frame is assigned the label with the highest average predicted probability over the last `size_averaging` frames (including the current one).
If `size_averaging=1`, no averaging is performed, which results in the naive approach of classifying each frame of the video independently.
This can cause the predicted label to flicker between classes, so the user is invited to experiment with the `size_averaging` parameter:
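The effect of `size_averaging` can be seen without a model or a video. This sketch uses the same rolling-window idea as the cell below (a `deque` with `maxlen`), applied to hypothetical per-frame class probabilities that contain one noisy frame:

```python
from collections import deque


def smoothed_labels(frame_probs, classes, size_averaging):
    # keep only the last `size_averaging` predictions, as in the video loop
    queue = deque(maxlen=size_averaging)
    labels = []
    for probs in frame_probs:
        queue.append(probs)
        n = len(queue)
        # average probability of each class over the current window
        mean = [sum(p[k] for p in queue) / n for k in range(len(classes))]
        labels.append(classes[max(range(len(classes)), key=mean.__getitem__)])
    return labels


classes = ["football", "tennis", "weight_lifting"]
probs = [
    [0.1, 0.8, 0.1],
    [0.1, 0.8, 0.1],
    [0.6, 0.3, 0.1],  # a single noisy frame
    [0.1, 0.8, 0.1],
]
print(smoothed_labels(probs, classes, 1))  # ['tennis', 'tennis', 'football', 'tennis']
print(smoothed_labels(probs, classes, 3))  # ['tennis', 'tennis', 'tennis', 'tennis']
```

A larger window suppresses flicker but reacts more slowly when the activity in the video actually changes.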

In [8]:
# Predict video
from collections import deque
import errno

args["input_video"] = "../data/video_classification/example_clips/tennis.mp4"
args["output_video"] = os.path.join(args["output_path"], "tennis_classified.avi")
args["size_averaging"] = 100

model = federated_government.global_model._model

# initialize the predictions queue used for averaging
Q = deque(maxlen=args["size_averaging"])

# make sure the input video exists before opening it
if not os.path.isfile(args["input_video"]):
    raise FileNotFoundError(
        errno.ENOENT, os.strerror(errno.ENOENT), args["input_video"])

# initialize the video stream, pointer to output video file, and
# frame dimensions
vs = cv2.VideoCapture(args["input_video"])
writer = None
(W, H) = (None, None)

# loop over frames from the video file stream
print("[INFO] predicting video ..." )
while True:
    # read the next frame from the file
    (grabbed, frame) = vs.read()

    # if the frame was not grabbed, then we have reached the end
    # of the stream
    if not grabbed:
        break

    # if the frame dimensions are empty, grab them
    if W is None or H is None:
        (H, W) = frame.shape[:2]

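    # clone the output frame, then convert it from BGR to RGB
    # ordering and resize the frame to a fixed 224x224. No mean
    # subtraction is applied: the ImageDataGenerator `mean` attributes
    # set before training have no effect (featurewise_center is off),
    # so the network was trained and evaluated on raw pixel values
    output = frame.copy()
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    frame = cv2.resize(frame, (224, 224)).astype("float32")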

    # make predictions on the frame and then update the predictions
    # queue
    preds = model.predict(np.expand_dims(frame, axis=0))
    Q.append(preds)

    # perform prediction averaging over the current history of
    # previous predictions
    results = np.array(Q).mean(axis=0)
    i = np.argmax(results)
    label = lb.classes_[i]

    # draw the activity on the output frame
    text = "activity: {}".format(label)
    cv2.putText(output, text, (35, 50), cv2.FONT_HERSHEY_SIMPLEX,
        1.25, (0, 255, 0), 5)

    # check if the video writer is None
    if writer is None:
        # initialize our video writer
        fourcc = cv2.VideoWriter_fourcc(*"MJPG")
        writer = cv2.VideoWriter(args["output_video"], fourcc, 30,
            (W, H), True)

    # write the output frame to disk
    writer.write(output)


# release the file pointers (the writer only exists if a frame was read)
print("[INFO] cleaning up...")
if writer is not None:
    writer.release()
vs.release()

[INFO] predicting video ...
[INFO] cleaning up...


## Concluding remarks
The goal of the present notebook is to train a neural network for video classification in the **federated context** using the Sherpa.FL framework.
It is shown that the procedure is straightforward using the framework classes.
In this experiment, the network is trained on individual frames (images) rather than on videos, and therefore does not exploit any spatio-temporal information.
More advanced approaches can be used when training on videos (see e.g. [Karpathy et al., 2014](https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/42455.pdf) and [Simonyan and Zisserman, 2014](https://arxiv.org/pdf/1406.2199.pdf)).
Adopting them would only change how the local model is set up and trained; the creation of the federated experiment with the Sherpa.FL framework would remain exactly the same.