From dc28b3a6c779ae00c6698239a6f0507ba8c6ff17 Mon Sep 17 00:00:00 2001
From: inv-jishnu <31100916+inv-jishnu@users.noreply.github.com>
Date: Wed, 28 May 2025 13:48:44 +0530
Subject: [PATCH] Add import command for data loader CLI (#2618)

Co-authored-by: Peckstadt Yves
Co-authored-by: Toshihiro Suzuki
---
 .../cli/command/dataimport/ImportCommand.java | 296 +++++++++++++++++-
 .../dataimport/ImportCommandOptions.java      | 160 +++++++++-
 .../command/dataimport/ImportCommandTest.java |  66 ++++
 3 files changed, 520 insertions(+), 2 deletions(-)
 mode change 100644 => 100755 data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java
 mode change 100644 => 100755 data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommandOptions.java
 create mode 100755 data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommandTest.java

diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java
old mode 100644
new mode 100755
index f7eb9a1198..604adc0586
--- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java
+++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java
@@ -1,14 +1,308 @@
 package com.scalar.db.dataloader.cli.command.dataimport;

+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.scalar.db.api.DistributedStorageAdmin;
+import com.scalar.db.api.TableMetadata;
+import com.scalar.db.common.error.CoreError;
+import com.scalar.db.dataloader.core.FileFormat;
+import com.scalar.db.dataloader.core.ScalarDbMode;
+import com.scalar.db.dataloader.core.dataimport.ImportManager;
+import com.scalar.db.dataloader.core.dataimport.ImportOptions;
+import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFile;
+import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTable;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbStorageManager;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbTransactionManager;
+import com.scalar.db.dataloader.core.dataimport.log.ImportLoggerConfig;
+import com.scalar.db.dataloader.core.dataimport.log.LogMode;
+import com.scalar.db.dataloader.core.dataimport.log.SingleFileImportLogger;
+import com.scalar.db.dataloader.core.dataimport.log.SplitByDataChunkImportLogger;
+import com.scalar.db.dataloader.core.dataimport.log.writer.DefaultLogWriterFactory;
+import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactory;
+import com.scalar.db.dataloader.core.dataimport.processor.DefaultImportProcessorFactory;
+import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessorFactory;
+import com.scalar.db.dataloader.core.tablemetadata.TableMetadataException;
+import com.scalar.db.dataloader.core.tablemetadata.TableMetadataService;
+import com.scalar.db.dataloader.core.util.TableMetadataUtil;
+import com.scalar.db.service.StorageFactory;
+import com.scalar.db.service.TransactionFactory;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.Callable;
+import org.apache.commons.lang3.StringUtils;
 import picocli.CommandLine;
+import picocli.CommandLine.Model.CommandSpec;
+import picocli.CommandLine.ParameterException;
+import picocli.CommandLine.Spec;

 @CommandLine.Command(name = "import", description = "Import data into a ScalarDB table")
 public class ImportCommand extends ImportCommandOptions implements Callable<Integer> {
-  @CommandLine.Spec CommandLine.Model.CommandSpec spec;
+
+  /** Spec injected by PicoCli */
+  @Spec CommandSpec spec;

   @Override
   public Integer call() throws Exception {
+    validateImportTarget(controlFilePath, namespace, tableName);
+    validateLogDirectory(logDirectory);
+    ControlFile controlFile = parseControlFileFromPath(controlFilePath).orElse(null);
+    ImportOptions importOptions = createImportOptions(controlFile);
+    ImportLoggerConfig config =
+        ImportLoggerConfig.builder()
+            .logDirectoryPath(logDirectory)
+            .isLogRawSourceRecordsEnabled(importOptions.isLogRawRecord())
+            .isLogSuccessRecordsEnabled(importOptions.isLogSuccessRecords())
+            .prettyPrint(prettyPrint)
+            .build();
+    LogWriterFactory logWriterFactory = createLogWriterFactory(config);
+    Map<String, TableMetadata> tableMetadataMap =
+        createTableMetadataMap(controlFile, namespace, tableName);
+    try (BufferedReader reader =
+        Files.newBufferedReader(Paths.get(sourceFilePath), Charset.defaultCharset())) {
+      ImportManager importManager =
+          createImportManager(importOptions, tableMetadataMap, reader, logWriterFactory, config);
+      importManager.startImport();
+    }
     return 0;
   }
+
+  /**
+   * Create LogWriterFactory object
+   *
+   * @return LogWriterFactory object
+   */
+  private LogWriterFactory createLogWriterFactory(ImportLoggerConfig config) {
+    return new DefaultLogWriterFactory(config);
+  }
+
+  /**
+   * Create TableMetadata Map from provided controlfile/ namespace, table name
+   *
+   * @param controlFile control file
+   * @param namespace Namespace
+   * @param tableName Single table name
+   * @return {@code Map<String, TableMetadata>} a table metadata map
+   * @throws ParameterException if one of the argument values is wrong
+   */
+  private Map<String, TableMetadata> createTableMetadataMap(
+      ControlFile controlFile, String namespace, String tableName)
+      throws IOException, TableMetadataException {
+    File configFile = new File(configFilePath);
+    StorageFactory storageFactory = StorageFactory.create(configFile);
+    try (DistributedStorageAdmin storageAdmin = storageFactory.getStorageAdmin()) {
+      TableMetadataService tableMetadataService = new TableMetadataService(storageAdmin);
+      Map<String, TableMetadata> tableMetadataMap = new HashMap<>();
+      if (controlFile != null) {
+        for (ControlFileTable table : controlFile.getTables()) {
+          tableMetadataMap.put(
+              TableMetadataUtil.getTableLookupKey(table.getNamespace(), table.getTable()),
+              tableMetadataService.getTableMetadata(table.getNamespace(), table.getTable()));
+        }
+      } else {
+        tableMetadataMap.put(
+            TableMetadataUtil.getTableLookupKey(namespace, tableName),
+            tableMetadataService.getTableMetadata(namespace, tableName));
+      }
+      return tableMetadataMap;
+    }
+  }
+
+  /**
+   * Create ImportManager object from data
+   *
+   * @param importOptions import options
+   * @param tableMetadataMap table metadata map
+   * @param reader buffered reader with source data
+   * @param logWriterFactory log writer factory object
+   * @param config import logging config
+   * @return ImportManager object
+   */
+  private ImportManager createImportManager(
+      ImportOptions importOptions,
+      Map<String, TableMetadata> tableMetadataMap,
+      BufferedReader reader,
+      LogWriterFactory logWriterFactory,
+      ImportLoggerConfig config)
+      throws IOException {
+    File configFile = new File(configFilePath);
+    ImportProcessorFactory importProcessorFactory = new DefaultImportProcessorFactory();
+    ImportManager importManager;
+    if (scalarDbMode == ScalarDbMode.TRANSACTION) {
+      ScalarDbTransactionManager scalarDbTransactionManager =
+          new ScalarDbTransactionManager(TransactionFactory.create(configFile));
+      importManager =
+          new ImportManager(
+              tableMetadataMap,
+              reader,
+              importOptions,
+              importProcessorFactory,
+              ScalarDbMode.TRANSACTION,
+              null,
+              scalarDbTransactionManager.getDistributedTransactionManager());
+    } else {
+      ScalarDbStorageManager scalarDbStorageManager =
+          new ScalarDbStorageManager(StorageFactory.create(configFile));
+      importManager =
+          new ImportManager(
+              tableMetadataMap,
+              reader,
+              importOptions,
+              importProcessorFactory,
+              ScalarDbMode.STORAGE,
+              scalarDbStorageManager.getDistributedStorage(),
+              null);
+    }
+    if (importOptions.getLogMode().equals(LogMode.SPLIT_BY_DATA_CHUNK)) {
+      importManager.addListener(new SplitByDataChunkImportLogger(config, logWriterFactory));
+    } else {
+      importManager.addListener(new SingleFileImportLogger(config, logWriterFactory));
+    }
+    return importManager;
+  }
+
+  /**
+   * Validate import targets
+   *
+   * @param controlFilePath control file path
+   * @param namespace Namespace
+   * @param tableName Single table name
+   * @throws ParameterException if one of the argument values is wrong
+   */
+  private void validateImportTarget(String controlFilePath, String namespace, String tableName) {
+    // Throw an error if there was no clear imports target specified
+    if (StringUtils.isBlank(controlFilePath)
+        && (StringUtils.isBlank(namespace) || StringUtils.isBlank(tableName))) {
+      throw new ParameterException(
+          spec.commandLine(), CoreError.DATA_LOADER_IMPORT_TARGET_MISSING.buildMessage());
+    }
+
+    // Make sure the control file exists when a path is provided
+    if (!StringUtils.isBlank(controlFilePath)) {
+      Path path = Paths.get(controlFilePath);
+      if (!Files.exists(path)) {
+        throw new ParameterException(
+            spec.commandLine(),
+            CoreError.DATA_LOADER_MISSING_IMPORT_FILE.buildMessage(
+                controlFilePath, FILE_OPTION_NAME_LONG_FORMAT));
+      }
+    }
+  }
+
+  /**
+   * Validate log directory path
+   *
+   * @param logDirectory log directory path
+   * @throws ParameterException if the path is invalid
+   */
+  private void validateLogDirectory(String logDirectory) throws ParameterException {
+    Path logDirectoryPath;
+    if (!StringUtils.isBlank(logDirectory)) {
+      // User-provided log directory via CLI argument
+      logDirectoryPath = Paths.get(logDirectory);
+
+      if (Files.exists(logDirectoryPath)) {
+        // Check if the provided directory is writable
+        if (!Files.isWritable(logDirectoryPath)) {
+          throw new ParameterException(
+              spec.commandLine(),
+              CoreError.DATA_LOADER_LOG_DIRECTORY_CREATION_FAILED.buildMessage(
+                  logDirectoryPath.toAbsolutePath()));
+        }
+      } else {
+        // Create the log directory if it doesn't exist
+        try {
+          Files.createDirectories(logDirectoryPath);
+        } catch (IOException e) {
+          throw new ParameterException(
+              spec.commandLine(),
+              CoreError.DATA_LOADER_LOG_DIRECTORY_CREATION_FAILED.buildMessage(
+                  logDirectoryPath.toAbsolutePath()));
+        }
+      }
+      return;
+    }
+
+    // Use the current working directory as the log directory
+    logDirectoryPath = Paths.get(System.getProperty("user.dir"));
+
+    // Check if the current working directory is writable
+    if (!Files.isWritable(logDirectoryPath)) {
+      throw new ParameterException(
+          spec.commandLine(),
+          CoreError.DATA_LOADER_LOG_DIRECTORY_WRITE_ACCESS_DENIED.buildMessage(
+              logDirectoryPath.toAbsolutePath()));
+    }
+  }
+
+  /**
+   * Generate control file from a valid control file path
+   *
+   * @param controlFilePath control directory path
+   * @return {@code Optional<ControlFile>} generated control file object
+   * @throws ParameterException if the path is invalid
+   */
+  private Optional<ControlFile> parseControlFileFromPath(String controlFilePath) {
+    if (StringUtils.isBlank(controlFilePath)) {
+      return Optional.empty();
+    }
+    try {
+      ObjectMapper objectMapper = new ObjectMapper();
+      ControlFile controlFile =
+          objectMapper.readValue(new File(controlFilePath), ControlFile.class);
+      return Optional.of(controlFile);
+    } catch (IOException e) {
+      throw new ParameterException(
+          spec.commandLine(),
+          CoreError.DATA_LOADER_INVALID_CONTROL_FILE.buildMessage(controlFilePath));
+    }
+  }
+
+  /**
+   * Generate import options object from provided cli parameter data
+   *
+   * @param controlFile control file
+   * @return ImportOptions generated import options object
+   */
+  private ImportOptions createImportOptions(ControlFile controlFile) {
+    ImportOptions.ImportOptionsBuilder builder =
+        ImportOptions.builder()
+            .fileFormat(sourceFileFormat)
+            .requireAllColumns(requireAllColumns)
+            .prettyPrint(prettyPrint)
+            .controlFile(controlFile)
+            .controlFileValidationLevel(controlFileValidation)
+            .logRawRecord(logRawRecord)
+            .logSuccessRecords(logSuccessRecords)
+            .ignoreNullValues(ignoreNullValues)
+            .namespace(namespace)
+            .dataChunkSize(dataChunkSize)
+            .transactionBatchSize(transactionSize)
+            .maxThreads(maxThreads)
+            .dataChunkQueueSize(dataChunkQueueSize)
+            .tableName(tableName);
+
+    // Import mode
+    if (importMode != null) {
+      builder.importMode(importMode);
+    }
+    if (!splitLogMode) {
+      builder.logMode(LogMode.SINGLE_FILE);
+    }
+
+    // CSV options
+    if (sourceFileFormat.equals(FileFormat.CSV)) {
+      builder.delimiter(delimiter);
+      if (!StringUtils.isBlank(customHeaderRow)) {
+        builder.customHeaderRow(customHeaderRow);
+      }
+    }
+    return builder.build();
+  }
 }
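Note (illustrative, not part of the patch): the command above is a picocli subcommand, so once it is registered with a CLI entry point it can be exercised end to end as in the following sketch. The launcher class, file name, namespace, and table used here are placeholders; only the option names defined in ImportCommandOptions below come from the patch.

    // Hypothetical launcher, shown only to illustrate how ImportCommand is invoked.
    import com.scalar.db.dataloader.cli.command.dataimport.ImportCommand;
    import picocli.CommandLine;

    public class ImportCommandExample {
      public static void main(String[] args) {
        // picocli parses the arguments, populates the inherited ImportCommandOptions
        // fields, and then calls ImportCommand#call().
        int exitCode =
            new CommandLine(new ImportCommand())
                .execute(
                    "--config", "scalardb.properties",
                    "--file", "customers.json",
                    "--namespace", "sample",
                    "--table", "customers",
                    "--import-mode", "UPSERT");
        System.exit(exitCode);
      }
    }
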
{"--max-threads", "-mt"}, + paramLabel = "", + description = + "Maximum number of threads to use for parallel processing (default: number of available processors)", + defaultValue = "16") + protected int maxThreads; + + @CommandLine.Option( + names = {"--namespace", "-ns"}, + paramLabel = "", + description = "ScalarDB namespace containing the table to import data into") + protected String namespace; + + @CommandLine.Option( + names = {"--table", "-t"}, + paramLabel = "", + description = "Name of the ScalarDB table to import data into") + protected String tableName; + + @CommandLine.Option( + names = {"--control-file", "-cf"}, + paramLabel = "", + description = "Path to the JSON control file for data mapping") + protected String controlFilePath; + + @CommandLine.Option( + names = {"--log-success", "-ls"}, + description = "Enable logging of successfully processed records (default: false)", + defaultValue = "false") + protected boolean logSuccessRecords; + + @CommandLine.Option( + names = {"--log-dir", "-ld"}, + paramLabel = "", + description = "Directory where log files should be stored (default: logs)", + defaultValue = "logs") + protected String logDirectory; + + @CommandLine.Option( + names = {"--format", "-fmt"}, + paramLabel = "", + description = "Format of the import source file (JSON, CSV, JSONL) (default: JSON)", + defaultValue = "JSON") + protected FileFormat sourceFileFormat; + + @CommandLine.Option( + names = {"--require-all-columns", "-rac"}, + description = "Require all columns to be present in the source file (default: false)", + defaultValue = "false") + protected boolean requireAllColumns; + + @CommandLine.Option( + names = {"--pretty-print", "-pp"}, + description = "Enable pretty printing for JSON output (default: false)", + defaultValue = "false") + protected boolean prettyPrint; + + @CommandLine.Option( + names = {"--ignore-nulls", "-in"}, + description = "Ignore null values in the source file during import (default: false)", + defaultValue = "false") + protected boolean ignoreNullValues; + + @CommandLine.Option( + names = {"--log-raw-record", "-lr"}, + description = "Include the original source record in the log file output (default: false)", + defaultValue = "false") + protected boolean logRawRecord; + + @CommandLine.Option( + names = {"--control-file-validation", "-cfv"}, + paramLabel = "", + description = + "Level of validation to perform on control file data mappings (FULL, KEYS, MAPPED) (default: MAPPED)", + defaultValue = "MAPPED") + protected ControlFileValidationLevel controlFileValidation; + + @CommandLine.Option( + names = {"--import-mode", "-im"}, + paramLabel = "", + description = "Import mode (INSERT, UPDATE, UPSERT) (default: INSERT)", + defaultValue = "INSERT") + protected ImportMode importMode; + + @CommandLine.Option( + names = {"--delimiter", "-d"}, + paramLabel = "", + description = "Delimiter character used in the CSV import file (default: comma for CSV)", + defaultValue = ",") + protected char delimiter; + + @CommandLine.Option( + names = {"--header", "-hdr"}, + paramLabel = "
", + description = + "Header row for the CSV/TSV import file (default: use the first line as the header)") + protected String customHeaderRow; + + @CommandLine.Option( + names = {"--data-chunk-size", "-dcs"}, + paramLabel = "", + description = "Maximum number of records to be included in a single data chunk", + defaultValue = "500") + protected int dataChunkSize; + + @CommandLine.Option( + names = {"--transaction-size", "-ts"}, + paramLabel = "", + description = + "Maximum number of put operations that are grouped together into one ScalarDB distributed transaction, only supported in ScalarDB transaction mode", + defaultValue = "100") + protected int transactionSize; + + @CommandLine.Option( + names = {"--split-log-mode", "-slm"}, + paramLabel = "", + description = "Split log file into multiple files based on data chunks", + defaultValue = "false") + protected boolean splitLogMode; + + @CommandLine.Option( + names = {"--data-chunk-queue-size", "-qs"}, + paramLabel = "", + description = "Maximum number of data chunks that can be kept at a time for processing", + defaultValue = "256") + protected int dataChunkQueueSize; +} diff --git a/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommandTest.java b/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommandTest.java new file mode 100755 index 0000000000..f0e53b9287 --- /dev/null +++ b/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommandTest.java @@ -0,0 +1,66 @@ +package com.scalar.db.dataloader.cli.command.dataimport; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +import com.scalar.db.dataloader.core.FileFormat; +import com.scalar.db.dataloader.core.dataimport.ImportMode; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.stream.Stream; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import picocli.CommandLine; + +public class ImportCommandTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(ImportCommandTest.class); + @TempDir Path tempDir; + + private ImportCommand importCommand; + + @BeforeEach + void setUp() { + importCommand = new ImportCommand(); + CommandLine cmd = new CommandLine(importCommand); + importCommand.spec = cmd.getCommandSpec(); + } + + @AfterEach + public void cleanup() throws IOException { + cleanUpTempDir(); + } + + @Test + void call_WithoutValidConfigFile_ShouldThrowException() throws Exception { + Path configFile = tempDir.resolve("config.properties"); + Files.createFile(configFile); + Path importFile = tempDir.resolve("import.json"); + Files.createFile(importFile); + importCommand.configFilePath = configFile.toString(); + importCommand.namespace = "sample"; + importCommand.tableName = "table"; + importCommand.sourceFileFormat = FileFormat.JSON; + importCommand.sourceFilePath = importFile.toString(); + importCommand.importMode = ImportMode.UPSERT; + assertThrows(IllegalArgumentException.class, () -> importCommand.call()); + } + + private void cleanUpTempDir() throws IOException { + try (Stream paths = Files.list(tempDir)) { + paths.forEach(this::deleteFile); + } + } + + private void deleteFile(Path file) { + try { + Files.deleteIfExists(file); + } catch (IOException e) { + LOGGER.error("Failed to delete file: {}", file, e); + } + } +}