<a href="https://colab.research.google.com/github/sam80402/mywebsite/blob/main/%E4%BA%BA%E5%B7%A5%E6%99%BA%E6%85%A7%E6%9C%9F%E4%B8%AD%E5%A0%B1%E5%91%8A_%E6%9C%80%E7%B5%82%E6%A8%A1%E5%9E%8B%E7%A8%8B%E5%BC%8F%E7%A2%BCModel1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Model 2（使用 TensorFlow 版本）

### input train.csv / test.csv

In [None]:
import pandas as pd

# Read the training and test CSV files from the working directory.
data_train, data_test = map(pd.read_csv, ("fashion-mnist_train.csv", "fashion-mnist_test.csv"))

In [None]:
# Preview the first two rows to check the column layout.
data_train.head(2)

In [None]:
# Distinct values found in the "label" column (before the ids are renamed).
unique_labels = data_train["label"].unique()
unique_labels

In [None]:
# Every column after the first one as a raw NumPy array
# (presumably the pixel columns, with "label" in column 0 — confirm with head()).
data_train.iloc[:,1:].values

In [None]:
import numpy as np
# Sanity check: reshape the first row's values to 28x28 and scale them by 1/255.
np.reshape(data_train.iloc[:,1:].values[0], (28,28)).astype('float32') / 255.0

In [None]:
import numpy as np

def restructure_data(data):
    """Reshape one flattened image into a 28x28 float32 array scaled by 1/255.

    Args:
        data: the 784 values of one image, either a pandas Series (the row
            handed over by ``DataFrame.apply(..., axis=1)``) or any other
            array-like such as an ndarray or a list.

    Returns:
        A ``(28, 28)`` ``float32`` array; inputs in ``0..255`` map to ``[0, 1]``.

    Raises:
        ValueError: if ``data`` does not hold exactly 784 values.
    """
    # np.asarray accepts a Series as well as plain arrays/lists, so callers are
    # not required to pass an object that has a ``.values`` attribute.
    pixels = np.asarray(data)
    return np.reshape(pixels, (28, 28)).astype('float32') / 255.0

In [None]:
# Map each numeric class id (0-9) to a human-readable class name.
# NOTE(review): the name for id 9 is spelled "Ankle boo" (probably meant
# "Ankle boot"). A later cell indexes the one-hot frame with this exact
# string, so both places must change together if the spelling is corrected.
cat_map= {0 : "T-shirt/top",
          1 : "Trouser",
          2 : "Pullover",
          3 : "Dress",
          4 : "Coat",
          5 : "Sandal",
          6 : "Shirt",
          7 : "Sneaker",
          8 : "Bag",
          9 : "Ankle boo"}

In [None]:
# Replace the numeric class ids in "label" with their class names.
# ``cat_map.get(x, x)`` leaves values that are already names untouched, so
# re-running this cell no longer raises KeyError on the mapped strings.
data_train["label"] = data_train["label"].map(lambda x: cat_map.get(x, x))

In [None]:
# Preview again: "label" should now show class names instead of ids.
data_train.head(2)

In [None]:
# One-hot encode the class names: one indicator column per distinct label.

dum = pd.get_dummies(data_train["label"])
dum.head(2)

In [None]:
# Number of one-hot rows (one per row of data_train).
# NOTE(review): the original note said 6000; the standard Fashion-MNIST
# training split has 60000 rows — verify against the cell output.
len(dum)

In [None]:
# Indicator column for class id 9; the column name follows the "Ankle boo" spelling used in cat_map.
dum['Ankle boo']

In [None]:
# Append every one-hot indicator column to data_train under its class name.
for class_name, indicator in dum.items():
    data_train[class_name] = indicator

In [None]:
# The last ten columns should now be the one-hot indicator columns.
data_train.columns[-10:]

In [None]:
# Expected column count: "label" (1) + pixel 1..784 (784) + the 10 one-hot columns (10) = 795.
len(data_train.columns)

In [None]:
# One-hot target matrix taken from the last ten columns of data_train.
y = data_train.iloc[:,-10:].values 
y

In [None]:
# prepare data for training 

import tensorflow as tf
from tensorflow import keras


def gen():
    """Yield ``(image, one_hot_label)`` pairs for every row of ``data_train``.

    Relies on the column layout built in the earlier cells: column 0 is
    "label", the last ten columns are the one-hot targets, and everything in
    between is the 784 pixel values of one image.

    Yields:
        tuple: ``(x, y)`` where ``x`` is a ``(28, 28)`` float32 array scaled by
        1/255 and ``y`` holds the row's ten one-hot values.
    """
    batch = data_train

    # x: convert all pixel columns in one vectorised step. The previous
    # row-by-row DataFrame.apply was repeated on every epoch, because tf.data
    # calls the generator again each time the dataset is iterated.
    x = batch.iloc[:, 1:-10].to_numpy(dtype='float32').reshape(-1, 28, 28) / 255.0

    # y: correct (one-hot) label of each row
    y = batch.iloc[:, -10:].values

    for x_item, y_item in zip(x, y):
        yield (x_item, y_item)


def get_input_data(batch_size = 10, shuffle_buffer = None):
    """Wrap ``gen`` in a shuffled, batched ``tf.data.Dataset``.

    Samples are pulled from the generator batch by batch while training; note
    that ``data_train`` itself is already fully loaded in memory.

    Args:
        batch_size: number of samples per batch.
        shuffle_buffer: size of the shuffle buffer. ``None`` (default) keeps
            the previous behaviour of using ``batch_size``, which only mixes
            neighbouring samples; pass e.g. ``len(data_train)`` for a full
            shuffle.

    Returns:
        A dataset yielding ``(images, labels)`` batches with element shapes
        ``(28, 28)`` and ``(10,)``, both float32.
    """
    if shuffle_buffer is None:
        shuffle_buffer = batch_size

    # shape=(10,) is an explicit 1-tuple; the former ``(10)`` was a bare int.
    signature = (tf.TensorSpec(shape=(28, 28), dtype=tf.float32),
                 tf.TensorSpec(shape=(10,), dtype=tf.float32))
    dataset = tf.data.Dataset.from_generator(gen, output_signature=signature)
    return dataset.shuffle(shuffle_buffer).batch(batch_size)

In [None]:
# Input shape is (rows, cols, channels): 28x28 images with a single channel.
input_layer = tf.keras.layers.Input(shape=(28,28,1))

# Two blocks of 3x3 convolution (64 filters, ReLU), each followed by max pooling with default settings.
h = tf.keras.layers.Conv2D(filters=64, kernel_size=(3,3), activation='relu')(input_layer)

h = tf.keras.layers.MaxPool2D()(h)

h = tf.keras.layers.Conv2D(filters=64,kernel_size=(3,3),activation='relu')(h)

h = tf.keras.layers.MaxPool2D()(h)

# Flatten the feature maps and classify into 10 classes with a softmax output.
h = tf.keras.layers.Flatten()(h)

output_layer = tf.keras.layers.Dense(10,activation= "softmax")(h)

In [None]:
# Plain functional model over the input/output tensors of the layer graph.
model = tf.keras.Model(inputs=input_layer, outputs=output_layer)

optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

# The final Dense layer already applies softmax, so the model emits
# probabilities rather than logits. from_logits must therefore be False;
# with True the loss would apply softmax a second time.
loss_fn = tf.keras.losses.CategoricalCrossentropy(from_logits=False)

train_acc_metric = tf.keras.metrics.CategoricalAccuracy()

batch_size = 10

In [None]:
# Print the layer-by-layer summary of the functional model.
model.summary()

In [None]:
# %%bash
# mkdir -p tmp

In [None]:
# plot model graph
# tf.keras.utils.plot_model(model, to_file="./tmp/model.png", show_shapes=True,show_layer_names=False)

In [None]:
class KerasCustomModel(tf.keras.Model):
    """Keras model whose per-batch training logic is spelled out in ``train_step``."""

    def train_step(self, data):
        """Run one optimisation step on a batch and report the metric values.

        Args:
            data: an ``(inputs, targets)`` pair as delivered by ``fit()``.

        Returns:
            dict mapping each metric's name to its current result.
        """
        features, targets = data

        # Record the forward pass so the loss can be differentiated.
        with tf.GradientTape() as tape:
            predictions = self(features, training=True)
            # Loss configured in compile(), plus the model's regularization losses.
            loss_value = self.compiled_loss(
                targets, predictions, regularization_losses=self.losses
            )

        # Back-propagate and apply one optimizer update.
        weights = self.trainable_variables
        grads = tape.gradient(loss_value, weights)
        self.optimizer.apply_gradients(zip(grads, weights))

        # Refresh the compiled metrics with this batch's predictions.
        self.compiled_metrics.update_state(targets, predictions)

        return {metric.name: metric.result() for metric in self.metrics}

In [None]:
# Build the custom-train-step model over the same input/output tensors.
keras_custom_model = KerasCustomModel(inputs=input_layer, outputs=output_layer)

In [None]:
# Configure the model for training: optimizer, loss function and metrics.

keras_custom_model.compile(optimizer="adam", 
                           loss="categorical_crossentropy", 
                           metrics=["accuracy"])

In [None]:
from datetime import datetime

# start_time = datetime.now()

# Train for 10 epochs on the generator-backed dataset (default batch size).
keras_custom_model.fit(get_input_data(), epochs=10, verbose=1)

# end_time = datetime.now()
# time_elapsed = end_time - start_time

# Model1最終版本

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive (Colab only). The data paths used later
# are relative ('drive/MyDrive/...'), so they presumably rely on the working
# directory being /content — confirm when running elsewhere.
drive.mount('/content/drive')

In [None]:
import numpy as np


In [None]:
def load_mnist(path, kind='train'):
    """Load one split of gzip-compressed MNIST-format data from `path`.

    Args:
        path: directory containing ``<kind>-labels-idx1-ubyte.gz`` and
            ``<kind>-images-idx3-ubyte.gz``.
        kind: file-name prefix of the split (this notebook uses ``'train'``
            and ``'t10k'``).

    Returns:
        tuple: ``(images, labels)``. ``images`` is a uint8 array of shape
        ``(n, 784)`` and ``labels`` a uint8 array of shape ``(n,)``. Both are
        read-only because they are created with ``np.frombuffer``.

    Raises:
        OSError: if a file is missing or cannot be read as gzip.
        ValueError: if the image payload cannot be reshaped to ``(n, 784)``.
    """
    # The docstring must be the first statement of the function; it used to
    # follow these imports and was therefore not attached as __doc__.
    import os
    import gzip
    import numpy as np

    labels_path = os.path.join(path,
                               '%s-labels-idx1-ubyte.gz'
                               % kind)
    images_path = os.path.join(path,
                               '%s-images-idx3-ubyte.gz'
                               % kind)

    # The first 8 bytes of the label file are skipped as header.
    with gzip.open(labels_path, 'rb') as lbpath:
        labels = np.frombuffer(lbpath.read(), dtype=np.uint8,
                               offset=8)

    # The first 16 bytes of the image file are skipped as header; the rest is
    # one row of 784 bytes per label.
    with gzip.open(images_path, 'rb') as imgpath:
        images = np.frombuffer(imgpath.read(), dtype=np.uint8,
                               offset=16).reshape(len(labels), 784)

    return images, labels

In [None]:
# Load the training split and the 't10k' test split from Drive.
X_train, y_train = load_mnist('drive/MyDrive/人工智慧導論/data/fashion', kind='train')
X_test, y_test = load_mnist('drive/MyDrive/人工智慧導論/data/fashion', kind='t10k')

# -- Standardise the inputs --
ave_trainX = np.average(X_train)
ave_testX = np.average(X_test)  # kept for inspection only; not used for scaling
std_trainX = np.std(X_train)
std_testX = np.std(X_test)  # kept for inspection only; not used for scaling
input_train = (X_train - ave_trainX) / std_trainX
# Scale the test split with the TRAINING mean/std: preprocessing statistics
# must come from the training data only, so that test inputs go through exactly
# the transformation the network was trained on.
input_test = (X_test - ave_trainX) / std_trainX

# One-hot encode the integer labels. ``np_utils`` is no longer exported by
# ``keras.utils`` in recent Keras releases, while ``to_categorical`` can be
# imported from ``keras.utils`` in both old and new versions.
from keras.utils import to_categorical
y_train_onehot = to_categorical(y_train)
y_test_onehot = to_categorical(y_test)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn import datasets

correct_train = y_train_onehot  # one-hot targets of the training split
correct_test = y_test_onehot  # one-hot targets of the test split

n_train = input_train.shape[0]  # number of training samples (NOTE(review): the original "(75)" note looks stale)
n_test = input_test.shape[0]  # number of test samples (NOTE(review): the original "(75)" note looks stale)

# -- Settings --
n_in = 784  # number of input-layer units
n_mid = 512  # number of units in each hidden layer
n_out = 10  # number of output-layer units

wb_width = 0.01  # scale multiplied onto the random initial weights and biases
eta = 0.005  # learning rate (eta)
epoch = 25  # number of passes of the training loop
batch_size = 100  # mini-batch size (rebinds the batch_size = 10 set in the TensorFlow section)
interval = 10  # progress is printed every `interval` epochs

# -- Base class shared by the trainable layers --
class BaseLayer:
  """Holds a layer's weights, biases and AdaGrad-style gradient accumulators.

  ``update`` reads ``grad_w`` and ``grad_b``, which are not set here; they
  must be assigned (e.g. by a subclass's backward pass) before it is called.
  """

  def __init__(self, n_upper, n):
    weight_shape = (n_upper, n)

    # Random initial parameters scaled by the module-level wb_width.
    self.w = wb_width * np.random.randn(*weight_shape)  # weight matrix
    self.b = wb_width * np.random.randn(n)  # bias vector

    # Running sums of squared gradients; starting at 1e-8 instead of 0 keeps
    # the division in update() away from zero.
    self.h_w = np.full(weight_shape, 1e-8)
    self.h_b = np.full(n, 1e-8)

  def update(self, eta):
    # Same rule for weights and biases: accumulate the squared gradient, then
    # step by eta divided by the root of the accumulator (all in place).
    for param, grad, accum in ((self.w, self.grad_w, self.h_w),
                               (self.b, self.grad_b, self.h_b)):
      accum += np.square(grad)
      param -= eta / np.sqrt(accum) * grad

# -- Dropout layer --
class Dropout:
  """Dropout that masks activations in training and scales them otherwise.

  Results are stored on the instance (``y``, ``dropout``, ``grad_x``); the
  methods themselves return nothing.
  """

  def __init__(self, dropout_ratio):
    # Fraction of units dropped while training
    self.dropout_ratio = dropout_ratio

  def forward(self, x, is_train):
    if not is_train:
      # Inference: keep every unit but scale by the keep probability.
      self.y = (1 - self.dropout_ratio) * x
      return

    # Training: 1 where the uniform draw exceeds the drop ratio, else 0.
    self.dropout = np.where(np.random.rand(*x.shape) > self.dropout_ratio, 1, 0)
    self.y = x * self.dropout

  def backward(self, grad_y):
    # Gradients pass only through the units kept by the last training forward.
    self.grad_x = grad_y * self.dropout


# -- Hidden layer --
class MiddleLayer(BaseLayer):
  """Fully connected layer with a ReLU activation.

  The original notes also sketched Leaky ReLU (slope 0.01 for u <= 0) and
  sigmoid as alternative activations; only ReLU is implemented.
  """

  def forward(self, x):
    self.x = x
    pre_activation = np.dot(x, self.w) + self.b
    self.u = pre_activation
    # ReLU: zero wherever the pre-activation is not positive
    self.y = np.where(pre_activation <= 0, 0, pre_activation)

  def backward(self, grad_y):
    # ReLU derivative: 0 for u <= 0, 1 otherwise
    relu_slope = np.where(self.u <= 0, 0, 1)
    delta = grad_y * relu_slope

    self.grad_x = np.dot(delta, self.w.T)
    self.grad_b = np.sum(delta, axis=0)
    self.grad_w = np.dot(self.x.T, delta)

# -- Output layer --
class OutputLayer(BaseLayer):
  """Fully connected layer with a softmax activation.

  ``backward`` expects the targets ``t`` and uses ``y - t`` as the error
  signal, i.e. the gradient of softmax combined with cross-entropy.
  """

  def forward(self, x):
    self.x = x
    u = np.dot(x, self.w) + self.b
    # Subtract each row's maximum before exponentiating. Softmax is unchanged
    # by a per-row shift, and the largest exponent becomes 0, so np.exp cannot
    # overflow to inf (which previously produced nan probabilities).
    exp_u = np.exp(u - np.max(u, axis=1, keepdims=True))
    self.y = exp_u / np.sum(exp_u, axis=1, keepdims=True)  # Softmax

  def backward(self, t):
    delta = self.y - t
    self.grad_w = np.dot(self.x.T, delta)
    self.grad_b = np.sum(delta, axis=0)
    self.grad_x = np.dot(delta, self.w.T)

# -- Instantiate the layers --
# NOTE(review): `output_layer` is also the name of the Keras output tensor
# created in the TensorFlow section; this assignment rebinds it to the NumPy
# OutputLayer instance.
middle_layer_1 = MiddleLayer(n_in, n_mid)
dropout_1 = Dropout(0.5)
middle_layer_2 = MiddleLayer(n_mid, n_mid)
dropout_2 = Dropout(0.5)
output_layer = OutputLayer(n_mid, n_out)

# -- Forward pass --
def forward_propagation(x,is_train):
  """Run `x` through hidden 1 -> dropout 1 -> hidden 2 -> dropout 2 -> output.

  Each stage consumes the previous stage's ``y``. The dropout outputs used to
  be computed and then ignored (the next layer read the hidden layer's ``y``
  directly), which silently disabled dropout.

  Args:
    x: input batch.
    is_train: True samples dropout masks (training); False uses the
      deterministic inference scaling and should be used for evaluation.
  """
  middle_layer_1.forward(x)
  dropout_1.forward(middle_layer_1.y ,is_train)
  middle_layer_2.forward(dropout_1.y)
  dropout_2.forward(middle_layer_2.y ,is_train)
  output_layer.forward(dropout_2.y)

# -- Backward pass --
def backpropagation(t):
  """Propagate the error for targets `t` back through the same chain.

  Mirrors forward_propagation in reverse: every stage receives the
  ``grad_x`` of the stage that follows it, including the dropout layers, so
  that gradients of dropped units are masked out.

  Args:
    t: target values for the batch of the preceding training forward pass.
  """
  output_layer.backward(t)
  dropout_2.backward(output_layer.grad_x)
  middle_layer_2.backward(dropout_2.grad_x)
  dropout_1.backward(middle_layer_2.grad_x)
  middle_layer_1.backward(dropout_1.grad_x)

# -- Apply one parameter update to every trainable layer --
def update_wb():
  """Call ``update(eta)`` on the two hidden layers and then the output layer."""
  for layer in (middle_layer_1, middle_layer_2, output_layer):
    layer.update(eta)

# -- Error measure --
def get_error(t, batch_size):
  """Cross-entropy between targets `t` and the output layer's current ``y``.

  Args:
    t: target values, same shape as ``output_layer.y``.
    batch_size: divisor applied to the summed error.

  Returns:
    The summed cross-entropy divided by `batch_size`.
  """
  # 1e-7 keeps the logarithm finite when a predicted probability is 0.
  log_probs = np.log(output_layer.y + 1e-7)
  total = np.sum(t * log_probs)
  return -total / batch_size

# -- Training --
# -- Error history (x = epoch index, y = error), plotted after training --
train_error_x = []
train_error_y = []
test_error_x = []
test_error_y = []

# -- Mini-batches per epoch; samples beyond the last full batch are skipped --
n_batch = n_train // batch_size

for i in range(epoch):
  # -- Measure the error on both full splits in inference mode --
  forward_propagation(input_train ,False)
  error_train = get_error(correct_train, n_train)
  forward_propagation(input_test, False)
  error_test = get_error(correct_test, n_test)

  # -- Record the errors --
  test_error_x.append(i)
  test_error_y.append(error_test)
  train_error_x.append(i)
  train_error_y.append(error_train)

  # -- Report progress every `interval` epochs --
  if i%interval == 0:
    print("Epoch:" + str(i) + "/" + str(epoch),
       "Error_train:" + str(error_train),
       "Error_test:" + str(error_test))

  # -- Train --
  index_random = np.arange(n_train)
  np.random.shuffle(index_random)  # shuffle the sample indices
  for j in range(n_batch):

    # Take one mini-batch
    mb_index = index_random[j*batch_size : (j+1)*batch_size]
    x = input_train[mb_index, :]
    t = correct_train[mb_index, :]

    # Forward and backward pass in training mode
    forward_propagation(x ,True)
    backpropagation(t)

    # Update weights and biases
    update_wb()

# -- Plot the recorded errors --
plt.plot(train_error_x, train_error_y, label="Train")
plt.plot(test_error_x, test_error_y, label="Test")
plt.legend()

plt.xlabel("Epochs")
plt.ylabel("Error")
plt.show()

# -- Accuracy --
# Evaluate with is_train=False, like the per-epoch error above. is_train=True
# asks the dropout layers to sample random masks, which belongs to training
# only and would make the reported accuracy non-deterministic.
forward_propagation(input_train,False)
count_train = np.sum(np.argmax(output_layer.y, axis=1) == np.argmax(correct_train, axis=1))

forward_propagation(input_test,False)
count_test = np.sum(np.argmax(output_layer.y, axis=1) == np.argmax(correct_test, axis=1))

print("Accuracy Train:", str(count_train/n_train*100) + "%",
      "Accuracy Test:", str(count_test/n_test*100) + "%")


In [None]:
# Display the five layer objects that make up the NumPy network.
middle_layer_1,dropout_1,middle_layer_2,dropout_2,output_layer

In [None]:
class model():
  """Plain container bundling the five layer objects of the NumPy network.

  Despite its name, ``save`` writes nothing to disk; it only stores the given
  objects on the instance.
  """

  def __init__(self):
    # All slots start empty until save() is called.
    self.input = self.drop1 = self.middle = self.drop2 = self.output = None

  def save(self,Input,drop1,middle,drop2,output):
    """Store the layer objects on the instance, in network order."""
    (self.input, self.drop1, self.middle,
     self.drop2, self.output) = (Input, drop1, middle, drop2, output)

    

In [None]:
# Bundle the five layer objects into one container and pickle it to Drive.
# NOTE: pickle stores references to the classes, so the layer classes and
# `model` must be importable/defined again when the file is loaded, and
# pickle.load should only ever be used on files from a trusted source.
Model = model()
Model.save(middle_layer_1,dropout_1,middle_layer_2,dropout_2,output_layer)
import pickle
with open('drive/MyDrive/人工智慧導論/Model.pickle', 'wb') as f:
  pickle.dump(Model,f)