# 背离策略

In [7]:
import json
import os
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import Optional
import math
import time

import numpy as np
import pandas as pd
import talib as ta
import mplfinance as mpf
from dotenv import load_dotenv
from okx.MarketData import MarketAPI
from pandas import DataFrame
from tqdm import tqdm

from IPython.core.interactiveshell import InteractiveShell

# Notebook display setting: show the value of every top-level expression in a
# cell, not only the last one.
InteractiveShell.ast_node_interactivity = 'all'

# Read OKX credentials from the environment (load_dotenv picks up a local .env file).
load_dotenv()
KEY = os.getenv("OKX_API_KEY")
SECRET = os.getenv("OKX_API_SECRET")
# Stop immediately if either credential is missing or empty.
assert KEY and SECRET, "API key and secret are required"
# Shared market-data client used by the fetch helpers in this notebook.
# NOTE(review): flag="0" presumably selects live (non-demo) trading — confirm against the python-okx docs.
market = MarketAPI(KEY, SECRET, flag="0", debug=False)

def to_candles(data: list) -> DataFrame:
    """Wrap raw candlestick rows in a DataFrame with named columns.

    Args:
        data: Sequence of rows, each holding nine values in the column
            order listed below (presumably the "data" array of an OKX
            candlestick response — confirm against the API docs).

    Returns:
        DataFrame: One row per candle with columns ``ts``, ``open``,
        ``high``, ``low``, ``close``, ``volume``, ``volCcy``,
        ``volCcyQuote`` and ``_``. Values are left exactly as received;
        no type conversion is done here.
    """
    column_names = "ts open high low close volume volCcy volCcyQuote _".split()
    return pd.DataFrame(data, columns=column_names)

def get_current_candlestick(instId: str, bar: str = "1H") -> DataFrame:
    """Fetch the newest candlesticks for one instrument.

    Args:
        instId: Instrument identifier passed to the market API.
        bar: Candlestick size passed to the market API.

    Returns:
        DataFrame: The rows of the response's "data" field, converted by
        ``to_candles``.

    Raises:
        TypeError: If ``instId`` is not a string.
    """
    if isinstance(instId, str):
        # limit="2" asks the API for two candles only.
        payload = market.get_candlesticks(instId, bar=bar, limit="2")
        return to_candles(payload["data"])
    raise TypeError("instId must be a string")


def get_period(bar):
    """Return the duration of one candlestick of size ``bar`` in milliseconds.

    Args:
        bar (str): Candlestick size. Supported values are "1s", "1m", "3m",
            "5m", "15m", "1H", "2H", "4H" and "1D".

    Returns:
        int: Length of one bar in milliseconds.

    Raises:
        ValueError: If ``bar`` is not one of the supported values. Returning
            0 here instead would let callers that step a cursor by the
            period loop forever.
    """
    seconds_per_bar = {
        "1s": 1,
        "1m": 60,
        "3m": 180,
        "5m": 300,
        "15m": 900,
        "1H": 3600,
        "2H": 7200,
        "4H": 14400,
        "1D": 86400,
    }
    try:
        return seconds_per_bar[bar] * 1000
    except (KeyError, TypeError):
        # TypeError covers unhashable inputs such as a list.
        supported = ", ".join(seconds_per_bar)
        raise ValueError(
            f"unsupported bar {bar!r}; expected one of: {supported}"
        ) from None


def get_candlesticks(instId: str, after: int | None = None, before: int | None = None, bar: str = "1H") -> pd.DataFrame:
    """获取 K 线数据, 包含历史数据和最新数据"""
    if not isinstance(instId, str):
        raise TypeError("instId must be a string")

    dst_dir = f"./data/{instId}/{bar}"
    file_path = f"./data/{instId}/{bar}/{instId}.json"
    period = get_period(bar)
    
    # 获取最新的 K 线数据
    latest_df = get_current_candlestick(instId, bar)
    history_data = []
    after_list = [int(after)]
    now_timestamp = datetime.now().timestamp() * 1000
    while 1:
        print(after_list[-1], period)
        _after = after_list[-1] + period * 100
        after_list.append(_after)
        if _after > now_timestamp:
            break
            
    # 获取历史的 K 线数据
    for _after in tqdm(after_list):
        resp = market.get_history_candlesticks(instId, bar=bar, after=str(_after))
        data = resp["data"]
        history_data.extend(data)
        if len(data) == 0:
            break
        last_data = data[-1]
        time.sleep(0.1)
    
    history_df = to_candles(history_data)
    df = merge_candlesticks(history_df, latest_df)
    data = df.to_dict(orient="records")
    os.makedirs(dst_dir, exist_ok=True)
    with open(file_path, "w") as f:
        f.write(json.dumps(data, indent=4))
    return df

def merge_candlesticks(df1: DataFrame, df2: DataFrame) -> DataFrame:
    """Combine two candlestick frames into one, de-duplicated by timestamp.

    Args:
        df1: First candlestick frame (e.g. history).
        df2: Second candlestick frame (e.g. latest candles).

    Returns:
        DataFrame: Rows of both frames sorted ascending by ``ts``. When the
        same ``ts`` occurs more than once, the occurrence that comes last
        after concatenation is kept, so rows of ``df2`` win over ``df1``.
        The index is not reset after filtering and sorting.
    """
    stacked = pd.concat([df1, df2], ignore_index=True)
    unique_by_ts = stacked.drop_duplicates(subset="ts", keep="last")
    return unique_by_ts.sort_values(by="ts", ascending=True)

True

In [8]:
# Fetch the past day of data
def get_df(instId):
    """Load the last day of 15-minute candles and prepare numeric columns.

    Args:
        instId: Instrument identifier forwarded to ``get_candlesticks``.

    Returns:
        DataFrame: Candles with a fresh 0..n-1 index, ``ts`` converted from
        millisecond integers to datetimes shifted forward by 8 hours,
        ``high``/``low``/``close`` as floats, and an added ``hlc3`` column
        holding the mean of those three prices.
    """
    one_day_ago = datetime.now() - timedelta(days=1)
    start_ms = one_day_ago.timestamp() * 1000
    df = get_candlesticks(instId, after=start_ms, bar="15m")
    df.reset_index(drop=True, inplace=True)

    # Millisecond epoch values -> datetimes, then apply the fixed +8h shift.
    epoch_ms = df["ts"].astype(int)
    df["ts"] = pd.to_datetime(epoch_ms, unit="ms") + timedelta(hours=8)

    for price_col in ("high", "low", "close"):
        df[price_col] = df[price_col].astype(float)

    df["hlc3"] = (df["high"] + df["low"] + df["close"]) / 3
    return df

In [9]:
# Instrument to analyse.
instId = "SOL-USDT-SWAP"
# Load and prepare its candles via get_df.
df = get_df(instId)
# Bare expression so the notebook displays the resulting frame.
df

1717658158974 None


TypeError: unsupported operand type(s) for *: 'NoneType' and 'int'