# 1장 케라스 시작

In [None]:
import keras

# Report which backend engine Keras is running on.
print(keras.backend.backend())

In [None]:
import numpy as np

# Toy regression data: y is an exact linear function of x (y = 2x + 1).
x = np.array([0, 1, 2, 3, 4])
y = x * 2 + 1

In [None]:
import keras

# Single-unit linear model: one Dense node fed by a scalar input.
model = keras.models.Sequential()
model.add(keras.layers.Dense(1, input_shape=(1, )))
model.compile(optimizer='SGD', loss='mse')

# Train on the first two samples only; the remaining three are held out.
model.fit(x[:2], y[:2], epochs=1000, verbose=0)

# Compare the held-out targets with the model's predictions for them.
print('Targets:', y[2:])
print('Predictions:', model.predict(x[2:]).flatten())

# 2장 ANN

In [None]:
# Distributed-style modeling, functional implementation
from keras import layers, models

# NOTE(review): every None below looks like a placeholder for a concrete size
# (input width, hidden units, output classes) — fill these in before running.
x = layers.Input(shape=(None, ))
h = layers.Activation('relu')(layers.Dense(None)(x))
y = layers.Activation('softmax')(layers.Dense(None)(h))

# Bind the input tensor and the output tensor into a model.
model = models.Model(x, y)

model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

In [None]:
# Chained-style modeling, functional implementation
model = models.Sequential() # initialise with Sequential before defining the model structure
# NOTE(review): the None arguments look like size placeholders — confirm and replace.
model.add(layers.Dense(None, activation='relu', input_shape=(None, )))
model.add(layers.Dense(None, activation='softmax'))

In [None]:
# Distributed-style modeling, object-oriented implementation
class ANN(models.Model):
    """Two-layer classifier built with the functional API inside a Model subclass.

    The graph is Input -> Dense(hidden_num) -> ReLU -> Dense(output_num) -> softmax.

    Args:
        input_num: Width of the input vector.
        hidden_num: Number of units in the hidden Dense layer.
        output_num: Number of output classes.
        **kwargs: Forwarded to ``models.Model.__init__``.
    """
    def __init__(self, input_num, hidden_num, output_num, **kwargs):
        hidden = layers.Dense(hidden_num)
        output = layers.Dense(output_num)
        relu = layers.Activation('relu')
        softmax = layers.Activation('softmax')

        # Connect the layers into a graph. Previously the layers were created
        # but never applied, so the model had no inputs or outputs.
        x = layers.Input(shape=(input_num, ))
        h = relu(hidden(x))
        y = softmax(output(h))

        super().__init__(x, y, **kwargs)
        # Same compile settings as the other ANN variants in this file.
        self.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

# NOTE(review): input_num, hidden_num and output_num are only assigned in a later
# cell of this file, so that cell must have been run before this line.
model = ANN(input_num, hidden_num, output_num)

In [None]:
# Chained-style modeling, object-oriented implementation
class ANN(models.Sequential):
    """Sequential two-layer classifier that compiles itself on construction."""
    def __init__(self, input_num, hidden_num, output_num, **kwargs):
        super().__init__(**kwargs)

        # Hidden ReLU layer (declares the input width) followed by the softmax output layer.
        hidden_layer = layers.Dense(hidden_num, activation='relu', input_shape=(input_num, ))
        output_layer = layers.Dense(output_num, activation='softmax')
        for layer in (hidden_layer, output_layer):
            self.add(layer)

        self.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

In [None]:
import numpy as np
# from keras import datasets  #mnist
from keras.datasets import mnist    # 이렇게 불러와야 실행됨
from keras.utils import np_utils # to_categorical

def Data_func():
    """Load MNIST, one-hot encode the labels and flatten/rescale the images.

    Returns:
        ``(X_train, y_train), (X_test, y_test)`` where each image is a flat
        row of W*H values divided by 255.0.
    """
    (train_images, train_labels), (test_images, test_labels) = mnist.load_data()

    # The sample count is left to reshape(-1, ...); only W and H are needed.
    _, width, height = train_images.shape
    flat_size = width * height
    X_train = train_images.reshape(-1, flat_size) / 255.0
    X_test = test_images.reshape(-1, flat_size) / 255.0

    y_train = np_utils.to_categorical(train_labels)
    y_test = np_utils.to_categorical(test_labels)

    return (X_train, y_train), (X_test, y_test)

In [None]:
import matplotlib.pyplot as plt

def plot_loss(history):
    """Plot the training and validation loss recorded in ``history.history``."""
    for series_key in ('loss', 'val_loss'):
        plt.plot(history.history[series_key])
    plt.title('Model Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend(['Train', 'Test'], loc=0)

def plot_acc(history):
    """Plot the training and validation accuracy recorded in ``history.history``."""
    for series_key in ('accuracy', 'val_accuracy'):
        plt.plot(history.history[series_key])
    plt.title('Model accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend(['Train', 'Test'], loc=0)

In [None]:
# Model dimensions: 784 inputs, 100 hidden units, 10 output classes.
input_num = 784
hidden_num = 100
number_of_class = 10
output_num = number_of_class
model = ANN(input_num, hidden_num, output_num)
(X_train, y_train), (X_test, y_test) = Data_func()

# Hold out 20% of the training data for validation during fitting.
history = model.fit(X_train, y_train, epochs=5, batch_size=100, validation_split=0.2)

# evaluate() returns the loss followed by the compiled metrics; both are printed.
performance_test = model.evaluate(X_test, y_test, batch_size=100)
print('Test loss and Accuracy -> {: .2f}, {: .2f}'.format(*performance_test))

In [None]:
# Show the loss curves and then the accuracy curves, each in its own figure.
plot_loss(history)
plt.show()
plot_acc(history)
plt.show()

In [None]:
# 시계열
from keras import layers, models

class ANN(models.Model):
    """Single-hidden-layer regression network compiled with MSE loss and SGD.

    The graph is Input(Nin) -> Dense(Nh) -> ReLU -> Dense(Nout).

    Args:
        Nin: Number of input features.
        Nh: Number of hidden units.
        Nout: Number of outputs.
    """
    def __init__(self, Nin, Nh, Nout):
        hidden = layers.Dense(Nh)
        output = layers.Dense(Nout)
        relu = layers.Activation('relu')

        x = layers.Input(shape=(Nin, ))
        h = relu(hidden(x))
        y = output(h)

        # super must be *called*: `super.__init__(x, y)` invoked the unbound
        # slot of the `super` type and raised a TypeError.
        super().__init__(x, y)
        self.compile(loss='mse', optimizer='sgd')

# 보스턴 집값 데이터
from keras.datasets import boston_housing
from sklearn import preprocessing

def Data_func():
    """Load the Boston housing data and min-max scale the features.

    The scaler is fitted on the training features only and then applied to
    both splits; the targets are returned unchanged.
    """
    (train_features, y_train), (test_features, y_test) = boston_housing.load_data()

    scaler = preprocessing.MinMaxScaler().fit(train_features)
    X_train = scaler.transform(train_features)
    X_test = scaler.transform(test_features)

    return (X_train, y_train), (X_test, y_test)

# 시각화
from original.keraspp.skeras import plot_loss
import matplotlib.pyplot as plt

def main():
    """Train the regression ANN on the housing data, report test loss and plot the loss curves."""
    Nin = 13
    Nh = 5
    Nout = 1

    model = ANN(Nin, Nh, Nout)
    (X_train, y_train), (X_test, y_test) = Data_func()
    history = model.fit(X_train, y_train, epochs=100, batch_size=100, validation_split=0.2, verbose=2)

    performance_test = model.evaluate(X_test, y_test, batch_size=100)
    print('\nTest Loss -> {: .2f}'.format(performance_test))

    # plot_loss needs the History object; calling it with no argument raised a TypeError.
    plot_loss(history)
    plt.show()

# 3장 DNN

In [None]:
from keras import layers, models

# Network dimensions: input width, the two hidden-layer sizes, and the class count.
Nin = 784
Nh_l = [100, 50]
number_of_class = 10
Nout = number_of_class

class DNN(models.Sequential):
    """Two-hidden-layer classifier with 20% dropout after each hidden layer.

    Args:
        Nin: Width of the input vector.
        Nh_1: Sequence with the sizes of the two hidden layers.
        Nout: Number of output classes.
    """
    def __init__(self, Nin, Nh_1, Nout):
        super().__init__()

        self.add(layers.Dense(Nh_1[0], activation='relu', input_shape=(Nin, ), name='Hidden-1'))
        self.add(layers.Dropout(0.2))
        self.add(layers.Dense(Nh_1[1], activation='relu', name='Hidden-2'))
        self.add(layers.Dropout(0.2))
        self.add(layers.Dense(Nout, activation='softmax'))

        # metrics is passed as a list, like every other compile() call in this
        # file; a bare string is rejected by newer Keras releases.
        self.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

import numpy as np
from keras.datasets import mnist
from keras.utils import np_utils # to_categorical

def Data_func():
    """Return MNIST as flat, 255-divided image rows with one-hot labels."""
    (raw_train, labels_train), (raw_test, labels_test) = mnist.load_data()

    # Each image becomes one row of W*H values.
    _, W, H = raw_train.shape
    pixels_per_image = W * H

    X_train = raw_train.reshape(-1, pixels_per_image) / 255.0
    X_test = raw_test.reshape(-1, pixels_per_image) / 255.0
    y_train = np_utils.to_categorical(labels_train)
    y_test = np_utils.to_categorical(labels_test)

    return (X_train, y_train), (X_test, y_test)

(X_train, y_train), (X_test, y_test) = Data_func()

# Build the DNN, train with a 20% validation split, then score on the test set.
model = DNN(Nin, Nh_l, Nout)
history = model.fit(X_train, y_train, epochs=10, batch_size=100, validation_split=0.2)
performance_test = model.evaluate(X_test, y_test, batch_size=100)
print('Test Loss and Accuracy ->', performance_test)

In [None]:
# 컬러 이미지
import numpy as np
from keras.datasets import cifar10
from keras.utils import np_utils

def Data_func():
    """Load CIFAR-10, one-hot encode the labels and flatten/rescale the images.

    Returns:
        ``(X_train, y_train), (X_test, y_test)`` where each image is a flat
        row of W*H*C values divided by 255.0.
    """
    (train_images, train_labels), (test_images, test_labels) = cifar10.load_data()

    # Width, height and channel count give the length of one flattened image.
    _, W, H, C = train_images.shape
    flat_size = W * H * C

    X_train = train_images.reshape(-1, flat_size) / 255.0
    X_test = test_images.reshape(-1, flat_size) / 255.0
    y_train = np_utils.to_categorical(train_labels)
    y_test = np_utils.to_categorical(test_labels)

    return (X_train, y_train), (X_test, y_test)


# 모델링
from keras import layers, models

class DNN(models.Sequential):
    """Two-hidden-layer classifier with a configurable dropout rate per hidden layer.

    Args:
        Nin: Width of the flattened input vector.
        Nh_l: Sequence with the sizes of the two hidden layers.
        Pd_l: Sequence with the dropout rate applied after each hidden layer.
        Nout: Number of output classes.
    """
    def __init__(self, Nin, Nh_l, Pd_l, Nout):
        super().__init__()

        # Only the first layer declares the input shape. The second layer's
        # input is the first layer's output, so its former input_shape=(Nin, )
        # argument was wrong and has been dropped. Layer names now read
        # 'Hidden-*' (previously misspelled 'Hiiden-*').
        self.add(layers.Dense(Nh_l[0], activation='relu', input_shape=(Nin, ), name='Hidden-1'))
        self.add(layers.Dropout(Pd_l[0]))
        self.add(layers.Dense(Nh_l[1], activation='relu', name='Hidden-2'))
        self.add(layers.Dropout(Pd_l[1]))
        self.add(layers.Dense(Nout, activation='softmax'))
        self.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['acc'])

# 학습 및 시각화
from original.keraspp.skeras import plot_loss, plot_acc
import matplotlib.pyplot as plt

# Hidden-layer sizes and per-layer dropout rates (0.0 disables dropout).
Nh_l = [100, 50]
Pd_l = [0.0, 0.0]
number_of_class = 10
Nout = number_of_class

# The input width is taken from the flattened training data.
(X_train, y_train), (X_test, y_test) = Data_func()
model = DNN(X_train.shape[1], Nh_l, Pd_l, Nout)
history = model.fit(X_train, y_train, epochs=10, batch_size=100, validation_split=0.2)
performance_test = model.evaluate(X_test, y_test, batch_size=100)
print('Test Loss and Accuracy ->', performance_test)

# Show each figure without blocking, keep it up for two seconds, then close it.
plot_acc(history)
plt.show(block=False)
plt.pause(2)
plt.close()
plot_loss(history)
plt.show(block=False)
plt.pause(2)
plt.close()

# 4장 CNN

In [None]:
import keras
from keras import models, layers
import keras.backend
import keras.optimizers

class CNN(models.Sequential):
    """Small convolutional classifier: two conv layers, pooling, then a dense head."""
    def __init__(self, input_shape, num_classes):
        super().__init__()

        # Convolution stage: finds feature points.
        feature_layers = [
            layers.Conv2D(32, kernel_size=(3,3), activation='relu', input_shape=input_shape),
            layers.Conv2D(64, (3,3), activation='relu'),
            layers.MaxPooling2D(pool_size=(2,2)),  # pool_size: window reduced to a single value
            layers.Dropout(0.25),
            layers.Flatten(),
        ]
        # Dense stage: performs the actual classification.
        head_layers = [
            layers.Dense(128, activation='relu'),
            layers.Dropout(0.5),
            layers.Dense(num_classes, activation='softmax'),
        ]
        for layer in feature_layers + head_layers:
            self.add(layer)

        self.compile(loss=keras.losses.categorical_crossentropy, optimizer=keras.optimizers.Adadelta(), metrics=['acc'])

from keras.datasets import mnist
(X_train, y_train), (X_test, y_test) = mnist.load_data()

# Where the channel axis belongs depends on image_data_format. Query it through
# the public backend function (as the rest of this file does) rather than the
# internal backend_config module.
print(keras.backend.image_data_format())

# Append a channel axis of size 1 as the last dimension.
X_train = X_train.reshape(*X_train.shape, 1)
X_test = X_test.reshape(*X_test.shape, 1)
input_shape = (X_train.shape[1], X_train.shape[2], 1)

## 채널의 위치가 앞 단에 존재할 경우
# X_train = X_train.reshape(X_train.shape[0], 1, X_train.shape[1], X_train.shape[2])
# X_test = X_test.reshape(X_test.shape[0], 1, X_test.shape[1], X_test.shape[2])
# input_shape = (1, X_train.shape[1], X_train.shape[2])

In [None]:
import keras
from keras import models, layers
import keras.backend
import keras.optimizers

class CNN(models.Sequential):
    """Small convolutional classifier: two conv layers, pooling, then a dense head."""
    def __init__(self, input_shape, num_classes):
        super().__init__()

        # Convolution stage (finds feature points) followed by the dense stage
        # (performs the actual classification), added in order.
        stack = (
            layers.Conv2D(32, kernel_size=(3,3), activation='relu', input_shape=input_shape),
            layers.Conv2D(64, (3,3), activation='relu'),
            layers.MaxPooling2D(pool_size=(2,2)),  # pool_size: window reduced to a single value
            layers.Dropout(0.25),
            layers.Flatten(),
            layers.Dense(128, activation='relu'),
            layers.Dropout(0.5),
            layers.Dense(num_classes, activation='softmax'),
        )
        for layer in stack:
            self.add(layer)

        self.compile(loss=keras.losses.categorical_crossentropy, optimizer=keras.optimizers.Adadelta(), metrics=['acc'])

from keras.datasets import mnist
class DATA():
    """MNIST prepared for a 2-D CNN.

    Attributes:
        input_shape: ``(rows, cols, 1)`` shape of one image.
        num_classes: Number of label classes (10).
        X_train, X_test: float32 images with a trailing channel axis, divided by 255.
        y_train, y_test: One-hot encoded labels.
    """
    def __init__(self):
        num_classes = 10

        (X_train, y_train), (X_test, y_test) = mnist.load_data()

        # Where the channel axis belongs depends on image_data_format. Query it
        # through the public backend function (as the rest of this file does)
        # rather than the internal backend_config module.
        print(keras.backend.image_data_format())

        # Append a channel axis of size 1 as the last dimension.
        # NOTE: a channels-first backend would need the axis at position 1
        # instead, with input_shape = (1, rows, cols).
        X_train = X_train.reshape(*X_train.shape, 1)
        X_test = X_test.reshape(*X_test.shape, 1)
        input_shape = (X_train.shape[1], X_train.shape[2], 1)

        X_train = X_train.astype('float32')
        X_test = X_test.astype('float32')
        X_train /= 255
        X_test /= 255

        y_train = keras.utils.to_categorical(y_train, num_classes)
        y_test = keras.utils.to_categorical(y_test, num_classes)

        self.input_shape = input_shape
        self.num_classes = num_classes
        self.X_train, self.y_train = X_train, y_train
        self.X_test, self.y_test = X_test, y_test

# Prepare the data, build the CNN from the data's shape/class count, train and score it.
data = DATA()
model = CNN(data.input_shape, data.num_classes)
history = model.fit(data.X_train, data.y_train, batch_size=128, epochs=10, validation_split=0.2)
score = model.evaluate(data.X_test, data.y_test)
print('Test loss:', score[0])
print('Test accuracy:', score[1])

# NOTE(review): plot_acc, plot_loss and plt are not imported in this cell —
# presumably they come from an earlier cell; confirm before running in isolation.
plot_acc(history)
plt.show(block=False)
plt.pause(2)
plt.close()

plot_loss(history)
plt.show(block=False)
plt.pause(2)
plt.close()

In [None]:
# 컬러
import keras.backend
from sklearn import model_selection, metrics
from sklearn.preprocessing import MinMaxScaler
import numpy as np
import matplotlib.pyplot as plt
import os
from keras import backend as K
from keras.utils import np_utils
from keras.models import Model
from keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from original.keraspp import skeras, sfile

class CNN(Model):
    """Functional CNN classifier that also exposes two intermediate-output sub-models.

    Note: methods name their instance parameter ``model`` instead of ``self``.
    """
    def __init__(model, nb_classes, in_shape=None):
        """Build the graph, initialise the Model from it, and compile.

        Args:
            nb_classes: Number of output classes.
            in_shape: Shape of one input sample, passed to ``Input``.
        """
        model.nb_classes = nb_classes
        model.in_shape = in_shape
        # build_model() stores the graph tensors (x, y, z_cl, z_fl) on the instance.
        model.build_model()
        super().__init__(model.x, model.y)
        # Sub-models sharing the same input: output after Flatten, and output
        # after the 128-unit Dense block.
        model.cl_part = Model(model.x, model.z_cl)
        model.fl_part = Model(model.x, model.z_fl)
        model.compile()
    
    def build_model(model):
        """Create the layer graph and store its input, output and intermediate tensors."""
        nb_classes = model.nb_classes
        in_shape = model.in_shape

        x = Input(in_shape)
        h = Conv2D(32, kernel_size=(3,3), activation='relu', input_shape=in_shape)(x)
        h = Conv2D(64, (3,3), activation='relu')(h)
        h = MaxPooling2D(pool_size=(2,2))(h)
        h = Dropout(0.25)(h)
        h = Flatten()(h)
        # Tensor at the end of the convolutional part.
        z_cl = h

        h = Dense(128, activation='relu')(h)
        h = Dropout(0.5)(h)
        # Tensor at the end of the fully connected part (before the classifier layer).
        z_fl = h

        y = Dense(nb_classes, activation='softmax', name='preds')(h)
        model.z_cl = z_cl
        model.z_fl = z_fl
        model.x, model.y = x, y

    def compile(model):
        """Compile with fixed settings; this override takes no arguments."""
        Model.compile(model, loss='categorical_crossentropy', optimizer='adadelta', metrics=['acc'])

class DataSet():
    """Train/test split of an image array with optional min-max scaling.

    Args:
        X: Image array, either 3-D (no channel axis) or already including channels.
        y: Integer class labels.
        nb_classes: Number of classes used for one-hot encoding.
        scaling: When true, min-max scale every flattened pixel position.
        test_size: Fraction (or count) of samples held out for testing.
        random_state: Seed passed to ``train_test_split``.

    Attributes:
        X, input_shape: Set by ``add_channels``.
        X_train, X_test, y_train, y_test: The prepared splits.
        scaler: The fitted ``MinMaxScaler`` (only when ``scaling`` is true).
    """
    def __init__(self, X, y, nb_classes, scaling=True, test_size=0.2, random_state=0):
        self.X = X
        self.add_channels()
        X = self.X

        # Use the caller's test_size; the split used to be hard-coded to 0.2,
        # which silently ignored this parameter.
        X_train, X_test, y_train, y_test = model_selection.train_test_split(X, y, test_size=test_size, random_state=random_state)
        X_train = X_train.astype('float32')
        X_test = X_test.astype('float32')

        if scaling:
            # Scale each feature (each pixel) to (0, 1). The scaler works on
            # 2-D data, so images are flattened and then restored to their shape.
            scaler = MinMaxScaler()
            n = X_train.shape[0]
            X_train = scaler.fit_transform(X_train.reshape(n, -1)).reshape(X_train.shape)
            n = X_test.shape[0]
            X_test = scaler.transform(X_test.reshape(n, -1)).reshape(X_test.shape)
            self.scaler = scaler

        y_train = np_utils.to_categorical(y_train, nb_classes)
        y_test = np_utils.to_categorical(y_test, nb_classes)

        self.X_train, self.X_test = X_train, X_test
        self.y_train, self.y_test = y_train, y_test

    def add_channels(self):
        """Give 3-D image data a channel axis of size 1 and record ``input_shape``."""
        X = self.X

        if len(X.shape) == 3:
            N, img_rows, img_cols = X.shape

            # Put the new axis where the backend's data format expects it.
            if K.image_data_format() == 'channels_first':
                X = X.reshape(X.shape[0], 1, img_rows, img_cols)
                input_shape = (1, img_rows, img_cols)
            else:
                X = X.reshape(X.shape[0], img_rows, img_cols, 1)
                input_shape = (img_rows, img_cols, 1)
        else:
            input_shape = X.shape[1:]   # channel is already included

        self.X = X
        self.input_shape = input_shape

class Machine():
    """Bundles a DataSet and a CNN, and runs training, evaluation and result saving.

    Args:
        X: Image array handed to ``DataSet``.
        y: Integer class labels.
        nb_classes: Number of classes.
        fig: When true, ``run`` shows accuracy/loss plots at the end.
    """
    def __init__(self, X, y, nb_classes=2, fig=True):
        self.nb_classes = nb_classes
        self.set_data(X, y)
        self.set_model()
        self.fig = fig

    def set_data(self, X, y):
        """Wrap the raw arrays in a DataSet stored as ``self.data``."""
        nb_classes = self.nb_classes
        self.data = DataSet(X, y, nb_classes)

    def set_model(self):
        """Create the CNN for the prepared data and store it as ``self.model``."""
        nb_classes = self.nb_classes
        data = self.data
        self.model = CNN(nb_classes=nb_classes, in_shape=data.input_shape)

    def fit(self, nb_epoch=10, batch_size=128, verbose=1):
        """Train the model, validating on the test split, and return the History."""
        data = self.data
        model = self.model
        history = model.fit(data.X_train, data.y_train, batch_size=batch_size, epochs=nb_epoch, verbose=verbose, validation_data=(data.X_test, data.y_test))
        return history

    def run(self, nb_epoch=10, batch_size=128, verbose=1):
        """Train, print the confusion matrix and test scores, save results, optionally plot.

        The training History is stored on ``self.history``.
        """
        data = self.data
        model = self.model
        fig = self.fig

        history = self.fit(nb_epoch=nb_epoch, batch_size=batch_size, verbose=verbose)
        score = model.evaluate(data.X_test, data.y_test, verbose=0)

        print('Confusion matrix')
        y_test_pred = model.predict(data.X_test, verbose=0)
        y_test_pred = np.argmax(y_test_pred, axis=1)
        print(metrics.confusion_matrix(np.argmax(data.y_test, axis=1), y_test_pred))

        print('Test score:', score[0])
        print('Test accuracy:', score[1])

        # NOTE(review): 'datatime' may be a misspelling of 'datetime' — check
        # which values sfile.unique_filename accepts.
        suffix = sfile.unique_filename('datatime')
        foldname = 'output_' + suffix
        os.makedirs(foldname)
        skeras.save_history_history('history_history.npy', history.history, fold=foldname)
        model.save_weights(os.path.join(foldname, 'dl_model.h5'))
        print('Output results are saved in', foldname)

        if fig:
            # Accuracy on the left, loss on the right. A stray plot_acc(history)
            # call used to follow: it referenced a name this cell never imports
            # and would have drawn accuracy curves onto the loss subplot.
            plt.figure(figsize=(12,4))
            plt.subplot(1, 2, 1)
            skeras.plot_acc(history)
            plt.subplot(1, 2, 2)
            skeras.plot_loss(history)
            plt.show(block=False)
            plt.pause(2)
            plt.close()

        self.history = history

from keras.datasets import cifar10
import keras
assert keras.backend.image_data_format() == 'channels_last'
from original.keraspp import aicnn

class Machine(Machine):
    """Machine specialised for CIFAR-10: trains on the dataset's training split with 10 classes."""
    def __init__(self):
        # Only the training part of the download is used; the base class
        # makes its own train/test split from it.
        (X, y), _unused_test_split = cifar10.load_data()
        super().__init__(X=X, y=y, nb_classes=10)
    
def main():
    """Build the CIFAR-10 machine and run it with its default settings."""
    Machine().run()

# Run the demo only when this code is executed as the main module.
if __name__ == '__main__':
    main()

# 5장 RNN

In [None]:
from __future__ import print_function   # 파이썬 2와 3 간의 호환성 위한 패키지
# In Python 2, print is a statement written without parentheses; this import turns it into a function (called with parentheses).

# 자연어
from keras.preprocessing import sequence
from keras.utils import pad_sequences
from keras.datasets import imdb
from keras import layers, models

class Data:
    """IMDB review data with every sequence padded or truncated to ``maxlen``."""
    def __init__(self, max_features=20000, maxlen=80):
        train_split, test_split = imdb.load_data(num_words=max_features)
        train_sequences, self.y_train = train_split
        test_sequences, self.y_test = test_split

        # pad_sequences is imported from keras.utils here because its import path changed.
        self.x_train = pad_sequences(train_sequences, maxlen=maxlen)
        self.x_test = pad_sequences(test_sequences, maxlen=maxlen)

class RNN_LSTM(models.Model):
    """Binary sequence classifier: Embedding -> LSTM -> single sigmoid unit."""
    def __init__(self, max_features, maxlen):
        # Functional-API graph over integer sequences of length maxlen.
        tokens = layers.Input((maxlen, ))
        embedded = layers.Embedding(max_features, 128)(tokens)
        encoded = layers.LSTM(128, dropout=0.2, recurrent_dropout=0.2)(embedded)
        probability = layers.Dense(1, activation='sigmoid')(encoded)

        super().__init__(tokens, probability)
        self.compile(loss='binary_crossentropy', optimizer='adam', metrics=['acc'])

class Machine:
    """Pairs the IMDB data with the LSTM classifier and runs training and evaluation.

    Args:
        max_features: Vocabulary size used for both the data and the embedding.
        maxlen: Sequence length after padding.
    """
    def __init__(self, max_features=20000, maxlen=80):
        self.data = Data(max_features, maxlen)
        self.model = RNN_LSTM(max_features, maxlen)

    def run(self, epochs=3, batch_size=32):
        """Train with the test split as validation data, then print test accuracy and loss."""
        data = self.data
        model = self.model

        print('Training stage')
        print('====================')
        model.fit(data.x_train, data.y_train, batch_size=batch_size, epochs=epochs, validation_data=(data.x_test, data.y_test))
        score, acc = model.evaluate(data.x_test, data.y_test, batch_size=batch_size)
        # The message used to read 'accuracy{0}', running the label into the value.
        print('Test performance: accuracy={0}, loss={1}'.format(acc, score))

def main():
    """Run the IMDB LSTM experiment with a batch size of 512."""
    Machine().run(batch_size=512)

# Run the demo only when this code is executed as the main module.
if __name__ == '__main__':
    main()

In [None]:
# 시계열
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn import model_selection
from keras import models, layers

from original.keraspp import skeras

def main():
    """Run the time-series experiment for 400 epochs."""
    Machine().run(epochs=400)

class Machine():
    """Trains the LSTM regressor on the windowed series and visualises the results."""
    def __init__(self):
        self.data = Dataset()
        # The model's input shape is one sample's shape (everything after the sample axis).
        shape = self.data.X.shape[1:]
        self.model = rnn_model(shape)

    def run(self, epochs=400):
        """Fit the model, then show the loss history and three prediction-vs-original views.

        Args:
            epochs: Number of training epochs.
        """
        d = self.data
        X_train, X_test = d.X_train, d.X_test
        y_train, y_test = d.y_train, d.y_test
        X, y = d.X, d.y

        m = self.model
        # validation_data is passed as a tuple, like the other fit() calls in this file.
        h = m.fit(X_train, y_train, epochs=epochs, validation_data=(X_test, y_test), verbose=0)
        skeras.plot_loss(h)
        plt.title('History of training')
        plt.show(block=False)
        plt.pause(2)
        plt.close()

        # Test split: yp is the model output, so it carries the 'Prediction'
        # label (the two legend labels used to be swapped).
        yp = m.predict(X_test)
        print('Loss:', m.evaluate(X_test, y_test))
        plt.plot(yp, label='Prediction')
        plt.plot(y_test, label='Original')
        plt.legend(loc=0)
        plt.title('Validation Results')
        plt.show(block=False)
        plt.pause(2)
        plt.close()

        # Same comparison as a grouped bar chart: originals first, then predictions.
        yp = m.predict(X_test).reshape(-1)
        print('Loss:', m.evaluate(X_test, y_test))
        print(yp.shape, y_test.shape)
        df = pd.DataFrame()
        df['Sample'] = list(range(len(y_test))) * 2
        df['Normalized #Passengers'] = np.concatenate([y_test, yp], axis=0)
        df['Type'] = ['Original'] * len(y_test) + ['Prediction'] * len(yp)

        plt.figure(figsize=(7, 5))
        sns.barplot(x='Sample', y='Normalized #Passengers', hue='Type', data=df)
        plt.ylabel('Normalized #Passengers')
        plt.show(block=False)
        plt.pause(2)
        plt.close()

        # Whole series: again the model output is the prediction (labels were swapped).
        yp = m.predict(X)
        plt.plot(yp, label='Prediction')
        plt.plot(y, label='Original')
        plt.legend(loc=0)
        plt.title('All Results')
        plt.show(block=False)
        plt.pause(2)
        plt.close()

def rnn_model(shape):
    """Build, compile and summarise a 10-unit LSTM followed by a single linear output.

    Args:
        shape: Shape of one input sample.

    Returns:
        The compiled model (Adam optimiser, mean squared error loss).
    """
    inputs = layers.Input(shape=shape)
    recurrent = layers.LSTM(10)(inputs)
    outputs = layers.Dense(1)(recurrent)

    network = models.Model(inputs, outputs)
    network.compile('adam', 'mean_squared_error')
    network.summary()
    return network

class Dataset:
    """Loads the passenger series, windows it into (X, y) and splits train/test."""
    def __init__(self, fname='original/international-airline-passengers.csv', D=12):
        series = load_data(fname=fname)
        self.X, self.y = get_Xy(series, D=D)

        # Fixed 80/20 split with a fixed seed.
        split = model_selection.train_test_split(self.X, self.y, test_size=0.2, random_state=42)
        self.X_train, self.X_test, self.y_train, self.y_test = split
    
def load_data(fname='original/international-airline-passengers.csv'):
    """Read the series from a CSV file, plot it, and return a normalised copy.

    Only the second CSV column is read and the last three lines of the file
    are skipped. The series is shown before and after normalisation.

    Args:
        fname: Path of the CSV file.

    Returns:
        1-D array: (data - mean) / std / 5.
    """
    dataset = pd.read_csv(fname, usecols=[1], engine='python', skipfooter=3)
    data = dataset.values.reshape(-1)

    plt.plot(data)
    plt.xlabel('Time')
    plt.ylabel('#Passengers')
    plt.title('Original Data')
    plt.show(block=False)
    plt.pause(2)
    plt.close()

    data_dn = (data - np.mean(data)) / np.std(data) / 5
    plt.plot(data_dn)
    plt.xlabel('Time')
    plt.ylabel('Normalized #Passengers')
    # Raw string: '\s' is an invalid escape sequence in a normal literal. The
    # resulting text is unchanged.
    plt.title(r'Normalized data by $E[]$ and $5\sigma$')
    plt.show(block=False)
    plt.pause(2)
    plt.close()

    return data_dn

def get_Xy(data, D=12):
    """Slice a 1-D series into sliding windows and their next-step targets.

    Sample ``i`` is ``data[i:i+D]`` and its target is ``data[i+D]``.

    Args:
        data: 1-D sequence of values.
        D: Window length.

    Returns:
        ``(X, y)`` with ``X`` of shape ``(N - D, D, 1)`` and ``y`` of shape
        ``(N - D,)``, where ``N = len(data)``.

    Raises:
        AssertionError: If ``len(data)`` is not larger than ``D``.
    """
    N = len(data)
    assert N>D, 'N should be larger than D, where N is len(data)'

    # There are N - D windows that still have a following value. The loop used
    # to stop one short, dropping the last sample and crashing on the reshape
    # when N == D + 1 even though the assertion above had passed.
    n_samples = N - D
    X = np.array([data[ii:ii+D] for ii in range(n_samples)])
    y = np.array([data[ii+D] for ii in range(n_samples)])

    # Add a trailing feature axis of size 1.
    X = X.reshape(X.shape[0], X.shape[1], 1)
    print(X.shape, y.shape)

    return X, y


# Run the demo only when this code is executed as the main module.
if __name__ == '__main__':
    main()

# 6장 AE

In [None]:
from keras import layers, models

class AE(models.Model):
    """Fully connected autoencoder with a single hidden (code) layer.

    Args:
        x_nodes: Width of the input, which is also the width of the output.
        z_dim: Width of the hidden code.
    """
    def __init__(self, x_nodes, z_dim):
        inputs = layers.Input(shape=(x_nodes, ))                          # input layer
        code = layers.Dense(z_dim, activation='relu')(inputs)             # hidden layer
        outputs = layers.Dense(x_nodes, activation='sigmoid')(code)       # output layer
        super().__init__(inputs, outputs)
        self.compile(optimizer='adam', loss='binary_crossentropy', metrics=['acc'])

        # Kept so that Encoder()/Decoder() can build their sub-models later.
        self.x = inputs
        self.z = code
        self.z_dim = z_dim

    def Encoder(self):
        """Return a model for the encoder half only: input -> code."""
        return models.Model(self.x, self.z)

    def Decoder(self):
        """Return a model that applies this model's last layer to a new code-sized input."""
        code_input = layers.Input(shape=(self.z_dim, ))
        output_layer = self.layers[-1]
        return models.Model(code_input, output_layer(code_input))


import matplotlib.pyplot as plt

def show_ae(autoencoder):
    """Show ten MNIST test digits, their codes and their reconstructions in a 3-row figure."""
    _, (images, _) = mnist.load_data()
    images = images.astype('float32') / 255.
    images = images.reshape((len(images), -1))

    encoder = autoencoder.Encoder()
    decoder = autoencoder.Decoder()
    codes = encoder.predict(images)
    reconstructions = decoder.predict(codes)

    n = 10
    plt.figure(figsize=(20, 6))
    for column in range(1, n + 1):
        sample = column - 1

        # Row 1: original digit, shown in grayscale with hidden axes.
        top_axis = plt.subplot(3, n, column)
        plt.imshow(images[sample].reshape(28, 28))
        plt.gray()
        top_axis.get_xaxis().set_visible(False)
        top_axis.get_yaxis().set_visible(False)

        # Row 2: the code values as a stem plot.
        plt.subplot(3, n, column + n)
        plt.stem(codes[sample].reshape(-1))

        # Row 3: the reconstruction.
        plt.subplot(3, n, column + 2 * n)
        plt.imshow(reconstructions[sample].reshape(28, 28))

    plt.show(block=False)
    plt.pause(2)
    plt.close()


from keras.datasets import mnist
from original.keraspp.skeras import plot_loss, plot_acc

def main():
    """Train the dense autoencoder on MNIST and show accuracy, loss and sample reconstructions."""
    (x_train, _), (x_test, _) = mnist.load_data()
    x_train = x_train.astype('float32') / 255.
    x_test = x_test.astype('float32') / 255.
    # Flatten every image into one row.
    x_train = x_train.reshape((len(x_train), -1))
    x_test = x_test.reshape((len(x_test), -1))

    x_nodes = 784
    z_dim = 36

    autoencoder = AE(x_nodes, z_dim)
    # The inputs are also the targets.
    history = autoencoder.fit(x_train, x_train, epochs=10, batch_size=1024, shuffle=True, validation_data=(x_test, x_test))
    plot_acc(history)
    plt.show(block=False)
    plt.pause(2)
    plt.close()

    plot_loss(history)
    plt.show(block=False)
    plt.pause(2)
    # Close the loss figure too, so later plots do not draw onto it.
    plt.close()

    # Show originals, codes and reconstructions; show_ae was defined for this
    # but never called.
    show_ae(autoencoder)


In [None]:
# 합성곱 AE 모델링
from keras import layers, models

def Conv2D(filters, kernel_size, padding='same', activation='relu'):
    """Create a ``layers.Conv2D`` whose defaults are 'same' padding and ReLU activation."""
    layer_options = {'padding': padding, 'activation': activation}
    return layers.Conv2D(filters, kernel_size, **layer_options)

class AE(models.Model):
    """Convolutional autoencoder built from the module-level ``Conv2D`` helper.

    Args:
        org_shape: Shape of one input image.
    """
    def __init__(self, org_shape):
        original = layers.Input(shape=org_shape)

        # Encoder: two conv + max-pooling stages.
        encoded = original
        for n_filters in (4, 8):
            encoded = Conv2D(n_filters, (3,3))(encoded)
            encoded = layers.MaxPooling2D((2,2), padding='same')(encoded)

        # Output of the encoder and input of the decoder.
        code = Conv2D(1, (7,7))(encoded)

        # Decoder: two conv + up-sampling stages, then two final convolutions.
        restored = code
        for n_filters in (16, 8):
            restored = Conv2D(n_filters, (3,3))(restored)
            restored = layers.UpSampling2D((2,2))(restored)
        restored = Conv2D(4, (3,3))(restored)

        decoded = Conv2D(1, (3,3), activation='sigmoid')(restored)

        super().__init__(original, decoded)
        self.compile(optimizer='adam', loss='binary_crossentropy', metrics=['acc'])


from practice_03 import DATA
from original.keraspp.skeras import plot_loss, plot_acc
import matplotlib.pyplot as plt
from keras import backend

def show_ae(autoencoder, data):
    """Draw ten test images (top row) above their reconstructions (bottom row).

    Args:
        autoencoder: Model whose ``predict`` output has the same size as its input.
        data: Object exposing the 4-D test images as ``X_test``.
    """
    originals = data.X_test
    reconstructions = autoencoder.predict(originals)
    print(reconstructions.shape, data.X_test.shape)

    # Pick the two spatial sizes according to where the channel axis sits.
    if backend.image_data_format() == 'channels_first':
        n_samples, _, n_rows, n_cols = originals.shape
    else:
        n_samples, n_rows, n_cols, _ = originals.shape

    # Drop the channel axis so each image is a 2-D array.
    originals = originals.reshape(n_samples, n_rows, n_cols)
    reconstructions = reconstructions.reshape(reconstructions.shape[0], n_rows, n_cols)

    n = 10
    plt.figure(figsize=(20, 4))
    for i in range(n):
        for first_cell, images in ((1, originals), (1 + n, reconstructions)):
            axis = plt.subplot(2, n, i + first_cell)
            plt.imshow(images[i])
            axis.get_xaxis().set_visible(False)
            axis.get_yaxis().set_visible(False)

def main(epochs=20, batch_size=128):
    """Train the convolutional autoencoder and show accuracy, loss and reconstructions.

    Args:
        epochs: Number of training epochs.
        batch_size: Mini-batch size.
    """
    def flash_figure():
        # Show the current figure without blocking, keep it for two seconds, then close it.
        plt.show(block=False)
        plt.pause(2)
        plt.close()

    data = DATA()
    autoencoder = AE(data.input_shape)
    # The inputs are also the targets; 20% of the training data is used for validation.
    history = autoencoder.fit(data.X_train, data.X_train, epochs=epochs, batch_size=batch_size, shuffle=True, validation_split=0.2)

    for draw_curves in (plot_acc, plot_loss):
        draw_curves(history)
        flash_figure()

    show_ae(autoencoder, data)
    flash_figure()


# Run the demo with a batch size of 512 only when executed as the main module.
if __name__ == '__main__':
    main(batch_size=512)

# 7장 GAN

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from keras import models
from keras.layers import Dense, Conv1D, Reshape, Flatten, Lambda
from keras.optimizers import Adam
from keras import backend as K

def main():
    """Create the GAN machine (batch size 1, 100 values per sample) and run 200 stages."""
    gan_machine = Machine(n_batch=1, ni_D=100)
    gan_machine.run(n_repeat=200, n_show=200, n_test=100)

class Data:
    """Two batch samplers, each returning an array of shape (n_batch, ni_D).

    Attributes:
        real_sample: Callable drawing from a normal distribution N(mu, sigma).
        in_sample: Callable drawing uniform noise from ``np.random.rand``.
    """
    def __init__(self, mu, sigma, ni_D):
        def draw_real(n_batch):
            # Target ("real") data: normal distribution with the given mean and std.
            return np.random.normal(mu, sigma, (n_batch, ni_D))

        def draw_noise(n_batch):
            # Noise input.
            return np.random.rand(n_batch, ni_D)

        self.real_sample = draw_real
        self.in_sample = draw_noise

class Machine:
    """Training driver for the GAN: alternates discriminator and generator updates.

    Args:
        n_batch: Number of samples per training batch.
        ni_D: Number of values in one sample.
    """
    def __init__(self, n_batch=10, ni_D=100):
        data_mean = 4
        data_stddev = 1.25
        self.data = Data(data_mean, data_stddev, ni_D)

        self.gan = GAN(ni_D=ni_D, nh_D=50, nh_G=50)
        self.n_batch = n_batch
        # Per training step: discriminator updates, then generator updates.
        self.n_iter_D = 1
        self.n_iter_G = 5

    def run_epochs(self, epochs, n_test):
        """Train for ``epochs`` steps, then test on ``n_test`` samples and draw the histograms."""
        self.train(epochs)
        self.test_and_show(n_test)

    def run(self, n_repeat=30000//200, n_show=200, n_test=100):
        """Run ``n_repeat`` stages of ``n_show`` training steps, showing results after each."""
        for ii in range(n_repeat):
            # The closing parenthesis was missing from this message.
            print('Stage', ii, '(Epoch: {})'.format(ii*n_show))
            self.run_epochs(n_show, n_test)
            plt.show(block=False)
            plt.pause(2)
            plt.close()

    def train(self, epochs):
        """Perform ``epochs`` training steps."""
        for epoch in range(epochs):
            self.train_each()

    def train_each(self):
        """One training step: ``n_iter_D`` discriminator updates, then ``n_iter_G`` generator updates."""
        for it in range(self.n_iter_D):
            self.train_D()
        for it in range(self.n_iter_G):
            self.train_GD()

    def test(self, n_test):
        """Generate ``n_test`` samples; return ``(generated, noise_input)``."""
        gan = self.gan
        data = self.data
        Z = data.in_sample(n_test)
        Gen = gan.G.predict(Z)

        return Gen, Z

    def train_D(self):
        """Update the discriminator on one batch of real and one batch of generated samples."""
        gan = self.gan
        n_batch = self.n_batch
        data = self.data

        Real = data.real_sample(n_batch)    # (n_batch, ni_D)
        Z = data.in_sample(n_batch)         # (n_batch, ni_D)
        Gen = gan.G.predict(Z)              # (n_batch, ni_D)
        gan.D.trainable = True
        gan.D_train_on_batch(Real, Gen)

    def train_GD(self):
        """Update the combined model GD on one noise batch with D marked non-trainable."""
        gan = self.gan
        n_batch = self.n_batch
        data = self.data
        Z = data.in_sample(n_batch)

        gan.D.trainable = False
        gan.GD_train_on_batch(Z)

    def test_and_show(self, n_test):
        """Draw histograms of real, generated and input samples and print their statistics."""
        data = self.data
        Gen, Z = self.test(n_test)
        Real = data.real_sample(n_test)
        self.show_hist(Real, Gen, Z)
        Machine.print_stat(Real, Gen)

    def show_hist(self, Real, Gen, Z):
        """Plot step histograms of the flattened real, generated and input arrays."""
        plt.hist(Real.reshape(-1), histtype='step', label='Real')
        plt.hist(Gen.reshape(-1), histtype='step', label='Generated')
        plt.hist(Z.reshape(-1), histtype='step', label='Input')
        plt.legend(loc=0)

    @staticmethod   # callable as a plain function, without access to an instance
    def print_stat(Real, Gen):
        """Print the (mean, std) pair of the real and of the generated samples."""
        def stat(d):
            return (np.mean(d), np.std(d))
        print('Mean and Std of Real:', stat(Real))
        print('Mean and Std of Gen:', stat(Gen))

def add_decorate(x):
    """Append to ``x`` its squared deviation from the last-axis mean.

    The result is ``x`` concatenated along the last axis with
    ``(x - mean(x))**2``, so the last axis doubles in length.
    """
    last_axis_mean = K.mean(x, axis=-1, keepdims=True)
    squared_deviation = K.square(x - last_axis_mean)

    return K.concatenate([x, squared_deviation], axis=-1)

def add_decorate_shape(input_shape):
    """Return the 2-D ``input_shape`` with its second dimension doubled, as a tuple."""
    dims = list(input_shape)
    assert len(dims)==2
    first_dim, second_dim = dims
    return (first_dim, second_dim * 2)

lr = 2e-4
# Hyperparameters shared by every optimizer created below.
_ADAM_KWARGS = dict(learning_rate=lr, beta_1=0.9, beta_2=0.999)
# Kept for backward compatibility; model_compile no longer hands this instance to models.
adam = Adam(**_ADAM_KWARGS)
def model_compile(model):
    """Compile ``model`` with binary cross-entropy, its own Adam optimizer and accuracy.

    Each call creates a fresh optimizer. A single optimizer instance used to be
    shared by D, G and GD; an optimizer keeps state for the variables it was
    first used with, and recent Keras releases refuse to apply it to another
    model's variables.

    Returns:
        Whatever ``model.compile`` returns.
    """
    optimizer = Adam(**_ADAM_KWARGS)
    return model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['acc'])

class GAN:
    """Holds a discriminator ``D``, a generator ``G`` and the stacked model ``GD``.

    Args:
        ni_D: Width of the discriminator input; also used as the generator's
            input width.
        nh_D: Units in each hidden Dense layer of the discriminator.
        nh_G: Filters in each hidden Conv1D layer of the generator.
    """

    def __init__(self, ni_D, nh_D, nh_G):
        self.ni_D, self.nh_D, self.nh_G = ni_D, nh_D, nh_G

        # D and G have to exist before they can be stacked into GD.
        self.D = self.gen_D()
        self.G = self.gen_G()
        self.GD = self.make_GD()

    def gen_D(self):
        """Build and compile the discriminator."""
        discriminator = models.Sequential()
        stack = (
            # Augments the input via add_decorate before the Dense layers.
            Lambda(add_decorate, output_shape=add_decorate_shape, input_shape=(self.ni_D, )),
            Dense(self.nh_D, activation='relu'),
            Dense(self.nh_D, activation='relu'),
            Dense(1, activation='sigmoid'),
        )
        for layer in stack:
            discriminator.add(layer)
        model_compile(discriminator)

        return discriminator

    def gen_G(self):
        """Build and compile the generator."""
        generator = models.Sequential()
        stack = (
            # Add a trailing axis so the size-1 Conv1D layers can be applied.
            Reshape((self.ni_D, 1), input_shape=(self.ni_D, )),
            Conv1D(self.nh_G, 1, activation='relu'),
            Conv1D(self.nh_G, 1, activation='sigmoid'),
            Conv1D(1, 1),
            Flatten(),
        )
        for layer in stack:
            generator.add(layer)
        model_compile(generator)

        return generator

    def make_GD(self):
        """Stack G and D into one model, compiled while D is flagged non-trainable."""
        stacked = models.Sequential()
        stacked.add(self.G)
        stacked.add(self.D)

        # Freeze D only for the compile call, then restore the flag.
        self.D.trainable = False
        model_compile(stacked)
        self.D.trainable = True

        return stacked

    def D_train_on_batch(self, Real, Gen):
        """Train D on one batch: ``Real`` rows labelled 1, ``Gen`` rows labelled 0."""
        n_real = Real.shape[0]
        n_fake = Gen.shape[0]
        samples = np.concatenate([Real, Gen], axis=0)
        labels = np.array([1] * n_real + [0] * n_fake)
        self.D.train_on_batch(samples, labels)

    def GD_train_on_batch(self, Z):
        """Train the stacked model on inputs ``Z`` with every target set to 1."""
        targets = np.array([1] * Z.shape[0])
        self.GD.train_on_batch(Z, targets)

# Run the entry point only when this code is executed as a script.
if __name__ == '__main__':
    main()

In [None]:
# 합성곱 계층 GAN
from keras.datasets import mnist
from PIL import Image
import numpy as np
import math, os
import keras.backend as K
import tensorflow as tf

# The models in this cell are declared with channels-first shapes such as (1, 28, 28).
K.set_image_data_format('channels_first')
# Call the getter so the active format string is printed, not the function object.
print(K.image_data_format())

def mse_4d(y_true, y_pred):
    """Mean squared error reduced over axes 1, 2 and 3 (Keras backend version)."""
    squared_error = K.square(y_pred - y_true)
    return K.mean(squared_error, axis=(1,2,3))

def mse_4d_tf(y_true, y_pred):
    """Mean squared error reduced over axes 1, 2 and 3 (TensorFlow ops version)."""
    squared_error = tf.square(y_pred - y_true)
    return tf.reduce_mean(squared_error, axis=(1,2,3))


import argparse

def main():
    """Parse the command-line options for the convolutional GAN script.

    Recognised options: ``--batch_size``, ``--epochs``, ``--output_fold``,
    ``--input_dim`` and ``--n_train``; each has a default.

    Returns:
        argparse.Namespace: The parsed options.
    """
    # The script entry point calls ``train(args)`` right after ``main()``, so the
    # parsed namespace is also published at module scope; keeping it local left
    # that call with an undefined name.
    global args

    parser = argparse.ArgumentParser()  # collects option definitions and parses sys.argv

    parser.add_argument('--batch_size', type=int, default=16, help='Batch size for the networks')
    parser.add_argument('--epochs', type=int, default=1000, help='Epochs for the networks')
    parser.add_argument('--output_fold', type=str, default='GAN_OUT', help='Output fold to save the results')
    parser.add_argument('--input_dim', type=int, default=10, help='Input dimension for the generator')
    parser.add_argument('--n_train', type=int, default=32, help='The number of training data')

    args = parser.parse_args()
    return args


from keras import models, layers, optimizers

class GAN(models.Sequential):
    """Convolutional GAN: a generator followed by a discriminator.

    The instance itself is the stacked model (generator, then discriminator).
    ``self.generator`` and ``self.discriminator`` are also kept as separate
    models so each can be used or trained on its own.

    Args:
        input_dim: Length of the random vector fed to the generator.
    """

    def __init__(self, input_dim=64):
        super().__init__()
        self.input_dim = input_dim
        self.generator = self.GENERATOR()
        self.discriminator = self.DISCRIMINATOR()

        self.add(self.generator)
        # The discriminator is flagged non-trainable while the stacked model
        # is assembled and compiled.
        self.discriminator.trainable = False
        self.add(self.discriminator)
        self.compile_all()

    def compile_all(self):
        """Compile the generator, the stacked model and the discriminator.

        The stacked model is compiled first (the discriminator is still
        flagged non-trainable at that point); the flag is then switched back
        before the discriminator is compiled on its own.
        """
        d_optim = optimizers.SGD(learning_rate=0.0005, momentum=0.9, nesterov=True)
        g_optim = optimizers.SGD(learning_rate=0.0005, momentum=0.9, nesterov=True)

        self.generator.compile(loss=mse_4d_tf, optimizer='SGD')
        self.compile(loss='binary_crossentropy', optimizer=g_optim)
        self.discriminator.trainable = True
        self.discriminator.compile(loss='binary_crossentropy', optimizer=d_optim)

    def GENERATOR(self):
        """Build the generator: Dense layers, then two upsample+conv stages."""
        input_dim = self.input_dim
        model = models.Sequential()
        model.add(layers.Dense(1024, activation='tanh', input_dim=input_dim))
        model.add(layers.Dense(128*7*7, activation='tanh'))
        model.add(layers.BatchNormalization())
        # Fold the flat vector into 128 maps of 7x7 (channels-first layout).
        model.add(layers.Reshape((128,7,7), input_shape=(128*7*7, )))
        model.add(layers.UpSampling2D(size=(2,2)))
        model.add(layers.Conv2D(64, (5,5), padding='same', activation='tanh'))
        model.add(layers.UpSampling2D(size=(2,2)))
        model.add(layers.Conv2D(1, (5,5), padding='same', activation='tanh'))

        return model

    def DISCRIMINATOR(self):
        """Build the discriminator: two conv+pool stages, then Dense layers."""
        model = models.Sequential()
        model.add(layers.Conv2D(64, (5,5), padding='same', activation='tanh', input_shape=(1, 28, 28)))
        model.add(layers.MaxPooling2D(pool_size=(2,2)))
        model.add(layers.Conv2D(128, (5,5), activation='tanh'))
        model.add(layers.MaxPooling2D(pool_size=(2,2)))
        model.add(layers.Flatten())
        model.add(layers.Dense(1024, activation='tanh'))
        model.add(layers.Dense(1, activation='sigmoid'))

        return model

    def get_z(self, ln):
        """Return ``ln`` random generator inputs, uniform in [-1, 1)."""
        input_dim = self.input_dim
        return np.random.uniform(-1, 1, (ln, input_dim))

    def train_both(self, x):
        """Run one discriminator step and one generator step on batch ``x``.

        Args:
            x: Batch of real images; its first axis is the batch size.

        Returns:
            tuple: ``(d_loss, g_loss)`` as returned by the two
            ``train_on_batch`` calls.
        """
        ln = x.shape[0]

        # Labels are NumPy arrays shaped like the discriminator output (ln, 1).
        # Passing plain Python lists of ints is the likely cause of the earlier
        # "AttributeError: 'int' object has no attribute 'shape'".
        real_labels = np.ones((ln, 1), dtype=np.float32)
        fake_labels = np.zeros((ln, 1), dtype=np.float32)

        # Discriminator step: real images labelled 1, generated images labelled 0.
        z = self.get_z(ln)
        w = self.generator.predict(z, verbose=0)
        xw = np.concatenate((x, w))
        y2 = np.concatenate((real_labels, fake_labels))
        d_loss = self.discriminator.train_on_batch(xw, y2)

        # Generator step: train the stacked model towards the "real" label
        # while the discriminator is flagged non-trainable.
        z = self.get_z(ln)
        self.discriminator.trainable = False
        try:
            g_loss = self.train_on_batch(z, real_labels)
        finally:
            # Always restore the flag so the next discriminator step can learn.
            self.discriminator.trainable = True

        return d_loss, g_loss



def combine_images(generated_images):
    """Tile a batch of images into one 2-D grid image.

    Args:
        generated_images: Array indexed as (image, channel, row, column);
            only channel 0 of each image is copied.

    Returns:
        A 2-D array with the input dtype. The grid has ``int(sqrt(n))``
        columns and enough rows for all ``n`` images, filled row by row;
        unused cells stay zero.
    """
    count = generated_images.shape[0]
    n_cols = int(math.sqrt(count))
    n_rows = int(math.ceil(float(count) / n_cols))
    tile_h, tile_w = generated_images.shape[2:][0], generated_images.shape[2:][1]

    canvas = np.zeros((n_rows * tile_h, n_cols * tile_w),
                      dtype=generated_images.dtype)
    for position, tile in enumerate(generated_images):
        row, col = divmod(position, n_cols)
        top = row * tile_h
        left = col * tile_w
        canvas[top:top + tile_h, left:left + tile_w] = tile[0, :, :]
    return canvas


def get_x(X_train, index, BATCH_SIZE):
    """Return the ``index``-th slice of ``BATCH_SIZE`` items from ``X_train``."""
    start = index * BATCH_SIZE
    stop = start + BATCH_SIZE
    return X_train[start:stop]


def save_images(generated_images, output_fold, epoch, index):
    """Tile the generated images and save them as ``<epoch>_<index>.png``.

    The tiled image is scaled with ``* 127.5 + 127.5`` and cast to ``uint8``
    before being written under ``output_fold``.
    """
    grid = combine_images(generated_images)
    pixels = (grid * 127.5 + 127.5).astype(np.uint8)
    file_name = str(epoch) + "_" + str(index) + ".png"
    Image.fromarray(pixels).save(output_fold + '/' + file_name)


def load_data(n_train):
    """Return the first ``n_train`` MNIST training images (labels are discarded)."""
    train_split, _test_split = mnist.load_data()
    images, _labels = train_split
    return images[:n_train]


def train(args):
    """Train the convolutional GAN on MNIST and write the results to disk.

    Args:
        args: Namespace providing ``batch_size``, ``epochs``, ``output_fold``,
            ``input_dim`` and ``n_train``.

    Raises:
        ValueError: If the loaded data does not fill even one batch. Without
            this check the image snapshot step would fail on names that are
            only bound inside the batch loop.

    Side effects:
        Creates ``output_fold`` and writes PNG snapshots, both models'
        weights and the per-epoch loss tables into it.
    """
    BATCH_SIZE = args.batch_size
    epochs = args.epochs
    output_fold = args.output_fold
    input_dim = args.input_dim
    n_train = args.n_train

    os.makedirs(output_fold, exist_ok=True)
    print('Output_fold is', output_fold)

    X_train = load_data(n_train)
    # Map pixel values 0..255 onto -1..1 (the generator ends in tanh).
    X_train = (X_train.astype(np.float32) - 127.5) / 127.5
    # Insert a channel axis right after the batch axis.
    X_train = X_train.reshape((X_train.shape[0], 1) + X_train.shape[1:])

    # Only full batches are used; a trailing partial batch is dropped.
    n_batches = X_train.shape[0] // BATCH_SIZE
    if n_batches == 0:
        raise ValueError(
            f'n_train={X_train.shape[0]} is smaller than batch_size={BATCH_SIZE}; '
            'at least one full batch is required'
        )

    gan = GAN(input_dim)

    d_loss_ll = []
    g_loss_ll = []
    for epoch in range(epochs):
        print("Epoch is", epoch)
        print("Number of batches", n_batches)

        d_loss_l = []
        g_loss_l = []

        for index in range(n_batches):
            x = get_x(X_train, index, BATCH_SIZE)

            d_loss, g_loss = gan.train_both(x)
            d_loss_l.append(d_loss)
            g_loss_l.append(g_loss)

        # Snapshot generated images every 10th epoch and on the last one.
        if epoch%10 == 0 or epoch == epochs-1:
            z = gan.get_z(x.shape[0])
            w = gan.generator.predict(z, verbose=0)
            save_images(w, output_fold, epoch, index)

        d_loss_ll.append(d_loss_l)
        g_loss_ll.append(g_loss_l)

    gan.generator.save_weights(output_fold + '/' + 'generator', True)
    gan.discriminator.save_weights(output_fold + '/' + 'discriminator', True)

    np.savetxt(output_fold + '/' + 'd_loss', d_loss_ll)
    np.savetxt(output_fold + '/' + 'g_loss', g_loss_ll)



# Script entry point: parse the command-line options, then run training.
if __name__ == '__main__':
    main()
    
    # NOTE(review): `args` is looked up at module scope here, so main() must
    # have made it available there before this call.
    train(args)