Import Libraries

In [1]:
import os
import music21 as m21
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import importlib as imp
from torch.utils.data import DataLoader
import tensorflow as tf
import pickle
import time
import matplotlib.pyplot as plt
import torch.nn.functional as F

2024-03-26 22:45:16.790733: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [16]:
import a02_transformer
imp.reload(a02_transformer)
import a00_funs_make_symbol_seqs as fmseq
imp.reload(fmseq)
from a01_melody_preprocessor import MelodyPreprocessor
from a02_transformer import TransformerModel
from a04_melody_generator import MelodyGenerator
import a03_train

In [3]:
# Use the first CUDA GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
print(f'Using device: {device}')

Using device: cpu


設定參數

In [4]:
## Parameters for data preprocessing
# Meter handed to the melody-sequence builder, and the number of beats in one measure.
time_signature, beats_per_measure = '4/4', 4
# Length of one time step in quarter lengths: 0.25 is a sixteenth note.
step_duration = 0.25
# Accepted note/rest lengths: 0.25, 0.50, ..., 8.00 quarter lengths (32 values).
acceptable_durations = 0.25 * np.arange(1, 33)

In [8]:
# Both locations live under the same German_Folk project folder.
_german_folk_root = "/Users/ranli/Documents/python_ve/MS_Pytorch_Thesis/Thesis/German_Folk"
# Folder holding the Kern-format source songs.
Kern_Dataset_Path = f"{_german_folk_root}/deutschl/erk"
# Folder handed to the sequence builder as its save location.
Save_Path = f"{_german_folk_root}/dataset"

載入資料集 （如果已經載入過，可以直接使用 pickle 檔）

In [17]:
## Import data and build the melody symbol sequences
# Load the songs found under Kern_Dataset_Path through the project helper module.
songs = fmseq.load_songs_in_kern(Kern_Dataset_Path)
# Turn the songs into melody symbol sequences using the meter and the accepted
# durations configured above. Save_Path is passed through to the helper —
# presumably the folder it writes its results to; confirm in a00_funs_make_symbol_seqs.
melodies = fmseq.make_melody_symbol_sequences(songs, time_signature, acceptable_durations, Save_Path)

In [18]:
# Build the preprocessor from the melody sequences and derive the training set from it.
preprocessor = MelodyPreprocessor(melodies)
training_dataset = preprocessor.create_training_dataset()

# Serve the training set in reshuffled mini-batches of 128 samples.
training_batches = DataLoader(dataset=training_dataset, batch_size=128, shuffle=True)

# Show vocabulary size, data size and sequence length, one value per line.
for stat_name in ("vocab_size", "data_size", "seq_length"):
    print(getattr(preprocessor, stat_name))

149
25762
140


儲存成 pickle 檔案，讓 Data 不要每次都重載一次

In [24]:
# Persist the preprocessor object and the training dataset, one pickle file each,
# so a later session can reload them instead of rebuilding them.
for pickle_name, payload in (('preprocessor.pkl', preprocessor),
                             ('training_dataset.pkl', training_dataset)):
    with open(pickle_name, 'wb') as f:
        pickle.dump(payload, f)

之後要使用，載入 pickle 檔案

In [14]:
# # 載入 preprocessor 物件
# with open('preprocessor.pkl', 'rb') as f:
#     preprocessor = pickle.load(f)

# # 載入 training_dataset
# with open('training_dataset.pkl', 'rb') as f:
#     training_dataset = pickle.load(f)

In [15]:
# training_batches = DataLoader(training_dataset, shuffle=True,
#                               batch_size=128)

搭建模型

In [6]:
# def key_padding_mask(seq, pad_token=0):
#     return (seq == pad_token)

# def look_ahead_mask(dim):
#     return nn.Transformer.generate_square_subsequent_mask(dim)

上述方法會出現 warning:
UserWarning: Support for mismatched key_padding_mask and attn_mask is deprecated. Use same type for both instead

In [25]:
def key_padding_mask(seq, pad_token=0):
    """Return a boolean tensor shaped like `seq` that is True wherever `seq` equals `pad_token`.

    A boolean mask is used (rather than a float one) so that it has the same
    dtype as the look-ahead mask it is combined with inside the attention layers.
    """
    is_padding = seq.eq(pad_token)
    return is_padding.bool()

def look_ahead_mask(dim):
    """Return a (dim, dim) boolean causal mask.

    Entry [i, j] is True exactly when j > i, i.e. strictly above the main
    diagonal. PyTorch attention treats True entries of a boolean mask as
    positions that may NOT be attended to, so each position can only see itself
    and earlier positions. The mask is returned as-is; it is not inverted.
    """
    positions = torch.arange(dim)
    # Row index = querying position, column index = attended position.
    return positions.unsqueeze(0) > positions.unsqueeze(1)

In [26]:
def test_masks():
    """Print sample outputs of key_padding_mask and look_ahead_mask for visual inspection."""
    # Three sequences, right-padded with the pad token 0 to length 4.
    sample_batch = torch.tensor([[1, 2, 0, 0], [3, 0, 0, 0], [4, 5, 6, 0]])
    print("Padding Mask:")
    print(key_padding_mask(sample_batch, 0))

    # Causal mask for a sequence of four positions.
    print("\nLook Ahead Mask:")
    print(look_ahead_mask(4))

test_masks()

Padding Mask:
tensor([[False, False,  True,  True],
        [False,  True,  True,  True],
        [False, False, False,  True]])

Look Ahead Mask:
tensor([[False,  True,  True,  True],
        [False, False,  True,  True],
        [False, False, False,  True],
        [False, False, False, False]])


In [None]:
# def padding_mask_2(seq, pad_idx):
#     return (seq != pad_idx).unsqueeze(-2)   # [B, 1, L]

# def sequence_mask_2(seq):
#     batch_size, seq_len = seq.size()
#     mask = 1- torch.triu(torch.ones((seq_len, seq_len), dtype=torch.uint8),diagonal=1)
#     mask = (mask!=0).unsqueeze(0).expand(batch_size, -1, -1)  # [B, L, L]
#     return mask

# def test():
#     # 以最简化的形式测试Transformer的两种mask
#     seq = torch.LongTensor([[1,2,0]]) # batch_size=1, seq_len=3，padding_idx=0
#     embedding = torch.nn.Embedding(num_embeddings=3, embedding_dim=10, padding_idx=0)
#     query, key = embedding(seq), embedding(seq)
#     scores = torch.matmul(query, key.transpose(-2, -1))  #最后得到的token之间相互的分数

#     mask_p = padding_mask_2(seq, 0)
#     mask_s = sequence_mask_2(seq)
#     mask_decoder = mask_p & mask_s # 结合 padding mask 和 sequence mask
#     print(mask_p)
#     print(mask_s)
#     print(mask_decoder)

#     scores_encoder = scores.masked_fill(mask_p==0, -1e9) # 对于scores，在mask==0的位置填充
#     scores_decoder = scores.masked_fill(mask_decoder==0, -1e9)
#     print(scores_encoder)
#     print(scores_decoder)

In [27]:
def position_encoding(num_pos, d_model):
    """Build the fixed sinusoidal position-encoding table.

    Even feature columns hold sin(pos * w_k) and odd columns hold cos(pos * w_k),
    with w_k = 10000 ** (-2k / d_model) for k = 0, 1, ...

    Args:
        num_pos: number of positions (rows) to encode.
        d_model: feature width of the table. Both even and odd widths are
            supported; for an odd width the last sine column has no cosine
            partner.

    Returns:
        A float tensor of shape (1, num_pos, d_model); the leading axis is a
        batch axis of size 1 so the table broadcasts over a batch.
    """
    position = torch.arange(num_pos).unsqueeze(1)
    div_term = torch.exp(torch.arange(0, d_model, 2).float() *
                         (-torch.log(torch.tensor(10000.0)) / d_model))
    # One column per frequency: ceil(d_model / 2) columns.
    angles = position * div_term
    pos_encoding = torch.zeros(num_pos, d_model)
    pos_encoding[:, 0::2] = torch.sin(angles)
    # There are only floor(d_model / 2) odd columns, so drop the surplus
    # frequency when d_model is odd (assigning all of them raised a shape error).
    pos_encoding[:, 1::2] = torch.cos(angles[:, :d_model // 2])
    return pos_encoding.unsqueeze(0)  # Add batch dimension

In [28]:
class TransformerModel(nn.Module):
    """Encoder-decoder Transformer mapping source/target token ids to vocabulary logits.

    Source and target share one embedding table. Token id 0 is treated as
    padding, because the masks are built with key_padding_mask's default
    pad_token. All layers are created with batch_first=True, so inputs are laid
    out as (batch, sequence).

    Args:
        d_model: width of the embeddings and of every Transformer layer.
        nhead: number of attention heads per layer.
        dropout: dropout probability used inside the layers and on the embedded inputs.
        dim_feedforward: hidden width of each layer's feed-forward sub-network.
        vocab_size_padding: number of embedding rows and of output logits.
        num_encoder_layers: number of stacked encoder layers.
        num_decoder_layers: number of stacked decoder layers.
        device: device the embedding is created on and that masks and position
            encodings are moved to in forward().
    """

    def __init__(self, d_model, nhead, dropout, dim_feedforward, vocab_size_padding,
                 num_encoder_layers, num_decoder_layers, device):
        super().__init__()
        self.d_model = d_model
        self.device = device
        self.embedding = nn.Embedding(vocab_size_padding, d_model).to(device)
        # The prototype layers stay registered as attributes so parameter names in
        # saved state_dicts do not change. nn.TransformerEncoder / nn.TransformerDecoder
        # work on their own copies of the layer they are given.
        self.encoder_layer = nn.TransformerEncoderLayer(d_model, nhead, dim_feedforward,
                                                        dropout=dropout, batch_first=True)
        self.encoder = nn.TransformerEncoder(self.encoder_layer, num_encoder_layers)
        self.decoder_layer = nn.TransformerDecoderLayer(d_model, nhead, dim_feedforward,
                                                        dropout=dropout, batch_first=True)
        self.decoder = nn.TransformerDecoder(self.decoder_layer, num_decoder_layers)
        self.dropout = nn.Dropout(dropout)
        self.final_layer = nn.Linear(d_model, vocab_size_padding)

    def _embed(self, tokens):
        """Embed token ids, scale by sqrt(d_model), add position encodings and apply dropout."""
        scale_factor = torch.sqrt(torch.tensor(self.d_model, dtype=torch.float32, device=self.device))
        x = self.embedding(tokens)
        x *= scale_factor
        x += position_encoding(tokens.size(-1), self.d_model).to(self.device)
        return self.dropout(x)

    def forward(self, src, tgt):
        """Compute next-token logits for every target position.

        Args:
            src: integer token ids of the source sequences, (batch, src_len).
            tgt: integer token ids of the target sequences, (batch, tgt_len).

        Returns:
            Logits of shape (batch, tgt_len, vocab_size_padding).
        """
        src_padding_mask = key_padding_mask(src).to(self.device)
        tgt_padding_mask = key_padding_mask(tgt).to(self.device)
        # Causal mask: a target position may not attend to later target positions.
        tgt_mask = look_ahead_mask(tgt.size(-1)).to(self.device)

        enc_output = self.encoder(self._embed(src), src_key_padding_mask=src_padding_mask)

        # memory_key_padding_mask keeps cross-attention away from encoder outputs at
        # padded source positions. Without it the logits depend on how much padding
        # the source batch happens to carry.
        dec_output = self.decoder(self._embed(tgt), enc_output, tgt_mask=tgt_mask,
                                  tgt_key_padding_mask=tgt_padding_mask,
                                  memory_key_padding_mask=src_padding_mask)
        return self.final_layer(dec_output)

Training 設定

In [29]:
# Model specification and training objects
# One id more than the tokenizer vocabulary — presumably to make room for the
# padding id 0; verify against MelodyPreprocessor.
vocab_size_padding = 1 + preprocessor.vocab_size
model = TransformerModel(
    d_model=128,
    nhead=2,
    dim_feedforward=128,
    dropout=0.1,
    vocab_size_padding=vocab_size_padding,
    num_encoder_layers=6,
    num_decoder_layers=6,
    device=device,
).to(device)
# Cross-entropy over the vocabulary logits, optimised with Adam.
criterion = nn.CrossEntropyLoss()
criterion = criterion.to(device)
optimizer = optim.Adam(params=model.parameters(), lr=0.0005)

  from .autonotebook import tqdm as notebook_tqdm


In [30]:
# Settings of the full-length run, kept for reference:
# epochs = 200
# save_interval = 20
epochs = 3
save_interval = 1
save_dir= "/Users/ranli/Documents/python_ve/MS_Pytorch_Thesis/Thesis/German_Folk/epoch_train"
start_sequence = ['C4-1.0', 'G4-1.0', 'E4-1.0', 'C4-1.0']

# np.savetxt does not create missing folders; make sure the output directory
# exists up front so a finished epoch is not followed by a failed write.
os.makedirs(save_dir, exist_ok=True)

losses = []       # value returned by train_each_step for every finished epoch
epoch_times = []  # wall-clock seconds spent on every finished epoch

for epoch in range(epochs):
    start_time = time.time()
    average_loss = a03_train.train_each_step(training_batches, model,
                                             criterion, optimizer, device)
    losses.append(average_loss)

    epoch_duration = time.time() - start_time
    epoch_times.append(epoch_duration)

    print(f'Epoch {epoch + 1}/{epochs}, Average Loss: {average_loss},Duration: {epoch_duration} seconds')

    # Every `save_interval` epochs, generate a sample melody from the current
    # weights and store it as <epoch number>.txt.
    # NOTE(review): the `epoch > 0` guard skips the first epoch even when
    # save_interval is 1 — confirm that this is intended.
    if epoch > 0 and (epoch + 1) % save_interval == 0:
        melody_generator = MelodyGenerator(model, preprocessor.tokenizer, device)
        new_melody = melody_generator.generate(start_sequence, preprocessor.tokenizer)
        np.savetxt(f"{save_dir}/{epoch + 1}.txt", new_melody, fmt='%s')

KeyboardInterrupt: 

In [None]:
# Write the loss and the run time of every epoch into one text file, one line per epoch.
log_path = f"{save_dir}/loss_and_epoch_times.txt"
with open(log_path, "w") as file:
    # Epoch numbers in the log are 1-based.
    for epoch, (loss, duration) in enumerate(zip(losses, epoch_times), start=1):
        log_line = f'Epoch {epoch}, Average Loss: {loss}, Duration: {duration} seconds\n'
        file.write(log_line)

In [None]:
# ## Generation
# start_sequence = ["C4-2.0", "G4-2.0", "E4-2.0", "D4-1.0", "C4-1.0"]
# start_sequence = ["C4-2.0", "F4-2.0", "A4-1.0", "D5-0.5", "C5-0.5"]
# melody_generator = MelodyGenerator(model, preprocessor.tokenizer, device)
# new_melody = melody_generator.generate(start_sequence, preprocessor.tokenizer)
# print(f"Generated melody: {new_melody}") 

In [None]:
## Save model
# Store the model weights and the optimizer state in separate checkpoint files.
for checkpoint_name, stateful in (('model_state_dict.pth', model),
                                  ('optimizer_state_dict.pth', optimizer)):
    torch.save(stateful.state_dict(), checkpoint_name)

In [None]:
# ## Load model
# model = TransformerModel(d_model=128, nhead=2, dim_feedforward=128, dropout=0.1, 
#                          vocab_size_padding=vocab_size_padding, 
#                          num_encoder_layers=2, num_decoder_layers=2, device=device)
# model.load_state_dict(torch.load('model_state_dict.pth'))
# optimizer.load_state_dict(torch.load('optimizer_state_dict.pth'))

In [None]:
# Read the per-epoch loss and run time back from the log file.
losses = []
epoch_times = []
with open(f"{save_dir}/loss_and_epoch_times.txt", "r") as file:
    for line in file:
        if line.startswith('Epoch'):
            # Line layout: "Epoch <n>, Average Loss: <loss>, Duration: <seconds> seconds"
            parts = line.strip().split(', ')
            loss = float(parts[1].split(': ')[1])
            losses.append(loss)
            duration = float(parts[2].split(': ')[1].split()[0])
            epoch_times.append(duration)

# 1-based epoch numbers for the x axis. A separate name is used on purpose:
# rebinding `epochs` (the integer epoch count of the training cell) to a range
# would make a later re-run of the training loop fail in `range(epochs)`.
epoch_numbers = range(1, len(losses) + 1)

# Plot the loss curve.
plt.plot(epoch_numbers, losses, 'y', label='Training loss', linewidth=2.0)
plt.title('Training Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()

plt.show()

In [None]:
# Generated_melody =('C4-2.0', 'F4-2.0', 'A4-1.0', 'D5-0.5', 'C5-0.5', 'B4-2.0', 'A4-2.0', 'R-1.0', 'C5-1.0', 'C5-1.0', 'B4-1.0', 'A4-1.0', 'G4-1.0', 'G4-1.0', 'A4-1.0', 'B4-1.0', 'C5-0.5', 'D5-0.5', 'D5-2.0', 'C5-0.5', 'D5-0.5', 'D5-2.0', 'C5-2.0', 'R-1.0', 'C5-1.0', 'C5-1.0', 'B4-1.0', 'B4-1.0', 'A4-1.0', 'G4-1.0', 'G4-1.0', 'A4-1.0', 'B4-1.0', 'C5-0.5', 'D5-0.5', 'C5-0.5', 'B4-2.0', 'R-1.0', 'D5-0.5', 'D5-2.0', 'C5-1.0', 'C5-1.0', 'B4-1.0', 'B4-1.0', 'C5-1.0', 'C5-1.0', 'B4-1.0', 'B4-1.0', 'A4-1.0', 'A4-2.0')

In [4]:
# import music21 as m21

# def melody_symbols_to_midi(melody_symbols, output_file):
#     stream = m21.stream.Stream()
#     for symbol in melody_symbols:
#         if symbol.startswith('R'):
#             rest_duration = float(symbol.split('-')[1])
#             rest = m21.note.Rest(quarterLength=rest_duration)
#             stream.append(rest)
#         else:
#             pitch_name, duration = symbol.split('-')
#             note = m21.note.Note(pitch_name, quarterLength=float(duration))
#             stream.append(note)
#     stream.write('midi', fp=output_file)

# # 使用示例
# Generated_melody = ['C4-2.0', 'F4-2.0', 'A4-1.0', 'D5-0.5', 'C5-0.5', 'B4-2.0', 'A4-2.0', 'R-1.0', 'C5-1.0', 'C5-1.0', 'B4-1.0', 'A4-1.0', 'G4-1.0', 'G4-1.0', 'A4-1.0', 'B4-1.0', 'C5-0.5', 'D5-0.5', 'D5-2.0', 'C5-0.5', 'D5-0.5', 'D5-2.0', 'C5-2.0', 'R-1.0', 'C5-1.0', 'C5-1.0', 'B4-1.0', 'B4-1.0', 'A4-1.0', 'G4-1.0', 'G4-1.0', 'A4-1.0', 'B4-1.0', 'C5-0.5', 'D5-0.5', 'C5-0.5', 'B4-2.0', 'R-1.0', 'D5-0.5', 'D5-2.0', 'C5-1.0', 'C5-1.0', 'B4-1.0', 'B4-1.0', 'C5-1.0', 'C5-1.0', 'B4-1.0', 'B4-1.0', 'A4-1.0', 'A4-2.0']
# melody_symbols_to_midi(Generated_melody, 'generated_melody.mid')

In [5]:
# Generated_melody_2 =  ['C4-1.0', 'G4-1.0', 'E4-0.5', 'D4-0.5', 'C4-2.0', 'G4-1.0', 'A4-1.0', 'A4-1.0', 'G4-1.0', 'G4-1.0', 'A4-1.0', 'A4-1.0', 'A4-1.0', 'G4-1.0', 'G4-1.0', 'A4-1.0', 'B4-1.0', 'C5-1.0', 'B4-1.0', 'A4-1.0', 'A4-1.0', 'G4-1.0', 'G4-1.0', 'A4-1.0', 'B4-1.0', 'C5-1.0', 'B4-1.0', 'A4-1.0', 'A4-1.0', 'G4-1.0', 'G4-1.0', 'A4-1.0', 'B4-1.0', 'C5-1.0', 'B4-1.0', 'A4-1.0', 'A4-1.0', 'G4-1.0', 'G4-1.0', 'A4-1.0', 'B4-1.0', 'C5-1.0', 'G4-1.0', 'E4-1.0', 'A4-1.0', 'A4-1.0', 'G4-1.0', 'G4-1.0', 'G4-1.0', 'A4-1.0']
# melody_symbols_to_midi(Generated_melody_2, 'generated_melody_2.mid')

其他紀錄：
- CPU: one epoch takes about 30 minutes
- CPU: 3 epochs take about 104 minutes
- 可以嘗試不同的起始值的影響

Try Padding

In [9]:
# import torch

# def padding_mask_2(seq, pad_idx):
#     return (seq != pad_idx).unsqueeze(-2)   # [B, 1, L]

# def sequence_mask_2(seq):
#     batch_size, seq_len = seq.size()
#     mask = 1- torch.triu(torch.ones((seq_len, seq_len), dtype=torch.uint8),diagonal=1)
#     mask = (mask!=0).unsqueeze(0).expand(batch_size, -1, -1)  # [B, L, L]
#     return mask

# def test():
#     # 以最简化的形式测试Transformer的两种mask
#     seq = torch.LongTensor([[1,2,0]]) # batch_size=1, seq_len=3，padding_idx=0
#     embedding = torch.nn.Embedding(num_embeddings=3, embedding_dim=10, padding_idx=0)
#     query, key = embedding(seq), embedding(seq)
#     scores = torch.matmul(query, key.transpose(-2, -1))  #最后得到的token之间相互的分数

#     mask_p = padding_mask_2(seq, 0)
#     mask_s = sequence_mask_2(seq)
#     mask_decoder = mask_p & mask_s # 结合 padding mask 和 sequence mask
#     print(mask_p)
#     print(mask_s)
#     print(mask_decoder)

#     scores_encoder = scores.masked_fill(mask_p==0, -1e9) # 对于scores，在mask==0的位置填充
#     scores_decoder = scores.masked_fill(mask_decoder==0, -1e9)
#     print(scores_encoder)
#     print(scores_decoder)

# test()

tensor([[[ True,  True, False]]])
tensor([[[ True, False, False],
         [ True,  True, False],
         [ True,  True,  True]]])
tensor([[[ True, False, False],
         [ True,  True, False],
         [ True,  True, False]]])
tensor([[[ 6.8092e+00, -9.4008e-01, -1.0000e+09],
         [-9.4008e-01,  6.5163e+00, -1.0000e+09],
         [ 0.0000e+00,  0.0000e+00, -1.0000e+09]]],
       grad_fn=<MaskedFillBackward0>)
tensor([[[ 6.8092e+00, -1.0000e+09, -1.0000e+09],
         [-9.4008e-01,  6.5163e+00, -1.0000e+09],
         [ 0.0000e+00,  0.0000e+00, -1.0000e+09]]],
       grad_fn=<MaskedFillBackward0>)
