In [9]:
# Basic configuration
import os, random, math, gc
import numpy as np, pandas as pd, torch

# Seed every RNG used in this notebook so that reruns are repeatable.
RANDOM_SEED = 42
torch.manual_seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)
random.seed(RANDOM_SEED)

# Single project root. It defaults to the original location, but can be
# overridden with the HOAX_PROJECT_ROOT environment variable so the notebook
# also runs on machines where the project lives elsewhere.
AUDIO_BASE_DIR = os.environ.get(
    "HOAX_PROJECT_ROOT",
    r"D:\INDONERIS-DATAMINING\multimodal-hoax-detection",
)
# All other project paths are derived from the root with os.path.join so the
# separators are consistent (the previous literal mixed "\" and "/").
DATA_CSV = os.path.join(
    AUDIO_BASE_DIR, "data", "training", "multimodal_splits", "audio_only_dataset.csv"
)
MODEL_SAVE_PATH = os.path.join(
    AUDIO_BASE_DIR, "models", "audio_baseline", "audio_head_improved.pt"
)
# Cache file for precomputed embeddings (relative to the working directory).
EMB_SAVE = "audio_embeddings_precomputed.npz"
TARGET_SR = 16000   # sample rate (Hz) every clip is resampled to
SEGMENT_SEC = 10    # length (seconds) of the segment taken from each clip
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("DEVICE =", DEVICE)


DEVICE = cuda


In [10]:
# Load the CSV and build a full-path column
df = pd.read_csv(DATA_CSV)
def to_full_path(rel_path, base_dir=None):
    """Resolve a dataset-relative audio path against the project root.

    Args:
        rel_path: Path as stored in the CSV (e.g. "./data/raw/x.wav"). Missing
            values (NaN / None) are accepted.
        base_dir: Directory to resolve against. Defaults to AUDIO_BASE_DIR.

    Returns:
        The joined path, or "" when ``rel_path`` is missing.
    """
    if pd.isna(rel_path):
        return ""
    if base_dir is None:
        base_dir = AUDIO_BASE_DIR
    p = str(rel_path)
    # Remove only genuine "./" and "/" prefixes. str.lstrip("./") strips a
    # *set of characters*, so it would also eat the dots of "../x" or
    # ".hidden/x" and silently point at a different file.
    while p.startswith(("./", "/")):
        p = p[2:] if p.startswith("./") else p[1:]
    return os.path.join(base_dir, p)
# Resolve every CSV path into a full path stored in a new column.
df["audio_path_full"] = df["audio_path"].apply(to_full_path)

print("rows:", len(df))
preview = df[["audio_path","audio_path_full"]].head(5)
print(preview)

# Quick sanity check: do the first five resolved files exist on disk?
for full_path in preview["audio_path_full"]:
    print(full_path, os.path.exists(full_path))


rows: 208
                              audio_path  \
0  ./data/raw/youtube/audio/YT_00081.wav   
1  ./data/raw/youtube/audio/YT_00867.wav   
2  ./data/raw/youtube/audio/YT_00904.wav   
3  ./data/raw/youtube/audio/YT_01161.wav   
4  ./data/raw/youtube/audio/YT_00039.wav   

                                     audio_path_full  
0  D:\INDONERIS-DATAMINING\multimodal-hoax-detect...  
1  D:\INDONERIS-DATAMINING\multimodal-hoax-detect...  
2  D:\INDONERIS-DATAMINING\multimodal-hoax-detect...  
3  D:\INDONERIS-DATAMINING\multimodal-hoax-detect...  
4  D:\INDONERIS-DATAMINING\multimodal-hoax-detect...  
D:\INDONERIS-DATAMINING\multimodal-hoax-detection\data/raw/youtube/audio/YT_00081.wav True
D:\INDONERIS-DATAMINING\multimodal-hoax-detection\data/raw/youtube/audio/YT_00867.wav True
D:\INDONERIS-DATAMINING\multimodal-hoax-detection\data/raw/youtube/audio/YT_00904.wav True
D:\INDONERIS-DATAMINING\multimodal-hoax-detection\data/raw/youtube/audio/YT_01161.wav True
D:\INDONERIS-DATAMINING\multimo

In [11]:
# Loader: center segment, resample, mono, VAD check, optional spectral gating denoise
import librosa, soundfile as sf

def load_center_segment_waveform(path, target_sr=TARGET_SR, segment_sec=SEGMENT_SEC):
    """Load an audio file and return a fixed-length mono segment from its middle.

    The file is read with torchaudio, resampled to ``target_sr`` when its
    native rate differs, and averaged across channels when it has more than
    one. Clips longer than ``segment_sec`` are cropped around their midpoint;
    shorter (or equal-length) clips are zero-padded at the end.

    Args:
        path: Audio file to read.
        target_sr: Sample rate of the returned signal.
        segment_sec: Segment duration in seconds.

    Returns:
        1-D NumPy array with ``int(target_sr * segment_sec)`` samples.
    """
    import torchaudio
    import torch.nn.functional as F

    waveform, native_sr = torchaudio.load(path)
    if native_sr != target_sr:
        waveform = torchaudio.functional.resample(waveform, native_sr, target_sr)
    if waveform.shape[0] > 1:
        # Down-mix to a single channel, keeping the channel axis.
        waveform = waveform.mean(dim=0, keepdim=True)

    wanted = int(target_sr * segment_sec)
    available = waveform.shape[1]
    if available > wanted:
        # Crop a window of `wanted` samples centred on the clip midpoint.
        begin = max(0, available // 2 - wanted // 2)
        waveform = waveform[:, begin:begin + wanted]
    else:
        # Right-pad with zeros up to the requested length.
        waveform = F.pad(waveform, (0, wanted - available))
    return waveform.squeeze(0).numpy()

def is_voiced_enough(y, sr=TARGET_SR, top_db=25, min_voiced_sec=0.5):
    """Return whether ``y`` contains at least ``min_voiced_sec`` of non-silent audio.

    Non-silent intervals come from ``librosa.effects.split`` with the given
    ``top_db``; their sample counts are summed and converted to seconds using
    ``sr``. Note this is an energy-based check, not true voice detection.
    """
    active_samples = 0
    for seg_start, seg_end in librosa.effects.split(y, top_db=top_db):
        active_samples += seg_end - seg_start
    active_sec = active_samples / sr
    return active_sec >= min_voiced_sec

# optional spectral gating denoise (very light)
def spectral_gate(y, sr, prop_decrease=0.9):
    """Light spectral-gating denoiser based on a per-frequency median threshold.

    The signal is transformed with an STFT (n_fft=1024, hop=256). For each
    frequency row, bins whose magnitude falls below that row's median over
    time are attenuated; the rest pass unchanged.

    Args:
        y: 1-D audio signal.
        sr: Sample rate. Unused; kept for interface compatibility.
        prop_decrease: Fraction in [0, 1] by which gated bins are reduced.
            1.0 zeroes them completely, 0.0 leaves the signal untouched.
            (Previously this argument was accepted but ignored, and gated
            bins were always zeroed.)

    Returns:
        Denoised signal with the same number of samples as ``y``.
    """
    import numpy as np
    hop_length = 256
    S = librosa.stft(y, n_fft=1024, hop_length=hop_length)
    mag = np.abs(S)
    # Median over time for each frequency row serves as the noise floor.
    med = np.median(mag, axis=1, keepdims=True)
    keep = mag >= med
    floor_gain = 1.0 - float(np.clip(prop_decrease, 0.0, 1.0))
    gain = np.where(keep, 1.0, floor_gain).astype(mag.dtype)
    # Pass hop_length explicitly so it matches the forward transform, and
    # length so the output is not shorter than the input.
    return librosa.istft(S * gain, hop_length=hop_length, length=len(y))


In [12]:
# Precompute wav2vec embeddings to disk (one-time). If the EMB_SAVE file already exists, skip.
# NOTE: the cache is looked up by file name only; delete EMB_SAVE after changing
# the CSV or the audio preprocessing, otherwise stale embeddings are reused.
if os.path.exists(EMB_SAVE):
    print("Embedding file exists, skip precompute:", EMB_SAVE)
else:
    from transformers import Wav2Vec2Processor, Wav2Vec2Model
    import torch
    from tqdm.auto import tqdm

    processor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base")
    model_wv = Wav2Vec2Model.from_pretrained("facebook/wav2vec2-base").to(DEVICE)
    model_wv.eval()
    # Feature width of the encoder; used for the all-zero placeholder rows.
    emb_dim = model_wv.config.hidden_size

    ids = []
    paths = []
    embeddings = []
    failed = []
    n_unvoiced = 0
    for i, row in tqdm(df.iterrows(), total=len(df)):
        pid = row.get("sample_id", str(i))
        p = row["audio_path_full"]
        ids.append(pid); paths.append(p)
        try:
            y = load_center_segment_waveform(p)
            if not is_voiced_enough(y):
                # Mostly-silent clip: store a single zero frame as placeholder.
                n_unvoiced += 1
                embeddings.append(np.zeros((1, emb_dim), dtype=np.float32))
                continue
            # optional denoise: uncomment if needed (slow)
            # y = spectral_gate(y, TARGET_SR)
            inputs = processor(y, sampling_rate=TARGET_SR, return_tensors="pt", padding=True)
            with torch.no_grad():
                out = model_wv(inputs.input_values.to(DEVICE))
            hid = out.last_hidden_state.cpu().numpy()  # (1, L, D)
            embeddings.append(hid.squeeze(0).astype(np.float32))
        except Exception as e:
            # Best effort: keep row alignment with df by storing a placeholder,
            # and remember the failure so it can be reported below.
            failed.append((p, str(e)))
            embeddings.append(np.zeros((1, emb_dim), dtype=np.float32))

    # The list is ragged whenever a placeholder (1, D) sits next to real
    # (L, D) embeddings. Handing such a list to np.savez makes NumPy build the
    # array implicitly, which raises ValueError on NumPy >= 1.24. Build the
    # object array explicitly, one slot per sample.
    emb_arr = np.empty(len(embeddings), dtype=object)
    for k, sample_emb in enumerate(embeddings):
        emb_arr[k] = sample_emb
    np.savez_compressed(EMB_SAVE, ids=np.array(ids), paths=np.array(paths), embeddings=emb_arr)
    print("Saved embeddings:", EMB_SAVE, "failures:", len(failed), "unvoiced:", n_unvoiced)
    for bad_path, reason in failed:
        print("  failed:", bad_path, "->", reason)
    del model_wv; torch.cuda.empty_cache(); gc.collect()


Embedding file exists, skip precompute: audio_embeddings_precomputed.npz


In [13]:
# Dataset that reads precomputed embeddings (.npz)
import numpy as np
from torch.utils.data import Dataset, DataLoader

# Read the cached arrays back from the .npz archive.
# SECURITY: allow_pickle=True lets NumPy unpickle object arrays, which can
# execute arbitrary code - only load an EMB_SAVE file produced by this notebook.
data_np = np.load(EMB_SAVE, allow_pickle=True)
ids_saved = data_np["ids"].tolist()
paths_saved = data_np["paths"].tolist()
emb_list = data_np["embeddings"]

class EmbeddingDataset(Dataset):
    """Pairs precomputed per-sample embeddings with labels from a DataFrame.

    Row ``i`` of ``df`` is assumed to describe ``emb_list[i]``; the two are
    matched purely by position.

    Args:
        df: DataFrame with a "label" column (string class names or integers).
        emb_list: Sequence of 2-D arrays, one per row of ``df``.
        label_map: Mapping from string label to class index. Defaults to
            {"hoax": 0, "valid": 1}.

    Raises:
        ValueError: If ``df`` and ``emb_list`` have different lengths, which
            would otherwise mis-pair labels and embeddings (e.g. stale cache).
    """
    def __init__(self, df, emb_list, label_map=None):
        if len(df) != len(emb_list):
            raise ValueError(
                f"df has {len(df)} rows but emb_list has {len(emb_list)} entries; "
                "the embedding cache is probably stale"
            )
        self.df = df.reset_index(drop=True)
        self.emb = emb_list
        self.label_map = label_map or {"hoax":0, "valid":1}

    def __len__(self):
        return len(self.emb)

    def __getitem__(self, idx):
        """Return {"embeddings": float32 tensor, "label": integer tensor}."""
        e = np.asarray(self.emb[idx], dtype=np.float32)
        label = self.df.iloc[idx]["label"]
        if isinstance(label, str):
            # Fail loudly on an unmapped name instead of silently treating it
            # as class 0, which would corrupt the training labels.
            if label not in self.label_map:
                raise KeyError(f"unknown label {label!r} at row {idx}")
            label = self.label_map[label]
        return {"embeddings": torch.tensor(e), "label": torch.tensor(int(label))}

def collate_varbatch(batch):
    """Collate variable-length samples into one zero-padded batch.

    Args:
        batch: List of dicts with "embeddings" (2-D tensor, length x dim) and
            "label" (tensor) entries.

    Returns:
        Dict with "embeddings" as a float32 tensor of shape
        (batch, longest length, dim), zero-padded at the end of shorter
        samples, and "label" as the stacked label tensor.
    """
    sequences = []
    labels = []
    for sample in batch:
        sequences.append(sample["embeddings"])
        labels.append(sample["label"])

    longest = max(seq.shape[0] for seq in sequences)
    feat_dim = sequences[0].shape[1]
    out = torch.zeros((len(sequences), longest, feat_dim), dtype=torch.float32)
    for row_idx, seq in enumerate(sequences):
        out[row_idx, :seq.shape[0]] = seq
    return {"embeddings": out, "label": torch.stack(labels)}


In [14]:
# Model head and focal loss
import torch.nn as nn, torch

class AttentivePool(nn.Module):
    """Attention-weighted pooling over axis 1 of the input.

    Each position is scored by a small two-layer network (linear -> tanh ->
    linear to a scalar); the scores are softmax-normalised along axis 1 and
    used as weights for a sum of the input vectors.
    Input is presumably (batch, time, dim); output is (batch, dim).
    """
    def __init__(self, dim):
        super().__init__()
        self.linear = nn.Linear(dim, dim)
        self.context = nn.Linear(dim, 1)

    def forward(self, x):
        hidden = self.linear(x).tanh()
        attn_logits = self.context(hidden).squeeze(-1)
        # Normalise the scores across positions so the weights sum to one.
        attn = attn_logits.softmax(dim=1)
        return (x * attn.unsqueeze(-1)).sum(dim=1)

class Head(nn.Module):
    """Classification head: attentive pooling followed by a small MLP.

    Args:
        dim: Width of the incoming embedding vectors.
        hidden: Width of the hidden layer.
        nclass: Number of output classes (logits).
    """
    def __init__(self, dim=768, hidden=256, nclass=2):
        super().__init__()
        self.pool = AttentivePool(dim)
        # Layer order is kept so state_dict keys (fc.0, fc.3) stay the same.
        classifier_layers = [
            nn.Linear(dim, hidden),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(hidden, nclass),
        ]
        self.fc = nn.Sequential(*classifier_layers)

    def forward(self, x):
        pooled = self.pool(x)
        return self.fc(pooled)

# focal loss (binary/multi)
class FocalLoss(nn.Module):
    """Multi-class focal loss: ``-alpha * (1 - p_t)**gamma * log(p_t)``.

    ``p_t`` is the softmax probability assigned to the true class. ``alpha``
    is a single scalar applied to every sample (not a per-class weight).

    Args:
        gamma: Focusing exponent; larger values down-weight easy samples.
        alpha: Global scale factor.
        reduction: "mean", "sum" or "none" (per-sample losses).

    Raises:
        ValueError: If ``reduction`` is not one of the supported values.
    """
    def __init__(self, gamma=2.0, alpha=0.25, reduction="mean"):
        super().__init__()
        if reduction not in ("mean", "sum", "none"):
            raise ValueError(f"unsupported reduction: {reduction!r}")
        self.gamma = gamma; self.alpha = alpha; self.reduction = reduction

    def forward(self, logits, targets):
        """Compute the loss for (N, C) ``logits`` and (N,) integer ``targets``."""
        # log_softmax is numerically stable, so no epsilon is needed inside
        # the log (the old `log(p + 1e-9)` capped the loss for confident errors).
        log_probs = torch.nn.functional.log_softmax(logits, dim=1)
        log_p_t = log_probs.gather(1, targets.unsqueeze(1)).squeeze(1)
        p_t = log_p_t.exp()
        loss = - self.alpha * (1 - p_t) ** self.gamma * log_p_t
        # Honour the requested reduction (previously always the mean).
        if self.reduction == "mean":
            return loss.mean()
        if self.reduction == "sum":
            return loss.sum()
        return loss


In [15]:
# Build dataloader, compute class weights, train head only (fast)
ds = EmbeddingDataset(df, emb_list)
loader = DataLoader(ds, batch_size=8, shuffle=True, collate_fn=collate_varbatch, num_workers=0)

# Class weights: inverse class frequency, listed in class-index order.
from collections import Counter
cnt = Counter(df["label"].astype(str).tolist())
# Same name -> index mapping that EmbeddingDataset uses by default.
lab_map = {"hoax":0, "valid":1}
counts = [cnt.get(name, 0) for name in sorted(lab_map, key=lab_map.get)]
total = sum(counts)
# A class with no samples gets a neutral weight of 1.0 rather than dividing by zero.
class_weights = [total / c if c > 0 else 1.0 for c in counts]
class_weights = torch.tensor(class_weights, dtype=torch.float32).to(DEVICE)

model = Head(dim=768).to(DEVICE)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
# `verbose=True` is deprecated in recent PyTorch releases; learning-rate
# changes are reported explicitly in the loop below instead.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, factor=0.5, patience=1)
criterion = torch.nn.CrossEntropyLoss(weight=class_weights)
# or use focal: criterion = FocalLoss(gamma=2.0, alpha=0.25)

NUM_EPOCHS = 6
for epoch in range(NUM_EPOCHS):
    model.train()
    total_loss=0.0; n=0
    for b in loader:
        emb = b["embeddings"].to(DEVICE)
        labels = b["label"].to(DEVICE)
        logits = model(emb)
        loss = criterion(logits, labels)
        optimizer.zero_grad(); loss.backward(); optimizer.step()
        total_loss += loss.item(); n+=1
    avg = total_loss / max(1,n)
    print(f"epoch {epoch+1} loss {avg:.4f}")
    # The scheduler monitors the mean training loss of the epoch.
    lr_before = optimizer.param_groups[0]["lr"]
    scheduler.step(avg)
    lr_after = optimizer.param_groups[0]["lr"]
    if lr_after != lr_before:
        print(f"  reducing learning rate: {lr_before:.2e} -> {lr_after:.2e}")

# torch.save does not create missing directories, so make sure the target exists.
save_dir = os.path.dirname(MODEL_SAVE_PATH)
if save_dir:
    os.makedirs(save_dir, exist_ok=True)
torch.save(model.state_dict(), MODEL_SAVE_PATH)
print("Saved head to", MODEL_SAVE_PATH)




epoch 1 loss 0.6929
epoch 2 loss 0.6749
epoch 3 loss 0.6704
epoch 4 loss 0.6577
epoch 5 loss 0.6467
epoch 6 loss 0.6270
Saved head to D:\INDONERIS-DATAMINING\multimodal-hoax-detection\models\audio_baseline/audio_head_improved.pt


In [16]:
# Evaluate and show classification report + confusion matrix
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
import numpy as np

def evaluate_pretty(model, df, emb_list, batch_size=8, label_names=["hoax","valid"]):
    """Run ``model`` over the given samples and print classification metrics.

    Prints a scikit-learn classification report, the accuracy and the
    confusion matrix. The model is moved to DEVICE and left in eval mode.

    Args:
        model: Module mapping a padded embedding batch to class logits.
        df: DataFrame with the labels, aligned by position with ``emb_list``.
        emb_list: Precomputed embeddings, one entry per row of ``df``.
        batch_size: Evaluation batch size.
        label_names: Display names, ordered by class index. The default list
            is never mutated.

    Returns:
        None.
    """
    ds = EmbeddingDataset(df, emb_list)
    loader = DataLoader(ds, batch_size=batch_size, shuffle=False, collate_fn=collate_varbatch, num_workers=0)
    model.to(DEVICE); model.eval()
    y_true=[]; y_pred=[]
    with torch.no_grad():
        for b in loader:
            logits = model(b["embeddings"].to(DEVICE))
            # Softmax is monotonic, so the argmax of the logits is the prediction.
            y_pred.extend(logits.argmax(dim=1).cpu().tolist())
            y_true.extend(b["label"].tolist())
    # Pin the label set explicitly. Without it scikit-learn infers the classes
    # from the data, so a class missing from both y_true and y_pred makes
    # classification_report raise (target_names length mismatch) and changes
    # the shape of the confusion matrix.
    label_ids = list(range(len(label_names)))
    print(classification_report(y_true, y_pred, labels=label_ids, target_names=label_names, digits=4))
    print("accuracy:", accuracy_score(y_true, y_pred))
    print("confusion matrix:\n", confusion_matrix(y_true, y_pred, labels=label_ids))

# Usage. NOTE(review): this scores the same `df` / `emb_list` rows that the head
# was trained on in the previous cell, so the report reflects training-set fit,
# not held-out performance.
evaluate_pretty(model, df, emb_list, batch_size=8)


              precision    recall  f1-score   support

        hoax     0.5424    0.6154    0.5766        52
       valid     0.8658    0.8269    0.8459       156

    accuracy                         0.7740       208
   macro avg     0.7041    0.7212    0.7112       208
weighted avg     0.7849    0.7740    0.7786       208

accuracy: 0.7740384615384616
confusion matrix:
 [[ 32  20]
 [ 27 129]]
