In [36]:
from numba import njit, prange
import pandas as pd
import numpy as np
import dask.dataframe as dd

@njit()
def cal_diff(x, window):
    """Return the lag-``window`` difference of ``x``, NaN-padded at the front.

    Element ``i`` of the result is ``x[i] - x[i - window]`` for every
    ``i >= window``; positions without a lagged partner stay NaN.

    Args:
        x: NumPy array to difference (indexed along its first axis).
        window: Lag, in elements, between the two values being subtracted.

    Returns:
        A float array with the same shape as ``x``.

    Note:
        The original author observed that this is slower than pandas' ``diff``.
    """
    n = len(x)
    # Start from an all-NaN buffer so the leading entries need no special casing.
    out = np.full(x.shape, np.nan)
    # Iterate over the earlier (lagged) index; its partner sits `window` steps ahead.
    for lag_idx in range(n - window):
        out[lag_idx + window] = x[lag_idx + window] - x[lag_idx]

    return out

@njit()
def cal_vix(x, window, offset=0):
    """Rolling sample standard deviation of the log differences of ``x``.

    The series ``log(x + offset)`` is differenced with lag 1, and for every
    ``i >= window`` the result holds the ``ddof=1`` standard deviation of the
    ``window`` most recent differences (indices ``i - window + 1 .. i``).
    This is the hand-written equivalent of
    ``np.std(log_diff[i - window + 1:i + 1], ddof=1)``; per the original
    author's note, the ``ddof`` argument cannot be used under numba's jit.

    Args:
        x: 1-D NumPy array of values (callers in this notebook pass
            ``Series.values``).
        window: Number of log differences in each rolling window.
        offset: Constant added to ``x`` before taking the logarithm.

    Returns:
        A float array shaped like ``x``. The first ``window`` entries are NaN,
        any window that contains a NaN difference yields NaN, and the whole
        result is NaN when ``window < 2``.
    """
    # Apply the offset and take the logarithm.
    log_x = np.log(x + offset)
    n = len(log_x)

    # Lag-1 differences; index 0 has no predecessor and stays NaN.
    # Pre-filling (instead of writing log_diff[0]) keeps empty input valid.
    log_diff = np.full(log_x.shape, np.nan)
    for i in range(1, n):
        log_diff[i] = log_x[i] - log_x[i - 1]

    roll_std = np.full(log_diff.shape, np.nan)

    # A sample standard deviation needs at least two observations; return the
    # all-NaN result rather than dividing by ``window - 1 <= 0``.
    if window < 2:
        return roll_std

    # The first complete window that avoids the NaN at log_diff[0] ends at
    # index ``window``.
    for i in range(window, n):
        window_values = log_diff[i - window + 1:i + 1]
        # Plain mean/sum so NaN propagates. Skipping NaNs while still dividing
        # by ``window - 1`` would silently understate the deviation.
        mean = np.mean(window_values)
        sum_sq_diff = np.sum((window_values - mean) ** 2)
        roll_std[i] = np.sqrt(sum_sq_diff / (window - 1))

    return roll_std

# Specifying parallel=True enables parallelization.
@njit(parallel=True)
def cal_vix_p(x, window, offset=0):
    """Parallel variant of the rolling sample std of the log differences of ``x``.

    The series ``log(x + offset)`` is differenced with lag 1, and for every
    ``i >= window`` the result holds the ``ddof=1`` standard deviation of the
    ``window`` most recent differences (indices ``i - window + 1 .. i``).
    Both loops use ``prange`` so that they can be parallelized when the
    function is compiled with ``parallel=True``.

    Args:
        x: 1-D NumPy array of values.
        window: Number of log differences in each rolling window.
        offset: Constant added to ``x`` before taking the logarithm.

    Returns:
        A float array shaped like ``x``. The first ``window`` entries are NaN,
        any window that contains a NaN difference yields NaN, and the whole
        result is NaN when ``window < 2``.
    """
    log_x = np.log(x + offset)
    n = len(log_x)

    # Pre-fill both buffers with NaN. The loops below never write
    # log_diff[0] or roll_std[:window], so allocating with np.empty would
    # hand uninitialized memory back to the caller in those positions.
    log_diff = np.full(log_x.shape, np.nan)
    roll_std = np.full(log_diff.shape, np.nan)

    # A sample standard deviation needs at least two observations; return the
    # all-NaN result rather than dividing by ``window - 1 <= 0``.
    if window < 2:
        return roll_std

    # Each iteration writes only its own index, so there is no cross-iteration
    # dependency.
    for i in prange(1, n):
        log_diff[i] = log_x[i] - log_x[i - 1]

    # Rolling standard deviation.
    # Without jit this could be written as
    # roll_std[i] = np.std(log_diff[i-window+1:i+1], ddof=1) (ddof=1 for the
    # unbiased estimator); per the original author's note ddof=1 cannot be used
    # under jit, so the standard deviation is computed by hand.
    for i in prange(window, n):
        window_values = log_diff[i - window + 1:i + 1]
        mean = np.mean(window_values)
        sum_sq_diff = np.sum((window_values - mean) ** 2)
        roll_std[i] = np.sqrt(sum_sq_diff / (window - 1))

    return roll_std

# Toy input: two groups ("key1" and "key2") with three values each.
data = {
    "key": ["key1", "key1", "key1", "key2", "key2", "key2"],
    "value": [10, 20, 30, 30, 20, 10]
}

# Create the DataFrame and group it by "key".
df = pd.DataFrame(data)
grouped = df.groupby(['key'])

# Per-group lag-2 difference: pandas' built-in diff vs. the cal_diff helper.
df["diff"] = grouped["value"].diff(2)
df["diff_c"] = grouped["value"].transform(lambda x: cal_diff(x.values, 2))

# Per-group rolling (window 2) std of the log differences: pandas vs. the cal_vix helper.
df["vix"] = grouped["value"].transform(lambda x: np.log(x).diff().rolling(2).std())
df["vix_c"] = grouped["value"].transform(lambda x: cal_vix(x.values, 2))

# Same computation through groupby-apply: `a` keeps the 'value' column of the
# pandas result after reset_index(); `a_c` maps each key to the array returned by cal_vix.
a = df.groupby(by=['key'])["value"].apply(lambda x: np.log(x).diff().rolling(2).std()).reset_index()['value']
a_c  = df.groupby(by=['key'])["value"].apply(lambda x: cal_vix(x.values, 2)).to_dict()
# NOTE(review): the lambda returns the whole per-key array from a_c for every row
# (not a scalar), and the column is named 'mean' although it holds cal_vix output
# -- confirm this is intended.
df['mean'] = df.apply(lambda x: a_c[x['key']], axis=1)

# Display the per-key arrays as the cell output.
a_c

{'key1': array([       nan,        nan, 0.20342194]),
 'key2': array([       nan,        nan, 0.20342194])}

In [None]:
"""
# グループ化し、transformを適用
# Pandas DataFrameをDask DataFrameに変換
ddf = dd.from_pandas(df, npartitions=4)  # npartitionsは使用するコアの数に応じて調整

# インデックスを設定（'key'列をインデックスとする）
ddf = ddf.set_index('key')

# グループ化とtransformの適用

ddf_grouped = ddf.groupby('key')
ddf['diff_c'] = ddf_grouped['value'].transform(lambda x: cal_diff(x.values, 2), meta=('x', 'f8'))
ddf['vix_c'] = ddf_grouped['value'].transform(lambda x: cal_vix(x.values, 2), meta=('x', 'f8'))

# 計算結果の取得
result = ddf.compute()
print(result)
"""

FileNotFoundError: [Errno 2] No such file or directory: './df_train.csv'