In [None]:
import os
import pandas as pd

from src.data.preprocessors import clean_price_data
from src.data.feature_store import FeatureEngineer

# Load the raw price file (parsing 'date' as datetimes) and pass it through
# the project's cleaning helper.
csv_path = os.path.join('data', 'raw', 'prices.csv')
prices = pd.read_csv(csv_path, parse_dates=['date'])
prices = clean_price_data(prices)

# Build features and target with an empty FeatureEngineer config, then drop
# every row that still contains a missing value.
fe = FeatureEngineer({})
features = fe.create_features(prices)
features = fe.create_target(features)
features = features.dropna()
print('Engineered rows:', len(features), 'features:', len(features.columns))

# Persist engineered dataset
# Create the output directory first: on a fresh checkout it may not exist, in
# which case the parquet write would fail for a reason unrelated to parquet
# support and the CSV fallback below would then fail as well.
processed_dir = os.path.join('data', 'processed')
os.makedirs(processed_dir, exist_ok=True)

out_parquet = os.path.join(processed_dir, 'features.parquet')
try:
    features.to_parquet(out_parquet, index=False)
    print('Wrote', out_parquet)
except Exception as e:
    # Best-effort fallback: any failure of the parquet write is reported and
    # the same frame is written as CSV instead.
    out_csv = os.path.join(processed_dir, 'features.csv')
    features.to_csv(out_csv, index=False)
    print('Parquet unavailable, wrote', out_csv, 'Error:', e)


In [None]:
# 1) Sanity checks and sorting
# The steps below group by 'symbol', order by 'date' and read 'close', so all
# three columns must be present.
required_cols = {'symbol', 'date', 'close'}
assert required_cols <= set(prices.columns)

prices = prices.sort_values(by=['symbol', 'date']).reset_index(drop=True)
print('After sort rows:', len(prices))
for preview in (prices.head(3), prices.tail(3)):
    print(preview)

# Basic uniqueness check
# Only reports the number of distinct (symbol, date) pairs; compare it with
# the row count printed above to spot duplicates.
uniq = prices.drop_duplicates(subset=['symbol', 'date'])[['symbol', 'date']]
print('Unique (symbol,date) pairs:', len(uniq))


In [None]:
# 2) Basic derived columns (returns)
# Per-symbol percentage change of 'close' over 1, 5 and 10 rows.
for periods in (1, 5, 10):
    prices[f'ret_{periods}'] = prices.groupby('symbol')['close'].pct_change(periods)

preview_cols = ['symbol', 'date', 'close', 'ret_1', 'ret_5', 'ret_10']
print(prices[preview_cols].head(12))

# Clip extreme returns for stability
# Only 'ret_1' is clipped (to +/-0.3); 'ret_5' and 'ret_10' are left as
# computed, and the preview above shows the values before clipping.
prices['ret_1'] = prices['ret_1'].clip(lower=-0.3, upper=0.3)


In [None]:
# 3) Use FeatureEngineer to compute advanced features
from src.data.feature_store import FeatureEngineer
fe = FeatureEngineer({})

# Work on a copy so `prices` itself is not handed to the engineer.
feat_df = fe.create_features(prices.copy())
n_added = feat_df.shape[1] - prices.shape[1]
print('Feature columns added:', n_added)
print(feat_df.head(5))

# Keep only needed columns for modeling
# Everything except the identifying/price columns counts as a feature here.
core_cols = ['symbol','date','close']
feature_cols = [name for name in feat_df.columns if name not in core_cols]
print('Total features:', len(feature_cols))


In [None]:
# 4) Lag features and leakage checks
# For each listed column, add per-symbol values shifted back by 1, 2, 3 and 5
# rows; a positive shift only ever looks at earlier rows within a symbol.
lag_cols = ['close','volume']
lag_steps = [1, 2, 3, 5]
for col, step in [(c, s) for c in lag_cols for s in lag_steps]:
    feat_df[f'{col}_lag_{step}'] = feat_df.groupby('symbol')[col].shift(periods=step)

# Ensure target uses only future info later; no label leakage in features
# The preview selects every column whose name contains 'lag'.
lagged_names = [name for name in feat_df.columns if 'lag' in name]
print(feat_df[lagged_names].head(10))


In [None]:
# 5) Create modeling target
# Settings forwarded to FeatureEngineer.create_target; the preview below reads
# the 'future_return' and 'target' columns from its result.
horizon = 1
threshold = 0.02
target_kwargs = {'horizon': horizon, 'threshold': threshold}
feat_df = fe.create_target(feat_df, **target_kwargs)

preview_cols = ['symbol','date','future_return','target']
print(feat_df[preview_cols].head(12))

target_counts = feat_df['target'].value_counts().to_dict()
print('Target distribution:', target_counts)


In [None]:
# 6) Handle missing values and final feature selection
# Remove the return columns named below (when present) so they cannot be
# picked up as model inputs.
excluded_cols = ['future_return','target_return']
model_cols = [name for name in feat_df.columns if name not in excluded_cols]
feat_df = feat_df[model_cols]

# Drop rows with missing critical inputs
# Rows without a target go first, then any row with a missing value in any
# column; the second pass subsumes the first.
feat_df = feat_df.dropna(subset=['target']).copy().dropna()

# Separate metadata and features
meta_cols = ['symbol','date','target']
X_cols = [name for name in feat_df.columns if name not in meta_cols]
print('Final feature count:', len(X_cols))


In [None]:
# 7) Save feature matrix
import os
os.makedirs('data/processed', exist_ok=True)

# Try parquet first; if that write raises for any reason, report the error and
# write the same frame as CSV instead.
out_path = 'data/processed/feature_matrix.parquet'
try:
    feat_df.to_parquet(out_path, index=False)
except Exception as err:
    out_csv = 'data/processed/feature_matrix.csv'
    feat_df.to_csv(out_csv, index=False)
    print('Parquet unavailable; wrote CSV to', out_csv, 'Error:', err)
else:
    print('Saved to', out_path)


In [None]:
# 8) Versioned feature store example (SQLite schema)
import sqlite3
version = 'v1'

# Store a small sample to demonstrate the schema: the first five rows of the
# feature matrix, one table column per DataFrame column, plus a
# 'feature_version' tag.
sample = feat_df.head(5).copy()
sample['feature_version'] = version

# Open the database only once the sample is ready, and close the connection
# even when the insert raises so the handle on features.db is never leaked.
conn = sqlite3.connect('features.db')
try:
    # NOTE: if_exists='append' adds rows on every run, so re-running this cell
    # stores the same sample again.
    sample.to_sql('features', conn, if_exists='append', index=False)
    print('Inserted sample features with version', version)
finally:
    conn.close()


In [None]:
# 9) Feature importance proxy (variance)
# Per-column variance of the model inputs, largest first.
feature_var = feat_df[X_cols].var()
variances = feature_var.sort_values(ascending=False)
print('Top 20 high variance features:')
print(variances.head(20))

# Low variance filter example
# Columns whose variance falls below 1e-8 are listed as candidates to drop;
# nothing is removed here.
low_var = [name for name, value in variances.items() if value < 1e-8]
print('Low variance features (to consider dropping):', low_var[:20])


In [None]:
# 10) Normalization example (z-score)
from sklearn.preprocessing import StandardScaler

# NOTE(review): the scaler is fitted on every row of feat_df, including rows
# that the later split cell uses for validation -- confirm this is acceptable
# for an example, or fit on training rows only.
scaler = StandardScaler()
scaled = scaler.fit_transform(feat_df[X_cols])

# Rebuild a frame laid out as: symbol, date, <scaled features...>, target.
# Values are attached positionally (.values), so the result carries a fresh
# 0..n-1 index rather than feat_df's index.
scaled_df = pd.DataFrame(scaled, columns=X_cols)
for position, key in enumerate(('symbol', 'date')):
    scaled_df.insert(position, key, feat_df[key].values)
scaled_df['target'] = feat_df['target'].values

print('Scaled shape:', scaled_df.shape)
print(scaled_df.head(5))


In [None]:
# 11) Train/validation split (time-based)
from sklearn.model_selection import TimeSeriesSplit

ts = TimeSeriesSplit(n_splits=3)

# TimeSeriesSplit cuts purely by row position, so rows must be in
# chronological order. Step 1 sorted the data by (symbol, date); unless a
# later step re-sorted it, splitting scaled_df directly would separate symbols
# rather than earlier dates from later ones. Order by date first.
ordered_df = scaled_df.sort_values(['date', 'symbol']).reset_index(drop=True)
X = ordered_df[X_cols]
y = ordered_df['target']

# Split over the unique dates and map each fold back to rows, so every row of
# a given date lands on the same side and validation dates always follow
# training dates.
unique_dates = ordered_df['date'].drop_duplicates().to_numpy()
for fold, (tr_dates, va_dates) in enumerate(ts.split(unique_dates)):
    # ordered_df has a fresh 0..n-1 index, so these labels are also positions.
    tr = ordered_df.index[ordered_df['date'].isin(unique_dates[tr_dates])].to_numpy()
    va = ordered_df.index[ordered_df['date'].isin(unique_dates[va_dates])].to_numpy()
    print('Fold', fold, 'train:', len(tr), 'val:', len(va))
    # Just display a few rows
    print('Train head:\n', ordered_df.iloc[tr].head(2))
    print('Val head:\n', ordered_df.iloc[va].head(2))


In [None]:
# 12) Save scaler + metadata for reproducibility
import joblib

artifacts_dir = 'data/processed'
os.makedirs(artifacts_dir, exist_ok=True)

# Bundle the fitted scaler with the ordered feature list it was fitted on, so
# both can be reloaded together.
artifact_path = os.path.join(artifacts_dir, 'scaler_and_features.joblib')
bundle = {'scaler': scaler, 'features': X_cols}
joblib.dump(bundle, artifact_path)
print('Saved scaler and feature list to data/processed/scaler_and_features.joblib')
