Skip to content

Commit

Permalink
support to convert internal storage address to available address for …
Browse files Browse the repository at this point in the history
…client (#604)
  • Loading branch information
Nicole00 committed Jul 8, 2024
1 parent ace391f commit bd8129e
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 18 deletions.
86 changes: 68 additions & 18 deletions client/src/main/java/com/vesoft/nebula/client/meta/MetaManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.vesoft.nebula.meta.IdName;
import com.vesoft.nebula.meta.SpaceItem;
import com.vesoft.nebula.meta.TagItem;
import com.vesoft.nebula.util.NetUtil;
import java.io.Serializable;
import java.net.UnknownHostException;
import java.util.ArrayList;
Expand All @@ -24,7 +25,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -33,25 +36,26 @@
*/
public class MetaManager implements MetaCache, Serializable {
private class SpaceInfo {
private SpaceItem spaceItem = null;
private Map<String, TagItem> tagItems = new HashMap<>();
private Map<Integer, String> tagIdNames = new HashMap<>();
private Map<String, EdgeItem> edgeItems = new HashMap<>();
private Map<Integer, String> edgeTypeNames = new HashMap<>();
private Map<Integer, List<HostAddr>> partsAlloc = new HashMap<>();
private SpaceItem spaceItem = null;
private Map<String, TagItem> tagItems = new HashMap<>();
private Map<Integer, String> tagIdNames = new HashMap<>();
private Map<String, EdgeItem> edgeItems = new HashMap<>();
private Map<Integer, String> edgeTypeNames = new HashMap<>();
private Map<Integer, List<HostAddr>> partsAlloc = new HashMap<>();
}

private Map<String, MetaManager.SpaceInfo> spacesInfo = new HashMap<>();
private Map<String, Map<Integer, HostAddr>> partLeaders = null;
private Map<String, MetaManager.SpaceInfo> spacesInfo = new HashMap<>();
private Map<String, Map<Integer, HostAddr>> partLeaders = null;
private Map<HostAddr, HostAddr> storageAddressMapping = new ConcurrentHashMap<>();

private static final Logger LOGGER = LoggerFactory.getLogger(MetaManager.class);

private MetaClient metaClient;
private MetaClient metaClient;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

private static final int DEFAULT_TIMEOUT_MS = 1000;
private static final int DEFAULT_TIMEOUT_MS = 1000;
private static final int DEFAULT_CONNECTION_RETRY_SIZE = 3;
private static final int DEFAULT_EXECUTION_RETRY_SIZE = 3;
private static final int DEFAULT_EXECUTION_RETRY_SIZE = 3;

/**
* init the meta info cache
Expand All @@ -70,11 +74,37 @@ public MetaManager(List<HostAddress> address, int timeout, int connectionRetry,
int executionRetry, boolean enableSSL, SSLParam sslParam)
throws TException, ClientServerIncompatibleException, UnknownHostException {
metaClient = new MetaClient(address, timeout, connectionRetry, executionRetry, enableSSL,
sslParam);
sslParam);
metaClient.connect();
fillMetaInfo();
}

/**
* Add address mapping for storage.Used for change address of storage read from meta server.
*
* @param sourceAddr ip:port
* @param targetAddr ip:port
*/
public void addStorageAddrMapping(String sourceAddr, String targetAddr) {
if (sourceAddr != null && targetAddr != null) {
storageAddressMapping.put(NetUtil.parseHostAddr(sourceAddr),
NetUtil.parseHostAddr(targetAddr));
}
}

/**
* Add address mapping for storage.Used for change address of storage read from meta server.
*
* @param addressMap sourceAddr(ip:port) => targetAddr(ip:port)
*/
public void addStorageAddrMapping(Map<String, String> addressMap) {
if (addressMap != null && !addressMap.isEmpty()) {
for (Map.Entry<String, String> et : addressMap.entrySet()) {
storageAddressMapping.put(NetUtil.parseHostAddr(et.getKey()),
NetUtil.parseHostAddr(et.getValue()));
}
}
}

/**
* close meta client
Expand All @@ -90,10 +120,10 @@ public void close() {
private void fillMetaInfo() {
try {
Map<String, MetaManager.SpaceInfo> tempSpacesInfo = new HashMap<>();
List<IdName> spaces = metaClient.getSpaces();
List<IdName> spaces = metaClient.getSpaces();
for (IdName space : spaces) {
SpaceInfo spaceInfo = new SpaceInfo();
String spaceName = new String(space.name);
String spaceName = new String(space.name);
SpaceItem spaceItem = metaClient.getSpace(spaceName);
spaceInfo.spaceItem = spaceItem;
List<TagItem> tags = metaClient.getTags(spaceName);
Expand Down Expand Up @@ -129,10 +159,13 @@ private void fillMetaInfo() {
for (int partId : spacesInfo.get(spaceName).partsAlloc.keySet()) {
if (spacesInfo.get(spaceName).partsAlloc.get(partId).size() < 1) {
LOGGER.error("space {} part {} has not allocation host.",
spaceName, partId);
spaceName, partId);
} else {
partLeaders.get(spaceName).put(partId,
spacesInfo.get(spaceName).partsAlloc.get(partId).get(0));
spacesInfo
.get(spaceName)
.partsAlloc
.get(partId).get(0));
}

}
Expand Down Expand Up @@ -280,7 +313,8 @@ public HostAddr getLeader(String spaceName, int part) throws IllegalArgumentExce
if (!partLeaders.get(spaceName).containsKey(part)) {
throw new IllegalArgumentException("PartId:" + part + " does not exist.");
}
return partLeaders.get(spaceName).get(part);
HostAddr hostAddr = partLeaders.get(spaceName).get(part);
return storageAddressMapping.getOrDefault(hostAddr, hostAddr);
} finally {
lock.readLock().unlock();
}
Expand Down Expand Up @@ -313,7 +347,17 @@ public Map<Integer, List<HostAddr>> getPartsAlloc(String spaceName)
if (!spacesInfo.containsKey(spaceName)) {
throw new IllegalArgumentException("Space:" + spaceName + " does not exist.");
}
return spacesInfo.get(spaceName).partsAlloc;
Map<Integer, List<HostAddr>> partsAlloc = spacesInfo.get(spaceName).partsAlloc;
if (!storageAddressMapping.isEmpty()) {
// transform real address to special address by mapping
partsAlloc.keySet().forEach(partId -> {
partsAlloc.computeIfPresent(partId, (k, addressList) -> addressList
.stream()
.map(hostAddr -> storageAddressMapping.getOrDefault(hostAddr, hostAddr))
.collect(Collectors.toList()));
});
}
return partsAlloc;
} finally {
lock.readLock().unlock();
}
Expand Down Expand Up @@ -355,6 +399,12 @@ public Set<HostAddr> listHosts() {
if (hosts == null) {
return new HashSet<>();
}
if (!storageAddressMapping.isEmpty()) {
hosts = hosts
.stream()
.map(hostAddr -> storageAddressMapping.getOrDefault(hostAddr, hostAddr))
.collect(Collectors.toSet());
}
return hosts;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -44,6 +45,8 @@ public class StorageClient implements Serializable {
private String user = null;
private String password = null;

private Map<String, String> storageAddressMapping = null;

/**
* Get a Nebula Storage client that executes the scan query to get NebulaGraph's data with
* one server host.
Expand Down Expand Up @@ -108,6 +111,7 @@ public boolean connect() throws Exception {
pool = new StorageConnPool(config);
metaManager = new MetaManager(addresses, timeout, connectionRetry, executionRetry,
enableSSL, sslParam);
metaManager.addStorageAddrMapping(storageAddressMapping);
return true;
}

Expand All @@ -122,6 +126,24 @@ public StorageClient setPassword(String password) {
}


/**
* The storage address translation relationship is set to convert the storage address
* that cannot be obtained by requesting the meta service
*
* @param storageAddressMapping sourceAddressFromMeta -> targetAddress,Format ip:port.
* eg: 127.0.0.1:9559 -> 10.1.1.2:9559,
* Translates the storage 127.0.0.1:9559 address obtained from the
* meta server to 10.1.1.2:9559. It will use 10.1.1.2:9559 to
* request storage. Instead of 27.0.0.1:9559
*/
public void setStorageAddressMapping(Map<String, String> storageAddressMapping) {
this.storageAddressMapping = storageAddressMapping;
if (this.metaManager != null) {
this.metaManager.addStorageAddrMapping(storageAddressMapping);
}
}


/**
* scan vertex of all parts with specific return cols, if returnCols is an empty list, then
* return all the columns of specific tagName.
Expand Down
25 changes: 25 additions & 0 deletions client/src/main/java/com/vesoft/nebula/util/NetUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/* Copyright (c) 2024 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.nebula.util;

import com.vesoft.nebula.HostAddr;

/**
* The util of network
*
* @Author jiangyiwang-jk
* @Date 2024/2/1 15:36
*/
public class NetUtil {

public static HostAddr parseHostAddr(String hostAddress) {
assert hostAddress != null : "Host address should not be null";
String[] hostPort = hostAddress.split(":");
assert hostPort.length == 2 : String.format("Invalid host address %s", hostAddress);
return new HostAddr(hostPort[0].trim(), Integer.parseInt(hostPort[1].trim()));
}

}

0 comments on commit bd8129e

Please sign in to comment.