Skip to content

Commit

Permalink
支持全部总共10种场景下的扩容缩容
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Sep 3, 2018
1 parent faed047 commit 8be356c
Show file tree
Hide file tree
Showing 11 changed files with 289 additions and 34 deletions.
10 changes: 10 additions & 0 deletions lealone-aose/src/main/java/org/lealone/aose/router/P2pRouter.java
Expand Up @@ -317,4 +317,14 @@ public String[] getShardingEndpoints(Database db) {
nodes -= db.getHostIds().length;
return getHostIds(list, size, nodes);
}

@Override
public void scaleIn(Database db, RunMode oldRunMode, RunMode newRunMode, String[] oldEndpoints,
String[] newEndpoints) {
new Thread(() -> {
for (Storage storage : db.getStorages()) {
storage.scaleIn(db, oldRunMode, newRunMode, oldEndpoints, newEndpoints);
}
}, "ScaleIn Endpoints").start();
}
}
Expand Up @@ -126,4 +126,10 @@ public void sharding(Database db, RunMode oldRunMode, RunMode newRunMode, String
public String[] getShardingEndpoints(Database db) {
return nestedRouter.getShardingEndpoints(db);
}

@Override
public void scaleIn(Database db, RunMode oldRunMode, RunMode newRunMode, String[] oldEndpoints,
String[] newEndpoints) {
nestedRouter.scaleIn(db, oldRunMode, newRunMode, oldEndpoints, newEndpoints);
}
}
19 changes: 19 additions & 0 deletions lealone-aose/src/main/java/org/lealone/aose/storage/AOStorage.java
Expand Up @@ -259,4 +259,23 @@ public void save() {
super.save();
}

@Override
public void scaleIn(Object dbObject, RunMode oldRunMode, RunMode newRunMode, String[] oldEndpoints,
String[] newEndpoints) {
for (StorageMap<?, ?> map : maps.values()) {
Database db = (Database) dbObject;
map = map.getRawMap();
if (map instanceof BTreeMap) {
BTreeMap<?, ?> btreeMap = (BTreeMap<?, ?>) map;
btreeMap.setOldEndpoints(oldEndpoints);
btreeMap.setDatabase(db);
btreeMap.setRunMode(newRunMode);
if (oldEndpoints == null) {
btreeMap.replicateAllRemotePages();
} else {
btreeMap.moveAllLocalLeafPages(oldEndpoints, newEndpoints);
}
}
}
}
}
Expand Up @@ -302,7 +302,10 @@ private Object putLocal(BTreePage p, Object key, Object value) {
}

private Set<NetEndpoint> getCandidateEndpoints() {
String[] hostIds = db.getHostIds();
return getCandidateEndpoints(db.getHostIds());
}

static Set<NetEndpoint> getCandidateEndpoints(String[] hostIds) {
Set<NetEndpoint> candidateEndpoints = new HashSet<>(hostIds.length);
TopologyMetaData metaData = P2pServer.instance.getTopologyMetaData();
for (String hostId : hostIds) {
Expand Down Expand Up @@ -373,10 +376,16 @@ private void moveLeafPage(Object keyObejct) {
// client_server模式只有一个节点,在replication模式下,如果副本个数是1,那么也相当于client_server模式。
private void replicateOrMovePage(Object keyObejct, ByteBuffer keyBuffer, BTreePage p, BTreePage parent, int index,
boolean last, String[] oldEndpoints, boolean replicate) {
Set<NetEndpoint> candidateEndpoints = getCandidateEndpoints();
replicateOrMovePage(keyObejct, keyBuffer, p, parent, index, last, oldEndpoints, replicate, candidateEndpoints);
}

void replicateOrMovePage(Object keyObejct, ByteBuffer keyBuffer, BTreePage p, BTreePage parent, int index,
boolean last, String[] oldEndpoints, boolean replicate, Set<NetEndpoint> candidateEndpoints) {
if (oldEndpoints == null || oldEndpoints.length == 0) {
DbException.throwInternalError("oldEndpoints is null");
}
Set<NetEndpoint> candidateEndpoints = getCandidateEndpoints();

List<NetEndpoint> oldReplicationEndpoints = getReplicationEndpoints(oldEndpoints);
Set<NetEndpoint> oldEndpointSet;
if (replicate) {
Expand Down Expand Up @@ -438,7 +447,7 @@ private void replicateOrMovePage(Object keyObejct, ByteBuffer keyBuffer, BTreePa
}

// 当前节点已经不是副本所在节点
if (replicate && otherEndpoints.contains(localEndpoint)) {
if (parent != null && replicate && otherEndpoints.contains(localEndpoint)) {
otherEndpoints.remove(localEndpoint);
PageReference r = new PageReference(null, -1, 0);
r.key = parent.getKey(index);
Expand Down Expand Up @@ -827,6 +836,10 @@ public void transferFrom(ReadableByteChannel src, long position, long count) thr

@Override
public synchronized void addLeafPage(ByteBuffer splitKey, ByteBuffer page, boolean last, boolean addPage) {
if (splitKey == null) {
root = BTreePage.readLeafPage(this, page);
return;
}
BTreePage p = root;
Object k = keyType.read(splitKey);
if (p.isLeaf()) {
Expand Down Expand Up @@ -951,11 +964,11 @@ private List<NetEndpoint> getReplicationEndpoints(BTreePage p) {
return getReplicationEndpoints(p.replicationHostIds);
}

List<NetEndpoint> getReplicationEndpoints(String[] replicationHostIds) {
static List<NetEndpoint> getReplicationEndpoints(String[] replicationHostIds) {
return getReplicationEndpoints(Arrays.asList(replicationHostIds));
}

private List<NetEndpoint> getReplicationEndpoints(List<String> replicationHostIds) {
static List<NetEndpoint> getReplicationEndpoints(List<String> replicationHostIds) {
int size = replicationHostIds.size();
List<NetEndpoint> replicationEndpoints = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
Expand Down Expand Up @@ -1061,7 +1074,7 @@ public ByteBuffer readPage(ByteBuffer key, boolean last) {
Object k = keyType.read(key);
key.flip();
if (p.isLeaf()) {
throw DbException.throwInternalError("readPage: key=" + key + ", last=" + last);
throw DbException.throwInternalError("readPage: key=" + k + ", last=" + last);
}
BTreePage parent = p;
int index = 0;
Expand Down Expand Up @@ -1120,8 +1133,17 @@ private static List<String> toHostIds(List<NetEndpoint> endpoints) {
@Override
public synchronized void setRootPage(ByteBuffer buff) {
root = BTreePage.readReplicatedPage(this, buff);
if (!root.isLeaf() && !getName().endsWith("_0")) { // 只异步读非SYS表
if (root.isNode() && !getName().endsWith("_0")) { // 只异步读非SYS表
root.readRemotePages();
}
}

public void replicateAllRemotePages() {
root.readRemotePagesRecursive();
}

public void moveAllLocalLeafPages(String[] oldEndpoints, String[] newEndpoints) {
root.moveAllLocalLeafPages(oldEndpoints, newEndpoints);
}

}
Expand Up @@ -12,6 +12,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;

import org.lealone.aose.router.P2pRouter;
Expand Down Expand Up @@ -1135,7 +1136,7 @@ synchronized BTreePage readRemotePage(BTreeMap<Object, Object> map) {

// TODO 支持多节点容错
String remoteHostId = remoteHostIds.get(0);
List<NetEndpoint> replicationEndpoints = map.getReplicationEndpoints(new String[] { remoteHostId });
List<NetEndpoint> replicationEndpoints = BTreeMap.getReplicationEndpoints(new String[] { remoteHostId });
Session session = LealoneDatabase.getInstance().createInternalSession();
ReplicationSession rs = P2pRouter.createReplicationSession(session, replicationEndpoints);
try (DataBuffer buff = DataBuffer.create(); StorageCommand c = rs.createStorageCommand()) {
Expand Down Expand Up @@ -1242,9 +1243,74 @@ public BTreePage call() throws Exception {
}
}

void readRemotePagesRecursive() {
if (isNode()) {
for (int i = 0, length = children.length; i < length; i++) {
if (children[i].isRemotePage()) {
final int index = i;
Callable<BTreePage> task = new Callable<BTreePage>() {
@Override
public BTreePage call() throws Exception {
BTreePage p = getChildPage(index);
if (p.isNode()) {
p.readRemotePagesRecursive();
}
return p;
}
};
AOStorageService.submitTask(task);
} else if (children[i].page != null && children[i].page.isNode()) {
readRemotePagesRecursive();
}
}
}
}

void moveAllLocalLeafPages(String[] oldEndpoints, String[] newEndpoints) {
Set<NetEndpoint> candidateEndpoints = BTreeMap.getCandidateEndpoints(newEndpoints);
if (isNode()) {
for (int i = 0, length = keys.length; i <= length; i++) {
if (!children[i].isRemotePage()) {
BTreePage p = getChildPage(i);
if (p.isNode()) {
p.moveAllLocalLeafPages(oldEndpoints, newEndpoints);
} else {
Object keyObejct = i == length ? keys[i - 1] : keys[i];
ByteBuffer keyBuffer;
try (DataBuffer buff = DataBuffer.create()) {
map.getKeyType().write(buff, keyObejct);
keyBuffer = buff.getAndCopyBuffer();
}
if (p.replicationHostIds == null) {
oldEndpoints = new String[0];
} else {
oldEndpoints = new String[p.replicationHostIds.size()];
p.replicationHostIds.toArray(oldEndpoints);
}
map.replicateOrMovePage(keyObejct, keyBuffer, p, this, i, true, oldEndpoints, false,
candidateEndpoints);
}
}
}
} else {
map.replicateOrMovePage(null, null, this, null, 0, true, oldEndpoints, false, candidateEndpoints);
// moveLocalLeafPage(oldEndpoints, newEndpoints);
}
}

// void moveLocalLeafPage(String[] removeEndpoints, String[] newEndpoints) {
// Set<NetEndpoint> candidateEndpoints = BTreeMap.getCandidateEndpoints(newEndpoints);
// List<NetEndpoint> oldReplicationEndpoints = BTreeMap.getReplicationEndpoints(replicationHostIds);
// Set<NetEndpoint> removeEndpointSet = new HashSet<>(BTreeMap.getReplicationEndpoints(removeEndpoints));
//
// List<NetEndpoint> newReplicationEndpoints = P2pServer.instance.getReplicationEndpoints(map.db, new HashSet<>(0),
// candidateEndpoints);
//
// }

void replicatePage(DataBuffer buff, NetEndpoint localEndpoint) {
BTreePage p = copyOnly();
boolean isNode = !isLeaf();
boolean isNode = isNode();
if (isNode) {
int len = children.length;
p.children = new PageReference[len];
Expand Down
Expand Up @@ -293,7 +293,7 @@ public void moveLeafPage(final String mapName, final ByteBuffer splitKey, final
Runnable command = new Runnable() {
@Override
public void run() {
c.moveLeafPage(mapName, splitKey.slice(), page.slice(), last, addPage);
c.moveLeafPage(mapName, splitKey == null ? null : splitKey.slice(), page.slice(), last, addPage);
}
};
futures.add(ThreadPool.executor.submit(command));
Expand Down
5 changes: 5 additions & 0 deletions lealone-common/src/main/java/org/lealone/storage/Storage.java
Expand Up @@ -57,4 +57,9 @@ public default void drop() {
}

StorageMap<?, ?> getMap(String name);

public default void scaleIn(Object dbObject, RunMode oldRunMode, RunMode newRunMode, String[] oldEndpoints,
String[] newEndpoints) {
throw DbException.getUnsupportedException("scaleIn");
}
}

0 comments on commit 8be356c

Please sign in to comment.