In [27]:
import re
import codecs
from datetime import datetime, timedelta

In [28]:
def fromDigits(digits: list[int], b:int)->int:
    """Compute the number given by digits in base b."""
    if b <= 1:
        raise ValueError("Base must be greater than 1")
    n = 0
    for d in digits:
        n = b * n + d
    return n

In [29]:
"""
| Part            | Meaning                                             |
| --------------- | --------------------------------------------------- |
| `b(?P<q>[\'"])` | Capture opening quote (`'` or `"`)                  |
| `.*?`           | Lazily capture everything inside (including quotes) |
| `(?P=q)`        | Match the **same quote** that opened the bytes      |
| `:`             | Literal colon                                       |
"""
log_pattern = (
    r'(?P<datetime>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) - '
    r'(?P<communicator>\w+) - '
    r'(?P<level>\w+) - '
    r'Sent message (?P<msg_counter>\d+) '
    r'in digits \[(?P<msg_counter_digits>[0-9,\s]+)\] '
    r'in bytes b(?P<q>[\'"])(?P<msg_counter_bytes>.*?)(?P=q): '
    r'(?P<details>.*)'
)
def parse_sender_log_file(file):
    cleaned_logs = []
    items = []
    with open(file, 'r') as f:
        for line in f:
            match = re.match(log_pattern, line)
            if match:
                timestamp = match.group(1)
                communicator = match.group(2)
                log_level = match.group(3)
                msg_counter = match.group(4)
                msg_counter_digits = match.group(5)
                msg_counter_bytes = match.group(6)
                message = match.group(7)

                # Filter out DEBUG messages
                if log_level != 'DEBUG':
                    timestamp = datetime.strptime(timestamp,'%Y-%m-%d %H:%M:%S,%f')
                    # new test
                    if not items:
                        first_timestamp = timestamp
                        items.append(                                
                                {'timestamp': timestamp, 'communicator': communicator, 
                                'level': log_level, 'msg_counter' : msg_counter,
                                'msg_counter_digits': msg_counter_digits,'msg_counter_bytes': msg_counter_bytes,
                                'message': message
                                }
                            )
                    # there are already some tests
                    else:
                        # 2 minutes passed since the first package has been sent
                        if ((first_timestamp + timedelta(seconds=120)) < timestamp): 
                            cleaned_logs.append(items)
                            items = []
                            first_timestamp = timestamp
                            items.append(                                
                                {'timestamp': timestamp, 'communicator': communicator, 
                                'level': log_level, 'msg_counter' : msg_counter,
                                'msg_counter_digits': msg_counter_digits,'msg_counter_bytes': msg_counter_bytes,
                                'message': message
                                }
                            )
                        # less than 2 minutes since the first package
                        else:
                            items.append(                                
                                {'timestamp': timestamp, 'communicator': communicator, 
                                'level': log_level, 'msg_counter' : msg_counter,
                                'msg_counter_digits': msg_counter_digits,'msg_counter_bytes': msg_counter_bytes,
                                'message': message
                                })
            else:
                print("Log entry does not match the expected format:", line)
        return cleaned_logs


# Clean the raw logs
sender_cleaned_logs = parse_sender_log_file("../log/sender/2025-12-10.log")
# Not considering these tests, used for debugging
"""
sender_cleaned_logs.pop(7)
sender_cleaned_logs.pop(1)
sender_cleaned_logs.pop(0)
"""
# Print cleaned logs
for log in sender_cleaned_logs:
    print(len(log),log)

Log entry does not match the expected format:                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         2025-12-10 14:33:29,020 - sender - INFO - Sent message 1 in digits [1] in bytes b'\x01': hello

Log entry does not match the expected format:                                                                                                                                                                                                                                                                                                                                     

In [35]:
def parse_receiver_details(details, message):
    first_colons = details.find(":")
    last_colons = details.rfind(":")
    print(first_colons,last_colons)
    end_message = ": hello',"
    i = details.find(end_message)
    rest_details = details[(i+len(end_message)):].split(",")
    if details.startswith('b\'counter:') and (message == "valid msg"):
        print(details)
        print(message)
        counter_bytes = details[first_colons+1:last_colons]
        real_bytes = codecs.decode(counter_bytes.encode(), "unicode_escape").encode('latin1')
        print(counter_bytes,real_bytes)
        print(list(real_bytes))
        l = list(real_bytes)
        counter = fromDigits(l, 255)
        print(f"counter: {list(real_bytes)} : {counter}")
    else:
        return 
    return counter, rest_details
        

In [36]:
log_pattern = r'(?P<datetime>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) - (?P<communicator>\w+) - (?P<level>\w+) - (?P<message>.*): \((?P<details>.*)\)'
def parse_receiver_log_file(file):
    cleaned_logs = []
    items = []
    with open(file, 'r') as f:
        for line in f:
            match = re.match(log_pattern, line)
            if match:
                timestamp = match.group(1)
                communicator = match.group(2)
                log_level = match.group(3)
                message = match.group(4)
                details = match.group(5)

                # Filter out DEBUG messages
                if log_level != 'DEBUG':
                    timestamp = datetime.strptime(timestamp,'%Y-%m-%d %H:%M:%S,%f')
                    details = parse_receiver_details(details, message)
                    # new test
                    if not items:
                        first_timestamp = timestamp
                        items.append({
                            'timestamp': timestamp, 'communicator': communicator, 
                            'level': log_level, 'message': message, 'details': details,
                            #'counter': counter
                        })
                    # there are already some tests
                    else:
                        # 2 minutes passed since the first package has been sent
                        if ((first_timestamp + timedelta(seconds=120)) < timestamp): 
                            cleaned_logs.append(items)
                            items = []
                            first_timestamp = timestamp
                            items.append({
                                        'timestamp': timestamp, 'communicator': communicator, 
                                        'level': log_level, 'message': message, 'details': details, 
                                         #'counter': counter
                                         })
                        # less than 2 minutes since the first package
                        else:
                            items.append(
                                {'timestamp': timestamp, 'communicator': communicator, 
                                 'level': log_level, 'message': message, 'details': details,
                                 #'counter': counter
                                 })
            else:
                print("Log entry does not match the expected format:", line)
        return cleaned_logs


# Clean the raw logs
receiver_cleaned_logs = parse_receiver_log_file("../log/hfreceiver/2025-12-10.log")
# Not considering these tests, used for debugging
"""
receiver_cleaned_logs.pop(7)
receiver_cleaned_logs.pop(1)
receiver_cleaned_logs.pop(0)
"""
# Print cleaned logs
for log in receiver_cleaned_logs:
    print(len(log),log)


Log entry does not match the expected format: 2025-12-10 13:54:02,725 - hfreceiver - DEBUG - debug message

Log entry does not match the expected format: 2025-12-10 13:54:02,726 - hfreceiver - INFO - info message


Log entry does not match the expected format: 2025-12-10 13:54:02,726 - hfreceiver - ERROR - error message

Log entry does not match the expected format: 2025-12-10 13:54:02,726 - hfreceiver - CRITICAL - critical message

9 14
b'counter:\x01: hello', 16, -30, -87, 10, 0
valid msg
\x01 b'\x01'
[1]
counter: [1] : 1
9 14
b'counter:\x02: hello', 16, -31, -84, 9, 0
valid msg
\x02 b'\x02'
[2]
counter: [2] : 2
9 14
b'counter:\x03: hello', 16, -31, -84, 10, 0
valid msg
\x03 b'\x03'
[3]
counter: [3] : 3
9 14
b'counter:\x04: hello', 16, -30, -86, 9, 0
valid msg
\x04 b'\x04'
[4]
counter: [4] : 4
9 14
b'counter:\x05: hello', 16, -31, -84, 9, 0
valid msg
\x05 b'\x05'
[5]
counter: [5] : 5
9 14
b'counter:\x06: hello', 16, -30, -86, 10, 0
valid msg
\x06 b'\x06'
[6]
counter: [6] : 6
9 14
b'c

In [32]:
from math import ceil
SF = 7
BW = 125000
PL = 34
H = 0
DE = 0
CR = 1
n_preamble = 8

def calulcate_airtime(SF, BW, PL, H, DE, CR, n_preamble):
    Tsym = 2**SF / BW
    time_preamble = (n_preamble + 4.25)  * Tsym
    payloadSymbNb = 8 + max(ceil((8*PL - 4*SF + 28 + 16 - 20*H) / (4*(SF - 2*DE))) * (CR + 4), 0)
    time_payload = payloadSymbNb * Tsym
    time_packet = time_preamble + time_payload
    return time_packet

    
calulcate_airtime(SF, BW, PL, H, DE, CR, n_preamble)

0.077056