Skip to content

Commit

Permalink
JDBC Client依据所连数据库的运行模式自动选择合适的节点
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Mar 11, 2017
1 parent a631cff commit f43963f
Show file tree
Hide file tree
Showing 16 changed files with 370 additions and 26 deletions.
23 changes: 23 additions & 0 deletions lealone-aose/src/main/java/org/lealone/aose/router/P2PRouter.java
Expand Up @@ -33,6 +33,7 @@
import org.lealone.api.ErrorCode;
import org.lealone.common.exceptions.DbException;
import org.lealone.db.Command;
import org.lealone.db.Constants;
import org.lealone.db.Database;
import org.lealone.db.LealoneDatabase;
import org.lealone.db.RunMode;
Expand Down Expand Up @@ -202,4 +203,26 @@ public int createDatabase(Database db, ServerSession currentSession) {
}
}

@Override
public String[] getEndpoints(Database db) {
Set<InetAddress> liveMembers;
int[] hostIds = db.getHostIds();
if (hostIds.length == 0) {
liveMembers = Gossiper.instance.getLiveMembers();
} else {
liveMembers = new HashSet<>(hostIds.length);
for (int hostId : hostIds) {
liveMembers.add(StorageServer.instance.getTopologyMetaData().getEndpointForHostId(hostId));
}
}

String[] endpoints = new String[liveMembers.size()];
int i = 0;
for (InetAddress inetAddress : liveMembers) {
// TODO 如何不用默认端口?
endpoints[i++] = inetAddress.getHostAddress() + ":" + Constants.DEFAULT_TCP_PORT;
}
return endpoints;
}

}
Expand Up @@ -104,4 +104,9 @@ public int[] getHostIds(Database db) {
public int createDatabase(Database db, ServerSession currentSession) {
return nestedRouter.createDatabase(db, currentSession);
}

@Override
public String[] getEndpoints(Database db) {
return nestedRouter.getEndpoints(db);
}
}
41 changes: 38 additions & 3 deletions lealone-client/src/main/java/org/lealone/client/ClientSession.java
Expand Up @@ -26,6 +26,7 @@
import org.lealone.db.ConnectionInfo;
import org.lealone.db.Constants;
import org.lealone.db.DataHandler;
import org.lealone.db.RunMode;
import org.lealone.db.Session;
import org.lealone.db.SessionBase;
import org.lealone.db.SetTypes;
Expand Down Expand Up @@ -101,18 +102,49 @@ public int getSessionId() {
*/
@Override
public Session connectEmbeddedOrServer() {
return connectEmbeddedOrServer(true);
}

@Override
public Session connectEmbeddedOrServer(boolean first) {
if (ci.isRemote()) {
connectServer();
if (first) {
if (getRunMode() == RunMode.REPLICATION) {
ConnectionInfo ci = this.ci;
String[] servers = StringUtils.arraySplit(getTargetEndpoints(), ',', true);
int size = servers.length;
Session[] sessions = new ClientSession[size];
for (int i = 0; i < size; i++) {
ci = this.ci.copy(servers[i]);
sessions[i] = new ClientSession(ci);
sessions[i] = sessions[i].connectEmbeddedOrServer(false);
}
return new ReplicationSession(sessions);
}
if (isInvalid()) {
switch (getRunMode()) {
case CLIENT_SERVER:
case SHARDING: {
ConnectionInfo ci = this.ci.copy(getTargetEndpoints());
ClientSession session = new ClientSession(ci);
return session.connectEmbeddedOrServer(false);
}
default:
return this;
}
}
}
return this;
} else if (ci.isReplicaSetMode()) {
ConnectionInfo ci = this.ci;
String[] servers = StringUtils.arraySplit(ci.getServers(), ',', true);
int size = servers.length;
Session[] sessions = new ClientSession[size];
for (int i = 0; i < size; i++) {
ci = this.ci.copyForReplicaSet(servers[i]);
ci = this.ci.copy(servers[i]);
sessions[i] = new ClientSession(ci);
sessions[i] = sessions[i].connectEmbeddedOrServer();
sessions[i] = sessions[i].connectEmbeddedOrServer(false);
}
return new ReplicationSession(sessions);
}
Expand Down Expand Up @@ -217,6 +249,7 @@ private Transfer initTransfer(ConnectionInfo ci, String server) throws Exception
if (res.succeeded()) {
NetSocket socket = res.result();
asyncConnection = new AsyncConnection(socket, false);
asyncConnection.setHostAndPort(hostAndPort);
asyncConnections.put(hostAndPort, asyncConnection);
socket.handler(asyncConnection);
latch.countDown();
Expand Down Expand Up @@ -349,11 +382,13 @@ public void close() {

synchronized (ClientSession.class) {
if (asyncConnection.isEmpty()) {
asyncConnections.remove(asyncConnection.getHostAndPort());
}
if (asyncConnections.isEmpty()) {
client.close();
vertx.close();
client = null;
vertx = null;
asyncConnections.clear();
}
}
} catch (RuntimeException e) {
Expand Down
Expand Up @@ -732,7 +732,7 @@ public boolean isReplicaSetMode() {
return isReplicaSetMode;
}

public ConnectionInfo copyForReplicaSet(String newServer) {
public ConnectionInfo copy(String newServer) {
ConnectionInfo ci = new ConnectionInfo();
ci.prop.putAll(prop);
StringBuilder buff = new StringBuilder(Constants.URL_PREFIX);
Expand Down
13 changes: 13 additions & 0 deletions lealone-common/src/main/java/org/lealone/db/Session.java
Expand Up @@ -162,6 +162,8 @@ public interface Session extends Closeable, Transaction.Participant {

Session connectEmbeddedOrServer();

Session connectEmbeddedOrServer(boolean first);

String getURL();

String getReplicationName();
Expand All @@ -188,4 +190,15 @@ public interface Session extends Closeable, Transaction.Participant {

SessionStatus getStatus();

void setInvalid(boolean v);

boolean isInvalid();

void setTargetEndpoints(String targetEndpoints);

String getTargetEndpoints();

void setRunMode(RunMode runMode);

RunMode getRunMode();
}
34 changes: 34 additions & 0 deletions lealone-common/src/main/java/org/lealone/db/SessionBase.java
Expand Up @@ -29,6 +29,10 @@ public abstract class SessionBase implements Session {
protected AtomicInteger nextId = new AtomicInteger(0);
protected Runnable runnable;

protected String targetEndpoints;
protected RunMode runMode;
protected boolean invalid;

@Override
public String getReplicationName() {
return replicationName;
Expand Down Expand Up @@ -91,4 +95,34 @@ public void prepareCommit() {
public SessionStatus getStatus() {
return SessionStatus.NO_TRANSACTION;
}

@Override
public void setInvalid(boolean v) {
invalid = v;
}

@Override
public boolean isInvalid() {
return invalid;
}

@Override
public void setTargetEndpoints(String targetEndpoints) {
this.targetEndpoints = targetEndpoints;
}

@Override
public String getTargetEndpoints() {
return targetEndpoints;
}

@Override
public void setRunMode(RunMode runMode) {
this.runMode = runMode;
}

@Override
public RunMode getRunMode() {
return runMode;
}
}
Expand Up @@ -218,6 +218,11 @@ public Session connectEmbeddedOrServer() {
return sessions[0].connectEmbeddedOrServer();
}

@Override
public Session connectEmbeddedOrServer(boolean first) {
return sessions[0].connectEmbeddedOrServer(first);
}

@Override
public String getURL() {
return sessions[0].getURL();
Expand Down
20 changes: 19 additions & 1 deletion lealone-db/src/main/java/org/lealone/db/Database.java
Expand Up @@ -178,7 +178,7 @@ public Object getAuthLock() {
private Map<String, String> replicationProperties;
private ReplicationPropertiesChangeListener replicationPropertiesChangeListener;

private RunMode runMode;
private RunMode runMode = RunMode.CLIENT_SERVER;

public Database(int id, String name, Map<String, String> parameters) {
this.id = id;
Expand Down Expand Up @@ -2272,6 +2272,24 @@ public int[] getHostIds() {
return hostIds;
}

private String[] endpoints;

public String[] getEndpoints() {
if (endpoints == null) {
if (parameters != null && parameters.containsKey("endpoints")) {
endpoints = StringUtils.arraySplit(parameters.get("endpoints"), ',', true);
}
}
return endpoints;
}

public String getTargetEndpoints() {
if (parameters != null) {
return parameters.get("endpoints");
}
return null;
}

private java.sql.PreparedStatement psGetVersion;
private java.sql.PreparedStatement psUpdateVersion;

Expand Down
54 changes: 52 additions & 2 deletions lealone-db/src/main/java/org/lealone/db/DatabaseEngine.java
Expand Up @@ -19,6 +19,8 @@
*/
package org.lealone.db;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;

import org.lealone.api.ErrorCode;
Expand Down Expand Up @@ -86,6 +88,9 @@ public synchronized ServerSession createSession(ConnectionInfo ci) {
// ignore
}
}
if (session.isInvalid()) {
return session;
}

initSession(session, ci);
validateUserAndPassword(true);
Expand Down Expand Up @@ -121,10 +126,53 @@ private ServerSession createSession(String dbName, ConnectionInfo ci, boolean if
// 把当前连接进来的用户当成Admin
user = database.createAdminUser(userName, userPasswordHash);
}
}

String url = ci.getURL();
int pos1 = url.indexOf("//") + 2;
String host;
String port;
int pos2 = url.indexOf(':', pos1);
int pos3 = url.indexOf('/', pos1);
if (pos2 != -1) {
host = url.substring(pos1, pos2);
port = url.substring(pos2 + 1, pos3);
} else {
if (!database.isInitialized())
database.init();
host = url.substring(pos1, pos3);
port = String.valueOf(Constants.DEFAULT_TCP_PORT);
}
String currentEndpoint;
try {
currentEndpoint = InetAddress.getByName(host).getHostAddress() + ":" + port;
} catch (UnknownHostException e) {
throw DbException.convert(e);
}
String targetEndpoints;
String[] endpoints = database.getEndpoints();
boolean isTargetEndpoint = false;
if (endpoints != null) {
for (String e : endpoints) {
if (e.equalsIgnoreCase(currentEndpoint)) {
isTargetEndpoint = true;
break;
}
}
targetEndpoints = database.getTargetEndpoints();
} else {
isTargetEndpoint = true;
targetEndpoints = currentEndpoint;
}
if (!isTargetEndpoint) {
ServerSession session = new ServerSession(database,
LealoneDatabase.getInstance().getSystemSession().getUser(), 0);
session.setTargetEndpoints(targetEndpoints);
session.setRunMode(database.getRunMode());
session.setInvalid(true);
return session;
}

if (!database.isInitialized())
database.init();

synchronized (database) {
if (opened) {
Expand Down Expand Up @@ -157,6 +205,8 @@ private ServerSession createSession(String dbName, ConnectionInfo ci, boolean if
}
ServerSession session = database.createSession(user);
session.setConnectionInfo(ci);
session.setTargetEndpoints(targetEndpoints);
session.setRunMode(database.getRunMode());
return session;
}
}
Expand Down
5 changes: 5 additions & 0 deletions lealone-db/src/main/java/org/lealone/db/ServerSession.java
Expand Up @@ -1327,6 +1327,11 @@ public Session connectEmbeddedOrServer() {
return this;
}

@Override
public Session connectEmbeddedOrServer(boolean first) {
return this;
}

@Override
public String getURL() {
return connectionInfo == null ? null : connectionInfo.getURL();
Expand Down

0 comments on commit f43963f

Please sign in to comment.