Skip to content

Commit

Permalink
Simplify configuration and arguments merging (#158)
Browse files Browse the repository at this point in the history
  • Loading branch information
koenvervloesem committed Aug 4, 2023
1 parent 2e1e5d0 commit 671707d
Showing 1 changed file with 33 additions and 119 deletions.
152 changes: 33 additions & 119 deletions TheengsGateway/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import json
import sys
from pathlib import Path
from typing import Dict

from .ble_gateway import run

Expand Down Expand Up @@ -55,173 +56,148 @@
}


def main() -> None:
"""Main entry point of the TheengsGateway program."""
def parse_args() -> argparse.Namespace:
"""Parse command-line arguments and return them."""
parser = argparse.ArgumentParser()
parser.add_argument(
"-H",
"--host",
dest="host",
type=str,
help="MQTT host address",
)
parser.add_argument(
"-P",
"--port",
dest="port",
type=int,
help="MQTT host port",
)
parser.add_argument(
"-u",
"--user",
dest="user",
type=str,
help="MQTT username",
)
parser.add_argument(
"-p",
"--pass",
dest="pwd",
type=str,
help="MQTT password",
)
parser.add_argument(
"-pt",
"--pub_topic",
dest="pub_topic",
type=str,
help="MQTT publish topic",
)
parser.add_argument(
"-st",
"--sub_topic",
dest="sub_topic",
type=str,
help="MQTT subscribe topic",
)
parser.add_argument(
"-Lt",
"--lwt_topic",
dest="lwt_topic",
type=str,
help="MQTT LWT topic",
)
parser.add_argument(
"-prt",
"--presence_topic",
dest="presence_topic",
type=str,
help="MQTT presence topic",
)
parser.add_argument(
"-pr",
"--presence",
dest="presence",
type=int,
help="Enable (1) or disable (0) presence publication (default: 1)",
)
parser.add_argument(
"-pa",
"--publish_all",
dest="publish_all",
type=int,
help="Publish all (1) or only decoded (0) advertisements (default: 1)",
)
parser.add_argument(
"-sd",
"--scan_duration",
dest="scan_dur",
type=int,
help="BLE scan duration (seconds)",
)
parser.add_argument(
"-tb",
"--time_between",
dest="time_between",
type=int,
help="Seconds to wait between scans",
)
parser.add_argument(
"-ll",
"--log_level",
dest="log_level",
type=str,
help="TheengsGateway log level",
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
)
parser.add_argument(
"-Dt",
"--discovery-topic",
dest="discovery_topic",
type=str,
help="MQTT Discovery topic",
)
parser.add_argument(
"-D",
"--discovery",
dest="discovery",
type=int,
help="Enable(1) or disable(0) MQTT discovery",
)
parser.add_argument(
"-Dh",
"--hass_discovery",
dest="hass_discovery",
type=int,
help="Enable(1) or disable(0) Home Assistant MQTT discovery "
"(default: 1)",
)
parser.add_argument(
"-Dn",
"--discovery_name",
dest="discovery_device_name",
type=str,
help="Device name for Home Assistant",
)
parser.add_argument(
"-Df",
"--discovery_filter",
dest="discovery_filter",
nargs="+",
default=[],
help="Device discovery filter list for Home Assistant",
)
parser.add_argument(
"-a",
"--adapter",
dest="adapter",
type=str,
help="Bluetooth adapter (e.g. hci1 on Linux)",
)
parser.add_argument(
"-s",
"--scanning_mode",
dest="scanning_mode",
type=str,
choices=("active", "passive"),
help="Scanning mode (default: active)",
)
parser.add_argument(
"-ts",
"--time_sync",
dest="time_sync",
nargs="+",
default=[],
help="Addresses of Bluetooth devices to synchronize the time",
)
parser.add_argument(
"-tf",
"--time_format",
dest="time_format",
type=int,
help="Use 12-hour (1) or 24-hour (0) time format for clocks "
"(default: 0)",
)
parser.add_argument(
"-padv",
"--publish_advdata",
dest="publish_advdata",
type=int,
help="Publish advertising and advanced data (1) or not (0) (default: 0)",
)
Expand All @@ -230,24 +206,45 @@ def main() -> None:
"--config",
dest="conf_path",
type=str,
default=str(Path("~/theengsgw.conf").expanduser()),
help="Path to the configuration file (default: ~/theengsgw.conf)",
)
parser.add_argument(
"-bk",
"--bindkeys",
nargs="+",
metavar=("ADDRESS", "BINDKEY"),
dest="bindkeys",
default={},
help="Device addresses and their bindkeys: ADDR1 KEY1 ADDR2 KEY2",
)

args = parser.parse_args()
return parser.parse_args()


def merge_args_with_config(config: Dict, args: argparse.Namespace) -> None:
"""Merge command-line arguments into configuration.
Command-line arguments override the corresponding configuration if they are set.
Lists and dicts are merged.
"""
for key, value in args.__dict__.items():
if value is not None:
if isinstance(value, list):
if key == "bindkeys":
config[key].update(
dict(zip(value[::2], value[1::2])),
)
else:
for element in value:
if element not in config[key]:
config[key].append(element)
else:
config[key] = value

if args.conf_path:
conf_path = Path(args.conf_path)
else:
conf_path = Path("~/theengsgw.conf").expanduser()

def main() -> None:
"""Main entry point of the TheengsGateway program."""
args = parse_args()
conf_path = Path(args.conf_path)

try:
with conf_path.open(encoding="utf-8") as config_file:
Expand All @@ -260,93 +257,10 @@ def main() -> None:
# This guarantees that all keys we refer to are in the dictionary.
config = {**default_config, **config}

if args.host:
config["host"] = args.host
if args.port:
config["port"] = args.port
if args.user:
config["user"] = args.user
if args.pwd:
config["pass"] = args.pwd
if args.pub_topic:
config["publish_topic"] = args.pub_topic
if args.sub_topic:
config["subscribe_topic"] = args.sub_topic
if args.lwt_topic:
config["lwt_topic"] = args.lwt_topic
if args.presence_topic:
config["presence_topic"] = args.presence_topic
if args.presence is not None:
config["presence"] = args.presence
if args.publish_all is not None:
config["publish_all"] = args.publish_all
if args.scan_dur:
config["ble_scan_time"] = args.scan_dur
if args.time_between:
config["ble_time_between_scans"] = args.time_between
if args.log_level:
config["log_level"] = args.log_level

if args.discovery is not None:
config["discovery"] = args.discovery
elif "discovery" not in config.keys():
config["discovery"] = default_config["discovery"]
config["discovery_topic"] = default_config["discovery_topic"]
config["discovery_device_name"] = default_config[
"discovery_device_name"
]
config["discovery_filter"] = default_config["discovery_filter"]

if args.hass_discovery is not None:
config["hass_discovery"] = args.hass_discovery

if args.discovery_topic:
config["discovery_topic"] = args.discovery_topic
elif "discovery_topic" not in config.keys():
config["discovery_topic"] = default_config["discovery_topic"]

if args.discovery_device_name:
config["discovery_device_name"] = args.discovery_device_name
elif "discovery_device_name" not in config.keys():
config["discovery_device_name"] = default_config[
"discovery_device_name"
]

if args.discovery_filter:
config["discovery_filter"] = default_config["discovery_filter"]
if args.discovery_filter[0] != "reset":
for item in args.discovery_filter:
config["discovery_filter"].append(item)
elif "discovery_filter" not in config.keys():
config["discovery_filter"] = default_config["discovery_filter"]

if args.adapter:
config["adapter"] = args.adapter

if args.scanning_mode:
config["scanning_mode"] = args.scanning_mode

if args.time_sync:
config["time_sync"] = default_config["time_sync"]
if args.time_sync[0] != "reset":
for item in args.time_sync:
config["time_sync"].append(item)
elif "time_sync" not in config.keys():
config["time_sync"] = default_config["time_sync"]

if args.time_format is not None:
config["time_format"] = args.time_format

if args.publish_advdata is not None:
config["publish_advdata"] = args.publish_advdata

if args.bindkeys:
config["bindkeys"].update(
dict(zip(args.bindkeys[::2], args.bindkeys[1::2])),
)
merge_args_with_config(config, args)

if not config["host"]:
sys.exit("Invalid MQTT host")
sys.exit("MQTT host is not specified")

try:
with conf_path.open(mode="w", encoding="utf-8") as config_file:
Expand Down

0 comments on commit 671707d

Please sign in to comment.