# 02. Анализ признаков и Feature Engineering

Этот ноутбук посвящен анализу технических индикаторов и созданию признаков для модели

In [None]:
# Импорты
import sys
sys.path.append('..')

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.feature_selection import mutual_info_regression, SelectKBest, f_regression
from sklearn.preprocessing import StandardScaler
import warnings
warnings.filterwarnings('ignore')

# Настройка визуализации
plt.style.use('seaborn-v0_8-darkgrid')
plt.rcParams['figure.figsize'] = (14, 8)
plt.rcParams['font.size'] = 10

# Импорт модулей проекта
from data.data_loader import CryptoDataLoader
from data.feature_engineering import FeatureEngineer
from data.preprocessor import DataPreprocessor
import yaml

# Загрузка конфигурации
with open('../config/config.yaml', 'r') as f:
    config = yaml.safe_load(f)

print("Библиотеки загружены успешно!")

## 1. Загрузка и подготовка данных

In [None]:
# Загрузка данных
data_loader = CryptoDataLoader(config)
df = data_loader.load_raw_data(symbols=['BTCUSDT', 'ETHUSDT'], limit=50000)

print(f"Загружено {len(df)} записей")
print(f"Символы: {df['symbol'].unique()}")
print(f"Период: {df['datetime'].min()} - {df['datetime'].max()}")

In [None]:
# Создание признаков
feature_engineer = FeatureEngineer(config)
df_features = feature_engineer.create_features(df)

print(f"\nСоздано признаков: {len(df_features.columns) - len(df.columns)}")
print(f"Всего колонок: {len(df_features.columns)}")
print(f"\nНовые категории признаков:")

# Группировка признаков по категориям
feature_categories = {
    'Price': [col for col in df_features.columns if any(x in col for x in ['sma', 'ema', 'price'])],
    'Momentum': [col for col in df_features.columns if any(x in col for x in ['rsi', 'stoch', 'momentum', 'roc'])],
    'Volatility': [col for col in df_features.columns if any(x in col for x in ['atr', 'bb_', 'volatility'])],
    'Volume': [col for col in df_features.columns if any(x in col for x in ['volume', 'obv', 'cmf', 'mfi'])],
    'Trend': [col for col in df_features.columns if any(x in col for x in ['adx', 'aroon', 'macd', 'ichimoku'])],
    'Microstructure': [col for col in df_features.columns if any(x in col for x in ['spread', 'imbalance', 'pressure'])],
    'Returns': [col for col in df_features.columns if 'return' in col],
    'Targets': [col for col in df_features.columns if 'target' in col or 'future' in col]
}

for category, features in feature_categories.items():
    if features:
        print(f"  {category}: {len(features)} признаков")

## 2. Анализ распределения признаков

In [None]:
# Выбираем числовые признаки для анализа
numeric_features = df_features.select_dtypes(include=[np.number]).columns
feature_cols = [col for col in numeric_features if col not in ['id', 'timestamp'] and 'target' not in col]

# Статистика по признакам
feature_stats = df_features[feature_cols].describe().T
feature_stats['skew'] = df_features[feature_cols].skew()
feature_stats['kurtosis'] = df_features[feature_cols].kurtosis()

# Топ признаков по скошенности
print("Топ-10 признаков с наибольшей скошенностью:")
feature_stats.sort_values('skew', ascending=False).head(10)[['mean', 'std', 'skew', 'kurtosis']]

In [None]:
# Визуализация распределений для разных категорий
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
axes = axes.ravel()

# Выбираем по одному признаку из каждой категории
sample_features = {
    'RSI': 'rsi_14',
    'ATR': 'atr_14',
    'Volume': 'volume_sma_ratio',
    'MACD': 'macd_signal',
    'Bollinger': 'bb_width',
    'Returns': 'returns_1'
}

for idx, (name, col) in enumerate(sample_features.items()):
    if col in df_features.columns:
        data = df_features[col].dropna()
        axes[idx].hist(data, bins=50, alpha=0.7, edgecolor='black')
        axes[idx].axvline(data.mean(), color='red', linestyle='--', label='Mean')
        axes[idx].axvline(data.median(), color='green', linestyle='--', label='Median')
        axes[idx].set_title(f'{name} Distribution')
        axes[idx].set_xlabel(col)
        axes[idx].set_ylabel('Frequency')
        axes[idx].legend()

plt.tight_layout()
plt.show()

## 3. Корреляционный анализ признаков

In [None]:
# Выбираем подмножество признаков для корреляционной матрицы
selected_features = []
for category, features in feature_categories.items():
    if category not in ['Targets'] and features:
        # Берем первые 2 признака из каждой категории
        selected_features.extend([f for f in features[:2] if f in df_features.columns])

# Добавляем целевую переменную
if 'future_return_4h' in df_features.columns:
    selected_features.append('future_return_4h')

# Корреляционная матрица
correlation_matrix = df_features[selected_features].corr()

plt.figure(figsize=(14, 12))
mask = np.triu(np.ones_like(correlation_matrix, dtype=bool))
sns.heatmap(correlation_matrix, mask=mask, annot=True, fmt='.2f', 
            cmap='coolwarm', center=0, square=True, linewidths=1,
            cbar_kws={"shrink": 0.8})
plt.title('Корреляционная матрица признаков')
plt.tight_layout()
plt.show()

In [None]:
# Поиск высококоррелированных признаков
def find_highly_correlated_features(correlation_matrix, threshold=0.9):
    corr_pairs = []
    for i in range(len(correlation_matrix.columns)):
        for j in range(i+1, len(correlation_matrix.columns)):
            if abs(correlation_matrix.iloc[i, j]) > threshold:
                corr_pairs.append({
                    'feature1': correlation_matrix.columns[i],
                    'feature2': correlation_matrix.columns[j],
                    'correlation': correlation_matrix.iloc[i, j]
                })
    return pd.DataFrame(corr_pairs)

# Полная корреляционная матрица
full_corr_matrix = df_features[feature_cols].corr()
highly_correlated = find_highly_correlated_features(full_corr_matrix, threshold=0.95)

print("Высококоррелированные пары признаков (|корреляция| > 0.95):")
if len(highly_correlated) > 0:
    print(highly_correlated.sort_values('correlation', ascending=False))
else:
    print("Не найдено пар с корреляцией выше 0.95")

## 4. Анализ важности признаков

In [None]:
# Подготовка данных для анализа важности
df_clean = df_features.dropna(subset=['future_return_4h'])

# Разделение на признаки и целевую переменную
feature_cols_clean = [col for col in feature_cols if col in df_clean.columns]
X = df_clean[feature_cols_clean]
y = df_clean['future_return_4h']

# Заполнение пропущенных значений
X = X.fillna(X.mean())

# Mutual Information
mi_scores = mutual_info_regression(X, y, random_state=42)
mi_scores = pd.Series(mi_scores, index=X.columns).sort_values(ascending=False)

# F-статистика
f_selector = SelectKBest(f_regression, k='all')
f_selector.fit(X, y)
f_scores = pd.Series(f_selector.scores_, index=X.columns).sort_values(ascending=False)

# Визуализация топ-20 важных признаков
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 6))

# Mutual Information
mi_scores.head(20).plot(kind='barh', ax=ax1)
ax1.set_title('Топ-20 признаков по Mutual Information')
ax1.set_xlabel('MI Score')

# F-статистика
f_scores.head(20).plot(kind='barh', ax=ax2, color='orange')
ax2.set_title('Топ-20 признаков по F-статистике')
ax2.set_xlabel('F Score')

plt.tight_layout()
plt.show()

## 5. Временной анализ признаков

In [None]:
# Выбираем BTCUSDT для временного анализа
btc_data = df_features[df_features['symbol'] == 'BTCUSDT'].copy()
btc_data = btc_data.sort_values('datetime')

# Выбираем несколько ключевых признаков
key_features = ['rsi_14', 'macd_signal', 'atr_14', 'volume_sma_ratio', 'bb_width']

# Создаем subplots
fig, axes = plt.subplots(len(key_features) + 1, 1, figsize=(14, 12), sharex=True)

# График цены
axes[0].plot(btc_data['datetime'], btc_data['close'], label='Close Price', color='black', linewidth=1)
axes[0].set_ylabel('Price')
axes[0].set_title('BTCUSDT - Цена и ключевые индикаторы')
axes[0].grid(True, alpha=0.3)
axes[0].legend()

# Графики индикаторов
for idx, feature in enumerate(key_features, 1):
    if feature in btc_data.columns:
        axes[idx].plot(btc_data['datetime'], btc_data[feature], label=feature, linewidth=1)
        axes[idx].set_ylabel(feature)
        axes[idx].grid(True, alpha=0.3)
        axes[idx].legend()

axes[-1].set_xlabel('Date')
plt.tight_layout()
plt.show()

In [None]:
# Анализ стабильности признаков во времени
# Разделяем данные на периоды
btc_data['month'] = pd.to_datetime(btc_data['datetime']).dt.to_period('M')

# Статистика признаков по месяцам
monthly_stats = btc_data.groupby('month')[key_features].agg(['mean', 'std'])

# Визуализация изменения средних значений
fig, axes = plt.subplots(2, 3, figsize=(15, 8))
axes = axes.ravel()

for idx, feature in enumerate(key_features[:6]):
    if feature in btc_data.columns:
        monthly_mean = monthly_stats[feature]['mean']
        monthly_std = monthly_stats[feature]['std']
        
        x = range(len(monthly_mean))
        axes[idx].plot(x, monthly_mean, label='Mean', marker='o')
        axes[idx].fill_between(x, 
                              monthly_mean - monthly_std, 
                              monthly_mean + monthly_std, 
                              alpha=0.3, label='±1 STD')
        axes[idx].set_title(f'{feature} - Monthly Statistics')
        axes[idx].set_xlabel('Month')
        axes[idx].legend()
        axes[idx].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## 6. Анализ целевых переменных

In [None]:
# Анализ распределения целевых переменных
target_cols = [col for col in df_features.columns if 'target' in col or 'future_return' in col]

print("Найденные целевые переменные:")
for col in target_cols:
    print(f"  - {col}")

# Статистика по целевым переменным
if target_cols:
    target_stats = df_features[target_cols].describe()
    print("\nСтатистика целевых переменных:")
    print(target_stats)

In [None]:
# Визуализация целевых переменных
if 'future_return_4h' in df_features.columns:
    fig, axes = plt.subplots(2, 2, figsize=(14, 10))
    
    # Распределение future returns
    future_returns = df_features['future_return_4h'].dropna()
    axes[0, 0].hist(future_returns, bins=100, alpha=0.7, edgecolor='black')
    axes[0, 0].axvline(0, color='red', linestyle='--', label='Zero')
    axes[0, 0].set_title('Распределение Future Returns (4h)')
    axes[0, 0].set_xlabel('Return')
    axes[0, 0].set_ylabel('Frequency')
    axes[0, 0].legend()
    
    # QQ-plot
    from scipy import stats
    stats.probplot(future_returns, dist="norm", plot=axes[0, 1])
    axes[0, 1].set_title('QQ-plot Future Returns')
    
    # Анализ TP targets
    tp_cols = [col for col in target_cols if 'target_tp' in col]
    if tp_cols:
        tp_rates = df_features[tp_cols].mean()
        axes[1, 0].bar(range(len(tp_rates)), tp_rates.values)
        axes[1, 0].set_xticks(range(len(tp_rates)))
        axes[1, 0].set_xticklabels([col.replace('target_tp_', 'TP ') for col in tp_rates.index], rotation=45)
        axes[1, 0].set_title('Частота достижения TP уровней')
        axes[1, 0].set_ylabel('Probability')
    
    # Связь с волатильностью
    if 'atr_14' in df_features.columns:
        df_sample = df_features[['atr_14', 'future_return_4h']].dropna().sample(min(5000, len(df_features)))
        axes[1, 1].scatter(df_sample['atr_14'], df_sample['future_return_4h'], alpha=0.5, s=10)
        axes[1, 1].set_xlabel('ATR (14)')
        axes[1, 1].set_ylabel('Future Return (4h)')
        axes[1, 1].set_title('ATR vs Future Returns')
        axes[1, 1].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()

## 7. Feature Engineering - Дополнительные признаки

In [None]:
# Создание интерактивных признаков
def create_interaction_features(df, features, max_features=10):
    """Создание произведений важных признаков"""
    interaction_features = {}
    
    # Берем топ признаки
    top_features = features[:max_features]
    
    for i in range(len(top_features)):
        for j in range(i+1, len(top_features)):
            feat1, feat2 = top_features[i], top_features[j]
            if feat1 in df.columns and feat2 in df.columns:
                interaction_name = f"{feat1}_X_{feat2}"
                interaction_features[interaction_name] = df[feat1] * df[feat2]
    
    return pd.DataFrame(interaction_features)

# Создаем интерактивные признаки на основе MI scores
top_mi_features = mi_scores.head(10).index.tolist()
interaction_df = create_interaction_features(df_features, top_mi_features)

print(f"Создано {len(interaction_df.columns)} интерактивных признаков")
print("\nПримеры:")
for col in list(interaction_df.columns)[:5]:
    print(f"  - {col}")

In [None]:
# Создание лаговых признаков
def create_lag_features(df, features, lags=[1, 2, 3, 4, 8, 12]):
    """Создание лаговых признаков"""
    lag_features = {}
    
    for feature in features:
        if feature in df.columns:
            for lag in lags:
                lag_name = f"{feature}_lag_{lag}"
                lag_features[lag_name] = df.groupby('symbol')[feature].shift(lag)
    
    return pd.DataFrame(lag_features)

# Создаем лаги для важных признаков
important_features = ['rsi_14', 'macd_signal', 'volume_sma_ratio', 'bb_width']
lag_df = create_lag_features(df_features, important_features)

print(f"Создано {len(lag_df.columns)} лаговых признаков")

# Анализ корреляции лагов с целевой переменной
if 'future_return_4h' in df_features.columns:
    lag_correlations = {}
    for col in lag_df.columns:
        corr = lag_df[col].corr(df_features['future_return_4h'])
        lag_correlations[col] = corr
    
    lag_corr_df = pd.Series(lag_correlations).sort_values(ascending=False)
    print("\nТоп-10 лаговых признаков по корреляции с целевой переменной:")
    print(lag_corr_df.head(10))

## 8. Сохранение результатов анализа

In [None]:
# Сохранение важности признаков
feature_importance = pd.DataFrame({
    'mutual_info': mi_scores,
    'f_score': f_scores
})

# Добавляем ранги
feature_importance['mi_rank'] = feature_importance['mutual_info'].rank(ascending=False)
feature_importance['f_rank'] = feature_importance['f_score'].rank(ascending=False)
feature_importance['avg_rank'] = (feature_importance['mi_rank'] + feature_importance['f_rank']) / 2

# Сортируем по среднему рангу
feature_importance = feature_importance.sort_values('avg_rank')

# Сохраняем топ-50 признаков
top_features = feature_importance.head(50)
top_features.to_csv('feature_importance_analysis.csv')

print("Топ-20 признаков по среднему рангу:")
print(top_features.head(20)[['mutual_info', 'f_score', 'avg_rank']])

# Сохранение списка рекомендуемых признаков
recommended_features = {
    'top_features': top_features.index.tolist()[:30],
    'highly_correlated_to_remove': highly_correlated['feature2'].tolist() if len(highly_correlated) > 0 else [],
    'feature_categories': {k: v[:10] for k, v in feature_categories.items() if v},  # Топ-10 из каждой категории
    'statistics': {
        'total_features': len(feature_cols),
        'recommended_features': 30,
        'highly_correlated_pairs': len(highly_correlated)
    }
}

import json
with open('recommended_features.json', 'w') as f:
    json.dump(recommended_features, f, indent=4)

print("\nРезультаты сохранены в:")
print("  - feature_importance_analysis.csv")
print("  - recommended_features.json")