In [18]:
import librosa
from scipy import signal
import matplotlib.pyplot as plt
import numpy as np
import os
import soundfile as sf

In [5]:
# Root folder holding the recordings (.wav) and their per-recording text files (.txt).
DATASET_PATH = r'D:\python\safevision\Respiratory_Sound_Database\audio_and_txt_files'
audioFilePaths = []
recordingInfoFilePaths = []
for dirname, _, filenames in os.walk(DATASET_PATH):
    for filename in filenames:
        fullPath = os.path.join(dirname, filename)
        # Compare the real extension (case-insensitively) instead of a bare
        # suffix, so e.g. "notes_wav" is not mistaken for an audio file and
        # "X.WAV" is still picked up.
        extension = os.path.splitext(filename)[1].lower()
        if extension == ".wav":
            audioFilePaths.append(fullPath)
        elif extension == ".txt":
            recordingInfoFilePaths.append(fullPath)

# os.walk yields entries in filesystem order; sort so that positional indices
# used by later cells refer to the same recording on every machine and run.
audioFilePaths.sort()
recordingInfoFilePaths.sort()

print(len(audioFilePaths))
print(len(recordingInfoFilePaths))

920
920


In [7]:
# Common sample rate (Hz) that every loaded recording is converted to.
gSampleRate = 7000

def loadFiles(fileList):
    """Load audio files as mono float32 buffers sampled at ``gSampleRate``.

    Each file is read at its native sample rate. Files whose native rate
    differs from ``gSampleRate`` are converted with a polyphase resampler,
    which low-pass filters the signal as part of the rate change; plain
    linear interpolation between samples would let content above the new
    Nyquist frequency alias into the pass band when downsampling.

    Parameters
    ----------
    fileList : iterable of str
        Paths of the audio files to load.

    Returns
    -------
    list of numpy.ndarray
        One 1-D float32 buffer per input path, in the same order.
    """
    outputBuffers = []
    for filename in fileList:
        audioBuffer, nativeSampleRate = librosa.load(filename, dtype=np.float32, mono=True, sr=None)
        nativeSampleRate = int(nativeSampleRate)

        # Nothing to convert: already at the target rate, or no samples at all.
        if nativeSampleRate == gSampleRate or audioBuffer.size == 0:
            outputBuffers.append(audioBuffer)
            continue

        # Reduce the rate ratio to lowest terms to keep the polyphase
        # filter as short as possible (e.g. 44100 -> 7000 becomes 70/441).
        commonDivisor = int(np.gcd(gSampleRate, nativeSampleRate))
        resampledBuffer = signal.resample_poly(
            audioBuffer,
            gSampleRate // commonDivisor,
            nativeSampleRate // commonDivisor,
        )
        # Keep the dtype identical to buffers that needed no resampling.
        outputBuffers.append(resampledBuffer.astype(np.float32, copy=False))

    return outputBuffers

# Decode every discovered recording once; later cells reuse these buffers.
audioBuffers = loadFiles(fileList=audioFilePaths)

In [8]:
# Pass band edges in Hz: everything below 80 Hz and above upperCutoffFreq is rejected.
upperCutoffFreq = 3000
cutoffFrequencies = [80, upperCutoffFreq]

# 401-tap FIR band-pass designed for the common sample rate. The name says
# "high pass", but the design is a band-pass (pass_zero="bandpass").
highPassCoeffs = signal.firwin(
    numtaps=401,
    cutoff=cutoffFrequencies,
    pass_zero="bandpass",
    fs=gSampleRate,
)

def applyHighpass(npArr):
    """Filter ``npArr`` with the module-level FIR coefficients ``highPassCoeffs``.

    The filter is applied causally with ``scipy.signal.lfilter`` using a
    denominator of ``[1.0]`` (pure FIR), so the result has the same length as
    the input. The input array is not modified.

    NOTE(review): the coefficients are designed as a band-pass elsewhere in
    this notebook, so the function name understates what is removed.
    """
    firDenominator = [1.0]
    filtered = signal.lfilter(highPassCoeffs, firDenominator, npArr)
    return filtered

def applyLogCompressor(signal, gamma):
    """Apply sign-preserving logarithmic compression to ``signal``.

    Computes ``sign(x) * log(1 + gamma * |x|) / log(1 + gamma)`` element-wise,
    so 0 maps to 0 and +/-1 map to +/-1, while smaller magnitudes are boosted.

    Note: the ``signal`` parameter shadows the ``scipy.signal`` module inside
    this function only.
    """
    magnitude = np.abs(signal)
    # Normaliser chosen so that a magnitude of 1 maps back to 1.
    normalizer = 1 / np.log(1.0 + gamma)
    compressed = np.log(1 + magnitude * gamma) * normalizer
    return np.sign(signal) * compressed

def normalizeVolume(npArr):
    """Scale ``npArr`` in place so that its peak absolute amplitude is 1.0.

    Parameters
    ----------
    npArr : numpy.ndarray
        Floating-point sample buffer. It is modified in place.

    Returns
    -------
    numpy.ndarray
        The same array object that was passed in. Empty buffers, silent
        (all-zero) buffers and buffers whose peak is not finite are returned
        unchanged, because no meaningful scale factor exists for them
        (dividing by a zero peak would fill the buffer with NaN).
    """
    if npArr.size == 0:
        return npArr

    maxEnv = np.max(np.abs(npArr))
    if maxEnv == 0 or not np.isfinite(maxEnv):
        return npArr

    scale = 1.0 / maxEnv
    # In-place multiply: avoids allocating a second full-size buffer.
    npArr *= scale
    return npArr

# Per recording: filter out-of-band noise, rescale the peak to 1.0, then
# apply the log compressor (gamma = 30).
noiseRemoved = [
    applyLogCompressor(normalizeVolume(applyHighpass(rawBuffer)), 30)
    for rawBuffer in audioBuffers
]

In [None]:
def plot_spectrum(audio_signal, sample_rate=7000):
    """Plot the magnitude spectrum of ``audio_signal`` for positive frequencies.

    Parameters
    ----------
    audio_signal : array-like
        Samples to analyse.
    sample_rate : int, optional
        Sample rate in Hz used to label the frequency axis (default 7000).

    The function shows a matplotlib figure and returns nothing.
    """
    fft_values = np.fft.fft(audio_signal)
    fft_freqs = np.fft.fftfreq(len(fft_values), 1/sample_rate)

    # Keep strictly positive frequencies only (drops DC and the negative half).
    keep = fft_freqs > 0
    magnitudes = np.abs(fft_values[keep])
    positive_freqs = fft_freqs[keep]

    _, ax = plt.subplots(figsize=(10, 6))
    ax.plot(positive_freqs, magnitudes)
    ax.set_title('Спектр звука')
    ax.set_xlabel('Частота (Гц)')
    ax.set_ylabel('Амплитуда')
    ax.grid(True)
    plt.show()

# Inspect the spectrum of one processed recording. The sample rate comes from
# gSampleRate so the frequency axis stays correct if that constant changes.
plot_spectrum(noiseRemoved[19], sample_rate=gSampleRate)


In [None]:
import IPython.display as ipd

# Index of the recording used for the visual and audible comparison.
selectedSampleIdx = 0

fig, axs = plt.subplots(1,2, figsize=(16,5))
fig.suptitle('Before/After Bandpass filtering + Log Compression', fontsize=18)

# Left panel: buffer as loaded; right panel: the same recording after processing.
waveformPanels = (
    ("Before", audioBuffers[selectedSampleIdx]),
    ("After", noiseRemoved[selectedSampleIdx]),
)
for ax, (panelTitle, samples) in zip(axs, waveformPanels):
    ax.plot(samples)
    ax.set_title(panelTitle)
    ax.set(ylabel='Amplitude', xlabel='Sample Index')

plt.tight_layout()
plt.show()

# Audio players for listening to the same two buffers.
audioPlayers = (
    ("Before Filtering", audioBuffers[selectedSampleIdx]),
    ("Post Filtering", noiseRemoved[selectedSampleIdx]),
)
for caption, samples in audioPlayers:
    print(caption)
    ipd.display(ipd.Audio(samples, rate=gSampleRate))

In [None]:
from scipy.io.wavfile import write
import numpy as np

def save_audio_to_wav(audio_array, filename, sample_rate=7000):
    """Write ``audio_array`` to ``filename`` as a 16-bit PCM WAV file.

    Parameters
    ----------
    audio_array : array-like
        Samples with a nominal range of [-1.0, 1.0].
    filename : str
        Destination path of the WAV file.
    sample_rate : int, optional
        Sample rate in Hz stored in the file header (default 7000).

    Samples outside [-1.0, 1.0] are clipped before conversion. Without the
    clip, a value such as 2.0 scales to 65534, which does not fit in int16
    and wraps around to a wrong (typically opposite-sign) sample.
    """
    samples = np.clip(np.asarray(audio_array), -1.0, 1.0)
    audio_int16 = (samples * 32767).astype(np.int16)
    write(filename, sample_rate, audio_int16)

# Write every processed recording as processed_audio_<n>.wav (n starts at 1)
# in the current working directory. The sample rate is passed explicitly so
# the WAV header always matches the rate the buffers were resampled to.
for fileNumber, audio in enumerate(noiseRemoved, start=1):
    filename = f'processed_audio_{fileNumber}.wav'
    save_audio_to_wav(audio, filename, sample_rate=gSampleRate)
    print(f'Сохранено: {filename}')


In [23]:
OUTPUT_DATASET_PATH = r'D:\python\safevision\Respiratory_Sound_Database\PROCESSING_BIG_DATASET/'

# The writers below do not create missing directories themselves.
os.makedirs(OUTPUT_DATASET_PATH, exist_ok=True)

# The buffers are matched to their source paths by position, so the two lists
# must line up; a mismatch would attach the wrong name to a recording.
if len(audioFilePaths) != len(noiseRemoved):
    raise ValueError(
        f"audioFilePaths ({len(audioFilePaths)}) and noiseRemoved "
        f"({len(noiseRemoved)}) are out of sync; re-run the loading cells."
    )

# Index the text files by exact base name (no extension). An exact lookup
# avoids rescanning the whole list per recording and cannot pick a file whose
# name merely contains the recording's name. The first path seen wins.
txtPathByBaseName = {}
for txt_path in recordingInfoFilePaths:
    txt_base_name = os.path.splitext(os.path.basename(txt_path))[0]
    txtPathByBaseName.setdefault(txt_base_name, txt_path)

for audio_path, audio_buffer in zip(audioFilePaths, noiseRemoved):
    # File name of the source recording without its extension
    base_name = os.path.splitext(os.path.basename(audio_path))[0]

    # Destination path of the processed audio file
    output_audio_path = os.path.join(OUTPUT_DATASET_PATH, f"{base_name}_processed.wav")

    # Save as WAV at the rate the buffers were resampled to
    sf.write(output_audio_path, audio_buffer, gSampleRate)

    # Look up the text file that belongs to this recording
    corresponding_txt_file = txtPathByBaseName.get(base_name)

    if corresponding_txt_file:
        # Copy the text file next to the processed audio, byte for byte, so
        # the copy does not depend on the platform's default text encoding.
        output_txt_path = os.path.join(OUTPUT_DATASET_PATH, f"{base_name}_processed.txt")
        with open(corresponding_txt_file, 'rb') as txt_file:
            txt_content = txt_file.read()

        with open(output_txt_path, 'wb') as output_txt_file:
            output_txt_file.write(txt_content)
    else:
        print(f"Текстовый файл для {base_name} не найден.")
