In [None]:
import asyncio
import os
import tempfile
import warnings

import streamlit as st
import numpy as np
import librosa
from sklearn.preprocessing import StandardScaler
from sklearn.feature_extraction.text import TfidfVectorizer
import joblib
import torch
import torch.nn as nn
warnings.filterwarnings("ignore")

# Define DNN class (must match Task 1 and Task 2 architectures)
class AudioDNN(nn.Module):
    """Feed-forward binary classifier over a flat feature vector.

    Layout: ``input_dim -> 256 -> 128 -> 1`` with a ReLU after each hidden
    layer and a sigmoid on the output, so ``forward`` yields one score in
    [0, 1] per input row.

    The linear layers sit at indices 0, 2 and 4 of ``self.net``; state-dict
    keys are derived from those positions, so the layer order must not change.
    """

    def __init__(self, input_dim):
        super().__init__()
        layers = []
        in_features = input_dim
        # Hidden stack: each width gets a Linear layer followed by a ReLU.
        for width in (256, 128):
            layers.append(nn.Linear(in_features, width))
            layers.append(nn.ReLU())
            in_features = width
        # Output head: a single sigmoid-activated unit.
        layers.append(nn.Linear(in_features, 1))
        layers.append(nn.Sigmoid())
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Run ``x`` through the stack and return the sigmoid scores."""
        scores = self.net(x)
        return scores

class DefectDNN(nn.Module):
    """Feed-forward multi-label classifier with dropout regularisation.

    Layout: ``input_dim -> 128 -> 64 -> n_labels``. Each hidden layer is
    followed by ReLU and ``Dropout(0.3)``; the output goes through a sigmoid,
    giving one independent score in [0, 1] per label.

    The linear layers sit at indices 0, 3 and 6 of ``self.net``; state-dict
    keys are derived from those positions, so the layer order must not change.
    """

    def __init__(self, input_dim, n_labels):
        super().__init__()

        def hidden_block(n_in, n_out):
            # Linear -> ReLU -> Dropout, the unit repeated for each hidden layer.
            return [nn.Linear(n_in, n_out), nn.ReLU(), nn.Dropout(0.3)]

        self.net = nn.Sequential(
            *hidden_block(input_dim, 128),
            *hidden_block(128, 64),
            nn.Linear(64, n_labels),
            nn.Sigmoid(),
        )

    def forward(self, x):
        """Run ``x`` through the stack and return per-label sigmoid scores."""
        scores = self.net(x)
        return scores

# Define MultiLabelPerceptron class (must match Task 2 definition)
class MultiLabelPerceptron:
    """One independent perceptron per label, stored as a shared weight matrix.

    Attributes:
        weights: array of shape (n_labels, n_features), initialised to zeros.
        bias: array of shape (n_labels,), initialised to zeros.
        lr: learning rate applied to every correction.
    """

    def __init__(self, n_features, n_labels, learning_rate=0.01):
        self.lr = learning_rate
        self.weights = np.zeros((n_labels, n_features))
        self.bias = np.zeros(n_labels)

    def predict(self, X):
        """Return a 0/1 integer array: 1 where the linear score is strictly positive."""
        linear_scores = X @ self.weights.T
        linear_scores = linear_scores + self.bias
        return (linear_scores > 0).astype(int)

    def update(self, x, y_true):
        """Apply the perceptron rule for one sample ``x`` with targets ``y_true``.

        Only labels whose current prediction disagrees with the target are
        adjusted; their weights and bias move by ``lr * (target - prediction)``.
        """
        y_pred = self.predict(x.reshape(1, -1))[0]
        for idx, target in enumerate(y_true):
            guess = y_pred[idx]
            if target == guess:
                # Correctly classified label: leave its parameters alone.
                continue
            step = self.lr * (target - guess)
            self.weights[idx] += step * x
            self.bias[idx] += step

# Make sure the current thread has an asyncio event loop installed.
# asyncio.get_event_loop() raises RuntimeError when the thread has no loop;
# that is exactly the situation this workaround targets, so a missing loop is
# treated the same as an idle one and a fresh loop is installed instead of the
# error being silently ignored. A loop that is already running is left alone.
try:
    loop = asyncio.get_event_loop()
except RuntimeError:
    loop = None

if loop is None or not loop.is_running():
    asyncio.set_event_loop(asyncio.new_event_loop())

# Load pre-trained models and preprocessing objects from the working directory.
# NOTE: joblib.load unpickles arbitrary Python objects, so these artifact files
# must come from a trusted source.
# The DNN checkpoints are mapped onto the CPU while loading: every inference
# path below builds CPU tensors and calls .numpy() on the output, and without
# map_location a checkpoint saved from a GPU fails to load on a CPU-only host.
try:
    # Task 1 (Audio Models)
    lr_audio_model = joblib.load("lr_audio_model.joblib")
    svm_audio_model = joblib.load("svm_audio_model.joblib")
    perceptron_audio_model = joblib.load("perceptron_audio_model.joblib")
    dnn_audio_model = AudioDNN(input_dim=3900)  # Match Task 1 input dimension (13 * 300)
    dnn_audio_model.load_state_dict(torch.load("dnn_audio_model.pth", map_location="cpu"))
    dnn_audio_model.eval()  # inference mode
    audio_scaler = joblib.load("audio_scaler.joblib")

    # Task 2 (Defect Models)
    lr_defect_model = joblib.load("lr_defect_model.joblib")
    svm_defect_model = joblib.load("svm_defect_model.joblib")
    perceptron_defect_model = joblib.load("perceptron_defect_model.joblib")
    dnn_defect_model = DefectDNN(input_dim=500, n_labels=7)  # Match Task 2 input dimension and labels
    dnn_defect_model.load_state_dict(torch.load("dnn_defect_model.pth", map_location="cpu"))
    dnn_defect_model.eval()  # inference mode, disables the Dropout layers
    vectorizer = joblib.load("vectorizer.joblib")
    scaler = joblib.load("scaler.joblib")

    st.success("Models and preprocessing objects loaded successfully!")
except Exception as e:
    # Best effort: report the failure in the page and keep rendering. Anything
    # that was not loaded before the failure stays undefined, so the section
    # that needs it will raise NameError when used.
    st.error(f"Error loading models or preprocessing objects: {e}")

# Preprocessing functions (match with Task 1 and Task 2)
def extract_mfcc(audio_file, n_mfcc=13, max_len=300):
    """Load an audio file and return a fixed-length, flattened MFCC row.

    The audio is read at its native sampling rate (``sr=None``). The MFCC
    matrix is truncated, or zero-padded on the right, along its second axis
    so that it has exactly ``max_len`` columns, then flattened.

    Args:
        audio_file: value passed straight to ``librosa.load``.
        n_mfcc: number of MFCC coefficients to compute.
        max_len: number of columns kept after padding/truncation.

    Returns:
        A 2-D array with a single row holding the flattened coefficients.
    """
    signal, sample_rate = librosa.load(audio_file, sr=None)
    coeffs = librosa.feature.mfcc(y=signal, sr=sample_rate, n_mfcc=n_mfcc)

    n_frames = coeffs.shape[1]
    if n_frames >= max_len:
        # Long enough: keep only the first max_len columns.
        coeffs = coeffs[:, :max_len]
    else:
        # Too short: append zero columns up to max_len.
        missing = max_len - n_frames
        coeffs = np.pad(coeffs, ((0, 0), (0, missing)), mode='constant')

    flat = coeffs.flatten()
    return flat.reshape(1, -1)

In [None]:

def preprocess_defect_features(feature_vector, vectorizer, scaler):
    """Turn one raw input into the scaled feature row used for defect prediction.

    Args:
        feature_vector: a single input item; it is wrapped in a one-element
            list before being handed to ``vectorizer.transform``.
        vectorizer: object whose ``transform`` returns something with a
            ``toarray()`` method (presumably the fitted TF-IDF vectorizer).
        scaler: object whose ``transform`` accepts the dense array.

    Returns:
        Whatever ``scaler.transform`` returns for the dense vectorized row.
    """
    vectorized = vectorizer.transform([feature_vector])
    dense_row = vectorized.toarray()
    return scaler.transform(dense_row)

# Function to get list of audio files from deepfake_detection_dataset_urdu
def get_audio_files(dataset_dir=None, extensions=(".wav",)):
    """Return the audio files found under a dataset directory, sorted by path.

    Args:
        dataset_dir: directory to search recursively. When ``None`` the
            deepfake_detection_dataset_urdu folder under the user's Desktop
            is used.
        extensions: file suffixes to accept; matching ignores case, so
            ``clip.WAV`` is found as well as ``clip.wav``.

    Returns:
        A sorted list of full paths. Sorting keeps the order stable between
        calls instead of depending on the filesystem's listing order. The
        list is empty when the directory does not exist or holds no match.
    """
    if dataset_dir is None:
        dataset_dir = os.path.expanduser("~/Desktop/22F-3625-DS-A-04/deepfake_detection_dataset_urdu")
    suffixes = tuple(ext.lower() for ext in extensions)
    return sorted(
        os.path.join(root, name)
        for root, _dirs, names in os.walk(dataset_dir)
        for name in names
        if name.lower().endswith(suffixes)
    )

# Streamlit App
# Page heading and a one-line description of the two tasks.
st.title("Multi-Task Prediction App")
st.write("Upload an audio file for deepfake detection or input a feature vector for defect prediction.")

# Sidebar model picker; the choice is shared by both prediction sections.
model_options = [
    "Logistic Regression",
    "SVM",
    "Perceptron",
    "DNN",
]
selected_model = st.sidebar.selectbox(label="Select Model", options=model_options)

# Audio Upload Section
st.subheader("Deepfake Audio Detection")
audio_files = get_audio_files()
selected_audio = st.selectbox("Select an Audio File from Dataset", ["Upload a new file"] + audio_files)

# The first entry switches to the uploader; any other entry is a dataset path.
if selected_audio == "Upload a new file":
    audio_file = st.file_uploader("Upload Audio File", type=["wav", "mp3"])
else:
    audio_file = selected_audio

if audio_file is not None:
    if isinstance(audio_file, str):
        # A dataset entry is already a path on disk.
        mfcc_features = extract_mfcc(audio_file, max_len=300)  # Match Task 1 max_len
    else:
        # An upload is written to a uniquely named temporary file so that
        # concurrent sessions cannot overwrite each other's audio, and the
        # file is removed even when feature extraction raises.
        upload_suffix = os.path.splitext(getattr(audio_file, "name", ""))[1] or ".wav"
        temp_audio = tempfile.NamedTemporaryFile(suffix=upload_suffix, delete=False)
        try:
            with temp_audio:
                temp_audio.write(audio_file.getvalue())
            # The handle is closed here so the path can be reopened by name.
            mfcc_features = extract_mfcc(temp_audio.name, max_len=300)  # Match Task 1 max_len
        finally:
            os.remove(temp_audio.name)

    scaled_features = audio_scaler.transform(mfcc_features)  # Use Task 1 scaler

    if st.button("Predict Deepfake"):
        if selected_model == "Logistic Regression":
            # Probability of class index 1 for the single input row.
            prediction = lr_audio_model.predict_proba(scaled_features)[:, 1][0]
        elif selected_model == "SVM":
            # Raw margin; squashed to (0, 1) below.
            prediction = svm_audio_model.decision_function(scaled_features)[0]
        elif selected_model == "Perceptron":
            # Hard 0/1 output, so the reported confidence is 0 or 1.
            prediction = (perceptron_audio_model.predict(scaled_features) > 0).astype(int)[0]
        else:  # DNN
            with torch.no_grad():
                input_tensor = torch.tensor(scaled_features, dtype=torch.float32)
                prediction = dnn_audio_model(input_tensor).numpy()[0][0]

        # The SVM margin is passed through a sigmoid so every model is judged
        # against the same 0.5 threshold.
        confidence = prediction if selected_model != "SVM" else 1 / (1 + np.exp(-prediction))
        label = "Deepfake" if confidence > 0.5 else "Bonafide"
        st.write(f"Prediction: {label}")
        st.write(f"Confidence Score: {confidence:.4f}")

# Defect Prediction Section
st.subheader("Multi-Label Defect Prediction")
feature_input = st.text_area("Enter Feature Vector (e.g., text report)", "Sample text here")
label_cols = ["type_blocker", "type_regression", "type_bug", "type_documentation", "type_enhancement", "type_task", "type_dependency_upgrade"]

if st.button("Predict Defects"):
    if feature_input:
        defect_features = preprocess_defect_features(feature_input, vectorizer, scaler)

        if selected_model == "Logistic Regression":
            probas = lr_defect_model.predict_proba(defect_features)
            if isinstance(probas, (list, tuple)):
                # One (n_samples, n_classes) array per label: keep the
                # class-index-1 probability of the single input row.
                predictions = np.array([np.atleast_2d(proba)[0, 1] for proba in probas])
            else:
                # Assumed to already be one positive-class probability per
                # label for the single input row.
                predictions = np.asarray(probas)
        elif selected_model == "SVM":
            # decision_function returns margins, not probabilities. They are
            # passed through a sigmoid, as in the audio section, so the 0.5
            # threshold below corresponds to a margin of 0.
            margins = np.asarray(svm_defect_model.decision_function(defect_features), dtype=float)
            predictions = 1 / (1 + np.exp(-margins))
        elif selected_model == "Perceptron":
            # Hard 0/1 outputs, so each reported confidence is 0 or 1.
            predictions = perceptron_defect_model.predict(defect_features)
        else:  # DNN
            with torch.no_grad():
                input_tensor = torch.tensor(defect_features, dtype=torch.float32)
                predictions = dnn_defect_model(input_tensor).numpy()

        # Every branch handles exactly one sample, so flatten to one score per
        # label. This removes the orientation mismatch that made the Logistic
        # Regression branch fail when the results were indexed for display.
        predictions = np.asarray(predictions, dtype=float).reshape(-1)

        if predictions.size != len(label_cols):
            st.error(f"Model returned {predictions.size} scores for {len(label_cols)} labels.")
        else:
            # Convert to binary predictions and confidence scores
            binary_predictions = (predictions > 0.5).astype(int)
            confidence_scores = np.clip(predictions, 0, 1)  # Ensure scores are between 0 and 1

            st.write("Predicted Labels:")
            for label, flag, score in zip(label_cols, binary_predictions, confidence_scores):
                st.write(f"{label}: {flag}, Confidence: {score:.4f}")
    else:
        st.write("Please enter a feature vector.")

# UI Styling
# Sidebar usage instructions, written one step per line.
st.sidebar.header("Instructions")
instruction_steps = (
    "1. Select a model from the sidebar.",
    "2. Upload an audio file or select from the dataset for deepfake detection.",
    "3. Input a text feature for defect prediction.",
    "4. Click 'Predict' to see results with confidence scores.",
)
for step in instruction_steps:
    st.sidebar.write(step)

# Custom CSS for the buttons and the text area, injected as raw HTML.
custom_css = """
<style>
    .stButton>button {
        background-color: #4CAF50;
        color: white;
        padding: 10px 20px;
    }
    .stTextArea {
        margin-bottom: 10px;
    }
</style>
"""
st.markdown(custom_css, unsafe_allow_html=True)