Porting to Micropython / Raspberry Pi Pico ? #44

Open
petaramesh opened this issue Jan 31, 2024 · 9 comments
Labels
enhancement New feature or request question Further information is requested

Comments

@petaramesh

petaramesh commented Jan 31, 2024

Hello,

I'm creating a hobby microcontroller project that involves reading Victron systems (a BMV-712 smart, a battery SmartSense, a Smart MPPT charger and an Orion DC-DC charger) on a Raspberry Pi Pico running Micropython.

I found your project, and it covers exactly the part I'm missing: getting and processing the data from the Victrons.

I tested it on my Linux PC and it works like a charm.

So I tried installing the libraries onto my Pico, but bummer! It doesn't work, because MicroPython doesn't have most of the standard CPython libraries you import, although it may generally provide what should be needed.

I'm not a seasoned Python dev myself. Actually this is my first project in Python, let alone on a microcontroller, but I have written all the code I need and it works, except for the Victron Bluetooth part that I'm still missing.

I don't feel comfortable enough with your code to port it to this environment myself, so I was wondering whether this is already among your planned developments, or whether you could possibly consider it?

Many thanks in advance.

Addendum :

I've now written enough quick and dirty code to retrieve the Victron values (not all of them, just those I need) without using your libraries.
I took inspiration from them but couldn't easily port them, as they rely on a number of libraries / modules that are either unavailable under MicroPython or trimmed down (i.e. no support for AES-CTR in cryptolib, no enum, no typing, and worst of all no construct and no support at all for 24-bit integers).
So I had to work around all this with different, dirtier solutions.

Still, generic libraries for doing this in a cleaner way would be nice.
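
As an illustration of the 24-bit integer workaround mentioned above, here is a minimal sketch that sign-extends three little-endian bytes to four so that plain `struct` can decode them. The helper name `read_int24_le` is hypothetical and is not part of victron_ble; it only assumes that `struct` supports the `<i` format, which should hold on both CPython and MicroPython.

```python
import struct

def read_int24_le(buf, offset=0):
    """Decode a signed little-endian 24-bit integer found at buf[offset:offset+3]."""
    raw = bytes(buf[offset:offset + 3])
    if len(raw) != 3:
        raise ValueError("need 3 bytes to decode a 24-bit integer")
    # Sign extension: pad with 0xFF when the top bit is set, 0x00 otherwise
    pad = b"\xff" if raw[2] & 0x80 else b"\x00"
    return struct.unpack("<i", raw + pad)[0]

print(read_int24_le(b"\xff\xff\xff"))   # all bits set decodes as -1
print(read_int24_le(b"\x01\x00\x00"))   # smallest positive value, 1
```

Fields that occupy fewer than 24 bits still need their own shift or mask after this step, depending on how they are packed in the record.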

@petaramesh petaramesh added enhancement New feature or request question Further information is requested labels Jan 31, 2024
@georg90

georg90 commented Feb 7, 2024

Hi @petaramesh, would you mind sharing your code examples? I am in the same position as you and might be able to reuse / extend your work if you're okay with that.

Thank you!

@petaramesh
Author

Yeah I will try snipping the BT code from the rest of my project into something that can work standalone and post it here.
It will take me a little time.

@georg90

georg90 commented Feb 7, 2024

Very much appreciated

@petaramesh
Author

petaramesh commented Feb 7, 2024

Well here comes my quick and dirty extraction from my quick and dirty code.
If you have supported Victron devices, just put your devices' MAC addresses and encryption keys in there and run it; it should start printing received values to the console...

Make sure to install the asyncio libraries as indicated in the comments before attempting to run it.

I cut it from my whole project so it may still contain unnecessary code. I've tested it with a single Victron device I have on hand, and it works. So let me know...

NB: This comes from a project that does a host of other things so I didn't want the BT reception to use too much CPU time.
If you want to receive more data or more frequently, you may wish to change :

_BT_SCAN_INTERVAL_US = const(2000000)         # Scan every 2 sec (uS)
_BT_SCAN_WINDOW_US = const(100000)            # Scan for 100 mS (uS)
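
To get a feel for what these two constants mean, here is a small sketch that computes the fraction of time the radio spends listening. It assumes the usual `gap_scan` semantics, where the scanner runs for `window_us` out of every `interval_us`; the function name `scan_duty_cycle` is hypothetical.

```python
def scan_duty_cycle(interval_us, window_us):
    """Return the fraction of time (0.0 to 1.0) spent scanning."""
    if interval_us <= 0 or not 0 <= window_us <= interval_us:
        raise ValueError("window must be between 0 and the interval")
    return window_us / interval_us

# Values from the two constants above: 100 ms of listening every 2 s
print(scan_duty_cycle(2000000, 100000))   # 0.05, i.e. 5 % of the time
```

A device whose advertisements fall outside the listening window is simply missed until a later window, so a larger window or a shorter interval trades CPU time for fresher values.
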
# Pico_Victron_BT.py
#
# © Swâmi Petaramesh 2024
#
# This program is free software: you can redistribute it and/or modify it under the terms
# of the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with this program.
# If not, see <https://www.gnu.org/licenses/>.
#
# This program is example code for receiving and decoding some Victron® devices information
# and statuses over Bluetooth-LE on a Raspberry Pi® Pico W running :
#
# Works on : sys.version:3.4.0; MicroPython v1.22.1 on 2024-01-05
#            RPI_PICO_W-20240105-v1.22.1.uf2
#
# You need to put your own devices MAC addresses and encryption keys in the code below.

# Libraries ============================================================
#
# Micropython built-in libraries
import sys, time, struct, bluetooth, cryptolib
from machine import Pin
from cryptolib import aes

# Libraries to be installed on the Raspberry Pi Pico
# With mpremote tool (from Linux PC terminal) :
# mpremote mip install github:peterhinch/micropython-async/v3/primitives
# mpremote mip install github:peterhinch/micropython-async/v3/threadsafe
import asyncio
from threadsafe import ThreadSafeQueue

# Initialisations ==========================================================

_DEBUG = const(2)             # Extra debug console output

_UNKN = const(-9999)          # N/A value

pin_led_bt = Pin("WL_GPIO0", Pin.OUT, value=1)          # Integrated LED, ON

# Victron devices parameters and values -----------------------------------

# We need to put our Victron devices MAC addresses and encryption keys here
# The rest should be left _UNKN

victron = {
    "bmv712" : { "mac": b"\xA3\x76\x6E\x7C\x22\x33",
            "key": b"\x12\x34\x56\x78\x9A\xBC\xDE\xF0\x12\x34\x56\x78\x9A\xBC\xDE\xF0",
            "volt": _UNKN,
            "amp": _UNKN,
            "soc": _UNKN,
            "temp": _UNKN,
            "upd": _UNKN
        },
    "orion" : { "mac": b"\xA3\x76\x6E\x7C\x22\x34",
           "key": b"\x12\x34\x56\x78\x9A\xBC\xDE\xF0\x12\x34\x56\x78\x9A\xBC\xDE\xF1",
           "eng_detect": True,
           "mode": _UNKN,
           "cause": _UNKN,
           "v_in": _UNKN,
           "v_out": _UNKN,
           "lib_mode": 2,
           "lib_cause": 6,
           "upd": _UNKN
        },
    "smartsolar" : { "mac": b"\xA3\x76\x6E\x7C\x22\x35",
            "key": b"\x12\x34\x56\x78\x9A\xBC\xDE\xF0\x12\x34\x56\x78\x9A\xBC\xDE\xF2",
            "mode": _UNKN,
            "pwr": _UNKN,
            "amp": _UNKN,
            "upd": _UNKN,
            "lib_mode": 2
        },
    "batsense" : { "mac": b"\xA3\x76\x6E\x7C\x22\x36",
              "key": b"\x12\x34\x56\x78\x9A\xBC\xDE\xF0\x12\x34\x56\x78\x9A\xBC\xDE\xF3",
              "volt": _UNKN,
              "temp": _UNKN,
              "upd": _UNKN
        }
}

maclist= [victron[d]["mac"] for d in victron]

# System parameters ------------------------------------------------

_TMR_MAIN_LOOP = const(500)                   # Main loop duration

_BT_SCAN_DURATION_MS = const(0)               # BT Scan default duration, 0 = forever
_BT_SCAN_INTERVAL_US = const(2000000)         # Scan every 2 sec (uS)
_BT_SCAN_WINDOW_US = const(100000)            # Scan for 100 mS (uS)
_BT_MIN_RSSI = const(-85)                     # Minimum RSSI
ble = bluetooth.BLE()

# Fixed values
_BT_EXPIRE = const(180 * 1000)                # BT values expired after stalled (mS)

# This is our main status that we use everywhere
STATUS = (
          "CKSUM",          # 0
          "?????",          # 1
          "Unknown",        # 2
          "ERROR",          # 3
          "FAIL!",          # 4
          "DELTA",          # 5
          "Init",           # 6
          "Off",            # 7
          "Stop",           # 8
          "Start",          # 9
          "HI-V",           # 10
          "HI-v",           # 11
          "LOW-V",          # 12
          "LOW-v",          # 13
          "LOW-%",          # 14
          "HI-T°",          # 15
          "LO-T°",          # 16
          "Drain",          # 17
          "Timer",          # 18
          "Timr",           # 19
          "Stby",           # 20
          "LOW",            # 21
          "OK",             # 22
          "High",           # 23
          "Full",           # 24
          "Chrg",           # 25
          "Chg",            # 26
          "Engine",         # 27
          "Bulk",           # 28
          "Absorption",     # 29
          "Float",          # 30
          "Storage",        # 31
          "Remote",         # 32
          "Lock",           # 33
          "EngStop",        # 34
          "R+Stop",         # 35
          "RunTime",        # 36
          "FloTime",        # 37
          "Display",        # 38
          "Manual",         # 39
          "------------"    # 40
)

# Victron operation modes
VICTRON_OP = { 0: { "code": 7, "lib": "Off" },
               1: { "code": 12, "lib": "LOW-V" },
               2: { "code": 3, "lib": "ERROR" },
               3: { "code": 28, "lib": "Bulk" },
               4: { "code": 29, "lib": "Absorption" },
               5: { "code": 30, "lib": "Float" },
               6: { "code": 31, "lib": "Storage" },
               7: { "code": 1, "lib": "Equalize" },
               9: { "code": 1, "lib": "Inverting" },
               11: { "code": 1, "lib": "Supply" },
               245: { "code": 6, "lib": "Init" },
               246: { "code": 29, "lib": "Repeated absorption" },
               247: { "code": 1, "lib": "Recondition" },
               248: { "code": 1, "lib": "Bat safe" },
               252: { "code": 32, "lib": "Remote" }
             }

# Off reason for Victron DC-DC charger
VICTRON_DC_OFF = { 0x00000000: { "code": 40, "lib": "None" },
                   0x00000001: { "code": 4, "lib": "No-Input" },
                   0x00000002: { "code": 7, "lib": "Off" },
                   0x00000004: { "code": 7, "lib": "Off" },
                   0x00000008: { "code": 32, "lib": "Remote" },
                   0x00000010: { "code": 1, "lib": "Protection" },
                   0x00000020: { "code": 1, "lib": "Pay" },
                   0x00000040: { "code": 1, "lib": "BMS-CUT" },
                   0x00000080: { "code": 27, "lib": "Engine" },
                   0x00000081: { "code": 35, "lib": "R+Stop" },
                   0x00000100: { "code": 1, "lib": "Analyzing" }
                 }
                   
# Initial values --------------------------------------------------
loop_time = 0
comp_time = 0
last_time = 0

# Global exception handler =======================================
def _handle_exception(loop, context):
    print('Exception occurred !')
    sys.print_exception(context["exception"])
    sys.exit()

# Classes ========================================================

class BLEScanner:
    def __init__(self, ble, target_mac_list):
        self._ble = ble
        self._ble.active(True)
        self._ble.irq(self._irq)
        self._target_mac_list = target_mac_list
        self._start_time = None

    # Interrupt processing routine
    def _irq(self, event, data):
        global bt_queue
        pin_led_bt.on()
        if event == 5:  # Event value for _IRQ_SCAN_RESULT
            addr_type, addr, adv_type, rssi, adv_data = data
            if rssi > _BT_MIN_RSSI:
                if addr in self._target_mac_list and adv_type == 0:
                    try:
                        # Queue received data for async coro to process
                        bt_queue.put_sync([bytes(addr), addr_type, adv_type, rssi, bytes(adv_data)])
                    except IndexError:
                        # Queue is full
                        pass
        pin_led_bt.off()

    # Start the scanner
    def start_scan(self, duration_ms=_BT_SCAN_DURATION_MS, interval_us=_BT_SCAN_INTERVAL_US, window_us=_BT_SCAN_WINDOW_US):
        self._ble.active(True)
        self._start_time = time.ticks_ms()
        self._ble.gap_scan(0, duration_ms, interval_us, window_us)

    # Stop the scanner
    def stop_scan(self):
        self._ble.gap_scan(None)
        self._ble.active(False)

# Functions ======================================================

def kelvin_to_celsius(kelvin):
    return round(kelvin - 273.15, 2)

# Decode received and decrypted BT values
def bt_decode(dev,cleartext):
    global victron
    if _DEBUG: print(f"*** Found device : {dev}")
    if _DEBUG >= 2:
        print("  Raw Decrypted Data (Hex):  ", ' '.join(['{:02X}'.format(b) for b in cleartext]))
        
    if dev == "bmv712" or dev == "batsense":
        try:
            if cleartext[2:4] != b'\xFF\x7F':
                victron[dev]["volt"] = float(struct.unpack('h', cleartext[2:4])[0] / 100)
            else:
                victron[dev]["volt"] = _UNKN
        except:
            victron[dev]["volt"] = _UNKN
        try:
            if struct.unpack('B',cleartext[8:9])[0] & 0b11 == 0b10:
                victron[dev]["temp"] = float(kelvin_to_celsius(struct.unpack('h', cleartext[6:8])[0] / 100))
            else:
                victron[dev]["temp"] = _UNKN
        except:
            victron[dev]["temp"] = _UNKN
        if victron[dev]["volt"] != _UNKN and victron[dev]["temp"] != _UNKN:
            victron[dev]["upd"] = time.ticks_ms()
        if _DEBUG >= 2: print(f"Volt : {victron[dev]["volt"]}   Temp: {victron[dev]["temp"]}")
        if dev == "bmv712":
            try:
                # Test the raw value against the 0x3FF "not available" marker before scaling
                soc_raw = (struct.unpack('h', cleartext[13:15])[0] & 0x3FFF) >> 4
                victron[dev]["soc"] = _UNKN if soc_raw == 0x3FF else float(soc_raw / 10)
            except:
                victron[dev]["soc"] = _UNKN
            try:
                amp = bytearray(cleartext[8:11])
                if amp[2] & 0x80 == 0x80:
                    amp.extend(b'\xFF')
                else:
                    amp.extend(b'\x00')
                victron[dev]["amp"] = float(((struct.unpack('i', amp)[0]) >>2 ) / 1000)
            except:
                victron[dev]["amp"] = _UNKN
            if _DEBUG >= 2: print(f"Soc : {victron[dev]["soc"]}   Amp: {victron[dev]["amp"]}")
    elif dev == "smartsolar":
        try:
            if cleartext[0:1] != b'\xFF':
                victron[dev]["mode"] = struct.unpack('B', cleartext[0:1])[0]
            else:
                victron[dev]["mode"] = _UNKN
        except:
            victron[dev]["mode"] = _UNKN
        try:
            if cleartext[4:6] != b'\xFF\x7F':
                victron[dev]["amp"] = float(struct.unpack('h', cleartext[4:6])[0] / 10)
            else:
                victron[dev]["amp"] = _UNKN
        except:
            victron[dev]["amp"] = _UNKN
        try:
            if cleartext[8:10] != b'\xFF\xFF':
                victron[dev]["pwr"] = float(struct.unpack('H', cleartext[8:10])[0])
            else:
                victron[dev]["pwr"] = _UNKN
        except:
            victron[dev]["pwr"] = _UNKN
        try:
            victron[dev]["lib_mode"] = VICTRON_OP[victron[dev]["mode"]]["code"]
        except:
            victron[dev]["lib_mode"] = 1
        if (victron[dev]["mode"] != _UNKN and victron[dev]["amp"] != _UNKN
            and victron[dev]["pwr"] != _UNKN and victron[dev]["lib_mode"] != 1
        ):
            victron[dev]["upd"] = time.ticks_ms()
        if _DEBUG >= 2: print(f"Mode : {victron[dev]["mode"]}   PWR : {victron[dev]["pwr"]}   Lib : {victron[dev]["lib_mode"]}:{STATUS[victron[dev]["lib_mode"]]}")
    elif dev == "orion":
        try:
            if cleartext[0:1] != b'\xFF':
                victron[dev]["mode"] = struct.unpack('B', cleartext[0:1])[0]
            else:
                victron[dev]["mode"] = _UNKN
        except:
            victron[dev]["mode"] = _UNKN
        try:
            if cleartext[2:4] != b'\xFF\xFF':
                victron[dev]["v_in"] = float(struct.unpack('H', cleartext[2:4])[0] / 100)
            else:
                victron[dev]["v_in"] = _UNKN
        except:
            victron[dev]["v_in"] = _UNKN
        try:
            if cleartext[4:6] != b'\xFF\x7F':
                victron[dev]["v_out"] = float(struct.unpack('h', cleartext[4:6])[0] / 100)
            else:
                victron[dev]["v_out"] = _UNKN
        except:
            victron[dev]["v_out"] = _UNKN
        try:
            if cleartext[6:10] != b'\xFF\xFF\xFF\xFF':
                victron[dev]["cause"] = int(struct.unpack('I', cleartext[6:10])[0])
            else:
                victron[dev]["cause"] = _UNKN
        except:
            victron[dev]["cause"] = _UNKN
        try:
            victron[dev]["lib_mode"] = VICTRON_OP[victron[dev]["mode"]]["code"]
        except:
            victron[dev]["lib_mode"] = 1
        try:
            victron[dev]["lib_cause"] = VICTRON_DC_OFF[victron[dev]["cause"]]["code"]
        except:
            victron[dev]["lib_cause"] = 1
        if (victron[dev]["mode"] != _UNKN and victron[dev]["v_in"] != _UNKN
            and (victron[dev]["mode"] == 0 or victron[dev]["v_out"] != _UNKN)
            and victron[dev]["cause"] != _UNKN and victron[dev]["lib_mode"] != 1
            and victron[dev]["lib_cause"] != 1
        ):
            victron[dev]["upd"] = time.ticks_ms()
        if _DEBUG >= 2: print(f"Mode : {victron[dev]["mode"]}:{victron[dev]["lib_mode"]}:{STATUS[victron[dev]["lib_mode"]]}, Cause: {victron[dev]["cause"]}:{victron[dev]["lib_cause"]}:{STATUS[victron[dev]["lib_cause"]]}")
        if _DEBUG >= 2: print(f"V_in : {victron[dev]["v_in"]}, V_out: {victron[dev]["v_out"]}")

# Old BT values expiration ----------------------------------
async def bt_expire(coro_freq):
    global comp_time, victron
    while True:
        coro_begin = time.ticks_ms()
        if victron["bmv712"]["upd"] != _UNKN and time.ticks_diff(time.ticks_add(victron["bmv712"]["upd"],_BT_EXPIRE),coro_begin) <= 0 :
            victron["bmv712"]["volt"] = _UNKN
            victron["bmv712"]["amp"] = _UNKN
            victron["bmv712"]["soc"] = _UNKN
            victron["bmv712"]["temp"] = _UNKN
        if victron["batsense"]["upd"] != _UNKN and time.ticks_diff(time.ticks_add(victron["batsense"]["upd"],_BT_EXPIRE),coro_begin) <= 0 :
            victron["batsense"]["volt"] = _UNKN
            victron["batsense"]["temp"] = _UNKN
        if victron["orion"]["upd"] != _UNKN and time.ticks_diff(time.ticks_add(victron["orion"]["upd"],_BT_EXPIRE),coro_begin) <= 0 :
            victron["orion"]["mode"] = _UNKN
            victron["orion"]["cause"] = _UNKN
            victron["orion"]["v_in"] = _UNKN
            victron["orion"]["v_out"] = _UNKN
            victron["orion"]["lib_mode"] = 2
            victron["orion"]["lib_cause"] = 40
        if victron["smartsolar"]["upd"] != _UNKN and time.ticks_diff(time.ticks_add(victron["smartsolar"]["upd"],_BT_EXPIRE),coro_begin) <= 0 :
            victron["smartsolar"]["mode"] = _UNKN
            victron["smartsolar"]["pwr"] = _UNKN
            victron["smartsolar"]["amp"] = _UNKN
            victron["smartsolar"]["lib_mode"] = 2
        if _DEBUG >= 3: print(f"* CORO: bt_expire: Spent {time.ticks_diff(time.ticks_ms(),coro_begin)}.")
        coro_throttle = time.ticks_diff(time.ticks_add(coro_begin,coro_freq),time.ticks_ms())
        comp_time += time.ticks_diff(time.ticks_ms(), coro_begin)
        if coro_throttle >= 0:
            await asyncio.sleep_ms(coro_throttle)
        else:
            await asyncio.sleep(0)

# Decrypt and decode received BT values ------------------------------------------
async def bt_decrypt(bt_queue):
    global comp_time, scanner, victron
    async for bt_input in bt_queue:
        coro_begin = time.ticks_ms()
        pin_led_bt.on()
        mac = bt_input[0]                           # MAC address, bytes
        # mac_type = bt_input[1]                    # Address type, integer
        adv_data = bt_input[4]                      # Advertisement data, bytes
        kb0 = struct.unpack('B',adv_data[14:15])[0] # 1st encryption key byte
        
        if _DEBUG:
            timestamp = time.ticks_diff(time.ticks_ms(), scanner._start_time) / 1000
            adv_type = bt_input[2]                  # Advertisement type, integer
            rssi = bt_input[3]                      # RSSI, integer
            print("\n{:.1f}s - Target Device Found - Address: {mac}, RSSI: {rssi}, Adv. Type: {adv_type}".format(
                timestamp,
                mac=':'.join(['{:02X}'.format(b) for b in mac]),
                rssi=rssi,
                adv_type=adv_type
            ))
            if _DEBUG >= 2:
                record_type = struct.unpack('B',adv_data[11:12])[0]
                nonce = struct.unpack('H',adv_data[12:14])[0]
                # Print the entire advertising data as hex
                print("  Raw Advertising Data (Hex):", ' '.join(['{:02X}'.format(b) for b in adv_data]))
                print(f"  Record type: {record_type:#04X}   Nonce: {nonce:#06X}   Key byte 0: {kb0:#04X}")

        for dev in victron:
            if victron[dev]["mac"] == mac:
                if victron[dev]["key"][0:1] == kb0.to_bytes(1,0):
                    if _DEBUG >= 2: print("  Encryption key matches.")
                    
                    # AES-CTR Decryption
                    # We should use AES-CTR but it is not implemented in MicroPython's
                    # cryptolib, so we need to emulate it using ECB.
                    # We have at most 16 bytes to decrypt, so we can do it in a single
                    # pass with the nonce + a zero CTR value.
                    ctr = bytearray(adv_data[12:14])                          # Start with nonce
                    ctr.extend(bytes(14))                                     # Counter is zero
                    # if _DEBUG >= 2: print("  Ctr feed (Hex):", ' '.join(['{:02X}'.format(b) for b in ctr]))
                    ciphertext = bytearray(adv_data[15:])                     # Our ciphertext
                    if len(adv_data[15:]) < 16 :                              # Extend it to 16 bytes
                           ciphertext.extend(bytes(16 - len(adv_data[15:])))  # if needed
                    cipher = cryptolib.aes(victron[dev]["key"],1)             # Initialize AES ECB with key
                    cipher.encrypt(ctr,ctr)                                   # Encrypt counter
                    # if _DEBUG >= 2: print("  Encrypted CTR (Hex):", ' '.join(['{:02X}'.format(b) for b in ctr]))
                    cleartext = bytes(a ^ b for a, b in zip(ciphertext, ctr)) # XOR results with ciphertext

                    bt_decode(dev,cleartext)                                  # Now decode what we got
                else:
                    if _DEBUG: print(f"  Encryption key mismatch ! Device {dev}:mac Ours: {victron[dev]["key"][0:1]}, got: {kb0.to_bytes(1,0)}")
                    break 
                break
 
        if _DEBUG >= 3: print(f"* CORO: bt_decrypt: Spent {time.ticks_diff(time.ticks_ms(),coro_begin)}.")
        comp_time += time.ticks_diff(time.ticks_ms(), coro_begin)
        pin_led_bt.off()
        await asyncio.sleep(0)

# Main loop ==================================================
async def main():
    global comp_time, last_time, loop_time
    global scanner, bt_queue, bt_stat
    loop = asyncio.get_event_loop()
    loop.set_exception_handler(_handle_exception)
    
    # Bluetooth thread safe queue
    bt_queue = ThreadSafeQueue([[bytes(6), int(0), int(0), int(0), bytes(48)] for _ in range(20)])
    
    # Create scheduled tasks
    task_bt_expire = asyncio.create_task(bt_expire(5000))                         # Expire old BT values
    task_bt_decrypt = asyncio.create_task(bt_decrypt(bt_queue))                                # Decrypt and decode BT values
    await asyncio.sleep(0)
    
    # Start Bluetooth BLE scanner
    scanner = BLEScanner(ble, maclist)
    scanner.start_scan(duration_ms=_BT_SCAN_DURATION_MS, interval_us=_BT_SCAN_INTERVAL_US, window_us=_BT_SCAN_WINDOW_US)
    
    while True:
        # Sets the main loop defined duration
        loop_begin_tick = time.ticks_ms()
        loop_end_target = time.ticks_add(loop_begin_tick,_TMR_MAIN_LOOP)
        
        # Wait until desired loop duration
        loop_time = time.ticks_diff(time.ticks_ms(),loop_begin_tick)
        last_time = comp_time + loop_time
        comp_time = 0
        loop_throttle = time.ticks_diff(loop_end_target,time.ticks_ms()) - 1
        if loop_throttle >= 0 :
            await asyncio.sleep_ms(loop_throttle)
        else:
            loop_throttle = 0

# Let's do it !
asyncio.run(main())

@petaramesh
Author

I've changed some LED-related values above. It's not that important, but they were wrong (as my original project uses LEDs in a different way).

@georg90

georg90 commented Feb 7, 2024

I got it compiled. There is a double parenthesis and missing import of "sys" (will post my code once I got it running)

I am running on MicroPython v1.23.0-preview.91.g5a68e82d1 on 2024-02-07; Raspberry Pi Pico W with RP2040.
I am looking to find a SmartShunt but I am not "seeing" my device. When I run the scan with victron-ble I get a value for mac like

B9FD07CC-8128-EC97-96E4-2284631B3CA6

but no hex value. Since I don't know the type of the value I am not sure if the key mismatch is my fault or if the device is not correct.

How did you get the original hex values extracted?

@petaramesh
Author

You can find these values in the VictronConnect app on your phone. Go to your device, then “parameters”, menu, “Product info”, and down there “Show encryption key”.
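
Once you have the MAC address and key as text, they still need to be turned into the `bytes` values that the `victron` dictionary expects. Here is a minimal sketch, assuming the values are written as hex digits with optional `:`, `-` or space separators; `hex_to_bytes` is a hypothetical helper name, and the sample values are the placeholders from the script, not real ones.

```python
import binascii

def hex_to_bytes(text):
    """Convert 'A3:76:6E:7C:22:33' or 'a3766e7c2233' style text to bytes."""
    cleaned = text.replace(":", "").replace("-", "").replace(" ", "")
    return binascii.unhexlify(cleaned)

mac = hex_to_bytes("A3:76:6E:7C:22:33")
key = hex_to_bytes("123456789abcdef0123456789abcdef0")
print(mac, len(key))   # the key should come out as 16 bytes
```

The bytes come out in the order they are written; whether that matches the order your scanner reports is worth checking against the debug output.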

@georg90

georg90 commented Feb 8, 2024

Thanks for pointing that out. I got it working with your instructions.
See my adapted code here: https://gist.github.com/georg90/c6822fa28261059e4c8361bdcff13f32

I think this is a great basis for starting a victron_ble port - maybe it can be considered. Otherwise, anyone coming from Google will find this helpful. Thanks a lot @petaramesh for the port!

@petaramesh
Author

Hello,

I've updated my code to allow receiving and processing Victron BT advertisements even when the VictronConnect phone app is connected to the Victron device (previous code would only work when the VictronConnect app was not connected to the device).

So here is the new code :

# Pico_Victron_BT.py
#
# © Swâmi Petaramesh 2024
#
# This program is free software: you can redistribute it and/or modify it under the terms
# of the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with this program.
# If not, see <https://www.gnu.org/licenses/>.
#
# This program is example code for receiving and decoding some Victron® devices information
# and statuses over Bluetooth-LE on a Raspberry Pi® Pico W running :
#
# Works on : sys.version:3.4.0; MicroPython v1.22.1 on 2024-01-05
#            RPI_PICO_W-20240105-v1.22.1.uf2
#
# You need to put your own devices MAC addresses and encryption keys in the code below.

# Libraries ============================================================
#
# Micropython built-in libraries
import sys, time, struct, bluetooth, cryptolib
from machine import Pin
from cryptolib import aes

# Libraries to be installed on the Raspberry Pi Pico
# With mpremote tool (from Linux PC terminal) :
# mpremote mip install github:peterhinch/micropython-async/v3/primitives
# mpremote mip install github:peterhinch/micropython-async/v3/threadsafe
import asyncio
from threadsafe import ThreadSafeQueue

# Initialisations ==========================================================

_DEBUG = const(2)             # Extra debug console output

_UNKN = const(-9999)          # N/A value

pin_led_bt = Pin("WL_GPIO0", Pin.OUT, value=1)         # Integrated LED, ON

# Victron devices parameters and values -----------------------------------

# We need to put our Victron devices MAC addresses and encryption keys here
# The rest should be left _UNKN

victron = {
    "bmv712" : { "mac": b"\xA3\x76\x6E\x7C\x22\x33",
            "key": b"\x12\x34\x56\x78\x9A\xBC\xDE\xF0\x12\x34\x56\x78\x9A\xBC\xDE\xF0",
            "volt": _UNKN,
            "amp": _UNKN,
            "soc": _UNKN,
            "temp": _UNKN,
            "upd": _UNKN
        },
    "orion" : { "mac": b"\xA3\x76\x6E\x7C\x22\x34",
           "key": b"\x12\x34\x56\x78\x9A\xBC\xDE\xF0\x12\x34\x56\x78\x9A\xBC\xDE\xF1",
           "eng_detect": True,
           "mode": _UNKN,
           "cause": _UNKN,
           "v_in": _UNKN,
           "v_out": _UNKN,
           "lib_mode": 2,
           "lib_cause": 6,
           "upd": _UNKN
        },
    "smartsolar" : { "mac": b"\xA3\x76\x6E\x7C\x22\x35",
            "key": b"\x12\x34\x56\x78\x9A\xBC\xDE\xF0\x12\x34\x56\x78\x9A\xBC\xDE\xF2",
            "mode": _UNKN,
            "pwr": _UNKN,
            "amp": _UNKN,
            "upd": _UNKN,
            "lib_mode": 2
        },
    "batsense" : { "mac": b"\xA3\x76\x6E\x7C\x22\x36",
              "key": b"\x12\x34\x56\x78\x9A\xBC\xDE\xF0\x12\x34\x56\x78\x9A\xBC\xDE\xF3",
              "volt": _UNKN,
              "temp": _UNKN,
              "upd": _UNKN
        }
}

maclist= [victron[d]["mac"] for d in victron]

# System parameters ------------------------------------------------

_TMR_MAIN_LOOP = const(500)                   # Main loop duration

_BT_SCAN_DURATION_MS = const(0)               # BT Scan default duration, 0 = forever
_BT_SCAN_INTERVAL_US = const(3000000)         # Scan every 3 sec (uS)
_BT_SCAN_WINDOW_US = const(400000)            # Scan for 400 mS (uS)
_BT_MIN_RSSI = const(-85)                     # Minimum RSSI
ble = bluetooth.BLE()

# Fixed values
_BT_EXPIRE = const(180 * 1000)                # BT values expired after stalled (mS)

# This is our main status that we use everywhere
STATUS = (
          "CKSUM",          # 0
          "?????",          # 1
          "Unknown",        # 2
          "ERROR",          # 3
          "FAIL!",          # 4
          "DELTA",          # 5
          "Init",           # 6
          "Off",            # 7
          "Stop",           # 8
          "Start",          # 9
          "HI-V",           # 10
          "HI-v",           # 11
          "LOW-V",          # 12
          "LOW-v",          # 13
          "LOW-%",          # 14
          "HI-T°",          # 15
          "LO-T°",          # 16
          "Drain",          # 17
          "Timer",          # 18
          "Timr",           # 19
          "Stby",           # 20
          "LOW",            # 21
          "OK",             # 22
          "High",           # 23
          "Full",           # 24
          "Chrg",           # 25
          "Chg",            # 26
          "Engine",         # 27
          "Bulk",           # 28
          "Absorption",     # 29
          "Float",          # 30
          "Storage",        # 31
          "Remote",         # 32
          "Lock-V",         # 33
          "EngStop",        # 34
          "R+Stop",         # 35
          "RunTime",        # 36
          "FloTime",        # 37
          "Display",        # 38
          "Manual",         # 39
          "------------"    # 40
)

# Victron operation modes
VICTRON_OP = { 0: { "code": 7, "lib": "Off" },
               1: { "code": 12, "lib": "LOW-V" },
               2: { "code": 3, "lib": "ERROR" },
               3: { "code": 28, "lib": "Bulk" },
               4: { "code": 29, "lib": "Absorption" },
               5: { "code": 30, "lib": "Float" },
               6: { "code": 31, "lib": "Storage" },
               7: { "code": 1, "lib": "Equalize" },
               9: { "code": 1, "lib": "Inverting" },
               11: { "code": 1, "lib": "Supply" },
               245: { "code": 6, "lib": "Init" },
               246: { "code": 29, "lib": "Repeated absorption" },
               247: { "code": 1, "lib": "Recondition" },
               248: { "code": 1, "lib": "Bat safe" },
               252: { "code": 32, "lib": "Remote" }
             }

# Off reason for Victron DC-DC charger
VICTRON_DC_OFF = { 0x00000000: { "code": 40, "lib": "None" },
                   0x00000001: { "code": 4, "lib": "No-Input" },
                   0x00000002: { "code": 7, "lib": "Off" },
                   0x00000004: { "code": 7, "lib": "Off" },
                   0x00000008: { "code": 32, "lib": "Remote" },
                   0x00000010: { "code": 1, "lib": "Protection" },
                   0x00000020: { "code": 1, "lib": "Pay" },
                   0x00000040: { "code": 1, "lib": "BMS-CUT" },
                   0x00000080: { "code": 27, "lib": "Engine" },
                   0x00000081: { "code": 33, "lib": "Lock-V" },
                   0x00000100: { "code": 1, "lib": "Analyzing" }
                 }
                   
# Initial values --------------------------------------------------
loop_time = 0
comp_time = 0
last_time = 0

# Global exception handler =======================================
def _handle_exception(loop, context):
    print('Exception occurred !')
    sys.print_exception(context["exception"])
    sys.exit()

# Classes ========================================================

class BLEScanner:
    def __init__(self, ble, target_mac_list):
        self._ble = ble
        self._ble.active(True)
        self._ble.irq(self._irq)
        self._target_mac_list = target_mac_list
        self._start_time = None

    # Interrupt processing routine
    def _irq(self, event, data):
        global bt_queue
        pin_led_bt.on()
        if event == 5:  # Event value for _IRQ_SCAN_RESULT
            addr_type, addr, adv_type, rssi, adv_data = data
            if (rssi > _BT_MIN_RSSI
                and adv_data[5:8] == b'\xE1\x02\x10'
                and addr in self._target_mac_list
                and ( adv_type == 0 or adv_type == 2)
                and adv_data[1:2] == b'\x01'
            ):
                try:
                    # Queue received data for async coro to process
                    bt_queue.put_sync([bytes(addr), addr_type, adv_type, rssi, bytes(adv_data)])
                except IndexError:
                    # Queue is full
                    pass
        pin_led_bt.off()

    # Start the scanner
    def start_scan(self, duration_ms=_BT_SCAN_DURATION_MS, interval_us=_BT_SCAN_INTERVAL_US, window_us=_BT_SCAN_WINDOW_US):
        self._ble.active(True)
        self._start_time = time.ticks_ms()
        self._ble.gap_scan(0, duration_ms, interval_us, window_us)

    # Stop the scanner
    def stop_scan(self):
        self._ble.gap_scan(None)
        self._ble.active(False)

# Functions ======================================================

def kelvin_to_celsius(kelvin):
    return round(kelvin - 273.15, 2)

# Decode received and decrypted BT values
def bt_decode(dev,cleartext):
    global victron
    if _DEBUG: print(f"*** Found device : {dev}")
    if _DEBUG >= 2:
        print("  Raw Decrypted Data (Hex):  ", ' '.join(['{:02X}'.format(b) for b in cleartext]))
        
    if dev == "bmv712" or dev == "batsense":
        try:
            if cleartext[2:4] != b'\xFF\x7F':
                victron[dev]["volt"] = float(struct.unpack('h', cleartext[2:4])[0] / 100)
            else:
                victron[dev]["volt"] = _UNKN
        except:
            victron[dev]["volt"] = _UNKN
        try:
            if struct.unpack('B',cleartext[8:9])[0] & 0b11 == 0b10:
                # Unsigned read: in 0.01 K units, a signed 16-bit value would overflow above about 54.5 °C
                victron[dev]["temp"] = float(kelvin_to_celsius(struct.unpack('H', cleartext[6:8])[0] / 100))
            else:
                victron[dev]["temp"] = _UNKN
        except:
            victron[dev]["temp"] = _UNKN
        if victron[dev]["volt"] != _UNKN and victron[dev]["temp"] != _UNKN:
            victron[dev]["upd"] = time.ticks_ms()
        if _DEBUG >= 2: print(f"Volt : {victron[dev]['volt']}   Temp: {victron[dev]['temp']}")
        if dev == "bmv712":
            try:
                # 10-bit SoC field in 0.1 % steps; test the raw value for 0x3FF before scaling it
                soc_raw = (struct.unpack('h', cleartext[13:15])[0] & 0x3FFF) >> 4
                if soc_raw == 0x3FF:
                    victron[dev]["soc"] = _UNKN
                else:
                    victron[dev]["soc"] = float(soc_raw / 10)
            except:
                victron[dev]["soc"] = _UNKN
            try:
                amp = bytearray(cleartext[8:11])
                if amp[2] & 0x80 == 0x80:
                    amp.extend(b'\xFF')
                else:
                    amp.extend(b'\x00')
                victron[dev]["amp"] = float(((struct.unpack('i', amp)[0]) >>2 ) / 1000)
            except:
                victron[dev]["amp"] = _UNKN
            if _DEBUG >= 2: print(f"Soc : {victron[dev]['soc']}   Amp: {victron[dev]['amp']}")
    elif dev == "smartsolar":
        try:
            if cleartext[0:1] != b'\xFF':
                victron[dev]["mode"] = struct.unpack('B', cleartext[0:1])[0]
            else:
                victron[dev]["mode"] = _UNKN
        except:
            victron[dev]["mode"] = _UNKN
        try:
            if cleartext[4:6] != b'\xFF\x7F':
                victron[dev]["amp"] = float(struct.unpack('h', cleartext[4:6])[0] / 10)
            else:
                victron[dev]["amp"] = _UNKN
        except:
            victron[dev]["amp"] = _UNKN
        try:
            if cleartext[8:10] != b'\xFF\xFF':
                victron[dev]["pwr"] = float(struct.unpack('H', cleartext[8:10])[0])
            else:
                victron[dev]["pwr"] = _UNKN
        except:
            victron[dev]["pwr"] = _UNKN
        try:
            victron[dev]["lib_mode"] = VICTRON_OP[victron[dev]["mode"]]["code"]
        except:
            victron[dev]["lib_mode"] = 1
        if (victron[dev]["mode"] != _UNKN and victron[dev]["amp"] != _UNKN
            and victron[dev]["pwr"] != _UNKN and victron[dev]["lib_mode"] != 1
        ):
            victron[dev]["upd"] = time.ticks_ms()
        if _DEBUG >= 2: print(f"Mode : {victron[dev]['mode']}   PWR : {victron[dev]['pwr']}   Lib : {victron[dev]['lib_mode']}:{STATUS[victron[dev]['lib_mode']]}")
    elif dev == "orion":
        try:
            if cleartext[0:1] != b'\xFF':
                victron[dev]["mode"] = struct.unpack('B', cleartext[0:1])[0]
            else:
                victron[dev]["mode"] = _UNKN
        except:
            victron[dev]["mode"] = _UNKN
        try:
            if cleartext[2:4] != b'\xFF\xFF':
                victron[dev]["v_in"] = float(struct.unpack('H', cleartext[2:4])[0] / 100)
            else:
                victron[dev]["v_in"] = _UNKN
        except:
            victron[dev]["v_in"] = _UNKN
        try:
            if cleartext[4:6] != b'\xFF\x7F':
                victron[dev]["v_out"] = float(struct.unpack('h', cleartext[4:6])[0] / 100)
            else:
                victron[dev]["v_out"] = _UNKN
        except:
            victron[dev]["v_out"] = _UNKN
        try:
            if cleartext[6:10] != b'\xFF\xFF\xFF\xFF':
                victron[dev]["cause"] = int(struct.unpack('I', cleartext[6:10])[0])
            else:
                victron[dev]["cause"] = _UNKN
        except:
            victron[dev]["cause"] = _UNKN
        try:
            victron[dev]["lib_mode"] = VICTRON_OP[victron[dev]["mode"]]["code"]
        except:
            victron[dev]["lib_mode"] = 1
        try:
            victron[dev]["lib_cause"] = VICTRON_DC_OFF[victron[dev]["cause"]]["code"]
        except:
            victron[dev]["lib_cause"] = 1
        if (victron[dev]["mode"] != _UNKN and victron[dev]["v_in"] != _UNKN
            and (victron[dev]["mode"] == 0 or victron[dev]["v_out"] != _UNKN)
            and victron[dev]["cause"] != _UNKN and victron[dev]["lib_mode"] != 1
            and victron[dev]["lib_cause"] != 1
        ):
            victron[dev]["upd"] = time.ticks_ms()
        if _DEBUG >= 2: print(f"Mode : {victron[dev]['mode']}:{victron[dev]['lib_mode']}:{STATUS[victron[dev]['lib_mode']]}, Cause: {victron[dev]['cause']}:{victron[dev]['lib_cause']}:{STATUS[victron[dev]['lib_cause']]}")
        if _DEBUG >= 2: print(f"V_in : {victron[dev]['v_in']}, V_out: {victron[dev]['v_out']}")

# Old BT values expiration ----------------------------------
async def bt_expire(coro_freq):
    global comp_time, victron
    while True:
        coro_begin = time.ticks_ms()
        if victron["bmv712"]["upd"] != _UNKN and time.ticks_diff(time.ticks_add(victron["bmv712"]["upd"],_BT_EXPIRE),coro_begin) <= 0 :
            victron["bmv712"]["volt"] = _UNKN
            victron["bmv712"]["amp"] = _UNKN
            victron["bmv712"]["soc"] = _UNKN
            victron["bmv712"]["temp"] = _UNKN
        if victron["batsense"]["upd"] != _UNKN and time.ticks_diff(time.ticks_add(victron["batsense"]["upd"],_BT_EXPIRE),coro_begin) <= 0 :
            victron["batsense"]["volt"] = _UNKN
            victron["batsense"]["temp"] = _UNKN
        if victron["orion"]["upd"] != _UNKN and time.ticks_diff(time.ticks_add(victron["orion"]["upd"],_BT_EXPIRE),coro_begin) <= 0 :
            victron["orion"]["mode"] = _UNKN
            victron["orion"]["cause"] = _UNKN
            victron["orion"]["v_in"] = _UNKN
            victron["orion"]["v_out"] = _UNKN
            victron["orion"]["lib_mode"] = 2
            victron["orion"]["lib_cause"] = 40
        if victron["smartsolar"]["upd"] != _UNKN and time.ticks_diff(time.ticks_add(victron["smartsolar"]["upd"],_BT_EXPIRE),coro_begin) <= 0 :
            victron["smartsolar"]["mode"] = _UNKN
            victron["smartsolar"]["pwr"] = _UNKN
            victron["smartsolar"]["amp"] = _UNKN
            victron["smartsolar"]["lib_mode"] = 2
        if _DEBUG >= 3: print(f"* CORO: bt_expire: Spent {time.ticks_diff(time.ticks_ms(),coro_begin)}.")
        coro_throttle = time.ticks_diff(time.ticks_add(coro_begin,coro_freq),time.ticks_ms())
        comp_time += time.ticks_diff(time.ticks_ms(), coro_begin)
        if coro_throttle >= 0:
            await asyncio.sleep_ms(coro_throttle)
        else:
            await asyncio.sleep(0)

# Decrypt and decode received BT values ------------------------------------------
async def bt_decrypt(bt_queue):
    global comp_time, scanner, victron
    async for bt_input in bt_queue:
        coro_begin = time.ticks_ms()
        pin_led_bt.on()
        mac = bt_input[0]                           # MAC address, bytes
        # mac_type = bt_input[1]                    # Address type, integer
        adv_data = bt_input[4]                      # Advertisement data, bytes
        kb0 = struct.unpack('B',adv_data[14:15])[0] # 1st encryption key byte
        
        if _DEBUG:
            timestamp = time.ticks_diff(time.ticks_ms(), scanner._start_time) / 1000
            adv_type = bt_input[2]                  # Advertisement type, integer
            rssi = bt_input[3]                      # RSSI, integer
            print("\n{:.1f}s - Target Device Found - Address: {mac}, RSSI: {rssi}, Adv. Type: {adv_type}".format(
                timestamp,
                mac=':'.join(['{:02X}'.format(b) for b in mac]),
                rssi=rssi,
                adv_type=adv_type
            ))
            if _DEBUG >= 2:
                record_type = struct.unpack('B',adv_data[11:12])[0]
                nonce = struct.unpack('H',adv_data[12:14])[0]
                # Print the entire advertising data as hex
                print("  Raw Advertising Data (Hex):", ' '.join(['{:02X}'.format(b) for b in adv_data]))
                print(f"  Record type: {record_type:#04X}   Nonce: {nonce:#06X}   Key byte 0: {kb0:#04X}")

        for dev in victron:
            if victron[dev]["mac"] == mac:
                if victron[dev]["key"][0] == kb0:     # First key byte must match the advertised one
                    if _DEBUG >= 2: print("  Encryption key matches.")
                    
                    # AES-CTR Decryption
                    # We should use AES-CTR, but it is not implemented in MicroPython's
                    # cryptolib, so we emulate it with ECB.
                    # We have at most 16 bytes to decrypt, so a single pass suffices:
                    # encrypt the counter block (nonce + zero counter) and XOR it with the ciphertext.
                    ctr = bytearray(adv_data[12:14])                          # Start with nonce
                    ctr.extend(bytes(14))                                     # Counter is zero
                    # if _DEBUG >= 2: print("  Ctr feed (Hex):", ' '.join(['{:02X}'.format(b) for b in ctr]))
                    ciphertext = bytearray(adv_data[15:])                     # Our ciphertext
                    if len(ciphertext) < 16:                                  # Zero-pad it to 16 bytes
                        ciphertext.extend(bytes(16 - len(ciphertext)))        # if needed
                    cipher = cryptolib.aes(victron[dev]["key"],1)             # Initialize AES ECB with key
                    cipher.encrypt(ctr,ctr)                                   # Encrypt counter
                    # if _DEBUG >= 2: print("  Encrypted CTR (Hex):", ' '.join(['{:02X}'.format(b) for b in ctr]))
                    cleartext = bytes(a ^ b for a, b in zip(ciphertext, ctr)) # XOR results with ciphertext

                    bt_decode(dev,cleartext)                                  # Now decode what we got
                else:
                    if _DEBUG: print(f"  Encryption key mismatch! Device {dev}: ours: {victron[dev]['key'][0]:#04X}, got: {kb0:#04X}")
                break                                                         # MAC matched: stop searching either way
 
        if _DEBUG >= 3: print(f"* CORO: bt_decrypt: Spent {time.ticks_diff(time.ticks_ms(),coro_begin)}.")
        comp_time += time.ticks_diff(time.ticks_ms(), coro_begin)
        pin_led_bt.off()
        await asyncio.sleep(0)

# Main loop ==================================================
async def main():
    global comp_time, last_time, loop_time
    global scanner, bt_queue, bt_stat
    loop = asyncio.get_event_loop()
    loop.set_exception_handler(_handle_exception)
    
    # Bluetooth thread safe queue
    bt_queue = ThreadSafeQueue([[bytes(6), int(0), int(0), int(0), bytes(48)] for _ in range(20)])
    
    # Create scheduled tasks
    task_bt_expire = asyncio.create_task(bt_expire(5000))          # Expire old BT values
    task_bt_decrypt = asyncio.create_task(bt_decrypt(bt_queue))    # Decrypt and decode BT values
    await asyncio.sleep(0)
    
    # Start Bluetooth BLE scanner
    scanner = BLEScanner(ble, maclist)
    scanner.start_scan(duration_ms=_BT_SCAN_DURATION_MS, interval_us=_BT_SCAN_INTERVAL_US, window_us=_BT_SCAN_WINDOW_US)
    
    while True:
        # Sets the main loop defined duration
        loop_begin_tick = time.ticks_ms()
        loop_end_target = time.ticks_add(loop_begin_tick,_TMR_MAIN_LOOP)
        
        # Wait until desired loop duration
        loop_time = time.ticks_diff(time.ticks_ms(),loop_begin_tick)
        last_time = comp_time + loop_time
        comp_time = 0
        loop_throttle = time.ticks_diff(loop_end_target,time.ticks_ms()) - 1
        if loop_throttle >= 0 :
            await asyncio.sleep_ms(loop_throttle)
        else:
            await asyncio.sleep(0)                 # Loop overran: still yield so the other tasks can run

# Let's do it !
asyncio.run(main())
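
For anyone hitting the same "no 24-bit integers" problem, here is a minimal standalone sketch of the bit-packed BMV fields as the code above reads them: the 3-byte field at `cleartext[8:11]` (low 2 bits = aux input mode, upper 22 bits = signed current in mA) and the 2-byte field at `cleartext[13:15]` (10-bit state of charge in 0.1 % steps). The function names are hypothetical, and returning `None` for "not available" is my assumption here; the program above uses its own `_UNKN` marker instead. It only relies on `struct` and plain integer operations, so it should behave the same on CPython and MicroPython.

```python
import struct

def decode_current_ma(raw3):
    """Split a 3-byte little-endian field into (current_mA, aux_mode).

    Hypothetical helper: mirrors the manual sign extension done in
    bt_decode(), but without padding the value to 32 bits first.
    """
    if len(raw3) != 3:
        raise ValueError("expected exactly 3 bytes")
    value = raw3[0] | (raw3[1] << 8) | (raw3[2] << 16)   # assemble 24 bits
    aux_mode = value & 0b11                              # low 2 bits
    current = value >> 2                                 # upper 22 bits
    if current & 0x200000:                               # sign bit of a 22-bit value
        current -= 0x400000                              # two's complement sign extension
    return current, aux_mode

def decode_soc_percent(raw2):
    """Return the state of charge in percent, or None when not available.

    Hypothetical helper: the raw 10-bit value is tested against 0x3FF
    *before* scaling, otherwise the comparison can never match.
    """
    raw = (struct.unpack('<H', raw2)[0] & 0x3FFF) >> 4
    if raw == 0x3FF:
        return None
    return raw / 10

# Example: encode -1.5 A with aux mode 2 (temperature), then decode it again
field = (((-1500 & 0x3FFFFF) << 2) | 2).to_bytes(3, 'little')
print(decode_current_ma(field))
print(decode_soc_percent((875 << 4).to_bytes(2, 'little')))
```

Dividing the returned current by 1000 gives amps, as in `bt_decode()`.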
