diff --git a/core/src/main/java/com/scalar/db/common/error/CoreError.java b/core/src/main/java/com/scalar/db/common/error/CoreError.java index 29b34dd033..23d66b3797 100644 --- a/core/src/main/java/com/scalar/db/common/error/CoreError.java +++ b/core/src/main/java/com/scalar/db/common/error/CoreError.java @@ -824,6 +824,30 @@ public enum CoreError implements ScalarDbError { ""), DATA_LOADER_FILE_FORMAT_NOT_SUPPORTED( Category.USER_ERROR, "0178", "The provided file format is not supported : %s", "", ""), + DATA_LOADER_COULD_NOT_FIND_PARTITION_KEY( + Category.USER_ERROR, "0179", "Could not find the partition key", "", ""), + DATA_LOADER_UPSERT_INSERT_MISSING_COLUMNS( + Category.USER_ERROR, + "0180", + "The source record needs to contain all fields if the UPSERT turns into an INSERT", + "", + ""), + DATA_LOADER_DATA_ALREADY_EXISTS(Category.USER_ERROR, "0181", "Record already exists", "", ""), + DATA_LOADER_DATA_NOT_FOUND(Category.USER_ERROR, "0182", "Record was not found", "", ""), + DATA_LOADER_COULD_NOT_FIND_CLUSTERING_KEY( + Category.USER_ERROR, "0183", "Could not find the clustering key", "", ""), + DATA_LOADER_TABLE_METADATA_MISSING( + Category.USER_ERROR, "0184", "No table metadata found", "", ""), + DATA_LOADER_MISSING_SOURCE_FIELD( + Category.USER_ERROR, + "0185", + "The data mapping source field '%s' for table '%s' is missing in the json data record", + "", + ""), + DATA_LOADER_CSV_DATA_MISMATCH( + Category.USER_ERROR, "0186", "The CSV row: %s does not match header: %s.", "", ""), + DATA_LOADER_JSON_CONTENT_START_ERROR( + Category.USER_ERROR, "0187", "Expected JSON file content to be an array", "", ""), // // Errors for the concurrency error category @@ -1087,6 +1111,20 @@ public enum CoreError implements ScalarDbError { "Something went wrong while scanning. Are you sure you are running in the correct transaction mode? Details: %s", "", ""), + DATA_LOADER_CSV_FILE_READ_FAILED( + Category.INTERNAL_ERROR, "0049", "Failed to read CSV file. Details: %s.", "", ""), + DATA_LOADER_CSV_FILE_HEADER_READ_FAILED( + Category.INTERNAL_ERROR, "0050", "Failed to CSV read header line. Details: %s.", "", ""), + DATA_LOADER_DATA_CHUNK_PROCESS_FAILED( + Category.INTERNAL_ERROR, + "0051", + "Data chunk processing was interrupted. Details: %s", + "", + ""), + DATA_LOADER_JSON_FILE_READ_FAILED( + Category.INTERNAL_ERROR, "0052", "Failed to read JSON file. Details: %s.", "", ""), + DATA_LOADER_JSONLINES_FILE_READ_FAILED( + Category.INTERNAL_ERROR, "0053", "Failed to read JSON Lines file. Details: %s.", "", ""), // // Errors for the unknown transaction status error category diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportEventListener.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportEventListener.java new file mode 100644 index 0000000000..8081931c50 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportEventListener.java @@ -0,0 +1,61 @@ +package com.scalar.db.dataloader.core.dataimport; + +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus; + +/** + * Listener interface for monitoring import events during the data loading process. 
Implementations + * can use this to track progress and handle various stages of the import process. + */ +public interface ImportEventListener { + + /** + * Called when processing of a data chunk begins. + * + * @param status the current status of the data chunk being processed + */ + void onDataChunkStarted(ImportDataChunkStatus status); + + /** + * Updates or adds new status information for a data chunk. + * + * @param status the updated status information for the data chunk + */ + void addOrUpdateDataChunkStatus(ImportDataChunkStatus status); + + /** + * Called when processing of a data chunk is completed. + * + * @param status the final status of the completed data chunk + */ + void onDataChunkCompleted(ImportDataChunkStatus status); + + /** + * Called when all data chunks have been processed. This indicates that the entire chunked import + * process is complete. + */ + void onAllDataChunksCompleted(); + + /** + * Called when processing of a transaction batch begins. + * + * @param batchStatus the initial status of the transaction batch + */ + void onTransactionBatchStarted(ImportTransactionBatchStatus batchStatus); + + /** + * Called when processing of a transaction batch is completed. + * + * @param batchResult the result of the completed transaction batch + */ + void onTransactionBatchCompleted(ImportTransactionBatchResult batchResult); + + /** + * Called when an import task is completed. + * + * @param taskResult the result of the completed import task + */ + void onTaskComplete(ImportTaskResult taskResult); +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportManager.java new file mode 100644 index 0000000000..f1984d6c26 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportManager.java @@ -0,0 +1,183 @@ +package com.scalar.db.dataloader.core.dataimport; + +import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.DistributedTransactionManager; +import com.scalar.db.api.TableMetadata; +import com.scalar.db.dataloader.core.ScalarDBMode; +import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; +import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessor; +import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessorFactory; +import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessorParams; +import com.scalar.db.dataloader.core.dataimport.processor.TableColumnDataTypes; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus; +import java.io.BufferedReader; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import lombok.AllArgsConstructor; +import lombok.NonNull; + +/** + * Manages the data import process and coordinates event handling between the import processor and + * listeners. This class implements {@link ImportEventListener} to receive events from the processor + * and relay them to registered listeners. + * + *
+ * <p>The import process involves reading the import file in configurable data chunks, processing
+ * each chunk through the created import processor, and relaying progress events to all registered
+ * listeners.
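+ *
+ * <p>A minimal usage sketch. The constructor shown is the Lombok-generated all-args constructor;
+ * its argument order follows the field declarations below (initialized final fields are skipped by
+ * Lombok), and the variable names are placeholders:
+ *
+ * <pre>{@code
+ * ImportManager manager =
+ *     new ImportManager(
+ *         tableMetadataByName,
+ *         fileReader,
+ *         importOptions,
+ *         new DefaultImportProcessorFactory(),
+ *         ScalarDBMode.TRANSACTION,
+ *         storage,
+ *         transactionManager);
+ * manager.addListener(myListener);
+ * manager.startImport();
+ * }</pre>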
+ */ +@AllArgsConstructor +public class ImportManager implements ImportEventListener { + + @NonNull private final Map tableMetadata; + @NonNull private final BufferedReader importFileReader; + @NonNull private final ImportOptions importOptions; + private final ImportProcessorFactory importProcessorFactory; + private final List listeners = new ArrayList<>(); + private final ScalarDBMode scalarDBMode; + private final DistributedStorage distributedStorage; + private final DistributedTransactionManager distributedTransactionManager; + private final ConcurrentHashMap importDataChunkStatusMap = + new ConcurrentHashMap<>(); + + /** + * Starts the import process using the configured parameters. + * + *
+ * <p>
If the data chunk size in {@link ImportOptions} is set to 0, the entire file will be + * processed as a single chunk. Otherwise, the file will be processed in chunks of the specified + * size. + * + * @return a map of {@link ImportDataChunkStatus} objects containing the status of each processed + * chunk + */ + public ConcurrentHashMap startImport() { + ImportProcessorParams params = + ImportProcessorParams.builder() + .scalarDBMode(scalarDBMode) + .importOptions(importOptions) + .tableMetadataByTableName(tableMetadata) + .dao(new ScalarDBDao()) + .distributedTransactionManager(distributedTransactionManager) + .distributedStorage(distributedStorage) + .tableColumnDataTypes(getTableColumnDataTypes()) + .build(); + ImportProcessor processor = importProcessorFactory.createImportProcessor(params); + processor.addListener(this); + // If the data chunk size is 0, then process the entire file in a single data chunk + int dataChunkSize = + importOptions.getDataChunkSize() == 0 + ? Integer.MAX_VALUE + : importOptions.getDataChunkSize(); + return processor.process( + dataChunkSize, importOptions.getTransactionBatchSize(), importFileReader); + } + + /** + * Registers a new listener to receive import events. + * + * @param listener the listener to add + * @throws IllegalArgumentException if the listener is null + */ + public void addListener(ImportEventListener listener) { + listeners.add(listener); + } + + /** + * Removes a previously registered listener. + * + * @param listener the listener to remove + */ + public void removeListener(ImportEventListener listener) { + listeners.remove(listener); + } + + /** {@inheritDoc} Forwards the event to all registered listeners. */ + @Override + public void onDataChunkStarted(ImportDataChunkStatus status) { + for (ImportEventListener listener : listeners) { + listener.onDataChunkStarted(status); + } + } + + /** + * {@inheritDoc} Updates or adds the status of a data chunk in the status map. This method is + * thread-safe. + */ + @Override + public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) { + importDataChunkStatusMap.put(status.getDataChunkId(), status); + } + + /** {@inheritDoc} Forwards the event to all registered listeners. */ + @Override + public void onDataChunkCompleted(ImportDataChunkStatus status) { + for (ImportEventListener listener : listeners) { + listener.onDataChunkCompleted(status); + } + } + + /** {@inheritDoc} Forwards the event to all registered listeners. */ + @Override + public void onTransactionBatchStarted(ImportTransactionBatchStatus status) { + for (ImportEventListener listener : listeners) { + listener.onTransactionBatchStarted(status); + } + } + + /** {@inheritDoc} Forwards the event to all registered listeners. */ + @Override + public void onTransactionBatchCompleted(ImportTransactionBatchResult batchResult) { + for (ImportEventListener listener : listeners) { + listener.onTransactionBatchCompleted(batchResult); + } + } + + /** {@inheritDoc} Forwards the event to all registered listeners. */ + @Override + public void onTaskComplete(ImportTaskResult taskResult) { + for (ImportEventListener listener : listeners) { + listener.onTaskComplete(taskResult); + } + } + + /** {@inheritDoc} Forwards the event to all registered listeners. */ + @Override + public void onAllDataChunksCompleted() { + for (ImportEventListener listener : listeners) { + listener.onAllDataChunksCompleted(); + } + } + + /** + * Returns the current map of import data chunk status objects. 
+ * + * @return a map of {@link ImportDataChunkStatus} objects + */ + public ConcurrentHashMap getImportDataChunkStatus() { + return importDataChunkStatusMap; + } + + /** + * Creates and returns a mapping of table column data types from the table metadata. + * + * @return a {@link TableColumnDataTypes} object containing the column data types for all tables + */ + public TableColumnDataTypes getTableColumnDataTypes() { + TableColumnDataTypes tableColumnDataTypes = new TableColumnDataTypes(); + tableMetadata.forEach( + (name, metadata) -> + metadata + .getColumnDataTypes() + .forEach((k, v) -> tableColumnDataTypes.addColumnDataType(name, k, v))); + return tableColumnDataTypes; + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportOptions.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportOptions.java index 9cb6225d30..6d3206765e 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportOptions.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportOptions.java @@ -35,4 +35,5 @@ public class ImportOptions { private final String tableName; private final int maxThreads; private final String customHeaderRow; + private final int dataChunkQueueSize; } diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessor.java new file mode 100644 index 0000000000..0c68d5e566 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessor.java @@ -0,0 +1,207 @@ +package com.scalar.db.dataloader.core.dataimport.processor; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.scalar.db.common.error.CoreError; +import com.scalar.db.dataloader.core.DataLoaderObjectMapper; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunk; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportRow; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A processor for importing CSV data into the database. + * + *
+ * <p>This class handles the processing of CSV files by:
+ *
+ * <ul>
+ *   <li>Reading and parsing CSV data with configurable delimiters
+ *   <li>Processing data in configurable chunk sizes for efficient batch processing
+ *   <li>Supporting parallel processing using multiple threads
+ *   <li>Converting CSV rows into JSON format for database import
+ * </ul>
+ *
+ * <p>
The processor supports custom headers and validates that each data row matches the header + * structure before processing. + */ +public class CsvImportProcessor extends ImportProcessor { + private static final DataLoaderObjectMapper OBJECT_MAPPER = new DataLoaderObjectMapper(); + private static final AtomicInteger dataChunkIdCounter = new AtomicInteger(0); + + /** + * Creates a new CsvImportProcessor with the specified parameters. + * + * @param params Configuration parameters for the import processor + */ + public CsvImportProcessor(ImportProcessorParams params) { + super(params); + } + + /** + * Processes the source data from the given import file. + * + *
+ * <p>
This method reads data from the provided {@link BufferedReader}, processes it in chunks, and + * batches transactions according to the specified sizes. The method returns a list of {@link + * ImportDataChunkStatus} objects, each representing the status of a processed data chunk. + * + * @param dataChunkSize the number of records to include in each data chunk + * @param transactionBatchSize the number of records to include in each transaction batch + * @param reader the {@link BufferedReader} used to read the source file + * @return a map of {@link ImportDataChunkStatus} objects indicating the processing status of each + * data chunk + */ + @Override + public ConcurrentHashMap process( + int dataChunkSize, int transactionBatchSize, BufferedReader reader) { + ExecutorService dataChunkExecutor = Executors.newSingleThreadExecutor(); + BlockingQueue dataChunkQueue = + new LinkedBlockingQueue<>(params.getImportOptions().getDataChunkQueueSize()); + + try { + CompletableFuture readerFuture = + CompletableFuture.runAsync( + () -> readDataChunks(reader, dataChunkSize, dataChunkQueue), dataChunkExecutor); + + ConcurrentHashMap result = new ConcurrentHashMap<>(); + + while (!(dataChunkQueue.isEmpty() && readerFuture.isDone())) { + ImportDataChunk dataChunk = dataChunkQueue.poll(100, TimeUnit.MILLISECONDS); + if (dataChunk != null) { + ImportDataChunkStatus status = processDataChunk(dataChunk, transactionBatchSize); + result.put(status.getDataChunkId(), status); + } + } + readerFuture.join(); + return result; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + CoreError.DATA_LOADER_DATA_CHUNK_PROCESS_FAILED.buildMessage(e.getMessage()), e); + } finally { + dataChunkExecutor.shutdown(); + try { + if (!dataChunkExecutor.awaitTermination(60, TimeUnit.SECONDS)) { + dataChunkExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + dataChunkExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + } + notifyAllDataChunksCompleted(); + } + } + + /** + * Reads and processes CSV data in chunks from the provided reader. + * + *
+ * <p>This method:
+ *
+ * <ul>
+ *   <li>Reads the CSV header (custom or from file)
+ *   <li>Validates each data row against the header
+ *   <li>Converts rows to JSON format
+ *   <li>Batches rows into data chunks
+ *   <li>Enqueues chunks for processing
+ * </ul>
+ * + * @param reader the BufferedReader containing CSV data + * @param dataChunkSize the number of rows to include in each chunk + * @param dataChunkQueue the queue where data chunks are placed for processing + * @throws RuntimeException if there are errors reading the file or if interrupted + */ + private void readDataChunks( + BufferedReader reader, int dataChunkSize, BlockingQueue dataChunkQueue) { + try { + String delimiter = + Optional.of(params.getImportOptions().getDelimiter()) + .map(c -> Character.toString(c).trim()) + .filter(s -> !s.isEmpty()) + .orElse(","); + + String header = + Optional.ofNullable(params.getImportOptions().getCustomHeaderRow()) + .orElseGet(() -> safeReadLine(reader)); + + String[] headerArray = header.split(delimiter); + List currentDataChunk = new ArrayList<>(); + String line; + int rowNumber = 1; + while ((line = reader.readLine()) != null) { + String[] dataArray = line.split(delimiter); + if (headerArray.length != dataArray.length) { + throw new IllegalArgumentException( + CoreError.DATA_LOADER_CSV_DATA_MISMATCH.buildMessage(line, header)); + } + JsonNode jsonNode = combineHeaderAndData(headerArray, dataArray); + if (jsonNode.isEmpty()) continue; + + currentDataChunk.add(new ImportRow(rowNumber++, jsonNode)); + if (currentDataChunk.size() == dataChunkSize) { + enqueueDataChunk(currentDataChunk, dataChunkQueue); + currentDataChunk = new ArrayList<>(); + } + } + if (!currentDataChunk.isEmpty()) enqueueDataChunk(currentDataChunk, dataChunkQueue); + } catch (IOException | InterruptedException e) { + throw new RuntimeException( + CoreError.DATA_LOADER_CSV_FILE_READ_FAILED.buildMessage(e.getMessage()), e); + } + } + + /** + * Adds a completed data chunk to the processing queue. + * + * @param dataChunk the list of ImportRows to be processed + * @param queue the queue where the chunk should be placed + * @throws InterruptedException if the thread is interrupted while waiting to add to the queue + */ + private void enqueueDataChunk(List dataChunk, BlockingQueue queue) + throws InterruptedException { + int dataChunkId = dataChunkIdCounter.getAndIncrement(); + queue.put(ImportDataChunk.builder().dataChunkId(dataChunkId).sourceData(dataChunk).build()); + } + + /** + * Safely reads a line from the BufferedReader, handling IOExceptions. + * + * @param reader the BufferedReader to read from + * @return the line read from the reader + * @throws UncheckedIOException if an IOException occurs while reading + */ + private String safeReadLine(BufferedReader reader) { + try { + return reader.readLine(); + } catch (IOException e) { + throw new UncheckedIOException( + CoreError.DATA_LOADER_CSV_FILE_HEADER_READ_FAILED.buildMessage(e.getMessage()), e); + } + } + + /** + * Combines header fields with data values to create a JSON object. 
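+ *
+ * <p>For example, the header {@code [id, name]} combined with the data {@code [1, Alice]}
+ * produces the JSON object {@code {"id":"1","name":"Alice"}}; note that every value is stored as
+ * a JSON string at this stage.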
+ * + * @param header array of header field names + * @param data array of data values corresponding to the header fields + * @return a JsonNode containing the combined header-value pairs + */ + private JsonNode combineHeaderAndData(String[] header, String[] data) { + ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); + for (int i = 0; i < header.length; i++) { + objectNode.put(header[i], data[i]); + } + return objectNode; + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/DefaultImportProcessorFactory.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/DefaultImportProcessorFactory.java new file mode 100644 index 0000000000..d40222d9a7 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/DefaultImportProcessorFactory.java @@ -0,0 +1,47 @@ +package com.scalar.db.dataloader.core.dataimport.processor; + +import com.scalar.db.common.error.CoreError; + +/** + * A factory class that creates appropriate ImportProcessor instances based on the input file + * format. This factory implements the ImportProcessorFactory interface and provides a default + * implementation for creating processors that handle different file formats (JSON, JSONL, CSV). + */ +public class DefaultImportProcessorFactory implements ImportProcessorFactory { + + /** + * Creates an appropriate ImportProcessor instance based on the file format specified in the + * import parameters. + * + * @param params ImportProcessorParams containing configuration and import options, including the + * file format + * @return An ImportProcessor instance configured for the specified file format + * @throws IllegalArgumentException if the specified file format is not supported + *
+ * <p>Supported file formats:
+ *
+ * <ul>
+ *   <li>JSONL - Creates a JsonLinesImportProcessor for JSON Lines format
+ *   <li>JSON - Creates a JsonImportProcessor for JSON format
+ *   <li>CSV - Creates a CsvImportProcessor for CSV format
+ * </ul>
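+ *
+ * <p>Usage sketch, assuming {@code params} has already been built with the desired file format:
+ *
+ * <pre>{@code
+ * ImportProcessorFactory factory = new DefaultImportProcessorFactory();
+ * ImportProcessor processor = factory.createImportProcessor(params);
+ * }</pre>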
+ */ + @Override + public ImportProcessor createImportProcessor(ImportProcessorParams params) { + ImportProcessor importProcessor; + switch (params.getImportOptions().getFileFormat()) { + case JSONL: + importProcessor = new JsonLinesImportProcessor(params); + break; + case JSON: + importProcessor = new JsonImportProcessor(params); + break; + case CSV: + importProcessor = new CsvImportProcessor(params); + break; + default: + throw new IllegalArgumentException( + CoreError.DATA_LOADER_FILE_FORMAT_NOT_SUPPORTED.buildMessage( + params.getImportOptions().getFileFormat().toString())); + } + return importProcessor; + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java new file mode 100644 index 0000000000..1a317a1a82 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java @@ -0,0 +1,453 @@ +package com.scalar.db.dataloader.core.dataimport.processor; + +import com.scalar.db.api.DistributedTransaction; +import com.scalar.db.dataloader.core.ScalarDBMode; +import com.scalar.db.dataloader.core.dataimport.ImportEventListener; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunk; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatusState; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportRow; +import com.scalar.db.dataloader.core.dataimport.task.ImportStorageTask; +import com.scalar.db.dataloader.core.dataimport.task.ImportTaskParams; +import com.scalar.db.dataloader.core.dataimport.task.ImportTransactionalTask; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResultStatus; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatch; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus; +import com.scalar.db.exception.transaction.TransactionException; +import java.io.BufferedReader; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.RequiredArgsConstructor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An abstract class that handles the processing of data imports into ScalarDB. This processor + * supports both transactional and non-transactional (storage) modes and provides event notification + * capabilities for monitoring the import process. + */ +@RequiredArgsConstructor +public abstract class ImportProcessor { + + final ImportProcessorParams params; + private static final Logger LOGGER = LoggerFactory.getLogger(ImportProcessor.class); + private final List listeners = new ArrayList<>(); + + /** + * Processes the source data from the given import file. + * + *
+ * <p>
This method reads data from the provided {@link BufferedReader}, processes it in chunks, and + * batches transactions according to the specified sizes. The processing can be done in either + * transactional or storage mode, depending on the configured {@link ScalarDBMode}. + * + * @param dataChunkSize the number of records to include in each data chunk for parallel + * processing + * @param transactionBatchSize the number of records to group together in a single transaction + * (only used in transaction mode) + * @param reader the {@link BufferedReader} used to read the source file + * @return a map of {@link ImportDataChunkStatus} objects indicating the processing status and + * results of each data chunk + */ + public abstract ConcurrentHashMap process( + int dataChunkSize, int transactionBatchSize, BufferedReader reader); + + /** + * Add import event listener to listener list + * + * @param listener import event listener + */ + public void addListener(ImportEventListener listener) { + listeners.add(listener); + } + + /** + * Remove import event listener from listener list + * + * @param listener import event listener + */ + public void removeListener(ImportEventListener listener) { + listeners.remove(listener); + } + + /** + * Notify once the task is completed + * + * @param result task result object + */ + protected void notifyStorageRecordCompleted(ImportTaskResult result) { + // Add data to summary, success logs with/without raw data + for (ImportEventListener listener : listeners) { + listener.onTaskComplete(result); + } + } + + /** + * Notify once the data chunk process is started + * + * @param status data chunk status object + */ + protected void notifyDataChunkStarted(ImportDataChunkStatus status) { + for (ImportEventListener listener : listeners) { + listener.onDataChunkStarted(status); + listener.addOrUpdateDataChunkStatus(status); + } + } + + /** + * Notify once the data chunk process is completed + * + * @param status data chunk status object + */ + protected void notifyDataChunkCompleted(ImportDataChunkStatus status) { + for (ImportEventListener listener : listeners) { + listener.onDataChunkCompleted(status); + listener.addOrUpdateDataChunkStatus(status); + } + } + + /** + * Notify once the import transaction batch is started + * + * @param batchStatus import transaction batch status object + */ + protected void notifyTransactionBatchStarted(ImportTransactionBatchStatus batchStatus) { + for (ImportEventListener listener : listeners) { + listener.onTransactionBatchStarted(batchStatus); + } + } + + /** + * Notify once the import transaction batch is completed + * + * @param batchResult import transaction batch result object + */ + protected void notifyTransactionBatchCompleted(ImportTransactionBatchResult batchResult) { + for (ImportEventListener listener : listeners) { + listener.onTransactionBatchCompleted(batchResult); + } + } + + /** Notify when all data chunks processes are completed */ + protected void notifyAllDataChunksCompleted() { + for (ImportEventListener listener : listeners) { + listener.onAllDataChunksCompleted(); + } + } + + /** + * Splits a data chunk into smaller transaction batches for processing. This method is used in + * transaction mode to group records together for atomic processing. 
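+ *
+ * <p>For example, a data chunk of 10 rows with a batch size of 4 is split into three batches of
+ * 4, 4, and 2 rows.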
+ * + * @param dataChunk the data chunk to split into batches + * @param batchSize the maximum number of records per transaction batch + * @return a list of {@link ImportTransactionBatch} objects representing the split batches + */ + private List splitIntoTransactionBatches( + ImportDataChunk dataChunk, int batchSize) { + List transactionBatches = new ArrayList<>(); + AtomicInteger transactionBatchIdCounter = new AtomicInteger(0); + + List importRows = dataChunk.getSourceData(); + for (int i = 0; i < importRows.size(); i += batchSize) { + int endIndex = Math.min(i + batchSize, importRows.size()); + List transactionBatchData = importRows.subList(i, endIndex); + int transactionBatchId = transactionBatchIdCounter.getAndIncrement(); + ImportTransactionBatch transactionBatch = + ImportTransactionBatch.builder() + .transactionBatchId(transactionBatchId) + .sourceData(transactionBatchData) + .build(); + transactionBatches.add(transactionBatch); + } + return transactionBatches; + } + + /** + * Processes a single transaction batch within a data chunk. Creates a new transaction, processes + * all records in the batch, and commits or aborts the transaction based on the success of all + * operations. + * + * @param dataChunk the parent data chunk containing this batch + * @param transactionBatch the batch of records to process in a single transaction + * @return an {@link ImportTransactionBatchResult} containing the processing results and any + * errors + */ + private ImportTransactionBatchResult processTransactionBatch( + ImportDataChunk dataChunk, ImportTransactionBatch transactionBatch) { + ImportTransactionBatchStatus status = + ImportTransactionBatchStatus.builder() + .dataChunkId(dataChunk.getDataChunkId()) + .transactionBatchId(transactionBatch.getTransactionBatchId()) + .build(); + notifyTransactionBatchStarted(status); + List importRecordResult = new ArrayList<>(); + boolean isSuccess; + String error = ""; + DistributedTransaction transaction = null; + try { + // Create the ScalarDB transaction + transaction = params.getDistributedTransactionManager().start(); + + // Loop over the transaction batch and process each record + for (ImportRow importRow : transactionBatch.getSourceData()) { + ImportTaskParams taskParams = + ImportTaskParams.builder() + .sourceRecord(importRow.getSourceData()) + .dataChunkId(dataChunk.getDataChunkId()) + .rowNumber(importRow.getRowNumber()) + .importOptions(params.getImportOptions()) + .tableColumnDataTypes(params.getTableColumnDataTypes()) + .tableMetadataByTableName(params.getTableMetadataByTableName()) + .dao(params.getDao()) + .build(); + importRecordResult.add(new ImportTransactionalTask(taskParams, transaction).execute()); + } + isSuccess = + importRecordResult.stream() + .allMatch( + importTaskResult -> + importTaskResult.getTargets().stream() + .allMatch( + targetResult -> + targetResult.getStatus().equals(ImportTargetResultStatus.SAVED))); + + // Check and Commit the transaction + if (isSuccess) { + transaction.commit(); + } else { + transaction.abort(); + error = "All transactions are aborted"; + } + + } catch (TransactionException e) { + isSuccess = false; + LOGGER.error(e.getMessage()); + try { + if (transaction != null) { + transaction.abort(); // Ensure transaction is aborted + } + } catch (TransactionException abortException) { + LOGGER.error( + "Failed to abort transaction: {}", abortException.getMessage(), abortException); + } + error = e.getMessage(); + } + ImportTransactionBatchResult importTransactionBatchResult = + 
ImportTransactionBatchResult.builder() + .transactionBatchId(transactionBatch.getTransactionBatchId()) + .success(isSuccess) + .dataChunkId(dataChunk.getDataChunkId()) + .records(importRecordResult) + .errors(Collections.singletonList(error)) + .build(); + notifyTransactionBatchCompleted(importTransactionBatchResult); + return importTransactionBatchResult; + } + + /** + * Processes a single record in storage mode (non-transactional). Each record is processed + * independently without transaction guarantees. + * + * @param dataChunk the parent data chunk containing this record + * @param importRow the record to process + * @return an {@link ImportTaskResult} containing the processing result for the record + */ + private ImportTaskResult processStorageRecord(ImportDataChunk dataChunk, ImportRow importRow) { + ImportTaskParams taskParams = + ImportTaskParams.builder() + .sourceRecord(importRow.getSourceData()) + .dataChunkId(dataChunk.getDataChunkId()) + .rowNumber(importRow.getRowNumber()) + .importOptions(params.getImportOptions()) + .tableColumnDataTypes(params.getTableColumnDataTypes()) + .tableMetadataByTableName(params.getTableMetadataByTableName()) + .dao(params.getDao()) + .build(); + ImportTaskResult importRecordResult = + new ImportStorageTask(taskParams, params.getDistributedStorage()).execute(); + + ImportTaskResult modifiedTaskResult = + ImportTaskResult.builder() + .rowNumber(importRecordResult.getRowNumber()) + .rawRecord(importRecordResult.getRawRecord()) + .targets(importRecordResult.getTargets()) + .dataChunkId(dataChunk.getDataChunkId()) + .build(); + notifyStorageRecordCompleted(modifiedTaskResult); + return modifiedTaskResult; + } + + /** + * Processes a complete data chunk using parallel execution. The processing mode (transactional or + * storage) is determined by the configured {@link ScalarDBMode}. + * + * @param dataChunk the data chunk to process + * @param transactionBatchSize the size of transaction batches (used only in transaction mode) + * @return an {@link ImportDataChunkStatus} containing the complete processing results and metrics + */ + protected ImportDataChunkStatus processDataChunk( + ImportDataChunk dataChunk, int transactionBatchSize) { + ImportDataChunkStatus status = + ImportDataChunkStatus.builder() + .dataChunkId(dataChunk.getDataChunkId()) + .startTime(Instant.now()) + .status(ImportDataChunkStatusState.IN_PROGRESS) + .build(); + notifyDataChunkStarted(status); + ImportDataChunkStatus importDataChunkStatus; + if (params.getScalarDBMode() == ScalarDBMode.TRANSACTION) { + importDataChunkStatus = processDataChunkWithTransactions(dataChunk, transactionBatchSize); + } else { + importDataChunkStatus = processDataChunkWithoutTransactions(dataChunk); + } + notifyDataChunkCompleted(importDataChunkStatus); + return importDataChunkStatus; + } + + /** + * Processes a data chunk using transaction mode with parallel batch processing. Multiple + * transaction batches are processed concurrently using a thread pool. 
+ * + * @param dataChunk the data chunk to process + * @param transactionBatchSize the number of records per transaction batch + * @return an {@link ImportDataChunkStatus} containing processing results and metrics + */ + private ImportDataChunkStatus processDataChunkWithTransactions( + ImportDataChunk dataChunk, int transactionBatchSize) { + Instant startTime = Instant.now(); + List transactionBatches = + splitIntoTransactionBatches(dataChunk, transactionBatchSize); + ExecutorService transactionBatchExecutor = + Executors.newFixedThreadPool(params.getImportOptions().getMaxThreads()); + List> transactionBatchFutures = new ArrayList<>(); + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger failureCount = new AtomicInteger(0); + try { + for (ImportTransactionBatch transactionBatch : transactionBatches) { + Future transactionBatchFuture = + transactionBatchExecutor.submit( + () -> processTransactionBatch(dataChunk, transactionBatch)); + transactionBatchFutures.add(transactionBatchFuture); + } + + waitForFuturesToComplete(transactionBatchFutures); + transactionBatchFutures.forEach( + batchResult -> { + try { + ImportTransactionBatchResult importTransactionBatchResult = + (ImportTransactionBatchResult) batchResult.get(); + importTransactionBatchResult + .getRecords() + .forEach( + batchRecords -> { + if (batchRecords.getTargets().stream() + .allMatch( + targetResult -> + targetResult + .getStatus() + .equals(ImportTargetResultStatus.SAVED))) { + successCount.incrementAndGet(); + } else { + failureCount.incrementAndGet(); + } + }); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + } finally { + transactionBatchExecutor.shutdown(); + } + Instant endTime = Instant.now(); + int totalDuration = (int) Duration.between(startTime, endTime).toMillis(); + return ImportDataChunkStatus.builder() + .dataChunkId(dataChunk.getDataChunkId()) + .failureCount(failureCount.get()) + .successCount(successCount.get()) + .totalRecords(dataChunk.getSourceData().size()) + .batchCount(transactionBatches.size()) + .status(ImportDataChunkStatusState.COMPLETE) + .startTime(startTime) + .endTime(endTime) + .totalDurationInMilliSeconds(totalDuration) + .build(); + } + + /** + * Processes a data chunk using storage mode with parallel record processing. Individual records + * are processed concurrently without transaction guarantees. 
+ * + * @param dataChunk the data chunk to process + * @return an {@link ImportDataChunkStatus} containing processing results and metrics + */ + private ImportDataChunkStatus processDataChunkWithoutTransactions(ImportDataChunk dataChunk) { + Instant startTime = Instant.now(); + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger failureCount = new AtomicInteger(0); + ExecutorService recordExecutor = + Executors.newFixedThreadPool(params.getImportOptions().getMaxThreads()); + List> recordFutures = new ArrayList<>(); + try { + for (ImportRow importRow : dataChunk.getSourceData()) { + Future recordFuture = + recordExecutor.submit(() -> processStorageRecord(dataChunk, importRow)); + recordFutures.add(recordFuture); + } + waitForFuturesToComplete(recordFutures); + recordFutures.forEach( + r -> { + try { + ImportTaskResult result = (ImportTaskResult) r.get(); + boolean allSaved = + result.getTargets().stream() + .allMatch(t -> t.getStatus().equals(ImportTargetResultStatus.SAVED)); + if (allSaved) successCount.incrementAndGet(); + else failureCount.incrementAndGet(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + } finally { + recordExecutor.shutdown(); + } + Instant endTime = Instant.now(); + int totalDuration = (int) Duration.between(startTime, endTime).toMillis(); + return ImportDataChunkStatus.builder() + .dataChunkId(dataChunk.getDataChunkId()) + .totalRecords(dataChunk.getSourceData().size()) + .successCount(successCount.get()) + .failureCount(failureCount.get()) + .startTime(startTime) + .endTime(endTime) + .totalDurationInMilliSeconds(totalDuration) + .status(ImportDataChunkStatusState.COMPLETE) + .build(); + } + + /** + * Waits for all futures in the provided list to complete. Any exceptions during execution are + * logged but not propagated. + * + * @param futures the list of {@link Future} objects to wait for + */ + private void waitForFuturesToComplete(List> futures) { + for (Future future : futures) { + try { + future.get(); + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } + } + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorFactory.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorFactory.java new file mode 100644 index 0000000000..a84e13de57 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorFactory.java @@ -0,0 +1,17 @@ +package com.scalar.db.dataloader.core.dataimport.processor; + +/** + * A factory interface for creating {@link ImportProcessor} instances. This factory follows the + * Factory design pattern to encapsulate the creation of specific import processor implementations. + */ +public interface ImportProcessorFactory { + + /** + * Creates a new instance of an {@link ImportProcessor}. 
+ * + * @param params The parameters required for configuring the import processor + * @return A new {@link ImportProcessor} instance configured with the provided parameters + * @throws IllegalArgumentException if the provided parameters are invalid + */ + ImportProcessor createImportProcessor(ImportProcessorParams params); +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorParams.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorParams.java new file mode 100644 index 0000000000..36b96f62d5 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorParams.java @@ -0,0 +1,43 @@ +package com.scalar.db.dataloader.core.dataimport.processor; + +import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.DistributedTransactionManager; +import com.scalar.db.api.TableMetadata; +import com.scalar.db.dataloader.core.ScalarDBMode; +import com.scalar.db.dataloader.core.dataimport.ImportOptions; +import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao; +import java.util.Map; +import lombok.Builder; +import lombok.Value; + +/** + * Parameters class for the import processor containing all necessary components for data import + * operations. + * + *
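+ *
+ * <p>Instances are created via the Lombok-generated builder, as in the sketch below, which
+ * mirrors the call in {@code ImportManager#startImport()} (variable names are placeholders):
+ *
+ * <pre>{@code
+ * ImportProcessorParams params =
+ *     ImportProcessorParams.builder()
+ *         .scalarDBMode(ScalarDBMode.TRANSACTION)
+ *         .importOptions(importOptions)
+ *         .tableMetadataByTableName(tableMetadata)
+ *         .tableColumnDataTypes(tableColumnDataTypes)
+ *         .dao(new ScalarDBDao())
+ *         .distributedStorage(storage)
+ *         .distributedTransactionManager(transactionManager)
+ *         .build();
+ * }</pre>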
+ * <p>
This class is immutable and uses the Builder pattern for construction. It encapsulates all + * required parameters and dependencies for processing data imports in ScalarDB. + */ +@Builder +@Value +public class ImportProcessorParams { + /** The operational mode of ScalarDB (transaction or storage mode). */ + ScalarDBMode scalarDBMode; + + /** Configuration options for the import operation. */ + ImportOptions importOptions; + + /** Mapping of table names to their corresponding metadata definitions. */ + Map tableMetadataByTableName; + + /** Data type information for table columns. */ + TableColumnDataTypes tableColumnDataTypes; + + /** Data Access Object for ScalarDB operations. */ + ScalarDBDao dao; + + /** Storage interface for non-transactional operations. */ + DistributedStorage distributedStorage; + + /** Transaction manager for handling transactional operations. */ + DistributedTransactionManager distributedTransactionManager; +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessor.java new file mode 100644 index 0000000000..733a5afa96 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessor.java @@ -0,0 +1,164 @@ +package com.scalar.db.dataloader.core.dataimport.processor; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.JsonNode; +import com.scalar.db.common.error.CoreError; +import com.scalar.db.dataloader.core.DataLoaderObjectMapper; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunk; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportRow; +import java.io.BufferedReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A processor for importing JSON data into the database. + * + *
+ * <p>
This processor handles JSON files that contain an array of JSON objects. Each object in the + * array represents a row to be imported into the database. The processor reads the JSON file, + * splits it into chunks of configurable size, and processes these chunks in parallel using multiple + * threads. + * + *
+ * <p>The processing is done in two main phases:
+ *
+ * <ul>
+ *   <li>Reading phase: The JSON file is read and split into chunks
+ *   <li>Processing phase: Each chunk is processed independently and imported into the database
+ * </ul>
+ *
+ * <p>
The processor uses a producer-consumer pattern where one thread reads the JSON file and + * produces data chunks, while a pool of worker threads consumes and processes these chunks. + */ +public class JsonImportProcessor extends ImportProcessor { + + private static final DataLoaderObjectMapper OBJECT_MAPPER = new DataLoaderObjectMapper(); + private static final AtomicInteger dataChunkIdCounter = new AtomicInteger(0); + + public JsonImportProcessor(ImportProcessorParams params) { + super(params); + } + + /** + * Processes the source data from the given import file. + * + *
+ * <p>
This method reads data from the provided {@link BufferedReader}, processes it in chunks, and + * batches transactions according to the specified sizes. The method returns a list of {@link + * ImportDataChunkStatus} objects, each representing the status of a processed data chunk. + * + * @param dataChunkSize the number of records to include in each data chunk + * @param transactionBatchSize the number of records to include in each transaction batch + * @param reader the {@link BufferedReader} used to read the source file + * @return a map of {@link ImportDataChunkStatus} objects indicating the processing status of each + * data chunk + */ + @Override + public ConcurrentHashMap process( + int dataChunkSize, int transactionBatchSize, BufferedReader reader) { + ExecutorService dataChunkExecutor = Executors.newSingleThreadExecutor(); + BlockingQueue dataChunkQueue = + new LinkedBlockingQueue<>(params.getImportOptions().getDataChunkQueueSize()); + + try { + CompletableFuture readerFuture = + CompletableFuture.runAsync( + () -> readDataChunks(reader, dataChunkSize, dataChunkQueue), dataChunkExecutor); + + ConcurrentHashMap result = new ConcurrentHashMap<>(); + + while (!(dataChunkQueue.isEmpty() && readerFuture.isDone())) { + ImportDataChunk dataChunk = dataChunkQueue.poll(100, TimeUnit.MILLISECONDS); + if (dataChunk != null) { + ImportDataChunkStatus status = processDataChunk(dataChunk, transactionBatchSize); + result.put(status.getDataChunkId(), status); + } + } + + readerFuture.join(); + return result; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + CoreError.DATA_LOADER_DATA_CHUNK_PROCESS_FAILED.buildMessage(e.getMessage()), e); + } finally { + dataChunkExecutor.shutdown(); + try { + if (!dataChunkExecutor.awaitTermination(60, TimeUnit.SECONDS)) { + dataChunkExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + dataChunkExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + } + notifyAllDataChunksCompleted(); + } + } + + /** + * Reads data chunks from the JSON file and adds them to the processing queue. + * + *
+ * <p>
This method reads the JSON file as an array of objects, creating data chunks of the + * specified size. Each chunk is then added to the queue for processing. The method expects the + * JSON file to start with an array token '[' and end with ']'. + * + *
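+ *
+ * <p>Example of the expected input shape (column names are hypothetical):
+ *
+ * <pre>{@code
+ * [
+ *   {"id": 1, "name": "Alice"},
+ *   {"id": 2, "name": "Bob"}
+ * ]
+ * }</pre>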
+ * <p>
Empty or null JSON nodes are skipped during processing. + * + * @param reader the BufferedReader containing the JSON data + * @param dataChunkSize the maximum number of records to include in each chunk + * @param dataChunkQueue the queue where data chunks are placed for processing + * @throws RuntimeException if there is an error reading the JSON file or if the thread is + * interrupted + */ + private void readDataChunks( + BufferedReader reader, int dataChunkSize, BlockingQueue dataChunkQueue) { + try (JsonParser jsonParser = new JsonFactory().createParser(reader)) { + if (jsonParser.nextToken() != JsonToken.START_ARRAY) { + throw new IOException(CoreError.DATA_LOADER_JSON_CONTENT_START_ERROR.buildMessage()); + } + + List currentDataChunk = new ArrayList<>(); + int rowNumber = 1; + while (jsonParser.nextToken() != JsonToken.END_ARRAY) { + JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonParser); + if (jsonNode == null || jsonNode.isEmpty()) continue; + + currentDataChunk.add(new ImportRow(rowNumber++, jsonNode)); + if (currentDataChunk.size() == dataChunkSize) { + enqueueDataChunk(currentDataChunk, dataChunkQueue); + currentDataChunk = new ArrayList<>(); + } + } + if (!currentDataChunk.isEmpty()) enqueueDataChunk(currentDataChunk, dataChunkQueue); + } catch (IOException | InterruptedException e) { + throw new RuntimeException( + CoreError.DATA_LOADER_JSON_FILE_READ_FAILED.buildMessage(e.getMessage()), e); + } + } + + /** + * Adds a data chunk to the processing queue. + * + *
+ * <p>
This method creates a new ImportDataChunk with a unique ID and the provided data, then adds + * it to the processing queue. The ID is generated using an atomic counter to ensure thread + * safety. + * + * @param dataChunk the list of ImportRow objects to be processed + * @param queue the queue where the data chunk will be added + * @throws InterruptedException if the thread is interrupted while waiting to add to the queue + */ + private void enqueueDataChunk(List dataChunk, BlockingQueue queue) + throws InterruptedException { + int dataChunkId = dataChunkIdCounter.getAndIncrement(); + queue.put(ImportDataChunk.builder().dataChunkId(dataChunkId).sourceData(dataChunk).build()); + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessor.java new file mode 100644 index 0000000000..a121a106a5 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessor.java @@ -0,0 +1,156 @@ +package com.scalar.db.dataloader.core.dataimport.processor; + +import com.fasterxml.jackson.databind.JsonNode; +import com.scalar.db.common.error.CoreError; +import com.scalar.db.dataloader.core.DataLoaderObjectMapper; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunk; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportRow; +import java.io.BufferedReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A processor for importing data from JSON Lines (JSONL) formatted files. + * + *
+ * <p>
This processor reads data from files where each line is a valid JSON object. It processes the + * input file in chunks, allowing for parallel processing and batched transactions for efficient + * data loading. + * + *
+ * <p>The processor uses a multi-threaded approach with:
+ *
+ * <ul>
+ *   <li>A dedicated thread for reading data chunks from the input file
+ *   <li>Multiple threads for processing data chunks in parallel
+ *   <li>A queue-based system to manage data chunks between reader and processor threads
+ * </ul>
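+ *
+ * <p>Example input, one JSON object per line (column names are hypothetical):
+ *
+ * <pre>{@code
+ * {"id": 1, "name": "Alice"}
+ * {"id": 2, "name": "Bob"}
+ * }</pre>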
+ */ +public class JsonLinesImportProcessor extends ImportProcessor { + + private static final DataLoaderObjectMapper OBJECT_MAPPER = new DataLoaderObjectMapper(); + private static final AtomicInteger dataChunkIdCounter = new AtomicInteger(0); + + /** + * Creates a new JsonLinesImportProcessor with the specified parameters. + * + * @param params configuration parameters for the import processor + */ + public JsonLinesImportProcessor(ImportProcessorParams params) { + super(params); + } + + /** + * Processes the source data from the given import file. + * + *
+ * <p>
This method reads data from the provided {@link BufferedReader}, processes it in chunks, and + * batches transactions according to the specified sizes. The method returns a list of {@link + * ImportDataChunkStatus} objects, each representing the status of a processed data chunk. + * + * @param dataChunkSize the number of records to include in each data chunk + * @param transactionBatchSize the number of records to include in each transaction batch + * @param reader the {@link BufferedReader} used to read the source file + * @return a map of {@link ImportDataChunkStatus} objects indicating the processing status of each + * data chunk + */ + @Override + public ConcurrentHashMap process( + int dataChunkSize, int transactionBatchSize, BufferedReader reader) { + ExecutorService dataChunkExecutor = Executors.newSingleThreadExecutor(); + BlockingQueue dataChunkQueue = + new LinkedBlockingQueue<>(params.getImportOptions().getDataChunkQueueSize()); + + try { + CompletableFuture readerFuture = + CompletableFuture.runAsync( + () -> readDataChunks(reader, dataChunkSize, dataChunkQueue), dataChunkExecutor); + + ConcurrentHashMap result = new ConcurrentHashMap<>(); + + while (!(dataChunkQueue.isEmpty() && readerFuture.isDone())) { + ImportDataChunk dataChunk = dataChunkQueue.poll(100, TimeUnit.MILLISECONDS); + if (dataChunk != null) { + ImportDataChunkStatus status = processDataChunk(dataChunk, transactionBatchSize); + result.put(status.getDataChunkId(), status); + } + } + + readerFuture.join(); + return result; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + CoreError.DATA_LOADER_DATA_CHUNK_PROCESS_FAILED.buildMessage(e.getMessage()), e); + } finally { + dataChunkExecutor.shutdown(); + try { + if (!dataChunkExecutor.awaitTermination(60, TimeUnit.SECONDS)) { + dataChunkExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + dataChunkExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + } + notifyAllDataChunksCompleted(); + } + } + + /** + * Reads data from the input file and creates data chunks for processing. + * + *
+ * <p>
This method reads the input file line by line, parsing each line as a JSON object. It + * accumulates rows until reaching the specified chunk size, then enqueues the chunk for + * processing. Empty lines or invalid JSON objects are skipped. + * + * @param reader the BufferedReader for reading the input file + * @param dataChunkSize the maximum number of rows to include in each data chunk + * @param dataChunkQueue the queue where data chunks are placed for processing + * @throws RuntimeException if there is an error reading the file or if the thread is interrupted + */ + private void readDataChunks( + BufferedReader reader, int dataChunkSize, BlockingQueue dataChunkQueue) { + try { + List currentDataChunk = new ArrayList<>(); + int rowNumber = 1; + String line; + while ((line = reader.readLine()) != null) { + JsonNode jsonNode = OBJECT_MAPPER.readTree(line); + if (jsonNode == null || jsonNode.isEmpty()) continue; + + currentDataChunk.add(new ImportRow(rowNumber++, jsonNode)); + if (currentDataChunk.size() == dataChunkSize) { + enqueueDataChunk(currentDataChunk, dataChunkQueue); + currentDataChunk = new ArrayList<>(); + } + } + if (!currentDataChunk.isEmpty()) enqueueDataChunk(currentDataChunk, dataChunkQueue); + } catch (IOException | InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + CoreError.DATA_LOADER_JSONLINES_FILE_READ_FAILED.buildMessage(e.getMessage()), e); + } + } + + /** + * Enqueues a data chunk for processing. + * + *
+ * <p>
Creates a new ImportDataChunk with a unique ID and adds it to the processing queue. + * + * @param dataChunk the list of ImportRows to be processed + * @param queue the queue where the data chunk should be placed + * @throws InterruptedException if the thread is interrupted while waiting to add to the queue + */ + private void enqueueDataChunk(List dataChunk, BlockingQueue queue) + throws InterruptedException { + int dataChunkId = dataChunkIdCounter.getAndIncrement(); + queue.put(ImportDataChunk.builder().dataChunkId(dataChunkId).sourceData(dataChunk).build()); + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/TableColumnDataTypes.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/TableColumnDataTypes.java new file mode 100644 index 0000000000..b9684ee64a --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/TableColumnDataTypes.java @@ -0,0 +1,95 @@ +package com.scalar.db.dataloader.core.dataimport.processor; + +import com.scalar.db.io.DataType; +import java.util.HashMap; +import java.util.Map; + +/** + * A class that maintains a mapping of column data types for database tables. + * + *
This class provides functionality to store and retrieve data type information for table + * columns in a database schema. It uses a nested map structure where the outer map keys are table + * names and the inner map keys are column names. + * + *
Example usage: + * + *
+ * <pre>{@code
+ * TableColumnDataTypes types = new TableColumnDataTypes();
+ *
+ * // Add column data types for a table
+ * types.addColumnDataType("users", "id", DataType.INT);
+ * types.addColumnDataType("users", "name", DataType.TEXT);
+ *
+ * // Retrieve data type for a specific column
+ * DataType idType = types.getDataType("users", "id"); // Returns DataType.INT
+ *
+ * // Get all column data types for a table
+ * Map<String, DataType> userColumns = types.getColumnDataTypes("users");
+ * }</pre>
+ */ +public class TableColumnDataTypes { + private final Map> dataTypesByColumnsByTable; + + /** + * Constructs a new {@code TableColumnDataTypes} instance with an empty mapping. The internal + * structure is initialized as an empty HashMap that will store table names as keys and + * column-to-datatype mappings as values. + */ + public TableColumnDataTypes() { + this.dataTypesByColumnsByTable = new HashMap<>(); + } + + /** + * Adds a data type for a specific column in a given table. + * + *
+ * <p>If the table doesn't exist in the mapping, a new entry is created automatically. If the
+ * column already exists for the specified table, its data type will be updated with the new
+ * value.
+ *
+ * @param tableName the name of the table
+ * @param columnName the name of the column
+ * @param dataType the data type associated with the column
+ * @throws NullPointerException if any of the parameters is null
+ */
+ public void addColumnDataType(String tableName, String columnName, DataType dataType) {
+   dataTypesByColumnsByTable
+       .computeIfAbsent(tableName, key -> new HashMap<>())
+       .put(columnName, dataType);
+ }
+
+ /**
+ * Retrieves the data type of a specific column in a given table.
+ *
+ *
+ * <p>This method performs a lookup in the internal mapping to find the data type associated with
+ * the specified table and column combination.
+ *
+ * @param tableName the name of the table
+ * @param columnName the name of the column
+ * @return the {@link DataType} of the column, or {@code null} if either the table or the column
+ *     is not found in the mapping
+ * @throws NullPointerException if any of the parameters is null
+ */
+ public DataType getDataType(String tableName, String columnName) {
+   Map<String, DataType> columnDataTypes = dataTypesByColumnsByTable.get(tableName);
+   if (columnDataTypes != null) {
+     return columnDataTypes.get(columnName);
+   }
+   return null;
+ }
+
+ /**
+ * Retrieves all column data types for a given table.
+ *
+ *
+ * <p>Returns a map containing all columns and their corresponding data types for the specified
+ * table. The returned map is a direct reference to the internal map, so modifications to it will
+ * affect the internal state.
+ *
+ * @param tableName the name of the table
+ * @return a {@link Map} of column names to their respective {@link DataType}s, or {@code null} if
+ *     the table does not exist in the mapping
+ * @throws NullPointerException if tableName is null
+ */
+ public Map<String, DataType> getColumnDataTypes(String tableName) {
+   return dataTypesByColumnsByTable.get(tableName);
+ }
+}
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportStorageTask.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportStorageTask.java
new file mode 100644
index 0000000000..98d982cac0
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportStorageTask.java
@@ -0,0 +1,93 @@
+package com.scalar.db.dataloader.core.dataimport.task;
+
+import com.scalar.db.api.DistributedStorage;
+import com.scalar.db.api.Result;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDaoException;
+import com.scalar.db.io.Column;
+import com.scalar.db.io.Key;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * An import task that interacts with a {@link DistributedStorage} for data retrieval and storage
+ * operations.
+ *
+ *
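Since getColumnDataTypes in the class above returns the internal map itself rather than a copy, a caller that needs isolation from later mutations could snapshot it first; a small sketch, assuming a populated `types` instance as in the class-level example (java.util.Collections and java.util.HashMap assumed imported):

// Illustrative only: defensive copy of the live internal map.
Map<String, DataType> live = types.getColumnDataTypes("users");
Map<String, DataType> snapshot =
    (live == null) ? Collections.emptyMap() : new HashMap<>(live);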
This class extends {@link ImportTask} and provides concrete implementations for fetching and + * storing records using a {@link DistributedStorage} instance. It acts as a bridge between the + * import process and the underlying distributed storage system. + * + *
The task handles both read and write operations: + * + *
  • Reading existing records using partition and clustering keys + *
  • Storing new or updated records with their associated columns + *
+ * + *
All storage operations are performed through the provided {@link DistributedStorage} instance, + * which must be properly initialized before creating this task. + */ +public class ImportStorageTask extends ImportTask { + + private final DistributedStorage storage; + + /** + * Constructs an {@code ImportStorageTask} with the specified parameters and storage. + * + * @param params the import task parameters containing configuration and DAO objects + * @param storage the distributed storage instance to be used for data operations + * @throws NullPointerException if either params or storage is null + */ + public ImportStorageTask(ImportTaskParams params, DistributedStorage storage) { + super(params); + this.storage = storage; + } + + /** + * Retrieves a data record from the distributed storage using the specified keys. + * + *
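For orientation, here is one plausible way a caller could wire this task up. This is a sketch under assumptions (StorageFactory as the usual ScalarDB entry point, a prepared `params` object, and a hypothetical backend configuration); none of this wiring appears in the diff:

Properties props = new Properties();
props.setProperty("scalar.db.storage", "jdbc"); // hypothetical backend settings
DistributedStorage storage = StorageFactory.create(props).getStorage();
try {
  ImportTaskResult result = new ImportStorageTask(params, storage).execute();
} finally {
  storage.close();
}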
+ * <p>This method attempts to fetch a single record from the specified table using both partition
+ * and clustering keys. The operation is performed through the configured DAO using the associated
+ * storage instance.
+ *
+ * @param namespace the namespace of the table to query
+ * @param tableName the name of the table to query
+ * @param partitionKey the partition key identifying the record's partition
+ * @param clusteringKey the clustering key for further record identification within the partition
+ * @return an {@link Optional} containing the {@link Result} if the record exists, otherwise an
+ *     empty {@link Optional}
+ * @throws ScalarDBDaoException if an error occurs during the retrieval operation, such as
+ *     connection issues or invalid table/namespace
+ */
+ @Override
+ protected Optional<Result> getDataRecord(
+     String namespace, String tableName, Key partitionKey, Key clusteringKey)
+     throws ScalarDBDaoException {
+   return params.getDao().get(namespace, tableName, partitionKey, clusteringKey, this.storage);
+ }
+
+ /**
+ * Saves a record into the distributed storage with the specified keys and columns.
+ *
+ *
+ * <p>This method writes or updates a record in the specified table using the provided keys and
+ * column values. The operation is performed through the configured DAO using the associated
+ * storage instance.
+ *
+ * @param namespace the namespace of the target table
+ * @param tableName the name of the target table
+ * @param partitionKey the partition key determining where the record will be stored
+ * @param clusteringKey the clustering key for organizing records within the partition
+ * @param columns the list of columns containing the record's data to be saved
+ * @throws ScalarDBDaoException if an error occurs during the save operation, such as connection
+ *     issues, invalid data types, or constraint violations
+ */
+ @Override
+ protected void saveRecord(
+     String namespace,
+     String tableName,
+     Key partitionKey,
+     Key clusteringKey,
+     List<Column<?>> columns)
+     throws ScalarDBDaoException {
+   params.getDao().put(namespace, tableName, partitionKey, clusteringKey, columns, this.storage);
+ }
+}
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTask.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTask.java
new file mode 100644
index 0000000000..3be177a00a
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTask.java
@@ -0,0 +1,468 @@
+package com.scalar.db.dataloader.core.dataimport.task;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.scalar.db.api.Result;
+import com.scalar.db.api.TableMetadata;
+import com.scalar.db.common.error.CoreError;
+import com.scalar.db.dataloader.core.dataimport.ImportMode;
+import com.scalar.db.dataloader.core.dataimport.ImportOptions;
+import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFile;
+import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTable;
+import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTableFieldMapping;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDaoException;
+import com.scalar.db.dataloader.core.dataimport.processor.TableColumnDataTypes;
+import com.scalar.db.dataloader.core.dataimport.task.mapping.ImportDataMapping;
+import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResult;
+import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResultStatus;
+import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult;
+import com.scalar.db.dataloader.core.dataimport.task.validation.ImportSourceRecordValidationResult;
+import com.scalar.db.dataloader.core.dataimport.task.validation.ImportSourceRecordValidator;
+import com.scalar.db.dataloader.core.exception.Base64Exception;
+import com.scalar.db.dataloader.core.exception.ColumnParsingException;
+import com.scalar.db.dataloader.core.util.ColumnUtils;
+import com.scalar.db.dataloader.core.util.KeyUtils;
+import com.scalar.db.dataloader.core.util.TableMetadataUtil;
+import com.scalar.db.io.Column;
+import com.scalar.db.io.DataType;
+import com.scalar.db.io.Key;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * Abstract base class for handling data import tasks into ScalarDB tables. This class provides
+ * functionality to import data into single or multiple tables based on the provided import options
+ * and control file configurations.
+ */ +@RequiredArgsConstructor +public abstract class ImportTask { + + protected final ImportTaskParams params; + + /** + * Executes the import task by importing data into one or more database tables. If a control file + * is specified in the import options, performs a multi-table import. Otherwise, performs a single + * table import. + * + * @return ImportTaskResult containing the results of the import operation including + * success/failure status and any error messages for each target table + */ + public ImportTaskResult execute() { + + ObjectNode mutableSourceRecord = params.getSourceRecord().deepCopy(); + ImportOptions importOptions = params.getImportOptions(); + + // Single table import + if (importOptions.getControlFile() == null) { + String tableLookupKey = + TableMetadataUtil.getTableLookupKey( + importOptions.getNamespace(), importOptions.getTableName()); + ImportTargetResult singleTargetResult = + importIntoSingleTable( + importOptions.getNamespace(), + importOptions.getTableName(), + params.getTableMetadataByTableName().get(tableLookupKey), + params.getTableColumnDataTypes().getColumnDataTypes(tableLookupKey), + null, + mutableSourceRecord); + // Add the single target result to the list of targets and return the result + return ImportTaskResult.builder() + .rawRecord(params.getSourceRecord()) + .rowNumber(params.getRowNumber()) + .targets(Collections.singletonList(singleTargetResult)) + .build(); + } + + // Multi-table import + List multiTargetResults = + startMultiTableImportProcess( + importOptions.getControlFile(), + params.getTableMetadataByTableName(), + params.getTableColumnDataTypes(), + mutableSourceRecord); + + return ImportTaskResult.builder() + .targets(multiTargetResults) + .rawRecord(params.getSourceRecord()) + .rowNumber(params.getRowNumber()) + .build(); + } + + /** + * Processes multi-table import based on the control file configuration. For each table specified + * in the control file, validates the source data and performs the import operation. 
+ * + * @param controlFile control file which maps source data columns to target table columns + * @param tableMetadataByTableName map of table metadata indexed by table name + * @param tableColumnDataTypes map of column data types indexed by table name + * @param mutableSourceRecord source record data that can be modified during import + * @return List of ImportTargetResult objects containing the results for each table import + */ + private List startMultiTableImportProcess( + ControlFile controlFile, + Map tableMetadataByTableName, + TableColumnDataTypes tableColumnDataTypes, + ObjectNode mutableSourceRecord) { + + List targetResults = new ArrayList<>(); + + // Import for every table mapping specified in the control file + for (ControlFileTable controlFileTable : controlFile.getTables()) { + for (ControlFileTableFieldMapping mapping : controlFileTable.getMappings()) { + if (!mutableSourceRecord.has(mapping.getSourceField()) + && !mutableSourceRecord.has(mapping.getTargetColumn())) { + String errorMessage = + CoreError.DATA_LOADER_MISSING_SOURCE_FIELD.buildMessage( + mapping.getSourceField(), controlFileTable.getTable()); + + ImportTargetResult targetResult = + ImportTargetResult.builder() + .namespace(controlFileTable.getNamespace()) + .tableName(controlFileTable.getTable()) + .errors(Collections.singletonList(errorMessage)) + .status(ImportTargetResultStatus.VALIDATION_FAILED) + .build(); + return Collections.singletonList(targetResult); + } + } + + // Import into a single table + String tableLookupKey = TableMetadataUtil.getTableLookupKey(controlFileTable); + TableMetadata tableMetadata = tableMetadataByTableName.get(tableLookupKey); + Map dataTypesByColumns = + tableColumnDataTypes.getColumnDataTypes(tableLookupKey); + // Copied data to an object node data was overwritten by following operations and data check + // fails when same object is referenced again in logic before + ObjectNode copyNode = mutableSourceRecord.deepCopy(); + ImportTargetResult result = + importIntoSingleTable( + controlFileTable.getNamespace(), + controlFileTable.getTable(), + tableMetadata, + dataTypesByColumns, + controlFileTable, + copyNode); + targetResults.add(result); + } + return targetResults; + } + + /** + * Imports data into a single table with validation and error handling. The method performs the + * following steps: 1. Validates table metadata and source record 2. Creates partition and + * clustering keys 3. Determines whether to insert or update based on existing data 4. 
Applies the + * import operation according to specified import mode + * + * @param namespace database namespace name + * @param table target table name + * @param tableMetadata metadata describing the table structure + * @param dataTypeByColumnName map of column names to their data types + * @param controlFileTable optional control file table configuration for column mapping + * @param mutableSourceRecord source record to be imported + * @return ImportTargetResult containing the result of the import operation + */ + private ImportTargetResult importIntoSingleTable( + String namespace, + String table, + TableMetadata tableMetadata, + Map dataTypeByColumnName, + ControlFileTable controlFileTable, + ObjectNode mutableSourceRecord) { + + ImportOptions importOptions = params.getImportOptions(); + + if (dataTypeByColumnName == null || tableMetadata == null) { + return ImportTargetResult.builder() + .namespace(namespace) + .tableName(table) + .status(ImportTargetResultStatus.VALIDATION_FAILED) + .errors( + Collections.singletonList( + CoreError.DATA_LOADER_TABLE_METADATA_MISSING.buildMessage())) + .build(); + } + + LinkedHashSet partitionKeyNames = tableMetadata.getPartitionKeyNames(); + LinkedHashSet clusteringKeyNames = tableMetadata.getClusteringKeyNames(); + LinkedHashSet columnNames = tableMetadata.getColumnNames(); + + applyDataMapping(controlFileTable, mutableSourceRecord); + + boolean checkForMissingColumns = shouldCheckForMissingColumns(importOptions); + + ImportSourceRecordValidationResult validationResult = + validateSourceRecord( + partitionKeyNames, + clusteringKeyNames, + columnNames, + mutableSourceRecord, + checkForMissingColumns, + tableMetadata); + + if (!validationResult.isValid()) { + return ImportTargetResult.builder() + .namespace(namespace) + .tableName(table) + .status(ImportTargetResultStatus.VALIDATION_FAILED) + .errors(validationResult.getErrorMessages()) + .build(); + } + + Optional optionalPartitionKey = + KeyUtils.createPartitionKeyFromSource( + partitionKeyNames, dataTypeByColumnName, mutableSourceRecord); + if (!optionalPartitionKey.isPresent()) { + return ImportTargetResult.builder() + .namespace(namespace) + .tableName(table) + .status(ImportTargetResultStatus.VALIDATION_FAILED) + .errors( + Collections.singletonList( + CoreError.DATA_LOADER_COULD_NOT_FIND_PARTITION_KEY.buildMessage())) + .build(); + } + Optional optionalClusteringKey = Optional.empty(); + if (!clusteringKeyNames.isEmpty()) { + optionalClusteringKey = + KeyUtils.createClusteringKeyFromSource( + clusteringKeyNames, dataTypeByColumnName, mutableSourceRecord); + if (!optionalClusteringKey.isPresent()) { + return ImportTargetResult.builder() + .namespace(namespace) + .tableName(table) + .status(ImportTargetResultStatus.VALIDATION_FAILED) + .errors( + Collections.singletonList( + CoreError.DATA_LOADER_COULD_NOT_FIND_CLUSTERING_KEY.buildMessage())) + .build(); + } + } + + Optional optionalScalarDBResult; + + try { + optionalScalarDBResult = + getDataRecord( + namespace, table, optionalPartitionKey.get(), optionalClusteringKey.orElse(null)); + } catch (ScalarDBDaoException e) { + return ImportTargetResult.builder() + .namespace(namespace) + .tableName(table) + .status(ImportTargetResultStatus.RETRIEVAL_FAILED) + .errors(Collections.singletonList(e.getMessage())) + .build(); + } + ImportTaskAction importAction = + optionalScalarDBResult.isPresent() ? 
ImportTaskAction.UPDATE : ImportTaskAction.INSERT; + + if (importAction == ImportTaskAction.INSERT + && shouldRevalidateMissingColumns(importOptions, checkForMissingColumns)) { + ImportSourceRecordValidationResult validationResultForMissingColumns = + new ImportSourceRecordValidationResult(); + ImportSourceRecordValidator.checkMissingColumns( + mutableSourceRecord, columnNames, validationResultForMissingColumns, tableMetadata); + if (!validationResultForMissingColumns.isValid()) { + return ImportTargetResult.builder() + .namespace(namespace) + .tableName(table) + .status(ImportTargetResultStatus.MISSING_COLUMNS) + .errors( + Collections.singletonList( + CoreError.DATA_LOADER_UPSERT_INSERT_MISSING_COLUMNS.buildMessage())) + .build(); + } + } + + if (shouldFailForExistingData(importAction, importOptions)) { + return ImportTargetResult.builder() + .namespace(namespace) + .tableName(table) + .importedRecord(mutableSourceRecord) + .importAction(importAction) + .status(ImportTargetResultStatus.DATA_ALREADY_EXISTS) + .errors( + Collections.singletonList(CoreError.DATA_LOADER_DATA_ALREADY_EXISTS.buildMessage())) + .build(); + } + + if (shouldFailForMissingData(importAction, importOptions)) { + return ImportTargetResult.builder() + .namespace(namespace) + .tableName(table) + .importedRecord(mutableSourceRecord) + .importAction(importAction) + .status(ImportTargetResultStatus.DATA_NOT_FOUND) + .errors(Collections.singletonList(CoreError.DATA_LOADER_DATA_NOT_FOUND.buildMessage())) + .build(); + } + + List> columns; + + try { + columns = + ColumnUtils.getColumnsFromResult( + optionalScalarDBResult.orElse(null), + mutableSourceRecord, + importOptions.isIgnoreNullValues(), + tableMetadata); + } catch (Base64Exception | ColumnParsingException e) { + return ImportTargetResult.builder() + .namespace(namespace) + .tableName(table) + .status(ImportTargetResultStatus.VALIDATION_FAILED) + .errors(Collections.singletonList(e.getMessage())) + .build(); + } + + // Save the record + try { + saveRecord( + namespace, + table, + optionalPartitionKey.get(), + optionalClusteringKey.orElse(null), + columns); + + return ImportTargetResult.builder() + .namespace(namespace) + .tableName(table) + .importAction(importAction) + .importedRecord(mutableSourceRecord) + .status(ImportTargetResultStatus.SAVED) + .build(); + + } catch (ScalarDBDaoException e) { + return ImportTargetResult.builder() + .namespace(namespace) + .tableName(table) + .importAction(importAction) + .status(ImportTargetResultStatus.SAVE_FAILED) + .errors(Collections.singletonList(e.getMessage())) + .build(); + } + } + + /** + * Applies data mapping to the given source record based on the specified control file table. + * + * @param controlFileTable the control file table containing column mappings + * @param mutableSourceRecord the source record to be modified based on the mappings + */ + private void applyDataMapping(ControlFileTable controlFileTable, ObjectNode mutableSourceRecord) { + if (controlFileTable != null) { + ImportDataMapping.apply(mutableSourceRecord, controlFileTable); + } + } + + /** + * Determines whether missing columns should be checked based on import options. 
+ * + * @param importOptions the import options to evaluate + * @return {@code true} if missing columns should be checked, otherwise {@code false} + */ + private boolean shouldCheckForMissingColumns(ImportOptions importOptions) { + return importOptions.getImportMode() == ImportMode.INSERT + || importOptions.isRequireAllColumns(); + } + + /** + * Validates a source record against the given table metadata and constraints. + * + * @param partitionKeyNames the set of partition key names + * @param clusteringKeyNames the set of clustering key names + * @param columnNames the set of expected column names + * @param mutableSourceRecord the source record to be validated + * @param checkForMissingColumns whether to check for missing columns + * @param tableMetadata the table metadata containing schema details + * @return the validation result containing any validation errors or success status + */ + private ImportSourceRecordValidationResult validateSourceRecord( + LinkedHashSet partitionKeyNames, + LinkedHashSet clusteringKeyNames, + LinkedHashSet columnNames, + ObjectNode mutableSourceRecord, + boolean checkForMissingColumns, + TableMetadata tableMetadata) { + return ImportSourceRecordValidator.validateSourceRecord( + partitionKeyNames, + clusteringKeyNames, + columnNames, + mutableSourceRecord, + checkForMissingColumns, + tableMetadata); + } + + /** + * Determines whether missing columns should be revalidated when performing an upsert operation. + * + * @param importOptions the import options to evaluate + * @param checkForMissingColumns whether missing columns were initially checked + * @return {@code true} if missing columns should be revalidated, otherwise {@code false} + */ + private boolean shouldRevalidateMissingColumns( + ImportOptions importOptions, boolean checkForMissingColumns) { + return !checkForMissingColumns && importOptions.getImportMode() == ImportMode.UPSERT; + } + + /** + * Determines whether the operation should fail if data already exists. + * + * @param importAction the action being performed (e.g., INSERT, UPDATE) + * @param importOptions the import options specifying behavior + * @return {@code true} if the operation should fail for existing data, otherwise {@code false} + */ + private boolean shouldFailForExistingData( + ImportTaskAction importAction, ImportOptions importOptions) { + return importAction == ImportTaskAction.UPDATE + && importOptions.getImportMode() == ImportMode.INSERT; + } + + /** + * Determines whether the operation should fail if the expected data is missing. + * + * @param importAction the action being performed (e.g., INSERT, UPDATE) + * @param importOptions the import options specifying behavior + * @return {@code true} if the operation should fail for missing data, otherwise {@code false} + */ + private boolean shouldFailForMissingData( + ImportTaskAction importAction, ImportOptions importOptions) { + return importAction == ImportTaskAction.INSERT + && importOptions.getImportMode() == ImportMode.UPDATE; + } + + /** + * Retrieves an existing record from the database if it exists. 
+ * + * @param namespace the database namespace + * @param tableName the target table name + * @param partitionKey the partition key for the record + * @param clusteringKey the clustering key for the record (can be null) + * @return Optional containing the Result if found, empty if not found + * @throws ScalarDBDaoException if there is an error accessing the database + */ + protected abstract Optional getDataRecord( + String namespace, String tableName, Key partitionKey, Key clusteringKey) + throws ScalarDBDaoException; + + /** + * Saves a record to the database, either as an insert or update operation. + * + * @param namespace the database namespace + * @param tableName the target table name + * @param partitionKey the partition key for the record + * @param clusteringKey the clustering key for the record (can be null) + * @param columns the columns and their values to be saved + * @throws ScalarDBDaoException if there is an error saving to the database + */ + protected abstract void saveRecord( + String namespace, + String tableName, + Key partitionKey, + Key clusteringKey, + List> columns) + throws ScalarDBDaoException; +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTaskParams.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTaskParams.java new file mode 100644 index 0000000000..eafe3a42ae --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTaskParams.java @@ -0,0 +1,41 @@ +package com.scalar.db.dataloader.core.dataimport.task; + +import com.fasterxml.jackson.databind.JsonNode; +import com.scalar.db.api.TableMetadata; +import com.scalar.db.dataloader.core.dataimport.ImportOptions; +import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao; +import com.scalar.db.dataloader.core.dataimport.processor.TableColumnDataTypes; +import java.util.Map; +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; + +/** + * Parameters required for executing an import task in the data loader. This class encapsulates all + * necessary information needed to process and import a single record into ScalarDB. 
+ */ +@Builder +@Value +public class ImportTaskParams { + + /** The source record to be imported, represented as a JSON node */ + @NonNull JsonNode sourceRecord; + + /** Identifier for the current chunk of data being processed */ + int dataChunkId; + + /** The row number of the current record in the source data */ + int rowNumber; + + /** Configuration options for the import process */ + @NonNull ImportOptions importOptions; + + /** Mapping of table names to their corresponding metadata */ + @NonNull Map tableMetadataByTableName; + + /** Data type information for table columns */ + @NonNull TableColumnDataTypes tableColumnDataTypes; + + /** Data Access Object for interacting with ScalarDB */ + @NonNull ScalarDBDao dao; +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTransactionalTask.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTransactionalTask.java new file mode 100644 index 0000000000..449270d929 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTransactionalTask.java @@ -0,0 +1,106 @@ +package com.scalar.db.dataloader.core.dataimport.task; + +import com.scalar.db.api.DistributedTransaction; +import com.scalar.db.api.Result; +import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDaoException; +import com.scalar.db.exception.transaction.AbortException; +import com.scalar.db.exception.transaction.TransactionException; +import com.scalar.db.io.Column; +import com.scalar.db.io.Key; +import java.util.List; +import java.util.Optional; + +/** + * An import task that operates within a {@link DistributedTransaction} context. + * + *
This class extends {@link ImportTask} and provides transactional semantics for data import + * operations. It ensures that all data operations (get and put) are executed within the same + * transaction context, maintaining ACID properties. + * + *
The task uses a single {@link DistributedTransaction} instance throughout its lifecycle, which + * is passed during construction. This transaction must be managed (committed or aborted) by the + * caller. + */ +public class ImportTransactionalTask extends ImportTask { + + private final DistributedTransaction transaction; + + /** + * Constructs an {@code ImportTransactionalTask} with the specified parameters and transaction. + * + * @param params the import task parameters containing configuration and DAO objects + * @param transaction the distributed transaction to be used for all data operations. This + * transaction should be properly managed (committed/aborted) by the caller + */ + public ImportTransactionalTask(ImportTaskParams params, DistributedTransaction transaction) { + super(params); + this.transaction = transaction; + } + + /** + * Retrieves a data record within the active transaction context. + * + *
+ * <p>This method overrides the base implementation to ensure the get operation is executed within
+ * the transaction context provided during construction.
+ *
+ * @param namespace the namespace of the table to query
+ * @param tableName the name of the table to query
+ * @param partitionKey the partition key identifying the record's partition
+ * @param clusteringKey the clustering key for further record identification within the partition
+ * @return an {@link Optional} containing the {@link Result} if the record exists, otherwise an
+ *     empty {@link Optional}
+ * @throws ScalarDBDaoException if an error occurs during the database operation or if the
+ *     transaction encounters any issues
+ */
+ @Override
+ protected Optional<Result> getDataRecord(
+     String namespace, String tableName, Key partitionKey, Key clusteringKey)
+     throws ScalarDBDaoException {
+   return params.getDao().get(namespace, tableName, partitionKey, clusteringKey, transaction);
+ }
+
+ /**
+ * Saves a record within the active transaction context.
+ *
+ *
+ * <p>This method overrides the base implementation to ensure the put operation is executed within
+ * the transaction context provided during construction.
+ *
+ * @param namespace the namespace of the target table
+ * @param tableName the name of the target table
+ * @param partitionKey the partition key determining where the record will be stored
+ * @param clusteringKey the clustering key for ordering/organizing records within the partition
+ * @param columns the list of columns containing the actual data to be saved
+ * @throws ScalarDBDaoException if an error occurs during the database operation or if the
+ *     transaction encounters any issues
+ */
+ @Override
+ protected void saveRecord(
+     String namespace,
+     String tableName,
+     Key partitionKey,
+     Key clusteringKey,
+     List<Column<?>> columns)
+     throws ScalarDBDaoException {
+   params.getDao().put(namespace, tableName, partitionKey, clusteringKey, columns, transaction);
+ }
+
+ /**
+ * Aborts the active ScalarDB transaction if it has not been committed.
+ *
+ *
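Since the class leaves commit/abort to the caller, the surrounding lifecycle presumably looks something like the following sketch (hypothetical caller code with an assumed `transactionManager`; not part of this changeset):

DistributedTransaction tx = transactionManager.start();
try {
  ImportTaskResult result = new ImportTransactionalTask(params, tx).execute();
  tx.commit(); // the get and put above become visible atomically
} catch (TransactionException e) {
  tx.abort(); // abort() may itself throw AbortException, a TransactionException subtype
  throw e;
}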
This method provides a safe way to abort an active transaction, handling any abort-related + * exceptions by wrapping them in a {@link TransactionException}. + * + * @param tx the transaction to be aborted. If null, this method does nothing + * @throws TransactionException if an error occurs during the abort operation or if the underlying + * abort operation fails + */ + private void abortActiveTransaction(DistributedTransaction tx) throws TransactionException { + if (tx != null) { + try { + tx.abort(); + } catch (AbortException e) { + throw new TransactionException(e.getMessage(), tx.getId()); + } + } + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/ColumnUtils.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/ColumnUtils.java index 2ab4706fc9..0907388978 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/ColumnUtils.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/ColumnUtils.java @@ -1,7 +1,11 @@ package com.scalar.db.dataloader.core.util; +import com.fasterxml.jackson.databind.JsonNode; +import com.scalar.db.api.Result; +import com.scalar.db.api.TableMetadata; import com.scalar.db.common.error.CoreError; import com.scalar.db.dataloader.core.ColumnInfo; +import com.scalar.db.dataloader.core.exception.Base64Exception; import com.scalar.db.dataloader.core.exception.ColumnParsingException; import com.scalar.db.io.BigIntColumn; import com.scalar.db.io.BlobColumn; @@ -16,19 +20,38 @@ import com.scalar.db.io.TimeColumn; import com.scalar.db.io.TimestampColumn; import com.scalar.db.io.TimestampTZColumn; +import com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils; import java.time.Instant; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; +import java.util.ArrayList; import java.util.Base64; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import javax.annotation.Nullable; /** * Utility class for creating and managing ScalarDB columns. * - *
This class provides methods for creating ScalarDB columns based on the given data type, column - * information, and value. It includes handling for various data types and special cases like base64 - * encoding for BLOB data. + *
This class provides utility methods for: + * + *
  • Creating ScalarDB columns from various data types and values + *
  • Converting between ScalarDB Result objects and column data + *
  • Handling special data formats like base64 encoding for BLOB data + *
  • Managing transaction-related metadata columns + *
+ * + *
The class supports all ScalarDB data types including: + * + *
  • Basic types: BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, TEXT + *
  • Binary data: BLOB (requires base64 encoding) + *
  • Temporal types: DATE, TIME, TIMESTAMP, TIMESTAMPTZ + *
*/ public final class ColumnUtils { @@ -38,15 +61,22 @@ private ColumnUtils() {} /** * Creates a ScalarDB column from the given data type, column information, and value. * - *
Blob source values need to be base64 encoded before passing them as a value. If the value is - * {@code null}, the corresponding column is created as a {@code null} column. + *
This method handles the creation of columns for all supported ScalarDB data types. For BLOB + * type columns, the input value must be base64 encoded before being passed to this method. * - * @param dataType the data type of the specified column - * @param columnInfo the ScalarDB table column information - * @param value the value for the ScalarDB column (may be {@code null}) + *
+ * <p>If the provided value is {@code null}, a null column of the appropriate type is created.
+ *
+ * @param dataType the data type of the specified column (e.g., BOOLEAN, INT, TEXT, etc.)
+ * @param columnInfo the ScalarDB table column information containing column name and metadata
+ * @param value the string representation of the value for the ScalarDB column (may be {@code
+ *     null})
+ * @return the ScalarDB column created from the specified data
- * @throws ColumnParsingException if an error occurs while creating the column or parsing the
- *     value
+ * @throws ColumnParsingException if an error occurs while creating the column, such as:
+ *
  • Invalid number format for numeric types + *
  • Invalid base64 encoding for BLOB type + *
  • Invalid date/time format for temporal types + *
 */
 public static Column<?> createColumnFromValue(
     DataType dataType, ColumnInfo columnInfo, @Nullable String value)
     throws ColumnParsingException
@@ -112,4 +142,155 @@ public static Column<?> createColumnFromValue(
         e);
   }
 }
+
+ /**
+ * Retrieves columns from a ScalarDB Result object, comparing with source data and handling
+ * metadata.
+ *
+ *
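A short usage sketch for createColumnFromValue (hypothetical column names and values; note the base64 requirement for BLOB input):

ColumnInfo ageInfo = ColumnInfo.builder().columnName("age").build();
Column<?> age = ColumnUtils.createColumnFromValue(DataType.INT, ageInfo, "42");

ColumnInfo photoInfo = ColumnInfo.builder().columnName("photo").build();
String encoded = Base64.getEncoder().encodeToString(new byte[] {1, 2, 3});
Column<?> photo = ColumnUtils.createColumnFromValue(DataType.BLOB, photoInfo, encoded);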
This method processes the result data while: + * + *
  • Excluding transaction metadata columns + *
  • Excluding partition and clustering key columns + *
  • Handling null values based on the ignoreNullValues parameter + *
  • Merging data from both ScalarDB Result and source record + *
+ *
+ * @param scalarDBResult the ScalarDB Result object containing the current data
+ * @param sourceRecord the source data in JSON format to compare against
+ * @param ignoreNullValues if true, null values will be excluded from the result
+ * @param tableMetadata metadata about the table structure and column types
+ * @return a List of Column objects representing the processed data
+ * @throws Base64Exception if there's an error processing base64 encoded BLOB data
+ * @throws ColumnParsingException if there's an error parsing column values
+ */
+ public static List<Column<?>> getColumnsFromResult(
+     Result scalarDBResult,
+     JsonNode sourceRecord,
+     boolean ignoreNullValues,
+     TableMetadata tableMetadata)
+     throws Base64Exception, ColumnParsingException {
+
+   List<Column<?>> columns = new ArrayList<>();
+   Set<String> columnsToIgnore =
+       getColumnsToIgnore(
+           tableMetadata.getPartitionKeyNames(), tableMetadata.getClusteringKeyNames());
+   for (String columnName : tableMetadata.getColumnNames()) {
+     if (ConsensusCommitUtils.isTransactionMetaColumn(columnName, tableMetadata)
+         || columnsToIgnore.contains(columnName)) {
+       continue;
+     }
+
+     Column<?> column =
+         getColumn(
+             scalarDBResult,
+             sourceRecord,
+             columnName,
+             ignoreNullValues,
+             tableMetadata.getColumnDataTypes());
+
+     if (column != null) {
+       columns.add(column);
+     }
+   }
+
+   return columns;
+ }
+
+ /**
+ * Creates a set of column names that should be ignored during processing.
+ *
+ *
This method combines: + * + *
  • Transaction metadata columns + *
  • Partition key columns + *
  • Clustering key columns + *
+ *
+ * @param partitionKeyNames set of column names that are partition keys
+ * @param clusteringKeyNames set of column names that are clustering keys
+ * @return a Set of column names that should be ignored during processing
+ */
+ private static Set<String> getColumnsToIgnore(
+     Set<String> partitionKeyNames, Set<String> clusteringKeyNames) {
+   Set<String> columnsToIgnore =
+       new HashSet<>(ConsensusCommitUtils.getTransactionMetaColumns().keySet());
+   columnsToIgnore.addAll(partitionKeyNames);
+   columnsToIgnore.addAll(clusteringKeyNames);
+   return columnsToIgnore;
+ }
+
+ /**
+ * Retrieves a column value by comparing ScalarDB Result data with source record data.
+ *
+ *
This method determines which data source to use for the column value: + * + *
  • If the column exists in ScalarDB Result but not in source record, uses Result data + *
  • Otherwise, uses the source record data + *
+ *
+ * @param scalarDBResult the ScalarDB Result object containing current data
+ * @param sourceRecord the source data in JSON format
+ * @param columnName the name of the column to retrieve
+ * @param ignoreNullValues whether to ignore null values in the result
+ * @param dataTypesByColumns mapping of column names to their data types
+ * @return the Column object containing the value, or null if ignored
+ * @throws ColumnParsingException if there's an error parsing the column value
+ */
+ private static Column<?> getColumn(
+     @Nullable Result scalarDBResult,
+     JsonNode sourceRecord,
+     String columnName,
+     boolean ignoreNullValues,
+     Map<String, DataType> dataTypesByColumns)
+     throws ColumnParsingException {
+   if (scalarDBResult != null && !sourceRecord.has(columnName)) {
+     return getColumnFromResult(scalarDBResult, columnName);
+   } else {
+     return getColumnFromSourceRecord(
+         sourceRecord, columnName, ignoreNullValues, dataTypesByColumns);
+   }
+ }
+
+ /**
+ * Get column from result
+ *
+ * @param scalarDBResult result record
+ * @param columnName column name
+ * @return column data
+ */
+ private static Column<?> getColumnFromResult(Result scalarDBResult, String columnName) {
+   Map<String, Column<?>> columnValues = scalarDBResult.getColumns();
+   return columnValues.get(columnName);
+ }
+
+ /**
+ * Get column from source record
+ *
+ * @param sourceRecord source data
+ * @param columnName column name
+ * @param ignoreNullValues ignore null values or not
+ * @param dataTypesByColumns data types of columns
+ * @return column data
+ * @throws ColumnParsingException if an error occurs while parsing the column
+ */
+ private static Column<?> getColumnFromSourceRecord(
+     JsonNode sourceRecord,
+     String columnName,
+     boolean ignoreNullValues,
+     Map<String, DataType> dataTypesByColumns)
+     throws ColumnParsingException {
+   DataType dataType = dataTypesByColumns.get(columnName);
+   String columnValue =
+       sourceRecord.has(columnName) && !sourceRecord.get(columnName).isNull()
+           ? sourceRecord.get(columnName).asText()
+           : null;
+   if (!ignoreNullValues || columnValue != null) {
+     ColumnInfo columnInfo = ColumnInfo.builder().columnName(columnName).build();
+     return createColumnFromValue(dataType, columnInfo, columnValue);
+   }
+   return null;
+ }
+}
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/KeyUtils.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/KeyUtils.java
index c2491df0f4..e3433a31b5 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/KeyUtils.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/KeyUtils.java
@@ -1,21 +1,38 @@
 package com.scalar.db.dataloader.core.util;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.scalar.db.api.TableMetadata;
 import com.scalar.db.common.error.CoreError;
 import com.scalar.db.dataloader.core.ColumnInfo;
 import com.scalar.db.dataloader.core.ColumnKeyValue;
+import com.scalar.db.dataloader.core.exception.Base64Exception;
 import com.scalar.db.dataloader.core.exception.ColumnParsingException;
 import com.scalar.db.dataloader.core.exception.KeyParsingException;
 import com.scalar.db.io.Column;
 import com.scalar.db.io.DataType;
 import com.scalar.db.io.Key;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import javax.annotation.Nullable;
 
 /**
  * Utility class for creating and managing ScalarDB keys.
  *
- *
This class provides methods to parse key-value pairs and create ScalarDB key instances. It - * also includes utility methods for handling data types, columns, and potential parsing exceptions. + *
This class provides utility methods for: + * + *
  • Creating partition and clustering keys from source records + *
  • Parsing key-value pairs into ScalarDB Key instances + *
  • Creating composite keys from multiple columns + *
+ * + *
The class handles proper type conversion and validation of keys according to the table + * metadata and column data types. It also provides comprehensive error handling for various + * key-related operations. */ public final class KeyUtils { @@ -23,20 +40,67 @@ public final class KeyUtils { private KeyUtils() {} /** - * Converts a key-value pair, in the format of =, into a ScalarDB Key instance for a - * specific ScalarDB table. + * Creates an {@link Optional} clustering key from the given source record. + * + *
+ * <p>This method constructs a clustering key by extracting values from the source record for each
+ * clustering key column. If any required clustering key column is missing from the source record
+ * or if there's an error in data conversion, an empty Optional is returned.
+ *
+ * @param clusteringKeyNames A set of column names that make up the clustering key
+ * @param dataTypeByColumnName A map defining the data type for each column name
+ * @param sourceRecord The source record containing the column values
+ * @return An {@link Optional} containing the clustering key if all required columns exist and are
+ *     valid, otherwise {@link Optional#empty()}
+ */
+ public static Optional<Key> createClusteringKeyFromSource(
+     Set<String> clusteringKeyNames,
+     Map<String, DataType> dataTypeByColumnName,
+     ObjectNode sourceRecord) {
+   return clusteringKeyNames.isEmpty()
+       ? Optional.empty()
+       : createKeyFromSource(clusteringKeyNames, dataTypeByColumnName, sourceRecord);
+ }
+
+ /**
+ * Creates an {@link Optional} partition key from the given source record.
+ *
- * <p>This method uses the provided table metadata to determine the data type for the key and
- * creates a corresponding ScalarDB Key. If the key does not match any column in the table
- * metadata, a {@link KeyParsingException} is thrown.
+ * <p>This method constructs a partition key by extracting values from the source record for each
+ * partition key column. If any required partition key column is missing from the source record or
+ * if there's an error in data conversion, an empty Optional is returned.
+ *
- * @param columnKeyValue a key-value pair in the format of <key>=<value>
- * @param namespace the name of the ScalarDB namespace
- * @param tableName the name of the ScalarDB table
- * @param tableMetadata metadata for the ScalarDB table
- * @return a new ScalarDB Key instance formatted according to the data type
- * @throws KeyParsingException if there is an error parsing the key value or if the column does
- *     not exist
+ * @param partitionKeyNames A set of column names that make up the partition key
+ * @param dataTypeByColumnName A map defining the data type for each column name
+ * @param sourceRecord The source record containing the column values
+ * @return An {@link Optional} containing the partition key if all required columns exist and are
+ *     valid, otherwise {@link Optional#empty()}
+ */
+ public static Optional<Key> createPartitionKeyFromSource(
+     Set<String> partitionKeyNames,
+     Map<String, DataType> dataTypeByColumnName,
+     ObjectNode sourceRecord) {
+   return createKeyFromSource(partitionKeyNames, dataTypeByColumnName, sourceRecord);
+ }
+
+ /**
+ * Converts a key-value pair into a ScalarDB Key instance for a specific ScalarDB table.
+ *
+ *
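A usage sketch for the two factory methods above (hypothetical record and schema; java.util and Jackson classes assumed imported):

ObjectNode record = new ObjectMapper().createObjectNode();
record.put("col1", 42L).put("col2", 7);

Map<String, DataType> types = new HashMap<>();
types.put("col1", DataType.BIGINT);
types.put("col2", DataType.INT);

Optional<Key> partitionKey =
    KeyUtils.createPartitionKeyFromSource(Collections.singleton("col1"), types, record);
Optional<Key> clusteringKey =
    KeyUtils.createClusteringKeyFromSource(Collections.singleton("col2"), types, record);
// Either Optional is empty when a key column is absent or a value fails to convert.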
This method performs the following steps: + * + *
  1. Validates that the column exists in the table metadata + *
  2. Determines the correct data type for the column + *
  3. Converts the value to the appropriate type + *
  4. Creates and returns a new ScalarDB Key instance + *
+ * + * @param columnKeyValue A key-value pair containing the column name and value + * @param namespace The name of the ScalarDB namespace + * @param tableName The name of the ScalarDB table + * @param tableMetadata Metadata for the ScalarDB table + * @return A new ScalarDB Key instance formatted according to the data type, or null if + * columnKeyValue is null + * @throws KeyParsingException If the column doesn't exist in the table or if there's an error + * parsing the value */ @Nullable public static Key parseKeyValue( @@ -67,14 +131,16 @@ public static Key parseKeyValue( /** * Creates a ScalarDB key based on the provided data type, column information, and value. * - *
This method creates a ScalarDB Key instance by converting the column value to the - * appropriate data type and constructing the key using that value. + *
This method handles the conversion of string values to their appropriate ScalarDB data types + * and constructs a single-column key. The method ensures type safety and proper formatting of the + * key value according to the specified data type. * - * @param dataType the data type of the specified column - * @param columnInfo the ScalarDB table column information - * @param value the value for the ScalarDB key - * @return a ScalarDB Key instance - * @throws KeyParsingException if there is an error while creating the ScalarDB key + * @param dataType The data type of the specified column + * @param columnInfo The ScalarDB table column information + * @param value The string value to be converted and used as the key + * @return A ScalarDB Key instance containing the converted value + * @throws KeyParsingException If there's an error converting the value to the specified data type + * or creating the key */ public static Key createKey(DataType dataType, ColumnInfo columnInfo, String value) throws KeyParsingException { @@ -85,4 +151,66 @@ public static Key createKey(DataType dataType, ColumnInfo columnInfo, String val throw new KeyParsingException(e.getMessage(), e); } } + + /** + * Creates a new composite ScalarDB key from multiple columns. + * + *
This method creates a composite key by combining multiple columns, each with its own data + * type and value. The method requires that all input lists (dataTypes, columnNames, and values) + * have the same length. If the lists are not of equal length, an empty Optional is returned. + * + *
The method performs the following for each column: + * + *
  1. Creates a ColumnInfo instance + *
  2. Converts the string value to the appropriate data type + *
  3. Adds the converted value to the composite key + *
+ *
+ * @param dataTypes List of data types for each column in the composite key
+ * @param columnNames List of column names corresponding to each data type
+ * @param values List of string values to be converted and used in the key
+ * @return An Optional containing the composite ScalarDB Key if successful, or empty if the input
+ *     lists have different lengths
+ * @throws Base64Exception If there's an error processing Base64-encoded values
+ * @throws ColumnParsingException If there's an error converting any value to its specified data
+ *     type
+ */
+ public static Optional<Key> createCompositeKey(
+     List<DataType> dataTypes, List<String> columnNames, List<String> values)
+     throws Base64Exception, ColumnParsingException {
+   if (!CollectionUtil.areSameLength(dataTypes, columnNames, values)) {
+     return Optional.empty();
+   }
+   Key.Builder builder = Key.newBuilder();
+   for (int i = 0; i < dataTypes.size(); i++) {
+     ColumnInfo columnInfo = ColumnInfo.builder().columnName(columnNames.get(i)).build();
+     Column<?> keyValue =
+         ColumnUtils.createColumnFromValue(dataTypes.get(i), columnInfo, values.get(i));
+     builder.add(keyValue);
+   }
+   return Optional.of(builder.build());
+ }
+
+ private static Optional<Key> createKeyFromSource(
+     Set<String> keyNames, Map<String, DataType> columnDataTypes, JsonNode sourceRecord) {
+   List<DataType> dataTypes = new ArrayList<>();
+   List<String> columnNames = new ArrayList<>();
+   List<String> values = new ArrayList<>();
+
+   for (String keyName : keyNames) {
+     if (!columnDataTypes.containsKey(keyName) || !sourceRecord.has(keyName)) {
+       return Optional.empty();
+     }
+     dataTypes.add(columnDataTypes.get(keyName));
+     columnNames.add(keyName);
+     values.add(sourceRecord.get(keyName).asText());
+   }
+
+   try {
+     return createCompositeKey(dataTypes, columnNames, values);
+   } catch (Base64Exception | ColumnParsingException e) {
+     return Optional.empty();
+   }
+ }
+}
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/UnitTestUtils.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/UnitTestUtils.java
index 52947e139d..3df487fa7b 100644
--- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/UnitTestUtils.java
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/UnitTestUtils.java
@@ -5,7 +5,13 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.scalar.db.api.Result;
 import com.scalar.db.api.TableMetadata;
+import com.scalar.db.common.ResultImpl;
+import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFile;
+import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTable;
+import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTableFieldMapping;
+import com.scalar.db.dataloader.core.dataimport.processor.TableColumnDataTypes;
 import com.scalar.db.dataloader.core.util.DecimalUtil;
 import com.scalar.db.io.BigIntColumn;
 import com.scalar.db.io.BlobColumn;
@@ -16,17 +22,25 @@
 import com.scalar.db.io.DoubleColumn;
 import com.scalar.db.io.FloatColumn;
 import com.scalar.db.io.IntColumn;
+import com.scalar.db.io.Key;
 import com.scalar.db.io.TextColumn;
 import com.scalar.db.io.TimeColumn;
 import com.scalar.db.io.TimestampColumn;
 import com.scalar.db.io.TimestampTZColumn;
 import com.scalar.db.transaction.consensuscommit.Attribute;
+import java.io.BufferedReader;
+import java.io.StringReader;
 import java.nio.charset.StandardCharsets;
 import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
-import java.util.*;
+import java.util.ArrayList;
+import
java.util.Base64; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; /** Utils for the service unit tests */ public class UnitTestUtils { @@ -273,4 +287,70 @@ public static String getSourceTestValue(DataType dataType) { return TEST_VALUE_TEXT; } } + + public static TableColumnDataTypes getTableColumnData() { + TableColumnDataTypes tableColumnDataTypes = new TableColumnDataTypes(); + Map tableMetadataMap = new HashMap<>(); + tableMetadataMap.put("namespace.table", createTestTableMetadata()); + tableMetadataMap.forEach( + (name, metadata) -> + metadata + .getColumnDataTypes() + .forEach((k, v) -> tableColumnDataTypes.addColumnDataType(name, k, v))); + return tableColumnDataTypes; + } + + public static ControlFile getControlFile() { + List controlFileTables = new ArrayList<>(); + List mappings = new ArrayList<>(); + mappings.add(new ControlFileTableFieldMapping("col1", "col1")); + mappings.add(new ControlFileTableFieldMapping("col2", "col2")); + mappings.add(new ControlFileTableFieldMapping("col3", "col3")); + mappings.add(new ControlFileTableFieldMapping("col4", "col4")); + mappings.add(new ControlFileTableFieldMapping("col5", "col5")); + mappings.add(new ControlFileTableFieldMapping("col6", "col6")); + mappings.add(new ControlFileTableFieldMapping("col7", "col7")); + mappings.add(new ControlFileTableFieldMapping("col8", "col8")); + mappings.add(new ControlFileTableFieldMapping("col9", "col9")); + mappings.add(new ControlFileTableFieldMapping("col10", "col10")); + mappings.add(new ControlFileTableFieldMapping("col11", "col11")); + mappings.add(new ControlFileTableFieldMapping("col11", "col11")); + controlFileTables.add(new ControlFileTable("namespace", "table", mappings)); + return new ControlFile(controlFileTables); + } + + public static BufferedReader getJsonReader() { + String jsonData = + "[{\"col1\":1,\"col2\":\"1\",\"col3\":\"1\",\"col4\":\"1.4e-45\",\"col5\":\"5e-324\",\"col6\":\"VALUE!!s\",\"col7\":\"0x626C6F6220746573742076616C7565\",\"col8\":\"2000-01-01\",\"col9\":\"01:01:01.000000\",\"col10\":\"2000-01-01T01:01:00\",\"col11\":\"1970-01-21T03:20:41.740Z\"}]"; + return new BufferedReader(new StringReader(jsonData)); + } + + public static BufferedReader getJsonLinesReader() { + String jsonLinesData = + "{\"col1\":1,\"col2\":\"1\",\"col3\":\"1\",\"col4\":\"1.4e-45\",\"col5\":\"5e-324\",\"col6\":\"VALUE!!s\",\"col7\":\"0x626C6F6220746573742076616C7565\",\"col8\":\"2000-01-01\",\"col9\":\"01:01:01.000000\",\"col10\":\"2000-01-01T01:01:00\",\"col11\":\"1970-01-21T03:20:41.740Z\"}\n"; + return new BufferedReader(new StringReader(jsonLinesData)); + } + + public static BufferedReader getCsvReader() { + String csvData = + "col1,col2,col3,col4,col5,col6,col7,col8,col9,col10,col11 \n" + + "1,1,1,1.4E-45,5e-324,VALUE!!s,0x626C6F6220746573742076616C7565,2000-01-01,01:01:01.000000,2000-01-01T01:01:00,1970-01-21T03:20:41.740Z \n"; + return new BufferedReader(new StringReader(csvData)); + } + + public static Key getClusteringKey() { + return Key.newBuilder() + .add(IntColumn.of("col2", 1)) + .add(BooleanColumn.of("col3", true)) + .build(); + } + + public static Key getPartitionKey(int j) { + return Key.ofBigInt("col1", j); + } + + public static Optional getResult(long pk) { + Result data = new ResultImpl(createTestValues(), createTestTableMetadata()); + return Optional.of(data); + } } diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java 
b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java new file mode 100644 index 0000000000..94acd20ace --- /dev/null +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java @@ -0,0 +1,121 @@ +package com.scalar.db.dataloader.core.dataimport.processor; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.DistributedTransaction; +import com.scalar.db.api.DistributedTransactionManager; +import com.scalar.db.api.TableMetadata; +import com.scalar.db.dataloader.core.FileFormat; +import com.scalar.db.dataloader.core.ScalarDBMode; +import com.scalar.db.dataloader.core.UnitTestUtils; +import com.scalar.db.dataloader.core.dataimport.ImportMode; +import com.scalar.db.dataloader.core.dataimport.ImportOptions; +import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileValidationLevel; +import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao; +import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDaoException; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; +import com.scalar.db.dataloader.core.dataimport.log.LogMode; +import com.scalar.db.exception.transaction.TransactionException; +import java.util.HashMap; +import java.util.Map; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.Mockito; + +class CsvImportProcessorTest { + @Mock private ImportProcessorParams params; + @Mock ScalarDBMode scalarDBMode; + @Mock ImportOptions importOptions; + @Mock Map tableMetadataByTableName; + @Mock TableColumnDataTypes tableColumnDataTypes; + + ScalarDBDao dao; + @Mock DistributedStorage distributedStorage; + DistributedTransactionManager distributedTransactionManager; + CsvImportProcessor csvImportProcessor; + + @BeforeEach + void setup() throws ScalarDBDaoException, TransactionException { + dao = Mockito.mock(ScalarDBDao.class); + distributedTransactionManager = mock(DistributedTransactionManager.class); + DistributedTransaction distributedTransaction = mock(DistributedTransaction.class); + when(distributedTransactionManager.start()).thenReturn(distributedTransaction); + tableMetadataByTableName = new HashMap<>(); + tableMetadataByTableName.put("namespace.table", UnitTestUtils.createTestTableMetadata()); + tableColumnDataTypes = UnitTestUtils.getTableColumnData(); + importOptions = + ImportOptions.builder() + .importMode(ImportMode.UPSERT) + .fileFormat(FileFormat.CSV) + .controlFile(UnitTestUtils.getControlFile()) + .controlFileValidationLevel(ControlFileValidationLevel.MAPPED) + .namespace("namespace") + .transactionBatchSize(1) + .dataChunkSize(5) + .tableName("table") + .logMode(LogMode.SINGLE_FILE) + .maxThreads(8) + .dataChunkQueueSize(256) + .build(); + Mockito.when( + dao.get( + "namespace", + "table", + UnitTestUtils.getPartitionKey(1), + UnitTestUtils.getClusteringKey(), + distributedStorage)) + .thenReturn(UnitTestUtils.getResult(1)); + Mockito.when( + dao.get( + "namespace", + "table", + UnitTestUtils.getPartitionKey(1), + UnitTestUtils.getClusteringKey(), + distributedTransaction)) + .thenReturn(UnitTestUtils.getResult(1)); + } + + @Test + void test_importProcessWithStorage() { + params = + ImportProcessorParams.builder() + .scalarDBMode(ScalarDBMode.STORAGE) + .importOptions(importOptions) + .dao(dao) + 
+            .distributedStorage(distributedStorage)
+            .distributedTransactionManager(distributedTransactionManager)
+            .tableColumnDataTypes(tableColumnDataTypes)
+            .tableMetadataByTableName(tableMetadataByTableName)
+            .build();
+    csvImportProcessor = new CsvImportProcessor(params);
+    Map<Integer, ImportDataChunkStatus> statusList =
+        csvImportProcessor.process(5, 1, UnitTestUtils.getCsvReader());
+    Assertions.assertNotNull(statusList);
+    Assertions.assertEquals(1, statusList.size());
+  }
+
+  @Test
+  void test_importProcessWithTransaction() {
+    params =
+        ImportProcessorParams.builder()
+            .scalarDBMode(ScalarDBMode.TRANSACTION)
+            .importOptions(importOptions)
+            .dao(dao)
+            .distributedStorage(distributedStorage)
+            .distributedTransactionManager(distributedTransactionManager)
+            .tableColumnDataTypes(tableColumnDataTypes)
+            .tableMetadataByTableName(tableMetadataByTableName)
+            .build();
+    csvImportProcessor = new CsvImportProcessor(params);
+    Map<Integer, ImportDataChunkStatus> statusList =
+        csvImportProcessor.process(5, 1, UnitTestUtils.getCsvReader());
+    Assertions.assertNotNull(statusList);
+    Assertions.assertEquals(1, statusList.size());
+  }
+}
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/DefaultImportProcessorFactoryTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/DefaultImportProcessorFactoryTest.java
new file mode 100644
index 0000000000..3e6ed5bcc8
--- /dev/null
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/DefaultImportProcessorFactoryTest.java
@@ -0,0 +1,70 @@
+package com.scalar.db.dataloader.core.dataimport.processor;
+
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+
+import com.scalar.db.dataloader.core.FileFormat;
+import com.scalar.db.dataloader.core.dataimport.ImportOptions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for the {@link DefaultImportProcessorFactory} class. Tests the factory's ability to
+ * create the appropriate import processor for each supported file format.
+ */
+class DefaultImportProcessorFactoryTest {
+
+  private DefaultImportProcessorFactory factory;
+
+  @BeforeEach
+  void setUp() {
+    factory = new DefaultImportProcessorFactory();
+  }
+
+  /**
+   * Tests that the factory creates a {@link JsonLinesImportProcessor} when JSONL format is
+   * specified.
+   */
+  @Test
+  void createImportProcessor_givenFileFormatIsJsonl_shouldReturnJsonLinesImportProcessor() {
+    // Arrange
+    ImportOptions importOptions = ImportOptions.builder().fileFormat(FileFormat.JSONL).build();
+    ImportProcessorParams params =
+        ImportProcessorParams.builder().importOptions(importOptions).build();
+
+    // Act
+    ImportProcessor result = factory.createImportProcessor(params);
+
+    // Assert
+    assertInstanceOf(JsonLinesImportProcessor.class, result);
+  }
+
+  /** Tests that the factory creates a {@link JsonImportProcessor} when JSON format is specified. */
+  @Test
+  void createImportProcessor_givenFileFormatIsJson_shouldReturnJsonImportProcessor() {
+    // Arrange
+    ImportOptions importOptions = ImportOptions.builder().fileFormat(FileFormat.JSON).build();
+    ImportProcessorParams params =
+        ImportProcessorParams.builder().importOptions(importOptions).build();
+
+    // Act
+    ImportProcessor result = factory.createImportProcessor(params);
+
+    // Assert
+    assertInstanceOf(JsonImportProcessor.class, result);
+  }
+
+  /** Tests that the factory creates a {@link CsvImportProcessor} when CSV format is specified. */
+  @Test
+  void createImportProcessor_givenFileFormatIsCsv_shouldReturnCsvImportProcessor() {
+    // Arrange
+    ImportOptions importOptions = ImportOptions.builder().fileFormat(FileFormat.CSV).build();
+    ImportProcessorParams params =
+        ImportProcessorParams.builder().importOptions(importOptions).build();
+
+    // Act
+    ImportProcessor result = factory.createImportProcessor(params);
+
+    // Assert
+    assertInstanceOf(CsvImportProcessor.class, result);
+  }
+}
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java
new file mode 100644
index 0000000000..aa9a106a0c
--- /dev/null
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java
@@ -0,0 +1,121 @@
+package com.scalar.db.dataloader.core.dataimport.processor;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.scalar.db.api.DistributedStorage;
+import com.scalar.db.api.DistributedTransaction;
+import com.scalar.db.api.DistributedTransactionManager;
+import com.scalar.db.api.TableMetadata;
+import com.scalar.db.dataloader.core.FileFormat;
+import com.scalar.db.dataloader.core.ScalarDBMode;
+import com.scalar.db.dataloader.core.UnitTestUtils;
+import com.scalar.db.dataloader.core.dataimport.ImportMode;
+import com.scalar.db.dataloader.core.dataimport.ImportOptions;
+import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileValidationLevel;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDaoException;
+import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
+import com.scalar.db.dataloader.core.dataimport.log.LogMode;
+import com.scalar.db.exception.transaction.TransactionException;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+class JsonImportProcessorTest {
+  private ImportProcessorParams params;
+  ImportOptions importOptions;
+  Map<String, TableMetadata> tableMetadataByTableName;
+  TableColumnDataTypes tableColumnDataTypes;
+
+  ScalarDBDao dao;
+  DistributedStorage distributedStorage;
+  DistributedTransactionManager distributedTransactionManager;
+  JsonImportProcessor jsonImportProcessor;
+
+  @BeforeEach
+  void setup() throws ScalarDBDaoException, TransactionException {
+    dao = Mockito.mock(ScalarDBDao.class);
+    distributedStorage = mock(DistributedStorage.class);
+    distributedTransactionManager = mock(DistributedTransactionManager.class);
+    DistributedTransaction distributedTransaction = mock(DistributedTransaction.class);
+    when(distributedTransactionManager.start()).thenReturn(distributedTransaction);
+    tableMetadataByTableName = new HashMap<>();
+    tableMetadataByTableName.put("namespace.table", UnitTestUtils.createTestTableMetadata());
+    tableColumnDataTypes = UnitTestUtils.getTableColumnData();
+    importOptions =
+        ImportOptions.builder()
+            .importMode(ImportMode.UPSERT)
+            .fileFormat(FileFormat.JSON)
+            .controlFile(UnitTestUtils.getControlFile())
+            .controlFileValidationLevel(ControlFileValidationLevel.MAPPED)
+            .namespace("namespace")
+            .transactionBatchSize(1)
+            .dataChunkSize(5)
+            .tableName("table")
+            .logMode(LogMode.SINGLE_FILE)
+            .maxThreads(8)
+            .dataChunkQueueSize(256)
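+            // dataChunkSize(5) comfortably holds the single-record JSON fixture, so the
+            // tests below expect the processor to report exactly one data chunk status.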
+            .build();
+    Mockito.when(
+            dao.get(
+                "namespace",
+                "table",
+                UnitTestUtils.getPartitionKey(1),
+                UnitTestUtils.getClusteringKey(),
+                distributedStorage))
+        .thenReturn(UnitTestUtils.getResult(1));
+    Mockito.when(
+            dao.get(
+                "namespace",
+                "table",
+                UnitTestUtils.getPartitionKey(1),
+                UnitTestUtils.getClusteringKey(),
+                distributedTransaction))
+        .thenReturn(UnitTestUtils.getResult(1));
+  }
+
+  @Test
+  void test_importProcessWithStorage() {
+    params =
+        ImportProcessorParams.builder()
+            .scalarDBMode(ScalarDBMode.STORAGE)
+            .importOptions(importOptions)
+            .dao(dao)
+            .distributedStorage(distributedStorage)
+            .distributedTransactionManager(distributedTransactionManager)
+            .tableColumnDataTypes(tableColumnDataTypes)
+            .tableMetadataByTableName(tableMetadataByTableName)
+            .build();
+    jsonImportProcessor = new JsonImportProcessor(params);
+    Map<Integer, ImportDataChunkStatus> statusList =
+        jsonImportProcessor.process(5, 1, UnitTestUtils.getJsonReader());
+    Assertions.assertNotNull(statusList);
+    Assertions.assertEquals(1, statusList.size());
+  }
+
+  @Test
+  void test_importProcessWithTransaction() {
+    params =
+        ImportProcessorParams.builder()
+            .scalarDBMode(ScalarDBMode.TRANSACTION)
+            .importOptions(importOptions)
+            .dao(dao)
+            .distributedStorage(distributedStorage)
+            .distributedTransactionManager(distributedTransactionManager)
+            .tableColumnDataTypes(tableColumnDataTypes)
+            .tableMetadataByTableName(tableMetadataByTableName)
+            .build();
+    jsonImportProcessor = new JsonImportProcessor(params);
+    Map<Integer, ImportDataChunkStatus> statusList =
+        jsonImportProcessor.process(5, 1, UnitTestUtils.getJsonReader());
+    Assertions.assertNotNull(statusList);
+    Assertions.assertEquals(1, statusList.size());
+  }
+}
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java
new file mode 100644
index 0000000000..e3db391756
--- /dev/null
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java
@@ -0,0 +1,121 @@
+package com.scalar.db.dataloader.core.dataimport.processor;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.scalar.db.api.DistributedStorage;
+import com.scalar.db.api.DistributedTransaction;
+import com.scalar.db.api.DistributedTransactionManager;
+import com.scalar.db.api.TableMetadata;
+import com.scalar.db.dataloader.core.FileFormat;
+import com.scalar.db.dataloader.core.ScalarDBMode;
+import com.scalar.db.dataloader.core.UnitTestUtils;
+import com.scalar.db.dataloader.core.dataimport.ImportMode;
+import com.scalar.db.dataloader.core.dataimport.ImportOptions;
+import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileValidationLevel;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDaoException;
+import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
+import com.scalar.db.dataloader.core.dataimport.log.LogMode;
+import com.scalar.db.exception.transaction.TransactionException;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+class JsonLinesImportProcessorTest {
+  private ImportProcessorParams params;
+  ImportOptions importOptions;
+  Map<String, TableMetadata> tableMetadataByTableName;
+  TableColumnDataTypes tableColumnDataTypes;
+
+  ScalarDBDao dao;
+  DistributedStorage distributedStorage;
+  DistributedTransactionManager distributedTransactionManager;
+  JsonLinesImportProcessor jsonLinesImportProcessor;
+
+  @BeforeEach
+  void setup() throws ScalarDBDaoException, TransactionException {
+    dao = Mockito.mock(ScalarDBDao.class);
+    distributedStorage = mock(DistributedStorage.class);
+    distributedTransactionManager = mock(DistributedTransactionManager.class);
+    DistributedTransaction distributedTransaction = mock(DistributedTransaction.class);
+    when(distributedTransactionManager.start()).thenReturn(distributedTransaction);
+    tableMetadataByTableName = new HashMap<>();
+    tableMetadataByTableName.put("namespace.table", UnitTestUtils.createTestTableMetadata());
+    tableColumnDataTypes = UnitTestUtils.getTableColumnData();
+    importOptions =
+        ImportOptions.builder()
+            .importMode(ImportMode.UPSERT)
+            .fileFormat(FileFormat.JSONL)
+            .controlFile(UnitTestUtils.getControlFile())
+            .controlFileValidationLevel(ControlFileValidationLevel.MAPPED)
+            .namespace("namespace")
+            .transactionBatchSize(1)
+            .dataChunkSize(5)
+            .tableName("table")
+            .maxThreads(8)
+            .dataChunkQueueSize(256)
+            .logMode(LogMode.SINGLE_FILE)
+            .build();
+    Mockito.when(
+            dao.get(
+                "namespace",
+                "table",
+                UnitTestUtils.getPartitionKey(1),
+                UnitTestUtils.getClusteringKey(),
+                distributedStorage))
+        .thenReturn(UnitTestUtils.getResult(1));
+    Mockito.when(
+            dao.get(
+                "namespace",
+                "table",
+                UnitTestUtils.getPartitionKey(1),
+                UnitTestUtils.getClusteringKey(),
+                distributedTransaction))
+        .thenReturn(UnitTestUtils.getResult(1));
+  }
+
+  @Test
+  void test_importProcessWithStorage() {
+    params =
+        ImportProcessorParams.builder()
+            .scalarDBMode(ScalarDBMode.STORAGE)
+            .importOptions(importOptions)
+            .dao(dao)
+            .distributedStorage(distributedStorage)
+            .distributedTransactionManager(distributedTransactionManager)
+            .tableColumnDataTypes(tableColumnDataTypes)
+            .tableMetadataByTableName(tableMetadataByTableName)
+            .build();
+    jsonLinesImportProcessor = new JsonLinesImportProcessor(params);
+    Map<Integer, ImportDataChunkStatus> statusList =
+        jsonLinesImportProcessor.process(5, 1, UnitTestUtils.getJsonLinesReader());
+    Assertions.assertNotNull(statusList);
+    Assertions.assertEquals(1, statusList.size());
+  }
+
+  @Test
+  void test_importProcessWithTransaction() {
+    params =
+        ImportProcessorParams.builder()
+            .scalarDBMode(ScalarDBMode.TRANSACTION)
+            .importOptions(importOptions)
+            .dao(dao)
+            .distributedStorage(distributedStorage)
+            .distributedTransactionManager(distributedTransactionManager)
+            .tableColumnDataTypes(tableColumnDataTypes)
+            .tableMetadataByTableName(tableMetadataByTableName)
+            .build();
+    jsonLinesImportProcessor = new JsonLinesImportProcessor(params);
+    Map<Integer, ImportDataChunkStatus> statusList =
+        jsonLinesImportProcessor.process(5, 1, UnitTestUtils.getJsonLinesReader());
+    Assertions.assertNotNull(statusList);
+    Assertions.assertEquals(1, statusList.size());
+  }
+}
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/TableColumnDataTypesTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/TableColumnDataTypesTest.java
new file mode 100644
index 0000000000..687397523f
--- /dev/null
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/TableColumnDataTypesTest.java
@@ -0,0 +1,49 @@
+package com.scalar.db.dataloader.core.dataimport.processor;
+
+import com.scalar.db.io.DataType;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for the TableColumnDataTypes class, which manages data type mappings for table
+ * columns.
+ */
+class TableColumnDataTypesTest {
+
+  TableColumnDataTypes tableColumnDataTypes;
+
+  /**
+   * Tests that column data types can be successfully added and retrieved for a table. Verifies
+   * that the correct data type is returned for a specific column after adding multiple column
+   * definitions.
+   */
+  @Test
+  void addColumnDataType_withValidData_shouldAddColumnDataType() {
+    tableColumnDataTypes = new TableColumnDataTypes();
+    tableColumnDataTypes.addColumnDataType("table", "id", DataType.BIGINT);
+    tableColumnDataTypes.addColumnDataType("table", "name", DataType.TEXT);
+    Assertions.assertEquals(
+        DataType.BIGINT, tableColumnDataTypes.getColumnDataTypes("table").get("id"));
+  }
+
+  /**
+   * Tests the retrieval of a data type for a specific table and column combination. Verifies that
+   * the correct data type is returned when the table and column exist in the mapping.
+   */
+  @Test
+  void getDataType_withValidTableAndColumnName_shouldReturnCorrectDataType() {
+    tableColumnDataTypes = new TableColumnDataTypes();
+    tableColumnDataTypes.addColumnDataType("table", "id", DataType.BIGINT);
+    tableColumnDataTypes.addColumnDataType("table", "name", DataType.TEXT);
+    Assertions.assertEquals(DataType.TEXT, tableColumnDataTypes.getDataType("table", "name"));
+  }
+
+  /**
+   * Tests the behavior when attempting to retrieve a data type for a non-existent table and
+   * column combination. Verifies that null is returned when the requested mapping doesn't exist.
+   */
+  @Test
+  void getDataType_withInvalidTableAndColumnName_shouldReturnNull() {
+    tableColumnDataTypes = new TableColumnDataTypes();
+    Assertions.assertNull(tableColumnDataTypes.getDataType("table", "name"));
+  }
+}
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/ColumnUtilsTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/ColumnUtilsTest.java
index 7cade95d06..09c94b3844
--- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/ColumnUtilsTest.java
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/ColumnUtilsTest.java
@@ -1,9 +1,16 @@
 package com.scalar.db.dataloader.core.util;
 
-import static org.junit.jupiter.api.Assertions.*;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.scalar.db.api.Result;
+import com.scalar.db.api.TableMetadata;
+import com.scalar.db.common.ResultImpl;
 import com.scalar.db.common.error.CoreError;
 import com.scalar.db.dataloader.core.ColumnInfo;
+import com.scalar.db.dataloader.core.UnitTestUtils;
+import com.scalar.db.dataloader.core.exception.Base64Exception;
 import com.scalar.db.dataloader.core.exception.ColumnParsingException;
 import com.scalar.db.io.BigIntColumn;
 import com.scalar.db.io.BlobColumn;
@@ -24,16 +31,33 @@
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.util.Base64;
+import java.util.List;
+import java.util.Map;
 import java.util.stream.Stream;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
+/**
+ * Unit tests for the ColumnUtils class, which handles column creation
+ * and manipulation. Tests various data type conversions and error handling scenarios.
+ */
 class ColumnUtilsTest {
 
   private static final float FLOAT_VALUE = 2.78f;
+  private static final TableMetadata mockMetadata = UnitTestUtils.createTestTableMetadata();
+  private static final ObjectNode sourceRecord = UnitTestUtils.getOutputDataWithMetadata();
+  private static final Map<String, Column<?>> values = UnitTestUtils.createTestValues();
+  private static final Result scalarDBResult = new ResultImpl(values, mockMetadata);
 
+  /**
+   * Provides test cases for column creation with different data types and values. Each test case
+   * includes the target DataType, the column name, the input value (as a string), and the
+   * expected Column object.
+   *
+   * @return Stream of Arguments containing test parameters
+   */
   private static Stream<Arguments> provideColumnsForCreateColumnFromValue() {
     return Stream.of(
         Arguments.of(DataType.BOOLEAN, "boolColumn", "true", BooleanColumn.of("boolColumn", true)),
@@ -101,6 +125,16 @@ private static Stream<Arguments> provideColumnsForCreateColumnFromValue() {
             TimestampTZColumn.ofNull("timestampTZColumn")));
   }
 
+  /**
+   * Tests column creation from string values for various data types. Verifies that the created
+   * column matches the expected column with the correct type and value.
+   *
+   * @param dataType the target ScalarDB data type
+   * @param columnName the name of the column
+   * @param value the string value to convert
+   * @param expectedColumn the expected Column object after conversion
+   * @throws ColumnParsingException if the value cannot be parsed into the target data type
+   */
   @ParameterizedTest
   @MethodSource("provideColumnsForCreateColumnFromValue")
   void createColumnFromValue_validInput_returnsColumn(
@@ -111,6 +145,10 @@ void createColumnFromValue_validInput_returnsColumn(
     assertEquals(expectedColumn, actualColumn);
   }
 
+  /**
+   * Tests that attempting to create a numeric column with an invalid number format throws a
+   * ColumnParsingException with an appropriate error message.
+   */
   @Test
   void createColumnFromValue_invalidNumberFormat_throwsNumberFormatException() {
     String columnName = "intColumn";
@@ -127,6 +165,10 @@ void createColumnFromValue_invalidNumberFormat_throwsNumberFormatException() {
     exception.getMessage());
   }
 
+  /**
+   * Tests that attempting to create a BLOB column with invalid Base64 encoding throws a
+   * ColumnParsingException with an appropriate error message.
+   */
   @Test
   void createColumnFromValue_invalidBase64_throwsBase64Exception() {
     String columnName = "blobColumn";
@@ -142,4 +184,19 @@ void createColumnFromValue_invalidBase64_throwsBase64Exception() {
         columnName, "table", "ns"),
     exception.getMessage());
   }
+
+  /**
+   * Tests the extraction of columns from a ScalarDB Result object. Verifies that all columns are
+   * correctly extracted and converted from the source record.
+   *
+   * @throws Base64Exception if BLOB data contains invalid Base64 encoding
+   * @throws ColumnParsingException if any column value cannot be parsed into its target data type
+   */
+  @Test
+  void getColumnsFromResult_withValidData_shouldReturnColumns()
+      throws Base64Exception, ColumnParsingException {
+    List<Column<?>> columns =
+        ColumnUtils.getColumnsFromResult(scalarDBResult, sourceRecord, false, mockMetadata);
+    assertEquals(8, columns.size());
+  }
 }
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/KeyUtilsTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/KeyUtilsTest.java
index f2fe680490..eb19b12c85
--- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/KeyUtilsTest.java
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/KeyUtilsTest.java
@@ -1,12 +1,16 @@
 package com.scalar.db.dataloader.core.util;
 
-import static org.junit.jupiter.api.Assertions.*;
-import static org.mockito.Mockito.*;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.when;
 
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.scalar.db.api.TableMetadata;
 import com.scalar.db.common.error.CoreError;
 import com.scalar.db.dataloader.core.ColumnInfo;
 import com.scalar.db.dataloader.core.ColumnKeyValue;
+import com.scalar.db.dataloader.core.UnitTestUtils;
 import com.scalar.db.dataloader.core.exception.KeyParsingException;
 import com.scalar.db.io.BigIntColumn;
 import com.scalar.db.io.BlobColumn;
@@ -19,21 +23,38 @@
 import com.scalar.db.io.TextColumn;
 import java.nio.charset.StandardCharsets;
 import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
+/**
+ * Unit tests for the KeyUtils class, which handles parsing and creation of database keys. Tests
+ * cover various data types and key creation scenarios, including partition and clustering keys.
+ */
 @ExtendWith(MockitoExtension.class)
 class KeyUtilsTest {
 
   @Mock private TableMetadata tableMetadata;
+  private static final Map<String, DataType> dataTypeByColumnName = UnitTestUtils.getColumnData();
+  private static final ObjectNode sourceRecord = UnitTestUtils.getOutputDataWithMetadata();
 
+  /** Tests that parsing a null key value returns null. */
   @Test
   void parseKeyValue_nullKeyValue_returnsNull() throws KeyParsingException {
     assertNull(KeyUtils.parseKeyValue(null, null, null, tableMetadata));
   }
 
+  /**
+   * Tests that attempting to parse a key value with an invalid column name throws
+   * KeyParsingException. The exception should contain an appropriate error message with the
+   * namespace and table details.
+   */
   @Test
   void parseKeyValue_invalidColumnName_throwsKeyParsingException() {
     String columnName = "invalidColumn";
@@ -50,6 +71,7 @@ void parseKeyValue_invalidColumnName_throwsKeyParsingException() {
     exception.getMessage());
   }
 
+  /** Tests successful parsing of a valid key value with TEXT data type. */
   @Test
   void parseKeyValue_validKeyValue_returnsKey() throws KeyParsingException {
     String columnName = "columnName";
@@ -64,6 +86,7 @@ void parseKeyValue_validKeyValue_returnsKey() throws KeyParsingException {
     assertEquals(expected, actual);
   }
 
+  /** Tests creation of a key with BOOLEAN data type. */
   @Test
   void createKey_boolean_returnsKey() throws KeyParsingException {
     String columnName = "booleanColumn";
@@ -74,6 +97,7 @@ void createKey_boolean_returnsKey() throws KeyParsingException {
     assertEquals(expected, actual);
   }
 
+  /** Tests creation of a key with INT data type. */
   @Test
   void createKey_int_returnsKey() throws KeyParsingException {
     String columnName = "intColumn";
@@ -84,6 +108,7 @@ void createKey_int_returnsKey() throws KeyParsingException {
     assertEquals(expected, actual);
   }
 
+  /** Tests creation of a key with BIGINT data type. */
   @Test
   void createKey_bigint_returnsKey() throws KeyParsingException {
     String columnName = "bigintColumn";
@@ -94,6 +119,7 @@ void createKey_bigint_returnsKey() throws KeyParsingException {
     assertEquals(expected, actual);
   }
 
+  /** Tests creation of a key with FLOAT data type. */
   @Test
   void createKey_float_returnsKey() throws KeyParsingException {
     String columnName = "floatColumn";
@@ -104,6 +130,7 @@ void createKey_float_returnsKey() throws KeyParsingException {
     assertEquals(expected, actual);
   }
 
+  /** Tests creation of a key with DOUBLE data type. */
   @Test
   void createKey_double_returnsKey() throws KeyParsingException {
     String columnName = "doubleColumn";
@@ -114,6 +141,7 @@ void createKey_double_returnsKey() throws KeyParsingException {
     assertEquals(expected, actual);
   }
 
+  /** Tests creation of a key with TEXT data type. */
   @Test
   void createKey_text_returnsKey() throws KeyParsingException {
     String columnName = "textColumn";
@@ -124,6 +152,7 @@ void createKey_text_returnsKey() throws KeyParsingException {
     assertEquals(expected, actual);
   }
 
+  /** Tests creation of a key with BLOB data type using Base64 encoded input. */
   @Test
   void createKey_blob_returnsKey() throws KeyParsingException {
     String columnName = "blobColumn";
@@ -138,6 +167,10 @@ void createKey_blob_returnsKey() throws KeyParsingException {
     assertEquals(expected, actual);
   }
 
+  /**
+   * Tests that attempting to create a BLOB key with invalid Base64 input throws
+   * KeyParsingException.
+   */
   @Test
   void createKey_invalidBase64_throwsBase64Exception() {
     String columnName = "blobColumn";
@@ -146,4 +179,54 @@ void createKey_invalidBase64_throwsBase64Exception() {
     assertThrows(
         KeyParsingException.class, () -> KeyUtils.createKey(DataType.BLOB, columnInfo, value));
   }
+
+  /** Tests that creating a clustering key from an empty set returns an empty Optional. */
+  @Test
+  void createClusteringKeyFromSource_withEmptyClusteringKeySet_shouldReturnEmpty() {
+    Optional<Key> key = KeyUtils.createClusteringKeyFromSource(Collections.emptySet(), null, null);
+    assertEquals(Optional.empty(), key);
+  }
+
+  /**
+   * Tests creation of a clustering key from a valid set of clustering columns. Verifies that the
+   * resulting key contains the expected INT and BOOLEAN values.
+   */
+  @Test
+  void createClusteringKeyFromSource_withValidClusteringKeySet_shouldReturnValidKey() {
+    Set<String> clusterKeySet = new HashSet<>();
+    clusterKeySet.add(UnitTestUtils.TEST_COLUMN_2_CK);
+    clusterKeySet.add(UnitTestUtils.TEST_COLUMN_3_CK);
+    Optional<Key> key =
+        KeyUtils.createClusteringKeyFromSource(clusterKeySet, dataTypeByColumnName, sourceRecord);
+    assertEquals(
+        "Optional[Key{IntColumn{name=col2, value=2147483647, hasNullValue=false}, BooleanColumn{name=col3, value=true, hasNullValue=false}}]",
+        key.toString());
+  }
+
+  /**
+   * Tests that attempting to create a partition key with invalid data returns an empty Optional.
+   */
+  @Test
+  void createPartitionKeyFromSource_withInvalidData_shouldReturnEmpty() {
+    Set<String> partitionKeySet = new HashSet<>();
+    partitionKeySet.add("id1");
+    Optional<Key> key =
+        KeyUtils.createPartitionKeyFromSource(partitionKeySet, dataTypeByColumnName, sourceRecord);
+    assertEquals(Optional.empty(), key);
+  }
+
+  /**
+   * Tests creation of a partition key from valid data. Verifies that the resulting key contains
+   * the expected BIGINT value.
+   */
+  @Test
+  void createPartitionKeyFromSource_withValidData_shouldReturnValidKey() {
+    Set<String> partitionKeySet = new HashSet<>();
+    partitionKeySet.add(UnitTestUtils.TEST_COLUMN_1_PK);
+    Optional<Key> key =
+        KeyUtils.createPartitionKeyFromSource(partitionKeySet, dataTypeByColumnName, sourceRecord);
+    assertEquals(
+        "Optional[Key{BigIntColumn{name=col1, value=9007199254740992, hasNullValue=false}}]",
+        key.toString());
+  }
 }
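
Read together, the last four tests document how keys are assembled from a source record. The following is an illustrative sketch only (not part of the diff), assuming the UnitTestUtils fixture constants and helpers shown above:

    // Building partition and clustering keys from a JSON source record,
    // mirroring the calls exercised in KeyUtilsTest.
    ObjectNode record = UnitTestUtils.getOutputDataWithMetadata();
    Map<String, DataType> types = UnitTestUtils.getColumnData();

    Optional<Key> partitionKey =
        KeyUtils.createPartitionKeyFromSource(
            Collections.singleton(UnitTestUtils.TEST_COLUMN_1_PK), types, record);

    Set<String> clusteringColumns = new HashSet<>();
    clusteringColumns.add(UnitTestUtils.TEST_COLUMN_2_CK);
    clusteringColumns.add(UnitTestUtils.TEST_COLUMN_3_CK);
    Optional<Key> clusteringKey =
        KeyUtils.createClusteringKeyFromSource(clusteringColumns, types, record);
    // Per the tests above, either Optional is empty when a named column
    // is missing from the source record.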