In [None]:
import numpy as np
%matplotlib inline
import matplotlib.pylab as plt
print("Numpy version: "+np.__version__)

## Generación de conjunto XOR

In [None]:
s = 0.5 # ruido
N = 500

X = [1.0, -1.0] + s*np.random.randn(N, 2)
tmp = [-1.0, 1.0] + s*np.random.randn(N, 2)
X = np.concatenate((X, tmp))
tmp = [1.0, 1.0] + s*np.random.randn(N, 2)
X = np.concatenate((X, tmp))
tmp = [-1.0, -1.0] + s*np.random.randn(N, 2)
X = np.concatenate((X, tmp))
X = (X - np.mean(X, axis=0))/np.std(X, axis=0)

labels = np.zeros(shape=(4*N,), dtype=int)
labels[2*N:] = 1.0

# Conjuntos de entrenamiento y prueba
train_p = 0.25
P = np.random.permutation(2*N)
index_train = np.concatenate((np.arange(0, 4*N)[labels==0][P[:int(train_p*2*N)]], 
                              np.arange(0, 4*N)[labels==1][P[:int(train_p*2*N)]]))
index_test = np.concatenate((np.arange(0, 4*N)[labels==0][P[int(train_p*2*N):]], 
                             np.arange(0, 4*N)[labels==1][P[int(train_p*2*N):]]))

print("%d samples for train and %d for testing" %(len(index_train), len(index_test)))
fig = plt.figure(figsize=(10, 6))
ax = fig.add_subplot(1, 1, 1)
ax.scatter(X[labels==0, 0], X[labels==0, 1], color='b', marker='o', label="Class 1", alpha=0.5)
ax.scatter(X[labels==1, 0], X[labels==1, 1], color='r', marker='o', label="Class 2", alpha=0.5)
ax.grid()
ax.set_xlabel('X1')
ax.set_ylabel('X2')
_ = plt.legend()

## Implementación MLP con numpy

In [None]:
def sigmoid(x):
    return 1.0/(1.0 + np.exp(-x))

class Layer:
    def __init__(self, N_input, N_neurons, activation="tanh"):        
        self.N_input = N_input
        self.N_neurons = N_neurons
        self.activation = activation
        self.W = np.zeros(shape=(N_input, N_neurons))
        self.b = np.zeros(shape=(N_neurons, ))
        self.initialize_parameters()  
        self.z = np.zeros(shape=(N_neurons))
        self.y = np.zeros(shape=(N_neurons))
        
    def initialize_parameters(self):
        if self.activation == "tanh":
            self.W = np.array(np.random.uniform(low=-np.sqrt(6.0/(self.N_input + self.N_neurons)),
                                                high=np.sqrt(6.0/(self.N_input + self.N_neurons)),
                                                size=(self.N_input, self.N_neurons)))
        else: 
            self.W = np.array(np.zeros(shape=(self.N_input, self.N_neurons)))
        self.b = np.zeros(shape=(self.N_neurons, ))
        
    def output(self, x):  #  x is an array N_input x 1
        self.z = np.dot(x, self.W) + self.b
        if self.activation == "tanh":
            self.y = np.tanh(self.z)
        elif self.activation == "sigmoid":
            self.y = sigmoid(self.z)
        return self.y
    
class MLP:
    def __init__(self, N_input, N_hidden, N_output):
        self.layers = []
        self.N_output = N_output
        self.output_activation = "sigmoid"
        if N_hidden > 0:
            self.layers.append(Layer(N_input, N_hidden))
            self.layers.append(Layer(N_hidden, N_output, activation="sigmoid"))
        else:
            self.layers.append(Layer(N_input, N_output, activation="sigmoid"))
            
    def predict_proba(self, x):
        for layer in self.layers:
            x = layer.output(x)
        return x
        
    def predict(self, x):
        x = self.predict_proba(x)
        if self.N_output > 1:
            return np.argmax(x, axis=1)
        else:
            if self.output_activation is "sigmoid":
                return x > 0.5
            elif self.output_activation is "tanh":
                return x > 0
        
    def reset(self):
        for layer in self.layers:
            layer.initialize_parameters()
    
    def mse(self, x, y):
        e = self.predict_proba(x)[:,0] - y
        return np.mean(np.power(e, 2.0))

In [None]:
class Trainer:
    def __init__(self, classifier, learning_rate=1.0, max_epoch=1000):
        self.classifier = classifier
        self.lr = learning_rate
        self.train_cost = np.zeros(shape=(max_epoch,))
        self.test_cost = np.zeros(shape=(max_epoch, ))   
        self.max_epoch = max_epoch
        self.epoch = 0

    def update(self, data_train, label_train, data_test, label_test, batch_size=32):
        if self.epoch == 0:  
            self.train_cost[0] =  self.classifier.mse(data_train, label_train) 
            self.test_cost[0] =  self.classifier.mse(data_test, label_test) 
        # Present mini-batches in different order
        else:        
            rand_perm = np.random.permutation(len(label_train))
            data_train = data_train[rand_perm, :].copy()
            label_train = label_train[rand_perm].copy()
            self.minibach_eval(data_train, label_train, batch_size)
            self.train_cost[self.epoch] = self.classifier.mse(data_train, label_train) 
            self.test_cost[self.epoch] =  self.classifier.mse(data_test, label_test)               
        self.epoch += 1
    
    def reset(self):
        self.classifier.reset()        

    def minibach_eval(self, data, labels, batch_size=32):
        averaged_cost = 0.0
        N = len(labels)
        for Nbatches, (start, end) in enumerate(zip(range(0, N, batch_size), range(batch_size, N+1, batch_size))):
            self.train(data[start:end], labels[start:end])
        #return averaged_cost/Nbatches
    
    def train(self, data, labels):
        # do training
        N = len(labels)
        error = (self.classifier.predict_proba(data)[:,0] - labels)[:, np.newaxis]
        Nlayer = len(self.classifier.layers)
        # outer layer with one neuron
        layer_idx = Nlayer-1 
        out_layer = self.classifier.layers[layer_idx]
        tmp = np.multiply(error, sigmoid(out_layer.z)*(1.0-sigmoid(out_layer.z)))
        grad_b = np.mean(tmp, axis=0)
        if Nlayer == 1:
            grad_w = np.mean(np.multiply(np.tile(tmp, (1, out_layer.N_input)), data), axis=0)
        else:
            hidden_layer = self.classifier.layers[layer_idx-1]
            #print(hidden_layer.y.shape)
            #print(tmp.shape)
            grad_w = np.mean(np.multiply(np.tile(tmp, (1, out_layer.N_input)), hidden_layer.y), axis=0) 
            #print(grad_w.shape)
        self.classifier.layers[layer_idx].W -= self.lr*grad_w[:, np.newaxis]
        self.classifier.layers[layer_idx].b -= self.lr*grad_b
         # hidden layer
        if Nlayer > 1:
            Nh = out_layer.N_input
            E = np.tile(error*sigmoid(out_layer.z)*(1.0-sigmoid(out_layer.z)), (1, Nh))
            tmp = np.multiply(np.multiply(E, 1.0-np.tanh(hidden_layer.z)**2), np.tile(out_layer.W.T, (N, 1)))
            grad_b = np.mean(tmp, axis=0)
            grad_w = np.dot(tmp.T, data).T
            #print(grad_w)
            self.classifier.layers[layer_idx-1].W -= self.lr*grad_w
            self.classifier.layers[layer_idx-1].b -= self.lr*grad_b

In [None]:
classifier = MLP(N_input=X.shape[1], N_hidden=5, N_output=1)
trainer = Trainer(classifier, learning_rate=0.5, max_epoch=1000)

Entrenamiento de la red en vivo 

Ejecuta el siguiente bloque varias veces para agregar épocas (CTRL+ENTER)

In [None]:
# Corre CTRL+ENTER
trainer.update(X[index_train, :], labels[index_train], 
              X[index_test, :], labels[index_test], batch_size=10)

pred_labels = classifier.predict(X)
fig = plt.figure(figsize=(14, 5))
ax = fig.add_subplot(1, 2, 1)
x_min, x_max = X[:, 0].min() - 0.5, X[:, 0].max() + 0.5
y_min, y_max = X[:, 1].min() - 0.5, X[:, 1].max() + 0.5
xx, yy = np.meshgrid(np.arange(x_min, x_max, 0.05), np.arange(y_min, y_max, 0.05))
Z = classifier.predict(np.c_[xx.ravel(), yy.ravel()])[:,0]
Z = Z.reshape(xx.shape)
ax.contourf(xx, yy, Z, cmap=plt.cm.jet, alpha=0.5)
ax.scatter(X[labels==0, 0], X[labels==0, 1], color='k', marker='o', alpha=0.5)
ax.scatter(X[labels==1, 0], X[labels==1, 1], color='k', marker='x', alpha=0.5)
ax = fig.add_subplot(1, 2, 2)
ax.plot(np.arange(0, trainer.epoch, step=1), trainer.train_cost[:trainer.epoch], 'b-o', label="train MSE")
ax.plot(np.arange(0, trainer.epoch, step=1), trainer.test_cost[:trainer.epoch], 'g-o', label="test MSE")
plt.legend()
plt.grid()
plt.tight_layout()

## Implementación MLP con tensorflow

In [None]:
import tensorflow as tf
print("Tensorflow version: "+tf.__version__)

Nh = 10 # Neuronas en la capa oculta
X_holder = tf.placeholder(tf.float32, [None, 2])
T_holder = tf.placeholder(tf.float32, [None, 1])
# Armando la red
b_z = tf.Variable(tf.zeros([Nh]), name="z_b", dtype=tf.float32)
W_z = tf.Variable(tf.random_uniform([2, Nh], -1.0, 1.0), name="z_w", dtype=tf.float32)
z = tf.tanh(tf.matmul(X_holder, W_z) + b_z)
b_y = tf.Variable(tf.zeros([1]), name="y_b", dtype=tf.float32)
W_y = tf.Variable(tf.random_uniform([Nh, 1], -1.0, 1.0), name="y_w", dtype=tf.float32)
y = tf.add(tf.matmul(z, W_y), b_y)
# Función de costo y optimizador
cross_entropy = tf.nn.sigmoid_cross_entropy_with_logits(labels=T_holder, logits=y)
loss = tf.reduce_mean(cross_entropy)  
optimizer = tf.train.AdamOptimizer(1e-2)
train = optimizer.minimize(loss) 
# Inicializando las variables y creando una sesión te tensorflow
init = tf.global_variables_initializer()
sess = tf.InteractiveSession()
sess.run(init)  

nepochs = 500
train_loss = np.zeros(shape=(nepochs))
test_loss = np.zeros(shape=(nepochs))
for i, epoch in enumerate(range(0, nepochs)):
    # Entrenamos una epoca
    _, train_loss[i] = sess.run([train, loss], feed_dict={X_holder: X[index_train, :], 
                                                     T_holder: np.reshape(labels[index_train], [-1, 1])})
    # Evaluamos en el conjunto de test
    pred_test, test_loss[i] = sess.run([y, loss], feed_dict={X_holder: X[index_test, :], 
                                                             T_holder: np.reshape(labels[index_test], [-1, 1])})
sess.close()

In [None]:
plt.plot(train_loss, label='Train')
plt.plot(test_loss, label='Test')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend()

fig = plt.figure(figsize=(7, 5))
ax = fig.add_subplot(1, 1, 1)

ax.scatter(X[index_test, :][labels[index_test]==0, 0], 
           X[index_test, :][labels[index_test]==0, 1], 
           c=pred_test[labels[index_test]==0], marker='o', 
           alpha=0.5, vmin=0.0, vmax=1.0, cmap=plt.cm.RdBu_r)
sc = ax.scatter(X[index_test, :][labels[index_test]==1, 0], 
                X[index_test, :][labels[index_test]==1, 1], 
                c=pred_test[labels[index_test]==1], marker='x', 
                alpha=0.5, vmin=0.0, vmax=1.0, cmap=plt.cm.RdBu_r)
plt.colorbar(sc)
plt.tight_layout()

## Implementación MLP Bayesiana con pymc y theano


In [None]:
#http://docs.pymc.io/notebooks/bayesian_neural_network_advi.html

import pymc3 as pm
import theano
floatX = theano.config.floatX
import theano.tensor as tt
print("Theano version: "+theano.__version__)
print("Pymc3 version: "+pm.__version__)

ann_input = theano.shared(X[index_train, :])
ann_output = theano.shared(labels[index_train])

with pm.Model() as neural_network:
    # Create network (priors + posteriors)
    W_z = pm.Normal('wz', 0, sd=1, shape=(X.shape[1], Nh),
                    testval=np.random.randn(X.shape[1], Nh).astype(floatX))
    b_z = pm.Normal('bz', 0, sd=1, shape=(Nh, ), testval=np.random.randn(Nh).astype(floatX))
    W_y = pm.Normal('wy', 0, sd=1, shape=(Nh,), 
                    testval=np.random.randn(Nh).astype(floatX))
    b_y = pm.Normal('by', 0, sd=1, shape=(1,), testval=np.random.randn(1).astype(floatX))

    z = pm.math.tanh(pm.math.dot(ann_input, W_z) + b_z)
    y = pm.math.sigmoid(pm.math.dot(z, W_y) + b_y)
    y_rv = pm.Bernoulli('y', y, observed=ann_output)

In [None]:
with neural_network:
    # Run ADVI to train
    inference = pm.ADVI()
    approx = pm.fit(n=30000, method=inference)
    # Create a theano function to evaluate new samples
    x = tt.matrix('X')
    n = tt.iscalar('n')
    x.tag.test_value = np.empty_like(X[index_test, :])
    n.tag.test_value = 100
    _sample_proba = approx.sample_node(y_rv.distribution.p, size=n,
                                       more_replacements={ann_input: x})

    sample_proba = theano.function([x, n], _sample_proba)
    
# Predicción en el espacio de entrada
pred = sample_proba(X[index_test, :], 500).mean(0) > 0.5
grid = pm.floatX(np.mgrid[-3:3:100j,-3:3:100j])
grid_2d = grid.reshape(2, -1).T
dummy_out = np.ones(grid.shape[1], dtype=np.int8)
ppc = sample_proba(grid_2d, 500)
# Evolución del ELBO
plt.plot(inference.hist, alpha=.5)
plt.ylabel('ELBO')
plt.xlabel('iteration')

In [None]:
# Predicciones (media)
fig, ax = plt.subplots(figsize=(10, 6))
contour = ax.contourf(grid[0], grid[1], ppc.mean(axis=0).reshape(100, 100), 
                      vmin=0.0, vmax=1.0, cmap=plt.cm.RdBu_r, alpha=0.5)
ax.scatter(X[index_test, :][pred==0, 0], X[index_test, :][pred==0, 1], marker='o', color='k')
ax.scatter(X[index_test, :][pred==1, 0], X[index_test, :][pred==1, 1], marker='x', color='k')
cbar = plt.colorbar(contour, ax=ax)

In [None]:
# Predicciones (desviación estándar)
fig, ax = plt.subplots(figsize=(10, 6))
contour = ax.contourf(grid[0], grid[1], ppc.std(axis=0).reshape(100, 100), 
                      cmap=plt.cm.Purples, alpha=0.5)
ax.scatter(X[index_test, :][pred==0, 0], X[index_test, :][pred==0, 1], marker='o', color='k')
ax.scatter(X[index_test, :][pred==1, 0], X[index_test, :][pred==1, 1], marker='x', color='k')
cbar = plt.colorbar(contour, ax=ax)