Skip to content

Commit

Permalink
ClientSession只处理与远程服务器的连接信息不再包含嵌入式模式的代码逻辑
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Mar 22, 2017
1 parent 7129c14 commit ef97485
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 65 deletions.
26 changes: 11 additions & 15 deletions lealone-client/src/main/java/org/lealone/client/ClientSession.java
Expand Up @@ -84,9 +84,12 @@ public class ClientSession extends SessionBase implements DataHandler, Transacti
private AsyncConnection asyncConnection; private AsyncConnection asyncConnection;
private InetSocketAddress inetSocketAddress; private InetSocketAddress inetSocketAddress;


public ClientSession(ConnectionInfo ci) { ClientSession(ConnectionInfo ci) {
if (!ci.isRemote()) {
throw DbException.throwInternalError();
}
this.ci = ci; this.ci = ci;
if (ci.isRemote() && vertx == null) { if (vertx == null) {
synchronized (ClientSession.class) { synchronized (ClientSession.class) {
if (vertx == null) { if (vertx == null) {
vertx = NetFactory.getVertx(ci.getProperties()); vertx = NetFactory.getVertx(ci.getProperties());
Expand All @@ -103,24 +106,17 @@ public int getSessionId() {
} }


/** /**
* Open a new (remote or embedded) session. * Open a new session.
* *
* @return the session * @return the session
*/ */
@Override @Override
public Session connectEmbeddedOrServer() { public Session connect() {
return connectEmbeddedOrServer(true); return connect(true);
} }


@Override @Override
public Session connectEmbeddedOrServer(boolean first) { public Session connect(boolean first) {
if (!ci.isRemote()) {
try {
return ci.getSessionFactory().createSession(ci);
} catch (Exception e) {
throw DbException.convert(e);
}
}
connectServer(); connectServer();
if (first) { if (first) {
if (getRunMode() == RunMode.REPLICATION) { if (getRunMode() == RunMode.REPLICATION) {
Expand All @@ -139,7 +135,7 @@ public Session connectEmbeddedOrServer(boolean first) {
} }
ci = this.ci.copy(servers[i]); ci = this.ci.copy(servers[i]);
sessions[i] = new ClientSession(ci); sessions[i] = new ClientSession(ci);
sessions[i] = sessions[i].connectEmbeddedOrServer(false); sessions[i] = sessions[i].connect(false);
} }
return new ReplicationSession(sessions); return new ReplicationSession(sessions);
} }
Expand All @@ -150,7 +146,7 @@ public Session connectEmbeddedOrServer(boolean first) {
ConnectionInfo ci = this.ci.copy(getTargetEndpoints()); ConnectionInfo ci = this.ci.copy(getTargetEndpoints());
ClientSession session = new ClientSession(ci); ClientSession session = new ClientSession(ci);
this.close(); // 关闭当前session,因为连到的节点不是所要的 this.close(); // 关闭当前session,因为连到的节点不是所要的
return session.connectEmbeddedOrServer(false); return session.connect(false);
} }
default: default:
return this; return this;
Expand Down
Expand Up @@ -31,7 +31,6 @@
import java.util.concurrent.Executor; import java.util.concurrent.Executor;


import org.lealone.api.ErrorCode; import org.lealone.api.ErrorCode;
import org.lealone.client.ClientSession;
import org.lealone.common.exceptions.DbException; import org.lealone.common.exceptions.DbException;
import org.lealone.common.trace.TraceObject; import org.lealone.common.trace.TraceObject;
import org.lealone.common.util.CloseWatcher; import org.lealone.common.util.CloseWatcher;
Expand Down Expand Up @@ -88,15 +87,8 @@ public class JdbcConnection extends TraceObject implements Connection {
public JdbcConnection(String url, Properties info) throws SQLException { public JdbcConnection(String url, Properties info) throws SQLException {
try { try {
ConnectionInfo ci = new ConnectionInfo(url, info); ConnectionInfo ci = new ConnectionInfo(url, info);
if (!ci.isRemote()) {
String baseDir = SysProperties.getBaseDirSilently();
if (baseDir != null) {
ci.setBaseDir(baseDir);
}
}

// this will return an embedded or server connection // this will return an embedded or server connection
session = new ClientSession(ci).connectEmbeddedOrServer(); session = ci.createSession().connect();
trace = session.getTrace(); trace = session.getTrace();
int id = getNextTraceId(TraceObject.CONNECTION); int id = getNextTraceId(TraceObject.CONNECTION);
setTrace(trace, TraceObject.CONNECTION, id); setTrace(trace, TraceObject.CONNECTION, id);
Expand Down Expand Up @@ -184,8 +176,8 @@ public Statement createStatement(int resultSetType, int resultSetConcurrency) th
try { try {
int id = getNextTraceId(TraceObject.STATEMENT); int id = getNextTraceId(TraceObject.STATEMENT);
if (isDebugEnabled()) { if (isDebugEnabled()) {
debugCodeAssign("Statement", TraceObject.STATEMENT, id, "createStatement(" + resultSetType + ", " debugCodeAssign("Statement", TraceObject.STATEMENT, id,
+ resultSetConcurrency + ")"); "createStatement(" + resultSetType + ", " + resultSetConcurrency + ")");
} }
checkTypeConcurrency(resultSetType, resultSetConcurrency); checkTypeConcurrency(resultSetType, resultSetConcurrency);
checkClosed(); checkClosed();
Expand Down Expand Up @@ -236,8 +228,8 @@ public PreparedStatement prepareStatement(String sql) throws SQLException {
try { try {
int id = getNextTraceId(TraceObject.PREPARED_STATEMENT); int id = getNextTraceId(TraceObject.PREPARED_STATEMENT);
if (isDebugEnabled()) { if (isDebugEnabled()) {
debugCodeAssign("PreparedStatement", TraceObject.PREPARED_STATEMENT, id, "prepareStatement(" debugCodeAssign("PreparedStatement", TraceObject.PREPARED_STATEMENT, id,
+ quote(sql) + ")"); "prepareStatement(" + quote(sql) + ")");
} }
checkClosed(); checkClosed();
sql = translateSQL(sql); sql = translateSQL(sql);
Expand All @@ -259,8 +251,8 @@ PreparedStatement prepareAutoCloseStatement(String sql) throws SQLException {
try { try {
int id = getNextTraceId(TraceObject.PREPARED_STATEMENT); int id = getNextTraceId(TraceObject.PREPARED_STATEMENT);
if (isDebugEnabled()) { if (isDebugEnabled()) {
debugCodeAssign("PreparedStatement", TraceObject.PREPARED_STATEMENT, id, "prepareStatement(" debugCodeAssign("PreparedStatement", TraceObject.PREPARED_STATEMENT, id,
+ quote(sql) + ")"); "prepareStatement(" + quote(sql) + ")");
} }
checkClosed(); checkClosed();
sql = translateSQL(sql); sql = translateSQL(sql);
Expand Down Expand Up @@ -591,8 +583,8 @@ public PreparedStatement prepareStatement(String sql, int resultSetType, int res
try { try {
int id = getNextTraceId(TraceObject.PREPARED_STATEMENT); int id = getNextTraceId(TraceObject.PREPARED_STATEMENT);
if (isDebugEnabled()) { if (isDebugEnabled()) {
debugCodeAssign("PreparedStatement", TraceObject.PREPARED_STATEMENT, id, "prepareStatement(" debugCodeAssign("PreparedStatement", TraceObject.PREPARED_STATEMENT, id,
+ quote(sql) + ", " + resultSetType + ", " + resultSetConcurrency + ")"); "prepareStatement(" + quote(sql) + ", " + resultSetType + ", " + resultSetConcurrency + ")");
} }
checkTypeConcurrency(resultSetType, resultSetConcurrency); checkTypeConcurrency(resultSetType, resultSetConcurrency);
checkClosed(); checkClosed();
Expand Down Expand Up @@ -819,8 +811,8 @@ public CallableStatement prepareCall(String sql) throws SQLException {
try { try {
int id = getNextTraceId(TraceObject.CALLABLE_STATEMENT); int id = getNextTraceId(TraceObject.CALLABLE_STATEMENT);
if (isDebugEnabled()) { if (isDebugEnabled()) {
debugCodeAssign("CallableStatement", TraceObject.CALLABLE_STATEMENT, id, "prepareCall(" + quote(sql) debugCodeAssign("CallableStatement", TraceObject.CALLABLE_STATEMENT, id,
+ ")"); "prepareCall(" + quote(sql) + ")");
} }
checkClosed(); checkClosed();
sql = translateSQL(sql); sql = translateSQL(sql);
Expand Down Expand Up @@ -848,8 +840,8 @@ public CallableStatement prepareCall(String sql, int resultSetType, int resultSe
try { try {
int id = getNextTraceId(TraceObject.CALLABLE_STATEMENT); int id = getNextTraceId(TraceObject.CALLABLE_STATEMENT);
if (isDebugEnabled()) { if (isDebugEnabled()) {
debugCodeAssign("CallableStatement", TraceObject.CALLABLE_STATEMENT, id, "prepareCall(" + quote(sql) debugCodeAssign("CallableStatement", TraceObject.CALLABLE_STATEMENT, id,
+ ", " + resultSetType + ", " + resultSetConcurrency + ")"); "prepareCall(" + quote(sql) + ", " + resultSetType + ", " + resultSetConcurrency + ")");
} }
checkTypeConcurrency(resultSetType, resultSetConcurrency); checkTypeConcurrency(resultSetType, resultSetConcurrency);
checkClosed(); checkClosed();
Expand Down Expand Up @@ -996,9 +988,9 @@ public PreparedStatement prepareStatement(String sql, int resultSetType, int res
try { try {
int id = getNextTraceId(TraceObject.PREPARED_STATEMENT); int id = getNextTraceId(TraceObject.PREPARED_STATEMENT);
if (isDebugEnabled()) { if (isDebugEnabled()) {
debugCodeAssign("PreparedStatement", TraceObject.PREPARED_STATEMENT, id, "prepareStatement(" debugCodeAssign("PreparedStatement", TraceObject.PREPARED_STATEMENT, id,
+ quote(sql) + ", " + resultSetType + ", " + resultSetConcurrency + ", " + resultSetHoldability "prepareStatement(" + quote(sql) + ", " + resultSetType + ", " + resultSetConcurrency + ", "
+ ")"); + resultSetHoldability + ")");
} }
checkTypeConcurrency(resultSetType, resultSetConcurrency); checkTypeConcurrency(resultSetType, resultSetConcurrency);
checkHoldability(resultSetHoldability); checkHoldability(resultSetHoldability);
Expand Down Expand Up @@ -1419,8 +1411,8 @@ public Blob createBlob() throws SQLException {
int id = getNextTraceId(TraceObject.BLOB); int id = getNextTraceId(TraceObject.BLOB);
debugCodeAssign("Blob", TraceObject.BLOB, id, "createClob()"); debugCodeAssign("Blob", TraceObject.BLOB, id, "createClob()");
checkClosed(); checkClosed();
Value v = session.getDataHandler().getLobStorage() Value v = session.getDataHandler().getLobStorage().createBlob(new ByteArrayInputStream(Utils.EMPTY_BYTES),
.createBlob(new ByteArrayInputStream(Utils.EMPTY_BYTES), 0); 0);
return new JdbcBlob(this, v, id); return new JdbcBlob(this, v, id);
} catch (Exception e) { } catch (Exception e) {
throw logAndConvert(e); throw logAndConvert(e);
Expand Down
30 changes: 21 additions & 9 deletions lealone-common/src/main/java/org/lealone/db/ConnectionInfo.java
Expand Up @@ -7,6 +7,7 @@
package org.lealone.db; package org.lealone.db;


import java.io.IOException; import java.io.IOException;
import java.sql.SQLException;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
import java.util.Properties; import java.util.Properties;
Expand Down Expand Up @@ -101,7 +102,6 @@ private static boolean isKnownSetting(String s) {


private SessionFactory sessionFactory; private SessionFactory sessionFactory;


private boolean isClient;
private Boolean persistent; // 首次调用isPersistent()时才初始化 private Boolean persistent; // 首次调用isPersistent()时才初始化


public ConnectionInfo() { public ConnectionInfo() {
Expand All @@ -118,14 +118,15 @@ public ConnectionInfo(String url, String dbName) { // 用于server端, 不需要
this.dbName = dbName; this.dbName = dbName;


checkURL(); checkURL();
embedded = false;
url = url.substring(Constants.URL_PREFIX.length()); url = url.substring(Constants.URL_PREFIX.length());
// server端接收到的URL不可能是嵌入式的 // server端接收到的URL不可能是嵌入式的
if (url.startsWith(Constants.URL_EMBED)) { if (url.startsWith(Constants.URL_EMBED)) {
throw DbException.throwInternalError("Server backend URL: " + this.url); throw DbException.throwInternalError("Server backend URL: " + this.url);
} }


remote = true; // server端的remote总是true // server端这两参数总是false
embedded = false;
remote = false;
} }


public ConnectionInfo(String url) { public ConnectionInfo(String url) {
Expand All @@ -148,6 +149,13 @@ public ConnectionInfo(String url, Properties prop) { // 用于client端,需要


setUserName(removeProperty("USER", "")); setUserName(removeProperty("USER", ""));
convertPasswords(); convertPasswords();

if (isEmbedded() && isPersistent()) {
String baseDir = SysProperties.getBaseDirSilently();
if (baseDir != null) {
setBaseDir(baseDir);
}
}
} }


private void checkURL() { private void checkURL() {
Expand Down Expand Up @@ -309,6 +317,10 @@ public boolean isRemote() {
return remote; return remote;
} }


public void setRemote(boolean b) {
remote = b;
}

/** /**
* Check if the referenced database is persistent. * Check if the referenced database is persistent.
* *
Expand Down Expand Up @@ -667,11 +679,15 @@ private DbException getFormatException() {
return DbException.get(ErrorCode.URL_FORMAT_ERROR_2, Constants.URL_FORMAT, url); return DbException.get(ErrorCode.URL_FORMAT_ERROR_2, Constants.URL_FORMAT, url);
} }


public Session createSession() throws SQLException {
return getSessionFactory().createSession(this);
}

public SessionFactory getSessionFactory() { public SessionFactory getSessionFactory() {
if (sessionFactory == null) { if (sessionFactory == null) {
try { try {
// 要使用反射,避免编译期依赖 // 要使用反射,避免编译期依赖
if (isClient) if (remote)
sessionFactory = (SessionFactory) Class.forName("org.lealone.client.ClientSessionFactory") sessionFactory = (SessionFactory) Class.forName("org.lealone.client.ClientSessionFactory")
.getMethod("getInstance").invoke(null); .getMethod("getInstance").invoke(null);
else else
Expand Down Expand Up @@ -711,10 +727,6 @@ public Properties getProperties() {
return prop; return prop;
} }


public void setClient(boolean b) {
isClient = b;
}

public ConnectionInfo copy(String newServer) { public ConnectionInfo copy(String newServer) {
ConnectionInfo ci = new ConnectionInfo(); ConnectionInfo ci = new ConnectionInfo();
ci.prop.putAll(prop); ci.prop.putAll(prop);
Expand All @@ -732,7 +744,7 @@ public ConnectionInfo copy(String newServer) {
ci.embedded = embedded; ci.embedded = embedded;
ci.servers = newServer; ci.servers = newServer;
ci.sessionFactory = sessionFactory; ci.sessionFactory = sessionFactory;
ci.isClient = isClient; ci.persistent = persistent;
return ci; return ci;
} }
} }
4 changes: 2 additions & 2 deletions lealone-common/src/main/java/org/lealone/db/Session.java
Expand Up @@ -162,9 +162,9 @@ public interface Session extends Closeable, Transaction.Participant {


void commit(String allLocalTransactionNames); void commit(String allLocalTransactionNames);


Session connectEmbeddedOrServer(); Session connect();


Session connectEmbeddedOrServer(boolean first); Session connect(boolean first);


String getURL(); String getURL();


Expand Down
Expand Up @@ -238,13 +238,13 @@ public void commit(String allLocalTransactionNames) {
} }


@Override @Override
public Session connectEmbeddedOrServer() { public Session connect() {
return sessions[0].connectEmbeddedOrServer(); return sessions[0].connect();
} }


@Override @Override
public Session connectEmbeddedOrServer(boolean first) { public Session connect(boolean first) {
return sessions[0].connectEmbeddedOrServer(first); return sessions[0].connect(first);
} }


@Override @Override
Expand Down
4 changes: 2 additions & 2 deletions lealone-db/src/main/java/org/lealone/db/ServerSession.java
Expand Up @@ -1310,12 +1310,12 @@ public void setTransaction(Transaction transaction) {
} }


@Override @Override
public Session connectEmbeddedOrServer() { public Session connect() {
return this; return this;
} }


@Override @Override
public Session connectEmbeddedOrServer(boolean first) { public Session connect(boolean first) {
return this; return this;
} }


Expand Down
8 changes: 5 additions & 3 deletions lealone-db/src/main/java/org/lealone/db/SessionPool.java
Expand Up @@ -66,10 +66,12 @@ public static Session getSession(ServerSession originalSession, String url, bool
ci.setFilePasswordHash(oldCi.getFilePasswordHash()); ci.setFilePasswordHash(oldCi.getFilePasswordHash());
ci.setFileEncryptionKey(oldCi.getFileEncryptionKey()); ci.setFileEncryptionKey(oldCi.getFileEncryptionKey());
if (usesClientSession) if (usesClientSession)
ci.setClient(true); ci.setRemote(true);
else
ci.setRemote(false);
try { try {
// 因为已经精确知道要连哪个节点了,connectEmbeddedOrServer不用考虑运行模式,所以用false // 因为已经精确知道要连哪个节点了,connect不用考虑运行模式,所以用false
session = ci.getSessionFactory().createSession(ci).connectEmbeddedOrServer(false); session = ci.createSession().connect(false);
session.setLocal(true); session.setLocal(true);
} catch (SQLException e) { } catch (SQLException e) {
throw DbException.convert(e); throw DbException.convert(e);
Expand Down
Expand Up @@ -165,7 +165,7 @@ public void writeInitPacket(Session session, int sessionId, Transfer transfer, C
transfer.writeInt(Constants.TCP_PROTOCOL_VERSION_1); // minClientVersion transfer.writeInt(Constants.TCP_PROTOCOL_VERSION_1); // minClientVersion
transfer.writeInt(Constants.TCP_PROTOCOL_VERSION_1); // maxClientVersion transfer.writeInt(Constants.TCP_PROTOCOL_VERSION_1); // maxClientVersion
transfer.writeString(hostAndPort); transfer.writeString(hostAndPort);
transfer.writeString(ci.getDatabaseName()); transfer.writeString(ci.getDatabaseShortName());
transfer.writeString(ci.getURL()); // 不带参数的URL transfer.writeString(ci.getURL()); // 不带参数的URL
transfer.writeString(ci.getUserName()); transfer.writeString(ci.getUserName());
transfer.writeBytes(ci.getUserPasswordHash()); transfer.writeBytes(ci.getUserPasswordHash());
Expand Down Expand Up @@ -266,7 +266,7 @@ private Session createSession(Transfer transfer, String originalURL, String dbNa
} }


try { try {
Session session = ci.getSessionFactory().createSession(ci); Session session = ci.createSession();
if (ci.getProperty("IS_LOCAL") != null) if (ci.getProperty("IS_LOCAL") != null)
session.setLocal(Boolean.parseBoolean(ci.getProperty("IS_LOCAL"))); session.setLocal(Boolean.parseBoolean(ci.getProperty("IS_LOCAL")));
return session; return session;
Expand Down
Expand Up @@ -33,7 +33,7 @@ public void run() {
assertTrue(ci.isEmbedded()); assertTrue(ci.isEmbedded());
assertTrue(ci.isPersistent()); assertTrue(ci.isPersistent());
assertFalse(ci.isRemote()); assertFalse(ci.isRemote());
assertTrue(ci.getDatabaseName() != null && ci.getDatabaseName().endsWith(DB_NAME)); assertTrue(ci.getDatabaseShortName() != null && ci.getDatabaseShortName().endsWith(DB_NAME));
assertNull(ci.getServers()); assertNull(ci.getServers());


setEmbedded(false); setEmbedded(false);
Expand All @@ -42,7 +42,7 @@ public void run() {
assertFalse(ci.isEmbedded()); assertFalse(ci.isEmbedded());
assertFalse(ci.isPersistent()); // TCP类型的URL在Client端建立连接时无法确定是否是Persistent,所以为false assertFalse(ci.isPersistent()); // TCP类型的URL在Client端建立连接时无法确定是否是Persistent,所以为false
assertTrue(ci.isRemote()); assertTrue(ci.isRemote());
assertEquals(DB_NAME, ci.getDatabaseName()); assertEquals(DB_NAME, ci.getDatabaseShortName());
assertEquals(getHostAndPort(), ci.getServers()); assertEquals(getHostAndPort(), ci.getServers());


try { try {
Expand Down

0 comments on commit ef97485

Please sign in to comment.