In [4]:
# %% ONE-CELL: Core30 Monthly Matrix → parquet + manifest(items[]) + optional S3 upload
# - Input:  ./data/parquet/core30_meta.parquet (a "ticker" column is required)
# - Fetch:  monthly Close via yfinance (period="max", interval="1mo") → month-over-month change (%)
# - Output: ./data/parquet/core30_anomaly.parquet (columns: ["ticker","year","month","return_pct"])
# - Also:   upserts manifest.json (shared format: {"generated_at", "items":[{key,bytes,sha256,mtime}], "note"})
# - S3:     loads .env.s3 / .env when present; if DATA_BUCKET is set, uploads core30_anomaly.parquet and manifest.json under s3://{BUCKET}/{PREFIX}

from pathlib import Path
from datetime import datetime, timezone
import os, json, hashlib
import pandas as pd
import numpy as np
import yfinance as yf

# ========== Auto-load .env.s3 / .env ==========
# Best-effort: any failure in here (python-dotenv missing, unreadable file, ...)
# is swallowed so the cell still runs on plain environment variables.
try:
    from dotenv import load_dotenv

    _env_candidates = (Path(".env.s3"), Path(".env"))
    for _p in _env_candidates:
        if not _p.exists():
            continue
        # override=False is passed so variables that are already set are not
        # meant to be replaced by values from the file.
        load_dotenv(dotenv_path=_p, override=False)
except Exception:
    pass  # keep going even when python-dotenv is not installed

# ========== Constants / input and output locations ==========
# manifest.items[].key for this cell's output (on S3 the object key is PREFIX + this key).
ANOMALY_KEY = "core30_anomaly.parquet"

PARQUET_DIR = Path("./data/parquet")
META_PARQUET = PARQUET_DIR.joinpath("core30_meta.parquet")
OUT_PARQUET = PARQUET_DIR.joinpath(ANOMALY_KEY)
MANIFEST_PATH = PARQUET_DIR.joinpath("manifest.json")

# ========== S3 settings read from the environment ==========
DATA_BUCKET = os.environ.get("DATA_BUCKET")  # e.g. dash-plotly
PARQUET_PREFIX = os.environ.get("PARQUET_PREFIX", "parquet/")  # e.g. parquet/
AWS_REGION = os.environ.get("AWS_REGION")  # e.g. ap-northeast-1
AWS_PROFILE = os.environ.get("AWS_PROFILE")  # e.g. default

# ========== Utilities ==========
def _sha256_of(path: Path, chunk_size: int = 1024 * 1024) -> str:
    """Return the hexadecimal SHA-256 digest of the file at *path*.

    The file is opened in binary mode and fed to the hash ``chunk_size``
    bytes at a time (1 MiB by default), so it is never held in memory whole.
    """
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        # read() returns b"" at end of file, which ends the loop.
        while piece := stream.read(chunk_size):
            digest.update(piece)
    return digest.hexdigest()

def _load_manifest_items(path: Path) -> list[dict]:
    """Read the ``items`` array of a shared-format manifest.

    Returns an empty list when the file does not exist, cannot be read or
    parsed, or does not have the expected shape (a JSON object whose
    ``items`` value is an array). The unreadable and malformed cases print a
    ``[WARN]`` line: the caller rewrites the manifest from whatever is
    returned here, so a silent empty result would hide the loss of the
    previously recorded entries.
    """
    if not path.exists():
        return []
    try:
        obj = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"[WARN] manifest unreadable, starting from empty items: {path} : {e}")
        return []

    # A manifest object without "items" is treated as simply empty.
    items = obj.get("items", []) if isinstance(obj, dict) else None
    if not isinstance(items, list):
        print(f"[WARN] manifest has no items array, starting from empty items: {path}")
        return []
    return items

def _upsert_manifest_item(items: list[dict], key: str, local_path: Path) -> list[dict]:
    """Return a new items list in which the entry for *key* is refreshed.

    The new entry records ``bytes``, ``sha256`` and ``mtime`` (UTC, ISO 8601)
    of *local_path*. Exactly one entry for *key* ends up in the result, even
    if *items* contained it several times. Entries that are not dicts are
    dropped: they are not valid manifest items and would make the final sort
    fail. The input list is not modified.

    Raises:
        OSError: if *local_path* cannot be stat'ed or read.
    """
    stat = local_path.stat()
    newitem = {
        "key": key,
        "bytes": stat.st_size,
        "sha256": _sha256_of(local_path),
        "mtime": datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat(),
    }
    out = [it for it in items if isinstance(it, dict) and it.get("key") != key]
    out.append(newitem)
    # Stable sort by key. Missing or non-string keys are coerced to str so
    # they cannot raise TypeError when compared with string keys.
    out.sort(key=lambda d: str(d.get("key") or ""))
    return out

def _write_manifest(path: Path, items: list[dict]) -> None:
    """Write the manifest document for *items* to *path*.

    The document has the keys ``generated_at`` (current UTC time, ISO 8601),
    ``items`` and ``note``. It is written to a sibling ``*.tmp`` file first
    and then moved over *path*, so *path* is only replaced once the new
    content has been written. Missing parent directories are created.
    """
    generated_at = datetime.now(timezone.utc).isoformat()
    path.parent.mkdir(parents=True, exist_ok=True)
    staging = path.with_suffix(path.suffix + ".tmp")
    document = json.dumps(
        {
            "generated_at": generated_at,
            "items": items,
            "note": "Auto-generated. Do not edit by hand.",
        },
        ensure_ascii=False,
        indent=2,
    )
    with staging.open("w", encoding="utf-8") as fh:
        fh.write(document)
    staging.replace(path)

def _maybe_s3_client():
    """Return a boto3 S3 client, or None when S3 cannot be used.

    None is returned when DATA_BUCKET is unset or empty, and also when
    importing boto3 or creating the session/client raises; the latter case
    prints a ``[WARN]`` line. AWS_PROFILE and AWS_REGION are passed to the
    session only when they are set.
    """
    if not DATA_BUCKET:
        return None
    try:
        import boto3

        session_kwargs = {
            name: value
            for name, value in (
                ("profile_name", AWS_PROFILE),
                ("region_name", AWS_REGION),
            )
            if value
        }
        return boto3.Session(**session_kwargs).client("s3")
    except Exception as e:
        print(f"[WARN] boto3 init failed: {e}")
        return None

def _maybe_upload_files_s3(files: list[Path]):
    """Upload each file to ``s3://DATA_BUCKET/PARQUET_PREFIX<file name>``.

    Does nothing except print an ``[INFO]`` line when no S3 client is
    available. A failing upload prints a ``[WARN]`` line and the loop moves
    on to the next file; nothing is raised and nothing is returned.
    """
    client = _maybe_s3_client()
    if client is None:
        print("[INFO] S3 upload skipped (no bucket or boto3 init failed).")
        return

    for p in files:
        # The object key is the prefix followed directly by the bare file name.
        key = f"{PARQUET_PREFIX}{p.name}"
        destination = f"s3://{DATA_BUCKET}/{key}"
        extra = dict(
            ContentType="application/octet-stream",
            CacheControl="max-age=60",
            ServerSideEncryption="AES256",
        )
        try:
            client.upload_file(str(p), DATA_BUCKET, key, ExtraArgs=extra)
        except Exception as e:
            print(f"[WARN] upload failed: {p} → {destination} : {e}")
        else:
            print(f"[OK] uploaded: {destination}")

# ========== Input validation ==========
if not META_PARQUET.exists():
    raise FileNotFoundError(f"not found: {META_PARQUET}")

meta = pd.read_parquet(META_PARQUET, engine="pyarrow")
if "ticker" not in meta.columns:
    raise ValueError("meta parquet must contain 'ticker'")

# Unique, trimmed ticker symbols in first-seen order. Values that are empty
# after trimming are dropped so they neither reach the downloader nor let an
# all-blank column slip past the emptiness check below.
tickers = (
    meta["ticker"]
    .dropna()
    .astype("string")
    .str.strip()
    .loc[lambda s: s != ""]
    .unique()
    .tolist()
)
if not tickers:
    raise RuntimeError("No tickers found in core30_meta.parquet")

# ========== Fetch monthly data → month-over-month change (%) ==========
def _download_monthly_close(ticker: str) -> pd.Series:
    """Download the monthly ``Close`` series for *ticker* via yfinance.

    Returns a numeric Series named *ticker* with a datetime index and NaN
    rows removed. An empty float Series named *ticker* is returned when the
    download raises, yields no rows, has no ``Close`` column, or the
    ``Close`` data cannot be narrowed to a single column. The download
    failure and the ambiguous-column case print a ``[WARN]`` line so that a
    missing ticker can be traced.
    """
    empty = pd.Series(dtype=float, name=ticker)

    try:
        df = yf.download(ticker, period="max", interval="1mo", progress=False, auto_adjust=False)
    except Exception as e:
        # Best-effort per ticker: one failing download must not abort the run.
        print(f"[WARN] download failed: {ticker} : {e}")
        return empty

    if df is None or df.empty:
        return empty

    # Tolerate both column layouts: flat columns, or a MultiIndex that
    # carries "Close" on either level.
    if isinstance(df.columns, pd.MultiIndex):
        if "Close" in df.columns.get_level_values(0):
            sub = df.xs("Close", axis=1, level=0)
        elif "Close" in df.columns.get_level_values(1):
            sub = df.xs("Close", axis=1, level=1)
        else:
            return empty
    elif "Close" in df.columns:
        sub = df["Close"]
    else:
        return empty

    if isinstance(sub, pd.DataFrame):
        if ticker in sub.columns:
            sub = sub[ticker]
        elif sub.shape[1] == 1:
            # Select by position instead of squeeze(): squeeze() collapses a
            # 1x1 frame to a scalar, which would lose the date index.
            sub = sub.iloc[:, 0]
        else:
            print(f"[WARN] ambiguous Close columns for {ticker}: {list(sub.columns)}")
            return empty
    if isinstance(sub, pd.DataFrame):
        # Duplicate column labels still yield a frame; keep the first column.
        sub = sub.iloc[:, 0]

    s = pd.to_numeric(sub, errors="coerce").dropna()
    s.index = pd.to_datetime(s.index)
    s.name = ticker
    return s

# Build one frame of monthly returns per ticker, then stack them into `out`
# with columns ["ticker", "year", "month", "return_pct"].
rows = []
_skipped = []  # tickers that produced no usable monthly return
for i, tic in enumerate(tickers, 1):
    s_close = _download_monthly_close(tic)
    if s_close.empty:
        ret_m = s_close
    else:
        # pct_change() is a ratio, so scale by 100 for percent. A zero
        # previous close yields +/-inf, which dropna() alone would keep, so
        # map it to NaN first.
        ret_m = (
            (s_close.pct_change() * 100.0)
            .replace([np.inf, -np.inf], np.nan)
            .dropna()
        )

    if ret_m.empty:
        _skipped.append(tic)
    else:
        # year/month come from the date each return is stamped with.
        df = (
            ret_m.to_frame("return_pct")
                 .assign(year=lambda d: d.index.year,
                         month=lambda d: d.index.month)
                 [["year", "month", "return_pct"]]
                 .astype({"year": "int64", "month": "int64"})
        )
        df["ticker"] = tic
        rows.append(df[["ticker", "year", "month", "return_pct"]])

    # Kept outside the skip branch so the progress line is printed every
    # fifth ticker even when that ticker had no data.
    if i % 5 == 0:
        print(f"[{i}/{len(tickers)}] processed...")

if _skipped:
    print(f"[WARN] no monthly returns for {len(_skipped)} ticker(s): {', '.join(map(str, _skipped))}")

if not rows:
    raise RuntimeError("No monthly returns computed.")

out = pd.concat(rows, ignore_index=True)

# ========== Save the result as parquet ==========
OUT_PARQUET.parent.mkdir(parents=True, exist_ok=True)
out.to_parquet(OUT_PARQUET, engine="pyarrow", index=False)
print(f"[OK] saved: {OUT_PARQUET.resolve()} rows={len(out)}")

# ========== Upsert this file's entry into manifest.json (shared format) ==========
items = _upsert_manifest_item(
    _load_manifest_items(MANIFEST_PATH),
    ANOMALY_KEY,
    OUT_PARQUET,
)
_write_manifest(MANIFEST_PATH, items)
print(f"[OK] manifest updated: {MANIFEST_PATH}")

# ========== Upload to S3 (data file + manifest) ==========
# The manifest goes last, after the data file it describes.
_to_upload = [OUT_PARQUET, MANIFEST_PATH]
_maybe_upload_files_s3(_to_upload)

# Show the first rows as the cell result (sanity check).
out.head()


[5/30] processed...
[10/30] processed...
[15/30] processed...
[20/30] processed...
[25/30] processed...
[30/30] processed...
[OK] saved: /Users/hiroyukiyamanaka/Desktop/python_stock/dash_plotly/data/parquet/core30_anomaly.parquet rows=8690
[OK] manifest updated: data/parquet/manifest.json
[OK] uploaded: s3://dash-plotly/parquet/core30_anomaly.parquet
[OK] uploaded: s3://dash-plotly/parquet/manifest.json


Unnamed: 0,ticker,year,month,return_pct
0,2914.T,2000,2,-11.599099
1,2914.T,2000,3,-6.369427
2,2914.T,2000,4,8.163265
3,2914.T,2000,5,3.773585
4,2914.T,2000,6,12.848485
