Skip to content

Commit

Permalink
Merge 991cc05 into 8ee5459
Browse files Browse the repository at this point in the history
  • Loading branch information
Rodgers-PAC-Lab committed Mar 29, 2022
2 parents 8ee5459 + 991cc05 commit 99342fd
Show file tree
Hide file tree
Showing 5 changed files with 222 additions and 12 deletions.
4 changes: 2 additions & 2 deletions autopilot/setup/scripts.py
Expand Up @@ -49,7 +49,7 @@
'text': 'install system packages necessary for autopilot Pilots? (required if they arent already)',
'commands': [
"sudo apt-get update",
"sudo apt-get install -y build-essential cmake git python3-dev libatlas-base-dev libsamplerate0-dev libsndfile1-dev libreadline-dev libasound-dev i2c-tools libportmidi-dev liblo-dev libhdf5-dev libzmq-dev libffi-dev",
"sudo apt-get install -y build-essential cmake git python3-dev libatlas-base-dev libsamplerate0-dev libsndfile1-dev libreadline-dev libasound-dev i2c-tools libportmidi-dev liblo-dev libhdf5-dev libzmq3-dev libffi-dev",
]
},
'env_terminal': {
Expand Down Expand Up @@ -125,7 +125,7 @@
'type': 'bool',
'text': 'Install jack audio from source, try this if youre having compatibility or runtime issues with jack (required if AUDIOSERVER == jack)',
'commands': [
"git clone git://github.com/jackaudio/jack2 --depth 1",
"git clone https://github.com/jackaudio/jack2 --depth 1",
"cd jack2",
"./waf configure --alsa=yes --libdir=/usr/lib/arm-linux-gnueabihf/",
"./waf build -j6",
Expand Down
173 changes: 164 additions & 9 deletions autopilot/stim/sound/sounds.py
Expand Up @@ -41,9 +41,11 @@
import os
import sys
import typing
from typing import Union, Optional
from time import sleep
from scipy.io import wavfile
from scipy.signal import resample
import scipy.signal
import numpy as np
import threading
from itertools import cycle
Expand All @@ -52,6 +54,7 @@
from autopilot import prefs
from autopilot.stim.sound.base import get_sound_class, Sound
import autopilot
from autopilot.transform.timeseries import Filter_IIR

BASE_CLASS = get_sound_class()

Expand Down Expand Up @@ -97,19 +100,123 @@ def init_sound(self):

self.initialized = True

class Tritone(BASE_CLASS):
"""An unpleasant-sounding tritone"""
PARAMS = ['frequency', 'duration', 'amplitude', 'channel']
type = 'Tritone'

def __init__(self, frequency, duration, amplitude=0.01, channel=None, **kwargs):
"""Initialize a new tritone with specified parameters.
The sound itself is stored as the attribute `self.table`. This can
be 1-dimensional or 2-dimensional, depending on `channel`. If it is
2-dimensional, then each channel is a column.
Args:
frequency (float): frequency of the base tone
duration (float): duration of the sound
amplitude (float): amplitude of the sound as a proportion of 1.
channel (int or None): which channel should be used
If 0, play noise from the first channel
If 1, play noise from the second channel
If None, send the same information to all channels ("mono")
**kwargs: extraneous parameters that might come along with instantiating us
"""
# This calls the base class, which sets server-specific parameters
# like sampling rate
super(Tritone, self).__init__(**kwargs)

# Set the parameters specific to Tritone
self.frequency = float(frequency)
self.duration = float(duration)
self.amplitude = float(amplitude)
try:
self.channel = int(channel)
except TypeError:
self.channel = channel

# Currently only mono or stereo sound is supported
if self.channel not in [None, 0, 1]:
raise ValueError(
"audio channel must be 0, 1, or None, not {}".format(
self.channel))

# Initialize the sound itself
self.init_sound()

def init_sound(self):
"""Defines `self.table`, the waveform that is played.
The way this is generated depends on `self.server_type`, because
parameters like the sampling rate cannot be known otherwise.
The sound is generated and then it is "chunked" (zero-padded and
divided into chunks). Finally `self.initialized` is set True.
"""
# Depends on the server_type
if self.server_type == 'pyo':
raise NotImplementedError("pyo not supported for Tritone")

elif self.server_type == 'jack':
# This calculates the number of samples, using the specified
# duration and the sampling rate from the server, and stores it
# as `self.nsamples`.
self.get_nsamples()

# generate samples
# Generate time
t = np.arange(self.nsamples)

# Generate base tone
data = np.sin(
2 * np.pi * self.frequency * t / self.fs)

# Add on the upper tone, which is sqrt(2) higher
data += np.sin(
2 * np.pi * self.frequency * np.sqrt(2) * t / self.fs)

# The shape of the table depends on `self.channel`
if self.channel is None:
# The table will be 1-dimensional for mono sound
self.table = data

else:
# The table will be 2-dimensional for stereo sound
# Each channel is a column
# Only the specified channel contains data and the other is zero

# Put in correct column
self.table = np.zeros((self.nsamples, 2))
assert self.channel in [0, 1]
self.table[:, self.channel] = data

# Scale by the amplitude
self.table = self.table * self.amplitude

# Convert to float32
self.table = self.table.astype(np.float32)

# Chunk the sound
self.chunk()

# Flag as initialized
self.initialized = True

class Noise(BASE_CLASS):
"""Generates a white noise burst with specified parameters
The `type` attribute is always "Noise".
"""
# These are the parameters of the sound, I think this is used to generate
# sounds automatically for a protocol
PARAMS = ['duration','amplitude', 'channel']
# These are the parameters of the sound
# These can be set in the GUI when generating a Nafc step in a Protocol
PARAMS = ['duration', 'amplitude', 'highpass', 'channel']

# The type of the sound
type='Noise'

def __init__(self, duration, amplitude=0.01, channel=None, **kwargs):
def __init__(self, duration, amplitude=0.01, channel=None,
highpass: Optional[Union[float, Filter_IIR]]=None,
**kwargs):
"""Initialize a new white noise burst with specified parameters.
The sound itself is stored as the attribute `self.table`. This can
Expand All @@ -123,15 +230,36 @@ def __init__(self, duration, amplitude=0.01, channel=None, **kwargs):
If 0, play noise from the first channel
If 1, play noise from the second channel
If None, send the same information to all channels ("mono")
highpass (float, :class:`~.transform.timeseries.Filter_IIR`, or None):
This optionally applies a filter to the Noise.
If None, no filter is applied.
If :class:`~.transform.timeseries.Filter_IIR`, then that
filter is applied.
If float, then a :class:`~.transform.timeseries.Filter_IIR`
is created for you. By default it will be a 2nd-order
Butterworth high-pass with the specified float as `Wn`.
This value should be provided in Hz, not as a fraction
of the Nyquist frequency, because we also provide
the sampling rate to Filter_IIR (and from there to
scipy.signal.iirfilter).
**kwargs: extraneous parameters that might come along with instantiating us
"""
# This calls the base class, which sets server-specific parameters
# like samplign rate
# like sampling rate
super(Noise, self).__init__(**kwargs)

# Set the parameters specific to Noise
self.duration = float(duration)
self.amplitude = float(amplitude)
self._highpass = None
if isinstance(highpass, (float, int)):
self._highpass = float(highpass)
elif isinstance(highpass, Filter_IIR):
if highpass.btype != 'highpass':
self.logger.warning(f'Got a {highpass.btype} filter instead of a highpass, this should still work, but indicates this parameter should probably be renamed...')

self._highpass = highpass

try:
self.channel = int(channel)
except TypeError:
Expand Down Expand Up @@ -165,17 +293,23 @@ def init_sound(self):
# duration and the sampling rate from the server, and stores it
# as `self.nsamples`.
self.get_nsamples()

# Generate the table by sampling from a uniform distribution
data = np.random.uniform(-1, 1, self.nsamples)

if self.highpass is not None:
data = self.highpass.filter(data)

# The shape of the table depends on `self.channel`
if self.channel is None:
# The table will be 1-dimensional for mono sound
self.table = np.random.uniform(-1, 1, self.nsamples)
self.table = data

else:
# The table will be 2-dimensional for stereo sound
# Each channel is a column
# Only the specified channel contains data and the other is zero
data = np.random.uniform(-1, 1, self.nsamples)

self.table = np.zeros((self.nsamples, 2))
assert self.channel in [0, 1]
self.table[:, self.channel] = data
Expand All @@ -193,6 +327,25 @@ def init_sound(self):
# Flag as initialized
self.initialized = True

@property
def highpass(self) -> Union[Filter_IIR, None]:
"""
A highpass filter (see the ``highpass`` argument) applied to generated samples
Returns:
:class:`~.transform.timeseries.Filter_IIR` or None, if none supplied
"""

if isinstance(self._highpass, float):
# If we were given a float, make the filter and stash it
filter = Filter_IIR(ftype='butter', N=2, Wn=self._highpass,
btype="highpass", fs=self.fs)
self._highpass = filter

return self._highpass


def iter_continuous(self) -> typing.Generator:
"""
Continuously yield frames of audio. If this method is not overridden,
Expand All @@ -210,13 +363,15 @@ def iter_continuous(self) -> typing.Generator:

rng = np.random.default_rng()


while True:
if self.channel is None:
table[:] = rng.uniform(-self.amplitude, self.amplitude, self.blocksize)
else:
table[:,self.channel] = rng.uniform(-self.amplitude, self.amplitude, self.blocksize)

if self.highpass is not None:
table = self.highpass.filter(table)

yield table

class File(BASE_CLASS):
Expand Down
27 changes: 27 additions & 0 deletions autopilot/transform/timeseries.py
Expand Up @@ -24,6 +24,8 @@ class Filter_IIR(Transform):
* ``N`` - filter order
* ``Wn`` - array or scalar giving critical frequencies
This should be provided in Hz, not as a fraction of the
Nyquist frequency.
* ``btype`` - type of band: ``['bandpass', 'lowpass', 'highpass', 'bandstop']``
Attributes:
Expand All @@ -40,6 +42,7 @@ def __init__(self, ftype="butter", buffer_size=256, coef_type='sos', axis=0, *ar
self.ftype = ftype
self.coef_type = coef_type
self.axis = axis
self.btype = kwargs.get('btype', 'bandpass')
self.coefs = signal.iirfilter(ftype=self.ftype, output=coef_type, **kwargs)
self.buffer = deque(maxlen=buffer_size)

Expand All @@ -60,6 +63,30 @@ def process(self, input:float):
elif self.coef_type == "sos":
return signal.sosfilt(self.coefs, self.buffer, axis=self.axis)[-1]

def filter(self, input: typing.Union[np.ndarray, typing.List[float]], axis:int=0, **kwargs) -> np.ndarray:
"""
Filter an array using the configured filter without using the single-value buffer.
Since this assumes that the signal is static, uses :func:`scipy.signal.filtfilt` :func:`scipy.signal.sosfiltfilt`
depending on :attr:`.coef_type`
Args:
input (:class:`numpy.ndarray` or list of ints): Input array to filter
axis (int): Axis to filter over (default: ``0`` )
kwargs (dict): Passed to :func:`scipy.signal.filtfilt` :func:`scipy.signal.sosfiltfilt`
Returns:
:class:`numpy.ndarray` filtered array
"""

if self.coef_type == "ba":
return signal.filtfilt(self.coefs[0], self.coefs[1], input, axis=axis, **kwargs)
elif self.coef_type == "sos":
return signal.sosfiltfilt(self.coefs, input, axis=axis, **kwargs)
else:
raise ValueError(f"coef_type must be one of `ba` or `sos`, got {self.coef_type}")



class Gammatone(Transform):
"""
Expand Down
2 changes: 1 addition & 1 deletion docs/guide/installation.rst
Expand Up @@ -62,7 +62,7 @@ in the guided installation of Autopilot. (``python -m autopilot.setup.run_script
libportmidi-dev \
liblo-dev \
libhdf5-dev \
libzmq-dev \
libzmq3-dev \
libffi-dev


Expand Down
28 changes: 28 additions & 0 deletions tests/test_sound.py
Expand Up @@ -67,6 +67,8 @@
import autopilot.external
import autopilot.stim.sound
from autopilot.stim.sound import jackclient, sounds
from autopilot.transform.timeseries import Filter_IIR
from scipy.signal import welch


## Ensure we get the same random sound every time
Expand All @@ -79,6 +81,7 @@
jackclient.FS = sample_rate
jackclient.BLOCKSIZE = block_size


# These are only used in Jack_Sound.__del__
# Setting them here to avoid warnings during garbage collection
# Why is there one global PLAY and STOP, rather than sound-specific?
Expand Down Expand Up @@ -373,6 +376,31 @@ def test_init_multichannel_noise(duration_ms, amplitude, channel,
assert concatted.shape == (len(noise.table) + n_padded_zeros, 2)
assert (concatted[:len(noise.table)] == noise.table).all()

def test_highpass_noise():
"""
Test that a highpass filter can be created from a float as well as explicitly given
"""
filter_freq = sample_rate/4

noise = sounds.Noise(
duration=1000, amplitude=0.5, highpass=filter_freq)
assert isinstance(noise.highpass, Filter_IIR)
assert noise.highpass.btype == 'highpass'

filter = Filter_IIR(ftype='butter', N=2, Wn=filter_freq,
btype="highpass", fs=sample_rate)
noise2 = sounds.Noise(duration=1000, amplitude=0.5, highpass=filter)
assert noise2.highpass is filter

# make sure that the filter works...
# let's just check that the lowest 10 are much smaller than the highest 10
freqs = welch(noise.table, fs=sample_rate)
ratio = np.mean(freqs[1][0:10])/np.mean(freqs[1][-10:])
assert ratio < 0.001




def test_unpadded_gap():
"""
A gap in a continous sound should not be padded (had its last chunk filled with zeros).
Expand Down

0 comments on commit 99342fd

Please sign in to comment.