Description
概要
MySQL Connector/Jにおけるstream処理は、大きな結果セットを効率的に処理するための仕組みです。setFetchSize(Integer.MIN_VALUE)を設定することで、行を一つずつストリーミング読み取りし、メモリ使用量を最小限に抑えます。
- 使用されるパケット構造
COM_QUERYパケット
[パケットヘッダー (4バイト)]
├─ payload_length (3バイト): パケットサイズ
└─ sequence_id (1バイト): シーケンス番号
[パケットペイロード]
├─ command (1バイト): 0x03 (COM_QUERY)
├─ [クエリ属性 (CLIENT_QUERY_ATTRIBUTES対応時)]
│ ├─ parameter_count (length-encoded integer)
│ ├─ parameter_set_count (length-encoded integer)
│ ├─ null_bitmap (可変長)
│ ├─ new_params_bind_flag (1バイト)
│ └─ パラメータ詳細 (可変長)
└─ query (可変長文字列): SQLクエリ
結果セットパケット
[Column Count Packet]
├─ column_count (length-encoded integer)
[Column Definition Packets] × column_count
├─ catalog (length-encoded string)
├─ schema (length-encoded string)
├─ table (length-encoded string)
├─ org_table (length-encoded string)
├─ name (length-encoded string)
├─ org_name (length-encoded string)
├─ length_of_fixed_length_fields (1バイト)
├─ character_set (2バイト)
├─ column_length (4バイト)
├─ type (1バイト)
├─ flags (2バイト)
├─ decimals (1バイト)
└─ filler (2バイト)
[EOF Packet] (MySQL 5.7.5以前)
├─ header (1バイト): 0xFE
├─ warning_count (2バイト)
└─ status_flags (2バイト)
[Row Data Packets] (ストリーミング時は1行ずつ)
└─ column_values... (各列の値、length-encoded string)
[EOF/OK Packet] (結果セット終了)
- ストリーミング処理のロジック
ストリーミング有効化の条件
protected boolean createStreamingResultSet() {
return this.query.getResultType() == Type.FORWARD_ONLY
&& this.resultSetConcurrency == java.sql.ResultSet.CONCUR_READ_ONLY
&& this.query.getResultFetchSize() == Integer.MIN_VALUE;
}
ストリーミング読み取りロジック
public Row next() {
if (!this.noMoreRows) {
// サーバーから1行ずつ読み取り
this.nextRow = this.protocol.read(ResultsetRow.class, this.rowFactory);
if (this.nextRow == null) {
this.noMoreRows = true;
this.isAfterEnd = true;
}
}
// ストリーミング終了処理
if (this.nextRow == null && !this.streamerClosed) {
if (this.protocol.getServerSession().hasMoreResults()) {
this.protocol.readNextResultset(/*複数結果セット処理*/);
} else {
this.protocol.unsetStreamingData(this);
this.streamerClosed = true;
}
}
return this.nextRow;
}
メモリ効率化の仕組み
- 遅延読み込み: ResultSet.next()呼び出し時にサーバーから1行取得
- 単行バッファ: 一度に1行のみメモリに保持
- 自動リソース管理: ストリーミング終了時の自動クリーンアップ
- 例外安全性: エラー発生時もリソース解放を保証
- クライアント ↔ データベース処理のシーケンス図
sequenceDiagram
participant App as アプリケーション
participant DS as MysqlDataSource
participant Conn as ConnectionImpl
participant Stmt as StatementImpl
participant Proto as NativeProtocol
participant Server as MySQL Server
Note over App,Server: 1. 接続確立フェーズ
App->>DS: new MysqlDataSource()
App->>DS: setServerName("127.0.0.1")
App->>DS: setPortNumber(13306)
App->>DS: getConnection()
DS->>Conn: create ConnectionImpl
Conn->>Proto: create NativeProtocol
Proto->>Server: TCP接続確立
Proto->>Server: Handshake
Server-->>Proto: Initial Handshake Packet
Proto->>Server: Login Request
Server-->>Proto: OK Packet
Conn-->>DS: Connection確立完了
DS-->>App: Connection返却
Note over App,Server: 2. Statement作成・設定フェーズ
App->>Conn: createStatement(TYPE_FORWARD_ONLY, CONCUR_READ_ONLY)
Conn->>Stmt: new StatementImpl
Stmt-->>App: Statement返却
App->>Stmt: setFetchSize(Integer.MIN_VALUE)
Note right of Stmt: ストリーミングモード有効化
Note over App,Server: 3. クエリ実行フェーズ
App->>Stmt: executeQuery("SELECT * FROM large_table")
Stmt->>Proto: sendQueryPacket()
rect rgb(255, 240, 240)
Note over Proto,Server: COM_QUERYパケット送信
Proto->>Server: [Header:4bytes][COM_QUERY:1byte][SQL:variable]
end
rect rgb(240, 255, 240)
Note over Proto,Server: 結果セットメタデータ受信
Server-->>Proto: Column Count Packet
Server-->>Proto: Column Definition Packets × N
Server-->>Proto: EOF Packet (MySQL 5.7.5以前)
end
Proto->>Stmt: createResultSet(streaming=true)
Stmt-->>App: ResultSet返却
Note over App,Server: 4. ストリーミング読み取りフェーズ
loop 各行の処理
App->>Stmt: resultSet.next()
Stmt->>Proto: readRow()
rect rgb(240, 240, 255)
Note over Proto,Server: 1行ずつデータ受信
Server-->>Proto: Row Data Packet
end
Proto-->>Stmt: Row Data
Stmt-->>App: true (行が存在)
App->>Stmt: resultSet.getString(column)
Note right of App: 行データ処理
end
rect rgb(240, 240, 255)
Note over Proto,Server: 結果セット終了
Server-->>Proto: EOF/OK Packet
end
Proto-->>Stmt: null (行なし)
Stmt-->>App: false (終了)
Note over App,Server: 5. リソース解放フェーズ
App->>Stmt: resultSet.close()
Stmt->>Proto: unsetStreamingData()
App->>Stmt: statement.close()
App->>Conn: connection.close()
Conn->>Proto: close()
Proto->>Server: COM_QUIT
Server-->>Proto: OK Packet
Proto->>Server: TCP接続切断
- パケット送受信の詳細タイミング
通常モード vs ストリーミングモード
フェーズ | 通常モード | ストリーミングモード |
---|---|---|
クエリ送信 | COM_QUERYパケット送信 | COM_QUERYパケット送信 |
メタデータ受信 | 全カラム定義受信 | 全カラム定義受信 |
データ受信 | 全行データを一括受信 | 1行ずつ遅延受信 |
メモリ使用量 | 全行分のメモリ確保 | 1行分のみメモリ確保 |
処理タイミング | executeQuery()時に全取得 | next()呼び出し時に1行取得 |
ストリーミング時の特別な制約
- ResultSet.TYPE_FORWARD_ONLY必須(前進のみ)
- ResultSet.CONCUR_READ_ONLY必須(読み取り専用)
- 複数ステートメント同時実行不可
- 行の巻き戻し不可
- 性能特性とメモリ効率
メモリ使用量比較
通常モード:
メモリ使用量 ∝ 結果セット全体のサイズ
例: 100万行 × 1KB/行 = 約1GB
ストリーミングモード:
メモリ使用量 ≈ 定数(1行分 + バッファ)
例: 1行 × 1KB + バッファ = 約数KB
ネットワーク特性
- レイテンシ: 各行取得時にネットワーク往復が発生
- スループット: 大量データ処理時は通常モードより低下する可能性
- 適用場面: 大きな結果セットを順次処理する場合に有効
まとめ
MySQL Connector/Jのストリーミング処理は、setFetchSize(Integer.MIN_VALUE)により有効化され、以下の特徴を持ちます:
- パケットレベル: 標準のMySQLプロトコルを使用し、行データを1行ずつ受信
- メモリ効率: 結果セットサイズに関係なく、一定のメモリ使用量を維持
- 処理制約: FORWARD_ONLY、READ_ONLYの制約あり
- 適用場面: 大きな結果セットの順次処理に最適
- 自動管理: リソースの自動解放とエラー時の適切なクリーンアップ
この仕組みにより、数百万行といった大規模データセットでも、限られたメモリ環境で効率的に処理することが可能になります。
useCursorFetch=true時の自動切り替え仕組み
- 設定時の自動プロパティ変更
まず、useCursorFetch=trueが設定されると、自動的に他のプロパティも変更されます:
// JdbcPropertySetImpl.java (59-62行目)
if (getBooleanProperty(PropertyKey.useCursorFetch).getValue()) {
// カーソル機能はサーバーサイドプリペアドステートメントでのみ利用可能なため
// 自動的にuseServerPrepStmtsをtrueに設定
super.<Boolean>getProperty(PropertyKey.useServerPrepStmts).setValue(true);
}
- executeQuery()での分岐処理
通常のStatementでも、条件次第でPreparedStatementに切り替わります:
// StatementImpl.executeQuery() (1285行目)
public java.sql.ResultSet executeQuery(String sql) throws SQLException {
// 重要な分岐点
if (useServerFetch()) {
// ここでPreparedStatementに切り替え
this.results = createResultSetUsingServerFetch(sql);
return this.results;
}
// 通常のCOM_QUERYを使用
this.results = ((NativeSession) locallyScopedConn.getSession()).execSQL(...);
}
- 切り替え条件の判定
// StatementImpl.useServerFetch() (2269行目)
private boolean useServerFetch() throws SQLException {
return this.session.getPropertySet().getBooleanProperty(PropertyKey.useCursorFetch).getValue()
&& this.query.getResultFetchSize() > 0
&& this.query.getResultType() == Type.FORWARD_ONLY;
}
3つの条件すべてが満たされた場合のみ切り替わります:
-
useCursorFetch = true
-
fetchSize > 0
-
ResultType = FORWARD_ONLY
-
内部的なPreparedStatement作成
// StatementImpl.createResultSetUsingServerFetch() (596行目)
private ResultSetInternalMethods createResultSetUsingServerFetch(String sql) throws SQLException {
// 通常のStatementから内部的にPreparedStatementを作成
java.sql.PreparedStatement pStmt = this.connection.prepareStatement(sql,
this.query.getResultType().getIntValue(), this.resultSetConcurrency);
pStmt.setFetchSize(this.query.getResultFetchSize());
pStmt.execute();
return ((JdbcStatement) pStmt).getResultSetInternal();
}
- ServerPreparedStatementの作成
// ConnectionImpl.prepareStatement() (1625行目)
public java.sql.PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) {
if (this.useServerPrepStmts.getValue() && canServerPrepare) {
// ServerPreparedStatementを作成(COM_STMT_PREPAREを使用)
pStmt = ServerPreparedStatement.getInstance(getMultiHostSafeProxy(), nativeSql, this.database, resultSetType, resultSetConcurrency);
} else {
// ClientPreparedStatementを作成(COM_QUERYを使用)
pStmt = (ClientPreparedStatement) clientPrepareStatement(nativeSql, resultSetType, resultSetConcurrency, false);
}
}
- プロトコルレベルでの処理
ServerPreparedStatementの場合
// COM_STMT_PREPARE送信
public void serverPrepare(String sql) throws IOException {
NativePacketPayload prepareResultPacket = this.session.getProtocol()
.sendCommand(this.commandBuilder.buildComStmtPrepare(...), false, 0);
}
// buildComStmtPrepare()
public NativePacketPayload buildComStmtPrepare(NativePacketPayload sharedPacket, byte[] query) {
packet.writeInteger(IntegerDataType.INT1, NativeConstants.COM_STMT_PREPARE); // 0x16 (22)
packet.writeBytes(StringLengthDataType.STRING_FIXED, query);
return packet;
}
通常のStatementの場合
// COM_QUERY送信
public NativePacketPayload buildComQuery(NativePacketPayload sharedPacket, Session sess, byte[] query) {
packet.writeInteger(IntegerDataType.INT1, NativeConstants.COM_QUERY); // 0x03 (3)
packet.writeBytes(StringLengthDataType.STRING_FIXED, query);
return packet;
}
処理フローの全体像
graph TD
A[dataSource.setUseCursorFetch] --> B[useServerPrepStmts自動設定]
B --> C[statement.executeQuery]
C --> D{useServerFetch?}
D -->|No| E[通常処理]
E --> F[COM_QUERY送信]
D -->|Yes| G[createResultSetUsingServerFetch]
G --> H[connection.prepareStatement]
H --> I[ServerPreparedStatement作成]
I --> J[COM_STMT_PREPARE送信]
J --> K[サーバーでPreparedStatement作成]
K --> L[COM_STMT_EXECUTE送信]
L --> M[カーソル付きで実行]
M --> N[COM_STMT_FETCH で1行ずつ取得]
重要なポイント
- 透明性: 開発者はcreateStatement()を使っているつもりでも、内部的にはPreparedStatementに切り替わる
- 自動設定: useCursorFetch=trueによりuseServerPrepStmtsも自動的にtrueになる
- 条件付き: fetchSize > 0 かつ FORWARD_ONLY の条件も必要
- プロトコル変更: COM_QUERY から COM_STMT_PREPARE/EXECUTE/FETCH に完全に切り替わる