Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

expose sessionID and ping check #490

Merged
merged 4 commits into from
Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
* The user can directly read the data using the interface of ValueWrapper.
*/

public class Session implements Serializable {
public class Session implements Serializable, AutoCloseable {

private static final long serialVersionUID = -8855886967097862376L;

Expand Down Expand Up @@ -357,6 +357,13 @@ public synchronized HostAddress getGraphHost() {
return connection.getServerAddress();
}

/**
* get SessionID
*/
public long getSessionID() {
return sessionID;
}

/**
* set current connection is invalid, and get a new connection from the pool,
* if get connection failed, return false, else return true
Expand All @@ -381,6 +388,7 @@ private boolean retryConnect() {

/**
* convert java list to nebula thrift list
*
* @param list java list
* @return nebula list
*/
Expand All @@ -394,11 +402,12 @@ private static NList list2Nlist(List<Object> list) throws UnsupportedOperationEx

/**
* convert java map to nebula thrift map
*
* @param map java map
* @return nebula map
*/
private static NMap map2Nmap(Map<String, Object> map) throws UnsupportedOperationException {
NMap nmap = new NMap(new HashMap<byte[],Value>());
NMap nmap = new NMap(new HashMap<byte[], Value>());
for (Map.Entry<String, Object> entry : map.entrySet()) {
nmap.kvs.put(entry.getKey().getBytes(), value2Nvalue(entry.getValue()));
}
Expand All @@ -408,6 +417,7 @@ private static NMap map2Nmap(Map<String, Object> map) throws UnsupportedOperatio

/**
* convert java value type to nebula thrift value type
*
* @param value java obj
* @return nebula value
*/
Expand All @@ -416,51 +426,56 @@ public static Value value2Nvalue(Object value) throws UnsupportedOperationExcept
if (value == null) {
nvalue.setNVal(NullType.__NULL__);
} else if (value instanceof Boolean) {
boolean bval = (Boolean)value;
boolean bval = (Boolean) value;
nvalue.setBVal(bval);
} else if (value instanceof Integer) {
int ival = (Integer)value;
int ival = (Integer) value;
nvalue.setIVal(ival);
} else if (value instanceof Short) {
int ival = (Short)value;
int ival = (Short) value;
nvalue.setIVal(ival);
} else if (value instanceof Byte) {
int ival = (Byte)value;
int ival = (Byte) value;
nvalue.setIVal(ival);
} else if (value instanceof Long) {
long ival = (Long)value;
long ival = (Long) value;
nvalue.setIVal(ival);
} else if (value instanceof Float) {
float fval = (Float)value;
float fval = (Float) value;
nvalue.setFVal(fval);
} else if (value instanceof Double) {
double dval = (Double)value;
double dval = (Double) value;
nvalue.setFVal(dval);
} else if (value instanceof String) {
byte[] sval = ((String)value).getBytes();
byte[] sval = ((String) value).getBytes();
nvalue.setSVal(sval);
} else if (value instanceof List) {
nvalue.setLVal(list2Nlist((List<Object>)value));
nvalue.setLVal(list2Nlist((List<Object>) value));
} else if (value instanceof Map) {
nvalue.setMVal(map2Nmap((Map<String, Object>)value));
nvalue.setMVal(map2Nmap((Map<String, Object>) value));
} else if (value instanceof Value) {
return (Value)value;
return (Value) value;
} else if (value instanceof Date) {
nvalue.setDVal((Date)value);
nvalue.setDVal((Date) value);
} else if (value instanceof Time) {
nvalue.setTVal((Time)value);
nvalue.setTVal((Time) value);
} else if (value instanceof Duration) {
nvalue.setDuVal((Duration)value);
nvalue.setDuVal((Duration) value);
} else if (value instanceof DateTime) {
nvalue.setDtVal((DateTime)value);
nvalue.setDtVal((DateTime) value);
} else if (value instanceof Geography) {
nvalue.setGgVal((Geography) value);
} else {
// unsupport other Value type, use this function carefully
throw new UnsupportedOperationException(
"Only support convert boolean/float/int/string/map/list to nebula.Value but was"
+ value.getClass().getTypeName());
+ value.getClass().getTypeName());
}
return nvalue;
}

@Override
public synchronized void close() {
release();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ public class SessionWrapper implements Serializable {
private static final long serialVersionUID = -8128331485649098264L;

private final Session session;
private final long sessionID;
private final AtomicBoolean available = new AtomicBoolean(true);

public SessionWrapper(Session session) {
this.session = session;
this.sessionID = session.getSessionID();
}

/**
Expand All @@ -36,6 +38,10 @@ public ResultSet execute(String stmt)
return session.execute(stmt);
}

public boolean ping() {
return session.pingSession();
}

void setNoAvailable() {
this.available.set(false);
}
Expand All @@ -46,6 +52,7 @@ boolean available() {

void release() {
session.release();
setNoAvailable();
}

Session getSession() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,32 +33,32 @@ public void testBase() {
NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig();
nebulaPoolConfig.setMaxConnSize(1);
Assert.assertTrue(pool.init(Collections.singletonList(
new HostAddress("127.0.0.1", 9670)),
nebulaPoolConfig));
new HostAddress("127.0.0.1", 9670)),
nebulaPoolConfig));
Session session = pool.getSession("root", "nebula", true);
ResultSet resp = session.execute(
"CREATE SPACE IF NOT EXISTS test_session_manager(vid_type=INT);");
"CREATE SPACE IF NOT EXISTS test_session_manager(vid_type=INT);");
Assert.assertTrue(resp.getErrorMessage(), resp.isSucceeded());
session.release();
pool.close();
TimeUnit.SECONDS.sleep(3);
} catch (UnknownHostException
| NotValidConnectionException
| AuthFailedException
| InterruptedException
| ClientServerIncompatibleException e) {
| NotValidConnectionException
| AuthFailedException
| InterruptedException
| ClientServerIncompatibleException e) {
Assert.assertFalse(e.getMessage(), false);
}

SessionsManagerConfig config = new SessionsManagerConfig();
NebulaPoolConfig poolConfig = new NebulaPoolConfig();
poolConfig.setMaxConnSize(4);
config.setAddresses(Collections.singletonList(
new HostAddress("127.0.0.1", 9670)))
.setUserName("root")
.setPassword("nebula")
.setSpaceName("test_session_manager")
.setPoolConfig(poolConfig);
new HostAddress("127.0.0.1", 9670)))
.setUserName("root")
.setPassword("nebula")
.setSpaceName("test_session_manager")
.setPoolConfig(poolConfig);
SessionsManager sessionsManager = new SessionsManager(config);
// Gets the session of the specified space
SessionWrapper session = sessionsManager.getSessionWrapper();
Expand All @@ -75,7 +75,7 @@ public void testBase() {
Assert.fail();
} catch (RuntimeException e) {
Assert.assertTrue(e.getMessage().contains(
"The SessionsManager does not have available sessions."));
"The SessionsManager does not have available sessions."));
Assert.assertTrue(e.getMessage(), true);
}

Expand All @@ -101,14 +101,30 @@ public void testBase() {
for (int i = 0; i < 3; i++) {
sessionsManager.returnSessionWrapper(sessionList.get(i));
}
sessionList.clear();
try {
for (int i = 0; i < 3; i++) {
sessionsManager.getSessionWrapper();
sessionList.add(sessionsManager.getSessionWrapper());
}
} catch (RuntimeException e) {
Assert.assertFalse(e.getMessage(), true);
}


// Test ping
for (int i = 0; i < 3; i++) {
sessionsManager.returnSessionWrapper(sessionList.get(i));
}

try {
session = sessionsManager.getSessionWrapper();
assert session.ping();
session.release();
assert (!session.ping());
} catch (Exception e) {
assert false;
}

// Test close
try {
session = sessionsManager.getSessionWrapper();
Expand All @@ -120,7 +136,6 @@ public void testBase() {
} catch (Exception e) {
assert false;
}

} catch (InvalidConfigException | IOErrorException | ClientServerIncompatibleException e) {
Assert.fail();
}
Expand Down