本程序是基于Python+numpy+theano+PIL开发，采用类似于LeNet5的CNN模型，
应用于olivettifaces人脸数据库，实现人脸识别的功能，模型的误差率降到了5%以下。

In [1]:
import os
import sys
import time

import numpy as np
from PIL import Image

import theano
import theano.tensor as T
from theano.tensor.signal.pool import max_pool_2d_same_size
from theano.tensor.nnet import conv



加载图像数据的函数，dataset_path即图像olivettifaces的路径
加载olivettifaces后，划分为train_data,valid_data,test_data三个数据集
函数返回train_data,valid_data,test_data以及对应的label

In [None]:
def load_data(dataset_path):
    img = Image.open(dataset_path)
    img_ndarray = np.asarray(img, dtype='float64')/256
    faces = np.empty((400, 2679))
    for row in range(20):
        for column in range(20):
            faces[row*20+column] = np.ndarray.flatten(img_ndarray[row*57:(row+1)*57,
                                                                 column*47:(column+1)*47])
    label = np.empty(400)
    for i in range(40):
        label = np.empty(400)
    for i in range(40):
        label[i*10: i*10+10] = i
    label = label.astype(np.int)
    
    #   分成训练集，验证集，测试集，大小如下
    train_data = np.empty((320, 2679))
    train_label = np.empty(320)
    valid_data = np.empty((40, 2679))
    valid_label = np.empty(40)
    test_data = np.empty((40, 2679))
    test_label = np.empty(40)
    
    for i in range(40):
        train_data[i*8: i*8+8] = faces[i*10: i*10+8]
        train_label[i*8: i*8+8] = label[i*10: i*10+8]
        valid_data[i] = faces[i*10+8]
        valid_label[i] = label[i*10+8]
        test_data[i] = faces[i*10+9]
        test_label[i] = label[i*10+9]
        
    #  将数据集定义成shared类型，才能将数据复制进GPU，利用GPU加速程序
    def shared_dataset(data_x, data_y, borrow=True):
        shared_x = theano.shared(np.asarray(data_x, dtype=theano.config.floatX), borrow=borrow)
        shared_y = theano.shared(np.asarray(data_y, dtype=theano.config.floatX), borrow=borrow)
        return shared_x, T.cast(shared_y, 'int32')

    train_set_x, train_set_y = shared_dataset(train_data, train_label)
    valid_set_x, valid_set_y = shared_dataset(valid_data, valid_label)
    test_set_x, test_set_y = shared_dataset(test_data, test_label)
    rval = [(train_set_x, train_set_y), (valid_set_x, valid_set_y), (test_set_x, test_set_y)]
    return rval

# 分类器， 即CNN最后一层，采用逻辑回归（softmax）
class LogisticRegression(object):
    def __init__(self, input, n_in, n_out):
        self.W = theano.shared(
            value=np.zeros(
                (n_in, n_out),
                dtype=theano.config,floatX
            ),
            name='W',
            borrow=True
        )
        self.b = theano.shared(
            value=np.zeros(
                (n_out,),
                dtype=theano.config.floatX
            ),
            name='b',
            borrow=True
        )
        self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W) + self.b)
        self.y_pred = T.argmax(self.p_y_given_x, axis=1)
        self.params = [self.W, self.b]
        
    def negative_log_likelohood(self, y):
        return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]), y])
    
    def errors(self, y):
        if y.ndim != self.y_pred.ndim:
            raise TypeError(
                'y should have the same shape as self.y_pred',
                ('y', y.type, 'y_pred', self.y_pred.type)
            )
        if y.dtype.startswith('int'):
            return T.mean(T.neq(self.y_pred, y))
        else:
            raise NotImplementedError()
            
# 全连接层，分类器前一层
class HiddenLayer(object):
    def __init__(self, rng, input, n_in, n_out, W=None, b=None, activation=T.tanh):
        self.input = input
        
        if W is None:
            W_values = np.asarray(
                rng.uniform(
                    low = -np.sqrt(6./(n_in + n_out)),
                    high = np.sqrt(6./(n_in + n_out)),
                    size = (n_in, n_out)
                ),
                dtype=theano.config.floatX
            )
            if activation = theano.tensor.nnet.sigmoid:
                W_values *= 4
            W = theano.shared(value=W_values, name='W', borrow=True)
            
        if b is None:
            b_values = np.zeros((n_out,), dtype=theano.config.floatX)
            b = theano.shared(value=b_values, name='b', borrow=True)
        
        self.W = W
        self.b = b
        
        lin_output = T.dot(input, self.W) + self.b
        self.output = (
            lin_output if activation is None
            else activation(lin_output)
        )
        #  模型的参数
        self.params = [self.W, self.b]

# 卷积+采样(CONV+maxpooling)
class LeNetConvPoolLayer(object):
    def __init__(self, rng, input, filter_shape, image_shape, poolsize=(2, 2)):
        assert image_shape[1] == filter_shape[1]
        self.input = input
        
        fan_in = np.prod(filter_shape[1:])
        fan_out = (filter_shape[0]*np.prod(filter_shape[2:])/np.prod(poolsize))
        
        #  用随机权重初始化权重
        W_bound = np.sqrt(6./(fan_in + fan_out))
        self.W = theano.shared(
            np.asarray(
                rng.uniform(low=-W_bound, high=W_bound, size=filter_shape),
                dtype=theano.config.floatX
            ),
            borrow=True
        )
        
        #  偏差是1D张量-每个输出特征图一个偏差
        b_values = np.zeros((filter_shape[0],), dtype=theano.config.floatX)
        self.b = theano.shared(value=b_values, borrow=True)
        
        #  卷积
        conv_out = conv.conv2d(
            input=input,
            filters=self.W,
            filter_shape=filter_shape,
            image_shape=image_shape
        )
        
        #  子采样
        pooled_out = max_pool_2d_same_size(
            input=conv_out,
            ds=poolsize,
            ignore_border=True
        )
        
        self.output = T.tanh(pooled_out + self.b.dimshuffle('x', 0, 'x', 'x'))
        
        #  存储该层参数
        self.params = [self.W, self.b]

# 保存训练参数的函数
def save_params(param1, param2, param3, param4):
    import cProfile
    write_file = open('params.pkl', 'wb')
    cProfile.dump(param1, write_file, -1)
    cProfile.dump(param2, write_file, -1)
    cProfile.dump(param3, write_file, -1)
    cProfile.dump(param4, write_file, -1)
    write_file.close()
    
        