In [1]:
import serial
import time
from IPython.display import display, clear_output
from cryptography.hazmat.primitives.ciphers.aead import AESCCM

In [2]:
# Connection settings for the board's serial link. The device path is
# machine-specific and has to be adjusted per host.
SERIAL_PORT = "/dev/cu.usbmodem1101"
BAUD_RATE = 115200

# A 0.1 s read timeout keeps readline() from blocking indefinitely, so the
# polling helpers defined below can re-check their own deadlines.
ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=0.1)
time.sleep(2)               # give the ESP time to reboot+enumerate
ser.reset_input_buffer()    # drop any boot messages

In [3]:
def send_ping_and_wait(timeout_s=5.0):
    """
    Send "ping" over the serial link and wait for a reply line that starts
    with "ping:".

    Every other line received while waiting is echoed with a "  log >" prefix.

    Args:
        timeout_s: Maximum number of seconds to wait for the reply.

    Raises:
        TimeoutError: No "ping:" line arrived before the deadline.
    """
    ser.write(b"ping\n")
    # Monotonic clock: the deadline must not shift if the wall clock is adjusted.
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        raw = ser.readline()
        if not raw:
            # the read timed out with nothing received; poll again
            continue
        line = raw.decode(errors="ignore").strip()
        if line.startswith("ping:"):
            print("OK:", line)
            return
        # not the reply: show whatever else the device printed
        print("  log >", line)
    raise TimeoutError("no ping reply in time")

In [18]:
def send_log_dump_and_wait(timeout_s=60.0):
    """
    Send "log_dump" and collect the records the device sends back.

    Each line starting with "log_dump:" carries one record as hex after the
    colon. A line starting with "log_dump_end:" terminates the dump. Any other
    line (ESP_LOGx output etc.) is echoed with a "  log >" prefix and skipped.

    Args:
        timeout_s: Maximum number of seconds to wait for the end marker.

    Returns:
        list[bytes]: The decoded records in arrival order. If the deadline
        passes before the end marker is seen, the records collected so far
        are returned and a "timeout:" message is printed.

    Raises:
        ValueError: A "log_dump:" line does not contain valid hex.
    """
    # fire off the dump command
    ser.write(b"log_dump\n")
    # Monotonic clock: the deadline must not shift if the wall clock is adjusted.
    deadline = time.monotonic() + timeout_s

    audit_logs = []
    while time.monotonic() < deadline:
        raw = ser.readline()
        if not raw:
            # no data yet
            continue

        line = raw.decode(errors="ignore").strip()

        if line.startswith("log_dump:"):
            # everything after the first colon is the hex-encoded record
            hexstr = line.split(":", 1)[1]
            try:
                audit_logs.append(bytes.fromhex(hexstr))
            except ValueError as exc:
                raise ValueError(f"Invalid hex in dump: {hexstr!r}") from exc
            # A record line is payload, not a device log message: don't echo it.
            continue

        if line.startswith("log_dump_end:"):
            print(f"footer: received {len(audit_logs)} logs.")
            return audit_logs

        # anything else is a normal log message: print it and keep waiting
        print("  log >", line)

    print(f"timeout: received {len(audit_logs)} logs.")
    return audit_logs


In [19]:
def get_key(timeout_s=10.0):
    """
    Send "get_key" and return the key bytes from the device's reply.

    The reply is the first line starting with "get_key:"; the text after the
    colon is parsed as hex. Any other line (ESP_LOGx output etc.) is echoed
    with a "  log >" prefix and skipped.

    Args:
        timeout_s: Maximum number of seconds to wait for the reply.

    Returns:
        bytes: The decoded key.

    Raises:
        ValueError: The reply does not contain valid hex.
        TimeoutError: No "get_key:" line arrived before the deadline.
    """
    ser.write(b"get_key\n")
    # Monotonic clock: the deadline must not shift if the wall clock is adjusted.
    deadline = time.monotonic() + timeout_s

    while time.monotonic() < deadline:
        raw = ser.readline()
        if not raw:
            # no data yet
            continue

        line = raw.decode(errors="ignore").strip()

        if line.startswith("get_key:"):
            # everything after the first colon is the hex-encoded key
            hexstr = line.split(":", 1)[1]
            try:
                return bytes.fromhex(hexstr)
            except ValueError as exc:
                raise ValueError(f"Invalid hex in get_key reply: {hexstr!r}") from exc

        # anything else is a normal log message: print it and keep waiting
        print("  log >", line)

    raise TimeoutError("did not get response in time")

In [27]:
def reset(timeout_s=10.0):
    """
    Send "reset" and wait for the device to acknowledge it.

    The function returns as soon as a line starting with "reset:ok" or
    "reset:fail" is received. Any other line (ESP_LOGx output etc.) is echoed
    with a "  log >" prefix and skipped.

    Args:
        timeout_s: Maximum number of seconds to wait for the acknowledgement.

    Returns:
        bool: True for "reset:ok", False for "reset:fail".

    Raises:
        TimeoutError: Neither acknowledgement arrived before the deadline.
    """
    ser.write(b"reset\n")
    # Monotonic clock: the deadline must not shift if the wall clock is adjusted.
    deadline = time.monotonic() + timeout_s

    while time.monotonic() < deadline:
        raw = ser.readline()
        if not raw:
            # no data yet
            continue

        line = raw.decode(errors="ignore").strip()

        if line.startswith("reset:ok"):
            print("reset success")
            # Stop waiting: without this return the loop ran into the timeout
            # even though the device had already answered.
            return True

        if line.startswith("reset:fail"):
            print("reset fail")
            return False

        # anything else is a normal log message: print it and keep waiting
        print("  log >", line)

    raise TimeoutError("did not get response in time")

In [39]:
def verify_logs(audit_logs, key):
    """
    Decrypt a sequence of chained AES-CCM log records and check the chain.

    Record layout:
        - bytes 0..11   nonce
        - bytes 12..15  associated data (timestamp, little-endian)
        - bytes 16..-17 ciphertext
        - last 16 bytes authentication tag of this record

    Chaining rule: from the second record on, the nonce must equal the first
    12 bytes of the previous record's tag. The first record's nonce is not
    compared against anything.

    Args:
        audit_logs: Iterable of raw records in log order.
        key: Key handed to AESCCM.

    Returns:
        list of (timestamp, plaintext) tuples, one per record.

    Raises:
        ValueError: A record is shorter than 32 bytes or breaks the chain.
        Exceptions raised by AESCCM.decrypt propagate unchanged (the tampering
        tests in this notebook catch cryptography.exceptions.InvalidTag).
    """
    nonce_len, aad_len, tag_len = 12, 4, 16
    body_start = nonce_len + aad_len
    shortest_valid = body_start + tag_len

    cipher = AESCCM(key, tag_length=tag_len)
    decoded = []
    link = None  # tag prefix the next record's nonce has to match

    for position, entry in enumerate(audit_logs):
        if len(entry) < shortest_valid:
            raise ValueError(f"Record {position} too short")

        nonce = entry[:nonce_len]
        stamp = entry[nonce_len:body_start]
        body = entry[body_start:-tag_len]
        tag = entry[-tag_len:]

        # The chain is checked before any decryption is attempted.
        chain_intact = link is None or nonce == link
        if not chain_intact:
            raise ValueError(f"Chain broken at record {position}")

        # AESCCM expects the tag appended to the ciphertext.
        message = cipher.decrypt(nonce, body + tag, stamp)

        link = tag[:nonce_len]
        decoded.append((int.from_bytes(stamp, "little"), message))

    return decoded

In [28]:
# Send the "reset" command; a TimeoutError is raised if no reply is seen
# within reset()'s default 10 s.
reset()

  log > D (1130673) ringbuf_flash: ringbuf_flash_write writing log...[0m
  log > D (1130673) ringbuf_flash: WRITING hdr @ offset=55203 len=88 crc=0x74570880[0m
  log > D (1130673) ringbuf_flash: Saving ringbuf metadata...[0m
  log > D (1130673) nvs: nvs_open_from_partition rb_log 1[0m
  log > D (1130673) nvs: nvs_set_blob log_secure 20[0m
  log > D (1130673) nvs: nvs_close 728[0m
  log > D (1130673) nvs: nvs_open_from_partition log_secure_nvs 1[0m
  log > D (1130673) nvs: nvs_set_blob meta 44[0m
  log > D (1130673) nvs: nvs_close 729[0m
  log > D (1135673) log_secure: logging secure. ts=1135673[0m
  log > D (1135673) ringbuf_flash: ringbuf_flash_write writing log...[0m
  log > D (1135673) ringbuf_flash: WRITING hdr @ offset=55303 len=88 crc=0x273373089[0m
  log > D (1135673) ringbuf_flash: Saving ringbuf metadata...[0m
  log > D (1135673) nvs: nvs_open_from_partition rb_log 1[0m
  log > D (1135673) nvs: nvs_set_blob log_secure 20[0m
  log > D (1135673) nvs: nvs_close 730

TimeoutError: did not get response in time

In [29]:
# Link check: waits up to the default 5 s for a "ping:" reply line.
send_ping_and_wait()

  log > [0;32mI (1249923) main: received command. cmd=ping[0m
OK: ping:pong


In [30]:
# Fetch the key bytes; verify_logs() further down hands them to AESCCM.
key = get_key()

  log > D (1250723) log_secure: logging secure. ts=1250723[0m
  log > D (1250723) ringbuf_flash: ringbuf_flash_write writing log...[0m
  log > D (1250723) ringbuf_flash: WRITING hdr @ offset=300 len=88 crc=0x3230998246[0m
  log > D (1250723) ringbuf_flash: Saving ringbuf metadata...[0m
  log > D (1250723) nvs: nvs_open_from_partition rb_log 1[0m
  log > D (1250723) nvs: nvs_set_blob log_secure 20[0m
  log > D (1250723) nvs: nvs_close 780[0m
  log > D (1250723) nvs: nvs_open_from_partition log_secure_nvs 1[0m
  log > D (1250723) nvs: nvs_set_blob meta 44[0m
  log > D (1250723) nvs: nvs_close 781[0m
  log > [0;32mI (1255393) main: received command. cmd=get_key[0m
  log > D (1255393) nvs: nvs_open_from_partition log_secure_nvs 1[0m
  log > D (1255393) nvs: nvs_get_str_or_blob key_fetched[0m
  log > D (1255393) nvs: nvs_set_blob key_fetched 2[0m


In [41]:
# Pull the records from the device: one bytes object per "log_dump:" line.
audit_logs = send_log_dump_and_wait()

  log > D (1265723) log_secure: logging secure. ts=1265723[0m
  log > D (1265723) ringbuf_flash: ringbuf_flash_write writing log...[0m
  log > D (1265723) ringbuf_flash: WRITING hdr @ offset=600 len=88 crc=0x2227904026[0m
  log > D (1265723) ringbuf_flash: Saving ringbuf metadata...[0m
  log > D (1265723) nvs: nvs_open_from_partition rb_log 1[0m
  log > D (1265723) nvs: nvs_set_blob log_secure 20[0m
  log > D (1265723) nvs: nvs_close 792[0m
  log > D (1265723) nvs: nvs_open_from_partition log_secure_nvs 1[0m
  log > D (1265723) nvs: nvs_set_blob meta 44[0m
  log > D (1265723) nvs: nvs_close 793[0m
  log > D (1270723) log_secure: logging secure. ts=1270723[0m
  log > D (1270723) ringbuf_flash: ringbuf_flash_write writing log...[0m
  log > D (1270723) ringbuf_flash: WRITING hdr @ offset=700 len=88 crc=0x2520266188[0m
  log > D (1270723) ringbuf_flash: Saving ringbuf metadata...[0m
  log > D (1270723) nvs: nvs_open_from_partition rb_log 1[0m
  log > D (1270723) nvs: nvs_set

In [49]:
# Decrypt and chain-check every dumped record, then list them.
# Each result is a (timestamp, plaintext) tuple; plaintext is printed as bytes.
results = verify_logs(audit_logs, key)
for tx, message in results: 
    print(f"{tx}: {message}")

1265723: b'   1265723 [main]: system running smoothly. counter=253\n'
1270723: b'   1270723 [main]: system running smoothly. counter=254\n'
1275723: b'   1275723 [main]: system running smoothly. counter=255\n'
1280723: b'   1280723 [main]: system running smoothly. counter=256\n'
1285723: b'   1285723 [main]: system running smoothly. counter=257\n'
1290723: b'   1290723 [main]: system running smoothly. counter=258\n'
1295723: b'   1295723 [main]: system running smoothly. counter=259\n'
1300723: b'   1300723 [main]: system running smoothly. counter=260\n'
1305723: b'   1305723 [main]: system running smoothly. counter=261\n'
1310723: b'   1310723 [main]: system running smoothly. counter=262\n'
1315723: b'   1315723 [main]: system running smoothly. counter=263\n'
1320723: b'   1320723 [main]: system running smoothly. counter=264\n'
1325723: b'   1325723 [main]: system running smoothly. counter=265\n'
1330723: b'   1330723 [main]: system running smoothly. counter=266\n'
1335723: b'   133572

In [51]:
import copy
from cryptography.exceptions import InvalidTag

def test_tampering(audit_logs, key):
    NONCE_LEN = 12
    TAG_LEN   = 16

    def run_test(name, tamper_fn):
        logs = copy.deepcopy(audit_logs)
        tamper_fn(logs)
        try:
            _ = verify_logs(logs, key)
            print(f"❌ {name}: No tampering detected!")
        except ValueError as e:
            print(f"✅ {name}: Caught chain‐break: {e}")
        except InvalidTag:
            print(f"✅ {name}: Caught authentication failure (InvalidTag)")

    # 1) Flip a byte in ciphertext of record #2
    def tamper_cipher(logs):
        rec = bytearray(logs[2])
        rec[NONCE_LEN + 4 + 5] ^= 0xFF
        logs[2] = bytes(rec)

    # 2) Flip a bit in the AAD (timestamp) of record #3
    def tamper_aad(logs):
        rec = bytearray(logs[3])
        rec[NONCE_LEN + 1] ^= 0x01
        logs[3] = bytes(rec)

    # 3) Flip a byte in the auth‐tag of record #4
    def tamper_tag(logs):
        rec = bytearray(logs[4])
        rec[-1] ^= 0xAA
        logs[4] = bytes(rec)

    # 4) Flip a byte in the nonce of record #5 to force a chain break
    def tamper_nonce(logs):
        rec = bytearray(logs[5])
        rec[0] ^= 0x11
        logs[5] = bytes(rec)

    run_test("Ciphertext tamper", tamper_cipher)
    run_test("AAD tamper",        tamper_aad)
    run_test("Tag tamper",        tamper_tag)
    run_test("Nonce tamper",      tamper_nonce)

# Run the four single-byte tampering cases against the dumped records.
test_tampering(audit_logs, key)

✅ Ciphertext tamper: Caught authentication failure (InvalidTag)
✅ AAD tamper: Caught authentication failure (InvalidTag)
✅ Tag tamper: Caught authentication failure (InvalidTag)
✅ Nonce tamper: Caught chain‐break: Chain broken at record 5


In [53]:
def test_insertion(audit_logs, key):
    logs = list(audit_logs)
    # Pick an existing record and insert it at position 2
    logs.insert(2, logs[3])
    
    try:
        _ = verify_logs(logs, key)
        print("❌ Insert test: No tampering detected!")
    except ValueError as e:
        print("✅ Insert test: Caught chain‐break:", e)
    except InvalidTag:
        print("✅ Insert test: Caught authentication failure (InvalidTag)")

# Re-insert an existing record into a copy of the log and confirm it is caught.
test_insertion(audit_logs, key)

✅ Insert test: Caught chain‐break: Chain broken at record 2
