# Imports

In [None]:
# "tf" is only the alias bound by `import tensorflow as tf`; an import
# statement needs the real package name as its module path.
from tensorflow.keras.utils import plot_model
import numpy as np
import tensorflow as tf
from tensorflow.keras import Input, Model
from tensorflow.keras.layers import Dense, Activation
import matplotlib.pyplot as plt
from tensorflow.keras.models import Sequential
from sklearn.cluster import KMeans
from sklearn.mixture import GaussianMixture
import scipy.stats

# Helpers

In [None]:
def y_func(x, m):
    """Return ``1/3 + 0.5 * sin(3 * pi * x) + m``.

    ``x`` is the input and ``m`` an additive offset; both may be scalars or
    numpy arrays (the expression is evaluated element-wise by numpy).
    """
    sine_term = 0.5 * np.sin(3 * x * np.pi)
    return 1/3 + sine_term + m

# draw n random (x, y) pairs using numpy's random_sample
def generate_data(n):
    """Sample ``n`` noisy points of the target curve.

    ``x`` is drawn uniformly from [0, 1) and the additive noise uniformly
    from [-0.7, 0.7); both have shape ``(n, 1)``.

    Returns:
        Tuple ``(x, y)`` where ``y = y_func(x, noise)``.
    """
    shape = (n, 1)
    x = np.random.random_sample(shape)
    # scale/shift the unit interval to [-0.7, 0.7)
    noise = 1.4 * np.random.random_sample(shape) - 0.7
    return (x, y_func(x, noise))

# scatter-plot the ground truth (and, optionally, predictions) under a title
def draw_plot(x, y, pred, title):
    """Show a scatter plot of ``y`` against ``x``.

    Args:
        x: horizontal coordinates.
        y: ground-truth values, drawn in blue.
        pred: predicted values drawn in red together with a legend, or
            ``None`` to plot the ground truth only.
        title: figure title.
    """
    plt.scatter(x, y, c="b")

    show_prediction = pred is not None
    if show_prediction:
        plt.scatter(x, pred, c="r")
        plt.legend(["truth", "prediction"], loc="lower right")

    # title and axis labels
    for apply_text, text in ((plt.title, title), (plt.xlabel, "x"), (plt.ylabel, "y")):
        apply_text(text)
    plt.show()

# Dataset

In [None]:
# Generate the splits first (train, then valid) so the random stream is
# consumed in a fixed order, then plot each split without predictions.
rbf_train_x, rbf_train_y = generate_data(300)
rbf_valid_x, rbf_valid_y = generate_data(30)

for split_name, split_x, split_y in (
    ("train", rbf_train_x, rbf_train_y),
    ("valid", rbf_valid_x, rbf_valid_y),
):
    draw_plot(split_x, split_y, pred=None, title=f"Generated {split_name} data")

# MLP

In [None]:
def create_mlp(layers=(60, 40, 30, 20)):
    """Build and compile a fully connected regression network for one scalar feature.

    Args:
        layers: sizes of the hidden Dense layers, in order. Each hidden layer
            is followed by a ReLU activation. An empty sequence yields a
            model consisting of the linear output layer only.

    Returns:
        A Keras ``Model`` with a single-unit linear output, compiled with
        mean-squared-error loss and the SGD optimizer.
    """
    # input layer: the batch axis is implicit in Keras, so one scalar feature
    # per sample is shape=(1,) (matching the (n, 1) arrays fed to fit/predict)
    input_layer = Input(shape=(1,))

    # hidden layers
    x = input_layer
    for units in layers:
        x = Dense(units)(x)
        x = Activation("relu")(x)

    # output layer
    x = Dense(1)(x)
    model = Model(inputs=input_layer, outputs=x)
    model.compile(loss='mean_squared_error', optimizer='SGD')
    return model

In [None]:
model = create_mlp()
plot_model(model)

# Train in two stages on the same model (500 epochs, then 300 more) and plot
# the validation / training predictions after each stage.
for stage_epochs, total_epochs in ((500, 500), (300, 800)):
    model.fit(
        rbf_train_x,
        rbf_train_y,
        validation_data=(rbf_valid_x, rbf_valid_y),
        epochs=stage_epochs,
        batch_size=50,
        verbose=0,
    )

    mlp_pred_valid_y = model.predict(rbf_valid_x)
    draw_plot(
        x=rbf_valid_x,
        y=rbf_valid_y,
        pred=mlp_pred_valid_y,
        title=f"Predicted valid data after {total_epochs} epochs",
    )

    mlp_pred_train_y = model.predict(rbf_train_x)
    draw_plot(
        x=rbf_train_x,
        y=rbf_train_y,
        pred=mlp_pred_train_y,
        title=f"Predicted train data after {total_epochs} epochs",
    )

# RBF

In [None]:
class RBF:
    """Radial-basis-function network with one linear output unit.

    The hidden layer has ``input_n // 2`` units whose centers are chosen by
    clustering the training inputs (KMeans or a Gaussian mixture); all units
    share one radius derived from the largest distance between centers. Only
    the output weights ``w`` and bias ``b`` are learned, by per-sample
    gradient steps on the squared error.

    Args:
        input_n: upper bound on the number of training samples consumed per
            epoch (the first ``input_n`` samples are used); also determines
            the number of centers, ``input_n // 2``.
        lr: learning rate of the output layer.
        cluster_method: ``"kmeans"`` or ``"gmm"``.
    """

    def __init__(self, input_n, lr=0.01, cluster_method="kmeans"):
        self.input_n = input_n
        self.clusters_n = input_n // 2
        self.lr = lr
        # output layer weights
        self.w = np.random.random((self.clusters_n, 1))
        self.b = np.random.random((1, 1))
        self.cluster_method = cluster_method
        # rbf layer parameters; filled in by train()
        self.centers = None
        self.rs = None

    def find_centers_knn(self, inputs):
        """Return the KMeans cluster centers of ``inputs`` (despite the name, this is KMeans, not k-NN)."""
        clusters = KMeans(n_clusters=self.clusters_n).fit(inputs)
        return clusters.cluster_centers_

    def find_centers_gmm(self, inputs):
        """Fit a Gaussian mixture and return, per component, the sample of ``inputs`` with the highest log-density."""
        inputs = np.asarray(inputs)
        gmm = GaussianMixture(n_components=self.clusters_n)
        gmm.fit(inputs)
        centers = []
        for mean, cov in zip(gmm.means_, gmm.covariances_):
            # score the samples that were passed in, not a module-level dataset
            density = scipy.stats.multivariate_normal(mean=mean, cov=cov).logpdf(inputs)
            centers.append(inputs[np.argmax(density)])
        return np.array(centers)

    def gaus_dist(self, p1, p2):
        """Return the Euclidean distance between ``p1`` and ``p2`` along the last axis.

        The last axis is kept (length 1), so a sample of shape ``(d,)``
        against centers of shape ``(k, d)`` gives ``(k, 1)``. For
        one-dimensional features this equals ``abs(p1 - p2)``.
        """
        diff = np.atleast_1d(np.asarray(p1) - np.asarray(p2))
        return np.linalg.norm(diff, axis=-1, keepdims=True)

    def rfb_layer(self, inputs):
        """Return the hidden-layer activations ``exp(-d / r**2)`` for one sample."""
        d = self.gaus_dist(inputs, self.centers)
        # NOTE(review): the distance is not squared here, so this is not the
        # textbook Gaussian kernel exp(-d**2 / (2 r**2)) - confirm intent.
        return np.exp(-d / np.power(self.rs, 2))

    def calculate_error(self, truth, pred):
        """Return the mean squared error between ``truth`` and ``pred``."""
        return (np.power(truth - pred, 2)).mean()

    def get_result(self, inputs, truth):
        """Run one sample through the network and return the ``(1, 1)`` output.

        When ``truth`` is not ``None`` the output weights and bias are also
        updated with one gradient step; the returned output is the one
        computed before that update.
        """
        rbf_res = self.rfb_layer(inputs)

        output = np.dot(self.w.T, rbf_res) + self.b
        # learn output layer w and b with gradient
        if truth is not None:
            err = output.reshape(1, 1) - truth.reshape(1, 1)
            self.w = self.w - self.lr * rbf_res * err
            self.b = self.b - self.lr * err

        return output

    def predict(self, x):
        """Return the stacked per-sample outputs for ``x`` (shape ``(len(x), 1, 1)``), without training."""
        return np.array([self.get_result(sample, truth=None) for sample in x])

    def _init_rbf_layer(self, inputs):
        """Choose the centers with the configured clustering method and derive the shared radius."""
        if self.cluster_method == "kmeans":
            centers = self.find_centers_knn(inputs)
        elif self.cluster_method == "gmm":
            centers = self.find_centers_gmm(inputs)
        else:
            raise ValueError(f"unknown cluster_method: {self.cluster_method!r}")
        centers = np.asarray(centers)
        self.centers = centers

        # largest pairwise distance between centers, computed by broadcasting
        pairwise = np.linalg.norm(centers[:, None, :] - centers[None, :, :], axis=-1)
        max_dist = pairwise.max()
        # every unit shares the radius max_dist / sqrt(2 * clusters_n)
        radius = max_dist / np.sqrt(2 * self.clusters_n)
        self.rs = np.full((self.clusters_n, 1), radius)

    def train(self, inputs, answers, epochs, continute_train=False):
        """Train the output layer for ``epochs`` passes over the data.

        Args:
            inputs: training samples, one per row.
            answers: target values, aligned with ``inputs``.
            epochs: number of passes.
            continute_train: when true, keep the existing centers and radii
                instead of re-clustering (they are still computed if the
                model has never been fitted).

        Raises:
            ValueError: if ``cluster_method`` is neither ``"kmeans"`` nor ``"gmm"``.
        """
        # rbf layers radius and centers
        if not continute_train or self.centers is None:
            self._init_rbf_layer(inputs)

        # NOTE(review): only the first input_n samples are used, as before;
        # the bound is clamped so fewer samples than input_n no longer raise.
        n_samples = min(self.input_n, len(inputs))
        for _ in range(epochs):
            for sample, target in zip(inputs[:n_samples], answers[:n_samples]):
                self.get_result(sample, target)
        

## Finding centers using KMeans

In [None]:
# Stage 1: cluster the inputs with KMeans and train the output layer for 100 epochs.
rbf = RBF(len(rbf_train_x) // 2)
rbf.train(rbf_train_x, rbf_train_y, 100)
rbf_pred_valid_y = rbf.predict(rbf_valid_x).reshape((-1,1))
draw_plot(x=rbf_valid_x, y=rbf_valid_y, pred=rbf_pred_valid_y, title="Predicted valid data after 100 epochs")
rbf_pred_train_y = rbf.predict(rbf_train_x).reshape((-1,1))
draw_plot(x=rbf_train_x, y=rbf_train_y, pred=rbf_pred_train_y, title="Predicted train data after 100 epochs")

# Stage 2: 100 more epochs on the same model. continute_train=True keeps the
# centers and radii from stage 1; without it train() re-runs the clustering
# while the already-learned output weights are kept, so the weights would no
# longer belong to the centers they were trained with.
rbf.train(rbf_train_x, rbf_train_y, 100, continute_train=True)
rbf_pred_valid_y = rbf.predict(rbf_valid_x).reshape((-1,1))
draw_plot(x=rbf_valid_x, y=rbf_valid_y, pred=rbf_pred_valid_y, title="Predicted valid data after 200 epochs")
rbf_pred_train_y = rbf.predict(rbf_train_x).reshape((-1,1))
draw_plot(x=rbf_train_x, y=rbf_train_y, pred=rbf_pred_train_y, title="Predicted train data after 200 epochs")

## Finding centers using GMM

In [None]:
# Stage 1: pick the centers with a Gaussian mixture and train the output layer for 100 epochs.
rbf_gmm = RBF(len(rbf_train_x) // 2, cluster_method="gmm")
rbf_gmm.train(rbf_train_x, rbf_train_y, 100)
rbf_gmm_pred_valid_y = rbf_gmm.predict(rbf_valid_x).reshape((-1,1))
draw_plot(x=rbf_valid_x, y=rbf_valid_y, pred=rbf_gmm_pred_valid_y, title="Predicted valid data after 100 epochs")
rbf_gmm_pred_train_y = rbf_gmm.predict(rbf_train_x).reshape((-1,1))
draw_plot(x=rbf_train_x, y=rbf_train_y, pred=rbf_gmm_pred_train_y, title="Predicted train data after 100 epochs")

# Stage 2: 100 more epochs on the same model. continute_train=True keeps the
# centers and radii from stage 1; without it train() re-fits the mixture
# while the already-learned output weights are kept, so the weights would no
# longer belong to the centers they were trained with.
rbf_gmm.train(rbf_train_x, rbf_train_y, 100, continute_train=True)
rbf_gmm_pred_valid_y = rbf_gmm.predict(rbf_valid_x).reshape((-1,1))
draw_plot(x=rbf_valid_x, y=rbf_valid_y, pred=rbf_gmm_pred_valid_y, title="Predicted valid data after 200 epochs")
rbf_gmm_pred_train_y = rbf_gmm.predict(rbf_train_x).reshape((-1,1))
draw_plot(x=rbf_train_x, y=rbf_train_y, pred=rbf_gmm_pred_train_y, title="Predicted train data after 200 epochs")

# Comparing results

In [None]:
# Overlay the ground truth (blue), the MLP predictions (red) and the
# KMeans-based RBF predictions (yellow), once for the training split and once
# for the validation split.
for split_name, split_x, split_y, mlp_pred, rbf_pred in (
    ("train", rbf_train_x, rbf_train_y, mlp_pred_train_y, rbf_pred_train_y),
    ("valid", rbf_valid_x, rbf_valid_y, mlp_pred_valid_y, rbf_pred_valid_y),
):
    for series, colour in ((split_y, "b"), (mlp_pred, "r"), (rbf_pred, "y")):
        plt.scatter(split_x, series, c=colour)
    plt.legend(["truth", "mlp", "rbf"], loc="lower right")
    plt.title(f"Comparison of {split_name} data")
    plt.xlabel("x")
    plt.ylabel("y")
    plt.show()