# Audio Repair
Workbook for replicating the functionality of Audacity to repair audio sections based on interpolation of adjacent buffered inputs.

In [17]:
# Imports
import librosa
import numpy as np
import pandas as pd
from numpy.linalg import inv
import soundfile as sf
import os
import time 
import warnings
warnings.filterwarnings('ignore')

# Set a variable for current notebook's path for various loading/saving mechanisms
nb_path = os.getcwd()

Below, I'm tempted to replace the "linear_interpolate_audio" function with a cosine interpolation function. Since the chunks are extremely small, I expect filling them with a smooth curve will hopefully have minimal impact? I've generally found that this is the typical result of Audactiy's "repair" function, and that alternately simply deleting chunks from the start of the click up to a sample with equal amplitude on a successive wave also usually results in clean audio. There is the opportunity to fill the space with a cosine function. 

$ {amplitude \over 2} \cdot cos({duration \over π }) + c $

amplitude: the sample value before the start of the chunk minus the sample value after.  
duration: the length of the chunk in samples.   
c: the mean of the sample value before and after the chunk.

This way, the space will always be filled with a smooth curve of samples for its duration, which will also reflect its direction (being a positive or negative curve), size and length. Because the cosine starts at the peak (or trough) of a wave, it will smoothly transition to the other end of the filled section, and by adjusting the length with π, it will always end at a trough (or peak) as well.


In [2]:
def linear_interpolate_audio(buffer, first_bad, num_bad):
    '''
    Output: This function modifies the buffer array in-place by performing linear 
    interpolation on the specified range of bad samples. After the function is executed, 
    the bad samples in the buffer will be replaced with interpolated values.
    
    buffer: The array of sound samples.
    first_bad: The index of the first bad sample in the buffer.
    num_bad: The number of consecutive bad samples starting from first_bad.
    '''
    
    decay = 0.9

    if first_bad == 0:
        # If there are no samples before the bad section, take a linear function from the last "bad" sample 
        # and the one immediately following
        delta = buffer[num_bad] - buffer[num_bad + 1]
        value = buffer[num_bad]
        i = num_bad - 1
        while i >= 0:
            value += delta
            buffer[i] = value
            value *= decay
            delta *= decay
            i -= 1
    elif first_bad + num_bad == len(buffer):
        delta = buffer[first_bad - 1] - buffer[first_bad - 2]
        value = buffer[first_bad - 1]
        i = first_bad
        while i < first_bad + num_bad:
            value += delta
            buffer[i] = value
            value *= decay
            delta *= decay
            i += 1
    else:
        v1 = buffer[first_bad - 1]
        v2 = buffer[first_bad + num_bad]
        value = v1
        delta = (v2 - v1) / (num_bad + 1)
        i = first_bad
        while i < first_bad + num_bad:
            value += delta
            buffer[i] = value
            i += 1

In [13]:
# Translation from C to Python of Audactiy Interpolation function
def interpolate_audio(buffer, first_bad, num_bad):
    '''
    Output: This function modifies the buffer array in-place by performing interpolation 
    using the LSAR (Least Squares AutoRegression) algorithm. The bad samples in the specified 
    range will be replaced with interpolated values based on the surrounding audio data. 
    If the LSAR algorithm fails (due to a singular matrix or other conditions), it falls back 
    to using linear_interpolate_audio to perform linear interpolation.
    
    buffer: The array of sound samples.
    first_bad: The index of the first bad sample in the buffer.
    num_bad: The number of consecutive bad samples starting from first_bad.

    '''

    len_buffer = len(buffer)
    
    if num_bad >= len_buffer:
        return  # Should never have been called!

    if first_bad == 0:
        # Reverse the buffer and interpolate in the reversed order
        reversed_buffer = np.flip(buffer)
        interpolate_audio(reversed_buffer, len_buffer - first_bad, num_bad)
        buffer[:] = np.flip(reversed_buffer)
        return

    s = np.array(buffer)

    # Define the order of the autoregression calculation
    # (The order of an autoregression is the number of immediately preceding values 
    # in the series that are used to predict the value at the present time).
    # Determined based on number of bad samples and available data on either 
    # side of the section.
    # Generally prefer a smaller window for simplicity.
    P = min(min(num_bad * 3, 50), max(first_bad - 1, len_buffer - (first_bad + num_bad) - 1))

    # If the length of the bad sample is 1 or the window the size of the available array, 
    # use linear interpolation
    if P < 3 or P >= len_buffer:
        linear_interpolate_audio(buffer, len_buffer, first_bad, num_bad)
        return

    # Add a small amount of random noise to the input signal
    # this sounds like a bad idea, but the amount we're adding
    # is only about 1 bit in 16-bit audio, and it's an extremely
    # effective way to avoid nearly-singular matrices.  If users
    # run it more than once they get slightly different results;
    # this is sometimes even advantageous.
    s += np.random.randn(len_buffer) / 10000.0

    # Solve for the best autoregression coefficients using least-squares fit 
    # to all of the non-bad
    X = np.zeros((P, P))
    b = np.zeros(P)

    # Create matrices used to solve LS function
    # Note, this iterates through the entire input buffer, so 
    # make sure to limit this input appropiately!
    for i in range(len_buffer - P):
        if i + P < first_bad or i >= (first_bad + num_bad):
            X += np.outer(s[i:i+P], s[i:i+P])
            b += s[i+P] * s[i:i+P]

    
    Xinv = np.linalg.inv(X)
    if np.isnan(Xinv).any():
        # The matrix is singular!  Fall back on linear...
        # In practice I have never seen this happen if
        # we add the tiny bit of random noise.
        linear_interpolate_audio(buffer, len_buffer, first_bad, num_bad)
        return

    a = np.dot(Xinv, b)

    # Create the autoregressive matrix
    A = np.zeros((len_buffer - P, len_buffer))
    for row in range(len_buffer - P):
        A[row, row:row+P] = -a
        A[row, row+P] = 1

    # Split the autoregressive matrix and the signal
    # "u" is for unknown (bad)
    # "k" is for known (good)
    Au = A[:, first_bad:first_bad+num_bad]
    A_left = A[:, :first_bad]
    A_right = A[:, first_bad+num_bad:]
    Ak = np.concatenate((A_left, A_right), axis=1)
    
    s_left = s[:first_bad]
    s_right = s[first_bad+num_bad:]
    sk = np.concatenate((s_left, s_right))

    # Do some linear algebra to find the best possible
    # values that fill in the "bad" area
    AuT = np.transpose(Au)
    X1 = np.dot(AuT, Au)
    try:
        X2 = np.linalg.inv(X1)
    except np.linalg.LinAlgError:
        # The matrix is singular! Fall back on linear...
        linear_interpolate_audio(buffer, len_buffer, first_bad, num_bad)
        return

    X2b = X2 * -1.0
    X3 = np.dot(X2b, AuT)
    X4 = np.dot(X3, Ak)
    # This vector contains our best guess as to the
    # unknown values
    su = np.dot(X4, sk)
    
    # Update the buffer with the interpolated values
    s = buffer
    s[first_bad:first_bad+num_bad] = su

    return s


In [4]:
test1, sr = librosa.load('{}\WAV\\test 7s.wav'.format(nb_path), sr=None)

In [5]:
len(test1)

330972

In [6]:
# Set parameters for test section to fix
bad_start_t = 0.234467
bad_end_t = 0.240453514
bad_start_s = int(bad_start_t * sr)
bad_end_s = int(bad_end_t * sr)
num_bad = bad_end_s - bad_start_s
num_bad

264

In [7]:
# get subsection of test1 that is the bad window, plus the length of the bad window + 50 either way
bad_plus_buffer = test1[(bad_start_s - num_bad -50):(bad_end_s + num_bad + 50)]
len(bad_plus_buffer)

892

In [8]:
outputs = pd.DataFrame(bad_plus_buffer)

In [9]:
outputs.rename(columns={0: "Base"}, inplace=True)

In [10]:
outputs['base'] = test1[(bad_start_s - num_bad -50):(bad_end_s + num_bad + 50)]

In [11]:
outputs['Redone'] = interpolate_audio(bad_plus_buffer, num_bad + 50, num_bad)

-0.0010681152
314
-0.0013182430164642499
-0.001318243


In [12]:
outputs[310:320]

Unnamed: 0,Base,base,Redone
310,-0.001129,-0.001129,-0.001129
311,-0.000961,-0.000961,-0.000961
312,-0.001434,-0.001434,-0.001434
313,-0.001068,-0.001068,-0.001068
314,-0.001318,-0.001068,-0.001318
315,-0.001286,-0.000641,-0.001286
316,-0.001451,-0.000977,-0.001451
317,-0.001372,-0.001099,-0.001372
318,-0.001431,-0.000793,-0.001431
319,-0.001486,-0.00116,-0.001486


Redone seems to be modifying the input array in place, which seems... bad? I know that's in the function definition but it's bloody annoying.

In [73]:
outputs.to_csv('{}\hope.csv'.format(nb_path))

In [21]:
def gross_mag_in_threshold(chunk, sr=44100, band_start=5000, band_end=10000):
    """Calculate the total intensity of frequencies within a chunk using Fourier Transform.
    chunk - array of longs - a librosa loaded sound object containing sound data to be analyzed
    band_start - int - the minimum frequency band to include
    band_end - int - the maximum frequency band to include
    """
    stft = librosa.stft(chunk, n_fft=len(chunk))
    
    # Get the frequency bin indices
    freq_bins = librosa.fft_frequencies(sr=sr, n_fft=len(chunk))

    # Get the magnitude of each frequency band by averaging each element returned in stft
    magnit = np.abs(stft)
    av_mag = np.mean(magnit, axis=1)
    gross_mag = np.sum(av_mag * (freq_bins > band_start) * (freq_bins < band_end))
    return gross_mag


def tick_detector(chunks, start_times, chunk_ms=4, threshold_factor=3, band_start=5000, band_end=10000, sr=44100):
    """Returns a pandas DataFrame presenting timestamps within a sound file that contain ticks.
    chunks - array of overlapping librosa loaded sound objects containing sound data to be analyzed
    start_times - list of longs - timestamps (in seconds) of the time where each chunk begins
    chunk_ms - length (in milliseconds) of each chunk
    threshold_factor - long - how much a chunk has to exceed its neighbors by to indicate a likely tick
    band_start - int - the minimum frequency band to include
    band_end - int - the maximum frequency band to include
    sr - int - sample rate of the chunks
    """
    flags = np.zeros(len(chunks), dtype=bool) #initialize array with all False values
    
    chunk_size = int(sr * (chunk_ms / 1000))  # chunk size in samples

    chunk_mags = np.array([gross_mag_in_threshold(chunk, band_start=band_start, band_end=band_end, sr=sr)
                                   for chunk in chunks])
    
    # set array for comparable chunks (e.g. chunk n can only be compared to n-4 and n+4)
    prev_chunks = chunks[:-(2 * chunk_ms)]
    current_chunks = chunks[chunk_ms:-chunk_ms]
    next_chunks = chunks[(2 * chunk_ms):]

    prev_frequency_mag = chunk_mags[:-(2 * chunk_ms)]
    current_frequency_mag = chunk_mags[chunk_ms:-chunk_ms]
    next_frequency_mag = chunk_mags[(2 * chunk_ms):]
    
    flags[chunk_ms:-chunk_ms] = (current_frequency_mag > threshold_factor * prev_frequency_mag) & \
                                (current_frequency_mag > threshold_factor * next_frequency_mag)

    df = pd.DataFrame({'Start Time': start_times, 'Flag': flags})
    
    # Filter the flagged chunks
    mouth_sounds = df.loc[df['Flag'] == True].copy()
    # Calculate the end times - need to use chunk_size because of rounding issues
    mouth_sounds['End Time'] = mouth_sounds['Start Time'] + (chunk_size / sr)
    # Reset the index
    mouth_sounds.reset_index(drop=True, inplace=True)

    return mouth_sounds


def generate_mouth_sounds_labels(filepath, output_as = 'df', output_file_name = 'output_file.txt', sample_rate=None):
    '''
    Analyses a given input soundfile for potential mouthsounds and returns a table of start and end times of 
    identified bad sections. By default this is returned as a pandas dataframe for further processing by other
    functions, but can also generate a tab delimited text file for import into Audacity.
    filepath - string - Path to the file to be analysed
    output_as - string - Will output as a text file (instead of a dataframe) if specified as "labels"
    output_file_name - string - Name of the file to be created if output_as is set to "labels"
    sample_rate - int - The specified sample rate to use in analysing the input file. 
                        If left blank it will use the file's native rate.
    '''
    kickoff_time = time.time()
    print(time.strftime('%X %x %Z'))
    
    test1, sr = librosa.load(filepath, sr=sample_rate)
    
    # cutting it into chunks of "chunk_ms" length each starting 1ms after the other
    chunk_ms = 4
    chunk_size = int(sr * (chunk_ms / 1000))
    hop_size = int(sr * 0.001)
    
    chunks = np.array([test1[i:i+chunk_size] for i in range(0, len(test1) - chunk_size, hop_size)])
    start_times = np.arange(0, len(chunks) * hop_size / sr, hop_size / sr)

    df_2 = tick_detector(chunks, start_times, threshold_factor=2, band_end=12000, sr=sr)
    
    # Initialize the consolidated dataframe
    consolidated_chunks = pd.DataFrame(columns=['Start Time', 'End Time'])
    
    # Initialize variables for tracking the current chunk
    current_start = None
    current_end = None

    # Iterate through each row in the dataframe
    for index, row in df_2.iterrows():
        if current_start is None:
            # If it's the first chunk, set the current chunk
            current_start = row['Start Time']
            current_end = row['End Time']
        elif row['Start Time'] <= current_end:
            # If the current chunk overlaps with the next chunk, extend the current chunk
            current_end = row['End Time']
        else:
            # If the next chunk is not overlapping, add the consolidated chunk to the dataframe
            consolidated_chunks = consolidated_chunks.append({'Start Time': current_start, 'End Time': current_end},
                                                             ignore_index=True)
            # Set the next chunk as the current chunk
            current_start = row['Start Time']
            current_end = row['End Time']

    # Add the last consolidated chunk to the dataframe
    consolidated_chunks = consolidated_chunks.append({'Start Time': current_start, 'End Time': current_end},
                                                     ignore_index=True)
    
    
    # iterate over each chunk 4 steps each delayed 0.5ms to find the maximum energy in high frequency bands to narrow
    # down the starting point. This part is ripe for optimisation. I had 1 attempt but it didn't really work out.
    
    # Iterate over each row in the dataframe
    for index, row in consolidated_chunks.iterrows():
        start_time = row['Start Time']
        end_time = row['End Time']

        # Find the corresponding samples for the start and end times
        start_sample = int(start_time * sr)
        end_sample = int(end_time * sr)

        # Extract the chunk from the original loaded .wav object
        chunk = test1[start_sample:end_sample]

        # Create four new chunks with staggered start times (0.5ms apart)
        chunk_offsets = [0, int(0.5e-3 * sr), int(1e-3 * sr), int(1.5e-3 * sr)]
        chunks = [chunk[offset:] for offset in chunk_offsets]

        # Calculate the energy for each chunk within the desired frequency band
        energies = [gross_mag_in_threshold(chunk, sr=sr) for chunk in chunks]

        # Find the index of the chunk with the maximum energy
        max_energy_index = max(0, np.argmax(energies)-1)

        # Update the start time based on the selected chunk
        new_start_time = start_time + chunk_offsets[max_energy_index] / sr

        # Update the dataframe with the new start time
        consolidated_chunks.loc[index, 'Start Time'] = new_start_time
    
    if output_as == 'labels':
        #export the dataframe to a tab delimited text file for import into Audacity
        consolidated_chunks['Index'] = consolidated_chunks.index

        consolidated_chunks.to_csv(output_file_name, sep='\t', columns=['Start Time', 'End Time', 'Index'], index=False)
        print(time.strftime('%X %x %Z'))
        print("time taken: --- %s seconds ---" % (time.time() - kickoff_time))
        return
    
    print(time.strftime('%X %x %Z'))
    print("time taken: --- %s seconds ---" % (time.time() - kickoff_time))
    return consolidated_chunks


In [14]:
def interpolate_subsection(audio_samples, start_time, end_time, sample_rate, sub_buffer_length):
    # Load the audio file
    audio = audio_samples
    sr = sample_rate

    # Calculate the start and end indices of the subsection
    start_index = int(start_time * sample_rate)
    end_index = int(end_time * sample_rate)

    # Determine the additional samples to include on either side
    additional_samples = int(sub_buffer_length * sample_rate)

    # Define the subsection boundaries
    sub_start_index = max(0, start_index - additional_samples)
    sub_end_index = min(len(audio), end_index + additional_samples)

    # Extract the subsection from the audio
    sub_buffer = audio[sub_start_index:sub_end_index]

    # Calculate the indices of the bad samples within the subsection
    first_bad = start_index - sub_start_index
    num_bad = end_index - start_index

    # Interpolate the subsection using the LSAR algorithm
    interpolate_audio(sub_buffer, first_bad, num_bad)

    # Replace the repaired subsection in the original audio buffer
    audio[start_index:end_index] = sub_buffer[first_bad:first_bad + num_bad]

    # Return the modified audio buffer
    return audio


In [16]:
def create_repaired_wav_file(input_array, sample_rate, output_file):
    # Normalize the input array to the range [-1, 1]
    normalized_array = input_array / np.max(np.abs(input_array))
    
    # Write the repaired audio to a new .wav file
    sf.write(output_file, normalized_array, sample_rate, 'PCM_16')

In [22]:
# get bad sections
repair_sections = generate_mouth_sounds_labels('{}\WAV\\test 30s.wav'.format(nb_path))
repair_sections.head()

10:27:18 07/06/23 AUS Eastern Standard Time
10:27:33 07/06/23 AUS Eastern Standard Time
time taken: --- 14.356364965438843 seconds ---


Unnamed: 0,Start Time,End Time
0,0.234467,0.240454
1,0.381134,0.386122
2,0.398095,0.403084
3,1.87873,1.885714
4,2.159093,2.164082


In [24]:
# loop through bad sections to create a "fixed" version of the soundfile values
file_to_fix, sr = librosa.load('{}\WAV\\test 30s.wav'.format(nb_path), sr=None)

In [32]:
def repair_bad_sections(original_array, sample_rate, bad_sections_df):
    repaired_array = original_array.copy()
    
    # Iterate through each row in the dataframe
    for _, row in bad_sections_df.iterrows():
        start_time = row['Start Time']
        end_time = row['End Time']
        
        # Convert start and end times to sample indices
        start_index = int(start_time * sample_rate)
        end_index = int(end_time * sample_rate)
        
        # Repair the bad section using your interpolation method
        repaired_section = interpolate_subsection(original_array, start_time, end_time, sample_rate, 
                                                  sub_buffer_length=(end_time - start_time)*1.5)
        
        # Replace the bad section in the repaired array
        repaired_array = repaired_section
    
    return repaired_array


In [33]:
fixed_file_samples = repair_bad_sections(file_to_fix, sr, repair_sections)
create_repaired_wav_file(fixed_file_samples, sr, '{}\WAV\\test 30s - repaired.wav'.format(nb_path))

Hmmm. This works, but there are a couple of missed spots and weird mic blowouts that I don't remember being there. I need to compare the actual soundwaves visually for myself, so I'll check those out in Audacity when I get some time. Yes, I realise librosa can 100% do this for me, but no surprise the a GUI is superior for ease of access and comparing where the corrections did vs didn't work. It may be that the sub-sections are too long for LSAR to work effectively, so I'd need to use the approach I adopted in Audacity itself which was to repair 128 samples at a time, with 64 sample step forwards to cover the full length of the bad section.