# HQFBP Examples

This notebook demonstrates the Ham Radio Quick File Broadcasting Protocol (HQFBP) with various examples including text messages, chunked data, compressed data, and file transmission. Each example computes the message length and overhead percentage.


In [198]:
import base64
import gzip
import io
import lzma
import zlib
from datetime import datetime

import cbor2
import reedsolo
from PIL import Image

# CoAP Content-Format IDs for well-known media types.
# See https://www.iana.org/assignments/core-parameters/core-parameters.xhtml#content-formats
coap_content_formats = {
    # Text and Standard Web Formats
    "text/plain; charset=utf-8": 0,
    "application/link-format": 40,
    "application/xml": 41,
    "application/octet-stream": 42,
    "application/json": 50,
    "application/cbor": 60,

    # SenML (Sensor Measurement Lists) and SenSML (streaming variant).
    # IDs follow the registration table in RFC 8428, section 12.4.
    "application/senml+json": 110,
    "application/sensml+json": 111,
    "application/senml+cbor": 112,
    "application/sensml+cbor": 113,
    "application/senml-exi": 114,
    "application/sensml-exi": 115,

    # Image Formats
    "image/gif": 21,
    "image/jpeg": 22,
    "image/png": 23,
    "image/tiff": 24,
    "image/svg+xml": 30,

    # Other Useful IoT Formats
    "application/cose-key": 101,
    "application/cose-key-set": 102,
    # NOTE(review): this media type / ID pair could not be matched to an entry
    # of the IANA registry linked above -- confirm it or drop it.
    "application/or-tecap": 116,
}

def get_coap_id(mimetype):
    """Return the CoAP Content-Format ID registered for ``mimetype``.

    The lookup ignores case, surrounding whitespace and the spacing around
    ``;`` parameter separators, so ``"Text/Plain;charset=UTF-8"`` resolves the
    same way as ``"text/plain; charset=utf-8"``.

    Args:
        mimetype: Media type string, optionally with parameters.

    Returns:
        The integer ID found in ``coap_content_formats``, or ``None`` when
        ``mimetype`` is not a string or has no entry in the table.
    """
    if not isinstance(mimetype, str):
        # Calling .lower() on None (or any non-string) would raise; "no known ID"
        # is the answer callers can act on.
        return None
    # Rebuild the value in the table's canonical "type/subtype; param=value" form.
    normalized = "; ".join(part.strip() for part in mimetype.lower().split(";"))
    return coap_content_formats.get(normalized)

# Integer CBOR map keys that stand in for the HQFBP header field names.
# Entries are grouped by purpose; the numbers are the values written on the wire.
HQFBP_CBOR_KEYS = dict([
    # Core message identification
    ("Message-Id", 0),

    # Chunking and file grouping
    ("Chunk-Index", 1),
    ("Original-Message-Id", 9),
    ("Total-Chunks", 10),
    ("File-Size", 8),

    # Addressing
    ("Src-Callsign", 2),
    ("Dst-Callsign", 3),

    # Content description
    ("Content-Format", 4),  # CoAP Content-Format ID
    ("Content-Type", 7),    # full MIME type (alternative to Content-Format)

    # Encoding and integrity
    ("Content-Encoding", 5),
    ("Repr-Digest", 6),
    ("Content-Digest", 11),
])

def calculate_overhead(original_size, message_size):
    """Return how much larger the message is than the original data, in percent.

    Args:
        original_size: Size of the original data in bytes.
        message_size: Size of the complete message in bytes.

    Returns:
        The relative size difference as a percentage rounded to two decimals;
        negative when the message is smaller than the original data.
        ``float('inf')`` when ``original_size`` is 0.
    """
    if original_size != 0:
        extra_bytes = message_size - original_size
        return round(extra_bytes / original_size * 100, 2)
    # No baseline to compare against.
    return float('inf')

def create_hqfbp_message(
        content=None,
        message_id=1,
        chunk_index=None,
        src_callsign=None,
        content_type="text/plain; charset=utf-8",
        content_encoding=None,
        file_size=None,
        original_message_id=None,
        total_chunks=None,
        repr_digest=None,
        content_digest=None):
    """Build an HQFBP message: a CBOR-encoded header map followed by the raw payload.

    Only ``Message-Id`` is always written; every other header field is added
    only when its argument is supplied.

    Args:
        content: Payload. A ``str`` is encoded as UTF-8, ``None`` yields an
            empty payload, anything else is converted with ``bytes()``.
        message_id: Value of the mandatory ``Message-Id`` field.
        chunk_index: Optional ``Chunk-Index`` value.
        src_callsign: Optional ``Src-Callsign``; omitted when empty.
        content_type: Media type of the payload. The default
            ``text/plain; charset=utf-8`` is implied and never written; types
            with a known CoAP ID are written as ``Content-Format``; any other
            value is written verbatim as ``Content-Type``. ``None`` writes
            neither field.
        content_encoding: Optional ``Content-Encoding`` (a string or a list of
            strings); omitted when empty.
        file_size: Optional ``File-Size`` value.
        original_message_id: Optional ``Original-Message-Id`` value.
        total_chunks: Optional ``Total-Chunks`` value.
        repr_digest: Optional ``Repr-Digest`` value.
        content_digest: Optional ``Content-Digest`` value.

    Returns:
        bytes: the CBOR header immediately followed by the payload bytes.

    Raises:
        TypeError: if ``content`` is an integer, or another object that
            ``bytes()`` cannot convert.
    """

    # Message-Id is the only mandatory field
    header = {
        HQFBP_CBOR_KEYS['Message-Id']: message_id,
    }

    if src_callsign:
        header[HQFBP_CBOR_KEYS['Src-Callsign']] = src_callsign

    # Content-Type is shortened to a CoAP Content-Format ID when one is known,
    # and dropped entirely for the default plain-text type.
    if content_type is not None:
        coap_id = get_coap_id(content_type)
        if coap_id is None:
            header[HQFBP_CBOR_KEYS['Content-Type']] = str(content_type)
        elif coap_id != coap_content_formats["text/plain; charset=utf-8"]:
            header[HQFBP_CBOR_KEYS['Content-Format']] = coap_id

    if chunk_index is not None:
        header[HQFBP_CBOR_KEYS['Chunk-Index']] = chunk_index

    if content_encoding:
        header[HQFBP_CBOR_KEYS['Content-Encoding']] = content_encoding

    # Remaining optional fields, inserted in a fixed order so the encoded
    # header bytes stay stable from one call to the next.
    optional_fields = (
        ('File-Size', file_size),
        ('Original-Message-Id', original_message_id),
        ('Total-Chunks', total_chunks),
        ('Repr-Digest', repr_digest),
        ('Content-Digest', content_digest),
    )
    for field_name, value in optional_fields:
        if value is not None:
            header[HQFBP_CBOR_KEYS[field_name]] = value

    if content is None:
        # The parameter defaults to None; treat that as a header-only message
        # instead of failing inside bytes(None).
        payload = b""
    elif isinstance(content, str):
        payload = content.encode('utf-8')
    elif isinstance(content, int):
        # bytes(5) would silently produce five zero bytes, never the caller's intent.
        raise TypeError("content must be str or a bytes-like object, not an integer")
    else:
        payload = bytes(content)

    # The message is the CBOR header map immediately followed by the payload.
    message = cbor2.dumps(header) + payload
    return message


## Example 1: Simple "Hello World" Text from "FOSM-1"

In [199]:
# Example 1: one short text message, framed with and without the source callsign
src_callsign = "FOSM-1"
content = "Autour de la terre, je pense aux élèves scrutant l'horizon."
data_len = len(content.encode('utf-8'))
print(f"Initial data length: {data_len} bytes")

# Kept under its own name: the decoding section reads this message back.
text_with_callsign = create_hqfbp_message(content, src_callsign=src_callsign)

for heading, message in (
        ("\n# With callsign", text_with_callsign),
        ("\n# Without callsign", create_hqfbp_message(content)),
):
    print(heading)
    message_length = len(message)
    overhead_percentage = calculate_overhead(data_len, message_length)

    print(f"Message Length: {message_length} bytes")
    print(f"Overhead Percentage: {overhead_percentage}%")

Initial data length: 61 bytes

# With callsign
Message Length: 72 bytes
Overhead Percentage: 18.03%

# Without callsign
Message Length: 64 bytes
Overhead Percentage: 4.92%


## Example 2: Lorem Ipsum Text as 2 Chunks

In [200]:
# Example 2: the Lorem Ipsum text sent as two chunks, one HQFBP message per chunk
lorem_ipsum = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum."
lorem_size = len(lorem_ipsum.encode('utf-8'))

# Cut the text in the middle (by characters)
midpoint = len(lorem_ipsum) // 2
chunk1, chunk2 = lorem_ipsum[:midpoint], lorem_ipsum[midpoint:]

# Header fields shared by both chunks.
# NOTE(review): message_id is not passed, so both chunks carry the default
# Message-Id of 1 -- confirm whether each chunk should get its own id.
chunk_fields = dict(
    src_callsign=src_callsign,
    file_size=lorem_size,
    total_chunks=2,
    original_message_id=1,
)
chunk1_message = create_hqfbp_message(content=chunk1, chunk_index=0, **chunk_fields)
chunk2_message = create_hqfbp_message(content=chunk2, chunk_index=1, **chunk_fields)

# Overhead is measured over both messages against the unsplit text
total_message_length = len(chunk1_message) + len(chunk2_message)
overhead_percentage = calculate_overhead(lorem_size, total_message_length)

print(f"Total Message Length: {total_message_length} bytes")
print(f"Overhead Percentage: {overhead_percentage}%")

Total Message Length: 487 bytes
Overhead Percentage: 9.44%


## Example 3.a: Lorem Ipsum Text with `lzma` Compression

In [201]:
# Example 3.a: Lorem Ipsum text, lzma-compressed before being framed
lorem_bytes = lorem_ipsum.encode('utf-8')
compressed_content = lzma.compress(lorem_bytes)

compressed_message = create_hqfbp_message(
    content=compressed_content,
    src_callsign=src_callsign,
    content_encoding="lzma",
)
message_length = len(compressed_message)
# Measured against the uncompressed text, so a negative value is a net saving.
overhead_percentage = calculate_overhead(len(lorem_bytes), message_length)

print(f"Compressed Message Length: {message_length} bytes")
print(f"Overhead Percentage: {overhead_percentage}%")

Compressed Message Length: 401 bytes
Overhead Percentage: -9.89%


## Example 3.b: Lorem Ipsum Text with `gzip` compression

In [202]:
# Example 3.b: Lorem Ipsum text, gzip-compressed before being framed
lorem_utf8 = lorem_ipsum.encode('utf-8')
compressed_content = gzip.compress(lorem_utf8)

compressed_message = create_hqfbp_message(
    content=compressed_content,
    src_callsign=src_callsign,
    content_encoding="gzip",
)
message_length = len(compressed_message)
# Measured against the uncompressed text, so a negative value is a net saving.
overhead_percentage = calculate_overhead(len(lorem_utf8), message_length)

print(f"Compressed Message Length: {message_length} bytes")
print(f"Overhead Percentage: {overhead_percentage}%")

Compressed Message Length: 300 bytes
Overhead Percentage: -32.58%


## Example 3.c: Short test message with `gzip` and `rs(255,233)` encodings

In [203]:
# Example 3.c: gzip followed by Reed-Solomon rs(255,233), applied either to the
# payload only or to the whole message (header included).
original_text = "Hello World! This is a test message for RS(255,233) encoding.".encode('utf-8')

# Reed-Solomon geometry: blocks of n bytes carrying k data bytes,
# which leaves n - k = 22 parity bytes per block.
n = 255  # total block length
k = 233  # message length
rs = reedsolo.RSCodec(n-k)
encodings = ["gzip", f"rs({n},{k})"]

print("### Header not covered by rs(255, 233)")
compressed_content = gzip.compress(original_text)
print(len(compressed_content))
# Zero-pad the compressed payload up to exactly one RS data block.
padded_compressed_content = compressed_content.ljust(k, b'\x00')
assert len(padded_compressed_content) == k
rs_content = rs.encode(padded_compressed_content)

# The header is prepended after RS encoding, so it travels unprotected.
hqfbp_message = create_hqfbp_message(
    content=rs_content,
    src_callsign=src_callsign,
    content_encoding=encodings)
message_length = len(hqfbp_message)
overhead_percentage = calculate_overhead(len(original_text), message_length)

print(f"Compressed Message Length: {message_length} bytes")
print(f"Overhead Percentage: {overhead_percentage}%")

print("### Headers included in rs(255, 233)")
compressed_content = gzip.compress(original_text)
print(len(compressed_content))
# Frame first, then pad and RS-encode header and payload together.
not_rs_message = create_hqfbp_message(
    content=compressed_content,
    src_callsign=src_callsign,
    content_encoding=encodings)
padded_not_rs_message = not_rs_message.ljust(k, b'\x00')
assert len(padded_not_rs_message) == k
# `rs_message` is read back by the decoding section.
rs_message = rs.encode(padded_not_rs_message)

message_length = len(rs_message)
overhead_percentage = calculate_overhead(len(original_text), message_length)

print(f"Compressed Message Length: {message_length} bytes")
print(f"Overhead Percentage: {overhead_percentage}%")

### Header not covered by rs(255, 233)
79
Compressed Message Length: 285 bytes
Overhead Percentage: 367.21%
### Headers included in rs(255, 233)
79
Compressed Message Length: 255 bytes
Overhead Percentage: 318.03%


## Example 4: PNG File Transmission

In [204]:
# Example 4: PNG File Transmission
# A solid-colour PNG is built in memory so the example needs no file on disk.
# `io` and `Image` come from the import cell at the top of the notebook.
img = Image.new('RGB', (640, 480), color='red')
img_bytes = io.BytesIO()
img.save(img_bytes, format='PNG')
# `img_byte_arr` is reused by Example 5.
img_byte_arr = img_bytes.getvalue()

# Create HQFBP message for the PNG file
png_message = create_hqfbp_message(src_callsign=src_callsign, content=img_byte_arr, content_type="image/png")
message_length = len(png_message)
overhead_percentage = calculate_overhead(len(img_byte_arr), message_length)

print(f"PNG Message Length: {message_length} bytes")
print(f"Overhead Percentage: {overhead_percentage}%")

PNG Message Length: 1958 bytes
Overhead Percentage: 0.67%


## Example 5: PNG File Transmission with `lzma` compression

In [205]:
# Example 5: the PNG bytes from Example 4 again, lzma-compressed before framing
png_size = len(img_byte_arr)
compressed_img_content = lzma.compress(img_byte_arr)

png_message = create_hqfbp_message(
    content=compressed_img_content,
    src_callsign=src_callsign,
    content_type="image/png",
    content_encoding='lzma',
)
message_length = len(png_message)
# Measured against the uncompressed PNG, so a negative value is a net saving.
overhead_percentage = calculate_overhead(png_size, message_length)

print(f"PNG Message Length: {message_length} bytes")
print(f"Overhead Percentage: {overhead_percentage}%")

PNG Message Length: 227 bytes
Overhead Percentage: -88.33%


## Example 6: JPEG File Transfer

In [206]:
# Example 6: JPEG file transfer, comparing lzma and gzip on the same image.
# The JPEG bytes get their own name so that `img_byte_arr` keeps holding the
# PNG from Example 4 and Example 5 gives the same result when re-run later.
with open("img_0.jpg", "rb") as f:
    jpeg_byte_arr = f.read()
# The file is closed at this point; everything below works on the in-memory copy.
print("Original Image Size: {} bytes".format(len(jpeg_byte_arr)))

for heading, label, encoding, compress in (
        ("# Jpeg LZMA", "Jpeg Lzma", 'lzma', lzma.compress),
        ("# Jpeg Gzip", "Jpeg Gzip", 'gzip', gzip.compress),
):
    print(heading)
    compressed_jpeg_content = compress(jpeg_byte_arr)
    # Create HQFBP message for the JPG file
    jpeg_message = create_hqfbp_message(
        src_callsign=src_callsign,
        content=compressed_jpeg_content,
        content_type="image/jpeg",
        content_encoding=encoding,
    )
    message_length = len(jpeg_message)
    overhead_percentage = calculate_overhead(len(jpeg_byte_arr), message_length)

    print(f"{label} Message Length: {message_length} bytes")
    print(f"Overhead Percentage: {overhead_percentage}%")

Original Image Size: 30629 bytes
# Jpeg LZMA
Jpeg Lzma Message Length: 25267 bytes
Overhead Percentage: -17.51%
# Jpeg Gzip
Jpeg Gzip Message Length: 25621 bytes
Overhead Percentage: -16.35%


# Decoding

In [207]:
# Decode the Example 1 message: the CBOR header map comes first and the
# payload is whatever follows it in the stream.
stream = io.BytesIO(text_with_callsign)
headers = cbor2.load(stream)  # reads the header map from the front of the stream
text = stream.read()          # the remaining bytes are the text payload
print(headers)
print(text.decode('utf-8'))

{0: 1, 2: 'FOSM-1'}
Autour de la terre, je pense aux élèves scrutant l'horizon.


In [208]:
assert len(rs_message) == 255

# Simulate corruption on a copy, so `rs_message` stays intact and this cell
# behaves the same every time it is re-run.
# With 22 parity bytes, up to 11 corrupted bytes can be repaired; one is corrupted here.
#   i=11 is right at the header in a bad position
#   i=10 is right at the header in the callsign part
#   i>30 is in the gzip content
i = 50
corrupted_message = bytearray(rs_message)
# Flipping every bit always changes the byte; masking with 0x00 would leave a
# byte that is already zero untouched.
corrupted_message[i] ^= 0xFF

# Parse the header straight from the damaged block, before any repair
# (works here because byte i lies past the header).
buf = io.BytesIO(corrupted_message)
headers = cbor2.load(buf) # load cbor headers from bytes
print(buf.tell())
print(headers)

# 1. Initialize the Reed-Solomon object
n = 255 #(total block length)
k = 233 #(message length)
# n - k = 22 parity bytes
rs = reedsolo.RSCodec(n-k)

# 2. Repair the block; element 0 of the result is the decoded message
repaired_msg = rs.decode(corrupted_message)[0]

# 3. Parse the repaired message: header first, then the gzip stream followed by
#    the zero padding that was appended before RS encoding
buf = io.BytesIO(repaired_msg)
headers = cbor2.load(buf) # load cbor headers from bytes
print(headers)
gzip_text = buf.read() # remaining bytes are the compressed text
text = gzip.decompress(gzip_text)
print(text.decode())

30
{0: 1, 2: 'FOSM-1', 5: ['gzip', 'rs(255,233)']}
{0: 1, 2: 'FOSM-1', 5: ['gzip', 'rs(255,233)']}
Hello World! This is a test message for RS(255,233) encoding.
