Skip to content

MySQL Stream処理の詳細分析 #499

Open
@takapi327

Description

@takapi327

概要

MySQL Connector/Jにおけるstream処理は、大きな結果セットを効率的に処理するための仕組みです。setFetchSize(Integer.MIN_VALUE)を設定することで、行を一つずつストリーミング読み取りし、メモリ使用量を最小限に抑えます。

  1. 使用されるパケット構造

COM_QUERYパケット

[パケットヘッダー (4バイト)]
├─ payload_length (3バイト): パケットサイズ
└─ sequence_id (1バイト): シーケンス番号

[パケットペイロード]
├─ command (1バイト): 0x03 (COM_QUERY)
├─ [クエリ属性 (CLIENT_QUERY_ATTRIBUTES対応時)]
│ ├─ parameter_count (length-encoded integer)
│ ├─ parameter_set_count (length-encoded integer)
│ ├─ null_bitmap (可変長)
│ ├─ new_params_bind_flag (1バイト)
│ └─ パラメータ詳細 (可変長)
└─ query (可変長文字列): SQLクエリ

結果セットパケット

[Column Count Packet]
├─ column_count (length-encoded integer)

[Column Definition Packets] × column_count
├─ catalog (length-encoded string)
├─ schema (length-encoded string)
├─ table (length-encoded string)
├─ org_table (length-encoded string)
├─ name (length-encoded string)
├─ org_name (length-encoded string)
├─ length_of_fixed_length_fields (1バイト)
├─ character_set (2バイト)
├─ column_length (4バイト)
├─ type (1バイト)
├─ flags (2バイト)
├─ decimals (1バイト)
└─ filler (2バイト)

[EOF Packet] (MySQL 5.7.5以前)
├─ header (1バイト): 0xFE
├─ warning_count (2バイト)
└─ status_flags (2バイト)

[Row Data Packets] (ストリーミング時は1行ずつ)
└─ column_values... (各列の値、length-encoded string)

[EOF/OK Packet] (結果セット終了)

  1. ストリーミング処理のロジック

ストリーミング有効化の条件

  protected boolean createStreamingResultSet() {
      return this.query.getResultType() == Type.FORWARD_ONLY
          && this.resultSetConcurrency == java.sql.ResultSet.CONCUR_READ_ONLY
          && this.query.getResultFetchSize() == Integer.MIN_VALUE;
  }

  ストリーミング読み取りロジック

  public Row next() {
      if (!this.noMoreRows) {
          // サーバーから1行ずつ読み取り
          this.nextRow = this.protocol.read(ResultsetRow.class, this.rowFactory);

          if (this.nextRow == null) {
              this.noMoreRows = true;
              this.isAfterEnd = true;
          }
      }

      // ストリーミング終了処理
      if (this.nextRow == null && !this.streamerClosed) {
          if (this.protocol.getServerSession().hasMoreResults()) {
              this.protocol.readNextResultset(/*複数結果セット処理*/);
          } else {
              this.protocol.unsetStreamingData(this);
              this.streamerClosed = true;
          }
      }

      return this.nextRow;
  }

メモリ効率化の仕組み

  • 遅延読み込み: ResultSet.next()呼び出し時にサーバーから1行取得
  • 単行バッファ: 一度に1行のみメモリに保持
  • 自動リソース管理: ストリーミング終了時の自動クリーンアップ
  • 例外安全性: エラー発生時もリソース解放を保証
  1. クライアント ↔ データベース処理のシーケンス図
  sequenceDiagram
      participant App as アプリケーション
      participant DS as MysqlDataSource
      participant Conn as ConnectionImpl
      participant Stmt as StatementImpl
      participant Proto as NativeProtocol
      participant Server as MySQL Server

      Note over App,Server: 1. 接続確立フェーズ
      App->>DS: new MysqlDataSource()
      App->>DS: setServerName("127.0.0.1")
      App->>DS: setPortNumber(13306)
      App->>DS: getConnection()
      DS->>Conn: create ConnectionImpl
      Conn->>Proto: create NativeProtocol
      Proto->>Server: TCP接続確立
      Proto->>Server: Handshake
      Server-->>Proto: Initial Handshake Packet
      Proto->>Server: Login Request
      Server-->>Proto: OK Packet
      Conn-->>DS: Connection確立完了
      DS-->>App: Connection返却

      Note over App,Server: 2. Statement作成・設定フェーズ
      App->>Conn: createStatement(TYPE_FORWARD_ONLY, CONCUR_READ_ONLY)
      Conn->>Stmt: new StatementImpl
      Stmt-->>App: Statement返却
      App->>Stmt: setFetchSize(Integer.MIN_VALUE)
      Note right of Stmt: ストリーミングモード有効化

      Note over App,Server: 3. クエリ実行フェーズ
      App->>Stmt: executeQuery("SELECT * FROM large_table")
      Stmt->>Proto: sendQueryPacket()

      rect rgb(255, 240, 240)
          Note over Proto,Server: COM_QUERYパケット送信
          Proto->>Server: [Header:4bytes][COM_QUERY:1byte][SQL:variable]
      end

      rect rgb(240, 255, 240)
          Note over Proto,Server: 結果セットメタデータ受信
          Server-->>Proto: Column Count Packet
          Server-->>Proto: Column Definition Packets × N
          Server-->>Proto: EOF Packet (MySQL 5.7.5以前)
      end

      Proto->>Stmt: createResultSet(streaming=true)
      Stmt-->>App: ResultSet返却

      Note over App,Server: 4. ストリーミング読み取りフェーズ
      loop 各行の処理
          App->>Stmt: resultSet.next()
          Stmt->>Proto: readRow()

          rect rgb(240, 240, 255)
              Note over Proto,Server: 1行ずつデータ受信
              Server-->>Proto: Row Data Packet
          end

          Proto-->>Stmt: Row Data
          Stmt-->>App: true (行が存在)
          App->>Stmt: resultSet.getString(column)
          Note right of App: 行データ処理
      end

      rect rgb(240, 240, 255)
          Note over Proto,Server: 結果セット終了
          Server-->>Proto: EOF/OK Packet
      end

      Proto-->>Stmt: null (行なし)
      Stmt-->>App: false (終了)

      Note over App,Server: 5. リソース解放フェーズ
      App->>Stmt: resultSet.close()
      Stmt->>Proto: unsetStreamingData()
      App->>Stmt: statement.close()
      App->>Conn: connection.close()
      Conn->>Proto: close()
      Proto->>Server: COM_QUIT
      Server-->>Proto: OK Packet
      Proto->>Server: TCP接続切断
Loading
  1. パケット送受信の詳細タイミング

通常モード vs ストリーミングモード

フェーズ 通常モード ストリーミングモード
クエリ送信 COM_QUERYパケット送信 COM_QUERYパケット送信
メタデータ受信 全カラム定義受信 全カラム定義受信
データ受信 全行データを一括受信 1行ずつ遅延受信
メモリ使用量 全行分のメモリ確保 1行分のみメモリ確保
処理タイミング executeQuery()時に全取得 next()呼び出し時に1行取得

ストリーミング時の特別な制約

  • ResultSet.TYPE_FORWARD_ONLY必須(前進のみ)
  • ResultSet.CONCUR_READ_ONLY必須(読み取り専用)
  • 複数ステートメント同時実行不可
  • 行の巻き戻し不可
  1. 性能特性とメモリ効率

メモリ使用量比較

通常モード:
メモリ使用量 ∝ 結果セット全体のサイズ
例: 100万行 × 1KB/行 = 約1GB

ストリーミングモード:
メモリ使用量 ≈ 定数(1行分 + バッファ)
例: 1行 × 1KB + バッファ = 約数KB

ネットワーク特性

  • レイテンシ: 各行取得時にネットワーク往復が発生
  • スループット: 大量データ処理時は通常モードより低下する可能性
  • 適用場面: 大きな結果セットを順次処理する場合に有効

まとめ

MySQL Connector/Jのストリーミング処理は、setFetchSize(Integer.MIN_VALUE)により有効化され、以下の特徴を持ちます:

  1. パケットレベル: 標準のMySQLプロトコルを使用し、行データを1行ずつ受信
  2. メモリ効率: 結果セットサイズに関係なく、一定のメモリ使用量を維持
  3. 処理制約: FORWARD_ONLY、READ_ONLYの制約あり
  4. 適用場面: 大きな結果セットの順次処理に最適
  5. 自動管理: リソースの自動解放とエラー時の適切なクリーンアップ

この仕組みにより、数百万行といった大規模データセットでも、限られたメモリ環境で効率的に処理することが可能になります。

useCursorFetch=true時の自動切り替え仕組み
  1. 設定時の自動プロパティ変更

まず、useCursorFetch=trueが設定されると、自動的に他のプロパティも変更されます:

  // JdbcPropertySetImpl.java (59-62行目)
  if (getBooleanProperty(PropertyKey.useCursorFetch).getValue()) {
      // カーソル機能はサーバーサイドプリペアドステートメントでのみ利用可能なため
      // 自動的にuseServerPrepStmtsをtrueに設定
      super.<Boolean>getProperty(PropertyKey.useServerPrepStmts).setValue(true);
  }
  1. executeQuery()での分岐処理

通常のStatementでも、条件次第でPreparedStatementに切り替わります:

  // StatementImpl.executeQuery() (1285行目)
  public java.sql.ResultSet executeQuery(String sql) throws SQLException {
      // 重要な分岐点
      if (useServerFetch()) {
          // ここでPreparedStatementに切り替え
          this.results = createResultSetUsingServerFetch(sql);
          return this.results;
      }

      // 通常のCOM_QUERYを使用
      this.results = ((NativeSession) locallyScopedConn.getSession()).execSQL(...);
  }
  1. 切り替え条件の判定
  // StatementImpl.useServerFetch() (2269行目)
  private boolean useServerFetch() throws SQLException {
      return this.session.getPropertySet().getBooleanProperty(PropertyKey.useCursorFetch).getValue()
             && this.query.getResultFetchSize() > 0
             && this.query.getResultType() == Type.FORWARD_ONLY;
  }

3つの条件すべてが満たされた場合のみ切り替わります:

  1. useCursorFetch = true

  2. fetchSize > 0

  3. ResultType = FORWARD_ONLY

  4. 内部的なPreparedStatement作成

  // StatementImpl.createResultSetUsingServerFetch() (596行目)
  private ResultSetInternalMethods createResultSetUsingServerFetch(String sql) throws SQLException {
      // 通常のStatementから内部的にPreparedStatementを作成
      java.sql.PreparedStatement pStmt = this.connection.prepareStatement(sql,
          this.query.getResultType().getIntValue(), this.resultSetConcurrency);

      pStmt.setFetchSize(this.query.getResultFetchSize());
      pStmt.execute();
      return ((JdbcStatement) pStmt).getResultSetInternal();
  }
  1. ServerPreparedStatementの作成
  // ConnectionImpl.prepareStatement() (1625行目)
  public java.sql.PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) {
      if (this.useServerPrepStmts.getValue() && canServerPrepare) {
          // ServerPreparedStatementを作成(COM_STMT_PREPAREを使用)
          pStmt = ServerPreparedStatement.getInstance(getMultiHostSafeProxy(), nativeSql, this.database, resultSetType, resultSetConcurrency);
      } else {
          // ClientPreparedStatementを作成(COM_QUERYを使用)
          pStmt = (ClientPreparedStatement) clientPrepareStatement(nativeSql, resultSetType, resultSetConcurrency, false);
      }
  }
  1. プロトコルレベルでの処理

ServerPreparedStatementの場合

  // COM_STMT_PREPARE送信
  public void serverPrepare(String sql) throws IOException {
      NativePacketPayload prepareResultPacket = this.session.getProtocol()
              .sendCommand(this.commandBuilder.buildComStmtPrepare(...), false, 0);
  }

  // buildComStmtPrepare()
  public NativePacketPayload buildComStmtPrepare(NativePacketPayload sharedPacket, byte[] query) {
      packet.writeInteger(IntegerDataType.INT1, NativeConstants.COM_STMT_PREPARE); // 0x16 (22)
      packet.writeBytes(StringLengthDataType.STRING_FIXED, query);
      return packet;
  }

通常のStatementの場合

  // COM_QUERY送信
  public NativePacketPayload buildComQuery(NativePacketPayload sharedPacket, Session sess, byte[] query) {
      packet.writeInteger(IntegerDataType.INT1, NativeConstants.COM_QUERY); // 0x03 (3)
      packet.writeBytes(StringLengthDataType.STRING_FIXED, query);
      return packet;
  }

処理フローの全体像

  graph TD
      A[dataSource.setUseCursorFetch] --> B[useServerPrepStmts自動設定]
      B --> C[statement.executeQuery]
      C --> D{useServerFetch?}

      D -->|No| E[通常処理]
      E --> F[COM_QUERY送信]

      D -->|Yes| G[createResultSetUsingServerFetch]
      G --> H[connection.prepareStatement]
      H --> I[ServerPreparedStatement作成]
      I --> J[COM_STMT_PREPARE送信]
      J --> K[サーバーでPreparedStatement作成]
      K --> L[COM_STMT_EXECUTE送信]
      L --> M[カーソル付きで実行]
      M --> N[COM_STMT_FETCH で1行ずつ取得]
Loading

重要なポイント

  1. 透明性: 開発者はcreateStatement()を使っているつもりでも、内部的にはPreparedStatementに切り替わる
  2. 自動設定: useCursorFetch=trueによりuseServerPrepStmtsも自動的にtrueになる
  3. 条件付き: fetchSize > 0 かつ FORWARD_ONLY の条件も必要
  4. プロトコル変更: COM_QUERY から COM_STMT_PREPARE/EXECUTE/FETCH に完全に切り替わる

Metadata

Metadata

Assignees

Labels

ldbc:connectorAddition and modification of functionality to Connector projects🚀 featureNew feature

Projects

No projects

Relationships

None yet

Development

No branches or pull requests

Issue actions