In [10]:
#!/opt/anaconda3/bin/python
# https://machinelearningmastery.com/implement-backpropagation-algorithm-scratch-python/

import numpy as np
import random

from csv import reader
from math import exp
from random import randrange, shuffle
from libitmal import utils_v2 as itmalutils

def load_csv_data(filename,shuffle=True,verbose=False):
    """Load a CSV classification data set and prepare it for the network.

    Every column except the last is converted to float and min-max scaled;
    the last column is treated as the class label and replaced by an integer
    id. The prepared rows are then passed through itmalutils.CheckFloat.

    Args:
        filename: path of the CSV file; empty rows are skipped.
        shuffle:  when true the rows are passed through shuffle_dataset().
                  Inside this function the name hides the `shuffle` imported
                  from `random` at the top of the cell.
        verbose:  when true, print the container type, the shape and three
                  sample rows.

    Returns:
        The data set as a list of rows (each row a list, label last).
    """

    # Load a CSV file, dropping empty rows
    def load_csv(filename):
        with open(filename, 'r') as file:
            return [row for row in reader(file) if row]

    # Convert string column to float
    def str_column_to_float(dataset, column):
        for row in dataset:
            row[column] = float(row[column].strip())
            itmalutils.CheckFloat(row[column])

    # Convert string column to integer
    def str_column_to_int(dataset, column):
        # Sort the distinct labels before numbering them. Enumerating the bare
        # set gave ids that depend on string hashing, which Python may
        # randomise per interpreter run, so the label -> id mapping could
        # differ from run to run even with a fixed random seed.
        labels = sorted({row[column] for row in dataset})
        lookup = {value: i for i, value in enumerate(labels)}
        for row in dataset:
            row[column] = lookup[row[column]]
        return lookup

    # Find the min and max values for each column
    def dataset_minmax(dataset):
        return [[min(column), max(column)] for column in zip(*dataset)]

    # Rescale dataset columns to the range 0-1 (the last column is left alone)
    def normalize_dataset(dataset, minmax):
        for row in dataset:
            for i in range(len(row)-1):
                low, high = minmax[i]
                span = high - low
                # A constant column has no spread; map it to 0.0 instead of
                # dividing by zero.
                row[i] = (row[i] - low) / span if span != 0 else 0.0

    # load and prepare data
    dataset = load_csv(filename)
    label_column = len(dataset[0])-1
    for i in range(label_column):
        str_column_to_float(dataset, i)

    # convert class column to integers
    str_column_to_int(dataset, label_column)
    # normalize input variables
    minmax = dataset_minmax(dataset)
    normalize_dataset(dataset, minmax)
    if shuffle:
        # NOTE(review): shuffle_dataset is not defined in this cell; it has to
        # be provided elsewhere in the session - confirm where it comes from.
        dataset = shuffle_dataset(dataset)

    # Check every stored value, label column included, against [0;1]
    for i in dataset:
        for j in i:
            itmalutils.CheckFloat(j,checkrange=True,xmin=0,xmax=1)

    if verbose:
        print("type(dataset)=",type(dataset),", dataset shape=",[len(dataset), len(dataset[0])])
        print("dataset[0]=", itmalutils.ListToVector(dataset[0]))
        print("dataset[1]=", itmalutils.ListToVector(dataset[1]))
        print("dataset[-1]=",itmalutils.ListToVector(dataset[-1]))

    return dataset

# Evaluate an algorithm using a cross validation split
def cross_validate_evaluate_algorithm(dataset, algorithm, n_folds, *args):
    """Run `algorithm` once per cross-validation fold and collect the scores.

    The folds come from cross_validation_split(dataset, n_folds). For each
    fold the remaining folds are concatenated into the training set, and the
    held-out rows are copied with their last element (the label) replaced by
    None before being handed to `algorithm(train_set, test_set, *args)`.
    The predictions are scored with accuracy_metric against the real labels.

    Returns:
        list with one accuracy value per fold, in fold order.
    """
    print("cross_validate_evaluate_algorithm()...")
    folds = cross_validation_split(dataset, n_folds)
    scores = []
    for k, fold in enumerate(folds, start=1):
        print(f"  k-fold={k}/{len(folds)}")

        # training rows: every fold except the one held out
        other_folds = list(folds)
        other_folds.remove(fold)
        train_set = [row for part in other_folds for row in part]

        # test rows: copies with the label blanked out
        test_set = []
        for row in fold:
            blinded = list(row)
            blinded[-1] = None
            test_set.append(blinded)

        predicted = algorithm(train_set, test_set, *args)
        actual = [row[-1] for row in fold]

        accuracy = accuracy_metric(actual, predicted)
        scores.append(accuracy)
        print("    accuracy=",accuracy)
    return scores

# Marker output: printed once the definitions above have been executed.
print("OK")

OK


In [13]:
# A Neural Network in Python, three layer MLP with activationfunction and BProp

# NOTE: transfer() and transfer_derivative() defined here, on outer level, to allow for later modification
# Transfer neuron activation, 
def transfer(z):
    """Sigmoid neuron activation a(z) = 1/(1+exp(-z)).

    Args:
        z: the neuron's weighted input sum.

    Returns:
        The activation as a float in [0;1].
    """
    try:
        return 1.0 / (1.0 + exp(-z))
    except OverflowError:
        # exp(-z) exceeds the float range for strongly negative z; the
        # sigmoid has saturated at 0 there.
        return 0.0

# Calculate the derivative of a neuron's output
def transfer_derivative(output):
    """Derivative of the sigmoid, expressed through the neuron's output.

    For a(z) = 1/(1+exp(-z)):
        a'(z) = exp(-z) / (1+exp(-z))^2 = a(z)*(1-a(z))
    so the already computed activation is enough and nothing has to be
    re-evaluated. [https://en.wikipedia.org/wiki/Backpropagation]

    Args:
        output: the neuron's activation a(z).

    Returns:
        a(z)*(1-a(z)).
    """
    complement = 1.0 - output
    return output * complement

# Forward propagate input to a network output
def forward_propagate(network, row):
    """Feed `row` through every layer of `network` and return the last layer's outputs.

    Each neuron is a dict; its 'weights' list holds one weight per input
    followed by the bias as the last element. The activation of every neuron
    is stored under its 'output' key as a side effect. Only the first
    len(weights)-1 entries of the input are read, so a trailing label in
    `row` is ignored.
    """

    # bias plus the weighted inputs, accumulated in input order
    def weighted_sum(weights, inputs):
        total = weights[-1]
        for idx, weight in enumerate(weights[:-1]):
            total += weight * inputs[idx]
        return total

    signal = row
    for layer in network:
        for neuron in layer:
            neuron['output'] = transfer(weighted_sum(neuron['weights'], signal))
        # the outputs of this layer are the inputs of the next one
        signal = [neuron['output'] for neuron in layer]
    return signal

# Train a network for a fixed number of epochs
def train_network(network, train, l_rate, n_epoch, n_outputs):
    """Train `network` in place, updating the weights after every row.

    Args:
        network:   list of layers, each a list of neuron dicts with 'weights'.
        train:     rows whose last element is the class index.
        l_rate:    learning rate, must be > 0.
        n_epoch:   number of passes over `train`, must be > 0.
        n_outputs: number of output neurons / classes, must be > 0.

    For every row the network is evaluated with forward_propagate(), the
    class index is expanded to a one-hot target, the error is propagated
    backwards into each neuron's 'delta', and the weights are adjusted.
    """

    # Backpropagate error and store it in the neurons as 'delta'
    def backward_propagate_error(network, expected):
        last = len(network) - 1
        for depth in range(last, -1, -1):
            layer = network[depth]
            if depth == last:
                # output layer: plain difference to the target
                errors = [expected[pos] - unit['output'] for pos, unit in enumerate(layer)]
            else:
                # hidden layer: deltas of the following layer, weighted by
                # the connection leaving this neuron
                errors = []
                for pos in range(len(layer)):
                    back_signal = 0.0
                    for downstream in network[depth + 1]:
                        back_signal += (downstream['weights'][pos] * downstream['delta'])
                    errors.append(back_signal)
            for unit, err in zip(layer, errors):
                unit['delta'] = err * transfer_derivative(unit['output'])

    # Update network weights with the stored deltas
    def update_weights(network, row, l_rate):
        assert(l_rate >= 0)
        for depth, layer in enumerate(network):
            if depth == 0:
                inputs = row[:-1]
            else:
                inputs = [unit['output'] for unit in network[depth - 1]]
            for unit in layer:
                step = l_rate * unit['delta']
                for pos in range(len(inputs)):
                    unit['weights'][pos] += step * inputs[pos]
                # the last weight is the bias; its input is implicitly 1
                unit['weights'][-1] += step

    assert l_rate>0
    assert n_epoch>0
    assert len(train)>0
    assert len(train[0])>0
    assert n_outputs>0

    for _ in range(n_epoch):
        for row in train:
            # called for its side effect: it fills neuron['output']
            forward_propagate(network, row)

            # one-hot target, in effect a to_categorical()
            expected = [0] * n_outputs
            expected[row[-1]] = 1

            backward_propagate_error(network, expected)
            update_weights(network, row, l_rate)

# Make a prediction with a network for a single row
def predict_row(network, row):
    """Return the index of the output neuron with the largest activation for `row`.

    On ties the lowest index wins.
    """
    outputs = forward_propagate(network, row)
    strongest = max(outputs)
    return outputs.index(strongest)

# Make a prediction with a network for a full dataset
def predict_network(network, test):
    """Return a list with the predict_row() result for every row of `test`, in order."""
    return [predict_row(network, row) for row in test]

# Initialize a network
def initialize_network(n_inputs, n_hidden, n_outputs):
    """Build a network with one hidden and one output layer.

    Every neuron is a dict with a 'weights' list of fan-in + 1 values drawn
    from random.random(); the extra, last value is the bias. The hidden layer
    is drawn before the output layer.

    Returns:
        [hidden_layer, output_layer]
    """
    def make_layer(n_neurons, fan_in):
        layer = []
        for _ in range(n_neurons):
            layer.append({'weights': [random.random() for _ in range(fan_in + 1)]})
        return layer

    hidden_layer = make_layer(n_hidden, n_inputs)
    output_layer = make_layer(n_outputs, n_hidden)
    return [hidden_layer, output_layer]

# Backpropagation Algorithm With Stochastic Gradient Descent
def train_predict_backprop(train, test, l_rate, n_epoch, n_hidden): 
    """Train a fresh network on `train` and return its predictions for `test`.

    The number of inputs is the row length minus the label; the number of
    outputs is the number of distinct labels found in `train`.
    """
    n_inputs = len(train[0]) - 1
    n_outputs = len({row[-1] for row in train})
    shape_train = [len(train),len(train[0])]
    shape_test = [len(test),len(test[0])]
    print("    back_propagation: n_inputs=",n_inputs,", n_hidden=",n_hidden," n_outputs=",n_outputs,", shapes(train,test)=",shape_train,shape_test)

    network = initialize_network(n_inputs, n_hidden, n_outputs)
    train_network(network, train, l_rate, n_epoch, n_outputs)
    return predict_network(network, test)

# Marker output: printed once the network functions above have been defined.
print("OK, NN ready for use...")

OK, NN ready for use...


In [14]:
# Reset the random generators before the data set is shuffled and the
# networks are initialised.
itmalutils.ResetRandom()

# Test Backprop on Seeds dataset
dataset = load_csv_data('Dat/seeds_dataset.csv',shuffle=True,verbose=False)

# evaluate algorithm
n_folds  = 5
l_rate   = 0.3
n_epoch  = 500
n_hidden = 5

scores = cross_validate_evaluate_algorithm(dataset, train_predict_backprop, n_folds, l_rate, n_epoch, n_hidden)
score_mean = sum(scores)/float(len(scores))

print("Scores: ",scores)
print(f"Mean Accuracy: {score_mean:1.3f}")

# The mean has been seen to land on either of two values, even with the seed
# reset above. The second value was mistyped (...8261537); it is the mean of
# the reference list expected_scores_x0 below, 0.95384615384615...
# NOTE(review): one possible source of the flip is the unordered set() used
# for the label encoding in load_csv_data - confirm.
expected_score_mean=[.9589743589743588,.9538461538461537]

e0=itmalutils.InRange(score_mean, expected_score_mean[0]) 
e1=itmalutils.InRange(score_mean, expected_score_mean[1])
print("  DEBUG: e0=",e0,", e1=",e1)
# exactly one of the two expected means must match
assert e0 or e1
assert (e0 and not e1) or (not e0 and e1)

# Per-fold reference values of earlier runs, kept for comparison:
#expected_scores_x0=[ .9743589743589743, .9230769230769231,  .9487179487179487, .9743589743589743, .9487179487179487]
#expected_scores_x1=[ .9743589743589743, .9230769230769231,  .9743589743589743, .9743589743589743, .9487179487179487]
#itmalutils.AssertInRange(sorted(scores), sorted(expected_scores_x))

#expected_scores_0=[.923076923076923,  .9743589743589743, 1.0,                .9487179487179486, .9487179487179486]
#expected_scores_1=[.9743589743589743, .923076923076923 ,  .9487179487179486, .9743589743589743, .9487179487179486]
#if e0:
#    itmalutils.AssertInRange(sorted(scores), sorted(expected_scores_0))
#else:
#    itmalutils.AssertInRange(sorted(scores), sorted(expected_scores_1))

print("OK")

cross_validate_evaluate_algorithm()...
  k-fold=1/5
    back_propagation: n_inputs= 7 , n_hidden= 5  n_outputs= 3 , shapes(train,test)= [156, 8] [39, 8]
    accuracy= 0.9743589743589743
  k-fold=2/5
    back_propagation: n_inputs= 7 , n_hidden= 5  n_outputs= 3 , shapes(train,test)= [156, 8] [39, 8]
    accuracy= 0.9230769230769231
  k-fold=3/5
    back_propagation: n_inputs= 7 , n_hidden= 5  n_outputs= 3 , shapes(train,test)= [156, 8] [39, 8]
    accuracy= 0.9743589743589743
  k-fold=4/5
    back_propagation: n_inputs= 7 , n_hidden= 5  n_outputs= 3 , shapes(train,test)= [156, 8] [39, 8]
    accuracy= 0.9743589743589743
  k-fold=5/5
    back_propagation: n_inputs= 7 , n_hidden= 5  n_outputs= 3 , shapes(train,test)= [156, 8] [39, 8]
    accuracy= 0.9487179487179487
Scores:  [0.9743589743589743, 0.9230769230769231, 0.9743589743589743, 0.9743589743589743, 0.9487179487179487]
Mean Accuracy: 0.959
  DEBUG: e0= True , e1= False
OK


In [3]:
import keras

def FromCategorical(predict_c, y):
    """Collapse a 2-D per-class score matrix to a vector of class indices.

    Args:
        predict_c: 2-D array with more than one column; one row per sample.
        y:         reference vector, used only to check the result's shape.

    Returns:
        Index of the largest entry of every row of `predict_c`.
    """
    assert predict_c.ndim==2 and predict_c.shape[1]>1
    predicted = predict_c.argmax(axis=1)
    assert y.shape==predicted.shape
    return predicted

# Split the row-wise data set into a feature matrix and a label vector.
X, y = itmalutils.DToXy(dataset)

num_classes = 3
n, d = X.shape[0], X.shape[1]

# One-hot encode the labels; taking the argmax must give the labels back.
y_c = keras.utils.to_categorical(y, num_classes)
assert (np.argmax(y_c, axis=1)==y).all()

print("X.shape=",X.shape,", y.shape=",y.shape,", d=",d,", num_classes=",num_classes)
print("OK")

X.shape= (199, 7) , y.shape= (199,) , d= 7 , num_classes= 3
OK


In [4]:
from sklearn.base import BaseEstimator

class myMLPClassifier(BaseEstimator):
    """Scikit-learn style wrapper around the list-based network functions.

    Args:
        inputs:  number of input features.
        hidden:  number of neurons in the hidden layer.
        outputs: number of output neurons / classes.
        l_rate:  learning rate passed to train_network().
        n_epoch: number of epochs passed to train_network().

    The network is created, with random weights, in the constructor; fit()
    continues training that network.
    """

    def __init__(self, inputs=7, hidden=5, outputs=3, l_rate=0.3, n_epoch=500):
        # Every constructor argument is kept under its own name:
        # BaseEstimator.get_params(), and with it sklearn's clone() and
        # cross_val_score(), read the parameters back via getattr(self, name).
        self.inputs   = inputs
        self.hidden   = hidden
        self.outputs  = outputs
        self.l_rate   = l_rate 
        self.n_epoch  = n_epoch

        # n_* names used by the rest of the class
        self.n_inputs = inputs
        self.n_hidden = hidden
        self.n_outputs= outputs
        self.fitted=False
        
        self.network=initialize_network(self.n_inputs, self.n_hidden, self.n_outputs)
    
    def fit(self, X, y):
        """Train the network on feature matrix X and label vector y; returns self."""
        assert X.shape[1]==self.n_inputs
        assert y.ndim==1, "cannot handle multiclass/categorical y"
        
        print("do train0")
        # NOTE: convert X,y matrix/vector to dataset
        train = itmalutils.XyToD(X, y)     
        print("do train1")
        train_network(self.network, train, self.l_rate, self.n_epoch, self.n_outputs)
        self.fitted=True 
        return self
        
    def predict(self, X):
        """Return the predicted class index for every row of X (requires fit())."""
        assert self.fitted
        assert X.shape[1]==self.n_inputs
        
        # NOTE: pass X as np.array instead of list...works!
        predictions = predict_network(self.network, X)
        return itmalutils.ListToVector(predictions)

    def score(self, X, y):
        """Return accuracy_metric(y, predict(X))."""
        assert y.ndim==1, "cannot handle multiclass/categorical y"

        y_pred=self.predict(X)
        assert y_pred.shape==y.shape
        return accuracy_metric(y, y_pred)

    def __str__ (self, verbose=False):
        """Describe the settings and layer sizes; with verbose=True also all weights."""
        s = "myMLPClassifier:"
        s += "\n  fitted  =" + str(self.fitted)
        s += "\n  l_rate  =" + str(self.l_rate)
        s += "\n  n_epoch =" + str(self.n_epoch)
        s += "\n  input   =" + str(self.n_inputs)
        for i, layer in enumerate(self.network):
            s += "\n  layer[" + str(i) + "]=" + str(len(layer))
            if verbose:
                s += "\n    w(" + str(len(layer)) + ")="
                for j, neuron in enumerate(layer):
                    s += "\n      w[" + str(j) + "]: "
                    for weight in neuron['weights']:
                        s += "%8.2f " % weight
        return s
    
# Reset the random generators first: the constructor below draws the initial
# weights, so the order of these statements determines the result.
itmalutils.ResetRandom()

m = myMLPClassifier()
# show the freshly initialised (untrained) weights
print(m.__str__(True))

# train and score on the same data
m.fit(X, y)
p=m.predict(X)
score=m.score(X, y)

print(f"Score(/accuracy): {score:1.3f}")

# NOTE(review): the output recorded with this cell shows a score of 0.975 and
# this range check failing - confirm which value is the intended reference.
expected_score=0.985
itmalutils.AssertInRange(score, expected_score, eps=1E-3)

print("OK")

myMLPClassifier:
  fitted  =False
  l_rate  =0.3
  n_epoch =500
  input   =7
  layer[0]=5
    w(5)=
      w[0]:     0.13     0.85     0.76     0.26     0.50     0.45     0.65     0.79 
      w[1]:     0.09     0.03     0.84     0.43     0.76     0.00     0.45     0.72 
      w[2]:     0.23     0.95     0.90     0.03     0.03     0.54     0.94     0.38 
      w[3]:     0.22     0.42     0.03     0.22     0.44     0.50     0.23     0.23 
      w[4]:     0.22     0.46     0.29     0.02     0.84     0.56     0.64     0.19 
  layer[1]=3
    w(3)=
      w[0]:     0.99     0.86     0.12     0.33     0.72     0.71 
      w[1]:     0.94     0.42     0.83     0.67     0.30     0.59 
      w[2]:     0.88     0.85     0.51     0.59     0.03     0.24 
do train0
do train1
Score(/accuracy): 0.975


AssertionError: x=0.9748743718592965 is not within the range [0.984;0.986] for eps=0.001, got eps=0.010125628140703502

In [None]:
# NOTE: CEF, unfinished from here!

# Dump the feature matrix ...
print(X)

# ... and the label vector for inspection.
print(y)
def myargmax(p):
    """Hand-written row-wise argmax.

    Args:
        p: 2-D array, one row per sample.

    Returns:
        Float vector with, per row, the column index of the largest value;
        on ties the lowest index wins.
    """
    assert p.ndim==2
    n_rows, n_cols = p.shape[0], p.shape[1]
    winners = np.empty([n_rows])

    for r in range(n_rows):
        # column 0 is the starting candidate, so the scan begins at column 1
        best_val, best_col = p[r,0], 0
        for c in range(1, n_cols):
            if p[r,c] > best_val:
                best_val, best_col = p[r,c], c
        winners[r] = best_col
    return winners
# Sample of a 2-D per-class score matrix:
#[[0.6713763  0.40817234 0.46002817]
# [0.68884546 0.4148609  0.4603176 ]
# [0.7057993  0.41240785 0.46892616]
# [0.6829971  0.40489745 0.4576759 ]
# [0.6907633  0.41742143 0.46148202] ...
# NOTE(review): myargmax asserts a 2-D input; if p is still the result of
# m.predict(X) from the cell above this will not pass - confirm what p holds.
print(myargmax(p))
print(p)

In [None]:
from libitmal import dataloaders_v3 as dataloaders

# The MNIST experiment uses its own names (X_mnist, y_mnist, m_mnist, ...).
# It used to overwrite the notebook-wide X, y and m, which the RunCV cells
# further down read, with default 7-input classifiers.
X_mnist, y_mnist = dataloaders.MNIST_GetDataSet(fetchmode=False)

X_mnist = X_mnist.reshape(70000, 784)
X_mnist = X_mnist/np.float32(255) # NOTE: remembered convert to float and scale 

#y_mnist = (y_mnist == 5)    

print(f"X.shape={X_mnist.shape}")
print(f"  type(X[0][0])={type(X_mnist[0][0])}")
print(f"  X.dtype={X_mnist.dtype}")
print(f"  np.max(X)={np.max(X_mnist)}")
print(f"  np.min(X)={np.min(X_mnist)}")
print(f"y.shape={y_mnist.shape}")
print(f"  np.max(y)={np.max(y_mnist)}")
print(f"  np.min(y)={np.min(y_mnist)}")

from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X_mnist, y_mnist, test_size=0.3, random_state=42)

# train_network() uses each label as an index into the one-hot target, so the
# output layer needs max(label)+1 neurons (2 for the binarised labels above).
# NOTE(review): assumes the labels are non-negative integers or booleans - confirm.
n_classes_mnist = int(np.max(y_mnist)) + 1
m_mnist = myMLPClassifier(inputs=28*28, hidden=12, outputs=n_classes_mnist)

# Fit on the training part and evaluate on the held-out part; the split was
# computed but not used before.
m_mnist.fit(X_train, y_train)
p_mnist=m_mnist.predict(X_test)
score_mnist=m_mnist.score(X_test, y_test)

In [None]:
from sklearn.model_selection import cross_val_score

def RunCV(cvmodel, expected_score_mean, verbose=False):
    """Cross-validate `cvmodel` and compare the mean score with an expected value.

    Reads the notebook-level X, y and n_folds. The random generators are
    reset first. A mean outside the expected range only prints a warning.

    Args:
        cvmodel:             estimator handed to cross_val_score().
        expected_score_mean: reference value for the mean score.
        verbose:             forwarded to cvmodel.__str__().
    """
    itmalutils.ResetRandom()

    print("CV: ",cvmodel.__str__(verbose))
    scores = cross_val_score(cvmodel, X, y, cv=n_folds)
    score_mean = scores.mean()

    print("  scores:",scores)
    print(f"  mean accuracy: {score_mean:1.3f}")

    if itmalutils.InRange(score_mean, expected_score_mean):
        print("CV: DONE")
        return
    print(f"WARNING: not in range, score_mean={score_mean} and expected_score_mean={expected_score_mean}")

# Cross-validate the classifier currently bound to m, printing its weights.
RunCV(m, 0.9648717948717949, True) # was: 0.9597435897435898
print("OK")

In [None]:
from math import tanh, cos, acos, sin
from enum import Enum

class ActivationType(Enum):
    """Activation functions selectable through the notebook-level g_actitype."""
    NONE = -1     # nothing selected; transfer() rejects this value
    SIGMOID = 0   # or LOGIT
    RELU = 1
    TANH = 2
    COS = 3
    
# Notebook-level switch read by transfer() and transfer_derivative(); it must
# be set to a real activation type before either of them is called.
g_actitype=ActivationType.NONE

def transfer(z):
    """Neuron activation for the type selected in the notebook-level g_actitype.

    Args:
        z: the neuron's weighted input sum.

    Returns:
        sigmoid(z), max(0,z), tanh(z) or cos(z), depending on g_actitype.

    Raises:
        AssertionError: g_actitype is not one of the four supported types.
    """
    if g_actitype==ActivationType.SIGMOID:
        try:
            return 1.0 / (1.0 + exp(-z))
        except OverflowError:
            # exp(-z) exceeds the float range for strongly negative z; the
            # sigmoid has saturated at 0 there.
            return 0.0
    elif g_actitype==ActivationType.RELU:
        return np.maximum(0,z)
    elif g_actitype==ActivationType.TANH:
        return tanh(z)
    elif g_actitype==ActivationType.COS:
        return cos(z)
    else:
        assert False, "wrong g_actitype value"

def transfer_derivative(output):
    """Derivative of the selected activation, computed from the neuron's output.

    Args:
        output: the value transfer() returned for the neuron.

    Returns:
        SIGMOID: output*(1-output)
        RELU:    1.0 for output > 0, otherwise 0.0
        TANH:    1 - output^2, since d(tanh(z))/dz = 1 - (tanh(z))^2
        COS:     -sin(acos(output))

    Raises:
        AssertionError: g_actitype is not one of the four supported types.
    """
    if g_actitype==ActivationType.SIGMOID:
        return output * (1.0 - output)
    if g_actitype==ActivationType.RELU:
        return 1.0 if output>0.0 else 0.0
    if g_actitype==ActivationType.TANH:
        return 1 - output*output
    if g_actitype==ActivationType.COS:
        # acos() returns an angle in [0;pi], so this value is never positive
        return -sin(acos(output))
    assert False, "wrong g_actitype value"

def PrintTransferAndDerivate(a=[-1, -0.1, 0, 1, 2, 10]):
    """Print transfer() and transfer_derivative() for each sample input in `a`.

    The derivative is evaluated on the output of transfer(), the same way the
    training code uses it. The current g_actitype is printed first.
    """
    print("g_actitype=",g_actitype) 
    for i in a:
        output = transfer(i)
        dev = transfer_derivative(output)
        report = [
            f"  transfer           (z={i:6.3f}) = {output:6.3f}",
            f"  transfer_derivative(z={i:6.3f}) = {dev:6.3f}",
        ]
        for line in report:
            print(line)
    print("")
    
# Marker output: from here on transfer()/transfer_derivative() are the
# g_actitype-driven versions defined in this cell.
print("OK, new transfer funs ready for use..")

In [None]:
g_actitype = ActivationType.RELU 

print("Parameter search: l_rate=.. , g_actitype=",g_actitype)
PrintTransferAndDerivate()

# (learning rate, expected mean CV accuracy), tried in this order
rate_trials = [
    (0.2,   0.3366666666666666),
    (0.1,   0.3266666666666666),
    (0.05,  0.31166666666666665),
    (0.01,  0.9650000000000001),
    (0.005, 0.9698717948717949),
    (0.001, 0.9346153846153845),
]
for rate_candidate, expected_mean in rate_trials:
    m = myMLPClassifier(l_rate=rate_candidate, n_epoch=500)
    RunCV(m, expected_mean)

print("OK")

In [None]:
l_rate_best=0.005

# The banner reports the learning rate actually used in this search; it used
# to print the unrelated notebook-level l_rate.
print("Parameter search: l_rate=",l_rate_best,", n_epoch=.. , g_actitype=",g_actitype)

# (number of epochs, expected mean CV accuracy), tried in this order
epoch_trials = [
    (1,    0.4625641025641025),
    (2,    0.5975641025641025),
    (5,    0.7584615384615384),
    (10,   0.8742307692307693),
    (50,   0.9194871794871796),
    (100,  0.9346153846153846),
    (200,  0.9697435897435897),
    (500,  0.9698717948717949),
    (1000, 0.9648717948717949),
    # NOTE: takes too long
    #(10000, 0.8894871794871795),
]
for epochs_candidate, expected_mean in epoch_trials:
    m = myMLPClassifier(l_rate=l_rate_best, n_epoch=epochs_candidate)
    RunCV(m, expected_mean)

print("OK")

In [None]:
# Expected mean CV accuracy per activation type, in the order tried below.
expected_scores=[0.7735897435897436, 0.9698717948717949, 0.9650000000000001, 0.5971794871794872]
activation_types=[ActivationType.SIGMOID, ActivationType.RELU, ActivationType.TANH, ActivationType.COS]

n_epoch_best=500
l_rate_best=0.005

for activation, expected_mean in zip(activation_types, expected_scores):
    g_actitype = activation
    PrintTransferAndDerivate()

    # Keyword arguments are required here: passed positionally the two values
    # end up in the constructor's first two parameters, inputs and hidden.
    m = myMLPClassifier(l_rate=l_rate_best, n_epoch=n_epoch_best)
    RunCV(m, expected_mean)
    print("")

print("OK")