Skip to content

Commit

Permalink
expose sessionID and ping check
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicole00 committed Nov 14, 2022
1 parent f4431aa commit 3a9080e
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
* The user can directly read the data using the interface of ValueWrapper.
*/

public class Session implements Serializable {
public class Session implements Serializable, AutoCloseable {

private static final long serialVersionUID = -8855886967097862376L;

Expand Down Expand Up @@ -381,6 +381,7 @@ private boolean retryConnect() {

/**
* convert java list to nebula thrift list
*
* @param list java list
* @return nebula list
*/
Expand All @@ -394,11 +395,12 @@ private static NList list2Nlist(List<Object> list) throws UnsupportedOperationEx

/**
* convert java map to nebula thrift map
*
* @param map java map
* @return nebula map
*/
private static NMap map2Nmap(Map<String, Object> map) throws UnsupportedOperationException {
NMap nmap = new NMap(new HashMap<byte[],Value>());
NMap nmap = new NMap(new HashMap<byte[], Value>());
for (Map.Entry<String, Object> entry : map.entrySet()) {
nmap.kvs.put(entry.getKey().getBytes(), value2Nvalue(entry.getValue()));
}
Expand All @@ -408,6 +410,7 @@ private static NMap map2Nmap(Map<String, Object> map) throws UnsupportedOperatio

/**
* convert java value type to nebula thrift value type
*
* @param value java obj
* @return nebula value
*/
Expand All @@ -416,51 +419,56 @@ public static Value value2Nvalue(Object value) throws UnsupportedOperationExcept
if (value == null) {
nvalue.setNVal(NullType.__NULL__);
} else if (value instanceof Boolean) {
boolean bval = (Boolean)value;
boolean bval = (Boolean) value;
nvalue.setBVal(bval);
} else if (value instanceof Integer) {
int ival = (Integer)value;
int ival = (Integer) value;
nvalue.setIVal(ival);
} else if (value instanceof Short) {
int ival = (Short)value;
int ival = (Short) value;
nvalue.setIVal(ival);
} else if (value instanceof Byte) {
int ival = (Byte)value;
int ival = (Byte) value;
nvalue.setIVal(ival);
} else if (value instanceof Long) {
long ival = (Long)value;
long ival = (Long) value;
nvalue.setIVal(ival);
} else if (value instanceof Float) {
float fval = (Float)value;
float fval = (Float) value;
nvalue.setFVal(fval);
} else if (value instanceof Double) {
double dval = (Double)value;
double dval = (Double) value;
nvalue.setFVal(dval);
} else if (value instanceof String) {
byte[] sval = ((String)value).getBytes();
byte[] sval = ((String) value).getBytes();
nvalue.setSVal(sval);
} else if (value instanceof List) {
nvalue.setLVal(list2Nlist((List<Object>)value));
nvalue.setLVal(list2Nlist((List<Object>) value));
} else if (value instanceof Map) {
nvalue.setMVal(map2Nmap((Map<String, Object>)value));
nvalue.setMVal(map2Nmap((Map<String, Object>) value));
} else if (value instanceof Value) {
return (Value)value;
return (Value) value;
} else if (value instanceof Date) {
nvalue.setDVal((Date)value);
nvalue.setDVal((Date) value);
} else if (value instanceof Time) {
nvalue.setTVal((Time)value);
nvalue.setTVal((Time) value);
} else if (value instanceof Duration) {
nvalue.setDuVal((Duration)value);
nvalue.setDuVal((Duration) value);
} else if (value instanceof DateTime) {
nvalue.setDtVal((DateTime)value);
nvalue.setDtVal((DateTime) value);
} else if (value instanceof Geography) {
nvalue.setGgVal((Geography) value);
} else {
// unsupport other Value type, use this function carefully
throw new UnsupportedOperationException(
"Only support convert boolean/float/int/string/map/list to nebula.Value but was"
+ value.getClass().getTypeName());
+ value.getClass().getTypeName());
}
return nvalue;
}

@Override
public synchronized void close() {
release();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ public class SessionWrapper implements Serializable {
private static final long serialVersionUID = -8128331485649098264L;

private final Session session;
private final long sessionID;
private final AtomicBoolean available = new AtomicBoolean(true);

public SessionWrapper(Session session) {
this.session = session;
this.sessionID = session.getSessionID();
}

/**
Expand All @@ -36,6 +38,10 @@ public ResultSet execute(String stmt)
return session.execute(stmt);
}

public boolean ping() {
return session.pingSession();
}

void setNoAvailable() {
this.available.set(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,32 +33,32 @@ public void testBase() {
NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig();
nebulaPoolConfig.setMaxConnSize(1);
Assert.assertTrue(pool.init(Collections.singletonList(
new HostAddress("127.0.0.1", 9670)),
nebulaPoolConfig));
new HostAddress("127.0.0.1", 9670)),
nebulaPoolConfig));
Session session = pool.getSession("root", "nebula", true);
ResultSet resp = session.execute(
"CREATE SPACE IF NOT EXISTS test_session_manager(vid_type=INT);");
"CREATE SPACE IF NOT EXISTS test_session_manager(vid_type=INT);");
Assert.assertTrue(resp.getErrorMessage(), resp.isSucceeded());
session.release();
pool.close();
TimeUnit.SECONDS.sleep(3);
} catch (UnknownHostException
| NotValidConnectionException
| AuthFailedException
| InterruptedException
| ClientServerIncompatibleException e) {
| NotValidConnectionException
| AuthFailedException
| InterruptedException
| ClientServerIncompatibleException e) {
Assert.assertFalse(e.getMessage(), false);
}

SessionsManagerConfig config = new SessionsManagerConfig();
NebulaPoolConfig poolConfig = new NebulaPoolConfig();
poolConfig.setMaxConnSize(4);
config.setAddresses(Collections.singletonList(
new HostAddress("127.0.0.1", 9670)))
.setUserName("root")
.setPassword("nebula")
.setSpaceName("test_session_manager")
.setPoolConfig(poolConfig);
new HostAddress("127.0.0.1", 9670)))
.setUserName("root")
.setPassword("nebula")
.setSpaceName("test_session_manager")
.setPoolConfig(poolConfig);
SessionsManager sessionsManager = new SessionsManager(config);
// Gets the session of the specified space
SessionWrapper session = sessionsManager.getSessionWrapper();
Expand All @@ -75,7 +75,7 @@ public void testBase() {
Assert.fail();
} catch (RuntimeException e) {
Assert.assertTrue(e.getMessage().contains(
"The SessionsManager does not have available sessions."));
"The SessionsManager does not have available sessions."));
Assert.assertTrue(e.getMessage(), true);
}

Expand Down Expand Up @@ -121,6 +121,16 @@ public void testBase() {
assert false;
}

// Test ping
try {
session = sessionsManager.getSessionWrapper();
assert session.ping();
session.release();
assert (!session.ping());
} catch (Exception e) {
assert false;
}

} catch (InvalidConfigException | IOErrorException | ClientServerIncompatibleException e) {
Assert.fail();
}
Expand Down

0 comments on commit 3a9080e

Please sign in to comment.