In [11]:
"""
This is a simple WFQ simulator based on a naive Discrete Event Simulation (DES) approach. This is my own sole work that I did
while preparing for the exam and diving deeper into the topic of WFQ.
The implementation of WFQ is based on the descriptions given in the lecture notes of the course "EE562: Network Stack Implementation"
by Prof. Martin Collier at DCU. I did not use any external sources to write this code.
"""
from collections import defaultdict, deque
from dataclasses import dataclass
from heapq import heappush, heappop


class Packet:
    """
    A packet is a unit of data that is transmitted over a network. It is an indivisible unit of data that is sent from
    one device to another.

    Args:
        arrival_time: Simulation time at which the packet arrives.
        size: Size of the packet in some unit of data.
        packet_id: Optional identifier, used to break ordering ties (0 is a valid id).
        flow_id: Optional identifier of the flow the packet belongs to.

    The remaining attributes start as ``None`` and are filled in while the packet is being simulated.
    """
    def __init__(self, arrival_time, size, packet_id=None, flow_id=None):
        self.flow_id = flow_id
        self.seq_no = None
        self.packet_id = packet_id
        self.size = size  # The size of the packet in some unit of data
        self.arrival_time = arrival_time
        self.virtual_arrival_time = None  # The virtual arrival time of the packet
        # Initialised here so the attribute exists even before the packet has been processed
        self.virtual_start_time = None  # The virtual time at which the packet starts service
        self.virtual_finish_time = None  # The estimated time the packet will finish service
        self.start_time = None  # The time the packet was scheduled for service
        self.departure_time = None  # The actual time the packet finished service
        self.service_time = None  # The time the packet spent in service

    def __lt__(self, other):
        """
        Order packets by virtual finish time.

        Ties are broken by packet id when both packets have one, otherwise by packet size.
        """
        if self.virtual_finish_time == other.virtual_finish_time:
            # Compare against None explicitly: a packet id of 0 is falsy but still a valid id
            if self.packet_id is not None and other.packet_id is not None:
                return self.packet_id < other.packet_id
            return self.size < other.size
        return self.virtual_finish_time < other.virtual_finish_time

    def __str__(self):
        return f"Packet(packet_id={self.packet_id}, flow_id={self.flow_id}, seq_no={self.seq_no}, arrival_time={self.arrival_time}, size={self.size})"

        
class Flow:
    """
    An ordered collection of packets that share one flow id and are treated alike.

    Packets are kept in a list in the order they were added; each one is stamped with the flow id and a
    per-flow sequence number. ``service_time`` accumulates the total service time received by the flow.
    """
    def __init__(self, flow_id):
        self.flow_id = flow_id
        self.packets = []
        self.service_time = 0  # Running total of service time received by this flow
        self._next_seq_no = 0  # Sequence number handed to the next packet added

    def __str__(self) -> str:
        rendered_packets = list(map(str, self.packets))
        return f"Flow(flow_id={self.flow_id}, packets={rendered_packets})"

    def add_packet(self, packet):
        """Attach ``packet`` to this flow, assign its flow id and sequence number, and return the flow."""
        assigned_seq_no = self._next_seq_no
        self._next_seq_no = assigned_seq_no + 1
        packet.flow_id = self.flow_id
        packet.seq_no = assigned_seq_no
        self.packets.append(packet)
        return self

    def add_packets(self, packet_list):
        """Attach every packet of ``packet_list`` in order and return the flow (allows chaining)."""
        for new_packet in packet_list:
            self.add_packet(new_packet)
        return self


@dataclass
class Event:
    """
    An event is a thing that happens and requires processing. The event has a type and a time. The type of the event can be
    either "arrival" or "departure". The time of the event is the time at which the event needs to be
    processed.

    Attributes:
        type: "arrival" or "departure".
        time: Simulation time at which the event must be processed.
        priority: Tie-breaker for events with the same time; the lower value is ordered first.
        packet: The packet the event refers to, if any.
    """
    type: str
    time: float
    priority: int = 0
    # Forward reference as a string so the dataclass does not need the Packet name at class-creation time
    packet: "Packet" = None

    def __lt__(self, other):
        """
        Order events by time, then by priority.

        If time and priority are both equal and both events carry a packet, ties are broken by packet id
        when both ids are set, otherwise by packet size.
        """
        if self.time == other.time:
            if self.priority == other.priority:
                if self.packet and other.packet:
                    # Compare against None explicitly: a packet id of 0 is falsy but still a valid id
                    if self.packet.packet_id is not None and other.packet.packet_id is not None:
                        return self.packet.packet_id < other.packet.packet_id
                    return self.packet.size < other.packet.size

            return self.priority < other.priority
        return self.time < other.time

    def __str__(self):
        return f"Event(type={self.type}, time={self.time}, packet={self.packet})"


class WFQ:
    """
    This class simulates the weighted fair queuing (WFQ) algorithm. WFQ is a practical approximation of the Generalized Processor Sharing (GPS)
    algorithm. The GPS algorithm uses a "fluid-flow model". The fluid-flow model is a theoretical model that assumes that packets are infinitely
    divisible ("fluid") and that the service rate is proportional to the weight of the flow.
    
    The WFQ algorithm is a packet-by-packet algorithm that approximates the fluid-flow model
    but accounts for the fact that packets are not infinitely divisible.
    
    We assume that packets arrive instantaneously and that the service rate is 1 unit of data per time unit. Note that packet sizes
    given in Bytes need to be converted to units of service time at the speed of the interface.
    For example, if the packet size is 2400 Bytes and the interface rate is 16 Mbps, then the service time in ms is: 
    service_time = 2400 * 8 / 16 000 000 * 1000 = 1.2 ms. We set this as the size of the packet (1.2 units).
    Our implementation of the WFQ algorithm uses the idea of a "virtual time". The virtual time is the time in the alternative GPS system.
    It is used to estimate the time at which a packet would finish service, so we can schedule the packets in the order of their
    virtual finish times.
    """
    def __init__(self):
        self.event_queue = []  # The event heap queue
        self.queues = defaultdict(lambda: deque())  # The packet queues, dict of queue_idx: queue
        self.queue = []  # The heap queue that stores virtual finish times of the packets and queue indices
        self.virt_queues = defaultdict(lambda: deque())  # The packet queues, dict of queue_idx: queue
        self.virt_queue = []  # The heap queue that stores virtual finish times of the packets and queue indices
        self.last_virtual_finish_time = defaultdict(lambda: 0)  # The virtual finish time of the last packet of each queue
        self.backlog_level = 0  # Sum of weights of all the queues that are not empty
        self.weights = None  # The weights of the queues, dict of queue_idx: weight
        self.time = 0  # The current simulation time
        self.busy = False  # Indicates whether the server is busy transmitting a packet
        self.round_time = {0: 0}  # The round time at time t, dict of time: round_time
        self.last_gps_event = 0
        self.virtual_time = 0
        self.flows = {}  # The flows, dict of flow_id: flow

        self.total_service_time = 0  # The total service time of all the packets

    def _calc_virtual_time(self):
        # The virtual time is the time in a GPS system
        print(f"\tCalculating virtual time: {self.time}, {self.backlog_level}")
        if self.time in self.round_time:
            print(f"\tVirtual time from table: R({self.virtual_time})={self.round_time[self.time]}")
            self.virtual_time = self.round_time[self.time]
        else:
            if self.backlog_level:
                self.round_time[self.time] = self.round_time[self.last_gps_event] + (self.time - self.last_gps_event) / self.backlog_level
                print(f"\tGuessed R(self.time): {self.round_time[self.time]} = {self.round_time[self.last_gps_event]} + ({self.time} - {self.last_gps_event}) / {self.backlog_level}")
                print("\tNeed to check if there were departures in the meantime")
                while self.virt_queue:
                    f_min, queue_idx = heappop(self.virt_queue)
                    print(f"\tLooking at event in virt queue: {f_min}, {queue_idx}")
                    if f_min > self.round_time[self.time]:
                        # No outstanding departures
                        print(f"\tNo outstanding departures, R({self.time})={self.round_time[self.time]}")
                        heappush(self.virt_queue, (f_min, queue_idx))
                        break

                    # Finding the time at which the packet would finish service in the alternative GPS system
                    # R(t) = F_min, find t
                    print(f"\tFmin: {f_min}, Guessed R({self.time}): {self.round_time[self.time]}")
                    print(f"\tLast_GPS_event at {self.last_gps_event}: R({self.last_gps_event})={self.round_time[self.last_gps_event]}")
                    print(f"\tt=Last_GPS_event + (F_min - R(Last_GPS_event)) * backlog_level")
                    print(f"\tt={self.last_gps_event} + ({f_min} - {self.round_time[self.last_gps_event]}) * {self.backlog_level}")
                    self.last_gps_event = self.last_gps_event + (f_min - self.round_time[self.last_gps_event]) * self.backlog_level
                    print(f"\tThere was a departure in the GPS system at time {self.last_gps_event}")
                    print(f"\tFound R({self.last_gps_event})={f_min}")
                    self.round_time[self.last_gps_event] = f_min
                    self.virt_queues[queue_idx].popleft()
                    print(f"\tRemoved packet from virt queue {queue_idx}, new queue size: {len(self.virt_queues[queue_idx])}")

                    # If virt queue is empty, we need to update the backlog level
                    if len(self.virt_queues[queue_idx]) == 0:
                        self.backlog_level -= self.weights[queue_idx]
                        print(f"\tReducing backlog. New backlog level: {self.backlog_level}")
                    if self.backlog_level:
                        self.round_time[self.time] = self.round_time[self.last_gps_event] + (self.time - self.last_gps_event) / self.backlog_level
                        print(f"\tNew round time: {self.round_time[self.time]}")
                self.last_gps_event = self.time
                self.virtual_time = self.round_time[self.time]


    def simulate(self, flows, weights):
        self.flows = {flow.flow_id: flow for flow in flows}
        
        # weights are given as dict of queue_idx: weight, need to normalize them
        self.weights = {queue_idx: weight / sum(weights.values()) * len(weights) for queue_idx, weight in weights.items()}
        print(f"Weights: {self.weights}")

        # Linearize the flows into a the event heap queue
        for flow in flows:
            for packet in flow.packets:
                self._add_event(Event("arrival", packet.arrival_time, priority=2, packet=packet))

        # Process the events in the event heap queue
        while self.event_queue:
            event = self._get_event()
            if event.time > self.time:
                # Time needs to be updated to the time of the event
                self.time = event.time
                print(f"Advancing time: {self.time}")
            
            if event.type == "arrival":
                self._process_arrival(event.packet)
            elif event.type == "departure":
                self._process_departure(event.packet)
            else:
                raise ValueError(f"Unknown event type: {event.type}")
            
            # If the event queue is empty or the next event is in the future and the server is not busy, schedule the next packet
            if not self.event_queue or self.event_queue[0].time > self.time and not self.busy:
                self._schedule_next_packet()


    def _add_event(self, event):
        heappush(self.event_queue, event)

    def _get_event(self):
        return heappop(self.event_queue)

    def _process_arrival(self, packet):
        print(f"t={self.time:.3f} ARRIVED {packet}")
        queue_idx = packet.flow_id
        queue = self.queues[queue_idx]
        virt_queue = self.virt_queues[queue_idx]

        # Calculate the current virtual time
        self._calc_virtual_time()
        
        # Calculate the virtual finish time of the packet
        virtual_start_time = max(self.last_virtual_finish_time[queue_idx], self.virtual_time)
        print(f"\tvrt_start: max({self.last_virtual_finish_time[queue_idx]}, {self.virtual_time}) = {virtual_start_time}")
        virtual_finish_time = virtual_start_time + packet.size / self.weights[queue_idx]
        print(f"\tvrt_finish: {virtual_start_time} + {packet.size} / {self.weights[queue_idx]} = {virtual_finish_time}")
        self.last_virtual_finish_time[queue_idx] = virtual_finish_time
        print(f"\tUpdating last_vrt_finish for queue_idx {queue_idx}: {self.last_virtual_finish_time[queue_idx]}")

        # Update packet times
        packet.virtual_start_time = virtual_start_time
        packet.virtual_finish_time = virtual_finish_time

        # Add the packets to the queues
        queue.append(packet)
        virt_queue.append(packet)
        print(f"\tAdding packet to queue {queue_idx}, queue size: {len(virt_queue)}")

        # Add the queue_idx to the queues
        heappush(self.queue, (virtual_finish_time, queue_idx))
        
        if len(virt_queue) == 1:
            # Arrived to an empty virt queue, update the backlog level
            self.backlog_level += self.weights[queue_idx]
            print(f"\tIncreasing backlog. New backlog level: {self.backlog_level}")
        heappush(self.virt_queue, (virtual_finish_time, queue_idx))
        print(f"\tAdding queue_idx {queue_idx} with virtual_finish_time {virtual_finish_time} to the virtual queue")



    def _process_departure(self, packet):
        print(f"t={self.time:.3f} DEPARTED {packet}")

        # Update packet times
        packet.departure_time = self.time

        queue_idx = packet.flow_id
        queue = self.queues[queue_idx]
        queue.popleft()        
        self.busy = False

        # Update flow service times
        self.flows[queue_idx].service_time += packet.service_time

        self.total_service_time += packet.service_time

    def _schedule_next_packet(self):
        # Schedule the departure of the next packet
        if self._if_any_packet_left():
            # Select the next packet to be served
            virtual_finish_time, queue_idx = heappop(self.queue)
            next_packet = self.queues[queue_idx][0]
            print(f"t={self.time:.3f} STARTED {next_packet} with virtual finish time {virtual_finish_time}")
            self.busy = True
            departure_time = self.time + next_packet.size
            self._add_event(Event("departure", departure_time, priority=1, packet=next_packet))

            # Update packet times
            next_packet.departure_time = departure_time
            next_packet.start_time = self.time
            next_packet.service_time = departure_time - self.time
        else:
            # No packets left, server is idle
            print(f"t={self.time:.3f} IDLE")

    def _if_any_packet_left(self):
        # Check if there are any packets left in any of the queues
        return any([len(queue) > 0 for queue in self.queues.values()])

    def print_results(self):
        # Linearize packets of all the flows into a list and sort them by departure time
        packets = []
        for flow in self.flows.values():
            packets.extend(flow.packets)
        packets.sort(key=lambda packet: packet.departure_time)

        # Print the results, in nice table format, limit float precision to 3 decimal places
        print("Pct ID\tFlow ID\tSeq No\tArrival Time\tStart Time\tSrv Time\tDeparture Time\tVrt Start Time\tVrt Finish Time")
        for packet in packets:
            print(f"{packet.packet_id}\t{packet.flow_id}\t{packet.seq_no}\t{packet.arrival_time:.3f}\t\t{packet.start_time:.3f}\t\t{packet.service_time:.3f}\t\t{packet.departure_time:.3f}\t\t{packet.virtual_start_time:.3f}\t\t{packet.virtual_finish_time:.3f}")

        # Print per flow service times, limit float precision to 3 decimal places
        print()
        print("Flow ID\t\tService Time")
        for flow in sorted(self.flows.values(), key=lambda flow: flow.flow_id):
            print(f"{flow.flow_id}\t\t{flow.service_time:.3f}")
        
        assert sum([flow.service_time for flow in self.flows.values()]) == self.total_service_time
        print(f"Total service time: {self.total_service_time:.3f}")
        print(f"Total time: {self.time:.3f}")
        print(f"Average utilization: {self.total_service_time / self.time:.3f}")

    def print_bandwidth(self, start_time = 0, end_time = None, interface_speed = None):
        """
        Calculates how much bandwidth was used by each flow in the network in a given time interval.
        """

        if end_time is None:
            end_time = self.time

        duration = end_time - start_time

        # Linearize packets of all the flows into a list and sort them by departure time
        packets = []
        for flow in self.flows.values():
            packets.extend(flow.packets)
        packets.sort(key=lambda packet: packet.departure_time)

        # Calculate the bandwidth used by each flow over the given time interval
        service_time = {}
        for packet in packets:
            if packet.departure_time <= start_time:
                continue
            if packet.start_time >= end_time:
                break
            tx_start_time = max(start_time, packet.start_time)
            tx_end_time = min(end_time, packet.departure_time)

            if packet.flow_id not in service_time:
                service_time[packet.flow_id] = 0
            if tx_end_time - tx_start_time < 0:
                print(f"tx_end_time: {tx_end_time}, tx_start_time: {tx_start_time}")
                raise Exception("Negative service time")
            service_time[packet.flow_id] += tx_end_time - tx_start_time

        # Print the results
        print()
        print("Flow ID\t\tFraction of Bandwidth\tBandwidth (Mbps)")
        avg_bandwidth = 0
        for flow_id, flow_srv_time in service_time.items():
            avg_bandwidth += flow_srv_time/duration*interface_speed
            print(f"{flow_id}\t\t{flow_srv_time/duration:.3f}\t\t\t{flow_srv_time/duration*interface_speed:.3f}")

        print(f"Average used bandwidth: {avg_bandwidth:.3f} Mbps")

        

In [12]:
# Scenario 1: five flows; every packet is given by its arrival time and its size
flow_0 = Flow(flow_id=0).add_packet(Packet(arrival_time=0, size=4)).add_packet(Packet(arrival_time=10, size=1))
flow_1 = Flow(flow_id=1).add_packet(Packet(arrival_time=0, size=1)).add_packet(Packet(arrival_time=10, size=3))
flow_2 = Flow(flow_id=2).add_packet(Packet(arrival_time=0, size=2)).add_packet(Packet(arrival_time=12, size=1))
flow_3 = Flow(flow_id=3).add_packet(Packet(arrival_time=0, size=6))
flow_4 = Flow(flow_id=4).add_packet(Packet(arrival_time=0, size=3))

# Collected in flow-id order for the simulator
flows = [flow_0, flow_1, flow_2, flow_3, flow_4]


In [13]:
# Print every flow together with its packets to check the scenario before simulating it
for flow in flows:
    print(flow)

Flow(flow_id=0, packets=['Packet(packet_id=None, flow_id=0, seq_no=0, arrival_time=0, size=4)', 'Packet(packet_id=None, flow_id=0, seq_no=1, arrival_time=10, size=1)'])
Flow(flow_id=1, packets=['Packet(packet_id=None, flow_id=1, seq_no=0, arrival_time=0, size=1)', 'Packet(packet_id=None, flow_id=1, seq_no=1, arrival_time=10, size=3)'])
Flow(flow_id=2, packets=['Packet(packet_id=None, flow_id=2, seq_no=0, arrival_time=0, size=2)', 'Packet(packet_id=None, flow_id=2, seq_no=1, arrival_time=12, size=1)'])
Flow(flow_id=3, packets=['Packet(packet_id=None, flow_id=3, seq_no=0, arrival_time=0, size=6)'])
Flow(flow_id=4, packets=['Packet(packet_id=None, flow_id=4, seq_no=0, arrival_time=0, size=3)'])


In [14]:
# Simulate the first scenario on a fresh simulator, giving all five flows the same weight
wfq = WFQ()
wfq.simulate(flows, weights={0: 1, 1: 1, 2: 1, 3: 1, 4: 1})

Weights: {0: 1.0, 1: 1.0, 2: 1.0, 3: 1.0, 4: 1.0}
t=0.000 ARRIVED Packet(packet_id=None, flow_id=1, seq_no=0, arrival_time=0, size=1)
	Calculating virtual time: 0, 0
	Virtual time from table: R(0)=0
	vrt_start: max(0, 0) = 0
	vrt_finish: 0 + 1 / 1.0 = 1.0
	Updating last_vrt_finish for queue_idx 1: 1.0
	Adding packet to queue 1, queue size: 1
	Increasing backlog. New backlog level: 1.0
	Adding queue_idx 1 with virtual_finish_time 1.0 to the virtual queue
t=0.000 ARRIVED Packet(packet_id=None, flow_id=2, seq_no=0, arrival_time=0, size=2)
	Calculating virtual time: 0, 1.0
	Virtual time from table: R(0)=0
	vrt_start: max(0, 0) = 0
	vrt_finish: 0 + 2 / 1.0 = 2.0
	Updating last_vrt_finish for queue_idx 2: 2.0
	Adding packet to queue 2, queue size: 1
	Increasing backlog. New backlog level: 2.0
	Adding queue_idx 2 with virtual_finish_time 2.0 to the virtual queue
t=0.000 ARRIVED Packet(packet_id=None, flow_id=4, seq_no=0, arrival_time=0, size=3)
	Calculating virtual time: 0, 2.0
	Virtual time 

In [15]:
# Per-packet schedule, per-flow service times and utilization of the run above
wfq.print_results()

Pct ID	Flow ID	Seq No	Arrival Time	Start Time	Srv Time	Departure Time	Vrt Start Time	Vrt Finish Time
None	1	0	0.000		0.000		1.000		1.000		0.000		1.000
None	2	0	0.000		1.000		2.000		3.000		0.000		2.000
None	4	0	0.000		3.000		3.000		6.000		0.000		3.000
None	0	0	0.000		6.000		4.000		10.000		0.000		4.000
None	0	1	10.000		10.000		1.000		11.000		4.000		5.000
None	1	1	10.000		11.000		3.000		14.000		2.333		5.333
None	2	1	12.000		14.000		1.000		15.000		2.833		3.833
None	3	0	0.000		15.000		6.000		21.000		0.000		6.000

Flow ID		Service Time
0		5.000
1		4.000
2		3.000
3		6.000
4		3.000
Total service time: 21.000
Total time: 21.000
Average utilization: 1.000


In [16]:

# Scenario 2: packets are described by tuples of
# (packet_id, flow_id, arrival_time, size in bytes)
packet_strings = [
    (1, 1, 0, 2400),
    (2, 3, 0.1, 1200),
    (3, 1, 0.1, 160),
    (4, 4, 0.3, 2200),
    (5, 2, 0.3, 200),
    (6, 4, 0.4, 1700),
    (7, 2, 0.6, 120),
    (8, 4, 0.7, 80),
    (9, 2, 0.8, 400),
    (10, 1, 1.3, 300)]

port_rate = 16  # Mbps

# Bandwidth share of each flow in Mbps, used as its weight
weights_bw = {1: 4, 2: 8, 3: 2, 4: 2}

# Group the packets by flow; times are in ms and a packet's size is its transmission time in ms
flows = {}
for packet_string in packet_strings:
    packet_id, flow_id, arrival_time, size_bytes = packet_string
    size = size_bytes * 8 / port_rate / 1000  # bytes -> bits -> transmission time in ms at port_rate
    flows.setdefault(flow_id, Flow(flow_id)).add_packet(
        Packet(arrival_time, size, packet_id=packet_id, flow_id=flow_id)
    )


wfq = WFQ()
wfq.simulate(list(flows.values()), weights=weights_bw)

Weights: {1: 1.0, 2: 2.0, 3: 0.5, 4: 0.5}
t=0.000 ARRIVED Packet(packet_id=1, flow_id=1, seq_no=0, arrival_time=0, size=1.2)
	Calculating virtual time: 0, 0
	Virtual time from table: R(0)=0
	vrt_start: max(0, 0) = 0
	vrt_finish: 0 + 1.2 / 1.0 = 1.2
	Updating last_vrt_finish for queue_idx 1: 1.2
	Adding packet to queue 1, queue size: 1
	Increasing backlog. New backlog level: 1.0
	Adding queue_idx 1 with virtual_finish_time 1.2 to the virtual queue
t=0.000 STARTED Packet(packet_id=1, flow_id=1, seq_no=0, arrival_time=0, size=1.2) with virtual finish time 1.2
Advancing time: 0.1
t=0.100 ARRIVED Packet(packet_id=2, flow_id=3, seq_no=0, arrival_time=0.1, size=0.6)
	Calculating virtual time: 0.1, 1.0
	Guessed R(self.time): 0.1 = 0 + (0.1 - 0) / 1.0
	Need to check if there were departures in the meantime
	Looking at event in virt queue: 1.2, 1
	No outstanding departures, R(0.1)=0.1
	vrt_start: max(0, 0.1) = 0.1
	vrt_finish: 0.1 + 0.6 / 0.5 = 1.3
	Updating last_vrt_finish for queue_idx 3: 1.3


In [17]:
# Per-packet schedule and per-flow totals of the second scenario
wfq.print_results()
# Share of the link each flow received between t=0 and t=1, scaled by the port rate
wfq.print_bandwidth(end_time=1, interface_speed=port_rate)

Pct ID	Flow ID	Seq No	Arrival Time	Start Time	Srv Time	Departure Time	Vrt Start Time	Vrt Finish Time
1	1	0	0.000		0.000		1.200		1.200		0.000		1.200
5	2	0	0.300		1.200		0.100		1.300		0.233		0.283
7	2	1	0.600		1.300		0.060		1.360		0.333		0.363
9	2	2	0.800		1.360		0.200		1.560		0.403		0.503
3	1	1	0.100		1.560		0.080		1.640		1.200		1.280
2	3	0	0.100		1.640		0.600		2.240		0.100		1.300
10	1	2	1.300		2.240		0.150		2.390		1.280		1.430
4	4	0	0.300		2.390		1.100		3.490		0.233		2.433
6	4	1	0.400		3.490		0.850		4.340		2.433		4.133
8	4	2	0.700		4.340		0.040		4.380		4.133		4.213

Flow ID		Service Time
1		1.430
2		0.360
3		0.600
4		1.990
Total service time: 4.380
Total time: 4.380
Average utilization: 1.000

Flow ID		Fraction of Bandwidth	Bandwidth (Mbps)
1		1.000			16.000
Average used bandwidth: 16.000 Mbps
