In [7]:
!pip install streamlit

!pip install pyngrok



In [8]:
%%writefile config.py
# Directory holding the trained checkpoints; app.py reads the two paths below via `config`.
root_dir = '/content'
# Checkpoint for the ResNet18-based classifier.
resnet_dir = f'{root_dir}/resnet_best_early_stop.pth'
# Checkpoint for the custom CNN classifier.
custom_dir = f'{root_dir}/custom_best_early_stop.pth'

Overwriting config.py


In [9]:
# List the working directory to check that the checkpoint files referenced in config.py are present.
!ls

app.py	   custom_best_early_stop.pth  __pycache__		   sample_data
config.py  nohup.out		       resnet_best_early_stop.pth


In [10]:
%%writefile app.py

import streamlit as st
import plotly.express as px
# Import packages
import os
import torch
import torch.nn as nn
import torch.optim as optim
import config
import torch.nn.functional as F
from torch.utils.tensorboard import SummaryWriter
from IPython.core.debugger import Tracer
import soundfile as sf
import torch.utils.data as data
from torchvision import transforms, utils, models, ops
from multiprocessing import cpu_count, Pool
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
import seaborn as sns
import io
from tqdm import tqdm
import pandas as pd
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
import librosa
import librosa.display
import time
import librosa.effects
import IPython.display as ipd
from torch.utils.data import DataLoader, TensorDataset
from torch.optim.lr_scheduler import ReduceLROnPlateau
import copy

def plot_waveplot(audio_data, sr):
    """Draw the time-domain waveform of an audio signal.

    Args:
        audio_data: Audio samples, handed unchanged to ``librosa.display.waveshow``.
        sr: Sampling rate of ``audio_data``.

    Returns:
        The Matplotlib figure containing the plot. The figure is not closed
        here; the caller is expected to release it.
    """
    figure, axes = plt.subplots(figsize=(10, 3))
    axes.set_title('Waveplot of the audio', size=15)
    librosa.display.waveshow(audio_data, sr=sr, ax=axes)
    # Labels are applied after waveshow so they are the ones that end up on the axes.
    axes.set(xlabel='Time (s)', ylabel='Amplitude')
    return figure

def plot_spectrogram(audio_data, sr):
    """Draw an STFT magnitude spectrogram (in dB) of an audio signal.

    Args:
        audio_data: Audio samples passed to ``librosa.stft``.
        sr: Sampling rate, used to scale the time and frequency axes.

    Returns:
        The Matplotlib figure containing the spectrogram and its colorbar.
    """
    # Magnitude of the complex STFT, converted to decibels.
    magnitude_db = librosa.amplitude_to_db(np.abs(librosa.stft(audio_data)))

    figure, axes = plt.subplots(figsize=(11, 3))
    axes.set_title('Spectrogram of the audio', size=15)
    mesh = librosa.display.specshow(
        magnitude_db,
        sr=sr,
        x_axis='time',
        y_axis='hz',
        ax=axes,
    )
    figure.colorbar(mesh, ax=axes, format="%+2.f dB")
    return figure

def plot_mfcc(audio_data, sr):
    """Compute 40 MFCCs of an audio signal and draw them as a heat map.

    Args:
        audio_data: Audio samples passed to ``librosa.feature.mfcc``.
        sr: Sampling rate of ``audio_data``.

    Returns:
        A ``(figure, mfcc_features)`` tuple: the Matplotlib figure and the
        MFCC matrix that was plotted.
    """
    figure, axes = plt.subplots(figsize=(10, 4))
    mfcc_features = librosa.feature.mfcc(y=audio_data, sr=sr, n_mfcc=40)

    # The colour scale is fixed to [-1000, 200] rather than fitted to the data.
    mesh = librosa.display.specshow(
        mfcc_features,
        x_axis='time',
        sr=sr,
        vmin=-1000,
        vmax=200,
        ax=axes,
    )
    figure.colorbar(mesh, ax=axes, format='%+2.0f dB')
    axes.set_title('MFCC of the audio')
    return figure, mfcc_features

class InferenceDataset(data.Dataset):
    """Preprocessing helper that turns a raw waveform into a normalised MFCC matrix.

    The class subclasses ``Dataset`` but defines no ``__len__``/``__getitem__``;
    the app only instantiates it to call ``_preprocess``.

    Attributes are set in ``__init__``:
        ms: Length of the analysed clip in milliseconds.
        target_sr: Sampling rate (Hz) every clip is resampled to.
        n_samples: Number of samples in a clip of ``ms`` milliseconds at ``target_sr``.
    """

    def __init__(self, ms=4000):
        self.ms = ms
        self.target_sr = 16000
        self.n_samples = self.target_sr * ms // 1000

    def _preprocess(self, y, sr):
        """Convert a waveform into a fixed-length, normalised 40-coefficient MFCC matrix.

        Steps: down-mix to mono, resample to ``target_sr``, repeat the signal if it
        is shorter than ``n_samples``, keep the centred ``n_samples`` window,
        compute MFCCs (30 ms window, 15 ms hop) and standardise them globally.

        Args:
            y: Audio samples as a NumPy array. Arrays with more than one
                dimension are passed to ``librosa.to_mono``.
            sr: Sampling rate of ``y`` in Hz.

        Returns:
            The standardised MFCC matrix, or ``None`` if processing failed
            (the error is reported through ``st.error``).
        """
        try:
            # Convert to mono if needed.
            if len(y.shape) > 1:
                y = librosa.to_mono(y)

            # Fail with a readable message instead of a ZeroDivisionError further down.
            if len(y) == 0:
                raise ValueError("audio contains no samples")

            # Resample to the rate the rest of the pipeline assumes.
            if sr != self.target_sr:
                y = librosa.resample(y, orig_sr=sr, target_sr=self.target_sr)

            # Repeat the signal when it is too short to fill the window.
            if len(y) < self.n_samples:
                y = np.tile(y, self.n_samples // len(y) + 1)

            # Keep the centred window of exactly n_samples samples.
            start = (len(y) - self.n_samples) // 2
            y = y[start:start + self.n_samples]

            win_length = int(0.03 * self.target_sr)
            hop_length = int(0.015 * self.target_sr)

            # `y` is at target_sr from here on, so the MFCC filterbank must be built
            # for target_sr and not for the rate of the original file.
            # NOTE(review): confirm the training pipeline computed its features the
            # same way; files already at 16 kHz are unaffected by this change.
            mfcc = librosa.feature.mfcc(
                y=y,
                sr=self.target_sr,
                n_mfcc=40,
                win_length=win_length,
                hop_length=hop_length,
            )

            # Standardise over the whole matrix; skip the division for constant
            # features so the result never contains NaN/inf.
            centred = mfcc - np.mean(mfcc)
            spread = np.std(mfcc)
            return centred / spread if spread > 0 else centred
        except Exception as e:
            st.error(f"Error during audio processing: {e}")
            return None

class AudioClassifier(nn.Module):
    """Five-stage convolutional classifier with a small fully connected head.

    Each stage is ``Conv2d(5x5, padding 2) -> BatchNorm2d -> ReLU -> MaxPool2d(2)``,
    with channel widths 1 -> 512 -> 512 -> 256 -> 128 -> 64. The result is
    globally average-pooled and fed through ``Linear(64, 32)`` and
    ``Linear(32, num_classes)``.

    Args:
        num_mfcc_features: Accepted for interface symmetry; not used by the layers.
        num_time_frames: Accepted for interface symmetry; not used by the layers.
        num_classes: Size of the output logit vector.
    """

    def __init__(self, num_mfcc_features, num_time_frames, num_classes):
        super().__init__()
        self.relu = nn.ReLU()

        # Stage 0 (the max-pool layer is shared by every stage).
        self.conv0 = nn.Conv2d(1, 512, kernel_size=(5, 5), stride=1, padding=2)
        self.bn0 = nn.BatchNorm2d(512)
        self.pool = nn.MaxPool2d(kernel_size=(2, 2), stride=2)

        # Stage 1
        self.conv1 = nn.Conv2d(512, 512, kernel_size=(5, 5), stride=1, padding=2)
        self.bn1 = nn.BatchNorm2d(512)

        # Stage 2
        self.conv2 = nn.Conv2d(512, 256, kernel_size=(5, 5), stride=1, padding=2)
        self.bn2 = nn.BatchNorm2d(256)

        # Stage 3
        self.conv3 = nn.Conv2d(256, 128, kernel_size=(5, 5), stride=1, padding=2)
        self.bn3 = nn.BatchNorm2d(128)

        # Stage 4
        self.conv4 = nn.Conv2d(128, 64, kernel_size=(5, 5), stride=1, padding=2)
        self.bn4 = nn.BatchNorm2d(64)

        # Collapses whatever spatial extent is left to 1x1.
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))

        # Classification head.
        self.dropout1 = nn.Dropout(0.2)
        self.dropout2 = nn.Dropout(0.3)
        self.fc1 = nn.Linear(64, 32)
        self.fc2 = nn.Linear(32, num_classes)

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)`` for a 4-D, 1-channel input."""
        early_stages = (
            (self.conv0, self.bn0),
            (self.conv1, self.bn1),
            (self.conv2, self.bn2),
            (self.conv3, self.bn3),
        )
        for conv, norm in early_stages:
            x = self.pool(self.relu(norm(conv(x))))

        # Dropout sits between the fourth and the fifth stage.
        x = self.dropout1(x)
        x = self.pool(self.relu(self.bn4(self.conv4(x))))

        pooled = self.avgpool(x)
        flat = pooled.reshape(pooled.shape[0], -1)

        hidden = self.dropout2(self.relu(self.fc1(flat)))
        return self.fc2(hidden)

class PretrainedAudioClassifier(nn.Module):
    """ResNet18 backbone adapted to single-channel input, followed by a linear classifier.

    The backbone is created with torchvision's default pretrained weights, then
    its first convolution is replaced by a freshly initialised 1-channel layer
    and its final ``fc`` layer by ``nn.Identity``. A ``Linear(512, num_classes)``
    head produces the logits.

    Args:
        num_mfcc_features: Accepted for interface symmetry; not used by the layers.
        num_time_frames: Accepted for interface symmetry; not used by the layers.
        num_classes: Size of the output logit vector.
    """

    def __init__(self, num_mfcc_features, num_time_frames, num_classes):
        super().__init__()
        backbone = models.resnet18(weights='DEFAULT')
        # Accept 1 input channel instead of 3 (this layer starts from random weights).
        backbone.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
        # Drop the original classification layer so the backbone emits features.
        backbone.fc = nn.Identity()
        self.resnet = backbone
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        # 512 is the feature width fed to ResNet18's original fc layer.
        self.classifier = nn.Linear(512, num_classes)

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)`` for a 4-D, 1-channel input."""
        features = self.resnet(x)

        # torchvision's ResNet forward pools and flattens before `fc`, so the
        # backbone output is normally already 2-D (batch, 512). The reshapes below
        # only lift 2-D/3-D outputs to 4-D so AdaptiveAvgPool2d can be applied.
        if features.dim() == 2:
            features = features.unsqueeze(-1).unsqueeze(-1)
        elif features.dim() == 3:
            features = features.unsqueeze(-2)

        pooled = self.avgpool(features)
        flat = pooled.reshape(pooled.shape[0], -1)
        return self.classifier(flat)

# --- Configuration ---
# Use the wide layout and set the browser-tab title.
# NOTE(review): keep this ahead of every other st.* call in the script; Streamlit
# documents set_page_config as a command that has to run before other Streamlit commands.
st.set_page_config(layout="wide", page_title="Audio Analysis Dashboard")

# --- Page Header ---
# Title and introductory text rendered at the top of the page.
st.title("🎶 Emotion Detection Dashboard")
st.markdown("""
Welcome to the audio analysis dashboard! Upload your audio files, preprocess them,
and run inference with different models.
""")

@st.cache_resource
def load_model(model_name):
    """Build the requested classifier, load its checkpoint and switch it to eval mode.

    The result is cached by Streamlit per ``model_name``.

    Args:
        model_name: ``"ResNet18"`` or ``"Custom Model"``.

    Returns:
        A ``(model, labels)`` tuple. ``model`` is the loaded network in evaluation
        mode, or ``None`` when the name is unknown or loading failed (the problem
        is reported through ``st.error``). ``labels`` is always the list of class
        names, indexed like the model's output logits.
    """
    st.write(f"Loading model: {model_name}...")
    labels = [
        'female_angry', 'female_disgust', 'female_fear', 'female_happy',
        'female_neutral', 'female_sad', 'female_surprise',
        'male_angry', 'male_disgust', 'male_fear', 'male_happy',
        'male_neutral', 'male_sad', 'male_surprise',
    ]
    NUM_CLASSES = len(labels)
    NUM_MFCC_FEATURES = 40
    NUM_TIME_FRAMES = 194
    try:
        if model_name == "ResNet18":
            model_cls, checkpoint_path = PretrainedAudioClassifier, config.resnet_dir
        elif model_name == "Custom Model":
            model_cls, checkpoint_path = AudioClassifier, config.custom_dir
        else:
            st.error(f"Unknown model name: {model_name}")
            return None, labels

        model = model_cls(NUM_MFCC_FEATURES, NUM_TIME_FRAMES, NUM_CLASSES)
        # torch.load unpickles the file: only point this at checkpoints you trust.
        checkpoint = torch.load(checkpoint_path, map_location=torch.device('cpu'))
        model.load_state_dict(checkpoint['net_state_dict'])

        # Inference must run in eval mode: otherwise dropout stays active and
        # BatchNorm normalises with the statistics of the single uploaded clip.
        model.eval()
        st.success(f"{model_name} loaded successfully!")
        return model, labels
    except Exception as e:
        st.error(f"Error loading model '{model_name}': {e}. Check path, model architecture, and file integrity.")
        return None, labels

@st.cache_data
def preprocess_for_inference(audio_file_bytes):
    """Decode uploaded audio bytes and compute the model-ready MFCC features.

    Args:
        audio_file_bytes: Raw bytes of the uploaded audio file.

    Returns:
        A ``(waveform, sample_rate, mfcc)`` tuple: the decoded mono float32
        waveform at its original sampling rate, that sampling rate, and the
        output of ``InferenceDataset._preprocess`` for it. On a decoding or
        processing error the problem is reported through ``st.error`` and
        ``(None, None, None)`` is returned.
    """
    try:
        # soundfile reads from any seekable file-like object, so wrap the bytes.
        waveform, sample_rate = sf.read(io.BytesIO(audio_file_bytes))

        # Work in float32 (a no-op when the data already has that dtype).
        waveform = waveform.astype(np.float32, copy=False)

        # Multi-channel data is transposed before the down-mix so that
        # librosa.to_mono receives the channel axis first.
        if waveform.ndim > 1:
            waveform = librosa.to_mono(waveform.T)

        features = InferenceDataset(ms=4000)._preprocess(waveform, sample_rate)
        return waveform, sample_rate, features

    except Exception as e:
        st.error(f"Error during audio processing for inference: {e}")
        st.error(f"Type of uploaded file: {type(audio_file_bytes)}")
        st.error(f"Length of uploaded bytes: {len(audio_file_bytes)} bytes")
        return None, None, None


# --- Upload ---
st.header("📤 Upload Audio File")
uploaded_file = st.file_uploader("Choose an audio file...", type=["wav"])

# Defaults, so the names exist even when nothing has been uploaded yet.
audio_data_raw = sr_raw = mfccs_for_model = None

if uploaded_file is None:
    st.info("Upload an audio file (.wav) to get started!")
else:
    file_contents = uploaded_file.read()

    # Audio player fed directly from the uploaded bytes.
    mime_subtype = uploaded_file.type.split("/")[-1]
    st.audio(file_contents, format=f'audio/{mime_subtype}', start_time=0)
    st.success("File uploaded successfully!")

    with st.spinner("Processing audio and extracting features..."):
        audio_data_raw, sr_raw, mfccs_for_model = preprocess_for_inference(file_contents)

    if audio_data_raw is not None and mfccs_for_model is not None:
        # --- Visualisations ---
        st.subheader("📊 Audio Features & Visualizations")

        tab_waveform, tab_mfcc, tab_spectrogram = st.tabs(["Waveform", "MFCC", "Spectrogram"])
        # (tab, heading, figure factory). The MFCC plot is recomputed from the raw
        # audio for display only; mfccs_for_model holds the features for the model.
        panels = (
            (tab_waveform, "#### Waveform", plot_waveplot),
            (
                tab_mfcc,
                "#### Mel-frequency Cepstral Coefficients (MFCCs)",
                lambda samples, rate: plot_mfcc(samples, rate)[0],
            ),
            (tab_spectrogram, "#### Spectrogram", plot_spectrogram),
        )
        for tab, heading, make_figure in panels:
            with tab:
                st.write(heading)
                figure = make_figure(audio_data_raw, sr_raw)
                st.pyplot(figure)
                # Release the figure once Streamlit has rendered it.
                plt.close(figure)

        # --- Inference ---
        st.header("🧠 Model Inference")
        model_options = ["ResNet18", "Custom Model"]
        current_model_name = st.selectbox("Choose a model for inference:", model_options)

        st.info(f"Selected Model: **{current_model_name}**")

        if st.button(f"Run Inference with {current_model_name}"):
            if mfccs_for_model is None:
                st.warning("Please upload and preprocess an audio file first to run inference.")
            else:
                model, labels = load_model(current_model_name)

                if not (model and labels):
                    st.error("Could not load the selected model. Please check model definitions and paths.")
                else:
                    st.subheader(f"Results from {current_model_name}:")
                    with st.spinner("Running inference..."):
                        # Stays None if building the tensor itself fails, so the
                        # error report below can say so.
                        mfccs_tensor = None
                        try:
                            # Add batch and channel axes in front of the MFCC matrix.
                            mfccs_tensor = (
                                torch.tensor(mfccs_for_model)
                                .float()
                                .unsqueeze(0)
                                .unsqueeze(0)
                            )

                            # No gradients are needed for inference.
                            with torch.no_grad():
                                outputs = model(mfccs_tensor)
                                probabilities = torch.softmax(outputs, dim=1)[0]

                            predicted_label = labels[torch.argmax(probabilities).item()]

                            st.success(f"**Prediction:** `{predicted_label}`")
                            st.write("---")
                            st.write("#### Probabilities:")
                            # Every class with its probability, most likely first.
                            prob_table = (
                                pd.DataFrame({
                                    'Class': labels,
                                    'Probability': probabilities.numpy(),
                                })
                                .sort_values(by='Probability', ascending=False)
                                .set_index('Class')
                            )
                            st.dataframe(prob_table)

                        except Exception as e:
                            st.error(f"Error during model inference: {e}. Please check model input shape and loading.")
                            st.write(f"Expected MFCC shape: {mfccs_tensor.shape if mfccs_tensor is not None else 'N/A'}")
                            st.write(f"Raw MFCCs for model shape: {mfccs_for_model.shape}")


Overwriting app.py


In [11]:
# Start the Streamlit server for app.py in the background; nohup appends its output to nohup.out.
!nohup streamlit run app.py &

nohup: appending output to 'nohup.out'


In [12]:
import os
from getpass import getpass

from pyngrok import ngrok

# Never hard-code the ngrok authtoken in the notebook: it is a credential and
# leaks with every shared or committed copy. Read it from the environment, or
# prompt for it without echoing. Any token that was ever stored in this
# notebook should be revoked in the ngrok dashboard.
ngrok_authtoken = os.environ.get("NGROK_AUTHTOKEN") or getpass("ngrok authtoken: ")
ngrok.set_auth_token(ngrok_authtoken)

# Setup a tunnel to the streamlit port 8501
public_url = ngrok.connect(addr=8501, proto="http")
public_url

Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml


<NgrokTunnel: "https://02a3543e7ceb.ngrok-free.app" -> "http://localhost:8501">