Optimize the page move flow: skip the prepareMove step when there is only a single node, and merge the handling logic of the three scenarios
codefollower committed Aug 30, 2018
1 parent b4f77a1 commit 4d5b5df
Showing 5 changed files with 176 additions and 172 deletions.
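The change described in the commit message centers on one decision: when the page's old endpoint list has exactly one entry (client_server mode, or replication mode with a single replica), the move plan can be built locally and the remote prepareMoveLeafPage round-trip is skipped. What follows is a minimal, self-contained sketch of that decision using hypothetical stand-in types (MovePlan, planMove), not Lealone's actual API:

import java.util.List;

public class MovePlanSketch {

    // Hypothetical stand-in for LeafPageMovePlan: which host coordinates the move
    // and which hosts the page will be replicated to.
    static final class MovePlan {
        final String moverHostId;
        final List<String> newEndpoints;

        MovePlan(String moverHostId, List<String> newEndpoints) {
            this.moverHostId = moverHostId;
            this.newEndpoints = newEndpoints;
        }

        @Override
        public String toString() {
            return "MovePlan{mover=" + moverHostId + ", new=" + newEndpoints + "}";
        }
    }

    // One old endpoint: build the plan locally, no prepare round-trip is needed.
    // Several old endpoints: the local host proposes itself as the mover and the old
    // replicas would have to agree on the plan (the prepareMoveLeafPage step, elided here).
    static MovePlan planMove(String localHostId, List<String> oldEndpoints, List<String> newEndpoints) {
        if (oldEndpoints == null || oldEndpoints.isEmpty())
            throw new IllegalArgumentException("oldEndpoints is empty");
        if (oldEndpoints.size() == 1)
            return new MovePlan(oldEndpoints.get(0), newEndpoints);
        return new MovePlan(localHostId, newEndpoints);
    }

    public static void main(String[] args) {
        // client_server mode, or replication mode with a single replica
        System.out.println(planMove("host1", List.of("host1"), List.of("host2", "host3")));
        // replication mode with several replicas: a prepare step would follow
        System.out.println(planMove("host1", List.of("host1", "host2"), List.of("host3", "host4")));
    }
}

Only the multi-replica case goes through prepareMoveLeafPage so the old replicas can agree on a single plan, which is what the merged replicateOrMovePage in the diff below does.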
270 changes: 119 additions & 151 deletions lealone-aose/src/main/java/org/lealone/aose/storage/btree/BTreeMap.java
@@ -29,11 +29,9 @@
import org.lealone.aose.storage.btree.BTreePage.PageReference;
import org.lealone.common.exceptions.DbException;
import org.lealone.common.util.DataUtils;
import org.lealone.common.util.New;
import org.lealone.common.util.StringUtils;
import org.lealone.db.DataBuffer;
import org.lealone.db.Database;
import org.lealone.db.LealoneDatabase;
import org.lealone.db.RunMode;
import org.lealone.db.Session;
import org.lealone.db.value.ValueLong;
@@ -174,7 +172,7 @@ public synchronized V put(K key, V value) {

Object result = put(p, key, value);
if (split && isShardingMode && root.isLeaf()) {
moveLeafPage(p.getKey(0), p.getChildPage(1));
moveLeafPage(p.getKey(0), p.getChildPage(1), p, 1);
}

newRoot(p);
@@ -284,7 +282,7 @@ private Object put(BTreePage p, Object key, Object value) {
// now we are not sure where to add
Object result = put(p, key, value);
if (isLeaf && isShardingMode) {
moveLeafPage(k, rightChildPage);
moveLeafPage(k, rightChildPage, p, index);
}
return result;
}
@@ -312,67 +310,122 @@ private synchronized BTreePage setLeafPageMovePlan(Object splitKey, LeafPageMove
return page;
}

private void moveLeafPage(final Object splitKey, final BTreePage oldRightChildPage) {
private void moveLeafPage(Object keyObejct, BTreePage oldRightChildPage, BTreePage parent, int index) {
ByteBuffer keyBuffer;
try (DataBuffer buff = DataBuffer.create()) {
getKeyType().write(buff, keyObejct);
keyBuffer = buff.getAndCopyBuffer();
}
String[] oldEndpoints;
if (oldRightChildPage.replicationHostIds == null) {
oldEndpoints = new String[0];
} else {
oldEndpoints = new String[oldRightChildPage.replicationHostIds.size()];
oldRightChildPage.replicationHostIds.toArray(oldEndpoints);
}

AOStorageService.addTask(() -> {
Set<NetEndpoint> candidateEndpoints = getCandidateEndpoints();
List<NetEndpoint> oldReplicationEndpoints = getReplicationEndpoints(oldRightChildPage);
List<NetEndpoint> newReplicationEndpoints = P2pServer.instance.getReplicationEndpoints(db,
new HashSet<>(oldReplicationEndpoints), candidateEndpoints);
replicateOrMovePage(keyObejct, keyBuffer, oldRightChildPage, parent, index, true, oldEndpoints, false);
return null;
});
}

Session session = db.createInternalSession();
// Handle three scenarios:
// 1. Converting from client_server mode to sharding mode
// 2. Converting from replication mode to sharding mode
// 3. Moving the right-hand page when a page split occurs in sharding mode
//
// In the first two scenarios the target nodes chosen for the page move may include the original nodes; in the third they may not.
// Other than that, the three scenarios differ little; it is only a question of how many nodes oldEndpoints contains:
// client_server mode has a single node, and in replication mode a replica count of 1 is effectively the same as client_server mode.
private void replicateOrMovePage(Object keyObejct, ByteBuffer keyBuffer, BTreePage p, BTreePage parent, int index,
boolean last, String[] oldEndpoints, boolean replicate) {
if (oldEndpoints == null || oldEndpoints.length == 0) {
DbException.throwInternalError("oldEndpoints is null");
}
Set<NetEndpoint> candidateEndpoints = getCandidateEndpoints();
List<NetEndpoint> oldReplicationEndpoints = getReplicationEndpoints(oldEndpoints);
Set<NetEndpoint> oldEndpointSet;
if (replicate) {
// The original nodes may be chosen again, so new HashSet<>(0) is used instead of new HashSet<>(oldReplicationEndpoints)
oldEndpointSet = new HashSet<>(0);
} else {
oldEndpointSet = new HashSet<>(oldReplicationEndpoints);
}

LeafPageMovePlan leafPageMovePlan = null;
if (!oldReplicationEndpoints.isEmpty()) {
ReplicationSession rs = P2pRouter.createReplicationSession(session, oldReplicationEndpoints);
try (DataBuffer k = DataBuffer.create(); StorageCommand c = rs.createStorageCommand()) {
ByteBuffer keyBuffer = k.write(keyType, splitKey);
LeafPageMovePlan plan = new LeafPageMovePlan(P2pServer.instance.getLocalHostId(),
newReplicationEndpoints, keyBuffer);
leafPageMovePlan = c.prepareMoveLeafPage(getName(), plan);
}
List<NetEndpoint> newReplicationEndpoints = P2pServer.instance.getReplicationEndpoints(db, oldEndpointSet,
candidateEndpoints);

Session session = db.createInternalSession();
LeafPageMovePlan leafPageMovePlan = null;

if (oldEndpoints.length == 1) {
leafPageMovePlan = new LeafPageMovePlan(oldEndpoints[0], newReplicationEndpoints, keyBuffer);
p.leafPageMovePlan = leafPageMovePlan;
} else {
ReplicationSession rs = P2pRouter.createReplicationSession(session, oldReplicationEndpoints);
try (DataBuffer k = DataBuffer.create(); StorageCommand c = rs.createStorageCommand()) {
ByteBuffer key = k.write(keyType, keyObejct);
LeafPageMovePlan plan = new LeafPageMovePlan(P2pServer.instance.getLocalHostId(),
newReplicationEndpoints, key);
leafPageMovePlan = c.prepareMoveLeafPage(getName(), plan);
}

if (leafPageMovePlan == null)
return null;
return;

// Look up rightChildPage again by splitKey: after the preceding operations
// rightChildPage may already contain new data, and moving only the old page would lose it
BTreePage rightChildPage = setLeafPageMovePlan(splitKey, leafPageMovePlan);
// Look up the page again by key: after the preceding operations
// the page may already contain new data, and moving only the old page would lose it
p = setLeafPageMovePlan(keyObejct, leafPageMovePlan);

if (!leafPageMovePlan.moverHostId.equals(P2pServer.instance.getLocalHostId())) {
rightChildPage.replicationHostIds = leafPageMovePlan.getReplicationEndpoints();
return null;
p.replicationHostIds = leafPageMovePlan.getReplicationEndpoints();
return;
}
}

Set<NetEndpoint> otherEndpoints = new HashSet<>(candidateEndpoints);
otherEndpoints.removeAll(oldReplicationEndpoints);
otherEndpoints.removeAll(newReplicationEndpoints);
p.replicationHostIds = toHostIds(newReplicationEndpoints);
NetEndpoint localEndpoint = getLocalEndpoint();

Set<NetEndpoint> otherEndpoints = new HashSet<>(candidateEndpoints);
otherEndpoints.removeAll(newReplicationEndpoints);
if (!replicate) {
otherEndpoints.removeAll(oldReplicationEndpoints);
newReplicationEndpoints.removeAll(oldReplicationEndpoints);
}

// Move the right-hand leafPage to the new replication nodes (the page carries its data)
if (!newReplicationEndpoints.isEmpty()) {
rightChildPage.replicationHostIds = New.arrayList();
ReplicationSession rs = P2pRouter.createReplicationSession(session, newReplicationEndpoints,
rightChildPage.replicationHostIds, true);
moveLeafPage(leafPageMovePlan, rightChildPage, rs, false, true, true);
}
if (newReplicationEndpoints.contains(localEndpoint)) {
newReplicationEndpoints.remove(localEndpoint);
}

// Move the right-hand leafPage to the other nodes (the page carries no data, only the locations of this page's replicas)
if (!otherEndpoints.isEmpty()) {
ReplicationSession rs = P2pRouter.createReplicationSession(session, otherEndpoints, true);
moveLeafPage(leafPageMovePlan, rightChildPage, rs, true, true, true);
}
// Move the page to the new replication nodes (the page carries its data)
if (!newReplicationEndpoints.isEmpty()) {
ReplicationSession rs = P2pRouter.createReplicationSession(session, newReplicationEndpoints, true);
moveLeafPage(leafPageMovePlan, p, rs, false, last, !replicate);
}

return null;
});
// The local node is no longer one of the replica nodes
if (replicate && otherEndpoints.contains(localEndpoint)) {
otherEndpoints.remove(localEndpoint);
PageReference r = new PageReference(null, -1, 0);
r.key = parent.getKey(index);
r.last = index == parent.getKeyCount() - 1;
r.remoteHostIds = p.replicationHostIds;
parent.setChild(index, r);
}

// Move the page to the other nodes (the page carries no data, only the locations of this page's replicas)
if (!otherEndpoints.isEmpty()) {
ReplicationSession rs = P2pRouter.createReplicationSession(session, otherEndpoints, true);
moveLeafPage(leafPageMovePlan, p, rs, true, last, !replicate);
}
}

private void moveLeafPage(LeafPageMovePlan leafPageMovePlan, BTreePage rightChildPage, ReplicationSession rs,
boolean remote, boolean last, boolean addPage) {
try (DataBuffer p = DataBuffer.create(); StorageCommand c = rs.createStorageCommand()) {
rightChildPage.writeLeaf(p, remote);
ByteBuffer pageBuffer = p.getAndFlipBuffer();
private void moveLeafPage(LeafPageMovePlan leafPageMovePlan, BTreePage page, ReplicationSession rs, boolean remote,
boolean last, boolean addPage) {
try (DataBuffer buff = DataBuffer.create(); StorageCommand c = rs.createStorageCommand()) {
page.writeLeaf(buff, remote);
ByteBuffer pageBuffer = buff.getAndFlipBuffer();
c.moveLeafPage(getName(), leafPageMovePlan.splitKey, pageBuffer, last, addPage);
}
}
@@ -758,28 +811,26 @@ public synchronized void addLeafPage(ByteBuffer splitKey, ByteBuffer page, boole
} else {
BTreePage parent = p;
int index = 0;
while (true) {
while (p.isNode()) {
parent = p;
index = p.binarySearch(k);
if (!p.isLeaf()) {
if (index < 0) {
index = -index - 1;
} else {
if (last)
index++;
}
parent = p;
PageReference r = p.getChildPageReference(index);
if (r.isRemotePage()) {
break;
}
p = p.getChildPage(index);
continue;
if (index < 0) {
index = -index - 1;
} else {
if (last)
index++;
}
PageReference r = p.getChildPageReference(index);
if (r.isRemotePage()) {
break;
}
break;
p = p.getChildPage(index);
}
BTreePage right = BTreePage.readLeafPage(this, page);
if (addPage) {
parent.insertNode(index, k, right);
BTreePage left = parent.getChildPage(index);
parent.setChild(index, right);
parent.insertNode(index, k, left);
} else {
parent.setChild(index, right);
}
@@ -867,7 +918,7 @@ private List<NetEndpoint> getReplicationEndpoints(BTreePage p) {
return getReplicationEndpoints(p.replicationHostIds);
}

private List<NetEndpoint> getReplicationEndpoints(String[] replicationHostIds) {
List<NetEndpoint> getReplicationEndpoints(String[] replicationHostIds) {
return getReplicationEndpoints(Arrays.asList(replicationHostIds));
}

@@ -981,7 +1032,7 @@ public ByteBuffer readPage(ByteBuffer key, boolean last) {
}
BTreePage parent = p;
int index = 0;
while (!p.isLeaf()) {
while (p.isNode()) {
index = p.binarySearch(k);
if (index < 0) {
index = -index - 1;
@@ -1016,83 +1067,11 @@ private ByteBuffer replicateOrMovePage(Object keyObejct, ByteBuffer keyBuffer, B
return replicatePage(p);
}

if (oldEndpoints == null || oldEndpoints.length == 0) {
DbException.throwInternalError("oldEndpoints is null");
}

// The following handles the transition from client_server or replication mode to sharding mode
// ---------------------------------------------------------------
Set<NetEndpoint> candidateEndpoints = getCandidateEndpoints();
List<NetEndpoint> oldReplicationEndpoints = getReplicationEndpoints(oldEndpoints);

// The original nodes may be chosen again, so new HashSet<>(0) is used instead of new HashSet<>(oldReplicationEndpoints)
List<NetEndpoint> newReplicationEndpoints = P2pServer.instance.getReplicationEndpoints(db, new HashSet<>(0),
candidateEndpoints);

Session session = db.createInternalSession();
LeafPageMovePlan leafPageMovePlan = null;

// From client_server mode to sharding mode
if (oldEndpoints.length == 1) {
leafPageMovePlan = new LeafPageMovePlan(oldEndpoints[0], newReplicationEndpoints, keyBuffer);
p.leafPageMovePlan = leafPageMovePlan;
} else {
// From replication mode to sharding mode
if (!oldReplicationEndpoints.isEmpty()) {
ReplicationSession rs = P2pRouter.createReplicationSession(session, oldReplicationEndpoints);
try (DataBuffer k = DataBuffer.create(); StorageCommand c = rs.createStorageCommand()) {
ByteBuffer key = k.write(keyType, keyObejct);
LeafPageMovePlan plan = new LeafPageMovePlan(P2pServer.instance.getLocalHostId(),
newReplicationEndpoints, key);
leafPageMovePlan = c.prepareMoveLeafPage(getName(), plan);
}
}

if (leafPageMovePlan == null)
return null;

// Look up the page again by key: after the preceding operations
// the page may already contain new data, and moving only the old page would lose it
p = setLeafPageMovePlan(keyObejct, leafPageMovePlan);

if (!leafPageMovePlan.moverHostId.equals(P2pServer.instance.getLocalHostId())) {
p.replicationHostIds = leafPageMovePlan.getReplicationEndpoints();
return null;
}
}

p.replicationHostIds = toHostIds(newReplicationEndpoints);
NetEndpoint localEndpoint = getLocalEndpoint();

Set<NetEndpoint> otherEndpoints = new HashSet<>(candidateEndpoints);
otherEndpoints.removeAll(newReplicationEndpoints);
replicateOrMovePage(keyObejct, keyBuffer, p, parent, index, last, oldEndpoints, true);

if (newReplicationEndpoints.contains(localEndpoint)) {
newReplicationEndpoints.remove(localEndpoint);
}

// Move the page to the new replication nodes (the page carries its data)
if (!newReplicationEndpoints.isEmpty()) {
ReplicationSession rs = P2pRouter.createReplicationSession(session, newReplicationEndpoints, true);
moveLeafPage(leafPageMovePlan, p, rs, false, last, false);
}

// Replicate first, since the local node may not be among the chosen newReplicationEndpoints
ByteBuffer replicatedPage = replicatePage(p);

// The local node is no longer one of the replica nodes
if (otherEndpoints.contains(localEndpoint)) {
otherEndpoints.remove(localEndpoint);
parent.setChild(index, new PageReference(null, -1, 0));
}

// Move the page to the other nodes (the page carries no data, only the locations of this page's replicas)
if (!otherEndpoints.isEmpty()) {
ReplicationSession rs = P2pRouter.createReplicationSession(session, otherEndpoints, true);
moveLeafPage(leafPageMovePlan, p, rs, true, last, false);
}

return replicatedPage;
return replicatePage(p);
}

private static List<String> toHostIds(List<NetEndpoint> endpoints) {
@@ -1105,17 +1084,6 @@ private static List<String> toHostIds(List<NetEndpoint> endpoints) {
return hostIds;
}

BTreePage readRemotePage(PageReference ref, final String hostId) {
List<NetEndpoint> replicationEndpoints = getReplicationEndpoints(new String[] { hostId });
Session session = LealoneDatabase.getInstance().createInternalSession();
ReplicationSession rs = P2pRouter.createReplicationSession(session, replicationEndpoints);
try (DataBuffer buff = DataBuffer.create(); StorageCommand c = rs.createStorageCommand()) {
ByteBuffer keyBuffer = buff.write(keyType, ref.key);
ByteBuffer page = c.readRemotePage(getName(), keyBuffer, ref.last);
return BTreePage.readReplicatedPage(this, page);
}
}

@Override
public synchronized void setRootPage(ByteBuffer buff) {
root = BTreePage.readReplicatedPage(this, buff);
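The merged replicateOrMovePage above distinguishes the three scenarios only through the replicate flag: in the two mode-conversion cases the original nodes may be chosen again as targets, while after a page split in sharding mode they are excluded, and candidates that do not become replicas receive only the page's replica locations rather than its data. Below is a minimal sketch of that endpoint partitioning using hypothetical types (Targets, selectTargets); the handling of the local endpoint and the actual page transfer are elided:

import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class EndpointSelectionSketch {

    // Hypothetical result type: which nodes receive the page data and which only
    // receive the list of replica locations.
    static final class Targets {
        final Set<String> receiveData;
        final Set<String> receiveLocations;

        Targets(Set<String> receiveData, Set<String> receiveLocations) {
            this.receiveData = receiveData;
            this.receiveLocations = receiveLocations;
        }
    }

    static Targets selectTargets(Set<String> candidates, Set<String> oldReplicas,
            Set<String> newReplicas, boolean replicate) {
        Set<String> receiveData = new LinkedHashSet<>(newReplicas);
        Set<String> receiveLocations = new LinkedHashSet<>(candidates);
        receiveLocations.removeAll(newReplicas);
        if (!replicate) {
            // Page split in sharding mode: the old replicas neither receive the data
            // again nor need the location-only notification.
            receiveData.removeAll(oldReplicas);
            receiveLocations.removeAll(oldReplicas);
        }
        return new Targets(receiveData, receiveLocations);
    }

    public static void main(String[] args) {
        Set<String> candidates = new LinkedHashSet<>(List.of("h1", "h2", "h3", "h4"));
        Set<String> oldReplicas = new LinkedHashSet<>(List.of("h1"));
        Set<String> newReplicas = new LinkedHashSet<>(List.of("h1", "h2"));

        Targets conversion = selectTargets(candidates, oldReplicas, newReplicas, true);
        System.out.println("mode conversion: data -> " + conversion.receiveData
                + ", locations -> " + conversion.receiveLocations);

        Targets split = selectTargets(candidates, oldReplicas, newReplicas, false);
        System.out.println("page split: data -> " + split.receiveData
                + ", locations -> " + split.receiveLocations);
    }
}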
