# Using the High-Performance Graph Computing System 'Plato' with Nebula Graph

<!-- Please change this pic to the English version.
![Using the High-Performance Graph Computing System 'Plato' with Nebula Graph](https://www-cdn.nebula-graph.com.cn/nebula-blog/kv-seperation-00.jpeg) -->

## 1. Introduction to Graph Computation

### 1.1 Graph Database vs. Graph Computation

Graph databases are geared toward OLTP scenarios, emphasizing CRUD operations, and each query touches only a small amount of data. Graph computation, by contrast, is OLAP-oriented and analyzes data in bulk.

### 1.2 Distributed Graph Computation Architecture

There are standalone and distributed graph computation systems.

One advantage of standalone graph computation systems is simplicity: there is no need to consider distributed communication or to cut the graph. However, the resources of a single machine limit the scale of graph data that can be analyzed.

Distributed graph computation systems spread graph data across different nodes so that larger-scale graphs can be processed, but they inevitably introduce distributed communication overhead.

### 1.3 Graph Cuts

There are two main methods to cut graphs: Edge Cut and Vertex Cut.

![](https://www-cdn.nebula-graph.com.cn/nebula-blog/graph-edge-cut.png)

Edge Cut: The data of each vertex is stored on exactly one node, but some edges are cut and stored on multiple nodes.

As shown in figure (a), the data of vertex A is stored only on node 1, and the data of vertex B is stored only on node 2. For edge AB, it is stored on node 1 and node 2. Since vertices A and B are distributed on different nodes, there is a communication overhead during the iterative computation.

Vertex Cut: Each edge is stored on exactly one node, but some vertices may be cut and replicated across multiple nodes.

As shown in figure (b), edge AB is stored on node 1, edge BC is stored on node 2, and edge CD is stored on node 3. The data of vertex B are distributed to nodes 1 and 2, and the data of vertex C are distributed to nodes 2 and 3. Since vertices are stored on multiple nodes, maintaining the consistency of vertex data also imposes a communication overhead.
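
To make the two strategies concrete, the following is a minimal, hypothetical C++ sketch of how a partitioner might assign ownership under each cut. The three-node cluster and the hash functions are illustrative assumptions, not Plato's actual implementation.

```cpp
#include <cstdint>
#include <cstdio>
#include <functional>
#include <utility>

// A hypothetical 3-node cluster, used only for illustration.
constexpr uint32_t kNumNodes = 3;

using VertexId = uint64_t;
using Edge = std::pair<VertexId, VertexId>;  // (source, destination)

// Edge cut: every vertex lives on exactly one node; an edge whose endpoints
// belong to different nodes is "cut" and incurs communication between them.
uint32_t ownerOfVertex(VertexId v) {
    return std::hash<VertexId>{}(v) % kNumNodes;
}
bool isEdgeCut(const Edge& e) {
    return ownerOfVertex(e.first) != ownerOfVertex(e.second);
}

// Vertex cut: every edge lives on exactly one node; a vertex whose incident
// edges land on different nodes is replicated (mirrored) on each of them.
uint32_t ownerOfEdge(const Edge& e) {
    return std::hash<VertexId>{}(e.first * 1000003 + e.second) % kNumNodes;
}

int main() {
    Edge ab{1, 2};
    std::printf("edge (1,2): cut under edge cut? %d, owner under vertex cut: %u\n",
                isEdgeCut(ab), ownerOfEdge(ab));
}
```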

### 1.4 Computational Models

Programming models are for graph computing application developers and can be divided into vertex-centric, edge/path-centric, and subgraph-centric models.

Computational models are for graph computation system developers, who often face the question of whether communication and computation should be synchronous or asynchronous. The more common computational models are the BSP (Bulk Synchronous Parallel) and GAS (Gather-Apply-Scatter) models.

BSP model: The computational process of the BSP model consists of a series of iterations, each of which is called a superstep. The main systems that use the BSP model are Pregel, Hama, Giraph, etc.

The BSP model has both a vertical structure and a horizontal structure. The vertical structure consists of a series of supersteps. Horizontally, as shown in the following figure, each superstep is further subdivided into three ordered phases:

- Local Computation: Each processor only performs calculations on data stored in local memory.
- Process Communication: Data are exchanged among nodes.
- Barrier Synchronization: Waiting for the end of all communication actions.

![](https://www-cdn.nebula-graph.com.cn/nebula-blog/graph-computing-processor.png)
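
As an illustration only (not Plato's or Pregel's actual code), the superstep structure can be sketched as a loop over the three phases; the message type and the stubbed-out phase functions below are placeholders.

```cpp
#include <cstdio>
#include <vector>

// Each message carries an update destined for a vertex owned by some worker.
struct Message { int targetVertex; double value; };

// Stubs standing in for a real engine; declared here so the loop reads top-down.
std::vector<Message> localCompute(int step);        // phase 1: local computation
void exchange(const std::vector<Message>& outbox);  // phase 2: process communication
void barrier();                                     // phase 3: barrier synchronization

int main() {
    const int supersteps = 5;
    for (int step = 0; step < supersteps; ++step) {
        auto outbox = localCompute(step);  // compute only on the local partition
        exchange(outbox);                  // exchange messages among workers
        barrier();                         // wait for all workers before the next superstep
    }
}

// Placeholder bodies so the sketch compiles; a real engine fills these in.
std::vector<Message> localCompute(int step) { std::printf("superstep %d\n", step); return {}; }
void exchange(const std::vector<Message>&) {}
void barrier() {}
```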

GAS Model: It was proposed in the PowerGraph system and has three phases: Gather, Apply, and Scatter.

- **Gather**: Collects information about adjacent vertices.
- **Apply**: Processes the collected information locally and updates it to vertices.
- **Scatter**: Sends new information to adjacent vertices.
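
For intuition, here is a rough sketch of a PageRank-like vertex program written in the Gather-Apply-Scatter style. The types and method names are hypothetical and do not reflect PowerGraph's or Plato's real API; out-degree normalization is omitted for brevity.

```cpp
#include <cstdio>
#include <vector>

// Illustrative value types for a PageRank-like vertex program.
using VertexValue = double;
using Accum = double;

struct PageRankLikeProgram {
    // Gather: collect contributions from in-edge neighbors.
    Accum gather(VertexValue neighborValue) const { return neighborValue; }

    // Apply: combine the gathered sum locally and update this vertex.
    VertexValue apply(VertexValue /*oldValue*/, Accum sum) const {
        return 0.15 + 0.85 * sum;
    }

    // Scatter: send the new value along out-edges, typically activating
    // the out-edge neighbors for the next iteration.
    void scatter(VertexValue newValue, std::vector<VertexValue>& outbox) const {
        outbox.push_back(newValue);
    }
};

int main() {
    PageRankLikeProgram program;
    std::vector<VertexValue> inNeighbors{0.3, 0.5, 0.2};

    Accum sum = 0;
    for (VertexValue v : inNeighbors) sum += program.gather(v);  // Gather
    VertexValue updated = program.apply(1.0, sum);               // Apply
    std::vector<VertexValue> outbox;
    program.scatter(updated, outbox);                            // Scatter

    std::printf("updated vertex value: %f\n", updated);
}
```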

## 2. Introduction to the Graph Computation System 'Gemini'

Gemini has been influential in the industry and uses techniques such as CSR/CSC storage, push/pull modes, master and mirror vertices, sparse and dense graph handling, overlapping communication with computation, chunk-based partitioning, NUMA-aware sub-partitioning, etc.

Gemini partitions graph data with edge cuts in a chunk-based manner and supports the NUMA architecture. Within each partition, outgoing edges are stored in CSR format and incoming edges in CSC format. During iterative computation, sparse graphs run in push mode, where updates are pushed to outgoing-edge neighbors, and dense graphs run in pull mode, where data is pulled from incoming-edge neighbors.
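
For reference, a minimal sketch of the CSR layout for outgoing edges (CSC is the same idea built over incoming edges); this is illustrative and not Gemini's or Plato's actual data structure.

```cpp
#include <cstdint>
#include <cstdio>
#include <vector>

// CSR over outgoing edges: the out-neighbors of vertex v occupy
// neighbors[offsets[v] .. offsets[v + 1]). CSC is the same layout built
// over incoming edges and is what the pull mode iterates.
struct CsrGraph {
    std::vector<uint64_t> offsets;    // size = numVertices + 1
    std::vector<uint32_t> neighbors;  // size = numEdges

    template <typename Fn>
    void forEachOutNeighbor(uint32_t v, Fn fn) const {
        for (uint64_t i = offsets[v]; i < offsets[v + 1]; ++i) fn(neighbors[i]);
    }
};

int main() {
    // 3 vertices: 0 -> {1, 2}, 1 -> {2}, 2 -> {}
    CsrGraph g{{0, 2, 3, 3}, {1, 2, 2}};
    g.forEachOutNeighbor(0, [](uint32_t n) { std::printf("out-neighbor of 0: %u\n", n); });
}
```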

If one edge is cut, the vertex on one end of the edge is called the master, and that on the other end is called the mirror. Mirrors act like placeholders, and during pulling, the mirror vertices on each node pull the data of their incoming edge neighbor master vertices for a single computation, which is then synchronized to the master vertices through the network in the BSP model. During pushing, the master vertices on each node synchronize the data to their mirror vertices first, and then the mirror vertices update the data of their outgoing edge neighbors.

In the Process Communication phase of BSP, each node `Node_i` sends data to the next node `Node_i+1`, and the last node sends data to the first node. While sending, each node also receives messages from node `Node_i-1` and immediately performs local computation on the received data. This overlapping of communication and computation hides part of the communication cost and reduces the overall iteration time.
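
Assuming an MPI environment (Plato itself communicates via MPI, though its actual code differs), the ring-style overlap can be sketched roughly as follows; buffer sizes and contents are placeholders.

```cpp
#include <mpi.h>
#include <cstdio>
#include <vector>

// Ring exchange: node i sends to node (i + 1) % size while receiving from
// node (i - 1 + size) % size, and computes on each received chunk right away,
// overlapping communication with computation.
int main(int argc, char** argv) {
    MPI_Init(&argc, &argv);
    int rank = 0, size = 1;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    std::vector<double> sendBuf(1024, rank), recvBuf(1024);
    MPI_Request sendReq;

    // Start sending to the next node asynchronously.
    MPI_Isend(sendBuf.data(), static_cast<int>(sendBuf.size()), MPI_DOUBLE,
              (rank + 1) % size, /*tag=*/0, MPI_COMM_WORLD, &sendReq);

    // Receive from the previous node and compute on the data immediately.
    MPI_Recv(recvBuf.data(), static_cast<int>(recvBuf.size()), MPI_DOUBLE,
             (rank - 1 + size) % size, /*tag=*/0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    double sum = 0;
    for (double x : recvBuf) sum += x;  // local computation on the received chunk

    MPI_Wait(&sendReq, MPI_STATUS_IGNORE);
    std::printf("rank %d received sum %f\n", rank, sum);
    MPI_Finalize();
    return 0;
}
```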

For more details, see [*Gemini: A Computation-Centric Distributed Graph Processing System*](https://www.usenix.org/conference/osdi16/technical-sessions/presentation/zhu).

## 3. Integration of Plato with Nebula Graph

### 3.1 Introduction to the Graph Computation System 'Plato'

Plato is Tencent's open-source industrial-grade graph computation system. It can run on x86 server clusters, such as Kubernetes and YARN clusters. For file systems, Plato provides multiple interfaces to support mainstream file systems, such as HDFS and Ceph.

<!-- need to change the pic below to the en version
![](https://www-cdn.nebula-graph.com.cn/nebula-blog/plato-architecture.png) -->

### 3.2 Integration with Nebula Graph

We extended Plato with secondary development to support Nebula Graph as a data source.

#### 3.2.1 Nebula Graph as the Input and Output data source

Based on Plato, Nebula Graph is added as a new input and output data source, which allows you to read data directly from Nebula Graph for graph computation and write the results back to Nebula Graph.

The Nebula Graph storage layer provides partition-based scan interfaces, which make it easy to scan vertex and edge data in bulk:

```cpp
ScanEdgeIter scanEdgeWithPart(std::string spaceName,
                              int32_t partID,
                              std::string edgeName,
                              std::vector<std::string> propNames,
                              int64_t limit = DEFAULT_LIMIT,
                              int64_t startTime = DEFAULT_START_TIME,
                              int64_t endTime = DEFAULT_END_TIME,
                              std::string filter = "",
                              bool onlyLatestVersion = false,
                              bool enableReadFromFollower = true);

ScanVertexIter scanVertexWithPart(std::string spaceName,
                                  int32_t partId,
                                  std::string tagName,
                                  std::vector<std::string> propNames,
                                  int64_t limit = DEFAULT_LIMIT,
                                  int64_t startTime = DEFAULT_START_TIME,
                                  int64_t endTime = DEFAULT_END_TIME,
                                  std::string filter = "",
                                  bool onlyLatestVersion = false,
                                  bool enableReadFromFollower = true);
```
We first obtain the partition distribution of the specified graph space and assign the scan task for each partition to the nodes in the Plato cluster; each node further assigns its scan tasks to the threads running on it, so data is read quickly in parallel. After the graph computation finishes, the results are written back to Nebula Graph in parallel through the Nebula Graph client.
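
Below is a rough sketch of that fan-out, assuming a hypothetical `scanOnePartition` helper and a made-up partition list; in the real integration the partition distribution comes from the meta service and the scan is driven by the `scanVertexWithPart()` / `scanEdgeWithPart()` interfaces shown above.

```cpp
#include <cstdint>
#include <thread>
#include <vector>

// Hypothetical: the partitions of the graph space assigned to *this* Plato node.
static const std::vector<int32_t> kLocalPartitions = {1, 4, 7, 10};

// Placeholder for "scan one partition and feed the data into Plato"; the real
// code would iterate the ScanVertexIter / ScanEdgeIter for this partition.
void scanOnePartition(int32_t partId) { (void)partId; }

int main() {
    unsigned numThreads = std::thread::hardware_concurrency();
    if (numThreads == 0) numThreads = 4;

    // Each thread scans a disjoint subset of the node's partitions in parallel.
    std::vector<std::thread> workers;
    for (unsigned t = 0; t < numThreads; ++t) {
        workers.emplace_back([t, numThreads] {
            for (size_t i = t; i < kLocalPartitions.size(); i += numThreads) {
                scanOnePartition(kLocalPartitions[i]);
            }
        });
    }
    for (auto& w : workers) w.join();
}
```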

#### 3.2.2 Distributed ID Encoder

Gemini and Plato require vertex IDs to be integers increasing continuously from 0, but most real-world vertex IDs do not meet this requirement, especially since Nebula Graph supports string-type IDs from version 2.0.

Therefore, we need to convert the original integer or string IDs into integers that increase continuously from 0. Plato internally implements a single-node ID encoder in which every node of the Plato cluster redundantly stores the mapping of all IDs. When the number of vertices is large, each node needs hundreds of GB of memory just to store the ID mapping table, so we implemented a distributed ID mapper that slices the ID mapping into multiple shards stored on different nodes.

We hash the original IDs to different nodes and assign the continuously increasing IDs starting from 0 globally and in parallel. After the ID mapping is generated, each node stores one part of the mapping table. The source and destination vertices of the edges are then hashed as well and sent to the corresponding nodes for encoding, and the encoded data becomes the input for the computation. When the computation finishes, the results are mapped back to the business IDs through a similar reverse process.
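
A simplified, single-process illustration of this scheme follows; the node count and sample IDs are made up, and the real distributed encoder exchanges the per-node counts over the network rather than computing everything in one process.

```cpp
#include <cstdint>
#include <cstdio>
#include <functional>
#include <string>
#include <unordered_map>
#include <vector>

constexpr uint32_t kNumNodes = 3;  // hypothetical cluster size

// Which node owns the mapping for a given original (integer or string) ID.
uint32_t ownerNode(const std::string& id) {
    return std::hash<std::string>{}(id) % kNumNodes;
}

int main() {
    std::vector<std::string> originalIds = {"alice", "bob", "carol", "dave", "erin"};

    // Step 1: hash every original ID to its owner node, so each node stores
    // only a slice of the mapping instead of the full table.
    std::vector<std::vector<std::string>> perNode(kNumNodes);
    for (const auto& id : originalIds) perNode[ownerNode(id)].push_back(id);

    // Step 2: compute each node's starting offset from the per-node counts
    // (a prefix sum), so the nodes can assign contiguous IDs from 0 in parallel.
    std::vector<uint64_t> offset(kNumNodes, 0);
    for (uint32_t n = 1; n < kNumNodes; ++n)
        offset[n] = offset[n - 1] + perNode[n - 1].size();

    // Step 3: each node builds its own slice of the ID mapping table.
    std::vector<std::unordered_map<std::string, uint64_t>> mapping(kNumNodes);
    for (uint32_t n = 0; n < kNumNodes; ++n)
        for (size_t i = 0; i < perNode[n].size(); ++i)
            mapping[n][perNode[n][i]] = offset[n] + i;

    // Encoding an edge's endpoints means asking the owner node of each endpoint.
    for (const auto& id : originalIds) {
        uint32_t n = ownerNode(id);
        std::printf("%s -> node %u -> encoded %llu\n", id.c_str(), n,
                    static_cast<unsigned long long>(mapping[n][id]));
    }
}
```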

#### 3.2.3 Supplementary Algorithms

We added SSSP, APSP, Jaccard Similarity, and Triangle Count algorithms based on open-source Plato, and each algorithm supports Nebula Graph as an input and output data source. The currently supported algorithms are as follows:

| File Name | Algorithm Name | Category |
| :----- | ------ |------ |
| apsp.cc | All Pairs Shortest Path | Path |
| sssp.cc | Single Source Shortest Path | Path |
| tree_stat.cc | Tree depth/width | Graph feature |
| nstepdegrees.cc | N-step degrees | Graph feature |
| hyperanf.cc | Graph average distance (HyperANF) | Graph feature |
| triangle_count.cc | Triangle counting | Graph feature |
| kcore.cc | k-core decomposition | k-Cores |
| pagerank.cc | PageRank | Node centrality |
| bnc.cc | Betweenness Centrality | Node centrality |
| cnc.cc | Closeness Centrality | Node centrality |
| cgm.cc | Connected Components | Community detection |
| lpa.cc | Label Propagation Algorithm | Community detection |
| hanp.cc | Hop Attenuation & Node Preference | Community detection |
| metapath_randomwalk.cc | Metapath random walk | Graph Representation Learning |
| node2vec_randomwalk.cc | node2vec random walk | Graph Representation Learning |
| fast_unfolding.cc | Louvain | Clustering |
| infomap_simple.cc | Infomap | Clustering |
| jaccard_similarity.cc | Jaccard Similarity | Similarity |
| mutual.cc | - | Others |
| torch.cc | - | Others |
| bfs.cc | Breadth-First Search | Others |

## 4. Plato Deployment

### 4.1 Deploy a cluster

Plato uses MPI for inter-process communication. To deploy multiple Plato services on a cluster, you need to install Plato in the same directory on every node, or use NFS. For details, see [https://mpitutorial.com/tutorials/running-an-mpi-cluster-within-a-lan/](https://mpitutorial.com/tutorials/running-an-mpi-cluster-within-a-lan/).

### 4.2 Scripts and configuration files to run algorithms

#### scripts/run_pagerank_local.sh

```bash
#!/bin/bash

PROJECT="$(cd "$(dirname "$0")" && pwd)/.."

MAIN="./bazel-bin/example/pagerank" # process name

WNUM=3    # total number of processes across all nodes in the cluster
WCORES=8  # number of threads per process

#INPUT=${INPUT:="$PROJECT/data/graph/v100_e2150_ua_c3.csv"}
INPUT=${INPUT:="nebula:${PROJECT}/scripts/nebula.conf"}
#OUTPUT=${OUTPUT:='hdfs://192.168.8.149:9000/_test/output'}
OUTPUT=${OUTPUT:="nebula:$PROJECT/scripts/nebula.conf"}
IS_DIRECTED=${IS_DIRECTED:=true} # whether to let Plato automatically add reversed edges
NEED_ENCODE=${NEED_ENCODE:=true} # whether vertex IDs need to be encoded into continuous integers
VTYPE=${VTYPE:=uint32}           # vertex ID type

ALPHA=-1
PART_BY_IN=false

EPS=${EPS:=0.0001}
DAMPING=${DAMPING:=0.8}
ITERATIONS=${ITERATIONS:=5}

export MPIRUN_CMD=${MPIRUN_CMD:="${PROJECT}/3rd/mpich-3.2.1/bin/mpiexec.hydra"}

PARAMS+=" --threads ${WCORES}"
PARAMS+=" --input ${INPUT} --output ${OUTPUT} --is_directed=${IS_DIRECTED} --need_encode=${NEED_ENCODE} --vtype=${VTYPE}"
PARAMS+=" --iterations ${ITERATIONS} --eps ${EPS} --damping ${DAMPING}"

# env for JAVA && HADOOP
export LD_LIBRARY_PATH=${JAVA_HOME}/jre/lib/amd64/server:${LD_LIBRARY_PATH}

# env for hadoop
export CLASSPATH=${HADOOP_HOME}/etc/hadoop:`find ${HADOOP_HOME}/share/hadoop/ | awk '{path=path":"$0}END{print path}'`
export LD_LIBRARY_PATH="${HADOOP_HOME}/lib/native":${LD_LIBRARY_PATH}

chmod 777 ./${MAIN}
${MPIRUN_CMD} -n ${WNUM} -f ${PROJECT}/scripts/cluster ./${MAIN} ${PARAMS}
exit $?
```

Parameter Description

- The `INPUT` and `OUTPUT` parameters specify the input and output data sources of the algorithm. Currently, local CSV files, HDFS files, and Nebula Graph are supported as data sources. When the data source is Nebula Graph, the `INPUT` and `OUTPUT` parameters take the form `nebula:/path/to/nebula.conf`.

- `WNUM` specifies the total number of processes running across all nodes in the cluster. It is recommended to set it to the number of machines in the cluster or to the total number of NUMA nodes.

#### scripts/nebula.conf

```
## Read/Write
--retry=3 # The number of retries connecting to Nebula Graph.
--space=sf30 # The name of the graph space that can be read from and written to.

## Read from Nebula Graph
--meta_server_addrs=192.168.8.94:9559 # The address of the metad process in Nebula Graph.
--edge=LIKES # The name of the edge type to be read.
#--edge_data_field # The name of the property to be read as the weight of the edge.
--read_batch_size=10000 # The batch size for each scan.

## Write to Nebula Graph
--graph_server_addrs=192.168.8.94:9669 # The address of the graphd process in Nebula Graph.
--user=root # The account to log into Nebula Graph.
--password=nebula # The password to log into Nebula Graph.

# Insert or Update
--mode=insert # The mode used to write data back to Nebula Graph: insert or update.
--tag=pagerank # The tag name written back to Nebula Graph.
--prop=pr # The property name corresponding to the tag that is written back to Nebula Graph.
--type=double # The property type corresponding to the tag written back to Nebula Graph.
--write_batch_size=1000 # The batch size for each write.
--err_file=/home/plato/err.txt # The file path where the data failed to be written back to Nebula Graph is stored.
```

#### scripts/cluster

The `cluster` file specifies the IPs of cluster nodes where the algorithm runs.
```
192.168.15.3
192.168.15.5
192.168.15.6
```

The above describes how Plato is used with Nebula Graph. This integration of Plato, named Nebula Analytics, is only available in the Nebula Graph Enterprise Edition. If you are using the open-source version of Nebula Graph, you need to implement the reading and writing of Nebula Graph data with Plato yourself.

---