In [None]:
import numpy as np
import pandas as pd
import pandas_ta as ta
import akshare as ak
from datetime import datetime, timedelta
from IPython.display import HTML

# Render every column and never truncate cell contents when displaying frames.
pd.options.display.max_columns = None
pd.options.display.max_colwidth = None

In [None]:
def sim_price():
    """Return a deterministic, sine-shaped list of prices.

    The series has 1000 samples spanning 2 time units of a sine wave with
    frequency 1, amplitude 100 and vertical offset 100; every sample is
    rounded to two decimals.
    """
    import math

    amplitude, offset = 100, 100
    frequency, phase = 1, 0
    duration, samples = 2, 1000

    def price_at(step):
        # Same arithmetic order as a precomputed time axis: step / N * T.
        moment = step / samples * duration
        angle = 2 * math.pi * frequency * moment + phase
        return round(amplitude * math.sin(angle) + offset, 2)

    return [price_at(step) for step in range(samples)]


def sim_volume():
    """Return a deterministic, sine-shaped list of integer volumes.

    The series has 1000 samples spanning 2 time units of a sine wave with
    frequency 5, amplitude 10 and vertical offset 20000; every sample is
    floored to an integer.
    """
    import math

    amplitude, offset = 10, 20000
    frequency, phase = 5, 0
    duration, samples = 2, 1000

    def volume_at(step):
        # Same arithmetic order as a precomputed time axis: step / N * T.
        moment = step / samples * duration
        angle = 2 * math.pi * frequency * moment + phase
        return math.floor(amplitude * math.sin(angle) + offset)

    return [volume_at(step) for step in range(samples)]

In [None]:
import random
import time


def sim_price(start_price=100, price_variation=40, min_price=10.0,
              interval=1, max_ticks=None):
    """Yield a random-walk price stream, one tick at a time.

    Each tick adds a uniform random change in
    ``[-price_variation, price_variation]`` to the previous price, clamps the
    result to ``min_price`` and yields it rounded to two decimals.

    Args:
        start_price: Price the walk starts from (the first tick already
            includes one random change).
        price_variation: Maximum absolute change per tick.
        min_price: Floor applied to the price after every change.
        interval: Seconds to pause before every tick except the first.
            Values <= 0 disable the pause (useful for tests/back-fills).
        max_ticks: Stop after this many ticks; ``None`` keeps the stream
            endless, which is the original behaviour.

    Yields:
        float: The current simulated price.
    """
    price = start_price
    produced = 0
    while max_ticks is None or produced < max_ticks:
        # Pause only between ticks, so the first value is available at once
        # and no time is wasted after the last one.
        if produced and interval > 0:
            time.sleep(interval)
        change = random.uniform(-price_variation, price_variation)
        price = max(price + change, min_price)  # never drop below the floor
        yield round(price, 2)
        produced += 1


def sim_volume(base_volume=10000, volume_variation=7000,
               interval=1, max_ticks=None):
    """Yield a stream of random traded volumes, one tick at a time.

    Every tick is drawn independently and uniformly from
    ``[base_volume - volume_variation, base_volume + volume_variation]`` and
    truncated to an integer.

    Args:
        base_volume: Centre of the sampling range.
        volume_variation: Half-width of the sampling range.
        interval: Seconds to pause before every tick except the first.
            Values <= 0 disable the pause (useful for tests/back-fills).
        max_ticks: Stop after this many ticks; ``None`` keeps the stream
            endless, which is the original behaviour.

    Yields:
        int: The simulated volume for the current tick.
    """
    low = base_volume - volume_variation
    high = base_volume + volume_variation
    produced = 0
    while max_ticks is None or produced < max_ticks:
        # Pause only between ticks, so the first value is available at once
        # and no time is wasted after the last one.
        if produced and interval > 0:
            time.sleep(interval)
        yield int(random.uniform(low, high))
        produced += 1

In [None]:
import pandas as pd
import pandas_ta as ta


class BollingerOBVStrategy:
    """Tick-by-tick Bollinger-band signal generator.

    Every call to :meth:`update_data` appends one ``(close, volume)``
    observation, recomputes OBV and the Bollinger bands over the whole
    history with ``pandas_ta`` and stamps a signal on the newest row:

    * ``-1`` (sell): close rose versus the previous tick and is above the
      upper band.
    * ``1`` (buy): close fell versus the previous tick and is below the
      lower band.
    * ``0``: neither condition holds.

    OBV is computed and stored in ``data['obv']``, but the OBV-divergence
    confirmation is currently disabled, so OBV does not influence the signal.
    """

    # (pandas_ta column prefix, column name used in ``self.data``)
    _BAND_COLUMNS = (('BBU_', 'bb_upper'), ('BBM_', 'bb_middle'), ('BBL_', 'bb_lower'))

    def __init__(self, window_bb=5, window_obv=5, window_signal=5, num_std=1.5):
        """Create an empty strategy.

        Args:
            window_bb: Look-back length of the Bollinger bands; no band or
                signal is produced before this many ticks were received.
            window_obv: Stored for callers; not used by the current logic.
            window_signal: Stored for callers; not used by the current logic.
            num_std: Band width in standard deviations.
        """
        self.window_bb = window_bb
        self.window_obv = window_obv
        self.window_signal = window_signal
        self.num_std = num_std
        self.data = pd.DataFrame(
            columns=['close', 'volume', 'bb_upper', 'bb_middle', 'bb_lower', 'obv', 'signal'])

    def update_data(self, new_close, new_volume):
        """Append one tick and refresh indicators and the latest signal.

        Args:
            new_close: Close price of the new tick.
            new_volume: Traded volume of the new tick.
        """
        if self.data.empty:
            # First tick: start a fresh frame with neutral OBV and signal.
            self.data = pd.DataFrame(
                {'close': [new_close], 'volume': [new_volume]})
            self.data['obv'] = 0
            self.data['signal'] = 0
        else:
            new_row = pd.DataFrame(
                {'close': [new_close], 'volume': [new_volume], 'signal': [0]})
            self.data = pd.concat([self.data, new_row], ignore_index=True)

        # OBV needs at least two closes to have a direction.
        if 'obv' in self.data.columns and len(self.data) > 1:
            self.data['obv'] = ta.obv(self.data['close'], self.data['volume'])

        # Bands (and therefore signals) need a full look-back window.
        if len(self.data) < self.window_bb:
            return
        bb_df = ta.bbands(self.data['close'],
                          length=self.window_bb, std=self.num_std)
        if bb_df is None:
            return
        # Locate the band columns by prefix instead of rebuilding the exact
        # name: pandas_ta formats the std part itself (e.g. "2.0" for an
        # integer 2), so a hand-built f-string can miss the column.
        for prefix, target in self._BAND_COLUMNS:
            source = next(col for col in bb_df.columns if col.startswith(prefix))
            self.data[target] = bb_df[source]

        self.generate_signals()

    def generate_signals(self):
        """Set the signal of the newest row from the last two closes and the bands.

        Expects ``bb_upper`` and ``bb_lower`` to be present in ``self.data``.
        Rows whose bands are still NaN never trigger, because comparisons
        against NaN are false.
        """
        if len(self.data) <= 1:
            return
        previous = self.data.iloc[-2]
        latest = self.data.iloc[-1]
        newest = self.data.index[-1]

        # Sell: price moved up and closed above the upper band.
        if latest['close'] > previous['close'] and latest['close'] > latest['bb_upper']:
            self.data.at[newest, 'signal'] = -1

        # Buy: price moved down and closed below the lower band.
        if latest['close'] < previous['close'] and latest['close'] < latest['bb_lower']:
            self.data.at[newest, 'signal'] = 1

    def get_last_signal(self):
        """Return the signal of the newest row, or ``None`` if there is none yet."""
        if 'signal' not in self.data.columns or self.data.empty:
            return None
        return self.data.iloc[-1]['signal']

In [None]:
from itertools import islice

# The simulated feeds may be endless, so cap the run; otherwise this cell never
# finishes and nothing below it can execute on "Run All".
MAX_TICKS = 100

strategy = BollingerOBVStrategy()
prices = sim_price()
volumes = sim_volume()

for price, volume in islice(zip(prices, volumes), MAX_TICKS):
    strategy.update_data(price, volume)
    signal = strategy.get_last_signal()
    # get_last_signal() may return None; only report real buy/sell signals.
    if signal is not None and signal != 0:
        # Show just the most recent rows instead of dumping the whole history.
        print(strategy.data.tail(5))

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Waveform parameters
A, B = 1, 0      # amplitude, vertical offset
f, phi = 1, 0    # frequency (period is 1/f), phase shift
T, N = 2, 1000   # total time span, number of samples

# Time axis: N evenly spaced samples over [0, T)
t = np.linspace(0, T, N, endpoint=False)

# Sine wave sampled on that axis
y = A * np.sin(2 * np.pi * f * t + phi) + B

# Draw the wave with the explicit figure/axes interface
fig, ax = plt.subplots()
ax.plot(t, y)
ax.set_title('Sine Wave')
ax.set_xlabel('Time')
ax.set_ylabel('Amplitude')
plt.show()

In [None]:

# 导入数据
# df = pd.read_csv('data.csv')

def fetch_data(symbol: str, start_date: str = None, end_date: str = None, adjust: str = ""):
    """Download daily bars for one symbol through ``ak.stock_zh_a_hist``.

    Args:
        symbol: Stock code forwarded to akshare, e.g. ``"300031"``.
        start_date: First day as ``YYYYMMDD``; when empty or ``None`` it
            defaults to 500 days before now.
        end_date: Last day as ``YYYYMMDD``; when empty or ``None`` it
            defaults to today.
        adjust: Price-adjustment mode forwarded unchanged to akshare
            (``""`` means unadjusted).

    Returns:
        Whatever ``ak.stock_zh_a_hist`` returns. The caller below expects a
        frame with the Chinese headers 日期/开盘/收盘/最高/最低/成交量.

    Raises:
        Exception: Any error raised by akshare is logged and re-raised, so a
            failed download is not masked by an unrelated ``UnboundLocalError``.
    """
    if not start_date:
        start_date = (datetime.now() - timedelta(days=500)).strftime('%Y%m%d')
    if not end_date:
        end_date = datetime.now().strftime('%Y%m%d')
    try:
        # Forward the caller's `adjust` instead of a hard-coded empty string.
        return ak.stock_zh_a_hist(
            symbol=symbol, start_date=start_date, end_date=end_date, adjust=adjust)
    except Exception as e:
        print(f'ak.stock_zh_a_hist调用出错。{e}')
        raise


symbol = "300031"

# akshare's Chinese headers -> English OHLCV names, listed in output order.
ohlcv_names = {
    "日期": "date",
    "开盘": "open",
    "最高": "high",
    "最低": "low",
    "收盘": "close",
    "成交量": "volume",
}

# Rename, then keep only the OHLCV columns.
df = fetch_data(symbol=symbol).rename(columns=ohlcv_names)[list(ohlcv_names.values())]
df


In [None]:
import pandas as pd
import pandas_ta as ta

class BollingerOBVStrategy:
    """Batch Bollinger-band / OBV divergence strategy over a price frame.

    The frame passed in must provide ``close`` and ``volume`` columns. All
    indicator and signal columns are written onto that same frame (it is not
    copied), which is then available as ``self.data``.
    """

    # (pandas_ta column prefix, column name used in ``self.data``)
    _BAND_COLUMNS = (('BBU_', 'bb_upper'), ('BBM_', 'bb_middle'), ('BBL_', 'bb_lower'))

    def __init__(self, data, window_bb=5, window_obv=3, window_signal=3, num_std=2.0):
        """Store the frame and the indicator settings.

        Args:
            data: DataFrame with ``close`` and ``volume`` columns; it is
                modified in place by the other methods.
            window_bb: Look-back length of the Bollinger bands.
            window_obv: Rolling window of the OBV moving average.
            window_signal: Rolling window of the close moving average.
            num_std: Band width in standard deviations.
        """
        self.data = data
        self.window_bb = window_bb
        self.window_obv = window_obv
        self.window_signal = window_signal
        self.num_std = num_std

    def add_technical_indicators(self):
        """Add bands, OBV and their moving averages as new columns."""
        bb_df = ta.bbands(self.data['close'], length=self.window_bb, std=self.num_std)
        # Locate the band columns by prefix instead of rebuilding the exact
        # name: pandas_ta formats the std part itself (e.g. "2.0" for an
        # integer 2), so a hand-built f-string can miss the column.
        for prefix, target in self._BAND_COLUMNS:
            source = next(col for col in bb_df.columns if col.startswith(prefix))
            self.data[target] = bb_df[source].round(2)

        self.data['obv'] = ta.obv(close=self.data['close'], volume=self.data['volume'])

        # Moving averages used as the reference level for divergence checks.
        self.data['close_ma'] = (
            self.data['close'].rolling(window=self.window_signal).mean().round(2))
        self.data['obv_ma'] = (
            self.data['obv'].rolling(window=self.window_obv).mean().round())

    def detect_obv_reversal(self):
        """Add ``obv_trend`` (1 = OBV rose vs. previous row) and ``obv_reversal``.

        ``obv_reversal`` is True wherever the trend flag differs from the
        previous row. The first row has no predecessor and is therefore also
        flagged. Neither column feeds the trading signal at present.
        """
        trend = (self.data['obv'] > self.data['obv'].shift(1)).astype(int)
        self.data['obv_trend'] = trend
        # With a 0/1 flag, "changed" already implies that one side is 0, so a
        # plain inequality is equivalent to the longer original condition.
        self.data['obv_reversal'] = trend.ne(trend.shift(1))

    def generate_signals(self):
        """Compute divergence flags and the de-duplicated ``signal`` column.

        * ``bearish_divergence``: close above the previous row's close MA
          while OBV is below the previous row's OBV MA.
        * ``bullish_divergence``: close below the previous row's close MA
          while OBV is above the previous row's OBV MA.
        * ``signal``: ``-1`` when a bearish divergence closes above the upper
          band, ``1`` when a bullish divergence closes below the lower band,
          otherwise ``0``. Only the first bar of a run of equal signals is kept.
        """
        self.detect_obv_reversal()

        close = self.data['close']
        obv = self.data['obv']
        prev_close_ma = self.data['close_ma'].shift(1)
        prev_obv_ma = self.data['obv_ma'].shift(1)

        self.data['bearish_divergence'] = (close > prev_close_ma) & (obv < prev_obv_ma)
        self.data['bullish_divergence'] = (close < prev_close_ma) & (obv > prev_obv_ma)

        sell = self.data['bearish_divergence'] & (close > self.data['bb_upper'])
        buy = self.data['bullish_divergence'] & (close < self.data['bb_lower'])

        self.data['signal'] = 0
        self.data.loc[sell, 'signal'] = -1
        self.data.loc[buy, 'signal'] = 1

        # Drop repeated signals: keep a value only where it differs from the
        # previous row. A plain .diff() would invent an opposite signal on the
        # bar after a run ends (1, 1, 0 -> 1, 0, -1) and +/-2 on direct flips.
        raw = self.data['signal']
        self.data['signal'] = raw.where(raw.ne(raw.shift(1)), 0)

    def get_strategy_signals(self):
        """Run the full pipeline and return a one-column ``signal`` frame."""
        self.add_technical_indicators()
        self.generate_signals()
        return self.data[['signal']]

# 使用示例
# df = pd.read_csv('path_to_your_data.csv')  # 加载数据


In [None]:
# Number of rows shown before the signal bar; the window then runs up to
# CONTEXT_ROWS - 1 rows after it.
CONTEXT_ROWS = 5

strategy = BollingerOBVStrategy(df)
signals = strategy.get_strategy_signals()

# Bars on which a buy or sell signal fired.
ss = signals[signals["signal"] != 0.0]
columns = [ "signal","close","close_ma", "bb_upper", "bb_lower", "obv",  "obv_trend", "obv_reversal", "bearish_divergence", "bullish_divergence"]
for ind in ss.index:
    # Work with positions and clamp the start at 0: a negative start would be
    # read as "from the end" and give a wrong (usually empty) window for
    # signals in the first few rows.
    pos = strategy.data.index.get_loc(ind)
    start = max(pos - CONTEXT_ROWS, 0)
    results = strategy.data[columns].iloc[start:pos + CONTEXT_ROWS]
    display(HTML(results.to_html(index=False)))