In [23]:
## The NSL-KDD loading and preprocessing code is adapted from https://www.kaggle.com/code/timgoodfellow/nsl-kdd-explorations

In [24]:
import ipyparallel as ipp
# Number of MPI engines to launch.
n = 4

# Start the cluster and get back a client that is already connected to it.
rc = ipp.Cluster(n=n, engines="mpi").start_and_connect_sync()

# View obtained by slicing the client over all of its engines.
view = rc[:]

# Displayed as the cell output: the ids of the connected engines.
rc.ids

Starting 4 engines with <class 'ipyparallel.cluster.launcher.MPIEngineSetLauncher'>


  0%|          | 0/4 [00:00<?, ?engine/s]

[0, 1, 2, 3]

In [25]:
%%px --block
import pandas as pd
import tensorflow as tf
import numpy as np
from mpi4py import MPI
# Seed NumPy's global RNG on every engine.
np.random.seed(482)

# Shorthand handles onto the MPI world communicator; later cells read
# `rank` and `size` directly.
mpi = MPI.COMM_WORLD
rank, size = mpi.rank, mpi.size
bcast, barrier = mpi.bcast, mpi.barrier
print("MPI rank: %i/%i" % (rank, size))

[stdout:0] MPI rank: 0/4


[stdout:2] MPI rank: 2/4


[stdout:3] MPI rank: 3/4


[stdout:1] MPI rank: 1/4


## Helper Functions

In [26]:
%%px --block
# Implement Custom Loss Function
@tf.function
def consensus_loss(y_true, y_pred, z, l2):
    """Binary cross-entropy on the labels plus a weighted consensus term.

    Args:
        y_true: Ground-truth labels for the batch.
        y_pred: This model's predictions for the batch.
        z: Consensus targets the predictions are compared against.
        l2: Weight applied to the consensus term.

    Returns:
        ``BCE(y_true, y_pred) + l2 * BCE(z, y_pred)``.
    """
    # How well the model fits its own labels.
    label_term = tf.keras.losses.BinaryCrossentropy()(y_true, y_pred)

    # How far the model's predictions are from the consensus targets.
    agreement_term = tf.keras.losses.BinaryCrossentropy()(z, y_pred)

    return label_term + l2 * agreement_term

In [27]:
 %%px --block
# Play around with this more
def set_learning_rate1(optimizer, epoch):
    """Set ``optimizer.lr`` from a fixed piecewise-constant epoch schedule.

    Fixes the original ``if``/``if`` chain, in which epochs ``<= 30`` were
    first assigned 0.0025 and then immediately overwritten with the final
    fallback rate (0.00001) by the second chain's ``else`` branch.

    Args:
        optimizer: Object whose ``lr`` attribute is assigned.
        epoch: Current epoch number.

    Returns:
        None. ``optimizer.lr`` is updated in place.
    """
    # (inclusive upper epoch bound, learning rate), in ascending order.
    schedule = (
        (30, 0.0025),
        (100, 0.0015),
        (200, 0.001),
        (300, 0.0005),
        (400, 0.00045),
        (450, 0.00005),
    )
    for last_epoch, rate in schedule:
        if epoch <= last_epoch:
            optimizer.lr = rate
            return

    # Past the last scheduled boundary.
    optimizer.lr = 0.00001

In [28]:
 %%px --block
# Play around with this more
def set_learning_rate2(optimizer, epoch):
    """Multiply ``optimizer.lr`` by ``exp(-0.1)`` for every epoch from 1 onwards.

    Args:
        optimizer: Object whose ``lr`` attribute is read and reassigned.
        epoch: Current epoch number; nothing happens unless ``epoch >= 1``.

    Returns:
        None. ``optimizer.lr`` is updated in place.
    """
    if not epoch >= 1:
        return

    decay = tf.math.exp(-0.1)
    optimizer.lr = optimizer.lr * decay

In [29]:
%%px --block
def data_pre_process(rank, size, coordination_size, coord_bs, train_bs):
    """Load NSL-KDD and build this worker's train/test/coordination datasets.

    Reads ``Data/KDDTrain+.txt`` and ``Data/KDDTest+.txt``, one-hot encodes the
    categorical columns, min-max scales every feature with the training set's
    statistics, and derives a binary label (normal vs. attack) and a 5-class
    label (attack family). The first ``coordination_size`` training rows form
    the coordination set; the remaining rows are split into ``size``
    contiguous shards and only the shard at index ``rank`` is kept.

    Args:
        rank: Index of the shard of training rows to keep.
        size: Number of shards the non-coordination training rows are split into.
        coordination_size: Number of leading training rows reserved for the
            coordination set.
        coord_bs: Batch size of the coordination datasets.
        train_bs: Batch size of the training and test datasets.

    Returns:
        Tuple ``(training_binary, test_binary, coordination_binary,
        training_multi, test_multi, coordination_multi, num_inputs,
        num_outputs_multi)``. The first six are batched ``tf.data.Dataset``
        objects yielding ``(features, labels)``; ``num_inputs`` is the number
        of feature columns and ``num_outputs_multi`` the number of attack
        classes.
    """

    # Attack families and their multi-class label. Built once instead of on
    # every row. Any name not listed here (including 'normal') maps to 0.
    attack_groups = {
        # dos_attacks map to 1
        1: ['apache2', 'back', 'land', 'neptune', 'mailbomb', 'pod', 'processtable',
            'smurf', 'teardrop', 'udpstorm', 'worm'],
        # probe_attacks map to 2
        2: ['ipsweep', 'mscan', 'nmap', 'portsweep', 'saint', 'satan'],
        # privilege escalation attacks map to 3
        # ('loadmodule' was previously misspelled 'loadmdoule', which sent those
        # records to class 0)
        3: ['buffer_overflow', 'loadmodule', 'perl', 'ps', 'rootkit', 'sqlattack', 'xterm'],
        # remote access attacks map to 4
        # NOTE(review): 'httptunnel' and 'xlock' are added next to the original
        # 'http_tunnel' / 'xclock' spellings because the dataset files appear to
        # use the former - confirm against the label column and drop the unused
        # spellings.
        4: ['ftp_write', 'guess_passwd', 'http_tunnel', 'httptunnel', 'imap', 'multihop',
            'named', 'phf', 'sendmail', 'snmpgetattack', 'snmpguess', 'spy',
            'warezclient', 'warezmaster', 'xclock', 'xlock', 'xsnoop'],
    }
    attack_to_class = {name: label
                       for label, names in attack_groups.items()
                       for name in names}

    def map_attack(attack):
        """Return the attack-family label for an attack name (0 if unlisted)."""
        return attack_to_class.get(attack, 0)

    # load data with column names
    colnames = ['duration', 'protocol_type', 'service', 'flag', 'src_bytes', 'dst_bytes', 'land',
            'wrong_fragment', 'urgent', 'hot', 'num_failed_logins', 'logged_in', 'num_compromised',
            'root_shell', 'su_attempted', 'num_root', 'num_file_creations', 'num_shells', 'num_access_files',
            'num_outbound_cmds', 'is_host_login', 'is_guest_login', 'count', 'srv_count', 'serror_rate',
            'srv_serror_rate', 'rerror_rate', 'srv_rerror_rate', 'same_srv_rate', 'diff_srv_rate',
            'srv_diff_host_rate', 'dst_host_count', 'dst_host_srv_count', 'dst_host_same_srv_rate',
            'dst_host_diff_srv_rate', 'dst_host_same_src_port_rate', 'dst_host_srv_diff_host_rate',
            'dst_host_serror_rate', 'dst_host_srv_serror_rate', 'dst_host_rerror_rate',
            'dst_host_srv_rerror_rate', 'attack_type', 'level']
    train_df = pd.read_csv("Data/KDDTrain+.txt", names=colnames, sep=",")
    test_df = pd.read_csv("Data/KDDTest+.txt", names=colnames, sep=",")

    # make all floats 32 (for tensorflow memory purposes)
    train_df[train_df.select_dtypes(np.float64).columns] = train_df.select_dtypes(np.float64).astype(np.float32)
    test_df[test_df.select_dtypes(np.float64).columns] = test_df.select_dtypes(np.float64).astype(np.float32)

    # Create an attack flag: map normal behavior to 0, all attacks to 1
    train_df['attack_flag'] = train_df.attack_type.map(lambda a: 0 if a == 'normal' else 1)
    test_df['attack_flag'] = test_df.attack_type.map(lambda a: 0 if a == 'normal' else 1)

    attack_labels = ['Normal', 'DoS', 'Probe', 'Privilege', 'Access']

    # map the attack names to their family label and join to the data set
    train_df['attack_map'] = train_df.attack_type.apply(map_attack)
    test_df['attack_map'] = test_df.attack_type.apply(map_attack)

    onehot_features = ['protocol_type', 'service', 'flag']
    # NOTE(review): 'level' is kept as a model input, as in the original code.
    # Presumably it is the NSL-KDD difficulty score - confirm that it should be
    # a feature rather than being dropped.
    other_features = ['duration', 'src_bytes', 'dst_bytes', 'land',
                'wrong_fragment', 'urgent', 'hot', 'num_failed_logins', 'logged_in', 'num_compromised',
                'root_shell', 'su_attempted', 'num_root', 'num_file_creations', 'num_shells', 'num_access_files',
                'num_outbound_cmds', 'is_host_login', 'is_guest_login', 'count', 'srv_count', 'serror_rate',
                'srv_serror_rate', 'rerror_rate', 'srv_rerror_rate', 'same_srv_rate', 'diff_srv_rate',
                'srv_diff_host_rate', 'dst_host_count', 'dst_host_srv_count', 'dst_host_same_srv_rate',
                'dst_host_diff_srv_rate', 'dst_host_same_src_port_rate', 'dst_host_srv_diff_host_rate',
                'dst_host_serror_rate', 'dst_host_srv_serror_rate', 'dst_host_rerror_rate',
                'dst_host_srv_rerror_rate', 'level']

    # Pin the dummy dtype: pandas >= 2.0 returns bool columns by default, and
    # bool columns do not support the subtraction used for scaling below.
    train_onehot = pd.get_dummies(train_df[onehot_features], dtype=np.uint8)
    test_onehot = pd.get_dummies(test_df[onehot_features], dtype=np.uint8)

    # Give the test set exactly the training one-hot columns, in the same
    # order: categories missing from the test set become all-zero columns and
    # categories unseen in training are dropped.
    test_final = test_onehot.reindex(columns=train_onehot.columns, fill_value=0)

    test_set = test_final.join(test_df[other_features])
    train_set = train_onehot.join(train_df[other_features])

    # Min-max scale both sets with the TRAINING min/range so a given raw value
    # maps to the same model input in train and test. Test values outside the
    # training range therefore fall outside [0, 1].
    train_min = train_set.min()
    # Constant columns have a zero range; turn it into NaN so the division
    # yields NaN (then 0 after fillna) instead of +/-inf.
    train_range = (train_set.max() - train_min).replace(0, np.nan)
    train_set_norm = ((train_set - train_min) / train_range).fillna(0)
    test_set_norm = ((test_set - train_min) / train_range).fillna(0)

    num_inputs = train_set.shape[1]
    num_outputs_multi = len(attack_labels)

    # split train into a coordination set (leading rows) and the rest
    coord_set = train_set_norm.iloc[:coordination_size, :]
    whole_train_set = train_set_norm.iloc[coordination_size:, :]

    # test labels
    test_binary_y = tf.convert_to_tensor(test_df['attack_flag'])
    test_multi_y = tf.convert_to_tensor(test_df['attack_map'].to_list())

    # Split the remaining training rows amongst workers. Row positions are
    # split once and reused for features and labels so they stay aligned.
    shard_rows = np.array_split(np.arange(len(whole_train_set)), size)[rank]
    worker_train_set_norm = whole_train_set.iloc[shard_rows]

    # create our train labels
    train_attack_map_list_binary = train_df['attack_flag'].to_list()
    train_attack_map_list_multi = train_df['attack_map'].to_list()

    # split coordination set labels from the rest
    coordination_binary_y = tf.convert_to_tensor(train_attack_map_list_binary[:coordination_size])
    coordination_multi_y = tf.convert_to_tensor(train_attack_map_list_multi[:coordination_size])
    whole_train_label_b = np.asarray(train_attack_map_list_binary[coordination_size:])
    whole_train_label_m = np.asarray(train_attack_map_list_multi[coordination_size:])

    train_binary_y = tf.convert_to_tensor(whole_train_label_b[shard_rows])
    train_multi_y = tf.convert_to_tensor(whole_train_label_m[shard_rows])

    # make data into tensors
    train_set = tf.convert_to_tensor(worker_train_set_norm)
    test_set = tf.convert_to_tensor(test_set_norm)
    coordination_set = tf.convert_to_tensor(coord_set)

    # create tensorflow datasets for train, test and coordination
    training_binary = tf.data.Dataset.from_tensor_slices((train_set, train_binary_y)).batch(train_bs)
    training_multi = tf.data.Dataset.from_tensor_slices((train_set, train_multi_y)).batch(train_bs)
    test_binary = tf.data.Dataset.from_tensor_slices((test_set, test_binary_y)).batch(train_bs)
    test_multi = tf.data.Dataset.from_tensor_slices((test_set, test_multi_y)).batch(train_bs)
    coordination_binary = tf.data.Dataset.from_tensor_slices((coordination_set, coordination_binary_y)).batch(coord_bs)
    coordination_multi = tf.data.Dataset.from_tensor_slices((coordination_set, coordination_multi_y)).batch(coord_bs)

    return training_binary, test_binary, coordination_binary, training_multi, test_multi, coordination_multi, num_inputs,  num_outputs_multi

In [30]:
%%px --block
def train(model, rank, size, lossF, optimizer, train_dataset, coordination_dataset, epochs, coord_batch_size, batches, isBinary):
    """Train ``model`` with alternating local and consensus phases.

    Each epoch:
      1. decays the learning rate via ``set_learning_rate2``;
      2. runs one pass of gradient steps over ``train_dataset`` using ``lossF``;
      3. predicts on every coordination batch and averages those predictions
         across all MPI ranks with an ``Allreduce``;
      4. runs one pass of gradient steps over ``coordination_dataset`` using
         ``consensus_loss`` with the averaged predictions as consensus targets.

    ``Allreduce`` is a collective call, so every rank must call ``train`` with
    the same ``epochs``, ``coord_batch_size`` and ``batches``.

    Args:
        model: Callable Keras model exposing ``trainable_variables``.
        rank: This worker's rank; used only in the log line.
        size: Number of workers the summed predictions are divided by.
        lossF: Loss called as ``lossF(y_true=..., y_pred=...)`` in the local phase.
        optimizer: Optimizer used for both phases.
        train_dataset: Iterable of ``(data, target)`` batches for local training.
        coordination_dataset: Iterable of ``(data, target)`` batches for the
            consensus phase. Each batch must produce exactly
            ``coord_batch_size`` prediction values and there must be at most
            ``batches`` of them, otherwise the buffer assignment fails.
        epochs: Number of epochs to run.
        coord_batch_size: Number of examples per coordination batch.
        batches: Number of coordination batches.
        isBinary: Selects ``BinaryAccuracy`` (True) or
            ``SparseCategoricalAccuracy`` (False) for the logged metric.
            NOTE(review): the consensus phase flattens predictions to one value
            per example, so it looks usable only with single-output models -
            confirm before running with ``isBinary=False``.

    Returns:
        None. ``model`` and ``optimizer`` are updated in place.
    """

    # This is an accuracy metric, so it is named and logged as one.
    if isBinary:
        accuracy_metric = tf.keras.metrics.BinaryAccuracy()
    else:
        accuracy_metric = tf.keras.metrics.SparseCategoricalAccuracy()

    for epoch in range(epochs):

        # Adjust learning rate
        set_learning_rate2(optimizer, epoch)

        # Local Training
        for data, target in train_dataset:
            with tf.GradientTape() as tape:
                y_p = model(data, training=True)
                loss_val = lossF(y_true=target, y_pred=y_p)
            grads = tape.gradient(loss_val, model.trainable_variables)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))
            accuracy_metric.update_state(target, y_p)

        # Forward Pass of Coordination Set: column j holds the predictions for
        # coordination batch j.
        send_predicted = np.zeros((coord_batch_size, batches), dtype=np.float32)
        recv_avg_pred = np.zeros((coord_batch_size, batches), dtype=np.float32)
        for c_batch_idx, (c_data, _) in enumerate(coordination_dataset):
            # Inference mode: these predictions are shared with the other
            # workers as targets, not used for a gradient step.
            pred = model(c_data, training=False)
            send_predicted[:, c_batch_idx] = pred.numpy().flatten()

        # Communication Process Here: sum over all ranks, then average
        MPI.COMM_WORLD.Allreduce(send_predicted, recv_avg_pred, op=MPI.SUM)
        recv_avg_pred = recv_avg_pred/size

        # Consensus Training
        for c_batch_idx, (c_data, c_target) in enumerate(coordination_dataset):
            with tf.GradientTape() as tape:
                c_yp = model(c_data, training=True)
                loss_val = consensus_loss(y_true=c_target, y_pred=c_yp,
                                           z=recv_avg_pred[:, c_batch_idx].reshape(coord_batch_size, 1),
                                           l2=0.1)
            grads = tape.gradient(loss_val, model.trainable_variables)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))

        # Only the local-training batches contribute to the reported accuracy.
        print('(Rank %d) Training Accuracy for Epoch %d: %0.4f' % (rank, epoch, accuracy_metric.result()))
        accuracy_metric.reset_states()

## Run Experiments

In [31]:
%%px --block
# Experiment settings shared by the cells below.
isBinary, epochs = True, 10
train_bs, coord_bs = 64, 32

# The coordination set holds 32 batches' worth of examples.
coord_examples = coord_bs * 32
coord_num_batches = int(np.ceil(coord_examples / coord_bs))

# Build this engine's datasets (binary and multi-class variants).
(train_data_b, test_data_b, coord_data_b,
 train_data_m, test_data_m, coord_data_m,
 num_inputs, num_outputs_multi) = data_pre_process(
    rank, size, coord_examples, coord_bs, train_bs)

%px:   0%|          | 0/4 [00:00<?, ?tasks/s]

In [32]:
%%px --block
# binary classification model
# Ranks 0 and 1 use one stack of hidden-layer widths; all other ranks use a
# different one. Every layer is Dense + ReLU, followed by a single sigmoid unit.
if rank in (0, 1):
    hidden_units = (128, 256, 128, 64, 10)
else:
    hidden_units = (64, 128, 128, 64, 32)

binary_model = tf.keras.Sequential()
binary_model.add(tf.keras.layers.Dense(hidden_units[0], activation='relu', input_shape=(num_inputs,)))
for units in hidden_units[1:]:
    binary_model.add(tf.keras.layers.Dense(units, activation='relu'))
binary_model.add(tf.keras.layers.Dense(1, activation='sigmoid'))

# Local loss function
lossF = tf.keras.losses.BinaryCrossentropy()

# Optimizer and its starting learning rate
learning_rate = 0.01
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

In [None]:
%%px --block
# Run the binary-classification experiment on every engine.
train(
    model=binary_model,
    rank=rank,
    size=size,
    lossF=lossF,
    optimizer=optimizer,
    train_dataset=train_data_b,
    coordination_dataset=coord_data_b,
    epochs=epochs,
    coord_batch_size=coord_bs,
    batches=coord_num_batches,
    isBinary=isBinary,
)

%px:   0%|          | 0/4 [00:00<?, ?tasks/s]

[stdout:0] (Rank 0) Training Loss for Epoch 0: 0.9841
(Rank 0) Training Loss for Epoch 1: 0.9931
(Rank 0) Training Loss for Epoch 2: 0.9927
(Rank 0) Training Loss for Epoch 3: 0.9952
(Rank 0) Training Loss for Epoch 4: 0.9961
(Rank 0) Training Loss for Epoch 5: 0.9960


In [None]:
%%px --block
# Attach the loss, optimizer and accuracy metric, then score the test set.
binary_model.compile(
    optimizer=optimizer,
    loss=lossF,
    metrics=[tf.keras.metrics.BinaryAccuracy()],
)
binary_model.evaluate(test_data_b)

In [None]:
%%px --block
# multi classification model
# Dense + ReLU stack ending in a softmax over the attack classes.
multi_model = tf.keras.Sequential()
multi_model.add(tf.keras.layers.Dense(128, activation='relu', input_shape=(num_inputs,)))
for hidden_width in (256, 128, 64, 10):
    multi_model.add(tf.keras.layers.Dense(hidden_width, activation='relu'))
multi_model.add(tf.keras.layers.Dense(num_outputs_multi, activation='softmax'))