-
Notifications
You must be signed in to change notification settings - Fork 0
11 Job Runtime Lifecycle
以降の節ではアーティファクトメソッドの呼び出し順序について説明します。以下のような単純なシンボルを使用して呼び出し順序について示します。
シンボル | 意味 |
---|---|
<action> |
バッチランタイムによって実行されるアクション |
<->method |
バッチランタイムによるバッチアーティファクトメソッドの呼び出し |
[method] |
オプションのメソッド |
// comment |
振る舞いに対するコメント |
LABEL: |
flow制御コメントに使用されるラベル |
すべてのバッチアーティファクトは、Job XMLで定義されるスコープで使用される前に、インスタンス化されます。そして、アーティファクトが含まれているスコープ内が生存期間となります。アーティファクトのライフサイクルに関するスコープは三つあり、job, step, step-partitionとなります。
Job XMLの参照ごとに一つのアーテイファクトがインスタンス化されます。パーティーションstepの場合、パーティーションごとのJob XML参照ごとに一つのアーティファクト(*1)がインスタンス化されます。この意味するところは、ジョブレベルアーティファクトはjobが生存期間、ということです。ステップレベルアーティファクトはstepが生存期間になります。パーティーションのステップレベルアーティファクトはパーティーションが生存期間になります。
アーティファクトインスタンスはコンカレントなスコープを横断して共有されることはありません。同一インスタンスは特定のJob XML参照が示す適切なスコープで使用されなければなりません。
*1 one artifact per Job XML reference per partition is instantiated.
すべてのジョブリポジトリアーティファクトはjob処理中にバッチランタイムによって生成され、実装が提供する手段によって削除されるまでは存在します。
※訳注:こっから先しばらくは擬似コードによる各アーティファクトの説明なので、基本的には仕様から該当部分をそのまんまコピペしています。
1. <Create JobContext>
2. <Store job level properties in JobContext>
3. <->[JobListener.beforeJob...] // thread A
4. <processs execution elements>
5. <->[JobListener.afterJob...] // thread A
6. <Destroy JobContext>
1. <Create StepContext>
2. <Store step level properties in StepContext>
3. <->[StepListener.beforeStep...] // thread A
4. <->Batchlet.process // thread A
5. // if stop issued:
6. <->[Batchlet.stop] // thread B, StepContext is available
7. <->[StepListener.afterStep...] // thread A
8. <Store StepContext persistent area>
9. <Destroy StepContext>
1. <Create StepContext>
2. <Store step level properties in StepContext>
3. <->[StepListener.beforeStep...] // thread A
4. <->[PartitionReducer.beginPartitionedStep] // thread A
5. <->[PartitionMapper.mapPartitions] // thread A
6. // per partition:
a. <->Batchlet.process // thread Px
b. // if stop issued:
c. <->[Batchlet.stop] // thread Py, StepContext is available
d. <->[PartitionCollector.collectPartitionData] // thread Px
7. // when collector payload arrives:
8. <->[PartitionAnalyzer.analyzeCollectorData] // thread A
9. // when partition ends:
10. <->[PartitionAnalyzer.analyzeStatus] // thread A
11. // if rollback condition occurs:
12. <->[PartitionReducer.rollbackPartitionedStep] // thread A
13. <->[PartitionReducer.beforePartitionedStepCompletion] // thread A
14. <->[PartitionReducer.afterPartitionedStepCompletion] // thread A
15. <->[StepListener.afterStep...] // thread A
16. <Store StepContext persistent area>
17. <Destroy StepContext>
1. <Create StepContext>
2. <Store step level properties in StepContext>
3. <->[StepListener.beforeStep...] // thread A
4. [<begin transaction> ]
5. <->ItemReader.open // thread A
6. <->ItemWriter.open // thread A
7. [<commit transaction> ]
8. // chunk processing:
9. <repeat until no more items> {
a. <begin checkpoint [<begin transaction> ]>
b. <repeat until commit criteria reached> {
i. <->ItemReader.readItem // thread A
ii. <->ItemProcessor.processItem // thread A
iii. <add item to buffer>
c. }
d. <->ItemWriter.writeItems // thread A
e. <->[ItemReader.checkpointInfo] // thread A
f. <->[ItemWriter.checkpointInfo] // thread A
g. <Store StepContext persistent area>
h.
i. <commit checkpoint (commit transaction)>
10. }
11. [<begin transaction> ]
12. <->ItemWriter.close // thread A
13. <->ItemReader.close // thread A
14. [<commit transaction> ]
15. <->[StepListener.afterStep...] // thread A
16. <Store StepContext persistent area>
17. <Destroy StepContext>
##11.7 Partitioned Chunk Processing
1. <Create StepContext>
2. <Store step level properties in StepContext>
3. <->[StepListener.beforeStep...] // thread A
4. <->[PartitionReducer.beginPartitionedStep] // thread A
5. <->[PartitionMapper.mapPartitions] // thread A // per partition - on thread Px:
a. [<begin transaction> ]
b. <->ItemReader.open // thread Px
c. <->ItemWriter.open // thread Px
d. [<commit transaction> ]
a. <repeat until no more items> {
i. <begin checkpoint [<begin transaction> ]>
ii. <repeat until commit criteria reached> {
1. <->ItemReader.readItem // thread Px
2. <->ItemProcessor.processItem // thread Px
3. <add item to buffer>
iii. }
iv. <->ItemWriter.writeItems // thread Px
v. <->[ItemReader.checkpointInfo] // thread Px
vi. <->[ItemWriter.checkpointInfo] // thread Px
vii. <Store (partition-local) StepContext persistent area>
viii. <commit checkpoint (commit transaction)>
ix. <->[PartitionCollector.collectPartitionData] // thread Px
e. }
f. [<begin transaction> ]
g. <->ItemWriter.close // thread Px
h. <->ItemReader.close // thread Px
i. [<commit transaction> ]
j. <->[PartitionCollector.collectPartitionData] // thread Px
6. [<begin transaction> ] // thread A
7. // Actions 9-12 run continuously until all partitions end.
8. // when collector payload arrives:
9. <->[PartitionAnalyzer.analyzeCollectorData] // thread A
10. // when partition ends:
11. <->[PartitionAnalyzer.analyzeStatus] // thread A
12. // Remaining actions run after all partitions end:
13. // if rollback condition occurs:
14. <->[PartitionReducer.rollbackPartitionedStep] // thread A
15. [<rollback transaction >]
16. // else not rollback
17. <->[PartitionReducer.beforePartitionedStepCompletion] // thread A
18. [<commit transaction> ] // thread A
19. <->[PartitionReducer.afterPartitionedStepCompletion] // thread A
20. <->[StepListener.afterStep...] // thread A
21. <Store StepContext persistent area>
22. <Destroy StepContext>
1. <Create StepContext>
2. <Store step level properties in StepContext>
3. <->[StepListener.beforeStep...] // thread A
4. [<begin transaction> ]
5. <->ItemReader.open // thread A
6. <->ItemWriter.open // thread A
7. [<commit transaction> ]
8. // chunk processing:
9. <repeat until no more items> {
a. <begin checkpoint [<begin transaction> ]>
b. <->[ChunkListener.beforeChunk] // thread A
c. <repeat until commit criteria reached> {
i. <->[ItemReadListener.beforeRead] // thread A
ii. <->ItemReader.readItem // thread A
i. <->[ItemReadListener.afterRead] // thread A
ii. // or:
iii. {
iv. <->[ItemReadListener.onReadError] // thread A
v. <->[SkipListener.onSkipReadItem] // thread A
vi. }
vii. <->[ItemProcessListener.beforeProcess] // thread A
viii. <->ItemProcessor.processItem // thread A
ix. <->[ItemProcessListener.afterProcess] // thread A
x. // or:
xi. {
xii. <->[ItemProcessListener.onProcessError] // thread A
xiii. <->[SkipListener.onSkipProcessItem] // thread A
xiv. }
xv. <add item to buffer>
d. }
e. <->[ItemWriteListener.beforeWrite] // thread A
f. <->ItemWriter.writeItems // thread A
g. <->[ItemWriteListener.afterWrite] // thread A
h. // or:
i. {
j. <->[ItemWriteListener.onWriteError] // thread A
k. <->[SkipListener.onSkipWriteItems] // thread A
l. }
m. <->[ChunkListener.afterChunk] // thread A
n. <->[ItemReader.checkpointInfo] // thread A
o. <->[ItemWriter.checkpointInfo] // thread A
p. <Store StepContext persistent area>
q. <commit checkpoint (commit transaction)>
10. }
11. [<begin transaction> ]
12. <->ItemWriter.close // thread A
13. <->ItemReader.close // thread A
14. [<commit transaction> ]
15. <->[StepListener.afterStep...] // thread A
16. <Store StepContext persistent area>
17. <Destroy StepContext>
注意:ロールバック処理の詳細についてもこの節で記述されます。
1. <Create StepContext>
2. <Store step level properties in StepContext>
3. <->[StepListener.beforeStep...] // thread A
4. [<begin transaction> ]
5. <->ItemReader.open // thread A
6. <->ItemWriter.open // thread A
7. [<commit transaction> ]
8. // chunk processing:
9. <repeat until no more items> {
a. S1:
b. <begin checkpoint [<begin transaction> ]>
c. <repeat until commit-interval reached> {
i. S2:
ii. <->ItemReader.readItem // thread A
iii. // if exception
iv. <->[ItemReadListener.onReadErrror] // thread A
v. <->[RetryReadListener.onRetryReadException] // thread A
vi. // if retryable exception
vii. // if no-rollback exception
viii. resume S2:
ix. // else
x. <end repeat>
xi. // else
xii. <end repeat>
xiii. S3:
xiv. <->ItemProcessor.processItem // thread A
xv. // if exception
xvi. <->[ItemProcessListener.onProcessErrror] // thread A
xvii. <->[RetryProcessListener.onRetryProcessException] // thread A
xviii. // if retryable exception
xix. // if no-rollback exception
xx. resume S3:
xxi. // else
xxii. <end repeat>
xxiii. // else
xxiv. <end repeat>
xxv. <add item to buffer>
d. }
e. // if rollback exception, execute rollback procedure (below) and resume at S1 with item-count=1
f. S4:
g. <->ItemWriter.writeItems (buffer) // thread A
h. // if exception
i. <->[ItemWriteListener.onWriteErrror] // thread A
j. <->[RetryWriteListener.onRetryWriteException] // thread A
k. // if retryable exception
l. // if no-rollback exception
m. resume S4:
n. // else
o. execute rollback procedure (below) and resume S1:
p. // else execute rollback procedure (below) and resume S1:
q. <->[ItemReader.checkpointInfo] // thread A
r. <->[ItemWriter.checkpointInfo] // thread A
s. <Store StepContext persistent area> // thread A
t. S5:
u. <commit checkpoint (commit transaction)> // thread A
v. // if exception
w. // if retryable exception
x. // if no-rollback exception:
y. resume S5:
z. // else
aa. execute rollback procedure (below) and resume S1:
bb. // else execute rollback procedure (below) and resume S1:
10. }
11. [<begin transaction> ]
12. <->ItemWriter.close // thread A
13. <->ItemReader.close // thread A
14. [<commit transaction> ]
15. <->[StepListener.afterStep...] // thread A
16. <Store StepContext persistent area>
17. <Destroy StepContext>
Rollback Procedure
1. <->ItemWriter.close // thread A
2. <->ItemReader.close // thread A
3. [ChunkListener.onError] // thread A
4. [rollback transaction]
5. [<begin transaction> ]
6. <->ItemWriter.open // thread A, pass last committed checkpoint info
7. <->ItemReader.open // thread A, pass last committed checkpoint info
8. [<commit transaction> ]
1. <Create StepContext>
2. <Store step level properties in StepContext>
3. <->[StepListener.beforeStep...] // thread A
4. [<begin transaction> ]
5. <->ItemReader.open // thread A
6. <->ItemWriter.open // thread A
7. [<commit transaction> ]
8. // chunk processing:
9. <repeat until no more items> {
a. [
b. <->[CheckpointAlgorithm.checkpointTimeout]
c. <begin checkpoint [<begin transaction> ]>
b. ]
c. <repeat until commit criteria reached> {
i. <->ItemReader.readItem // thread A
ii. <->ItemProcessor.processItem // thread A
iii. <add item to buffer>
iv. <->CheckpointAlgorithm.isReadyToCheckpoint // thread A
d. }
e. <->ItemWriter.writeItems // thread A
f. <->[ItemReader.checkpointInfo] // thread A
g. <->[ItemWriter.checkpointInfo] // thread A
h. <Store StepContext persistent area>
d. <->[CheckpointAlgorithm.beginCheckpoint] // thread A
e. <commit checkpoint (commit transaction)>
i. <->[CheckpointAlgorithm.endCheckpoint] // thread A
10. }
11. [<begin transaction> ]
12. <->ItemWriter.close // thread A
13. <->ItemReader.close // thread A
14. [<commit transaction> ]
15. <->[StepListener.afterStep...] // thread A
16. <Store StepContext persistent area>
17. <Destroy StepContext>
1. // For each flow:
2. <run flow> // thread Fx
1. // For each split or step:
2. <run split or step> // thread Xy
JobOperator.stopは実行中のjobを停止します。もしstopが呼び出されたときにstepが実行中の場合、バッチランタイムは以下のアクションを起こします。
Chunk Step
jobとstepのバッチステータスはSTOPPINGになります。注意点として、バッチランタイムはstepが終了することを保証しません。バッチランタイムはread/process/write chunk処理ループをインタラプトしようと試み、stepに現在のアイテム処理を停止することを許可します。この意味するところは、読み込まれてprocessor設定があればそこで処理された現在のアイテム、現在までにバッファーされたアイテム、がもしあれば、現在のアイテムを含めて書き込みが行われます。もしchunkタイプのstepに設定されたバッチアーティファクトがバッチランタイムにreturnする場合は、期待通りに、jobとstepのバッチステータスはSTOPPEDになります。
Batchlet Step
jobとstepのバッチステータスはSTOPPINGになります。バッチランタイムはbatchletのstopメソッドを呼び出します。注意点として、バッチランタイムはbatchletが実際に終了することを保証しません。ただし、行儀のよいbatchletはその限りではありません。もしbatchletがバッチランタイムにreturnする場合、jobとstepのバッチステータスはSTOPPEDになります。
パーティーションbatchlet stepにおいては、batchletのstopメソッドがアクティブでパーティーション処理をしている個々のスレッド上で呼び出されます。