<a href="https://colab.research.google.com/github/kasahart/test/blob/main/%E7%9C%81%E3%83%A1%E3%83%A2%E3%83%AA%E8%A8%88%E7%AE%97%E3%81%AE%E3%83%86%E3%82%B9%E3%83%88.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import h5py
import dask.array as da
import gc
import os

### データ準備 ###
filename = 'data_compare.h5'
n, w, h = 1000, 500, 500
dtype = np.float64

# HDF5ファイル作成 (存在しなければ)
if not os.path.exists(filename):
    with h5py.File(filename, 'w') as f:
        dset = f.create_dataset('data', shape=(n, w, h), dtype=dtype, chunks=(100, w, h))
        data_array = np.random.rand(n, w, h).astype(dtype)
        dset[:] = data_array
    del data_array
    gc.collect()

# 全読み込み計算（基準となる計算）
with h5py.File(filename, 'r') as f:
    full_data = f['data'][:]
mean_full = np.mean(full_data, axis=0)
var_full = np.var(full_data, axis=0)
del full_data
gc.collect()

### Welford法での計算 ###
with h5py.File(filename, 'r') as f:
    dset = f['data']
    mean_w = np.zeros((w, h), dtype=dtype)
    M2 = np.zeros((w, h), dtype=dtype)
    count = 0

    # 中間バッファ
    delta = np.zeros((w, h), dtype=dtype)
    delta2 = np.zeros((w, h), dtype=dtype)

    chunk_size = 100
    for start in range(0, n, chunk_size):
        end = min(start + chunk_size, n)
        chunk = dset[start:end, :, :]
        for i in range(chunk.shape[0]):
            x = chunk[i, :, :]
            count += 1
            np.subtract(x, mean_w, out=delta)
            mean_w += delta / count
            np.subtract(x, mean_w, out=delta2)
            np.multiply(delta, delta2, out=delta2)
            M2 += delta2
        del chunk
        gc.collect()

    var_w = M2 / (count - 1)

### Daskでの計算 ###
with h5py.File(filename, 'r') as f:
    dset = f['data']
    x = da.from_array(dset, chunks=(100, w, h))
    mean_d = x.mean(axis=0).compute()
    var_d = x.var(axis=0).compute()

### 誤差確認 ###
# 絶対誤差
mean_diff_w = np.abs(mean_full - mean_w).max()
var_diff_w = np.abs(var_full - var_w).max()

mean_diff_d = np.abs(mean_full - mean_d).max()
var_diff_d = np.abs(var_full - var_d).max()

print("比較結果（最大絶対誤差）")
print("Welford法 vs Full:")
print("  Mean max abs diff:", mean_diff_w)
print("  Var max abs diff: ", var_diff_w)

print("Dask vs Full:")
print("  Mean max abs diff:", mean_diff_d)
print("  Var max abs diff: ", var_diff_d)

# 必要に応じて、相対誤差などもチェック可能
mean_rel_diff_w = mean_diff_w / np.abs(mean_full).max()
var_rel_diff_w = var_diff_w / np.abs(var_full).max()
mean_rel_diff_d = mean_diff_d / np.abs(mean_full).max()
var_rel_diff_d = var_diff_d / np.abs(var_full).max()

print("\n比較結果（最大相対誤差）")
print("Welford法 vs Full:")
print("  Mean max relative diff:", mean_rel_diff_w)
print("  Var max relative diff: ", var_rel_diff_w)

print("Dask vs Full:")
print("  Mean max relative diff:", mean_rel_diff_d)
print("  Var max relative diff: ", var_rel_diff_d)


比較結果（最大絶対誤差）
Welford法 vs Full:
  Mean max abs diff: 2.886579864025407e-15
  Var max abs diff:  9.457210975327701e-05
Dask vs Full:
  Mean max abs diff: 1.887379141862766e-15
  Var max abs diff:  3.885780586188048e-16

比較結果（最大相対誤差）
Welford法 vs Full:
  Mean max relative diff: 5.295941364890668e-15
  Var max relative diff:  0.0010010010010006742
Dask vs Full:
  Mean max relative diff: 3.462730892428514e-15
  Var max relative diff:  4.112914755302307e-15


In [1]:
import numpy as np
import dask.array as da
import os
import gc
import h5py
import tqdm
import psutil
import time
process = psutil.Process(os.getpid())

# テスト用に巨大なバイナリファイルを作成する（存在しない場合）
filename = 'data_compare.h5'
n, w, h = 2000, 500, 500
dtype = np.float64

mem_info = process.memory_info()
print(f"初期メモリ - RSS: {mem_info.rss / (1024**2):.2f} MB")
# HDF5ファイル作成
with h5py.File(filename, 'w') as f:
    # dset = f.create_dataset('data', shape=(n, w, h), dtype=dtype, chunks=(100, w, h))
    # for i in tqdm.tqdm(range(n)):
    #     data_array = np.zeros((w, h)).astype(dtype) + float(i)
    #     dset[i] = data_array

    # 初期サイズと最大サイズを指定してデータセットを作成
    dset = f.create_dataset(
        'data',
        shape=(n, w, h),  # 初期サイズ
        # maxshape=(n, w, h),  # 最大サイズ (第一軸を無制限にリサイズ可能)
        dtype=np.float64,
        chunks=(1, w, h),
        compression=None
    )

    for i in tqdm.tqdm(range(n-10)):
        data_array = np.zeros((w, h)).astype(dtype) + float(i)
        dset[i] = data_array

    dset.resize((n-10, w, h))

gc.collect()

mem_info = process.memory_info()
print(f"データ書き込み後 - RSS: {mem_info.rss / (1024**2):.2f} MB")

初期メモリ - RSS: 124.84 MB


100%|██████████| 1990/1990 [00:41<00:00, 48.26it/s]

データ書き込み後 - RSS: 136.61 MB





In [2]:
# テスト用設定
mem_info = process.memory_info()
print(f"初期メモリ - RSS: {mem_info.rss / (1024**2):.2f} MB")

# 計算時間測定用の開始時刻
start_time = time.time()

### Daskでの計算 ###
with h5py.File(filename, 'r') as f:
    dset = f['data']
    x = da.from_array(dset, chunks=(100, w, h))

    # Dask Arrayに変換 (この処理自体は軽量だが一応測定)
    mid_time = time.time()
    mem_info = process.memory_info()
    print(f"Dask Arrayに変換 - RSS: {mem_info.rss / (1024**2):.2f} MB")
    print(f"Dask Arrayに変換までの処理時間: {mid_time - start_time:.2f}秒")

    # Daskで平均・分散計算
    start_calc = time.time()

    mean = x.mean(axis=0).compute()
    variance = x.var(axis=0).compute()


    end_calc = time.time()
    print("mean shape:", mean.shape)
    print("variance shape:", variance.shape)
    print("Mean and variance calculation completed successfully.")

    mem_info = process.memory_info()
    print(f"平均分散計算 - RSS: {mem_info.rss / (1024**2):.2f} MB")
    print(f"平均・分散計算時間: {end_calc - start_calc:.2f}秒")

# # 標準化計算
# start_std = time.time()
# std_dev = da.sqrt(variance)  # (w, h) shape
# x_memmap = da.from_array(memmap_array, chunks=(100, w, h))
# x_standardized = (x_memmap - mean) / std_dev

# # HDF5へ書き出し (計算とI/Oをトリガー)
# filename = 'output2.h5'
# dset_name = '/standardized_data'
# da.to_hdf5(filename, dset_name, x_standardized)

# end_std = time.time()
# mem_info = process.memory_info()
# print(f"標準化計算 - RSS: {mem_info.rss / (1024**2):.2f} MB")
# print(f"標準化計算および書き出し時間: {end_std - start_std:.2f}秒")


初期メモリ - RSS: 136.61 MB
Dask Arrayに変換 - RSS: 136.61 MB
Dask Arrayに変換までの処理時間: 0.01秒
mean shape: (500, 500)
variance shape: (500, 500)
Mean and variance calculation completed successfully.
平均分散計算 - RSS: 171.25 MB
平均・分散計算時間: 9.47秒


In [3]:
# =======================
# 通常の平均・分散との比較
# =======================
# 全データを一括ロードしてnumpyで平均・分散を計算
mem_info = process.memory_info()
print(f"初期メモリ - RSS: {mem_info.rss / (1024**2):.2f} MB")
start_time = time.time()

# 全読み込み計算（基準となる計算）
with h5py.File(filename, 'r') as f:
    full_data = f['data'][:]

mid_time = time.time()
mem_info = process.memory_info()
print(f"読み込み - RSS: {mem_info.rss / (1024**2):.2f} MB")
print(f"読み込みまでの処理時間: {mid_time - start_time:.2f}秒")

mean_full = np.mean(full_data, axis=0)
var_full = np.var(full_data, axis=0)

mid_time = time.time()
mem_info = process.memory_info()
print(f"平均分散計算 - RSS: {mem_info.rss / (1024**2):.2f} MB")
print(f"平均分散計算までの処理時間: {mid_time - start_time:.2f}秒")

# 絶対誤差計算
mean_diff = np.abs(mean_full - mean).max()
var_diff = np.abs(var_full - variance).max()

end_compare = time.time()

print("通常計算との比較結果:")
print(f"Mean max abs diff: {mean_diff}")
print(f"Var max abs diff : {var_diff}")

初期メモリ - RSS: 171.98 MB
読み込み - RSS: 3972.91 MB
読み込みまでの処理時間: 3.01秒
平均分散計算 - RSS: 3966.95 MB
平均分散計算までの処理時間: 10.92秒
通常計算との比較結果:
Mean max abs diff: 0.0
Var max abs diff : 0.0


In [4]:
full_data.shape

(1990, 500, 500)