importDataChunkStatusMap =
+ new ConcurrentHashMap<>();
+
+ /**
+ * Starts the import process using the configured parameters.
+ *
+ * If the data chunk size in {@link ImportOptions} is set to 0, the entire file will be
+ * processed as a single chunk. Otherwise, the file will be processed in chunks of the specified
+ * size.
+ *
+ * @return a map of {@link ImportDataChunkStatus} objects containing the status of each processed
+ * chunk
+ */
+ public ConcurrentHashMap<Integer, ImportDataChunkStatus> startImport() {
+ ImportProcessorParams params =
+ ImportProcessorParams.builder()
+ .scalarDBMode(scalarDBMode)
+ .importOptions(importOptions)
+ .tableMetadataByTableName(tableMetadata)
+ .dao(new ScalarDBDao())
+ .distributedTransactionManager(distributedTransactionManager)
+ .distributedStorage(distributedStorage)
+ .tableColumnDataTypes(getTableColumnDataTypes())
+ .build();
+ ImportProcessor processor = importProcessorFactory.createImportProcessor(params);
+ processor.addListener(this);
+ // If the data chunk size is 0, then process the entire file in a single data chunk
+ int dataChunkSize =
+ importOptions.getDataChunkSize() == 0
+ ? Integer.MAX_VALUE
+ : importOptions.getDataChunkSize();
+ return processor.process(
+ dataChunkSize, importOptions.getTransactionBatchSize(), importFileReader);
+ }
+
+ /**
+ * Registers a new listener to receive import events.
+ *
+ * @param listener the listener to add
+ * @throws IllegalArgumentException if the listener is null
+ */
+ public void addListener(ImportEventListener listener) {
+ listeners.add(listener);
+ }
+
+ /**
+ * Removes a previously registered listener.
+ *
+ * @param listener the listener to remove
+ */
+ public void removeListener(ImportEventListener listener) {
+ listeners.remove(listener);
+ }
+
+ /** {@inheritDoc} Forwards the event to all registered listeners. */
+ @Override
+ public void onDataChunkStarted(ImportDataChunkStatus status) {
+ for (ImportEventListener listener : listeners) {
+ listener.onDataChunkStarted(status);
+ }
+ }
+
+ /**
+ * {@inheritDoc} Updates or adds the status of a data chunk in the status map. This method is
+ * thread-safe.
+ */
+ @Override
+ public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) {
+ importDataChunkStatusMap.put(status.getDataChunkId(), status);
+ }
+
+ /** {@inheritDoc} Forwards the event to all registered listeners. */
+ @Override
+ public void onDataChunkCompleted(ImportDataChunkStatus status) {
+ for (ImportEventListener listener : listeners) {
+ listener.onDataChunkCompleted(status);
+ }
+ }
+
+ /** {@inheritDoc} Forwards the event to all registered listeners. */
+ @Override
+ public void onTransactionBatchStarted(ImportTransactionBatchStatus status) {
+ for (ImportEventListener listener : listeners) {
+ listener.onTransactionBatchStarted(status);
+ }
+ }
+
+ /** {@inheritDoc} Forwards the event to all registered listeners. */
+ @Override
+ public void onTransactionBatchCompleted(ImportTransactionBatchResult batchResult) {
+ for (ImportEventListener listener : listeners) {
+ listener.onTransactionBatchCompleted(batchResult);
+ }
+ }
+
+ /** {@inheritDoc} Forwards the event to all registered listeners. */
+ @Override
+ public void onTaskComplete(ImportTaskResult taskResult) {
+ for (ImportEventListener listener : listeners) {
+ listener.onTaskComplete(taskResult);
+ }
+ }
+
+ /** {@inheritDoc} Forwards the event to all registered listeners. */
+ @Override
+ public void onAllDataChunksCompleted() {
+ for (ImportEventListener listener : listeners) {
+ listener.onAllDataChunksCompleted();
+ }
+ }
+
+ /**
+ * Returns the current map of import data chunk status objects.
+ *
+ * @return a map of {@link ImportDataChunkStatus} objects
+ */
+ public ConcurrentHashMap<Integer, ImportDataChunkStatus> getImportDataChunkStatus() {
+ return importDataChunkStatusMap;
+ }
+
+ /**
+ * Creates and returns a mapping of table column data types from the table metadata.
+ *
+ * @return a {@link TableColumnDataTypes} object containing the column data types for all tables
+ */
+ public TableColumnDataTypes getTableColumnDataTypes() {
+ TableColumnDataTypes tableColumnDataTypes = new TableColumnDataTypes();
+ tableMetadata.forEach(
+ (name, metadata) ->
+ metadata
+ .getColumnDataTypes()
+ .forEach((k, v) -> tableColumnDataTypes.addColumnDataType(name, k, v)));
+ return tableColumnDataTypes;
+ }
+}
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportOptions.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportOptions.java
index 9cb6225d30..6d3206765e 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportOptions.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportOptions.java
@@ -35,4 +35,5 @@ public class ImportOptions {
private final String tableName;
private final int maxThreads;
private final String customHeaderRow;
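+ /** Maximum number of data chunks buffered between the file reader thread and the chunk processor. */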
+ private final int dataChunkQueueSize;
}
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessor.java
new file mode 100644
index 0000000000..0c68d5e566
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessor.java
@@ -0,0 +1,207 @@
+package com.scalar.db.dataloader.core.dataimport.processor;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.scalar.db.common.error.CoreError;
+import com.scalar.db.dataloader.core.DataLoaderObjectMapper;
+import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunk;
+import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
+import com.scalar.db.dataloader.core.dataimport.datachunk.ImportRow;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A processor for importing CSV data into the database.
+ *
+ * <p>This class handles the processing of CSV files by:
+ *
+ * <ul>
+ *   <li>Reading and parsing CSV data with configurable delimiters
+ *   <li>Processing data in configurable chunk sizes for efficient batch processing
+ *   <li>Supporting parallel processing using multiple threads
+ *   <li>Converting CSV rows into JSON format for database import
+ * </ul>
+ *
+ * <p>The processor supports custom headers and validates that each data row matches the header
+ * structure before processing.
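+ *
+ * <p>A minimal usage sketch; the chunk sizes and file name below are illustrative only:
+ *
+ * <pre>{@code
+ * ImportProcessor processor = new CsvImportProcessor(params);
+ * ConcurrentHashMap<Integer, ImportDataChunkStatus> statuses =
+ *     processor.process(1000, 100, new BufferedReader(new FileReader("data.csv")));
+ * }</pre>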
+ */
+public class CsvImportProcessor extends ImportProcessor {
+ private static final DataLoaderObjectMapper OBJECT_MAPPER = new DataLoaderObjectMapper();
+ private static final AtomicInteger dataChunkIdCounter = new AtomicInteger(0);
+
+ /**
+ * Creates a new CsvImportProcessor with the specified parameters.
+ *
+ * @param params Configuration parameters for the import processor
+ */
+ public CsvImportProcessor(ImportProcessorParams params) {
+ super(params);
+ }
+
+ /**
+ * Processes the source data from the given import file.
+ *
+ * <p>This method reads data from the provided {@link BufferedReader}, processes it in chunks, and
+ * batches transactions according to the specified sizes. The method returns a map of {@link
+ * ImportDataChunkStatus} objects, keyed by data chunk ID, each representing the status of a
+ * processed data chunk.
+ *
+ * @param dataChunkSize the number of records to include in each data chunk
+ * @param transactionBatchSize the number of records to include in each transaction batch
+ * @param reader the {@link BufferedReader} used to read the source file
+ * @return a map of {@link ImportDataChunkStatus} objects indicating the processing status of each
+ * data chunk
+ */
+ @Override
+ public ConcurrentHashMap<Integer, ImportDataChunkStatus> process(
+ int dataChunkSize, int transactionBatchSize, BufferedReader reader) {
+ ExecutorService dataChunkExecutor = Executors.newSingleThreadExecutor();
+ BlockingQueue<ImportDataChunk> dataChunkQueue =
+ new LinkedBlockingQueue<>(params.getImportOptions().getDataChunkQueueSize());
+
+ try {
+ CompletableFuture<Void> readerFuture =
+ CompletableFuture.runAsync(
+ () -> readDataChunks(reader, dataChunkSize, dataChunkQueue), dataChunkExecutor);
+
+ ConcurrentHashMap<Integer, ImportDataChunkStatus> result = new ConcurrentHashMap<>();
+
+ while (!(dataChunkQueue.isEmpty() && readerFuture.isDone())) {
+ ImportDataChunk dataChunk = dataChunkQueue.poll(100, TimeUnit.MILLISECONDS);
+ if (dataChunk != null) {
+ ImportDataChunkStatus status = processDataChunk(dataChunk, transactionBatchSize);
+ result.put(status.getDataChunkId(), status);
+ }
+ }
+ readerFuture.join();
+ return result;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(
+ CoreError.DATA_LOADER_DATA_CHUNK_PROCESS_FAILED.buildMessage(e.getMessage()), e);
+ } finally {
+ dataChunkExecutor.shutdown();
+ try {
+ if (!dataChunkExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
+ dataChunkExecutor.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ dataChunkExecutor.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ notifyAllDataChunksCompleted();
+ }
+ }
+
+ /**
+ * Reads and processes CSV data in chunks from the provided reader.
+ *
+ * <p>This method:
+ *
+ * <ul>
+ *   <li>Reads the CSV header (custom or from file)
+ *   <li>Validates each data row against the header
+ *   <li>Converts rows to JSON format
+ *   <li>Batches rows into data chunks
+ *   <li>Enqueues chunks for processing
+ * </ul>
+ *
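+ * <p>Example input; the header row and values below are illustrative only:
+ *
+ * <pre>{@code
+ * id,name
+ * 1,Alice
+ * 2,Bob
+ * }</pre>
+ *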
+ * @param reader the BufferedReader containing CSV data
+ * @param dataChunkSize the number of rows to include in each chunk
+ * @param dataChunkQueue the queue where data chunks are placed for processing
+ * @throws RuntimeException if there are errors reading the file or if interrupted
+ */
+ private void readDataChunks(
+ BufferedReader reader, int dataChunkSize, BlockingQueue<ImportDataChunk> dataChunkQueue) {
+ try {
+ String delimiter =
+ Optional.of(params.getImportOptions().getDelimiter())
+ .map(c -> Character.toString(c).trim())
+ .filter(s -> !s.isEmpty())
+ .orElse(",");
+
+ String header =
+ Optional.ofNullable(params.getImportOptions().getCustomHeaderRow())
+ .orElseGet(() -> safeReadLine(reader));
+
+ String[] headerArray = header.split(delimiter);
+ List<ImportRow> currentDataChunk = new ArrayList<>();
+ String line;
+ int rowNumber = 1;
+ while ((line = reader.readLine()) != null) {
+ String[] dataArray = line.split(delimiter);
+ if (headerArray.length != dataArray.length) {
+ throw new IllegalArgumentException(
+ CoreError.DATA_LOADER_CSV_DATA_MISMATCH.buildMessage(line, header));
+ }
+ JsonNode jsonNode = combineHeaderAndData(headerArray, dataArray);
+ if (jsonNode.isEmpty()) continue;
+
+ currentDataChunk.add(new ImportRow(rowNumber++, jsonNode));
+ if (currentDataChunk.size() == dataChunkSize) {
+ enqueueDataChunk(currentDataChunk, dataChunkQueue);
+ currentDataChunk = new ArrayList<>();
+ }
+ }
+ if (!currentDataChunk.isEmpty()) enqueueDataChunk(currentDataChunk, dataChunkQueue);
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException(
+ CoreError.DATA_LOADER_CSV_FILE_READ_FAILED.buildMessage(e.getMessage()), e);
+ }
+ }
+
+ /**
+ * Adds a completed data chunk to the processing queue.
+ *
+ * @param dataChunk the list of ImportRows to be processed
+ * @param queue the queue where the chunk should be placed
+ * @throws InterruptedException if the thread is interrupted while waiting to add to the queue
+ */
+ private void enqueueDataChunk(List<ImportRow> dataChunk, BlockingQueue<ImportDataChunk> queue)
+ throws InterruptedException {
+ int dataChunkId = dataChunkIdCounter.getAndIncrement();
+ queue.put(ImportDataChunk.builder().dataChunkId(dataChunkId).sourceData(dataChunk).build());
+ }
+
+ /**
+ * Safely reads a line from the BufferedReader, handling IOExceptions.
+ *
+ * @param reader the BufferedReader to read from
+ * @return the line read from the reader
+ * @throws UncheckedIOException if an IOException occurs while reading
+ */
+ private String safeReadLine(BufferedReader reader) {
+ try {
+ return reader.readLine();
+ } catch (IOException e) {
+ throw new UncheckedIOException(
+ CoreError.DATA_LOADER_CSV_FILE_HEADER_READ_FAILED.buildMessage(e.getMessage()), e);
+ }
+ }
+
+ /**
+ * Combines header fields with data values to create a JSON object.
+ *
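+ * <p>For example, a header of {@code ["id", "name"]} combined with data {@code ["1", "Alice"]}
+ * yields the JSON object {@code {"id": "1", "name": "Alice"}}; the names and values here are
+ * illustrative only.
+ *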
+ * @param header array of header field names
+ * @param data array of data values corresponding to the header fields
+ * @return a JsonNode containing the combined header-value pairs
+ */
+ private JsonNode combineHeaderAndData(String[] header, String[] data) {
+ ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
+ for (int i = 0; i < header.length; i++) {
+ objectNode.put(header[i], data[i]);
+ }
+ return objectNode;
+ }
+}
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/DefaultImportProcessorFactory.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/DefaultImportProcessorFactory.java
new file mode 100644
index 0000000000..d40222d9a7
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/DefaultImportProcessorFactory.java
@@ -0,0 +1,47 @@
+package com.scalar.db.dataloader.core.dataimport.processor;
+
+import com.scalar.db.common.error.CoreError;
+
+/**
+ * A factory class that creates appropriate ImportProcessor instances based on the input file
+ * format. This factory implements the ImportProcessorFactory interface and provides a default
+ * implementation for creating processors that handle different file formats (JSON, JSONL, CSV).
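+ *
+ * <p>A minimal usage sketch; {@code params} is assumed to be a fully configured
+ * {@link ImportProcessorParams} instance:
+ *
+ * <pre>{@code
+ * ImportProcessorFactory factory = new DefaultImportProcessorFactory();
+ * ImportProcessor processor = factory.createImportProcessor(params);
+ * }</pre>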
+ */
+public class DefaultImportProcessorFactory implements ImportProcessorFactory {
+
+ /**
+ * Creates an appropriate ImportProcessor instance based on the file format specified in the
+ * import parameters.
+ *
+ * <p>Supported file formats:
+ *
+ * <ul>
+ *   <li>JSONL - Creates a JsonLinesImportProcessor for JSON Lines format
+ *   <li>JSON - Creates a JsonImportProcessor for JSON format
+ *   <li>CSV - Creates a CsvImportProcessor for CSV format
+ * </ul>
+ *
+ * @param params ImportProcessorParams containing configuration and import options, including the
+ *     file format
+ * @return An ImportProcessor instance configured for the specified file format
+ * @throws IllegalArgumentException if the specified file format is not supported
+ */
+ @Override
+ public ImportProcessor createImportProcessor(ImportProcessorParams params) {
+ ImportProcessor importProcessor;
+ switch (params.getImportOptions().getFileFormat()) {
+ case JSONL:
+ importProcessor = new JsonLinesImportProcessor(params);
+ break;
+ case JSON:
+ importProcessor = new JsonImportProcessor(params);
+ break;
+ case CSV:
+ importProcessor = new CsvImportProcessor(params);
+ break;
+ default:
+ throw new IllegalArgumentException(
+ CoreError.DATA_LOADER_FILE_FORMAT_NOT_SUPPORTED.buildMessage(
+ params.getImportOptions().getFileFormat().toString()));
+ }
+ return importProcessor;
+ }
+}
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java
new file mode 100644
index 0000000000..1a317a1a82
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java
@@ -0,0 +1,453 @@
+package com.scalar.db.dataloader.core.dataimport.processor;
+
+import com.scalar.db.api.DistributedTransaction;
+import com.scalar.db.dataloader.core.ScalarDBMode;
+import com.scalar.db.dataloader.core.dataimport.ImportEventListener;
+import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunk;
+import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
+import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatusState;
+import com.scalar.db.dataloader.core.dataimport.datachunk.ImportRow;
+import com.scalar.db.dataloader.core.dataimport.task.ImportStorageTask;
+import com.scalar.db.dataloader.core.dataimport.task.ImportTaskParams;
+import com.scalar.db.dataloader.core.dataimport.task.ImportTransactionalTask;
+import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResultStatus;
+import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult;
+import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatch;
+import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult;
+import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus;
+import com.scalar.db.exception.transaction.TransactionException;
+import java.io.BufferedReader;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.RequiredArgsConstructor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An abstract class that handles the processing of data imports into ScalarDB. This processor
+ * supports both transactional and non-transactional (storage) modes and provides event notification
+ * capabilities for monitoring the import process.
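+ *
+ * <p>A typical usage sketch; the listener, chunk size, and batch size below are illustrative, and
+ * a concrete subclass such as {@code CsvImportProcessor} supplies the actual parsing logic:
+ *
+ * <pre>{@code
+ * ImportProcessor processor = new CsvImportProcessor(params);
+ * processor.addListener(eventListener);
+ * ConcurrentHashMap<Integer, ImportDataChunkStatus> statuses =
+ *     processor.process(1000, 100, reader);
+ * }</pre>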
+ */
+@RequiredArgsConstructor
+public abstract class ImportProcessor {
+
+ final ImportProcessorParams params;
+ private static final Logger LOGGER = LoggerFactory.getLogger(ImportProcessor.class);
+ private final List<ImportEventListener> listeners = new ArrayList<>();
+
+ /**
+ * Processes the source data from the given import file.
+ *
+ * This method reads data from the provided {@link BufferedReader}, processes it in chunks, and
+ * batches transactions according to the specified sizes. The processing can be done in either
+ * transactional or storage mode, depending on the configured {@link ScalarDBMode}.
+ *
+ * @param dataChunkSize the number of records to include in each data chunk for parallel
+ * processing
+ * @param transactionBatchSize the number of records to group together in a single transaction
+ * (only used in transaction mode)
+ * @param reader the {@link BufferedReader} used to read the source file
+ * @return a map of {@link ImportDataChunkStatus} objects indicating the processing status and
+ * results of each data chunk
+ */
+ public abstract ConcurrentHashMap<Integer, ImportDataChunkStatus> process(
+ int dataChunkSize, int transactionBatchSize, BufferedReader reader);
+
+ /**
+ * Add import event listener to listener list
+ *
+ * @param listener import event listener
+ */
+ public void addListener(ImportEventListener listener) {
+ listeners.add(listener);
+ }
+
+ /**
+ * Remove import event listener from listener list
+ *
+ * @param listener import event listener
+ */
+ public void removeListener(ImportEventListener listener) {
+ listeners.remove(listener);
+ }
+
+ /**
+ * Notify once the task is completed
+ *
+ * @param result task result object
+ */
+ protected void notifyStorageRecordCompleted(ImportTaskResult result) {
+ // Add data to summary, success logs with/without raw data
+ for (ImportEventListener listener : listeners) {
+ listener.onTaskComplete(result);
+ }
+ }
+
+ /**
+ * Notify once the data chunk process is started
+ *
+ * @param status data chunk status object
+ */
+ protected void notifyDataChunkStarted(ImportDataChunkStatus status) {
+ for (ImportEventListener listener : listeners) {
+ listener.onDataChunkStarted(status);
+ listener.addOrUpdateDataChunkStatus(status);
+ }
+ }
+
+ /**
+ * Notify once the data chunk process is completed
+ *
+ * @param status data chunk status object
+ */
+ protected void notifyDataChunkCompleted(ImportDataChunkStatus status) {
+ for (ImportEventListener listener : listeners) {
+ listener.onDataChunkCompleted(status);
+ listener.addOrUpdateDataChunkStatus(status);
+ }
+ }
+
+ /**
+ * Notify once the import transaction batch is started
+ *
+ * @param batchStatus import transaction batch status object
+ */
+ protected void notifyTransactionBatchStarted(ImportTransactionBatchStatus batchStatus) {
+ for (ImportEventListener listener : listeners) {
+ listener.onTransactionBatchStarted(batchStatus);
+ }
+ }
+
+ /**
+ * Notify once the import transaction batch is completed
+ *
+ * @param batchResult import transaction batch result object
+ */
+ protected void notifyTransactionBatchCompleted(ImportTransactionBatchResult batchResult) {
+ for (ImportEventListener listener : listeners) {
+ listener.onTransactionBatchCompleted(batchResult);
+ }
+ }
+
+ /** Notify when all data chunks processes are completed */
+ protected void notifyAllDataChunksCompleted() {
+ for (ImportEventListener listener : listeners) {
+ listener.onAllDataChunksCompleted();
+ }
+ }
+
+ /**
+ * Splits a data chunk into smaller transaction batches for processing. This method is used in
+ * transaction mode to group records together for atomic processing.
+ *
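+ * <p>For example, with a batch size of 100, a data chunk containing 250 rows is split into three
+ * batches of 100, 100, and 50 rows.
+ *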
+ * @param dataChunk the data chunk to split into batches
+ * @param batchSize the maximum number of records per transaction batch
+ * @return a list of {@link ImportTransactionBatch} objects representing the split batches
+ */
+ private List<ImportTransactionBatch> splitIntoTransactionBatches(
+ ImportDataChunk dataChunk, int batchSize) {
+ List<ImportTransactionBatch> transactionBatches = new ArrayList<>();
+ AtomicInteger transactionBatchIdCounter = new AtomicInteger(0);
+
+ List<ImportRow> importRows = dataChunk.getSourceData();
+ for (int i = 0; i < importRows.size(); i += batchSize) {
+ int endIndex = Math.min(i + batchSize, importRows.size());
+ List<ImportRow> transactionBatchData = importRows.subList(i, endIndex);
+ int transactionBatchId = transactionBatchIdCounter.getAndIncrement();
+ ImportTransactionBatch transactionBatch =
+ ImportTransactionBatch.builder()
+ .transactionBatchId(transactionBatchId)
+ .sourceData(transactionBatchData)
+ .build();
+ transactionBatches.add(transactionBatch);
+ }
+ return transactionBatches;
+ }
+
+ /**
+ * Processes a single transaction batch within a data chunk. Creates a new transaction, processes
+ * all records in the batch, and commits or aborts the transaction based on the success of all
+ * operations.
+ *
+ * @param dataChunk the parent data chunk containing this batch
+ * @param transactionBatch the batch of records to process in a single transaction
+ * @return an {@link ImportTransactionBatchResult} containing the processing results and any
+ * errors
+ */
+ private ImportTransactionBatchResult processTransactionBatch(
+ ImportDataChunk dataChunk, ImportTransactionBatch transactionBatch) {
+ ImportTransactionBatchStatus status =
+ ImportTransactionBatchStatus.builder()
+ .dataChunkId(dataChunk.getDataChunkId())
+ .transactionBatchId(transactionBatch.getTransactionBatchId())
+ .build();
+ notifyTransactionBatchStarted(status);
+ List<ImportTaskResult> importRecordResult = new ArrayList<>();
+ boolean isSuccess;
+ String error = "";
+ DistributedTransaction transaction = null;
+ try {
+ // Create the ScalarDB transaction
+ transaction = params.getDistributedTransactionManager().start();
+
+ // Loop over the transaction batch and process each record
+ for (ImportRow importRow : transactionBatch.getSourceData()) {
+ ImportTaskParams taskParams =
+ ImportTaskParams.builder()
+ .sourceRecord(importRow.getSourceData())
+ .dataChunkId(dataChunk.getDataChunkId())
+ .rowNumber(importRow.getRowNumber())
+ .importOptions(params.getImportOptions())
+ .tableColumnDataTypes(params.getTableColumnDataTypes())
+ .tableMetadataByTableName(params.getTableMetadataByTableName())
+ .dao(params.getDao())
+ .build();
+ importRecordResult.add(new ImportTransactionalTask(taskParams, transaction).execute());
+ }
+ isSuccess =
+ importRecordResult.stream()
+ .allMatch(
+ importTaskResult ->
+ importTaskResult.getTargets().stream()
+ .allMatch(
+ targetResult ->
+ targetResult.getStatus().equals(ImportTargetResultStatus.SAVED)));
+
+ // Check and Commit the transaction
+ if (isSuccess) {
+ transaction.commit();
+ } else {
+ transaction.abort();
+ error = "All transactions are aborted";
+ }
+
+ } catch (TransactionException e) {
+ isSuccess = false;
+ LOGGER.error(e.getMessage());
+ try {
+ if (transaction != null) {
+ transaction.abort(); // Ensure transaction is aborted
+ }
+ } catch (TransactionException abortException) {
+ LOGGER.error(
+ "Failed to abort transaction: {}", abortException.getMessage(), abortException);
+ }
+ error = e.getMessage();
+ }
+ ImportTransactionBatchResult importTransactionBatchResult =
+ ImportTransactionBatchResult.builder()
+ .transactionBatchId(transactionBatch.getTransactionBatchId())
+ .success(isSuccess)
+ .dataChunkId(dataChunk.getDataChunkId())
+ .records(importRecordResult)
+ .errors(Collections.singletonList(error))
+ .build();
+ notifyTransactionBatchCompleted(importTransactionBatchResult);
+ return importTransactionBatchResult;
+ }
+
+ /**
+ * Processes a single record in storage mode (non-transactional). Each record is processed
+ * independently without transaction guarantees.
+ *
+ * @param dataChunk the parent data chunk containing this record
+ * @param importRow the record to process
+ * @return an {@link ImportTaskResult} containing the processing result for the record
+ */
+ private ImportTaskResult processStorageRecord(ImportDataChunk dataChunk, ImportRow importRow) {
+ ImportTaskParams taskParams =
+ ImportTaskParams.builder()
+ .sourceRecord(importRow.getSourceData())
+ .dataChunkId(dataChunk.getDataChunkId())
+ .rowNumber(importRow.getRowNumber())
+ .importOptions(params.getImportOptions())
+ .tableColumnDataTypes(params.getTableColumnDataTypes())
+ .tableMetadataByTableName(params.getTableMetadataByTableName())
+ .dao(params.getDao())
+ .build();
+ ImportTaskResult importRecordResult =
+ new ImportStorageTask(taskParams, params.getDistributedStorage()).execute();
+
+ ImportTaskResult modifiedTaskResult =
+ ImportTaskResult.builder()
+ .rowNumber(importRecordResult.getRowNumber())
+ .rawRecord(importRecordResult.getRawRecord())
+ .targets(importRecordResult.getTargets())
+ .dataChunkId(dataChunk.getDataChunkId())
+ .build();
+ notifyStorageRecordCompleted(modifiedTaskResult);
+ return modifiedTaskResult;
+ }
+
+ /**
+ * Processes a complete data chunk using parallel execution. The processing mode (transactional or
+ * storage) is determined by the configured {@link ScalarDBMode}.
+ *
+ * @param dataChunk the data chunk to process
+ * @param transactionBatchSize the size of transaction batches (used only in transaction mode)
+ * @return an {@link ImportDataChunkStatus} containing the complete processing results and metrics
+ */
+ protected ImportDataChunkStatus processDataChunk(
+ ImportDataChunk dataChunk, int transactionBatchSize) {
+ ImportDataChunkStatus status =
+ ImportDataChunkStatus.builder()
+ .dataChunkId(dataChunk.getDataChunkId())
+ .startTime(Instant.now())
+ .status(ImportDataChunkStatusState.IN_PROGRESS)
+ .build();
+ notifyDataChunkStarted(status);
+ ImportDataChunkStatus importDataChunkStatus;
+ if (params.getScalarDBMode() == ScalarDBMode.TRANSACTION) {
+ importDataChunkStatus = processDataChunkWithTransactions(dataChunk, transactionBatchSize);
+ } else {
+ importDataChunkStatus = processDataChunkWithoutTransactions(dataChunk);
+ }
+ notifyDataChunkCompleted(importDataChunkStatus);
+ return importDataChunkStatus;
+ }
+
+ /**
+ * Processes a data chunk using transaction mode with parallel batch processing. Multiple
+ * transaction batches are processed concurrently using a thread pool.
+ *
+ * @param dataChunk the data chunk to process
+ * @param transactionBatchSize the number of records per transaction batch
+ * @return an {@link ImportDataChunkStatus} containing processing results and metrics
+ */
+ private ImportDataChunkStatus processDataChunkWithTransactions(
+ ImportDataChunk dataChunk, int transactionBatchSize) {
+ Instant startTime = Instant.now();
+ List<ImportTransactionBatch> transactionBatches =
+ splitIntoTransactionBatches(dataChunk, transactionBatchSize);
+ ExecutorService transactionBatchExecutor =
+ Executors.newFixedThreadPool(params.getImportOptions().getMaxThreads());
+ List<Future<?>> transactionBatchFutures = new ArrayList<>();
+ AtomicInteger successCount = new AtomicInteger(0);
+ AtomicInteger failureCount = new AtomicInteger(0);
+ try {
+ for (ImportTransactionBatch transactionBatch : transactionBatches) {
+ Future<?> transactionBatchFuture =
+ transactionBatchExecutor.submit(
+ () -> processTransactionBatch(dataChunk, transactionBatch));
+ transactionBatchFutures.add(transactionBatchFuture);
+ }
+
+ waitForFuturesToComplete(transactionBatchFutures);
+ transactionBatchFutures.forEach(
+ batchResult -> {
+ try {
+ ImportTransactionBatchResult importTransactionBatchResult =
+ (ImportTransactionBatchResult) batchResult.get();
+ importTransactionBatchResult
+ .getRecords()
+ .forEach(
+ batchRecords -> {
+ if (batchRecords.getTargets().stream()
+ .allMatch(
+ targetResult ->
+ targetResult
+ .getStatus()
+ .equals(ImportTargetResultStatus.SAVED))) {
+ successCount.incrementAndGet();
+ } else {
+ failureCount.incrementAndGet();
+ }
+ });
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ } finally {
+ transactionBatchExecutor.shutdown();
+ }
+ Instant endTime = Instant.now();
+ int totalDuration = (int) Duration.between(startTime, endTime).toMillis();
+ return ImportDataChunkStatus.builder()
+ .dataChunkId(dataChunk.getDataChunkId())
+ .failureCount(failureCount.get())
+ .successCount(successCount.get())
+ .totalRecords(dataChunk.getSourceData().size())
+ .batchCount(transactionBatches.size())
+ .status(ImportDataChunkStatusState.COMPLETE)
+ .startTime(startTime)
+ .endTime(endTime)
+ .totalDurationInMilliSeconds(totalDuration)
+ .build();
+ }
+
+ /**
+ * Processes a data chunk using storage mode with parallel record processing. Individual records
+ * are processed concurrently without transaction guarantees.
+ *
+ * @param dataChunk the data chunk to process
+ * @return an {@link ImportDataChunkStatus} containing processing results and metrics
+ */
+ private ImportDataChunkStatus processDataChunkWithoutTransactions(ImportDataChunk dataChunk) {
+ Instant startTime = Instant.now();
+ AtomicInteger successCount = new AtomicInteger(0);
+ AtomicInteger failureCount = new AtomicInteger(0);
+ ExecutorService recordExecutor =
+ Executors.newFixedThreadPool(params.getImportOptions().getMaxThreads());
+ List<Future<?>> recordFutures = new ArrayList<>();
+ try {
+ for (ImportRow importRow : dataChunk.getSourceData()) {
+ Future<?> recordFuture =
+ recordExecutor.submit(() -> processStorageRecord(dataChunk, importRow));
+ recordFutures.add(recordFuture);
+ }
+ waitForFuturesToComplete(recordFutures);
+ recordFutures.forEach(
+ r -> {
+ try {
+ ImportTaskResult result = (ImportTaskResult) r.get();
+ boolean allSaved =
+ result.getTargets().stream()
+ .allMatch(t -> t.getStatus().equals(ImportTargetResultStatus.SAVED));
+ if (allSaved) successCount.incrementAndGet();
+ else failureCount.incrementAndGet();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ } finally {
+ recordExecutor.shutdown();
+ }
+ Instant endTime = Instant.now();
+ int totalDuration = (int) Duration.between(startTime, endTime).toMillis();
+ return ImportDataChunkStatus.builder()
+ .dataChunkId(dataChunk.getDataChunkId())
+ .totalRecords(dataChunk.getSourceData().size())
+ .successCount(successCount.get())
+ .failureCount(failureCount.get())
+ .startTime(startTime)
+ .endTime(endTime)
+ .totalDurationInMilliSeconds(totalDuration)
+ .status(ImportDataChunkStatusState.COMPLETE)
+ .build();
+ }
+
+ /**
+ * Waits for all futures in the provided list to complete. Any exceptions during execution are
+ * logged but not propagated.
+ *
+ * @param futures the list of {@link Future} objects to wait for
+ */
+ private void waitForFuturesToComplete(List<Future<?>> futures) {
+ for (Future<?> future : futures) {
+ try {
+ future.get();
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage());
+ }
+ }
+ }
+}
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorFactory.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorFactory.java
new file mode 100644
index 0000000000..a84e13de57
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorFactory.java
@@ -0,0 +1,17 @@
+package com.scalar.db.dataloader.core.dataimport.processor;
+
+/**
+ * A factory interface for creating {@link ImportProcessor} instances. This factory follows the
+ * Factory design pattern to encapsulate the creation of specific import processor implementations.
+ */
+public interface ImportProcessorFactory {
+
+ /**
+ * Creates a new instance of an {@link ImportProcessor}.
+ *
+ * @param params The parameters required for configuring the import processor
+ * @return A new {@link ImportProcessor} instance configured with the provided parameters
+ * @throws IllegalArgumentException if the provided parameters are invalid
+ */
+ ImportProcessor createImportProcessor(ImportProcessorParams params);
+}
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorParams.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorParams.java
new file mode 100644
index 0000000000..36b96f62d5
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorParams.java
@@ -0,0 +1,43 @@
+package com.scalar.db.dataloader.core.dataimport.processor;
+
+import com.scalar.db.api.DistributedStorage;
+import com.scalar.db.api.DistributedTransactionManager;
+import com.scalar.db.api.TableMetadata;
+import com.scalar.db.dataloader.core.ScalarDBMode;
+import com.scalar.db.dataloader.core.dataimport.ImportOptions;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao;
+import java.util.Map;
+import lombok.Builder;
+import lombok.Value;
+
+/**
+ * Parameters class for the import processor containing all necessary components for data import
+ * operations.
+ *
+ * This class is immutable and uses the Builder pattern for construction. It encapsulates all
+ * required parameters and dependencies for processing data imports in ScalarDB.
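+ *
+ * <p>A construction sketch; the variable names below are illustrative and must be supplied by the
+ * caller:
+ *
+ * <pre>{@code
+ * ImportProcessorParams params =
+ *     ImportProcessorParams.builder()
+ *         .scalarDBMode(ScalarDBMode.TRANSACTION)
+ *         .importOptions(importOptions)
+ *         .tableMetadataByTableName(tableMetadataByTableName)
+ *         .tableColumnDataTypes(tableColumnDataTypes)
+ *         .dao(new ScalarDBDao())
+ *         .distributedTransactionManager(transactionManager)
+ *         .distributedStorage(storage)
+ *         .build();
+ * }</pre>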
+ */
+@Builder
+@Value
+public class ImportProcessorParams {
+ /** The operational mode of ScalarDB (transaction or storage mode). */
+ ScalarDBMode scalarDBMode;
+
+ /** Configuration options for the import operation. */
+ ImportOptions importOptions;
+
+ /** Mapping of table names to their corresponding metadata definitions. */
+ Map<String, TableMetadata> tableMetadataByTableName;
+
+ /** Data type information for table columns. */
+ TableColumnDataTypes tableColumnDataTypes;
+
+ /** Data Access Object for ScalarDB operations. */
+ ScalarDBDao dao;
+
+ /** Storage interface for non-transactional operations. */
+ DistributedStorage distributedStorage;
+
+ /** Transaction manager for handling transactional operations. */
+ DistributedTransactionManager distributedTransactionManager;
+}
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessor.java
new file mode 100644
index 0000000000..733a5afa96
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessor.java
@@ -0,0 +1,164 @@
+package com.scalar.db.dataloader.core.dataimport.processor;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.scalar.db.common.error.CoreError;
+import com.scalar.db.dataloader.core.DataLoaderObjectMapper;
+import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunk;
+import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
+import com.scalar.db.dataloader.core.dataimport.datachunk.ImportRow;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A processor for importing JSON data into the database.
+ *
+ * This processor handles JSON files that contain an array of JSON objects. Each object in the
+ * array represents a row to be imported into the database. The processor reads the JSON file,
+ * splits it into chunks of configurable size, and processes these chunks in parallel using multiple
+ * threads.
+ *
+ * <p>The processing is done in two main phases:
+ *
+ * <ul>
+ *   <li>Reading phase: The JSON file is read and split into chunks
+ *   <li>Processing phase: Each chunk is processed independently and imported into the database
+ * </ul>
+ *
+ * <p>The processor uses a producer-consumer pattern where one thread reads the JSON file and
+ * produces data chunks, while a pool of worker threads consumes and processes these chunks.
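+ *
+ * <p>Example of the expected input, a single JSON array of objects; the field names and values
+ * below are illustrative only:
+ *
+ * <pre>{@code
+ * [
+ *   {"id": 1, "name": "Alice"},
+ *   {"id": 2, "name": "Bob"}
+ * ]
+ * }</pre>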
+ */
+public class JsonImportProcessor extends ImportProcessor {
+
+ private static final DataLoaderObjectMapper OBJECT_MAPPER = new DataLoaderObjectMapper();
+ private static final AtomicInteger dataChunkIdCounter = new AtomicInteger(0);
+
+ public JsonImportProcessor(ImportProcessorParams params) {
+ super(params);
+ }
+
+ /**
+ * Processes the source data from the given import file.
+ *
+ * <p>This method reads data from the provided {@link BufferedReader}, processes it in chunks, and
+ * batches transactions according to the specified sizes. The method returns a map of {@link
+ * ImportDataChunkStatus} objects, keyed by data chunk ID, each representing the status of a
+ * processed data chunk.
+ *
+ * @param dataChunkSize the number of records to include in each data chunk
+ * @param transactionBatchSize the number of records to include in each transaction batch
+ * @param reader the {@link BufferedReader} used to read the source file
+ * @return a map of {@link ImportDataChunkStatus} objects indicating the processing status of each
+ * data chunk
+ */
+ @Override
+ public ConcurrentHashMap<Integer, ImportDataChunkStatus> process(
+ int dataChunkSize, int transactionBatchSize, BufferedReader reader) {
+ ExecutorService dataChunkExecutor = Executors.newSingleThreadExecutor();
+ BlockingQueue<ImportDataChunk> dataChunkQueue =
+ new LinkedBlockingQueue<>(params.getImportOptions().getDataChunkQueueSize());
+
+ try {
+ CompletableFuture<Void> readerFuture =
+ CompletableFuture.runAsync(
+ () -> readDataChunks(reader, dataChunkSize, dataChunkQueue), dataChunkExecutor);
+
+ ConcurrentHashMap<Integer, ImportDataChunkStatus> result = new ConcurrentHashMap<>();
+
+ while (!(dataChunkQueue.isEmpty() && readerFuture.isDone())) {
+ ImportDataChunk dataChunk = dataChunkQueue.poll(100, TimeUnit.MILLISECONDS);
+ if (dataChunk != null) {
+ ImportDataChunkStatus status = processDataChunk(dataChunk, transactionBatchSize);
+ result.put(status.getDataChunkId(), status);
+ }
+ }
+
+ readerFuture.join();
+ return result;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(
+ CoreError.DATA_LOADER_DATA_CHUNK_PROCESS_FAILED.buildMessage(e.getMessage()), e);
+ } finally {
+ dataChunkExecutor.shutdown();
+ try {
+ if (!dataChunkExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
+ dataChunkExecutor.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ dataChunkExecutor.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ notifyAllDataChunksCompleted();
+ }
+ }
+
+ /**
+ * Reads data chunks from the JSON file and adds them to the processing queue.
+ *
+ * <p>This method reads the JSON file as an array of objects, creating data chunks of the
+ * specified size. Each chunk is then added to the queue for processing. The method expects the
+ * JSON file to start with an array token '[' and end with ']'.
+ *
+ * <p>Empty or null JSON nodes are skipped during processing.
+ *
+ * @param reader the BufferedReader containing the JSON data
+ * @param dataChunkSize the maximum number of records to include in each chunk
+ * @param dataChunkQueue the queue where data chunks are placed for processing
+ * @throws RuntimeException if there is an error reading the JSON file or if the thread is
+ * interrupted
+ */
+ private void readDataChunks(
+ BufferedReader reader, int dataChunkSize, BlockingQueue<ImportDataChunk> dataChunkQueue) {
+ try (JsonParser jsonParser = new JsonFactory().createParser(reader)) {
+ if (jsonParser.nextToken() != JsonToken.START_ARRAY) {
+ throw new IOException(CoreError.DATA_LOADER_JSON_CONTENT_START_ERROR.buildMessage());
+ }
+
+ List<ImportRow> currentDataChunk = new ArrayList<>();
+ int rowNumber = 1;
+ while (jsonParser.nextToken() != JsonToken.END_ARRAY) {
+ JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonParser);
+ if (jsonNode == null || jsonNode.isEmpty()) continue;
+
+ currentDataChunk.add(new ImportRow(rowNumber++, jsonNode));
+ if (currentDataChunk.size() == dataChunkSize) {
+ enqueueDataChunk(currentDataChunk, dataChunkQueue);
+ currentDataChunk = new ArrayList<>();
+ }
+ }
+ if (!currentDataChunk.isEmpty()) enqueueDataChunk(currentDataChunk, dataChunkQueue);
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException(
+ CoreError.DATA_LOADER_JSON_FILE_READ_FAILED.buildMessage(e.getMessage()), e);
+ }
+ }
+
+ /**
+ * Adds a data chunk to the processing queue.
+ *
+ * This method creates a new ImportDataChunk with a unique ID and the provided data, then adds
+ * it to the processing queue. The ID is generated using an atomic counter to ensure thread
+ * safety.
+ *
+ * @param dataChunk the list of ImportRow objects to be processed
+ * @param queue the queue where the data chunk will be added
+ * @throws InterruptedException if the thread is interrupted while waiting to add to the queue
+ */
+ private void enqueueDataChunk(List<ImportRow> dataChunk, BlockingQueue<ImportDataChunk> queue)
+ throws InterruptedException {
+ int dataChunkId = dataChunkIdCounter.getAndIncrement();
+ queue.put(ImportDataChunk.builder().dataChunkId(dataChunkId).sourceData(dataChunk).build());
+ }
+}
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessor.java
new file mode 100644
index 0000000000..a121a106a5
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessor.java
@@ -0,0 +1,156 @@
+package com.scalar.db.dataloader.core.dataimport.processor;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.scalar.db.common.error.CoreError;
+import com.scalar.db.dataloader.core.DataLoaderObjectMapper;
+import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunk;
+import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
+import com.scalar.db.dataloader.core.dataimport.datachunk.ImportRow;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A processor for importing data from JSON Lines (JSONL) formatted files.
+ *
+ * This processor reads data from files where each line is a valid JSON object. It processes the
+ * input file in chunks, allowing for parallel processing and batched transactions for efficient
+ * data loading.
+ *
+ * <p>The processor uses a multi-threaded approach with:
+ *
+ * <ul>
+ *   <li>A dedicated thread for reading data chunks from the input file
+ *   <li>Multiple threads for processing data chunks in parallel
+ *   <li>A queue-based system to manage data chunks between reader and processor threads
+ * </ul>
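+ *
+ * <p>Example of the expected input, one JSON object per line; the field names and values below
+ * are illustrative only:
+ *
+ * <pre>{@code
+ * {"id": 1, "name": "Alice"}
+ * {"id": 2, "name": "Bob"}
+ * }</pre>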
+ */
+public class JsonLinesImportProcessor extends ImportProcessor {
+
+ private static final DataLoaderObjectMapper OBJECT_MAPPER = new DataLoaderObjectMapper();
+ private static final AtomicInteger dataChunkIdCounter = new AtomicInteger(0);
+
+ /**
+ * Creates a new JsonLinesImportProcessor with the specified parameters.
+ *
+ * @param params configuration parameters for the import processor
+ */
+ public JsonLinesImportProcessor(ImportProcessorParams params) {
+ super(params);
+ }
+
+ /**
+ * Processes the source data from the given import file.
+ *
+ * <p>This method reads data from the provided {@link BufferedReader}, processes it in chunks, and
+ * batches transactions according to the specified sizes. The method returns a map of {@link
+ * ImportDataChunkStatus} objects, keyed by data chunk ID, each representing the status of a
+ * processed data chunk.
+ *
+ * @param dataChunkSize the number of records to include in each data chunk
+ * @param transactionBatchSize the number of records to include in each transaction batch
+ * @param reader the {@link BufferedReader} used to read the source file
+ * @return a map of {@link ImportDataChunkStatus} objects indicating the processing status of each
+ * data chunk
+ */
+ @Override
+ public ConcurrentHashMap<Integer, ImportDataChunkStatus> process(
+ int dataChunkSize, int transactionBatchSize, BufferedReader reader) {
+ ExecutorService dataChunkExecutor = Executors.newSingleThreadExecutor();
+ BlockingQueue<ImportDataChunk> dataChunkQueue =
+ new LinkedBlockingQueue<>(params.getImportOptions().getDataChunkQueueSize());
+
+ try {
+ CompletableFuture<Void> readerFuture =
+ CompletableFuture.runAsync(
+ () -> readDataChunks(reader, dataChunkSize, dataChunkQueue), dataChunkExecutor);
+
+ ConcurrentHashMap<Integer, ImportDataChunkStatus> result = new ConcurrentHashMap<>();
+
+ while (!(dataChunkQueue.isEmpty() && readerFuture.isDone())) {
+ ImportDataChunk dataChunk = dataChunkQueue.poll(100, TimeUnit.MILLISECONDS);
+ if (dataChunk != null) {
+ ImportDataChunkStatus status = processDataChunk(dataChunk, transactionBatchSize);
+ result.put(status.getDataChunkId(), status);
+ }
+ }
+
+ readerFuture.join();
+ return result;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(
+ CoreError.DATA_LOADER_DATA_CHUNK_PROCESS_FAILED.buildMessage(e.getMessage()), e);
+ } finally {
+ dataChunkExecutor.shutdown();
+ try {
+ if (!dataChunkExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
+ dataChunkExecutor.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ dataChunkExecutor.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ notifyAllDataChunksCompleted();
+ }
+ }
+
+ /**
+ * Reads data from the input file and creates data chunks for processing.
+ *
+ * This method reads the input file line by line, parsing each line as a JSON object. It
+ * accumulates rows until reaching the specified chunk size, then enqueues the chunk for
+ * processing. Empty lines or invalid JSON objects are skipped.
+ *
+ * @param reader the BufferedReader for reading the input file
+ * @param dataChunkSize the maximum number of rows to include in each data chunk
+ * @param dataChunkQueue the queue where data chunks are placed for processing
+ * @throws RuntimeException if there is an error reading the file or if the thread is interrupted
+ */
+ private void readDataChunks(
+ BufferedReader reader, int dataChunkSize, BlockingQueue<ImportDataChunk> dataChunkQueue) {
+ try {
+ List<ImportRow> currentDataChunk = new ArrayList<>();
+ int rowNumber = 1;
+ String line;
+ while ((line = reader.readLine()) != null) {
+ JsonNode jsonNode = OBJECT_MAPPER.readTree(line);
+ if (jsonNode == null || jsonNode.isEmpty()) continue;
+
+ currentDataChunk.add(new ImportRow(rowNumber++, jsonNode));
+ if (currentDataChunk.size() == dataChunkSize) {
+ enqueueDataChunk(currentDataChunk, dataChunkQueue);
+ currentDataChunk = new ArrayList<>();
+ }
+ }
+ if (!currentDataChunk.isEmpty()) enqueueDataChunk(currentDataChunk, dataChunkQueue);
+ } catch (IOException | InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(
+ CoreError.DATA_LOADER_JSONLINES_FILE_READ_FAILED.buildMessage(e.getMessage()), e);
+ }
+ }
+
+ /**
+ * Enqueues a data chunk for processing.
+ *
+ * Creates a new ImportDataChunk with a unique ID and adds it to the processing queue.
+ *
+ * @param dataChunk the list of ImportRows to be processed
+ * @param queue the queue where the data chunk should be placed
+ * @throws InterruptedException if the thread is interrupted while waiting to add to the queue
+ */
+ private void enqueueDataChunk(List<ImportRow> dataChunk, BlockingQueue<ImportDataChunk> queue)
+ throws InterruptedException {
+ int dataChunkId = dataChunkIdCounter.getAndIncrement();
+ queue.put(ImportDataChunk.builder().dataChunkId(dataChunkId).sourceData(dataChunk).build());
+ }
+}
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/TableColumnDataTypes.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/TableColumnDataTypes.java
new file mode 100644
index 0000000000..b9684ee64a
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/TableColumnDataTypes.java
@@ -0,0 +1,95 @@
+package com.scalar.db.dataloader.core.dataimport.processor;
+
+import com.scalar.db.io.DataType;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A class that maintains a mapping of column data types for database tables.
+ *
+ * This class provides functionality to store and retrieve data type information for table
+ * columns in a database schema. It uses a nested map structure where the outer map keys are table
+ * names and the inner map keys are column names.
+ *
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * TableColumnDataTypes types = new TableColumnDataTypes();
+ *
+ * // Add column data types for a table
+ * types.addColumnDataType("users", "id", DataType.INT);
+ * types.addColumnDataType("users", "name", DataType.TEXT);
+ *
+ * // Retrieve data type for a specific column
+ * DataType idType = types.getDataType("users", "id"); // Returns DataType.INT
+ *
+ * // Get all column data types for a table
+ * Map<String, DataType> userColumns = types.getColumnDataTypes("users");
+ * }</pre>
+ */
+public class TableColumnDataTypes {
+ private final Map<String, Map<String, DataType>> dataTypesByColumnsByTable;
+
+ /**
+ * Constructs a new {@code TableColumnDataTypes} instance with an empty mapping. The internal
+ * structure is initialized as an empty HashMap that will store table names as keys and
+ * column-to-datatype mappings as values.
+ */
+ public TableColumnDataTypes() {
+ this.dataTypesByColumnsByTable = new HashMap<>();
+ }
+
+ /**
+ * Adds a data type for a specific column in a given table.
+ *
+ * If the table doesn't exist in the mapping, a new entry is created automatically. If the
+ * column already exists for the specified table, its data type will be updated with the new
+ * value.
+ *
+ * @param tableName the name of the table
+ * @param columnName the name of the column
+ * @param dataType the data type associated with the column
+ * @throws NullPointerException if any of the parameters is null
+ */
+ public void addColumnDataType(String tableName, String columnName, DataType dataType) {
+ dataTypesByColumnsByTable
+ .computeIfAbsent(tableName, key -> new HashMap<>())
+ .put(columnName, dataType);
+ }
+
+ /**
+ * Retrieves the data type of a specific column in a given table.
+ *
+ * <p>This method performs a lookup in the internal mapping to find the data type associated with
+ * the specified table and column combination.
+ *
+ * @param tableName the name of the table
+ * @param columnName the name of the column
+ * @return the {@link DataType} of the column, or {@code null} if either the table or the column
+ * is not found in the mapping
+ * @throws NullPointerException if any of the parameters is null
+ */
+ public DataType getDataType(String tableName, String columnName) {
+ Map<String, DataType> columnDataTypes = dataTypesByColumnsByTable.get(tableName);
+ if (columnDataTypes != null) {
+ return columnDataTypes.get(columnName);
+ }
+ return null;
+ }
+
+ /**
+ * Retrieves all column data types for a given table.
+ *
+ * Returns a map containing all columns and their corresponding data types for the specified
+ * table. The returned map is a direct reference to the internal map, so modifications to it will
+ * affect the internal state.
+ *
+ * @param tableName the name of the table
+ * @return a {@link Map} of column names to their respective {@link DataType}s, or {@code null} if
+ * the table does not exist in the mapping
+ * @throws NullPointerException if tableName is null
+ */
+ public Map<String, DataType> getColumnDataTypes(String tableName) {
+ return dataTypesByColumnsByTable.get(tableName);
+ }
+}
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportStorageTask.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportStorageTask.java
new file mode 100644
index 0000000000..98d982cac0
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportStorageTask.java
@@ -0,0 +1,93 @@
+package com.scalar.db.dataloader.core.dataimport.task;
+
+import com.scalar.db.api.DistributedStorage;
+import com.scalar.db.api.Result;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDaoException;
+import com.scalar.db.io.Column;
+import com.scalar.db.io.Key;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * An import task that interacts with a {@link DistributedStorage} for data retrieval and storage
+ * operations.
+ *
+ * This class extends {@link ImportTask} and provides concrete implementations for fetching and
+ * storing records using a {@link DistributedStorage} instance. It acts as a bridge between the
+ *
+ * <p>The task handles both read and write operations:
+ *
+ * <ul>
+ *   <li>Reading existing records using partition and clustering keys
+ *   <li>Storing new or updated records with their associated columns
+ * </ul>
+ *
+ * <p>All storage operations are performed through the provided {@link DistributedStorage} instance,
+ * All storage operations are performed through the provided {@link DistributedStorage} instance,
+ * which must be properly initialized before creating this task.
+ */
+public class ImportStorageTask extends ImportTask {
+
+ private final DistributedStorage storage;
+
+ /**
+ * Constructs an {@code ImportStorageTask} with the specified parameters and storage.
+ *
+ * @param params the import task parameters containing configuration and DAO objects
+ * @param storage the distributed storage instance to be used for data operations
+ * @throws NullPointerException if either params or storage is null
+ */
+ public ImportStorageTask(ImportTaskParams params, DistributedStorage storage) {
+ super(params);
+ this.storage = storage;
+ }
+
+ /**
+ * Retrieves a data record from the distributed storage using the specified keys.
+ *
+ * <p>This method attempts to fetch a single record from the specified table using both partition
+ * and clustering keys. The operation is performed through the configured DAO using the associated
+ * storage instance.
+ *
+ * @param namespace the namespace of the table to query
+ * @param tableName the name of the table to query
+ * @param partitionKey the partition key identifying the record's partition
+ * @param clusteringKey the clustering key for further record identification within the partition
+ * @return an {@link Optional} containing the {@link Result} if the record exists, otherwise an
+ * empty {@link Optional}
+ * @throws ScalarDBDaoException if an error occurs during the retrieval operation, such as
+ * connection issues or invalid table/namespace
+ */
+ @Override
+ protected Optional getDataRecord(
+ String namespace, String tableName, Key partitionKey, Key clusteringKey)
+ throws ScalarDBDaoException {
+ return params.getDao().get(namespace, tableName, partitionKey, clusteringKey, this.storage);
+ }
+
+ /**
+ * Saves a record into the distributed storage with the specified keys and columns.
+ *
+ * This method writes or updates a record in the specified table using the provided keys and
+ * column values. The operation is performed through the configured DAO using the associated
+ * storage instance.
+ *
+ * @param namespace the namespace of the target table
+ * @param tableName the name of the target table
+ * @param partitionKey the partition key determining where the record will be stored
+ * @param clusteringKey the clustering key for organizing records within the partition
+ * @param columns the list of columns containing the record's data to be saved
+ * @throws ScalarDBDaoException if an error occurs during the save operation, such as connection
+ * issues, invalid data types, or constraint violations
+ */
+ @Override
+ protected void saveRecord(
+ String namespace,
+ String tableName,
+ Key partitionKey,
+ Key clusteringKey,
+ List<Column<?>> columns)
+ throws ScalarDBDaoException {
+ params.getDao().put(namespace, tableName, partitionKey, clusteringKey, columns, this.storage);
+ }
+}
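
For orientation, the snippet below is a hypothetical wiring example, not part of this change: it shows how a storage-mode task could be assembled from the ImportTaskParams builder introduced later in this patch and executed against a DistributedStorage instance. The sourceRecord, dataChunkId, rowNumber, importOptions, metadata objects, and storage are placeholders that the import processor normally supplies.

    ImportTaskParams params =
        ImportTaskParams.builder()
            .sourceRecord(sourceRecord)                       // one parsed record (JsonNode)
            .dataChunkId(dataChunkId)
            .rowNumber(rowNumber)
            .importOptions(importOptions)
            .tableMetadataByTableName(tableMetadataByTableName)
            .tableColumnDataTypes(tableColumnDataTypes)
            .dao(new ScalarDBDao())
            .build();
    // Reads and writes go through the given DistributedStorage instance
    ImportTaskResult result = new ImportStorageTask(params, storage).execute();
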
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTask.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTask.java
new file mode 100644
index 0000000000..3be177a00a
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTask.java
@@ -0,0 +1,468 @@
+package com.scalar.db.dataloader.core.dataimport.task;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.scalar.db.api.Result;
+import com.scalar.db.api.TableMetadata;
+import com.scalar.db.common.error.CoreError;
+import com.scalar.db.dataloader.core.dataimport.ImportMode;
+import com.scalar.db.dataloader.core.dataimport.ImportOptions;
+import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFile;
+import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTable;
+import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTableFieldMapping;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDaoException;
+import com.scalar.db.dataloader.core.dataimport.processor.TableColumnDataTypes;
+import com.scalar.db.dataloader.core.dataimport.task.mapping.ImportDataMapping;
+import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResult;
+import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResultStatus;
+import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult;
+import com.scalar.db.dataloader.core.dataimport.task.validation.ImportSourceRecordValidationResult;
+import com.scalar.db.dataloader.core.dataimport.task.validation.ImportSourceRecordValidator;
+import com.scalar.db.dataloader.core.exception.Base64Exception;
+import com.scalar.db.dataloader.core.exception.ColumnParsingException;
+import com.scalar.db.dataloader.core.util.ColumnUtils;
+import com.scalar.db.dataloader.core.util.KeyUtils;
+import com.scalar.db.dataloader.core.util.TableMetadataUtil;
+import com.scalar.db.io.Column;
+import com.scalar.db.io.DataType;
+import com.scalar.db.io.Key;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * Abstract base class for handling data import tasks into ScalarDB tables. This class provides
+ * functionality to import data into single or multiple tables based on the provided import options
+ * and control file configurations.
+ */
+@RequiredArgsConstructor
+public abstract class ImportTask {
+
+ protected final ImportTaskParams params;
+
+ /**
+ * Executes the import task by importing data into one or more database tables. If a control file
+ * is specified in the import options, performs a multi-table import. Otherwise, performs a single
+ * table import.
+ *
+ * @return ImportTaskResult containing the results of the import operation including
+ * success/failure status and any error messages for each target table
+ */
+ public ImportTaskResult execute() {
+
+ ObjectNode mutableSourceRecord = params.getSourceRecord().deepCopy();
+ ImportOptions importOptions = params.getImportOptions();
+
+ // Single table import
+ if (importOptions.getControlFile() == null) {
+ String tableLookupKey =
+ TableMetadataUtil.getTableLookupKey(
+ importOptions.getNamespace(), importOptions.getTableName());
+ ImportTargetResult singleTargetResult =
+ importIntoSingleTable(
+ importOptions.getNamespace(),
+ importOptions.getTableName(),
+ params.getTableMetadataByTableName().get(tableLookupKey),
+ params.getTableColumnDataTypes().getColumnDataTypes(tableLookupKey),
+ null,
+ mutableSourceRecord);
+ // Add the single target result to the list of targets and return the result
+ return ImportTaskResult.builder()
+ .rawRecord(params.getSourceRecord())
+ .rowNumber(params.getRowNumber())
+ .targets(Collections.singletonList(singleTargetResult))
+ .build();
+ }
+
+ // Multi-table import
+ List<ImportTargetResult> multiTargetResults =
+ startMultiTableImportProcess(
+ importOptions.getControlFile(),
+ params.getTableMetadataByTableName(),
+ params.getTableColumnDataTypes(),
+ mutableSourceRecord);
+
+ return ImportTaskResult.builder()
+ .targets(multiTargetResults)
+ .rawRecord(params.getSourceRecord())
+ .rowNumber(params.getRowNumber())
+ .build();
+ }
+
+ /**
+ * Processes multi-table import based on the control file configuration. For each table specified
+ * in the control file, validates the source data and performs the import operation.
+ *
+ * @param controlFile control file which maps source data columns to target table columns
+ * @param tableMetadataByTableName map of table metadata indexed by table name
+ * @param tableColumnDataTypes map of column data types indexed by table name
+ * @param mutableSourceRecord source record data that can be modified during import
+ * @return List of ImportTargetResult objects containing the results for each table import
+ */
+ private List<ImportTargetResult> startMultiTableImportProcess(
+ ControlFile controlFile,
+ Map<String, TableMetadata> tableMetadataByTableName,
+ TableColumnDataTypes tableColumnDataTypes,
+ ObjectNode mutableSourceRecord) {
+
+ List<ImportTargetResult> targetResults = new ArrayList<>();
+
+ // Import for every table mapping specified in the control file
+ for (ControlFileTable controlFileTable : controlFile.getTables()) {
+ for (ControlFileTableFieldMapping mapping : controlFileTable.getMappings()) {
+ if (!mutableSourceRecord.has(mapping.getSourceField())
+ && !mutableSourceRecord.has(mapping.getTargetColumn())) {
+ String errorMessage =
+ CoreError.DATA_LOADER_MISSING_SOURCE_FIELD.buildMessage(
+ mapping.getSourceField(), controlFileTable.getTable());
+
+ ImportTargetResult targetResult =
+ ImportTargetResult.builder()
+ .namespace(controlFileTable.getNamespace())
+ .tableName(controlFileTable.getTable())
+ .errors(Collections.singletonList(errorMessage))
+ .status(ImportTargetResultStatus.VALIDATION_FAILED)
+ .build();
+ return Collections.singletonList(targetResult);
+ }
+ }
+
+ // Import into a single table
+ String tableLookupKey = TableMetadataUtil.getTableLookupKey(controlFileTable);
+ TableMetadata tableMetadata = tableMetadataByTableName.get(tableLookupKey);
+ Map<String, DataType> dataTypesByColumns =
+ tableColumnDataTypes.getColumnDataTypes(tableLookupKey);
+ // Work on a deep copy of the source record: the mapping below mutates the node, and reusing
+ // the same mutated object for a later table mapping would make the data checks fail
+ ObjectNode copyNode = mutableSourceRecord.deepCopy();
+ ImportTargetResult result =
+ importIntoSingleTable(
+ controlFileTable.getNamespace(),
+ controlFileTable.getTable(),
+ tableMetadata,
+ dataTypesByColumns,
+ controlFileTable,
+ copyNode);
+ targetResults.add(result);
+ }
+ return targetResults;
+ }
+
+ /**
+ * Imports data into a single table with validation and error handling. The method performs the
+ * following steps:
+ *
+ * <ol>
+ *   <li>Validates the table metadata and the source record
+ *   <li>Creates the partition and clustering keys
+ *   <li>Determines whether to insert or update based on existing data
+ *   <li>Applies the import operation according to the specified import mode
+ * </ol>
+ *
+ * @param namespace database namespace name
+ * @param table target table name
+ * @param tableMetadata metadata describing the table structure
+ * @param dataTypeByColumnName map of column names to their data types
+ * @param controlFileTable optional control file table configuration for column mapping
+ * @param mutableSourceRecord source record to be imported
+ * @return ImportTargetResult containing the result of the import operation
+ */
+ private ImportTargetResult importIntoSingleTable(
+ String namespace,
+ String table,
+ TableMetadata tableMetadata,
+ Map<String, DataType> dataTypeByColumnName,
+ ControlFileTable controlFileTable,
+ ObjectNode mutableSourceRecord) {
+
+ ImportOptions importOptions = params.getImportOptions();
+
+ if (dataTypeByColumnName == null || tableMetadata == null) {
+ return ImportTargetResult.builder()
+ .namespace(namespace)
+ .tableName(table)
+ .status(ImportTargetResultStatus.VALIDATION_FAILED)
+ .errors(
+ Collections.singletonList(
+ CoreError.DATA_LOADER_TABLE_METADATA_MISSING.buildMessage()))
+ .build();
+ }
+
+ LinkedHashSet<String> partitionKeyNames = tableMetadata.getPartitionKeyNames();
+ LinkedHashSet<String> clusteringKeyNames = tableMetadata.getClusteringKeyNames();
+ LinkedHashSet<String> columnNames = tableMetadata.getColumnNames();
+
+ applyDataMapping(controlFileTable, mutableSourceRecord);
+
+ boolean checkForMissingColumns = shouldCheckForMissingColumns(importOptions);
+
+ ImportSourceRecordValidationResult validationResult =
+ validateSourceRecord(
+ partitionKeyNames,
+ clusteringKeyNames,
+ columnNames,
+ mutableSourceRecord,
+ checkForMissingColumns,
+ tableMetadata);
+
+ if (!validationResult.isValid()) {
+ return ImportTargetResult.builder()
+ .namespace(namespace)
+ .tableName(table)
+ .status(ImportTargetResultStatus.VALIDATION_FAILED)
+ .errors(validationResult.getErrorMessages())
+ .build();
+ }
+
+ Optional<Key> optionalPartitionKey =
+ KeyUtils.createPartitionKeyFromSource(
+ partitionKeyNames, dataTypeByColumnName, mutableSourceRecord);
+ if (!optionalPartitionKey.isPresent()) {
+ return ImportTargetResult.builder()
+ .namespace(namespace)
+ .tableName(table)
+ .status(ImportTargetResultStatus.VALIDATION_FAILED)
+ .errors(
+ Collections.singletonList(
+ CoreError.DATA_LOADER_COULD_NOT_FIND_PARTITION_KEY.buildMessage()))
+ .build();
+ }
+ Optional<Key> optionalClusteringKey = Optional.empty();
+ if (!clusteringKeyNames.isEmpty()) {
+ optionalClusteringKey =
+ KeyUtils.createClusteringKeyFromSource(
+ clusteringKeyNames, dataTypeByColumnName, mutableSourceRecord);
+ if (!optionalClusteringKey.isPresent()) {
+ return ImportTargetResult.builder()
+ .namespace(namespace)
+ .tableName(table)
+ .status(ImportTargetResultStatus.VALIDATION_FAILED)
+ .errors(
+ Collections.singletonList(
+ CoreError.DATA_LOADER_COULD_NOT_FIND_CLUSTERING_KEY.buildMessage()))
+ .build();
+ }
+ }
+
+ Optional<Result> optionalScalarDBResult;
+
+ try {
+ optionalScalarDBResult =
+ getDataRecord(
+ namespace, table, optionalPartitionKey.get(), optionalClusteringKey.orElse(null));
+ } catch (ScalarDBDaoException e) {
+ return ImportTargetResult.builder()
+ .namespace(namespace)
+ .tableName(table)
+ .status(ImportTargetResultStatus.RETRIEVAL_FAILED)
+ .errors(Collections.singletonList(e.getMessage()))
+ .build();
+ }
+ ImportTaskAction importAction =
+ optionalScalarDBResult.isPresent() ? ImportTaskAction.UPDATE : ImportTaskAction.INSERT;
+
+ if (importAction == ImportTaskAction.INSERT
+ && shouldRevalidateMissingColumns(importOptions, checkForMissingColumns)) {
+ ImportSourceRecordValidationResult validationResultForMissingColumns =
+ new ImportSourceRecordValidationResult();
+ ImportSourceRecordValidator.checkMissingColumns(
+ mutableSourceRecord, columnNames, validationResultForMissingColumns, tableMetadata);
+ if (!validationResultForMissingColumns.isValid()) {
+ return ImportTargetResult.builder()
+ .namespace(namespace)
+ .tableName(table)
+ .status(ImportTargetResultStatus.MISSING_COLUMNS)
+ .errors(
+ Collections.singletonList(
+ CoreError.DATA_LOADER_UPSERT_INSERT_MISSING_COLUMNS.buildMessage()))
+ .build();
+ }
+ }
+
+ if (shouldFailForExistingData(importAction, importOptions)) {
+ return ImportTargetResult.builder()
+ .namespace(namespace)
+ .tableName(table)
+ .importedRecord(mutableSourceRecord)
+ .importAction(importAction)
+ .status(ImportTargetResultStatus.DATA_ALREADY_EXISTS)
+ .errors(
+ Collections.singletonList(CoreError.DATA_LOADER_DATA_ALREADY_EXISTS.buildMessage()))
+ .build();
+ }
+
+ if (shouldFailForMissingData(importAction, importOptions)) {
+ return ImportTargetResult.builder()
+ .namespace(namespace)
+ .tableName(table)
+ .importedRecord(mutableSourceRecord)
+ .importAction(importAction)
+ .status(ImportTargetResultStatus.DATA_NOT_FOUND)
+ .errors(Collections.singletonList(CoreError.DATA_LOADER_DATA_NOT_FOUND.buildMessage()))
+ .build();
+ }
+
+ List<Column<?>> columns;
+
+ try {
+ columns =
+ ColumnUtils.getColumnsFromResult(
+ optionalScalarDBResult.orElse(null),
+ mutableSourceRecord,
+ importOptions.isIgnoreNullValues(),
+ tableMetadata);
+ } catch (Base64Exception | ColumnParsingException e) {
+ return ImportTargetResult.builder()
+ .namespace(namespace)
+ .tableName(table)
+ .status(ImportTargetResultStatus.VALIDATION_FAILED)
+ .errors(Collections.singletonList(e.getMessage()))
+ .build();
+ }
+
+ // Save the record
+ try {
+ saveRecord(
+ namespace,
+ table,
+ optionalPartitionKey.get(),
+ optionalClusteringKey.orElse(null),
+ columns);
+
+ return ImportTargetResult.builder()
+ .namespace(namespace)
+ .tableName(table)
+ .importAction(importAction)
+ .importedRecord(mutableSourceRecord)
+ .status(ImportTargetResultStatus.SAVED)
+ .build();
+
+ } catch (ScalarDBDaoException e) {
+ return ImportTargetResult.builder()
+ .namespace(namespace)
+ .tableName(table)
+ .importAction(importAction)
+ .status(ImportTargetResultStatus.SAVE_FAILED)
+ .errors(Collections.singletonList(e.getMessage()))
+ .build();
+ }
+ }
+
+ /**
+ * Applies data mapping to the given source record based on the specified control file table.
+ *
+ * @param controlFileTable the control file table containing column mappings
+ * @param mutableSourceRecord the source record to be modified based on the mappings
+ */
+ private void applyDataMapping(ControlFileTable controlFileTable, ObjectNode mutableSourceRecord) {
+ if (controlFileTable != null) {
+ ImportDataMapping.apply(mutableSourceRecord, controlFileTable);
+ }
+ }
+
+ /**
+ * Determines whether missing columns should be checked based on import options.
+ *
+ * @param importOptions the import options to evaluate
+ * @return {@code true} if missing columns should be checked, otherwise {@code false}
+ */
+ private boolean shouldCheckForMissingColumns(ImportOptions importOptions) {
+ return importOptions.getImportMode() == ImportMode.INSERT
+ || importOptions.isRequireAllColumns();
+ }
+
+ /**
+ * Validates a source record against the given table metadata and constraints.
+ *
+ * @param partitionKeyNames the set of partition key names
+ * @param clusteringKeyNames the set of clustering key names
+ * @param columnNames the set of expected column names
+ * @param mutableSourceRecord the source record to be validated
+ * @param checkForMissingColumns whether to check for missing columns
+ * @param tableMetadata the table metadata containing schema details
+ * @return the validation result containing any validation errors or success status
+ */
+ private ImportSourceRecordValidationResult validateSourceRecord(
+ LinkedHashSet<String> partitionKeyNames,
+ LinkedHashSet<String> clusteringKeyNames,
+ LinkedHashSet<String> columnNames,
+ ObjectNode mutableSourceRecord,
+ boolean checkForMissingColumns,
+ TableMetadata tableMetadata) {
+ return ImportSourceRecordValidator.validateSourceRecord(
+ partitionKeyNames,
+ clusteringKeyNames,
+ columnNames,
+ mutableSourceRecord,
+ checkForMissingColumns,
+ tableMetadata);
+ }
+
+ /**
+ * Determines whether missing columns should be revalidated when performing an upsert operation.
+ *
+ * @param importOptions the import options to evaluate
+ * @param checkForMissingColumns whether missing columns were initially checked
+ * @return {@code true} if missing columns should be revalidated, otherwise {@code false}
+ */
+ private boolean shouldRevalidateMissingColumns(
+ ImportOptions importOptions, boolean checkForMissingColumns) {
+ return !checkForMissingColumns && importOptions.getImportMode() == ImportMode.UPSERT;
+ }
+
+ /**
+ * Determines whether the operation should fail if data already exists.
+ *
+ * @param importAction the action being performed (e.g., INSERT, UPDATE)
+ * @param importOptions the import options specifying behavior
+ * @return {@code true} if the operation should fail for existing data, otherwise {@code false}
+ */
+ private boolean shouldFailForExistingData(
+ ImportTaskAction importAction, ImportOptions importOptions) {
+ return importAction == ImportTaskAction.UPDATE
+ && importOptions.getImportMode() == ImportMode.INSERT;
+ }
+
+ /**
+ * Determines whether the operation should fail if the expected data is missing.
+ *
+ * @param importAction the action being performed (e.g., INSERT, UPDATE)
+ * @param importOptions the import options specifying behavior
+ * @return {@code true} if the operation should fail for missing data, otherwise {@code false}
+ */
+ private boolean shouldFailForMissingData(
+ ImportTaskAction importAction, ImportOptions importOptions) {
+ return importAction == ImportTaskAction.INSERT
+ && importOptions.getImportMode() == ImportMode.UPDATE;
+ }
+
+ /**
+ * Retrieves an existing record from the database if it exists.
+ *
+ * @param namespace the database namespace
+ * @param tableName the target table name
+ * @param partitionKey the partition key for the record
+ * @param clusteringKey the clustering key for the record (can be null)
+ * @return Optional containing the Result if found, empty if not found
+ * @throws ScalarDBDaoException if there is an error accessing the database
+ */
+ protected abstract Optional<Result> getDataRecord(
+ String namespace, String tableName, Key partitionKey, Key clusteringKey)
+ throws ScalarDBDaoException;
+
+ /**
+ * Saves a record to the database, either as an insert or update operation.
+ *
+ * @param namespace the database namespace
+ * @param tableName the target table name
+ * @param partitionKey the partition key for the record
+ * @param clusteringKey the clustering key for the record (can be null)
+ * @param columns the columns and their values to be saved
+ * @throws ScalarDBDaoException if there is an error saving to the database
+ */
+ protected abstract void saveRecord(
+ String namespace,
+ String tableName,
+ Key partitionKey,
+ Key clusteringKey,
+ List<Column<?>> columns)
+ throws ScalarDBDaoException;
+}
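
Because ImportTask confines all ScalarDB access to the two abstract hooks above, adding a new execution backend only requires implementing getDataRecord and saveRecord. The sketch below is a hypothetical dry-run variant, shown purely to illustrate the template-method contract; it is not part of this change.

    // Hypothetical subclass: runs validation and mapping via execute() but never touches ScalarDB.
    public class DryRunImportTask extends ImportTask {

      public DryRunImportTask(ImportTaskParams params) {
        super(params);
      }

      @Override
      protected Optional<Result> getDataRecord(
          String namespace, String tableName, Key partitionKey, Key clusteringKey) {
        // Pretend no record exists, so every row is treated as an insert
        return Optional.empty();
      }

      @Override
      protected void saveRecord(
          String namespace,
          String tableName,
          Key partitionKey,
          Key clusteringKey,
          List<Column<?>> columns) {
        // Intentionally a no-op in a dry run
      }
    }
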
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTaskParams.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTaskParams.java
new file mode 100644
index 0000000000..eafe3a42ae
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTaskParams.java
@@ -0,0 +1,41 @@
+package com.scalar.db.dataloader.core.dataimport.task;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.scalar.db.api.TableMetadata;
+import com.scalar.db.dataloader.core.dataimport.ImportOptions;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao;
+import com.scalar.db.dataloader.core.dataimport.processor.TableColumnDataTypes;
+import java.util.Map;
+import lombok.Builder;
+import lombok.NonNull;
+import lombok.Value;
+
+/**
+ * Parameters required for executing an import task in the data loader. This class encapsulates all
+ * necessary information needed to process and import a single record into ScalarDB.
+ */
+@Builder
+@Value
+public class ImportTaskParams {
+
+ /** The source record to be imported, represented as a JSON node */
+ @NonNull JsonNode sourceRecord;
+
+ /** Identifier for the current chunk of data being processed */
+ int dataChunkId;
+
+ /** The row number of the current record in the source data */
+ int rowNumber;
+
+ /** Configuration options for the import process */
+ @NonNull ImportOptions importOptions;
+
+ /** Mapping of table names to their corresponding metadata */
+ @NonNull Map<String, TableMetadata> tableMetadataByTableName;
+
+ /** Data type information for table columns */
+ @NonNull TableColumnDataTypes tableColumnDataTypes;
+
+ /** Data Access Object for interacting with ScalarDB */
+ @NonNull ScalarDBDao dao;
+}
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTransactionalTask.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTransactionalTask.java
new file mode 100644
index 0000000000..449270d929
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTransactionalTask.java
@@ -0,0 +1,106 @@
+package com.scalar.db.dataloader.core.dataimport.task;
+
+import com.scalar.db.api.DistributedTransaction;
+import com.scalar.db.api.Result;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDaoException;
+import com.scalar.db.exception.transaction.AbortException;
+import com.scalar.db.exception.transaction.TransactionException;
+import com.scalar.db.io.Column;
+import com.scalar.db.io.Key;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * An import task that operates within a {@link DistributedTransaction} context.
+ *
+ * <p>This class extends {@link ImportTask} and provides transactional semantics for data import
+ * operations. It ensures that all data operations (get and put) are executed within the same
+ * transaction context, maintaining ACID properties.
+ *
+ * <p>The task uses a single {@link DistributedTransaction} instance throughout its lifecycle,
+ * which is passed during construction. This transaction must be managed (committed or aborted)
+ * by the caller.
+ */
+public class ImportTransactionalTask extends ImportTask {
+
+ private final DistributedTransaction transaction;
+
+ /**
+ * Constructs an {@code ImportTransactionalTask} with the specified parameters and transaction.
+ *
+ * @param params the import task parameters containing configuration and DAO objects
+ * @param transaction the distributed transaction to be used for all data operations. This
+ * transaction should be properly managed (committed/aborted) by the caller
+ */
+ public ImportTransactionalTask(ImportTaskParams params, DistributedTransaction transaction) {
+ super(params);
+ this.transaction = transaction;
+ }
+
+ /**
+ * Retrieves a data record within the active transaction context.
+ *
+ * <p>This method overrides the base implementation to ensure the get operation is executed
+ * within the transaction context provided during construction.
+ *
+ * @param namespace the namespace of the table to query
+ * @param tableName the name of the table to query
+ * @param partitionKey the partition key identifying the record's partition
+ * @param clusteringKey the clustering key for further record identification within the partition
+ * @return an {@link Optional} containing the {@link Result} if the record exists, otherwise an
+ * empty {@link Optional}
+ * @throws ScalarDBDaoException if an error occurs during the database operation or if the
+ * transaction encounters any issues
+ */
+ @Override
+ protected Optional<Result> getDataRecord(
+ String namespace, String tableName, Key partitionKey, Key clusteringKey)
+ throws ScalarDBDaoException {
+ return params.getDao().get(namespace, tableName, partitionKey, clusteringKey, transaction);
+ }
+
+ /**
+ * Saves a record within the active transaction context.
+ *
+ * <p>This method overrides the base implementation to ensure the put operation is executed within
+ * the transaction context provided during construction.
+ *
+ * @param namespace the namespace of the target table
+ * @param tableName the name of the target table
+ * @param partitionKey the partition key determining where the record will be stored
+ * @param clusteringKey the clustering key for ordering/organizing records within the partition
+ * @param columns the list of columns containing the actual data to be saved
+ * @throws ScalarDBDaoException if an error occurs during the database operation or if the
+ * transaction encounters any issues
+ */
+ @Override
+ protected void saveRecord(
+ String namespace,
+ String tableName,
+ Key partitionKey,
+ Key clusteringKey,
+ List<Column<?>> columns)
+ throws ScalarDBDaoException {
+ params.getDao().put(namespace, tableName, partitionKey, clusteringKey, columns, transaction);
+ }
+
+ /**
+ * Aborts the active ScalarDB transaction if it has not been committed.
+ *
+ * This method provides a safe way to abort an active transaction, handling any abort-related
+ * exceptions by wrapping them in a {@link TransactionException}.
+ *
+ * @param tx the transaction to be aborted. If null, this method does nothing
+ * @throws TransactionException if the underlying abort operation fails
+ */
+ private void abortActiveTransaction(DistributedTransaction tx) throws TransactionException {
+ if (tx != null) {
+ try {
+ tx.abort();
+ } catch (AbortException e) {
+ throw new TransactionException(e.getMessage(), tx.getId());
+ }
+ }
+ }
+}
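
Since ImportTransactionalTask never commits or aborts on its own, the caller owns the transaction lifecycle. The following is a simplified, hypothetical call pattern (exception handling elided; the real batching logic lives in the import processor):

    // One transaction shared by a batch of tasks; the caller decides its fate.
    DistributedTransaction tx = distributedTransactionManager.start();
    ImportTaskResult result = new ImportTransactionalTask(params, tx).execute();
    tx.commit();   // or tx.abort() if any task in the batch reported a failure
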
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/ColumnUtils.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/ColumnUtils.java
index 58f10d0f84..7f8255e0f8 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/ColumnUtils.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/ColumnUtils.java
@@ -1,7 +1,11 @@
package com.scalar.db.dataloader.core.util;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.scalar.db.api.Result;
+import com.scalar.db.api.TableMetadata;
import com.scalar.db.common.error.CoreError;
import com.scalar.db.dataloader.core.ColumnInfo;
+import com.scalar.db.dataloader.core.exception.Base64Exception;
import com.scalar.db.dataloader.core.exception.ColumnParsingException;
import com.scalar.db.io.BigIntColumn;
import com.scalar.db.io.BlobColumn;
@@ -12,15 +16,34 @@
import com.scalar.db.io.FloatColumn;
import com.scalar.db.io.IntColumn;
import com.scalar.db.io.TextColumn;
+import com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils;
+import java.util.ArrayList;
import java.util.Base64;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import javax.annotation.Nullable;
/**
* Utility class for creating and managing ScalarDB columns.
*
- *
- * <p>This class provides methods for creating ScalarDB columns based on the given data type, column
- * information, and value. It includes handling for various data types and special cases like base64
- * encoding for BLOB data.
+ *
+ * <p>This class provides utility methods for:
+ *
+ * <ul>
+ *   <li>Creating ScalarDB columns from various data types and values
+ *   <li>Converting between ScalarDB Result objects and column data
+ *   <li>Handling special data formats like base64 encoding for BLOB data
+ *   <li>Managing transaction-related metadata columns
+ * </ul>
+ *
+ * <p>The class supports all ScalarDB data types including:
+ *
+ * <ul>
+ *   <li>Basic types: BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, TEXT
+ *   <li>Binary data: BLOB (requires base64 encoding)
+ *   <li>Temporal types: DATE, TIME, TIMESTAMP, TIMESTAMPTZ
+ * </ul>
*/
public final class ColumnUtils {
@@ -30,15 +53,22 @@ private ColumnUtils() {}
/**
* Creates a ScalarDB column from the given data type, column information, and value.
*
- * Blob source values need to be base64 encoded before passing them as a value. If the value is
- * {@code null}, the corresponding column is created as a {@code null} column.
+ *
+ * <p>This method handles the creation of columns for all supported ScalarDB data types. For BLOB
+ * type columns, the input value must be base64 encoded before being passed to this method.
*
- * @param dataType the data type of the specified column
- * @param columnInfo the ScalarDB table column information
- * @param value the value for the ScalarDB column (may be {@code null})
+ *
+ * <p>If the provided value is {@code null}, a null column of the appropriate type is created.
+ *
+ * @param dataType the data type of the specified column (e.g., BOOLEAN, INT, TEXT, etc.)
+ * @param columnInfo the ScalarDB table column information containing column name and metadata
+ * @param value the string representation of the value for the ScalarDB column (may be {@code
+ * null})
* @return the ScalarDB column created from the specified data
- * @throws ColumnParsingException if an error occurs while creating the column or parsing the
- * value
+ * @throws ColumnParsingException if an error occurs while creating the column, such as:
+ *     <ul>
+ *       <li>Invalid number format for numeric types
+ *       <li>Invalid base64 encoding for BLOB type
+ *       <li>Invalid date/time format for temporal types
+ *     </ul>
*/
public static Column<?> createColumnFromValue(
DataType dataType, ColumnInfo columnInfo, @Nullable String value)
@@ -88,4 +118,155 @@ public static Column> createColumnFromValue(
e);
}
}
+
+ /**
+ * Retrieves columns from a ScalarDB Result object, comparing with source data and handling
+ * metadata.
+ *
+ * <p>This method processes the result data while:
+ *
+ * <ul>
+ *   <li>Excluding transaction metadata columns
+ *   <li>Excluding partition and clustering key columns
+ *   <li>Handling null values based on the ignoreNullValues parameter
+ *   <li>Merging data from both the ScalarDB Result and the source record
+ * </ul>
+ *
+ * @param scalarDBResult the ScalarDB Result object containing the current data
+ * @param sourceRecord the source data in JSON format to compare against
+ * @param ignoreNullValues if true, null values will be excluded from the result
+ * @param tableMetadata metadata about the table structure and column types
+ * @return a List of Column objects representing the processed data
+ * @throws Base64Exception if there's an error processing base64 encoded BLOB data
+ * @throws ColumnParsingException if there's an error parsing column values
+ */
+ public static List<Column<?>> getColumnsFromResult(
+ Result scalarDBResult,
+ JsonNode sourceRecord,
+ boolean ignoreNullValues,
+ TableMetadata tableMetadata)
+ throws Base64Exception, ColumnParsingException {
+
+ List<Column<?>> columns = new ArrayList<>();
+ Set<String> columnsToIgnore =
+ getColumnsToIgnore(
+ tableMetadata.getPartitionKeyNames(), tableMetadata.getClusteringKeyNames());
+ for (String columnName : tableMetadata.getColumnNames()) {
+ if (ConsensusCommitUtils.isTransactionMetaColumn(columnName, tableMetadata)
+ || columnsToIgnore.contains(columnName)) {
+ continue;
+ }
+
+ Column<?> column =
+ getColumn(
+ scalarDBResult,
+ sourceRecord,
+ columnName,
+ ignoreNullValues,
+ tableMetadata.getColumnDataTypes());
+
+ if (column != null) {
+ columns.add(column);
+ }
+ }
+
+ return columns;
+ }
+
+ /**
+ * Creates a set of column names that should be ignored during processing.
+ *
+ *
+ * <p>This method combines:
+ *
+ * <ul>
+ *   <li>Transaction metadata columns
+ *   <li>Partition key columns
+ *   <li>Clustering key columns
+ * </ul>
+ *
+ * @param partitionKeyNames set of column names that are partition keys
+ * @param clusteringKeyNames set of column names that are clustering keys
+ * @return a Set of column names that should be ignored during processing
+ */
+ private static Set<String> getColumnsToIgnore(
+ Set<String> partitionKeyNames, Set<String> clusteringKeyNames) {
+ Set<String> columnsToIgnore =
+ new HashSet<>(ConsensusCommitUtils.getTransactionMetaColumns().keySet());
+ columnsToIgnore.addAll(partitionKeyNames);
+ columnsToIgnore.addAll(clusteringKeyNames);
+ return columnsToIgnore;
+ }
+
+ /**
+ * Retrieves a column value by comparing ScalarDB Result data with source record data.
+ *
+ * <p>This method determines which data source to use for the column value:
+ *
+ * <ul>
+ *   <li>If the column exists in the ScalarDB Result but not in the source record, uses the
+ *       Result data
+ *   <li>Otherwise, uses the source record data
+ * </ul>
+ *
+ * @param scalarDBResult the ScalarDB Result object containing current data
+ * @param sourceRecord the source data in JSON format
+ * @param columnName the name of the column to retrieve
+ * @param ignoreNullValues whether to ignore null values in the result
+ * @param dataTypesByColumns mapping of column names to their data types
+ * @return the Column object containing the value, or null if ignored
+ * @throws ColumnParsingException if there's an error parsing the column value
+ */
+ private static Column<?> getColumn(
+ @Nullable Result scalarDBResult,
+ JsonNode sourceRecord,
+ String columnName,
+ boolean ignoreNullValues,
+ Map<String, DataType> dataTypesByColumns)
+ throws ColumnParsingException {
+ if (scalarDBResult != null && !sourceRecord.has(columnName)) {
+ return getColumnFromResult(scalarDBResult, columnName);
+ } else {
+ return getColumnFromSourceRecord(
+ sourceRecord, columnName, ignoreNullValues, dataTypesByColumns);
+ }
+ }
+
+ /**
+ * Get column from result
+ *
+ * @param scalarDBResult result record
+ * @param columnName column name
+ * @return column data
+ */
+ private static Column<?> getColumnFromResult(Result scalarDBResult, String columnName) {
+ Map<String, Column<?>> columnValues = scalarDBResult.getColumns();
+ return columnValues.get(columnName);
+ }
+
+ /**
+ * Get column from the source record
+ *
+ * @param sourceRecord source data
+ * @param columnName column name
+ * @param ignoreNullValues ignore null values or not
+ * @param dataTypesByColumns data types of columns
+ * @return column data
+ * @throws ColumnParsingException if an error occurs while parsing the column
+ */
+ private static Column<?> getColumnFromSourceRecord(
+ JsonNode sourceRecord,
+ String columnName,
+ boolean ignoreNullValues,
+ Map<String, DataType> dataTypesByColumns)
+ throws ColumnParsingException {
+ DataType dataType = dataTypesByColumns.get(columnName);
+ String columnValue =
+ sourceRecord.has(columnName) && !sourceRecord.get(columnName).isNull()
+ ? sourceRecord.get(columnName).asText()
+ : null;
+ if (!ignoreNullValues || columnValue != null) {
+ ColumnInfo columnInfo = ColumnInfo.builder().columnName(columnName).build();
+ return createColumnFromValue(dataType, columnInfo, columnValue);
+ }
+ return null;
+ }
}
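
To make the BLOB handling described above concrete, here is a small illustrative example (the column name col7 and the sample value are arbitrary); createColumnFromValue throws ColumnParsingException on malformed input:

    // BLOB values must be handed over as base64 text
    ColumnInfo blobColumn = ColumnInfo.builder().columnName("col7").build();
    String base64Value =
        Base64.getEncoder().encodeToString("blob test value".getBytes(StandardCharsets.UTF_8));
    Column<?> column = ColumnUtils.createColumnFromValue(DataType.BLOB, blobColumn, base64Value);
    // Passing null instead of base64Value would yield a typed null column
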
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/KeyUtils.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/KeyUtils.java
index c2491df0f4..e3433a31b5 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/KeyUtils.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/KeyUtils.java
@@ -1,21 +1,38 @@
package com.scalar.db.dataloader.core.util;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.common.error.CoreError;
import com.scalar.db.dataloader.core.ColumnInfo;
import com.scalar.db.dataloader.core.ColumnKeyValue;
+import com.scalar.db.dataloader.core.exception.Base64Exception;
import com.scalar.db.dataloader.core.exception.ColumnParsingException;
import com.scalar.db.dataloader.core.exception.KeyParsingException;
import com.scalar.db.io.Column;
import com.scalar.db.io.DataType;
import com.scalar.db.io.Key;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
import javax.annotation.Nullable;
/**
* Utility class for creating and managing ScalarDB keys.
*
- * This class provides methods to parse key-value pairs and create ScalarDB key instances. It
- * also includes utility methods for handling data types, columns, and potential parsing exceptions.
+ *
+ * <p>This class provides utility methods for:
+ *
+ * <ul>
+ *   <li>Creating partition and clustering keys from source records
+ *   <li>Parsing key-value pairs into ScalarDB Key instances
+ *   <li>Creating composite keys from multiple columns
+ * </ul>
+ *
+ * <p>The class handles proper type conversion and validation of keys according to the table
+ * metadata and column data types. It also provides comprehensive error handling for various
+ * key-related operations.
+ */
public final class KeyUtils {
@@ -23,20 +40,67 @@ public final class KeyUtils {
private KeyUtils() {}
/**
- * Converts a key-value pair, in the format of =, into a ScalarDB Key instance for a
- * specific ScalarDB table.
+ * Creates an {@link Optional} clustering key from the given source record.
+ *
+ * This method constructs a clustering key by extracting values from the source record for each
+ * clustering key column. If any required clustering key column is missing from the source record
+ * or if there's an error in data conversion, an empty Optional is returned.
+ *
+ * @param clusteringKeyNames A set of column names that make up the clustering key
+ * @param dataTypeByColumnName A map defining the data type for each column name
+ * @param sourceRecord The source record containing the column values
+ * @return An {@link Optional} containing the clustering key if all required columns exist and are
+ * valid, otherwise {@link Optional#empty()}
+ */
+ public static Optional<Key> createClusteringKeyFromSource(
+ Set<String> clusteringKeyNames,
+ Map<String, DataType> dataTypeByColumnName,
+ ObjectNode sourceRecord) {
+ return clusteringKeyNames.isEmpty()
+ ? Optional.empty()
+ : createKeyFromSource(clusteringKeyNames, dataTypeByColumnName, sourceRecord);
+ }
+
+ /**
+ * Creates an {@link Optional} partition key from the given source record.
*
- * This method uses the provided table metadata to determine the data type for the key and
- * creates a corresponding ScalarDB Key. If the key does not match any column in the table
- * metadata, a {@link KeyParsingException} is thrown.
+ *
+ * <p>This method constructs a partition key by extracting values from the source record for each
+ * partition key column. If any required partition key column is missing from the source record or
+ * if there's an error in data conversion, an empty Optional is returned.
*
- * @param columnKeyValue a key-value pair in the format of =
- * @param namespace the name of the ScalarDB namespace
- * @param tableName the name of the ScalarDB table
- * @param tableMetadata metadata for the ScalarDB table
- * @return a new ScalarDB Key instance formatted according to the data type
- * @throws KeyParsingException if there is an error parsing the key value or if the column does
- * not exist
+ * @param partitionKeyNames A set of column names that make up the partition key
+ * @param dataTypeByColumnName A map defining the data type for each column name
+ * @param sourceRecord The source record containing the column values
+ * @return An {@link Optional} containing the partition key if all required columns exist and are
+ * valid, otherwise {@link Optional#empty()}
+ */
+ public static Optional<Key> createPartitionKeyFromSource(
+ Set<String> partitionKeyNames,
+ Map<String, DataType> dataTypeByColumnName,
+ ObjectNode sourceRecord) {
+ return createKeyFromSource(partitionKeyNames, dataTypeByColumnName, sourceRecord);
+ }
+
+ /**
+ * Converts a key-value pair into a ScalarDB Key instance for a specific ScalarDB table.
+ *
+ * <p>This method performs the following steps:
+ *
+ * <ol>
+ *   <li>Validates that the column exists in the table metadata
+ *   <li>Determines the correct data type for the column
+ *   <li>Converts the value to the appropriate type
+ *   <li>Creates and returns a new ScalarDB Key instance
+ * </ol>
+ *
+ * @param columnKeyValue A key-value pair containing the column name and value
+ * @param namespace The name of the ScalarDB namespace
+ * @param tableName The name of the ScalarDB table
+ * @param tableMetadata Metadata for the ScalarDB table
+ * @return A new ScalarDB Key instance formatted according to the data type, or null if
+ * columnKeyValue is null
+ * @throws KeyParsingException If the column doesn't exist in the table or if there's an error
+ * parsing the value
*/
@Nullable
public static Key parseKeyValue(
@@ -67,14 +131,16 @@ public static Key parseKeyValue(
/**
* Creates a ScalarDB key based on the provided data type, column information, and value.
*
- * This method creates a ScalarDB Key instance by converting the column value to the
- * appropriate data type and constructing the key using that value.
+ *
+ * <p>This method handles the conversion of string values to their appropriate ScalarDB data types
+ * and constructs a single-column key. The method ensures type safety and proper formatting of the
+ * key value according to the specified data type.
*
- * @param dataType the data type of the specified column
- * @param columnInfo the ScalarDB table column information
- * @param value the value for the ScalarDB key
- * @return a ScalarDB Key instance
- * @throws KeyParsingException if there is an error while creating the ScalarDB key
+ * @param dataType The data type of the specified column
+ * @param columnInfo The ScalarDB table column information
+ * @param value The string value to be converted and used as the key
+ * @return A ScalarDB Key instance containing the converted value
+ * @throws KeyParsingException If there's an error converting the value to the specified data type
+ * or creating the key
*/
public static Key createKey(DataType dataType, ColumnInfo columnInfo, String value)
throws KeyParsingException {
@@ -85,4 +151,66 @@ public static Key createKey(DataType dataType, ColumnInfo columnInfo, String val
throw new KeyParsingException(e.getMessage(), e);
}
}
+
+ /**
+ * Creates a new composite ScalarDB key from multiple columns.
+ *
+ * <p>This method creates a composite key by combining multiple columns, each with its own data
+ * type and value. The method requires that all input lists (dataTypes, columnNames, and values)
+ * have the same length. If the lists are not of equal length, an empty Optional is returned.
+ *
+ * <p>The method performs the following for each column:
+ *
+ * <ol>
+ *   <li>Creates a ColumnInfo instance
+ *   <li>Converts the string value to the appropriate data type
+ *   <li>Adds the converted value to the composite key
+ * </ol>
+ *
+ * @param dataTypes List of data types for each column in the composite key
+ * @param columnNames List of column names corresponding to each data type
+ * @param values List of string values to be converted and used in the key
+ * @return An Optional containing the composite ScalarDB Key if successful, or empty if the input
+ * lists have different lengths
+ * @throws Base64Exception If there's an error processing Base64-encoded values
+ * @throws ColumnParsingException If there's an error converting any value to its specified data
+ * type
+ */
+ public static Optional<Key> createCompositeKey(
+ List<DataType> dataTypes, List<String> columnNames, List<String> values)
+ throws Base64Exception, ColumnParsingException {
+ if (!CollectionUtil.areSameLength(dataTypes, columnNames, values)) {
+ return Optional.empty();
+ }
+ Key.Builder builder = Key.newBuilder();
+ for (int i = 0; i < dataTypes.size(); i++) {
+ ColumnInfo columnInfo = ColumnInfo.builder().columnName(columnNames.get(i)).build();
+ Column<?> keyValue =
+ ColumnUtils.createColumnFromValue(dataTypes.get(i), columnInfo, values.get(i));
+ builder.add(keyValue);
+ }
+ return Optional.of(builder.build());
+ }
+
+ private static Optional<Key> createKeyFromSource(
+ Set<String> keyNames, Map<String, DataType> columnDataTypes, JsonNode sourceRecord) {
+ List<DataType> dataTypes = new ArrayList<>();
+ List<String> columnNames = new ArrayList<>();
+ List<String> values = new ArrayList<>();
+
+ for (String keyName : keyNames) {
+ if (!columnDataTypes.containsKey(keyName) || !sourceRecord.has(keyName)) {
+ return Optional.empty();
+ }
+ dataTypes.add(columnDataTypes.get(keyName));
+ columnNames.add(keyName);
+ values.add(sourceRecord.get(keyName).asText());
+ }
+
+ try {
+ return createCompositeKey(dataTypes, columnNames, values);
+ } catch (Base64Exception | ColumnParsingException e) {
+ return Optional.empty();
+ }
+ }
}
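
As a concrete illustration of the source-record-to-key path added here, the following hypothetical snippet builds a partition key for a table whose partition key is a single BIGINT column named col1 (names and types are illustrative only):

    // A Jackson ObjectNode stands in for one parsed source record
    ObjectNode record = new ObjectMapper().createObjectNode();
    record.put("col1", 1L);

    Map<String, DataType> dataTypes = Collections.singletonMap("col1", DataType.BIGINT);
    Optional<Key> partitionKey =
        KeyUtils.createPartitionKeyFromSource(Collections.singleton("col1"), dataTypes, record);
    // An empty Optional means a key column was missing or could not be converted
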
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/UnitTestUtils.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/UnitTestUtils.java
index bf4b4414af..bc629a99f3 100644
--- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/UnitTestUtils.java
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/UnitTestUtils.java
@@ -5,7 +5,13 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.scalar.db.api.Result;
import com.scalar.db.api.TableMetadata;
+import com.scalar.db.common.ResultImpl;
+import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFile;
+import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTable;
+import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTableFieldMapping;
+import com.scalar.db.dataloader.core.dataimport.processor.TableColumnDataTypes;
import com.scalar.db.dataloader.core.util.DecimalUtil;
import com.scalar.db.io.BigIntColumn;
import com.scalar.db.io.BlobColumn;
@@ -15,10 +21,18 @@
import com.scalar.db.io.DoubleColumn;
import com.scalar.db.io.FloatColumn;
import com.scalar.db.io.IntColumn;
+import com.scalar.db.io.Key;
import com.scalar.db.io.TextColumn;
import com.scalar.db.transaction.consensuscommit.Attribute;
+import java.io.BufferedReader;
+import java.io.StringReader;
import java.nio.charset.StandardCharsets;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
/** Utils for the service unit tests */
public class UnitTestUtils {
@@ -224,4 +238,65 @@ public static String getSourceTestValue(DataType dataType) {
return TEST_VALUE_TEXT;
}
}
+
+ public static TableColumnDataTypes getTableColumnData() {
+ TableColumnDataTypes tableColumnDataTypes = new TableColumnDataTypes();
+ Map<String, TableMetadata> tableMetadataMap = new HashMap<>();
+ tableMetadataMap.put("namespace.table", createTestTableMetadata());
+ tableMetadataMap.forEach(
+ (name, metadata) ->
+ metadata
+ .getColumnDataTypes()
+ .forEach((k, v) -> tableColumnDataTypes.addColumnDataType(name, k, v)));
+ return tableColumnDataTypes;
+ }
+
+ public static ControlFile getControlFile() {
+ List<ControlFileTable> controlFileTables = new ArrayList<>();
+ List<ControlFileTableFieldMapping> mappings = new ArrayList<>();
+ mappings.add(new ControlFileTableFieldMapping("col1", "col1"));
+ mappings.add(new ControlFileTableFieldMapping("col2", "col2"));
+ mappings.add(new ControlFileTableFieldMapping("col3", "col3"));
+ mappings.add(new ControlFileTableFieldMapping("col4", "col4"));
+ mappings.add(new ControlFileTableFieldMapping("col5", "col5"));
+ mappings.add(new ControlFileTableFieldMapping("col6", "col6"));
+ mappings.add(new ControlFileTableFieldMapping("col7", "col7"));
+ controlFileTables.add(new ControlFileTable("namespace", "table", mappings));
+ return new ControlFile(controlFileTables);
+ }
+
+ public static BufferedReader getJsonReader() {
+ String jsonData =
+ "[{\"col1\":1,\"col2\":\"1\",\"col3\":\"1\",\"col4\":\"1.4e-45\",\"col5\":\"5e-324\",\"col6\":\"VALUE!!s\",\"col7\":\"0x626C6F6220746573742076616C7565\"}]";
+ return new BufferedReader(new StringReader(jsonData));
+ }
+
+ public static BufferedReader getJsonLinesReader() {
+ String jsonLinesData =
+ "{\"col1\":1,\"col2\":\"1\",\"col3\":\"1\",\"col4\":\"1.4e-45\",\"col5\":\"5e-324\",\"col6\":\"VALUE!!s\",\"col7\":\"0x626C6F6220746573742076616C7565\"}\n";
+ return new BufferedReader(new StringReader(jsonLinesData));
+ }
+
+ public static BufferedReader getCsvReader() {
+ String csvData =
+ "col1,col2,col3,col4,col5,col6,col7 \n"
+ + "1,1,1,1.4E-45,5e-324,VALUE!!s,0x626C6F6220746573742076616C7565 \n";
+ return new BufferedReader(new StringReader(csvData));
+ }
+
+ public static Key getClusteringKey() {
+ return Key.newBuilder()
+ .add(IntColumn.of("col2", 1))
+ .add(BooleanColumn.of("col3", true))
+ .build();
+ }
+
+ public static Key getPartitionKey(int j) {
+ return Key.ofBigInt("col1", j);
+ }
+
+ public static Optional<Result> getResult(long pk) {
+ Result data = new ResultImpl(createTestValues(), createTestTableMetadata());
+ return Optional.of(data);
+ }
}
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java
new file mode 100644
index 0000000000..94acd20ace
--- /dev/null
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java
@@ -0,0 +1,121 @@
+package com.scalar.db.dataloader.core.dataimport.processor;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.scalar.db.api.DistributedStorage;
+import com.scalar.db.api.DistributedTransaction;
+import com.scalar.db.api.DistributedTransactionManager;
+import com.scalar.db.api.TableMetadata;
+import com.scalar.db.dataloader.core.FileFormat;
+import com.scalar.db.dataloader.core.ScalarDBMode;
+import com.scalar.db.dataloader.core.UnitTestUtils;
+import com.scalar.db.dataloader.core.dataimport.ImportMode;
+import com.scalar.db.dataloader.core.dataimport.ImportOptions;
+import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileValidationLevel;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDaoException;
+import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
+import com.scalar.db.dataloader.core.dataimport.log.LogMode;
+import com.scalar.db.exception.transaction.TransactionException;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+
+class CsvImportProcessorTest {
+ @Mock private ImportProcessorParams params;
+ @Mock ScalarDBMode scalarDBMode;
+ @Mock ImportOptions importOptions;
+ @Mock Map<String, TableMetadata> tableMetadataByTableName;
+ @Mock TableColumnDataTypes tableColumnDataTypes;
+
+ ScalarDBDao dao;
+ @Mock DistributedStorage distributedStorage;
+ DistributedTransactionManager distributedTransactionManager;
+ CsvImportProcessor csvImportProcessor;
+
+ @BeforeEach
+ void setup() throws ScalarDBDaoException, TransactionException {
+ dao = Mockito.mock(ScalarDBDao.class);
+ distributedTransactionManager = mock(DistributedTransactionManager.class);
+ DistributedTransaction distributedTransaction = mock(DistributedTransaction.class);
+ when(distributedTransactionManager.start()).thenReturn(distributedTransaction);
+ tableMetadataByTableName = new HashMap<>();
+ tableMetadataByTableName.put("namespace.table", UnitTestUtils.createTestTableMetadata());
+ tableColumnDataTypes = UnitTestUtils.getTableColumnData();
+ importOptions =
+ ImportOptions.builder()
+ .importMode(ImportMode.UPSERT)
+ .fileFormat(FileFormat.CSV)
+ .controlFile(UnitTestUtils.getControlFile())
+ .controlFileValidationLevel(ControlFileValidationLevel.MAPPED)
+ .namespace("namespace")
+ .transactionBatchSize(1)
+ .dataChunkSize(5)
+ .tableName("table")
+ .logMode(LogMode.SINGLE_FILE)
+ .maxThreads(8)
+ .dataChunkQueueSize(256)
+ .build();
+ Mockito.when(
+ dao.get(
+ "namespace",
+ "table",
+ UnitTestUtils.getPartitionKey(1),
+ UnitTestUtils.getClusteringKey(),
+ distributedStorage))
+ .thenReturn(UnitTestUtils.getResult(1));
+ Mockito.when(
+ dao.get(
+ "namespace",
+ "table",
+ UnitTestUtils.getPartitionKey(1),
+ UnitTestUtils.getClusteringKey(),
+ distributedTransaction))
+ .thenReturn(UnitTestUtils.getResult(1));
+ }
+
+ @Test
+ void test_importProcessWithStorage() {
+ params =
+ ImportProcessorParams.builder()
+ .scalarDBMode(ScalarDBMode.STORAGE)
+ .importOptions(importOptions)
+ .dao(dao)
+ .distributedStorage(distributedStorage)
+ .distributedTransactionManager(distributedTransactionManager)
+ .tableColumnDataTypes(tableColumnDataTypes)
+ .tableMetadataByTableName(tableMetadataByTableName)
+ .build();
+ csvImportProcessor = new CsvImportProcessor(params);
+ Map<Integer, ImportDataChunkStatus> statusList =
+ csvImportProcessor.process(5, 1, UnitTestUtils.getCsvReader());
+ assert statusList != null;
+ Assertions.assertEquals(1, statusList.size());
+ }
+
+ @Test
+ void test_importProcessWithTransaction() {
+ params =
+ ImportProcessorParams.builder()
+ .scalarDBMode(ScalarDBMode.TRANSACTION)
+ .importOptions(importOptions)
+ .dao(dao)
+ .distributedStorage(distributedStorage)
+ .distributedTransactionManager(distributedTransactionManager)
+ .tableColumnDataTypes(tableColumnDataTypes)
+ .tableMetadataByTableName(tableMetadataByTableName)
+ .build();
+ csvImportProcessor = new CsvImportProcessor(params);
+ Map<Integer, ImportDataChunkStatus> statusList =
+ csvImportProcessor.process(5, 1, UnitTestUtils.getCsvReader());
+ assert statusList != null;
+ Assertions.assertEquals(1, statusList.size());
+ }
+}
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/DefaultImportProcessorFactoryTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/DefaultImportProcessorFactoryTest.java
new file mode 100644
index 0000000000..3e6ed5bcc8
--- /dev/null
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/DefaultImportProcessorFactoryTest.java
@@ -0,0 +1,70 @@
+package com.scalar.db.dataloader.core.dataimport.processor;
+
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+
+import com.scalar.db.dataloader.core.FileFormat;
+import com.scalar.db.dataloader.core.dataimport.ImportOptions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for {@link DefaultImportProcessorFactory} class. Tests the factory's ability to create
+ * appropriate import processors based on different file formats.
+ */
+class DefaultImportProcessorFactoryTest {
+
+ private DefaultImportProcessorFactory factory;
+
+ @BeforeEach
+ void setUp() {
+ factory = new DefaultImportProcessorFactory();
+ }
+
+ /**
+ * Tests that the factory creates a {@link JsonLinesImportProcessor} when JSONL format is
+ * specified.
+ */
+ @Test
+ void createImportProcessor_givenFileFormatIsJsonl_shouldReturnJsonLinesImportProcessor() {
+ // Arrange
+ ImportOptions importOptions = ImportOptions.builder().fileFormat(FileFormat.JSONL).build();
+ ImportProcessorParams params =
+ ImportProcessorParams.builder().importOptions(importOptions).build();
+
+ // Act
+ ImportProcessor result = factory.createImportProcessor(params);
+
+ // Assert
+ assertInstanceOf(JsonLinesImportProcessor.class, result);
+ }
+
+ /** Tests that the factory creates a {@link JsonImportProcessor} when JSON format is specified. */
+ @Test
+ void createImportProcessor_givenFileFormatIsJson_shouldReturnJsonImportProcessor() {
+ // Given
+ ImportOptions importOptions = ImportOptions.builder().fileFormat(FileFormat.JSON).build();
+ ImportProcessorParams params =
+ ImportProcessorParams.builder().importOptions(importOptions).build();
+
+ // When
+ ImportProcessor result = factory.createImportProcessor(params);
+
+ // Then
+ assertInstanceOf(JsonImportProcessor.class, result);
+ }
+
+ /** Tests that the factory creates a {@link CsvImportProcessor} when CSV format is specified. */
+ @Test
+ void createImportProcessor_givenFileFormatIsCsv_shouldReturnCsvImportProcessor() {
+ // Given
+ ImportOptions importOptions = ImportOptions.builder().fileFormat(FileFormat.CSV).build();
+ ImportProcessorParams params =
+ ImportProcessorParams.builder().importOptions(importOptions).build();
+
+ // When
+ ImportProcessor result = factory.createImportProcessor(params);
+
+ // Then
+ assertInstanceOf(CsvImportProcessor.class, result);
+ }
+}
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java
new file mode 100644
index 0000000000..aa9a106a0c
--- /dev/null
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java
@@ -0,0 +1,121 @@
+package com.scalar.db.dataloader.core.dataimport.processor;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.scalar.db.api.DistributedStorage;
+import com.scalar.db.api.DistributedTransaction;
+import com.scalar.db.api.DistributedTransactionManager;
+import com.scalar.db.api.TableMetadata;
+import com.scalar.db.dataloader.core.FileFormat;
+import com.scalar.db.dataloader.core.ScalarDBMode;
+import com.scalar.db.dataloader.core.UnitTestUtils;
+import com.scalar.db.dataloader.core.dataimport.ImportMode;
+import com.scalar.db.dataloader.core.dataimport.ImportOptions;
+import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileValidationLevel;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDaoException;
+import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
+import com.scalar.db.dataloader.core.dataimport.log.LogMode;
+import com.scalar.db.exception.transaction.TransactionException;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
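+/**
+ * Unit tests for {@link JsonImportProcessor}. Verifies that a JSON input file is processed into
+ * the expected number of data chunks in both storage and transaction modes.
+ */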
+class JsonImportProcessorTest {
+ private ImportProcessorParams params;
+ private ImportOptions importOptions;
+ private Map<String, TableMetadata> tableMetadataByTableName;
+ private TableColumnDataTypes tableColumnDataTypes;
+
+ private ScalarDBDao dao;
+ private DistributedStorage distributedStorage;
+ private DistributedTransactionManager distributedTransactionManager;
+ private JsonImportProcessor jsonImportProcessor;
+
+ @BeforeEach
+ void setup() throws ScalarDBDaoException, TransactionException {
+ dao = Mockito.mock(ScalarDBDao.class);
+ distributedStorage = mock(DistributedStorage.class);
+ distributedTransactionManager = mock(DistributedTransactionManager.class);
+ DistributedTransaction distributedTransaction = mock(DistributedTransaction.class);
+ when(distributedTransactionManager.start()).thenReturn(distributedTransaction);
+ tableMetadataByTableName = new HashMap<>();
+ tableMetadataByTableName.put("namespace.table", UnitTestUtils.createTestTableMetadata());
+ tableColumnDataTypes = UnitTestUtils.getTableColumnData();
+ importOptions =
+ ImportOptions.builder()
+ .importMode(ImportMode.UPSERT)
+ .fileFormat(FileFormat.JSON)
+ .controlFile(UnitTestUtils.getControlFile())
+ .controlFileValidationLevel(ControlFileValidationLevel.MAPPED)
+ .namespace("namespace")
+ .transactionBatchSize(1)
+ .dataChunkSize(5)
+ .tableName("table")
+ .logMode(LogMode.SINGLE_FILE)
+ .maxThreads(8)
+ .dataChunkQueueSize(256)
+ .build();
+ Mockito.when(
+ dao.get(
+ "namespace",
+ "table",
+ UnitTestUtils.getPartitionKey(1),
+ UnitTestUtils.getClusteringKey(),
+ distributedStorage))
+ .thenReturn(UnitTestUtils.getResult(1));
+ Mockito.when(
+ dao.get(
+ "namespace",
+ "table",
+ UnitTestUtils.getPartitionKey(1),
+ UnitTestUtils.getClusteringKey(),
+ distributedTransaction))
+ .thenReturn(UnitTestUtils.getResult(1));
+ }
+
+ @Test
+ void test_importProcessWithStorage() {
+ params =
+ ImportProcessorParams.builder()
+ .scalarDBMode(ScalarDBMode.STORAGE)
+ .importOptions(importOptions)
+ .dao(dao)
+ .distributedStorage(distributedStorage)
+ .distributedTransactionManager(distributedTransactionManager)
+ .tableColumnDataTypes(tableColumnDataTypes)
+ .tableMetadataByTableName(tableMetadataByTableName)
+ .build();
+ jsonImportProcessor = new JsonImportProcessor(params);
+ Map<Integer, ImportDataChunkStatus> statusMap =
+ jsonImportProcessor.process(5, 1, UnitTestUtils.getJsonReader());
+ Assertions.assertNotNull(statusMap);
+ Assertions.assertEquals(1, statusMap.size());
+ }
+
+ @Test
+ void test_importProcessWithTransaction() {
+ params =
+ ImportProcessorParams.builder()
+ .scalarDBMode(ScalarDBMode.TRANSACTION)
+ .importOptions(importOptions)
+ .dao(dao)
+ .distributedStorage(distributedStorage)
+ .distributedTransactionManager(distributedTransactionManager)
+ .tableColumnDataTypes(tableColumnDataTypes)
+ .tableMetadataByTableName(tableMetadataByTableName)
+ .build();
+ jsonImportProcessor = new JsonImportProcessor(params);
+ Map<Integer, ImportDataChunkStatus> statusMap =
+ jsonImportProcessor.process(5, 1, UnitTestUtils.getJsonReader());
+ Assertions.assertNotNull(statusMap);
+ Assertions.assertEquals(1, statusMap.size());
+ }
+}
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java
new file mode 100644
index 0000000000..e3db391756
--- /dev/null
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java
@@ -0,0 +1,121 @@
+package com.scalar.db.dataloader.core.dataimport.processor;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.scalar.db.api.DistributedStorage;
+import com.scalar.db.api.DistributedTransaction;
+import com.scalar.db.api.DistributedTransactionManager;
+import com.scalar.db.api.TableMetadata;
+import com.scalar.db.dataloader.core.FileFormat;
+import com.scalar.db.dataloader.core.ScalarDBMode;
+import com.scalar.db.dataloader.core.UnitTestUtils;
+import com.scalar.db.dataloader.core.dataimport.ImportMode;
+import com.scalar.db.dataloader.core.dataimport.ImportOptions;
+import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileValidationLevel;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDaoException;
+import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
+import com.scalar.db.dataloader.core.dataimport.log.LogMode;
+import com.scalar.db.exception.transaction.TransactionException;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
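+/**
+ * Unit tests for {@link JsonLinesImportProcessor}. Verifies that a JSON Lines (JSONL) input file
+ * is processed into the expected number of data chunks in both storage and transaction modes.
+ */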
+class JsonLinesImportProcessorTest {
+ private ImportProcessorParams params;
+ private ImportOptions importOptions;
+ private Map<String, TableMetadata> tableMetadataByTableName;
+ private TableColumnDataTypes tableColumnDataTypes;
+
+ private ScalarDBDao dao;
+ private DistributedStorage distributedStorage;
+ private DistributedTransactionManager distributedTransactionManager;
+ private JsonLinesImportProcessor jsonLinesImportProcessor;
+
+ @BeforeEach
+ void setup() throws ScalarDBDaoException, TransactionException {
+ dao = Mockito.mock(ScalarDBDao.class);
+ distributedStorage = mock(DistributedStorage.class);
+ distributedTransactionManager = mock(DistributedTransactionManager.class);
+ DistributedTransaction distributedTransaction = mock(DistributedTransaction.class);
+ when(distributedTransactionManager.start()).thenReturn(distributedTransaction);
+ tableMetadataByTableName = new HashMap<>();
+ tableMetadataByTableName.put("namespace.table", UnitTestUtils.createTestTableMetadata());
+ tableColumnDataTypes = UnitTestUtils.getTableColumnData();
+ importOptions =
+ ImportOptions.builder()
+ .importMode(ImportMode.UPSERT)
+ .fileFormat(FileFormat.JSONL)
+ .controlFile(UnitTestUtils.getControlFile())
+ .controlFileValidationLevel(ControlFileValidationLevel.MAPPED)
+ .namespace("namespace")
+ .transactionBatchSize(1)
+ .dataChunkSize(5)
+ .tableName("table")
+ .maxThreads(8)
+ .dataChunkQueueSize(256)
+ .logMode(LogMode.SINGLE_FILE)
+ .build();
+ Mockito.when(
+ dao.get(
+ "namespace",
+ "table",
+ UnitTestUtils.getPartitionKey(1),
+ UnitTestUtils.getClusteringKey(),
+ distributedStorage))
+ .thenReturn(UnitTestUtils.getResult(1));
+ Mockito.when(
+ dao.get(
+ "namespace",
+ "table",
+ UnitTestUtils.getPartitionKey(1),
+ UnitTestUtils.getClusteringKey(),
+ distributedTransaction))
+ .thenReturn(UnitTestUtils.getResult(1));
+ }
+
+ @Test
+ void test_importProcessWithStorage() {
+ params =
+ ImportProcessorParams.builder()
+ .scalarDBMode(ScalarDBMode.STORAGE)
+ .importOptions(importOptions)
+ .dao(dao)
+ .distributedStorage(distributedStorage)
+ .distributedTransactionManager(distributedTransactionManager)
+ .tableColumnDataTypes(tableColumnDataTypes)
+ .tableMetadataByTableName(tableMetadataByTableName)
+ .build();
+ jsonLinesImportProcessor = new JsonLinesImportProcessor(params);
+ Map<Integer, ImportDataChunkStatus> statusMap =
+ jsonLinesImportProcessor.process(5, 1, UnitTestUtils.getJsonLinesReader());
+ Assertions.assertNotNull(statusMap);
+ Assertions.assertEquals(1, statusMap.size());
+ }
+
+ @Test
+ void test_importProcessWithTransaction() {
+ params =
+ ImportProcessorParams.builder()
+ .scalarDBMode(ScalarDBMode.TRANSACTION)
+ .importOptions(importOptions)
+ .dao(dao)
+ .distributedStorage(distributedStorage)
+ .distributedTransactionManager(distributedTransactionManager)
+ .tableColumnDataTypes(tableColumnDataTypes)
+ .tableMetadataByTableName(tableMetadataByTableName)
+ .build();
+ jsonLinesImportProcessor = new JsonLinesImportProcessor(params);
+ Map<Integer, ImportDataChunkStatus> statusMap =
+ jsonLinesImportProcessor.process(5, 1, UnitTestUtils.getJsonLinesReader());
+ Assertions.assertNotNull(statusMap);
+ Assertions.assertEquals(1, statusMap.size());
+ }
+}
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/TableColumnDataTypesTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/TableColumnDataTypesTest.java
new file mode 100644
index 0000000000..687397523f
--- /dev/null
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/TableColumnDataTypesTest.java
@@ -0,0 +1,49 @@
+package com.scalar.db.dataloader.core.dataimport.processor;
+
+import com.scalar.db.io.DataType;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for the TableColumnDataTypes class which manages data type mappings for table columns.
+ */
+class TableColumnDataTypesTest {
+
+ TableColumnDataTypes tableColumnDataTypes;
+
+ /**
+ * Tests that column data types can be successfully added and retrieved for a table. Verifies that
+ * the correct data type is returned for a specific column after adding multiple column
+ * definitions.
+ */
+ @Test
+ void addColumnDataType_withValidData_shouldAddColumnDataType() {
+ tableColumnDataTypes = new TableColumnDataTypes();
+ tableColumnDataTypes.addColumnDataType("table", "id", DataType.BIGINT);
+ tableColumnDataTypes.addColumnDataType("table", "name", DataType.TEXT);
+ Assertions.assertEquals(
+ DataType.BIGINT, tableColumnDataTypes.getColumnDataTypes("table").get("id"));
+ }
+
+ /**
+ * Tests the retrieval of a data type for a specific table and column combination. Verifies that
+ * the correct data type is returned when the table and column exist in the mapping.
+ */
+ @Test
+ void getDataType_withValidTableAndColumnName_shouldReturnCorrectDataType() {
+ tableColumnDataTypes = new TableColumnDataTypes();
+ tableColumnDataTypes.addColumnDataType("table", "id", DataType.BIGINT);
+ tableColumnDataTypes.addColumnDataType("table", "name", DataType.TEXT);
+ Assertions.assertEquals(DataType.TEXT, tableColumnDataTypes.getDataType("table", "name"));
+ }
+
+ /**
+ * Tests the behavior when attempting to retrieve a data type for a non-existent table and column
+ * combination. Verifies that null is returned when the requested mapping doesn't exist.
+ */
+ @Test
+ void getDataType_withInvalidTableAndColumnName_shouldReturnNull() {
+ tableColumnDataTypes = new TableColumnDataTypes();
+ Assertions.assertNull(tableColumnDataTypes.getDataType("table", "name"));
+ }
+}
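
The tests above treat TableColumnDataTypes as a two-level lookup keyed by table name and then column name. A minimal stand-in illustrating that shape with plain java.util maps (the production class is defined elsewhere in this module and may differ):

  // Hypothetical stand-in, not the class under test.
  Map<String, Map<String, DataType>> dataTypesByTable = new HashMap<>();
  dataTypesByTable.computeIfAbsent("table", t -> new HashMap<>()).put("id", DataType.BIGINT);
  dataTypesByTable.computeIfAbsent("table", t -> new HashMap<>()).put("name", DataType.TEXT);
  DataType idType = dataTypesByTable.get("table").get("id"); // DataType.BIGINT
  DataType missing = dataTypesByTable.getOrDefault("other", new HashMap<>()).get("name"); // null
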
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/ColumnUtilsTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/ColumnUtilsTest.java
index cd47243b16..997bf4d84a 100644
--- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/ColumnUtilsTest.java
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/ColumnUtilsTest.java
@@ -1,9 +1,16 @@
package com.scalar.db.dataloader.core.util;
-import static org.junit.jupiter.api.Assertions.*;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.scalar.db.api.Result;
+import com.scalar.db.api.TableMetadata;
+import com.scalar.db.common.ResultImpl;
import com.scalar.db.common.error.CoreError;
import com.scalar.db.dataloader.core.ColumnInfo;
+import com.scalar.db.dataloader.core.UnitTestUtils;
+import com.scalar.db.dataloader.core.exception.Base64Exception;
import com.scalar.db.dataloader.core.exception.ColumnParsingException;
import com.scalar.db.io.BigIntColumn;
import com.scalar.db.io.BlobColumn;
@@ -16,16 +23,33 @@
import com.scalar.db.io.TextColumn;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
+import java.util.List;
+import java.util.Map;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+/**
+ * Unit tests for the ColumnUtils class which handles column creation and manipulation. Tests
+ * various data type conversions and error handling scenarios.
+ */
class ColumnUtilsTest {
private static final float FLOAT_VALUE = 2.78f;
+ private static final TableMetadata mockMetadata = UnitTestUtils.createTestTableMetadata();
+ private static final ObjectNode sourceRecord = UnitTestUtils.getOutputDataWithMetadata();
+ private static final Map<String, Column<?>> values = UnitTestUtils.createTestValues();
+ private static final Result scalarDBResult = new ResultImpl(values, mockMetadata);
+ /**
+ * Provides test cases for column creation with different data types and values. Each test case
+ * includes the target {@link DataType}, the column name, the input value (as a string), and the
+ * expected {@link Column} object.
+ *
+ * @return Stream of Arguments containing test parameters
+ */
private static Stream<Arguments> provideColumnsForCreateColumnFromValue() {
return Stream.of(
Arguments.of(DataType.BOOLEAN, "boolColumn", "true", BooleanColumn.of("boolColumn", true)),
@@ -64,6 +88,16 @@ private static Stream<Arguments> provideColumnsForCreateColumnFromValue() {
Arguments.of(DataType.BLOB, "blobColumn", null, BlobColumn.ofNull("blobColumn")));
}
+ /**
+ * Tests column creation from string values for various data types. Verifies that the created
+ * column matches the expected column with correct type and value.
+ *
+ * @param dataType The target ScalarDB data type
+ * @param columnName Name of the column
+ * @param value String value to convert
+ * @param expectedColumn Expected Column object after conversion
+ * @throws ColumnParsingException if the value cannot be parsed into the target data type
+ */
@ParameterizedTest
@MethodSource("provideColumnsForCreateColumnFromValue")
void createColumnFromValue_validInput_returnsColumn(
@@ -74,6 +108,10 @@ void createColumnFromValue_validInput_returnsColumn(
assertEquals(expectedColumn, actualColumn);
}
+ /**
+ * Tests that attempting to create a numeric column with an invalid number format throws a
+ * {@link ColumnParsingException} with an appropriate error message.
+ */
@Test
void createColumnFromValue_invalidNumberFormat_throwsNumberFormatException() {
String columnName = "intColumn";
@@ -90,6 +128,10 @@ void createColumnFromValue_invalidNumberFormat_throwsNumberFormatException() {
exception.getMessage());
}
+ /**
+ * Tests that attempting to create a BLOB column with invalid Base64 encoding throws a
+ * {@link ColumnParsingException} with an appropriate error message.
+ */
@Test
void createColumnFromValue_invalidBase64_throwsBase64Exception() {
String columnName = "blobColumn";
@@ -105,4 +147,20 @@ void createColumnFromValue_invalidBase64_throwsBase64Exception() {
columnName, "table", "ns"),
exception.getMessage());
}
+
+ /**
+ * Tests the extraction of columns from a ScalarDB Result object. Verifies that all columns are
+ * correctly extracted and converted from the source record.
+ *
+ * @throws Base64Exception if BLOB data contains invalid Base64 encoding
+ * @throws ColumnParsingException if any column value cannot be parsed into its target data type
+ */
+ @Test
+ void getColumnsFromResult_withValidData_shouldReturnColumns()
+ throws Base64Exception, ColumnParsingException {
+ List<Column<?>> columns =
+ ColumnUtils.getColumnsFromResult(scalarDBResult, sourceRecord, false, mockMetadata);
+ assertEquals(4, columns.size());
+ }
}
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/KeyUtilsTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/KeyUtilsTest.java
index f2fe680490..eb19b12c85 100644
--- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/KeyUtilsTest.java
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/KeyUtilsTest.java
@@ -1,12 +1,16 @@
package com.scalar.db.dataloader.core.util;
-import static org.junit.jupiter.api.Assertions.*;
-import static org.mockito.Mockito.*;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.when;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.common.error.CoreError;
import com.scalar.db.dataloader.core.ColumnInfo;
import com.scalar.db.dataloader.core.ColumnKeyValue;
+import com.scalar.db.dataloader.core.UnitTestUtils;
import com.scalar.db.dataloader.core.exception.KeyParsingException;
import com.scalar.db.io.BigIntColumn;
import com.scalar.db.io.BlobColumn;
@@ -19,21 +23,38 @@
import com.scalar.db.io.TextColumn;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
+/**
+ * Unit tests for the KeyUtils class which handles parsing and creation of database keys. Tests
+ * cover various data types and key creation scenarios including partition and clustering keys.
+ */
@ExtendWith(MockitoExtension.class)
class KeyUtilsTest {
@Mock private TableMetadata tableMetadata;
+ private static final Map<String, DataType> dataTypeByColumnName = UnitTestUtils.getColumnData();
+ private static final ObjectNode sourceRecord = UnitTestUtils.getOutputDataWithMetadata();
+ /** Tests that parsing a null key value returns null. */
@Test
void parseKeyValue_nullKeyValue_returnsNull() throws KeyParsingException {
assertNull(KeyUtils.parseKeyValue(null, null, null, tableMetadata));
}
+ /**
+ * Tests that attempting to parse a key value with an invalid column name throws a
+ * KeyParsingException. The exception message should include the namespace and table details.
+ */
@Test
void parseKeyValue_invalidColumnName_throwsKeyParsingException() {
String columnName = "invalidColumn";
@@ -50,6 +71,7 @@ void parseKeyValue_invalidColumnName_throwsKeyParsingException() {
exception.getMessage());
}
+ /** Tests successful parsing of a valid key value with TEXT data type. */
@Test
void parseKeyValue_validKeyValue_returnsKey() throws KeyParsingException {
String columnName = "columnName";
@@ -64,6 +86,7 @@ void parseKeyValue_validKeyValue_returnsKey() throws KeyParsingException {
assertEquals(expected, actual);
}
+ /** Tests creation of a key with BOOLEAN data type. */
@Test
void createKey_boolean_returnsKey() throws KeyParsingException {
String columnName = "booleanColumn";
@@ -74,6 +97,7 @@ void createKey_boolean_returnsKey() throws KeyParsingException {
assertEquals(expected, actual);
}
+ /** Tests creation of a key with INT data type. */
@Test
void createKey_int_returnsKey() throws KeyParsingException {
String columnName = "intColumn";
@@ -84,6 +108,7 @@ void createKey_int_returnsKey() throws KeyParsingException {
assertEquals(expected, actual);
}
+ /** Tests creation of a key with BIGINT data type. */
@Test
void createKey_bigint_returnsKey() throws KeyParsingException {
String columnName = "bigintColumn";
@@ -94,6 +119,7 @@ void createKey_bigint_returnsKey() throws KeyParsingException {
assertEquals(expected, actual);
}
+ /** Tests creation of a key with FLOAT data type. */
@Test
void createKey_float_returnsKey() throws KeyParsingException {
String columnName = "floatColumn";
@@ -104,6 +130,7 @@ void createKey_float_returnsKey() throws KeyParsingException {
assertEquals(expected, actual);
}
+ /** Tests creation of a key with DOUBLE data type. */
@Test
void createKey_double_returnsKey() throws KeyParsingException {
String columnName = "doubleColumn";
@@ -114,6 +141,7 @@ void createKey_double_returnsKey() throws KeyParsingException {
assertEquals(expected, actual);
}
+ /** Tests creation of a key with TEXT data type. */
@Test
void createKey_text_returnsKey() throws KeyParsingException {
String columnName = "textColumn";
@@ -124,6 +152,7 @@ void createKey_text_returnsKey() throws KeyParsingException {
assertEquals(expected, actual);
}
+ /** Tests creation of a key with BLOB data type using Base64 encoded input. */
@Test
void createKey_blob_returnsKey() throws KeyParsingException {
String columnName = "blobColumn";
@@ -138,6 +167,10 @@ void createKey_blob_returnsKey() throws KeyParsingException {
assertEquals(expected, actual);
}
+ /**
+ * Tests that attempting to create a BLOB key with invalid Base64 input throws
+ * KeyParsingException.
+ */
@Test
void createKey_invalidBase64_throwsBase64Exception() {
String columnName = "blobColumn";
@@ -146,4 +179,54 @@ void createKey_invalidBase64_throwsBase64Exception() {
assertThrows(
KeyParsingException.class, () -> KeyUtils.createKey(DataType.BLOB, columnInfo, value));
}
+
+ /** Tests that creating a clustering key from an empty set returns an empty Optional. */
+ @Test
+ void createClusteringKeyFromSource_withEmptyClusteringKeySet_shouldReturnEmpty() {
+ Optional<Key> key = KeyUtils.createClusteringKeyFromSource(Collections.emptySet(), null, null);
+ assertEquals(Optional.empty(), key);
+ }
+
+ /**
+ * Tests creation of a clustering key from a valid set of clustering columns. Verifies that the
+ * resulting key contains the expected INT and BOOLEAN values.
+ */
+ @Test
+ void createClusteringKeyFromSource_withValidClusteringKeySet_shouldReturnValidKey() {
+ Set<String> clusterKeySet = new HashSet<>();
+ clusterKeySet.add(UnitTestUtils.TEST_COLUMN_2_CK);
+ clusterKeySet.add(UnitTestUtils.TEST_COLUMN_3_CK);
+ Optional<Key> key =
+ KeyUtils.createClusteringKeyFromSource(clusterKeySet, dataTypeByColumnName, sourceRecord);
+ assertEquals(
+ "Optional[Key{IntColumn{name=col2, value=2147483647, hasNullValue=false}, BooleanColumn{name=col3, value=true, hasNullValue=false}}]",
+ key.toString());
+ }
+
+ /**
+ * Tests that attempting to create a partition key with invalid data returns an empty Optional.
+ */
+ @Test
+ void createPartitionKeyFromSource_withInvalidData_shouldReturnEmpty() {
+ Set<String> partitionKeySet = new HashSet<>();
+ partitionKeySet.add("id1");
+ Optional<Key> key =
+ KeyUtils.createPartitionKeyFromSource(partitionKeySet, dataTypeByColumnName, sourceRecord);
+ assertEquals(Optional.empty(), key);
+ }
+
+ /**
+ * Tests creation of a partition key from valid data. Verifies that the resulting key contains the
+ * expected BIGINT value.
+ */
+ @Test
+ void createPartitionKeyFromSource_withValidData_shouldReturnValidKey() {
+ Set<String> partitionKeySet = new HashSet<>();
+ partitionKeySet.add(UnitTestUtils.TEST_COLUMN_1_PK);
+ Optional<Key> key =
+ KeyUtils.createPartitionKeyFromSource(partitionKeySet, dataTypeByColumnName, sourceRecord);
+ assertEquals(
+ "Optional[Key{BigIntColumn{name=col1, value=9007199254740992, hasNullValue=false}}]",
+ key.toString());
+ }
}