In [None]:
from hls4ml.converters import convert_from_keras_model
from hls4ml.utils import config_from_keras_model
from sklearn.metrics import accuracy_score
from keras.utils import to_categorical
import matplotlib.pyplot as plt
import tensorflow as tf
import scipy.io as sio
import numpy as np
import plotting

# Load the dataset
def load_data(train_path='train_32x32.mat', test_path='test_32x32.mat'):
    """Load the train and test splits from MATLAB ``.mat`` files.

    Each file must contain a 4-D array under the key ``'X'`` and a label array
    under the key ``'y'``.

    Args:
        train_path: Path of the training ``.mat`` file. Defaults to the file
            name the notebook originally hard-coded.
        test_path: Path of the test ``.mat`` file. Same remark.

    Returns:
        ``(X_train, y_train, X_test, y_test)``. Each ``X`` has its last axis
        moved to the front; each ``y`` is flattened to 1-D with every label
        equal to 10 replaced by 0.
    """
    def load_split(path):
        mat = sio.loadmat(path)
        # Move the last axis of 'X' to the front
        # (presumably the sample axis, i.e. (H, W, C, N) -> (N, H, W, C); confirm against the dataset).
        images = np.transpose(mat['X'], (3, 0, 1, 2))
        labels = mat['y'].reshape(-1)
        # Remap label 10 to 0 so the labels fit the 10 classes used downstream.
        labels[labels == 10] = 0
        return images, labels

    X_train, y_train = load_split(train_path)
    X_test, y_test = load_split(test_path)
    return X_train, y_train, X_test, y_test

# Normalise the image data
def normalize_data(X_train, X_test):
    """Divide both arrays by 255.0 and return them as ``(X_train, X_test)``.

    The inputs are not modified; new arrays are returned.
    Assumes the inputs hold 8-bit pixel values, so the result lies in [0, 1];
    confirm against the data source.
    """
    max_pixel_value = 255.0
    return X_train / max_pixel_value, X_test / max_pixel_value

# Convert labels to one-hot encoding
def one_hot_encode_labels(y_train, y_test):
    """One-hot encode both label arrays over 10 classes.

    Args:
        y_train: Training class labels.
        y_test: Test class labels.

    Returns:
        ``(y_train_one_hot, y_test_one_hot)`` as produced by ``to_categorical``
        with ``num_classes=10``.
    """
    y_train_one_hot, y_test_one_hot = (
        to_categorical(labels, num_classes=10) for labels in (y_train, y_test)
    )
    return y_train_one_hot, y_test_one_hot

# Load the data, scale the images, and one-hot encode the labels.
# NOTE: y_train / y_test are rebound to their one-hot form here, so later cells
# recover class indices with np.argmax(..., axis=1).
X_train, y_train, X_test, y_test = load_data()
X_train, X_test = normalize_data(X_train, X_test)
y_train, y_test = one_hot_encode_labels(y_train, y_test)

# Keep the first 10% of the test set (running on the whole test set takes too long)
num_samples = len(X_test) // 10
X_test_subset = X_test[:num_samples]
y_test_subset = y_test[:num_samples]

# Visualise the model's predictions
def plot_predictions(images, true_labels, predicted_labels, start=0):
    """Show up to 25 images in a 5x5 grid, captioned with true and predicted labels.

    Args:
        images: Indexable collection of images accepted by ``plt.imshow``.
            Each image is clipped to [0, 1] before display.
        true_labels: Ground-truth labels, either class indices or 2-D
            one-hot / per-class score rows (reduced with ``argmax``).
        predicted_labels: Predicted labels, in either of the same two forms.
        start: Index of the first sample to show.

    The caption is blue when the two labels agree and red otherwise. When fewer
    than 25 samples remain after ``start``, only the remaining ones are drawn.
    """
    def as_class_indices(labels):
        # One-hot / score rows cannot be compared to a scalar class index in an
        # `if`, so reduce them to class indices; anything else passes through.
        arr = np.asarray(labels)
        if arr.ndim > 1 and arr.shape[-1] > 1:
            return arr.argmax(axis=-1)
        return labels

    true_labels = as_class_indices(true_labels)
    predicted_labels = as_class_indices(predicted_labels)
    # Never index past the end of the shortest input.
    stop = min(start + 25, len(images), len(true_labels), len(predicted_labels))

    plt.figure(figsize=(10, 10))
    for i in range(start, stop):
        plt.subplot(5, 5, i - start + 1)
        plt.xticks([])
        plt.yticks([])
        plt.grid(False)
        clipped_image = np.clip(images[i], 0, 1)  # clip pixel values into the displayable range
        plt.imshow(clipped_image)
        true_label = true_labels[i]
        predicted_label = predicted_labels[i]
        color = 'blue' if true_label == predicted_label else 'red'
        plt.xlabel(f"True: {true_label}\nPred: {predicted_label}", color=color)
    plt.subplots_adjust(hspace=0.3, wspace=0.3)
    plt.show()

# Save the data to .npy files
def save_data(X_test, y_test):
    """Write the two arrays to ``X_test.npy`` and ``y_test.npy``.

    The file names are relative, so the files land in the current working
    directory. Existing files with those names are overwritten.
    """
    outputs = (('X_test.npy', X_test), ('y_test.npy', y_test))
    for filename, array in outputs:
        np.save(filename, array)

# Save the test subset to disk, then print the first sample and its one-hot
# label as a quick sanity check.
save_data(X_test_subset, y_test_subset)
print(X_test_subset[0])
print("--------------")
print(y_test_subset[0])

In [None]:
from keras.layers import Input, add, Flatten, BatchNormalization, Activation, MaxPooling2D
from qkeras import QConv2D, QDense, QActivation
from keras.models import Model, Sequential
from keras.optimizers import Adam

# Quantization settings shared by the quantized layers below
bit_width = 5            # first argument of the quantizer string
integer_bits = 0         # second argument of the quantizer string
# Third positional argument of the quantizer string.
# NOTE(review): for QKeras `quantized_bits` this position is presumably
# `symmetric` - confirm against the QKeras version in use.
quantization_params = 1
# Quantizer spec passed as kernel_quantizer / bias_quantizer to the Q* layers
quant_bits=f"quantized_bits({bit_width}, {integer_bits}, {quantization_params})"

# Check the number of trainable parameters in each layer
def check_model_params(model, threshold=4096):
    """Verify that no layer has more than ``threshold`` trainable parameters.

    Args:
        model: Object exposing ``layers``; each layer exposes ``name`` and
            ``trainable_weights``.
        threshold: Maximum allowed trainable-parameter count per layer.

    Raises:
        ValueError: On the first layer whose trainable-parameter count is
            greater than ``threshold``.
    """
    for layer in model.layers:
        num_params = 0
        for weight in layer.trainable_weights:
            num_params += tf.keras.backend.count_params(weight)
        if num_params <= threshold:
            continue
        raise ValueError(f"Layer {layer.name} has {num_params} parameters, which exceeds the threshold of {threshold}.")
        
# Build one quantized Conv -> BatchNorm -> ReLU stage
def resnet_block(inputs, filters, kernel_size, quant_bits):
    """Apply a quantized convolution, batch normalisation and quantized ReLU.

    Despite the name, this stage contains no skip connection itself; the
    residual add is done by the caller.

    Args:
        inputs: Input tensor.
        filters: Number of convolution filters.
        kernel_size: Convolution kernel size.
        quant_bits: Quantizer spec used for both the kernel and the bias.

    Returns:
        The output tensor of the quantized ReLU.
    """
    x = QConv2D(filters, kernel_size=kernel_size, padding='same',
                kernel_quantizer=quant_bits, bias_quantizer=quant_bits)(inputs)
    x = BatchNormalization()(x)
    # Only bits and integer bits are forwarded. The third positional argument
    # of `quantized_relu` is `use_sigmoid` (not `symmetric` as in
    # `quantized_bits`), so passing `quantization_params` there switched the
    # activation to its sigmoid-shaped variant by accident.
    x = QActivation(activation=f'quantized_relu({bit_width},{integer_bits})')(x)
    return x

# Build the quantized ResNet-style model
def create_resnet(input_shape, num_classes, quant_bits):
    """Build a small quantized CNN with one residual connection.

    Layout: two conv stages plus a 1x1-conv shortcut from the input, added
    together; then three rounds of 2x2 max-pooling with a 4-filter 3x3 conv
    between them; then flatten, a quantized dense layer and softmax.

    Args:
        input_shape: Shape of one input sample (without the batch axis).
        num_classes: Number of output classes.
        quant_bits: Quantizer spec applied to every kernel and bias.

    Returns:
        The uncompiled Keras ``Model``.
    """
    inputs = Input(shape=input_shape)
    x = resnet_block(inputs, 10, (3, 3), quant_bits)  # deliberately small filter count
    x = resnet_block(x, 10, (3, 3), quant_bits)
    # 1x1 conv so the shortcut has the same channel count as the main path.
    # Its kernel is quantized like every other layer; without kernel_quantizer
    # it was the only full-precision kernel in the network.
    shortcut = QConv2D(10, kernel_size=(1, 1), padding='same',
                       kernel_quantizer=quant_bits, bias_quantizer=quant_bits)(inputs)
    x = add([x, shortcut])  # residual connection
    x = MaxPooling2D((2, 2))(x)
    x = QConv2D(4, (3,3),  kernel_quantizer=quant_bits, bias_quantizer=quant_bits)(x)
    x = MaxPooling2D((2, 2))(x)
    x = QConv2D(4, (3,3),  kernel_quantizer=quant_bits, bias_quantizer=quant_bits)(x)
    x = MaxPooling2D((2, 2))(x)
    x = Flatten()(x)
    x = QDense(num_classes, kernel_quantizer=quant_bits, bias_quantizer=quant_bits)(x)
    outputs = Activation('softmax')(x)
    model = Model(inputs=inputs, outputs=outputs)
    return model

# Build and compile the quantized model, print its summary, and enforce the
# per-layer parameter limit (check_model_params raises ValueError on violation).
qmodel = create_resnet(X_train.shape[1:], 10, quant_bits)
qmodel.compile(optimizer=Adam(), loss='categorical_crossentropy', metrics=['accuracy'])
qmodel.summary()
check_model_params(qmodel)

# Train the model (10% of the training data is held out for validation)
qmodel.fit(X_train, y_train, epochs=25, batch_size=32, validation_split=0.1)

# Run inference on the full test set
predictions = qmodel.predict(X_test)
# Predicted class of each sample; y_test is one-hot, so argmax recovers the true class
predicted_classes = np.argmax(predictions, axis=1)
true_classes = np.argmax(y_test, axis=1)
# Compute the accuracy
accuracy = accuracy_score(true_classes, predicted_classes)
print(f'模型在测试集上的准确率: {accuracy * 100:.2f}%')

# Save the model
qmodel.save('model/test.h5')

In [None]:
import tensorflow_model_optimization as tfmot

# Fine-tuning hyper-parameters for pruning. They are named once so that the
# sparsity schedule and the fit() call below cannot drift apart.
PRUNE_EPOCHS = 25
PRUNE_BATCH_SIZE = 32
PRUNE_VALIDATION_SPLIT = 0.1

# fit() only trains on the samples left after the validation split, so the
# schedule length is derived from that count rather than from len(X_train).
# Otherwise the schedule would end after training does and the final sparsity
# would never quite be reached.
num_prune_train_samples = int(np.floor(len(X_train) * (1.0 - PRUNE_VALIDATION_SPLIT)))
prune_steps_per_epoch = int(np.ceil(num_prune_train_samples / PRUNE_BATCH_SIZE))

# Pruning schedule: ramp sparsity from 0% to 40% over the whole fine-tuning run
pruning_params = {
    'pruning_schedule': tfmot.sparsity.keras.PolynomialDecay(
        initial_sparsity=0.00,
        final_sparsity=0.40,
        begin_step=0,
        end_step=prune_steps_per_epoch * PRUNE_EPOCHS)
}
# Wrap the trained model for magnitude pruning
model_for_pruning = tfmot.sparsity.keras.prune_low_magnitude(qmodel, **pruning_params)
model_for_pruning.compile(optimizer=Adam(), loss='categorical_crossentropy', metrics=['accuracy'])

# Pruning callbacks: advance the pruning step and log pruning summaries
callbacks = [
    tfmot.sparsity.keras.UpdatePruningStep(),
    tfmot.sparsity.keras.PruningSummaries(log_dir='/tmp/logs')
]
# Fine-tune the model while it is being pruned
model_for_pruning.fit(X_train, y_train, epochs=PRUNE_EPOCHS, batch_size=PRUNE_BATCH_SIZE,
                      validation_split=PRUNE_VALIDATION_SPLIT, callbacks=callbacks)

# Strip the pruning wrappers and export a plain model
pmodel = tfmot.sparsity.keras.strip_pruning(model_for_pruning)
pmodel.compile(optimizer=Adam(), loss='categorical_crossentropy', metrics=['accuracy'])

# Predict and evaluate again on the full test set
predictions = pmodel.predict(X_test)
predicted_classes = np.argmax(predictions, axis=1)
true_classes = np.argmax(y_test, axis=1)
accuracy = accuracy_score(true_classes, predicted_classes)
print(f'剪枝后模型在测试集上的准确率: {accuracy * 100:.2f}%')

# Save the pruned model. This is the same path the training cell wrote, so
# the unpruned checkpoint is overwritten.
pmodel.save('model/test.h5')

In [None]:
from keras.models import load_model
from qkeras.utils import _add_supported_quantized_objects
# Reload the saved model. The QKeras layer and quantizer classes are registered
# in `co` so that load_model can deserialize them.
co = {}
_add_supported_quantized_objects(co)
model = load_model('model/test.h5', custom_objects=co)
# Re-evaluate the reloaded model on the full test set
predictions = model.predict(X_test)
predicted_classes = np.argmax(predictions, axis=1)
true_classes = np.argmax(y_test, axis=1)
accuracy = accuracy_score(true_classes, predicted_classes)
print(f'模型在测试集上的准确率: {accuracy * 100:.2f}%')

In [None]:
# Test CPU inference speed on the test subset.
# NOTE: this cell takes no timing itself; the duration has to be read from
# Keras' progress output or a cell timer.
predictions = model.predict(X_test_subset)

In [None]:
# Convert the model to an HLS project.
# One model-wide configuration is generated, then the reuse factor and
# strategy are overridden before conversion.
config = config_from_keras_model(model, backend='VivadoAccelerator',
                                 default_precision='fixed<13,6>',
                                 #max_precision='fixed<32,16>',
                                 granularity='model')
config['Model']['ReuseFactor'] = 1000
config['Model']['Strategy'] = 'Resource'  # Latency/Resource/Unrolled
plotting.print_dict(config)

hls_model = convert_from_keras_model(model, hls_config=config,
                                     backend='VivadoAccelerator', io_type='io_stream',
                                     output_dir='hls4ml_prj/test', board='pynq-z2')
# Compile the HLS model
hls_model.compile()

In [None]:
# Convert the test subset to a contiguous memory layout before passing it to the HLS model
X_test_contiguous = np.ascontiguousarray(X_test_subset)
# Compute the accuracy of the compiled HLS model on the test subset
predictions = hls_model.predict(X_test_contiguous)
predicted_classes = np.argmax(predictions, axis=1)
true_classes = np.argmax(y_test_subset, axis=1)
accuracy = accuracy_score(true_classes, predicted_classes)
print(f'HLS模型的准确率: {accuracy * 100:.2f}%')
# Visualise the model's predictions.
# NOTE: y_test_subset is one-hot encoded while predicted_classes holds class
# indices; true_classes is the matching form.
#plot_predictions(X_test_subset, y_test_subset, predicted_classes, start=50)

In [None]:
# Build the HLS project: C simulation is skipped, export and bitfile generation are enabled.
hls_model.build(csim=False, export=True, bitfile=True)
# NOTE: the line below would need `import hls4ml`; the visible imports only
# pull names from hls4ml submodules.
#hls4ml.report.read_vivado_report('hls4ml_prj/test')

In [None]:
def extract_power_values(file_path, first_line=32, last_line=45):
    """Extract the on-chip power figures from a text power report.

    Lines are scanned for the labels ``Total On-Chip Power (W)``,
    ``Dynamic (W)`` and ``Device Static (W)``. For a matching line, the value
    is the third ``|``-separated field, converted to ``float``.

    Args:
        file_path: Path of the report file.
        first_line: 1-based number of the first line to scan, or ``None`` to
            start at the top of the file. The default keeps the original
            hard-coded window.
        last_line: 1-based number of the last line to scan (inclusive), or
            ``None`` to scan to the end of the file.

    Returns:
        A dict with the keys ``'芯片总功耗(W)'`` (total on-chip power),
        ``'动态功耗(W)'`` (dynamic power) and ``'静态功耗(W)'`` (device static
        power). A value is ``None`` when its label was not found in the window.

    Raises:
        IndexError, ValueError: If a matching line has fewer than three
            ``|``-separated fields or a non-numeric value.
    """
    # Report label -> key in the returned dict. Order matters: the first label
    # found in a line wins, mirroring an if/elif chain.
    labels = {
        'Total On-Chip Power (W)': '芯片总功耗(W)',
        'Dynamic (W)': '动态功耗(W)',
        'Device Static (W)': '静态功耗(W)',
    }
    values = dict.fromkeys(labels.values())

    # Stream the file instead of loading every line, and stop at the window end.
    with open(file_path, 'r') as report:
        for line_no, line in enumerate(report, start=1):
            if first_line is not None and line_no < first_line:
                continue
            if last_line is not None and line_no > last_line:
                break
            for label, key in labels.items():
                if label in line:
                    values[key] = float(line.split('|')[2].strip())
                    break

    return values

# Example call: read the routed-design power report written under the hls4ml
# project directory and print the extracted values.
file_path = 'hls4ml_prj/test/myproject_vivado_accelerator/project_1.runs/impl_1/design_1_wrapper_power_routed.rpt'
power_values = extract_power_values(file_path)
print(power_values)

In [None]:
def parse_resource_utilization(file_path):
    """Read utilisation percentages from a text utilisation report.

    Three fixed line windows of the file are searched for table rows whose
    second ``|``-separated cell names a resource. The sixth cell of such a row
    is parsed as a float after removing any ``%`` sign.

    Args:
        file_path: Path of the report file.

    Returns:
        A dict with the keys ``'LUT'``, ``'FF'``, ``'BRAM'`` and ``'DSP'``.
        A value is ``None`` when the row was not found inside its window.
    """
    def extract_utilization(lines, start, end, targets):
        # Collect {row name: utilisation} for the target rows in lines[start:end].
        found = {}
        for raw_line in lines[start:end]:
            cells = [cell.strip() for cell in raw_line.strip().split('|')]
            if len(cells) < 6 or cells[1] not in targets:
                continue
            # Drop the percent sign, then convert to float
            found[cells[1]] = float(cells[5].replace('%', ''))
        return found

    with open(file_path, 'r') as report:
        report_lines = report.readlines()

    # Window bounds are 0-based slice indices (line number - 1 for the start)
    slice_logic = extract_utilization(report_lines, 30, 44, ['Slice LUTs', 'Slice Registers'])
    block_ram = extract_utilization(report_lines, 100, 109, ['Block RAM Tile'])
    dsp_blocks = extract_utilization(report_lines, 115, 121, ['DSPs'])

    summary = {}
    summary['LUT'] = slice_logic.get('Slice LUTs')
    summary['FF'] = slice_logic.get('Slice Registers')
    summary['BRAM'] = block_ram.get('Block RAM Tile')
    summary['DSP'] = dsp_blocks.get('DSPs')
    return summary

# Usage example: read the placed-design utilisation report written under the
# hls4ml project directory and print the extracted percentages.
# NOTE: this rebinds `file_path`, which the power-report cell also uses.
file_path = 'hls4ml_prj/test/myproject_vivado_accelerator/project_1.runs/impl_1/design_1_wrapper_utilization_placed.rpt'
result = parse_resource_utilization(file_path)
print(result)

In [None]:
import shutil
import os

# Build artefacts to collect, each with the name it gets in the target folder
source_bit = "hls4ml_prj/test/myproject_vivado_accelerator/project_1.runs/impl_1/design_1_wrapper.bit"
dest_bit = "pynq-z2/hls4ml_nn.bit"

source_hwh = "hls4ml_prj/test/myproject_vivado_accelerator/project_1.srcs/sources_1/bd/design_1/hw_handoff/design_1.hwh"
dest_hwh = "pynq-z2/hls4ml_nn.hwh"

source_driver = "hls4ml_prj/test/axi_stream_driver.py"
dest_driver = "pynq-z2/axi_stream_driver.py"

# Create the target folder if it does not exist yet
dest_dir = "pynq-z2"
if not os.path.exists(dest_dir):
    os.makedirs(dest_dir)
    print(f"已创建目标文件夹: {dest_dir}")

# Copy the .bit file, the .hwh file and the AXI stream driver, in that order.
# A failed copy is reported and does not stop the remaining copies.
copy_jobs = (
    (source_bit, dest_bit),
    (source_hwh, dest_hwh),
    (source_driver, dest_driver),
)
for copy_src, copy_dst in copy_jobs:
    try:
        shutil.copy(copy_src, copy_dst)
        print(f"成功复制 '{copy_src}' 到 '{copy_dst}'")
    except Exception as e:
        print(f"复制 '{copy_src}' 到 '{copy_dst}' 时出错: {e}")

In [None]:
# Load the prediction results from y_hw.npy in the current working directory.
# NOTE(review): this file is not written anywhere in the visible notebook;
# presumably it is produced by running the bitstream on the board and copied back - confirm.
predictions = np.load("y_hw.npy")
# Extract the predicted class of each sample
predicted_classes = np.argmax(predictions, axis=1)
true_classes = np.argmax(y_test_subset, axis=1)
# Compute the accuracy against the test-subset labels
accuracy = accuracy_score(true_classes, predicted_classes)
print(f"硬件推理准确率: {accuracy * 100:.2f}%")