Skip to content

Rotary Encoder via GPIO (EN)

paradadf edited this page Mar 29, 2018 · 22 revisions

(This brief guide and python script is adapted from savetheclocktower’s project found on GitHub.)

Purpose

This script is meant for whom that want a physical hardware volume knob on a Recalbox project, such as arcade machines. This script is useful for arcade cabinets with speakers that don't have their own hardware volume knob, or would have difficulty moving the speaker control knob near the user.

This script utilizes a standard 5 pin rotary encoder and has been tested on the encoder from Adafruit. 5 wires are required for this rotary encoder: three for the knob part (A, B, and ground), and two for the button part (common and ground). Here’s a reference for the Raspberry pi’s GPIO pins.

Description BCM # Board #
knob A GPIO 26 37
knob B GPIO 19 35
knob ground ground pin below GPIO 26 39
button common GPIO 13 33
button ground ground pin opposite GPIO 13 34

You can use whichever pins you want; just update the volume-monitor.sh script if you change them. If you lack the push button in your rotary encoder then leave pins unoccupied. Any ground pin can be used, these pins are just suggested due to proximity.

Volume daemon

The script below works as the following: it listens on the specified pins, and when the knob is turned one way or another, it uses the states of the A and B pins to figure out whether the knob was turned to the left or to the right. That way it knows whether to increase or decrease the system volume in response, which it does with the command-line program amixer.

(1) To install the script into your recalbox: First you need to connect to your recalbox via ssh.

(2) Remount partition on read-write: mount -o remount, rw /

(3) Create/Edit your volume-monitor.py script in /recalbox/scripts via nano: nano /recalbox/scripts/volume-monitor.py

(4) Copy and Paste the script from below, into the file, then save the file by ctrl+x:

#!/usr/bin/env python2

"""
The daemon responsible for changing the volume in response to a turn or press
of the volume knob.
The volume knob is a rotary encoder. It turns infinitely in either direction.
Turning it to the right will increase the volume; turning it to the left will
decrease the volume. The knob can also be pressed like a button in order to
turn muting on or off.
The knob uses two GPIO pins and we need some extra logic to decode it. The
button we can just treat like an ordinary button. Rather than poll
constantly, we use threads and interrupts to listen on all three pins in one
script.
"""

import os
import signal
import subprocess
import sys
import threading

from RPi import GPIO
from multiprocessing import Queue

DEBUG = False

# SETTINGS
# ========

# The two pins that the encoder uses (BCM numbering).
GPIO_A = 26   
GPIO_B = 19

# The pin that the knob's button is hooked up to. If you have no button, set
# this to None.
GPIO_BUTTON = 13 

# The minimum and maximum volumes, as percentages.
#
# The default max is less than 100 to prevent distortion. The default min is
# greater than zero because if your system is like mine, sound gets
# completely inaudible _long_ before 0%. If you've got a hardware amp or
# serious speakers or something, your results will vary.
VOLUME_MIN = 60
VOLUME_MAX = 96

# The amount you want one click of the knob to increase or decrease the
# volume. I don't think that non-integer values work here, but you're welcome
# to try.
VOLUME_INCREMENT = 1

# (END SETTINGS)
# 


# When the knob is turned, the callback happens in a separate thread. If
# those turn callbacks fire erratically or out of order, we'll get confused
# about which direction the knob is being turned, so we'll use a queue to
# enforce FIFO. The callback will push onto a queue, and all the actual
# volume-changing will happen in the main thread.
QUEUE = Queue()

# When we put something in the queue, we'll use an event to signal to the
# main thread that there's something in there. Then the main thread will
# process the queue and reset the event. If the knob is turned very quickly,
# this event loop will fall behind, but that's OK because it consumes the
# queue completely each time through the loop, so it's guaranteed to catch up.
EVENT = threading.Event()

def debug(str):
  if not DEBUG:
    return
  print(str)

class RotaryEncoder:
  """
  A class to decode mechanical rotary encoder pulses.
  Ported to RPi.GPIO from the pigpio sample here: 
  http://abyz.co.uk/rpi/pigpio/examples.html
  """
  
  def __init__(self, gpioA, gpioB, callback=None, buttonPin=None, buttonCallback=None):
    """
    Instantiate the class. Takes three arguments: the two pin numbers to
    which the rotary encoder is connected, plus a callback to run when the
    switch is turned.
    
    The callback receives one argument: a `delta` that will be either 1 or -1.
    One of them means that the dial is being turned to the right; the other
    means that the dial is being turned to the left. I'll be damned if I know
    yet which one is which.
    """
    
    self.lastGpio = None
    self.gpioA    = gpioA
    self.gpioB    = gpioB
    self.callback = callback
    
    self.gpioButton     = buttonPin
    self.buttonCallback = buttonCallback
    
    self.levA = 0
    self.levB = 0
    
    GPIO.setmode(GPIO.BCM)
    GPIO.setup(self.gpioA, GPIO.IN, pull_up_down=GPIO.PUD_UP)
    GPIO.setup(self.gpioB, GPIO.IN, pull_up_down=GPIO.PUD_UP)
    
    GPIO.add_event_detect(self.gpioA, GPIO.BOTH, self._callback)
    GPIO.add_event_detect(self.gpioB, GPIO.BOTH, self._callback)
    
    if self.gpioButton:
      GPIO.setup(self.gpioButton, GPIO.IN, pull_up_down=GPIO.PUD_UP)
      GPIO.add_event_detect(self.gpioButton, GPIO.FALLING, self._buttonCallback, bouncetime=500)
    
    
  def destroy(self):
    GPIO.remove_event_detect(self.gpioA)
    GPIO.remove_event_detect(self.gpioB)
    GPIO.cleanup()
    
  def _buttonCallback(self, channel):
    self.buttonCallback(GPIO.input(channel))
    
  def _callback(self, channel):
    level = GPIO.input(channel)
    if channel == self.gpioA:
      self.levA = level
    else:
      self.levB = level
      
    # Debounce.
    if channel == self.lastGpio:
      return
    
    # When both inputs are at 1, we'll fire a callback. If A was the most
    # recent pin set high, it'll be forward, and if B was the most recent pin
    # set high, it'll be reverse.
    self.lastGpio = channel
    if channel == self.gpioA and level == 1:
      if self.levB == 1:
        self.callback(1)
    elif channel == self.gpioB and level == 1:
      if self.levA == 1:
        self.callback(-1)

class VolumeError(Exception):
  pass

class Volume:
  """
  A wrapper API for interacting with the volume settings on the RPi.
  """
  MIN = VOLUME_MIN
  MAX = VOLUME_MAX
  INCREMENT = VOLUME_INCREMENT
  
  def __init__(self):
    # Set an initial value for last_volume in case we're muted when we start.
    self.last_volume = self.MIN
    self._sync()
  
  def up(self):
    """
    Increases the volume by one increment.
    """
    return self.change(self.INCREMENT)
    
  def down(self):
    """
    Decreases the volume by one increment.
    """
    return self.change(-self.INCREMENT)
    
  def change(self, delta):
    v = self.volume + delta
    v = self._constrain(v)
    return self.set_volume(v)
  
  def set_volume(self, v):
    """
    Sets volume to a specific value.
    """
    self.volume = self._constrain(v)
    output = self.amixer("set 'PCM' unmute {}%".format(v))
    self._sync(output)
    return self.volume
    
  def toggle(self):
    """
    Toggles muting between on and off.
    """
    if self.is_muted:
      output = self.amixer("set 'PCM' unmute")
    else:
      # We're about to mute ourselves, so we should remember the last volume
      # value we had because we'll want to restore it later.
      self.last_volume = self.volume
      output = self.amixer("set 'PCM' mute")
  
    self._sync(output)
    if not self.is_muted:
      # If we just unmuted ourselves, we should restore whatever volume we
      # had previously.
      self.set_volume(self.last_volume)
    return self.is_muted
  
  def status(self):
    if self.is_muted:
      return "{}% (muted)".format(self.volume)
    return "{}%".format(self.volume)
  
  # Read the output of `amixer` to get the system volume and mute state.
  #
  # This is designed not to do much work because it'll get called with every
  # click of the knob in either direction, which is why we're doing simple
  # string scanning and not regular expressions.
  def _sync(self, output=None):
    if output is None:
      output = self.amixer("get 'PCM'")
      
    lines = output.readlines()
    if DEBUG:
      strings = [line.decode('utf8') for line in lines]
      debug("OUTPUT:")
      debug("".join(strings))
    last = lines[-1].decode('utf-8')
    
    # The last line of output will have two values in square brackets. The
    # first will be the volume (e.g., "[95%]") and the second will be the
    # mute state ("[off]" or "[on]").
    i1 = last.rindex('[') + 1
    i2 = last.rindex(']')

    self.is_muted = last[i1:i2] == 'off'
    
    i1 = last.index('[') + 1
    i2 = last.index('%')
    # In between these two will be the percentage value.
    pct = last[i1:i2]

    self.volume = int(pct)
  
  # Ensures the volume value is between our minimum and maximum.
  def _constrain(self, v):
    if v < self.MIN:
      return self.MIN
    if v > self.MAX:
      return self.MAX
    return v
    
  def amixer(self, cmd):
    p = subprocess.Popen("amixer {}".format(cmd), shell=True, stdout=subprocess.PIPE)
    code = p.wait()
    if code != 0:
      raise VolumeError("Unknown error")
      sys.exit(0)
    
    return p.stdout


if __name__ == "__main__":
  
  gpioA = GPIO_A
  gpioB = GPIO_B
  gpioButton = GPIO_BUTTON
  
  v = Volume()
  
  def on_press(value):
    v.toggle()
    print("Toggled mute to: {}".format(v.is_muted))
    EVENT.set()
  
  # This callback runs in the background thread. All it does is put turn
  # events into a queue and flag the main thread to process them. The
  # queueing ensures that we won't miss anything if the knob is turned
  # extremely quickly.
  def on_turn(delta):
    QUEUE.put(delta)
    EVENT.set()
    
  def consume_queue():
    while not QUEUE.empty():
      delta = QUEUE.get()
      handle_delta(delta)
  
  def handle_delta(delta):
    if v.is_muted:
      debug("Unmuting")
      v.toggle()
    if delta == 1:
      vol = v.up()
    else:
      vol = v.down()
    print("Set volume to: {}".format(vol))
    
  def on_exit(a, b):
    print("Exiting...")
    encoder.destroy()
    sys.exit(0)
    
  debug("Volume knob using pins {} and {}".format(gpioA, gpioB))
  
  if gpioButton != None:
    debug("Volume button using pin {}".format(gpioButton))
  
  debug("Initial volume: {}".format(v.volume))

  encoder = RotaryEncoder(GPIO_A, GPIO_B, callback=on_turn, buttonPin=GPIO_BUTTON, buttonCallback=on_press)
  signal.signal(signal.SIGINT, on_exit)
  
  while True:
    # This is the best way I could come up with to ensure that this script
    # runs indefinitely without wasting CPU by polling. The main thread will
    # block quietly while waiting for the event to get flagged. When the knob
    # is turned we're able to respond immediately, but when it's not being
    # turned we're not looping at all.
    # 
    # The 1200-second (20 minute) timeout is a hack; for some reason, if I
    # don't specify a timeout, I'm unable to get the SIGINT handler above to
    # work properly. But if there is a timeout set, even if it's a very long
    # timeout, then Ctrl-C works as intended. No idea why.
    EVENT.wait(1200)
    consume_queue()
EVENT.clear()

(5) Mark script as an executable file: chmod +x /recalbox/scripts/volume-monitor.py

(6) To make the volume-monitor script start up with your system, do the following: touch ~/custom.sh && chmod u+x ~/custom.sh

(7) Open the custom.sh script in the nano editor: nano ~/custom.sh

(8) Finally copy and paste the following into the custom.sh file and save with ctrl+x: python /recalbox/scripts/volume-monitor.py

(9) Restart your Recalbox using the reboot command: reboot

(10) Enjoy your new hardware volume control.

Script Tweaks

These tweaks can all be found under the Settings heading in the Volume-monitor.py script

*TIP! Make sure to wait 3 seconds after the emulation station menu comes up BEFORE touching the rotary encoder. Turning the encoder early can make the script hang and the encoder become unresponsive.

If you want to change the default two pins that the encoder uses, then edit the following lines in the script. Make sure to use the BCM numbering codes. See reference for BCM numbers.

GPIO_A = 26

GPIO_B = 19

If you want to change the pin that the knob's button is hooked up to, then edit the following corresponding line in the script below. Make sure to use the BCM numbering codes. If you have no button, set this to None.

GPIO_BUTTON = 13

If you want to change the range (ie: min & max) that the script modulates the raspberrypi volume, then edit the corresponding line in the script below. The numbers are expressed as percentages. The default max is less than 100 to prevent distortion. The default min is greater than zero because if your system is like mine, sound gets completely inaudible long before 0%. If you've got a hardware amp or serious speakers or something, your results will vary.

VOLUME_MIN = 60

VOLUME_MAX = 96

If you want the volume of your system to change more rapidly and be more sensitive, then edit the corresponding line in the script below. The default setting is 1, change to 2 to double the rate of volume change.

VOLUME_INCREMENT = 1

How to Uninstall Script

Repeat the above Steps #7 and #8, but either delete the added line in Step#8 or commenting it out by adding a hash, such the line reads like below

#python /recalbox/scripts/volume-monitor.py

Credits

I would like to thank Substring for the assistance in making this guide and modification to Recalbox possible. I would also like to thank the rest of the Recalbox dev's for making this wonderful project available. And I would like to thank savetheclocktower for the original project's code and for assisting with the conversion to Python2. If this guide helps you, please post about it in the forum and alert me by using @dh04000 in your message. Thanks.

English

Basic

Advanced


Français

Basique

Avancée


Deutsch

Basic

Fortgeschritten


Español

Basic

Avanzado


Português

Básico

Avançado


Italiano

Di base

Avanzate

Clone this wiki locally
You can’t perform that action at this time.