In [69]:
import torch
import torchaudio
import librosa
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import math
from collections import OrderedDict

####################################################
# Configuration and Label Setup (Adjust as needed)
####################################################
# Feature-extraction settings; these must mirror the values used at training time.
audio_conf = dict(
    sample_rate=16000,     # Hz
    # NOTE(review): 0.002 s is only 32 samples at 16 kHz, an unusually short
    # analysis window -- confirm this really is the training value.
    window_size=0.002,     # seconds
    window_stride=0.01,    # seconds
    window='hamming',      # STFT window function name
)

# Recurrent stack hyper-parameters; these must mirror the trained checkpoint.
hidden_size, hidden_layers = 1024, 5   # rnn_hidden_size, nb_layers
rnn_type = 'lstm'                      # one of the keys of supported_rnns below
bidirectional = True

# Output alphabet: blank symbol "_", apostrophe, A-Z, then space (29 symbols).
LABELS = list("_'ABCDEFGHIJKLMNOPQRSTUVWXYZ ")

# Name -> recurrent layer class, plus the reverse lookup.
supported_rnns = {'lstm': nn.LSTM, 'rnn': nn.RNN, 'gru': nn.GRU}
supported_rnns_inv = {layer_cls: name for name, layer_cls in supported_rnns.items()}

####################################################
# Utility Classes
####################################################
class SequenceWise(nn.Module):
    """Apply a wrapped module to every (time, batch) position of a sequence.

    The first two dimensions of the input are merged before calling the
    wrapped module and split apart again afterwards, so a module that works
    on 2-D ``(rows, features)`` input can be used on ``(T, N, features)`` data.
    """

    def __init__(self, module):
        super().__init__()
        self.module = module

    def forward(self, x):
        steps, batch = x.shape[:2]
        # Merge the two leading dims; ``view`` requires a contiguous layout.
        merged = x.view(steps * batch, -1)
        # Restore the leading dims; the trailing size is whatever the module produced.
        return self.module(merged).view(steps, batch, -1)

class MaskConv(nn.Module):
    """Run a sequence of modules, zeroing padded time steps after each one.

    Args:
        seq_module: an iterable container of modules (e.g. ``nn.Sequential``)
            applied in order.
    """

    def __init__(self, seq_module):
        super(MaskConv, self).__init__()
        self.seq_module = seq_module

    def forward(self, x, lengths):
        """Apply every module and mask positions beyond each sample's length.

        Args:
            x: 4-D tensor whose first dim is the batch and whose last dim
                (dim 3) is time.
            lengths: integer tensor with one valid time length per batch element.

        Returns:
            Tuple ``(masked_output, lengths)``; ``lengths`` is returned unchanged.
        """
        # Build the comparison operands on x's own device. The previous code
        # created a CPU mask and called ``.cuda()``, which targets the current
        # default GPU rather than the device x actually lives on.
        valid = torch.as_tensor(lengths, device=x.device).view(-1, 1, 1, 1)
        for module in self.seq_module:
            x = module(x)
            # The time size can change from module to module (strided convs),
            # so the index vector is rebuilt for each output.
            time_index = torch.arange(x.size(3), device=x.device).view(1, 1, 1, -1)
            # True where the time step lies at or beyond the sample's length;
            # broadcasting expands the mask over the channel/feature dims.
            x = x.masked_fill(time_index >= valid, 0)
        return x, lengths

class InferenceBatchSoftmax(nn.Module):
    """Softmax over the last dimension in eval mode; pass-through while training."""

    def forward(self, input_):
        # In training mode the input is returned untouched.
        if self.training:
            return input_
        return F.softmax(input_, dim=-1)

class BatchRNN(nn.Module):
    """One recurrent layer with optional sequence-wise batch norm on its input.

    Args:
        input_size: feature size of the input.
        hidden_size: hidden size of the recurrent layer.
        rnn_type: recurrent layer class to instantiate (default ``nn.LSTM``).
        bidirectional: run the RNN in both directions and sum the two outputs.
        batch_norm: apply ``BatchNorm1d`` (wrapped in ``SequenceWise``) to the
            input before the RNN.
    """

    def __init__(self, input_size, hidden_size, rnn_type=nn.LSTM, bidirectional=False, batch_norm=True):
        super(BatchRNN, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.bidirectional = bidirectional
        self.batch_norm = SequenceWise(nn.BatchNorm1d(input_size)) if batch_norm else None
        self.rnn = rnn_type(input_size=input_size, hidden_size=hidden_size,
                            bidirectional=bidirectional, bias=True)
        self.num_directions = 2 if bidirectional else 1

    def flatten_parameters(self):
        """Forward to the wrapped RNN's ``flatten_parameters``."""
        self.rnn.flatten_parameters()

    def forward(self, x, output_lengths):
        """Run the layer on a padded ``(T, N, input_size)`` batch.

        Args:
            x: padded, time-major input.
            output_lengths: valid length of every sequence in the batch, in
                any order.

        Returns:
            Padded ``(T', N, hidden_size)`` tensor, where ``T'`` is the longest
            length in ``output_lengths``.
        """
        if self.batch_norm is not None:
            x = self.batch_norm(x)
        # enforce_sorted=False lets callers pass batches in any order; the
        # default (True) raises unless lengths are sorted in descending order.
        # Already-sorted batches produce the same result as before.
        packed = nn.utils.rnn.pack_padded_sequence(x, output_lengths, enforce_sorted=False)
        packed, _ = self.rnn(packed)
        x, _ = nn.utils.rnn.pad_packed_sequence(packed)
        if self.bidirectional:
            # (TxNxH*2) -> (TxNxH) by sum of directions
            x = x.view(x.size(0), x.size(1), 2, -1).sum(2)
        return x

class Lookahead(nn.Module):
    """Lookahead convolution for unidirectional RNNs (Wang et al., 2016).

    Each feature channel is combined with its own next ``context - 1`` time
    steps through a depthwise 1-D convolution without bias.
    """

    def __init__(self, n_features, context):
        super().__init__()
        assert context > 0
        self.context = context
        self.n_features = n_features
        # Right-pad the time axis so the convolution output keeps length T.
        self.pad = (0, self.context - 1)
        self.conv = nn.Conv1d(
            self.n_features,
            self.n_features,
            kernel_size=self.context,
            stride=1,
            groups=self.n_features,   # depthwise: one filter per feature
            padding=0,
            bias=False,
        )

    def forward(self, x):
        # (T, N, H) -> (N, H, T), the layout Conv1d expects
        channels_first = x.permute(1, 2, 0)
        padded = F.pad(channels_first, pad=self.pad, value=0)
        convolved = self.conv(padded)
        # (N, H, T) -> (T, N, H)
        return convolved.permute(2, 0, 1).contiguous()

####################################################
# Decoder Base Class and GreedyDecoder
####################################################
class Decoder(object):
    """Base class for decoders that turn label indices into text.

    Args:
        labels: output alphabet; position ``i`` is the character for index ``i``.
        blank_index: index of the blank symbol in ``labels``.
    """

    def __init__(self, labels, blank_index=0):
        self.labels = labels
        self.int_to_char = dict(enumerate(labels))
        self.blank_index = blank_index
        # When the alphabet has no space, fall back to len(labels), which is
        # one past the last valid index.
        self.space_index = labels.index(' ') if ' ' in labels else len(labels)

    @staticmethod
    def _edit_distance(seq1, seq2):
        """Levenshtein distance between two sequences (strings or lists)."""
        # Keep the shorter sequence on the inner loop so each row stays small.
        if len(seq1) < len(seq2):
            seq1, seq2 = seq2, seq1
        previous = list(range(len(seq2) + 1))
        for i, item1 in enumerate(seq1, start=1):
            current = [i]
            for j, item2 in enumerate(seq2, start=1):
                current.append(min(
                    previous[j] + 1,                      # deletion
                    current[j - 1] + 1,                   # insertion
                    previous[j - 1] + (item1 != item2),   # substitution / match
                ))
            previous = current
        return previous[-1]

    def wer(self, s1, s2):
        """Word error rate: word-level edit distance over the word count of ``s1``.

        The distance is computed over whitespace-separated words. The previous
        version measured character edits on the joined strings, which is not a
        word error rate. An empty ``s1`` uses a denominator of 1.
        """
        words1 = s1.split()
        words2 = s2.split()
        return self._edit_distance(words1, words2) / float(len(words1) or 1)

    def cer(self, s1, s2):
        """Character-level edit distance with spaces removed (not normalised).

        Returns ``0.0`` when both strings are empty after removing spaces.
        """
        s1, s2 = s1.replace(' ', ''), s2.replace(' ', '')
        if len(s1) == 0 and len(s2) == 0:
            return 0.0
        return self._edit_distance(s1, s2)

    def decode(self, probs, sizes=None):
        """Decode model output into text; subclasses must implement this."""
        raise NotImplementedError

class GreedyDecoder(Decoder):
    """Best-path decoder: take the arg-max label per frame, then collapse.

    Collapsing drops blank symbols and, when requested, consecutive repeats of
    the same label.
    """

    def __init__(self, labels, blank_index=0):
        super(GreedyDecoder, self).__init__(labels, blank_index)

    def convert_to_strings(self, sequences, sizes=None, remove_repetitions=False, return_offsets=False):
        """Convert a batch of label-index sequences into strings.

        Args:
            sequences: indexable batch of per-frame label-index sequences.
            sizes: optional valid length of each sequence; when None the full
                sequence is used.
            remove_repetitions: drop a label equal to the previous frame's label.
            return_offsets: also return the frame index of every emitted character.

        Returns:
            ``strings`` (one single-element list per sequence), or
            ``(strings, offsets)`` when ``return_offsets`` is True.
        """
        strings = []
        offsets = [] if return_offsets else None
        for idx in range(len(sequences)):
            sequence = sequences[idx]
            seq_len = sizes[idx] if sizes is not None else len(sequence)
            string, string_offsets = self.process_string(sequence, seq_len, remove_repetitions)
            strings.append([string])  # one path
            if return_offsets:
                offsets.append([string_offsets])
        if return_offsets:
            return strings, offsets
        return strings

    def process_string(self, sequence, size, remove_repetitions=False):
        """Collapse one label-index sequence into ``(string, offsets)``.

        ``offsets`` is an int tensor holding the frame index of each emitted
        character.
        """
        blank_char = self.int_to_char[self.blank_index]
        chars = []
        offsets = []
        previous_char = None
        for i in range(int(size)):
            char = self.int_to_char[sequence[i].item()]
            is_repeat = remove_repetitions and i != 0 and char == previous_char
            # A space is emitted like any other character. The former dedicated
            # branch looked up labels[space_index], which raised IndexError for
            # alphabets without a space (space_index == len(labels)).
            if char != blank_char and not is_repeat:
                chars.append(char)
                offsets.append(i)
            # Repeats are judged against the previous frame, blank or not.
            previous_char = char
        return ''.join(chars), torch.tensor(offsets, dtype=torch.int)

    def decode(self, probs, sizes=None):
        """Greedy-decode ``probs``, taking the arg-max over dim 2.

        Returns:
            ``(strings, offsets)`` as produced by ``convert_to_strings`` with
            repetition removal enabled.
        """
        _, max_probs = torch.max(probs, 2)
        return self.convert_to_strings(
            max_probs.view(max_probs.size(0), max_probs.size(1)),
            sizes,
            remove_repetitions=True,
            return_offsets=True
        )


####################################################
# DeepSpeech Model Class
####################################################
class DeepSpeech(nn.Module):
    """Speech model: masked 2-D conv front end, stacked RNNs, linear classifier.

    Submodule attribute names (``conv``, ``rnns``, ``lookahead``, ``fc``) define
    the checkpoint's state-dict keys, so they must not be renamed.

    Args:
        rnn_type: recurrent layer class (e.g. ``nn.LSTM``) used by every ``BatchRNN``.
        labels: output alphabet; its length is the number of output classes.
        rnn_hidden_size: hidden size of each recurrent layer.
        nb_layers: number of stacked recurrent layers.
        audio_conf: dict with ``sample_rate`` and ``window_size``, used to
            derive the number of spectrogram frequency bins.
        bidirectional: use bidirectional RNNs; when False a ``Lookahead`` layer
            follows the RNN stack instead.
        context: kernel size of the lookahead convolution.
    """

    def __init__(self, rnn_type, labels, rnn_hidden_size, nb_layers, audio_conf, bidirectional, context=20):
        super(DeepSpeech, self).__init__()
        self.hidden_size = rnn_hidden_size
        self.hidden_layers = nb_layers
        self.rnn_type = rnn_type
        self.audio_conf = audio_conf
        self.labels = labels
        self.bidirectional = bidirectional

        sample_rate = self.audio_conf["sample_rate"]
        window_size = self.audio_conf["window_size"]
        num_classes = len(self.labels)

        self.conv = MaskConv(nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=(41, 11), stride=(2, 2), padding=(20, 5)),
            nn.BatchNorm2d(32),
            nn.Hardtanh(0, 20, inplace=True),
            nn.Conv2d(32, 32, kernel_size=(21, 11), stride=(2, 1), padding=(10, 5)),
            nn.BatchNorm2d(32),
            nn.Hardtanh(0, 20, inplace=True)
        ))

        # Calculate RNN input size: frequency bins after each conv, times the
        # number of output channels. Integer division matches the conv output
        # size formula and gives the same value as the earlier
        # ``int(math.floor(a) / 2 + 1)`` form for every valid (>= 1) bin count.
        rnn_input_size = int(math.floor((sample_rate * window_size) / 2) + 1)
        rnn_input_size = (rnn_input_size + 2 * 20 - 41) // 2 + 1
        rnn_input_size = (rnn_input_size + 2 * 10 - 21) // 2 + 1
        rnn_input_size *= 32

        # The first recurrent layer consumes the conv features without batch
        # norm; the remaining layers use BatchRNN's default batch norm.
        rnns = []
        rnn = BatchRNN(input_size=rnn_input_size, hidden_size=rnn_hidden_size,
                       rnn_type=rnn_type, bidirectional=bidirectional, batch_norm=False)
        rnns.append(('0', rnn))
        for x in range(nb_layers - 1):
            rnn = BatchRNN(input_size=rnn_hidden_size, hidden_size=rnn_hidden_size,
                           rnn_type=rnn_type, bidirectional=bidirectional)
            rnns.append(('%d' % (x + 1), rnn))
        self.rnns = nn.Sequential(OrderedDict(rnns))

        # Only the unidirectional model gets the lookahead convolution.
        self.lookahead = nn.Sequential(
            Lookahead(rnn_hidden_size, context=context),
            nn.Hardtanh(0, 20, inplace=True)
        ) if not bidirectional else None

        fully_connected = nn.Sequential(
            nn.BatchNorm1d(rnn_hidden_size),
            nn.Linear(rnn_hidden_size, num_classes, bias=False)
        )
        self.fc = nn.Sequential(SequenceWise(fully_connected))

        self.inference_softmax = InferenceBatchSoftmax()

    def forward(self, x, lengths):
        """Run the model on a batch of spectrograms.

        Args:
            x: 4-D input with one channel, e.g. ``[N, 1, freq, time]``.
            lengths: tensor holding the time length of each sample in ``x``.

        Returns:
            ``(output, output_lengths)``: ``output`` is batch-first with one
            score per class on the last dim (softmaxed in eval mode);
            ``output_lengths`` are the time lengths after the conv stack.
        """
        lengths = lengths.cpu().int()
        output_lengths = self.get_seq_lens(lengths)
        x, _ = self.conv(x, output_lengths)

        sizes = x.size()
        x = x.view(sizes[0], sizes[1] * sizes[2], sizes[3])  # collapse feature dim
        x = x.transpose(1, 2).transpose(0, 1).contiguous()   # TxNxH

        for rnn in self.rnns:
            x = rnn(x, output_lengths)

        if not self.bidirectional:
            x = self.lookahead(x)

        x = self.fc(x)
        x = x.transpose(0, 1)
        x = self.inference_softmax(x)
        return x, output_lengths

    def get_seq_lens(self, input_length):
        """Map input time lengths to the lengths produced by the conv stack.

        Applies the convolution output-size formula along the time axis
        (index 1 of each Conv2d's padding/dilation/kernel/stride) for every
        ``nn.Conv2d`` in ``self.conv``.

        Args:
            input_length: integer tensor of input time lengths.

        Returns:
            Int tensor of output time lengths.
        """
        seq_len = input_length
        for m in self.conv.modules():
            if isinstance(m, nn.Conv2d):
                numerator = seq_len + 2 * m.padding[1] - m.dilation[1] * (m.kernel_size[1] - 1) - 1
                # Explicit floor division: ``/`` on integer tensors is true
                # division in current PyTorch, which would carry fractional
                # lengths from one conv layer into the next.
                seq_len = torch.div(numerator, m.stride[1], rounding_mode='floor') + 1
        return seq_len.int()

####################################################
# Audio Loading and Preprocessing
####################################################
def load_audio(audio_path, sample_rate):
    """Load an audio file and return its normalised log-magnitude spectrogram.

    Window size, stride and type are read from the module-level ``audio_conf``.

    Args:
        audio_path: path of the audio file to load.
        sample_rate: sampling rate the audio is resampled to when loading.

    Returns:
        ``torch.FloatTensor`` holding the spectrogram as returned by
        ``librosa.stft`` (frequency bins x frames), after ``log1p`` and
        mean/std normalisation over the whole spectrogram.
    """
    y, _ = librosa.load(audio_path, sr=sample_rate)
    n_fft = int(sample_rate * audio_conf['window_size'])
    hop_length = int(sample_rate * audio_conf['window_stride'])
    win_length = n_fft

    D = librosa.stft(y, n_fft=n_fft, hop_length=hop_length, win_length=win_length, window=audio_conf['window'])
    spect, _ = librosa.magphase(D)
    spect = np.log1p(spect)  # log(1 + spect)

    mean = spect.mean()
    std = spect.std()
    spect = spect - mean
    # A constant spectrogram (e.g. digital silence) has std == 0; dividing
    # would turn every value into NaN, so only the mean is removed then.
    if std > 0:
        spect = spect / std

    return torch.FloatTensor(spect)

def prepare_input_tensor(spect):
    """Prepend batch and channel axes: [freq, time] -> [1, 1, freq, time]."""
    # Indexing with None inserts a size-1 dimension, exactly like unsqueeze(0).
    return spect[None, None]

####################################################
# Main Inference Logic
####################################################

# Instantiate the model with the hyper-parameters configured above
model = DeepSpeech(
    rnn_type=supported_rnns[rnn_type],
    labels=LABELS,
    rnn_hidden_size=hidden_size,
    nb_layers=hidden_layers,
    audio_conf=audio_conf,
    bidirectional=bidirectional
)

# Load your pretrained model weights
model_path = "/content/drive/MyDrive/advanced-machine-learning/best_model.bin"
# weights_only=True restricts unpickling to tensors and plain containers, so a
# tampered checkpoint cannot execute arbitrary code. It also silences the
# FutureWarning that torch.load emits when the argument is left out.
state_dict = torch.load(model_path, map_location='cpu', weights_only=True)
model.load_state_dict(state_dict)
# Eval mode makes the model return softmax probabilities (see InferenceBatchSoftmax).
model.eval()

# Instantiate the decoder
decoder = GreedyDecoder(LABELS)

# Specify the path to the audio file you want to test
audio_file = "/content/drive/MyDrive/advanced-machine-learning/final4.wav"

# Load and preprocess the audio
spect = load_audio(audio_file, sample_rate=audio_conf['sample_rate'])
inputs = prepare_input_tensor(spect)
# One length entry per batch element: the number of spectrogram frames.
input_lengths = torch.IntTensor([inputs.size(3)])

# Run inference
with torch.no_grad():
    outputs, output_sizes = model(inputs, input_lengths)

# Decode
decoded_output, _ = decoder.decode(outputs, output_sizes)
# First utterance in the batch, first (and only) decoded path.
transcribed_text = decoded_output[0][0]

print("Transcribed Text:", transcribed_text)


  state_dict = torch.load(model_path, map_location='cpu')


Transcribed Text: TN TOEES


In [70]:
import librosa
import numpy as np

from tensorflow.keras.models import load_model
from tensorflow.keras.models import model_from_json

# Locations and settings for the Keras emotion classifier
KERAS_MODEL_DIR = "/content/drive/MyDrive/advanced-machine-learning/best_model.keras"
EMOTION_SAMPLE_RATE = 16000  # Replace 16000 with your model's expected sample rate
# Class order of the model's output vector.
EMOTION_LABELS = ['neutral', 'calm', 'sad', 'happy', 'fear', 'disgust', 'surprise', 'angry']

# Load model configuration
with open(KERAS_MODEL_DIR + "/config.json", "r") as json_file:
    model_config = json_file.read()

# Recreate the model architecture
# NOTE: this rebinds `model`, which held the PyTorch DeepSpeech model in the
# previous cell.
model = model_from_json(model_config)

# Load the weights
model.load_weights(KERAS_MODEL_DIR + "/model.weights.h5")

model.summary()

audio_path = "/content/drive/MyDrive/advanced-machine-learning/final4.wav"
audio_data, _ = librosa.load(audio_path, sr=EMOTION_SAMPLE_RATE)
input_data = np.expand_dims(audio_data, axis=0)  # Add batch dimension

# NOTE(review): the raw waveform is passed straight to the model; this assumes
# the model expects unprocessed audio -- confirm against the training pipeline.
predictions = model.predict(input_data)
print(predictions)

# Report the most probable class by name instead of reading the index by hand.
predicted_index = int(np.argmax(predictions[0]))
print("Predicted emotion:", EMOTION_LABELS[predicted_index])




[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step
[[2.3495150e-04 2.2222779e-09 9.9964249e-01 5.8524306e-06 4.4477896e-05
  4.2014952e-05 2.9816256e-05 3.3385902e-07]]


labels = ['neutral', 'calm', 'sad', 'happy', 'fear', 'disgust', 'surprise', 'angry']

The prediction received above is [[2.3495150e-04 2.2222779e-09 9.9964249e-01 5.8524306e-06 4.4477896e-05
  4.2014952e-05 2.9816256e-05 3.3385902e-07]]. The maximum value is 9.9964249e-01, at index 2, which corresponds to the label 'sad' in the list above. The prediction is therefore SAD,
  which means the model has successfully predicted the emotion of the speech.