Skip to content

Commit

Permalink
Handle concurrent requests senquentially
Browse files Browse the repository at this point in the history
  • Loading branch information
mathieu-mp committed Jun 20, 2022
1 parent 5da720b commit 3fc5268
Showing 1 changed file with 46 additions and 43 deletions.
89 changes: 46 additions & 43 deletions intex_spa/intex_spa.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def __init__(self, address: str = "SPA_DEVICE", port: str = "8990"):
self.status = IntexSpaStatus()
self.last_successful_update_ms: int = None
self.is_available: bool = None
self._semaphore = asyncio.Semaphore(1)

async def _async_handle_intent(
self, intent: str = "status", expected_state: typing.Union[bool, int] = None
Expand Down Expand Up @@ -82,49 +83,51 @@ async def _async_handle_intent(
)
await self.async_update_status()

if (
# the provided intent is an update status
intent == "status"
# the provided intent is a command and its expected_state differs from the current state
or getattr(self.status, intent) != expected_state
):
_LOGGER.debug("'%s' intent: a spa query is needed", intent)

# Attempt maximum 5 times
for _ in range(5):
try:
_LOGGER.debug("'%s' intent: new spa query", intent)
# Initialize a query to the spa
query = IntexSpaQuery(intent, expected_state)

# Send the raw request bytes via the network object
await self.network.async_send(query.request_bytes)

# Receive the raw response bytes via the network object
received_bytes = await self.network.async_receive()
# Give the raw received_bytes back to the query object to render the new status
query.render_response_status(received_bytes)
_LOGGER.debug("'%s' intent: new status is rendered", intent)
# And update the status object with it
self.status = query.response_status

# Set availability info
self.last_successful_update_ms = int(time.time() * 1000)
self.is_available = True
except (
asyncio.TimeoutError,
asyncio.IncompleteReadError,
AssertionError,
):
_LOGGER.warning("Exception raised during spa querying")
await asyncio.sleep(2)
continue
else:
break
else: # No retry has succeeded
# Set unavailability info
self.status = IntexSpaStatus()
self.is_available = False
# Run concurrent requests senquentially
async with self._semaphore:
if (
# the provided intent is an update status
intent == "status"
# the provided intent is a command and its expected_state differs from the current state
or getattr(self.status, intent) != expected_state
):
_LOGGER.debug("'%s' intent: a spa query is needed", intent)

# Attempt maximum 5 times
for _ in range(5):
try:
_LOGGER.debug("'%s' intent: new spa query", intent)
# Initialize a query to the spa
query = IntexSpaQuery(intent, expected_state)

# Send the raw request bytes via the network object
await self.network.async_send(query.request_bytes)

# Receive the raw response bytes via the network object
received_bytes = await self.network.async_receive()
# Give the raw received_bytes back to the query object to render the new status
query.render_response_status(received_bytes)
_LOGGER.debug("'%s' intent: new status is rendered", intent)
# And update the status object with it
self.status = query.response_status

# Set availability info
self.last_successful_update_ms = int(time.time() * 1000)
self.is_available = True
except (
asyncio.TimeoutError,
asyncio.IncompleteReadError,
AssertionError,
):
_LOGGER.warning("Exception raised during spa querying")
await asyncio.sleep(2)
continue
else:
break
else: # No retry has succeeded
# Set unavailability info
self.status = IntexSpaStatus()
self.is_available = False

return self.status

Expand Down

0 comments on commit 3fc5268

Please sign in to comment.