-
Notifications
You must be signed in to change notification settings - Fork 0
9 Batch Programming Model
バッチプログラミングモデルは、インタフェース、抽象クラス、フィールドアノテーションによって定義されています。インタフェースはバッチアプリケーションとバッチランタイムの本質的な契約関係を定義しています。多くのインタフェースは開発者の手間を省くためにいくつかのメソッドのデフォルト実装をした抽象クラスもまた持っています。
stepはchunkかbatchletのどちらかです。
chunkタイプのstepはreader-processor-writerバッチパターンとチェックポイントを使用したアイテム指向処理を実行します。
ItemReaderはchunk stepでアイテムを読み込むために使用されます。ItemReaderはchunkタイプstepを構成する三つのバッチアーティファクトの一つです。他の二つはItemProcessorとItemWriterです。
ItemReaderインタフェースはItemReaderバッチアーティファクトを実装するために使用します。
package javax.batch.api.chunk;
import java.io.Serializable;
/**
* ItemReaderはchunk処理においてアイテム読み込むを行うバッチアーティファクトを定義します。
*/
public interface ItemReader {
/**
* openメソッドはアイテムを読み込むためのreaderを準備します。
*
* 入力パラメーターは与えられたjobインスタンスでこのreaderの最後のチェックポイントを表現しています。
*
* チェックポイントデータはこのreaderによって定義され、また、checkpointInfoメソッドによって提供されます。
* チェックポイントデータはリスタート時にアイテム読み込みを再開する必要がある情報とは何かをreaderに提供します。
* 初回起動時にはチェックポイントの値はnullが渡されます。
*
* @param checkpoint 最後のチェックポイントを定義
* @throws Exception 任意のエラーでスローされる例外
*/
public void open(Serializable checkpoint) throws Exception;
/**
* closeメソッドはItemReaderの使用終了を意味します。readerは必要なクリーンアップ処理を自由に行えます。
*
* @throws Exception 任意のエラーでスローされる例外
*/
public void close() throws Exception;
/**
* readItemメソッドはchunk処理における次のアイテムを返します。
* nullはそれ以上アイテムが無いことを示し、また、現在のchunkがコミットされてstepが終了することを意味します。
*
* @return 次のアイテムかnull
* @throws Exception 任意のエラーでスローされる例外
*/
public Object readItem() throws Exception;
/**
* checkpointInfoメソッドはこのreaderにおける現在のチェックポイントデータを返します。
* chunkのチェックポイントがコミットされる前に呼び出されます。
*
* @return チェックポイントデータ
* @throws Exception 任意のエラーでスローされる例外
*/
public Serializable checkpointInfo() throws Exception;
}
package javax.batch.api.chunk;
import java.io.Serializable;
/**
* AbstractItemReaderはメソッドをただ実装しただけのデフォルト実装を提供します。
*/
public abstract class AbstractItemReader implements ItemReader {
/**
* もしItemReaderがopen時に処理を必要とするならこのメソッドをオーバーライドします。
* デフォルト実装は何もしません。
* @param checkpoint このitemReaderの最後のチェックポイント(たいていはnull)
* @throws Exception もしエラーが発生した場合
*/
@Override
public void open(Serializable checkpoint) throws Exception {
}
/**
* もしItemReaderがclose時に処理を必要とするならこのメソッドをオーバーライドします。
* デフォルト実装は何もしません。
* @throws Exception もしエラーが発生した場合
*/
@Override
public void close() throws Exception {
}
/**
* このメソッドでItemReaderの読み込みロジックを実装します。
* @return 次のアイテムかnull
* @throws Exception もしエラーが発生した場合
*/
@Override
public abstract Object readItem() throws Exception;
/**
* もしItemReaderがチェックポイントをサポートするならこのメソッドをオーバーライドします。
* デフォルト実装はnullを返します。
* @return チェックポイントデータ
* @throws Exception もしエラーが発生した場合
*/
@Override
public Serializable checkpointInfo() throws Exception {
return null;
}
}
ItemProcessorはchunk stepにおけるアイテム処理に使用されます。ItemProcessorはchunkタイプstepを構成する三つのバッチアーティファクトの一つです。ItemProcessorはchunkタイプstepではオプションのアーティファクトです。他の二つはItemReaderとItemWriterです。
ItemProcessorインタフェースはItemProcessorバッチアーティファクトを実装するために使用されます。
package javax.batch.api.chunk;
/**
* ItemProcessorはchunk処理において入力アイテムを処理して出力アイテムを生成するために使用されます。
*/
public interface ItemProcessor {
/**
* processItemメソッドはchunk stepの一部分です。
* readerから入力アイテム受け取り、writerに渡されるアイテムが戻り値になります。
* 戻り値nullは、そのアイテムの処理を継続しないことを意味します。
* これはprocessItemで不要な入力アイテムをフィルタできること意味します。
*
* @param item 処理する入力アイテムを指定します
* @return 書き込むアイテムを出力します
* @throws Exception もしエラーが発生した場合
*/
public Object processItem(Object item) throws Exception;
}
ItemWriterはchunk stepにおける出力アイテムのリストを書き込むために使用されます。ItemWriterはchunkタイプstepを構成する三つのバッチアーティファクトの一つです。他の二つはItemProcessorとItemReaderです。
ItemWriterインタフェースはItemWriterバッチアーティファクトを実装するために使用されます。
package javax.batch.api.chunk;
import java.io.Serializable;
import java.util.List;
/**
* ItemWriterはchunk処理におけるアイテムのリストを書き込むバッチアーティファクトを定義します。
*/
public interface ItemWriter {
/**
* openメソッドはアイテムを書き込むためのwriterを準備します。
*
* 入力パラメーターは与えられたjobインスタンスでこのwriterの最後のチェックポイントを表現しています。
* チェックポイントデータはこのwriterによって定義され、また、checkpointInfoメソッドによって提供されます。
* チェックポイントデータはリスタート時にアイテム書き込むを再開する必要がある情報とは何かをwriterに提供します。
* 初回起動時にはチェックポイントの値はnullが渡されます。
*
* @param checkpoint 最後のチェックポイントを定義
* @throws Exception 任意のエラーでスローされる例外
*/
public void open(Serializable checkpoint) throws Exception;
/**
* closeメソッドはItemWriterの使用終了を意味します。writerは必要なクリーンアップ処理を自由に行えます。
*
* @throws Exception 任意のエラーでスローされる例外
*/
public void close() throws Exception;
/**
* writeItemsメソッドは現在のchunkにおけるアイテムのリストを書き込みます。
*
* @param items 書き込むためのアイテムのリストを指定する
* @throws Exception 任意のエラーでスローされる例外
*/
public void writeItems(List<Object> items) throws Exception;
/**
* checkpointInfoメソッドはこのwriterにおける現在のチェックポイントデータを返します。
* chunkのチェックポイントがコミットされる前に呼び出されます。
*
* @return チェックポイントデータ
* @throws Exception 任意のエラーでスローされる例外
*/
public Serializable checkpointInfo() throws Exception;
}
package javax.batch.api.chunk;
import java.io.Serializable;
import java.util.List;
/**
* AbstractItemWriterはメソッドをただ実装しただけのデフォルト実装を提供します。
*/
public abstract class AbstractItemWriter implements ItemWriter {
/**
* もしItemWriterがopen時に処理を必要とするならこのメソッドをオーバーライドします。
* デフォルト実装は何もしません。
*
* @param checkpoint このItemWriterの最後のチェックポイント(たいていはnull)
* @throws Exception もしエラーが発生した場合
*/
@Override
public void open(Serializable checkpoint) throws Exception {
}
/**
* もしItemWriterがclose時に処理を必要とするならこのメソッドをオーバーライドします。
* デフォルト実装は何もしません。
*
* @throws Exception もしエラーが発生した場合
*/
@Override
public void close() throws Exception {
}
/**
* このメソッドでItemWriterの書き込むロジックを実装します。
*
* @param items 書き込むアイテムのリストを指定
* @throws Exception もしエラーが発生した場合
*/
@Override
public abstract void writeItems(List<Object> items) throws Exception;
/**
* もしItemWriterがチェックポイントをサポートするならこのメソッドをオーバーライドします。
* デフォルト実装はnullを返します。
*
* @return チェックポイントデータ
* @throws Exception もしエラーが発生した場合
*/
@Override
public Serializable checkpointInfo() throws Exception {
return null;
}
}
CheckpointAlgorithmはchunk stepのためのチェックポイントポリシーのカスタム実装を行います。CheckpointAlgorithmインタフェースはCheckpointAlgorithmバッチアーティファクトと実装するために使用されます。
package javax.batch.api.chunk;
/**
* CheckpointAlgorithmはchunk stepのためのチェックポイントポリシーのカスタム実装を提供します。
*/
public interface CheckpointAlgorithm {
/**
* チェックポイントタイムアウトの確立目的で新しいチェックポイントインターバルの開始時に呼び出されます。
* このメソッドは次のチェックポイントトランザクションが開始する前に呼び出されます。
* このメソッドは次のチェックポイントトランザクションに使用されるタイムアウト値となる整数値を返します。
* このメソッドはジョブ定義の外側の情報に基づいてチェックポイントタイムアウトの設定を自動化するのに役立ちます。
*
* (*1)
* @return 次のチェックポイントインターバルで使用するためのタイムアウトインターバル値
* @throws Exception もしエラーが発生した場合
*/
public int checkpointTimeout() throws Exception;
/**
* beginCheckpointメソッドは次のチェックポイントインターバルが開始される前に呼び出されます。
*
* @throws Exception もしエラーが発生した場合
*/
public void beginCheckpoint() throws Exception;
/**
* isReadyToCheckpointメソッドは、現時点が現在のchunkでチェックポイント処理を行うタイミングかどうかを決定するために、
* 各アイテムの処理後に、バッチランタイムによって呼び出されます。
*
* @return 現時点でチェックポイント処理をするかどうかを示す。
* @throws Exception もしエラーが発生した場合
*/
public boolean isReadyToCheckpoint() throws Exception;
/**
* endCheckpointメソッドは現在のチェックポイントが終了した後に呼び出されます。
*
* @throws Exception もしエラーが発生した場合
*/
public void endCheckpoint() throws Exception;
}
package javax.batch.api.chunk;
/**
* AbstractCheckpointAlgorithmはメソッドをただ実装しただけのデフォルト実装を提供します。
*/
public class AbstractCheckpointAlgorithm implements CheckpointAlgorithm {
/**
* もしCheckpointAlgorithmがチェックポイントタイムアウトを確立するならこのメソッドをオーバーライドします。
* デフォルト実装はランタイム環境が許容する最大のタイムアウトを意味する0を返します。
*
* @return 次のチェックポイントインターバルで使用するためのタイムアウトインターバル
* @throws もしエラーが発生した場合
*/
@Override
public int checkpointTimeout() throws Exception {
return 0;
}
/**
* チェックポイント開始前にCheckpointAlgorithmで実行する何がしかのためにこのメソッドをオーバーライドします。
* デフォルト実装は何もしません。
*
* @throws もしエラーが発生した場合
*/
@Override
public void beginCheckpoint() throws Exception {
}
/**
* チェックポイントがその時点でされるかどうかを決定するためのロジックをこのメソッドで実装します。
*
* @return 現時点でチェックポイント処理をするかどうかを示す。
* @throws もしエラーが発生した場合
*/
@Override
public boolean isReadyToCheckpoint() throws Exception {
return false;
}
/**
* チェックポイント終了後にCheckpointAlgorithmで実行する何がしかのためにこのメソッドをオーバーライドします。
* デフォルト実装は何もしません。
*
* @throws もしエラーが発生した場合
*/
@Override
public void endCheckpoint() throws Exception {
}
}
*1 原文のcheckpointTimeout()には引数が無いのに@paramがあり謎。
batchletタイプのstepは環境独自のバッチパターンの実装を行います。このバッチパターンは一度呼び出されると、完了するまで実行され、完了ステータスを返します。
BatchletインタフェースはBatchletバッチアーティファクトを実装するために使用されます。
package javax.batch.api;
/**
* batchletは、chunk指向アプローチとは全く異なるバックグラウンド処理のために使用される、stepのタイプの一つです。
* <p>
* 仕様に沿ったbatchletは停止リクエストをstopメソッド実装によって応えます。
*/
public interface Batchlet {
/**
* processメソッドはbatchletの動作です。もしこのメソッドが例外をスローする場合、batchlet
* stepはバッチステータスFAILEDで終了します。
*
* @return 完了ステータスのstring
* @throws Exception もしエラーが発生した場合。
*/
public String process() throws Exception;
/**
* stopメソッドはJobOperator.stop()メソッド処理の一部としてバッチランタイムから呼び出されます。
* このメソッドはbatchletのprocessメソッドが実行中のスレッド以外のスレッドで呼び出されます。
*
* @throws Exception もしエラーが発生した場合。
*/
public void stop() throws Exception;
}
package javax.batch.api;
/**
* AbstractBatchletはメソッドをただ実装しただけのデフォルト実装を提供します。
*/
public class AbstractBatchlet implements Batchlet {
/**
* このメソッドでbatchletのロジックを実装します。
*
* @return 完了ステータスのstring
* @throws Exception もしエラーが発生した場合。
*/
@Override
public String process() throws Exception {
return null;
}
/**
* もしbatchletがJobOperator.stop()への応答として終了する場合にこのメソッドをオーバーライドします。
* デフォルト実装は何もしません。
*
* @throws Exception もしエラーが発生した場合。
*/
@Override
public void stop() throws Exception {
}
}
プログラミングガイドライン
JobOperator.stopが呼び出されるとき、良い設計のbatchletは潔く(gracefully)停止します。stop処理の詳細な情報については11.13節を参照してください。
バッチ実行に割り込むためにはリスナーを使用します。
ジョブリスナーはジョブ実行の前後や、例外がジョブ処理中にスローされた場合に制御が渡されます。JobListenerインタフェースはJobListenerバッチアーティファクトを実装するために使用されます。
package javax.batch.api.listener;
/**
* JobListenerはジョブの実行をインターセプトします。
*/
public interface JobListener {
/**
* beforeJobメソッドはジョブ実行の開始前に制御が渡されます。
* @throws Exception エラー発生時にスローされる例外
*/
public void beforeJob() throws Exception;
/**
* afterJobメソッドはジョブ実行の終了前に制御が渡されます。
* @throws Exception エラー発生時にスローされる例外
*/
public void afterJob() throws Exception;
}
package javax.batch.api.listener;
/**
* AbstractJobListenerはメソッドをただ実装しただけのデフォルト実装を提供します。
*/
public class AbstractJobListener implements JobListener {
/**
* もしJobListenerがジョブ開始前にすることがあればこのメソッドをオーバーライドします。デフォルト実装は何もしません。
*/
@Override
public void beforeJob() throws Exception {
}
/**
* もしJobListenerがジョブ終了後にすることがあればこのメソッドをオーバーライドします。デフォルト実装は何もしません。
*/
@Override
public void afterJob() throws Exception {
}
}
ステップリスナーはステップ実行の前後や、例外がステップ処理中にスローされた場合に、制御が渡されます。StepListenerインタフェースはStepListenerバッチアーティファクトを実装するために使用されます。
package javax.batch.api.listener;
/**
* StepListenerはジョブの実行をインターセプトします。
*/
public interface StepListener {
/**
* beforeStepメソッドはステップ実行の開始前に制御が渡されます。
* @throws Exception エラー発生時にスローされる例外
*/
public void beforeStep() throws Exception;
/**
* afterStepメソッドはステップ実行の終了前に制御が渡されます。
* @throws Exception エラー発生時にスローされる例外
*/
public void afterStep() throws Exception;
}
package javax.batch.api.listener;
/**
* AbstractStepListenerはメソッドをただ実装しただけのデフォルト実装を提供します。
*/
public class AbstractStepListener implements StepListener {
/**
* もしStepListenerがステップ開始前にすることがあればこのメソッドをオーバーライドします。デフォルト実装は何もしません。
*/
@Override
public void beforeStep() throws Exception {
}
/**
* もしJobListenerがステップ終了後にすることがあればこのメソッドをオーバーライドします。デフォルト実装は何もしません。
*/
@Override
public void afterStep() throws Exception {
}
}
チャンクリスナーは、チャンク処理の開始と終了時、チェックポイント処理の前後、に制御が渡されます。ChunkListenerインタフェースはChunkListenerバッチアーティファクトを実装するために使用されます。
package javax.batch.api.chunk.listener;
/**
* ChunkListenerはチャンク処理をインターセプトします。
*/
public interface ChunkListener {
/**
* beforeChunkメソッドは次のチャンク処理が開始する前に制御が渡されます。このメソッドはチャンク処理と同じトランザクションで呼び出されます。
*
* @throws Exception エラー発生時にスローされる例外
*/
public void beforeChunk() throws Exception;
/**
* onErrorメソッドはチャンクトランザクションがロールバックされる前に制御が渡されます。注意点として、
* afterChunkはこのケースでは呼び出されません。
*
* @param ex ロールバックの原因となった例外
* @throws Exception エラー発生時にスローされる例外
*/
public void onError(Exception ex) throws Exception;
/**
* afterChunkメソッドは現在のチャンク処理の終了後に制御が渡されます。このメソッドはチャンク処理と同じトランザクションで呼び出されます。
*
* @throws Exception エラー発生時にスローされる例外
*/
public void afterChunk() throws Exception;
}
package javax.batch.api.chunk.listener;
/**
* AbstractChunkListenerはメソッドをただ実装しただけのデフォルト実装を提供します。
*/
public class AbstractChunkListener implements ChunkListener {
/**
* ChunkListenerがチャンク開始前にすることがあればこのメソッドをオーバーライドしてください。デフォルト実装は何もしません。
*
* @throws Exception エラー発生時にスローされる例外
*/
@Override
public void beforeChunk() throws Exception {
}
/**
* ChunkListenerがチャンクトランザクションがロールバックされる前にすることがあればこのメソッドをオーバーライドしてください。
* デフォルト実装は何もしません。
*
* @param ex ロールバックの原因となった例外
* @throws Exception エラー発生時にスローされる例外
*/
@Override
public void onError(Exception ex) throws Exception {
}
/**
* ChunkListenerがチャンク終了後にすることがあればこのメソッドをオーバーライドしてください。 デフォルト実装は何もしません。
*
* @throws Exception エラー発生時にスローされる例外
*/
@Override
public void afterChunk() throws Exception {
}
}
item readリスナーは、item readerによってアイテムが読み込まれる前後や、readerが例外をスローした場合に、制御が渡されます。ItemReadListenerインタフェースはItemReadListenerバッチアーティファクトを実装するために使用されます。
package javax.batch.api.chunk.listener;
/**
* ItemReadListenerはitem reader処理をインターセプトします。
*/
public interface ItemReadListener {
/**
* beforeReadメソッドは次のアイテムを読み込むためにitem readerが呼び出される前に制御が渡されます。
*
* @throws Exception エラー発生時にスローされる例外
*/
public void beforeRead() throws Exception;
/**
* afterReadメソッドはitem readerがアイテムを読み込んだ後に制御が渡されます。
* このメソッドは入力値として読み込まれたアイテムが渡されます。
*
* @param item item readerによって読み込まれたアイテム
* @throws Exception エラー発生時にスローされる例外
*/
public void afterRead(Object item) throws Exception;
/**
* onReadErrorはitem readerがreadItemメソッドで例外をスローした後に制御が渡されます。
* このメソッドは入力値として例外が渡されます。
*
* @param ex item readerで発生した例外
* @throws Exception エラー発生時にスローされる例外
*/
public void onReadError(Exception ex) throws Exception;
}
package javax.batch.api.chunk.listener;
/**
* AbstractItemReadListenerはメソッドをただ実装しただけのデフォルト実装を提供します。
*/
public class AbstractItemReadListener implements ItemReadListener {
/**
* ItemReadListenerでアイテムが読み込まれる前にすることがあればこのメソッドをオーバーライドしてください。デフォルトは何もしません。
*
* @throws Exception エラー発生時にスローされる例外
*/
@Override
public void beforeRead() throws Exception {
}
/**
* ItemReadListenerでアイテムが読み込まれた後にすることがあればこのメソッドをオーバーライドしてください。デフォルトは何もしません。
*
* @throws Exception エラー発生時にスローされる例外
*/
@Override
public void afterRead(Object item) throws Exception {
}
/**
* ItemReaderのreadItemメソッドが例外をスローしたときにすることがあればこのメソッドをオーバーライドしてください。
* デフォルトは何もしません。
*
* @throws Exception エラー発生時にスローされる例外
*/
@Override
public void onReadError(Exception ex) throws Exception {
}
}
item processorリスナーはアイテムがitem processorによって処理される前後や、processorが例外をスローする場合に、制御が渡されます。ItemProcessListenerインタフェースはItemProcessListenerバッチアーティファクトを実装するために使用されます。
package javax.batch.api.chunk.listener;
/**
* ItemProcessListenerはアイテム処理をインターセプトします。
*/
public interface ItemProcessListener {
/**
* beforeProcessメソッドは次のアイテムを処理するためにitem processorが呼び出される前に制御が渡されます。
* このメソッドは入力値として処理されるアイテムが渡されます。
*
* @param item 処理対象のアイテム
* @throws Exception エラー発生時にスローされる例外
*/
public void beforeProcess(Object item) throws Exception;
/**
* afterProcessメソッドはitem processorがアイテムを処理したあとに制御が渡されます。
* このメソッドは入力値として処理されるアイテムと結果のアイテムを受け取ります。
*
* @param item 処理対象のアイテム
* @param result item writerへ渡されるアイテム
* @throws Exception エラー発生時にスローされる例外
*/
public void afterProcess(Object item, Object result) throws Exception;
/**
* (*1)
* @param item 処理対象のアイテム
* @param ex item processorがスローした例外
* @throws Exception エラー発生時にスローされる例外
*/
public void onProcessError(Object item, Exception ex) throws Exception;
}
package javax.batch.api.chunk.listener;
/**
* AbstractItemProcessListenerはメソッドをただ実装しただけのデフォルト実装を提供します。
*/
public class AbstractItemProcessListener implements ItemProcessListener {
/**
* アイテム処理前にItemProcessListenerですることがあればこのメソッドをオーバーライドします。
* デフォルトは何もしません。
*
* @param item 処理対象のアイテム
* @throw Exception エラー発生時にスローされる例外
*/
@Override
public void beforeProcess(Object item) throws Exception {
}
/**
* アイテム処理後にItemProcessListenerですることがあればこのメソッドをオーバーライドします。
* デフォルトは何もしません。
*
* @param item 処理対象のアイテム
* @param result item writerに渡されるアイテム
* @throw Exception エラー発生時にスローされる例外
*/
@Override
public void afterProcess(Object item, Object result) throws Exception {
}
/**
* ItemProcessorのprocessItemメソッドが例外をスローしたときItemProcessListenerですることがあればこのメソッドをオーバーライドします。
* デフォルトは何もしません。
*
* @param item 処理対象のアイテム
* @param ex item processorによってローされた例外
* @throw Exception エラー発生時にスローされる例外
*/
@Override
public void onProcessError(Object item, Exception ex) throws Exception {
}
}
*1 onProcessErrorにafterProcessの内容がコピペされてるっぽいので訳無し
item writeリスナーはアイテムがitem writerによって書き込まれる前後や、writerが例外をスローする場合に、制御が渡されます。ItemWriteListenerインタフェースはItemWriteListenerバッチアーティファクトを実装するために使用されます。
package javax.batch.api.chunk.listener;
import java.util.List;
/**
* ItemWriteListenerはアイテム書き込み処理をインターセプトします。
*/
public interface ItemWriteListener {
/**
* beforeWriteメソッドは処理済アイテムを書き込むためにitem writerが呼び出される前に制御が渡されます。
* このメソッドは入力値としてitem readerに渡されたアイテムのリストが渡されます。
* (*1)
*
* @param items 書き込み対象のアイテムのリスト
* @throws Exception エラー発生時にスローされる例外
*/
public void beforeWrite(List<Object> items) throws Exception;
/**
* afterWriteメソッドはitem writerがアイテムを書き込んだ後に制御が渡されます。
* このメソッドは入力値としてitem readerに渡されたアイテムのリストが渡されます。
*
* @param items 書き込み対象のアイテムのリスト
* @throws Exception エラー発生時にスローされる例外
*/
public void afterWrite(List<Object> items) throws Exception;
/**
* onWriteErrorメソッドはitem writerのwriteItemメソッドが例外をスローした後に制御が渡されます。
* このメソッドは入力値としてitem readerに渡されたアイテムのリストが渡されます。
*
* @param items 書き込み対象のアイテムのリスト
* @param ex item writerによってスローされた例外
* @throws Exception エラー発生時にスローされる例外
*/
public void onWriteError(List<Object> items, Exception ex) throws Exception;
}
*1 processor通過後のアイテムなのか、readerに渡されたアイテムなのか、どっちなんだろうか……
skip listenerはスキップ可能例外がitem reader, processor, writerからスローされたときに制御を渡されます。三つのインターフェースがそれぞれについてのリスナーを実装するために用意されています。
package javax.batch.api.chunk.listener;
/**
* SkipReadListenerはスキップ可能なitemReaderの例外ハンドリングをインターセプトします。
*/
public interface SkipReadListener {
/**
* onSkipReadItemメソッドはItemReaderのreadItemメソッドからスキップ可能例外がスローされたときに制御が渡されます。
* このメソッドは入力値として例外を受け取ります。
*
* @param ex ItemReaderによってスローされた例外
* @throws Exception エラー発生時にスローされる例外
*/
public void onSkipReadItem(Exception ex) throws Exception;
}
package javax.batch.api.chunk.listener;
/**
* SkipProcessListenerはスキップ可能なitemProcessの例外ハンドリングをインターセプトします。
*/
public interface SkipProcessListener {
/**
* onSkipProcessItemメソッドはItemProcessのprocessItemメソッドからスキップ可能例外がスローされたときに制御が渡されます。
* このメソッドは入力値として例外と処理対象のアイテムを受け取ります。
*
* @param item ItemProcessorに渡されたアイテム
* @param ex ItemProcessorでスローされた例外
* @throws Exception エラー発生時にスローされる例外
*/
public void onSkipProcessItem(Object item, Exception ex) throws Exception;
}
package javax.batch.api.chunk.listener;
import java.util.List;
/**
* SkipWriteListenerはスキップ可能なitemWriterの例外ハンドリングをインターセプトします。
*/
public interface SkipWriteListener {
/**
* onSkipWriteItemsメソッドはItemWriterのwriteItemsメソッドからスキップ可能例外がスローされたときに制御が渡されます。
* このメソッドは入力値として例外とスキップされたアイテムを受け取ります。
* @param item item writerに渡されたアイテムのリスト
* @param ex ItemWriterでスローされた例外
* @throws Exception エラー発生時にスローされる例外
*/
public void onSkipWriteItem(List<Object> items, Exception ex) throws Exception;
}
retry listenerはリトライ可能例外がitem reader, processor, writerからスローされたときに制御を渡されます。三つのインターフェースがそれぞれについてのリスナーを実装するために用意されています。
package javax.batch.api.chunk.listener;
/**
* RetryReadListenerはItemReaderのリトライ処理をインターセプトします。
*/
public interface RetryReadListener {
/**
* onRetryReadExceptionメソッドは、リトライ可能例外がItemReaderのreadItemメソッドからスローされたときに制御を渡されます。
* このメソッドは入力値とし例外を受け取り、ItemReaderと同様のチェックポイントスコープとなります。
* もしこのメソッドが例外をスローする場合、ジョブはFAILEDステータスで終了します。
*
* @param ex item readerでスローされた例外
* @throws エラー発生時にスローされる例外
*/
public void onRetryReadException(Exception ex) throws Exception;
}
package javax.batch.api.chunk.listener;
/**
* RetryProcessListenerはItemProcessorのリトライ処理をインターセプトします。
*/
public interface RetryProcessListener {
/**
* onRetryProcessExceptionメソッドは、リトライ可能例外がItemProcessorのprocessItemメソッドからスローされたときに制御を渡されます。
* このメソッドは入力値として例外とそのとき処理中のアイテムを受け取ります。
* また、このメソッドはItemProcessorと同様のチェックポイントスコープとなります。
* もしこのメソッドが例外をスローする場合、ジョブはFAILEDステータスで終了します。
*
* @param item ItemProcessorに渡されたアイテム
* @param ex ItemProcessorでスローされた例外
* @throws エラー発生時にスローされる例外
*/
public void onRetryProcessException(Object item, Exception ex) throws Exception;
}
package javax.batch.api.chunk.listener;
import java.util.List;
/**
* RetryWriteListenerはItemWriterのリトライ処理をインターセプトします。
*/
public interface RetryWriteListener {
/**
* onRetryWriteExceptionメソッドは、リトライ可能例外がItemWriterのwriteItemsメソッドからスローされたときに制御を渡されます。
* このメソッドは入力値として例外と書き込み中のアイテムのリストを受け取ります。
* また、このメソッドはItemWriterと同様のチェックポイントスコープとなります。
* もしこのメソッドが例外をスローする場合、ジョブはFAILEDステータスで終了します。
*
* @param items item writerに渡されたアイテムのリスト
* @param ex item writerでスローされた例外
* @throws エラー発生時にスローされる例外
*/
public void onRetryWriteException(List<Object> items, Exception ex) throws Exception;
}
バッチアプリケーションはジョブが実行のために初期化されるときにパラメータを受け取る方法が必要です。プロパティをバッチプログラミングモデルのアーティファクトで定義することができ、ジョブが初期化されるときに渡された値を持ちます。バッチのプロパティは文字列です。
バッチのプロパティはそれが定義されたスコープのみが可視範囲です。しかし、バッチプロパティの値はXML置換表現ルールに沿って他のプロパティを使用できます。置換表現の詳細については8.8節を参照してください。
@BatchPropertyアノテーションはバッチプロパティとしてフィールドインジェクションを指定します。バッチプロパティは名前とデフォルト値を持ちます。@BatchPropertyはバッチプログラミングモデルのアーティファクト(たとえばItemReader, ItemProcessor, JobListenerなど)として作成された任意のクラスのフィールドで使用できます。
@BatchPropertyは標準の@Injectアノテーション(javax.inject.Inject)と共に使用する必要があります。@BatchPropertyはJob XMLのバッチアーティファクトのプロパティにバッチアーティファクトを割り当てるために使用します。
@BatchPropertyアノテーションの付与されたフィールドは非staticかつ非finalでなければなりません。
バッチランタイムは、実行環境がJSR229もしくはJSR330の実装を含んでいてもそうでなくても、@Injectが@BatchPropertyと共に動作することを保証します。バッチプロパティは常にインジェクトされることを意味します。他のインジェクション機構がサポートされているかどうかバッチランタイム実装に依存します。
文法:
package: javax.batch.api
@Inject @BatchProperty(name="<property-name>") String <field-name>;
入力:
オプション項目のバッチプロパティ名。デフォルト値はJavaのフィールド名 | |
バッチプロパティのフィールド名 |
package javax.batch.api;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import javax.enterprise.util.Nonbinding;
import javax.inject.Qualifier;
@Qualifier
@Target({ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER })
@Retention(RetentionPolicy.RUNTIME)
public @interface BatchProperty {
@Nonbinding
public String name() default "";
}
当該バッチアーティファクトのスコープのJSLで一致する名前のプロパティ要素が定義されている場合、バッチランタイムによってアノテーション付与されたフィールドの値が割り当てられます。もしJSLのプロパティ値が空文字に解決される場合、割り当ては発生せずにJavaのデフォルト値が適用されます。注意点として、この仕様の範囲外のDIの使用は異なる結果を生む可能性があります。その結果生じる振る舞いは開発者が使用したDIに依存し、それはこの仕様の範囲外です。
例:
import javax.inject.Inject;
import javax.batch.api.BatchProperty;
public class MyItemReaderImpl {
@Inject @BatchProperty String fname;
}
振る舞い:
バッチランタイムがバッチアーティファクト(上の例ではitem reader)を初期化するとき、Job XMLでのnamedプロパティの値を対応する@BatchPropertyのフィールドに割り当てます。もし値が無ければJavaのデフォルト値となります。
バッチランタイムから提供されるコンテキストオブジェクトはバッチアプリケーションに重要な機能を提供します。コンテキストは、実行中のバッチジョブに関する情報、中間結果を格納するバッチジョブのための領域、バッチアプリケーションがバッチランタイムへ重要な情報を書き戻す方法、を提供します。コンテキストはメンバ変数としてアプリケーションにインジェクトできます。jobとstep両方についてのコンテキストが存在します。ジョブコンテキストはjob全体を表現しています。ステップコンテキストはjob内で現在実行中のstepを表現しています。
バッチアーティファクトは標準@Injectアノテーション(javax.inject.Inject)を使用したインジェクションによってバッチコンテキストにアクセスします。バッチコンテキストがインジェクトされるフィールドは非staticかつ非finalでなければなりません。
例:
@Inject JobContext _jctxt;
@Inject StepContext _sctxt;
バッチランタイムは現在実行中のjobかstepに従って正しいコンテキストオブジェクトがインジェクトされることを保証する責務があります。
バッチランタイムは、実行環境がJSR229もしくはJSR330の実装を含んでいてもそうでなくても、@InjectがJobContextとStepContextと共に動作することを保証します。バッチコンテキストは常にインジェクトされることを意味します。他のインジェクション機構がサポートされているかどうかバッチランタイム実装に依存します。
JobContextの定義については10.9.1を、StepContextの定義については10.9.2を参照してください。
バッチコンテキストはスレッドアフィニティを持ち、特定のスレッドで実行中のバッチアーティファクトでのみアクセス可能です。バッチコンテキストがインジェクトされたフィールドはスコープ外ではnullになります。各コンテキストタイプはタイプごとにスコープと下記のようなライフサイクルを持ちます。
- JobContext
job実行ごとに一つのJobContextが存在し、jobの生存期間と同じくする。(パーティーションstepなどの)パラレル実行のサブスレッドごとに別々のJobContextが存在する。 - StepContext
step実行ごとに一つStepContextが存在し、stepの生存期間と同じくする。パーティーションstepでは、親step/スレッドに一つのStepContextが存在する。サブスレッドごとに別々のStepContextが存在する。
バッチジョブは複数のstepをパラレル実行に設定可能です。二つのパラレル実行モデルがサポートされています。
1.Partitioned:
partitionedモデルでは、stepは複数スレッドにまたがる複数インスタンスとして動作するように設定されます。各スレッドは同一のstepかflowを実行します。このモデルは同一stepを複数インスタンスで実行するのと論理的に同等です。各パーティーションが入力アイテムの異なる範囲を処理することを意味します。
partitionedモデルはパラレル処理に対する細かな制御を可能にするバッチアーティファクトのオプションが含まれています。
- PartitionMapperはパーティーションごとのプロパティとパーティーション数をプログラミングによって算出する方法を提供します。
- PartitionReducerはパーティーション処理の間で単位処理を行う方法を提供します。
- PartitionCollectorは個々のパーティーションから中間結果をマージする方法を提供します。
- PartitionAnalyzerは中間結果を収集して個々のパーティーションからの最終結果を返す方法を提供します(*1)。
*1 for single point of control processing and decision making.が上手く訳せない…意味的にはmap-recduce的な並行データ処理モデルで分析結果を意思決定に役立てます、ってことなんだろうけども。
2.Concurrent:
concurrentモデルでは、splitによって定義されるflowは1スレッド1flowの複数スレッドでコンカレントに動作するよう設定できます。
partition mapperはパーティーション実行の開始時に制御を渡されます。partition mapperはパーティーションごとにユニークなバッチプロパティを渡す役割を持ちます。PartitionMapperインターフェースはPartitionMapperバッチアーティファクトを実装するために使用されます。
package javax.batch.api.partition;
import javax.batch.api.partition.PartitionPlan;
/**
* PartitionMapperはパーティーション実行の開始時に制御を渡されます。
* PartitionMapperはパーティーションごとにユニークなバッチプロパティを渡す役割を持ちます。
*/
public interface PartitionMapper {
/**
* mapPartitionsメソッドはパーティーション化step処理の開始時に制御を渡されます。
* PartitionPlanを返すメソッドは、各パーティーションごとのバッチプロパティを指定します。
* @return パーティーションstep用のpartition plan
* @throws Exception エラー発生時にスローされる例外
*/
public PartitionPlan mapPartitions( ) throws Exception;
}
注意点として、PartitionMapperはjobのリスタートでは呼び出されません。バッチランタイムは初回のJobExecutionでpartition planを記憶しており、同一のJobInstanceではすべての後続のJobExecutionでそれを使用します。
戻り値となるPartitionPlan型の詳細については10.9.3節を参照してください。
partition reducerはパーティーション処理の間で単位処理を行う方法(unit of work demarcation across partitions)(*1)を提供します。非JTA transactionで、どのリソースもJTA下に入りません。partition reducerが提供するのは、補償ロジックまたは終了処理のマージを調整するトランザクショナル・フロー・セマンティクス(transactional flow semantic)です。PartitionReducerインターフェースはPartitionReducerバッチアーティファクトを実装するために使用されます。
*1 並行処理あんま詳しくないんで、おかしな日本語なんだよなぁ……たぶん、そっちの分野でなら良く出てくる用語な気がするので、分かる人には分かるんだろうと思われ
package javax.batch.api.partition;
/**
* PartitionReducerはpartitionsにまたがる単位処理を行う方法を提供します。
* 非JTA transactionで、どのリソースもJTA下に入りません。
* PartitionReducerが提供するのは補償ロジックまたは終了処理のマージを調整するトランザクショナル・フロー・セマンティクスです。
*/
public interface PartitionReducer {
public enum PartitionStatus {COMMIT, ROLLBACK}
/**
* beginPartitionedStepメソッドはパーティーション処理の開始時に制御が渡されます。
* PartitionMapperが呼び出される前と任意のパーティーションが開始する前に制御が渡されます。
* @throws Exception エラー発生時にスローされる例外
*/
public void beginPartitionedStep() throws Exception;
/**
* beforePartitionedStepCompletionメソッドはパーティション化step処理の終了時に制御が渡されます。
* すべてのパーティーションが完了した後に制御が渡されます。
* もしPartitionReducerがロールバックする場合には制御は渡されません。
* @throws Exception エラー発生時にスローされる例外
*/
public void beforePartitionedStepCompletion() throws Exception;
/**
* rollbackPartitionedStepメソッドは、もしランタイムがパーティーション化stepをロールバックする場合に、制御が渡されます。
* 実行中の任意のパーティーションスレッドはこのメソッドが呼び出される前に完了可能です。
* このメソッドは以下の条件のいずれかがtrueの場合に制御が渡されます。
* <p>
* <ol>
* <li>1つまたは複数のパーティーションのバッチステータスがSTOPPEDかFAILEDで終了。</li>
* <li>以下のパーティーション化stepのコールバックのいずれかかが例外をスロー。</li>
* <ol>
* <li>PartitionMapper</li>
* <li>PartitionReducer</li>
* <li>PartitionCollector</li>
* <li>PartitionAnalyzer</li>
* </ol>
* <li>パーティーション化stepのjobがリスタート。</li>
* </ol>
* @throws Exception エラー発生時にスローされる例外
*/
public void rollbackPartitionedStep() throws Exception;
/**
* afterPartitionedStepCompletionメソッドはパーティーション処理の終了時に制御が渡されます。
* パーティーション処理の結果を意味するステータス値が渡されます。
* ステータスの文字列は"COMMIT"か"ROLLBACK"のどちらかです。
* @param status パーティーション化stepの結果。"COMMIT"か"ROLLBACK"のどちらか。
* @throws Exception エラー発生時にスローされる例外
*/
public void afterPartitionedStepCompletion(PartitionStatus status) throws Exception;
}
package javax.batch.api.partition;
/**
* AbstractBatchletはメソッドをただ実装しただけのデフォルト実装を提供します。
*/
public abstract class AbstractPartitionReducer implements PartitionReducer {
/**
* パーティーションstep処理が開始する前に実行する処理のためにこのメソッドをオーバーライドします。
*
* @throws Exception エラー発生時にスローされる例外
*/
@Override
public void beginPartitionedStep() throws Exception {}
/**
* 通常のパーティーションstep処理の終了前に実行する処理のためにこのメソッドをオーバーライドします。
*
* @throws Exception エラー発生時にスローされる例外
*/
@Override
public void beforePartitionedStepCompletion() throws Exception {}
/**
* パーティションstepがロールバックされるときに実行する処理のためにこのメソッドをオーバーライドします。
*
* @throws Exception エラー発生時にスローされる例外
*/
@Override
public void rollbackPartitionedStep() throws Exception {}
/**
* パーティションstep処理の終了後に実行する処理のためにこのメソッドをオーバーライドします。
*
* @param status パーティーションstepの結果。"COMMIT"か"ROLLBACK"のどちらか。
* @throws Exception エラー発生時にスローされる例外
*/
@Override
public void afterPartitionedStepCompletion(PartitionStatus status)
throws Exception {}
}
partition collectorは個々のパーティーションから親スレッドで動作する単一制御点(a single point of control)へデータを送る方法を提供します。PartitionAnalyzerはデータの受け取りと処理に使用されます。PartitionAnalyzerの詳細については9.5.4節を参照してください。PartitionCollectorインタフェースはPartitionCollectorバッチアーティファクトを実装するためにしようされます。
package javax.batch.api.partition;
import java.io.Serializable;
/**
* PartitionCollectorは個々のパーティーションからstepの親スレッドで動作する単一制御点へデータを渡す方法を提供します。
* PartitionAnalyzerはデータを受け取って処理するために使用されます。
*/
public interface PartitionCollector {
/**
* collectPartitionDataメソッドはパーティーション処理中に定期的に制御が渡されます。
* このメソッドは以下のような各スレッドがパーティーションを処理する時点で制御が渡されます。
* <p>
* <ol>
* <li>chunkタイプのstepでは、各chunkのチェックポイント後とパーティーション終了時の最終チェックポイント後に制御が渡されます。</li>
* <li>batchletタイプのstepでは、batchletの終了時に制御が渡されます。</li>
* </ol>
* <p>
* 注意点として、collectorは、もしパーティーションがunhandledな例外起因で終了する場合、呼び出されません。
* <p>
* @return PartitionAnalyzerに渡すためのSerializableなオブジェクト
* @throws Exception エラー発生時にスローされる例外
*/
public Serializable collectPartitionData() throws Exception;
}
partition analyzerはパーティーションの最終結果とデータを処理する目的のために制御が渡されます。もしpartition collectorがstepに設定されている場合、partition analyzerはpartition collectorから結果とデータを処理するために制御が渡されます。別々のpartition collectorのインスタンスがパーティーションを処理する各スレッドで呼び出される限り、partition analyzerはシングルスレッドで動作し、一貫性のあるスレッド(consistent thread)で毎回呼び出されます。PartitionAnalyzerインタフェースはPartitionAnalyzerバッチアーティファクトを実装するために使用されます。
package javax.batch.api.partition;
import java.io.Serializable;
import javax.batch.runtime.BatchStatus;
/**
* PartitionAnalyzerは各パーティーションからの最終結果とデータを処理するために制御が渡されます。
* PartitionCollectorがstepに設定されている場合、PartitionAnalyzerはpartition collectorから
* 結果とデータ処理のために制御が渡されます。
* 別々のPartitionCollectorインスタンスがstepパーティーションを処理する個々のスレッドで
* 呼び出される限り、単一のPartitionAnalyzerインスタンスが単一のスレッドで動作し、一貫性のある
* スレッドで毎回呼び出されます。
*
*/
public interface PartitionAnalyzer {
/**
* analyzeCollectorDataメソッドはPartition collectorが処理結果(*1)を送り出す度に制御が渡されます。
* 入力値としてSerializableなオブジェクトがcollectorから渡されます。
*
* @param data PartitionCollectorから送られる処理結果
* @throws Exception エラー発生時にスローされる例外
*/
public void analyzeCollectorData(Serializable data) throws Exception;
/**
* analyzeStatusメソッドはパーティーションが終了する度に制御が渡されます。
* 入力値としてパーティーションのバッチステータスと完了ステータスが渡されます
*
* @param batchStatus パーティーションのバッチステータス
* @param exitStatus パーティーションの完了ステータス
* @throws Exception エラー発生時にスローされる例外
*/
public void analyzeStatus(BatchStatus batchStatus, String exitStatus) throws Exception;
}
*1 原文ではpayload. Partition collectorの戻り値で「データ本体」みたいなニュアンスなんだろうし、まぁ「処理結果」という安直な訳でいいか、と妥協した。
package javax.batch.api.partition;
import java.io.Serializable;
import javax.batch.runtime.BatchStatus;
/**
* AbstractPartitionAnalyzerはメソッドをただ実装しただけのデフォルト実装を提供します。
*/
public abstract class AbstractPartitionAnalyzer implements PartitionAnalyzer {
/**
* PartitionCollectorの処理結果を分析するにはこのメソッドをオーバーライドします。
*
* @param data PartitionCollectorから送られる処理結果
* @throws Exception エラー発生時にスローされる例外
*/
@Override
public void analyzeCollectorData(Serializable data) throws Exception {}
/**
* パーティーションの終了ステータスを分析するにはこのメソッドをオーバーライドします。
*
* @param batchStatus パーティーションのバッチステータス
* @param exitStatus パーティーションの完了ステータス
* @throws Exception エラー発生時にスローされる例外
*/
@Override
public void analyzeStatus(BatchStatus batchStatus, String exitStatus)
throws Exception {}
}
deciderはJob XMLのstep, split, flow間のバッチ完了ステータスと遷移を決定するために使用されます。deciderは次の遷移先選択を決定するための完了ステータス値となる文字列を返します。DeciderインタフェースはDeciderバッチアーティファクトを実装するために使用されます。
package javax.batch.api;
import javax.batch.runtime.StepExecution;
/**
* Deciderはjobにおけるdecision要素の一部として制御を渡されます。
* job処理の間でflowを管理するために使用されます。
* 現在のjob実行完了ステータスをアップデートする完了ステータスを返します。
* この完了ステータス値はまた、deciderと同様のdecision要素で構成されるnext, end, stop, fail子要素に基づいて、実行の遷移を管理します。
*
*/
public interface Decider {
/**
* decideメソッドはjobに新しい完了ステータスを設定します。
* 入力値としてStepExecutionオブジェクトの配列を受け取ります。
* このStepExecutionオブジェクトは以下のようなdeciderへ遷移する実行要素を表現しています。
* <p>
* <ul>
* <li>Step</li>
* <p>
* 遷移がstepからのとき、decideメソッドは入力値としてそのstepに対応するStepExecutionを受け取ります。
* <li>Split</li>
* <p>
* 遷移がsplitからのとき、decideメソッドは入力値としてsplitに定義された各flowからのStepExecutionを受け取ります。
* <li>Flow</li>
* <p>
* 遷移がflowからのとき、decideメソッドはflowで完了した最後の実行要素に対応するStepExecutionを受け取ります。
* もし最後の要素がstepだった場合は一つのStepExecutionとなり、
* もし最後の要素がsplitだった場合は複数のStepExecutionとなります。
* </ul>
* @param executions この要素の前に実行された要素のStepExecution(s)
* @return 更新されるjob完了ステータス
* @throws Exception エラー発生時にスローされる例外
*/
public String decide(StepExecution[] executions) throws Exception;
}
chunkタイプのチェックポイントはトランザクショナルです。バッチランタイムはJava EEプラットフォームではグローバル・トランザクション・モード(global transaction mode)を使用し、Java SEプラットフォームではローカル・トランザクション・モード(local transaction mode)を使用します。グローバル・トランザクション・タイムアウトはstepレベルのプロパティで設定可能です。
- javax.transaction.global.timeout={seconds} - デフォルトは180秒。
例:
<step id="MyGlobalStep">
<properties>
<property name="javax.transaction.global.timeout" value="600"/>
</properties>
</step>