Skip to content

Commit

Permalink
Improve canceling set position task
Browse files Browse the repository at this point in the history
  • Loading branch information
rrooggiieerr committed May 14, 2024
1 parent 8221e9b commit 0fb6f45
Showing 1 changed file with 52 additions and 24 deletions.
76 changes: 52 additions & 24 deletions xyscreens/xyscreens.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ def update_status(self) -> Tuple[XYScreensState, float]:
direction = -1.0
action_duration = self._up_duration
else:
self._last_recompute_time = time.time_ns()
return (self._state, self._position)

now = time.time_ns()
Expand Down Expand Up @@ -267,10 +268,8 @@ def _update_callbacks(self):
logger.error("Exception in callback: %s", ex)

def _post_up(self) -> bool:
if self._state is XYScreensState.DOWNWARD:
self.update_status()

if self._state not in (XYScreensState.UPWARD, XYScreensState.UP):
self.update_status()
self._state = XYScreensState.UPWARD
return True

Expand Down Expand Up @@ -314,6 +313,8 @@ def stop(self) -> bool:
async def async_stop(self) -> bool:
"Stop the screen."

await self._cancel_set_position()

if (
await self._async_send_command(XYScreensCommand.STOP.to_bytes())
and self._post_stop()
Expand All @@ -324,10 +325,8 @@ async def async_stop(self) -> bool:
return False

def _post_down(self) -> bool:
if self._state is XYScreensState.UPWARD:
self.update_status()

if self._state not in (XYScreensState.DOWNWARD, XYScreensState.DOWN):
self.update_status()
self._state = XYScreensState.DOWNWARD
return True

Expand Down Expand Up @@ -385,15 +384,11 @@ async def async_set_position(self, target_position: float) -> bool:
"""Initiates the screen to move to a given position."""
assert 0.0 <= target_position <= 100.0

if self._set_position_task is not None and not self._set_position_task.done():
if not self._set_position_task.cancel():
logger.error("Failed to cancel previous set position task")
return False
logger.debug("Canceled previous set position task")

if round(self._position) == round(target_position):
return await self.async_stop()

await self._cancel_set_position()

if self._position < target_position:
if not await self._async_send_command(XYScreensCommand.DOWN.to_bytes()):
return False
Expand All @@ -409,23 +404,56 @@ async def async_set_position(self, target_position: float) -> bool:

return True

async def _cancel_set_position(self) -> bool:
if self._set_position_task is not None and not (
self._set_position_task.done() or self._set_position_task.cancelled()
):
if not self._set_position_task.cancel():
logger.error("Failed to cancel set position task")
logger.debug("Set position task: %s", self._set_position_task)
return False
try:
await self._set_position_task
except asyncio.CancelledError:
logger.debug("Set position task was cancelled")

if self._set_position_task is not None:
if self._set_position_task.done() or self._set_position_task.cancelled():
self._set_position_task = None
else:
logger.error("Failed to cancel set position task")
logger.debug("Set position task: %s", self._set_position_task)
return False

self.update_status()
self._update_callbacks()

return self._set_position_task is None

async def _set_position_coroutine(self, target_position: float):
sleep_duration = min(self._up_duration, self._down_duration) / 1000.0
while True:
target_position_reached = self._target_position_reached(target_position)

self._update_callbacks()

if target_position_reached:
if self._state in (
XYScreensState.UPWARD,
XYScreensState.DOWNWARD,
) and await self._async_send_command(XYScreensCommand.STOP.to_bytes()):
self._post_stop()
try:
target_position_reached = self._target_position_reached(target_position)

self._update_callbacks()

if target_position_reached:
if self._state in (
XYScreensState.UPWARD,
XYScreensState.DOWNWARD,
) and await self._async_send_command(
XYScreensCommand.STOP.to_bytes()
):
self._post_stop()
self._update_callbacks()
break

await asyncio.sleep(sleep_duration)
except asyncio.CancelledError:
logger.debug("Set position task was canceled")
break

await asyncio.sleep(sleep_duration)

def state(self) -> XYScreensState:
"Returns the current state of the screen."
(state, _) = self.update_status()
Expand Down

0 comments on commit 0fb6f45

Please sign in to comment.