In [None]:
import os
import re
import time
import json
import base64
import serial
import hashlib

In [None]:
COM_PORT = "COM13"
BAUD_RATE = 115200

serial_port = serial.Serial()
serial_port.baudrate = BAUD_RATE
serial_port.port = COM_PORT
serial_port.timeout = 10
serial_port.open()

In [None]:
def reset_board(serial_port_obj):
    pythonInject = [
        "import machine",
        "machine.reset()",
    ]
    # interrupt the currently running code
    serial_port_obj.write(b"\x03")  # Ctrl+C
    serial_port_obj.write(b"\x03")  # Ctrl+C
    serial_port_obj.write(b"\x01")  # switch to raw REPL mode & inject code
    for code in pythonInject:
        serial_port_obj.write(bytes(code + "\n", "utf-8"))
    serial_port_obj.write(b"\x04")  # exit raw REPL and run injected code
    serial_port_obj.close()
    time.sleep(2)


reset_board(serial_port_obj=serial_port)

In [None]:
serial_port.open()

In [None]:
# request available space on the disk if requested by sending a json with request_disc_available_space as true
def request_disc_available_space(serial_port) -> tuple[int, int]:
    header = {
        "request_disc_available_space": True,
    }
    message = (json.dumps(header) + "\n").encode()
    serial_port.write(message)
    while True:
        try:
            response = serial_port.readline().decode().strip()
            print("Device -> PC:", response)
            if "Info: Available space" in response:
                # return format "Device -> PC: Info: Available space on the disk: 1228800 bytes, total space: 1441792 bytes."
                match = re.search(r"(\d+) bytes, total space: (\d+) bytes", response)
                if match:
                    available_space = int(match.group(1))
                    total_space = int(match.group(2))
                    return available_space, total_space
        except Exception as e:
            print(f"Error: {e}")
            break
    return 0, 0


available_space, total_space = request_disc_available_space(serial_port)

In [None]:
# request all files stat on the disk if requested by sending a json with request_dir_list as true
def request_dir_list(serial_port):
    header = {
        "request_dir_list": True,
    }
    message = (json.dumps(header) + "\n").encode()
    serial_port.write(message)
    while True:
        try:
            response = serial_port.readline().decode().strip()
            # print("Device -> PC:", response)
            if "Info: Directory list" in response:
                # return format "Device -> PC: Info: Directory list: "start of a directory"
                match = re.search(r"Info: Directory list: (.*)$", response)
                if match:
                    output = json.loads(match.group(1))
                    return output
        except Exception as e:
            print(f"Error: {e}")
            break
    return {}


file_stats = request_dir_list(serial_port)
# the last 4th field in the list is the file size
file_stats_size_only = {file: stat[-4] for file, stat in file_stats.items()}
file_stats_size_only

In [None]:
# request avilable memory on the device if requested by sending a json with request_memory as true
def request_memory(serial_port) -> tuple[int, int]:
    header = {
        "request_memory": True,
    }
    message = (json.dumps(header) + "\n").encode()
    serial_port.write(message)
    while True:
        try:
            response = serial_port.readline().decode().strip()
            print("Device -> PC:", response)
            if "Info: free Memory" in response:
                # return format "Device -> PC: Info: free Memory 218864 bytes, total Memory 233024 bytes."
                match = re.search(r"(\d+) bytes, total Memory (\d+) bytes", response)
                if match:
                    free_memory = int(match.group(1))
                    total_memory = int(match.group(2))
                    return free_memory, total_memory
        except Exception as e:
            print(f"Error: {e}")
            break
    return 0, 0


free_memory, total_memory = request_memory(serial_port)

In [None]:
def remove_file(serial_port, file_path) -> bool:
    header = {
        "remove_file_request": True,
        "filename": file_path,
    }
    message = (json.dumps(header) + "\n").encode()
    serial_port.write(message)
    while True:
        try:
            response = serial_port.readline().decode().strip()
            print("Device -> PC:", response)
            if "Success: Removed file" in response:
                return True
            if "Error: " in response:
                return False
        except Exception as e:
            print(f"Error: {e}")
            break
    return False

status = remove_file(serial_port, "test.pdf")
print(status)

In [None]:
def restart_file_transfer(serial_port):
    header = {
        "restart": True,
    }
    message = (json.dumps(header) + "\n").encode()
    serial_port.write(message)
    while True:
        try:
            response = serial_port.readline().decode().strip()
            print("Device -> PC:", response)
            if "Success: Restarting file transfer" in response:
                return True
            if "Error: " in response:
                return False
        except Exception as e:
            print(f"Error: {e}")
            break
    return False


restart_file_transfer(serial_port)

In [None]:
def reboot(serial_port) -> bool:
    header = {
        "reset": True,
    }
    message = (json.dumps(header) + "\n").encode()
    serial_port.write(message)
    while True:
        try:
            response = serial_port.readline().decode().strip()
            print("Device -> PC:", response)
            if "Info: Firmware update complete." in response:
                return True
            if "Error: " in response:
                return False
        except Exception as e:
            print(f"Error: {e}")
            break
    return False

In [None]:
def send_file(serial_port, file_path) -> bool:
    restart_file_transfer(serial_port)
    available_space, _ = request_disc_available_space(serial_port)
    free_memory, _ = request_memory(serial_port)
    CHUNK_SIZE = int(free_memory * 0.1)

    # Read the file to be transferred
    with open(file_path, "rb") as f:
        file_content = f.read()
    file_size = len(file_content)
    if file_size > available_space:
        print(f"Error: Not enough space on the device to transfer {file_path}")
        return False
    # resize CHUNK_SIZE to the power of 2
    CHUNK_SIZE = 2 ** (CHUNK_SIZE.bit_length() - 1)
    if file_size < CHUNK_SIZE:
        CHUNK_SIZE = file_size
    # Split the file into chunks
    chunks = [file_content[i : i + CHUNK_SIZE] for i in range(0, file_size, CHUNK_SIZE)]
    print(
        f"file size {file_size} bytes, select chunk size of {CHUNK_SIZE} bytes, {len(chunks)} chunks total"
    )

    # Send header message with the filename
    default_header = {
        "filename": os.path.basename(file_path),
        "finish": False,
    }

    # Send each chunk as a JSON message
    for i, chunk in enumerate(chunks):
        # Encode the chunk in base64
        chunk_b64 = base64.b64encode(chunk).decode()
        # Create a JSON message with the chunk info
        chunk_msg = default_header.copy()
        chunk_msg["chunk_size"] = str(len(chunk))
        chunk_msg["chunk_data_b64"] = chunk_b64
        # if this is the last chunk, set finish to True
        if i == len(chunks) - 1:
            checksum = hashlib.sha256(file_content).hexdigest()
            chunk_msg["checksum"] = checksum
            chunk_msg["size"] = file_size
            chunk_msg["finish"] = True
            # print("Sent finish message")
        message = (json.dumps(chunk_msg) + "\n").encode()
        serial_port.write(message)
        print(f"PC -> Device: Sent chunk {i} of size {len(chunk)} bytes")
        # read the response from the device, block reading
        while True:
            try:
                response = serial_port.readline().decode().strip()
                print("Device -> PC:", response)
                if "Success: finished receiving" in response:
                    return True
                if "Success: Received chunk" in response:
                    break
                if "Error: " in response:
                    return False
            except Exception as e:
                print(f"Error: {e}")
                break
    return False

In [None]:
send_file(serial_port, "bootloader.py")

In [None]:
send_file(serial_port, "test.pdf")