# Colab 1/3 — Daten-Aufbereitung

**Rolle in der 3-Saeulen-Architektur:** KI-Labor (Daten-Vorbereitung)

**Dieses Notebook tut NUR:**
- BTC/USDT OHLCV-Daten von KuCoin, Bybit oder OKX laden (Fallback: Yahoo Finance)
- Features berechnen (FeatureEngine)
- Scaler fitten und speichern
- Alles auf Google Drive sichern fuer Notebook 2 und 3

**Keine GPU noetig! Wähle bei Runtime: `None` (CPU)**

---
**Warum 3 getrennte Notebooks?**
Ein einzelnes Notebook, das alles macht (Daten + Evolution + PPO), verbraucht nach
~1h den gesamten RAM (12 GB). Die Daten bleiben im RAM, waehrend das Training laeuft.
Mit 3 Notebooks startet jede Aufgabe in einer frischen Session - ohne RAM-Altlasten aus vorherigen Schritten.
---

## Schritt 1: Repo klonen & Dependencies installieren

In [None]:
import os, shutil

PROJECT_DIR = '/content/BITCOIN4Traders'
REPO_URL    = 'https://github.com/juancarlosrial76-code/BITCOIN4Traders.git'

if os.path.exists(PROJECT_DIR) and not os.path.exists(f'{PROJECT_DIR}/.git'):
    shutil.rmtree(PROJECT_DIR)

if not os.path.exists(PROJECT_DIR):
    !git clone {REPO_URL} {PROJECT_DIR} --quiet
    print('Repo geklont.')
else:
    !git -C {PROJECT_DIR} pull --quiet
    print('Repo aktualisiert.')

os.chdir(PROJECT_DIR)
print(f'Verzeichnis: {os.getcwd()}')

In [None]:
%%time
# Data-only dependencies - no torch, no gymnasium.
# Faster and lighter on RAM than a full install.
!pip install -q ccxt loguru pyarrow pandas numpy ta yfinance numba joblib pyyaml scikit-learn python-dotenv tqdm
print('Dependencies installiert.')

## Schritt 2: Google Drive mounten

In [None]:
import os
from google.colab import drive

# Attach Google Drive below /content/drive.
drive.mount('/content/drive')

DRIVE_DIR   = '/content/drive/MyDrive/BITCOIN4Traders'
DRIVE_DATA  = f'{DRIVE_DIR}/data'
DRIVE_PROC  = f'{DRIVE_DIR}/processed'

# Create the project folder and both sub-folders; folders that already
# exist are left untouched.
for folder in (DRIVE_DIR, DRIVE_DATA, DRIVE_PROC):
    os.makedirs(folder, exist_ok=True)

print(f'Drive bereit: {DRIVE_DIR}')

## Schritt 3: Konfiguration

In [None]:
from datetime import datetime, timedelta, timezone

# ===== SETTINGS =====
SYMBOL      = 'BTC/USDT'
TIMEFRAME   = '1h'
END_DATE    = None            # None = up to today

# Data type: float32 needs half the RAM of float64
DTYPE       = 'float32'

# Upper bound on the number of candles kept in RAM (Colab: 12 GB limit)
MAX_CANDLES = 17_000  # ~2 years of 1h bars = 17,520 bars

# Yahoo Finance serves 1h data for at most 729 days back, so the start date
# is derived from the current UTC date - no manual adjustment needed.
# datetime.now(timezone.utc) replaces datetime.utcnow(), which is deprecated
# since Python 3.12; the formatted date is the same.
YF_MAX_DAYS = 729
YF_START    = (datetime.now(timezone.utc) - timedelta(days=YF_MAX_DAYS)).strftime('%Y-%m-%d')

# Start date used for the CCXT exchanges, which (per the original author's
# note) offer more history than Yahoo Finance; the note also says Binance
# answers Colab IPs with HTTP 451 and KuCoin is the most reliable fallback.
# The loading cell trims the result to the newest MAX_CANDLES bars anyway.
CCXT_START  = '2022-01-01'

print(f'Symbol:          {SYMBOL}')
print(f'CCXT Start:      {CCXT_START}')
print(f'YF Start (auto): {YF_START}  (max {YF_MAX_DAYS} Tage fuer 1h-Daten)')
print(f'Max Bars:        {MAX_CANDLES:,}')

## Schritt 4: Daten laden (KuCoin → Bybit → OKX, Fallback: Yahoo Finance)

In [None]:
import sys, gc, time
import pandas as pd
import numpy as np
from pathlib import Path
from loguru import logger

# Make the cloned repository and its src/ folder importable. Existing entries
# are removed before re-inserting, so re-running the cell does not pile up
# duplicates while the lookup order (src/ first, then the repo root) is kept.
for _path in ('/content/BITCOIN4Traders', '/content/BITCOIN4Traders/src'):
    while _path in sys.path:
        sys.path.remove(_path)
    sys.path.insert(0, _path)

# Raw OHLCV cache file inside the Drive data folder.
CACHE_FILE = Path(DRIVE_DATA) / 'BTC_USDT_1h_raw.parquet'

# ─────────────────────────────────────────────────────────────────────
# Hilfsfunktion: CCXT OHLCV → sauberer DataFrame
# ─────────────────────────────────────────────────────────────────────
def ccxt_to_df(ohlcv_list, dtype='float32'):
    """Convert raw CCXT OHLCV rows into a clean, time-indexed DataFrame.

    Args:
        ohlcv_list: Sequence of ``[timestamp_ms, open, high, low, close,
            volume]`` rows; the timestamp is interpreted as milliseconds.
        dtype: dtype the resulting columns are cast to.

    Returns:
        DataFrame indexed by ``timestamp``, sorted ascending, with exactly
        one row per timestamp and the columns open/high/low/close/volume.
    """
    columns = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
    df = pd.DataFrame(ohlcv_list, columns=columns)
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
    # A stable sort keeps rows that share a timestamp in their input order,
    # so keep='last' below reliably retains the most recently fetched row.
    df = df.set_index('timestamp').sort_index(kind='mergesort')
    df = df[~df.index.duplicated(keep='last')]
    return df.astype(dtype)

def fetch_ccxt(exchange_id, symbol, timeframe, start_date, dtype='float32'):
    """Download OHLCV candles from a CCXT exchange, paging forward in time.

    Args:
        exchange_id: Name of the exchange class inside the ``ccxt`` module.
        symbol: Market symbol passed to ``fetch_ohlcv``.
        timeframe: Candle timeframe passed to ``fetch_ohlcv``.
        start_date: First day to load, formatted ``YYYY-MM-DD``.
        dtype: dtype of the resulting DataFrame columns.

    Returns:
        DataFrame produced by ``ccxt_to_df`` from all downloaded candles.

    Raises:
        ValueError: If the exchange delivered no candle at all.
        Exception: Errors raised by ccxt are not caught here.
    """
    import ccxt
    ex = getattr(ccxt, exchange_id)({'enableRateLimit': True})
    since_ms = ex.parse8601(f'{start_date}T00:00:00Z')
    # Candle duration in milliseconds, used to detect "caught up with now".
    tf_ms = ex.parse_timeframe(timeframe) * 1000
    all_ohlcv, limit = [], 1000
    logger.info(f'Lade von {exchange_id} ({symbol} {timeframe} ab {start_date})...')
    while True:
        batch = ex.fetch_ohlcv(symbol, timeframe, since=since_ms, limit=limit)
        if not batch:
            break
        all_ohlcv.extend(batch)
        last_ts = batch[-1][0]
        # No forward progress: the exchange keeps returning old candles, so
        # stop instead of looping forever (duplicates are dropped later).
        if last_ts < since_ms:
            break
        since_ms = last_ts + 1
        # Stop once the newest candle reaches the present. A short page is
        # NOT a reliable end marker: exchanges may cap the page size below
        # the requested limit, which would cut the history after one page.
        if last_ts + tf_ms > ex.milliseconds():
            break
        time.sleep(0.25)
    if not all_ohlcv:
        raise ValueError(f'Keine Daten von {exchange_id}')
    df = ccxt_to_df(all_ohlcv, dtype)
    logger.success(f'{exchange_id}: {len(df):,} Bars geladen')
    return df

def fetch_yfinance(symbol_yf, start_date, dtype='float32'):
    """Download 1h OHLCV bars from Yahoo Finance as a clean DataFrame.

    Args:
        symbol_yf: Ticker passed to ``yf.download``.
        start_date: Start date passed to ``yf.download``.
        dtype: dtype the resulting columns are cast to.

    Returns:
        DataFrame with the columns open/high/low/close/volume and a
        timezone-naive index named ``timestamp``.

    Raises:
        ValueError: If Yahoo Finance delivered no rows.
        KeyError: If one of the expected OHLCV columns is missing.
    """
    import yfinance as yf
    logger.info(f'Lade {symbol_yf} von Yahoo Finance (ab {start_date})...')
    df = yf.download(symbol_yf, start=start_date, interval='1h',
                     progress=False, auto_adjust=True)
    # Treat a missing result the same way as an empty one.
    if df is None or df.empty:
        raise ValueError(f'Keine Daten von Yahoo Finance fuer {symbol_yf}')
    # yfinance may return MultiIndex columns such as ('Close', 'BTC-USD');
    # calling .lower() on those tuples would fail, so reduce the columns to
    # their first level before lower-casing the names.
    if isinstance(df.columns, pd.MultiIndex):
        df.columns = df.columns.get_level_values(0)
    df.columns = [str(c).lower() for c in df.columns]
    df = df[['open', 'high', 'low', 'close', 'volume']]
    # Convert a tz-aware index to UTC and drop the timezone, so the
    # timestamps line up with the UTC-based frames built by ccxt_to_df.
    # (tz_localize(None) would keep the local wall-clock time instead.)
    if hasattr(df.index, 'tz') and df.index.tz is not None:
        df.index = df.index.tz_convert(None)
    df.index.name = 'timestamp'
    df = df.astype(dtype)
    logger.success(f'Yahoo Finance: {len(df):,} Bars geladen')
    return df

# ─────────────────────────────────────────────────────────────────────
# Load data: cache → KuCoin → Bybit → OKX → Yahoo Finance
# ─────────────────────────────────────────────────────────────────────
if CACHE_FILE.exists():
    logger.info('Lade gecachte Daten von Drive...')
    price_data = pd.read_parquet(CACHE_FILE)
    logger.success(f'Cache: {len(price_data):,} Bars')

else:
    price_data = None

    # Attempts 1-3: CCXT exchanges, tried in this fixed order. The first one
    # that delivers data wins; the remaining ones are skipped.
    for exchange_id, label in (('kucoin', 'KuCoin'), ('bybit', 'Bybit'), ('okx', 'OKX')):
        try:
            price_data = fetch_ccxt(exchange_id, 'BTC/USDT', TIMEFRAME, CCXT_START, DTYPE)
            break
        except Exception as e:
            # Deliberately best effort: log and fall through to the next source.
            logger.warning(f'{label} fehlgeschlagen: {e}')

    # Attempt 4: Yahoo Finance. YF_START is already limited to the 729-day
    # window computed in the configuration cell.
    if price_data is None:
        try:
            price_data = fetch_yfinance('BTC-USD', YF_START, DTYPE)
        except Exception as e:
            logger.error(f'Yahoo Finance fehlgeschlagen: {e}')

    if price_data is None:
        raise RuntimeError(
            'Alle Datenquellen fehlgeschlagen!\n'
            'KuCoin / Bybit / OKX / Yahoo Finance konnten keine Daten liefern.\n'
            'Bitte Runtime neu starten und erneut versuchen.'
        )

    # Keep only the newest MAX_CANDLES bars before caching.
    if len(price_data) > MAX_CANDLES:
        price_data = price_data.iloc[-MAX_CANDLES:]

    # Cache the raw download on Drive.
    logger.info(f'Speichere auf Drive: {CACHE_FILE}')
    price_data.to_parquet(CACHE_FILE, engine='pyarrow', compression='snappy')

# Drop every row that contains a NaN or a zero value.
price_data = price_data.replace(0, np.nan).dropna()

# Fail with a clear message instead of an IndexError in the report below.
if price_data.empty:
    raise RuntimeError(
        'Keine gueltigen Bars nach der Bereinigung (NaN/0 entfernt).\n'
        f'Bitte Cache-Datei loeschen und erneut laden: {CACHE_FILE}'
    )

# Apply the MAX_CANDLES limit to data that came from the cache as well.
if len(price_data) > MAX_CANDLES:
    price_data = price_data.iloc[-MAX_CANDLES:]

# RAM check. dtypes is indexed by column name, so the first entry is read
# positionally with .iloc (integer [] lookup on such a Series is deprecated).
mem_mb = price_data.memory_usage(deep=True).sum() / 1024**2
print(f'\nDaten: {len(price_data):,} Bars | RAM: {mem_mb:.1f} MB | dtype: {price_data.dtypes.iloc[0]}')
print(f'Zeitraum: {price_data.index[0]} bis {price_data.index[-1]}')
print(price_data.tail(3))

## Schritt 5: Features berechnen & Scaler speichern

In [None]:
import gc
from features.feature_engine import FeatureEngine, FeatureConfig

PROC_DIR = Path(DRIVE_PROC)

# ── Positional split: first 70% train, next 15% val, last 15% test ──
n = len(price_data)
train_end, val_end = int(n * 0.70), int(n * 0.85)

train_raw, val_raw, test_raw = (
    price_data.iloc[:train_end],
    price_data.iloc[train_end:val_end],
    price_data.iloc[val_end:],
)

logger.info(f'Split: Train={len(train_raw):,} | Val={len(val_raw):,} | Test={len(test_raw):,}')

# ── Feature engineering ─────────────────────────────────────────────
feat_cfg = FeatureConfig(
    volatility_window=20,
    ou_window=20,
    rolling_mean_window=20,
    use_log_returns=True,
    scaler_type='standard',
    save_scaler=True,
    scaler_path=PROC_DIR,         # scaler location: the processed folder on Drive
    dropna_strategy='rolling',
    min_valid_rows=500,
)
engine = FeatureEngine(feat_cfg)

# Only the training split is used for fitting; validation and test are
# merely transformed with the already fitted engine.
logger.info('Fit FeatureEngine auf Trainingsdaten (KEIN Leakage)...')
train_feat = engine.fit_transform(train_raw)
val_feat, test_feat = engine.transform(val_raw), engine.transform(test_raw)

# Keep only timestamps present in both the raw prices and the features.
idx_train, idx_val, idx_test = (
    raw.index.intersection(feat.index)
    for raw, feat in (
        (train_raw, train_feat),
        (val_raw, val_feat),
        (test_raw, test_feat),
    )
)

logger.success(f'Features: {train_feat.shape[1]} Spalten | Train-Samples: {len(idx_train):,}')

# ── Save to Drive (float32, snappy-compressed parquet) ──────────────
def save_split(price, feat, idx, name):
    """Write the rows `idx` of one split's prices and features to PROC_DIR.

    Both frames are cast to float32 and stored as `<name>_price.parquet`
    and `<name>_feat.parquet`; the combined file size is logged afterwards.
    """
    outputs = (
        (price, PROC_DIR / f'{name}_price.parquet'),
        (feat, PROC_DIR / f'{name}_feat.parquet'),
    )
    for frame, target in outputs:
        frame.loc[idx].astype('float32').to_parquet(target, compression='snappy')
    total_bytes = sum(target.stat().st_size for _, target in outputs)
    size_mb = total_bytes / 1024**2
    logger.success(f'Gespeichert: {name} ({size_mb:.1f} MB)')

save_split(train_raw, train_feat, idx_train, 'train')
save_split(val_raw,   val_feat,   idx_val,   'val')
save_split(test_raw,  test_feat,  idx_test,  'test')

# ── Free RAM ────────────────────────────────────────────────────────
del price_data, train_raw, val_raw, test_raw, train_feat, val_feat, test_feat
gc.collect()

print('\nDaten-Aufbereitung abgeschlossen!')
print(f'Dateien auf Drive: {PROC_DIR}')
print('Weiter mit: Colab_2_Evolution.ipynb oder Colab_3_PPO_Training.ipynb')

## Schritt 6: Ergebnis pruefen

Zeigt alle gespeicherten Dateien auf Drive.

In [None]:
import os
from pathlib import Path

# List every entry of the processed folder in name order with its size in MB
# and accumulate the overall total.
print('=== Gespeicherte Dateien auf Drive ===')
total = 0
for entry in sorted(Path(DRIVE_PROC).iterdir()):
    size_mb = entry.stat().st_size / 1024**2
    total = total + size_mb
    print(f'  {entry.name:35s}  {size_mb:.1f} MB')

print(f'\nGesamt: {total:.1f} MB')
print('\nNotebook 1 fertig. Starte jetzt Notebook 2 oder 3.')