Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions core/src/main/java/com/scalar/db/common/error/CoreError.java
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,30 @@ public enum CoreError implements ScalarDbError {
""),
// Data loader errors in the USER_ERROR category, codes 0178 through 0187.
// Each constant passes a category, a four-digit code, a message template, and two trailing
// empty strings; the meaning of those last two arguments is defined by the enum constructor,
// which is outside this excerpt.

// One %s placeholder; presumably the file format that was rejected.
DATA_LOADER_FILE_FORMAT_NOT_SUPPORTED(
Category.USER_ERROR, "0178", "The provided file format is not supported : %s", "", ""),
// No placeholders.
DATA_LOADER_COULD_NOT_FIND_PARTITION_KEY(
Category.USER_ERROR, "0179", "Could not find the partition key", "", ""),
// No placeholders.
DATA_LOADER_UPSERT_INSERT_MISSING_COLUMNS(
Category.USER_ERROR,
"0180",
"The source record needs to contain all fields if the UPSERT turns into an INSERT",
"",
""),
// No placeholders.
DATA_LOADER_DATA_ALREADY_EXISTS(Category.USER_ERROR, "0181", "Record already exists", "", ""),
// No placeholders.
DATA_LOADER_DATA_NOT_FOUND(Category.USER_ERROR, "0182", "Record was not found", "", ""),
// No placeholders.
DATA_LOADER_COULD_NOT_FIND_CLUSTERING_KEY(
Category.USER_ERROR, "0183", "Could not find the clustering key", "", ""),
// No placeholders.
DATA_LOADER_TABLE_METADATA_MISSING(
Category.USER_ERROR, "0184", "No table metadata found", "", ""),
// Two %s placeholders, in order: the mapping source field name, then the table name.
DATA_LOADER_MISSING_SOURCE_FIELD(
Category.USER_ERROR,
"0185",
"The data mapping source field '%s' for table '%s' is missing in the json data record",
"",
""),
// Two %s placeholders, in order: the CSV row, then the header it was compared against.
DATA_LOADER_CSV_DATA_MISMATCH(
Category.USER_ERROR, "0186", "The CSV row: %s does not match header: %s.", "", ""),
// No placeholders.
DATA_LOADER_JSON_CONTENT_START_ERROR(
Category.USER_ERROR, "0187", "Expected JSON file content to be an array", "", ""),

//
// Errors for the concurrency error category
Expand Down Expand Up @@ -1033,6 +1057,20 @@ public enum CoreError implements ScalarDbError {
"Something went wrong while scanning. Are you sure you are running in the correct transaction mode? Details: %s",
"",
""),
// Data loader errors in the INTERNAL_ERROR category, codes 0049 through 0053.
// Every message below takes a single %s placeholder that carries the failure details.
DATA_LOADER_CSV_FILE_READ_FAILED(
    Category.INTERNAL_ERROR, "0049", "Failed to read CSV file. Details: %s.", "", ""),
// Message word order corrected: it previously read "Failed to CSV read header line".
DATA_LOADER_CSV_FILE_HEADER_READ_FAILED(
    Category.INTERNAL_ERROR, "0050", "Failed to read CSV header line. Details: %s.", "", ""),
DATA_LOADER_DATA_CHUNK_PROCESS_FAILED(
    Category.INTERNAL_ERROR,
    "0051",
    "Data chunk processing was interrupted. Details: %s",
    "",
    ""),
DATA_LOADER_JSON_FILE_READ_FAILED(
    Category.INTERNAL_ERROR, "0052", "Failed to read JSON file. Details: %s.", "", ""),
DATA_LOADER_JSONLINES_FILE_READ_FAILED(
    Category.INTERNAL_ERROR, "0053", "Failed to read JSON Lines file. Details: %s.", "", ""),

//
// Errors for the unknown transaction status error category
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.scalar.db.dataloader.core.dataimport;

import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult;
import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult;
import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus;

/**
* Callback interface for observing the progress of a data import.
*
* <p>The callbacks cover three levels of work: data chunks, transaction batches, and individual
* import tasks. Implementations can use them to track progress or to react to the different
* stages of an import.
*
* <p>NOTE(review): this interface does not state which thread invokes the callbacks or whether
* they may arrive concurrently; confirm against the processor that fires them before relying on
* any ordering or single-threaded assumption.
*/
public interface ImportEventListener {

/**
* Called when processing of a data chunk begins.
*
* @param status the status of the data chunk that is starting
*/
void onDataChunkStarted(ImportDataChunkStatus status);

/**
* Adds the status information for a data chunk, or updates it if it is already known.
*
* @param status the latest status information for the data chunk
*/
void addOrUpdateDataChunkStatus(ImportDataChunkStatus status);

/**
* Called when processing of a data chunk is completed.
*
* @param status the final status of the completed data chunk
*/
void onDataChunkCompleted(ImportDataChunkStatus status);

/**
* Called once all data chunks have been processed, i.e. the chunked import as a whole is
* complete. Takes no argument.
*/
void onAllDataChunksCompleted();

/**
* Called when processing of a transaction batch begins.
*
* @param batchStatus the initial status of the transaction batch
*/
void onTransactionBatchStarted(ImportTransactionBatchStatus batchStatus);

/**
* Called when processing of a transaction batch is completed.
*
* @param batchResult the result of the completed transaction batch
*/
void onTransactionBatchCompleted(ImportTransactionBatchResult batchResult);

/**
* Called when a single import task is completed.
*
* @param taskResult the result of the completed import task
*/
void onTaskComplete(ImportTaskResult taskResult);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package com.scalar.db.dataloader.core.dataimport;

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.ScalarDBMode;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao;
import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessor;
import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessorFactory;
import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessorParams;
import com.scalar.db.dataloader.core.dataimport.processor.TableColumnDataTypes;
import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult;
import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult;
import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus;
import java.io.BufferedReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import lombok.AllArgsConstructor;
import lombok.NonNull;

/**
 * Coordinates a data import run and relays import events to registered listeners.
 *
 * <p>When {@link #startImport()} is called, the manager builds an {@link ImportProcessorParams},
 * obtains an {@link ImportProcessor} from the {@link ImportProcessorFactory}, registers itself on
 * that processor as an {@link ImportEventListener}, and starts processing the import file. Events
 * received from the processor are forwarded to every listener added through {@link
 * #addListener(ImportEventListener)}; data chunk status updates are additionally kept in an
 * internal status map (see {@link #getImportDataChunkStatus()}).
 *
 * <p>The import process involves:
 *
 * <ul>
 *   <li>Reading data from an input file
 *   <li>Processing the data in configurable chunk sizes
 *   <li>Managing database transactions in batches
 *   <li>Notifying listeners of various import events
 * </ul>
 *
 * <p>Thread-safety: the data chunk status map is a {@link ConcurrentHashMap}, but the listener
 * list is a plain {@link ArrayList}. Register and remove listeners before calling {@link
 * #startImport()} rather than while events are being dispatched.
 */
@AllArgsConstructor
public class ImportManager implements ImportEventListener {

  @NonNull private final Map<String, TableMetadata> tableMetadata;
  @NonNull private final BufferedReader importFileReader;
  @NonNull private final ImportOptions importOptions;
  private final ImportProcessorFactory importProcessorFactory;
  // Initialized in place, so it is not a constructor parameter.
  private final List<ImportEventListener> listeners = new ArrayList<>();
  private final ScalarDBMode scalarDBMode;
  private final DistributedStorage distributedStorage;
  private final DistributedTransactionManager distributedTransactionManager;
  // Latest known status per data chunk, keyed by data chunk ID. Initialized in place.
  private final ConcurrentHashMap<Integer, ImportDataChunkStatus> importDataChunkStatusMap =
      new ConcurrentHashMap<>();

  /**
   * Starts the import process using the configured parameters.
   *
   * <p>If the data chunk size in {@link ImportOptions} is set to 0, the entire file will be
   * processed as a single chunk. Otherwise, the file will be processed in chunks of the specified
   * size.
   *
   * @return the map of {@link ImportDataChunkStatus} objects returned by the processor, containing
   *     the status of each processed chunk
   */
  public ConcurrentHashMap<Integer, ImportDataChunkStatus> startImport() {
    ImportProcessorParams params =
        ImportProcessorParams.builder()
            .scalarDBMode(scalarDBMode)
            .importOptions(importOptions)
            .tableMetadataByTableName(tableMetadata)
            .dao(new ScalarDBDao())
            .distributedTransactionManager(distributedTransactionManager)
            .distributedStorage(distributedStorage)
            .tableColumnDataTypes(getTableColumnDataTypes())
            .build();
    ImportProcessor processor = importProcessorFactory.createImportProcessor(params);
    // Subscribe this manager so processor events can be relayed to the registered listeners.
    processor.addListener(this);
    // If the data chunk size is 0, then process the entire file in a single data chunk
    int dataChunkSize =
        importOptions.getDataChunkSize() == 0
            ? Integer.MAX_VALUE
            : importOptions.getDataChunkSize();
    return processor.process(
        dataChunkSize, importOptions.getTransactionBatchSize(), importFileReader);
  }

  /**
   * Registers a new listener to receive import events.
   *
   * @param listener the listener to add
   * @throws IllegalArgumentException if the listener is null
   */
  public void addListener(ImportEventListener listener) {
    // Reject null up front; a stored null would otherwise fail with a NullPointerException
    // later, inside whichever event happens to be forwarded next.
    if (listener == null) {
      throw new IllegalArgumentException("listener must not be null");
    }
    listeners.add(listener);
  }

  /**
   * Removes a previously registered listener. Has no effect if the listener is not registered.
   *
   * @param listener the listener to remove
   */
  public void removeListener(ImportEventListener listener) {
    listeners.remove(listener);
  }

  /** {@inheritDoc} Forwards the event to all registered listeners. */
  @Override
  public void onDataChunkStarted(ImportDataChunkStatus status) {
    for (ImportEventListener listener : listeners) {
      listener.onDataChunkStarted(status);
    }
  }

  /**
   * {@inheritDoc} Stores the status in the internal status map under its data chunk ID, replacing
   * any previous entry for that chunk. The map is a {@link ConcurrentHashMap}, so this method is
   * thread-safe. The status is not forwarded to the registered listeners.
   */
  @Override
  public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) {
    importDataChunkStatusMap.put(status.getDataChunkId(), status);
  }

  /** {@inheritDoc} Forwards the event to all registered listeners. */
  @Override
  public void onDataChunkCompleted(ImportDataChunkStatus status) {
    for (ImportEventListener listener : listeners) {
      listener.onDataChunkCompleted(status);
    }
  }

  /** {@inheritDoc} Forwards the event to all registered listeners. */
  @Override
  public void onTransactionBatchStarted(ImportTransactionBatchStatus status) {
    for (ImportEventListener listener : listeners) {
      listener.onTransactionBatchStarted(status);
    }
  }

  /** {@inheritDoc} Forwards the event to all registered listeners. */
  @Override
  public void onTransactionBatchCompleted(ImportTransactionBatchResult batchResult) {
    for (ImportEventListener listener : listeners) {
      listener.onTransactionBatchCompleted(batchResult);
    }
  }

  /** {@inheritDoc} Forwards the event to all registered listeners. */
  @Override
  public void onTaskComplete(ImportTaskResult taskResult) {
    for (ImportEventListener listener : listeners) {
      listener.onTaskComplete(taskResult);
    }
  }

  /** {@inheritDoc} Forwards the event to all registered listeners. */
  @Override
  public void onAllDataChunksCompleted() {
    for (ImportEventListener listener : listeners) {
      listener.onAllDataChunksCompleted();
    }
  }

  /**
   * Returns the current map of import data chunk status objects.
   *
   * <p>The returned map is the manager's own internal map, not a copy, so later status updates
   * are visible through it.
   *
   * @return a map of {@link ImportDataChunkStatus} objects keyed by data chunk ID
   */
  public ConcurrentHashMap<Integer, ImportDataChunkStatus> getImportDataChunkStatus() {
    return importDataChunkStatusMap;
  }

  /**
   * Creates and returns a mapping of table column data types from the table metadata.
   *
   * <p>A new {@link TableColumnDataTypes} is built on every call by walking each table's column
   * data types and registering them under the table's name.
   *
   * @return a {@link TableColumnDataTypes} object containing the column data types for all tables
   */
  public TableColumnDataTypes getTableColumnDataTypes() {
    TableColumnDataTypes tableColumnDataTypes = new TableColumnDataTypes();
    tableMetadata.forEach(
        (name, metadata) ->
            metadata
                .getColumnDataTypes()
                .forEach((k, v) -> tableColumnDataTypes.addColumnDataType(name, k, v)));
    return tableColumnDataTypes;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,5 @@ public class ImportOptions {
private final String tableName;
private final int maxThreads;
private final String customHeaderRow;
private final int dataChunkQueueSize;
}
Loading
Loading