In [None]:
from futu import *

In [None]:
# Open the quote context that every market-data request in this notebook goes through.
# NOTE(review): no quote_ctx.close() call is visible in this notebook — confirm
# whether the connection should be closed explicitly once the analysis is done.
quote_ctx = OpenQuoteContext(host='127.0.0.1', port=11111)  # create the quote context

In [None]:
# Underlying security code; it is passed to the option-expiration, option-chain
# and history K-line requests in this notebook.
code='HK.00700'

In [None]:
# Fetch the option expiration dates available for the underlying.
ret, dates = quote_ctx.get_option_expiration_date(code=code)
if ret != RET_OK:
    # On failure the second return value carries the error text, so report that
    # instead of the bare status code, and stop here rather than letting later
    # cells index into an error string.
    raise RuntimeError(f'get_option_expiration_date failed: {dates}')

In [None]:
# Show the expiration-date table fetched above.
dates

In [None]:
# Pick two expirations by position: row 1 and row 3 of the expiration table.
# Each value is used as both `start` and `end` of its own option-chain query,
# so despite the names they are two separate expiry dates, not a range.
start_date = dates['strike_time'].iloc[1]
end_date = dates['strike_time'].iloc[3]
(start_date, end_date)

In [None]:
# Filter object passed as `data_filter` to both option-chain queries.
# NOTE(review): presumably restricts the result to contracts whose delta lies
# in [0.3, 0.7] — confirm against the OptionDataFilter documentation.
filter1 = OptionDataFilter()
filter1.delta_min = 0.3
filter1.delta_max = 0.7

In [None]:
# CALL option chain for the first selected expiration. start == end, so the
# query is limited to that single date.
ret, options1 = quote_ctx.get_option_chain(code=code, start=start_date, end=start_date,
                                          option_type=OptionType.CALL, data_filter=filter1)
if ret != RET_OK:
    # On failure the second return value carries the error text; surface it and
    # stop instead of displaying and later indexing an error string.
    raise RuntimeError(f'get_option_chain failed for {start_date}: {options1}')
options1

In [None]:
# CALL option chain for the second selected expiration. start == end, so the
# query is limited to that single date.
ret, options2 = quote_ctx.get_option_chain(code=code, start=end_date, end=end_date,
                                          option_type=OptionType.CALL, data_filter=filter1)
if ret != RET_OK:
    # On failure the second return value carries the error text; surface it and
    # stop instead of displaying and later indexing an error string.
    raise RuntimeError(f'get_option_chain failed for {end_date}: {options2}')
options2

In [None]:
# The two contracts to track: the first row of the first chain and the second
# row of the second chain.
option_codes = [
    options1['code'].iloc[0],
    options2['code'].iloc[1],
]
option_codes

In [None]:
# One-off snapshot of the two selected contracts. It is used below to inspect
# the fields and to name the chart traces.
ret, snaps = quote_ctx.get_market_snapshot(option_codes)
if ret != RET_OK:
    # On failure the second return value carries the error text, so report that
    # instead of the bare status code and stop before `snaps` is indexed.
    raise RuntimeError(f'get_market_snapshot failed: {snaps}')

In [None]:
# Show only the snapshot columns relevant to the implied-volatility chart.
snaps.loc[:, [
    'code',
    'update_time',
    'last_price',
    'option_open_interest',
    'option_implied_volatility',
]]

In [None]:
import plotly.graph_objs as go
import ipywidgets as widgets
from ipywidgets import VBox, Label
from IPython.display import display, clear_output
import time
import numpy as np
import threading

# Upper bound on the number of points kept per curve.
MAX_DATA_POINTS = 1000

# Shared buffers: one timestamp list and one implied-volatility list per contract.
time_data = []
price_data_1 = []
price_data_2 = []

# Live figure: start from the layout, then add one line trace per contract.
fig = go.FigureWidget(
    layout=go.Layout(
        title="realtime implied volatility",
        xaxis=dict(title="time"),
        yaxis=dict(title="implied volatility"),
    )
)
fig.add_scatter(
    x=time_data,
    y=price_data_1,
    mode='lines',
    name=snaps.iloc[0]['name'],
    hovertemplate="Time: %{x}<br>Curve 1: %{y}<extra></extra>",
)
fig.add_scatter(
    x=time_data,
    y=price_data_2,
    mode='lines',
    name=snaps.iloc[1]['name'],
    hovertemplate="Time: %{x}<br>Curve 2: %{y}<extra></extra>",
)

# Status line plus the three control buttons, stacked under the figure.
label = Label(value="will update")
start_button, stop_button, clear_button = (
    widgets.Button(description=caption) for caption in ("Start", "Stop", "Clear")
)
ui = VBox([fig, label, start_button, stop_button, clear_button])

# Flag polled by the refresh loop; the Start/Stop callbacks toggle it.
updating = False

# Chart refresh loop; runs on the background thread started by the Start button.
def update_chart(interval=5):
    """Poll option snapshots and append new implied-volatility points to the chart.

    Loops while the module-level ``updating`` flag is true. Each pass requests
    a snapshot for ``option_codes`` and refreshes the status label. When the
    first contract's ``update_time`` differs from the last plotted timestamp,
    it appends one point to each curve, trims the buffers to
    ``MAX_DATA_POINTS`` and moves the two "latest value" annotations.

    Args:
        interval: Seconds to sleep between two snapshot requests.
    """
    global updating
    while updating:
        ret, snap_df = quote_ctx.get_market_snapshot(option_codes)
        if ret != RET_OK:
            # On failure the second return value is the error text. Show it in
            # the status label next to the chart, and clear the flag so the
            # state matches the loop that is about to end.
            label.value = f'error: {snap_df}'
            updating = False
            break

        first, second = snap_df.iloc[0], snap_df.iloc[1]
        new_time = first['update_time']
        last_time = time_data[-1] if time_data else ''
        label.value = f'last refresh:{time.strftime("%H:%M:%S")}, new time:{new_time}, last time: {last_time}, len={len(time_data)}'

        # Plot only when the snapshot carries a timestamp not yet on the chart.
        if not last_time or new_time != last_time:
            time_data.append(new_time)
            price_data_1.append(first['option_implied_volatility'])
            price_data_2.append(second['option_implied_volatility'])

            # Drop the oldest point once the buffers exceed the cap.
            if len(time_data) > MAX_DATA_POINTS:
                time_data.pop(0)
                price_data_1.pop(0)
                price_data_2.pop(0)

            # Push both traces in one batch so the front end never renders a
            # state where x and y (or the two curves) disagree in length.
            with fig.batch_update():
                fig.data[0].x = time_data
                fig.data[0].y = price_data_1
                fig.data[1].x = time_data
                fig.data[1].y = price_data_2

            # One arrow per curve, pointing at its newest value.
            fig.layout.annotations = [
                dict(
                    x=time_data[-1],
                    y=price_data_1[-1],
                    text=f"{first['name']}: {price_data_1[-1]:.2f}",
                    showarrow=True,
                    arrowhead=2,
                    ax=-40, ay=-20,
                    font=dict(color="blue")
                ),
                dict(
                    x=time_data[-1],
                    y=price_data_2[-1],
                    text=f"{second['name']}: {price_data_2[-1]:.2f}",
                    showarrow=True,
                    arrowhead=2,
                    ax=-40, ay=20,
                    font=dict(color="red")
                )
            ]

        # Re-render the widget box on every pass.
        # NOTE(review): FigureWidget and Label normally sync to the front end on
        # assignment, so this redisplay may be unnecessary — confirm in the
        # target front end before removing it.
        clear_output(wait=True)
        display(ui)

        time.sleep(interval)

# Handle of the most recently started refresh thread (None until Start is clicked).
_update_thread = None

def start_update(_):
    """Start button callback: begin refreshing the chart in the background.

    Sets the module-level ``updating`` flag and starts ``update_chart`` on a
    daemon thread, unless a previously started refresh thread is still alive.

    Args:
        _: Argument supplied by ``on_click``; ignored.
    """
    global updating, _update_thread
    updating = True
    # A previous worker may still be alive: Start was clicked twice, or Stop was
    # followed by Start while the worker was sleeping. It will see the flag and
    # carry on, so a second thread would only duplicate requests and chart writes.
    if _update_thread is not None and _update_thread.is_alive():
        return
    _update_thread = threading.Thread(target=update_chart, daemon=True)
    _update_thread.start()

def stop_update(_):
    """Stop button callback: ask the refresh loop to finish.

    Only clears the module-level ``updating`` flag. ``update_chart`` tests the
    flag at the top of each pass, so the loop ends after its current pass and
    sleep rather than immediately.

    Args:
        _: Argument supplied by ``on_click``; ignored.
    """
    global updating
    updating = False

def clear_chart(_):
    """Clear button callback: drop all buffered points and reset the chart.

    Empties the three data buffers, pushes the empty data to both traces,
    removes the "latest value" annotations and redisplays the full widget box.

    Args:
        _: Argument supplied by ``on_click``; ignored.
    """
    # Empty the buffers in place instead of rebinding the globals, so every
    # holder of these lists keeps seeing the same objects.
    for buffer in (time_data, price_data_1, price_data_2):
        buffer.clear()

    fig.data[0].x = time_data
    fig.data[0].y = price_data_1
    fig.data[1].x = time_data
    fig.data[1].y = price_data_2
    # Without this the arrows would keep pointing at values that no longer exist.
    fig.layout.annotations = []

    clear_output(wait=True)
    # Redisplay the same box as everywhere else, so the status label is kept.
    display(ui)

# Attach each button to its callback, then render the chart together with the
# status label and the buttons.
start_button.on_click(start_update)
stop_button.on_click(stop_update)
clear_button.on_click(clear_chart)

display(ui)


In [None]:
# Request 1-minute K-line history for the underlying, starting at 2024-12-27.
# NOTE(review): page_req_key is not used afterwards; if it is not None there are
# presumably more pages and `history` holds only the first one — confirm.
ret, history, page_req_key = quote_ctx.request_history_kline(code, start='2024-12-27', ktype=KLType.K_1M)
if ret != RET_OK:
    # On failure the second return value carries the error text, so report that
    # instead of the bare status code and stop before `history` is used.
    raise RuntimeError(f'request_history_kline failed: {history}')

In [None]:
# Preview the first rows of the fetched K-line history.
history.head()

In [None]:
import pandas as pd  # explicit import: do not rely on `pd` arriving via `from futu import *`

# Build the plotting frame as a new DataFrame with `time_key` parsed to
# datetimes. `history` itself is left as fetched, so the preview shown earlier
# stays accurate.
df = history.assign(time_key=pd.to_datetime(history['time_key']))

In [None]:
import plotly.graph_objects as go

# Build the candlestick chart under its own name. `fig` is the live
# implied-volatility FigureWidget that update_chart/clear_chart keep writing to,
# so rebinding it here would make them write into this figure instead.
kline_fig = go.Figure()

# Candlestick trace (price).
kline_fig.add_trace(go.Candlestick(
    x=df['time_key'],
    open=df['open'],
    high=df['high'],
    low=df['low'],
    close=df['close'],
    name='K线'
))

# Volume bars.
kline_fig.add_trace(go.Bar(
    x=df['time_key'],
    y=df['volume'],
    name='成交量',
    marker=dict(color='rgba(0,0,255,0.3)'),
    yaxis='y2'  # plot against the second y axis
))

# Layout.
kline_fig.update_layout(
    title='K线图与成交量',
    xaxis_title='时间',
    yaxis_title='价格',
    yaxis2=dict(
        title='成交量',
        overlaying='y',  # draw the second y axis on top of the first
        side='right'  # put the volume axis on the right-hand side
    ),
    xaxis_rangeslider_visible=False  # hide the range slider under the chart
)

# Render the chart.
kline_fig.show()
