## MT5からinfluxに追加する

In [1]:
%load_ext dotenv
%dotenv

In [None]:
import os
import sys
import datetime
from time import sleep
import pandas as pd
import MetaTrader5 as mt5
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

sys.path.append(os.path.abspath("../.."))
from blue_ocean.ohlcv import Ohlcv, OhlcvRepository, BUCKET_NAME, MEASUREMENT_NAME

In [None]:
INFLUXDB_URL = os.getenv("INFLUXDB_URL")
INFLUXDB_ORG = os.getenv("INFLUXDB_ORG")
INFLUXDB_TOKEN = os.getenv("INFLUXDB_TOKEN")
SOURCE = "XM"
SYMBOL = "BTCUSD"
CHUNK_SIZE = 1000000

In [None]:
if mt5.initialize():
    print("MetaTrader 5に正常に接続しました。")
try:
    with InfluxDBClient(
        url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG
    ) as client:
        repository = OhlcvRepository(client)
        write_api = client.write_api(write_options=SYNCHRONOUS)
        # InfluxDBから最新のタイムスタンプを取得
        latest_timestamp = repository.get_latest_timestamp(SYMBOL)
        # タイムゾーンをUTCに設定
        timezone = datetime.timezone.utc
        # データ取得期間の「終わり」を設定 (DBにデータがあればその最新時刻、なければ現在時刻)
        date_to = (
            latest_timestamp if latest_timestamp else datetime.datetime.now(timezone)
        )
        # 遡って取得する期間の単位
        delta = datetime.timedelta(days=60)
        print(f"データ取得の基準時刻: {date_to}")
        while True:
            # データ取得期間の「始まり」を計算
            date_from = date_to - delta
            print(f"--- {date_from} から {date_to} のデータを取得します... ---")
            rates = mt5.copy_rates_range(SYMBOL, mt5.TIMEFRAME_M1, date_from, date_to)
            if rates is None or len(rates) == 0:
                print("MT5からこれ以上取得できるデータがありません。処理を終了します。")
                break
            rates_df = pd.DataFrame(rates)
            rates_df["time"] = pd.to_datetime(rates_df["time"], unit="s", utc=True)
            # 初回ループのみ、DBの最新データとの重複を削除
            if latest_timestamp and not rates_df.empty:
                rates_df = rates_df[rates_df["time"] > latest_timestamp]
            if rates_df.empty:
                print("書き込む新しいデータがありません。さらに過去に遡ります。\n")
                # 次のループのために、取得期間の終わりを更新
                date_to = date_from
                # 2回目以降は重複チェック不要
                latest_timestamp = None
                sleep(1)
                continue
            print(f"{len(rates_df)}件のデータを取得しました。")

            # Ohlcvオブジェクトのリストを作成
            ohlcvs = [row.to_point() for row in Ohlcv.from_dataframe(rates_df)]

            # データを一括書き込み
            if ohlcvs:
                repository.save(ohlcvs)
                print(f"==> {len(ohlcvs)}件のデータをInfluxDBに書き込みました。\n")
            # 次のループのために、取得期間の終わりを更新
            date_to = date_from
            # 2回目以降は重複チェック不要
            latest_timestamp = None
            sleep(1)
except Exception as e:
    print(f"エラーが発生しました: {e}")
finally:
    # MT5との接続をシャットダウン
    print("MetaTrader 5との接続をシャットダウンします。")
    mt5.shutdown()
    print("--- データインポート完了 ---")

MetaTrader 5に正常に接続しました。
データ取得の基準時刻: 2025-10-04 15:03:00+00:00
--- 2025-08-05 15:03:00+00:00 から 2025-10-04 15:03:00+00:00 のデータを取得します... ---
書き込む新しいデータがありません。さらに過去に遡ります。

--- 2025-06-06 15:03:00+00:00 から 2025-08-05 15:03:00+00:00 のデータを取得します... ---
13010件のデータを取得しました。
エラーが発生しました: Ohlcv.__init__() got an unexpected keyword argument 'index'
MetaTrader 5との接続をシャットダウンします。
--- データインポート完了 ---
