In [2]:
import pandas as pd
import numpy as np
import pathlib
from pathlib import Path
import librosa
from typing import List, Tuple
import toolz as tz
from dask.diagnostics import ProgressBar
import dask.bag as db
import dask
import dask.array as da
import zarr
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import robust_scale
# Show every float with exactly three decimal places in pandas output.
pd.set_option("display.float_format", "{:.3f}".format)

In [3]:
def make_diagnosis_df(data_folder: pathlib.Path) -> pd.DataFrame:
    """Load the patient/diagnosis lookup table.

    Reads ``patient_diagnosis.csv`` from ``data_folder``. The file has no
    header row, so the two columns are named ``Patient`` and ``Diagnosis``
    here.

    Args:
        data_folder: Directory that contains ``patient_diagnosis.csv``.

    Returns:
        A frame indexed by ``Patient`` with a single ``Diagnosis`` column.
    """
    csv_path = data_folder / "patient_diagnosis.csv"
    diagnoses = pd.read_csv(csv_path, header=None, names=["Patient", "Diagnosis"])
    return diagnoses.set_index("Patient")


def make_file_df(
    diagnosis_df: pd.DataFrame, audio_txt_folder: pathlib.Path
) -> pd.DataFrame:
    """Build one row of metadata per ``.wav`` recording.

    Every ``*.wav`` directly inside ``audio_txt_folder`` is described via
    ``get_record_stats`` and then labelled with the diagnosis of its patient.

    Args:
        diagnosis_df: Frame indexed by integer patient id with a
            ``Diagnosis`` column.
        audio_txt_folder: Directory holding the ``.wav`` files and their
            matching ``.txt`` annotation files.

    Returns:
        A frame with the columns ``Patient``, ``Section``, ``Location``,
        ``n_channels``, ``device``, ``filesize``, ``n_breaths`` and
        ``Diagnosis``. Rows follow the order in which ``glob`` yields files.
    """
    column_names = [
        "Patient",
        "Section",
        "Location",
        "n_channels",
        "device",
        "filesize",
        "n_breaths",
    ]

    records = []
    for wav_path in audio_txt_folder.glob("*.wav"):
        # Everything before the first dot is the recording's base name.
        base_name = wav_path.name.split(".")[0]
        records.append(get_record_stats(audio_txt_folder, base_name))

    recordings = pd.DataFrame(records, columns=column_names)

    def lookup_diagnosis(patient_id):
        # Raises KeyError when the patient is missing from diagnosis_df.
        return diagnosis_df.loc[patient_id, "Diagnosis"]

    # Patient ids come from filenames as strings; the lookup index is integer.
    recordings["Diagnosis"] = recordings["Patient"].astype(int).map(lookup_diagnosis)
    return recordings


def get_record_stats(
    folder: pathlib.Path, file: str
) -> Tuple[str, str, str, str, str, int, int]:
    """Describe one recording from its filename, WAV size and annotations.

    Args:
        folder: Directory containing ``<file>.wav`` and ``<file>.txt``.
            A plain string path is accepted as well.
        file: Base name of the recording without extension; its
            underscore-separated parts become the leading tuple elements.

    Returns:
        The underscore-separated name parts, followed by the size of the
        ``.wav`` file in bytes and the number of breath cycles (non-blank
        lines in the ``.txt`` file). With the five-part names expected by
        ``make_file_df`` this is a 7-tuple.

    Raises:
        FileNotFoundError: If the ``.wav`` or ``.txt`` file does not exist.
    """
    base = Path(folder)
    name_elems = file.split("_")
    wav_size = base.joinpath(f"{file}.wav").stat().st_size
    # Count only non-blank lines: this gives the same answer whether or not
    # the annotation file ends on an empty line, instead of blindly
    # subtracting one. The context manager closes the handle promptly.
    with open(base.joinpath(f"{file}.txt"), "r") as annotations:
        n_breath_cycles = sum(1 for line in annotations if line.strip())
    return tuple(name_elems + [wav_size, n_breath_cycles])


def pad_to_length(arr: np.ndarray, max_len: int) -> np.ndarray:
    """Extend ``arr`` along its first axis to ``max_len`` by repeating it.

    The padding is appended at the end and filled by wrapping around to the
    start of the array, so a short signal is looped rather than zero-filled.
    Only the leading axis is padded; any further axes keep their size.

    Args:
        arr: Array with at least one dimension.
        max_len: Desired length of the first axis.

    Returns:
        A new array whose first axis has length ``max_len``.

    Raises:
        ValueError: If ``arr`` is already longer than ``max_len``, or if it
            is empty and would need padding (nothing to wrap around).
    """
    arr_len = arr.shape[0]
    diff = max_len - arr_len
    if diff < 0:
        raise ValueError(
            f"array of length {arr_len} is longer than max_len={max_len}"
        )
    # A bare (0, diff) would be applied to every axis; spell out one pair
    # per axis so trailing axes are left untouched.
    pad_width = [(0, diff)] + [(0, 0)] * (arr.ndim - 1)
    return np.pad(arr, pad_width, mode="wrap")


def make_breath_array(
    audio_txt_folder: pathlib.Path,
    file_df: pd.DataFrame,
    sample_rate: int = 87,
) -> dask.array.Array:
    """Load every recording listed in ``file_df`` into one 2-D dask array.

    Each row holds one recording resampled to ``sample_rate`` and wrapped
    (via ``pad_to_length``) to the length of the longest recording. Two
    extra columns are appended at the end: ``n_breaths`` and a flag that is
    1 when ``Diagnosis == "Healthy"`` and 0 otherwise.

    The files are taken from ``file_df`` itself rather than from a second
    directory scan, so row ``i`` of the audio is guaranteed to belong to row
    ``i`` of the label columns.

    Args:
        audio_txt_folder: Directory that contains the ``.wav`` files.
        file_df: Frame with the filename columns ``Patient``, ``Section``,
            ``Location``, ``n_channels``, ``device`` plus ``n_breaths`` and
            ``Diagnosis``.
        sample_rate: Target sampling rate handed to librosa. The default of
            87 is the lowest rate the original author could use without
            hitting divide-by-zero errors; breathing is low-frequency.

    Returns:
        A float32 dask array of shape ``(len(file_df), max_len + 2)``.

    Raises:
        ValueError: If ``file_df`` has no rows.
    """
    name_columns = ["Patient", "Section", "Location", "n_channels", "device"]
    # Rebuild each filename from the parts it was originally split into.
    files_to_use = [
        audio_txt_folder.joinpath("_".join(map(str, name_parts)) + ".wav")
        for name_parts in file_df[name_columns].itertuples(index=False, name=None)
    ]
    if not files_to_use:
        raise ValueError("file_df lists no recordings to load")

    # librosa.load returns (samples, rate); only the samples are kept.
    wav_bag = (
        db.from_sequence(files_to_use, npartitions=8)
        .map(lambda x: librosa.core.load(x, sr=sample_rate)[0])
        .compute()
    )

    max_len = max(x.shape[0] for x in wav_bag)

    breath_array = (
        db.from_sequence(wav_bag, npartitions=8)
        .map(lambda x: pad_to_length(x, max_len))
        .to_dataframe()
        # lengths=True computes partition sizes so the array shape is known.
        .to_dask_array(lengths=True)
    )

    new_cols = da.stack(
        [
            da.from_array(file_df["n_breaths"].values),
            da.from_array((file_df["Diagnosis"] == "Healthy").astype(np.int8).values),
        ],
        axis=1,
    )

    return da.concatenate([breath_array, new_cols], axis=1).astype(np.float32)


def save_to_zarr(arr: dask.array.Array, folder: pathlib.Path, filename: str) -> None:
    """Write a dask array to a zarr store at ``folder/filename``.

    The array is rechunked with dask's default chunking before it is
    written, and the target path is passed to dask as a string.

    Args:
        arr: Array to persist.
        folder: Directory that will contain the store.
        filename: Name of the zarr store inside ``folder``.
    """
    target = Path(folder) / filename
    rechunked = arr.rechunk()
    da.to_zarr(rechunked, str(target))

In [4]:
# Every location is built with pathlib so the notebook stays
# cross-platform.
data_folder = Path("..") / "data"

raw_respiratory = (
    data_folder
    / "raw"
    / "respiratory-sound-database"
    / "Respiratory_Sound_Database"
    / "Respiratory_Sound_Database"
)

audio_txt_folder = raw_respiratory / "audio_and_txt_files"

In [10]:
# Patient -> diagnosis lookup, then one metadata row per recording.
diagnosis_df = make_diagnosis_df(data_folder=raw_respiratory)
file_df = make_file_df(
    diagnosis_df=diagnosis_df,
    audio_txt_folder=audio_txt_folder,
)

In [11]:
# Show dask's progress bars while the recordings are loaded and padded.
with ProgressBar():
    breath_array = make_breath_array(
        audio_txt_folder=audio_txt_folder,
        file_df=file_df,
    )

[########################################] | 100% Completed |  1min 30.9s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  9.7s


In [12]:
# Persist the complete (unsplit) array under the interim data folder.
save_to_zarr(
    arr=breath_array,
    folder=data_folder.joinpath("interim"),
    filename="breath_data_full.zarr",
)

In [13]:
# 70/30 split with a fixed seed, stratified on the last column, which
# holds the healthy / not-healthy flag.
train, test = train_test_split(
    breath_array.rechunk(),
    stratify=breath_array[:, -1],
    test_size=0.3,
    random_state=0,
)

In [14]:
# Persist both halves of the split next to the full array.
save_to_zarr(
    arr=train,
    folder=data_folder.joinpath("interim"),
    filename="breath_data_train.zarr",
)

save_to_zarr(
    arr=test,
    folder=data_folder.joinpath("interim"),
    filename="breath_data_test.zarr",
)

In [5]:
# Reload the training split from disk and materialise it in memory.
# NOTE(review): this rebinds `data_folder` (defined earlier as ../data) and
# `train` (earlier the lazy dask split). Re-running the save cells after
# this one would resolve to ../data/interim/interim.
data_folder = Path("..") / "data" / "interim"
train_dask = da.from_zarr(str(data_folder / "breath_data_train.zarr"))
train = train_dask.compute()