Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.scalar.db.dataloader.core;

/** The available modes a ScalarDB instance can run in */
public enum ScalarDBMode {
public enum ScalarDbMode {
STORAGE,
TRANSACTION
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
import com.scalar.db.dataloader.core.util.CsvUtil;
import com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils;
import java.io.IOException;
Expand All @@ -13,7 +13,7 @@

public class CsvExportManager extends ExportManager {
public CsvExportManager(
DistributedStorage storage, ScalarDBDao dao, ProducerTaskFactory producerTaskFactory) {
DistributedStorage storage, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) {
super(storage, dao, producerTaskFactory);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
import com.scalar.db.dataloader.core.dataexport.validation.ExportOptionsValidationException;
import com.scalar.db.dataloader.core.dataexport.validation.ExportOptionsValidator;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDaoException;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDaoException;
import com.scalar.db.dataloader.core.util.TableMetadataUtil;
import com.scalar.db.io.DataType;
import java.io.BufferedWriter;
Expand All @@ -33,7 +33,7 @@ public abstract class ExportManager {
private static final Logger logger = LoggerFactory.getLogger(ExportManager.class);

private final DistributedStorage storage;
private final ScalarDBDao dao;
private final ScalarDbDao dao;
private final ProducerTaskFactory producerTaskFactory;
private final Object lock = new Object();

Expand Down Expand Up @@ -115,7 +115,7 @@ public ExportReport startExport(
} finally {
bufferedWriter.flush();
}
} catch (ExportOptionsValidationException | IOException | ScalarDBDaoException e) {
} catch (ExportOptionsValidationException | IOException | ScalarDbDaoException e) {
logger.error("Error during export: {}", e.getMessage());
}
return exportReport;
Expand Down Expand Up @@ -215,11 +215,11 @@ private void handleTransactionMetadata(ExportOptions exportOptions, TableMetadat
* @param dao ScalarDB dao object
* @param storage distributed storage object
* @return created scanner
* @throws ScalarDBDaoException throws if any issue occurs in creating scanner object
* @throws ScalarDbDaoException throws if any issue occurs in creating scanner object
*/
private Scanner createScanner(
ExportOptions exportOptions, ScalarDBDao dao, DistributedStorage storage)
throws ScalarDBDaoException {
ExportOptions exportOptions, ScalarDbDao dao, DistributedStorage storage)
throws ScalarDbDaoException {
boolean isScanAll = exportOptions.getScanPartitionKey() == null;
if (isScanAll) {
return dao.createScanner(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
import java.io.IOException;
import java.io.Writer;

public class JsonExportManager extends ExportManager {
public JsonExportManager(
DistributedStorage storage, ScalarDBDao dao, ProducerTaskFactory producerTaskFactory) {
DistributedStorage storage, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) {
super(storage, dao, producerTaskFactory);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
import java.io.IOException;
import java.io.Writer;

public class JsonLineExportManager extends ExportManager {
public JsonLineExportManager(
DistributedStorage storage, ScalarDBDao dao, ProducerTaskFactory producerTaskFactory) {
DistributedStorage storage, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) {
super(storage, dao, producerTaskFactory);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.ScalarDBMode;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao;
import com.scalar.db.dataloader.core.ScalarDbMode;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessor;
import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessorFactory;
Expand Down Expand Up @@ -43,7 +43,7 @@ public class ImportManager implements ImportEventListener {
@NonNull private final ImportOptions importOptions;
private final ImportProcessorFactory importProcessorFactory;
private final List<ImportEventListener> listeners = new ArrayList<>();
private final ScalarDBMode scalarDBMode;
private final ScalarDbMode scalarDbMode;
private final DistributedStorage distributedStorage;
private final DistributedTransactionManager distributedTransactionManager;
private final ConcurrentHashMap<Integer, ImportDataChunkStatus> importDataChunkStatusMap =
Expand All @@ -62,10 +62,10 @@ public class ImportManager implements ImportEventListener {
public ConcurrentHashMap<Integer, ImportDataChunkStatus> startImport() {
ImportProcessorParams params =
ImportProcessorParams.builder()
.scalarDBMode(scalarDBMode)
.scalarDbMode(scalarDbMode)
.importOptions(importOptions)
.tableMetadataByTableName(tableMetadata)
.dao(new ScalarDBDao())
.dao(new ScalarDbDao())
.distributedTransactionManager(distributedTransactionManager)
.distributedStorage(distributedStorage)
.tableColumnDataTypes(getTableColumnDataTypes())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
import org.slf4j.LoggerFactory;

/** The generic DAO that is used to scan ScalarDB data */
public class ScalarDBDao {
public class ScalarDbDao {

/* Class logger */
private static final Logger logger = LoggerFactory.getLogger(ScalarDBDao.class);
private static final Logger logger = LoggerFactory.getLogger(ScalarDbDao.class);
private static final String GET_COMPLETED_MSG = "GET completed for %s";
private static final String PUT_COMPLETED_MSG = "PUT completed for %s";
private static final String SCAN_START_MSG = "SCAN started...";
Expand All @@ -44,15 +44,15 @@ public class ScalarDBDao {
* @param clusteringKey Optional clustering key for get
* @param storage Distributed storage for ScalarDB connection that is running in storage mode.
* @return Optional get result
* @throws ScalarDBDaoException if something goes wrong while reading the data
* @throws ScalarDbDaoException if something goes wrong while reading the data
*/
public Optional<Result> get(
String namespace,
String table,
Key partitionKey,
Key clusteringKey,
DistributedStorage storage)
throws ScalarDBDaoException {
throws ScalarDbDaoException {

// Retrieving the key data for logging
String loggingKey = keysToString(partitionKey, clusteringKey);
Expand All @@ -63,7 +63,7 @@ public Optional<Result> get(
logger.info(String.format(GET_COMPLETED_MSG, loggingKey));
return result;
} catch (ExecutionException e) {
throw new ScalarDBDaoException("error GET " + loggingKey, e);
throw new ScalarDbDaoException("error GET " + loggingKey, e);
}
}

Expand All @@ -76,15 +76,15 @@ public Optional<Result> get(
* @param clusteringKey Optional clustering key for get
* @param transaction ScalarDB transaction instance
* @return Optional get result
* @throws ScalarDBDaoException if something goes wrong while reading the data
* @throws ScalarDbDaoException if something goes wrong while reading the data
*/
public Optional<Result> get(
String namespace,
String table,
Key partitionKey,
Key clusteringKey,
DistributedTransaction transaction)
throws ScalarDBDaoException {
throws ScalarDbDaoException {

Get get = createGetWith(namespace, table, partitionKey, clusteringKey);
// Retrieving the key data for logging
Expand All @@ -94,7 +94,7 @@ public Optional<Result> get(
logger.info(String.format(GET_COMPLETED_MSG, loggingKey));
return result;
} catch (CrudException e) {
throw new ScalarDBDaoException("error GET " + loggingKey, e.getCause());
throw new ScalarDbDaoException("error GET " + loggingKey, e.getCause());
}
}

Expand All @@ -107,7 +107,7 @@ public Optional<Result> get(
* @param clusteringKey Optional clustering key
* @param columns List of column values to be inserted or updated
* @param transaction ScalarDB transaction instance
* @throws ScalarDBDaoException if something goes wrong while executing the transaction
* @throws ScalarDbDaoException if something goes wrong while executing the transaction
*/
public void put(
String namespace,
Expand All @@ -116,13 +116,13 @@ public void put(
Key clusteringKey,
List<Column<?>> columns,
DistributedTransaction transaction)
throws ScalarDBDaoException {
throws ScalarDbDaoException {

Put put = createPutWith(namespace, table, partitionKey, clusteringKey, columns);
try {
transaction.put(put);
} catch (CrudException e) {
throw new ScalarDBDaoException(
throw new ScalarDbDaoException(
CoreError.DATA_LOADER_ERROR_CRUD_EXCEPTION.buildMessage(e.getMessage()), e);
}
logger.info(String.format(PUT_COMPLETED_MSG, keysToString(partitionKey, clusteringKey)));
Expand All @@ -137,7 +137,7 @@ public void put(
* @param clusteringKey Optional clustering key
* @param columns List of column values to be inserted or updated
* @param storage Distributed storage for ScalarDB connection that is running in storage mode
* @throws ScalarDBDaoException if something goes wrong while executing the transaction
* @throws ScalarDbDaoException if something goes wrong while executing the transaction
*/
public void put(
String namespace,
Expand All @@ -146,12 +146,12 @@ public void put(
Key clusteringKey,
List<Column<?>> columns,
DistributedStorage storage)
throws ScalarDBDaoException {
throws ScalarDbDaoException {
Put put = createPutWith(namespace, table, partitionKey, clusteringKey, columns);
try {
storage.put(put);
} catch (ExecutionException e) {
throw new ScalarDBDaoException(
throw new ScalarDbDaoException(
CoreError.DATA_LOADER_ERROR_CRUD_EXCEPTION.buildMessage(e.getMessage()), e);
}
logger.info(String.format(PUT_COMPLETED_MSG, keysToString(partitionKey, clusteringKey)));
Expand All @@ -169,7 +169,7 @@ public void put(
* @param limit Scan limit value
* @param storage Distributed storage for ScalarDB connection that is running in storage mode
* @return List of ScalarDB scan results
* @throws ScalarDBDaoException if scan fails
* @throws ScalarDbDaoException if scan fails
*/
public List<Result> scan(
String namespace,
Expand All @@ -180,7 +180,7 @@ public List<Result> scan(
List<String> projections,
int limit,
DistributedStorage storage)
throws ScalarDBDaoException {
throws ScalarDbDaoException {
// Create scan
Scan scan = createScan(namespace, table, partitionKey, range, sorts, projections, limit);

Expand All @@ -193,7 +193,7 @@ public List<Result> scan(
return allResults;
}
} catch (ExecutionException | IOException e) {
throw new ScalarDBDaoException(
throw new ScalarDbDaoException(
CoreError.DATA_LOADER_ERROR_SCAN.buildMessage(e.getMessage()), e);
}
}
Expand All @@ -211,7 +211,7 @@ public List<Result> scan(
* @param transaction Distributed Transaction manager for ScalarDB connection that is * running in
* transaction mode
* @return List of ScalarDB scan results
* @throws ScalarDBDaoException if scan fails
* @throws ScalarDbDaoException if scan fails
*/
public List<Result> scan(
String namespace,
Expand All @@ -222,7 +222,7 @@ public List<Result> scan(
List<String> projections,
int limit,
DistributedTransaction transaction)
throws ScalarDBDaoException {
throws ScalarDbDaoException {

// Create scan
Scan scan = createScan(namespace, table, partitionKey, range, sorts, projections, limit);
Expand All @@ -236,7 +236,7 @@ public List<Result> scan(
} catch (CrudException | NoSuchElementException e) {
// No such element Exception is thrown when the scan is done in transaction mode but
// ScalarDB is running in storage mode
throw new ScalarDBDaoException(
throw new ScalarDbDaoException(
CoreError.DATA_LOADER_ERROR_SCAN.buildMessage(e.getMessage()), e);
}
}
Expand All @@ -250,21 +250,21 @@ public List<Result> scan(
* @param limit Scan limit value
* @param storage Distributed storage for ScalarDB connection that is running in storage mode
* @return ScalarDB Scanner object
* @throws ScalarDBDaoException if scan fails
* @throws ScalarDbDaoException if scan fails
*/
public Scanner createScanner(
String namespace,
String table,
List<String> projectionColumns,
int limit,
DistributedStorage storage)
throws ScalarDBDaoException {
throws ScalarDbDaoException {
Scan scan =
createScan(namespace, table, null, null, new ArrayList<>(), projectionColumns, limit);
try {
return storage.scan(scan);
} catch (ExecutionException e) {
throw new ScalarDBDaoException(
throw new ScalarDbDaoException(
CoreError.DATA_LOADER_ERROR_SCAN.buildMessage(e.getMessage()), e);
}
}
Expand All @@ -281,7 +281,7 @@ public Scanner createScanner(
* @param limit Scan limit value
* @param storage Distributed storage for ScalarDB connection that is running in storage mode
* @return ScalarDB Scanner object
* @throws ScalarDBDaoException if scan fails
* @throws ScalarDbDaoException if scan fails
*/
public Scanner createScanner(
String namespace,
Expand All @@ -292,13 +292,13 @@ public Scanner createScanner(
@Nullable List<String> projectionColumns,
int limit,
DistributedStorage storage)
throws ScalarDBDaoException {
throws ScalarDbDaoException {
Scan scan =
createScan(namespace, table, partitionKey, scanRange, sortOrders, projectionColumns, limit);
try {
return storage.scan(scan);
} catch (ExecutionException e) {
throw new ScalarDBDaoException(
throw new ScalarDbDaoException(
CoreError.DATA_LOADER_ERROR_SCAN.buildMessage(e.getMessage()), e);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package com.scalar.db.dataloader.core.dataimport.dao;

/** A custom DAO exception that encapsulates errors thrown by ScalarDB operations */
public class ScalarDBDaoException extends Exception {
public class ScalarDbDaoException extends Exception {

/**
* Class constructor
*
* @param message error message
* @param cause reason for exception
*/
public ScalarDBDaoException(String message, Throwable cause) {
public ScalarDbDaoException(String message, Throwable cause) {
super(message, cause);
}
}
Loading