/
driver.py
executable file
·85 lines (71 loc) · 3.15 KB
/
driver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#!/usr/bin/env python3
from assistant import Assistant
from config import init_assistants
from errors import DasApplicationNotRunningError
from json import loads
from multiprocessing import Process
from requests import delete, exceptions as requests_exceptions, get, Response
from settings import BASE_URL, HEADERS, PID
from typing import List
from urllib3 import exceptions as url_exceptions
# Registry of every binding process the driver has started; filled by
# `initiate_binding()` and walked by `kill_all_bindings()` at shutdown.
all_bindings: List[Process] = []
def kill_all_bindings():
    """Stop every registered binding process, then clean up via the REST API.

    Every process in ``all_bindings`` is terminated and joined. Afterwards the
    entries returned by ``GET BASE_URL/shadows`` are read, and for each one a
    ``DELETE`` is issued against ``BASE_URL/pid/<PID>/zoneId/<zoneId>``.

    Raises:
        SystemExit: if the ``/shadows`` request cannot connect. A
            ``DasApplicationNotRunningError`` is built and its ``elaborate()``
            method is called before exiting.
    """
    # Upper bound (seconds) on how long to wait for one child to go away, so
    # a child that does not react to the termination request cannot hang the
    # shutdown forever.
    join_timeout: float = 5.0

    for binding in all_bindings:
        binding.terminate()
    # terminate() only *requests* termination. Join afterwards so the children
    # are reaped and no binding is still running while the cleanup below
    # deletes the remaining entries.
    for binding in all_bindings:
        binding.join(join_timeout)

    url: str = BASE_URL + '/shadows'
    try:
        response: Response = get(url, headers=HEADERS)
    except (requests_exceptions.ConnectionError,
            url_exceptions.NewConnectionError):
        e: DasApplicationNotRunningError = (
            DasApplicationNotRunningError('Keyboard Driver'))
        e.elaborate()
        # `exit()` is injected by the `site` module for interactive use and is
        # not guaranteed to exist; raising SystemExit is the portable
        # equivalent and keeps the same exit status.
        raise SystemExit

    for signal in loads(response.content):
        zone_id: str = signal['zoneId']
        delete(BASE_URL + '/pid/' + PID + '/zoneId/' + zone_id,
               headers=HEADERS)
def initiate_binding(binding: Process) -> None:
    """Launch ``binding`` and record it in ``all_bindings``.

    The process is started first, so it is only registered once ``start()``
    has returned without raising.
    """
    binding.start()
    # Remember the running process so it can be terminated at shutdown.
    all_bindings.append(binding)
def main():
    """Run the driver: spawn one process per assistant, then wait for shutdown.

    The driver starts one ``multiprocessing.Process`` for each assistant
    returned by ``init_assistants()``, targeting that assistant's
    ``create_binding``. The main process then keeps reading user input until
    the shutdown command is given, at which point all bindings are killed.

    Shutdown is triggered by entering ``Q``, by Ctrl-C (``KeyboardInterrupt``)
    or by standard input being closed (``EOFError``).

    NOTE: The driver can simply be run by navigating to this directory and
    running `./driver.py`
    NOTE: All assistants should be created in `init_assistants()` in
    `config.py`
    NOTE: Every assistant returned in the List from `init_assistants()` has
    their binding managed by the driver - once an assistant is added to that
    list, there is nothing more needed to orchestrate an additional assistant
    """
    print("Igniting kindling...")
    all_assistants: List[Assistant] = init_assistants()
    for assistant in all_assistants:
        initiate_binding(Process(target=assistant.create_binding))

    # Raw strings: the banner is full of backslashes that are not valid escape
    # sequences, which newer Python versions warn about in normal literals.
    # The printed text is unchanged.
    print("")
    print(r" ( ( ) * ) ( ")
    print(r" )\ ) )\ ) ( /( ( ` * ) ( /( )\ ) ( ( ( ")
    print(r"(()/((()/( )\()) )\))( ( ` ) /( )\()) ( ( (()/( )\))( ( )\ ")
    print(r" /(_))/(_))((_)\ ((_)()\ )\ ( )(_))((_)\ )\ )\ /(_))__((_)()\ )((_) ")
    print(r"(_)) (_)) ((_) (_()((_)((_) (_(_()) _((_)((_) ((_)(_)) |_|(()((_)((_)_ ")
    print(r"| _ \| _ \ / _ \ | \/ || __||_ _|| || || __|| | | |/ __| | __| / _ \ ")
    print(r"| _/| /| (_) || |\/| || _| | | | __ || _| | |_| |\__ \===|__ \| (_) | ")
    print(r"|_| |_|_\ \___/ |_| |_||___| |_| |_||_||___| \___/ |___/ |___/ \__\_\ ")
    print("")

    # Keep prompting while the child processes run.
    try:
        while input('Press Q + <ENTER> to extinguish:\n') != 'Q':
            print('Unexpected Input')
    except (KeyboardInterrupt, EOFError):
        # Ctrl-C or a closed stdin is treated as a shutdown request, so the
        # bindings are always cleaned up below.
        pass

    print('Extinguishing...')
    kill_all_bindings()
# Only start the driver when this file is executed as a script, not when it
# is imported.
if __name__ == '__main__':
    main()