# Portable CPU Benchmark (No Big Downloads)

本 notebook 旨在 **“即插即用”** 地在任意 CPU 设备上快速评估：
- **单核** 吞吐（单位时间完成的工作量）
- **满核** 吞吐（使用所有可用逻辑核心）

**特点**：
- 无需下载大型模型或数据；
- 仅依赖 Python 标准库（若检测到已安装 NumPy，则额外运行一个小型 GEMM 测试）；
- 采用可跨设备比较的统一度量：**哈希/秒 (SHA256-H/s)**，并给出 **加速比**。

> 备注：此测试以 SHA-256 计算作为“工作单元（WU）”；1 WU = 向同一个 SHA-256 对象追加（`update`）一段 512 字节的消息，摘要（digest）只在全部迭代结束后计算一次。

In [None]:
# English comments are used in code blocks for portability and clarity.

import os
import time
import math
import hashlib
import statistics
import multiprocessing as mp
from typing import Tuple, Dict

def cpu_info(cpuinfo_path: str = "/proc/cpuinfo"):
    """Return basic CPU info without external dependencies.

    Args:
        cpuinfo_path: Linux-style cpuinfo file parsed for the physical core
            estimate. Defaults to ``/proc/cpuinfo``.

    Returns:
        A dict with two integer entries:

        * ``logical_cores`` -- ``os.cpu_count()``, or 1 when that is unknown.
        * ``physical_cores_est`` -- number of distinct
          ("physical id", "core id") pairs found in the cpuinfo file, or
          ``logical_cores`` when the file is missing, unreadable, or carries
          no such fields.
    """
    logical = os.cpu_count() or 1
    info = {"logical_cores": logical}

    cores = set()
    try:
        with open(cpuinfo_path, "r") as f:
            phys_id = core_id = None
            for line in f:
                if not line.strip():
                    # A blank line ends one processor record. Forget its ids
                    # so they are never paired with ids of the next record.
                    phys_id = core_id = None
                    continue
                if ":" not in line:
                    continue
                key, value = (part.strip() for part in line.split(":", 1))
                if key == "physical id":
                    phys_id = value
                elif key == "core id":
                    core_id = value
                else:
                    continue
                if phys_id is not None and core_id is not None:
                    cores.add((phys_id, core_id))
    except (OSError, ValueError):
        # Best effort only: a missing or undecodable file means "no estimate",
        # so drop anything collected so far and fall back to the logical count.
        cores.clear()

    info["physical_cores_est"] = len(cores) or logical
    return info

def make_block(seed: int, size: int = 512) -> bytearray:
    """Build a `size`-byte buffer whose contents depend only on `seed`.

    Only the low 8 bits of `seed` are used. Each byte is derived from the
    previous one as ``(131 * previous + 17) mod 256``.
    """
    block = bytearray(size)
    state = seed & 0xFF
    pos = 0
    while pos < size:
        # Advance the 8-bit recurrence and store the new state.
        state = (131 * state + 17) & 0xFF
        block[pos] = state
        pos += 1
    return block

def sha256_work(iterations: int, seed: int = 1234) -> Tuple[bytes, int]:
    """
    Feed a 512-byte buffer into one running SHA-256 object `iterations` times.

    A single hash object is created up front and `update()` is called once per
    iteration; the digest is computed only once, after the last update.
    Before each update one byte of the buffer is XOR-ed with the low 8 bits of
    the iteration counter.

    Args:
        iterations: number of 512-byte updates to perform.
        seed: forwarded to `make_block` to build the initial buffer.

    Returns:
        (final_digest, iterations).
    """
    buf = make_block(seed, 512)
    m = memoryview(buf)
    h = hashlib.sha256()
    # Position of the byte that the next iteration modifies
    mv0_index = 0
    for i in range(iterations):
        # XOR one byte with the low 8 bits of i (a no-op whenever those bits are 0)
        buf[mv0_index] ^= (i & 0xFF)
        h.update(m)  # feed the whole 512-byte buffer through the memoryview
        # move on to the next byte, wrapping around after index 511
        mv0_index = (mv0_index + 1) & 511
    return h.digest(), iterations

def time_single_core(target_seconds: float = 8.0) -> Dict[str, float]:
    """
    Measure SHA-256 throughput of `sha256_work` in the current process.

    The iteration count starts at 5000 and is multiplied by a factor between
    2 and 10 (derived from `target_seconds`) until a single run lasts at least
    0.5 s or the count exceeds 10,000,000. That count is then timed three
    times with different seeds and the median duration is used.

    Returns a dict of floats: "iterations", "elapsed_sec", "throughput_hps".
    """

    def _elapsed(count: int, seed: int) -> float:
        # Wall-clock duration of one sha256_work call.
        start = time.perf_counter()
        sha256_work(count, seed=seed)
        stop = time.perf_counter()
        return stop - start

    # Warm-up run; its result is discarded.
    sha256_work(1000, seed=1)

    # Calibration: grow the workload until one run is long enough to time.
    iters = 5000
    while True:
        took = _elapsed(iters, 7)
        if took >= 0.5 or iters > 10_000_000:
            break
        # Aim towards target_seconds, but never grow by less than 2x or more than 10x.
        factor = max(2.0, min(10.0, target_seconds / max(0.1, took)))
        iters = max(int(iters * factor), 1000)

    # Measurement: three repeats, median duration.
    samples = [_elapsed(iters, s) for s in (11, 13, 17)]
    median_dt = statistics.median(samples)
    return {
        "iterations": float(iters),
        "elapsed_sec": float(median_dt),
        "throughput_hps": float(iters / median_dt),
    }

def _worker(args):
    """Pool task: unpack (iterations, seed), run `sha256_work`, return the count it reports."""
    count, seed_value = args
    # Only the iteration count travels back to the parent; the digest is dropped.
    return sha256_work(count, seed_value)[1]

def time_full_cores(single_core_iters: int, target_seconds: float = 8.0) -> Dict[str, float]:
    """
    Measure aggregate throughput with one worker process per logical core.

    Every worker runs `single_core_iters` iterations through `_worker`, so with
    ideal scaling the wall time equals that of one single-core run of the same
    iteration count.

    Args:
        single_core_iters: iterations each worker performs (normally the
            calibrated count reported by `time_single_core`).
        target_seconds: accepted for interface compatibility; the run length
            is determined solely by `single_core_iters`.

    Returns:
        dict with "workers", "iterations_total", "elapsed_sec" and
        "throughput_hps" (total iterations divided by elapsed seconds).
    """
    workers = os.cpu_count() or 1
    per_worker_iters = int(single_core_iters)
    jobs = [(per_worker_iters, i + 1) for i in range(workers)]

    with mp.Pool(processes=workers) as pool:
        # Warm up inside the pool that is timed below: a separate warm-up pool
        # would be torn down again and leave the timed processes cold.
        pool.map(_worker, [(2000, i + 1) for i in range(workers)], chunksize=1)

        # Time only the work itself; creating and tearing down the pool
        # happens outside this interval.
        t0 = time.perf_counter()
        done = list(pool.imap_unordered(_worker, jobs))
        t1 = time.perf_counter()

    total_iters = sum(done)
    dt = t1 - t0
    hps = total_iters / dt
    return {
        "workers": workers,
        "iterations_total": float(total_iters),
        "elapsed_sec": float(dt),
        "throughput_hps": float(hps),
    }

def format_hps(hps: float) -> str:
    """Render a hashes-per-second value with two decimals and a G/M/k prefix where it applies."""
    # Largest unit first; the first threshold reached decides the unit.
    for threshold, unit in ((1e9, "GH/s"), (1e6, "MH/s"), (1e3, "kH/s")):
        if hps >= threshold:
            return f"{hps / threshold:.2f} {unit}"
    return f"{hps:.2f} H/s"

In [None]:
# --- Single-core benchmark ---
# Show the core counts first so the throughput below can be read in context.
info = cpu_info()
print(
    "CPU logical cores:",
    info["logical_cores"],
    "| physical cores (est):",
    info["physical_cores_est"],
)

# `single` is reused by the later cells (full-cores run and report).
single = time_single_core(target_seconds=8.0)

print("\n[Single-core]")
print("Iterations:", int(single["iterations"]))
print(
    "Elapsed:    %.3f s" % single["elapsed_sec"],
    "Throughput: %s" % format_hps(single["throughput_hps"]),
    sep="\n",
)

In [None]:
# --- Full-cores benchmark ---
# Hand the iteration count measured by the single-core cell to the parallel run.
full = time_full_cores(int(single["iterations"]), target_seconds=8.0)

print("\n[Full-cores]")
print("Workers:", full["workers"])
print("Total iterations:", int(full["iterations_total"]))
print(
    "Elapsed:         %.3f s" % full["elapsed_sec"],
    "Throughput:      %s" % format_hps(full["throughput_hps"]),
    sep="\n",
)

# Ratio of aggregate throughput to single-core throughput.
speedup = full["throughput_hps"] / single["throughput_hps"]
print("\n[Speedup] Full / Single = %.2f x" % speedup)

In [None]:
# --- Optional: NumPy GEMM test (if numpy is installed) ---
# Times one float32 n x n matrix product and reports it as GFLOP/s.
# Any failure (including a missing NumPy) is reported and the cell carries on.
try:
    import numpy as np
    # Control BLAS threads via env vars if user set them before starting the kernel.
    # Smallish GEMM to avoid memory pressure
    n = 1024
    print(f"\n[NumPy GEMM] running {n}x{n} matmul once...")
    A = np.random.RandomState(0).randn(n, n).astype(np.float32)
    B = np.random.RandomState(1).randn(n, n).astype(np.float32)
    # Untimed warm-up product: keeps one-time costs of the first call out of
    # the measured interval. Its result is overwritten by the timed run below.
    C = A @ B
    t0 = time.perf_counter()
    C = A @ B
    t1 = time.perf_counter()
    flops = 2.0 * n * n * n  # 2*n^3 FLOPs for GEMM
    gflops = flops / (t1 - t0) / 1e9
    print("Elapsed: %.3f s | Approx: %.2f GFLOP/s" % (t1 - t0, gflops))
except Exception as e:
    print("[NumPy GEMM] NumPy not available or failed:", e)

In [None]:
# --- Summary and persist ---
# Collect the results of the earlier cells into one dict, write it as JSON
# under benchmark_reports/, and echo it to the output.
import json
import os
from datetime import datetime, timezone

report = {
    # datetime.utcnow() is deprecated since Python 3.12; take an aware UTC time
    # and keep the previous "...Z" text format.
    "timestamp_utc": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
    "cpu": cpu_info(),
    "single_core": single,
    "full_cores": full,
    "speedup_full_over_single": full["throughput_hps"] / single["throughput_hps"],
}

os.makedirs("benchmark_reports", exist_ok=True)
# Explicit encoding so the file does not depend on the platform default.
with open("benchmark_reports/cpu_benchmark_report.json", "w", encoding="utf-8") as f:
    json.dump(report, f, indent=2)

print("\nSaved JSON report to benchmark_reports/cpu_benchmark_report.json")
print(json.dumps(report, indent=2))