Skip to content

Commit

Permalink
Merge pull request #24 from wingrunr21/implement_retries
Browse files Browse the repository at this point in the history
Implement retry functionality
  • Loading branch information
zaknye committed Feb 7, 2024
2 parents 0b7790b + 51abdac commit b89b5ff
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 25 deletions.
3 changes: 3 additions & 0 deletions .env.sample
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
METER_IP=
# MQTT_USER=
# MQTT_PASSWORD=
11 changes: 11 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# don't accidentally commit your secrets
certs
.env

# OS junk
.DS_Store

# Python stuff
__pycache__/
*.py[cod]
*$py.class
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,28 @@ The following are options that may be passed into the container in the form of e
| -e MQTT_PASSWORD | Password to authenticate to the MQTT server | yes |
| -e CERT_PATH | Path to cert file (within the container) if different than the default | yes |
| -e KEY_PATH | Path to key file (within the container) if different than the default | yes |
| -e LOGLEVEL | Set the log level for logging output (default is INFO) | yes |
## Compose (best way)
Docker compose is the easiest way to integrate this repo in with your other services. Below is an example of how to use compose to integrate with a mosquitto MQTT broker container.
### Example
```
mosquitto:
image: eclipse-mosquitto
...
xcel_itron2mqtt:
image: xcel_itron2mqtt
restart: unless-stopped
volumes:
- ~/xcel_itron2mqtt/certs:/opt/xcel_itron2mqtt/certs
networks:
- main
links:
depends_on:
- mosquitto
environment:
- MQTT_SERVER=mosquitto
```

See the `docker-compose.yaml` file for a working example
## CLI
### Example
```
Expand All @@ -78,6 +84,8 @@ docker run --rm -it \
--entrypoint /bin/bash \
xcel_itron2mqtt:latest
```

Alternatively, the `docker-compose.yaml` will allow you to bring a up an ephemeral MQTT broker along with the xcel_itron2mqtt container. Simply copy `.env.sample` to `.env`, update variables there as needed, and run `docker compose up`. You can then use `docker exec -it xcel_itron2mqtt /bin/bash` to attach to the running container.
## Contributing

Please feel free to create an issue with a feature request, bug, or any other comments you have on the software found here.
Expand Down
22 changes: 22 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
version: "3.7"

services:
mqtt:
image: emqx/nanomq
container_name: nanomq
ports:
- "1883:1883"
xcel_itron2mqtt:
container_name: xcel_itron2mqtt
build:
context: .
dockerfile: Dockerfile
depends_on:
- mqtt
volumes:
- certs:/opt/xcel_itron2mqtt/certs
environment:
MQTT_SERVER: mqtt
METER_PORT: 8081
env_file:
- .env
11 changes: 9 additions & 2 deletions xcel_itron2mqtt/main.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import os
import logging
from time import sleep
from pathlib import Path
from xcelMeter import xcelMeter
from zeroconf import ServiceBrowser, ServiceListener, Zeroconf

INTEGRATION_NAME = "Xcel Itron 5"

LOGLEVEL = os.environ.get('LOGLEVEL', 'INFO').upper()
logging.basicConfig(format='%(levelname)s: %(message)s', level=LOGLEVEL)

# mDNS listener to find the IP Address of the meter on the network
class XcelListener(ServiceListener):
def __init__(self):
Expand Down Expand Up @@ -77,5 +81,8 @@ def mDNS_search_for_meter() -> str | int:
ip_address, port_num = mDNS_search_for_meter()
creds = look_for_creds()
meter = xcelMeter(INTEGRATION_NAME, ip_address, port_num, creds)
# The run method controls all the looping, querying, and mqtt sending
meter.run()
meter.setup()

if meter.initalized:
# The run method controls all the looping, querying, and mqtt sending
meter.run()
3 changes: 2 additions & 1 deletion xcel_itron2mqtt/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ ifaddr==0.2.0
requests==2.28.2
urllib3==1.26.14
zeroconf==0.47.3
pyyaml==6.0
pyyaml==6.0.1
paho-mqtt==1.6.1
tenacity==8.2.3
14 changes: 11 additions & 3 deletions xcel_itron2mqtt/xcelEndpoint.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import yaml
import json
import requests
import logging
import paho.mqtt.client as mqtt
import xml.etree.ElementTree as ET
from copy import deepcopy
from tenacity import retry, stop_after_attempt, before_sleep_log, wait_exponential

logger = logging.getLogger(__name__)

# Prefix that appears on all of the XML elements
IEEE_PREFIX = '{urn:ieee:std:2030.5:ns}'
Expand All @@ -14,8 +18,8 @@ class xcelEndpoint():
Expects a request session that should be shared amongst the
instances.
"""
def __init__(self, session: requests.session, mqtt_client: mqtt.Client,
url: str, name: str, tags: list, device_info: dict, poll_rate = 5.0):
def __init__(self, session: requests.Session, mqtt_client: mqtt.Client,
url: str, name: str, tags: list, device_info: dict):
self.requests_session = session
self.url = url
self.name = name
Expand All @@ -32,14 +36,18 @@ def __init__(self, session: requests.session, mqtt_client: mqtt.Client,
# Setup the rest of what we need for this endpoint
self.mqtt_send_config()

@retry(stop=stop_after_attempt(15),
wait=wait_exponential(multiplier=1, min=1, max=15),
before_sleep=before_sleep_log(logger, logging.WARNING),
reraise=True)
def query_endpoint(self) -> str:
"""
Sends a request to the given endpoint associated with the
object instance
Returns: str in XML format of the meter's response
"""
x = self.requests_session.get(self.url, verify=False, timeout=4.0)
x = self.requests_session.get(self.url, verify=False, timeout=15.0)

return x.text

Expand Down
50 changes: 32 additions & 18 deletions xcel_itron2mqtt/xcelMeter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
import yaml
import json
import requests
import logging
import paho.mqtt.client as mqtt
import xml.etree.ElementTree as ET
from time import sleep
from typing import Tuple
from requests.packages.urllib3.util.ssl_ import create_urllib3_context
from requests.packages.urllib3.poolmanager import PoolManager
from requests.adapters import HTTPAdapter
from tenacity import retry, stop_after_attempt, before_sleep_log, wait_exponential

# Local imports
from xcelEndpoint import xcelEndpoint
Expand All @@ -18,6 +19,8 @@
# Our target cipher is: ECDHE-ECDSA-AES128-CCM8
CIPHERS = ('ECDHE')

logger = logging.getLogger(__name__)

# Create an adapter for our request to enable the non-standard cipher
# From https://lukasa.co.uk/2017/02/Configuring_TLS_With_Requests/
class CCM8Adapter(HTTPAdapter):
Expand All @@ -26,22 +29,20 @@ class CCM8Adapter(HTTPAdapter):
Not really sure how much redundancy is actually required here
"""
def init_poolmanager(self, *args, **kwargs):
ssl_version=ssl.PROTOCOL_TLSv1_2
context = create_urllib3_context(ssl_version=ssl_version)
context.check_hostname = False
context.verify_mode = ssl.CERT_REQUIRED
context.set_ciphers(CIPHERS)
kwargs['ssl_context'] = context
kwargs['ssl_context'] = self.create_ssl_context()
return super(CCM8Adapter, self).init_poolmanager(*args, **kwargs)

def proxy_manager_for(self, *args, **kwargs):
kwargs['ssl_context'] = self.create_ssl_context()
return super(CCM8Adapter, self).proxy_manager_for(*args, **kwargs)

def create_ssl_context(self):
ssl_version=ssl.PROTOCOL_TLSv1_2
context = create_urllib3_context(ssl_version=ssl_version)
context.check_hostname = False
context.verify_mode = ssl.CERT_REQUIRED
context.set_ciphers(CIPHERS)
kwargs['ssl_context'] = context
return super(CCM8Adapter, self).proxy_manager_for(*args, **kwargs)
return context

class xcelMeter():

Expand All @@ -59,6 +60,17 @@ def __init__(self, name: str, ip_address: str, port: int, creds: Tuple[str, str]
# Create a new requests session based on the passed in ip address and port #
self.requests_session = self.setup_session(creds, ip_address)

# List to store our endpoint objects in
self.endpoints_list = self.load_endpoints('endpoints.yaml')

# Set to uninitialized
self.initalized = False

@retry(stop=stop_after_attempt(15),
wait=wait_exponential(multiplier=1, min=1, max=15),
before_sleep=before_sleep_log(logger, logging.WARNING),
reraise=True)
def setup(self) -> None:
# XML Entries we're looking for within the endpoint
hw_info_names = ['lFDI', 'swVer', 'mfID']
# Endpoint of the meter used for HW info
Expand All @@ -81,9 +93,11 @@ def __init__(self, name: str, ip_address: str, port: int, creds: Tuple[str, str]
# Send homeassistant a new device config for the meter
self.send_mqtt_config()

# List to store our endpoint objects in
self.endpoints_list = self.load_endpoints('endpoints.yaml')
# create endpoints from list
self.endpoints = self.create_endpoints(self.endpoints_list, self.device_info)

# ready to go
self.initalized = True

def get_hardware_details(self, hw_info_url: str, hw_names: list) -> dict:
"""
Expand All @@ -104,14 +118,14 @@ def get_hardware_details(self, hw_info_url: str, hw_names: list) -> dict:
return hw_info_dict

@staticmethod
def setup_session(creds: tuple, ip_address: str) -> requests.session:
def setup_session(creds: tuple, ip_address: str) -> requests.Session:
"""
Creates a new requests session with the given credentials pointed
at the give IP address. Will be shared across each xcelQuery object.
Returns: request.session
"""
session = requests.session()
session = requests.Session()
session.cert = creds
# Mount our adapter to the domain
session.mount('https://{ip_address}', CCM8Adapter())
Expand Down Expand Up @@ -166,9 +180,9 @@ def setup_mqtt(mqtt_server_address, mqtt_port) -> mqtt.Client:
"""
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
logging.info("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
logging.error("Failed to connect, return code %d\n", rc)

# Check if a username/PW is setup for the MQTT connection
mqtt_username = os.getenv('MQTT_USER')
Expand Down Expand Up @@ -210,9 +224,9 @@ def send_mqtt_config(self) -> None:
}
config_dict.update(self.device_info)
config_json = json.dumps(config_dict)
#print(f"Sending MQTT Discovery Payload")
#print(f"TOPIC: {state_topic}")
#print(f"Config: {config_json}")
logging.debug(f"Sending MQTT Discovery Payload")
logging.debug(f"TOPIC: {state_topic}")
logging.debug(f"Config: {config_json}")
self.mqtt_client.publish(state_topic, str(config_json))

def run(self) -> None:
Expand Down

0 comments on commit b89b5ff

Please sign in to comment.