# pix2pixを用いた手法

In [1]:
import os
import random
import datetime
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm

from sklearn.metrics import mean_squared_error as mse

- ノイズ付加関数

In [2]:
def add_noise(raw):
    noise = np.array([(-2*v)*np.random.rand()+v for v in raw.flatten()]).reshape(raw.shape[0],raw.shape[1])
    noised_raw = noise + raw
    return noised_raw

- データ読み込み

In [3]:
val = 80
def load_batch(n_batches=1,is_testing=False):

    base_path = '../input/raw/'
    
    if not is_testing:
        path = sorted(os.listdir(base_path))[:val]
    else:
        path = sorted(os.listdir(base_path))[val:]
    
    # x_data:ノイズありmel, y_data:ノイズなしmel
    x_data, y_data = [], []
    for _ in range(n_batches):
        for mel_path in path:
            raw = np.load(base_path + mel_path)
            
            for i in range(0,raw.shape[0]-mel_shape[0],3):
                for j in range(0,raw.shape[1]-mel_shape[1],3):
                    y = raw[i:i+mel_shape[0],j:j+mel_shape[1]]
                    x = add_noise(y)
                    
                    x = x.reshape(mel_shape[0],mel_shape[1],1)
                    y = y.reshape(mel_shape[0],mel_shape[1],1)
                    
                    x_data.append(x)
                    y_data.append(y)

    return  np.array(x_data), np.array(y_data)

- モデル読み込み

In [4]:
import numpy as np
from tensorflow.keras.models import *
from tensorflow.keras.layers import  Input,Conv2D,BatchNormalization,Activation,Lambda,Subtract

def create_dncnn():
    inpt = Input(shape=(mel_shape[0],mel_shape[1],1))
    # 1st layer, Conv+relu
    x = Conv2D(filters=64, kernel_size=(3,3), strides=(1,1), padding='same')(inpt)
    x = Activation('relu')(x)
    # 15 layers, Conv+BN+relu
    for i in range(15):
        x = Conv2D(filters=64, kernel_size=(3,3), strides=(1,1), padding='same')(x)
        x = BatchNormalization(axis=-1, epsilon=1e-3)(x)
        x = Activation('relu')(x)   
    # last layer, Conv
    x = Conv2D(filters=1, kernel_size=(3,3), strides=(1,1), padding='same')(x)
    x = Subtract()([inpt, x])   # input - noise
    model = Model(inputs=inpt, outputs=x)
    
    return model

In [5]:
mel_shape = (32,32) # input_shape

In [6]:
x_train, y_train = load_batch(n_batches=1,is_testing=False)
x_test, y_test = load_batch(n_batches=1,is_testing=True)

In [7]:
generator = create_dncnn()
generator.compile(optimizer='adam',loss='mse')

In [8]:
# generatorは事前学習させる
generator.fit(x_train,y_train,epochs=3,validation_data=(x_test,y_test))

Train on 153472 samples, validate on 41184 samples
Epoch 1/3
Epoch 2/3
Epoch 3/3


<tensorflow.python.keras.callbacks.History at 0x7f32351d6f28>

- discriminator

In [9]:
from tensorflow.keras.layers import Input, Dense, Reshape, Flatten, Dropout, Concatenate
from tensorflow.keras.layers import BatchNormalization, Activation, ZeroPadding2D
from tensorflow.keras.layers import LeakyReLU
from tensorflow.keras.layers import UpSampling2D, Conv2D
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.optimizers import Adam,RMSprop
import tensorflow.keras.backend as K


def create_discriminator():

    model = Sequential()

    model.add(Conv2D(16, kernel_size=3, strides=2, input_shape=((mel_shape[0],mel_shape[1],1)), padding="same"))
    model.add(LeakyReLU(alpha=0.2))
    model.add(Dropout(0.25))
    model.add(Conv2D(32, kernel_size=3, strides=2, padding="same"))
    model.add(ZeroPadding2D(padding=((0,1),(0,1))))
    model.add(BatchNormalization(momentum=0.8))
    model.add(LeakyReLU(alpha=0.2))
    model.add(Dropout(0.25))
    model.add(Conv2D(64, kernel_size=3, strides=2, padding="same"))
    model.add(BatchNormalization(momentum=0.8))
    model.add(LeakyReLU(alpha=0.2))
    model.add(Dropout(0.25))
    model.add(Conv2D(128, kernel_size=3, strides=1, padding="same"))
    model.add(BatchNormalization(momentum=0.8))
    model.add(LeakyReLU(alpha=0.2))
    model.add(Dropout(0.25))
    model.add(Flatten())
    model.add(Dense(1))

    img = Input(shape=(mel_shape[0],mel_shape[1],1))
    validity = model(img)

    return Model(img, validity)

optimizer = Adam(0.0002, 0.5)
discriminator = create_discriminator()
discriminator.compile(loss='mse',
            optimizer=optimizer,
            metrics=['accuracy'])

- combined

In [11]:
mel_noise = Input(shape=(mel_shape[0],mel_shape[1],1))

mel_fake = generator(mel_noise)

discriminator.trainable = False

valid = discriminator(mel_fake)

combined = Model(inputs=mel_noise, outputs=[valid,mel_fake])
optimizer = Adam(0.0002, 0.5)
combined.compile(loss=['mse','mse'],
                 loss_weights=[1, 100],
                 optimizer=optimizer)

- 学習

In [30]:
def load_data():
    
    base_path = '../input/raw/'
    path = os.listdir(base_path)[:val]

    for n in range(len(path)):
        batch = path[n*len(path):(n+1)*len(path)]
        for mel_path in batch:
            
            x_data, y_data = [], []
            mel = np.load(base_path + mel_path)

            for i in range(0,mel.shape[0]-mel_shape[0],5):
                for j in range(0,mel.shape[1]-mel_shape[1],5):
                    y = mel[i:i+mel_shape[0],j:j+mel_shape[1]]
                    x = add_noise(y)
                    
                    x = x.reshape(mel_shape[0],mel_shape[1],1)
                    y = y.reshape(mel_shape[0],mel_shape[1],1)
                    
                    x_data.append(x)
                    y_data.append(y)
                
            yield np.array(x_data), np.array(y_data)

In [26]:
epochs=50

for epoch in range(epochs):
    d_lo = []
    d_ac = []
    g_lo = []
    for batch_i, (x_train, y_train) in enumerate(load_data()):

        # ---------------------
        #  Train Discriminator
        # ---------------------

        # Condition on B and generate a translated version
        mels_fake = generator.predict(x_train)
        
        # Adversarial loss ground truths
        valid = np.ones((len(x_train), 1))
        fake = np.zeros((len(x_train), 1))

        # Train the discriminators (original images = real / generated = Fake)
        d_loss_real = discriminator.train_on_batch(y_train, valid)
        d_loss_fake = discriminator.train_on_batch(mels_fake, fake)
        d_loss = 0.5 * np.add(d_loss_fake, d_loss_real)

        # -----------------
        #  Train Generator
        # -----------------

        # Train the generators
        g_loss = combined.train_on_batch(x_train, [valid,y_train])
        
        d_lo.append(d_loss[0])
        d_ac.append(d_loss[1])
        g_lo.append(g_loss)
        

    # Plot the progress
    if epoch % 5 == 0:
        print ("epoch:%d [D loss: %f, acc: %f] [G loss: %f]" % 
               (epoch,  np.mean(d_lo),100*np.mean(d_ac),np.mean(g_lo)))

- noisedデータにモデルを適用

In [41]:
def fitting(noised_tgt):
    vector_dict = {}
    for i in range(noised_tgt.shape[0]):
        for j in range(noised_tgt.shape[1]):
            vector_dict[(i,j)] = []

    x_test = []
    for i in range(0,noised_tgt.shape[0]-mel_shape[0],5):
        for j in range(0,noised_tgt.shape[1]-mel_shape[1],5):
            v = noised_tgt[i:i+mel_shape[0],j:j+mel_shape[1]].reshape(mel_shape[0],mel_shape[1],1)
            x_test.append(v)

    x_test = np.array(x_test)
    v_noise = generator.predict(x_test).reshape(-1,mel_shape[0],mel_shape[1])
    c = 0
    for i in range(0,noised_tgt.shape[0]-mel_shape[0],5):
        for j in range(0,noised_tgt.shape[1]-mel_shape[1],5):
            for i2 in range(v_noise[0].shape[0]):
                for j2 in range(v_noise[0].shape[1]):
                    vector_dict[(i+i2,j+j2)].append(v_noise[c][i2][j2])
            c += 1
            
    pred = np.copy(noised_tgt)
    for i in range(noised_tgt.shape[0]):
        for j in range(noised_tgt.shape[1]):
            if len(vector_dict[(i,j)]) > 0:
                value = np.mean(vector_dict[(i,j)])
                pred[i][j] = value

    return pred

- val_dataに対して実行

In [42]:
base_path = '../input/raw/'
path = sorted(os.listdir(base_path))[val:]

targets = []
noises = []
preds = []

for mel_path in path:
    raw = np.load(base_path + mel_path)
    noised_raw = add_noise(raw)
    pred = fitting(noised_raw)
    
    targets.append(raw)
    noises.append(noised_raw)
    preds.append(pred)
    
    
print('before:',np.mean([mse(targets[i].flatten(),noises[i].flatten()) for i in range(len(targets))]))
print('after :',np.mean([mse(targets[i].flatten(),preds[i].flatten()) for i in range(len(targets))]))

before: 4.279852637887082
after : 0.5917038743068908


- submit_dataに対して実行

In [43]:
name = '003_pix2pix'
path = '../output/unzip_data/%s/' % (name)

os.makedirs(path, exist_ok=True)
noised_path = '../input/noised_tgt/'

for p in sorted(os.listdir(noised_path)):
    noised_tgt = np.load(noised_path+p)
    clean_tgt = fitting(noised_tgt)
                    
    np.save(path+p[7:],clean_tgt)

- dataをzip化

In [44]:
import shutil
base = '../output/'
unzip_path = base+'unzip_data/'+name
zip_path = base+'zip_data/'+name
shutil.make_archive(zip_path,"zip",root_dir = unzip_path)

'/home/share/BandaiNamco-DSChallenge/output/zip_data/003_pix2pix.zip'