Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.alipay.remoting.Connection;
import org.slf4j.Logger;

import java.net.ConnectException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -208,6 +209,25 @@ public void checkStatus() throws Exception {
}
}

public void reConnectAndLogin(String msg) throws ObTableException {
try {
// 1. check the connection is available, force to close it
if (checkAvailable()) {
LOGGER.warn("The connection would be closed and reconnected if: " + connection.getUrl());
close();
}
// 2. reconnect
reconnect(msg);
} catch (ConnectException ex) {
// cannot connect to ob server, need refresh table location
throw new ObTableServerConnectException(ex);
} catch (ObTableServerConnectException ex) {
throw ex;
} catch (Exception ex) {
throw new ObTableConnectionStatusException("check status failed", ex);
}
}

/**
* Reconnect current connection and login
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.alipay.oceanbase.rpc.protocol.payload.Credentialable;
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
import com.alipay.oceanbase.rpc.protocol.payload.ObRpcResultCode;
import com.alipay.oceanbase.rpc.protocol.payload.impl.login.ObTableLoginRequest;
import com.alipay.oceanbase.rpc.util.ObPureCrc32C;
import com.alipay.oceanbase.rpc.util.TableClientLoggerFactory;
import com.alipay.oceanbase.rpc.util.TraceUtil;
Expand Down Expand Up @@ -66,8 +67,10 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
}
((Credentialable) request).setCredential(conn.getCredential());
}

if (request instanceof AbstractPayload) {
if (request instanceof ObTableLoginRequest) {
// setting sys tenant in rpc header when login
((ObTableLoginRequest) request).setTenantId(1);
} else if (request instanceof AbstractPayload) {
((AbstractPayload) request).setTenantId(conn.getTenantId());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@

import java.util.Objects;

import static com.alipay.oceanbase.rpc.protocol.payload.ResultCodes.OB_DESERIALIZE_ERROR;
import static com.alipay.oceanbase.rpc.protocol.payload.ResultCodes.OB_ERR_KV_GLOBAL_INDEX_ROUTE;
import static com.alipay.oceanbase.rpc.protocol.payload.ResultCodes.*;

public class ExceptionUtil {

Expand Down Expand Up @@ -72,10 +71,13 @@ public static ObTableException convertToObTableException(String host, int port,
}

if (resultCodes.errorCode == OB_ERR_KV_GLOBAL_INDEX_ROUTE.errorCode) {
return new ObTableGlobalIndexRouteException("[" + String.valueOf(resultCodes.errorCode)
+ "]" + "[" + resultCodes.name() + "]"
+ "[" + errMsg + "]" + "[" + server + "]"
+ "[" + trace + "]", resultCodes.errorCode);
return new ObTableGlobalIndexRouteException("[" + String.valueOf(resultCodes.errorCode) + "]" + "["
+ resultCodes.name() + "]" + "[" + errMsg + "]" + "[" + server
+ "]" + "[" + trace + "]", resultCodes.errorCode);
} else if (resultCodes.errorCode == OB_TENANT_NOT_IN_SERVER.errorCode) {
return new ObTableTenantNotInServerException("[" + String.valueOf(resultCodes.errorCode) + "]" + "["
+ resultCodes.name() + "]" + "[" + errMsg + "]" + "[" + server
+ "]" + "[" + trace + "]", resultCodes.errorCode);
} else {
// [errCode][errCodeName][errMsg][server][trace]
return new ObTableException("[" + String.valueOf(resultCodes.errorCode) + "]" + "["
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*-
* #%L
* OBKV Table Client Framework
* %%
* Copyright (C) 2021 OceanBase
* %%
* OBKV Table Client Framework is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* #L%
*/

package com.alipay.oceanbase.rpc.exception;

public class ObTableTenantNotInServerException extends ObTableException {

/*
* Ob table tenant not in server exception.
*/
public ObTableTenantNotInServerException() {
}

/*
* Ob table tenant not in server exception.
*/
public ObTableTenantNotInServerException(int errorCode) {
super(errorCode);
}

/*
* Ob table tenant not in server exception.
*/
public ObTableTenantNotInServerException(String message, int errorCode) {
super(message, errorCode);
}

/*
* Ob table tenant not in server exception.
*/
public ObTableTenantNotInServerException(String message) {
super(message);
}

/*
* Ob table tenant not in server exception.
*/
public ObTableTenantNotInServerException(String message, Throwable cause) {
super(message, cause);
}

/*
* Ob table tenant not in server exception.
*/
public ObTableTenantNotInServerException(Throwable cause) {
super(cause);
}

/*
* Is need refresh table entry.
*/
public boolean isNeedRefreshTableEntry() {
return false;
}
}
34 changes: 28 additions & 6 deletions src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@
import com.alipay.oceanbase.rpc.bolt.transport.ObTableConnection;
import com.alipay.oceanbase.rpc.bolt.transport.ObTableRemoting;
import com.alipay.oceanbase.rpc.checkandmutate.CheckAndInsUp;
import com.alipay.oceanbase.rpc.exception.ExceptionUtil;
import com.alipay.oceanbase.rpc.exception.ObTableConnectionStatusException;
import com.alipay.oceanbase.rpc.exception.ObTableException;
import com.alipay.oceanbase.rpc.exception.ObTableServerConnectException;
import com.alipay.oceanbase.rpc.exception.*;
import com.alipay.oceanbase.rpc.batch.QueryByBatch;
import com.alipay.oceanbase.rpc.filter.ObTableFilter;
import com.alipay.oceanbase.rpc.mutation.*;
Expand Down Expand Up @@ -394,8 +391,33 @@ public ObPayload execute(final ObPayload request) throws RemotingException,
} catch (Exception ex) {
throw new ObTableConnectionStatusException("check status failed", ex);
}

return realClient.invokeSync(connection, request, obTableExecuteTimeout);
return executeWithReconnect(connection, request);
}

private ObPayload executeWithReconnect(ObTableConnection connection,
final ObPayload request) throws RemotingException, InterruptedException {
boolean needReconnect = false;
int retryTimes = 0;
ObPayload payload = null;
do {
retryTimes++;
try {
if (needReconnect) {
String msg = String.format("Receive error: tenant not in server and reconnect it, ip:{}, port:{}, tenant id:{}, retryTimes: {}",
connection.getObTable().getIp(), connection.getObTable().getPort(), connection.getTenantId(), retryTimes);
connection.reConnectAndLogin(msg);
needReconnect = false;
}
payload = realClient.invokeSync(connection, request, obTableExecuteTimeout);
} catch (ObTableException ex) {
if (ex instanceof ObTableTenantNotInServerException && retryTimes < 2) {
needReconnect = true;
} else {
throw ex;
}
}
} while (needReconnect && retryTimes < 2);
return payload;
}

private void checkObTableOperationResult(String ip, int port, Object result) {
Expand Down