-
Notifications
You must be signed in to change notification settings - Fork 121
/
SessionPool.java
526 lines (471 loc) · 18.8 KB
/
SessionPool.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
/* Copyright (c) 2022 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
package com.vesoft.nebula.client.graph;
import com.alibaba.fastjson.JSON;
import com.vesoft.nebula.ErrorCode;
import com.vesoft.nebula.client.graph.data.HostAddress;
import com.vesoft.nebula.client.graph.data.ResultSet;
import com.vesoft.nebula.client.graph.exception.AuthFailedException;
import com.vesoft.nebula.client.graph.exception.BindSpaceFailedException;
import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
import com.vesoft.nebula.client.graph.exception.IOErrorException;
import com.vesoft.nebula.client.graph.net.AuthResult;
import com.vesoft.nebula.client.graph.net.SessionState;
import com.vesoft.nebula.client.graph.net.SyncConnection;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SessionPool implements Serializable {
// Serialization version id required because the class implements Serializable.
private static final long serialVersionUID = 6051248334277617891L;
// Per-instance logger, named after the runtime class of this object.
private final Logger log = LoggerFactory.getLogger(this.getClass());
// Single-thread scheduler; init() uses it to run checkSession() periodically.
private final ScheduledExecutorService healthCheckSchedule =
Executors.newScheduledThreadPool(1);
// Single-thread scheduler; init() uses it to run updateSessionQueue() periodically.
private final ScheduledExecutorService sessionQueueMaintainSchedule =
Executors.newScheduledThreadPool(1);
// Every session currently owned by the pool, whatever its state.
public CopyOnWriteArrayList<NebulaSession> sessionList = new CopyOnWriteArrayList<>();
// Counter of idle sessions; maintained by hand next to each session state change.
public AtomicInteger idleSessionSize = new AtomicInteger(0);
// Set to true at the end of init().
public AtomicBoolean hasInit = new AtomicBoolean(false);
// Set to true by close().
public AtomicBoolean isClosed = new AtomicBoolean(false);
// Ever-increasing cursor used by getAddress() to rotate through the graph addresses.
private final AtomicInteger pos = new AtomicInteger(0);
// The configuration this pool was built from; the values below are copied from it
// in the constructor.
private final SessionPoolConfig sessionPoolConfig;
// init() creates sessions until the pool holds this many; updateSessionQueue()
// stops trimming idle sessions once the idle counter is down to this value.
private final int minSessionSize;
// getSession() only creates a new session while the pool holds fewer than this many.
private final int maxSessionSize;
// Period, in seconds, of the updateSessionQueue() schedule.
private final int cleanTime;
// Period, in seconds, of the checkSession() schedule.
private final int healthCheckTime;
// Retry bound used by execute(String).
private final int retryTimes;
// Pause between retries in execute(String); passed to Thread.sleep, i.e. milliseconds.
private final int intervalTime;
// When false, createSessionObject() gives up after the first failed connect attempt.
private final boolean reconnect;
// Name of the graph space every session of this pool is bound to.
private final String spaceName;
// The "USE `<spaceName>`;" statement, built once in the constructor.
private final String useSpace;
/**
 * Builds a pool from the given configuration and initializes it right away.
 *
 * <p>The settings the pool needs are copied out of {@code poolConfig}, the
 * {@code USE} statement for the configured space is prepared, and
 * {@link #init()} is called, so a {@link RuntimeException} raised by
 * {@code init()} propagates out of the constructor.
 *
 * @param poolConfig the configuration of the pool
 */
public SessionPool(SessionPoolConfig poolConfig) {
    this.sessionPoolConfig = poolConfig;

    // target space and the statement used to bind a session to it
    this.spaceName = poolConfig.getSpaceName();
    this.useSpace = "USE `" + this.spaceName + "`;";

    // pool sizing
    this.minSessionSize = poolConfig.getMinSessionSize();
    this.maxSessionSize = poolConfig.getMaxSessionSize();

    // periods of the background tasks
    this.healthCheckTime = poolConfig.getHealthCheckTime();
    this.cleanTime = poolConfig.getCleanTime();

    // retry / reconnect behaviour
    this.retryTimes = poolConfig.getRetryTimes();
    this.intervalTime = poolConfig.getIntervalTime();
    this.reconnect = poolConfig.isReconnect();

    init();
}
/**
 * Returns a session that is marked as used.
 *
 * <p>Each attempt first tries to claim an idle session from {@code sessionList};
 * if none can be claimed and the pool holds fewer than {@code maxSessionSize}
 * sessions, a new session is created in the USED state. Otherwise the method
 * sleeps for {@code SessionPoolConfig.getWaitTime()} and tries again, for
 * {@code SessionPoolConfig.getRetryConnectTimes() + 1} attempts in total.
 *
 * <p>The method is {@code synchronized}, so the monitor of this pool is held
 * while it sleeps.
 *
 * @return a session in the USED state
 * @throws RuntimeException if no session became available within the allowed
 *                          attempts, or if the thread was interrupted while
 *                          waiting (the interrupt flag is restored first)
 */
private synchronized NebulaSession getSession() throws ClientServerIncompatibleException,
        AuthFailedException, IOErrorException, BindSpaceFailedException {
    int retry = sessionPoolConfig.getRetryConnectTimes();
    while (retry-- >= 0) {
        // if there are idle sessions, get session from queue
        if (idleSessionSize.get() > 0) {
            for (NebulaSession nebulaSession : sessionList) {
                if (nebulaSession.isIdleAndSetUsed()) {
                    idleSessionSize.decrementAndGet();
                    return nebulaSession;
                }
            }
        }
        // if session size is less than max size, get session from pool
        if (sessionList.size() < maxSessionSize) {
            return createSessionObject(SessionState.USED);
        }
        // there's no available session, wait for SessionPoolConfig.getWaitTime and re-get
        try {
            Thread.sleep(sessionPoolConfig.getWaitTime());
        } catch (InterruptedException e) {
            // catching InterruptedException clears the flag; restore it so callers
            // further up the stack can still observe the interruption
            Thread.currentThread().interrupt();
            log.error("getSession error when wait for idle sessions, ", e);
            throw new RuntimeException(e);
        }
    }
    // if session size is equal to max size and no idle session here, throw exception
    throw new RuntimeException("no extra session available");
}
/**
 * Initializes the pool: creates idle sessions until the pool holds
 * {@code minSessionSize} of them, then schedules the periodic health check and
 * the periodic idle-session clean-up.
 *
 * <p>This function is called from SessionPool's constructor, there is no need
 * to call it manually. Calling it on an initialized pool is a no-op.
 *
 * @return always {@code true}
 * @throws RuntimeException if a session cannot be created; sessions created
 *                          before the failure are released first
 */
@Deprecated
public boolean init() {
    if (hasInit.get()) {
        return true;
    }
    while (sessionList.size() < minSessionSize) {
        try {
            createSessionObject(SessionState.IDLE);
            idleSessionSize.incrementAndGet();
        } catch (Exception e) {
            log.error("SessionPool init failed. ", e);
            // Do not leak the sessions that were created before the failure.
            // A failing release must not hide the original cause.
            for (NebulaSession created : sessionList) {
                try {
                    created.release();
                } catch (RuntimeException releaseError) {
                    e.addSuppressed(releaseError);
                }
            }
            sessionList.clear();
            idleSessionSize.set(0);
            throw new RuntimeException("create session failed.", e);
        }
    }
    healthCheckSchedule.scheduleAtFixedRate(this::checkSession, 0, healthCheckTime,
            TimeUnit.SECONDS);
    sessionQueueMaintainSchedule.scheduleAtFixedRate(this::updateSessionQueue, 0, cleanTime,
            TimeUnit.SECONDS);
    hasInit.compareAndSet(false, true);
    return true;
}
/**
 * Execute the nGql sentence.
 *
 * <p>The statement is run on a session taken from the pool. A successful
 * result, or a result carrying a semantic or syntax error, is returned
 * immediately and the session goes back to the pool. Any other failing result
 * makes the pool discard the session, sleep {@code intervalTime} milliseconds
 * and try again with another session, for up to {@code retryTimes + 1}
 * attempts. An {@link IOErrorException} is retried the same way while the
 * attempt number is below {@code retryTimes} and rethrown otherwise.
 *
 * @param stmt The nGql sentence.
 *             such as insert ngql `INSERT VERTEX person(name) VALUES "Tom":("Tom");`
 * @return The ResultSet; when every attempt produced a failing result, the
 *         result of the last attempt
 * @throws IllegalArgumentException if the statement is rejected by the statement check
 * @throws RuntimeException         if the pool is closed or not initialized
 */
public ResultSet execute(String stmt) throws IOErrorException,
        ClientServerIncompatibleException, AuthFailedException, BindSpaceFailedException {
    stmtCheck(stmt);
    checkSessionPool();
    // Invariant: nebulaSession is non-null only while this call still owns a
    // session that has not been released, so no session is released twice.
    NebulaSession nebulaSession = null;
    ResultSet resultSet = null;
    int tryTimes = 0;
    while (tryTimes++ <= retryTimes) {
        try {
            nebulaSession = getSession();
            resultSet = nebulaSession.execute(stmt);
            if (resultSet.isSucceeded()
                    || resultSet.getErrorCode() == ErrorCode.E_SEMANTIC_ERROR.getValue()
                    || resultSet.getErrorCode() == ErrorCode.E_SYNTAX_ERROR.getValue()) {
                releaseSession(nebulaSession);
                return resultSet;
            }
            log.warn(String.format("execute error, code: %d, message: %s, retry: %d",
                    resultSet.getErrorCode(), resultSet.getErrorMessage(), tryTimes));
            nebulaSession.release();
            sessionList.remove(nebulaSession);
            nebulaSession = null;
            try {
                Thread.sleep(intervalTime);
            } catch (InterruptedException interruptedException) {
                // keep retrying, but do not lose the interruption request
                Thread.currentThread().interrupt();
            }
        } catch (ClientServerIncompatibleException e) {
            // will never get here.
        } catch (AuthFailedException | BindSpaceFailedException e) {
            throw e;
        } catch (IOErrorException e) {
            if (nebulaSession != null) {
                nebulaSession.release();
                sessionList.remove(nebulaSession);
                nebulaSession = null;
            }
            if (tryTimes < retryTimes) {
                log.warn(String.format("execute failed for IOErrorException, message: %s, "
                        + "retry: %d", e.getMessage(), tryTimes));
                try {
                    Thread.sleep(intervalTime);
                } catch (InterruptedException interruptedException) {
                    // keep retrying, but do not lose the interruption request
                    Thread.currentThread().interrupt();
                }
            } else {
                throw e;
            }
        }
    }
    // safety net: only a session that is still owned here gets released
    if (nebulaSession != null) {
        nebulaSession.release();
        sessionList.remove(nebulaSession);
    }
    return resultSet;
}
/**
 * Execute the nGql sentence with parameter.
 *
 * <p>If the first result reports a session error, the session is released and
 * dropped from the pool and the statement is executed once more on another
 * session. After execution the session is handed to the space check, which
 * re-binds it to the configured space if needed before it goes back to the pool.
 *
 * @param stmt         The nGql sentence.
 * @param parameterMap The nGql parameter map
 * @return The ResultSet
 * @throws IOErrorException         if the execution fails with an I/O error; the
 *                                  session in use is discarded before rethrowing
 * @throws IllegalArgumentException if the statement is rejected by the statement check
 * @throws RuntimeException         if the pool is closed or not initialized
 */
@Deprecated
public ResultSet execute(String stmt, Map<String, Object> parameterMap)
        throws ClientServerIncompatibleException, AuthFailedException,
        IOErrorException, BindSpaceFailedException {
    stmtCheck(stmt);
    checkSessionPool();
    NebulaSession nebulaSession = getSession();
    ResultSet resultSet;
    try {
        resultSet = nebulaSession.executeWithParameter(stmt, parameterMap);
        // re-execute for session error
        if (isSessionError(resultSet)) {
            // release before dropping the session so its connection is not leaked,
            // as the single-argument execute does for failing sessions
            nebulaSession.release();
            sessionList.remove(nebulaSession);
            // nothing is owned while a replacement is fetched; if getSession()
            // throws, the catch below must not touch the discarded session
            nebulaSession = null;
            nebulaSession = getSession();
            resultSet = nebulaSession.executeWithParameter(stmt, parameterMap);
        }
    } catch (IOErrorException e) {
        if (nebulaSession != null) {
            // a null result makes useSpace release and remove the session
            useSpace(nebulaSession, null);
        }
        throw e;
    }
    useSpace(nebulaSession, resultSet);
    return resultSet;
}
/**
 * Execute the nGql sentence and return the response as a JSON string.
 *
 * <p>Delegates to {@link #executeJsonWithParameter(String, Map)} with an empty
 * parameter map.
 *
 * @param stmt The nGql sentence.
 * @return the JSON response
 */
public String executeJson(String stmt)
        throws ClientServerIncompatibleException, AuthFailedException,
        IOErrorException, BindSpaceFailedException {
    // typed factory instead of the raw EMPTY_MAP constant: no unchecked cast needed
    return executeJsonWithParameter(stmt, Collections.<String, Object>emptyMap());
}
/**
 * Execute the nGql sentence with parameter and return the response as a JSON string.
 *
 * <p>The statement is executed a second time on another session when the first
 * response reports a session error, or when the first execution fails with an
 * {@link IOErrorException} of type {@code E_CONNECT_BROKEN}. In both cases the
 * failing session is released and dropped from the pool. Whatever session
 * produced the returned response is passed through the space check and handed
 * back to the pool.
 *
 * @param stmt         The nGql sentence.
 * @param parameterMap The nGql parameter map
 * @return the JSON response
 * @throws IOErrorException         if the execution fails with an I/O error that
 *                                  is not retried, or if the retry fails too;
 *                                  the session in use is discarded first
 * @throws IllegalArgumentException if the statement is rejected by the statement check
 * @throws RuntimeException         if the pool is closed or not initialized
 */
public String executeJsonWithParameter(String stmt,
                                       Map<String, Object> parameterMap)
        throws ClientServerIncompatibleException, AuthFailedException,
        IOErrorException, BindSpaceFailedException {
    stmtCheck(stmt);
    checkSessionPool();
    NebulaSession nebulaSession = getSession();
    String result;
    try {
        result = nebulaSession.executeJsonWithParameter(stmt, parameterMap);
        // re-execute for session error
        if (isSessionErrorForJson(result)) {
            // release before dropping the session so its connection is not leaked
            nebulaSession.release();
            sessionList.remove(nebulaSession);
            // nothing is owned while a replacement is fetched; if getSession()
            // throws, the catch below must not touch the discarded session
            nebulaSession = null;
            nebulaSession = getSession();
            result = nebulaSession.executeJsonWithParameter(stmt, parameterMap);
        }
    } catch (IOErrorException e) {
        if (e.getType() != IOErrorException.E_CONNECT_BROKEN) {
            if (nebulaSession != null) {
                // a null result makes useSpace release and remove the session
                useSpace(nebulaSession, null);
            }
            throw e;
        }
        // broken connection: discard the session and retry once on another one
        if (nebulaSession != null) {
            nebulaSession.release();
            sessionList.remove(nebulaSession);
        }
        nebulaSession = getSession();
        try {
            result = nebulaSession.executeJsonWithParameter(stmt, parameterMap);
        } catch (IOErrorException retryError) {
            // the retry session is unusable as well; do not leave it marked as used
            useSpace(nebulaSession, null);
            throw retryError;
        }
        // fall through: the retry session must go back to the pool like any other,
        // otherwise it would stay in the USED state forever
    }
    useSpaceForJson(nebulaSession, result);
    return result;
}
/**
 * Closes the session pool.
 *
 * <p>Only the first call does any work: it releases every session, empties the
 * session list and shuts down both background schedulers. Later calls return
 * immediately.
 */
public void close() {
    // the compare-and-set lets exactly one caller through
    final boolean firstClose = isClosed.compareAndSet(false, true);
    if (!firstClose) {
        return;
    }

    sessionList.forEach(NebulaSession::release);
    sessionList.clear();

    if (!healthCheckSchedule.isShutdown()) {
        healthCheckSchedule.shutdown();
    }
    if (!sessionQueueMaintainSchedule.isShutdown()) {
        sessionQueueMaintainSchedule.shutdown();
    }
}
/**
 * Tells whether the SessionPool has been initialized.
 *
 * @return the current value of the {@code hasInit} flag
 */
public boolean isActive() {
    final boolean initialized = hasInit.get();
    return initialized;
}
/**
 * Tells whether the SessionPool is closed.
 *
 * @return the current value of the {@code isClosed} flag
 */
public boolean isClosed() {
    final boolean closed = isClosed.get();
    return closed;
}
/**
 * Returns the number of all sessions held by the pool.
 *
 * @return the current size of the session list
 */
public int getSessionNums() {
    final int total = sessionList.size();
    return total;
}
/**
 * Returns the number of idle sessions.
 *
 * @return the current value of the idle-session counter
 */
public int getIdleSessionNums() {
    final int idle = idleSessionSize.get();
    return idle;
}
/**
* Hand a session back to the pool once an execution has finished.
*
* <p>Asks the session to switch back to idle and then increments the
* idle-session counter.
*
* <p>NOTE(review): the outcome of {@code isUsedAndSetIdle()} is not checked, so
* the counter is incremented unconditionally — presumably callers only pass
* sessions that are currently in use; confirm.
*
* @param nebulaSession the session to hand back
*/
private void releaseSession(NebulaSession nebulaSession) {
nebulaSession.isUsedAndSetIdle();
idleSessionSize.incrementAndGet();
}
/**
 * Check if the idle sessions are valid; an invalid session is removed.
 *
 * <p>Every session that can be claimed from the idle state is pinged with
 * {@code YIELD 1}. A session is released and dropped from the pool when the
 * ping fails with an {@link IOErrorException} or when the server answers with
 * a session error; otherwise it goes back to the idle state.
 */
private void checkSession() {
    for (NebulaSession nebulaSession : sessionList) {
        // sessions that are in use are skipped; claiming the session keeps other
        // callers from picking it up during the ping
        if (!nebulaSession.isIdleAndSetUsed()) {
            continue;
        }
        try {
            idleSessionSize.decrementAndGet();
            ResultSet pingResult = nebulaSession.execute("YIELD 1");
            if (isSessionError(pingResult)) {
                // the server no longer knows this session: it must not be
                // handed out again
                log.error("session ping failed, code: {}, message: {}, "
                                + "remove current session.",
                        pingResult.getErrorCode(), pingResult.getErrorMessage());
                nebulaSession.release();
                sessionList.remove(nebulaSession);
                continue;
            }
            nebulaSession.isUsedAndSetIdle();
            idleSessionSize.incrementAndGet();
        } catch (IOErrorException e) {
            log.error("session ping error, {}, remove current session.", e.getMessage());
            nebulaSession.release();
            sessionList.remove(nebulaSession);
        }
    }
}
/**
 * Trims the session list according to minSessionSize.
 *
 * <p>When the idle-session counter exceeds {@code minSessionSize}, idle
 * sessions are released and removed one by one, under the monitor of this
 * pool, until the counter is no longer above {@code minSessionSize} or the
 * list has been walked through.
 */
private void updateSessionQueue() {
    // nothing to trim
    if (idleSessionSize.get() <= minSessionSize) {
        return;
    }
    synchronized (this) {
        for (NebulaSession candidate : sessionList) {
            if (!candidate.isIdle()) {
                continue;
            }
            // remove the idle session
            candidate.release();
            sessionList.remove(candidate);
            final int idleLeft = idleSessionSize.decrementAndGet();
            if (idleLeft <= minSessionSize) {
                break;
            }
        }
    }
}
/**
 * Create a {@link NebulaSession} with the specified state and add it to the pool.
 *
 * <p>The steps are: open a connection (trying at most one address per
 * configured graph address, and only one when {@code reconnect} is off),
 * authenticate, bind the session to the configured space with the prepared
 * {@code USE} statement, and finally register the session in
 * {@code sessionList}.
 *
 * @param state {@link SessionState} the new session starts in
 * @return NebulaSession the session that was added to the pool
 * @throws AuthFailedException      if authentication fails; the whole pool is
 *                                  closed before the exception is rethrown
 * @throws BindSpaceFailedException if the {@code USE} statement does not
 *                                  succeed; the session is released first
 * @throws IOErrorException         if the {@code USE} statement fails with an
 *                                  I/O error; the session is released first
 */
private NebulaSession createSessionObject(SessionState state)
        throws ClientServerIncompatibleException, AuthFailedException,
        IOErrorException, BindSpaceFailedException {
    SyncConnection connection = new SyncConnection();
    int tryConnect = sessionPoolConfig.getGraphAddressList().size();
    // reconnect with all available address
    while (tryConnect-- > 0) {
        try {
            if (sessionPoolConfig.isEnableSsl()) {
                connection.open(getAddress(), sessionPoolConfig.getTimeout(),
                        sessionPoolConfig.getSslParam(),
                        sessionPoolConfig.isUseHttp2(),
                        sessionPoolConfig.getCustomHeaders());
            } else {
                connection.open(getAddress(), sessionPoolConfig.getTimeout(),
                        sessionPoolConfig.isUseHttp2(),
                        sessionPoolConfig.getCustomHeaders());
            }
            break;
        } catch (Exception e) {
            // give up on the last address, or right away when reconnect is off
            if (tryConnect == 0 || !reconnect) {
                throw e;
            } else {
                log.warn("connect failed, {}", e.getMessage());
            }
        }
    }

    // NOTE(review): when authentication fails the opened connection is not
    // closed here — confirm whether it needs an explicit close.
    AuthResult authResult;
    try {
        authResult = connection.authenticate(sessionPoolConfig.getUsername(),
                sessionPoolConfig.getPassword());
    } catch (AuthFailedException e) {
        log.error(e.getMessage());
        close();
        throw e;
    }

    NebulaSession nebulaSession = new NebulaSession(connection, authResult.getSessionId(),
            authResult.getTimezoneOffset(), state);
    ResultSet result;
    try {
        result = nebulaSession.execute(useSpace);
    } catch (IOErrorException e) {
        // the session is not in the pool yet, so nobody else would ever release it
        nebulaSession.release();
        throw e;
    }
    if (!result.isSucceeded()) {
        nebulaSession.release();
        throw new BindSpaceFailedException(result.getErrorMessage());
    }
    sessionList.add(nebulaSession);
    return nebulaSession;
}
/**
 * Returns the next graph address, rotating through the configured address list.
 *
 * @return the address selected by the internal cursor
 */
public HostAddress getAddress() {
    List<HostAddress> addresses = sessionPoolConfig.getGraphAddressList();
    // floorMod keeps the index non-negative once the cursor overflows past
    // Integer.MAX_VALUE; the plain % operator would yield a negative index
    int newPos = Math.floorMod(pos.getAndIncrement(), addresses.size());
    return addresses.get(newPos);
}
/**
 * Puts a session back into a usable state after an execution, re-running the
 * "USE SPACE_NAME" statement when the session's space has changed.
 *
 * <ul>
 *   <li>no response at all: the session is released and removed;</li>
 *   <li>the response carries a blank space name: the whole pool is closed;</li>
 *   <li>the response names another space: the session is re-bound to the
 *       configured space, and released and removed if that fails;</li>
 *   <li>otherwise the session is handed back to the pool.</li>
 * </ul>
 *
 * @param nebulaSession NebulaSession
 * @param resultSet     execute response, may be {@code null}
 */
private void useSpace(NebulaSession nebulaSession, ResultSet resultSet)
        throws IOErrorException {
    if (resultSet == null) {
        nebulaSession.release();
        sessionList.remove(nebulaSession);
    } else if (resultSet.getSpaceName().trim().isEmpty()) {
        // space has been drop, close the SessionPool
        log.warn("space {} has been drop, close the SessionPool.", spaceName);
        close();
    } else {
        boolean boundToSpace = spaceName.equals(resultSet.getSpaceName());
        if (!boundToSpace) {
            // re-bind the configured spaceName, if bind failed, then remove this session.
            ResultSet rebindResult = nebulaSession.execute(useSpace);
            boundToSpace = rebindResult.isSucceeded();
            if (!boundToSpace) {
                log.warn("Bind Space failed, {}", rebindResult.getErrorMessage());
                nebulaSession.release();
                sessionList.remove(nebulaSession);
            }
        }
        if (boundToSpace) {
            releaseSession(nebulaSession);
        }
    }
}
/**
 * Execute the "USE SPACE_NAME" when session's space changed, for the Json interface.
 *
 * <p>The space name is read from {@code results[0].spaceName} of the JSON
 * response. When it differs from the configured space the session is re-bound;
 * if the re-bind does not succeed the session is released and removed instead
 * of being handed back to the pool.
 *
 * @param nebulaSession NebulaSession
 * @param result        execute response
 */
private void useSpaceForJson(NebulaSession nebulaSession, String result)
        throws IOErrorException {
    String responseSpaceName =
            (String) JSON.parseObject(result).getJSONArray("results")
                    .getJSONObject(0).get("spaceName");
    if (!spaceName.equals(responseSpaceName)) {
        ResultSet switchSpaceResult = nebulaSession.execute(useSpace);
        if (!switchSpaceResult.isSucceeded()) {
            // same handling as the ResultSet variant: a session that cannot be
            // re-bound must not go back to the pool
            log.warn("Bind Space failed, {}", switchSpaceResult.getErrorMessage());
            nebulaSession.release();
            sessionList.remove(nebulaSession);
            return;
        }
    }
    releaseSession(nebulaSession);
}
/**
 * Tells whether a response reports a session-level error.
 *
 * @param resultSet execute response, may be {@code null}
 * @return {@code true} when the error code is E_SESSION_INVALID,
 *         E_SESSION_NOT_FOUND or E_SESSION_TIMEOUT; {@code false} otherwise,
 *         including for a {@code null} response
 */
private boolean isSessionError(ResultSet resultSet) {
    if (resultSet == null) {
        return false;
    }
    if (resultSet.getErrorCode() == ErrorCode.E_SESSION_INVALID.getValue()) {
        return true;
    }
    if (resultSet.getErrorCode() == ErrorCode.E_SESSION_NOT_FOUND.getValue()) {
        return true;
    }
    return resultSet.getErrorCode() == ErrorCode.E_SESSION_TIMEOUT.getValue();
}
/**
 * Tells whether a JSON response reports a session-level error.
 *
 * <p>The error code is read from {@code errors[0].code} of the response.
 *
 * @param result the JSON response, may be {@code null}
 * @return {@code true} for a {@code null} response or when the code is
 *         E_SESSION_INVALID, E_SESSION_NOT_FOUND or E_SESSION_TIMEOUT
 */
private boolean isSessionErrorForJson(String result) {
    // a missing response is treated as a session error
    if (result == null) {
        return true;
    }
    final int errorCode = JSON.parseObject(result)
            .getJSONArray("errors")
            .getJSONObject(0)
            .getIntValue("code");
    if (errorCode == ErrorCode.E_SESSION_INVALID.getValue()) {
        return true;
    }
    if (errorCode == ErrorCode.E_SESSION_NOT_FOUND.getValue()) {
        return true;
    }
    return errorCode == ErrorCode.E_SESSION_TIMEOUT.getValue();
}
/**
 * Makes sure the pool can be used.
 *
 * @throws RuntimeException if the pool has not been initialized, or has been closed
 */
private void checkSessionPool() {
    final boolean initialized = hasInit.get();
    if (!initialized) {
        final String notInitialized = "The SessionPool has not been initialized, "
                + "please call init() first.";
        throw new RuntimeException(notInitialized);
    }
    final boolean closed = isClosed.get();
    if (closed) {
        throw new RuntimeException("The SessionPool has been closed.");
    }
}
/**
 * Validates a statement before it is sent to the server.
 *
 * <p>A statement that consists of nothing but {@code USE <space>} is rejected.
 *
 * @param stmt the statement to validate
 * @throws IllegalArgumentException if the statement is {@code null} or blank,
 *                                  or if it is a lone {@code USE <space>}
 */
private void stmtCheck(String stmt) {
    if (stmt == null || stmt.trim().isEmpty()) {
        throw new IllegalArgumentException("statement is null.");
    }
    String trimmed = stmt.trim();
    // split on any run of whitespace so that "USE   space", or a tab or a line
    // break between the two words, is recognized as well
    if (trimmed.toLowerCase().startsWith("use") && trimmed.split("\\s+").length == 2) {
        throw new IllegalArgumentException("`USE SPACE` alone is forbidden.");
    }
}
}