forked from mileperhour/localtuya-homeassistant
-
Notifications
You must be signed in to change notification settings - Fork 530
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #506 from rikman122/vacuum_platform_new
Vacuum platform new
- Loading branch information
Showing
4 changed files
with
326 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,258 @@ | ||
"""Platform to locally control Tuya-based vacuum devices.""" | ||
import logging | ||
from functools import partial | ||
|
||
import voluptuous as vol | ||
from homeassistant.components.vacuum import ( | ||
DOMAIN, | ||
STATE_CLEANING, | ||
STATE_DOCKED, | ||
STATE_IDLE, | ||
STATE_RETURNING, | ||
STATE_PAUSED, | ||
STATE_ERROR, | ||
SUPPORT_BATTERY, | ||
SUPPORT_FAN_SPEED, | ||
SUPPORT_PAUSE, | ||
SUPPORT_RETURN_HOME, | ||
SUPPORT_START, | ||
SUPPORT_STATE, | ||
SUPPORT_STATUS, | ||
SUPPORT_STOP, | ||
SUPPORT_LOCATE, | ||
StateVacuumEntity, | ||
) | ||
|
||
from .common import LocalTuyaEntity, async_setup_entry | ||
|
||
from .const import ( | ||
CONF_POWERGO_DP, | ||
CONF_IDLE_STATUS_VALUE, | ||
CONF_RETURNING_STATUS_VALUE, | ||
CONF_DOCKED_STATUS_VALUE, | ||
CONF_BATTERY_DP, | ||
CONF_MODE_DP, | ||
CONF_MODES, | ||
CONF_FAN_SPEED_DP, | ||
CONF_FAN_SPEEDS, | ||
CONF_CLEAN_TIME_DP, | ||
CONF_CLEAN_AREA_DP, | ||
CONF_CLEAN_RECORD_DP, | ||
CONF_LOCATE_DP, | ||
CONF_FAULT_DP, | ||
CONF_PAUSED_STATE, | ||
CONF_RETURN_MODE, | ||
CONF_STOP_STATUS, | ||
) | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
||
CLEAN_TIME = "clean_time" | ||
CLEAN_AREA = "clean_area" | ||
CLEAN_RECORD = "clean_record" | ||
MODES_LIST = "cleaning_mode_list" | ||
MODE = "cleaning_mode" | ||
FAULT = "fault" | ||
|
||
DEFAULT_IDLE_STATUS = "standby,sleep" | ||
DEFAULT_RETURNING_STATUS = "docking" | ||
DEFAULT_DOCKED_STATUS = "charging,chargecompleted" | ||
DEFAULT_MODES = "smart,wall_follow,spiral,single" | ||
DEFAULT_FAN_SPEEDS = "low,normal,high" | ||
DEFAULT_PAUSED_STATE = "paused" | ||
DEFAULT_RETURN_MODE = "chargego" | ||
DEFAULT_STOP_STATUS = "standby" | ||
|
||
|
||
def flow_schema(dps): | ||
"""Return schema used in config flow.""" | ||
return { | ||
vol.Required(CONF_IDLE_STATUS_VALUE, default=DEFAULT_IDLE_STATUS): str, | ||
vol.Required(CONF_POWERGO_DP): vol.In(dps), | ||
vol.Required(CONF_DOCKED_STATUS_VALUE, default=DEFAULT_DOCKED_STATUS): str, | ||
vol.Optional( | ||
CONF_RETURNING_STATUS_VALUE, default=DEFAULT_RETURNING_STATUS | ||
): str, | ||
vol.Optional(CONF_BATTERY_DP): vol.In(dps), | ||
vol.Optional(CONF_MODE_DP): vol.In(dps), | ||
vol.Optional(CONF_MODES, default=DEFAULT_MODES): str, | ||
vol.Optional(CONF_RETURN_MODE, default=DEFAULT_RETURN_MODE): str, | ||
vol.Optional(CONF_FAN_SPEED_DP): vol.In(dps), | ||
vol.Optional(CONF_FAN_SPEEDS, default=DEFAULT_FAN_SPEEDS): str, | ||
vol.Optional(CONF_CLEAN_TIME_DP): vol.In(dps), | ||
vol.Optional(CONF_CLEAN_AREA_DP): vol.In(dps), | ||
vol.Optional(CONF_CLEAN_RECORD_DP): vol.In(dps), | ||
vol.Optional(CONF_LOCATE_DP): vol.In(dps), | ||
vol.Optional(CONF_FAULT_DP): vol.In(dps), | ||
vol.Optional(CONF_PAUSED_STATE, default=DEFAULT_PAUSED_STATE): str, | ||
vol.Optional(CONF_STOP_STATUS, default=DEFAULT_STOP_STATUS): str, | ||
} | ||
|
||
|
||
class LocaltuyaVacuum(LocalTuyaEntity, StateVacuumEntity): | ||
"""Tuya vacuum device.""" | ||
|
||
def __init__(self, device, config_entry, switchid, **kwargs): | ||
"""Initialize a new LocaltuyaVacuum.""" | ||
super().__init__(device, config_entry, switchid, _LOGGER, **kwargs) | ||
self._state = None | ||
self._battery_level = None | ||
self._attrs = {} | ||
|
||
self._idle_status_list = [] | ||
if self.has_config(CONF_IDLE_STATUS_VALUE): | ||
self._idle_status_list = self._config[CONF_IDLE_STATUS_VALUE].split(",") | ||
|
||
self._modes_list = [] | ||
if self.has_config(CONF_MODES): | ||
self._modes_list = self._config[CONF_MODES].split(",") | ||
self._attrs[MODES_LIST] = self._modes_list | ||
|
||
self._docked_status_list = [] | ||
if self.has_config(CONF_DOCKED_STATUS_VALUE): | ||
self._docked_status_list = self._config[CONF_DOCKED_STATUS_VALUE].split(",") | ||
|
||
self._fan_speed_list = [] | ||
if self.has_config(CONF_FAN_SPEEDS): | ||
self._fan_speed_list = self._config[CONF_FAN_SPEEDS].split(",") | ||
|
||
self._fan_speed = "" | ||
self._cleaning_mode = "" | ||
|
||
print("Initialized vacuum [{}]".format(self.name)) | ||
|
||
@property | ||
def supported_features(self): | ||
"""Flag supported features.""" | ||
supported_features = ( | ||
SUPPORT_START | ||
| SUPPORT_PAUSE | ||
| SUPPORT_STOP | ||
| SUPPORT_STATUS | ||
| SUPPORT_STATE | ||
) | ||
|
||
if self.has_config(CONF_RETURN_MODE): | ||
supported_features = supported_features | SUPPORT_RETURN_HOME | ||
if self.has_config(CONF_FAN_SPEED_DP): | ||
supported_features = supported_features | SUPPORT_FAN_SPEED | ||
if self.has_config(CONF_BATTERY_DP): | ||
supported_features = supported_features | SUPPORT_BATTERY | ||
if self.has_config(CONF_LOCATE_DP): | ||
supported_features = supported_features | SUPPORT_LOCATE | ||
|
||
return supported_features | ||
|
||
@property | ||
def state(self): | ||
"""Return the vacuum state.""" | ||
return self._state | ||
|
||
@property | ||
def battery_level(self): | ||
"""Return the current battery level.""" | ||
return self._battery_level | ||
|
||
@property | ||
def extra_state_attributes(self): | ||
"""Return the specific state attributes of this vacuum cleaner.""" | ||
return self._attrs | ||
|
||
@property | ||
def fan_speed(self): | ||
"""Return the current fan speed.""" | ||
return self._fan_speed | ||
|
||
@property | ||
def fan_speed_list(self) -> list: | ||
"""Return the list of available fan speeds.""" | ||
return self._fan_speed_list | ||
|
||
async def async_start(self, **kwargs): | ||
"""Turn the vacuum on and start cleaning.""" | ||
await self._device.set_dp(True, self._config[CONF_POWERGO_DP]) | ||
|
||
async def async_pause(self, **kwargs): | ||
"""Stop the vacuum cleaner, do not return to base.""" | ||
await self._device.set_dp(False, self._config[CONF_POWERGO_DP]) | ||
|
||
async def async_return_to_base(self, **kwargs): | ||
"""Set the vacuum cleaner to return to the dock.""" | ||
if self.has_config(CONF_RETURN_MODE): | ||
await self._device.set_dp( | ||
self._config[CONF_RETURN_MODE], self._config[CONF_MODE_DP] | ||
) | ||
else: | ||
_LOGGER.error("Missing command for return home in commands set.") | ||
|
||
async def async_stop(self, **kwargs): | ||
"""Turn the vacuum off stopping the cleaning.""" | ||
if self.has_config(CONF_STOP_STATUS): | ||
await self._device.set_dp( | ||
self._config[CONF_STOP_STATUS], self._config[CONF_MODE_DP] | ||
) | ||
else: | ||
_LOGGER.error("Missing command for stop in commands set.") | ||
|
||
async def async_clean_spot(self, **kwargs): | ||
"""Perform a spot clean-up.""" | ||
return None | ||
|
||
async def async_locate(self, **kwargs): | ||
"""Locate the vacuum cleaner.""" | ||
if self.has_config(CONF_LOCATE_DP): | ||
await self._device.set_dp("", self._config[CONF_LOCATE_DP]) | ||
|
||
async def async_set_fan_speed(self, fan_speed, **kwargs): | ||
"""Set the fan speed.""" | ||
await self._device.set_dp(fan_speed, self._config[CONF_FAN_SPEED_DP]) | ||
|
||
async def async_send_command(self, command, params=None, **kwargs): | ||
"""Send a command to a vacuum cleaner.""" | ||
if command == "set_mode" and "mode" in params: | ||
mode = params["mode"] | ||
await self._device.set_dp(mode, self._config[CONF_MODE_DP]) | ||
|
||
def status_updated(self): | ||
"""Device status was updated.""" | ||
state_value = str(self.dps(self._dp_id)) | ||
|
||
if state_value in self._idle_status_list: | ||
self._state = STATE_IDLE | ||
elif state_value in self._docked_status_list: | ||
self._state = STATE_DOCKED | ||
elif state_value == self._config[CONF_RETURNING_STATUS_VALUE]: | ||
self._state = STATE_RETURNING | ||
elif state_value == self._config[CONF_PAUSED_STATE]: | ||
self._state = STATE_PAUSED | ||
else: | ||
self._state = STATE_CLEANING | ||
|
||
if self.has_config(CONF_BATTERY_DP): | ||
self._battery_level = self.dps_conf(CONF_BATTERY_DP) | ||
|
||
self._cleaning_mode = "" | ||
if self.has_config(CONF_MODES): | ||
self._cleaning_mode = self.dps_conf(CONF_MODE_DP) | ||
self._attrs[MODE] = self._cleaning_mode | ||
|
||
self._fan_speed = "" | ||
if self.has_config(CONF_FAN_SPEEDS): | ||
self._fan_speed = self.dps_conf(CONF_FAN_SPEED_DP) | ||
|
||
if self.has_config(CONF_CLEAN_TIME_DP): | ||
self._attrs[CLEAN_TIME] = self.dps_conf(CONF_CLEAN_TIME_DP) | ||
|
||
if self.has_config(CONF_CLEAN_AREA_DP): | ||
self._attrs[CLEAN_AREA] = self.dps_conf(CONF_CLEAN_AREA_DP) | ||
|
||
if self.has_config(CONF_CLEAN_RECORD_DP): | ||
self._attrs[CLEAN_RECORD] = self.dps_conf(CONF_CLEAN_RECORD_DP) | ||
|
||
if self.has_config(CONF_FAULT_DP): | ||
self._attrs[FAULT] = self.dps_conf(CONF_FAULT_DP) | ||
if self._attrs[FAULT] != 0: | ||
self._state = STATE_ERROR | ||
|
||
|
||
async_setup_entry = partial(async_setup_entry, DOMAIN, LocaltuyaVacuum, flow_schema) |