In [52]:
%load_ext micropython_magic
%reload_ext micropython_magic

The micropython_magic extension is already loaded. To reload it, use:
  %reload_ext micropython_magic


In [25]:
import math

# Results recorded from earlier runs of this cell (presumably the first is for
# the unfiltered data and the second for the cleaned data -- TODO confirm):
#   Standard Deviation: 2387.3688295694906
#   Standard Deviation: 928.9352086567103

# Raw samples, including the values listed separately as outliers below.
data = [452, 1542, -903, -721, -933, -598, -895, 5794, -917, 2585, -843, -1014, -754, -1078, -1065, -94, 1066, 8305, 589, 4870, 943, 2171, -764, 4606, -104, 4754, -836, 4952, -429, -1147, -723, -797, 5238, -948, 1615, -887, 6069, -747, -973, -779, -734, -792, -484, -853, -1183, -645, -837, -958, -929, -660]
outliers = [5794, 8305, 4870, 4606, 4754, 4952, 5238, 6069]

# Keep only the samples whose value does not appear in the outlier list.
cleaned_data = list(filter(lambda sample: sample not in outliers, data))

# Population statistics (divide by N, not N - 1) over the cleaned samples.
sample_count = len(cleaned_data)
mean = sum(cleaned_data) / sample_count
squared_deviations = [(sample - mean) ** 2 for sample in cleaned_data]
variance = sum(squared_deviations) / sample_count
std_dev = math.sqrt(variance)

# Report the result
print("Standard Deviation:", std_dev)


Standard Deviation: 928.9352086567103


In [53]:
%mpy -s {"/dev/cu.usbmodem_fs3_1"}

In [None]:
# %%micropython

import math
import time
from config import config
from machine import RTC
from mp_libs import logging
from mp_libs.mpy_decimal import DecimalNumber
from mp_libs.network import Network
from mp_libs.protocols.min_iot_protocol import MinIotMessage
from mp_libs.time import ptp
from mp_libs.time.time import get_fmt_datetime

NUM_SYNC_CYCLES = 10
TIMEOUT_MSEC = 5000

miniot_network = Network.create_min_iot("GTY")
miniot_network.connect()
rtc = RTC()


print(f"Initial RTC datetime: {rtc.datetime()}")
print(miniot_network.transport.transport.transport.wifi._sta.config("channel"))
print(miniot_network.transport.transport.transport.wifi._sta.config("mac"))
print(f"EPN Peer: {config["epn_peer_mac"]}")

while True:
    # Perform periph sequence
    timestamps = ptp.sequence_periph(miniot_network, TIMEOUT_MSEC, initiate_sync=True, num_sync_cycles=NUM_SYNC_CYCLES)

    # Calculate offsets
    offsets = [ptp.calculate_offset(t1, t2, t3, t4) for t1, t2, t3, t4 in timestamps]

    # Process offsets
    ave_offset = ptp.process_offsets(offsets)

    # Update time
    rtc.offset(str(ave_offset).encode())
    print(f"ave_offset: {ave_offset}")


In [60]:
# %%micropython --writefile ptp_test.py

import math
import time
from config import config
from machine import RTC
from mp_libs import logging
from mp_libs.mpy_decimal import DecimalNumber
from mp_libs.network import Network
from mp_libs.protocols.min_iot_protocol import MinIotMessage
from mp_libs.time import ptp
from mp_libs.time.time import get_fmt_datetime

rtc = RTC()

def master_time_sync(timeout_msec: int = 5000):
    """Wait for a peripheral's sync request, then run the PTP master sequence.

    Connects a MinIoT network, prints link diagnostics, and polls for incoming
    messages every 0.1 s until one parses as ``ptp.PtpMsg.SYNC_REQ``. The
    payload of that request is used as the number of sync cycles passed to
    ``ptp.sequence_master``. Messages that ``ptp.parse_msg`` rejects with
    ``ValueError`` are printed and otherwise ignored.

    Args:
        timeout_msec: Timeout forwarded to ``ptp.sequence_master``.
    """
    # mqtt_network = Network.create_mqtt("GTY")
    miniot_network = Network.create_min_iot("GTY")
    miniot_network.connect()
    sta = miniot_network.transport.transport.transport.wifi._sta
    # Request channel 11 on the station interface; whatever config() returns
    # for the keyword form is printed as-is.
    print(sta.config(channel=11))
    # mqtt_network.connect()
    # mqtt_network.ntp_time_sync()

    print(f"Initial RTC datetime: {rtc.datetime()}")
    print(sta.config("channel"))
    print(sta.config("mac"))
    # Single quotes inside the replacement field keep this line parseable by
    # CPython < 3.12 as well as MicroPython; the printed text is unchanged.
    print(f"EPN Peer: {config['epn_peer_mac']}")

    num_sync_cycles = 1
    wait_for_sync = True

    # Wait for sync req
    while wait_for_sync:
        rxed_msgs = []
        if miniot_network.receive(rxed_msgs):
            for msg in rxed_msgs:
                print(msg)
                try:
                    msg_type, payload = ptp.parse_msg(msg["msg"])
                except ValueError:
                    # Unparsable message: move on to the next one. Falling
                    # through would either hit an unbound msg_type or reuse
                    # the type/payload of a previously parsed message.
                    continue

                if msg_type == ptp.PtpMsg.SYNC_REQ:
                    # The request's payload is the cycle count to run.
                    num_sync_cycles = payload
                    wait_for_sync = False
                    break

        time.sleep(0.1)

    # Perform master sequence
    ptp.sequence_master(miniot_network, timeout_msec, num_sync_cycles)
    print(f"now: {rtc.datetime()}")



def periph_time_sync(num_sync_cycles: int = 10, timeout_msec: int = 5000):
    miniot_network = Network.create_min_iot("GTY")
    miniot_network.connect()

    print(f"Initial RTC datetime: {rtc.datetime()}")
    print(miniot_network.transport.transport.transport.wifi._sta.config("channel"))
    print(miniot_network.transport.transport.transport.wifi._sta.config("mac"))
    print(f"EPN Peer: {config["epn_peer_mac"]}")

    # Perform periph sequence
    timestamps = ptp.sequence_periph(miniot_network, timeout_msec, initiate_sync=True, num_sync_cycles=num_sync_cycles)

    # Calculate offsets
    offsets = [ptp.calculate_offset(t1, t2, t3, t4) for t1, t2, t3, t4 in timestamps]

    # Process offsets
    ave_offset = ptp.process_offsets(offsets)

    # Update time
    rtc.offset(str(ave_offset).encode())

    print(f"ave_offset: {ave_offset}")
    print(f"now: {rtc.datetime()}")


def print_datetime():
    """Print the value currently returned by the module-level ``rtc.datetime()``."""
    current_datetime = rtc.datetime()
    print(current_datetime)


In [None]:
import time

trigger_dt = (2024, 9, 14, 16, 34, 0, 5, 258, 1)
incr_sec = 0

for i in range(10):
    sync_logs = %mpy import ptp_test; ptp_test.periph_time_sync(50)
    for log in sync_logs:
        print(log)

    trigger_ts = time.mktime(trigger_dt)
    while time.time() < trigger_ts:
        pass

    for i in range(5):
        result = %mpy ptp_test.print_datetime()
        print(result)
        time.sleep(1)

    incr_sec += 15
    trigger_dt = time.localtime(trigger_ts + incr_sec)