
# Cough classification notebook

In [1]:
# Install ffmpeg

# apt update
# apt install ffmpeg
# ffmpeg -version

## Importing Libraries

In [2]:
# Feature extraction and data preprocessing
import librosa
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import os
from PIL import Image
import pathlib
import csv
from tqdm import tqdm

# Preprocessing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler

# #Keras
# import keras

import warnings
warnings.filterwarnings('ignore')

%cd ../../


/root/autodl-nas/cough_videos


## Extracting cough audios and features

### Dataset

We use the [COUGHVID](https://zenodo.org/record/7024894#.YysPHHZBybg) dataset for classification.
<br>
<br>
The dataset originally assigns each cough recording one of three statuses:
 * COVID-19
 * healthy
 * symptomatic
 
Total dataset: xxx samples


We now recast the task as binary classification: healthy (label 0) vs. unhealthy (label 1).

## Metadata


In [3]:
# Inspect each JSON file and pick the annotation to use

import json

dict_id2label = {}

folder = './datasets/coughvid_v1/public_dataset'
count = 0
for filename in tqdm(os.listdir(folder)):
#     print(filename)
    if not filename.endswith("json"):
        continue
    
    annotation_info = json.load(
        open(os.path.join(folder, filename), "r", encoding="utf-8")
    )
#     print(annotation_info)
    
    expert_anno_1 = annotation_info.get("expert_labels_1")
    diagnosis_1 = None
    if expert_anno_1:
        diagnosis_1 = expert_anno_1["diagnosis"]
        if expert_anno_1["quality"] == "no_cough":
            diagnosis_1 = None
    
    expert_anno_2 = annotation_info.get("expert_labels_2")
    diagnosis_2 = None
    if expert_anno_2:
        diagnosis_2 = expert_anno_2["diagnosis"]
        if expert_anno_2["quality"] == "no_cough":
            diagnosis_2 = None
    
    expert_anno_3 = annotation_info.get("expert_labels_3")
    diagnosis_3 = None
    if expert_anno_3:
        diagnosis_3 = expert_anno_3.get("diagnosis")
#         diagnosis_3 = expert_anno_3["diagnosis"]
        if expert_anno_3["quality"] == "no_cough":
            diagnosis_3 = None
    
    status = annotation_info.get("status")
    if diagnosis_1 is None and diagnosis_2 is None and diagnosis_3 is None and status is None:
        continue
    
    diagnose_label = None
    
    if status:
        if status == "healthy":
            diagnose_label = "healthy"
        else:
            diagnose_label = "unhealthy"
        
    else: 
        
        if set([diagnosis_1, diagnosis_2, diagnosis_3]) == set(["healthy_cough", None]):
            diagnose_label = "healthy"
        elif set([diagnosis_1, diagnosis_2, diagnosis_3]) == set(["healthy_cough"]):
            diagnose_label = "healthy"
        elif "healthy" not in json.dumps([diagnosis_1, diagnosis_2, diagnosis_3]):
            diagnose_label = "unhealthy"
        else:   
            print(status, (diagnosis_1, diagnosis_2, diagnosis_3))
            if "\"healthy_cough\", \"healthy_cough\"" not in json.dumps(sorted([diagnosis_1, diagnosis_2, diagnosis_3])):
                diagnose_label = "unhealthy"
            else:
                diagnose_label = "healthy"
    
    count += 1
    dict_id2label[filename.replace(".json", "")] = 1 if diagnose_label == "unhealthy" else 0

print("Valid samples: ", count)


None ('COVID-19', 'COVID-19', 'healthy_cough')
None ('lower_infection', 'COVID-19', 'healthy_cough')
None ('healthy_cough', 'upper_infection', 'healthy_cough')
None ('healthy_cough', 'COVID-19', 'healthy_cough')
None ('healthy_cough', 'upper_infection', 'healthy_cough')
None ('healthy_cough', 'COVID-19', 'healthy_cough')
None ('lower_infection', 'obstructive_disease', 'healthy_cough')
None ('lower_infection', 'upper_infection', 'healthy_cough')
None ('healthy_cough', 'healthy_cough', 'upper_infection')
None ('healthy_cough', 'COVID-19', 'healthy_cough')
None ('healthy_cough', 'upper_infection', 'healthy_cough')
None ('lower_infection', 'COVID-19', 'healthy_cough')

100%|██████████| 40145/40145 [00:26<00:00, 1508.67it/s]

Valid samples:  11630
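The labelling rule in the cell above can be read as: trust the self-reported `status` when it exists, otherwise take a majority vote over the expert diagnoses that are not `None`. Here is a minimal sketch of that reading as a standalone function. The name `consolidate_label` is hypothetical, and the tie rule (one healthy vote against one other vote counts as unhealthy) is an assumption, since the original cell does not handle that case explicitly.

```python
from collections import Counter

def consolidate_label(status, diagnoses):
    """Return 0 (healthy), 1 (unhealthy), or None if nothing is annotated.

    `diagnoses` holds up to three expert diagnoses; None marks a missing
    or "no_cough" annotation.
    """
    votes = [d for d in diagnoses if d is not None]
    if status is None and not votes:
        return None
    # Self-reported status takes precedence, as in the cell above
    if status:
        return 0 if status == "healthy" else 1
    healthy_votes = Counter(votes)["healthy_cough"]
    # Assumption: healthy needs a strict majority, so ties count as unhealthy
    return 0 if healthy_votes * 2 > len(votes) else 1

print(consolidate_label(None, ["healthy_cough", "COVID-19", "healthy_cough"]))
print(consolidate_label(None, ["lower_infection", "COVID-19", "healthy_cough"]))
```

Working on a filtered list also avoids sorting a mix of strings and `None`, which Python 3 does not allow.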





In [4]:
# Count negative (healthy) samples
neg_sample = 0   # healthy coughs
for k, v in dict_id2label.items():
    if v == 0:
        neg_sample += 1

print("Negative sample ratio: ", neg_sample / count)

Negative sample ratio:  0.7428202923473775
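With about 74% healthy samples, there are roughly 2.9 negatives for every positive. This is the quantity usually passed to XGBoost as `scale_pos_weight` (negatives divided by positives). A small sketch on toy labels that mimic this balance follows. `class_balance` and `toy_labels` are illustrative names, not part of the notebook.

```python
def class_balance(labels):
    """Return (negative_ratio, negatives_per_positive) for 0/1 labels."""
    n_neg = sum(1 for v in labels if v == 0)
    n_pos = len(labels) - n_neg
    return n_neg / len(labels), n_neg / n_pos

# Toy labels with roughly the same healthy share as the run above
toy_labels = [0] * 743 + [1] * 257
neg_ratio, neg_per_pos = class_balance(toy_labels)
print(f"negative ratio: {neg_ratio:.3f}, negatives per positive: {neg_per_pos:.2f}")
```

On the real data the same call would be `class_balance(list(dict_id2label.values()))`.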


## Extracting features from Spectrogram


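We will extract the following features from each recording:

* Mel-frequency cepstral coefficients (MFCCs): 20 coefficients, each summarized by its mean and standard deviation
* Spectral centroid
* Spectral bandwidth
* RMS energy
* Zero-crossing rate
* Chroma frequencies
* Spectral roll-off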
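To give a feel for what two of the simpler features measure, here is a plain-Python sketch of zero-crossing rate and RMS energy on a synthetic 100 Hz tone. This is only an illustration: librosa computes these per frame and the notebook averages over frames, whereas the sketch treats the whole signal as one frame. The function names and the test tone are assumptions made for the example.

```python
import math

def zero_crossing_rate(samples):
    """Fraction of adjacent sample pairs whose signs differ."""
    crossings = sum(
        1 for a, b in zip(samples, samples[1:]) if (a >= 0) != (b >= 0)
    )
    return crossings / (len(samples) - 1)

def rms_energy(samples):
    """Root mean square of the samples."""
    return math.sqrt(sum(s * s for s in samples) / len(samples))

sr = 8000    # sampling rate in Hz (illustrative)
freq = 100   # tone frequency in Hz
# One second of a sine tone; the small phase offset avoids samples at exactly 0
tone = [math.sin(2 * math.pi * freq * n / sr + 0.1) for n in range(sr)]

zcr = zero_crossing_rate(tone)
rms = rms_energy(tone)
print(f"zcr={zcr:.4f} (about 2*freq/sr), rms={rms:.4f} (about 1/sqrt(2))")
```

A pure tone crosses zero twice per period, so the rate grows with frequency. Noisy or fricative-like sounds tend to have higher values.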

In [5]:
header = 'filename chroma_stft rmse spectral_centroid spectral_bandwidth rolloff zero_crossing_rate'
for i in range(1, 21):
    header += f' mfcc{i}_mean'
for i in range(1, 21):
    header += f' mfcc{i}_std'
header += ' label'


## Extracting the Spectrogram and features for every Audio

We write the data to a csv file 

In [None]:
import random

cmap = plt.get_cmap('inferno')

list_durations = []
list_files = os.listdir(folder)
random.shuffle(list_files)

# The context manager closes the CSV even if a recording fails to load
with open('./datasets/coughvid_v1/features/data.csv', 'w', encoding="utf-8") as f:
    f.write(header + "\n")

    for filename in tqdm(list_files):
        # Skip non-audio files (.json annotations, .csv tables)
        if filename.endswith(("json", "csv")):
            continue

        idx = filename.split(".")[0]
        if idx not in dict_id2label:
            continue

        label = dict_id2label[idx]

        # Load at most the first 30 seconds as mono audio
        y, sr = librosa.load(os.path.join(folder, filename), mono=True, duration=30)
        dur = librosa.get_duration(y=y, sr=sr)
        list_durations.append(dur)

        # Save the spectrogram image; a fresh figure per file keeps the size constant
        plt.figure(figsize=(10, 10))
        plt.specgram(y, NFFT=2048, Fs=2, Fc=0, noverlap=128, cmap=cmap, sides='default', mode='default', scale='dB')
        plt.axis('off')
        plt.savefig(f"./datasets/coughvid_v1/spectrograms/{idx}-label_{label}.png", dpi=200)
        plt.close()

        # Frame-level features, averaged over time when written out
        chroma_stft = librosa.feature.chroma_stft(y=y, sr=sr)
        rmse = librosa.feature.rms(y=y)
        spec_cent = librosa.feature.spectral_centroid(y=y, sr=sr)
        spec_bw = librosa.feature.spectral_bandwidth(y=y, sr=sr)
        rolloff = librosa.feature.spectral_rolloff(y=y, sr=sr)
        zcr = librosa.feature.zero_crossing_rate(y)
        mfcc = librosa.feature.mfcc(y=y, sr=sr)
        to_append = f'{filename} {np.mean(chroma_stft)} {np.mean(rmse)} {np.mean(spec_cent)} {np.mean(spec_bw)} {np.mean(rolloff)} {np.mean(zcr)}'

        # One mean and one standard deviation per MFCC coefficient
        for e in mfcc:
            to_append += f' {np.mean(e)}'
        for e in mfcc:
            to_append += f' {np.std(e)}'

        to_append += f' {label}'
        f.write(to_append + "\n")

 11%|█▏        | 4552/40145 [25:33<2:42:19,  3.65it/s]
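The rows above are assembled by string concatenation, which works as long as no field contains a space. As an alternative, here is a sketch that uses the standard `csv` module with a space delimiter. It keeps the same column layout, quotes fields when needed, and checks the field count. `write_rows` and the example row are hypothetical and only illustrate the layout.

```python
import csv
import io

# Same column layout as the header string built earlier
columns = ["filename", "chroma_stft", "rmse", "spectral_centroid",
           "spectral_bandwidth", "rolloff", "zero_crossing_rate"]
columns += [f"mfcc{i}_mean" for i in range(1, 21)]
columns += [f"mfcc{i}_std" for i in range(1, 21)]
columns.append("label")

def write_rows(handle, rows):
    """Write the header plus feature rows, space-delimited like data.csv."""
    writer = csv.writer(handle, delimiter=" ", lineterminator="\n")
    writer.writerow(columns)
    for row in rows:
        if len(row) != len(columns):
            raise ValueError(f"expected {len(columns)} fields, got {len(row)}")
        writer.writerow(row)

# Hypothetical row: made-up file name, zeros for the 46 features, label 1
buffer = io.StringIO()
write_rows(buffer, [["example.webm"] + [0.0] * 46 + [1]])
lines = buffer.getvalue().splitlines()
print(len(columns), "columns;", len(lines), "lines written")
```

In the notebook, `buffer` would be replaced by the open file handle for `data.csv`.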

In [None]:
# Distribution of recording durations
plt.figure(figsize=(10, 10))

plt.hist(list_durations, bins=30)
plt.show()


The extracted features are now stored in `./datasets/coughvid_v1/features/data.csv`.

# Analysing the Data in Pandas

In [None]:
data = pd.read_csv('./datasets/coughvid_v1/features/data.csv', sep=" ")
data.head()

In [None]:
data.shape

In [None]:
# Dropping unnecessary columns
data = data.drop(['filename'],axis=1)

In [None]:

from src.baseline1_feature_extraction.tree_training_utils import train_cv_with_thres, threshold_prob, threshold_prob_via_jordan

from lightgbm import LGBMClassifier
from catboost import CatBoostClassifier
from xgboost import XGBClassifier
from sklearn.linear_model import LogisticRegression

random_state = 100
cbt = CatBoostClassifier(verbose=0, allow_writing_files=False, random_state=random_state)
lgb = LGBMClassifier(is_unbalance=True, random_state=random_state)
xgb = XGBClassifier(verbosity=0, scale_pos_weight=3, enable_categorical=True, random_state=random_state)
logistic = LogisticRegression(verbose=0, random_state=random_state)


In [None]:
# models 
list_model_names = [
    "cbt",
    "lgb",
    "xgb",
    "logistic",
#     "stack"
]
list_model_clfs = [
    cbt, 
    lgb, 
    xgb, 
    logistic, 
#     stack
]

In [None]:
import json

features = [w for w in data.columns if w not in ["label"]]
print(features)

for clf_name, clf in zip(list_model_names, list_model_clfs):
    
    res = train_cv_with_thres(data,
                        clf,
                        features,
                        num_folds=5,
                        random_state=101,
                        sensitivity_threshold=0.75,
                        save_dir=None,
                        )
    print(f"**{clf_name}**" * 30)
    print(res)
    print(f"**{clf_name}**" * 30)
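The internals of `train_cv_with_thres` are not shown here. One plausible reading of `sensitivity_threshold=0.75` is that the decision threshold on the predicted probability is chosen so that sensitivity (recall on the unhealthy class) reaches at least 0.75, and specificity is then reported at that threshold. The sketch below illustrates that idea on toy data. Both functions and the toy scores are assumptions and do not reproduce the project helper.

```python
def threshold_for_sensitivity(y_true, y_prob, target=0.75):
    """Largest threshold whose sensitivity (recall on label 1) meets target."""
    n_pos = sum(y_true)
    for thr in sorted(set(y_prob), reverse=True):
        tp = sum(1 for t, p in zip(y_true, y_prob) if t == 1 and p >= thr)
        if tp / n_pos >= target:
            return thr
    return 0.0

def specificity(y_true, y_prob, thr):
    """Share of label-0 samples scored below the threshold."""
    n_neg = sum(1 for t in y_true if t == 0)
    tn = sum(1 for t, p in zip(y_true, y_prob) if t == 0 and p < thr)
    return tn / n_neg

# Toy labels and predicted probabilities of the unhealthy class
y_true = [1, 1, 1, 1, 0, 0, 0, 0]
y_prob = [0.9, 0.8, 0.6, 0.3, 0.7, 0.4, 0.2, 0.1]

thr = threshold_for_sensitivity(y_true, y_prob, target=0.75)
spec = specificity(y_true, y_prob, thr)
print(f"threshold={thr}, specificity={spec}")
```

Fixing sensitivity first and comparing specificity makes models easier to compare on an imbalanced screening task than raw accuracy does.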

## Encoding the Labels

In [None]:
labels = data.iloc[:, -1]
encoder = LabelEncoder()
y = encoder.fit_transform(labels)

## Scaling the Feature columns

In [None]:
scaler = StandardScaler()
X = scaler.fit_transform(np.array(data.iloc[:, :-1], dtype = float))

## Dividing data into training and Testing set

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
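One thing to watch here: the scaler was fitted on all rows before the split, so statistics from the test rows leak into the scaling. With scikit-learn the usual pattern is `fit_transform` on `X_train` and `transform` on `X_test`. The plain-Python sketch below shows the same idea on synthetic data. `fit_scaler`, `transform`, and the random rows are illustrative only.

```python
import random
import statistics

def fit_scaler(rows):
    """Per-column mean and population std, computed from `rows` only."""
    cols = list(zip(*rows))
    means = [statistics.fmean(c) for c in cols]
    stds = [statistics.pstdev(c) or 1.0 for c in cols]  # guard constant columns
    return means, stds

def transform(rows, means, stds):
    """Standardize rows with previously fitted statistics."""
    return [[(v - m) / s for v, m, s in zip(r, means, stds)] for r in rows]

rng = random.Random(0)
rows = [[rng.gauss(5, 2), rng.gauss(-1, 0.5)] for _ in range(100)]
train_rows, test_rows = rows[:80], rows[80:]

means, stds = fit_scaler(train_rows)              # training rows only
train_scaled = transform(train_rows, means, stds)
test_scaled = transform(test_rows, means, stds)   # reuse, never refit on test
```

The training columns end up with mean 0 and unit variance. The test columns will be close to that but not exact, which is the expected behaviour.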

In [None]:
len(y_train)

In [None]:
len(y_test)

In [None]:
X_train[10]

# Classification with Keras

## Building our Network

In [None]:
from keras import models
from keras import layers
from keras import callbacks

model = models.Sequential()
model.add(layers.Dense(32, activation='gelu', input_shape=(X_train.shape[1],)))
model.add(layers.Dense(32, activation='gelu'))
model.add(layers.Dense(32, activation='gelu'))
model.add(layers.Dense(2, activation='softmax'))  # two classes: healthy, unhealthy

In [None]:
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['acc'])
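For reference, `sparse_categorical_crossentropy` takes integer class labels and averages the negative log of the probability assigned to the true class. Below is a hand-computed sketch for a batch of two samples. It is a simplified illustration (Keras additionally clips probabilities for numerical stability), and the probabilities are made up.

```python
import math

def sparse_categorical_crossentropy(labels, probs):
    """Mean of -log(p[label]) over the batch; labels are integer class ids."""
    return -sum(math.log(p[y]) for y, p in zip(labels, probs)) / len(labels)

labels = [0, 1]                      # healthy, unhealthy
probs = [[0.9, 0.1], [0.2, 0.8]]     # made-up softmax outputs
loss = sparse_categorical_crossentropy(labels, probs)
print(f"loss = {loss:.4f}")
```

Confident correct predictions push the loss towards 0, while a confident wrong prediction is penalized heavily.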

In [None]:
history = model.fit(X_train,
                    y_train,
                    validation_data=(X_test, y_test),
                    epochs=30,
                    batch_size=8,
                    callbacks=[callbacks.EarlyStopping(
                        monitor='val_loss',
                        min_delta=0.005,
                        patience=5,
                        verbose=0,
                        mode='auto',
                        baseline=None,
                        restore_best_weights=True
                    )]
                   )
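To make the `min_delta` and `patience` settings concrete, here is a simplified sketch of the early-stopping rule applied to a made-up validation-loss history. It only mirrors the basic idea: an epoch counts as an improvement if the loss drops by more than `min_delta`, and training stops after `patience` epochs without one. The real Keras callback has more options, and its details may differ between versions.

```python
def early_stopping_epoch(val_losses, min_delta=0.005, patience=5):
    """Return (stop_epoch, best_epoch), both 0-based, for a loss history."""
    best = float("inf")
    best_epoch = 0
    wait = 0
    for epoch, loss in enumerate(val_losses):
        if loss < best - min_delta:
            # Improvement larger than min_delta: remember it and reset the counter
            best, best_epoch, wait = loss, epoch, 0
        else:
            wait += 1
            if wait >= patience:
                return epoch, best_epoch
    return len(val_losses) - 1, best_epoch

# Made-up validation losses that plateau after the third epoch
history_losses = [0.70, 0.60, 0.55, 0.549, 0.551, 0.56, 0.55, 0.57, 0.58]
stop_epoch, best_epoch = early_stopping_epoch(history_losses)
print(f"stopped at epoch {stop_epoch}, best epoch {best_epoch}")
```

With `restore_best_weights=True`, the model would go back to the weights from the best epoch rather than keeping those from the last one.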
                   

In [None]:
test_loss, test_acc = model.evaluate(X_test,y_test)

In [None]:
print('test_acc: ',test_acc)

Test accuracy is lower than training accuracy, which hints at overfitting.

## Validating our approach
Let's set apart 200 samples in our training data to use as a validation set:

In [None]:
x_val = X_train[:200]
partial_x_train = X_train[200:]

y_val = y_train[:200]
partial_y_train = y_train[200:]

Now let's train our network for 30 epochs:

In [None]:
model = models.Sequential()
model.add(layers.Dense(512, activation='relu', input_shape=(X_train.shape[1],)))
model.add(layers.Dense(256, activation='relu'))
model.add(layers.Dense(128, activation='relu'))
model.add(layers.Dense(64, activation='relu'))
model.add(layers.Dense(2, activation='softmax'))  # two classes: healthy, unhealthy

model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

model.fit(partial_x_train,
          partial_y_train,
          epochs=30,
          batch_size=512,
          validation_data=(x_val, y_val))
results = model.evaluate(X_test, y_test)

In [None]:
results

## Predictions on Test Data

In [None]:
predictions = model.predict(X_test)

In [None]:
predictions[0].shape

In [None]:
np.sum(predictions[0])

In [None]:
np.argmax(predictions[0])
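The last two cells check two properties of a softmax output: the entries of one prediction sum to 1, and the index of the largest entry is the predicted class. The plain-Python sketch below shows both on made-up scores. The `softmax` helper and the input values are illustrative and are not taken from the trained model.

```python
import math

def softmax(logits):
    """Numerically stable softmax over a list of raw scores."""
    peak = max(logits)
    exps = [math.exp(z - peak) for z in logits]
    total = sum(exps)
    return [e / total for e in exps]

probs = softmax([0.3, 1.7])   # hypothetical scores for [healthy, unhealthy]
predicted = max(range(len(probs)), key=probs.__getitem__)
print(probs, "sum:", sum(probs), "predicted class:", predicted)
```

Here class 1 (unhealthy) wins because its score is the larger of the two.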