In [None]:
# パッケージをインストール
%pip install -qe ..

In [2]:
import duckdb
import japanize_matplotlib  # noqa: F401
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns

In [3]:
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

## 基本設定


In [4]:
plt.style.use("ggplot")

In [None]:
with duckdb.connect("../data/raw/etl_from_sf.duckdb", read_only=True) as con:
    df = con.execute("SELECT * FROM raw_clean_with_pool").df()
df.head()

In [6]:
raw_clean_with_pool = df

## データ変換


In [9]:
# タイムスタンプを日時に変換
raw_clean_with_pool["datetime"] = pd.to_datetime(raw_clean_with_pool["hour_ts"], unit="s")

In [10]:
# IDからプールアドレスとインデックスを抽出
raw_clean_with_pool["pool_address"] = raw_clean_with_pool["id"].str.split("-").str[0]
raw_clean_with_pool["block_index"] = raw_clean_with_pool["id"].str.split("-").str[1]

In [12]:
# フィーティアごとのプール数
fee_tier_counts = raw_clean_with_pool["fee_tier"].value_counts().reset_index()
fee_tier_counts.columns = ["fee_tier", "count"]

In [None]:
# 基本統計量の確認
raw_clean_with_pool.describe()

In [14]:
# 数値カラムのキャスト
num_cols = [
    "volume_usd",
    "tvl_usd",
    "fees_usd",
    "open_price",
    "high_price",
    "low_price",
    "close_price",
    "liquidity",
    "volume_token0",
    "volume_token1",
    "tx_count",
    "tick",
    "sqrt_price",
]
for c in num_cols:
    raw_clean_with_pool[c] = pd.to_numeric(raw_clean_with_pool[c], errors="coerce")

### 異常値の検出 & クリーニング


In [15]:
#  clip で下限 0、 or 上限 percentile 99.9% に制限
raw_clean_with_pool["tvl_usd"] = raw_clean_with_pool["tvl_usd"].clip(lower=0)
upper = raw_clean_with_pool["volume_usd"].quantile(0.999)
raw_clean_with_pool = raw_clean_with_pool[raw_clean_with_pool["volume_usd"] <= upper]

In [None]:
# 欠損の確認（型変換で NaN が入る可能性があるため）
print(raw_clean_with_pool[num_cols].isna().sum())
raw_clean_with_pool = raw_clean_with_pool.dropna(subset=num_cols)

## 可視化


### 時系列分析


In [None]:
# 時系列での取引量推移（全体）
daily_volume = (
    raw_clean_with_pool.groupby(raw_clean_with_pool["datetime"].dt.date).agg({"volume_usd": "sum"}).reset_index()
)

plt.figure(figsize=(12, 6))
sns.lineplot(data=daily_volume, x="datetime", y="volume_usd", marker="o", markersize=4, linewidth=1.5)
plt.title("日次総取引量")
plt.xlabel("日付")
plt.ylabel("取引量 (USD)")
plt.gca().xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d"))
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

# 主要プールの時系列分析
top_pools = raw_clean_with_pool.groupby("pool_address")["volume_usd"].sum().nlargest(5).index

for pool in top_pools:
    dfp = raw_clean_with_pool[raw_clean_with_pool["pool_address"] == pool].copy()

    # 日付で集計
    dfp["date"] = dfp["datetime"].dt.date
    dfp_agg = dfp.groupby("date")[["volume_usd", "tvl_usd"]].sum().reset_index()

    # 二軸プロット作成
    fig, ax1 = plt.subplots(figsize=(12, 6))

    # 左軸：取引量
    color1 = "tab:blue"
    line1 = sns.lineplot(
        data=dfp_agg, x="date", y="volume_usd", marker="o", markersize=4, linewidth=1.5, color=color1, ax=ax1
    )
    ax1.set_xlabel("日付")
    ax1.set_ylabel("取引量 (USD)", color=color1)
    ax1.tick_params(axis="y", labelcolor=color1)

    # 右軸：TVL
    ax2 = ax1.twinx()
    color2 = "tab:red"
    line2 = sns.lineplot(
        data=dfp_agg, x="date", y="tvl_usd", marker="o", markersize=4, linewidth=1.5, color=color2, ax=ax2
    )
    ax2.set_ylabel("TVL (USD)", color=color2)
    ax2.tick_params(axis="y", labelcolor=color2)

    # グラフタイトルとフォーマット設定
    plt.title(f"プール {pool[:10]}… の日次推移")
    ax1.xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d"))
    plt.xticks(rotation=45)

    # 凡例追加
    lines1, labels1 = ax1.get_legend_handles_labels()
    lines2, labels2 = ax2.get_legend_handles_labels()
    ax1.legend(lines1 + lines2, ["日次取引量", "日次TVL"], loc="upper left")

    # プロット調整と表示
    sns.despine(left=False, right=False)
    plt.tight_layout()
    plt.show()

# オプション：すべてのトッププールを1つのグラフで比較
plt.figure(figsize=(14, 8))
pool_data = []

for pool in top_pools:
    dfp = raw_clean_with_pool[raw_clean_with_pool["pool_address"] == pool].copy()
    dfp["date"] = dfp["datetime"].dt.date
    dfp_agg = dfp.groupby("date")["volume_usd"].sum().reset_index()
    dfp_agg["pool"] = pool[:8] + "..."  # アドレスの先頭8文字
    pool_data.append(dfp_agg)

all_pools_data = pd.concat(pool_data)

sns.lineplot(data=all_pools_data, x="date", y="volume_usd", hue="pool", marker="o", markersize=4)
plt.title("トッププール5件の日次取引量比較")
plt.xlabel("日付")
plt.ylabel("取引量 (USD)")
plt.gca().xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d"))
plt.xticks(rotation=45)
plt.legend(title="Pool")
plt.tight_layout()
plt.show()

In [None]:
# 主要プールの時系列分析
for pool in top_pools:
    dfp = raw_clean_with_pool[raw_clean_with_pool["pool_address"] == pool].copy()

    # datetime を index にして日次リサンプリング
    dfp = dfp.set_index("datetime").sort_index()
    dfp_agg = dfp[["volume_usd", "tvl_usd"]].resample("D").sum().reset_index()

    # 二軸プロット作成
    fig, ax1 = plt.subplots(figsize=(12, 6))

    # 左軸：取引量
    color1 = "tab:blue"
    line1 = sns.lineplot(
        data=dfp_agg, x="datetime", y="volume_usd", marker="o", markersize=4, linewidth=1.5, color=color1, ax=ax1
    )
    ax1.set_xlabel("日付")
    ax1.set_ylabel("取引量 (USD)", color=color1)
    ax1.tick_params(axis="y", labelcolor=color1)

    # 右軸：TVL
    ax2 = ax1.twinx()
    color2 = "tab:red"
    line2 = sns.lineplot(
        data=dfp_agg, x="datetime", y="tvl_usd", marker="o", markersize=4, linewidth=1.5, color=color2, ax=ax2
    )
    ax2.set_ylabel("TVL (USD)", color=color2)
    ax2.tick_params(axis="y", labelcolor=color2)

    # グラフタイトルとフォーマット設定
    plt.title(f"プール {pool[:10]}… の日次推移 (resample)")
    ax1.xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d"))
    plt.xticks(rotation=45)

    # 凡例追加
    lines1, labels1 = ax1.get_legend_handles_labels()
    lines2, labels2 = ax2.get_legend_handles_labels()
    ax1.legend(lines1 + lines2, ["日次取引量", "日次TVL"], loc="upper left")

    # プロット調整と表示
    sns.despine(left=False, right=False)
    plt.tight_layout()
    plt.show()

# 追加オプション：すべてのトッププールを1つのグラフで比較（resampleバージョン）
plt.figure(figsize=(14, 8))
pool_data = []

for pool in top_pools:
    dfp = raw_clean_with_pool[raw_clean_with_pool["pool_address"] == pool].copy()
    # resampleを使用
    dfp = dfp.set_index("datetime").sort_index()
    dfp_agg = dfp[["volume_usd"]].resample("D").sum().reset_index()
    dfp_agg["pool"] = pool[:8] + "..."  # アドレスの先頭8文字
    pool_data.append(dfp_agg)

all_pools_data = pd.concat(pool_data)

sns.lineplot(data=all_pools_data, x="datetime", y="volume_usd", hue="pool", marker="o", markersize=4)
plt.title("トッププール5件の日次取引量比較 (resample)")
plt.xlabel("日付")
plt.ylabel("取引量 (USD)")
plt.gca().xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d"))
plt.xticks(rotation=45)
plt.legend(title="Pool")
plt.tight_layout()
plt.show()

## 特徴量エンジニアリング


In [None]:
from matplotlib.cm import ScalarMappable
from matplotlib.colors import Normalize


# プールごとに時系列特徴量を作成（この部分は同じ）
def add_time_features(group):
    # 時系列順にソート
    group = group.sort_values("datetime")

    # 移動平均（24時間）
    group["volume_ma24"] = group["volume_usd"].rolling(24).mean()

    # 移動平均からの乖離率（異常検知の重要指標）
    group["volume_deviation"] = (group["volume_usd"] / (group["volume_ma24"] + 1e-10)) - 1

    # ボラティリティ（24時間の標準偏差）
    group["volume_volatility"] = group["volume_usd"].rolling(24).std()

    # TVLに対する取引量の比率（高すぎると異常の可能性）
    group["volume_tvl_ratio"] = group["volume_usd"] / (group["tvl_usd"] + 1e-10)

    # 価格変動率
    group["price_change"] = (group["close_price"] / group["open_price"]) - 1

    return group


# 各プールに対して特徴を追加
enhanced_df = raw_clean_with_pool.groupby("pool_address").apply(add_time_features)

# 外れ値を制限（可視化のため）
upper_tvl_ratio = np.percentile(enhanced_df["volume_tvl_ratio"].dropna(), 99)
upper_deviation = np.percentile(enhanced_df["volume_deviation"].dropna(), 99)
lower_deviation = np.percentile(enhanced_df["volume_deviation"].dropna(), 1)

plot_df = enhanced_df.copy()
plot_df["volume_tvl_ratio"] = plot_df["volume_tvl_ratio"].clip(upper=upper_tvl_ratio)
plot_df["volume_deviation"] = plot_df["volume_deviation"].clip(upper=upper_deviation, lower=lower_deviation)

# 異常スコアの可視化
# カラーマップの設定
cmap = plt.cm.coolwarm
norm = Normalize(vmin=plot_df["price_change"].quantile(0.05), vmax=plot_df["price_change"].quantile(0.95))

# 散布図プロット
fig, ax = plt.subplots(figsize=(12, 8))
scatter = sns.scatterplot(
    data=plot_df,
    x="volume_tvl_ratio",
    y="volume_deviation",
    hue="price_change",
    palette=cmap,
    hue_norm=norm,
    alpha=0.7,
    edgecolor="none",
    s=50,  # ポイントサイズ
    ax=ax,  # Axesを明示的に指定
)

# カラーバーの追加
sm = ScalarMappable(cmap=cmap, norm=norm)
sm.set_array([])
cbar = plt.colorbar(sm, ax=ax)
cbar.set_label("価格変動率")

# グラフの設定
plt.title("取引量の異常指標", fontsize=16)
plt.xlabel("取引量/TVL比率")
plt.ylabel("移動平均からの乖離率")

# 凡例の非表示（カラーバーがあるため）
scatter.get_legend().remove()

# トゥールチップ代わりにポイントに注釈を付ける場合のサンプル（上位5件の異常値）
anomaly_score = plot_df["volume_deviation"].abs() * plot_df["volume_tvl_ratio"]
top_anomalies = plot_df.loc[anomaly_score.nlargest(5).index]

for idx, row in top_anomalies.iterrows():
    plt.annotate(
        f"Pool: {row['pool_address'][:8]}...\nDate: {row['datetime'].strftime('%Y-%m-%d %H:%M')}",
        xy=(row["volume_tvl_ratio"], row["volume_deviation"]),
        xytext=(10, 10),
        textcoords="offset points",
        bbox=dict(boxstyle="round,pad=0.5", fc="yellow", alpha=0.5),
        arrowprops=dict(arrowstyle="->", connectionstyle="arc3,rad=0"),
    )

plt.tight_layout()
plt.show()

# オプション：異常検知の時系列分析（移動平均からの乖離率の時系列プロット）
# トップ5の異常値を持つプールを選択
top_anomaly_pools = top_anomalies["pool_address"].unique()[:3]  # 上位3つのプールを選択

plt.figure(figsize=(14, 8))
for pool in top_anomaly_pools:
    pool_data = enhanced_df[enhanced_df["pool_address"] == pool].copy()
    pool_data = pool_data.sort_values("datetime")

    sns.lineplot(data=pool_data, x="datetime", y="volume_deviation", label=f"Pool {pool[:8]}...")

plt.axhline(y=0, color="r", linestyle="--", alpha=0.3)
plt.title("移動平均からの乖離率の時系列推移（異常検知上位プール）")
plt.xlabel("日時")
plt.ylabel("移動平均からの乖離率")
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

### 多変量相関分析とプール特性の把握


In [None]:
# 1. 相関ヒートマップ
numeric_cols = ["volume_usd", "tvl_usd", "fees_usd", "tx_count", "liquidity", "volume_deviation", "volume_tvl_ratio"]
corr = enhanced_df[numeric_cols].corr()

plt.figure(figsize=(12, 10))
mask = np.triu(np.ones_like(corr, dtype=bool))  # 上三角形をマスク（オプション）
heatmap = sns.heatmap(
    corr,
    annot=True,  # 値を表示
    fmt=".2f",  # 小数点以下2桁表示
    cmap="coolwarm",  # カラーマップ
    vmin=-1,
    vmax=1,  # 値の範囲
    center=0,  # 中央値（カラーマップの中心）
    square=True,  # 正方形のセル
    linewidths=0.5,  # セル間の線の幅
    cbar_kws={"shrink": 0.8},  # カラーバーの設定
)
plt.title("指標間の相関係数", fontsize=16, pad=20)
plt.tight_layout()
plt.show()


# 2. フィーティア別の統計比較
def anomaly_rate(x):
    return (abs(x) > 3).mean()


fee_tier_stats = enhanced_df.groupby("fee_tier").agg(
    {
        "volume_usd": ["mean", "std", "max"],
        "tvl_usd": ["mean", "std"],
        "tx_count": "mean",
        "volume_deviation": ["mean", "std", anomaly_rate],
    }
)

# カラム名をフラット化
fee_tier_stats.columns = [
    "_".join(col).strip() if isinstance(col, tuple) else col for col in fee_tier_stats.columns.values
]
fee_tier_stats = fee_tier_stats.reset_index()

# フィーティア別の異常発生率比較
plt.figure(figsize=(10, 6))
bar = sns.barplot(data=fee_tier_stats, x="fee_tier", y="volume_deviation_anomaly_rate", palette="viridis")

# 値を棒グラフの上に表示
for i, p in enumerate(bar.patches):
    height = p.get_height()
    bar.text(p.get_x() + p.get_width() / 2.0, height + 0.01, f"{height:.2%}", ha="center", fontsize=10)

plt.title("フィーティア別の異常発生率（volume_deviation > 3σ）", fontsize=14)
plt.xlabel("フィーティア")
plt.ylabel("異常発生率")
plt.grid(axis="y", alpha=0.3)
plt.tight_layout()
plt.show()

# 3. トークンペアの組み合わせ分析
token_pairs = enhanced_df.groupby(["token0_symbol", "token1_symbol"]).size().reset_index(name="count")
top_pairs = token_pairs.sort_values("count", ascending=False).head(15)

# 表示用のラベルにトークンペアを結合
top_pairs["pair_label"] = top_pairs["token0_symbol"] + "-" + top_pairs["token1_symbol"]

plt.figure(figsize=(12, 10))
bars = sns.barplot(
    data=top_pairs,
    y="pair_label",  # 結合したラベルを使用
    x="count",
    palette="muted",
    orient="h",  # 水平バーチャート
)

# 値を棒グラフの中に表示
for i, p in enumerate(bars.patches):
    width = p.get_width()
    bars.text(
        width * 0.98,
        p.get_y() + p.get_height() / 2.0,
        f"{int(width)}",
        ha="right",
        va="center",
        color="white",
        fontweight="bold",
        fontsize=10,
    )

plt.title("最も一般的なトークンペア", fontsize=14)
plt.xlabel("件数")
plt.ylabel("")  # yラベルは不要
plt.tight_layout()
plt.show()

# ボーナス：フィーティア別の各種統計量のヒートマップ表示
# 主要な統計量のみを選択
key_stats = ["volume_usd_mean", "tvl_usd_mean", "tx_count_mean", "volume_deviation_anomaly_rate"]
stats_df = fee_tier_stats[["fee_tier"] + key_stats].set_index("fee_tier")

# フォーマットとラベル設定
formatted_labels = {
    "volume_usd_mean": "平均取引量",
    "tvl_usd_mean": "平均TVL",
    "tx_count_mean": "平均取引回数",
    "volume_deviation_anomaly_rate": "異常発生率",
}

# 正規化（各指標を0-1スケールに）
normalized_stats = stats_df.copy()
for col in key_stats:
    normalized_stats[col] = (stats_df[col] - stats_df[col].min()) / (stats_df[col].max() - stats_df[col].min())

plt.figure(figsize=(10, 8))
sns.heatmap(
    normalized_stats,
    annot=stats_df,  # 元の値を表示
    fmt=".2g",  # 一般形式で表示
    cmap="YlGnBu",
    linewidths=0.5,
    yticklabels=normalized_stats.index,
    xticklabels=[formatted_labels[col] for col in key_stats],
)
plt.title("フィーティア別の主要統計指標", fontsize=14)
plt.tight_layout()
plt.show()

## ベースラインモデルの定義


In [None]:
# 異常検知のための特徴選択
anomaly_features = ["volume_usd", "tvl_usd", "tx_count", "volume_deviation", "volume_tvl_ratio", "price_change"]

# 欠損値処理
model_df = enhanced_df.dropna(subset=anomaly_features)

# インデックスをリセット（重要: インデックスの問題を解決するため）
model_df = model_df.reset_index(drop=True)

# スケーリング
scaler = StandardScaler()
scaled_features = scaler.fit_transform(model_df[anomaly_features])

# Isolation Forestで異常検知
clf = IsolationForest(contamination=0.01, random_state=42)
model_df["anomaly_score"] = clf.fit_predict(scaled_features)
model_df["is_anomaly"] = model_df["anomaly_score"] == -1

# 検出された異常のみのデータフレーム
anomaly_df = model_df[model_df["is_anomaly"]].copy()

# プール名をより短く、読みやすくする
anomaly_df["pool_short"] = anomaly_df["pool_address"].apply(lambda x: x[:8] + "...")

# サイズ変数の調整（視覚的に見やすくするため）
if len(anomaly_df) > 0:
    size_var = np.abs(anomaly_df["volume_deviation"])
    if size_var.max() > size_var.min():
        size_var = 50 + (size_var - size_var.min()) / (size_var.max() - size_var.min()) * 200
    else:
        size_var = 100  # 全て同じ値の場合のデフォルトサイズ
else:
    size_var = []  # 空の場合

# 1. 検出された異常の可視化
if len(anomaly_df) > 0:
    plt.figure(figsize=(14, 8))

    # 色分け用のパレット設定（プール数に応じて色を生成）
    unique_pools = anomaly_df["pool_short"].nunique()
    palette = sns.color_palette("husl", unique_pools)

    # 散布図プロット
    scatter = sns.scatterplot(
        data=anomaly_df,
        x="datetime",
        y="volume_usd",
        hue="pool_short",
        size=size_var,
        sizes=(50, 250),  # サイズの範囲
        palette=palette,
        alpha=0.7,
        edgecolor="black",
        linewidth=0.5,
    )

    # グラフの設定
    plt.title("検出された異常（Isolation Forest）", fontsize=16)
    plt.xlabel("日時")
    plt.ylabel("取引量 (USD)")

    # x軸の日付フォーマット調整
    plt.gcf().autofmt_xdate()

    # 凡例調整（多すぎる場合は表示を制限）
    if unique_pools > 10:
        # 凡例を上位10件に制限
        handles, labels = scatter.get_legend_handles_labels()
        plt.legend(handles[:11], labels[:11], title="Pool", fontsize=9, loc="best")
    else:
        plt.legend(title="Pool", fontsize=9, loc="best")

    # 注釈追加（トップ5の異常値）
    if len(anomaly_df) >= 5:
        top_anomalies = anomaly_df.nlargest(5, "volume_usd")
    else:
        top_anomalies = anomaly_df

    for idx, row in top_anomalies.iterrows():
        plt.annotate(
            f"{row['token0_symbol']}-{row['token1_symbol']}",
            xy=(row["datetime"], row["volume_usd"]),
            xytext=(10, 0),
            textcoords="offset points",
            fontsize=9,
            bbox=dict(boxstyle="round,pad=0.3", fc="yellow", alpha=0.5),
        )

    plt.tight_layout()
    plt.show()
else:
    print("異常は検出されませんでした。")

# 2. プール別の異常数集計
# as_index=Falseを指定して明示的にカラムとして処理
anomaly_by_pool = model_df.groupby("pool_address", as_index=False)["is_anomaly"].sum()
anomaly_by_pool = anomaly_by_pool[anomaly_by_pool["is_anomaly"] > 0].sort_values("is_anomaly", ascending=False)

if len(anomaly_by_pool) > 0:
    top10_pools = anomaly_by_pool.head(10).copy()

    # プールアドレスを短縮
    top10_pools["pool_short"] = top10_pools["pool_address"].apply(lambda x: x[:10] + "...")

    plt.figure(figsize=(12, 7))
    bars = sns.barplot(
        data=top10_pools,
        x="is_anomaly",
        y="pool_short",
        palette="viridis",
        orient="h",  # 水平棒グラフ
    )

    # バーに数値を表示
    for i, p in enumerate(bars.patches):
        width = p.get_width()
        plt.text(
            width + 0.3,  # 少し右にオフセット
            p.get_y() + p.get_height() / 2,
            f"{int(width)}",
            ha="left",
            va="center",
        )

    plt.title("異常が最も多く検出されたプールTop10", fontsize=14)
    plt.xlabel("異常データ数")
    plt.ylabel("プールアドレス")
    plt.tight_layout()
    plt.show()
else:
    print("異常は検出されませんでした。")

# 3. ボーナス：トークンペア別の異常発生数
if len(anomaly_df) > 0:
    # トークンペア情報を結合
    anomaly_df["token_pair"] = anomaly_df["token0_symbol"] + "-" + anomaly_df["token1_symbol"]

    # トークンペア別の異常数を集計
    anomaly_by_pair = anomaly_df.groupby("token_pair").size().reset_index(name="count")
    top_pairs = anomaly_by_pair.sort_values("count", ascending=False).head(10)

    plt.figure(figsize=(12, 7))
    bars = sns.barplot(data=top_pairs, x="count", y="token_pair", palette="muted", orient="h")

    # バーに数値を表示
    for i, p in enumerate(bars.patches):
        width = p.get_width()
        plt.text(width + 0.1, p.get_y() + p.get_height() / 2, f"{int(width)}", ha="left", va="center")

    plt.title("異常が最も多く検出されたトークンペアTop10", fontsize=14)
    plt.xlabel("異常データ数")
    plt.ylabel("トークンペア")
    plt.tight_layout()
    plt.show()