In [None]:
# Mount Google Drive at /content/drive (Colab-only); later cells read and write under this mount.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import os
from pathlib import Path
import numpy as np
import librosa
import soundfile as sf
import matplotlib.pyplot as plt

# Project data root on the mounted Drive.
BASE_DIR = Path("/content/drive/MyDrive/robot_project/train_data")

# The five material classes; a material's position in this list is its integer label.
MATERIALS = ["Aluminum", "Ceramic", "Plastic", "Paper", "Wood"]
material_to_label = dict(zip(MATERIALS, range(len(MATERIALS))))
print(material_to_label)


{'Aluminum': 0, 'Ceramic': 1, 'Plastic': 2, 'Paper': 3, 'Wood': 4}


# Data Preprocessing to Match the PANNs Input Format

Find impact peaks and separate the recordings into 1-second audio clips.


In [None]:
!pip install noisereduce

Collecting noisereduce
  Downloading noisereduce-3.0.3-py3-none-any.whl.metadata (14 kB)
Downloading noisereduce-3.0.3-py3-none-any.whl (22 kB)
Installing collected packages: noisereduce
Successfully installed noisereduce-3.0.3


In [None]:
import noisereduce as nr
from scipy.signal import find_peaks

def load_audio(file_path, sampling_rate=44100):
    """Decode an audio file with librosa, asking for `sampling_rate` Hz.

    Returns the pair produced by `librosa.load`: (waveform, sample rate).
    """
    waveform, effective_sr = librosa.load(file_path, sr=sampling_rate)
    return waveform, effective_sr

def reduce_noise_spectral_gating(audio, sr=44100, prop_decrease=0.8, stationary=False):
    """Denoise `audio` with noisereduce and return the result.

    Every argument is forwarded unchanged to `nr.reduce_noise`.
    """
    denoise_kwargs = {
        "y": audio,
        "sr": sr,
        "stationary": stationary,
        "prop_decrease": prop_decrease,
    }
    return nr.reduce_noise(**denoise_kwargs)

def detect_impacts_peak_finding(audio, sr, height_ratio=0.3, min_distance_sec=0.5):
    """Locate impacts as peaks of the rectified waveform.

    Parameters
    ----------
    audio : array-like
        1-D waveform.
    sr : int or float
        Sample rate of `audio` in Hz.
    height_ratio : float
        A peak must reach at least `height_ratio * max(|audio|)`.
    min_distance_sec : float
        Minimum spacing between two reported peaks, in seconds.

    Returns
    -------
    (impact_times, impact_indices) : tuple of list
        Peak positions in seconds and in samples. Both lists are empty when
        the signal is empty or entirely zero.
    """
    audio_abs = np.abs(np.asarray(audio))
    if audio_abs.size == 0:
        return [], []

    max_amp = audio_abs.max()
    if max_amp == 0:
        return [], []

    height = max_amp * height_ratio
    # find_peaks rejects distance < 1, which int() produces for very small
    # min_distance_sec * sr; clamp so that case means "no spacing constraint".
    distance = max(1, int(min_distance_sec * sr))

    peaks, _ = find_peaks(audio_abs, height=height, distance=distance)
    impact_indices = peaks.tolist()
    impact_times = [idx / sr for idx in impact_indices]
    return impact_times, impact_indices

def trim_audio_around_impacts(
    audio,
    sr,
    impact_times,
    before_sec=0.05,
    after_sec=0.95,
    overlap_handling="exclude",
):
    """Cut one fixed-length clip around every impact.

    Each clip spans `before_sec` before the impact to `after_sec` after it and
    always has `int(before_sec * sr) + int(after_sec * sr)` samples; parts of
    the window that fall outside `audio` are zero-filled.

    Parameters
    ----------
    audio : np.ndarray
        1-D waveform.
    sr : int or float
        Sample rate of `audio` in Hz.
    impact_times : sequence of float
        Impact positions in seconds.
    before_sec, after_sec : float
        Window extent before / after each impact, in seconds.
    overlap_handling : str
        What to do when another impact falls inside a clip's window:
        "exclude" drops the clip; "replace" keeps it and overwrites 0.05 s on
        each side of the other impact with the mean of the first 0.5 s of
        `audio`. Any other value keeps the clip untouched.

    Returns
    -------
    list of np.ndarray
        The kept clips, in the order of `impact_times`.
    """
    before_samples = int(before_sec * sr)
    after_samples = int(after_sec * sr)
    # Derive the clip length from its two halves so padded and unpadded clips
    # can never differ by a sample (int((a + b) * sr) may exceed int(a*sr) + int(b*sr)).
    total_length_samples = before_samples + after_samples
    mask_half_width = int(0.05 * sr)

    background_noise_mean = np.mean(audio[: int(0.5 * sr)]) if len(audio) > int(0.5 * sr) else 0.0

    trimmed_segments = []

    for impact_time in impact_times:
        segment_start_time = impact_time - before_sec
        segment_end_time = impact_time + after_sec

        # Other impacts that land inside this clip's window.
        overlapping_times = [
            other_time
            for other_time in impact_times
            if other_time != impact_time and segment_start_time <= other_time <= segment_end_time
        ]
        if overlapping_times and overlap_handling == "exclude":
            # Decide before slicing so discarded clips cost nothing.
            continue

        # round() instead of truncation: idx / sr * sr can land just below idx.
        impact_idx = int(round(impact_time * sr))
        start_idx = impact_idx - before_samples
        end_idx = impact_idx + after_samples

        if start_idx < 0 or end_idx > len(audio):
            # Window sticks out of the recording: zero-pad the missing part.
            segment = np.zeros(total_length_samples)
            src_start = max(start_idx, 0)
            src_end = min(end_idx, len(audio))
            if src_end > src_start:
                dst_start = src_start - start_idx
                segment[dst_start : dst_start + (src_end - src_start)] = audio[src_start:src_end]
        else:
            # copy() so the "replace" masking below never writes into `audio`.
            segment = audio[start_idx:end_idx].copy()

        if overlapping_times and overlap_handling == "replace":
            for other_time in overlapping_times:
                center_idx = int((other_time - segment_start_time) * sr)
                overlap_start_idx = max(0, center_idx - mask_half_width)
                overlap_end_idx = min(len(segment), center_idx + mask_half_width)
                segment[overlap_start_idx:overlap_end_idx] = background_noise_mean

        trimmed_segments.append(segment)

    return trimmed_segments


In [None]:
def get_material_from_path(path: Path):
    """Return the first entry of MATERIALS that is a component of `path`.

    Matching is against whole path components (`path.parts`), tried in the
    order the materials are listed. Returns None when nothing matches.
    """
    components = path.parts
    return next((name for name in MATERIALS if name in components), None)


In [None]:
import pandas as pd
from tqdm import tqdm

# Output root for the extracted clips.
SEGMENT_WAV_DIR = BASE_DIR / "segments_wav"  # <BASE_DIR>/segments_wav/<Material>/xxxxxx.wav
SEGMENT_WAV_DIR.mkdir(parents=True, exist_ok=True)

# One dict per saved clip; becomes the metadata CSV at the end of this cell.
meta = []

# Recursively collect every .ogg and .ogx file under BASE_DIR
ogg_files = list(BASE_DIR.rglob("*.ogg"))
ogx_files = list(BASE_DIR.rglob("*.ogx"))
audio_files = ogg_files + ogx_files

print("총 ogg 파일 수:", len(ogg_files))
print("총 ogx 파일 수:", len(ogx_files))
print("합계 audio 파일 수:", len(audio_files))

# Running clip counter; also used as the file name and segment_id.
# NOTE(review): it restarts at 0 on every run, so re-running this cell reuses
# the same file names under SEGMENT_WAV_DIR.
global_idx = 0

for audio_path in tqdm(audio_files):
    material = get_material_from_path(audio_path)
    if material is None:
        # Skip files whose path contains no known material name
        continue
    label = material_to_label[material]

    try:
        audio, sr = load_audio(str(audio_path), sampling_rate=44100)
    except Exception as e:
        # Best effort: report the unreadable file and move on.
        print("로드 실패:", audio_path, e)
        continue

    # Denoise
    audio_denoised = reduce_noise_spectral_gating(audio, sr, prop_decrease=0.8) # tune prop_decrease here

    # Peak detection
    impact_times, impact_indices = detect_impacts_peak_finding(
        audio_denoised,
        sr,
        height_ratio=0.3,
        min_distance_sec=0.5,
    )

    if len(impact_times) == 0:
        continue

    # 1-second clip around each impact (clips overlapping another impact are dropped)
    segments = trim_audio_around_impacts(
        audio_denoised,
        sr,
        impact_times,
        before_sec=0.05,
        after_sec=0.95,
        overlap_handling="exclude",
    )

    # Resample every clip to 32 kHz and save it as .wav
    for seg in segments:
        seg_32k = librosa.resample(seg, orig_sr=sr, target_sr=32000)
        seg_dir = SEGMENT_WAV_DIR / material
        seg_dir.mkdir(parents=True, exist_ok=True)

        global_idx += 1
        out_path = seg_dir / f"{global_idx:06d}.wav"
        sf.write(str(out_path), seg_32k, 32000)

        meta.append({
            "segment_id": global_idx,
            "wav_path": str(out_path),
            "material": material,
            "label": label,
            "source_file": str(audio_path),
        })

# Persist the per-clip metadata next to the data.
meta_df = pd.DataFrame(meta)
meta_csv_path = BASE_DIR / "segments_metadata.csv"
meta_df.to_csv(meta_csv_path, index=False)
print("저장된 segment 개수:", len(meta_df))
print("메타데이터:", meta_csv_path)


총 ogg 파일 수: 421
총 ogx 파일 수: 210
합계 audio 파일 수: 631


100%|██████████| 631/631 [10:17<00:00,  1.02it/s]

저장된 segment 개수: 2308
메타데이터: /content/drive/MyDrive/robot_project/train_data/segments_metadata.csv





# Balanced Dataset
각 class 당 동일한 개수의 train data로 학습

- 'Aluminum'     293
- 'Ceramic'   326
- 'Glass'     *110*
- 'Plastic'    1073
- 'Paper'     333
- 'Wood'    283

Glass: 샘플 수가 너무 적어(110개) 아래 코드에서는 `MATERIALS`에 포함하지 않고 5개 클래스만 사용함.



In [None]:
import pandas as pd
from pathlib import Path

# 1) Load the per-segment metadata written by the segmentation cell
csv_path = Path(BASE_DIR) / "segments_metadata.csv"
df = pd.read_csv(csv_path)

# 2) Count segments per label
class_counts = df["label"].value_counts().sort_index()
print("원래 클래스별 개수:\n", class_counts)

# 3) Every class is down-sampled to the size of the smallest class
min_count = class_counts.min()
print("각 클래스에서 샘플링할 개수:", min_count)

# 4) Draw min_count rows from each label group and stack them in label order.
#    Sampling each group explicitly (instead of groupby.apply) keeps the "label"
#    column and avoids the pandas warning about apply operating on the
#    grouping column, while drawing the same rows (same seed per group).
balanced_df = pd.concat(
    [group.sample(n=min_count, random_state=42) for _, group in df.groupby("label")],
    axis=0,
).reset_index(drop=True)

# 5) Shuffle the rows
balanced_df = balanced_df.sample(frac=1.0, random_state=42).reset_index(drop=True)

print("balanced 클래스별 개수:\n", balanced_df["label"].value_counts().sort_index())

# 6) Save
balanced_csv_path = Path(BASE_DIR) / "segments_train_emb_balanced.csv"
balanced_df.to_csv(balanced_csv_path, index=False)
print("balanced train csv 저장:", balanced_csv_path)


원래 클래스별 개수:
 label
0     293
1     326
2    1073
3     333
4     283
Name: count, dtype: int64
각 클래스에서 샘플링할 개수: 283
balanced 클래스별 개수:
 label
0    283
1    283
2    283
3    283
4    283
Name: count, dtype: int64
balanced train csv 저장: /content/drive/MyDrive/robot_project/train_data/segments_train_emb_balanced.csv


  .apply(lambda g: g.sample(n=min_count, random_state=42))


# 1. PANNs Pretrained model as a feature extractor & Training Light Classifier

In [None]:
import pandas as pd
import numpy as np
import torch
from torch.utils.data import DataLoader, WeightedRandomSampler
# Path of the balanced metadata CSV; later cells reuse `metadata_csv`.
metadata_csv = Path(BASE_DIR) / "segments_train_emb_balanced.csv"
train_df = pd.read_csv(metadata_csv)

# Row count per label
class_counts = train_df["label"].value_counts().sort_index()
print("class_counts:\n", class_counts)


class_counts:
 label
0    283
1    283
2    283
3    283
4    283
Name: count, dtype: int64


In [None]:
# PANNs Inference 사용
!pip install panns-inference

from panns_inference import AudioTagging
import numpy as np

# Build the pretrained PANNs tagger; `at` is used below as the embedding extractor.
# NOTE(review): the recorded output shows at.model is a DataParallel wrapper in this configuration.
at = AudioTagging(checkpoint_path=None, device='cuda')  # use the GPU
print(at.model.__class__.__name__)


Checkpoint path: /root/panns_data/Cnn14_mAP=0.431.pth
GPU number: 1
DataParallel


In [None]:
def get_panns_embedding_from_wav(wav_path, target_sr=32000):
    """Return the PANNs embedding vector for one audio file.

    The file is loaded with librosa at `target_sr`, passed as a single-item
    float32 batch to the module-level `at.inference`, and the embedding of
    that single item is returned.
    """
    waveform, loaded_sr = librosa.load(wav_path, sr=target_sr)
    assert loaded_sr == target_sr, f"sr mismatch: {loaded_sr}"

    batch = waveform[None, :].astype(np.float32)  # (1, T)

    # at.inference returns (clipwise_output, embedding); only the embedding is used.
    _, embeddings = at.inference(batch)
    return embeddings[0]


In [None]:
from tqdm import tqdm

# Directory that receives one PANNs embedding (.npy) per segment
EMB_DIR = BASE_DIR / "segments_embedding"
EMB_DIR.mkdir(parents=True, exist_ok=True)

meta_df = pd.read_csv(metadata_csv)
# Set from the first embedding; a later cell reads it as EMB_DIM, so this cell must run first.
emb_dim = None

# Collected in row order so it can be attached as a column afterwards.
emb_paths = []

for i, row in tqdm(meta_df.iterrows(), total=len(meta_df)):
    wav_path = row["wav_path"]
    segment_id = row["segment_id"]

    emb = get_panns_embedding_from_wav(wav_path, target_sr=32000) # PANNs embedding of this .wav file
    if emb_dim is None:
        emb_dim = emb.shape[0]
        print("Embedding dim:", emb_dim)

    emb_path = EMB_DIR / f"{segment_id:06d}.npy" # store the embedding as .npy
    np.save(emb_path, emb.astype(np.float32))
    emb_paths.append(str(emb_path))

meta_df["emb_path"] = emb_paths

# Overwrite the same CSV, now including the emb_path column
meta_df.to_csv(metadata_csv, index=False)
print("업데이트된 메타:", metadata_csv)


  0%|          | 5/1415 [00:00<00:32, 43.52it/s]

Embedding dim: 2048


100%|██████████| 1415/1415 [05:16<00:00,  4.47it/s]

업데이트된 메타: /content/drive/MyDrive/robot_project/train_data/segments_train_emb_balanced.csv





In [None]:
import torch
from torch.utils.data import Dataset, DataLoader

class ImpactEmbDataset(Dataset):
    """Dataset over precomputed embeddings listed in a metadata CSV.

    The CSV must provide an `emb_path` column (path to a .npy file) and a
    `label` column (integer class id). Items are (float32 tensor, long tensor).
    """

    def __init__(self, csv_path):
        import pandas as pd  # local import, as in the original cell
        self.df = pd.read_csv(csv_path)

    def __len__(self):
        return self.df.shape[0]

    def __getitem__(self, idx):
        record = self.df.iloc[idx]
        vector = np.load(record["emb_path"]).astype(np.float32)  # (D,)
        target = torch.tensor(int(record["label"]), dtype=torch.long)
        return torch.from_numpy(vector), target


In [None]:
import numpy as np
import pandas as pd

# Reload the balanced metadata (now including emb_path) as the pool to split.
full_df = pd.read_csv(metadata_csv)

print("전체 샘플 수:", len(full_df))
print("전체 클래스 분포:\n", full_df["label"].value_counts().sort_index())

train_ratio = 0.8
train_parts = []
val_parts = []

# Stratified split: 8:2 inside every label group
# NOTE(review): rows are split per segment, so segments cut from the same
# source_file can land in both train and val — confirm whether that is intended.
for label, group in full_df.groupby("label"):
    group = group.sample(frac=1.0, random_state=42).reset_index(drop=True)  # shuffle within the group
    n_total = len(group)
    n_train = int(n_total * train_ratio)

    train_part = group.iloc[:n_train].reset_index(drop=True)
    val_part   = group.iloc[n_train:].reset_index(drop=True)

    train_parts.append(train_part)
    val_parts.append(val_part)

# Recombine the per-class parts and shuffle each split once more
train_df = pd.concat(train_parts, axis=0).sample(frac=1.0, random_state=42).reset_index(drop=True)
val_df   = pd.concat(val_parts,   axis=0).sample(frac=1.0, random_state=42).reset_index(drop=True)

print("Train:", len(train_df), "Val:", len(val_df))
print("Train per class:\n", train_df["label"].value_counts().sort_index())
print("Val per class:\n",   val_df["label"].value_counts().sort_index())

# Write both splits to CSV because ImpactEmbDataset is constructed from a CSV path.
train_csv = BASE_DIR / "segments_train_emb.csv"
val_csv   = BASE_DIR / "segments_val_emb.csv"
train_df.to_csv(train_csv, index=False)
val_df.to_csv(val_csv, index=False)

train_dataset = ImpactEmbDataset(train_csv)
val_dataset   = ImpactEmbDataset(val_csv)

from torch.utils.data import DataLoader

# Only the training loader shuffles; validation keeps file order.
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader   = DataLoader(val_dataset, batch_size=64, shuffle=False)

print("Train size:", len(train_dataset))
print("Val size:", len(val_dataset))


전체 샘플 수: 1415
전체 클래스 분포:
 label
0    283
1    283
2    283
3    283
4    283
Name: count, dtype: int64
Train: 1130 Val: 285
Train per class:
 label
0    226
1    226
2    226
3    226
4    226
Name: count, dtype: int64
Val per class:
 label
0    57
1    57
2    57
3    57
4    57
Name: count, dtype: int64
Train size: 1130
Val size: 285


In [None]:
import torch.nn as nn
import torch.nn.functional as F

EMB_DIM = emb_dim  # embedding dimension measured in the embedding-extraction cell

class EmbClassifier(nn.Module):
    """MLP head on top of fixed embeddings: emb_dim -> 512 -> 256 -> num_classes.

    Each hidden layer is Linear -> BatchNorm1d -> ReLU -> Dropout(p=0.3);
    the output layer returns raw logits.
    """

    def __init__(self, emb_dim=EMB_DIM, num_classes=5):
        super().__init__()
        hidden_a, hidden_b = 512, 256
        # Attribute names double as state_dict keys, so they are kept stable.
        self.fc1 = nn.Linear(emb_dim, hidden_a)
        self.bn1 = nn.BatchNorm1d(hidden_a)
        self.fc2 = nn.Linear(hidden_a, hidden_b)
        self.bn2 = nn.BatchNorm1d(hidden_b)
        self.fc3 = nn.Linear(hidden_b, num_classes)

    def forward(self, x):
        hidden_stages = ((self.fc1, self.bn1), (self.fc2, self.bn2))
        for linear, norm in hidden_stages:
            activated = F.relu(norm(linear(x)))
            x = F.dropout(activated, p=0.3, training=self.training)
        return self.fc3(x)


In [None]:
# Train the embedding classifier: Adam (lr=1e-3), cross-entropy, 20 epochs.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = EmbClassifier(emb_dim=EMB_DIM, num_classes=len(MATERIALS)).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
num_epochs = 20

for epoch in range(num_epochs):
    # ----- train -----
    model.train()
    train_loss = 0.0
    train_correct = 0
    train_total = 0

    for emb, labels in train_loader:
        emb = emb.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(emb)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # loss.item() is a batch mean; weight it by batch size so the epoch
        # figure is a per-sample average even when the last batch is smaller.
        train_loss += loss.item() * labels.size(0)
        _, preds = outputs.max(1)
        train_correct += (preds == labels).sum().item()
        train_total += labels.size(0)

    train_loss /= train_total
    train_acc = train_correct / train_total

    # ----- validation -----
    model.eval()
    val_loss = 0.0
    val_correct = 0
    val_total = 0

    with torch.no_grad():
        for emb, labels in val_loader:
            emb = emb.to(device)
            labels = labels.to(device)
            outputs = model(emb)
            loss = criterion(outputs, labels)

            val_loss += loss.item() * labels.size(0)
            _, preds = outputs.max(1)
            val_correct += (preds == labels).sum().item()
            val_total += labels.size(0)

    val_loss /= val_total
    val_acc = val_correct / val_total

    print(f"Epoch {epoch+1:02d} "
          f"Train loss: {train_loss:.4f}, acc: {train_acc:.3f} | "
          f"Val loss: {val_loss:.4f}, acc: {val_acc:.3f}")


Epoch 01 Train loss: 0.9629, acc: 0.647 | Val loss: 1.1989, acc: 0.688
Epoch 02 Train loss: 0.6034, acc: 0.796 | Val loss: 0.7059, acc: 0.789
Epoch 03 Train loss: 0.4502, acc: 0.853 | Val loss: 0.5374, acc: 0.821
Epoch 04 Train loss: 0.3795, acc: 0.879 | Val loss: 0.5007, acc: 0.839
Epoch 05 Train loss: 0.3334, acc: 0.882 | Val loss: 0.4456, acc: 0.825
Epoch 06 Train loss: 0.2779, acc: 0.906 | Val loss: 0.4256, acc: 0.856
Epoch 07 Train loss: 0.2546, acc: 0.918 | Val loss: 0.4084, acc: 0.867
Epoch 08 Train loss: 0.2058, acc: 0.930 | Val loss: 0.3693, acc: 0.874
Epoch 09 Train loss: 0.1981, acc: 0.935 | Val loss: 0.5015, acc: 0.839
Epoch 10 Train loss: 0.2015, acc: 0.933 | Val loss: 0.3912, acc: 0.863
Epoch 11 Train loss: 0.1569, acc: 0.946 | Val loss: 0.4966, acc: 0.842
Epoch 12 Train loss: 0.1543, acc: 0.942 | Val loss: 0.4351, acc: 0.863
Epoch 13 Train loss: 0.1332, acc: 0.952 | Val loss: 0.3960, acc: 0.874
Epoch 14 Train loss: 0.1190, acc: 0.964 | Val loss: 0.5100, acc: 0.849
Epoch 

In [None]:
# Save the classifier weights together with what is needed to rebuild it
# (input dimension, class count) and to decode its predictions (label mapping).
SAVE_PATH = BASE_DIR / "panns_emb_classifier_20epoch.pth"
torch.save({
    "model_state_dict": model.state_dict(),
    "emb_dim": EMB_DIM,
    "num_classes": len(MATERIALS),
    "material_to_label": material_to_label,
}, SAVE_PATH)
print("모델 저장:", SAVE_PATH)


모델 저장: /content/drive/MyDrive/robot_project/train_data/panns_emb_classifier_20epoch.pth


In [None]:
# Rebuild the classifier from the checkpoint's own hyper-parameters and load its weights.
ckpt = torch.load(SAVE_PATH, map_location=device)

EMB_DIM = ckpt["emb_dim"]
num_classes = ckpt["num_classes"]

model = EmbClassifier(emb_dim=EMB_DIM, num_classes=num_classes).to(device)
model.load_state_dict(ckpt["model_state_dict"])
model.eval()  # switch to evaluation mode


EmbClassifier(
  (fc1): Linear(in_features=2048, out_features=512, bias=True)
  (bn1): BatchNorm1d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (fc2): Linear(in_features=512, out_features=256, bias=True)
  (bn2): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (fc3): Linear(in_features=256, out_features=5, bias=True)
)

# 2. PANNs Fine-Tuning with Balanced data

In [None]:
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import librosa
from pathlib import Path

# Reload the balanced metadata CSV as the data pool for fine-tuning.
balanced_csv_path = Path(BASE_DIR) / "segments_train_emb_balanced.csv"
full_df = pd.read_csv(balanced_csv_path)

print("Balanced 전체 개수:", len(full_df))
print(full_df.head())


Balanced 전체 개수: 1415
   segment_id                                           wav_path  material  \
0        2129  /content/drive/MyDrive/robot_project/train_dat...   Plastic   
1        1732  /content/drive/MyDrive/robot_project/train_dat...     Paper   
2        1909  /content/drive/MyDrive/robot_project/train_dat...   Ceramic   
3         218  /content/drive/MyDrive/robot_project/train_dat...  Aluminum   
4         196  /content/drive/MyDrive/robot_project/train_dat...  Aluminum   

   label                                        source_file  \
0      2  /content/drive/MyDrive/robot_project/train_dat...   
1      3  /content/drive/MyDrive/robot_project/train_dat...   
2      1  /content/drive/MyDrive/robot_project/train_dat...   
3      0  /content/drive/MyDrive/robot_project/train_dat...   
4      0  /content/drive/MyDrive/robot_project/train_dat...   

                                            emb_path  
0  /content/drive/MyDrive/robot_project/train_dat...  
1  /content/drive/MyD

In [None]:
import pandas as pd

# NOTE(review): this rebinds full_df to a shuffled copy of itself, so
# re-running the cell without re-running the load cell reshuffles again.
full_df = full_df.sample(frac=1.0, random_state=42).reset_index(drop=True)

train_parts = []
val_parts = []
train_ratio = 0.8

# Stratified 8:2 split inside every label group.
# NOTE(review): the split is per segment, so segments cut from the same
# source_file can land in both train and val — confirm whether that is intended.
for label, group in full_df.groupby("label"):
    group = group.sample(frac=1.0, random_state=42).reset_index(drop=True)  # shuffle within the group
    n_total = len(group)
    n_train = int(n_total * train_ratio)

    train_part = group.iloc[:n_train].reset_index(drop=True)
    val_part   = group.iloc[n_train:].reset_index(drop=True)

    train_parts.append(train_part)
    val_parts.append(val_part)

# Recombine the per-class parts and shuffle each split once more.
train_df = pd.concat(train_parts, axis=0).sample(frac=1.0, random_state=42).reset_index(drop=True)
val_df   = pd.concat(val_parts,   axis=0).sample(frac=1.0, random_state=42).reset_index(drop=True)

print("Train:", len(train_df), "Val:", len(val_df))
print("Train per class:\n", train_df["label"].value_counts().sort_index())
print("Val per class:\n",   val_df["label"].value_counts().sort_index())



Train: 1130 Val: 285
Train per class:
 label
0    226
1    226
2    226
3    226
4    226
Name: count, dtype: int64
Val per class:
 label
0    57
1    57
2    57
3    57
4    57
Name: count, dtype: int64


In [None]:
class ImpactWaveDataset(Dataset):
    """Dataset that yields fixed-length waveforms and their integer labels."""

    def __init__(self, df, target_sr=32000, target_len=32000, mode="train"):
        """
        df: DataFrame (columns: wav_path, label, ...)
        target_sr: sample rate the files are loaded at (32 kHz for PANNs)
        target_len: waveform length in samples (1 s = 32000 samples);
            shorter inputs are zero-padded, longer ones are cropped
        mode: "train" crops at a random offset, anything else crops the center
        """
        self.df = df.reset_index(drop=True)
        self.target_sr = target_sr
        self.target_len = target_len
        self.mode = mode

    def __len__(self):
        return self.df.shape[0]

    def _fix_length(self, wav: np.ndarray):
        """Pad (at the end) or crop `wav` to exactly `self.target_len` samples."""
        n_have = len(wav)
        n_want = self.target_len

        if n_have < n_want:
            # Too short: append zeros.
            return np.pad(wav, (0, n_want - n_have), mode="constant")

        if n_have > n_want:
            # Too long: random offset for training, centered otherwise.
            if self.mode == "train":
                offset = np.random.randint(0, n_have - n_want + 1)
            else:
                offset = max(0, (n_have - n_want) // 2)
            return wav[offset:offset + n_want]

        # Already the right length: returned as-is (no copy).
        return wav

    def __getitem__(self, idx):
        record = self.df.iloc[idx]
        source_path = record["wav_path"]
        target = int(record["label"])

        # Load at the target sample rate, then force the fixed length.
        samples, _ = librosa.load(source_path, sr=self.target_sr)
        samples = self._fix_length(samples).astype(np.float32)

        return torch.from_numpy(samples), torch.tensor(target, dtype=torch.long)

batch_size = 64

# Training crops randomly ("train" mode); validation uses the center crop ("val" mode).
train_dataset = ImpactWaveDataset(train_df, target_sr=32000, target_len=32000, mode="train")
val_dataset   = ImpactWaveDataset(val_df,   target_sr=32000, target_len=32000, mode="val")

# drop_last=True discards the final incomplete training batch each epoch.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
val_loader   = DataLoader(val_dataset,   batch_size=batch_size, shuffle=False)

print("Train batches:", len(train_loader), "Val batches:", len(val_loader))


Train batches: 17 Val batches: 5


In [None]:
!pip install panns-inference

import torch
import torch.nn as nn
import torch.nn.functional as F
from panns_inference import AudioTagging
from torchlibrosa.augmentation import SpecAugmentation # Add this import!

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("device:", device)

# Pretrained PANNs (Cnn14).
# The tagger is loaded on CPU on purpose and moved to `device` at the end of
# this cell. Previously a torch.device object was passed here and the recorded
# output still reported "Using CPU." — i.e. the CPU load happened by accident.
# Requesting "cuda" from AudioTagging instead yields a DataParallel wrapper
# (see the output of the earlier feature-extraction cell), which does not
# expose fc_audioset / spec_augmenter directly.
at = AudioTagging(checkpoint_path=None, device="cpu")
# Unwrap defensively so the attribute accesses below work in either case.
panns_model = at.model.module if isinstance(at.model, nn.DataParallel) else at.model
print(panns_model.__class__.__name__)

# NOTE: panns_model is the same object as the network inside `at`, so the head
# replacement and fine-tuning below also change what `at.inference` computes.

num_classes = len(MATERIALS)

# Read the input width of the original AudioSet head
in_features = panns_model.fc_audioset.in_features
print("fc_audioset in_features:", in_features)

# Keep the SpecAugmentation settings that are not being changed.
original_time_stripes_num = panns_model.spec_augmenter.time_dropper.stripes_num
original_freq_drop_width = panns_model.spec_augmenter.freq_dropper.drop_width
original_freq_stripes_num = panns_model.spec_augmenter.freq_dropper.stripes_num

# Re-create SpecAugmentation with a narrower time mask for the short 1-second clips.
# NOTE(review): the model printout shows STFT kernel_size=1024, stride=320, so a
# 1 s clip at 32 kHz is roughly 101 frames (not ~30 as previously stated here);
# re-check whether the original width of 64 actually fails before relying on 20.
panns_model.spec_augmenter = SpecAugmentation(
    time_drop_width=20,
    time_stripes_num=original_time_stripes_num,
    freq_drop_width=original_freq_drop_width,
    freq_stripes_num=original_freq_stripes_num
)

# Replace the AudioSet head with a new head for the material classes
panns_model.fc_audioset = nn.Linear(in_features, num_classes)

panns_model.to(device)

Collecting panns-inference
  Downloading panns_inference-0.1.1-py3-none-any.whl.metadata (2.4 kB)
Collecting torchlibrosa (from panns-inference)
  Downloading torchlibrosa-0.1.0-py3-none-any.whl.metadata (3.5 kB)
Downloading panns_inference-0.1.1-py3-none-any.whl (8.3 kB)
Downloading torchlibrosa-0.1.0-py3-none-any.whl (11 kB)
Installing collected packages: torchlibrosa, panns-inference
Successfully installed panns-inference-0.1.1 torchlibrosa-0.1.0
device: cuda
Checkpoint path: /root/panns_data/Cnn14_mAP=0.431.pth
Using CPU.
Cnn14
fc_audioset in_features: 2048


Cnn14(
  (spectrogram_extractor): Spectrogram(
    (stft): STFT(
      (conv_real): Conv1d(1, 513, kernel_size=(1024,), stride=(320,), bias=False)
      (conv_imag): Conv1d(1, 513, kernel_size=(1024,), stride=(320,), bias=False)
    )
  )
  (logmel_extractor): LogmelFilterBank()
  (spec_augmenter): SpecAugmentation(
    (time_dropper): DropStripes()
    (freq_dropper): DropStripes()
  )
  (bn0): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv_block1): ConvBlock(
    (conv1): Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (conv_block2): ConvBlock(
    (conv1): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (c

In [None]:
# Fine-tune the whole PANNs network: Adam (lr=1e-4, weight_decay=1e-5), 50 epochs.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(panns_model.parameters(), lr=1e-4, weight_decay=1e-5)
num_epochs = 50

for epoch in range(num_epochs):
    # ---------- Train ----------
    panns_model.train()
    train_loss = 0.0
    train_correct = 0
    train_total = 0

    for batch_wav, labels in train_loader:
        batch_wav = batch_wav.to(device)           # (B, T)
        labels = labels.to(device)                 # (B,)

        optimizer.zero_grad()

        # The model returns a dict. Its "clipwise_output" is already passed
        # through a sigmoid, but CrossEntropyLoss expects raw logits (feeding it
        # values in [0, 1] caps how low the loss can go). Apply the new head to
        # the returned embedding to get proper logits.
        output_dict = panns_model(batch_wav)
        logits = panns_model.fc_audioset(output_dict["embedding"])  # (B, num_classes)

        loss = criterion(logits, labels)
        loss.backward()
        optimizer.step()

        # loss.item() is a batch mean; weight by batch size for a per-sample epoch average.
        train_loss += loss.item() * labels.size(0)
        _, preds = logits.max(1)
        train_correct += (preds == labels).sum().item()
        train_total += labels.size(0)

    train_loss /= train_total
    train_acc = train_correct / train_total

    # ---------- Validation ----------
    panns_model.eval()
    val_loss = 0.0
    val_correct = 0
    val_total = 0

    with torch.no_grad():
        for batch_wav, labels in val_loader:
            batch_wav = batch_wav.to(device)
            labels = labels.to(device)

            output_dict = panns_model(batch_wav)
            logits = panns_model.fc_audioset(output_dict["embedding"])

            loss = criterion(logits, labels)

            val_loss += loss.item() * labels.size(0)
            _, preds = logits.max(1)
            val_correct += (preds == labels).sum().item()
            val_total += labels.size(0)

    val_loss /= val_total
    val_acc = val_correct / val_total

    print(f"[Epoch {epoch+1:02d}] "
          f"Train loss: {train_loss:.4f}, acc: {train_acc:.3f} | "
          f"Val loss: {val_loss:.4f}, acc: {val_acc:.3f}")

[Epoch 01] Train loss: 1.6090, acc: 0.210 | Val loss: 1.6052, acc: 0.312
[Epoch 02] Train loss: 1.6020, acc: 0.335 | Val loss: 1.5938, acc: 0.512
[Epoch 03] Train loss: 1.5590, acc: 0.395 | Val loss: 1.5334, acc: 0.544
[Epoch 04] Train loss: 1.4398, acc: 0.523 | Val loss: 1.3926, acc: 0.656
[Epoch 05] Train loss: 1.3327, acc: 0.641 | Val loss: 1.2542, acc: 0.719
[Epoch 06] Train loss: 1.2712, acc: 0.666 | Val loss: 1.1665, acc: 0.807
[Epoch 07] Train loss: 1.2055, acc: 0.737 | Val loss: 1.1092, acc: 0.849
[Epoch 08] Train loss: 1.1586, acc: 0.781 | Val loss: 1.0703, acc: 0.867
[Epoch 09] Train loss: 1.1258, acc: 0.793 | Val loss: 1.0497, acc: 0.881
[Epoch 10] Train loss: 1.1034, acc: 0.835 | Val loss: 1.0311, acc: 0.895
[Epoch 11] Train loss: 1.0920, acc: 0.823 | Val loss: 1.0156, acc: 0.912
[Epoch 12] Train loss: 1.0666, acc: 0.847 | Val loss: 1.0033, acc: 0.923
[Epoch 13] Train loss: 1.0537, acc: 0.864 | Val loss: 0.9931, acc: 0.930
[Epoch 14] Train loss: 1.0403, acc: 0.873 | Val los

In [None]:
# Save the fine-tuned network weights with the class count and label mapping
# needed to rebuild the head and decode predictions.
FT_SAVE_PATH = BASE_DIR / "panns_finetuned_material5_50epoch.pth"
torch.save({
    "model_state_dict": panns_model.state_dict(),
    "num_classes": num_classes,
    "material_to_label": material_to_label,
}, FT_SAVE_PATH)
print("Fine-tuned PANNs 저장:", FT_SAVE_PATH)


Fine-tuned PANNs 저장: /content/drive/MyDrive/robot_project/train_data/panns_finetuned_material5_50epoch.pth


# Inference

In [None]:
import numpy as np
import librosa
import torch
import torch.nn.functional as F
from pathlib import Path

def classify_impacts_in_wav(
    wav_path,
    model,
    at,
    device,
    sr_orig=44100,
    target_sr=32000,
    height_ratio=0.3,
    min_distance_sec=0.5,
    before_sec=0.05,
    after_sec=0.95,
):
    """
    Detect every impact in one audio file and predict its material.

    Pipeline: load + denoise -> peak detection -> one clip per impact
    (impacts whose window contains another impact are dropped) -> resample to
    `target_sr` -> embedding via `at.inference` -> `model` logits -> softmax.

    Returns
    -------
    results: list of dict
        One entry per classified clip:
          {
            "impact_idx": int,        # position of the clip among the kept clips
            "impact_time": float,     # impact time in seconds
            "pred_label": int,
            "pred_material": str,
            "probs": np.ndarray(num_classes,)
          }
        An empty list is returned when no impact is detected.
    """
    wav_path = Path(wav_path)

    # 1) Load and denoise
    raw_audio, sr = load_audio(str(wav_path), sampling_rate=sr_orig)
    clean_audio = reduce_noise_spectral_gating(raw_audio, sr, prop_decrease=0.8)

    # 2) Peak detection (the sample indices are not needed here)
    impact_times, _ = detect_impacts_peak_finding(
        clean_audio,
        sr,
        height_ratio=height_ratio,
        min_distance_sec=min_distance_sec,
    )
    if len(impact_times) == 0:
        print(f"[WARN] No impacts detected in {wav_path}")
        return []

    # 3) One clip per impact; overlapping impacts are excluded by the trimmer
    segments = trim_audio_around_impacts(
        clean_audio,
        sr,
        impact_times,
        before_sec=before_sec,
        after_sec=after_sec,
        overlap_handling="exclude",
    )

    # The trimmer returns clips only, so recompute which impact times survived
    # using the same "no other impact inside the window" rule.
    def _window_is_clean(t):
        window_start = t - before_sec
        window_end = t + after_sec
        return not any(
            other != t and window_start <= other <= window_end
            for other in impact_times
        )

    valid_times = [t for t in impact_times if _window_is_clean(t)]

    # Safety net: keep both lists the same length.
    n_kept = min(len(segments), len(valid_times))
    if len(segments) != len(valid_times):
        segments = segments[:n_kept]
        valid_times = valid_times[:n_kept]

    id_to_material = {idx: name for name, idx in material_to_label.items()}

    # 4) Per clip: resample -> embedding -> classifier
    model.eval()
    results = []
    for clip_no, (clip, t) in enumerate(zip(segments, valid_times)):
        clip_resampled = librosa.resample(clip, orig_sr=sr, target_sr=target_sr)

        # PANNs input: (1, T), float32
        panns_input = clip_resampled[None, :].astype(np.float32)
        _, embedding = at.inference(panns_input)
        emb = torch.from_numpy(embedding[0]).to(device).unsqueeze(0)  # (1, D)

        with torch.no_grad():
            class_probs = F.softmax(model(emb), dim=1).cpu().numpy()[0]
        best_label = int(np.argmax(class_probs))

        results.append({
            "impact_idx": clip_no,
            "impact_time": float(t),
            "pred_label": best_label,
            "pred_material": id_to_material[best_label],
            "probs": class_probs,
        })

    return results


In [None]:
# Restore the fine-tuned weights into the PANNs network.
ckpt = torch.load(FT_SAVE_PATH, map_location=device)

# NOTE(review): at.model is the existing network object held by `at`, not a
# freshly built pretrained model; it is reused here only for its architecture.
panns_model = at.model   # reuse the network held by `at`
in_features = panns_model.fc_audioset.in_features
# Recreate a head of the saved size so the state_dict shapes match.
panns_model.fc_audioset = nn.Linear(in_features, ckpt["num_classes"])
panns_model.load_state_dict(ckpt["model_state_dict"])
panns_model.to(device)
panns_model.eval()


Cnn14(
  (spectrogram_extractor): Spectrogram(
    (stft): STFT(
      (conv_real): Conv1d(1, 513, kernel_size=(1024,), stride=(320,), bias=False)
      (conv_imag): Conv1d(1, 513, kernel_size=(1024,), stride=(320,), bias=False)
    )
  )
  (logmel_extractor): LogmelFilterBank()
  (spec_augmenter): SpecAugmentation(
    (time_dropper): DropStripes()
    (freq_dropper): DropStripes()
  )
  (bn0): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv_block1): ConvBlock(
    (conv1): Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (conv_block2): ConvBlock(
    (conv1): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (c

In [None]:
import numpy as np

# Inverse lookup table: integer class label -> material name.
label_to_material = dict(zip(material_to_label.values(), material_to_label.keys()))

def classify_impacts_in_wav_finetuned(
    wav_path,
    panns_model,
    device,
    sr_orig=44100,
    target_sr=32000,
    height_ratio=0.3,
    min_distance_sec=0.5,
    before_sec=0.05,
    after_sec=0.95,
):
    """Detect impacts in a WAV file and classify each one with the fine-tuned model.

    Pipeline: load -> denoise -> peak detection -> cut a window around every
    impact -> resample to ``target_sr`` -> pad/crop to a fixed length ->
    forward pass on the raw waveform.

    Args:
        wav_path: Location of the recording (str or Path).
        panns_model: Model called on a ``(1, T)`` float32 waveform tensor; its
            result is indexed with ``"clipwise_output"``. It is switched to
            eval mode by this function.
        device: Torch device the input tensor is moved to.
        sr_orig: Sampling rate passed to ``load_audio``.
        target_sr: Sampling rate each segment is resampled to before inference.
        height_ratio: Forwarded to ``detect_impacts_peak_finding``.
        min_distance_sec: Forwarded to ``detect_impacts_peak_finding``.
        before_sec: Seconds of audio kept before each impact.
        after_sec: Seconds of audio kept after each impact.

    Returns:
        A list with one dict per classified impact, with keys ``impact_idx``
        (position among the kept segments), ``impact_time``, ``pred_label``,
        ``pred_material`` and ``probs``. Returns ``[]`` (after printing a
        warning) when no impact is detected.

    NOTE(review): in the upstream PANNs Cnn14, ``clipwise_output`` is already
    passed through a sigmoid. If that holds for this model, the softmax below
    operates on values in [0, 1], which caps the top probability at roughly
    0.40 for 5 classes. Confirm against the training loss before reading
    ``probs`` as calibrated confidences.

    NOTE(review): the kept impact times are recomputed here independently of
    ``trim_audio_around_impacts``. This assumes that function drops exactly
    the same impacts in the same order -- TODO confirm; otherwise the
    truncation below can pair a segment with the wrong time.
    """
    wav_path = Path(wav_path)

    def _fit_to_length(samples: np.ndarray, n_target: int):
        """Zero-pad at the end, or center-crop, so the result has n_target samples."""
        surplus = len(samples) - n_target
        if surplus < 0:
            return np.pad(samples, (0, -surplus), mode="constant")
        if surplus > 0:
            offset = surplus // 2  # center crop
            return samples[offset:offset + n_target]
        return samples

    def _is_isolated(t):
        """True when no other detected impact falls inside the window around t."""
        window_start = t - before_sec
        window_end = t + after_sec
        return not any(
            other != t and window_start <= other <= window_end
            for other in impact_times
        )

    # 1) Load the recording and denoise it.
    raw_audio, sr = load_audio(str(wav_path), sampling_rate=sr_orig)
    clean_audio = reduce_noise_spectral_gating(raw_audio, sr, prop_decrease=0.8)

    # 2) Peak detection (only the impact times are needed here).
    impact_times, _ = detect_impacts_peak_finding(
        clean_audio,
        sr,
        height_ratio=height_ratio,
        min_distance_sec=min_distance_sec,
    )
    if len(impact_times) == 0:
        print(f"[WARN] No impacts detected in {wav_path}")
        return []

    # 3) Cut a window around each impact, excluding overlapping ones.
    segments = trim_audio_around_impacts(
        clean_audio,
        sr,
        impact_times,
        before_sec=before_sec,
        after_sec=after_sec,
        overlap_handling="exclude",
    )

    # Times of the impacts whose window contains no other impact.
    valid_times = [t for t in impact_times if _is_isolated(t)]

    # Keep both sequences the same length so they can be paired one-to-one.
    n_pairs = min(len(segments), len(valid_times))
    segments = segments[:n_pairs]
    valid_times = valid_times[:n_pairs]

    panns_model.eval()
    # Fixed input length in samples (1 second for the default 0.05 + 0.95).
    n_target = int(target_sr * (before_sec + after_sec))

    results = []
    with torch.no_grad():
        for idx, (segment, t_impact) in enumerate(zip(segments, valid_times)):
            # Resample to the model rate, then force the fixed length.
            resampled = librosa.resample(segment, orig_sr=sr, target_sr=target_sr)
            fixed = _fit_to_length(resampled, n_target)

            # The model is fed the raw waveform as a (1, T) batch.
            batch = torch.from_numpy(fixed.astype(np.float32)).unsqueeze(0).to(device)
            scores = panns_model(batch)["clipwise_output"]

            probs = F.softmax(scores, dim=1).cpu().numpy()[0]
            pred_label = int(np.argmax(probs))

            results.append({
                "impact_idx": idx,
                "impact_time": float(t_impact),
                "pred_label": pred_label,
                "pred_material": label_to_material[pred_label],
                "probs": probs,
            })

    return results

In [None]:
# Test clip for the embedding-based classifier (candidates: wood, plastic, paper).
# TODO: this test recording looks a bit off; we should probably ask for a new one.
test_file = Path("/content/drive/MyDrive/robot_project") / "test_data" / "wood.wav"

results = classify_impacts_in_wav(test_file, model, at, device)

# Print only the first 10 detected impacts.
for hit in results[:10]:
    print(
        f"time={hit['impact_time']:.3f}s, "
        f"pred={hit['pred_material']}, "
        f"probs={np.round(hit['probs'], 3)}"
    )


time=0.296s, pred=Paper, probs=[0.003 0.025 0.008 0.864 0.1  ]


In [None]:
# Run the fine-tuned PANNs classifier on the paper test recording.
test_file = Path("/content/drive/MyDrive/robot_project") / "test_data" / "paper.wav"

results = classify_impacts_in_wav_finetuned(test_file, panns_model, device)

# Print only the first 10 detected impacts.
for hit in results[:10]:
    print(
        f"time={hit['impact_time']:.3f}s, "
        f"pred={hit['pred_material']}, "
        f"probs={np.round(hit['probs'], 3)}"
    )


time=1.668s, pred=Aluminum, probs=[0.238 0.19  0.197 0.188 0.188]
time=3.013s, pred=Ceramic, probs=[0.149 0.405 0.149 0.149 0.149]
time=4.647s, pred=Ceramic, probs=[0.169 0.313 0.183 0.168 0.168]
time=8.743s, pred=Plastic, probs=[0.142 0.189 0.385 0.142 0.142]
time=10.414s, pred=Ceramic, probs=[0.13  0.353 0.256 0.13  0.13 ]
