![](https://chendongze.oss-cn-shanghai.aliyuncs.com/ipic/ihkvn.png)
# 下载处理后的数据库

In [1]:

# !gdown --id 18x-aCvxJpOrWJoZ5VAuIe-xzNJHH9lqH

In [2]:
# !unzip pop17.zip -d pop17-dataset


In [1]:
!pip install miditok miditoolkit tqdm



In [3]:
from Model import *
from utils import batch
import math
import random
from dataset import CP_Word_Dataset


## 设置模型与训练参数

In [4]:
# Notebook-wide configuration: dataset locations, training cadence, and model hyper-parameters.

# Sequence length passed to CP_Word_Dataset when the splits are loaded.
# NOTE(review): one more than config["length"] below; presumably so inputs and
# targets can be shifted by one token -- confirm against Model / dataset code.
train_seq_length = 1025
database_name = 'pop17'
# JSON files holding the eval / train / test splits.
eval_out_path = './pop17-dataset/pop17_eval_dataset.json'
train_out_path = './pop17-dataset/pop17_train_dataset.json'
test_out_path = './pop17-dataset/pop17_test_dataset.json'
# Directories handed to the model factory for logs and checkpoints.
log_dir = './tf_logs/'
checkpoint_dir = './hy-tmp/checkpoints/'

# Train
epoch = 1
batch_size = 2
warmup_steps = 4000  # default warm-up length of CustomSchedule
save_n_step = 20  # save the "lastest" checkpoint every N loop iterations
setLogs_n_step = 20  # write train/eval summaries every N loop iterations
save_test_best_n_step = 500  # run the test pass every N global steps ...
test_start_step = 200  # ... but only once the global step exceeds this value
reset_states_n_step = 20  # reset the running metrics every N loop iterations

# Per-token-type sizes, in this order:
# [Family Bar/position Pitch Velocity Duration Chord Rest Tempo]
token_range = [2, 34, 89, 33, 65, 18, 10, 33]
emb_sizes = [32, 128, 512, 128, 128, 128, 128, 128]
# Each vocabulary gets two extra ids on top of token_range.
vocab_sizes = [i+2 for i in token_range]
# The EOS id of each token type is token_range + 1.
eos_tokens = [i+1 for i in token_range]
# Convert the EOS ids with vocab2token (not defined in this notebook; presumably
# provided by `from Model import *` -- confirm) and keep the single resulting row.
eos_tokens = vocab2token([eos_tokens])[0]
random.seed(1)  # for data
# Hyper-parameters handed to the model factory.
config = {
    "vocab_sizes": vocab_sizes,
    "emb_sizes": emb_sizes,
    "d_model": 256,
    "dff": 1024,
    "num_layers": 6,
    "num_heads": 4,
    "dropout_rate": 0.1,
    "length": 1024,
    "rpr": True,
    "dataset": f'{database_name}_{train_seq_length}',
}


## 初始化模型

In [5]:
# Build the model from `config`; the log and checkpoint directories are passed to the factory.
mt = model().getLinearTransformerXL(config, log_dir, checkpoint_dir)


## 查看模型

In [6]:
# Call the model once on a single dummy step (one id per token type) before
# printing its summary -- presumably needed so the layers are built; confirm.
inp = tf.constant([[[1, 1, 1, 1, 1, 1, 1, 1]]], tf.int64)
r, w = mt(inp, True, inp)
mt.summary()


Model: "linear_transformer_xl"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 encoder_layer (EncoderLayer  multiple                 855296    
 )                                                               
                                                                 
 encoder_layer_1 (EncoderLay  multiple                 855296    
 er)                                                             
                                                                 
 encoder_layer_2 (EncoderLay  multiple                 855296    
 er)                                                             
                                                                 
 encoder_layer_3 (EncoderLay  multiple                 855296    
 er)                                                             
                                                                 
 encoder_layer_4 (EncoderLay  multiple       

## 存放 run state

In [7]:
# Path of the JSON file (inside the model's log directory) that stores the run state.
train_cache_dir = f'{mt.log_dir}/train_process.json'
# Wrapper used below to read and write step / best_acc / best_loss.
train_process_json = Train_process_json(train_cache_dir)


## 设置训练信息

In [8]:
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
    """Warm-up followed by inverse-square-root decay.

    lr(step) = d_model ** -0.5 * min(step ** -0.5, step * warmup_steps ** -1.5)

    Args:
        d_model: model width used to scale the learning rate.
        warmup_steps: number of steps over which the rate ramps up linearly.

    Attributes:
        step: the (float32) step of the most recent call, 0 before any call.
        lr: the learning rate returned by the most recent call, 0 before any call.
    """

    def __init__(self, d_model, warmup_steps=warmup_steps):
        super(CustomSchedule, self).__init__()

        # Keep the plain Python value for get_config(); self.d_model is a tensor.
        self._d_model_config = d_model
        self.d_model = tf.cast(d_model, tf.float32)
        self.warmup_steps = warmup_steps
        self.step = 0
        self.lr = 0

    def __call__(self, step):
        """Return the learning rate for `step` (any numeric dtype)."""
        # Optimizers pass their integer iteration counter; rsqrt only accepts
        # floating-point input, so normalise the dtype first.
        step = tf.cast(step, tf.float32)
        arg1 = tf.math.rsqrt(step)
        arg2 = step * (self.warmup_steps ** -1.5)
        lr = tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

        # Remember the latest values so they can be inspected from the notebook.
        self.step = step
        self.lr = lr
        return lr

    def get_config(self):
        """Return the constructor arguments so the schedule can be serialized."""
        return {
            "d_model": self._d_model_config,
            "warmup_steps": self.warmup_steps,
        }


# Learning-rate schedule scaled by the model width configured above.
learning_rate = CustomSchedule(config['d_model'])
# Adam driven by the schedule instead of a fixed learning rate.
optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=0.9, beta_2=0.98,
                                     epsilon=1e-9)


def loss_function(real, pred):
    """Mean sparse categorical cross-entropy over the non-zero targets.

    Args:
        real: integer target ids; positions equal to 0 are excluded.
        pred: unnormalised logits for each target position.

    Returns:
        Scalar tensor: summed loss of the kept positions divided by their
        count, or 0 when every target is 0 (instead of NaN from 0 / 0).
    """
    loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=True, reduction='none')
    mask = tf.math.logical_not(tf.math.equal(real, 0))
    loss_ = loss_object(real, pred)
    mask = tf.cast(mask, dtype=loss_.dtype)
    loss_ *= mask
    # divide_no_nan keeps a batch made only of zeros from yielding a NaN loss.
    return tf.math.divide_no_nan(tf.reduce_sum(loss_), tf.reduce_sum(mask))


def accuracy_function(real, pred):
    """Fraction of non-zero targets whose arg-max prediction is correct.

    Args:
        real: integer target ids; positions equal to 0 are excluded.
        pred: scores whose axis 2 ranges over the candidate ids.

    Returns:
        Scalar float32 tensor in [0, 1]; 0 when every target is 0
        (instead of NaN from 0 / 0).
    """
    predicted_ids = tf.argmax(pred, axis=2)
    # tf.equal needs matching dtypes; argmax yields int64 while targets may
    # arrive as int32 depending on how the batch array was built.
    real = tf.cast(real, predicted_ids.dtype)
    accuracies = tf.equal(real, predicted_ids)

    mask = tf.math.logical_not(tf.math.equal(real, 0))
    accuracies = tf.math.logical_and(mask, accuracies)

    accuracies = tf.cast(accuracies, dtype=tf.float32)
    mask = tf.cast(mask, dtype=tf.float32)
    return tf.math.divide_no_nan(tf.reduce_sum(accuracies), tf.reduce_sum(mask))


In [9]:
# Register the loss, accuracy and optimizer defined above with the model.
mt.train_setup(loss_function, accuracy_function,
               optimizer)


## 读取训练状态


In [10]:
# Load the values stored in the run-state file; the training loop and the
# checkpoint helpers below read and update these globals.
best_acc = train_process_json.get()['best_acc']
best_loss = train_process_json.get()['best_loss']
current_step = train_process_json.get()['step']


## 配置tf-logs

In [11]:
# One running mean per token type, plus a final 'Total' slot for the average.
stat_names = [
    'Family', 'Position', 'Pitch', 'Velocity',
    'Duration', 'Chord', 'Rest', 'Tempo', 'Total',
]


def _mean_metrics(suffix):
    """Return one Mean metric named '<stat>_<suffix>' for every stat name."""
    return [tf.keras.metrics.Mean(name=f'{stat}_{suffix}')
            for stat in stat_names]


train_losses = _mean_metrics('loss')
train_accuracy = _mean_metrics('accuracy')

eval_losses = _mean_metrics('loss')
eval_accuracy = _mean_metrics('accuracy')

# Separate summary writers so train / eval / test show up as distinct runs.
train_summary_writer = tf.summary.create_file_writer(
    f'{mt.log_dir}/train_dir')
eval_summary_writer = tf.summary.create_file_writer(
    f'{mt.log_dir}/eval_dir')
test_summary_writer = tf.summary.create_file_writer(
    f'{mt.log_dir}/test_dir')


def setLogs():
    """Write the current train and eval metric values as scalars at `current_step`."""
    # Each writer receives its losses first, then its accuracies.
    writer_metrics = (
        (train_summary_writer, train_losses + train_accuracy),
        (eval_summary_writer, eval_losses + eval_accuracy),
    )
    for writer, metrics in writer_metrics:
        with writer.as_default():
            for metric in metrics:
                tf.summary.scalar(
                    metric.name, metric.result(), step=current_step)


def reset_states():
    """Reset every running train / eval loss and accuracy metric."""
    all_metrics = (
        *train_losses, *train_accuracy, *eval_losses, *eval_accuracy)
    for metric in all_metrics:
        metric.reset_states()


## 定义 ckpt 保存函数

In [12]:
def save_lastest_ckpt():
    """Persist the run state, then overwrite the 'lastest-ckpt' weights."""
    train_process_json.set(current_step, best_acc, best_loss)
    target = f'{mt.checkpoint_dir}/lastest-ckpt'
    mt.save_weights(target, overwrite=True, save_format="tf")


def save_test_best_ckpt(loss, acc):
    """Record a test result: update the bests, save checkpoints, log scalars.

    Args:
        loss: test loss of the current model.
        acc: test accuracy of the current model.

    Side effects:
        Updates the globals `best_acc` / `best_loss` when improved, persists
        the run state, writes `best_acc-ckpt<step>` / `best_loss-ckpt<step>`
        on improvement, always writes `ckpt<step>`, and logs `test_loss` /
        `test_acc` to the test summary writer.
    """
    global best_acc
    global best_loss

    if acc > best_acc:
        best_acc = acc
        train_process_json.set(current_step, best_acc, best_loss)
        mt.save_weights(
            f'{mt.checkpoint_dir}/best_acc-ckpt{current_step}', overwrite=True, save_format="tf")

    if loss < best_loss:
        # Update the global before persisting; otherwise the stored best_loss
        # is the previous value and a resumed run starts from a stale best.
        best_loss = loss
        train_process_json.set(current_step, best_acc, best_loss)
        mt.save_weights(
            f'{mt.checkpoint_dir}/best_loss-ckpt{current_step}', overwrite=True, save_format="tf")

    # A checkpoint is kept for every test pass, whether or not it is a new best.
    mt.save_weights(f'{mt.checkpoint_dir}/ckpt{current_step}',
                    overwrite=True, save_format="tf")

    with test_summary_writer.as_default():
        tf.summary.scalar('test_loss', loss, step=current_step)
        tf.summary.scalar('test_acc', acc, step=current_step)


## 开始训练

In [13]:
# Main training loop. Every iteration runs one training batch and one eval
# batch, feeds the running metrics, and periodically logs, checkpoints and
# runs the test pass. Resumes from `current_step` loaded from the run state.
for e in range(epoch):

    eval_ds = CP_Word_Dataset(eval_out_path, train_seq_length, eos_tokens)
    print('eval_ds length', eval_ds.total_seq)

    train_ds = CP_Word_Dataset(train_out_path, train_seq_length, eos_tokens)

    print('train_ds length', train_ds.total_seq)

    test_ds = CP_Word_Dataset(test_out_path, train_seq_length, eos_tokens)

    print('test_ds length', test_ds.total_seq)

    train_step_per_epoch = math.ceil(train_ds.total_seq / batch_size)
    eval_step_per_epoch = math.ceil(eval_ds.total_seq / batch_size)

    current_epoch = math.ceil(current_step / train_step_per_epoch)

    # Shuffled visiting order; reproducible because random.seed was set above.
    train_data_range_idxs = random.sample(
        range(train_ds.total_seq), train_ds.total_seq)

    eval_data_range_idxs = random.sample(
        range(eval_ds.total_seq), eval_ds.total_seq)

    # Steps of this epoch already completed by a previous (interrupted) run.
    # Computed before current_step is incremented so the bar is not one ahead.
    resume_offset = current_step % train_step_per_epoch
    train_seq_start_idx = resume_offset * batch_size

    with tqdm(total=train_step_per_epoch, initial=resume_offset) as bar:
        idx = 0

        for batch_idxs in batch(train_data_range_idxs[train_seq_start_idx:], batch_size):

            # train
            # presumably (batch_size, length, n_token_types) -- confirm
            seqs = train_ds.get_seqs(batch_idxs)
            losses, acc = mt.train_step(np.array(seqs))
            current_step += 1
            current_lr = learning_rate(tf.cast(current_step, tf.float32))

            for i in range(len(losses)):
                train_losses[i](losses[i].numpy())
                train_accuracy[i](acc[i])
            # The last slot ('Total') tracks the mean over all token types.
            train_losses[-1](np.sum([l.numpy()
                             for l in losses]) / len(losses))
            train_accuracy[-1](np.sum(acc) / len(losses))

            # eval
            e_start_idx = current_step % eval_step_per_epoch * batch_size
            eval_batch_idxs = eval_data_range_idxs[e_start_idx: e_start_idx + batch_size]

            eval_seqs = eval_ds.get_seqs(eval_batch_idxs)
            e_losses, e_acc = mt.eval(eval_seqs)

            # Iterate over the eval results themselves, not the train ones.
            for i in range(len(e_losses)):
                eval_losses[i](e_losses[i].numpy())
                eval_accuracy[i](e_acc[i])

            # The last slot ('Total') tracks the mean over all token types.
            eval_losses[-1](np.sum([l.numpy()
                            for l in e_losses]) / len(e_losses))
            eval_accuracy[-1](np.sum(e_acc) / len(e_losses))

            if idx % setLogs_n_step == 0:
                setLogs()

            if idx % save_n_step == 0:
                save_lastest_ckpt()

            # if idx % save_eval_best_n_step == 0:
            #     save_eval_best_ckpt()

            if current_step % save_test_best_n_step == 0 and current_step > test_start_step:
                loss, acc = mt.test(test_ds, batch_size)
                save_test_best_ckpt(loss, acc)

            bar.set_description(
                f'Epoch/step {current_epoch}/{current_step}: lr {float(current_lr):.6}  Loss/eval/best {float(train_losses[-1].result()):.6}/{float(eval_losses[-1].result()):.6}/{best_loss:.6}  Acc/eval/best {float(train_accuracy[-1].result()):.6}/{float(eval_accuracy[-1].result()):.6}/{best_acc:.6}'
            )
            bar.update(1)

            # Metrics are reset after logging so each window starts fresh.
            if idx % reset_states_n_step == 0:
                reset_states()
            idx += 1


eval_ds length 9107


KeyboardInterrupt: 

## 生成 MIDI

In [14]:
from dataset import get_tokenizer
import datetime

import os

# Generate a continuation from a short seed taken from the eval set and write
# every generated sequence to ./gen_midi/ as a MIDI file.
tokenizer = get_tokenizer()
seed_length = 10  # number of leading steps of the eval sequence used as the prompt
length = 500  # passed to mt.generate as the generation length
# One sampling temperature / nucleus threshold per token type.
temperature = [0.05, 0.01, 1, 1, 1.1, 0.01, 1, 0.01]
nucleus_p = [False, 0.9, 0.9, 0.9, 0.95, 0.9, 0.9, 0.9]
if_end = False  # False masks out the EOS token
seqs = np.array(eval_ds.get_seqs([0]))
inp = seqs[:batch_size, :seed_length]

gen = mt.generate(np.array(inp), int(length), temperature, nucleus_p, if_end)
tokens = [vocab2token(g) for g in gen]
current_time = datetime.datetime.now().strftime('%Y%m%d-%H%M%S')

# dump() cannot create missing parent directories, so make sure it exists.
os.makedirs('./gen_midi', exist_ok=True)

for idx, midi_token in enumerate(tokens):
    converted_back_midi = tokenizer.tokens_to_midi(
        np.array([midi_token]), [(0, False)])
    converted_back_midi.dump(
        f'./gen_midi/{idx}-{temperature}-{nucleus_p}-{length}.{current_time}.midi')


ImportError: cannot import name 'Tokenizer' from 'dataset' (/Users/night/code/netpi/github/compound-word-transformer-tensorflow/dataset.py)