Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rocksdb22 #2028

Merged
merged 14 commits into from Mar 7, 2019
5 changes: 4 additions & 1 deletion build.gradle
Expand Up @@ -116,6 +116,8 @@ dependencies {

compile "org.iq80.leveldb:leveldb:0.7"

compile group: 'org.rocksdb', name: 'rocksdbjni', version: '5.15.10'

compile group: leveldbGroup, name: leveldbName, version: leveldbVersion

compile "org.apache.commons:commons-collections4:4.0"
Expand Down Expand Up @@ -338,5 +340,6 @@ def binaryRelease(taskName, jarName, mainClass) {
artifacts {
archives(binaryRelease('buildSolidityNodeJar', 'SolidityNode', 'org.tron.program.SolidityNode'),
binaryRelease('buildFullNodeJar', 'FullNode', 'org.tron.program.FullNode'),
binaryRelease('buildKeystoreFactoryJar', 'KeystoreFactory', 'org.tron.program.KeystoreFactory'))
binaryRelease('buildKeystoreFactoryJar', 'KeystoreFactory', 'org.tron.program.KeystoreFactory'),
binaryRelease('buildDBConvertJar', 'DBConvert', 'org.tron.program.DBConvert'))
}
Expand Up @@ -68,6 +68,7 @@ public void startup() {
@Override
public void shutdown() {
logger.info("******** begin to shutdown ********");
//p2pNode.shutDown();
synchronized (dbManager.getRevokingStore()) {
closeRevokingStore();
closeAllStore();
Expand Down Expand Up @@ -124,6 +125,7 @@ private void closeConnection() {
}

private void closeRevokingStore() {
logger.info("******** begin to closeRevokingStore ********");
dbManager.getRevokingStore().shutdown();
}

Expand Down
4 changes: 1 addition & 3 deletions src/main/java/org/tron/common/storage/BatchSourceInter.java
Expand Up @@ -18,8 +18,6 @@

package org.tron.common.storage;

import org.iq80.leveldb.WriteOptions;

import java.util.Map;


Expand All @@ -28,5 +26,5 @@ public interface BatchSourceInter<K, V> extends SourceInter<K, V> {

void updateByBatch(Map<K, V> rows);

void updateByBatch(Map<K, V> rows, WriteOptions writeOptions);
void updateByBatch(Map<K, V> rows, WriteOptionsWrapper writeOptions);
}
155 changes: 155 additions & 0 deletions src/main/java/org/tron/common/storage/DBSettings.java
@@ -0,0 +1,155 @@
package org.tron.common.storage;


import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.rocksdb.CompressionType;

@Slf4j
public class DBSettings {

private static DBSettings settings;

@Getter
private int levelNumber;
@Getter
private int maxOpenFiles;
@Getter
private int compactThreads;
@Getter
private long blockSize;
@Getter
private long maxBytesForLevelBase;
@Getter
private double maxBytesForLevelMultiplier;
@Getter
private int level0FileNumCompactionTrigger;
@Getter
private List<CompressionType> compressionTypeList;
@Getter
private long targetFileSizeBase;
@Getter
private int targetFileSizeMultiplier;
@Getter
private boolean enableStatistics;

private DBSettings() {

}

public static DBSettings getDefaultSettings() {
DBSettings defaultSettings = new DBSettings();
return defaultSettings.withLevelNumber(7).withBlockSize(64).withCompactThreads(32)
.withCompressionTypeList("no:no:no:lz4:lz4:zstd:zstd").withTargetFileSizeBase(256)
.withMaxBytesForLevelMultiplier(10).withTargetFileSizeMultiplier(1).withMaxBytesForLevelBase(256).withMaxOpenFiles(-1)
.withEnableStatistics(false);
}

public static DBSettings getSettings() {
if (settings == null) {
return getDefaultSettings();
}
return settings;
}

public static DBSettings initCustomSettings(int levelNumber, int compactThreads, int blocksize,
long maxBytesForLevelBase,
double maxBytesForLevelMultiplier, String compressionStr,
int level0FileNumCompactionTrigger, long targetFileSizeBase, int targetFileSizeMultiplier) {
settings = new DBSettings()
.withMaxOpenFiles(-1)
.withEnableStatistics(false)
.withLevelNumber(levelNumber)
.withCompactThreads(compactThreads)
.withBlockSize(blocksize)
.withMaxBytesForLevelBase(maxBytesForLevelBase)
.withMaxBytesForLevelMultiplier(maxBytesForLevelMultiplier)
.withCompressionTypeList(compressionStr)
.withLevel0FileNumCompactionTrigger(level0FileNumCompactionTrigger)
.withTargetFileSizeBase(targetFileSizeBase)
.withTargetFileSizeMultiplier(targetFileSizeMultiplier);
return settings;
}


public DBSettings withMaxOpenFiles(int maxOpenFiles) {
this.maxOpenFiles = maxOpenFiles;
return this;
}

public DBSettings withCompactThreads(int compactThreads) {
this.compactThreads = compactThreads;
return this;
}

public DBSettings withBlockSize(long blockSize) {
this.blockSize = blockSize * 1024;
return this;
}

public DBSettings withMaxBytesForLevelBase(long maxBytesForLevelBase) {
this.maxBytesForLevelBase = maxBytesForLevelBase * 1024 * 1024;
return this;
}

public DBSettings withMaxBytesForLevelMultiplier(double maxBytesForLevelMultiplier) {
this.maxBytesForLevelMultiplier = maxBytesForLevelMultiplier;
return this;
}

public DBSettings withLevel0FileNumCompactionTrigger(int level0FileNumCompactionTrigger) {
this.level0FileNumCompactionTrigger = level0FileNumCompactionTrigger;
return this;
}

public DBSettings withCompressionTypeList(String compressionTypeListString) {
List<String> compressionTypeStringList = Arrays.asList(compressionTypeListString.split(":"));
List<CompressionType> compressionTypeList = new ArrayList<>();
for (String str : compressionTypeStringList) {
if (str.equals("no")) {
compressionTypeList.add(CompressionType.NO_COMPRESSION);
} else if (str.equals("lz4")) {
compressionTypeList.add(CompressionType.LZ4_COMPRESSION);
} else if (str.equals("zstd")) {
compressionTypeList.add(CompressionType.ZSTD_COMPRESSION);
} else if (str.equals("zlib")) {
compressionTypeList.add(CompressionType.ZLIB_COMPRESSION);
}
}
this.compressionTypeList = compressionTypeList;
return this;
}

public DBSettings withEnableStatistics(boolean enable) {
this.enableStatistics = enable;
return this;
}

public DBSettings withLevelNumber(int levelNumber) {
this.levelNumber = levelNumber;
return this;
}


public DBSettings withTargetFileSizeBase(long targetFileSizeBase) {
this.targetFileSizeBase = targetFileSizeBase * 1024 * 1024;
return this;
}

public DBSettings withTargetFileSizeMultiplier(int targetFileSizeMultiplier) {
this.targetFileSizeMultiplier = targetFileSizeMultiplier;
return this;
}

public static void loggingSettings() {
logger.info(String.format(
"level number: %d, CompactThreads: %d, Blocksize: %d, maxBytesForLevelBase: %d, withMaxBytesForLevelMultiplier: %f, level0FileNumCompactionTrigger: %d, withTargetFileSizeBase: %d, withTargetFileSizeMultiplier: %d",
settings.getLevelNumber(),
settings.getCompactThreads(), settings.getBlockSize(), settings.getMaxBytesForLevelBase(),
settings.getMaxBytesForLevelMultiplier(), settings.getLevel0FileNumCompactionTrigger(),
settings.getTargetFileSizeBase(), settings.getTargetFileSizeMultiplier()));
}
}
6 changes: 2 additions & 4 deletions src/main/java/org/tron/common/storage/SourceInter.java
Expand Up @@ -18,21 +18,19 @@
package org.tron.common.storage;


import org.iq80.leveldb.WriteOptions;

public interface SourceInter<K, V> {


void putData(K key, V val);

void putData(K k, V v, WriteOptions options);
void putData(K k, V v, WriteOptionsWrapper options);

V getData(K key);


void deleteData(K key);

void deleteData(K k, WriteOptions options);
void deleteData(K k, WriteOptionsWrapper options);

boolean flush();

Expand Down
25 changes: 25 additions & 0 deletions src/main/java/org/tron/common/storage/WriteOptionsWrapper.java
@@ -0,0 +1,25 @@
package org.tron.common.storage;

public class WriteOptionsWrapper {

public org.rocksdb.WriteOptions rocks = null;
public org.iq80.leveldb.WriteOptions level = null;

private WriteOptionsWrapper() {

}

public static WriteOptionsWrapper getInstance() {
WriteOptionsWrapper wapper = new WriteOptionsWrapper();
wapper.level = new org.iq80.leveldb.WriteOptions();
wapper.rocks = new org.rocksdb.WriteOptions();
return wapper;
}


public WriteOptionsWrapper sync(boolean bool) {
this.level.sync(bool);
this.rocks.setSync(bool);
return this;
}
}
Expand Up @@ -43,7 +43,9 @@
import org.iq80.leveldb.WriteBatch;
import org.iq80.leveldb.WriteOptions;
import org.tron.common.storage.DbSourceInter;
import org.tron.common.storage.WriteOptionsWrapper;
import org.tron.common.utils.FileUtil;
import org.tron.common.utils.PropUtil;
import org.tron.core.config.args.Args;
import org.tron.core.db.common.iterator.StoreIterator;

Expand All @@ -64,13 +66,43 @@ public class LevelDbDataSourceImpl implements DbSourceInter<byte[]>,
public LevelDbDataSourceImpl(String parentName, String name) {
this.dataBaseName = name;
this.parentName = Paths.get(
parentName,
Args.getInstance().getStorage().getDbDirectory()
parentName,
Args.getInstance().getStorage().getDbDirectory()
).toString();
}

public boolean checkOrInitEngine() {
String dir = Args.getInstance().getOutputDirectory() + Args.getInstance().getStorage().getDbDirectory() + File.separator + dataBaseName;
String enginePath = dir + File.separator + "engine.properties";

if (FileUtil.createDirIfNotExists(dir)) {
if (!FileUtil.createFileIfNotExists(enginePath)) {
return false;
}
} else {
return false;
}

String engine = PropUtil.readProperty(enginePath, "ENGINE");
if (engine.equals("")) {
if (!PropUtil.writeProperty(enginePath, "ENGINE", "LEVELDB")) {
return false;
}
}
engine = PropUtil.readProperty(enginePath, "ENGINE");
if (engine.equals("LEVELDB")) {
return true;
} else {
return false;
}
}

@Override
public void initDB() {
if (!checkOrInitEngine()) {
logger.error("database engine do not match");
throw new RuntimeException("Failed to initialize database");
}
resetDbLock.writeLock().lock();
try {
logger.debug("~> LevelDbDataSourceImpl.initDB(): " + dataBaseName);
Expand Down Expand Up @@ -207,10 +239,10 @@ public void putData(byte[] key, byte[] value) {
}

@Override
public void putData(byte[] key, byte[] value, WriteOptions options) {
public void putData(byte[] key, byte[] value, WriteOptionsWrapper options) {
resetDbLock.readLock().lock();
try {
database.put(key, value, options);
database.put(key, value, options.level);
} finally {
resetDbLock.readLock().unlock();
}
Expand All @@ -227,10 +259,10 @@ public void deleteData(byte[] key) {
}

@Override
public void deleteData(byte[] key, WriteOptions options) {
public void deleteData(byte[] key, WriteOptionsWrapper options) {
resetDbLock.readLock().lock();
try {
database.delete(key, options);
database.delete(key, options.level);
} finally {
resetDbLock.readLock().unlock();
}
Expand Down Expand Up @@ -416,13 +448,13 @@ public void updateByBatch(Map<byte[], byte[]> rows) {
}

@Override
public void updateByBatch(Map<byte[], byte[]> rows, WriteOptions options) {
public void updateByBatch(Map<byte[], byte[]> rows, WriteOptionsWrapper options) {
resetDbLock.readLock().lock();
try {
updateByBatchInner(rows, options);
updateByBatchInner(rows, options.level);
} catch (Exception e) {
try {
updateByBatchInner(rows, options);
updateByBatchInner(rows, options.level);
} catch (Exception e1) {
throw new RuntimeException(e);
}
Expand Down