In [282]:
%matplotlib inline

In [283]:
import numpy as np
import matplotlib.pyplot as plt
import librosa as lb
import IPython.display as ipd
from scipy.signal import medfilt
import scipy.io as sio
import math
import pyaudio
import wave
import ipywidgets as widgets
import soundfile as sf
import io

### Implement TSM with dynamically changing time stretch factor in an offline setting 
In hw10 we implemented the following function
y = tsm_hybrid(x, alpha=1.0, sr=22050)
Where x is the input audio, y is the output audio, alpha is the global TSM factor, and sr is the sample rate
* You will implement the following function
y = tsm_hybrid_variable(x, sr, t_alpha, alpha)
x, sr, and y are same as above
alpha is a 1-d array specifying the desired TSM factors at different points in time
t_alpha is a 1-d array of the same size as alpha that specifies the time (in sec, in the timeline of the original unmodified audio recording) when the TSM factor should be changed/set; the first value of this array should always be zero (in order to set the initial TSM factor)
* You can think of t_alpha and alpha as a way of simulating a  person controlling the slider where they make a discrete number of changes over the length of the recording
E.g. if t_alpha = [0,2.5, 7.5] and alpha = [1.0, 2.0, 0.5], then the first 2.5 sec of the original audio should be played with no TSM, the next 7.5 - 2.5 = 5 sec of the original audio should be played with time stretch factor 2x, and the rest of the recording should be played with time stretch factor 0.5.

Use a simple example like this to verify that your code is working properly – you should be able to double check that the total duration is what is expected, and also listen to confirm that it is doing the time stretch properly


#### OLA Method

In [284]:
def tsm_overlap_add(x, alpha = 1.0, L = 220):
    '''
    Time stretches the input signal using the overlap-add method.  Uses an synthesis hop size that is half the
    value of L.
    
    Inputs
    x: the input signal
    alpha: the time stretch factor, which is defined as the ratio of the synthesis hop size to the analysis hop size
    L: the length of each analysis frame in samples
    
    Returns the time-stretched signal y.
    '''
    assert(L % 2 == 0), "Frame length must be even."
    Hs = L // 2
    
    # compute analysis frames
    Ha = int(np.round(Hs/alpha))
    numFrames = (len(x) - L) // Ha + 1
    analysisFrames = np.zeros((L, numFrames))
    for i in range(numFrames):
        offset = i * Ha
        analysisFrames[:, i] = x[offset: offset + L]
    
    # reconstruction
    synthesisFrames = analysisFrames * hann_window(L).reshape((-1,1)) # use broadcasting
    y = np.zeros(Hs * (numFrames-1) + L)
    for i in range(numFrames):
        offset = i * Hs
        y[offset:offset+L] += synthesisFrames[:,i]
    
    return y

In [285]:
def hann_window(L):
    w = .5 * (1 - np.cos(2*np.pi * np.arange(L)/ L))
    return w

### Phase Vocoder

In [286]:
def tsm_phase_vocoder(x, alpha = 1.0, L = 2048, sr = 22050):
    '''
    Time stretches the input signal using the phase vocoder method.  Uses a synthesis hop size that is one-fourth the
    value of L.
    
    Inputs
    x: the input signal
    alpha: the time stretch factor, which is defined as the ratio of the synthesis hop size to the analysis hop size
    L: the length of each analysis frame in samples
    sr: sampling rate
    
    Returns the time-stretched signal y.
    '''
    assert(L % 4 == 0), "Frame length must be divisible by four."
    Hs = L // 4
    
    # compute STFT
    Ha = int(np.round(Hs/alpha))
    window = hann_window(L)
    X = lb.core.stft(x, n_fft = L, hop_length = Ha, window=window, center=False)
    
    # compute modified STFT
    w_if = estimateIF(X, sr, Ha)
    phase_mod = np.zeros(X.shape)
    phase_mod[:,0] = np.angle(X[:,0])
    for i in range(1, phase_mod.shape[1]):
        phase_mod[:,i] = phase_mod[:,i-1] + w_if[:,i-1] * Hs / sr
    Xmod = np.abs(X) * np.exp(1j * phase_mod)
    
    # signal reconstruction
    y = invert_stft(Xmod, Hs, window)
    #y = lb.core.istft(Xmod, hop_length=Hs, center=False)
    
    return y

In [287]:
def estimateIF(S, sr, hop_samples):
    '''
    Estimates the instantaneous frequencies in a STFT matrix.
    
    Inputs
    S: the STFT matrix, should only contain the lower half of the frequency bins
    sr: sampling rate
    hop_samples: the hop size of the STFT analysis in samples
    
    Returns a matrix containing the estimated instantaneous frequency at each time-frequency bin.
    This matrix should contain one less column than S.
    '''
    hop_sec = hop_samples / sr
    fft_size = (S.shape[0] - 1) * 2
    w_nom = np.arange(S.shape[0]) * sr / fft_size * 2 * np.pi
    w_nom = w_nom.reshape((-1,1))    
    unwrapped = np.angle(S[:,1:]) - np.angle(S[:,0:-1]) - w_nom * hop_sec
    wrapped = (unwrapped + np.pi) % (2 * np.pi) - np.pi
    w_if = w_nom + wrapped / hop_sec
    return w_if

In [288]:
def invert_stft(S, hop_length, window):
    '''
    Reconstruct a signal from a modified STFT matrix.
    
    Inputs
    S: modified STFT matrix
    hop_length: the synthesis hop size in samples
    window: an array specifying the window used for FFT analysis
    
    Returns a time-domain signal y whose STFT is closest to S in squared error distance.
    '''
    
    L = len(window)
    
    # construct full stft matrix
    fft_size = (S.shape[0] - 1) * 2
    Sfull = np.zeros((fft_size, S.shape[1]), dtype=np.complex64)
    Sfull[0:S.shape[0],:] = S
    Sfull[S.shape[0]:,:] = np.conj(np.flipud(S[1:fft_size//2,:]))
    
    # compute inverse FFTs
    frames = np.zeros_like(Sfull)
    for i in range(frames.shape[1]):
        frames[:,i] = np.fft.ifft(Sfull[:,i])
    frames = np.real(frames) # remove imaginary components due to numerical roundoff
    
    # synthesis frames
    num = window.reshape((-1,1))
    den = calc_sum_squared_window(window, hop_length)
    #den = np.square(window) + np.square(np.roll(window, hop_length))
    frames = frames * window.reshape((-1,1)) / den.reshape((-1,1))
    #frames = frames * window.reshape((-1,1))
    
    # reconstruction
    y = np.zeros(hop_length*(frames.shape[1]-1) + L)
    for i in range(frames.shape[1]):
        offset = i * hop_length
        y[offset:offset+L] += frames[:,i]
    
    return y

In [289]:
def calc_sum_squared_window(window, hop_length):
    '''
    Calculates the denominator term for computing synthesis frames.
    
    Inputs
    window: array specifying the window used in FFT analysis
    hop_length: the synthesis hop size in samples
    
    Returns an array specifying the normalization factor.
    '''
    assert (len(window) % hop_length == 0), "Hop length does not divide the window evenly."
    
    numShifts = len(window) // hop_length
    den = np.zeros_like(window)
    for i in range(numShifts):
        den += np.roll(np.square(window), i*hop_length)
        
    return den

### Hybrid method

In [290]:
def harmonic_percussive_separation(x, sr=22050, fft_size = 2048, hop_length=512, lh=6, lp=6):
    
    window = hann_window(fft_size)
    X = lb.core.stft(x, n_fft=fft_size, hop_length=512, window=window, center=False)
    Y = np.abs(X)
    Yh = medfilt(Y, (1, 2*lh+1))
    Yp = medfilt(Y, (2*lp+1, 1))
    Mh = (Yh > Yp)
    Mp = np.logical_not(Mh)
    Xh = X * Mh
    Xp = X * Mp
    xh = invert_stft(Xh, hop_length, window)
    xp = invert_stft(Xp, hop_length, window)
    
    return xh, xp, Xh, Xp

In [291]:
def mix_recordings(x1, x2):
    min_length = min(len(x1), len(x2))
    y = .5 * (x1[0:min_length] + x2[0:min_length])
    return y

In [292]:
def tsm_hybrid(x, alpha=1.0, sr=22050):
    '''
    Time stretches the input signal using a hybrid method that combines overlap-add and phase vocoding.
    
    Inputs
    x: the input signal
    alpha: the time stretch factor, which is defined as the ratio of the synthesis hop size to the analysis hop size
    sr: sampling rate
    
    Returns the time-stretched signal y.
    '''
    
    xh, xp, _, _ = harmonic_percussive_separation(x)
    xh_stretched = tsm_phase_vocoder(xh, alpha)
    xp_stretched = tsm_overlap_add(xp, alpha)
    y = mix_recordings(xh_stretched, xp_stretched)
    
    return y

### Including dynamic variable

#### tsm_ola_variable, applying OLA when we have a dynamic variable $\alpha$

In [293]:
choir, c_sr = lb.load("choir.wav")
beatbox, bb_sr = lb.load("beatbox.wav")

In [294]:
def tsm_ola_variable(x, sr, t_alpha, alpha, L=220):
    '''
    Time stretches the input signal using overlap-add with variable stretch factors.
    
    Inputs:
    x: input signal
    sr: sample rate
    t_alpha: array of time points (seconds) when stretch factors change
    alpha: array of stretch factors corresponding to t_alpha
    L: frame length (must be even)
    
    Returns time-stretched signal y
    '''
    assert L % 2 == 0, "Frame length must be even."
    Hs = L // 2
    
    # Convert time points to samples
    t_alpha_samples = [int(t * sr) for t in t_alpha]
    t_alpha_samples.append(len(x))  # Add end of signal as final boundary
    
    # Initialize
    current_alpha_idx = 0
    curr_offset = 0
    analysis_frames = []
        
    # Analysis phase
    while curr_offset <= len(x) - L:
        current_alpha = alpha[current_alpha_idx]
        Ha = int(np.round(Hs / current_alpha))

        # Check if next window would cross boundary
        if curr_offset + Ha > t_alpha_samples[current_alpha_idx + 1]:
            current_alpha_idx += 1
        
        # Extract and store frame
        frame = x[curr_offset:curr_offset + L]
        analysis_frames.append(frame)        
        # Advance position
        curr_offset += Ha
    
    num_frames = len(analysis_frames)
    if num_frames == 0:
        return np.zeros(0)
    
    analysis_frames = np.array(analysis_frames).T
    print(analysis_frames.shape)
    # Synthesis phase
    synthesis_frames = analysis_frames * hann_window(L).reshape((-1, 1))
    y = np.zeros(Hs * (num_frames-1) + L)
    
    for i in range(num_frames):
        offset = i * Hs
        y[offset:offset + L] += synthesis_frames[:, i]
    
    return y

In [295]:
def hann_window(L):
    w = .5 * (1 - np.cos(2*np.pi * np.arange(L)/ L))
    return w

In [314]:
t_alpha = [0, 2.5, 7.5]
alpha = [0.5, 1, 2]
y = tsm_ola_variable(beatbox, bb_sr, t_alpha, alpha)

(220, 2251)


In [297]:
ipd.Audio(y, rate=bb_sr)

In [298]:
choir_fast = tsm_ola_variable(choir, c_sr, t_alpha, alpha)

(220, 2925)


In [299]:
ipd.Audio(choir_fast, rate=c_sr)

#### tsm_phase_vocoder_variable, applying phase vocoder when we have a dynamic variable $\alpha$

In [300]:
def storeAnalysisFrames(x, sr, t_alpha, alpha, Hs, L):
    t_alpha_samples = [int(t * sr) for t in t_alpha]
    t_alpha_samples.append(len(x))  # Add end of signal as final boundary
    curr_offset = 0
    while curr_offset <= len(x) - L:
        current_alpha = alpha[current_alpha_idx]
        Ha = int(np.round(Hs / current_alpha))

        # Check if next window would cross boundary
        if curr_offset + Ha > t_alpha_samples[current_alpha_idx + 1]:
            current_alpha_idx += 1
        frame = x[curr_offset: curr_offset+L]
        


store the previous frame

In [301]:
# def tsm_phasevocoder_variable(x, sr, t_alpha, alpha, L=2048):
    
#     Hs = L // 4
#     Ha = int(np.round(Hs/alpha[0]))
#     window = hann_window(L)

#     # Convert time points to samples
#     t_alpha_samples = [int(t * sr) for t in t_alpha]
#     t_alpha_samples.append(len(x))  # Add end of signal as final boundary

#     # Initialize
#     current_alpha_idx = 0
#     curr_offset = 0
#     analysis_frames = []
#     window = hann_window(L)

        
#     # Analysis phase
#     while curr_offset <= len(x) - L:
#         current_alpha = alpha[current_alpha_idx]
#         Ha = int(np.round(Hs / current_alpha))
#         previous_fft = None

#         # Check if next window would cross boundary
#         if curr_offset + Ha > t_alpha_samples[current_alpha_idx + 1]:
#             current_alpha_idx += 1
#         frame = x[curr_offset: curr_offset+L]
#         X = lb.core.stft(frame, n_fft = L, hop_length = Ha, window=window, center=False)
#         print(X.shape)
#         w_if = estimateIF(X, sr, Ha)
#         print(w_if)
        
#         phase_mod = np.zeros(X.shape)
#         phase_mod[:,0] = np.angle(X[:,0])
#         for i in range(1, phase_mod.shape[1]):
#             phase_mod[:,i] = phase_mod[:,i-1] + w_if[:,i-1] * Hs / sr
#         Xmod = np.abs(X) * np.exp(1j * phase_mod)

#         # Extract and store frame
#         x_mod = getSynthFrame(Xmod, window) #getSynthFrame outputs inverse FFT of each frame
#         analysis_frames.append(x_mod.flatten())
        
#         # Advance position
#         curr_offset += Ha

#     num_frames = len(analysis_frames)

#     #Reconstruction

#     analysis_frames = np.array(analysis_frames).T
#     den = calc_sum_squared_window(window, Hs)
#     synthesis_frames = analysis_frames * window.reshape((-1, 1)) / den.reshape((-1,1))

#     print(synthesis_frames.shape)

#     y = np.zeros(Hs * (num_frames-1) + L)

#     for i in range(num_frames):
#         offset = i * Hs
#         y[offset:offset + L] += synthesis_frames[:, i]
    
#     return y

In [None]:
import numpy as np

def tsm_phasevocoder_variable(x, sr, t_alpha, alpha, L=2048):
    """
    Phase‐vocoder time‐scale modification with piecewise constant speed factors.
    
    Parameters
    ----------
    x : np.ndarray
        Input audio signal (1D array of samples).
    sr : int
        Sampling rate in Hz.
    t_alpha : list of float
        Time instants (in seconds) at which the speed factor changes.
        Must be the same length as `alpha`, e.g. [0, 2.5, 7.5].
    alpha : list of float
        Speed factors corresponding to each segment defined by `t_alpha`.
        For example, [0.75, 1.0, 1.25] means:
          - from 0 to 2.5 s, play at 0.75× speed (slower),
          - from 2.5 to 7.5 s, play at 1.0× speed (original),
          - from 7.5 s to end, play at 1.25× speed (faster).
    L : int, optional
        FFT size and window length (defaults to 2048 samples).
    
    Returns
    -------
    y : np.ndarray
        Time‐scaled output signal.
    """
    Hs = L // 4
    window = np.hanning(L)  # Hann window of length L

    t_alpha_samples = [int(t * sr) for t in t_alpha] + [len(x)]

    # output buffer with worst‐case length
    est_len = int(len(x) / min(alpha)) + L # maximum output length is roughly len(x) / min(alpha) + L
    y = np.zeros(est_len) # allocate a zeros array of that max size and trim later

    prev_fft    = None # stores the FFT of the previous frame (None for first frame)
    prev_phase  = np.zeros(L//2 + 1) # running synthesis phase for each frequency bin
    write_pos   = 0 # current write pointer in the output y
    read_pos    = 0 # current read pointer in the input x
    seg_idx     = 0 # which segment (alpha[seg_idx]) we’re in

    # precompute nominal angular frequencies for each FFT bin
    omega_nom = np.arange(L//2 + 1) * 2 * np.pi * sr / L

    # main processing loop
    while read_pos + L <= len(x): # stop when not enough frame for entire window
        if read_pos >= t_alpha_samples[seg_idx + 1]:
            seg_idx += 1 # if we’ve crossed the next time boundary, increment seg_idx.
        a  = alpha[seg_idx] #the speed factor for this frame.
        Ha = int(round(Hs / a)) # Ha length in terms of Hs --> output_length approx input_length * (Ha / Hs)

        # extract, window, and FFT the current frame
        frame = x[read_pos : read_pos + L] * window
        S     = np.fft.rfft(frame)

    # estimate IF
        if prev_fft is None: # nothing for first frame
            w_if = np.zeros_like(omega_nom)
        else:
            
            dphi = np.angle(S) - np.angle(prev_fft) # phase difference between current & previous FFT
            dphi = dphi - omega_nom * (Ha / sr)
            
            dphi = (dphi + np.pi) % (2 * np.pi) - np.pi # wrap to [-π, π] to undo any 2π jumps
            w_if = omega_nom + dphi * (sr / Ha) # IF = nom + correction
 
        prev_phase = prev_phase + w_if * (Hs / sr) # accumulates phase after every time boundary

        # build the modified spectrum & inverse FFT back to time domain
        X_mod    = np.abs(S) * np.exp(1j * prev_phase)
        frame_mod = np.fft.irfft(X_mod)

        # add to output vector
        y[write_pos : write_pos + L] += frame_mod * window

        # advance pointers, store current FFT for next IF calc
        read_pos  += Ha
        write_pos += Hs
        prev_fft   = S

    #trim the output to the actual synthesized length
    return y[:write_pos + L]


In [304]:
def getSynthFrame(S, window):
    L = len(window)
    
    # construct full stft matrix
    fft_size = (S.shape[0] - 1) * 2
    Sfull = np.zeros((fft_size, S.shape[1]), dtype=np.complex64)
    Sfull[0:S.shape[0],:] = S
    Sfull[S.shape[0]:,:] = np.conj(np.flipud(S[1:fft_size//2,:]))
    
    # compute inverse FFTs
    frames = np.zeros_like(Sfull)
    for i in range(frames.shape[1]):
        frames[:,i] = np.fft.ifft(Sfull[:,i])
    frames = np.real(frames) # remove imaginary components due to numerical roundoff
    return frames
    

In [305]:
def estimateIF_single(prev_X, curr_X, sr, hop_samples):
    """
    Estimate instantaneous frequency given two consecutive FFT frames.
    Inputs:
      prev_X: FFT of previous frame.
      curr_X: FFT of current frame.
      sr: Sampling rate.
      hop_samples: Analysis hop size in samples.
    Returns:
      w_if: Instantaneous frequency for the current frame (per bin).
    """
    hop_sec = hop_samples / sr
    fft_size = (prev_X.shape[0] - 1) * 2
    w_nom = np.arange(prev_X.shape[0]) * sr / fft_size * 2 * np.pi
    unwrapped = np.angle(curr_X) - np.angle(prev_X) - w_nom * hop_sec
    wrapped = (unwrapped + np.pi) % (2 * np.pi) - np.pi
    w_if = w_nom + wrapped / hop_sec
    return w_if

In [315]:
tryout = tsm_phasevocoder_variable(choir, c_sr, t_alpha, alpha)

In [316]:
ipd.Audio(tryout, rate=c_sr)

You will want to keep track of where you are in the original audio recording (i.e. the sample offset where you are currently located), this will form the outer loop of the algorithm:
 - Figure out what data needs to be accessed in order to compute the next chunk of the output y
- Then process just that chunk and append the result to y
In order to make this more efficient, think about what can be pre-computed beforehand and what has to be computed in real-time


In order to make this more efficient, think about what can be pre-computed beforehand and what has to be computed in real-time
- E.g. you have access to the whole recording beforehand, but you don’t know the desired TSM factor that will be adjusted in real-time (though at this stage we will assume that it is constant)
- So you could, for example, pre-compute the STFT beforehand
- We may also want to consider approximations that will greatly speed up the runtime efficiency
    - e.g. you could pre-compute the STFT and instantaneous frequencies beforehand for the whole recording; at runtime the exact current sample location may not match exactly with the offset of the pre-computed STFT (since the analysis hop size will vary depending on the current TSM factor) but it could be a close enough approximation to just find the nearest neighbor and use its pre-computed instantaneous frequency values.  This approach will not be exactly the same as an offline implementation, but it could be much faster at runtime since a lot of the work could be pre-computed beforehand.  Let’s be open to trying such approximations and comparing the quality of the generated time domain signals.
- Once you implement a simulated real-time version of this, you can compare the output to the hw10 offline implementations – they should match either exactly or very close (they should be indistinguishable to the human ear)
    - Have demos of the hw10 offline implementations and your simulated real-time implementation to verify that they match in behavior
- Find most commonly used packages for real-time audio processing
    - Present pros/cons of different options
    - Select what you think is the best option and justify your choice
    - Show code example(s) of real-time audio processing with that package


In [308]:
def tsm_ola_simRealTime(x, sr, alpha):





    return None

In [309]:
def tsm_phaseVocoder_simRealTime(x, sr, alpha):





    return None

In [310]:
def tsm_hybrid_simRealTime(x, sr, alpha):




    return None