In [None]:
import pandas as pd
import numpy as np
import yfinance as yf
from datetime import datetime, timedelta
import requests
from typing import Dict, List, Tuple, Optional
import warnings
warnings.filterwarnings('ignore')

## 1. Data Collection

In [None]:
class DataCollector:
    """
    Unified data collection for crypto and traditional assets.

    Downloads prices, volumes and a risk-free rate proxy through ``yfinance``,
    cleans and aligns them, and derives returns and drawdowns.
    """

    def __init__(self, start_date: str = "2017-01-01", end_date: Optional[str] = None):
        """
        Initialize data collector

        Parameters:
        -----------
        start_date : str
            Start date in 'YYYY-MM-DD' format
        end_date : str, optional
            End date in 'YYYY-MM-DD' format (default: today)
        """
        self.start_date = start_date
        self.end_date = end_date or datetime.now().strftime('%Y-%m-%d')

        # Asset universe definition
        self.crypto_spot = ['BTC-USD', 'ETH-USD', 'BNB-USD', 'SOL-USD', 'XRP-USD']
        self.crypto_memes = ['DOGE-USD', 'SHIB-USD', 'PEPE-USD']

        self.equities = ['^GSPC', '^RUT', 'URTH']  # S&P 500, Russell 2000, MSCI World proxy
        # NOTE(review): '^SPX' looks like the S&P 500 index and '^PUT' presumably
        # an option-writing index on it -- confirm both symbols resolve on Yahoo.
        self.options = ['^SPX', '^PUT']
        self.bonds = ['AGG', 'IEF', 'TLT']
        self.real_estate = ['VNQ']
        self.commodities = ['GLD', 'DBC']
        self.volatility = ['^VIX']

        # Flat list of every ticker downloaded by fetch_all_prices / fetch_all_volumes.
        self.all_tickers = (self.equities + self.options + self.bonds + self.real_estate +
                            self.commodities + self.volatility + self.crypto_spot + self.crypto_memes)

        self.risk_free_rate_ticker = '^IRX'  # 13-week T-bill

    def fetch_yfinance_data(self, tickers: List[str],
                            data_type: str = 'Adj Close') -> pd.DataFrame:
        """
        Fetch data from Yahoo Finance

        Parameters:
        -----------
        tickers : list
            List of ticker symbols
        data_type : str
            Type of data to fetch ('Adj Close', 'Volume', etc.).
            'Adj Close' is served from the 'Close' column because the
            download is requested with ``auto_adjust=True``.

        Returns:
        --------
        pd.DataFrame
            DataFrame with tickers as columns and dates as index.
            An empty DataFrame is returned when the download or the column
            selection fails (the error is printed, not raised).
        """
        print(f"Fetching {data_type} data for {len(tickers)} tickers...")

        # With auto_adjust=True the adjusted price lives in 'Close'.
        column = 'Close' if data_type == 'Adj Close' else data_type

        try:
            data = yf.download(
                tickers,
                start=self.start_date,
                end=self.end_date,
                progress=False,
                auto_adjust=True
            )

            selected = data[column]
            if isinstance(selected, pd.Series):
                # Flat columns (single-ticker download): label the column
                # with the ticker so the output shape matches the multi-ticker case.
                result = selected.to_frame(name=tickers[0])
            else:
                # Two-level columns: selecting the field already leaves one
                # column per ticker, including for a single-ticker request.
                result = selected

            print(f"✓ Successfully fetched {len(result)} days of data")
            return result

        except Exception as e:
            print(f"✗ Error fetching data: {e}")
            return pd.DataFrame()

    def fetch_all_prices(self) -> pd.DataFrame:
        """Fetch adjusted prices for every ticker in ``self.all_tickers``"""
        prices = self.fetch_yfinance_data(self.all_tickers, 'Adj Close')
        return prices

    def fetch_all_volumes(self) -> pd.DataFrame:
        """Fetch volumes for every ticker in ``self.all_tickers``"""
        volumes = self.fetch_yfinance_data(self.all_tickers, 'Volume')
        return volumes

    def fetch_risk_free_rate(self) -> pd.Series:
        """
        Fetch risk-free rate (13-week T-bill)

        Returns:
        --------
        pd.Series
            Daily risk-free rate, obtained by treating the downloaded close
            as an annualized percentage and dividing by 100 and by 252.
        """
        print("Fetching risk-free rate data...")
        rf_data = yf.download(self.risk_free_rate_ticker,
                              start=self.start_date,
                              end=self.end_date,
                              progress=False)

        close = rf_data['Close']
        if isinstance(close, pd.DataFrame):
            # Two-level columns yield a one-column frame here; collapse it so
            # the documented Series return type holds.
            close = close.squeeze(axis=1)

        # Convert annualized yield to daily
        rf_rate = close / 100 / 252
        return rf_rate

    def clean_and_align_data(self, prices: pd.DataFrame,
                             volumes: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Clean and align price and volume data

        Parameters:
        -----------
        prices : pd.DataFrame
            Raw price data
        volumes : pd.DataFrame
            Raw volume data

        Returns:
        --------
        tuple
            (cleaned_prices, cleaned_volumes), sharing the same columns and
            the same index. Volume rows may still contain NaN; only prices
            are required to be complete.
        """
        print("\nCleaning and aligning data...")

        # Forward fill missing values (max 5 consecutive rows).
        # DataFrame.ffill replaces the deprecated fillna(method='ffill').
        prices_clean = prices.ffill(limit=5)
        volumes_clean = volumes.ffill(limit=5)

        # Keep only columns whose missing share is strictly below 20%
        missing_threshold = 0.20
        prices_clean = prices_clean.loc[:, prices_clean.isnull().mean() < missing_threshold]
        volumes_clean = volumes_clean.loc[:, volumes_clean.isnull().mean() < missing_threshold]

        # Align on common columns
        common_cols = prices_clean.columns.intersection(volumes_clean.columns)
        prices_clean = prices_clean[common_cols]
        volumes_clean = volumes_clean[common_cols]

        # Drop any remaining rows with NaN in prices, then put volumes on the
        # same dates. reindex (rather than .loc) tolerates a volume download
        # that is missing some of those dates.
        prices_clean = prices_clean.dropna()
        volumes_clean = volumes_clean.reindex(prices_clean.index)

        print(f"✓ Clean data: {len(prices_clean)} days, {len(common_cols)} assets")
        if len(prices_clean) > 0:
            # Guarded: indexing an empty index would raise IndexError.
            print(f"  Date range: {prices_clean.index[0]} to {prices_clean.index[-1]}")

        return prices_clean, volumes_clean

    def compute_returns(self, prices: pd.DataFrame,
                        method: str = 'log') -> pd.DataFrame:
        """
        Compute returns from prices

        Parameters:
        -----------
        prices : pd.DataFrame
            Price data
        method : str
            'log' for log returns; any other value yields simple returns

        Returns:
        --------
        pd.DataFrame
            Returns data with rows containing NaN removed (this always
            removes the first row, which has no predecessor).
        """
        if method == 'log':
            returns = np.log(prices / prices.shift(1))
        else:
            returns = prices.pct_change()

        return returns.dropna()

    def compute_drawdowns(self, prices: pd.DataFrame) -> pd.DataFrame:
        """
        Compute drawdowns for each asset

        Parameters:
        -----------
        prices : pd.DataFrame
            Price data

        Returns:
        --------
        pd.DataFrame
            Drawdown series for each asset: relative distance of each price
            from its running maximum (0 at a new high, negative otherwise).
        """
        # Compute running maximum
        running_max = prices.expanding().max()

        # Compute drawdown
        drawdowns = (prices - running_max) / running_max

        return drawdowns

    def get_dataset(self, include_volumes: bool = True) -> Dict:
        """
        Main method to get complete dataset

        Parameters:
        -----------
        include_volumes : bool
            Whether to include volume data

        Returns:
        --------
        dict
            Dictionary containing all processed data under the keys
            'prices', 'volumes', 'log_returns', 'simple_returns',
            'drawdowns', 'risk_free_rate' and 'metadata'.

        Raises:
        -------
        ValueError
            If no price rows remain after cleaning (for example because the
            download failed).
        """
        print("="*60)
        print("COLLECTING AND PROCESSING DATA")
        print("="*60)

        # Fetch data
        prices = self.fetch_all_prices()
        volumes = self.fetch_all_volumes() if include_volumes else None
        rf_rate = self.fetch_risk_free_rate()

        # Clean and align
        if include_volumes:
            prices_clean, volumes_clean = self.clean_and_align_data(prices, volumes)
        else:
            # NOTE(review): this path only drops incomplete rows; it does not
            # forward-fill or drop sparse columns like clean_and_align_data.
            prices_clean = prices.dropna()
            volumes_clean = None

        if prices_clean.empty:
            # Fail with an explicit message instead of an IndexError further down.
            raise ValueError(
                "No price data available after cleaning; "
                "check the tickers, the date range and network access."
            )

        # Compute derived metrics
        log_returns = self.compute_returns(prices_clean, method='log')
        simple_returns = self.compute_returns(prices_clean, method='simple')
        drawdowns = self.compute_drawdowns(prices_clean)

        # Align risk-free rate to the return dates, carrying the last known
        # value forward over dates without a quote
        rf_rate_aligned = rf_rate.reindex(log_returns.index, method='ffill')

        dataset = {
            'prices': prices_clean,
            'volumes': volumes_clean,
            'log_returns': log_returns,
            'simple_returns': simple_returns,
            'drawdowns': drawdowns,
            'risk_free_rate': rf_rate_aligned,
            'metadata': {
                'start_date': prices_clean.index[0],
                'end_date': prices_clean.index[-1],
                'n_assets': len(prices_clean.columns),
                'n_observations': len(prices_clean),
                'assets': prices_clean.columns.tolist()
            }
        }

        print("\n" + "="*60)
        print("DATA COLLECTION COMPLETE")
        print("="*60)
        print(f"Assets: {dataset['metadata']['n_assets']}")
        print(f"Observations: {dataset['metadata']['n_observations']}")
        print(f"Period: {dataset['metadata']['start_date'].date()} to {dataset['metadata']['end_date'].date()}")
        print("="*60 + "\n")

        return dataset


def save_dataset(dataset: Dict, output_dir: str = './data'):
    """
    Save dataset to disk as CSV files

    Parameters:
    -----------
    dataset : dict
        Dictionary holding pandas objects under the keys 'prices',
        'log_returns', 'simple_returns' and 'drawdowns' (required), and
        optionally 'volumes' and 'risk_free_rate'.
    output_dir : str
        Target directory; created (including parents) if it does not exist.
    """
    import os
    os.makedirs(output_dir, exist_ok=True)

    # Tables that must be present: a missing key raises KeyError.
    for key in ('prices', 'log_returns', 'simple_returns', 'drawdowns'):
        dataset[key].to_csv(os.path.join(output_dir, f'{key}.csv'))

    # Optional tables: skipped when the key is absent or its value is None.
    # The risk-free rate is written too so that a saved dataset is complete.
    for key in ('volumes', 'risk_free_rate'):
        table = dataset.get(key)
        if table is not None:
            table.to_csv(os.path.join(output_dir, f'{key}.csv'))

    print(f"✓ Dataset saved to {output_dir}/")

In [None]:
# Notebook-wide settings: root folder for everything this notebook writes,
# and the first date ('YYYY-MM-DD') of the history to download.
output_dir = "result"
start_date = "2017-01-01"

In [None]:
# 1. Collecting Data
# Build the collector from the configured start date (no end date is passed,
# so DataCollector falls back to today's date) and run the full pipeline.

collector = DataCollector(start_date = start_date)
data = collector.get_dataset()

# Persist the processed tables as CSV files under "<output_dir>/data".
save_dataset(data, output_dir=f"{output_dir}/data")