Skip to content

Commit

Permalink
Merge pull request #2 from jthiem/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
jthiem committed May 8, 2018
2 parents a3e9a31 + 5835e4f commit 85f3072
Show file tree
Hide file tree
Showing 12 changed files with 243 additions and 54 deletions.
6 changes: 2 additions & 4 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,5 @@
stay constant it is better to store the FIR filter in frequency domain, taking
a bit more memory but saving reconverting the time domain impulse into
frequency domain in every invocation.

- Slaney's 1995 gammatone filters should be implemented before I release
this library to 1.0.0


- Document all classes and helper functions!
3 changes: 2 additions & 1 deletion gtfblib/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .ozgtfb import OZGTFB
from .chen import Chen
from .fir import FIR
from .ozgtfb import OZGTFB
from .slaney import Slaney
22 changes: 11 additions & 11 deletions gtfblib/chen.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
#!/usr/local/bin/python3.5
from __future__ import division
import numpy as np
from scipy.signal import lfilter, lfiltic

from .gtfb import gtfb, Hz2ERBnum

class Chen(gtfb):

def __init__(self, N_GD=None, **kwargs):
"""Initialize coefficients for GTFB as in Chen & Hohmann 2015."""
gtfb.__init__(self, **kwargs)

# if N_GD is not set, assume value corresponding to 16 ms
if N_GD is None:
N_GD = np.ceil(0.016*self.fs).astype(int)

# filter coefficient parameter and powers
self.Ak = np.exp(-self._normB + 1j*self._omega)
self.Ak2 = self.Ak**2
self.Ak3 = self.Ak**3
# per-channel gain is based on the difference (in ERB) to the
# next filter. This does not work for the last filter, so we
# next filter. This does not work for the last filter, so we
# duplicate it
ERBstep = np.diff(Hz2ERBnum(self.cfs))
ERBstep = np.hstack((ERBstep, ERBstep[-1]))
Expand All @@ -30,7 +30,7 @@ def __init__(self, N_GD=None, **kwargs):
self.Ck = np.exp(-1j*self._omega*np.fmin(N_GD, N_PE))
# group delay
self.Dk = np.fmax(0, N_GD-N_PE).astype(int)

# actual coefficients
# self.ComplexA = [1 -4*C2(i) 6*C2(i)^2 -4*C2(i)^3 C2(i)^4]
# self.ComplexB = normalizedGain(i)*exp(compensatedPhase(i))*...
Expand All @@ -40,18 +40,18 @@ def __init__(self, N_GD=None, **kwargs):
self.ComplexA = np.zeros((self.nfilt, 5), dtype=np.complex128)
self.ComplexB = np.zeros((self.nfilt, 4+N_GD), dtype=np.complex128)
for n in range(self.nfilt):
self.ComplexA[n, :] = [1, -4*self.Ak[n], 6*self.Ak2[n],
self.ComplexA[n, :] = [1, -4*self.Ak[n], 6*self.Ak2[n],
-4*self.Ak3[n], self.Ak[n]**4]
self.ComplexB[n, self.Dk[n]+1:self.Dk[n]+4] = self.Bk[n] \
* self.Ck[n] * np.array([self.Ak[n], 4*self.Ak2[n], self.Ak3[n]])

self._clear()

def _clear(self):
"""clear initial conditions"""
self._ics = np.vstack([lfiltic(self.ComplexB[n, :],
self._ics = np.vstack([lfiltic(self.ComplexB[n, :],
self.ComplexA[n, :], np.complex128([])) for n in range(self.nfilt)])

def process(self, insig):
"""Process one-dimensional signal, returning an array of signals
which are the outputs of the filters"""
Expand Down
20 changes: 10 additions & 10 deletions gtfblib/fir.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/local/bin/python3.5
from __future__ import division
import numpy as np

from .gtfb import gtfb
Expand Down Expand Up @@ -29,20 +29,20 @@ def __init__(self, complexresponse=False, L=None, reversetime=False,
t = np.arange(1, L+1)/self.fs
edelay = np.zeros(self.nfilt)
if groupdelay is None:
groupdelay = int(np.ceil(self.fs*3/(2*np.pi*self.ERB[0])))
groupdelay = int(np.ceil(3/self._normB[0]))
# print('Group delay set to', groupdelay)
if groupdelay>0:
t = np.arange(-groupdelay+1, L-groupdelay+1)/self.fs
edelay = 3/(2*np.pi*self.ERB)
edelay = 3/(self._normB*self.fs)
self.groupdelay = groupdelay

# compute impulse responses
self.ir = np.zeros((L, self.cfs.shape[0]), dtype=np.complex128)
self.ir = np.zeros((self.nfilt, L), dtype=np.complex128)
for n, cf in enumerate(self.cfs):
env = ((2*np.pi*self.ERB[n])**4)/6 * (t+edelay[n])**3 \
* np.exp(-2*np.pi*self.ERB[n]*(t+edelay[n]))
env = ((self._normB[n]*self.fs)**4)/6 * (t+edelay[n])**3 \
* np.exp(-self._normB[n]*self.fs*(t+edelay[n]))
env[env<0] = 0
self.ir[:, n] = env * np.exp(2*np.pi*1j*self.cfs[n]*t)
self.ir[n, :] = env * np.exp(2*np.pi*1j*self.cfs[n]*t)

# if real result is wanted, convert now
if not complexresponse:
Expand All @@ -60,11 +60,11 @@ def _clear(self):
for n in range(self.nfilt)]

def process(self, insig):
out = np.zeros((insig.shape[0], self.nfilt), dtype=self.ir.dtype)
out = np.zeros((self.nfilt, insig.shape[0]), dtype=self.ir.dtype)
for n in range(self.nfilt):
out[:, n], self._memory[n] = olafilt(self.ir[:, n], insig,
out[n, :], self._memory[n] = olafilt(self.ir[n, :], insig,
self._memory[n])
return out

def process_single(self, insig, n):
return olafilt(self.ir[:, n], insig)
return olafilt(self.ir[n, :], insig)
13 changes: 6 additions & 7 deletions gtfblib/gtfb.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from __future__ import division
from __future__ import division
import numpy as np

def Hz2ERBnum(Hz):
"""Return the ERB filter number for a given frequency."""
"""Return the ERB filter number for a given frequency."""
return 21.4*np.log10(Hz*4.37e-3 + 1.0)

def ERBnum2Hz(ERB):
Expand All @@ -23,18 +23,17 @@ def ERBspacing_given_spacing(cf_first, cf_last, ERBstep):

class gtfb:
"""Superclass for gammatone filterbank objects."""
def __init__(self, fs=16000, cfs=None):

def __init__(self, fs=16000, cfs=None, EarQ=(1/0.108), Bfact=1.0186):

if cfs is None:
cfs = ERBspacing_given_N(80, 0.9*fs/2, 32)

self.cfs = cfs
self.ERB = 1.0186*(0.108*cfs+24.7)
self.ERB = cfs/EarQ+24.7
self.nfilt = cfs.shape[0]
self.fs = fs

# shortcuts used by derived methods
self._omega = 2*np.pi*cfs/fs
self._normB = 2*np.pi*self.ERB/fs

self._normB = 2*np.pi*Bfact*self.ERB/fs
1 change: 1 addition & 0 deletions gtfblib/olafilt.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Overlap-add FIR filter, (c) Joachim Thiemann 2016
# also available on its own from https://github.com/jthiem/overlapadd
#
from __future__ import division
import numpy as np

def olafilt(b, x, zi=None):
Expand Down
9 changes: 4 additions & 5 deletions gtfblib/ozgtfb.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/local/bin/python3.5
from __future__ import division
import numpy as np
from scipy.signal import lfilter, lfiltic

Expand All @@ -9,20 +9,20 @@ class OZGTFB(gtfb):
def __init__(self, **kwargs):
"""Initialize OZGTFB coefficients."""
gtfb.__init__(self, **kwargs)

# calculate filter coefficients
z = np.exp(1j * self._omega);
# feedback coefs
self._feedback = np.ones((self.nfilt, 3))
self._feedback[:,1] = -2*np.cos(self._omega)*np.exp(-self._normB)
self._feedback[:,2] = np.exp(-2*self._normB)

# per-channel gain
tSG = np.abs(1 - 2*np.cos(self._omega) * z *
np.exp(-self._normB) + np.exp(-2*self._normB) * z**2)
fG = np.abs(tSG/(1-z))
self._gain = fG*tSG**3

# set initial conditions
self._clear()

Expand Down Expand Up @@ -61,4 +61,3 @@ def process_single(self, insig, n):
stage3out = lfilter([1,], self._feedback[n,:], stage2out)
stage4out = lfilter([1,], self._feedback[n,:], stage3out)
return self._gain[n]*stage4out

71 changes: 71 additions & 0 deletions gtfblib/slaney.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from __future__ import division
import numpy as np
from numpy import cos, exp, pi, sin, sqrt
from scipy.signal import lfilter, lfiltic

from .gtfb import gtfb

class Slaney(gtfb):

def __init__(self, EarQ=9.26449, Bfact=1.019, **kwargs):
"""Initialize coefficients for GTFB as in Slaney 1993."""
gtfb.__init__(self, EarQ=EarQ, Bfact=Bfact, **kwargs)
T = 1/self.fs
B = self._normB*self.fs
cf = self.cfs
# the following copied almost verbatim from Slaneys Apple TR #35
gain = np.abs((-2*exp(4j*cf*pi*T)*T +
2*exp(-(B*T) + 2j*cf*pi*T)*T *
(cos(2*cf*pi*T) - sqrt(3 - 2**(3/2))*
sin(2*cf*pi*T))) *
(-2*exp(4j*cf*pi*T)*T +
2*exp(-(B*T) + 2j*cf*pi*T)*T *
(cos(2*cf*pi*T) + sqrt(3 - 2**(3/2)) *
sin(2*cf*pi*T))) *
(-2*exp(4j*cf*pi*T)*T +
2*exp(-(B*T) + 2j*cf*pi*T) * T *
(cos(2*cf*pi*T) -
sqrt(3 + 2**(3/2))*sin(2*cf*pi*T))) *
(-2*exp(4j*cf*pi*T)*T+2*exp(-(B*T) + 2j*cf*pi*T) * T *
(cos(2*cf*pi*T) + sqrt(3 + 2**(3/2))*sin(2*cf*pi*T))) /
(-2 / exp(2*B*T) - 2*exp(4j*cf*pi*T) +
2*(1 + exp(4j*cf*pi*T)) / exp(B*T))**4)
self.feedback = np.zeros((self.nfilt, 9))
self.forward = np.zeros((self.nfilt, 5))
self.forward[:,0] = T**4 / gain
self.forward[:,1] = -4*T**4*cos(2*cf*pi*T)/exp(B*T)/gain
self.forward[:,2] = 6*T**4*cos(4*cf*pi*T)/exp(2*B*T)/gain
self.forward[:,3] = -4*T**4*cos(6*cf*pi*T)/exp(3*B*T)/gain
self.forward[:,4] = T**4*cos(8*cf*pi*T)/exp(4*B*T)/gain
self.feedback[:,0] = np.ones((self.nfilt,))
self.feedback[:,1] = -8*cos(2*cf*pi*T)/exp(B*T)
self.feedback[:,2] = 4*(4 + 3*cos(4*cf*pi*T))/exp(2*B*T)
self.feedback[:,3] = -8*(6*cos(2*cf*pi*T) + cos(6*cf*pi*T))/exp(3*B*T)
self.feedback[:,4] = 2*(18 + 16*cos(4*cf*pi*T) + cos(8*cf*pi*T))/exp(4*B*T)
self.feedback[:,5] = -8*(6*cos(2*cf*pi*T) + cos(6*cf*pi*T))/exp(5*B*T)
self.feedback[:,6] = 4*(4 + 3*cos(4*cf*pi*T))/exp(6*B*T)
self.feedback[:,7] = -8*cos(2*cf*pi*T)/exp(7*B*T)
self.feedback[:,8] = exp(-8*B*T)
# for testing and debugging
self._gain = gain
# last task: initialize memory
self._clear()

def _clear(self):
"""clear initial conditions"""
self._ics = np.vstack([lfiltic(self.forward[n, :],
self.feedback[n, :], np.zeros((1))) for n in range(self.nfilt)])

def process(self, insig):
"""Process one-dimensional signal, returning an array of signals
which are the outputs of the filters"""
out = np.zeros((self.nfilt, insig.shape[0]))
for n in range(self.nfilt):
out[n, :], self._ics[n, :] = lfilter(self.forward[n, :],
self.feedback[n, :], insig, zi=self._ics[n, :])
return out

def process_single(self, insig, n):
"""Process a signal with just one of the filters of the
filterbank, without affecting the state."""
return lfilter(self.forward[n, :], self.feedback[n, :], insig)
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from setuptools import setup

setup(name='gtfblib',
version='0.2.0',
version='0.3.0',
description='A selection of Gammatone Filterbanks',
url='http://github.com/jthiem/gtfblib',
author='Joachim Thiemann',
Expand Down
Binary file added test/Slaney16kTestdata.mat
Binary file not shown.
69 changes: 54 additions & 15 deletions test/test_FIR.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
from gtfblib.gtfb import ERBnum2Hz
from gtfblib.fir import FIR

#from test_aux_functions import peak_error
def peak_error(a, b):
assert(a.shape==b.shape)
return np.max(np.abs(a-b))
from test_aux_functions import peak_error

# The FIR filterbank is designed to replicate the FB I used for my
# Thesis work, which can be found at
Expand All @@ -29,36 +26,78 @@ def peak_error(a, b):
# warning: some arrays are transposed depending on if you are using
# MATLAB or Octave

fs = 16000
insig = loadmat('test/Inputdata.mat', squeeze_me=True)['indata16k']
Fdata = loadmat('test/FIR16kTestdata.mat', squeeze_me=True)['FD']
refout = loadmat('test/FIR16kTestdata.mat', squeeze_me=True)['y']
fbFIR = FIR(fs=fs, cfs=ERBnum2Hz(np.arange(1, 32.1, .5)), complexresponse=True)

def test_FIR_ERB():
assert(peak_error(fbFIR.ERB, Fdata['b'][()])<1e-10)
# check if the filter bandwidths are calculated the same way
# as the MATLAB reference
Fdata = loadmat('test/FIR16kTestdata.mat', squeeze_me=True)['FD']
fbFIR = FIR(fs=16000, cfs=ERBnum2Hz(np.arange(1, 32.1, .5)),
complexresponse=True)
assert(peak_error(1.0186*fbFIR.ERB, Fdata['b'][()])<1e-10)

def test_FIR_ir():
assert(peak_error(fbFIR.ir, Fdata['G'][()].T.conj())<1e-10)
# check if the impulse response matches the MATLAB version
Fdata = loadmat('test/FIR16kTestdata.mat', squeeze_me=True)['FD']
fbFIR = FIR(fs=16000, cfs=ERBnum2Hz(np.arange(1, 32.1, .5)),
complexresponse=True)

assert(peak_error(fbFIR.ir, Fdata['G'][()].conj())<1e-10)

def test_FIR_process():
fbFIR._clear()
# check if filtering works correctly
insig = loadmat('test/Inputdata.mat', squeeze_me=True)['indata16k']

fbFIR = FIR(fs=16000, cfs=ERBnum2Hz(np.arange(1, 32.1, .5)),
complexresponse=True)
outsig = fbFIR.process(insig)
refout = loadmat('test/FIR16kTestdata.mat', squeeze_me=True)['y'].T

assert(peak_error(outsig, refout)<1e-10)

def test_FIR_process_single():
# check if a single channel is processed correctly
insig = loadmat('test/Inputdata.mat', squeeze_me=True)['indata16k']

fbFIR = FIR(fs=16000, cfs=ERBnum2Hz(np.arange(1, 32.1, .5)),
complexresponse=True)
refout = loadmat('test/FIR16kTestdata.mat', squeeze_me=True)['y']
outsig = fbFIR.process_single(insig, 10)

assert(peak_error(outsig, refout[:, 10])<1e-10)

def test_FIR_process_memory():
fbFIR._clear()
# check if processing a whole is the same as processing 2 chunks
insig = loadmat('test/Inputdata.mat', squeeze_me=True)['indata16k']

fbFIR = FIR(fs=16000, cfs=ERBnum2Hz(np.arange(1, 32.1, .5)),
complexresponse=True)
refout = fbFIR.process(insig)

fbFIR = FIR(fs=16000, cfs=ERBnum2Hz(np.arange(1, 32.1, .5)),
complexresponse=True)
outsig1 = fbFIR.process(insig[:800])
outsig2 = fbFIR.process(insig[800:])
outsig = np.vstack((outsig1, outsig2))
outsig = np.hstack((outsig1, outsig2))

assert(peak_error(outsig, refout)<1e-10)

def test_FIR_clear():
# make sure that if _clear() is called result is same as a brand
# new filterbank
insig = loadmat('test/Inputdata.mat', squeeze_me=True)['indata16k']

fbFIR = FIR(fs=16000, cfs=ERBnum2Hz(np.arange(1, 32.1, .5)),
complexresponse=True)
refout = fbFIR.process(insig)

fbFIR = FIR(fs=16000, cfs=ERBnum2Hz(np.arange(1, 32.1, .5)),
complexresponse=True)
_ = fbFIR.process(np.random.randn(1000))
fbFIR._clear()
outsig = fbFIR.process(insig)
assert(peak_error(outsig, refout)<1e-10)

def test_FIR_reversetime():
# check if the reverse time option works as expected
fbFIR1 = FIR()
fbFIR2 = FIR(reversetime=True)
assert(all(f1.ir[10, ::-1]==f2.ir[10,:]))

0 comments on commit 85f3072

Please sign in to comment.