<a href="https://colab.research.google.com/github/shaja-asm/cry-detection/blob/main/tf_lite_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import numpy as np
import librosa
import librosa.display
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score
from tensorflow.keras.utils import Sequence
import datetime
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, TensorBoard, EarlyStopping
from tensorflow.keras.optimizers import Adam
from scipy.ndimage import zoom
import ctypes

# gpus = tf.config.experimental.list_physical_devices('GPU')
# if gpus:
#     try:
#         for gpu in gpus:
#             tf.config.experimental.set_memory_growth(gpu, True)
#     except RuntimeError as e:
#         print(e)
# print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))

In [None]:
AUDIO_PATH = 'CryCorpusFinal'
CRY_FOLDER = os.path.join(AUDIO_PATH, 'cry/augmented')
NOTCRY_FOLDER = os.path.join(AUDIO_PATH, 'notcry')
IMG_SIZE = (128, 128)
BATCH_SIZE = 32
EPOCHS = 25

In [None]:
def load_audio_files(folder):
    files = []
    for filename in os.listdir(folder):
        if filename.endswith('.wav'):
            files.append(os.path.join(folder, filename))
    return files

def compute_spectrogram(y, sr, n_fft=2048, hop_length=512):
    D = librosa.stft(y, n_fft=n_fft, hop_length=hop_length)
    D_dB = librosa.amplitude_to_db(np.abs(D), ref=np.max)
    return D_dB

In [None]:
def save_spectrogram_to_disk(D_dB, save_path):
    if not os.path.exists(os.path.dirname(save_path)):
        os.makedirs(os.path.dirname(save_path))
    np.save(save_path, D_dB)

cry_files = load_audio_files(CRY_FOLDER)
notcry_files = load_audio_files(NOTCRY_FOLDER)

data = []
labels = []

for idx, file in enumerate(cry_files):
    y, sr = librosa.load(file, sr=None)
    y = librosa.util.normalize(y)
    D_dB = compute_spectrogram(y, sr)
    save_path = os.path.join(f'{0}/spectrograms'.format(AUDIO_PATH), f'cry_{idx}.npy')
    save_spectrogram_to_disk(D_dB, save_path)
    data.append(save_path)
    labels.append(1)

for idx, file in enumerate(notcry_files):
    y, sr = librosa.load(file, sr=None)
    y = librosa.util.normalize(y)
    D_dB = compute_spectrogram(y, sr)
    save_path = os.path.join(f'{0}/spectrograms'.format(AUDIO_PATH), f'notcry_{idx}.npy')
    save_spectrogram_to_disk(D_dB, save_path)
    data.append(save_path)
    labels.append(0)

data = np.array(data)
labels = np.array(labels)



In [None]:
# Split the datasets
X_train, X_val, y_train, y_val = train_test_split(data, labels, test_size=0.2, random_state=42)

class OnTheFlyDataGenerator(tf.keras.utils.Sequence):
    def __init__(self, file_paths, labels, batch_size, img_size, shuffle=True, augment=False):
        self.file_paths = file_paths
        self.labels = labels
        self.batch_size = batch_size
        self.img_size = img_size
        self.shuffle = shuffle
        self.augment = augment
        self.indices = np.arange(len(self.file_paths))
        self.on_epoch_end()

    def __len__(self):
        return int(np.ceil(len(self.file_paths) / self.batch_size))

    def __getitem__(self, index):
        batch_indices = self.indices[index * self.batch_size:(index + 1) * self.batch_size]
        batch_file_paths = [self.file_paths[i] for i in batch_indices]
        batch_labels = [self.labels[i] for i in batch_indices]

        X, y = self.__data_generation(batch_file_paths, batch_labels)
        return X, y

    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indices)

    def __data_generation(self, batch_file_paths, batch_labels):
        X = np.empty((len(batch_file_paths), *self.img_size, 1), dtype=np.float32)
        y = np.empty((len(batch_file_paths),), dtype=int)

        for i, file_path in enumerate(batch_file_paths):
            D_dB = np.load(file_path)
            D_dB = D_dB[..., np.newaxis]  # Add channel dimension

            # Resizing
            zoom_factors = [self.img_size[0] / D_dB.shape[0], self.img_size[1] / D_dB.shape[1], 1]
            D_dB = zoom(D_dB, zoom_factors, order=3)  # order=3 for cubic interpolation

            if self.augment:
                if np.random.rand() > 0.5:
                    D_dB = np.flip(D_dB, axis=1)  # Flip left-right
                if np.random.rand() > 0.5:
                    D_dB = np.flip(D_dB, axis=0)  # Flip up-down
                if np.random.rand() > 0.5:
                    D_dB = D_dB + np.random.uniform(-0.2, 0.2, size=D_dB.shape)  # Random brightness

            X[i,] = D_dB
            y[i] = batch_labels[i]

        return X, y

train_generator = OnTheFlyDataGenerator(X_train, y_train, BATCH_SIZE, IMG_SIZE, shuffle=True, augment=True)
val_generator = OnTheFlyDataGenerator(X_val, y_val, BATCH_SIZE, IMG_SIZE, shuffle=False, augment=False)

# l2 regularization
l2_regularizer = tf.keras.regularizers.l2(0.001)

model = Sequential([
        Conv2D(32, (3, 3), activation='relu', input_shape=(IMG_SIZE[0], IMG_SIZE[1], 1), kernel_regularizer=l2_regularizer),
        BatchNormalization(),
        MaxPooling2D((2, 2)),
        Dropout(0.25),
        Conv2D(64, (3, 3), activation='relu', kernel_regularizer=l2_regularizer),
        BatchNormalization(),
        MaxPooling2D((2, 2)),
        Dropout(0.25),
        Conv2D(128, (3, 3), activation='relu', kernel_regularizer=l2_regularizer),
        BatchNormalization(),
        MaxPooling2D((2, 2)),
        Dropout(0.25),
        Flatten(),
        Dense(128, activation='relu', kernel_regularizer=l2_regularizer),
        Dropout(0.5),
        Dense(1, activation='sigmoid')
    ])

optimizer = Adam(learning_rate=1e-4)
model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])
log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
tensorboard_callback = TensorBoard(log_dir=log_dir, histogram_freq=1, profile_batch='500,520')
checkpoint_callback = ModelCheckpoint('cry_detection_model.keras', monitor='val_loss', save_best_only=True, mode='min')
lr_callback = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6)
early_stopping_callback = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
class_weights = {0: 1., 1: 1.}

history = model.fit(
    train_generator,
    epochs=EPOCHS,
    validation_data=val_generator,
    class_weight=class_weights,
    callbacks=[tensorboard_callback, checkpoint_callback, lr_callback, early_stopping_callback]
)


In [None]:
y_pred = model.predict(val_generator)
y_pred = (y_pred > 0.5).astype(int)
acc = accuracy_score(y_val, y_pred)
f1 = f1_score(y_val, y_pred)

print(f'Accuracy: {acc}')
print(f'F1 Score: {f1}')

model.save('cry_detection_model.keras')

In [None]:
import pathlib
tflite_models_dir = pathlib.Path("tflite_models")
tflite_models_dir.mkdir(exist_ok=True, parents=True)


converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()

tflite_model_file = tflite_models_dir/"cry_detection_model.tflite"
tflite_model_file.write_bytes(tflite_model)

converter.optimizations = [tf.lite.Optimize.DEFAULT]

tflite_fp16_model = converter.convert()
tflite_model_fp16_file = tflite_models_dir/"cry_detection_model_quant.tflite"
tflite_model_fp16_file.write_bytes(tflite_fp16_model)

# converter.target_spec.supported_types = [tf.float16]
# tflite_quant_model = converter.convert()
# tflite_model_quant_file = tflite_models_dir/"cry_detection_model_quant_f16.tflite"
# tflite_model_fp16_file.write_bytes(tflite_fp16_model)


In [None]:
interpreter = tf.lite.Interpreter(model_path="tflite_models/cry_detection_model_quant.tflite")
interpreter.allocate_tensors()

# Get input and output tensors
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

def preprocess_audio(file_path, img_size):
    y, sr = librosa.load(file_path, sr=None)
    y = librosa.util.normalize(y)
    D = librosa.stft(y, n_fft=2048, hop_length=512)
    D_dB = librosa.amplitude_to_db(np.abs(D), ref=np.max)

    # Calculate zoom factors for resizing
    zoom_factors = [img_size[0] / D_dB.shape[0], img_size[1] / D_dB.shape[1]]
    D_dB_resized = zoom(D_dB, zoom_factors, order=3)  # order=3 for cubic interpolation

    # Add channel dimension to match the original function's output
    D_dB_resized = D_dB_resized[..., np.newaxis]

    return D_dB_resized

def predict(file_path, img_size=IMG_SIZE):
    input_data = preprocess_audio(file_path, img_size)
    input_data = np.expand_dims(input_data, axis=0).astype(np.float32)

    # Set the tensor to point to the input data to be inferred
    interpreter.set_tensor(input_details[0]['index'], input_data)

    # Run inference
    interpreter.invoke()

    output_data = interpreter.get_tensor(output_details[0]['index'])

    return output_data

def process_folder(folder_path, img_size=IMG_SIZE):
    correct_predictions = 0
    total_files = 0
    results = []

    for file_name in os.listdir(folder_path):
        if file_name.endswith('.wav'):
            file_path = os.path.join(folder_path, file_name)
            prediction = predict(file_path, img_size)
            prediction_label = 'Cry' if prediction > 0.5 else 'Not Cry'
            results.append((file_name, prediction_label))
            ground_truth = 'Cry' if '_cry.wav' in file_name else 'Not Cry'

            if prediction_label == ground_truth:
                correct_predictions += 1

            total_files += 1

    accuracy = (correct_predictions / total_files) * 100 if total_files > 0 else 0

    return results, accuracy

folder_path = '{0}/Test'.format(AUDIO_PATH)
predictions, accuracy = process_folder(folder_path)

for file_name, prediction_label in predictions:
    print(f"File: {file_name}, Prediction: {prediction_label}")

print(f"Prediction Accuracy: {accuracy:.2f}%")


In [None]:
lib = ctypes.cdll.LoadLibrary('{0}/libtensorflowlite_c.so'.format(AUDIO_PATH))

# Define types for the C API functions
lib.TfLiteModelCreate.restype = ctypes.POINTER(ctypes.c_void_p)
lib.TfLiteInterpreterCreate.restype = ctypes.POINTER(ctypes.c_void_p)
lib.TfLiteInterpreterOptionsCreate.restype = ctypes.POINTER(ctypes.c_void_p)
lib.TfLiteInterpreterOptionsSetNumThreads.argtypes = [ctypes.POINTER(ctypes.c_void_p), ctypes.c_int]
lib.TfLiteInterpreterOptionsDelete.argtypes = [ctypes.POINTER(ctypes.c_void_p)]
lib.TfLiteInterpreterDelete.argtypes = [ctypes.POINTER(ctypes.c_void_p)]
lib.TfLiteModelDelete.argtypes = [ctypes.POINTER(ctypes.c_void_p)]
lib.TfLiteInterpreterGetInputTensor.restype = ctypes.POINTER(ctypes.c_void_p)
lib.TfLiteInterpreterGetOutputTensor.restype = ctypes.POINTER(ctypes.c_void_p)

model_path = b"tflite_models/cry_detection_model_quant.tflite"
with open(model_path, 'rb') as f:
    model_data = f.read()

model = lib.TfLiteModelCreate(ctypes.c_char_p(model_data), ctypes.c_size_t(len(model_data)))

# Create interpreter options and set number of threads
options = lib.TfLiteInterpreterOptionsCreate()

# Set number of threads (e.g., 2 threads)
lib.TfLiteInterpreterOptionsSetNumThreads(options, 2)

# Create the interpreter with the custom options
interpreter = lib.TfLiteInterpreterCreate(model, options)

# Allocate tensors
status = lib.TfLiteInterpreterAllocateTensors(interpreter)

# Get input and output tensor details
input_tensor = lib.TfLiteInterpreterGetInputTensor(interpreter, 0)
output_tensor = lib.TfLiteInterpreterGetOutputTensor(interpreter, 0)

# def preprocess_audio(file_path, img_size):
#     y, sr = librosa.load(file_path, sr=None)
#     y = librosa.util.normalize(y)
#     D = librosa.stft(y, n_fft=2048, hop_length=512)
#     D_dB = librosa.amplitude_to_db(np.abs(D), ref=np.max)

#     # Rescale the spectrogram to the target img_size
#     # zoom_factors = [img_size[0] / D_dB.shape[0], img_size[1] / D_dB.shape[1]]
#     # D_dB_resized = zoom(D_dB, zoom_factors).astype(np.float32)

#     # Resize using TensorFlow
#     # D_dB_resized = tf.image.resize(D_dB[..., np.newaxis], img_size).numpy()
#     # D_dB_resized = np.squeeze(D_dB_resized, axis=-1).astype(np.float32)

#     # Convert the spectrogram to an image
#     D_dB_img = Image.fromarray(D_dB)

#     # Resize the image using PIL with LANCZOS resampling
#     D_dB_resized = D_dB_img.resize(img_size, Image.Resampling.LANCZOS)

#     # Convert back to NumPy array
#     D_dB_resized = np.array(D_dB_resized).astype(np.float32)

#     return D_dB_resized

def preprocess_audio(file_path, img_size):
    y, sr = librosa.load(file_path, sr=None)
    y = librosa.util.normalize(y)
    D = librosa.stft(y, n_fft=2048, hop_length=512)
    D_dB = librosa.amplitude_to_db(np.abs(D), ref=np.max)

    # Calculate zoom factors for resizing
    zoom_factors = [img_size[0] / D_dB.shape[0], img_size[1] / D_dB.shape[1]]
    D_dB_resized = zoom(D_dB, zoom_factors, order=3)  # order=3 for cubic interpolation

    # Add channel dimension to match the original function's output
    D_dB_resized = D_dB_resized[..., np.newaxis]

    return D_dB_resized

def predict(file_path, img_size=(64, 64)):
    input_data = preprocess_audio(file_path, img_size)
    input_data = np.expand_dims(input_data, axis=0).astype(np.float32)

    # Set the tensor to point to the input data to be inferred
    lib.TfLiteTensorCopyFromBuffer(input_tensor, input_data.ctypes.data_as(ctypes.POINTER(ctypes.c_float)), ctypes.c_size_t(input_data.nbytes))

    # Run inference
    lib.TfLiteInterpreterInvoke(interpreter)

    # Extract output data
    output_size = 1
    output_data = np.empty(output_size, dtype=np.float32)
    lib.TfLiteTensorCopyToBuffer(output_tensor, output_data.ctypes.data_as(ctypes.POINTER(ctypes.c_float)), ctypes.c_size_t(output_data.nbytes))

    return output_data

def process_folder(folder_path, img_size=IMG_SIZE):
    correct_predictions = 0
    total_files = 0
    results = []

    for file_name in os.listdir(folder_path):
        if file_name.endswith('.wav'):
            file_path = os.path.join(folder_path, file_name)
            prediction = predict(file_path, img_size)
            prediction_label = 'Cry' if prediction > 0.5 else 'Not Cry'
            results.append((file_name, prediction_label))
            ground_truth = 'Cry' if '_cry.wav' in file_name else 'Not Cry'

            if prediction_label == ground_truth:
                correct_predictions += 1

            total_files += 1

    accuracy = (correct_predictions / total_files) * 100 if total_files > 0 else 0

    return results, accuracy

folder_path = '{0}/Test'.format(AUDIO_PATH)
predictions, accuracy = process_folder(folder_path)

for file_name, prediction_label in predictions:
    print(f"File: {file_name}, Prediction: {prediction_label}")

print(f"Prediction Accuracy: {accuracy:.2f}%")

# Clean up
lib.TfLiteInterpreterDelete(interpreter)
lib.TfLiteInterpreterOptionsDelete(options)
lib.TfLiteModelDelete(model)

print("All operations completed successfully.")


In [None]:
import os
import random
import librosa
import soundfile as sf
import numpy as np

def augment_data(input_folder, output_folder, ogg_files):
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    # Resample ogg files to 22050 Hz
    ogg_clips = []
    for ogg_file in ogg_files:
        y, sr = librosa.load(ogg_file, sr=22050)
        if len(y) < 5 * sr:
            y = np.tile(y, int(np.ceil(5 * sr / len(y))))[:5 * sr]
        else:
            y = y[:5 * sr]
        ogg_clips.append((y, os.path.basename(ogg_file).split('.')[0]))

    input_files = [f for f in os.listdir(input_folder) if f.endswith('.wav')]
    num_groups = len(ogg_files)
    files_per_group = len(input_files) // num_groups

    # Split input files into groups
    for i, ogg_clip in enumerate(ogg_clips):
        group_files = input_files[i * files_per_group:(i + 1) * files_per_group]
        ogg_clip_data, ogg_clip_name = ogg_clip
        
        for input_file in group_files:
            input_path = os.path.join(input_folder, input_file)
            y, sr = librosa.load(input_path, sr=22050)
            
            # Randomly reduce gain of ogg clip
            gain_reduction = random.uniform(0, -20)
            ogg_clip_adjusted = librosa.util.normalize(ogg_clip_data) * (10 ** (gain_reduction / 20))
            
            # Mix the input file with the ogg clip
            mixed_audio = y + ogg_clip_adjusted[:len(y)]
            mixed_audio = librosa.util.normalize(mixed_audio)
            
            output_file = f"{os.path.splitext(input_file)[0]}_{ogg_clip_name}_augmented.wav"
            output_path = os.path.join(output_folder, output_file)
            sf.write(output_path, mixed_audio, sr)

input_folder = f'{AUDIO_PATH}/cry'
output_folder = f'{AUDIO_PATH}/cry/augmented'
ogg_files = [f'{AUDIO_PATH}/ac.ogg', f'{AUDIO_PATH}/dishwasher.ogg', f'{AUDIO_PATH}/fan.ogg', f'{AUDIO_PATH}/refridgerator.ogg', 
             f'{AUDIO_PATH}/tv.ogg',f'{AUDIO_PATH}/vaccum_cleaner.ogg']

augment_data(input_folder, output_folder, ogg_files)