/
pi.py
285 lines (254 loc) · 12.4 KB
/
pi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
# Copyright (C) 2013-2018 Jean-Francois Romang (jromang@posteo.de)
# Shivkumar Shivaji ()
# Jürgen Précour (LocutusOfPenguin@posteo.de)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import time
from threading import Lock, Timer
from ctypes import cdll, c_byte, create_string_buffer, pointer
from platform import machine
from utilities import DisplayMsg, hms_time
from dgt.api import Message
from dgt.util import ClockIcons, ClockSide
from dgt.translate import DgtTranslate
from dgt.board import DgtBoard
from dgt.iface import DgtIface
class DgtPi(DgtIface):
    """Handle the DgtPi communication.

    The clock is driven through the ``dgtpicom`` shared library (loaded with
    ctypes). Every library call is made while holding ``lib_lock`` because the
    polling thread started in ``__init__`` and the command methods below both
    use the same library handle.

    NOTE(review): ``side_running`` and ``get_san`` are used here but not defined
    in this class - presumably they are provided by ``DgtIface``.
    """

    # Value meaning "no time known yet"; 10h can't be reached by the clock.
    _TIME_UNSET = 3600 * 10

    # Button code for the on/off button (needs a configure() before forwarding).
    _ON_OFF_CODE = 0x20

    # library button code -> (log message, ``button`` value sent in Message.DGT_BUTTON)
    # NOTE(review): on/off (0x20) is forwarded with the same button value (0x11)
    # as the "0+4" combination - kept as found, confirm that this is intended.
    _BUTTON_EVENTS = {
        0x01: ('(i2c) clock button 0 pressed', 0),
        0x02: ('(i2c) clock button 1 pressed', 1),
        0x04: ('(i2c) clock button 2 pressed', 2),
        0x08: ('(i2c) clock button 3 pressed', 3),
        0x10: ('(i2c) clock button 4 pressed', 4),
        0x20: ('(i2c) clock button on/off pressed', 0x11),
        0x11: ('(i2c) clock button 0+4 pressed', 0x11),
        0x40: ('(i2c) clock lever pressed > right side down', 0x40),
        -0x40: ('(i2c) clock lever pressed > left side down', -0x40),
    }

    def __init__(self, dgtboard: DgtBoard):
        """Load the dgtpicom library, start up the clock and start the polling thread."""
        super().__init__(dgtboard)

        self.lib_lock = Lock()
        self.lib = cdll.LoadLibrary('etc/dgtpicom.x86.so' if machine() == 'x86_64' else 'etc/dgtpicom.so')

        # keep the last time to find out erroneous DGT_MSG_BWTIME messages (error: current time > last time)
        self.r_time = self._TIME_UNSET  # max value cause 10h cant be reached by clock
        self.l_time = self._TIME_UNSET  # max value cause 10h cant be reached by clock
        self.in_settime = False  # this is true between set_clock and clock_start => use set values instead of clock

        self._startup_i2c_clock()
        # Timer(0, ...) is used as a thread that starts right away and never returns
        incoming_clock_thread = Timer(0, self._process_incoming_clock_forever)
        incoming_clock_thread.start()

    def _startup_i2c_clock(self) -> None:
        """Initialize and configure the clock, then announce the clock version.

        Blocks until ``dgtpicom_init()`` succeeds; a failing ``dgtpicom_configure()``
        is only reported, not retried.
        """
        while self.lib.dgtpicom_init() < 0:
            logging.warning('Init() failed - Jack half connected?')
            DisplayMsg.show(Message.DGT_JACK_CONNECTED_ERROR())
            time.sleep(0.5)  # dont flood the log
        if self.lib.dgtpicom_configure() < 0:
            logging.warning('Configure() failed - Jack connected back?')
            DisplayMsg.show(Message.DGT_JACK_CONNECTED_ERROR())
        DisplayMsg.show(Message.DGT_CLOCK_VERSION(main=2, sub=2, dev='i2c', text=None))

    def _handle_button(self, code: int) -> None:
        """Log one button code and forward it as a DGT_BUTTON message.

        Unknown codes are ignored. Must be called with ``lib_lock`` held, because
        the on/off button triggers a library call.
        """
        event = self._BUTTON_EVENTS.get(code)
        if event is None:
            return
        log_text, button = event
        logging.info(log_text)
        if code == self._ON_OFF_CODE:
            self.lib.dgtpicom_configure()  # restart the clock - cause its OFF
        DisplayMsg.show(Message.DGT_BUTTON(button=button, dev='i2c'))

    def _publish_clock_time(self, times: list) -> None:
        """Update the stored times from the clock and send a DGT_CLOCK_TIME message.

        :param times: six values read from the clock - h, m, s of the left side
                      followed by h, m, s of the right side.
        """
        l_hms = times[:3]
        r_hms = times[3:]
        logging.info('(i2c) clock new time received l:%s r:%s', l_hms, r_hms)
        if self.in_settime:
            logging.info('(i2c) clock still not finished set time, sending old time')
        else:
            # DgtPi needs 2secs for a stopped clock to return the correct(!) time
            # we make it easy here and just set the time from the side counting down
            if self.side_running == ClockSide.LEFT:
                self.l_time = l_hms[0] * 3600 + l_hms[1] * 60 + l_hms[2]
            if self.side_running == ClockSide.RIGHT:
                self.r_time = r_hms[0] * 3600 + r_hms[1] * 60 + r_hms[2]
        text = Message.DGT_CLOCK_TIME(time_left=self.l_time, time_right=self.r_time, connect=True, dev='i2c')
        DisplayMsg.show(text)

    def _process_incoming_clock_forever(self) -> None:
        """Poll the clock for button events and times (endless loop, 0.1s per round).

        Buttons are handled on every round, the time is only published on every
        10th round.
        """
        but = c_byte(0)
        buttime = c_byte(0)
        clktime = create_string_buffer(6)
        counter = 0

        logging.info('incoming_clock ready')
        while True:
            with self.lib_lock:
                # get button events
                res = self.lib.dgtpicom_get_button_message(pointer(but), pointer(buttime))
                if res > 0:
                    self._handle_button(but.value)
                elif res < 0:
                    logging.warning('GetButtonMessage returned error %i', res)

                # get time events
                self.lib.dgtpicom_get_time(clktime)

            counter = (counter + 1) % 10
            if counter == 0:
                self._publish_clock_time(list(clktime.raw))
            time.sleep(0.1)

    def _run_configure(self) -> None:
        """Re-configure the clock; if that fails too, stop and re-init the library."""
        res = self.lib.dgtpicom_configure()
        if res < 0:
            logging.warning('Configure() also failed %i, resetting the dgtpi clock', res)
            self.lib.dgtpicom_stop()
            self.lib.dgtpicom_init()

    def _call_with_retry(self, label: str, func, *args) -> bool:
        """Call a library function under the lock, retrying once after a configure.

        :param label: name of the call as it should appear in the log (e.g. 'SetText').
        :param func: the library function to call.
        :param args: arguments passed unchanged to ``func`` (on both attempts).
        :return: True if the (possibly second) call returned a non-negative result.
        """
        with self.lib_lock:
            res = func(*args)
            if res < 0:
                logging.warning('%s() returned error %i, running configure', label, res)
                self._run_configure()
                res = func(*args)
        if res < 0:
            logging.warning('finally failed %i', res)
            return False
        return True

    @staticmethod
    def _run_flags(side: ClockSide) -> tuple:
        """Return the (left, right) run flags (0 or 1) for the given running side."""
        return int(side == ClockSide.LEFT), int(side == ClockSide.RIGHT)

    def _display_on_dgt_pi(self, text: str, beep=False, left_icons=ClockIcons.NONE, right_icons=ClockIcons.NONE):
        """Send a text with icons (and optional beep) to the clock.

        A text longer than 11 chars is only reported in the log, it is sent unchanged.

        :return: True on success, False if the call failed twice.
        """
        if len(text) > 11:
            logging.warning('(i2c) clock message too long [%s]', text)
        logging.debug('[%s]', text)
        raw_text = bytes(text, 'utf-8')
        beep_flag = 0x03 if beep else 0x00
        return self._call_with_retry('SetText', self.lib.dgtpicom_set_text,
                                     raw_text, beep_flag, left_icons.value, right_icons.value)

    def display_text_on_clock(self, message) -> bool:
        """Display a text on the dgtpi.

        Uses ``message.l`` and falls back to ``message.m`` if it is None.
        Returns True (nothing to do) if the message isn't addressed to this device.
        """
        text = message.l
        if text is None:
            text = message.m
        if self.get_name() not in message.devs:
            logging.debug('ignored %s - devs: %s', text, message.devs)
            return True  # same "ignored" result as the other display_* methods
        left_icons = getattr(message, 'ld', ClockIcons.NONE)
        right_icons = getattr(message, 'rd', ClockIcons.NONE)
        return self._display_on_dgt_pi(text, message.beep, left_icons, right_icons)

    def display_move_on_clock(self, message) -> bool:
        """Display a move on the dgtpi.

        The text is the fullmove number (3 chars wide) followed by the move text
        returned from ``get_san``. The left icon defaults to a dot.
        """
        bit_board, text = self.get_san(message)
        text = '{:3d}{:s}'.format(bit_board.fullmove_number, text)
        if self.get_name() not in message.devs:
            logging.debug('ignored %s - devs: %s', text, message.devs)
            return True
        left_icons = getattr(message, 'ld', ClockIcons.DOT)
        right_icons = getattr(message, 'rd', ClockIcons.NONE)
        return self._display_on_dgt_pi(text, message.beep, left_icons, right_icons)

    def display_time_on_clock(self, message) -> bool:
        """Display the time on the dgtpi.

        The endText call is only sent if a side is running or ``message.force`` is set.
        """
        if self.get_name() not in message.devs:
            logging.debug('ignored endText - devs: %s', message.devs)
            return True
        if self.side_running == ClockSide.NONE and not message.force:
            logging.debug('(i2c) clock isnt running - no need for endText')
            return True
        return self._call_with_retry('EndText', self.lib.dgtpicom_end_text)

    def light_squares_on_revelation(self, uci_move: str) -> bool:
        """Handle this by hw.py."""
        return True

    def clear_light_on_revelation(self) -> bool:
        """Handle this by hw.py."""
        return True

    def stop_clock(self, devs: set) -> bool:
        """Stop the dgtpi."""
        if self.get_name() not in devs:
            logging.debug('ignored stopClock - devs: %s', devs)
            return True
        logging.debug('(%s) clock sending stop time to clock l:%s r:%s', ','.join(devs),
                      hms_time(self.l_time), hms_time(self.r_time))
        return self._resume_clock(ClockSide.NONE)

    def _resume_clock(self, side: ClockSide) -> bool:
        """Let the given side run (or none) without changing the times on the clock.

        :return: False if no times have been set so far or if the call failed twice.
        """
        if self.l_time >= self._TIME_UNSET or self.r_time >= self._TIME_UNSET:
            logging.warning('time values not set - abort function')
            return False

        l_run, r_run = self._run_flags(side)
        if not self._call_with_retry('Run', self.lib.dgtpicom_run, l_run, r_run):
            return False
        self.side_running = side
        return True

    def start_clock(self, side: ClockSide, devs: set) -> bool:
        """Start the dgtpi.

        Sends the stored left/right times together with the running side to the clock.
        """
        if self.get_name() not in devs:
            logging.debug('ignored startClock - devs: %s', devs)
            return True

        l_hms = hms_time(self.l_time)
        r_hms = hms_time(self.r_time)
        logging.debug('(%s) clock sending start time to clock l:%s r:%s', ','.join(devs), l_hms, r_hms)

        l_run, r_run = self._run_flags(side)
        if not self._call_with_retry('SetAndRun', self.lib.dgtpicom_set_and_run,
                                     l_run, l_hms[0], l_hms[1], l_hms[2],
                                     r_run, r_hms[0], r_hms[1], r_hms[2]):
            return False
        self.side_running = side
        Timer(0.9, self.out_settime).start()  # delay abit cause the clock needs time to update its time result
        return True

    def out_settime(self) -> None:
        """Leave the set-time phase: times read from the clock are used again."""
        self.in_settime = False

    def set_clock(self, time_left: int, time_right: int, devs: set) -> bool:
        """Set the dgtpi.

        Only stores the times (and enters the set-time phase); they are sent to
        the clock by ``start_clock``.
        """
        if self.get_name() not in devs:
            logging.debug('ignored setClock - devs: %s', devs)
            return True

        l_hms = hms_time(time_left)
        r_hms = hms_time(time_right)
        logging.debug('(%s) clock received last time from clock l:%s r:%s [ign]', ','.join(devs),
                      hms_time(self.l_time), hms_time(self.r_time))
        logging.debug('(%s) clock sending set time to clock l:%s r:%s [use]', ','.join(devs), l_hms, r_hms)
        self.in_settime = True
        self.l_time = time_left
        self.r_time = time_right
        return True

    def get_name(self) -> str:
        """Get name."""
        return 'i2c'