Skip to content

Commit

Permalink
Merge pull request #164 from taosdata/feat/TD-30089
Browse files Browse the repository at this point in the history
Feat/td 30089
  • Loading branch information
sheyanjie-qq committed May 28, 2024
2 parents 6af531c + b260a12 commit ae66e9c
Show file tree
Hide file tree
Showing 26 changed files with 81 additions and 378 deletions.
2 changes: 1 addition & 1 deletion src/main/java/com/taosdata/jdbc/SchemalessWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import com.taosdata.jdbc.ws.entity.Code;
import com.taosdata.jdbc.ws.entity.Request;
import com.taosdata.jdbc.ws.entity.Response;
import com.taosdata.jdbc.ws.schemaless.CommonResp;
import com.taosdata.jdbc.ws.entity.CommonResp;
import com.taosdata.jdbc.ws.schemaless.ConnReq;
import com.taosdata.jdbc.ws.schemaless.InsertReq;
import com.taosdata.jdbc.ws.schemaless.SchemalessAction;
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/taosdata/jdbc/rs/RestfulDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ private Connection getWSConnection(String url, ConnectionParam param, Properties
byteBuffer.order(ByteOrder.LITTLE_ENDIAN);
byteBuffer.position(8);
long id = byteBuffer.getLong();

FutureResponse remove = inFlightRequest.remove(Action.FETCH_BLOCK.getAction(), id);
if (null != remove) {
FetchBlockResp fetchBlockResp = new FetchBlockResp(id, byteBuffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void close() throws SQLException {
FetchReq closeReq = new FetchReq();
closeReq.setReqId(queryId);
closeReq.setId(queryId);
transport.sendWithoutRep(new Request(Action.FREE_RESULT.getAction(), closeReq));
transport.sendWithoutResponse(new Request(Action.FREE_RESULT.getAction(), closeReq));
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/taosdata/jdbc/ws/TSWSPreparedStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ public int executeUpdate() throws SQLException {
}
// close
Request close = RequestFactory.generateClose(stmtId, reqId);
transport.sendWithoutRep(close);
transport.sendWithoutResponse(close);
return resp.getAffected();
}

Expand Down Expand Up @@ -728,7 +728,7 @@ public int[] executeBatch() throws SQLException {
public void close() throws SQLException {
super.close();
Request close = RequestFactory.generateClose(stmtId, reqId);
transport.sendWithoutRep(close);
transport.sendWithoutResponse(close);
}

@Override
Expand Down
63 changes: 44 additions & 19 deletions src/main/java/com/taosdata/jdbc/ws/Transport.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ public class Transport implements AutoCloseable {

public static final int DEFAULT_MESSAGE_WAIT_TIMEOUT = 60_000;

public static final int TSDB_CODE_RPC_NETWORK_UNAVAIL = 0x0B;
public static final int TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED = 0x20;



private final ArrayList<WSClient> clientArr = new ArrayList<>();;
private final InFlightRequest inFlightRequest;
private long timeout;
Expand Down Expand Up @@ -130,6 +135,7 @@ public Response send(Request request) throws SQLException {
completableFuture, timeout, TimeUnit.MILLISECONDS, reqString);
try {
response = responseFuture.get();
handleErrInMasterSlaveMode(response);
} catch (InterruptedException | ExecutionException e) {
inFlightRequest.remove(request.getAction(), request.id());
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_QUERY_TIMEOUT, e.getMessage());
Expand Down Expand Up @@ -177,13 +183,21 @@ public Response send(String action, long reqId, long stmtId, long type, byte[] r
CompletableFuture<Response> responseFuture = CompletableFutureTimeout.orTimeout(completableFuture, timeout, TimeUnit.MILLISECONDS, reqString);
try {
response = responseFuture.get();
handleErrInMasterSlaveMode(response);
} catch (InterruptedException | ExecutionException e) {
inFlightRequest.remove(action, reqId);
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_QUERY_TIMEOUT, e.getMessage());
}
return response;
}

private void handleErrInMasterSlaveMode(Response response) throws InterruptedException{
if (clientArr.size() > 1 && response instanceof CommonResp){
CommonResp commonResp = (CommonResp) response;
if (TSDB_CODE_RPC_NETWORK_UNAVAIL == commonResp.getCode() || TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED == commonResp.getCode()) {
clientArr.get(currentNodeIndex).closeBlocking();
}
}
}

public Response sendWithoutRetry(Request request) throws SQLException {
if (isClosed()){
Expand Down Expand Up @@ -218,7 +232,7 @@ public Response sendWithoutRetry(Request request) throws SQLException {
return response;
}

public void sendWithoutRep(Request request) throws SQLException {
public void sendWithoutResponse(Request request) throws SQLException {
if (isClosed()){
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_CONNECTION_CLOSED, "Websocket Not Connected Exception");
}
Expand Down Expand Up @@ -334,25 +348,36 @@ public boolean doReconnectCurNode() throws SQLException {
}

public boolean reconnectCurNode() throws SQLException {
boolean reconnected = doReconnectCurNode();
if (!reconnected){
return false;
}
for (int retryTimes = 0; retryTimes < connectionParam.getReconnectRetryCount(); retryTimes++) {
try {
boolean reconnected = clientArr.get(currentNodeIndex).reconnectBlocking();
if (reconnected) {
// send con msgs
ConnectReq connectReq = new ConnectReq();
connectReq.setReqId(ReqId.getReqID());
connectReq.setUser(connectionParam.getUser());
connectReq.setPassword(connectionParam.getPassword());
connectReq.setDb(connectionParam.getDatabase());

if (connectionParam.getConnectMode() != 0) {
connectReq.setMode(connectionParam.getConnectMode());
}

// send con msgs
ConnectReq connectReq = new ConnectReq();
connectReq.setReqId(ReqId.getReqID());
connectReq.setUser(connectionParam.getUser());
connectReq.setPassword(connectionParam.getPassword());
connectReq.setDb(connectionParam.getDatabase());
ConnectResp auth;
auth = (ConnectResp) sendWithoutRetry(new Request(Action.CONN.getAction(), connectReq));

if (connectionParam.getConnectMode() != 0){
connectReq.setMode(connectionParam.getConnectMode());
if (Code.SUCCESS.getCode() == auth.getCode()) {
return true;
} else {
clientArr.get(currentNodeIndex).closeBlocking();
log.error("reconnect failed, code: {}, msg: {}", auth.getCode(), auth.getMessage());
}
}
Thread.sleep(connectionParam.getReconnectIntervalMs());
} catch (Exception e) {
log.error("try connect remote server failed!", e);
}
}

ConnectResp auth;
auth = (ConnectResp) sendWithoutRetry(new Request(Action.CONN.getAction(), connectReq));

return Code.SUCCESS.getCode() == auth.getCode();
return false;
}
}
2 changes: 1 addition & 1 deletion src/main/java/com/taosdata/jdbc/ws/WSConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import com.taosdata.jdbc.rs.RestfulDatabaseMetaData;
import com.taosdata.jdbc.utils.ReqId;
import com.taosdata.jdbc.ws.entity.*;
import com.taosdata.jdbc.ws.schemaless.CommonResp;
import com.taosdata.jdbc.ws.entity.CommonResp;
import com.taosdata.jdbc.ws.schemaless.InsertReq;
import com.taosdata.jdbc.ws.schemaless.SchemalessAction;

Expand Down
2 changes: 0 additions & 2 deletions src/main/java/com/taosdata/jdbc/ws/entity/Action.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.taosdata.jdbc.ws.entity;

import com.taosdata.jdbc.ws.schemaless.CommonResp;
import com.taosdata.jdbc.ws.stmt.entity.ExecResp;
import com.taosdata.jdbc.ws.stmt.entity.StmtResp;

Expand All @@ -14,7 +13,6 @@ public enum Action {
CONN("conn", ConnectResp.class),
QUERY("query", QueryResp.class),
FETCH("fetch", FetchResp.class),
FETCH_JSON("fetch_json", FetchJsonResp.class),
FETCH_BLOCK("fetch_block", FetchBlockResp.class),
// free_result's class is meaningless
FREE_RESULT("free_result", Response.class),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
package com.taosdata.jdbc.ws.schemaless;

import com.taosdata.jdbc.ws.entity.Response;
package com.taosdata.jdbc.ws.entity;

public class CommonResp extends Response {

Expand Down
20 changes: 1 addition & 19 deletions src/main/java/com/taosdata/jdbc/ws/entity/ConnectResp.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,5 @@
/**
* connection result pojo
*/
public class ConnectResp extends Response {
private int code;
private String message;

public int getCode() {
return code;
}

public void setCode(int code) {
this.code = code;
}

public String getMessage() {
return message;
}

public void setMessage(String message) {
this.message = message;
}
public class ConnectResp extends CommonResp {
}
24 changes: 0 additions & 24 deletions src/main/java/com/taosdata/jdbc/ws/entity/FetchJsonResp.java

This file was deleted.

20 changes: 1 addition & 19 deletions src/main/java/com/taosdata/jdbc/ws/entity/FetchResp.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,12 @@
/**
* fetch result pojo
*/
public class FetchResp extends Response{
private int code;
private String message;
public class FetchResp extends CommonResp{
private long id;
private boolean completed;
private Integer[] lengths;
private int rows;

public int getCode() {
return code;
}

public void setCode(int code) {
this.code = code;
}

public String getMessage() {
return message;
}

public void setMessage(String message) {
this.message = message;
}

public long getId() {
return id;
}
Expand Down
20 changes: 1 addition & 19 deletions src/main/java/com/taosdata/jdbc/ws/entity/QueryResp.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,8 @@
/**
* query result pojo
*/
public class QueryResp extends Response {
private int code;
public class QueryResp extends CommonResp {

public int getCode() {
return code;
}

public void setCode(int code) {
this.code = code;
}

private String message;

@JSONField(deserializeUsing = UInt64Codec.class)
private long id;
Expand All @@ -42,14 +32,6 @@ public void setCode(int code) {

private int precision;

public String getMessage() {
return message;
}

public void setMessage(String message) {
this.message = message;
}

public long getId() {
return id;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.taosdata.jdbc.ws.schemaless;

import com.taosdata.jdbc.ws.entity.CommonResp;
import com.taosdata.jdbc.ws.entity.Response;

public enum SchemalessAction {
Expand Down
23 changes: 2 additions & 21 deletions src/main/java/com/taosdata/jdbc/ws/stmt/entity/ExecResp.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,12 @@

import com.alibaba.fastjson.annotation.JSONField;
import com.taosdata.jdbc.utils.UInt64Codec;
import com.taosdata.jdbc.ws.entity.Response;
import com.taosdata.jdbc.ws.entity.CommonResp;

public class ExecResp extends Response {
private int code;
private String message;
public class ExecResp extends CommonResp {
@JSONField(name = "stmt_id", deserializeUsing = UInt64Codec.class)
private long stmtId;
private int affected;

public int getCode() {
return code;
}

public void setCode(int code) {
this.code = code;
}

public String getMessage() {
return message;
}

public void setMessage(String message) {
this.message = message;
}

public long getStmtId() {
return stmtId;
}
Expand Down
23 changes: 2 additions & 21 deletions src/main/java/com/taosdata/jdbc/ws/stmt/entity/StmtResp.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,12 @@

import com.alibaba.fastjson.annotation.JSONField;
import com.taosdata.jdbc.utils.UInt64Codec;
import com.taosdata.jdbc.ws.entity.Response;
import com.taosdata.jdbc.ws.entity.CommonResp;

// init | prepare | set_table_name | set_tags | bind | add_batch
public class StmtResp extends Response {
private int code;
private String message;
public class StmtResp extends CommonResp {
@JSONField(name = "stmt_id", deserializeUsing = UInt64Codec.class)
private long stmtId;

public int getCode() {
return code;
}

public void setCode(int code) {
this.code = code;
}

public String getMessage() {
return message;
}

public void setMessage(String message) {
this.message = message;
}

public long getStmtId() {
return stmtId;
}
Expand Down
Loading

0 comments on commit ae66e9c

Please sign in to comment.