In [None]:
import tensorflow as tf
import numpy as np
import sklearn.datasets
import matplotlib.pyplot as plt
from sklearn.preprocessing import OneHotEncoder
from sklearn.metrics import accuracy_score

In [None]:
def get_nonlinear_data(size=3000, kind='circles', faces=4, factor=0.3, noise=0.2):
    """Generate a 2-D, two-class toy dataset that is not linearly separable.

    Parameters
    ----------
    size : int
        Number of samples for 'circles', 'moons' and 'star'.  For 'swirly'
        this is the number of points *per class*, so ``2 * size`` rows are
        returned.
    kind : {'circles', 'moons', 'star', 'swirly'}
        Which generator to use.
    faces : int
        Only used by 'star': angular frequency of the rose curve and number
        of angular sectors the labels alternate over.
    factor : float
        Only used by 'circles'; forwarded to ``make_circles``.
    noise : float
        Forwarded to ``make_circles`` / ``make_moons``; for 'swirly' it is
        the standard deviation of the Gaussian jitter added to the angle.
        Not used by 'star', which is fully deterministic.

    Returns
    -------
    (X, Y)
        For 'star' and 'swirly', ``X`` is a float array with two columns and
        ``Y`` holds the class labels 0/1 (float for 'star', uint8 for
        'swirly').  For 'circles' and 'moons' the return value of the
        scikit-learn generator is passed through unchanged.

    Raises
    ------
    ValueError
        If ``kind`` is not one of the supported names.
    """
    if kind == 'circles':
        return sklearn.datasets.make_circles(n_samples=size, factor=factor,
                                             noise=noise)

    if kind == 'moons':
        return sklearn.datasets.make_moons(n_samples=size, noise=noise)

    if kind == 'star':
        # Rose curve r = sin(faces * theta) sampled uniformly in angle.
        theta = np.linspace(0, 2*np.pi, size)
        r = np.sin(theta*faces)
        X = np.column_stack((r*np.cos(theta), r*np.sin(theta)))
        # Split the full turn into `faces` equal sectors and alternate 0/1.
        Y = np.mod(np.floor(theta/(2*np.pi)*faces), 2)
        return X, Y

    if kind == 'swirly':
        # From http://cs231n.github.io/neural-networks-case-study/
        n_classes = 2
        X = np.zeros((size*n_classes, 2))            # each row = single example
        Y = np.zeros(size*n_classes, dtype='uint8')  # class labels
        radius = np.linspace(0.0, 1, size)           # same radii for every arm
        for j in range(n_classes):
            rows = slice(size*j, size*(j+1))
            # Each class sweeps its own 4-radian range of angles; the jitter
            # honours `noise` (previously hard-coded to 0.2, the default).
            angle = (np.linspace(j*4, (j+1)*4, size)
                     + np.random.randn(size)*noise)
            X[rows] = np.c_[radius*np.sin(angle), radius*np.cos(angle)]
            Y[rows] = j
        return X, Y

    raise ValueError(f"unknown kind {kind}")


In [None]:
# Seed NumPy's global RNG so the randomly jittered data (and any later step
# that draws from np.random) is reproducible on Restart & Run All.
np.random.seed(0)

# kind='swirly' draws `size` points per class, so this yields 2000 samples.
X, labels = get_nonlinear_data(size=1000, kind='swirly', noise=0.2)
plt.scatter(X[:, 0], X[:, 1], c=labels, alpha=0.3)

In [None]:
def get_model(activation='sigmoid', hidden_units=[20], 
              input_shape=(None, 2), output_shape=(None, 2)):
    """Build a fully connected softmax classifier with the TF1 graph API.

    Parameters
    ----------
    activation : {'sigmoid', 'relu'}
        Non-linearity applied after every hidden layer.
    hidden_units : sequence of int
        Width of each hidden layer, in order.  The default list is only
        iterated, never mutated.
    input_shape, output_shape : tuple
        Placeholder shapes; index 1 is used as the feature / class count.

    Returns
    -------
    (input_X, output_Y, probs)
        The float32 input placeholder, the float32 target placeholder and
        the softmax output of the last layer.

    Raises
    ------
    KeyError
        If ``activation`` is not a supported name.  The check happens before
        any placeholder or variable is created, so a failed call adds nothing
        to the graph.

    Notes
    -----
    Variable names are fixed ('HiddenLayerW0', 'SoftmaxLayerW', ...), so a
    second call in the same graph/scope is expected to collide unless
    variable reuse is enabled -- confirm against the TF version in use.
    Every initializer uses seed=0.
    """
    activation_map = {'sigmoid': tf.sigmoid, 'relu': tf.nn.relu}

    # Resolve the activation up-front.  Looking it up inside the layer loop
    # would fail only after placeholders and first-layer variables had
    # already been created, leaving a half-built graph behind.
    try:
        activation_fn = activation_map[activation]
    except KeyError:
        raise KeyError(
            f"unknown activation {activation!r}; "
            f"expected one of {sorted(activation_map)}"
        ) from None

    input_X = tf.placeholder(shape=input_shape,
                             name='input_X',
                             dtype=tf.float32)

    output_Y = tf.placeholder(shape=output_shape,
                              name='output_Y',
                              dtype=tf.float32)

    curr_layer = input_X
    curr_size = input_shape[1]
    for layer_idx, size in enumerate(hidden_units):
        hidden_layer_W = tf.get_variable(name=f'HiddenLayerW{layer_idx}',
                                         shape=(curr_size, size),
                                         initializer=tf.random_normal_initializer(seed=0))

        # Bias is a (1, size) row that is added to every row of the batch.
        hidden_layer_b = tf.get_variable(name=f'HiddenLayerB{layer_idx}',
                                         shape=(1, size),
                                         initializer=tf.random_normal_initializer(seed=0))

        curr_layer = activation_fn(tf.matmul(curr_layer, hidden_layer_W)
                                   + hidden_layer_b)
        curr_size = size

    softmax_layer_W = tf.get_variable(name='SoftmaxLayerW',
                                      shape=(curr_size, output_shape[1]),
                                      initializer=tf.random_normal_initializer(seed=0))

    softmax_layer_b = tf.get_variable(name='SoftmaxLayerB',
                                      shape=(1, output_shape[1]),
                                      initializer=tf.random_normal_initializer(seed=0))

    curr_layer = tf.nn.softmax(tf.matmul(curr_layer, softmax_layer_W)
                               + softmax_layer_b)

    return input_X, output_Y, curr_layer
        

In [None]:
# Build the network with get_model's default arguments and keep handles to
# the input placeholder, the target placeholder and the softmax output.
(input_X, output_Y, curr_layer) = get_model()

In [None]:
# Cross-entropy between the one-hot targets and the softmax output.
# - Probabilities are clipped away from 0 before the log: a softmax output
#   that underflows to exactly 0 would give log(0) = -inf and 0 * -inf = NaN.
# - The inner reduce_mean averages over the class axis, so the reported value
#   is the usual per-sample cross-entropy divided by the number of classes;
#   this scaling is kept so the training dynamics stay unchanged.
# - `axis` replaces the deprecated `reduction_indices` alias.
loss = -tf.reduce_mean(
    tf.reduce_mean(output_Y * tf.log(tf.clip_by_value(curr_layer, 1e-10, 1.0)),
                   axis=1))

In [None]:
# Plain gradient descent on `loss`.  Note that `optimizer` is bound to the op
# returned by minimize(), i.e. the thing to pass to sess.run for one update.
optimizer = (
    tf.train.GradientDescentOptimizer(learning_rate=0.5)
    .minimize(loss)
)

In [None]:
def batchify(x, y, size=100):
    """Yield consecutive ``(x_batch, y_batch)`` slices of at most ``size`` rows.

    Parameters
    ----------
    x, y : array-like with a leading sample axis
        ``x`` must expose ``.shape``; both are sliced along axis 0 only, so
        ``y`` may be 1-D labels or an N-D array.
    size : int
        Maximum number of rows per batch; the last batch may be shorter.

    Yields
    ------
    (x_batch, y_batch)
        Slices ``x[i:i+size]`` and ``y[i:i+size]`` in order.  Nothing is
        yielded for empty input.

    Raises
    ------
    ValueError
        On first iteration, if ``size`` is not positive (the loop could never
        advance) or if ``x`` and ``y`` have different numbers of rows.
    """
    if size <= 0:
        raise ValueError(f"batch size must be positive, got {size}")

    n_samples = x.shape[0]
    if len(y) != n_samples:
        raise ValueError(
            f"x and y must have the same number of rows, "
            f"got {n_samples} and {len(y)}")

    for start in range(0, n_samples, size):
        stop = start + size
        yield x[start:stop], y[start:stop]

In [None]:
# One-hot encode the integer class labels into a dense 2-D array so they can
# be fed to the `output_Y` placeholder.
Y = OneHotEncoder().fit_transform(labels.reshape(len(labels), 1)).toarray()

# Shuffle samples, targets and labels with the same permutation so the
# fixed-order mini-batches produced by batchify mix both classes.
shuffle_index = np.arange(X.shape[0])
np.random.shuffle(shuffle_index)

X = X[shuffle_index, :]
Y = Y[shuffle_index, :]
labels = labels[shuffle_index]

EPOCH = 10000        # number of passes over the data
BATCH_SIZE = 200
REPORT_EVERY = 1000  # print full-data metrics every this many epochs


def evaluate_on_full_data(sess):
    """Run the model on all of X/Y; return (loss, class_probs, pred_label, acc)."""
    total_loss, probs = sess.run([loss, curr_layer],
                                 feed_dict={input_X: X, output_Y: Y})
    predicted = np.argmax(probs, axis=1)
    return total_loss, probs, predicted, accuracy_score(labels, predicted)


with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    for epoch in range(EPOCH):
        for x_batch, y_batch in batchify(X, Y, size=BATCH_SIZE):
            # Running `optimizer` applies one gradient step; `batch_loss` is
            # the loss fetched for that batch in the same run call.
            batch_loss, _ = sess.run([loss, optimizer],
                                     feed_dict={input_X: x_batch, output_Y: y_batch})

        if epoch % REPORT_EVERY == 0:
            loss_val, class_probs, pred_label, acc = evaluate_on_full_data(sess)
            print("epoch {} batch loss {} total loss {} accuracy {}".format(
                epoch, batch_loss, loss_val, acc))

    # Final metrics; `class_probs` is kept for the plotting cell below.
    loss_val, class_probs, pred_label, acc = evaluate_on_full_data(sess)
    print("last batch loss {} total loss {} accuracy {}".format(
        batch_loss, loss_val, acc))

In [None]:
# Colour every sample by the probability the trained model assigned to
# class 1 (column 1 of the softmax output from the final evaluation).
probs = class_probs[:, 1].squeeze()
plt.scatter(x=X[:, 0], y=X[:, 1], c=probs, alpha=0.3)