Scripts are supposed to change format of txt files of training data to the same format, lets call it 'tpi'
0 - exhale, 1 - inhale, 2 - silence
after every class we have 1 number saying how many milliseconds does this class last
for example for following csv file:
1 1500
0 2500
We have a recording that has 1 second of inhale and 2.5 seconds of exhale, total of 3.5 second recording.
For every csv file we have a corresponding .wav file with the same name.

# Master thesis dataset preparation

In [4]:
import os
import csv
import shutil
import wave
import numpy as np
from scipy.io.wavfile import read, write
from scipy.signal import resample

# Function that converts a txt file to a csv file in the desired format
def convert_to_csv_master(txt_file, output_file, sampling_rate):
    # Open the txt file
    with open(txt_file, 'r') as f:
        lines = f.readlines()

    events = []
    last_end = 0
    MIN_SILENCE_SAMPLES = int(0.01 * sampling_rate)  # Minimum silence duration in samples

    # Iterate over lines in the txt file
    for line in lines:
        # Split the line by spaces
        parts = line.strip().split()

        # If there are not 3 parts (class, start, and end), skip this line
        if len(parts) != 3:
            raise ValueError(f"Invalid line in {txt_file}: {line.strip()}")

        event, start, end = parts[0], int(parts[1]), int(parts[2])

        # Add silence if the gap between events is longer than the minimum silence duration
        if start - last_end > MIN_SILENCE_SAMPLES:
            events.append(('silence', last_end, start))
        if event == 'wydech':  # Exhale
            events.append(('exhale', start, end))
        elif event == 'wdech':  # Inhale
            events.append(('inhale', start, end))
        last_end = end

    # Write the events to the csv file
    with open(output_file, 'w') as f:
        writer = csv.writer(f)
        writer.writerow(["phase_code", "start_sample", "end_sample"])
        for event in events:
            writer.writerow([event[0], event[1], event[2]])

# Function to get the sampling rate of a wav file
def get_sampling_rate(wav_file):
    with wave.open(wav_file, 'r') as wf:
        sample_rate = wf.getframerate()
    return sample_rate

# Function that processes txt files in a directory and converts them to csv files
def process_directory_master(input_directory, output_directory):
    i = 1  # Counter for output files

    # Create the output directory if it doesn't exist
    if not os.path.exists(output_directory):
        os.makedirs(output_directory)

    # Iterate over files in the directory
    for filename in os.listdir(input_directory):
        # If the file is a txt file
        if filename.endswith('.txt'):
            # Get the paths of the txt and corresponding wav files
            txt_file = os.path.join(input_directory, filename)
            wav_file = os.path.join(input_directory, filename.replace('.txt', '.wav'))

            # If the corresponding wav file exists
            if os.path.exists(wav_file):
                # Create the output paths
                output_csv = os.path.join(output_directory, f'data-master{i}.csv')
                output_wav = os.path.join(output_directory, f'data-master{i}.wav')

                # Convert the txt file to a csv file in the desired format
                convert_to_csv_master(txt_file, output_csv, get_sampling_rate(wav_file))

                # Load, resample, and save the wav file
                rate, data = read(wav_file)

                if data.dtype != np.int16:
                    raise Exception(f"Data type is not int16, it's {data.dtype}. Make sure you have used right sequence creator.")

                if data.ndim > 1 and data.shape[1] > 1:
                    data = data.mean(axis=1).astype(data.dtype)

                if rate != 44100:
                    adjust_labels(output_csv, rate, 44100)
                    num_samples = int(len(data) * 44100 / rate)
                    data = resample(data, num_samples)

                write(output_wav, 44100, data)

                # Split the audio and labels into 10-second segments
                split_into_segments(output_wav, output_csv)

                # Increment the counter
                i += 1


# Function to adjust the labels in the csv file based on the resampling ratio
def adjust_labels(csv_file, original_rate, resampled_rate):
    # Calculate the resampling ratio
    ratio = resampled_rate / original_rate

    if ratio == 1:
        return

    # Read the csv file
    with open(csv_file, 'r') as f:
        reader = csv.reader(f)
        header = next(reader)
        rows = list(reader)

    # Adjust the labels
    adjusted_rows = []
    for row in rows:
        phase_code, start_sample, end_sample = row
        start_sample = int(int(start_sample) * ratio)
        end_sample = int(int(end_sample) * ratio)
        adjusted_rows.append([phase_code, start_sample, end_sample])

    # Write the adjusted labels back to the csv file
    with open(csv_file, 'w') as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(adjusted_rows)

sequence_id = 1  # Counter for output files
remaining_samples = np.array([], dtype=np.int16)  # Store remaining audio samples
remaining_labels = []

# Function to split the audio and labels into 10-second segments
def split_into_segments(wav_file, csv_file):
    global sequence_id, remaining_samples, remaining_labels

    TARGET_SAMPLES = 441000  # 10 seconds at 44.1 kHz
    output_folder = '../../master-sequences-processed'

    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    # Read audio file as samples
    sample_rate, current_samples = read(wav_file)

    if sample_rate != 44100:
        raise Exception("Sampling rate is not 44100. Make sure you have used right sequence creator.")
    if current_samples.dtype != np.int16:
        raise Exception(f"Data type is not int16, it's {current_samples.dtype}. Make sure you have used right sequence creator.")
    if current_samples.ndim != 1:
        raise Exception("Audio is not mono. Make sure you have used right sequence creator.")

    current_labels = load_labels(csv_file)  # Load labels

    # Combine remaining audio samples and labels
    full_samples = np.concatenate((remaining_samples, current_samples))
    full_labels = remaining_labels + shift_labels(current_labels, len(remaining_samples))

    # Calculate number of full segments
    num_full_segments = len(full_samples) // TARGET_SAMPLES

    offset = 0
    for i in range(num_full_segments):
        segment_samples = full_samples[offset:offset + TARGET_SAMPLES]
        segment_labels = [
            (label, max(0, start - offset), min(TARGET_SAMPLES - 1, end - offset))
            for label, start, end in full_labels if start < offset + TARGET_SAMPLES and end > offset
        ]
        save_sequence_and_labels(segment_samples, segment_labels, sequence_id, output_folder, sample_rate)
        sequence_id += 1
        offset += TARGET_SAMPLES

    # Store remaining samples and labels for next call
    remaining_samples = full_samples[offset:]
    remaining_labels = [(label, start - offset, end - offset) for label, start, end in full_labels if start >= offset]

def save_sequence_and_labels(samples, labels, seq_id, folder, sample_rate):
    wav_path = os.path.join(folder, f"sequence_{seq_id}_master.wav")
    csv_path = os.path.join(folder, f"sequence_{seq_id}_master.csv")

    # Save the audio segment
    write(wav_path, sample_rate, samples)

    # Save labels
    with open(csv_path, 'w') as f:
        writer = csv.writer(f)
        writer.writerow(["label", "start_sample", "end_sample"])
        writer.writerows(labels)


# Function to shift the labels by a specified amount
def shift_labels(labels, shift):
    return [(l, s + shift, e + shift) for l, s, e in labels]


# Function to load labels from a csv file
def load_labels(csv_file):
    labels = []
    with open(csv_file, 'r') as file:
        reader = csv.reader(file)
        next(reader)  # Skip header
        for row in reader:
            phase_code = row[0]
            start_sample = int(row[1])
            end_sample = int(row[2])
            labels.append((phase_code, start_sample, end_sample))
    return labels


# Process the dataset directory
process_directory_master('../../data-master-thesis', '../../master-sequences')
print("end")

end


In [23]:
import os
import csv

def calculate_class_durations(directory):
    class_durations = {'exhale': 0, 'inhale': 0, 'silence': 0}

    for filename in os.listdir(directory):
        if filename.endswith('.csv'):
            csv_file = os.path.join(directory, filename)
            with open(csv_file, 'r') as file:
                reader = csv.reader(file)
                next(reader)  # Skip header
                for row in reader:
                    phase_code = row[0]
                    start_sample = int(row[1])
                    end_sample = int(row[2])
                    duration = (end_sample - start_sample + 1) / 44100  # Convert samples to seconds
                    if phase_code in class_durations:
                        class_durations[phase_code] += duration

    return class_durations

# Calculate and print class durations
directory = '../../master-sequences-processed'
durations = calculate_class_durations(directory)
for phase, duration in durations.items():
    print(f"{phase}: {duration:.2f} seconds")

exhale: 889.40 seconds
inhale: 338.59 seconds
silence: 712.46 seconds


# Script to delete unnecessary txt files that are left after our scripts

In [None]:
import os

# Function that removes all txt files in specified directory
def remove_txt_files(directory):
    for filename in os.listdir(directory):
        if filename.endswith('.txt'):
            txt_file = os.path.join(directory, filename)
            os.remove(txt_file)
            print(f'Deleted: {txt_file}')

# Delete all txt files in specified directory
remove_txt_files('../data-repo')

# Turns out that we need also script to convert frames to milliseconds in our csv format

In [None]:
import os
import pandas as pd
import wave

# Function that converts samples to milliseconds
def convert_samples_to_ms(csv_file, wav_file):

    # Open wav file and get sample rate
    with wave.open(wav_file, 'r') as wf:
        sample_rate = wf.getframerate()

    # Open csv file and process data
    df = pd.read_csv(csv_file, header=None)
    df[1] = df[1] / sample_rate * 1000

    # Save processed data to the same csv file
    df.to_csv(csv_file, header=False, index=False)

# Function that processes all csv files in specified directory
def process_directory(directory):

    # Iterate over files in directory
    for filename in os.listdir(directory):

        # If file is csv file
        if filename.endswith('.csv'):

            # Get paths of both csv and wav file
            csv_file = os.path.join(directory, filename)
            wav_file = os.path.join(directory, filename.replace('.csv', '.wav'))

            # If corresponding wav file exists
            if os.path.exists(wav_file):

                # Convert samples to milliseconds
                convert_samples_to_ms(csv_file, wav_file)

# Process files in the directory ../data-master-thesis
process_directory('../data-master-thesis')

# Repo dataset preparation

In [None]:
import os

# Function to map the class values
def map_class(value):
    if value == 0:  # Their silence
        return 2
    elif value in [1, 3]:  # Their inhale
        return 1
    elif value in [2, 4]:  # Their exhalation
        return 0

# Function that converts txt file to csv file in our wanted format
def convert_to_csv_repo(txt_file, output_file):

    # Open txt file
    with open(txt_file, 'r') as f:
        lines = f.readlines()

    events = []

    # Iterate over lines in txt file
    for line in lines:

        # Split line by tabs
        parts = line.strip().split('\t')

        # If there are not 3 parts (start, end, class), skip this line
        if len(parts) != 3:
            continue

        # Extract start, end and class
        start, end, event = float(parts[0]), float(parts[1]), int(parts[2])

        # Map class value
        event = map_class(event)

        # Convert duration to milliseconds and throw error if anomaly occurs
        try:
            duration = int((end - start) * 1000)
            if duration < 0:
                raise ValueError(f"Negative duration: {duration} for line: {line.strip()}")
        except ValueError as e:
            print(e)
            continue

        # Append event to events list
        events.append((event, duration))

    # Write events to csv file
    with open(output_file, 'w') as f:
        for event in events:
            f.write(f'{event[0]},{event[1]}\n')

# Function that changes txt files to csv files in our wanted format in specified directory
def process_directory_repo(directory):
    i = 1  # Counter for output files

    # Iterate over files in directory
    for filename in os.listdir(directory):

        # If file is txt file
        if filename.endswith('.txt'):

            # Get paths
            txt_file = os.path.join(directory, filename)
            wav_file = os.path.join(directory, filename.replace('.txt', '.wav'))

            # If corresponding wav file exists
            if os.path.exists(wav_file):

                print(f'Processing: {txt_file}')
                # Create output paths
                output_csv = os.path.join(directory, f'data-repo{i}.csv')
                output_wav = os.path.join(directory, f'data-repo{i}.wav')

                # Convert txt file to csv file in our wanted format
                convert_to_csv_repo(txt_file, output_csv)

                # Rename wav file
                os.rename(wav_file, output_wav)

                # Increment counter
                i += 1

# Process dataset directory
process_directory_repo('../data-repo')

## That's all we need to prepare ready dataset for training - only thing left is to process our data (using breating_sequence_creator.py)

## We got sequences of different lengts, varying from 10 seconds to even 2.5 minutes. For training, in order to not have to pad sequences, we need to make sequences as close to same length as possible (lets say 30s)

In [1]:
import os
import csv
from pydub import AudioSegment

TARGET_DURATION = 30000  # Sequence duration
INPUT_FOLDER = './data-seq'
OUTPUT_FOLDER = '../train'

if not os.path.exists(OUTPUT_FOLDER):
    os.makedirs(OUTPUT_FOLDER)

# Function to load labels (boring)
def load_labels(csv_file):
    labels = []
    with open(csv_file, 'r') as file:
        reader = csv.reader(file)
        for row in reader:
            labels.append((int(row[0]), float(row[1])))
    return labels

# Function to save sequence and labels (takes audio recording, labels to it and id of sequence)
def save_sequence_and_labels(sequence, labels, sequence_id):
    audio_path = os.path.join(OUTPUT_FOLDER, f"seq_{sequence_id}.wav")
    sequence.export(audio_path, format="wav")

    csv_path = os.path.join(OUTPUT_FOLDER, f"seq_{sequence_id}.csv")
    with open(csv_path, 'w', newline='') as file:
        writer = csv.writer(file)
        for label in labels:
            writer.writerow(label)

# Function to process files
def process_files():

    # Sequence ID for sequence name
    sequence_id = 1

    # current_sequence stores currently created sequence
    current_sequence = AudioSegment.silent(duration=0)

    # current_labels stores labels of current sequence
    current_labels = []

    # current_duration stores duration of current sequence
    current_duration = 0

    # We run through all files in INPUT_FOLDER
    for filename in os.listdir(INPUT_FOLDER):

        # If file is wav file
        if filename.endswith('.wav'):

            # We need to get both wav file and csv file with same name
            wav_file = os.path.join(INPUT_FOLDER, filename)
            csv_file = os.path.join(INPUT_FOLDER, filename.replace('.wav', '.csv'))

            # If corresponding csv file does not exist, just skip this file
            if not os.path.exists(csv_file):
                continue

            # Load audio and labels from currently processed audio file
            audio = AudioSegment.from_wav(wav_file)
            labels = load_labels(csv_file)

            # We run through all labels in the file (lets say sub recordings)
            for label, duration in labels:

                # We need to get duration of currently checked sub sequence
                duration = int(duration)

                # We append our current sequence with currently checked subsequence

                # We add the sub recording to current sequence
                current_sequence += audio[:duration]

                # We add label to current labels
                current_labels.append((label, duration))

                # Update the current duration
                current_duration += duration

                # And we cut the sub recording from the audio
                audio = audio[duration:]

                # And if the duration is larger than our TARGET_DURATION, we want to save the sequence and create new one
                if current_duration > TARGET_DURATION:

                    # We save the sequence as it has reached the TARGET_DURATION
                    save_sequence_and_labels(current_sequence, current_labels, sequence_id)
                    sequence_id += 1

                    # We create the new, empty one
                    current_sequence = AudioSegment.silent(duration=0)

                    # And we reset labels and duration
                    current_labels = []
                    current_duration = 0

    # After running through all wav files, we need to save the last sequence, even if its not long enough
    if current_duration > 0:
        save_sequence_and_labels(current_sequence, current_labels, sequence_id)
        sequence_id += 1

if __name__ == "__main__":
    process_files()