# 数据集：MNIST
- 训练集数量：60000
- 测试集数量：10000
- 类别数：10（数字 0 - 9）
- 本实验将其转化为二分类：数字 0 的标签记为 1，其余数字的标签记为 0

In [1]:
import numpy as np
from tqdm import tqdm
from collections import defaultdict

In [15]:
def load_data(fileName):
    """
    Load a label-first CSV file and binarize its labels.

    Each non-blank line is split on commas. The first field is parsed as an
    integer digit label; the remaining fields are kept as the feature row.

    Args:
        fileName: path of the CSV file to read.

    Returns:
        A tuple ``(dataList, labelList)`` where ``dataList[i]`` is the list of
        feature fields of row ``i`` (kept as strings, exactly as read) and
        ``labelList[i]`` is 1 when the row's digit is 0 and 0 otherwise.

    Raises:
        ValueError: if the first field of a non-blank line is not an integer.
    """
    dataList = []
    labelList = []

    # The context manager closes the file even if parsing a row fails.
    with open(fileName, 'r') as fr:
        for line in tqdm(fr.readlines()):
            stripped = line.strip()
            # A blank line (e.g. a trailing newline at the end of the file)
            # carries no sample and would otherwise fail in int('').
            if not stripped:
                continue

            current_Line = stripped.split(',')

            # Binary task: digit 0 is the positive class, every other digit
            # is the negative class.
            labelList.append(1 if int(current_Line[0]) == 0 else 0)
            dataList.append(current_Line[1:])

    return dataList, labelList

# max entropy
class maxEnt:
    """
    Binary maximum-entropy classifier.

    Every distinct (feature index, feature value, label) combination seen in
    the training data is treated as one indicator feature with its own weight.
    Labels are expected to be 0 or 1. Weights are fitted with the iterative
    update ``w += (1 / M) * log(empirical expectation / model expectation)``.
    """

    def __init__(self, trainDataList, trainLabelList, testDataList, testLabelList, M=10000):
        """
        Store the data sets and precompute the training statistics.

        Args:
            trainDataList: training samples, one sequence of hashable feature
                values per sample; all samples are indexed up to the length
                of the first one.
            trainLabelList: label (0 or 1) of each training sample.
            testDataList: samples evaluated by ``test``.
            testLabelList: label of each test sample.
            M: constant dividing every weight update in ``maxEntropyTrain``;
                larger values give smaller steps. Defaults to 10000.

        Raises:
            ValueError: if ``trainDataList`` is empty.
        """
        if len(trainDataList) == 0:
            raise ValueError("trainDataList must contain at least one sample")

        self.trainDataList = trainDataList
        self.trainLabelList = trainLabelList
        self.testDataList = testDataList
        self.testLabelList = testLabelList

        # Number of features per sample.
        self.featureDim = len(trainDataList[0])

        # Number of training samples.
        self.N = len(trainDataList)
        # Number of distinct (feature, value, label) combinations; set by
        # calc_fixy below.
        self.n = 0
        self.M = M
        self.fixy = self.calc_fixy()
        # One weight per (feature, value, label) combination.
        self.w = [0.0] * self.n
        self.xy2idDict, self.id2xyDict = self.createSearchDict()
        self.Ep_xy = self.calcEp_xy()

    def calcEpxy(self):
        """
        Compute the expectation of every indicator feature under the model.

        For each training sample and each label y, the current model
        probability P(y | x) divided by the number of samples is added to
        every known (feature, value, y) combination active in that sample.

        Returns:
            A list of length ``self.n`` indexed like ``self.w``.
        """
        Epxy = [0.0] * self.n
        sample_weight = 1 / self.N

        for i in range(self.N):
            sample = self.trainDataList[i]
            # Both label probabilities are obtained from one scoring pass per
            # label instead of recomputing the scores for every lookup.
            Pwxy = self._posterior(sample)

            for feature in range(self.featureDim):
                lookup = self.xy2idDict[feature]
                for y in range(2):
                    index = lookup.get((sample[feature], y))
                    if index is not None:
                        Epxy[index] += sample_weight * Pwxy[y]

        return Epxy

    def calcEp_xy(self):
        """
        Compute the expectation of every indicator feature under the
        empirical distribution: occurrence count divided by sample count.

        Returns:
            A list of length ``self.n`` indexed like ``self.w``.
        """
        Ep_xy = [0.0] * self.n

        for feature in range(self.featureDim):
            lookup = self.xy2idDict[feature]
            for pair, count in self.fixy[feature].items():
                Ep_xy[lookup[pair]] = count / self.N

        return Ep_xy

    def createSearchDict(self):
        """
        Assign a consecutive integer id to every (feature, value, label)
        combination recorded in ``self.fixy``.

        Returns:
            A tuple ``(xy2idDict, id2xyDict)``. ``xy2idDict[feature]`` maps a
            ``(value, label)`` pair to its id; ``id2xyDict`` maps an id back
            to its ``(value, label)`` pair.
        """
        xy2idDict = [{} for _ in range(self.featureDim)]
        id2xyDict = {}

        index = 0
        for feature in range(self.featureDim):
            for pair in self.fixy[feature]:
                xy2idDict[feature][pair] = index
                id2xyDict[index] = pair
                index += 1

        return xy2idDict, id2xyDict

    def calc_fixy(self):
        """
        Count how often each (value, label) pair occurs for every feature in
        the training set, and record the number of distinct pairs in
        ``self.n``.

        Returns:
            A list with one ``defaultdict(int)`` per feature mapping
            ``(value, label)`` to its occurrence count.
        """
        fixyDict = [defaultdict(int) for _ in range(self.featureDim)]

        for i in range(len(self.trainDataList)):
            sample = self.trainDataList[i]
            label = self.trainLabelList[i]
            for j in range(self.featureDim):
                fixyDict[j][(sample[j], label)] += 1

        # Assign rather than accumulate so that calling this method again
        # does not inflate the feature count.
        self.n = sum(len(counts) for counts in fixyDict)

        return fixyDict

    def _score(self, X, y):
        """Sum the weights of the known indicator features active for (X, y)."""
        total = 0.0
        for feature in range(self.featureDim):
            index = self.xy2idDict[feature].get((X[feature], y))
            if index is not None:
                total += self.w[index]
        return total

    @staticmethod
    def _normalize(score, other):
        """Return exp(score) / (exp(score) + exp(other)) without overflow."""
        # Shifting both scores by their maximum keeps every exponent <= 0, so
        # np.exp cannot overflow to inf and produce inf / inf = nan.
        shift = max(score, other)
        numerator = np.exp(score - shift)
        return numerator / (numerator + np.exp(other - shift))

    def _posterior(self, X):
        """Return [P(0 | X), P(1 | X)] under the current weights."""
        score0 = self._score(X, 0)
        score1 = self._score(X, 1)
        return [self._normalize(score0, score1), self._normalize(score1, score0)]

    def calcPwy_x(self, X, y):
        """
        Model probability P(y | X) under the current weights.

        Args:
            X: feature values of one sample.
            y: label whose probability is requested; the competing label is
                taken to be ``1 - y``.

        Returns:
            The probability as a float in [0, 1].
        """
        return self._normalize(self._score(X, y), self._score(X, 1 - y))

    def maxEntropyTrain(self, max_iter = 100):
        """
        Fit the weights by repeating the scaling update ``max_iter`` times.

        Each iteration recomputes the model expectations and adds
        ``(1 / M) * log(Ep_xy / Epxy)`` to every weight, then prints the
        iteration number.
        """
        for i in range(max_iter):
            Epxy = self.calcEpxy()
            sigmaList = [
                (1 / self.M) * np.log(empirical / model)
                for empirical, model in zip(self.Ep_xy, Epxy)
            ]

            self.w = [weight + sigma for weight, sigma in zip(self.w, sigmaList)]

            print("Iter: ", i)

    def predict(self, X):
        """
        Return the label (0 or 1) with the highest model probability for X.
        Ties are resolved in favour of label 0.
        """
        result = self._posterior(X)
        return result.index(max(result))

    def test(self):
        """
        Evaluate the model on the stored test set.

        Returns:
            The fraction of test samples whose predicted label equals the
            stored label.

        Raises:
            ValueError: if the test set is empty.
        """
        total = len(self.testDataList)
        if total == 0:
            raise ValueError("testDataList must contain at least one sample")

        error_count = 0
        for i in range(total):
            if self.predict(self.testDataList[i]) != self.testLabelList[i]:
                error_count += 1

        return 1 - error_count / total

In [5]:
# Read the training and test CSV files. load_data returns the feature rows
# together with binarized labels (1 for digit 0, 0 for any other digit).
trainData, trainLabel = load_data('./mnist/mnist_train.csv')
testData, testLabel = load_data('./mnist/mnist_test.csv')

100%|██████████████████████████████████| 60000/60000 [00:02<00:00, 23451.64it/s]
100%|██████████████████████████████████| 10000/10000 [00:00<00:00, 29575.90it/s]


In [17]:
# Build the model from the first 2000 training rows and the first 1000 test
# rows, then run the training loop with its default iteration count.
# NOTE(review): this assignment rebinds the name `maxEnt` from the class to
# the instance, so the class is no longer reachable by name afterwards and
# re-running this cell raises a TypeError. The evaluation cell relies on
# `maxEnt` being the instance, so a rename has to be applied to both cells.
maxEnt = maxEnt(trainData[:2000], trainLabel[: 2000], testData[: 1000], testLabel[:1000])
maxEnt.maxEntropyTrain()


Iter:  0
Iter:  1
Iter:  2
Iter:  3
Iter:  4
Iter:  5
Iter:  6
Iter:  7
Iter:  8
Iter:  9
Iter:  10
Iter:  11
Iter:  12
Iter:  13
Iter:  14
Iter:  15
Iter:  16
Iter:  17
Iter:  18
Iter:  19
Iter:  20
Iter:  21
Iter:  22
Iter:  23
Iter:  24
Iter:  25
Iter:  26
Iter:  27
Iter:  28
Iter:  29
Iter:  30
Iter:  31
Iter:  32
Iter:  33
Iter:  34
Iter:  35
Iter:  36
Iter:  37
Iter:  38
Iter:  39
Iter:  40
Iter:  41
Iter:  42
Iter:  43
Iter:  44
Iter:  45
Iter:  46
Iter:  47
Iter:  48
Iter:  49
Iter:  50
Iter:  51
Iter:  52
Iter:  53
Iter:  54
Iter:  55
Iter:  56
Iter:  57
Iter:  58
Iter:  59
Iter:  60
Iter:  61
Iter:  62
Iter:  63
Iter:  64
Iter:  65
Iter:  66
Iter:  67
Iter:  68
Iter:  69
Iter:  70
Iter:  71
Iter:  72
Iter:  73
Iter:  74
Iter:  75
Iter:  76
Iter:  77
Iter:  78
Iter:  79
Iter:  80
Iter:  81
Iter:  82
Iter:  83
Iter:  84
Iter:  85
Iter:  86
Iter:  87
Iter:  88
Iter:  89
Iter:  90
Iter:  91
Iter:  92
Iter:  93
Iter:  94
Iter:  95
Iter:  96
Iter:  97
Iter:  98
Iter:  99


In [18]:
# `maxEnt` is the trained instance here (not the class); test() returns the
# fraction of the stored test rows that were classified correctly.
accuracy = maxEnt.test()
print("Accuracy: ", accuracy) 

Accuracy:  0.916
