In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install music21
!pip install matplotlib
!pip install pretty_midi
from music21 import converter
!pip install MIDIUtil
#MIDIにする行列の設定は上のセル
from midiutil import MIDIFile

In [None]:
### U-net
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import cv2
from PIL import Image
from matplotlib import pyplot as plt
from torch.utils.data import DataLoader
from torch.utils.data import Dataset as BaseDataset
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import transforms
from torchvision.transforms import functional
!pip install -U segmentation-models-pytorch
import segmentation_models_pytorch as smp

In [None]:
piano_list=np.load('/content/drive/MyDrive/music21/data/piano_list_128_16_2850.npy')
quartet_list=np.load('/content/drive/MyDrive/music21/data/quartet_list_128_16_5700.npy')
print(len(piano_list))
print(len(quartet_list))

In [None]:
plt.imshow(piano_list[5],cmap='gray')
plt.show

### データセットの読み込み

In [None]:
#128,16の場合
train_df = [piano_list[0:1200],quartet_list[0:2400]] #半    データ数
val_df =[piano_list[1201:1800],quartet_list[2401:3600]]  #1/4
test_df =[piano_list[1801:2400],quartet_list[3601:4800]]  #1/4

class Dataset(BaseDataset):  #importしたBaseDatasetクラスを継承してDatasetというクラスを作成
  def __init__(
      self,
      dataset_list,
      transform = None,
      classes = None,
      augmentation = None
      ):
    self.imgpath_list = dataset_list[0]   #ここにカットしたピアノ譜の小節のリストを入れる、1439ではなくu1438にして、lenメソッドで2で割って曲の数を求めるため　　元の場合は奇数だったため　元のコードは[:-1]
    self.labelpath_list = dataset_list[1] #四重奏譜の小節のリストが入る   listはclass Datasetの引数に与えられる   train_df、val_df,test_dfがlistになりそれぞれ2つの要素を持つのでlist[0],list[1]

  #get itemのpermuteの確認は大学アカウントのノートブックの行列化の詳細
  def __getitem__(self, i):
    img_temporal_getitem=[]
    #元のはパスから読み込みだがこの場合は行列がすでにあるので途中から
    for k in range(2):  #ラベルは4パート
          img_index=2*i+k   #i番目の曲のj個目など　　imgpath_list=train_df[0]なのでさらに[0]などとしなくて良い
          img = self.imgpath_list[img_index]   #train_df[0]ピアノ譜の行列リストがp1,p2,p1,p2の順で格納されている、それのp1の一小節目
          img = torch.from_numpy(img.astype(np.float32)).clone()  #画像をNumPy配列からPyTorchのテンソルに変換し、データタイプを float32 に変更
          img_temporal_getitem.append(img)  #imgの形状は(128,16)
    #(128,16)のテンソルを2つ結合
    img_stack=torch.stack([img_temporal_getitem[0],img_temporal_getitem[1]],dim=2)  #class Datasetでテンソルを2つ結合したものと4つ結合したものを対応させていてこれはチャンネル方向の結合
    img_tensor=img_stack.permute(2,0,1)
    label_temporal_getitem=[]
    for j in range(4):  #ラベルは4パート
        label_index=4*i+j   #i番目の曲のj個目など
        label=self.labelpath_list[label_index]
        label = torch.from_numpy(label.astype(np.float32)).clone()  #ラベルテンソルのデータタイプを float32 に変更
        label = label.to(torch.float32)  #ラベルテンソルのデータタイプを float32 に変更
        label_temporal_getitem.append(label)  #ここでlabel_temporalgetitem[0]はtensor(128,16,4)

    #label_temporalにはtrain_df[1]の全ての行列がテンソル型で格納されている、それぞれtensor(128,16,4)でtrain_df[1]の長さ2878ある     [i]で良いのかは微妙 iはデータセットの長さだが、2878ある
    #(128,16)のテンソルを4つ結合
    label_stack=torch.stack([label_temporal_getitem[0],label_temporal_getitem[1],label_temporal_getitem[2],label_temporal_getitem[3]],dim=2)
    label_tensor=label_stack.permute(2, 0, 1)  #テンソルの次元を変更します。これにより、ラベルテンソルもPyTorchが期待する形式（(チャンネル, 高さ, 幅)）になる
    data = {"img": img_tensor, "label": label_tensor}
    return data

  def __len__(self):
    return len(self.imgpath_list)//2  #for i, data in enumerate(train_loader): のループ内で i は__len__の返り値をデータセットの長さとして動く
                                      #4*iのようにiは曲数になるためimgpathlist(ピアノ譜p1,p2を合わせたリストの長さ)を２で割る   //は切り捨て除算
len(train_df[0])

In [None]:
BATCH_SIZE = 32
train_dataset = Dataset(train_df)
train_loader = DataLoader(train_dataset,
                          batch_size=BATCH_SIZE,
                          num_workers=4,
                          shuffle=True)
val_dataset = Dataset(val_df)
val_loader = DataLoader(val_dataset,
                          batch_size=BATCH_SIZE,
                          num_workers=4,
                          shuffle=False)
test_dataset = Dataset(test_df)
test_loader = DataLoader(test_dataset,
                          batch_size=1,
                          num_workers=4)

### モデル

In [None]:
class TwoConvBlock(nn.Module):
    def __init__(self, in_channels, middle_channels, out_channels):  #in_channelsは入力チャンネル数、middle_channelsは中間チャンネル数、out_channelsは出力チャンネル数
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, middle_channels, kernel_size = 3, padding="same") #padding="same"で入出力のテンソルの形状を一致させる、strideはデフォルトで1
        self.bn1 = nn.BatchNorm2d(middle_channels)
        self.rl = nn.SiLU()  #活性化関数
        self.conv2 = nn.Conv2d(middle_channels, out_channels, kernel_size = 3, padding="same")
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.residual_conv=nn.Conv2d(in_channels,out_channels,kernel_size=1,bias=False)#1*1の畳み込みで形状を変えずにチャンネル数を増やす　特徴量次元数を畳み込み後のxとresidueで揃える

    def forward(self, x):
        #residue=self.residual_conv(x)     #x=(32,2,128,16)
        x = self.conv1(x)  #(32,64,128,16)
        x = self.bn1(x)   #(32,64,128,16)
        x = self.rl(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.rl(x)
        return x

class UpConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.up = nn.Upsample(scale_factor=2, mode="bilinear", align_corners=True)
        self.bn1 = nn.BatchNorm2d(in_channels)
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size = 2, padding="same")
        self.bn2 = nn.BatchNorm2d(out_channels)

    def forward(self, x):
        x = self.up(x)
        x = self.bn1(x)
        x = self.conv(x)
        x = self.bn2(x)
        return x

#Unetの全層
class UNet_2D(nn.Module):
    def __init__(self):
        super().__init__()
        self.TCB1 = TwoConvBlock(2, 64, 64)  #TwoConvBlockは畳み込み、バッチ正規化、活性化関数を２つ繋げたブロック　入力は2チャンネル、1つ目のconvで出力が64チャンネル、2つ目のconvで64チャンネルになるということ
        self.TCB2 = TwoConvBlock(64, 128, 128)
        self.TCB3 = TwoConvBlock(128, 256, 256)
        self.TCB4 = TwoConvBlock(256, 512, 512)
        self.TCB5 = TwoConvBlock(512, 1024, 1024)
        self.TCB6 = TwoConvBlock(1024, 512, 512)
        self.TCB7 = TwoConvBlock(512, 256, 256)
        self.TCB8 = TwoConvBlock(256, 128, 128)
        self.TCB9 = TwoConvBlock(128, 64, 64)
        self.maxpool = nn.MaxPool2d(2, stride = 2)
        self.UC1 = UpConv(1024, 512)
        self.UC2 = UpConv(512, 256)
        self.UC3 = UpConv(256, 128)
        self.UC4= UpConv(128, 64)
        self.conv1 = nn.Conv2d(64, 4, kernel_size = 1)
        self.dropout=nn.Dropout(0.5)

    def forward(self, x):
        x = self.TCB1(x) #縦横2ずつ小さくなる  (126,14)
        x1 = x
        x = self.maxpool(x) #マックスプーリングでノイズ除去、位置変化に頑健 形状が半分になる  (63,7)
        x = self.TCB2(x)  #(61,5)
        x2 = x
        x = self.maxpool(x) #(30,2)
        x = self.TCB3(x)
        x3 = x
        x = self.maxpool(x)
        x = self.TCB4(x)
        x4 = x                 #512チャンネル
        x = self.maxpool(x)
        x = self.TCB5(x)
        #ここからUnetのデコーダー
        x = self.UC1(x)
        x=self.dropout(x)
        x = torch.cat([x4, x], dim = 1)#Unetのスキップ接続　図の通りコピーを結合
        x = self.TCB6(x)
        x = self.UC2(x)
        x=self.dropout(x)
        x = torch.cat([x3, x], dim = 1)
        x = self.TCB7(x)
        x = self.UC3(x)
        x = torch.cat([x2, x], dim = 1)
        x = self.TCB8(x)
        x = self.UC4(x)
        if x1.size(3) !=x.size(3):  #128の次元が一致していない場合
          x=torch.nn.functional.interpolate(x,size=(x1.size(2),x1.size(3)),mode='bilinear',align_corners=True)
        x = torch.cat([x1, x], dim = 1)
        x = self.TCB9(x)
        x = self.conv1(x)
        return x

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
weights=torch.tensor([100,100,100,100],device=device)
pos_weight=torch.tensor([100],device=device)
unet = UNet_2D().to(device)
optimizer = optim.Adam(unet.parameters(), lr=1e-6)
history = {"train_loss": []} #各エポックごとのtrain_loss
n = 0
m = 0
#初期化
for epoch in range(61):  #trainは1438あり2パートなので約700セット
  train_loss = 0
  val_loss = 0
  unet.train() #unetを訓練モードにする
  print('tpの初期化')
  for i, data in enumerate(train_loader):  #トレーニングデータローダーからバッチごとにデータを取得し、それをループ、trainloderは辞書形式 iは何番目のバッチかを示す
    inputs,labels = data["img"].to(device), data["label"].to(device) #データローダーから入力画像とラベルを取り出し、それらを計算を行うデバイス（CPUまたはGPU）に移動
    optimizer.zero_grad()  #最適化関数の勾配を初期化
    outputs = unet(inputs) #モデルに入力データを与えて、出力を計算
    loss_function=nn.BCEWithLogitsLoss(pos_weight=pos_weight)
    loss=loss_function(outputs,labels)
    #損失関数元のコードのを使う場合
    loss.backward()  #誤差逆伝播法でネットワークの重みに関する損失関数の勾配を求める　どのように各重みを調整すれば損失を減少させることができるかが分かる
    optimizer.step()  #計算された勾配を使用してモデルのパラメータを更新
    train_loss += loss.item()  #現在のバッチにおける損失のスカラー値を返す　train_loss +=は、この損失値をtrain_loss変数に加算　これは、エポック全体を通じて処理されたすべてのバッチの損失を合計するため　エポックの終わりに、この合計損失をバッチの総数で割ることで、エポック全体の平均損失が計算され、モデルの訓練中のパフォーマンスを評価するための指標になる
    history["train_loss"].append(loss.item())  #historyという辞書のtrain_lossというキーにloss.item()を追加
    n += 1

  unet.eval()
  with torch.no_grad():
    tp=0
    fn=0
    fp=0
    tn=0
    total_score=0
    for i, data in enumerate(val_loader):
      inputs, labels = data["img"].to(device), data["label"].to(device)
      outputs = unet(inputs)
      loss_function=nn.BCEWithLogitsLoss(pos_weight=pos_weight)
      loss=loss_function(outputs,labels)
      #softmaxの場合　合計1にするので閾値0.5ではなく各列　各時間に対応　の最大値
      outputs=torch.softmax(outputs,dim=3)
      max_indices = np.argmax(outputs.cpu(), axis=3)
      # 各行の最大値だけ1にして、それ以外を0にする
      new_array = np.zeros_like(outputs.cpu())

      for i in range(outputs.shape[0]):
        for j in range(outputs.shape[1]):
          for k in range(outputs.shape[2]):
              new_array[i,j,k,max_indices[i,j,k]]=1
      outputs=new_array
      val_loss += loss.item()
      m += 1
      if i % (len(val_df[0])//BATCH_SIZE) == len(val_df[0])//BATCH_SIZE - 1:   # バッチの個数でバッチ番号iを割ったものが　バッチの個数-1 になった時　最後のバッチの時
        print(f"epoch:{epoch+1}  index:{i+1}  val_loss:{val_loss/m:.5f}")
        m = 0
        val_loss = 0
        val_acc = 0

    #評価指標の計算 for i,data の外側なのでエポックごとのTPなど tpの初期化はエポックごとではないので全体のtpが求まる
    outputs_eval_list=[]
    print(type(outputs))
    for l in range(outputs.shape[0]):  #バッチサイズ
        for m in range(4):  #チャンネル数
            outputs_eval_list=outputs[l][m].tolist()  #outputsに
            labels_eval_list=labels[l][m].tolist()

            for j in range(16):
              for k in range(128):   #128_64での変更点
                if outputs_eval_list[j][k]==1 and labels_eval_list[j][k]==1:
                  tp+=1
                elif outputs_eval_list[j][k]==1 and labels_eval_list[j][k]==0:
                  fp+=1
                elif outputs_eval_list[j][k]==0 and labels_eval_list[j][k]==1:
                  fn+=1
                elif outputs_eval_list[j][k]==0 and labels_eval_list[j][k]==0:
                  tn+=1
    #評価指標の計算続き
    if tp+fn>0:
      precision=tp/(tp+fn)
    else:
      precision=0
    if tp+fp>0:
      recall=tp/(tp+fp)
    else:
      recall=0
    if 2*tp+fp+fn>0:
      f1_score=2*tp/(2*tp+fp+fn)
    else:
      f1_score=0
    print("epoch",epoch)
    print("precision",precision)
    print("Recall",recall)
    print("F1_Score",f1_score)
    if (precision+recall+f1_score)>total_score:
      total_score=recall+f1_score  #recallとf1scoreが重要なため
      print(epoch,"epoch ","precision",precision,"recall",recall,"f1_score",f1_score)
  torch.save(unet.state_dict(), f"./train_{epoch+1}.pth")#インデントの位置からepochごとに重み情報などをstate_dictに保存
  print("epoch",epoch)
print("finish training")

plt.plot(history["train_loss"]) #曲線のグラフ
plt.xlabel('batch')
plt.ylabel('loss')


### 可視化

In [None]:
fig,ax=plt.subplots(1,2,figsize=(15,8))
ax[0].imshow(data["img"][0,:,:,:][0])   #軸表示は逆だったが最初の行列化の時点で上下は正しくなっているはずなのでここでは変更なし
ax[0].set_title("ピアノ譜右手")
ax[0].axis("off")
ax[1].imshow(data["img"][0,:,:,:][1])
ax[1].set_yticks(range(128,0,-10))
ax[1].set_title("ピアノ譜左手")
ax[1].axis("off")
plt.figure()

classes = ["第1パート","第2パート","第3パート","第4パート"]
fig, ax = plt.subplots(2, 4, figsize=(15,8))

for i in range(2):
  for j, cl in enumerate(classes):   #enumerate(classes)で各要素jとインデックスclを取得
    if i == 0:
      output_score=[]
      pred_transpose=pred[0,:,:,j].to('cpu').detach().numpy().copy()  #predの1チャンネル目0を指定するのはそもそも1チャンネル目は1次元しかないため、テンソルをcpuメモリに移動、.detachで勾配情報を取り除く、numpyにしてコピーを作成
      print(type(pred_transpose))  #ndarray
      print("pred_transpose",pred_transpose.shape)  #(128,16)
      print(pred_transpose[31:73].shape)   #(42,16)

      #音域設定
      if j==0 or j==1:  #第1パート
         pred_transpose=pred_transpose[31:73].T
         pred_transpose_index=(np.argmax(pred_transpose,axis=1))  #axis=1は列方向になるのでaxis=0
         pred_transpose_index+=31

      elif j==2:
         pred_transpose=pred_transpose[43:80].T
         pred_transpose_index=(np.argmax(pred_transpose,axis=1))  #axis=1は列方向になるのでaxis=0
         pred_transpose_index+=43

      elif j==3:
         pred_transpose=pred_transpose[55:92].T
         pred_transpose_index=(np.argmax(pred_transpose,axis=1))  #axis=1は列方向になるのでaxis=0
         pred_transpose_index+=55

      b=np.zeros((16,128))
      for k in range(16):    #全て0にした行列(16,128)の指定したインデックスの場所だけ1にする,b[k]は128個の要素がある1次元配列(128,)は(128,1)とは異なる
        b[k][pred_transpose_index[k]]=1       #(16,128)の行ごとに指定のインデックスの部分を1にする b[k]は各行(128,)
        output_score.append(b[k].tolist())
      #出力の行列をMIDIにする時用いるリスト
      #各リストはそれぞれ予測のパート1~4
      if j ==0:
        output_midi_list=[output_score]
      else:
        output_midi_list.append(output_score)

      output_score=np.array(output_score)  #listからndarray
      output_score=torch.from_numpy(output_score.astype(np.float32)).clone()  #ndarrayからtensorにする
      output_score=output_score.T  #(16,128)から元の形状(128,16)にする
      print("output_scoreの形状",output_score.shape)
      ax[i,j].imshow(output_score)   #iは0,j=1~4   pred[0,全ての行と列,1~4チャンネル]
      ax[i,j].axis("off")
      ax[i,j].set_title(f"pred_{cl}")
    else:
      ax[i,j].imshow(data["label"][0,j,:,:])
      ax[i,j].axis("off")
      ax[i,j].set_title(f"label_{cl}")

### MIDIファイル化

MIDIファイルに変換することで音声として聞くことが可能になる

In [None]:
#ここでMIDIファイルにする行列を設定  output_midi_list[3].T  など
#predはoutput_midi_listに4つ格納されている
a=data["img"][0,:,:,:][0]
a=torch.from_numpy(a.astype(np.float32)).clone()

# バイナリ行列の例
matrix = a
# MIDIファイルを初期化
midi_file = MIDIFile(1)  # トラック数1
track = 0   # トラック0
time = 0    # 開始時間
midi_file.addTrackName(track, time, "Track")
midi_file.addTempo(track, time, 60)  # テンポ120 BPM
# 行列からMIDIノートを生成
for row in range(matrix.shape[0]):
    note = row  # MIDIノートナンバー
    duration = 0  # 音符の持続時間
    for col in range(matrix.shape[1]):
        if matrix[row, col] == 1:
            duration += 1  # 音符の持続時間を延長
        elif duration > 0:
            # 音符の終わりに到達
            midi_file.addNote(track, 0, note, time + col - duration, duration, 100)
            duration = 0
    #行の要素の全てが1の場合、行の最後でもfor文の中のifブロックが実行されるのでelifが実行されず保存がないためforループの外で行の全てが1の場合を保存
    if duration > 0:
         print("activated")
         midi_file.addNote(track, 0, note, time + matrix.shape[1] - duration, duration, 100) #forループ外なのでcolではなくmatrix.shape[1]-1

# MIDIファイルを保存
with open("output.mid", "wb") as output_file:
    midi_file.writeFile(output_file)
    
from google.colab import files
files.download('output.mid')