In [1]:
import os 
os.chdir('/Users/nalex2023/main/automation1.0/amb_display/audio_based/python/')
import time
import numpy as np
import pyaudio
import config

In [2]:
def start_stream(callback):
    p = pyaudio.PyAudio()
    frames_per_buffer = int(config.MIC_RATE / config.FPS)

    # Open the stream with optimized settings
    stream = p.open(
        format=pyaudio.paInt16,
        channels=1,
        rate=config.MIC_RATE,
        input=True,
        frames_per_buffer=frames_per_buffer
    )

    overflows = 0
    prev_ovf_time = time.time()

    # Main loop for reading and processing audio data
    while True:
        try:
            # Non-blocking read to avoid delay
            y = np.frombuffer(stream.read(frames_per_buffer, exception_on_overflow=False), dtype=np.int16)
            y = y.astype(np.float32)

            # Avoid unnecessary reads (remove second stream.read)
            available = stream.get_read_available()
            if available > 0:
                stream.read(available, exception_on_overflow=False)

            # Pass the audio data to the callback function
            callback(y)

        except IOError:
            overflows += 1
            if time.time() > prev_ovf_time + 1:
                prev_ovf_time = time.time()
                print(f'Audio buffer has overflowed {overflows} times')

    # Close the stream properly
    stream.stop_stream()
    stream.close()
    p.terminate()

In [3]:
from __future__ import print_function
import numpy as np
import config
import melbank


class ExpFilter:
    """Simple exponential smoothing filter"""
    def __init__(self, val=0.0, alpha_decay=0.5, alpha_rise=0.5):
        """Small rise / decay factors = more smoothing"""
        assert 0.0 < alpha_decay < 1.0, 'Invalid decay smoothing factor'
        assert 0.0 < alpha_rise < 1.0, 'Invalid rise smoothing factor'
        self.alpha_decay = alpha_decay
        self.alpha_rise = alpha_rise
        self.value = val

    def update(self, value):
        if isinstance(self.value, (list, np.ndarray, tuple)):
            alpha = value - self.value
            alpha[alpha > 0.0] = self.alpha_rise
            alpha[alpha <= 0.0] = self.alpha_decay
        else:
            alpha = self.alpha_rise if value > self.value else self.alpha_decay
        self.value = alpha * value + (1.0 - alpha) * self.value
        return self.value


def rfft(data, window=None):
    window = 1.0 if window is None else window(len(data))
    ys = np.abs(np.fft.rfft(data * window))
    xs = np.fft.rfftfreq(len(data), 1.0 / config.MIC_RATE)
    return xs, ys


def fft(data, window=None):
    window = 1.0 if window is None else window(len(data))
    ys = np.fft.fft(data * window)
    xs = np.fft.fftfreq(len(data), 1.0 / config.MIC_RATE)
    return xs, ys


def create_mel_bank():
    global samples, mel_y, mel_x
    samples = int(config.MIC_RATE * config.N_ROLLING_HISTORY / (2.0 * config.FPS))
    mel_y, (_, mel_x) = melbank.compute_melmat(num_mel_bands=config.N_FFT_BINS,
                                               freq_min=config.MIN_FREQUENCY,
                                               freq_max=config.MAX_FREQUENCY,
                                               num_fft_bands=samples,
                                               sample_rate=config.MIC_RATE)
samples = None
mel_y = None
mel_x = None


In [4]:
from numpy import abs, append, arange, insert, linspace, log10, round, zeros


def hertz_to_mel(freq):
    """Returns mel-frequency from linear frequency input.
    Parameter
    ---------
    freq : scalar or ndarray
        Frequency value or array in Hz.
    Returns
    -------
    mel : scalar or ndarray
        Mel-frequency value or ndarray in Mel
    """
    return 2595.0 * log10(1 + (freq / 700.0))


def mel_to_hertz(mel):
    """Returns frequency from mel-frequency input.
    Parameter
    ---------
    mel : scalar or ndarray
        Mel-frequency value or ndarray in Mel
    Returns
    -------
    freq : scalar or ndarray
        Frequency value or array in Hz.
    """
    return 700.0 * (10**(mel / 2595.0)) - 700.0


def melfrequencies_mel_filterbank(num_bands, freq_min, freq_max, num_fft_bands):
    """Returns centerfrequencies and band edges for a mel filter bank
    Parameters
    ----------
    num_bands : int
        Number of mel bands.
    freq_min : scalar
        Minimum frequency for the first band.
    freq_max : scalar
        Maximum frequency for the last band.
    num_fft_bands : int
        Number of fft bands.
    Returns
    -------
    center_frequencies_mel : ndarray
    lower_edges_mel : ndarray
    upper_edges_mel : ndarray
    """

    mel_max = hertz_to_mel(freq_max)
    mel_min = hertz_to_mel(freq_min)
    delta_mel = abs(mel_max - mel_min) / (num_bands + 1.0)
    frequencies_mel = mel_min + delta_mel * arange(0, num_bands + 2)
    lower_edges_mel = frequencies_mel[:-2]
    upper_edges_mel = frequencies_mel[2:]
    center_frequencies_mel = frequencies_mel[1:-1]
    return center_frequencies_mel, lower_edges_mel, upper_edges_mel


def compute_melmat(num_mel_bands=12, freq_min=64, freq_max=8000,
                   num_fft_bands=513, sample_rate=16000):
    """Returns tranformation matrix for mel spectrum.
    Parameters
    ----------
    num_mel_bands : int
        Number of mel bands. Number of rows in melmat.
        Default: 24
    freq_min : scalar
        Minimum frequency for the first band.
        Default: 64
    freq_max : scalar
        Maximum frequency for the last band.
        Default: 8000
    num_fft_bands : int
        Number of fft-frequenc bands. This ist NFFT/2+1 !
        number of columns in melmat.
        Default: 513   (this means NFFT=1024)
    sample_rate : scalar
        Sample rate for the signals that will be used.
        Default: 44100
    Returns
    -------
    melmat : ndarray
        Transformation matrix for the mel spectrum.
        Use this with fft spectra of num_fft_bands_bands length
        and multiply the spectrum with the melmat
        this will tranform your fft-spectrum
        to a mel-spectrum.
    frequencies : tuple (ndarray <num_mel_bands>, ndarray <num_fft_bands>)
        Center frequencies of the mel bands, center frequencies of fft spectrum.
    """
    center_frequencies_mel, lower_edges_mel, upper_edges_mel =  \
        melfrequencies_mel_filterbank(
            num_mel_bands,
            freq_min,
            freq_max,
            num_fft_bands
        )

    center_frequencies_hz = mel_to_hertz(center_frequencies_mel)
    lower_edges_hz = mel_to_hertz(lower_edges_mel)
    upper_edges_hz = mel_to_hertz(upper_edges_mel)
    freqs = linspace(0.0, sample_rate / 2.0, num_fft_bands)
    melmat = zeros((num_mel_bands, num_fft_bands))

    for imelband, (center, lower, upper) in enumerate(zip(
            center_frequencies_hz, lower_edges_hz, upper_edges_hz)):

        left_slope = (freqs >= lower) == (freqs <= center)
        melmat[imelband, left_slope] = (
            (freqs[left_slope] - lower) / (center - lower)
        )

        right_slope = (freqs >= center) == (freqs <= upper)
        melmat[imelband, right_slope] = (
            (upper - freqs[right_slope]) / (upper - center)
        )

    return melmat, (center_frequencies_mel, freqs)


In [None]:
from __future__ import print_function, division
import time
import numpy as np
from scipy.ndimage import gaussian_filter1d

from PyQt5.QtWidgets import QApplication
import pyqtgraph as pg
create_mel_bank()
_time_prev = time.time() * 1000.0  # Used to calculate FPS
_fps = ExpFilter(val=config.FPS, alpha_decay=0.2, alpha_rise=0.2)  # Low-pass filter for FPS estimation

def memoize(function):
    """Provides a decorator for memoizing functions"""
    from functools import wraps
    memo = {}

    @wraps(function)
    def wrapper(*args):
        if args in memo:
            return memo[args]
        else:
            rv = function(*args)
            memo[args] = rv
            return rv
    return wrapper


@memoize
def _normalized_linspace(size):
    return np.linspace(0, 1, size)



def frames_per_second():
    global _time_prev, _fps
    time_now = time.time() * 1000.0
    dt = time_now - _time_prev
    _time_prev = time_now
    if dt == 0.0:
        return _fps.value
    return _fps.update(1000.0 / dt)

def interpolate(y, new_length):
    """Intelligently resizes the array by linearly interpolating the values

    Parameters
    ----------
    y : np.array
        Array that should be resized

    new_length : int
        The length of the new interpolated array

    Returns
    -------
    z : np.array
        New array with length of new_length that contains the interpolated
        values of y.
    """
    if len(y) == new_length:
        return y
    x_old = _normalized_linspace(len(y))
    x_new = _normalized_linspace(new_length)
    z = np.interp(x_new, x_old, y)
    return z

# Visualizer function for spectrum
_prev_spectrum = np.tile(0.01, config.N_PIXELS // 2)
def visualize_spectrum(y):
    global _prev_spectrum
    y = interpolate(y, config.N_PIXELS // 2)
    diff = y - _prev_spectrum
    _prev_spectrum = np.copy(y)
    r = np.abs(diff)
    g = y
    b = gaussian_filter1d(y, sigma=1.0)
    output = np.array([r, g, b]) * 255
    return output


r_filt = ExpFilter(np.tile(0.01, config.N_PIXELS // 2),
                       alpha_decay=0.2, alpha_rise=0.99)
g_filt = ExpFilter(np.tile(0.01, config.N_PIXELS // 2),
                       alpha_decay=0.05, alpha_rise=0.3)
b_filt = ExpFilter(np.tile(0.01, config.N_PIXELS // 2),
                       alpha_decay=0.1, alpha_rise=0.5)
common_mode = ExpFilter(np.tile(0.01, config.N_PIXELS // 2),
                       alpha_decay=0.99, alpha_rise=0.01)
p_filt = ExpFilter(np.tile(1, (3, config.N_PIXELS // 2)),
                       alpha_decay=0.1, alpha_rise=0.99)
p = np.tile(1.0, (3, config.N_PIXELS // 2))
gain = ExpFilter(np.tile(0.01, config.N_FFT_BINS),
                     alpha_decay=0.001, alpha_rise=0.99)


def visualize_scroll(y):
    """Effect that originates in the center and scrolls outwards"""
    global p
    y = y**2.0
    gain.update(y)
    y /= gain.value
    y *= 255.0
    r = int(np.max(y[:len(y) // 3]))
    g = int(np.max(y[len(y) // 3: 2 * len(y) // 3]))
    b = int(np.max(y[2 * len(y) // 3:]))
    # Scrolling effect window
    p[:, 1:] = p[:, :-1]
    p *= 0.98
    p = gaussian_filter1d(p, sigma=0.2)
    # Create new color originating at the center
    p[0, 0] = r
    p[1, 0] = g
    p[2, 0] = b
    # Update the LED strip
    return np.concatenate((p[:, ::-1], p), axis=1)


def visualize_energy(y):
    """Effect that expands from the center with increasing sound energy"""
    global p
    y = np.copy(y)
    gain.update(y)
    y /= gain.value
    # Scale by the width of the LED strip
    y *= float((config.N_PIXELS // 2) - 1)
    # Map color channels according to energy in the different freq bands
    scale = 0.9
    r = int(np.mean(y[:len(y) // 3]**scale))
    g = int(np.mean(y[len(y) // 3: 2 * len(y) // 3]**scale))
    b = int(np.mean(y[2 * len(y) // 3:]**scale))
    # Assign color to different frequency regions
    p[0, :r] = 255.0
    p[0, r:] = 0.0
    p[1, :g] = 255.0
    p[1, g:] = 0.0
    p[2, :b] = 255.0
    p[2, b:] = 0.0
    p_filt.update(p)
    p = np.round(p_filt.value)
    # Apply substantial blur to smooth the edges
    p[0, :] = gaussian_filter1d(p[0, :], sigma=4.0)
    p[1, :] = gaussian_filter1d(p[1, :], sigma=4.0)
    p[2, :] = gaussian_filter1d(p[2, :], sigma=4.0)
    # Set the new pixel value
    return np.concatenate((p[:, ::-1], p), axis=1)


_prev_spectrum = np.tile(0.01, config.N_PIXELS // 2)

def visualize_spectrum(y):
    """Effect that maps the Mel filterbank frequencies onto the LED strip"""
    global _prev_spectrum
    y = np.copy(interpolate(y, config.N_PIXELS // 2))
    common_mode.update(y)
    diff = y - _prev_spectrum
    _prev_spectrum = np.copy(y)
    # Color channel mappings
    r = r_filt.update(y - common_mode.value)
    g = np.abs(diff)
    b = b_filt.update(np.copy(y))
    # Mirror the color channels for symmetric output
    r = np.concatenate((r[::-1], r))
    g = np.concatenate((g[::-1], g))
    b = np.concatenate((b[::-1], b))
    output = np.array([r, g,b]) * 255
    return output


fft_plot_filter = ExpFilter(np.tile(1e-1, config.N_FFT_BINS),
                         alpha_decay=0.5, alpha_rise=0.99)
mel_gain = ExpFilter(np.tile(1e-1, config.N_FFT_BINS),
                         alpha_decay=0.01, alpha_rise=0.99)
mel_smoothing = ExpFilter(np.tile(1e-1, config.N_FFT_BINS),
                         alpha_decay=0.5, alpha_rise=0.99)
volume = ExpFilter(config.MIN_VOLUME_THRESHOLD,
                       alpha_decay=0.02, alpha_rise=0.02)
fft_window = np.hamming(int(config.MIC_RATE / config.FPS) * config.N_ROLLING_HISTORY)
prev_fps_update = time.time()


def microphone_update(audio_samples):
    global y_roll, prev_rms, prev_exp, prev_fps_update
    # Normalize samples between 0 and 1
    y = audio_samples / 2.0**15
    # Construct a rolling window of audio samples
    y_roll[:-1] = y_roll[1:]
    y_roll[-1, :] = np.copy(y)
    y_data = np.concatenate(y_roll, axis=0).astype(np.float32)
    
    vol = np.max(np.abs(y_data))
    if vol < config.MIN_VOLUME_THRESHOLD:
        print('No audio input. Volume below threshold. Volume:', vol)
        
    else:
        # Transform audio input into the frequency domain
        N = len(y_data)
        N_zeros = 2**int(np.ceil(np.log2(N))) - N
        # Pad with zeros until the next power of two
        y_data *= fft_window
        y_padded = np.pad(y_data, (0, N_zeros), mode='constant')
        YS = np.abs(np.fft.rfft(y_padded)[:N // 2])
        # Construct a Mel filterbank from the FFT data
        mel = np.atleast_2d(YS).T * mel_y.T
        # Scale data to values more suitable for visualization
        # mel = np.sum(mel, axis=0)
        mel = np.sum(mel, axis=0)
        mel = mel**2.0
        # Gain normalization
        mel_gain.update(np.max(gaussian_filter1d(mel, sigma=1.0)))
        mel /= mel_gain.value
        mel = mel_smoothing.update(mel)
        # Map filterbank output onto LED strip
        output = visualization_effect(mel)
        if config.USE_GUI:
            # Plot filterbank output
            x = np.linspace(config.MIN_FREQUENCY, config.MAX_FREQUENCY, len(mel))
            mel_curve.setData(x=x, y=fft_plot_filter.update(mel))
            # Plot the color channels
            r_curve.setData(y=output[0])
            g_curve.setData(y=output[1])
            b_curve.setData(y=output[2])
    if config.USE_GUI:
        app.processEvents()
    
samples_per_frame = int(config.MIC_RATE / config.FPS)

y_roll = np.random.rand(config.N_ROLLING_HISTORY, samples_per_frame) / 1e16
visualization_effect = visualize_spectrum


if __name__ == '__main__':
    if config.USE_GUI:
        # Create the PyQtGraph Application
        app = QApplication([])

        # Set up the main window and layout
        view = pg.GraphicsView()
        layout = pg.GraphicsLayout(border=(100, 100, 100))
        view.setCentralItem(layout)
        view.show()
        view.setWindowTitle('Audio Visualization')
        view.resize(1000, 600)  # Adjust the window size

        ### Left-Side Controls with Box for Buttons ###
        left_controls = layout.addLayout(row=0, col=0)  # Create left-side layout

        # Create a Sub-Layout (Box) for the Effect Buttons
        button_box = pg.GraphicsLayout(border=(200, 200, 200))  # Gray border for box

        # Add Buttons to the Box Layout
        active_color = '#16dbeb'
        inactive_color = '#FFFFFF'

        def energy_click(_):
            global visualization_effect
            visualization_effect = visualize_energy
            energy_label.setText('Energy', color=active_color)
            scroll_label.setText('Scroll', color=inactive_color)
            spectrum_label.setText('Spectrum', color=inactive_color)

        def scroll_click(_):
            global visualization_effect
            visualization_effect = visualize_scroll
            energy_label.setText('Energy', color=inactive_color)
            scroll_label.setText('Scroll', color=active_color)
            spectrum_label.setText('Spectrum', color=inactive_color)

        def spectrum_click(_):
            global visualization_effect
            visualization_effect = visualize_spectrum
            energy_label.setText('Energy', color=inactive_color)
            scroll_label.setText('Scroll', color=inactive_color)
            spectrum_label.setText('Spectrum', color=active_color)

        # Create Labels for the Buttons
        energy_label = pg.LabelItem('Energy')
        scroll_label = pg.LabelItem('Scroll')
        spectrum_label = pg.LabelItem('Spectrum')

        # Connect Buttons to Click Events
        energy_label.mousePressEvent = energy_click
        scroll_label.mousePressEvent = scroll_click
        spectrum_label.mousePressEvent = spectrum_click

        # Add Buttons to the Box Layout
        button_box.addItem(energy_label)
        button_box.nextRow()
        button_box.addItem(scroll_label)
        button_box.nextRow()
        button_box.addItem(spectrum_label)

        # Add the Box to the Left Controls Layout
        left_controls.addItem(button_box, col=0)

        # Set Default Effect
        energy_click(0)

        ### Right-Side Layout for Plots ###
        plot_layout = layout.addLayout(row=0, col=1, colspan=2)

        # Filterbank Plot
        fft_plot = plot_layout.addPlot(title='Filterbank Output')
        fft_plot.setRange(yRange=[-0.1, 1.2])
        fft_plot.disableAutoRange(axis=pg.ViewBox.YAxis)

        # Data Curve for Filterbank Output
        x_data = np.array(range(1, config.N_FFT_BINS + 1))
        mel_curve = pg.PlotCurveItem()
        mel_curve.setData(x=x_data, y=x_data * 0)
        fft_plot.addItem(mel_curve)

        # LED Visualization Plot
        plot_layout.nextRow()
        led_plot = plot_layout.addPlot(title='Visualization Output')
        led_plot.setRange(yRange=[-5, 260])
        led_plot.disableAutoRange(axis=pg.ViewBox.YAxis)

        # RGB Channel Curves
        r_pen = pg.mkPen((255, 30, 30, 200), width=4)  # Red
        g_pen = pg.mkPen((30, 255, 30, 200), width=4)  # Green
        b_pen = pg.mkPen((30, 30, 255, 200), width=4)  # Blue

        r_curve = pg.PlotCurveItem(pen=r_pen)
        g_curve = pg.PlotCurveItem(pen=g_pen)
        b_curve = pg.PlotCurveItem(pen=b_pen)

        x_data = np.array(range(1, config.N_PIXELS + 1))
        r_curve.setData(x=x_data, y=x_data * 0)
        g_curve.setData(x=x_data, y=x_data * 0)
        b_curve.setData(x=x_data, y=x_data * 0)

        led_plot.addItem(r_curve)
        led_plot.addItem(g_curve)
        led_plot.addItem(b_curve)

        ### Horizontal Frequency Slider at the Bottom ###
        layout.nextRow()  # Move to a new row for the slider
        freq_slider = pg.TickSliderItem(orientation='bottom', allowAdd=False)

        # Frequency Slider Callback
        def freq_slider_change(tick):
            minf = freq_slider.tickValue(0) ** 2.0 * (config.MIC_RATE / 2.0)
            maxf = freq_slider.tickValue(1) ** 2.0 * (config.MIC_RATE / 2.0)
            t = f'Frequency range: {minf:.0f} - {maxf:.0f} Hz'
            freq_label.setText(t)
            config.MIN_FREQUENCY = minf
            config.MAX_FREQUENCY = maxf
            create_mel_bank()

        freq_slider.tickMoveFinished = freq_slider_change

        # Add Initial Ticks for Frequency Range
        freq_slider.addTick((config.MIN_FREQUENCY / (config.MIC_RATE / 2.0)) ** 0.5)
        freq_slider.addTick((config.MAX_FREQUENCY / (config.MIC_RATE / 2.0)) ** 0.5)

        # Add the Slider and Frequency Label to the Layout
        freq_label = pg.LabelItem('')
        layout.addItem(freq_label, row=2, col=0)  # Add label below the buttons
        layout.addItem(freq_slider, row=2, col=1, colspan=2)  # Slider spans two columns

        # Start the Microphone Stream and Visualization
        print(f"Mel filterbank shape: {mel_y.T.shape}")
        start_stream(microphone_update)




Mel filterbank shape: (735, 24)
