From a9489e8d04e5e5265cf4a1d8fd2677d615a432be Mon Sep 17 00:00:00 2001 From: Felipe Canales Date: Fri, 11 Jun 2021 21:58:48 -0400 Subject: [PATCH 1/5] callback system after reassembly --- fragmentation_layer/code/main.py | 2 +- .../schc_handlers/schc_gateway_handler.py | 19 ++++++++++++------- .../lorawan/ack_on_error_receiver.py | 2 +- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/fragmentation_layer/code/main.py b/fragmentation_layer/code/main.py index 70987ba..521a3b4 100644 --- a/fragmentation_layer/code/main.py +++ b/fragmentation_layer/code/main.py @@ -3,7 +3,7 @@ import binascii import struct -from message import MESSAGE +from message import SHORT_MESSAGE as MESSAGE from schc_handlers import SCHCNodeHandler from schc_protocols import SCHCProtocol diff --git a/fragmentation_layer/code/schc_handlers/schc_gateway_handler.py b/fragmentation_layer/code/schc_handlers/schc_gateway_handler.py index df64c28..f045fa7 100644 --- a/fragmentation_layer/code/schc_handlers/schc_gateway_handler.py +++ b/fragmentation_layer/code/schc_handlers/schc_gateway_handler.py @@ -15,14 +15,18 @@ class SCHCGatewayHandler(SCHCHandler): def __init__(self, protocol, mtu, on_receive_callback=None): super().__init__(protocol, mtu) if on_receive_callback: - def after_reassembly_processing(msg_bytes): - # TODO decompress before calling callback - on_receive_callback(msg_bytes) + used_callback = on_receive_callback else: + used_callback = lambda msg: print("Message received", msg) + + def create_after_processing_callback(rule_id, dtag): def after_reassembly_processing(msg_bytes): # TODO decompress before calling callback - print("Message received:", msg_bytes) - self.callback = after_reassembly_processing + used_callback(msg_bytes) + self.__sessions__[rule_id].pop(dtag) + return after_reassembly_processing + + self.callback_creator = create_after_processing_callback def send_package(self, packet): if self.__protocol__.id == SCHCProtocol.LoRaWAN: @@ -36,7 +40,9 @@ def receive(self, rule_id, dtag, message): if rule_id == LoRaWAN.ACK_ON_ERROR: # message received from schc_machines.lorawan import AckOnErrorReceiver - self.assign_session(rule_id, dtag, AckOnErrorReceiver(LoRaWAN(LoRaWAN.ACK_ON_ERROR), on_success=self.callback)) + self.assign_session(rule_id, dtag, AckOnErrorReceiver( + LoRaWAN(LoRaWAN.ACK_ON_ERROR), + on_success=self.callback_creator(rule_id, dtag))) self.__sessions__[rule_id][dtag].receive_message(message) elif rule_id == LoRaWAN.ACK_ALWAYS: # response received @@ -64,7 +70,6 @@ def handle(self, message, f_port=None, url=None, dev_id=None): "payload_raw": base64.b64encode(response[1:]).decode("utf-8") } r = requests.post(url, data=json.dumps(post_obj), headers={'content-type': 'application/json'}) - print(r.status_code) def generate_message(self, rule_id, dtag, mtu=512): message = self.__sessions__[rule_id][dtag].generate_message(mtu) diff --git a/fragmentation_layer/code/schc_machines/lorawan/ack_on_error_receiver.py b/fragmentation_layer/code/schc_machines/lorawan/ack_on_error_receiver.py index 858b183..4f11d91 100644 --- a/fragmentation_layer/code/schc_machines/lorawan/ack_on_error_receiver.py +++ b/fragmentation_layer/code/schc_machines/lorawan/ack_on_error_receiver.py @@ -146,7 +146,7 @@ def receive_all1_schc_fragment(self, schc_message): self._logger_.debug("Integrity check successful") compressed_bitmap = None self.__success__ = True - self.on_success(self.sm.payload.as_bytes()) + self.sm.on_success(self.sm.payload.as_bytes()) else: self._logger_.error("Integrity check failed:\tSender: {}\tReceiver:{}".format( schc_message.header.rcs.rcs, From 95003ee0fe5dcc5f442eb427160c41137d68d8f3 Mon Sep 17 00:00:00 2001 From: Felipe Canales Date: Fri, 11 Jun 2021 22:55:23 -0400 Subject: [PATCH 2/5] fix to not sending last ack --- fragmentation_layer/code/schc_handlers/schc_gateway_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fragmentation_layer/code/schc_handlers/schc_gateway_handler.py b/fragmentation_layer/code/schc_handlers/schc_gateway_handler.py index f045fa7..ed472e0 100644 --- a/fragmentation_layer/code/schc_handlers/schc_gateway_handler.py +++ b/fragmentation_layer/code/schc_handlers/schc_gateway_handler.py @@ -23,7 +23,7 @@ def create_after_processing_callback(rule_id, dtag): def after_reassembly_processing(msg_bytes): # TODO decompress before calling callback used_callback(msg_bytes) - self.__sessions__[rule_id].pop(dtag) + # self.__sessions__[rule_id].pop(dtag) return after_reassembly_processing self.callback_creator = create_after_processing_callback From 5308cc7fe13734be09dd43fe3978968afe7f8306 Mon Sep 17 00:00:00 2001 From: Felipe Canales Date: Tue, 15 Jun 2021 17:29:15 -0400 Subject: [PATCH 3/5] Remove machine after reassembly --- fragmentation_layer/code/schc_handlers/schc_gateway_handler.py | 2 +- .../code/schc_machines/lorawan/ack_on_error_receiver.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/fragmentation_layer/code/schc_handlers/schc_gateway_handler.py b/fragmentation_layer/code/schc_handlers/schc_gateway_handler.py index ed472e0..f045fa7 100644 --- a/fragmentation_layer/code/schc_handlers/schc_gateway_handler.py +++ b/fragmentation_layer/code/schc_handlers/schc_gateway_handler.py @@ -23,7 +23,7 @@ def create_after_processing_callback(rule_id, dtag): def after_reassembly_processing(msg_bytes): # TODO decompress before calling callback used_callback(msg_bytes) - # self.__sessions__[rule_id].pop(dtag) + self.__sessions__[rule_id].pop(dtag) return after_reassembly_processing self.callback_creator = create_after_processing_callback diff --git a/fragmentation_layer/code/schc_machines/lorawan/ack_on_error_receiver.py b/fragmentation_layer/code/schc_machines/lorawan/ack_on_error_receiver.py index 4f11d91..f922837 100644 --- a/fragmentation_layer/code/schc_machines/lorawan/ack_on_error_receiver.py +++ b/fragmentation_layer/code/schc_machines/lorawan/ack_on_error_receiver.py @@ -69,6 +69,7 @@ def generate_message(self, mtu): self.sm.state.enter_state() message = self.sm.message_to_send.pop(0) self._logger_.schc_message(message) + self.sm.on_success(self.sm.payload.as_bytes()) return message else: return None @@ -146,7 +147,6 @@ def receive_all1_schc_fragment(self, schc_message): self._logger_.debug("Integrity check successful") compressed_bitmap = None self.__success__ = True - self.sm.on_success(self.sm.payload.as_bytes()) else: self._logger_.error("Integrity check failed:\tSender: {}\tReceiver:{}".format( schc_message.header.rcs.rcs, From 0382785785d5aeca4b4d61e5461e86f954556dd6 Mon Sep 17 00:00:00 2001 From: Felipe Canales Date: Wed, 16 Jun 2021 12:35:57 -0400 Subject: [PATCH 4/5] Fix: ACK REQ has issues with next fsm --- .../lorawan/ack_on_error_receiver.py | 1 + .../code/schc_machines/schc_fsm.py | 17 +++++++++++++++++ 2 files changed, 18 insertions(+) diff --git a/fragmentation_layer/code/schc_machines/lorawan/ack_on_error_receiver.py b/fragmentation_layer/code/schc_machines/lorawan/ack_on_error_receiver.py index f922837..6278379 100644 --- a/fragmentation_layer/code/schc_machines/lorawan/ack_on_error_receiver.py +++ b/fragmentation_layer/code/schc_machines/lorawan/ack_on_error_receiver.py @@ -67,6 +67,7 @@ def generate_message(self, mtu): if self.sm.__last_window__ and self.__success__: self.sm.state = self.sm.states["end"] self.sm.state.enter_state() + self.sm.inactivity_timer.reset() message = self.sm.message_to_send.pop(0) self._logger_.schc_message(message) self.sm.on_success(self.sm.payload.as_bytes()) diff --git a/fragmentation_layer/code/schc_machines/schc_fsm.py b/fragmentation_layer/code/schc_machines/schc_fsm.py index 1807780..013eb4b 100644 --- a/fragmentation_layer/code/schc_machines/schc_fsm.py +++ b/fragmentation_layer/code/schc_machines/schc_fsm.py @@ -345,6 +345,22 @@ def generate_message(self, mtu): """ raise SystemExit(self.sm.__end_msg__) + def on_expiration_time(self, alarm) -> None: + """ + Behaviour on time expiration + + Parameters + ---------- + alarm : Timer + Alarm that triggers expiration + + Returns + ------- + None + """ + self.sm.__active__ = False + return + def __init__(self, protocol, dtag=None): """ Constructor @@ -378,6 +394,7 @@ def __init__(self, protocol, dtag=None): self.__exit_msg__ = "" self.__end_msg__ = "" self.message_to_send = list() + self.__active__ = True return def receive_message(self, message): From 8f76723008165702cedf09272d37ae4caf3ef48d Mon Sep 17 00:00:00 2001 From: Felipe Canales Date: Wed, 16 Jun 2021 12:40:31 -0400 Subject: [PATCH 5/5] Missing code in previous commit --- fragmentation_layer/code/schc_handlers/schc_gateway_handler.py | 2 +- fragmentation_layer/code/schc_handlers/schc_handler.py | 2 +- fragmentation_layer/code/schc_machines/schc_fsm.py | 3 +++ 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/fragmentation_layer/code/schc_handlers/schc_gateway_handler.py b/fragmentation_layer/code/schc_handlers/schc_gateway_handler.py index f045fa7..0b52267 100644 --- a/fragmentation_layer/code/schc_handlers/schc_gateway_handler.py +++ b/fragmentation_layer/code/schc_handlers/schc_gateway_handler.py @@ -23,7 +23,7 @@ def create_after_processing_callback(rule_id, dtag): def after_reassembly_processing(msg_bytes): # TODO decompress before calling callback used_callback(msg_bytes) - self.__sessions__[rule_id].pop(dtag) + #self.__sessions__[rule_id].pop(dtag) return after_reassembly_processing self.callback_creator = create_after_processing_callback diff --git a/fragmentation_layer/code/schc_handlers/schc_handler.py b/fragmentation_layer/code/schc_handlers/schc_handler.py index 65f7579..a282b4e 100644 --- a/fragmentation_layer/code/schc_handlers/schc_handler.py +++ b/fragmentation_layer/code/schc_handlers/schc_handler.py @@ -35,5 +35,5 @@ def receive(self, rule_id, dtag, message): def assign_session(self, rule_id, dtag, machine): if rule_id not in self.__sessions__.keys(): self.__sessions__[rule_id] = dict() - if dtag not in self.__sessions__[rule_id].keys(): + if dtag not in self.__sessions__[rule_id].keys() or self.__sessions__[rule_id][dtag].is_active() == False: self.__sessions__[rule_id][dtag] = machine diff --git a/fragmentation_layer/code/schc_machines/schc_fsm.py b/fragmentation_layer/code/schc_machines/schc_fsm.py index 013eb4b..0bed43e 100644 --- a/fragmentation_layer/code/schc_machines/schc_fsm.py +++ b/fragmentation_layer/code/schc_machines/schc_fsm.py @@ -450,3 +450,6 @@ def on_expiration_time(self, alarm): """ self.state.on_expiration_time(alarm) return + + def is_active(self): + return self.__active__