Skip to content

Commit

Permalink
Merge branch 'master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
Gadgetoid committed Feb 9, 2022
2 parents 0ae1002 + 208c2e1 commit a01326e
Show file tree
Hide file tree
Showing 4 changed files with 262 additions and 31 deletions.
5 changes: 4 additions & 1 deletion examples/anvil-colour-control.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,22 @@
# Define functions that can be called from your Anvil app on the web
anvil.server.connect(app.uplink_key)


@anvil.server.callable
def set_color(color_string):
print("Setting LEDs to {}".format(color_string))
c = Color(color_string)
blinkt.set_all(c.red*255, c.green*255, c.blue*255, 1.0)
blinkt.set_all(c.red * 255, c.green * 255, c.blue * 255, 1.0)
blinkt.show()


@anvil.server.callable
def clear():
print("Clearing LEDs")
blinkt.clear()
blinkt.show()


# Display the URL where you can control the Blinkt LEDS
print("Control your Blinkt LEDs at {}".format(app.origin))
print("Press Ctrl-C to exit")
Expand Down
14 changes: 7 additions & 7 deletions examples/eyedropper.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
#!/usr/bin/env python3

from PIL import Image, ImageGrab
import pyautogui, sys
import blinkt
from PIL import ImageGrab
import pyautogui
import blinkt
import time

print('Press Ctrl-C to quit.')

try:
while True:
x, y = pyautogui.position()
im = ImageGrab.grab(bbox =(x-1, y, x, y+1))
im = ImageGrab.grab(bbox=(x - 1, y, x, y + 1))
rawrgb = list(im.getdata())
rgb = str(rawrgb)[2:-2]
r, g, b = rgb.split(', ')
rgb = str(rawrgb)[2:-2]
r, g, b = rgb.split(', ')
blinkt.set_all(r, g, b)
blinkt.set_brightness(1)
blinkt.show()
time.sleep(0.01)
except KeyboardInterrupt:
print('\n')
print('\n')
45 changes: 22 additions & 23 deletions examples/kitt.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#!/usr/bin/env python

from sys import exit # so the blinkt lights can turn off
import time # so we can wait between frames
import blinkt # so we can talk to our blinkt lights!

Expand All @@ -9,9 +8,9 @@
DECAY_FACTOR = 1.5 # how quickly should MAX_COLOUR fade? (1.5 works well)
TIME_SLEEP = 0.04 # seconds (0.04 works well)

PIXELS = blinkt.NUM_PIXELS # usually 8, can use fewer if you like!
PIXELS = blinkt.NUM_PIXELS # usually 8, can use fewer if you like!

blinkt.clear # make all pixels blank / black
blinkt.clear # make all pixels blank / black
blinkt.set_brightness(BRIGHTNESS)

brightpixel = -1
Expand All @@ -20,23 +19,23 @@
print('Hello Michael.\nHow are you today?')

while True:
# decay all pixels
for x in range(PIXELS):
pixel = blinkt.get_pixel(x) # format is [ r, g, b, brightness? ]
blinkt.set_pixel(x, pixel[0] / DECAY_FACTOR, 0, 0)

# brightpixel should move back and forth all the pixels,
# in a ping-pong, triangle wave. Not (co)sine.
brightpixel += direction
if brightpixel >= PIXELS - 1:
brightpixel = PIXELS - 1
direction = - abs(direction)
if brightpixel <= 0:
brightpixel = 0
direction = abs(direction)

blinkt.set_pixel(brightpixel, MAX_COLOUR, 0, 0)

blinkt.show() # draw the lights!
time.sleep(TIME_SLEEP) # wait a bit before working on next frame
# decay all pixels
for x in range(PIXELS):
pixel = blinkt.get_pixel(x) # format is [ r, g, b, brightness? ]
blinkt.set_pixel(x, pixel[0] / DECAY_FACTOR, 0, 0)

# brightpixel should move back and forth all the pixels,
# in a ping-pong, triangle wave. Not (co)sine.
brightpixel += direction

if brightpixel >= PIXELS - 1:
brightpixel = PIXELS - 1
direction = - abs(direction)
if brightpixel <= 0:
brightpixel = 0
direction = abs(direction)

blinkt.set_pixel(brightpixel, MAX_COLOUR, 0, 0)

blinkt.show() # draw the lights!
time.sleep(TIME_SLEEP) # wait a bit before working on next frame
229 changes: 229 additions & 0 deletions projects/mqtt/mqtt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
#!/usr/bin/env python

from sys import exit
import argparse
import time

try:
import paho.mqtt.client as mqtt
except ImportError:
raise ImportError("This example requires the paho-mqtt module\nInstall with: sudo pip install paho-mqtt")

import blinkt


MQTT_SERVER = "localhost"
MQTT_PORT = 1883
MQTT_TOPIC = "pimoroni/blinkt"

# Set these to use authorisation
MQTT_USER = None
MQTT_PASS = None

description = """\
MQTT Blinkt! Control
This example uses MQTT messages from {server} on port {port} to control Blinkt!
It will monitor the {topic} topic by default, and understands the following messages:
rgb,<pixel>,<r>,<g>,<b> - Set a single pixel to an RGB colour. Example: rgb,1,255,0,255
clr - Clear Blinkt!
bri,<val> - Set global brightness (for val in range 0.0-1.0)
You can use the online MQTT tester at http://www.hivemq.com/demos/websocket-client/ to send messages.
Use {server} as the host, and port 80 (Eclipse's websocket port). Set the topic to topic: {topic}
""".format(
server=MQTT_SERVER,
port=MQTT_PORT,
topic=MQTT_TOPIC
)
parser = argparse.ArgumentParser(description=description, formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument('-H', '--host', default=MQTT_SERVER,
help='MQTT broker to connect to')
parser.add_argument('-P', '--port', default=MQTT_PORT, type=int,
help='port on MQTT broker to connect to')
parser.add_argument('-T', '--topic', action='append',
help='MQTT topic to subscribe to; can be repeated for multiple topics')
parser.add_argument('-u', '--user',
help='MQTT broker user name')
parser.add_argument('-p', '--pass', dest='pw',
help='MQTT broker password')
parser.add_argument('-q', '--quiet', default=False, action='store_true',
help='Minimal output (eg for running as a daemon)')
parser.add_argument('-g', '--green-hack', default=False, action='store_true',
help='Apply hack to green channel to improve colour saturation')
parser.add_argument('--timeout', default='0',
help='Pixel timeout(s). Pixel will blank if last update older than X seconds. May be a single number or comma-separated list. Use 0 for no timeout')
parser.add_argument('-D', '--daemon', metavar='PIDFILE',
help='Run as a daemon (implies -q)')

args = parser.parse_args()

# Get timeout list into expected form
args.timeout = args.timeout.split(',')

if len(args.timeout) == 1:
args.timeout = args.timeout * blinkt.NUM_PIXELS
elif len(args.timeout) != blinkt.NUM_PIXELS:
print("--timeout list must be {} elements long".format(blinkt.NUM_PIXELS))
exit(1)

try:
args.timeout = [int(x) for x in args.timeout]
except ValueError as e:
print("Bad timeout value: {}".format(e))
exit(1)

args.timeout = [x and x or 0 for x in args.timeout]
args.min_timeout = min(args.timeout)

if args.daemon:
import signal
try:
import daemon
except ImportError:
raise ImportError("--daemon requires the daemon module. Install with: sudo pip install python-daemon")
try:
import lockfile.pidlockfile
except ImportError:
raise ImportError("--daemon requires the lockfile module. Install with: sudo pip install lockfile")

if not args.topic:
args.topic = [MQTT_TOPIC]


class PixelClient(mqtt.Client):
def __init__(self, prog_args, *args, **kwargs):
super(PixelClient, self).__init__(*args, **kwargs)
self.args = prog_args
self.update_time = [None] * blinkt.NUM_PIXELS
self.on_connect = self.on_connect
self.on_message = self.on_message

blinkt.set_clear_on_exit()
# Some stuff doesn't get set up until the first time show() is called
blinkt.show()

if self.args.user is not None and self.args.pw is not None:
self.username_pw_set(username=self.args.user, password=self.args.pw)
self.connect(self.args.host, self.args.port, 60)

def cmd_clear(self):
blinkt.clear()
blinkt.show()

def cmd_brightness(self, bri):
try:
bri = float(bri)
except ValueError:
print("Malformed command brightness, expected float, got: {}".format(str(bri)))
return
blinkt.set_brightness(bri)
blinkt.show()

def cmd_rgb(self, pixel, data):
try:
if pixel == "*":
pixel = None
else:
pixel = int(pixel)
if pixel > 7:
print("Pixel out of range: {}".format(str(pixel)))
return

r, g, b = [int(x) & 0xff for x in data]
if self.args.green_hack:
# Green is about twice the luminosity for a given value
# than red or blue, so apply a hackish linear compensation
# here taking care of corner cases at 0 and 255. To do it
# properly, it should really be a curve but this approximation
# is quite a lot better than nothing.
if r not in [0, 255]:
r = r + 1
if g not in [0]:
g = g / 2 + 1
if b not in [0, 255]:
b = b + 1

if not self.args.quiet:
print('rgb', pixel, r, g, b)

except ValueError:
print("Malformed RGB command: {} {}".format(str(pixel), str(data)))
return

if pixel is None:
for x in range(blinkt.NUM_PIXELS):
blinkt.set_pixel(x, r, g, b)
self.update_time[x] = time.time()
else:
blinkt.set_pixel(pixel, r, g, b)
self.update_time[pixel] = time.time()

blinkt.show()

def on_connect(self, client, userdata, flags, rc):
if not self.args.quiet:
print("Connected to {s}:{p} listening for topics {t} with result code {r}.\nSee {c} --help for options.".format(
s=self.args.host,
p=self.args.port,
t=', '.join(self.args.topic),
r=rc,
c=parser.prog
))

for topic in self.args.topic:
client.subscribe(topic)

def on_message(self, client, userdata, msg):

data = msg.payload.decode('utf-8').strip().split(',')
command = data.pop(0)
print(command, data)

if command == "clr" and len(data) == 0:
self.cmd_clear()
return

if command == "bri" and len(data) == 1:
self.cmd_brightness(data[0])
return

if command == "rgb" and len(data) == 4:
self.cmd_rgb(data[0], data[1:])
return

def blank_timed_out_pixels(self):
now = time.time()
to_upt_pairs = zip(self.args.timeout, self.update_time)
for pixel, (to, uptime) in enumerate(to_upt_pairs):
if to is not None and uptime is not None and uptime < now - to:
blinkt.set_pixel(pixel, 0, 0, 0)
self.update_time[pixel] = None
blinkt.show()

def loop(self, timeout=1.0, max_packets=1):
self.blank_timed_out_pixels()
return super(PixelClient, self).loop(timeout, max_packets)


def sigterm(signum, frame):
client._thread_terminate = True


if args.daemon:
# Monkey-patch daemon so's the daemon starts reasonably quickly. FDs don't
# strictly speaking need closing anyway because we haven't opened any (yet).
daemon.daemon.get_maximum_file_descriptors = lambda: 32
args.quiet = True
pidlf = lockfile.pidlockfile.PIDLockFile(args.daemon)
with daemon.DaemonContext(
pidfile=pidlf,
signal_map={signal.SIGTERM: sigterm}):
client = PixelClient(args)
client.loop_forever()
else:
client = PixelClient(args)
client.loop_forever()

0 comments on commit a01326e

Please sign in to comment.