A very simple task: Blink an LED

Written by @mikewehr in the mike branch:

Demonstrates the basic structure of a task with one stage, described in the comments throughout the task.

This page is rendered in the docs here in order to provide links to the mentioned objects/classes/etc., but it was written as source code initially and translated to .rst, so the narrative flow is often inverted: text follows code as comments, rather than text introducing and narrating code.


import itertools
import tables
import time
from datetime import datetime

from autopilot.hardware import gpio
from autopilot.tasks import Task
from collections import OrderedDict as odict

class Blink(Task):
    """
    Blink an LED.

    Args:
        pulse_duration (int, float): Duration the LED should be on, in ms
        pulse_interval (int, float): Duration the LED should be off, in ms
    """


Note that we subclass the :class:`~autopilot.tasks.Task` class (Blink(Task)) to provide us with some methods useful for all Tasks.

Tasks need to have a few class attributes defined to be integrated into the rest of the system. See here for more about class vs. instance attributes.


STAGE_NAMES = ["pulse"] # type: list
An (optional) list or tuple of names of methods that will be used as stages for the task.

See ``stages`` for more information

PARAMS = odict()
PARAMS['pulse_duration'] = {'tag': 'LED Pulse Duration (ms)', 'type': 'int'}
PARAMS['pulse_interval'] = {'tag': 'LED Pulse Interval (ms)', 'type': 'int'}

PARAMS - A dictionary that specifies the parameters that control the operation of the task -- each task presumably has some range of options that allow slight variations (e.g. different stimuli) on a shared task structure. This dictionary specifies each parameter as a human-readable tag and a type that is used by the GUI to create an appropriate input object. For example:

PARAMS['pulse_duration'] = {'tag': 'LED Pulse Duration (ms)', 'type': 'int'}

When the task is instantiated, these params are passed to its __init__ method.

A :class:`collections.OrderedDict` is used so that parameters can be presented in a predictable way to users.
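
As a rough illustration of how a PARAMS declaration might be consumed, the sketch below walks the ordered dictionary and casts raw text input using each entry's 'type'. The `TYPE_CASTS` table and the `cast_params` helper are hypothetical names invented for this example and are not part of Autopilot's API.

```python
from collections import OrderedDict as odict

PARAMS = odict()
PARAMS['pulse_duration'] = {'tag': 'LED Pulse Duration (ms)', 'type': 'int'}
PARAMS['pulse_interval'] = {'tag': 'LED Pulse Interval (ms)', 'type': 'int'}

# Hypothetical mapping from 'type' strings to Python casts (an assumption for this sketch)
TYPE_CASTS = {'int': int, 'float': float, 'str': str}

def cast_params(params, raw_values):
    """Cast raw (e.g. text-field) values according to each param's declared type."""
    cast = odict()
    for name, spec in params.items():
        cast[name] = TYPE_CASTS[spec['type']](raw_values[name])
    return cast

# Values arrive as strings, the way a text input would supply them
task_kwargs = cast_params(PARAMS, {'pulse_duration': '100', 'pulse_interval': '500'})
for name, value in task_kwargs.items():
    # Iteration follows the declaration order of PARAMS
    print(PARAMS[name]['tag'], '=', value)
```

The resulting dictionary could then be unpacked as keyword arguments when the task is created.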


class TrialData(tables.IsDescription):
    trial_num = tables.Int32Col()
    timestamp_on = tables.StringCol(26)
    timestamp_off = tables.StringCol(26)

TrialData declares the data that will be returned for each "trial" -- or complete set of executed task stages. It is used by the :class:`~autopilot.core.subject.Subject` object to make a data table with the correct data types. Declare each piece of data using a pytables column descriptor (see the pytables documentation for the available data types and for more information).

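For each trial, we'll return the trial number and two timestamps: the time we turned the LED on and the time we turned it off. Note that we use a 26-character :class:`tables.StringCol` for the timestamps, which is long enough to hold an ISO 8601 string with microseconds such as ``2021-02-16T18:11:35.752110``.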
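
The 26-character width can be checked with the standard library alone. This sketch assumes the timestamps are ISO 8601 strings produced by `datetime.isoformat()`, which is an assumption consistent with the `datetime` import at the top of the task.

```python
from datetime import datetime

# A fixed example time with microseconds, so the result is reproducible
stamp = datetime(2021, 2, 16, 18, 11, 35, 752110).isoformat()
print(stamp)       # 2021-02-16T18:11:35.752110
print(len(stamp))  # 26

# isoformat() omits the fractional part when microseconds are zero,
# so shorter strings are possible and still fit in the column
short_stamp = datetime(2021, 2, 16, 18, 11, 35).isoformat()
print(len(short_stamp))  # 19
```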


HARDWARE = {
    'LEDS': {
        'dLED': gpio.Digital_Out
    }
}

Declare the hardware that will be used in the task. Each hardware object is specified with a group and an id as nested dictionaries. These descriptions require a set of hardware parameters in the autopilot prefs.json (typically generated by :mod:`autopilot.setup.setup_autopilot` ) with a matching group and id structure. For example, an LED declared like this in the :attr:`~examples.tasks.Blink.HARDWARE` attribute:

HARDWARE = {'LEDS': {'dLED': gpio.Digital_Out}}

requires an entry in prefs.json like this:

"HARDWARE": {"LEDS": {"dLED": {
    "pin": 1,
    "polarity": 1
}}}

that will be used to instantiate the :class:`.hardware.gpio.Digital_Out` object, which is then available for use in the task like:

self.hardware['LEDS']['dLED'].set(1)
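
To make the matching of group and id concrete, here is a minimal sketch of how a nested declaration and a nested set of parameters could be combined. It is not Autopilot's implementation: `FakeDigitalOut` is a hypothetical stand-in for a hardware class and `build_hardware` is a helper invented for this example.

```python
import json

class FakeDigitalOut:
    """Hypothetical stand-in for a hardware class; it only records its parameters."""
    def __init__(self, pin, polarity=1):
        self.pin = pin
        self.polarity = polarity

# Same group -> id -> class nesting as the HARDWARE attribute
HARDWARE = {'LEDS': {'dLED': FakeDigitalOut}}

# Same group -> id nesting as the prefs.json entry, holding constructor parameters
prefs = json.loads('{"HARDWARE": {"LEDS": {"dLED": {"pin": 1, "polarity": 1}}}}')

def build_hardware(hardware_spec, hardware_prefs):
    """Instantiate each declared class with the parameters stored under the same group and id."""
    built = {}
    for group, members in hardware_spec.items():
        built[group] = {}
        for hw_id, hw_class in members.items():
            built[group][hw_id] = hw_class(**hardware_prefs[group][hw_id])
    return built

hardware = build_hardware(HARDWARE, prefs['HARDWARE'])
print(hardware['LEDS']['dLED'].pin)
```

A missing group or id in the preferences would raise a `KeyError` in this sketch, which is one way to see why the two structures have to agree.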



First we call the superclass (Task)'s initialization method. All tasks should accept *args and **kwargs to pass parameters not explicitly specified by the subclass up to the superclass:

def __init__(self, stage_block=None, pulse_duration=100, pulse_interval=500, *args, **kwargs):
    super(Blink, self).__init__(*args, **kwargs)

    # store parameters given on instantiation as instance attributes
    self.pulse_duration = int(pulse_duration)
    self.pulse_interval = int(pulse_interval)
    self.stage_block = stage_block # type: "threading.Event"

    # This allows us to cycle through the task by just repeatedly calling next(self.stages)
    self.stages = itertools.cycle([self.pulse])

Some generator that returns the stage methods that define the operation of the task.

To run a task, the :class:`.pilot.Pilot` object will call each stage function, which can return some dictionary of data (see :meth:`~examples.tasks.Blink.pulse` ) and wait until some flag (:attr:`~examples.tasks.Blink.stage_block` ) is set to compute the next stage. Since in this case we want to call the same method (:meth:`~examples.tasks.Blink.pulse` ) over and over again, we use an :class:`itertools.cycle` object (if we have more than one stage to call in a cycle, we could provide them like itertools.cycle([self.stage_method_1, self.stage_method_2])). More complex tasks can define a custom generator for finer control over stage progression.
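
The difference between a fixed cycle and a custom generator can be sketched with plain functions standing in for stage methods. The names `stage_a`, `stage_b`, and `b_every_third` are hypothetical and exist only for this example.

```python
import itertools

def stage_a():
    return {'stage': 'a'}

def stage_b():
    return {'stage': 'b'}

# Two stages repeated in a fixed order
fixed = itertools.cycle([stage_a, stage_b])
fixed_order = [next(fixed)()['stage'] for _ in range(4)]
print(fixed_order)   # ['a', 'b', 'a', 'b']

def b_every_third():
    """Hypothetical custom progression: run stage_a twice, then stage_b, forever."""
    while True:
        yield stage_a
        yield stage_a
        yield stage_b

custom = b_every_third()
custom_order = [next(custom)()['stage'] for _ in range(6)]
print(custom_order)  # ['a', 'a', 'b', 'a', 'a', 'b']
```

Both objects are consumed the same way, with `next()`, so the caller does not need to know which kind of progression a task uses.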

self.trial_counter = itertools.count()
Some counter to keep track of the trial number

Hardware is initialized by the superclass's :meth:`.Task.init_hardware` method, which creates all the hardware objects defined in :attr:`~examples.tasks.Blink.HARDWARE` according to their parameterization in prefs.json, and makes them available in the :attr:`~examples.tasks.Blink.hardware` dictionary:

self.init_hardware()
self.logger.debug('Hardware initialized')

All task subclass objects have a :attr:`~autopilot.tasks.Task.logger` -- a :class:`logging.Logger` that allows users to easily debug their tasks and see feedback about their operation. To prevent stdout from getting clogged, logging messages are printed and stored according to the LOGLEVEL pref -- so this message would only appear if LOGLEVEL == "DEBUG".
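
The effect of the log level can be seen with the standard `logging` module on its own. This sketch does not read Autopilot's LOGLEVEL pref; the level is set by hand, and the logger name `blink_example` is a hypothetical choice.

```python
import io
import logging

stream = io.StringIO()
logger = logging.getLogger('blink_example')  # hypothetical logger name
logger.propagate = False                     # keep output confined to our handler
logger.addHandler(logging.StreamHandler(stream))

logger.setLevel(logging.INFO)
logger.debug('Hardware initialized')   # below the INFO threshold, so it is dropped

logger.setLevel(logging.DEBUG)
logger.debug('Hardware initialized')   # now at the threshold, so it is emitted

print(stream.getvalue())
```

Only the second call reaches the stream, so the message appears once.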


self.stage_block.set()

We set the stage block and never clear it so that the :class:`.Pilot` doesn't wait for a trigger to call the next stage -- it just does it as soon as the previous one completes.

See :meth:`~autopilot.core.pilot.Pilot.run_task` for more detail on this loop.
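
A minimal sketch of such a loop, assuming only what is described above (call the next stage, collect its data, wait on the event), might look like the following. It is not the actual `run_task` implementation, and the standalone `pulse` function is a simplified stand-in for the stage method.

```python
import itertools
import threading

trial_counter = itertools.count()

def pulse():
    """Simplified stand-in for a stage method: returns a dictionary of data."""
    return {'trial_num': next(trial_counter)}

stages = itertools.cycle([pulse])
stage_block = threading.Event()
stage_block.set()   # set once and never cleared

results = []
for _ in range(3):             # a real loop would run until the task is stopped
    data = next(stages)()      # compute the next stage
    results.append(data)
    stage_block.wait()         # returns immediately because the event is already set

print([r['trial_num'] for r in results])  # [0, 1, 2]
```

If a stage cleared the event instead, `wait()` would block until something (for example a hardware callback) set it again.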

Stage Methods

def pulse(self, *args, **kwargs):
    """
    Turn an LED on and off according to :attr:`~examples.tasks.Blink.pulse_duration` and :attr:`~examples.tasks.Blink.pulse_interval`

    Returns:
        dict: A dictionary containing the trial number and two timestamps.
    """
    # -------------
    # turn light on

    # use :meth:`.hardware.gpio.Digital_Out.set` method to turn the LED on
    self.hardware['LEDS']['dLED'].set(1)
    # store the timestamp as an ISO string (at most 26 characters, matching StringCol(26))
    timestamp_on = datetime.now().isoformat()
    # log status as a debug message
    self.logger.debug('light on')
    # sleep for the pulse_duration (given in ms, time.sleep takes seconds)
    time.sleep(self.pulse_duration / 1000)

    # ------------
    # turn light off, same as turning it on.

    self.hardware['LEDS']['dLED'].set(0)
    timestamp_off = datetime.now().isoformat()
    self.logger.debug('light off')
    time.sleep(self.pulse_interval / 1000)

    # count and store the number of the current trial
    self.current_trial = next(self.trial_counter)

    data = {
        'trial_num': self.current_trial,
        'timestamp_on': timestamp_on,
        'timestamp_off': timestamp_off
    }
    return data

Create the data dictionary to be returned from the stage. Note that each of the keys in the dictionary must correspond to the names of the columns declared in the :attr:`~examples.tasks.Blink.TrialData` descriptor.
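
The key/column correspondence can be checked before data is stored. The sketch below copies the column names by hand rather than reading them from the pytables descriptor, and `check_stage_data` is a hypothetical helper, not an Autopilot function.

```python
# Column names as declared in TrialData (copied by hand for this sketch)
TRIAL_COLUMNS = {'trial_num', 'timestamp_on', 'timestamp_off'}

def check_stage_data(data, columns=TRIAL_COLUMNS):
    """Return the set of keys in `data` that have no matching column."""
    return set(data) - set(columns)

good = {
    'trial_num': 0,
    'timestamp_on': '2021-02-16T18:11:35.752110',
    'timestamp_off': '2021-02-16T18:11:35.852110',
}
bad = {'trial_num': 0, 'time_on': '2021-02-16T18:11:35.752110'}

print(check_stage_data(good))  # set()
print(check_stage_data(bad))   # {'time_on'}
```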

At the conclusion of running the task, we will be able to access the data from the run with :meth:`.Subject.get_trial_data`, which will be a :class:`pandas.DataFrame` with a row for each trial, and a column for each of the fields here.

Full Source

.. literalinclude:: ../examples/tasks/
    :language: python