In [1]:
import numpy as np
import pandas as pd

from scipy import signal
from scipy.stats import f as f_dist
from sklearn.linear_model import LinearRegression


def _to_1d_numeric(x: pd.Series) -> np.ndarray:
    """
    1列（Series）を「数値の1次元配列」にする。
    - 文字列が混じっていても to_numeric で NaN に落とす
    - 欠損は線形補間で埋め、端も埋める
    - それでも残る NaN は落とす
    """
    s = pd.to_numeric(x, errors="coerce")
    s = s.interpolate(limit_direction="both")
    s = s.dropna()
    return s.to_numpy(dtype=float)


def _acf_at_lag(x: np.ndarray, lag: int) -> float:
    """
    自己相関（ACF）を1点だけ計算（lag>=1）。
    正規化して r(lag) を返す。x は平均0にしておくのが望ましい。
    """
    if lag <= 0 or lag >= len(x):
        return np.nan
    x0 = x - x.mean()
    denom = np.dot(x0, x0) + 1e-12
    return float(np.dot(x0[lag:], x0[:-lag]) / denom)


def _harmonic_f_test(x: np.ndarray, T: float, fs: float = 1.0) -> tuple[float, float, float]:
    """
    周期 T（点数 or 時間単位）を仮定して、
      x_t = a cos(2πt/T) + b sin(2πt/T) + c + e_t
    を線形回帰で当てはめ、a=b=0 をF検定する。

    戻り値：
      (p_value, R2, F_stat)

    注意：
    - ここでの p_value は「この周期の正弦波成分が有意か」の目安。
    - 残差が強い自己相関を持つと p が過小になり得る（現場では“目安”として使う）。
    """
    n = len(x)
    if n < 10:
        return (np.nan, np.nan, np.nan)

    # 時刻軸（等間隔サンプリング前提）
    t = np.arange(n, dtype=float) / fs

    # 説明変数：cos, sin, 定数項（切片）
    w = 2.0 * np.pi / T
    X = np.column_stack([np.cos(w * t), np.sin(w * t), np.ones_like(t)])

    # 線形回帰（切片は design matrix に入れているので fit_intercept=False）
    model = LinearRegression(fit_intercept=False)
    model.fit(X, x)
    yhat = model.predict(X)

    # 決定係数 R^2（どれだけ説明できたか）
    sse = np.sum((x - yhat) ** 2)                      # 残差平方和
    sst = np.sum((x - np.mean(x)) ** 2) + 1e-12        # 全体平方和
    r2 = 1.0 - (sse / sst)

    # F検定：追加したパラメータは a,b の2つ（df1=2）
    df1 = 2
    df2 = n - 3  # (a,b,c) の3パラメータを使っているので n-3
    if df2 <= 0:
        return (np.nan, float(r2), np.nan)

    # 回帰による説明分 / 残差分 を比で見る
    ssr = sst - sse
    msr = ssr / df1
    mse = sse / df2
    F = msr / (mse + 1e-12)

    # 右片側確率（大きいFほど「周期成分あり」）
    p = 1.0 - f_dist.cdf(F, df1, df2)

    return (float(p), float(r2), float(F))


def check_periodicity_per_column(
    df: pd.DataFrame,
    fs: float = 1.0,                 # 1サンプルの時間間隔が一定なら 1.0 のままでOK
    min_period: int = 5,             # 探索する周期の下限（点数）
    max_period: int | None = None,   # 探索する周期の上限（点数）。Noneなら N//2
    top_k_peaks: int = 5,            # スペクトルから拾う周期候補の数
    detrend: bool = True,            # 緩やかな増減（トレンド）を落として周期だけ見やすくする
    alpha: float = 0.01              # 有意水準（小さいほど厳しい）
) -> pd.DataFrame:
    """
    N×Mの各列について周期性をチェックして、要約表(DataFrame)を返す。

    方針：
    1) 各列を前処理（欠損補間→必要ならトレンド除去→標準化）
    2) periodogram（スペクトル）から「周期候補」を top_k 個抽出
    3) 候補周期ごとに正弦波回帰 + F検定（p値）を計算
    4) 最もp値が小さい周期を「その列の最有力周期」として報告
    5) 補助指標として、その周期ラグの自己相関(ACF)も出す
    """
    results = []

    for col in df.columns:
        x = _to_1d_numeric(df[col])
        n = len(x)

        # データが短すぎると周期推定は不安定
        if n < max(50, 5 * min_period):
            results.append({
                "col": col, "n": n,
                "best_period": np.nan, "best_freq": np.nan,
                "peak_ratio": np.nan,
                "p_value": np.nan, "R2": np.nan, "F": np.nan,
                "acf_at_best": np.nan,
                "is_periodic": False
            })
            continue

        # 周期探索の上限
        maxP = (n // 2) if (max_period is None) else int(min(max_period, n // 2))
        maxP = max(maxP, min_period + 1)

        # 1) トレンド除去（ゆっくり上がる/下がる成分があると“周期らしさ”を邪魔する）
        if detrend:
            x = signal.detrend(x, type="linear")

        # 2) 標準化（列ごとのスケール差を消す：平均0・分散1）
        x = x - np.mean(x)
        x = x / (np.std(x) + 1e-12)

        # 3) スペクトル（periodogram）
        #    window="hann" は漏れ（リーク）を減らしてピークを見やすくする定番設定
        f, Pxx = signal.periodogram(
            x,
            fs=fs,
            window="hann",
            detrend=False,
            scaling="spectrum"
        )

        # DC(0Hz)は周期性ではないので除外
        f = f[1:]
        Pxx = Pxx[1:]

        # 周期 = 1/f（f=0は除外済み）
        period = 1.0 / (f + 1e-300)

        # min_period〜max_period の範囲だけ見る
        mask = (period >= min_period) & (period <= maxP)
        f2 = f[mask]
        P2 = Pxx[mask]
        per2 = period[mask]

        if len(P2) < 10:
            # 有効範囲が取れない場合
            results.append({
                "col": col, "n": n,
                "best_period": np.nan, "best_freq": np.nan,
                "peak_ratio": np.nan,
                "p_value": np.nan, "R2": np.nan, "F": np.nan,
                "acf_at_best": np.nan,
                "is_periodic": False
            })
            continue

        # 4) ピーク抽出：スペクトルの山を候補にする
        #    prominence を入れると「小さなノイズピーク」を減らせるが、
        #    ここでは簡潔に height なしでピークを拾い、上位だけ使う。
        peak_idx, _ = signal.find_peaks(P2)
        if len(peak_idx) == 0:
            # ピークが取れない（単調に近い等）
            results.append({
                "col": col, "n": n,
                "best_period": np.nan, "best_freq": np.nan,
                "peak_ratio": float(np.max(P2) / (np.median(P2) + 1e-12)),
                "p_value": np.nan, "R2": np.nan, "F": np.nan,
                "acf_at_best": np.nan,
                "is_periodic": False
            })
            continue

        # ピークの強さでソートして上位 top_k_peaks を候補にする
        peak_powers = P2[peak_idx]
        order = np.argsort(peak_powers)[::-1]
        peak_idx = peak_idx[order[:top_k_peaks]]

        # 参考：ピークの鋭さ（最大ピーク / スペクトル中央値）
        peak_ratio = float(np.max(P2) / (np.median(P2) + 1e-12))

        # 5) 各候補周期で「正弦波成分が有意か」をF検定
        best = {
            "p": np.inf,
            "T": np.nan,
            "freq": np.nan,
            "R2": np.nan,
            "F": np.nan
        }

        for i in peak_idx:
            T = float(per2[i])
            freq = float(f2[i])

            p, r2, F = _harmonic_f_test(x, T=T, fs=fs)

            # pが最小（最も有意）な周期を採用
            if np.isfinite(p) and p < best["p"]:
                best.update({"p": p, "T": T, "freq": freq, "R2": r2, "F": F})

        # 6) 補助：自己相関（周期が点数なので lag は四捨五入した整数で見る）
        lag = int(np.round(best["T"])) if np.isfinite(best["T"]) else -1
        acf_best = _acf_at_lag(x, lag) if lag >= 1 else np.nan

        # 7) 判定：p値が小さい（統計的に有意）なら周期性あり、とするのが最小構成
        #    実務では peak_ratio や acf も併記して判断材料にする
        is_periodic = bool(np.isfinite(best["p"]) and best["p"] < alpha)

        results.append({
            "col": col,
            "n": n,
            "best_period": best["T"],   # 点数（fs=1なら「何点周期か」）
            "best_freq": best["freq"],  # 周波数
            "peak_ratio": peak_ratio,   # 最大ピーク/中央値（ピークが目立つほど大きい）
            "p_value": best["p"],       # 周期成分の有意確率（小さいほど周期性が強い）
            "R2": best["R2"],           # 正弦波でどれだけ説明できたか
            "F": best["F"],             # F統計量
            "acf_at_best": acf_best,    # best_period の自己相関（補助）
            "is_periodic": is_periodic
        })

    out = pd.DataFrame(results)
    # 見やすく：周期っぽい列が上に来るように並べる
    out = out.sort_values(["is_periodic", "p_value"], ascending=[False, True]).reset_index(drop=True)
    return out


# ===== 使い方 =====
# df: N×M の DataFrame（各列がセンサ時系列）
# summary = check_periodicity_per_column(df, fs=1.0, min_period=5, max_period=500, top_k_peaks=5, detrend=True, alpha=0.01)
# display(summary)



In [3]:
# ============================================================
# ① データの読み込み（N×M、df形式、学習とテストは別）
# ============================================================
def load_csv_df(path, sep=";", encoding="utf-8"):
    return pd.read_csv(path, sep=sep, encoding=encoding)
SEP = ","
train_csv_path = r".\\data\test_5\\SKAB\\X_train.csv"
test_csv_path  = r".\\data\test_5\\SKAB\\X_test.csv"
df_train = load_csv_df(train_csv_path, sep=SEP)
df_test  = load_csv_df(test_csv_path,  sep=SEP)

print("df_train shape:", df_train.shape)
print("df_test  shape:", df_test.shape)


df_train shape: (5643, 8)
df_test  shape: (41163, 8)


In [4]:
summary = check_periodicity_per_column(df_train, fs=1.0, min_period=5, max_period=500, top_k_peaks=5, detrend=True, alpha=0.01)
display(summary)

Unnamed: 0,col,n,best_period,best_freq,peak_ratio,p_value,R2,F,acf_at_best,is_periodic
0,Temperature,5643,82.985294,0.01205,793.080669,0.0,0.036967,108.247954,0.556549,True
1,Volume Flow RateRMS,5643,376.2,0.002658,905.221917,0.0,0.017613,50.559603,0.113351,True
2,Current,5643,38.387755,0.02605,17.447385,9.898089e-07,0.004891,13.859702,0.00124,True
3,Accelerometer1RMS,5643,352.6875,0.002835,80.649571,2.519899e-05,0.003748,10.608611,0.402505,True
4,Pressure,5643,6.080819,0.164452,10.720588,6.934479e-05,0.00339,9.592698,0.000361,True
5,Thermocouple,5643,245.347826,0.004076,2511.179844,7.906758e-05,0.003344,9.461043,0.744791,True
6,Voltage,5643,9.358209,0.106858,13.908658,0.0004059569,0.002765,7.820086,0.004198,True
7,Accelerometer2RMS,5643,15.007979,0.066631,18.225443,0.04586829,0.001092,3.083666,0.423163,False


| 指標（列名） | 何を表すか | 値の解釈（大きい/小さい） | 判断の基本 | よくある落とし穴 | 追加で見ると良いもの |
|---|---|---|---|---|---|
| `n` | その列で実際に使えたサンプル数（欠損除外後） | 小さいほど不安定 | `n < max(50, 5×min_period)` 付近だと結論を出さない | 欠損が多い列は周期があっても判定が揺れる | 欠損率、補間の有無 |
| `best_period` | 推定された最有力周期（点数） | 値そのものが周期候補 | 近い値に集まれば信頼が上がる | 周期が無くても必ず何かしら出る（最大ピークは必ず存在） | `p_value`, `peak_ratio`, `acf_at_best` |
| `best_freq` | 推定された最有力周波数（Hz相当） | 大きいほど短周期 | `best_period` と同じ内容を周波数表示 | `fs` の設定ミスで解釈がズレる | `fs`（サンプリング間隔）の確認 |
| `peak_ratio` | スペクトル最大ピークの鋭さ（最大/中央値） | 大きいほど「周期ピークが目立つ」 | 大きい列ほど周期成分が強い可能性 | トレンド・外れ値・非正弦の繰返しでも大きくなり得る | トレンド除去、外れ値処理 |
| `p_value` | 「周期成分なし」でもこのピークが出る確率 | 小さいほど周期性が統計的に強い | 最小構成：`p_value < alpha` なら周期性あり（`is_periodic=True`） | 多数列で偶然当たりが増える（多重比較）／自己相関が強いと過小評価になり得る | 多重比較補正、残差の自己相関 |
| `R2` | 正弦波（cos/sin）でどれだけ説明できたか | 大きいほど「正弦波っぽい周期」 | `p_value` が小さくても `R2` が極小なら効果が小さい | 矩形波など非正弦だと `R2` が低めでも周期が本物の場合がある | 高調波も含めて再評価 |
| `F` | 正弦波成分の強さを表す統計量 | 大きいほど周期性が強い | `p_value` の元になる量（比較用） | `n` が大きいと小効果でも有意になりやすい | `R2` と併用して効果量を見る |
| `acf_at_best` | `best_period` だけ遅らせた自己相関 | 大きいほど「その周期で似ている」 | 目安：0.2以上などで周期らしさが増す | 位相ずれ・周期揺れ・非定常で低く出る（周期があっても下がる） | ACFのT,2T,3T を確認 |

### 多重比較（列が多いとき）の扱い（最小）
- 同時に M 列を判定すると「偶然の周期あり」が混ざる。
- 最も単純な対処：判定閾値を `alpha_adj = alpha / M` にして `p_value < alpha_adj` を使う（保守的）。

### 運用でのおすすめ最小ルール（事故が少ない）
- **周期性あり（統計的）**：`p_value < alpha_adj`
- **周期性あり（実用的に強い）**：上に加えて `peak_ratio` が十分大きい、かつ `acf_at_best` もそこそこ大きい  
  例：`p_value < 0.001` かつ `peak_ratio ≥ 5` かつ `acf_at_best ≥ 0.2`
