In [None]:
#part 1 of  mini project phase 2

In [18]:
import os
from pydub import AudioSegment
from pydub.silence import split_on_silence
import tensorflow as tf
import numpy as np
from scipy.signal import resample
import csv
from tensorflow.keras.utils import register_keras_serializable
from tensorflow.keras import layers

# Define the custom ResidualUnit layer
@register_keras_serializable()
class ResidualUnit(tf.keras.layers.Layer):
    def __init__(self, filters, strides=1, **kwargs):
        super().__init__(**kwargs)
        self.filters = filters
        self.strides = strides
        self.conv1 = layers.Conv2D(filters, kernel_size=3, strides=strides, padding="same", use_bias=False)
        self.bn1 = layers.BatchNormalization()
        self.activation = layers.Activation("relu")
        self.conv2 = layers.Conv2D(filters, kernel_size=3, strides=1, padding="same", use_bias=False)
        self.bn2 = layers.BatchNormalization()

        if strides > 1 or filters != kwargs.get('input_shape', [None, 374, 129, 1])[-1]:
            self.skip_conv = layers.Conv2D(filters, kernel_size=1, strides=strides, padding="same", use_bias=False)
            self.skip_bn = layers.BatchNormalization()
        else:
            self.skip_conv = None

    def call(self, inputs, training=False):
        x = self.conv1(inputs)
        x = self.bn1(x, training=training)
        x = self.activation(x)
        x = self.conv2(x)
        x = self.bn2(x, training=training)

        if self.skip_conv is not None:
            skip = self.skip_conv(inputs)
            skip = self.skip_bn(skip, training=training)
        else:
            skip = inputs

        return self.activation(x + skip)

    def get_config(self):
        config = super().get_config()
        config.update({
            "filters": self.filters,
            "strides": self.strides,
        })
        return config

# Load the trained model
model = tf.keras.models.load_model(
    'farsi_numbers_detectionjupyter.keras',
    custom_objects={'ResidualUnit': ResidualUnit}
)

# Define the class labels
commands = np.array(['8', '5', '4', '9', '1', '7', '6', '3', '2', '10', '0'])

# Function to get MFCCs from audio
def get_mfccs(audio, sample_rate):
    frame_length = int(sample_rate / 40)
    frame_step = int(sample_rate / 100)
    fft_length = frame_length
    num_feats = 40

    stfts = tf.signal.stft(audio, frame_length=frame_length, frame_step=frame_step, fft_length=fft_length)
    spectrograms = tf.abs(stfts)

    num_spectrogram_bins = stfts.shape[-1]
    lower_edge_hertz, upper_edge_hertz, num_mel_bins = 0, sample_rate / 2, num_feats
    linear_to_mel_weight_matrix = tf.signal.linear_to_mel_weight_matrix(
        num_mel_bins, num_spectrogram_bins, sample_rate, lower_edge_hertz, upper_edge_hertz)
    mel_spectrograms = tf.tensordot(spectrograms, linear_to_mel_weight_matrix, 1)
    mel_spectrograms.set_shape(spectrograms.shape[:-1].concatenate(linear_to_mel_weight_matrix.shape[-1:]))

    log_mel_spectrograms = tf.math.log(mel_spectrograms + 1e-6)
    mfccs = tf.signal.mfccs_from_log_mel_spectrograms(log_mel_spectrograms)
    mfccs = mfccs[..., tf.newaxis]
    return mfccs

# Split audio into chunks
def split_audio(file_path, min_silence_len=200, silence_thresh=-30):
    audio = AudioSegment.from_file(file_path)
    audio_chunks = split_on_silence(audio, min_silence_len=min_silence_len, silence_thresh=silence_thresh)
    return audio_chunks

# Process and predict for a single file
def process_single_file(audio_file_path, output_txt_path):
    audio_chunks = split_audio(audio_file_path)
    labels = []

    for i, chunk in enumerate(audio_chunks):
        chunk_path = f'temp_chunk_{i}.wav'
        chunk.export(chunk_path, format="wav")

        audio_binary = tf.io.read_file(chunk_path)
        audio, sample_rate = tf.audio.decode_wav(audio_binary)

        if len(audio.shape) > 1:
            audio = tf.reduce_mean(audio, axis=-1)
        else:
            audio = tf.squeeze(audio, axis=-1)

        desired_sample_rate = 16000
        if sample_rate.numpy() != desired_sample_rate:
            num_samples = int(desired_sample_rate / sample_rate.numpy() * len(audio))
            audio = resample(audio.numpy(), num_samples)
            audio = tf.convert_to_tensor(audio, dtype=tf.float32)

        mfccs = get_mfccs(audio, desired_sample_rate)
        input_shape = model.input_shape[1:]
        mfccs = tf.image.resize(mfccs, [input_shape[0], input_shape[1]])
        mfccs = tf.expand_dims(mfccs, axis=0)

        predictions = model.predict(mfccs)
        predicted_label_index = np.argmax(predictions, axis=1)[0]
        predicted_label = commands[predicted_label_index]
        labels.append(predicted_label)

        os.remove(chunk_path)

    with open(output_txt_path, 'w') as f:
        for label in labels:
            f.write(f'{label}\n')
    print(f'Predicted labels written to {output_txt_path}')

# Process and predict for multiple files
def process_multiple_files(audio_folder_path, output_csv_path):
    wav_files = [f for f in os.listdir(audio_folder_path) if f.endswith('.wav')]
    wav_files = sorted(wav_files)[:30]

    with open(output_csv_path, 'w', newline='') as csvfile:
        csvwriter = csv.writer(csvfile)

        for wav_file in wav_files:
            audio_file_path = os.path.join(audio_folder_path, wav_file)
            audio_chunks = split_audio(audio_file_path)
            labels = []

            for i, chunk in enumerate(audio_chunks):
                chunk_path = f'temp_chunk_{i}.wav'
                chunk.export(chunk_path, format="wav")

                audio_binary = tf.io.read_file(chunk_path)
                audio, sample_rate = tf.audio.decode_wav(audio_binary)

                if len(audio.shape) > 1:
                    audio = tf.reduce_mean(audio, axis=-1)
                else:
                    audio = tf.squeeze(audio, axis=-1)

                desired_sample_rate = 16000
                if sample_rate.numpy() != desired_sample_rate:
                    num_samples = int(desired_sample_rate / sample_rate.numpy() * len(audio))
                    audio = resample(audio.numpy(), num_samples)
                    audio = tf.convert_to_tensor(audio, dtype=tf.float32)

                mfccs = get_mfccs(audio, desired_sample_rate)
                input_shape = model.input_shape[1:]
                mfccs = tf.image.resize(mfccs, [input_shape[0], input_shape[1]])
                mfccs = tf.expand_dims(mfccs, axis=0)

                predictions = model.predict(mfccs)
                predicted_label_index = np.argmax(predictions, axis=1)[0]
                predicted_label = commands[predicted_label_index]
                labels.append(predicted_label)

                os.remove(chunk_path)

            # Replace .wav with .mp3 before writing to CSV
            mp3_file_name = wav_file.replace('.wav', '.mp3')
            csvwriter.writerow([mp3_file_name] + labels)

    print(f'Predicted labels written to {output_csv_path}')


if __name__ == "__main__":
    # For single file processing
    #audio_file_path = "mini_test/dastiwav3/dastiwav3/0013.wav"
    #output_txt_path = "mini_test/predicted_labels_0013.txt"
    #process_single_file(audio_file_path, output_txt_path)

    # For batch processing
    audio_folder_path = "mini_test/dastiwav3/dastiwav3"
    output_csv_path = "mini_test/predicted_labels4.csv"
    process_multiple_files(audio_folder_path, output_csv_path)


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 234ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 41ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 36ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 40ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 40ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 58ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 40ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 51ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 127ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 36ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 41ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 39ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 39ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 

In [8]:
import os
from pydub import AudioSegment
from pydub.silence import split_on_silence
import tensorflow as tf
import numpy as np
from scipy.signal import resample
import csv
from tensorflow.keras.utils import register_keras_serializable
from tensorflow.keras import layers

        # Define the custom ResidualUnit layer
@register_keras_serializable()
class ResidualUnit(tf.keras.layers.Layer):
    def __init__(self, filters, strides=1, **kwargs):
        super().__init__(**kwargs)
        self.filters = filters
        self.strides = strides
        self.conv1 = layers.Conv2D(filters, kernel_size=3, strides=strides, padding="same", use_bias=False)
        self.bn1 = layers.BatchNormalization()
        self.activation = layers.Activation("relu")
        self.conv2 = layers.Conv2D(filters, kernel_size=3, strides=1, padding="same", use_bias=False)
        self.bn2 = layers.BatchNormalization()

        if strides > 1 or filters != kwargs.get('input_shape', [None, 374, 129, 1])[-1]:
            self.skip_conv = layers.Conv2D(filters, kernel_size=1, strides=strides, padding="same", use_bias=False)
            self.skip_bn = layers.BatchNormalization()
        else:
            self.skip_conv = None

    def call(self, inputs, training=False):
        x = self.conv1(inputs)
        x = self.bn1(x, training=training)
        x = self.activation(x)
        x = self.conv2(x)
        x = self.bn2(x, training=training)

        if self.skip_conv is not None:
            skip = self.skip_conv(inputs)
            skip = self.skip_bn(skip, training=training)
        else:
            skip = inputs

        return self.activation(x + skip)

    def get_config(self):
        config = super().get_config()
        config.update({
            "filters": self.filters,
            "strides": self.strides,
        })
        return config

model = tf.keras.models.load_model(
    'farsi_numbers_detectionjupyter.keras',
    custom_objects={'ResidualUnit': ResidualUnit}  # Include custom layers if used
)




