From 6de65f306af657c2ad33b8397b2fa4540cb9d4d5 Mon Sep 17 00:00:00 2001 From: Kodai Doki Date: Tue, 14 Oct 2025 18:54:42 +0900 Subject: [PATCH 1/2] Add more waits for cache expiry --- .../JdbcTransactionAdminIntegrationTest.java | 6 + ...edTransactionAdminIntegrationTestBase.java | 228 +++++++++--------- ...nsensusCommitAdminIntegrationTestBase.java | 8 + ...onTransactionAdminIntegrationTestBase.java | 8 + 4 files changed, 140 insertions(+), 110 deletions(-) diff --git a/core/src/integration-test/java/com/scalar/db/transaction/jdbc/JdbcTransactionAdminIntegrationTest.java b/core/src/integration-test/java/com/scalar/db/transaction/jdbc/JdbcTransactionAdminIntegrationTest.java index 22f40693b0..299c520110 100644 --- a/core/src/integration-test/java/com/scalar/db/transaction/jdbc/JdbcTransactionAdminIntegrationTest.java +++ b/core/src/integration-test/java/com/scalar/db/transaction/jdbc/JdbcTransactionAdminIntegrationTest.java @@ -499,6 +499,9 @@ protected boolean isIndexOnBlobColumnSupported() { @Override protected void transactionalInsert(DistributedTransactionManager manager, Insert insert) throws TransactionException { + // Wait for cache expiry + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + DistributedTransaction transaction = manager.start(); transaction.insert(insert); transaction.commit(); @@ -507,6 +510,9 @@ protected void transactionalInsert(DistributedTransactionManager manager, Insert @Override protected List transactionalScan(DistributedTransactionManager manager, Scan scan) throws TransactionException { + // Wait for cache expiry + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + DistributedTransaction transaction = manager.start(); List results = transaction.scan(scan); transaction.commit(); diff --git a/integration-test/src/main/java/com/scalar/db/api/DistributedTransactionAdminIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/api/DistributedTransactionAdminIntegrationTestBase.java index 14e3e9caa3..815810e417 100644 --- a/integration-test/src/main/java/com/scalar/db/api/DistributedTransactionAdminIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/api/DistributedTransactionAdminIntegrationTestBase.java @@ -426,16 +426,21 @@ public void dropTable_IfExists_ForNonExistingTable_ShouldNotThrowAnyException() @Test public void truncateTable_ShouldTruncateProperly() throws ExecutionException, TransactionException { - DistributedTransactionManager manager = null; - try { + // Use a separate table name to avoid hitting the stale cache, which can cause test failure when + // executing DMLs + String table = "table_for_truncate"; + + try (DistributedTransactionManager manager = transactionFactory.getTransactionManager()) { // Arrange + Map options = getCreationOptions(); + admin.createTable(namespace1, table, TABLE_METADATA, true, options); Key partitionKey = Key.of(COL_NAME2, "aaa", COL_NAME1, 1); Key clusteringKey = Key.of(COL_NAME4, 2, COL_NAME3, "bbb"); - manager = transactionFactory.getTransactionManager(); - manager.put( - Put.newBuilder() + transactionalInsert( + manager, + Insert.newBuilder() .namespace(namespace1) - .table(TABLE1) + .table(table) .partitionKey(partitionKey) .clusteringKey(clusteringKey) .intValue(COL_NAME5, 3) @@ -448,21 +453,20 @@ public void truncateTable_ShouldTruncateProperly() .build()); // Act - admin.truncateTable(namespace1, TABLE1); + admin.truncateTable(namespace1, table); // Assert List results = - manager.scan( + transactionalScan( + manager, Scan.newBuilder() .namespace(namespace1) - .table(TABLE1) + .table(table) .partitionKey(partitionKey) .build()); assertThat(results).isEmpty(); } finally { - if (manager != null) { - manager.close(); - } + admin.dropTable(namespace1, table, true); } } @@ -510,8 +514,11 @@ public void tableExists_ShouldReturnCorrectResults() throws ExecutionException { @Test public void createIndex_ForAllDataTypesWithExistingData_ShouldCreateIndexesCorrectly() throws Exception { - DistributedTransactionManager transactionManager = null; - try { + // Use a separate table name to avoid hitting the stale cache, which can cause test failure when + // executing DMLs + String table = "table_for_create_index"; + + try (DistributedTransactionManager manager = transactionFactory.getTransactionManager()) { // Arrange Map options = getCreationOptions(); TableMetadata.Builder metadataBuilder = @@ -534,12 +541,11 @@ public void createIndex_ForAllDataTypesWithExistingData_ShouldCreateIndexesCorre metadataBuilder = metadataBuilder.addColumn(COL_NAME13, DataType.TIMESTAMP); } TableMetadata metadata = metadataBuilder.build(); - admin.createTable(namespace1, TABLE4, metadata, options); - transactionManager = transactionFactory.getTransactionManager(); + admin.createTable(namespace1, table, metadata, options); InsertBuilder.Buildable insert = Insert.newBuilder() .namespace(namespace1) - .table(TABLE4) + .table(table) .partitionKey(Key.ofInt(COL_NAME1, 1)) .intValue(COL_NAME2, 2) .textValue(COL_NAME3, "3") @@ -560,53 +566,53 @@ public void createIndex_ForAllDataTypesWithExistingData_ShouldCreateIndexesCorre COL_NAME13, LocalDateTime.of(LocalDate.of(2020, 6, 2), LocalTime.of(12, 2, 6, 123_000_000))); } - transactionManager.insert(insert.build()); + transactionalInsert(manager, insert.build()); // Act - admin.createIndex(namespace1, TABLE4, COL_NAME2, options); + admin.createIndex(namespace1, table, COL_NAME2, options); if (isCreateIndexOnTextColumnEnabled()) { - admin.createIndex(namespace1, TABLE4, COL_NAME3, options); + admin.createIndex(namespace1, table, COL_NAME3, options); } - admin.createIndex(namespace1, TABLE4, COL_NAME4, options); - admin.createIndex(namespace1, TABLE4, COL_NAME5, options); - admin.createIndex(namespace1, TABLE4, COL_NAME6, options); + admin.createIndex(namespace1, table, COL_NAME4, options); + admin.createIndex(namespace1, table, COL_NAME5, options); + admin.createIndex(namespace1, table, COL_NAME6, options); if (isIndexOnBooleanColumnSupported()) { - admin.createIndex(namespace1, TABLE4, COL_NAME7, options); + admin.createIndex(namespace1, table, COL_NAME7, options); } if (isIndexOnBlobColumnSupported()) { - admin.createIndex(namespace1, TABLE4, COL_NAME8, options); + admin.createIndex(namespace1, table, COL_NAME8, options); } - admin.createIndex(namespace1, TABLE4, COL_NAME10, options); - admin.createIndex(namespace1, TABLE4, COL_NAME11, options); - admin.createIndex(namespace1, TABLE4, COL_NAME12, options); + admin.createIndex(namespace1, table, COL_NAME10, options); + admin.createIndex(namespace1, table, COL_NAME11, options); + admin.createIndex(namespace1, table, COL_NAME12, options); if (isTimestampTypeSupported()) { - admin.createIndex(namespace1, TABLE4, COL_NAME13, options); + admin.createIndex(namespace1, table, COL_NAME13, options); } // Assert - assertThat(admin.indexExists(namespace1, TABLE4, COL_NAME2)).isTrue(); + assertThat(admin.indexExists(namespace1, table, COL_NAME2)).isTrue(); if (isCreateIndexOnTextColumnEnabled()) { - assertThat(admin.indexExists(namespace1, TABLE4, COL_NAME3)).isTrue(); + assertThat(admin.indexExists(namespace1, table, COL_NAME3)).isTrue(); } - assertThat(admin.indexExists(namespace1, TABLE4, COL_NAME4)).isTrue(); - assertThat(admin.indexExists(namespace1, TABLE4, COL_NAME5)).isTrue(); - assertThat(admin.indexExists(namespace1, TABLE4, COL_NAME6)).isTrue(); + assertThat(admin.indexExists(namespace1, table, COL_NAME4)).isTrue(); + assertThat(admin.indexExists(namespace1, table, COL_NAME5)).isTrue(); + assertThat(admin.indexExists(namespace1, table, COL_NAME6)).isTrue(); if (isIndexOnBooleanColumnSupported()) { - assertThat(admin.indexExists(namespace1, TABLE4, COL_NAME7)).isTrue(); + assertThat(admin.indexExists(namespace1, table, COL_NAME7)).isTrue(); } if (isIndexOnBlobColumnSupported()) { - assertThat(admin.indexExists(namespace1, TABLE4, COL_NAME8)).isTrue(); + assertThat(admin.indexExists(namespace1, table, COL_NAME8)).isTrue(); } - assertThat(admin.indexExists(namespace1, TABLE4, COL_NAME9)).isTrue(); - assertThat(admin.indexExists(namespace1, TABLE4, COL_NAME10)).isTrue(); - assertThat(admin.indexExists(namespace1, TABLE4, COL_NAME11)).isTrue(); - assertThat(admin.indexExists(namespace1, TABLE4, COL_NAME12)).isTrue(); + assertThat(admin.indexExists(namespace1, table, COL_NAME9)).isTrue(); + assertThat(admin.indexExists(namespace1, table, COL_NAME10)).isTrue(); + assertThat(admin.indexExists(namespace1, table, COL_NAME11)).isTrue(); + assertThat(admin.indexExists(namespace1, table, COL_NAME12)).isTrue(); if (isTimestampTypeSupported()) { - assertThat(admin.indexExists(namespace1, TABLE4, COL_NAME13)).isTrue(); + assertThat(admin.indexExists(namespace1, table, COL_NAME13)).isTrue(); } Set actualSecondaryIndexNames = - admin.getTableMetadata(namespace1, TABLE4).getSecondaryIndexNames(); + admin.getTableMetadata(namespace1, table).getSecondaryIndexNames(); assertThat(actualSecondaryIndexNames) .contains(COL_NAME2, COL_NAME4, COL_NAME5, COL_NAME9, COL_NAME10, COL_NAME11, COL_NAME12); int indexCount = 8; @@ -629,10 +635,7 @@ public void createIndex_ForAllDataTypesWithExistingData_ShouldCreateIndexesCorre assertThat(actualSecondaryIndexNames).hasSize(indexCount); } finally { - admin.dropTable(namespace1, TABLE4, true); - if (transactionManager != null) { - transactionManager.close(); - } + admin.dropTable(namespace1, table, true); } } @@ -707,8 +710,11 @@ public void createIndex_IfNotExists_ForAlreadyExistingIndex_ShouldNotThrowAnyExc @Test public void dropIndex_ForAllDataTypesWithExistingData_ShouldDropIndexCorrectly() throws Exception { - DistributedTransactionManager transactionManager = null; - try { + // Use a separate table name to avoid hitting the stale cache, which can cause test failure when + // executing DMLs + String table = "table_for_alter_1"; + + try (DistributedTransactionManager manager = transactionFactory.getTransactionManager()) { // Arrange Map options = getCreationOptions(); TableMetadata.Builder metadataBuilder = @@ -746,12 +752,11 @@ public void dropIndex_ForAllDataTypesWithExistingData_ShouldDropIndexCorrectly() metadataBuilder.addColumn(COL_NAME13, DataType.TIMESTAMP); metadataBuilder.addSecondaryIndex(COL_NAME13); } - admin.createTable(namespace1, TABLE4, metadataBuilder.build(), options); - transactionManager = transactionFactory.getTransactionManager(); - PutBuilder.Buildable put = - Put.newBuilder() + admin.createTable(namespace1, table, metadataBuilder.build(), options); + InsertBuilder.Buildable insert = + Insert.newBuilder() .namespace(namespace1) - .table(TABLE4) + .table(table) .partitionKey(Key.ofInt(COL_NAME1, 1)) .intValue(COL_NAME2, 2) .textValue(COL_NAME3, "3") @@ -767,52 +772,49 @@ public void dropIndex_ForAllDataTypesWithExistingData_ShouldDropIndexCorrectly() LocalDateTime.of(LocalDate.of(2020, 6, 2), LocalTime.of(12, 2, 6, 123_000_000)) .toInstant(ZoneOffset.UTC)); if (isTimestampTypeSupported()) { - put.timestampValue( + insert.timestampValue( COL_NAME13, LocalDateTime.of(LocalDate.of(2020, 6, 2), LocalTime.of(12, 2, 6, 123_000_000))); } - transactionManager.put(put.build()); + transactionalInsert(manager, insert.build()); // Act - admin.dropIndex(namespace1, TABLE4, COL_NAME2); - admin.dropIndex(namespace1, TABLE4, COL_NAME3); - admin.dropIndex(namespace1, TABLE4, COL_NAME4); - admin.dropIndex(namespace1, TABLE4, COL_NAME5); - admin.dropIndex(namespace1, TABLE4, COL_NAME6); + admin.dropIndex(namespace1, table, COL_NAME2); + admin.dropIndex(namespace1, table, COL_NAME3); + admin.dropIndex(namespace1, table, COL_NAME4); + admin.dropIndex(namespace1, table, COL_NAME5); + admin.dropIndex(namespace1, table, COL_NAME6); if (isIndexOnBooleanColumnSupported()) { - admin.dropIndex(namespace1, TABLE4, COL_NAME7); + admin.dropIndex(namespace1, table, COL_NAME7); } if (isIndexOnBlobColumnSupported()) { - admin.dropIndex(namespace1, TABLE4, COL_NAME8); + admin.dropIndex(namespace1, table, COL_NAME8); } - admin.dropIndex(namespace1, TABLE4, COL_NAME10); - admin.dropIndex(namespace1, TABLE4, COL_NAME11); - admin.dropIndex(namespace1, TABLE4, COL_NAME12); + admin.dropIndex(namespace1, table, COL_NAME10); + admin.dropIndex(namespace1, table, COL_NAME11); + admin.dropIndex(namespace1, table, COL_NAME12); if (isTimestampTypeSupported()) { - admin.dropIndex(namespace1, TABLE4, COL_NAME13); + admin.dropIndex(namespace1, table, COL_NAME13); } // Assert - assertThat(admin.indexExists(namespace1, TABLE4, COL_NAME2)).isFalse(); - assertThat(admin.indexExists(namespace1, TABLE4, COL_NAME3)).isFalse(); - assertThat(admin.indexExists(namespace1, TABLE4, COL_NAME4)).isFalse(); - assertThat(admin.indexExists(namespace1, TABLE4, COL_NAME5)).isFalse(); - assertThat(admin.indexExists(namespace1, TABLE4, COL_NAME6)).isFalse(); - assertThat(admin.indexExists(namespace1, TABLE4, COL_NAME7)).isFalse(); - assertThat(admin.indexExists(namespace1, TABLE4, COL_NAME8)).isFalse(); - assertThat(admin.getTableMetadata(namespace1, TABLE4).getSecondaryIndexNames()) + assertThat(admin.indexExists(namespace1, table, COL_NAME2)).isFalse(); + assertThat(admin.indexExists(namespace1, table, COL_NAME3)).isFalse(); + assertThat(admin.indexExists(namespace1, table, COL_NAME4)).isFalse(); + assertThat(admin.indexExists(namespace1, table, COL_NAME5)).isFalse(); + assertThat(admin.indexExists(namespace1, table, COL_NAME6)).isFalse(); + assertThat(admin.indexExists(namespace1, table, COL_NAME7)).isFalse(); + assertThat(admin.indexExists(namespace1, table, COL_NAME8)).isFalse(); + assertThat(admin.getTableMetadata(namespace1, table).getSecondaryIndexNames()) .containsOnly(COL_NAME9); - assertThat(admin.indexExists(namespace1, TABLE4, COL_NAME10)).isFalse(); - assertThat(admin.indexExists(namespace1, TABLE4, COL_NAME11)).isFalse(); - assertThat(admin.indexExists(namespace1, TABLE4, COL_NAME12)).isFalse(); + assertThat(admin.indexExists(namespace1, table, COL_NAME10)).isFalse(); + assertThat(admin.indexExists(namespace1, table, COL_NAME11)).isFalse(); + assertThat(admin.indexExists(namespace1, table, COL_NAME12)).isFalse(); if (isTimestampTypeSupported()) { - assertThat(admin.indexExists(namespace1, TABLE4, COL_NAME13)).isFalse(); + assertThat(admin.indexExists(namespace1, table, COL_NAME13)).isFalse(); } } finally { - admin.dropTable(namespace1, TABLE4, true); - if (transactionManager != null) { - transactionManager.close(); - } + admin.dropTable(namespace1, table, true); } } @@ -1181,8 +1183,11 @@ public void renameColumn_ForIndexKeyColumn_ShouldRenameColumnAndIndexCorrectly() public void alterColumnType_AlterColumnTypeFromEachExistingDataTypeToText_ShouldAlterColumnTypesCorrectly() throws ExecutionException, IOException, TransactionException { - try (DistributedTransactionManager transactionManager = - transactionFactory.getTransactionManager()) { + // Use a separate table name to avoid hitting the stale cache, which can cause test failure when + // executing DMLs + String table = "table_for_alter_2"; + + try (DistributedTransactionManager manager = transactionFactory.getTransactionManager()) { // Arrange Map options = getCreationOptions(); TableMetadata.Builder currentTableMetadataBuilder = @@ -1205,11 +1210,11 @@ public void renameColumn_ForIndexKeyColumn_ShouldRenameColumnAndIndexCorrectly() .addColumn("c12", DataType.TIMESTAMPTZ); } TableMetadata currentTableMetadata = currentTableMetadataBuilder.build(); - admin.createTable(namespace1, TABLE4, currentTableMetadata, options); + admin.createTable(namespace1, table, currentTableMetadata, options); InsertBuilder.Buildable insert = Insert.newBuilder() .namespace(namespace1) - .table(TABLE4) + .table(table) .partitionKey(Key.ofInt("c1", 1)) .clusteringKey(Key.ofInt("c2", 2)) .intValue("c3", 1) @@ -1224,20 +1229,20 @@ public void renameColumn_ForIndexKeyColumn_ShouldRenameColumnAndIndexCorrectly() insert.timestampValue("c11", LocalDateTime.now(ZoneOffset.UTC)); insert.timestampTZValue("c12", Instant.now()); } - transactionalInsert(transactionManager, insert.build()); + transactionalInsert(manager, insert.build()); // Act - admin.alterColumnType(namespace1, TABLE4, "c3", DataType.TEXT); - admin.alterColumnType(namespace1, TABLE4, "c4", DataType.TEXT); - admin.alterColumnType(namespace1, TABLE4, "c5", DataType.TEXT); - admin.alterColumnType(namespace1, TABLE4, "c6", DataType.TEXT); - admin.alterColumnType(namespace1, TABLE4, "c7", DataType.TEXT); - admin.alterColumnType(namespace1, TABLE4, "c8", DataType.TEXT); - admin.alterColumnType(namespace1, TABLE4, "c9", DataType.TEXT); - admin.alterColumnType(namespace1, TABLE4, "c10", DataType.TEXT); + admin.alterColumnType(namespace1, table, "c3", DataType.TEXT); + admin.alterColumnType(namespace1, table, "c4", DataType.TEXT); + admin.alterColumnType(namespace1, table, "c5", DataType.TEXT); + admin.alterColumnType(namespace1, table, "c6", DataType.TEXT); + admin.alterColumnType(namespace1, table, "c7", DataType.TEXT); + admin.alterColumnType(namespace1, table, "c8", DataType.TEXT); + admin.alterColumnType(namespace1, table, "c9", DataType.TEXT); + admin.alterColumnType(namespace1, table, "c10", DataType.TEXT); if (isTimestampTypeSupported()) { - admin.alterColumnType(namespace1, TABLE4, "c11", DataType.TEXT); - admin.alterColumnType(namespace1, TABLE4, "c12", DataType.TEXT); + admin.alterColumnType(namespace1, table, "c11", DataType.TEXT); + admin.alterColumnType(namespace1, table, "c12", DataType.TEXT); } // Assert @@ -1261,17 +1266,20 @@ public void renameColumn_ForIndexKeyColumn_ShouldRenameColumnAndIndexCorrectly() .addColumn("c12", DataType.TEXT); } TableMetadata expectedTableMetadata = expectedTableMetadataBuilder.build(); - assertThat(admin.getTableMetadata(namespace1, TABLE4)).isEqualTo(expectedTableMetadata); + assertThat(admin.getTableMetadata(namespace1, table)).isEqualTo(expectedTableMetadata); } finally { - admin.dropTable(namespace1, TABLE4, true); + admin.dropTable(namespace1, table, true); } } @Test public void alterColumnType_WideningConversion_ShouldAlterColumnTypesCorrectly() throws ExecutionException, IOException, TransactionException { - try (DistributedTransactionManager transactionManager = - transactionFactory.getTransactionManager()) { + // Use a separate table name to avoid hitting the stale cache, which can cause test failure when + // executing DMLs + String table = "table_for_alter_3"; + + try (DistributedTransactionManager manager = transactionFactory.getTransactionManager()) { // Arrange Map options = getCreationOptions(); TableMetadata.Builder currentTableMetadataBuilder = @@ -1283,23 +1291,23 @@ public void alterColumnType_WideningConversion_ShouldAlterColumnTypesCorrectly() .addPartitionKey("c1") .addClusteringKey("c2", Scan.Ordering.Order.ASC); TableMetadata currentTableMetadata = currentTableMetadataBuilder.build(); - admin.createTable(namespace1, TABLE4, currentTableMetadata, options); + admin.createTable(namespace1, table, currentTableMetadata, options); int expectedColumn3Value = 1; float expectedColumn4Value = 4.0f; InsertBuilder.Buildable insert = Insert.newBuilder() .namespace(namespace1) - .table(TABLE4) + .table(table) .partitionKey(Key.ofInt("c1", 1)) .clusteringKey(Key.ofInt("c2", 2)) .intValue("c3", expectedColumn3Value) .floatValue("c4", expectedColumn4Value); - transactionalInsert(transactionManager, insert.build()); + transactionalInsert(manager, insert.build()); // Act - admin.alterColumnType(namespace1, TABLE4, "c3", DataType.BIGINT); - admin.alterColumnType(namespace1, TABLE4, "c4", DataType.DOUBLE); + admin.alterColumnType(namespace1, table, "c3", DataType.BIGINT); + admin.alterColumnType(namespace1, table, "c4", DataType.DOUBLE); // Wait for cache expiry Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); @@ -1314,20 +1322,20 @@ public void alterColumnType_WideningConversion_ShouldAlterColumnTypesCorrectly() .addPartitionKey("c1") .addClusteringKey("c2", Scan.Ordering.Order.ASC); TableMetadata expectedTableMetadata = expectedTableMetadataBuilder.build(); - assertThat(admin.getTableMetadata(namespace1, TABLE4)).isEqualTo(expectedTableMetadata); + assertThat(admin.getTableMetadata(namespace1, table)).isEqualTo(expectedTableMetadata); Scan scan = Scan.newBuilder() .namespace(namespace1) - .table(TABLE4) + .table(table) .partitionKey(Key.ofInt("c1", 1)) .build(); - List results = transactionalScan(transactionManager, scan); + List results = transactionalScan(manager, scan); assertThat(results).hasSize(1); Result result = results.get(0); assertThat(result.getBigInt("c3")).isEqualTo(expectedColumn3Value); assertThat(result.getDouble("c4")).isEqualTo(expectedColumn4Value); } finally { - admin.dropTable(namespace1, TABLE4, true); + admin.dropTable(namespace1, table, true); } } diff --git a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitAdminIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitAdminIntegrationTestBase.java index 133141e476..ee46d22a6a 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitAdminIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitAdminIntegrationTestBase.java @@ -1,5 +1,6 @@ package com.scalar.db.transaction.consensuscommit; +import com.google.common.util.concurrent.Uninterruptibles; import com.scalar.db.api.DistributedTransaction; import com.scalar.db.api.DistributedTransactionAdminIntegrationTestBase; import com.scalar.db.api.DistributedTransactionManager; @@ -9,6 +10,7 @@ import com.scalar.db.exception.transaction.TransactionException; import java.util.List; import java.util.Properties; +import java.util.concurrent.TimeUnit; public abstract class ConsensusCommitAdminIntegrationTestBase extends DistributedTransactionAdminIntegrationTestBase { @@ -34,6 +36,9 @@ protected final Properties getProperties(String testName) { @Override protected void transactionalInsert(DistributedTransactionManager manager, Insert insert) throws TransactionException { + // Wait for cache expiry + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + DistributedTransaction transaction = manager.start(); transaction.insert(insert); transaction.commit(); @@ -42,6 +47,9 @@ protected void transactionalInsert(DistributedTransactionManager manager, Insert @Override protected List transactionalScan(DistributedTransactionManager manager, Scan scan) throws TransactionException { + // Wait for cache expiry + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + DistributedTransaction transaction = manager.start(); List results = transaction.scan(scan); transaction.commit(); diff --git a/integration-test/src/main/java/com/scalar/db/transaction/singlecrudoperation/SingleCrudOperationTransactionAdminIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/singlecrudoperation/SingleCrudOperationTransactionAdminIntegrationTestBase.java index 80e3c2f66b..9ef142c4e4 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/singlecrudoperation/SingleCrudOperationTransactionAdminIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/singlecrudoperation/SingleCrudOperationTransactionAdminIntegrationTestBase.java @@ -1,5 +1,6 @@ package com.scalar.db.transaction.singlecrudoperation; +import com.google.common.util.concurrent.Uninterruptibles; import com.scalar.db.api.DistributedTransactionAdminIntegrationTestBase; import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.Insert; @@ -11,6 +12,7 @@ import com.scalar.db.util.AdminTestUtils; import java.util.List; import java.util.Properties; +import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -101,12 +103,18 @@ protected AdminTestUtils getAdminTestUtils(String testName) { @Override protected void transactionalInsert(DistributedTransactionManager manager, Insert insert) throws TransactionException { + // Wait for cache expiry + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + manager.insert(insert); } @Override protected List transactionalScan(DistributedTransactionManager manager, Scan scan) throws TransactionException { + // Wait for cache expiry + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + return manager.scan(scan); } } From f3f57268beb081332fc2c3bb86ee5eb1046b1531 Mon Sep 17 00:00:00 2001 From: Kodai Doki Date: Wed, 15 Oct 2025 11:17:37 +0900 Subject: [PATCH 2/2] Fix to use a separate manager instance for DMLs --- ...tAdminIntegrationTestWithJdbcDatabase.java | 17 ++++----- ...nAdminIntegrationTestWithJdbcDatabase.java | 11 +++--- .../JdbcTransactionAdminIntegrationTest.java | 37 ++++++++++--------- ...edTransactionAdminIntegrationTestBase.java | 36 ++++++++---------- ...nsensusCommitAdminIntegrationTestBase.java | 24 ++++++------ ...onTransactionAdminIntegrationTestBase.java | 14 ++++--- 6 files changed, 68 insertions(+), 71 deletions(-) diff --git a/core/src/integration-test/java/com/scalar/db/storage/jdbc/ConsensusCommitAdminIntegrationTestWithJdbcDatabase.java b/core/src/integration-test/java/com/scalar/db/storage/jdbc/ConsensusCommitAdminIntegrationTestWithJdbcDatabase.java index 0f72aef95c..37c0ef9de2 100644 --- a/core/src/integration-test/java/com/scalar/db/storage/jdbc/ConsensusCommitAdminIntegrationTestWithJdbcDatabase.java +++ b/core/src/integration-test/java/com/scalar/db/storage/jdbc/ConsensusCommitAdminIntegrationTestWithJdbcDatabase.java @@ -5,7 +5,6 @@ import static org.assertj.core.api.Assertions.catchThrowable; import com.google.common.util.concurrent.Uninterruptibles; -import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.Insert; import com.scalar.db.api.InsertBuilder; import com.scalar.db.api.Result; @@ -147,8 +146,7 @@ public void renameColumn_Db2_ForPrimaryOrIndexKeyColumn_ShouldThrowUnsupportedOp public void alterColumnType_Oracle_AlterColumnTypeFromEachExistingDataTypeToText_ShouldThrowUnsupportedOperationException() throws ExecutionException, TransactionException { - try (DistributedTransactionManager transactionManager = - transactionFactory.getTransactionManager()) { + try { // Arrange Map options = getCreationOptions(); TableMetadata.Builder currentTableMetadataBuilder = @@ -190,7 +188,7 @@ public void renameColumn_Db2_ForPrimaryOrIndexKeyColumn_ShouldThrowUnsupportedOp insert.timestampValue("c11", LocalDateTime.now(ZoneOffset.UTC)); insert.timestampTZValue("c12", Instant.now()); } - transactionalInsert(transactionManager, insert.build()); + transactionalInsert(insert.build()); // Act Assert assertThatCode(() -> admin.alterColumnType(namespace1, TABLE4, "c3", DataType.TEXT)) @@ -225,8 +223,7 @@ public void renameColumn_Db2_ForPrimaryOrIndexKeyColumn_ShouldThrowUnsupportedOp public void alterColumnType_Db2_AlterColumnTypeFromEachExistingDataTypeToText_ShouldAlterColumnTypesCorrectlyIfSupported() throws ExecutionException, TransactionException { - try (DistributedTransactionManager transactionManager = - transactionFactory.getTransactionManager()) { + try { // Arrange Map options = getCreationOptions(); TableMetadata.Builder currentTableMetadataBuilder = @@ -268,7 +265,7 @@ public void renameColumn_Db2_ForPrimaryOrIndexKeyColumn_ShouldThrowUnsupportedOp insert.timestampValue("c11", LocalDateTime.now(ZoneOffset.UTC)); insert.timestampTZValue("c12", Instant.now()); } - transactionalInsert(transactionManager, insert.build()); + transactionalInsert(insert.build()); // Act Assert assertThatCode(() -> admin.alterColumnType(namespace1, TABLE4, "c3", DataType.TEXT)) @@ -332,7 +329,7 @@ public void alterColumnType_WideningConversion_ShouldAlterColumnTypesCorrectly() @EnabledIf("isOracle") public void alterColumnType_Oracle_WideningConversion_ShouldAlterColumnTypesCorrectly() throws ExecutionException, TransactionException { - try (DistributedTransactionManager manager = transactionFactory.getTransactionManager()) { + try { // Arrange Map options = getCreationOptions(); TableMetadata.Builder currentTableMetadataBuilder = @@ -356,7 +353,7 @@ public void alterColumnType_Oracle_WideningConversion_ShouldAlterColumnTypesCorr .clusteringKey(Key.ofInt("c2", 2)) .intValue("c3", expectedColumn3Value) .floatValue("c4", expectedColumn4Value); - transactionalInsert(manager, insert.build()); + transactionalInsert(insert.build()); // Act admin.alterColumnType(namespace1, TABLE4, "c3", DataType.BIGINT); @@ -385,7 +382,7 @@ public void alterColumnType_Oracle_WideningConversion_ShouldAlterColumnTypesCorr .table(TABLE4) .partitionKey(Key.ofInt("c1", 1)) .build(); - List results = transactionalScan(manager, scan); + List results = transactionalScan(scan); assertThat(results).hasSize(1); Result result = results.get(0); assertThat(result.getBigInt("c3")).isEqualTo(expectedColumn3Value); diff --git a/core/src/integration-test/java/com/scalar/db/storage/jdbc/SingleCrudOperationTransactionAdminIntegrationTestWithJdbcDatabase.java b/core/src/integration-test/java/com/scalar/db/storage/jdbc/SingleCrudOperationTransactionAdminIntegrationTestWithJdbcDatabase.java index f5669f0953..859fe2a57b 100644 --- a/core/src/integration-test/java/com/scalar/db/storage/jdbc/SingleCrudOperationTransactionAdminIntegrationTestWithJdbcDatabase.java +++ b/core/src/integration-test/java/com/scalar/db/storage/jdbc/SingleCrudOperationTransactionAdminIntegrationTestWithJdbcDatabase.java @@ -147,8 +147,7 @@ public void renameColumn_Db2_ForPrimaryOrIndexKeyColumn_ShouldThrowUnsupportedOp public void alterColumnType_Oracle_AlterColumnTypeFromEachExistingDataTypeToText_ShouldThrowUnsupportedOperationException() throws ExecutionException, TransactionException { - try (DistributedTransactionManager transactionManager = - transactionFactory.getTransactionManager()) { + try { // Arrange Map options = getCreationOptions(); TableMetadata.Builder currentTableMetadataBuilder = @@ -190,7 +189,7 @@ public void renameColumn_Db2_ForPrimaryOrIndexKeyColumn_ShouldThrowUnsupportedOp insert.timestampValue("c11", LocalDateTime.now(ZoneOffset.UTC)); insert.timestampTZValue("c12", Instant.now()); } - transactionalInsert(transactionManager, insert.build()); + transactionalInsert(insert.build()); // Act Assert assertThatCode(() -> admin.alterColumnType(namespace1, TABLE4, "c3", DataType.TEXT)) @@ -332,7 +331,7 @@ public void alterColumnType_WideningConversion_ShouldAlterColumnTypesCorrectly() @EnabledIf("isOracle") public void alterColumnType_Oracle_WideningConversion_ShouldAlterColumnTypesCorrectly() throws ExecutionException, TransactionException { - try (DistributedTransactionManager manager = transactionFactory.getTransactionManager()) { + try { // Arrange Map options = getCreationOptions(); TableMetadata.Builder currentTableMetadataBuilder = @@ -356,7 +355,7 @@ public void alterColumnType_Oracle_WideningConversion_ShouldAlterColumnTypesCorr .clusteringKey(Key.ofInt("c2", 2)) .intValue("c3", expectedColumn3Value) .floatValue("c4", expectedColumn4Value); - transactionalInsert(manager, insert.build()); + transactionalInsert(insert.build()); // Act admin.alterColumnType(namespace1, TABLE4, "c3", DataType.BIGINT); @@ -385,7 +384,7 @@ public void alterColumnType_Oracle_WideningConversion_ShouldAlterColumnTypesCorr .table(TABLE4) .partitionKey(Key.ofInt("c1", 1)) .build(); - List results = transactionalScan(manager, scan); + List results = transactionalScan(scan); assertThat(results).hasSize(1); Result result = results.get(0); assertThat(result.getBigInt("c3")).isEqualTo(expectedColumn3Value); diff --git a/core/src/integration-test/java/com/scalar/db/transaction/jdbc/JdbcTransactionAdminIntegrationTest.java b/core/src/integration-test/java/com/scalar/db/transaction/jdbc/JdbcTransactionAdminIntegrationTest.java index 299c520110..7ed7fe9e1f 100644 --- a/core/src/integration-test/java/com/scalar/db/transaction/jdbc/JdbcTransactionAdminIntegrationTest.java +++ b/core/src/integration-test/java/com/scalar/db/transaction/jdbc/JdbcTransactionAdminIntegrationTest.java @@ -219,8 +219,7 @@ public void renameColumn_Db2_ForPrimaryOrIndexKeyColumn_ShouldThrowUnsupportedOp public void alterColumnType_Oracle_AlterColumnTypeFromEachExistingDataTypeToText_ShouldThrowUnsupportedOperationException() throws ExecutionException, TransactionException { - try (DistributedTransactionManager transactionManager = - transactionFactory.getTransactionManager()) { + try { // Arrange Map options = getCreationOptions(); TableMetadata.Builder currentTableMetadataBuilder = @@ -262,7 +261,7 @@ public void renameColumn_Db2_ForPrimaryOrIndexKeyColumn_ShouldThrowUnsupportedOp insert.timestampValue("c11", LocalDateTime.now(ZoneOffset.UTC)); insert.timestampTZValue("c12", Instant.now()); } - transactionalInsert(transactionManager, insert.build()); + transactionalInsert(insert.build()); // Act Assert assertThatCode(() -> admin.alterColumnType(namespace1, TABLE4, "c3", DataType.TEXT)) @@ -340,7 +339,7 @@ public void renameColumn_Db2_ForPrimaryOrIndexKeyColumn_ShouldThrowUnsupportedOp insert.timestampValue("c11", LocalDateTime.now(ZoneOffset.UTC)); insert.timestampTZValue("c12", Instant.now()); } - transactionalInsert(transactionManager, insert.build()); + transactionalInsert(insert.build()); // Act Assert assertThatCode(() -> admin.alterColumnType(namespace1, TABLE4, "c3", DataType.TEXT)) @@ -404,7 +403,7 @@ public void alterColumnType_WideningConversion_ShouldAlterColumnTypesCorrectly() @EnabledIf("isOracle") public void alterColumnType_Oracle_WideningConversion_ShouldAlterColumnTypesCorrectly() throws ExecutionException, TransactionException { - try (DistributedTransactionManager manager = transactionFactory.getTransactionManager()) { + try { // Arrange Map options = getCreationOptions(); TableMetadata.Builder currentTableMetadataBuilder = @@ -428,7 +427,7 @@ public void alterColumnType_Oracle_WideningConversion_ShouldAlterColumnTypesCorr .clusteringKey(Key.ofInt("c2", 2)) .intValue("c3", expectedColumn3Value) .floatValue("c4", expectedColumn4Value); - transactionalInsert(manager, insert.build()); + transactionalInsert(insert.build()); // Act admin.alterColumnType(namespace1, TABLE4, "c3", DataType.BIGINT); @@ -457,7 +456,7 @@ public void alterColumnType_Oracle_WideningConversion_ShouldAlterColumnTypesCorr .table(TABLE4) .partitionKey(Key.ofInt("c1", 1)) .build(); - List results = transactionalScan(manager, scan); + List results = transactionalScan(scan); assertThat(results).hasSize(1); Result result = results.get(0); assertThat(result.getBigInt("c3")).isEqualTo(expectedColumn3Value); @@ -497,25 +496,27 @@ protected boolean isIndexOnBlobColumnSupported() { } @Override - protected void transactionalInsert(DistributedTransactionManager manager, Insert insert) - throws TransactionException { + protected void transactionalInsert(Insert insert) throws TransactionException { // Wait for cache expiry Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); - DistributedTransaction transaction = manager.start(); - transaction.insert(insert); - transaction.commit(); + try (DistributedTransactionManager manager = transactionFactory.getTransactionManager()) { + DistributedTransaction transaction = manager.start(); + transaction.insert(insert); + transaction.commit(); + } } @Override - protected List transactionalScan(DistributedTransactionManager manager, Scan scan) - throws TransactionException { + protected List transactionalScan(Scan scan) throws TransactionException { // Wait for cache expiry Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); - DistributedTransaction transaction = manager.start(); - List results = transaction.scan(scan); - transaction.commit(); - return results; + try (DistributedTransactionManager manager = transactionFactory.getTransactionManager()) { + DistributedTransaction transaction = manager.start(); + List results = transaction.scan(scan); + transaction.commit(); + return results; + } } } diff --git a/integration-test/src/main/java/com/scalar/db/api/DistributedTransactionAdminIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/api/DistributedTransactionAdminIntegrationTestBase.java index 815810e417..a255c2aa52 100644 --- a/integration-test/src/main/java/com/scalar/db/api/DistributedTransactionAdminIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/api/DistributedTransactionAdminIntegrationTestBase.java @@ -430,14 +430,13 @@ public void truncateTable_ShouldTruncateProperly() // executing DMLs String table = "table_for_truncate"; - try (DistributedTransactionManager manager = transactionFactory.getTransactionManager()) { + try { // Arrange Map options = getCreationOptions(); admin.createTable(namespace1, table, TABLE_METADATA, true, options); Key partitionKey = Key.of(COL_NAME2, "aaa", COL_NAME1, 1); Key clusteringKey = Key.of(COL_NAME4, 2, COL_NAME3, "bbb"); transactionalInsert( - manager, Insert.newBuilder() .namespace(namespace1) .table(table) @@ -458,7 +457,6 @@ public void truncateTable_ShouldTruncateProperly() // Assert List results = transactionalScan( - manager, Scan.newBuilder() .namespace(namespace1) .table(table) @@ -518,7 +516,7 @@ public void createIndex_ForAllDataTypesWithExistingData_ShouldCreateIndexesCorre // executing DMLs String table = "table_for_create_index"; - try (DistributedTransactionManager manager = transactionFactory.getTransactionManager()) { + try { // Arrange Map options = getCreationOptions(); TableMetadata.Builder metadataBuilder = @@ -566,7 +564,7 @@ public void createIndex_ForAllDataTypesWithExistingData_ShouldCreateIndexesCorre COL_NAME13, LocalDateTime.of(LocalDate.of(2020, 6, 2), LocalTime.of(12, 2, 6, 123_000_000))); } - transactionalInsert(manager, insert.build()); + transactionalInsert(insert.build()); // Act admin.createIndex(namespace1, table, COL_NAME2, options); @@ -633,7 +631,6 @@ public void createIndex_ForAllDataTypesWithExistingData_ShouldCreateIndexesCorre indexCount += 1; } assertThat(actualSecondaryIndexNames).hasSize(indexCount); - } finally { admin.dropTable(namespace1, table, true); } @@ -712,9 +709,9 @@ public void dropIndex_ForAllDataTypesWithExistingData_ShouldDropIndexCorrectly() throws Exception { // Use a separate table name to avoid hitting the stale cache, which can cause test failure when // executing DMLs - String table = "table_for_alter_1"; + String table = "table_for_drop_index"; - try (DistributedTransactionManager manager = transactionFactory.getTransactionManager()) { + try { // Arrange Map options = getCreationOptions(); TableMetadata.Builder metadataBuilder = @@ -753,6 +750,7 @@ public void dropIndex_ForAllDataTypesWithExistingData_ShouldDropIndexCorrectly() metadataBuilder.addSecondaryIndex(COL_NAME13); } admin.createTable(namespace1, table, metadataBuilder.build(), options); + InsertBuilder.Buildable insert = Insert.newBuilder() .namespace(namespace1) @@ -776,7 +774,7 @@ public void dropIndex_ForAllDataTypesWithExistingData_ShouldDropIndexCorrectly() COL_NAME13, LocalDateTime.of(LocalDate.of(2020, 6, 2), LocalTime.of(12, 2, 6, 123_000_000))); } - transactionalInsert(manager, insert.build()); + transactionalInsert(insert.build()); // Act admin.dropIndex(namespace1, table, COL_NAME2); @@ -1185,9 +1183,9 @@ public void renameColumn_ForIndexKeyColumn_ShouldRenameColumnAndIndexCorrectly() throws ExecutionException, IOException, TransactionException { // Use a separate table name to avoid hitting the stale cache, which can cause test failure when // executing DMLs - String table = "table_for_alter_2"; + String table = "table_for_alter_1"; - try (DistributedTransactionManager manager = transactionFactory.getTransactionManager()) { + try { // Arrange Map options = getCreationOptions(); TableMetadata.Builder currentTableMetadataBuilder = @@ -1229,7 +1227,7 @@ public void renameColumn_ForIndexKeyColumn_ShouldRenameColumnAndIndexCorrectly() insert.timestampValue("c11", LocalDateTime.now(ZoneOffset.UTC)); insert.timestampTZValue("c12", Instant.now()); } - transactionalInsert(manager, insert.build()); + transactionalInsert(insert.build()); // Act admin.alterColumnType(namespace1, table, "c3", DataType.TEXT); @@ -1277,9 +1275,9 @@ public void alterColumnType_WideningConversion_ShouldAlterColumnTypesCorrectly() throws ExecutionException, IOException, TransactionException { // Use a separate table name to avoid hitting the stale cache, which can cause test failure when // executing DMLs - String table = "table_for_alter_3"; + String table = "table_for_alter_2"; - try (DistributedTransactionManager manager = transactionFactory.getTransactionManager()) { + try { // Arrange Map options = getCreationOptions(); TableMetadata.Builder currentTableMetadataBuilder = @@ -1303,7 +1301,7 @@ public void alterColumnType_WideningConversion_ShouldAlterColumnTypesCorrectly() .clusteringKey(Key.ofInt("c2", 2)) .intValue("c3", expectedColumn3Value) .floatValue("c4", expectedColumn4Value); - transactionalInsert(manager, insert.build()); + transactionalInsert(insert.build()); // Act admin.alterColumnType(namespace1, table, "c3", DataType.BIGINT); @@ -1329,7 +1327,7 @@ public void alterColumnType_WideningConversion_ShouldAlterColumnTypesCorrectly() .table(table) .partitionKey(Key.ofInt("c1", 1)) .build(); - List results = transactionalScan(manager, scan); + List results = transactionalScan(scan); assertThat(results).hasSize(1); Result result = results.get(0); assertThat(result.getBigInt("c3")).isEqualTo(expectedColumn3Value); @@ -1592,9 +1590,7 @@ protected boolean isCreateIndexOnTextColumnEnabled() { return true; } - protected abstract void transactionalInsert(DistributedTransactionManager manager, Insert insert) - throws TransactionException; + protected abstract void transactionalInsert(Insert insert) throws TransactionException; - protected abstract List transactionalScan( - DistributedTransactionManager manager, Scan scan) throws TransactionException; + protected abstract List transactionalScan(Scan scan) throws TransactionException; } diff --git a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitAdminIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitAdminIntegrationTestBase.java index ee46d22a6a..f5c9d3c32b 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitAdminIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitAdminIntegrationTestBase.java @@ -34,25 +34,27 @@ protected final Properties getProperties(String testName) { protected abstract Properties getProps(String testName); @Override - protected void transactionalInsert(DistributedTransactionManager manager, Insert insert) - throws TransactionException { + protected void transactionalInsert(Insert insert) throws TransactionException { // Wait for cache expiry Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); - DistributedTransaction transaction = manager.start(); - transaction.insert(insert); - transaction.commit(); + try (DistributedTransactionManager manager = transactionFactory.getTransactionManager()) { + DistributedTransaction transaction = manager.start(); + transaction.insert(insert); + transaction.commit(); + } } @Override - protected List transactionalScan(DistributedTransactionManager manager, Scan scan) - throws TransactionException { + protected List transactionalScan(Scan scan) throws TransactionException { // Wait for cache expiry Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); - DistributedTransaction transaction = manager.start(); - List results = transaction.scan(scan); - transaction.commit(); - return results; + try (DistributedTransactionManager manager = transactionFactory.getTransactionManager()) { + DistributedTransaction transaction = manager.start(); + List results = transaction.scan(scan); + transaction.commit(); + return results; + } } } diff --git a/integration-test/src/main/java/com/scalar/db/transaction/singlecrudoperation/SingleCrudOperationTransactionAdminIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/singlecrudoperation/SingleCrudOperationTransactionAdminIntegrationTestBase.java index 9ef142c4e4..0493a7d7e1 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/singlecrudoperation/SingleCrudOperationTransactionAdminIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/singlecrudoperation/SingleCrudOperationTransactionAdminIntegrationTestBase.java @@ -101,20 +101,22 @@ protected AdminTestUtils getAdminTestUtils(String testName) { } @Override - protected void transactionalInsert(DistributedTransactionManager manager, Insert insert) - throws TransactionException { + protected void transactionalInsert(Insert insert) throws TransactionException { // Wait for cache expiry Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); - manager.insert(insert); + try (DistributedTransactionManager manager = transactionFactory.getTransactionManager()) { + manager.insert(insert); + } } @Override - protected List transactionalScan(DistributedTransactionManager manager, Scan scan) - throws TransactionException { + protected List transactionalScan(Scan scan) throws TransactionException { // Wait for cache expiry Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); - return manager.scan(scan); + try (DistributedTransactionManager manager = transactionFactory.getTransactionManager()) { + return manager.scan(scan); + } } }