In [14]:
# !python3 -m pip install --upgrade pip
# !python3 -m pip install python-okx --upgrade

In [15]:
import okx.Account as Account
import requests
from datetime import datetime

In [16]:
# demo api key 和secret key，不要用成实盘的
def read_config(filepath="config.txt"):
    config = {}
    with open(filepath, 'r') as file:
        for line in file:
            if '=' in line:
                key, value = line.strip().split('=', 1)
                config[key.strip()] = value.strip()
    return config

config = read_config()
api_key = config.get("api_key")
secret_key = config.get("secret_key")
passphrase = config.get("passphrase")

In [17]:

# check if the api setup works
flag = "1"  # live trading: 0, demo trading: 1
accountAPI = Account.AccountAPI(api_key, secret_key, passphrase, False, flag)
result = accountAPI.get_account_balance()
print(result)

{'msg': 'Invalid OK-ACCESS-KEY', 'code': '50111'}


In [18]:
class Candle:

    def __init__(self, timestamp, open_p, high, low, close):
        self.timestamp = timestamp
        self.open_p = open_p
        self.high = high
        self.low = low
        self.close = close

    def __str__(self) -> str:
        return f"Time: {self.timestamp}, Open: {self.open_p}, High: {self.high}, Low: {self.low}, Close: {self.close}"


In [19]:
def get_4h_candlesticks(inst_id='BTC-USDT', limit=100):
    url = "https://www.okx.com/api/v5/market/candles"
    params = {
        "instId": inst_id,
        "bar": "4H",
        "limit": limit
    }
    

    candles = []
    response = requests.get(url, params=params)
    if response.status_code == 200:
        json_data = response.json()
        data = json_data['data']
        print(len(data))
        # fetched data are from most recent to oldest
        for datapoint in data:
            # last element indicate if the current candle is finished or not
            # finish = candle[8]
            timestamp, open_p, high, low, close = datapoint[0:5]
            # print(f"Time: {timestamp}, Open: {open_p}, High: {high}, Low: {low}, Close: {close}")
            new_candle = Candle(to_readable_time(int(timestamp)), float(open_p), float(high), float(low), float(close))
            candles.append(new_candle)
        candles.reverse()
        return candles
    else:
        print("Error:", response.status_code, response.text)
        return None


def to_readable_time(timestamp_ms: int) -> str:
    dt = datetime.fromtimestamp(timestamp_ms / 1000).astimezone()
    return dt.strftime('%Y-%m-%d %H:%M:%S')

candles = get_4h_candlesticks()

for candle in candles:
    print(candle)

100
Time: 2025-05-23 05:00:00, Open: 109083.7, High: 109999.9, Low: 107313.9, Close: 108940.6
Time: 2025-05-23 09:00:00, Open: 108942.4, High: 109812.2, Low: 108004.9, Close: 108714.5
Time: 2025-05-23 13:00:00, Open: 108714.5, High: 108730.1, Low: 106800.0, Close: 107324.0
Time: 2025-05-23 17:00:00, Open: 107324.1, High: 108371.9, Low: 106880.0, Close: 108370.0
Time: 2025-05-23 21:00:00, Open: 108370.0, High: 108581.8, Low: 107567.7, Close: 107600.8
Time: 2025-05-24 01:00:00, Open: 107600.8, High: 109488.6, Low: 107600.8, Close: 109196.0
Time: 2025-05-24 05:00:00, Open: 109196.0, High: 109279.9, Low: 108540.0, Close: 108939.3
Time: 2025-05-24 09:00:00, Open: 108939.3, High: 109221.9, Low: 108680.0, Close: 108944.4
Time: 2025-05-24 13:00:00, Open: 108944.3, High: 109120.5, Low: 107500.0, Close: 107767.2
Time: 2025-05-24 17:00:00, Open: 107767.3, High: 108258.0, Low: 107192.4, Close: 108091.3
Time: 2025-05-24 21:00:00, Open: 108091.3, High: 108299.1, Low: 107440.1, Close: 107682.1
Time: 

In [20]:
class Extreme:
    def __init__(self, timestamp, open_p, high, low, close, extreme_type):
        self.timestamp = timestamp
        self.open_p = open_p
        self.high = high
        self.low = low
        self.close = close
        self.type = extreme_type

    def __str__(self) -> str:
        return f"{self.type} Extreme: Time: {self.timestamp}, Open: {self.open_p}, High: {self.high}, Low: {self.low}, Close: {self.close}"


In [21]:
def find_extremes(candles: Candle):
    """
    通过k线高点高于前后各一根k线或者低于前后各一根k线来找到每个高点和低点
    除了MSB都是根据影线来判断
    """
    extremes = []
    length = len(candles)
    # TODO: 第一根k线永远不可能是极值点，这一点要不要考虑
    for i in range(1, length-1):
        is_high = candles[i].high >= candles[i-1].high and candles[i].high >= candles[i+1].high
        is_low = is_low = candles[i].low <= candles[i-1].low and candles[i].low <= candles[i+1].low

        # 高点
        if is_high and is_low:
            new_extreme = Extreme(candles[i].timestamp, candles[i].open_p, candles[i].high, candles[i].low, candles[i].close, 'both')
            extremes.append(new_extreme)
        elif is_high:
            new_extreme = Extreme(candles[i].timestamp, candles[i].open_p, candles[i].high, candles[i].low, candles[i].close, 'high')
            extremes.append(new_extreme)
        elif is_low:
            new_extreme = Extreme(candles[i].timestamp, candles[i].open_p, candles[i].high, candles[i].low, candles[i].close, 'low')
            extremes.append(new_extreme)
    return extremes



def filter_extremes(extremes):
    """
    如果出现高点后接高点的情况，记录两个高点中的最高点信息，
    如果出现低点后接低点的情况，记录两个低点中的最低点信息，
    如果一根k线既是高点又是低点，则根据之前的点来判断，如果之前的点是高点，则记录该点的低点信息，
    反之，如果之前的点是低点，则记录该点的高点信息，从而完成初步的过滤，但仍存在的问题是记录小级别高低点和趋势性暂时未知。  
    """

    length = len(extremes)

    # 如果最开始就是both，先默认最开始为high
    if extremes[0] == 'both':
        extremes[0].type = 'high'

    filtered_extremes = [extremes[0]] # 保留起始点作为趋势判断的开始

    
    for i in range(1, length-1):
        prev = filtered_extremes[-1]
        cur = extremes[i]


        # TODO：还是上面提出的问题，如果最开始就是both呢？
        if cur.type == "both":
            cur.type = "low" if prev.type == "high" else "high"

            
        if cur.type == prev.type:
            if cur.type == "high":
                # 连续高点，保留更高
                if cur.high > prev.high:
                    filtered_extremes[-1] = cur
            elif cur.type == "low":
                # 连续低点，保留更低
                if cur.low < prev.low:
                    filtered_extremes[-1] = cur
        else:
            # 不同类型极值点，直接加入
            filtered_extremes.append(cur)
        
    return filtered_extremes

extremes = find_extremes(candles)
for e in extremes:
    print(e)
print("\n-------------------------------------------\n")


filtered_extremes = filter_extremes(extremes)
for e in filtered_extremes:
    print(e)


low Extreme: Time: 2025-05-23 13:00:00, Open: 108714.5, High: 108730.1, Low: 106800.0, Close: 107324.0
high Extreme: Time: 2025-05-24 01:00:00, Open: 107600.8, High: 109488.6, Low: 107600.8, Close: 109196.0
low Extreme: Time: 2025-05-24 17:00:00, Open: 107767.3, High: 108258.0, Low: 107192.4, Close: 108091.3
high Extreme: Time: 2025-05-24 21:00:00, Open: 108091.3, High: 108299.1, Low: 107440.1, Close: 107682.1
low Extreme: Time: 2025-05-25 05:00:00, Open: 107249.8, High: 107743.8, Low: 106600.0, Close: 106973.3
high Extreme: Time: 2025-05-25 21:00:00, Open: 109621.9, High: 110214.8, Low: 109360.3, Close: 110089.0
both Extreme: Time: 2025-05-26 09:00:00, Open: 110007.1, High: 110450.0, Low: 108851.1, Close: 109123.5
low Extreme: Time: 2025-05-26 17:00:00, Open: 109436.7, High: 109677.9, Low: 107505.2, Close: 108886.4
both Extreme: Time: 2025-05-27 05:00:00, Open: 109588.3, High: 110729.0, Low: 108621.2, Close: 110145.7
low Extreme: Time: 2025-05-27 17:00:00, Open: 108936.0, High: 109282

In [22]:
def find_initial_trend(filtered_extremes):
    """
    从记录的高低点信息中从最开始往后遍历，每次读取四个点，
    即两个高点信息和两个低点信息，当出现两个更高高点和两个更高低点时，
    记录初始趋势为上升。当出现两个更低高点和两个更低低点时，记录初始趋势为下降
    """
    idx = 0
    while idx < len(filtered_extremes) - 3:
        if filtered_extremes[idx].type == 'low' and \
            filtered_extremes[idx+1].type == 'high' and \
            filtered_extremes[idx+2].type == 'low' and \
            filtered_extremes[idx+3].type == 'high' and \
            filtered_extremes[idx].low < filtered_extremes[idx+2].low and \
            filtered_extremes[idx+1].high < filtered_extremes[idx+3].high:
                return 'rise'
        
        elif filtered_extremes[idx].type == 'high' and \
            filtered_extremes[idx+1].type == 'low' and \
            filtered_extremes[idx+2].type == 'high' and \
            filtered_extremes[idx+3].type == 'low' and \
            filtered_extremes[idx].high > filtered_extremes[idx+2].high and \
            filtered_extremes[idx+1].low > filtered_extremes[idx+3].low:
                return "drop"
        else:
            idx += 1
        
    # 没有明显趋势，震荡区间
    return "bumpy"
    # TODO: 没有讨论怎么处理？


def analyze_trend(initial_trend, filtered_extremes):
    key_points = []  # 用于记录趋势变化和关键点
    state = initial_trend  # 当前趋势
    i = 0

    # 初始化状态维护变量
    if state == 'rise':
        last_higher_low = None
        last_higher_high = None
        # 只有低点高于前一个低点，并且下一个高点被确认为higher_high时，才会记录之前的低点为higher_low
        # 暂时存起来用作后续判定
        candidate_lows = []
    else:
        last_lower_high = None
        last_lower_low = None
        candidate_highs = []

    while i < len(filtered_extremes):
        point = filtered_extremes[i]

        if state == 'rise':
            if point.type == 'low':
                if last_higher_low:
                    # 如果之前有低点，并且当前点的实体部分低于之前低点的low
                    if point.close < last_higher_low.low:
                        # 结构破坏，趋势反转为下跌
                        state = 'drop'
                        key_points.append({'type': 'MSB', 'trend': 'rise -> drop', 'trigger': point})
                        # 确定反转为下跌之后，就可以确定第一对lower_low和lower_high
                        last_lower_low = point
                        last_lower_high = filtered_extremes[i - 1] if i > 0 else None
                        # 此时变为下降趋势，重置candidates高点
                        candidate_highs = []
                    else:
                        # 如果之前有低点，但是没有任何跌破，说明在震荡，把当前点放在candidates low里面以便后续使用
                        # 等于不算做跌破
                        candidate_lows.append(point)
                else:
                    # 如果上涨趋势之前还没有higher low
                    candidate_lows.append(point)

            elif point.type == 'high':
                # 如果之前没有高点，或者当前点实体部分比之前高点高，趋势延续
                if last_higher_high is None or point.high > last_higher_high.high:
                    last_higher_high = point
                    # 只记录准备买入的位置
                    # key_points.append({'type': 'continuation', 'trend': 'rise', 'trigger': point})
                    if candidate_lows:
                        # 如果存在低点，则选取之前的震荡区间中的最低点作为higher_low
                        # TODO: !!!!! 这个代码逻辑需要仔细考虑 
                        last_higher_low = min(candidate_lows, key=lambda p: p.low)
                        key_points.append({'type': 'continuation', 'trend': 'rise', 'trigger': last_higher_low})
                        candidate_lows.clear()

        else:  # state == 'drop'
            if point.type == 'high':
                # 存在lower_high，并且实体突破原来的lower_high
                if last_lower_high:
                    if point.close > last_lower_high.high:
                        # 结构破坏，趋势反转为上涨
                        state = 'rise'
                        key_points.append({'type': 'MSB', 'trend': 'drop -> rise', 'trigger': point})
                        last_higher_high = point
                        last_higher_low = filtered_extremes[i - 1] if i > 0 else None
                        candidate_lows = []
                    else:
                        candidate_highs.append(point)
                else:
                    candidate_highs.append(point)
            elif point.type == 'low':
                # 如果之前没有低点，或者当前点实体部分比之前低点低，趋势延续
                if last_lower_low is None or point.low < last_lower_low.low:
                    last_lower_low = point
                    # 只记录准备买入的位置
                    # key_points.append({'type': 'continuation', 'trend': 'drop', 'trigger': point})
                    if candidate_highs:
                        last_lower_high = max(candidate_highs, key=lambda p: p.high)
                        key_points.append({'type': 'continuation', 'trend': 'drop', 'trigger': last_lower_high})
                        candidate_highs.clear()
        i += 1

    return key_points

In [23]:
initial_trend = find_initial_trend(filtered_extremes)
print(initial_trend)


key_points = analyze_trend(initial_trend, filtered_extremes)
for key_point in key_points:
    if key_point['type'] == 'continuation':
        print(f"{key_point['type']} : {key_point['trend']}, should buy at {key_point['trigger'].timestamp}")
    elif key_point['type'] == 'MSB':
        print(f"ATTENTION!! {key_point['type']} : {key_point['trend']} at {key_point['trigger'].timestamp}")

drop
continuation : drop, should buy at 2025-05-24 01:00:00
ATTENTION!! MSB : drop -> rise at 2025-05-25 21:00:00
continuation : rise, should buy at 2025-05-26 17:00:00
ATTENTION!! MSB : rise -> drop at 2025-05-29 17:00:00
continuation : drop, should buy at 2025-05-29 21:00:00
continuation : drop, should buy at 2025-05-30 05:00:00
ATTENTION!! MSB : drop -> rise at 2025-06-03 05:00:00
ATTENTION!! MSB : rise -> drop at 2025-06-04 21:00:00
continuation : drop, should buy at 2025-06-05 05:00:00


In [24]:
# 这里找了所有ob块，不只是最近的一个
def find_ob(key_points, candles):
    order_blocks = []
    # 为了在所有candles里面找到trigger之前的那根k线
    candle_index_map = {candle.timestamp: i for i, candle in enumerate(candles)}

    for point in key_points:
        # 只做顺趋势
        if point["type"] == "continuation":
            trigger_candle = point['trigger']
            trigger_index = candle_index_map[trigger_candle.timestamp]

            # 因为要找到前一个，所以idx不能等于0
            if trigger_index is not None and trigger_index > 0:
                prev_candle = candles[trigger_index - 1]

                if point["trend"] == "rise":
                    # 之前是一根阴线
                    # TODO: 5 对于higher low买点，如果极值点前一根不是阴线怎么办?
                    if prev_candle.close < prev_candle.open_p:
                        ob = {
                            "type" : "Bullish OB",
                            "timestamp" : prev_candle.timestamp,
                            "top_price" : prev_candle.open_p,
                            'bottom_price': prev_candle.close
                        }
                        order_blocks.append(ob)

                
                elif point["trend"] == "drop":
                     if prev_candle.close > prev_candle.open_p:
                        ob = {
                            'type': 'Bearish OB',
                            'timestamp': prev_candle.timestamp,
                            'top_price': prev_candle.close,
                            'bottom_price': prev_candle.open_p
                        }
                        order_blocks.append(ob)

    return order_blocks

In [25]:
order_blocks = find_ob(key_points, candles)

print("\n--- Identified Order Blocks ---\n")
if not order_blocks:
    print("No significant order blocks were found.")
else:
    for ob in order_blocks:
        print(f"Type: {ob['type']} at {ob['timestamp']}. Price Range: ${ob['bottom_price']} - ${ob['top_price']}")


--- Identified Order Blocks ---

Type: Bearish OB at 2025-05-29 17:00:00. Price Range: $105586.1 - $106026.2
Type: Bearish OB at 2025-05-30 01:00:00. Price Range: $105031.6 - $105874.7
Type: Bearish OB at 2025-06-05 01:00:00. Price Range: $104525.8 - $104810.6
