forked from bnjmnp/pibooth
-
Notifications
You must be signed in to change notification settings - Fork 0
/
light.py
90 lines (71 loc) · 2.27 KB
/
light.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# -*- coding: utf-8 -*-
import threading
from pibooth.controls import GPIO
class BlinkingThread(threading.Thread):
"""Thread which manage blinking LEDs synchronously.
"""
def __init__(self):
threading.Thread.__init__(self)
self.daemon = True
self._leds = []
self._tick = 0.3
self._lock = threading.Lock()
self._stop_event = threading.Event()
self.start()
def register(self, led):
"""Add a new LED to manage.
"""
with self._lock:
if led not in self._leds:
self._leds.append(led)
def unregister(self, led):
"""Remove the given LED from the blinking management.
"""
with self._lock:
if led in self._leds:
self._leds.remove(led)
def run(self):
"""Cyclic call to the method :py:meth:`PtbLed.switch_on` and
:py:meth:`PtbLed.switch_off` of the registered LED.
"""
sequence = ['switch_on', 'switch_off']
while not self._stop_event.is_set():
for func_name in sequence:
with self._lock:
for led in self._leds:
getattr(led, func_name)()
if self._stop_event.wait(self._tick):
return # Stop requested
def stop(self):
"""Stop the thread.
"""
self._stop_event.set()
self.join()
class PtbLed(object):
"""LED management.
"""
_blinking_thread = BlinkingThread()
def __init__(self, pin):
self.pin = pin
GPIO.setup(pin, GPIO.OUT)
def switch_on(self):
"""Switch on the LED.
"""
if threading.current_thread() != self._blinking_thread:
self._blinking_thread.unregister(self)
GPIO.output(self.pin, GPIO.HIGH)
def switch_off(self):
"""Switch off the LED.
"""
if threading.current_thread() != self._blinking_thread:
self._blinking_thread.unregister(self)
GPIO.output(self.pin, GPIO.LOW)
def blink(self):
"""Blink the LED.
"""
self._blinking_thread.register(self)
def quit(self):
"""Switch off and stop the blinking thread.
"""
self.switch_off()
self._blinking_thread.stop()