Skip to content

Commit

Permalink
修复大量与sharding相关的问题
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Mar 9, 2017
1 parent f94e9a7 commit db029f6
Show file tree
Hide file tree
Showing 16 changed files with 176 additions and 82 deletions.
Expand Up @@ -24,6 +24,7 @@
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;


import org.lealone.aose.config.ConfigDescriptor;
import org.lealone.aose.gms.Gossiper; import org.lealone.aose.gms.Gossiper;
import org.lealone.aose.locator.AbstractReplicationStrategy; import org.lealone.aose.locator.AbstractReplicationStrategy;
import org.lealone.aose.server.ClusterMetaData; import org.lealone.aose.server.ClusterMetaData;
Expand Down Expand Up @@ -86,6 +87,7 @@ private int executeDefineStatement(DefineStatement defineStatement) {
sessions[i++] = SessionPool.getSession(s, s.getURL(ia), !Utils.getBroadcastAddress().equals(ia)); sessions[i++] = SessionPool.getSession(s, s.getURL(ia), !Utils.getBroadcastAddress().equals(ia));


ReplicationSession rs = new ReplicationSession(sessions); ReplicationSession rs = new ReplicationSession(sessions);
rs.setRpcTimeout(ConfigDescriptor.getRpcTimeout());
Command c = null; Command c = null;
try { try {
c = rs.createCommand(defineStatement.getSQL(), -1); c = rs.createCommand(defineStatement.getSQL(), -1);
Expand Down
Expand Up @@ -15,6 +15,7 @@
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;


import org.lealone.aose.config.ConfigDescriptor;
import org.lealone.aose.gms.Gossiper; import org.lealone.aose.gms.Gossiper;
import org.lealone.aose.server.StorageServer; import org.lealone.aose.server.StorageServer;
import org.lealone.aose.storage.AOStorage; import org.lealone.aose.storage.AOStorage;
Expand Down Expand Up @@ -118,7 +119,7 @@ protected BTreeMap(String name, DataType keyType, DataType valueType, Map<String
// 例如节点A执行完DDL后,加入了新节点B,然后在老的节点C上执行DDL,此时节点C就可能看到节点B的host_id了 // 例如节点A执行完DDL后,加入了新节点B,然后在老的节点C上执行DDL,此时节点C就可能看到节点B的host_id了
ArrayList<Integer> hostIds = StorageServer.instance.getTopologyMetaData().sortedHostIds(); ArrayList<Integer> hostIds = StorageServer.instance.getTopologyMetaData().sortedHostIds();
if (!hostIds.isEmpty()) { if (!hostIds.isEmpty()) {
Integer hostId = hostIds.get(name.hashCode() % hostIds.size()); Integer hostId = hostIds.get(Math.abs(name.hashCode() % hostIds.size()));
List<InetAddress> replicationEndpoints = StorageServer.instance.getReplicationEndpoints(db, hostId); List<InetAddress> replicationEndpoints = StorageServer.instance.getReplicationEndpoints(db, hostId);
int size = replicationEndpoints.size(); int size = replicationEndpoints.size();
root.replicationHostIds = new ArrayList<>(size); root.replicationHostIds = new ArrayList<>(size);
Expand Down Expand Up @@ -278,13 +279,13 @@ protected Object put(BTreePage p, Object key, Object value) {
return result; return result;
} }


private void moveLeafPage(Object splitKey, BTreePage rightChindPage) { private void moveLeafPage(Object splitKey, BTreePage rightChildPage) {
if (isShardingMode if (isShardingMode
&& rightChindPage.replicationHostIds.get(0).equals(StorageServer.instance.getLocalHostId())) { && rightChildPage.replicationHostIds.get(0).equals(StorageServer.instance.getLocalHostId())) {
Integer hostId = rightChindPage.replicationHostIds.get(0); Integer hostId = rightChildPage.replicationHostIds.get(0);
Integer nextHostId = StorageServer.instance.getTopologyMetaData().getNextHostId(hostId); Integer nextHostId = StorageServer.instance.getTopologyMetaData().getNextHostId(hostId);
List<InetAddress> newReplicationEndpoints = StorageServer.instance.getReplicationEndpoints(db, nextHostId); List<InetAddress> newReplicationEndpoints = StorageServer.instance.getReplicationEndpoints(db, nextHostId);
List<InetAddress> oldReplicationEndpoints = getReplicationEndpoints(rightChindPage); List<InetAddress> oldReplicationEndpoints = getReplicationEndpoints(rightChildPage);
newReplicationEndpoints.remove(getLocalEndpoint()); newReplicationEndpoints.remove(getLocalEndpoint());
oldReplicationEndpoints.remove(getLocalEndpoint()); oldReplicationEndpoints.remove(getLocalEndpoint());
Set<InetAddress> liveMembers = Gossiper.instance.getLiveMembers(); Set<InetAddress> liveMembers = Gossiper.instance.getLiveMembers();
Expand All @@ -296,7 +297,7 @@ private void moveLeafPage(Object splitKey, BTreePage rightChindPage) {
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
moveTo.add(StorageServer.instance.getTopologyMetaData().getHostId(newReplicationEndpoints.get(i))); moveTo.add(StorageServer.instance.getTopologyMetaData().getHostId(newReplicationEndpoints.get(i)));
} }
rightChindPage.replicationHostIds = moveTo; rightChildPage.replicationHostIds = moveTo;


// move page // move page
Session session = ConnectionInfo.getAndRemoveInternalSession(); Session session = ConnectionInfo.getAndRemoveInternalSession();
Expand All @@ -307,7 +308,8 @@ private void moveLeafPage(Object splitKey, BTreePage rightChindPage) {
sessions[i++] = SessionPool.getSession(s, s.getURL(ia)); sessions[i++] = SessionPool.getSession(s, s.getURL(ia));


ReplicationSession rs = new ReplicationSession(sessions); ReplicationSession rs = new ReplicationSession(sessions);
moveLeafPage(splitKey, rightChindPage, rs, false); rs.setRpcTimeout(ConfigDescriptor.getRpcTimeout());
moveLeafPage(splitKey, rightChildPage, rs, false);


// split root page // split root page
i = 0; i = 0;
Expand All @@ -317,11 +319,12 @@ private void moveLeafPage(Object splitKey, BTreePage rightChindPage) {
sessions[i++] = SessionPool.getSession(s, s.getURL(ia)); sessions[i++] = SessionPool.getSession(s, s.getURL(ia));


rs = new ReplicationSession(sessions); rs = new ReplicationSession(sessions);
moveLeafPage(splitKey, rightChindPage, rs, true); rs.setRpcTimeout(ConfigDescriptor.getRpcTimeout());
moveLeafPage(splitKey, rightChildPage, rs, true);
} }
} }


private void moveLeafPage(Object splitKey, BTreePage rightChindPage, ReplicationSession rs, boolean remote) { private void moveLeafPage(Object splitKey, BTreePage rightChildPage, ReplicationSession rs, boolean remote) {
StorageCommand c = null; StorageCommand c = null;
try { try {
c = rs.createStorageCommand(); c = rs.createStorageCommand();
Expand All @@ -336,7 +339,7 @@ private void moveLeafPage(Object splitKey, BTreePage rightChindPage, Replication


writeBuffer.clear(); writeBuffer.clear();


rightChindPage.writeLeaf(writeBuffer, remote); rightChildPage.writeLeaf(writeBuffer, remote);
buffer = writeBuffer.getBuffer(); buffer = writeBuffer.getBuffer();
buffer.flip(); buffer.flip();
ByteBuffer pageBuffer = ByteBuffer.allocateDirect(buffer.limit()); ByteBuffer pageBuffer = ByteBuffer.allocateDirect(buffer.limit());
Expand Down Expand Up @@ -452,6 +455,7 @@ private void removeLeafPage(Object key, BTreePage leafPage) {
sessions[i++] = SessionPool.getSession(s, s.getURL(ia)); sessions[i++] = SessionPool.getSession(s, s.getURL(ia));


ReplicationSession rs = new ReplicationSession(sessions); ReplicationSession rs = new ReplicationSession(sessions);
rs.setRpcTimeout(ConfigDescriptor.getRpcTimeout());
StorageCommand c = null; StorageCommand c = null;
try { try {
c = rs.createStorageCommand(); c = rs.createStorageCommand();
Expand Down Expand Up @@ -770,17 +774,20 @@ public InetAddress getLocalEndpoint() {
} }


@Override @Override
public Object put(Object key, Object value, DataType valueType, Session session) { public Object put(Object key, Object value, Session session) {
List<InetAddress> replicationEndpoints = getReplicationEndpoints(key); List<InetAddress> replicationEndpoints = getReplicationEndpoints(key);
InetAddress localEndpoint = getLocalEndpoint(); InetAddress localEndpoint = getLocalEndpoint();


Session[] sessions = new Session[replicationEndpoints.size()]; Session[] sessions = new Session[replicationEndpoints.size()];
ServerSession s = (ServerSession) session; ServerSession s = (ServerSession) session;
int i = 0; int i = 0;
for (InetAddress ia : replicationEndpoints) for (InetAddress ia : replicationEndpoints) {
sessions[i++] = SessionPool.getSession(s, s.getURL(ia), !localEndpoint.equals(ia)); sessions[i++] = localEndpoint.equals(ia) ? s
: SessionPool.getSession(s, s.getURL(ia), !localEndpoint.equals(ia));
}


ReplicationSession rs = new ReplicationSession(sessions); ReplicationSession rs = new ReplicationSession(sessions);
rs.setRpcTimeout(ConfigDescriptor.getRpcTimeout());
StorageCommand c = null; StorageCommand c = null;
try { try {
c = rs.createStorageCommand(); c = rs.createStorageCommand();
Expand Down Expand Up @@ -824,6 +831,7 @@ public Object get(Object key, Session session) {
sessions[i++] = SessionPool.getSession(s, s.getURL(ia), !localEndpoint.equals(ia)); sessions[i++] = SessionPool.getSession(s, s.getURL(ia), !localEndpoint.equals(ia));


ReplicationSession rs = new ReplicationSession(sessions); ReplicationSession rs = new ReplicationSession(sessions);
rs.setRpcTimeout(ConfigDescriptor.getRpcTimeout());
StorageCommand c = null; StorageCommand c = null;
try { try {
c = rs.createStorageCommand(); c = rs.createStorageCommand();
Expand Down
46 changes: 36 additions & 10 deletions lealone-client/src/main/java/org/lealone/client/ClientCommand.java
Expand Up @@ -9,6 +9,7 @@
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.sql.ResultSetMetaData; import java.sql.ResultSetMetaData;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicReference;


import org.lealone.api.ErrorCode; import org.lealone.api.ErrorCode;
import org.lealone.async.AsyncHandler; import org.lealone.async.AsyncHandler;
Expand Down Expand Up @@ -447,12 +448,25 @@ public Object executePut(String replicationName, String mapName, ByteBuffer key,
transfer.writeInt(session.getSessionId()).writeString(mapName).writeByteBuffer(key).writeByteBuffer(value); transfer.writeInt(session.getSessionId()).writeString(mapName).writeByteBuffer(key).writeByteBuffer(value);
if (replicationName != null) if (replicationName != null)
transfer.writeString(replicationName); transfer.writeString(replicationName);
transfer.flush();

if (isDistributedUpdate)
session.getTransaction().addLocalTransactionNames(transfer.readString());


bytes = transfer.readBytes(); AtomicReference<byte[]> resultRef = new AtomicReference<>();
AsyncCallback<Void> ac = new AsyncCallback<Void>() {
@Override
public void runInternal() {
try {
if (isDistributedUpdate)
session.getTransaction().addLocalTransactionNames(transfer.readString());
byte[] bytes = transfer.readBytes();
resultRef.set(bytes);
} catch (IOException e) {
throw DbException.convert(e);
}
}
};
transfer.addAsyncCallback(id, ac);
transfer.flush();
ac.await();
bytes = resultRef.get();
} catch (Exception e) { } catch (Exception e) {
session.handleException(e); session.handleException(e);
} }
Expand All @@ -473,12 +487,24 @@ public Object executeGet(String mapName, ByteBuffer key) {
transfer.writeRequestHeader(id, Session.COMMAND_STORAGE_GET); transfer.writeRequestHeader(id, Session.COMMAND_STORAGE_GET);
} }
transfer.writeInt(session.getSessionId()).writeString(mapName).writeByteBuffer(key); transfer.writeInt(session.getSessionId()).writeString(mapName).writeByteBuffer(key);
AtomicReference<byte[]> resultRef = new AtomicReference<>();
AsyncCallback<Void> ac = new AsyncCallback<Void>() {
@Override
public void runInternal() {
try {
if (isDistributedUpdate)
session.getTransaction().addLocalTransactionNames(transfer.readString());
byte[] bytes = transfer.readBytes();
resultRef.set(bytes);
} catch (IOException e) {
throw DbException.convert(e);
}
}
};
transfer.addAsyncCallback(id, ac);
transfer.flush(); transfer.flush();

ac.await();
if (isDistributedUpdate) bytes = resultRef.get();
session.getTransaction().addLocalTransactionNames(transfer.readString());

bytes = transfer.readBytes();
} catch (Exception e) { } catch (Exception e) {
session.handleException(e); session.handleException(e);
} }
Expand Down
2 changes: 1 addition & 1 deletion lealone-common/src/main/java/org/lealone/db/Constants.java
Expand Up @@ -178,7 +178,7 @@ public class Constants {
/** /**
* The default page size to use for new databases. * The default page size to use for new databases.
*/ */
public static final int DEFAULT_PAGE_SIZE = 2048; public static final int DEFAULT_PAGE_SIZE = 16 * 1024; // 16K;


/** /**
* The default result set concurrency for statements created with * The default result set concurrency for statements created with
Expand Down
Expand Up @@ -22,15 +22,14 @@
import java.util.List; import java.util.List;


import org.lealone.db.Session; import org.lealone.db.Session;
import org.lealone.storage.type.DataType;


public interface Replication { public interface Replication {


List<InetAddress> getReplicationEndpoints(Object key); List<InetAddress> getReplicationEndpoints(Object key);


InetAddress getLocalEndpoint(); InetAddress getLocalEndpoint();


Object put(Object key, Object value, DataType valueType, Session session); Object put(Object key, Object value, Session session);


Object get(Object key, Session session); Object get(Object key, Session session);


Expand Down
Expand Up @@ -169,7 +169,7 @@ public void run() {
} }


try { try {
return writeResponseHandler.get(session.rpcTimeoutMillis); return writeResponseHandler.getUpdateCount(session.rpcTimeoutMillis);
} catch (WriteTimeoutException | WriteFailureException e) { } catch (WriteTimeoutException | WriteFailureException e) {
if (tries < session.maxRries) if (tries < session.maxRries)
return executeUpdate(++tries); return executeUpdate(++tries);
Expand Down Expand Up @@ -216,7 +216,7 @@ private Object executePut(final String mapName, final ByteBuffer key, final Byte
@Override @Override
public void run() { public void run() {
try { try {
writeResponseHandler.response(c.executePut(rn, mapName, key, value)); writeResponseHandler.response(c.executePut(rn, mapName, key.slice(), value.slice()));
} catch (Exception e) { } catch (Exception e) {
if (writeResponseHandler != null) if (writeResponseHandler != null)
writeResponseHandler.onFailure(); writeResponseHandler.onFailure();
Expand All @@ -232,11 +232,13 @@ public void run() {
} }


try { try {
return writeResponseHandler.get(session.rpcTimeoutMillis); return writeResponseHandler.getResult(session.rpcTimeoutMillis);
} catch (WriteTimeoutException | WriteFailureException e) { } catch (WriteTimeoutException | WriteFailureException e) {
if (tries < session.maxRries) if (tries < session.maxRries) {
return executeUpdate(++tries); key.rewind();
else { value.rewind();
return executePut(mapName, key, value, ++tries);
} else {
if (!exceptions.isEmpty()) if (!exceptions.isEmpty())
e.initCause(exceptions.get(0)); e.initCause(exceptions.get(0));
throw e; throw e;
Expand Down
Expand Up @@ -74,7 +74,7 @@ void onFailure() {
signal(); signal();
} }


int get(long rpcTimeoutMillis) { void await(long rpcTimeoutMillis) {
long requestTimeout = rpcTimeoutMillis; long requestTimeout = rpcTimeoutMillis;


// 超时时间把调用构造函数开始直到调用get前的这段时间也算在内 // 超时时间把调用构造函数开始直到调用get前的这段时间也算在内
Expand All @@ -101,10 +101,18 @@ int get(long rpcTimeoutMillis) {
if (!successful && totalBlockFor() + failures >= totalEndpoints()) { if (!successful && totalBlockFor() + failures >= totalEndpoints()) {
throw new WriteFailureException(ConsistencyLevel.QUORUM, ackCount(), failures, totalBlockFor()); throw new WriteFailureException(ConsistencyLevel.QUORUM, ackCount(), failures, totalBlockFor());
} }
}


int getUpdateCount(long rpcTimeoutMillis) {
await(rpcTimeoutMillis);
return updateCountList.get(0); return updateCountList.get(0);
} }


Object getResult(long rpcTimeoutMillis) {
await(rpcTimeoutMillis);
return resultList.get(0);
}

private void signal() { private void signal() {
condition.signalAll(); condition.signalAll();
} }
Expand Down
4 changes: 2 additions & 2 deletions lealone-db/src/main/java/org/lealone/db/Database.java
Expand Up @@ -117,7 +117,7 @@ public Object getAuthLock() {
private int nextSessionId; private int nextSessionId;
private int nextTempTableId; private int nextTempTableId;
private User systemUser; private User systemUser;
private ServerSession systemSession; private SystemSession systemSession;
private Table meta; private Table meta;
private Index metaIdIndex; private Index metaIdIndex;
private boolean starting; private boolean starting;
Expand Down Expand Up @@ -381,7 +381,7 @@ private void openDatabase() {
schemas.put(mainSchema.getName(), mainSchema); schemas.put(mainSchema.getName(), mainSchema);
schemas.put(infoSchema.getName(), infoSchema); schemas.put(infoSchema.getName(), infoSchema);


systemSession = new ServerSession(this, systemUser, ++nextSessionId); systemSession = new SystemSession(this, systemUser, ++nextSessionId);


// long t1 = System.currentTimeMillis(); // long t1 = System.currentTimeMillis();
openMetaTable(); openMetaTable();
Expand Down

0 comments on commit db029f6

Please sign in to comment.