-
Notifications
You must be signed in to change notification settings - Fork 530
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cover fake position persistent #213
Changes from 11 commits
f5a49a9
3526fdc
8e4c8e2
5ec9b70
16b6859
3518953
4a4296e
fd0f88b
feeb6de
e6c6a1b
d175b7f
de09977
5a0ebcc
da6ad53
42cbb30
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,7 +33,7 @@ | |
COVER_12_CMDS = "1_2_3" | ||
COVER_MODE_NONE = "none" | ||
COVER_MODE_POSITION = "position" | ||
COVER_MODE_FAKE = "fake" | ||
COVER_MODE_TIMED = "timed" | ||
|
||
DEFAULT_COMMANDS_SET = COVER_ONOFF_CMDS | ||
DEFAULT_POSITIONING_MODE = COVER_MODE_NONE | ||
|
@@ -47,7 +47,7 @@ def flow_schema(dps): | |
[COVER_ONOFF_CMDS, COVER_OPENCLOSE_CMDS, COVER_FZZZ_CMDS, COVER_12_CMDS] | ||
), | ||
vol.Optional(CONF_POSITIONING_MODE, default=DEFAULT_POSITIONING_MODE): vol.In( | ||
[COVER_MODE_NONE, COVER_MODE_POSITION, COVER_MODE_FAKE] | ||
[COVER_MODE_NONE, COVER_MODE_POSITION, COVER_MODE_TIMED] | ||
), | ||
vol.Optional(CONF_CURRENT_POSITION_DP): vol.In(dps), | ||
vol.Optional(CONF_SET_POSITION_DP): vol.In(dps), | ||
|
@@ -64,16 +64,16 @@ class LocaltuyaCover(LocalTuyaEntity, CoverEntity): | |
def __init__(self, device, config_entry, switchid, **kwargs): | ||
"""Initialize a new LocaltuyaCover.""" | ||
super().__init__(device, config_entry, switchid, _LOGGER, **kwargs) | ||
self._state = None | ||
self._current_cover_position = 50 | ||
commands_set = DEFAULT_COMMANDS_SET | ||
if self.has_config(CONF_COMMANDS_SET): | ||
commands_set = self._config[CONF_COMMANDS_SET] | ||
self._open_cmd = commands_set.split("_")[0] | ||
self._close_cmd = commands_set.split("_")[1] | ||
self._stop_cmd = commands_set.split("_")[2] | ||
self._timer_start = time.time() | ||
self._previous_state = self._stop_cmd | ||
self._state = self._stop_cmd | ||
self._previous_state = self._state | ||
self._current_cover_position = 0 | ||
print("Initialized cover [{}]".format(self.name)) | ||
|
||
@property | ||
|
@@ -120,20 +120,19 @@ def is_closed(self): | |
async def async_set_cover_position(self, **kwargs): | ||
"""Move the cover to a specific position.""" | ||
self.debug("Setting cover position: %r", kwargs[ATTR_POSITION]) | ||
if self._config[CONF_POSITIONING_MODE] == COVER_MODE_FAKE: | ||
if self._config[CONF_POSITIONING_MODE] == COVER_MODE_TIMED: | ||
newpos = float(kwargs[ATTR_POSITION]) | ||
|
||
currpos = self.current_cover_position | ||
posdiff = abs(newpos - currpos) | ||
mydelay = posdiff / 50.0 * self._config[CONF_SPAN_TIME] | ||
mydelay = posdiff / 100.0 * self._config[CONF_SPAN_TIME] | ||
if newpos > currpos: | ||
self.debug("Opening to %f: delay %f", newpos, mydelay) | ||
await self.async_open_cover() | ||
else: | ||
self.debug("Closing to %f: delay %f", newpos, mydelay) | ||
await self.async_close_cover() | ||
await asyncio.sleep(mydelay) | ||
await self.async_stop_cover() | ||
self.hass.async_create_task(self.async_stop_after_timeout(mydelay)) | ||
self.debug("Done") | ||
|
||
elif self._config[CONF_POSITIONING_MODE] == COVER_MODE_POSITION: | ||
|
@@ -146,21 +145,48 @@ async def async_set_cover_position(self, **kwargs): | |
converted_position, self._config[CONF_SET_POSITION_DP] | ||
) | ||
|
||
async def async_stop_after_timeout(self, delay_sec): | ||
"""Stop the cover if timeout (max movement span) occurred.""" | ||
await asyncio.sleep(delay_sec) | ||
await self.async_stop_cover() | ||
|
||
async def async_open_cover(self, **kwargs): | ||
"""Open the cover.""" | ||
self.debug("Launching command %s to cover ", self._open_cmd) | ||
await self._device.set_dp(self._open_cmd, self._dp_id) | ||
if self._config[CONF_POSITIONING_MODE] == COVER_MODE_TIMED: | ||
# for timed positioning, stop the cover after a full opening timespan | ||
# instead of waiting the internal timeout | ||
self.hass.async_create_task( | ||
self.async_stop_after_timeout(self._config[CONF_SPAN_TIME] + 5) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Magic constant: what does 5 mean? Create a constant for it. |
||
) | ||
|
||
async def async_close_cover(self, **kwargs): | ||
"""Close cover.""" | ||
self.debug("Launching command %s to cover ", self._close_cmd) | ||
await self._device.set_dp(self._close_cmd, self._dp_id) | ||
if self._config[CONF_POSITIONING_MODE] == COVER_MODE_TIMED: | ||
# for timed positioning, stop the cover after a full opening timespan | ||
# instead of waiting the internal timeout | ||
self.hass.async_create_task( | ||
self.async_stop_after_timeout(self._config[CONF_SPAN_TIME] + 5) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same |
||
) | ||
|
||
async def async_stop_cover(self, **kwargs): | ||
"""Stop the cover.""" | ||
self.debug("Launching command %s to cover ", self._stop_cmd) | ||
await self._device.set_dp(self._stop_cmd, self._dp_id) | ||
|
||
def status_restored(self): | ||
"""Restore the last stored cover status.""" | ||
if self._config[CONF_POSITIONING_MODE] == COVER_MODE_TIMED: | ||
stored_pos = int( | ||
self._stored_state.attributes.get("current_position", "-1") | ||
) | ||
if stored_pos != -1: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You return the string "-1" if the variable does not exist but compare with an integer here, is that intentional? Maybe just check if not |
||
self._current_cover_position = stored_pos | ||
self.debug("Restored cover position %s", self._current_cover_position) | ||
|
||
def status_updated(self): | ||
"""Device status was updated.""" | ||
self._previous_state = self._state | ||
|
@@ -177,13 +203,13 @@ def status_updated(self): | |
else: | ||
self._current_cover_position = curr_pos | ||
if ( | ||
self._config[CONF_POSITIONING_MODE] == COVER_MODE_FAKE | ||
self._config[CONF_POSITIONING_MODE] == COVER_MODE_TIMED | ||
and self._state != self._previous_state | ||
): | ||
if self._previous_state != self._stop_cmd: | ||
# the state has changed, and the cover was moving | ||
time_diff = time.time() - self._timer_start | ||
pos_diff = round(time_diff / self._config[CONF_SPAN_TIME] * 50.0) | ||
pos_diff = round(time_diff / self._config[CONF_SPAN_TIME] * 100.0) | ||
if self._previous_state == self._close_cmd: | ||
pos_diff = -pos_diff | ||
self._current_cover_position = min( | ||
|
@@ -197,6 +223,7 @@ def status_updated(self): | |
time_diff, | ||
pos_diff, | ||
) | ||
|
||
# store the time of the last movement change | ||
self._timer_start = time.time() | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really have to store it in
self._stored_state
, can't we just pass the value as an argument tostatus_restored
?