Skip to content

Commit

Permalink
Merge pull request #1276 from thingsboard/feature/connector-id-instea…
Browse files Browse the repository at this point in the history
…d-of-name-identification

Feature/connector id instead of name identification
  • Loading branch information
imbeacon committed Jan 11, 2024
2 parents ad11304 + 1a70320 commit e99bd27
Show file tree
Hide file tree
Showing 27 changed files with 442 additions and 2,186 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def run(self): # Main loop of thread
try:
if len(data_from_device) > 0:
converted_data = self.__devices[device]['converter'].convert(self.__devices[device]['device_config'], data_from_device)
self.__gateway.send_to_storage(self.get_name(), converted_data)
self.__gateway.send_to_storage(self.get_name(), self.get_id(), converted_data)
time.sleep(.1)
except Exception as e:
log.exception(e)
Expand Down
6 changes: 3 additions & 3 deletions tests/sdk_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def tearDownClass(cls) -> None:
@patch('tb_gateway_mqtt.TBGatewayMqttClient.gw_connect_device')
def test_add_device(self, mock_func):
self.gateway.add_device(self.DEVICE_NAME,
{'connector': self.gateway.available_connectors['MQTT Broker Connector']}, self.DEVICE_TYPE)
{'connector': self.gateway.available_connectors_by_name['MQTT Broker Connector']}, self.DEVICE_TYPE)
mock_func.assert_called_with(self.DEVICE_NAME, self.DEVICE_TYPE)

@patch('tb_gateway_mqtt.TBGatewayMqttClient.gw_disconnect_device')
Expand All @@ -53,7 +53,7 @@ def test_send_attributes(self, mock_func):
data = {self.DEVICE_NAME: {'attributes': {"atr1": 1, "atr2": True, "atr3": "value3"}}}

self.gateway.add_device(self.DEVICE_NAME,
{'connector': self.gateway.available_connectors['MQTT Broker Connector']}, self.DEVICE_TYPE)
{'connector': self.gateway.available_connectors_by_name['MQTT Broker Connector']}, self.DEVICE_TYPE)
self.gateway._TBGatewayService__send_data(data)

mock_func.assert_called_with(self.DEVICE_NAME, {"atr1": 1, "atr2": True, "atr3": "value3"})
Expand All @@ -63,7 +63,7 @@ def test_send_telemetry(self, mock_func):
data = {self.DEVICE_NAME: {'telemetry': {"key1": "11"}}}

self.gateway.add_device(self.DEVICE_NAME,
{'connector': self.gateway.available_connectors['MQTT Broker Connector']}, self.DEVICE_TYPE)
{'connector': self.gateway.available_connectors_by_name['MQTT Broker Connector']}, self.DEVICE_TYPE)
self.gateway._TBGatewayService__send_data(data)

mock_func.assert_called_with(self.DEVICE_NAME, {"key1": "11"})
Expand Down
8 changes: 6 additions & 2 deletions thingsboard_gateway/connectors/bacnet/bacnet_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def __init__(self, gateway, config, connector_type):
'MessagesSent': 0}
super().__init__()
self.__config = config
self.__id = self.__config.get('id')
self.setName(config.get('name', 'BACnet ' + ''.join(choice(ascii_lowercase) for _ in range(5))))
self.__devices = []
self.__device_indexes = {}
Expand Down Expand Up @@ -128,6 +129,9 @@ def close(self):
def get_name(self):
return self.name

def get_id(self):
return self.__id

def get_type(self):
return self._connector_type

Expand Down Expand Up @@ -244,7 +248,7 @@ def __convert_and_save_data(self, queue):
iocb.ioResponse if iocb.ioResponse else iocb.ioError)
except Exception as e:
self._log.exception(e)
self.__gateway.send_to_storage(self.name, converted_data)
self.__gateway.send_to_storage(self.get_name(), self.__id, converted_data)

def __bacnet_device_mapping_response_cb(self, iocb, callback_params):
mapping_type = callback_params["mapping_type"]
Expand All @@ -261,7 +265,7 @@ def __bacnet_device_mapping_response_cb(self, iocb, callback_params):
iocb.ioResponse if iocb.ioResponse else iocb.ioError)
except Exception as e:
self._log.exception(e)
self.__gateway.send_to_storage(self.name, converted_data)
self.__gateway.send_to_storage(self.get_name(), self.get_id(), converted_data)

def __load_converters(self, device):
datatypes = ["attributes", "telemetry", "attribute_updates", "server_side_rpc"]
Expand Down
6 changes: 5 additions & 1 deletion thingsboard_gateway/connectors/ble/ble_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def __init__(self, gateway, config, connector_type):
self._connector_type = connector_type
self.__gateway = gateway
self.__config = config
self.__id = self.__config.get('id')
self.setName(self.__config.get("name", 'BLE Connector ' + ''.join(choice(ascii_lowercase) for _ in range(5))))
self.__log = init_logger(self.__gateway, self.name, self.__config.get('logLevel', 'INFO'))

Expand Down Expand Up @@ -102,6 +103,9 @@ def close(self):
def get_name(self):
return self.name

def get_id(self):
return self.__id

def get_type(self):
return self._connector_type

Expand All @@ -126,7 +130,7 @@ def __process_data(self):
self.__log.debug(converted_data)

if converted_data is not None:
self.__gateway.send_to_storage(self.get_name(), converted_data)
self.__gateway.send_to_storage(self.get_name(), self.get_id(), converted_data)
self.statistics['MessagesSent'] = self.statistics['MessagesSent'] + 1
self.__log.info('Data to ThingsBoard %s', converted_data)
except Exception as e:
Expand Down
6 changes: 5 additions & 1 deletion thingsboard_gateway/connectors/can/can_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def __init__(self, gateway, config, connector_type):
self.__gateway = gateway
self._connector_type = connector_type
self.__config = config
self.__id = self.__config.get('id')
self._log = init_logger(self.__gateway, self.name, self.__config.get('logLevel', 'INFO'))
self.__bus_conf = {}
self.__bus = None
Expand Down Expand Up @@ -104,6 +105,9 @@ def close(self):
def get_name(self):
return self.name

def get_id(self):
return self.__id

def get_type(self):
return self._connector_type

Expand Down Expand Up @@ -351,7 +355,7 @@ def __check_and_send(self, conf, new_data):

self._log.debug("[%s] Pushing to TB server '%s' device data: %s", self.get_name(), conf["deviceName"], to_send)

self.__gateway.send_to_storage(self.get_name(), to_send)
self.__gateway.send_to_storage(self.get_name(), self.get_id(), to_send)
self.statistics['MessagesSent'] += 1
else:
self._log.debug("[%s] '%s' device data has not been changed", self.get_name(), conf["deviceName"])
Expand Down
4 changes: 4 additions & 0 deletions thingsboard_gateway/connectors/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ def open(self):
def close(self):
pass

@abstractmethod
def get_id(self):
pass

@abstractmethod
def get_name(self):
pass
Expand Down
6 changes: 5 additions & 1 deletion thingsboard_gateway/connectors/ftp/ftp_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def __init__(self, gateway, config, connector_type):
self.statistics = {'MessagesReceived': 0,
'MessagesSent': 0}
self.__config = config
self.__id = self.__config.get('id')
self._connector_type = connector_type
self.__gateway = gateway
self.security = {**self.__config['security']} if self.__config['security']['type'] == 'basic' else {
Expand Down Expand Up @@ -180,7 +181,7 @@ def __process_paths(self, ftp):

def __send_data(self, converted_data):
if converted_data:
self.__gateway.send_to_storage(self.getName(), converted_data)
self.__gateway.send_to_storage(self.getName(), self.get_id(), converted_data)
self.statistics['MessagesSent'] = self.statistics['MessagesSent'] + 1
self.__log.debug("Data to ThingsBoard: %s", converted_data)

Expand All @@ -192,6 +193,9 @@ def close(self):
def get_name(self):
return self.name

def get_id(self):
return self.__id

def get_type(self):
return self._connector_type

Expand Down
6 changes: 5 additions & 1 deletion thingsboard_gateway/connectors/modbus/modbus_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ def __init__(self, gateway, config, connector_type):
self.__backward_compatibility_adapter = BackwardCompatibilityAdapter(config, gateway.get_config_path(),
logger=self.__log)
self.__config = self.__backward_compatibility_adapter.convert()
self.__id = self.__config.get('id')
self.setName(self.__config.get("name", 'Modbus Connector ' + ''.join(choice(ascii_lowercase) for _ in range(5))))

self.__connected = False
Expand Down Expand Up @@ -316,7 +317,7 @@ def __convert_data(self, params):
return to_send

def _save_data(self, data):
self.__gateway.send_to_storage(self.get_name(), data)
self.__gateway.send_to_storage(self.get_name(), self.get_id(), data)
self.statistics[STATISTIC_MESSAGE_SENT_PARAMETER] += 1

def close(self):
Expand Down Expand Up @@ -344,6 +345,9 @@ def close(self):
def get_name(self):
return self.name

def get_id(self):
return self.__id

def __process_slaves(self):
while not self.__stopped:
if not self.__stopped and not ModbusConnector.process_requests.empty():
Expand Down
6 changes: 5 additions & 1 deletion thingsboard_gateway/connectors/mqtt/mqtt_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def __init__(self, gateway, config, connector_type):
self.__gateway = gateway # Reference to TB Gateway
self._connector_type = connector_type # Should be "mqtt"
self.config = config # mqtt.json contents
self.__id = self.config.get('id')

self.__log = init_logger(self.__gateway, self.config['name'], self.config.get('logLevel', 'INFO'))
self.statistics = {'MessagesReceived': 0, 'MessagesSent': 0}
Expand Down Expand Up @@ -328,6 +329,9 @@ def close(self):
def get_name(self):
return self.name

def get_id(self):
return self.__id

def __subscribe(self, topic, qos):
message = self._client.subscribe(topic, qos)
try:
Expand Down Expand Up @@ -485,7 +489,7 @@ def put_data_to_convert(self, converter, message, content) -> bool:
return False

def _save_converted_msg(self, topic, data):
if self.__gateway.send_to_storage(self.name, data) == Status.SUCCESS:
if self.__gateway.send_to_storage(self.name, self.get_id(), data) == Status.SUCCESS:
self.statistics['MessagesSent'] += 1
self.__log.debug("Successfully converted message from topic %s", topic)

Expand Down
6 changes: 5 additions & 1 deletion thingsboard_gateway/connectors/ocpp/ocpp_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class OcppConnector(Connector, Thread):
def __init__(self, gateway, config, connector_type):
super().__init__()
self._config = config
self.__id = self._config.get('id')
self._central_system_config = config['centralSystem']
self._charge_points_config = config.get('chargePoints', [])
self._connector_type = connector_type
Expand Down Expand Up @@ -220,6 +221,9 @@ def close(self):
self._log.info('%s has been stopped.', self.get_name())
self._log.reset()

def get_id(self):
return self.__id

def get_name(self):
return self.name

Expand Down Expand Up @@ -249,7 +253,7 @@ def _send_data(self):
while not self.__stopped:
if not self.DATA_TO_SEND.empty():
converted_data = self.DATA_TO_SEND.get()
self._gateway.send_to_storage(self.name, converted_data)
self._gateway.send_to_storage(self.name, self.get_id(), converted_data)
self.statistics['MessagesSent'] += 1
self._log.info("Data to ThingsBoard: %s", converted_data)

Expand Down
6 changes: 5 additions & 1 deletion thingsboard_gateway/connectors/odbc/odbc_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def __init__(self, gateway, config, connector_type):
'MessagesSent': 0}
self.__gateway = gateway
self.__config = config
self.__id = self.__config.get('id')
self._log = init_logger(self.__gateway, self.name, self.__config.get('logLevel', 'INFO'))
self._connector_type = connector_type
self.__stopped = False
Expand Down Expand Up @@ -94,6 +95,9 @@ def close(self):
self._log.debug("[%s] Stopping", self.get_name())
self._log.reset()

def get_id(self):
return self.__id

def get_name(self):
return self.name

Expand Down Expand Up @@ -316,7 +320,7 @@ def __check_and_send(self, device_name, device_type, new_data):
self._log.debug("[%s] Pushing to TB server '%s' device data: %s", self.get_name(), device_name, to_send)

to_send['telemetry'] = [to_send['telemetry']]
self.__gateway.send_to_storage(self.get_name(), to_send)
self.__gateway.send_to_storage(self.get_name(), self.get_id(), to_send)
self.statistics['MessagesSent'] += 1
else:
self._log.debug("[%s] '%s' device data has not been changed", self.get_name(), device_name)
Expand Down
6 changes: 5 additions & 1 deletion thingsboard_gateway/connectors/opcua/opcua_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def __init__(self, gateway, config, connector_type):
super().__init__()
self.__gateway = gateway
self._config = config
self.__id = self._config.get('id')
self.__server_conf = config.get("server")
self.setName(
self._config.get("name", 'OPC-UA ' + ''.join(choice(ascii_lowercase) for _ in range(5)) + " Connector"))
Expand Down Expand Up @@ -173,7 +174,7 @@ def run(self):
# NOTE: possible performance improvement: use a map to store only one event per
# variable to reduce frequency of messages to platform.
while self.data_to_send:
self.__gateway.send_to_storage(self.get_name(), self.data_to_send.pop())
self.__gateway.send_to_storage(self.get_name(), self.get_id(), self.data_to_send.pop())
if self.__stopped:
self.close()
break
Expand Down Expand Up @@ -242,6 +243,9 @@ def close(self):
self._log.info('%s has been stopped.', self.get_name())
self._log.reset()

def get_id(self):
return self.__id

def get_name(self):
return self.name

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def __init__(self, gateway, config, connector_type):
self._connector_type = connector_type
self.__gateway = gateway
self.__config = config
self.__id = self.__config.get('id')
self.__server_conf = config['server']
self.setName(
self.__config.get("name", 'OPC-UA Connector ' + ''.join(choice(ascii_lowercase) for _ in range(5))))
Expand Down Expand Up @@ -124,6 +125,9 @@ async def __reset_nodes(self, device_name=None):
pass
self.__subscription = None

def get_id(self):
return self.__id

def get_name(self):
return self.name

Expand Down Expand Up @@ -401,7 +405,7 @@ def __send_data(self):
data = self.__data_to_send.get()
self.statistics['MessagesReceived'] = self.statistics['MessagesReceived'] + 1
self.__log.debug(data)
self.__gateway.send_to_storage(self.get_name(), data)
self.__gateway.send_to_storage(self.get_name(), self.get_id(), data)
self.statistics['MessagesSent'] = self.statistics['MessagesSent'] + 1
self.__log.debug('Data to ThingsBoard %s', data)
else:
Expand Down
6 changes: 5 additions & 1 deletion thingsboard_gateway/connectors/request/request_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def __init__(self, gateway, config, connector_type):
'MessagesSent': 0}
self.__rpc_requests = []
self.__config = config
self.__id = self.__config.get('id')
self._connector_type = connector_type
self.__gateway = gateway
self.setName(self.__config.get("name", "".join(choice(ascii_lowercase) for _ in range(5))))
Expand Down Expand Up @@ -293,12 +294,15 @@ def __process_data(self):
try:
if not self.__convert_queue.empty():
data = self.__convert_queue.get()
self.__gateway.send_to_storage(self.get_name(), data)
self.__gateway.send_to_storage(self.get_name(), self.get_id(), data)
self.statistics["MessagesSent"] = self.statistics["MessagesSent"] + 1

except Exception as e:
self._log.exception(e)

def get_id(self):
return self.__id

def get_name(self):
return self.name

Expand Down

0 comments on commit e99bd27

Please sign in to comment.