# In [None]:
import numpy as np
import matplotlib.pyplot as plt
import random
# Use the SimHei font for sans-serif text (presumably so that CJK labels
# render in plots -- the font must be installed on the machine).
plt.rcParams['font.sans-serif'] = ['SimHei'] 
# Draw minus signs with the ASCII hyphen instead of the Unicode minus glyph.
plt.rcParams['axes.unicode_minus'] = False  


# Generate dataset
#Function y = 4*x1*x1 + 9*x2*x2   
def get_data(sample_num=1000):
    """Build a synthetic regression set for y = 4*x1^2 + 9*x2^2.

    x1 is sampled evenly on [0, 9] and x2 evenly on [4, 13]. The returned
    features are the *squared* inputs, so the target is linear in them with
    weights (4, 9).

    Args:
        sample_num: number of evenly spaced samples to generate.

    Returns:
        A tuple ``(x, y)`` where ``x`` has shape (sample_num, 2) and holds
        ``[x1**2, x2**2]`` per row, and ``y`` has shape (sample_num,).
    """
    # Stack the two input ranges as rows, then transpose to one sample per row.
    raw_inputs = np.vstack((
        np.linspace(0, 9, sample_num),
        np.linspace(4, 13, sample_num),
    )).T
    features = np.square(raw_inputs)
    weights = np.array([4, 9])
    targets = np.dot(features, weights)
    return features, targets


def shuffle_data(x, y):
    """Shuffle ``x`` and ``y`` in place using one shared random permutation.

    Row ``i`` of ``x`` and row ``i`` of ``y`` stay paired after the shuffle.

    ``random.shuffle`` must not be applied directly to a multi-dimensional
    NumPy array: its element swap (``a[i], a[j] = a[j], a[i]``) operates on
    row *views*, so rows get overwritten with duplicates and data is lost.
    Instead, a list of indices is shuffled and applied through a copy.

    Args:
        x: mutable sequence or NumPy array of samples.
        y: mutable sequence or NumPy array of targets, same length as ``x``.

    Returns:
        None. Both arguments are modified in place.

    Raises:
        ValueError: if ``x`` and ``y`` have different lengths.
    """
    count = len(x)
    if len(y) != count:
        raise ValueError(
            "x and y must have the same length, got %d and %d" % (count, len(y))
        )

    # Shuffling plain integers is safe; the `random` module remains the
    # source of randomness, so `random.seed(...)` still controls the result.
    order = list(range(count))
    random.shuffle(order)

    for seq in (x, y):
        if isinstance(seq, np.ndarray):
            # Fancy indexing materialises a reordered copy before the
            # assignment writes it back, so no row is clobbered mid-way.
            seq[...] = seq[np.asarray(order, dtype=np.intp)]
        else:
            seq[:] = [seq[i] for i in order]


def get_splited_data(x, y, ratio):
    """Shuffle ``x``/``y`` in place and split them into train and test parts.

    The first ``int(len(x) * ratio)`` samples become the training set and
    all remaining samples become the test set, so the two sets are disjoint
    and together cover every sample.  (Starting the test slice at
    ``int(len(x) * (1 - ratio))`` instead would make the test set overlap
    the training set whenever ``ratio > 0.5``.)

    Args:
        x: samples, sliceable along the first axis.
        y: targets aligned with ``x``.
        ratio: fraction of samples assigned to the training set, in [0, 1].

    Returns:
        A tuple ``(train_x, train_y, test_x, test_y)``.

    Raises:
        ValueError: if ``ratio`` is outside [0, 1].
    """
    if not 0 <= ratio <= 1:
        raise ValueError("ratio must be within [0, 1], got %r" % (ratio,))

    shuffle_data(x, y)
    train_size = int(len(x) * ratio)
    # One boundary for both halves guarantees no sample lands in both sets.
    train_x, test_x = x[:train_size], x[train_size:]
    train_y, test_y = y[:train_size], y[train_size:]
    return train_x, train_y, test_x, test_y


# Calculate the gradient
def compute_grad(X,y,batch_size,theta,type):
    """Compute a least-squares gradient and the current training loss.

    Args:
        X: 2-D feature array, one sample per row.
        y: 2-D target array with one row per sample (rows are selected with
            ``y[r, :]`` in the stochastic variants).
        batch_size: number of samples drawn for ``type == 'MINI'``; values
            larger than the number of rows are clamped to the full set.
            Ignored by the other methods.
        theta: current parameter array, multiplied as ``np.dot(X, theta)``.
        type: gradient method (the name shadows the builtin but is kept for
            backward compatibility with keyword callers):
            ``'FULL'`` -- average gradient over all samples;
            ``'SGD'``  -- gradient of one randomly chosen sample;
            ``'MINI'`` -- average gradient over a random batch drawn
            without replacement.

    Returns:
        A tuple ``(grad, loss)`` where ``loss`` is ``compute_RMSE`` evaluated
        on the full ``X``/``y`` with the given ``theta``.

    Raises:
        ValueError: if ``type`` is not one of the supported methods.
    """
    row = X.shape[0]
    if type == 'FULL':  # batch gradient descent
        residual = np.dot(X, theta) - y
        grad = np.dot(np.transpose(X), residual) / row
    elif type == 'SGD':  # single-sample stochastic gradient
        r = np.random.randint(row)
        # Slicing keeps the 2-D (1, n_features) shape needed for the products.
        sample_x = X[r:r + 1, :]
        sample_y = y[r:r + 1, :]
        residual = np.dot(sample_x, theta) - sample_y
        grad = np.dot(np.transpose(sample_x), residual)
    elif type == 'MINI':  # mini-batch gradient
        # Sampling without replacement cannot draw more rows than exist.
        batch = min(batch_size, row)
        chosen = np.random.choice(row, batch, replace=False)
        batch_x = X[chosen, :]
        residual = np.dot(batch_x, theta) - y[chosen, :]
        grad = np.dot(np.transpose(batch_x), residual) / batch
    else:
        # Fail loudly: silently returning a placeholder gradient would let
        # training continue on meaningless updates.
        raise ValueError(
            "No such gradient descent method: %r "
            "(expected 'FULL', 'SGD' or 'MINI')" % (type,)
        )

    # Training loss is always measured on the full data set.
    loss = compute_RMSE(X, y, theta)
    return grad, loss


# Update theta
def update_theta(grad,theta,alpha):
    """Return the parameters after one gradient-descent step.

    Args:
        grad: gradient of the loss with respect to ``theta``.
        theta: current parameters.
        alpha: learning rate (step size).

    Returns:
        ``theta - alpha * grad``; the inputs are not modified.
    """
    step = alpha * grad
    return theta - step


# Least-squares loss: half the mean squared error (despite the function name, no square root is taken)
def compute_RMSE(X,y,theta):
    """Return the least-squares cost of ``theta`` on ``(X, y)``.

    Despite the name this is not a root-mean-square error: it is the sum of
    squared residuals divided by ``2 * n_samples`` (half the mean squared
    error), with no square root applied.

    Args:
        X: feature array with one sample per row.
        y: targets aligned with the rows of ``X``.
        theta: parameters, multiplied as ``np.dot(X, theta)``.

    Returns:
        The cost as produced by ``np.dot`` on the residuals; for column
        vector ``y`` and ``theta`` this is a (1, 1) array.
    """
    n_samples = X.shape[0]
    residual = np.dot(X, theta) - y
    squared_error = np.dot(residual.T, residual)
    return squared_error / (2 * n_samples)


# run Mini-batch SGD
def run_mini_sgd(alpha=1e-4, batch_size=500, max_step=500, tol=0.01):
    """Train a linear model with mini-batch gradient descent and plot losses.

    Generates the synthetic data set, splits it 80/20 into training and test
    sets, runs mini-batch gradient descent from ``theta = 0`` and finally
    plots the training loss (every step) and test loss (every 10th step).

    Args:
        alpha: learning rate.  The features are squared inputs reaching 81
            and 169, so the largest eigenvalue of ``X.T @ X / n`` is on the
            order of 1e4; a step of 1e-3 exceeds the ``2 / lambda_max``
            stability limit and makes the loss diverge, hence the smaller
            default.
        batch_size: number of training samples drawn per step.
        max_step: maximum number of iterations.
        tol: training stops early once the training loss is no longer
            greater than this value.

    Returns:
        None. Progress and the final ``theta`` are printed, and the loss
        curves are shown with matplotlib.
    """
    # Obtain the data set and divide it into training and test sets.
    # get_splited_data shuffles before splitting, so no extra shuffle here.
    X, y = get_data()
    y = np.array(y).reshape(-1, 1)
    train_x, train_y, test_x, test_y = get_splited_data(X, y, ratio=0.8)

    n_features = X.shape[1]
    theta = np.zeros((n_features, 1))

    train_loss = []
    test_loss = []
    train_step = []
    test_step = []
    loss = 1  # placeholder so the first loop check passes with the default tol
    step = 0
    while loss > tol and step < max_step:
        # The returned loss is measured with theta *before* this step's update.
        grad, loss = compute_grad(train_x, train_y, batch_size=batch_size,
                                  theta=theta, type='MINI')
        theta = update_theta(grad, theta, alpha=alpha)
        print("step: ", step, "loss:", loss[0][0])

        train_loss.append(loss[0][0])
        train_step.append(step)

        # Evaluating on the test set every 10th step keeps the loop cheap.
        if step % 10 == 0:
            testloss = compute_RMSE(test_x, test_y, theta)
            test_loss.append(testloss[0][0])
            test_step.append(step)

        step += 1
    print(theta)

    # Plot the loss curves.
    plt.plot(train_step, train_loss, c='red', label='train_loss_curve')
    plt.plot(test_step, test_loss, c='blue', label='test_loss_curve')
    plt.legend(loc='best')
    plt.xlabel("step")
    plt.ylabel("loss")
    plt.title("loss_curve")
    plt.show()

# Run the mini-batch SGD demo only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    run_mini_sgd()
    