In [None]:
# HuBERTで特徴量を抽出(768次元らしい)
import torchaudio
import torchaudio.models
import torchaudio.transforms as transforms
import torch
import os
import numpy as np
import pandas as pd

# HuBERTモデルをロードする関数
def load_hubert_model():
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = torchaudio.models.hubert_base()
    model = model.to(device).eval()
    return model, device

# 音声ファイルからHuBERT特徴量を抽出する関数
def extract_hubert_features(audio_path, model, device):
    waveform, sample_rate = torchaudio.load(audio_path)
    if sample_rate != 16000:  # サンプリングレートを16kHzに変換
        resampler = transforms.Resample(orig_freq=sample_rate, new_freq=16000)
        waveform = resampler(waveform)
    waveform = waveform.to(device)
    with torch.no_grad():
        features, _ = model(waveform)
    return features.squeeze(0).cpu().numpy()  # 特徴量をNumPy配列として返す

# アライメント情報を使用して音節ごとの特徴量を抽出する関数
def extract_features_by_alignment(features, alignment_path, sample_rate=16000, hop_length=320):
    alignment = pd.read_csv(alignment_path)
    extracted_features = []
    for _, row in alignment.iterrows():
        start_frame = int(row['Start (s)'] * sample_rate // hop_length)
        end_frame = int(row['End (s)'] * sample_rate // hop_length)
        mora_features = features[start_frame:end_frame].mean(axis=0)  # 平均を計算
        extracted_features.append({
            "Mora": row['Mora'],
            **{f"Feature_{i}": value for i, value in enumerate(mora_features)}
        })
    return extracted_features

# 音声ファイルとアライメント情報を組み合わせて処理する関数
def process_audio_and_alignment(input_audio_dir, alignment_dir, output_dir, model, device, num_files=None):
    audio_files = sorted([f for f in os.listdir(input_audio_dir) if f.endswith(".wav")])
    if num_files is not None:
        audio_files = audio_files[:num_files]
    os.makedirs(output_dir, exist_ok=True)

    for audio_file in audio_files:
        audio_path = os.path.join(input_audio_dir, audio_file)
        alignment_file = f"alignment_{audio_file.replace('BASIC5000_', '').replace('.wav', '.csv')}"
        alignment_path = os.path.join(alignment_dir, alignment_file)
        output_path = os.path.join(output_dir, alignment_file)

        if not os.path.exists(alignment_path):
            print(f"Alignment file not found: {alignment_path}")
            continue

        print(f"Processing {audio_path} with {alignment_path}...")

        # HuBERT特徴量を抽出
        features = extract_hubert_features(audio_path, model, device)

        # 音節ごとの特徴量を抽出
        extracted_features = extract_features_by_alignment(features, alignment_path)

        # CSV形式で保存
        pd.DataFrame(extracted_features).to_csv(output_path, index=False)
        print(f"Saved features to {output_path}")

# メイン処理
if __name__ == "__main__":
    input_audio_directory = "./basic5000"  # 音声ファイルのディレクトリ
    alignment_directory = "./align_result_5000"  # アライメント情報のディレクトリ
    output_directory = "./basic5000_HuBERT_features_csv"  # 特徴量を保存するディレクトリ
    num_files_to_process = 5000  # 任意の処理ファイル数（例: 上から100個）

    # HuBERTモデルのロード
    hubert_model, device = load_hubert_model()

    # 音声とアライメント情報を組み合わせて処理
    process_audio_and_alignment(
        input_audio_directory,
        alignment_directory,
        output_directory,
        hubert_model,
        device,
        num_files=num_files_to_process
    )

Processing ./basic5000/BASIC5000_0001.wav with ./align_result_5000/alignment_0001.csv...
Saved features to ./basic5000_HuBERT_features_csv/alignment_0001.csv
Processing ./basic5000/BASIC5000_0002.wav with ./align_result_5000/alignment_0002.csv...
Saved features to ./basic5000_HuBERT_features_csv/alignment_0002.csv
Processing ./basic5000/BASIC5000_0003.wav with ./align_result_5000/alignment_0003.csv...
Saved features to ./basic5000_HuBERT_features_csv/alignment_0003.csv
Processing ./basic5000/BASIC5000_0004.wav with ./align_result_5000/alignment_0004.csv...
Saved features to ./basic5000_HuBERT_features_csv/alignment_0004.csv
Processing ./basic5000/BASIC5000_0005.wav with ./align_result_5000/alignment_0005.csv...
Saved features to ./basic5000_HuBERT_features_csv/alignment_0005.csv
Processing ./basic5000/BASIC5000_0006.wav with ./align_result_5000/alignment_0006.csv...
Saved features to ./basic5000_HuBERT_features_csv/alignment_0006.csv
Processing ./basic5000/BASIC5000_0007.wav with ./ali

In [None]:
# ここから！！(学習の特徴量として、HuBERTの特徴量の平均値を使用)
import os
import pandas as pd
from sklearn.model_selection import train_test_split
import torch
from torch.utils.data import Dataset, DataLoader

class AccentDataset(Dataset):
    def __init__(self, csv_files):
        self.data = []
        for file in csv_files:
            df = pd.read_csv(file)
            for _, row in df.iterrows():
                pooled_feature = row['pooled_feature']
                accent = row['Accent']
                self.data.append((pooled_feature, accent))
    
    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        pooled_feature, accent = self.data[idx]
        # 特徴量をベクトルに変換
        pooled_feature = torch.tensor([float(x) for x in pooled_feature.split(',')])  # カンマ区切りで分割
        accent = torch.tensor(accent, dtype=torch.long)
        return pooled_feature, accent

# CSVファイルのパスを取得
csv_folder = 'pooled_features'
csv_files = [os.path.join(csv_folder, f) for f in os.listdir(csv_folder) if f.endswith('.csv')]

# データセットとデータローダの準備
dataset = AccentDataset(csv_files)
train_data, val_data = train_test_split(dataset.data, test_size=0.2, random_state=42)

train_loader = DataLoader(train_data, batch_size=32, shuffle=True)
val_loader = DataLoader(val_data, batch_size=32, shuffle=False)


In [14]:
import torch.nn as nn

class AccentTransformer(nn.Module):
    def __init__(self, input_dim, num_classes=2, num_heads=4, num_layers=2, hidden_dim=128):
        super(AccentTransformer, self).__init__()
        
        self.input_dim = input_dim
        self.embed_dim = 32  # 埋め込み次元
        self.num_heads = num_heads
        
        # 入力特徴量の次元を埋め込み次元に変換するための線形層
        self.input_projection = nn.Linear(self.input_dim, self.embed_dim)
        
        # Transformer Encoder
        self.encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=self.embed_dim, nhead=self.num_heads, dim_feedforward=hidden_dim),
            num_layers=num_layers
        )
        
        # 最終層（分類器）
        self.fc = nn.Linear(self.embed_dim, num_classes)
    
    def forward(self, x):
        # x: (batch_size, sequence_length, input_dim)
        
        # 特徴量を埋め込み次元に変換
        x = self.input_projection(x)  # (batch_size, sequence_length, embed_dim)
        
        # Transformerに入力するために次元を変更
        x = x.permute(1, 0, 2)  # (sequence_length, batch_size, embed_dim)
        
        # Transformerに通す
        x = self.encoder(x)
        
        # 出力の平均を取る（シーケンス全体の特徴量を集約）
        x = x.mean(dim=0)  # (batch_size, embed_dim)
        
        # 最終的にアクセント（0 or 1）を予測
        output = self.fc(x)  # (batch_size, num_classes)

        return output


In [17]:
from tqdm import tqdm
import torch.optim as optim
import torch.nn.functional as F

# モデルと最適化アルゴリズムの設定
model = AccentTransformer(input_dim=32)  # input_dim は特徴量の数（pooled_feature の次元数）
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

# 学習ループ
num_epochs = 100
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

for epoch in range(num_epochs):
    model.train()
    pbar = tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs}', ncols=100)
    for batch in pbar:
        pooled_features, accents = batch
        pooled_features = pooled_features.float().to(device)
        accents = accents.to(device)
        
        # 順伝播
        outputs = model(pooled_features)
        
        # 損失計算
        loss = criterion(outputs, accents)
        
        # 勾配の初期化と逆伝播
        optimizer.zero_grad()
        loss.backward()
        
        # パラメータの更新
        optimizer.step()

        pbar.set_postfix({'loss': loss.item()})
    
    # 評価
    model.eval()
    with torch.no_grad():
        correct = 0
        total = 0
        for batch in val_loader:
            pooled_features, accents = batch
            pooled_features = pooled_features.float().to(device)
            accents = accents.to(device)
            
            outputs = model(pooled_features)
            _, predicted = torch.max(outputs, 1)
            
            total += accents.size(0)
            correct += (predicted == accents).sum().item()

        accuracy = 100 * correct / total
        print(f'Validation Accuracy: {accuracy:.2f}%')


Epoch 1/100:   0%|                                                         | 0/4428 [00:00<?, ?it/s]


RuntimeError: permute(sparse_coo): number of dimensions in the tensor input does not match the length of the desired ordering of dimensions i.e. input.dim() = 1 is not equal to len(dims) = 3

In [10]:
class AccentTransformer(nn.Module):
    def __init__(self, input_dim, num_classes=2, num_heads=4, num_layers=2, hidden_dim=128):
        super(AccentTransformer, self).__init__()

        # 埋め込み次元（入力次元）を num_heads で割り切れる値に変更
        self.embed_dim = 32  # 例: 32 は num_heads=4 で割り切れます
        self.input_dim = input_dim  # 特徴量の次元数を保持
        
        # 入力の次元を埋め込み次元に変換するための線形層
        self.input_projection = nn.Linear(self.input_dim, self.embed_dim)
        
        # Transformer Encoder
        self.encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=self.embed_dim, nhead=num_heads, dim_feedforward=hidden_dim),
            num_layers=num_layers
        )
        
        # 最終層
        self.fc = nn.Linear(self.embed_dim, num_classes)
    
    def forward(self, x):
        # x: (batch_size, sequence_length, feature_dim)
        
        # 特徴量の次元を embed_dim に変換
        x = self.input_projection(x)  # (batch_size, sequence_length, embed_dim)
        
        # Transformerに入力するために次元を変更
        x = x.permute(1, 0, 2)  # (sequence_length, batch_size, embed_dim) に変換
        
        # Transformerに通す
        x = self.encoder(x)
        
        # 出力の平均を取る（シーケンス全体の特徴量を集約）
        x = x.mean(dim=0)  # (batch_size, embed_dim)
        
        # fc 層に入力するために次元を変換
        output = self.fc(x)  # (batch_size, num_classes)

        return output

In [11]:
# モデル、損失関数、最適化器の設定
input_dim = 1  # pooled_featureは1次元なので
model = AccentTransformer(input_dim=input_dim)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 学習ループ
num_epochs = 100
for epoch in range(num_epochs):
    model.train()
    total_loss = 0

    # 学習データでのトレーニング
    with tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}", unit="batch") as pbar:
        for features, labels in pbar:
            optimizer.zero_grad()
            outputs = model(features)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            
            pbar.set_postfix(loss=loss.item())
    
    print(f"Epoch {epoch+1}, Loss: {total_loss / len(train_loader)}")

    # 評価データでの評価
    model.eval()
    total_correct = 0
    with torch.no_grad():
        for features, labels in eval_loader:
            outputs = model(features)
            _, predicted = torch.max(outputs, 1)
            total_correct += (predicted == labels).sum().item()

    accuracy = total_correct / len(eval_loader.dataset)
    print(f"Epoch {epoch+1}, Accuracy: {accuracy:.4f}")

Epoch 1/100:   0%|          | 0/4428 [00:00<?, ?batch/s]


RuntimeError: mat1 and mat2 shapes cannot be multiplied (1x32 and 1x32)