Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add storage address mapping transform #589

Merged
merged 3 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.facebook.thrift.transport.THeaderTransport;
import com.facebook.thrift.transport.TSocket;
import com.facebook.thrift.transport.TTransportException;
import com.google.common.base.Charsets;
import com.vesoft.nebula.ErrorCode;
import com.vesoft.nebula.HostAddr;
import com.vesoft.nebula.client.graph.data.CASignedSSLParam;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.vesoft.nebula.meta.IdName;
import com.vesoft.nebula.meta.SpaceItem;
import com.vesoft.nebula.meta.TagItem;
import com.vesoft.nebula.util.NetUtil;
import java.io.Serializable;
import java.net.UnknownHostException;
import java.util.ArrayList;
Expand All @@ -24,7 +25,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -44,6 +47,8 @@ private class SpaceInfo {
private Map<String, MetaManager.SpaceInfo> spacesInfo = new HashMap<>();
private Map<String, Map<Integer, HostAddr>> partLeaders = null;

private Map<HostAddr, HostAddr> storageAddressMapping = new ConcurrentHashMap<>();

private static final Logger LOGGER = LoggerFactory.getLogger(MetaManager.class);

private MetaClient metaClient;
Expand Down Expand Up @@ -75,6 +80,33 @@ public MetaManager(List<HostAddress> address, int timeout, int connectionRetry,
fillMetaInfo();
}

/**
* Add address mapping for storage.Used for change address of storage read from meta server.
*
* @param sourceAddr ip:port
* @param targetAddr ip:port
*/
public void addStorageAddrMapping(String sourceAddr, String targetAddr) {
if (sourceAddr != null && targetAddr != null) {
storageAddressMapping.put(NetUtil.parseHostAddr(sourceAddr),
NetUtil.parseHostAddr(targetAddr));
}
}

/**
* Add address mapping for storage.Used for change address of storage read from meta server.
*
* @param addressMap sourceAddr(ip:port) => targetAddr(ip:port)
*/
public void addStorageAddrMapping(Map<String, String> addressMap) {
if (addressMap != null && !addressMap.isEmpty()) {
for (Map.Entry<String, String> et : addressMap.entrySet()) {
storageAddressMapping.put(NetUtil.parseHostAddr(et.getKey()),
NetUtil.parseHostAddr(et.getValue()));
}
}
}


/**
* close meta client
Expand Down Expand Up @@ -280,7 +312,8 @@ public HostAddr getLeader(String spaceName, int part) throws IllegalArgumentExce
if (!partLeaders.get(spaceName).containsKey(part)) {
throw new IllegalArgumentException("PartId:" + part + " does not exist.");
}
return partLeaders.get(spaceName).get(part);
HostAddr hostAddr = partLeaders.get(spaceName).get(part);
return storageAddressMapping.getOrDefault(hostAddr, hostAddr);
} finally {
lock.readLock().unlock();
}
Expand Down Expand Up @@ -313,7 +346,17 @@ public Map<Integer, List<HostAddr>> getPartsAlloc(String spaceName)
if (!spacesInfo.containsKey(spaceName)) {
throw new IllegalArgumentException("Space:" + spaceName + " does not exist.");
}
return spacesInfo.get(spaceName).partsAlloc;
Map<Integer, List<HostAddr>> partsAlloc = spacesInfo.get(spaceName).partsAlloc;
if (!storageAddressMapping.isEmpty()) {
// transform real address to special address by mapping
partsAlloc.keySet().forEach(partId -> {
partsAlloc.computeIfPresent(partId, (k, addressList) -> addressList
.stream()
.map(hostAddr -> storageAddressMapping.getOrDefault(hostAddr, hostAddr))
.collect(Collectors.toList()));
});
}
return partsAlloc;
} finally {
lock.readLock().unlock();
}
Expand Down Expand Up @@ -355,6 +398,12 @@ public Set<HostAddr> listHosts() {
if (hosts == null) {
return new HashSet<>();
}
if (!storageAddressMapping.isEmpty()) {
hosts = hosts
.stream()
.map(hostAddr -> storageAddressMapping.getOrDefault(hostAddr, hostAddr))
.collect(Collectors.toSet());
}
return hosts;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -41,8 +42,7 @@ public class StorageClient implements Serializable {
private boolean enableSSL = false;
private SSLParam sslParam = null;

private String user = null;
private String password = null;
private Map<String, String> storageAddressMapping;

/**
* Get a Nebula Storage client that executes the scan query to get NebulaGraph's data with
Expand Down Expand Up @@ -95,6 +95,23 @@ public StorageClient(List<HostAddress> addresses, int timeout, int connectionRet
}
}

/**
* The storage address translation relationship is set to convert the storage address
* that cannot be obtained by requesting the meta service
*
* @param storageAddressMapping sourceAddressFromMeta -> targetAddress,Format ip:port.
* eg: 127.0.0.1:9559 -> 10.1.1.2:9559,
* Translates the storage 127.0.0.1:9559 address obtained from the
* meta server to 10.1.1.2:9559. It will use 10.1.1.2:9559 to
* request storage. Instead of 27.0.0.1:9559
*/
public void setStorageAddressMapping(Map<String, String> storageAddressMapping) {
this.storageAddressMapping = storageAddressMapping;
if (this.metaManager != null) {
this.metaManager.addStorageAddrMapping(storageAddressMapping);
}
}

/**
* Connect to Nebula Storage server.
*
Expand All @@ -108,19 +125,10 @@ public boolean connect() throws Exception {
pool = new StorageConnPool(config);
metaManager = new MetaManager(addresses, timeout, connectionRetry, executionRetry,
enableSSL, sslParam);
metaManager.addStorageAddrMapping(storageAddressMapping);
return true;
}

public StorageClient setUser(String user) {
this.user = user;
return this;
}

public StorageClient setPassword(String password) {
this.password = password;
return this;
}


/**
* scan vertex of all parts with specific return cols, if returnCols is an empty list, then
Expand Down Expand Up @@ -632,8 +640,6 @@ private ScanVertexResultIterator doScanVertex(String spaceName,
.withSpaceName(spaceName)
.withTagName(tagName)
.withPartSuccess(allowPartSuccess)
.withUser(user)
.withPassword(password)
.build();
}

Expand Down Expand Up @@ -1096,8 +1102,6 @@ private ScanEdgeResultIterator doScanEdge(String spaceName,
.withSpaceName(spaceName)
.withEdgeName(edgeName)
.withPartSuccess(allowPartSuccess)
.withUser(user)
.withPassword(password)
.build();
}

Expand Down
25 changes: 25 additions & 0 deletions client/src/main/java/com/vesoft/nebula/util/NetUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.vesoft.nebula.util;

import com.vesoft.nebula.HostAddr;
import com.vesoft.nebula.client.graph.data.HostAddress;

/**
* The util of network
*
* @Author jiangyiwang-jk
* @Date 2024/2/1 15:36
*/
public class NetUtil {

private NetUtil() {
;
}

public static HostAddr parseHostAddr(String hostAddress) {
assert hostAddress != null : "Host address should not be null";
String[] hostPort = hostAddress.split(":");
assert hostPort.length == 2 : String.format("Invalid host address %s", hostAddress);
return new HostAddr(hostPort[0].trim(), Integer.parseInt(hostPort[1].trim()));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.nebula.examples;

import com.vesoft.nebula.client.storage.StorageClient;
import com.vesoft.nebula.client.storage.data.EdgeTableRow;
import com.vesoft.nebula.client.storage.data.VertexRow;
import com.vesoft.nebula.client.storage.data.VertexTableRow;
import com.vesoft.nebula.client.storage.scan.ScanEdgeResult;
import com.vesoft.nebula.client.storage.scan.ScanEdgeResultIterator;
import com.vesoft.nebula.client.storage.scan.ScanVertexResult;
import com.vesoft.nebula.client.storage.scan.ScanVertexResultIterator;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SpecialAddressStorageClientExample {
private static final Logger LOGGER =
LoggerFactory.getLogger(SpecialAddressStorageClientExample.class);

public static void main(String[] args) {
Map<String,String> storageAddressMapping = new HashMap<>();
storageAddressMapping.put("127.0.0.1:9559","10.xx.xx.xx:9559");
// input params are the metad's ip and port
StorageClient client = new StorageClient("127.0.0.1", 9559);
// set storage address mapping
client.setStorageAddressMapping(storageAddressMapping);
try {
client.connect();
} catch (Exception e) {
LOGGER.error("storage client connect error, ", e);
client.close();
System.exit(1);
}
scanVertex(client);
scanEdge(client);

client.close();
}

/**
* Vertex Person's property in Nebula Graph:
* first_name, last_name, gender, birthday
* Tom Li 男 2010
*/
public static void scanVertex(StorageClient client) {
ScanVertexResultIterator iterator = client.scanVertex(
"test",
"person",
Arrays.asList("name", "age"));

while (iterator.hasNext()) {
ScanVertexResult result = null;
try {
result = iterator.next();
} catch (Exception e) {
LOGGER.error("scan error, ", e);
client.close();
System.exit(1);
}
if (result.isEmpty()) {
continue;
}

List<VertexRow> vertexRows = result.getVertices();
for (VertexRow row : vertexRows) {
if (result.getVertex(row.getVid()) != null) {
System.out.println(result.getVertex(row.getVid()));
}
}

System.out.println("\nresult vertex table view:");
System.out.println(result.getPropNames());
List<VertexTableRow> vertexTableRows = result.getVertexTableRows();
for (VertexTableRow vertex : vertexTableRows) {
System.out.println(vertex.getValues());
}
System.out.println("\n");
}
}

/**
* Edge Friend's property in Nebula Graph:
* degree
* 1.0
*/
public static void scanEdge(StorageClient client) {
ScanEdgeResultIterator iterator = client.scanEdge(
"test",
"like",
Arrays.asList("likeness"));

while (iterator.hasNext()) {
ScanEdgeResult result = null;
try {
result = iterator.next();
} catch (Exception e) {
LOGGER.error("scan error, ", e);
client.close();
System.exit(1);
}
if (result.isEmpty()) {
continue;
}

System.out.println(result.getEdges());

System.out.println("\nresult edge table view:");
System.out.println(result.getPropNames());
List<EdgeTableRow> edgeTableRows = result.getEdgeTableRows();
for (EdgeTableRow edge : edgeTableRows) {
System.out.println(edge.getValues());
}
System.out.println("\n");
}
}
}
Loading