In [1]:
import pandas as pd
import numpy as np 
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib.pyplot as plt
import matplotlib as mpl
import matplotlib.font_manager as fm
# Font fallback list for sans-serif text (priority runs left to right), and
# a setting so the minus sign displays correctly.
plt.rcParams.update({
    'font.sans-serif': ['SimHei', 'Heiti TC', 'PingFang HK', 'Microsoft YaHei', 'SimSun'],
    'axes.unicode_minus': False,
})

# Load the input CSV and index it by its 'timestamp' column.
df = pd.read_csv('data/init.csv').set_index('timestamp')




In [2]:
def calculate_atr(df, period=14):
    """Compute the Average True Range (ATR) using Wilder's smoothing.

    The true range of a row is the largest of ``high - low``,
    ``|high - previous close|`` and ``|low - previous close|``. The first row
    has no previous close, so its true range resolves to ``high - low``.

    Args:
        df: DataFrame with ``'high'``, ``'low'`` and ``'close'`` columns.
        period: Number of rows in the smoothing window (default 14).

    Returns:
        A float Series aligned to ``df.index``. Entries before position
        ``period - 1`` are NaN; the entry at ``period - 1`` is the simple mean
        of the first ``period`` true ranges, and each later entry is
        ``(ATR[i-1] * (period - 1) + TR[i]) / period``. When ``df`` has fewer
        than ``period`` rows every entry is NaN.

    Raises:
        ValueError: If ``period`` is smaller than 1.
    """
    if period < 1:
        raise ValueError("period must be a positive integer")

    prev_close = df['close'].shift()
    candidates = pd.concat(
        [
            df['high'] - df['low'],
            (df['high'] - prev_close).abs(),
            (df['low'] - prev_close).abs(),
        ],
        axis=1,
    )
    # Row-wise max skips NaN, so the first row falls back to high - low.
    true_range = candidates.max(axis=1)

    n_rows = len(true_range)
    atr = np.full(n_rows, np.nan)
    if n_rows < period:
        # Not enough rows to seed the average: return all-NaN instead of
        # failing with an IndexError.
        return pd.Series(atr, index=df.index, dtype=float)

    # Seed value: simple average of the first `period` true ranges.
    atr[period - 1] = true_range.iloc[:period].mean()

    # Run the recursion on a plain array; per-element Series writes are slow.
    tr_values = true_range.to_numpy(dtype=float)
    for i in range(period, n_rows):
        atr[i] = (atr[i - 1] * (period - 1) + tr_values[i]) / period

    return pd.Series(atr, index=df.index, dtype=float)

# Compute ATR, then express it as a percentage of the close.
df['ATR'] = calculate_atr(df)
df['ATR_Pct'] = df['ATR'].div(df['close']).mul(100)

## 简易支撑压力 ##


In [3]:
# Support level: lowest low over the most recent N bars (rolling window)
def dynamic_support(data, window=30):
    """Return the rolling minimum of ``data['low']`` over ``window`` rows.

    Positions with fewer than ``window`` observations evaluate to NaN
    (pandas' default ``min_periods`` for an integer window).
    """
    lows = data['low']
    trailing = lows.rolling(window=window)
    return trailing.min()

# Resistance level: highest high over the most recent N bars (rolling window)
def dynamic_resistance(data, window=30):
    """Return the rolling maximum of ``data['high']`` over ``window`` rows.

    Positions with fewer than ``window`` observations evaluate to NaN
    (pandas' default ``min_periods`` for an integer window).
    """
    highs = data['high']
    trailing = highs.rolling(window=window)
    return trailing.max()

# Apply a volatility adjustment: push support down and resistance up by a
# factor of 0.1 * ATR_Pct.
# NOTE(review): ATR_Pct is computed as ATR / close * 100, so this factor equals
# 10 * ATR / close — confirm that this scale is intended.
df['support'] = dynamic_support(df).mul(1 - df['ATR_Pct'].mul(0.1))
df['resistance'] = dynamic_resistance(df).mul(1 + df['ATR_Pct'].mul(0.1))

## 动量压缩比 ##

In [4]:
def calculate_momentum_compression_ratio(dataframe, short_window, long_window):
    """Add short/long momentum columns and their ratio to ``dataframe``.

    Momentum is the change in ``'close'`` over the given number of rows. The
    frame is modified in place (three columns are added or overwritten) and
    the same object is returned.

    Args:
        dataframe: DataFrame with a ``'close'`` column.
        short_window: Row lag used for ``'Short_Term_Momentum'``.
        long_window: Row lag used for ``'Long_Term_Momentum'``.

    Returns:
        ``dataframe`` with ``'Short_Term_Momentum'``, ``'Long_Term_Momentum'``
        and ``'Momentum_Compression_Ratio'`` columns. The ratio is NaN wherever
        either momentum is unavailable or the long-term momentum is exactly 0.
    """
    short_term = dataframe['close'].diff(periods=short_window)
    long_term = dataframe['close'].diff(periods=long_window)

    dataframe['Short_Term_Momentum'] = short_term
    dataframe['Long_Term_Momentum'] = long_term
    # Mask zero denominators so the ratio is NaN rather than +/-inf, which
    # would otherwise leak into downstream statistics and plots.
    dataframe['Momentum_Compression_Ratio'] = short_term / long_term.where(long_term != 0)
    return dataframe

# Set the parameters, e.g. short term 5 minutes and long term 15 minutes
# (the windows are row lags; minutes assumes 1-minute rows — TODO confirm)
df = calculate_momentum_compression_ratio(df, short_window=5, long_window=15)


## 弹性 ##

In [5]:
def calculate_elasticity_coefficient(dataframe, window):
    """Add an ``'Elasticity'`` column locating the close inside the rolling range.

    The value is ``(close - rolling low) / (rolling high - rolling low)``,
    where the rolling extremes cover up to ``window`` rows and are available
    from the first row onward (``min_periods=1``). The frame is modified in
    place and the same object is returned.

    Args:
        dataframe: DataFrame with ``'high'``, ``'low'`` and ``'close'`` columns.
        window: Number of rows in the rolling window.

    Returns:
        ``dataframe`` with the ``'Elasticity'`` column added or overwritten.
    """
    rolling_opts = dict(window=window, min_periods=1)
    range_top = dataframe['high'].rolling(**rolling_opts).max()
    range_bottom = dataframe['low'].rolling(**rolling_opts).min()

    offset = dataframe['close'] - range_bottom
    span = range_top - range_bottom
    dataframe['Elasticity'] = offset / span
    return dataframe

# Apply the function, e.g. using 14 minutes as the window
# (the window is a row count; minutes assumes 1-minute rows — TODO confirm)
df = calculate_elasticity_coefficient(df, window=14)


## 压力 ##

In [6]:
def calculate_pressure_accumulation(dataframe):
    """Add signed-volume ``'Pressure'`` and its running total to ``dataframe``.

    A row contributes ``+volume`` when ``close > open`` and ``-volume``
    otherwise, so rows with ``close == open`` count as negative. The frame is
    modified in place and the same object is returned.

    Args:
        dataframe: DataFrame with ``'open'``, ``'close'`` and ``'volume'`` columns.

    Returns:
        ``dataframe`` with ``'Pressure'`` and ``'Cumulative_Pressure'`` columns
        added or overwritten.
    """
    closed_higher = dataframe['close'] > dataframe['open']
    volume = dataframe['volume']

    # Sign the volume by the direction of the bar.
    dataframe['Pressure'] = np.where(closed_higher, volume, -volume)
    # Running sum of the signed volume.
    dataframe['Cumulative_Pressure'] = dataframe['Pressure'].cumsum()

    return dataframe

# Apply the function to the actual data
df = calculate_pressure_accumulation(df)

#


## Hurst ##


In [8]:
from hurst import compute_Hc

def calculate_fractal_dimension(dataframe, window):
    """Estimate the Hurst exponent of the close series and a fractal dimension.

    Args:
        dataframe: DataFrame with a ``'close'`` column.
        window: Accepted but not used; the estimate is computed from the
            entire ``'close'`` column.

    Returns:
        Tuple ``(H, D)`` where ``H`` is the first value returned by
        ``compute_Hc(..., kind='price', simplified=True)`` and ``D = 2 - H``.
    """
    closes = dataframe['close'].values
    hurst_exponent, _, _ = compute_Hc(closes, kind='price', simplified=True)
    return hurst_exponent, 2 - hurst_exponent

# Compute the Hurst exponent and fractal dimension of the sample dataset.
# NOTE: window=60 is passed but calculate_fractal_dimension does not use it.
H, D = calculate_fractal_dimension(df, window=60)
print("Hurst指数:", H)
print("分形维度:", D)

Hurst指数: 0.5494223636011429
分形维度: 1.4505776363988572


## RSI ##


In [10]:


def _rsi_from_averages(avg_gain, avg_loss):
    """Convert average gain/loss into an RSI value; 100 when there are no losses."""
    if avg_loss == 0:
        return 100.0
    return 100 - (100 / (1 + avg_gain / avg_loss))


def calculate_rsi_wilder(df, period=14):
    """Compute the Relative Strength Index (RSI) with Wilder's smoothing.

    Gains and losses are the positive and negative parts of the row-to-row
    change in ``'close'``. The first RSI value sits at position ``period`` and
    is seeded with the simple average of the first ``period`` actual changes
    (positions ``1..period``; position 0 has no previous close). Later values
    use ``avg = (prev_avg * (period - 1) + current) / period``.

    Args:
        df: DataFrame with a ``'close'`` column.
        period: Number of changes in the smoothing window (default 14).

    Returns:
        A float Series aligned to ``df.index``. Entries before position
        ``period`` are NaN. RSI is 100 whenever the average loss is 0. When
        ``df`` has ``period`` rows or fewer every entry is NaN.

    Raises:
        ValueError: If ``period`` is smaller than 1.
    """
    if period < 1:
        raise ValueError("period must be a positive integer")

    delta = df['close'].diff()

    # Split the changes into gains and losses (both non-negative magnitudes);
    # NaN changes fail both comparisons and therefore count as 0.
    gain = delta.where(delta > 0, 0.0).to_numpy(dtype=float)
    loss = (-delta.where(delta < 0, 0.0)).to_numpy(dtype=float)

    n_rows = len(delta)
    rsi = np.full(n_rows, np.nan)
    if n_rows <= period:
        # Fewer than `period` changes: nothing to seed the averages with.
        return pd.Series(rsi, index=df.index, dtype=float)

    # Seed with changes 1..period; position 0 is not a real change and the
    # change at position `period` must be included.
    avg_gain = gain[1:period + 1].mean()
    avg_loss = loss[1:period + 1].mean()
    rsi[period] = _rsi_from_averages(avg_gain, avg_loss)

    # Wilder smoothing for all subsequent rows.
    for i in range(period + 1, n_rows):
        avg_gain = (avg_gain * (period - 1) + gain[i]) / period
        avg_loss = (avg_loss * (period - 1) + loss[i]) / period
        rsi[i] = _rsi_from_averages(avg_gain, avg_loss)

    return pd.Series(rsi, index=df.index, dtype=float)

# Compute Wilder's RSI

df['RSI_Wilder'] = calculate_rsi_wilder(df)



In [11]:
# Display the column labels currently in df.
df.columns

Index(['open', 'high', 'low', 'close', 'volume', 'vwap', 'transactions', 'ATR',
       'ATR_Pct', 'support', 'resistance', 'Short_Term_Momentum',
       'Long_Term_Momentum', 'Momentum_Compression_Ratio', 'Elasticity',
       'Pressure', 'Cumulative_Pressure', 'RSI_Wilder'],
      dtype='object')