Skip to content

Commit

Permalink
支持通过ALTER DATABASE语句执行复制模式场景下的在线扩容
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Aug 31, 2018
1 parent cc07803 commit faed047
Show file tree
Hide file tree
Showing 13 changed files with 524 additions and 86 deletions.
Expand Up @@ -266,6 +266,7 @@ public void replicate(Database db, RunMode oldRunMode, RunMode newRunMode, Strin

@Override
public String[] getReplicationEndpoints(Database db) {
ClusterMetaData.removeReplicationStrategy(db); // 避免使用旧的
String[] oldHostIds = db.getHostIds();
int size = oldHostIds.length;
List<NetEndpoint> oldReplicationEndpoints = new ArrayList<>(size);
Expand Down
Expand Up @@ -50,6 +50,10 @@ public static enum BootstrapState {
private static final AbstractReplicationStrategy defaultReplicationStrategy = ConfigDescriptor
.getDefaultReplicationStrategy();

public static void removeReplicationStrategy(Database db) {
replicationStrategys.remove(db);
}

public static AbstractReplicationStrategy getReplicationStrategy(Database db) {
if (db.getReplicationProperties() == null)
return defaultReplicationStrategy;
Expand Down
287 changes: 218 additions & 69 deletions lealone-sql/src/main/java/org/lealone/sql/ddl/AlterDatabase.java
Expand Up @@ -44,6 +44,9 @@ public class AlterDatabase extends DatabaseStatement {
private final Map<String, String> replicationProperties;
private final RunMode runMode;

private String[] newHostIds;
private String[] oldHostIds;

public AlterDatabase(ServerSession session, Database db, Map<String, String> parameters,
Map<String, String> replicationProperties, RunMode runMode) {
super(session);
Expand All @@ -63,85 +66,231 @@ public int update() {
checkRight();
synchronized (LealoneDatabase.getInstance().getLock(DbObjectType.DATABASE)) {
RunMode oldRunMode = db.getRunMode();
if (runMode != null)
db.setRunMode(runMode);
if (parameters != null)
db.alterParameters(parameters);
if (replicationProperties != null)
db.setReplicationProperties(replicationProperties);

boolean clientServer2ReplicationMode = false;
boolean clientServer2ShardingMode = false;
boolean replication2ShardingMode = false;
if (oldRunMode == RunMode.CLIENT_SERVER) {
if (runMode == RunMode.REPLICATION)
clientServer2ReplicationMode = true;
if (runMode == null) {
if (oldRunMode == RunMode.CLIENT_SERVER)
clientServer2ClientServer();
else if (oldRunMode == RunMode.REPLICATION)
replication2Replication();
else if (oldRunMode == RunMode.SHARDING)
sharding2Sharding();
} else if (oldRunMode == RunMode.CLIENT_SERVER) {
if (runMode == RunMode.CLIENT_SERVER)
clientServer2ClientServer();
else if (runMode == RunMode.REPLICATION)
scaleOutClientServer2Replication();
else if (runMode == RunMode.SHARDING)
clientServer2ShardingMode = true;
scaleOutClientServer2Sharding();
} else if (oldRunMode == RunMode.REPLICATION) {
if (runMode == RunMode.SHARDING)
replication2ShardingMode = true;
if (runMode == RunMode.CLIENT_SERVER)
scaleInReplication2ClientServer();
else if (runMode == RunMode.REPLICATION)
replication2Replication();
else if (runMode == RunMode.SHARDING)
scaleOutReplication2Sharding();
} else if (oldRunMode == RunMode.SHARDING) {
if (runMode == RunMode.CLIENT_SERVER)
scaleInSharding2ClientServer();
else if (runMode == RunMode.REPLICATION)
scaleInSharding2Replication();
else if (runMode == RunMode.SHARDING)
sharding2Sharding();
}
}

String[] newHostIds = null;
String[] oldHostIds = null;
if (clientServer2ReplicationMode || clientServer2ShardingMode || replication2ShardingMode) {
if (session.isRoot()) {
oldHostIds = db.getHostIds();
if (parameters != null && parameters.containsKey("hostIds")) {
newHostIds = StringUtils.arraySplit(parameters.get("hostIds"), ',', true);
} else {
if (clientServer2ReplicationMode)
newHostIds = RouterHolder.getRouter().getReplicationEndpoints(db);
else
newHostIds = RouterHolder.getRouter().getShardingEndpoints(db);
}

String hostIds = StringUtils.arrayCombine(oldHostIds, ',') + ","
+ StringUtils.arrayCombine(newHostIds, ',');
db.getParameters().put("hostIds", hostIds);

StatementBuilder sql = new StatementBuilder("ALTER DATABASE ");
sql.append(db.getShortName());
sql.append(" RUN MODE ").append(runMode.toString());
if (replicationProperties != null && !replicationProperties.isEmpty()) {
sql.append(" WITH REPLICATION STRATEGY");
Database.appendMap(sql, replicationProperties);
}
sql.append(" PARAMETERS");
Database.appendMap(sql, db.getParameters());
this.sql = sql.toString();
} else {
if (isTargetEndpoint(db)) {
oldHostIds = db.getHostIds();
HashSet<String> oldSet = new HashSet<>(Arrays.asList(oldHostIds));
if (parameters != null && parameters.containsKey("hostIds")) {
String[] hostIds = StringUtils.arraySplit(parameters.get("hostIds"), ',', true);
HashSet<String> newSet = new HashSet<>(Arrays.asList(hostIds));
newSet.removeAll(oldSet);
newHostIds = newSet.toArray(new String[0]);
} else {
DbException.throwInternalError();
}
}
}
updateRemoteEndpoints();
return 0;
}

private void alterDatabase() {
if (runMode != null)
db.setRunMode(runMode);
if (parameters != null)
db.alterParameters(parameters);
if (replicationProperties != null)
db.setReplicationProperties(replicationProperties);
}

private void updateLocalMeta() {
LealoneDatabase.getInstance().updateMeta(session, db);
}

private void updateRemoteEndpoints() {
executeDatabaseStatement(db);
}

private void rewriteSql(boolean toReplicationMode) {
if (session.isRoot()) {
oldHostIds = db.getHostIds();
if (parameters != null && parameters.containsKey("hostIds")) {
newHostIds = StringUtils.arraySplit(parameters.get("hostIds"), ',', true);
} else {
if (toReplicationMode)
newHostIds = RouterHolder.getRouter().getReplicationEndpoints(db);
else
newHostIds = RouterHolder.getRouter().getShardingEndpoints(db);
}

LealoneDatabase.getInstance().updateMeta(session, db);
String hostIds = StringUtils.arrayCombine(oldHostIds, ',') + ","
+ StringUtils.arrayCombine(newHostIds, ',');
db.getParameters().put("hostIds", hostIds);

StatementBuilder sql = new StatementBuilder("ALTER DATABASE ");
sql.append(db.getShortName());
sql.append(" RUN MODE ").append(runMode.toString());
if (replicationProperties != null && !replicationProperties.isEmpty()) {
sql.append(" WITH REPLICATION STRATEGY");
Database.appendMap(sql, replicationProperties);
}
sql.append(" PARAMETERS");
Database.appendMap(sql, db.getParameters());
this.sql = sql.toString();
} else {
if (isTargetEndpoint(db)) {
for (Storage storage : db.getStorages()) {
storage.save();
}
Database db2 = db.copy();
if (clientServer2ReplicationMode) {
RouterHolder.getRouter().replicate(db2, oldRunMode, runMode, newHostIds);
} else if (clientServer2ShardingMode || replication2ShardingMode) {
RouterHolder.getRouter().sharding(db2, oldRunMode, runMode, oldHostIds, newHostIds);
oldHostIds = db.getHostIds();
HashSet<String> oldSet = new HashSet<>(Arrays.asList(oldHostIds));
if (parameters != null && parameters.containsKey("hostIds")) {
String[] hostIds = StringUtils.arraySplit(parameters.get("hostIds"), ',', true);
HashSet<String> newSet = new HashSet<>(Arrays.asList(hostIds));
newSet.removeAll(oldSet);
newHostIds = newSet.toArray(new String[0]);
} else {
DbException.throwInternalError();
}
}
}
}

executeDatabaseStatement(db);
return 0;
private Database copyDatabase() {
for (Storage storage : db.getStorages()) {
storage.save();
}
Database db2 = db.copy();
return db2;
}

private void clientServer2ClientServer() {
alterDatabase();
updateLocalMeta();
}

private void replication2Replication() {
int replicationFactorOld = getReplicationFactor(db.getReplicationProperties());
int replicationFactorNew = getReplicationFactor(replicationProperties);
int value = replicationFactorNew - replicationFactorOld;
// int replicationEndpoints = Math.abs(value);
if (value > 0) {
scaleOutReplication2Replication();
} else if (value < 0) {
scaleInReplication2Replication();
} else {
alterDatabase();
updateLocalMeta();
}
}

private void sharding2Sharding() {
int nodesOld = getNodes(db.getParameters());
int nodesNew = getNodes(parameters);
int value = nodesNew - nodesOld;
// int nodes = Math.abs(value);
if (value > 0) {
scaleOutSharding2Sharding();
} else if (value < 0) {
scaleInSharding2Sharding();
} else {
alterDatabase();
updateLocalMeta();
}
}

// ----------------------scale out----------------------

private void scaleOutClientServer2Replication() {
alterDatabase();
rewriteSql(true);
updateLocalMeta();
if (isTargetEndpoint(db)) {
Database db2 = copyDatabase();
RouterHolder.getRouter().replicate(db2, RunMode.CLIENT_SERVER, runMode, newHostIds);
}
}

private void scaleOutClientServer2Sharding() {
alterDatabase();
rewriteSql(false);
updateLocalMeta();
if (isTargetEndpoint(db)) {
Database db2 = copyDatabase();
RouterHolder.getRouter().sharding(db2, RunMode.CLIENT_SERVER, runMode, oldHostIds, newHostIds);
}
}

private void scaleOutReplication2Sharding() {
alterDatabase();
rewriteSql(false);
updateLocalMeta();
if (isTargetEndpoint(db)) {
Database db2 = copyDatabase();
RouterHolder.getRouter().sharding(db2, RunMode.REPLICATION, runMode, oldHostIds, newHostIds);
}
}

private void scaleOutReplication2Replication() {
alterDatabase();
rewriteSql(true);
updateLocalMeta();
if (isTargetEndpoint(db)) {
Database db2 = copyDatabase();
RouterHolder.getRouter().replicate(db2, RunMode.REPLICATION, runMode, newHostIds);
}
}

private void scaleOutSharding2Sharding() {
alterDatabase();
rewriteSql(false);
updateLocalMeta();
if (isTargetEndpoint(db)) {
Database db2 = copyDatabase();
RouterHolder.getRouter().sharding(db2, RunMode.SHARDING, runMode, oldHostIds, newHostIds);
}
}

// ----------------------scale in----------------------

private void scaleInReplication2ClientServer() {

}

private void scaleInSharding2ClientServer() {

}

private void scaleInSharding2Replication() {

}

private void scaleInReplication2Replication() {

}

private void scaleInSharding2Sharding() {

}

private static int getReplicationFactor(Map<String, String> replicationProperties) {
return getIntPropertyValue("replication_factor", replicationProperties);
}

private static int getNodes(Map<String, String> parameters) {
return getIntPropertyValue("nodes", parameters);
}

private static int getIntPropertyValue(String key, Map<String, String> properties) {
if (properties == null)
return 0;
String value = properties.get(key);
if (value == null)
return 0;
return Integer.parseInt(value);
}
}
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.lealone.test.runmode;

import org.junit.Test;

public class ClientServerToClientServerTest extends RunModeTest {

public ClientServerToClientServerTest() {
}

@Test
@Override
public void run() throws Exception {
String dbName = ClientServerToClientServerTest.class.getSimpleName();
executeUpdate("CREATE DATABASE IF NOT EXISTS " + dbName + " RUN MODE client_server");
executeUpdate("ALTER DATABASE " + dbName + " RUN MODE client_server PARAMETERS (QUERY_CACHE_SIZE=20)");
executeUpdate("ALTER DATABASE " + dbName + " PARAMETERS (OPTIMIZE_OR=false)");
}
}
Expand Up @@ -18,25 +18,24 @@
package org.lealone.test.runmode;

import org.junit.Test;
import org.lealone.db.LealoneDatabase;
import org.lealone.test.sql.SqlTestBase;

public class ClientServerModeToReplicationModeTest extends SqlTestBase {
public class ClientServerToReplicationTest extends RunModeTest {

public ClientServerModeToReplicationModeTest() {
super(LealoneDatabase.NAME); // 连到LealoneDatabase才能执行CREATE DATABASE
public ClientServerToReplicationTest() {
setHost("127.0.0.2");
}

@Test
@Override
public void run() throws Exception {
String dbName = ClientServerModeToReplicationModeTest.class.getSimpleName();
String dbName = ClientServerToReplicationTest.class.getSimpleName();
executeUpdate("CREATE DATABASE IF NOT EXISTS " + dbName + " RUN MODE client_server");

new CrudTest(dbName).runTest();

executeUpdate("ALTER DATABASE " + dbName //
+ " RUN MODE REPLICATION WITH REPLICATION STRATEGY (class: 'SimpleStrategy', replication_factor: 2)");
+ " RUN MODE replication WITH REPLICATION STRATEGY (class: 'SimpleStrategy', replication_factor: 2)");

// String p = " PARAMETERS(hostIds='1,2')";
// executeUpdate("CREATE DATABASE IF NOT EXISTS " + dbName + " RUN MODE sharding" + p);
Expand Down

0 comments on commit faed047

Please sign in to comment.