In [None]:
import os
import shutil
import pathlib 
from enum import Enum
import json
from csnake import CodeWriter, Struct, Variable, Function, VariableValue, Subscript, FormattedLiteral
import numpy as np
from typing import Literal, List
import sys

In [None]:
class SensorValues:
    def __init__(
        self,
        sensor_id: int,
        type: int,
        v_id: int,
        value: int,
        ticket: int,
        masked: bool,
    ):
        self.sensor_id = sensor_id
        self.type = type
        self.value_id = v_id
        self.value = value
        self.latest_arrive_ticket = ticket
        self.is_masked = masked

    def __str__(self):
        return f"[{self.sensor_id}, {self.type}]: {self.value, self.latest_arrive_ticket, self.is_masked}"


class GateState:
    def __init__(self, sensor_list: List[SensorValues], latest_value_id: int):
        self.sensor_value_states = sensor_list
        self.latest_value_id = latest_value_id


NUM_UNIQUE_SENSOR_VALUES = 4
SENSOR_TYPE_ID_DWAX509M183X0 = 1
SENSOR_TYPE_ID_REED_SWITCH = 2
SENSOR_TYPE_ID_REED_SWITCH_NC = 3
SENSOR_TYPE_ID_REED_SWITCH_NO = 4
REED_SENSOR_DEBOUNCE_MS = 60
REED_SENSOR_PIN_STATE_OPEN = 0
REED_SENSOR_PIN_STATE_CLOSED = 1
REED_SENSOR_NOT_ACTIVATED = 0
REED_SENSOR_ACTIVATED = 1


def compare_reed_sensor_value_state(sensor: SensorValues, comp_state: int):
    is_nc_pin = sensor.type == SENSOR_TYPE_ID_REED_SWITCH_NC
    check_is_activated = comp_state == REED_SENSOR_ACTIVATED

    if check_is_activated:
        return (
            sensor.value == REED_SENSOR_PIN_STATE_OPEN
            if is_nc_pin
            else sensor.value == REED_SENSOR_PIN_STATE_CLOSED
        )
    else:
        return (
            sensor.value == REED_SENSOR_PIN_STATE_CLOSED
            if is_nc_pin
            else sensor.value == REED_SENSOR_PIN_STATE_OPEN
        )


def verify_ticket_sequence(gate_state: GateState, closing_phase: bool, last_state_ticket: int):
    # Determine phase based on latest sensor state
    closing_phase = compare_reed_sensor_value_state(
        gate_state.sensor_value_states[gate_state.latest_value_id], 
        REED_SENSOR_ACTIVATED
    )
    phase_comp_state = REED_SENSOR_ACTIVATED if closing_phase else REED_SENSOR_NOT_ACTIVATED
    
    got_ticket = False
    seq_valid = True
    
    # Iteration parameters based on phase
    start_idx = 0 if closing_phase else (NUM_UNIQUE_SENSOR_VALUES - 1)
    end_idx = NUM_UNIQUE_SENSOR_VALUES if closing_phase else -1
    step = 1 if closing_phase else -1
    
    i = start_idx
    prev_valid_i = -1
    prev_prev_valid_i = -1  # Track the sensor before prev_valid_i
    first_not_masked_found = False
    
    while i != end_idx:
        # 1. Skip masked sensors
        if gate_state.sensor_value_states[i].is_masked:
            i += step
            continue
        
        # Get the 'first' ticket of the current phase's sequence
        if not got_ticket:
            got_ticket = True
        
        # Check if the sensor value corresponds to phase
        if not compare_reed_sensor_value_state(gate_state.sensor_value_states[i], phase_comp_state):
            i += step
            continue
        
        # The first 'valid' (triggered & not masked) sensor is found
        if not first_not_masked_found:
            first_not_masked_found = True
            prev_valid_i = i
            i += step
            continue
        
        # Compare tickets of current sensor (i) with the previous valid sensor (prev_valid_i)
        if gate_state.sensor_value_states[i].latest_arrive_ticket < gate_state.sensor_value_states[prev_valid_i].latest_arrive_ticket:
            # Ticket inversion detected - need to determine which sensor is out of sequence
            
            # Check if current[i] fits in sequence with prev_prev_valid_i
            # - If prev_prev exists and ticket[i] > ticket[prev_prev], then prev_valid is out of sequence (Scenario B)
            # - Otherwise, current[i] is out of sequence (Scenario A)
            if (prev_prev_valid_i != -1 and 
                gate_state.sensor_value_states[i].latest_arrive_ticket > gate_state.sensor_value_states[prev_prev_valid_i].latest_arrive_ticket):
                # Scenario B: [1,3,2] - prev_valid_i (middle one) was out of sequence
                gate_state.sensor_value_states[prev_valid_i].is_out_of_sequence = True
            else:
                # Scenario A: [2,1,?] - current sensor i was out of sequence
                gate_state.sensor_value_states[i].is_out_of_sequence = True
            
            seq_valid = False
        
        prev_prev_valid_i = prev_valid_i
        prev_valid_i = i
        i += step
    print(
        f"Phase: {'Closing' if closing_phase else 'Opening'}, Order valid: {seq_valid}"
    )
    return seq_valid

# def verify(gate_state: GateState, closing_phase: bool, last_state_ticket: int):
#     # /* determine phase */
#     closing_phase = compare_reed_sensor_value_state(
#         gate_state.sensor_value_states[gate_state.latest_value_id],
#         REED_SENSOR_ACTIVATED,
#     )
#     order_valid = True

#     # /* Set iteration parameters based on phase */
#     start_idx = 0 if closing_phase else (NUM_UNIQUE_SENSOR_VALUES - 1)
#     end_idx = NUM_UNIQUE_SENSOR_VALUES if closing_phase else -1
#     step = 1 if closing_phase else -1

#     i = start_idx
#     prev_valid_i = -1
#     first_not_masked_found = False

#     while i != end_idx:
#         # // 1. Skip masked sensors
#         if gate_state.sensor_value_states[i].is_masked:
#             i += step
#             continue
        

#         if not first_not_masked_found:
#             last_state_ticket = gate_state.sensor_value_states[i].latest_arrive_ticket
#         # // 2. Check if the ticket is fresh (must be newer than the snapshot)
#         # // If a sensor is NOT masked but its ticket is old, it means it hasn't triggered in this movement
#         if gate_state.sensor_value_states[i].latest_arrive_ticket <= last_state_ticket:
#             # // If we expect sensors to trigger in order, any un-triggered sensor
#             # // encountered AFTER the first valid trigger might be okay,
#             # // but finding an old ticket where we expect a new one is a fail.
#             # // i += step;
#             order_valid = False
#             break

#         # // 3. Logic for the first valid (triggered & unmasked) sensor found
#         if not first_not_masked_found:
#             first_not_masked_found = True
#             prev_valid_i = i
#             i += step
#             continue

#         # // 4. Compare current sensor (i) with the previous valid sensor (prev_valid_i)
#         # // In both phases, the "current" sensor in the sequence must have a
#         # // GREATER (newer) ticket than the "previous" sensor in the sequence.
#         if (
#             gate_state.sensor_value_states[i].latest_arrive_ticket
#             > gate_state.sensor_value_states[prev_valid_i].latest_arrive_ticket
#         ):
#             order_valid &= True
#         else:
#             order_valid = False
#             break

#         prev_valid_i = i
#         i += step

#     # /* Final validation: If no sensors were active at all, or the last one was invalid */
#     if not first_not_masked_found:
#         order_valid = 0

#     print(
#         f"Phase: {'Closing' if closing_phase else 'Opening'}, Order valid: {order_valid}"
#     )

latest_snapshot_ticket = 0
sensor_value_states_closing = GateState(
    [
        SensorValues(1, SENSOR_TYPE_ID_REED_SWITCH_NC, 0, 0, 4, False),
        SensorValues(1, SENSOR_TYPE_ID_REED_SWITCH_NO, 1, 1, 5, False),
        SensorValues(2, SENSOR_TYPE_ID_REED_SWITCH_NC, 2, 0, 6, False),
        SensorValues(2, SENSOR_TYPE_ID_REED_SWITCH_NO, 3, 1, 7, False),
    ],
    3,
)
for i in sensor_value_states_closing.sensor_value_states:
    print(i)
verify_ticket_sequence(sensor_value_states_closing, True, latest_snapshot_ticket)
print("----")

latest_snapshot_ticket = 4
sensor_value_states_closing_2 = GateState(
    [
        SensorValues(1, SENSOR_TYPE_ID_REED_SWITCH_NC, 0, 0, 5, False),
        SensorValues(1, SENSOR_TYPE_ID_REED_SWITCH_NO, 1, 1, 2, False),
        SensorValues(2, SENSOR_TYPE_ID_REED_SWITCH_NC, 2, 0, 3, False),
        SensorValues(2, SENSOR_TYPE_ID_REED_SWITCH_NO, 3, 1, 6, False),
    ],
    3,
)
for i in sensor_value_states_closing_2.sensor_value_states:
    print(i)
verify_ticket_sequence(sensor_value_states_closing_2, True, latest_snapshot_ticket)
print("----")

latest_snapshot_ticket = 4
sensor_value_states_closing_2 = GateState(
    [
        SensorValues(1, SENSOR_TYPE_ID_REED_SWITCH_NC, 0, 0, 5, False),
        SensorValues(1, SENSOR_TYPE_ID_REED_SWITCH_NO, 1, 1, 2, True),
        SensorValues(2, SENSOR_TYPE_ID_REED_SWITCH_NC, 2, 0, 3, True),
        SensorValues(2, SENSOR_TYPE_ID_REED_SWITCH_NO, 3, 1, 6, False),
    ],
    3,
)
for i in sensor_value_states_closing_2.sensor_value_states:
    print(i)
verify_ticket_sequence(sensor_value_states_closing_2, True, latest_snapshot_ticket)
print("----")


latest_snapshot_ticket = 4
sensor_value_states_closing_2 = GateState(
    [
        SensorValues(1, SENSOR_TYPE_ID_REED_SWITCH_NC, 0, 0, 5, False),
        SensorValues(1, SENSOR_TYPE_ID_REED_SWITCH_NO, 1, 1, 6, False),
        SensorValues(2, SENSOR_TYPE_ID_REED_SWITCH_NC, 2, 0, 7, False),
        SensorValues(2, SENSOR_TYPE_ID_REED_SWITCH_NO, 3, 1, 4, False),
    ],
    3,
)
for i in sensor_value_states_closing_2.sensor_value_states:
    print(i)
verify_ticket_sequence(sensor_value_states_closing_2, True, latest_snapshot_ticket)
print("----")


sensor_value_states_opening = GateState(
    [
        SensorValues(1, SENSOR_TYPE_ID_REED_SWITCH_NC, 0, 1, 7, False),
        SensorValues(1, SENSOR_TYPE_ID_REED_SWITCH_NO, 1, 0, 6, False),
        SensorValues(2, SENSOR_TYPE_ID_REED_SWITCH_NC, 2, 1, 5, False),
        SensorValues(2, SENSOR_TYPE_ID_REED_SWITCH_NO, 3, 0, 4, False),
    ],
    0,
)




# for i in sensor_value_states_closing.sensor_value_states:
#     print(i)
# print("----")
# verify(sensor_value_states_opening,False)

[1, 3]: (0, 4, False)
[1, 4]: (1, 5, False)
[2, 3]: (0, 6, False)
[2, 4]: (1, 7, False)
Phase: Closing, Order valid: True
----
[1, 3]: (0, 5, False)
[1, 4]: (1, 2, False)
[2, 3]: (0, 3, False)
[2, 4]: (1, 6, False)
Phase: Closing, Order valid: False
----
[1, 3]: (0, 5, False)
[1, 4]: (1, 2, True)
[2, 3]: (0, 3, True)
[2, 4]: (1, 6, False)
Phase: Closing, Order valid: True
----
[1, 3]: (0, 5, False)
[1, 4]: (1, 6, False)
[2, 3]: (0, 7, False)
[2, 4]: (1, 4, False)
Phase: Closing, Order valid: False
----


In [None]:
a = {'val':[0,1,2]}
a.get('val')

[0, 1, 2]

In [None]:
class SensorConfigID(Enum):
    EQUAL_PARALLEL = 0b00    # equal priority, order doesn't matter 
    EQUAL_ORDERED = 0b01     # equal priority, order matters 
    WEIGHTED_PARALLEL = 0b10 # different weights/priorities, order doesn't matter 
    WEIGHTED_ORDERED = 0b11  # different weights/priorities, order matters 


config = 'EQUAL_ORDERED'


In [None]:
l = "polder11/gate13/"

int(l.split('/')[0].removeprefix('polder'))

int(l.split('/')[1].removeprefix('gate'))

13

In [None]:
suc = 0  # Start with 0 (inactive state)

# A sequence of inputs
inputs = [1, 1, 0, 1, 0]
inputs = [0, 0, 0, 1, 0]

for input_value in inputs:
    # If the input is 1, set suc to 1; otherwise, it remains 1 if already set.
    suc = suc or input_value
    suc |= input_value
    print(suc)

0
0
0
1
1


In [None]:
p = '/home/timon/docs/uni/projects/resuce-mate-sensors/silenos/code_gen/test.py'

pathlib.Path(p).parent.parent.joinpath("src").joinpath("sensor_config.h")

PosixPath('/home/timon/docs/uni/projects/resuce-mate-sensors/silenos/src/sensor_config.h')

In [None]:
data = None
with open("../sensor_configs/default_sensor_config.json","r") as f:
    data = json.load(f)

data

{'location': 'polder11/gate13/',
 'sensors': [{'name': 'polder11/gate13/01',
   'type': '1',
   'port_pin1': [1, 9],
   'port_pin2': [0, 8]},
  {'name': 'polder11/gate13/02',
   'type': '1',
   'port_pin1': [0, 11],
   'port_pin2': [0, 12]},
  {'name': 'polder11/gate13/03', 'type': '1', 'port_pin1': [0, 4]}],
 'multi_sensor_mode': 'EQUAL_ORDERED'}

False

## Was muss generiter werden:
- NUM_SENSORS
- Liste/Array der Typen
- Init code fÃ¼r die alarm_cb_args


In [None]:
class SensorTypeID(Enum):
    DWAX = "SENSOR_TYPE_ID_DWAX509M183X0" 
    REED_NC = "SENSOR_TYPE_ID_REED_SWITCH_NC" 
    REED_NO = "SENSOR_TYPE_ID_REED_SWITCH_NO"


In [None]:

cw = CodeWriter()
cw.add_lines([
    "#ifndef SENSOR_CONFIG_H_",
    "#define SENSOR_CONFIG_H_",
    "\n"
])
cw.include("sensors.h")
cw.include("dwax509m183x0.h")
cw.include("reed_sensor_driver.h")
cw.add_line(ignore_indent=True)




cw.add_define("SENSOR_TYPE_ID_DWAX509M183X0",1)
cw.add_define("SENSOR_TYPE_ID_REED_SWITCH_NC",2)
cw.add_define("SENSOR_TYPE_ID_REED_SWITCH_NO",3)
cw.add_line(ignore_indent=True)


cw.add_define("REED_SENSOR_DEBOUNCE_MS",60)
cw.add_line(ignore_indent=True)

cw.start_comment()
cw.add_lines([
            "Macro to encode sensor type and global sensor id in a 16-Bit integer.",
            "---------------------------------",
            "|   Sensor Type (8 bits)     |   Sensor Number (8 bits)   |",
            "|----------------------------|----------------------------|",
            "|  7  6  5  4  3  2  1  0    |  7  6  5  4  3  2  1  0    |",
            "|---------------------------------------------------------|",
            ])
cw.end_comment()
cw.add_line("#define ENCODE_SENSOR_TYPE_ID(type, id) ((type) << 8 | (id))")
cw.add_line(ignore_indent=True)


# add sensor base type
cw.add_lines([
    "typedef union {",
    "\treed_sensor_driver_t reed_sensor;",
    "\tdwax509m183x0_t inductive_sensor;",
    "} sensor_base_type_t;",
    "\n"
])

cw.add_lines([
    "typedef union {",
    "\treed_sensor_driver_params_t reed_sensor_params;",
    "\tdwax509m183x0_params_t inductive_sensor_params;",
    "} sensor_base_params_t;",
    "\n"
])


cw.add_line(comment='-' * 30 + 'Sensor Declaration' + '-' * 30)
cw.add_line(ignore_indent=True)

num_sensors = len(data["sensors"]) 
cw.add_line(comment='Number of physical sensors connected (some sensors have multiple contacts e.g. reed sensors)')
cw.add_define("NUM_SENSORS",num_sensors)
cw.add_line(ignore_indent=True)

num_unique_sensor_values = num_sensors + sum([1 for s in data["sensors"] if int(s["type"]) == 1 ])
cw.add_define("NUM_UNIQUE_SENSOR_VALUES",num_unique_sensor_values)


cw.add_line("static alarm_cb_args_t alarm_cb_args[NUM_UNIQUE_SENSOR_VALUES];")
cw.add_line(ignore_indent=True)




sensors = []
init_code_lines = []
id_counter = 0

for i,s in enumerate(data["sensors"]):
    s_type = None 

    name_base= ""

    if int(s["type"]) == 1: # REED
        name_base = "reed"
        s_type = "reed_sensor_driver_t"


    elif int(s["type"]) == 2: # INDUCTIVE
        name_base = "dwax"
        s_type = "dwax509m183x0_t"


    else:
        TypeError(f"Type {s['type']} not supported!")        


    ## create sensor handle variable

    s_name = f"sensor_{i+1}_{name_base}"
    # s_var = Variable(
    #     s_name,
    #     primitive=f"{s_type}",
    # )


    # cw.add_define(f"SENSOR_{i+1}",i)

    
    ## create sensor contact defines and callback arguments
    if int(s["type"]) == 1:
        s_id_nc = f"{s_name.upper()}_NC_ID"
        cw.add_define(s_id_nc,id_counter)
        
        init_code_lines.extend([
        f'''
alarm_cb_args[{s_id_nc}].pid = thread_getpid();
alarm_cb_args[{s_id_nc}].msg.type = ENCODE_SENSOR_TYPE_ID({SensorTypeID.REED_NC.value},{id_counter});
alarm_cb_args[{s_id_nc}].msg.content.ptr = (void *)&registered_sensors[{i}]; 

         '''
        ])

        id_counter += 1

        ## no 
        s_id_no = f"{s_name.upper()}_NO_ID"

        cw.add_define(s_id_no,id_counter)
        
        init_code_lines.extend([
        f'''
alarm_cb_args[{s_id_no}].pid = thread_getpid();
alarm_cb_args[{s_id_no}].msg.type = ENCODE_SENSOR_TYPE_ID({SensorTypeID.REED_NO.value},{id_counter});
alarm_cb_args[{s_id_no}].msg.content.ptr = (void *)&registered_sensors[{i}];

         '''
        ])

        id_counter += 1

        ## driver init
        init_code_lines.extend([
        f'''
// first cast to specific param type and then to base params type for the array.
registered_sensors_params[{i}] = (sensor_base_params_t) (reed_sensor_driver_params_t){{
                                        .nc_pin = GPIO_PIN({s["port_pin1"][0]},{s["port_pin1"][1]}),
                                        .no_pin = GPIO_PIN({s["port_pin2"][0]},{s["port_pin2"][1]}),
                                        .nc_int_flank = GPIO_BOTH,
                                        .no_int_flank = GPIO_BOTH,
                                        .nc_callback = reed_nc_callback,
                                        .no_callback = reed_no_callback,
                                        .nc_callback_args = (void *)&alarm_cb_args[{s_id_nc}],
                                        .no_callback_args = (void *)&alarm_cb_args[{s_id_no}],
                                        .use_external_pulldown = false,
                                        .debounce_ms = REED_SENSOR_DEBOUNCE_MS}};
reed_sensor_driver_init(&registered_sensors[{i}].reed_sensor, &registered_sensors_params[{i}].reed_sensor_params);
        '''
        ])




    elif int(s["type"]) == 2:
        s_id = f"{s_name.upper()}_ID"
        cw.add_define(s_id,id_counter)
        init_code_lines.extend([
        f'''
alarm_cb_args[{s_id}].pid = thread_getpid();
alarm_cb_args[{s_id}].msg.type = ENCODE_SENSOR_TYPE_ID({SensorTypeID.DWAX.value},{id_counter});
alarm_cb_args[{s_id}].msg.content.ptr = (void *)&registered_sensors[{i}];

         '''
        ])
        id_counter += 1

    ## add sensor handle variable declaration
    # cw.add_variable_declaration(s_var)
    cw.add_line(ignore_indent=True)
    # sensors.append(s_var)



cw.add_line(ignore_indent=True)


registered_sensors = Variable(
    "registered_sensors",
    primitive="sensor_base_type_t",
    array=num_sensors
)

registered_sensors_params = Variable(
    "registered_sensors_params",
    primitive="sensor_base_params_t",
    array=num_sensors
)



cw.add_variable_declaration(registered_sensors)
cw.add_variable_declaration(registered_sensors_params)



init_func = Function(
    "init_sensors",
    return_type="int",
)
init_func.add_code(init_code_lines + ["return 0;"])

#todo: ADD driver initialization. example:
#  reed_sensor_driver_params_t params = {.nc_pin = nc_pin,
#                                           .no_pin = no_pin,
#                                           .nc_int_flank = GPIO_BOTH,
#                                           .no_int_flank = GPIO_BOTH,
#                                           //   .nc_callback = reed_nc_callback_and_dwax_trigger,
#                                           .nc_callback = reed_nc_callback,
#                                           .no_callback = reed_no_callback,
#                                         //   .nc_callback_args = (void *)&alarm_cb_args, // Note: passing the whole array to be able to access the callback of dawx too
#                                           .nc_callback_args = (void *)&alarm_cb_args[1],
#                                           .no_callback_args = (void *)&alarm_cb_args[2],
#                                           .use_external_pulldown = false,
#                                           .debounce_ms = REED_SENSOR_DEBOUNCE_MS};
#     reed_sensor_driver_init(&sensor_02, &params);


cw.add_function_definition(init_func)


cw.add_line("#endif // SENSOR_CONFIG_H_")
print(cw)

# with open("src/sensor_config.h","w") as f:
#     f.writelines(cw.code)



TypeError: VariableValue() takes no arguments

In [None]:
print("DWAX:   {:>08b}".format((SensorTypeID.DWAX.value << 8) | (0)))
print("REED NC {:>08b}".format((SensorTypeID.REED_NC.value << 8) | (1)))
print("REED NO {:>08b}".format((SensorTypeID.REED_NO.value << 8) | (2)))


print("DWAX:   {:>08b}".format((SensorTypeID.DWAX.value << 1) | (0)))
print("REED NC {:>08b}".format((SensorTypeID.REED_NC.value << 1) | (1)))
print("REED NO {:>08b}".format((SensorTypeID.REED_NO.value << 1) | (2)))


TypeError: unsupported operand type(s) for <<: 'str' and 'int'

In [None]:
f'''
reed_sensor_driver_params_t params = {{.nc_pin = {23},
                                          .no_pin = no_pin,
                                          .nc_int_flank = GPIO_BOTH,
                                          .no_int_flank = GPIO_BOTH,
                                          //   .nc_callback = reed_nc_callback_and_dwax_trigger,
                                          .nc_callback = reed_nc_callback,
                                          .no_callback = reed_no_callback,
                                        //   .nc_callback_args = (void *)&alarm_cb_args, // Note: passing the whole array to be able to access the callback of dawx too
                                          .nc_callback_args = (void *)&alarm_cb_args[1],
                                          .no_callback_args = (void *)&alarm_cb_args[2],
                                          .use_external_pulldown = false,
                                          .debounce_ms = REED_SENSOR_DEBOUNCE_MS}};
'''


'\nreed_sensor_driver_params_t params = {.nc_pin = 23,\n                                          .no_pin = no_pin,\n                                          .nc_int_flank = GPIO_BOTH,\n                                          .no_int_flank = GPIO_BOTH,\n                                          //   .nc_callback = reed_nc_callback_and_dwax_trigger,\n                                          .nc_callback = reed_nc_callback,\n                                          .no_callback = reed_no_callback,\n                                        //   .nc_callback_args = (void *)&alarm_cb_args, // Note: passing the whole array to be able to access the callback of dawx too\n                                          .nc_callback_args = (void *)&alarm_cb_args[1],\n                                          .no_callback_args = (void *)&alarm_cb_args[2],\n                                          .use_external_pulldown = false,\n                                          .debounce_ms = REED_SENSOR_

In [None]:
cw = CodeWriter()
num_sensors=5

cw.add_define("NUM_SENSORS", num_sensors)

registered_sensors = Variable(

    "registered_sensors", primitive="sensor_base_type_t", array=f"NUM_SENSORS"
)


cw.add_define("TESTA",  int(data['location'].split('/')[0].removeprefix('polder')) )
cw.add_variable_declaration(registered_sensors, extern=True)

i = 1
cw.add_lines(
                    [
                        f"alarm_cb_args[0].pid = thread_getpid();",
                        f"alarm_cb_args[0].msg.type = ENCODE_SENSOR_TYPE_ID({1000},{0});",
                        f"alarm_cb_args[{0}].msg.content.ptr = (void *)&{Subscript(registered_sensors, i)};\n",
                        "\n",
                    ]
                )
val = 0xff


var = Variable(
    "pic",
    primitive="int",
    value=Literal[val],

)
print(type(Literal[data['location'].split('/')[0].removeprefix('polder')]))
print(cw)

# with open("test.h",'w') as f:
#     f.write(cw.code)

<class 'typing._LiteralGenericAlias'>
#define NUM_SENSORS 5
#define TESTA 11
extern sensor_base_type_t registered_sensors[NUM_SENSORS];
alarm_cb_args[0].pid = thread_getpid();
alarm_cb_args[0].msg.type = ENCODE_SENSOR_TYPE_ID(1000,0);
alarm_cb_args[0].msg.content.ptr = (void *)&registered_sensors[1];


'registered_sensors'

In [None]:
dFromSide=4.5
dMiddle=14
count=3
spacing=15
print( -((((dFromSide*2)+dMiddle + spacing)*(count-1) ) ) )


-99.0
-114.0


In [None]:
from cbor2 import loads, dumps
import numpy as np

In [None]:
SENSOR_TYPE_ID_REED_SWITCH_NC = 2
SENSOR_TYPE_ID_REED_SWITCH_NO = 3

SENSOR_ENCODE_TYPE_BITS = 4
SENSOR_ENCODE_ID_BITS = 4

ENCODE_SENSOR_TYPE_ID_BITS = (SENSOR_ENCODE_TYPE_BITS+SENSOR_ENCODE_ID_BITS)

def ENCODE_SENSOR_TYPE_ID(type, id):
    return ((type) << 8 | (id))

In [None]:
2**ENCODE_SENSOR_TYPE_ID_BITS

256

In [None]:
data1 = {
    "i": [
        ENCODE_SENSOR_TYPE_ID(SENSOR_TYPE_ID_REED_SWITCH_NC, 0),
        ENCODE_SENSOR_TYPE_ID(SENSOR_TYPE_ID_REED_SWITCH_NO, 1),
        ENCODE_SENSOR_TYPE_ID(SENSOR_TYPE_ID_REED_SWITCH_NC, 2),
        ENCODE_SENSOR_TYPE_ID(SENSOR_TYPE_ID_REED_SWITCH_NO, 3),
        ENCODE_SENSOR_TYPE_ID(SENSOR_TYPE_ID_REED_SWITCH_NC, 4),
        ENCODE_SENSOR_TYPE_ID(SENSOR_TYPE_ID_REED_SWITCH_NO, 5),
    ],
    "d": [1, 0, 1, 0, 1, 0],
    "c": [
        np.iinfo(np.uint8).max,
        np.iinfo(np.uint8).max,
        np.iinfo(np.uint8).max,
        np.iinfo(np.uint8).max,
        np.iinfo(np.uint8).max,
        np.iinfo(np.uint8).max,
    ],
    "g": 0,
    "s": np.iinfo(np.uint32).max,
    "t": np.iinfo(np.uint32).max,
}

data_max_values = {
    "l1": np.iinfo(np.uint8).max,
    "l2": np.iinfo(np.uint8).max,
    "tb": np.iinfo(np.uint8).max,
    "ib": np.iinfo(np.uint8).max,
    "i": [
        (ENCODE_SENSOR_TYPE_ID_BITS**2) - 1,
    ],
    "v": [
        np.iinfo(np.uint32).max,
    ],
    "c": [
        np.iinfo(np.uint8).max,
    ],
    "g": 1,
    "s": np.iinfo(np.uint32).max,
    "t": np.iinfo(np.uint32).max,
}

display(data_max_values)

# b1 = dumps(data1)
# display(
#     len(b1),
#     b1.hex(),
# )

int(np.iinfo(np.uint32).bits/4)

{'l1': 255,
 'l2': 255,
 'tb': 255,
 'ib': 255,
 'i': [63],
 'v': [4294967295],
 'c': [255],
 'g': 1,
 's': 4294967295,
 't': 4294967295}

8.0

## Javascript - ThingsNetwork Payload Formatter Test

In [5]:

const hexData = "AC626C310B626C320D6269620362746204627662046169831840190141190242617683010000616383010000616B830A08076167F461730461741A000656660000000000000000000000000000000000000000000000000000000000";
// const hexData = "A56169851840186118421863184461638500000000006167E061730061741A025138C3";

In [6]:
import {decodeUplink} from "../utils/things_network_payload_formatter.js"


function parseHexToBytes(hex) {
    const bytes = [];
    for (let i = 0; i < hex.length; i += 2) {
        const byte = parseInt(hex.substr(i, 2), 16);
        bytes.push(byte);
    }
    return { bytes };
}


var input = parseHexToBytes(hexData);
decodeUplink(input)


{
  data: {
    location: [32m"polder11/gate13"[39m,
    sensors: [
      {
        sensor_id: [33m0[39m,
        type: [32m"REED_NO"[39m,
        value: [33m1[39m,
        event_counter: [33m1[39m,
        latest_arrive_ticket: [33m10[39m
      },
      {
        sensor_id: [33m1[39m,
        type: [32m"REED_NO"[39m,
        value: [33m0[39m,
        event_counter: [33m0[39m,
        latest_arrive_ticket: [33m8[39m
      },
      {
        sensor_id: [33m2[39m,
        type: [32m"REED_NO"[39m,
        value: [33m0[39m,
        event_counter: [33m0[39m,
        latest_arrive_ticket: [33m7[39m
      }
    ],
    gate_state: [32m"OPEN"[39m,
    sequence_number: [33m4[39m,
    timestamp: [33m415334[39m
  },
  errors: []
}

# Lora Bytes

In [None]:
binary_data = bytes.fromhex("15496E0100FFFFFFFF3CADB8301698D23D06F50432E5D01E49608341C523D73978412D92BC7E7816986396260CF4EF57D41CB69971A855F0EF1310BB5525")
binary_data


b'\x15In\x01\x00\xff\xff\xff\xff<\xad\xb80\x16\x98\xd2=\x06\xf5\x042\xe5\xd0\x1eI`\x83A\xc5#\xd79xA-\x92\xbc~x\x16\x98c\x96&\x0c\xf4\xefW\xd4\x1c\xb6\x99q\xa8U\xf0\xef\x13\x10\xbbU%'

In [None]:
import base64

def hex_to_base64(hex_data):
    # Remove spaces and convert to bytes
    hex_bytes = bytes.fromhex(hex_data.replace(" ", ""))
    # Encode to Base64
    return base64.b64encode(hex_bytes).decode('utf-8')

In [None]:
hex_data = "15496E0100FFFFFFFF3CADB8301698D23D06F50432E5D01E49608341C523D73978412D92BC7E7816986396260CF4EF57D41CB69971A855F0EF1310BB5525"

frm_payload = hex_to_base64(hex_data)
print(frm_payload)


FUluAQD/////PK24MBaY0j0G9QQy5dAeSWCDQcUj1zl4QS2SvH54FphjliYM9O9X1By2mXGoVfDvExC7VSU=
