<a href="https://colab.research.google.com/github/sheepjun96/python_Deeplearning/blob/main/Chapter_5.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [382]:
# Execute the companion notebook in this namespace. The cells below call names
# that are not defined in this file (relu, sigmoid, softmax, onehot, load_csv,
# list_dir, ...) -- presumably they are defined there; verify.
%run ./mathutil.ipynb

In [383]:
import numpy as np
import time
import os
# Seed NumPy's global RNG so weight initialisation and data shuffling are repeatable.
np.random.seed(1234)

In [384]:
class Model:
    """Base class that stores a model's name and dataset and drives a full run.

    ``exec_all`` calls ``train``, ``test`` and ``visualize``; this class does
    not define them, so they must be provided elsewhere (e.g. by a subclass).
    """

    def __init__(self, name, dataset):
        self.name, self.dataset = name, dataset
        self.is_training = False
        # Keep a ``rand_std`` that was already set before this constructor ran;
        # otherwise fall back to the default.
        if not hasattr(self, 'rand_std'):
            self.rand_std = 0.030

    def __str__(self):
        return f'{self.name}/{self.dataset}'

    def exec_all(self, epoch_count=10, batch_size=10, learning_rate=0.001, report=0, show_cnt=3):
        """Train, then test, then visualize ``show_cnt`` samples if it is positive."""
        self.train(epoch_count, batch_size, learning_rate, report)
        self.test()
        if show_cnt > 0:
            self.visualize(show_cnt)

In [385]:
class MlpModel(Model):
    """Model whose layer parameters are allocated from ``hconfigs`` at construction."""

    def __init__(self, name, dataset, hconfigs):
        super().__init__(name, dataset)
        self.init_parameters(hconfigs)

In [386]:
def mlp_init_parameters(self, hconfigs):
    """Allocate parameters for every hidden layer and for the output layer.

    Stores ``hconfigs`` on the model, fills ``self.pm_hiddens`` with one
    parameter set per entry of ``hconfigs`` and stores the output layer's
    parameter set in ``self.pm_output``.
    """
    self.hconfigs = hconfigs
    self.pm_hiddens = []

    # Each layer's input shape is whatever the previous allocation reported.
    shape = self.dataset.input_shape
    for layer_config in hconfigs:
        layer_params, shape = self.alloc_layer_param(shape, layer_config)
        self.pm_hiddens.append(layer_params)

    # The output layer has one unit per element of the dataset's output shape.
    n_outputs = int(np.prod(self.dataset.output_shape))
    output_params, _unused_shape = self.alloc_layer_param(shape, n_outputs)
    self.pm_output = output_params

In [387]:
def mlp_alloc_layer_param(self, input_shape, hconfig):
    """Allocate the weight and bias of one fully connected layer.

    Args:
        input_shape: Shape (or plain unit count) of the layer's input; it is
            flattened to a single count with ``np.prod``.
        hconfig: Number of units in this layer.

    Returns:
        A ``({'w': weight, 'b': bias}, output_cnt)`` pair, where ``output_cnt``
        is ``hconfig`` unchanged.
    """
    # np.prod yields a NumPy scalar (a float 1.0 for an empty shape); cast to a
    # plain int so the weight shape is always integral, as done for the output
    # count in mlp_init_parameters.
    input_cnt = int(np.prod(input_shape))
    output_cnt = hconfig

    weight, bias = self.alloc_param_pair([input_cnt, output_cnt])

    return {'w': weight, 'b': bias}, output_cnt

In [388]:
def mlp_alloc_param_pair(self, shape):
    """Return a ``(weight, bias)`` pair for a weight tensor of ``shape``.

    Weights are drawn from a normal distribution with mean 0 and standard
    deviation ``self.rand_std``; the bias is a zero vector as long as the last
    dimension of ``shape``.
    """
    weight = np.random.normal(loc=0, scale=self.rand_std, size=shape)
    n_out = shape[-1]
    bias = np.zeros((n_out,))
    return weight, bias

In [389]:
# Attach the parameter-allocation functions to MlpModel as methods.
MlpModel.init_parameters = mlp_init_parameters
MlpModel.alloc_layer_param = mlp_alloc_layer_param
MlpModel.alloc_param_pair = mlp_alloc_param_pair

In [390]:
def mlp_model_train(self, epoch_count=10, batch_size=10, learning_rate=0.001, report=0):
    """Run mini-batch training for ``epoch_count`` epochs.

    Args:
        epoch_count: Number of passes over the training data.
        batch_size: Number of samples per training step.
        learning_rate: Stored on the model as ``self.learning_rate``.
        report: When positive, a progress line is produced every ``report``
            epochs using a 100-sample validation draw; when 0 no per-epoch
            progress is produced.

    The closing summary line is printed regardless of ``report``.
    """
    self.learning_rate = learning_rate

    # Only whole batches are used; a trailing partial batch is dropped.
    batch_count = int(self.dataset.train_count / batch_size)
    # time1 marks the start of training, time2 the start of the current report interval.
    time1 = time2 = int(time.time())
    if report != 0:
        print(f"Model {self.name} train started:")

    for epoch in range(epoch_count):
        costs = []
        accs = []
        self.dataset.shuffle_train_data(batch_size*batch_count)
        for n in range(batch_count):
            trX, trY = self.dataset.get_train_data(batch_size, n)
            cost, acc = self.train_step(trX, trY)
            costs.append(cost)
            accs.append(acc)

        if report > 0 and (epoch+1) % report == 0:
            vaX, vaY = self.dataset.get_validate_data(100)
            acc = self.eval_accuracy(vaX, vaY)
            time3 = int(time.time())
            # tm1: seconds since the previous report, tm2: seconds since training began.
            tm1, tm2 = time3-time2, time3-time1
            self.dataset.train_prt_result(epoch+1, costs, accs, acc, tm1, tm2)
            time2 = time3

    tm_total = int(time.time()) - time1
    # Message previously read "trian ended"; spelling corrected.
    print(f'Model {self.name} train ended in {tm_total} secs:')

In [391]:
# Attach the training loop to MlpModel as its train() method.
MlpModel.train = mlp_model_train

In [392]:
def mlp_model_test(self):
    teX, teY = self.dataset.get_test_data()
    time1 = int(time.time())
    acc = self.eval_accuracy(teX, teY)
    time2 = int(time.time())
    self.dataset.test_prt_result(self.name, acc, time2-time1)

In [393]:
# Attach the test routine to MlpModel as its test() method.
MlpModel.test = mlp_model_test

In [394]:
def mlp_model_visualize(self, num):
    """Print a header, estimate ``num`` samples and let the dataset display them."""
    print(f'Model {self.name} Visualization')
    sample_x, sample_y = self.dataset.get_visualize_data(num)
    estimates = self.get_estimate(sample_x)
    self.dataset.visualize(sample_x, estimates, sample_y)

In [395]:
# Attach the visualization routine to MlpModel as its visualize() method.
MlpModel.visualize = mlp_model_visualize

In [396]:
def mlp_train_step(self, x, y):
    """Run one forward/backward pass on a mini-batch.

    Returns:
        ``(loss, accuracy)`` for the batch, both computed from the forward
        output obtained before the backward pass runs.
    """
    self.is_training = True

    # Forward pass: network output, then loss, then accuracy on that same output.
    output, net_aux = self.forward_neuralnet(x)
    loss, post_aux = self.forward_postproc(output, y)
    accuracy = self.eval_accuracy(x, y, output)

    # Backward pass starts from d(loss)/d(loss) = 1.0.
    output_grad = self.backprop_postproc(1.0, post_aux)
    self.backprop_neuralnet(output_grad, net_aux)

    self.is_training = False

    return loss, accuracy

In [397]:
# Attach the single-batch training step to MlpModel as train_step().
MlpModel.train_step = mlp_train_step

In [398]:
def mlp_forward_neuralnet(self, x):
    """Push ``x`` through every hidden layer and then the output layer.

    Returns:
        ``(output, [aux_out, aux_layers])`` where ``aux_layers`` holds one
        entry per hidden layer, in forward order.
    """
    activation = x
    hidden_aux = []

    for idx, layer_config in enumerate(self.hconfigs):
        layer_params = self.pm_hiddens[idx]
        activation, layer_aux = self.forward_layer(activation, layer_config, layer_params)
        hidden_aux.append(layer_aux)

    # The output layer is requested with hconfig=None.
    output, output_aux = self.forward_layer(activation, None, self.pm_output)

    return output, [output_aux, hidden_aux]

In [399]:
def mlp_backprop_neuralnet(self, G_output, aux):
    """Propagate ``G_output`` back through the output layer and hidden layers.

    ``aux`` is the ``[aux_out, aux_layers]`` pair produced by the forward
    pass. Hidden layers are visited last-to-first. Returns the gradient
    handed back by the first layer.
    """
    output_aux, hidden_aux = aux

    grad = self.backprop_layer(G_output, None, self.pm_output, output_aux)

    for idx in range(len(self.hconfigs) - 1, -1, -1):
        grad = self.backprop_layer(
            grad, self.hconfigs[idx], self.pm_hiddens[idx], hidden_aux[idx]
        )

    return grad

In [400]:
# Attach the whole-network forward and backward passes to MlpModel.
MlpModel.forward_neuralnet = mlp_forward_neuralnet
MlpModel.backprop_neuralnet = mlp_backprop_neuralnet

In [401]:
def mlp_forward_layer(self, x, hconfig, pm):
    """Apply one layer: ``x @ w + b``, followed by ``relu`` unless ``hconfig`` is None.

    Returns:
        ``(y, [x, y])`` -- the layer output plus the values the backward pass needs.
    """
    affine = np.matmul(x, pm['w']) + pm['b']
    y = affine if hconfig is None else relu(affine)
    return y, [x, y]

In [402]:
def mlp_backprop_layer(self, G_y, hconfig, pm, aux):
    """Back-propagate through one layer and update its parameters in place.

    Args:
        G_y: Gradient of the loss with respect to this layer's output.
        hconfig: Layer config; when not None the gradient is first multiplied
            by ``relu_derv(y)``.
        pm: The layer's ``{'w': ..., 'b': ...}`` dict; both entries are updated
            in place using ``self.learning_rate``.
        aux: The ``[x, y]`` pair saved by the forward pass.

    Returns:
        Gradient of the loss with respect to this layer's input.
    """
    x, y = aux

    if hconfig is not None:
        G_y = relu_derv(y) * G_y

    # The input gradient must use the weights as they were during the forward
    # pass, so it is computed before the weights are modified below.
    G_input = np.matmul(G_y, pm['w'].T)

    G_weight = np.matmul(x.T, G_y)
    G_bias = np.sum(G_y, axis=0)

    step = self.learning_rate
    pm['w'] -= step * G_weight
    pm['b'] -= step * G_bias

    return G_input

In [403]:
# Attach the per-layer forward and backward passes to MlpModel.
MlpModel.forward_layer = mlp_forward_layer
MlpModel.backprop_layer = mlp_backprop_layer

In [404]:
def mlp_forward_postproc(self, output, y):
    """Return the dataset's loss plus the model's extra cost, with both aux values.

    Returns:
        ``(loss + extra, [aux_loss, aux_extra])``.
    """
    data_loss, loss_aux = self.dataset.forward_postproc(output, y)
    extra_cost, extra_aux = self.forward_extra_cost(y)
    total = data_loss + extra_cost
    return total, [loss_aux, extra_aux]

In [405]:
def mlp_forward_extra_cost(self, y):
    """Return the extra cost and its aux data; here always ``(0, None)``."""
    extra_cost, extra_aux = 0, None
    return extra_cost, extra_aux

In [406]:
# Attach the forward loss computation and its extra-cost hook to MlpModel.
MlpModel.forward_postproc = mlp_forward_postproc
MlpModel.forward_extra_cost = mlp_forward_extra_cost

In [407]:
def mlp_backprop_postproc(self, G_loss, aux):
    """Back-propagate the loss: run the extra-cost hook, then the dataset's loss gradient.

    ``aux`` is the ``[aux_loss, aux_extra]`` pair from the forward pass.
    Returns the gradient with respect to the network output.
    """
    loss_aux, extra_aux = aux
    # The extra-cost hook runs first; its return value is not used.
    self.backprop_extra_cost(G_loss, extra_aux)
    return self.dataset.backprop_postproc(G_loss, loss_aux)

In [408]:
def mlp_backprop_extra_cost(self, G_loss, aux):
    """Backward hook for the extra cost; does nothing and returns None."""
    return None

In [409]:
# Attach the backward loss computation and its extra-cost hook to MlpModel.
MlpModel.backprop_postproc = mlp_backprop_postproc
MlpModel.backprop_extra_cost = mlp_backprop_extra_cost

In [410]:
def mlp_eval_accuracy(self, x, y, output=None):
    """Return the dataset-defined accuracy for ``x``/``y``.

    When ``output`` is not supplied it is computed with a forward pass;
    passing it avoids running the network a second time.
    """
    if output is None:
        output, _aux = self.forward_neuralnet(x)
    return self.dataset.eval_accuracy(x, y, output)

In [411]:
# Attach the accuracy evaluation to MlpModel as eval_accuracy().
MlpModel.eval_accuracy = mlp_eval_accuracy

In [412]:
def mlp_get_estimate(self, x):
    """Run a forward pass on ``x`` and let the dataset convert the output to an estimate."""
    raw_output, _aux = self.forward_neuralnet(x)
    return self.dataset.get_estimate(raw_output)

In [413]:
# Attach the estimate routine to MlpModel as get_estimate().
MlpModel.get_estimate = mlp_get_estimate

In [414]:
class Dataset:
    """Base dataset holding a name and a mode string.

    ``__str__`` and ``train_count`` read ``tr_xs``, ``te_xs`` and ``va_xs``,
    which this class does not set itself.
    """

    def __init__(self, name, mode):
        self.name, self.mode = name, mode

    def __str__(self):
        # Sizes are listed in the order train+test+validate.
        return f'{self.name}({self.mode}, {len(self.tr_xs)}+{len(self.te_xs)}+{len(self.va_xs)})'

    @property
    def train_count(self):
        """Number of training samples."""
        return len(self.tr_xs)

In [415]:
def dataset_get_train_data(self, batch_size, nth):
    """Return the ``nth`` mini-batch ``(X, Y)`` according to ``self.indices``."""
    start = nth * batch_size
    stop = (nth + 1) * batch_size

    batch_x = self.tr_xs[self.indices[start:stop]]
    batch_y = self.tr_ys[self.indices[start:stop]]

    return batch_x, batch_y

In [416]:
def dataset_shuffle_train_data(self, size):
    """Store a freshly shuffled ordering of ``0 .. size-1`` in ``self.indices``."""
    order = np.arange(size)
    np.random.shuffle(order)
    self.indices = order

In [417]:
# Attach mini-batch access and per-epoch shuffling to Dataset.
Dataset.get_train_data = dataset_get_train_data
Dataset.shuffle_train_data = dataset_shuffle_train_data

In [418]:
def dataset_get_test_data(self):
    """Return the full test split as ``(te_xs, te_ys)``."""
    test_x = self.te_xs
    test_y = self.te_ys
    return test_x, test_y

In [419]:
# Attach test-split access to Dataset.
Dataset.get_test_data = dataset_get_test_data

In [420]:
def dataset_get_validate_data(self, count):
    """Return up to ``count`` randomly chosen validation samples as ``(X, Y)``.

    The validation ordering is reshuffled on every call and kept in
    ``self.va_indices``.
    """
    self.va_indices = np.arange(len(self.va_xs))
    np.random.shuffle(self.va_indices)

    picked_x = self.va_xs[self.va_indices[:count]]
    picked_y = self.va_ys[self.va_indices[:count]]

    return picked_x, picked_y

In [421]:
# Visualization samples are drawn with the same routine as validation samples.
Dataset.get_validate_data = dataset_get_validate_data
Dataset.get_visualize_data = dataset_get_validate_data

In [422]:
def dataset_shuffle_data(self, xs, ys, tr_ratio=0.8, va_ratio=0.05):
    """Randomly split ``xs``/``ys`` into train, validation and test sets.

    The training count is rounded down to a multiple of 10; the test set
    receives whatever remains after train and validation. Also records
    ``input_shape`` and ``output_shape`` from the first sample.

    Returns:
        The index arrays ``(train, validate, test)`` into the original data.
    """
    total = len(xs)

    n_train = int(total * tr_ratio / 10) * 10
    n_valid = int(total * va_ratio)
    valid_end = n_train + n_valid

    order = np.arange(total)
    np.random.shuffle(order)

    train_idx = order[0:n_train]
    valid_idx = order[n_train:valid_end]
    test_idx = order[valid_end:total]

    self.tr_xs, self.tr_ys = xs[train_idx], ys[train_idx]
    self.va_xs, self.va_ys = xs[valid_idx], ys[valid_idx]
    self.te_xs, self.te_ys = xs[test_idx], ys[test_idx]

    self.input_shape = xs[0].shape
    self.output_shape = ys[0].shape

    return train_idx, valid_idx, test_idx

In [423]:
# Attach the train/validate/test split routine to Dataset.
Dataset.shuffle_data = dataset_shuffle_data

In [424]:
def dataset_forward_postproc(self, output, y, mode=None):
    """Compute the scalar loss for ``output`` against ``y`` for the given mode.

    Args:
        output: Raw network output.
        y: Target values.
        mode: ``'regression'``, ``'binary'`` or ``'select'``; defaults to
            ``self.mode``.

    Returns:
        ``(loss, aux)`` where ``aux`` carries what the matching backward pass
        needs: ``diff`` for regression, ``[y, output]`` for binary and
        ``[output, y, entropy]`` for select.

    Raises:
        ValueError: If ``mode`` is not one of the supported values.
    """
    if mode is None:
        mode = self.mode

    if mode == 'regression':
        # Mean squared error.
        diff = output - y
        square = np.square(diff)
        loss = np.mean(square)
        aux = diff
    elif mode == 'binary':
        entropy = sigmoid_cross_entropy_with_logits(y, output)
        loss = np.mean(entropy)
        aux = [y, output]
    elif mode == 'select':
        entropy = softmax_cross_entropy_with_logits(y, output)
        loss = np.mean(entropy)
        aux = [output, y, entropy]
    else:
        # Fail clearly instead of hitting an UnboundLocalError at the return.
        raise ValueError(f'unsupported mode: {mode!r}')

    return loss, aux

In [425]:
# Attach the loss computation to Dataset.
Dataset.forward_postproc = dataset_forward_postproc

In [426]:
def dataset_backprop_postproc(self, G_loss, aux, mode=None):
    """Return the gradient of the loss with respect to the network output.

    Args:
        G_loss: Gradient flowing into the loss.
        aux: The aux value returned by the forward loss computation for the
            same mode (``diff``, ``[y, output]`` or ``[output, y, entropy]``).
        mode: ``'regression'``, ``'binary'`` or ``'select'``; defaults to
            ``self.mode``.

    Raises:
        ValueError: If ``mode`` is not one of the supported values.
    """
    if mode is None:
        mode = self.mode

    if mode == 'regression':
        diff = aux
        shape = diff.shape

        # Local derivatives of loss = mean(square(output - y)).
        g_loss_square = np.ones(shape) / np.prod(shape)
        g_square_diff = 2 * diff
        g_diff_output = 1

        G_square = g_loss_square * G_loss
        G_diff = g_square_diff * G_square
        G_output = g_diff_output * G_diff
    elif mode == 'binary':
        y, output = aux
        shape = output.shape

        g_loss_entropy = np.ones(shape) / np.prod(shape)
        g_entropy_output = sigmoid_cross_entropy_with_logits_derv(y, output)

        G_entropy = g_loss_entropy * G_loss
        G_output = g_entropy_output * G_entropy
    elif mode == 'select':
        output, y, entropy = aux

        # The mean is taken over the entropy entries, so its size sets the scale.
        g_loss_entropy = 1.0 / np.prod(entropy.shape)
        g_entropy_output = softmax_cross_entropy_with_logits_derv(y, output)

        G_entropy = g_loss_entropy * G_loss
        G_output = g_entropy_output * G_entropy
    else:
        # Fail clearly instead of hitting an UnboundLocalError at the return.
        raise ValueError(f'unsupported mode: {mode!r}')

    return G_output

In [427]:
# Attach the loss gradient computation to Dataset.
Dataset.backprop_postproc = dataset_backprop_postproc

In [428]:
def dataset_eval_accuracy(self, x, y, output, mode=None):
    """Return an accuracy score for ``output`` against ``y``.

    Args:
        x: Inputs; not used by this implementation.
        y: Target values.
        output: Raw network output.
        mode: ``'regression'``, ``'binary'`` or ``'select'``; defaults to
            ``self.mode``.

    Returns:
        regression: ``1 - RMSE / mean(y)``.
        binary: fraction of samples where ``output > 0`` agrees with ``y == 1.0``.
        select: fraction of rows whose argmax matches that of ``y``.

    Raises:
        ValueError: If ``mode`` is not one of the supported values.
    """
    if mode is None:
        mode = self.mode

    if mode == 'regression':
        mse = np.mean(np.square(output - y))
        accuracy = 1 - np.sqrt(mse) / np.mean(y)
    elif mode == 'binary':
        # A positive logit counts as a positive prediction.
        estimate = np.greater(output, 0)
        answer = np.equal(y, 1.0)
        correct = np.equal(estimate, answer)
        accuracy = np.mean(correct)
    elif mode == 'select':
        estimate = np.argmax(output, axis=1)
        answer = np.argmax(y, axis=1)
        correct = np.equal(estimate, answer)
        accuracy = np.mean(correct)
    else:
        # Fail clearly instead of hitting an UnboundLocalError at the return.
        raise ValueError(f'unsupported mode: {mode!r}')

    return accuracy

In [429]:
# Attach the accuracy computation to Dataset.
Dataset.eval_accuracy = dataset_eval_accuracy

In [430]:
def dataset_get_estimate(self, output, mode=None):
    """Convert raw network output into an estimate for the given mode.

    Regression returns ``output`` unchanged, binary applies ``sigmoid`` and
    select applies ``softmax``. ``mode`` defaults to ``self.mode``.

    Raises:
        ValueError: If ``mode`` is not one of the supported values.
    """
    if mode is None:
        mode = self.mode

    if mode == 'regression':
        estimate = output
    elif mode == 'binary':
        estimate = sigmoid(output)
    elif mode == 'select':
        estimate = softmax(output)
    else:
        # Fail clearly instead of hitting an UnboundLocalError at the return.
        raise ValueError(f'unsupported mode: {mode!r}')

    return estimate

In [431]:
# Attach the output-to-estimate conversion to Dataset.
Dataset.get_estimate = dataset_get_estimate

In [432]:
def dataset_train_prt_result(self, epoch, costs, accs, acc, time1, time2):
    """Print one progress line: mean cost, mean train accuracy / validation accuracy, timings."""
    line = f'    Epoch {epoch}: cost={np.mean(costs):5.3f}, accuracy={np.mean(accs):5.3f}/{acc:5.3f} ({time1}/{time2} secs)'
    print(line)

In [433]:
def dataset_test_prt_result(self, name, acc, time):
    """Print the test accuracy report line followed by a blank line."""
    report = f'Model {name} test report: accuracy = {acc:5.3f}, ({time} secs)\n'
    print(report)

In [434]:
# Attach the progress and test report printers to Dataset.
Dataset.train_prt_result = dataset_train_prt_result
Dataset.test_prt_result = dataset_test_prt_result

In [435]:
class AbaloneDataset(Dataset):
    """Regression dataset loaded from ``/content/abalone.csv``."""

    def __init__(self):
        super().__init__('abalone', 'regression')

        rows, _ = load_csv('/content/abalone.csv')
        row_count = len(rows)

        xs = np.zeros([row_count, 10])
        ys = np.zeros([row_count, 1])

        # Columns 0-2 one-hot encode the first CSV field ('I', 'M', 'F'); any
        # other value leaves all three at zero. The remaining fields except the
        # last fill columns 3+, and the last field is the target.
        for n, row in enumerate(rows):
            for column, code in enumerate(('I', 'M', 'F')):
                if row[0] == code:
                    xs[n, column] = 1
            xs[n, 3:] = row[1:-1]
            ys[n, :] = row[-1:]

        self.shuffle_data(xs, ys, 0.8)

    def visualize(self, xs, estimates, answers):
        """Print each input vector with its estimated and true target value."""
        for n, x in enumerate(xs):
            est, ans = estimates[n], answers[n]
            xstr = vector_to_str(x, '%4.2f')
            print(f'{xstr} => 추정 {est[0]:4.1f} : 정답 {ans[0]:4.1f}')

In [436]:
class PulsarDataset(Dataset):
    """Binary-classification dataset loaded from ``/content/pulsar_stars.csv``."""

    def __init__(self):
        super().__init__('pulsar', 'binary')

        rows, _ = load_csv('/content/pulsar_stars.csv')

        # The last column is the label; everything before it is the input.
        data = np.asarray(rows, dtype='float32')
        features, labels = data[:, :-1], data[:, -1:]
        self.shuffle_data(features, labels, 0.8)
        self.target_names = ['별', '펄서']

    def visualize(self, xs, estimates, answers):
        """Print each sample with its estimated class, probability and true class."""
        for n, x in enumerate(xs):
            est, ans = estimates[n], answers[n]
            xstr = vector_to_str(x, '%5.1f', 3)
            # Rounding the estimate / answer gives the index into target_names.
            estr = self.target_names[int(round(est[0]))]
            astr = self.target_names[int(round(ans[0]))]
            rstr = 'X' if estr != astr else '0'
            print(f'{xstr} => 추정 {estr}(확률 {est[0]:4.2f}) : 정답 {astr} => {rstr}')

In [437]:
class SteelDataset(Dataset):
    """Multi-class ('select') dataset loaded from ``/content/faults.csv``."""

    def __init__(self):
        super().__init__('steel', 'select')

        rows, headers = load_csv('/content/faults.csv')

        # The last seven columns are the targets; their headers name the classes.
        data = np.asarray(rows, dtype='float32')
        features, targets = data[:, :-7], data[:, -7:]
        self.shuffle_data(features, targets, 0.8)

        self.target_names = headers[-7:]

    def visualize(self, xs, estimates, answers):
        """Show estimates against answers using the class names; ``xs`` is not used."""
        show_select_results(estimates, answers, self.target_names)

In [438]:
class PulsarSelectDataset(Dataset):
    """The pulsar CSV data treated as a two-class 'select' problem."""

    def __init__(self):
        super().__init__('pulsarselect', 'select')

        rows, _ = load_csv('/content/pulsar_stars.csv')

        # The last column is the label; it is passed through onehot(..., 2).
        data = np.asarray(rows, dtype='float32')
        features = data[:, :-1]
        targets = onehot(data[:, -1], 2)
        self.shuffle_data(features, targets, 0.8)
        self.target_names = ['별', '펄서']

    def visualize(self, xs, estimates, answers):
        """Show estimates against answers using the class names; ``xs`` is not used."""
        show_select_results(estimates, answers, self.target_names)

In [439]:
# Empty shell: __init__ and visualize are assigned to this class after the class statement.
class FlowersDataset(Dataset):
    pass

In [440]:
def flowers_init(self, resolution=None, input_shape=None):
    """Load the flower images under ``/content/flower`` as a 'select' dataset.

    Each entry listed in the root directory is treated as one class; every
    ``.jpg`` file inside it becomes one sample labelled with that class index.

    Args:
        resolution: Two-element size passed to ``load_image_pixels``; defaults
            to ``[100, 100]``.
        input_shape: Shape passed to ``load_image_pixels``; defaults to ``[-1]``.
    """
    # Build the defaults per call instead of sharing mutable list defaults.
    if resolution is None:
        resolution = [100, 100]
    if input_shape is None:
        input_shape = [-1]

    # Explicit two-argument super(): this function is defined outside the class
    # body, so the zero-argument form is not available here.
    super(FlowersDataset, self).__init__('flowers', 'select')

    path = '/content/flower'
    self.target_names = list_dir(path)

    images = []
    idxs = []

    for dx, dname in enumerate(self.target_names):
        subpath = os.path.join(path, dname)
        for fname in list_dir(subpath):
            if not fname.endswith('.jpg'):
                continue
            imagepath = os.path.join(subpath, fname)
            pixels = load_image_pixels(imagepath, resolution, input_shape)
            images.append(pixels)
            idxs.append(dx)

    # Image shape is the resolution plus a trailing 3; list() lets a tuple
    # resolution work as well.
    self.image_shape = list(resolution) + [3]

    xs = np.asarray(images, np.float32)
    ys = onehot(idxs, len(self.target_names))

    self.shuffle_data(xs, ys, 0.8)

In [441]:
# Install the loader as the FlowersDataset constructor.
FlowersDataset.__init__ = flowers_init

In [442]:
def flowers_visualize(self, xs, estimates, answers):
    """Draw the sample images, then show estimates against answers by class name."""
    draw_images_horz(xs, self.image_shape)
    class_names = self.target_names
    show_select_results(estimates, answers, class_names)

# Attach the image visualization to FlowersDataset.
FlowersDataset.visualize = flowers_visualize

In [443]:
# Abalone regression run. The empty hidden-layer list means the network
# consists of the output layer only.
ad = AbaloneDataset()
am = MlpModel('abalone_model', ad, [])
am.exec_all(epoch_count=10, report=2)

Model abalone_model train started:
    Epoch 2: cost=8.191, accuracy=0.733/0.711 (1/1 secs)
    Epoch 4: cost=7.392, accuracy=0.744/0.694 (0/1 secs)
    Epoch 6: cost=7.247, accuracy=0.747/0.737 (0/1 secs)
    Epoch 8: cost=7.139, accuracy=0.748/0.725 (0/1 secs)
    Epoch 10: cost=7.054, accuracy=0.751/0.746 (0/1 secs)
Model abalone_model trian ended in 1 secs:
Model abalone_model test report: accuracy = 0.727, (0 secs)

Model abalone_model Visualization
[0.00,0.00,1.00,0.70,0.53,0.17,1.56,0.61,0.39,0.44] => 추정 12.9 : 정답 10.0
[0.00,1.00,0.00,0.46,0.38,0.12,0.48,0.22,0.10,0.17] => 추정  8.7 : 정답  7.0
[0.00,0.00,1.00,0.58,0.45,0.14,1.14,0.56,0.22,0.29] => 추정 11.3 : 정답  8.0


In [444]:
# Pulsar binary-classification run with a single hidden layer of 4 units.
# NOTE(review): `pd` is the conventional pandas alias; harmless as long as
# pandas is not used in this notebook, but worth renaming if it ever is.
pd = PulsarDataset()
pm = MlpModel('pulsar_model', pd, [4])
pm.exec_all(epoch_count=10, report=2)

Model pulsar_model train started:
    Epoch 2: cost=0.110, accuracy=0.964/0.970 (0/0 secs)
    Epoch 4: cost=0.098, accuracy=0.969/1.000 (1/1 secs)
    Epoch 6: cost=0.096, accuracy=0.970/0.980 (1/2 secs)
    Epoch 8: cost=0.094, accuracy=0.971/0.990 (0/2 secs)
    Epoch 10: cost=0.093, accuracy=0.972/0.970 (1/3 secs)
Model pulsar_model trian ended in 3 secs:
Model pulsar_model test report: accuracy = 0.969, (0 secs)

Model pulsar_model Visualization
[122.7, 41.7,  0.4,...] => 추정 별(확률 0.05) : 정답 별 => 0
[ 83.1, 42.2,  0.9,...] => 추정 별(확률 0.06) : 정답 별 => 0
[119.6, 47.2,  0.2,...] => 추정 별(확률 0.00) : 정답 별 => 0


In [445]:
# Show five more randomly drawn samples from the already trained pulsar model.
pm.visualize(5)

Model pulsar_model Visualization
[103.4, 50.0,  0.6,...] => 추정 별(확률 0.01) : 정답 별 => 0
[ 93.9, 40.4,  0.2,...] => 추정 별(확률 0.05) : 정답 별 => 0
[122.4, 46.0,  0.3,...] => 추정 별(확률 0.00) : 정답 별 => 0
[ 86.4, 46.7,  1.4,...] => 추정 별(확률 0.40) : 정답 펄서 => X
[ 94.2, 45.4,  0.3,...] => 추정 별(확률 0.01) : 정답 별 => 0
