Skip to content

Commit

Permalink
支持在不停机的情况下通过手工执行ALTER DATABASE语句从单机或复制模式转换到Sharding模式
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Aug 27, 2018
1 parent 32562a2 commit 58e29b2
Show file tree
Hide file tree
Showing 18 changed files with 517 additions and 55 deletions.
35 changes: 32 additions & 3 deletions lealone-aose/src/main/java/org/lealone/aose/router/P2pRouter.java
Expand Up @@ -130,7 +130,6 @@ public String[] getHostIds(Database db) {
Set<NetEndpoint> liveMembers = Gossiper.instance.getLiveMembers(); Set<NetEndpoint> liveMembers = Gossiper.instance.getLiveMembers();
ArrayList<NetEndpoint> list = new ArrayList<>(liveMembers); ArrayList<NetEndpoint> list = new ArrayList<>(liveMembers);
int size = liveMembers.size(); int size = liveMembers.size();
size = liveMembers.size();
if (runMode == RunMode.CLIENT_SERVER) { if (runMode == RunMode.CLIENT_SERVER) {
int i = random.nextInt(size); int i = random.nextInt(size);
NetEndpoint addr = list.get(i); NetEndpoint addr = list.get(i);
Expand Down Expand Up @@ -260,7 +259,7 @@ private static ReplicationSession createReplicationSession(ServerSession s, Sess
public void replicate(Database db, RunMode oldRunMode, RunMode newRunMode, String[] newReplicationEndpoints) { public void replicate(Database db, RunMode oldRunMode, RunMode newRunMode, String[] newReplicationEndpoints) {
new Thread(() -> { new Thread(() -> {
for (Storage storage : db.getStorages()) { for (Storage storage : db.getStorages()) {
storage.replicate(newReplicationEndpoints, newRunMode); storage.replicate(db, newReplicationEndpoints, newRunMode);
} }
}, "Replicate Pages").start(); }, "Replicate Pages").start();
} }
Expand All @@ -284,7 +283,37 @@ public String[] getReplicationEndpoints(Database db) {
if (hostId != null) if (hostId != null)
hostIds[j++] = hostId; hostIds[j++] = hostId;
} }

return hostIds; return hostIds;
} }

@Override
public void sharding(Database db, RunMode oldRunMode, RunMode newRunMode, String[] oldEndpoints,
String[] newEndpoints) {
new Thread(() -> {
for (Storage storage : db.getStorages()) {
storage.sharding(db, oldEndpoints, newEndpoints, newRunMode);
}
}, "Sharding Pages").start();
}

@Override
public String[] getShardingEndpoints(Database db) {
HashSet<NetEndpoint> oldEndpoints = new HashSet<>();
for (String hostId : db.getHostIds()) {
oldEndpoints.add(P2pServer.instance.getTopologyMetaData().getEndpointForHostId(hostId));
}
Set<NetEndpoint> liveMembers = Gossiper.instance.getLiveMembers();
liveMembers.removeAll(oldEndpoints);
ArrayList<NetEndpoint> list = new ArrayList<>(liveMembers);
int size = liveMembers.size();
AbstractReplicationStrategy replicationStrategy = ClusterMetaData.getReplicationStrategy(db);
int replicationFactor = replicationStrategy.getReplicationFactor();
Map<String, String> parameters = db.getParameters();
int nodes = replicationFactor + 2;
if (parameters != null && parameters.containsKey("nodes")) {
nodes = Integer.parseInt(parameters.get("nodes"));
}
nodes -= db.getHostIds().length;
return getHostIds(list, size, nodes);
}
} }
Expand Up @@ -115,4 +115,15 @@ public void replicate(Database db, RunMode oldRunMode, RunMode newRunMode, Strin
public String[] getReplicationEndpoints(Database db) { public String[] getReplicationEndpoints(Database db) {
return nestedRouter.getReplicationEndpoints(db); return nestedRouter.getReplicationEndpoints(db);
} }

@Override
public void sharding(Database db, RunMode oldRunMode, RunMode newRunMode, String[] oldEndpoints,
String[] newEndpoints) {
nestedRouter.sharding(db, oldRunMode, newRunMode, oldEndpoints, newEndpoints);
}

@Override
public String[] getShardingEndpoints(Database db) {
return nestedRouter.getShardingEndpoints(db);
}
} }
36 changes: 30 additions & 6 deletions lealone-aose/src/main/java/org/lealone/aose/storage/AOStorage.java
Expand Up @@ -197,34 +197,52 @@ private List<NetEndpoint> getReplicationEndpoints(List<String> replicationHostId
} }


@Override @Override
public void replicate(String[] targetEndpoints, RunMode runMode) { public void replicate(Object dbObject, String[] newReplicationEndpoints, RunMode runMode) {
replicateRootPages(dbObject, null, newReplicationEndpoints, runMode);
}

@Override
public void sharding(Object dbObject, String[] oldEndpoints, String[] newEndpoints, RunMode runMode) {
replicateRootPages(dbObject, oldEndpoints, newEndpoints, runMode);
}

private void replicateRootPages(Object dbObject, String[] oldEndpoints, String[] targetEndpoints, RunMode runMode) {
AOStorageService.forceMerge();

List<NetEndpoint> replicationEndpoints = getReplicationEndpoints(targetEndpoints); List<NetEndpoint> replicationEndpoints = getReplicationEndpoints(targetEndpoints);
// 用最高权限的用户移动页面,因为目标节点上可能还没有对应的数据库 // 用最高权限的用户移动页面,因为目标节点上可能还没有对应的数据库
Session session = LealoneDatabase.getInstance().createInternalSession(); Session session = LealoneDatabase.getInstance().createInternalSession();
ReplicationSession rs = P2pRouter.createReplicationSession(session, replicationEndpoints); ReplicationSession rs = P2pRouter.createReplicationSession(session, replicationEndpoints);
Database db = (Database) config.get("db"); Database db = (Database) dbObject;
int id = db.getId(); int id = db.getId();
String sysMapName = "t_" + id + "_0"; String sysMapName = "t_" + id + "_0";
try (DataBuffer p = DataBuffer.create(); StorageCommand c = rs.createStorageCommand()) { try (DataBuffer p = DataBuffer.create(); StorageCommand c = rs.createStorageCommand()) {
HashMap<String, StorageMap<?, ?>> maps = new HashMap<>(this.maps); HashMap<String, StorageMap<?, ?>> maps = new HashMap<>(this.maps);
Collection<StorageMap<?, ?>> values = maps.values(); Collection<StorageMap<?, ?>> values = maps.values();
p.putInt(values.size()); p.putInt(values.size());
// SYS表放在前面,并且总是使用CLIENT_SERVER模式
StorageMap<?, ?> sysMap = maps.remove(sysMapName); StorageMap<?, ?> sysMap = maps.remove(sysMapName);
replicateMap(sysMap, p); replicateRootPage(db, sysMap, p, oldEndpoints, RunMode.CLIENT_SERVER);
for (StorageMap<?, ?> map : values) { for (StorageMap<?, ?> map : values) {
replicateMap(map, p); replicateRootPage(db, map, p, oldEndpoints, runMode);
} }
ByteBuffer pageBuffer = p.getAndFlipBuffer(); ByteBuffer pageBuffer = p.getAndFlipBuffer();
c.movePage(db.getShortName(), "", pageBuffer); c.movePage(db.getShortName(), "", pageBuffer);
} }
} }


private void replicateMap(StorageMap<?, ?> map, DataBuffer p) { private void replicateRootPage(Database db, StorageMap<?, ?> map, DataBuffer p, String[] oldEndpoints,
RunMode runMode) {
map = map.getRawMap(); map = map.getRawMap();
if (map instanceof BTreeMap) { if (map instanceof BTreeMap) {
String mapName = map.getName(); String mapName = map.getName();
ValueString.type.write(p, mapName); ValueString.type.write(p, mapName);
((BTreeMap<?, ?>) map).replicateRootPage(p);
BTreeMap<?, ?> btreeMap = (BTreeMap<?, ?>) map;
btreeMap.setOldEndpoints(oldEndpoints);
btreeMap.setDatabase(db);
btreeMap.setRunMode(runMode);
btreeMap.replicateRootPage(p);
} }
} }


Expand All @@ -235,4 +253,10 @@ public void drop() {
FileUtils.deleteRecursive(storageName, false); FileUtils.deleteRecursive(storageName, false);
} }


@Override
public void save() {
AOStorageService.forceMerge();
super.save();
}

} }
Expand Up @@ -108,13 +108,28 @@ else if (map.getWritePercent() > 50)
} }


private static void merge() { private static void merge() {
for (BufferedMap<?, ?> map : bufferedMaps) { synchronized (bufferedMaps) {
if (map.getRawMap().isClosed()) { for (BufferedMap<?, ?> map : bufferedMaps) {
bufferedMaps.remove(map); if (map.getRawMap().isClosed()) {
continue; bufferedMaps.remove(map);
continue;
}
if (map.needMerge())
addTask(map);
}
}
}

static void forceMerge() {
synchronized (bufferedMaps) {
for (BufferedMap<?, ?> map : bufferedMaps) {
if (map.getRawMap().isClosed()) {
bufferedMaps.remove(map);
continue;
}
if (map.needMerge())
map.merge();
} }
if (map.needMerge())
addTask(map);
} }
} }
} }
Expand Up @@ -34,6 +34,7 @@
import org.lealone.db.DataBuffer; import org.lealone.db.DataBuffer;
import org.lealone.db.Database; import org.lealone.db.Database;
import org.lealone.db.LealoneDatabase; import org.lealone.db.LealoneDatabase;
import org.lealone.db.RunMode;
import org.lealone.db.Session; import org.lealone.db.Session;
import org.lealone.db.value.ValueLong; import org.lealone.db.value.ValueLong;
import org.lealone.net.NetEndpoint; import org.lealone.net.NetEndpoint;
Expand Down Expand Up @@ -81,13 +82,16 @@ public BTreeMap<K, V> openMap() {
protected final Map<String, Object> config; protected final Map<String, Object> config;
protected final BTreeStorage storage; protected final BTreeStorage storage;
protected final AOStorage aoStorage; protected final AOStorage aoStorage;
protected final Database db; protected Database db;


/** /**
* The current root page (may not be null). * The current root page (may not be null).
*/ */
protected volatile BTreePage root; protected volatile BTreePage root;


private RunMode runMode;
private String[] oldEndpoints;

@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected BTreeMap(String name, StorageDataType keyType, StorageDataType valueType, Map<String, Object> config, protected BTreeMap(String name, StorageDataType keyType, StorageDataType valueType, Map<String, Object> config,
AOStorage aoStorage) { AOStorage aoStorage) {
Expand Down Expand Up @@ -232,7 +236,7 @@ private Object put(BTreePage p, Object key, Object value) {
replicationEndpoints.addAll(p.leafPageMovePlan.replicationEndpoints); replicationEndpoints.addAll(p.leafPageMovePlan.replicationEndpoints);
containsLocalEndpoint = replicationEndpoints.remove(getLocalEndpoint()); containsLocalEndpoint = replicationEndpoints.remove(getLocalEndpoint());


ReplicationSession rs = P2pRouter.createReplicationSession(db.getLastSession(), ReplicationSession rs = P2pRouter.createReplicationSession(db.createInternalSession(),
replicationEndpoints); replicationEndpoints);
try (DataBuffer k = DataBuffer.create(); try (DataBuffer k = DataBuffer.create();
DataBuffer v = DataBuffer.create(); DataBuffer v = DataBuffer.create();
Expand Down Expand Up @@ -315,7 +319,7 @@ private void moveLeafPage(final Object splitKey, final BTreePage oldRightChildPa
List<NetEndpoint> newReplicationEndpoints = P2pServer.instance.getReplicationEndpoints(db, List<NetEndpoint> newReplicationEndpoints = P2pServer.instance.getReplicationEndpoints(db,
new HashSet<>(oldReplicationEndpoints), candidateEndpoints); new HashSet<>(oldReplicationEndpoints), candidateEndpoints);


Session session = db.getLastSession(); Session session = db.createInternalSession();


LeafPageMovePlan leafPageMovePlan = null; LeafPageMovePlan leafPageMovePlan = null;
if (!oldReplicationEndpoints.isEmpty()) { if (!oldReplicationEndpoints.isEmpty()) {
Expand Down Expand Up @@ -465,7 +469,7 @@ private void removeLeafPage(Object key, BTreePage leafPage) {
oldReplicationEndpoints.remove(getLocalEndpoint()); oldReplicationEndpoints.remove(getLocalEndpoint());
Set<NetEndpoint> liveMembers = getCandidateEndpoints(); Set<NetEndpoint> liveMembers = getCandidateEndpoints();
liveMembers.removeAll(oldReplicationEndpoints); liveMembers.removeAll(oldReplicationEndpoints);
Session session = db.getLastSession(); Session session = db.createInternalSession();
ReplicationSession rs = P2pRouter.createReplicationSession(session, liveMembers, true); ReplicationSession rs = P2pRouter.createReplicationSession(session, liveMembers, true);
try (DataBuffer k = DataBuffer.create(); StorageCommand c = rs.createStorageCommand()) { try (DataBuffer k = DataBuffer.create(); StorageCommand c = rs.createStorageCommand()) {
ByteBuffer keyBuffer = k.write(keyType, key); ByteBuffer keyBuffer = k.write(keyType, key);
Expand Down Expand Up @@ -739,6 +743,9 @@ public void transferFrom(ReadableByteChannel src, long position, long count) thr
public synchronized void addLeafPage(ByteBuffer splitKey, ByteBuffer page) { public synchronized void addLeafPage(ByteBuffer splitKey, ByteBuffer page) {
if (splitKey == null) { if (splitKey == null) {
root = BTreePage.readPage(this, page); root = BTreePage.readPage(this, page);
if (!root.isLeaf() && !getName().endsWith("_0")) { // 只异步读非SYS表
root.readRemotePags();
}
return; return;
} }


Expand Down Expand Up @@ -944,8 +951,28 @@ public void replicateRootPage(DataBuffer p) {
root.movePage(p, NetEndpoint.getLocalTcpEndpoint()); root.movePage(p, NetEndpoint.getLocalTcpEndpoint());
} }


public void setOldEndpoints(String[] oldEndpoints) {
this.oldEndpoints = oldEndpoints;
}

public void setDatabase(Database db) {
this.db = db;
}

public void setRunMode(RunMode runMode) {
this.runMode = runMode;
}

private boolean isShardingMode() {
if (runMode != null) {
return runMode == RunMode.SHARDING;
}

return db.isShardingMode();
}

@Override @Override
public synchronized ByteBuffer readPage(ByteBuffer key, boolean last) { public ByteBuffer readPage(ByteBuffer key, boolean last) {
BTreePage p = root; BTreePage p = root;
Object k = keyType.read(key); Object k = keyType.read(key);
if (p.isLeaf()) { if (p.isLeaf()) {
Expand All @@ -962,12 +989,80 @@ public synchronized ByteBuffer readPage(ByteBuffer key, boolean last) {
} else { } else {
if (last) if (last)
index++; index++;
return movePage(parent.getChildPage(index)); return movePage(k, parent.getChildPage(index));
} }
} }
return null; return null;
} }


private ByteBuffer movePage(Object splitKey, BTreePage p) {
// RunMode runMode = db.getRunMode();
// String[] oldHostIds = db.getHostIds();
if (isShardingMode()) {
Set<NetEndpoint> candidateEndpoints = getCandidateEndpoints();
List<NetEndpoint> oldReplicationEndpoints = getReplicationEndpoints(oldEndpoints);

// List<NetEndpoint> newReplicationEndpoints = P2pServer.instance.getReplicationEndpoints(db,
// new HashSet<>(oldReplicationEndpoints), candidateEndpoint
// 允许选择原来的节点
List<NetEndpoint> newReplicationEndpoints = P2pServer.instance.getReplicationEndpoints(db, new HashSet<>(0),
candidateEndpoints);

if (oldEndpoints.length == 1 && newReplicationEndpoints.contains(oldReplicationEndpoints.get(0))) {
// return movePage(p);
}

Session session = db.createInternalSession();

LeafPageMovePlan leafPageMovePlan = null;
if (!oldReplicationEndpoints.isEmpty()) {
ReplicationSession rs = P2pRouter.createReplicationSession(session, oldReplicationEndpoints);
try (DataBuffer k = DataBuffer.create(); StorageCommand c = rs.createStorageCommand()) {
ByteBuffer keyBuffer = k.write(keyType, splitKey);
LeafPageMovePlan plan = new LeafPageMovePlan(P2pServer.instance.getLocalHostId(),
newReplicationEndpoints, keyBuffer);
leafPageMovePlan = c.prepareMoveLeafPage(getName(), plan);
}
}

if (leafPageMovePlan == null)
return null;

// 重新按splitKey找到rightChildPage,因为经过前面的操作后,
// 可能rightChildPage已经有新数据了,如果只移动老的,会丢失数据
BTreePage rightChildPage = setLeafPageMovePlan(splitKey, leafPageMovePlan);

if (!leafPageMovePlan.moverHostId.equals(P2pServer.instance.getLocalHostId())) {
rightChildPage.replicationHostIds = leafPageMovePlan.getReplicationEndpoints();
return null;
}

Set<NetEndpoint> otherEndpoints = new HashSet<>(candidateEndpoints);
otherEndpoints.removeAll(oldReplicationEndpoints);
otherEndpoints.removeAll(newReplicationEndpoints);

newReplicationEndpoints.removeAll(oldReplicationEndpoints);

// 移动右边的leafPage到新的复制节点(Page中包含数据)
if (!newReplicationEndpoints.isEmpty()) {
rightChildPage.replicationHostIds = New.arrayList();
ReplicationSession rs = P2pRouter.createReplicationSession(session, newReplicationEndpoints,
rightChildPage.replicationHostIds, true);
moveLeafPage(leafPageMovePlan, rightChildPage, rs, false);
}

// 移动右边的leafPage到其他节点(Page中不包含数据,只包含这个Page各数据副本所在节点信息)
if (!otherEndpoints.isEmpty()) {
ReplicationSession rs = P2pRouter.createReplicationSession(session, otherEndpoints, true);
moveLeafPage(leafPageMovePlan, rightChildPage, rs, true);
}

p = rightChildPage;
}

return movePage(p);
}

private ByteBuffer movePage(BTreePage p) { private ByteBuffer movePage(BTreePage p) {
try (DataBuffer buff = DataBuffer.create()) { try (DataBuffer buff = DataBuffer.create()) {
p.movePage(buff, getLocalEndpoint()); p.movePage(buff, getLocalEndpoint());
Expand All @@ -977,6 +1072,8 @@ private ByteBuffer movePage(BTreePage p) {
} }


BTreePage readRemotePage(PageReference ref, final String hostId) { BTreePage readRemotePage(PageReference ref, final String hostId) {
// List<NetEndpoint> replicationEndpoints = new ArrayList<>(1);
// replicationEndpoints.add(NetEndpoint.createTCP(hostId));
List<NetEndpoint> replicationEndpoints = getReplicationEndpoints(new String[] { hostId }); List<NetEndpoint> replicationEndpoints = getReplicationEndpoints(new String[] { hostId });
Session session = LealoneDatabase.getInstance().createInternalSession(); Session session = LealoneDatabase.getInstance().createInternalSession();
ReplicationSession rs = P2pRouter.createReplicationSession(session, replicationEndpoints); ReplicationSession rs = P2pRouter.createReplicationSession(session, replicationEndpoints);
Expand Down
Expand Up @@ -11,7 +11,9 @@
import java.nio.channels.WritableByteChannel; import java.nio.channels.WritableByteChannel;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable;


import org.lealone.aose.storage.AOStorageService;
import org.lealone.common.compress.Compressor; import org.lealone.common.compress.Compressor;
import org.lealone.common.util.DataUtils; import org.lealone.common.util.DataUtils;
import org.lealone.db.DataBuffer; import org.lealone.db.DataBuffer;
Expand Down Expand Up @@ -1034,7 +1036,7 @@ public static class PageReference {
/** /**
* The page, if in memory, or null. * The page, if in memory, or null.
*/ */
final BTreePage page; BTreePage page;


/** /**
* The position, if known, or 0. * The position, if known, or 0.
Expand Down Expand Up @@ -1280,4 +1282,18 @@ static BTreePage readPage(BTreeMap<?, ?> map, ByteBuffer buff) {
p.remoteHostId = remoteHostId; p.remoteHostId = remoteHostId;
return p; return p;
} }

void readRemotePags() {
for (int i = 0, length = children.length; i < length; i++) {
final int index = i;
Callable<BTreePage> task = new Callable<BTreePage>() {
@Override
public BTreePage call() throws Exception {
BTreePage p = getChildPage(index);
return p;
}
};
AOStorageService.addTask(task);
}
}
} }

0 comments on commit 58e29b2

Please sign in to comment.