Skip to content

Commit

Permalink
retry for scan for leader change (#605)
Browse files Browse the repository at this point in the history
* retry for scan for leader change

* code fmt

* set storage address mapping for scan

* fix get leader

* update the default config for allow_read_follower to false
  • Loading branch information
Nicole00 committed Jul 19, 2024
1 parent bd8129e commit fa6bcf2
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 90 deletions.
84 changes: 59 additions & 25 deletions client/src/main/java/com/vesoft/nebula/client/meta/MetaClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,16 @@ public class MetaClient extends AbstractMetaClient {

public static final int LATEST_SCHEMA_VERSION = -1;

private static final int DEFAULT_TIMEOUT_MS = 1000;
private static final int DEFAULT_TIMEOUT_MS = 1000;
private static final int DEFAULT_CONNECTION_RETRY_SIZE = 3;
private static final int DEFAULT_EXECUTION_RETRY_SIZE = 3;
private static final int RETRY_TIMES = 1;
private static final int DEFAULT_EXECUTION_RETRY_SIZE = 3;
private static final int RETRY_TIMES = 1;

private boolean enableSSL = false;
private SSLParam sslParam = null;
private boolean enableSSL = false;
private SSLParam sslParam = null;

private MetaService.Client client;
private final List<HostAddress> addresses;
private MetaService.Client client;
private final List<HostAddress> addresses;

public MetaClient(String host, int port) throws UnknownHostException {
this(new HostAddress(host, port));
Expand Down Expand Up @@ -122,9 +122,9 @@ public void connect()
*/
private void doConnect()
throws TTransportException, ClientServerIncompatibleException {
Random random = new Random(System.currentTimeMillis());
int position = random.nextInt(addresses.size());
HostAddress address = addresses.get(position);
Random random = new Random(System.currentTimeMillis());
int position = random.nextInt(addresses.size());
HostAddress address = addresses.get(position);
getClient(address.getHost(), address.getPort());
}

Expand Down Expand Up @@ -192,8 +192,8 @@ public void close() {
* @return
*/
public synchronized List<IdName> getSpaces() throws TException, ExecuteFailedException {
int retry = RETRY_TIMES;
ListSpacesReq request = new ListSpacesReq();
int retry = RETRY_TIMES;
ListSpacesReq request = new ListSpacesReq();
ListSpacesResp response = null;
try {
while (retry-- >= 0) {
Expand Down Expand Up @@ -225,7 +225,7 @@ public synchronized List<IdName> getSpaces() throws TException, ExecuteFailedExc
*/
public synchronized SpaceItem getSpace(String spaceName) throws TException,
ExecuteFailedException {
int retry = RETRY_TIMES;
int retry = RETRY_TIMES;
GetSpaceReq request = new GetSpaceReq();
request.setSpace_name(spaceName.getBytes());
GetSpaceResp response = null;
Expand Down Expand Up @@ -261,8 +261,8 @@ public synchronized List<TagItem> getTags(String spaceName)
throws TException, ExecuteFailedException {
int retry = RETRY_TIMES;

int spaceID = getSpace(spaceName).space_id;
ListTagsReq request = new ListTagsReq(spaceID);
int spaceID = getSpace(spaceName).space_id;
ListTagsReq request = new ListTagsReq(spaceID);
ListTagsResp response = null;
try {
while (retry-- >= 0) {
Expand Down Expand Up @@ -296,9 +296,9 @@ public synchronized List<TagItem> getTags(String spaceName)
*/
public synchronized Schema getTag(String spaceName, String tagName)
throws TException, ExecuteFailedException {
int retry = RETRY_TIMES;
int retry = RETRY_TIMES;
GetTagReq request = new GetTagReq();
int spaceID = getSpace(spaceName).getSpace_id();
int spaceID = getSpace(spaceName).getSpace_id();
request.setSpace_id(spaceID);
request.setTag_name(tagName.getBytes());
request.setVersion(LATEST_SCHEMA_VERSION);
Expand Down Expand Up @@ -335,9 +335,9 @@ public synchronized Schema getTag(String spaceName, String tagName)
*/
public synchronized List<EdgeItem> getEdges(String spaceName)
throws TException, ExecuteFailedException {
int retry = RETRY_TIMES;
int spaceID = getSpace(spaceName).getSpace_id();
ListEdgesReq request = new ListEdgesReq(spaceID);
int retry = RETRY_TIMES;
int spaceID = getSpace(spaceName).getSpace_id();
ListEdgesReq request = new ListEdgesReq(spaceID);
ListEdgesResp response = null;
try {
while (retry-- >= 0) {
Expand Down Expand Up @@ -370,9 +370,9 @@ public synchronized List<EdgeItem> getEdges(String spaceName)
*/
public synchronized Schema getEdge(String spaceName, String edgeName)
throws TException, ExecuteFailedException {
int retry = RETRY_TIMES;
int retry = RETRY_TIMES;
GetEdgeReq request = new GetEdgeReq();
int spaceID = getSpace(spaceName).getSpace_id();
int spaceID = getSpace(spaceName).getSpace_id();
request.setSpace_id(spaceID);
request.setEdge_name(edgeName.getBytes());
request.setVersion(LATEST_SCHEMA_VERSION);
Expand Down Expand Up @@ -410,9 +410,9 @@ public synchronized Schema getEdge(String spaceName, String edgeName)
*/
public synchronized Map<Integer, List<HostAddr>> getPartsAlloc(String spaceName)
throws ExecuteFailedException, TException {
int retry = RETRY_TIMES;
int retry = RETRY_TIMES;
GetPartsAllocReq request = new GetPartsAllocReq();
int spaceID = getSpace(spaceName).getSpace_id();
int spaceID = getSpace(spaceName).getSpace_id();
request.setSpace_id(spaceID);

GetPartsAllocResp response = null;
Expand Down Expand Up @@ -442,7 +442,7 @@ public synchronized Map<Integer, List<HostAddr>> getPartsAlloc(String spaceName)
* get all Storaged servers
*/
public synchronized Set<HostAddr> listHosts() {
int retry = RETRY_TIMES;
int retry = RETRY_TIMES;
ListHostsReq request = new ListHostsReq();
request.setType(ListHostType.STORAGE);
ListHostsResp resp = null;
Expand Down Expand Up @@ -471,4 +471,38 @@ public synchronized Set<HostAddr> listHosts() {
}
return hostAddrs;
}

/**
* get the leader parts for all storaged address
*/
public synchronized Set<HostItem> getHostItems() {
int retry = RETRY_TIMES;
ListHostsReq request = new ListHostsReq();
request.setType(ListHostType.ALLOC);
ListHostsResp resp = null;
try {
while (retry-- >= 0) {
resp = client.listHosts(request);
if (resp.getCode() == ErrorCode.E_LEADER_CHANGED) {
freshClient(resp.getLeader());
} else {
break;
}
}
} catch (TException e) {
LOGGER.error("listHosts error", e);
return null;
}
if (resp.getCode() != ErrorCode.SUCCEEDED) {
LOGGER.error("listHosts execute failed, errorCode: " + resp.getCode());
return null;
}
Set<HostItem> hostItems = new HashSet<>();
for (HostItem hostItem : resp.hosts) {
if (hostItem.getStatus().getValue() == HostStatus.ONLINE.getValue()) {
hostItems.add(hostItem);
}
}
return hostItems;
}
}
27 changes: 12 additions & 15 deletions client/src/main/java/com/vesoft/nebula/client/meta/MetaManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
import com.vesoft.nebula.client.meta.exception.ExecuteFailedException;
import com.vesoft.nebula.meta.EdgeItem;
import com.vesoft.nebula.meta.HostItem;
import com.vesoft.nebula.meta.IdName;
import com.vesoft.nebula.meta.SpaceItem;
import com.vesoft.nebula.meta.TagItem;
Expand All @@ -28,6 +29,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.commons.codec.Charsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -153,21 +155,16 @@ private void fillMetaInfo() {
if (partLeaders == null) {
partLeaders = new HashMap<>();
}
for (String spaceName : spacesInfo.keySet()) {
if (!partLeaders.containsKey(spaceName)) {
partLeaders.put(spaceName, Maps.newConcurrentMap());
for (int partId : spacesInfo.get(spaceName).partsAlloc.keySet()) {
if (spacesInfo.get(spaceName).partsAlloc.get(partId).size() < 1) {
LOGGER.error("space {} part {} has not allocation host.",
spaceName, partId);
} else {
partLeaders.get(spaceName).put(partId,
spacesInfo
.get(spaceName)
.partsAlloc
.get(partId).get(0));
}

for (HostItem hostItem : metaClient.getHostItems()) {
HostAddr leader = hostItem.getHostAddr();
for (Map.Entry<byte[], List<Integer>> spaceParts
: hostItem.getLeader_parts().entrySet()) {
String space = new String(spaceParts.getKey(), Charsets.UTF_8);
if (!partLeaders.containsKey(space)) {
partLeaders.put(space, Maps.newConcurrentMap());
}
for (int part : spaceParts.getValue()) {
partLeaders.get(space).put(part, leader);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,7 @@ private ScanVertexResultIterator doScanVertex(String spaceName,
.withPartSuccess(allowPartSuccess)
.withUser(user)
.withPassword(password)
.withStorageAddressMapping(storageAddressMapping)
.build();
}

Expand Down Expand Up @@ -1175,5 +1176,5 @@ private long getEdgeId(String spaceName, String edgeName) {
private static final long DEFAULT_START_TIME = 0;
private static final long DEFAULT_END_TIME = Long.MAX_VALUE;
private static final boolean DEFAULT_ALLOW_PART_SUCCESS = false;
private static final boolean DEFAULT_ALLOW_READ_FOLLOWER = true;
private static final boolean DEFAULT_ALLOW_READ_FOLLOWER = false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@

package com.vesoft.nebula.client.storage.scan;

import com.facebook.thrift.TException;
import com.google.common.base.Charsets;
import com.vesoft.nebula.DataSet;
import com.vesoft.nebula.ErrorCode;
import com.vesoft.nebula.HostAddr;
import com.vesoft.nebula.client.graph.data.HostAddress;
import com.vesoft.nebula.client.meta.MetaManager;
import com.vesoft.nebula.client.storage.GraphStorageConnection;
Expand All @@ -33,7 +34,7 @@ public class ScanEdgeResultIterator extends ScanResultIterator {
private static final Logger LOGGER = LoggerFactory.getLogger(ScanEdgeResultIterator.class);

private final ScanEdgeRequest request;
private ExecutorService threadPool = null;
private ExecutorService threadPool = null;

private ScanEdgeResultIterator(MetaManager metaManager,
StorageConnPool pool,
Expand All @@ -44,9 +45,10 @@ private ScanEdgeResultIterator(MetaManager metaManager,
String labelName,
boolean partSuccess,
String user,
String password) {
String password,
Map<String, String> storageAddressMapping) {
super(metaManager, pool, new PartScanQueue(partScanInfoList), addresses, spaceName,
labelName, partSuccess, user, password);
labelName, partSuccess, user, password, storageAddressMapping);
this.request = request;
}

Expand All @@ -69,13 +71,14 @@ public ScanEdgeResult next() throws Exception {
List<Exception> exceptions =
Collections.synchronizedList(new ArrayList<>(addresses.size()));
CountDownLatch countDownLatch = new CountDownLatch(addresses.size());
AtomicInteger existSuccess = new AtomicInteger(0);
AtomicInteger existSuccess = new AtomicInteger(0);

threadPool = Executors.newFixedThreadPool(addresses.size());
for (HostAddress addr : addresses) {
threadPool.submit(() -> {
HostAddress leader = addr;
ScanResponse response;
PartScanInfo partInfo = partScanQueue.getPart(addr);
PartScanInfo partInfo = partScanQueue.getPart(leader);
// no part need to scan
if (partInfo == null) {
countDownLatch.countDown();
Expand All @@ -85,8 +88,7 @@ public ScanEdgeResult next() throws Exception {

GraphStorageConnection connection;
try {
connection = pool.getStorageConnection(new HostAddress(addr.getHost(),
addr.getPort()));
connection = pool.getStorageConnection(leader);
} catch (Exception e) {
LOGGER.error("get storage client error, ", e);
exceptions.add(e);
Expand All @@ -105,12 +107,25 @@ public ScanEdgeResult next() throws Exception {
partRequest.setNeed_authenticate(true);
try {
response = connection.scanEdge(partRequest);
} catch (TException e) {
if (!response.getResult().failed_parts.isEmpty()
&& response.getResult().failed_parts.get(0).code
== ErrorCode.E_LEADER_CHANGED) {
pool.release(leader, connection);
HostAddr newLeader = response.getResult().failed_parts.get(0).leader;
HostAddr availableLeader = storageAddressMapping
.getOrDefault(newLeader, newLeader);
leader = new HostAddress(availableLeader.host, availableLeader.getPort());
connection = pool.getStorageConnection(leader);
response = connection.scanEdge(partRequest);
}
} catch (Exception e) {
LOGGER.error(String.format("Scan edgeRow failed for %s", e.getMessage()), e);
exceptions.add(e);
partScanQueue.dropPart(partInfo);
countDownLatch.countDown();
return;
} finally {
pool.release(leader, connection);
}

if (response == null) {
Expand Down Expand Up @@ -158,7 +173,7 @@ public ScanEdgeResult next() throws Exception {
if (!exceptions.isEmpty()) {
throwExceptions(exceptions);
}
boolean success = (existSuccess.get() == addresses.size());
boolean success = (existSuccess.get() == addresses.size());
List<DataSet> finalResults = success ? results : null;
return new ScanEdgeResult(finalResults, ScanStatus.ALL_SUCCESS);
}
Expand All @@ -170,16 +185,17 @@ public ScanEdgeResult next() throws Exception {
*/
public static class ScanEdgeResultBuilder {

MetaManager metaManager;
StorageConnPool pool;
Set<PartScanInfo> partScanInfoList;
List<HostAddress> addresses;
ScanEdgeRequest request;
String spaceName;
String edgeName;
boolean partSuccess = false;
String user = null;
String password = null;
MetaManager metaManager;
StorageConnPool pool;
Set<PartScanInfo> partScanInfoList;
List<HostAddress> addresses;
ScanEdgeRequest request;
String spaceName;
String edgeName;
boolean partSuccess = false;
String user = null;
String password = null;
Map<String, String> storageAddressMapping = null;

public ScanEdgeResultBuilder withMetaClient(MetaManager metaManager) {
this.metaManager = metaManager;
Expand Down Expand Up @@ -231,6 +247,12 @@ public ScanEdgeResultBuilder withPassword(String password) {
return this;
}

public ScanEdgeResultBuilder withStorageAddressMapping(
Map<String, String> storageAddressMapping) {
this.storageAddressMapping = storageAddressMapping;
return this;
}

public ScanEdgeResultIterator build() {
return new ScanEdgeResultIterator(
metaManager,
Expand All @@ -242,7 +264,8 @@ public ScanEdgeResultIterator build() {
edgeName,
partSuccess,
user,
password);
password,
storageAddressMapping);
}
}
}
Loading

0 comments on commit fa6bcf2

Please sign in to comment.