diff --git a/actuator/src/main/java/org/tron/core/vm/repository/WriteOptionsWrapper.java b/actuator/src/main/java/org/tron/core/vm/repository/WriteOptionsWrapper.java deleted file mode 100644 index f9e819f9716..00000000000 --- a/actuator/src/main/java/org/tron/core/vm/repository/WriteOptionsWrapper.java +++ /dev/null @@ -1,24 +0,0 @@ -package org.tron.core.vm.repository; - -import lombok.Getter; - -public class WriteOptionsWrapper { - - @Getter - private org.rocksdb.WriteOptions rocks = null; - @Getter - private org.iq80.leveldb.WriteOptions level = null; - - public static WriteOptionsWrapper getInstance() { - WriteOptionsWrapper wrapper = new WriteOptionsWrapper(); - wrapper.level = new org.iq80.leveldb.WriteOptions(); - wrapper.rocks = new org.rocksdb.WriteOptions(); - return wrapper; - } - - public WriteOptionsWrapper sync(boolean bool) { - this.level.sync(bool); - this.rocks.setSync(bool); - return this; - } -} diff --git a/chainbase/src/main/java/org/tron/common/storage/WriteOptionsWrapper.java b/chainbase/src/main/java/org/tron/common/storage/WriteOptionsWrapper.java index 11277eafe75..bd6cacc6481 100644 --- a/chainbase/src/main/java/org/tron/common/storage/WriteOptionsWrapper.java +++ b/chainbase/src/main/java/org/tron/common/storage/WriteOptionsWrapper.java @@ -1,6 +1,8 @@ package org.tron.common.storage; -public class WriteOptionsWrapper { +import java.io.Closeable; + +public class WriteOptionsWrapper implements Closeable { public org.rocksdb.WriteOptions rocks = null; public org.iq80.leveldb.WriteOptions level = null; @@ -9,6 +11,23 @@ private WriteOptionsWrapper() { } + /** + * Returns an WriteOptionsWrapper. + * + *

CRITICAL: The returned WriteOptionsWrapper holds native resources + * and MUST be closed + * after use to prevent memory leaks. It is strongly recommended to use a try-with-resources + * statement. + * + *

Example of correct usage: + *

{@code
+   * try ( WriteOptionsWrapper readOptions = WriteOptionsWrapper.getInstance()) {
+   *  // do something
+   * }
+   * }
+ * + * @return a new WriteOptionsWrapper that must be closed. + */ public static WriteOptionsWrapper getInstance() { WriteOptionsWrapper wrapper = new WriteOptionsWrapper(); wrapper.level = new org.iq80.leveldb.WriteOptions(); @@ -23,4 +42,12 @@ public WriteOptionsWrapper sync(boolean bool) { this.rocks.setSync(bool); return this; } + + @Override + public void close() { + if (rocks != null) { + rocks.close(); + } + // leveldb WriteOptions has no close method, and does not need to be closed + } } diff --git a/chainbase/src/main/java/org/tron/common/storage/rocksdb/RocksDbDataSourceImpl.java b/chainbase/src/main/java/org/tron/common/storage/rocksdb/RocksDbDataSourceImpl.java index 63191b6ac01..c7ca698cc3d 100644 --- a/chainbase/src/main/java/org/tron/common/storage/rocksdb/RocksDbDataSourceImpl.java +++ b/chainbase/src/main/java/org/tron/common/storage/rocksdb/RocksDbDataSourceImpl.java @@ -52,6 +52,7 @@ public class RocksDbDataSourceImpl extends DbStat implements DbSourceInter allKeys() throws RuntimeException { resetDbLock.readLock().lock(); - try (final RocksIterator iter = getRocksIterator()) { + try (final ReadOptions readOptions = getReadOptions(); + final RocksIterator iter = getRocksIterator(readOptions)) { Set result = Sets.newHashSet(); for (iter.seekToFirst(); iter.isValid(); iter.next()) { result.add(iter.key()); @@ -133,7 +138,8 @@ public Set allKeys() throws RuntimeException { @Override public Set allValues() throws RuntimeException { resetDbLock.readLock().lock(); - try (final RocksIterator iter = getRocksIterator()) { + try (final ReadOptions readOptions = getReadOptions(); + final RocksIterator iter = getRocksIterator(readOptions)) { Set result = Sets.newHashSet(); for (iter.seekToFirst(); iter.isValid(); iter.next()) { result.add(iter.value()); @@ -149,7 +155,8 @@ public Set allValues() throws RuntimeException { @Override public long getTotal() throws RuntimeException { resetDbLock.readLock().lock(); - try (final RocksIterator iter = getRocksIterator()) { + try (final ReadOptions readOptions = getReadOptions(); + final RocksIterator iter = getRocksIterator(readOptions)) { long total = 0; for (iter.seekToFirst(); iter.isValid(); iter.next()) { total++; @@ -180,7 +187,7 @@ private void initDB() { throw new IllegalArgumentException("No name set to the dbStore"); } - try (Options options = RocksDbSettings.getOptionsByDbName(dataBaseName)) { + try { logger.debug("Opening database {}.", dataBaseName); final Path dbPath = getDbPath(); @@ -191,7 +198,8 @@ private void initDB() { try { DbSourceInter.checkOrInitEngine(getEngine(), dbPath.toString(), TronError.ErrCode.ROCKSDB_INIT); - database = RocksDB.open(options, dbPath.toString()); + this.options = RocksDbSettings.getOptionsByDbName(dataBaseName); + database = RocksDB.open(this.options, dbPath.toString()); } catch (RocksDBException e) { if (Objects.equals(e.getStatus().getCode(), Status.Code.Corruption)) { logger.error("Database {} corrupted, please delete database directory({}) " @@ -199,6 +207,10 @@ private void initDB() { } else { logger.error("Open Database {} failed", dataBaseName, e); } + + if (this.options != null) { + this.options.close(); + } throw new TronError(e, TronError.ErrCode.ROCKSDB_INIT); } @@ -282,7 +294,8 @@ public boolean flush() { */ @Override public org.tron.core.db.common.iterator.DBIterator iterator() { - return new RockStoreIterator(getRocksIterator()); + ReadOptions readOptions = getReadOptions(); + return new RockStoreIterator(getRocksIterator(readOptions), readOptions); } private void updateByBatchInner(Map rows, WriteOptions options) @@ -308,7 +321,9 @@ public void updateByBatch(Map rows, WriteOptionsWrapper optionsW @Override public void updateByBatch(Map rows) { - this.updateByBatch(rows, new WriteOptions()); + try (WriteOptions writeOptions = new WriteOptions()) { + this.updateByBatch(rows, writeOptions); + } } private void updateByBatch(Map rows, WriteOptions options) { @@ -331,7 +346,8 @@ public List getKeysNext(byte[] key, long limit) { return new ArrayList<>(); } resetDbLock.readLock().lock(); - try (RocksIterator iter = getRocksIterator()) { + try (final ReadOptions readOptions = getReadOptions(); + final RocksIterator iter = getRocksIterator(readOptions)) { List result = new ArrayList<>(); long i = 0; for (iter.seek(key); iter.isValid() && i < limit; iter.next(), i++) { @@ -348,7 +364,8 @@ public Map getNext(byte[] key, long limit) { return Collections.emptyMap(); } resetDbLock.readLock().lock(); - try (RocksIterator iter = getRocksIterator()) { + try (final ReadOptions readOptions = getReadOptions(); + final RocksIterator iter = getRocksIterator(readOptions)) { Map result = new HashMap<>(); long i = 0; for (iter.seek(key); iter.isValid() && i < limit; iter.next(), i++) { @@ -363,7 +380,8 @@ public Map getNext(byte[] key, long limit) { @Override public Map prefixQuery(byte[] key) { resetDbLock.readLock().lock(); - try (RocksIterator iterator = getRocksIterator()) { + try (final ReadOptions readOptions = getReadOptions(); + final RocksIterator iterator = getRocksIterator(readOptions)) { Map result = new HashMap<>(); for (iterator.seek(key); iterator.isValid(); iterator.next()) { if (Bytes.indexOf(iterator.key(), key) == 0) { @@ -383,7 +401,8 @@ public Set getlatestValues(long limit) { return Sets.newHashSet(); } resetDbLock.readLock().lock(); - try (RocksIterator iter = getRocksIterator()) { + try (final ReadOptions readOptions = getReadOptions(); + final RocksIterator iter = getRocksIterator(readOptions)) { Set result = Sets.newHashSet(); long i = 0; for (iter.seekToLast(); iter.isValid() && i < limit; iter.prev(), i++) { @@ -400,7 +419,8 @@ public Set getValuesNext(byte[] key, long limit) { return Sets.newHashSet(); } resetDbLock.readLock().lock(); - try (RocksIterator iter = getRocksIterator()) { + try (final ReadOptions readOptions = getReadOptions(); + final RocksIterator iter = getRocksIterator(readOptions)) { Set result = Sets.newHashSet(); long i = 0; for (iter.seek(key); iter.isValid() && i < limit; iter.next(), i++) { @@ -419,11 +439,50 @@ public void backup(String dir) throws RocksDBException { } } - private RocksIterator getRocksIterator() { - try (ReadOptions readOptions = new ReadOptions().setFillCache(false)) { - throwIfNotAlive(); - return database.newIterator(readOptions); - } + /** + * Returns an iterator over the database. + * + *

CRITICAL: The returned iterator holds native resources and MUST be closed + * after use to prevent memory leaks. It is strongly recommended to use a try-with-resources + * statement. + * + *

Example of correct usage: + *

{@code
+   * try ( ReadOptions readOptions = new ReadOptions().setFillCache(false);
+   *      RocksIterator iterator = getRocksIterator(readOptions)) {
+   *      iterator.seekToFirst();
+   *  // do something
+   * }
+   * }
+ * + * @return a new database iterator that must be closed. + */ + private RocksIterator getRocksIterator(ReadOptions readOptions) { + throwIfNotAlive(); + return database.newIterator(readOptions); + } + + /** + * Returns an ReadOptions. + * + *

CRITICAL: The returned ReadOptions holds native resources and MUST be closed + * after use to prevent memory leaks. It is strongly recommended to use a try-with-resources + * statement. + * + *

Example of correct usage: + *

{@code
+   * try (ReadOptions readOptions = getReadOptions();
+   *      RocksIterator iterator = getRocksIterator(readOptions)) {
+   *      iterator.seekToFirst();
+   *  // do something
+   * }
+   * }
+ * + * @return a new database iterator that must be closed. + */ + private ReadOptions getReadOptions() { + throwIfNotAlive(); + return new ReadOptions().setFillCache(false); } public boolean deleteDbBakPath(String dir) { diff --git a/chainbase/src/main/java/org/tron/core/db/TronDatabase.java b/chainbase/src/main/java/org/tron/core/db/TronDatabase.java index e699675408f..40762568c82 100644 --- a/chainbase/src/main/java/org/tron/core/db/TronDatabase.java +++ b/chainbase/src/main/java/org/tron/core/db/TronDatabase.java @@ -27,7 +27,7 @@ public abstract class TronDatabase implements ITronChainBase { protected DbSourceInter dbSource; @Getter private String dbName; - private WriteOptionsWrapper writeOptions = WriteOptionsWrapper.getInstance() + private final WriteOptionsWrapper writeOptions = WriteOptionsWrapper.getInstance() .sync(CommonParameter.getInstance().getStorage().isDbSync()); @Autowired @@ -77,6 +77,7 @@ public void reset() { public void close() { logger.info("******** Begin to close {}. ********", getName()); try { + writeOptions.close(); dbSource.closeDB(); } catch (Exception e) { logger.warn("Failed to close {}.", getName(), e); diff --git a/chainbase/src/main/java/org/tron/core/db/common/iterator/RockStoreIterator.java b/chainbase/src/main/java/org/tron/core/db/common/iterator/RockStoreIterator.java index 105d845dc64..cf9a5ff1e22 100644 --- a/chainbase/src/main/java/org/tron/core/db/common/iterator/RockStoreIterator.java +++ b/chainbase/src/main/java/org/tron/core/db/common/iterator/RockStoreIterator.java @@ -5,6 +5,7 @@ import java.util.NoSuchElementException; import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; +import org.rocksdb.ReadOptions; import org.rocksdb.RocksIterator; @@ -15,14 +16,17 @@ public final class RockStoreIterator implements DBIterator { private boolean first = true; private final AtomicBoolean close = new AtomicBoolean(false); + private final ReadOptions readOptions; - public RockStoreIterator(RocksIterator dbIterator) { + public RockStoreIterator(RocksIterator dbIterator, ReadOptions readOptions) { + this.readOptions = readOptions; this.dbIterator = dbIterator; } @Override public void close() throws IOException { if (close.compareAndSet(false, true)) { + readOptions.close(); dbIterator.close(); } } diff --git a/chainbase/src/main/java/org/tron/core/db2/common/LevelDB.java b/chainbase/src/main/java/org/tron/core/db2/common/LevelDB.java index b5616f87b8a..5942bb7444c 100644 --- a/chainbase/src/main/java/org/tron/core/db2/common/LevelDB.java +++ b/chainbase/src/main/java/org/tron/core/db2/common/LevelDB.java @@ -13,7 +13,7 @@ public class LevelDB implements DB, Flusher { @Getter private LevelDbDataSourceImpl db; - private WriteOptionsWrapper writeOptions = WriteOptionsWrapper.getInstance() + private final WriteOptionsWrapper writeOptions = WriteOptionsWrapper.getInstance() .sync(CommonParameter.getInstance().getStorage().isDbSync()); public LevelDB(LevelDbDataSourceImpl db) { @@ -65,6 +65,7 @@ public void flush(Map batch) { @Override public void close() { + this.writeOptions.close(); db.closeDB(); } diff --git a/chainbase/src/main/java/org/tron/core/db2/common/RocksDB.java b/chainbase/src/main/java/org/tron/core/db2/common/RocksDB.java index 31970b185cc..1d67438eceb 100644 --- a/chainbase/src/main/java/org/tron/core/db2/common/RocksDB.java +++ b/chainbase/src/main/java/org/tron/core/db2/common/RocksDB.java @@ -14,7 +14,7 @@ public class RocksDB implements DB, Flusher { @Getter private RocksDbDataSourceImpl db; - private WriteOptionsWrapper optionsWrapper = WriteOptionsWrapper.getInstance() + private final WriteOptionsWrapper writeOptions = WriteOptionsWrapper.getInstance() .sync(CommonParameter.getInstance().getStorage().isDbSync()); public RocksDB(RocksDbDataSourceImpl db) { @@ -61,11 +61,12 @@ public void flush(Map batch) { Map rows = batch.entrySet().stream() .map(e -> Maps.immutableEntry(e.getKey().getBytes(), e.getValue().getBytes())) .collect(HashMap::new, (m, k) -> m.put(k.getKey(), k.getValue()), HashMap::putAll); - db.updateByBatch(rows, optionsWrapper); + db.updateByBatch(rows, writeOptions); } @Override public void close() { + writeOptions.close(); db.closeDB(); } diff --git a/chainbase/src/main/java/org/tron/core/db2/core/SnapshotManager.java b/chainbase/src/main/java/org/tron/core/db2/core/SnapshotManager.java index bfcb6237003..19f110c4021 100644 --- a/chainbase/src/main/java/org/tron/core/db2/core/SnapshotManager.java +++ b/chainbase/src/main/java/org/tron/core/db2/core/SnapshotManager.java @@ -29,7 +29,6 @@ import org.tron.common.error.TronDBException; import org.tron.common.es.ExecutorServiceManager; import org.tron.common.parameter.CommonParameter; -import org.tron.common.storage.WriteOptionsWrapper; import org.tron.common.utils.FileUtil; import org.tron.common.utils.StorageUtils; import org.tron.core.db.RevokingDatabase; @@ -357,7 +356,6 @@ public void flush() { public void createCheckpoint() { TronDatabase checkPointStore = null; - boolean syncFlag; try { Map batch = new HashMap<>(); for (Chainbase db : dbs) { @@ -389,16 +387,13 @@ public void createCheckpoint() { if (isV2Open()) { String dbName = String.valueOf(System.currentTimeMillis()); checkPointStore = getCheckpointDB(dbName); - syncFlag = CommonParameter.getInstance().getStorage().isCheckpointSync(); } else { checkPointStore = checkTmpStore; - syncFlag = CommonParameter.getInstance().getStorage().isDbSync(); } - checkPointStore.getDbSource().updateByBatch(batch.entrySet().stream() + checkPointStore.updateByBatch(batch.entrySet().stream() .map(e -> Maps.immutableEntry(e.getKey().getBytes(), e.getValue().getBytes())) - .collect(HashMap::new, (m, k) -> m.put(k.getKey(), k.getValue()), HashMap::putAll), - WriteOptionsWrapper.getInstance().sync(syncFlag)); + .collect(HashMap::new, (m, k) -> m.put(k.getKey(), k.getValue()), HashMap::putAll)); } catch (Exception e) { throw new TronDBException(e); diff --git a/chainbase/src/main/java/org/tron/core/store/CheckPointV2Store.java b/chainbase/src/main/java/org/tron/core/store/CheckPointV2Store.java index 2f952e6b82a..f027bd02664 100644 --- a/chainbase/src/main/java/org/tron/core/store/CheckPointV2Store.java +++ b/chainbase/src/main/java/org/tron/core/store/CheckPointV2Store.java @@ -1,18 +1,23 @@ package org.tron.core.store; import com.google.protobuf.InvalidProtocolBufferException; +import java.util.Map; +import java.util.Spliterator; +import java.util.function.Consumer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.tron.common.parameter.CommonParameter; +import org.tron.common.storage.WriteOptionsWrapper; import org.tron.core.db.TronDatabase; import org.tron.core.exception.BadItemException; import org.tron.core.exception.ItemNotFoundException; -import java.util.Spliterator; -import java.util.function.Consumer; - @Slf4j(topic = "DB") public class CheckPointV2Store extends TronDatabase { + private final WriteOptionsWrapper writeOptions = WriteOptionsWrapper.getInstance() + .sync(CommonParameter.getInstance().getStorage().isCheckpointSync()); + @Autowired public CheckPointV2Store(String dbPath) { super(dbPath); @@ -52,6 +57,11 @@ public Spliterator spliterator() { protected void init() { } + @Override + public void updateByBatch(Map rows) { + this.dbSource.updateByBatch(rows, writeOptions); + } + /** * close the database. */ @@ -59,6 +69,7 @@ protected void init() { public void close() { logger.debug("******** Begin to close {}. ********", getName()); try { + writeOptions.close(); dbSource.closeDB(); } catch (Exception e) { logger.warn("Failed to close {}.", getName(), e); diff --git a/common/src/main/java/org/tron/common/setting/RocksDbSettings.java b/common/src/main/java/org/tron/common/setting/RocksDbSettings.java index be1761133bf..d5df5e261b5 100644 --- a/common/src/main/java/org/tron/common/setting/RocksDbSettings.java +++ b/common/src/main/java/org/tron/common/setting/RocksDbSettings.java @@ -163,6 +163,21 @@ public static LRUCache getCache() { return cache; } + /** + * Creates a new RocksDB Options. + * + *

CRITICAL: Must be closed after use to prevent native memory leaks. + * Use try-with-resources. + * + *

{@code
+   * try (Options options = getOptionsByDbName(dbName)) {
+   *     // do something
+   * }
+   * }
+ * + * @param dbName db name + * @return a new Options instance that must be closed + */ public static Options getOptionsByDbName(String dbName) { RocksDbSettings settings = getSettings(); diff --git a/framework/src/test/java/org/tron/common/storage/leveldb/LevelDbDataSourceImplTest.java b/framework/src/test/java/org/tron/common/storage/leveldb/LevelDbDataSourceImplTest.java index 8fc05746fc8..78cbba3d079 100644 --- a/framework/src/test/java/org/tron/common/storage/leveldb/LevelDbDataSourceImplTest.java +++ b/framework/src/test/java/org/tron/common/storage/leveldb/LevelDbDataSourceImplTest.java @@ -172,7 +172,9 @@ public void testupdateByBatchInner() { rows.clear(); rows.put(key1.getBytes(), null); rows.put(key2.getBytes(), null); - dataSource.updateByBatch(rows, WriteOptionsWrapper.getInstance()); + try (WriteOptionsWrapper options = WriteOptionsWrapper.getInstance()) { + dataSource.updateByBatch(rows, options); + } assertEquals(0, dataSource.allKeys().size()); rows.clear(); diff --git a/framework/src/test/java/org/tron/common/storage/rocksdb/RocksDbDataSourceImplTest.java b/framework/src/test/java/org/tron/common/storage/rocksdb/RocksDbDataSourceImplTest.java index bf71b024541..86543db19fb 100644 --- a/framework/src/test/java/org/tron/common/storage/rocksdb/RocksDbDataSourceImplTest.java +++ b/framework/src/test/java/org/tron/common/storage/rocksdb/RocksDbDataSourceImplTest.java @@ -147,7 +147,9 @@ public void testupdateByBatchInner() { rows.clear(); rows.put(key1.getBytes(), null); rows.put(key2.getBytes(), null); - dataSource.updateByBatch(rows, WriteOptionsWrapper.getInstance()); + try (WriteOptionsWrapper options = WriteOptionsWrapper.getInstance()) { + dataSource.updateByBatch(rows, options); + } assertEquals(0, dataSource.allKeys().size()); rows.clear(); diff --git a/framework/src/test/java/org/tron/core/db/DBIteratorTest.java b/framework/src/test/java/org/tron/core/db/DBIteratorTest.java index b4f7ca424c0..100502428d0 100644 --- a/framework/src/test/java/org/tron/core/db/DBIteratorTest.java +++ b/framework/src/test/java/org/tron/core/db/DBIteratorTest.java @@ -14,6 +14,7 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; +import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.tron.core.db.common.iterator.RockStoreIterator; @@ -83,7 +84,7 @@ public void testRocksDb() throws RocksDBException, IOException { RocksDB db = RocksDB.open(options, file.toString())) { db.put("1".getBytes(StandardCharsets.UTF_8), "1".getBytes(StandardCharsets.UTF_8)); db.put("2".getBytes(StandardCharsets.UTF_8), "2".getBytes(StandardCharsets.UTF_8)); - RockStoreIterator iterator = new RockStoreIterator(db.newIterator()); + RockStoreIterator iterator = new RockStoreIterator(db.newIterator(), new ReadOptions()); iterator.seekToFirst(); Assert.assertArrayEquals("1".getBytes(StandardCharsets.UTF_8), iterator.getKey()); Assert.assertArrayEquals("1".getBytes(StandardCharsets.UTF_8), iterator.next().getValue()); @@ -99,7 +100,7 @@ public void testRocksDb() throws RocksDBException, IOException { Assert.assertTrue(e instanceof IllegalStateException); } - iterator = new RockStoreIterator(db.newIterator()); + iterator = new RockStoreIterator(db.newIterator(), new ReadOptions()); iterator.seekToLast(); Assert.assertArrayEquals("2".getBytes(StandardCharsets.UTF_8), iterator.getKey()); Assert.assertArrayEquals("2".getBytes(StandardCharsets.UTF_8), iterator.getValue()); diff --git a/plugins/src/main/java/common/org/tron/plugins/DbConvert.java b/plugins/src/main/java/common/org/tron/plugins/DbConvert.java index 37ea6bdeca4..bcf6e1e7afc 100644 --- a/plugins/src/main/java/common/org/tron/plugins/DbConvert.java +++ b/plugins/src/main/java/common/org/tron/plugins/DbConvert.java @@ -223,8 +223,8 @@ private void batchInsert(RocksDB rocks, List keys, List values) * @throws Exception RocksDBException */ private void write(RocksDB rocks, org.rocksdb.WriteBatch batch) throws Exception { - try { - rocks.write(new org.rocksdb.WriteOptions(), batch); + try (org.rocksdb.WriteOptions writeOptions = new org.rocksdb.WriteOptions()) { + rocks.write(writeOptions, batch); } catch (RocksDBException e) { // retry if (maybeRetry(e)) { @@ -259,7 +259,8 @@ public void convertLevelToRocks() throws Exception { JniDBFactory.pushMemoryPool(1024 * 1024); try ( DB level = DBUtils.newLevelDb(srcDbPath); - RocksDB rocks = DBUtils.newRocksDbForBulkLoad(dstDbPath); + org.rocksdb.Options options = DBUtils.newDefaultRocksDbOptions(true, dbName); + RocksDB rocks = RocksDB.open(options, this.dstDbPath.toString()); DBIterator levelIterator = level.iterator( new org.iq80.leveldb.ReadOptions().fillCache(false))) { @@ -291,7 +292,8 @@ private void compact() throws RocksDBException { if (DBUtils.MARKET_PAIR_PRICE_TO_ORDER.equalsIgnoreCase(this.dbName)) { return; } - try (RocksDB rocks = DBUtils.newRocksDb(this.dstDbPath)) { + try (org.rocksdb.Options options = DBUtils.newDefaultRocksDbOptions(false, dbName); + RocksDB rocks = RocksDB.open(options, this.dstDbPath.toString())) { logger.info("compact database {} start", this.dbName); rocks.compactRange(); logger.info("compact database {} end", this.dbName); @@ -300,7 +302,8 @@ private void compact() throws RocksDBException { private boolean check() throws RocksDBException { try ( - RocksDB rocks = DBUtils.newRocksDbReadOnly(this.dstDbPath); + org.rocksdb.Options options = DBUtils.newDefaultRocksDbOptions(false, dbName); + RocksDB rocks = RocksDB.openReadOnly(options, this.dstDbPath.toString()); org.rocksdb.ReadOptions r = new org.rocksdb.ReadOptions().setFillCache(false); RocksIterator rocksIterator = rocks.newIterator(r)) { diff --git a/plugins/src/main/java/common/org/tron/plugins/DbLite.java b/plugins/src/main/java/common/org/tron/plugins/DbLite.java index 732d4913021..3f8a6cb58c8 100644 --- a/plugins/src/main/java/common/org/tron/plugins/DbLite.java +++ b/plugins/src/main/java/common/org/tron/plugins/DbLite.java @@ -656,12 +656,13 @@ private boolean isLite(String databaseDir) throws RocksDBException, IOException private long getSecondBlock(String databaseDir) throws RocksDBException, IOException { long num = 0; DBInterface sourceBlockIndexDb = DbTool.getDB(databaseDir, BLOCK_INDEX_DB_NAME); - DBIterator iterator = sourceBlockIndexDb.iterator(); - iterator.seek(ByteArray.fromLong(1)); - if (iterator.hasNext()) { - num = Longs.fromByteArray(iterator.getKey()); + try (DBIterator iterator = sourceBlockIndexDb.iterator()) { + iterator.seek(ByteArray.fromLong(1)); + if (iterator.hasNext()) { + num = Longs.fromByteArray(iterator.getKey()); + } + return num; } - return num; } private DBInterface getCheckpointDb(String sourceDir) throws IOException, RocksDBException { diff --git a/plugins/src/main/java/common/org/tron/plugins/utils/DBUtils.java b/plugins/src/main/java/common/org/tron/plugins/utils/DBUtils.java index e003b098a43..6eb097cbec5 100644 --- a/plugins/src/main/java/common/org/tron/plugins/utils/DBUtils.java +++ b/plugins/src/main/java/common/org/tron/plugins/utils/DBUtils.java @@ -14,8 +14,6 @@ import org.rocksdb.BloomFilter; import org.rocksdb.ComparatorOptions; import org.rocksdb.Options; -import org.rocksdb.RocksDB; -import org.rocksdb.RocksDBException; import org.tron.common.arch.Arch; import org.tron.common.utils.MarketOrderPriceComparatorForLevelDB; import org.tron.common.utils.MarketOrderPriceComparatorForRocksDB; @@ -88,7 +86,23 @@ public static org.iq80.leveldb.Options newDefaultLevelDbOptions() { return dbOptions; } - private static Options newDefaultRocksDbOptions(boolean forBulkLoad) { + /** + * Creates a new RocksDB Options. + * + *

CRITICAL: Must be closed after use to prevent native memory leaks. + * Use try-with-resources. + * + *

{@code
+   * try (Options options = newDefaultRocksDbOptions(false, name)) {
+   *     // do something
+   * }
+   * }
+ * + * @param forBulkLoad if true, optimizes for bulk loading + * @param name db name + * @return a new Options instance that must be closed + */ + public static Options newDefaultRocksDbOptions(boolean forBulkLoad, String name) { Options options = new Options(); options.setCreateIfMissing(true); options.setIncreaseParallelism(1); @@ -111,35 +125,10 @@ private static Options newDefaultRocksDbOptions(boolean forBulkLoad) { if (forBulkLoad) { options.prepareForBulkLoad(); } - return options; - } - - public static RocksDB newRocksDb(Path db) throws RocksDBException { - try (Options options = newDefaultRocksDbOptions(false)) { - if (MARKET_PAIR_PRICE_TO_ORDER.equalsIgnoreCase(db.getFileName().toString())) { - options.setComparator(new MarketOrderPriceComparatorForRocksDB(new ComparatorOptions())); - } - return RocksDB.open(options, db.toString()); - } - } - - public static RocksDB newRocksDbForBulkLoad(Path db) throws RocksDBException { - try (Options options = newDefaultRocksDbOptions(true)) { - if (MARKET_PAIR_PRICE_TO_ORDER.equalsIgnoreCase(db.getFileName().toString())) { - options.setComparator(new MarketOrderPriceComparatorForRocksDB(new ComparatorOptions())); - } - return RocksDB.open(options, db.toString()); - } - } - - - public static RocksDB newRocksDbReadOnly(Path db) throws RocksDBException { - try (Options options = newDefaultRocksDbOptions(false)) { - if (MARKET_PAIR_PRICE_TO_ORDER.equalsIgnoreCase(db.getFileName().toString())) { - options.setComparator(new MarketOrderPriceComparatorForRocksDB(new ComparatorOptions())); - } - return RocksDB.openReadOnly(options, db.toString()); + if (MARKET_PAIR_PRICE_TO_ORDER.equalsIgnoreCase(name)) { + options.setComparator(new MarketOrderPriceComparatorForRocksDB(new ComparatorOptions())); } + return options; } public static String simpleDecode(byte[] bytes) { diff --git a/plugins/src/main/java/common/org/tron/plugins/utils/db/DBInterface.java b/plugins/src/main/java/common/org/tron/plugins/utils/db/DBInterface.java index 513e021c83c..13a195f9347 100644 --- a/plugins/src/main/java/common/org/tron/plugins/utils/db/DBInterface.java +++ b/plugins/src/main/java/common/org/tron/plugins/utils/db/DBInterface.java @@ -12,9 +12,25 @@ public interface DBInterface extends Closeable { void delete(byte[] key); + /** + * Returns an iterator over the database. + * + *

CRITICAL: The returned iterator holds native resources and MUST be closed + * after use to prevent memory leaks. It is strongly recommended to use a try-with-resources + * statement. + * + *

Example of correct usage: + *

{@code
+   * try (DBIterator iterator = db.iterator()) {
+   *  // do something
+   * }
+   * }
+ * + * @return a new database iterator that must be closed. + */ DBIterator iterator(); - long size(); + long size() throws IOException; void close() throws IOException; diff --git a/plugins/src/main/java/common/org/tron/plugins/utils/db/DbTool.java b/plugins/src/main/java/common/org/tron/plugins/utils/db/DbTool.java index cf4c69505bc..127b8f97db5 100644 --- a/plugins/src/main/java/common/org/tron/plugins/utils/db/DbTool.java +++ b/plugins/src/main/java/common/org/tron/plugins/utils/db/DbTool.java @@ -181,7 +181,7 @@ public static LevelDBImpl openLevelDb(Path db, String name) throws IOException { } public static RocksDBImpl openRocksDb(Path db, String name) throws RocksDBException { - RocksDBImpl rocksdb = new RocksDBImpl(DBUtils.newRocksDb(db), name); + RocksDBImpl rocksdb = new RocksDBImpl(db, name); tryInitEngineFile(db, ROCKSDB); return rocksdb; } diff --git a/plugins/src/main/java/common/org/tron/plugins/utils/db/LevelDBImpl.java b/plugins/src/main/java/common/org/tron/plugins/utils/db/LevelDBImpl.java index 511f4dfd5b4..1c7f22eff1a 100644 --- a/plugins/src/main/java/common/org/tron/plugins/utils/db/LevelDBImpl.java +++ b/plugins/src/main/java/common/org/tron/plugins/utils/db/LevelDBImpl.java @@ -40,8 +40,11 @@ public DBIterator iterator() { } @Override - public long size() { - return Streams.stream(leveldb.iterator()).count(); + public long size() throws IOException { + try (DBIterator iterator = this.iterator()) { + iterator.seekToFirst(); + return Streams.stream(iterator).count(); + } } @Override diff --git a/plugins/src/main/java/common/org/tron/plugins/utils/db/RockDBIterator.java b/plugins/src/main/java/common/org/tron/plugins/utils/db/RockDBIterator.java index d3e17d9173f..17ecca4a4c1 100644 --- a/plugins/src/main/java/common/org/tron/plugins/utils/db/RockDBIterator.java +++ b/plugins/src/main/java/common/org/tron/plugins/utils/db/RockDBIterator.java @@ -2,14 +2,19 @@ import java.io.IOException; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import org.rocksdb.ReadOptions; import org.rocksdb.RocksIterator; public class RockDBIterator implements DBIterator { private final RocksIterator iterator; + private final ReadOptions readOptions; + private final AtomicBoolean closed = new AtomicBoolean(false); - public RockDBIterator(RocksIterator iterator) { + public RockDBIterator(RocksIterator iterator, ReadOptions readOptions) { this.iterator = iterator; + this.readOptions = readOptions; } @Override @@ -72,6 +77,9 @@ public byte[] setValue(byte[] value) { @Override public void close() throws IOException { - iterator.close(); + if (closed.compareAndSet(false, true)) { + readOptions.close(); + iterator.close(); + } } } diff --git a/plugins/src/main/java/common/org/tron/plugins/utils/db/RocksDBImpl.java b/plugins/src/main/java/common/org/tron/plugins/utils/db/RocksDBImpl.java index 50957bbe61b..236d0a847b3 100644 --- a/plugins/src/main/java/common/org/tron/plugins/utils/db/RocksDBImpl.java +++ b/plugins/src/main/java/common/org/tron/plugins/utils/db/RocksDBImpl.java @@ -1,69 +1,97 @@ package org.tron.plugins.utils.db; +import com.google.common.collect.Streams; import java.io.IOException; +import java.nio.file.Path; +import java.util.concurrent.atomic.AtomicBoolean; import lombok.Getter; +import org.rocksdb.Options; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; -import org.rocksdb.RocksIterator; +import org.tron.plugins.utils.DBUtils; public class RocksDBImpl implements DBInterface { - private org.rocksdb.RocksDB rocksDB; + private final RocksDB rocksDB; @Getter private final String name; + private final AtomicBoolean closed = new AtomicBoolean(false); + private Options options = null; - public RocksDBImpl(org.rocksdb.RocksDB rocksDB, String name) { - this.rocksDB = rocksDB; - this.name = name; + public RocksDBImpl(Path path, String name) throws RocksDBException { + try { + this.options = DBUtils.newDefaultRocksDbOptions(false, name); + this.name = name; + this.rocksDB = RocksDB.open(options, path.toString()); + } catch (RocksDBException e) { + if (this.options != null) { + this.options.close(); + } + throw e; + } } @Override public byte[] get(byte[] key) { + throwIfClosed(); try { return rocksDB.get(key); } catch (RocksDBException e) { - e.printStackTrace(); + throw new RuntimeException(name, e); } - return null; } @Override public void put(byte[] key, byte[] value) { + throwIfClosed(); try { rocksDB.put(key, value); } catch (RocksDBException e) { - e.printStackTrace(); + throw new RuntimeException(name, e); } } @Override public void delete(byte[] key) { + throwIfClosed(); try { rocksDB.delete(key); } catch (RocksDBException e) { - e.printStackTrace(); + throw new RuntimeException(name, e); } } @Override public DBIterator iterator() { - return new RockDBIterator(rocksDB.newIterator( - new org.rocksdb.ReadOptions().setFillCache(false))); + throwIfClosed(); + ReadOptions readOptions = new ReadOptions().setFillCache(false); + return new RockDBIterator(rocksDB.newIterator(readOptions), readOptions); } @Override - public long size() { - RocksIterator iterator = rocksDB.newIterator(); - long size = 0; - for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) { - size++; + public long size() throws IOException { + throwIfClosed(); + try (DBIterator iterator = this.iterator()) { + iterator.seekToFirst(); + return Streams.stream(iterator).count(); } - iterator.close(); - return size; } @Override public void close() throws IOException { - rocksDB.close(); + if (closed.compareAndSet(false, true)) { + if (this.options != null) { + this.options.close(); + } + rocksDB.close(); + } + } + + private void throwIfClosed() { + if (closed.get()) { + throw new IllegalStateException("db " + name + " has been closed"); + } } } diff --git a/plugins/src/test/java/org/tron/plugins/DbTest.java b/plugins/src/test/java/org/tron/plugins/DbTest.java index da693a720c2..bbcc1a0bbf7 100644 --- a/plugins/src/test/java/org/tron/plugins/DbTest.java +++ b/plugins/src/test/java/org/tron/plugins/DbTest.java @@ -5,6 +5,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Paths; import java.util.UUID; +import org.junit.Assert; import org.junit.Rule; import org.junit.rules.TemporaryFolder; import org.rocksdb.RocksDBException; @@ -77,11 +78,13 @@ private static void initDB(String sourceDir, String dbName, DbTool.DbType dbType db.put(pairPriceKey1, "1".getBytes(StandardCharsets.UTF_8)); db.put(pairPriceKey2, "2".getBytes(StandardCharsets.UTF_8)); db.put(pairPriceKey3, "3".getBytes(StandardCharsets.UTF_8)); + Assert.assertEquals(3, db.size()); } else { for (int i = 0; i < 100; i++) { byte[] bytes = UUID.randomUUID().toString().getBytes(); db.put(bytes, bytes); } + Assert.assertEquals(100, db.size()); } } }