Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Properly detect disconnections from the bridge #5

Merged
merged 2 commits into from
Apr 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions aiocomfoconnect/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from aiocomfoconnect.discovery import discover_bridges
from aiocomfoconnect.exceptions import (
AioComfoConnectNotConnected,
AioComfoConnectTimeout,
ComfoConnectNotAllowed,
)
from aiocomfoconnect.sensors import SENSORS
Expand Down Expand Up @@ -140,15 +141,20 @@ def sensor_callback(sensor, value):

try:
while True:
# Wait for updates and send a keepalive every 60 seconds
await asyncio.sleep(60)
# Wait for updates and send a keepalive every 30 seconds
await asyncio.sleep(30)

try:
print("Sending keepalive...")
await comfoconnect.cmd_keepalive()
except AioComfoConnectNotConnected:
# Use cmd_time_request as a keepalive since cmd_keepalive doesn't send back a reply we can wait for
await comfoconnect.cmd_time_request()

except (AioComfoConnectNotConnected, AioComfoConnectTimeout):
# Reconnect when connection has been dropped
await comfoconnect.connect(uuid)
try:
await comfoconnect.connect(uuid)
except AioComfoConnectTimeout:
_LOGGER.warning("Connection timed out. Retrying later...")

except KeyboardInterrupt:
pass
Expand Down
69 changes: 43 additions & 26 deletions aiocomfoconnect/bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
import logging
import struct
from asyncio import IncompleteReadError, StreamReader, StreamWriter
from typing import Awaitable

from google.protobuf.message import DecodeError
from google.protobuf.message import Message as ProtobufMessage

from .exceptions import (
AioComfoConnectNotConnected,
AioComfoConnectTimeout,
ComfoConnectBadRequest,
ComfoConnectError,
ComfoConnectInternalError,
Expand All @@ -25,6 +27,8 @@

_LOGGER = logging.getLogger(__name__)

TIMEOUT = 5


class EventBus:
"""An event bus for async replies."""
Expand Down Expand Up @@ -87,36 +91,47 @@ def set_alarm_callback(self, callback: callable):

async def connect(self, uuid: str):
"""Connect to the bridge."""
await self.disconnect()

_LOGGER.debug("Connecting to bridge %s", self.host)
self._reader, self._writer = await asyncio.open_connection(self.host, self.PORT)
try:
self._reader, self._writer = await asyncio.wait_for(asyncio.open_connection(self.host, self.PORT), TIMEOUT)
except asyncio.TimeoutError as exc:
raise AioComfoConnectTimeout() from exc

self._reference = 1
self._local_uuid = uuid
self._event_bus = EventBus()

# We are connected, start the background task
self._read_task = self._loop.create_task(self._read_messages())

_LOGGER.debug("Connected to bridge %s", self.host)

async def disconnect(self):
"""Disconnect from the bridge."""
_LOGGER.debug("Disconnecting from bridge %s", self.host)

# Cancel the background task
self._read_task.cancel()
if self._read_task:
# Cancel the background task
self._read_task.cancel()

# Wait for background task to finish
try:
await self._read_task
except asyncio.CancelledError:
pass

# Disconnect
await self.cmd_close_session()
if self._writer:
self._writer.close()

# Wait for background task to finish
try:
await self._read_task
except asyncio.CancelledError:
pass
_LOGGER.debug("Disconnected from bridge %s", self.host)

def is_connected(self) -> bool:
"""Returns True if the bridge is connected."""
return self._writer is not None and not self._writer.is_closing()

def _send(self, request, request_type, params: dict = None, reply: bool = True) -> Message:
async def _send(self, request, request_type, params: dict = None, reply: bool = True) -> Message:
"""Sends a command and wait for a response if the request is known to return a result."""
# Check if we are actually connected
if not self.is_connected():
Expand Down Expand Up @@ -149,7 +164,11 @@ def _send(self, request, request_type, params: dict = None, reply: bool = True)
# Increase message reference for next message
self._reference += 1

return fut
try:
return await asyncio.wait_for(fut, TIMEOUT)
except asyncio.TimeoutError as exc:
_LOGGER.warning("Timeout while waiting for response from bridge")
raise AioComfoConnectTimeout from exc

async def _read(self) -> Message:
# Read packet size
Expand Down Expand Up @@ -237,29 +256,27 @@ async def _read_messages(self):
except DecodeError as exc:
_LOGGER.error("Failed to decode message: %s", exc)

def cmd_start_session(self, take_over: bool = False):
def cmd_start_session(self, take_over: bool = False) -> Awaitable[Message]:
"""Starts the session on the device by logging in and optionally disconnecting an already existing session."""
_LOGGER.debug("StartSessionRequest")
# pylint: disable=no-member
result = self._send(
return self._send(
zehnder_pb2.StartSessionRequest,
zehnder_pb2.GatewayOperation.StartSessionRequestType,
{"takeover": take_over},
)
return result

def cmd_close_session(self):
def cmd_close_session(self) -> Awaitable[Message]:
"""Stops the current session."""
_LOGGER.debug("CloseSessionRequest")
# pylint: disable=no-member
result = self._send(
return self._send(
zehnder_pb2.CloseSessionRequest,
zehnder_pb2.GatewayOperation.CloseSessionRequestType,
reply=False, # Don't wait for a reply
)
return result

def cmd_list_registered_apps(self):
def cmd_list_registered_apps(self) -> Awaitable[Message]:
"""Returns a list of all the registered clients."""
_LOGGER.debug("ListRegisteredAppsRequest")
# pylint: disable=no-member
Expand All @@ -268,7 +285,7 @@ def cmd_list_registered_apps(self):
zehnder_pb2.GatewayOperation.ListRegisteredAppsRequestType,
)

def cmd_register_app(self, uuid: str, device_name: str, pin: int):
def cmd_register_app(self, uuid: str, device_name: str, pin: int) -> Awaitable[Message]:
"""Register a new app by specifying our own uuid, device_name and pin code."""
_LOGGER.debug("RegisterAppRequest")
# pylint: disable=no-member
Expand All @@ -282,7 +299,7 @@ def cmd_register_app(self, uuid: str, device_name: str, pin: int):
},
)

def cmd_deregister_app(self, uuid: str):
def cmd_deregister_app(self, uuid: str) -> Awaitable[Message]:
"""Remove the specified app from the registration list."""
_LOGGER.debug("DeregisterAppRequest")
if uuid == self._local_uuid:
Expand All @@ -295,7 +312,7 @@ def cmd_deregister_app(self, uuid: str):
{"uuid": bytes.fromhex(uuid)},
)

def cmd_version_request(self):
def cmd_version_request(self) -> Awaitable[Message]:
"""Returns version information."""
_LOGGER.debug("VersionRequest")
# pylint: disable=no-member
Expand All @@ -304,7 +321,7 @@ def cmd_version_request(self):
zehnder_pb2.GatewayOperation.VersionRequestType,
)

def cmd_time_request(self):
def cmd_time_request(self) -> Awaitable[Message]:
"""Returns the current time on the device."""
_LOGGER.debug("CnTimeRequest")
# pylint: disable=no-member
Expand All @@ -313,7 +330,7 @@ def cmd_time_request(self):
zehnder_pb2.GatewayOperation.CnTimeRequestType,
)

def cmd_rmi_request(self, message, node_id: int = 1):
def cmd_rmi_request(self, message, node_id: int = 1) -> Awaitable[Message]:
"""Sends a RMI request."""
_LOGGER.debug("CnRmiRequest")
# pylint: disable=no-member
Expand All @@ -323,7 +340,7 @@ def cmd_rmi_request(self, message, node_id: int = 1):
{"nodeId": node_id or 1, "message": message},
)

def cmd_rpdo_request(self, pdid: int, pdo_type: int = 1, zone: int = 1, timeout=None):
def cmd_rpdo_request(self, pdid: int, pdo_type: int = 1, zone: int = 1, timeout=None) -> Awaitable[Message]:
"""Register a RPDO request."""
_LOGGER.debug("CnRpdoRequest")
# pylint: disable=no-member
Expand All @@ -333,7 +350,7 @@ def cmd_rpdo_request(self, pdid: int, pdo_type: int = 1, zone: int = 1, timeout=
{"pdid": pdid, "type": pdo_type, "zone": zone or 1, "timeout": timeout},
)

def cmd_keepalive(self):
def cmd_keepalive(self) -> Awaitable[Message]:
"""Sends a keepalive."""
_LOGGER.debug("KeepAlive")
# pylint: disable=no-member
Expand Down
6 changes: 5 additions & 1 deletion aiocomfoconnect/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,8 @@ class ComfoConnectRmiError(ComfoConnectError):


class AioComfoConnectNotConnected(Exception):
"""An error occured because the bridge is not connected."""
"""An error occurred because the bridge is not connected."""


class AioComfoConnectTimeout(Exception):
"""An error occurred because the bridge didn't reply in time."""