Skip to content

Commit

Permalink
Update plasmad and plasmactl for new library
Browse files Browse the repository at this point in the history
Switch plasma to use /etc/plasma/plasma.conf for LED configuration, instead of editing the systemd unit.

A couple of bugfixes and tweaks to the plasma library make this work smoothly:

1. Allow "auto" to accept a config file path
2. Squash errors related to pruning config options before initialising a device
3. Support regular list/dict in set_sequence
  • Loading branch information
Gadgetoid committed Jun 17, 2021
1 parent c004b38 commit 56d7c0c
Show file tree
Hide file tree
Showing 12 changed files with 228 additions and 42 deletions.
14 changes: 14 additions & 0 deletions daemon/etc/plasma/plasma.conf
@@ -0,0 +1,14 @@
pixels: 40
devices:
picade_hat:
type: APA102
pixels: 40
offset: 0
gpio_data: 14
gpio_clock: 15
picade_player_x:
type: serial
port: /dev/ttyACM0
pixels: 40
offset: 0
enabled: False
50 changes: 24 additions & 26 deletions daemon/usr/bin/plasma
Expand Up @@ -7,13 +7,14 @@ import os
import sys
import threading
from datetime import datetime
from optparse import OptionParser
import argparse

# Application Defaults
CONFIG_FILE = "/etc/plasma/plasma.conf"
PIPE_FILE = "/tmp/plasma"
PATTERNS = "/etc/plasma/"
FPS = 30
LIGHTS = 10
LIGHTS = 10 # Actual number of pixels is 4x this number
DEBUG = False

# Log & PID files
Expand Down Expand Up @@ -66,18 +67,18 @@ class FIFO():


def main():
opts = options()
args = get_args()

if opts.daemonize:
if args.daemonize:
fork()

from plasma import get_device
Plasma, args = get_device(opts.device)
plasma = Plasma(opts.lights, **args)
from plasma import auto

plasma = auto(f"GPIO:14:15:pixel_count={LIGHTS * 4}", CONFIG_FILE)

log("Starting Plasma in the {daemon} with framerate {fps}fps".format(
daemon='background' if opts.daemonize else 'foreground',
fps=opts.fps))
daemon='background' if args.daemonize else 'foreground',
fps=args.fps))

log("Plasma input pipe: {}".format(PIPE_FILE))

Expand All @@ -90,7 +91,7 @@ def main():
alpha = pattern_meta['alpha']
channels = 4 if alpha else 3

while not stopped.wait(1.0 / opts.fps):
while not stopped.wait(1.0 / args.fps):
delta = time.time() * 60
command = fifo.readline()
if command is not None:
Expand All @@ -110,7 +111,7 @@ def main():
log("Invalid colour: {}".format(command))
elif len(rgb) == 2 and rgb[0] == "fps":
try:
opts.fps = int(rgb[1])
args.fps = int(rgb[1])
log("Framerate set to: {}fps".format(rgb[1]))
except ValueError:
log("Invalid framerate: {}".format(rgb[1]))
Expand All @@ -122,13 +123,12 @@ def main():
if pattern is not None:
offset_y = int(delta % pattern_h)
row = pattern[offset_y]
for x in range(opts.lights * 4):
for x in range(plasma.get_pixel_count()):
offset_x = (x * channels) % (pattern_w * channels)
r, g, b = row[offset_x:offset_x + 3]
plasma.set_pixel(x, r, g, b)
else:
for x in range(opts.lights):
plasma.set_light(x, r, g, b)
plasma.set_all(r, g, b)

plasma.show()

Expand All @@ -146,17 +146,15 @@ def load_pattern(pattern_name):
return None, 0, 0, None


def options():
parser = OptionParser()
parser.add_option("-d", "--daemonize", dest="daemonize", action="store_true", default=False,
def get_args():
parser = argparse.ArgumentParser()
parser.add_argument("-d", "--daemonize", action="store_true", default=False,
help="run plasma as a daemon")
parser.add_option("-f", "--fps", action="store", dest="fps", type="int", default=FPS,
parser.add_argument("-f", "--fps", type=int, default=FPS,
help="set plasma LED update framerate")
parser.add_option("-l", "--lights", action="store", dest="lights", type="int", default=LIGHTS,
help="set number of lights in your plasma chain")
parser.add_option("-o", "--device", default="GPIO:15:14",
help="set output device, default is GPIO, BCM15 = Data, BCM14 = Clock")
return parser.parse_args()[0]
parser.add_argument("-c", "--config", type=str, default=CONFIG_FILE,
help="path to plasma config file")
return parser.parse_known_args()[0]


def fork():
Expand Down Expand Up @@ -184,9 +182,9 @@ def fork():
print("Fork #2 failed: {} ({})".format(e.errno, e.strerror))
sys.exit(1)

si = file("/dev/null", 'r')
so = file(LOG_FILE, 'a+')
se = file(ERR_FILE, 'a+', 0)
si = open("/dev/null", 'r')
so = open(LOG_FILE, 'a+')
se = open(ERR_FILE, 'a+')

os.dup2(si.fileno(), sys.stdin.fileno())
os.dup2(so.fileno(), sys.stdout.fileno())
Expand Down
4 changes: 4 additions & 0 deletions daemon/usr/bin/plasmactl
Expand Up @@ -52,11 +52,15 @@ if __name__ == "__main__":
with open_fifo(FIFO) as fifo:
fifo.write(f"{args.pattern}\n".encode("utf-8"))
fifo.flush()
sys.exit(0)

if args.colour:
r, g, b = args.colour
print(f"Setting colour to {r}, {g}, {b}")
with open_fifo(FIFO) as fifo:
fifo.write(f"{r} {g} {b}\n".encode("utf-8"))
fifo.flush()
sys.exit(0)

parser.print_help()
sys.exit(1)
17 changes: 8 additions & 9 deletions library/plasma/__init__.py
Expand Up @@ -5,21 +5,20 @@
__version__ = '2.0.0'


def auto(default=None):
def auto(default=None, descriptor=None):
"""Return a Plasma device instance.
Will try to get arguments from command-line,
otherwise falling back to supplied defaults.
"""
descriptor = None

if len(sys.argv) > 1:
descriptor = sys.argv[1]
elif default is not None:
descriptor = default
else:
raise ValueError("get_device requires a descriptor")
if descriptor is None:
if len(sys.argv) > 1:
descriptor = sys.argv[1]
elif default is not None:
descriptor = default
else:
raise ValueError("get_device requires a descriptor")

plasma, options = get_device(descriptor)

Expand Down
11 changes: 9 additions & 2 deletions library/plasma/core.py
Expand Up @@ -78,8 +78,15 @@ def set_all(self, r, g, b, brightness=None):

def set_sequence(self, sequence):
"""Set RGB values from a Plasma FX sequence."""
for index, rgb in sequence:
self.set_pixel(index, *rgb)
if type(sequence) is list:
for index, led in enumerate(sequence):
self.set_pixel(index, *led)
elif type(sequence) is dict:
for index, led in sequence.items():
self.set_pixel(index, *led)
else:
for index, led in sequence:
self.set_pixel(index, *led)

def get_pixel(self, x):
"""Get the RGB and brightness value of a specific pixel.
Expand Down
32 changes: 28 additions & 4 deletions library/plasma/matrix.py
Expand Up @@ -29,13 +29,30 @@ def __init__(self, config_file=None):
self._pixel_count = int(self._config["pixels"])

for output_name, output_options in self._config["devices"].items():
enabled = output_options.get("enabled", True)
if not enabled:
continue

output_type = output_options.get("type")
pixels = output_options.get("pixels", self._pixel_count)
offset = output_options.get("offset", 0)

del output_options["type"]
del output_options["pixels"]
del output_options["offset"]

try:
del output_options["enabled"]
except KeyError:
pass

try:
del output_options["pixels"]
except KeyError:
pass

try:
del output_options["offset"]
except KeyError:
pass

output_device = self.get_output_device(output_type)

Expand Down Expand Up @@ -96,8 +113,15 @@ def set_all(self, r, g, b, brightness=None):

def set_sequence(self, sequence):
"""Set all LEDs from a buffer of individual colours."""
for index, led in sequence:
self.set_pixel(index, *led)
if type(sequence) is list:
for index, led in enumerate(sequence):
self.set_pixel(index, *led)
elif type(sequence) is dict:
for index, led in sequence.items():
self.set_pixel(index, *led)
else:
for index, led in sequence:
self.set_pixel(index, *led)

def get_pixel(self, x):
"""Get the RGB and brightness value of a specific pixel."""
Expand Down
17 changes: 17 additions & 0 deletions library/tests/conftest.py
Expand Up @@ -95,6 +95,23 @@ def config_file():
file.close()



@pytest.fixture(scope='function', autouse=False)
def config_file_default_pixels_and_offset():
"""Temporary config file."""
file = tempfile.NamedTemporaryFile(delete=False)
file.write(b"""pixels: 100
devices:
TABLE:
type: APA102
gpio_data: 10
gpio_clock: 11
""")
file.flush()
yield pathlib.Path(file.name)
file.close()


@pytest.fixture(scope='function', autouse=False)
def argv():
"""Replace sys.argv to avoid feeding Plasma auto the test args."""
Expand Down
46 changes: 46 additions & 0 deletions library/tests/test_apa102.py
Expand Up @@ -15,6 +15,52 @@ def test_apa102_setup(GPIO):
])


def test_apa102_set_pixel(GPIO):
"""Test a pixel can be set."""
from plasma.apa102 import PlasmaAPA102
plasma = PlasmaAPA102(10, gpio_data=10, gpio_clock=11)
plasma.set_pixel(0, 255, 0, 255)

assert plasma.get_pixel(0) == (255, 0, 255, 1.0)


def test_apa102_set_all(GPIO):
"""Test a pixel can be set."""
from plasma.apa102 import PlasmaAPA102
plasma = PlasmaAPA102(10, gpio_data=10, gpio_clock=11)
plasma.set_all(255, 0, 255)

assert plasma.get_pixel(0) == (255, 0, 255, 1.0)


def test_matrix_set_sequence_dict(config_file, GPIO, rpi_ws281x, serial):
from plasma.apa102 import PlasmaAPA102
plasma = PlasmaAPA102(10, gpio_data=10, gpio_clock=11)
plasma.set_sequence({
0: (255, 0, 0),
2: (0, 255, 0),
4: (0, 0, 255)
})

assert plasma.get_pixel(0) == (255, 0, 0, 1.0)
assert plasma.get_pixel(2) == (0, 255, 0, 1.0)
assert plasma.get_pixel(4) == (0, 0, 255, 1.0)


def test_matrix_set_sequence_list(config_file, GPIO, rpi_ws281x, serial):
from plasma.apa102 import PlasmaAPA102
plasma = PlasmaAPA102(10, gpio_data=10, gpio_clock=11)
plasma.set_sequence([
(255, 0, 0),
(0, 255, 0),
(0, 0, 255)
])

assert plasma.get_pixel(0) == (255, 0, 0, 1.0)
assert plasma.get_pixel(1) == (0, 255, 0, 1.0)
assert plasma.get_pixel(2) == (0, 0, 255, 1.0)


def test_apa102_parse_options():
from plasma.apa102 import PlasmaAPA102

Expand Down
11 changes: 11 additions & 0 deletions library/tests/test_auto.py
Expand Up @@ -24,3 +24,14 @@ def test_get_device_from_argv(argv_valid, GPIO):

assert isinstance(plasma, PlasmaAPA102)


def test_get_device_from_config(config_file, GPIO, rpi_ws281x, serial):
from plasma import auto
from plasma.matrix import PlasmaMatrix
from plasma.apa102 import PlasmaAPA102

plasma = auto("XXXXX", config_file)

assert isinstance(plasma, PlasmaMatrix)
assert isinstance(plasma.get_device("WALL"), PlasmaAPA102)

18 changes: 18 additions & 0 deletions library/tests/test_fx.py
@@ -0,0 +1,18 @@
import pytest


def test_fx_cycle(argv, GPIO):
"""Test that set_sequence supports the output of a PlasmaFX Sequence"""
from plasma import auto
from plasma.apa102 import PlasmaAPA102
from plasmafx import Sequence
from plasmafx.plugins import FXCycle

sequence = Sequence(10)
sequence.set_plugin(0, FXCycle())

plasma = auto("APA102:14:15:pixel_count=10")

plasma.set_sequence(sequence.get_pixels())

assert isinstance(plasma, PlasmaAPA102)

0 comments on commit 56d7c0c

Please sign in to comment.