In [None]:
from music21 import converter, instrument, note, chord, stream

def get_notes(song_path,song_names):
    """Extract note and chord tokens from a set of MIDI files.

    Each file is parsed with music21. When the score can be partitioned by
    instrument, only the first instrument part is scanned; otherwise the
    flattened notes of the whole score are used.

    :param song_path: directory that contains the music files
    :type song_path: str
    :param song_names: names of the files inside ``song_path`` to parse
    :type song_names: list
    :return: tokens in iteration order across all files; a Note contributes
        the string form of its pitch, a Chord contributes its normal-order
        integers joined by "."
    :rtype: list
    """
    import os  # local import so the function does not depend on another cell

    all_notes = []
    for song_name in song_names:
        # os.path.join works whether or not song_path ends with a separator
        # (plain concatenation silently produced "dirfile.mid" without one).
        # The result is named `score` so it does not shadow the imported
        # music21 `stream` module.
        score = converter.parse(os.path.join(song_path, song_name))
        partitioned = instrument.partitionByInstrument(score)
        if partitioned:
            # Instrument parts are available: walk the first one only.
            elements = partitioned.parts[0].recurse()
        else:
            # No instrument parts: fall back to the flattened notes.
            # NOTE(review): newer music21 releases deprecate `.flat` in favour
            # of `.flatten()` - confirm the installed version before switching.
            elements = score.flat.notes
        for element in elements:
            if isinstance(element, note.Note):
                # Single note: store its pitch as text.
                all_notes.append(str(element.pitch))
            elif isinstance(element, chord.Chord):
                # Chord: store the normal-order numbers, e.g. "0.4.7".
                all_notes.append('.'.join(str(n) for n in element.normalOrder))
            # Any other element type is ignored.
    return all_notes

In [None]:
def save_data(filename,content):
    """Write every item of ``content`` to a text file, one item per line.

    :param filename: path of the file to create or overwrite
    :type filename: str
    :param content: items to store; each one is converted with ``str``
    :type content: list
    """
    with open(filename, "w") as out:
        # Lazily build "<item>\n" for each entry and hand the stream to the file.
        out.writelines(str(item) + "\n" for item in content)

def get_data(filename):
    """Read back tokens that were stored one per line.

    :param filename: path of the text file to read
    :type filename: str
    :return: one entry per line of the file, without the line terminator
    :rtype: list
    """
    with open(filename) as f:
        # Strip only the newline terminator. Blindly dropping the last
        # character would corrupt a final line that has no trailing newline.
        return [line.rstrip("\n") for line in f]

In [None]:
import os
song_path = "./midi_songs/"
# os.listdir returns every directory entry in arbitrary order. Keep only MIDI
# files (stray files would make the parser fail) and sort them so the
# resulting note sequence is the same on every run and machine.
song_names = sorted(
    name for name in os.listdir(song_path)
    if name.lower().endswith((".mid", ".midi"))
)

In [None]:
# Collect the note/chord tokens of every listed song into one list.
all_notes = get_notes(song_path=song_path, song_names=song_names)
# Write the tokens to data.txt, one per line.
save_data(filename="data.txt", content=all_notes)

In [None]:
from collections import Counter
# Tally how often each token occurs.
counter = Counter(all_notes)
# Order (token, count) pairs from most to least frequent; tokens with equal
# counts keep their first-seen order.
note_count = counter.most_common()
notes,_ = zip(*note_count)
# Build the token -> id mapping; the most frequent token gets id 0.
note_to_id = {token: index for index, token in enumerate(notes)}

In [None]:
# Peek at the first 20 (token, id) pairs of the vocabulary mapping.
print(list(note_to_id.items())[:20])

In [None]:
# Window length: number of consecutive tokens used to predict the next one.
sequence_batch = 100
if len(all_notes) <= sequence_batch:
    # Fail here with a clear message instead of producing empty training data.
    raise ValueError(
        "Need more than %d notes to build training windows, got %d"
        % (sequence_batch, len(all_notes))
    )
# Translate every token to its id once, rather than once per window it is in.
encoded_notes = [note_to_id[token] for token in all_notes]
# X_train[i] holds the ids at positions i .. i+sequence_batch-1.
X_train = [
    encoded_notes[start:start + sequence_batch]
    for start in range(len(encoded_notes) - sequence_batch)
]
# Y_train[i] is the id that immediately follows window i.
Y_train = encoded_notes[sequence_batch:]

In [None]:
# Show the first two encoded input windows as a quick sanity check.
print(X_train[:2])

In [None]:
from keras.utils import to_categorical
# Pin the one-hot width to the vocabulary size. Without num_classes,
# to_categorical infers the width from the largest id present in each array,
# so X and Y could end up with different widths when the highest id is
# missing from one of them.
num_classes = len(note_to_id)
X_one_hot = to_categorical(X_train, num_classes=num_classes)
Y_one_hot = to_categorical(Y_train, num_classes=num_classes)

In [None]:
# Report the shapes of the one-hot encoded inputs and targets.
print("X_one_hot shape is:",X_one_hot.shape)
print("Y_one_hot shape is:",Y_one_hot.shape)

In [None]:
import keras
from keras.callbacks import ModelCheckpoint
from keras.models import Model
# Input is imported from keras.layers, its stable home; keras.models does not
# re-export it in every Keras release.
from keras.layers import Input, Dropout, Dense, LSTM
from keras.optimizers import Adam
from keras.utils import plot_model
# Each sample has shape X_one_hot.shape[1:]; the original author noted
# (100, 308) for their data set.
input_tensor = Input(shape=X_one_hot.shape[1:])
# First LSTM returns the full sequence so that it can feed a second LSTM.
lstm = LSTM(512,return_sequences=True)(input_tensor)
dropout = Dropout(0.3)(lstm)

# Second LSTM keeps only its final output.
lstm = LSTM(256)(dropout)
dropout = Dropout(0.3)(lstm)
# One softmax unit per column of Y_one_hot (308 in the original author's run).
dense = Dense(Y_one_hot.shape[-1], activation='softmax')(dropout)

model = Model(inputs=input_tensor, outputs=dense)
# Optional: draw the architecture to an image file.
# plot_model(model, to_file='model.png', show_shapes=True, expand_nested=True, dpi=500)
# The learning rate is passed positionally because its keyword was renamed
# from `lr` to `learning_rate` across Keras releases; it is the first
# parameter of Adam in both spellings.
optimizer = Adam(0.001)
model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])
model.summary()

In [None]:
# File name template for checkpoints; the epoch number and the loss value are
# substituted into the placeholders.
filepath = "./{epoch}--weights{loss:.2f}.hdf5"
checkpoint = ModelCheckpoint(
    filepath=filepath,
    monitor='loss',       # quantity watched to decide whether to save
    mode='min',           # lower loss is better (use 'max' for an accuracy metric)
    save_best_only=True,
    verbose=0,
)
callbacks_list = [checkpoint]
model.fit(
    x=X_one_hot,
    y=Y_one_hot,
    batch_size=2048,
    epochs=100,
    callbacks=callbacks_list,
)