In [9]:
import os
import time
import warnings
from datetime import datetime, timedelta
import pandas as pd
import requests
import yfinance as yf
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from binance.client import Client
from defillama2 import DefiLlama # 라이브러리 사용 유지

warnings.filterwarnings('ignore')

# --- 설정 ---
START_DATE = "2017-01-01"
END_DATE = (datetime.today() + timedelta(days=1)).strftime('%Y-%m-%d')
OUTPUT_DIR = "./macro_data_4h"
os.makedirs(OUTPUT_DIR, exist_ok=True)

UPBIT_CRYPTO_TICKERS = {
    'KRW-BTC': ('BTC', 'BTC'), 'KRW-ETH': ('ETH', 'ETH'), 'KRW-XRP': ('XRP', 'XRP'),
    'KRW-SOL': ('SOL', 'SOL'), 'KRW-ADA': ('ADA', 'ADA'), 'KRW-DOGE': ('DOGE', 'DOGE'),
    'KRW-AVAX': ('AVAX', 'AVAX'), 'KRW-DOT': ('DOT', 'DOT')
}

MACRO_TICKERS = {
    'DX-Y.NYB': 'DXY', 'GC=F': 'GOLD', '^VIX': 'VIX', '^GSPC': 'SP500'
}

DEFI_PROTOCOLS = ['makerdao', 'lido', 'aave', 'uniswap', 'curve-dex']
L2_CHAINS = ['Arbitrum', 'Optimism', 'Base', 'zkSync Era']

# --- 안전한 요청을 위한 세션 설정 (443 에러 방지 핵심) ---
def get_session():
    session = requests.Session()
    session.headers.update({
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Accept": "application/json"
    })
    # 재시도 횟수 5회, 간격 증가(Backoff)
    retry = Retry(total=5, backoff_factor=2, status_forcelist=[429, 500, 502, 503, 504])
    session.mount('https://', HTTPAdapter(max_retries=retry))
    return session

# ============================================================================
# 1. Upbit (진짜 4시간봉 수집)
# ============================================================================
def collect_upbit_crypto_prices_4h():
    print(f"\n[1/6] Collecting 4H cryptocurrency prices from Upbit...")
    session = get_session()
    start_dt = pd.to_datetime(START_DATE)
    merged_df = None
    
    for market, (symbol, _) in UPBIT_CRYPTO_TICKERS.items():
        try:
            all_candles = []
            to_date = None
            while True:
                # minutes/240 = 4시간봉
                url = "https://api.upbit.com/v1/candles/minutes/240"
                params = {'market': market, 'count': 200}
                if to_date: params['to'] = to_date
                
                # 타임아웃 10초 설정
                resp = session.get(url, params=params, timeout=10)
                resp.raise_for_status()
                candles = resp.json()
                if not candles: break
                
                all_candles.extend(candles)
                
                # 날짜 확인
                last_date = pd.to_datetime(candles[-1]['candle_date_time_kst'])
                if last_date <= start_dt: break
                
                to_date = candles[-1]['candle_date_time_utc']
                time.sleep(0.2) # 요청 간격 0.1 -> 0.2초로 완화
            
            if not all_candles:
                print(f"  - {symbol}: No data")
                continue
            
            df = pd.DataFrame(all_candles)
            df['timestamp'] = pd.to_datetime(df['candle_date_time_kst']) # KST 기준
            
            df = df.rename(columns={
                'opening_price': f'{symbol}_Open',
                'high_price': f'{symbol}_High',
                'low_price': f'{symbol}_Low',
                'trade_price': f'{symbol}_Close',
                'candle_acc_trade_volume': f'{symbol}_Volume'
            })
            
            df = df[['timestamp', f'{symbol}_Open', f'{symbol}_High', 
                    f'{symbol}_Low', f'{symbol}_Close', f'{symbol}_Volume']]
            df = df.sort_values('timestamp')
            df = df[df['timestamp'] >= start_dt]
            
            merged_df = df if merged_df is None else pd.merge(merged_df, df, on='timestamp', how='outer')
            print(f"  - {symbol}: {len(df)} 4H candles")
            
        except Exception as e:
            print(f"  - {symbol}: Failed ({str(e)[:50]})")
    
    if merged_df is not None:
        merged_df = merged_df.sort_values('timestamp')
        merged_df.to_csv(os.path.join(OUTPUT_DIR, "crypto_4h_kst.csv"), index=False)
        print(f"  Saved: crypto_4h_kst.csv")

# ============================================================================
# 2. Macro Indicators (일봉 -> 4시간봉 변환)
# ============================================================================
def collect_macro_indicators_4h():
    print(f"\n[2/6] Collecting macro indicators (resampled to 4H)...")
    for ticker, name in MACRO_TICKERS.items():
        try:
            df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False, interval='1d')
            if isinstance(df.columns, pd.MultiIndex):
                df = df.xs('Close', level=0, axis=1)
            else:
                df = df[['Close']]
            
            # 인덱스 정리
            df.index = pd.to_datetime(df.index).tz_localize(None)
            
            # 4시간봉으로 리샘플링 (직전 값 채우기 - Forward Fill)
            df_4h = df.resample('4H').ffill()
            df_4h.columns = [name]
            df_4h.index.name = 'timestamp'
            
            # 2017년부터 필터링
            df_4h = df_4h[df_4h.index >= pd.to_datetime(START_DATE)]
            
            df_4h.to_csv(os.path.join(OUTPUT_DIR, f"{name}_4h.csv"))
            print(f"  - {name}: {len(df_4h)} rows")
        except Exception as e:
            print(f"  - {name}: Failed ({str(e)[:50]})")

# ============================================================================
# 3. Fear & Greed (일봉 -> 4시간봉 변환)
# ============================================================================
def collect_fear_greed_4h():
    print(f"\n[3/6] Collecting Fear & Greed Index (resampled to 4H)...")
    try:
        session = get_session()
        url = "https://api.alternative.me/fng/?limit=4000&format=json"
        resp = session.get(url, timeout=10)
        data = resp.json()['data']
        
        df = pd.DataFrame(data)
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='s')
        df = df[['timestamp', 'value']].rename(columns={'value': 'fear_greed'})
        df['fear_greed'] = df['fear_greed'].astype(float)
        df = df.set_index('timestamp').sort_index()
        
        # 4시간봉 리샘플링
        df_4h = df.resample('4H').ffill().reset_index()
        
        df_4h.to_csv(os.path.join(OUTPUT_DIR, "fear_greed_4h.csv"), index=False)
        print(f"  - Fear & Greed: {len(df_4h)} rows")
    except Exception as e:
        print(f"  - Fear & Greed: Failed ({str(e)[:50]})")

# ============================================================================
# 4. ETH Funding Rate (8시간봉 -> 4시간봉 변환)
# ============================================================================
def collect_funding_rate_4h():
    print(f"\n[4/6] Collecting ETH funding rate (resampled to 4H)...")
    try:
        client = Client("", "")
        funding_rates = []
        start_ts = int(datetime.strptime(START_DATE, "%Y-%m-%d").timestamp() * 1000)
        end_ts = int(datetime.strptime(END_DATE, "%Y-%m-%d").timestamp() * 1000)
        
        while start_ts < end_ts:
            rates = client.futures_funding_rate(symbol='ETHUSDT', startTime=start_ts, limit=1000)
            if not rates: break
            funding_rates.extend(rates)
            start_ts = rates[-1]['fundingTime'] + 1
            time.sleep(0.2) # 0.1 -> 0.2 안전하게
        
        df = pd.DataFrame(funding_rates)
        df['timestamp'] = pd.to_datetime(df['fundingTime'], unit='ms')
        df['fundingRate'] = df['fundingRate'].astype(float)
        df = df[['timestamp', 'fundingRate']].sort_values('timestamp').set_index('timestamp')
        
        # 8시간 -> 4시간 리샘플링
        df_4h = df.resample('4H').ffill().reset_index()
        
        df_4h.to_csv(os.path.join(OUTPUT_DIR, "eth_funding_rate_4h.csv"), index=False)
        print(f"  - Funding Rate: {len(df_4h)} rows")
    except Exception as e:
        print(f"  - Funding Rate: Failed ({str(e)[:50]})")

# ============================================================================
# 5. DeFi TVL (일봉 -> 4시간봉 보간)
# ============================================================================
def collect_defi_tvl_4h():
    print(f"\n[5/6] Collecting DeFi TVL (interpolated to 4H)...")
    
    # DefiLlama 라이브러리 사용하되, 안전장치 추가
    obj = DefiLlama()
    
    def safe_run(func, *args):
        try:
            time.sleep(2) # 요청 전 2초 대기 (강력한 Rate Limit 방지)
            return func(*args)
        except Exception as e:
            print(f"    Error: {str(e)[:30]}... Retrying in 5s")
            time.sleep(5)
            return func(*args) # 한번 더 시도

    # 1. Chain TVL
    try:
        df = safe_run(obj.get_chain_hist_tvl, 'Ethereum')
        df = df.reset_index().rename(columns={'date': 'timestamp', 'tvl': 'eth_chain_tvl'})
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        df = df.set_index('timestamp').resample('4H').interpolate().reset_index()
        df.to_csv(os.path.join(OUTPUT_DIR, 'eth_chain_tvl_4h.csv'), index=False)
        print(f"  - ETH Chain TVL: {len(df)} rows")
    except Exception as e:
        print(f"  - ETH Chain TVL Failed: {e}")

    # 2. Protocol TVL
    for protocol in DEFI_PROTOCOLS:
        try:
            tvl_dict = safe_run(obj.get_protocol_hist_tvl_by_chain, protocol)
            if 'Ethereum' in tvl_dict:
                df = tvl_dict['Ethereum'].reset_index().rename(columns={'date': 'timestamp', 'tvl': f'{protocol}_eth_tvl'})
                df['timestamp'] = pd.to_datetime(df['timestamp'])
                # 4시간으로 부드럽게 보간 (Interpolate)
                df_4h = df.set_index('timestamp').resample('4H').interpolate().reset_index()
                df_4h.to_csv(os.path.join(OUTPUT_DIR, f'{protocol}_eth_tvl_4h.csv'), index=False)
                print(f"  - {protocol}: {len(df_4h)} rows")
        except Exception as e:
            print(f"  - {protocol}: Failed ({str(e)[:30]})")

    # 3. USDT Mcap
    try:
        df = safe_run(obj.get_stablecoin_hist_mcap_on_a_chain, 1, 'ethereum')
        df = df.reset_index().rename(columns={'date': 'timestamp', 'mcap': 'usdt_eth_mcap'})
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        df_4h = df.set_index('timestamp').resample('4H').interpolate().reset_index()
        df_4h.to_csv(os.path.join(OUTPUT_DIR, 'usdt_eth_mcap_4h.csv'), index=False)
        print(f"  - USDT Mcap: {len(df_4h)} rows")
    except: pass

# ============================================================================
# 6. Layer 2 TVL (일봉 -> 4시간봉 보간)
# ============================================================================
def collect_l2_tvl_4h():
    print(f"\n[6/6] Collecting Layer 2 TVL (interpolated to 4H)...")
    session = get_session()
    
    for chain in L2_CHAINS:
        try:
            time.sleep(2) # 2초 대기
            url = f"https://api.llama.fi/v2/historicalChainTvl/{chain}"
            resp = session.get(url, timeout=20)
            
            if resp.status_code == 200:
                df = pd.DataFrame(resp.json())
                df['timestamp'] = pd.to_datetime(df['date'], unit='s')
                df = df.set_index('timestamp').rename(columns={'tvl': f'{chain.lower()}_tvl'})
                
                # 4시간 보간
                df_4h = df[f'{chain.lower()}_tvl'].resample('4H').interpolate().reset_index()
                df_4h.to_csv(os.path.join(OUTPUT_DIR, f'{chain.lower()}_tvl_4h.csv'), index=False)
                print(f"  - {chain}: {len(df_4h)} rows")
            else:
                print(f"  - {chain}: API Status {resp.status_code}")
        except Exception as e:
            print(f"  - {chain}: Failed ({str(e)[:30]})")

# ============================================================================
# Main
# ============================================================================

print("ETH Price Prediction - 4H Data Collection Pipeline (Robust)")
print("=" * 80)

collect_upbit_crypto_prices_4h()
collect_macro_indicators_4h()
collect_fear_greed_4h()
collect_funding_rate_4h()
collect_defi_tvl_4h()
collect_l2_tvl_4h()

print("\n" + "=" * 80)
print("4H Data collection completed!")
print(f"Output directory: {OUTPUT_DIR}")
print("=" * 80)



ETH Price Prediction - 4H Data Collection Pipeline (Robust)

[1/6] Collecting 4H cryptocurrency prices from Upbit...
  - BTC: 17901 4H candles
  - ETH: 17878 4H candles
  - XRP: 17877 4H candles
  - SOL: 9024 4H candles
  - ADA: 17817 4H candles
  - DOGE: 10422 4H candles
  - AVAX: 8327 4H candles
  - DOT: 11274 4H candles
  Saved: crypto_4h_kst.csv

[2/6] Collecting macro indicators (resampled to 4H)...
  - DXY: 19495 rows
  - GOLD: 19495 rows
  - VIX: 19495 rows
  - SP500: 19495 rows

[3/6] Collecting Fear & Greed Index (resampled to 4H)...
  - Fear & Greed: 17137 rows

[4/6] Collecting ETH funding rate (resampled to 4H)...
  - Funding Rate: 13151 rows

[5/6] Collecting DeFi TVL (interpolated to 4H)...
  - ETH Chain TVL: 17899 rows
  - makerdao: 15116 rows
  - lido: 10821 rows
  - aave: 12104 rows
    Error: HTTPSConnectionPool(host='api.... Retrying in 5s
  - uniswap: 15488 rows
  - curve-dex: 12710 rows
  - USDT Mcap: 17521 rows

[6/6] Collecting Layer 2 TVL (interpolated to 4H)...

In [34]:
import os
import pandas as pd
import glob
import numpy as np

OUTPUT_DIR = "./macro_data_4h"
FINAL_FILE = "final_dataset_4h.csv"

def robust_load_csv(path):
    # (기존과 동일: 날짜 파싱 및 정렬)
    df = pd.read_csv(path)
    date_col = None
    for col in df.columns:
        if col.lower() in ['timestamp', 'date', 'datetime', 'time', 'index']:
            date_col = col
            break
    if not date_col:
        try:
            pd.to_datetime(df.iloc[:, 0])
            date_col = df.columns[0]
        except: return None
            
    df = df.rename(columns={date_col: 'timestamp'})
    df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce')
    df = df.dropna(subset=['timestamp'])
    
    if df['timestamp'].dt.tz is not None:
        df['timestamp'] = df['timestamp'].dt.tz_localize(None)
        
    df['timestamp'] = df['timestamp'].dt.floor('min')
    return df.sort_values('timestamp')

def smart_merge_and_fill():
    print("=" * 60)
    print("Smart Merge: 0 for Pre-history, ffill for Missing")
    print("=" * 60)

    # 1. Master (Upbit) 로드
    upbit_path = os.path.join(OUTPUT_DIR, "crypto_4h_kst.csv")
    if not os.path.exists(upbit_path):
        print("Error: Master file not found.")
        return

    df_master = robust_load_csv(upbit_path)
    df_master = df_master.drop_duplicates('timestamp').reset_index(drop=True)
    print(f"Master Range: {df_master['timestamp'].min()} ~ {df_master['timestamp'].max()}")

    # 2. 모든 파일 병합 (일단 NaN 상태로)
    all_files = glob.glob(os.path.join(OUTPUT_DIR, "*.csv"))
    target_files = [f for f in all_files if "crypto_4h_kst.csv" not in f and "final_dataset" not in f]

    for file_path in target_files:
        try:
            filename = os.path.basename(file_path)
            df_temp = robust_load_csv(file_path)
            if df_temp is None or df_temp.empty: continue
            
            df_temp = df_temp.drop_duplicates('timestamp')
            
            # merge_asof로 병합 (매칭 안되면 NaN 들어감)
            df_master = pd.merge_asof(
                df_master, 
                df_temp, 
                on='timestamp', 
                direction='backward',
                tolerance=pd.Timedelta('24h') 
            )
            print(f"  + Merged {filename}")
            
        except Exception as e:
            print(f"  - Error merging {filename}: {e}")

    # 3. [핵심 로직] 스마트 결측치 처리
    print("\nApplying Smart Fill Logic...")
    
    # 타임스탬프를 제외한 모든 컬럼에 대해 반복
    feature_cols = [c for c in df_master.columns if c != 'timestamp']
    
    for col in feature_cols:
        # 1) 첫 유효 값(First Valid Index) 찾기
        first_idx = df_master[col].first_valid_index()
        
        if first_idx is None:
            # 데이터가 아예 하나도 없으면 전체 0
            df_master[col] = 0
            print(f"  - {col}: No data (Filled 0)")
            continue
            
        # 2) 유효 구간(First Index 이후)은 ffill 적용
        # (데이터 시작 후 발생한 결측은 '누락'이므로 직전 값 유지)
        df_master.loc[first_idx:, col] = df_master.loc[first_idx:, col].ffill()
        
        # 3) 비유효 구간(First Index 이전)은 0으로 채움
        # (데이터 시작 전이므로 '존재하지 않음' = 0)
        df_master.loc[:first_idx, col] = df_master.loc[:first_idx, col].fillna(0)
        
        # 혹시 남은 NaN(맨 끝부분 등)이 있다면 0 처리 (안전장치)
        df_master[col] = df_master[col].fillna(0)

    # 4. 저장
    save_path = os.path.join(OUTPUT_DIR, FINAL_FILE)
    df_master.to_csv(save_path, index=False)

    print("=" * 60)
    print(f"Completed! Saved to {save_path}")
    print(f"Shape: {df_master.shape}")
    
    # 검증: 앞부분(0이어야 함)과 뒷부분(값 있어야 함) 확인
    sample_col = [c for c in df_master.columns if 'tvl' in c.lower()][0] # TVL 컬럼 하나 골라서 확인
    print(f"\n[Verify Logic with '{sample_col}']")
    print("--- Head (Should be 0) ---")
    print(df_master[['timestamp', sample_col]].head(3))
    print("--- Tail (Should have value) ---")
    print(df_master[['timestamp', sample_col]].tail(3))
    return df_master


df= smart_merge_and_fill()


Smart Merge: 0 for Pre-history, ffill for Missing
Master Range: 2017-09-26 17:00:00 ~ 2025-11-27 13:00:00
  + Merged usdt_eth_mcap_4h.csv
  + Merged base_tvl_4h.csv
  + Merged lido_eth_tvl_4h.csv
  + Merged eth_chain_tvl_4h.csv
  + Merged zksync era_tvl_4h.csv
  + Merged aave_eth_tvl_4h.csv
  + Merged DXY_4h.csv
  + Merged VIX_4h.csv
  + Merged curve-dex_eth_tvl_4h.csv
  + Merged makerdao_eth_tvl_4h.csv
  + Merged fear_greed_4h.csv
  + Merged SP500_4h.csv
  + Merged binance_4h_kst.csv
  + Merged arbitrum_tvl_4h.csv
  + Merged uniswap_eth_tvl_4h.csv
  + Merged optimism_tvl_4h.csv
  + Merged eth_funding_rate_4h.csv
  + Merged GOLD_4h.csv

Applying Smart Fill Logic...
Completed! Saved to ./macro_data_4h/final_dataset_4h.csv
Shape: (17901, 102)

[Verify Logic with 'base_tvl']
--- Head (Should be 0) ---
            timestamp  base_tvl
0 2017-09-26 17:00:00       0.0
1 2017-09-26 21:00:00       0.0
2 2017-09-27 01:00:00       0.0
--- Tail (Should have value) ---
                timestamp    

In [35]:
df.to_csv("eth_4hour.csv",index=False)
