From 2324b551f81e2af6e687688f1cae937fe103c794 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Mon, 26 May 2025 16:40:59 +0530 Subject: [PATCH 1/5] data chunk id counter value reset --- .../core/dataimport/processor/CsvImportProcessor.java | 4 ++++ .../core/dataimport/processor/JsonImportProcessor.java | 4 ++++ .../core/dataimport/processor/JsonLinesImportProcessor.java | 4 ++++ 3 files changed, 12 insertions(+) diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessor.java index 0c68d5e566..84e0d084d4 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessor.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessor.java @@ -66,6 +66,10 @@ public CsvImportProcessor(ImportProcessorParams params) { @Override public ConcurrentHashMap process( int dataChunkSize, int transactionBatchSize, BufferedReader reader) { + // Reset the counter to 0 before starting a new import process. + // Since the JVM is not restarted between API calls (as in a web application’s API server), + // failing to reset the counter would cause the next import to continue from the previous data chunk ID. + dataChunkIdCounter.set(0); ExecutorService dataChunkExecutor = Executors.newSingleThreadExecutor(); BlockingQueue dataChunkQueue = new LinkedBlockingQueue<>(params.getImportOptions().getDataChunkQueueSize()); diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessor.java index 733a5afa96..51f9799fd8 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessor.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessor.java @@ -65,6 +65,10 @@ public JsonImportProcessor(ImportProcessorParams params) { @Override public ConcurrentHashMap process( int dataChunkSize, int transactionBatchSize, BufferedReader reader) { + // Reset the counter to 0 before starting a new import process. + // Since the JVM is not restarted between API calls (as in a web application’s API server), + // failing to reset the counter would cause the next import to continue from the previous data chunk ID. + dataChunkIdCounter.set(0); ExecutorService dataChunkExecutor = Executors.newSingleThreadExecutor(); BlockingQueue dataChunkQueue = new LinkedBlockingQueue<>(params.getImportOptions().getDataChunkQueueSize()); diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessor.java index a121a106a5..d247d2e79d 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessor.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessor.java @@ -64,6 +64,10 @@ public JsonLinesImportProcessor(ImportProcessorParams params) { @Override public ConcurrentHashMap process( int dataChunkSize, int transactionBatchSize, BufferedReader reader) { + // Reset the counter to 0 before starting a new import process. + // Since the JVM is not restarted between API calls (as in a web application’s API server), + // failing to reset the counter would cause the next import to continue from the previous data chunk ID. + dataChunkIdCounter.set(0); ExecutorService dataChunkExecutor = Executors.newSingleThreadExecutor(); BlockingQueue dataChunkQueue = new LinkedBlockingQueue<>(params.getImportOptions().getDataChunkQueueSize()); From 530ba82ffa796b34304e0922cfe3fc03b0bfcabb Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Mon, 26 May 2025 17:00:35 +0530 Subject: [PATCH 2/5] spotless applied --- .../core/dataimport/processor/CsvImportProcessor.java | 3 ++- .../core/dataimport/processor/JsonImportProcessor.java | 3 ++- .../core/dataimport/processor/JsonLinesImportProcessor.java | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessor.java index 84e0d084d4..2909cfbb67 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessor.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessor.java @@ -68,7 +68,8 @@ public ConcurrentHashMap process( int dataChunkSize, int transactionBatchSize, BufferedReader reader) { // Reset the counter to 0 before starting a new import process. // Since the JVM is not restarted between API calls (as in a web application’s API server), - // failing to reset the counter would cause the next import to continue from the previous data chunk ID. + // failing to reset the counter would cause the next import to continue from the previous data + // chunk ID. dataChunkIdCounter.set(0); ExecutorService dataChunkExecutor = Executors.newSingleThreadExecutor(); BlockingQueue dataChunkQueue = diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessor.java index 51f9799fd8..ad6a69082d 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessor.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessor.java @@ -67,7 +67,8 @@ public ConcurrentHashMap process( int dataChunkSize, int transactionBatchSize, BufferedReader reader) { // Reset the counter to 0 before starting a new import process. // Since the JVM is not restarted between API calls (as in a web application’s API server), - // failing to reset the counter would cause the next import to continue from the previous data chunk ID. + // failing to reset the counter would cause the next import to continue from the previous data + // chunk ID. dataChunkIdCounter.set(0); ExecutorService dataChunkExecutor = Executors.newSingleThreadExecutor(); BlockingQueue dataChunkQueue = diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessor.java index d247d2e79d..e1ce9b8f50 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessor.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessor.java @@ -66,7 +66,8 @@ public ConcurrentHashMap process( int dataChunkSize, int transactionBatchSize, BufferedReader reader) { // Reset the counter to 0 before starting a new import process. // Since the JVM is not restarted between API calls (as in a web application’s API server), - // failing to reset the counter would cause the next import to continue from the previous data chunk ID. + // failing to reset the counter would cause the next import to continue from the previous data + // chunk ID. dataChunkIdCounter.set(0); ExecutorService dataChunkExecutor = Executors.newSingleThreadExecutor(); BlockingQueue dataChunkQueue = From fd5cece525d212cc8341c418d365e81b3b1cbd5b Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Wed, 28 May 2025 14:34:01 +0530 Subject: [PATCH 3/5] Unit tests updated --- .../processor/CsvImportProcessorTest.java | 44 +++++++++++++++++++ .../processor/JsonImportProcessorTest.java | 42 ++++++++++++++++++ .../JsonLinesImportProcessorTest.java | 42 ++++++++++++++++++ 3 files changed, 128 insertions(+) diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java index ff57e42bac..45f47894a5 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java @@ -115,4 +115,48 @@ void test_importProcessWithTransaction() { assert statusList != null; Assertions.assertEquals(1, statusList.size()); } + + @Test + void test_importProcessWithStorage_runTwice_CheckDataChunkId() { + params = + ImportProcessorParams.builder() + .scalarDbMode(ScalarDbMode.STORAGE) + .importOptions(importOptions) + .dao(dao) + .distributedStorage(distributedStorage) + .distributedTransactionManager(distributedTransactionManager) + .tableColumnDataTypes(tableColumnDataTypes) + .tableMetadataByTableName(tableMetadataByTableName) + .build(); + csvImportProcessor = new CsvImportProcessor(params); + Map statusList = + csvImportProcessor.process(5, 1, UnitTestUtils.getCsvReader()); + Assertions.assertEquals(0, statusList.get(0).getDataChunkId()); + + Map statusList2 = + csvImportProcessor.process(5, 1, UnitTestUtils.getCsvReader()); + Assertions.assertEquals(0, statusList.get(0).getDataChunkId()); + } + + @Test + void test_importProcessWithTransaction_runTwice_CheckDataChunkId() { + params = + ImportProcessorParams.builder() + .scalarDbMode(ScalarDbMode.TRANSACTION) + .importOptions(importOptions) + .dao(dao) + .distributedStorage(distributedStorage) + .distributedTransactionManager(distributedTransactionManager) + .tableColumnDataTypes(tableColumnDataTypes) + .tableMetadataByTableName(tableMetadataByTableName) + .build(); + csvImportProcessor = new CsvImportProcessor(params); + Map statusList = + csvImportProcessor.process(5, 1, UnitTestUtils.getCsvReader()); + Assertions.assertEquals(0, statusList.get(0).getDataChunkId()); + csvImportProcessor = new CsvImportProcessor(params); + Map statusList2 = + csvImportProcessor.process(5, 1, UnitTestUtils.getCsvReader()); + Assertions.assertEquals(0, statusList2.get(0).getDataChunkId()); + } } diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java index 44c57874c2..a5b664246d 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java @@ -115,4 +115,46 @@ void test_importProcessWithTransaction() { assert statusList != null; Assertions.assertEquals(1, statusList.size()); } + + @Test + void test_importProcessWithStorage_runTwice_CheckDataChunkId() { + params = + ImportProcessorParams.builder() + .scalarDbMode(ScalarDbMode.STORAGE) + .importOptions(importOptions) + .dao(dao) + .distributedStorage(distributedStorage) + .distributedTransactionManager(distributedTransactionManager) + .tableColumnDataTypes(tableColumnDataTypes) + .tableMetadataByTableName(tableMetadataByTableName) + .build(); + jsonImportProcessor = new JsonImportProcessor(params); + Map statusList = + jsonImportProcessor.process(5, 1, UnitTestUtils.getJsonReader()); + Assertions.assertEquals(0, statusList.get(0).getDataChunkId()); + Map statusList2 = + jsonImportProcessor.process(5, 1, UnitTestUtils.getJsonReader()); + Assertions.assertEquals(0, statusList2.get(0).getDataChunkId()); + } + + @Test + void test_importProcessWithTransaction_runTwice_CheckDataChunkId() { + params = + ImportProcessorParams.builder() + .scalarDbMode(ScalarDbMode.TRANSACTION) + .importOptions(importOptions) + .dao(dao) + .distributedStorage(distributedStorage) + .distributedTransactionManager(distributedTransactionManager) + .tableColumnDataTypes(tableColumnDataTypes) + .tableMetadataByTableName(tableMetadataByTableName) + .build(); + jsonImportProcessor = new JsonImportProcessor(params); + Map statusList = + jsonImportProcessor.process(5, 1, UnitTestUtils.getJsonReader()); + Assertions.assertEquals(0, statusList.get(0).getDataChunkId()); + Map statusList2 = + jsonImportProcessor.process(5, 1, UnitTestUtils.getJsonReader()); + Assertions.assertEquals(0, statusList2.get(0).getDataChunkId()); + } } diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java index 4c0e755aac..417f62dc3a 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java @@ -115,4 +115,46 @@ void test_importProcessWithTransaction() { assert statusList != null; Assertions.assertEquals(1, statusList.size()); } + + @Test + void test_importProcessWithStorage_runTwice_CheckDataChunkId() { + params = + ImportProcessorParams.builder() + .scalarDbMode(ScalarDbMode.STORAGE) + .importOptions(importOptions) + .dao(dao) + .distributedStorage(distributedStorage) + .distributedTransactionManager(distributedTransactionManager) + .tableColumnDataTypes(tableColumnDataTypes) + .tableMetadataByTableName(tableMetadataByTableName) + .build(); + jsonLinesImportProcessor = new JsonLinesImportProcessor(params); + Map statusList = + jsonLinesImportProcessor.process(5, 1, UnitTestUtils.getJsonLinesReader()); + Assertions.assertEquals(0, statusList.get(0).getDataChunkId()); + Map statusList2 = + jsonLinesImportProcessor.process(5, 1, UnitTestUtils.getJsonLinesReader()); + Assertions.assertEquals(0, statusList2.get(0).getDataChunkId()); + } + + @Test + void test_importProcessWithTransaction_runTwice_CheckDataChunkId() { + params = + ImportProcessorParams.builder() + .scalarDbMode(ScalarDbMode.TRANSACTION) + .importOptions(importOptions) + .dao(dao) + .distributedStorage(distributedStorage) + .distributedTransactionManager(distributedTransactionManager) + .tableColumnDataTypes(tableColumnDataTypes) + .tableMetadataByTableName(tableMetadataByTableName) + .build(); + jsonLinesImportProcessor = new JsonLinesImportProcessor(params); + Map statusList = + jsonLinesImportProcessor.process(5, 1, UnitTestUtils.getJsonLinesReader()); + Assertions.assertEquals(0, statusList.get(0).getDataChunkId()); + Map statusList2 = + jsonLinesImportProcessor.process(5, 1, UnitTestUtils.getJsonLinesReader()); + Assertions.assertEquals(0, statusList2.get(0).getDataChunkId()); + } } From baa5a26bae7c3e47ecd6d260ac8bf1e5a1d40e82 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Wed, 28 May 2025 15:31:20 +0530 Subject: [PATCH 4/5] Fixed spotbugs issue --- .../core/dataimport/processor/CsvImportProcessorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java index 45f47894a5..a9c5f4916e 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java @@ -135,7 +135,7 @@ void test_importProcessWithStorage_runTwice_CheckDataChunkId() { Map statusList2 = csvImportProcessor.process(5, 1, UnitTestUtils.getCsvReader()); - Assertions.assertEquals(0, statusList.get(0).getDataChunkId()); + Assertions.assertEquals(0, statusList2.get(0).getDataChunkId()); } @Test From 9ad1583714514c78bf79755ddfd811f86451bbb3 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Thu, 29 May 2025 11:28:09 +0530 Subject: [PATCH 5/5] Changes --- .../processor/CsvImportProcessor.java | 7 +-- .../processor/JsonImportProcessor.java | 7 +-- .../processor/JsonLinesImportProcessor.java | 7 +-- .../processor/CsvImportProcessorTest.java | 44 ------------------- .../processor/JsonImportProcessorTest.java | 42 ------------------ .../JsonLinesImportProcessorTest.java | 42 ------------------ 6 files changed, 3 insertions(+), 146 deletions(-) diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessor.java index 2909cfbb67..f16bb8137b 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessor.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessor.java @@ -39,7 +39,7 @@ */ public class CsvImportProcessor extends ImportProcessor { private static final DataLoaderObjectMapper OBJECT_MAPPER = new DataLoaderObjectMapper(); - private static final AtomicInteger dataChunkIdCounter = new AtomicInteger(0); + private final AtomicInteger dataChunkIdCounter = new AtomicInteger(0); /** * Creates a new CsvImportProcessor with the specified parameters. @@ -66,11 +66,6 @@ public CsvImportProcessor(ImportProcessorParams params) { @Override public ConcurrentHashMap process( int dataChunkSize, int transactionBatchSize, BufferedReader reader) { - // Reset the counter to 0 before starting a new import process. - // Since the JVM is not restarted between API calls (as in a web application’s API server), - // failing to reset the counter would cause the next import to continue from the previous data - // chunk ID. - dataChunkIdCounter.set(0); ExecutorService dataChunkExecutor = Executors.newSingleThreadExecutor(); BlockingQueue dataChunkQueue = new LinkedBlockingQueue<>(params.getImportOptions().getDataChunkQueueSize()); diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessor.java index ad6a69082d..deecf6cce5 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessor.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessor.java @@ -43,7 +43,7 @@ public class JsonImportProcessor extends ImportProcessor { private static final DataLoaderObjectMapper OBJECT_MAPPER = new DataLoaderObjectMapper(); - private static final AtomicInteger dataChunkIdCounter = new AtomicInteger(0); + private final AtomicInteger dataChunkIdCounter = new AtomicInteger(0); public JsonImportProcessor(ImportProcessorParams params) { super(params); @@ -65,11 +65,6 @@ public JsonImportProcessor(ImportProcessorParams params) { @Override public ConcurrentHashMap process( int dataChunkSize, int transactionBatchSize, BufferedReader reader) { - // Reset the counter to 0 before starting a new import process. - // Since the JVM is not restarted between API calls (as in a web application’s API server), - // failing to reset the counter would cause the next import to continue from the previous data - // chunk ID. - dataChunkIdCounter.set(0); ExecutorService dataChunkExecutor = Executors.newSingleThreadExecutor(); BlockingQueue dataChunkQueue = new LinkedBlockingQueue<>(params.getImportOptions().getDataChunkQueueSize()); diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessor.java index e1ce9b8f50..3b85e381d3 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessor.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessor.java @@ -37,7 +37,7 @@ public class JsonLinesImportProcessor extends ImportProcessor { private static final DataLoaderObjectMapper OBJECT_MAPPER = new DataLoaderObjectMapper(); - private static final AtomicInteger dataChunkIdCounter = new AtomicInteger(0); + private final AtomicInteger dataChunkIdCounter = new AtomicInteger(0); /** * Creates a new JsonLinesImportProcessor with the specified parameters. @@ -64,11 +64,6 @@ public JsonLinesImportProcessor(ImportProcessorParams params) { @Override public ConcurrentHashMap process( int dataChunkSize, int transactionBatchSize, BufferedReader reader) { - // Reset the counter to 0 before starting a new import process. - // Since the JVM is not restarted between API calls (as in a web application’s API server), - // failing to reset the counter would cause the next import to continue from the previous data - // chunk ID. - dataChunkIdCounter.set(0); ExecutorService dataChunkExecutor = Executors.newSingleThreadExecutor(); BlockingQueue dataChunkQueue = new LinkedBlockingQueue<>(params.getImportOptions().getDataChunkQueueSize()); diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java index a9c5f4916e..ff57e42bac 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java @@ -115,48 +115,4 @@ void test_importProcessWithTransaction() { assert statusList != null; Assertions.assertEquals(1, statusList.size()); } - - @Test - void test_importProcessWithStorage_runTwice_CheckDataChunkId() { - params = - ImportProcessorParams.builder() - .scalarDbMode(ScalarDbMode.STORAGE) - .importOptions(importOptions) - .dao(dao) - .distributedStorage(distributedStorage) - .distributedTransactionManager(distributedTransactionManager) - .tableColumnDataTypes(tableColumnDataTypes) - .tableMetadataByTableName(tableMetadataByTableName) - .build(); - csvImportProcessor = new CsvImportProcessor(params); - Map statusList = - csvImportProcessor.process(5, 1, UnitTestUtils.getCsvReader()); - Assertions.assertEquals(0, statusList.get(0).getDataChunkId()); - - Map statusList2 = - csvImportProcessor.process(5, 1, UnitTestUtils.getCsvReader()); - Assertions.assertEquals(0, statusList2.get(0).getDataChunkId()); - } - - @Test - void test_importProcessWithTransaction_runTwice_CheckDataChunkId() { - params = - ImportProcessorParams.builder() - .scalarDbMode(ScalarDbMode.TRANSACTION) - .importOptions(importOptions) - .dao(dao) - .distributedStorage(distributedStorage) - .distributedTransactionManager(distributedTransactionManager) - .tableColumnDataTypes(tableColumnDataTypes) - .tableMetadataByTableName(tableMetadataByTableName) - .build(); - csvImportProcessor = new CsvImportProcessor(params); - Map statusList = - csvImportProcessor.process(5, 1, UnitTestUtils.getCsvReader()); - Assertions.assertEquals(0, statusList.get(0).getDataChunkId()); - csvImportProcessor = new CsvImportProcessor(params); - Map statusList2 = - csvImportProcessor.process(5, 1, UnitTestUtils.getCsvReader()); - Assertions.assertEquals(0, statusList2.get(0).getDataChunkId()); - } } diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java index a5b664246d..44c57874c2 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java @@ -115,46 +115,4 @@ void test_importProcessWithTransaction() { assert statusList != null; Assertions.assertEquals(1, statusList.size()); } - - @Test - void test_importProcessWithStorage_runTwice_CheckDataChunkId() { - params = - ImportProcessorParams.builder() - .scalarDbMode(ScalarDbMode.STORAGE) - .importOptions(importOptions) - .dao(dao) - .distributedStorage(distributedStorage) - .distributedTransactionManager(distributedTransactionManager) - .tableColumnDataTypes(tableColumnDataTypes) - .tableMetadataByTableName(tableMetadataByTableName) - .build(); - jsonImportProcessor = new JsonImportProcessor(params); - Map statusList = - jsonImportProcessor.process(5, 1, UnitTestUtils.getJsonReader()); - Assertions.assertEquals(0, statusList.get(0).getDataChunkId()); - Map statusList2 = - jsonImportProcessor.process(5, 1, UnitTestUtils.getJsonReader()); - Assertions.assertEquals(0, statusList2.get(0).getDataChunkId()); - } - - @Test - void test_importProcessWithTransaction_runTwice_CheckDataChunkId() { - params = - ImportProcessorParams.builder() - .scalarDbMode(ScalarDbMode.TRANSACTION) - .importOptions(importOptions) - .dao(dao) - .distributedStorage(distributedStorage) - .distributedTransactionManager(distributedTransactionManager) - .tableColumnDataTypes(tableColumnDataTypes) - .tableMetadataByTableName(tableMetadataByTableName) - .build(); - jsonImportProcessor = new JsonImportProcessor(params); - Map statusList = - jsonImportProcessor.process(5, 1, UnitTestUtils.getJsonReader()); - Assertions.assertEquals(0, statusList.get(0).getDataChunkId()); - Map statusList2 = - jsonImportProcessor.process(5, 1, UnitTestUtils.getJsonReader()); - Assertions.assertEquals(0, statusList2.get(0).getDataChunkId()); - } } diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java index 417f62dc3a..4c0e755aac 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java @@ -115,46 +115,4 @@ void test_importProcessWithTransaction() { assert statusList != null; Assertions.assertEquals(1, statusList.size()); } - - @Test - void test_importProcessWithStorage_runTwice_CheckDataChunkId() { - params = - ImportProcessorParams.builder() - .scalarDbMode(ScalarDbMode.STORAGE) - .importOptions(importOptions) - .dao(dao) - .distributedStorage(distributedStorage) - .distributedTransactionManager(distributedTransactionManager) - .tableColumnDataTypes(tableColumnDataTypes) - .tableMetadataByTableName(tableMetadataByTableName) - .build(); - jsonLinesImportProcessor = new JsonLinesImportProcessor(params); - Map statusList = - jsonLinesImportProcessor.process(5, 1, UnitTestUtils.getJsonLinesReader()); - Assertions.assertEquals(0, statusList.get(0).getDataChunkId()); - Map statusList2 = - jsonLinesImportProcessor.process(5, 1, UnitTestUtils.getJsonLinesReader()); - Assertions.assertEquals(0, statusList2.get(0).getDataChunkId()); - } - - @Test - void test_importProcessWithTransaction_runTwice_CheckDataChunkId() { - params = - ImportProcessorParams.builder() - .scalarDbMode(ScalarDbMode.TRANSACTION) - .importOptions(importOptions) - .dao(dao) - .distributedStorage(distributedStorage) - .distributedTransactionManager(distributedTransactionManager) - .tableColumnDataTypes(tableColumnDataTypes) - .tableMetadataByTableName(tableMetadataByTableName) - .build(); - jsonLinesImportProcessor = new JsonLinesImportProcessor(params); - Map statusList = - jsonLinesImportProcessor.process(5, 1, UnitTestUtils.getJsonLinesReader()); - Assertions.assertEquals(0, statusList.get(0).getDataChunkId()); - Map statusList2 = - jsonLinesImportProcessor.process(5, 1, UnitTestUtils.getJsonLinesReader()); - Assertions.assertEquals(0, statusList2.get(0).getDataChunkId()); - } }