# INIT

In [None]:
import pandas as pd
import yfinance as yf
from IPython.display import display

# Tickers to download (Yahoo Finance symbols; every entry carries the ".JK" suffix)
tickers = [
    # Initial LQ45 stocks:
    "BBRI.JK", "BMRI.JK", "BBCA.JK", "TLKM.JK", "BBNI.JK", "BRIS.JK", "BRPT.JK", 
    "PTRO.JK", "ADRO.JK", "ASII.JK", "ANTM.JK", "BBTN.JK", "CPIN.JK", "ERAA.JK", 
    "GGRM.JK", "HMSP.JK", "ICBP.JK", "INCO.JK", "INDF.JK", "INKP.JK", "INTP.JK", 
    "ITMG.JK", "JPFA.JK", "JSMR.JK", "KLBF.JK", "MDKA.JK", "MEDC.JK", "MIKA.JK", 
    "MNCN.JK", "PGAS.JK", "PTBA.JK", "SCMA.JK", "SMGR.JK", "SMRA.JK", "TBIG.JK", 
    "TINS.JK", "TKIM.JK", "TOWR.JK", "UNTR.JK", "UNVR.JK", "WSKT.JK", "EXCL.JK", "AMRT.JK",
    
    # Prajogo Pangestu group
    "TPIA.JK", "STAR.JK", "BREN.JK", "CUAN.JK",
    
    # Salim group
    "IMAS.JK", "ROTI.JK", "DPNS.JK",
    
    # Hartono (Djarum) group
    "BELI.JK",
    
    # Chairul Tanjung group
    "MEGA.JK",
    
    # Garibaldi (Boy) Thohir group
    "GOTO.JK",
    
    # Mochtar Riady (Lippo) group
    "LPKR.JK", "SILO.JK", "LPPF.JK", "MLPL.JK",
    
    # Hary Tanoesoedibjo (MNC) group
    "MSIN.JK", "BCAP.JK", "BABP.JK",
    
    # Eddy Kusnadi Sariaatmadja (Emtek) group
    "EMTK.JK", "BUKA.JK",
    
    # Peter Sondakh (Rajawali) group
    "ARCI.JK", "BWPT.JK", "META.JK",
    
    # Alexander Tedja (Pakuwon) group
    "PWON.JK",
    
    # Eka Tjipta Widjaja (Sinarmas) group
    "SMAR.JK", "DSSA.JK", "BSDE.JK",
    
    # Sugianto Kusuma (Aguan - Agung Sedayu) group
    "PANI.JK",
    
    # Saratoga group (Edwin Soeryadjaya & Sandiaga Uno)
    "SRTG.JK", "MPMX.JK", "PALM.JK", "AGII.JK", "PRAY.JK"
]

# Module-level accumulators filled by warm_up_data():
# one list entry per (ticker, date) row, and the tickers whose download raised.
data_rows = []
failed_tickers = []

def warm_up_data():
    """Download 8 days of 1-minute bars per ticker and collect opening-minute rows.

    For every symbol in the module-level ``tickers`` list and every date in
    its download, one row is appended to the module-level ``data_rows`` list:
    ticker (without ``.JK``), date, OHLC of the first bar in the 09:00 minute,
    OHLC of the first bar in the 09:01 minute, and the close of the first bar
    found between 15:59 and 16:00 Asia/Jakarta time (``None`` if there is none).
    Dates lacking either the 09:00 or the 09:01 bar are skipped.

    Tickers whose processing raises are recorded in ``failed_tickers`` and
    retried once; tickers that succeed on the retry are removed from that
    list again. Rows of a ticker are only added once the whole ticker has been
    processed, so a failure never leaves partial (and later duplicated) rows.
    """

    def _first_bar_ohlc(frame):
        # Open/High/Low/Close of the first bar in `frame` as plain Python scalars.
        return [frame[col].iloc[0].item() for col in ("Open", "High", "Low", "Close")]

    def _collect_rows(ticker):
        # Build all rows for one ticker; any exception propagates to the caller.
        data = yf.download(ticker, interval="1m", period="8d", timeout=20)
        # Convert the bar timestamps to Jakarta time (WIB)
        data.index = data.index.tz_convert("Asia/Jakarta")
        data['Date'] = data.index.date

        rows = []
        for date in data['Date'].unique():
            daily_data = data[data['Date'] == date]

            first_minute = daily_data.between_time("09:00:00", "09:00:59")
            second_minute = daily_data.between_time("09:01:00", "09:01:59")
            if first_minute.empty or second_minute.empty:
                continue

            close_data = daily_data.between_time("15:59:00", "16:00:00")
            close_price = close_data["Close"].iloc[0].item() if not close_data.empty else None

            rows.append(
                [ticker.replace(".JK", ""), date]
                + _first_bar_ohlc(first_minute)
                + _first_bar_ohlc(second_minute)
                + [close_price]
            )
        return rows

    for ticker in tickers:
        try:
            data_rows.extend(_collect_rows(ticker))
        except Exception as e:
            print(f"⚠️ Gagal mengambil data {ticker}: {e}")
            if ticker not in failed_tickers:
                failed_tickers.append(ticker)

    # Retry the failed tickers once. The original called an undefined
    # `download_data_for_ticker`, which raised NameError instead of retrying.
    if failed_tickers:
        print("🔄 Mencoba ulang untuk ticker yang gagal...")
        for ticker in list(failed_tickers):  # copy: the list is mutated below
            try:
                data_rows.extend(_collect_rows(ticker))
            except Exception as e:
                print(f"⚠️ Gagal mengambil data {ticker}: {e}")
            else:
                failed_tickers.remove(ticker)

# Run the warm-up download; this fills the module-level data_rows list
warm_up_data()

# Build the result DataFrame (one row per ticker/date collected above)
columns = ["Ticker", "Tanggal", "O1", "H1", "L1", "C1", "O2", "H2", "L2", "C2", "Close Price"]
result_df = pd.DataFrame(data_rows, columns=columns)

# Show the raw data before the validation logic is applied
print("Data setelah warm-up:")
display(result_df)

# Validation logic: add the is_2min_valid column.
# It is 1 when the first-minute bar closed above its open (C1 > O1) AND the
# late-session close is at or above the second-minute close; otherwise 0.
if not result_df.empty:
    result_df['is_2min_valid'] = result_df.apply(
        lambda row: 1 if row['C1'] > row['O1'] and row['Close Price'] >= row['C2'] else 0, axis=1
    )

    # Pandas display setup: all columns, no wrapping, at most 10 rows
    pd.set_option('display.max_columns', None)
    pd.set_option('display.expand_frame_repr', False)
    pd.set_option('display.max_rows', 10)

    # Show the data again, now including the validation column
    print("Data setelah logic validasi:")
    display(result_df)
else:
    print("Tidak ada data yang berhasil diproses.")

In [None]:
import pandas as pd
import yfinance as yf
from IPython.display import display

# Tickers to download
tickers = [
    "BBRI.JK", "BMRI.JK", "BBCA.JK", "TLKM.JK", "BBNI.JK"
]

# Yahoo symbols for the Hang Seng and Nikkei indices
index_tickers = {
    'hang_seng': "^HSI",
    'nikkei': "^N225"
}

# Module-level accumulators filled by warm_up_data():
# one list entry per (ticker, date) row, and the tickers whose download raised.
data_rows = []
failed_tickers = []

def download_index_data(ticker, start, end):
    """Return ``[open, high, low, close]`` of the first daily bar in ``[start, end)``.

    The bar is fetched with ``yf.download(..., interval="1d")``. When the
    download is empty, or anything raises (the error is printed), a list of
    four ``None`` values is returned instead.
    """
    ohlc_fields = ['Open', 'High', 'Low', 'Close']
    try:
        # Daily OHLC data for the requested window
        frame = yf.download(ticker, start=start, end=end, interval="1d", timeout=20)
        if frame.empty:
            return [None, None, None, None]
        first_bar = frame.iloc[0]
        return first_bar[ohlc_fields].tolist()
    except Exception as e:
        print(f"⚠️ Gagal mengambil data {ticker}: {e}")
        return [None, None, None, None]

def warm_up_data():
    """Collect opening-minute rows per ticker/date together with index context.

    For every symbol in the module-level ``tickers`` list, 8 days of 1-minute
    bars are downloaded. For each date that has both a 09:00 and a 09:01 bar
    (Asia/Jakarta time) one row is appended to the module-level ``data_rows``:
    date, Hang Seng OHLC, Nikkei OHLC, ticker (without ``.JK``), OHLC of the
    09:00 bar, OHLC of the 09:01 bar, the close of the first bar between
    15:59 and 16:00 (``None`` if absent), and two bullish flags.

    A bullish flag is 1 when the index closed above its open, 0 when it did
    not, and ``None`` when the index OHLC is unavailable for that date (the
    original compared ``None > None`` here, which raised and discarded the
    whole ticker).

    Index data is fetched at most once per (index, date) instead of once per
    ticker and date. Tickers that raise are recorded in ``failed_tickers`` and
    retried once; tickers that succeed on the retry are removed from the list.
    """
    index_cache = {}  # (index name, date) -> [open, high, low, close]

    def _index_ohlc(name, date):
        # Daily OHLC of one index for `date`; only successful lookups are cached
        # so a transient failure can be retried for the next ticker.
        key = (name, date)
        if key in index_cache:
            return index_cache[key]
        ohlc = download_index_data(index_tickers[name], date, date + pd.Timedelta(days=1))
        if None not in ohlc:
            index_cache[key] = ohlc
        return ohlc

    def _bull_flag(open_price, close_price):
        # 1 = bullish, 0 = not bullish, None = index data missing.
        if open_price is None or close_price is None:
            return None
        return 1 if close_price > open_price else 0

    def _first_bar_ohlc(frame):
        # Open/High/Low/Close of the first bar in `frame` as plain Python scalars.
        return [frame[col].iloc[0].item() for col in ("Open", "High", "Low", "Close")]

    def _collect_rows(ticker):
        # Build all rows for one ticker; any exception propagates to the caller.
        data = yf.download(ticker, interval="1m", period="8d", timeout=20)
        # Convert the bar timestamps to Jakarta time (WIB)
        data.index = data.index.tz_convert("Asia/Jakarta")
        data['Date'] = data.index.date

        rows = []
        for date in data['Date'].unique():
            daily_data = data[data['Date'] == date]

            first_minute = daily_data.between_time("09:00:00", "09:00:59")
            second_minute = daily_data.between_time("09:01:00", "09:01:59")
            if first_minute.empty or second_minute.empty:
                continue

            close_data = daily_data.between_time("15:59:00", "16:00:00")
            close_price = close_data["Close"].iloc[0].item() if not close_data.empty else None

            hs_ohlc = list(_index_ohlc('hang_seng', date))
            nikkei_ohlc = list(_index_ohlc('nikkei', date))

            rows.append(
                [date]
                + hs_ohlc
                + nikkei_ohlc
                + [ticker.replace(".JK", "")]
                + _first_bar_ohlc(first_minute)
                + _first_bar_ohlc(second_minute)
                + [
                    close_price,
                    _bull_flag(hs_ohlc[0], hs_ohlc[3]),          # Hang Seng bullish?
                    _bull_flag(nikkei_ohlc[0], nikkei_ohlc[3]),  # Nikkei bullish?
                ]
            )
        return rows

    for ticker in tickers:
        try:
            data_rows.extend(_collect_rows(ticker))
        except Exception as e:
            print(f"⚠️ Gagal mengambil data {ticker}: {e}")
            if ticker not in failed_tickers:
                failed_tickers.append(ticker)

    # Retry the failed tickers once. The original called
    # `download_index_data(ticker)` here, which is missing its required
    # start/end arguments and would not have re-collected the ticker anyway.
    if failed_tickers:
        print("🔄 Mencoba ulang untuk ticker yang gagal...")
        for ticker in list(failed_tickers):  # copy: the list is mutated below
            try:
                data_rows.extend(_collect_rows(ticker))
            except Exception as e:
                print(f"⚠️ Gagal mengambil data {ticker}: {e}")
            else:
                failed_tickers.remove(ticker)

# Run the warm-up download; this fills the module-level data_rows list
warm_up_data()

# Build the result DataFrame with a fixed column layout
columns = [
    "TGL",
    "HS_O", "HS_H", "HS_L", "HS_C",
    "Ni_O", "Ni_H", "Ni_L", "Ni_C",
    "SAHAM",
    "O1", "H1", "L1", "C1",
    "O2", "H2", "L2", "C2",
    "CLOSE",
    "HS_Bull", "Ni_Bull"  # extra bullish/bearish flag columns
]
result_df = pd.DataFrame(data_rows, columns=columns)

# Round every price column to 2 decimals
float_columns = [
    "HS_O", "HS_H", "HS_L", "HS_C",
    "Ni_O", "Ni_H", "Ni_L", "Ni_C",
    "O1", "H1", "L1", "C1",
    "O2", "H2", "L2", "C2",
    "CLOSE"
]

result_df[float_columns] = result_df[float_columns].round(2)

# Show the raw data before the validation logic is applied
print("Data setelah warm-up:")
display(result_df)

# Validation logic: add the is_2min_valid column.
# A row is valid (1) when C1 > O1, C2 >= C1 and the late-session close is at
# or above C2. The close lives in the "CLOSE" column of this frame; the
# original looked up 'Close Price' via row.get(..., 0), which does not exist
# here, so the comparison was always against 0 and the flag was always 0.
# A missing CLOSE makes the row invalid.
if not result_df.empty:
    result_df['is_2min_valid'] = result_df.apply(
        lambda row: 1 if (row['C1'] > row['O1'] and
                           row['C2'] >= row['C1'] and
                           pd.notna(row['CLOSE']) and
                           row['CLOSE'] >= row['C2']) else 0,
        axis=1
    )

    # Pandas display setup: 2 decimals, all columns, no wrapping, at most 10 rows
    pd.options.display.float_format = '{:.2f}'.format
    pd.set_option('display.max_columns', None)
    pd.set_option('display.expand_frame_repr', False)
    pd.set_option('display.max_rows', 10)

    # Show the data again, now including the validation column
    print("Data setelah logic validasi:")
    display(result_df)
else:
    print("Tidak ada data yang berhasil diproses.")


# PREFLIGHT FOR YFINANCE CONNECTION

In [None]:
# === PREFLIGHT: Cek koneksi ke Yahoo Finance / yfinance sebelum download berat ===
# aman dijalankan berulang
!pip install -q requests yfinance

import socket, ssl, time, json, os
import requests, yfinance as yf
from urllib.parse import urlparse

TIMEOUT = 15  # seconds; passed to the socket, requests and yfinance probes below

# Raw Yahoo endpoints probed directly with requests (without yfinance)
ENDPOINTS = [
    "https://query1.finance.yahoo.com/v7/finance/quote?symbols=AAPL",
    "https://query1.finance.yahoo.com/v8/finance/chart/TLKM.JK?range=5d&interval=1d",
]

TEST_SYMBOLS = ["AAPL", "TLKM.JK"]  # 1 global, 1 Indonesian
# Collected probe results as (kind, target, ok, info, elapsed_seconds) tuples
RESULTS = []

def check_dns(host):
    """Resolve *host* and report ``(ok, ip_or_error, elapsed_seconds)``.

    ``ok`` is True with the resolved IPv4 address on success; on any
    exception it is False and the second element is the error text.
    """
    started = time.time()
    try:
        address = socket.gethostbyname(host)
    except Exception as exc:
        return False, str(exc), time.time() - started
    return True, address, time.time() - started

def check_tls(host, port=443):
    """Open a TCP connection to *host*:*port* and complete a TLS handshake.

    Uses the default SSL context and the module-level ``TIMEOUT`` for the
    connection. Returns ``(True, "ok", elapsed_seconds)`` on success, or
    ``(False, error_text, elapsed_seconds)`` when anything raises.
    """
    started = time.time()
    context = ssl.create_default_context()
    try:
        with socket.create_connection((host, port), timeout=TIMEOUT) as raw_sock, \
                context.wrap_socket(raw_sock, server_hostname=host) as tls_sock:
            # The certificate is fetched but not inspected further.
            tls_sock.getpeercert()
    except Exception as exc:
        return False, str(exc), time.time() - started
    return True, "ok", time.time() - started

def http_probe(url):
    """GET *url* without following redirects and summarise the response.

    Returns ``(True, {"status", "location", "len"}, elapsed_seconds)`` for any
    HTTP response (whatever its status code), or
    ``(False, error_text, elapsed_seconds)`` when the request raises.
    """
    started = time.time()
    try:
        response = requests.get(
            url,
            timeout=TIMEOUT,
            allow_redirects=False,
            headers={"User-Agent":"Mozilla/5.0"},
        )
        summary = {
            "status": response.status_code,
            "location": response.headers.get("Location"),
            "len": len(response.content),
        }
    except Exception as exc:
        return False, str(exc), time.time() - started
    return True, summary, time.time() - started

def yf_probe(symbol, period="5d", interval="1d"):
    """Download *symbol* through yfinance and report whether rows came back.

    Returns ``(ok, {"rows": n}, elapsed_seconds)`` where ``ok`` is True when
    the downloaded frame is non-empty, or ``(False, error_text,
    elapsed_seconds)`` when the download raises.
    """
    started = time.time()
    try:
        frame = yf.download(
            symbol,
            period=period,
            interval=interval,
            progress=False,
            threads=False,
            timeout=TIMEOUT,
        )
        has_rows = not frame.empty
        details = {"rows": len(frame)}
    except Exception as exc:
        return False, str(exc), time.time() - started
    return has_rows, details, time.time() - started

print("🔎 Preflight: Yahoo Finance connectivity\n")

# 1) DNS + TLS for the main hosts (iterated sorted so the report order is
#    the same on every run; a plain set has no stable order)
hosts = {"query1.finance.yahoo.com", "guce.yahoo.com", "www.yahoo.com"}
for h in sorted(hosts):
    ok, info, dt = check_dns(h)
    RESULTS.append(("DNS", h, ok, info, dt))
    print(f"DNS  {h:<28} {'OK' if ok else 'FAIL'}  {info}  ({dt:.2f}s)")
for h in sorted(hosts):
    ok, info, dt = check_tls(h, 443)
    RESULTS.append(("TLS", h, ok, info, dt))
    print(f"TLS  {h:<28} {'OK' if ok else 'FAIL'}  {info}  ({dt:.2f}s)")

# 2) Plain HTTP GET against the quote/chart endpoints (without yfinance)
for url in ENDPOINTS:
    host = urlparse(url).hostname
    ok, info, dt = http_probe(url)
    RESULTS.append(("HTTP", host, ok, info, dt))
    if ok:
        status = info["status"]
        loc    = info["location"]
        note   = f"status={status}" + (f", redirect→{loc}" if loc else "")
        print(f"HTTP {host:<28} OK    {note}  ({dt:.2f}s)")
    else:
        print(f"HTTP {host:<28} FAIL  {info}  ({dt:.2f}s)")

# 3) Light yfinance test for the two symbols
for sym in TEST_SYMBOLS:
    ok, info, dt = yf_probe(sym, period="5d", interval="1d")
    RESULTS.append(("YF", sym, ok, info, dt))
    if ok:
        print(f"YF   {sym:<10} OK    rows={info['rows']}  ({dt:.2f}s)")
    else:
        print(f"YF   {sym:<10} FAIL  {info}  ({dt:.2f}s)")

# 4) Quick recommendations based on the results
print("\n🩺 Diagnosis & saran:")
dns_fail = [r for r in RESULTS if r[0]=="DNS" and not r[2]]
http_results = [r for r in RESULTS if r[0]=="HTTP"]
http_fail= [r for r in http_results if not r[2]]
yf_fail  = [r for r in RESULTS if r[0]=="YF" and not r[2]]

# Detect a redirect to guce (consent wall). http_probe records a redirect as a
# *successful* probe (it got an HTTP response), so this must be checked on all
# HTTP results, not only on failures. The original also filtered with
# `_ == "HTTP"` after unpacking every field into `_`, which compared the
# elapsed time with "HTTP" and therefore never matched.
redir_to_guce = any(
    isinstance(r[3], dict) and "guce.yahoo.com" in (r[3].get("location") or "")
    for r in http_results
)

if dns_fail:
    print("- DNS gagal untuk:", ", ".join(sorted({r[1] for r in dns_fail})))
    print("  ➜ Coba ganti DNS (mis. 8.8.8.8/1.1.1.1) atau cek VPN/firewall.")
if redir_to_guce:
    print("- Ter-redirect ke guce.yahoo.com (consent).")
    print("  ➜ Buka https://www.yahoo.com di browser (set consent/cookies), lalu jalankan ulang.")
if http_fail:
    print("- HTTP ke endpoint Yahoo gagal/time-out.")
    print("  ➜ Cek koneksi umum & coba lagi beberapa menit (bisa throttling).")
if yf_fail and not http_fail:
    print("- yfinance gagal, tapi HTTP oke → kemungkinan rate limit.")
    print("  ➜ Kecilkan batch, tambah jeda, pakai slow mode (1-per-1), atau lanjut saat off-peak.")
if not (dns_fail or http_fail or yf_fail or redir_to_guce):
    print("- Semua cek OK ✅ — lanjutkan pipeline unduhan.")


# GET ACTIVE EMITEN

In [None]:
# === FINAL CLEANER (pakai config kamu) ===
# progress bar + cache + retry/backoff + slow-mode; output & cache di folder emiten/
!pip install -q yfinance pandas numpy tqdm openpyxl xlrd

import os, re, time, random, warnings
import numpy as np, pandas as pd, yfinance as yf
from datetime import datetime
from pathlib import Path
from IPython.display import display
from tqdm.auto import tqdm
warnings.filterwarnings("ignore")

# ===== CONFIG (taken from your own settings) =====
INPUT_PATH         = "candidates_from_excel.csv"   # candidate list read by load_candidates()
PREFER_EXCEL       = False                         # forwarded to load_candidates(prefer_excel=...)
FOLDER_OUT         = "emiten"                      # output folder, also used as the cache base

LOOKBACK_DAYS      = 30         # number of most recent daily bars checked for activity
MIN_NONZERO_DAYS   = 15         # classify(): minimum bars with volume > 0 in the window
MIN_PCT_NONZERO    = 0.50       # classify(): minimum share of bars with volume > 0
MAX_CONSEC_ZERO    = 10         # classify(): longest tolerated run of zero-volume bars
MIN_MED_VALUE_90D  = 7.5e9      # classify(): minimum median of Close*Volume
MIN_PRICE_FLOOR    = 75         # classify(): minimum last close
MIN_TURNOVER       = 0.0005     # classify(): minimum median volume / shares outstanding

DL_PERIOD_FOR_ACTIVITY = 60     # -> '60d'
DL_PERIOD_FOR_LIQ      = 100    # -> '100d'
CHUNK_SIZE_MAIN        = 5      # tickers per batch in download_panel's main pass
PAUSE_MAIN             = 2.0    # base pause (seconds) between batches / retries
CHUNK_SIZE_SECOND      = 10     # passed to download_panel, which does not use it
PAUSE_SECOND           = 1.5    # passed to download_panel, which does not use it
MAX_RETRIES            = 3
RETRY_BACKOFF          = 1.6    # multiplier applied to the pause on each retry
TIMEOUT_SEC            = 45
SLOWMODE_COOLDOWN      = 120    # seconds; long pause when rate-limited

# ===== Output setup =====
DATE_TAG = datetime.now().strftime("%Y%m%d")   # local date stamp used in output file names
os.makedirs(FOLDER_OUT, exist_ok=True)
OUTPUT_ACTIVE_CSV  = os.path.join(FOLDER_OUT, f"candidates_active_filtered_{DATE_TAG}.csv")
OUTPUT_FULL_CSV    = os.path.join(FOLDER_OUT, f"candidates_full_with_flags_{DATE_TAG}.csv")

# ===== Helpers =====
def extract_tickers_from_df(df):
    """Extract ticker symbols from every column of *df*.

    Each non-missing cell is upper-cased and stripped, and the first token of
    2-5 letters (optionally followed by ``.JK``) is taken as a ticker. The
    result is a sorted list of unique symbols, each ending in ``.JK``.

    Missing cells are dropped *before* the string conversion: converting
    first would turn NaN/None into the text "nan"/"None", which matches the
    pattern and produced bogus ``NAN.JK`` / ``NONE.JK`` tickers.
    """
    tickers = set()
    for c in df.columns:
        present = df[c].dropna()
        if present.empty:
            continue
        s = present.astype(str).str.upper().str.strip()
        extracted = s.str.extract(r'\b([A-Z]{2,5}(?:\.JK)?)\b')[0].dropna()
        for sym in extracted:
            base = sym.replace(".JK","")
            if 2 <= len(base) <= 5 and base.isalpha():
                tickers.add(sym if sym.endswith(".JK") else f"{sym}.JK")
    return sorted(tickers)

def load_candidates(path, prefer_excel=False):
    """Load the candidate ticker list from *path*.

    Raises FileNotFoundError when *path* does not exist. With
    ``prefer_excel=True`` and an ``.xlsx``/``.xls`` path, every sheet is
    scanned for tickers; if that fails, a warning is printed and the file is
    read as CSV instead. In the CSV path a column named ``ticker`` (preferred)
    or ``code`` (case-insensitive) is used when present, otherwise the whole
    table is scanned. Returns the sorted list built by
    ``extract_tickers_from_df``.
    """
    lowered = path.lower()
    if not os.path.exists(path):
        raise FileNotFoundError(f"File input tidak ada: {path}")

    looks_like_excel = lowered.endswith(".xlsx") or lowered.endswith(".xls")
    if prefer_excel and looks_like_excel:
        try:
            engine = "xlrd" if not lowered.endswith(".xlsx") else "openpyxl"
            workbook = pd.ExcelFile(path, engine=engine)
            found = set()
            for sheet in workbook.sheet_names:
                found.update(extract_tickers_from_df(workbook.parse(sheet)))
            return sorted(found)
        except Exception as e:
            print(f"⚠️ Gagal baca Excel ({e}), fallback CSV…")

    table = pd.read_csv(path)
    lowered_cols = [c.lower() for c in table.columns]
    # A dedicated ticker/code column wins over scanning the whole table.
    for wanted in ("ticker", "code"):
        if wanted in lowered_cols:
            column = table[table.columns[lowered_cols.index(wanted)]]
            return extract_tickers_from_df(pd.DataFrame({wanted: column}))
    return extract_tickers_from_df(table)

def _slice_px(px, t, batch_len):
    """Return the NaN-free OHLCV frame of ticker *t* from a yfinance download.

    * Flat columns: the frame belongs to a single ticker, so it is returned
      (after ``dropna``) only when ``batch_len == 1``; otherwise an empty frame.
    * MultiIndex columns: the ticker is looked up on every column level.
      ``group_by="ticker"`` puts it on level 0, while the default layout used
      for single-ticker downloads by recent yfinance releases puts it on
      level 1. The original only handled level 0 (``px[t]``), so such
      single-ticker frames always came back empty.

    An empty DataFrame is returned when the ticker is not present.
    """
    if not isinstance(px.columns, pd.MultiIndex):
        return px.dropna() if batch_len == 1 else pd.DataFrame()

    for level in range(px.columns.nlevels):
        if t in px.columns.get_level_values(level):
            try:
                return px.xs(t, axis=1, level=level).dropna()
            except Exception:
                return pd.DataFrame()
    return pd.DataFrame()

def _period_arg(period):
    """Return a yfinance period string: strings pass through, numbers become ``'<int>d'``."""
    if isinstance(period, str):
        return period
    whole_days = int(period)
    return f"{whole_days}d"

def _cache_paths(base_dir, period, ticker):
    """Return ``(cache_dir, csv_path)`` for *ticker*, creating the directory.

    The directory is ``<base_dir>/cache_<period>`` and the file inside it is
    ``<ticker>.csv``.
    """
    folder = Path(base_dir).joinpath(f"cache_{period}")
    folder.mkdir(parents=True, exist_ok=True)
    csv_file = folder.joinpath(f"{ticker}.csv")
    return folder, csv_file

def _save_cache(df, path_csv):
    """Best-effort write of *df* (index included) to *path_csv*.

    Caching is only an optimisation, so a failed write never propagates.
    Unlike the original, which swallowed the error silently, the failure is
    reported on stdout so a permanently broken cache does not go unnoticed.
    Returns None in every case.
    """
    try:
        df.to_csv(path_csv, index=True)
    except Exception as e:
        print(f"⚠️ Gagal simpan cache {path_csv}: {e}")

def _load_cache(path_csv):
    """Read a cached OHLCV CSV; return an empty frame when it is unusable.

    The first CSV column becomes the index (with date parsing enabled). The
    frame is returned only when it contains all of Open, High, Low, Close and
    Volume; a missing/unreadable file or missing columns yield an empty
    DataFrame.
    """
    required = {"Open","High","Low","Close","Volume"}
    try:
        cached = pd.read_csv(path_csv, parse_dates=True, index_col=0)
        complete = required.issubset(cached.columns)
    except Exception:
        return pd.DataFrame()
    if complete:
        return cached
    return pd.DataFrame()

def download_panel(
    tickers, period_days,
    cache_base="emiten",
    chunk_main=15, pause_main=1.2,
    chunk_second=10, pause_second=1.5,
    max_retries=3, backoff=1.6,
    timeout=45, jitter=0.3,
    slowmode_cooldown=120
):
    """Download daily OHLCV for *tickers* with caching, retries and a slow fallback.

    Steps, in order:
      1. Load every ticker that already has a usable CSV under
         ``<cache_base>/cache_<period>/``.
      2. Main pass: download the remaining tickers in batches of
         ``chunk_main`` (``group_by="ticker"``, ``threads=False``). A batch
         whose download raises is retried up to ``max_retries`` times with a
         growing pause; after that the batch is given up and the run is
         flagged as rate-limited.
      3. Slow mode: every ticker still missing is downloaded one by one
         (after a ``slowmode_cooldown`` sleep when flagged as rate-limited).
      4. Tickers still missing at the end are written to
         ``<cache_base>/failed_<period>_<DATE_TAG>.csv``.

    Parameters
    ----------
    tickers : iterable of str
    period_days : int or str
        Converted with ``_period_arg`` (int -> ``'<n>d'``).
    chunk_second, pause_second :
        Accepted but not referenced anywhere in this function body.
    jitter : float
        Upper bound of the random extra sleep added to each pause.

    Returns
    -------
    dict
        ticker -> DataFrame for every ticker that was cached or downloaded.

    Relies on the module-level ``DATE_TAG`` for the failure-report file name.
    """
    period = _period_arg(period_days)
    out = {}
    # cache hit
    cached = 0
    for t in tickers:
        _, cpath = _cache_paths(cache_base, period, t)
        df = _load_cache(cpath)
        if not df.empty:
            out[t] = df; cached += 1
    if cached: print(f"💾 Cache hit: {cached}/{len(tickers)}")

    remaining = [t for t in tickers if t not in out]
    if not remaining: return out

    # main pass batched (threads=False)
    batches = list(range(0, len(remaining), chunk_main))  # start offsets into `remaining`
    pbar = tqdm(batches, desc=f"Downloading {period} OHLCV (main)", unit="batch")
    rate_limited = False

    for start in pbar:
        batch = [t for t in remaining[start:start+chunk_main] if t not in out]
        if not batch: continue
        attempt = 0
        while True:
            try:
                px = yf.download(
                    batch, period=period, interval="1d",
                    auto_adjust=False, group_by="ticker",
                    progress=False, threads=False, timeout=timeout
                )
                break
            except Exception as e:
                attempt += 1
                if attempt > max_retries:
                    # Out of retries: continue with an empty frame so every
                    # ticker of this batch falls through to slow mode.
                    rate_limited = True
                    tqdm.write(f"[main] batch failed: {type(e).__name__} → switch to slow mode")
                    px = pd.DataFrame(); break
                # Exponential backoff on the base pause plus random jitter.
                sleep_s = pause_main * (backoff ** (attempt-1)) + random.uniform(0, jitter)
                tqdm.write(f"[main] retry {attempt}/{max_retries} after {sleep_s:.1f}s ({type(e).__name__})")
                time.sleep(sleep_s)
        for t in batch:
            try: df = _slice_px(px, t, len(batch))
            except Exception: df = pd.DataFrame()
            if not df.empty:
                out[t] = df
                _, cpath = _cache_paths(cache_base, period, t)
                _save_cache(df, cpath)
        # Pause after every batch (including the last one).
        time.sleep(pause_main + random.uniform(0, jitter))

    # slow mode sequential for remaining
    still = [t for t in tickers if t not in out]
    if still:
        if rate_limited:
            tqdm.write(f"🕒 Rate-limited. Cooling down {slowmode_cooldown}s before slow mode…")
            time.sleep(slowmode_cooldown)
        pbar2 = tqdm(still, desc="Slow mode (1-by-1)", unit="ticker")
        for t in pbar2:
            _, cpath = _cache_paths(cache_base, period, t)
            # Re-check the cache before downloading this ticker again.
            df = _load_cache(cpath)
            if not df.empty:
                out[t] = df; continue
            attempt = 0; pause = pause_main
            while True:
                try:
                    px = yf.download(t, period=period, interval="1d",
                                     auto_adjust=False, progress=False,
                                     threads=False, timeout=timeout)
                    df = _slice_px(px, t, 1); break
                except Exception as e:
                    attempt += 1
                    # Slow mode allows one more retry than the main pass.
                    if attempt > (max_retries + 1):
                        df = pd.DataFrame(); break
                    sleep_s = pause * (backoff ** (attempt-1)) + random.uniform(0, jitter)
                    pbar2.set_postfix_str(f"retry {attempt} ({type(e).__name__})")
                    time.sleep(sleep_s)
            if not df.empty:
                out[t] = df; _save_cache(df, cpath)
            else:
                # Only sleep after a ticker that produced no data.
                time.sleep(pause_main + random.uniform(0, jitter))

    missing = [t for t in tickers if t not in out or getattr(out[t], "empty", True)]
    if missing:
        fail_csv = Path(cache_base) / f"failed_{period}_{DATE_TAG}.csv"
        pd.DataFrame({"ticker": missing}).to_csv(fail_csv, index=False)
        print(f"⚠️ Gagal final: {len(missing)} tickers. Disimpan ke {fail_csv}")
    else:
        print("✅ Semua ticker berhasil/cached.")
    return out

def fetch_meta(tickers):
    """Return one metadata row per ticker from ``yf.Ticker(...).info``.

    Columns: ticker, marketCap, sector, industry and sharesOut
    (``sharesOutstanding``, falling back to ``floatShares``). When reading
    ``info`` raises, the row is filled with None values.
    """
    def _describe(symbol):
        handle = yf.Ticker(symbol)
        try:
            details = handle.info or {}
        except Exception:
            details = {}
        shares = details.get("sharesOutstanding") or details.get("floatShares")
        return {
            "ticker": symbol,
            "marketCap": details.get("marketCap"),
            "sector": details.get("sector"),
            "industry": details.get("industry"),
            "sharesOut": shares,
        }

    return pd.DataFrame([_describe(symbol) for symbol in tickers])

# ===== Run =====
# Load the candidate tickers; abort when the list is empty.
CANDIDATES_ALL = load_candidates(INPUT_PATH, prefer_excel=PREFER_EXCEL)
print(f"📥 Total kandidat masuk: {len(CANDIDATES_ALL)}")
if not CANDIDATES_ALL:
    raise SystemExit("Daftar kandidat kosong.")

# Daily OHLCV used for the activity metrics (period DL_PERIOD_FOR_ACTIVITY).
print("⏬ Unduh OHLCV aktivitas…")
px_act = download_panel(
    CANDIDATES_ALL, DL_PERIOD_FOR_ACTIVITY,
    cache_base=FOLDER_OUT,
    chunk_main=CHUNK_SIZE_MAIN, pause_main=PAUSE_MAIN,
    chunk_second=CHUNK_SIZE_SECOND, pause_second=PAUSE_SECOND,
    max_retries=MAX_RETRIES, backoff=RETRY_BACKOFF,
    timeout=TIMEOUT_SEC, slowmode_cooldown=SLOWMODE_COOLDOWN
)

# Daily OHLCV used for the liquidity metrics (period DL_PERIOD_FOR_LIQ);
# same settings, cached separately because the cache folder is keyed by period.
print("⏬ Unduh OHLCV likuiditas…")
px_liq = download_panel(
    CANDIDATES_ALL, DL_PERIOD_FOR_LIQ,
    cache_base=FOLDER_OUT,
    chunk_main=CHUNK_SIZE_MAIN, pause_main=PAUSE_MAIN,
    chunk_second=CHUNK_SIZE_SECOND, pause_second=PAUSE_SECOND,
    max_retries=MAX_RETRIES, backoff=RETRY_BACKOFF,
    timeout=TIMEOUT_SEC, slowmode_cooldown=SLOWMODE_COOLDOWN
)

# Per-ticker metadata (market cap, sector, industry, shares outstanding).
print("ℹ️ Ambil metadata…")
meta = fetch_meta(CANDIDATES_ALL)

# ===== Compute metrics & classification inputs =====
rows=[]
for t in CANDIDATES_ALL:
    # Missing tickers fall back to empty frames and keep the defaults below.
    dfa = px_act.get(t, pd.DataFrame())
    dfl = px_liq.get(t, pd.DataFrame())

    last_close = float(dfa["Close"].iloc[-1]) if not dfa.empty else np.nan
    # Defaults describe a ticker with no data: no active bars, and a zero-run
    # as long as the whole window.
    nonzero_days = 0; pct_nonzero = 0.0; max_zero_run = LOOKBACK_DAYS
    if not dfa.empty:
        look = dfa.tail(LOOKBACK_DAYS)  # last LOOKBACK_DAYS rows of the activity frame
        if not look.empty:
            v = look["Volume"].fillna(0)
            nonzero_days = int((v>0).sum())
            pct_nonzero  = float(nonzero_days/len(look))
            runs = (v==0).astype(int)  # 1 where the bar had zero volume
            if runs.any():
                # A new group starts whenever the 0/1 value changes; summing each
                # group gives the run length for zero-volume runs (0 for the others).
                grp = (runs != runs.shift()).cumsum()
                max_zero_run = int(runs.groupby(grp).sum().max())
            else:
                max_zero_run = 0

    med_value_90d = 0.0; med_vol_90d = 0.0
    if not dfl.empty:
        val = (dfl["Close"]*dfl["Volume"])  # traded value per bar
        # With at least 90 rows: latest 90-row rolling median; otherwise the
        # median over all available rows.
        med_value_90d = float(val.rolling(90).median().dropna().iloc[-1]) if len(val)>=90 else float(val.median())
        med_vol_90d   = float(dfl["Volume"].rolling(90).median().dropna().iloc[-1]) if len(dfl)>=90 else float(dfl["Volume"].median())

    rows.append({
        "ticker": t,
        "last_close": last_close,
        "nonzero_days_30d": nonzero_days,
        "pct_nonzero_30d": round(pct_nonzero,3),
        "max_consec_zero_30d": max_zero_run,
        "med_value_90d": med_value_90d,
        "med_volume_90d": med_vol_90d,
    })

feat = pd.DataFrame(rows)
# Attach metadata to the per-ticker features; the right join keeps every
# candidate even when no metadata row exists for it.
df = meta.merge(feat, on="ticker", how="right")
# sharesOut comes from Yahoo `info` and is None when the lookup failed. If
# every lookup failed the column is object-dtype full of None and the division
# below raises TypeError, so coerce it to numeric (None -> NaN) first.
df["sharesOut"] = pd.to_numeric(df["sharesOut"], errors="coerce")
# Median volume relative to shares outstanding; NaN when the share count is unknown or zero.
df["turnover_med"] = np.where(df["sharesOut"].fillna(0)>0, df["med_volume_90d"]/df["sharesOut"], np.nan)

def classify(r):
    """Classify one feature row as SUSPECT_SUSPENDED, DORMANT or ACTIVE.

    Returns a ``(status, why)`` tuple. A row with no active bars, under 10%
    active bars, or a zero-volume run covering the whole window is
    ``SUSPECT_SUSPENDED``. Otherwise every failed threshold check contributes
    a reason; any reason makes the row ``DORMANT`` (reasons joined with
    "; "), and no reason makes it ``ACTIVE`` with the reason "pass".
    """
    # No trading at all / extremely inactive within the 30-bar window
    if r["nonzero_days_30d"] == 0 or r["pct_nonzero_30d"] < 0.1 or r["max_consec_zero_30d"] >= LOOKBACK_DAYS:
        return "SUSPECT_SUSPENDED", "no trading in window"

    price = r["last_close"]
    turnover = r["turnover_med"]
    # (failed?, reason) pairs, listed in the order the reasons are reported.
    checks = (
        (not pd.isna(price) and price < MIN_PRICE_FLOOR, "price < floor"),
        (r["pct_nonzero_30d"] < MIN_PCT_NONZERO, "pct_nonzero < threshold"),
        (r["nonzero_days_30d"] < MIN_NONZERO_DAYS, "nonzero_days < threshold"),
        (r["max_consec_zero_30d"] > MAX_CONSEC_ZERO, "max zero run too long"),
        (r["med_value_90d"] < MIN_MED_VALUE_90D, "med_value_90d < min"),
        (not pd.isna(turnover) and turnover < MIN_TURNOVER, "turnover < min"),
    )
    reasons = [label for failed, label in checks if failed]
    if reasons:
        return "DORMANT", "; ".join(reasons)
    return "ACTIVE", "pass"

# Apply classify() to every row; result_type="expand" turns the returned
# (status, why) tuples into two columns.
lab = df.apply(classify, axis=1, result_type="expand")
lab.columns = ["status","why"]
# Sort by status (ascending), then by median traded value (descending).
df = pd.concat([df, lab], axis=1).sort_values(["status","med_value_90d"], ascending=[True,False]).reset_index(drop=True)

# ===== Save =====
# Full table with all flags, plus a ticker-only file of the ACTIVE rows.
df.to_csv(OUTPUT_FULL_CSV, index=False, encoding="utf-8")
active = df[df["status"]=="ACTIVE"][["ticker"]]
active.to_csv(OUTPUT_ACTIVE_CSV, index=False, encoding="utf-8")

print(f"\n✅ Disaring: {len(df)} total | ACTIVE: {len(active)} | DORMANT: {len(df[df.status=='DORMANT'])} | SUSPECT_SUSPENDED: {len(df[df.status=='SUSPECT_SUSPENDED'])}")
print(f"💾 Saved (full flags): {OUTPUT_FULL_CSV}")
print(f"💾 Saved (active-only): {OUTPUT_ACTIVE_CSV}")
display(df.head(20))


# BESOKARA V.1

In [None]:
# ==== ARA PROB (DAILY INCREMENTAL + 1m, single cell) =========================
# Output:
#   emiten/prob_ara_today_YYYYMMDD_HHMM.csv
#   emiten/prob_ara_tomorrow_YYYYMMDD_HHMM.csv
#   emiten/cache_daily/<TICKER>.csv   (incremental append)
#   emiten/cache_1m/<TICKER>.csv      (merge 7d)
#   emiten/cache_5m/<TICKER>.csv      (fallback only, merge 12d)
# ============================================================================
!pip install -q yfinance pandas numpy

import os, glob, time, math, warnings
import numpy as np, pandas as pd, yfinance as yf
from pathlib import Path
from datetime import datetime, timedelta, time as dtime
from IPython.display import display

warnings.filterwarnings("ignore")

# =========================
# CONFIG
# =========================
EMITEN_DIR           = "emiten"
ACTIVE_PREFIX        = "candidates_active_filtered_"   # daily active roster file prefix
MAX_TICKERS          = 200       # raise as needed (700 for the full roster)

# DAILY cache (append only)
USE_CACHE_DAILY      = True
CACHE_DAILY_DIR      = os.path.join(EMITEN_DIR, "cache_daily")
DAILY_INIT_YEARS     = 5         # initial fetch if cache missing
# Retention window applied when the cache is written.
# NOTE(review): the original comment said "~6 years", but _write_cache_csv
# prunes by calendar days (timedelta(days=...)), so 1500 is roughly 4.1 years.
DAILY_WINDOW_DAYS    = 1500

# 1m cache (merge)
USE_CACHE_1M         = True
CACHE_1M_DIR         = os.path.join(EMITEN_DIR, "cache_1m")
CACHE_1M_WINDOW_D    = 7         # Yahoo 1m history is about 7 days

# 5m fallback cache (merge)
USE_CACHE_5M         = True
CACHE_5M_DIR         = os.path.join(EMITEN_DIR, "cache_5m")
CACHE_5M_WINDOW_D    = 12

# ARA heuristik (boleh sesuaikan)
def ara_limit_pct(price):
    """Return the heuristic upper auto-rejection (ARA) limit as a fraction.

    0.35 below 200, 0.25 from 200 up to (but excluding) 5000, 0.20 otherwise.
    """
    # (exclusive price ceiling, limit) tiers, checked from the lowest up
    tiers = ((200, 0.35), (5000, 0.25))
    for ceiling, limit in tiers:
        if price < ceiling:
            return limit
    return 0.20

# Sinyal probabilistik (longgar)
# NOTE(review): apart from YF_TIMEOUT and EXCHANGE_CLOSE_UTC, the code that
# consumes these thresholds is not visible in this part of the file; the
# notes below are translations of the original comments.
LAST_WINDOW_MIN      = 60   # final minutes of the session
RECENT_MOM_MIN       = 15   # 15-minute momentum
VOL_SPIKE_MULT       = 1.6
CR_MIN               = 0.70
VIRGIN_HINT          = 0.06
MIN_PRICE            = 50
YF_TIMEOUT           = 45   # timeout passed to every yf.download call in this cell
EXCHANGE_CLOSE_UTC   = (8, 0)  # 15:00 WIB = 08:00 UTC; (hour, minute) used by minutes_to_close

# =========================
# UTIL & TZ
# =========================
def _now_date_tag():
    """Return the current local date as ``YYYYMMDD``."""
    return f"{datetime.now():%Y%m%d}"
def _now_ts_tag():
    """Return the current local date and time as ``YYYYMMDD_HHMM``."""
    return f"{datetime.now():%Y%m%d_%H%M}"

def _utc_naive_index(idx):
    """Convert *idx* to a timezone-naive DatetimeIndex expressed in UTC.

    Values are parsed as UTC (unparseable entries become NaT) and the
    timezone information is then dropped.
    """
    parsed_utc = pd.to_datetime(idx, utc=True, errors="coerce")
    as_index = pd.DatetimeIndex(parsed_utc)
    return as_index.tz_localize(None)

def minutes_to_close(last_ts, close_utc=None):
    """Return the whole minutes from *last_ts* to the exchange close, clamped to 0..120.

    Parameters
    ----------
    last_ts : pandas.Timestamp or anything ``pandas.Timestamp`` accepts
        Naive values are taken to be UTC already. Timezone-aware values are
        converted to UTC first; the original only stripped the timezone,
        which kept the local wall-clock time and compared it against the UTC
        close, so e.g. 14:30 WIB was treated as 14:30 UTC.
    close_utc : tuple of (hour, minute), optional
        Close time in UTC. Defaults to the module-level ``EXCHANGE_CLOSE_UTC``.

    Returns
    -------
    int
        0 when the close has passed, at most 120.
    """
    if close_utc is None:
        close_utc = EXCHANGE_CLOSE_UTC
    if not isinstance(last_ts, pd.Timestamp):
        last_ts = pd.Timestamp(last_ts)
    if getattr(last_ts, "tz", None) is not None:
        # tz_convert(None) converts to UTC and then drops the timezone.
        last_ts = last_ts.tz_convert(None)
    close_dt = datetime.combine(last_ts.date(), dtime(*close_utc))
    m = int((close_dt - last_ts).total_seconds() // 60)
    return max(0, min(120, m))

def _latest_file(folder, prefix):
    """Return today's ``<prefix><YYYYMMDD>.csv`` in *folder*, else the last match.

    When today's file does not exist, the lexicographically greatest
    ``<prefix>*.csv`` is returned. Raises FileNotFoundError when nothing matches.
    """
    todays_file = os.path.join(folder, f"{prefix}{_now_date_tag()}.csv")
    if os.path.exists(todays_file):
        return todays_file
    matches = glob.glob(os.path.join(folder, f"{prefix}*.csv"))
    if not matches:
        raise FileNotFoundError(f"Tidak ada file {prefix}*.csv di {folder}")
    return max(matches)

def _load_tickers(path, col="ticker"):
    """Read ticker symbols from column *col* (case-insensitive) of the CSV at *path*.

    Symbols are upper-cased, stripped, suffixed with ``.JK`` when missing and
    de-duplicated in first-seen order.

    Blank cells are dropped *before* the string conversion. The original
    called ``dropna()`` after ``astype(str)``, where it can no longer see the
    missing values, so blanks ended up as a bogus ``NAN.JK`` entry.

    Raises
    ------
    AssertionError
        When the column is missing. Raised explicitly (same type as the
        original ``assert``) so the check also runs under ``python -O``.
    """
    df = pd.read_csv(path)
    cl = [c.lower() for c in df.columns]
    if col not in cl:
        raise AssertionError(f"Kolom '{col}' tidak ditemukan di {path}")
    s = df[df.columns[cl.index(col)]].dropna().astype(str).str.upper().str.strip()
    s = s[s != ""]  # whitespace-only cells carry no symbol
    return s.apply(lambda x: x if x.endswith(".JK") else f"{x}.JK").drop_duplicates().tolist()

# =========================
# TEST KONEKSI (harian & 1m)
# =========================
def test_yf():
    """Smoke-test the yfinance connection with TLKM.JK (daily and 1-minute bars).

    Prints the outcome and returns True only when both downloads return data;
    an empty result or any exception yields False.
    """
    shared = dict(progress=False, threads=False, timeout=YF_TIMEOUT) if False else None
    try:
        daily = yf.download("TLKM.JK", period="3mo", interval="1d", progress=False, threads=False, timeout=YF_TIMEOUT)
        minute = yf.download("TLKM.JK", period="7d", interval="1m",  progress=False, threads=False, timeout=YF_TIMEOUT)
        got_both = not (daily.empty or minute.empty)
        if got_both:
            print(f"✅ Koneksi OK: daily={len(daily)} bar, 1m={len(minute)} bar")
        else:
            print("⚠️ Koneksi YF lemah (data kosong). Coba ulangi beberapa menit lagi.")
        return got_both
    except Exception as e:
        print(f"❌ Gagal koneksi YF: {e}")
        return False

# =========================
# CACHE I/O GENERIK
# =========================
def _read_cache_csv(path):
    """Read a cache CSV and normalise its index to naive UTC.

    Returns an empty DataFrame when the file does not exist or when reading
    or index conversion raises.
    """
    if not os.path.exists(path):
        return pd.DataFrame()
    try:
        cached = pd.read_csv(path, index_col=0)
        cached.index = _utc_naive_index(cached.index)
    except Exception:
        cached = pd.DataFrame()
    return cached

def _write_cache_csv(path, df, window_days):
    """Write *df* to *path* as CSV, keeping only the most recent rows.

    Nothing is written for an empty frame. Otherwise the index is normalised
    to naive UTC, rows older than ``window_days + 1`` days (relative to the
    current UTC time) are dropped, duplicate timestamps keep their last row,
    the result is sorted, and the parent directory is created if needed.
    """
    if df.empty:
        return
    frame = df.copy()
    frame.index = _utc_naive_index(frame.index)
    oldest_kept = datetime.utcnow() - timedelta(days=window_days+1)
    recent = frame[frame.index >= oldest_kept]
    unique_rows = recent[~recent.index.duplicated(keep="last")]
    ordered = unique_rows.sort_index()
    Path(os.path.dirname(path)).mkdir(parents=True, exist_ok=True)
    Path(path).write_text(ordered.to_csv())

# =========================
# FETCH DAILY (incremental append)
# =========================
def get_daily_incremental(t):
    """Return daily bars for ticker *t*, refreshing the on-disk cache.

    With no usable cache, the last DAILY_INIT_YEARS years are downloaded.
    Otherwise bars from the newest cached date onward are requested and
    merged in. The newest cached bar is re-fetched on purpose: it may have
    been stored while the session was still open, and starting one day later
    would never refresh it.

    Args:
        t: Ticker symbol passed to ``yf.download``.

    Returns:
        DataFrame indexed by UTC-naive timestamps. When the download is
        empty the cached frame is returned (empty if there is no cache).
    """
    path = os.path.join(CACHE_DAILY_DIR, f"{t}.csv")
    base = _read_cache_csv(path) if USE_CACHE_DAILY else pd.DataFrame()

    last_dt = pd.NaT if base.empty else pd.to_datetime(base.index.max())
    if pd.isna(last_dt):
        # No cache (or no valid timestamp in it): initial multi-year fetch.
        base = pd.DataFrame()
        fresh = yf.download(t, period=f"{DAILY_INIT_YEARS}y", interval="1d",
                            progress=False, threads=False, timeout=YF_TIMEOUT)
    else:
        start = last_dt.date().isoformat()
        fresh = yf.download(t, start=start, interval="1d",
                            progress=False, threads=False, timeout=YF_TIMEOUT)
    if fresh.empty:
        return base

    # yf.download can return two-level (field, ticker) columns even for one
    # symbol; keep only the field names so the CSV cache round-trips cleanly.
    if isinstance(fresh.columns, pd.MultiIndex):
        fresh.columns = fresh.columns.get_level_values(0)
    fresh.index = _utc_naive_index(fresh.index)

    if base.empty:
        comb = fresh
    else:
        comb = pd.concat([base, fresh])
        # De-duplicate BEFORE sorting: sort_index() is not guaranteed stable,
        # so sorting first could let the stale cached row win over the fresh one.
        comb = comb[~comb.index.duplicated(keep="last")].sort_index()
    if USE_CACHE_DAILY:
        _write_cache_csv(path, comb, DAILY_WINDOW_DAYS)
    return comb

# =========================
# FETCH 1m (merge 7d) + fallback 5m
# =========================
def get_1m_merge(t):
    """Return 1-minute bars for *t* (cache merged with a fresh 7-day download).

    Falls back to ``get_5m_merge`` when neither the download nor the cache
    has any 1-minute data.

    Args:
        t: Ticker symbol passed to ``yf.download``.

    Returns:
        Tuple ``(frame, granularity)`` where granularity is ``"1m"`` or,
        for the fallback, ``"5m"``.
    """
    path = os.path.join(CACHE_1M_DIR, f"{t}.csv")
    base = _read_cache_csv(path) if USE_CACHE_1M else pd.DataFrame()
    fresh = yf.download(t, period="7d", interval="1m",
                        progress=False, threads=False, timeout=YF_TIMEOUT)
    if fresh.empty:
        if base.empty:
            # fallback 5m
            return get_5m_merge(t), "5m"
        return base, "1m"

    # yf.download can return two-level (field, ticker) columns even for one
    # symbol; keep only the field names so the CSV cache round-trips cleanly.
    if isinstance(fresh.columns, pd.MultiIndex):
        fresh.columns = fresh.columns.get_level_values(0)
    fresh.index = _utc_naive_index(fresh.index)

    if base.empty:
        comb = fresh
    else:
        comb = pd.concat([base, fresh])
        # De-duplicate BEFORE sorting: sort_index() is not guaranteed stable,
        # so sorting first could let the cached row win over the fresh one.
        comb = comb[~comb.index.duplicated(keep="last")].sort_index()
    if USE_CACHE_1M:
        _write_cache_csv(path, comb, CACHE_1M_WINDOW_D)
    return comb, "1m"

def get_5m_merge(t):
    """Return 5-minute bars for *t* (cache merged with a fresh download).

    The download covers the last CACHE_5M_WINDOW_D days.

    Args:
        t: Ticker symbol passed to ``yf.download``.

    Returns:
        Merged frame indexed by UTC-naive timestamps; the cached frame when
        the download is empty; an empty DataFrame when both are empty.
    """
    path = os.path.join(CACHE_5M_DIR, f"{t}.csv")
    base = _read_cache_csv(path) if USE_CACHE_5M else pd.DataFrame()
    fresh = yf.download(t, period=f"{CACHE_5M_WINDOW_D}d", interval="5m",
                        progress=False, threads=False, timeout=YF_TIMEOUT)
    if fresh.empty:
        return pd.DataFrame() if base.empty else base

    # yf.download can return two-level (field, ticker) columns even for one
    # symbol; keep only the field names so the CSV cache round-trips cleanly.
    if isinstance(fresh.columns, pd.MultiIndex):
        fresh.columns = fresh.columns.get_level_values(0)
    fresh.index = _utc_naive_index(fresh.index)

    if base.empty:
        comb = fresh
    else:
        comb = pd.concat([base, fresh])
        # De-duplicate BEFORE sorting: sort_index() is not guaranteed stable,
        # so sorting first could let the cached row win over the fresh one.
        comb = comb[~comb.index.duplicated(keep="last")].sort_index()
    if USE_CACHE_5M:
        _write_cache_csv(path, comb, CACHE_5M_WINDOW_D)
    return comb

# =========================
# FEATS: daily context & intraday micro
# =========================
def closing_range_bar(row):
    """Return where the close sits inside the bar's high-low range.

    0.0 means the close equals the low, 1.0 means it equals the high. The
    range is floored at 1e-8, so a bar with high == low == close yields 1.0.

    Args:
        row: Mapping/Series with ``"High"``, ``"Low"`` and ``"Close"``.
    """
    hi, lo, cl = (float(row[key]) for key in ("High", "Low", "Close"))
    span = max(1e-8, hi - lo)
    gap_from_high = float((hi - cl) / span)
    return 1.0 - gap_from_high

def obv_series(df):
    """Return the on-balance-volume series of *df*.

    Each bar's volume is signed by the direction of its close-to-close
    change (first bar and unchanged closes count as 0) and accumulated.

    Args:
        df: Frame with ``"Close"`` and ``"Volume"`` columns.
    """
    direction = np.sign(df["Close"].diff()).fillna(0.0)
    volume = df["Volume"].fillna(0)
    signed_volume = direction * volume
    return signed_volume.cumsum()

def daily_regime_score(d1d):
    """Score the daily backdrop of a ticker on a 0..1 scale.

    Components (weights): close above EMA20 and EMA50 (0.28), close above
    the prior 20-bar high (0.26), last volume relative to its 20-bar median,
    capped at 1.5x (0.24), and OBV higher than five bars ago (0.22).

    The minimum-history check is applied AFTER rows containing NaN are
    dropped; previously a frame that became short or empty after
    ``dropna()`` was still scored and could raise IndexError.

    Args:
        d1d: Daily frame with ``Close``, ``High`` and ``Volume`` columns,
            or None.

    Returns:
        Tuple ``(score, info)``. With fewer than 40 usable bars the neutral
        result ``(0.5, {})`` is returned.
    """
    if d1d is None or d1d.empty:
        return 0.5, {}
    df = d1d.dropna().copy()
    if len(df) < 40:
        return 0.5, {}

    df["EMA20"] = df["Close"].ewm(span=20).mean()
    df["EMA50"] = df["Close"].ewm(span=50).mean()

    # Daily OBV: is it higher than five bars ago?
    delta = np.sign(df["Close"].diff()).fillna(0.0)
    obv = (delta * df["Volume"].fillna(0)).cumsum()
    obv_up = bool(float(obv.iloc[-1]) - float(obv.iloc[-6]) > 0)

    # Work with plain floats so comparisons never see a Series.
    last_close = float(df["Close"].iloc[-1])
    above_ema = bool(last_close > float(df["EMA20"].iloc[-1])
                     and last_close > float(df["EMA50"].iloc[-1]))

    # Highest high of the 20 bars preceding the last one.
    hh20 = float(df["High"].rolling(20).max().shift(1).iloc[-1])
    breakout20 = bool(last_close > hh20)

    # Relative volume versus the 20-bar median (1.0 when the median is 0).
    med20 = float(df["Volume"].rolling(20).median().iloc[-1])
    rvol_d20 = float(df["Volume"].iloc[-1]) / med20 if med20 > 0 else 1.0

    sc = 0.0
    sc += 0.28 if above_ema else 0.0
    sc += 0.26 if breakout20 else 0.0
    sc += 0.24*min(1.5, rvol_d20)/1.5
    sc += 0.22 if obv_up else 0.0
    sc = float(max(0.0, min(1.0, sc)))
    return sc, dict(above_ema=above_ema, breakout20=breakout20, rvol_d20=round(rvol_d20,2))

def rvol_lastNmin(df_today, N, base_df):
    """Relative volume of the last *N* bars of today versus earlier days.

    Today's volume over its final ``max(1, N)`` bars is divided by the
    median, across the calendar dates found in *base_df*, of each date's
    volume over its own final ``max(1, N)`` bars.

    Returns:
        The ratio, or NaN when either frame is empty, when *base_df* spans
        fewer than two dates, or when the median is not positive.
    """
    if df_today.empty or base_df.empty:
        return np.nan

    window = max(1, N)
    current = float(df_today["Volume"].tail(window).sum())

    history = base_df.copy()
    history["date"] = history.index.date
    per_day = []
    for _, day in history.groupby("date"):
        per_day.append(float(day["Volume"].tail(window).sum()))

    if len(per_day) < 2:
        return np.nan
    typical = float(np.median(per_day))
    if typical > 0:
        return current / typical
    return np.nan

def sigm(x):
    """Return the logistic sigmoid ``1 / (1 + e**-x)``.

    Very negative inputs, for which ``math.exp(-x)`` overflows, return 0.0
    instead of raising OverflowError.
    """
    try:
        return 1.0/(1.0+math.exp(-x))
    except OverflowError:
        # exp(-x) is astronomically large here, so the true value rounds to 0.
        return 0.0

def prob_today(df_day, base_df, ara_pct):
    """Estimate a same-day probability score of reaching the ARA target.

    A logistic score is built from: the day's return from the open as a
    fraction of *ara_pct* (prox), whether the recent per-bar momentum would
    cover the remaining distance before the close (coverage), 30-bar
    relative volume, the last bar's closing range, and whether the last
    close exceeds every earlier high of the day.

    Args:
        df_day: Intraday bars of the current day.
        base_df: Intraday bars of earlier days (relative-volume baseline).
        ara_pct: Target move as a fraction (e.g. 0.25).

    Returns:
        Tuple ``(probability, features)``; ``(0.0, {})`` when fewer than
        10 bars are available.
    """
    if df_day.empty or len(df_day) < 10:
        return 0.0, {}

    open_px = float(df_day["Open"].iloc[0])
    last_px = float(df_day["Close"].iloc[-1])
    day_roc = (last_px / open_px - 1.0) if open_px > 0 else 0.0
    prox = max(0.0, min(1.0, day_roc / ara_pct))
    dist = max(0.0, ara_pct - day_roc)

    # Recent momentum expressed per bar.
    lookback = RECENT_MOM_MIN
    per_min = 0.0
    if len(df_day) > lookback:
        ref_px = float(df_day["Close"].iloc[-(lookback + 1)])
        mom_ret = (last_px / ref_px - 1.0) if ref_px > 0 else 0.0
        per_min = mom_ret / lookback

    mleft = minutes_to_close(df_day.index[-1])
    if dist > 0:
        coverage = (per_min * max(1, mleft)) / max(1e-6, dist)
    else:
        coverage = 1.0  # target already reached
    coverage = max(0.0, min(1.0, coverage))

    rvol30 = rvol_lastNmin(df_day, 30, base_df)
    rvol30_safe = 0.0 if np.isnan(rvol30) else rvol30
    rvol30_n = max(0.0, min(3.0, rvol30_safe)) / 3.0

    last = df_day.iloc[-1]
    cr = closing_range_bar(last)
    if len(df_day) > 1:
        hh_before = float(df_day["High"].iloc[:-1].max())
    else:
        hh_before = float(last["High"])
    breakout = 1.0 if float(last["Close"]) > float(hh_before) else 0.0

    x = (1.6*prox) + (1.4*coverage) + (1.2*rvol30_n) + (1.0*cr) + (0.6*breakout) - 2.2
    p = sigm(x)

    feat = dict(
        day_roc=round(day_roc, 4),
        prox=round(prox, 3),
        coverage=round(coverage, 3),
        rvol30=round(rvol30_safe, 2),
        cr=round(cr, 3),
        breakout=bool(breakout),
        mleft=int(mleft),
    )
    return float(p), feat

def prob_tomorrow(df_day, base_df, ara_pct):
    """Estimate a next-day probability score of reaching the ARA target.

    A logistic score is built from: the day's return from the open as a
    fraction of *ara_pct*, the last bar's closing range, 60-bar relative
    volume, whether intraday OBV is higher than 30 bars ago, and a "virgin"
    flag set when the day's return is at least VIRGIN_HINT now but never
    reached it before the final LAST_WINDOW_MIN bars.

    Args:
        df_day: Intraday bars of the current day.
        base_df: Intraday bars of earlier days (relative-volume baseline).
        ara_pct: Target move as a fraction (e.g. 0.25).

    Returns:
        Tuple ``(probability, features)``; ``(0.0, {})`` when fewer than
        10 bars are available.
    """
    if df_day.empty or len(df_day) < 10:
        return 0.0, {}

    open_px = float(df_day["Open"].iloc[0])
    last_px = float(df_day["Close"].iloc[-1])
    day_roc = (last_px / open_px - 1.0) if open_px > 0 else 0.0
    prox_t = max(0.0, min(1.0, day_roc / ara_pct))

    cr = closing_range_bar(df_day.iloc[-1])

    obv = obv_series(df_day)
    obv_slope = float(obv.iloc[-1] - obv.iloc[-31]) if len(obv) > 30 else 0.0
    obv_up = 1.0 if obv_slope > 0 else 0.0

    rvol60 = rvol_lastNmin(df_day, 60, base_df)
    rvol60_safe = 0.0 if np.isnan(rvol60) else rvol60
    rvol60_n = max(0.0, min(3.0, rvol60_safe)) / 3.0

    window = LAST_WINDOW_MIN
    virgin = 0.0
    if len(df_day) > window:
        roc_path = (df_day["Close"] / open_px - 1.0)
        earlier_peak = float(roc_path.iloc[:-window].max())
        if day_roc >= VIRGIN_HINT and earlier_peak < VIRGIN_HINT:
            virgin = 1.0

    x = (1.5*prox_t) + (1.2*cr) + (1.0*rvol60_n) + (0.8*obv_up) + (0.6*virgin) - 1.8
    p = sigm(x)

    feat = dict(
        day_roc=round(day_roc, 4),
        prox=round(prox_t, 3),
        rvol60=round(rvol60_safe, 2),
        cr=round(cr, 3),
        obv_up=bool(obv_up),
        virgin_hint=bool(virgin),
    )
    return float(p), feat

# =========================
# MAIN
# =========================
# ensure folders
# Make sure the output and cache folders exist.
for _folder in (EMITEN_DIR, CACHE_DAILY_DIR, CACHE_1M_DIR, CACHE_5M_DIR):
    Path(_folder).mkdir(parents=True, exist_ok=True)

# Roster: newest active-candidates CSV, capped at MAX_TICKERS symbols.
active_csv = _latest_file(EMITEN_DIR, ACTIVE_PREFIX)
tickers_all = _load_tickers(active_csv)[:MAX_TICKERS]
print(f"[Roster] {active_csv} | tickers={len(tickers_all)}")

# Connectivity test: abort early when Yahoo Finance is not answering.
if not test_yf():
    raise SystemExit("Stop: koneksi yfinance bermasalah.")

rows_today, rows_tmr = [], []

for i, t in enumerate(tickers_all, 1):
    # 1) DAILY bars (incremental cache)
    d1d = get_daily_incremental(t)
    if d1d.empty:
        if i % 20 == 0: print(f"… {i}/{len(tickers_all)} (daily empty: {t})")
        continue

    # 2) 1m bars (merged with cache) / 5m fallback
    df_all, gran = get_1m_merge(t)
    if df_all.empty:
        if i % 20 == 0: print(f"… {i}/{len(tickers_all)} (intraday empty: {t})")
        continue

    # "Today" is the most recent date present in the intraday data.
    today_date = df_all.index.max().date()
    df_today = df_all[df_all.index.date == today_date].copy()
    if df_today.empty or len(df_today) < 10:
        continue

    last_close = float(df_today["Close"].iloc[-1])
    if last_close < MIN_PRICE:
        continue

    # Daily context (prior multiplier)
    reg_score, _reg = daily_regime_score(d1d)

    # Intraday baseline for relative volume = all days before today
    base_intra = df_all[df_all.index.date < today_date].copy()

    # ARA target
    target_pct = ara_limit_pct(last_close)

    # Probabilities
    p_today, feat_t = prob_today(df_today, base_intra, target_pct)
    p_tmr,  feat_m  = prob_tomorrow(df_today, base_intra, target_pct)

    # Scale by the daily regime score, capped at 1.0
    p_today_adj = min(1.0, p_today * (0.85 + 0.30*reg_score))
    p_tmr_adj   = min(1.0, p_tmr   * (0.80 + 0.40*reg_score))

    rows_today.append({
        "ticker": t, "gran": gran, "price": round(last_close,2),
        "prob_ara_today": round(100*p_today,1),
        "prob_ara_today_adj": round(100*p_today_adj,1),
        "ara_target_pct": round(100*target_pct,1),
        **feat_t, "regime_score": round(reg_score,2)
    })
    rows_tmr.append({
        "ticker": t, "gran": gran, "price": round(last_close,2),
        "prob_ara_tomorrow": round(100*p_tmr,1),
        "prob_ara_tomorrow_adj": round(100*p_tmr_adj,1),
        "ara_target_pct": round(100*target_pct,1),
        **feat_m, "regime_score": round(reg_score,2)
    })

    if i % 25 == 0:
        print(f"… processed {i}/{len(tickers_all)}")
    time.sleep(0.03)


def _rank_rows(rows, sort_cols):
    """Build a ranking table sorted descending by *sort_cols*.

    An empty row list yields an empty frame; calling sort_values on it would
    raise KeyError because the sort columns do not exist.
    """
    table = pd.DataFrame(rows)
    if table.empty:
        return table
    return table.sort_values(
        sort_cols, ascending=[False] * len(sort_cols)).reset_index(drop=True)


# Ranking & save
df_today = _rank_rows(rows_today, ["prob_ara_today_adj","prob_ara_today","price"])
df_tmr = _rank_rows(rows_tmr, ["prob_ara_tomorrow_adj","prob_ara_tomorrow","price"])

ts_tag = _now_ts_tag()
out_today = os.path.join(EMITEN_DIR, f"prob_ara_today_{ts_tag}.csv")
out_tmr   = os.path.join(EMITEN_DIR, f"prob_ara_tomorrow_{ts_tag}.csv")

if not df_today.empty:
    df_today.to_csv(out_today, index=False)
    print(f"✅ Saved → {out_today}")
    display(df_today.head(25))
else:
    print("⚠️ Tidak ada kandidat 'prob ARA hari ini'.")

if not df_tmr.empty:
    df_tmr.to_csv(out_tmr, index=False)
    print(f"✅ Saved → {out_tmr}")
    display(df_tmr.head(25))
else:
    print("⚠️ Tidak ada kandidat 'prob ARA besok'.")
# ============================================================================
# ==== ARA PROB (DAILY INCREMENTAL + 1m, single cell) =========================
# Output:
#   emiten/prob_ara_today_YYYYMMDD_HHMM.csv
#   emiten/prob_ara_tomorrow_YYYYMMDD_HHMM.csv
#   emiten/cache_daily/<TICKER>.csv   (incremental append)
#   emiten/cache_1m/<TICKER>.csv      (merge 7d)
#   emiten/cache_5m/<TICKER>.csv      (fallback only, merge 12d)
# ============================================================================
!pip install -q yfinance pandas numpy

import os, glob, time, math, warnings
import numpy as np, pandas as pd, yfinance as yf
from pathlib import Path
from datetime import datetime, timedelta, time as dtime
from IPython.display import display

warnings.filterwarnings("ignore")

# =========================
# CONFIG
# =========================
EMITEN_DIR           = "emiten"
ACTIVE_PREFIX        = "candidates_active_filtered_"   # filename prefix of the daily active roster CSV
MAX_TICKERS          = 700       # roster cap; raise as needed (700 for the full list)

# DAILY cache (append only)
USE_CACHE_DAILY      = True
CACHE_DAILY_DIR      = os.path.join(EMITEN_DIR, "cache_daily")
DAILY_INIT_YEARS     = 5         # initial fetch if cache missing
DAILY_WINDOW_DAYS    = 1500      # retention, pruned on write; _write_cache_csv counts CALENDAR days (~4.1 years, less than the 5-year initial fetch)

# 1m cache (merge)
USE_CACHE_1M         = True
CACHE_1M_DIR         = os.path.join(EMITEN_DIR, "cache_1m")
CACHE_1M_WINDOW_D    = 7         # Yahoo 1m history ≈ 7 days

# 5m fallback cache (merge)
USE_CACHE_5M         = True
CACHE_5M_DIR         = os.path.join(EMITEN_DIR, "cache_5m")
CACHE_5M_WINDOW_D    = 12

# ARA heuristic (adjust as needed)
def ara_limit_pct(price):
    """Return the upper auto-rejection (ARA) limit, as a fraction, for *price*.

    Tiers: below 200 -> 0.35, below 5000 -> 0.25, otherwise 0.20.
    NOTE(review): the tier boundaries are exclusive here (a price of exactly
    200 maps to 0.25) — confirm against the exchange rule.
    """
    tiers = ((200, 0.35), (5000, 0.25))
    for ceiling, pct in tiers:
        if price < ceiling:
            return pct
    return 0.20

# Probabilistic signal settings (loose)
LAST_WINDOW_MIN      = 60   # bars at the end of the session (used by prob_tomorrow's virgin check)
RECENT_MOM_MIN       = 15   # momentum look-back: 15 bars (minutes on 1m data)
VOL_SPIKE_MULT       = 1.6  # NOTE(review): not referenced in the code visible in this cell — confirm usage
CR_MIN               = 0.70 # NOTE(review): not referenced in the code visible in this cell — confirm usage
VIRGIN_HINT          = 0.06 # day-return threshold (fraction, from the open) for the virgin_hint flag
MIN_PRICE            = 50   # tickers whose last close is below this are skipped
YF_TIMEOUT           = 45   # timeout value passed to yf.download
EXCHANGE_CLOSE_UTC   = (8, 0)  # 15:00 WIB = 08:00 UTC

# =========================
# UTIL & TZ
# =========================
def _now_date_tag():
    """Return the current local date as a compact ``YYYYMMDD`` string."""
    return f"{datetime.now():%Y%m%d}"
def _now_ts_tag():
    """Return the current local time as a ``YYYYMMDD_HHMM`` string."""
    return f"{datetime.now():%Y%m%d_%H%M}"

def _utc_naive_index(idx):
    """Convert *idx* to a timezone-naive DatetimeIndex expressed in UTC.

    Values are parsed with ``utc=True`` (tz-aware input is converted to UTC,
    naive input is taken as UTC); unparseable entries become NaT. The
    timezone is then removed.
    """
    as_utc = pd.DatetimeIndex(pd.to_datetime(idx, utc=True, errors="coerce"))
    return as_utc.tz_localize(None)

def minutes_to_close(last_ts, close_utc=None):
    """Return whole minutes from *last_ts* to the exchange close, clamped to 0..120.

    Args:
        last_ts: Time of the latest bar. Naive values are taken to be UTC
            already. Timezone-aware values are converted to UTC before the
            zone is dropped (previously the zone was stripped without
            conversion, so local wall-clock time was compared against a UTC
            close).
        close_utc: Optional ``(hour, minute)`` of the close in UTC;
            defaults to EXCHANGE_CLOSE_UTC.

    Returns:
        Minutes remaining on the same calendar day as *last_ts*; 0 once the
        close has passed, never more than 120.
    """
    if not isinstance(last_ts, pd.Timestamp):
        last_ts = pd.Timestamp(last_ts)
    if getattr(last_ts, "tz", None) is not None:
        last_ts = last_ts.tz_convert("UTC").tz_localize(None)
    if close_utc is None:
        close_utc = EXCHANGE_CLOSE_UTC
    close_dt = pd.Timestamp(datetime.combine(last_ts.date(), dtime(*close_utc)))
    m = int((close_dt - last_ts).total_seconds() // 60)
    return max(0, min(120, m))

def _latest_file(folder, prefix):
    """Return the roster CSV to use from *folder*.

    Prefers ``<prefix><today>.csv``; otherwise the lexicographically last
    file matching ``<prefix>*.csv``.

    Raises:
        FileNotFoundError: If no matching file exists.
    """
    todays = os.path.join(folder, f"{prefix}{_now_date_tag()}.csv")
    if os.path.exists(todays):
        return todays
    matches = glob.glob(os.path.join(folder, f"{prefix}*.csv"))
    if matches:
        return max(matches)
    raise FileNotFoundError(f"Tidak ada file {prefix}*.csv di {folder}")

def _load_tickers(path, col="ticker"):
    """Read the roster CSV at *path* and return its unique ``.JK`` symbols.

    The column is matched case-insensitively. Values are stripped,
    upper-cased and suffixed with ``.JK`` when the suffix is missing.
    Blank/missing cells are skipped (previously they were stringified to
    ``"nan"`` and ended up in the roster as ``NAN.JK``).

    Args:
        path: Path of the roster CSV.
        col: Name of the ticker column (case-insensitive).

    Returns:
        List of symbols in first-seen order, without duplicates.

    Raises:
        AssertionError: If the column is not present in the CSV.
    """
    df = pd.read_csv(path)
    lowered = [str(c).lower() for c in df.columns]
    wanted = str(col).lower()
    if wanted not in lowered:
        # Explicit raise instead of ``assert`` so the check survives
        # ``python -O``; the exception type is kept for compatibility.
        raise AssertionError(f"Kolom '{col}' tidak ditemukan di {path}")

    column = df.iloc[:, lowered.index(wanted)]
    symbols = {}
    # Drop missing cells BEFORE stringifying, otherwise NaN becomes "nan".
    for raw in column.dropna().astype(str):
        sym = raw.strip().upper()
        if not sym:
            continue
        if not sym.endswith(".JK"):
            sym = f"{sym}.JK"
        symbols.setdefault(sym, None)  # dict keeps first-seen order
    return list(symbols)

# =========================
# TEST KONEKSI (harian & 1m)
# =========================
def test_yf():
    """Probe Yahoo Finance with one daily and one 1-minute download of TLKM.JK.

    Returns:
        True when both downloads are non-empty. False when either is empty
        or when any exception is raised; a message is printed in every case.
    """
    try:
        shared = dict(progress=False, threads=False, timeout=YF_TIMEOUT)
        daily = yf.download("TLKM.JK", period="3mo", interval="1d", **shared)
        minute = yf.download("TLKM.JK", period="7d", interval="1m", **shared)
        healthy = not (daily.empty or minute.empty)
        if healthy:
            print(f"✅ Koneksi OK: daily={len(daily)} bar, 1m={len(minute)} bar")
        else:
            print("⚠️ Koneksi YF lemah (data kosong). Coba ulangi beberapa menit lagi.")
        return healthy
    except Exception as e:
        print(f"❌ Gagal koneksi YF: {e}")
        return False

# =========================
# CACHE I/O GENERIK
# =========================
def _read_cache_csv(path):
    """Load a cached CSV as a DataFrame indexed by UTC-naive timestamps.

    Best effort: a missing or unreadable file yields an empty DataFrame so
    the caller falls back to a fresh download, but an unreadable file is now
    reported instead of being swallowed silently.

    Rows whose index label cannot be parsed as a timestamp (for example
    leftover header rows) are dropped instead of being kept with a NaT index.

    Args:
        path: Path of the cache CSV; the first column is used as the index.

    Returns:
        The cached frame, or an empty DataFrame.
    """
    if not os.path.exists(path):
        return pd.DataFrame()
    try:
        df = pd.read_csv(path, index_col=0)
        df.index = _utc_naive_index(df.index)
    except Exception as err:
        print(f"⚠️ Cache tidak terbaca, diabaikan: {path} ({err})")
        return pd.DataFrame()
    # _utc_naive_index coerces unparseable labels to NaT; those rows carry no
    # usable timestamp, so remove them.
    return df[df.index.notna()]

def _write_cache_csv(path, df, window_days):
    """Persist *df* to *path*, keeping only the most recent rows.

    The index is normalised to UTC-naive timestamps, rows older than
    ``window_days + 1`` calendar days (relative to the current UTC time) are
    pruned, duplicate timestamps keep their last occurrence and the result is
    sorted. Nothing is written when *df* is empty.

    Args:
        path: Destination CSV path; parent folders are created as needed.
        df: Frame to store.
        window_days: Retention window in calendar days.
    """
    if df.empty:
        return
    dfx = df.copy()
    dfx.index = _utc_naive_index(dfx.index)
    # Aware "now" converted to naive UTC; avoids the deprecated datetime.utcnow().
    now_utc = pd.Timestamp.now(tz="UTC").tz_localize(None)
    cutoff = now_utc - pd.Timedelta(days=window_days + 1)
    dfx = dfx[dfx.index >= cutoff]
    dfx = dfx[~dfx.index.duplicated(keep="last")].sort_index()
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    # Let pandas open the file itself: explicit UTF-8 and no second round of
    # newline translation, unlike Path.write_text(df.to_csv()).
    dfx.to_csv(target)

# =========================
# FETCH DAILY (incremental append)
# =========================
def get_daily_incremental(t):
    """Return daily bars for ticker *t*, refreshing the on-disk cache.

    With no usable cache, the last DAILY_INIT_YEARS years are downloaded.
    Otherwise bars from the newest cached date onward are requested and
    merged in. The newest cached bar is re-fetched on purpose: it may have
    been stored while the session was still open, and starting one day later
    would never refresh it.

    Args:
        t: Ticker symbol passed to ``yf.download``.

    Returns:
        DataFrame indexed by UTC-naive timestamps. When the download is
        empty the cached frame is returned (empty if there is no cache).
    """
    path = os.path.join(CACHE_DAILY_DIR, f"{t}.csv")
    base = _read_cache_csv(path) if USE_CACHE_DAILY else pd.DataFrame()

    last_dt = pd.NaT if base.empty else pd.to_datetime(base.index.max())
    if pd.isna(last_dt):
        # No cache (or no valid timestamp in it): initial multi-year fetch.
        base = pd.DataFrame()
        fresh = yf.download(t, period=f"{DAILY_INIT_YEARS}y", interval="1d",
                            progress=False, threads=False, timeout=YF_TIMEOUT)
    else:
        start = last_dt.date().isoformat()
        fresh = yf.download(t, start=start, interval="1d",
                            progress=False, threads=False, timeout=YF_TIMEOUT)
    if fresh.empty:
        return base

    # yf.download can return two-level (field, ticker) columns even for one
    # symbol; keep only the field names so the CSV cache round-trips cleanly.
    if isinstance(fresh.columns, pd.MultiIndex):
        fresh.columns = fresh.columns.get_level_values(0)
    fresh.index = _utc_naive_index(fresh.index)

    if base.empty:
        comb = fresh
    else:
        comb = pd.concat([base, fresh])
        # De-duplicate BEFORE sorting: sort_index() is not guaranteed stable,
        # so sorting first could let the stale cached row win over the fresh one.
        comb = comb[~comb.index.duplicated(keep="last")].sort_index()
    if USE_CACHE_DAILY:
        _write_cache_csv(path, comb, DAILY_WINDOW_DAYS)
    return comb

# =========================
# FETCH 1m (merge 7d) + fallback 5m
# =========================
def get_1m_merge(t):
    """Return 1-minute bars for *t* (cache merged with a fresh 7-day download).

    Falls back to ``get_5m_merge`` when neither the download nor the cache
    has any 1-minute data.

    Args:
        t: Ticker symbol passed to ``yf.download``.

    Returns:
        Tuple ``(frame, granularity)`` where granularity is ``"1m"`` or,
        for the fallback, ``"5m"``.
    """
    path = os.path.join(CACHE_1M_DIR, f"{t}.csv")
    base = _read_cache_csv(path) if USE_CACHE_1M else pd.DataFrame()
    fresh = yf.download(t, period="7d", interval="1m",
                        progress=False, threads=False, timeout=YF_TIMEOUT)
    if fresh.empty:
        if base.empty:
            # fallback 5m
            return get_5m_merge(t), "5m"
        return base, "1m"

    # yf.download can return two-level (field, ticker) columns even for one
    # symbol; keep only the field names so the CSV cache round-trips cleanly.
    if isinstance(fresh.columns, pd.MultiIndex):
        fresh.columns = fresh.columns.get_level_values(0)
    fresh.index = _utc_naive_index(fresh.index)

    if base.empty:
        comb = fresh
    else:
        comb = pd.concat([base, fresh])
        # De-duplicate BEFORE sorting: sort_index() is not guaranteed stable,
        # so sorting first could let the cached row win over the fresh one.
        comb = comb[~comb.index.duplicated(keep="last")].sort_index()
    if USE_CACHE_1M:
        _write_cache_csv(path, comb, CACHE_1M_WINDOW_D)
    return comb, "1m"

def get_5m_merge(t):
    """Return 5-minute bars for *t* (cache merged with a fresh download).

    The download covers the last CACHE_5M_WINDOW_D days.

    Args:
        t: Ticker symbol passed to ``yf.download``.

    Returns:
        Merged frame indexed by UTC-naive timestamps; the cached frame when
        the download is empty; an empty DataFrame when both are empty.
    """
    path = os.path.join(CACHE_5M_DIR, f"{t}.csv")
    base = _read_cache_csv(path) if USE_CACHE_5M else pd.DataFrame()
    fresh = yf.download(t, period=f"{CACHE_5M_WINDOW_D}d", interval="5m",
                        progress=False, threads=False, timeout=YF_TIMEOUT)
    if fresh.empty:
        return pd.DataFrame() if base.empty else base

    # yf.download can return two-level (field, ticker) columns even for one
    # symbol; keep only the field names so the CSV cache round-trips cleanly.
    if isinstance(fresh.columns, pd.MultiIndex):
        fresh.columns = fresh.columns.get_level_values(0)
    fresh.index = _utc_naive_index(fresh.index)

    if base.empty:
        comb = fresh
    else:
        comb = pd.concat([base, fresh])
        # De-duplicate BEFORE sorting: sort_index() is not guaranteed stable,
        # so sorting first could let the cached row win over the fresh one.
        comb = comb[~comb.index.duplicated(keep="last")].sort_index()
    if USE_CACHE_5M:
        _write_cache_csv(path, comb, CACHE_5M_WINDOW_D)
    return comb

# =========================
# FEATS: daily context & intraday micro
# =========================
def closing_range_bar(row):
    """Return where the close sits inside the bar's high-low range.

    0.0 means the close equals the low, 1.0 means it equals the high. The
    range is floored at 1e-8, so a bar with high == low == close yields 1.0.

    Args:
        row: Mapping/Series with ``"High"``, ``"Low"`` and ``"Close"``.
    """
    hi, lo, cl = (float(row[key]) for key in ("High", "Low", "Close"))
    span = max(1e-8, hi - lo)
    gap_from_high = float((hi - cl) / span)
    return 1.0 - gap_from_high

def obv_series(df):
    """Return the on-balance-volume series of *df*.

    Each bar's volume is signed by the direction of its close-to-close
    change (first bar and unchanged closes count as 0) and accumulated.

    Args:
        df: Frame with ``"Close"`` and ``"Volume"`` columns.
    """
    direction = np.sign(df["Close"].diff()).fillna(0.0)
    volume = df["Volume"].fillna(0)
    signed_volume = direction * volume
    return signed_volume.cumsum()

def daily_regime_score(d1d):
    """Score the daily backdrop of a ticker on a 0..1 scale.

    Components (weights): close above EMA20 and EMA50 (0.28), close above
    the prior 20-bar high (0.26), last volume relative to its 20-bar median,
    capped at 1.5x (0.24), and OBV higher than five bars ago (0.22).

    The minimum-history check is applied AFTER rows containing NaN are
    dropped; previously a frame that became short or empty after
    ``dropna()`` was still scored and could raise IndexError.

    Args:
        d1d: Daily frame with ``Close``, ``High`` and ``Volume`` columns,
            or None.

    Returns:
        Tuple ``(score, info)``. With fewer than 40 usable bars the neutral
        result ``(0.5, {})`` is returned.
    """
    if d1d is None or d1d.empty:
        return 0.5, {}
    df = d1d.dropna().copy()
    if len(df) < 40:
        return 0.5, {}

    # Daily EMAs
    df["EMA20"] = df["Close"].ewm(span=20, adjust=False).mean()
    df["EMA50"] = df["Close"].ewm(span=50, adjust=False).mean()

    # Daily OBV: is it higher than five bars ago? (compared as plain floats)
    delta = np.sign(df["Close"].diff()).fillna(0.0)
    obv = (delta * df["Volume"].fillna(0)).cumsum()
    obv_up = bool(float(obv.iloc[-1]) - float(obv.iloc[-6]) > 0.0)

    # Last values as floats so comparisons never see a Series.
    last_close = float(df["Close"].iloc[-1])
    ema20_last = float(df["EMA20"].iloc[-1])
    ema50_last = float(df["EMA50"].iloc[-1])
    above_ema = bool((last_close > ema20_last) and (last_close > ema50_last))

    # Highest high of the 20 bars preceding the last one.
    hh20 = float(df["High"].rolling(20).max().shift(1).iloc[-1])
    breakout20 = bool(last_close > hh20)

    # Relative volume versus the 20-bar median (1.0 when the median is 0).
    vol_med20 = float(df["Volume"].rolling(20).median().iloc[-1])
    rvol_d20 = (float(df["Volume"].iloc[-1]) / vol_med20) if vol_med20 > 0 else 1.0

    # Score 0..1
    sc = 0.0
    sc += 0.28 if above_ema   else 0.0
    sc += 0.26 if breakout20  else 0.0
    sc += 0.24 * min(1.5, rvol_d20) / 1.5
    sc += 0.22 if obv_up      else 0.0
    sc = float(max(0.0, min(1.0, sc)))

    return sc, {
        "above_ema": above_ema,
        "breakout20": breakout20,
        "rvol_d20": round(rvol_d20, 2)
    }

def rvol_lastNmin(df_today, N, base_df):
    """Relative volume of the last *N* bars of today versus earlier days.

    Today's volume over its final ``max(1, N)`` bars is divided by the
    median, across the calendar dates found in *base_df*, of each date's
    volume over its own final ``max(1, N)`` bars.

    Returns:
        The ratio, or NaN when either frame is empty, when *base_df* spans
        fewer than two dates, or when the median is not positive.
    """
    if df_today.empty or base_df.empty:
        return np.nan

    window = max(1, N)
    current = float(df_today["Volume"].tail(window).sum())

    history = base_df.copy()
    history["date"] = history.index.date
    per_day = []
    for _, day in history.groupby("date"):
        per_day.append(float(day["Volume"].tail(window).sum()))

    if len(per_day) < 2:
        return np.nan
    typical = float(np.median(per_day))
    if typical > 0:
        return current / typical
    return np.nan

def sigm(x):
    """Return the logistic sigmoid ``1 / (1 + e**-x)``.

    Very negative inputs, for which ``math.exp(-x)`` overflows, return 0.0
    instead of raising OverflowError.
    """
    try:
        return 1.0/(1.0+math.exp(-x))
    except OverflowError:
        # exp(-x) is astronomically large here, so the true value rounds to 0.
        return 0.0

def prob_today(df_day, base_df, ara_pct):
    """Estimate a same-day probability score of reaching the ARA target.

    A logistic score is built from: the day's return from the open as a
    fraction of *ara_pct* (prox), whether the recent per-bar momentum would
    cover the remaining distance before the close (coverage), 30-bar
    relative volume, the last bar's closing range, and whether the last
    close exceeds every earlier high of the day.

    Args:
        df_day: Intraday bars of the current day.
        base_df: Intraday bars of earlier days (relative-volume baseline).
        ara_pct: Target move as a fraction (e.g. 0.25).

    Returns:
        Tuple ``(probability, features)``; ``(0.0, {})`` when fewer than
        10 bars are available.
    """
    if df_day.empty or len(df_day) < 10:
        return 0.0, {}

    open_px = float(df_day["Open"].iloc[0])
    last_px = float(df_day["Close"].iloc[-1])
    day_roc = (last_px / open_px - 1.0) if open_px > 0 else 0.0
    prox = max(0.0, min(1.0, day_roc / ara_pct))
    dist = max(0.0, ara_pct - day_roc)

    # Recent momentum expressed per bar.
    lookback = RECENT_MOM_MIN
    per_min = 0.0
    if len(df_day) > lookback:
        ref_px = float(df_day["Close"].iloc[-(lookback + 1)])
        mom_ret = (last_px / ref_px - 1.0) if ref_px > 0 else 0.0
        per_min = mom_ret / lookback

    mleft = minutes_to_close(df_day.index[-1])
    if dist > 0:
        coverage = (per_min * max(1, mleft)) / max(1e-6, dist)
    else:
        coverage = 1.0  # target already reached
    coverage = max(0.0, min(1.0, coverage))

    rvol30 = rvol_lastNmin(df_day, 30, base_df)
    rvol30_safe = 0.0 if np.isnan(rvol30) else rvol30
    rvol30_n = max(0.0, min(3.0, rvol30_safe)) / 3.0

    last = df_day.iloc[-1]
    cr = closing_range_bar(last)
    if len(df_day) > 1:
        hh_before = float(df_day["High"].iloc[:-1].max())
    else:
        hh_before = float(last["High"])
    breakout = 1.0 if float(last["Close"]) > float(hh_before) else 0.0

    x = (1.6*prox) + (1.4*coverage) + (1.2*rvol30_n) + (1.0*cr) + (0.6*breakout) - 2.2
    p = sigm(x)

    feat = dict(
        day_roc=round(day_roc, 4),
        prox=round(prox, 3),
        coverage=round(coverage, 3),
        rvol30=round(rvol30_safe, 2),
        cr=round(cr, 3),
        breakout=bool(breakout),
        mleft=int(mleft),
    )
    return float(p), feat

def prob_tomorrow(df_day, base_df, ara_pct):
    """Estimate a next-day probability score of reaching the ARA target.

    A logistic score is built from: the day's return from the open as a
    fraction of *ara_pct*, the last bar's closing range, 60-bar relative
    volume, whether intraday OBV is higher than 30 bars ago, and a "virgin"
    flag set when the day's return is at least VIRGIN_HINT now but never
    reached it before the final LAST_WINDOW_MIN bars.

    Args:
        df_day: Intraday bars of the current day.
        base_df: Intraday bars of earlier days (relative-volume baseline).
        ara_pct: Target move as a fraction (e.g. 0.25).

    Returns:
        Tuple ``(probability, features)``; ``(0.0, {})`` when fewer than
        10 bars are available.
    """
    if df_day.empty or len(df_day) < 10:
        return 0.0, {}

    open_px = float(df_day["Open"].iloc[0])
    last_px = float(df_day["Close"].iloc[-1])
    day_roc = (last_px / open_px - 1.0) if open_px > 0 else 0.0
    prox_t = max(0.0, min(1.0, day_roc / ara_pct))

    cr = closing_range_bar(df_day.iloc[-1])

    obv = obv_series(df_day)
    obv_slope = float(obv.iloc[-1] - obv.iloc[-31]) if len(obv) > 30 else 0.0
    obv_up = 1.0 if obv_slope > 0 else 0.0

    rvol60 = rvol_lastNmin(df_day, 60, base_df)
    rvol60_safe = 0.0 if np.isnan(rvol60) else rvol60
    rvol60_n = max(0.0, min(3.0, rvol60_safe)) / 3.0

    window = LAST_WINDOW_MIN
    virgin = 0.0
    if len(df_day) > window:
        roc_path = (df_day["Close"] / open_px - 1.0)
        earlier_peak = float(roc_path.iloc[:-window].max())
        if day_roc >= VIRGIN_HINT and earlier_peak < VIRGIN_HINT:
            virgin = 1.0

    x = (1.5*prox_t) + (1.2*cr) + (1.0*rvol60_n) + (0.8*obv_up) + (0.6*virgin) - 1.8
    p = sigm(x)

    feat = dict(
        day_roc=round(day_roc, 4),
        prox=round(prox_t, 3),
        rvol60=round(rvol60_safe, 2),
        cr=round(cr, 3),
        obv_up=bool(obv_up),
        virgin_hint=bool(virgin),
    )
    return float(p), feat

# =========================
# MAIN
# =========================
# ensure folders
# Make sure the output and cache folders exist.
for _folder in (EMITEN_DIR, CACHE_DAILY_DIR, CACHE_1M_DIR, CACHE_5M_DIR):
    Path(_folder).mkdir(parents=True, exist_ok=True)

# Roster: newest active-candidates CSV, capped at MAX_TICKERS symbols.
active_csv = _latest_file(EMITEN_DIR, ACTIVE_PREFIX)
tickers_all = _load_tickers(active_csv)[:MAX_TICKERS]
print(f"[Roster] {active_csv} | tickers={len(tickers_all)}")

# Connectivity test: abort early when Yahoo Finance is not answering.
if not test_yf():
    raise SystemExit("Stop: koneksi yfinance bermasalah.")

rows_today, rows_tmr = [], []

for i, t in enumerate(tickers_all, 1):
    # 1) DAILY bars (incremental cache)
    d1d = get_daily_incremental(t)
    if d1d.empty:
        if i % 20 == 0: print(f"… {i}/{len(tickers_all)} (daily empty: {t})")
        continue

    # 2) 1m bars (merged with cache) / 5m fallback
    df_all, gran = get_1m_merge(t)
    if df_all.empty:
        if i % 20 == 0: print(f"… {i}/{len(tickers_all)} (intraday empty: {t})")
        continue

    # "Today" is the most recent date present in the intraday data.
    today_date = df_all.index.max().date()
    df_today = df_all[df_all.index.date == today_date].copy()
    if df_today.empty or len(df_today) < 10:
        continue

    last_close = float(df_today["Close"].iloc[-1])
    if last_close < MIN_PRICE:
        continue

    # Daily context (prior multiplier)
    reg_score, _reg = daily_regime_score(d1d)

    # Intraday baseline for relative volume = all days before today
    base_intra = df_all[df_all.index.date < today_date].copy()

    # ARA target
    target_pct = ara_limit_pct(last_close)

    # Probabilities
    p_today, feat_t = prob_today(df_today, base_intra, target_pct)
    p_tmr,  feat_m  = prob_tomorrow(df_today, base_intra, target_pct)

    # Scale by the daily regime score, capped at 1.0
    p_today_adj = min(1.0, p_today * (0.85 + 0.30*reg_score))
    p_tmr_adj   = min(1.0, p_tmr   * (0.80 + 0.40*reg_score))

    rows_today.append({
        "ticker": t, "gran": gran, "price": round(last_close,2),
        "prob_ara_today": round(100*p_today,1),
        "prob_ara_today_adj": round(100*p_today_adj,1),
        "ara_target_pct": round(100*target_pct,1),
        **feat_t, "regime_score": round(reg_score,2)
    })
    rows_tmr.append({
        "ticker": t, "gran": gran, "price": round(last_close,2),
        "prob_ara_tomorrow": round(100*p_tmr,1),
        "prob_ara_tomorrow_adj": round(100*p_tmr_adj,1),
        "ara_target_pct": round(100*target_pct,1),
        **feat_m, "regime_score": round(reg_score,2)
    })

    if i % 25 == 0:
        print(f"… processed {i}/{len(tickers_all)}")
    time.sleep(0.03)


def _rank_rows(rows, sort_cols):
    """Build a ranking table sorted descending by *sort_cols*.

    An empty row list yields an empty frame; calling sort_values on it would
    raise KeyError because the sort columns do not exist.
    """
    table = pd.DataFrame(rows)
    if table.empty:
        return table
    return table.sort_values(
        sort_cols, ascending=[False] * len(sort_cols)).reset_index(drop=True)


# Ranking & save
df_today = _rank_rows(rows_today, ["prob_ara_today_adj","prob_ara_today","price"])
df_tmr = _rank_rows(rows_tmr, ["prob_ara_tomorrow_adj","prob_ara_tomorrow","price"])

ts_tag = _now_ts_tag()
out_today = os.path.join(EMITEN_DIR, f"prob_ara_today_{ts_tag}.csv")
out_tmr   = os.path.join(EMITEN_DIR, f"prob_ara_tomorrow_{ts_tag}.csv")

if not df_today.empty:
    df_today.to_csv(out_today, index=False)
    print(f"✅ Saved → {out_today}")
    display(df_today.head(25))
else:
    print("⚠️ Tidak ada kandidat 'prob ARA hari ini'.")

if not df_tmr.empty:
    df_tmr.to_csv(out_tmr, index=False)
    print(f"✅ Saved → {out_tmr}")
    display(df_tmr.head(25))
else:
    print("⚠️ Tidak ada kandidat 'prob ARA besok'.")
# ============================================================================


## PATCH MERGE FOLDER INTO LEANER DAILY

In [None]:
# ONE-CELL: Merge daily caches (legacy) → emiten/cache_daily/
from pathlib import Path
import pandas as pd
import shutil
from datetime import datetime

# ==== CONFIG ====
# Legacy cache folders to merge; when the same date appears in several
# sources, the folder listed later overrides the earlier one.
SRC_DIRS = [
    Path("emiten/cache_60d"),
    Path("emiten/cache_100d"),
    Path("emiten/cache_6mo"),
]
# Destination folder receiving one merged CSV per ticker.
DEST_DIR = Path("emiten/cache_daily")

BACKUP_DEST_BEFORE = False   # True → copy the cache_daily folder to a backup before merging
DELETE_SOURCE_DIRS_AFTER = False  # True → delete the source folders after the merge

# ==== HELPERS ====
def _read_daily_csv(p: Path) -> pd.DataFrame:
    """Read a daily OHLCV CSV into a frame indexed by tz-naive calendar dates.

    The ``Date`` column is used as index when present, otherwise the first
    column. Timestamps are made tz-naive and normalised to midnight; rows
    whose date cannot be parsed are dropped (previously they were kept with
    a NaT index and written back out). Only the standard OHLCV columns are
    kept when at least one exists, otherwise all columns. Duplicate dates
    keep the row that appears LAST in the file.

    Args:
        p: Path of the CSV file.

    Returns:
        De-duplicated frame sorted by date.
    """
    try:
        df = pd.read_csv(p, parse_dates=["Date"]).set_index("Date")
    except (ValueError, KeyError):
        # No "Date" column: fall back to the first column as the index.
        df = pd.read_csv(p, parse_dates=[0], index_col=0)

    # Ensure a daily, tz-naive datetime index.
    if not isinstance(df.index, pd.DatetimeIndex):
        df.index = pd.to_datetime(df.index, errors="coerce")
    if df.index.tz is not None:
        df.index = df.index.tz_convert(None)
    df = df[df.index.notna()]
    df.index = df.index.normalize()

    keep = [c for c in ["Open","High","Low","Close","Adj Close","Volume"] if c in df.columns]
    if not keep:
        # Unexpected columns: keep everything, still de-duplicated per date.
        keep = list(df.columns)
    df = df[keep]
    # De-duplicate in file order BEFORE sorting: sort_index() is not
    # guaranteed stable, so sorting first could change which row is "last".
    df = df[~df.index.duplicated(keep="last")].sort_index()
    return df

def _safe_backup(dest: Path):
    """Copy *dest* to a timestamped sibling folder when backups are enabled.

    Returns:
        Path of the backup folder, or None when BACKUP_DEST_BEFORE is off
        or *dest* does not exist.
    """
    if not (BACKUP_DEST_BEFORE and dest.exists()):
        return None
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    target = dest.parent / f"{dest.name}_backup_{stamp}"
    # copytree needs a non-existing target; clear any leftover first.
    if target.exists():
        shutil.rmtree(target, ignore_errors=True)
    shutil.copytree(dest, target)
    return target

# ==== MAIN ====
def merge_legacy_daily_to_cache_daily():
    """Merge the legacy daily caches in SRC_DIRS into DEST_DIR, one CSV per ticker.

    For every ticker, the existing destination file (if readable) and all
    source files are concatenated; for duplicate dates the later source
    wins. The destination is rewritten only when the merge adds rows.

    Source folders are deleted (DELETE_SOURCE_DIRS_AFTER) only when every
    source CSV could be read; previously unreadable files were skipped
    silently and their folder was deleted anyway.
    """
    from collections import defaultdict

    DEST_DIR.mkdir(parents=True, exist_ok=True)
    backup_dir = _safe_backup(DEST_DIR)

    # Collect all source CSVs, grouped by ticker (file stem).
    by_ticker = defaultdict(list)
    total_source_files = 0
    for src in SRC_DIRS:
        if not src.exists():
            continue
        csvs = list(src.glob("*.csv"))
        total_source_files += len(csvs)
        for f in csvs:
            by_ticker[f.stem].append(f)

    tickers_updated = 0
    rows_added_total = 0
    unreadable = []

    for tkr, files in sorted(by_ticker.items()):
        dest_file = DEST_DIR / f"{tkr}.csv"

        # Existing destination content, if any (unreadable → start empty).
        dest = pd.DataFrame()
        if dest_file.exists():
            try:
                dest = _read_daily_csv(dest_file)
            except Exception:
                dest = pd.DataFrame()
        before = len(dest)

        # Merge every source file of this ticker.
        for f in files:
            try:
                src_df = _read_daily_csv(f)
            except Exception:
                unreadable.append(f)
                continue
            if dest.empty:
                dest = src_df
            else:
                dest = pd.concat([dest, src_df])
                # De-duplicate BEFORE sorting: sort_index() is not guaranteed
                # stable, so sorting first could let the older row win.
                dest = dest[~dest.index.duplicated(keep="last")].sort_index()

        after = len(dest)
        if after > before:
            out = dest.copy()
            out.index.name = "Date"
            out.to_csv(dest_file)
            rows_added_total += (after - before)
            tickers_updated += 1

    # Optional: delete the source folders, but never after a partial merge.
    if DELETE_SOURCE_DIRS_AFTER:
        if unreadable:
            print(f"⚠️ {len(unreadable)} source CSV tidak terbaca; folder sumber TIDAK dihapus.")
        else:
            for src in SRC_DIRS:
                if src.exists():
                    shutil.rmtree(src, ignore_errors=True)

    print("✅ Merge selesai")
    print(f"  Source CSV total : {total_source_files}")
    print(f"  Tickers updated  : {tickers_updated}")
    print(f"  Rows added       : {rows_added_total}")
    if unreadable:
        print(f"  Unreadable CSV   : {len(unreadable)}")
    if backup_dir:
        print(f"  Backup created   : {backup_dir}")

# Run the merge as soon as this cell executes.
merge_legacy_daily_to_cache_daily()


## BSJP

### v.2.0

In [None]:
# ===================== BSJP v.2.0 — STRICT OFFLINE RUNNER =====================
# Input  : emiten/cache_daily/, emiten/cache_1m/, emiten/cache_5m/, roster aktif
# Output : result/prob_ara_today_YYYYMMDD.csv
#          result/prob_ara_tomorrow_YYYYMMDD.csv
# Note   : TANPA fetch internet; seluruh data dari cache warm-up service.

import os, glob, math, warnings, time
import numpy as np, pandas as pd
from pathlib import Path
from datetime import datetime, timedelta, time as dtime
from IPython.display import display
warnings.filterwarnings("ignore")

# ------------------- CONFIG -------------------
EMITEN_DIR      = "emiten"
RESULTS_DIR     = "result"                     # sibling of emiten/
ACTIVE_PREFIX   = "candidates_active_filtered_"
MAX_TICKERS     = 0                            # 0 = no cap
MIN_PRICE       = 50                           # tickers whose last close is below this are skipped

# Cache paths & freshness guards
CACHE_DAILY_DIR = os.path.join(EMITEN_DIR, "cache_daily")
CACHE_1M_DIR    = os.path.join(EMITEN_DIR, "cache_1m")
CACHE_5M_DIR    = os.path.join(EMITEN_DIR, "cache_5m")
CACHE_STALE_M_1M= 15                           # minutes; only drives a warning / confidence term (never fails)
DAILY_MIN_BARS  = 40                           # fewer daily bars than this -> neutral regime score

# Intraday windows (minutes)
WIN_MOM_RECENT  = 15
WIN_RVOL_30M    = 30
WIN_RVOL_60M    = 60

# Exchange anchors (yfinance intraday UTC-naive)
CLOSE_UTC       = (8, 0)   # 15:00 WIB = 08:00 UTC

# Logit weights (loose / permissive)
B_TODAY  = dict(prox=1.6, coverage=1.4, rvol30=1.2, cr=1.0, breakout=0.6, bias=-2.2)
B_TMR    = dict(prox=1.5, cr=1.2, rvol60=1.0, obv=0.8, virgin=0.6, bias=-1.8)

# Daily prior (regime) multipliers -> p_adj = p * (base + slope * regime)
REG_TODAY_M = (0.85, 0.30)   # base, slope
REG_TMR_M   = (0.80, 0.40)

# ------------------- UTILS --------------------
def _now_date():      return datetime.now().strftime("%Y%m%d")
def _latest_file(folder, prefix):
    """Pick the roster file to use from ``folder``.

    Prefers ``<prefix><today>.csv``; otherwise falls back to the
    lexicographically last ``<prefix>*.csv`` in the folder.

    Raises:
        FileNotFoundError: when no file with that prefix exists.
    """
    todays_path = os.path.join(folder, f"{prefix}{_now_date()}.csv")
    if os.path.exists(todays_path):
        return todays_path

    matches = glob.glob(os.path.join(folder, f"{prefix}*.csv"))
    if not matches:
        raise FileNotFoundError(f"Tidak ada {prefix}*.csv di {folder}")
    matches.sort()
    return matches[-1]

def _load_roster(path):
    df = pd.read_csv(path)
    cl = [c.lower() for c in df.columns]
    assert "ticker" in cl, "CSV roster butuh kolom 'ticker'"
    s = df[df.columns[cl.index("ticker")]].astype(str).str.upper().str.strip()
    return s.apply(lambda x: x if x.endswith(".JK") else f"{x}.JK").dropna().drop_duplicates().tolist()

def _utc_naive_index(idx):
    di = pd.to_datetime(idx, utc=True, errors="coerce")
    return pd.DatetimeIndex(di).tz_localize(None)

def _read_cache_csv(path):
    """Read one cached OHLCV CSV into a UTC-naive, de-duplicated DataFrame.

    Returns an empty DataFrame when the file is missing or cannot be read.
    Only the Open/High/Low/Close/Volume columns that exist are kept, rows
    missing any present High/Low/Close value are dropped, and for duplicate
    timestamps the last row wins.
    """
    if not os.path.exists(path):
        return pd.DataFrame()
    try:
        frame = pd.read_csv(path, index_col=0)
        frame.index = _utc_naive_index(frame.index)
        # keep essential cols only
        present = [col for col in ("Open", "High", "Low", "Close", "Volume") if col in frame.columns]
        required = [col for col in ("High", "Low", "Close") if col in present]
        frame = frame[present].dropna(subset=required).sort_index()
        is_repeat = frame.index.duplicated(keep="last")
        return frame[~is_repeat]
    except Exception:
        return pd.DataFrame()

def ara_limit_pct(p):
    """Return the ARA limit (as a fraction) applied to price ``p``.

    Tiers: below 200 -> 0.35, below 5000 -> 0.25, otherwise 0.20.
    """
    for ceiling, limit in ((200, 0.35), (5000, 0.25)):
        if p < ceiling:
            return limit
    return 0.20

def sigmoid(x):
    """Return the logistic function ``1 / (1 + e**-x)`` for a scalar ``x``.

    Evaluated in a numerically stable way: for negative ``x`` the formula is
    rewritten as ``e**x / (1 + e**x)`` so ``math.exp`` never receives a large
    positive argument (which would raise ``OverflowError``).
    """
    if x >= 0:
        return 1.0 / (1.0 + math.exp(-x))
    z = math.exp(x)
    return z / (1.0 + z)

def minutes_to_close(ts):
    """Return whole minutes from ``ts`` until the ``CLOSE_UTC`` time on the same date.

    Args:
        ts: Timestamp-like value. Naive values are taken to be UTC already;
            tz-aware values are converted to UTC first (previously the zone was
            simply stripped, so a non-UTC aware timestamp was compared against
            the UTC close time using its local wall-clock time).

    Returns:
        Integer minutes, clamped to the range 0..180.
    """
    ts = pd.Timestamp(ts)
    if getattr(ts, "tz", None) is not None:
        ts = ts.tz_convert("UTC").tz_localize(None)
    tgt = datetime.combine(ts.date(), dtime(*CLOSE_UTC))
    m = int((tgt - ts).total_seconds() // 60)
    return max(0, min(180, m))

def closing_range(row):  # 0..1 (near high)
    """Return where the close sits inside the bar's High-Low range.

    1.0 means the close equals the high, 0.0 means it equals the low. The
    range is floored at 1e-9 so a flat bar does not divide by zero.
    """
    high = row["High"]
    span = max(1e-9, float(high - row["Low"]))
    off_high = float((high - row["Close"]) / span)
    return 1.0 - off_high

def obv_series(df):
    """Return the cumulative signed volume (OBV) of ``df``.

    Each bar's volume is added when the close rose versus the previous bar,
    subtracted when it fell, and ignored when unchanged. The first bar and
    missing volumes contribute 0.
    """
    direction = np.sign(df["Close"].diff()).fillna(0.0)
    signed_volume = direction * df["Volume"].fillna(0.0)
    return signed_volume.cumsum()

def regime_daily(df_daily):
    """Score the daily trend regime of one ticker on a 0..1 scale.

    Returns the neutral value 0.5 when ``df_daily`` is None, empty, or has
    fewer than ``DAILY_MIN_BARS`` rows. Otherwise the score is a weighted sum
    of four components measured on the last bar:

    * 0.28 - close above both the 20- and 50-span EMA of the close,
    * 0.26 - close above the highest High of the previous 20 bars,
    * 0.24 - last volume relative to its 20-bar median (capped at 1.5x),
    * 0.22 - OBV higher than five bars earlier.
    """
    if df_daily is None or df_daily.empty or len(df_daily) < DAILY_MIN_BARS:
        return 0.5

    n_bars = len(df_daily)
    closes = df_daily["Close"]
    last_close = closes.iloc[-1]

    # Trend: last close above both EMAs.
    ema20_last = closes.ewm(span=20).mean().iloc[-1]
    ema50_last = closes.ewm(span=50).mean().iloc[-1]
    above_ema = 1.0 if (last_close > ema20_last and last_close > ema50_last) else 0.0

    # Breakout: last close above the prior 20-bar high (shifted to exclude today).
    breakout20 = 0.0
    if n_bars > 20:
        prior_high20 = df_daily["High"].rolling(20).max().shift(1).iloc[-1]
        if last_close > prior_high20:
            breakout20 = 1.0

    # Relative volume versus the 20-bar median; falls back to 1.0 when unusable.
    volumes = df_daily["Volume"]
    vol_med20 = volumes.rolling(20).median().iloc[-1] if n_bars >= 20 else np.nan
    if isinstance(vol_med20, (int, float)) and vol_med20 > 0:
        rvol_d20 = volumes.iloc[-1] / vol_med20
    else:
        rvol_d20 = 1.0
    rvol_d20 = min(1.5, max(0.0, rvol_d20))

    # OBV direction over the last five bars.
    obv = obv_series(df_daily)
    obv_up = 1.0 if (len(obv) > 6 and (obv.iloc[-1] - obv.iloc[-6]) > 0) else 0.0

    sc = 0.28*above_ema + 0.26*breakout20 + 0.24*(rvol_d20/1.5) + 0.22*obv_up
    return float(max(0.0, min(1.0, sc)))

def rvol_lastNbars(df_today, N_bars, df_all_prev_days):
    """Relative volume of today's last ``N_bars`` bars versus previous days.

    The numerator is the summed Volume of the last ``N_bars`` rows of
    ``df_today``. The denominator is the median, across calendar dates in
    ``df_all_prev_days``, of each date's last-``N_bars`` Volume sum.

    Returns NaN when either frame is empty, fewer than two baseline dates
    exist, or the baseline median is not positive.
    """
    if df_today.empty or df_all_prev_days.empty:
        return np.nan

    n_tail = max(1, N_bars)
    recent = float(df_today["Volume"].tail(n_tail).sum())

    per_day = df_all_prev_days["Volume"].groupby(df_all_prev_days.index.date)
    buckets = [float(day_volume.tail(n_tail).sum()) for _, day_volume in per_day]
    if len(buckets) < 2:
        return np.nan

    med = float(np.median(buckets))
    return recent / med if med > 0 else np.nan

# ----------------- PROBS & CONFIDENCE -----------------
def compute_probs_bsJP(df_all, gran_minutes, df_daily):
    """Turn one ticker's intraday bars into ARA probabilities and a confidence band.

    Args:
        df_all: Intraday OHLCV bars indexed by timestamp. The latest calendar
            date in the index is treated as "today"; all earlier dates form
            the baseline. NOTE(review): the freshness check below subtracts
            the index from ``datetime.utcnow()``, so the index is presumably
            UTC-naive -- confirm against the cache reader.
        gran_minutes: Bar size in minutes (the caller passes 1 or 5).
        df_daily: Daily OHLCV bars, forwarded to ``regime_daily``.

    Returns:
        ``None`` when today has too few bars (fewer than 10 for 1-minute data,
        fewer than 3 otherwise) or the last close is below ``MIN_PRICE``.
        Otherwise a dict of rounded features, the raw and regime-adjusted
        probabilities (in percent), the regime score, and the confidence
        score with its Low/Med/High band.
    """
    # split today's bars from the baseline (all earlier dates)
    today_date = df_all.index.max().date()
    df_today   = df_all[df_all.index.date == today_date].copy()
    base_intra = df_all[df_all.index.date < today_date].copy()
    if df_today.empty or len(df_today)<(10 if gran_minutes==1 else 3):
        return None  # not enough

    reg = regime_daily(df_daily)

    open0 = float(df_today["Open"].iloc[0])
    last  = df_today.iloc[-1]
    close = float(last["Close"])
    if close < MIN_PRICE: return None

    # Return since today's first open, and how far that is toward the ARA limit.
    day_roc = (close/open0 - 1.0) if open0>0 else 0.0
    ara_pct = ara_limit_pct(close)
    prox    = max(0.0, min(1.0, day_roc/ara_pct))   # 0..1 share of the limit already covered
    dist    = max(0.0, ara_pct - day_roc)           # remaining return needed to reach the limit

    # momentum & coverage
    N_rec   = max(1, int(WIN_MOM_RECENT/gran_minutes))
    if len(df_today) > N_rec:
        c_prev  = float(df_today["Close"].iloc[-(N_rec+1)])
        mom_ret = (close/c_prev - 1.0) if c_prev>0 else 0.0
        per_min = mom_ret / (N_rec*gran_minutes)    # recent return per minute
    else:
        per_min = 0.0
    mleft   = minutes_to_close(df_today.index[-1])
    # coverage: share of the remaining distance the recent pace would cover by
    # the close (at least one minute is assumed to remain).
    coverage= 1.0 if dist==0 else max(0.0, min(1.0, (per_min*max(1,mleft))/dist))

    # rVol (relative volume over ~30 and ~60 minutes), capped at 3x and scaled to 0..1
    N30     = max(1, int(WIN_RVOL_30M/gran_minutes))
    N60     = max(1, int(WIN_RVOL_60M/gran_minutes))
    rvol30  = rvol_lastNbars(df_today, N30, base_intra)
    rvol60  = rvol_lastNbars(df_today, N60, base_intra)
    r30n    = min(3.0, max(0.0, rvol30 if not np.isnan(rvol30) else 0.0))/3.0
    r60n    = min(3.0, max(0.0, rvol60 if not np.isnan(rvol60) else 0.0))/3.0

    # tape: closing range of the last bar and whether it closed above all earlier highs today
    cr      = closing_range(last)
    hh_before = float(df_today["High"].iloc[:-1].max()) if len(df_today)>1 else float(last["High"])
    breakout  = 1.0 if close > hh_before else 0.0

    # OBV micro + virgin hint (used by the "tomorrow" model)
    obv     = obv_series(df_today)
    obv_up  = 1.0 if (len(obv)>N30 and (obv.iloc[-1]-obv.iloc[-N30])>0) else 0.0
    roc_series = (df_today["Close"]/open0 - 1.0)
    # highest return seen before the last N60 bars; -1.0 when today is too short
    prev_max   = float(roc_series.iloc[:-N60].max()) if len(roc_series)>N60 else -1.0
    # "virgin": the day is now >= +6% but had not reached +6% before the last N60 bars
    virgin     = 1.0 if (day_roc>=0.06 and prev_max<0.06) else 0.0

    # logits -> probs, then scale by the daily-regime multiplier (capped at 1.0)
    x_today = (B_TODAY["prox"]*prox + B_TODAY["coverage"]*coverage +
               B_TODAY["rvol30"]*r30n + B_TODAY["cr"]*cr +
               B_TODAY["breakout"]*breakout + B_TODAY["bias"])
    p_today = sigmoid(x_today)
    mul_t   = REG_TODAY_M[0] + REG_TODAY_M[1]*reg
    p_today_adj = min(1.0, p_today * mul_t)

    x_tmr = (B_TMR["prox"]*prox + B_TMR["cr"]*cr + B_TMR["rvol60"]*r60n +
             B_TMR["obv"]*obv_up + B_TMR["virgin"]*virgin + B_TMR["bias"])
    p_tmr = sigmoid(x_tmr)
    mul_m = REG_TMR_M[0] + REG_TMR_M[1]*reg
    p_tmr_adj = min(1.0, p_tmr * mul_m)

    # -------- confidence (0..1) & band --------
    # data_quality: 1m granularity, enough bars, last bar newer than
    # CACHE_STALE_M_1M minutes, and a baseline exists
    age_mins = (datetime.utcnow() - df_all.index.max()).total_seconds()/60.0
    data_quality = 0.0
    data_quality += 0.4 if gran_minutes==1 else 0.2
    data_quality += 0.3 if len(df_today) >= (30 if gran_minutes==1 else 6) else 0.1
    data_quality += 0.2 if age_mins <= CACHE_STALE_M_1M else 0.0
    data_quality += 0.1 if not base_intra.empty else 0.0

    # signal_quality: strong rVol, high closing range, valid breakout
    signal_quality = 0.0
    signal_quality += 0.35 if r30n >= 0.5 else (0.15 if r30n >= 0.25 else 0.0)
    signal_quality += 0.25 if r60n >= 0.5 else (0.10 if r60n >= 0.25 else 0.0)
    signal_quality += 0.25 if cr >= 0.6  else 0.10 if cr >= 0.5 else 0.0
    signal_quality += 0.15 if breakout==1.0 else 0.0

    # Blend the three parts and bucket: <0.45 Low, <0.7 Med, else High.
    confidence = 0.3*data_quality + 0.4*signal_quality + 0.3*reg
    confidence = float(max(0.0, min(1.0, confidence)))
    band = "Low" if confidence < 0.45 else ("Med" if confidence < 0.7 else "High")

    return {
        # meta
        "gran": f"{gran_minutes}m",
        "price": round(close,2),
        # today
        "day_roc": round(day_roc,4),
        "ara_target_pct": round(100*ara_pct,1),
        "prox": round(prox,3),
        "coverage": round(coverage,3),
        "rvol30": round(rvol30 if not np.isnan(rvol30) else 0.0,2),
        "cr": round(cr,3),
        "breakout": bool(breakout),
        "mleft": int(mleft),
        # tmr
        "rvol60": round(rvol60 if not np.isnan(rvol60) else 0.0,2),
        "obv_up": bool(obv_up),
        "virgin_hint": bool(virgin),
        # probs
        "p_today": round(100*p_today,1),
        "p_today_adj": round(100*p_today_adj,1),
        "p_tmr": round(100*p_tmr,1),
        "p_tmr_adj": round(100*p_tmr_adj,1),
        # regime & confidence
        "regime_score": round(reg,2),
        "confidence": round(confidence,2),
        "confidence_band": band
    }

# ------------------- RUN ----------------------
# ensure folder
# Make sure the output folder and every cache folder exist before reading.
Path(RESULTS_DIR).mkdir(parents=True, exist_ok=True)
for p in [EMITEN_DIR, CACHE_DAILY_DIR, CACHE_1M_DIR, CACHE_5M_DIR]:
    Path(p).mkdir(parents=True, exist_ok=True)

# roster: today's file if present, else the latest one; optionally capped
roster_csv = _latest_file(EMITEN_DIR, ACTIVE_PREFIX)
tickers    = _load_roster(roster_csv)
tickers    = tickers if (MAX_TICKERS in [0, None]) else tickers[:MAX_TICKERS]
print(f"[BSJP v2.0] Roster={len(tickers)} | {os.path.basename(roster_csv)}")
print("↪️  Mode: STRICT OFFLINE (baca cache harian & 1m/5m)")

# Output rows for the two result tables, plus progress counters.
rows_today, rows_tmr = [], []
cnt_ok=cnt_skip_daily=cnt_skip_intraday=0

# Score every roster ticker from cached data only (no network access here).
for i, t in enumerate(tickers, 1):
    # DAILY (from cache only)
    d1d = _read_cache_csv(os.path.join(CACHE_DAILY_DIR, f"{t}.csv"))
    # NOTE(review): `reg` is not read again in this loop; compute_probs_bsJP
    # recomputes the regime from `d1d` itself.
    reg = regime_daily(d1d)

    # INTRADAY (1m, falling back to 5m when the 1m cache is empty)
    df = _read_cache_csv(os.path.join(CACHE_1M_DIR, f"{t}.csv"))
    gran = 1
    if df.empty:
        df = _read_cache_csv(os.path.join(CACHE_5M_DIR, f"{t}.csv"))
        gran = 5
    if df.empty:
        cnt_skip_intraday += 1
        if i % 50 == 0: print(f"… {i}/{len(tickers)} (intraday missing: {t})")
        continue

    feats = compute_probs_bsJP(df, gran, d1d)
    if feats is None:
        # Too few bars today or price below MIN_PRICE.
        cnt_skip_intraday += 1
        continue

    # split the features into the two output tables
    rec_today = {
        "ticker": t,
        "gran": feats["gran"],
        "price": feats["price"],
        "prob_ara_today": feats["p_today"],
        "prob_ara_today_adj": feats["p_today_adj"],
        "ara_target_pct": feats["ara_target_pct"],
        "day_roc": feats["day_roc"],
        "prox": feats["prox"],
        "coverage": feats["coverage"],
        "rvol30": feats["rvol30"],
        "cr": feats["cr"],
        "breakout": feats["breakout"],
        "mleft": feats["mleft"],
        "regime_score": feats["regime_score"],
        "confidence": feats["confidence"],
        "confidence_band": feats["confidence_band"],
    }
    rows_today.append(rec_today)

    rec_tmr = {
        "ticker": t,
        "gran": feats["gran"],
        "price": feats["price"],
        "prob_ara_tomorrow": feats["p_tmr"],
        "prob_ara_tomorrow_adj": feats["p_tmr_adj"],
        "ara_target_pct": feats["ara_target_pct"],
        "day_roc": feats["day_roc"],
        "prox": feats["prox"],
        "rvol60": feats["rvol60"],
        "cr": feats["cr"],
        "obv_up": feats["obv_up"],
        "virgin_hint": feats["virgin_hint"],
        "regime_score": feats["regime_score"],
        "confidence": feats["confidence"],
        "confidence_band": feats["confidence_band"],
    }
    rows_tmr.append(rec_tmr)

    cnt_ok += 1
    if i % 50 == 0:
        # freshness warning: age of this ticker's newest intraday bar
        # NOTE(review): cnt_skip_daily is printed but never incremented in this cell.
        age_m = (datetime.utcnow() - df.index.max()).total_seconds()/60.0
        fresh = f"{int(age_m)}m{' (stale!)' if age_m>CACHE_STALE_M_1M else ''}"
        print(f"… processed {i}/{len(tickers)} | ok={cnt_ok} | skip_daily={cnt_skip_daily} | skip_intraday={cnt_skip_intraday} | last_age={fresh}")
    time.sleep(0.01)  # small pause per ticker; lower/raise to suit the load

# ranking & save (one file per date tag; re-running on the same day overwrites it)
cols_today = ["ticker","gran","price","prob_ara_today_adj","prob_ara_today","confidence","confidence_band",
              "ara_target_pct","day_roc","prox","coverage","rvol30","cr","breakout","mleft","regime_score"]
cols_tmr   = ["ticker","gran","price","prob_ara_tomorrow_adj","prob_ara_tomorrow","confidence","confidence_band",
              "ara_target_pct","day_roc","prox","rvol60","cr","obv_up","virgin_hint","regime_score"]

# "Today" table: with no rows, keep the expected header; otherwise rank by
# adjusted probability, then raw probability, confidence and price (all descending).
# NOTE: cols_* are only used for the empty case; non-empty frames keep the
# column order of the row dicts.
df_today = pd.DataFrame(rows_today)
if df_today.empty:
    df_today = pd.DataFrame(columns=cols_today)
else:
    for c in ["prob_ara_today_adj","prob_ara_today","price","confidence"]:
        if c not in df_today.columns: df_today[c] = np.nan
    df_today = df_today.sort_values(["prob_ara_today_adj","prob_ara_today","confidence","price"],
                                    ascending=[False, False, False, False]).reset_index(drop=True)

# "Tomorrow" table: same treatment with the tomorrow columns.
df_tmr = pd.DataFrame(rows_tmr)
if df_tmr.empty:
    df_tmr = pd.DataFrame(columns=cols_tmr)
else:
    for c in ["prob_ara_tomorrow_adj","prob_ara_tomorrow","price","confidence"]:
        if c not in df_tmr.columns: df_tmr[c] = np.nan
    df_tmr = df_tmr.sort_values(["prob_ara_tomorrow_adj","prob_ara_tomorrow","confidence","price"],
                                ascending=[False, False, False, False]).reset_index(drop=True)

Path(RESULTS_DIR).mkdir(parents=True, exist_ok=True)
out_today = os.path.join(RESULTS_DIR, f"prob_ara_today_{_now_date()}.csv")
out_tmr   = os.path.join(RESULTS_DIR, f"prob_ara_tomorrow_{_now_date()}.csv")
df_today.to_csv(out_today, index=False)
df_tmr.to_csv(out_tmr, index=False)

print(f"✅ Saved (overwrite) → {out_today} | rows={len(df_today)}")
print(f"✅ Saved (overwrite) → {out_tmr}   | rows={len(df_tmr)}")

# preview the top rows of both tables
display(df_today.head(15))
display(df_tmr.head(15))
# ============================================================================ 


## DIAG + WARMUP

### v.1.1

In [None]:
# ============================================================
# ALL-IN-ONE WARMUP SERVICE (1 CELL, MANDIRI)
# - Melengkapi cache_daily / cache_1m / cache_5m secara incremental
# - Rolling 1m→trim + downsample ke 5m untuk bagian lama
# - Progress bar (tqdm), error report, smoketest opsional
# - Konsisten UTC-naive (anti tz-naive vs tz-aware)
# ============================================================

# --------------- CONFIG ---------------
from pathlib import Path
import os, time, random, socket, warnings
import pandas as pd

# Base folders
BASE_DIR        = Path(".")
EMITEN_DIR      = BASE_DIR / "emiten"
CACHE_DAILY_DIR = EMITEN_DIR / "cache_daily"
CACHE_1M_DIR    = EMITEN_DIR / "cache_1m"
CACHE_5M_DIR    = EMITEN_DIR / "cache_5m"
RESULTS_DIR     = BASE_DIR / "result"

# Roster (optional). When None, the newest "candidates_active_filtered_*.csv" is picked automatically.
ROSTER_PATH     = None

# Retention & freshness rules
DAILY_BOOTSTRAP_Y               = 10     # years of daily history fetched when the cache is empty
CACHE_1M_WINDOW_D               = 7      # keep only the last 7 days of 1m bars
CACHE_5M_WINDOW_D               = 12     # keep 12 days of 5m bars (tune between 12 and 60 as needed)
INTRADAY_1M_FRESH_SLACK_MIN     = 20     # skip the 1m fetch when last_ts is already this close to "now"
INTRADAY_5M_FRESH_SLACK_MIN     = 30     # skip the 5m fetch when last_ts is already this close to "now"

# Keep only the first 30 minutes of session 1 and the last 30 minutes of session 2 (optional)
FILTER_SESSION_WINDOWS          = False  # set True to enable

# yfinance retry/backoff
MAX_RETRIES = 4
BACKOFF_S   = [0.6, 1.5, 3.0, 6.0]       # ~exponential; jitter is added at the call site

# Runner
DO_SMOKETEST  = True
SMOKE_TICKERS = ["BBCA.JK", "ASII.JK"]
PRINT_EVERY   = 100
DESC_LABEL    = "Warmup(all-in-one)"

warnings.filterwarnings("ignore", category=UserWarning)

# --------------- IMPORTS ---------------
try:
    import yfinance as yf
except Exception as e:
    raise RuntimeError("yfinance belum terpasang. Jalankan: pip install yfinance pandas tqdm pytz") from e

# Progress wrapper: use tqdm when it can be imported, otherwise fall back to
# returning the iterable unchanged so the runner works without a progress bar.
try:
    from tqdm import tqdm
    def _iter_progress(it, total, desc): return tqdm(it, total=total, ncols=90, desc=desc)
except Exception:
    def _iter_progress(it, total, desc): return it

# --------------- STATE ---------------
# Module-level bookkeeping, reset by the runner before each warm-up pass.
# ERRORS: failure category -> list of tickers (or "ticker: message" strings for "other").
ERRORS = {"dns": [], "missing": [], "timeout": [], "empty": [], "other": []}
# STATS: counters incremented by the incremental fetchers (fetch vs. skip per cache kind).
STATS  = {
    "tickers": 0,
    "daily_fetch": 0, "daily_skip": 0,
    "m1_fetch": 0, "m1_skip": 0, "m1_fallback_5m": 0,
    "m5_fetch": 0, "m5_skip": 0,
}

# --------------- UTIL WAKTU (PAKSA UTC-NAIVE) ---------------
def _naive(ts):
    ts = pd.Timestamp(ts)
    return ts.tz_convert("UTC").tz_localize(None) if ts.tz is not None else ts

def _now_utc():
    return _naive(pd.Timestamp.utcnow())

def _today_utc():
    """Return midnight of the current UTC date as a tz-naive timestamp."""
    current = _now_utc()
    return current.normalize()

def _now_date_tag():
    try:
        import pytz
        tz = pytz.timezone("Asia/Jakarta")
        return pd.Timestamp.now(tz).strftime("%Y%m%d")
    except Exception:
        return pd.Timestamp.utcnow().strftime("%Y%m%d")

# --------------- FILE IO & SANITIZE ---------------
def _idx_naive(df: pd.DataFrame):
    if df is None or len(df)==0: return df
    out = df.copy()
    if not isinstance(out.index, pd.DatetimeIndex):
        out.index = pd.to_datetime(out.index, errors="coerce", utc=True).tz_convert("UTC").tz_localize(None)
    elif out.index.tz is not None:
        out.index = out.index.tz_convert("UTC").tz_localize(None)
    out = out[~out.index.duplicated(keep="last")].sort_index()
    return out

def _sanitize_ohlcv(df: pd.DataFrame) -> pd.DataFrame:
    """Normalise a price frame to flat OHLCV columns on a UTC-naive index.

    Args:
        df: Raw frame (e.g. from ``yf.download`` or a cache CSV); may be None.

    Returns:
        An empty DataFrame for None/empty input. Otherwise a frame holding the
        known OHLCV columns that are present (all columns when none are), with
        all-NaN rows removed.
    """
    if df is None or len(df) == 0:
        return pd.DataFrame()
    out = _idx_naive(df)

    fields = ("Open", "High", "Low", "Close", "Adj Close", "Volume")

    # yf.download can return MultiIndex columns (price field x ticker). Left
    # as-is, to_csv would write a two-row header that the cache reader parses
    # as a data row. Collapse to the level carrying the field names.
    # NOTE(review): assumes a single-ticker download; with several tickers
    # the flattened labels would repeat.
    if isinstance(out.columns, pd.MultiIndex):
        for level in range(out.columns.nlevels):
            labels = out.columns.get_level_values(level)
            if any(name in fields for name in labels):
                out.columns = labels
                break

    keep = [c for c in fields if c in out.columns]
    if not keep:
        keep = list(out.columns)
    return out[keep].dropna(how="all")

def _read_cache_csv(path: Path) -> pd.DataFrame:
    """Load a cache CSV and return it sanitised; empty frame on any failure.

    The first column is read as a parsed datetime index. If that read fails,
    the file is re-read plainly and a ``Date`` column (when present) becomes
    the index.
    """
    if not path.exists():
        return pd.DataFrame()

    raw = None
    try:
        raw = pd.read_csv(path, parse_dates=[0], index_col=0)
    except Exception:
        raw = None

    if raw is None:
        # Fallback: plain read, then promote "Date" to the index if it exists.
        try:
            raw = pd.read_csv(path)
            if "Date" in raw.columns:
                raw = raw.set_index("Date")
        except Exception:
            return pd.DataFrame()

    return _sanitize_ohlcv(raw)

def _resample_to_5m(df_1m: pd.DataFrame) -> pd.DataFrame:
    """Downsample 1-minute bars to 5-minute OHLCV bars.

    Only columns present in the input are aggregated. Previously the
    aggregation map always named Open/High/Low/Close/Volume, so a frame
    missing any of them raised ``KeyError`` inside ``.agg``.

    Args:
        df_1m: 1-minute bars; an empty frame is returned unchanged.

    Returns:
        Sanitised 5-minute bars (Open=first, High=max, Low=min, Close=last,
        Adj Close=last, Volume=sum), or an empty DataFrame when the input has
        none of these columns.
    """
    if df_1m.empty:
        return df_1m
    df = _idx_naive(df_1m)

    rules = {"Open": "first", "High": "max", "Low": "min", "Close": "last",
             "Adj Close": "last", "Volume": "sum"}
    agg = {col: how for col, how in rules.items() if col in df.columns}
    if not agg:
        return pd.DataFrame()

    # NOTE(review): bins are right-closed/right-labelled; confirm this matches
    # the labelling of the natively fetched 5m bars stored in the same cache.
    df5 = df.resample("5min", label="right", closed="right").agg(agg)

    # Drop bins with no price data at all (e.g. gaps between sessions).
    ohlc_cols = [c for c in ["Open", "High", "Low", "Close"] if c in df5.columns]
    if ohlc_cols:
        df5 = df5.dropna(subset=ohlc_cols, how="all")
    return _sanitize_ohlcv(df5)

def _write_cache_csv(path: Path, df_new: pd.DataFrame, window_days: int|None=None):
    """Merge ``df_new`` into the cache file at ``path`` and apply retention.

    The cache kind is inferred from the path text ("cache_1m", "cache_5m" or
    "cache_daily"):

    * 1m: rows older than the window are downsampled to 5m and merged into the
      matching file under ``CACHE_5M_DIR``; only the recent tail is written
      back to ``path``.
    * 5m: rows older than the window are dropped.
    * daily: everything is kept unless ``window_days`` is given.
    * anything else: the merged frame is written unchanged.

    Args:
        path: Destination CSV; parent folders are created when missing.
        df_new: New rows; on duplicate timestamps they override existing rows.
        window_days: Retention in days; falsy means the per-kind default
            (``CACHE_1M_WINDOW_D`` / ``CACHE_5M_WINDOW_D``; no trim for daily).
    """
    p = Path(path)
    p.parent.mkdir(parents=True, exist_ok=True)
    df_new = _sanitize_ohlcv(df_new)

    # merge old + new (new rows win on duplicate timestamps)
    if p.exists():
        df_old = _read_cache_csv(p)
        df = pd.concat([df_old, df_new]).sort_index()
        df = df[~df.index.duplicated(keep="last")]
    else:
        df = df_new

    now = _now_utc()
    # Normalise separators so the substring checks also work on Windows paths.
    path_lower = str(p).lower().replace("\\","/")
    is_1m   = "cache_1m"   in path_lower
    is_5m   = "cache_5m"   in path_lower
    is_daily= "cache_daily" in path_lower

    # (optional) keep only bars inside the configured session windows
    if FILTER_SESSION_WINDOWS and (is_1m or is_5m) and not df.empty:
        import pytz
        jkt = pytz.timezone("Asia/Jakarta")
        dfl = df.copy()
        # The index is UTC-naive; view it in Jakarta time to test wall-clock windows.
        dfl.index = dfl.index.tz_localize("UTC").tz_convert(jkt)
        mask = (
            ((dfl.index.time >= pd.Timestamp("09:00").time()) & (dfl.index.time < pd.Timestamp("09:30").time())) |
            ((dfl.index.time >= pd.Timestamp("14:30").time()) & (dfl.index.time <= pd.Timestamp("15:00").time()))
        )
        dfl = dfl[mask]
        dfl.index = dfl.index.tz_convert("UTC").tz_localize(None)
        df = dfl

    if is_1m:
        cutoff_1m = now - pd.Timedelta(days=window_days or CACHE_1M_WINDOW_D)
        older = df[df.index < cutoff_1m]
        tail  = df[df.index >= cutoff_1m]
        # downsample the older part to 5m and merge it into the 5m cache
        if not older.empty:
            df5_add = _resample_to_5m(older)
            p5 = CACHE_5M_DIR / p.name
            if p5.exists(): df5_old = _read_cache_csv(p5)
            else: df5_old = pd.DataFrame()
            df5 = pd.concat([df5_old, df5_add]).sort_index()
            df5 = df5[~df5.index.duplicated(keep="last")]
            cutoff_5m = now - pd.Timedelta(days=CACHE_5M_WINDOW_D)
            df5 = df5[df5.index >= cutoff_5m]
            df5.to_csv(p5)
        tail.to_csv(p)  # store only the recent tail
        return

    if is_5m:
        keep_days = window_days or CACHE_5M_WINDOW_D
        cutoff_5m = now - pd.Timedelta(days=keep_days)
        df = df[df.index >= cutoff_5m]
        df.to_csv(p)
        return

    if is_daily:
        # daily normally keeps everything; trim only when window_days is given
        if window_days:
            cutoff_d = now.normalize() - pd.Timedelta(days=window_days)
            df = df[df.index >= cutoff_d]
        df.to_csv(p)
        return

    # default fallback
    df.to_csv(p)

# --------------- YFINANCE WRAPPER (RETRY) ---------------
def yfdl(ticker, **kw):
    """Download bars for ``ticker`` via ``yf.download`` with retry and backoff.

    Args:
        ticker: Symbol passed to ``yf.download``.
        **kw: Forwarded to ``yf.download``. ``start``/``end`` are normalised
            to UTC-naive datetimes; ``timeout`` and ``threads`` are dropped;
            ``progress``, ``auto_adjust``, ``actions`` and ``group_by`` are
            always overridden.

    Returns:
        A sanitised OHLCV frame, or an empty DataFrame when nothing usable was
        obtained. Failures are recorded in the module-level ``ERRORS`` lists.

    Every failed attempt that will be retried now waits for the backoff delay.
    Previously a timeout retried immediately, and a delay was also spent after
    the final attempt.
    """
    # normalise time bounds
    for bound in ("start", "end"):
        if kw.get(bound) is not None:
            kw[bound] = _naive(kw[bound]).to_pydatetime()
    # keep the argument set compatible across yfinance versions
    kw.pop("timeout", None)
    kw.pop("threads", None)
    kw["progress"]    = False
    kw["auto_adjust"] = False
    kw["actions"]     = False
    kw["group_by"]    = "column"

    for attempt in range(MAX_RETRIES):
        try:
            # Best-effort DNS lookup before the request; its failure is ignored
            # because the download below reports real connectivity errors.
            try:
                socket.gethostbyname("query2.finance.yahoo.com")
            except OSError:
                pass
            df = _sanitize_ohlcv(yf.download(ticker, **kw))
            if df is not None and not df.empty:
                return df
        except Exception as e:
            low = str(e).lower()
            if "could not resolve host" in low or "temporary failure in name resolution" in low:
                # No point retrying without DNS; fall through to the "empty" record.
                ERRORS["dns"].append(ticker)
                break
            elif "timed out" in low:
                ERRORS["timeout"].append(ticker)
            elif "no price data found" in low or "possibly delisted" in low:
                ERRORS["missing"].append(ticker)
                return pd.DataFrame()
            else:
                ERRORS["other"].append(f"{ticker}: {e}")
        if attempt < MAX_RETRIES - 1:
            time.sleep(BACKOFF_S[min(attempt, len(BACKOFF_S)-1)] + random.random()*0.5)
    ERRORS["empty"].append(ticker)
    return pd.DataFrame()

# --------------- INCREMENTAL FETCHERS ---------------
def _fresh_enough(last_ts, now_ts, slack_min):
    """Return True when ``last_ts`` is within ``slack_min`` minutes of ``now_ts``.

    Both timestamps are compared as UTC-naive values; any error while
    normalising or comparing them yields False.
    """
    try:
        newest = _naive(last_ts)
        oldest_allowed = _naive(now_ts) - pd.Timedelta(minutes=slack_min)
        return newest >= oldest_allowed
    except Exception:
        return False

def get_daily_incremental(ticker: str):
    """Bring the daily cache of ``ticker`` up to date.

    With an empty cache, ``DAILY_BOOTSTRAP_Y`` years are fetched. Otherwise
    only the span from the day after the last cached bar up to (and
    including) today is requested.

    Returns:
        ``(frame, status)`` where status is one of ``"fetch(bootstrap)"``,
        ``"fetch(incremental)"``, ``"skip(fresh)"`` or ``"skip(empty)"``.
        ``STATS["daily_fetch"]`` / ``STATS["daily_skip"]`` are incremented.
    """
    path = CACHE_DAILY_DIR / f"{ticker}.csv"
    base = _read_cache_csv(path)
    today = _today_utc()

    if base.empty:
        fresh = yfdl(ticker, period=f"{DAILY_BOOTSTRAP_Y}y", interval="1d")
        if fresh.empty:
            STATS["daily_skip"] += 1
            return base, "skip(empty)"
        _write_cache_csv(path, fresh, window_days=None)
        STATS["daily_fetch"] += 1
        return _read_cache_csv(path), "fetch(bootstrap)"

    last_date = _naive(base.index.max()).normalize()
    start = _naive(last_date + pd.Timedelta(days=1))
    end = _naive(today + pd.Timedelta(days=1))
    # ">=" rather than ">": when the cache already holds today's bar, start
    # equals end and the request window is empty. Fetching it would burn all
    # retries and record a misleading "empty" error.
    if start >= end:
        STATS["daily_skip"] += 1
        return base, "skip(fresh)"

    fresh = yfdl(ticker, start=start, end=end, interval="1d")
    if fresh.empty:
        STATS["daily_skip"] += 1
        return base, "skip(empty)"
    _write_cache_csv(path, fresh, window_days=None)
    STATS["daily_fetch"] += 1
    return _read_cache_csv(path), "fetch(incremental)"

def get_1m_incremental(ticker: str):
    """Bring the 1-minute cache of ``ticker`` up to date.

    With an empty cache the last ``CACHE_1M_WINDOW_D`` days are fetched; if
    that yields nothing, the 5-minute fetcher is used instead. With existing
    data, only bars after the last cached timestamp (bounded by the retention
    horizon) are requested, unless the cache is already fresh.

    Returns:
        ``(frame, status)`` with status ``"fetch(bootstrap)"``,
        ``"fetch(incremental)"``, ``"skip(fresh)"``, ``"skip(empty)"`` or
        ``"fallback(5m)"``. The matching ``STATS`` counter is incremented.
    """
    path = CACHE_1M_DIR / f"{ticker}.csv"
    cached = _read_cache_csv(path)
    now = _now_utc().floor("min")
    horizon = _naive(now - pd.Timedelta(days=CACHE_1M_WINDOW_D))

    def _skip(reason):
        # Re-write the unchanged cache so the retention trim still runs.
        _write_cache_csv(path, cached, window_days=CACHE_1M_WINDOW_D)
        STATS["m1_skip"] += 1
        return cached, reason

    def _store(fetched, reason):
        _write_cache_csv(path, fetched, window_days=CACHE_1M_WINDOW_D)
        STATS["m1_fetch"] += 1
        return _read_cache_csv(path), reason

    if cached.empty:
        fetched = yfdl(ticker, period=f"{CACHE_1M_WINDOW_D}d", interval="1m")
        if not fetched.empty:
            return _store(fetched, "fetch(bootstrap)")
        df5, _ = get_5m_incremental(ticker)
        STATS["m1_fallback_5m"] += 1
        return df5, "fallback(5m)"

    last_ts = _naive(cached.index.max())
    if _fresh_enough(last_ts, now, INTRADAY_1M_FRESH_SLACK_MIN):
        return _skip("skip(fresh)")

    start = max(last_ts + pd.Timedelta(minutes=1), horizon)
    fetched = yfdl(ticker, start=_naive(start), end=_naive(now), interval="1m")
    if fetched.empty:
        return _skip("skip(empty)")
    return _store(fetched, "fetch(incremental)")

def get_5m_incremental(ticker: str):
    """Bring the 5-minute cache of ``ticker`` up to date.

    With an empty cache the last ``CACHE_5M_WINDOW_D`` days are fetched. With
    existing data, only bars after the last cached timestamp (bounded by the
    retention horizon) are requested, unless the cache is already fresh.

    Returns:
        ``(frame, status)`` with status ``"fetch(bootstrap)"``,
        ``"fetch(incremental)"``, ``"skip(fresh)"`` or ``"skip(empty)"``.
        ``STATS["m5_fetch"]`` / ``STATS["m5_skip"]`` are incremented.
    """
    path = CACHE_5M_DIR / f"{ticker}.csv"
    cached = _read_cache_csv(path)
    now = _now_utc().floor("min")
    horizon = _naive(now - pd.Timedelta(days=CACHE_5M_WINDOW_D))

    def _store(fetched, reason):
        _write_cache_csv(path, fetched, window_days=CACHE_5M_WINDOW_D)
        STATS["m5_fetch"] += 1
        return _read_cache_csv(path), reason

    if cached.empty:
        fetched = yfdl(ticker, period=f"{CACHE_5M_WINDOW_D}d", interval="5m")
        if fetched.empty:
            STATS["m5_skip"] += 1
            return cached, "skip(empty)"
        return _store(fetched, "fetch(bootstrap)")

    def _skip(reason):
        # Re-write the unchanged cache so the retention trim still runs.
        _write_cache_csv(path, cached, window_days=CACHE_5M_WINDOW_D)
        STATS["m5_skip"] += 1
        return cached, reason

    last_ts = _naive(cached.index.max())
    if _fresh_enough(last_ts, now, INTRADAY_5M_FRESH_SLACK_MIN):
        return _skip("skip(fresh)")

    start = max(last_ts + pd.Timedelta(minutes=5), horizon)
    fetched = yfdl(ticker, start=_naive(start), end=_naive(now), interval="5m")
    if fetched.empty:
        return _skip("skip(empty)")
    return _store(fetched, "fetch(incremental)")

# --------------- TICKER SOURCE ---------------
def _detect_roster_path():
    """Return the lexicographically last roster CSV in ``EMITEN_DIR`` as a string.

    Returns None when no ``candidates_active_filtered_*.csv`` file exists.
    """
    matches = sorted(EMITEN_DIR.glob("candidates_active_filtered_*.csv"))
    if not matches:
        return None
    return str(matches[-1])

def _infer_tickers():
    """Return the list of tickers to warm up.

    Source order:
      1. The roster CSV (``ROSTER_PATH`` or the auto-detected latest file):
         the first column named ticker/symbol/kode/emiten (case-insensitive),
         else the first column.
      2. Fallback: the sorted union of CSV file stems in the cache folders.

    Roster values are stripped; NaN and blank cells are dropped and duplicates
    removed (first occurrence kept). Previously a blank cell became the
    literal ticker ``"nan"`` and repeated symbols were fetched repeatedly.
    """
    def _clean(values):
        # Drop NaN before the string conversion so it never becomes "nan".
        cleaned = values.dropna().astype(str).str.strip()
        return cleaned[cleaned != ""].drop_duplicates().tolist()

    roster = ROSTER_PATH or _detect_roster_path()
    if roster and Path(roster).exists():
        df = pd.read_csv(roster)
        for c in df.columns:
            if str(c).lower() in ("ticker", "symbol", "kode", "emiten"):
                return _clean(df[c])
        return _clean(df.iloc[:, 0])

    # fallback: union of file names found in the cache folders
    stems = set()
    for d in [CACHE_DAILY_DIR, CACHE_1M_DIR, CACHE_5M_DIR]:
        if Path(d).exists():
            stems.update(f.stem for f in Path(d).glob("*.csv"))
    return sorted(stems)

# --------------- PREP DIRS ---------------
# Create the cache and result folders if they do not exist yet.
for d in [CACHE_DAILY_DIR, CACHE_1M_DIR, CACHE_5M_DIR, RESULTS_DIR]:
    d.mkdir(parents=True, exist_ok=True)

# --------------- RUNNER + PROGRESS BAR ---------------
# Reset the shared error lists and counters so a re-run of this cell starts clean.
for k in ("dns","missing","timeout","empty","other"): ERRORS[k].clear()
for k in STATS: STATS[k] = 0

tickers = _infer_tickers()
STATS["tickers"] = len(tickers)
print(f"Warmup start • tickers={len(tickers)} • 1m_win={CACHE_1M_WINDOW_D}d • 5m_win={CACHE_5M_WINDOW_D}d • daily_bootstrap={DAILY_BOOTSTRAP_Y}y")
t0 = time.time()

# Sequentially refresh daily, 1m and 5m caches per ticker; a failure on one
# ticker is recorded under ERRORS["other"] and does not stop the loop.
for i, t in enumerate(_iter_progress(tickers, total=len(tickers), desc=DESC_LABEL), 1):
    try:
        _ = get_daily_incremental(t)
        _ = get_1m_incremental(t)
        _ = get_5m_incremental(t)
        if PRINT_EVERY and (i % PRINT_EVERY == 0):
            print(f"[{i}/{len(tickers)}] {t}")
    except Exception as e:
        ERRORS["other"].append(f"{t}: {e}")

# --------------- SAVE ERRORS ---------------
# Flatten ERRORS into (kind, detail) rows and save them, de-duplicated, when any exist.
rows = []
for kind, vals in ERRORS.items():
    for v in vals:
        rows.append({"kind": kind, "detail": v})
if rows:
    out = RESULTS_DIR / f"errors_warmup_{_now_date_tag()}.csv"
    pd.DataFrame(rows).drop_duplicates().to_csv(out, index=False)
    print(f"\n[ERRORS] Saved → {out}")

# --------------- SUMMARY ---------------
# Error counts below are raw list lengths (duplicates included).
elapsed = time.time() - t0
print("\n===== SUMMARY =====")
print(f"Tickers         : {STATS['tickers']}")
print(f"Daily   fetch/skip : {STATS['daily_fetch']}/{STATS['daily_skip']}")
print(f"1m      fetch/skip : {STATS['m1_fetch']}/{STATS['m1_skip']}  | fallback→5m: {STATS['m1_fallback_5m']}")
print(f"5m      fetch/skip : {STATS['m5_fetch']}/{STATS['m5_skip']}")
print(f"Errors  dns/missing/timeout/empty/other : "
      f"{len(ERRORS['dns'])}/{len(ERRORS['missing'])}/{len(ERRORS['timeout'])}/{len(ERRORS['empty'])}/{len(ERRORS['other'])}")
print(f"Elapsed         : {elapsed:.2f}s")

# --------------- OPTIONAL SMOKETEST ---------------
# Optional smoke test: re-run the three fetchers for a few known tickers and
# show each status plus the last rows. These calls also update STATS/ERRORS.
if DO_SMOKETEST:
    print("\n=== SMOKETEST ===")
    for _t in SMOKE_TICKERS:
        try:
            print(f"\n>>> {_t}")
            def _show(name, tup):
                # Accept either a (frame, status) pair or a bare frame.
                if isinstance(tup, tuple) and len(tup)==2: df, st = tup
                else: df, st = tup, "(unknown)"
                print(f"{name:>5s}: {st}")
                try:
                    if isinstance(df, pd.DataFrame) and not df.empty:
                        display(df.tail(3))
                except Exception: pass
            _show("DAILY", get_daily_incremental(_t))
            _show("1m",    get_1m_incremental(_t))
            _show("5m",    get_5m_incremental(_t))
        except Exception as e:
            print(f"[SMOKE ERROR] {_t}: {e}")


### v.1.2

In [None]:
# ===========================================
# PARALLEL RUNNER (anti-timeout) — drop-in
# v 1.2

# ===========================================
import time, concurrent.futures as cf
from pathlib import Path
import pandas as pd

# --- TUNEABLES ---
MAX_WORKERS        = 8          # 6-10 is safe; tune to your bandwidth/IP
WORKER_TIMEOUT_S   = 15         # seconds to wait for each ticker task's result before recording a timeout
YF_PAUSE_JITTER_S  = (0.05, 0.25)   # (min, max) seconds of random pause between fetch steps in a worker

# --- helper kecil untuk jitter ---
import random
def _sleep_jitter(a, b):
    time.sleep(random.uniform(a, b))

# --- task per ticker (dipanggil di thread worker) ---
def _warmup_one_ticker(ticker: str):
    """Refresh the daily, 1m and 5m caches for one ticker (runs in a worker thread).

    A short random pause separates the three fetch steps.

    Returns:
        ``(ticker, daily_status, m1_status, m5_status, None)`` on success, or
        ``(ticker, None, None, None, error_message)`` when any step raised, so
        one failing ticker never breaks the whole run.
    """
    def _status_of(outcome):
        # Fetchers return (frame, status); anything else is reported as "(ok)".
        if isinstance(outcome, tuple) and len(outcome) == 2:
            return str(outcome[1])
        return "(ok)"

    try:
        daily = get_daily_incremental(ticker)
        _sleep_jitter(*YF_PAUSE_JITTER_S)

        one_min = get_1m_incremental(ticker)
        _sleep_jitter(*YF_PAUSE_JITTER_S)

        five_min = get_5m_incremental(ticker)

        return (ticker, _status_of(daily), _status_of(one_min), _status_of(five_min), None)
    except Exception as exc:
        return (ticker, None, None, None, str(exc))

# --- ambil daftar tickers dari fungsi yang sudah ada ---
# Reuse the ticker source defined by the warm-up service cell.
# NOTE: STATS and ERRORS are not reset here, so the summary accumulates on top
# of any earlier run in the same session.
tickers = _infer_tickers()
STATS["tickers"] = len(tickers)
print(f"Parallel warmup • tickers={len(tickers)} • workers={MAX_WORKERS} • timeout={WORKER_TIMEOUT_S}s")

t0 = time.time()
from tqdm import tqdm
pbar = tqdm(total=len(tickers), ncols=90, desc="Warmup(parallel)")

futures = []
results = []
errors_local = []

with cf.ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
    # submit semua ticker
    for tkr in tickers:
        futures.append(ex.submit(_warmup_one_ticker, tkr))

    # ambil hasil satu per satu dengan hard-timeout per future
    for fut in futures:
        try:
            tkr, dstat, m1stat, m5stat, err = fut.result(timeout=WORKER_TIMEOUT_S)
            if err:
                # catat sebagai "timeout/error" umum — tidak menghambat yang lain
                ERRORS["other"].append(f"{tkr}: {err}")
                errors_local.append((tkr, err))
            results.append((tkr, dstat, m1stat, m5stat))
        except cf.TimeoutError:
            # kalau thread macet > WORKER_TIMEOUT_S, kita tandai & lanjut
            ERRORS["timeout"].append("ticker_task")
            errors_local.append((tkr, f"worker-timeout>{WORKER_TIMEOUT_S}s"))
        finally:
            pbar.update(1)

pbar.close()

# --- simpan error report (opsional) ---
RESULTS_DIR.mkdir(parents=True, exist_ok=True)
tag = _now_date_tag()

if errors_local:
    df_err = pd.DataFrame(errors_local, columns=["Ticker", "Error"])
    df_err.drop_duplicates().to_csv(RESULTS_DIR / f"errors_warmup_parallel_{tag}.csv", index=False)
    print(f"\n[ERRORS] Saved → result/errors_warmup_parallel_{tag}.csv")

# --- summary akhir ---
elapsed = time.time() - t0
print("\n===== SUMMARY (parallel) =====")
print(f"Tickers         : {STATS['tickers']}")
print(f"Daily   fetch/skip : {STATS.get('daily_fetch',0)}/{STATS.get('daily_skip',0)}")
print(f"1m      fetch/skip : {STATS.get('m1_fetch',0)}/{STATS.get('m1_skip',0)}  | fallback→5m: {STATS.get('m1_fallback_5m',0)}")
print(f"5m      fetch/skip : {STATS.get('m5_fetch',0)}/{STATS.get('m5_skip',0)}")
print(f"Errors  dns/missing/timeout/empty/other : "
      f"{len(ERRORS['dns'])}/{len(ERRORS['missing'])}/{len(ERRORS['timeout'])}/{len(ERRORS['empty'])}/{len(ERRORS['other'])}")
print(f"Elapsed         : {elapsed:.2f}s")


### v.1.3

In [None]:
# ==== PATCH TZ-SAFE (drop-in untuk v1.3.1) ====
import pandas as pd

def _ensure_aware_utc(ts=None):
    """
    Kembalikan Timestamp yang PASTI tz-aware UTC.
    - Jika input tz-naive -> tz_localize('UTC')
    - Jika input tz-aware -> tz_convert('UTC')
    - Jika None -> pakai sekarang (UTC)
    """
    t = pd.Timestamp.utcnow() if ts is None else pd.Timestamp(ts)
    if t.tz is None:
        return t.tz_localize("UTC")
    return t.tz_convert("UTC")

def _is_jkt_market_time(ts_utc=None):
    """
    Tell whether a moment falls inside the hard-coded Jakarta trading sessions.

    Args:
        ts_utc: anything `_ensure_aware_utc` accepts; None means "now".

    Returns:
        True on Monday-Friday when the Asia/Jakarta wall-clock time satisfies
        09:00 <= t < 11:30 or 13:30 <= t <= 15:00; False otherwise.
    """
    import pytz

    local = _ensure_aware_utc(ts_utc).tz_convert(pytz.timezone("Asia/Jakarta"))
    if local.weekday() > 4:   # 5 = Saturday, 6 = Sunday
        return False
    clock = local.time()
    # Regular exchange sessions.
    # NOTE(review): session hours are hard-coded and holidays are not checked — confirm against the current exchange calendar.
    in_morning = pd.Timestamp("09:00").time() <= clock < pd.Timestamp("11:30").time()
    in_afternoon = pd.Timestamp("13:30").time() <= clock <= pd.Timestamp("15:00").time()
    return in_morning or in_afternoon

def _is_jkt_session(ts):
    """
    Return True when `ts` lies within a weekday trading session in Jakarta time.

    `ts` is first normalised by `_ensure_aware_utc`, then viewed in the
    Asia/Jakarta zone. Sessions checked: 09:00 <= t < 11:30 and
    13:30 <= t <= 15:00; Saturdays and Sundays always yield False.
    """
    import pytz

    zone = pytz.timezone("Asia/Jakarta")
    local_ts = _ensure_aware_utc(ts).tz_convert(zone)
    weekend = local_ts.weekday() >= 5
    if weekend:
        return False
    wall = local_ts.time()
    first_session = pd.Timestamp("09:00").time() <= wall < pd.Timestamp("11:30").time()
    second_session = pd.Timestamp("13:30").time() <= wall <= pd.Timestamp("15:00").time()
    return first_session or second_session


In [None]:
# ============================================================
# v1.3.3 — ALL-IN-ONE
# Warmup (parallel, tz-safe, retry, batch) + Robust Sanity + Toggles
# ============================================================

# ---------- CONFIG ----------
from pathlib import Path
import os, time, random, socket, warnings, math, traceback
import pandas as pd
import numpy as np

# Toggles
DO_WARMUP        = True   # set False to skip the warmup and go straight to the sanity check
SANITY_ENABLE    = True   # set False to run the warmup only

# Paths
BASE_DIR        = Path(".")
EMITEN_DIR      = BASE_DIR / "emiten"
CACHE_DAILY_DIR = EMITEN_DIR / "cache_daily"
CACHE_1M_DIR    = EMITEN_DIR / "cache_1m"
CACHE_5M_DIR    = EMITEN_DIR / "cache_5m"
RESULTS_DIR     = BASE_DIR / "result"
ROSTER_PATH     = None  # e.g. "emiten/candidates_active_filtered_20250813.csv" (optional)

# Retention / freshness
DAILY_BOOTSTRAP_Y               = 10   # years of daily history requested when no daily cache exists
CACHE_1M_WINDOW_D               = 7    # days kept in the 1m cache (older rows are rolled up to 5m)
CACHE_5M_WINDOW_D               = 12   # days kept in the 5m cache
INTRADAY_1M_FRESH_SLACK_MIN     = 0    # minutes of staleness tolerated before the 1m cache is refetched
INTRADAY_5M_FRESH_SLACK_MIN     = 0    # minutes of staleness tolerated before the 5m cache is refetched
FILTER_SESSION_WINDOWS          = False  # True: keep only 09:00–09:30 & 14:30–15:00 WIB

# yfinance retry/backoff
MAX_RETRIES   = 4                      # download attempts per request
BACKOFF_S     = [0.6, 1.5, 3.0, 6.0]   # base sleep (seconds) after each failed attempt

# Parallel runner
MAX_WORKERS              = 6       # thread-pool size
WORKER_TIMEOUT_S         = 22      # seconds to wait on each ticker's future
SUBMIT_BATCH             = 128     # tickers submitted per executor batch
MAX_RETRY_ROUNDS         = 2       # extra rounds for tickers that failed
YF_PAUSE_JITTER_S        = (0.08, 0.30)  # (min, max) seconds of random pause between fetch steps
OFF_HOURS_SKIP_INTRADAY  = False   # True: fetch daily only when outside Jakarta market time
DESC_LABEL               = "Warmup(parallel v1.3.3)"

# Sanity
SANITY_SAMPLE_SIZE       = 10      # number of cached tickers sampled for the report
CHECK_BEI_HOURS          = True    # count intraday rows outside the exchange sessions
TOL_PCT_PRICE            = 0.001  # 0.1%
TOL_VOL_DIFF             = 0.02   # 2%

# Silence UserWarning-category warnings for the whole process.
warnings.filterwarnings("ignore", category=UserWarning)

# ---------- DEPENDENCIES ----------
# yfinance is mandatory: fail fast with an installation hint if it cannot be imported.
try:
    import yfinance as yf
except Exception as e:
    raise RuntimeError("Install dulu: pip install yfinance pandas requests tqdm pytz") from e

# tqdm is optional: _iter_progress wraps an iterable in a progress bar when
# tqdm is importable, and otherwise hands the iterable back unchanged.
try:
    from tqdm import tqdm
    def _iter_progress(it, total, desc): return tqdm(it, total=total, ncols=90, desc=desc)
except Exception:
    def _iter_progress(it, total, desc): return it

# ---------- GLOBAL STATE ----------
# Process-wide collectors, appended to / incremented by the fetch helpers.
# ERRORS maps an error category to the list of affected entries.
ERRORS = {"dns": [], "missing": [], "timeout": [], "empty": [], "other": []}
# STATS counts fetches and skips per timeframe, plus 1m -> 5m fallbacks.
STATS  = {
    "tickers": 0,
    "daily_fetch": 0, "daily_skip": 0,
    "m1_fetch": 0, "m1_skip": 0, "m1_fallback_5m": 0,
    "m5_fetch": 0, "m5_skip": 0,
}

# Make sure every cache/result directory exists before anything is written.
for d in [CACHE_DAILY_DIR, CACHE_1M_DIR, CACHE_5M_DIR, RESULTS_DIR]:
    d.mkdir(parents=True, exist_ok=True)

# ---------- TIME HELPERS (tz-safe) ----------
def _ensure_aware_utc(ts=None):
    """Return UTC-aware Timestamp regardless of input being naive/aware/None."""
    t = pd.Timestamp.utcnow() if ts is None else pd.Timestamp(ts)
    if t.tz is None:
        return t.tz_localize("UTC")
    return t.tz_convert("UTC")

def _naive(ts):
    t = pd.Timestamp(ts)
    if t.tz is None:
        return t
    return t.tz_convert("UTC").tz_localize(None)

def _now_utc():   return _naive(pd.Timestamp.utcnow())
def _today_utc(): return _now_utc().normalize()

def _now_date_tag():
    try:
        import pytz
        tz = pytz.timezone("Asia/Jakarta")
        return pd.Timestamp.now(tz).strftime("%Y%m%d")
    except Exception:
        return pd.Timestamp.utcnow().strftime("%Y%m%d")

def _now_tag_full():
    try:
        import pytz
        tz = pytz.timezone("Asia/Jakarta")
        return pd.Timestamp.now(tz).strftime("%Y%m%d_%H%M%S")
    except Exception:
        return pd.Timestamp.utcnow().strftime("%Y%m%d_%H%M%S")

# ---------- IO & SANITIZE ----------
def _idx_naive(df: pd.DataFrame):
    if df is None or len(df)==0: return df
    out = df.copy()
    if not isinstance(out.index, pd.DatetimeIndex):
        out.index = pd.to_datetime(out.index, errors="coerce", utc=True).tz_convert("UTC").tz_localize(None)
    elif out.index.tz is not None:
        out.index = out.index.tz_convert("UTC").tz_localize(None)
    return out[~out.index.duplicated(keep="last")].sort_index()

def _sanitize_ohlcv(df: pd.DataFrame) -> pd.DataFrame:
    if df is None or len(df)==0: return pd.DataFrame()
    out = _idx_naive(df)
    keep = [c for c in ["Open","High","Low","Close","Adj Close","Volume"] if c in out.columns]
    if not keep: keep = list(out.columns)
    out = out[keep].dropna(how="all")
    return out

def _read_cache_csv(path: Path) -> pd.DataFrame:
    if not path.exists(): return pd.DataFrame()
    try:
        df = pd.read_csv(path, parse_dates=[0], index_col=0)
    except Exception:
        try:
            df = pd.read_csv(path)
            if "Date" in df.columns: df = df.set_index("Date")
        except Exception:
            return pd.DataFrame()
    return _sanitize_ohlcv(df)

def _resample_to_5m(df_1m: pd.DataFrame) -> pd.DataFrame:
    if df_1m.empty: return df_1m
    df = _idx_naive(df_1m)
    agg = {"Open":"first","High":"max","Low":"min","Close":"last","Volume":"sum"}
    if "Adj Close" in df.columns: agg["Adj Close"] = "last"
    out = df.resample("5min", label="right", closed="right").agg(agg)
    ohlc = [c for c in ["Open","High","Low","Close"] if c in out.columns]
    out = out.dropna(subset=ohlc, how="all")
    return _sanitize_ohlcv(out)

def _write_cache_csv(path: Path, df_new: pd.DataFrame, window_days: int|None=None):
    """Merge `df_new` into the CSV cache at `path`, apply retention, and write it.

    The cache kind is inferred from the path: it must contain "cache_1m",
    "cache_5m" or "cache_daily"; any other path is written without trimming.

    - 1m: rows older than the window (`window_days` or CACHE_1M_WINDOW_D) are
      rolled up to 5-minute bars and merged into the matching file under
      CACHE_5M_DIR (itself trimmed to CACHE_5M_WINDOW_D); only the recent rows
      stay in the 1m file.
    - 5m: rows older than `window_days` or CACHE_5M_WINDOW_D are dropped.
    - daily: trimmed only when `window_days` is given.

    Args:
        path: destination CSV.
        df_new: rows to merge; on duplicate timestamps they override cached rows.
        window_days: retention in days; None/0 selects the per-kind default.
    """
    p = Path(path); p.parent.mkdir(parents=True, exist_ok=True)
    df_new = _sanitize_ohlcv(df_new)
    if p.exists():
        df_old = _read_cache_csv(p)
        # De-duplicate BEFORE sorting: after concat the new rows come last, so
        # keep="last" reliably prefers them. sort_index() uses a non-stable
        # sort by default, so sorting first could reorder equal timestamps.
        df = pd.concat([df_old, df_new])
        df = df[~df.index.duplicated(keep="last")].sort_index()
    else:
        df = df_new

    now = _now_utc()
    path_lower = str(p).lower().replace("\\","/")
    is_1m   = "cache_1m" in path_lower
    is_5m   = "cache_5m" in path_lower
    is_daily= "cache_daily" in path_lower

    # Optional session-window filter: keep only 09:00-09:30 and 14:30-15:00
    # Jakarta time for intraday caches.
    if FILTER_SESSION_WINDOWS and (is_1m or is_5m) and not df.empty:
        import pytz
        jkt = pytz.timezone("Asia/Jakarta")
        dfl = df.copy()
        # The cached index is naive UTC; view it in Jakarta time for the mask.
        dfl.index = dfl.index.tz_localize("UTC").tz_convert(jkt)
        mask = (
            ((dfl.index.time >= pd.Timestamp("09:00").time()) & (dfl.index.time < pd.Timestamp("09:30").time())) |
            ((dfl.index.time >= pd.Timestamp("14:30").time()) & (dfl.index.time <= pd.Timestamp("15:00").time()))
        )
        dfl = dfl[mask]
        dfl.index = dfl.index.tz_convert("UTC").tz_localize(None)
        df = dfl

    if is_1m:
        cutoff_1m = now - pd.Timedelta(days=window_days or CACHE_1M_WINDOW_D)
        older = df[df.index < cutoff_1m]
        tail  = df[df.index >= cutoff_1m]
        if not older.empty:
            # Roll expired 1m rows up into the 5m cache of the same ticker.
            df5_add = _resample_to_5m(older)
            p5 = CACHE_5M_DIR / p.name
            if p5.exists(): df5_old = _read_cache_csv(p5)
            else: df5_old = pd.DataFrame()
            # Same de-duplicate-then-sort order as above: rolled-up rows win.
            df5 = pd.concat([df5_old, df5_add])
            df5 = df5[~df5.index.duplicated(keep="last")].sort_index()
            cutoff_5m = now - pd.Timedelta(days=CACHE_5M_WINDOW_D)
            df5 = df5[df5.index >= cutoff_5m]
            df5.to_csv(p5)
        tail.to_csv(p); return

    if is_5m:
        keep_days = window_days or CACHE_5M_WINDOW_D
        cutoff_5m = now - pd.Timedelta(days=keep_days)
        df = df[df.index >= cutoff_5m]
        df.to_csv(p); return

    if is_daily:
        if window_days:
            cutoff_d = now.normalize() - pd.Timedelta(days=window_days)
            df = df[df.index >= cutoff_d]
        df.to_csv(p); return

    # Unknown cache kind: write the merged frame untrimmed.
    df.to_csv(p)

# ---------- yfinance via requests.Session (no curl) ----------
# NOTE(review): presumably tells yfinance not to use its curl backend — confirm
# that the installed yfinance version actually reads this variable.
os.environ["YF_USE_CURL"] = "0"
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# One shared HTTP session, handed to every yf.download call via yfdl().
_YF_SESSION = requests.Session()
# Transport-level retry policy: up to 5 retries on connect/read errors and on
# the listed HTTP status codes; allowed_methods=False retries on any HTTP verb.
retries = Retry(
    total=5, connect=5, read=5,
    backoff_factor=0.6,
    status_forcelist=(429,500,502,503,504),
    allowed_methods=False
)
adapter = HTTPAdapter(max_retries=retries, pool_connections=20, pool_maxsize=20)
_YF_SESSION.mount("https://", adapter)
_YF_SESSION.mount("http://", adapter)
_YF_SESSION.headers.update({"User-Agent": "Mozilla/5.0 (warmup-service)"})
# Reload yfinance after the environment variable above has been set.
# NOTE(review): some yfinance releases reject a plain requests.Session — verify against the pinned version.
import importlib; importlib.reload(yf)

def yfdl(ticker, **kw):
    """Download price data for one ticker through yfinance with retries.

    Args:
        ticker: symbol passed to `yf.download`.
        **kw: forwarded to `yf.download`. `start`/`end` are converted to naive
            datetimes; `threads` is dropped; `progress`, `auto_adjust`,
            `actions`, `group_by` and `session` are always overridden;
            `timeout` defaults to 20.

    Returns:
        A sanitised, non-empty DataFrame on success, otherwise an empty
        DataFrame. Failures are recorded in the global ERRORS collector
        ("dns", "timeout", "missing", "other"; "empty" when all attempts are
        used up). This function does not raise for download errors.
    """
    if kw.get("start") is not None:
        kw["start"] = _naive(kw["start"]).to_pydatetime()
    if kw.get("end") is not None:
        kw["end"] = _naive(kw["end"]).to_pydatetime()
    kw.pop("threads", None)
    kw["progress"]    = False
    kw["auto_adjust"] = False
    kw["actions"]     = False
    kw["group_by"]    = "column"
    kw["session"]     = _YF_SESSION
    kw.setdefault("timeout", 20)

    for attempt in range(MAX_RETRIES):
        try:
            # Best-effort DNS warm-up; a failure here is not fatal by itself.
            try:
                socket.gethostbyname("query2.finance.yahoo.com")
            except OSError:
                pass
            df = _sanitize_ohlcv(yf.download(ticker, **kw))
            if df is not None and not df.empty:
                return df
        except Exception as e:
            low = str(e).lower()
            if "could not resolve host" in low or "temporary failure in name resolution" in low:
                # DNS is down: retrying is pointless. Return right away so the
                # failure is counted as "dns" only, not also as "empty".
                ERRORS["dns"].append(ticker)
                return pd.DataFrame()
            elif "timed out" in low:
                ERRORS["timeout"].append(ticker)
            elif "no price data found" in low or "possibly delisted" in low:
                ERRORS["missing"].append(ticker)
                return pd.DataFrame()
            else:
                ERRORS["other"].append(f"{ticker}: {e}")
        # Back off before the next attempt only; sleeping after the final
        # attempt would just delay the caller.
        if attempt < MAX_RETRIES - 1:
            time.sleep(BACKOFF_S[min(attempt, len(BACKOFF_S)-1)] + random.random()*0.8)
    ERRORS["empty"].append(ticker)
    return pd.DataFrame()

# ---------- FRESH CHECK ----------
def _fresh_enough(last_ts, now_ts, slack_min):
    """Return True when `last_ts` is no older than `now_ts` minus `slack_min` minutes.

    Both timestamps are compared as tz-naive values. Any error while
    converting or comparing counts as "not fresh" (False).
    """
    try:
        oldest_allowed = _naive(now_ts) - pd.Timedelta(minutes=slack_min)
        return _naive(last_ts) >= oldest_allowed
    except Exception:
        return False

# ---------- INCREMENTAL FETCHERS ----------
def get_daily_incremental(ticker: str):
    """Bring the daily cache of `ticker` up to date.

    With no cache, DAILY_BOOTSTRAP_Y years of history are downloaded;
    otherwise only the days after the last cached date are requested.

    Returns:
        (frame, status) where status is one of "fetch(bootstrap)",
        "fetch(incremental)", "skip(fresh)" or "skip(empty)". STATS
        "daily_fetch"/"daily_skip" is incremented accordingly.
    """
    path = CACHE_DAILY_DIR / f"{ticker}.csv"
    base = _read_cache_csv(path)
    today = _today_utc()
    if base.empty:
        fresh = yfdl(ticker, period=f"{DAILY_BOOTSTRAP_Y}y", interval="1d")
        if fresh.empty:
            STATS["daily_skip"] += 1
            return base, "skip(empty)"
        _write_cache_csv(path, fresh, window_days=None)
        STATS["daily_fetch"] += 1
        return _read_cache_csv(path), "fetch(bootstrap)"

    last_date = _naive(base.index.max()).normalize()
    start, end = _naive(last_date + pd.Timedelta(days=1)), _naive(today + pd.Timedelta(days=1))
    # `>=`, not `>`: when the cache already holds today's date, start == end
    # and the request window is empty. Sending it would only burn every retry
    # in yfdl and log a spurious "empty" error.
    if start >= end:
        STATS["daily_skip"] += 1
        return base, "skip(fresh)"
    fresh = yfdl(ticker, start=start, end=end, interval="1d")
    if fresh.empty:
        STATS["daily_skip"] += 1
        return base, "skip(empty)"
    _write_cache_csv(path, fresh, window_days=None)
    STATS["daily_fetch"] += 1
    return _read_cache_csv(path), "fetch(incremental)"

def get_1m_incremental(ticker: str):
    """Bring the 1-minute cache of `ticker` up to date.

    With no cache, CACHE_1M_WINDOW_D days of 1m bars are downloaded; if that
    yields nothing, the 5m fetcher is used as a fallback. Otherwise only the
    bars after the last cached timestamp (bounded by the retention horizon)
    are requested. On every skip the cache is rewritten so retention/rollup
    still runs.

    Returns:
        (frame, status) where status is one of "fetch(bootstrap)",
        "fallback(5m)", "skip(fresh)", "skip(empty)" or "fetch(incremental)".
        STATS "m1_fetch"/"m1_skip"/"m1_fallback_5m" is incremented accordingly.
    """
    path = CACHE_1M_DIR / f"{ticker}.csv"
    base = _read_cache_csv(path)
    now  = _now_utc().floor("min")
    hzn  = _naive(now - pd.Timedelta(days=CACHE_1M_WINDOW_D))
    if base.empty:
        fresh = yfdl(ticker, period=f"{CACHE_1M_WINDOW_D}d", interval="1m")
        if fresh.empty:
            df5, _ = get_5m_incremental(ticker)
            STATS["m1_fallback_5m"] += 1
            return df5, "fallback(5m)"
        _write_cache_csv(path, fresh, window_days=CACHE_1M_WINDOW_D)
        STATS["m1_fetch"] += 1
        return _read_cache_csv(path), "fetch(bootstrap)"

    last_ts = _naive(base.index.max())
    start = max(last_ts + pd.Timedelta(minutes=1), hzn)
    # Also skip when the next bar would start at/after `now`: the request
    # window [start, now) would be empty and would only burn yfdl's retries.
    if _fresh_enough(last_ts, now, INTRADAY_1M_FRESH_SLACK_MIN) or start >= now:
        _write_cache_csv(path, base, window_days=CACHE_1M_WINDOW_D)
        STATS["m1_skip"] += 1
        return base, "skip(fresh)"
    fresh = yfdl(ticker, start=_naive(start), end=_naive(now), interval="1m")
    if fresh.empty:
        _write_cache_csv(path, base, window_days=CACHE_1M_WINDOW_D)
        STATS["m1_skip"] += 1
        return base, "skip(empty)"
    _write_cache_csv(path, fresh, window_days=CACHE_1M_WINDOW_D)
    STATS["m1_fetch"] += 1
    return _read_cache_csv(path), "fetch(incremental)"

def get_5m_incremental(ticker: str):
    """Bring the 5-minute cache of `ticker` up to date.

    With no cache, CACHE_5M_WINDOW_D days of 5m bars are downloaded; otherwise
    only the bars after the last cached timestamp (bounded by the retention
    horizon) are requested. On every skip of an existing cache the file is
    rewritten so retention still runs.

    Returns:
        (frame, status) where status is one of "fetch(bootstrap)",
        "skip(fresh)", "skip(empty)" or "fetch(incremental)". STATS
        "m5_fetch"/"m5_skip" is incremented accordingly.
    """
    path = CACHE_5M_DIR / f"{ticker}.csv"
    base = _read_cache_csv(path)
    now  = _now_utc().floor("min")
    hzn  = _naive(now - pd.Timedelta(days=CACHE_5M_WINDOW_D))
    if base.empty:
        fresh = yfdl(ticker, period=f"{CACHE_5M_WINDOW_D}d", interval="5m")
        if fresh.empty:
            STATS["m5_skip"] += 1
            return base, "skip(empty)"
        _write_cache_csv(path, fresh, window_days=CACHE_5M_WINDOW_D)
        STATS["m5_fetch"] += 1
        return _read_cache_csv(path), "fetch(bootstrap)"

    last_ts = _naive(base.index.max())
    start = max(last_ts + pd.Timedelta(minutes=5), hzn)
    # Also skip when the next bar would start at/after `now`: the request
    # window [start, now) would be empty and would only burn yfdl's retries.
    if _fresh_enough(last_ts, now, INTRADAY_5M_FRESH_SLACK_MIN) or start >= now:
        _write_cache_csv(path, base, window_days=CACHE_5M_WINDOW_D)
        STATS["m5_skip"] += 1
        return base, "skip(fresh)"
    fresh = yfdl(ticker, start=_naive(start), end=_naive(now), interval="5m")
    if fresh.empty:
        _write_cache_csv(path, base, window_days=CACHE_5M_WINDOW_D)
        STATS["m5_skip"] += 1
        return base, "skip(empty)"
    _write_cache_csv(path, fresh, window_days=CACHE_5M_WINDOW_D)
    STATS["m5_fetch"] += 1
    return _read_cache_csv(path), "fetch(incremental)"

# ---------- TICKER SOURCE ----------
def _detect_roster_path():
    """Return the path (as str) of the last `candidates_active_filtered_*.csv`
    in EMITEN_DIR when sorted by name, or None if there is none."""
    matches = sorted(EMITEN_DIR.glob("candidates_active_filtered_*.csv"))
    if not matches:
        return None
    return str(matches[-1])

def _infer_tickers():
    """Work out the list of tickers to process.

    A roster CSV (ROSTER_PATH, else the auto-detected one) wins when it
    exists: the first column named ticker/symbol/kode/emiten
    (case-insensitive) is used, falling back to the first column. Without a
    roster, the sorted file stems of every CSV in the cache directories are
    returned.
    """
    roster = ROSTER_PATH or _detect_roster_path()
    if roster and Path(roster).exists():
        table = pd.read_csv(roster)
        wanted = ("ticker", "symbol", "kode", "emiten")
        # Lazy scan: stops at the first matching column name.
        hit = next((col for col in table.columns if col.lower() in wanted), None)
        column = table[hit] if hit is not None else table.iloc[:, 0]
        return column.astype(str).str.strip().tolist()

    stems = set()
    for cache_dir in (CACHE_DAILY_DIR, CACHE_1M_DIR, CACHE_5M_DIR):
        folder = Path(cache_dir)
        if folder.exists():
            stems.update(csv_file.stem for csv_file in folder.glob("*.csv"))
    return sorted(stems)

# ---------- MARKET-TIME GUARD (TZ-SAFE) ----------
def _is_jkt_market_time(ts_utc=None):
    """Return True when the given moment (default: now) is inside a weekday
    trading session in Asia/Jakarta time.

    Sessions checked: 09:00 <= t < 11:30 and 13:30 <= t <= 15:00. The input
    is normalised through `_ensure_aware_utc`, so naive/aware/None are all
    accepted.
    """
    import pytz

    jakarta = pytz.timezone("Asia/Jakarta")
    moment = _ensure_aware_utc(ts_utc).tz_convert(jakarta)
    if moment.weekday() > 4:   # Saturday or Sunday
        return False
    now_clock = moment.time()
    session_1 = pd.Timestamp("09:00").time() <= now_clock < pd.Timestamp("11:30").time()
    session_2 = pd.Timestamp("13:30").time() <= now_clock <= pd.Timestamp("15:00").time()
    return session_1 or session_2

def _prewarm_dns():
    """Resolve the Yahoo Finance API hosts once, best-effort.

    Lookup failures are ignored on purpose. Only OSError (which covers
    socket.gaierror/herror) is swallowed, so KeyboardInterrupt and
    programming errors are no longer hidden by a bare `except`.
    """
    for host in ("query1.finance.yahoo.com", "query2.finance.yahoo.com"):
        try:
            socket.gethostbyname(host)
        except OSError:
            pass

def _sleep_jitter(a,b): time.sleep(random.uniform(a,b))
def _stat_label(x): return (str(x[1]) if isinstance(x, tuple) and len(x)==2 else "(ok)")

# ---------- PARALLEL RUNNER v1.3.3 ----------
import concurrent.futures as cf
# Optional progress bar: _tqdm wraps an iterable with tqdm when it is
# importable and otherwise returns the iterable unchanged.
try:
    from tqdm import tqdm
    def _tqdm(x, **k): return tqdm(x, **k)
except Exception:
    def _tqdm(x, **k): return x

def _warmup_one_ticker(ticker: str, do_intraday: bool):
    """Refresh the caches of one ticker; intended to run in a worker thread.

    Args:
        ticker: symbol to refresh.
        do_intraday: when False only the daily cache is refreshed and the
            intraday statuses are reported as "(skipped-offhours)".

    Returns:
        (ticker, daily_stat, m1_stat, m5_stat, None) on success, or
        (ticker, None, None, None, error_message) if anything raised.
    """
    try:
        daily_stat = _stat_label(get_daily_incremental(ticker))
        _sleep_jitter(*YF_PAUSE_JITTER_S)
        if do_intraday:
            m1_stat = _stat_label(get_1m_incremental(ticker))
            _sleep_jitter(*YF_PAUSE_JITTER_S)
            m5_stat = _stat_label(get_5m_incremental(ticker))
        else:
            # Report the skip marker directly: passing this plain string
            # through _stat_label would turn it into "(ok)".
            m1_stat = m5_stat = "(skipped-offhours)"
        return (ticker, daily_stat, m1_stat, m5_stat, None)
    except Exception as e:
        return (ticker, None, None, None, str(e))

def run_parallel_warmup_v133(tickers):
    """Warm up the caches of `tickers` in parallel, with retry rounds.

    Each round submits the remaining tickers in batches of SUBMIT_BATCH to a
    thread pool and waits up to WORKER_TIMEOUT_S on every future. Tickers
    that returned an error or timed out are retried in the next round (up to
    MAX_RETRY_ROUNDS extra rounds; only symbols ending in ".JK" are retried).
    A summary CSV and, if needed, an errors CSV are written to RESULTS_DIR
    and a textual summary is printed.

    Args:
        tickers: iterable of ticker symbols.

    Returns:
        (all_rows, tag): the per-ticker/per-round result dicts and the
        timestamp tag used in the output file names.
    """
    RESULTS_DIR.mkdir(parents=True, exist_ok=True)
    all_rows, all_errs = [], []
    rounds = MAX_RETRY_ROUNDS + 1
    remain = list(tickers)
    t0 = time.time()

    for r in range(rounds):
        if not remain: break
        do_intraday = (not OFF_HOURS_SKIP_INTRADAY) or _is_jkt_market_time()
        label = f"{DESC_LABEL} R{r}{' (intraday)' if do_intraday else ' (daily-only)'}"
        print(f"\nRound {r+1}/{rounds} • tickers={len(remain)} • workers={MAX_WORKERS} • timeout={WORKER_TIMEOUT_S}s • {('INTRA ON' if do_intraday else 'INTRA OFF')}")
        _prewarm_dns()

        failed_this = []
        pbar = tqdm(total=len(remain), ncols=90, desc=label)

        for lo in range(0, len(remain), SUBMIT_BATCH):
            batch = remain[lo: lo + SUBMIT_BATCH]
            with cf.ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
                futs = [ex.submit(_warmup_one_ticker, tkr, do_intraday) for tkr in batch]
                # Pair every future with the ticker it was submitted for, so
                # a timeout can be attributed (and retried) correctly.
                for submitted, fut in zip(batch, futs):
                    try:
                        tkr, dstat, m1stat, m5stat, err = fut.result(timeout=WORKER_TIMEOUT_S)
                        all_rows.append({"round": r, "ticker": tkr, "daily": dstat, "m1": m1stat, "m5": m5stat, "error": err or ""})
                        if err:
                            ERRORS["other"].append(f"{tkr}: {err}")
                            all_errs.append((tkr, err, r))
                            failed_this.append(tkr)
                    except cf.TimeoutError:
                        msg = f"worker-timeout>{WORKER_TIMEOUT_S}s"
                        ERRORS["timeout"].append("ticker_task")
                        # Previously logged as "?" and never retried.
                        all_rows.append({"round": r, "ticker": submitted, "daily": "", "m1": "", "m5": "", "error": msg})
                        all_errs.append((submitted, msg, r))
                        failed_this.append(submitted)
                    finally:
                        pbar.update(1)
            # Leaving the `with` block waits for every worker of the batch,
            # including ones that exceeded the timeout above.
            _sleep_jitter(0.3, 0.8)
        pbar.close()
        remain = sorted(set(t for t in failed_this if t and t.endswith(".JK")))

    tag = _now_tag_full()
    if all_rows:
        pd.DataFrame(all_rows).to_csv(RESULTS_DIR / f"summary_parallel_{tag}.csv", index=False)
        print(f"[SUMMARY CSV] result/summary_parallel_{tag}.csv")
    if all_errs:
        pd.DataFrame(all_errs, columns=["Ticker","Error","Round"]).drop_duplicates().to_csv(RESULTS_DIR / f"errors_parallel_{tag}.csv", index=False)
        print(f"[ERRORS CSV]  result/errors_parallel_{tag}.csv")

    elapsed = time.time() - t0
    print("\n===== SUMMARY (parallel v1.3.3) =====")
    print(f"Tickers total   : {STATS.get('tickers', len(tickers))}")
    print(f"Daily   fetch/skip : {STATS.get('daily_fetch',0)}/{STATS.get('daily_skip',0)}")
    print(f"1m      fetch/skip : {STATS.get('m1_fetch',0)}/{STATS.get('m1_skip',0)}  | fallback→5m: {STATS.get('m1_fallback_5m',0)}")
    print(f"5m      fetch/skip : {STATS.get('m5_fetch',0)}/{STATS.get('m5_skip',0)}")
    print(f"Errors  dns/missing/timeout/empty/other : "
          f"{len(ERRORS['dns'])}/{len(ERRORS['missing'])}/{len(ERRORS['timeout'])}/{len(ERRORS['empty'])}/{len(ERRORS['other'])}")
    print(f"Elapsed         : {elapsed:.2f}s")
    return all_rows, tag

# ---------- ROBUST SANITY ----------
def _safe_display(df):
    try:
        display(df)
    except Exception:
        print(df.head(20).to_string())

def _cols_ok(df):
    if df is None or df.empty: return []
    known = ["Open","High","Low","Close","Adj Close","Volume"]
    return [c for c in known if c in df.columns]

def _age_days_safe(df):
    if df is None or df.empty: return None
    try:
        last = df.index.max()
        if pd.isna(last): return None
        return int((pd.Timestamp.utcnow().normalize() - pd.Timestamp(last).normalize()).days)
    except Exception:
        return None

def _is_jkt_session(ts):
    """Return True when `ts` falls inside a weekday trading session in Jakarta time.

    `ts` goes through `_ensure_aware_utc` first and is then viewed in the
    Asia/Jakarta zone. Sessions checked: 09:00 <= t < 11:30 and
    13:30 <= t <= 15:00; weekends always give False.
    """
    import pytz

    local_ts = _ensure_aware_utc(ts).tz_convert(pytz.timezone("Asia/Jakarta"))
    if local_ts.weekday() > 4:   # Saturday or Sunday
        return False
    wall = local_ts.time()
    morning = pd.Timestamp("09:00").time() <= wall < pd.Timestamp("11:30").time()
    afternoon = pd.Timestamp("13:30").time() <= wall <= pd.Timestamp("15:00").time()
    return morning or afternoon

def _resample_1m_to_5m_safe(df_1m: pd.DataFrame):
    if df_1m is None or df_1m.empty: return pd.DataFrame()
    df = df_1m.copy()
    agg = {}
    if "Open" in df.columns:  agg["Open"]  = "first"
    if "High" in df.columns:  agg["High"]  = "max"
    if "Low" in df.columns:   agg["Low"]   = "min"
    if "Close" in df.columns: agg["Close"] = "last"
    if "Volume" in df.columns:agg["Volume"]= "sum"
    if "Adj Close" in df.columns: agg["Adj Close"] = "last"
    if not agg:
        return pd.DataFrame()
    out = df.resample("5min", label="right", closed="right").agg(agg)
    price_cols = [c for c in ["Open","High","Low","Close"] if c in out.columns]
    if price_cols:
        out = out.dropna(subset=price_cols, how="all")
    return _idx_naive(out)

def _sanity_frame_stats(df, prefix, intraday):
    """Return the structural statistics of one cache frame, keyed ``{prefix}_*``."""
    has_rows = not df.empty
    # Fraction of NaN per column; reported as the mean over columns, in percent.
    na_frac = {c: float(df[c].isna().mean()) for c in df.columns} if has_rows else {}
    stats = {
        f"{prefix}_rows": len(df),
        f"{prefix}_last": str(df.index.max()) if has_rows else "",
        f"{prefix}_cols": ",".join(_cols_ok(df)),
        f"{prefix}_monotonic": bool(df.index.is_monotonic_increasing) if has_rows else True,
        f"{prefix}_dups": int(df.index.duplicated().sum()) if has_rows else 0,
        f"{prefix}_vol_zero": int((df["Volume"] == 0).sum()) if (has_rows and "Volume" in df.columns) else 0,
        f"{prefix}_na_%": round(sum(na_frac.values()) / max(1, len(na_frac)) * 100, 3) if na_frac else None,
    }
    if intraday:
        # Number of rows whose timestamp lies outside the exchange sessions.
        stats[f"{prefix}_outside_session"] = (
            int((~df.index.to_series().map(_is_jkt_session)).sum())
            if (CHECK_BEI_HOURS and has_rows) else 0
        )
    return stats


def _sanity_consistency(df1, df5, tol_price, tol_vol):
    """Compare the last day's 1m bars, rolled up to 5m, against the cached 5m bars.

    Returns (label, price_mismatch, vol_mismatch). The label is "" when either
    frame is empty, else "OK", "MISMATCH", "NO_OVERLAP" or "CHECK_ERROR: ...";
    the two flags stay None unless overlapping bars were actually compared.
    """
    res_ok, mism_price, mism_vol = "", None, None
    if df1.empty or df5.empty:
        return res_ok, mism_price, mism_vol
    try:
        last_day = pd.Timestamp(df1.index.max()).normalize()
        d1 = df1.loc[df1.index.normalize() == last_day]
        r5 = _resample_1m_to_5m_safe(d1)
        d5 = df5.loc[df5.index.normalize() == last_day]
        idx = r5.index.intersection(d5.index)
        if idx.empty:
            res_ok = "NO_OVERLAP"
        else:
            r5i, d5i = r5.loc[idx], d5.loc[idx]
            mism_price = False
            for col in [c for c in ["Open", "High", "Low", "Close"] if c in r5i.columns and c in d5i.columns]:
                # Relative difference; zero reference prices become NaN and are ignored.
                base = d5i[col].replace(0, np.nan).astype(float)
                diff = (r5i[col].astype(float) - d5i[col].astype(float)).abs() / base
                if diff.dropna().gt(tol_price).any():
                    mism_price = True
                    break
            mism_vol = False
            if "Volume" in r5i.columns and "Volume" in d5i.columns:
                basev = d5i["Volume"].replace(0, np.nan).astype(float)
                diffv = (r5i["Volume"].astype(float) - d5i["Volume"].astype(float)).abs() / basev
                if diffv.dropna().gt(tol_vol).any():
                    mism_vol = True
            res_ok = "OK" if (not mism_price and not mism_vol) else "MISMATCH"
    except Exception as e:
        res_ok = f"CHECK_ERROR: {e}"
    return res_ok, mism_price, mism_vol


def run_sanity_random_robust(sample_size=SANITY_SAMPLE_SIZE, tol_price=TOL_PCT_PRICE, tol_vol=TOL_VOL_DIFF):
    """Build, save and display a sanity report for a random sample of cached tickers.

    Args:
        sample_size: maximum number of tickers to inspect (all if fewer exist).
        tol_price: relative tolerance for the 1m->5m OHLC comparison.
        tol_vol: relative tolerance for the 1m->5m volume comparison.

    Returns:
        The report DataFrame (one row per sampled ticker); an empty DataFrame
        when no cache files exist. The report is also written to
        ``RESULTS_DIR / sanity_report_<tag>.csv``.
    """
    # Collect tickers from whatever cache files exist.
    found = set()
    for d in [CACHE_DAILY_DIR, CACHE_1M_DIR, CACHE_5M_DIR]:
        if Path(d).exists():
            found.update(f.stem for f in Path(d).glob("*.csv"))
    tickers = sorted(found)
    if not tickers:
        print("⚠️ Tidak ada file di cache_* untuk sanity.")
        return pd.DataFrame()

    sample = tickers if len(tickers) <= sample_size else random.sample(tickers, sample_size)

    rows = []
    for tkr in sample:
        dfD = _read_cache_csv(CACHE_DAILY_DIR / f"{tkr}.csv")
        df1 = _read_cache_csv(CACHE_1M_DIR / f"{tkr}.csv")
        df5 = _read_cache_csv(CACHE_5M_DIR / f"{tkr}.csv")

        row = {"ticker": tkr, "daily_age_days": _age_days_safe(dfD)}
        row.update(_sanity_frame_stats(dfD, "daily", intraday=False))
        row.update(_sanity_frame_stats(df1, "m1", intraday=True))
        row.update(_sanity_frame_stats(df5, "m5", intraday=True))

        # 1m -> 5m consistency (last day only)
        res_ok, mism_price, mism_vol = _sanity_consistency(df1, df5, tol_price, tol_vol)
        row.update({"consistency_1m_to_5m": res_ok, "price_mismatch": mism_price, "vol_mismatch": mism_vol})
        rows.append(row)

    df_report = pd.DataFrame(rows)
    order = [
        "ticker",
        "daily_rows","daily_last","daily_age_days","daily_cols","daily_monotonic","daily_dups","daily_vol_zero","daily_na_%",
        "m1_rows","m1_last","m1_cols","m1_monotonic","m1_dups","m1_outside_session","m1_vol_zero","m1_na_%",
        "m5_rows","m5_last","m5_cols","m5_monotonic","m5_dups","m5_outside_session","m5_vol_zero","m5_na_%",
        "consistency_1m_to_5m","price_mismatch","vol_mismatch"
    ]
    df_report = df_report.reindex(columns=[c for c in order if c in df_report.columns])
    out = RESULTS_DIR / f"sanity_report_{_now_tag_full()}.csv"
    df_report.to_csv(out, index=False)
    _safe_display(df_report)
    print(f"\n✅ Sanity report saved → {out}")
    return df_report

# ---------- WRAPPER EKSEKUSI DENGAN LOGGING ----------
def run_all_with_logging():
    """Run the warmup and the sanity check, each guarded by its own error log.

    Either step can be switched off with DO_WARMUP / SANITY_ENABLE. An
    exception in a step does not propagate: its traceback is written to a
    timestamped log file in RESULTS_DIR and the next step still runs.
    """
    # 1) Warmup (optional)
    if DO_WARMUP:
        try:
            tks = _infer_tickers()
            STATS["tickers"] = len(tks)
            print(f"Parallel warmup v1.3.3 • candidates={len(tks)}")
            run_parallel_warmup_v133(tks)
        except Exception:
            logp = RESULTS_DIR / f"error_runtime_warmup_{_now_tag_full()}.log"
            # Explicit UTF-8: tracebacks may contain non-ASCII text, which can
            # fail to encode with the platform default encoding.
            with open(logp, "w", encoding="utf-8") as f:
                f.write("WARMUP ERROR\n")
                traceback.print_exc(file=f)
            print(f"⚠️ Warmup error tertangkap. Log → {logp}")
    else:
        print("⚠️ Warmup SKIPPED (DO_WARMUP=False).")

    # 2) Sanity (optional)
    if SANITY_ENABLE:
        try:
            # Show the configured sample size instead of a hard-coded "10".
            print(f"\n=== SANITY (random {SANITY_SAMPLE_SIZE}) ===")
            run_sanity_random_robust(sample_size=SANITY_SAMPLE_SIZE)
        except Exception:
            logp = RESULTS_DIR / f"error_runtime_sanity_{_now_tag_full()}.log"
            with open(logp, "w", encoding="utf-8") as f:
                f.write("SANITY ERROR\n")
                traceback.print_exc(file=f)
            print(f"⚠️ Sanity error tertangkap. Log → {logp}")
    else:
        print("Sanity check dimatikan (SANITY_ENABLE=False).")

# --- RUN ONCE ---
# Executes warmup + sanity immediately when this cell/script is run.
run_all_with_logging()


### v.1.4

In [None]:
# ============================================================
# v1.4 — ALWAYS-FRESH QUICK PATH + 15m FALLBACK
# Warmup (parallel, tz-safe, retry, batch) + Robust Sanity + Toggles
# ============================================================
"""
MANUAL (v1.4)
=================
Goal: keep the cache always fresh while staying cheap to run:
1) Quick Daily Scan (global):
   - Before the main loop, only the last row of the daily CSV is checked for ALL tickers.
   - If the last row already reaches "today" (Jakarta time zone), the daily fetch for that ticker is skipped.
   - Only tickers whose daily data is behind are fetched incrementally. (Saves API calls & I/O)

2) Always-Fresh Intraday + Early-Exit:
   - For each ticker, before calling yfinance for intraday data, the last timestamp in the 1m/5m/15m caches is read.
   - If all three are still within the (configurable) "freshness slack", exit early (skip the fetch).

3) Intraday fallback chain: 1m → 5m → 15m
   - get_1m_incremental already falls back to 5m automatically when 1m is empty.
   - If 5m is also empty/failed, v1.4 tries 15m as the last fallback.
   - The 5m→15m fallback count is tracked in STATS["m5_fallback_15m"].

4) Retention & Rollup:
   - 1m cache data older than the 1m WINDOW is automatically rolled up into 5m (keeps files small).
   - The 5m cache is limited to the 5m WINDOW. (As in v1.3.3, still in effect.)

Usage
-----
- Make sure the dependencies are installed: `pip install yfinance pandas requests tqdm pytz`
- Deploy this file as a daily script (cron) on your server.
- Run it directly: `python warmup_sanity_v1.4.py`
- The important settings are in the CONFIG block:
    * ALWAYS_FRESH_MODE / QUICK_DAILY_CHECK / EARLY_EXIT_IF_INTRADAY_FRESH
    * INTRADAY_*_FRESH_SLACK_MIN (1m/5m/15m)
    * CACHE_*_WINDOW_D (retention)
    * OFF_HOURS_SKIP_INTRADAY (skip intraday outside Jakarta exchange hours)

Notes
-----
- v1.4 keeps every v1.3.3 feature (parallel runner, retry/backoff, 1m→5m rollup, robust sanity),
  while adding the quick path & the 15m fallback. The core v1.3.3 function structure is still followed for compatibility.
- For the v1.3.3 (baseline) implementation context, see the source comments at the end of the file.
"""

# ---------- CONFIG ----------
from pathlib import Path
import os, time, random, socket, warnings, math, traceback
import pandas as pd
import numpy as np

# Toggles
DO_WARMUP        = True    # set False to skip the warmup and go straight to the sanity check
SANITY_ENABLE    = True    # set False to run the warmup only

# Always-fresh toggles (new in v1.4)
ALWAYS_FRESH_MODE              = True
QUICK_DAILY_CHECK              = True   # scan every ticker up front: fetch daily data only for stale ones
EARLY_EXIT_IF_INTRADAY_FRESH   = False   # if the 1m/5m/15m caches are all still fresh → skip yfinance

# Paths
BASE_DIR        = Path(".")
EMITEN_DIR      = BASE_DIR / "emiten"
CACHE_DAILY_DIR = EMITEN_DIR / "cache_daily"
CACHE_1M_DIR    = EMITEN_DIR / "cache_1m"
CACHE_5M_DIR    = EMITEN_DIR / "cache_5m"
CACHE_15M_DIR   = EMITEN_DIR / "cache_15m"   # new in v1.4
RESULTS_DIR     = BASE_DIR / "result"
ROSTER_PATH     = None  # e.g. "emiten/candidates_active_filtered_20250813.csv" (optional)

# Retention / freshness
DAILY_BOOTSTRAP_Y               = 10   # years of daily history requested when a daily cache is empty
CACHE_1M_WINDOW_D               = 7    # days of 1m bars kept; older rows are rolled up into the 5m cache
CACHE_5M_WINDOW_D               = 12   # days of 5m bars kept (the 15m cache reuses this window)
INTRADAY_1M_FRESH_SLACK_MIN     = 0
INTRADAY_5M_FRESH_SLACK_MIN     = 0
INTRADAY_15M_FRESH_SLACK_MIN    = 0    # new in v1.4
FILTER_SESSION_WINDOWS          = False  # True: keep only 09:00–09:30 & 14:30–15:00 WIB

# yfinance retry/backoff
MAX_RETRIES   = 4
BACKOFF_S     = [0.6, 1.5, 3.0, 6.0]   # base sleep (seconds) after attempt 0, 1, 2, 3

# Parallel runner
MAX_WORKERS              = 6
WORKER_TIMEOUT_S         = 22
SUBMIT_BATCH             = 128
MAX_RETRY_ROUNDS         = 2
YF_PAUSE_JITTER_S        = (0.08, 0.30)   # (min, max) random pause between yfinance calls
OFF_HOURS_SKIP_INTRADAY  = False
DESC_LABEL               = "Warmup(parallel v1.4)"

# Sanity
SANITY_SAMPLE_SIZE       = 10
CHECK_BEI_HOURS          = True
TOL_PCT_PRICE            = 0.001  # 0.1%
TOL_VOL_DIFF             = 0.02   # 2%

# Silence UserWarning only; other warning categories are still shown.
warnings.filterwarnings("ignore", category=UserWarning)

# ---------- DEPENDENCIES ----------
# yfinance is mandatory: fail fast with an install hint if it cannot be imported.
try:
    import yfinance as yf
except Exception as e:
    raise RuntimeError("Install dulu: pip install yfinance pandas requests tqdm pytz") from e

# tqdm is optional: when it is missing, _iter_progress degrades to returning
# the iterable unchanged (no progress bar).
try:
    from tqdm import tqdm
    def _iter_progress(it, total, desc): return tqdm(it, total=total, ncols=90, desc=desc)
except Exception:
    def _iter_progress(it, total, desc): return it

# ---------- GLOBAL STATE ----------
# Module-wide error buckets; each list collects ticker symbols (or
# "ticker: message" strings for "other") appended by the fetch helpers.
ERRORS = {"dns": [], "missing": [], "timeout": [], "empty": [], "other": []}
# Module-wide counters incremented by the incremental fetchers and the runner.
STATS  = {
    "tickers": 0,
    "daily_fetch": 0, "daily_skip": 0,
    "m1_fetch": 0, "m1_skip": 0, "m1_fallback_5m": 0,
    "m5_fetch": 0, "m5_skip": 0, "m5_fallback_15m": 0,  # new in v1.4
    "m15_fetch": 0, "m15_skip": 0,
}

# Create the cache and result directories at import time if they are missing.
for d in [CACHE_DAILY_DIR, CACHE_1M_DIR, CACHE_5M_DIR, CACHE_15M_DIR, RESULTS_DIR]:
    d.mkdir(parents=True, exist_ok=True)

# ---------- TIME HELPERS (tz-safe) ----------
def _ensure_aware_utc(ts=None):
    """Return UTC-aware Timestamp regardless of input being naive/aware/None."""
    t = pd.Timestamp.utcnow() if ts is None else pd.Timestamp(ts)
    if t.tz is None:
        return t.tz_localize("UTC")
    return t.tz_convert("UTC")

def _naive(ts):
    t = pd.Timestamp(ts)
    if t.tz is None:
        return t
    return t.tz_convert("UTC").tz_localize(None)


def _now_utc():   return _naive(pd.Timestamp.utcnow())
def _today_utc():
    """Return midnight (00:00) of the current UTC day as a naive Timestamp."""
    current = _now_utc()
    return current.normalize()


def _now_date_tag():
    try:
        import pytz
        tz = pytz.timezone("Asia/Jakarta")
        return pd.Timestamp.now(tz).strftime("%Y%m%d")
    except Exception:
        return pd.Timestamp.utcnow().strftime("%Y%m%d")


def _now_tag_full():
    try:
        import pytz
        tz = pytz.timezone("Asia/Jakarta")
        return pd.Timestamp.now(tz).strftime("%Y%m%d_%H%M%S")
    except Exception:
        return pd.Timestamp.utcnow().strftime("%Y%m%d_%H%M%S")

# ---------- IO & SANITIZE ----------
def _idx_naive(df: pd.DataFrame):
    if df is None or len(df)==0: return df
    out = df.copy()
    if not isinstance(out.index, pd.DatetimeIndex):
        out.index = pd.to_datetime(out.index, errors="coerce", utc=True).tz_convert("UTC").tz_localize(None)
    elif out.index.tz is not None:
        out.index = out.index.tz_convert("UTC").tz_localize(None)
    return out[~out.index.duplicated(keep="last")].sort_index()


def _sanitize_ohlcv(df: pd.DataFrame) -> pd.DataFrame:
    """Normalise a price frame to flat OHLCV columns on a naive-UTC index.

    Args:
        df: Raw frame (e.g. from ``yf.download`` or a cache CSV). ``None``
            or an empty frame yields an empty DataFrame.

    Returns:
        A frame restricted to the known columns that are present (Open,
        High, Low, Close, Adj Close, Volume, in that order), or all columns
        when none of them is present, with all-NaN rows removed.
    """
    if df is None or len(df) == 0:
        return pd.DataFrame()
    known = ["Open", "High", "Low", "Close", "Adj Close", "Volume"]
    out = _idx_naive(df)
    if isinstance(out.columns, pd.MultiIndex):
        # yf.download can return (field, ticker) MultiIndex columns even for
        # one ticker. Left as-is they are written to the cache CSV as a
        # multi-row header that no longer reads back as a flat OHLCV table,
        # so collapse to the level that carries the field names.
        for level in range(out.columns.nlevels):
            labels = out.columns.get_level_values(level)
            if set(known).intersection(labels):
                out.columns = list(labels)
                out = out.loc[:, ~out.columns.duplicated()]
                break
    keep = [c for c in known if c in out.columns]
    if not keep:
        keep = list(out.columns)
    out = out[keep].dropna(how="all")
    return out


def _read_cache_csv(path: Path) -> pd.DataFrame:
    """Load a cache CSV and return it sanitised.

    Args:
        path: Location of the cache file.

    Returns:
        The sanitised frame, or an empty DataFrame when the file is missing
        or cannot be read at all.
    """
    if not path.exists():
        return pd.DataFrame()
    try:
        df = pd.read_csv(path, parse_dates=[0], index_col=0)
    except Exception:
        # Fallback: plain read, then pick the timestamp column by hand.
        try:
            df = pd.read_csv(path)
        except Exception:
            return pd.DataFrame()
        # Intraday caches label the index "Datetime", daily ones "Date".
        # Without promoting one of them the RangeIndex would later be parsed
        # as epoch timestamps; default to the first column, mirroring the
        # primary read's index_col=0.
        ts_col = next((c for c in ("Date", "Datetime") if c in df.columns), None)
        if ts_col is None and len(df.columns) > 0:
            ts_col = df.columns[0]
        if ts_col is not None:
            df = df.set_index(ts_col)
    return _sanitize_ohlcv(df)


def _resample_to_5m(df_1m: pd.DataFrame) -> pd.DataFrame:
    """Aggregate 1-minute bars into right-labelled, right-closed 5-minute bars.

    Args:
        df_1m: 1-minute OHLCV frame. ``None`` yields an empty DataFrame; an
            empty frame is returned unchanged.

    Returns:
        The sanitised 5-minute frame. Only columns present in the input are
        aggregated, so a cache lacking e.g. ``Volume`` no longer raises.
    """
    if df_1m is None:
        return pd.DataFrame()
    if df_1m.empty:
        return df_1m
    df = _idx_naive(df_1m)
    rules = {
        "Open": "first", "High": "max", "Low": "min",
        "Close": "last", "Volume": "sum", "Adj Close": "last",
    }
    # Naming a missing column in .agg() raises, so filter to what exists.
    agg = {col: how for col, how in rules.items() if col in df.columns}
    if not agg:
        return pd.DataFrame()
    out = df.resample("5min", label="right", closed="right").agg(agg)
    ohlc = [c for c in ["Open", "High", "Low", "Close"] if c in out.columns]
    if ohlc:
        # Bins with no trades have NaN prices across the board; drop them.
        out = out.dropna(subset=ohlc, how="all")
    return _sanitize_ohlcv(out)


def _write_cache_csv(path: Path, df_new: pd.DataFrame, window_days: int|None=None):
    """Merge ``df_new`` into the cache CSV at ``path`` and apply retention.

    The cache kind is inferred from the path text ("cache_1m", "cache_5m",
    "cache_15m", "cache_daily") and decides the retention rule:

    * 1m: rows older than the window are resampled to 5m and merged into the
      same-named file under CACHE_5M_DIR (itself trimmed to
      CACHE_5M_WINDOW_D); only the recent rows stay in the 1m file.
    * 5m / 15m: rows older than the window are dropped (both default to
      CACHE_5M_WINDOW_D).
    * daily: trimmed only when ``window_days`` is truthy.
    * anything else: written without trimming.

    Args:
        path: Target CSV; parent directories are created if needed.
        df_new: New rows; sanitised before merging. On duplicate timestamps
            the new row wins.
        window_days: Retention in days. A falsy value (None or 0) falls back
            to the per-kind default.

    Returns:
        None. The result is written to disk.
    """
    p = Path(path); p.parent.mkdir(parents=True, exist_ok=True)
    df_new = _sanitize_ohlcv(df_new)
    if p.exists():
        # Merge with what is already on disk; keep="last" lets new rows
        # override old ones carrying the same timestamp.
        df_old = _read_cache_csv(p)
        df = pd.concat([df_old, df_new]).sort_index()
        df = df[~df.index.duplicated(keep="last")]
    else:
        df = df_new

    now = _now_utc()
    # Normalise separators so the substring checks work for Windows paths too.
    path_lower = str(p).lower().replace("\\","/")
    is_1m   = "cache_1m" in path_lower
    is_5m   = "cache_5m" in path_lower
    is_15m  = "cache_15m" in path_lower
    is_daily= "cache_daily" in path_lower

    # Optional session-window filter (intraday caches only)
    if FILTER_SESSION_WINDOWS and (is_1m or is_5m or is_15m) and not df.empty:
        import pytz
        jkt = pytz.timezone("Asia/Jakarta")
        dfl = df.copy()
        # The index is naive UTC; view it in Jakarta time to test wall-clock hours.
        dfl.index = dfl.index.tz_localize("UTC").tz_convert(jkt)
        mask = (
            ((dfl.index.time >= pd.Timestamp("09:00").time()) & (dfl.index.time < pd.Timestamp("09:30").time())) |
            ((dfl.index.time >= pd.Timestamp("14:30").time()) & (dfl.index.time <= pd.Timestamp("15:00").time()))
        )
        dfl = dfl[mask]
        # Back to naive UTC, the on-disk convention.
        dfl.index = dfl.index.tz_convert("UTC").tz_localize(None)
        df = dfl

    if is_1m:
        cutoff_1m = now - pd.Timedelta(days=window_days or CACHE_1M_WINDOW_D)
        older = df[df.index < cutoff_1m]
        tail  = df[df.index >= cutoff_1m]
        if not older.empty:
            # Roll expired 1m rows up into the 5m cache before dropping them.
            df5_add = _resample_to_5m(older)
            p5 = CACHE_5M_DIR / p.name
            if p5.exists(): df5_old = _read_cache_csv(p5)
            else: df5_old = pd.DataFrame()
            df5 = pd.concat([df5_old, df5_add]).sort_index()
            df5 = df5[~df5.index.duplicated(keep="last")]
            cutoff_5m = now - pd.Timedelta(days=CACHE_5M_WINDOW_D)
            df5 = df5[df5.index >= cutoff_5m]
            df5.to_csv(p5)
        tail.to_csv(p); return

    if is_5m:
        keep_days = window_days or CACHE_5M_WINDOW_D
        cutoff_5m = now - pd.Timedelta(days=keep_days)
        df = df[df.index >= cutoff_5m]
        df.to_csv(p); return

    if is_15m:
        # Follows the 5m window (set a dedicated one here if needed)
        keep_days = window_days or CACHE_5M_WINDOW_D
        cutoff_15m = now - pd.Timedelta(days=keep_days)
        df = df[df.index >= cutoff_15m]
        df.to_csv(p); return

    if is_daily:
        if window_days:
            cutoff_d = now.normalize() - pd.Timedelta(days=window_days)
            df = df[df.index >= cutoff_d]
        df.to_csv(p); return

    # Unknown cache kind: write the merged frame without trimming.
    df.to_csv(p)

# ---------- FAST TAIL READER (untuk quick check) ----------

def _read_last_ts_csv(path: Path):
    """Baca timestamp baris terakhir dari CSV tanpa load full file. Return naive UTC Timestamp atau None."""
    try:
        if not path.exists() or path.stat().st_size == 0:
            return None
        with open(path, "rb") as f:
            f.seek(0, 2)
            end = f.tell()
            size = min(4096, end)
            f.seek(end - size)
            chunk = f.read().decode("utf-8", errors="ignore")
        lines = [ln for ln in chunk.strip().splitlines() if ln.strip()]
        if not lines:
            return None
        last = lines[-1]
        if last.lower().startswith(("date,", "index,")):
            last = lines[-2] if len(lines) >= 2 else None
        if not last:
            return None
        first_field = last.split(",")[0].strip()
        ts = pd.to_datetime(first_field, errors="coerce", utc=True)
        if ts is pd.NaT:
            return None
        return ts.tz_convert("UTC").tz_localize(None)
    except Exception:
        return None


def _jkt_today_utc_naive():
    import pytz
    jkt = pd.Timestamp.now(pytz.timezone("Asia/Jakarta")).normalize()
    return jkt.tz_convert("UTC").tz_localize(None)


# ---------- yfinance via requests.Session (no curl) ----------
# Ask yfinance not to use curl. NOTE(review): whether the installed yfinance
# honours YF_USE_CURL is not visible from here — confirm.
os.environ["YF_USE_CURL"] = "0"
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# One shared HTTP session, passed to every yf.download call made by yfdl().
_YF_SESSION = requests.Session()
# Transport-level retries: up to 5 attempts with exponential backoff on
# throttling (429) and 5xx responses.
retries = Retry(
    total=5, connect=5, read=5,
    backoff_factor=0.6,
    status_forcelist=(429,500,502,503,504),
    allowed_methods=False  # a false value makes urllib3 retry on any HTTP verb
)
adapter = HTTPAdapter(max_retries=retries, pool_connections=20, pool_maxsize=20)
_YF_SESSION.mount("https://", adapter)
_YF_SESSION.mount("http://", adapter)
_YF_SESSION.headers.update({"User-Agent": "Mozilla/5.0 (warmup-service)"})
# Reload yfinance — presumably so it picks up the environment variable set
# above; TODO confirm this is still required.
import importlib; importlib.reload(yf)


def yfdl(ticker, **kw):
    """Download price data for ``ticker`` with retries; never raises on fetch errors.

    Args:
        ticker: Symbol passed to ``yf.download``.
        **kw: Extra ``yf.download`` keywords (``start``, ``end``, ``period``,
            ``interval``, ``timeout`` ...). ``start``/``end`` are converted to
            naive UTC datetimes; ``threads`` is discarded; ``progress``,
            ``auto_adjust``, ``actions``, ``group_by`` and ``session`` are
            always overridden.

    Returns:
        The sanitised frame from the first attempt that yields rows, or an
        empty DataFrame. Failures are recorded in ``ERRORS`` under "dns",
        "timeout", "missing", "other" or, when every attempt ran without
        returning rows, "empty".
    """
    for bound in ("start", "end"):
        if kw.get(bound) is not None:
            kw[bound] = _naive(kw[bound]).to_pydatetime()
    kw.pop("threads", None)
    kw["progress"]    = False
    kw["auto_adjust"] = False
    kw["actions"]     = False
    kw["group_by"]    = "column"
    kw["session"]     = _YF_SESSION
    kw.setdefault("timeout", 20)

    for attempt in range(MAX_RETRIES):
        try:
            # Best-effort DNS warm-up; a failure here is deliberately ignored
            # because the download below reports the real error.
            try:
                socket.gethostbyname("query2.finance.yahoo.com")
            except Exception:
                pass
            df = _sanitize_ohlcv(yf.download(ticker, **kw))
            if df is not None and not df.empty:
                return df
        except Exception as e:
            low = str(e).lower()
            if "could not resolve host" in low or "temporary failure in name resolution" in low:
                # Retrying cannot help; return now so the ticker is counted
                # once under "dns" and not again under "empty".
                ERRORS["dns"].append(ticker)
                return pd.DataFrame()
            elif "timed out" in low:
                ERRORS["timeout"].append(ticker)
            elif "no price data found" in low or "possibly delisted" in low:
                ERRORS["missing"].append(ticker)
                return pd.DataFrame()
            else:
                ERRORS["other"].append(f"{ticker}: {e}")
        # Back off only when another attempt follows; sleeping after the
        # last one just delays the worker for nothing.
        if attempt + 1 < MAX_RETRIES:
            time.sleep(BACKOFF_S[min(attempt, len(BACKOFF_S)-1)] + random.random()*0.8)
    ERRORS["empty"].append(ticker)
    return pd.DataFrame()

# ---------- FRESH CHECK ----------

def _fresh_enough(last_ts, now_ts, slack_min):
    """Tell whether ``last_ts`` is at most ``slack_min`` minutes behind ``now_ts``.

    Both timestamps are compared as naive UTC. Any error while converting or
    comparing is treated as "not fresh".
    """
    try:
        latest = _naive(last_ts)
        oldest_allowed = _naive(now_ts) - pd.Timedelta(minutes=slack_min)
    except Exception:
        return False
    try:
        return latest >= oldest_allowed
    except Exception:
        return False


# ---------- INCREMENTAL FETCHERS ----------

def get_daily_incremental(ticker: str):
    """Bring the daily cache of ``ticker`` up to date.

    An empty cache is bootstrapped with ``DAILY_BOOTSTRAP_Y`` years of data;
    otherwise only the days after the last cached date are requested.

    Args:
        ticker: Symbol whose cache file is ``CACHE_DAILY_DIR/<ticker>.csv``.

    Returns:
        ``(frame, status)`` where status is one of "fetch(bootstrap)",
        "fetch(incremental)", "skip(fresh)" or "skip(empty)". On a skip the
        frame is the cache as it was read.
    """
    path = CACHE_DAILY_DIR / f"{ticker}.csv"
    base = _read_cache_csv(path)
    today = _today_utc()
    if base.empty:
        fresh = yfdl(ticker, period=f"{DAILY_BOOTSTRAP_Y}y", interval="1d")
        if fresh.empty:
            STATS["daily_skip"] += 1
            return base, "skip(empty)"
        _write_cache_csv(path, fresh, window_days=None)
        STATS["daily_fetch"] += 1
        return _read_cache_csv(path), "fetch(bootstrap)"

    last_date = _naive(base.index.max()).normalize()
    start = _naive(last_date + pd.Timedelta(days=1))
    end = _naive(today + pd.Timedelta(days=1))
    # ">=": when the cache already holds today's bar, start == end and the
    # requested range is empty. Sending it anyway can only come back empty,
    # after yfdl has burned all its retries and logged the ticker as "empty".
    if start >= end:
        STATS["daily_skip"] += 1
        return base, "skip(fresh)"

    fresh = yfdl(ticker, start=start, end=end, interval="1d")
    if fresh.empty:
        STATS["daily_skip"] += 1
        return base, "skip(empty)"
    _write_cache_csv(path, fresh, window_days=None)
    STATS["daily_fetch"] += 1
    return _read_cache_csv(path), "fetch(incremental)"


def get_1m_incremental(ticker: str):
    """Refresh the 1-minute cache of ``ticker``.

    Returns ``(frame, status)``. Status is "fetch(bootstrap)",
    "fetch(incremental)", "skip(fresh)", "skip(empty)", or "fallback(5m)"
    when an empty cache cannot be bootstrapped at 1m and the 5m fetcher's
    frame is returned instead. On both skip paths the existing cache is
    rewritten so that retention is still applied.
    """
    csv_path = CACHE_1M_DIR / f"{ticker}.csv"
    cached = _read_cache_csv(csv_path)
    now_min = _now_utc().floor("min")
    horizon = _naive(now_min - pd.Timedelta(days=CACHE_1M_WINDOW_D))

    if cached.empty:
        downloaded = yfdl(ticker, period=f"{CACHE_1M_WINDOW_D}d", interval="1m")
        if downloaded.empty:
            fallback_df, _ = get_5m_incremental(ticker)
            STATS["m1_fallback_5m"] += 1
            return fallback_df, "fallback(5m)"
        _write_cache_csv(csv_path, downloaded, window_days=CACHE_1M_WINDOW_D)
        STATS["m1_fetch"] += 1
        return _read_cache_csv(csv_path), "fetch(bootstrap)"

    newest = _naive(cached.index.max())
    if _fresh_enough(newest, now_min, INTRADAY_1M_FRESH_SLACK_MIN):
        skip_reason = "skip(fresh)"
    else:
        # Resume right after the newest bar, but never before the horizon.
        since = max(newest + pd.Timedelta(minutes=1), horizon)
        downloaded = yfdl(ticker, start=_naive(since), end=_naive(now_min), interval="1m")
        if not downloaded.empty:
            _write_cache_csv(csv_path, downloaded, window_days=CACHE_1M_WINDOW_D)
            STATS["m1_fetch"] += 1
            return _read_cache_csv(csv_path), "fetch(incremental)"
        skip_reason = "skip(empty)"

    # Nothing new: rewrite the cache as-is so retention/rollup still runs.
    _write_cache_csv(csv_path, cached, window_days=CACHE_1M_WINDOW_D)
    STATS["m1_skip"] += 1
    return cached, skip_reason


def get_5m_incremental(ticker: str):
    """Refresh the 5-minute cache of ``ticker``.

    Returns ``(frame, status)`` with status "fetch(bootstrap)",
    "fetch(incremental)", "skip(fresh)" or "skip(empty)". When a non-empty
    cache gets no new rows it is rewritten unchanged so retention is applied.
    """
    target = CACHE_5M_DIR / f"{ticker}.csv"
    existing = _read_cache_csv(target)
    now_floor = _now_utc().floor("min")
    earliest = _naive(now_floor - pd.Timedelta(days=CACHE_5M_WINDOW_D))

    def _stored(rows, label):
        # Persist new rows, count the fetch, and hand back the reloaded cache.
        _write_cache_csv(target, rows, window_days=CACHE_5M_WINDOW_D)
        STATS["m5_fetch"] += 1
        return _read_cache_csv(target), label

    def _kept(label):
        # No new rows: re-save the current cache to enforce the window.
        _write_cache_csv(target, existing, window_days=CACHE_5M_WINDOW_D)
        STATS["m5_skip"] += 1
        return existing, label

    if existing.empty:
        pulled = yfdl(ticker, period=f"{CACHE_5M_WINDOW_D}d", interval="5m")
        if pulled.empty:
            STATS["m5_skip"] += 1
            return existing, "skip(empty)"
        return _stored(pulled, "fetch(bootstrap)")

    tip = _naive(existing.index.max())
    if _fresh_enough(tip, now_floor, INTRADAY_5M_FRESH_SLACK_MIN):
        return _kept("skip(fresh)")

    resume_at = max(tip + pd.Timedelta(minutes=5), earliest)
    pulled = yfdl(ticker, start=_naive(resume_at), end=_naive(now_floor), interval="5m")
    if pulled.empty:
        return _kept("skip(empty)")
    return _stored(pulled, "fetch(incremental)")


def get_15m_incremental(ticker: str):
    """Fetch 15-minute bars and merge them into CACHE_15M_DIR/<ticker>.csv.

    An empty cache is bootstrapped from a 30-day request; otherwise the
    request resumes 15 minutes after the newest cached bar, clamped to the
    CACHE_5M_WINDOW_D horizon. Every write uses CACHE_5M_WINDOW_D as the
    retention window.

    Returns ``(frame, status)`` with status "fetch(bootstrap)",
    "fetch(incremental)", "skip(fresh)" or "skip(empty)".
    """
    retention = CACHE_5M_WINDOW_D
    cache_file = CACHE_15M_DIR / f"{ticker}.csv"
    current = _read_cache_csv(cache_file)
    stamp_now = _now_utc().floor("min")

    if current.empty:
        # Empty cache: ask for the default 30-day horizon.
        begin = _naive(stamp_now - pd.Timedelta(days=30))
        got = yfdl(ticker, start=begin, end=_naive(stamp_now), interval="15m")
        if got.empty:
            STATS["m15_skip"] += 1
            return current, "skip(empty)"
        _write_cache_csv(cache_file, got, window_days=retention)
        STATS["m15_fetch"] += 1
        return _read_cache_csv(cache_file), "fetch(bootstrap)"

    latest = _naive(current.index.max())
    outcome = None
    if _fresh_enough(latest, stamp_now, INTRADAY_15M_FRESH_SLACK_MIN):
        outcome = "skip(fresh)"
    else:
        next_bar = latest + pd.Timedelta(minutes=15)
        floor_ts = _naive(stamp_now - pd.Timedelta(days=CACHE_5M_WINDOW_D))
        begin = max(next_bar, floor_ts)
        got = yfdl(ticker, start=_naive(begin), end=_naive(stamp_now), interval="15m")
        if got.empty:
            outcome = "skip(empty)"

    if outcome is not None:
        # No new bars: re-save the cache so the retention window is applied.
        _write_cache_csv(cache_file, current, window_days=retention)
        STATS["m15_skip"] += 1
        return current, outcome

    _write_cache_csv(cache_file, got, window_days=retention)
    STATS["m15_fetch"] += 1
    return _read_cache_csv(cache_file), "fetch(incremental)"


# ---------- TICKER SOURCE ----------

def _detect_roster_path():
    """Return the last (sorted) ``candidates_active_filtered_*.csv`` in EMITEN_DIR.

    Returns the path as a string, or ``None`` when no such file exists.
    """
    matches = sorted(EMITEN_DIR.glob("candidates_active_filtered_*.csv"))
    if not matches:
        return None
    return str(matches[-1])


def _infer_tickers():
    """Build the list of tickers to process.

    Preference order: the roster CSV (``ROSTER_PATH`` or the auto-detected
    one), read from its first column named ticker/symbol/kode/emiten
    (case-insensitive) or else its first column; otherwise the sorted set of
    file stems found in the four cache directories.
    """
    roster = ROSTER_PATH or _detect_roster_path()
    if roster and Path(roster).exists():
        table = pd.read_csv(roster)
        wanted = ("ticker", "symbol", "kode", "emiten")
        header = next((col for col in table.columns if col.lower() in wanted), None)
        column = table.iloc[:, 0] if header is None else table[header]
        return column.astype(str).str.strip().tolist()

    cache_dirs = [CACHE_DAILY_DIR, CACHE_1M_DIR, CACHE_5M_DIR, CACHE_15M_DIR]
    stems = {
        csv_file.stem
        for folder in cache_dirs
        if Path(folder).exists()
        for csv_file in Path(folder).glob("*.csv")
    }
    return sorted(stems)


# ---------- MARKET-TIME GUARD (TZ-SAFE) ----------

def _is_jkt_market_time(ts_utc=None):
    """Tell whether ``ts_utc`` (default: now) is inside a Jakarta trading window.

    The windows are Monday–Friday, 09:00 ≤ t < 11:30 and 13:30 ≤ t ≤ 15:00,
    Jakarta local time. Naive input is interpreted as UTC.
    """
    import pytz
    local = _ensure_aware_utc(ts_utc).tz_convert(pytz.timezone("Asia/Jakarta"))
    if local.weekday() > 4:
        # Saturday or Sunday.
        return False

    def _clock(text):
        return pd.Timestamp(text).time()

    wall = local.time()
    in_first_window = _clock("09:00") <= wall < _clock("11:30")
    if in_first_window:
        return True
    return _clock("13:30") <= wall <= _clock("15:00")


def _prewarm_dns():
    """Resolve the Yahoo Finance hosts once, ignoring lookup failures."""
    for host in ("query1.finance.yahoo.com", "query2.finance.yahoo.com"):
        try:
            socket.gethostbyname(host)
        except OSError:
            # Best-effort only. Resolver errors (socket.gaierror) are
            # OSError subclasses; unlike the former bare ``except`` this no
            # longer swallows KeyboardInterrupt or SystemExit.
            pass


def _sleep_jitter(a,b): time.sleep(random.uniform(a,b))

def _stat_label(x): return (str(x[1]) if isinstance(x, tuple) and len(x)==2 else "(ok)")


# ---------- QUICK DAILY SCAN (global) ----------
# Tickers whose daily cache already covers today / still needs a fetch.
# Both are reassigned by run_all_with_logging(); _warmup_one_ticker reads
# FRESH_DAILY_SET to decide whether to skip the daily fetch.
FRESH_DAILY_SET = set()
STALE_DAILY_SET = set()


def scan_daily_freshness(all_tickers: list[str]) -> tuple[set[str], set[str]]:
    """Split tickers by whether their daily cache already reaches today.

    Only the last line of each daily CSV is inspected. A ticker is fresh
    when that line's date, normalised to midnight, is not earlier than the
    value returned by ``_jkt_today_utc_naive()``; a missing or unreadable
    file counts as stale.

    Returns:
        ``(fresh, stale)`` as two sets of ticker symbols.
    """
    threshold = _jkt_today_utc_naive()
    up_to_date: set[str] = set()
    outdated: set[str] = set()
    for symbol in all_tickers:
        last_seen = _read_last_ts_csv(CACHE_DAILY_DIR / f"{symbol}.csv")
        is_fresh = last_seen is not None and pd.Timestamp(last_seen).normalize() >= threshold
        bucket = up_to_date if is_fresh else outdated
        bucket.add(symbol)
    return up_to_date, outdated


# ---------- PARALLEL RUNNER v1.4 ----------
import concurrent.futures as cf
# Optional progress-bar wrapper: _tqdm(x, **k) wraps the iterable in tqdm when
# tqdm is installed and returns it unchanged otherwise.
try:
    from tqdm import tqdm
    def _tqdm(x, **k): return tqdm(x, **k)
except Exception:
    def _tqdm(x, **k): return x


def _warmup_one_ticker(ticker: str, do_intraday: bool):
    """Warm the daily and (optionally) intraday caches of a single ticker.

    Args:
        ticker: Symbol to process.
        do_intraday: When False only the daily cache is touched and the
            intraday statuses are reported as "(skipped-offhours)".

    Returns:
        ``(ticker, daily_status, m1_status, m5_status, m15_status, error)``.
        ``error`` is ``None`` on success; when anything raises, the four
        statuses are ``None`` and ``error`` holds the exception text.
    """
    try:
        # --- daily: skip the fetch when the quick scan already saw today's row
        known_fresh = ALWAYS_FRESH_MODE and QUICK_DAILY_CHECK and ticker in FRESH_DAILY_SET
        if known_fresh:
            daily = (None, "skip(daily-fresh-quick)")
        else:
            daily = get_daily_incremental(ticker)
        _sleep_jitter(*YF_PAUSE_JITTER_S)

        if not do_intraday:
            off_hours = "(skipped-offhours)"
            return (ticker, _stat_label(daily), off_hours, off_hours, off_hours, None)

        # --- intraday early exit: all three caches fresh → no network call
        if ALWAYS_FRESH_MODE and EARLY_EXIT_IF_INTRADAY_FRESH:
            now = _now_utc().floor("min")
            probes = [
                (CACHE_1M_DIR, INTRADAY_1M_FRESH_SLACK_MIN),
                (CACHE_5M_DIR, INTRADAY_5M_FRESH_SLACK_MIN),
                (CACHE_15M_DIR, INTRADAY_15M_FRESH_SLACK_MIN),
            ]
            last_seen = [_read_last_ts_csv(folder / f"{ticker}.csv") for folder, _ in probes]
            verdicts = [
                stamp is not None and _fresh_enough(stamp, now, slack)
                for stamp, (_, slack) in zip(last_seen, probes)
            ]
            if all(verdicts):
                quick = "skip(fresh-quick)"
                return (ticker, _stat_label(daily), quick, quick, quick, None)

        # --- intraday fetch chain
        m1_df, m1_stat = get_1m_incremental(ticker)
        _sleep_jitter(*YF_PAUSE_JITTER_S)
        m5_df, m5_stat = get_5m_incremental(ticker)

        def _no_rows(frame):
            return frame is None or getattr(frame, "empty", False)

        # 15m is the last resort: used only when neither 1m nor 5m gave
        # anything usable.
        m5_unusable = _no_rows(m5_df) or "skip(empty)" in str(m5_stat).lower()
        if _no_rows(m1_df) and m5_unusable:
            STATS["m5_fallback_15m"] += 1
            _sleep_jitter(*YF_PAUSE_JITTER_S)
            _, m15_stat = get_15m_incremental(ticker)
        else:
            m15_stat = "skip(15m-not-needed)"

        return (ticker, _stat_label(daily), str(m1_stat), str(m5_stat), str(m15_stat), None)
    except Exception as exc:
        return (ticker, None, None, None, None, str(exc))


def run_parallel_warmup_v14(tickers):
    """Warm all tickers with a thread pool, retrying failures in extra rounds.

    Tickers are submitted in batches of ``SUBMIT_BATCH`` to a pool of
    ``MAX_WORKERS`` threads. A ticker whose worker reports an error or
    exceeds ``WORKER_TIMEOUT_S`` is queued for the next round (up to
    ``MAX_RETRY_ROUNDS`` extra rounds; only symbols ending in ".JK" are
    retried). A summary CSV and, if needed, an errors CSV are written to
    ``RESULTS_DIR`` and a textual summary is printed.

    Args:
        tickers: Iterable of ticker symbols.

    Returns:
        ``(all_rows, tag)``: the per-ticker result dicts of every round and
        the timestamp tag used in the output file names.
    """
    RESULTS_DIR.mkdir(parents=True, exist_ok=True)
    all_rows, all_errs = [], []
    rounds = MAX_RETRY_ROUNDS + 1
    remain = list(tickers)
    t0 = time.time()
    # tqdm is an optional import in this file; look it up defensively so a
    # missing package disables the bar instead of raising NameError.
    progress_cls = globals().get("tqdm")

    for r in range(rounds):
        if not remain:
            break
        do_intraday = (not OFF_HOURS_SKIP_INTRADAY) or _is_jkt_market_time()
        label = f"{DESC_LABEL} R{r}{' (intraday)' if do_intraday else ' (daily-only)'}"
        print(f"\nRound {r+1}/{rounds} • tickers={len(remain)} • workers={MAX_WORKERS} • timeout={WORKER_TIMEOUT_S}s • {('INTRA ON' if do_intraday else 'INTRA OFF')}")
        _prewarm_dns()

        failed_this = []
        pbar = progress_cls(total=len(remain), ncols=90, desc=label) if progress_cls else None
        try:
            for lo in range(0, len(remain), SUBMIT_BATCH):
                batch = remain[lo: lo + SUBMIT_BATCH]
                with cf.ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
                    # Keep each future paired with its ticker so a timeout
                    # can be attributed to it (previously logged as "?").
                    submitted = [(tkr, ex.submit(_warmup_one_ticker, tkr, do_intraday)) for tkr in batch]
                    for tkr, fut in submitted:
                        try:
                            _, dstat, m1stat, m5stat, m15stat, err = fut.result(timeout=WORKER_TIMEOUT_S)
                            all_rows.append({"round": r, "ticker": tkr, "daily": dstat, "m1": m1stat, "m5": m5stat, "m15": m15stat, "error": err or ""})
                            if err:
                                ERRORS["other"].append(f"{tkr}: {err}")
                                all_errs.append((tkr, err, r))
                                failed_this.append(tkr)
                        except cf.TimeoutError:
                            msg = f"worker-timeout>{WORKER_TIMEOUT_S}s"
                            ERRORS["timeout"].append(tkr)
                            all_rows.append({"round": r, "ticker": tkr, "daily": "", "m1": "", "m5": "", "m15": "", "error": msg})
                            all_errs.append((tkr, msg, r))
                            # Leaving the ``with`` block waits for this
                            # worker to finish, so retrying in the next
                            # round cannot overlap with it.
                            failed_this.append(tkr)
                        finally:
                            if pbar is not None:
                                pbar.update(1)
                _sleep_jitter(0.3, 0.8)
        finally:
            if pbar is not None:
                pbar.close()
        remain = sorted(set(t for t in failed_this if t and t.endswith(".JK")))

    tag = _now_tag_full()
    if all_rows:
        pd.DataFrame(all_rows).to_csv(RESULTS_DIR / f"summary_parallel_{tag}.csv", index=False)
        print(f"[SUMMARY CSV] result/summary_parallel_{tag}.csv")
    if all_errs:
        pd.DataFrame(all_errs, columns=["Ticker","Error","Round"]).drop_duplicates().to_csv(RESULTS_DIR / f"errors_parallel_{tag}.csv", index=False)
        print(f"[ERRORS CSV]  result/errors_parallel_{tag}.csv")

    elapsed = time.time() - t0
    print("\n===== SUMMARY (parallel v1.4) =====")
    print(f"Tickers total   : {STATS.get('tickers', len(tickers))}")
    print(f"Daily   fetch/skip : {STATS.get('daily_fetch',0)}/{STATS.get('daily_skip',0)}")
    print(f"1m      fetch/skip : {STATS.get('m1_fetch',0)}/{STATS.get('m1_skip',0)}  | fallback→5m: {STATS.get('m1_fallback_5m',0)}")
    print(f"5m      fetch/skip : {STATS.get('m5_fetch',0)}/{STATS.get('m5_skip',0)}  | fallback→15m: {STATS.get('m5_fallback_15m',0)}")
    print(f"15m     fetch/skip : {STATS.get('m15_fetch',0)}/{STATS.get('m15_skip',0)}")
    print(f"Errors  dns/missing/timeout/empty/other : "
          f"{len(ERRORS['dns'])}/{len(ERRORS['missing'])}/{len(ERRORS['timeout'])}/{len(ERRORS['empty'])}/{len(ERRORS['other'])}")
    print(f"Elapsed         : {elapsed:.2f}s")
    return all_rows, tag


# ---------- ROBUST SANITY (tetap) ----------

def _safe_display(df):
    try:
        display(df)
    except Exception:
        print(df.head(20).to_string())


def _cols_ok(df):
    if df is None or df.empty: return []
    known = ["Open","High","Low","Close","Adj Close","Volume"]
    return [c for c in known if c in df.columns]


def _age_days_safe(df):
    if df is None or df.empty: return None
    try:
        last = df.index.max()
        if pd.isna(last): return None
        return int((pd.Timestamp.utcnow().normalize() - pd.Timestamp(last).normalize()).days)
    except Exception:
        return None


def _is_jkt_session(ts):
    """Tell whether ``ts`` falls inside a Jakarta trading window.

    A naive ``ts`` is interpreted as UTC. The windows are Monday–Friday,
    09:00 ≤ t < 11:30 and 13:30 ≤ t ≤ 15:00 Jakarta local time.
    """
    import pytz
    zone = pytz.timezone("Asia/Jakarta")
    moment = _ensure_aware_utc(ts).tz_convert(zone)
    is_weekend = moment.weekday() >= 5
    if is_weekend:
        return False
    clock = moment.time()
    morning_open, morning_close = pd.Timestamp("09:00").time(), pd.Timestamp("11:30").time()
    afternoon_open, afternoon_close = pd.Timestamp("13:30").time(), pd.Timestamp("15:00").time()
    if morning_open <= clock < morning_close:
        return True
    return afternoon_open <= clock <= afternoon_close


def _resample_1m_to_5m_safe(df_1m: pd.DataFrame):
    """Resample 1-minute bars to right-labelled, right-closed 5-minute bars.

    Only the OHLCV columns actually present are aggregated. Returns an empty
    DataFrame for ``None``/empty input or when no known column exists. Bins
    in which every present price column is NaN are dropped.
    """
    if df_1m is None or df_1m.empty:
        return pd.DataFrame()
    frame = df_1m.copy()
    rules = (
        ("Open", "first"),
        ("High", "max"),
        ("Low", "min"),
        ("Close", "last"),
        ("Volume", "sum"),
        ("Adj Close", "last"),
    )
    how = {name: func for name, func in rules if name in frame.columns}
    if not how:
        return pd.DataFrame()
    bars = frame.resample("5min", label="right", closed="right").agg(how)
    prices = [name for name in ("Open", "High", "Low", "Close") if name in bars.columns]
    if prices:
        bars = bars.dropna(subset=prices, how="all")
    return _idx_naive(bars)


def run_sanity_random_robust(sample_size=SANITY_SAMPLE_SIZE, tol_price=TOL_PCT_PRICE, tol_vol=TOL_VOL_DIFF):
    """Run integrity checks on a random sample of cached tickers.

    Tickers are discovered from the file names in the four cache
    directories. For each sampled ticker the daily, 1m and 5m caches are
    read (the 15m cache is used for discovery only) and per-cache metrics
    are collected: row count, last timestamp, known columns, monotonic
    index, duplicate timestamps, zero-volume rows, mean NaN share and, for
    intraday caches when CHECK_BEI_HOURS is set, rows outside the trading
    session. The last day of 1m data is also resampled to 5m and compared
    with the 5m cache.

    Args:
        sample_size: Maximum number of tickers to check; all are checked
            when fewer exist.
        tol_price: Relative tolerance for Open/High/Low/Close differences.
        tol_vol: Relative tolerance for Volume differences.

    Returns:
        The report DataFrame (one row per ticker), also saved as
        ``sanity_report_<tag>.csv`` in RESULTS_DIR. An empty DataFrame is
        returned when no cache file exists.
    """
    # Collect tickers from the cache files that exist
    s = set()
    for d in [CACHE_DAILY_DIR, CACHE_1M_DIR, CACHE_5M_DIR, CACHE_15M_DIR]:
        if Path(d).exists():
            for f in Path(d).glob("*.csv"):
                s.add(f.stem)
    tickers = sorted(s)
    if not tickers:
        print("⚠️ Tidak ada file di cache_* untuk sanity.")
        return pd.DataFrame()

    sample = tickers if len(tickers) <= sample_size else random.sample(tickers, sample_size)

    rows = []
    for tkr in sample:
        pD, p1, p5 = CACHE_DAILY_DIR/f"{tkr}.csv", CACHE_1M_DIR/f"{tkr}.csv", CACHE_5M_DIR/f"{tkr}.csv"
        dfD, df1, df5 = _read_cache_csv(pD), _read_cache_csv(p1), _read_cache_csv(p5)

        # --- daily cache metrics
        d_cols = _cols_ok(dfD)
        d_dups = int(dfD.index.duplicated().sum()) if not dfD.empty else 0
        d_mono = bool(dfD.index.is_monotonic_increasing) if not dfD.empty else True
        d_last = str(dfD.index.max()) if not dfD.empty else ""
        d_age  = _age_days_safe(dfD)
        d_vol0 = int((dfD["Volume"]==0).sum()) if ("Volume" in dfD.columns and not dfD.empty) else 0
        d_na   = {c: float(dfD[c].isna().mean()) for c in dfD.columns} if not dfD.empty else {}

        # --- 1m cache metrics
        m1_cols = _cols_ok(df1)
        m1_dups = int(df1.index.duplicated().sum()) if not df1.empty else 0
        m1_mono = bool(df1.index.is_monotonic_increasing) if not df1.empty else True
        m1_last = str(df1.index.max()) if not df1.empty else ""
        m1_rows = len(df1)
        m1_outside = int((~df1.index.to_series().map(_is_jkt_session)).sum()) if (CHECK_BEI_HOURS and not df1.empty) else 0
        m1_vol0 = int((df1["Volume"]==0).sum()) if ("Volume" in df1.columns and not df1.empty) else 0
        m1_na   = {c: float(df1[c].isna().mean()) for c in df1.columns} if not df1.empty else {}

        # --- 5m cache metrics
        m5_cols = _cols_ok(df5)
        m5_dups = int(df5.index.duplicated().sum()) if not df5.empty else 0
        m5_mono = bool(df5.index.is_monotonic_increasing) if not df5.empty else True
        m5_last = str(df5.index.max()) if not df5.empty else ""
        m5_rows = len(df5)
        m5_outside = int((~df5.index.to_series().map(_is_jkt_session)).sum()) if (CHECK_BEI_HOURS and not df5.empty) else 0
        m5_vol0 = int((df5["Volume"]==0).sum()) if ("Volume" in df5.columns and not df5.empty) else 0
        m5_na   = {c: float(df5[c].isna().mean()) for c in df5.columns} if not df5.empty else {}

        # 1m -> 5m consistency (last day of 1m data)
        res_ok, mism_price, mism_vol = "", None, None
        if not df1.empty and not df5.empty:
            try:
                last_day = pd.Timestamp(df1.index.max()).normalize()
                d1 = df1.loc[df1.index.normalize() == last_day]
                r5 = _resample_1m_to_5m_safe(d1)
                d5 = df5.loc[df5.index.normalize() == last_day]
                # Compare only timestamps present in both frames.
                idx = r5.index.intersection(d5.index)
                if not idx.empty:
                    r5i, d5i = r5.loc[idx], d5.loc[idx]
                    mism_price = False
                    for col in [c for c in ["Open","High","Low","Close"] if c in r5i.columns and c in d5i.columns]:
                        # Zero denominators become NaN and are dropped below.
                        base = d5i[col].replace(0, np.nan).astype(float)
                        diff = (r5i[col].astype(float) - d5i[col].astype(float)).abs() / base
                        if diff.dropna().gt(tol_price).any():
                            mism_price = True; break
                    mism_vol = False
                    if "Volume" in r5i.columns and "Volume" in d5i.columns:
                        basev = d5i["Volume"].replace(0, np.nan).astype(float)
                        diffv = (r5i["Volume"].astype(float) - d5i["Volume"].astype(float)).abs() / basev
                        if diffv.dropna().gt(tol_vol).any():
                            mism_vol = True
                    res_ok = "OK" if (not mism_price and not mism_vol) else "MISMATCH"
                else:
                    res_ok = "NO_OVERLAP"
            except Exception as e:
                res_ok = f"CHECK_ERROR: {e}"

        # The *_na_% fields are the mean NaN share across columns, in percent.
        rows.append({
            "ticker": tkr,
            "daily_rows": len(dfD), "daily_last": d_last, "daily_age_days": d_age,
            "daily_cols": ",".join(d_cols), "daily_monotonic": d_mono, "daily_dups": d_dups,
            "daily_vol_zero": d_vol0, "daily_na_%": round(sum(d_na.values())/max(1,len(d_na))*100,3) if d_na else None,
            "m1_rows": m1_rows, "m1_last": m1_last, "m1_cols": ",".join(m1_cols), "m1_monotonic": m1_mono,
            "m1_dups": m1_dups, "m1_outside_session": m1_outside, "m1_vol_zero": m1_vol0,
            "m1_na_%": round(sum(m1_na.values())/max(1,len(m1_na))*100,3) if m1_na else None,
            "m5_rows": m5_rows, "m5_last": m5_last, "m5_cols": ",".join(m5_cols), "m5_monotonic": m5_mono,
            "m5_dups": m5_dups, "m5_outside_session": m5_outside, "m5_vol_zero": m5_vol0,
            "m5_na_%": round(sum(m5_na.values())/max(1,len(m5_na))*100,3) if m5_na else None,
            "consistency_1m_to_5m": res_ok, "price_mismatch": mism_price, "vol_mismatch": mism_vol,
        })

    df_report = pd.DataFrame(rows)
    # Fixed column order for the saved report.
    order = [
        "ticker",
        "daily_rows","daily_last","daily_age_days","daily_cols","daily_monotonic","daily_dups","daily_vol_zero","daily_na_%",
        "m1_rows","m1_last","m1_cols","m1_monotonic","m1_dups","m1_outside_session","m1_vol_zero","m1_na_%",
        "m5_rows","m5_last","m5_cols","m5_monotonic","m5_dups","m5_outside_session","m5_vol_zero","m5_na_%",
        "consistency_1m_to_5m","price_mismatch","vol_mismatch"
    ]
    df_report = df_report.reindex(columns=[c for c in order if c in df_report.columns])
    out = RESULTS_DIR / f"sanity_report_{_now_tag_full()}.csv"
    df_report.to_csv(out, index=False)
    _safe_display(df_report)
    print(f"\n✅ Sanity report saved → {out}")
    return df_report


# ---------- WRAPPER EKSEKUSI DENGAN LOGGING ----------

def _dump_runtime_error(stage: str, header: str):
    """Write the active exception's traceback to a log in RESULTS_DIR; return its path."""
    logp = RESULTS_DIR / f"error_runtime_{stage}_{_now_tag_full()}.log"
    # Explicit UTF-8: a traceback may carry non-ASCII text, and failing to
    # encode it inside the error handler would mask the original problem.
    with open(logp, "w", encoding="utf-8") as f:
        f.write(header)
        traceback.print_exc(file=f)
    return logp


def run_all_with_logging():
    """Run the whole pipeline: roster → quick daily scan → warmup → sanity.

    Warmup and sanity are each optional (``DO_WARMUP``, ``SANITY_ENABLE``)
    and each guarded separately: an exception in one stage is written to a
    log file under RESULTS_DIR and the run continues with the next stage.
    Updates the module-level FRESH_DAILY_SET / STALE_DAILY_SET and
    ``STATS["tickers"]``.
    """
    global FRESH_DAILY_SET, STALE_DAILY_SET

    # 0) Ticker roster
    tks = _infer_tickers()
    STATS["tickers"] = len(tks)

    # 0.5) Quick daily scan (optional)
    if ALWAYS_FRESH_MODE and QUICK_DAILY_CHECK:
        FRESH_DAILY_SET, STALE_DAILY_SET = scan_daily_freshness(tks)
        print(f"QuickDaily: fresh={len(FRESH_DAILY_SET)} stale={len(STALE_DAILY_SET)} (of {len(tks)})")
    else:
        FRESH_DAILY_SET, STALE_DAILY_SET = set(), set(tks)

    # 1) Warmup (optional)
    if DO_WARMUP:
        try:
            print(f"Parallel warmup v1.4 • candidates={len(tks)}")
            run_parallel_warmup_v14(tks)
        except Exception:
            logp = _dump_runtime_error("warmup", "WARMUP ERROR\n")
            print(f"⚠️ Warmup error tertangkap. Log → {logp}")
    else:
        print("⚠️ Warmup SKIPPED (DO_WARMUP=False).")

    # 2) Sanity (optional)
    if SANITY_ENABLE:
        try:
            # Report the configured sample size instead of a hard-coded 10.
            print(f"\n=== SANITY (random {SANITY_SAMPLE_SIZE}) ===")
            run_sanity_random_robust(sample_size=SANITY_SAMPLE_SIZE)
        except Exception:
            logp = _dump_runtime_error("sanity", "SANITY ERROR\n")
            print(f"⚠️ Sanity error tertangkap. Log → {logp}")
    else:
        print("Sanity check dimatikan (SANITY_ENABLE=False).")


# --- RUN ONCE ---
if __name__ == "__main__":
    # Entry point: run the pipeline once when executed as a script.
    run_all_with_logging()

# Baseline v1.3.3 taken from the user's previous script as reference for compatibility and structure.
# (See chat/file reference).
