Permalink
Branch: master
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
70 lines (53 sloc) 2.76 KB
import appdaemon.plugins.hass.hassapi as hass
class ApplianceOnEventListener(hass.Hass):
"""Event listener for Telegram bot events."""
def initialize(self):
self.listen_event(self.receive_telegram_callback, 'telegram_callback')
self.listen_state(
self.check_appliance_turned_on, entity=self.args["power_sensor"])
self.timer_handle = None
self.command_name = self.args["appliance_name"].replace(" ", "_")
def check_appliance_turned_on(self, entity, attribute, old, new, kwargs):
if float(new) > 0 and self.timer_handle is None:
# Appliance turned on. Start timer.
self.log("Starting timer for {}".format(self.args["appliance_name"]))
self.timer_handle = self.run_in(self.send_notification,
self.args["time_before_shutoff"])
elif float(new) == 0:
# Appliance turned off. Kill any timers.
self.log("Stopping timer for {}".format(self.args["appliance_name"]))
self.cancel_timer(self.timer_handle)
self.timer_handle = None
def send_notification(self, kwargs):
self.log("Notifying user about {}".format(self.args["appliance_name"]))
# Check if still enabled
keyboard = [[("Turn Off", "/turn_off_{}".format(self.command_name)),
("Do Nothing", "/do_nothing_{}".format(self.command_name))]]
self.call_service(
'telegram_bot/send_message',
target=self.args["telegram_user"],
message=
"You have left the {} for {} seconds. Do you want to turn it off?".
format(self.args["appliance_name"], self.args["time_before_shutoff"]),
disable_notification=True,
inline_keyboard=keyboard)
# Keep sending notification every 2 minutes
self.timer_handle = self.run_in(self.send_notification, 120)
def receive_telegram_callback(self, event_id, payload_event, *args):
assert event_id == 'telegram_callback'
data_callback = payload_event['data']
callback_id = payload_event['id']
if data_callback == '/turn_off_{}'.format(self.command_name):
self.call_service(
'telegram_bot/answer_callback_query',
message='Turning it off!',
callback_query_id=callback_id)
self.call_service(
'switch/turn_off', entity_id=self.args["power_switch"])
elif data_callback == '/do_nothing_{}'.format(self.command_name):
self.call_service(
'telegram_bot/answer_callback_query',
message='OK, you said no!',
callback_query_id=callback_id)
self.cancel_timer(self.timer_handle)
self.timer_handle = None