<div align="right"><i>COM418 - Computers and Music</i></div>
<div align="right"><a href="https://people.epfl.ch/paolo.prandoni">Paolo Prandoni</a> and <a href="https://people.epfl.ch/lucie.perrotta">Lucie Perrotta</a>, <a href="https://www.epfl.ch/labs/lcav/">LCAV, EPFL</a></div>

<p style="font-size: 30pt; font-weight: bold; color: #B51F1F;">Pitch shifting and time stretching</p>

In [None]:
%matplotlib inline
import ipywidgets as widgets
import matplotlib.pyplot as plt
import numpy as np
import scipy.signal as sp
from IPython.display import Audio
from scipy.io import wavfile

plt.rcParams['figure.figsize'] = 14, 4 
plt.rcParams['image.cmap'] = 'tab10'

In [None]:
def load_audio(filename):
    x_sf, x = wavfile.read(filename)
    x = (x - np.mean(x)) / 32767.0
    return x, x_sf

def multiplay(clips, sf, title=None):
    outs = [widgets.Output() for c in clips]
    for ix, item in enumerate(clips):
        with outs[ix]:
            print(title[ix] if title is not None else "")
            display(Audio(item, rate=sf, normalize=True))
    return widgets.HBox(outs)

In [None]:
def plot_spec(x, Fs, max_freq=None, do_fft=True):
    C = int(len(x) / 2)  # positive frequencies only
    if max_freq:
        C = int(C * max_freq / float(Fs) * 2) 
    X = np.abs(np.fft.fft(x)[0:C]) if do_fft else x[0:C]
    N = Fs * np.arange(0, C) / len(x);
    plt.plot(N, X)
    return N, X

In [None]:
ss, ssf = load_audio('snd/speech.wav')
ms, msf = load_audio('snd/music.wav')
cs, csf = load_audio('snd/cymbal.wav')
ks, ksf = load_audio('snd/clarinet.wav')
ys, ysf = load_audio('snd/yesterday.wav')
assert ssf == msf == csf == ksf == ysf, 'notebook requires sampling rates for all audio samples to be the same'

# The initial tradeoffs

## Speed, pitch, timbre

 * the speed of an audio signal is related to the time interval between events, eg $ x_\mathrm{start} = x(t_0), x_\mathrm{stop} = x(t_1), \Delta_t = t_1 - t_0 $
 * the sensation of _pitch_ is related to the (local) periodicity of a signal: if $ x(t) \approx x(t + nP) $, pitch "frequency" is $F = 1/P$
 * the _timbre_ is related to the spectrum $ X(f) $
 
Ideal scenario: being able to change pitch and speed independently without changing timbre.

## Time scaling

Changing the time scale is a linear operation: $x'(t) = x(t / \alpha)$

Changing the time scale affects everything at once:
 * speed is scaled by $\alpha$ as $\Delta'_t = \alpha \Delta_t $: faster if $\alpha < 1$, slower otherwise
 * period is changed as $P' = \alpha P$ and pitch as $F' = F/\alpha$: higher pitch if $\alpha < 1$, lower otherwise
 * timbre is changed as $X'(f) = X(\alpha f)$ so that if $\alpha < 1$ frequencies are stretched (chipmunk) otherwise they are contracted (Darth Vader)

### Analog time scaling

<img width="400" style="" src="img/turntable.jpg">

### Digital time scaling

#### Method #1
<img width="600" style="float:right;" src="img/multirate.png">

 * $\alpha = N/M$
 * upsample by $N$
 * downsample by $M$
 
can get needlessly expensive for large values of $M, N$

#### Method #2

Fractional resampling: for each output index $m$:

 * compute the closest input index $n = \lfloor \alpha m \rfloor$
 * linearly interpolate between $x[n]$ and $x[n+1]$ at $\tau = \alpha m - n$

In [None]:
def resample(x, alpha):
    # length of the output signal after resampling
    n_out = int(np.floor(len(x) * alpha))
    y = np.zeros(n_out)
    for iy in range(0, n_out - 1):
        t = iy / alpha 
        ix = int(t)
        y[iy] = (1 - (t - ix)) * x[ix] + (t - ix) * x[ix + 1] 
    return y

In [None]:
multiplay([ss, resample(ss, 0.6), resample(ss, 1.5)], ssf, title=['speech sample', 'sped up via resampling', 'slowed down via resampling'])

In [None]:
multiplay([ms, resample(ms, 0.6), resample(ms, 1.5)], msf, title=['music sample', 'sped up via resampling', 'slowed down via resampling'])

### Issues with time scaling:

 * if used to change duration: pitch and timbre also affected
 * if used to change pitch: duration and timbre also affected
 * cannot be implemented in real time

## Frequency shifting

### Naive upmodulation
Remember the modulation theorem: $\mathrm{FT}\{x(t)\cos(2\pi f_0 t)\} = [X(f - f_0) + X(f + f_0)]/2$

In [None]:
def pitchshift_mod(x, f, sf):
    return x * np.cos(2 * np.pi * f / sf * np.arange(0, len(x)))

In [None]:
multiplay([ss, pitchshift_mod(ss, 400, ssf)], ssf, title=['speech sample', 'shifted up (simple modulation)'])

<img width="250" style="float: left; margin: 0px 120px 0 0;" src="img/voice_changer.jpg">

Good enough for a prank call but:

 * audio is baseband, so we can only go "up"
 * shift is smaller than effective bandwidth: aliasing
 * inaudible low frequencies are brought into hearing range (warbling)

### Better spectral shifting

<img width="600" style="float: right;" src="img/ssb.png">

Take a page out of SSB (single-sideband modulation)

 * bandpass filter the audio to eliminate low frequencies and limit bandwidth
 * if shifting down, eliminate low frequencies below modulation frequency
 * compute analytic signal using a FIR approximation to the Hilbert filter
 * shift using complex exponential
 * take real part


In [None]:
def pitchshift_ssb(x, f, sf, L=40):
    # bandpass the voice signal
    f0 = 100.0 if f > 0 else 100.0 - f
    x = sp.lfilter(*sp.butter(6, [f0, min(10000, 0.9 * sf/2)], fs=sf, btype='band'), x)
    # compute analytic signal
    h = sp.remez(2 * L + 1, [0.02, 0.48], [-1], type='hilbert')
    xa = 1j * sp.lfilter(h, 1, x)[L:] + x[:-L]
    # shift and take real part
    return np.real(xa * np.exp(1j * 2 * np.pi * f / sf * np.arange(0, len(xa))))

In [None]:
multiplay([ss, pitchshift_ssb(ss, 400, ssf), pitchshift_ssb(ss, -400, ssf)], ssf, title=['speech sample', 'shifted up (SSB)', 'shifted down (SSB)'])

In [None]:
multiplay([ss, pitchshift_mod(ss, 400, ssf), pitchshift_ssb(ss, 400, ssf)], ssf, title=['speech sample', 'simple up-modulation', 'ssb up-modulation'])

In [None]:
%%capture spectra
for n, x in enumerate([ss, pitchshift_mod(ss, 400, ssf), pitchshift_ssb(ss, 400, ssf)]):
    plt.subplot(3, 1, n+1)
    plot_spec(x, ssf, 2000)
    if n > 0:
        plt.axvline(400, color='red')

 * spectrum of modulated signal shows lots of spurious content in low frequency region
 * modulated signal has aliasing (see the two peaks at $400 \pm 150$Hz
 * SSB-modulated signal is much better

In [None]:
spectra.show()

### _The_ issue with spectral shifting

Good things so far: 
 * can be implemented in real time
 * preserves the speed of the original audio
 
But:

In [None]:
multiplay([ms, pitchshift_ssb(ms, 400, ssf), pitchshift_ssb(ms, -400, ssf)], ssf, title=['music sample', 'shifted up (SSB)', 'shifted down (SSB)'])

Pitch perception requires a _harmonic_ spectral structure, with spectral lines at multiples of a fundamental frequency:

$$
    f_n = nf_1, n = 1, 2, 3, \ldots \Rightarrow \frac{f_1}{f_n} = \frac{1}{n}
$$

Spectral shifting breaks the harmonic structure:

$$
    f'_n = \Delta f + nf_1 \Rightarrow \frac{f'_1}{f'_n} = \frac{\Delta f + f_1}{\Delta f + nf_1}
$$

# Granular Synthesis
<img width="500" style="float: right;" src="img/gsplot.jpg">

In [Granular Synthesis](https://en.wikipedia.org/wiki/Granular_synthesis) complex waveforms are be built by stitching together very short sound snippets called "grains". 
 
 * used as a compositional tool to generate complex timbres at arbitrary pitches
 * each grain must be "pitched"
 * works well for non-pitched sounds too
 * lots of sophisticated variations exist to maximize output quality



Helper function to convert milliseconds to samples

In [None]:
def ms2n(ms, sf):
    return int(float(sf) * float(ms) / 1000.0)

## Time stretching via granular synthesis 

### Proof of concept: grain repetition

 * split signal into small grain
 * repeat each grain two or more times

In [None]:
def gs_poc(x, grain_size, M=2):
    y = np.zeros(M * (len(x) + 1))
    for n in range(0, len(x) - grain_size, grain_size):
        y[M * n : M * (n + grain_size)] = np.tile(x[n : n + grain_size], M)
    return y

In [None]:
grain_size = ms2n(30, ssf)
multiplay([gs_poc(ss, grain_size), gs_poc(ms, grain_size)], ssf, title=['slowed down speech', 'slowed down music'])   

It works, although the simple tiling of the grains creates discontinuities at the boundaries that result in _blocking artefacts_ (the clicking noise). We need to fix that.

### Crossfading grains

First ingredient to mitigate artefacts: overlap and *crossfade* the grains via a a *tapering* window. 

 * windows should fade to zero at both ends
 * grains should be overlapped so that the sum of windows is one

The following function returns a simple tapering window with linear crossfading at the edges
 
 * the overlap parameter $0 \le a \le 1$ determines the *total* amount of taper (left and right)
 * the function also returns a *stride* value $S$ in samples, i.e. the amount of shift for correct overlap

In [None]:
def tapering_window(N, overlap):
    R = int(N * overlap / 2)
    r = np.arange(0, R) / float(R)
    win = np.r_[r, np.ones(N - 2*R), r[::-1]]
    stride = N - R - 1 if R > 0 else N
    return win, stride

In [None]:
def test_overlap(win, stride, size=None, N = 3):
    size = size if size else len(win)
    y = np.zeros((N - 1) * stride + size)
    for n in range(0, N):
        plt.plot(np.arange(n * stride, n * stride + size), win, 'C0')
        y[n*stride:n*stride+size] += win
    plt.plot(y, 'C2', linewidth=8, alpha=0.3)
    plt.gca().set_ylim([0, 1.1]);

In [None]:
test_overlap(*tapering_window(100, 0.4))

### Basic pitch-preserving time stretching

<img width="800" style="float: right;" src="img/ola.png">

For a stretch factor $\alpha$: 

 * choose grain size (around 40 ms for speech, 100 ms for music) 
 * choose overlap (from 40% to 100%) 
 * compute the output stride $S$ (in samples)
 * iterate:
     * get an input grain at sample $k \lfloor S / \alpha \rfloor$  
     * blend the grain in the output signal at sample $kS$

In [None]:
def timescale_gs(x, alpha, grain_size, overlap=0.4):
    win, stride = tapering_window(grain_size, overlap)
    in_hop, out_hop = int(stride / alpha), stride
    y = np.zeros(int(alpha * len(x)))
    ix, iy = 0, 0
    while ix < len(x) - grain_size and iy < len(y) - grain_size:
        y[iy:iy+grain_size] += x[ix:ix+grain_size] * win
        iy += out_hop
        ix += in_hop
    return y

In [None]:
grain_size = ms2n(40, ssf)
multiplay([ss, timescale_gs(ss, 0.8, grain_size), timescale_gs(ss, 1.5, grain_size)], ssf, title=['speech sample', 'sped up (GS)', 'slowed down (GS)'])

In [None]:
grain_size = ms2n(100, msf)
multiplay([ms, timescale_gs(ms, 0.8, grain_size), timescale_gs(ms, 1.5, grain_size)], ssf, title=['music sample', 'sped up (GS)', 'slowed down (GS)'])

Remarks so far:
  * time stretching works best to speed up speech
  * slowed down speech still has clicking artefacts
  * music is OK but the lower pitches have significant detuning
  
There seems to be a problem with low frequencies and still significant artefacts. We'll talk more about those later.

## Pitch shifting via granular synthesis

Idea:
 * use GS to stretch time by a factor $\alpha$ without affecting pitch
 * use time scaling (i.e. resampling) by a factor $1/\alpha$ to bring original time back and change pitch as a side effect

### Basic implementation

In two passes, non real-time:

In [None]:
def pitchshift_gs(x, alpha, grain_size, overlap=0.4):
    return(resample(timescale_gs(x, alpha, grain_size, overlap), 1 / alpha))

In [None]:
semitone = 2 ** (1.0 / 12)
alpha, grain_size = semitone ** 2, ms2n(100, ssf)
multiplay([ms, pitchshift_gs(ms, alpha, grain_size), pitchshift_gs(ms, 1 / alpha, grain_size)], ssf, title=['music sample', 'up two semitones (GS)', 'down two semitones (GS)'])

### Real-time implementation

Time stretching and resampling can be combined by synthesizing the output via *resampled grains*. 

Observaton: for a simple resampler, the (non-integer) input time index for an output sample index $n$ is $t = n/\alpha$. Graphically:

In [None]:
%%capture ramps
n = np.arange(0, 100)
for ix, alpha in enumerate([1, 1.6, 0.6]):
    plt.subplot(1, 3, ix+1)
    plt.plot(resample(n, alpha)[:-1])
    plt.gca().set_xlim([0, 100])
    plt.xlabel('output index')    
    plt.ylabel('input index')
    plt.title(rf'$\alpha$ = {alpha}')
plt.subplots_adjust(wspace=.3)

In [None]:
ramps.show()

Idea: use granular synthesis but directly resample each input grain before using it in the output:

 * for a nomimal output grain size we will need to use a larger or smaller input chunk
 * the process goes through the input signal in a zig-zag patterns
 * since we are not changing the time scale, the input and output indices are aligned at the beginning of each grain

(note that we use no overlap here for clarity)

In [None]:
def gs_map(n, alpha, grain_size):
    # computes the fractional index inside an input grain for a given output index
    # start sample for input grain
    t = np.floor(n / grain_size) * grain_size
    # fractional index in input grain
    t += (n - t) * alpha
    return t

In [None]:
%%capture ramps
n = np.arange(0, 100)
for ix, alpha in enumerate([1, 1.6, 0.6]):
    plt.subplot(1, 3, ix+1)
    plt.plot(gs_map(n, alpha, 15))
    plt.gca().set_xlim([0, 100])
    plt.gca().set_ylim([0, 100])
    plt.title(rf'$\alpha$ = {alpha}')
    plt.xlabel('output index')    
    plt.ylabel('input index')

In [None]:
ramps.show()

More in detail, in granular pitch shifting we generate output grains by performing fractional resampling on fixed-length input _chunks_; the start times for an input chunk and the corresponding output grain are synchronized but the number of input samples needed to produce the output grain will be larger or smaller than the size of the output grain according to whether we're raising or lowering the pitch. For instance, if the grain size is 100 samples and we are increasing the output frequency by 20%, we will need to use input chunks that are at least 120 samples long. To preserve the synchronicity of input and output, at the end of a grain generation process we will need to jump back in the input data by 20 samples. This way, the maximum buffer delay to produce a grain will be at most than $\alpha G$ samples, where $G$ is the size of the grain and $\alpha$ is the resampling factor. 

Here is a real-time granular pitch shifter using overlapping grains with a tapering window:

In [None]:
def pitchshift_gs_rt(x, alpha, grain_size, overlap=0.4):
    win, stride = tapering_window(grain_size, overlap)
    # resampling needs these many input samples to produce an output grain of the chosen size
    chunk_size = int(np.floor(grain_size + 1) * alpha)
    y = np.zeros(len(x))
    # input chunks and output grains are always aligned in pitch shifting (in_hop = out_hop = stride)
    for n in range(0, len(x) - max(chunk_size, grain_size), stride):
        y[n:n+grain_size] += resample(x[n:n+chunk_size], 1 / alpha) * win
    return y

In [None]:
grain_size = ms2n(100, ssf)
multiplay([ms, pitchshift_gs_rt(ms, semitone ** 2, grain_size), pitchshift_gs_rt(ms, semitone ** (-2), grain_size)], ssf, title=['music sample', 'up two semitones (GS-RT)', 'down two semitones (GS-RT)'])

### Microcontroller implementation
<img width="300" style="float: right;" src="img/microcontroller.gif">

[This gitbook](https://hwlab.learndsp.org/) shows how to implement a granular synthesis pitch shifter on a microcontroller:
 * very low memory 
 * all signal processing must be done in fixed point for efficiency

### Before DSP...

<img width="600" style="" src="img/pitchshift.jpg"> 

Although we have just described a purely digital version of grain-based pitch shifting, it is interesting to remark that, before digital audio was a reality, the only true pitch-shifting devices available to the music industry were extremely complex (and costly) mechanical devices that implemented, in analog, the same principle behind granular synthesis. 

Above is the block diagram of such a contraption: the original sound is recorded on the main tape spool, which is run at a speed that can vary with respect to the nominal recording speed to raise or lower the pitch. To compensate for these changes in speed the tape head is actually a rotating disk containing four individual coils; at any given time, at least two neighboring coils are picking up the signal from the tape, with an automatic fade-in and fade-out as they approach and leave the tape. The head disk rotates at a speed that compensates for the change in speed of the main tape, therefore keeping the timebase constant. The coils on the head disk picking up the signal are in fact producing overlapping "grains" that are mixed together in the output signal.  

# The phase vocoder

Simple granular synthesis has still a lot of quality issues. Let's explore (and try to solve) the key problem.

## The problem with "simple" granular synthesis

Granular synthesis seems to work well on non pitched sounds but not so well on sustained periodic sounds

In [None]:
grain_size = ms2n(15, ssf)
multiplay([cs, timescale_gs(cs, 1.5, grain_size)], ssf, title=['cymbal', 'stretched'])   

In [None]:
grain_size = ms2n(100, ssf)
multiplay([ks, timescale_gs(ks, 1.5, grain_size)], ssf, title=['clarinet', 'stretched'])   

It will be easier to investigate the problem if we use a simple sinusoid

In [None]:
test_sin = np.sin(2 * np.pi * (110 / ssf) * np.arange(0, ssf))
plt.plot(test_sin[:2000]);
Audio(test_sin, rate=ssf)

In [None]:
test_grain_size = 500
test_sin_scaled = timescale_gs(test_sin, 2, test_grain_size)
plt.plot(test_sin_scaled[1000:3000])
Audio(test_sin_scaled, rate=ssf)

The reason for the strange waveform becomes evident if we set the gain overlap to zero: phase jumps!

In [None]:
plt.plot(timescale_gs(test_sin, 2, test_grain_size, overlap=0)[0:2000])
for n in range(test_grain_size, 2000, test_grain_size):
    plt.axvline(n, color='red', alpha=0.4)

For a single sinusoid the solution is easy:
 * compute the instantaneous final phase of the $n$-th grain $\varphi_1(n)$ at the point of overlap
 * set the initial phase $\varphi_0(n+1)$ of the next sinusoidal grain equal to $\varphi_1(n)$
 
But music is more than a simple sinusoid...

## Phase adjustment in time stretching

Several approaches for real-world signals; the most well-known is the phase vocoder algorithm:
 
 * compute the DFT of the input grain (i.e. decompose the grain into a set of harmonic sinusoids)
 * change its phase so that it aligns to the phase of the previous grain at the point of overlap
 * rebuild the grain via an inverse DFT

### Example

Consider two segments of an audio signal and their superposition:

In [None]:
L, A, B = 2048, 60000, 61500
x1, x2 = ms[A:A+L], ms[B:B+L]
plt.plot(x1)
plt.plot(x2);

It is clear that they are relatively out of phase and that they will not blend well.

Consider now the same segments, but the phase of the second one has been set to that of the first: notice how the maxima and minima tend to align

In [None]:
plt.plot(x1)
plt.plot(np.real(np.fft.ifft(np.abs(np.fft.fft(x2)) * 1j * np.angle(np.fft.fft(x1)))));

### Phase propagation

Phase continuity is achieved via a time-varying a _global_ phase vector $\boldsymbol{\varphi}_n$ like so:
 * the phase of the current input grain $\boldsymbol{\theta}_0$ is computed
 * the phase $\boldsymbol{\theta}_S$ of a grain-sized input chunk shifted by $S$ samples is computed (i.e. the expected phase at the point of overlap)
 * the current grain is resynthesized using the global phase vector
 * the global phase vector is updates as  $\boldsymbol{\varphi}_{n+1} = \boldsymbol{\varphi}_n + (\boldsymbol{\theta}_S - \boldsymbol{\theta}_0)$

### The Hanning window

Phase estimation is numerically delicate. To minimize errors a Hanning window with 100% overlap is used both in the DFT computations and to blend grains together

In [None]:
def hanning_window(size):
    # make sure size is odd
    stride = size // 2
    size = 2 * stride + 1
    win = np.hanning(size)
    return win, stride, size

In [None]:
test_overlap(*hanning_window(100))

### The final algorithm

In [None]:
def timescale_gs_pv(x, alpha, grain_size):
    # we will use an odd-length Hanning window with 100% overlap
    win, stride, grain_size = hanning_window(grain_size)
    in_hop, out_hop = int(stride / alpha), stride
    # initialize output phase with phase of first grain
    phase = np.angle(np.fft.fft(x[0:grain_size]))
    y, ix, iy = np.zeros(int(alpha * len(x))), 0, 0    
    while ix < len(x) - 2 * grain_size and iy < len(y) - grain_size:
        # FFT of current grain
        grain_fft = np.fft.fft(win * x[ix:ix+grain_size])
        # phase of the grain at the point of intersection with next grain in the output 
        end_phase = np.angle(np.fft.fft(win * x[ix+out_hop:ix+out_hop+grain_size]))
        phase_diff = end_phase - np.angle(grain_fft)
        # compute rephased grain and add with overlap to output
        grain = np.real(np.fft.ifft(np.abs(grain_fft) * np.exp(1j * phase)))
        y[iy:iy+grain_size] += grain * win
        iy += out_hop
        ix += in_hop
        # update output phase for next grain and reduce modulo 2pi
        phase = phase + phase_diff  
        phase = phase - 2 * np.pi * np.round(phase / (2 * np.pi))
    return y

We can now test with the sinusoidal signal again

In [None]:
test_sin_scaled_pv = timescale_gs_pv(test_sin, 2, test_grain_size)
plt.plot(test_sin_scaled_pv[1000:3000])
multiplay([test_sin, test_sin_scaled, test_sin_scaled_pv], ssf, title=['test sinusoid', 'stretched with GS', 'stretched with GS-PV'])   

#### Speeding up music

In [None]:
alpha, grain_size = 0.8, ms2n(100, ssf)
multiplay([ms, timescale_gs(ms, alpha, grain_size), timescale_gs_pv(ms, alpha, grain_size)], ssf, title=['music sample', 'sped up with GS', 'sped up with GS-PV'])   

#### Slowing down music

In [None]:
alpha, grain_size = 2, ms2n(100, ssf)
multiplay([ms, timescale_gs(ms, alpha, grain_size), timescale_gs_pv(ms, alpha, grain_size)], ssf, title=['music sample', 'slowed down with GS', 'slowed down with GS-PV'])   

#### Speeding up speech

In [None]:
alpha, grain_size = 0.8, ms2n(40, ssf)
multiplay([ss, timescale_gs(ss, alpha, grain_size), timescale_gs_pv(ss, alpha, grain_size)], ssf, title=['speech sample', 'sped up with GS', 'sped up with GS-PV'])   

#### Slowing down speech

In [None]:
alpha, grain_size = 2, ms2n(60, ssf)
multiplay([ss, timescale_gs(ss, alpha, grain_size), timescale_gs_pv(ss, alpha, grain_size)], ssf, title=['speech sample', 'slowed down with GS', 'slowed down with GS-PV'])   

## Pitch shifting with the phase vocoder

### Basic implementation (two passes)

In [None]:
def pitchshift_gs_pv_rt(x, alpha, grain_size):
    return(resample(timescale_gs_pv(x, alpha, grain_size), 1 / alpha))

### Real-time implementation

Differences with the non-phase compensated implementation:
 * need to rescale two consecutive input chunks
 * need to explicitly compute two DFT's per output grain to compute the rescaled signal phase offset
 * initialize the global phase at zero

In [None]:
def pitchshift_gs_pv_rt(x, alpha, grain_size):
    win, stride, grain_size = hanning_window(grain_size)
    # resampling needs these many input samples to produce an output grain of the chosen size
    chunk_size = int(np.floor(grain_size + 1) * alpha)
    phase = np.zeros(grain_size)
    y = np.zeros(len(x))
    # input chunks and output grains are always aligned in pitch shifting (in_hop = out_hop = stride)
    for n in range(0, len(x) - 2 * max(chunk_size, grain_size), stride):
        # resample two contiguous chunks to compute the phase difference
        resampled_chunk = resample(x[n:n+chunk_size+chunk_size], 1 / alpha)
        grain_fft_curr = np.fft.fft(win * resampled_chunk[0:grain_size])
        grain_fft_next = np.fft.fft(win * resampled_chunk[stride:stride+grain_size])
        phase_diff = np.angle(grain_fft_next) - np.angle(grain_fft_curr)
        # resynthesize current grain
        grain = np.real(np.fft.ifft(np.abs(grain_fft_curr) * np.exp(1j * phase)))
        y[n:n+grain_size] += grain * win
        # update phase for next grain
        phase = phase + phase_diff  
        phase = phase - 2 * np.pi * np.round(phase / (2 * np.pi))
    return y

compare the difference between normal and phase-compensate pitch shifting

In [None]:
alpha, grain_size = semitone ** 2, ms2n(100, ssf)
multiplay([ms, pitchshift_gs_rt(ms, alpha, grain_size), pitchshift_gs_pv_rt(ms, alpha, grain_size)], ssf, 
          title=['original (music)', 'up 2 semitones (GS)', 'up 2 semitones (GS-PV)'])

# Autotune

The phase vocoder, when applied to a singing voice, still produces slighty unnatural-sounding speech.

In [None]:
alpha, grain_size = semitone ** 2, ms2n(100, ssf)
multiplay([ys, pitchshift_gs_pv_rt(ys, alpha, grain_size), pitchshift_gs_pv_rt(ys, 1/alpha, grain_size)], ssf, title=['vocal sample', 'up two semitones', 'down two semitones'])

## The source-filter model for voice
<img width="400" style="float: right; margin-right: 30px;" src="img/lpc.jpg"> 

Sources:
 * vibration of vocal cords (voiced sounds)
 * noise-like air flow (unvoiced sounds)
 
Filter:
 * vocal tract (larynx and mouth: time varying)
 * head and chest resonances (fixed)

In [None]:
# load a short voiced speech segment and plot its spectrum and the corresponding envelope
vs, vsf = load_audio('snd/voiced.wav');
# envelope will be computed explicitly later, here we just plot it
vs_env = np.fft.fft([1.0, -2.1793, 2.4140, -1.6790, 0.3626, 0.5618, -0.7047, 
                0.1956, 0.1872, -0.2878, 0.2354, -0.0577, -0.0815, 0.0946, 
                0.1242, -0.1360, 0.0677, -0.0622, -0.0306, 0.0430, -0.0169], len(vs));

In [None]:
%%capture --no-stderr --no-stdout voiced
_, __ = plot_spec(vs, vsf)
_, __ = plot_spec(np.abs(np.divide(1.0, vs_env)), vsf, do_fft=False)

The spectrum of a short voiced speech segment shows the harmonic nature of the sound and the overall envelope determined by the head and mouth

In [None]:
voiced.show();

Key observations:

 * the frequency response of the filter is independent of the excitation
 * to pitch-shift speech we must modify only the source
 * if we shift the envelope, the voice will sound unnatural

## LPC analysis

Premise: work on short voice segments so that both source and filter can be considered constant.

Voice production model:

$$
    X(z) = A(z)E(z)
$$

We need to find _both_  $A(z)$ and $E(z)$.

### The all-pole filter model

Resonances in the vocal tract are adequately captured by an all-pole filter

$$
  A(z) = \frac{1}{1 - \sum_{k=1}^{p}a_kz^{-k}}
$$ 

### The AR estimation problem

$$
  x[n] = \sum_{k=1}^{p}a_k x[n-k] + e[n]
$$

which becomes

$$
  e[n] = x[n] - \sum_{k=1}^{p}a_k x[n-k]
$$

The optimal solution can be found by minimizing $E[e^2[n]]$.

If $E[e^2[n]]$ is minimized, then $e[n]$ is orthogonal to $x[n]$ (see the [orhtogonality principle](https://en.wikipedia.org/wiki/Orthogonality_principle))

Orthogonality means no shared information between signals: we have separated excitation and source!

### The linear prediction coefficients

The coefficients of the filter $A(z)$ can be found as

$$
    \begin{bmatrix}
        r_0 & r_1 & r_2 & \ldots & r_{p-1} \\
        r_1 & r_0 & r_1 & \ldots & r_{p-2} \\        
        & & & \vdots \\
        r_{p-1} & r_{p-2} & r_{p-3} & \ldots & r_{0} \\
    \end{bmatrix}
    \begin{bmatrix}
        a_1 \\
        a_2 \\        
        \vdots \\
        a_{p}
    \end{bmatrix} = 
    \begin{bmatrix}
        r_1 \\
        r_2 \\        
        \vdots \\
        r_{p}
    \end{bmatrix}
$$

where $r$ is the biased autocorrelation of the $N$-point input data:

$$
  r_m = (1/N)\sum_{k = 0}^{N-m-1}x[k]x[k+m]
$$

Because of the Toeplitz structure of the autocorrelation matrix, the system of equations can be solved very efficiently using the Levinson-Durbin algorithm. Here is a direct implementation of the method:

In [None]:
def ld(r, p):
    # solve the toeplitz system using the Levinson-Durbin algorithm
    g = r[1] / r[0]
    a = np.array([g])
    v = (1. - g * g) * r[0];
    for i in range(1, p):
        g = (r[i+1] - np.dot(a, r[1:i+1])) / v
        a = np.r_[ g,  a - g * a[i-1::-1] ]
        v *= 1. - g*g
    # return the coefficients of the A(z) filter
    return np.r_[1, -a[::-1]]     

In [None]:
def bac(x, p):
    # compute the biased autocorrelation for x up to lag p
    L = len(x)
    r = np.zeros(p+1)
    for m in range(0, p+1):
        for n in range(0, L-m):
            r[m] += x[n] * x[n+m]
        r[m] /= float(L)
    return r

In [None]:
def lpc(x, p):
    # compute p LPC coefficients for a speech segment
    return ld(bac(x, p), p)

let's plot the previous figure again by direct computation

In [None]:
plot_spec(vs, vsf)
A = np.fft.fft(lpc(vs, 20), len(vs))
plot_spec(np.abs(np.divide(1.0, A)), vsf, do_fft=False);

## LPC-based pitch shifting

Simple autotune algorithm based on granular synthesis

For each input chunk:
 * compute the LPC coefficients for chunk
 * inverse-filter the chunk and recover the excitation signal
 * pitch-shift the excitation via resampling
 * forward-filter the shifted excitation to re-apply the original envelope.
 * combine the resulting grains using a tapered window

In [None]:
def pitchshift_gs_lpc_rt(x, alpha, grain_size, overlap=0.4, LPC_order=20):
    win, stride = tapering_window(grain_size, overlap)
    # size of input chunk before resampling
    chunk_size = int(np.floor(grain_size + 1) * alpha)
    filter_state = np.zeros(LPC_order)
    y = np.zeros(len(x))
    for n in range(0, len(x) - max(chunk_size, grain_size), stride):
        chunk = x[n:n+chunk_size]
        a = lpc(chunk, LPC_order)
        exc = sp.lfilter(a, [1], chunk)
        # this changes the length of exc from chunk_size to grain_size:
        exc = resample(exc, 1 / alpha)
        grain, filter_state = sp.lfilter([1], a, exc, zi=filter_state)
        y[n:n+grain_size] += grain * win
    return y

In [None]:
alpha, grain_size = semitone ** 2, ms2n(100, ssf)
multiplay([ys, pitchshift_gs_lpc_rt(ys, alpha, grain_size), pitchshift_gs_lpc_rt(ys, 1/alpha, grain_size)], ssf, title=['vocal sample', 'up two semitones', 'down two semitones'])

## Adding phase continuity

The current implementation can be improved by ensuring phase continuity in the excitation. For this we need to
 * consider a longer chunk that yields two grains
 * compute the LPC coefficients only for the first half
 * extract the excitation for the whole chunk
 * compute the phase difference in the excitation to update global phase
 * resynthesize the excitation using the global phase

In [None]:
def pitchshift_gs_lpc_pv_rt(x, alpha, grain_size, LPC_order=20):
    win, stride, grain_size = hanning_window(grain_size)
    # size of input chunk before resampling
    chunk_size = int(np.floor(grain_size + 1) * alpha)
    filter_state = np.zeros(LPC_order)
    phase = np.zeros(grain_size)
    y = np.zeros(len(x))
    for n in range(0, len(x) - 2 * max(chunk_size, grain_size), stride):
        a = lpc(x[n:n+chunk_size], LPC_order)
        exc = sp.lfilter(a, [1], x[n:n+chunk_size+chunk_size])
        exc = resample(exc, 1 / alpha)
        exc_fft_curr = np.fft.fft(win * exc[0:grain_size])
        exc_fft_next = np.fft.fft(win * exc[stride:stride+grain_size])
        phase_diff = np.angle(exc_fft_next) - np.angle(exc_fft_curr)
        grain_exc = np.real(np.fft.ifft(np.abs(exc_fft_curr) * np.exp(1j * phase)))
        grain, filter_state = sp.lfilter([1], a, grain_exc, zi=filter_state)
        y[n:n+grain_size] += grain * win
        phase = phase + phase_diff  
        phase = phase - 2 * np.pi * np.round(phase / (2 * np.pi))
    return y

In [None]:
alpha, grain_size = semitone ** 2, ms2n(100, ssf)
multiplay([ys, pitchshift_gs_lpc_rt(ys, alpha, grain_size), pitchshift_gs_lpc_pv_rt(ys, alpha, grain_size)], ssf, title=['vocal sample', 'up two semitones (GS-LPC)', 'up two semitones (GS-LPC-PV)'])

In [None]:
multiplay([ys, pitchshift_gs_lpc_rt(ys, 1 / alpha, grain_size), pitchshift_gs_lpc_pv_rt(ys, 1/alpha, grain_size)], ssf, title=['vocal sample', 'down two semitones (GS-LPC)', 'down two semitones (GS-LPC-PV)'])

## A simple vocoder

The LPC analysis can also be used to produce extremely artificial-sounding voices, as demonstrated here, where we replace the excitation siganl by a square wave of constant frequency. This is the type of sound created by the early [Vocoder](https://en.wikipedia.org/wiki/Vocoder) machines, for instance, and is still in use today to achieve some characteristic effects in popular music.

In [None]:
def vocoder(x, pitch, sf, grain_size_ms=0, LPC_order=20):
    grain_size = ms2n(grain_size_ms, sf) if grain_size_ms > 0 else ms2n(40, sf)
    w, ph = 2 * np.pi * pitch / sf, 0
    win, stride, grain_size = hanning_window(grain_size)
    filter_state = np.zeros(LPC_order)
    y = np.zeros(len(x))
    for n in range(0, len(x) - grain_size, stride):
        grain = x[n:n+grain_size]
        a = lpc(grain, LPC_order)
        if pitch < 0:
            exc = np.random.rand(grain_size) - 0.5
        elif pitch == 0:
            exc = np.r_[1, np.zeros(grain_size - 1)]
        else:
            exc = np.sign(np.sin(ph + w * np.arange(0, grain_size)))
            ph += w * stride
        grain, filter_state = sp.lfilter([1], a, exc, zi=filter_state)
        y[n:n+grain_size] += grain * win
    return y

In [None]:
multiplay([ss, vocoder(ss, -1, ssf), vocoder(ss, 140, ssf), vocoder(ss, 0, ssf)], ssf, title=['speech sample', 'whisper', 'daft', 'robot'])

In [None]:
multiplay([ys, vocoder(ys, -1, ssf), vocoder(ys, 140, ssf), vocoder(ys, 0, ssf)], ssf, title=['vocal sample', 'whisper', 'daft', 'robot'])