Skip to content

Commit

Permalink
Use internal _rpi_ws281x lib
Browse files Browse the repository at this point in the history
  • Loading branch information
rm-hull committed Jul 21, 2017
1 parent d5ce92e commit 44b48dc
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 58 deletions.
84 changes: 73 additions & 11 deletions luma/led_matrix/device.py
Expand Up @@ -218,16 +218,52 @@ def __init__(self, dma_interface=None, width=8, height=4, cascaded=None,
self.capabilities(width, height, rotate, mode="RGB")
self._mapping = list(mapping or range(self.cascaded))
assert(self.cascaded == len(self._mapping))
self._ws = dma_interface or self.__ws281x__()(num=width * height, pin=18)
self._ws.begin()

self._contrast = None
self._prev_contrast = 0x70

ws = self._ws = dma_interface or self.__ws281x__()

# Create ws2811_t structure and fill in parameters.
self._leds = ws.new_ws2811_t()

pin = 18
channel = 0
dma = 5
freq_hz = 800000
brightness = 255
strip_type = ws.WS2811_STRIP_GRB
invert = False

# Initialize the channels to zero
for channum in range(2):
chan = ws.ws2811_channel_get(self._leds, channum)
ws.ws2811_channel_t_count_set(chan, 0)
ws.ws2811_channel_t_gpionum_set(chan, 0)
ws.ws2811_channel_t_invert_set(chan, 0)
ws.ws2811_channel_t_brightness_set(chan, 0)

# Initialize the channel in use
self._channel = ws.ws2811_channel_get(self._leds, channel)
ws.ws2811_channel_t_count_set(self._channel, self.cascaded)
ws.ws2811_channel_t_gpionum_set(self._channel, pin)
ws.ws2811_channel_t_invert_set(self._channel, 0 if not invert else 1)
ws.ws2811_channel_t_brightness_set(self._channel, brightness)
ws.ws2811_channel_t_strip_type_set(self._channel, strip_type)

# Initialize the controller
ws.ws2811_t_freq_set(self._leds, freq_hz)
ws.ws2811_t_dmanum_set(self._leds, dma)

resp = ws.ws2811_init(self._leds)
if resp != 0:
raise RuntimeError('ws2811_init failed with code {0}'.format(resp))

self.clear()
self.show()

def __ws281x__(self):
from neopixel import Adafruit_NeoPixel
return Adafruit_NeoPixel
import _rpi_ws281x
return _rpi_ws281x

def display(self, image):
"""
Expand All @@ -239,10 +275,11 @@ def display(self, image):

ws = self._ws
m = self._mapping
for idx, (r, g, b) in enumerate(image.getdata()):
ws.setPixelColorRGB(m[idx], r, g, b)
for idx, (red, green, blue) in enumerate(image.getdata()):
color = (red << 16) | (green << 8) | blue
ws.ws2811_led_set(self._channel, m[idx], color)

ws.show()
self._flush()

def show(self):
"""
Expand Down Expand Up @@ -271,9 +308,34 @@ def contrast(self, value):
"""
assert(0x00 <= value <= 0xFF)
self._contrast = value
ws = self._ws
ws.setBrightness(value)
ws.show()
self._ws.ws2811_channel_t_brightness_set(self._channel, value)
self._flush()

def _flush(self):
resp = self._ws.ws2811_render(self._leds)
if resp != 0:
raise RuntimeError('ws2811_render failed with code {0}'.format(resp))

def __del__(self):
# Required because Python will complain about memory leaks
# However there's no guarantee that "ws" will even be set
# when the __del__ method for this class is reached.
if self._ws is not None:
self.cleanup()

def cleanup(self):
"""
Attempt to reset the device & switching it off prior to exiting the
python process.
"""
self.hide()
self.clear()

if self._leds is not None:
self._ws.ws2811_fini(self._leds)
self._ws.delete_ws2811_t(self._leds)
self._leds = None
self._channel = None


# Alias for ws2812
Expand Down
1 change: 1 addition & 0 deletions setup.py
Expand Up @@ -28,6 +28,7 @@ def read_file(fname):
install_deps = ["luma.core>=0.9.5"]
if os.uname()[4].startswith("arm"):
install_deps += "rpi-ws281x"
install_deps += "ws2812"

setup(
name="luma.led_matrix",
Expand Down
159 changes: 112 additions & 47 deletions tests/test_ws2812.py
Expand Up @@ -3,99 +3,164 @@
# Copyright (c) 2014-17 Richard Hull and contributors
# See LICENSE.rst for details.

import pytest

from luma.led_matrix.device import neopixel
from luma.core.render import canvas

from helpers import Mock, call


ws2812 = Mock(unsafe=True)
ws = Mock(unsafe=True)
chan = "channel"
leds = "leds"


def setup_function(function):
ws2812.reset_mock()
ws2812.command.side_effect = None
ws.reset_mock()
ws.command.side_effect = None
ws.ws2811_init = Mock(return_value=0)
ws.ws2811_render = Mock(return_value=0)
ws.ws2811_channel_get = Mock(return_value=chan)
ws.ws2811_new_ws2811_t = Mock(return_value=leds)


def test_init_cascaded():
device = neopixel(ws2812, cascaded=7)
device = neopixel(ws, cascaded=7)
assert device.width == 7
assert device.height == 1
ws2812.begin.assert_called_once_with()
ws2812.setPixelColorRGB.assert_has_calls([
call(i, 0, 0, 0) for i in range(7)])
assert ws.ws2811_channel_t_count_set.called
assert ws.ws2811_channel_t_gpionum_set.called
assert ws.ws2811_channel_t_invert_set.called
assert ws.ws2811_channel_t_brightness_set.called
assert ws.ws2811_channel_t_strip_type_set.called
assert ws.ws2811_t_freq_set.called
assert ws.ws2811_t_dmanum_set.called
assert ws.ws2811_init.called
ws.ws2811_led_set.assert_has_calls([
call(chan, i, 0x000000) for i in range(7)])
assert ws.ws2811_render.called


def test_init_4x8():
device = neopixel(ws2812)
device = neopixel(ws)
assert device.cascaded == 32
ws2812.begin.assert_called_once_with()
ws2812.setPixelColorRGB.assert_has_calls([
call(i, 0, 0, 0) for i in range(32)])
assert ws.ws2811_channel_t_count_set.called
assert ws.ws2811_channel_t_gpionum_set.called
assert ws.ws2811_channel_t_invert_set.called
assert ws.ws2811_channel_t_brightness_set.called
assert ws.ws2811_channel_t_strip_type_set.called
assert ws.ws2811_t_freq_set.called
assert ws.ws2811_t_dmanum_set.called
assert ws.ws2811_init.called
ws.ws2811_led_set.assert_has_calls([
call(chan, i, 0x000000) for i in range(32)])
assert ws.ws2811_render.called


def test_init_fail():
ws.reset_mock()
ws.ws2811_init = Mock(return_value=-1)
with pytest.raises(RuntimeError) as ex:
neopixel(ws, cascaded=7)
assert "ws2811_init failed with code -1" in str(ex.value)


def test_clear():
device = neopixel(ws)
ws.reset_mock()
device.clear()
ws.ws2811_led_set.assert_has_calls([
call(chan, i, 0x000000) for i in range(32)])
assert ws.ws2811_render.called


def test_cleanup():
device = neopixel(ws)
device.cleanup()
ws.ws2811_led_set.assert_has_calls([
call(chan, i, 0x000000) for i in range(32)])
assert ws.ws2811_render.called
assert ws.ws2811_fini.called
assert ws.delete_ws2811_t.called
assert device._leds is None
assert device._channel is None


def test_hide():
device = neopixel(ws2812, cascaded=5)
ws2812.reset_mock()
device = neopixel(ws, cascaded=5)
ws.reset_mock()
device.hide()
ws2812.setPixelColor.assert_not_called()
ws2812.show.assert_called_once_with()
ws.ws2811_led_set.assert_not_called()
assert ws.ws2811_render.called


def test_show():
device = neopixel(ws2812, cascaded=5)
ws2812.reset_mock()
device = neopixel(ws, cascaded=5)
ws.reset_mock()
device.hide()
device.show()
ws2812.setPixelColor.assert_not_called()
ws2812.show.assert_called_with()
ws.ws2811_led_set.assert_not_called()
assert ws.ws2811_render.called


def test_contrast():
device = neopixel(ws2812, cascaded=6)
ws2812.reset_mock()
device = neopixel(ws, cascaded=6)
ws.reset_mock()
device.contrast(0x6B)
ws2812.setBrightness.assert_called_once_with(0x6B)
ws.ws2811_channel_t_brightness_set.assert_called_once_with(chan, 0x6B)


def test_display():
device = neopixel(ws2812, width=4, height=4)
ws2812.reset_mock()
device = neopixel(ws, width=4, height=4)
ws.reset_mock()

with canvas(device) as draw:
draw.rectangle(device.bounding_box, outline="red")

ws2812.setPixelColorRGB.assert_has_calls([
call(0, 0xFF, 0, 0),
call(1, 0xFF, 0, 0),
call(2, 0xFF, 0, 0),
call(3, 0xFF, 0, 0),
call(4, 0xFF, 0, 0),
call(5, 0, 0, 0),
call(6, 0, 0, 0),
call(7, 0xFF, 0, 0),
call(8, 0xFF, 0, 0),
call(9, 0, 0, 0),
call(10, 0, 0, 0),
call(11, 0xFF, 0, 0),
call(12, 0xFF, 0, 0),
call(13, 0xFF, 0, 0),
call(14, 0xFF, 0, 0),
call(15, 0xFF, 0, 0),
ws.ws2811_led_set.assert_has_calls([
call(chan, 0, 0xFF0000),
call(chan, 1, 0xFF0000),
call(chan, 2, 0xFF0000),
call(chan, 3, 0xFF0000),
call(chan, 4, 0xFF0000),
call(chan, 5, 0x000000),
call(chan, 6, 0x000000),
call(chan, 7, 0xFF0000),
call(chan, 8, 0xFF0000),
call(chan, 9, 0x000000),
call(chan, 10, 0x000000),
call(chan, 11, 0xFF0000),
call(chan, 12, 0xFF0000),
call(chan, 13, 0xFF0000),
call(chan, 14, 0xFF0000),
call(chan, 15, 0xFF0000),
])
assert ws2812.show.called
assert ws.ws2811_render.called


def test_display_fail():
device = neopixel(ws, cascaded=7)
ws.reset_mock()
ws.ws2811_render = Mock(return_value=-1)

with pytest.raises(RuntimeError) as ex:
with canvas(device) as draw:
draw.rectangle(device.bounding_box, outline="red")

assert "ws2811_render failed with code -1" in str(ex.value)


def test_mapping():
num_pixels = 16
device = neopixel(ws2812, cascaded=num_pixels, mapping=reversed(list(range(num_pixels))))
ws2812.reset_mock()
device = neopixel(ws, cascaded=num_pixels, mapping=reversed(list(range(num_pixels))))
ws.reset_mock()

with canvas(device) as draw:
for i in range(device.cascaded):
draw.point((i, 0), (i, 0, 0))

expected = [call(num_pixels - i - 1, i, 0, 0) for i in range(num_pixels)]
ws2812.setPixelColorRGB.assert_has_calls(expected)
expected = [call(chan, num_pixels - i - 1, i << 16) for i in range(num_pixels)]
ws.ws2811_led_set.assert_has_calls(expected)

assert ws2812.show.called
assert ws.ws2811_render.called

0 comments on commit 44b48dc

Please sign in to comment.