In [1]:
import torchaudio
import IPython.display as ipd
from datasets.mix_music import MusicDataset,MixMusicDataLoader
import matplotlib.pyplot as plt
import torchaudio.transforms as T
import torch
print(torch.__version__)
import torchvision
from torch import nn
import os
from torch import optim
from utils.io_utils import create_directory_if_not_exists
from torch.utils.tensorboard import SummaryWriter
import subprocess
from tqdm import tqdm

2.2.0+cu121


In [2]:
# Prefer the GPU when one is present, but fall back to the CPU so that every
# later `.to(DEVICE)` call still works on machines without CUDA.
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
EPOCHS = 1000  # number of train/eval rounds executed by exp()
BATCH_SIZE = 2  # forwarded to MixMusicDataLoader
EXP_NAME = "train_model"  # used to build the checkpoint and TensorBoard paths
INPUT_CHANNEL = 1
OUTPUT_CHANNEL = 2
# Shapes (channels, samples) of the dummy tensors used to trace the model graph.
INPUT_SHAPE = (INPUT_CHANNEL,10000)
OUTPUT_SHAPE = (OUTPUT_CHANNEL,10000)
MAX_DATA_NUM = 50  # forwarded to MusicDataset(maxDataNum=...)
MAX_CROP_SECOND = 1  # forwarded to MusicDataset(max_CropSecond=...)

In [3]:


# Per-experiment output locations: one for checkpoints, one for TensorBoard
# event files. Both directories are created up front.
model_path = f'save/{EXP_NAME}/'
log_path = f'logs/tensorboard/{EXP_NAME}/'
create_directory_if_not_exists(model_path)
create_directory_if_not_exists(log_path)
writer = SummaryWriter(log_dir=log_path)

# Start a TensorBoard server in the background, reading from the log directory.
# The handle is kept in `process` so the server can be stopped later.
process = subprocess.Popen(
    f'tensorboard --logdir={log_path} --port=6677 --bind_all',
    shell=True,
)
print("open tensorboard: 127.0.0.1:6677")

# Random dummy batches (batch of 2) matching the configured input/output shapes.
input_sample = torch.randn(2, *INPUT_SHAPE).to(DEVICE)
print("input size", input_sample.shape)
output_sample = torch.randn(2, *OUTPUT_SHAPE).to(DEVICE)
print("output size", output_sample.shape)

open tensorboard: 127.0.0.1:6677
input size torch.Size([2, 1, 10000])
output size torch.Size([2, 2, 10000])


TensorFlow installation not found - running with reduced feature set.

NOTE: Using experimental fast data loading logic. To disable, pass
    "--load_fast=false" and report issues on GitHub. More details:
    https://github.com/tensorflow/tensorboard/issues/4784



In [4]:
# Build the project dataset and wrap it in the project's batching loader.
# NOTE(review): basicSize is fed OUTPUT_CHANNEL, presumably the number of
# source tracks per mixture; confirm against MusicDataset.
dataset = MusicDataset(
    basicSize=OUTPUT_CHANNEL,
    maxDataNum=MAX_DATA_NUM,
    max_CropSecond=MAX_CROP_SECOND,
)
dataloader = MixMusicDataLoader(dataset, batch_size=BATCH_SIZE)

In [5]:
class AdaptivePositionalEncoding(nn.Module):
    """Adds a learnable embedding vector to every element of a 3-D input.

    A single ``(1, 1, d_model)`` parameter is expanded over the two leading
    dimensions of the input, so the same learned vector is added at every
    position; it acts as a learned offset rather than a per-position code.

    Args:
        d_model: Size of the last (feature) dimension of the inputs.
    """

    def __init__(self, d_model):
        super(AdaptivePositionalEncoding, self).__init__()
        self.d_model = d_model

        # Learnable embedding shared by all positions and batch elements.
        self.position_embeddings = nn.Parameter(torch.randn(1, 1, d_model), requires_grad=True)

    def forward(self, x):
        """Return ``x`` with the learned embedding added to it.

        Args:
            x: 3-D tensor whose last dimension is ``d_model``.

        Returns:
            Tensor with the same shape as ``x``.

        Raises:
            ValueError: If ``x`` does not have exactly three dimensions.
        """
        dim0, dim1, _ = x.size()

        # Expand the shared embedding over the two leading dimensions of x.
        position_embeddings = self.position_embeddings.expand(dim0, dim1, -1)

        # Add to the input instead of replacing it: returning the embedding
        # alone would throw away the signal carried by x.
        return x + position_embeddings
        
class MyTransformer(nn.Module):
    """Encoder-decoder transformer operating on channel-first sequences.

    Args:
        input_channels: Channel count of ``src``.
        output_channels: Channel count of ``tgt`` and of the returned tensor.
        nhead: Number of attention heads in every encoder/decoder layer.
        num_layers: Number of layers in both the encoder and decoder stacks.
        d_model: Feature width used inside the transformer.
    """

    def __init__(self, input_channels, output_channels, nhead=8, num_layers=3, d_model=512):
        super(MyTransformer, self).__init__()
        # Per-time-step linear projections from channels to model features.
        self.src_embedding = nn.Linear(input_channels, d_model)
        self.tgt_embedding = nn.Linear(output_channels, d_model)
        # One positional-encoding module is shared by the source and target paths.
        self.pos_encoder = AdaptivePositionalEncoding(d_model)
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead),
            num_layers=num_layers
        )
        self.transformer_decoder = nn.TransformerDecoder(
            nn.TransformerDecoderLayer(d_model=d_model, nhead=nhead),
            num_layers=num_layers
        )
        # Projects decoder features back to the output channel count.
        self.head = nn.Linear(d_model,output_channels)
        self.input_channels = input_channels
        self.output_channels = output_channels

    def forward(self, src,tgt):
        """Encode ``src`` and decode ``tgt`` against it with causal self-attention.

        Args:
            src: Tensor of shape ``(batch, input_channels, src_len)``.
            tgt: Tensor of shape ``(batch, output_channels, tgt_len)``.

        Returns:
            Tensor of shape ``(batch, output_channels, tgt_len)``.
        """
        # (batch, channels, time) -> (time, batch, channels); the transformer
        # layers are built with the default batch_first=False layout.
        src = src.permute(2, 0, 1)
        tgt = tgt.permute(2, 0, 1)

        embedded_src = self.src_embedding(src)
        embedded_src = self.pos_encoder(embedded_src)
        encoded_src = self.transformer_encoder(embedded_src)

        embedded_tgt = self.tgt_embedding(tgt)
        embedded_tgt = self.pos_encoder(embedded_tgt)

        # Causal mask: position t may attend only to target positions <= t.
        # Without it every position sees the whole target during training,
        # which does not match step-by-step decoding where only a prefix of
        # the target exists.
        tgt_len = tgt.size(0)
        causal_mask = torch.triu(
            torch.full(
                (tgt_len, tgt_len),
                float('-inf'),
                dtype=embedded_tgt.dtype,
                device=embedded_tgt.device,
            ),
            diagonal=1,
        )

        decoded_tgt = self.transformer_decoder(embedded_tgt, encoded_src, tgt_mask=causal_mask)

        out = self.head(decoded_tgt)

        # (time, batch, channels) -> (batch, channels, time)
        out = out.permute(1, 2, 0)
        return out
    
        

In [6]:

from IPython.lib.display import Audio


def inference(model, src, out_channel):
    """Generate an output sequence one time step at a time.

    The model is switched to eval mode and called once per source time step.
    Each call receives the full ``src`` plus everything generated so far
    (seeded with a single all-zero frame); the last frame of the model output
    is kept and appended to the decoder input for the next call.

    Args:
        model: Callable as ``model(src, tgt)`` returning a tensor indexed as
            ``[batch, channel, time]``.
        src: Source tensor; ``src.size(0)`` is used as the batch size and
            ``src.size(2)`` as the number of steps to generate.
        out_channel: Channel count of the zero seed frame.

    Returns:
        List with ``src.size(2)`` tensors, one ``output[:, :, -1]`` slice per step.
    """
    model.eval()

    num_steps = src.size(2)
    decoder_input = torch.zeros(src.size(0), out_channel, 1).to(src.device)
    frames = []

    with torch.no_grad():
        progress = tqdm(range(num_steps), desc='Inference', position=0, dynamic_ncols=True, leave=False)
        for _ in progress:
            # Only the newest frame of the prediction is kept.
            last_frame = model(src, decoder_input)[:, :, -1]
            frames.append(last_frame)

            # Grow the decoder input along the time axis with the new frame.
            decoder_input = torch.cat((decoder_input, last_frame.unsqueeze(2)), dim=2)

    return frames
    
def train_model(model, dataloader, optimizer, epoch,writer,model_name):
    """Train ``model`` for one pass over ``dataloader`` and log the mean loss.

    For each batch the model is called as ``model(src, target)``; the loss is
    the MSE between the channel-mean of the output and ``src``.

    Args:
        model: Module called as ``model(src, target)``.
        dataloader: Iterable yielding ``(data_waveform, target_waveforms,
            sample_rate)`` tuples and supporting ``len()``.
        optimizer: Optimizer updating ``model``'s parameters.
        epoch: Current epoch number; used for the progress bar and as the
            TensorBoard step.
        writer: Object with an ``add_scalar(tag, value, step)`` method.
        model_name: Suffix of the TensorBoard tag ``Train/Loss/<model_name>``.

    Returns:
        None. Nothing is logged when the dataloader yields no batches.
    """
    model.train()  # switch the model to training mode
    batch_losses = []

    loop = tqdm(enumerate(dataloader) ,total=len(dataloader),position=0)
    for _, (data_waveform, target_waveforms, _sample_rate) in loop:
        src, target = data_waveform.to(DEVICE), target_waveforms.to(DEVICE)

        optimizer.zero_grad()  # reset accumulated gradients

        output = model(src, target)  # forward pass

        # Compare the mean over output channels with the input waveform.
        sep_loss = torch.nn.functional.mse_loss(torch.mean(output,dim=1,keepdim=True), src)

        sep_loss.backward()
        optimizer.step()  # parameter update

        batch_loss = sep_loss.item()
        batch_losses.append(batch_loss)

        loop.set_description(f'Train Epoch [{epoch}/{EPOCHS}]')
        loop.set_postfix(loss = batch_loss)

    # An empty loader has no loss to report.
    if not batch_losses:
        return

    # Mean over all batches of this epoch (not just the last batch's loss).
    avg_loss = sum(batch_losses) / len(batch_losses)
    writer.add_scalar(f'Train/Loss/{model_name}', avg_loss, epoch)
def test_Model(model, dataloader, epoch, writer, minLoss, model_path, model_name):
    """Evaluate ``model`` with step-by-step inference and checkpoint on improvement.

    For every batch the output is generated with ``inference`` and scored by
    the MSE between its channel-mean and the source. The per-batch losses are
    summed and divided by ``len(dataloader)``. When that average is lower than
    ``minLoss`` the model's ``state_dict`` is saved to
    ``<model_path>/<model_name>.ckpt``.

    Args:
        model: Module to evaluate.
        dataloader: Iterable yielding ``(data_waveform, target_waveforms,
            sample_rate)`` tuples and supporting ``len()``.
        epoch: Current epoch number; used for the progress bar, as the
            TensorBoard step, and to decide when audio previews are shown
            (every 20th epoch, first batch only).
        writer: Object with an ``add_scalar(tag, value, step)`` method.
        minLoss: Best (lowest) average loss seen so far.
        model_path: Directory the checkpoint is written to.
        model_name: Checkpoint file stem and TensorBoard tag suffix.

    Returns:
        The new best loss as a Python float when the model improved, otherwise
        ``minLoss`` unchanged (also when the dataloader yields no batches).
    """
    model.eval()

    total_loss = 0.0
    avg_loss = None  # stays None when the loader yields nothing

    with torch.no_grad():
        loop = tqdm(enumerate(dataloader) ,total=len(dataloader),position=0)
        for batch_index, (data_waveform, target_waveforms, sample_rate) in loop:
            src, target = data_waveform.to(DEVICE), target_waveforms.to(DEVICE)

            generated_sequence = inference(model, src, out_channel=target.size(1))

            # Stack the per-step frames along a new time axis (dim 2).
            generated_sequence_tensor = torch.stack(generated_sequence, dim=2).to(DEVICE)
            if epoch%20 == 0 and batch_index == 0:
                # Audio preview of the first sample: the input, then each output channel.
                tqdm.write("input")
                display(Audio(src[0][0].cpu().numpy(), rate=sample_rate))
                for i in range(generated_sequence_tensor[0].size(0)):
                    tqdm.write(f"output{i}")
                    display(Audio(generated_sequence_tensor[0][i].cpu().numpy(), rate=sample_rate))

            # Loss: channel-mean of the generated output versus the source.
            sep_loss = torch.nn.functional.mse_loss(torch.mean(generated_sequence_tensor,dim=1,keepdim=True), src)

            # Accumulate as a plain float so no device tensor is carried around.
            total_loss += sep_loss.item()

            avg_loss = total_loss / len(dataloader)
            loop.set_description(f'Test Epoch [{epoch}/{EPOCHS}]')
            loop.set_postfix(loss = avg_loss)

    # Nothing was evaluated: do not log or checkpoint on a meaningless value.
    if avg_loss is None:
        return minLoss

    writer.add_scalar(f'Test/Loss/{model_name}', avg_loss, epoch)

    if minLoss > avg_loss:
        checkpoint_path = os.path.join(model_path, f'{model_name}.ckpt')
        torch.save(model.state_dict(), checkpoint_path)
        print(f'save model to {checkpoint_path}')
        return avg_loss

    return minLoss

In [7]:
def exp(MODEL_NAME='LMSN'):
    """Build the transformer, then alternate training and evaluation for EPOCHS rounds.

    The model graph is written to TensorBoard using the module-level dummy
    tensors, and the best evaluation loss is tracked across epochs so that
    ``test_Model`` can checkpoint whenever it improves.

    Args:
        MODEL_NAME: Name used for TensorBoard tags and the checkpoint file.
    """
    model = MyTransformer(input_channels=INPUT_CHANNEL,output_channels=OUTPUT_CHANNEL,d_model=8,nhead=OUTPUT_CHANNEL)
    model = model.to(DEVICE)

    writer.add_graph(model, (input_sample,output_sample))

    optimizer = optim.Adam(model.parameters())

    print(f"train {MODEL_NAME}")
    print("------------------")

    # Start from +inf so the first evaluation always counts as an improvement.
    # Starting from 0.0 would never be beaten by a non-negative MSE, and no
    # checkpoint would ever be written.
    best_loss = float('inf')
    for epoch in range(1, EPOCHS + 1):
        train_model(model, dataloader, optimizer, epoch,writer,MODEL_NAME)
        best_loss = test_Model(model, dataloader,epoch,writer,best_loss,model_path,MODEL_NAME)


In [None]:
# Run the experiment with the default model name ('LMSN').
exp()

TensorBoard 2.16.2 at http://DESKTOP-UQPIEF5.:6677/ (Press CTRL+C to quit)


train LMSN
------------------


Train Epoch [1/1000]:  45%|█████████████████████████████████████████████████████████████████████████▋                                                                                        | 5/11 [01:51<02:07, 21.17s/it, loss=0.259]