# 背景

对指定个股和指数进行30分钟级别的监控，包括趋势、反转指标等，并通过语音进行报告。

本监控在数据上不依赖于zillionare-omega

In [1]:
! export HTTPS_PROXY=""
from alpha.notebook import *
import datetime
await init_notebook(use_omicron=True)

g = {
    "notified": set(),
    "rsi": RsiStats(FrameType.MIN30)
}

g["rsi"].load()

In [2]:
def get_pmae_threshold(code, frame_type, win):
    if code in ["000001.XSHG"]:
        return {
            "30m": {
                5: 1.51e-4,
                10: 1.51e-4
            }
        }.get(frame_type, {}).get(win, 3e-4)
    else: # 个股
        return {
            "30m": {
                5: 3e-3,
                10: 1e-3
            }
        }.get(frame_type, {}).get(win, 3e-3)
    
def pred_roc(close, win, pmae_threshold):
    """通过ma来预测2周期后的涨跌幅"""
    preds, pmae = predict_by_moving_average(close, win, n_preds = 2, err_threshold = pmae_threshold, n = 7)
    
    if preds:
        return preds[-1] / close[-1] - 1
    else:
        return None
    
def ma_swing(close, win, err_threshold, n=7):
    ma = moving_average(close, win)[-n:]
    ma_ = ma/ma[0]
    (a,b,c), pmae = polyfit(ma_)
    
    if pmae > err_threshold:
        return None
    
    # to check if last points matched
    fit = np.polyval((a,b,c), np.arange(n))
    
    vx = round(-b/(2 * a))
    return a, b, vx, n-vx
    
def notify(frame, text):
    key = f"{frame.hour:02d}:{frame.minute:02d} {text}"
    if key in g["notified"]:
        return
    
    print(key)
    g["notified"].add(key)
    say(text)
        
def canonical_code(code):
    if "." in code:
        return code
    
    if code.startswith("6"):
        return code + ".XSHG"
    else:
        return code + ".XSHE"
    
def to_code(name):
    secs = Securities()
    return secs._secs[secs._secs['display_name']==name]["code"][0]

In [3]:
async def do_check(holdings:list[str], check_at=None):
    end = arrow.now() if check_at is None else arrow.get(check_at)
    start = tf.shift(tf.floor(end, FrameType.MIN30), -39, FrameType.MIN30)
    
    clear_output()
    
    for code in holdings:
        sec = Security(code)
        bars = await sec.load_bars(start, end, FrameType.MIN30)
        close = bars["close"]
        
        rsi = relative_strength_index(close, period=6)[-1]
        prsi = g["rsi"].get_proba(code, rsi)
        
        if prsi >= 0.9:
            notify(end, f"相对强弱指标显示，{sec.display_name}即将高位反转，概率为{prsi:.0%}")
        if prsi <= 0.1:
            notify(end, f"相对强弱指标显示，{sec.display_name}即将低位反转，概率为{1-prsi:.0%}")
        
        # check 30m 5日线摆动
        params = ma_swing(close, 5, get_pmae_threshold(code, '30m', 5))
        if params is None:
            continue
            
        a, b, vx, dist = params
        if code == "000001.XSHG" and abs(a) > 1e-3 and 2 < dist <=4:
            text = "向下拐头，请注意风险" if a < 0 else "向上拐头，可考虑加仓"
            notify(end, f"沪指30分钟均线已经{text}")
            
        if code != "000001.XSHG" and abs(a) > 3e-3 and 2 < dist <= 4:
            text = "向下拐头，请注意风险" if a < 0 else "向上拐头，可考虑加仓"
            notify(end, f"{sec.display_name}30分钟均线已经{text}")
        
async def mon(my_stocks, check_at=None):
    import asyncio
    
    my_stocks = [to_code(name) for name in my_stocks]
    my_stocks.append("000001.XSHG")
    
    now = arrow.now()
    wakeup_time = []
    for tm in ["09:59", "10:29", "10:59", "11:29", "13:29", "13:59", "14:29", "14:56"]:
        hour, minute = map(int, tm.split(":"))
        wakeup_time.append(arrow.Arrow(now.year, now.month, now.day, hour, minute, tzinfo="Asia/Shanghai"))
        
    if check_at:
        wakeup_time.insert(0, arrow.now().shift(seconds=5))
        
    for tm in wakeup_time:
        if arrow.now() > tm:
            continue
            
        seconds = (tm.timestamp - arrow.now().timestamp)
        await asyncio.sleep(seconds)
        print(f"awake at {arrow.now()}")
        await do_check(my_stocks, check_at)

In [None]:
await mon(["中国西电", "格林美", "锦盛新材", "海信视像", "惠泉啤酒", "天齐锂业"])
say("finished")

09:59 相对强弱指标显示，上证指数即将低位反转，概率为96%
