In [1]:
import pandas as pd
import numpy as np

In [2]:
# 导入数据
x_all = pd.read_csv('data/random_data_regression_X.csv', header=None)
y_all = pd.read_csv('data/random_data_regression_y.csv', header=None)
train_x = x_all[:40]
train_y = y_all[:40]
test_x = x_all[40:]
test_y = y_all[40:]
(x_all.shape, y_all.shape)

((50, 500), (50, 1))

In [3]:
# 封装最小二乘法
class OrdinaryLeastSquaresLinerRegression:
    def __init__(self):
        self._theta = None
        self.intercept_ = None
        self.coef_ = None

    def fit(self, x_train, y_train):
        X_b = np.hstack([np.ones((len(x_train), 1)), x_train])
        self._theta = np.linalg.inv(X_b.T.dot(X_b)).dot(X_b.T).dot(y_train)
        self.intercept_ = self._theta[0]
        self.coef_ = self._theta[1:]
        return self

    def predict(self, x_predict):
        X_b = np.hstack([np.ones((len(x_predict), 1)), x_predict])
        return X_b.dot(self._theta)


# 调用方法
reg = OrdinaryLeastSquaresLinerRegression()
reg.fit(train_x, train_y)
reg.coef_

array([[-1.71056415e+04],
       [-2.24539317e+05],
       [ 4.86941544e+04],
       [-1.00607619e+05],
       [-1.41904083e+05],
       [-1.11191459e+05],
       [ 3.17663333e+04],
       [ 6.71389502e+04],
       [ 1.34032876e+04],
       [-1.24988322e+05],
       [-1.84281181e+04],
       [ 2.89591532e+04],
       [-8.35623503e+04],
       [-5.07573195e+04],
       [-3.97079607e+04],
       [-4.15773835e+04],
       [-6.99885763e+04],
       [ 4.20659269e+04],
       [-5.55342523e+04],
       [ 1.16204196e+05],
       [-6.28104369e+04],
       [ 1.87689949e+04],
       [-6.58349711e+02],
       [-6.95909377e+04],
       [ 2.82423159e+04],
       [-3.30904576e+04],
       [-7.85047027e+04],
       [ 2.67370581e+03],
       [-3.39903105e+04],
       [ 1.67743896e+04],
       [-3.66944642e+04],
       [-8.41552392e+04],
       [-4.37184524e+03],
       [-4.35640617e+04],
       [-4.38334083e+04],
       [ 1.58076609e+04],
       [-1.06532784e+05],
       [-1.63456862e+04],
       [-6.4

In [4]:
from sklearn.metrics import mean_squared_error

pred = reg.predict(test_x)
np.sqrt(mean_squared_error(test_y, pred))

502685.7812319736

In [9]:
# 封装梯度下降法
class GradientDescentLinerRegression:
    def __init__(self, learning_rate=0.01, max_iter=1000, early_stop=True, seed=None):
        np.random.seed(seed)
        self.early_stop = early_stop
        self.lr = learning_rate  # 设置学习率
        self.max_iter = max_iter
        self.loss_arr = []
        self.intercept_ = None
        self.coef_ = None

    def fit(self, x, y):
        self.x = x
        self.x_b = np.hstack([np.ones((len(x), 1)), x])
        self.x_b_dim = np.size(self.x_b, 1)
        self.x_sample = np.size(self.x_b, 0)
        self.y = y
        self.w = np.random.normal(1, 0.001, (self.x_b_dim, 1))
        for i in range(self.max_iter):
            self._train_step()
            self.loss_arr.append(self.loss())
            print("iter: {}, loss_in_train: {:.5f}, loss_in_test: {:.5f}".format(i + 1, self.loss(is_test=False), self.loss_arr[-1]))

            # 连续十次不下降则退出
            if len(self.loss_arr) >= 10:
                arr = np.array(self.loss_arr[:-10])
                if len(arr) != 0 and np.sum(arr[0] < arr) == 0:
                    return

    def _f(self, x, w):
        return x.dot(w)

    def predict(self, x=None):
        if x is None:
            x = self.x_b
        x = np.hstack([np.ones((len(x), 1)), x])
        y_pred = self._f(x, self.w)
        return y_pred

    def loss(self, is_test=True, y_true=None, y_pred=None):
        if y_true is None or y_pred is None:
            y_true = self.y
        if is_test:
            return np.sqrt(mean_squared_error(test_y, self.predict(test_x)))
        else:
            return np.sqrt(mean_squared_error(y_true, self.predict(self.x)))

    def _calc_gradient(self):
        d_w = np.empty(self.x_b_dim).reshape(-1, 1)
        d_w[0] = np.sum(self.x_b.dot(self.w) - self.y)
        for i in range(1, self.x_b_dim):
            # print("{} {} {} {}".format(self.x_b.shape, self.w.shape, self.y.shape, self.x_b[:, i].T.shape))
            d_w[i] = np.squeeze((self.x_b.dot(self.w) - self.y)).dot(self.x_b[:, i].T)
        return d_w * 2 / self.x_sample

    def _train_step(self):
        d_w = self._calc_gradient()
        self.w = self.w - self.lr * d_w
        self.intercept_ = self.w[0]
        self.coef_ = self.w[1:]
        return self.w

reg = GradientDescentLinerRegression(learning_rate=1e-3, seed=1024)
reg.fit(train_x, train_y)
reg.coef_

iter: 1, loss_in_train: 30.05101, loss_in_test: 35.13702
iter: 2, loss_in_train: 29.30544, loss_in_test: 35.19389
iter: 3, loss_in_train: 28.58021, loss_in_test: 35.25055
iter: 4, loss_in_train: 27.87471, loss_in_test: 35.30694
iter: 5, loss_in_train: 27.18835, loss_in_test: 35.36305
iter: 6, loss_in_train: 26.52059, loss_in_test: 35.41883
iter: 7, loss_in_train: 25.87086, loss_in_test: 35.47425
iter: 8, loss_in_train: 25.23865, loss_in_test: 35.52929
iter: 9, loss_in_train: 24.62343, loss_in_test: 35.58392
iter: 10, loss_in_train: 24.02472, loss_in_test: 35.63813
iter: 11, loss_in_train: 23.44202, loss_in_test: 35.69187


array([[1.08580041],
       [0.80835784],
       [0.96169888],
       [1.06046853],
       [1.05521784],
       [0.95619998],
       [0.99935118],
       [1.13746504],
       [1.03108558],
       [0.93504387],
       [1.01110374],
       [0.99758081],
       [1.14976192],
       [0.97873377],
       [1.24455398],
       [0.98256361],
       [0.94727349],
       [1.04173728],
       [1.08264653],
       [1.04102854],
       [1.20554018],
       [1.21542349],
       [1.08342714],
       [0.92743669],
       [0.91649313],
       [0.9449844 ],
       [1.095866  ],
       [1.02780193],
       [1.00815463],
       [0.94248636],
       [0.93549627],
       [1.07403956],
       [1.03138165],
       [1.01198715],
       [1.01208492],
       [1.02683206],
       [1.08136812],
       [1.15313554],
       [1.11051811],
       [0.97874775],
       [0.88337135],
       [0.98262248],
       [1.05123822],
       [0.89871387],
       [1.03134521],
       [1.24165579],
       [1.06973531],
       [1.059

In [6]:
# 封装最小二乘法(包含正则化项)
class OrdinaryLeastSquaresLinerRegressionWithRegularization:
    def __init__(self, _lambda=0.1):
        self._lambda = _lambda
        self._theta = None
        self.intercept_ = None
        self.coef_ = None

    def fit(self, x_train, y_train):
        r = np.diag([self._lambda] * (x_train.shape[1] + 1))
        r[0][0] = 0
        X_b = np.hstack([np.ones((len(x_train), 1)), x_train])
        self._theta = np.linalg.inv(X_b.T.dot(X_b) + r).dot(X_b.T).dot(y_train)
        self.intercept_ = self._theta[0]
        self.coef_ = self._theta[1:]
        return self

    def predict(self, x_predict):
        X_b = np.hstack([np.ones((len(x_predict), 1)), x_predict])
        return X_b.dot(self._theta)

# 调用方法
reg = OrdinaryLeastSquaresLinerRegressionWithRegularization(_lambda=0.1)
reg.fit(train_x, train_y)
reg.coef_

array([[ 5.09377937e-02],
       [-9.73139426e-02],
       [ 8.17862915e-02],
       [ 4.91587905e-01],
       [ 5.20187922e-01],
       [-1.63280922e-01],
       [-2.17837706e-01],
       [ 8.11202701e-01],
       [ 3.37272393e-01],
       [-3.83060819e-01],
       [ 1.78821321e-01],
       [ 3.40271854e-01],
       [ 3.32779579e-01],
       [ 1.39946019e-01],
       [ 4.20897501e-01],
       [ 3.32263909e-02],
       [-4.61845622e-01],
       [ 1.33218708e-01],
       [-1.04878599e-01],
       [ 5.10349690e-01],
       [ 4.35200051e-01],
       [ 5.58001081e-01],
       [ 4.92958338e-01],
       [-2.04522427e-01],
       [-1.30378129e-02],
       [-8.91111532e-02],
       [ 4.16974076e-01],
       [ 1.83068711e-01],
       [ 2.01438802e-02],
       [ 1.03083726e-01],
       [ 6.77458659e-03],
       [ 3.89121545e-02],
       [ 7.46367173e-01],
       [-1.26990026e-01],
       [-1.69182684e-01],
       [ 2.34174280e-01],
       [ 2.93558520e-01],
       [ 7.12851105e-01],
       [ 4.9

In [93]:
pred = reg.predict(test_x)
np.sqrt(mean_squared_error(test_y, pred))

36.06870938703824

In [98]:
# 封装梯度下降法(包含正则化项)
class GradientDescentLinerRegressionWithRegularization:
    def __init__(self, learning_rate=0.01, max_iter=1000, _lambda=0.1, early_stop=True, seed=None):
        np.random.seed(seed)
        self.early_stop = early_stop
        self._lambda = _lambda
        self.lr = learning_rate  # 设置学习率
        self.max_iter = max_iter
        self.loss_arr = []
        self.intercept_ = None
        self.coef_ = None

    def fit(self, x, y):
        self.x = x
        self.x_b = np.hstack([np.ones((len(x), 1)), x])
        self.x_b_dim = np.size(self.x_b, 1)
        self.x_sample = np.size(self.x_b, 0)
        self.y = y
        self.w = np.random.normal(1, 0.001, (self.x_b_dim, 1))
        for i in range(self.max_iter):
            self._train_step()
            self.loss_arr.append(self.loss())
            print("iter: {}, loss_in_train: {:.5f}, loss_in_test: {:.5f}".format(i + 1, self.loss(is_test=False), self.loss_arr[-1]))

            # 连续十次不下降则退出
            if self.early_stop and len(self.loss_arr) >= 10:
                arr = np.array(self.loss_arr[:-10])
                if np.sum(arr[0] < arr) == 10:
                    return

    def _f(self, x, w):
        return x.dot(w)

    def predict(self, x=None):
        if x is None:
            x = self.x_b
        x = np.hstack([np.ones((len(x), 1)), x])
        y_pred = self._f(x, self.w)
        return y_pred

    def loss(self, is_test=True, y_true=None, y_pred=None):
        if y_true is None or y_pred is None:
            y_true = self.y
        if is_test:
            return np.sqrt(mean_squared_error(test_y, self.predict(test_x)))
        else:
            return np.sqrt(mean_squared_error(y_true, self.predict(self.x)))

    def _calc_gradient(self):
        d_w = np.empty(self.x_b_dim).reshape(-1, 1)
        d_w[0] = np.sum(self.x_b.dot(self.w) - self.y)
        for i in range(1, self.x_b_dim):
            # print("{} {} {} {}".format(self.x_b.shape, self.w.shape, self.y.shape, self.x_b[:, i].T.shape))
            d_w[i] = np.squeeze((self.x_b.dot(self.w) - self.y)).dot(self.x_b[:, i].T)
        return d_w * 2 / self.x_sample + (self._lambda / self.x_sample * self.w)

    def _train_step(self):
        d_w = self._calc_gradient()
        self.w = self.w - self.lr * d_w
        self.intercept_ = self.w[0]
        self.coef_ = self.w[1:]
        return self.w


reg = GradientDescentLinerRegressionWithRegularization(learning_rate=1e-3, _lambda=100, seed=1024)
reg.fit(train_x, train_y)

iter: 1, loss_in_train: 30.00431, loss_in_test: 35.12043
iter: 2, loss_in_train: 29.21537, loss_in_test: 35.16077
iter: 3, loss_in_train: 28.44995, loss_in_test: 35.20093
iter: 4, loss_in_train: 27.70729, loss_in_test: 35.24087
iter: 5, loss_in_train: 26.98667, loss_in_test: 35.28055
iter: 6, loss_in_train: 26.28739, loss_in_test: 35.31994
iter: 7, loss_in_train: 25.60878, loss_in_test: 35.35899
iter: 8, loss_in_train: 24.95019, loss_in_test: 35.39768
iter: 9, loss_in_train: 24.31099, loss_in_test: 35.43598
iter: 10, loss_in_train: 23.69056, loss_in_test: 35.47386
iter: 11, loss_in_train: 23.08833, loss_in_test: 35.51129
iter: 12, loss_in_train: 22.50373, loss_in_test: 35.54827
