Permalink
Browse files

added MySQL configuration items and modified rebalance logic. New

rebalance logic is 60 times fast than older.
  • Loading branch information...
1 parent 667da20 commit 68fca813a03ebe9466555dcf5730b0ceb5ff9b98 @baepiff baepiff committed Mar 19, 2012
@@ -89,6 +89,12 @@
private String mysqlDatabaseName;
private String mysqlHost;
private int mysqlPort;
+ // added by CyberAgent 2012
+ private String mysqlValueType;
+ private int mysqlDsInitialPoolSize;
+ private boolean mysqlDsPoolPreparedStatements;
+ private int mysqlDsMaxActiveConnections;
+ private int mysqlDsMinIdleConnections;
private int readOnlyBackups;
private String readOnlyStorageDir;
@@ -243,6 +249,12 @@ public VoldemortConfig(Props props) {
this.mysqlPort = props.getInt("mysql.port", 3306);
this.mysqlDatabaseName = props.getString("mysql.database", "voldemort");
+ this.mysqlValueType = props.getString("mysql.valuetype", "MEDIUMBLOB");
+ this.mysqlDsInitialPoolSize = props.getInt("mysql.ds.initialpoolsize", 0);
+ this.mysqlDsPoolPreparedStatements = props.getBoolean("mysql.ds.poolpreparedstatements",
+ false);
+ this.mysqlDsMaxActiveConnections = props.getInt("mysql.ds.maxactiveconnections", 8);
+ this.mysqlDsMinIdleConnections = props.getInt("mysql.ds.minidleconnections", 0);
this.maxThreads = props.getInt("max.threads", 100);
this.coreThreads = props.getInt("core.threads", Math.max(1, maxThreads / 2));
@@ -389,6 +401,21 @@ else if(coreThreads > maxThreads)
+ this.schedulerThreads + " set.");
if(enableServerRouting && !enableSocketServer)
throw new ConfigurationException("Server-side routing is enabled, this requires the socket server to also be enabled.");
+ if(mysqlDsInitialPoolSize < 0)
+ throw new ConfigurationException("mysql.ds.initialpoolsize must be 0 or more.");
+ if(!validateMysqlValueType())
+ throw new ConfigurationException("mysql.valuetype must be one of TINYBLOB, BLOB, MEDIUMBLOB and LONGBLOB.");
+
+ }
+
+ private boolean validateMysqlValueType() {
+ String[] array = { "TINYBLOB", "BLOB", "MEDIUMBLOB", "LONGBLOB" };
+ for(String word: array) {
+ if(mysqlValueType.equalsIgnoreCase(word)) {
+ return true;
+ }
+ }
+ return false;
}
private int getIntEnvVariable(String name) {
@@ -1476,4 +1503,43 @@ public void setEnableJmxClusterName(boolean enableJmxClusterName) {
this.enableJmxClusterName = enableJmxClusterName;
}
+ public String getMysqlValueType() {
+ return mysqlValueType;
+ }
+
+ public void setMysqlValueType(String mysqlValueType) {
+ this.mysqlValueType = mysqlValueType;
+ }
+
+ public int getMysqlDsInitialPoolSize() {
+ return mysqlDsInitialPoolSize;
+ }
+
+ public void setMysqlDsInitialPoolSize(int mysqlDsInitialPoolSize) {
+ this.mysqlDsInitialPoolSize = mysqlDsInitialPoolSize;
+ }
+
+ public boolean isMysqlDsPoolPreparedStatements() {
+ return mysqlDsPoolPreparedStatements;
+ }
+
+ public void setMysqlDsPoolPreparedStatements(boolean mysqlDsPoolPreparedStatements) {
+ this.mysqlDsPoolPreparedStatements = mysqlDsPoolPreparedStatements;
+ }
+
+ public int getMysqlDsMaxActiveConnections() {
+ return mysqlDsMaxActiveConnections;
+ }
+
+ public void setMysqlDsMaxActiveConnections(int mysqlDsMaxActiveConnections) {
+ this.mysqlDsMaxActiveConnections = mysqlDsMaxActiveConnections;
+ }
+
+ public int getMysqlDsMinIdleConnections() {
+ return mysqlDsMinIdleConnections;
+ }
+
+ public void setMysqlDsMinIdleConnections(int mysqlDsMinIdleConnections) {
+ this.mysqlDsMinIdleConnections = mysqlDsMinIdleConnections;
+ }
}
@@ -31,6 +31,7 @@
public static final String TYPE_NAME = "mysql";
private BasicDataSource dataSource;
+ private final String valueType;
public MysqlStorageConfiguration(VoldemortConfig config) {
BasicDataSource ds = new BasicDataSource();
@@ -39,11 +40,17 @@ public MysqlStorageConfiguration(VoldemortConfig config) {
ds.setUsername(config.getMysqlUsername());
ds.setPassword(config.getMysqlPassword());
ds.setDriverClassName("com.mysql.jdbc.Driver");
+
+ ds.setInitialSize(config.getMysqlDsInitialPoolSize());
+ ds.setPoolPreparedStatements(config.isMysqlDsPoolPreparedStatements());
+ ds.setMaxActive(config.getMysqlDsMaxActiveConnections());
+ ds.setMinIdle(config.getMysqlDsMinIdleConnections());
this.dataSource = ds;
+ this.valueType = config.getMysqlValueType();
}
public StorageEngine<ByteArray, byte[], byte[]> getStore(String name) {
- return new MysqlStorageEngine(name, dataSource);
+ return new MysqlStorageEngine(name, dataSource, valueType);
}
public String getType() {
@@ -20,6 +20,7 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
import java.util.List;
import java.util.Map;
@@ -56,11 +57,13 @@
private static int MYSQL_ERR_DUP_ENTRY = 1062;
private final String name;
+ private final String valueType;
private final DataSource datasource;
- public MysqlStorageEngine(String name, DataSource datasource) {
+ public MysqlStorageEngine(String name, DataSource datasource, String valueType) {
this.name = name;
this.datasource = datasource;
+ this.valueType = valueType;
if(!tableExists()) {
create();
@@ -93,8 +96,8 @@ public void destroy() {
public void create() {
execute("create table " + getName()
- + " (key_ varbinary(200) not null, version_ varbinary(200) not null, "
- + " value_ blob, primary key(key_, version_)) engine = InnoDB");
+ + " (key_ varbinary(200) not null, version_ varbinary(200) not null, " + " value_ "
+ + valueType + ", primary key(key_, version_)) engine = InnoDB");
}
public void execute(String query) {
@@ -113,7 +116,19 @@ public void execute(String query) {
}
public ClosableIterator<ByteArray> keys() {
- return StoreUtils.keys(entries());
+ Connection conn = null;
+ Statement stmt = null;
+ ResultSet rs = null;
+ String select = "select key_ from " + name;
+ try {
+ conn = datasource.getConnection();
+ stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ stmt.setFetchSize(Integer.MIN_VALUE);
+ rs = stmt.executeQuery(select);
+ return new MysqlKeyIterator(conn, stmt, rs);
+ } catch(SQLException e) {
+ throw new PersistenceFailureException("Fix me!", e);
+ }
}
public void truncate() {
@@ -134,14 +149,15 @@ public void truncate() {
public ClosableIterator<Pair<ByteArray, Versioned<byte[]>>> entries() {
Connection conn = null;
- PreparedStatement stmt = null;
+ Statement stmt = null;
ResultSet rs = null;
String select = "select key_, version_, value_ from " + name;
try {
conn = datasource.getConnection();
- stmt = conn.prepareStatement(select);
- rs = stmt.executeQuery();
- return new MysqlClosableIterator(conn, stmt, rs);
+ stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ stmt.setFetchSize(Integer.MIN_VALUE);
+ rs = stmt.executeQuery(select);
+ return new MysqlEntryIterator(conn, stmt, rs);
} catch(SQLException e) {
throw new PersistenceFailureException("Fix me!", e);
}
@@ -270,8 +286,7 @@ public void put(ByteArray key, Versioned<byte[]> value, byte[] transforms)
if(occurred == Occurred.BEFORE)
throw new ObsoleteVersionException("Attempt to put version "
+ value.getVersion()
- + " which is superceeded by " + version
- + ".");
+ + " which is superseded by " + version + ".");
else if(occurred == Occurred.AFTER)
delete(conn, thisKey, version.toBytes());
}
@@ -324,6 +339,15 @@ private void tryClose(Connection c) {
}
}
+ private void tryClose(Statement s) {
+ try {
+ if(s != null)
+ s.close();
+ } catch(Exception e) {
+ logger.error("Failed to close statement.", e);
+ }
+ }
+
private void tryClose(PreparedStatement s) {
try {
if(s != null)
@@ -333,17 +357,22 @@ private void tryClose(PreparedStatement s) {
}
}
- private class MysqlClosableIterator implements
- ClosableIterator<Pair<ByteArray, Versioned<byte[]>>> {
+ public List<Version> getVersions(ByteArray key) {
+ return StoreUtils.getVersions(get(key, null));
+ }
+
+ public boolean isPartitionAware() {
+ return false;
+ }
+
+ private abstract class MysqlClosableIterator<T> implements ClosableIterator<T> {
- private boolean hasMore;
- private final ResultSet rs;
- private final Connection connection;
- private final PreparedStatement statement;
+ boolean hasMore;
+ final ResultSet rs;
+ final Connection connection;
+ final Statement statement;
- public MysqlClosableIterator(Connection connection,
- PreparedStatement statement,
- ResultSet resultSet) {
+ public MysqlClosableIterator(Connection connection, Statement statement, ResultSet resultSet) {
try {
// Move to the first item
this.hasMore = resultSet.next();
@@ -365,35 +394,57 @@ public boolean hasNext() {
return this.hasMore;
}
- public Pair<ByteArray, Versioned<byte[]>> next() {
+ public abstract T next();
+
+ public void remove() {
+ try {
+ rs.deleteRow();
+ } catch(SQLException e) {
+ throw new PersistenceFailureException(e);
+ }
+ }
+ }
+
+ private class MysqlKeyIterator extends MysqlClosableIterator<ByteArray> {
+
+ public MysqlKeyIterator(Connection connection, Statement statement, ResultSet resultSet) {
+ super(connection, statement, resultSet);
+ }
+
+ @Override
+ public ByteArray next() {
try {
if(!this.hasMore)
throw new PersistenceFailureException("Next called on iterator, but no more items available!");
ByteArray key = new ByteArray(rs.getBytes("key_"));
- byte[] value = rs.getBytes("value_");
- VectorClock clock = new VectorClock(rs.getBytes("version_"));
this.hasMore = rs.next();
- return Pair.create(key, new Versioned<byte[]>(value, clock));
+ return key;
} catch(SQLException e) {
throw new PersistenceFailureException(e);
}
}
+ }
- public void remove() {
+ private class MysqlEntryIterator extends
+ MysqlClosableIterator<Pair<ByteArray, Versioned<byte[]>>> {
+
+ public MysqlEntryIterator(Connection connection, Statement statement, ResultSet resultSet) {
+ super(connection, statement, resultSet);
+ }
+
+ @Override
+ public Pair<ByteArray, Versioned<byte[]>> next() {
try {
- rs.deleteRow();
+ if(!this.hasMore)
+ throw new PersistenceFailureException("Next called on iterator, but no more items available!");
+ ByteArray key = new ByteArray(rs.getBytes("key_"));
+ byte[] value = rs.getBytes("value_");
+ VectorClock clock = new VectorClock(rs.getBytes("version_"));
+ this.hasMore = rs.next();
+ return Pair.create(key, new Versioned<byte[]>(value, clock));
} catch(SQLException e) {
throw new PersistenceFailureException(e);
}
}
-
- }
-
- public List<Version> getVersions(ByteArray key) {
- return StoreUtils.getVersions(get(key, null));
- }
-
- public boolean isPartitionAware() {
- return false;
}
}
@@ -43,7 +43,7 @@ public void setUp() throws Exception {
@Override
public StorageEngine<ByteArray, byte[], byte[]> getStorageEngine() {
- return new MysqlStorageEngine("test_store", getDataSource());
+ return new MysqlStorageEngine("test_store", getDataSource(), "BLOB");
}
@Override
@@ -69,7 +69,7 @@ public void executeQuery(DataSource datasource, String query) throws SQLExceptio
public void testOpenNonExistantStoreCreatesTable() throws SQLException {
String newStore = TestUtils.randomLetters(15);
/* Create the engine for side-effect */
- new MysqlStorageEngine(newStore, getDataSource());
+ new MysqlStorageEngine(newStore, getDataSource(), "BLOB");
DataSource ds = getDataSource();
executeQuery(ds, "select 1 from " + newStore + " limit 1");
executeQuery(ds, "drop table " + newStore);

0 comments on commit 68fca81

Please sign in to comment.