In [None]:
import numpy as np
import h5py
import matplotlib.pyplot as plt
import testCases
from dnn_utils import sigmoid, sigmoid_backward,relu, relu_backward
import lr_utils

In [None]:
# Seed NumPy's global random generator so the random draws made below are repeatable.
np.random.seed(1)

In [None]:
# 初始化参数
def initialize_parameters(n_x, n_h, n_y):
    """
    Build the initial weights and biases of a two-layer network.

    Weights are standard-normal draws scaled by 0.01; biases start at zero.

    Arguments:
    n_x -- number of input features
    n_h -- number of hidden units
    n_y -- number of output units

    Returns:
    dict with keys "W1" (n_h, n_x), "W2" (n_y, n_h), "b1" (n_h, 1) and "b2" (n_y, 1)
    """
    scale = 0.01
    # W1 is sampled before W2 so the global random stream is consumed in a fixed order.
    hidden_weights = scale * np.random.randn(n_h, n_x)
    output_weights = scale * np.random.randn(n_y, n_h)
    assert hidden_weights.shape == (n_h, n_x)

    return {
        "W1": hidden_weights,
        "W2": output_weights,
        "b1": np.zeros((n_h, 1)),
        "b2": np.zeros((n_y, 1)),
    }

In [None]:
# print("==============测试initialize_parameters==============")
# parameters = initialize_parameters(3,2,1)
# print("W1 = " + str(parameters["W1"]))
# print("b1 = " + str(parameters["b1"]))
# print("W2 = " + str(parameters["W2"]))
# print("b2 = " + str(parameters["b2"]))

In [None]:
# 初始化深层网络参数
def initialize_parameters_deep(layer_dims):
    """
    Initialize the weights and biases of a multi-layer network.

    Arguments:
    layer_dims -- sequence with the number of units in every layer, including the input layer

    Returns:
    parameters -- a new dict holding, for each layer l = 1 .. len(layer_dims) - 1,
                  "W<l>" of shape (layer_dims[l], layer_dims[l-1]) and
                  "b<l>" of shape (layer_dims[l], 1)
    """
    # Start from a fresh dict. Without this the function depended on a global
    # `parameters` (NameError on a fresh kernel, stale keys from earlier cells otherwise).
    parameters = {}
    for index in range(1, len(layer_dims)):
        units_in = layer_dims[index - 1]
        units_out = layer_dims[index]
        # Standard-normal draws divided by sqrt(number of units in the previous layer).
        parameters["W" + str(index)] = np.random.randn(units_out, units_in) / np.sqrt(units_in)
        parameters["b" + str(index)] = np.zeros((units_out, 1))
    return parameters

In [None]:
# #测试initialize_parameters_deep
# print("==============测试initialize_parameters_deep==============")
# layers_dims = [5,4,3]
# parameters = initialize_parameters_deep(layers_dims)
# print("W1 = " + str(parameters["W1"]))
# print("b1 = " + str(parameters["b1"]))
# print("W2 = " + str(parameters["W2"]))
# print("b2 = " + str(parameters["b2"]))

In [None]:
def linear_forward(A, W, b):
    """
    Compute the linear part of one layer's forward step: Z = W . A + b.

    Arguments:
    A -- activations from the previous layer (or the input data), one column per example
    W -- weight matrix of the current layer
    b -- bias vector of the current layer

    Returns:
    Z -- the pre-activation values, shape (W.shape[0], A.shape[1])
    cache -- the tuple (A, W, b), kept for the backward pass
    """
    pre_activation = W.dot(A) + b
    expected_shape = (W.shape[0], A.shape[1])  # one column per example
    assert pre_activation.shape == expected_shape
    return pre_activation, (A, W, b)

In [None]:
# #测试linear_forward
# print("==============测试linear_forward==============")
# A,W,b = testCases.linear_forward_test_case()
# Z,linear_cache = linear_forward(A,W,b)
# print("Z = " + str(Z))

In [None]:
def linear_activation_forward(A_prev, W, b, activation):  # key building block of forward propagation
    """
    Run one full layer forward: the linear step followed by the activation.

    Arguments:
    A_prev -- activations from the previous layer (or the input data)
    W -- weight matrix of the current layer
    b -- bias vector of the current layer
    activation -- name of the activation to apply: "sigmoid" or "relu"

    Returns:
    A -- output of the activation function
    cache -- tuple (linear_cache, activation_cache); linear_cache is (A_prev, W, b) and
             activation_cache is whatever the activation helper returns for the backward pass

    Raises:
    ValueError -- if activation is neither "sigmoid" nor "relu"
    """
    # Reject unknown names up front: previously both branches were skipped and
    # building the cache crashed with an UnboundLocalError.
    if activation not in ("sigmoid", "relu"):
        raise ValueError('activation must be "sigmoid" or "relu", got {!r}'.format(activation))

    # The linear step is the same for both activations.
    Z, linear_cache = linear_forward(A_prev, W, b)
    if activation == "sigmoid":
        A, activation_cache = sigmoid(Z)
    else:
        A, activation_cache = relu(Z)
    cache = (linear_cache, activation_cache)
    return A, cache

In [None]:
# #测试linear_activation_forward
# print("==============测试linear_activation_forward==============")
# A_prev, W,b = testCases.linear_activation_forward_test_case()

# A, linear_activation_cache = linear_activation_forward(A_prev, W, b, activation = "sigmoid")
# print("sigmoid，A = " + str(A))

# A, linear_activation_cache = linear_activation_forward(A_prev, W, b, activation = "relu")
# print("ReLU，A = " + str(A))

In [None]:
# L层模型的前向传播
def L_model_forward(X, parameters):
    """
    Forward pass through the whole network: ReLU on every layer except the last, sigmoid on the last.

    The number of layers is taken to be len(parameters) // 2, i.e. `parameters`
    is expected to hold one "W<l>" and one "b<l>" entry per layer.

    Arguments:
    X -- input data, one column per example
    parameters -- dict of weights and biases keyed "W1", "b1", ..., "WL", "bL"

    Returns:
    A_output -- activations of the final (sigmoid) layer
    caches -- list with one cache per layer, in layer order, as returned by
              linear_activation_forward
    """
    depth = len(parameters) // 2  # two entries (W and b) per layer
    layer_caches = []
    activation_value = X

    # Layers 1 .. depth-1 use the ReLU activation.
    for layer in range(1, depth):
        layer_id = str(layer)
        activation_value, layer_cache = linear_activation_forward(
            activation_value,
            parameters["W" + layer_id],
            parameters["b" + layer_id],
            activation="relu",
        )
        layer_caches.append(layer_cache)

    # The final layer uses the sigmoid activation.
    last_id = str(depth)
    A_output, output_cache = linear_activation_forward(
        activation_value,
        parameters["W" + last_id],
        parameters["b" + last_id],
        activation="sigmoid",
    )
    layer_caches.append(output_cache)
    return A_output, layer_caches

In [None]:
# Test L_model_forward on the fixture supplied by testCases.
# Note: this cell assigns the notebook-level names X, parameters, AL and caches.
print("==============测试L_model_forward==============")
X,parameters = testCases.L_model_forward_test_case()
AL,caches = L_model_forward(X,parameters)
print("AL = " + str(AL))
# One cache is appended per layer, so this prints the number of layers.
print("caches 的长度为 = " + str(len(caches)))

In [None]:
def compute_cost(A_output, Y):
    """
    Cross-entropy cost averaged over the examples.

    Arguments:
    A_output -- predicted probabilities, one column per example
    Y -- true labels, same number of columns as A_output

    Returns:
    cost -- the cost with singleton dimensions squeezed away
    """
    num_examples = Y.shape[1]
    positive_term = np.dot(Y, np.log(A_output.T))
    negative_term = np.dot(1 - Y, np.log(1 - A_output.T))
    return np.squeeze(-(1 / num_examples) * (positive_term + negative_term))

In [None]:
# #测试compute_cost
# print("==============测试compute_cost==============")
# Y,AL = testCases.compute_cost_test_case()
# print("cost = " + str(compute_cost(AL, Y)))


In [None]:
def linear_backward(dZ, linear_cache):
    """
    Backward step for the linear part of a single layer.

    Arguments:
    dZ -- gradient of the cost with respect to this layer's linear output, one column per example
    linear_cache -- tuple (A_prev, W, b) stored by the forward step

    Returns:
    dA_prev -- gradient with respect to the previous layer's activations (same shape as A_prev)
    dW -- gradient with respect to W (same shape as W)
    db -- gradient with respect to b (same shape as b)
    """
    A_prev, W, b = linear_cache
    inv_m = 1 / dZ.shape[1]  # average over the examples (columns)

    dA_prev = W.T.dot(dZ)
    dW = inv_m * dZ.dot(A_prev.T)
    db = inv_m * dZ.sum(axis=1, keepdims=True)

    # Every gradient must have the shape of the quantity it belongs to.
    for gradient, reference in ((dA_prev, A_prev), (dW, W), (db, b)):
        assert gradient.shape == reference.shape
    return dA_prev, dW, db

In [None]:
# #测试linear_backward
# print("==============测试linear_backward==============")
# dZ, linear_cache = testCases.linear_backward_test_case()

# dA_prev, dW, db = linear_backward(dZ, linear_cache)
# print ("dA_prev = "+ str(dA_prev))
# print ("dW = " + str(dW))
# print ("db = " + str(db))

In [None]:
def linear_activation_backward(dA, cache, activation = "relu"):   # key building block of backward propagation; start here
    """
    Backward step for one full layer (activation followed by the linear part).

    From this layer's dA and its cache, compute the gradient for the previous
    layer's activations together with this layer's dW and db.

    Arguments:
    dA -- gradient of the cost with respect to this layer's activations
    cache -- tuple (linear_cache, activation_cache) stored during the forward pass
    activation -- name of the activation used by this layer: "relu" or "sigmoid"

    Returns:
    dA_prev -- gradient with respect to the previous layer's activations
    dW -- gradient with respect to this layer's W
    db -- gradient with respect to this layer's b

    Raises:
    ValueError -- if activation is neither "relu" nor "sigmoid"
    """
    # Reject unknown names up front: previously both branches were skipped and
    # the return statement crashed with an UnboundLocalError.
    if activation not in ("relu", "sigmoid"):
        raise ValueError('activation must be "relu" or "sigmoid", got {!r}'.format(activation))

    linear_cache, activation_cache = cache
    # Step 1: back through the activation function to get dZ.
    if activation == "relu":
        dZ = relu_backward(dA, activation_cache)
    else:
        dZ = sigmoid_backward(dA, activation_cache)
    # Step 2: back through the linear part.
    dA_prev, dW, db = linear_backward(dZ, linear_cache)
    return dA_prev, dW, db

总结：
对于每一层来说，有两个函数操作：一个是线性组合，一个是激活。linear_cache 存的是线性组合的缓存（用于计算偏导），包括上一层的激活值和本层的参数；activation_cache 存的是激活值的缓存（用于计算偏导）。cache 包括这两个缓存值。使用 linear_activation_backward(dA, cache, activation = "relu") 函数，计算上一层激活值的偏导 dA_prev，以及本层的 dW、db，具体分为两步计算：dZ = relu_backward(dA, activation_cache) 和 dA_prev, dW, db = linear_backward(dZ, linear_cache)。

In [None]:
# #测试linear_activation_backward
# print("==============测试linear_activation_backward==============")
# AL, linear_activation_cache = testCases.linear_activation_backward_test_case()

# dA_prev, dW, db = linear_activation_backward(AL, linear_activation_cache, activation = "sigmoid")
# print ("sigmoid:")
# print ("dA_prev = "+ str(dA_prev))
# print ("dW = " + str(dW))
# print ("db = " + str(db) + "\n")

# dA_prev, dW, db = linear_activation_backward(AL, linear_activation_cache, activation = "relu")
# print ("relu:")
# print ("dA_prev = "+ str(dA_prev))
# print ("dW = " + str(dW))
# print ("db = " + str(db))

In [None]:
def L_model_backward(A_output, Y, caches):
    """
    Backward pass for the whole network (sigmoid output layer, ReLU layers below it).

    Arguments:
    A_output -- final-layer activations from the forward pass, one column per example
    Y -- true labels, one column per example
    caches -- list with one cache per layer, each a (linear_cache, activation_cache) pair

    Returns:
    grads -- dict holding, for every layer l, "dA<l>", "dW<l>" and "db<l>"
    """
    depth = len(caches)
    grads = {}

    def record(layer, dA, dW, db):
        # Store the three gradients of one layer under their numbered keys.
        tag = str(layer)
        grads["dA" + tag] = dA
        grads["dW" + tag] = dW
        grads["db" + tag] = db

    # Output layer first: derivative of the cross-entropy cost with respect to A_output.
    dA_current = - (np.divide(Y, A_output) - np.divide(1 - Y, 1 - A_output))
    dA_below, dW, db = linear_activation_backward(dA_current, caches[depth - 1], activation="sigmoid")
    record(depth, dA_current, dW, db)

    # Then walk the remaining layers from depth-1 down to 1.
    for layer in range(depth - 1, 0, -1):
        dA_current = dA_below
        dA_below, dW, db = linear_activation_backward(dA_current, caches[layer - 1], activation="relu")
        record(layer, dA_current, dW, db)
    return grads


    

In [None]:
# Test L_model_backward on the fixture supplied by testCases.
# Note: this cell rebinds the notebook-level names AL, caches and grads.
print("==============测试L_model_backward==============")
AL, Y_assess, caches = testCases.L_model_backward_test_case()
grads = L_model_backward(AL, Y_assess, caches)
# Show the gradients computed for the first layer.
print ("dW1 = "+ str(grads["dW1"]))
print ("db1 = "+ str(grads["db1"]))
print ("dA1 = "+ str(grads["dA1"]))

In [None]:
def update_parameters(parameters, grads, learning_rate):
    """
    Apply one gradient-descent step to every weight matrix and bias vector.

    The `parameters` dict is modified in place (each key is rebound to a newly
    computed array) and is also returned.

    Arguments:
    parameters -- dict keyed "W1", "b1", ..., "WL", "bL"
    grads -- dict holding the matching "dW<l>" and "db<l>" entries
    learning_rate -- step size

    Returns:
    parameters -- the same dict, holding the updated values
    """
    layer_count = len(parameters) // 2  # two entries (W and b) per layer
    for layer in range(1, layer_count + 1):
        for kind in ("W", "b"):
            key = kind + str(layer)
            parameters[key] = parameters[key] - learning_rate * grads["d" + key]
    return parameters

In [None]:
# Test update_parameters on the fixture supplied by testCases.
# Note: this cell rebinds the notebook-level names parameters and grads.
print("==============测试update_parameters==============")
parameters, grads = testCases.update_parameters_test_case()
parameters = update_parameters(parameters, grads, 0.1)

# Show the parameters after one update step with learning rate 0.1.
print ("W1 = "+ str(parameters["W1"]))
print ("b1 = "+ str(parameters["b1"]))
print ("W2 = "+ str(parameters["W2"]))
print ("b2 = "+ str(parameters["b2"]))


In [None]:
#搭建两层神经网络
def two_layer_model(X,Y,layers_dims,learning_rate=0.0075,num_iterations=120,print_cost=False,isPlot=True):
    """
    Train a two-layer network (linear -> ReLU -> linear -> sigmoid) with gradient descent.

    Arguments:
    X -- input data, one column per example
    Y -- true labels, one column per example
    layers_dims -- (n_x, n_h, n_y): sizes of the input, hidden and output layers
    learning_rate -- step size passed to update_parameters
    num_iterations -- number of gradient-descent iterations
    print_cost -- if True, print the cost every 100 iterations
    isPlot -- if True, plot the recorded costs once training has finished

    Returns:
    parameters -- dict with the trained "W1", "b1", "W2" and "b2"
    """
    np.random.seed(1)  # reseed so every call starts from the same initial weights
    (n_x, n_h, n_y) = layers_dims
    grads = {}
    costs = []
    print('.........................................')
    parameters = initialize_parameters(n_x, n_h, n_y)

    print("train start...")
    for index in range(num_iterations):
        W1 = parameters["W1"]
        W2 = parameters["W2"]
        b1 = parameters["b1"]
        b2 = parameters["b2"]

        # Forward pass: hidden ReLU layer, then sigmoid output layer.
        A1, cache1 = linear_activation_forward(X, W1, b1, activation = "relu")
        A_output, cache2 = linear_activation_forward(A1, W2, b2, activation = "sigmoid")

        cost = compute_cost(A_output, Y)

        # Backward pass, starting from the derivative of the cost with respect to A_output.
        dA_output = - (np.divide(Y, A_output) - np.divide(1 - Y, 1 - A_output))
        dA1, dW2, db2 = linear_activation_backward(dA_output, cache2, activation = "sigmoid")
        _, dW1, db1 = linear_activation_backward(dA1, cache1, activation = "relu")

        grads["dW2"] = dW2
        grads["dW1"] = dW1
        grads["db1"] = db1
        grads["db2"] = db2

        parameters = update_parameters(parameters, grads, learning_rate)  # gradient-descent step

        # Record (and optionally print) the cost every 100 iterations.
        if index % 100 == 0:
            costs.append(cost)
            if print_cost:
                print("Iteration: {} | cost: {}".format(index, np.squeeze(cost)))
    print("train end...")
    if isPlot:
        plt.plot(np.squeeze(costs))
        plt.ylabel('cost')
        # Costs are sampled every 100 iterations, so the axis is in hundreds, not tens.
        plt.xlabel('iterations (per hundreds)')
        plt.title("Learning rate =" + str(learning_rate))
        plt.show()
    return parameters


In [None]:
# Load the data set.
train_set_x_orig , train_set_y , test_set_x_orig , test_set_y , classes = lr_utils.load_dataset()

# Flatten every example into one column: reshape to (number of examples, -1), then transpose.
train_x_flatten = train_set_x_orig.reshape(train_set_x_orig.shape[0], -1).T 
test_x_flatten = test_set_x_orig.reshape(test_set_x_orig.shape[0], -1).T
print(train_x_flatten.shape)
# Scale the inputs by 255 (presumably 8-bit pixel values; confirm against lr_utils.load_dataset).
train_x = train_x_flatten / 255.0
train_y = train_set_y
test_x = test_x_flatten / 255.0
test_y = test_set_y

In [None]:
# Two-layer network: 12288 inputs, 7 hidden units, 1 output unit (unpacked as n_x, n_h, n_y).
layers_dims = (12288, 7, 1)
# Train for 2500 iterations, printing the cost every 100 iterations and plotting it at the end.
parameters = two_layer_model(train_x, train_y, layers_dims, num_iterations = 2500, print_cost=True,isPlot=True)

In [None]:
def predict(X, y, parameters):
    """
    Run the network on X, threshold its output at 0.5 and print the accuracy against y.

    Arguments:
    X -- input data, one column per example
    y -- true labels to compare the predictions with
    parameters -- trained parameters, as accepted by L_model_forward

    Returns:
    pred -- array shaped like the network output; row 0 holds 1 where the output is > 0.5, else 0
    """
    A_output, _ = L_model_forward(X, parameters)

    # Only the first output row is thresholded; any other row stays zero.
    pred = np.zeros_like(A_output)
    pred[0, :] = A_output[0, :] > 0.5

    accuracy = float(np.sum(pred == y) / pred.shape[1])
    print('The accuracy is {}%'.format(accuracy * 100))
    return pred

In [None]:
# Evaluate the trained two-layer model; predict() prints the accuracy for each set.
predictions_train = predict(train_x, train_y, parameters) # training set
predictions_test = predict(test_x, test_y, parameters) # test set

In [None]:
def L_layer_model(X, Y, layers_dims, learning_rate=0.0075, num_iterations=3000, print_cost=False,isPlot=True):
    """
    Train a multi-layer network (ReLU layers followed by a sigmoid output) with gradient descent.

    Arguments:
    X -- input data, one column per example
    Y -- true labels, one column per example
    layers_dims -- number of units in every layer, including the input layer
    learning_rate -- step size passed to update_parameters
    num_iterations -- number of gradient-descent iterations
    print_cost -- if True, print the cost every 100 iterations
    isPlot -- if True, plot the recorded costs once training has finished

    Returns:
    parameters -- dict with the trained "W<l>" and "b<l>" of every layer
    """
    np.random.seed(1)  # reseed so every call starts from the same initial weights
    parameters = initialize_parameters_deep(layers_dims)
    costs = []
    print('start train ...')
    for i in range(num_iterations):
        A_output, caches = L_model_forward(X, parameters)
        if i == 0:
            # Peek at a few outputs of the very first forward pass.
            print('A_output:',A_output[0, 1:5])
        cost = compute_cost(A_output, Y)

        grads = L_model_backward(A_output, Y, caches)
        parameters = update_parameters(parameters, grads, learning_rate)

        # Record (and optionally print) the cost every 100 iterations.
        if i % 100 == 0:
            costs.append(cost)
            if print_cost:
                print('Iterations:{}\t| cost:{}'.format(i, cost))

    print('finish train ...')
    if isPlot:
        plt.plot(np.squeeze(costs))
        plt.ylabel('cost')
        plt.xlabel('iterations per 100')
        plt.title("Learning rate = " + str(learning_rate))
        # Must be called: the bare attribute `plt.show` is a no-op.
        plt.show()
    return parameters


In [None]:
# Load and preprocess the data again (this repeats the steps of the earlier data-loading cell).
train_set_x_orig , train_set_y , test_set_x_orig , test_set_y , classes = lr_utils.load_dataset()

# Flatten every example into one column: reshape to (number of examples, -1), then transpose.
train_x_flatten = train_set_x_orig.reshape(train_set_x_orig.shape[0], -1).T 
test_x_flatten = test_set_x_orig.reshape(test_set_x_orig.shape[0], -1).T

# Scale the inputs by 255 (presumably 8-bit pixel values; confirm against lr_utils.load_dataset).
train_x = train_x_flatten / 255
train_y = train_set_y
test_x = test_x_flatten / 255
test_y = test_set_y

layers_dims = [12288, 20, 7, 5, 1] # input size followed by four layers of 20, 7, 5 and 1 units
# Train for 2500 iterations; note this overwrites the two-layer `parameters` from above.
parameters = L_layer_model(train_x, train_y, layers_dims, num_iterations = 2500, print_cost = True,isPlot=True)


In [None]:
# Evaluate the trained multi-layer model; predict() prints the accuracy for each set.
pred_train = predict(train_x, train_y, parameters) # training set
pred_test = predict(test_x, test_y, parameters) # test set

In [None]:
def print_mislabeled_images(classes, X, y, p):
    """
    Plot the images whose prediction differs from the actual label.

    Arguments:
    classes -- sequence of byte-string class names, indexed by label
    X -- data set; each column is reshaped to a 64x64x3 image
    y -- actual labels
    p -- predictions
    """
    # Assuming 0/1 labels, p + y equals 1 exactly where prediction and label disagree.
    disagreement = (p + y) == 1
    mislabeled_indices = np.asarray(np.where(disagreement))
    plt.rcParams['figure.figsize'] = (40.0, 40.0)  # set default size of plots
    num_images = len(mislabeled_indices[0])
    for position in range(num_images):
        column = mislabeled_indices[1][position]

        plt.subplot(2, num_images, position + 1)
        plt.imshow(X[:, column].reshape(64, 64, 3), interpolation='nearest')
        plt.axis('off')
        predicted_name = classes[int(p[0, column])].decode("utf-8")
        actual_name = classes[y[0, column]].decode("utf-8")
        plt.title("Prediction: " + predicted_name + " \n Class: " + actual_name)


# Show the test-set images that the multi-layer model got wrong.
print_mislabeled_images(classes, test_x, test_y, pred_test)

