# Block-processing of audio files
<div align="right"><a href="https://people.epfl.ch/paolo.prandoni">Paolo Prandoni</a>, <a href="https://www.epfl.ch/labs/lcav/">LCAV, EPFL</a></div>
<br />

This notebook shows a simple way to process audio files (in WAV format) block by block. This is useful in two cases:

 1. when you want to process a data stream in real time but you can accept a small processing delay; in this case a buffered approach can yield computational gains
 1. when you need to process a large file that does not fit entirely into memory. 
 
We will use a simple FIR filter as the processing block, but any other processing algorithm will work. The only caveat is that border effects must be taken into account: when the processing block is applied block by block, it must remember its internal state from the previous call.

In [6]:
import numpy as np
import scipy.signal as sp
import IPython
from scipy.io import wavfile

## 1. A Stateful FIR

The following class implements a generic stateful FIR filter; by "stateful" we mean that the filter can be called on successive chunks of input data (of arbitrary sizes) without border effects. By looking at the formula for the convolution implementing an $N$-tap filter:

$$
    y[n] = \sum_{k=0}^{N-1} h[k]x[n-k]
$$ 

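we can see that each output sample $y[n]$ depends on the current input sample and on the $N-1$ input samples that precede it. The first $N-1$ outputs of a block therefore depend on the tail of the previous block, and border effects can be avoided by storing the last $N-1$ input values from the previous call in a buffer.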
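
As a minimal sketch of this idea (separate from the class below), the following compares filtering a signal in one shot, filtering each block independently, and filtering each block while carrying over the last $N-1$ inputs. The helper name `filter_block`, the random 5-tap filter and the block count are illustrative assumptions, not part of the notebook's code.

```python
import numpy as np

def filter_block(h, x, state):
    """Filter one block; `state` holds the last len(h)-1 inputs of the previous block."""
    data = np.concatenate([state, x])
    # 'valid' keeps only full overlaps, so len(y) == len(x)
    y = np.convolve(data, h, mode='valid')
    # keep the last N-1 input samples for the next call
    new_state = data[len(data) - (len(h) - 1):]
    return y, new_state

rng = np.random.default_rng(0)
h = rng.standard_normal(5)      # arbitrary 5-tap filter (N = 5)
x = rng.standard_normal(32)     # arbitrary input, split into 4 blocks of 8 samples

# reference: filter the whole signal at once, assuming x[n] = 0 for n < 0
reference = np.convolve(x, h)[:len(x)]

# naive approach: every block is filtered as if nothing came before it
naive = np.concatenate([np.convolve(b, h)[:len(b)] for b in np.split(x, 4)])

# stateful approach: carry the last N-1 inputs from block to block
state = np.zeros(len(h) - 1)
pieces = []
for b in np.split(x, 4):
    y, state = filter_block(h, b, state)
    pieces.append(y)
stateful = np.concatenate(pieces)

print("stateful matches reference:", np.allclose(stateful, reference))
print("naive matches reference:   ", np.allclose(naive, reference))
```

The naive version agrees with the reference only on the first block; in every later block the first $N-1$ outputs are computed as if the preceding samples were zero.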

In [2]:
class FIR:
    def __init__(self, taps):
        self.taps = np.array(taps, dtype=float)
        self.buflen = len(taps) - 1
        self.buffer = np.zeros(self.buflen, dtype=float)
        
    def reset(self):
        # re-create the buffer (multiplying by zero would not clear NaN or inf values)
        self.buffer = np.zeros(self.buflen, dtype=float)
        
    def get_delay(self):
        return (len(self.taps) - 1) / 2
        
    def filter(self, x):
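        x = np.asarray(x, dtype=float)
        # an empty block yields no output and leaves the state unchanged
        if x.size == 0:
            return np.zeros(0)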
        # prepend the buffered data and compute output only for full overlaps
        data = np.r_[self.buffer, x]
        y = np.convolve(data, self.taps, mode='valid')
        # update buffer
        if self.buflen > 0:
            self.buffer = data[-self.buflen:]
        return y
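
For comparison, SciPy's `scipy.signal.lfilter` offers a similar mechanism through its `zi` argument: when an initial state is passed, the function returns both the output and the final state, which can be fed to the next call. The sketch below uses arbitrary random taps and an assumed block length of 16 samples; note that `lfilter` stores its state in a different form than the buffer of raw input samples used in the class above.

```python
import numpy as np
from scipy.signal import lfilter

rng = np.random.default_rng(1)
taps = rng.standard_normal(8)     # arbitrary 8-tap FIR filter
x = rng.standard_normal(100)      # arbitrary input signal

# reference: filter the whole signal in a single call
reference = lfilter(taps, 1.0, x)

# block-wise: the state vector has len(taps) - 1 entries and starts at rest
zi = np.zeros(len(taps) - 1)
blocks = []
for start in range(0, len(x), 16):
    # with zi given, lfilter returns (output, final_state)
    y, zi = lfilter(taps, 1.0, x[start:start + 16], zi=zi)
    blocks.append(y)
blockwise = np.concatenate(blocks)

print("block-wise output matches:", np.allclose(blockwise, reference))
```

The last block here contains only 4 samples, fewer than the number of taps, which shows that the block length does not need to divide the signal length or exceed the filter length.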

## 2. Block audio processing

In this simple function we open a WAV file, print its attributes, load it block by block, process each block and finally write the output to a file. We use the standard library module `wave` to parse the header of the WAV file and to read and write the audio frames.

Note that in the current implementation, for simplicity, we only handle audio files with 16 bits per sample and we convert stereo files to mono files.

In [3]:
def block_process(in_file, out_file, processing=None, block_size=1024):
    import wave
    with wave.open(in_file) as f_in:
        # returns tuple (nchannels, sampwidth, framerate, nframes, comptype, compname)
        params = f_in.getparams()
        stereo = params[0] == 2
        print("stereo" if stereo else "mono", "file,", params[2], "samples per second")
        print(params[3], "samples total,", params[1], "bytes per sample.")
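        # only deal with 16-bit samples, mono or stereo
        assert params[1] == 2, "only 16-bit audio is supported"
        assert params[0] in (1, 2), "only mono or stereo files are supported"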
        print("reading", block_size, "audio samples per block")
        with wave.open(out_file, 'wb') as f_out:
            # output file same format as input, except always mono
            f_out.setparams((1,) + params[1:])
            while True:
                # read audio one block at a time
                chunk = f_in.readframes(block_size)
                if len(chunk) == 0:
                    break
                # read 16-bit samples, convert to float
                audio_data = np.frombuffer(chunk, dtype=np.int16).astype(float)
                # if stereo, mix down to mono (the output file is always mono)
                if stereo:
                    audio_data = (audio_data[0::2] + audio_data[1::2]) * .5
                # now process; without a processing block the audio passes through
                if processing is not None:
                    audio_data = processing.filter(audio_data)
                # convert back to 16-bit samples, saturating instead of wrapping around
                chunk = np.clip(audio_data, -32768, 32767).astype(np.int16)
                # write out the processed block
                f_out.writeframes(chunk.tobytes())
    print("done")
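
One detail worth illustrating is the conversion from floating-point samples back to 16-bit integers: filtered values are generally not integers and may fall slightly outside the representable range. The helper below, `to_int16`, is a hypothetical name introduced only for this sketch; it rounds to the nearest integer and saturates at the limits of the `int16` type.

```python
import numpy as np

def to_int16(samples):
    """Round float samples to the nearest integer and saturate to the int16 range."""
    info = np.iinfo(np.int16)             # info.min == -32768, info.max == 32767
    rounded = np.rint(np.asarray(samples, dtype=float))
    return np.clip(rounded, info.min, info.max).astype(np.int16)

# a few in-range and out-of-range values
x = np.array([0.4, -1.6, 32767.0, 40000.0, -40000.0])
y = to_int16(x)
print(y)                  # [     0     -2  32767  32767 -32768]
print(len(y.tobytes()))   # 10, i.e. 2 bytes per sample
```

Saturation distorts the signal less audibly than integer wrap-around, where a value just above the maximum would turn into a large negative sample.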

Let's try it with a simple lowpass filter:

In [4]:
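# band edges are in normalized frequency (fs=1); recent SciPy versions use `fs` instead of `Hz`
lowpass = FIR(sp.remez(300, [0, 0.05, 0.06, 0.5], [1, 0], weight=[1, 1], fs=1))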
block_process('test.wav', 'out.wav', processing=lowpass)

mono file, 44100 samples per second
653808 samples total, 2 bytes per sample.
reading 1024 audio samples per block
done
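
To get a feel for what this filter does, the following sketch designs the same 300-tap filter (relying on the default normalized sampling frequency of 1 and on the default equal weights) and inspects its magnitude response with `scipy.signal.freqz`. The 44100 Hz rate is taken from the output printed above; the number of frequency points is an arbitrary choice.

```python
import numpy as np
from scipy import signal

rate = 44100                     # sampling rate of the test file, from the printout above
taps = signal.remez(300, [0, 0.05, 0.06, 0.5], [1, 0])

# frequency response on 4096 points; w is in radians per sample, in [0, pi)
w, H = signal.freqz(taps, worN=4096)
f = w / (2 * np.pi)              # normalized frequency in cycles per sample
mag_db = 20 * np.log10(np.maximum(np.abs(H), 1e-12))

passband = mag_db[f <= 0.05]
stopband = mag_db[f >= 0.06]

print("passband edge: %.0f Hz, stopband edge: %.0f Hz" % (0.05 * rate, 0.06 * rate))
print("max passband deviation: %.3f dB" % np.max(np.abs(passband)))
print("max stopband level: %.1f dB" % np.max(stopband))

# a symmetric (linear-phase) FIR delays the signal by (N - 1) / 2 samples
delay = (len(taps) - 1) / 2
print("delay: %.1f samples (%.2f ms)" % (delay, 1000 * delay / rate))
```

With a 44100 Hz file the band edges correspond to 2205 Hz and 2646 Hz, so the output should sound noticeably muffled, and the 149.5-sample delay is the value that `get_delay()` returns for a 300-tap filter.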


In [7]:
SF, s = wavfile.read('test.wav')
IPython.display.Audio(s, rate=SF)

In [8]:
SF, s = wavfile.read('out.wav')
IPython.display.Audio(s, rate=SF)