In [1]:
import numpy as np
from .RELU_Layer import *
from .Tanh_Layer import *
from .pRELU_Layer import *
from .Output_Layer import *

actFuncs = {}
actFuncs['tanh'] = tanh
actFuncs['relu'] = RELU
actFuncs['prelu'] = pRELU
actFuncs['outlayer'] = outlayer

def softmax(h):
    phat = np.exp(h)/((np.sum(np.exp(h),axis = 1)).reshape(len(h),1))
    return phat

def sigmoid(h):
    return (1/(1 + np.exp(-h)))

def crossEntropy(p,y):
    if y.shape[1] <= 1:
        return -np.sum(np.multiply(y,np.log(p))+np.multiply((1-y),np.log(1-p)))/len(p)
    else:
        return -np.sum(np.multiply(y,np.log(p)))/len(p)

def sumOfSquares(yhat, y):
    if y.shape[1] <= 1:
        return np.sum((y-yhat).T@(y-yhat))/len(yhat)
    else:
        return np.sum(np.trace((y-yhat).T.dot(y-yhat)))/len(yhat)

ModuleNotFoundError: No module named '__main__.RELU_Layer'; '__main__' is not a package

In [None]:
class NeuralNetwork:
    
    def __init__(self, indims, outdims, layers=['RELU','RELU','RELU'], nodes=[6,6,6], method='vanilla', 
                 task='classification', cost=None):        
        self.indims=indims
        self.outdims=outdims
        self.method = method
        if task.lower() == 'classification':
            self.cost = crossEntropy
        elif task.lower() == 'regression':
            self.cost = sumOfSquares
        elif task.lower == 'custom':
            self.cost = cost
        else:
            print('Invalid cost function')
            return None
        self.layers = layers
        self.layers.append('outlayer')
        self.lcount = len(layers)
        self.nodes = [indims]+nodes
        self.to_run = []
        for i in range(lcount-1):
            self.to_run.append(actFuncs[layers[i].lower()](indims=nodes[i], nodes=nodes[i+1], 
                                                           method=self.method))
        self.to_run.append(actFuncs[layers[lcount-1]](indims=nodes[lcount-1], outdims=self.outdims,
                                                     method=self.method, task=self.task))
                                                          
        
        
        
    def split(self, X,Y, split=.8):
        data_index = np.array(range(0,len(X)))
        np.random.shuffle(data_index)
        sp_i = int(np.floor(len(X)*split))
        trn_i = data_index[0:sp_i]
        val_i = data_index[sp_i:]
        self.trn = np.take(X, trn_i, axis=0)
        self.ytrn = np.take(Y, trn_i, axis=0)
        self.val = np.take(X, val_i, axis=0)
        self.yval = np.take(Y, val_i, axis=0)        
    
    def everyday(self, X, Y):
        rng_state = numpy.random.get_state()
        numpy.random.shuffle(X)
        numpy.random.set_state(rng_state)
        numpy.random.shuffle(Y)
        
    def forwardprop(self, X):
        A = X
        for i in range(self.lcount):
            A = self.to_run[i].forward(A)
        self.yhat = A
        
    def backprop(self, LR, y):
        D=y
        if self.method=='vanilla':
            for i in range(self.lcount-1,-1,-1):
                D=self.to_run[i].backward(D, LR)
        
    def train(self, X, Y, LR=1e-5, optimizer=None, epochs=1000, vocal=False, split=.8, mu=.9, gamma=.9):
        self.X = X
        self.Y = Y
        self.mu = mu
        self.gamma = gamma
        self.LR = LR
        self.split(self.X, self.Y, split=split)
        self.Errors = []
        err_best = np.inf
        A = self.X
        for ep in range(epochs):
            self.forwardprop(self.trn)
            self.backprop(LR, self.ytrn)
            err = self.costFunc(self.yhat,self.yval)
            self.Errors.append(err)
            if vocal == True:
                print('Epoch: {} / Error: {}'.format(ep, err))
            if np.isnan(err) | np.isinf(err):
                break
            if err < err_best:
                self.Wbest = self.W
                self.Bbest = self.B
                err_best = err
    
    
    def batchTrain(self, LR=1e-6, epochs=1000, vocal=False, reinit=True, split=.8, scaleweights=True, 
                   ):
        self.LR = LR
        self.everyday(self.X, self.Y)
        self.split(self.X, self.Y, split=split)
        self.Errors = []
        if reinit == True:
            self.initializeWeightsScaled()
        for ep in range(epochs):
            for a in range(0,X.shape[0],steps):
                self.validate(X[a:steps+a,:])
                self.setDerivs(self.yhat, Y[a:steps+a,:])
                self.weightUpdate(X[a:steps+a,:], cof=momentum)
            self.validate(X)
            err = self.costFunc(self.yhat,Y)
            self.Errors.append(err)
            if vocal == True:
                print('Epoch: {} / Error: {}'.format(ep, err))
                                     
        
    def predict(self, test):
        self.Z = {}
        self.Z['0'] = self.activate(test@self.Wbest['0']+self.Bbest['0'])
        for i in range(1,self.layers-1):
            self.Z[str(i)] = self.activate(self.Z[str(i-1)]@self.Wbest[str(i)]+self.Bbest[str(i)])
        self.predictions=self.outputFunc(self.Z[str(self.layers-2)]@self.Wbest[str(self.layers-1)]+self.Bbest[str(self.layers-1)])


In [None]:
import matplotlib.pyplot as plt
#Min-max scaling
def normalizeMM(x,xtrn):
    newx = ((x - xtrn.min(0))/(xtrn.max(0)-xtrn.min(0)))
    return newx
    
def radius(x,r1,r2):
    delete = []
    for i in range(np.size(x,0)):
        if (np.sqrt(x[i,0]**2 + x[i,1]**2) < r1) | (r2 < np.sqrt(x[i,0]**2 + x[i,1]**2)):
            delete.append(i)
    xnew = np.delete(x, delete, 0)
    return xnew

def oneHotEncode(x):
    x = np.array(x)
    unique = np.unique(x)
    newx = np.zeros((x.shape[0], len(unique)))
    for i in range(x.shape[0]):
        for j in range(len(unique)):
            if x[i] == unique[j]:
                newx[i,j] = 1
    return newx

In [None]:
N = 300
x1 = np.random.randn(N,2)
x2 = np.random.randn(N*10,2)*4
x3 = np.random.randn(N*100,2)*8
x2 = radius(x2,4,8)
x3 = radius(x3,10,16)
np.random.shuffle(x2)
x2 = x2[0:300,]
np.random.shuffle(x3)
x3 = x3[0:300,]
labels1 = np.ones((N,1))
labels2 = np.zeros((N,1))
labels3 = np.repeat(2,N).reshape(N,1)
x1 = np.hstack((x1,labels1))
x2 = np.hstack((x2,labels2))
x3 = np.hstack((x3,labels3))
x_all = np.vstack((x1,x2,x3))
X = x_all[:,:-1]
Y = x_all[:,-1].reshape(len(x_all),1)
X = normalizeMM(X,X)
Y = oneHotEncode(Y)
plt.scatter(x1[:,0], x1[:,1], color="green", s=3)
plt.scatter(x2[:,0], x2[:,1], color="red", s=3)
plt.scatter(x3[:,0], x3[:,1], color="blue", s=3)
plt.show()

In [None]:
N = 1000
x0 = np.random.randn(N,2)+4
lab0 = np.zeros((N,1))
x1 = np.random.randn(N,2)
lab1 = np.ones((N,1))
X = np.vstack((x0,x1))
Y = np.vstack((lab0,lab1))
X = normalizeMM(X,X)
plt.scatter(x0[:,0],x0[:,1], s=2, alpha=.5, c="teal")
plt.scatter(x1[:,0],x1[:,1], s=2, alpha=.5, c="brown")
plt.show()

In [None]:
import time
def accuracy(p,y):
    if p.shape[1] == 1:
        yhat = np.rint(p)
    elif p.shape[1] > 1:
        yhat = (p == p.max(axis=1)[:,None]).astype(int)
    return np.mean(yhat == y)

In [None]:
NNr = NeuralNetwork(X,Y,nodes=[10,10], activation='RELU', task='custom', customtask=sigmoid, customcost=accuracy)

In [None]:
NNr.train(LRupdate=True, vocal=True, epochs=10000)

In [None]:
thingy = None

In [None]:
if thingy:
    print('hey!')