In [None]:
import librosa
import librosa.display
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
import matplotlib.pyplot as plt
from scipy.io.wavfile import write
import soundfile as sf
import random
import os, sys
import subprocess
#subprocess.run("pip install pydub", shell = True)
import pydub
from pydub import AudioSegment

In [None]:
# 問題作成ツール
# 無音wavファイル: http://hitotose-switch.blogspot.com/2013/12/wav.html
class Question:
    """Generator for the synthetic "overlapping speech" question data.

    Each question is a mix of 2-20 randomly chosen, randomly trimmed clips from
    ``JKspeech/`` overlaid on a silent base track, plus a text file listing which
    clips were used and at which sample offset each one starts.

    All helpers take positions/lengths in *samples* (at ``SR`` Hz) and convert
    them to milliseconds before handing them to pydub.
    """
    SR = 48000  # sampling rate in Hz used for every sample <-> time conversion
    MIN_KEEP = SR // 2  # samples (0.5 s) that must survive trimming of a clip
    NUM_QUESTIONS = 128  # number of mix/information pairs written by create()

    @staticmethod
    def rate2sec(rate):
        """Convert a sample count into seconds at ``Question.SR``."""
        return rate / Question.SR

    @staticmethod
    def cut_wave(x, head=0, tail=0):
        """Trim ``head`` samples from the start and ``tail`` samples from the end.

        ``x`` is sliced in milliseconds (``x[start_ms:end_ms]``) and must expose
        ``duration_seconds``; the trimmed segment is returned.
        """
        head_ms = Question.rate2sec(head) * 1000
        tail_ms = Question.rate2sec(tail) * 1000
        return x[head_ms:x.duration_seconds * 1000 - tail_ms]

    @staticmethod
    def mix_wave(mix, x, skip=0):
        """Overlay ``x`` onto ``mix`` starting ``skip`` samples into ``mix``.

        Intended to be called once per (already trimmed) clip; returns the new mix.
        """
        skip_sec = Question.rate2sec(skip)
        return mix.overlay(x, skip_sec * 1000)

    def create(w):
        """Write ``NUM_QUESTIONS`` question files into ``wave_file/``.

        For question ``i`` this writes ``wave_file/mix<i>.wav`` and
        ``wave_file/information<i>.txt`` (lines ``nspeech``, ``speech`` and
        ``offset``). ``muon_10sec.wav`` (silence) is read from the current
        directory and the source clips from ``JKspeech/<E|J><01..44>.wav``.

        ``w`` receives the instance when called as ``Question().create()``;
        it is not used.
        """
        os.makedirs("wave_file", exist_ok=True)

        # The silent base is the same for every question, so load it once.
        muon = AudioSegment.from_file("muon_10sec.wav", format="wav")
        muon = muon + muon + muon  # 30 s of silence to overlay clips onto
        # NOTE(review): the clips are overlaid on this 30 s base; presumably pydub's
        # overlay does not extend the base, so audio past 30 s would be dropped even
        # though its offset is still recorded - confirm against the clip lengths.

        ej = ["E", "J"]
        for lp in range(Question.NUM_QUESTIONS):
            print(lp)
            speech = []  # names of the clips used, e.g. "E01"
            offset = []  # start position of each clip in samples
            mix = muon
            max_time = 0  # end of the mix so far, in samples
            num = random.randint(2, 20)
            # Clip numbers still available; each number is used at most once per mix.
            remain = [str(i).zfill(2) for i in range(1, 45)]

            for _ in range(num):
                r_mus = random.randint(0, len(remain) - 1)
                r_ej = random.randint(0, 1)
                name = ej[r_ej] + remain.pop(r_mus)
                speech.append(name)
                base_sound = AudioSegment.from_file("JKspeech/" + name + ".wav", format="wav")

                # Trim random amounts from both ends while keeping at least MIN_KEEP
                # samples; max(0, ...) keeps randint valid for very short clips.
                total = int(Question.SR * base_sound.duration_seconds)
                head_cut = random.randint(0, max(0, total - Question.MIN_KEEP))
                tail_cut = random.randint(0, max(0, total - head_cut - Question.MIN_KEEP))
                base_sound = Question.cut_wave(base_sound, head=head_cut, tail=tail_cut)

                # Start somewhere inside the audio placed so far, so clips overlap.
                skip_time = random.randint(0, max_time)
                offset.append(skip_time)
                max_time = max(max_time, skip_time + int(base_sound.duration_seconds * Question.SR))
                mix = Question.mix_wave(mix, base_sound, skip=skip_time)

            # The context manager closes the file even if a write fails.
            with open("wave_file/information" + str(lp) + ".txt", mode='w') as f:
                f.write("nspeech: " + str(num) + "\n")
                f.write("speech: " + ", ".join(speech) + "\n")
                f.write("offset: " + ", ".join(str(int(o)) for o in offset) + "\n")

            # Drop the unused tail of the silent base before exporting.
            mix = mix[:int(Question.rate2sec(max_time) * 1000)]
            mix.export("wave_file/mix" + str(lp) + ".wav", format="wav")


In [None]:
# 学習モデル
def conv1x5(in_channels, out_channels, stride=1):
    """Return a bias-free 1-D convolution with kernel size 5 and padding 2.

    With ``stride=1`` the padding keeps the sequence length unchanged.
    """
    layer = nn.Conv1d(in_channels, out_channels, 5, stride=stride, padding=2, bias=False)
    return layer

def conv1x1(in_channels, out_channels, stride=1):
    """Return a bias-free pointwise (kernel size 1) 1-D convolution."""
    layer = nn.Conv1d(in_channels, out_channels, 1, stride=stride, bias=False)
    return layer

#ResNet34以下で使うやつ
class BasicBlock(nn.Module):
    """Residual block made of two kernel-5 convolutions for 1-D inputs.

    Args:
        in_channels: channels of the incoming tensor.
        begin_channels: channels produced by the block (``expansion`` is 1).
        stride: stride of the first convolution; values above 1 shorten the
            sequence.

    The input is added back to the convolution output. Whenever the main path
    changes the tensor shape (different channel count *or* ``stride != 1``) the
    shortcut is a strided 1x1 convolution + batch norm so both operands match;
    otherwise it is the identity.
    """
    expansion = 1

    def __init__(self, in_channels, begin_channels, stride = 1):
        super().__init__()
        end_channels = begin_channels * self.expansion
        self.conv1 = conv1x5(in_channels, begin_channels, stride)
        self.bn1 = nn.BatchNorm1d(begin_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv1x5(begin_channels, end_channels)
        self.bn2 = nn.BatchNorm1d(end_channels)

        # Project the shortcut when the main path changes the channel count or the
        # length. Checking only the channels leaves an identity shortcut for a
        # strided block with equal channels, and the addition in forward() then
        # fails on mismatched lengths.
        if stride != 1 or in_channels != end_channels:
            self.shortcut = nn.Sequential(
                conv1x1(in_channels, end_channels, stride),
                nn.BatchNorm1d(end_channels)
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        """Apply conv-bn-relu-conv-bn, add the shortcut of ``x``, then ReLU."""
        y = self.conv1(x)
        y = self.bn1(y)
        y = self.relu(y)
        y = self.conv2(y)
        y = self.bn2(y)

        y += self.shortcut(x)

        y = self.relu(y)

        return y
    
#Resnet50以上で使うやつ
class Bottleneck(nn.Module):
    """Bottleneck residual block (1x1 -> kernel-5 -> 1x1 convolutions) for 1-D inputs.

    Args:
        in_channels: channels of the incoming tensor.
        begin_channels: width of the two inner convolutions; the block outputs
            ``begin_channels * expansion`` channels.
        stride: stride of the middle convolution; values above 1 shorten the
            sequence.

    Whenever the main path changes the tensor shape (different channel count
    *or* ``stride != 1``) the shortcut is a strided 1x1 convolution + batch
    norm; otherwise it is the identity.
    """
    expansion = 4

    def __init__(self, in_channels, begin_channels, stride=1):
        super().__init__()
        end_channels = begin_channels * self.expansion
        self.conv1 = conv1x1(in_channels, begin_channels)
        self.bn1 = nn.BatchNorm1d(begin_channels)
        self.conv2 = conv1x5(begin_channels,begin_channels,stride)
        self.bn2 = nn.BatchNorm1d(begin_channels)
        self.conv3 = conv1x1(begin_channels, end_channels)
        self.bn3 = nn.BatchNorm1d(end_channels)
        self.relu = nn.ReLU(inplace=True)

        # Project the shortcut when the main path changes the channel count or the
        # length. With equal channels and stride 1 the identity is enough; with
        # equal channels and a stride the identity would leave mismatched lengths
        # for the addition in forward().
        if stride != 1 or in_channels != end_channels:
            self.shortcut = nn.Sequential(
                conv1x1(in_channels, end_channels, stride),
                nn.BatchNorm1d(end_channels)
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        """Run the three conv/bn stages, add the shortcut of ``x``, then ReLU."""
        y = self.conv1(x)
        y = self.bn1(y)
        y = self.relu(y)

        y = self.conv2(y)
        y = self.bn2(y)
        y = self.relu(y)

        y = self.conv3(y)
        y = self.bn3(y)

        y += self.shortcut(x)

        y = self.relu(y)

        return y


In [None]:
class ResNet(nn.Module):
    """ResNet-style classifier for single-channel 1-D signals.

    Args:
        block: residual block class; it is called as ``block(in, width, stride)``
            and must expose an ``expansion`` class attribute.
        layers: number of blocks in each of the four stages.
        num_classes: size of the output vector (raw scores, no softmax).

    Layout: a stem (kernel-25 convolution with stride 4, batch norm, ReLU,
    max-pool with stride 2), four residual stages of widths 64/128/256/512 that
    are each built with stride 4, global average pooling and a linear layer.
    """

    def __init__(self, block, layers, num_classes=2):
        super().__init__()

        # Channel count of the tensor entering the next stage; _make_layer()
        # advances it as stages are built.
        self.out_channels = 64
        self.conv1 = nn.Conv1d(1, self.out_channels, 25, stride=4, padding=12, bias=False)
        self.bn1 = nn.BatchNorm1d(self.out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool1d(5, stride=2, padding=2)

        # Four residual stages, every one built with stride 4.
        stage_stride = 4
        self.conv2 = self._make_layer(block, 64, layers[0], stage_stride)
        self.conv3 = self._make_layer(block, 128, layers[1], stage_stride)
        self.conv4 = self._make_layer(block, 256, layers[2], stage_stride)
        self.conv5 = self._make_layer(block, 512, layers[3], stage_stride)

        self.avgpool = nn.AdaptiveAvgPool1d(1)
        self.fc = nn.Linear(512 * block.expansion, num_classes)

        # Kaiming-normal convolution weights; batch norm starts as scale 1, shift 0.
        for module in self.modules():
            if isinstance(module, nn.BatchNorm1d):
                nn.init.constant_(module.weight, 1)
                nn.init.constant_(module.bias, 0)
            elif isinstance(module, nn.Conv1d):
                nn.init.kaiming_normal_(module.weight, mode = "fan_out", nonlinearity = "relu")

    def _make_layer(self, block, channels, blocks, stride):
        """Build one stage of ``blocks`` blocks; only the first one gets ``stride``."""
        head = block(self.out_channels, channels, stride)
        self.out_channels = channels * block.expansion
        tail = [block(self.out_channels, channels) for _ in range(1, blocks)]
        return nn.Sequential(head, *tail)

    def forward(self, x):
        """Return one row of ``num_classes`` raw scores per input sample."""
        features = self.maxpool(self.relu(self.bn1(self.conv1(x))))

        for stage in (self.conv2, self.conv3, self.conv4, self.conv5):
            features = stage(features)

        pooled = torch.flatten(self.avgpool(features), 1)
        return self.fc(pooled)


In [None]:
def resnet18():
    """Build a ``ResNet`` from ``BasicBlock`` with stage depths 2-2-2-2."""
    stage_depths = [2, 2, 2, 2]
    return ResNet(BasicBlock, stage_depths)

def resnet34():
    """Build a ``ResNet`` from ``BasicBlock`` with stage depths 3-4-6-3."""
    stage_depths = [3, 4, 6, 3]
    return ResNet(BasicBlock, stage_depths)

def resnet50():
    """Build a ``ResNet`` from ``Bottleneck`` with stage depths 3-4-6-3."""
    stage_depths = [3, 4, 6, 3]
    return ResNet(Bottleneck, stage_depths)

def resnet101():
    """Build a ``ResNet`` from ``Bottleneck`` with stage depths 3-4-23-3."""
    stage_depths = [3, 4, 23, 3]
    return ResNet(Bottleneck, stage_depths)

def resnet152():
    """Build a ``ResNet`` from ``Bottleneck`` with stage depths 3-8-36-3."""
    stage_depths = [3, 8, 36, 3]
    return ResNet(Bottleneck, stage_depths)

In [None]:
# Instantiate the network trained below: the Bottleneck-based ResNet with
# stage depths 3-4-6-3 and the default two output classes.
model = resnet50()

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU,
# and show which one was picked.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

In [None]:
# Move the model onto the selected device.
model = model.to(device)

In [None]:
# Loss function and optimizer used for training.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)

In [None]:
# Generate the question data (the mix*.wav and information*.txt files
# under wave_file/).
question = Question()
question.create()

In [None]:
# Load the waveform data.
file_path = "wave_file/"
wave_name = np.array(["mix" + str(i) + ".wav" for i in range(0, 128)])

# The mixes have different lengths, so for training a random 0.5 s window
# (CLIP_LEN samples at 48 kHz) is cut out of every file.
# At inference time the full recording is meant to be split into windows
# right before it is fed to the CNN.
CLIP_LEN = 24000
clips = []
rates = []
for name in wave_name:
    x, fs = librosa.load(file_path + name, sr=48000)
    # Zero-pad files shorter than one window so the random start below is valid.
    if len(x) < CLIP_LEN:
        x = np.pad(x, (0, CLIP_LEN - len(x)))
    rstart = random.randint(0, len(x) - CLIP_LEN)
    clips.append(x[rstart:rstart + CLIP_LEN])
    rates.append(fs)

# Build the arrays once instead of growing them with np.append on every
# iteration; float64 matches what the np.append-based version produced.
X = np.array(clips, dtype=np.float64).reshape([-1, CLIP_LEN])
FS = np.array(rates, dtype=np.float64)

In [None]:
# Train one binary classifier per target clip ("is this clip in the mix?").
BATCH_SIZE = 32
EPOCH = 3

# Build the target names E01..E44 and J01..J44.
Target = []
EJ = ["E", "J"]
for ej in EJ:
    for i in range(1, 45):
        Target.append(ej + str(i).zfill(2))

# Read every information file once (closing each handle); the text is the same
# for all targets, only the searched name changes.
file_path = "wave_file/"
label_name = np.array(["information" + str(i) + ".txt" for i in range(0,128)])
label_text = []
for name in label_name:
    with open(file_path + name, 'r') as f:
        label_text.append(f.read())

# The inputs are identical for every target, so convert them once.
tensorX = torch.tensor(X, dtype=torch.float32)

# Checkpoints are written to model/; create it so the first save cannot fail.
os.makedirs("model", exist_ok=True)

# Snapshot of the weights the model has right now, used as the starting point
# for targets that have no checkpoint yet (e.g. on the very first run).
initial_state = {k: v.detach().clone() for k, v in model.state_dict().items()}

# NOTE(review): `optimizer` is shared by all targets, so Adam's running moments
# carry over from one target's model to the next - confirm whether a fresh
# optimizer per target is wanted.
for target in Target:
    # GPU and CPU runs keep separate checkpoint files.
    if torch.cuda.is_available():
        model_path = 'model/model_' + target + '.pth'
    else:
        model_path = 'model/model_cpu_' + target + '.pth'

    # Resume from the target's checkpoint when there is one; otherwise start
    # from the initial weights instead of failing on the missing file.
    if os.path.exists(model_path):
        model.load_state_dict(torch.load(model_path, map_location=device))
    else:
        model.load_state_dict(initial_state)
    model.train()

    # Label 1 when the target's name appears in the question's information
    # file, 0 otherwise (class order is [absent, present]).
    Y = np.array([1 if target in s else 0 for s in label_text], dtype=np.float64)

    # Dataset / loader.
    # Reference: https://dreamer-uma.com/pytorch-dataset/
    tensorY = torch.tensor(Y, dtype=torch.int64)
    Dataset = torch.utils.data.TensorDataset(tensorX, tensorY)
    trainloader = torch.utils.data.DataLoader(dataset=Dataset,
                                              batch_size = BATCH_SIZE,
                                              shuffle = True,
                                              num_workers = 0)

    # Training.
    # Reference: https://qiita.com/mathlive/items/8e1f9a8467fff8dfd03c
    train_loss_value = []
    train_acc_value = []

    for epoch in range(EPOCH):
        print('epoch', epoch+1)
        sum_loss = 0.0
        sum_correct = 0
        sum_total = 0
        for (inputs, labels) in trainloader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            # The network expects (batch, 1, 24000): add the channel axis.
            inputs = inputs.unsqueeze(1)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # criterion returns the batch mean; weight it by the batch size so
            # the epoch mean stays correct even if the last batch is smaller.
            sum_loss += loss.item() * labels.size(0)
            _, predicted = outputs.max(1)  # index of the highest score
            sum_total += labels.size(0)
            sum_correct += (predicted == labels).sum().item()

        mean_loss = sum_loss / sum_total
        mean_accuracy = float(sum_correct/sum_total)
        print("train mean loss={}, accuracy={}".format(mean_loss, mean_accuracy))
        train_loss_value.append(mean_loss)
        train_acc_value.append(mean_accuracy)
        # TODO: evaluate on held-out data here once a test loader exists.

    # Save this target's weights; the model already lives on `device`.
    torch.save(model.state_dict(), model_path)
    # TODO: plot train_loss_value / train_acc_value per epoch if curves are needed.


In [None]:
#from google.colab import drive
#drive.mount('/content/drive')