Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.tron.common.storage;

public class WriteOptionsWrapper {
import java.io.Closeable;

public class WriteOptionsWrapper implements Closeable {

public org.rocksdb.WriteOptions rocks = null;
public org.iq80.leveldb.WriteOptions level = null;
Expand All @@ -9,6 +11,23 @@ private WriteOptionsWrapper() {

}

/**
* Returns an WriteOptionsWrapper.
*
* <p><b>CRITICAL:</b> The returned WriteOptionsWrapper holds native resources
* and <b>MUST</b> be closed
* after use to prevent memory leaks. It is strongly recommended to use a try-with-resources
* statement.
*
* <p>Example of correct usage:
* <pre>{@code
* try ( WriteOptionsWrapper readOptions = WriteOptionsWrapper.getInstance()) {
* // do something
* }
* }</pre>
*
* @return a new WriteOptionsWrapper that must be closed.
*/
public static WriteOptionsWrapper getInstance() {
WriteOptionsWrapper wrapper = new WriteOptionsWrapper();
wrapper.level = new org.iq80.leveldb.WriteOptions();
Expand All @@ -23,4 +42,12 @@ public WriteOptionsWrapper sync(boolean bool) {
this.rocks.setSync(bool);
return this;
}

@Override
public void close() {
if (rocks != null) {
rocks.close();
}
// leveldb WriteOptions has no close method, and does not need to be closed
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class RocksDbDataSourceImpl extends DbStat implements DbSourceInter<byte[
private volatile boolean alive;
private String parentPath;
private ReadWriteLock resetDbLock = new ReentrantReadWriteLock();
private Options options;

public RocksDbDataSourceImpl(String parentPath, String name) {
this.dataBaseName = name;
Expand All @@ -78,6 +79,9 @@ public void closeDB() {
if (!isAlive()) {
return;
}
if (this.options != null) {
this.options.close();
}
database.close();
alive = false;
} catch (Exception e) {
Expand Down Expand Up @@ -117,7 +121,8 @@ private static void checkArgNotNull(Object value, String name) {
@Override
public Set<byte[]> allKeys() throws RuntimeException {
resetDbLock.readLock().lock();
try (final RocksIterator iter = getRocksIterator()) {
try (final ReadOptions readOptions = getReadOptions();
final RocksIterator iter = getRocksIterator(readOptions)) {
Set<byte[]> result = Sets.newHashSet();
for (iter.seekToFirst(); iter.isValid(); iter.next()) {
result.add(iter.key());
Expand All @@ -133,7 +138,8 @@ public Set<byte[]> allKeys() throws RuntimeException {
@Override
public Set<byte[]> allValues() throws RuntimeException {
resetDbLock.readLock().lock();
try (final RocksIterator iter = getRocksIterator()) {
try (final ReadOptions readOptions = getReadOptions();
final RocksIterator iter = getRocksIterator(readOptions)) {
Set<byte[]> result = Sets.newHashSet();
for (iter.seekToFirst(); iter.isValid(); iter.next()) {
result.add(iter.value());
Expand All @@ -149,7 +155,8 @@ public Set<byte[]> allValues() throws RuntimeException {
@Override
public long getTotal() throws RuntimeException {
resetDbLock.readLock().lock();
try (final RocksIterator iter = getRocksIterator()) {
try (final ReadOptions readOptions = getReadOptions();
final RocksIterator iter = getRocksIterator(readOptions)) {
long total = 0;
for (iter.seekToFirst(); iter.isValid(); iter.next()) {
total++;
Expand Down Expand Up @@ -180,7 +187,7 @@ private void initDB() {
throw new IllegalArgumentException("No name set to the dbStore");
}

try (Options options = RocksDbSettings.getOptionsByDbName(dataBaseName)) {
try {
logger.debug("Opening database {}.", dataBaseName);
final Path dbPath = getDbPath();

Expand All @@ -191,14 +198,19 @@ private void initDB() {
try {
DbSourceInter.checkOrInitEngine(getEngine(), dbPath.toString(),
TronError.ErrCode.ROCKSDB_INIT);
database = RocksDB.open(options, dbPath.toString());
this.options = RocksDbSettings.getOptionsByDbName(dataBaseName);
database = RocksDB.open(this.options, dbPath.toString());
} catch (RocksDBException e) {
if (Objects.equals(e.getStatus().getCode(), Status.Code.Corruption)) {
logger.error("Database {} corrupted, please delete database directory({}) "
+ "and restart.", dataBaseName, parentPath, e);
} else {
logger.error("Open Database {} failed", dataBaseName, e);
}

if (this.options != null) {
this.options.close();
}
throw new TronError(e, TronError.ErrCode.ROCKSDB_INIT);
}

Expand Down Expand Up @@ -282,7 +294,8 @@ public boolean flush() {
*/
@Override
public org.tron.core.db.common.iterator.DBIterator iterator() {
return new RockStoreIterator(getRocksIterator());
ReadOptions readOptions = getReadOptions();
return new RockStoreIterator(getRocksIterator(readOptions), readOptions);
}

private void updateByBatchInner(Map<byte[], byte[]> rows, WriteOptions options)
Expand All @@ -308,7 +321,9 @@ public void updateByBatch(Map<byte[], byte[]> rows, WriteOptionsWrapper optionsW

@Override
public void updateByBatch(Map<byte[], byte[]> rows) {
this.updateByBatch(rows, new WriteOptions());
try (WriteOptions writeOptions = new WriteOptions()) {
this.updateByBatch(rows, writeOptions);
}
}

private void updateByBatch(Map<byte[], byte[]> rows, WriteOptions options) {
Expand All @@ -331,7 +346,8 @@ public List<byte[]> getKeysNext(byte[] key, long limit) {
return new ArrayList<>();
}
resetDbLock.readLock().lock();
try (RocksIterator iter = getRocksIterator()) {
try (final ReadOptions readOptions = getReadOptions();
final RocksIterator iter = getRocksIterator(readOptions)) {
List<byte[]> result = new ArrayList<>();
long i = 0;
for (iter.seek(key); iter.isValid() && i < limit; iter.next(), i++) {
Expand All @@ -348,7 +364,8 @@ public Map<byte[], byte[]> getNext(byte[] key, long limit) {
return Collections.emptyMap();
}
resetDbLock.readLock().lock();
try (RocksIterator iter = getRocksIterator()) {
try (final ReadOptions readOptions = getReadOptions();
final RocksIterator iter = getRocksIterator(readOptions)) {
Map<byte[], byte[]> result = new HashMap<>();
long i = 0;
for (iter.seek(key); iter.isValid() && i < limit; iter.next(), i++) {
Expand All @@ -363,7 +380,8 @@ public Map<byte[], byte[]> getNext(byte[] key, long limit) {
@Override
public Map<WrappedByteArray, byte[]> prefixQuery(byte[] key) {
resetDbLock.readLock().lock();
try (RocksIterator iterator = getRocksIterator()) {
try (final ReadOptions readOptions = getReadOptions();
final RocksIterator iterator = getRocksIterator(readOptions)) {
Map<WrappedByteArray, byte[]> result = new HashMap<>();
for (iterator.seek(key); iterator.isValid(); iterator.next()) {
if (Bytes.indexOf(iterator.key(), key) == 0) {
Expand All @@ -383,7 +401,8 @@ public Set<byte[]> getlatestValues(long limit) {
return Sets.newHashSet();
}
resetDbLock.readLock().lock();
try (RocksIterator iter = getRocksIterator()) {
try (final ReadOptions readOptions = getReadOptions();
final RocksIterator iter = getRocksIterator(readOptions)) {
Set<byte[]> result = Sets.newHashSet();
long i = 0;
for (iter.seekToLast(); iter.isValid() && i < limit; iter.prev(), i++) {
Expand All @@ -400,7 +419,8 @@ public Set<byte[]> getValuesNext(byte[] key, long limit) {
return Sets.newHashSet();
}
resetDbLock.readLock().lock();
try (RocksIterator iter = getRocksIterator()) {
try (final ReadOptions readOptions = getReadOptions();
final RocksIterator iter = getRocksIterator(readOptions)) {
Set<byte[]> result = Sets.newHashSet();
long i = 0;
for (iter.seek(key); iter.isValid() && i < limit; iter.next(), i++) {
Expand All @@ -419,11 +439,50 @@ public void backup(String dir) throws RocksDBException {
}
}

private RocksIterator getRocksIterator() {
try (ReadOptions readOptions = new ReadOptions().setFillCache(false)) {
throwIfNotAlive();
return database.newIterator(readOptions);
}
/**
* Returns an iterator over the database.
*
* <p><b>CRITICAL:</b> The returned iterator holds native resources and <b>MUST</b> be closed
* after use to prevent memory leaks. It is strongly recommended to use a try-with-resources
* statement.
*
* <p>Example of correct usage:
* <pre>{@code
* try ( ReadOptions readOptions = new ReadOptions().setFillCache(false);
* RocksIterator iterator = getRocksIterator(readOptions)) {
* iterator.seekToFirst();
* // do something
* }
* }</pre>
*
* @return a new database iterator that must be closed.
*/
private RocksIterator getRocksIterator(ReadOptions readOptions) {
throwIfNotAlive();
return database.newIterator(readOptions);
}

/**
* Returns an ReadOptions.
*
* <p><b>CRITICAL:</b> The returned ReadOptions holds native resources and <b>MUST</b> be closed
* after use to prevent memory leaks. It is strongly recommended to use a try-with-resources
* statement.
*
* <p>Example of correct usage:
* <pre>{@code
* try (ReadOptions readOptions = getReadOptions();
* RocksIterator iterator = getRocksIterator(readOptions)) {
* iterator.seekToFirst();
* // do something
* }
* }</pre>
*
* @return a new database iterator that must be closed.
*/
private ReadOptions getReadOptions() {
throwIfNotAlive();
return new ReadOptions().setFillCache(false);
}

public boolean deleteDbBakPath(String dir) {
Expand Down
3 changes: 2 additions & 1 deletion chainbase/src/main/java/org/tron/core/db/TronDatabase.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public abstract class TronDatabase<T> implements ITronChainBase<T> {
protected DbSourceInter<byte[]> dbSource;
@Getter
private String dbName;
private WriteOptionsWrapper writeOptions = WriteOptionsWrapper.getInstance()
private final WriteOptionsWrapper writeOptions = WriteOptionsWrapper.getInstance()
.sync(CommonParameter.getInstance().getStorage().isDbSync());

@Autowired
Expand Down Expand Up @@ -77,6 +77,7 @@ public void reset() {
public void close() {
logger.info("******** Begin to close {}. ********", getName());
try {
writeOptions.close();
dbSource.closeDB();
} catch (Exception e) {
logger.warn("Failed to close {}.", getName(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksIterator;


Expand All @@ -15,14 +16,17 @@ public final class RockStoreIterator implements DBIterator {
private boolean first = true;

private final AtomicBoolean close = new AtomicBoolean(false);
private final ReadOptions readOptions;

public RockStoreIterator(RocksIterator dbIterator) {
public RockStoreIterator(RocksIterator dbIterator, ReadOptions readOptions) {
this.readOptions = readOptions;
this.dbIterator = dbIterator;
}

@Override
public void close() throws IOException {
if (close.compareAndSet(false, true)) {
readOptions.close();
dbIterator.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class LevelDB implements DB<byte[], byte[]>, Flusher {

@Getter
private LevelDbDataSourceImpl db;
private WriteOptionsWrapper writeOptions = WriteOptionsWrapper.getInstance()
private final WriteOptionsWrapper writeOptions = WriteOptionsWrapper.getInstance()
.sync(CommonParameter.getInstance().getStorage().isDbSync());

public LevelDB(LevelDbDataSourceImpl db) {
Expand Down Expand Up @@ -65,6 +65,7 @@ public void flush(Map<WrappedByteArray, WrappedByteArray> batch) {

@Override
public void close() {
this.writeOptions.close();
db.closeDB();
}

Expand Down
5 changes: 3 additions & 2 deletions chainbase/src/main/java/org/tron/core/db2/common/RocksDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class RocksDB implements DB<byte[], byte[]>, Flusher {
@Getter
private RocksDbDataSourceImpl db;

private WriteOptionsWrapper optionsWrapper = WriteOptionsWrapper.getInstance()
private final WriteOptionsWrapper writeOptions = WriteOptionsWrapper.getInstance()
.sync(CommonParameter.getInstance().getStorage().isDbSync());

public RocksDB(RocksDbDataSourceImpl db) {
Expand Down Expand Up @@ -61,11 +61,12 @@ public void flush(Map<WrappedByteArray, WrappedByteArray> batch) {
Map<byte[], byte[]> rows = batch.entrySet().stream()
.map(e -> Maps.immutableEntry(e.getKey().getBytes(), e.getValue().getBytes()))
.collect(HashMap::new, (m, k) -> m.put(k.getKey(), k.getValue()), HashMap::putAll);
db.updateByBatch(rows, optionsWrapper);
db.updateByBatch(rows, writeOptions);
}

@Override
public void close() {
writeOptions.close();
db.closeDB();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.tron.common.error.TronDBException;
import org.tron.common.es.ExecutorServiceManager;
import org.tron.common.parameter.CommonParameter;
import org.tron.common.storage.WriteOptionsWrapper;
import org.tron.common.utils.FileUtil;
import org.tron.common.utils.StorageUtils;
import org.tron.core.db.RevokingDatabase;
Expand Down Expand Up @@ -357,7 +356,6 @@ public void flush() {

public void createCheckpoint() {
TronDatabase<byte[]> checkPointStore = null;
boolean syncFlag;
try {
Map<WrappedByteArray, WrappedByteArray> batch = new HashMap<>();
for (Chainbase db : dbs) {
Expand Down Expand Up @@ -389,16 +387,13 @@ public void createCheckpoint() {
if (isV2Open()) {
String dbName = String.valueOf(System.currentTimeMillis());
checkPointStore = getCheckpointDB(dbName);
syncFlag = CommonParameter.getInstance().getStorage().isCheckpointSync();
} else {
checkPointStore = checkTmpStore;
syncFlag = CommonParameter.getInstance().getStorage().isDbSync();
}

checkPointStore.getDbSource().updateByBatch(batch.entrySet().stream()
checkPointStore.updateByBatch(batch.entrySet().stream()
.map(e -> Maps.immutableEntry(e.getKey().getBytes(), e.getValue().getBytes()))
.collect(HashMap::new, (m, k) -> m.put(k.getKey(), k.getValue()), HashMap::putAll),
WriteOptionsWrapper.getInstance().sync(syncFlag));
.collect(HashMap::new, (m, k) -> m.put(k.getKey(), k.getValue()), HashMap::putAll));

} catch (Exception e) {
throw new TronDBException(e);
Expand Down
Loading