#### **오디오 데이터 추출**

##### **사전 준비**

라이브러리 import

In [1]:
import os
import torch
import torchaudio
import numpy as np
from transformers import ASTFeatureExtractor, ASTModel

사전 학습 모델 import

In [2]:
# Hugging Face checkpoint id shared by the feature extractor and the model.
pretrained_model = "MIT/ast-finetuned-audioset-10-10-0.4593"
# Preprocessor whose output dict carries the "input_values" tensor used later in the notebook.
feature_extractor = ASTFeatureExtractor.from_pretrained(pretrained_model)
# AST encoder; forward_ast_sequence reads its last_hidden_state.
model = ASTModel.from_pretrained(pretrained_model)

GPU 사용 확인 및 모델 설정

In [3]:
# Show whether PyTorch can see a CUDA device.
print(torch.cuda.is_available())
# Querying the device name raises when no CUDA device is usable, so only ask
# for it once the availability check has passed.
if torch.cuda.is_available():
    print(torch.cuda.get_device_name(0))

True
NVIDIA GeForce RTX 3070 Ti Laptop GPU


In [4]:
model.eval() # put the model in inference (evaluation) mode
model.cuda() # move the model to the GPU; as the last expression its repr is displayed

ASTModel(
  (embeddings): ASTEmbeddings(
    (patch_embeddings): ASTPatchEmbeddings(
      (projection): Conv2d(1, 768, kernel_size=(16, 16), stride=(10, 10))
    )
    (dropout): Dropout(p=0.0, inplace=False)
  )
  (encoder): ASTEncoder(
    (layer): ModuleList(
      (0-11): 12 x ASTLayer(
        (attention): ASTAttention(
          (attention): ASTSelfAttention(
            (query): Linear(in_features=768, out_features=768, bias=True)
            (key): Linear(in_features=768, out_features=768, bias=True)
            (value): Linear(in_features=768, out_features=768, bias=True)
          )
          (output): ASTSelfOutput(
            (dense): Linear(in_features=768, out_features=768, bias=True)
            (dropout): Dropout(p=0.0, inplace=False)
          )
        )
        (intermediate): ASTIntermediate(
          (dense): Linear(in_features=768, out_features=3072, bias=True)
          (intermediate_act_fn): GELUActivation()
        )
        (output): ASTOutput(
          (d

#### **1. .wav 파일 로드**

In [5]:
import soundfile as sf

def load_audio(path):
    """Read an audio file and return its first channel as a 1-D float32 tensor.

    Args:
        path: Location of the audio file; passed straight to ``sf.read``.

    Returns:
        Tuple ``(waveform, sr)``: ``waveform`` is a ``torch.Tensor`` of shape
        ``[T]`` and ``sr`` is the sample rate reported by ``sf.read``.
    """
    # always_2d=True yields [T, C] for mono and multi-channel files alike.
    # The earlier squeeze() left a stereo file as [T, 2], which the 1-D
    # pad-and-split segmenting step cannot handle correctly.
    samples, sr = sf.read(path, dtype='float32', always_2d=True)
    waveform = torch.from_numpy(samples[:, 0])  # keep channel 0 only -> [T]
    return waveform, sr

##### **1-1. waveform segmenting**

In [6]:
import torch.nn.functional as F

def split_waveform(waveform, num_segments=18):
    """Zero-pad a 1-D waveform on the right and cut it into equal-length segments.

    Args:
        waveform: 1-D ``torch.Tensor`` of samples with at least one element.
        num_segments: Number of pieces to produce (default 18).

    Returns:
        Tuple of ``num_segments`` tensors, each ``ceil(T / num_segments)``
        samples long, where ``T`` is the input length.

    Raises:
        ValueError: If ``num_segments`` is not positive, or ``waveform`` is
            empty or not 1-D.
    """
    if num_segments <= 0:
        raise ValueError(f"num_segments must be positive, got {num_segments}")
    # F.pad below pads the LAST axis, so a [T, C] input would be padded along
    # channels and split into the wrong number of pieces.
    if waveform.dim() != 1:
        raise ValueError(f"expected a 1-D waveform, got shape {tuple(waveform.shape)}")
    total = waveform.shape[0]
    # An empty signal cannot be cut into num_segments pieces (split() would
    # return a single empty chunk), so fail loudly instead.
    if total == 0:
        raise ValueError("waveform is empty")

    segment_length = -(-total // num_segments)  # integer ceil(total / num_segments)
    pad_len = segment_length * num_segments - total
    if pad_len > 0:
        waveform = F.pad(waveform, (0, pad_len))  # zeros appended on the right only
    return waveform.split(segment_length)

#### **2. Spectrogram 변환**

In [7]:
def extract_spectrogram_segments(segments, sr=16000):
    """Run the feature extractor on each segment and move the results to the GPU.

    Args:
        segments: Iterable of waveform segments, processed one at a time.
        sr: Sampling rate handed to the feature extractor (default 16000).

    Returns:
        List with one dict per segment; each dict holds the extractor's
        output tensors after ``.cuda()`` has been called on them.
    """
    def encode_on_gpu(chunk):
        # One extractor call per segment, requesting PyTorch tensors.
        encoded = feature_extractor(chunk, sampling_rate=sr, return_tensors="pt")
        return {name: tensor.cuda() for name, tensor in encoded.items()}

    return [encode_on_gpu(chunk) for chunk in segments]

#### **3. ASTModel에 입력 & 4. [CLS] 임베딩 추출**

In [65]:
'''
@torch.no_grad()
def forward_ast_sequence(spec_input_list):
    cls_embeddings = []
    for inputs in spec_input_list:
        outputs = model(**inputs)
        cls = outputs.last_hidden_state[:, 0, :]  # shape: [1, 768]
        cls_embeddings.append(cls)
    return torch.cat(cls_embeddings, dim=0)  # shape: [18, 768]
'''
@torch.no_grad()
def forward_ast_sequence(spec_input_list):
    # spec_input_list는 dict들의 리스트
    input_values = torch.cat([inp["input_values"] for inp in spec_input_list], dim=0)  # [18, L, 128]
    outputs = model(input_values)  # 한 번에 처리
    cls_embeddings = outputs.last_hidden_state[:, 0, :]  # [18, 768]
    return cls_embeddings


#### **5. 128-dim Linear Projection & 6. .npy 저장**

In [66]:
# Seed used only for the projection layer's random initialisation.
PROJECTOR_SEED = 0

# The 768 -> 128 projection starts from random weights and no checkpoint is
# loaded for it in this notebook, so every saved feature depends on this
# initialisation. Seeding inside fork_rng() gives the same weights on every
# run and restores the surrounding global RNG state afterwards.
with torch.random.fork_rng():
    torch.manual_seed(PROJECTOR_SEED)
    projector = torch.nn.Linear(768, 128)
projector = projector.cuda()
projector.eval()

def project_sequence_feature(patch_sequence):
    """Apply the module-level ``projector`` and return the result as a NumPy array.

    Args:
        patch_sequence: Tensor of per-segment embeddings accepted by
            ``projector`` (``[18, 768]`` in this notebook).

    Returns:
        ``numpy.ndarray`` holding the projected sequence on the CPU
        (``[18, 128]`` in this notebook).
    """
    # Only the values are needed, so skip building an autograd graph that
    # would be thrown away immediately.
    with torch.no_grad():
        feature_seq = projector(patch_sequence)
    return feature_seq.detach().cpu().numpy()

##### **테스트**

In [25]:
# Single sample clip used for the step-by-step checks below.
audio_path = r"D:\Audio\training\barking\18frames\getty-dog-barks-video-id513564656_7.wav"

# Step 1: load the clip and inspect the raw samples, their shape and the sample count.
waveform, sr = load_audio(audio_path)
print(waveform)
print(waveform.shape)
print(len(waveform))

tensor([0.0847, 0.1185, 0.0897,  ..., 0.0000, 0.0000, 0.0000])
torch.Size([48298])
48298


In [None]:
# Number of segments produced with the default num_segments.
print(len(split_waveform(waveform)))

18


In [41]:
# Step 2: split the clip into 18 segments and run the feature extractor on each one.
segments = split_waveform(waveform, num_segments=18)
spec_inputs = extract_spectrogram_segments(segments)

# Segment count, per-segment lengths, number of extractor outputs, and the
# shape of the first "input_values" tensor.
print(len(segments))
print([seg.shape for seg in segments])
print(len(spec_inputs))
print(spec_inputs[0]["input_values"].shape)

18
[torch.Size([2684]), torch.Size([2684]), torch.Size([2684]), torch.Size([2684]), torch.Size([2684]), torch.Size([2684]), torch.Size([2684]), torch.Size([2684]), torch.Size([2684]), torch.Size([2684]), torch.Size([2684]), torch.Size([2684]), torch.Size([2684]), torch.Size([2684]), torch.Size([2684]), torch.Size([2684]), torch.Size([2684]), torch.Size([2684])]
18
torch.Size([1, 1024, 128])


In [42]:
# Steps 3-4: batch the segments through the AST model and keep one embedding per segment.
patch_seq = forward_ast_sequence(spec_inputs)
print(patch_seq.shape)             # expected: torch.Size([18, 768])

torch.Size([18, 768])


In [43]:
# Step 5: call the projection layer directly on the embeddings to check the output shape.
projected = projector(patch_seq)
print(projected.shape)             # expected: torch.Size([18, 128])

torch.Size([18, 128])


##### **피처 추출 및 딕셔너리 구성**

In [None]:
import os
import numpy as np

def process_and_save_features(audio_dir, target_classes, num_segments=18, target_sr=16000):
    """Extract a projected AST feature sequence for every clip of the selected classes.

    For each class in ``target_classes`` the ``.wav`` files under
    ``<audio_dir>/<class_name>/18frames`` are loaded, segmented, converted to
    spectrogram inputs, passed through the AST model and projected. Despite
    the name, nothing is written to disk here; the caller saves the dict.

    Args:
        audio_dir: Root directory containing one sub-directory per class.
        target_classes: Collection of class names to process; other
            directories are skipped.
        num_segments: Number of segments each clip is split into (default 18).
        target_sr: Sampling rate handed to the feature extractor (default
            16000). Clips recorded at another rate are resampled to it.

    Returns:
        Dict mapping ``(class_name, video_id)`` to the value returned by
        ``project_sequence_feature``; ``video_id`` is the file name without
        its extension. Clips that raise during processing are reported with
        an ``[ERROR]`` line and left out.
    """
    results = {}

    # os.listdir order is arbitrary; sorting keeps the dict order stable across runs.
    for class_name in sorted(os.listdir(audio_dir)):
        if class_name not in target_classes:
            continue

        input_class_dir = os.path.join(audio_dir, class_name, '18frames')
        # isdir (not exists): a plain file with this name would make listdir raise.
        if not os.path.isdir(input_class_dir):
            continue

        for filename in sorted(os.listdir(input_class_dir)):
            if not filename.endswith(".wav"):
                continue

            audio_path = os.path.join(input_class_dir, filename)
            video_id = os.path.splitext(filename)[0]
            try:
                waveform, sr = load_audio(audio_path)
                if sr != target_sr:
                    # The file's own rate used to be discarded and 16000 assumed,
                    # which silently mislabels audio recorded at another rate.
                    waveform = torchaudio.functional.resample(waveform, sr, target_sr)
                segments = split_waveform(waveform, num_segments=num_segments)
                inputs = extract_spectrogram_segments(segments, sr=target_sr)
                segments_embedding = forward_ast_sequence(inputs)
                features = project_sequence_feature(segments_embedding)
            except Exception as e:
                # Best effort: report the failing clip and carry on with the rest.
                print(f"[ERROR] {audio_path}: {e}")
                continue

            key = (class_name, video_id)
            results[key] = features
            print(f"처리 완료: {class_name}/{video_id}, 피처 shape: {features.shape}")

    return results

In [None]:
if __name__ == '__main__':
    # Root folder with one sub-directory per class; root_rgb is bound to the same path.
    root_rgb = r"D:\Audio\training"
    audio_dir = root_rgb

    # The 20 classes whose clips are processed.
    target_classes = [
        "adult+female+singing",
        "adult+female+speaking",
        "adult+male+singing",
        "adult+male+speaking",
        "applauding",
        "ascending",
        "asking",
        "assembling",
        "autographing",
        "baking",
        "balancing",
        "barbecuing",
        "barking",
        "bending",
        "bicycling",
        "biting",
        "blowing",
        "boarding",
        "boating",
        "boiling",
    ]

    # Build the {(class_name, video_id): features} dict, then store it with np.save.
    results = process_and_save_features(audio_dir, target_classes)
    np.save("audio_filtered.npy", results)

처리 완료: adult+female+singing/2cEKxGB6-YM_35, 피처 shape: (18, 128)
처리 완료: adult+female+singing/90Mk6DgOIAI_35, 피처 shape: (18, 128)
처리 완료: adult+female+singing/BJWOChJ5EKc_1, 피처 shape: (18, 128)
처리 완료: adult+female+singing/getty-factory-workers-singing-and-dancing-during-lunch-break-united-kingdom-video-idmr_00076507_35, 피처 shape: (18, 128)
처리 완료: adult+female+singing/getty-female-officers-singing-christmas-carols-while-playing-an-accordion-video-idmr_00101935_4, 피처 shape: (18, 128)
처리 완료: adult+female+singing/getty-in-world-war-ii-munitions-factory-singer-steps-off-stage-and-begins-video-idmr_00076512_6, 피처 shape: (18, 128)
처리 완료: adult+female+singing/getty-telephone-operator-singing-to-subscribers-and-receiving-flowers-and-video-idmr_00102439_35, 피처 shape: (18, 128)
처리 완료: adult+female+singing/getty-telephone-operators-in-matching-costumes-working-and-singing-united-video-idmr_00102427_12, 피처 shape: (18, 128)
처리 완료: adult+female+singing/peeks-www_k_to_keek_0iPneab_6, 피처 shape: (18, 128)


##### train/test/val split

In [None]:
import numpy as np

# Load the complete audio-feature dictionary saved by the extraction step.
audio_dict = np.load("audio_filtered.npy", allow_pickle=True).item()

# Load the RGB dictionaries; only their keys are used to define the splits.
train_rgb = np.load(r"D:\Video-Feature\training\3.18frames-audio유효-split-feature\rgb_filtered_train.npy", allow_pickle=True).item()
val_rgb = np.load(r"D:\Video-Feature\training\3.18frames-audio유효-split-feature\rgb_filtered_val.npy", allow_pickle=True).item()
test_rgb = np.load(r"D:\Video-Feature\training\3.18frames-audio유효-split-feature\rgb_filtered_test.npy", allow_pickle=True).item()

# Key sets of each split (iterating a dict yields its keys).
train_keys = set(train_rgb)
val_keys = set(val_rgb)
test_keys = set(test_rgb)

# Keep the audio entries whose key belongs to the matching split.
train_audio = {key: audio_dict[key] for key in audio_dict if key in train_keys}
val_audio = {key: audio_dict[key] for key in audio_dict if key in val_keys}
test_audio = {key: audio_dict[key] for key in audio_dict if key in test_keys}

# Write one file per split.
np.save("audio_filtered_train.npy", train_audio)
np.save("audio_filtered_val.npy", val_audio)
np.save("audio_filtered_test.npy", test_audio)

print(f"✅ 분할 완료: train={len(train_audio)}, val={len(val_audio)}, test={len(test_audio)}")


✅ 분할 완료: train=9074, val=1937, test=1967


#### waveform 추출

In [27]:
import os
import numpy as np
import torch
import torch.nn.functional as F
import soundfile as sf

def load_audio(path):
    """Read an audio file and return ``(first channel as a 1-D tensor, sample rate)``.

    The file is read as float32 with ``always_2d=True``, so the raw array is
    always ``[T, C]``; only channel 0 is kept.
    """
    samples_tc, sample_rate = sf.read(path, dtype='float32', always_2d=True)
    first_channel = samples_tc[:, 0]  # remaining channels are dropped
    return torch.from_numpy(first_channel), sample_rate

def pad_waveform_ceil_18(waveform, num_segments=18):
    """Right-pad a waveform with zeros up to the next multiple of ``num_segments``.

    Args:
        waveform: Tensor whose first-axis length is the sample count.
        num_segments: The length is rounded up to a multiple of this (default 18).

    Returns:
        The input itself when no padding is needed, otherwise a padded copy.
    """
    # Samples missing to reach the next multiple of num_segments.
    shortfall = (-waveform.shape[0]) % num_segments
    if shortfall > 0:
        return F.pad(waveform, (0, shortfall))  # zeros appended on the right only
    return waveform

def save_waveform(audio_dir, target_classes):
    """Collect the padded waveform of every ``.wav`` clip of the selected classes.

    Args:
        audio_dir: Root directory containing one sub-directory per class.
        target_classes: Collection of class names to process.

    Returns:
        Dict mapping ``(class_name, video_id)`` to the padded waveform, where
        ``video_id`` is the file name without its extension. Clips that raise
        while loading or padding are reported with an ``[ERROR]`` line and
        left out. Nothing is written to disk here.
    """
    collected = {}

    for class_name in os.listdir(audio_dir):
        clip_dir = os.path.join(audio_dir, class_name, '18frames')
        # Skip classes that were not requested or that have no 18frames folder.
        if class_name not in target_classes or not os.path.exists(clip_dir):
            continue

        wav_names = [name for name in os.listdir(clip_dir) if name.endswith(".wav")]
        for wav_name in wav_names:
            audio_path = os.path.join(clip_dir, wav_name)
            video_id = os.path.splitext(wav_name)[0]
            try:
                loaded, _sr = load_audio(audio_path)
                padded = pad_waveform_ceil_18(loaded)
            except Exception as err:
                # Best effort: report the failing clip and move on.
                print(f"[ERROR] {audio_path}: {err}")
            else:
                collected[(class_name, video_id)] = padded
                print(f"✔️ 저장 완료: {class_name}/{video_id}, padded length: {padded.shape[0]}")

    return collected


In [28]:
if __name__ == '__main__':
    # Root folder with one sub-directory per class; root_rgb is bound to the same path.
    root_rgb = r"D:\Audio\training"
    audio_dir = root_rgb

    # The 20 classes whose clips are collected.
    target_classes = [
        "adult+female+singing",
        "adult+female+speaking",
        "adult+male+singing",
        "adult+male+speaking",
        "applauding",
        "ascending",
        "asking",
        "assembling",
        "autographing",
        "baking",
        "balancing",
        "barbecuing",
        "barking",
        "bending",
        "bicycling",
        "biting",
        "blowing",
        "boarding",
        "boating",
        "boiling",
    ]

    # Build the {(class_name, video_id): padded waveform} dict, then store it with np.save.
    results = save_waveform(audio_dir, target_classes)
    np.save("audio_waveform.npy", results)

✔️ 저장 완료: adult+female+singing/2cEKxGB6-YM_35, padded length: 48312
✔️ 저장 완료: adult+female+singing/90Mk6DgOIAI_35, padded length: 48312
✔️ 저장 완료: adult+female+singing/BJWOChJ5EKc_1, padded length: 48312
✔️ 저장 완료: adult+female+singing/getty-factory-workers-singing-and-dancing-during-lunch-break-united-kingdom-video-idmr_00076507_35, padded length: 48132
✔️ 저장 완료: adult+female+singing/getty-female-officers-singing-christmas-carols-while-playing-an-accordion-video-idmr_00101935_4, padded length: 48132
✔️ 저장 완료: adult+female+singing/getty-in-world-war-ii-munitions-factory-singer-steps-off-stage-and-begins-video-idmr_00076512_6, padded length: 48132
✔️ 저장 완료: adult+female+singing/getty-telephone-operator-singing-to-subscribers-and-receiving-flowers-and-video-idmr_00102439_35, padded length: 48132
✔️ 저장 완료: adult+female+singing/getty-telephone-operators-in-matching-costumes-working-and-singing-united-video-idmr_00102427_12, padded length: 48132
✔️ 저장 완료: adult+female+singing/peeks-www_k_to_k

In [29]:
from collections import Counter
import numpy as np

# Load the saved {(class_name, video_id): waveform} dictionary.
waveform_dict = np.load("audio_waveform.npy", allow_pickle=True).item()

# Length of every stored waveform.
lengths = [wave.shape[0] for wave in waveform_dict.values()]

# How many clips share each length.
length_counts = Counter(lengths)

# Report the counts in ascending order of length.
for length in sorted(length_counts):
    print(f"Length: {length}, Count: {length_counts[length]}")


Length: 42732, Count: 1
Length: 43848, Count: 1
Length: 46080, Count: 1
Length: 46818, Count: 1
Length: 47196, Count: 2
Length: 47556, Count: 2
Length: 47790, Count: 1
Length: 48132, Count: 1228
Length: 48312, Count: 11741


In [30]:
import numpy as np

# Load the complete waveform dictionary saved by the previous step.
audio_dict = np.load("audio_waveform.npy", allow_pickle=True).item()

# Load the RGB dictionaries; only their keys are used to define the splits.
train_rgb = np.load(r"D:\Video-Feature\training\3.18frames-audio유효-split-feature\rgb_filtered_train.npy", allow_pickle=True).item()
val_rgb = np.load(r"D:\Video-Feature\training\3.18frames-audio유효-split-feature\rgb_filtered_val.npy", allow_pickle=True).item()
test_rgb = np.load(r"D:\Video-Feature\training\3.18frames-audio유효-split-feature\rgb_filtered_test.npy", allow_pickle=True).item()

# Key sets of each split (iterating a dict yields its keys).
train_keys = set(train_rgb)
val_keys = set(val_rgb)
test_keys = set(test_rgb)

# Keep the waveforms whose key belongs to the matching split.
train_audio = {key: audio_dict[key] for key in audio_dict if key in train_keys}
val_audio = {key: audio_dict[key] for key in audio_dict if key in val_keys}
test_audio = {key: audio_dict[key] for key in audio_dict if key in test_keys}

# Write one file per split.
np.save("audio_waveform_train.npy", train_audio)
np.save("audio_waveform_val.npy", val_audio)
np.save("audio_waveform_test.npy", test_audio)

print(f"✅ 분할 완료: train={len(train_audio)}, val={len(val_audio)}, test={len(test_audio)}")


✅ 분할 완료: train=9074, val=1937, test=1967
