# pyA - Python Audio Coding Package Examples
(c) 2019 by Thomas Hermann, Bielefeld University, Bielefeld, Germany

In [None]:
from pya import *

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from ipywidgets import interact, fixed, widgets
%matplotlib qt5
%matplotlib inline

## Asig Audio Signal class

In [None]:
# help(Asig)  # uncomment to see details

### Creating Audio Signals as Asig instances

* An Asig(sig, sr, label) can be created by passing as sig
    * (1) a numpy ndarray,
        * the fast index is for time, the slow index for channels
        * sr=44100 is the default sampling rate in Hz, if no other value is given
    * (2) a filename,
        * the file is loaded via scipy.io.loadwav, 
        * converted to float64 within [-1,1] without normalization, 
        * sampling rate sr is taken from the file. 
        * Multi-channel audio is supported.
    * (3) an integer
        * an empty (zero) signal with the given number of samples is created
    * (4) a float
        * an empty (zero) signal of given duration is created

In [None]:
# load an impact sound 
asnap = Asig("samples/snap.wav", label='snap')
asnap

* the \_\_repr\_\_() reports basic channels x samples @ sampling_rate = duration

In [None]:
# load a speech sample
aword = Asig("samples/sonification.wav", label='word')
aword

In [None]:
# create a signal from data
anoise = Asig(np.random.randn(44100), sr=44100, label='noise')
anoise

In [None]:
# record an audio signal
arec = Asig(record(2.0), label='rec').norm()

In [None]:
# create 2s silence at default sr
Asig(2.0, label='sonification')

### Plot and play Asigs

* play(rate=1, block=False) allows to control the rate
    * 1=original rate, 2=twice as fast, 0.5=half time
    * internally the signal is resampled before sending to audio output
    * currently simpleaudio is used for playback
    * by default the sound is non-blocking
* play returns self but sets the key 'play' in dict self._ with the player instance

In [None]:
asnap.play()  # try with 0.5, 0.1

In [None]:
aword.play(0.7)  # try with 0.5, 0.1

* plot(fn=None, **kwargs):
    * fn either accepts a string e.g. 'db' for decibel plotting
        * this assumes 16bit=96dB for full signal
    * or fn can be a function for custom warping
* plot plots the signal using matplotlib
    * return value is the lines
    * kwargs are propagated to the plot()
* plot returns self but sets the 'plot' key in dict _ with the matplotlib.lines.Line2D 

In [None]:
aword.plot(lambda x: ampdb(abs(x)+0.01), lw=0.5, color="r")  # try also with arg 'db'

In [None]:
aword.plot(lambda x: ampdb(abs(x)*1e2+1), lw=0.15)
aword.plot(lambda x: 100*abs(x)**2, color='red', lw=0.25)

In [None]:
anoise.play(block=True)
arec.play(block=True)

In [None]:
# demonstrate plot() and play() daisy-chained and subsequent access to the _ dict
asnap.plot(marker='o', mfc='r', ms=7, lw=0.1).play()._['plot'][0].set_markevery((700, 200))
plt.xlim(0, 0.2);
asnap._

### Accessing items, slicing and time slicing 

* The signal is stored in the attribute self.sig
* you can read and assign to that attribute directly
    * note that by doing so you are responsible for keeping 
        self.sr and self.samples valid
* slicing works just as with arrays, sample-accurate

In [None]:
b = aword[5000:57000].plot()
b

In [None]:
b.sig *= 0.5+0.2*np.sin(2*np.pi*15*b.get_times())  # ampl modulation
b.plot().norm().play()
# note that repeated cell executions changes signal more and more

* use full slice [start:stop:stride] to downsample or reverse signal

In [None]:
aword[-1:0:-1].play(1)  # reversed word

In [None]:
def test_stride(stride=2):
    aword[0:40000:stride].play()
interact(test_stride, stride=(1,20,1));

* slicing with time: using tslice()

In [None]:
b = asnap.tslice(0.0, 0.4).plot().play() # try to adjust first arg to begin of snap

### Normalize signal amplitude and set gain

* norm(norm=1, dcflag) allows to normalize the signal 
    * to an extreme value given by norm>0
    * negative norm are interpreted as level in dB
* set dcflag=True to first remove DC bias.

In [None]:
for n in [1, 0.5, 0.1, -6, -12, -18, -24, -30, -36, -42]:
    asnap.tslice(0.1,0.4).norm(n).play(block=True)

* apply gain(amp=None, db=None) to returns an amplified signal
    * db overwrites amp, so use as follows

In [None]:
# increase level by 20 db
asnap.tslice(0.3, 0.5).gain(db=20).play()

In [None]:
# multiply signal with 42
asnap.tslice(0.3, 0.5).gain(42).play()

### Fading in and out, and arbitrary envelopes

The methods 
* fade_in(dur=0.1, curve=1) and
* fade_out(dur=0.1, curve=1)

allow to apply a polynomial fading at begin (_in) or end (_out)
* curve is the exponent to the line from 0 to 1, i.e. 
    * curve=2 is a parabolic curve, etc...
    * curve=0.5 is a sqrt curve, etc...

In [None]:
b = anoise.fade_in(0.4, curve=1).fade_out(0.4, curve=1) # try 1,2,3, 0.5, 0.33, 0.25
b.norm().play(1).plot()

In [None]:
anoise.fade_in(0.00, curve=1).fade_out(0.95, curve=8).play().plot() # snare drum

**envelope(amps, ts=None, curve=1, kind='linear')** 
allows to apply arbitrary linear envelopes
* amps is list or array of amplitude gains
* ts, is set, needs to be corresponding times for values in amp
* curve (as of now) is a polynomial exponent
* kind is either 'linear' or 'exp' (not yet implemented)

In [None]:
anoise.envelope([0,1,0.3,0.6,0]).plot()

In [None]:
plt.subplot(211)
anoise.envelope([0,1,0.5,0.5,0], [0,0.05,0.2,0.6,1]).plot() # adsr
plt.subplot(212)
anoise.adsr(0.05, 0.15, 0.5, 0.4, curve=2).plot(color='r', lw=0.4)

### Resample

**resample(self, target_sr=44100, rate=1, kind='quadratic')**:
* resample signal at sampling rate
* at the same time the playback rate can be modified
    * rate 0.5 (resp. 2) is half (resp. twice) the speed
* use kind to control the kind of interpolation
    * valid are those accepted by scipy.interpolate.interp1d, 
    * ('linear', 'nearest', 'zero', 'slinear', 'quadratic', 'cubic', 'previous', 'next')
    * An integer specifies the order of the spline interpolator to use.
    * samples are seen as time points at which a new reading is taken
        * i.e. the left side of a rectangle in a sample and hold plot
* **Warning**: this is not band-limited. Aliasing will occur when downsampling

In [None]:
print(asnap)
asnap.resample(16000)   # resample signal at sampling rate

In [None]:
a = asnap[1630:1640]
a.plot(marker='o', lw=1, color='b', markersize=6)
a.resample(3*a.sr, kind='linear').plot(marker='.', lw=0.4, color='r')
a.resample(9*a.sr, rate=1, kind=2).plot(marker='.', lw=0.4, color='g');

In [None]:
asnap.play(block=True).resample(8000).play(block=True)  # beware of aliasing

### RMS

rms(axis=0) returns the root-mean-square of the signal
* no window is used
* use axis=1 to compute the rms samplewise over channels
* can be used with window_op() see below to estimate the amplitude envelope of a signal

In [None]:
asnap.rms(axis=0)

In [None]:
# here rms is used in window_op to compute stepwise signal energy, see window_op() below
asnap.plot(lw=0.1)
asnap.window_op(nperseg=512, stride=256, win='cosine', fn='rms', pad='mirror').plot(lw=3)
plt.axis([0,0.4, 0, 0.3]);

### get_duration, get_times

get_duration() returns the duration of the signal in seconds, computed as self.samples/self.sr

In [None]:
asnap.get_duration()

get_times() returns the array of timestamps for all samples, i.e. linspace(0,self.samples-1, self.samples)

In [None]:
Asig([0,1,0,1,0,1,0,1,0.5,0], sr=10).resample(20).get_times()  # try  other resampling rates, e.g. 5, 10, 20, 40

### add

as.add(sig, pos=None, amp=1, onset=None) 
* linearly superimposes signal sig (multiplied with amp) on signal as,
* starting at position pos
* onset overwrites pos, if specified

In [None]:
as1 = Asig(2.0, label='mix')

In [None]:
aev = Asig(np.sin(2*np.pi*256*np.linspace(0,0.5,int(0.5*as1.sr))), label='event').fade_out(0.5, 2).play()

In [None]:
for _ in range(100):
    as1.add(aev.resample(rate=6+2*np.random.randn()), onset=1.5*np.random.random())
as1.norm().plot().play()

### window

window(win='triang', **kwargs)
* applies a window function to the signal 
* the win argument and optional subsequent kwargs are forwarded to the scipy.signal.get_window(), see documentation there
* available functions are:
    * boxcar, triang, blackman, hamming, hann, bartlett, flattop, parzen, bohman, blackmanharris, nuttall, barthann, kaiser (needs beta), gaussian (needs standard deviation), general_gaussian (needs power, width), slepian (needs width), dpss (needs normalized half-bandwidth), chebwin (needs attenuation), exponential (needs decay scale), tukey (needs taper fraction)
* if parameters are needed, use a tuple instead of a string as first argument

In [None]:
anoise.window('hann').plot()
anoise.window(('gaussian', 5000)).gain(db=-6).plot()

### iirfilter

iirfilter(cutoff_freqs, btype='bandpass', ftype='butter', order=4, filter='lfilter', rp=None, rs=None)
* filters the signal with an iirfilter 
* of given ftype = ‘butter’, ‘cheby1’, ‘cheby2’, ‘ellip’, ‘bessel’
* of given btype = 'bandpass', 'bandstop', 'lowpass', 'highpass'
* of given order (integer)
* filtering with given filter method (default 'lfilter' but use 'filtfilt' for forward-backword filtering
* note that some filters require maximum ripple dB in rp and minimum passband attenuation dB in rs
* returns filtered signal as new signal, setting the _ dict keys 'a' and 'b' with filter coefficients

In [None]:
af = anoise.iirfilter([240, 2000], order=4, btype='bandpass', filter='lfilter')
afs = af.to_spec().plot(lambda x: ampdb(x)-46, lw=0.2)  # why -46?
af.plot_freqz(200, lw=3)
plt.ylim(-70,10)
plt.semilogx()  # comment out to see linear frequency
af._

### window_op

window_op(nperseg=64, stride=32, win=None, fn='rms', pad='mirror')
* performs a windowed operation on the signal
* using chunks of nperseg samples
* selected at stride stride
* applying window win to the chunk (any scipy.signal.window possible)
* and subjecting that signal to the function fn (default 'rms')
* TODO: implement proper padding, currently first window start is at 0, not centered at 0...

In [None]:
# here rms is used in window_op to compute stepwise signal energy, see window_op() below
asnap.plot(lw=0.1)
asnap.window_op(nperseg=512, stride=256, win='cosine', fn='rms', pad='mirror').plot(lw=3)
plt.axis([0,0.4, 0, 0.3]);

In [None]:
import scipy

In [None]:
# local linear correlation coefficints as signal - signal statistics audification in 3 line of code
def lk(a):
    return scipy.stats.pearsonr(a.sig, np.arange(a.sig.shape[0]))[0]
aword.window_op(8, 2, None, fn=lk).plot(lw=0.05).play()

### Overlap and add demo

overlap_add(nperseg=64, stride_in=32, stride_out=32, win=None, pad='mirror')
* cuts the signal in chunks of lengths nperseg
* starting at sample 0 with stride stride_in
* applying a window win to the chunks
* and adding them together into an empty signal at stride stride_out
* TODO: padding needs to be implemented...
* by choosing different values for stride_in and stride_out, a granular time stretching can be achieved

In [None]:
atest = aword
def ola_demo(begin=0.0, end=2.0, nperseg=128, stride_in=64, jitter_in=0, 
             stride_out=64, jitter_out=0):
    b = atest.tslice(begin, end).overlap_add(nperseg, stride_in, stride_out, 
                    jitter_in=jitter_in, jitter_out=jitter_out, win='triang')
    b.plot().norm(0.2).play()
interact(ola_demo, nperseg=(64,1024,32), 
         stride_in=(2, 512, 1), jitter_in=(0,200,10), 
         stride_out=(2,512,1), jitter_out=(0,200,10));

### find_events

find_events(self, step_dur=0.001, sil_thr=-20, sil_min_dur=0.1, sil_pad=[0.001,0.1])
* detects events separated by silence
* criterion for event is exceeding the silence threshold sil_thr (in dB)
* ending after silence of at least sil_min_dur seconds is observed
* the resulting event is then padded with signal left and right given by sil_pad (in seconds)
* find_events returns self, but sets its findings into dict self._ in key 'events'
    * which is a ndarray with column 1 all event_start_sample and event_stop_sample in columns

In [None]:
aa = Asig("samples/vocal_sequence.wav").plot() #.play()
# or record your own...
# arec = Asig(record(6.0), label='rec').norm()

In [None]:
aa.plot(lambda x: ampdb(abs(x)+1e-3));
# obviously events exceed -35 dB, and noise is below that level

In [None]:
import time

In [None]:
aa.find_events(step_dur=0.001, sil_thr=-35, sil_min_dur=0.1, sil_pad=[0.001,0.05])
for i, (a,e) in enumerate(aa._['events']):
    aa[a:e].norm().play(block=False)[::20].plot(lambda x: i+0.5*x, lw=0.5, color='r')
    time.sleep(0.2)

In [None]:
# show all event onsets
aa._['events'][:,0]

### select_event

select_event(index=None, onset=None) 
* allows to easily select an event in an audio file
* for that it uses the _['events'] entry as set either manually or via the above find_events() method
* index specifies the number in the list, starting with 0
* the event is sliced from the signal using the begin and end samples
* an onset argument has priority over index and finds the event whose begin is closest to the onset
    * TODO: preferred: the event in which the onset lies should be preferred to the nearest begin...

In [None]:
ae = aa.select_event(4).norm(-6).plot().play(0.8)
ae

In [None]:
aa.select_event(onset=5.2).plot().play()

### spectrum

In [None]:
asnap.spectrum() # computes fft, returns first half

### plot_spectrum

In [None]:
asnap.plot_spectrum(lw=0.5)  #plots spectrum magnitude and phase

### spectrogram

In [None]:
plt.subplot(211); 
a = asnap.norm().plot('db');plt.xlim(0, 1)
freqs, times, S = a.spectrogram(nperseg=512)

plt.subplot(212);
plt.pcolormesh(times, freqs, ampdb(np.abs(S)+1e-10), cmap='hot')
plt.colorbar();

### to_spec

### fun stuff...

In [None]:
#.to_spec().weight([0,1,0.3, 0.1], [800, 1200, 5500, 12000]).to_sig() #.norm().play()
# as3[15000::].tslice(0,0.5).norm().fade_in(0.2).fade_out(0.2).to_spec().weight([0, 1,5,1], [4000, 4001, 9000, 13000]).plot() # to_sig().play(0.5)
# gain(amp=1).plot_spectrum()
# as3[0:7000].resample(rate=0.125).norm().fade_in(0.2, curve=2).fade_out(0.1, curve=4).play()

In [None]:
# aa = Asig(np.random.random(10000)-0.5, 8000)
h = asnap[6000:15000].resample(8000).to_spec().weight([0,1,0.2,0], [100, 1510, 1920, 2990], curve=1)
h.plot() # rfftspec
h.to_sig().norm().gain(0.2).play(1)

## Asig synthesis/sonification examples

In [None]:
sr = 44100
t = np.linspace(0, 1, sr)
v = np.sin(2*np.pi*101*t**1.5)
si = Asig(v, sr, "chirp").envelope([0,1,0], [0,0.05,1], curve=1.9)
# si.window_op(64, 256, fn=lambda a: np.max(a.sig)).norm(0.9).plot()
%time si[::4].window_op(256, 128, fn='rms', win='bartlett').plot()

In [None]:
sr = 8000
t = np.linspace(0, 0.4, int(sr*0.2))
v = np.sin(2*np.pi*200*t**1.1)
si = Asig(v, sr, "chirp").fade_in(0.01).envelope([0,1,0], [0,0.03,0.2], curve=4).plot().play()

In [None]:
son = Asig(np.zeros(5*sr), sr, "sonification")

In [None]:
for i in range(500):
    onset = np.random.randint(0, 4000)/1000
    amp = abs((i-250)/250)
    son.add(si.resample(sr, rate=1+2*np.random.random()), onset=onset, amp=amp)
son.norm().play(1)
son.plot();

## Aspec - Audio Spectrum class

### init

### repr

### plot

### weight

### to_sig

## Astft - Audio STFT class

In [None]:
araw = Asig(record(2), 44100, 'vocal').norm()
a = araw[30000:80000].resample(22050)

In [None]:
a.norm().play()

In [None]:
ast = Astft(a)
ast

In [None]:
ast.plot(ampdb)

In [None]:
nf, nt = ast.stft.shape
ast.stft[0:20,70:170]=0.00001

In [None]:
ast.plot(np.log10);

In [None]:
ast.to_sig().norm(0.8).play()