In [6]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import LogNorm
import timeit
import functools
import os

# 尝试导入 pyfftw，如果失败则无法运行 pyfftw 部分
try:
    import pyfftw
    from pyfftw.interfaces import numpy_fft as pyfftw_fft
    pyfftw_available = True
    
    # --- 配置 PyFFTW Wisdom (更稳健的版本) ---
    pyfftw.interfaces.cache.enable()
    WISDOM_FILE = "pyfftw_wisdom.dat"

    if os.path.exists(WISDOM_FILE):
        try:
            with open(WISDOM_FILE, 'rb') as f:
                pyfftw.import_wisdom(f.read())
            print("PyFFTW Wisdom 导入成功。")
        except (IOError, EOFError, IndexError) as e: # <---【修复】将 IndexError 加入异常捕获
            print(f"警告：Wisdom 文件 '{WISDOM_FILE}' 已损坏或无法解析 ({e})。")
            print("         将删除旧文件并在此次运行后重新生成。")
            try:
                os.remove(WISDOM_FILE) # 尝试自动删除坏文件
            except OSError as remove_e:
                print(f"         无法自动删除文件: {remove_e}")
    else:
        print("未找到 Wisdom 文件，将在首次规划后生成。")
        
except ImportError:
    pyfftw_available = False
    print("警告：pyfftw 未安装，将无法进行对比。")


# --- 辅助函数：创建测试数据 ---
def make_test_image(shape=(256, 256)):
    """创建一个包含几个亮点的模拟图像"""
    image = np.zeros(shape, dtype=np.float64)
    # 在图像中放置几个明亮的“星星”
    coords = np.array([[0.2, 0.2], [0.5, 0.5], [0.8, 0.7], [0.3, 0.8]])
    for r, c in coords:
        x, y = int(r * shape[0]), int(c * shape[1])
        image[x, y] = 1000.0
    # 添加一些背景噪声
    image += np.random.uniform(0, 1, size=shape)
    return image

def make_gaussian_kernel(size=31, sigma=5):
    """创建一个高斯卷积核"""
    ax = np.linspace(-(size - 1) / 2., (size - 1) / 2., size)
    xx, yy = np.meshgrid(ax, ax)
    kernel = np.exp(-0.5 * (np.square(xx) + np.square(yy)) / np.square(sigma))
    return kernel / kernel.sum()


# --- 两种卷积方法的实现 ---

def convolve_numpy(image, kernel):
    """使用 numpy.fft 进行卷积"""
    s1 = np.array(image.shape)
    s2 = np.array(kernel.shape)
    shape = s1 + s2 - 1
    
    # 确定FFT形状
    fshape = [pyfftw.next_fast_len(int(d)) for d in shape]

    # 计算FFT
    fft_image = np.fft.rfftn(image, fshape)
    fft_kernel = np.fft.rfftn(kernel, fshape)

    # 卷积并逆变换
    ret = np.fft.irfftn(fft_image * fft_kernel, fshape)

    # 裁剪回原始大小 (标准的左上角裁剪)
    conv = ret[0:image.shape[0], 0:image.shape[1]].copy()
    
    # 返回结果，不施加非负约束，以便观察原始差异
    return conv

def convolve_pyfftw(image, kernel):
    """使用 pyfftw.interfaces.numpy_fft 进行卷积 (高性能且稳健)"""
    if not pyfftw_available:
        raise RuntimeError("pyfftw 不可用")

    s1 = np.array(image.shape)
    s2 = np.array(kernel.shape)
    shape = s1 + s2 - 1
    
    # 确定FFT形状
    fshape = [pyfftw.next_fast_len(int(d)) for d in shape]

    # 使用 pyfftw 接口计算 FFT
    fft_image = pyfftw_fft.rfftn(image, fshape)
    fft_kernel = pyfftw_fft.rfftn(kernel, fshape)

    # 卷积并逆变换
    ret = pyfftw_fft.irfftn(fft_image * fft_kernel, fshape)

    # 裁剪回原始大小
    conv = ret[0:image.shape[0], 0:image.shape[1]].copy()
    
    # 返回结果，不施加非负约束，以便观察原始差异
    return conv


# --- 可视化函数 ---
def visualize_results(image, kernel, result_numpy, result_pyfftw):
    """可视化所有步骤和差异"""
    # 准备FFT结果用于显示
    fft_image_abs = np.abs(np.fft.fftshift(np.fft.fftn(image)))
    fft_kernel_abs = np.abs(np.fft.fftshift(np.fft.fftn(kernel, s=image.shape)))
    
    # 计算差异
    difference = result_numpy - result_pyfftw
    
    fig, axes = plt.subplots(2, 3, figsize=(18, 11))
    fig.suptitle("NumPy FFT vs. PyFFTW 卷积对比", fontsize=16)

    # 第一行
    im0 = axes[0, 0].imshow(image, cmap='viridis', norm=LogNorm())
    axes[0, 0].set_title("1. 原始图像")
    fig.colorbar(im0, ax=axes[0, 0])

    im1 = axes[0, 1].imshow(kernel, cmap='viridis')
    axes[0, 1].set_title("2. 高斯卷积核")
    fig.colorbar(im1, ax=axes[0, 1])

    im2 = axes[0, 2].imshow(fft_image_abs, cmap='viridis', norm=LogNorm())
    axes[0, 2].set_title("3. 图像傅里叶谱 (幅度)")
    fig.colorbar(im2, ax=axes[0, 2])
    
    # 第二行
    im3 = axes[1, 0].imshow(result_numpy, cmap='viridis', norm=LogNorm(vmin=1))
    axes[1, 0].set_title("4. NumPy 卷积结果")
    fig.colorbar(im3, ax=axes[1, 0])

    im4 = axes[1, 1].imshow(result_pyfftw, cmap='viridis', norm=LogNorm(vmin=1))
    axes[1, 1].set_title("5. PyFFTW 卷积结果")
    fig.colorbar(im4, ax=axes[1, 1])
    
    # 差异图
    max_abs_diff = np.max(np.abs(difference))
    im5 = axes[1, 2].imshow(difference, cmap='seismic', vmin=-max_abs_diff, vmax=max_abs_diff)
    axes[1, 2].set_title(f"6. 差异 (NumPy - PyFFTW)\n最大差异: {max_abs_diff:.2e}")
    fig.colorbar(im5, ax=axes[1, 2])
    
    for ax in axes.flat:
        ax.set_xticks([])
        ax.set_yticks([])

    plt.tight_layout(rect=[0, 0.03, 1, 0.95])
    plt.show()

# --- 性能测试函数 ---
def run_benchmark():
    """执行性能测试并打印报告"""
    print("\n" + "="*50)
    print("开始性能测试 (Benchmark)")
    print("="*50)
    
    # 定义测试场景
    test_scenarios = {
        "小图, 小核": {"shape": (256, 256), "ksize": 15},
        "中图, 中核": {"shape": (512, 512), "ksize": 31},
        "大图, 大核": {"shape": (1024, 1024), "ksize": 61},
    }
    
    n_loops = 50 # 每个场景的循环次数

    for name, params in test_scenarios.items():
        print(f"\n--- 场景: {name} (图像: {params['shape']}, 核: {params['ksize']}x{params['ksize']}) ---")
        
        # 准备数据
        image = make_test_image(shape=params['shape'])
        kernel = make_gaussian_kernel(size=params['ksize'])
        
        # 为 NumPy 创建一个无参数的调用
        numpy_callable = functools.partial(convolve_numpy, image, kernel)
        
        # 测试 NumPy
        numpy_time = timeit.timeit(
            numpy_callable,
            number=n_loops
        )
        avg_numpy = (numpy_time / n_loops) * 1000
        print(f"NumPy 平均耗时: {avg_numpy:.3f} ms")

        # 测试 PyFFTW
        if pyfftw_available:
            pyfftw_callable = functools.partial(convolve_pyfftw, image, kernel)
            
            pyfftw_time = timeit.timeit(
                pyfftw_callable,
                number=n_loops
            )
            avg_pyfftw = (pyfftw_time / n_loops) * 1000
            print(f"PyFFTW 平均耗时: {avg_pyfftw:.3f} ms")
            
            speedup = avg_numpy / avg_pyfftw
            print(f"-> PyFFTW 加速倍数: {speedup:.2f}x")
        else:
            print("PyFFTW 不可用，跳过测试。")

    if pyfftw_available:
        try:
            # 【修复】pyfftw.export_wisdom() 返回一个元组，需要将其连接起来
            wisdom_tuple = pyfftw.export_wisdom()
            # 将元组中的所有字节串连接成一个单一的字节串
            wisdom_bytes = b''.join(wisdom_tuple)
            
            with open(WISDOM_FILE, 'wb') as f:
                f.write(wisdom_bytes)
            print(f"\nWisdom 已更新并保存至 {WISDOM_FILE}")
        except IOError as e:
            print(f"\n无法保存 Wisdom: {e}")

# --- 主程序 ---
if __name__ == '__main__':
    # 1. 执行一次计算并可视化结果
    print("正在生成可视化结果...")
    image_vis = make_test_image(shape=(256, 256))
    kernel_vis = make_gaussian_kernel(size=31)
    
    result_numpy = convolve_numpy(image_vis, kernel_vis)
    
    if pyfftw_available:
        try:
            # 【修复】pyfftw.export_wisdom() 返回一个元组，需要将其连接起来
            wisdom_tuple = pyfftw.export_wisdom()
            # 将元组中的所有字节串连接成一个单一的字节串
            wisdom_bytes = b''.join(wisdom_tuple)
            
            with open(WISDOM_FILE, 'wb') as f:
                f.write(wisdom_bytes)
            print(f"\nWisdom 已更新并保存至 {WISDOM_FILE}")
        except IOError as e:
            print(f"\n无法保存 Wisdom: {e}")

    # 2. 运行性能测试
    run_benchmark()

警告：Wisdom 文件 'pyfftw_wisdom.dat' 已损坏或无法解析 (index out of range)。
         将删除旧文件并在此次运行后重新生成。
正在生成可视化结果...

Wisdom 已更新并保存至 pyfftw_wisdom.dat

开始性能测试 (Benchmark)

--- 场景: 小图, 小核 (图像: (256, 256), 核: 15x15) ---
NumPy 平均耗时: 2.176 ms
PyFFTW 平均耗时: 2.051 ms
-> PyFFTW 加速倍数: 1.06x

--- 场景: 中图, 中核 (图像: (512, 512), 核: 31x31) ---
NumPy 平均耗时: 11.649 ms
PyFFTW 平均耗时: 7.194 ms
-> PyFFTW 加速倍数: 1.62x

--- 场景: 大图, 大核 (图像: (1024, 1024), 核: 61x61) ---
NumPy 平均耗时: 58.526 ms
PyFFTW 平均耗时: 32.490 ms
-> PyFFTW 加速倍数: 1.80x

Wisdom 已更新并保存至 pyfftw_wisdom.dat
