In [1]:
## 测试最优参数文件，运行逻辑保持不变。仅进行多重循环测试寻找最优参数 ##
import numpy as np
import math
import scipy.io as scio
import seaborn as sns
import matplotlib.colors as mcolors
import matplotlib.pyplot as plt
from PIL import Image
from sklearn.model_selection import train_test_split
from pyrcn.util import concatenate_sequences
from pyrcn.linear_model import IncrementalRegression
from sklearn.preprocessing import LabelBinarizer
from sklearn.datasets import load_digits as sklearn_load_digits
from pyrcn.metrics import confusion_matrix,accuracy_score,precision_score,recall_score,f1_score
from typing import cast
import pickle

In [2]:
save_test_best_parameter_file = "./test_best_parameter_save/"

In [None]:
X_ori, y_ori = sklearn_load_digits(n_class=10, return_X_y=True, as_frame=False)
for i in range(len(X_ori)):
    X_ori[i] = np.where(X_ori[i] <= 4, 0, 1)
X = np.empty(shape=(X_ori.shape[0],), dtype=object)
y = np.empty(shape=(X_ori.shape[0],), dtype=object)
for k, (X_single, y_single) in enumerate(zip(X_ori, y_ori)):
    X[k] = X_single.reshape(8, 8).T
    y[k] = np.atleast_1d(y_single)
    
# 将y中所有的单个值数组元素转为单个值，去掉元素内的数组包装
# 即对y进行如下转化：一维数组元素为一维数组->一维数组元素为值
stratify = np.asarray([np.unique(yt) for yt in y]).flatten()

# 将数据集乱序，分割为训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=stratify, random_state=42)

# 对训练集和测试集的数据进行深拷贝
X_tr = np.copy(X_train)
y_tr = np.copy(y_train)
X_te = np.copy(X_test)
y_te = np.copy(y_test)

# 遍历目标向量，每个元素从单个值扩展为1*8的数组元素
# 用于将值转化为一个序列，使得该问题转变为序列到序列的问题
for k, _ in enumerate(y_tr):
    y_tr[k] = np.repeat(y_tr[k], 8, 0)
for k, _ in enumerate(y_te):
    y_te[k] = np.repeat(y_te[k], 8, 0)

In [None]:
# 定义image_to_signal类，用于将图片转为时域信号进行预测
class image_to_signal:
    # 初始化函数
    def __init__(self,
                X_signal : np.array = None,
                Y_result : np.array = None,
                N_res : int = 50,
                K_in : int = -1,
                pulse_time : float = 1,
                time_frame : float = 3,
                interval_time : float = 2, 
                sparsity : float = -1,
                random_sequence: np.array = None):
        '''
        X_signal: 输入的图片，经过了二级化处理，可以根据像素确定是否有信号
        Y_result: 图片的分类结果，已进行了预处理后的图片，用于训练的拟合
        N_res: 隐藏层神经元数量，对应忆阻器的数量
        K_in: 稀疏性，对应的是每个忆阻器读取的每行图片元素数量
        pulse_time: 脉冲时间长度，对应的是单个信号的激活时间
        time_frame: 时间帧长度，对应的是单个信号的激活和衰减时间
        interval_time: 时间步间隔长度，对应的是每个
        sparsity: 稀疏性，对应的是每个忆阻器读取的每行图片元素数量比例
        random_sequence: 随机序列矩阵，从外面获得，使得不同类可共享
        '''
        # 根据mat文件设置激活信号的值计算和衰减值的计算
        self.up = scio.loadmat('../up.mat')["p"][0]
        self.down = scio.loadmat('../down.mat')["p2"][0]
        
        # 记录忆阻器数量
        self.N_res = N_res
        
        # 记录脉冲时间长度，时间帧长度和时间步的间隔长度
        self.pulse_time = pulse_time
        self.time_frame = time_frame
        self.interval_time = interval_time
        
        # 记录稀疏性和稀疏性比例
        self.K_in = K_in
        self.sparsity = sparsity
        
        # 对储备池状态信息进行拟合，使用回归器
        self.regressor = IncrementalRegression(alpha=1e-4)
        # 使用LabelBinarizer对分类进行one-hot编码
        self._encoder = LabelBinarizer()
        
        # 调用init_use函数，可以多次初始化输入图片和分类结果
        self.init_use(X_signal,Y_result)
        
        self.random_sequence = random_sequence
        
    #初始化使用函数，参数同上
    def init_use(self,
                X_signal : np.array = None,
                Y_result : np.array = None):
        
        # 记录输入图片和对应分类
        self.input = X_signal
        self.res = Y_result
        
        # 根据输入的图片大小计算时间步数量、时间帧数量和输入图片的特征数(即每行像素数量)
        self.num_step = self.input[0].shape[0]
        self.num_frame = self.input[0].shape[1]
        self.N_features = self.input[0].shape[1]

        if self.K_in > 0:
            if self.K_in <=  self.N_features:
                self.num_frame = self.K_in
        elif self.sparsity > 0:
            self.num_frame = math.ceil(self.sparsity*self.num_frame)
        

        
        # 记录忆阻器对应的储备池状态信息
        self.hidden_layer_state = np.ndarray(shape = (self.input.shape[0]),dtype = object)
        for i in range(self.hidden_layer_state.shape[0]):
            self.hidden_layer_state[i] = np.zeros(shape = (self.num_step,self.N_res),dtype = float)
        self.state_index = 0
        
        # 记录最新一次预测的储备池状态信息
        self.test_hidden_layer_state = None
        
        # 完成上述处理后，将输入图片和对应分类进行序列连接，得到相应的连续ndarray数组，用于进一步处理
        self.input,self.res,self.sequence_ranges = concatenate_sequences(self.input,
                                                                         self.res,
                                                                         sequence_to_value = False)
        
    # 定义脉冲激活函数
    def caculate_pulse_up(self,value):
        x = self.pulse_time
        up_value = self.up[0]* x**5 + \
                self.up[1]* x**4 + \
                self.up[2]* x**3 + \
                self.up[3]* x**2 + \
                self.up[4]* x + self.up[5]
        return value + up_value
    
    # 定义衰减函数,在遇到激活信号时进行时间结算,或者计算隐藏层状态信号
    def caculate_signal_down(self,value,have_down_time):
        x = have_down_time
        down_ratio = self.down[0] * math.exp(-self.down[1]*x) + self.down[2] + \
                     self.down[3] * math.exp(-self.down[4]*x) + self.down[5] + \
                     self.down[6] * math.exp(-self.down[7]*x) + self.down[8]
        return value * down_ratio
    
    # 定义时间帧计算函数
    def caculate_time_frame(self,value,have_down_time,signal):
        if signal == 0:
            return value, have_down_time + self.time_frame
        else:
            value = self.caculate_signal_down(value,have_down_time)
            value = self.caculate_pulse_up(value)
            return value, self.time_frame - self.pulse_time
         
    # 定义时间步计算函数
    def caculate_time_step(self,value,have_down_time,image_row,random_sequence):
        for i in range(random_sequence.shape[0]):
            value,have_down_time = self.caculate_time_frame(value,
                                                       have_down_time,
                                                       image_row[random_sequence[i]])
        return value, have_down_time
    
    # 定义忆阻器读取状态计算函数，对应储备池的神经元状态信息
    # 每次读取一张图片进行记录
    def caculate_hidden_layer_state(self, X: np.ndarray = None, 
                                    hidden_layer_state: np.ndarray = None,
                                    state_index: int = 0):
        
        # j循环是忆阻器群
        for j in range(self.N_res):
            # value是输入新的时间步时忆阻器的当前信号值
            value = 0.0
            # have_down_time是从上次激活开始的总衰减时间
            have_down_time = 0.0

            # k循环是时间步
            for k in range(self.num_step):
                # 计算忆阻器读取图片特定行转换的信号后的信号值变化和已衰减时间变化
                # 如没有激活信号，则信号不变，改变已衰减时间，从而在需要时计算新的激活前信号值
                value,have_down_time = self.caculate_time_step(value,
                                                          have_down_time,
                                                          X[k],
                                                          # self.input[i*self.num_step+k],
                                                          self.random_sequence[j][k])
                # 根据变化后的情况计算隐藏层状态
                hidden_layer_state[state_index][k][j] = self.caculate_signal_down(value,have_down_time)
                # 根据时间步间隔时间修改已衰减时间
                have_down_time += self.interval_time
        return hidden_layer_state
        
    # 定义部分训练函数，用于计算序列的部分图片，保证逆矩阵的推迟计算
    def partial_fit(self, 
                    X: np.ndarray = None, 
                    y: np.ndarray = None, 
                    postpone_inverse: bool = False):
        # 计算输入图片对应的储备池信息
        self.caculate_hidden_layer_state(X,self.hidden_layer_state,self.state_index)
        # 对图片进行拟合
        self.regressor.partial_fit(self.hidden_layer_state[self.state_index],y,postpone_inverse)
        # 状态索引+1
        self.state_index += 1

        
    # 定义训练函数,可以输入新的数据，也可以使用已有数据
    # 注意使用新的数据会导致原有的数据被重置
    def fit(self,
            X_train : np.ndarray = None, 
            Y_train : np.ndarray = None):
        # 修改输入的input和res
        if X_train is not None and Y_train is not None:
            self.init_use(X_train,y_train)
        
        # 对结果进行二进制编码，使用标签编码器,完成编码
        self._encoder = LabelBinarizer().fit(self.res)
        self.res = self._encoder.transform(self.res)
        
        # 拟合最后一张图片之前的图片，并推迟计算逆矩阵
        [self.partial_fit(X = self.input[idx[0]:idx[1],...],
                          y = self.res[idx[0]:idx[1],...],
                          postpone_inverse=True)
        for idx in self.sequence_ranges[:-1]]
        
        # 拟合最后一张不推迟逆矩阵的计算
        self.partial_fit(X = self.input[self.sequence_ranges[-1][0]:, ...],
                         y = self.res[self.sequence_ranges[-1][0]:, ...],
                         postpone_inverse=False)
    
    # 定义预测函数
    def predict(self, X_test: np.ndarray = None):
        # 初始化临时的储备池，用于记录测试集的结果
        hidden_layer_state = np.ndarray(shape = (X_test.shape[0]),dtype = object)
        for i in range(hidden_layer_state.shape[0]):
            hidden_layer_state[i] = np.zeros(shape = (self.num_step,self.N_res),dtype = float)
        
        # 记录图片索引
        state_index = 0
        
        # 记录预测结果
        y = np.empty(shape=X_test.shape, dtype=object)
        
        # 进行储备池计算
        for i in range(len(X_test)):
            self.caculate_hidden_layer_state(X_test[i], hidden_layer_state,state_index)
            # 计算后统计各时间步的可能性，
            y[i] = self.regressor.predict(hidden_layer_state[i])
            y[i] = self.softmax(cast(np.ndarray,np.sum(y[i], axis=0)))
            state_index += 1
            
        # 保存测试集的对应储备池
        self.test_hidden_layer_state = hidden_layer_state
        
        # 返回预测结果
        return y
    
    # 定义softmax函数，将值转为概率形式
    def softmax(self,x):
        exp_x = np.exp(x - np.max(x, axis=-1, keepdims=True))
        return exp_x / exp_x.sum(axis=-1, keepdims=True)

In [None]:
# 指标类，用于统计预测结果的各项指标和混淆矩阵的可视化
class metrics:
    def __init__(self,
                 y_test :np.ndarray,
                 y_predict :np.ndarray):
        '''
        y_test:真实的测试结果，一维的ndarray，每个元素依次是对应的结果
        y_predict:预测结果，一维或者二维的ndarray
        '''
        self.y_real = y_test
        self.y_all_predict = None
        self.y_predict = None
        
        # 如果测试结果与预测结果一一对应
        if len(y_predict[0]) == 1:
            self.y_predict = y_predict
        
        # 否则就是测试结果对各个分类结果均有预测
        else:
            self.y_all_predict = y_predict
            self.y_predict = np.copy(y_predict)
            for i in range(len(self.y_predict)):
                self.y_predict[i] = np.atleast_1d(np.argmax(self.y_predict[i]))
        
        # 计算准确率、精确率、召回率和混淆矩阵
        self._acc_score = accuracy_score(self.y_real,self.y_predict)
        self._pre_score = precision_score(self.y_real,self.y_predict,average='macro')
        self._rec_score = recall_score(self.y_real,self.y_predict,average='macro')
        self._confusion_matrix = confusion_matrix(self.y_real,self.y_predict)
        
    # 获得准确率
    @property
    def acc_score(self):
        return self._acc_score
    
    # 计算准确率
    @acc_score.setter
    def acc_score(self,acc_score):
        self._acc_score = acc_score
    
    # 获得精确率
    @property
    def pre_score(self):
        return self._pre_score
    
    # 计算精确率
    @pre_score.setter
    def pre_score(self,pre_score):
        self._pre_score = pre_score
        
    # 获得召回率
    @property
    def rec_score(self):
        return self._rec_score
    
    # 计算召回率
    @rec_score.setter
    def rec_score(self,rec_score):
        self._rec_score = rec_score
        
    # 获得混淆矩阵
    @property
    def confusion_matrix(self):
        return self._confusion_matrix
    
    # 计算混淆矩阵
    @confusion_matrix.setter
    def confusion_matrix(self,confusion_matrix):
        self._confusion_matrix = confusion_matrix
    
    # 混淆矩阵可视化
    def show_confusion_matrix(self):
        
        # 绘制设置的参数
        
        # 利用mcolors的LinearSegmentedColormap类的from_list函数
        # 通过线性插值设置颜色的起始渐变色，结束渐变色和插值数
        clist = ['skyblue','darkblue']
        newcmp = mcolors.LinearSegmentedColormap.from_list('chaos',clist,N=256)
        
        # 设置seaborn风格,字体设置为Arial
        sns.set(font = 'Arial')
        # 设置画布，大小为5*5
        fig, ax = plt.subplots(figsize=(5,5))

        # 设置seaborn绘图为热力图
        '''
        data对应矩阵，ax对应设置的图
        linecolor是分割线颜色，linewidths是分割线宽度
        cbar是设置图例，square是热力图的单元格是否为方格
        annot是是否要在方格展示数据
        cbar_kws是图例的设置参数
            orientation是图例方向
            pad是图例与热力图的距离
            fraction是图例的大小比例
        annot_kws是方格展示数据的设置参数
        '''
        sns.heatmap(data = self._confusion_matrix, 
                    ax = ax, 
                    vmin = 0, vmax = np.max(self._confusion_matrix),
                    cmap = newcmp,
                    linecolor="black", linewidths = 0.01, 
                    cbar = True, square = True,
                    annot = True,
                    cbar_kws={ 'orientation': 'vertical',
                               "pad": 0.05,
                               "fraction": 0.04},
                    annot_kws={'size': 9,
                               'weight': 'bold'})

        # 设置x轴和y轴的刻度内容和刻度位置
        ax.set_xticks(np.arange(0.5,10,1))
        ax.set_yticks(np.arange(0.5,10,1))
        ticklabels = [str(i) for i in range(0,10,1)]
        ax.set_xticklabels(ticklabels, color = 'black', rotation= 0)
        ax.set_yticklabels(ticklabels, color = 'black')

        # 设置刻度线和大小
        ax.tick_params(bottom=True, left=True)
        ax.tick_params(labelsize=9)

        # 设置x轴和y轴的标签名和图名
        ax.set_xlabel("Predict digit", fontsize = 11, labelpad = 4, color = 'black')
        ax.set_ylabel("True digit", fontsize = 11 , labelpad = 4 , color = 'black')
        ax.set_title("Confusion matrix", fontsize = 13, color = 'black')

        # 保存和展示图像
        plt.savefig('confusion_matrix.png')
        plt.show()
    
    # 展示对单个图片的预测结果
    def show_one_image_predict(self,image_index):
        # 获得对应图片的各数字预测概率
        y_use = self.y_all_predict[image_index]
       
        # 绘制图像
        fig, ax = plt.subplots(figsize=(5,3.75))

        # 绘制柱状图,设置x洲和y轴数据,柱宽度,柱的位置
        colors = ['deepskyblue' for _ in range(len(y_use))]
        colors[np.argmax(y_use)] = 'red'
        ax.bar(x = np.arange(1,11,1), height = y_use, 
               width=0.7, align="center",
               color=colors)
               # color=['dimgrey','red','darkorange','yellow','deepskyblue','limegreen','blueviolet','peru']

        # 设置x轴和y轴的刻度内容和刻度位置
        ax.set_xticks(np.arange(1,11,1))
        ax.set_xticklabels(['0','1','2','3','4','5','6','7','8','9'], color = 'black')
        ax.set_ylim(0, y_use.max() * 1.1)

        # 设置x轴和y轴的标签名和图名
        ax.set_xlabel("Digit", fontsize = 11, labelpad = 4, color = 'black')
        ax.set_ylabel("Probability", fontsize = 11 , labelpad = 4 , color = 'black')
        ax.set_title("All probabilities of one digit image", fontsize = 13, color = 'black')
        
        ax.text(0.5, y_use.max()* 1.05, 
                s= "\""+str(self.y_real[image_index][0])+"\"", ha='left', va='center', 
                fontsize=11, color='red', weight='bold')

        xticks = ax.get_xticks()
        for i in range(len(y_use)):
            xy = (xticks[i], y_use[i] * 1.01)
            s = str(round(y_use[i],2))
            ax.annotate(
                text=s,  # 要添加的文本
                xy=xy,  # 将文本添加到哪个位置
                fontsize=8,  # 标签大小
                color="black",  # 标签颜色
                ha="center",  # 水平对齐
                va="baseline"  # 垂直对齐
            )

        # 保存和展示图像
        plt.savefig('One digit probabilities.png')
        plt.show()

In [None]:
# 用于搜索的三个列表
K_in_list = [2]
time_frame_list = [i for i in np.arange(1,5.1,0.5)]
interval_time_list = [i for i in range(1,7,1)]
# 动态忆阻器数量
n_res = 400

In [None]:
#将写好的随机序列保存到文件中，便于后面使用
for i in range(2,3,1):
    random_sequence = np.zeros(shape = (n_res,8,i), dtype=int)
    for random_i in range(n_res):
        for random_j in range(8):
            random_sequence[random_i][random_j] = np.sort(np.random.permutation(8)[0:i])
            with open(save_test_best_parameter_file+'random_sequence'+str(i)+'.pkl', 'wb') as f:
                pickle.dump(random_sequence, f)

In [None]:
# 结果保存
list_res_all = []

# 输出结果对比展示
print("K_in,time_frame,interval_time,acc_score")

# 传入的变量
random_sequence = None
    
# 循环搜索得到最优参数
for i in K_in_list:

    # 读取文件获得随机序列
    with open('random_sequence'+str(i)+'.pkl', 'rb') as f:
        random_sequence = pickle.load(f)
    for j in time_frame_list:
        for k in interval_time_list:
            # 进行实例化
            example = image_to_signal(X_signal = X_tr, Y_result= y_tr, N_res = n_res, K_in = i,
                          pulse_time = 1, time_frame = j, interval_time = k, random_sequence = random_sequence)
            # 训练
            example.fit()
            # 预测
            y_te_predict = example.predict(X_test = X_te)
            # 获得准确率并保存到结果中
            metric = metrics(y_test,y_te_predict)
            temp = [i,j,k,metric.acc_score]
            list_res_all.append(temp)
            # 输出信息
            print(i,j,k,metric.acc_score,sep=" , ")
    # 结果保存
    with open(save_test_best_parameter_file+'parameter_test_result'+str(i)+'.pkl', 'wb') as f:
        pickle.dump(list_res_all, f)