In [None]:
import os
import random
import librosa
import pickle
import pandas as pd
import numpy as np
import librosa.display
import matplotlib.pyplot as plt
from scipy.fft import fft
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, accuracy_score
from sklearn.preprocessing import StandardScaler
from tqdm.auto import tqdm
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
base_path = '/content/drive/MyDrive/project/project1/sample'
features_file_path = '/content/drive/MyDrive/project/project1/sample/features_by_key.pkl'
output_dir = '/content/drive/MyDrive/project/project1/sample/stft_plots'

# **데이터 준비**

드라이브에서 파일들 접근하고, 각종 초기 노이즈 제거 및 정규화를 진행하여 이펙터 별로 라벨링합니다

In [None]:
#remove silent segment
def remove_silent(data, sr, db_threshold=-20, freq_threshold=4096, hop_length=512, n_fft=2048):

  stft = librosa.stft(data, n_fft=n_fft, hop_length=hop_length)
  freqs = librosa.fft_frequencies(sr=sr, n_fft=n_fft)

  freq_idx = np.where(freqs >= freq_threshold)[0][0]

  magnitude = np.abs(stft[freq_idx:, :])
  db_values = librosa.amplitude_to_db(magnitude, ref=np.max)

  start_frame = np.argmax(np.any(db_values > db_threshold, axis=0))

  if start_frame == 0 and np.all(db_values[:,0]<=db_threshold):
    return stft

  return stft[:, start_frame:]

In [None]:
#load files
def load_audio_files(base_path, target_sr=22050, max_length=5.0):
  audio_data = {}
  file_index = 0
  for root, dirs, files in tqdm(os.walk(base_path), desc='Walking through directories'):
    files.sort()
    for file in tqdm(files, desc="Processing files", leave=False):
      if file.endswith((".wav", ".mp3")):
        parts=root.split(os.sep)
        effect=parts[-3] if 'samples' not in parts[-2].lower() else parts[-1]
        key = (effect, file_index)
        file_index += 1
        if key not in audio_data:
          audio_data[key]=[]
        file_path = os.path.join(root, file)
        data, sr = librosa.load(file_path, sr=None)
        #Trim leading and trailing silence
        data, _ = librosa.effects.trim(data)

        #Remove silent segment
        data = remove_silent(data, sr, db_threshold=-20, freq_threshold=4096)

        # Amplitude Normalization
        data = librosa.util.normalize(data)

        audio_data[key].append((data, sr))
  return audio_data

In [None]:
def load_sample_files(base_path, target_sr=22050, max_length=5.0):
    audio_data = {}
    file_index = 0
    for root, dirs, files in tqdm(os.walk(base_path), desc='Walking through directories'):
        effect_files = [file for file in files if file.endswith((".wav", ".mp3"))]
        if effect_files:
            file = random.choice(effect_files)  # 각 폴더에서 랜덤으로 1개의 파일 선택
            parts = root.split(os.sep)
            effect = parts[-3] if 'samples' not in parts[-2].lower() else parts[-1]
            key = (effect, file_index)
            file_index += 1
            if key not in audio_data:
                audio_data[key] = []
            file_path = os.path.join(root, file)
            data, sr = librosa.load(file_path, sr=None)
            #Trim leading and trailing silence
            data, _ = librosa.effects.trim(data)

            #Remove silent segment
            data = remove_silent(data, sr, db_threshold=-20, freq_threshold=4096)

            # Amplitude Normalization
            data = librosa.util.normalize(data)

            audio_data[key].append((data, sr))
    return audio_data

In [None]:
#STFT funtion
'''
def ext_STFT_features(data, sr, n_fft=2048, hop_length=512):
  stft_vals = librosa.stft(data, n_fft=n_fft, hop_length=hop_length)
  return np.abs(stft_vals)
'''

'\ndef ext_STFT_features(data, sr, n_fft=2048, hop_length=512):\n  stft_vals = librosa.stft(data, n_fft=n_fft, hop_length=hop_length)\n  return np.abs(stft_vals)\n'

In [None]:
# preparing data
def data(audio_files, batch_size=100):
  features_list=[]
  labels_list=[]
  effect_mapping={
        'BluesDriver': 'drive',
        'Chorus': 'chorus',
        'Clean': 'clean',
        'Digital-Delay': 'delay',
        'Distortion': 'drive',
        'FeedbackDelay': 'delay',
        'Flanger': 'flanger',
        'Hall-Reverb': 'reverb',
        'NoFX': 'clean',
        'Overdrive': 'drive',
        'Phaser': 'phaser',
        'Plate-Reverb': 'reverb',
        'RAT': 'drive',
        'SlapbackDelay': 'delay',
        'Spring-Reverb': 'reverb',
        'Sweep-Echo': 'delay',
        'TapeEcho': 'delay',
        'Tremolo': 'tremolo',
        'TubeScreamer': 'drive',
        'Vibrato': 'vibrato'
  }

  label_mapping={
      'clean' : 0,
      'drive' : 1,
      'reverb' : 2,
      'delay' : 3,
      'chorus' : 4,
      'phaser' : 5,
      'flanger' : 6,
      'tremolo' : 7,
      'vibrato' : 8
  }

  keys = list(audio_files.keys())
  for start in tqdm(range(0, len(keys), batch_size), desc='Preparing data in batches'):
    batch_keys = keys[start:start + batch_size]
    for key in batch_keys:
      effect_type = key[0]
      category = effect_mapping.get(effect_type, None)
      if category in label_mapping:
        for stft in audio_files[key]:
          features_list.append(stft)
          labels_list.append(label_mapping[category])
  return features_list, labels_list, label_mapping

# **STFT plot**

각 이펙터 별 stft확인을 위해 spectrogram을 plot합니다

In [None]:
#plot spectogram graph
def plot_spectogram(feature, sr, title):
  plt.figure(figsize=(10,4))
  librosa.display.specshow(librosa.amplitude_to_db(feature, ref=np.max), sr=sr, hop_length=512, x_axis='time', y_axis='log')
  plt.colorbar(format='%+2.0f dB')
  plt.title(title)
  plt.tight_layout()
  plt.show()

# **데이터 준비**

spectrogram plot 및 cnn 학습을 위한 data 준비 단계입니다

In [None]:
audio_files = load_audio_files(base_path)

Walking through directories: 0it [00:00, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files:   0%|          | 0/1 [00:00<?, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files:   0%|          | 0/138 [00:00<?, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files:   0%|          | 0/1 [00:00<?, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files:   0%|          | 0/1912 [00:00<?, ?it/s]

Processing files:   0%|          | 0/1872 [00:00<?, ?it/s]

Processing files:   0%|          | 0/1872 [00:00<?, ?it/s]

Processing files:   0%|          | 0/1872 [00:00<?, ?it/s]

Processing files:   0%|          | 0/1872 [00:00<?, ?it/s]

Processing files:   0%|          | 0/1872 [00:00<?, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files:   0%|          | 0/420 [00:00<?, ?it/s]

Processing files:   0%|          | 0/1260 [00:00<?, ?it/s]

Processing files:   0%|          | 0/1260 [00:00<?, ?it/s]

Processing files:   0%|          | 0/1261 [00:00<?, ?it/s]

Processing files:   0%|          | 0/1260 [00:00<?, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files:   0%|          | 0/12 [00:00<?, ?it/s]

Processing files:   0%|          | 0/4 [00:00<?, ?it/s]

Processing files:   0%|          | 0/12 [00:00<?, ?it/s]

Processing files:   0%|          | 0/12 [00:00<?, ?it/s]

Processing files:   0%|          | 0/8 [00:00<?, ?it/s]

Processing files:   0%|          | 0/12 [00:00<?, ?it/s]

Processing files:   0%|          | 0/12 [00:00<?, ?it/s]

Processing files:   0%|          | 0/12 [00:00<?, ?it/s]

Processing files:   0%|          | 0/12 [00:00<?, ?it/s]

Processing files:   0%|          | 0/12 [00:00<?, ?it/s]

Processing files:   0%|          | 0/12 [00:00<?, ?it/s]

Processing files:   0%|          | 0/12 [00:00<?, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files:   0%|          | 0/1 [00:00<?, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files:   0%|          | 0/624 [00:00<?, ?it/s]

Processing files:   0%|          | 0/1883 [00:00<?, ?it/s]

Processing files:   0%|          | 0/1872 [00:00<?, ?it/s]

Processing files:   0%|          | 0/1872 [00:00<?, ?it/s]

Processing files:   0%|          | 0/1912 [00:00<?, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files:   0%|          | 0/24 [00:00<?, ?it/s]

Processing files:   0%|          | 0/8 [00:00<?, ?it/s]

Processing files:   0%|          | 0/24 [00:00<?, ?it/s]

Processing files:   0%|          | 0/24 [00:00<?, ?it/s]

Processing files:   0%|          | 0/24 [00:00<?, ?it/s]

Processing files:   0%|          | 0/24 [00:00<?, ?it/s]

Processing files:   0%|          | 0/16 [00:00<?, ?it/s]

Processing files:   0%|          | 0/24 [00:00<?, ?it/s]

Processing files:   0%|          | 0/24 [00:00<?, ?it/s]

Processing files:   0%|          | 0/24 [00:00<?, ?it/s]

Processing files:   0%|          | 0/24 [00:00<?, ?it/s]

Processing files:   0%|          | 0/24 [00:00<?, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files:   0%|          | 0/1260 [00:00<?, ?it/s]

Processing files: 0it [00:00, ?it/s]

Processing files:   0%|          | 0/1461 [00:00<?, ?it/s]

Processing files:   0%|          | 0/1260 [00:00<?, ?it/s]

Processing files:   0%|          | 0/1261 [00:00<?, ?it/s]

Processing files:   0%|          | 0/1262 [00:00<?, ?it/s]

Processing files:   0%|          | 0/1260 [00:00<?, ?it/s]

In [None]:
features, labels, label_mapping  = data(audio_files)

Preparing data in batches:   0%|          | 0/417 [00:00<?, ?it/s]

In [None]:
#for feature, sr, effect_type in features:
#    title = f"Spectogram of {effect_type}"
#    plot_spectogram(feature, sr, title)

# **CNN 학습 및 평가**

전처리된 데이터를 통해 cnn을 학습하고 f1 score를 확인합니다.

In [None]:
#CNN_dataset
class AudioDataset(Dataset):
  def __init__(self, features, labels):
    self.features = [f[0] for f in features]
    self.labels = labels

    self.features = [self._pad_feature(f) for f in self.features]

  def __len__(self):
    return len(self.features)

  def __getitem__(self, idx):
    feature = self.features[idx]
    label = self.labels[idx]
    feature = np.expand_dims(feature, axis=0)
    return torch.tensor(feature, dtype=torch.float32), torch.tensor(label, dtype=torch.long)

  def _pad_feature(self, feature):
    max_shape = (1025,470)
    padded_feature = np.zeros(max_shape)
    padded_feature[:feature.shape[0], :feature.shape[1]]=feature
    return padded_feature

#CNN_model
class AudioCNN(nn.Module):
  def __init__(self, num_classes):
    super(AudioCNN, self).__init__()
    self.conv1 = nn.Conv2d(1, 16, kernel_size=3, stride=1, padding=1)
    self.conv2 = nn.Conv2d(16,32, kernel_size=3, stride=1, padding=1)
    self.conv3 = nn.Conv2d(32,64, kernel_size=3, stride=1, padding=1)
    self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
    self.fc1 = nn.Linear(64*128*58, 128)
    self.dropout = nn.Dropout(0.5)
    self.fc2 =nn.Linear(128, num_classes)

  def forward(self, x):
    x = self.pool(torch.relu(self.conv1(x)))
    x = self.pool(torch.relu(self.conv2(x)))
    x = self.pool(torch.relu(self.conv3(x)))
    x = x.view(x.size(0),-1)
    x = torch.relu(self.fc1(x))
    x = self.dropout(x)
    x = self.fc2(x)
    return x

In [None]:
#generate dataset / dataloader
X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=0.2, random_state=42)
train_dataset = AudioDataset(X_train, y_train)
test_dataset = AudioDataset(X_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

device = torch.device('cuda'if torch.cuda.is_available() else 'cpu')
model = AudioCNN(num_classes=len(label_mapping)).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 10

for epoch in tqdm(range(num_epochs)):
  model.train()
  running_loss = 0.0
  for inputs, labels in tqdm(train_loader):
    inputs, labels = inputs.to(device), labels.to(device)
    optimizer.zero_grad()
    outputs = model(inputs)
    loss = criterion(outputs, labels)
    loss.backward()
    optimizer.step()
    running_loss += loss.item()
  print(f'Epoch : {epoch+1}/{num_epochs}, Loss : {running_loss/len(train_loader)}')

  model.eval()
  correct = 0
  total = 0
  with torch.no_grad():
    for inputs, labels in test_loader:
      inputs, labels = inputs.to(device), labels.to(device)
      outputs = model(inputs)
      _, predicted = torch.max(outputs.data, 1)
      total += labels.size(0)
      correct += (predicted == labels).sum().item()

  print(f'Accuracy: {100*correct/total}%')

  padded_feature[:feature.shape[0], :feature.shape[1]]=feature


  0%|          | 0/10 [00:00<?, ?it/s]

  0%|          | 0/800 [00:00<?, ?it/s]

Epoch : 1/10, Loss : 1.5012114041298628
Accuracy: 70.08907641819034%


  0%|          | 0/800 [00:00<?, ?it/s]

Epoch : 2/10, Loss : 0.8132752602919936
Accuracy: 77.80903266135334%


  0%|          | 0/800 [00:00<?, ?it/s]

Epoch : 3/10, Loss : 0.607855347301811
Accuracy: 82.80981403344272%


  0%|          | 0/800 [00:00<?, ?it/s]

Epoch : 4/10, Loss : 0.48651941588148473
Accuracy: 86.41975308641975%


  0%|          | 0/800 [00:00<?, ?it/s]

Epoch : 5/10, Loss : 0.4125347180943936
Accuracy: 87.63869354586654%


  0%|          | 0/800 [00:00<?, ?it/s]

Epoch : 6/10, Loss : 0.35236836439929903
Accuracy: 89.3264572589467%


  0%|          | 0/800 [00:00<?, ?it/s]

Epoch : 7/10, Loss : 0.3017106945905834
Accuracy: 90.0922019065479%


  0%|          | 0/800 [00:00<?, ?it/s]

Epoch : 8/10, Loss : 0.2694427378475666
Accuracy: 91.51429910923582%


  0%|          | 0/800 [00:00<?, ?it/s]

Epoch : 9/10, Loss : 0.24523290873272344
Accuracy: 92.42069073292701%


  0%|          | 0/800 [00:00<?, ?it/s]

Epoch : 10/10, Loss : 0.21643005350488237
Accuracy: 92.56133770901704%


In [None]:
from sklearn.metrics import precision_recall_fscore_support

model.eval()
y_true = []
y_pred = []
with torch.no_grad():
  for inputs, labels in test_loader:
    inputs, labels = inputs.to(device), labels.to(device)
    outputs = model(inputs)
    _, predicted = torch.max(outputs.data,1)
    y_true.extend(labels.cpu().numpy())
    y_pred.extend(predicted.cpu().numpy())

precision, recall, f1, _ = precision_recall_fscore_support(y_true, y_pred, average = 'weighted')
print(f"Precision: {precision: .4f}, Recall: {recall: .4f}, F1 Score: {f1: .4f}")

# **GAN synth pytorch**

In [None]:
import os, random
from tqdm import tqdm
import librosa, numpy as np
import tensorflow as tf
from concurrent.futures import ProcessPoolExecutor

effect_mapping = {
    'BluesDriver': 'drive',
    'Chorus': 'chorus',
    'Clean': 'clean',
    'Digital-Delay': 'delay',
    'Distortion': 'drive',
    'FeedbackDelay': 'delay',
    'Flanger': 'flanger',
    'Hall-Reverb': 'reverb',
    'NoFX': 'clean',
    'Overdrive': 'drive',
    'Phaser': 'phaser',
    'Plate-Reverb': 'reverb',
    'RAT': 'drive',
    'SlapbackDelay': 'delay',
    'Spring-Reverb': 'reverb',
    'Sweep-Echo': 'delay',
    'TapeEcho': 'delay',
    'Tremolo': 'tremolo',
    'TubeScreamer': 'drive',
    'Vibrato': 'vibrato'
}

def make_stft_pairs_for_effect(audio_data, effect_category,
                                clean_labels=('Clean','NoFX'),
                                n_fft=2048, hop_length=512):
    """
    audio_data: load_audio_files 로 만든 dict
    effect_category: 'drive','chorus','delay',... 중 하나
    clean_labels: 원본 폴더명이 clean 계열인 경우
    """
    clean_list = []
    effect_list = []
    for (orig_eff, _), waves in audio_data.items():
        if orig_eff in clean_labels:
            for stft, _ in waves:
                clean_list.append(stft.astype(np.complex64))
    for (orig_eff, _), waves in audio_data.items():
        if effect_mapping.get(orig_eff) == effect_category:
            for stft, _ in waves:
                effect_list.append(stft.astype(np.complex64))
    n = min(len(clean_list), len(effect_list))
    paired = list(zip(clean_list[:n], effect_list[:n]))
    return paired

def remove_silent(data, sr,
                  db_threshold=-20,
                  freq_threshold=4096,
                  hop_length=512,
                  n_fft=2048):
    stft = librosa.stft(data, n_fft=n_fft, hop_length=hop_length)
    freqs = librosa.fft_frequencies(sr=sr, n_fft=n_fft)
    freq_idx = np.where(freqs >= freq_threshold)[0][0]
    mag = np.abs(stft[freq_idx:,:])
    db = librosa.amplitude_to_db(mag, ref=np.max)
    start = np.argmax(np.any(db > db_threshold, axis=0))
    if start==0 and np.all(db[:,0] <= db_threshold):
        return stft.astype(np.complex64)
    return stft[:, start:].astype(np.complex64)

def process_file(args):
    root, fname, idx, target_sr = args
    data, sr = librosa.load(os.path.join(root, fname), sr=target_sr)
    data, _ = librosa.effects.trim(data)
    stft = remove_silent(data, sr)

    effect = root.split(os.sep)[-3]
    return (effect, idx), (stft.astype(np.complex64), sr)


def load_audio_files(base_path, target_sr=22050, max_workers=4):
    tasks = []
    idx = 0
    for root, _, files in os.walk(base_path):
        for f in files:
            if not f.lower().endswith((".wav", ".mp3")):
                continue
            tasks.append((root, f, idx, target_sr))
            idx += 1

    audio_data = {}
    with ProcessPoolExecutor(max_workers=max_workers) as exe:
        for key, stft_pair in tqdm(
            exe.map(process_file, tasks),
            total=len(tasks),
            desc="Parallel processing"
        ):
            audio_data.setdefault(key, []).append(stft_pair)

    return audio_data

def make_stft_pairs(audio_data, clean_labels=('Clean','NoFX')):
    clean_list, effect_list = [], []
    for (eff, _), waves in audio_data.items():
        for stft, _ in waves:
            if eff in clean_labels: clean_list.append(stft)
            else:                  effect_list.append(stft)
    n = min(len(clean_list), len(effect_list))
    return list(zip(clean_list[:n], effect_list[:n])), \
           list(zip(effect_list[:n], clean_list[:n]) )

BASE = "/content/drive/MyDrive/project/project1/sample/sound samples 1"

audio_data = load_audio_files(BASE, target_sr=22050, max_workers=4)

print(set(orig_eff for (orig_eff,_) in audio_data))

pairs_ce, pairs_ec = make_stft_pairs(audio_data)

Parallel processing: 100%|██████████| 8970/8970 [26:34<00:00,  5.63it/s]

{'Flanger', 'Chorus', 'Hall-Reverb', 'RAT', 'TubeScreamer', 'Digital-Delay', 'Plate-Reverb', 'Clean', 'Sweep-Echo', 'TapeEcho', 'Phaser', 'BluesDriver', 'Spring-Reverb'}





In [None]:
!git clone https://github.com/ss12f32v/GANsynth-pytorch.git /content/GANsynth-pytorch

Cloning into '/content/GANsynth-pytorch'...
remote: Enumerating objects: 36, done.[K
remote: Total 36 (delta 0), reused 0 (delta 0), pack-reused 36 (from 1)[K
Receiving objects: 100% (36/36), 1.60 MiB | 39.91 MiB/s, done.
Resolving deltas: 100% (8/8), done.


In [None]:
!ls /content/GANsynth-pytorch

 Inference.ipynb	     PGGAN.py		  spec_ops.py
 LICENSE		     phase_operation.py   spectrograms_helper.py
'Make Training Data.ipynb'   pytorch_nsynth_lib   STFT.py
 normalizer.py		     README.md		  train.py


In [None]:
from google.colab import drive
import os
import h5py
import numpy as np

drive.mount('/content/drive')

DRIVE_HDF5_DIR = '/content/drive/MyDrive/project/project1/gansynth/hdf5'
os.makedirs(DRIVE_HDF5_DIR, exist_ok=True)

available = sorted({eff for (eff, _) in audio_data.keys()})
print(">> audio_data has effects:", available)

def preprocess_and_stack(stft: np.ndarray) -> np.ndarray:
    """
    입력 stft: complex numpy array of shape [F, T]
    반환    : float32 array of shape [2, F, T]
               channel 0 = magnitude (log-scaled), channel 1 = phase
    """
    mag = np.abs(stft)
    mag = np.log1p(mag)
    phase = np.angle(stft)
    # [2, F, T]
    return np.stack([mag, phase], axis=0).astype(np.float32)

categories = ['drive','chorus','delay','flanger','phaser','reverb']

for cat in categories:
    # A) clean -> effect
    ce_pairs = make_stft_pairs_for_effect(audio_data, cat)
    print(f"\n--- Category '{cat}' → clean→{cat}: found {len(ce_pairs)} pairs ---")
    if not ce_pairs:
        print(f"Skipping clean2{cat}.h5 (no data)")
    else:
        ce_path = os.path.join(DRIVE_HDF5_DIR, f'clean2{cat}.h5')
        with h5py.File(ce_path, 'w') as f:
            grp = f.create_group('examples')
            for i, (clean_stft, eff_stft) in enumerate(ce_pairs):
                grp.create_dataset(f'in_{i}',  data=preprocess_and_stack(clean_stft))
                grp.create_dataset(f'out_{i}', data=preprocess_and_stack(eff_stft))
        print(f"Saved clean2{cat}.h5 with {len(ce_pairs)} examples")

    # B) effect -> clean (swap pairs)
    ec_pairs = [(e, c) for c, e in ce_pairs]
    print(f"--- Category '{cat}' → {cat}→clean: found {len(ec_pairs)} pairs ---")
    if not ec_pairs:
        print(f"Skipping effect2{cat}.h5 (no data)")
    else:
        ec_path = os.path.join(DRIVE_HDF5_DIR, f'effect2{cat}.h5')
        with h5py.File(ec_path, 'w') as f:
            grp = f.create_group('examples')
            for i, (eff_stft, clean_stft) in enumerate(ec_pairs):
                grp.create_dataset(f'in_{i}',  data=preprocess_and_stack(eff_stft))
                grp.create_dataset(f'out_{i}', data=preprocess_and_stack(clean_stft))
        print(f"Saved effect2{cat}.h5 with {len(ec_pairs)} examples")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
>> audio_data has effects: ['BluesDriver', 'Chorus', 'Clean', 'Digital-Delay', 'Flanger', 'Hall-Reverb', 'Phaser', 'Plate-Reverb', 'RAT', 'Spring-Reverb', 'Sweep-Echo', 'TapeEcho', 'TubeScreamer']

--- Category 'drive' → clean→drive: found 690 pairs ---
Saved clean2drive.h5 with 690 examples
--- Category 'drive' → drive→clean: found 690 pairs ---
Saved effect2drive.h5 with 690 examples

--- Category 'chorus' → clean→chorus: found 690 pairs ---
Saved clean2chorus.h5 with 690 examples
--- Category 'chorus' → chorus→clean: found 690 pairs ---
Saved effect2chorus.h5 with 690 examples

--- Category 'delay' → clean→delay: found 690 pairs ---
Saved clean2delay.h5 with 690 examples
--- Category 'delay' → delay→clean: found 690 pairs ---
Saved effect2delay.h5 with 690 examples

--- Category 'flanger' → clean→flanger: found 690 pairs ---
Saved clean2flanger.h5 with 690

In [None]:
!pip install intervaltree

Collecting intervaltree
  Downloading intervaltree-3.1.0.tar.gz (32 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: intervaltree
  Building wheel for intervaltree (setup.py) ... [?25l[?25hdone
  Created wheel for intervaltree: filename=intervaltree-3.1.0-py2.py3-none-any.whl size=26098 sha256=d5bc39d2ab9343d6e6fd5ad3aadb7c62bb2bf82b3f886b17272c24407c44f61d
  Stored in directory: /root/.cache/pip/wheels/31/d7/d9/eec6891f78cac19a693bd40ecb8365d2f4613318c145ec9816
Successfully built intervaltree
Installing collected packages: intervaltree
Successfully installed intervaltree-3.1.0


In [None]:
%cd /content/GANsynth-pytorch

/content/GANsynth-pytorch


In [None]:
!mkdir -p data

In [None]:
try:
    f.close()
except NameError:
    pass

In [None]:

import os, h5py, numpy as np
import phase_operation
import spectrograms_helper as spec_helper

def make_stft_pairs_for_effect(audio_data,
                                effect_category,
                                clean_labels=('Clean','NoFX')):
    clean_list, effect_list = [], []
    # 1. clean 톤만 모으기
    for (eff, _), waves in audio_data.items():
        if eff in clean_labels:
            for stft, _ in waves:
                clean_list.append(stft)
    # 2. 원하는 이펙터 톤만 모으기
    for (eff, _), waves in audio_data.items():
        if effect_mapping.get(eff) == effect_category:
            for stft, _ in waves:
                effect_list.append(stft)
    # 3. 길이 맞춰 페어 생성
    n = min(len(clean_list), len(effect_list))
    return list(zip(clean_list[:n], effect_list[:n]))

def preprocess_and_stack(stft: np.ndarray) -> np.ndarray:
    mag   = np.log1p(np.abs(stft))
    phase = np.angle(stft)
    return np.stack([mag, phase], axis=0).astype(np.float32)

import os, h5py, numpy as np
import librosa
import phase_operation

import os
import h5py
import numpy as np
import librosa
import phase_operation

# setting
sr         = 22050
n_fft      = 2048
hop_length = 512
n_mels     = 128

F_target, T_target = 256, 256

mel_basis = librosa.filters.mel(sr=sr, n_fft=n_fft, n_mels=n_mels)

# ─────────────────────────────────────────
# 2) clean→drive STFT 페어 가져오기
# ─────────────────────────────────────────
ce_pairs = make_stft_pairs_for_effect(audio_data, 'drive')
N = len(ce_pairs)
print(f"▶ clean→drive 페어 {N}개")

# ─────────────────────────────────────────
# 3) 결과 담을 리스트 초기화
# ─────────────────────────────────────────
spec_list     = []
IF_list       = []
pitch_list    = []
mel_spec_list = []
mel_IF_list   = []

# ─────────────────────────────────────────
# 4) 각 STFT에 대해 크롭/패딩 + 멜 계산
# ─────────────────────────────────────────
# ─────────────────────────────────────────
# for clean_stft, _ in ce_pairs:
# ─────────────────────────────────────────
for clean_stft, _ in ce_pairs:
    # 1) 원본 차원
    full_mag   = np.abs(clean_stft)               # [1025, T]
    full_phase = np.angle(clean_stft)             # [1025, T]
    full_IF    = phase_operation.instantaneous_frequency(full_phase, time_axis=1).astype(np.float32)  # [1025, T]

    # 2) 멜 계산 (원본 STFT에 멜 필터 적용)
    #    mel_basis: [n_mels, 1025]
    logmel     = np.log1p(mel_basis.dot(full_mag))  # [n_mels, T]
    mel_if     = mel_basis.dot(full_IF)             # [n_mels, T]

    # 3) 네트워크 입력용 256×T로 크롭
    st         = clean_stft[:F_target, :]          # [256, T]
    mag        = np.log1p(np.abs(st))              # [256, T]
    phase      = np.angle(st)                      # [256, T]
    spec       = np.stack([mag, phase], axis=0)    # [2,256,T]
    IF         = phase_operation.instantaneous_frequency(phase, time_axis=1).astype(np.float32)

    # (4) 패딩용 빈 배열 생성
    spec_pad     = np.zeros((2, F_target, T_target), dtype=np.float32)
    IF_pad       = np.zeros((F_target, T_target),   dtype=np.float32)
    mel_spec_pad = np.zeros((n_mels,   T_target),   dtype=np.float32)
    mel_IF_pad   = np.zeros((n_mels,   T_target),   dtype=np.float32)

    # 실제 프레임 수
    T_spec   = spec.shape[2]
    T_IF     = IF.shape[1]
    T_mel    = logmel.shape[1]   # 동일하게 mel_if.shape[1]
    # (pitch는 1D이므로 T_pitch = T_spec)

    # 복사
    spec_pad    [:, :, :T_spec] = spec
    IF_pad      [:, :T_IF]      = IF
    mel_spec_pad[:, :T_mel]     = logmel
    mel_IF_pad  [:, :T_mel]     = mel_if
    pitch_pad   = np.zeros((T_target,), dtype=np.int32)
    pitch_pad   [:T_spec]       = 0  # 필요 시 실제 pitch 정보로 대체

    # 리스트에 추가
    spec_list    .append(spec_pad)
    IF_list      .append(IF_pad)
    pitch_list   .append(pitch_pad)
    mel_spec_list.append(mel_spec_pad)
    mel_IF_list  .append(mel_IF_pad)

# ─────────────────────────────────────────
# 6) HDF5 쓰기
# ─────────────────────────────────────────
hdf5_path = 'data/Nsynth_valid_spec_IF_pitch_and_melSpec.hdf5'
os.makedirs(os.path.dirname(hdf5_path), exist_ok=True)
with h5py.File(hdf5_path, 'w') as f:
    f.create_dataset('Spec',     data=np.stack(spec_list,    axis=0))  # [N,2,256,256]
    f.create_dataset('IF',       data=np.stack(IF_list,      axis=0))  # [N,256,256]
    f.create_dataset('pitch',    data=np.stack(pitch_list,   axis=0))  # [N,256]
    f.create_dataset('mel_Spec', data=np.stack(mel_spec_list,axis=0))  # [N,128,256]
    f.create_dataset('mel_IF',   data=np.stack(mel_IF_list,  axis=0))  # [N,128,256]

print("✅ 실제 멜 스펙트로그램과 멜 IF를 포함한 256×256 HDF5 생성 완료:", hdf5_path)

▶ clean→drive 페어 690개
✅ 실제 멜 스펙트로그램과 멜 IF를 포함한 256×256 HDF5 생성 완료: data/Nsynth_valid_spec_IF_pitch_and_melSpec.hdf5


In [None]:
# 1) 포팅판 디렉토리로 이동
%cd /content/GANsynth-pytorch

/content/GANsynth-pytorch


In [None]:
import torch

# GPU가 있으면 cuda:0, 없으면 cpu 로 자동 선택
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


In [None]:
%%bash
# (1) pitch_classifier 정의 수정
sed -i 's|self.pitch_classifier= nn.Linear(self.channel_list\[0\]\*2\*16, 128)|self.pitch_classifier= nn.Linear(512, 128)|' PGGAN.py

# (2) discriminator_classifier 정의 수정
sed -i 's|self.discriminator_classifier= nn.Linear(self.channel_list\[0\]\*2\*16, 1)|self.discriminator_classifier= nn.Linear(512, 1)|' PGGAN.py

# (3) 제대로 바뀌었는지 확인
grep -n "classifier" PGGAN.py

319:            pitch_distribution = self.Softmax(self.pitch_classifier(x))
320:            discriminator_output= self.discriminator_classifier(x)
338:            pitch_distribution = self.Softmax(self.pitch_classifier(x))
339:            discriminator_output= self.discriminator_classifier(x)
362:        """ Create pitch classifier and discriminator output"""
364:            self.pitch_classifier= nn.Linear(512, 128)
365:            self.discriminator_classifier= nn.Linear(512, 1)


In [None]:
%%bash
# 1) 기존 PGGAN.py 백업
cp /content/GANsynth-pytorch/PGGAN.py /content/GANsynth-pytorch/PGGAN.py.bak

# 2) GitHub에서 원본 파일만 다운로드해 덮어쓰기
curl -sL https://raw.githubusercontent.com/ss12f32v/GANsynth-pytorch/main/PGGAN.py \
  -o /content/GANsynth-pytorch/PGGAN.py

In [None]:
%%bash
# 1) 작업 디렉토리로 이동
cd /content

# 2) 기존 포팅판 전체 삭제
rm -rf GANsynth-pytorch

# 3) 깔끔하게 다시 클론
git clone https://github.com/ss12f32v/GANsynth-pytorch.git
cd GANsynth-pytorch

# 4) 분류기 in_features만 512로 쉽게 교체
sed -i "s/self.pitch_classifier *= *nn.Linear(self.channel_list\[0\]\*2\*16, */        self.pitch_classifier         = nn.Linear(512, /" PGGAN.py
sed -i "s/self.discriminator_classifier *= *nn.Linear(self.channel_list\[0\]\*2\*16, */        self.discriminator_classifier = nn.Linear(512, /" PGGAN.py

# 5) 제대로 바뀌었나 확인
echo "▶ pitch_classifier:"
grep -n "pitch_classifier" PGGAN.py
echo "▶ discriminator_classifier:"
grep -n "discriminator_classifier" PGGAN.py

▶ pitch_classifier:
319:            pitch_distribution = self.Softmax(self.pitch_classifier(x))
338:            pitch_distribution = self.Softmax(self.pitch_classifier(x))
364:                    self.pitch_classifier         = nn.Linear(512, 128)
▶ discriminator_classifier:
320:            discriminator_output= self.discriminator_classifier(x)
339:            discriminator_output= self.discriminator_classifier(x)
365:                    self.discriminator_classifier = nn.Linear(512, 1)


Cloning into 'GANsynth-pytorch'...


In [None]:
%cd /content/GANsynth-pytorch

!sed -i "1iimport torch\ndevice = torch.device('cuda' if torch.cuda.is_available() else 'cpu')\nprint(f'Using device: {device}')" train.py
!sed -i "s/\.cuda()/\.to(device)/g" train.py
!sed -i "s/model\.cuda()/model.to(device)/g" train.py

/content/GANsynth-pytorch


In [None]:
%%bash
cd /content/GANsynth-pytorch

# train.py에서 기존 data/경로를 /content/data/... 경로로 교체
sed -i "s|h5py.File('data/Nsynth_valid_spec_IF_pitch_and_melSpec.hdf5'|h5py.File('/content/data/Nsynth_valid_spec_IF_pitch_and_melSpec.hdf5'|g" train.py

# 잘 바뀌었는지 확인
grep -n "h5py.File" train.py

42:        # self.dataset = h5py.File('../data/Nsynth_spec_IF_pitch.hdf5','r')     
43:        self.dataset = h5py.File('/content/data/Nsynth_valid_spec_IF_pitch_and_melSpec.hdf5','r')     


In [None]:
%%bash
cd /content/GANsynth-pytorch

echo "── lines 150–180 of train.py ──"
sed -n '150,180p' train.py

── lines 150–180 of train.py ──
            # train mel spec IF
            spec = mel_spec
            IF = mel_IF

            stack_real_image = torch.stack((spec,IF),dim=1).to(device)
            stack_real_image = torch.transpose(stack_real_image,2,3)

            little_batch_size = spec.size()[0]


            
            while stack_real_image.size()[2] != current_level_res:
                stack_real_image = self.avg_layer(stack_real_image)
            stack_real_image = stack_real_image.cuda(0)

        
            if net_status =='stable':
                net_alpha = 1.0
            elif net_status =='fadein':
                
                if little_batch_size==BATCH_SIZE:
                    net_alpha = 1.0 - (cur_step * TOTAL_DATA_SIZE + batch_idx * little_batch_size) / (self.fadein_steps * TOTAL_DATA_SIZE)
                else:
                    net_alpha = 1.0 - (cur_step * TOTAL_DATA_SIZE + batch_idx*(BATCH_SIZE) + little_batch_size) / (sel

In [None]:
%%bash
cd /content/GANsynth-pytorch

# (1) train.py에서 avg_layer 호출이 있는 라인을 확인
grep -n "self.avg_layer" train.py

98:        self.avg_layer = torch.nn.AvgPool2d((2,2),stride=(2,2))
162:                stack_real_image = self.avg_layer(stack_real_image)


In [None]:
%%bash
cd /content/GANsynth-pytorch

# (1) grep으로 while문 위치 확인 (여러분 환경에선 161~163 사이일 수 있습니다)
grep -n "while stack_real_image.size" train.py

161:            while stack_real_image.size()[2] != current_level_res:


In [None]:
%%bash

# 예를 들어 162번이라면 LINENO=162 로 세팅
LINENO=161

# (2) 그 바로 위에 if current_level_res > 1: 를 삽입
sed -i "${LINENO}i\            if current_level_res > 1:" train.py

# (3) while 루프 안의 두 줄을 한 단계 들여쓰기
sed -i "${LINENO},$((LINENO+1))s|^|                |" train.py

            
            while stack_real_image.size()[2] != current_level_res:
                stack_real_image = self.avg_layer(stack_real_image)


In [None]:
%%bash

# (4) 변경된 부분 확인
sed -n "160, 163p" train.py


            
            while stack_real_image.size()[2] != current_level_res:
                stack_real_image = self.avg_layer(stack_real_image)


In [None]:
# clean → drive 학습
!python train.py \
  --batch_size    16 \
  --num_epochs    50 \
  --learning_rate 2e-4 \
  --log_dir       /content/drive/MyDrive/project/project1/gansynth/checkpoint/clean2drive

  File "/content/GANsynth-pytorch/train.py", line 169
    stack_real_image = self.avg_layer(stack_real_image)
IndentationError: expected an indented block after 'if' statement on line 168


In [None]:
# drive → clean 학습
python train.py \
  --batch_size    16 \
  --num_epochs    50 \
  --learning_rate 2e-4 \
  --log_dir       /content/drive/MyDrive/project/project1/gansynth/checkpoint/drive2clean

# **GAN synth**

In [None]:
!git clone https://github.com/magenta/magenta.git
%cd magenta/magenta/models/gansynth

!pip install -q tensorflow-gpu==1.15 magenta librosa tqdm

%pwd

Cloning into 'magenta'...
remote: Enumerating objects: 16700, done.[K
remote: Counting objects: 100% (12/12), done.[K
remote: Compressing objects: 100% (11/11), done.[K
remote: Total 16700 (delta 4), reused 2 (delta 1), pack-reused 16688 (from 2)[K
Receiving objects: 100% (16700/16700), 36.70 MiB | 8.74 MiB/s, done.
Resolving deltas: 100% (12763/12763), done.
/content/magenta/magenta/models/gansynth
[31mERROR: Could not find a version that satisfies the requirement tensorflow-gpu==1.15 (from versions: 2.12.0)[0m[31m
[0m[31mERROR: No matching distribution found for tensorflow-gpu==1.15[0m[31m
[0m

'/content/magenta/magenta/models/gansynth'

In [None]:
import tensorflow as tf
tf.compat.v1.disable_eager_execution()
tf.compat.v1.disable_v2_behavior()

print("TensorFlow version:", tf.__version__)

Instructions for updating:
non-resource variables are not supported in the long term


TensorFlow version: 2.18.0


In [None]:
import os

BASE = "/content/drive/MyDrive/project/project1/sample/sound samples 1"

# 1. 전체 leaf 디렉토리 수 (파일이 실제로 들어 있는 폴더)
leaf_dirs = sum(1 for root, dirs, files in os.walk(BASE) if any(f.lower().endswith((".wav",".mp3")) for f in files))

# 2. 전체 파일 수
total_files = sum(1 for root, dirs, files in os.walk(BASE) for f in files if f.lower().endswith((".wav",".mp3")))

print(f"▶ Leaf 디렉토리 수: {leaf_dirs}")
print(f"▶ 전체 오디오 파일 수: {total_files}")

▶ Leaf 디렉토리 수: 65
▶ 전체 오디오 파일 수: 8970


In [None]:
import os, random
from tqdm import tqdm
import librosa, numpy as np
import tensorflow as tf
from concurrent.futures import ProcessPoolExecutor

# effect_mapping
effect_mapping = {
    'BluesDriver': 'drive',
    'Chorus': 'chorus',
    'Clean': 'clean',
    'Digital-Delay': 'delay',
    'Distortion': 'drive',
    'FeedbackDelay': 'delay',
    'Flanger': 'flanger',
    'Hall-Reverb': 'reverb',
    'NoFX': 'clean',
    'Overdrive': 'drive',
    'Phaser': 'phaser',
    'Plate-Reverb': 'reverb',
    'RAT': 'drive',
    'SlapbackDelay': 'delay',
    'Spring-Reverb': 'reverb',
    'Sweep-Echo': 'delay',
    'TapeEcho': 'delay',
    'Tremolo': 'tremolo',
    'TubeScreamer': 'drive',
    'Vibrato': 'vibrato'
}

def make_stft_pairs_for_effect(audio_data, effect_category,
                                clean_labels=('Clean','NoFX'),
                                n_fft=2048, hop_length=512):
    """
    audio_data: load_audio_files 로 만든 dict
    effect_category: 'drive','chorus','delay',... 중 하나
    clean_labels: 원본 폴더명이 clean 계열인 경우
    """
    clean_list = []
    effect_list = []
    # 1. clean STFT 수집
    for (orig_eff, _), waves in audio_data.items():
        if orig_eff in clean_labels:
            for stft, _ in waves:
                clean_list.append(stft.astype(np.complex64))
    # 2. 선택한 이펙터 STFT 수집
    for (orig_eff, _), waves in audio_data.items():
        if effect_mapping.get(orig_eff) == effect_category:
            for stft, _ in waves:
                effect_list.append(stft.astype(np.complex64))
    # 3. 길이를 맞춰 페어 생성
    n = min(len(clean_list), len(effect_list))
    paired = list(zip(clean_list[:n], effect_list[:n]))
    return paired


# preprocessing function
def remove_silent(data, sr,
                  db_threshold=-20,
                  freq_threshold=4096,
                  hop_length=512,
                  n_fft=2048):
    stft = librosa.stft(data, n_fft=n_fft, hop_length=hop_length)
    freqs = librosa.fft_frequencies(sr=sr, n_fft=n_fft)
    freq_idx = np.where(freqs >= freq_threshold)[0][0]
    mag = np.abs(stft[freq_idx:,:])
    db = librosa.amplitude_to_db(mag, ref=np.max)
    start = np.argmax(np.any(db > db_threshold, axis=0))
    if start==0 and np.all(db[:,0] <= db_threshold):
        return stft.astype(np.complex64)
    return stft[:, start:].astype(np.complex64)


def process_file(args):
    root, fname, idx, target_sr = args
    data, sr = librosa.load(os.path.join(root, fname), sr=target_sr)
    data, _ = librosa.effects.trim(data)
    stft = remove_silent(data, sr)
    effect = root.split(os.sep)[-3]
    return (effect, idx), (stft.astype(np.complex64), sr)


def load_audio_files(base_path, target_sr=22050, max_workers=4):
    tasks = []
    idx = 0
    for root, _, files in os.walk(base_path):
        for f in files:
            if not f.lower().endswith((".wav", ".mp3")):
                continue
            tasks.append((root, f, idx, target_sr))
            idx += 1

    audio_data = {}
    with ProcessPoolExecutor(max_workers=max_workers) as exe:
        for key, stft_pair in tqdm(
            exe.map(process_file, tasks),
            total=len(tasks),
            desc="Parallel processing"
        ):
            audio_data.setdefault(key, []).append(stft_pair)

    return audio_data

# TFRecord Writer
def _bytes_feature(v): return tf.train.Feature(bytes_list=tf.train.BytesList(value=[v]))
def _int64_feature(v): return tf.train.Feature(int64_list=tf.train.Int64List(value=[v]))

def write_pairs_to_tfrecord(paired, out_path):
    os.makedirs(os.path.dirname(out_path), exist_ok=True)
    with tf.io.TFRecordWriter(out_path) as writer:
        for clean_stft, effect_stft in tqdm(paired, desc=f"Writing {os.path.basename(out_path)}"):
            f_bins, t_steps = clean_stft.shape
            feat = {
                'clean_real': tf.train.Feature(bytes_list=tf.train.BytesList(value=[clean_stft.real.tobytes()])),
                'clean_imag': tf.train.Feature(bytes_list=tf.train.BytesList(value=[clean_stft.imag.tobytes()])),
                'effect_real': tf.train.Feature(bytes_list=tf.train.BytesList(value=[effect_stft.real.tobytes()])),
                'effect_imag': tf.train.Feature(bytes_list=tf.train.BytesList(value=[effect_stft.imag.tobytes()])),
                'freq_bins': tf.train.Feature(int64_list=tf.train.Int64List(value=[f_bins])),
                'time_steps': tf.train.Feature(int64_list=tf.train.Int64List(value=[t_steps])),
            }
            ex = tf.train.Example(features=tf.train.Features(feature=feat))
            writer.write(ex.SerializeToString())

def make_stft_pairs(audio_data, clean_labels=('Clean','NoFX')):
    clean_list, effect_list = [], []
    for (eff, _), waves in audio_data.items():
        for stft, _ in waves:
            if eff in clean_labels: clean_list.append(stft)
            else:                  effect_list.append(stft)
    n = min(len(clean_list), len(effect_list))
    return list(zip(clean_list[:n], effect_list[:n])), \
           list(zip(effect_list[:n], clean_list[:n]) )

# main preprocessing pipeline
BASE = "/content/drive/MyDrive/project/project1/sample/sound samples 1"

audio_data = load_audio_files(BASE, target_sr=22050, max_workers=4)

print(set(orig_eff for (orig_eff,_) in audio_data))

pairs_ce, pairs_ec = make_stft_pairs(audio_data)

# TFRecord 저장
os.makedirs("/content/tfrecords", exist_ok=True)
for category in ['drive','chorus','delay','flanger','phaser','reverb','tremolo','vibrato']:
    paired = make_stft_pairs_for_effect(audio_data, category)
    tfrecord_path = f"/content/tfrecords/clean2{category}.tfrecord"
    write_pairs_to_tfrecord(paired, tfrecord_path)
    print(f"→ {category} TFRecord done: {len(paired)} pairs")

Parallel processing: 100%|██████████| 8970/8970 [25:25<00:00,  5.88it/s]


{'Flanger', 'TubeScreamer', 'Sweep-Echo', 'Spring-Reverb', 'Chorus', 'RAT', 'Digital-Delay', 'TapeEcho', 'BluesDriver', 'Clean', 'Plate-Reverb', 'Phaser', 'Hall-Reverb'}


Writing clean2drive.tfrecord: 100%|██████████| 690/690 [00:13<00:00, 51.68it/s]


→ drive TFRecord done: 690 pairs


Writing clean2chorus.tfrecord: 100%|██████████| 690/690 [00:13<00:00, 52.39it/s]


→ chorus TFRecord done: 690 pairs


Writing clean2delay.tfrecord: 100%|██████████| 690/690 [00:13<00:00, 51.77it/s]


→ delay TFRecord done: 690 pairs


Writing clean2flanger.tfrecord: 100%|██████████| 690/690 [00:13<00:00, 51.64it/s]


→ flanger TFRecord done: 690 pairs


Writing clean2phaser.tfrecord: 100%|██████████| 690/690 [00:13<00:00, 51.88it/s]


→ phaser TFRecord done: 690 pairs


Writing clean2reverb.tfrecord: 100%|██████████| 690/690 [00:13<00:00, 50.97it/s]


→ reverb TFRecord done: 690 pairs


Writing clean2tremolo.tfrecord: 0it [00:00, ?it/s]


→ tremolo TFRecord done: 0 pairs


Writing clean2vibrato.tfrecord: 0it [00:00, ?it/s]

→ vibrato TFRecord done: 0 pairs





In [None]:
!mkdir -p /content/drive/MyDrive/project/project1/gansynth/tfrecords

!cp /content/tfrecords/*.tfrecord /content/drive/MyDrive/project/project1/gansynth/tfrecords

!ls -lh /content/drive/MyDrive/project/project1/gansynth/tfrecords

total 13G
-rw------- 1 root root 2.2G May 10 17:25 clean2chorus.tfrecord
-rw------- 1 root root 2.2G May 10 17:25 clean2delay.tfrecord
-rw------- 1 root root 2.2G May 10 17:25 clean2drive.tfrecord
-rw------- 1 root root 2.2G May 10 17:26 clean2flanger.tfrecord
-rw------- 1 root root 2.2G May 10 17:26 clean2phaser.tfrecord
-rw------- 1 root root 2.2G May 10 17:26 clean2reverb.tfrecord
-rw------- 1 root root    0 May 10 17:26 clean2tremolo.tfrecord
-rw------- 1 root root    0 May 10 17:26 clean2vibrato.tfrecord


In [None]:
categories = ['drive','chorus','delay','flanger','phaser','reverb']

TF_DIR = "/content/drive/MyDrive/project/project1/gansynth/tfrecords"

for category in categories:
    ce_pairs = make_stft_pairs_for_effect(audio_data, category)
    tfrecord_path = f"{TF_DIR}/effect2{category}.tfrecord"
    write_pairs_to_tfrecord(ec_pairs, tfrecord_path)
    print(f"→ effect2{category}.tfrecord: {len(ec_pairs)} pairs")

Writing effect2drive.tfrecord: 100%|██████████| 690/690 [00:19<00:00, 35.93it/s]


→ effect2drive.tfrecord: 690 pairs


Writing effect2chorus.tfrecord: 100%|██████████| 690/690 [00:18<00:00, 38.26it/s]


→ effect2chorus.tfrecord: 690 pairs


Writing effect2delay.tfrecord: 100%|██████████| 690/690 [00:17<00:00, 39.88it/s]


→ effect2delay.tfrecord: 690 pairs


Writing effect2flanger.tfrecord: 100%|██████████| 690/690 [00:16<00:00, 40.63it/s]


→ effect2flanger.tfrecord: 690 pairs


Writing effect2phaser.tfrecord: 100%|██████████| 690/690 [00:16<00:00, 41.03it/s]


→ effect2phaser.tfrecord: 690 pairs


Writing effect2reverb.tfrecord: 100%|██████████| 690/690 [00:17<00:00, 39.10it/s]

→ effect2reverb.tfrecord: 690 pairs





In [None]:
!ls -lh /content/drive/MyDrive/project/project1/gansynth/tfrecords

total 26G
-rw------- 1 root root 2.2G May 10 17:25 clean2chorus.tfrecord
-rw------- 1 root root 2.2G May 10 17:25 clean2delay.tfrecord
-rw------- 1 root root 2.2G May 10 17:25 clean2drive.tfrecord
-rw------- 1 root root 2.2G May 10 17:26 clean2flanger.tfrecord
-rw------- 1 root root 2.2G May 10 17:26 clean2phaser.tfrecord
-rw------- 1 root root 2.2G May 10 17:26 clean2reverb.tfrecord
-rw------- 1 root root    0 May 10 17:26 clean2tremolo.tfrecord
-rw------- 1 root root    0 May 10 17:26 clean2vibrato.tfrecord
-rw------- 1 root root 2.2G May 10 17:27 effect2chorus.tfrecord
-rw------- 1 root root 2.2G May 10 17:28 effect2delay.tfrecord
-rw------- 1 root root 2.2G May 10 17:27 effect2drive.tfrecord
-rw------- 1 root root 2.2G May 10 17:28 effect2flanger.tfrecord
-rw------- 1 root root 2.2G May 10 17:28 effect2phaser.tfrecord
-rw------- 1 root root 2.2G May 10 17:29 effect2reverb.tfrecord


In [None]:
from google.colab import drive
drive.mount('/content/drive')

import os
os.environ['PYTHONPATH'] = '/content/magenta'

!pip install -q librosa tqdm note-seq tensorflow-gan

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m367.1/367.1 kB[0m [31m25.7 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
# train clean2drive
!python gansynth_train.py \
  --train_data_path=/content/drive/MyDrive/project/project1/gansynth/tfrecords/clean2drive.tfrecord \
  --train_root_dir=/content/drive/MyDrive/project/project1/checkpoint/drive \
  --dataset_name=paired \
  --data_type=paired_stft \
  --audio_length=64000 \
  --sample_rate=22050 \
  --batch_size_schedule=[16] \
  --generator_learning_rate=0.0002 \
  --discriminator_learning_rate=0.0002

# test clean2drive
!python gansynth_generate.py \
  --checkpoint_dir=/content/drive/MyDrive/project/project1/checkpoint/drive \
  --mode=generate \
  --output_dir=/content/drive/MyDrive/project/project1/output/drive \
  --sample_rate=22050 \
  --num_outputs=10

2025-05-10 17:39:56.670121: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1746898796.688094   18605 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1746898796.693557   18605 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
Traceback (most recent call last):
  File "/content/magenta/magenta/models/gansynth/gansynth_train.py", line 46, in <module>
    from magenta.models.gansynth.lib import data_helpers
  File "/content/magenta/magenta/models/gansynth/lib/data_helpers.py", line 22, in <module>
    from magenta.models.gansynth.lib import train_util
  File "/content/magenta/magenta/models/gansynth/lib/train_util.py", line 30, in <module>
    import tensorf

In [None]:
# train drive2clean
!python gansynth_train.py \
  --train_data_path=/content/drive/MyDrive/project/project1/gansynth/tfrecords/effect2drive.tfrecord \
  --train_root_dir=/content/drive/MyDrive/project/project1/checkpoint/drive2clean \
  --dataset_name=paired \
  --data_type=paired_stft \
  --audio_length=64000 \
  --sample_rate=22050 \
  --batch_size_schedule=[16] \
  --generator_learning_rate=0.0002 \
  --discriminator_learning_rate=0.0002

# test drive2clean
!python gansynth_generate.py \
  --checkpoint_dir=/content/drive/MyDrive/project/project1/checkpoint/drive2clean \
  --mode=generate \
  --output_dir=/content/drive/MyDrive/project/project1/gansynth/samples/drive2clean \
  --sample_rate=22050 \
  --num_outputs=10

# GAN 사용 연습

clean tone to effector tone /
effector tone to clean tone
을 학습시키고 생성합니다

In [None]:
# calculate frequency
def calculate_freq_difference(clean_stft, effect_stft):
  return effect_stft - clean_stft

In [None]:
#data preparing for GAN generate
clean_files = []
drive_files = []

for key, stfts in audio_files.items():
  effect_type=key[0]
  if effect_type in ['Clean','NoFX']:
    clean_files.extend([stft for stft in stfts])
  elif effect_type in ['BluesDriver', 'Distortion', 'Overdrive', 'RAT', 'TubeScreamer']:
    drive_files.extend([stft for stft in stfts])

paired_data = []
for clean, effect in zip(clean_files, drive_files):
  clean_stft, sr = clean
  effect_stft, _ = effect
#  if clean_sr != effect_sr:
#    raise ValueError('Sample rates do not match!')
  paired_data.append((clean_stft, effect_stft))

class GANAudioDataset(Dataset):
  def __init__(self, paired_data):
    self.clean_stfts = [data[0] for data in paired_data]
    self.effect_stfts = [data[1] for data in paired_data]
    self.max_shape = (1025, max([stft.shape[1] for stft in self.clean_stfts+self.effect_stfts]))

  def __len__(self):
    return len(self.clean_stfts)

  def __getitem__(self, idx):
    clean_stft = self._pad_feature(self.clean_stfts[idx])
    effect_stft = self._pad_feature(self.effect_stfts[idx])
    clean_stft = np.stack((clean_stft.real, clean_stft.imag), axis=0)
    effect_stft = np.stack((effect_stft.real, effect_stft.imag), axis=0)
    return torch.tensor(clean_stft, dtype=torch.float32), torch.tensor(effect_stft, dtype=torch.float32)

  def _pad_feature(self, feature):
    padded_feature = np.zeros(self.max_shape, dtype=np.complex64)
    padded_feature[:feature.shape[0], :feature.shape[1]]=feature
    return padded_feature

dataset = GANAudioDataset(paired_data)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

class Generator(nn.Module):
  def __init__(self):
    super(Generator, self).__init__()
    self.main = nn.Sequential(
        nn.Conv2d(2, 64, kernel_size=4, stride=2, padding=1),
        nn.BatchNorm2d(64),
        nn.ReLU(True),
        nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1),
        nn.BatchNorm2d(128),
        nn.ReLU(True),
        nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1),
        nn.BatchNorm2d(256),
        nn.ReLU(True),
        nn.ConvTranspose2d(256, 128, kernel_size=4, stride=2, padding=1),
        nn.BatchNorm2d(128),
        nn.ReLU(True),
        nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1),
        nn.BatchNorm2d(64),
        nn.ReLU(True),
        nn.ConvTranspose2d(64, 2, kernel_size=4, stride=2, padding=1),
        nn.Tanh()
    )

  def forward(self, x):
    return self.main(x)

class Discriminator(nn.Module):
  def __init__(self):
    super(Discriminator, self).__init__()
    self.main = nn.Sequential(
        nn.Conv2d(2,64,kernel_size=4, stride=2, padding=1),
        nn.LeakyReLU(0.2, inplace=True),
        nn.Conv2d(64,128,kernel_size=4, stride=2, padding=1),
        nn.BatchNorm2d(128),
        nn.LeakyReLU(0.2, inplace=True),
        nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1),
        nn.BatchNorm2d(256),
        nn.LeakyReLU(0.2, inplace=True),
        nn.Conv2d(256,1, kernel_size=4, stride=1, padding=0),
        nn.Sigmoid()
    )

  def forward(self, x):
    return self.main(x)

In [None]:
#training model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

generator = Generator().to(device)
discriminator = Discriminator().to(device)

criterion = nn.BCELoss()
optimizer_g = optim.Adam(generator.parameters(), lr=0.0002, betas=(0.5,0.999))
optimizer_d = optim.Adam(discriminator.parameters(), lr=0.0002, betas=(0.5,0.999))

num_epochs = 50

for epoch in tqdm(range(num_epochs), desc=f'{epoch+1} time updating'):
  for clean_stft, effect_stft in tqdm(dataloader):
    clean_stft, effect_stft = clean_stft.to(device), effect_stft.to(device)

    # Train Discriminator
    discriminator.zero_grad()
    real_outputs = discriminator(effect_stft)
    real_labels = torch.ones_like(real_outputs)
    real_loss = criterion(real_outputs, real_labels)
    real_loss.backward()

    fake_stft = generator(clean_stft)
    fake_outputs = discriminator(fake_stft.detach())
    fake_labels = torch.zeros_like(fake_outputs)
    fake_loss = criterion(fake_outputs, fake_labels)
    fake_loss.backward()
    optimizer_d.step()

    # Train Generator
    generator.zero_grad()
    fake_outputs = discriminator(fake_stft)
    g_loss = criterion(fake_outputs, real_labels)
    g_loss.backward()
    optimizer_g.step()

  print(f'Epoch [{epoch+1}/{num_epochs}], d_loss : {(real_loss.item()+fake_loss.item())/2}, g_loss : {g_loss.item()}')

#Save the trained models
generator_save_path = '/content/drive/MyDrive/project/project1/models/generator.pth'
discriminator_save_path = '/content/drive/MyDrive/project/project1/models/discriminator.pth'

# Save the trained models to Google Drive
torch.save(generator.state_dict(), generator_save_path)
torch.save(discriminator.state_dict(), discriminator_save_path)

print(f"Generator model saved to {generator_save_path}")
print(f"Discriminator model saved to {discriminator_save_path}")


1 time updating:   0%|          | 0/50 [00:00<?, ?it/s]

  0%|          | 0/55 [00:00<?, ?it/s]

Epoch [1/50], d_loss : 0.43963825702667236, g_loss : 1.94021475315094


  0%|          | 0/55 [00:00<?, ?it/s]

Epoch [2/50], d_loss : 0.13231704011559486, g_loss : 3.1996243000030518


  0%|          | 0/55 [00:00<?, ?it/s]

Epoch [3/50], d_loss : 0.07692711614072323, g_loss : 4.598793983459473


  0%|          | 0/55 [00:00<?, ?it/s]

Epoch [4/50], d_loss : 0.05839582346379757, g_loss : 5.176196098327637


  0%|          | 0/55 [00:00<?, ?it/s]

Epoch [5/50], d_loss : 0.022945511154830456, g_loss : 6.03462028503418


  0%|          | 0/55 [00:00<?, ?it/s]

Epoch [6/50], d_loss : 0.013909435365349054, g_loss : 6.475983142852783


  0%|          | 0/55 [00:00<?, ?it/s]

Epoch [7/50], d_loss : 0.010060438420623541, g_loss : 6.335396766662598


  0%|          | 0/55 [00:00<?, ?it/s]

Epoch [8/50], d_loss : 0.008387899957597256, g_loss : 6.378857612609863


  0%|          | 0/55 [00:00<?, ?it/s]

Epoch [9/50], d_loss : 0.00596982566639781, g_loss : 6.854513645172119


  0%|          | 0/55 [00:00<?, ?it/s]

Epoch [10/50], d_loss : 0.005926721962168813, g_loss : 7.084181785583496


  0%|          | 0/55 [00:00<?, ?it/s]

Epoch [11/50], d_loss : 0.004000481450930238, g_loss : 6.991405487060547


  0%|          | 0/55 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [None]:
def load_generator(model_path, device):
    model = Generator().to(device)
    model.load_state_dict(torch.load(model_path))
    model.eval()
    return model

# Function to convert STFT to time-domain signal
def stft_to_audio(stft, sr, hop_length=512, n_fft=2048):
    stft_complex = stft[0] + 1j * stft[1]
    audio = librosa.istft(stft_complex, hop_length=hop_length)
    return audio

def generate_effect_tone(clean_audio_path, generator, device, sr=22050, hop_length=512, n_fft=2048):
    clean_audio, _ = librosa.load(clean_audio_path, sr=sr)

    # Compute STFT
    clean_stft = librosa.stft(clean_audio, n_fft=n_fft, hop_length=hop_length)
    clean_stft = np.stack((clean_stft.real, clean_stft.imag), axis=0)
    clean_stft = torch.tensor(clean_stft, dtype=torch.float32).unsqueeze(0).to(device)



generator_model_path = 'generator.pth'

clean_audio_path = '/content/drive/MyDrive/project/project1/sample/sound samples 1/Clean/Clean/Bridge/1-13.wav'

# Load the generator model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
generator = load_generator(generator_model_path, device)

# Generate effect tone
effect_audio = generate_effect_tone(clean_audio_path, generator, device)

# Save the generated effect audio
librosa.output.write_wav('generated_effect_audio.wav', effect_audio, sr=22050)