In [None]:
import os
import json
import pandas as pd
import numpy as np
import joblib
from sklearn.preprocessing import StandardScaler
import yfinance as yf # Yahoo Finance
from datetime import datetime

In [None]:
# --- Config ---
# Download settings
TICKER = "BTC-USD"  # Yahoo Finance symbol for the Bitcoin / US dollar pair
INTERVAL = "1d"  # daily granularity
START_DATE = "2015-01-01"  # first date requested from Yahoo Finance
END_DATE = datetime.today().strftime("%Y-%m-%d")  # evaluated when this cell runs

# Directory layout (each path is defined once and reused below)
RAW_DIR = "data/raw"
PROCESSED_DIR = "data/processed"
FEATURES_DIR = "data/features"
DATA_DIR = RAW_DIR  # directory the download step writes to
PROJECT_DIRS = [
    RAW_DIR,
    PROCESSED_DIR,
    FEATURES_DIR,
    "experiments",
    "models",
]

# File locations
MANIFEST_FILE = os.path.join(DATA_DIR, "manifest.json")  # JSON record describing each download
# NOTE: despite its name, PROCESSED_FILE lives in FEATURES_DIR; it is the
# base-feature file that the lag/scale step reads.
PROCESSED_FILE = os.path.join(FEATURES_DIR, "BTC-USD_daily_features.parquet")
OUTPUT_FILE = os.path.join(FEATURES_DIR, "BTC-USD_daily_ml_ready.parquet")
SCALER_FILE = os.path.join(FEATURES_DIR, "scaler.pkl")

In [None]:
def ensure_dirs():
    """Create the raw-data directory ``DATA_DIR``; do nothing if it already exists."""
    os.makedirs(name=DATA_DIR, exist_ok=True)

def load_manifest():
    """Load the download manifest.

    Returns:
        The parsed JSON content of ``MANIFEST_FILE`` when that file exists,
        otherwise an empty dict.
    """
    if not os.path.exists(MANIFEST_FILE):
        return {}

    with open(MANIFEST_FILE, "r") as manifest_fh:
        contents = json.load(manifest_fh)
    return contents

def save_manifest(manifest):
    """Write ``manifest`` to ``MANIFEST_FILE`` as JSON indented by two spaces.

    The file is opened in write mode, so any previous content is replaced.
    """
    with open(MANIFEST_FILE, mode="w") as manifest_fh:
        json.dump(manifest, fp=manifest_fh, indent=2)

def fetch_btc_data():
    """Download price history for ``TICKER`` from Yahoo Finance.

    Uses the module-level ``START_DATE``, ``END_DATE`` and ``INTERVAL``
    settings.

    Returns:
        pandas.DataFrame: the downloaded frame with its index moved into a
        regular column and with single-level column names.
    """
    print(f"Fetching {TICKER} data from {START_DATE} to {END_DATE}...")
    df = yf.download(TICKER, start=START_DATE, end=END_DATE, interval=INTERVAL)

    # yfinance can return two-level (field, ticker) columns even for a single
    # ticker. Written to CSV, that layout produces a second header row which is
    # read back as data and turns every price column into text. Keep only the
    # field level so the raw file has one header row and numeric columns.
    if isinstance(df.columns, pd.MultiIndex):
        df.columns = df.columns.get_level_values(0).rename(None)

    # Move the date index into a column (returns a new frame instead of
    # mutating in place).
    df = df.reset_index()
    return df

def save_data(df):
    """Write ``df`` to ``DATA_DIR`` as a gzip-compressed CSV and return its path.

    The file name is built from ``TICKER``, ``INTERVAL``, ``START_DATE`` and
    ``END_DATE``; the frame's index is not written.
    """
    # ".csv.gz" = a CSV file that has been gzip-compressed
    fpath = os.path.join(
        DATA_DIR,
        f"{TICKER}_{INTERVAL}_{START_DATE}_{END_DATE}.csv.gz",
    )
    df.to_csv(fpath, compression="gzip", index=False)
    print(f"Saved data to {fpath}")
    return fpath

def setup_directories():
    """Create every directory listed in ``PROJECT_DIRS``; existing ones are left as they are."""
    for project_dir in PROJECT_DIRS:
        os.makedirs(project_dir, exist_ok=True)

def scrape_data():
    """Download the data, save it as a raw file and record it in the manifest.

    Steps: create the project directories, load the existing manifest, fetch
    the data, write the compressed CSV, then add/replace the manifest entry
    keyed by ``"{TICKER}_{INTERVAL}"`` and save the manifest.

    Raises:
        ValueError: if the download returned no rows. Nothing is written in
            that case, so an existing raw file for the same date range and the
            manifest are left untouched.
    """
    ensure_dirs()
    setup_directories()
    manifest = load_manifest()
    df = fetch_btc_data()

    # Validate before saving: the raw file name depends only on the configured
    # dates, so saving an empty download would overwrite a good file from an
    # earlier run on the same day.
    if df is None or df.empty:
        raise ValueError(
            f"No data returned for {TICKER} ({START_DATE} to {END_DATE}); "
            "raw file and manifest left unchanged"
        )

    fpath = save_data(df)

    # With multi-level columns, selecting "Date" may yield a one-column frame
    # rather than a Series; reduce it so .iloc[...] returns a single timestamp.
    dates = df["Date"]
    if isinstance(dates, pd.DataFrame):
        dates = dates.iloc[:, 0]

    # Update manifest
    manifest_entry = {
        "source": "yfinance",
        "ticker": TICKER,
        "interval": INTERVAL,
        "rows": len(df),
        "start": str(dates.iloc[0]),
        "end": str(dates.iloc[-1]),
        "file": fpath,
        "downloaded_at": datetime.now().isoformat()
    }
    manifest[f"{TICKER}_{INTERVAL}"] = manifest_entry
    save_manifest(manifest)

    print("Manifest updated:", MANIFEST_FILE)

# Run the download step: fetch the data, save the raw file and update the manifest.
scrape_data()

  df = yf.download(TICKER, start=START_DATE, end=END_DATE, interval=INTERVAL)
[*********************100%***********************]  1 of 1 completed

Fetching BTC-USD data from 2015-01-01 to 2025-09-21...
Saved data to data/raw\BTC-USD_1d_2015-01-01_2025-09-21.csv.gz
Manifest updated: data/raw\manifest.json





In [21]:
def clean_btc_data(raw_file):
    """Load a raw CSV and return a gap-free daily OHLCV frame.

    Args:
        raw_file: path to a CSV (optionally compressed) with a ``Date`` column
            and ``Open``/``High``/``Low``/``Close``/``Volume`` columns.

    Returns:
        pandas.DataFrame: one row per UTC calendar day between the first and
        last valid date, with columns ``timestamp``, ``open``, ``high``,
        ``low``, ``close``, ``volume`` (plus any other columns present in the
        file). Missing days carry the previous day's prices forward and get a
        volume of 0.

    Raises:
        ValueError: if the file contains no row with a parseable date.
    """
    # Load raw data
    df = pd.read_csv(raw_file, parse_dates=["Date"])

    # Standardize column names
    df = df.rename(columns={
        "Date": "timestamp",
        "Open": "open",
        "High": "high",
        "Low": "low",
        "Close": "close",
        "Adj Close": "adj_close",
        "Volume": "volume"
    })

    # Drop Adj Close (redundant for crypto)
    df = df.drop(columns=["adj_close"], errors="ignore")

    # Coerce OHLCV to numbers. A stray non-numeric row (e.g. a second header
    # line holding the ticker name) otherwise makes every column text, which
    # would then be written to parquet as strings.
    price_cols = ["open", "high", "low", "close"]
    for col in price_cols + ["volume"]:
        df[col] = pd.to_numeric(df[col], errors="coerce")

    # Parse timestamps as UTC and discard rows without a usable date.
    df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True, errors="coerce")
    df = df.dropna(subset=["timestamp"])
    if df.empty:
        raise ValueError(f"No rows with a valid date found in {raw_file}")

    # Sort and de-duplicate BEFORE reindexing: reindex raises on duplicate
    # index labels, so removing duplicates afterwards is too late.
    df = (
        df.sort_values("timestamp")
        .drop_duplicates(subset=["timestamp"])
        .set_index("timestamp")
    )

    # Align to daily frequency (UTC); days missing from the file become NaN rows.
    full_range = pd.date_range(
        start=df.index.min(),
        end=df.index.max(),
        freq="1D",
        tz="UTC"
    )
    df = df.reindex(full_range)

    # Fill missing values: carry prices forward, treat missing volume as none traded.
    df[price_cols] = df[price_cols].ffill()
    df["volume"] = df["volume"].fillna(0)

    # Move the date index back into a "timestamp" column.
    df = df.rename_axis("timestamp").reset_index()

    return df

def clean_data():
    """Clean the most recent raw download and save it as parquet.

    Picks the latest ``{TICKER}_{INTERVAL}_*.csv.gz`` file in ``RAW_DIR``,
    runs ``clean_btc_data`` on it and writes the result to
    ``PROCESSED_DIR/BTC-USD_daily_clean.parquet``.

    Raises:
        FileNotFoundError: if ``RAW_DIR`` holds no matching raw file.
    """
    os.makedirs(PROCESSED_DIR, exist_ok=True)

    # Find latest raw file. Only files for the configured ticker/interval are
    # considered, so a download of another symbol stored in the same folder is
    # never cleaned and saved under the BTC-USD name. The rest of the file name
    # is "<start>_<end>" in YYYY-MM-DD form, so name order is date order.
    prefix = f"{TICKER}_{INTERVAL}_"
    raw_files = sorted(
        f for f in os.listdir(RAW_DIR)
        if f.startswith(prefix) and f.endswith(".csv.gz")
    )
    if not raw_files:
        raise FileNotFoundError("No raw data files found in data/raw/")
    raw_file = os.path.join(RAW_DIR, raw_files[-1])

    # Clean
    cleaned_df = clean_btc_data(raw_file)

    # Save
    processed_file = os.path.join(PROCESSED_DIR, "BTC-USD_daily_clean.parquet")
    cleaned_df.to_parquet(processed_file, index=False)
    print(f"Saved cleaned data to {processed_file}")
    print(f"Rows: {len(cleaned_df)}, Date range: {cleaned_df['timestamp'].min()} → {cleaned_df['timestamp'].max()}")

# Run the cleaning step: reads the latest raw file and writes the cleaned parquet file.
clean_data()

Saved cleaned data to data/processed\BTC-USD_daily_clean.parquet
Rows: 3916, Date range: 2015-01-01 00:00:00+00:00 → 2025-09-20 00:00:00+00:00


In [None]:
def add_indicators(df):
    """Compute technical indicators and return/volatility features.

    The new columns are added to ``df`` itself and the same frame is returned.
    Rolling-window columns are NaN until their window is full.

    Args:
        df: frame with numeric ``close`` and ``volume`` columns, ordered by time.

    Returns:
        pandas.DataFrame: ``df`` with these columns added:
        ``sma_{5,10,21,50,200}``, ``ema_{5,10,21,50,200}``, ``log_return``,
        ``pct_change``, ``return_{3,7,14,30}d``, ``volatility_{5,10,21}d``,
        ``rsi_14``, ``bollinger_mid``/``bollinger_up``/``bollinger_down``,
        ``macd``, ``macd_signal``, ``obv`` and ``vwap``.
    """

    # --- Moving Averages ---
    for win in [5, 10, 21, 50, 200]:
        df[f"sma_{win}"] = df["close"].rolling(win).mean()
        df[f"ema_{win}"] = df["close"].ewm(span=win, adjust=False).mean()

    # --- Momentum / Returns ---
    df["log_return"] = np.log(df["close"] / df["close"].shift(1))
    df["pct_change"] = df["close"].pct_change()

    # Multi-horizon returns
    for h in [3, 7, 14, 30]:
        df[f"return_{h}d"] = df["close"].pct_change(h)

    # --- Volatility ---
    for win in [5, 10, 21]:
        df[f"volatility_{win}d"] = df["log_return"].rolling(win).std()

    # --- RSI (14-day, simple-moving-average variant) ---
    delta = df["close"].diff()
    # Build gain/loss with Series.where so they keep df's index. Wrapping a
    # NumPy array in a fresh Series would give it a 0..n-1 index, and the
    # assignment to df["rsi_14"] would then misalign (all NaN) for any frame
    # that is not indexed 0..n-1. The first row's NaN delta counts as 0.
    gain = delta.where(delta > 0, 0.0)
    loss = (-delta).where(delta < 0, 0.0)
    roll_up = gain.rolling(14).mean()
    roll_down = loss.rolling(14).mean()
    rs = roll_up / (roll_down + 1e-9)  # avoid div/0
    df["rsi_14"] = 100.0 - (100.0 / (1.0 + rs))

    # --- Bollinger Bands (20, 2) ---
    rolling_mean = df["close"].rolling(20).mean()
    rolling_std = df["close"].rolling(20).std()
    df["bollinger_mid"] = rolling_mean
    df["bollinger_up"] = rolling_mean + (rolling_std * 2)
    df["bollinger_down"] = rolling_mean - (rolling_std * 2)

    # --- MACD (12,26,9) ---
    ema12 = df["close"].ewm(span=12, adjust=False).mean()
    ema26 = df["close"].ewm(span=26, adjust=False).mean()
    df["macd"] = ema12 - ema26
    df["macd_signal"] = df["macd"].ewm(span=9, adjust=False).mean()

    # --- On-Balance Volume (OBV) ---
    # Volume is added on up days and subtracted on down days, then accumulated.
    df["obv"] = (np.sign(df["close"].diff()) * df["volume"]).fillna(0).cumsum()

    # --- VWAP (Volume Weighted Average Price) ---
    # Cumulative from the first row of df; the small constant avoids 0/0 while
    # cumulative volume is still zero.
    df["vwap"] = (df["close"] * df["volume"]).cumsum() / (df["volume"].cumsum() + 1e-9)

    return df

def build_features():
    """Build the base feature table from the cleaned data and save it.

    Reads ``PROCESSED_DIR/BTC-USD_daily_clean.parquet``, converts the OHLCV
    columns to numbers, adds the indicator columns, drops every row that
    contains a NaN and writes the result to
    ``FEATURES_DIR/BTC-USD_daily_features.parquet``.

    Raises:
        FileNotFoundError: if the cleaned parquet file does not exist.
    """
    os.makedirs(FEATURES_DIR, exist_ok=True)

    # Load cleaned data (parquet is column-oriented, unlike row-oriented CSV)
    processed_file = os.path.join(PROCESSED_DIR, "BTC-USD_daily_clean.parquet")
    if not os.path.exists(processed_file):
        raise FileNotFoundError("Run clean_data first to generate processed data")
    cleaned = pd.read_parquet(processed_file)

    # Make sure the price/volume columns are numeric; unparseable values become NaN
    for column in ["close", "volume", "open", "high", "low"]:
        cleaned[column] = pd.to_numeric(cleaned[column], errors='coerce')

    # Add features, then drop the rows left incomplete by the rolling windows
    with_indicators = add_indicators(cleaned)
    df = with_indicators.dropna().reset_index(drop=True)

    # Save
    features_file = os.path.join(FEATURES_DIR, "BTC-USD_daily_features.parquet")
    df.to_parquet(features_file, index=False)
    print(f"Saved features to {features_file}")
    print(f"Final shape: {df.shape}, Columns: {len(df.columns)}")

# Run the feature-building step: reads the cleaned parquet file and writes the base feature file.
build_features()

Saved features to data/features\BTC-USD_daily_features.parquet
Final shape: (3717, 33), Columns: 33


In [None]:
def add_lagged_features(df, cols, lags=(1, 2, 3, 5, 7)):
    """Add lagged versions of selected columns.

    For every column in ``cols`` and every lag ``k`` in ``lags`` a column
    ``"{col}_lag{k}"`` holding ``df[col].shift(k)`` is added. The columns are
    added to ``df`` itself and the same frame is returned; the first ``k`` rows
    of each lag column are NaN.

    Args:
        df: frame containing the columns named in ``cols``.
        cols: names of the columns to lag.
        lags: row offsets to shift by (any iterable of ints). The default is an
            immutable tuple so it cannot be altered between calls.

    Returns:
        pandas.DataFrame: ``df`` with the lag columns added.
    """
    for col in cols:
        for lag in lags:
            df[f"{col}_lag{lag}"] = df[col].shift(lag)
    return df

def scale_features(df, exclude_cols):
    """Standardize the numeric columns of ``df`` except the excluded ones.

    Only numeric/boolean columns are scaled, so a date or text column that was
    not listed in ``exclude_cols`` is passed through unchanged instead of
    making the scaler fail. ``df`` itself is not modified.

    NOTE(review): the scaler is fit on every row passed in. When the result is
    later split into train/test sets, fit on the training rows only to avoid
    leaking test-period statistics into the features.

    Args:
        df: input frame.
        exclude_cols: column names that must not be scaled.

    Returns:
        tuple: ``(df_scaled, scaler, cols_to_scale)`` - a scaled copy of
        ``df``, the fitted ``StandardScaler`` and the list of scaled column
        names in frame order.
    """
    scaler = StandardScaler()
    scalable = df.select_dtypes(include=["number", "bool"]).columns
    cols_to_scale = [c for c in scalable if c not in exclude_cols]

    df_scaled = df.copy()
    df_scaled[cols_to_scale] = scaler.fit_transform(df_scaled[cols_to_scale])

    return df_scaled, scaler, cols_to_scale

def features_engineering():
    """Turn the base feature file into a lagged, scaled, ML-ready table.

    Reads ``PROCESSED_FILE``, adds lagged copies of selected columns, drops
    rows containing NaN, scales every column except ``timestamp`` and saves
    the table to ``OUTPUT_FILE`` and the scaler (with the scaled column names)
    to ``SCALER_FILE``.

    Raises:
        FileNotFoundError: if ``PROCESSED_FILE`` does not exist.
    """
    if not os.path.exists(PROCESSED_FILE):
        raise FileNotFoundError("Run build_features.py first to generate base features")

    base_features = pd.read_parquet(PROCESSED_FILE)

    # --- Step 1: Lagged features ---
    lagged = add_lagged_features(
        base_features,
        ["close", "volume", "sma_10", "ema_10", "rsi_14", "macd"],
    )
    # The first rows have no history to lag from, so they contain NaN
    complete_rows = lagged.dropna().reset_index(drop=True)

    # --- Step 2: Scaling (the timestamp column is left as it is) ---
    df_scaled, scaler, scaled_cols = scale_features(complete_rows, ["timestamp"])

    # --- Save outputs ---
    os.makedirs(FEATURES_DIR, exist_ok=True)
    df_scaled.to_parquet(OUTPUT_FILE, index=False)
    scaler_bundle = {"scaler": scaler, "scaled_cols": scaled_cols}
    joblib.dump(scaler_bundle, SCALER_FILE)

    print(f"Saved ML-ready features to {OUTPUT_FILE}")
    print(f"Saved scaler to {SCALER_FILE}")
    print(f"Shape: {df_scaled.shape}, Features: {len(df_scaled.columns)}")

# Run the final step: reads the base feature file and writes the ML-ready table and the scaler.
features_engineering()

Saved ML-ready features to data/features\BTC-USD_daily_ml_ready.parquet
Saved scaler to data/features\scaler.pkl
Shape: (3710, 63), Features: 63
