In [49]:
import copy

from TPNN.architecture.perceptron import*
import numpy as np
import numpy.random as rand
import plotly.express as px
import plotly.graph_objects as go

### Test net

In [50]:
net = Net()
net.insert_layer(0, 2, ident)
net.insert_layer(1, 5, sigmoid)
net.insert_layer(2, 1, ident)
print(net.layers_count)
net.init_weights(1, np.array([[1,1,1,1,1], [1,1,1,1,1]]))
net.init_weights(2, np.array([[1,1,1,1,1]]).transpose())
net.init_biases(2, np.array([[0]]))
net.init_biases(1, np.array([[1,1,1,1,1]]))

net.print_net_config()

3
layers=3
[0]
 size=2
 act_function=<function ident at 0x05F214F0>
 activations=[[0. 0.]]
 biases=[[0. 0.]]
 weights:
     empty
--------------------------
[1]
 size=5
 act_function=<function sigmoid at 0x0A2C37C0>
 activations=[[0. 0. 0. 0. 0.]]
 biases=[[1 1 1 1 1]]
 weights:
     |1 1 1 1 1 |
     |1 1 1 1 1 |

--------------------------
[2]
 size=1
 act_function=<function ident at 0x05F214F0>
 activations=[[0.]]
 biases=[[0]]
 weights:
     |1 |
     |1 |
     |1 |
     |1 |
     |1 |

--------------------------


In [51]:
net.calc_output(np.array([1, 1]))

array([[4.76287063]])

In [52]:
net = Net()
net.insert_layer(0, 5, ident)
net.insert_layer(1, 7, sigmoid)
net.insert_layer(2, 7, th)
net.insert_layer(3, 5, ident)

net.init_weights(1, np.array([[1,2,3,4,5,6,7],[1,2,3,4,5,6,7],[1,2,3,4,5,6,7],[1,2,3,4,5,6,7],[1,2,3,4,5,6,7]]))
net.init_weights(2, np.array([[1,2,3,4,5,6,7],[1,2,3,4,5,6,7],[1,2,3,-4,5,6,7],[1,2,3,4,5,6,7],[1,2,3,4,5,6,7],[1,2,3,4,5,6,7],[1,2,3,4,5,6,7]]))
net.init_weights(3, np.array([[1,1,0,1,1],[1,1,1,1,1],[1,1,1,0,1],[1,1,1,1,1],[1,1,1,1,1],[1,1,1,1,1],[1,1,1,1,1]]))
net.init_biases(3, np.array([[1,1,1,1,1]]))

net.print_net_config()

layers=4
[0]
 size=5
 act_function=<function ident at 0x05F214F0>
 activations=[[0. 0. 0. 0. 0.]]
 biases=[[0. 0. 0. 0. 0.]]
 weights:
     empty
--------------------------
[1]
 size=7
 act_function=<function sigmoid at 0x0A2C37C0>
 activations=[[0. 0. 0. 0. 0. 0. 0.]]
 biases=[[0. 0. 0. 0. 0. 0. 0.]]
 weights:
     |1 2 3 4 5 6 7 |
     |1 2 3 4 5 6 7 |
     |1 2 3 4 5 6 7 |
     |1 2 3 4 5 6 7 |
     |1 2 3 4 5 6 7 |

--------------------------
[2]
 size=7
 act_function=<function th at 0x0A2C3808>
 activations=[[0. 0. 0. 0. 0. 0. 0.]]
 biases=[[0. 0. 0. 0. 0. 0. 0.]]
 weights:
     |1 2 3 4 5 6 7 |
     |1 2 3 4 5 6 7 |
     |1 2 3 -4 5 6 7 |
     |1 2 3 4 5 6 7 |
     |1 2 3 4 5 6 7 |
     |1 2 3 4 5 6 7 |
     |1 2 3 4 5 6 7 |

--------------------------
[3]
 size=5
 act_function=<function ident at 0x05F214F0>
 activations=[[0. 0. 0. 0. 0.]]
 biases=[[1 1 1 1 1]]
 weights:
     |1 1 0 1 1 |
     |1 1 1 1 1 |
     |1 1 1 0 1 |
     |1 1 1 1 1 |
     |1 1 1 1 1 |
     |1 1 1 1 1 |
  

In [53]:
net.calc_output(np.array([1,2,0,2,1]))

array([[7.99999833, 7.99999833, 7.        , 6.99999833, 7.99999833]])

### Fill activation derivatives array

In [54]:
def calc_activations_derivatives(net: Net, target_vector):
    assert target_vector.shape[0] == 1

    last_layer = net.layers[net.layers_count - 1]
    activation_der_array = []
    last_layer_derivatives_array = []

    for i in range(last_layer.neuron_count):
        der = predict_error_der(last_layer.activations[0], target_vector, i)
        last_layer_derivatives_array.append(der)

    activation_der_array.insert(0, last_layer_derivatives_array)

    # cal derivatives on each layer, except last and first layers
    for i in range(net.layers_count - 2, 0, -1):
        cur_layer = net.layers[i]
        layer_derivatives_array = []

        for j in range(cur_layer.neuron_count):
            next_layer_der_array = activation_der_array[0]
            layer_derivatives_array.append(net.der_cost_act(i, j, next_layer_der_array))

        # add derivatives array of the current layer
        activation_der_array.insert(0, layer_derivatives_array)

    return activation_der_array

## Net training functions

In [55]:
def get_weight_gradient_matrix(net: Net, layer_idx, activation_der_array):
    assert 0 < layer_idx < net.layers_count
    prev_layer_neuron_count = net.layers[layer_idx - 1].neuron_count
    cur_layer_neuron_count = net.layers[layer_idx].neuron_count

    gradient_matrix = np.zeros((prev_layer_neuron_count, cur_layer_neuron_count))

    for i in range(prev_layer_neuron_count):
        for j in range(cur_layer_neuron_count):
            gradient_matrix[i][j] = net.der_cost_weigh(layer_idx, i, j, activation_der_array[layer_idx - 1][j])
    return gradient_matrix

def get_bias_gradient_matrix(net: Net, layer_idx, activation_der_array):
    assert 0 < layer_idx < net.layers_count
    cur_layer_neuron_count = net.layers[layer_idx].neuron_count
    gradient_vector = [np.zeros(cur_layer_neuron_count)]

    for j in range(cur_layer_neuron_count):
        gradient_vector[0][j] = net.der_cost_bias(layer_idx, j, activation_der_array[layer_idx - 1][j])

    return gradient_vector

def step(net: Net, optimizer, weight_grad_matrices, biases_grad_matrices):
    learning_rate = 0.001
    coefficient = optimizer.get_next_coefficient(weight_grad_matrices, biases_grad_matrices)

    l = lambda x: x * learning_rate * coefficient
    f = np.vectorize(l)

    for i in range(net.layers_count - 1):
        idx = i + 1
        cur_layer = net.layers[idx]
        cur_layer.weights -= f(weight_grad_matrices[idx - 1])
        cur_layer.biases -= f(biases_grad_matrices[idx - 1])
    # return gradient norm
    return get_norm(weight_grad_matrices, biases_grad_matrices)

def training_iteration(training_data_item, target_vector, net: Net, optimizer):
    net.calc_output(training_data_item)
    act_der_array = calc_activations_derivatives(net, target_vector)

    weight_grad_matrices = []
    biases_grad_matrices = []

    for i in range(net.layers_count - 1):
        idx = i + 1
        weight_grad_matrices.append(get_weight_gradient_matrix(net, idx, act_der_array))
        biases_grad_matrices.append(get_bias_gradient_matrix(net, idx, act_der_array))
    # change net parameters and return gradient norm
    return step(net, optimizer, weight_grad_matrices, biases_grad_matrices)

def calc_metric(net: Net, val_input_vectors, val_target_vectors):
    predict_errors = []

    idx = 0
    for target_vector in val_target_vectors:
        input_vector = val_input_vectors[idx]
        output = net.calc_output(input_vector)
        predict_errors = np.append(predict_errors, predict_error(output, target_vector))
        idx += 1

    return np.mean(predict_errors)

# net training on all training data items (returns metric on validation)
def training(net: Net, optimizer, gradient_change_history: list, add_grad_norm: bool, training_data, target_vectors, val_input_vectors, val_target_vectors):
    for training_item, target_vector in zip(training_data, target_vectors):
        grad_norm = training_iteration(training_item, target_vector, net, optimizer)

        if add_grad_norm:
            gradient_change_history.append(grad_norm)
        add_grad_norm = False

    return calc_metric(net, val_input_vectors, val_target_vectors)

# init weights and biases with start values
def init_net_parameters(net: Net):
    for i in range(net.layers_count - 1):
        idx = i + 1
        w_shape = net.layers[idx].weights.shape
        b_shape = net.layers[idx].biases.shape

        interval = (-0.8, 0.8)
        delta = interval[1] - interval[0]

        net.layers[idx].weights = rand.rand(w_shape[0], w_shape[1]) * delta + interval[0]
        net.layers[idx].biases = rand.rand(b_shape[0], b_shape[1]) * delta + interval[0]

## generation of training data

In [56]:
periods_count = 4
epsilon = 0.00001
st_points_count = 200

dst_dir = "../data_sets/"

In [57]:
training_data_file = open(dst_dir + "cos_values_data", "w")

# 1 - write extremum points
training_data_file.write(str(0) + " " + str(1) + "\n")

for i in range(periods_count - 1):
    x = (np.pi / 2) * (i + 1)

    val1 = np.cos(x)
    val2 = np.cos(-x)
    training_data_file.write(str(x) + " " + str(val1) + "\n" + str(-x) + " " + str(val2) + "\n")

# write other points
st_points = np.linspace(epsilon, np.pi / 2 - epsilon, st_points_count)

for st_point in st_points:
    for i in range(periods_count):
        x = st_point + (np.pi / 2) * i
        val1 = np.cos(x)
        val2 = np.cos(-x)
        training_data_file.write(str(x) + " " + str(val1) + "\n" + str(-x) + " " + str(val2) + "\n")

training_data_file.close()

## Load training data

In [58]:
training_data_file = open(dst_dir + "cos_values_data", "r")
training_data = np.array([])
target_vectors = np.array([])
lines = []
items_count = 0

# read lines and shuffle it
for line in training_data_file:
    lines.append(line)

print("before sorting: " + str(lines[0:5]))
lines = sorted(lines, key=lambda l: float(l.split()[0]))
print("after sorting: " + str(lines[0:5]))
print("-------------------------")

# get training data items and target vectors:
for line in lines:
    arg, value = line.split()
    arg = float(arg)
    value = float(value)

    training_data = np.append(training_data, arg)
    target_vectors = np.append(target_vectors, value)
    items_count += 1

# reshape data to array of 1-d vectors
training_data = training_data.reshape((items_count, 1))
target_vectors = target_vectors.reshape((items_count, 1))

print("first 5 training items: \n" + str(training_data[0:5]))
print("first 5 training target vectors: \n" + str(target_vectors[0:5]))

before sorting: ['0 1\n', '1.5707963267948966 6.123233995736766e-17\n', '-1.5707963267948966 6.123233995736766e-17\n', '3.141592653589793 -1.0\n', '-3.141592653589793 -1.0\n']
after sorting: ['-6.283175307179587 0.99999999995\n', '-6.275281958803732 0.9999687687047919\n', '-6.267388610427878 0.9998752347803469\n', '-6.259495262052025 0.9997194040042612\n', '-6.251601913676171 0.9995012860855126\n']
-------------------------
first 5 training items: 
[[-6.28317531]
 [-6.27528196]
 [-6.26738861]
 [-6.25949526]
 [-6.25160191]]
first 5 training target vectors: 
[[1.        ]
 [0.99996877]
 [0.99987523]
 [0.9997194 ]
 [0.99950129]]


## Split training data on batches

In [59]:
batch_size = 100
data_size = len(training_data)
batch_count = data_size // batch_size
rem = data_size % batch_size
training_data_batches = []
target_vectors_batches = []

if data_size % batch_size != 0:
    batch_count += 1

for i in range(batch_count):
    pos = i * batch_size
    tr_data_batch = []
    t_vectors_batch = []

    if i == batch_count - 1:
        tr_data_batch = training_data[pos:]
        t_vectors_batch = target_vectors[pos:]
    else:
        tr_data_batch = training_data[pos:pos + batch_size]
        t_vectors_batch = target_vectors[pos:pos + batch_size]
    training_data_batches.append(tr_data_batch)
    target_vectors_batches.append(t_vectors_batch)

print("batches' count: " + str(batch_count))
print("---------")
idx = 1
print("training data batch " + str(idx) + ":\n" + str(training_data_batches[idx]))
print("size=" + str(len(training_data_batches[idx])))

print("---------")
print("target vectors batch " + str(idx) + ":\n" + str(target_vectors_batches[idx]))
print("size=" + str(len(target_vectors_batches[idx])))

batches' count: 17
---------
training data batch 1:
[[-5.49384047]
 [-5.48594712]
 [-5.47805377]
 [-5.47016042]
 [-5.46226708]
 [-5.45437373]
 [-5.44648038]
 [-5.43858703]
 [-5.43069368]
 [-5.42280033]
 [-5.41490699]
 [-5.40701364]
 [-5.39912029]
 [-5.39122694]
 [-5.38333359]
 [-5.37544024]
 [-5.3675469 ]
 [-5.35965355]
 [-5.3517602 ]
 [-5.34386685]
 [-5.3359735 ]
 [-5.32808015]
 [-5.32018681]
 [-5.31229346]
 [-5.30440011]
 [-5.29650676]
 [-5.28861341]
 [-5.28072006]
 [-5.27282672]
 [-5.26493337]
 [-5.25704002]
 [-5.24914667]
 [-5.24125332]
 [-5.23335997]
 [-5.22546662]
 [-5.21757328]
 [-5.20967993]
 [-5.20178658]
 [-5.19389323]
 [-5.18599988]
 [-5.17810653]
 [-5.17021319]
 [-5.16231984]
 [-5.15442649]
 [-5.14653314]
 [-5.13863979]
 [-5.13074644]
 [-5.1228531 ]
 [-5.11495975]
 [-5.1070664 ]
 [-5.09917305]
 [-5.0912797 ]
 [-5.08338635]
 [-5.07549301]
 [-5.06759966]
 [-5.05970631]
 [-5.05181296]
 [-5.04391961]
 [-5.03602626]
 [-5.02813292]
 [-5.02023957]
 [-5.01234622]
 [-5.00445287]
 [-

# Create net and train it

In [60]:
print_flag = False
# net parameters
net = Net()
hidden_layers_size = 10
hidden_layers_count = 1
activation = th

net.insert_layer(0, 1, ident)

for i in range(hidden_layers_count):
    net.insert_layer(i + 1, hidden_layers_size, activation)
net.insert_layer(hidden_layers_count + 1, 1, ident)
#print(net.layers_count)

for i in range(hidden_layers_count):
    idx = i + 1
    cur_layer = net.layers[idx]

    if i == 0:
        cur_layer.weights = np.zeros((1, hidden_layers_size))
    else:
        cur_layer.weights = np.zeros((hidden_layers_size, hidden_layers_size))
    cur_layer.biases = np.zeros((1, hidden_layers_size))

net.init_weights(net.layers_count - 1, np.zeros((hidden_layers_size, 1)))
net.init_biases(net.layers_count - 1, np.zeros((1, 1)))

# init all net parameters with values from standart normal destribution
init_net_parameters(net)

if print_flag:
    net.print_net_config()

In [61]:
min_cost = 0.05
saved_nets = []
optimizer = Adam()
grad_change_history = []
cost_change_history = []

epoch_count = 500            ####
actual_epoch_count = 0


cur_min_cost = 100
result_net_cost_tuple = None

for epoch in range(epoch_count):
    actual_epoch_count += 1
    cost = 0
    add_grad_norm = True

    for i in range(batch_count):
        training_batch = training_data_batches[i]
        target_vectors_batch = target_vectors_batches[i]

        cost = training(net, optimizer, grad_change_history, add_grad_norm, training_batch, target_vectors_batch, training_data, target_vectors)

        if add_grad_norm:
            add_grad_norm = False

        # save net config with minimum cost
        if cost < cur_min_cost:
            result_net_cost_tuple = (copy.deepcopy(net), cost)
            cur_min_cost = cost

    print("epoch " + str(epoch) + ": " + str(cost))
    cost_change_history.append(cost)

    if cost <= min_cost:
        break

assert len(cost_change_history) == actual_epoch_count == len(grad_change_history)

epoch 0: 0.7050722035936405
epoch 1: 0.689033283169461
epoch 2: 0.6877309236960619
epoch 3: 0.6890168670998067
epoch 4: 0.6902688779782031
epoch 5: 0.6904410242039936
epoch 6: 0.6890316191225588
epoch 7: 0.6858356876579738
epoch 8: 0.6808212171729896
epoch 9: 0.6740544893038599
epoch 10: 0.6656190320211579
epoch 11: 0.6555223470841173
epoch 12: 0.6436197438087883
epoch 13: 0.6299603598223362
epoch 14: 0.6152121177675505
epoch 15: 0.6002655626570923
epoch 16: 0.5857890505429626
epoch 17: 0.5723077573837724
epoch 18: 0.5600550703829965
epoch 19: 0.5498407800405489
epoch 20: 0.5420618359810492
epoch 21: 0.5352919632483286
epoch 22: 0.5290146791298126
epoch 23: 0.5230234111525544
epoch 24: 0.5173498539814457
epoch 25: 0.5119445606973168
epoch 26: 0.5067084381653518
epoch 27: 0.5015526282410971
epoch 28: 0.4964198827463667
epoch 29: 0.4912730765128429
epoch 30: 0.48609954825936896
epoch 31: 0.48089651186525095
epoch 32: 0.4756712738554402
epoch 33: 0.4704400941942713
epoch 34: 0.46521881773

## Plot gradient change history

In [62]:
x = np.arange(1, actual_epoch_count + 1)
y = np.array(grad_change_history)
print("x size - " + str(len(x)))
print("y size - " + str(len(y)))

assert len(x) == len(y)

fig = px.line(x=x, y=y, labels={'x':'epoch', 'y':'grad norm'})
fig.show()

x size - 332
y size - 332


## Plot learning rate

In [63]:
x = np.arange(1, actual_epoch_count + 1)
y = np.array(cost_change_history)

fig = px.line(x=x, y=y, labels={'x':'epoch', 'y':'cost'})
fig.show()

## Get best net config

In [64]:
# check net cost function
result_net = result_net_cost_tuple[0]

print("metric of the best net - " + str(calc_metric(result_net, training_data, target_vectors)))

if print_flag:
    result_net.print_net_config()

metric of the best net - 0.042936342251122944


## Plot actual cos values and predicted cos values

In [65]:
comparison_graphic = go.Figure()

x = training_data.reshape((1, data_size))[0]
actual_y = np.cos(x)
predicted_y = []

for item in training_data:
    predicted_y.append(result_net.calc_output([item])[0][0])
predicted_y = np.array(predicted_y)

comparison_graphic.add_trace(go.Scatter(x=x, y=actual_y,
                    mode='lines',
                    name='actual graphic',))

comparison_graphic.add_trace(go.Scatter(x=x, y=predicted_y,
                    mode='lines',
                    name='predicted',))
comparison_graphic.show()

## Calculate R^2

In [66]:
actual_disp = np.std(actual_y)
predicted_disp = np.sum((predicted_y - actual_y) ** 2)

print("R^2=" + str(1 - predicted_disp / actual_disp))


R^2=-6.952933647157602
