# Feature Engineering Test - 25 Trades

Testing the feature engineering pipeline on a small sample before running on 100k trades.

**What this does:**
1. Classify asset types (stock / treasury / ETF / corporate bond; property transactions are dropped)
2. Extract market features for each type
3. Calculate returns, fundamentals, technicals, events
4. Output wide dataset ready for ML

## Setup

In [None]:
import pandas as pd
import numpy as np
import yfinance as yf
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

# For tech indicators
try:
    import ta
except:
    print("Installing ta library...")
    !pip install ta --quiet
    import ta

print("Ready")

## Load Test Data

Load your trades CSV and take the first 25 rows. Until a real file is wired in, the cell below builds a 3-row sample with the same columns.

In [None]:
# Real run: read the full export and keep the first 25 rows, e.g.
#   df_full = pd.read_csv('your_trades.csv')
#   df = df_full.head(25)

# Stand-in sample with the same columns, one record per trade
df = pd.DataFrame([
    {
        'date': '2024-01-15',
        'ticker': 'AAPL',
        'Company': 'Apple Inc.',
        'transaction_type': 'Purchase',
        'amount': 50000,
        'member_name': 'Rep. Smith',
    },
    {
        'date': '2024-01-20',
        'ticker': 'TLT',
        'Company': 'iShares 20+ Year Treasury Bond ETF',
        'transaction_type': 'Sale',
        'amount': 100000,
        'member_name': 'Sen. Jones',
    },
    {
        'date': '2024-02-01',
        'ticker': 'SPY',
        'Company': 'SPDR S&P 500 ETF Trust',
        'transaction_type': 'Purchase',
        'amount': 75000,
        'member_name': 'Rep. Davis',
    },
])

# Parse the date strings so date arithmetic works downstream
df = df.assign(date=pd.to_datetime(df['date']))

trade_dates = df['date']
print(f"Testing with {len(df)} trades")
print(f"Date range: {trade_dates.min()} to {trade_dates.max()}")
df.head()

## Step 1: Classify Asset Types

In [None]:
def classify_asset(ticker, company_name):
    """
    Figure out what kind of asset this is

    Keywords are matched as whole words against the upper-cased company
    name, so 'ETF' does not fire inside 'NETFLIX' and 'LAND' does not fire
    inside 'PORTLAND' (plain substring tests mislabeled those stocks, and
    the 'property' label gets them dropped from the dataset).

    Checks run in priority order: treasury, ETF, property, corporate bond.
    Anything that matches none of them is treated as a stock.

    Args:
        ticker: Ticker symbol. Kept for interface compatibility; the
            classification relies on the company name only.
        company_name: Asset description; any value is coerced with str().

    Returns:
        One of 'treasury', 'etf', 'property', 'corporate_bond', 'stock'.
    """
    import re  # local import so this cell does not depend on the setup cell

    company_upper = str(company_name).upper()

    def has_keyword(keywords):
        # A hit must not be glued to a letter or digit on either side
        return any(
            re.search(rf'(?<![A-Z0-9]){re.escape(kw)}(?![A-Z0-9])', company_upper)
            for kw in keywords
        )

    # Treasury bonds (plurals listed explicitly because matching is whole-word)
    treasury_keywords = [
        'TREASURY', 'T-BILL', 'T-BILLS', 'US GOVT',
        'GOVERNMENT BOND', 'GOVERNMENT BONDS',
    ]
    if has_keyword(treasury_keywords):
        return 'treasury'

    # ETFs
    etf_keywords = ['ETF', 'ETFS', 'ISHARES', 'VANGUARD', 'SPDR', 'INVESCO', 'PROSHARES']
    if has_keyword(etf_keywords):
        return 'etf'

    # Property transactions (skip these)
    property_keywords = ['PROPERTY', 'REAL ESTATE TRANSACTION', 'LAND']
    if has_keyword(property_keywords):
        return 'property'

    # Corporate bonds: treasury and ETF names have already returned above,
    # so any remaining bond is treated as corporate
    if has_keyword(['BOND', 'BONDS']):
        return 'corporate_bond'

    # Everything else is a stock
    return 'stock'

# Apply classification column-wise: zipping the two columns avoids building a
# Series per row (slow at 100k trades) and still yields a plain column when
# the frame is empty, where a row-wise apply does not
df['asset_type'] = [
    classify_asset(ticker, company)
    for ticker, company in zip(df['ticker'], df['Company'])
]

# Summary
print("\nAsset type distribution:")
print(df['asset_type'].value_counts())

# Filter out property transactions (index labels are kept as they were)
df = df.loc[df['asset_type'] != 'property'].copy()
print(f"\nAfter filtering: {len(df)} trades")

## Step 2: Download Price Data (Batch)

Download each unique ticker's price history once up front, so the feature functions can reuse it instead of calling the API again for every trade

In [None]:
def download_all_price_data(df, lookback_years=3):
    """
    Download price history for all unique tickers

    The window runs from `lookback_years` * 365 days before the earliest
    trade to 200 days after the latest one (room for forward returns).
    Tickers that fail or return no rows are reported and left out.

    Args:
        df: Trades with a datetime 'date' column and a 'ticker' column.
        lookback_years: Years of history to fetch before the first trade.

    Returns:
        dict: {ticker: DataFrame} with single-level columns.
    """
    # Nothing to do for an empty frame (its min/max dates would be NaT)
    if df.empty:
        print("No trades supplied - nothing to download")
        return {}

    # Get date range
    min_date = df['date'].min() - timedelta(days=lookback_years * 365)
    max_date = df['date'].max() + timedelta(days=200)  # For forward returns

    print(f"Downloading data from {min_date.date()} to {max_date.date()}")

    # Get unique tickers (missing tickers cannot be downloaded)
    tickers = df['ticker'].dropna().unique()
    print(f"Unique tickers: {len(tickers)}")

    price_data = {}

    for ticker in tickers:
        print(f"  Downloading {ticker}...", end=" ")
        try:
            data = yf.download(ticker, start=min_date, end=max_date, progress=False)
        except Exception as e:
            print(f"✗ Error: {e}")
            continue

        if data is None or len(data) == 0:
            print("✗ No data")
            continue

        # Recent yfinance releases can return two-level columns even for a
        # single ticker; keep the level holding the field names so that
        # row['Close'] stays a scalar for the feature functions
        if isinstance(data.columns, pd.MultiIndex):
            for level in range(data.columns.nlevels):
                level_values = data.columns.get_level_values(level)
                if 'Close' in level_values:
                    data = data.copy()
                    data.columns = level_values
                    break

        price_data[ticker] = data
        print(f"✓ {len(data)} days")

    return price_data

# Fetch price history once for every ticker in the sample
price_data = download_all_price_data(df)
downloaded_count = len(price_data)
print(f"\nSuccessfully downloaded {downloaded_count} tickers")

## Step 3: Feature Functions

Each function extracts specific features from the cached price data

### 3A. Returns (backward and forward)

In [None]:
def get_returns(ticker, trade_date, price_data):
    """
    Backward and forward returns around a trade.

    The entry row is the first price row dated on or after `trade_date`,
    so a weekend or holiday trade is priced at the next available session.
    An empty dict is returned when the ticker has no cached prices or the
    entry row would be the first row or past the end of the history.

    Returns:
        dict with 'entry_price', 'return_{n}d_back' for n in
        5/20/60/120/252 and, per forward horizon in 5/20/60/120,
        'return_{n}d_fwd', 'max_gain_{n}d' and 'max_loss_{n}d'.
        A horizon is left out when the history does not reach that far.
    """
    try:
        history = price_data[ticker]
    except KeyError:
        return {}

    n_rows = len(history.index)
    entry_pos = history.index.searchsorted(trade_date)
    if not 0 < entry_pos < n_rows:
        return {}

    def close_at(position):
        return history.iloc[position]['Close']

    entry_price = close_at(entry_pos)
    features = {'entry_price': entry_price}

    # What happened before the trade (~1W, 1M, 3M, 6M, 1Y in trading days)
    for lookback in (5, 20, 60, 120, 252):
        if entry_pos < lookback:
            continue
        earlier_price = close_at(entry_pos - lookback)
        features[f'return_{lookback}d_back'] = (entry_price - earlier_price) / earlier_price

    # What happened after the trade - these are the prediction targets
    for horizon in (5, 20, 60, 120):
        if entry_pos + horizon >= len(history):
            continue
        later_price = close_at(entry_pos + horizon)
        features[f'return_{horizon}d_fwd'] = (later_price - entry_price) / entry_price

        # Best and worst excursion over the same span, trade day included
        span = history.iloc[entry_pos:entry_pos + horizon + 1]
        highest = span['High'].max()
        lowest = span['Low'].min()
        features[f'max_gain_{horizon}d'] = (highest - entry_price) / entry_price
        features[f'max_loss_{horizon}d'] = (lowest - entry_price) / entry_price

    return features

# Smoke test on one known trade; only the first five features are shown
test_ret = get_returns('AAPL', datetime(2024, 1, 15), price_data)
print("Sample return features:")
for name in list(test_ret)[:5]:
    print(f"  {name}: {test_ret[name]:.4f}")

### 3B. Volatility and Risk

In [None]:
def get_volatility_metrics(ticker, trade_date, price_data):
    """
    Risk measures from the 60 price rows before the trade.

    The window ends on the row before the entry row, so the trade day
    itself is not part of it. An empty dict is returned when the ticker is
    not cached, fewer than 60 rows precede the entry row, or the trade date
    falls after the last cached row.

    Returns:
        dict with 'volatility_60d' and 'downside_dev_60d' (both scaled by
        sqrt(252)), 'max_drawdown_60d', 'var_95_60d' (5% quantile of daily
        returns) and 'pct_down_days_60d'.
    """
    lookback = 60  # about three months of sessions

    try:
        history = price_data[ticker]
    except KeyError:
        return {}

    entry_pos = history.index.searchsorted(trade_date)
    if not lookback <= entry_pos < len(history.index):
        return {}

    closes = history.iloc[entry_pos - lookback:entry_pos]['Close']
    daily = closes.pct_change().dropna()
    losses = daily[daily < 0]

    # Drawdown of the compounded return path against its running peak
    growth = (1 + daily).cumprod()
    peak = growth.expanding().max()
    drawdown = (growth - peak) / peak

    # Downside deviation uses losing days only; zero when there are none
    if len(losses) > 0:
        downside_dev = losses.std() * np.sqrt(252)
    else:
        downside_dev = 0

    return {
        'volatility_60d': daily.std() * np.sqrt(252),
        'max_drawdown_60d': drawdown.min(),
        'downside_dev_60d': downside_dev,
        'var_95_60d': daily.quantile(0.05),
        'pct_down_days_60d': (daily < 0).sum() / len(daily),
    }

# Smoke test: print every volatility feature for one known trade
test_vol = get_volatility_metrics('AAPL', datetime(2024, 1, 15), price_data)
print("\nSample volatility features:")
for name in test_vol:
    print(f"  {name}: {test_vol[name]:.4f}")

### 3C. Technical Indicators

In [None]:
def get_technical_indicators(ticker, trade_date, price_data):
    """
    Moving averages, RSI, MACD and Bollinger position at the trade.

    Indicators are computed on closes up to and including the entry row
    (the first row dated on or after `trade_date`), and the value on that
    last row is reported. An empty dict is returned when the ticker is not
    cached, fewer than 200 rows precede the entry row, or the trade date
    falls after the last cached row.

    Returns:
        dict with the three SMAs, price-to-SMA ratios, RSI and its
        overbought/oversold flags, MACD line/signal/diff with a bullish
        flag, and 'bb_position' (0 = lower band, 1 = upper band).
    """
    try:
        full_history = price_data[ticker]
    except KeyError:
        return {}

    entry_pos = full_history.index.searchsorted(trade_date)
    # 200 prior rows are required for the longest moving average
    if entry_pos < 200 or entry_pos >= len(full_history.index):
        return {}

    closes = full_history['Close'].iloc[:entry_pos + 1]
    last_close = closes.iloc[-1]

    def latest(series):
        return series.iloc[-1]

    def ratio_to(average):
        # >1 means the close sits above the average
        return last_close / average if average > 0 else np.nan

    sma_20 = latest(closes.rolling(20).mean())
    sma_50 = latest(closes.rolling(50).mean())
    sma_200 = latest(closes.rolling(200).mean())

    rsi = latest(ta.momentum.RSIIndicator(closes, window=14).rsi())

    macd_calc = ta.trend.MACD(closes)
    macd_line = latest(macd_calc.macd())
    macd_signal = latest(macd_calc.macd_signal())
    macd_diff = latest(macd_calc.macd_diff())

    bands = ta.volatility.BollingerBands(closes)
    upper_band = latest(bands.bollinger_hband())
    lower_band = latest(bands.bollinger_lband())
    band_width = upper_band - lower_band
    if band_width > 0:
        bb_position = (last_close - lower_band) / band_width
    else:
        # Degenerate band: report the midpoint
        bb_position = 0.5

    return {
        # Moving averages
        'sma_20': sma_20,
        'sma_50': sma_50,
        'sma_200': sma_200,

        # Price relative to each average
        'price_vs_sma20': ratio_to(sma_20),
        'price_vs_sma50': ratio_to(sma_50),
        'price_vs_sma200': ratio_to(sma_200),

        # RSI with overbought (>70) / oversold (<30) flags
        'rsi': rsi,
        'rsi_overbought': 1 if rsi > 70 else 0,
        'rsi_oversold': 1 if rsi < 30 else 0,

        # MACD
        'macd': macd_line,
        'macd_signal': macd_signal,
        'macd_diff': macd_diff,
        'macd_bullish': 1 if macd_diff > 0 else 0,

        # Position inside the Bollinger band
        'bb_position': bb_position,
    }

# Smoke test: show the first five technical indicators for one known trade
test_tech = get_technical_indicators('AAPL', datetime(2024, 1, 15), price_data)
print("\nSample technical indicators:")
for name in list(test_tech)[:5]:
    print(f"  {name}: {test_tech[name]}")

### 3D. Fundamentals (from yfinance .info)

In [None]:
# (feature name, key in yfinance .info, default when missing)
_FUNDAMENTAL_FIELDS = (
    # Valuation
    ('pe_ratio', 'trailingPE', np.nan),
    ('forward_pe', 'forwardPE', np.nan),
    ('pb_ratio', 'priceToBook', np.nan),
    ('ps_ratio', 'priceToSalesTrailing12Months', np.nan),
    ('peg_ratio', 'pegRatio', np.nan),
    # Size
    ('market_cap', 'marketCap', np.nan),
    # Profitability
    ('roe', 'returnOnEquity', np.nan),
    ('roa', 'returnOnAssets', np.nan),
    ('profit_margin', 'profitMargins', np.nan),
    ('operating_margin', 'operatingMargins', np.nan),
    # Financial health
    ('debt_to_equity', 'debtToEquity', np.nan),
    ('current_ratio', 'currentRatio', np.nan),
    # Dividends
    ('dividend_yield', 'dividendYield', 0),
    # Growth
    ('revenue_growth', 'revenueGrowth', np.nan),
    ('earnings_growth', 'earningsGrowth', np.nan),
    # Risk
    ('beta', 'beta', np.nan),
    # Sector
    ('sector', 'sector', 'Unknown'),
    ('industry', 'industry', 'Unknown'),
)

# Successful lookups per ticker, so repeated trades reuse one API call
_fundamentals_cache = {}


def get_fundamentals(ticker):
    """
    Get fundamental data - note: this is CURRENT, not historical
    For proper backtest, you'd need point-in-time data

    Results are cached per ticker for the life of the session, because the
    same symbol shows up in many trades and each lookup is a network call.
    Failed lookups are not cached, so a later call retries them.

    Args:
        ticker: Symbol passed to yfinance.

    Returns:
        dict of valuation, size, profitability, health, dividend, growth,
        risk and sector fields. Fields that are absent or None get their
        default (NaN, 0 for 'dividend_yield', 'Unknown' for sector and
        industry). Returns {} when the lookup raises.
    """
    if ticker in _fundamentals_cache:
        # Hand out a copy so callers cannot alter the cached entry
        return dict(_fundamentals_cache[ticker])

    try:
        info = yf.Ticker(ticker).info
        features = {}
        for name, key, default in _FUNDAMENTAL_FIELDS:
            value = info.get(key)
            # A key present with a None value should fall back as well
            features[name] = default if value is None else value
    except Exception as e:
        print(f"Error getting fundamentals for {ticker}: {e}")
        return {}

    _fundamentals_cache[ticker] = features
    return dict(features)

# Smoke test: show the first five fundamentals returned for AAPL
test_fund = get_fundamentals('AAPL')
print("\nSample fundamentals:")
for name in list(test_fund)[:5]:
    print(f"  {name}: {test_fund[name]}")

### 3E. Market Context (SPY, VIX)

In [None]:
def get_market_context(trade_date, price_data):
    """
    What was the overall market doing?

    Reads the cached 'SPY' and '^VIX' frames when present. For each, the
    row used is the first one dated on or after `trade_date`; the index is
    skipped when that would be the first row or past the end.

    Args:
        trade_date: Date of the trade.
        price_data: dict of {ticker: DataFrame} with a 'Close' column.

    Returns:
        dict that may contain 'spy_vs_sma200' and 'market_bull' (needs 200
        prior rows), 'spy_return_20d', 'spy_return_60d', 'vix_level',
        'vix_high' (>20) and 'vix_extreme' (>30). Empty when neither index
        is usable.
    """
    features = {}

    # S&P 500 (using SPY as proxy)
    if 'SPY' in price_data:
        spy_close = price_data['SPY']['Close']
        trade_idx = spy_close.index.searchsorted(trade_date)

        if 0 < trade_idx < len(spy_close):
            spy_price = spy_close.iloc[trade_idx]

            # 200-day MA: exactly 200 closes ending on the trade row, which
            # matches rolling(200) (the slice used to span 201 rows)
            if trade_idx >= 200:
                spy_sma200 = spy_close.iloc[trade_idx - 199:trade_idx + 1].mean()
                features['spy_vs_sma200'] = spy_price / spy_sma200
                features['market_bull'] = 1 if spy_price > spy_sma200 else 0

            # SPY returns over the last 20 and 60 rows
            for period in (20, 60):
                if trade_idx >= period:
                    past_price = spy_close.iloc[trade_idx - period]
                    features[f'spy_return_{period}d'] = (spy_price - past_price) / past_price

    # VIX (fear index)
    if '^VIX' in price_data:
        vix_close = price_data['^VIX']['Close']
        trade_idx = vix_close.index.searchsorted(trade_date)

        if 0 < trade_idx < len(vix_close):
            vix_level = vix_close.iloc[trade_idx]
            features['vix_level'] = vix_level
            features['vix_high'] = 1 if vix_level > 20 else 0
            features['vix_extreme'] = 1 if vix_level > 30 else 0

    return features

# Need to download SPY and VIX first
print("Downloading market indices...")
min_date = df['date'].min() - timedelta(days=365)
max_date = df['date'].max() + timedelta(days=30)

# Check each index on its own: SPY is often cached already because it is a
# traded ticker, and a single SPY check would then skip the VIX download too
for index_ticker in ('SPY', '^VIX'):
    if index_ticker in price_data:
        continue
    index_data = yf.download(index_ticker, start=min_date, end=max_date, progress=False)
    # Recent yfinance releases can return two-level columns for one ticker;
    # keep the level with the field names so 'Close' stays a plain column
    if isinstance(index_data.columns, pd.MultiIndex):
        for level in range(index_data.columns.nlevels):
            level_values = index_data.columns.get_level_values(level)
            if 'Close' in level_values:
                index_data.columns = level_values
                break
    if len(index_data) > 0:
        price_data[index_ticker] = index_data
    else:
        print(f"  ✗ No data for {index_ticker}")

# Test
test_mkt = get_market_context(datetime(2024, 1, 15), price_data)
print("\nSample market context:")
for k, v in test_mkt.items():
    print(f"  {k}: {v}")

### 3F. Relative Performance (vs Market)

In [None]:
def get_relative_performance(ticker, trade_date, price_data):
    """
    Compare the asset with SPY over the 61 rows ending at the trade.

    SPY closes are aligned to the asset's dates with a forward fill. Beta
    is the covariance of daily returns divided by the SPY variance; it
    falls back to 1 when that variance is not positive and to NaN when 10
    or fewer paired returns are available. An empty dict is returned when
    either series is missing from the cache, fewer than 60 rows precede
    the entry row, or the trade date falls after the last cached row.

    Returns:
        dict with 'stock_return_60d', 'spy_return_60d_aligned',
        'relative_strength_60d', 'outperforming' and 'beta_60d'.
    """
    if 'SPY' not in price_data or ticker not in price_data:
        return {}

    asset_history = price_data[ticker]
    entry_pos = asset_history.index.searchsorted(trade_date)
    if not 60 <= entry_pos < len(asset_history):
        return {}

    # Asset closes for the window, then SPY on the same dates
    asset_close = asset_history.iloc[entry_pos - 60:entry_pos + 1]['Close']
    benchmark = price_data['SPY'].reindex(asset_close.index, method='ffill')
    if len(benchmark) == 0:
        return {}
    benchmark_close = benchmark['Close']

    # Point-to-point return across the window
    asset_first, asset_last = asset_close.iloc[0], asset_close.iloc[-1]
    bench_first, bench_last = benchmark_close.iloc[0], benchmark_close.iloc[-1]
    asset_ret = (asset_last - asset_first) / asset_first
    bench_ret = (bench_last - bench_first) / bench_first

    # Daily returns paired by date; rows missing either side are dropped
    paired = pd.DataFrame({
        'stock': asset_close.pct_change().dropna(),
        'spy': benchmark_close.pct_change().dropna(),
    }).dropna()

    beta = np.nan
    if len(paired) > 10:
        bench_var = paired['spy'].var()
        if bench_var > 0:
            beta = paired['stock'].cov(paired['spy']) / bench_var
        else:
            beta = 1

    return {
        'stock_return_60d': asset_ret,
        'spy_return_60d_aligned': bench_ret,
        'relative_strength_60d': asset_ret - bench_ret,
        'outperforming': 1 if asset_ret > bench_ret else 0,
        'beta_60d': beta,
    }

# Smoke test: print every relative-performance feature for one known trade
test_rel = get_relative_performance('AAPL', datetime(2024, 1, 15), price_data)
print("\nSample relative performance:")
for name in test_rel:
    print(f"  {name}: {test_rel[name]}")

### 3G. Corporate Events

In [None]:
def get_corporate_events(ticker, trade_date):
    """
    Proximity to earnings, dividends, etc.

    Both lookups are best effort: if one fails, its features keep their
    defaults and the other is still attempted.

    NOTE(review): the yfinance calendar appears to describe the upcoming
    earnings date as of run time, so for old trades 'days_to_earnings' is
    measured to that date rather than to the earnings release nearest the
    trade - confirm before using it as a model feature.

    Args:
        ticker: Symbol passed to yfinance.
        trade_date: Date of the trade (datetime or Timestamp); any timezone
            is dropped before comparing.

    Returns:
        dict with 'has_earnings_date', 'days_to_earnings' and
        'near_earnings_30d', plus 'days_since_dividend' when a dividend
        on or before the trade date is found. Returns {} if the ticker
        object cannot be created.
    """
    def to_naive_timestamp(value):
        # Calendar entries may be a scalar or a list-like of dates; take
        # the first entry and drop any timezone so subtraction is valid
        if isinstance(value, (list, tuple, np.ndarray, pd.Series, pd.Index)):
            if len(value) == 0:
                return None
            value = list(value)[0]
        stamp = pd.to_datetime(value, errors='coerce')
        if stamp is None or pd.isna(stamp):
            return None
        if stamp.tzinfo is not None:
            stamp = stamp.tz_localize(None)
        return stamp

    try:
        stock = yf.Ticker(ticker)

        features = {
            'has_earnings_date': 0,
            'days_to_earnings': np.nan,
            'near_earnings_30d': 0,
        }

        trade_ts = to_naive_timestamp(trade_date)
        if trade_ts is None:
            return features

        # Try to get earnings calendar
        try:
            calendar = stock.calendar
            if calendar is not None and 'Earnings Date' in calendar:
                earnings_date = to_naive_timestamp(calendar['Earnings Date'])
                if earnings_date is not None:
                    days_diff = (earnings_date - trade_ts).days
                    features['has_earnings_date'] = 1
                    features['days_to_earnings'] = days_diff
                    features['near_earnings_30d'] = 1 if abs(days_diff) <= 30 else 0
        except Exception:
            # Best effort: keep the earnings defaults, but let
            # KeyboardInterrupt through so a long run can be stopped
            pass

        # Dividends
        try:
            dividends = stock.dividends
            if dividends is not None and len(dividends) > 0:
                dividend_dates = pd.DatetimeIndex(dividends.index)
                # A tz-aware index cannot be compared with a naive trade date
                if dividend_dates.tz is not None:
                    dividend_dates = dividend_dates.tz_localize(None)
                # Find most recent dividend on or before the trade
                past_dates = dividend_dates[dividend_dates <= trade_ts]
                if len(past_dates) > 0:
                    features['days_since_dividend'] = (trade_ts - past_dates.max()).days
        except Exception:
            # Best effort: the dividend feature is simply left out
            pass

        return features

    except Exception:
        return {}

# Smoke test: print every corporate-event feature for one known trade
test_events = get_corporate_events('AAPL', datetime(2024, 1, 15))
print("\nSample corporate events:")
for name in test_events:
    print(f"  {name}: {test_events[name]}")

### 3H. Entry Quality Metrics

In [None]:
def get_entry_quality(ticker, trade_date, price_data):
    """
    Where the entry price sits in the recent range, plus relative volume.

    The range covers the 121 rows ending on the entry row (the first row
    dated on or after `trade_date`), using the High/Low extremes. An empty
    dict is returned when the ticker is not cached, fewer than 120 rows
    precede the entry row, or the trade date falls after the last row.

    Returns:
        dict with 'volume_ratio' (entry-row volume over the window mean, 1
        when that mean is not positive) and, when the range is not flat,
        'price_percentile_120d' (0 = bottom, 100 = top) with the
        'bought_near_bottom' (<20) and 'bought_near_top' (>80) flags.
    """
    try:
        history = price_data[ticker]
    except KeyError:
        return {}

    entry_pos = history.index.searchsorted(trade_date)
    if not 120 <= entry_pos < len(history):
        return {}

    entry_row = history.iloc[entry_pos]
    entry_price = entry_row['Close']

    # About six months of sessions, trade day included
    recent = history.iloc[entry_pos - 120:entry_pos + 1]
    range_high = recent['High'].max()
    range_low = recent['Low'].min()

    features = {}
    if range_high > range_low:
        position_pct = ((entry_price - range_low) / (range_high - range_low)) * 100
        features['price_percentile_120d'] = position_pct
        features['bought_near_bottom'] = 1 if position_pct < 20 else 0
        features['bought_near_top'] = 1 if position_pct > 80 else 0

    # Entry-day volume relative to the window average
    typical_volume = recent['Volume'].mean()
    if typical_volume > 0:
        features['volume_ratio'] = entry_row['Volume'] / typical_volume
    else:
        features['volume_ratio'] = 1

    return features

# Smoke test: print every entry-quality feature for one known trade
test_entry = get_entry_quality('AAPL', datetime(2024, 1, 15), price_data)
print("\nSample entry quality:")
for name in test_entry:
    print(f"  {name}: {test_entry[name]}")

## Step 4: Master Feature Extraction

Combine all feature functions based on asset type

In [None]:
def extract_all_features(row, price_data):
    """
    Build the full feature dict for one trade.

    Starts from the trade's own fields, always adds market context, then
    runs the extractors registered for the row's asset type in order.
    Asset types without a registered list (e.g. 'corporate_bond') get the
    market context only. Later extractors overwrite earlier keys on a clash.
    """
    ticker = row['ticker']
    trade_date = row['date']

    def from_prices(extractor):
        # Bind the three arguments shared by the price-based extractors
        return lambda: extractor(ticker, trade_date, price_data)

    returns = from_prices(get_returns)
    volatility = from_prices(get_volatility_metrics)
    technicals = from_prices(get_technical_indicators)
    relative = from_prices(get_relative_performance)
    entry_quality = from_prices(get_entry_quality)

    def fundamentals():
        return get_fundamentals(ticker)

    def events():
        return get_corporate_events(ticker, trade_date)

    extractors_by_type = {
        'stock': (
            returns, volatility, technicals, fundamentals,
            relative, events, entry_quality,
        ),
        # Treasuries: returns and risk only; yield-curve features still to add
        'treasury': (returns, volatility),
        # ETFs: the price-based subset of the stock features
        'etf': (returns, volatility, technicals, relative),
    }

    features = row.to_dict()
    features.update(get_market_context(trade_date, price_data))
    for extract in extractors_by_type.get(row['asset_type'], ()):
        features.update(extract())

    return features

## Step 5: Process All Trades

In [None]:
total_trades = len(df)
print(f"Processing {total_trades} trades...\n")

enriched_trades = []

# Count progress by position, not by index label: df keeps its original labels
# after the property filter, so label + 1 can skip numbers or exceed the total
for position, (idx, row) in enumerate(df.iterrows(), start=1):
    print(f"[{position}/{total_trades}] {row['ticker']} ({row['asset_type']}) on {row['date'].date()}")

    try:
        features = extract_all_features(row, price_data)
        enriched_trades.append(features)
        print(f"  ✓ Extracted {len(features)} features")
    except Exception as e:
        print(f"  ✗ Error: {e}")
        # Keep the trade: its feature columns become NaN in the final frame
        enriched_trades.append(row.to_dict())

# Convert to DataFrame
df_enriched = pd.DataFrame(enriched_trades)

print(f"\n✓ Done! Final dataset: {df_enriched.shape}")

## Step 6: Inspect Results

In [None]:
# Overall size of the enriched frame, then every column name
column_names = df_enriched.columns.tolist()
print("Dataset shape:", df_enriched.shape)
print(f"\nColumns: {len(column_names)}")
print("\nColumn list:")
print(column_names)

In [None]:
# Show a handful of headline features for the first stock trade, if any
print("Sample trade (first stock):")
is_stock = df_enriched['asset_type'] == 'stock'
stock_trades = df_enriched[is_stock]
if len(stock_trades) > 0:
    sample = stock_trades.iloc[0]

    key_features = [
        'ticker', 'date', 'transaction_type', 'entry_price',
        'return_20d_back', 'return_20d_fwd',
        'pe_ratio', 'rsi', 'beta_60d',
        'price_percentile_120d', 'near_earnings_30d',
    ]

    # Features that were never produced are skipped
    for feat in key_features:
        if feat not in sample:
            continue
        print(f"  {feat}: {sample[feat]}")

In [None]:
# Percentage of missing values per column, worst first (top 20 shown)
print("\nMissing data summary:")
missing_counts = df_enriched.isnull().sum()
missing_pct = (missing_counts / len(df_enriched) * 100).sort_values(ascending=False)
print(missing_pct.head(20))

In [None]:
# Describe the numeric columns whose name mentions 'return'
numeric_cols = df_enriched.select_dtypes(include=[np.number]).columns
print(f"\nNumeric features: {len(numeric_cols)}")
print("\nSummary stats for returns:")
return_cols = []
for col in numeric_cols:
    if 'return' in col:
        return_cols.append(col)
if return_cols:
    print(df_enriched[return_cols].describe())

## Step 7: Save Results

In [None]:
# Write the full enriched dataset
output_file = 'enriched_trades_test.csv'
df_enriched.to_csv(output_file, index=False)
print(f"✓ Saved to {output_file}")

# Then one file per asset type
asset_types = df_enriched['asset_type'].unique()
for asset_type in asset_types:
    matches_type = df_enriched['asset_type'] == asset_type
    subset = df_enriched[matches_type]
    filename = f'enriched_{asset_type}_trades_test.csv'
    subset.to_csv(filename, index=False)
    print(f"✓ Saved {len(subset)} {asset_type} trades to {filename}")

## Summary

**What we built:**
- Asset classification (stock/treasury/etf)
- Up to ~75 engineered features per stock trade (fewer when price history is short or a lookup fails)
- Returns (backward/forward), volatility, technicals, fundamentals
- Market context, relative performance, corporate events
- Entry quality metrics

**Next steps:**
1. Review the output CSVs
2. Check feature quality and missing data patterns
3. Adjust feature functions if needed
4. Scale up to full 100k trades
5. Add treasury-specific features
6. Calculate abnormal returns (CAPM, Fama-French)