In [None]:
import pandas as pd
import talib as ta
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.cluster import KMeans 

import seaborn as sns
import statsmodels.api as sm
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import matplotlib
import warnings
import logging
import platform
# Keep matplotlib's own logger quiet: only WARNING and above are emitted.
mpl_logger = logging.getLogger('matplotlib')
mpl_logger.setLevel(logging.WARNING)
# Suppress UserWarnings raised from matplotlib.font_manager.
warnings.filterwarnings("ignore", category=UserWarning, module="matplotlib.font_manager")

# Font used for Chinese labels, chosen per operating system; any platform not
# listed here (e.g. Linux) falls back to Noto Sans CJK SC.
_CJK_FONT_BY_OS = {
    'Darwin': 'Songti SC',   # macOS
    'Windows': 'SimSun',
}
plt.rcParams['font.family'] = [_CJK_FONT_BY_OS.get(platform.system(), 'Noto Sans CJK SC')]

# Render the minus sign with an ASCII hyphen so it displays correctly with these fonts.
matplotlib.rcParams['axes.unicode_minus'] = False

In [None]:
# Load the raw stock data from a CSV file (path is relative to the working directory).
# NOTE(review): per the pandas docs, parse_dates=True only tries to parse the
# index, and no index_col is given here, so it looks like no date column is
# actually parsed — confirm, and name the date column explicitly if one exists.
df = pd.read_csv('../data/stock.csv', parse_dates=True)

# Preview the first rows of the loaded frame.
df.head()

In [None]:
def iso_forest_detect(series, contamination=0.0005, random_state=42):
    """Flag outliers in a one-dimensional series with an Isolation Forest.

    Args:
        series: 1-D data to score (pandas Series, NumPy array or sequence).
        contamination: Expected proportion of anomalies, forwarded to
            ``IsolationForest``. Defaults to the previously hard-coded 0.0005.
        random_state: Seed forwarded to ``IsolationForest`` so repeated runs
            give the same flags. Defaults to the previously hard-coded 42.

    Returns:
        Boolean NumPy array with one entry per observation; True where the
        model's prediction equals -1.
    """
    model = IsolationForest(
        contamination=contamination,
        random_state=random_state,
    )
    # The estimator is fitted on a single-feature column matrix (n_samples, 1).
    X = np.asarray(series).reshape(-1, 1)
    return model.fit_predict(X) == -1
def plot_anomalies(series, anomalies, title):
    """Draw ``series`` with its flagged anomalies highlighted and summarised.

    Args:
        series: pandas Series to plot as the main line.
        anomalies: Boolean mask with one entry per element of ``series``;
            True marks an anomaly. Used both to select points and to compute
            the anomaly count and rate.
        title: Text placed in front of " Anomaly Detection" in the figure title.

    Returns:
        None. The figure is shown with ``plt.show()``.
    """
    anomaly_count = anomalies.sum()
    anomaly_rate = anomalies.mean()

    _, ax = plt.subplots(figsize=(16, 6))

    # Main series.
    series.plot(ax=ax, label='Normal', alpha=0.8, linewidth=1.2)

    # Anomalous observations, selected with the boolean mask.
    anomaly_points = series.loc[anomalies]
    marker_style = dict(
        style='D',
        markersize=12,
        color='#FF4500',  # a more eye-catching orange
        label='Anomaly',
        alpha=0.9,
    )
    anomaly_points.plot(ax=ax, **marker_style)

    # Density shading over the anomalies (skipped when nothing was flagged).
    if not anomaly_points.empty:
        sns.kdeplot(
            x=anomaly_points.index.astype(np.int64),
            y=anomaly_points.values,
            cmap='Reds',
            fill=True,
            alpha=0.2,
            ax=ax,
        )

    ax.set_title(f'{title} Anomaly Detection\n(异常率：{anomaly_rate:.2%})', pad=20)
    ax.legend()
    ax.grid(True, alpha=0.3)

    # Statistics box placed in axes-fraction coordinates.
    stats_text = (
        "异常点统计：\n"
        f"    • 总数：{anomaly_count}\n"
        f"    • 异常率：{anomaly_rate:.2%}\n"
        "    "
    )
    box_style = dict(boxstyle="round", fc="white", ec="#999999", alpha=0.8)
    ax.annotate(
        stats_text,
        xy=(0.78, 0.85),
        xycoords='axes fraction',
        bbox=box_style,
    )
    plt.show()
# Flag anomalous rows using the 'volume' column only.
iso_forest_anomalies = iso_forest_detect(df['volume'])

# Plot the volume series with the flagged points highlighted.
plot_anomalies(df['volume'], iso_forest_anomalies, 'Volume')

# Disabled clean-up step: blank out the flagged rows, then fill them.
# NOTE(review): the original note on the second line said "linear interpolation",
# but the disabled code forward-fills — decide which is intended before enabling.
# df.loc[iso_forest_anomalies]=np.nan
# df.loc[iso_forest_anomalies] = df.loc[iso_forest_anomalies].ffill()
# Print the flagged rows for inspection.
print(df.loc[iso_forest_anomalies])

In [None]:
def create_features(data:pd.DataFrame) -> pd.DataFrame:
    """Add technical-indicator and return features to a price/volume frame.

    The feature columns are written onto ``data`` in place, so the caller's
    frame gains the new columns. The returned frame is ``data.dropna()``: a
    new DataFrame without any row that contains a NaN (for example rows
    before the rolling windows are full).

    Args:
        data: Frame with ``close``, ``high``, ``low`` and ``volume`` columns.

    Returns:
        A new DataFrame holding the original and feature columns, restricted
        to rows where every column is non-null.
    """
    close = data['close']

    # Exponential and simple moving averages of the close, 20 and 100 periods.
    data['EMA20'] = ta.EMA(close, timeperiod=20)
    data['EMA100'] = ta.EMA(close, timeperiod=100)
    data['SMA20'] = ta.SMA(close, timeperiod=20)
    data['SMA100'] = ta.SMA(close, timeperiod=100)

    data['RSI'] = ta.RSI(close, timeperiod=14)
    # Average true range.
    data['ATR'] = ta.ATR(data['high'], data['low'], close, timeperiod=14)
    # Volatility: ATR expressed relative to the close.
    data['Volatility'] = data['ATR'] / close
    data['MACD'], data['Signal'], data['Hist'] = ta.MACD(close, fastperiod=12, slowperiod=26, signalperiod=9)
    # Instantaneous slope of the MACD line (first difference).
    data['macd_slope'] = data['MACD'].diff()
    # Moving averages of that slope (to capture trend strength).
    for window in [3, 5, 10]:
        data[f'macd_slope_ma{window}'] = data['macd_slope'].rolling(window).mean()

    # Price volatility.
    # Despite its name, 'close_volatility' holds the one-period log return;
    # 'Volatility_10' is its 10-period rolling standard deviation scaled by sqrt(10).
    data['close_volatility'] = np.log(close / close.shift(1))
    data['Volatility_10'] = data['close_volatility'].rolling(window=10).std() * np.sqrt(10)
    # Same construction applied to the log return multiplied by volume.
    data['close_Volume_volatility'] = data['close_volatility'] * data['volume']
    data['Volume_volatility_10'] = data['close_Volume_volatility'].rolling(window=10).std() * np.sqrt(10)
    # Bollinger Bands.
    data['UpperBB'], data['MiddleBB'], data['LowerBB'] = ta.BBANDS(close, timeperiod=20)

    # Percentage change of the close over 1, 3 and 5 periods.
    for lag in [1, 3, 5]:
        data[f'return_lag{lag}'] = close.pct_change(lag)

    # Disabled: trend / seasonal / residual decomposition of the close.
    # seasonal_decompose = sm.tsa.seasonal_decompose(data['close'], model='additive', period=96)
    # data['trend'] = seasonal_decompose.trend
    # data['seasonal'] = seasonal_decompose.seasonal
    # data['residual'] = seasonal_decompose.resid

    # Return the frame that was passed in, not the notebook-level `df`: the two
    # are only the same object when the caller happens to pass the global.
    return data.dropna()

def create_(data:pd.DataFrame):
    """Flag MACD-slope divergences on rows marked as final peaks or valleys.

    Adds two 0/1 integer columns to ``data`` in place:

    * ``bottom_divergence`` is 1 where ``final_valley == 1`` and
      ``macd_slope_ma3 > 0``.
    * ``top_divergence`` is 1 where ``final_peak == 1`` and
      ``macd_slope_ma3 < 0``.

    A missing ``final_peak`` / ``final_valley`` value counts as "not flagged".

    Args:
        data: Frame with ``final_peak``, ``final_valley`` and
            ``macd_slope_ma3`` columns.

    Returns:
        The same ``data`` object, with the two columns added.
    """
    # Compare first, then fill, so a missing flag ends up as a plain False
    # whatever the column dtype is; the source columns are left untouched.
    is_valley = data['final_valley'].eq(1).fillna(False).astype(bool)
    is_peak = data['final_peak'].eq(1).fillna(False).astype(bool)
    slope = data['macd_slope_ma3']

    # Each condition is evaluated on its own before combining: `&` binds more
    # tightly than `==`, so `flag == 1 & (cond)` would mean `flag == (1 & cond)`
    # and mark unflagged rows whose slope condition is false.
    data['bottom_divergence'] = (is_valley & (slope > 0)).astype(int)
    data['top_divergence'] = (is_peak & (slope < 0)).astype(int)
    return data
    
    
    

In [None]:
df = create_features(df)

df.to_csv("../data/train.csv", index=False)

df.tail()