In [None]:
import pandas as pd
import numpy as np
import os
import typing as t
# A notebook kernel has no real ``__file__``; the current working directory
# stands in for it so the data path can be resolved relative to it.
__file__ = os.getcwd()
# Pass each path component separately so os.path.join inserts the separator of
# the running platform (a literal 'data\\train.csv' only resolves on Windows).
CSV_PATH = os.path.join(os.path.abspath(__file__), 'data', 'train.csv')

In [None]:
def simple_split(
        data: np.array,
        label: np.array
) -> t.Tuple[np.array, np.array, np.array, np.array]:
    """Hold-out split: the first 70% of the rows train, the rest test.

    Rows are taken in their existing order; nothing is shuffled.

    Args:
        data: Samples, indexed along the first axis.
        label: Targets aligned with ``data``.

    Returns:
        ``(train_x, train_y, test_x, test_y)``.
    """
    # Index of the first test row: 70% of the sample count, truncated.
    cut = int(len(data) * 0.7)
    head, tail = slice(None, cut), slice(cut, None)
    return data[head], label[head], data[tail], label[tail]
    

def k_fold_split(
        data: np.array,
        label: np.array,
        k: int = 10,
) -> t.Iterator[t.Tuple[np.array, np.array, np.array, np.array]]:
    """Yield the train/test partitions of a K-fold cross-validation (default 10).

    The rows are cut, in their existing order, into exactly ``k`` contiguous
    folds. When ``len(data)`` is not a multiple of ``k`` the first
    ``len(data) % k`` folds receive one extra row, so fold sizes differ by at
    most one. Each fold is used once as the test set while all remaining rows
    form the training set. No shuffling is performed.

    Args:
        data: Samples, indexed along the first axis.
        label: Targets aligned with ``data``.
        k: Number of folds; must satisfy ``2 <= k <= len(data)``.

    Yields:
        ``(train_x, train_y, test_x, test_y)`` for each of the ``k`` folds.

    Raises:
        ValueError: If ``data`` and ``label`` differ in length or ``k`` is out
            of range. Because this is a generator, the error surfaces when
            iteration starts, not when the function is called.
    """
    lens = len(data)
    if len(label) != lens:
        raise ValueError(
            'data and label must have the same length, got {} and {}'.format(lens, len(label))
        )
    if not 2 <= k <= lens:
        raise ValueError(
            'k must be between 2 and the number of samples ({}), got {}'.format(lens, k)
        )

    # k is the NUMBER of folds; the fold size is derived from it.
    base_size, extra = divmod(lens, k)
    start = 0
    for i in range(k):
        stop = start + base_size + (1 if i < extra else 0)
        # Stack everything before and after the held-out fold along axis 0.
        train_x = np.concatenate([data[:start], data[stop:]], axis=0)
        train_y = np.concatenate([label[:start], label[stop:]], axis=0)
        test_x = data[start:stop]
        test_y = label[start:stop]
        yield train_x, train_y, test_x, test_y
        start = stop
        

def prefix_process(
        df_data: pd.DataFrame,
        validate_choice: int = 1,
        fold_num: int = 10,
) -> t.Union[
        t.Tuple[np.array, np.array, np.array, np.array],
        t.Iterator[t.Tuple[np.array, np.array, np.array, np.array]],
]:
    """Clean the raw frame, build features/labels and split them for validation.

    Steps:
      1. Replace NaN cells with 0 (the input frame itself is left untouched).
      2. Drop the first column, use the last column as the label and every
         column in between as a feature.
         NOTE(review): the first column is presumably a row id - confirm.
      3. Divide each of the last two feature columns by its own mean so their
         values end up around 1.
      4. Split with ``simple_split`` or ``k_fold_split``.

    Args:
        df_data: Raw data frame as read from the CSV file.
        validate_choice: ``1`` selects the hold-out split; any other value
            selects K-fold cross-validation.
        fold_num: Number of folds passed to ``k_fold_split``.

    Returns:
        The ``(train_x, train_y, test_x, test_y)`` tuple of ``simple_split``
        when ``validate_choice == 1``; otherwise the generator returned by
        ``k_fold_split``, which yields one such tuple per fold.
    """
    # fillna returns a NEW frame; the result has to be kept or NaNs survive.
    filled = df_data.fillna(0)
    array = np.array(filled)
    # astype(float) copies the slice and guarantees the in-place division
    # below works even when the CSV was parsed as integers.
    x = array[:, 1:-1].astype(float)
    y = array[:, -1]

    # Rescale the last two feature COLUMNS (x[-1] would address a row).
    n_cols = x.shape[1]
    for col in range(max(n_cols - 2, 0), n_cols):
        if x.shape[0] == 0:
            break
        col_mean = x[:, col].mean()
        # A zero mean would turn the whole column into inf/NaN; leave it as is.
        if col_mean != 0:
            x[:, col] /= col_mean

    if validate_choice == 1:
        return simple_split(x, y)
    return k_fold_split(x, y, fold_num)
    
    


In [None]:
def _sigmoid(z: np.ndarray) -> np.ndarray:
    """Element-wise logistic function 1 / (1 + exp(-z)), computed without overflow."""
    # exp(-|z|) lies in (0, 1], so neither branch can overflow.
    decay = np.exp(-np.abs(z))
    return np.where(z >= 0, 1.0 / (1.0 + decay), decay / (1.0 + decay))


def train(
        train_x: np.array,
        train_y: np.array,
        iterations: int,
        learning_rate: float = 1.0,
        reg_rate: float = 0.001,
        log_every: int = 200,
):
    """Fit a binary logistic-regression model with full-batch Adagrad.

    Model:
      1. Linear score            z = w . x + b
      2. Sigmoid link            p(y=1|x) = 1 / (1 + exp(-z)) = exp(z) / (1 + exp(z))
                                 p(y=0|x) = 1 / (1 + exp(z))
      3. Negative log-likelihood per sample
                                 loss = -y * z + log(1 + exp(z))
         whose gradient is (p - y) * x for the weights and (p - y) for the bias.
      4. L2 penalty (reg_rate / 2) * ||w||^2 on the weights only, contributing
         reg_rate * w to the weight gradient.

    Args:
        train_x: 2-D array of shape (n_samples, n_features).
        train_y: 1-D array of 0/1 labels aligned with ``train_x``.
        iterations: Number of full-batch gradient steps.
        learning_rate: Base Adagrad step size.
        reg_rate: L2 regularisation strength.
        log_every: Print the training accuracy every ``log_every`` iterations
            (starting with iteration 0); a value <= 0 disables the output.

    Returns:
        ``(weights, bias)`` - the weight vector of length n_features and the
        scalar bias. Weights start at 1 and the bias at 0.

    Raises:
        ValueError: If ``train_x`` is not 2-D, is empty, or its row count does
            not match ``train_y``.
    """
    features = np.asarray(train_x, dtype=float)
    targets = np.asarray(train_y, dtype=float)
    if features.ndim != 2:
        raise ValueError('train_x must be 2-D, got {} dimension(s)'.format(features.ndim))
    lens, n_features = features.shape
    if lens == 0:
        raise ValueError('train_x is empty')
    if len(targets) != lens:
        raise ValueError(
            'train_x and train_y must have the same length, got {} and {}'.format(lens, len(targets))
        )

    weights = np.ones(n_features)   # initial feature weights
    bias = 0.0                      # initial bias
    wg_sum = np.zeros(n_features)   # Adagrad accumulator: squared weight gradients
    bg_sum = 0.0                    # Adagrad accumulator: squared bias gradients
    eps = 1e-8                      # keeps the Adagrad division finite when a gradient is 0

    for k in range(iterations):
        # Gradient of the summed negative log-likelihood over the whole batch.
        residual = _sigmoid(features @ weights + bias) - targets
        w_g = features.T @ residual
        b_g = residual.sum()

        # L2 term: d/dw (reg_rate / 2) * ||w||^2 = reg_rate * w (component-wise).
        w_g += reg_rate * weights

        # Adagrad: scale each step by the root of the accumulated squared gradients.
        wg_sum += w_g ** 2
        bg_sum += b_g ** 2
        weights -= learning_rate / (np.sqrt(wg_sum) + eps) * w_g
        bias -= learning_rate / (np.sqrt(bg_sum) + eps) * b_g

        if log_every > 0 and k % log_every == 0:
            predicted = (_sigmoid(features @ weights + bias) > 0.5).astype(float)
            acc = int(np.count_nonzero(predicted == targets))
            print('after {} epochs, the currency on train data is:'.format(k), acc/lens)

    return weights, bias
            
            
    
    

In [None]:
def simple_validate(
        weights: np.array,
        bias: float,
        test_x: np.array,
        test_y: np.array
) -> float:
    """Return the classification accuracy of a trained model on a test set.

    A sample is predicted as class 1 when ``sigmoid(w . x + b) > 0.5`` and as
    class 0 otherwise.

    Args:
        weights: Weight vector with one entry per feature.
        bias: Scalar bias.
        test_x: 2-D array of shape (n_samples, n_features).
        test_y: 1-D array of 0/1 labels aligned with ``test_x``.

    Returns:
        Fraction of samples whose predicted class equals the label.

    Raises:
        ValueError: If the test set is empty (accuracy would be undefined).
    """
    features = np.asarray(test_x, dtype=float)
    targets = np.asarray(test_y, dtype=float)
    lens = len(targets)
    if lens == 0:
        raise ValueError('test set is empty; accuracy is undefined')

    z = features @ np.asarray(weights, dtype=float) + bias
    # Stable sigmoid: exp(-|z|) lies in (0, 1], so it cannot overflow the way
    # exp(-z) does for strongly negative scores.
    decay = np.exp(-np.abs(z))
    yn = np.where(z >= 0, 1.0 / (1.0 + decay), decay / (1.0 + decay))
    predicted = (yn > 0.5).astype(float)
    return int(np.count_nonzero(predicted == targets)) / lens
    


In [None]:
def main():
    """Train on the hold-out split and print the accuracy on the held-out part."""
    frame = pd.read_csv(CSV_PATH)
    split = prefix_process(frame)
    train_x, train_y, test_x, test_y = split
    model_weights, model_bias = train(train_x, train_y, 10000)
    print('test accuracy:', simple_validate(model_weights, model_bias, test_x, test_y))
    # Uncomment to inspect the fitted parameters:
    # print('weights:', model_weights)
    # print('bias:', model_bias)
    
main()

In [None]:
def main():
    """Train once per cross-validation fold and print the best fold accuracy."""
    frame = pd.read_csv(CSV_PATH)
    best_accuracy = 0
    best_weights = None
    best_bias = None
    folds = prefix_process(frame, validate_choice=0)
    for train_x, train_y, test_x, test_y in folds:
        fold_weights, fold_bias = train(train_x, train_y, 10000)
        fold_accuracy = simple_validate(fold_weights, fold_bias, test_x, test_y)
        # Remember the parameters of the best-scoring fold seen so far.
        if fold_accuracy > best_accuracy:
            best_accuracy, best_weights, best_bias = fold_accuracy, fold_weights, fold_bias
        print('------------------------------------')
    print(f'most accuracy: {best_accuracy}')
    # Uncomment to inspect the parameters of the best fold:
    # print('weights:', best_weights)
    # print('bias:', best_bias)
    
main()
        