In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

'''MQTT packet analysis script developed for the experiments run at ISR.

Given the folder containing all Wireshark capture files ('.pcap'), this script automatically
performs the RTT and packet-loss analysis of the captured MQTT packets.

It uses pyshark (a Python wrapper for the tshark tool) to analyze the packets.

Setting up:
    Use the supplied Dockerfile to set up a development environment, or manually install all dependencies.
    Make sure to mount the folder with the log files at /logs/.
'''

import pyshark
import pandas as pd
from collections import deque

# Jupyter already runs an asyncio event loop; nest_asyncio patches asyncio so that a
# nested event loop can run inside it (presumably the one pyshark starts when reading
# captures — the original note only says "nested loop shenanigans with Jupyter").
import nest_asyncio
nest_asyncio.apply()

In [None]:
# Define all experiment variables
client_list = ['10.231.219.206', '10.231.219.81', '10.231.219.73', '10.231.219.185']
host_ip = '10.231.201.175'

frequency_list = [1, 10, 100]
packet_sizes = ['small', 'medium', 'large']
qos_levels = [0, 1, 2]

# Mapping from packet size label to the real packet size in bytes
packet_sizes_bytes = {'small': 1250, 'medium': 12500, 'large': 125000}

# Headers to be used in pandas.DataFrame
FREQUENCY_HEADER = 'FREQUENCY'
PACKET_SIZE_HEADER = 'PACKET_SIZE'
NUM_CLIENTS_HEADER = 'NUM_CLIENTS'
QOS_LEVEL_HEADER = 'QOS_LEVEL'

RTT_HEADER = 'RTT'
RTT_MEAN_HEADER = 'RTT_MEAN'
RTT_STD_HEADER = 'RTT_STD'
PACKET_LOSS_HEADER = 'PACKET_LOSS'

# Headers to be used in packet_raw_info
TCP_SEGMENTS_HEADER = "tcp.segments"  # pyshark layer listing the segments that composed the packet
# Frame number of the first segment that composed the packet. It intentionally reuses the
# "tcp.segments" string as its dict key.
TCP_FIRST_SEGMENT_HEADER = TCP_SEGMENTS_HEADER
TCP_FIRST_SEGMENT_TIMESTAMP_HEADER = "tcp.segments.timestamp"  # timestamp of the first segment that composed the packet

MQTT_MSGID_HEADER = "mqtt.msgid"  # id of the MQTT message, used to follow up on QoS on MQTT level
MQTT_MSGLEN_HEADER = "mqtt.msglen"  # full length of the packet (payload + overhead)
FRAME_NUMBER_HEADER = "frame.number"  # number of the frame
FRAME_TIMESTAMP_HEADER = "frame.time"  # epoch timestamp of the recorded frame
IP_ADDR_HEADER = "ip.addr"  # (source, destination) address pair of the packet

MQTT_PACKET_ACKED_HEADER = "MQTT_PACKET_ACKED"  # indicates if the packet was acked at MQTT level or not
MQTT_PACKET_ACK_TIMESTAMP_HEADER = "MQTT_PACKET_ACK_TIMESTAMP"  # timestamp of packet ack (PUBACK for QoS1, PUBCOMP for QoS2)

# Useful functions
# Packet file name format: /logs/f<frequency>c<number of clients>qos<qos level><packet size>.pcap
filename_fmt = '/logs/f{}c{}qos{}{}.pcap'


def getLogFilename(frequency, number_of_clients, qos_level, packet_size):
    """Return the capture file path for one experiment configuration."""
    return filename_fmt.format(frequency, number_of_clients, qos_level, packet_size)


def getClientIP(addr):
    """Return the client side of an ``ip.addr`` pair.

    ``addr`` is a two-element sequence of addresses. The result is whichever element is not
    ``host_ip``; the first element is preferred when it is not the host.
    """
    return addr[0] if addr[0] != host_ip else addr[1]


def createFileCapture(capture_filename, display_filter):
    """Open ``capture_filename`` as a pyshark FileCapture restricted by ``display_filter``.

    JSON output is required to access the reassembled TCP segment data. ``keep_packets=False``
    stops pyshark from retaining every iterated packet in memory.
    """
    return pyshark.FileCapture(capture_filename, use_json=True, keep_packets=False, display_filter=display_filter)


In [None]:
# Wireshark display filters
# Logical operators used to compose filters. They are padded with spaces so composed filters
# read unambiguously, e.g. "mqtt.msgtype == 3 && !(mqtt.msgtype == 12)".
TSHARK_DF_AND = " && "
TSHARK_DF_OR = " || "

# Template filter for different IPs
TSHARK_DF_IPFILTER_fmt = "ip.addr == {}"


def TSHARK_DF_IPFILTER(ip):
    """Display filter matching packets sent to or from ``ip``."""
    return TSHARK_DF_IPFILTER_fmt.format(ip)


# MQTT control packet types (values of the mqtt.msgtype field)
MQTT_PUBLISH_MSGTYPE = 3
MQTT_PUBACK_MSGTYPE = 5
MQTT_PUBCOMP_MSGTYPE = 7
MQTT_PINGREQ_MSGTYPE = 12

# Filters for different MQTT packets
TSHARK_DF_MQTTPUBLISH = f"mqtt.msgtype == {MQTT_PUBLISH_MSGTYPE}"
TSHARK_DF_MQTTPUBACK = f"mqtt.msgtype == {MQTT_PUBACK_MSGTYPE}"
TSHARK_DF_MQTTPUBCOMP = f"mqtt.msgtype == {MQTT_PUBCOMP_MSGTYPE}"

# TCP traffic on the MQTT port 1883 (includes the plain TCP ACKs, not only MQTT messages)
TSHARK_DF_MQTT_TCPFILTER = "tcp.port == 1883"
# Excludes frames that carry an MQTT PINGREQ
TSHARK_DF_MQTT_IGNORE_PINGS = f"!(mqtt.msgtype == {MQTT_PINGREQ_MSGTYPE})"

# Filter for ACK packets and their corresponding ACK'd frame
TSHARK_DF_ACKS_FRAME_fmt = "tcp.analysis.acks_frame >= {}"


def TSHARK_DF_ACKS_FRAME(number):
    """Display filter matching TCP ACKs of frame ``number`` or any later frame."""
    return TSHARK_DF_ACKS_FRAME_fmt.format(number)


# pyshark doesn't provide any easy way to jump to a given frame, so these filters select frames by number
TSHARK_DF_START_ON_FRAME_fmt = "frame.number > {}"
TSHARK_DF_GET_FRAME_fmt = "frame.number == {}"


def TSHARK_DF_START_ON_FRAME(number):
    """Display filter matching every frame after frame ``number``."""
    return TSHARK_DF_START_ON_FRAME_fmt.format(number)


def TSHARK_DF_GET_FRAME(number):
    """Display filter matching exactly frame ``number``."""
    return TSHARK_DF_GET_FRAME_fmt.format(number)

In [None]:
# Log files are processed in two passes:
# - The first pass scans for MQTT packets and gets their info (first-segment frame numbers for MQTT PUBLISH, etc.)
# - The second pass processes the underlying TCP segments and ACKs: it fills in the first-segment timestamps and
#   matches every PUBLISH with the TCP ACK that covers it to compute the RTT.


def _publish_record(packet):
    """Build the first-pass record for one MQTT PUBLISH packet.

    If the PUBLISH was reassembled from several TCP segments, the record stores the frame number
    of its first segment and leaves that segment's timestamp as None; the second pass fills it in.
    Otherwise the PUBLISH frame itself is the first segment.
    """
    segmented = TCP_SEGMENTS_HEADER in packet
    frame_time = float(packet.frame_info.time_epoch)
    return {
        IP_ADDR_HEADER: packet.ip.addr,
        FRAME_TIMESTAMP_HEADER: frame_time,
        FRAME_NUMBER_HEADER: int(packet.frame_info.number),
        TCP_FIRST_SEGMENT_HEADER: int(packet[TCP_SEGMENTS_HEADER].segment[0] if segmented else packet.frame_info.number),
        MQTT_MSGLEN_HEADER: int(packet.mqtt.len),
        TCP_FIRST_SEGMENT_TIMESTAMP_HEADER: None if segmented else frame_time,
    }


def _collect_qos0_publishes(capture_filename):
    """First pass for QoS 0: return {first-segment frame number -> PUBLISH record}."""
    publishes = {}
    # needs json to capture segment data
    mqtt_capture = createFileCapture(capture_filename, TSHARK_DF_MQTTPUBLISH + TSHARK_DF_AND + TSHARK_DF_MQTT_IGNORE_PINGS)
    try:
        for packet in mqtt_capture:
            record = _publish_record(packet)
            publishes[record[TCP_FIRST_SEGMENT_HEADER]] = record
    finally:
        # close the tshark subprocess even if parsing fails
        mqtt_capture.close()
    return publishes


def _collect_qos12_publishes(capture_filename, qos_level):
    """First pass for QoS 1/2: return {first-segment frame number -> PUBLISH record}.

    PUBLISHes are paired with their MQTT-level acknowledgement (PUBACK for QoS 1, PUBCOMP for
    QoS 2) by (address pair, msgid). The ack timestamp and flag are stored in the record.
    """
    ack_filter = TSHARK_DF_MQTTPUBACK if qos_level == 1 else TSHARK_DF_MQTTPUBCOMP
    # keyed by ({src, dst}, msgid) so a PUBLISH and its ack map to the same entry regardless of direction
    by_message = {}
    # needs json to capture segment data
    mqtt_capture = createFileCapture(capture_filename, display_filter=TSHARK_DF_MQTTPUBLISH + TSHARK_DF_OR + ack_filter)
    try:
        for packet in mqtt_capture:
            key = (frozenset(packet.ip.addr), packet.mqtt.msgid)
            if int(packet.mqtt.hdrflags_tree.msgtype) == MQTT_PUBLISH_MSGTYPE:
                record = _publish_record(packet)
                record[MQTT_PACKET_ACK_TIMESTAMP_HEADER] = None
                record[MQTT_PACKET_ACKED_HEADER] = False
                by_message[key] = record
            else:  # then it's a PUBACK / PUBCOMP
                record = by_message.get(key)
                if record is not None:
                    record[MQTT_PACKET_ACK_TIMESTAMP_HEADER] = float(packet.frame_info.time_epoch)
                    record[MQTT_PACKET_ACKED_HEADER] = True
    finally:
        # close the tshark subprocess even if parsing fails
        mqtt_capture.close()
    # reshape to be indexed by first segment frame number, like the QoS 0 pass
    return {record[TCP_FIRST_SEGMENT_HEADER]: record for record in by_message.values()}


def _ack_matched_records(capture_filename, publishes, frequency, number_of_clients, qos_level, packet_size):
    """Second pass: match PUBLISHes with the TCP ACKs that cover them and return RTT records.

    ``publishes`` maps first-segment frame number -> PUBLISH record; the records' first-segment
    timestamps are filled in (mutated) as those frames are seen.
    """
    # Per-client queue of PUBLISHes in frame order, so they are ACKed in order.
    # (this assumes that the ACKs are not out of order)
    pending = {
        client: deque(sorted(
            (record for record in publishes.values() if getClientIP(record[IP_ADDR_HEADER]) == client),
            key=lambda record: record[FRAME_NUMBER_HEADER]))
        for client in client_list
    }
    # oldest PUBLISH of each client still waiting for its ACK (missing/None when there is none)
    awaiting_ack = {client: queue.popleft() for client, queue in pending.items() if queue}

    records = []
    tcp_capture = createFileCapture(capture_filename, TSHARK_DF_MQTT_TCPFILTER)
    try:
        for packet in tcp_capture:
            if packet.ip.dst == host_ip:
                # traffic towards the host: look for TCP ACKs of a client's outstanding PUBLISHes
                if not (packet.tcp.has_field("analysis") and packet.tcp.analysis.has_field("acks_frame")):
                    continue
                client = packet.ip.src
                publish = awaiting_ack.get(client)
                if publish is None:
                    continue
                acked_frame = packet.tcp.analysis.acks_frame.main_field.int_value
                ack_time = float(packet.frame_info.time_epoch)
                # A single (cumulative / delayed) ACK can cover several PUBLISHes. Match all of them
                # here; deferring them to later ACKs would inflate their RTT, and leftover PUBLISHes
                # at the end of the capture would be miscounted as lost.
                while publish is not None and acked_frame >= publish[FRAME_NUMBER_HEADER]:
                    records.append({
                        FREQUENCY_HEADER: frequency,
                        MQTT_MSGLEN_HEADER: publish[MQTT_MSGLEN_HEADER],
                        PACKET_SIZE_HEADER: packet_sizes_bytes[packet_size],
                        NUM_CLIENTS_HEADER: number_of_clients,
                        QOS_LEVEL_HEADER: qos_level,
                        RTT_HEADER: ack_time - publish[TCP_FIRST_SEGMENT_TIMESTAMP_HEADER],
                    })
                    queue = pending[client]
                    publish = queue.popleft() if queue else None
                awaiting_ack[client] = publish
            else:
                # if this frame is the first segment of a PUBLISH, record when it was sent
                publish = publishes.get(int(packet.frame_info.number))
                if publish is not None:
                    publish[TCP_FIRST_SEGMENT_TIMESTAMP_HEADER] = float(packet.frame_info.time_epoch)
    finally:
        # close the tshark subprocess (previously leaked)
        tcp_capture.close()
    return records


def process_packet(frequency, number_of_clients, qos_level, packet_size):
    """Compute per-PUBLISH RTT records for one experiment capture.

    Args:
        frequency, number_of_clients, qos_level, packet_size: experiment parameters. They select
            the capture file via getLogFilename and are copied into every record.

    Returns:
        A list of dicts, one per PUBLISH matched with a TCP ACK. Each dict has the keys
        FREQUENCY_HEADER, MQTT_MSGLEN_HEADER (length of that PUBLISH), PACKET_SIZE_HEADER
        (bytes), NUM_CLIENTS_HEADER, QOS_LEVEL_HEADER and RTT_HEADER. RTT_HEADER is the ACK time
        minus the send time of the PUBLISH's first TCP segment.

    Raises:
        Whatever createFileCapture / pyshark raise for a missing or unreadable capture (the
        calling loop handles FileNotFoundError and pyshark.TSharkCrashException).
    """
    capture_filename = getLogFilename(frequency, number_of_clients, qos_level, packet_size)

    # split capture according to QoS level
    if qos_level == 0:
        publishes = _collect_qos0_publishes(capture_filename)
    else:
        # NOTE(review): the MQTT-level ack fields collected here are not yet part of the output records.
        publishes = _collect_qos12_publishes(capture_filename, qos_level)

    return _ack_matched_records(capture_filename, publishes, frequency, number_of_clients, qos_level, packet_size)

In [None]:
# analyzed info (summary table with one row per experiment configuration)
mqttpacket_info = pd.DataFrame(None, columns=[NUM_CLIENTS_HEADER, QOS_LEVEL_HEADER, FREQUENCY_HEADER, PACKET_SIZE_HEADER, RTT_MEAN_HEADER, RTT_STD_HEADER, PACKET_LOSS_HEADER])

# Per-PUBLISH records from every capture. The list is grown in place with extend; rebuilding it
# with `list + list` for every file is quadratic in the total number of records.
mqttpacket_records = []

# loop through all logs (every combination of experiment parameters)
for frequency in frequency_list:
    for number_of_clients in range(1, len(client_list) + 1):
        for qos_level in qos_levels:
            for packet_size in packet_sizes:
                print((frequency, number_of_clients, qos_level, packet_size))
                try:
                    mqttpacket_records.extend(process_packet(frequency, number_of_clients, qos_level, packet_size))
                except (FileNotFoundError, pyshark.TSharkCrashException) as e:
                    # report instances of missing / broken logs and carry on; any other error propagates
                    print(e)

# convert to a DataFrame for easy analysis
mqttpacket_raw_info = pd.DataFrame(mqttpacket_records)

In [None]:
import numpy as np
# No-op when the previous cell already produced a DataFrame; keeps this cell usable on a raw record list too
mqttpacket_raw_info = pd.DataFrame(mqttpacket_raw_info)

# group data by the independent (experiment) variables
mqttpacket_raw_info_groups = mqttpacket_raw_info.groupby(by=[FREQUENCY_HEADER, NUM_CLIENTS_HEADER, PACKET_SIZE_HEADER, QOS_LEVEL_HEADER])

# get mean & std values
print(mqttpacket_raw_info_groups.mean())
print(mqttpacket_raw_info_groups.std())

# number of observed (ACK-matched) PUBLISH messages per experiment
observed_counts = mqttpacket_raw_info_groups.size()
print(observed_counts)

# To estimate packet loss, divide the observed count by the expected count
# (EXPECTED_MESSAGES_PER_CLIENT * "nº of clients") and take the complement.
EXPECTED_MESSAGES_PER_CLIENT = 1000
expected_counts = EXPECTED_MESSAGES_PER_CLIENT * observed_counts.index.get_level_values(NUM_CLIENTS_HEADER).to_numpy()
packet_loss = (1 - observed_counts / expected_counts).rename(PACKET_LOSS_HEADER)
print(packet_loss)
