-
Notifications
You must be signed in to change notification settings - Fork 76
/
JdbcRepository.java
1240 lines (1167 loc) · 59.1 KB
/
JdbcRepository.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Copyright (c) 2013-2017 Red Hat, Inc. and/or its affiliates.
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.jberet.repository;
import java.io.IOException;
import java.io.InputStream;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.sql.DataSource;
import jakarta.batch.runtime.BatchStatus;
import jakarta.batch.runtime.JobExecution;
import jakarta.batch.runtime.JobInstance;
import jakarta.batch.runtime.Metric;
import jakarta.batch.runtime.StepExecution;
import org.jberet._private.BatchLogger;
import org.jberet._private.BatchMessages;
import org.jberet.runtime.AbstractStepExecution;
import org.jberet.runtime.JobExecutionImpl;
import org.jberet.runtime.JobInstanceImpl;
import org.jberet.runtime.PartitionExecutionImpl;
import org.jberet.runtime.StepExecutionImpl;
import org.jberet.util.BatchUtil;
import org.wildfly.security.manager.WildFlySecurityManager;
/**
 * A job repository that persists batch artifacts (job instances, job executions,
 * step executions, and partition executions) to a relational database via JDBC.
 * Connectivity comes either from a JNDI-looked-up {@link DataSource} or from a
 * JDBC URL handed to {@link DriverManager}; SQL statements are loaded from a
 * properties file and the schema is created on demand from a DDL file.
 */
public final class JdbcRepository extends AbstractPersistentRepository {
    //keys used in jberet.properties
    public static final String DDL_FILE_NAME_KEY = "ddl-file";
    public static final String SQL_FILE_NAME_KEY = "sql-file";
    public static final String DATASOURCE_JNDI_KEY = "datasource-jndi";
    public static final String DB_URL_KEY = "db-url";
    public static final String DB_USER_KEY = "db-user";
    public static final String DB_PASSWORD_KEY = "db-password";
    public static final String DB_PROPERTIES_KEY = "db-properties";
    // delimiter separating key=value pairs inside the db-properties value
    public static final String DB_PROPERTY_DELIM = ":";
    public static final String DB_TABLE_PREFIX_KEY = "db-table-prefix";
    public static final String DB_TABLE_SUFFIX_KEY = "db-table-suffix";
    //defaults for entries in jberet.properties
    //private static final String DEFAULT_DATASOURCE = "java:jboss/datasources/ExampleDS";
    // private static final String DEFAULT_DB_URL = "jdbc:h2:mem:test;DB_CLOSE_DELAY=-1";
    private static final String DEFAULT_DB_URL = "jdbc:h2:~/jberet-repo";
    private static final String DEFAULT_SQL_FILE = "sql/jberet-sql.properties";
    private static final String DEFAULT_DDL_FILE = "sql/jberet.ddl";
    //keys used in *.sql files (each maps to one SQL statement loaded into `sqls`)
    private static final String SELECT_ALL_JOB_INSTANCES = "select-all-job-instances";
    private static final String COUNT_JOB_INSTANCES_BY_JOB_NAME = "count-job-instances-by-job-name";
    private static final String SELECT_JOB_INSTANCES_BY_JOB_NAME = "select-job-instances-by-job-name";
    private static final String SELECT_JOB_INSTANCE = "select-job-instance";
    private static final String INSERT_JOB_INSTANCE = "insert-job-instance";
    private static final String SELECT_ALL_JOB_EXECUTIONS = "select-all-job-executions";
    private static final String SELECT_JOB_EXECUTIONS_BY_JOB_INSTANCE_ID = "select-job-executions-by-job-instance-id";
    private static final String SELECT_RUNNING_JOB_EXECUTIONS_BY_JOB_NAME = "select-running-job-executions-by-job-name";
    private static final String SELECT_JOB_EXECUTIONS_BY_JOB_NAME = "select-job-executions-by-job-name";
    private static final String SELECT_JOB_EXECUTION = "select-job-execution";
    private static final String INSERT_JOB_EXECUTION = "insert-job-execution";
    private static final String UPDATE_JOB_EXECUTION = "update-job-execution";
    private static final String UPDATE_JOB_EXECUTION_AND_PARAMETERS = "update-job-execution-and-parameters";
    private static final String UPDATE_JOB_EXECUTION_PARTIAL = "update-job-execution-partial";
    private static final String STOP_JOB_EXECUTION = "stop-job-execution";
    private static final String SELECT_ALL_STEP_EXECUTIONS = "select-all-step-executions";
    private static final String SELECT_STEP_EXECUTIONS_BY_JOB_EXECUTION_ID = "select-step-executions-by-job-execution-id";
    private static final String SELECT_STEP_EXECUTION = "select-step-execution";
    private static final String INSERT_STEP_EXECUTION = "insert-step-execution";
    private static final String UPDATE_STEP_EXECUTION = "update-step-execution";
    private static final String UPDATE_STEP_EXECUTION_IF_NOT_STOPPING = "update-step-execution-if-not-stopping";
    private static final String STOP_STEP_EXECUTION = "stop-step-execution";
    private static final String FIND_ORIGINAL_STEP_EXECUTION = "find-original-step-execution";
    private static final String COUNT_STEP_EXECUTIONS_BY_JOB_INSTANCE_ID = "count-step-executions-by-job-instance-id";
    //private static final String SELECT_ALL_PARTITION_EXECUTIONS = "select-all-partition-executions";
    private static final String COUNT_PARTITION_EXECUTIONS = "count-partition-executions";
    private static final String SELECT_PARTITION_EXECUTIONS_BY_STEP_EXECUTION_ID = "select-partition-executions-by-step-execution-id";
    private static final String INSERT_PARTITION_EXECUTION = "insert-partition-execution";
    private static final String UPDATE_PARTITION_EXECUTION = "update-partition-execution";
    private static final String UPDATE_PARTITION_EXECUTION_IF_NOT_STOPPING = "update-partition-execution-if-not-stopping";
    private static final String STOP_PARTITION_EXECUTION = "stop-partition-execution";
    // Exactly one of dataSource / dbUrl is non-null, fixed at construction time.
    private final DataSource dataSource;
    private final String dbUrl;
    // Optional user-supplied DDL file path (ddl-file property); null to use defaults.
    private final String userDefinedDdlFile;
    // Connection properties (user, password, extras) used with DriverManager.
    private final Properties dbProperties;
    // SQL statements keyed by the *_KEY constants above, loaded in createTables().
    private final Properties sqls = new Properties();
    // Oracle needs generated-key retrieval by column index rather than the
    // RETURN_GENERATED_KEYS flag; set in createTables() from database metadata.
    private boolean isOracle;
    private int[] idIndexInOracle;
    /**
     * Static factory equivalent to {@link #JdbcRepository(Properties)}.
     *
     * @param configProperties configuration properties (see the {@code *_KEY} constants)
     * @return a new {@code JdbcRepository}
     */
    public static JdbcRepository create(final Properties configProperties) {
        return new JdbcRepository(configProperties);
    }
    /**
     * Creates a JDBC job repository from configuration properties.
     * Connection source is chosen in this order: the {@code datasource-jndi}
     * property if present, else the {@code db-url} property, else a default
     * embedded H2 URL. Table creation is attempted at the end of construction.
     *
     * @param configProperties configuration properties, typically from jberet.properties
     */
    public JdbcRepository(final Properties configProperties) {
        String dataSourceName = configProperties.getProperty(DATASOURCE_JNDI_KEY);
        dbProperties = new Properties();
        userDefinedDdlFile = configProperties.getProperty(DDL_FILE_NAME_KEY);
        //if dataSourceName is configured, use dataSourceName;
        //else if dbUrl is specified, use dbUrl;
        //if neither is specified, use default dbUrl;
        if (dataSourceName != null) {
            dataSourceName = dataSourceName.trim();
        }
        if (dataSourceName != null && !dataSourceName.isEmpty()) {
            dbUrl = null;
            try {
                dataSource = InitialContext.doLookup(dataSourceName);
            } catch (final NamingException e) {
                throw BatchMessages.MESSAGES.failToLookupDataSource(e, dataSourceName);
            }
        } else {
            // local shadows the field on purpose so the field can stay final
            String dbUrl = configProperties.getProperty(DB_URL_KEY);
            dataSource = null;
            if (dbUrl != null) {
                dbUrl = dbUrl.trim();
            }
            if (dbUrl == null || dbUrl.isEmpty()) {
                dbUrl = DEFAULT_DB_URL;
            }
            this.dbUrl = dbUrl;
            final String dbUser = configProperties.getProperty(DB_USER_KEY);
            if (dbUser != null) {
                dbProperties.setProperty("user", dbUser.trim());
            }
            final String dbPassword = configProperties.getProperty(DB_PASSWORD_KEY);
            if (dbPassword != null) {
                dbProperties.setProperty("password", dbPassword.trim());
            }
            // db-properties holds extra driver properties as key=value pairs
            // separated by ':' (DB_PROPERTY_DELIM); entries without '=' are skipped
            final String s = configProperties.getProperty(DB_PROPERTIES_KEY);
            if (s != null) {
                final String[] ss = s.trim().split(DB_PROPERTY_DELIM);
                for (final String kv : ss) {
                    final int equalSign = kv.indexOf('=');
                    if (equalSign > 0) {
                        dbProperties.setProperty(kv.substring(0, equalSign), kv.substring(equalSign + 1));
                    }
                }
            }
        }
        createTables(configProperties);
    }
    /**
     * Creates a new JDBC job repository backed by the given data source,
     * using empty configuration properties.
     *
     * @param dataSource the data source used to connect to the database; must not be null
     */
    public JdbcRepository(final DataSource dataSource) {
        this(dataSource, new Properties());
    }
/**
* Creates a new JDBC job repository.
*
* @param dataSource the data source used to connect to the database
* @param configProperties the configuration properties to use
*/
public JdbcRepository(final DataSource dataSource, final Properties configProperties) {
if (dataSource == null) {
throw BatchMessages.MESSAGES.nullVar("dataSource");
}
if (configProperties == null) {
throw BatchMessages.MESSAGES.nullVar("configProperties");
}
dbProperties = new Properties();
userDefinedDdlFile = configProperties.getProperty(DDL_FILE_NAME_KEY);
this.dataSource = dataSource;
dbUrl = null;
createTables(configProperties);
}
    /**
     * Loads SQL statements from the configured (or default) sql properties file,
     * applies the optional table-name prefix/suffix to them, and creates the
     * repository tables from the DDL file if a probe query shows they are missing.
     * Tolerates a concurrent client creating the tables first.
     *
     * @param configProperties configuration properties holding sql-file and
     *                         db-table-prefix/db-table-suffix entries
     */
    private void createTables(final Properties configProperties) {
        String sqlFile = configProperties.getProperty(SQL_FILE_NAME_KEY);
        if (sqlFile != null) {
            sqlFile = sqlFile.trim();
        }
        if (sqlFile == null || sqlFile.isEmpty()) {
            sqlFile = DEFAULT_SQL_FILE;
        }
        final String tablePrefix = configProperties.getProperty(DB_TABLE_PREFIX_KEY, "").trim();
        final String tableSuffix = configProperties.getProperty(DB_TABLE_SUFFIX_KEY, "").trim();
        // pattern matching the 4 base table names; non-null only when a prefix or
        // suffix is configured, and used to rewrite both SQL and DDL text
        final Pattern tableNamesPattern = tablePrefix.length() > 0 || tableSuffix.length() > 0 ?
                Pattern.compile("JOB_INSTANCE|JOB_EXECUTION|STEP_EXECUTION|PARTITION_EXECUTION"): null;
        final InputStream sqlResource = getClassLoader(false).getResourceAsStream(sqlFile);
        try {
            if (sqlResource == null) {
                throw BatchMessages.MESSAGES.failToLoadSqlProperties(null, sqlFile);
            }
            sqls.load(sqlResource);
            if (tableNamesPattern != null) {
                BatchLogger.LOGGER.tracef("Applying batch job repository table prefix %s and suffix %s%n",
                        tablePrefix, tableSuffix);
                sqls.replaceAll((k, v) -> addPrefixSuffix((String) v, tablePrefix, tableSuffix, tableNamesPattern));
            }
        } catch (final IOException e) {
            throw BatchMessages.MESSAGES.failToLoadSqlProperties(e, sqlFile);
        } finally {
            if (sqlResource != null) {
                try {
                    sqlResource.close();
                } catch (final IOException e) {
                    BatchLogger.LOGGER.failToClose(e, InputStream.class, sqlResource);
                }
            }
        }
        //first test table existence by running a query against the last table in the ddl entry list
        final String countPartitionExecutions = sqls.getProperty(COUNT_PARTITION_EXECUTIONS);
        Connection connection1 = getConnection();
        ResultSet rs = null;
        PreparedStatement countPartitionExecutionStatement = null;
        PreparedStatement countJobInstancesStatement = null;
        InputStream ddlResource = null;
        String databaseProductName = "";
        try {
            databaseProductName = connection1.getMetaData().getDatabaseProductName().trim();
        } catch (final SQLException e) {
            // metadata lookup failed at the SQL level: discard this connection
            // and get a fresh one before probing tables below
            BatchLogger.LOGGER.failToGetDatabaseProductName(e, connection1);
            close(connection1, null, null, null);
            connection1 = getConnection();
        } catch (final Exception e) {
            // non-SQL failure: keep the connection and proceed with an empty product name
            BatchLogger.LOGGER.failToGetDatabaseProductName(e, connection1);
        }
        if (databaseProductName.startsWith("Oracle")) {
            isOracle = true;
            idIndexInOracle = new int[]{1};
        }
        try {
            countPartitionExecutionStatement = connection1.prepareStatement(countPartitionExecutions);
            rs = countPartitionExecutionStatement.executeQuery();
        } catch (final SQLException e) {
            // probe failed => assume the tables do not exist and run the DDL
            final String ddlFile = getDDLLocation(databaseProductName);
            ddlResource = getClassLoader(false).getResourceAsStream(ddlFile);
            if (ddlResource == null) {
                throw BatchMessages.MESSAGES.failToLoadDDL(ddlFile);
            }
            // DDL entries in the file are separated by "!!"
            final java.util.Scanner scanner = new java.util.Scanner(ddlResource).useDelimiter("!!");
            Connection connection2 = null;
            Statement batchDDLStatement = null;
            try {
                connection2 = getConnection();
                batchDDLStatement = connection2.createStatement();
                while (scanner.hasNext()) {
                    String ddlEntry = scanner.next().trim();
                    if (!ddlEntry.isEmpty()) {
                        if (tableNamesPattern != null) {
                            ddlEntry = addPrefixSuffix(ddlEntry, tablePrefix, tableSuffix, tableNamesPattern);
                        }
                        batchDDLStatement.addBatch(ddlEntry);
                        BatchLogger.LOGGER.addDDLEntry(ddlEntry);
                    }
                }
                scanner.close();
                batchDDLStatement.executeBatch();
                BatchLogger.LOGGER.tableCreated(ddlFile);
            } catch (final Exception e1) {
                //check if the tables have just been created by another concurrent client in the interim
                try {
                    final String countJobInstances = sqls.getProperty(COUNT_JOB_INSTANCES_BY_JOB_NAME);
                    countJobInstancesStatement = connection1.prepareStatement(countJobInstances);
                    countJobInstancesStatement.setString(1, "A");
                    rs = countJobInstancesStatement.executeQuery();
                    BatchLogger.LOGGER.tracef(
                            "This invocation needed to create tables since they didn't exit, but failed to create because they've been created by another concurrent invocation, so ignore the exception and return normally: %s", e1);
                } catch (final SQLException sqle) {
                    //still cannot access the table, so fail it
                    throw BatchMessages.MESSAGES.failToCreateTables(e1, databaseProductName, ddlFile);
                }
            } finally {
                close(connection2, batchDDLStatement, null, null);
            }
        } finally {
            close(connection1, countPartitionExecutionStatement, countJobInstancesStatement, rs);
            try {
                if (ddlResource != null) {
                    ddlResource.close();
                }
            } catch (final Exception e) {
                BatchLogger.LOGGER.failToClose(e, InputStream.class, ddlResource);
            }
        }
    }
    /**
     * Inserts a new row for the given job instance and assigns the
     * database-generated id back onto the instance.
     *
     * @param jobInstance the job instance to persist; its id is set on success
     */
    @Override
    void insertJobInstance(final JobInstanceImpl jobInstance) {
        final String insert = sqls.getProperty(INSERT_JOB_INSTANCE);
        final Connection connection = getConnection();
        ResultSet rs = null;
        PreparedStatement preparedStatement = null;
        try {
            // Oracle requires naming the generated-key column by index
            preparedStatement = isOracle ? connection.prepareStatement(insert, idIndexInOracle) :
                    connection.prepareStatement(insert, Statement.RETURN_GENERATED_KEYS);
            preparedStatement.setString(1, jobInstance.getJobName());
            preparedStatement.setString(2, jobInstance.getApplicationName());
            preparedStatement.executeUpdate();
            rs = preparedStatement.getGeneratedKeys();
            rs.next();
            jobInstance.setId(rs.getLong(1));
            BatchLogger.LOGGER.persisted(jobInstance, jobInstance.getInstanceId());
        } catch (final Exception e) {
            throw BatchMessages.MESSAGES.failToRunQuery(e, insert);
        } finally {
            close(connection, preparedStatement, null, rs);
        }
    }
    /**
     * Returns job instances for the given job name, or all job instances when
     * {@code jobName} is null or {@code "*"}. Rows already present in the
     * soft-reference cache are reused; others are materialized and cached.
     *
     * @param jobName the job name to select by, or null/"*" for all
     * @return the matching job instances (possibly empty, never null)
     */
    @Override
    public List<JobInstance> getJobInstances(final String jobName) {
        final boolean selectAll = jobName == null || jobName.equals("*");
        final String select = selectAll ? sqls.getProperty(SELECT_ALL_JOB_INSTANCES) :
                sqls.getProperty(SELECT_JOB_INSTANCES_BY_JOB_NAME);
        final Connection connection = getConnection();
        ResultSet rs = null;
        PreparedStatement preparedStatement = null;
        final List<JobInstance> result = new ArrayList<JobInstance>();
        try {
            preparedStatement = connection.prepareStatement(select);
            if (!selectAll) {
                preparedStatement.setString(1, jobName);
            }
            rs = preparedStatement.executeQuery();
            while (rs.next()) {
                final long i = rs.getLong(TableColumns.JOBINSTANCEID);
                final SoftReference<JobInstanceImpl, Long> ref = jobInstances.get(i);
                JobInstanceImpl jobInstance1 = (ref != null) ? ref.get() : null;
                if (jobInstance1 == null) {
                    final String appName = rs.getString(TableColumns.APPLICATIONNAME);
                    if (selectAll) {
                        // job name differs per row when selecting all, so read it from the row
                        final String goodJobName = rs.getString(TableColumns.JOBNAME);
                        jobInstance1 = new JobInstanceImpl(getJob(new ApplicationAndJobName(appName, goodJobName)), appName, goodJobName);
                    } else {
                        jobInstance1 = new JobInstanceImpl(getJob(new ApplicationAndJobName(appName, jobName)), appName, jobName);
                    }
                    jobInstance1.setId(i);
                    jobInstances.put(i, new SoftReference<JobInstanceImpl, Long>(jobInstance1, jobInstanceReferenceQueue, i));
                }
                //this job instance is already in the cache, so get it from the cache
                result.add(jobInstance1);
            }
        } catch (final Exception e) {
            throw BatchMessages.MESSAGES.failToRunQuery(e, select);
        } finally {
            close(connection, preparedStatement, null, rs);
        }
        return result;
    }
    /**
     * Returns the job instance with the given id, first consulting the
     * in-memory cache (via the superclass), then the database. A row loaded
     * from the database is cached before returning.
     *
     * @param jobInstanceId the job instance id
     * @return the job instance, or null if not found in cache or database
     */
    @Override
    public JobInstanceImpl getJobInstance(final long jobInstanceId) {
        JobInstanceImpl result = super.getJobInstance(jobInstanceId);
        if (result != null) {
            return result;
        }
        final String select = sqls.getProperty(SELECT_JOB_INSTANCE);
        final Connection connection = getConnection();
        ResultSet rs = null;
        PreparedStatement preparedStatement = null;
        try {
            preparedStatement = connection.prepareStatement(select);
            preparedStatement.setLong(1, jobInstanceId);
            rs = preparedStatement.executeQuery();
            // at most one row is consumed; the loop exits after the first iteration
            while (rs.next()) {
                // re-check the cache: another thread may have loaded it meanwhile
                final SoftReference<JobInstanceImpl, Long> jobInstanceSoftReference = jobInstances.get(jobInstanceId);
                result = jobInstanceSoftReference != null ? jobInstanceSoftReference.get() : null;
                if (result == null) {
                    final String appName = rs.getString(TableColumns.APPLICATIONNAME);
                    final String goodJobName = rs.getString(TableColumns.JOBNAME);
                    result = new JobInstanceImpl(getJob(new ApplicationAndJobName(appName, goodJobName)), appName, goodJobName);
                    result.setId(jobInstanceId);
                    jobInstances.put(jobInstanceId,
                            new SoftReference<JobInstanceImpl, Long>(result, jobInstanceReferenceQueue, jobInstanceId));
                }
                break;
            }
        } catch (final Exception e) {
            throw BatchMessages.MESSAGES.failToRunQuery(e, select);
        } finally {
            close(connection, preparedStatement, null, rs);
        }
        return result;
    }
@Override
public int getJobInstanceCount(final String jobName) {
final String select = sqls.getProperty(COUNT_JOB_INSTANCES_BY_JOB_NAME);
final Connection connection = getConnection();
ResultSet rs = null;
PreparedStatement preparedStatement = null;
int count = 0;
try {
preparedStatement = connection.prepareStatement(select);
preparedStatement.setString(1, jobName);
rs = preparedStatement.executeQuery();
while (rs.next()) {
count = rs.getInt(1);
break;
}
} catch (final Exception e) {
throw BatchMessages.MESSAGES.failToRunQuery(e, select);
} finally {
close(connection, preparedStatement, null, rs);
}
return count;
}
    /**
     * Inserts a new row for the given job execution and assigns the
     * database-generated execution id back onto it.
     *
     * @param jobExecution the job execution to persist; its id is set on success
     */
    @Override
    void insertJobExecution(final JobExecutionImpl jobExecution) {
        final String insert = sqls.getProperty(INSERT_JOB_EXECUTION);
        final Connection connection = getConnection();
        ResultSet rs = null;
        PreparedStatement preparedStatement = null;
        try {
            // Oracle requires naming the generated-key column by index
            preparedStatement = isOracle ? connection.prepareStatement(insert, idIndexInOracle) :
                    connection.prepareStatement(insert, Statement.RETURN_GENERATED_KEYS);
            preparedStatement.setLong(1, jobExecution.getJobInstance().getInstanceId());
            preparedStatement.setTimestamp(2, createTimestamp(jobExecution.getCreateTime()));
            preparedStatement.setString(3, jobExecution.getBatchStatus().name());
            // job parameters are stored as a single serialized string column
            preparedStatement.setString(4, BatchUtil.propertiesToString(jobExecution.getJobParameters()));
            preparedStatement.executeUpdate();
            rs = preparedStatement.getGeneratedKeys();
            rs.next();
            jobExecution.setId(rs.getLong(1));
            BatchLogger.LOGGER.persisted(jobExecution, jobExecution.getExecutionId());
        } catch (final Exception e) {
            throw BatchMessages.MESSAGES.failToRunQuery(e, insert);
        } finally {
            close(connection, preparedStatement, null, rs);
        }
    }
    /**
     * Updates the persisted state of a job execution. Three statement shapes
     * are used, chosen by the flags: full update with parameters, full update
     * without parameters, or a partial update (last-updated time, start time,
     * and batch status only). Parameter indices below must match the
     * corresponding SQL in the sql properties file.
     *
     * @param jobExecution      the job execution to update
     * @param fullUpdate        true to write end time, exit status, and restart position
     * @param saveJobParameters true (with fullUpdate) to also rewrite job parameters
     */
    @Override
    public void updateJobExecution(final JobExecutionImpl jobExecution, final boolean fullUpdate, final boolean saveJobParameters) {
        super.updateJobExecution(jobExecution, fullUpdate, saveJobParameters);
        final String update;
        if (fullUpdate) {
            if (saveJobParameters) {
                update = sqls.getProperty(UPDATE_JOB_EXECUTION_AND_PARAMETERS);
            } else {
                update = sqls.getProperty(UPDATE_JOB_EXECUTION);
            }
        } else {
            update = sqls.getProperty(UPDATE_JOB_EXECUTION_PARTIAL);
        }
        final Connection connection = getConnection();
        PreparedStatement preparedStatement = null;
        try {
            preparedStatement = connection.prepareStatement(update);
            if (fullUpdate) {
                preparedStatement.setTimestamp(1, createTimestamp(jobExecution.getEndTime()));
                preparedStatement.setTimestamp(2, createTimestamp(jobExecution.getLastUpdatedTime()));
                preparedStatement.setString(3, jobExecution.getBatchStatus().name());
                preparedStatement.setString(4, jobExecution.getExitStatus());
                preparedStatement.setString(5, jobExecution.combineRestartPositionAndUser());
                if (saveJobParameters) {
                    preparedStatement.setString(6, BatchUtil.propertiesToString(jobExecution.getJobParameters())); //job parameters
                    preparedStatement.setLong(7, jobExecution.getExecutionId()); //where clause
                } else {
                    preparedStatement.setLong(6, jobExecution.getExecutionId()); //where clause
                }
            } else {
                preparedStatement.setTimestamp(1, createTimestamp(jobExecution.getLastUpdatedTime()));
                preparedStatement.setTimestamp(2, createTimestamp(jobExecution.getStartTime()));
                preparedStatement.setString(3, jobExecution.getBatchStatus().name());
                preparedStatement.setLong(4, jobExecution.getExecutionId()); //where clause
            }
            preparedStatement.executeUpdate();
        } catch (final Exception e) {
            throw BatchMessages.MESSAGES.failToRunQuery(e, update);
        } finally {
            close(connection, preparedStatement, null, null);
        }
    }
@Override
public void stopJobExecution(final JobExecutionImpl jobExecution) {
super.stopJobExecution(jobExecution);
final String[] stopExecutionSqls = {
sqls.getProperty(STOP_JOB_EXECUTION),
sqls.getProperty(STOP_STEP_EXECUTION),
sqls.getProperty(STOP_PARTITION_EXECUTION)
};
final String jobExecutionIdString = String.valueOf(jobExecution.getExecutionId());
final String newBatchStatus = BatchStatus.STOPPING.toString();
final Connection connection = getConnection();
Statement stmt = null;
try {
stmt = connection.createStatement();
for (String sql : stopExecutionSqls) {
stmt.addBatch(sql.replace("?", jobExecutionIdString));
}
stmt.executeBatch();
} catch (Exception e) {
throw BatchMessages.MESSAGES.failToRunQuery(e, Arrays.toString(stopExecutionSqls));
} finally {
close(connection, stmt, null, null);
}
}
@Override
public JobExecutionImpl getJobExecution(final long jobExecutionId) {
JobExecutionImpl result = super.getJobExecution(jobExecutionId);
if (result != null && !isExecutionStale(result)) {
return result;
}
final String select = sqls.getProperty(SELECT_JOB_EXECUTION);
final Connection connection = getConnection();
ResultSet rs = null;
PreparedStatement preparedStatement = null;
try {
preparedStatement = connection.prepareStatement(select);
preparedStatement.setLong(1, jobExecutionId);
rs = preparedStatement.executeQuery();
while (rs.next()) {
final SoftReference<JobExecutionImpl, Long> ref = jobExecutions.get(jobExecutionId);
result = (ref != null) ? ref.get() : null;
final long jobInstanceId = rs.getLong(TableColumns.JOBINSTANCEID);
if (result == null) {
result = new JobExecutionImpl(getJobInstance(jobInstanceId),
jobExecutionId,
BatchUtil.stringToProperties(rs.getString(TableColumns.JOBPARAMETERS)),
rs.getTimestamp(TableColumns.CREATETIME),
rs.getTimestamp(TableColumns.STARTTIME),
rs.getTimestamp(TableColumns.ENDTIME),
rs.getTimestamp(TableColumns.LASTUPDATEDTIME),
rs.getString(TableColumns.BATCHSTATUS),
rs.getString(TableColumns.EXITSTATUS),
rs.getString(TableColumns.RESTARTPOSITION));
jobExecutions.put(jobExecutionId,
new SoftReference<JobExecutionImpl, Long>(result, jobExecutionReferenceQueue, jobExecutionId));
} else {
if (result.getEndTime() == null && rs.getTimestamp(TableColumns.ENDTIME) != null) {
final Properties jobParameters1 = BatchUtil.stringToProperties(rs.getString(TableColumns.JOBPARAMETERS));
result = new JobExecutionImpl(getJobInstance(jobInstanceId),
jobExecutionId,
BatchUtil.stringToProperties(rs.getString(TableColumns.JOBPARAMETERS)),
rs.getTimestamp(TableColumns.CREATETIME),
rs.getTimestamp(TableColumns.STARTTIME),
rs.getTimestamp(TableColumns.ENDTIME),
rs.getTimestamp(TableColumns.LASTUPDATEDTIME),
rs.getString(TableColumns.BATCHSTATUS),
rs.getString(TableColumns.EXITSTATUS),
rs.getString(TableColumns.RESTARTPOSITION));
jobExecutions.replace(jobExecutionId,
new SoftReference<JobExecutionImpl, Long>(result, jobExecutionReferenceQueue, jobExecutionId));
}
}
break;
}
} catch (final Exception e) {
throw BatchMessages.MESSAGES.failToRunQuery(e, select);
} finally {
close(connection, preparedStatement, null, rs);
}
return result;
}
@Override
public List<JobExecution> getJobExecutions(final JobInstance jobInstance) {
final String select;
long jobInstanceId = 0;
if (jobInstance == null) {
select = sqls.getProperty(SELECT_ALL_JOB_EXECUTIONS);
} else {
select = sqls.getProperty(SELECT_JOB_EXECUTIONS_BY_JOB_INSTANCE_ID);
jobInstanceId = jobInstance.getInstanceId();
}
final List<JobExecution> result = new ArrayList<JobExecution>();
final Connection connection = getConnection();
ResultSet rs = null;
PreparedStatement preparedStatement = null;
try {
preparedStatement = connection.prepareStatement(select);
if (jobInstance != null) {
preparedStatement.setLong(1, jobInstanceId);
}
rs = preparedStatement.executeQuery();
while (rs.next()) {
final long executionId = rs.getLong(TableColumns.JOBEXECUTIONID);
final SoftReference<JobExecutionImpl, Long> ref = jobExecutions.get(executionId);
JobExecutionImpl jobExecution1 = (ref != null) ? ref.get() : null;
if (jobExecution1 == null) {
if (jobInstance == null) {
jobInstanceId = rs.getLong(TableColumns.JOBINSTANCEID);
}
final Properties jobParameters1 = BatchUtil.stringToProperties(rs.getString(TableColumns.JOBPARAMETERS));
jobExecution1 =
new JobExecutionImpl(getJobInstance(jobInstanceId), executionId, jobParameters1,
rs.getTimestamp(TableColumns.CREATETIME), rs.getTimestamp(TableColumns.STARTTIME),
rs.getTimestamp(TableColumns.ENDTIME), rs.getTimestamp(TableColumns.LASTUPDATEDTIME),
rs.getString(TableColumns.BATCHSTATUS), rs.getString(TableColumns.EXITSTATUS),
rs.getString(TableColumns.RESTARTPOSITION));
jobExecutions.put(executionId,
new SoftReference<JobExecutionImpl, Long>(jobExecution1, jobExecutionReferenceQueue, executionId));
} else {
if (jobExecution1.getEndTime() == null && rs.getTimestamp(TableColumns.ENDTIME) != null) {
final Properties jobParameters1 = BatchUtil.stringToProperties(rs.getString(TableColumns.JOBPARAMETERS));
jobExecution1 =
new JobExecutionImpl(getJobInstance(jobInstanceId), executionId, jobParameters1,
rs.getTimestamp(TableColumns.CREATETIME), rs.getTimestamp(TableColumns.STARTTIME),
rs.getTimestamp(TableColumns.ENDTIME), rs.getTimestamp(TableColumns.LASTUPDATEDTIME),
rs.getString(TableColumns.BATCHSTATUS), rs.getString(TableColumns.EXITSTATUS),
rs.getString(TableColumns.RESTARTPOSITION));
jobExecutions.replace(executionId,
new SoftReference<JobExecutionImpl, Long>(jobExecution1, jobExecutionReferenceQueue, executionId));
}
}
// jobExecution1 is either got from the cache, or created, now add it to the result list
result.add(jobExecution1);
}
} catch (final Exception e) {
throw BatchMessages.MESSAGES.failToRunQuery(e, select);
} finally {
close(connection, preparedStatement, null, rs);
}
return result;
}
private boolean isExecutionStale(final JobExecutionImpl jobExecution) {
final BatchStatus jobStatus = jobExecution.getBatchStatus();
if (jobStatus.equals(BatchStatus.COMPLETED) ||
jobStatus.equals(BatchStatus.FAILED) ||
jobStatus.equals(BatchStatus.STOPPED) ||
jobStatus.equals(BatchStatus.ABANDONED) || jobExecution.getStepExecutions().size() >= 1) {
return false;
}
return true;
}
@Override
public List<Long> getRunningExecutions(final String jobName) {
final String select = sqls.getProperty(SELECT_RUNNING_JOB_EXECUTIONS_BY_JOB_NAME);
return getJobExecutions0(select, jobName, true, null);
}
    /**
     * Returns the ids of all executions for the given job name, with no limit.
     * Equivalent to {@code getJobExecutionsByJob(jobName, null)}.
     *
     * @param jobName the job name to look up
     * @return ids of matching job executions (possibly empty)
     */
    @Override
    public List<Long> getJobExecutionsByJob(final String jobName) {
        return getJobExecutionsByJob(jobName, null);
    }
@Override
public List<Long> getJobExecutionsByJob(String jobName, Integer limit) {
final String select = sqls.getProperty(SELECT_JOB_EXECUTIONS_BY_JOB_NAME);
return getJobExecutions0(select, jobName, false, limit);
}
@Override
void insertStepExecution(final StepExecutionImpl stepExecution, final JobExecutionImpl jobExecution) {
final String insert = sqls.getProperty(INSERT_STEP_EXECUTION);
final Connection connection = getConnection();
ResultSet rs = null;
PreparedStatement preparedStatement = null;
try {
preparedStatement = isOracle ? connection.prepareStatement(insert, idIndexInOracle) :
connection.prepareStatement(insert, Statement.RETURN_GENERATED_KEYS);
preparedStatement.setLong(1, jobExecution.getExecutionId());
preparedStatement.setString(2, stepExecution.getStepName());
preparedStatement.setTimestamp(3, new Timestamp(stepExecution.getStartTime().getTime()));
preparedStatement.setString(4, stepExecution.getBatchStatus().name());
preparedStatement.executeUpdate();
rs = preparedStatement.getGeneratedKeys();
rs.next();
stepExecution.setId(rs.getLong(1));
BatchLogger.LOGGER.persisted(stepExecution, stepExecution.getStepExecutionId());
} catch (final Exception e) {
throw BatchMessages.MESSAGES.failToRunQuery(e, insert);
} finally {
close(connection, preparedStatement, null, rs);
}
}
    /**
     * Updates the persisted state of the given step execution using the
     * unconditional update statement.
     *
     * @param stepExecution the step execution to update
     */
    @Override
    public void updateStepExecution(final StepExecution stepExecution) {
        updateStepExecution0(stepExecution, sqls.getProperty(UPDATE_STEP_EXECUTION));
    }
/**
 * Saves the persistent data of a step or partition execution using the conditional
 * "...IF_NOT_STOPPING" update sql variants. Presumably those statements skip the
 * update when the execution is being stopped — behavior is defined by the sql,
 * not visible here; TODO confirm against the sql properties file.
 *
 * @param jobExecution the current job execution (not read in this method body)
 * @param stepOrPartitionExecution the step or partition execution to persist
 * @return the number of rows affected by the update
 */
@Override
public int savePersistentDataIfNotStopping(final JobExecution jobExecution, final AbstractStepExecution stepOrPartitionExecution) {
if (stepOrPartitionExecution instanceof StepExecutionImpl) {
//stepExecution is for the main step, and should map to the STEP_EXECUTION table
return updateStepExecution0(stepOrPartitionExecution, sqls.getProperty(UPDATE_STEP_EXECUTION_IF_NOT_STOPPING));
} else {
//stepExecutionId is for a partition execution, and should map to the PARTITION_EXECUTION table
return updatePartitionExecution((PartitionExecutionImpl) stepOrPartitionExecution, sqls.getProperty(UPDATE_PARTITION_EXECUTION_IF_NOT_STOPPING));
}
}
/**
 * Saves the persistent data of a step or partition execution unconditionally,
 * after first letting the superclass serialize the persistent data and checkpoint
 * info so they cannot be further modified.
 *
 * @param jobExecution the current job execution, passed through to the superclass
 * @param stepOrPartitionExecution the step or partition execution to persist
 */
@Override
public void savePersistentData(final JobExecution jobExecution, final AbstractStepExecution stepOrPartitionExecution) {
//super.savePersistentData() serialize persistent data and checkpoint info to avoid further modification
super.savePersistentData(jobExecution, stepOrPartitionExecution);
if (stepOrPartitionExecution instanceof StepExecutionImpl) {
//stepExecution is for the main step, and should map to the STEP_EXECUTION table
updateStepExecution(stepOrPartitionExecution);
} else {
//stepExecutionId is for a partition execution, and should map to the PARTITION_EXECUTION table
updatePartitionExecution((PartitionExecutionImpl) stepOrPartitionExecution, sqls.getProperty(UPDATE_PARTITION_EXECUTION));
}
}
/*
StepExecution selectStepExecution(final long stepExecutionId, final ClassLoader classLoader) {
final String select = sqls.getProperty(SELECT_STEP_EXECUTION);
final Connection connection = getConnection();
ResultSet rs = null;
PreparedStatement preparedStatement = null;
final List<StepExecution> result = new ArrayList<StepExecution>();
try {
preparedStatement = connection.prepareStatement(select);
preparedStatement.setLong(1, stepExecutionId);
rs = preparedStatement.executeQuery();
createStepExecutionsFromResultSet(rs, result, false, classLoader);
} catch (final Exception e) {
throw BatchMessages.MESSAGES.failToRunQuery(e, select);
} finally {
close(connection, preparedStatement, null, rs);
}
return result.get(0);
}
*/
/**
 * Fetches StepExecutions from the database by JobExecution id. The cache is not
 * consulted here, so callers should invoke this only after a cache miss.
 *
 * @param jobExecutionId if null, every StepExecution is retrieved; otherwise only those belonging to this JobExecution id
 * @param classLoader the current application class loader
 * @return a list of StepExecutions
 */
@Override
List<StepExecution> selectStepExecutions(final Long jobExecutionId, final ClassLoader classLoader) {
    final String query;
    if (jobExecutionId == null) {
        query = sqls.getProperty(SELECT_ALL_STEP_EXECUTIONS);
    } else {
        query = sqls.getProperty(SELECT_STEP_EXECUTIONS_BY_JOB_EXECUTION_ID);
    }
    final List<StepExecution> stepExecutions = new ArrayList<StepExecution>();
    final Connection conn = getConnection();
    PreparedStatement stmt = null;
    ResultSet resultSet = null;
    try {
        stmt = conn.prepareStatement(query);
        if (jobExecutionId != null) {
            stmt.setLong(1, jobExecutionId);
        }
        resultSet = stmt.executeQuery();
        createStepExecutionsFromResultSet(resultSet, stepExecutions, false, classLoader);
    } catch (final Exception e) {
        throw BatchMessages.MESSAGES.failToRunQuery(e, query);
    } finally {
        close(conn, stmt, null, resultSet);
    }
    return stepExecutions;
}
/**
 * Records a new partition execution both in the in-memory repository (via the
 * superclass) and as a new row in the partition execution table.
 *
 * @param enclosingStepExecution the step execution that owns this partition
 * @param partitionExecution the partition execution to insert
 */
@Override
public void addPartitionExecution(final StepExecutionImpl enclosingStepExecution, final PartitionExecutionImpl partitionExecution) {
super.addPartitionExecution(enclosingStepExecution, partitionExecution);
final String insert = sqls.getProperty(INSERT_PARTITION_EXECUTION);
final Connection connection = getConnection();
PreparedStatement preparedStatement = null;
try {
preparedStatement = connection.prepareStatement(insert);
preparedStatement.setInt(1, partitionExecution.getPartitionId());
preparedStatement.setLong(2, partitionExecution.getStepExecutionId());
preparedStatement.setString(3, partitionExecution.getBatchStatus().name());
preparedStatement.executeUpdate();
} catch (final Exception e) {
throw BatchMessages.MESSAGES.failToRunQuery(e, insert);
} finally {
close(connection, preparedStatement, null, null);
}
}
/**
 * Finds the step execution from the job execution being restarted that matches
 * the given step name. Checks the in-memory repository (via the superclass)
 * first, and falls back to a database lookup on a miss.
 *
 * @param stepName the name of the step to find
 * @param jobExecutionToRestart the job execution being restarted
 * @param classLoader the current application class loader
 * @return the matching StepExecutionImpl, or null if none was found
 */
@Override
public StepExecutionImpl findOriginalStepExecutionForRestart(final String stepName,
final JobExecutionImpl jobExecutionToRestart,
final ClassLoader classLoader) {
final StepExecutionImpl result = super.findOriginalStepExecutionForRestart(stepName, jobExecutionToRestart, classLoader);
if (result != null) {
return result;
}
final String select = sqls.getProperty(FIND_ORIGINAL_STEP_EXECUTION);
final Connection connection = getConnection();
ResultSet rs = null;
final List<StepExecution> results = new ArrayList<StepExecution>();
PreparedStatement preparedStatement = null;
try {
preparedStatement = connection.prepareStatement(select);
preparedStatement.setLong(1, jobExecutionToRestart.getJobInstance().getInstanceId());
preparedStatement.setString(2, stepName);
rs = preparedStatement.executeQuery();
// top1 = true: only the first matching row is materialized
createStepExecutionsFromResultSet(rs, results, true, classLoader);
} catch (final Exception e) {
throw BatchMessages.MESSAGES.failToRunQuery(e, select);
} finally {
close(connection, preparedStatement, null, rs);
}
return results.size() > 0 ? (StepExecutionImpl) results.get(0) : null;
}
/**
 * Gets the partition executions belonging to a step execution. Returns the
 * in-memory result from the superclass when it is non-empty; otherwise queries
 * the partition execution table.
 *
 * @param stepExecutionId the id of the owning step execution
 * @param stepExecution the owning step execution; supplies the step name for reconstructed partitions
 * @param notCompletedOnly if true, partitions whose batch status is COMPLETED are filtered out
 * @param classLoader the current application class loader (not referenced in this method body)
 * @return a list of partition executions, possibly empty
 */
@Override
public List<PartitionExecutionImpl> getPartitionExecutions(final long stepExecutionId,
final StepExecutionImpl stepExecution,
final boolean notCompletedOnly,
final ClassLoader classLoader) {
List<PartitionExecutionImpl> result = super.getPartitionExecutions(stepExecutionId, stepExecution, notCompletedOnly, classLoader);
if (result != null && !result.isEmpty()) {
return result;
}
final String select = sqls.getProperty(SELECT_PARTITION_EXECUTIONS_BY_STEP_EXECUTION_ID);
final Connection connection = getConnection();
ResultSet rs = null;
PreparedStatement preparedStatement = null;
result = new ArrayList<PartitionExecutionImpl>();
try {
preparedStatement = connection.prepareStatement(select);
preparedStatement.setLong(1, stepExecutionId);
rs = preparedStatement.executeQuery();
while (rs.next()) {
final String batchStatusValue = rs.getString(TableColumns.BATCHSTATUS);
// skip COMPLETED partitions when only unfinished ones were requested
if (!notCompletedOnly ||
!BatchStatus.COMPLETED.name().equals(batchStatusValue)) {
result.add(new PartitionExecutionImpl(
rs.getInt(TableColumns.PARTITIONEXECUTIONID),
rs.getLong(TableColumns.STEPEXECUTIONID),
stepExecution.getStepName(),
BatchStatus.valueOf(batchStatusValue),
rs.getString(TableColumns.EXITSTATUS),
rs.getBytes(TableColumns.PERSISTENTUSERDATA),
rs.getBytes(TableColumns.READERCHECKPOINTINFO),
rs.getBytes(TableColumns.WRITERCHECKPOINTINFO)
));
}
}
} catch (final Exception e) {
throw BatchMessages.MESSAGES.failToRunQuery(e, select);
} finally {
close(connection, preparedStatement, null, rs);
}
return result;
}
/**
 * Updates the partition execution row in the job repository, using the
 * {@code updateSql} passed in.
 *
 * @param partitionExecution the partition execution whose state to persist
 * @param updateSql the update sql to use
 * @return the number of rows affected by this update sql execution
 */
private int updatePartitionExecution(final PartitionExecutionImpl partitionExecution, final String updateSql) {
    final Connection conn = getConnection();
    PreparedStatement stmt = null;
    try {
        stmt = conn.prepareStatement(updateSql);
        // Bind parameters in the positional order the update sql expects.
        int i = 0;
        stmt.setString(++i, partitionExecution.getBatchStatus().name());
        stmt.setString(++i, partitionExecution.getExitStatus());
        stmt.setString(++i, TableColumns.formatException(partitionExecution.getException()));
        stmt.setBytes(++i, partitionExecution.getPersistentUserDataSerialized());
        stmt.setBytes(++i, partitionExecution.getReaderCheckpointInfoSerialized());
        stmt.setBytes(++i, partitionExecution.getWriterCheckpointInfoSerialized());
        stmt.setInt(++i, partitionExecution.getPartitionId());
        stmt.setLong(++i, partitionExecution.getStepExecutionId());
        return stmt.executeUpdate();
    } catch (final Exception e) {
        throw BatchMessages.MESSAGES.failToRunQuery(e, updateSql);
    } finally {
        close(conn, stmt, null, null);
    }
}
/**
 * Updates the step execution row in the job repository, using the
 * {@code updateSql} passed in.
 *
 * @param stepExecution the step execution to update to job repository
 * @param updateSql the update sql to use
 * @return the number of rows affected by this update sql execution
 */
private int updateStepExecution0(final StepExecution stepExecution, final String updateSql) {
    final StepExecutionImpl impl = (StepExecutionImpl) stepExecution;
    final Connection conn = getConnection();
    PreparedStatement stmt = null;
    try {
        stmt = conn.prepareStatement(updateSql);
        stmt.setTimestamp(1, createTimestamp(stepExecution.getEndTime()));
        stmt.setString(2, stepExecution.getBatchStatus().name());
        stmt.setString(3, stepExecution.getExitStatus());
        stmt.setString(4, TableColumns.formatException(impl.getException()));
        stmt.setBytes(5, impl.getPersistentUserDataSerialized());
        // Metric parameters 6-13, in the exact positional order the update sql expects.
        final Metric.MetricType[] metricOrder = {
                Metric.MetricType.READ_COUNT, Metric.MetricType.WRITE_COUNT,
                Metric.MetricType.COMMIT_COUNT, Metric.MetricType.ROLLBACK_COUNT,
                Metric.MetricType.READ_SKIP_COUNT, Metric.MetricType.PROCESS_SKIP_COUNT,
                Metric.MetricType.FILTER_COUNT, Metric.MetricType.WRITE_SKIP_COUNT};
        int param = 6;
        for (final Metric.MetricType metricType : metricOrder) {
            stmt.setLong(param++, impl.getStepMetrics().get(metricType));
        }
        stmt.setBytes(14, impl.getReaderCheckpointInfoSerialized());
        stmt.setBytes(15, impl.getWriterCheckpointInfoSerialized());
        stmt.setLong(16, stepExecution.getStepExecutionId());
        return stmt.executeUpdate();
    } catch (final Exception e) {
        throw BatchMessages.MESSAGES.failToRunQuery(e, updateSql);
    } finally {
        close(conn, stmt, null, null);
    }
}
/**
 * Builds StepExecutionImpl instances from the rows of the passed result set and
 * appends them to {@code result}. Constructor arguments are positional and must
 * match the StepExecutionImpl constructor's parameter order exactly.
 *
 * @param rs the result set, positioned before its first row
 * @param result the list to which created StepExecutions are appended
 * @param top1 if true, returns after materializing the first row only
 * @param classLoader the current application class loader; not referenced in this
 *                    method body — presumably required by callers' signatures or
 *                    future deserialization, TODO confirm
 * @throws SQLException if reading the result set fails
 * @throws ClassNotFoundException declared but not visibly thrown here
 * @throws IOException declared but not visibly thrown here
 */
private void createStepExecutionsFromResultSet(final ResultSet rs,
final List<StepExecution> result,
final boolean top1,
final ClassLoader classLoader)
throws SQLException, ClassNotFoundException, IOException {
while (rs.next()) {
final StepExecutionImpl e = new StepExecutionImpl(
rs.getLong(TableColumns.STEPEXECUTIONID),
rs.getString(TableColumns.STEPNAME),
rs.getTimestamp(TableColumns.STARTTIME),
rs.getTimestamp(TableColumns.ENDTIME),
rs.getString(TableColumns.BATCHSTATUS),
rs.getString(TableColumns.EXITSTATUS),
rs.getBytes(TableColumns.PERSISTENTUSERDATA),
rs.getInt(TableColumns.READCOUNT),
rs.getInt(TableColumns.WRITECOUNT),
rs.getInt(TableColumns.COMMITCOUNT),
rs.getInt(TableColumns.ROLLBACKCOUNT),
rs.getInt(TableColumns.READSKIPCOUNT),
rs.getInt(TableColumns.PROCESSSKIPCOUNT),
rs.getInt(TableColumns.FILTERCOUNT),
rs.getInt(TableColumns.WRITESKIPCOUNT),
rs.getBytes(TableColumns.READERCHECKPOINTINFO),
rs.getBytes(TableColumns.WRITERCHECKPOINTINFO)
);
result.add(e);
if (top1) {
return;
}
}
}
/**
 * Counts the StepExecutions for the given step name within a job instance,
 * by running the configured count query.
 *
 * @param stepName the step name to count executions for
 * @param jobInstanceId the id of the job instance to search within
 * @return the count reported by the query, or 0 if the query returned no row
 */
@Override
public int countStepStartTimes(final String stepName, final long jobInstanceId) {
    final String select = sqls.getProperty(COUNT_STEP_EXECUTIONS_BY_JOB_INSTANCE_ID);
    final Connection connection = getConnection();
    ResultSet rs = null;
    PreparedStatement preparedStatement = null;
    int count = 0;
    try {
        preparedStatement = connection.prepareStatement(select);
        preparedStatement.setString(1, stepName);
        preparedStatement.setLong(2, jobInstanceId);
        rs = preparedStatement.executeQuery();
        // A count query yields a single row; an if reads it more idiomatically
        // than the original while-loop with an immediate break.
        if (rs.next()) {
            count = rs.getInt(1);
        }
    } catch (final Exception e) {
        throw BatchMessages.MESSAGES.failToRunQuery(e, select);
    } finally {
        close(connection, preparedStatement, null, rs);
    }
    return count;
}