###  用Python实现SVM

In [15]:
import numpy as np
import pandas as pd
from zipfile import ZipFile
import random
import time
import math

In [21]:
def loadData(pathName,fileName):
    '''
    加载数据并返回数据集与标签集
    '''
    dataArr = []
    labelArr = []
    file = ZipFile(pathName)
    fr = file.open(fileName)
    
    '''
    for line in fr.readlines():
        # 获取数据，用“，”分割保存，将标签外的数据存入dataArr中，将标签存入labelArr中，curline[0]为标签
        # 将元字符串形式数据转换为0-1浮点型
        curline = line.strip().split(',')
        dataArr.append([int(num) / 255 for num in curline[1:]])
        
        # 标签数据转化为整型，数字0标记为1，其余标记为-1
        if int(curline[0]) == 0:
            labelArr.append(1)
        else:
            labelArr.append(-1)
    
    return dataArr, labelArr
    '''

    train_data = pd.read_csv(fr, sep=",", header=None)
    dataArr = train_data[1:]
    
    if int(train_data[0][0]) == 0:
        labelArr.append(1)
    else:
        labelArr.append(-1)
    
    return dataArr, labelArr

In [24]:
# SVM类
class SVM:
    def __init__(self, trainDatalist, trainLabelList, sigma=10, C=200, toler=0.001):
        '''
        SVM相关参数初始化
        :param trainDataList:训练数据集
        :param trainLabelList:训练测试集
        :param sigma:高斯核中分母的σ
        :param C:软间隔中的惩罚参数
        :param toler:松弛变量
        '''
        
        self.trainDataMat = np.mat(trainDataList)
        self.trainLabelMat = np.mat(trainLabelList).T
        
        self.m, self.n = np.shape(self.trainDataMat)    # m为训练集数量，n为样本特征数目
        self.sigma = sigma
        self.C = C
        self.toler = toler
        
        self.k = self.calcKernel()                      # 核函数（初始化时提前计算）
        self.b = 0                                      # SVM中的偏置b
        self.alpha = [0] * self.trainDataMat.shape[0]   # α长度为训练集数目
        self.E = [0 * self.trainLabelMat[i, 0] for i in range(self.trainLabelMat.shape[0])]  # SMO运算过程中的Ei
        self.supportVecIndex = []
    
    
    def calcKernel(self):
        '''
        计算核函数-采用高斯核， 返回高斯核矩阵
        '''
        
        # 初始化高斯核矩阵  大小 = 训练集长度m * 训练集长度m
        # k[i][j] = Xi * Xj
        k = [[0 for i in range(self.m)] for j in range(self.m)]
        
        # 遍历循环获取Xi和Xj
        for i in range(self.m):
            if i % 100 == 0:
                print('construct the kernel:', i, self.m)
            
            X = self.trainDataMat[i, :]
            for j in range(self.m):
                Z = self.trainDataMat[j, :]
                numeratorResult = (X - Z) * (X - Z).T
                result = np.exp(-1 * numeratorResult / (2 * self.sigma ** 2)) 
                k[i][j] = result
                k[j][i] = result
        
        return k
    
    
    def satisfyKKT(self, i):
        '''
        查看第i个α是否满足KKT条件
        :param i:α的下标
        :return:
            True：满足
            False：不满足
        '''
        gxi = self.calc_gxi(i)
        yi = self.trainLabelMat(i)
        
        # 根据变量的选择方法，判断第一个变量的选择是否满足KKT条件，该检验是在松弛变量Ei范围内进行的
        # 检验中，外层循环先遍历所有满足条件0<alpha[i]<C的样本点，即在间隔边界上的支持向量点，检验是否满足KKT条件
        # 如果这些样本点都满足，那遍历整个训练集，检验是否满足KKT条件
        
        # alpha[i]=0 & yi*gxi>=1
        if (math.fabs(self.alpha[i]) < self.toler) and (yi * gxi >= 1):
            return True
        
        # alpha[i]=C & yi*gxi=1
        elif (math.fabs(self.alpha[i] - self.C) < self.toler) and (yi * gxi <= 1):
            return True
        
        # 0<alpha[i]<C & yi*gxi<=1
        elif (self.alpha[i] > -self.toler) and (self.alpha[i] < (self.C + self.toler)) \
                and (math.fabs(yi * gxi - 1) < self.toler):
            return True

        return False
    
    
    def calc_gxi(self, i):
        '''
        计算g(xi) = alpha(i)*yi*k[i][j]+b
        依据“7.101 两个变量二次规划的求解方法”式7.104
        :param i:x的下标
        :return: g(xi)的值
        '''
        gxi = 0
        index = [i for i, alpha in enumerate(self.alpha) if alpha != 0 ]
        for j in index:
            gxi += self.alpha[j] * self.trainLabelMat[j] * self.k[j][i]
        gxi += self.b
        
        return gxi
    
    
    def calcEi(self, i):
        '''
        根据“两个变量二次规划的求解方法”式
        计算Ei=gxi-yi
        :param i: E的下标
        :return:
        '''
        gxi = self.calc_gxi(i)
        return gxi - self.trainLabelMat[i]
    
    
    def getAlphaJ(self, E1, i):
        '''
        SMO中选择第二个变量
        :param E1: 第一个变量的E1
        :param i: 第一个变量α的下标
        :return: E2，α2的下标
        '''
        E2 = 0
        maxE1_E2 = -1
        maxIndex = -1
        
        #获得Ei非0的对应索引组成的列表，列表内容为非0Ei的下标i
        nozeroE = [i for i, Ei in enumerate(self.E) if Ei != 0]
        #对每个非零Ei的下标i进行遍历
        for j in nozeroE:
            #计算E2
            E2_tmp = self.calcEi(j)
            #如果|E1-E2|大于目前最大值
            if math.fabs(E1 - E2_tmp) > maxE1_E2:
                #更新最大值
                maxE1_E2 = math.fabs(E1 - E2_tmp)
                #更新最大值E2
                E2 = E2_tmp
                #更新最大值E2的索引j
                maxIndex = j
        #如果列表中没有非0元素了（对应程序最开始运行时的情况）
        if maxIndex == -1:
            maxIndex = i
            while maxIndex == i:
                #获得随机数，如果随机数与第一个变量的下标i一致则重新随机
                maxIndex = int(random.uniform(0, self.m))
            #获得E2
            E2 = self.calcEi(maxIndex)

        #返回第二个变量的E2值以及其索引
        return E2, maxIndex
    
    
    def train(self, iter = 100):
        #iterStep：迭代次数，超过设置次数还未收敛则强制停止
        #parameterChanged：单次迭代中有参数改变则增加1
        iterStep = 0; parameterChanged = 1

        #如果没有达到限制的迭代次数以及上次迭代中有参数改变则继续迭代
        #parameterChanged==0时表示上次迭代没有参数改变，如果遍历了一遍都没有参数改变，说明
        #达到了收敛状态，可以停止了
        while (iterStep < iter) and (parameterChanged > 0):
            print('iter:%d:%d'%( iterStep, iter))
            iterStep += 1
            parameterChanged = 0

            #大循环遍历所有样本，用于找SMO中第一个变量
            for i in range(self.m):
                if self.isSatisfyKKT(i) == False:
                    E1 = self.calcEi(i)
                    E2, j = self.getAlphaJ(E1, i)

                    #获得两个变量的标签
                    y1 = self.trainLabelMat[i]
                    y2 = self.trainLabelMat[j]
                    #复制α值作为old值
                    alphaOld_1 = self.alpha[i]
                    alphaOld_2 = self.alpha[j]
                    #依据标签是否一致来生成不同的L和H
                    if y1 != y2:
                        L = max(0, alphaOld_2 - alphaOld_1)
                        H = min(self.C, self.C + alphaOld_2 - alphaOld_1)
                    else:
                        L = max(0, alphaOld_2 + alphaOld_1 - self.C)
                        H = min(self.C, alphaOld_2 + alphaOld_1)
                    # 如果两者相等，说明该变量无法再优化，直接跳到下一次循环
                    if L == H:   continue

                    # 计算α的新值
                    k11 = self.k[i][i]
                    k22 = self.k[j][j]
                    k21 = self.k[j][i]
                    k12 = self.k[i][j]
                    alphaNew_2 = alphaOld_2 + y2 * (E1 - E2) / (k11 + k22 - 2 * k12)
                    if alphaNew_2 < L: alphaNew_2 = L
                    elif alphaNew_2 > H: alphaNew_2 = H
                    alphaNew_1 = alphaOld_1 + y1 * y2 * (alphaOld_2 - alphaNew_2)

                    # 计算b1和b2
                    b1New = -1 * E1 - y1 * k11 * (alphaNew_1 - alphaOld_1) \
                            - y2 * k21 * (alphaNew_2 - alphaOld_2) + self.b
                    b2New = -1 * E2 - y1 * k12 * (alphaNew_1 - alphaOld_1) \
                            - y2 * k22 * (alphaNew_2 - alphaOld_2) + self.b

                    # 依据α1和α2的值范围确定新b
                    if (alphaNew_1 > 0) and (alphaNew_1 < self.C):
                        bNew = b1New
                    elif (alphaNew_2 > 0) and (alphaNew_2 < self.C):
                        bNew = b2New
                    else:
                        bNew = (b1New + b2New) / 2

                    #将更新后的各类值写入，进行更新
                    self.alpha[i] = alphaNew_1
                    self.alpha[j] = alphaNew_2
                    self.b = bNew

                    self.E[i] = self.calcEi(i)
                    self.E[j] = self.calcEi(j)

                    #如果α2的改变量过于小，就认为该参数未改变，不增加parameterChanged值
                    #反之则自增1
                    if math.fabs(alphaNew_2 - alphaOld_2) >= 0.00001:
                        parameterChanged += 1

                #打印迭代轮数，i值，该迭代轮数修改α数目
                print("iter: %d i:%d, pairs changed %d" % (iterStep, i, parameterChanged))

        #全部计算结束后，重新遍历一遍α，查找里面的支持向量
        for i in range(self.m):
            #如果α>0，说明是支持向量
            if self.alpha[i] > 0:
                #将支持向量的索引保存起来
                self.supportVecIndex.append(i)
        
    
    def calcSinglKernel(self, x1, x2):
        '''
        单独计算核函数
        :param x1:向量1
        :param x2: 向量2
        :return: 核函数结果
        '''
        result = (x1 - x2) * (x1 - x2).T
        result = np.exp(-1 * result / (2 * self.sigma ** 2))
        return np.exp(result)
    
    def predict(self, x):
        '''
        对样本的标签进行预测
        :param x: 要预测的样本x
        :return: 预测结果
        '''

        result = 0
        for i in self.supportVecIndex:
            tmp = self.calcSinglKernel(self.trainDataMat[i, :], np.mat(x))
            result += self.alpha[i] * self.trainLabelMat[i] * tmp
        result += self.b
        # 使用sign函数返回预测结果
        return np.sign(result)
    
    
    def test(self, testDataList, testLabelList):
        '''
        测试
        :param testDataList:测试数据集
        :param testLabelList: 测试标签集
        :return: 正确率
        '''
        errorCnt = 0
        for i in range(len(testDataList)):
            print('test:%d:%d'%(i, len(testDataList)))
            result = self.predict(testDataList[i])
            if result != testLabelList[i]:
                errorCnt += 1
        
        return 1 - errorCnt / len(testDataList)

In [25]:
if __name__ == '__main__':
    start = time.time()

    # 获取训练集及标签
    print('start read transSet')
    trainDataList, trainLabelList = loadData('./data_sets/Mnist/mnist_train.zip', 'mnist_train.csv')

    # 获取测试集及标签
    print('start read testSet')
    testDataList, testLabelList = loadData('./data_sets/Mnist/mnist_test.zip', 'mnist_test.csv')

    #初始化SVM类
    print('start init SVM')
    svm = SVM(trainDataList[:1000], trainLabelList[:1000], 10, 200, 0.001)

    # 开始训练
    print('start to train')
    svm.train()

    # 开始测试
    print('start to test')
    accuracy = svm.test(testDataList[:100], testLabelList[:100])
    print('the accuracy is:%d'%(accuracy * 100), '%')

    # 打印时间
    print('time span:', time.time() - start)

start read transSet
start read testSet
start init SVM


ValueError: ndarray is not contiguous