<a href="https://colab.research.google.com/github/takawasi/aeonix-agent/blob/main/%E3%83%92%E3%82%B2%E9%A0%82%E7%82%B9%E5%8F%96%E3%82%8A%E4%BD%9C%E6%88%A6%EF%BC%9A%E5%AE%9F%E3%82%B3%E3%83%BC%E3%83%89%E6%8F%90%E6%A1%88.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# config.py
# 設定ファイル：APIキーや取引パラメータなどを管理します。

API_KEY = "YOUR_API_KEY"  # ご自身のAPIキーに置き換えてください
API_SECRET = "YOUR_SECRET_KEY"  # ご自身のAPIシークレットに置き換えてください

# 取引設定
TARGET_SYMBOL = "USD_JPY"  # 取引対象の通貨ペア
ORDER_SIZE = "100"         # 注文数量（GMOコインの最小取引単位に合わせてください）
# 例: "10000" (1万通貨) など。取引ルールAPI (/public/v1/symbols) で確認してください。

# ファイルパス
LOG_FILE_PATH = "logs/trigger_log.jsonl"  # 発注ログの保存先
KLINES_DATA_FILE = "data/latest_kline_data.json" # 1分足データを共有するための一時ファイル

# APIエンドポイント
PUBLIC_API_URL = "https://forex-api.coin.z.com/public"
PRIVATE_API_URL = "https://forex-api.coin.z.com/private"
PUBLIC_WEBSOCKET_URL = "wss://forex-api.coin.z.com/ws/public/v1"

# レート制限に関する考慮
# GETリクエスト: 6回/秒 (klines_sync.pyは1分に1回なので問題なし)
# POSTリクエスト: 1回/秒 (place_order.pyで考慮が必要)
# WebSocket subscribe/unsubscribe: 1回/秒

# ==============================================================================
# gmo_api_utils.py
# GMOコインAPIとの通信に関する共通関数群
# ==============================================================================
import time
import hmac
import hashlib
import json
import requests
from datetime import datetime
import os

# logsディレクトリとdataディレクトリが存在しない場合は作成
if not os.path.exists("logs"):
    os.makedirs("logs")
if not os.path.exists("data"):
    os.makedirs("data")

def create_signature(timestamp, method, path, secret_key, req_body=None):
    """
    GMOコインAPIの署名を生成します。
    """
    if req_body:
        text = timestamp + method + path + json.dumps(req_body)
    else:
        text = timestamp + method + path

    sign = hmac.new(bytes(secret_key.encode('ascii')), bytes(text.encode('ascii')), hashlib.sha256).hexdigest()
    return sign

def gmo_public_request(endpoint, path, params=None):
    """
    GMOコイン Public APIへのGETリクエストを送信します。
    """
    try:
        response = requests.get(endpoint + path, params=params, timeout=10)
        response.raise_for_status()  # HTTPエラーがあれば例外を発生
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"Public APIリクエストエラー: {e}")
        return None

def gmo_private_request(method, endpoint, path, api_key, secret_key, req_body=None, params=None):
    """
    GMOコイン Private APIへのリクエストを送信します。
    """
    timestamp = '{0}000'.format(int(time.mktime(datetime.now().timetuple())))
    headers = {
        "API-KEY": api_key,
        "API-TIMESTAMP": timestamp,
        "API-SIGN": create_signature(timestamp, method, path, secret_key, req_body)
    }

    try:
        if method == 'GET':
            response = requests.get(endpoint + path, headers=headers, params=params, timeout=10)
        elif method == 'POST':
            headers["Content-Type"] = "application/json" # POSTの場合、Content-Typeが必要
            response = requests.post(endpoint + path, headers=headers, data=json.dumps(req_body) if req_body else None, timeout=10)
        else:
            raise ValueError("Unsupported HTTP method")

        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"Private APIリクエストエラー ({method} {path}): {e}")
        if response is not None:
            print(f"Response content: {response.text}")
        return None
    except ValueError as e:
        print(f"ValueError: {e}")
        return None

# ==============================================================================
# klines_sync.py
# 1分足のローソク足データを取得し、高値・安値を更新するモジュール
# ==============================================================================
import time
import json
from datetime import datetime, timedelta
# from config import TARGET_SYMBOL, KLINES_DATA_FILE, PUBLIC_API_URL  # config.pyから設定を読み込む
# from gmo_api_utils import gmo_public_request # gmo_api_utils.pyから関数を読み込む

def get_latest_kline_data(symbol):
    """
    指定された通貨ペアの最新の1分足データを取得し、高値と安値を返す。
    """
    path = '/v1/klines'
    # GMO APIは日付をYYYYMMDD形式で要求。日本時間朝6時で日付が切り替わる点に注意。
    # 簡単のため、ここでは現在の日付を使用。本番環境では正確な日付管理が必要。
    today_str = datetime.now().strftime('%Y%m%d')
    params = {
        "symbol": symbol,
        "priceType": "ASK", # または "BID"、戦略に応じて調整
        "interval": "1min",
        "date": today_str
    }
    data = gmo_public_request(PUBLIC_API_URL, path, params)

    if data and data.get("status") == 0 and data.get("data"):
        # 最新の足はリストの最後に来ることが多いが、API仕様を確認すること
        # GMOのドキュメントではソート順が明記されていないため、openTimeでソートするか、
        # 最新のものを特定するロジックが必要になる場合があります。
        # ここでは単純に最後の要素を取得すると仮定します。
        if data["data"]: # データが空でないことを確認
            latest_kline = data["data"][-1] # 最新の足を取得
            return {
                "timestamp": latest_kline.get("openTime"), # Unixタイムスタンプ (ミリ秒)
                "high": float(latest_kline.get("high")),
                "low": float(latest_kline.get("low"))
            }
    return None

def update_kline_data_file():
    """
    最新の1分足データを取得し、ファイルに保存する。
    """
    print(f"{datetime.now()}: 1分足データを取得中...")
    kline_data = get_latest_kline_data(TARGET_SYMBOL)
    if kline_data:
        try:
            with open(KLINES_DATA_FILE, 'w') as f:
                json.dump(kline_data, f)
            print(f"{datetime.now()}: 1分足データを更新しました: High={kline_data['high']}, Low={kline_data['low']}")
        except IOError as e:
            print(f"ファイル書き込みエラー ({KLINES_DATA_FILE}): {e}")
    else:
        print(f"{datetime.now()}: 1分足データの取得に失敗しました。")

def klines_sync_main():
    """
    klines_sync.py のメイン処理。定期的に1分足データを更新する。
    """
    print("klines_sync.py: 1分足データ取得モジュール起動")
    while True:
        update_kline_data_file()
        time.sleep(60) # 1分ごとに実行

# ==============================================================================
# order_manager.py
# 注文実行とロギングを行うモジュール
# ==============================================================================
import json
import time
from datetime import datetime
# from config import API_KEY, API_SECRET, TARGET_SYMBOL, ORDER_SIZE, LOG_FILE_PATH, PRIVATE_API_URL
# from gmo_api_utils import gmo_private_request

# 最後に注文した時刻を記録（レートリミット対策の簡易的なもの）
last_order_time = 0

def place_order(side, symbol, size, execution_type="MARKET", settle_type="OPEN"):
    """
    注文を発注する関数。
    side: "BUY" または "SELL"
    symbol: 通貨ペア (例: "USD_JPY")
    size: 注文数量 (文字列)
    execution_type: "MARKET", "LIMIT", "STOP"
    settle_type: "OPEN" (両建て用), "CLOSE"
    """
    global last_order_time
    current_time = time.time()

    # POSTリクエストは1秒間に1回の上限
    if current_time - last_order_time < 1.1: # 少し余裕を持たせる
        print(f"{datetime.now()}: レートリミットのため注文を見送ります。前回の注文から {current_time - last_order_time:.2f} 秒経過。")
        return None

    path = '/v1/order'
    req_body = {
        "symbol": symbol,
        "side": side,
        "executionType": execution_type,
        "timeInForce": "FAK", # Fill and Kill (IOC) または FAS (Fill and Store) が成行では一般的
        "size": str(size),
        "settleType": settle_type
    }
    # 指値・逆指値の場合、priceの指定が必要 (この戦略では成行なので不要)
    # if execution_type == "LIMIT" or execution_type == "STOP":
    #     req_body["price"] = str(price) # priceを引数に追加する必要がある

    print(f"{datetime.now()}: 注文リクエスト: {req_body}")
    response = gmo_private_request('POST', PRIVATE_API_URL, path, API_KEY, API_SECRET, req_body=req_body)
    last_order_time = current_time # 注文試行時刻を更新

    log_entry = {
        "timestamp": datetime.now().isoformat(),
        "symbol": symbol,
        "side": side,
        "size": size,
        "execution_type": execution_type,
        "settle_type": settle_type,
        "request_body": req_body,
        "response": response
    }

    try:
        with open(LOG_FILE_PATH, 'a') as f:
            json.dump(log_entry, f)
            f.write('\n')
    except IOError as e:
        print(f"ログファイル書き込みエラー ({LOG_FILE_PATH}): {e}")

    if response and response.get("status") == 0:
        order_id = response.get("data") # GMOの仕様では成功時に注文ID(数値)がdataに入る
        print(f"{datetime.now()}: 注文成功。注文ID: {order_id}")
        return order_id
    else:
        error_code = response.get("messages")[0].get("message_code") if response and response.get("messages") else "N/A"
        error_message = response.get("messages")[0].get("message") if response and response.get("messages") else "Unknown error"
        print(f"{datetime.now()}: 注文失敗。エラーコード: {error_code}, メッセージ: {error_message}")
        return None

# ==============================================================================
# tick_watcher.py
# WebSocketでティックデータを受信し、エントリー判断を行うモジュール
# ==============================================================================
import websocket # websocket-client ライブラリが必要
import json
import time
from datetime import datetime
# from config import TARGET_SYMBOL, KLINES_DATA_FILE, PUBLIC_WEBSOCKET_URL, ORDER_SIZE
# from order_manager import place_order # order_manager.py から関数を読み込む

# 最後にエントリーした足のタイムスタンプと方向を記録 (同一ヒゲへの多重発注防止用)
last_entry_kline_timestamp = None
last_entry_side = None # "BUY" or "SELL"

def on_message(ws, message):
    """WebSocketでメッセージ受信時の処理"""
    global last_entry_kline_timestamp, last_entry_side

    try:
        tick_data = json.loads(message)
        # print(f"受信ティック: {tick_data}") # デバッグ用

        if tick_data.get("channel") == "ticker" and tick_data.get("symbol") == TARGET_SYMBOL:
            ask_price = float(tick_data.get("ask"))
            bid_price = float(tick_data.get("bid"))
            tick_timestamp_str = tick_data.get("timestamp") # 例: "2023-05-19T02:51:24.516493Z"
            tick_datetime = datetime.fromisoformat(tick_timestamp_str.replace("Z", "+00:00"))


            # 共有ファイルから最新の1分足データを読み込む
            try:
                with open(KLINES_DATA_FILE, 'r') as f:
                    kline_data = json.load(f)
            except (FileNotFoundError, json.JSONDecodeError) as e:
                print(f"{datetime.now()}: klineデータファイル読み込みエラー: {e}。次の更新まで待機します。")
                return

            if not kline_data or "high" not in kline_data or "low" not in kline_data or "timestamp" not in kline_data:
                print(f"{datetime.now()}: klineデータが不完全です。次の更新まで待機します。")
                return

            kline_high = float(kline_data["high"])
            kline_low = float(kline_data["low"])
            kline_timestamp_ms = int(kline_data["timestamp"]) # ミリ秒単位のUnixタイムスタンプ

            # 同一1分足での連続エントリーを避けるためのチェック
            # kline_timestamp_ms を秒に変換して比較
            current_kline_start_timestamp_sec = kline_timestamp_ms // 1000

            if last_entry_kline_timestamp == current_kline_start_timestamp_sec:
                # print(f"同一1分足({datetime.fromtimestamp(current_kline_start_timestamp_sec)})内での再エントリーはスキップします。")
                return

            # エントリー条件の判定
            if ask_price >= kline_high:
                print(f"{datetime.now()}: 買いシグナル発生！ Ask: {ask_price} >= 1分足高値: {kline_high}")
                if last_entry_kline_timestamp != current_kline_start_timestamp_sec or last_entry_side != "BUY":
                    order_id = place_order("BUY", TARGET_SYMBOL, ORDER_SIZE)
                    if order_id:
                        print(f"{datetime.now()}: 買い注文成功 (ID: {order_id})")
                        last_entry_kline_timestamp = current_kline_start_timestamp_sec
                        last_entry_side = "BUY"
                    else:
                        print(f"{datetime.now()}: 買い注文失敗")
                else:
                    print(f"{datetime.now()}: 買いシグナル発生しましたが、同一足での連続BUYのためスキップ。")


            elif bid_price <= kline_low:
                print(f"{datetime.now()}: 売りシグナル発生！ Bid: {bid_price} <= 1分足安値: {kline_low}")
                if last_entry_kline_timestamp != current_kline_start_timestamp_sec or last_entry_side != "SELL":
                    order_id = place_order("SELL", TARGET_SYMBOL, ORDER_SIZE)
                    if order_id:
                        print(f"{datetime.now()}: 売り注文成功 (ID: {order_id})")
                        last_entry_kline_timestamp = current_kline_start_timestamp_sec
                        last_entry_side = "SELL"
                    else:
                        print(f"{datetime.now()}: 売り注文失敗")
                else:
                    print(f"{datetime.now()}: 売りシグナル発生しましたが、同一足での連続SELLのためスキップ。")

    except Exception as e:
        print(f"tick_watcher - on_messageエラー: {e}")
        import traceback
        traceback.print_exc()

def on_error(ws, error):
    """WebSocketエラー発生時の処理"""
    print(f"WebSocketエラー: {error}")

def on_close(ws, close_status_code, close_msg):
    """WebSocketクローズ時の処理"""
    print(f"WebSocketクローズ: status={close_status_code}, msg={close_msg}")
    # 再接続ロジックなどをここに追加可能
    print("10秒後に再接続を試みます...")
    time.sleep(10)
    tick_watcher_main() # 再接続試行

def on_open(ws):
    """WebSocket接続成功時の処理"""
    print(f"{datetime.now()}: WebSocket接続成功。ティックデータの購読を開始します: {TARGET_SYMBOL}")
    subscribe_message = {
        "command": "subscribe",
        "channel": "ticker",
        "symbol": TARGET_SYMBOL
    }
    ws.send(json.dumps(subscribe_message))

def tick_watcher_main():
    """
    tick_watcher.py のメイン処理。WebSocketでティックデータを監視する。
    """
    print("tick_watcher.py: ティックデータ監視モジュール起動")
    # websocket.enableTrace(True) # 詳細なWebSocketログを出力する場合
    ws = websocket.WebSocketApp(PUBLIC_WEBSOCKET_URL,
                              on_message=on_message,
                              on_error=on_error,
                              on_close=on_close)
    ws.on_open = on_open

    while True:
        try:
            ws.run_forever(ping_interval=60, ping_timeout=10) # 60秒ごとにping、10秒でタイムアウト
        except KeyboardInterrupt:
            print("WebSocketクライアントを終了します。")
            ws.close()
            break
        except Exception as e:
            print(f"WebSocket run_foreverでエラー発生: {e}。再試行します...")
            time.sleep(5) # 短い待機後に再接続ループへ

# ==============================================================================
# main.py
# 各モジュールを起動し、戦略全体を管理するメインスクリプト
# ==============================================================================
import threading
import time
# from klines_sync import klines_sync_main
# from tick_watcher import tick_watcher_main

def main():
    print("ヒゲ頂点取り作戦システムを起動します...")

    # klines_sync.pyを別スレッドで実行
    klines_thread = threading.Thread(target=klines_sync_main, daemon=True)
    klines_thread.start()
    print("1分足データ取得モジュール (klines_sync) を起動しました。")

    # tick_watcher.pyを別スレッドで実行
    # klines_syncが初回データを取得するまで少し待つ
    print("ティック監視モジュール (tick_watcher) 起動準備中... (5秒待機)")
    time.sleep(5)
    tick_watcher_thread = threading.Thread(target=tick_watcher_main, daemon=True)
    tick_watcher_thread.start()
    print("ティック監視モジュール (tick_watcher) を起動しました。")

    try:
        while True:
            # メインスレッドは特に何もしないか、全体の監視処理などを行う
            time.sleep(60) # 例: 1分ごとに生存確認など
            if not klines_thread.is_alive():
                print("エラー: klines_sync スレッドが停止しました。")
                # 再起動処理などを検討
                break
            if not tick_watcher_thread.is_alive():
                print("エラー: tick_watcher スレッドが停止しました。")
                # 再起動処理などを検討
                break
            print(f"{datetime.now()}: システム稼働中...")

    except KeyboardInterrupt:
        print("システムを終了します...")
    finally:
        # 必要に応じてクリーンアップ処理
        print("システム終了。")

if __name__ == "__main__":
    # 最初に一度、手動でklineデータを取得しておく（tick_watcherが初回起動時にエラーにならないように）
    # 本番環境では、より洗練された初期化シーケンスを検討
    print("初回1分足データを取得します...")
    update_kline_data_file()

    main()

# ==============================================================================
# requirements.txt
# 必要なライブラリ
# ==============================================================================
# requests
# websocket-client