#### 最大熵模型

#### 改进的迭代尺度算法IIS：  
**输入**：特征函数$f_1,f_2,...,f_n$；经验分布$\tilde{P}(X,Y)$，模型$P_w(y|x)$；  
**输出**：最优参数值$w_i^*$；最优模型$P_{w^*}$  
（1）对所有$i\in \{1,2,\ldots,n\}$，取初值$w_i=0$。  
（2）对每一$i\in \{1,2,\ldots,n\}$  
&emsp;（a）令$\delta_i$是方程  
&emsp;&emsp;&emsp;$\sum_{x,y}\tilde{P}(x)P(y|x)f_i(x,y)\exp(\delta_i f^\#(x,y))=E_{\tilde{P}}(f_i)$  
&emsp;的解，这里，  
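&emsp;&emsp;&emsp;$f^\#(x,y)=\sum_{i=1}^nf_i(x,y)$  
&emsp;若$f^\#(x,y)$对任意$(x,y)$都等于常数$M$，则$\delta_i$有显式解：$\delta_i=\frac{1}{M}\log\frac{E_{\tilde{P}}(f_i)}{E_P(f_i)}$（下面的代码采用的就是这一更新公式）。  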
&emsp;（b）更新$w_i$值：$w_i\gets w_i+\delta_i$。  
（3）如果不是所有$w_i$都收敛，重复步（2）。

#### 加载文件  
`fileName`: 要加载的文件路径  
*return*: 数据集和标签集

In [1]:
import numpy as np
from collections import defaultdict

In [2]:
def loadData(fileName):
    """Load a two-class, binarized dataset from a comma-separated text file.

    Every non-blank line is parsed as comma-separated integers. The first
    field is the raw label and the remaining fields are feature values.

    Args:
        fileName: Path of the file to load.

    Returns:
        A ``(dataList, labelList)`` tuple. ``dataList`` holds one list per
        line whose entries are 1 where the raw value is greater than 128 and
        0 otherwise. ``labelList`` holds 1 where the raw label equals 0 and
        0 for any other raw label.
    """
    dataList = []
    labelList = []
    # The context manager guarantees the handle is closed even if parsing
    # raises part-way through the file.
    with open(fileName, 'r') as fr:
        for line in fr:
            stripped = line.strip()
            if not stripped:
                # Blank lines (e.g. trailing newlines) carry no sample.
                continue
            curLine = stripped.split(',')
            # Collapse the task to binary: raw label 0 -> 1, everything else -> 0.
            labelList.append(1 if int(curLine[0]) == 0 else 0)
            # Binarize each feature value with a threshold of 128.
            dataList.append([int(int(num) > 128) for num in curLine[1:]])

    return dataList, labelList

#### 定义最大熵类

In [3]:
class maxEnt:
    """Binary maximum-entropy classifier trained by iterative scaling.

    Each feature function is identified by a feature position together with
    an ``(x, y)`` pair, where ``x`` is the value at that position and ``y``
    is the label. Only pairs that occur in the training data get a weight.
    Labels are treated as 0/1 throughout (the code enumerates ``range(2)``
    and uses ``1 - y`` for the opposite label).
    """

    def __init__(self, trainDataList, trainLabelList, testDataList, testLabelList):
        """Store the datasets and precompute counts, lookup tables and
        the empirical feature expectations.

        Args:
            trainDataList: Training samples, one sequence of feature values each.
            trainLabelList: Training labels, aligned with ``trainDataList``.
            testDataList: Test samples.
            testLabelList: Test labels, aligned with ``testDataList``.
        """
        self.trainDataList = trainDataList          # training samples
        self.trainLabelList = trainLabelList        # training labels
        self.testDataList = testDataList            # test samples
        self.testLabelList = testLabelList          # test labels
        self.featureNum = len(trainDataList[0])     # number of feature positions

        self.N = len(trainDataList)                 # number of training samples
        self.n = 0                                  # number of distinct (feature, x, y) triples; filled in by calc_fixy
        self.M = 10000                              # divisor applied to the log-ratio in the weight update
        self.fixy = self.calc_fixy()                # per-feature occurrence counts of (x, y)
        self.w = [0] * self.n                       # one weight per feature function
        # (x, y) -> id tables (one per feature position) and the id -> (x, y) table
        self.xy2idDict, self.id2xyDict = self.createSearchDict()
        self.Ep_xy = self.calcEp_xy()               # empirical expectation of every feature function

    def calcEpxy(self):
        """Return the expectation of every feature function under the model.

        For each training sample the current ``P_w(y|x)`` is evaluated for
        both labels and ``P_w(y|x) / N`` is added to every feature function
        that fires for that sample and label.

        Returns:
            A list of length ``self.n`` indexed by feature-function id.
        """
        Epxy = [0] * self.n
        for sample in self.trainDataList:
            # Model probabilities of both labels for this sample.
            Pwxy = [self.calcPwy_x(sample, 0), self.calcPwy_x(sample, 1)]
            for feature in range(self.featureNum):
                for y in range(2):
                    key = (sample[feature], y)
                    # Only pairs seen during training have a feature function.
                    if key in self.fixy[feature]:
                        index = self.xy2idDict[feature][key]
                        Epxy[index] += (1 / self.N) * Pwxy[y]

        return Epxy

    def calcEp_xy(self):
        """Return the expectation of every feature function under the
        empirical distribution, i.e. ``count(x, y) / N`` per feature function.

        Returns:
            A list of length ``self.n`` indexed by feature-function id.
        """
        Ep_xy = [0] * self.n

        for feature in range(self.featureNum):
            for (x, y) in self.fixy[feature]:
                index = self.xy2idDict[feature][(x, y)]
                Ep_xy[index] = self.fixy[feature][(x, y)] / self.N

        return Ep_xy

    def createSearchDict(self):
        """Assign a consecutive integer id to every observed (feature, x, y).

        Returns:
            A tuple ``(xy2idDict, id2xyDict)``. ``xy2idDict`` is a list with
            one dict per feature position mapping ``(x, y)`` to its id.
            ``id2xyDict`` maps an id back to ``(x, y)`` (the feature position
            is not stored in it).
        """
        xy2idDict = [{} for _ in range(self.featureNum)]
        id2xyDict = {}

        # Running id shared by all feature positions.
        index = 0
        for feature in range(self.featureNum):
            for (x, y) in self.fixy[feature]:
                xy2idDict[feature][(x, y)] = index
                id2xyDict[index] = (x, y)
                index += 1

        return xy2idDict, id2xyDict

    def calc_fixy(self):
        """Count how often each ``(x, y)`` pair occurs at each feature position.

        As a side effect the number of distinct pairs over all feature
        positions is added to ``self.n``.

        Returns:
            A list with one ``defaultdict(int)`` per feature position mapping
            ``(x, y)`` to its number of occurrences in the training data.
        """
        fixyDict = [defaultdict(int) for _ in range(self.featureNum)]
        for sample, label in zip(self.trainDataList, self.trainLabelList):
            for j in range(self.featureNum):
                fixyDict[j][(sample[j], label)] += 1
        for counts in fixyDict:
            self.n += len(counts)

        return fixyDict

    def calcPwy_x(self, X, y):
        """Return the model probability ``P_w(y|X)`` for a binary label.

        Args:
            X: One sample (sequence of feature values).
            y: The label to score, 0 or 1.

        Returns:
            ``exp(s_y) / (exp(s_y) + exp(s_other))`` where ``s_y`` and
            ``s_other`` are the sums of the weights of the feature functions
            that fire for ``(X, y)`` and ``(X, 1 - y)`` respectively.
        """
        # Weighted feature sums for the requested label and for the other one.
        score = 0
        other = 0
        for i in range(self.featureNum):
            if (X[i], y) in self.xy2idDict[i]:
                score += self.w[self.xy2idDict[i][(X[i], y)]]
            if (X[i], 1 - y) in self.xy2idDict[i]:
                other += self.w[self.xy2idDict[i][(X[i], 1 - y)]]

        # exp(score) / (exp(score) + exp(other)) == 1 / (1 + exp(other - score)).
        # Always exponentiate a non-positive number so that large weight sums
        # cannot overflow to inf and yield nan.
        diff = other - score
        if diff > 0:
            ratio = np.exp(-diff)
            return ratio / (1.0 + ratio)
        return 1.0 / (1.0 + np.exp(diff))

    def maxEntropyTrain(self, iter = 10):
        """Run ``iter`` rounds of weight updates.

        Each round recomputes the model expectations and adds
        ``(1 / M) * log(Ep_xy[j] / Epxy[j])`` to every weight ``w[j]``.
        Progress is printed after every round.

        Args:
            iter: Number of update rounds.
        """
        for i in range(iter):
            # Expectations under the current weights.
            Epxy = self.calcEpxy()

            sigmaList = [
                (1 / self.M) * np.log(self.Ep_xy[j] / Epxy[j])
                for j in range(self.n)
            ]

            # Apply all updates at once.
            self.w = [self.w[j] + sigmaList[j] for j in range(self.n)]

            print('iter:%d:%d'%(i, iter))

    def predict(self, X):
        """Return the label (0 or 1) with the larger ``P_w(y|X)``.

        Ties resolve to label 0.
        """
        result = [self.calcPwy_x(X, label) for label in range(2)]
        return result.index(max(result))

    def test(self):
        """Return the fraction of test samples that are predicted correctly."""
        errorCnt = 0
        for sample, label in zip(self.testDataList, self.testLabelList):
            if self.predict(sample) != label:
                errorCnt += 1

        return 1 - errorCnt / len(self.testDataList)

In [4]:
# Load the training samples and labels.
trainData, trainLabel = loadData('Mnist/mnist_train.csv')

# Load the test samples and labels.
testData, testLabel = loadData('Mnist/mnist_test.csv')

# Build the classifier on the first 20000 training samples.
# The instance gets its own name: binding it to `maxEnt` would overwrite the
# class, and running this cell a second time would then raise TypeError.
model = maxEnt(trainData[:20000], trainLabel[:20000], testData, testLabel)

# Train.
print('start to train')
model.maxEntropyTrain()

# Evaluate on the test set.
print('start to test')
accuracy = model.test()
print('the accuracy is:', accuracy)

start to train
iter:0:10
iter:1:10
iter:2:10
iter:3:10
iter:4:10
iter:5:10
iter:6:10
iter:7:10
iter:8:10
iter:9:10
start to test
the accuracy is: 0.902
