diff --git a/haiopy/__init__.py b/haiopy/__init__.py index ab59fe2..11b8af7 100644 --- a/haiopy/__init__.py +++ b/haiopy/__init__.py @@ -3,3 +3,13 @@ __author__ = """The pyfar developers""" __email__ = 'marco.berzborn@akustik.rwth-aachen.de' __version__ = '0.1.0' + +from .devices import AudioDevice +from .io import Playback, Record, PlaybackRecord + +__all__ = [ + 'AudioDevice' + 'Playback', + 'Record', + 'PlaybackRecord' + ] diff --git a/haiopy/io.py b/haiopy/io.py index 9cfa4bc..2d8595a 100644 --- a/haiopy/io.py +++ b/haiopy/io.py @@ -1,57 +1,242 @@ +""" +Playback and recording functionality including classes and convenience +functions. +""" + +import numpy as np +import pyfar as pf + from . import devices +from abc import ABC, abstractmethod -class _AudioIO(object): - def __init__( - self, - device - ): - super().__init__() +class _AudioIO(ABC): + """Abstract class for playback and recording. + + This class holds all the methods and properties that are common to its + three sub-classes :py:class:`Playback`, :py:class:`Record`, and + :py:class:`PlaybackRecord`. + """ + def __init__(self, device, blocking): + if isinstance(device, devices._Device): + self._device = device + self.blocking = blocking + else: + raise ValueError("Incorrect device, needs to be a" + ":py:class:`~haiopy.AudioDevice` object.") + @property + def blocking(self): + """Boolean parameter blocking.""" + return self._blocking + + @blocking.setter + def blocking(self, value): + """Set blocking parameter to True or False.""" + if isinstance(value, bool): + self._blocking = value + else: + raise ValueError("Blocking needs to be True or False.") + + @abstractmethod def start(): + """ This function depends on the use case (playback, recording or + playback and record) and therefore is implemented in the subclasses. + """ pass - def stop(): - pass + def stop(self): + """Immediately terminate the playback/recording.""" + self._device.abort() + print("Playback / Recording terminated.") - def wait(): - pass + def wait(self): + """Wait until playback/recording is finished.""" + self._device.wait() class Playback(_AudioIO): + """Class for playback of signals. + """ def __init__( - self, - device, - input_channels, - repetitions=1, - loop=False): - super().__init__(device=device) - self._output_signal = None + self, device, output_channels, repetitions=1, + output_signal=None, digital_level=0., blocking=False): + """Create a Playback object. + + Parameters + ---------- + device : haiopy.AudioDevice + The device to play the signal. + output_channels : array-like + The output channels. The parameter can be a single number, list, + tuple or a 1D array with unique values. + repetitions : int, optional + Number of repitions, the default ``1``. + output_signal : pyfar.Signal, optional + The signal to be played. The default ``None``, requires the signal + to be set before :py:func:`~play` is called. + digital_level : float, optional + Digital output level (the digital output amplification) in dB + referenced to an amplitude of 1, so only levels <= 0 dB can be set. + The default is ``0``, which results in an unmodified playback. + blocking : bool, optional + If ``True`` :py:func:`~play` function doesn’t return until the + playback is finished. The default is ``False``. + """ + super().__init__(device=device, blocking=blocking) + # Set properties, check implicitly + self.output_channels = output_channels + self.repetitions = repetitions + self.output_signal = output_signal + self.digital_level = digital_level + # Initialize device + self.device.initialize_playback(self.output_channels) + + @property + def device(self): + """Output device.""" + return self._device @property def output_signal(self): + """``pyfar.Signal`` to be played back.""" return self._output_signal @output_signal.setter - def output_signal(self, sig): - self._output_signal = sig + def output_signal(self, signal): + """Set ``pyfar.Signal`` to be played back.""" + if signal is None: + self._output_signal = signal + elif not isinstance(signal, pf.Signal): + raise ValueError("Output signal needs to be a pyfar.Signal.") + elif signal.sampling_rate != self.device.sampling_rate: + raise ValueError( + f"Sampling rates of the signal ({signal.sampling_rate}) " + f"and the device ({self.device.sampling_rate}) " + f"do not match.") + elif signal.dtype != self.device.dtype: + raise ValueError( + f"Datatypes of the signal ({signal.dtype}) " + f"and the device ({self.device.dtype}) " + f"do not match.") + elif signal.cshape != self.output_channels.shape: + raise ValueError( + f"The shapes of the signal ({signal.cshape}) " + f"and the channels ({self.output_channels.shape}) " + f"do not match.") + else: + self._output_signal = signal + + @property + def output_channels(self): + """Output channels.""" + return self._output_channels + + @output_channels.setter + def output_channels(self, channels): + """Set output_channels parameter. It can be a single number, list, + tuple or a 1D array with unique values. + """ + channels_int = np.unique(channels).astype(int) + if np.atleast_1d(channels).shape != channels_int.shape: + raise ValueError("Output_channels must be a single number, list, " + "tuple or a 1D array with unique values.") + elif not np.all(channels == channels_int): + raise ValueError("Parameter output_channels must contain only" + "integers.") + else: + self._output_channels = channels_int + + # Initialize device + self.device.initialize_playback(self._output_channels) + + @property + def repetitions(self): + """Number of repetitions of the playback.""" + return self._repetitions + + @repetitions.setter + def repetitions(self, value): + """Set the number of repetitions of the playback. ``repetitions`` can + be set to a decimal number. The default is ``1``, + which is a single playback.""" + try: + value = float(value) + except (ValueError, TypeError): + raise ValueError("Repetitions must be a scalar number.") + if value > 0: + self._repetitions = value + else: + raise ValueError("Repetitions must be positive.") + + @property + def digital_level(self): + """Digital output level in dB.""" + return self._digital_level + + @digital_level.setter + def digital_level(self, value): + """Set the digital output level in dB. The level is referenced to an + amplitude of 1, so only levels <= 0 dB can be set.""" + try: + level = float(value) + except (ValueError, TypeError): + raise ValueError("The digital level must be single number.") + if level <= 0: + self._digital_level = level + else: + raise ValueError("The digital level must be <= 0.") + + def start(self): + """Start the playback.""" + if self.output_signal is None: + raise ValueError("To start the playback, first set an output " + "signal.") + # Extract time data + data = self.output_signal.time + # Amplification / Attenuations + data = data * 10**(self._digital_level/20) + # Repeat and append + append_idx = int(self.repetitions % 1 * self.output_signal.n_samples) + data_out = np.tile(data, int(self.repetitions)) + data_out = np.append(data_out, data[..., :append_idx], axis=-1) + self.device.playback(data_out) + # Block + if self._blocking: + self.wait() class Record(_AudioIO): def __init__( self, device, - output_channels, + input_channels, duration=None, fft_norm='amplitude', + blocking=False ): - super().__init__(device=device) + super().__init__(device=device, blocking=blocking) self._input_signal = None + self.input_channels = input_channels + + @property + def input_channels(self): + return self._input_channels + + @input_channels.setter + def input_channels(self, channels): + self._input_channels = channels @property def input_signal(self): return self._input_signal + def start(): + """ This function depends on the use case (playback, recording or + playback and record) and therefore is implemented in the subclasses. + """ + pass + class PlaybackRecord(Playback, Record): def __init__( @@ -59,11 +244,24 @@ def __init__( device, input_channels, output_channels, + blocking=False ): - super().__init__( + Record.__init__( + self, device=device, input_channels=input_channels, - output_channels=output_channels) + blocking=blocking) + Playback.__init__( + self, + device=device, + output_channels=output_channels, + blocking=blocking) + + def start(): + """ This function depends on the use case (playback, recording or + playback and record) and therefore is implemented in the subclasses. + """ + pass def playback(): diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..a945667 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,11 @@ +import pytest +import numpy as np +from unittest import mock + +import haiopy as hp + + +@pytest.fixture +@mock.patch('haiopy.AudioDevice', autospec=True) +def device_stub(AudioDevice): + return hp.AudioDevice(0, 44100, 512, np.float) diff --git a/tests/test_io.py b/tests/test_io.py new file mode 100644 index 0000000..1b862ab --- /dev/null +++ b/tests/test_io.py @@ -0,0 +1,264 @@ +import pytest +import numpy as np +import pyfar as pf +import numpy.testing as npt + +from unittest import mock + +from haiopy import io +from haiopy import Playback +from haiopy import Record +from haiopy import PlaybackRecord + + +class TestPlayback(): + @mock.patch('haiopy.devices._Device', autospec=True) + def test__AudioIO_init_error(self, device_stub): + """ Test error for instatiating abstract class""" + with pytest.raises(TypeError): + io._AudioIO(0) + + @mock.patch('haiopy.io.isinstance', return_value=True) + def test_init(self, isinstance_mock, device_stub): + output_channels = 0 + Playback(device_stub, output_channels) + + def test_init_default_parameters(self, device_stub): + output_channels = 0 + pb = Playback(device_stub, output_channels) + assert pb._device == device_stub + assert pb._output_channels == output_channels + assert pb._repetitions == 1 + assert pb._output_signal is None + assert pb._blocking is False + + def test_init_set_parameters(self, device_stub): + output_channels = 2 + repetitions = 3 + signal = pf.signals.sine(100, 100) + digital_level = -10 + blocking = True + device_stub.sampling_rate = signal.sampling_rate + device_stub.dtype = signal.dtype + pb = Playback( + device_stub, output_channels, repetitions, signal, + digital_level, blocking) + assert pb._device == device_stub + assert pb._output_channels == output_channels + assert pb._repetitions == repetitions + assert pb._output_signal == signal + assert pb._digital_level == digital_level + assert pb._blocking is blocking + + def test_init_device_error(self): + device = 0 + channels = 0 + with pytest.raises(ValueError, match="Incorrect device"): + Playback(device, channels) + + def test_device_getter(self, device_stub): + pb = Playback(device_stub, 0) + pb._device = 1 + assert pb.device == 1 + + def test_output_channels_getter(self, device_stub): + pb = Playback(device_stub, 0) + pb._output_channels = 1 + assert pb.output_channels == 1 + + @pytest.mark.parametrize("channels", [1, [1, 2], np.array([1, 2]), (1, 2)]) + def test_output_channels_setter(self, device_stub, channels): + pb = Playback(device_stub, 0) + pb.output_channels = channels + assert np.all(pb._output_channels == channels) + assert len(pb._output_channels.shape) == 1 + + @pytest.mark.parametrize( + "channels", [1.1, [1.1, 2], np.array([1.1, 2]), (1.1, 2)]) + def test_output_channels_setter_errors(self, device_stub, channels): + # non integer input + with pytest.raises(ValueError, match="integers"): + Playback(device_stub, channels) + + def test_output_channels_setter_errors_2D(self, device_stub): + # array which is not 2D + ch = np.array([[1, 2], [3, 4]]) + with pytest.raises(ValueError, match="1D array"): + Playback(device_stub, ch) + + @pytest.mark.parametrize("channels", [[1, 1], np.array([1, 1]), (1, 1)]) + def test_output_channels_setter_errors_unique(self, device_stub, channels): + with pytest.raises(ValueError, match="unique"): + Playback(device_stub, channels) + + def test_repetitions_getter(self, device_stub): + pb = Playback(device_stub, 0) + pb._repetitions = 2 + assert pb.repetitions == 2 + + def test_repetitions_setter(self, device_stub): + pb = Playback(device_stub, 0) + pb.repetitions = 2 + assert pb._repetitions == 2 + + @pytest.mark.parametrize("reps", ['a', [1], np.array([1, 2])]) + def test_repetitions_setter_errors_scalar_number(self, device_stub, reps): + pb = Playback(device_stub, 0) + with pytest.raises(ValueError, match="scalar number"): + pb.repetitions = reps + + @pytest.mark.parametrize("reps", [0, -1]) + def test_repetitions_setter_errors_positive(self, device_stub, reps): + pb = Playback(device_stub, 0) + with pytest.raises(ValueError, match="positive"): + pb.repetitions = reps + + def test_output_signal_getter(self, device_stub): + pb = Playback(device_stub, 0) + pb._output_signal = 1 + assert pb.output_signal == 1 + + def test_output_signal_setter(self, device_stub): + # One channel + signal = pf.signals.sine(100, 100) + device_stub.sampling_rate = signal.sampling_rate + device_stub.dtype = signal.dtype + pb = Playback(device_stub, 0) + pb.output_signal = signal + assert pb._output_signal == signal + # Two channels + signal = pf.signals.sine([100, 200], 100) + pb = Playback(device_stub, [0, 1]) + pb.output_signal = signal + assert pb._output_signal == signal + + def test_output_signal_setter_errors(self, device_stub): + device_stub.sampling_rate = 44100 + device_stub.dtype = np.float64 + pb = Playback(device_stub, 0) + # Signal + with pytest.raises(ValueError, match="pyfar.Signal"): + pb.output_signal = 1 + # sampling_rate + with pytest.raises(ValueError, match="Sampling rate"): + signal = pf.Signal( + np.random.rand(100), 1000, dtype=device_stub.dtype) + pb.output_signal = signal + # dtype + with pytest.raises(ValueError, match="Datatypes"): + signal = pf.Signal(np.random.rand(100), 44100, dtype=np.int32) + pb.output_signal = signal + # shape + with pytest.raises(ValueError, match="shapes"): + signal = pf.Signal( + np.array([[1, 2, 3], [4, 5, 6]]), 44100) + pb.output_signal = signal + with pytest.raises(ValueError, match="shapes"): + pb = Playback(device_stub, [0, 1]) + signal = pf.Signal(np.array([[1, 2, 3]]), 44100) + pb.output_signal = signal + + def test_blocking_getter(self, device_stub): + pb = Playback(device_stub, 0) + pb._blocking = 1 + assert pb.blocking == 1 + + def test_blocking_setter(self, device_stub): + pb = Playback(device_stub, 0) + pb.blocking = True + assert pb._blocking is True + + def test_blocking_setter_errors(self, device_stub): + pb = Playback(device_stub, 0) + with pytest.raises(ValueError, match="True or False"): + pb.blocking = 1 + + def test_digital_level_getter(self, device_stub): + pb = Playback(device_stub, 0) + pb._digital_level = 'a' + assert pb.digital_level == 'a' + + def test_digital_level_setter(self, device_stub): + pb = Playback(device_stub, 0) + pb.digital_level = -10 + assert pb._digital_level == -10 + + @pytest.mark.parametrize("level", ['a', (1, 2), [1, 2], np.array([1, 2])]) + def test_digital_level_setter_errors_number(self, device_stub, level): + pb = Playback(device_stub, 0) + with pytest.raises(ValueError, match="single number"): + pb.digital_level = level + + def test_digital_level_setter_errors_positive(self, device_stub): + pb = Playback(device_stub, 0) + with pytest.raises(ValueError, match="<= 0"): + pb.digital_level = 10 + + @pytest.mark.parametrize( + "data, repetitions, expected", + [(np.array([[0., 1., 2.]]), 1, np.array([[0., 1., 2.]])), + (np.array([[0., 1., 2.]]), 1.5, np.array([[0., 1., 2., 0.]])), + (np.array([[0., 1., 2.]]), 1.7, np.array([[0., 1., 2., 0., 1.]])), + (np.array([[0., 1., 2., 3.]]), 1.5, + np.array([[0., 1., 2., 3., 0., 1.]])), + (np.array([[0, 1, 2], [3, 4, 5]]), 1.5, + np.array([[0., 1., 2., 0.], [3., 4., 5., 3.]]))]) + def test_start_repetitions(self, device_stub, data, repetitions, expected): + signal = pf.Signal(data, 44100) + channels = np.arange(data.shape[0]) + device_stub.sampling_rate = signal.sampling_rate + device_stub.dtype = signal.dtype + pb = Playback(device_stub, channels, repetitions, signal) + with mock.patch.object( + device_stub, 'playback', + new=lambda x: npt.assert_array_equal(x, expected)): + pb.start() + + def test_start_digital_level(self, device_stub): + signal = pf.Signal([1, 2, 3], 44100) + device_stub.sampling_rate = signal.sampling_rate + device_stub.dtype = signal.dtype + pb = Playback( + device=device_stub, output_channels=0, repetitions=2, + output_signal=signal, digital_level=-20) + expected = np.array([[.1, .2, .3, .1, .2, .3]]) + with mock.patch.object( + device_stub, 'playback', + new=lambda x: npt.assert_allclose(x, expected, atol=1e-15)): + pb.start() + + def test_start_missing_signal(self, device_stub): + pb = Playback(device_stub, 0) + with pytest.raises(ValueError, match="set an output signal"): + pb.start() + + def test_start_blocking(self, device_stub): + signal = pf.Signal([1, 2, 3], 44100) + device_stub.sampling_rate = signal.sampling_rate + device_stub.dtype = signal.dtype + pb = Playback(device_stub, 0, output_signal=signal, blocking=True) + with mock.patch.object(device_stub, 'wait') as wait_mock: + pb.start() + wait_mock.assert_called_once() + + def test_stop(self, device_stub): + pb = Playback(device_stub, 0) + with mock.patch.object(device_stub, 'abort') as abort_mock: + pb.stop() + abort_mock.assert_called_once() + + +class TestRecord: + def test_init_device_error(self): + device = 0 + channels = 0 + with pytest.raises(ValueError, match="Incorrect device"): + Record(device, channels) + + +class TestPlaybackRecord: + def test_init_device_error(self): + device = 0 + channels = 0 + with pytest.raises(ValueError, match="Incorrect device"): + PlaybackRecord(device, channels, channels)