Skip to content
Browse files

whitespace and pep8 compliance :heart:

  • Loading branch information...
1 parent aee0a31 commit be3d5234dc2d1fbf530fc943b079bbac22b86b95 @ranman ranman committed Nov 26, 2013
Showing with 372 additions and 315 deletions.
  1. +93 −73 BreakfastSerial/BreakfastSerial.py
  2. +203 −174 BreakfastSerial/components.py
  3. +60 −51 BreakfastSerial/util.py
  4. +1 −0 MANIFEST.in
  5. +15 −17 setup.py
View
166 BreakfastSerial/BreakfastSerial.py
@@ -1,94 +1,114 @@
-import os, re, code, threading, pyfirmata
+import os
+import re
+import threading
from time import sleep
+import pyfirmata
+
+
def find_arduino():
- rport = re.compile('usb|acm', re.IGNORECASE)
- ports = filter(rport.search, os.listdir('/dev'))
+ rport = re.compile('usb|acm', re.IGNORECASE)
+ ports = filter(rport.search, os.listdir('/dev'))
+
+ if len(ports) == 0:
+ raise ArduinoNotFoundException
- if len(ports) == 0:
- raise ArduinoNotFoundException
+ print("Connecting to /dev/{0}".format(ports[0]))
+ return "/dev/%s" % ports[0]
- print "Connecting to /dev/%s" % ports[0]
- return "/dev/%s" % ports[0]
class ArduinoNotFoundException(Exception):
- pass
+ pass
+
class FirmataNotOnBoardException(Exception):
- pass
+ pass
+
class Arduino(pyfirmata.Arduino):
- def __init__(self, *args, **kwargs):
- # If no port was supplied, auto detect the arduino
- if len(args) >= 1:
- super(Arduino, self).__init__(*args, **kwargs)
- else:
- newargs = (find_arduino(),)
- super(Arduino, self).__init__(*newargs)
-
- if not self.get_firmata_version():
- raise FirmataNotOnBoardException
-
- self._observers = {}
-
- # Register a new handler for digital messages so we can tell sensors to update
- self.add_cmd_handler(pyfirmata.DIGITAL_MESSAGE, self._handle_digital_message_interceptor)
- self.add_cmd_handler(pyfirmata.ANALOG_MESSAGE, self._handle_analog_message_interceptor)
- self._monitor = Monitor(self)
-
- def _handle_digital_message_interceptor(self, port_nr, lsb, msb):
- self._handle_digital_message(port_nr, lsb, msb)
- self.emit('data') # TODO: Make less generic
-
- def _handle_analog_message_interceptor(self, port_nr, lsb, msb):
- self._handle_analog_message(port_nr, lsb, msb)
- self.emit('data') # TODO: Make less generic
-
- # TODO: Make generic eventemitter class and inherit
- def on(self, event, cb):
- if event not in self._observers:
- self._observers[event] = [cb,]
- else:
- if cb not in self._observers[event]:
- self._observers[event].append(cb)
- else:
- raise ValueError("Observer is already registered to event: ", event)
-
- def off(self, event, cb):
- if event not in self._observers:
- raise KeyError("No observers are registered for the event: ", event)
- else:
- if cb not in self._observers[event]:
- raise ValueError("Observer is not registered for the event: ", event)
- else:
- self._observers[event].remove(cb)
-
- def emit(self, event, *args):
- if event in self._observers:
- for observer in self._observers[event]:
- observer(*args)
+ def __init__(self, *args, **kwargs):
+ # If no port was supplied, auto detect the arduino
+ if len(args) >= 1:
+ super(Arduino, self).__init__(*args, **kwargs)
+ else:
+ newargs = (find_arduino(),)
+ super(Arduino, self).__init__(*newargs)
+
+ if not self.get_firmata_version():
+ raise FirmataNotOnBoardException
+
+ self._observers = {}
+
+ # Register a new handler for digital messages so we can tell sensors to
+ # update
+ self.add_cmd_handler(
+ pyfirmata.DIGITAL_MESSAGE,
+ self._handle_digital_message_interceptor)
+ self.add_cmd_handler(
+ pyfirmata.ANALOG_MESSAGE,
+ self._handle_analog_message_interceptor)
+ self._monitor = Monitor(self)
+
+ def _handle_digital_message_interceptor(self, port_nr, lsb, msb):
+ self._handle_digital_message(port_nr, lsb, msb)
+ self.emit('data') # TODO: Make less generic
+
+ def _handle_analog_message_interceptor(self, port_nr, lsb, msb):
+ self._handle_analog_message(port_nr, lsb, msb)
+ self.emit('data') # TODO: Make less generic
+
+ # TODO: Make generic eventemitter class and inherit
+ def on(self, event, cb):
+ if event not in self._observers:
+ self._observers[event] = [cb, ]
+ else:
+ if cb not in self._observers[event]:
+ self._observers[event].append(cb)
+ else:
+ raise ValueError(
+ "Observer is already registered to event: ",
+ event)
+
+ def off(self, event, cb):
+ if event not in self._observers:
+ raise KeyError(
+ "No observers are registered for the event: ",
+ event)
+ else:
+ if cb not in self._observers[event]:
+ raise ValueError(
+ "Observer is not registered for the event: ",
+ event)
+ else:
+ self._observers[event].remove(cb)
+
+ def emit(self, event, *args):
+ if event in self._observers:
+ for observer in self._observers[event]:
+ observer(*args)
+
class Monitor(threading.Thread):
- def __init__(self, board):
- threading.Thread.__init__(self)
+ def __init__(self, board):
+ threading.Thread.__init__(self)
- self.board = board
- self._shouldContinue = True
+ self.board = board
+ self._shouldContinue = True
- self.setDaemon(True)
- self.start()
+ self.setDaemon(True)
+ self.start()
- def run(self):
- while 1:
- while self.board.bytes_available():
- self.board.iterate()
+ def run(self):
+ while True:
+ while self.board.bytes_available():
+ self.board.iterate()
- sleep(0.004)
+ sleep(0.004)
- if not self._shouldContinue:
- break
+ if not self._shouldContinue:
+ break
- def stop(self):
- self._shouldContinue = False
+ def stop(self):
+ self._shouldContinue = False
View
377 BreakfastSerial/components.py
@@ -1,248 +1,277 @@
-from BreakfastSerial import Arduino
+import re
+import threading
+import pyfirmata
from util import EventEmitter, setInterval, debounce
-import pyfirmata, re, threading
-class ArduinoNotSuppliedException(Exception):
- pass
-
-class ServoOutOfRangeException(Exception):
- pass
-
-class InvalidPercentageException(Exception):
- pass
-class Component(EventEmitter):
-
- def __init__(self, board, pin):
- if not board:
- raise ArduinoNotSuppliedException
+class ArduinoNotSuppliedException(Exception):
+ pass
- super(Component, self).__init__()
- self._board = board
+class ServoOutOfRangeException(Exception):
+ pass
- analog_regex = re.compile('A(\d)')
- match = analog_regex.match(str(pin))
- if match:
- self._pin = self._board.analog[int(match.group(1))]
- else:
- self._pin = self._board.digital[int(pin)]
+class InvalidPercentageException(Exception):
+ pass
- @property
- def value(self): return self._pin.value
-class Sensor(Component):
+class Component(EventEmitter):
- def __init__(self, board, pin):
- super(Sensor, self).__init__(board, pin)
+ def __init__(self, board, pin):
+ if not board:
+ raise ArduinoNotSuppliedException
- self.threshold = 0.01
+ super(Component, self).__init__()
- self._pin.mode = pyfirmata.INPUT
- self._pin.enable_reporting()
+ self._board = board
- self._old_value = self.value
- self._board.on('data', self._handle_data)
+ analog_regex = re.compile('A(\d)')
+ match = analog_regex.match(str(pin))
- def _handle_data(self):
- value = self.value or 0
- high_value = value + self.threshold
- low_value = value - self.threshold
+ if match:
+ self._pin = self._board.analog[int(match.group(1))]
+ else:
+ self._pin = self._board.digital[int(pin)]
- if self._old_value < low_value or self._old_value > high_value:
- self._old_value = value
- self._handle_state_changed()
+ @property
+ def value(self):
+ return self._pin.value
- @debounce(0.005)
- def _handle_state_changed(self):
- self.emit('change')
- def change(self, cb):
- self.on('change', cb)
+class Sensor(Component):
-class Led(Component):
+ def __init__(self, board, pin):
+ super(Sensor, self).__init__(board, pin)
- def __init__(self, board, pin):
- super(Led, self).__init__(board, pin)
- self._isOn = False
- self._interval = None
+ self.threshold = 0.01
- def on(self):
- self._pin.write(1)
- self._isOn = True
- return self
+ self._pin.mode = pyfirmata.INPUT
+ self._pin.enable_reporting()
- def off(self, clear=True):
- self._pin.write(0)
- self._isOn = False
+ self._old_value = self.value
+ self._board.on('data', self._handle_data)
- if self._interval and clear:
- self._interval.clear()
+ def _handle_data(self):
+ value = self.value or 0
+ high_value = value + self.threshold
+ low_value = value - self.threshold
- return self
+ if self._old_value < low_value or self._old_value > high_value:
+ self._old_value = value
+ self._handle_state_changed()
- def toggle(self):
- if self._isOn:
- return self.off(clear=False)
- else:
- return self.on()
+ @debounce(0.005)
+ def _handle_state_changed(self):
+ self.emit('change')
- def blink(self, millis):
- if self._interval:
- self._interval.clear()
+ def change(self, cb):
+ self.on('change', cb)
- self._interval = setInterval(self.toggle, millis)
- def brightness(self, value):
- if int(value) > 100 or int(value) < 0:
- raise InvalidPercentageException
+class Led(Component):
- if self._pin.mode != pyfirmata.PWM:
- self._pin.mode = pyfirmata.PWM
+ def __init__(self, board, pin):
+ super(Led, self).__init__(board, pin)
+ self._isOn = False
+ self._interval = None
- _new_value = value / 100.0
+ def on(self):
+ self._pin.write(1)
+ self._isOn = True
+ return self
- if _new_value == 0:
- self._isOn = False
- else:
- self.isOn = True
+ def off(self, clear=True):
+ self._pin.write(0)
+ self._isOn = False
- self._pin.write(_new_value)
- return self
+ if self._interval and clear:
+ self._interval.clear()
-class RGBLed(EventEmitter):
+ return self
- def __init__(self, board, pins):
- if not board:
- raise ArduinoNotSuppliedException
+ def toggle(self):
+ if self._isOn:
+ return self.off(clear=False)
+ else:
+ return self.on()
- # TODO: Check that pins is dict
+ def blink(self, millis):
+ if self._interval:
+ self._interval.clear()
- super(RGBLed, self).__init__()
+ self._interval = setInterval(self.toggle, millis)
- self._red = Led(board, pins["red"])
- self._green = Led(board, pins["green"])
- self._blue = Led(board, pins["blue"])
+ def brightness(self, value):
+ if int(value) > 100 or int(value) < 0:
+ raise InvalidPercentageException
- def off(self):
- self._red.off(); self._green.off(); self._blue.off()
- return self
+ if self._pin.mode != pyfirmata.PWM:
+ self._pin.mode = pyfirmata.PWM
- def red(self):
- self._red.on(); self._green.off(); self._blue.off()
- return self
+ _new_value = value / 100.0
- def green(self):
- self._red.off(); self._green.on(); self._blue.off()
- return self
+ if _new_value == 0:
+ self._isOn = False
+ else:
+ self.isOn = True
- def blue(self):
- self._red.off(); self._green.off(); self._blue.on()
- return self
+ self._pin.write(_new_value)
+ return self
- def yellow(self):
- self._red.on(); self._green.on(); self._blue.off()
- return self
- def cyan(self):
- self._red.off(); self._green.on(); self._blue.on()
- return self
+class RGBLed(EventEmitter):
- def purple(self):
- self._red.on(); self._green.off(); self._blue.on()
- return self
+ def __init__(self, board, pins):
+ if not board:
+ raise ArduinoNotSuppliedException
+
+ # TODO: Check that pins is dict
+
+ super(RGBLed, self).__init__()
+
+ self._red = Led(board, pins["red"])
+ self._green = Led(board, pins["green"])
+ self._blue = Led(board, pins["blue"])
+
+ def off(self):
+ self._red.off()
+ self._green.off()
+ self._blue.off()
+ return self
+
+ def red(self):
+ self._red.on()
+ self._green.off()
+ self._blue.off()
+ return self
+
+ def green(self):
+ self._red.off()
+ self._green.on()
+ self._blue.off()
+ return self
+
+ def blue(self):
+ self._red.off()
+ self._green.off()
+ self._blue.on()
+ return self
+
+ def yellow(self):
+ self._red.on()
+ self._green.on()
+ self._blue.off()
+ return self
+
+ def cyan(self):
+ self._red.off()
+ self._green.on()
+ self._blue.on()
+ return self
+
+ def purple(self):
+ self._red.on()
+ self._green.off()
+ self._blue.on()
+ return self
+
+ def white(self):
+ self._red.on()
+ self._green.on()
+ self._blue.on()
+ return self
- def white(self):
- self._red.on(); self._green.on(); self._blue.on()
- return self
class Buzzer(Led):
- pass
+ pass
+
class Button(Sensor):
- def __init__(self, board, pin):
- super(Button, self).__init__(board, pin)
- self._old_value = False
- self._timeout = None
+ def __init__(self, board, pin):
+ super(Button, self).__init__(board, pin)
+ self._old_value = False
+ self._timeout = None
- self.change(self._emit_button_events)
+ self.change(self._emit_button_events)
- def _handle_data(self):
- value = self.value
+ def _handle_data(self):
+ value = self.value
- if self._old_value != value:
- self._old_value = value
- # This sucks, wish I could just call Super
- self._handle_state_changed()
+ if self._old_value != value:
+ self._old_value = value
+ # This sucks, wish I could just call Super
+ self._handle_state_changed()
- def _emit_button_events(self):
- if self.value == False:
- if(self._timeout):
- self._timeout.cancel()
+ def _emit_button_events(self):
+ if self.value is False:
+ if(self._timeout):
+ self._timeout.cancel()
- self.emit('up')
- elif self.value == True:
- def emit_hold():
- self.emit('hold')
+ self.emit('up')
+ elif self.value:
+ def emit_hold():
+ self.emit('hold')
- self._timeout = threading.Timer(1, emit_hold)
- self._timeout.start()
+ self._timeout = threading.Timer(1, emit_hold)
+ self._timeout.start()
- self.emit('down')
+ self.emit('down')
- def down(self, cb):
- self.on('down', cb)
+ def down(self, cb):
+ self.on('down', cb)
- def up(self, cb):
- self.on('up', cb)
+ def up(self, cb):
+ self.on('up', cb)
+
+ def hold(self, cb):
+ self.on('hold', cb)
- def hold(self, cb):
- self.on('hold', cb)
class Servo(Component):
- def __init__(self, board, pin):
- super(Servo, self).__init__(board, pin)
- self._pin.mode = pyfirmata.SERVO
+ def __init__(self, board, pin):
+ super(Servo, self).__init__(board, pin)
+ self._pin.mode = pyfirmata.SERVO
+
+ def set_position(self, degrees):
+ if int(degrees) > 180 or int(degrees) < 0:
+ raise ServoOutOfRangeException
+ self._pin.write(degrees)
- def set_position(self, degrees):
- if int(degrees) > 180 or int(degrees) < 0:
- raise ServoOutOfRangeException
- self._pin.write(degrees)
+ def move(self, degrees):
+ self.set_position(self.value + int(degrees))
- def move(self, degrees):
- self.set_position(self.value + int(degrees))
+ def center(self):
+ self.set_position(90)
- def center(self):
- self.set_position(90)
+ def reset(self):
+ self.set_position(0)
- def reset(self):
- self.set_position(0)
class Motor(Component):
- def __init__(self, board, pin):
- super(Motor, self).__init__(board, pin)
- self._speed = 0
- self._pin.mode = pyfirmata.PWM
+ def __init__(self, board, pin):
+ super(Motor, self).__init__(board, pin)
+ self._speed = 0
+ self._pin.mode = pyfirmata.PWM
- def start(self, speed=50):
- self.speed = speed
+ def start(self, speed=50):
+ self.speed = speed
- def stop(self):
- self.speed = 0
+ def stop(self):
+ self.speed = 0
- @property
- def speed(self):
- return self._speed
+ @property
+ def speed(self):
+ return self._speed
- @speed.setter
- def speed(self, speed):
- if int(speed) > 100 or int(speed) < 0:
- raise InvalidPercentageException
+ @speed.setter
+ def speed(self, speed):
+ if int(speed) > 100 or int(speed) < 0:
+ raise InvalidPercentageException
- self._speed = speed
- self._pin.write(speed / 100.0)
- self.emit('change', speed)
+ self._speed = speed
+ self._pin.write(speed / 100.0)
+ self.emit('change', speed)
View
111 BreakfastSerial/util.py
@@ -1,67 +1,76 @@
import threading
from time import sleep
+
class EventEmitter(object):
- def __init__(self, *args, **kwargs):
- self._observers = {}
+ def __init__(self, *args, **kwargs):
+ self._observers = {}
+
+ def on(self, event, cb):
+ if event not in self._observers:
+ self._observers[event] = [cb, ]
+ else:
+ if cb not in self._observers[event]:
+ self._observers[event].append(cb)
+ else:
+ raise ValueError(
+ "Observer is already registered to event: ",
+ event)
- def on(self, event, cb):
- if event not in self._observers:
- self._observers[event] = [cb,]
- else:
- if cb not in self._observers[event]:
- self._observers[event].append(cb)
- else:
- raise ValueError("Observer is already registered to event: ", event)
+ def off(self, event, cb):
+ if event not in self._observers:
+ raise KeyError(
+ "No observers are registered for the event: ",
+ event)
+ else:
+ if cb not in self._observers[event]:
+ raise ValueError(
+ "Observer is not registered for the event: ",
+ event)
+ else:
+ self._observers[event].remove(cb)
- def off(self, event, cb):
- if event not in self._observers:
- raise KeyError("No observers are registered for the event: ", event)
- else:
- if cb not in self._observers[event]:
- raise ValueError("Observer is not registered for the event: ", event)
- else:
- self._observers[event].remove(cb)
+ def emit(self, event, *args):
+ if event in self._observers:
+ for observer in self._observers[event]:
+ observer(*args)
- def emit(self, event, *args):
- if event in self._observers:
- for observer in self._observers[event]:
- observer(*args)
def debounce(wait):
- """ Decorator that will postpone a functions
- execution until after wait seconds
- have elapsed since the last time it was invoked. """
- def decorator(fn):
- def debounced(*args, **kwargs):
- def call_it():
- fn(*args, **kwargs)
- try:
- debounced.t.cancel()
- except(AttributeError):
- pass
- debounced.t = threading.Timer(wait, call_it)
- debounced.t.start()
- return debounced
- return decorator
+ """ Decorator that will postpone a functions
+ execution until after wait seconds
+ have elapsed since the last time it was invoked. """
+ def decorator(fn):
+ def debounced(*args, **kwargs):
+ def call_it():
+ fn(*args, **kwargs)
+ try:
+ debounced.t.cancel()
+ except(AttributeError):
+ pass
+ debounced.t = threading.Timer(wait, call_it)
+ debounced.t.start()
+ return debounced
+ return decorator
+
class setInterval(threading.Thread):
- def __init__(self, func, millis):
- threading.Thread.__init__(self)
- self.event = threading.Event()
- self.func = func
- self.seconds = millis / 1000.0
- self.shouldRun = True
+ def __init__(self, func, millis):
+ threading.Thread.__init__(self)
+ self.event = threading.Event()
+ self.func = func
+ self.seconds = millis / 1000.0
+ self.shouldRun = True
- self.setDaemon(True)
- self.start()
+ self.setDaemon(True)
+ self.start()
- def run(self):
- while self.shouldRun:
- self.func()
- sleep(self.seconds)
+ def run(self):
+ while self.shouldRun:
+ self.func()
+ sleep(self.seconds)
- def clear(self):
- self.shouldRun = False
+ def clear(self):
+ self.shouldRun = False
View
1 MANIFEST.in
@@ -0,0 +1 @@
+include README.md
View
32 setup.py
@@ -1,21 +1,19 @@
from setuptools import setup, find_packages
+
+def readme():
+ with open('README.md') as f:
+ return f.read()
+
setup(
- name = "BreakfastSerial",
- version = "0.0.9",
- description = "Python Framework for interacting with Arduino",
- author = "Swift",
- author_email = "theycallmeswift@gmail.com",
- packages = find_packages(),
+ name="BreakfastSerial",
+ version="0.0.9",
+ description="Python Framework for interacting with Arduino",
+ author="Swift",
+ author_email="theycallmeswift@gmail.com",
+ packages=find_packages(),
install_requires=['pyfirmata'],
- url = "http://github.com/theycallmeswift/BreakfastSerial/",
- keywords = ["arduino","firmata"],
- long_description = """\
- Firmata based framework for interacting with Arduino
- ----------------------------
-
- DESCRIPTION
- BreakfastSerial makes it easy to interact with Arduino boards over serial by using
- the Firmata protocol. See http://www.github.com/theycallmeswift/BreakfastSerial
- for more information.
- """ )
+ url="http://github.com/theycallmeswift/BreakfastSerial/",
+ keywords=["arduino", "firmata"],
+ long_description=readme()
+)

0 comments on commit be3d523

Please sign in to comment.
Something went wrong with that request. Please try again.