-
Notifications
You must be signed in to change notification settings - Fork 0
/
chip-server.py
executable file
·166 lines (139 loc) · 5.54 KB
/
chip-server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
#!/usr/bin/python3
import sys
import asyncio
import asyncio.streams
import argparse
import json
import subprocess
import logging
# Root logging setup: every record from DEBUG upwards is written to
# 'chip-server.log' as "<timestamp>|<logger name>|<level>: <message>".
logging.basicConfig(
    filename='chip-server.log',
    level=logging.DEBUG,
    format='%(asctime)s|%(name)s|%(levelname)s: %(message)s',
    datefmt='%m/%d/%Y %I:%M:%S %p',
)
class CHIPServer:
    """TCP server that forwards JSON GPIO commands to ``chip-gpio.py``.

    The server listens on 127.0.0.1 only.  Each request is one
    newline-terminated line of UTF-8 JSON: a list whose first element is an
    object with a ``command`` key (``enable``, ``disable``, ``mode``,
    ``write`` or ``read``), a ``pin`` key and, for ``mode`` / ``write``, a
    ``mode`` / ``level`` key.

    The reply is the Python ``repr`` of a one-element list holding a dict
    (it is not JSON and carries no trailing newline).  Requests that cannot
    be parsed are logged and receive no reply.

    The ``chip-gpio.py`` tool is run synchronously, so the event loop is
    blocked for the duration of each command.
    """

    # External command-line tool; looked up through PATH by subprocess.
    __CHIP_GPIO = 'chip-gpio.py'

    def __init__(self, port):
        """Store the TCP port to listen on; nothing is opened until start()."""
        self.server = None
        # Maps each running client-handler task to that client's reader.
        self.clients = {}
        self.port = port
        self.logger = logging.getLogger('chip-server')

    def _accept_client(self, client_reader, client_writer):
        """Connection callback: serve the new client in its own task."""
        task = asyncio.ensure_future(
            self._handle_client(client_reader, client_writer))
        self.clients[task] = client_reader

        def client_done(finished):
            self.clients.pop(finished, None)
            # Retrieve the outcome so a crashed handler is reported in the
            # server log rather than only by asyncio at garbage collection.
            if not finished.cancelled() and finished.exception() is not None:
                self.logger.error('Client handler failed: %r',
                                  finished.exception())

        task.add_done_callback(client_done)

    async def _handle_client(self, client_reader, client_writer):
        """Process newline-terminated commands until the client sends EOF.

        Lines that are not valid UTF-8 are logged and skipped.  The writer
        is closed when the loop ends, whether normally, through an error or
        through cancellation.
        """
        self.logger.info('New client was accepted')
        try:
            while True:
                raw = await client_reader.readline()
                if not raw:
                    break
                try:
                    data = raw.decode('utf-8')
                except UnicodeDecodeError:
                    self.logger.error("Could not decode received message")
                    continue
                self.logger.info('Received data: %s', data)
                self.__process_command(data, client_writer)
        finally:
            client_writer.close()

    def start(self, loop):
        """Start listening on 127.0.0.1:<port>, driving *loop* until bound."""
        self.logger.info('Starting CHIP server')
        # start_server() binds to the loop that runs it, so no explicit
        # ``loop=`` argument (removed in Python 3.10) is needed.
        self.server = loop.run_until_complete(
            asyncio.start_server(self._accept_client,
                                 '127.0.0.1', self.port))

    def stop(self, loop):
        """Stop listening and wait on *loop* for the server to close.

        Does nothing beyond logging when the server was never started.
        """
        self.logger.info('Stoping CHIP server')
        if self.server is not None:
            self.server.close()
            # On Python 3.12+ wait_closed() also waits for open connections,
            # so cancel the handlers; each one closes its writer on the way
            # out.
            for task in list(self.clients):
                task.cancel()
            loop.run_until_complete(self.server.wait_closed())
            self.server = None

    def __process_command(self, data, client_writer):
        """Parse one request line, run the command and write the reply.

        :param data: the decoded request line.
        :param client_writer: object whose ``write(bytes)`` receives the
            reply.

        Nothing is written when the request cannot be parsed, a required
        key is missing, or running the command raises.
        """
        try:
            request = json.loads(data)
        except (TypeError, ValueError):
            self.logger.error("Could not parse message")
            return

        try:
            json_dict = request[0]
            command = json_dict['command']
            pin = json_dict['pin']
        except (IndexError, KeyError, TypeError):
            self.logger.error("Could not parse JSON data")
            return

        # NOTE(review): the reply always reports 'command': 'enable',
        # whatever was requested, and 'read' always reports result True.
        # Both are kept as-is because clients may depend on the current wire
        # format -- confirm before changing.
        response = {'command': 'enable', 'result': False}
        try:
            if command == 'enable':
                response['result'] = self.__enable(pin)
            elif command == 'disable':
                response['result'] = self.__disable(pin)
            elif command == 'mode':
                response['result'] = self.__mode(pin, json_dict['mode'])
            elif command == 'write':
                response['result'] = self.__write(pin, json_dict['level'])
            elif command == 'read':
                response['result'] = True
                response['answer'] = self.__read(pin)
            else:
                # %s formatting also copes with a non-string command.
                self.logger.error('Invalid command: %s', command)
        except KeyError:
            # 'mode' or 'level' missing from the request.
            self.logger.error("Could not parse JSON data")
            return
        except Exception:
            # Best effort: a failing command must not kill the client loop.
            self.logger.exception("Could not execute command %r", command)
            return

        client_writer.write(repr([response]).encode('utf-8'))

    def __run_gpio(self, arguments, failure_message):
        """Run ``chip-gpio.py`` with *arguments*; True only on exit status 0.

        A missing tool is logged and treated as a failure.
        """
        try:
            status = subprocess.call([CHIPServer.__CHIP_GPIO] + arguments)
        except FileNotFoundError:
            self.logger.error("Chip gpio is not installed")
            status = None
        if status != 0:
            self.logger.error(failure_message)
            return False
        return True

    def __enable(self, pin: int):
        """Run ``chip-gpio.py enable <pin>``; return True on success."""
        return self.__run_gpio(['enable', str(pin)],
                               "Could not enable pin " + str(pin))

    def __disable(self, pin: int):
        """Run ``chip-gpio.py disable <pin>``; return True on success."""
        return self.__run_gpio(['disable', str(pin)],
                               "Could not disable pin " + str(pin))

    def __mode(self, pin: int, mode: str):
        """Run ``chip-gpio.py mode <pin> --mode=<mode>``; True on success."""
        return self.__run_gpio(['mode', str(pin), '--mode={}'.format(mode)],
                               "Could not mode pin " + str(pin))

    def __write(self, pin: int, level: str):
        """Run ``chip-gpio.py write <pin> --level=<level>``; True on success."""
        return self.__run_gpio(['write', str(pin), '--level={}'.format(level)],
                               "Could not write on pin " + str(pin))

    def __read(self, pin: int):
        """Run ``chip-gpio.py read <pin>`` and return its raw stdout bytes.

        Returns None when the tool is not installed.  A non-zero exit status
        is logged, but the captured output is still returned.
        """
        try:
            proc = subprocess.Popen(
                [CHIPServer.__CHIP_GPIO, 'read', str(pin)],
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        except FileNotFoundError:
            self.logger.error("Chip gpio is not installed")
            return None
        out, _ = proc.communicate()
        if proc.returncode != 0:
            self.logger.error("Could not read on pin " + str(pin))
        self.logger.info('Read level %s at GPIO %s', out, pin)
        return out
def main():
    """Parse the port from the command line and run the server until stopped.

    The server and the event loop are always shut down on exit, including
    when start-up fails or the process is interrupted with Ctrl-C.
    """
    parser = argparse.ArgumentParser(description='GPIO Server.')
    parser.add_argument('port', type=int, help='Server port to listen')
    arguments = parser.parse_args()

    # Create the loop explicitly: asyncio.get_event_loop() is deprecated
    # when no loop is running and raises on newer Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    server = CHIPServer(arguments.port)
    try:
        # start() is inside the try so a failed bind still closes the loop;
        # stop() copes with a server that never started.
        server.start(loop)
        loop.run_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the server; exit quietly.
        pass
    finally:
        server.stop(loop)
        loop.close()
# Start the server only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()