In [1]:
from kan import KAN
import matplotlib.pyplot as plt
import torch
import numpy as np
import pandas as pd
import torch.nn.functional as F
from sklearn.preprocessing import StandardScaler

# Run on the first CUDA device when one is available, otherwise on the CPU.
device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')
print(device)

# The 20 input features used by every model in this notebook.
# Columns that are intentionally NOT used:
#   'Drate',
#   'fin_flag_number', 'syn_flag_number', 'rst_flag_number', 'psh_flag_number',
#   'ack_flag_number', 'ece_flag_number', 'cwr_flag_number', 'ack_count',
#   'syn_count', 'fin_count', 'rst_count', 'HTTP', 'HTTPS', 'DNS', 'Telnet',
#   'SMTP', 'SSH', 'IRC', 'TCP', 'UDP', 'DHCP', 'ARP', 'ICMP', 'IGMP'
# NOTE(review): 'Magnitue' presumably matches the spelling of the CSV header;
# confirm against the file before "correcting" it.
X_columns = (
    ['Header_Length', 'Protocol Type', 'Duration', 'Rate', 'Srate']
    + ['IPv', 'LLC']
    + ['Tot sum', 'Min', 'Max', 'AVG', 'Std', 'Tot size', 'IAT', 'Number']
    + ['Magnitue', 'Radius', 'Covariance']
    + ['Variance', 'Weight']
)

# Target column(s); only the coarse (level-1) label is predicted here.
Y_columns = ['label_L1']

# Level-1 class names -> integer ids 0..3 (insertion order defines the id).
label_L1_mapping = {
    name: code
    for code, name in enumerate(("MQTT", "Benign", "Recon", "ARP_Spoofing"))
}

# Level-2 class names -> integer ids 4..12, continuing after the level-1 ids.
label_L2_mapping = {
    name: code
    for code, name in enumerate(
        (
            "MQTT-DDoS-Connect_Flood", "MQTT-DDoS-Publish_Flood",
            "MQTT-DoS-Connect_Flood", "MQTT-DoS-Publish_Flood",
            "MQTT-Malformed_Data", "benign",
            "Recon-OS_Scan", "Recon-Port_Scan",
            "arp_spoofing",
        ),
        start=4,
    )
}


# Read the CSV file
# NOTE(review): this is an absolute, machine-specific path; the notebook only
# runs where this file exists.
df = pd.read_csv('/home/zyang44/Github/baseline_cicIOT/CIC_IoMT/19classes/filtered_train_s_4_11.csv')

# Replace the string labels with integer class ids. Each label level has its
# own mapping; values that are missing from a mapping become NaN.
df['label_L1'] = df['label_L1'].map(label_L1_mapping)
# Fixed: label_L2 was previously mapped with label_L1_mapping.
df['label_L2'] = df['label_L2'].map(label_L2_mapping)

# 90% as training set and 10% left as test set
# (positional split in file order: the first 90% of the rows are the training set)
train_size = int(len(df) * 0.9)
train_df, test_df = df.iloc[:train_size, :], df.iloc[train_size:, :]

# Fit the scaler on the training rows only, then apply the same transform to
# the test rows so no test statistics leak into training.
scaler = StandardScaler()
train_X_scaled = scaler.fit_transform(train_df[X_columns])
test_X_scaled = scaler.transform(test_df[X_columns])
train_y = train_df[Y_columns].values.ravel()
test_y = test_df[Y_columns].values.ravel()

# Number of level-1 classes, derived from the mapping instead of a literal.
num_l1_classes = len(label_L1_mapping)

# take Y_columns as the label, and convert it to one-hot encoding
dataset = {
    'train_input': torch.tensor(train_X_scaled, dtype=torch.float32, device=device),
    'train_label': F.one_hot(torch.tensor(train_y, dtype=torch.long, device=device), num_classes=num_l1_classes),
    'test_input': torch.tensor(test_X_scaled, dtype=torch.float32, device=device),
    'test_label': F.one_hot(torch.tensor(test_y, dtype=torch.long, device=device), num_classes=num_l1_classes)
}
print("Data prepared.",
      f"Train set: {dataset['train_input'].shape, dataset['train_label'].shape}",
      f"Test set: {dataset['test_input'].shape, dataset['test_label'].shape}", sep="\n")

cuda:0
Data prepared.
Train set: (torch.Size([33048, 20]), torch.Size([33048, 4]))
Test set: (torch.Size([3672, 20]), torch.Size([3672, 4]))


In [2]:
# Build a KAN with layer widths 20 -> 10 -> 4 on the selected device.
kan = KAN(width=[20, 10, 4], grid=5, k=3, seed=42, device=device)

# Train on the prepared dataset dict for 100 Adam steps.
results = kan.fit(dataset, opt="Adam", steps=100)

# Forward both splits through the trained model (train first, then test).
train_logits, test_logits = (
    kan(dataset[split]) for split in ('train_input', 'test_input')
)
print("\n".join((
    "KAN output:",
    f"Train logits {train_logits.shape}",
    f"Test logits {test_logits.shape}",
)))

checkpoint directory created: ./model
saving model version 0.0


| train_loss: 2.66e-01 | test_loss: 5.64e-01 | reg: 1.20e+03 | : 100%|█| 100/100 [00:04<00:00, 23.98


saving model version 0.1
KAN output:
Train logits torch.Size([33048, 4])
Test logits torch.Size([3672, 4])


In [None]:
import ltn
import ltn.fuzzy_ops
from utils import MLP, LogitsToPredicate, DataLoader, DataLoaderMulti

# Fuzzy-logic building blocks for LTN formulas: connectives, quantifiers and
# the formula aggregator.
_fuzzy = ltn.fuzzy_ops

Not = ltn.Connective(_fuzzy.NotStandard())
And = ltn.Connective(_fuzzy.AndProd())   # alternative kept for reference: custom_fuzzy_ops.AndProd()
Or = ltn.Connective(_fuzzy.OrProbSum())
Implies = ltn.Connective(_fuzzy.ImpliesReichenbach())

Forall = ltn.Quantifier(_fuzzy.AggregPMeanError(p=2), quantifier="f")
Exists = ltn.Quantifier(_fuzzy.AggregPMean(p=2), quantifier="e")

SatAgg = _fuzzy.SatAgg()

# One LTN constant per level-1 class, each holding that class's one-hot vector
# (index order: MQTT, Benign, Recon, ARP_Spoofing).
l_MQTT, l_Benign, l_Recon, l_ARP_Spoofing = (
    ltn.Constant(torch.tensor(one_hot))
    for one_hot in (
        [1, 0, 0, 0],
        [0, 1, 0, 0],
        [0, 0, 1, 0],
        [0, 0, 0, 1],
    )
)

In [11]:
# Baseline MLP with the same layer sizes as the KAN, wrapped as an LTN predicate.
mlp = MLP(layer_sizes=(20, 10, 4)).to(device)
P_mlp = ltn.Predicate(LogitsToPredicate(mlp))

# Integer class ids (not one-hot) are used to select the samples of each class.
train_labels = torch.tensor(train_y, dtype=torch.long, device=device)
test_labels = torch.tensor(test_y, dtype=torch.long, device=device)

# A single batch covering the whole split.
train_loader = DataLoader(data=dataset['train_input'], labels=train_labels, batch_size=len(dataset['train_input']))
test_loader = DataLoader(data=dataset['test_input'], labels=test_labels, batch_size=len(dataset['test_input']))

# Split each batch by its level-1 label into one ltn.Variable per class, then
# state one axiom per class: every sample of a class satisfies the predicate
# for that class's constant.
for data, labels in train_loader:
    # Fixed: ground `x` on the current batch rather than on the loader's
    # full data attribute, so it is consistent with the per-class variables.
    x = ltn.Variable("x", data)
    x_MQTT = ltn.Variable("x_MQTT", data[labels == 0])
    x_Benign = ltn.Variable("x_Benign", data[labels == 1])
    x_Recon = ltn.Variable("x_Recon", data[labels == 2])
    x_ARP_Spoofing = ltn.Variable("x_ARP_Spoofing", data[labels == 3])

    # Fixed: the four axioms used to be evaluated and thrown away. Aggregate
    # them into a single satisfaction level that later cells can use
    # (for example as `1. - sat_level` for a loss).
    sat_level = SatAgg(
        Forall(x_MQTT, P_mlp(x_MQTT, l_MQTT)),
        Forall(x_Benign, P_mlp(x_Benign, l_Benign)),
        Forall(x_Recon, P_mlp(x_Recon, l_Recon)),
        Forall(x_ARP_Spoofing, P_mlp(x_ARP_Spoofing, l_ARP_Spoofing)),
    )


**Build myKAN** 

The wrapper below reuses the KANLayer objects of an existing KAN. The cells that follow show that the output logits of myKAN and KAN are the same.

In [None]:
import torch.nn as nn

class MultiKANModel(nn.Module):
    """Forward-only wrapper around an already built MultKAN instance.

    The wrapper owns no parameters of its own. Its forward pass walks the
    wrapped model's layers (``act_fun``, ``symbolic_fun``) and applies the
    wrapped model's affine parameters and multiplication-node layout, and
    returns only the final output tensor.
    """

    def __init__(self, kan):
        """
        Args:
            kan: a MultKAN model. The forward pass reads its ``input_id``,
                ``depth``, ``act_fun``, ``symbolic_enabled``, ``symbolic_fun``,
                ``subnode_scale``, ``subnode_bias``, ``width``, ``mult_homo``,
                ``mult_arity``, ``node_scale`` and ``node_bias`` attributes.
        """
        super().__init__()
        self.kan = kan

    def _multiply_subnodes(self, x, l, dim_sum, dim_mult):
        """Return the outputs of the multiplication nodes of layer ``l + 1``.

        Columns ``dim_sum:`` of ``x`` hold the factors. Factors are multiplied
        left to right. A node with a single factor passes that factor through
        unchanged instead of failing or reusing a previous node's product.
        """
        kan = self.kan
        if kan.mult_homo:
            # Every node has the same arity; factor i of all nodes is the
            # strided slice starting at dim_sum + i.
            arity = kan.mult_arity
            x_mult = x[:, dim_sum::arity]
            for i in range(1, arity):
                x_mult = x_mult * x[:, dim_sum + i::arity]
            return x_mult

        # Per-node arities: walk the factor columns node by node.
        arities = kan.mult_arity[l + 1]
        products = []
        start = dim_sum
        for j in range(dim_mult):
            arity_j = int(arities[j])
            product = x[:, [start]]
            for i in range(1, arity_j):
                product = product * x[:, [start + i]]
            products.append(product)
            start += arity_j
        return torch.cat(products, dim=1)

    def forward(self, x, training=False, singularity_avoiding=False, y_th=10.):
        """Compute the wrapped model's output for ``x``.

        Args:
            x: 2-D input tensor; columns are selected with ``kan.input_id``.
            training: accepted for interface compatibility; not used here.
            singularity_avoiding: forwarded to each symbolic layer.
            y_th: forwarded to each symbolic layer.

        Returns:
            The tensor produced by the last layer (the model's logits).
        """
        kan = self.kan
        # Select input features according to input_id
        x = x[:, kan.input_id.long()]

        for l in range(kan.depth):
            # Numerical branch: only the first returned value (the layer
            # output) is needed here.
            x_numerical = kan.act_fun[l](x)[0]

            # Symbolic branch, if enabled
            if kan.symbolic_enabled:
                x_symbolic = kan.symbolic_fun[l](
                    x, singularity_avoiding=singularity_avoiding, y_th=y_th
                )[0]
            else:
                x_symbolic = 0.

            # Sum the numerical and symbolic outputs
            x = x_numerical + x_symbolic

            # Subnode affine transformation
            x = kan.subnode_scale[l][None, :] * x + kan.subnode_bias[l][None, :]

            # Multiplication nodes: keep the sum nodes as they are and replace
            # the factor columns by their products.
            dim_sum = kan.width[l + 1][0]
            dim_mult = kan.width[l + 1][1]
            if dim_mult > 0:
                x_mult = self._multiply_subnodes(x, l, dim_sum, dim_mult)
                x = torch.cat([x[:, :dim_sum], x_mult], dim=1)

            # Node affine transformation
            x = kan.node_scale[l][None, :] * x + kan.node_bias[l][None, :]

        # Final x corresponds to the logits output of the whole model
        return x



In [None]:
# Fresh, untrained KAN with the same configuration and seed as before.
# Note: this rebinds `kan`, so any previously trained model bound to that name
# is no longer reachable through it.
kan = KAN(width=[20, 10, 4], grid=5, k=3, seed=42, device=device)
# results = kan.fit(dataset, opt="Adam", steps=100)

train_logits = kan(dataset['train_input'])
print("KAN output:", train_logits[10])

# The wrapper shares the layers of `kan`, so it should reproduce its logits.
mykan = MultiKANModel(kan)
mykan_logits = mykan(dataset['train_input'])
print("MultiKANModel output:", mykan_logits[10])

# Check the equivalence on every row instead of eyeballing a single sample.
assert torch.allclose(train_logits, mykan_logits, atol=1e-5), \
    "MultiKANModel logits differ from KAN logits"

# Reference output of the (unrelated, untrained) MLP for the same sample.
mlp_logits = mlp(dataset['train_input'])
print("MLP output:", mlp_logits[10])

checkpoint directory created: ./model
saving model version 0.0
KAN output: tensor([-0.0136, -0.2378,  0.2716, -0.1260], device='cuda:0',
       grad_fn=<SelectBackward0>)
MultiKANModel output: tensor([-0.0136, -0.2378,  0.2716, -0.1260], device='cuda:0',
       grad_fn=<SelectBackward0>)
MLP output: tensor([-0.7183, -0.2437,  0.1583,  0.3623], device='cuda:0',
       grad_fn=<SelectBackward0>)


In [35]:
# Wrap both classifiers as LTN predicates over (sample, class constant).
P_mlp = ltn.Predicate(LogitsToPredicate(mlp))
P_kan = ltn.Predicate(LogitsToPredicate(mykan))

# Evaluate each predicate once on the MQTT samples and reuse the result.
# (Previously every predicate was evaluated twice and the first result was
# discarded.)
truth_mlp_MQTT = P_mlp(x_MQTT, l_MQTT)
truth_kan_MQTT = P_kan(x_MQTT, l_MQTT)
print("P_mlp(x_MQTT, l_MQTT):", truth_mlp_MQTT)
print("P_kan(x_MQTT, l_MQTT):", truth_kan_MQTT)

P_mlp(x_MQTT, l_MQTT): LTNObject(value=tensor([0.1994, 0.1340, 0.2202,  ..., 0.1750, 0.2290, 0.2164], device='cuda:0',
       grad_fn=<ViewBackward0>), free_vars=['x_MQTT'])
P_kan(x_MQTT, l_MQTT): LTNObject(value=tensor([0.2669, 0.2453, 0.2565,  ..., 0.2666, 0.2495, 0.2514], device='cuda:0',
       grad_fn=<ViewBackward0>), free_vars=['x_MQTT'])
