# 04b - Multiclass Federated Learning with Edge IIoT Dataset using Flower and TensorFlow/Keras

In this notebook we use the Flower Federated Learning library (flower.dev) with Tensorflow/Keras to distribute the Edge-IIoT data across multiple clients in various different ways.

In [40]:
### THIS SECTION NEEDS TO BE SET TO DETERMINE WHICH CONFIGURATION METHOD TO UTILISE

SPLIT_AVAILABLE_METHODS = ['INDIVIDUAL_ATTACK', 'ATTACK_GROUP', 'STRATIFIED']
METHOD = 'STRATIFIED'
NUM_OF_STRATIFIED_CLIENTS = 10 # only applies to stratified method
print (METHOD)

STRATIFIED


In [41]:
%%capture
%pip install flwr[simulation] torch torchvision matplotlib sklearn openml

In [74]:
import os
import pandas as pd
import numpy as np
import flwr as fl
import sklearn
from sklearn import preprocessing
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
from sklearn.metrics import accuracy_score

import torch
import torch.nn as nn
import torchvision
import torch.nn.functional as F
import torchvision.transforms as transforms
from flwr.common import Metrics
from torch.utils.data import DataLoader, random_split
from torchvision.datasets import CIFAR10


In [43]:
print("flwr", fl.__version__)
print("numpy", np.__version__)
print("torch", torch.__version__)
print("torchvision", torchvision.__version__)

DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"Training on {DEVICE}")

flwr 1.3.0
numpy 1.24.2
torch 1.13.1
torchvision 0.14.1
Training on cuda:0


In [44]:
dataset_path = "../datasets/Edge-IIoT/"

df = pd.read_pickle(dataset_path + "Edge-IIoTset dataset/Selected dataset for ML and DL/ML-EdgeIIoT-dataset.pkl")   

## Make dataframe for Multi-class classification

In [45]:
# Multiclass attack dataframe
multiclass_df = df[['frame.time', 'ip.src_host', 'ip.dst_host', 'arp.dst.proto_ipv4',
       'arp.opcode', 'arp.hw.size', 'arp.src.proto_ipv4', 'icmp.checksum',
       'icmp.seq_le', 'icmp.transmit_timestamp', 'icmp.unused',
       'http.file_data', 'http.content_length', 'http.request.uri.query',
       'http.request.method', 'http.referer', 'http.request.full_uri',
       'http.request.version', 'http.response', 'http.tls_port', 'tcp.ack',
       'tcp.ack_raw', 'tcp.checksum', 'tcp.connection.fin',
       'tcp.connection.rst', 'tcp.connection.syn', 'tcp.connection.synack',
       'tcp.dstport', 'tcp.flags', 'tcp.flags.ack', 'tcp.len', 'tcp.options',
       'tcp.payload', 'tcp.seq', 'tcp.srcport', 'udp.port', 'udp.stream',
       'udp.time_delta', 'dns.qry.name', 'dns.qry.name.len', 'dns.qry.qu',
       'dns.qry.type', 'dns.retransmission', 'dns.retransmit_request',
       'dns.retransmit_request_in', 'mqtt.conack.flags',
       'mqtt.conflag.cleansess', 'mqtt.conflags', 'mqtt.hdrflags', 'mqtt.len',
       'mqtt.msg_decoded_as', 'mqtt.msg', 'mqtt.msgtype', 'mqtt.proto_len',
       'mqtt.protoname', 'mqtt.topic', 'mqtt.topic_len', 'mqtt.ver',
       'mbtcp.len', 'mbtcp.trans_id', 'mbtcp.unit_id',
       'Attack_type']]

In [46]:
known_sensor_ip_addresses = [ '192.168.0.101', '192.168.2.194', '192.168.3.18', '192.168.4.73', '192.168.5.47', '192.168.6.56', '192.768.7.62', '192.168.8.163']
print ("known_sensor_ip_addresses:", known_sensor_ip_addresses)

tcp_dos_attack_ip_addresses = [ '207.192.25.133', '94.196.109.185', '133.149.252.77', '220.146.94.148' ]
tdp_dos_atack_ip_addresses = [ '190.123.219.128', '16.226.184.201', '153.125.214.15', '91.184.12.91' ]
http_attack_ip_addresses = [ '192.168.0.170', '216.58.198.74' ]
icmp_flood_attack_ip_addresses = [ '213.117.18.213', '183.223.100.122', '166.153.227.121', '49.81.59.152', '227.117.33.125' ]
port_scan_attack_ip_addresses = [ '192.168.0.170' ]
os_fingerprinting_attack_ip_addresses = [ '192.168.0.170' ]
vuln_scan_attack_ip_addresses = [ '192.168.0.170', '142.250.200.205', '172.217.19.35', '142.250.201.10' ]
dns_spoof_attack_ip_addresses = [ '192.168.0.101', '192.168.0.152', '172.217.19.35', '192.168.0.170' ]
arp_spoof_attack_ip_addresses = [ '192.168.0.101', '192.168.0.152', '172.217.19.35', '192.168.0.170' ]
xss_attack_ip_addresses = [ '192.168.0.170', '172.217.19.42', '104.16.87.20' ]
sql_injection_attack_ip_addresses = [ '192.168.0.170' ]
upload_attack_ip_addresses = [ '192.168.0.170' ]
backdoor_attack_ip_addresses = [ '192.168.0.170' ]
password_attack_ip_addresses = [ '192.168.0.170' ]
ransomware_attack_ip_addresses = [ '192.168.0.170' ] 

# Combine all attack IP addresses into one list, ensuring no duplicates
known_attacker_ip_addresses = list(set(tcp_dos_attack_ip_addresses + tdp_dos_atack_ip_addresses + http_attack_ip_addresses + icmp_flood_attack_ip_addresses + port_scan_attack_ip_addresses + os_fingerprinting_attack_ip_addresses + vuln_scan_attack_ip_addresses + dns_spoof_attack_ip_addresses + arp_spoof_attack_ip_addresses + xss_attack_ip_addresses + sql_injection_attack_ip_addresses + upload_attack_ip_addresses + backdoor_attack_ip_addresses + password_attack_ip_addresses + ransomware_attack_ip_addresses))
print (f"known_attacker_ip_addresses: \nNumber of IPs {len(known_attacker_ip_addresses)}\n{known_attacker_ip_addresses}")


known_sensor_ip_addresses: ['192.168.0.101', '192.168.2.194', '192.168.3.18', '192.168.4.73', '192.168.5.47', '192.168.6.56', '192.768.7.62', '192.168.8.163']
known_attacker_ip_addresses: 
Number of IPs 22
['142.250.200.205', '94.196.109.185', '192.168.0.170', '213.117.18.213', '91.184.12.91', '16.226.184.201', '216.58.198.74', '227.117.33.125', '153.125.214.15', '166.153.227.121', '104.16.87.20', '142.250.201.10', '220.146.94.148', '183.223.100.122', '133.149.252.77', '172.217.19.35', '192.168.0.152', '192.168.0.101', '207.192.25.133', '172.217.19.42', '190.123.219.128', '49.81.59.152']


With the exception of DDOS attacks, the attacks mainly come from a small subset of IP addresses. `192.168.0.170` being responsible for a lot of the attacks. From the data exploration workbook `02b-ML-Data-Exploration.ipynb` we can also see that the attacked IP is always the `192.168.0.128` edge server. This means it is not feasible the divide the traffic either by attacker or attacked IP address.

## Multiclass Classification

Categorical data encoding (Dummy Encoding):

EG. Takes a product category and converts it to a binary vector

In [47]:
def encode_text_dummy(df, name):

    dummies = pd.get_dummies(df[name])

    for x in dummies.columns:

        dummy_name = f"{name}-{x}"

        df[dummy_name] = dummies[x]

    df.drop(name, axis=1, inplace=True)

encode_text_dummy(multiclass_df,'http.request.method')

encode_text_dummy(multiclass_df,'http.referer')

encode_text_dummy(multiclass_df,"http.request.version")

encode_text_dummy(multiclass_df,"dns.qry.name.len")

encode_text_dummy(multiclass_df,"mqtt.conack.flags")

encode_text_dummy(multiclass_df,"mqtt.protoname")

encode_text_dummy(multiclass_df,"mqtt.topic")

In [48]:
# print max index value of binary_df
print("max index value of binary_df:", max(multiclass_df.index))
print(multiclass_df.shape)

max index value of binary_df: 157799
(157800, 90)


We need to drop some unrequired columns from the DF, but we need to keep the original around as we may need to split the data differently for different models based on things like the IP address

In [49]:
#drop_columns = ["frame.time", "ip.src_host", "ip.dst_host", "arp.src.proto_ipv4","arp.dst.proto_ipv4", 
drop_columns = ["frame.time", "arp.src.proto_ipv4","arp.dst.proto_ipv4", 

         "http.file_data","http.request.full_uri","icmp.transmit_timestamp",

         "http.request.uri.query", "tcp.options","tcp.payload","tcp.srcport",

         "tcp.dstport", "udp.port", "mqtt.msg"]

multiclass_df = multiclass_df.drop(drop_columns, axis=1)

multiclass_df = multiclass_df.dropna(axis=0, how='any')

multiclass_df = multiclass_df.drop_duplicates(subset=None, keep="first")

# We cant shuffle at this point as we need to keep the order so we can split the dataset later based on things like IP address
#binary_df_copy = shuffle(binary_df_copy)

# Compute the number of missing values (NaN or null) in each column of a pandas DataFrame object named df.
multiclass_df.isna().sum()

ip.src_host                            0
ip.dst_host                            0
arp.opcode                             0
arp.hw.size                            0
icmp.checksum                          0
                                      ..
mqtt.protoname-0.0                     0
mqtt.protoname-MQTT                    0
mqtt.topic-0                           0
mqtt.topic-0.0                         0
mqtt.topic-Temperature_and_Humidity    0
Length: 77, dtype: int64

In [50]:
# print max index value of binary_df
print("max index value of binary_df:", max(multiclass_df.index))
print(multiclass_df.shape)

multiclass_df.reset_index(drop=True, inplace=True)

# print max index value of binary_df
print("max index value of binary_df:", max(multiclass_df.index))
print(multiclass_df.shape)


max index value of binary_df: 157799
(153225, 77)
max index value of binary_df: 153224
(153225, 77)


Change the Attack Type to be a unique number for the attack type

In [51]:
# Creating a dictionary of Types
attacks = {'Normal': 0 ,'Backdoor' :1, 'DDoS_HTTP':2,  'DDoS_ICMP':3, 'DDoS_TCP':4, 'DDoS_UDP':5, 
           'Fingerprinting':6, 'MITM':7, 'Password':8, 'Port_Scanning':9, 'Ransomware':10, 
           'SQL_injection':11, 'Uploading':12, 'Vulnerability_scanner':13, 'XSS':14}

multiclass_df['Attack_type'] = multiclass_df['Attack_type'].map(attacks)


In [52]:
label = multiclass_df['Attack_type']
le = preprocessing.LabelEncoder()
label_n = le.fit_transform(label.values)

# Stratify based on the attack label to balance the dataset - This is our original copy of the data include IP addresses
X_train_df, X_test_df, y_train_df, y_test_df = train_test_split(multiclass_df, label_n, stratify=label_n, test_size=0.2, random_state=42)

print(X_train_df.shape)

X_train_df.reset_index(drop=True, inplace=True)

#print the max index of X_train_df
print(X_train_df.index.max())

#  print the max index of multiclass_df
print(multiclass_df.index.max())

(122580, 77)
122579
153224


In [60]:
print(label_n)

[7 7 7 ... 3 3 3]


In [62]:
multiclass_df_copy = multiclass_df.copy()

multiclass_df_copy = multiclass_df_copy.drop(["ip.src_host", "ip.dst_host", "Attack_type"], axis=1)

# This is our copy of the data without IP addresses
scaled_features = StandardScaler().fit_transform(multiclass_df_copy.values)
X_train, X_test, y_train, y_test = train_test_split(scaled_features, label_n, stratify=label_n, test_size=0.2, random_state=42)

print ("Train:", X_train.shape, y_train.shape)
print ("Test:", X_test.shape, y_test.shape)


Train: (122580, 74) (122580,)
Test: (30645, 74) (30645,)


In [63]:
fl_X_train = []
fl_y_train = []

if METHOD == 'STRATIFIED':
    # Stratfiy the dataset
    from sklearn.model_selection import StratifiedKFold

    skf = StratifiedKFold(n_splits=NUM_OF_STRATIFIED_CLIENTS, shuffle=True, random_state=42)
    skf.get_n_splits(X_train, y_train)

    for _, train_index in skf.split(X_train, y_train):
        print("TRAIN:", train_index)
        X_np = X_train[train_index]
        y_np = y_train[train_index]

        fl_X_train.append(X_np)
        fl_y_train.append(y_np)

else: # UNUSED
    # Individual IP address
    for ip in known_sensor_ip_addresses:
        new_ip = [ip]
        print("new_ip:", new_ip)

        X_train_df['ip.src_host']
        
        print("Shape X_Train:", X_train.shape)
        print("Shape y_Train:", y_train.shape)

        # Filter dataframe by IP address
        new_df_src = X_train_df[ X_train_df['ip.src_host'].isin(new_ip) ]
        new_df_dst = X_train_df[ X_train_df['ip.dst_host'].isin(new_ip) ]

        print("Shape new_df_src:", new_df_src.shape)
        print("Shape new_df_dst:", new_df_dst.shape)

        X_np = np.vstack([ X_train[ new_df_src.index, : ], X_train[ new_df_dst.index, :] ])
        y_np = np.hstack([ y_train[ new_df_src.index ], y_train[ new_df_dst.index ] ])

        print ("x_np:", X_np.shape)
        print ("y_np:", y_np.shape)

        fl_X_train.append(X_np)
        fl_y_train.append(y_np)


TRAIN: [     0     19     25 ... 122549 122557 122567]
TRAIN: [     2     13     21 ... 122544 122568 122573]
TRAIN: [     6     12     20 ... 122574 122576 122577]
TRAIN: [    35     56     73 ... 122554 122565 122570]
TRAIN: [     3      4      5 ... 122536 122572 122579]
TRAIN: [     7     11     14 ... 122533 122563 122575]
TRAIN: [    10     50     61 ... 122561 122564 122571]
TRAIN: [     9     18     24 ... 122550 122555 122560]
TRAIN: [     1     15     26 ... 122547 122562 122569]
TRAIN: [    17     29     32 ... 122559 122566 122578]


In [64]:
# Print out the size of the fl_X_train
print ("fl_X_train size:", len(fl_X_train))

# Print out the size of each element in the fl_X_train
for i in range(len(fl_X_train)):
    print ("fl_X_train[", i, "]:", fl_X_train[i].shape)
    print ("fl_y_train[", i, "]:", fl_y_train[i].shape)


fl_X_train size: 10
fl_X_train[ 0 ]: (12258, 74)
fl_y_train[ 0 ]: (12258,)
fl_X_train[ 1 ]: (12258, 74)
fl_y_train[ 1 ]: (12258,)
fl_X_train[ 2 ]: (12258, 74)
fl_y_train[ 2 ]: (12258,)
fl_X_train[ 3 ]: (12258, 74)
fl_y_train[ 3 ]: (12258,)
fl_X_train[ 4 ]: (12258, 74)
fl_y_train[ 4 ]: (12258,)
fl_X_train[ 5 ]: (12258, 74)
fl_y_train[ 5 ]: (12258,)
fl_X_train[ 6 ]: (12258, 74)
fl_y_train[ 6 ]: (12258,)
fl_X_train[ 7 ]: (12258, 74)
fl_y_train[ 7 ]: (12258,)
fl_X_train[ 8 ]: (12258, 74)
fl_y_train[ 8 ]: (12258,)
fl_X_train[ 9 ]: (12258, 74)
fl_y_train[ 9 ]: (12258,)


In [65]:
len(label.unique()) 

15

In [70]:
NUM_OF_CLIENTS = len(fl_X_train)
print("NUM_OF_CLIENTS:", NUM_OF_CLIENTS)    

NUM_OF_ROUNDS = 15

print("Checking data split groups")
for i in range(len(fl_X_train)):
    print(i, ":", "X Shape", fl_X_train[i].shape, "Y Shape", fl_y_train[i].shape)

print("\nDeploy Simulation")


NUM_OF_CLIENTS: 10
Checking data split groups
0 : X Shape (12258, 74) Y Shape (12258,)
1 : X Shape (12258, 74) Y Shape (12258,)
2 : X Shape (12258, 74) Y Shape (12258,)
3 : X Shape (12258, 74) Y Shape (12258,)
4 : X Shape (12258, 74) Y Shape (12258,)
5 : X Shape (12258, 74) Y Shape (12258,)
6 : X Shape (12258, 74) Y Shape (12258,)
7 : X Shape (12258, 74) Y Shape (12258,)
8 : X Shape (12258, 74) Y Shape (12258,)
9 : X Shape (12258, 74) Y Shape (12258,)

Deploy Simulation


# FL Part

NOTE TO SELF - BUILD IN F1 SCORE  - https://www.kaggle.com/code/gpiosenka/flower-classification-f1-score-93


In [83]:
import os
import flwr as fl
import numpy as np
import tensorflow as tf

from trans import *

print('scikit-learn {}.'.format(sklearn.__version__))
print("flwr", fl.__version__)
print("numpy", np.__version__)
print("tf", tf.__version__)
# Make TensorFlow log less verbose
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Activation

class NumpyFlowerClient(fl.client.NumPyClient):
    def __init__(self, cid, model, train_data, train_labels):
        self.model = model
        self.cid = cid
        self.train_data = train_data
        self.train_labels = train_labels

    def get_parameters(self, config):
        return self.model.get_weights()

    def fit(self, parameters, config):
        self.model.set_weights(parameters)
        print ("Client ", self.cid, "Training...")
        self.model.fit(self.train_data, self.train_labels, epochs=1, batch_size=32)
        print ("Client ", self.cid, "Training complete...")
        return self.model.get_weights(), len(self.train_data), {}

    def evaluate(self, parameters, config):
        self.model.set_weights(parameters)
        print ("Client ", self.cid, "Evaluating...")
        loss, accuracy = self.model.evaluate(self.train_data, self.train_labels, batch_size=32)
        print ("Client ", self.cid, "Evaluating complete...", accuracy, loss)
        return loss, len(self.train_data), {"accuracy": accuracy}
    
    def predict(self, incoming):
        prediction = np.argmax( self.model.predict(incoming) ,axis=1)
        return prediction

def client_fn(cid: str) -> NumpyFlowerClient:
    """Create a Flower client representing a single organization."""

    # Load model
    #model = tf.keras.applications.MobileNetV2((32, 32, 3), classes=10, weights=None)
    #model.compile("adam", "sparse_categorical_crossentropy", metrics=["accuracy"])

    print ("Client ID:", cid)

    model = Sequential([
      #Flatten(input_shape=(79,1)),
      Flatten(input_shape=(fl_X_train[0].shape[1] , 1)),
      Dense(256, activation='sigmoid'),
      Dense(128, activation='sigmoid'), 
      #Dense(18, activation='sigmoid'),  
      Dense(len(label.unique()), activation='sigmoid')
    ])
    
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

   
    partition_id = int(cid)
    X_train_c = fl_X_train[partition_id]
    y_train_c = fl_y_train[partition_id]

    # Create a  single Flower client representing a single organization
    return NumpyFlowerClient(cid, model, X_train_c, y_train_c)


print ("Deploy simulation...")

from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
eval_count = 0

def get_evaluate_fn(server_model):
    global eval_count
    """Return an evaluation function for server-side evaluation."""
    # The `evaluate` function will be called after every round
    
    
    def evaluate(server_round, parameters, config):
        global eval_count
        
        # Update model with the latest parameters
        server_model.set_weights(parameters)
        print ("Server Evaluating...", eval_count)
        loss, accuracy = server_model.evaluate(X_test, y_test)
        
        y_pred = server_model.predict(X_test)
        print ("Prediction: ", y_pred, y_pred.shape)
        #cmatrix = confusion_matrix(y_test, np.rint(y_pred))
        #print ("confusion_matrix:", cmatrix, cmatrix.shape)
                        
        print ("Server Evaluating complete...", accuracy, loss)
        
        np.save("y_pred-" + str(eval_count) + ".npy", y_pred)
        #np.save("cmatrix-" + str(eval_count) + ".npy", cmatrix)
        eval_count = eval_count + 1
        
        return loss, {"accuracy": accuracy}
    return evaluate



server_model = Sequential([
      #Flatten(input_shape=(79,1)),
      Flatten(input_shape=(fl_X_train[0].shape[1] , 1)),
      Dense(256, activation='sigmoid'),
      Dense(128, activation='sigmoid'), 
      #Dense(18, activation='sigmoid'),  
      Dense(len(label.unique()), activation='sigmoid')
    ])
server_model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])



# Create FedAvg strategy
strategy = fl.server.strategy.FedAvg(
        fraction_fit=1.0,
        fraction_evaluate=0.5,
        min_fit_clients=2, #10,
        min_evaluate_clients=2, #5,
        min_available_clients=2, #10,
        evaluate_fn=get_evaluate_fn(server_model),
        #evaluate_metrics_aggregation_fn=weighted_average,
)

# Start simulation
fl.simulation.start_simulation(
    client_fn=client_fn,
    num_clients=NUM_OF_CLIENTS,
    config=fl.server.ServerConfig(num_rounds=NUM_OF_ROUNDS),
    strategy=strategy,
)

INFO flwr 2023-07-06 14:06:24,844 | app.py:145 | Starting Flower simulation, config: ServerConfig(num_rounds=15, round_timeout=None)


scikit-learn 1.2.0.
flwr 1.3.0
numpy 1.24.2
tf 2.11.0
Deploy simulation...


2023-07-06 14:06:32,309	INFO worker.py:1529 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8266 [39m[22m
INFO flwr 2023-07-06 14:06:35,594 | app.py:179 | Flower VCE: Ray initialized with resources: {'CPU': 24.0, 'GPU': 1.0, 'object_store_memory': 3460152115.0, 'node:127.0.0.1': 1.0, 'memory': 6920304231.0}
INFO flwr 2023-07-06 14:06:35,595 | server.py:86 | Initializing global parameters
INFO flwr 2023-07-06 14:06:35,595 | server.py:270 | Requesting initial parameters from one random client


[2m[36m(launch_and_get_parameters pid=49848)[0m Client ID: 2


INFO flwr 2023-07-06 14:06:41,480 | server.py:274 | Received initial parameters from one random client
INFO flwr 2023-07-06 14:06:41,481 | server.py:88 | Evaluating initial parameters


ValueError: You called `set_weights(weights)` on layer "sequential_7" with a weight list of length 262, but the layer was expecting 6 weights. Provided weights: [array([[[[-0.08516684, -0.04290653, -0.05079073, ...