In [1]:
from statistics import mean
from typing import List, Tuple, Any
import numpy as np
import pandas as pd
from scapy.layers.dot11 import Dot11, RadioTap
from scapy.utils import PcapReader

PacketSet = List[Tuple[Any, Any, Dot11]]



In [2]:
def read_from_pcap(path: str, bpf_filter: str= "") -> PacketSet:
    """
    This function reads the .pcap file and applies the BPF filter given in input.

    :param path: Path of the .pcap file to extract.
    :param bpf_filter: BPF filter to apply to the .pcap file.
    :return: List of filtered packets.
    """
    packet_stream = []
    with PcapReader(filename=path) as pcap_reader:
        for pkt in pcap_reader:
            dot11_packet = len(pkt), pkt.time, pkt
            packet_stream.append(dot11_packet)
    return packet_stream


def dot11_to_feature(pkt: Dot11) -> np.ndarray:
    """
    This function returns a numpy ndarray that contains an encoding of the most important features of the 802.11 frame.
    The features are the following:
     * Source MAC Address (addr2).
     * Destination MAC Address (addr1).
     * Duration field (ID).

    :param Dot11 pkt: the packet to process.
    :return: The ndarray that encodes the extracted features of the input packet.
    """
    return np.array([pkt.addr2, pkt.addr1, pkt.ID])


def feature_expansion_raw(pkt_list: PacketSet) -> pd.DataFrame:
    """
    This function extract the main features from each packet's Wi-Fi header and collects everything into a Pandas
    DataFrame.
    :param pkt_list: List of the filtered packets from the .pcap file.
    :return: Pandas Dataframe of each packet and their main features.
    """
    pkt_length = list(map(lambda pkt, *_,: pkt[0], pkt_list))
    x = list(map(lambda *pkt: dot11_to_feature(pkt[0][2]), pkt_list))
    timestamps = list(map(lambda *t: t[0][1], pkt_list))
    pd_x = pd.DataFrame(x, columns=["SourceAddress", "DestinationAddress", "Duration"])
    pd_x.loc[:, "Duration"] = pd_x.loc[:, "Duration"].astype(int)
    pd_x.loc[:, "PacketLength"] = pd.Series(pkt_length, dtype=int)
    pd_x.loc[:, "Timestamp"] = pd.Series(timestamps, dtype=float)
    pd_x.loc[:, "TimestampOffset"] = pd_x["Timestamp"] - pd_x["Timestamp"].min()
    return pd_x

In [5]:
path = "WIFI_tests/captures/"
captures_files_normal = [path+"capture_test-11.cap"]
captures = [read_from_pcap(path=f) for f in captures_files_normal]
captures_pd_normal = list(map(lambda x: feature_expansion_raw(x), captures))
captures_pd_normal = pd.concat([captures_pd_normal], axis=0)
captures_pd_normal.loc[:, "Anomaly"] = 0

In [6]:
captures_pd_normal

[Empty DataFrame
 Columns: [SourceAddress, DestinationAddress, Duration, PacketLength, Timestamp, TimestampOffset]
 Index: []]

In [None]:
captures_files_anomaly = [path+"capture_test-13.cap"]
captures = [read_from_pcap(path=f) for f in captures_files_anomalyl]
captures_pd_anomaly = list(map(lambda x: feature_expansion_raw(x), captures))
captures_pd_anomaly = pd.concat([captures_pd_anomaly], axis=0)
captures_pd_anomaly.loc[:, "Anomaly"] = 1

In [None]:
captures_pd_anomaly

In [None]:
captures_pd = pd.concat([captures_pd_normal,captures_pd_anomaly], axis=0)

In [None]:
sns.pairplot(captures_pd, hue="Anomaly")