In [13]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.stats import norm
import seaborn as sns
import plotly.graph_objects as go

from bokeh.plotting import figure, show
from bokeh.layouts import column
from bokeh.models import ColumnDataSource, CustomJS, HoverTool


class StockPriceSimulator:
    """Monte Carlo simulator of geometric-Brownian-motion price paths, with plot helpers.

    Every path starts at ``S0 = 1`` and advances in unit time steps (``dt = 1``).
    Drift and volatility are not constructor arguments: inside ``generate_paths``
    they are re-drawn uniformly at random for every time step and shared by all
    paths of that step.
    """

    def __init__(self, n_steps, n_sims, seed=None):
        """
        Store the simulation size.

        Args:
            n_steps: number of time points per path (cast to ``int``).
            n_sims: number of simulated paths.
            seed: optional seed for a private random generator, so that a run can
                be reproduced. With the default ``None`` the global
                ``numpy.random`` state is used.
        """
        self.S0 = 1
        self.T = n_steps
        self.dt = 1
        self.n_sims = n_sims
        self.n_steps = int(n_steps)
        # Time stamp of every step; available even before generate_paths() runs.
        self.times = self._time_axis(self.n_steps)
        # The numpy.random module and a Generator both expose normal()/uniform().
        self._rng = np.random if seed is None else np.random.default_rng(seed)

    def _time_axis(self, n_points):
        """Return the time stamps ``0, dt, 2*dt, ...`` for ``n_points`` steps."""
        # Step k happens at k*dt. (linspace(0, T, n_steps) would stretch the grid
        # by n/(n-1) and misalign it with step indices used for reference lines.)
        return np.arange(n_points, dtype=float) * self.dt

    def generate_paths(self):
        """Simulate the price paths.

        Returns:
            Array of shape ``(n_steps, n_sims)``; row 0 equals ``S0`` for every path.
        """
        self.times = self._time_axis(self.n_steps)

        # Price matrix, one column per path, all starting at S0.
        S = np.zeros((self.n_steps, self.n_sims))
        S[0] = self.S0

        # Standard-normal shocks for every (step, path).
        Z = self._rng.normal(0, 1, (self.n_steps, self.n_sims))

        # Geometric Brownian motion update, one step at a time.
        for t in range(1, self.n_steps):
            # Per-step drift and volatility, common to all paths.
            mu = self._rng.uniform(-0.0001, 0.0001)
            sigma = self._rng.uniform(0.0001, 0.0003)

            drift = (mu - 0.5 * sigma**2) * self.dt
            diffusion = sigma * np.sqrt(self.dt) * Z[t-1]

            S[t] = S[t-1] * np.exp(drift + diffusion)

        return S

    def plot_paths(self, S, n_paths_to_plot=0, t_idx=0, predict_t=100):
        """Draw the paths with matplotlib.

        Args:
            S: price matrix, one column per path.
            n_paths_to_plot: number of leading columns to draw; 0 draws ``n_sims``.
            t_idx: x position of the red dashed vertical line.
            predict_t: x position of the green dashed vertical line.
        """
        if n_paths_to_plot == 0:
            n_paths_to_plot = self.n_sims

        plt.figure(figsize=(12, 6))
        plt.axvline(x=t_idx, color='r', linestyle='--')
        plt.axvline(x=predict_t, color='g', linestyle='--')
        # The x axis is derived from S itself, so it always matches its row count.
        plt.plot(self._time_axis(S.shape[0]), S[:, :n_paths_to_plot])
        plt.xlabel('time')
        plt.ylabel('price')
        plt.grid(True)

        plt.show()

    def plot_paths_interactive(self, S, n_paths_to_plot=0, t_idx=0, predict_t=100, lines=None):
        """Draw the paths as an interactive Bokeh figure.

        Args:
            S: price matrix, one column per path.
            n_paths_to_plot: number of leading columns to draw; 0 (or a value
                larger than the number of columns) draws all of them.
            t_idx: x position of the red dashed 'Start' line.
            predict_t: x position of the green dashed 'Predict' line.
            lines: optional iterable of ``(x1, y1, x2, y2)`` segments drawn as
                red dashed lines on top of the paths.
        """
        total_paths = S.shape[1]
        if n_paths_to_plot == 0 or n_paths_to_plot > total_paths:
            n_paths_to_plot = total_paths

        # An empty selection has no min/max to span the reference lines with.
        if S.size == 0:
            print('No paths to plot.')
            return

        times = self._time_axis(S.shape[0])
        y_low, y_high = S.min(), S.max()

        # One entry per path.
        source = ColumnDataSource({
            'xs': [times for _ in range(n_paths_to_plot)],
            'ys': [S[:, i] for i in range(n_paths_to_plot)],
            'path_id': [f'Path {i+1}' for i in range(n_paths_to_plot)]
        })

        p = figure(
            width=1600,
            height=800,
            title='Stock Price Simulation Paths',
            x_axis_label='Time',
            y_axis_label='Price',
            background_fill_color='white',
            sizing_mode="stretch_width"  # adapt to the available width
        )

        # All paths in faint gray; the hover_* properties highlight the path
        # under the cursor once a HoverTool is attached to this renderer.
        line_renderer = p.multi_line(
            xs='xs',
            ys='ys',
            source=source,
            line_width=1.5,
            line_alpha=0.3,
            line_color='gray',
            hover_line_color='#FF6B6B',
            hover_line_width=3.5
        )

        # Vertical reference lines spanning the full price range.
        p.line([t_idx, t_idx], [y_low, y_high],
            line_color='red', line_dash='dashed', legend_label='Start',
            line_width=2)
        p.line([predict_t, predict_t], [y_low, y_high],
            line_color='green', line_dash='dashed', legend_label='Predict',
            line_width=2)

        # '@ys' refers to a whole list column per path, so the price under the
        # cursor is taken from the data-space coordinate '$y' instead.
        hover = HoverTool(
            tooltips=[
                ('Path', '@path_id'),
                ('Time', '$x{0.0}'),
                ('Price', '$y{0,0.00}')
            ],
            renderers=[line_renderer],
            mode='mouse'
        )
        p.add_tools(hover)

        # NOTE: no CustomJS mouse callbacks are registered. The former
        # 'mousemove' handler read cb_data.index, which only exists for HoverTool
        # callbacks, and wrote columns the glyph never used; the highlight is
        # fully handled by the hover_* glyph properties above.

        # Figure styling.
        p.grid.grid_line_color = 'gray'
        p.grid.grid_line_alpha = 0.1
        p.axis.axis_line_color = 'gray'
        p.axis.axis_line_alpha = 0.5
        p.title.text_font_size = '20pt'
        p.xaxis.axis_label_text_font_size = '14pt'
        p.yaxis.axis_label_text_font_size = '14pt'
        p.xaxis.major_label_text_font_size = '12pt'
        p.yaxis.major_label_text_font_size = '12pt'
        p.legend.location = "top_left"
        p.legend.click_policy = "hide"
        p.legend.label_text_font_size = '12pt'

        # Margins.
        p.min_border_left = 80
        p.min_border_right = 80
        p.min_border_top = 60
        p.min_border_bottom = 60

        # Extra caller-supplied segments.
        if lines is not None:
            for x1, y1, x2, y2 in lines:
                p.line([x1, x2], [y1, y2], line_color='red', line_dash='dashed', line_width=2)

        show(p)



In [14]:
# Simulation size: time steps per path and number of simulated paths.
n_steps, n_sims = 4800, 30

# Build the simulator and draw the price matrix (one row per time step).
simulator = StockPriceSimulator(n_steps=n_steps, n_sims=n_sims)
paths = simulator.generate_paths()

# Number of rows, i.e. time steps, that were generated.
print(paths.shape[0])

4800


In [15]:
# Show every simulated path, with the dashed markers at step 0 and step 4800.
simulator.plot_paths_interactive(
    S=paths,
    t_idx=0,
    predict_t=4800,
)

# paper label  

In [21]:
"""
predict_n   :  预测未来时点个数
min_tick    :  最小变动价格

t0          :  当前时点前 predict_n 时点
t1          :  当前时点
t2          :  当前时点后 predict_n 时点

p0          :  t0 - t1 之间的平均中间价格
p1          :  t1 - t2 之间的平均中间价格

价差        :  (p1 - p0) / min_tick > 变动了多少个 min_tick
阈值        :  0.5

分类:
  价差>阈值     :  上涨     (0)
  价差<-阈值    :  下跌     (1)
  其他          :  无变动   (2)

包含预期外的形态:
1. 预测段没有上涨
    - t0 - t1 之间平稳上涨, t1 - t2 之间基本持平/缓慢下跌
    - t1 开始较高位置 无变动, 之后开始下跌
    - t1 开始大幅度震荡

2. 预测段上涨与 t1 距离很远, 意味着 上涨与x关系不大
    - t1 开始 无变动/缓慢下跌 , 最后时段急剧上涨
    - t1 开始 下跌 , 之后开始上涨

"""


'\npredict_n   :  预测未来时点个数\nmin_tick    :  最小变动价格\n\nt0          :  当前时点前 predict_n 时点\nt1          :  当前时点\nt2          :  当前时点后 predict_n 时点\n\np0          :  t0 - t1 之间的平均中间价格\np1          :  t1 - t2 之间的平均中间价格\n\n价差        :  (p1 - p0) / min_tick > 变动了多少个 min_tick\n阈值        :  0.5\n\n分类:\n  价差>阈值     :  上涨     (0)\n  价差<-阈值    :  下跌     (1)\n  其他          :  无变动   (2)\n\n包含预期外的形态:\n1. 预测段没有上涨\n    - t0 - t1 之间平稳上涨, t1 - t2 之间基本持平/缓慢下跌\n    - t1 开始较高位置 无变动, 之后开始下跌\n    - t1 开始大幅度震荡\n\n2. 预测段上涨与 t1 距离很远, 意味着 上涨与x关系不大\n    - t1 开始 无变动/缓慢下跌 , 最后时段急剧上涨\n    - t1 开始 下跌 , 之后开始上涨\n\n'

In [22]:
# n = 100

# data = pd.DataFrame(paths)
# mean_mid_p0 = data.rolling(n).mean().shift(-n)
# mean_mid_p = data.rolling(n).mean()
# paper = ((mean_mid_p0 - mean_mid_p) / 0.001).dropna()

# t = paper.iloc[0]
# print(t.describe())
# up_cols = t[t > 0.5].index
# simulator.plot_paths_interactive(data.loc[:, up_cols].values,t_idx=99, predict_t=99+100)

# label 1

In [23]:
"""
predict_n   :  预测未来时点个数
min_tick    :  最小变动价格
k           :  计算均价前后时点个数(3)

t0          :  当前时点
t1          :  当前时点后 predict_n 时点

p0          :  t0 时点中间价格
p1          :  t1-k, t1+k 之间的平均中间价格

价差        :  (p1 - p0) / min_tick > 变动了多少个 min_tick
阈值        :  0.5

分类:
  价差>阈值     :  上涨     (0)
  价差<-阈值    :  下跌     (1)
  其他          :  无变动   (2)

包含预期外的形态:
1. 预测段上涨与 t0 距离很远, 意味着 上涨与x关系不大
    - t0 开始 无变动/下跌 , 最后时段急剧上涨

"""

'\npredict_n   :  预测未来时点个数\nmin_tick    :  最小变动价格\nk           :  计算均价前后时点个数(3)\n\nt0          :  当前时点\nt1          :  当前时点后 predict_n 时点\n\np0          :  t0 时点中间价格\np1          :  t1-k, t1+k 之间的平均中间价格\n\n价差        :  (p1 - p0) / min_tick > 变动了多少个 min_tick\n阈值        :  0.5\n\n分类:\n  价差>阈值     :  上涨     (0)\n  价差<-阈值    :  下跌     (1)\n  其他          :  无变动   (2)\n\n包含预期外的形态:\n1. 预测段上涨与 t1 距离很远, 意味着 上涨与x关系不大\n    - t01 开始 无变动/下跌 , 最后时段急剧上涨\n\n'

In [24]:
n = 3   # prediction horizon, in time steps
k = 3   # half-width of the averaging window around the target step

data = pd.DataFrame(paths)

# Trailing mean over 2k+1 rows; shifting it up by k rows centres the window on
# each row, and shifting by a further n rows moves that window n steps ahead.
p = data.rolling(window=2 * k + 1).mean()
p2 = p.shift(periods=-k)
p_1 = p2.shift(periods=-n)

# Change from the current price to the smoothed future price, in units of 0.001.
label_1 = (p_1 - data).div(0.001).dropna()

# Label values of all paths at the first remaining time step.
t = label_1.iloc[0]
print(t.describe())

# 33% and 66% quantiles, used as the group boundaries.
q33, q66 = t.quantile(0.33), t.quantile(0.66)

count    500.000000
mean      -0.059142
std        0.285316
min       -0.919008
25%       -0.251045
50%       -0.071989
75%        0.163335
max        0.635111
Name: 0, dtype: float64


In [25]:
# Split the paths into three groups by their label-1 value and plot each group.
up_cols = t[t > q66].index
down_cols = t[t < q33].index
# Inclusive bounds: a path whose value equals q33 or q66 exactly would otherwise
# belong to none of the three groups and silently disappear from the plots.
other_cols = t[(t >= q33) & (t <= q66)].index

for _cols in (up_cols, down_cols, other_cols):
    # An empty group has nothing to draw and no price range for the marker lines.
    if len(_cols) > 0:
        simulator.plot_paths_interactive(data.loc[:, _cols].values, t_idx=0, predict_t=0 + n)

# label god 
全知标签

In [None]:
"""
predict_n   :  预测未来时点个数

t0          :  当前时点
t1          :  当前时点后 predict_n 时点

-------------------------------------
上涨:
  p0          :  t0 时点中间价格 - error
  p1          :  t0 时点中间价格 + win* min_tick
  趋势线:
    data_up: p0 - p1
    data_up_0: data_up 上移error*2

-------------------------------------
下跌:
  p0          :  t0 时点中间价格 + error
  p1          :  t0 时点中间价格 - win* min_tick
  趋势线:
    data_down: p0 - p1
    data_down_0: data_down 下移error*2
-------------------------------------

分类:
  predict_n期间内 
  不跌破data_up趋势线(温和上涨) | 价格先上穿data_up_0趋势线(上涨强烈)     :  上涨 (0)
  不涨破data_down趋势线(温和下跌) | 价格先下穿data_down_0趋势线(下跌强烈) :  下跌 (1)
  其他:  震荡 (2)

"""

# label 2  
趋势线控制


In [9]:
"""
predict_n   :  预测未来时点个数
min_tick    :  最小变动价格
win         :  盈利倍数(1.5)
error       :  允许误差(min_tick)

t0          :  当前时点
t1          :  当前时点后 predict_n 时点

-------------------------------------
上涨:
  p0          :  t0 时点中间价格 - error
  p1          :  t0 时点中间价格 + win* min_tick
  趋势线:
    data_up: p0 - p1
    data_up_0: data_up 上移error*2

-------------------------------------
下跌:
  p0          :  t0 时点中间价格 + error
  p1          :  t0 时点中间价格 - win* min_tick
  趋势线:
    data_down: p0 - p1
    data_down_0: data_down 下移error*2
-------------------------------------

分类:
  predict_n期间内 
  不跌破data_up趋势线(温和上涨) | 价格先上穿data_up_0趋势线(上涨强烈)     :  上涨 (0)
  不涨破data_down趋势线(温和下跌) | 价格先下穿data_down_0趋势线(下跌强烈) :  下跌 (1)
  其他:  震荡 (2)

"""

'\npredict_n   :  预测未来时点个数\nmin_tick    :  最小变动价格\nwin         :  盈利倍数(1.5)\nerror       :  允许误差(min_tick)\n\nt0          :  当前时点\nt1          :  当前时点后 predict_n 时点\n\n-------------------------------------\n上涨:\n  p0          :  t0 时点中间价格 - error\n  p1          :  t0 时点中间价格 + win* min_tick\n  趋势线:\n    data_up: p0 - p1\n    data_up_0: data_up 上移error*2\n\n-------------------------------------\n下跌:\n  p0          :  t0 时点中间价格 + error\n  p1          :  t0 时点中间价格 - win* min_tick\n  趋势线:\n    data_down: p0 - p1\n    data_down_0: data_down 下移error*2\n-------------------------------------\n\n分类:\n  predict_n期间内 \n  不跌破data_up趋势线(温和上涨) | 价格先上穿data_up_0趋势线(上涨强烈)     :  上涨 (0)\n  不涨破data_down趋势线(温和下跌) | 价格先下穿data_down_0趋势线(下跌强烈) :  下跌 (1)\n  其他:  震荡 (2)\n\n'

In [54]:
# Prediction horizon in time steps; each trend line spans n + 1 points.
n = 100

data = pd.DataFrame(data=paths)

# Target move of the trend line, as a multiple of 0.001.
win = 1
# Offset between the starting price and the start of the trend line.
error = 0.001
# Extra tolerance applied to the trend lines for the "soft" match.
soft = error / 20
# Fraction of the window that must stay on the right side of the trend line,
# for the strict match and for the soft match respectively.
match_ratio = 4 / 5
match_ratio_soft = match_ratio * 80 / 100

In [55]:
def _first_hit(mask):
    """Return the position of the first True in ``mask``, or its last position if none."""
    hits = np.flatnonzero(mask[:-1])
    return int(hits[0]) if hits.size else len(mask) - 1


def _trend_line(start, end, steps):
    """Return ``steps + 1`` points running linearly from ``start`` to ``end``."""
    line = start + (end - start) * np.arange(steps + 1) / steps
    # Pin both ends so they carry the exact anchor values.
    line[0], line[-1] = start, end
    return line


def _follows_trend(window, trend, breakout, hold_len, rising):
    """Tell whether ``window`` follows a rising (or falling) trend line.

    True when the first ``hold_len`` prices stay on the right side of ``trend``,
    or when the price crosses ``breakout`` before it first violates ``trend``.
    """
    if rising:
        held, violated, broke_out = window >= trend, window < trend, window > breakout
    else:
        held, violated, broke_out = window <= trend, window > trend, window < breakout
    return bool(held[:hold_len].all()) or _first_hit(broke_out) < _first_hit(violated)


def cal_label_2(mid_price, add_soft=True):
    """Label every position of a price series by the trend of the next ``n`` steps.

    Reads the module-level settings ``n``, ``error``, ``win``, ``soft``,
    ``match_ratio`` and ``match_ratio_soft``.

    Args:
        mid_price: one-dimensional price series (``pd.Series`` or array-like).
        add_soft: when True, windows matching neither trend strictly are checked
            again against trend lines relaxed by ``soft``.

    Returns:
        Float array of the same length as ``mid_price``:
        0 = rising, 1 = falling, 2 = neither, 3 = neither strictly but rising or
        falling under the soft lines (only with ``add_soft``). Positions without
        ``n`` following prices stay NaN; a series shorter than ``n + 1`` yields
        all NaN.

    Raises:
        ValueError: if ``n`` is smaller than 1.
    """
    if n < 1:
        raise ValueError("n (the prediction horizon) must be at least 1")

    prices = np.asarray(mid_price, dtype=float)
    res = np.full(len(prices), np.nan)

    min_tick = 0.001
    # Anchors of the trend lines for every position.
    p0_up, p1_up = prices - error, prices + win * min_tick
    p0_down, p1_down = prices + error, prices - win * min_tick

    window_len = n + 1
    match_range = int(window_len * match_ratio)
    match_range_soft = int(window_len * match_ratio_soft)

    # Only positions followed by n more prices can be labelled.
    for i in range(max(len(prices) - n, 0)):
        window = prices[i:i + window_len]

        data_up = _trend_line(p0_up[i], p1_up[i], n)
        data_up_0 = data_up + error * 2        # breakout line above the rising trend
        data_down = _trend_line(p0_down[i], p1_down[i], n)
        data_down_0 = data_down - error * 2    # breakout line below the falling trend

        # Rising takes priority over falling, strict over soft.
        if _follows_trend(window, data_up, data_up_0, match_range, rising=True):
            res[i] = 0
        elif _follows_trend(window, data_down, data_down_0, match_range, rising=False):
            res[i] = 1
        elif add_soft and (
            _follows_trend(window, data_up - soft, data_up_0 - soft, match_range_soft, rising=True)
            or _follows_trend(window, data_down + soft, data_down_0 + soft, match_range_soft, rising=False)
        ):
            res[i] = 3
        else:
            res[i] = 2

    return res

# Sort every path into a group according to its label at the first time step.
up_cols, down_cols, other_cols, soft_cols = [], [], [], []
_groups = {0: up_cols, 1: down_cols, 2: other_cols}

for col in data.columns:
    # The label at position 0 depends only on rows 0..n, so only that window is
    # labelled instead of the whole series.
    first_label = cal_label_2(data[col].iloc[:n + 1])[0]
    # Any other value goes to the soft group, as before.
    _groups.get(first_label, soft_cols).append(col)

# Earlier recorded result: (14411, 14534, 21055, 18916)
len(up_cols), len(down_cols), len(other_cols), len(soft_cols)

(211, 106, 139, 44)

In [56]:
# Rising trend lines for all paths, anchored at the first row of `data`.
p0 = data.iloc[0] - error
p1 = data.iloc[0] + win * 0.001
# Linear ramp from p0 to p1 over n steps (n + 1 rows, one column per path).
linear_steps = [p0 + (p1 - p0) * i / n for i in range(1, n)]
linear_steps = [p0] + linear_steps + [p1]
data_up = pd.DataFrame(linear_steps).reset_index(drop=True)
# Breakout line above the trend line.
data_up_0 = data_up + error*2
# Soft versions of both lines.
data_up_s = data_up - soft
data_up_0_s = data_up_0 - soft

# Segments (x1, y1, x2, y2) for plotting, taken from path 0. They end at
# x = n so that they follow the horizon instead of a fixed 100 steps.
up_lines = [
    (0, p0[0], n, p1[0]),
    (0, p0[0] + error*2, n, p1[0] + error*2),
]

other_lines = [
    (0, p0[0]-soft, n, p1[0]-soft),
    (0, p0[0]-soft + error*2, n, p1[0]-soft + error*2),
]

In [57]:
# Falling trend lines for all paths, anchored at the first row of `data`.
p0 = data.iloc[0] + error
p1 = data.iloc[0] - win * 0.001
# Linear ramp from p0 to p1 over n steps (n + 1 rows, one column per path).
linear_steps = [p0 + (p1 - p0) * i / n for i in range(1, n)]
linear_steps = [p0] + linear_steps + [p1]
data_down = pd.DataFrame(linear_steps).reset_index(drop=True)
# Breakout line below the trend line.
data_down_0 = data_down - error*2
# Soft versions of both lines.
data_down_s = data_down + soft
data_down_0_s = data_down_0 + soft

# Segments (x1, y1, x2, y2) for plotting, taken from path 0. They end at
# x = n so that they follow the horizon instead of a fixed 100 steps.
down_lines = [
    (0, p0[0], n, p1[0]),
    (0, p0[0] - error*2, n, p1[0] - error*2),
]

# Add the soft falling lines once, so re-running this cell does not pile up
# duplicate segments in other_lines.
for _line in (
    (0, p0[0]+soft, n, p1[0]+soft),
    (0, p0[0]+soft - error*2, n, p1[0]+soft - error*2),
):
    if _line not in other_lines:
        other_lines.append(_line)

In [58]:
def _trend_matches(window, trend, breakout, hold_len, rising):
    """Return, per column, whether the prices follow a rising (or falling) trend.

    A column matches when its first ``hold_len`` rows stay on the right side of
    ``trend``, or when it crosses ``breakout`` before first violating ``trend``.
    """
    if rising:
        held, violated, broke_out = window >= trend, window < trend, window > breakout
    else:
        held, violated, broke_out = window <= trend, window > trend, window < breakout
    # Force a hit on the last row so idxmax() yields the window end when a
    # column never crosses the line.
    violated.iloc[-1] = True
    broke_out.iloc[-1] = True
    return held.iloc[:hold_len].all() | (broke_out.idxmax() < violated.idxmax())


match_range = int(len(data_up) * match_ratio)
match_range_soft = int(len(data_up) * match_ratio_soft)

# The first window of every path, aligned row by row with the trend lines.
range_data = data.iloc[:len(data_up)]

# Strict and soft matches against the rising lines.
up_match = _trend_matches(range_data, data_up, data_up_0, match_range, rising=True)
up_cols = up_match[up_match].index
up_match_s = _trend_matches(range_data, data_up_s, data_up_0_s, match_range_soft, rising=True)
up_cols_s = up_match_s[up_match_s].index

# Strict and soft matches against the falling lines.
down_match = _trend_matches(range_data, data_down, data_down_0, match_range, rising=False)
down_cols = down_match[down_match].index
down_match_s = _trend_matches(range_data, data_down_s, data_down_0_s, match_range_soft, rising=False)
down_cols_s = down_match_s[down_match_s].index

# Paths matching neither direction, under the strict and the soft rules.
other_cols = [col for col in data.columns if col not in up_cols and col not in down_cols]
other_cols_s = [col for col in data.columns if col not in up_cols_s and col not in down_cols_s]

# Lines to show with the "other" paths: soft lines plus the strict up/down lines.
# Duplicates are dropped (order kept), so re-running the cell does not grow the
# list and other_lines_s does not contain the up/down lines twice.
other_lines = list(dict.fromkeys(other_lines + up_lines + down_lines))
other_lines_s = list(other_lines)

In [59]:
# # 上涨绘图
# simulator.plot_paths_interactive(data.loc[:, up_cols].values,t_idx=0, predict_t=0+n, lines=up_lines)
# # 下跌绘图
# simulator.plot_paths_interactive(data.loc[:, down_cols].values,t_idx=0, predict_t=0+n, lines=down_lines)
# # 其他绘图
# simulator.plot_paths_interactive(data.loc[:, other_cols].values,t_idx=0, predict_t=0+n, lines=other_lines)

In [60]:
# Recorded outputs of this cell from earlier runs; the first column is
# presumably the horizon n used for that run (TODO confirm).
# 100   (211, 106, 183, 139)
# 60    (199, 93, 208, 146)
# 30    (138, 86, 276, 149)
# 15    (149, 88, 263, 119)
# 10    (159, 78, 263, 128)
# 5     (168, 151, 181, 0)
# 3     (69, 89, 342, 0)
tuple(len(cols) for cols in (up_cols, down_cols, other_cols, other_cols_s))

(211, 106, 183, 139)