diff --git a/.github/workflows/ISSUE_TEMPLATE.md b/.github/workflows/ISSUE_TEMPLATE.md new file mode 100644 index 0000000..8efba33 --- /dev/null +++ b/.github/workflows/ISSUE_TEMPLATE.md @@ -0,0 +1,11 @@ +#### Expected behavior + +#### Actual behavior + +#### Steps to reproduce + +#### JVM version (e.g. `java -version`) + +#### Scala version (e.g. `scala -version`) + +#### OS version (e.g. `uname -a`) diff --git a/.github/workflows/PULL_REQUEST_TEMPLATE.md b/.github/workflows/PULL_REQUEST_TEMPLATE.md new file mode 100644 index 0000000..bb3eb3e --- /dev/null +++ b/.github/workflows/PULL_REQUEST_TEMPLATE.md @@ -0,0 +1,11 @@ +Motivation: + +Why you're making that change and what is the problem you're trying to solve. + +Modification: + +Describe the modifications you've done. + +Result: + +Fixes #. diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml new file mode 100644 index 0000000..713c417 --- /dev/null +++ b/.github/workflows/maven.yml @@ -0,0 +1,34 @@ +# This workflow will build a Java project with Maven +# For more information see: https://help.github.com/actions/language-and-framework-guides/building-and-testing-java-with-maven + +name: Java CI with Maven + +on: + push: + branches: [ master ] + pull_request: + branches: + - master + - 'v[0-9]+.*' + +jobs: + build: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v2 + - name: Set up JDK 1.8 + uses: actions/setup-java@v1 + with: + java-version: 1.8 + + - name: Cache the Maven packages to speed up build + uses: actions/cache@v2 + with: + path: ~/.m2/repository + key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }} + restore-keys: ${{ runner.os }}-maven- + + - name: Build with Maven + run: mvn -B package diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3877c83 --- /dev/null +++ b/.gitignore @@ -0,0 +1,37 @@ +# Compiled class file +*.class + +# Log file +*.log + +# BlueJ files +*.ctxt + +# Mobile Tools for Java (J2ME) +.mtj.tmp/ + +# Package Files # +*.jar +*.war +*.nar +*.ear +*.zip +*.tar.gz +*.rar + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* + +# build target +target/ + +# IDE +.idea/ +.eclipse/ +*.iml + +spark-importer.ipr +spark-importer.iws + +# mac +.DS_Store diff --git a/.scalafmt.conf b/.scalafmt.conf new file mode 100644 index 0000000..ef61765 --- /dev/null +++ b/.scalafmt.conf @@ -0,0 +1,4 @@ +align = more +maxColumn = 100 +docstrings = ScalaDoc +assumeStandardLibraryStripMargin = true \ No newline at end of file diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..9d0c0d8 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,13 @@ +# Copyright (c) 2020 vesoft inc. All rights reserved. +# +# This source code is licensed under Apache 2.0 License, +# attached with Common Clause Condition 1.0, found in the LICENSES directory. + +language: java + +jdk: + - oraclejdk11 + - openjdk8 + - openjdk11 + +install: mvn clean compile package install -Dgpg.skip -Dmaven.javadoc.skip=true diff --git a/LICENSES/Apache-2.0.txt b/LICENSES/Apache-2.0.txt new file mode 100644 index 0000000..d645695 --- /dev/null +++ b/LICENSES/Apache-2.0.txt @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. 
+ + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. 
This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
diff --git a/LICENSES/CC-1.0.txt b/LICENSES/CC-1.0.txt new file mode 100644 index 0000000..a1ab126 --- /dev/null +++ b/LICENSES/CC-1.0.txt @@ -0,0 +1,14 @@ +"Commons Clause" License Condition v1.0 + +The Software is provided to you by the Licensor under the License, as defined below, subject to the following condition. + +Without limiting other conditions in the License, the grant of rights under the License will not include, and the License does not grant to you, the right to Sell the Software. + +For purposes of the foregoing, "Sell" means practicing any or all of the rights granted to you under the License to provide to third parties, for a fee or other considerationon (including without limitation fees for hosting or consulting/support services related to the Software), a product or service whose value derives, entirely or substantially, from the functionality of the Software. Any license notice or attribution required by the License must also include this Commons Clause License Condition notice. + +Software: Nebula Graph [Software in this repository] + +License: Apache 2.0 [https://www.apache.org/licenses/LICENSE-2.0.html] + +Licensor: vesoft inc. + diff --git a/README-CN.md b/README-CN.md new file mode 100644 index 0000000..4f1f7fb --- /dev/null +++ b/README-CN.md @@ -0,0 +1,85 @@ +# 欢迎使用 Nebula Algorithm + +nebula-algorithm 是一款基于 [GraphX](https://spark.apache.org/graphx/) 的 Spark 应用程序,提供了以下图计算算法: + + + | 算法名 |中文说明|应用场景| + |:------------------------:|:-----------:|:----:| + | PageRank | 页面排序 | 网页排序、重点节点挖掘| + | Louvain | 社区发现 | 社团挖掘、层次化聚类| + | KCore | K核 |社区发现、金融风控| + | LabelPropagation | 标签传播 |资讯传播、广告推荐、社区发现| + | ConnectedComponent | 联通分量 |社区发现、孤岛发现| + |StronglyConnectedComponent| 强联通分量 |社区发现| + | ShortestPath | 最短路径 |路径规划、网络规划| + | TriangleCount | 三角形计数 |网络结构分析| + | GraphTriangleCount |全图三角形计数|网络紧密性分析| + | BetweennessCentrality | 介数中心性 |关键节点挖掘,节点影响力计算| + | DegreeStatic | 度统计 |图结构分析| + +使用 `nebula-algorithm`,可以通过提交 `Spark` 任务的形式使用完整的算法工具对 `Nebula Graph` 数据库中的数据执行图计算,也可以通过编程形式调用`lib`库下的算法针对DataFrame执行图计算。 + +## 如何获取 + 1. 编译打包 Nebula Algorithm + ``` + $ git clone https://github.com/vesoft-inc/nebula-algorithm.git + $ cd nebula-algorithm + $ mvn clean package -Dgpg.skip -Dmaven.javadoc.skip=true -Dmaven.test.skip=true + ``` + 编译完成后,在 `nebula-algorithm/target` 目录下会生成 `nebula-algorithm-2.0.0.jar` 。 + + 2. 
在 Maven 远程仓库下载 + https://repo1.maven.org/maven2/com/vesoft/nebula-algorithm/2.0.0/ + +# 使用 Nebula Algorithm + + 使用限制:Nebula Algorithm 未自动对字符串id进行编码,因此执行图算法时,边的源点和目标点必须是整数(Nebula Space 的 vid_type可以是String类型,但数据必须是整数)。 + +* 使用方法1:直接提交 nebula-algorithm 算法包 + + * 设置配置文件 + + 关于配置项的具体说明参考[示例配置](https://github.com/vesoft-inc/nebula-algorithm/blob/master/nebula-algorithm/src/main/resources/application.conf) + + * 提交算法任务 + + ``` + ${SPARK_HOME}/bin/spark-submit --master --class com.vesoft.nebula.algorithm.Main nebula-algorithm-2.0.0.jar -p application.conf + ``` +* 使用方法2:调用 nebula-algorithm 算法接口 + + 在`nebula-algorithm`的`lib`库中提供了10中常用图计算算法,可通过编程调用的形式调用算法。 + * 在pom.xml中添加依赖 + ``` + + com.vesoft + nebula-algorithm + 2.0.0 + + ``` + * 定义算法参数调用算法(以`PageRank`为例) + ``` + val prConfig = new PRConfig(5, 1.0) + val louvainResult = PageRankAlgo.apply(spark, data, prConfig, false) + ``` + + 其他算法的调用方法见[测试示例](https://github.com/vesoft-inc/nebula-algorithm/tree/master/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib) 。 + + > 注:执行算法的DataFrame默认第一列是源点,第二列是目标点,第三列是边权重。 + +## 版本匹配 + +| Nebula Algorithm Version | Nebula Version | +|:------------------------:|:--------------:| +| 2.0.0 | 2.0.0, 2.0.1 | +| 2.1.0 | 2.0.0, 2.0.1 | +| 2.5.0 | 2.5.0 | +| 2.5-SNAPSHOT | nightly | + +## 贡献 + +Nebula Algorithm 是一个完全开源的项目,欢迎开源爱好者通过以下方式参与: + +- 前往 [Nebula Graph 论坛](https://discuss.nebula-graph.com.cn/ "点击前往“Nebula Graph 论坛") 上参与 Issue 讨论,如答疑、提供想法或者报告无法解决的问题 +- 撰写或改进文档 +- 提交优化代码 diff --git a/README.md b/README.md index 1346b0d..71dfedd 100644 --- a/README.md +++ b/README.md @@ -1 +1,95 @@ -# nebula-algorithm \ No newline at end of file +# Welcome to Nebula Algorithm + +
+English | 中文
+
diff --git a/example/.gitignore b/example/.gitignore new file mode 100644 index 0000000..84e7a6b --- /dev/null +++ b/example/.gitignore @@ -0,0 +1,36 @@ +# Compiled class file +*.class + +# Log file +*.log + +# BlueJ files +*.ctxt + +# Mobile Tools for Java (J2ME) +.mtj.tmp/ + +# Package Files # +*.jar +*.war +*.nar +*.ear +*.zip +*.tar.gz +*.rar + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* + +# build target +target/ + +# IDE +.idea/ +.eclipse/ +*.iml + +spark-importer.ipr +spark-importer.iws + +.DS_Store diff --git a/example/pom.xml b/example/pom.xml new file mode 100644 index 0000000..bc9d8aa --- /dev/null +++ b/example/pom.xml @@ -0,0 +1,164 @@ + + + + nebula-spark + com.vesoft + 2.5-SNAPSHOT + ../pom.xml + + 4.0.0 + + example + + + + + + org.apache.maven.plugins + maven-deploy-plugin + + true + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 1.8 + 1.8 + + + + + org.apache.maven.plugins + maven-jar-plugin + 3.2.0 + + + + test-jar + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + package + + shade + + + false + + + org.apache.spark:* + org.apache.hadoop:* + org.apache.hive:* + log4j:log4j + org.apache.orc:* + xml-apis:xml-apis + javax.inject:javax.inject + org.spark-project.hive:hive-exec + stax:stax-api + org.glassfish.hk2.external:aopalliance-repackaged + + + + + *:* + + com/vesoft/tools/** + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + + + + org.scala-tools + maven-scala-plugin + 2.15.2 + + 2.11.12 + + -target:jvm-1.8 + + + -Xss4096K + + + + + scala-compile + + compile + + + + com/vesoft/tools/** + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + scala-test-compile + + testCompile + + + + com/vesoft/tools/** + + + + + + + + + + + org.slf4j + slf4j-log4j12 + 1.7.25 + + + org.slf4j + slf4j-api + 1.7.25 + + + + org.apache.spark + spark-core_2.11 + 2.4.4 + + + org.apache.spark + spark-sql_2.11 + 2.4.4 + + + + com.vesoft + nebula-algorithm + 2.5-SNAPSHOT + + + diff --git a/example/src/main/resources/data.csv b/example/src/main/resources/data.csv new file mode 100644 index 0000000..d4966c1 --- /dev/null +++ b/example/src/main/resources/data.csv @@ -0,0 +1,14 @@ +id,col1,col2,col3,col4,col5,col6,col7,col8,col9,col10,col11,col12,col13 +1,Tom,tom,10,20,30,40,2021-01-27,2021-01-01T12:10:10,43535232,true,1.0,2.0,10:10:10 +2,Jina,Jina,11,21,31,41,2021-01-28,2021-01-02T12:10:10,43535232,false,1.1,2.1,11:10:10 +3,Tim,Tim,12,22,32,42,2021-01-29,2021-01-03T12:10:10,43535232,false,1.2,2.2,12:10:10 +4,张三,张三,13,23,33,43,2021-01-30,2021-01-04T12:10:10,43535232,true,1.3,2.3,13:10:10 +5,李四,李四,14,24,34,44,2021-02-01,2021-01-05T12:10:10,43535232,false,1.4,2.4,14:10:10 +6,王五,王五,15,25,35,45,2021-02-02,2021-01-06T12:10:10,0,false,1.5,2.5,15:10:10 +7,Jina,Jina,16,26,36,46,2021-02-03,2021-01-07T12:10:10,43535232,true,1.6,2.6,16:10:10 +8,Jina,Jina,17,27,37,47,2021-02-04,2021-01-08T12:10:10,43535232,false,1.7,2.7,17:10:10 +9,Jina,Jina,18,28,38,48,2021-02-05,2021-01-09T12:10:10,43535232,true,1.8,2.8,18:10:10 +10,Jina,Jina,19,29,39,49,2021-02-06,2021-01-10T12:10:10,43535232,false,1.9,2.9,19:10:10 +-1,Jina,Jina,20,30,40,50,2021-02-07,2021-02-11T12:10:10,43535232,false,2.0,3.0,20:10:10 +-2,Jina,Jina,21,31,41,51,2021-02-08,2021-03-12T12:10:10,43535232,false,2.1,3.1,21:10:10 +-3,Jina,Jina,22,32,42,52,2021-02-09,2021-04-13T12:10:10,43535232,false,2.2,3.2,22:10:10 diff --git a/example/src/main/resources/edge b/example/src/main/resources/edge new file mode 100644 index 0000000..0681588 --- 
/dev/null +++ b/example/src/main/resources/edge @@ -0,0 +1,10 @@ +{"src":12345,"dst":23456,"degree":34, "descr": "aaa","timep": "2020-01-01"} +{"src":11111,"dst":22222,"degree":33, "descr": "aaa","timep": "2020-01-01"} +{"src":11111,"dst":33333,"degree":32, "descr": "a\baa","timep": "2020-01-01"} +{"src":11111,"dst":44444,"degree":31, "descr": "aaa","timep": "2020-01-01"} +{"src":22222,"dst":55555,"degree":30, "descr": "a\naa","timep": "2020-01-01"} +{"src":33333,"dst":44444,"degree":29, "descr": "aaa","timep": "2020-01-01"} +{"src":33333,"dst":55555,"degree":28, "descr": "aa\ta","timep": "2020-01-01"} +{"src":44444,"dst":22222,"degree":27, "descr": "aaa","timep": "2020-01-01"} +{"src":44444,"dst":55555,"degree":26, "descr": "aaa","timep": "2020-01-01"} +{"src":22222,"dst":66666,"degree":25, "descr": "aaa","timep": "2020-01-01"} \ No newline at end of file diff --git a/example/src/main/resources/log4j.properties b/example/src/main/resources/log4j.properties new file mode 100644 index 0000000..913391d --- /dev/null +++ b/example/src/main/resources/log4j.properties @@ -0,0 +1,6 @@ +# Global logging configuration +log4j.rootLogger=INFO, stdout +# Console output... +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n diff --git a/example/src/main/resources/vertex b/example/src/main/resources/vertex new file mode 100644 index 0000000..f66c62f --- /dev/null +++ b/example/src/main/resources/vertex @@ -0,0 +1,10 @@ +{"id":12,"name":"Tom","age":20,"born": "2000-01-01"} +{"id":13,"name":"Bob","age":21,"born": "1999-01-02"} +{"id":14,"name":"Jane","age":22,"born": "1998-01-03"} +{"id":15,"name":"Jena","age":23,"born": "1997-01-04"} +{"id":16,"name":"Nic","age":24,"born": "1996-01-05"} +{"id":17,"name":"Mei","age":25,"born": "1995-01-06"} +{"id":18,"name":"HH","age":26,"born": "1994-01-07"} +{"id":19,"name":"Tyler","age":27,"born": "1993-01-08"} +{"id":20,"name":"Ber","age":28,"born": "1992-01-09"} +{"id":21,"name":"Mercy","age":29,"born": "1991-01-10"} \ No newline at end of file diff --git a/nebula-algorithm/pom.xml b/nebula-algorithm/pom.xml new file mode 100644 index 0000000..9f07775 --- /dev/null +++ b/nebula-algorithm/pom.xml @@ -0,0 +1,296 @@ + + + + + nebula-spark + com.vesoft + 2.5-SNAPSHOT + ../pom.xml + + 4.0.0 + + nebula-algorithm + + + 2.4.4 + 2.5-SNAPSHOT + 1.4.0 + 3.7.1 + 3.2.0 + 4.13.1 + 1.8 + 1.8 + + + + + org.apache.spark + spark-core_2.11 + ${spark.version} + + + org.apache.spark + spark-sql_2.11 + ${spark.version} + + + org.apache.spark + spark-graphx_2.11 + ${spark.version} + + + com.vesoft + nebula-spark-connector + ${nebula.version} + + + com.typesafe + config + ${config.version} + + + com.github.scopt + scopt_2.11 + ${scopt.version} + + + org.scalatest + scalatest_2.11 + ${scalatest.version} + test + + + junit + junit + ${junit.version} + test + + + + + + src/main/scala + + + + org.apache.maven.plugins + maven-deploy-plugin + 2.8.2 + + + default-deploy + deploy + + + + + org.scala-tools + maven-scala-plugin + 2.15.2 + + 2.11.12 + + -target:jvm-1.8 + + + -Xss4096K + + + + + scala-compile + + compile + + + + com/vesoft/tools/** + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + scala-test-compile + + testCompile + + + + com/vesoft/tools/** + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + true + + + + org.apache.maven.plugins + maven-source-plugin + 3.2.0 + + + attach-sources + + jar + + + + + + net.alchim31.maven + 
scala-maven-plugin + + + Scaladoc + + doc + + prepare-package + + + -nobootcp + -no-link-warnings + + + + + attach-javadocs + + doc-jar + + + + -nobootcp + -no-link-warnings + + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 3.2.0 + + com.facebook.thrift:com.facebook.thrift.* + + + + attach-javadocs + package + + jar + + + UTF-8 + UTF-8 + + -source 8 + -Xdoclint:none + + + + + + + + + org.apache.maven.plugins + maven-jar-plugin + 3.2.0 + + + + test-jar + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + ${compiler.source.version} + ${compiler.target.version} + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + package + + shade + + + false + + + org.apache.spark:* + org.apache.hadoop:* + org.apache.hive:* + log4j:log4j + org.apache.orc:* + xml-apis:xml-apis + javax.inject:javax.inject + org.spark-project.hive:hive-exec + stax:stax-api + org.glassfish.hk2.external:aopalliance-repackaged + + + + + *:* + + com/vesoft/tools/** + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + + + + + org.scalatest + scalatest-maven-plugin + 2.0.0 + + + test + + test + + + + + + + + + + snapshots + https://oss.sonatype.org/content/repositories/snapshots/ + + + diff --git a/nebula-algorithm/src/main/resources/application.conf b/nebula-algorithm/src/main/resources/application.conf new file mode 100644 index 0000000..4d2bdc2 --- /dev/null +++ b/nebula-algorithm/src/main/resources/application.conf @@ -0,0 +1,129 @@ +{ + # Spark relation config + spark: { + app: { + name: LPA + # spark.app.partitionNum + partitionNum:100 + } + master:local + } + + data: { + # data source. optional of nebula,csv,json + source: csv + # data sink, means the algorithm result will be write into this sink. optional of nebula,csv,text + sink: csv + # if your algorithm needs weight + hasWeight: false + } + + # Nebula Graph relation config + nebula: { + # algo's data source from Nebula. If data.source is nebula, then this nebula.read config can be valid. + read: { + # Nebula metad server address, multiple addresses are split by English comma + metaAddress: "127.0.0.1:9559" + # Nebula space + space: nb + # Nebula edge types, multiple labels means that data from multiple edges will union together + labels: ["serve"] + # Nebula edge property name for each edge type, this property will be as weight col for algorithm. + # Make sure the weightCols are corresponding to labels. + weightCols: ["start_year"] + } + + # algo result sink into Nebula. If data.sink is nebula, then this nebula.write config can be valid. + write:{ + # Nebula graphd server address, multiple addresses are split by English comma + graphAddress: "127.0.0.1:9669" + # Nebula metad server address, multiple addresses are split by English comma + metaAddress: "127.0.0.1:9559,127.0.0.1:9560" + user:root + pswd:nebula + # Nebula space name + space:nb + # Nebula tag name, the algorithm result will be write into this tag + tag:pagerank + } + } + + local: { + # algo's data source from Nebula. If data.source is csv or json, then this local.read can be valid. + read:{ + filePath: "file:///tmp/algo_edge.csv" + # srcId column + srcId:"_c0" + # dstId column + dstId:"_c1" + # weight column + #weight: "col3" + # if csv file has header + header: false + # csv file's delimiter + delimiter:"," + } + + # algo result sink into local file. If data.sink is csv or text, then this local.write can be valid. 
+ write:{ + resultPath:/tmp/count + } + } + + + algorithm: { + # the algorithm that you are going to execute,pick one from [pagerank, louvain, connectedcomponent, + # labelpropagation, shortestpaths, degreestatic, kcore, stronglyconnectedcomponent, trianglecount, + # betweenness, graphtriangleCount] + executeAlgo: graphtrianglecount + + # PageRank parameter + pagerank: { + maxIter: 10 + resetProb: 0.15 # default 0.15 + } + + # Louvain parameter + louvain: { + maxIter: 20 + internalIter: 10 + tol: 0.5 + } + + # connected component parameter. + connectedcomponent: { + maxIter: 20 + } + + # LabelPropagation parameter + labelpropagation: { + maxIter: 20 + } + + # ShortestPaths parameter + shortestpaths: { + # several vertices to compute the shortest path to all vertices. + landmarks: "1" + } + + # Vertex degree statistics parameter + degreestatic: {} + + # KCore parameter + kcore:{ + maxIter:10 + degree:1 + } + + # Trianglecount parameter + trianglecount:{} + + # graphTriangleCount parameter + graphtrianglecount:{} + + # Betweenness centrality parameter + betweenness:{ + maxIter:5 + } + } +} diff --git a/nebula-algorithm/src/main/resources/edge b/nebula-algorithm/src/main/resources/edge new file mode 100644 index 0000000..0681588 --- /dev/null +++ b/nebula-algorithm/src/main/resources/edge @@ -0,0 +1,10 @@ +{"src":12345,"dst":23456,"degree":34, "descr": "aaa","timep": "2020-01-01"} +{"src":11111,"dst":22222,"degree":33, "descr": "aaa","timep": "2020-01-01"} +{"src":11111,"dst":33333,"degree":32, "descr": "a\baa","timep": "2020-01-01"} +{"src":11111,"dst":44444,"degree":31, "descr": "aaa","timep": "2020-01-01"} +{"src":22222,"dst":55555,"degree":30, "descr": "a\naa","timep": "2020-01-01"} +{"src":33333,"dst":44444,"degree":29, "descr": "aaa","timep": "2020-01-01"} +{"src":33333,"dst":55555,"degree":28, "descr": "aa\ta","timep": "2020-01-01"} +{"src":44444,"dst":22222,"degree":27, "descr": "aaa","timep": "2020-01-01"} +{"src":44444,"dst":55555,"degree":26, "descr": "aaa","timep": "2020-01-01"} +{"src":22222,"dst":66666,"degree":25, "descr": "aaa","timep": "2020-01-01"} \ No newline at end of file diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala new file mode 100644 index 0000000..96cf9b3 --- /dev/null +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala @@ -0,0 +1,193 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
+ */ + +package com.vesoft.nebula.algorithm + +import com.vesoft.nebula.algorithm.config.Configs.Argument +import com.vesoft.nebula.algorithm.config.{ + AlgoConfig, + AlgoConstants, + BetweennessConfig, + CcConfig, + Configs, + KCoreConfig, + LPAConfig, + LouvainConfig, + PRConfig, + ShortestPathConfig, + SparkConfig +} +import com.vesoft.nebula.algorithm.lib.{ + BetweennessCentralityAlgo, + ConnectedComponentsAlgo, + DegreeStaticAlgo, + GraphTriangleCountAlgo, + KCoreAlgo, + LabelPropagationAlgo, + LouvainAlgo, + PageRankAlgo, + ShortestPathAlgo, + StronglyConnectedComponentsAlgo, + TriangleCountAlgo +} +import com.vesoft.nebula.algorithm.reader.{CsvReader, JsonReader, NebulaReader} +import com.vesoft.nebula.algorithm.writer.{CsvWriter, NebulaWriter, TextWriter} +import org.apache.commons.math3.ode.UnknownParameterException +import org.apache.log4j.Logger +import org.apache.spark.sql.types.{LongType, StructField, StructType} +import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} + +/** + * This object is the entry of all graph algorithms. + * + * How to use this tool to run algorithm: + * 1. Configure application.conf file. + * 2. Make sure your environment has installed spark and started spark service. + * 3. Submit nebula algorithm application using this command: + * spark-submit --class com.vesoft.nebula.tools.algorithm.Main /your-jar-path/nebula-algorithm-1.1.0.jar -p /your-application.conf-path/application.conf + */ +object Main { + + private val LOGGER = Logger.getLogger(this.getClass) + + def main(args: Array[String]): Unit = { + val PROGRAM_NAME = "Nebula graphx" + val options = Configs.parser(args, PROGRAM_NAME) + val p: Argument = options match { + case Some(config) => config + case _ => + LOGGER.error("Argument parse failed") + sys.exit(-1) + } + val configs = Configs.parse(p.config) + LOGGER.info(s"configs = ${configs}") + + val algoName: String = AlgoConfig.getAlgoName(configs) + LOGGER.info(s"algoName= ${algoName}") + + val sparkConfig = SparkConfig.getSpark(configs) + val partitionNum = sparkConfig.partitionNum + + // reader + val dataSet = createDataSource(sparkConfig.spark, configs, partitionNum) + + // algorithm + val algoResult = executeAlgorithm(sparkConfig.spark, algoName, configs, dataSet) + // writer + saveAlgoResult(algoResult, configs) + + sys.exit(0) + } + + /** + * create data from datasource + * + * @param spark + * @param configs + * @return DataFrame + */ + private[this] def createDataSource(spark: SparkSession, + configs: Configs, + partitionNum: String): DataFrame = { + val dataSource = configs.dataSourceSinkEntry.source + val dataSet: Dataset[Row] = dataSource.toLowerCase match { + case "nebula" => { + val reader = new NebulaReader(spark, configs, partitionNum) + reader.read() + } + case "csv" => { + val reader = new CsvReader(spark, configs, partitionNum) + reader.read() + } + case "json" => { + val reader = new JsonReader(spark, configs, partitionNum) + reader.read() + } + } + dataSet + } + + /** + * execute algorithms + * @param spark + * @param algoName + * @param configs + * @param dataSet + * @return DataFrame + */ + private[this] def executeAlgorithm(spark: SparkSession, + algoName: String, + configs: Configs, + dataSet: DataFrame): DataFrame = { + val hasWeight = configs.dataSourceSinkEntry.hasWeight + val algoResult = { + algoName.toLowerCase match { + case "pagerank" => { + val pageRankConfig = PRConfig.getPRConfig(configs) + PageRankAlgo(spark, dataSet, pageRankConfig, hasWeight) + } + case "louvain" => { + val louvainConfig = 
LouvainConfig.getLouvainConfig(configs) + LouvainAlgo(spark, dataSet, louvainConfig, hasWeight) + } + case "connectedcomponent" => { + val ccConfig = CcConfig.getCcConfig(configs) + ConnectedComponentsAlgo(spark, dataSet, ccConfig, hasWeight) + } + case "labelpropagation" => { + val lpaConfig = LPAConfig.getLPAConfig(configs) + LabelPropagationAlgo(spark, dataSet, lpaConfig, hasWeight) + } + case "shortestpaths" => { + val spConfig = ShortestPathConfig.getShortestPathConfig(configs) + ShortestPathAlgo(spark, dataSet, spConfig, hasWeight) + } + case "degreestatic" => { + DegreeStaticAlgo(spark, dataSet) + } + case "kcore" => { + val kCoreConfig = KCoreConfig.getKCoreConfig(configs) + KCoreAlgo(spark, dataSet, kCoreConfig) + } + case "stronglyconnectedcomponent" => { + val ccConfig = CcConfig.getCcConfig(configs) + StronglyConnectedComponentsAlgo(spark, dataSet, ccConfig, hasWeight) + } + case "betweenness" => { + val betweennessConfig = BetweennessConfig.getBetweennessConfig(configs) + BetweennessCentralityAlgo(spark, dataSet, betweennessConfig, hasWeight) + } + case "trianglecount" => { + TriangleCountAlgo(spark, dataSet) + } + case "graphtrianglecount" => { + GraphTriangleCountAlgo(spark, dataSet) + } + case _ => throw new UnknownParameterException("unknown executeAlgo name.") + } + } + algoResult + } + + private[this] def saveAlgoResult(algoResult: DataFrame, configs: Configs): Unit = { + val dataSink = configs.dataSourceSinkEntry.sink + dataSink.toLowerCase match { + case "nebula" => { + val writer = new NebulaWriter(algoResult, configs) + writer.write() + } + case "csv" => { + val writer = new CsvWriter(algoResult, configs) + writer.write() + } + case "text" => { + val writer = new TextWriter(algoResult, configs) + writer.write() + } + case _ => throw new UnsupportedOperationException("unsupported data sink") + } + } +} diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/AlgoConfig.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/AlgoConfig.scala new file mode 100644 index 0000000..3077214 --- /dev/null +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/AlgoConfig.scala @@ -0,0 +1,171 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
+ */ + +package com.vesoft.nebula.algorithm.config + +import org.apache.spark.graphx.VertexId + +case class PRConfig(maxIter: Int, resetProb: Double) + +/** + * pagerank algorithm configuration + */ +object PRConfig { + var maxIter: Int = _ + var resetProb: Double = _ + + def getPRConfig(configs: Configs): PRConfig = { + val prConfig = configs.algorithmConfig.map + + maxIter = prConfig("algorithm.pagerank.maxIter").toInt + resetProb = + if (prConfig.contains("algorithm.pagerank.resetProb")) + prConfig("algorithm.pagerank.resetProb").toDouble + else 0.15 + + PRConfig(maxIter, resetProb) + } +} + +case class LPAConfig(maxIter: Int) + +/** + * labelPropagation algorithm configuration + */ +object LPAConfig { + var maxIter: Int = _ + + def getLPAConfig(configs: Configs): LPAConfig = { + val lpaConfig = configs.algorithmConfig.map + + maxIter = lpaConfig("algorithm.labelpropagation.maxIter").toInt + LPAConfig(maxIter) + } +} + +case class CcConfig(maxIter: Int) + +/** + * ConnectedComponect algorithm configuration + */ +object CcConfig { + var maxIter: Int = _ + + def getCcConfig(configs: Configs): CcConfig = { + val ccConfig = configs.algorithmConfig.map + + maxIter = ccConfig("algorithm.connectedcomponent.maxIter").toInt + CcConfig(maxIter) + } +} + +case class ShortestPathConfig(landmarks: Seq[VertexId]) + +/** + * ConnectedComponect algorithm configuration + */ +object ShortestPathConfig { + var landmarks: Seq[Long] = _ + + def getShortestPathConfig(configs: Configs): ShortestPathConfig = { + val spConfig = configs.algorithmConfig.map + + landmarks = spConfig("algorithm.shortestpaths.landmarks").split(",").toSeq.map(_.toLong) + ShortestPathConfig(landmarks) + } +} + +case class LouvainConfig(maxIter: Int, internalIter: Int, tol: Double) + +/** + * louvain algorithm configuration + */ +object LouvainConfig { + var maxIter: Int = _ + var internalIter: Int = _ + var tol: Double = _ + + def getLouvainConfig(configs: Configs): LouvainConfig = { + val louvainConfig = configs.algorithmConfig.map + + maxIter = louvainConfig("algorithm.louvain.maxIter").toInt + internalIter = louvainConfig("algorithm.louvain.internalIter").toInt + tol = louvainConfig("algorithm.louvain.tol").toDouble + + LouvainConfig(maxIter, internalIter, tol) + } +} + +/** + * degree static + */ +case class DegreeStaticConfig(degree: Boolean, inDegree: Boolean, outDegree: Boolean) + +object DegreeStaticConfig { + var degree: Boolean = false + var inDegree: Boolean = false + var outDegree: Boolean = false + + def getDegreeStaticConfig(configs: Configs): DegreeStaticConfig = { + val degreeConfig = configs.algorithmConfig.map + degree = ConfigUtil.getOrElseBoolean(degreeConfig, "algorithm.degreestatic.degree", false) + inDegree = ConfigUtil.getOrElseBoolean(degreeConfig, "algorithm.degreestatic.indegree", false) + outDegree = ConfigUtil.getOrElseBoolean(degreeConfig, "algorithm.degreestatic.outdegree", false) + DegreeStaticConfig(degree, inDegree, outDegree) + } +} + +/** + * k-core + */ +case class KCoreConfig(maxIter: Int, degree: Int) + +object KCoreConfig { + var maxIter: Int = _ + var degree: Int = _ + + def getKCoreConfig(configs: Configs): KCoreConfig = { + val kCoreConfig = configs.algorithmConfig.map + maxIter = kCoreConfig("algorithm.kcore.maxIter").toInt + degree = kCoreConfig("algorithm.kcore.degree").toInt + KCoreConfig(maxIter, degree) + } +} + +/** + * Betweenness + */ +case class BetweennessConfig(maxIter: Int) + +object BetweennessConfig { + var maxIter: Int = _ + + def getBetweennessConfig(configs: Configs): 
BetweennessConfig = { + val betweennessConfig = configs.algorithmConfig.map + maxIter = betweennessConfig("algorithm.betweenness.maxIter").toInt + BetweennessConfig(maxIter) + } +} + +case class AlgoConfig(configs: Configs) + +object AlgoConfig { + def getAlgoName(configs: Configs): String = { + val algoConfig = configs.algorithmConfig.map + algoConfig("algorithm.executeAlgo") + } +} + +object ConfigUtil { + def getOrElseBoolean(config: Map[String, String], key: String, defaultValue: Boolean): Boolean = { + if (config.contains(key)) { + config(key).toBoolean + } else { + defaultValue + } + } + +} diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala new file mode 100644 index 0000000..addfd3d --- /dev/null +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala @@ -0,0 +1,365 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +package com.vesoft.nebula.algorithm.config + +import java.io.File +import java.nio.file.Files +import org.apache.log4j.Logger +import scala.collection.JavaConverters._ +import com.typesafe.config.{Config, ConfigFactory} +import scala.collection.mutable + +/** + * sparkConfig is used to submit spark application, such as graph algorithm + */ +object SparkConfigEntry { + def apply(config: Config): SparkConfigEntry = { + val map = mutable.Map[String, String]() + val sparkConfig = config.getObject("spark") + for (key <- sparkConfig.unwrapped().keySet().asScala) { + val sparkKey = s"spark.${key}" + if (config.getAnyRef(sparkKey).isInstanceOf[String]) { + val sparkValue = config.getString(sparkKey) + map += sparkKey -> sparkValue + } else { + for (subKey <- config.getObject(sparkKey).unwrapped().keySet().asScala) { + val key = s"${sparkKey}.${subKey}" + val sparkValue = config.getString(key) + map += key -> sparkValue + } + } + } + SparkConfigEntry(map.toMap) + } +} + +/** + * AlgorithmConfig is used to run graph algorithm + */ +object AlgorithmConfigEntry { + def apply(config: Config): AlgorithmConfigEntry = { + val map = mutable.Map[String, String]() + val algoConfig = config.getObject("algorithm") + for (key <- algoConfig.unwrapped().keySet().asScala) { + val algorithmKey = s"algorithm.${key}" + if (config.getAnyRef(algorithmKey).isInstanceOf[String]) { + val algorithmValue = config.getString(algorithmKey) + map += algorithmKey -> algorithmValue + } else { + for (subkey <- config.getObject(algorithmKey).unwrapped().keySet().asScala) { + val key = s"${algorithmKey}.${subkey}" + val value = config.getString(key) + map += key -> value + } + } + } + AlgorithmConfigEntry(map.toMap) + } +} + +/** DataSourceEntry is used to determine the data source , nebula or local */ +object DataSourceSinkEntry { + def apply(config: Config): DataSourceSinkEntry = { + val dataSource = config.getString("data.source") + val dataSink = config.getString("data.sink") + val hasWeight = if (config.hasPath("data.hasWeight")) { + config.getBoolean("data.hasWeight") + } else false + DataSourceSinkEntry(dataSource, dataSink, hasWeight) + } +} + +/** + * NebulaConfig is used to read edge data + */ +object NebulaConfigEntry { + def apply(config: Config): NebulaConfigEntry = { + if (!config.hasPath("nebula")) { + return NebulaConfigEntry(NebulaReadConfigEntry(), NebulaWriteConfigEntry()) + } + val 
nebulaConfig = config.getConfig("nebula") + + val readMetaAddress = nebulaConfig.getString("read.metaAddress") + val readSpace = nebulaConfig.getString("read.space") + val readLabels = nebulaConfig.getStringList("read.labels").asScala.toList + val readWeightCols = if (nebulaConfig.hasPath("read.weightCols")) { + nebulaConfig.getStringList("read.weightCols").asScala.toList + } else { + List() + } + val readConfigEntry = + NebulaReadConfigEntry(readMetaAddress, readSpace, readLabels, readWeightCols) + + val graphAddress = nebulaConfig.getString("write.graphAddress") + val writeMetaAddress = nebulaConfig.getString("write.metaAddress") + val user = nebulaConfig.getString("write.user") + val pswd = nebulaConfig.getString("write.pswd") + val writeSpace = nebulaConfig.getString("write.space") + val writeTag = nebulaConfig.getString("write.tag") + val writeConfigEntry = + NebulaWriteConfigEntry(graphAddress, writeMetaAddress, user, pswd, writeSpace, writeTag) + NebulaConfigEntry(readConfigEntry, writeConfigEntry) + } +} + +object LocalConfigEntry { + def apply(config: Config): LocalConfigEntry = { + + var filePath: String = "" + var src: String = "" + var dst: String = "" + var weight: String = null + var resultPath: String = null + var header: Boolean = false + var delimiter: String = "," + + if (config.hasPath("local.read.filePath")) { + filePath = config.getString("local.read.filePath") + src = config.getString("local.read.srcId") + dst = config.getString("local.read.dstId") + if (config.hasPath("local.read.weight")) { + weight = config.getString("local.read.weight") + } + if (config.hasPath("local.read.delimiter")) { + delimiter = config.getString("local.read.delimiter") + } + if (config.hasPath("local.read.header")) { + header = config.getBoolean("local.read.header") + } + } + if (config.hasPath("local.write.resultPath")) { + resultPath = config.getString("local.write.resultPath") + } + LocalConfigEntry(filePath, src, dst, weight, resultPath, header, delimiter) + } +} + +/** + * SparkConfigEntry support key-value pairs for spark session. + * + * @param map + */ +case class SparkConfigEntry(map: Map[String, String]) { + override def toString: String = { + map.toString() + } +} + +/** + * AlgorithmConfigEntry support key-value pairs for algorithms. 
+ * + * @param map + */ +case class AlgorithmConfigEntry(map: Map[String, String]) { + override def toString: String = { + map.toString() + } +} + +/** + * DataSourceEntry + */ +case class DataSourceSinkEntry(source: String, sink: String, hasWeight: Boolean) { + override def toString: String = { + s"DataSourceEntry: {source:$source, sink:$sink, hasWeight:$hasWeight}" + } +} + +case class LocalConfigEntry(filePath: String, + srcId: String, + dstId: String, + weight: String, + resultPath: String, + header: Boolean, + delimiter: String) { + override def toString: String = { + s"LocalConfigEntry: {filePath: $filePath, srcId: $srcId, dstId: $dstId, " + + s"weight:$weight, resultPath:$resultPath, delimiter:$delimiter}" + } +} + +/** + * NebulaConfigEntry + * @param readConfigEntry config for nebula-spark-connector reader + * @param writeConfigEntry config for nebula-spark-connector writer + */ +case class NebulaConfigEntry(readConfigEntry: NebulaReadConfigEntry, + writeConfigEntry: NebulaWriteConfigEntry) { + override def toString: String = { + s"NebulaConfigEntry:{${readConfigEntry.toString}, ${writeConfigEntry.toString}" + } +} + +case class NebulaReadConfigEntry(address: String = "", + space: String = "", + labels: List[String] = List(), + weightCols: List[String] = List()) { + override def toString: String = { + s"NebulaReadConfigEntry: " + + s"{address: $address, space: $space, labels: ${labels.mkString(",")}, " + + s"weightCols: ${weightCols.mkString(",")}}" + } +} + +case class NebulaWriteConfigEntry(graphAddress: String = "", + metaAddress: String = "", + user: String = "", + pswd: String = "", + space: String = "", + tag: String = "") { + override def toString: String = { + s"NebulaWriteConfigEntry: " + + s"{graphAddress: $graphAddress, user: $user, password: $pswd, space: $space, tag: $tag}" + } +} + +/** + * Configs + */ +case class Configs(sparkConfig: SparkConfigEntry, + dataSourceSinkEntry: DataSourceSinkEntry, + nebulaConfig: NebulaConfigEntry, + localConfigEntry: LocalConfigEntry, + algorithmConfig: AlgorithmConfigEntry) + +object Configs { + private[this] val LOG = Logger.getLogger(this.getClass) + + /** + * + * @param configPath + * @return + */ + def parse(configPath: File): Configs = { + if (!Files.exists(configPath.toPath)) { + throw new IllegalArgumentException(s"${configPath} not exist") + } + + val config = ConfigFactory.parseFile(configPath) + val dataSourceEntry = DataSourceSinkEntry(config) + val localConfigEntry = LocalConfigEntry(config) + val nebulaConfigEntry = NebulaConfigEntry(config) + val sparkEntry = SparkConfigEntry(config) + val algorithmEntry = AlgorithmConfigEntry(config) + + Configs(sparkEntry, dataSourceEntry, nebulaConfigEntry, localConfigEntry, algorithmEntry) + } + + /** + * Get the config list by the path. + * + * @param config The config. + * @param path The path of the config. + * + * @return + */ + private[this] def getConfigsOrNone(config: Config, + path: String): Option[java.util.List[_ <: Config]] = { + if (config.hasPath(path)) { + Some(config.getConfigList(path)) + } else { + None + } + } + + /** + * Get the config by the path. + * + * @param config + * @param path + * + * @return + */ + def getConfigOrNone(config: Config, path: String): Option[Config] = { + if (config.hasPath(path)) { + Some(config.getConfig(path)) + } else { + None + } + } + + /** + * Get the value from config by the path. If the path not exist, return the default value. + * + * @param config The config. + * @param path The path of the config. 
+ * @param defaultValue The default value for the path. + * + * @return + */ + private[this] def getOrElse[T](config: Config, path: String, defaultValue: T): T = { + if (config.hasPath(path)) { + config.getAnyRef(path).asInstanceOf[T] + } else { + defaultValue + } + } + + private[this] def getOptOrElse(config: Config, path: String): Option[String] = { + if (config.hasPath(path)) { + Some(config.getString(path)) + } else { + None + } + } + + /** + * Get the value from config by the path which is optional. + * If the path not exist, return the default value. + * + * @param config + * @param path + * @param defaultValue + * @tparam T + * @return + */ + private[this] def getOrElse[T](config: Option[Config], path: String, defaultValue: T): T = { + if (config.isDefined && config.get.hasPath(path)) { + config.get.getAnyRef(path).asInstanceOf[T] + } else { + defaultValue + } + } + + final case class Argument(config: File = new File("application.conf")) + + /** + * Use to parse command line arguments. + * + * @param args + * @param programName + * @return Argument + */ + def parser(args: Array[String], programName: String): Option[Argument] = { + val parser = new scopt.OptionParser[Argument](programName) { + head(programName, "1.0.0") + + opt[File]('p', "prop") + .required() + .valueName("") + .action((x, c) => c.copy(config = x)) + .text("config file") + } + parser.parse(args, Argument()) + } +} + +object AlgoConstants { + val ALGO_ID_COL: String = "_id" + val PAGERANK_RESULT_COL: String = "pagerank" + val LOUVAIN_RESULT_COL: String = "louvain" + val KCORE_RESULT_COL: String = "kcore" + val LPA_RESULT_COL: String = "lpa" + val CC_RESULT_COL: String = "cc" + val SCC_RESULT_COL: String = "scc" + val BETWEENNESS_RESULT_COL: String = "betweennedss" + val SHORTPATH_RESULT_COL: String = "shortestpath" + val DEGREE_RESULT_COL: String = "degree" + val INDEGREE_RESULT_COL: String = "inDegree" + val OUTDEGREE_RESULT_COL: String = "outDegree" + val TRIANGLECOUNT_RESULT_COL: String = "tranglecount" +} diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/NebulaConfig.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/NebulaConfig.scala new file mode 100644 index 0000000..8156d78 --- /dev/null +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/NebulaConfig.scala @@ -0,0 +1,20 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +package com.vesoft.nebula.algorithm.config + +object NebulaConfig { + + def getReadNebula(configs: Configs): NebulaReadConfigEntry = { + val nebulaConfigs = configs.nebulaConfig + nebulaConfigs.readConfigEntry + } + + def getWriteNebula(configs: Configs): NebulaWriteConfigEntry = { + val nebulaConfigs = configs.nebulaConfig + nebulaConfigs.writeConfigEntry + } +} diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/SparkConfig.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/SparkConfig.scala new file mode 100644 index 0000000..c073b84 --- /dev/null +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/SparkConfig.scala @@ -0,0 +1,31 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
+ */ + +package com.vesoft.nebula.algorithm.config + +import org.apache.spark.sql.SparkSession + +case class SparkConfig(spark: SparkSession, partitionNum: String) + +object SparkConfig { + + var spark: SparkSession = _ + + var partitionNum: String = _ + + def getSpark(configs: Configs, defaultAppName: String = "algorithm"): SparkConfig = { + val sparkConfigs = configs.sparkConfig.map + val session = SparkSession.builder + .appName(defaultAppName) + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + + for (key <- sparkConfigs.keySet) { + session.config(key, sparkConfigs(key)) + } + partitionNum = sparkConfigs.getOrElse("spark.app.partitionNum", "0") + SparkConfig(session.getOrCreate(), partitionNum) + } +} diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/BetweennessCentralityAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/BetweennessCentralityAlgo.scala new file mode 100644 index 0000000..e6284e1 --- /dev/null +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/BetweennessCentralityAlgo.scala @@ -0,0 +1,375 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +package com.vesoft.nebula.algorithm.lib + +import com.vesoft.nebula.algorithm.config.{AlgoConstants, BetweennessConfig} +import com.vesoft.nebula.algorithm.utils.NebulaUtil +import org.apache.log4j.Logger +import org.apache.spark.SparkContext +import org.apache.spark.graphx.{Edge, EdgeDirection, EdgeTriplet, Graph, Pregel, VertexId} +import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} +import org.apache.spark.sql.types.{DoubleType, LongType, StructField, StructType} + +import scala.collection.mutable +import scala.collection.mutable.ListBuffer + +object BetweennessCentralityAlgo { + private val LOGGER = Logger.getLogger(this.getClass) + + val ALGORITHM: String = "BetweennessCentrality" + + /** + * run the BetweennessCentrality algorithm for nebula graph + */ + def apply(spark: SparkSession, + dataset: Dataset[Row], + betweennessConfig: BetweennessConfig, + hasWeight: Boolean): DataFrame = { + + val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, false) + val betweennessGraph = execute(graph, betweennessConfig.maxIter, hasWeight) + + val schema = StructType( + List( + StructField(AlgoConstants.ALGO_ID_COL, LongType, nullable = false), + StructField(AlgoConstants.BETWEENNESS_RESULT_COL, DoubleType, nullable = true) + )) + val resultRDD = betweennessGraph.vertices.map(vertex => Row(vertex._1, vertex._2)) + val algoResult = spark.sqlContext.createDataFrame(resultRDD, schema) + algoResult + } + + /** + * execute betweenness centrality 算法 + */ + def execute(graph: Graph[None.type, Double], + k: Integer, + isWeighted: Boolean): Graph[Double, Double] = { + val initBCgraph = createBetweenGraph(graph, k) + //有权图计算方法 + if (isWeighted) { + val vertexBCGraph = initBCgraph.mapVertices((id, attr) => { + val a = betweennessCentralityForWeightedGraph(id, attr) + (id, a) + }) + val BCGraph = aggregateBetweennessScores(vertexBCGraph) + BCGraph + } + //无权图计算方法 + else { + val vertexBCGraph = initBCgraph.mapVertices((id, attr) => { + (id, betweennessCentralityForUnweightedGraph(id, attr)) + }) + val BCGraph = aggregateBetweennessScores(vertexBCGraph) + BCGraph + } + } + + /** + * betweennessCentralityForUnweightedGraph + * + * 对无权图计算顶点的介数中心性 + * 
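A minimal sketch of how SparkConfig.getSpark above is meant to be driven, assuming configs was produced by Configs.parse as shown earlier: every key under the spark block of the configuration (for example spark.app.name or spark.master) is applied verbatim to the SparkSession builder, while spark.app.partitionNum is also kept separately so readers can repartition the edge data.

import com.vesoft.nebula.algorithm.config.{Configs, SparkConfig}

def buildSession(configs: Configs): SparkConfig = {
  // All spark.* keys from the config map go straight onto the builder.
  val sparkConfig = SparkConfig.getSpark(configs, defaultAppName = "algorithm")
  println(s"partitionNum = ${sparkConfig.partitionNum}")
  sparkConfig
}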
对每个顶点vid计算从该节点出发时其他节点的介数中心性,返回对于顶点vid而言图中节点所计算出来的介数中心性 + * + * 对图中所有节点都会计算一次全局的介数中心性,最后进行汇总 + * + * @param vid:顶点id + * @param vAttr:顶点对应的属性信息 + * + *@return List((Vid, betweennessValue)) + * + * */ + def betweennessCentralityForUnweightedGraph(vid: VertexId, + vAttr: VertexProperty): List[(VertexId, Double)] = { + //无权图的计算方法 + println("enter betweennessCentrality for vertex: " + vid) + + //对图中每个顶点做如下操作 + val S = mutable.Stack[VertexId]() //每次访问过的节点入栈 + val P = new mutable.HashMap[VertexId, ListBuffer[VertexId]]() //存储源顶点到某个顶点中间经过哪些顶点 + //如[5,[2,3]],表示源顶点到顶点5的最短路径会经过顶点2,3 + val Q = mutable.Queue[VertexId]() //BFS遍历时将顶点入队列 + val dist = new mutable.HashMap[VertexId, Double]() + val sigma = new mutable.HashMap[VertexId, Double]() + val delta = new mutable.HashMap[VertexId, Double]() + val neighborMap = getNeighborMap(vAttr.vlist, vAttr.elist) + val medBC = new ListBuffer[(VertexId, Double)]() + + for (vertex <- vAttr.vlist) { + dist.put(vertex, -1) + sigma.put(vertex, 0.0) + delta.put(vertex, 0.0) + P.put(vertex, ListBuffer[VertexId]()) + } + //对于当前节点,有特殊对待 + dist(vid) = 0.0 + sigma(vid) = 1.0 + Q.enqueue(vid) + + while (Q.nonEmpty) { + val v = Q.dequeue() + S.push(v) + for (w <- neighborMap(v)) { + if (dist(w) < 0) { //节点w未被访问过 + Q.enqueue(w) + dist(w) = dist(v) + 1 + } + if (dist(w) == dist(v) + 1) { + sigma(w) += sigma(v) + P(w).+=(v) + } + } + } + + while (S.nonEmpty) { + val w = S.pop() + for (v <- P(w)) { + delta(v) += sigma(v) / sigma(w) * (1 + delta(w)) + } + if (w != vid) + medBC.append((w, delta(w) / 2)) //一条边会被两个节点各自计算一次,所以需要对求出来的值除以2 + } + medBC.toList + } + + /** + * betweennessCentralityForWeightedGraph + * + * 有权图求介数中心性 + * + * 与无权图求介数中心性的区别在于“存储邻居节点信息”的数据结构不同 + * 有权图不是用队列,而是采用scala的优先级队列PriorityQueue,即最小堆 + * 利用最小堆维护顶点的邻居节点以及与邻居节点的边权重元组:(vw_dist, v, w)其中v是w的前驱节点,即w是v的往深处走的邻居节点 + * 当遍历完顶点v的所有邻居节点后,需要从中选择一个最近的邻居继续进行深度遍历,所以让最小堆根据wv_dist降序排列, + * 每次pop出来的即是最小边对应的顶点信息,也就是每次选出来的邻居节点都是距离最近的邻居。 + * + * @param vid:顶点id + * @param vAttr:顶点对应的属性信息 + * + *@return List((Vid, betweennessValue)) + * */ + def betweennessCentralityForWeightedGraph(vid: VertexId, + vAttr: VertexProperty): List[(VertexId, Double)] = { + println("enter betweennessCentralityForWeightedGraph function") + //对图中每个顶点做如下操作 + val S = mutable.Stack[VertexId]() //每次访问过的节点入栈 + val P = new mutable.HashMap[VertexId, ListBuffer[VertexId]]() //存储源顶点到某个顶点中间经过哪些顶点 + //如[5,[2,3]],表示源顶点到顶点5的最短路径会经过顶点2,3 + + //下面定义一个优先级队列,即最小堆,根据第一个元素进行倒排 + val ord = Ordering.by[(Double, VertexId, VertexId), Double](_._1).reverse + val Q = new mutable.PriorityQueue[(Double, VertexId, VertexId)]()(ord) //遍历时将顶点及对应的路径长度入队列 + + val dist = new mutable.HashMap[VertexId, Double]() + val sigma = new mutable.HashMap[VertexId, Double]() + val delta = new mutable.HashMap[VertexId, Double]() + val neighborMap = getNeighborMap(vAttr.vlist, vAttr.elist) + val medBC = new ListBuffer[(VertexId, Double)]() + + for (vertex <- vAttr.vlist) { + dist.put(vertex, -1) + sigma.put(vertex, 0.0) + delta.put(vertex, 0.0) + P.put(vertex, ListBuffer[VertexId]()) + } + //对于当前节点,有特殊对待 + sigma(vid) = 1.0 + val seen = new mutable.HashMap[VertexId, Double]() + seen(vid) = 0 + Q.enqueue((0.0, vid, vid)) + + //获取两个相邻节点之间的距离 + def getDist(v: VertexId, w: VertexId) = { + vAttr.elist + .filter(e => (e._1 == v && e._2 == w) || (e._2 == v && e._1 == w)) + .map(x => x._3) + .reduce(_.min(_)) + } + + while (Q.nonEmpty) { + val (d, pred, v) = Q.dequeue() + if (dist(v) > 0) { //节点v已经访问过了 + null + } else { + sigma(v) += sigma(pred) + S.push(v) + dist(v) = d + for (w <- neighborMap(v)) { + val 
vw_dist = d + getDist(v, w) + if (dist(w) < 0 && (!seen.contains(w) || vw_dist < seen(w))) { + seen(w) = vw_dist + Q.enqueue((vw_dist, v, w)) + sigma(w) = 0.0 + P(w) = ListBuffer[VertexId](v) + } else if (vw_dist == seen(w)) { + sigma(w) += sigma(v) + P(w).+=(v) + } + } + } + } + + while (S.nonEmpty) { + val w = S.pop() + for (v <- P(w)) { + delta(v) += sigma(v) / sigma(w) * (1 + delta(w)) + } + if (w != vid) + medBC.append((w, delta(w) / 2)) + } + medBC.toList + } + + /** + * 为每个顶点收集其邻居节点信息 + * 尝试过用收集邻居节点的api,但在计算介数中心性内部又需要每个节点都维护所有节点信息和边信息,所以采用对每个节点根据边来计算邻居节点的方式 + * + * @param vlist, elist 所有顶点信息和所有边信息 + * @return [vid, [ 邻居id, 邻居id ...] ] + * */ + def getNeighborMap( + vlist: List[VertexId], + elist: List[(VertexId, VertexId, Double)]): mutable.HashMap[VertexId, List[VertexId]] = { + val neighborList = new mutable.HashMap[VertexId, List[VertexId]]() + vlist.map(v => { + val nlist = (elist + .filter(e => (e._1 == v || e._2 == v))) + .map(e => { + if (v == e._1) e._2 + else e._1 + }) + neighborList.+=((v, nlist.distinct)) + }) + neighborList + } + + /** + * 根据原始数据构建初始图 + * + * @param sc + * @param path 原始数据所在的hdfs路径 + * @param separator 数据不同字段间的分隔符 + * @param weightCol 用于标识哪一列作为权重列,给出列号,从0开始。 + * 对权重列的限制: 权重列为-1时,表示没有权重列,即该图是无权图,默认设权重为1.0 + * 为非-1的整数时,表示图数据中的第weightCol列为权重列 + * 要求:其值小于数据中列的数目,且该权重列对应的数据必须是double数值 + * + * @return Graph + * */ + def loadInitGraph(sc: SparkContext, + path: String, + separator: String, + weightCol: Int): Graph[None.type, Double] = { + val data = sc.textFile(path) + val edges = data.map(line => { + val items = line.split(separator) + require(items.length > weightCol, + "权重列超过了图数据的字段数,图数据字段数目为 " + items.length + ", 选择的权重列为 " + weightCol) + var weightValue = 0.0 + if (weightCol == -1) { + weightValue = 1.0 + } else { + require(isNumic(items(weightCol)), "权重列必须为double数值") + weightValue = items(weightCol).toDouble + } + Edge(items(0).toLong, items(1).toLong, weightValue) + }) + Graph.fromEdges(edges, None) + } + + /** + * 工具方法,验证权重列中的值可以转为double + * */ + def isNumic(str: String): Boolean = { + var result = true + for (s <- str.replaceAll(".", "")) { + if (!s.isDigit) + result = false + } + result + } + + /** + * 构建BetweennessCentrality图,图中顶点属性维护了图中所有顶点id的列表和所有边(srcId, dstId, attr)的列表 + * + * @param initG 原始数据构造的图 + * @param k 最大迭代次数 + * @return Graph + * */ + def createBetweenGraph(initG: Graph[None.type, Double], k: Int): Graph[VertexProperty, Double] = { + val betweenG = initG + .mapTriplets[Double]({ x: EdgeTriplet[None.type, Double] => + x.attr + }) + .mapVertices((id, attr) => new VertexProperty) + .cache + //准备进入pregel前的初始化消息、vertexProgram方法、 sendMessage方法、mergeMessage方法 + val initMessage = (List[VertexId](), List[(VertexId, VertexId, Double)]()) + //将发送过来的邻居节点信息以及当前点与邻居点的边,更新到当前点的属性中 + def vertexProgram( + id: VertexId, + attr: VertexProperty, + msgSum: (List[VertexId], List[(VertexId, VertexId, Double)])): VertexProperty = { + val newAttr = new VertexProperty() + newAttr.CB = attr.CB + newAttr.vlist = (msgSum._1 ++ attr.vlist).distinct + newAttr.elist = (msgSum._2 ++ attr.elist).distinct + newAttr + } + //向邻居节点发送自身节点的id和自身与邻居点的边 + def sendMessage(edge: EdgeTriplet[VertexProperty, Double]) + : Iterator[(VertexId, (List[VertexId], List[(VertexId, VertexId, Double)]))] = Iterator( + (edge.dstId, + (edge.srcId +: edge.srcAttr.vlist, + (edge.srcId, edge.dstId, edge.attr) +: edge.srcAttr.elist)), + (edge.srcId, + (edge.dstId +: edge.dstAttr.vlist, + (edge.srcId, edge.dstId, edge.attr) +: edge.dstAttr.elist)) + ) + //合并接受到的多条消息 + def mergeMessage(a: 
(List[VertexId], List[(VertexId, VertexId, Double)]), + b: (List[VertexId], List[(VertexId, VertexId, Double)])) + : (List[VertexId], List[(VertexId, VertexId, Double)]) = { + ((a._1 ++ b._1).distinct, (a._2 ++ b._2).distinct) + } + + Pregel(betweenG, initMessage, k, EdgeDirection.Either)(vertexProgram, sendMessage, mergeMessage) + } + + /** + * 将每个节点分别计算出来的BC值进行统计 + * */ + def aggregateBetweennessScores( + BCgraph: Graph[(VertexId, List[(VertexId, Double)]), Double]): Graph[Double, Double] = { + //将图中顶点属性所维护的listBC信息单独提取出来 + val BCaggregate = BCgraph.vertices.flatMap { + case (v, (id, listBC)) => { + listBC.map { case (w, bc) => (w, bc) } + } + } + //对BCaggregate的信息 (w, bc)根据顶点id做汇总 + val vertexBC = BCaggregate.reduceByKey(_ + _) + val resultG = BCgraph.outerJoinVertices(vertexBC)((vid, oldAttr, vBC) => { + vBC.getOrElse(0.0) + }) + resultG + } +} + +/** + * 定义顶点的属性类 + * CB: 定义初始的betweennessCentrality + * vlist: 每个顶点需维护图中所有的顶点信息 + * elist: 每个顶点需维护图中所有的边信息 + * + * 维护所有边信息是为了在计算介数中心性的时候可以从每个顶点依次根据邻居节点走下去(空间复杂度很高,O(n2)) + * */ +class VertexProperty() extends Serializable { + var CB = 0.0 + var vlist = List[VertexId]() + var elist = List[(VertexId, VertexId, Double)]() +} diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ConnectedComponentsAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ConnectedComponentsAlgo.scala new file mode 100644 index 0000000..66f4e6a --- /dev/null +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ConnectedComponentsAlgo.scala @@ -0,0 +1,59 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +package com.vesoft.nebula.algorithm.lib + +import com.vesoft.nebula.algorithm.utils.NebulaUtil +import com.vesoft.nebula.algorithm.config.{ + AlgoConstants, + CcConfig, + Configs, + LPAConfig, + NebulaConfig, + PRConfig, + SparkConfig +} +import org.apache.log4j.Logger +import org.apache.spark.graphx.{Graph, VertexId, VertexRDD} +import org.apache.spark.rdd.RDD +import com.vesoft.nebula.algorithm.utils.NebulaUtil +import org.apache.spark.graphx.lib.ConnectedComponents +import org.apache.spark.sql.types.{DoubleType, LongType, StructField, StructType} +import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} + +object ConnectedComponentsAlgo { + private val LOGGER = Logger.getLogger(this.getClass) + + val ALGORITHM: String = "ConnectedComponents" + + /** + * run the ConnectedComponents algorithm for nebula graph + */ + def apply(spark: SparkSession, + dataset: Dataset[Row], + ccConfig: CcConfig, + hasWeight: Boolean): DataFrame = { + + val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, hasWeight) + + val ccResultRDD = execute(graph, ccConfig.maxIter) + + val schema = StructType( + List( + StructField(AlgoConstants.ALGO_ID_COL, LongType, nullable = false), + StructField(AlgoConstants.CC_RESULT_COL, LongType, nullable = true) + )) + val algoResult = spark.sqlContext + .createDataFrame(ccResultRDD, schema) + + algoResult + } + + def execute(graph: Graph[None.type, Double], maxIter: Int): RDD[Row] = { + val ccResultRDD: VertexRDD[VertexId] = ConnectedComponents.run(graph, maxIter).vertices + ccResultRDD.map(row => Row(row._1, row._2)) + } +} diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/DegreeStaticAlgo.scala 
b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/DegreeStaticAlgo.scala new file mode 100644 index 0000000..1001f3c --- /dev/null +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/DegreeStaticAlgo.scala @@ -0,0 +1,58 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +package com.vesoft.nebula.algorithm.lib + +import com.vesoft.nebula.algorithm.config.AlgoConstants +import com.vesoft.nebula.algorithm.utils.NebulaUtil +import org.apache.log4j.Logger +import org.apache.spark.graphx.{Graph, VertexRDD} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} +import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType} + +object DegreeStaticAlgo { + + private val LOGGER = Logger.getLogger(this.getClass) + + val ALGORITHM: String = "DegreeStatic" + + /** + * run the pagerank algorithm for nebula graph + */ + def apply(spark: SparkSession, dataset: Dataset[Row]): DataFrame = { + + val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, false) + + val degreeResultRDD = execute(graph) + + val schema = StructType( + List( + StructField(AlgoConstants.ALGO_ID_COL, LongType, nullable = false), + StructField(AlgoConstants.DEGREE_RESULT_COL, IntegerType, nullable = true), + StructField(AlgoConstants.INDEGREE_RESULT_COL, IntegerType, nullable = true), + StructField(AlgoConstants.OUTDEGREE_RESULT_COL, IntegerType, nullable = true) + )) + val algoResult = spark.sqlContext + .createDataFrame(degreeResultRDD, schema) + + algoResult + } + + def execute(graph: Graph[None.type, Double]): RDD[Row] = { + val degreeRdd: VertexRDD[Int] = graph.degrees + val inDegreeRdd: VertexRDD[Int] = graph.inDegrees + val outDegreeRdd: VertexRDD[Int] = graph.outDegrees + + val degreeAndInDegree: VertexRDD[(Int, Int)] = + degreeRdd.leftJoin(inDegreeRdd)((id, d, inD) => (d, inD.getOrElse(0))) + + val result = degreeAndInDegree.leftJoin(outDegreeRdd)((id, dAndInD, opt) => + (dAndInD._1, dAndInD._2, opt.getOrElse(0))) + result.map(vertex => Row(vertex._1, vertex._2._1, vertex._2._2, vertex._2._3)) + } + +} diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/GraphTriangleCountAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/GraphTriangleCountAlgo.scala new file mode 100644 index 0000000..7d3585f --- /dev/null +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/GraphTriangleCountAlgo.scala @@ -0,0 +1,38 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
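Illustrative usage (a sketch, mirroring the unit test that appears later in this diff): running DegreeStaticAlgo on a small edge DataFrame yields one row per vertex with its total, in- and out-degree.

import org.apache.spark.sql.SparkSession
import com.vesoft.nebula.algorithm.lib.DegreeStaticAlgo

val spark = SparkSession.builder().master("local").getOrCreate()
// Only the first two columns are used; DegreeStaticAlgo ignores edge weights.
val edges = spark.read.option("header", true).csv("src/test/resources/edge.csv")
val degrees = DegreeStaticAlgo.apply(spark, edges)
degrees.show() // columns: _id, degree, inDegree, outDegree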
+ */ + +package com.vesoft.nebula.algorithm.lib + +import com.vesoft.nebula.algorithm.config.AlgoConstants +import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} +import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType} + +/** + * compute all graph's triangle count + */ +object GraphTriangleCountAlgo { + + def apply(spark: SparkSession, dataset: Dataset[Row]): DataFrame = { + + val triangleCount = TriangleCountAlgo(spark, dataset) + val count = triangleCount + .select(AlgoConstants.TRIANGLECOUNT_RESULT_COL) + .rdd + .map(value => value.get(0).asInstanceOf[Int]) + .reduce(_ + _) / 3 + val list = List(count) + val rdd = spark.sparkContext.parallelize(list).map(row => Row(row)) + + val schema = StructType( + List( + StructField("count", IntegerType, nullable = false) + )) + val algoResult = spark.sqlContext + .createDataFrame(rdd, schema) + + algoResult + } +} diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/KCoreAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/KCoreAlgo.scala new file mode 100644 index 0000000..421b412 --- /dev/null +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/KCoreAlgo.scala @@ -0,0 +1,74 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +package com.vesoft.nebula.algorithm.lib + +import com.vesoft.nebula.algorithm.config.{AlgoConstants, KCoreConfig} +import org.apache.log4j.Logger +import org.apache.spark.graphx.Graph +import com.vesoft.nebula.algorithm.utils.NebulaUtil +import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType} +import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} + +object KCoreAlgo { + private val LOGGER = Logger.getLogger(this.getClass) + + val ALGORITHM: String = "LabelPropagation" + + /** + * run the louvain algorithm for nebula graph + */ + def apply(spark: SparkSession, dataset: Dataset[Row], kCoreConfig: KCoreConfig): DataFrame = { + + val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, false) + val kCoreGraph = execute(graph, kCoreConfig.maxIter, kCoreConfig.degree) + + val schema = StructType( + List( + StructField(AlgoConstants.ALGO_ID_COL, LongType, nullable = false), + StructField(AlgoConstants.KCORE_RESULT_COL, IntegerType, nullable = true) + )) + val resultRDD = kCoreGraph.vertices.map(vertex => Row(vertex._1, vertex._2)) + val algoResult = spark.sqlContext.createDataFrame(resultRDD, schema) + algoResult + } + + /** + * extract k-core sub-graph + */ + def execute(graph: Graph[None.type, Double], maxIter: Int, k: Int): Graph[Int, Double] = { + var lastVertexNum: Long = graph.numVertices + var currentVertexNum: Long = -1 + var isStable: Boolean = false + var iterNum: Int = 1 + + var degreeGraph = graph + .outerJoinVertices(graph.degrees) { (vid, vd, degree) => + degree.getOrElse(0) + } + .cache + var subGraph: Graph[Int, Double] = null + + while (iterNum < maxIter) { + subGraph = degreeGraph.subgraph(vpred = (vid, degree) => degree >= k) + degreeGraph = subGraph + .outerJoinVertices(subGraph.degrees) { (vid, vd, degree) => + degree.getOrElse(0) + } + .cache + + currentVertexNum = degreeGraph.numVertices + if (currentVertexNum == lastVertexNum) { + isStable = true; + } else { + lastVertexNum = currentVertexNum + } + + iterNum += 1 + } + subGraph + } +} diff --git 
a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/LabelPropagationAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/LabelPropagationAlgo.scala new file mode 100644 index 0000000..3c78fca --- /dev/null +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/LabelPropagationAlgo.scala @@ -0,0 +1,50 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +package com.vesoft.nebula.algorithm.lib + +import com.vesoft.nebula.algorithm.utils.NebulaUtil +import com.vesoft.nebula.algorithm.config.{AlgoConstants, LPAConfig} +import org.apache.log4j.Logger +import org.apache.spark.graphx.{Graph, VertexId, VertexRDD} +import org.apache.spark.rdd.RDD +import com.vesoft.nebula.algorithm.utils.NebulaUtil +import org.apache.spark.graphx.lib.LabelPropagation +import org.apache.spark.sql.types.{LongType, StructField, StructType} +import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} + +object LabelPropagationAlgo { + private val LOGGER = Logger.getLogger(this.getClass) + + val ALGORITHM: String = "LabelPropagation" + + /** + * run the LabelPropagation algorithm for nebula graph + */ + def apply(spark: SparkSession, + dataset: Dataset[Row], + lpaConfig: LPAConfig, + hasWeight: Boolean): DataFrame = { + val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, hasWeight) + + val lpaResultRDD = execute(graph, lpaConfig.maxIter) + + val schema = StructType( + List( + StructField(AlgoConstants.ALGO_ID_COL, LongType, nullable = false), + StructField(AlgoConstants.LPA_RESULT_COL, LongType, nullable = true) + )) + val algoResult = spark.sqlContext + .createDataFrame(lpaResultRDD, schema) + + algoResult + } + + def execute(graph: Graph[None.type, Double], maxIter: Int): RDD[Row] = { + val lpaResultRDD: VertexRDD[VertexId] = LabelPropagation.run(graph, maxIter).vertices + lpaResultRDD.map(row => Row(row._1, row._2)) + } +} diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/LouvainAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/LouvainAlgo.scala new file mode 100644 index 0000000..99b17de --- /dev/null +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/LouvainAlgo.scala @@ -0,0 +1,379 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
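Illustrative usage sketch for the label propagation wrapper above; maxIter = 5 mirrors the value used by the unit test later in this diff, and hasWeight = false builds the graph with the default edge weight of 1.0.

import org.apache.spark.sql.SparkSession
import com.vesoft.nebula.algorithm.config.LPAConfig
import com.vesoft.nebula.algorithm.lib.LabelPropagationAlgo

val spark = SparkSession.builder().master("local").getOrCreate()
val edges = spark.read.option("header", true).csv("src/test/resources/edge.csv")
val lpaResult = LabelPropagationAlgo.apply(spark, edges, new LPAConfig(5), hasWeight = false)
lpaResult.show() // columns: _id, lpa (the label each vertex converged to)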
+ */ + +package com.vesoft.nebula.algorithm.lib + +import com.vesoft.nebula.algorithm.utils.NebulaUtil +import com.vesoft.nebula.algorithm.config.{ + AlgoConstants, + Configs, + LouvainConfig, + NebulaConfig, + SparkConfig +} +import com.vesoft.nebula.algorithm.utils.NebulaUtil +import org.apache.log4j.Logger +import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} +import org.apache.spark.sql.types.{LongType, StructField, StructType} + +import scala.collection.mutable +import scala.collection.mutable.{HashMap, HashSet} + +object LouvainAlgo { + private val LOGGER = Logger.getLogger(this.getClass) + + val ALGORITHM: String = "Louvain" + + /** + * run the louvain algorithm for nebula graph + */ + def apply(spark: SparkSession, + dataset: Dataset[Row], + louvainConfig: LouvainConfig, + hasWeight: Boolean): DataFrame = { + + val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, hasWeight) + + val louvainResultRDD: RDD[Row] = + execute(spark, graph, louvainConfig.maxIter, louvainConfig.internalIter, louvainConfig.tol) + + val schema = StructType( + List( + StructField(AlgoConstants.ALGO_ID_COL, LongType, nullable = false), + StructField(AlgoConstants.LOUVAIN_RESULT_COL, LongType, nullable = false) + )) + + val louvainResult = spark.sqlContext + .createDataFrame(louvainResultRDD, schema) + louvainResult + } + + def execute(spark: SparkSession, + graph: Graph[None.type, Double], + maxIter: Int, + internalIter: Int, + tol: Double): RDD[Row] = { + + val sc = spark.sparkContext + + // convert origin graph to Louvain Graph, Louvain Graph records vertex's community、innerVertices and innerDegrees + var louvainG: Graph[VertexData, Double] = LouvainGraphUtil.createLouvainGraph(graph) + + // compute and broadcast the sum of all edges + val m = sc.broadcast(louvainG.edges.map(e => e.attr).sum()) + var curIter = 0 + var res = step1(internalIter, louvainG, m.value, tol) + while (res._2 != 0 && curIter < maxIter) { + louvainG = res._1 + louvainG = step2(louvainG) + res = step1(internalIter, louvainG, m.value, tol) + curIter += 1 + } + CommUtil.getCommunities(louvainG) + } + + /** + * Louvain step1:Traverse the vertices and get the new community information of each node遍历节点,获取每个节点对应的所属新社区信息 + * + * 1. Calculate the information of the community that each vertex currently belongs to. + * 2. Calculate the community modularity deference and get the community info which has max modularity deference. + * 3. Count the number of vertices that have changed in the community, used to determine whether the internal iteration can stop. + * 4. Update vertices' community id and update each community's innerVertices. + * + * This step update vertexData's cid and commVertex. 
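A driver-side sketch for the Louvain wrapper above. The constructor order of LouvainConfig (maxIter, internalIter, tol) is an assumption here; the three values match the sample application.conf later in this diff, where maxIter bounds the outer merge loop, internalIter bounds step1's vertex-moving loop, and tol is the minimum modularity gain required to move a vertex.

import org.apache.spark.sql.SparkSession
import com.vesoft.nebula.algorithm.config.LouvainConfig
import com.vesoft.nebula.algorithm.lib.LouvainAlgo

val spark = SparkSession.builder().master("local").getOrCreate()
val edges = spark.read.option("header", true).csv("src/test/resources/edge.csv")
// Assumed argument order: (maxIter, internalIter, tol).
val louvainConfig = new LouvainConfig(20, 10, 0.5)
val communities = LouvainAlgo.apply(spark, edges, louvainConfig, hasWeight = false)
communities.show() // columns: _id, louvain (community id)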
+ * + * @param maxIter max interation + * @param louvainG + * @param m The sum of the weights of all edges in the graph + * + * @return (Graph[VertexData,Double],Int) + */ + def step1( + maxIter: Int, + louvainG: Graph[VertexData, Double], + m: Double, + tol: Double + ): (Graph[VertexData, Double], Int) = { + LOGGER.info("============================== step 1 =======================") + var G = louvainG + var iterTime = 0 + var canStop = false + while (iterTime < maxIter && !canStop) { + val neighborComm = getNeighCommInfo(G) + val changeInfo = getChangeInfo(G, neighborComm, m, tol) + // Count the number of vertices that have changed in the community + val changeCount = + G.vertices.zip(changeInfo).filter(x => x._1._2.cId != x._2._2).count() + if (changeCount == 0) + canStop = true + // use connectedComponents algorithm to solve the problem of community attribution delay. + else { + val newChangeInfo = Graph + .fromEdgeTuples(changeInfo.map(x => (x._1, x._2)), 0) + .connectedComponents() + .vertices + G = LouvainGraphUtil.updateGraph(G, newChangeInfo) + iterTime += 1 + } + } + (G, iterTime) + } + + /** + * Louvain step 2:Combine the new graph node obtained in the first step into a super node according to + * the community information to which it belongs. + * + * @param G graph + * @return graph + */ + def step2(G: Graph[VertexData, Double]): Graph[VertexData, Double] = { + LOGGER.info("============================== step 2 =======================") + //get edges between different communities + val edges = G.triplets + .filter(trip => trip.srcAttr.cId != trip.dstAttr.cId) + .map(trip => { + val cid1 = trip.srcAttr.cId + val cid2 = trip.dstAttr.cId + val weight = trip.attr + ((math.min(cid1, cid2), math.max(cid1, cid2)), weight) + }) + .reduceByKey(_ + _) + .map(x => Edge(x._1._1, x._1._2, x._2)) //sum the edge weights between communities + + // sum kin of all vertices within the same community + val vInnerKin = G.vertices + .map(v => (v._2.cId, (v._2.innerVertices.toSet, v._2.innerDegree))) + .reduceByKey((x, y) => { + val vertices = (x._1 ++ y._1).toSet + val kIn = x._2 + y._2 + (vertices, kIn) + }) + + // get all edge weights within the same community + val v2vKin = G.triplets + .filter(trip => trip.srcAttr.cId == trip.dstAttr.cId) + .map(trip => { + val cid = trip.srcAttr.cId + val vertices1 = trip.srcAttr.innerVertices + val vertices2 = trip.dstAttr.innerVertices + val weight = trip.attr * 2 + (cid, (vertices1.union(vertices2).toSet, weight)) + }) + .reduceByKey((x, y) => { + val vertices = new HashSet[VertexId].toSet + val kIn = x._2 + y._2 + (vertices, kIn) + }) + + // new super vertex + val superVertexInfo = vInnerKin + .union(v2vKin) + .reduceByKey((x, y) => { + val vertices = x._1 ++ y._1 + val kIn = x._2 + y._2 + (vertices, kIn) + }) + + // reconstruct graph based on new edge info + val initG = Graph.fromEdges(edges, None) + var louvainGraph = LouvainGraphUtil.createLouvainGraph(initG) + // get new louvain graph + louvainGraph = louvainGraph.outerJoinVertices(superVertexInfo)((vid, data, opt) => { + var innerVerteices = new HashSet[VertexId]() + val kIn = opt.get._2 + for (vid <- opt.get._1) + innerVerteices += vid + data.innerVertices = innerVerteices + data.innerDegree = kIn + data + }) + louvainGraph + } + + /** + * get new community's basic info after the vertex joins the community + * 1. get each vertex's community id and the community's tot. + * 2. compute each vertex's k_in. 
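To make the modularity gain used by getChangeInfo below concrete, here is the arithmetic for a single candidate move with toy numbers; the variables correspond to k_v_in, tot, k_v and the broadcast edge-weight sum m that the code manipulates.

// Toy values only: a vertex with weighted degree kV = 3.0, of which kVIn = 2.0 falls
// inside a candidate community whose tot is 5.0, in a graph whose weights sum to m = 20.0.
val (kVIn, tot, kV, m) = (2.0, 5.0, 3.0, 20.0)
val deltaQ = kVIn - tot * kV / m // 2.0 - 0.75 = 1.25, above a typical tol of 0.5, so the vertex would move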
(The sum of the edge weights between vertex and vertex i in the community) + * + * @param G + */ + def getNeighCommInfo( + G: Graph[VertexData, Double] + ): RDD[(VertexId, Iterable[(Long, Double, Double)])] = { + + val commKIn = G.triplets + .flatMap(trip => { + Array( + ( + trip.srcAttr.cId, + ( + (trip.dstId -> trip.attr), + (trip.srcId, trip.srcAttr.innerDegree + trip.srcAttr.degree) + ) + ), + ( + trip.dstAttr.cId, + ( + (trip.srcId -> trip.attr), + (trip.dstId, trip.dstAttr.innerDegree + trip.dstAttr.degree) + ) + ) + ) + }) + .groupByKey() + .map(t => { + val cid = t._1 + // add the weight of the same vid in one community. + val m = new HashMap[VertexId, Double]() // store community's vertexId and vertex kin + val degrees = new HashSet[VertexId]() // record if all vertices has computed the tot + var tot = 0.0 + for (x <- t._2) { + if (m.contains(x._1._1)) + m(x._1._1) += x._1._2 + else + m(x._1._1) = x._1._2 + // compute vertex's tot + if (!degrees.contains(x._2._1)) { + tot += x._2._2 + degrees += x._2._1 + } + } + (cid, (tot, m)) + }) + + // convert commKIn + val neighCommInfo = commKIn + .flatMap(x => { + val cid = x._1 + val tot = x._2._1 + x._2._2.map(t => { + val vid = t._1 + val kIn = t._2 + (vid, (cid, kIn, tot)) + }) + }) + .groupByKey() + + neighCommInfo + } + + /** + * Calculate the influence of each vertex on the modularity change of neighbor communities, and find the most suitable community for the vertex + * △Q = [Kin - Σtot * Ki / m] + * + * @param G graph + * @param neighCommInfo neighbor community info + * @param m broadcast value + * @param tol threshold for modularity deference + * + * @return RDD + */ + def getChangeInfo(G: Graph[VertexData, Double], + neighCommInfo: RDD[(VertexId, Iterable[(Long, Double, Double)])], + m: Double, + tol: Double): RDD[(VertexId, Long, Double)] = { + val changeInfo = G.vertices + .join(neighCommInfo) + .map(x => { + val vid = x._1 + val data = x._2._1 + val commIter = x._2._2 + val vCid = data.cId + val k_v = data.degree + data.innerDegree + + val dertaQs = commIter.map(t => { + val nCid = t._1 // neighbor community id + val k_v_in = t._2 + var tot = t._3 + if (vCid == nCid) + tot -= k_v + val q = (k_v_in - tot * k_v / m) + (vid, nCid, q) + }) + val maxQ = + dertaQs.max(Ordering.by[(VertexId, Long, Double), Double](_._3)) + if (maxQ._3 > tol) + maxQ + else // if entering other communities reduces its modularity, then stays in the current community + (vid, vCid, 0.0) + }) + + changeInfo //(vid,wCid,△Q) + } + +} + +object LouvainGraphUtil { + + /** + * Construct louvain graph + * + * @param initG + * @return Graph + */ + def createLouvainGraph( + initG: Graph[None.type, Double] + ): Graph[VertexData, Double] = { + // sum of the weights of the links incident to node i + val nodeWeights: VertexRDD[Double] = initG.aggregateMessages( + trip => { + trip.sendToSrc(trip.attr) + trip.sendToDst(trip.attr) + }, + (a, b) => a + b + ) + // update graph vertex's property + val louvainG = initG.outerJoinVertices(nodeWeights)((vid, oldData, opt) => { + val vData = new VertexData(vid, vid) + val weights = opt.getOrElse(0.0) + vData.degree = weights + vData.innerVertices += vid + vData + }) + louvainG + } + + /** + * update graph using new community info + * + * @param G Louvain graph + * @param changeInfo (vid,new_cid) + * + * @return Graph[VertexData, Double] + */ + def updateGraph( + G: Graph[VertexData, Double], + changeInfo: RDD[(VertexId, Long)] + ): Graph[VertexData, Double] = { + // update community id + G.joinVertices(changeInfo)((vid, data, 
newCid) => { + val vData = new VertexData(vid, newCid) + vData.innerDegree = data.innerDegree + vData.innerVertices = data.innerVertices + vData.degree = data.degree + vData + }) + } +} + +object CommUtil { + // return the collections of communities + def getCommunities(G: Graph[VertexData, Double]): RDD[Row] = { + val communities = G.vertices + .map(x => { + Row(x._1, x._2.cId) + }) + communities + } +} + +class VertexData(val vId: Long, var cId: Long) extends Serializable { + var innerDegree = 0.0 + var innerVertices = new mutable.HashSet[Long]() + var degree = 0.0 +} diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/PageRankAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/PageRankAlgo.scala new file mode 100644 index 0000000..2bf9254 --- /dev/null +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/PageRankAlgo.scala @@ -0,0 +1,56 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +package com.vesoft.nebula.algorithm.lib + +import com.vesoft.nebula.algorithm.config.{ + AlgoConstants, + Configs, + NebulaConfig, + PRConfig, + SparkConfig +} +import org.apache.log4j.Logger +import org.apache.spark.graphx.{Graph, VertexRDD} +import org.apache.spark.rdd.RDD +import com.vesoft.nebula.algorithm.utils.NebulaUtil +import org.apache.spark.graphx.lib.PageRank +import org.apache.spark.sql.types.{DoubleType, LongType, StructField, StructType} +import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} + +object PageRankAlgo { + private val LOGGER = Logger.getLogger(this.getClass) + + val ALGORITHM: String = "PageRank" + + /** + * run the pagerank algorithm for nebula graph + */ + def apply(spark: SparkSession, + dataset: Dataset[Row], + pageRankConfig: PRConfig, + hasWeight: Boolean): DataFrame = { + + val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, hasWeight) + + val prResultRDD = execute(graph, pageRankConfig.maxIter, pageRankConfig.resetProb) + + val schema = StructType( + List( + StructField(AlgoConstants.ALGO_ID_COL, LongType, nullable = false), + StructField(AlgoConstants.PAGERANK_RESULT_COL, DoubleType, nullable = true) + )) + val algoResult = spark.sqlContext + .createDataFrame(prResultRDD, schema) + + algoResult + } + + def execute(graph: Graph[None.type, Double], maxIter: Int, resetProb: Double): RDD[Row] = { + val prResultRDD: VertexRDD[Double] = PageRank.run(graph, maxIter, resetProb).vertices + prResultRDD.map(row => Row(row._1, row._2)) + } +} diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ShortestPathAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ShortestPathAlgo.scala new file mode 100644 index 0000000..e4a5771 --- /dev/null +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ShortestPathAlgo.scala @@ -0,0 +1,60 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
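Illustrative usage of the PageRank wrapper above. The PRConfig constructor order (maxIter, resetProb) is assumed here; the values 10 and 0.15 match the pagerank block of the sample application.conf later in this diff.

import org.apache.spark.sql.SparkSession
import com.vesoft.nebula.algorithm.config.PRConfig
import com.vesoft.nebula.algorithm.lib.PageRankAlgo

val spark = SparkSession.builder().master("local").getOrCreate()
val edges = spark.read.option("header", true).csv("src/test/resources/edge.csv")
val prConfig = new PRConfig(10, 0.15) // assumed order: (maxIter, resetProb)
val ranks = PageRankAlgo.apply(spark, edges, prConfig, hasWeight = false)
ranks.show() // columns: _id, pagerank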
+ */ + +package com.vesoft.nebula.algorithm.lib + +import com.vesoft.nebula.algorithm.utils.NebulaUtil +import com.vesoft.nebula.algorithm.config.{ + AlgoConstants, + CcConfig, + Configs, + NebulaConfig, + PRConfig, + ShortestPathConfig, + SparkConfig +} +import org.apache.log4j.Logger +import org.apache.spark.graphx.{Graph, VertexId, VertexRDD} +import org.apache.spark.rdd.RDD +import com.vesoft.nebula.algorithm.utils.NebulaUtil +import org.apache.spark.graphx.lib.ShortestPaths +import org.apache.spark.graphx.lib.ShortestPaths.SPMap +import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType} +import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} + +object ShortestPathAlgo { + private val LOGGER = Logger.getLogger(this.getClass) + + val ALGORITHM: String = "ShortestPath" + + /** + * run the ShortestPath algorithm for nebula graph + */ + def apply(spark: SparkSession, + dataset: Dataset[Row], + shortestPathConfig: ShortestPathConfig, + hasWeight: Boolean): DataFrame = { + + val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, hasWeight) + + val prResultRDD = execute(graph, shortestPathConfig.landmarks) + + val schema = StructType( + List( + StructField(AlgoConstants.ALGO_ID_COL, LongType, nullable = false), + StructField(AlgoConstants.SHORTPATH_RESULT_COL, StringType, nullable = true) + )) + val algoResult = spark.sqlContext + .createDataFrame(prResultRDD, schema) + + algoResult + } + + def execute(graph: Graph[None.type, Double], landmarks: Seq[VertexId]): RDD[Row] = { + val spResultRDD: VertexRDD[SPMap] = ShortestPaths.run(graph, landmarks).vertices + spResultRDD.map(row => Row(row._1, row._2.toString())) + } +} diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/StronglyConnectedComponentsAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/StronglyConnectedComponentsAlgo.scala new file mode 100644 index 0000000..720c379 --- /dev/null +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/StronglyConnectedComponentsAlgo.scala @@ -0,0 +1,50 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
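Illustrative usage of the shortest-path wrapper above, assuming ShortestPathConfig simply wraps the landmark vertex ids; landmark "1" matches the sample application.conf later in this diff.

import org.apache.spark.sql.SparkSession
import com.vesoft.nebula.algorithm.config.ShortestPathConfig
import com.vesoft.nebula.algorithm.lib.ShortestPathAlgo

val spark = SparkSession.builder().master("local").getOrCreate()
val edges = spark.read.option("header", true).csv("src/test/resources/edge.csv")
val spConfig = new ShortestPathConfig(Seq(1L)) // assumed constructor: the landmark ids
val paths = ShortestPathAlgo.apply(spark, edges, spConfig, hasWeight = false)
paths.show() // columns: _id, shortestpath (the SPMap rendered as a string)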
+ */ + +package com.vesoft.nebula.algorithm.lib + +import com.vesoft.nebula.algorithm.utils.NebulaUtil +import com.vesoft.nebula.algorithm.config.{AlgoConstants, CcConfig} +import com.vesoft.nebula.algorithm.utils.NebulaUtil +import org.apache.spark.graphx.{Graph, VertexId, VertexRDD} +import org.apache.spark.graphx.lib.{ConnectedComponents, StronglyConnectedComponents} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} +import org.apache.spark.sql.types.{LongType, StructField, StructType} + +object StronglyConnectedComponentsAlgo { + + val ALGORITHM: String = "StronglyConnectedComponents" + + /** + * run the StronglyConnectedComponents algorithm for nebula graph + */ + def apply(spark: SparkSession, + dataset: Dataset[Row], + ccConfig: CcConfig, + hasWeight: Boolean): DataFrame = { + + val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, hasWeight) + + val ccResultRDD = execute(graph, ccConfig.maxIter) + + val schema = StructType( + List( + StructField(AlgoConstants.ALGO_ID_COL, LongType, nullable = false), + StructField(AlgoConstants.SCC_RESULT_COL, LongType, nullable = true) + )) + val algoResult = spark.sqlContext + .createDataFrame(ccResultRDD, schema) + + algoResult + } + + def execute(graph: Graph[None.type, Double], maxIter: Int): RDD[Row] = { + val ccResultRDD: VertexRDD[VertexId] = StronglyConnectedComponents.run(graph, maxIter).vertices + ccResultRDD.map(row => Row(row._1, row._2)) + } + +} diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/TriangleCountAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/TriangleCountAlgo.scala new file mode 100644 index 0000000..b9badb8 --- /dev/null +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/TriangleCountAlgo.scala @@ -0,0 +1,49 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
+ */ + +package com.vesoft.nebula.algorithm.lib + +import com.vesoft.nebula.algorithm.config.AlgoConstants +import com.vesoft.nebula.algorithm.utils.NebulaUtil +import org.apache.log4j.Logger +import org.apache.spark.graphx.{Graph, VertexRDD} +import org.apache.spark.graphx.lib.TriangleCount +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} +import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType} + +object TriangleCountAlgo { + private val LOGGER = Logger.getLogger(this.getClass) + + val ALGORITHM: String = "TriangleCount" + + /** + * run the TriangleCount algorithm for nebula graph + * + * compute each vertex's triangle count + */ + def apply(spark: SparkSession, dataset: Dataset[Row]): DataFrame = { + + val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, false) + + val triangleResultRDD = execute(graph) + + val schema = StructType( + List( + StructField(AlgoConstants.ALGO_ID_COL, LongType, nullable = false), + StructField(AlgoConstants.TRIANGLECOUNT_RESULT_COL, IntegerType, nullable = true) + )) + val algoResult = spark.sqlContext + .createDataFrame(triangleResultRDD, schema) + + algoResult + } + + def execute(graph: Graph[None.type, Double]): RDD[Row] = { + val resultRDD: VertexRDD[Int] = TriangleCount.run(graph).vertices + resultRDD.map(row => Row(row._1, row._2)) + } +} diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/DataReader.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/DataReader.scala new file mode 100644 index 0000000..9b6fc2b --- /dev/null +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/DataReader.scala @@ -0,0 +1,111 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
+ */ + +package com.vesoft.nebula.algorithm.reader + +import com.vesoft.nebula.connector.connector.NebulaDataFrameReader +import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig} +import com.vesoft.nebula.algorithm.config.Configs +import org.apache.spark.sql.{DataFrame, SparkSession} + +import scala.collection.mutable.ListBuffer + +abstract class DataReader(spark: SparkSession, configs: Configs) { + def read(): DataFrame +} + +class NebulaReader(spark: SparkSession, configs: Configs, partitionNum: String) + extends DataReader(spark, configs) { + override def read(): DataFrame = { + val metaAddress = configs.nebulaConfig.readConfigEntry.address + val space = configs.nebulaConfig.readConfigEntry.space + val labels = configs.nebulaConfig.readConfigEntry.labels + val weights = configs.nebulaConfig.readConfigEntry.weightCols + val partition = partitionNum.toInt + + val config = + NebulaConnectionConfig + .builder() + .withMetaAddress(metaAddress) + .withConenctionRetry(2) + .build() + + val noColumn = weights.isEmpty + + var dataset: DataFrame = null + for (i <- labels.indices) { + val returnCols: ListBuffer[String] = new ListBuffer[String] + if (configs.dataSourceSinkEntry.hasWeight && weights.nonEmpty) { + returnCols.append(weights(i)) + } + val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig + .builder() + .withSpace(space) + .withLabel(labels(i)) + .withNoColumn(noColumn) + .withReturnCols(returnCols.toList) + .withPartitionNum(partition) + .build() + if (dataset == null) { + dataset = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF() + } else { + dataset = dataset.union(spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()) + } + } + dataset + } +} + +class CsvReader(spark: SparkSession, configs: Configs, partitionNum: String) + extends DataReader(spark, configs) { + override def read(): DataFrame = { + val delimiter = configs.localConfigEntry.delimiter + val header = configs.localConfigEntry.header + val localPath = configs.localConfigEntry.filePath + + val partition = partitionNum.toInt + + val data = + spark.read + .option("header", header) + .option("delimiter", delimiter) + .csv(localPath) + val weight = configs.localConfigEntry.weight + val src = configs.localConfigEntry.srcId + val dst = configs.localConfigEntry.dstId + if (configs.dataSourceSinkEntry.hasWeight && weight != null && !weight.trim.isEmpty) { + data.select(src, dst, weight) + } else { + data.select(src, dst) + } + if (partition != 0) { + data.repartition(partition) + } + data + } +} + +class JsonReader(spark: SparkSession, configs: Configs, partitionNum: String) + extends DataReader(spark, configs) { + override def read(): DataFrame = { + val localPath = configs.localConfigEntry.filePath + val data = spark.read.json(localPath) + val partition = partitionNum.toInt + + val weight = configs.localConfigEntry.weight + val src = configs.localConfigEntry.srcId + val dst = configs.localConfigEntry.dstId + if (configs.dataSourceSinkEntry.hasWeight && weight != null && !weight.trim.isEmpty) { + data.select(src, dst, weight) + } else { + data.select(src, dst) + } + if (partition != 0) { + data.repartition(partition) + } + data + } +} diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/utils/NebulaUtil.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/utils/NebulaUtil.scala new file mode 100644 index 0000000..98d5723 --- /dev/null +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/utils/NebulaUtil.scala @@ -0,0 +1,53 @@ +/* 
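A sketch of how a driver might choose among the three readers defined above based on data.source; the createDataSource helper itself is hypothetical, while NebulaReader, CsvReader and JsonReader are the classes from this file.

import org.apache.spark.sql.{DataFrame, SparkSession}
import com.vesoft.nebula.algorithm.config.Configs
import com.vesoft.nebula.algorithm.reader.{CsvReader, JsonReader, NebulaReader}

def createDataSource(spark: SparkSession, configs: Configs, partitionNum: String): DataFrame = {
  configs.dataSourceSinkEntry.source.toLowerCase match {
    case "nebula" => new NebulaReader(spark, configs, partitionNum).read()
    case "csv"    => new CsvReader(spark, configs, partitionNum).read()
    case "json"   => new JsonReader(spark, configs, partitionNum).read()
    case other    => throw new UnsupportedOperationException(s"unsupported data source: $other")
  }
}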
Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +package com.vesoft.nebula.algorithm.utils + +import org.apache.spark.graphx.{Edge, Graph} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Dataset, Encoder, Row} +import org.slf4j.LoggerFactory + +object NebulaUtil { + private val LOG = LoggerFactory.getLogger(this.getClass) + + /** + * construct original graph + * + * @param hasWeight if the graph has no weight, then edge's weight is default 1.0 + * @return Graph + */ + def loadInitGraph(dataSet: Dataset[Row], hasWeight: Boolean): Graph[None.type, Double] = { + implicit val encoder: Encoder[Edge[Double]] = org.apache.spark.sql.Encoders.kryo[Edge[Double]] + val edges: RDD[Edge[Double]] = dataSet + .map(row => { + if (hasWeight) { + Edge(row.get(0).toString.toLong, row.get(1).toString.toLong, row.get(2).toString.toDouble) + } else { + Edge(row.get(0).toString.toLong, row.get(1).toString.toLong, 1.0) + } + })(encoder) + .rdd + + Graph.fromEdges(edges, None) + } + + /** + * Assembly algorithm's result file path + * + * @param path algorithm configuration + * @param algorithmName + * + * @return validate result path + */ + def getResultPath(path: String, algorithmName: String): String = { + var resultFilePath = path + if (!resultFilePath.endsWith("/")) { + resultFilePath = resultFilePath + "/" + } + resultFilePath + algorithmName + } +} diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/AlgoWriter.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/AlgoWriter.scala new file mode 100644 index 0000000..6498b80 --- /dev/null +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/AlgoWriter.scala @@ -0,0 +1,58 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
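Illustrative usage of the two helpers above: loadInitGraph builds a GraphX graph from an edge DataFrame (with hasWeight = true the third column becomes the edge weight, otherwise every edge gets 1.0), and getResultPath just joins the output directory with the algorithm name.

import org.apache.spark.sql.SparkSession
import com.vesoft.nebula.algorithm.utils.NebulaUtil

val spark = SparkSession.builder().master("local").getOrCreate()
val edges = spark.read.option("header", true).csv("src/test/resources/edge.csv")
val graph = NebulaUtil.loadInitGraph(edges, hasWeight = true)
println(s"${graph.numVertices} vertices, ${graph.numEdges} edges")

assert(NebulaUtil.getResultPath("/tmp", "pagerank") == "/tmp/pagerank")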
+ */ + +package com.vesoft.nebula.algorithm.writer + +import com.vesoft.nebula.connector.connector.NebulaDataFrameWriter +import com.vesoft.nebula.connector.{NebulaConnectionConfig, WriteNebulaVertexConfig} +import com.vesoft.nebula.algorithm.config.{AlgoConstants, Configs} +import org.apache.spark.sql.DataFrame + +abstract class AlgoWriter(data: DataFrame, configs: Configs) { + def write(): Unit +} + +class NebulaWriter(data: DataFrame, configs: Configs) extends AlgoWriter(data, configs) { + override def write(): Unit = { + val graphAddress = configs.nebulaConfig.writeConfigEntry.graphAddress + val metaAddress = configs.nebulaConfig.writeConfigEntry.metaAddress + val space = configs.nebulaConfig.writeConfigEntry.space + val tag = configs.nebulaConfig.writeConfigEntry.tag + val user = configs.nebulaConfig.writeConfigEntry.user + val passwd = configs.nebulaConfig.writeConfigEntry.pswd + + val config = + NebulaConnectionConfig + .builder() + .withMetaAddress(metaAddress) + .withGraphAddress(graphAddress) + .withConenctionRetry(2) + .build() + val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig + .builder() + .withSpace(space) + .withTag(tag) + .withVidField(AlgoConstants.ALGO_ID_COL) + .withVidAsProp(false) + .withBatch(1000) + .build() + data.write.nebula(config, nebulaWriteVertexConfig).writeVertices() + } +} + +class CsvWriter(data: DataFrame, configs: Configs) extends AlgoWriter(data, configs) { + override def write(): Unit = { + val resultPath = configs.localConfigEntry.resultPath + data.repartition(1).write.option("header", true).csv(resultPath) + } +} + +class TextWriter(data: DataFrame, configs: Configs) extends AlgoWriter(data, configs) { + override def write(): Unit = { + val resultPath = configs.localConfigEntry.resultPath + data.repartition(1).write.option("header", true).text(resultPath) + } +} diff --git a/nebula-algorithm/src/test/resources/application.conf b/nebula-algorithm/src/test/resources/application.conf new file mode 100644 index 0000000..a17fb11 --- /dev/null +++ b/nebula-algorithm/src/test/resources/application.conf @@ -0,0 +1,126 @@ +{ + # Spark relation config + spark: { + app: { + name: LPA + # spark.app.partitionNum + partitionNum:100 + } + master:local + } + + data: { + # data source. optional of nebula,csv,json,parquet + source: csv + # data sink, means the algorithm result will be write into this sink. optional of nebula,csv,txt + sink: nebula + # if your algorithm needs weight + hasWeight: false + } + + # Nebula Graph relation config + nebula: { + # algo's data source from Nebula + read: { + # Nebula metad server address, multiple addresses are split by English comma + metaAddress: "127.0.0.1:9559" + # Nebula space + space: nb + # Nebula edge types, multiple labels means that data from multiple edges will union together + labels: ["serve"] + # Nebula edge property name for each edge type, this property will be as weight col for algorithm. + # Make sure the weightCols are corresponding to labels. 
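A sketch of the matching sink selection for the writers defined above, keyed on data.sink; the saveAlgoResult helper is hypothetical, while NebulaWriter, CsvWriter and TextWriter are the classes from this file ("txt" maps to TextWriter).

import org.apache.spark.sql.DataFrame
import com.vesoft.nebula.algorithm.config.Configs
import com.vesoft.nebula.algorithm.writer.{CsvWriter, NebulaWriter, TextWriter}

def saveAlgoResult(result: DataFrame, configs: Configs): Unit = {
  configs.dataSourceSinkEntry.sink.toLowerCase match {
    case "nebula" => new NebulaWriter(result, configs).write()
    case "csv"    => new CsvWriter(result, configs).write()
    case "txt"    => new TextWriter(result, configs).write()
    case other    => throw new UnsupportedOperationException(s"unsupported data sink: $other")
  }
}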
+ weightCols: ["start_year"] + } + + # algo result sink into Nebula + write:{ + # Nebula graphd server address, multiple addresses are split by English comma + graphAddress: "127.0.0.1:9669" + # Nebula metad server address, multiple addresses are split by English comma + metaAddress: "127.0.0.1:9559,127.0.0.1:9560" + user:root + pswd:nebula + # Nebula space name + space:nb + # Nebula tag name, the algorithm result will be write into this tag + tag:pagerank + } + } + + local: { + # algo's data source from Nebula + read:{ + filePath: "hdfs://127.0.0.1:9000/edge/work_for.csv" + # srcId column + srcId:"_c0" + # dstId column + dstId:"_c1" + # weight column + #weight: "col3" + # if csv file has header + header: false + # csv file's delimiter + delimiter:"," + } + + # algo result sink into local file + write:{ + resultPath:/tmp/ + } + } + + + algorithm: { + # the algorithm that you are going to execute,pick one from [pagerank, louvain, connectedcomponent, + # labelpropagation, shortestpaths, degreestatic, kcore, stronglyconnectedcomponent, trianglecount, + # betweenness] + executeAlgo: pagerank + + # pagerank parameter + pagerank: { + maxIter: 10 + resetProb: 0.15 # default 0.15 + } + + # louvain parameter + louvain: { + maxIter: 20 + internalIter: 10 + tol: 0.5 + } + + # connected component parameter TODO not implemented yet. + connectedcomponent: { + maxIter: 20 + } + + # LabelPropagation + labelpropagation: { + maxIter: 20 + } + + # ShortestPaths + shortestpaths: { + # several vertices to compute the shortest path to all vertices. + landmarks: "1" + } + + # vertex degree static + degreestatic: {} + + # kcore + kcore:{ + maxIter:10 + degree:1 + } + + # trianglecount + trianglecount:{} + + # betweenness centrality + betweenness:{ + maxIter:5 + } + } +} diff --git a/nebula-algorithm/src/test/resources/edge.csv b/nebula-algorithm/src/test/resources/edge.csv new file mode 100644 index 0000000..ce93855 --- /dev/null +++ b/nebula-algorithm/src/test/resources/edge.csv @@ -0,0 +1,17 @@ +src,dst,weight +1,1,5.0 +1,2,1.0 +1,3,5.0 +1,4,1.0 +2,1,5.0 +2,2,1.0 +2,3,5.0 +2,4,1.0 +3,1,1.0 +3,2,5.0 +3,3,1.0 +3,4,5.0 +4,1,1.0 +4,2,5.0 +4,3,1.0 +4,4,5.0 \ No newline at end of file diff --git a/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/config/ConfigSuite.scala b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/config/ConfigSuite.scala new file mode 100644 index 0000000..f8f485c --- /dev/null +++ b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/config/ConfigSuite.scala @@ -0,0 +1,133 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
+ */ + +package com.vesoft.nebula.algorithm.config + +import com.vesoft.nebula.algorithm.config.Configs.Argument +import org.junit.Test + +import scala.collection.mutable.ListBuffer + +class ConfigSuite { + + var configs: Configs = _ + + @Test + def getConfigsSuite(): Unit = { + val args: ListBuffer[String] = new ListBuffer[String] + args.append("-p") + args.append("src/test/resources/application.conf") + try { + val options = Configs.parser(args.toArray, "TestProgram") + val p: Argument = options match { + case Some(config) => config + case _ => + assert(false) + sys.exit(-1) + } + configs = Configs.parse(p.config) + } catch { + case e: Exception => { + e.printStackTrace() + assert(false) + } + } + + } + + @Test + def getSparkConfigSuite(): Unit = { + if (configs == null) { + getConfigsSuite() + } + val sparkConfig = configs.sparkConfig + assert(sparkConfig.map.size == 3) + + val spark = SparkConfig.getSpark(configs) + assert(spark.partitionNum.toInt == 100) + } + + @Test + def getSourceSinkConfigSuite(): Unit = { + if (configs == null) { + getConfigsSuite() + } + val dataSourceSinkEntry = configs.dataSourceSinkEntry + assert(dataSourceSinkEntry.source.equals("csv")) + assert(dataSourceSinkEntry.sink.equals("nebula")) + assert(!dataSourceSinkEntry.hasWeight) + } + @Test + def getNebulaConfigSuite(): Unit = { + if (configs == null) { + getConfigsSuite() + } + val nebulaConfigEntry = configs.nebulaConfig + val writeConfig = nebulaConfigEntry.writeConfigEntry + assert(writeConfig.graphAddress.equals("127.0.0.1:9669")) + assert(writeConfig.metaAddress.equals("127.0.0.1:9559,127.0.0.1:9560")) + assert(writeConfig.space.equals("nb")) + assert(writeConfig.tag.equals("pagerank")) + assert(writeConfig.user.equals("root")) + assert(writeConfig.pswd.equals("nebula")) + + val readConfig = nebulaConfigEntry.readConfigEntry + assert(readConfig.address.equals("127.0.0.1:9559")) + assert(readConfig.space.equals("nb")) + assert(readConfig.labels.size == 1) + assert(readConfig.weightCols.size == 1) + } + + @Test + def getLocalConfigSuite(): Unit = { + if (configs == null) { + getConfigsSuite() + } + val localConfigEntry = configs.localConfigEntry + assert(localConfigEntry.filePath.startsWith("hdfs://")) + assert(localConfigEntry.srcId.equals("_c0")) + assert(localConfigEntry.dstId.equals("_c1")) + assert(localConfigEntry.weight == null) + assert(!localConfigEntry.header) + assert(localConfigEntry.delimiter.equals(",")) + assert(localConfigEntry.resultPath.equals("/tmp/")) + } + + @Test + def getAlgoConfigSuite(): Unit = { + if (configs == null) { + getConfigsSuite() + } + val algoConfig = configs.algorithmConfig + val algoName = AlgoConfig.getAlgoName(configs) + assert(algoName.equals("pagerank")) + + val prConfig = PRConfig.getPRConfig(configs) + assert(prConfig.maxIter == 10) + assert(prConfig.resetProb < 0.150000001) + + val louvainConfig = LouvainConfig.getLouvainConfig(configs) + assert(louvainConfig.maxIter == 20) + assert(louvainConfig.internalIter == 10) + assert(louvainConfig.tol < 0.5000001) + + val ccConfig = CcConfig.getCcConfig(configs) + assert(ccConfig.maxIter == 20) + + val lpaConfig = LPAConfig.getLPAConfig(configs) + assert(lpaConfig.maxIter == 20) + + val shortestPathConfig = ShortestPathConfig.getShortestPathConfig(configs) + assert(shortestPathConfig.landmarks.size == 1) + + val kcoreConfig = KCoreConfig.getKCoreConfig(configs) + assert(kcoreConfig.maxIter == 10) + assert(kcoreConfig.degree == 1) + + val betweennessConfig = BetweennessConfig.getBetweennessConfig(configs) + 
assert(betweennessConfig.maxIter == 5) + } +} diff --git a/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/data/MockNebulaData.scala b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/data/MockNebulaData.scala new file mode 100644 index 0000000..a71f04a --- /dev/null +++ b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/data/MockNebulaData.scala @@ -0,0 +1,9 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +package com.vesoft.nebula.algorithm.data + +object MockNebulaData {} diff --git a/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/BetweennessAlgoSuite.scala b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/BetweennessAlgoSuite.scala new file mode 100644 index 0000000..210c24c --- /dev/null +++ b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/BetweennessAlgoSuite.scala @@ -0,0 +1,22 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +package com.vesoft.nebula.algorithm.lib + +import com.vesoft.nebula.algorithm.config.BetweennessConfig +import org.apache.spark.sql.SparkSession +import org.junit.Test + +class BetweennessAlgoSuite { + @Test + def betweennessAlgoSuite(): Unit = { + val spark = SparkSession.builder().master("local").getOrCreate() + val data = spark.read.option("header", true).csv("src/test/resources/edge.csv") + val betweennessConfig = new BetweennessConfig(5) + val result = BetweennessCentralityAlgo.apply(spark, data, betweennessConfig, false) + assert(result.count() == 4) + } +} diff --git a/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/CcAlgoSuite.scala b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/CcAlgoSuite.scala new file mode 100644 index 0000000..c513537 --- /dev/null +++ b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/CcAlgoSuite.scala @@ -0,0 +1,22 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +package com.vesoft.nebula.algorithm.lib + +import com.vesoft.nebula.algorithm.config.CcConfig +import org.apache.spark.sql.SparkSession +import org.junit.Test + +class CcAlgoSuite { + @Test + def ccAlgoSuite(): Unit = { + val spark = SparkSession.builder().master("local").getOrCreate() + val data = spark.read.option("header", true).csv("src/test/resources/edge.csv") + val ccAlgoConfig = new CcConfig(5) + val result = ConnectedComponentsAlgo.apply(spark, data, ccAlgoConfig, false) + assert(result.count() == 4) + } +} diff --git a/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/DegreeStaticAlgoSuite.scala b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/DegreeStaticAlgoSuite.scala new file mode 100644 index 0000000..4f68ff0 --- /dev/null +++ b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/DegreeStaticAlgoSuite.scala @@ -0,0 +1,25 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
+ */ + +package com.vesoft.nebula.algorithm.lib + +import org.apache.spark.sql.SparkSession +import org.junit.Test + +class DegreeStaticAlgoSuite { + @Test + def degreeStaticAlgoSuite(): Unit = { + val spark = SparkSession.builder().master("local").getOrCreate() + val data = spark.read.option("header", true).csv("src/test/resources/edge.csv") + val result = DegreeStaticAlgo.apply(spark, data) + assert(result.count() == 4) + result.foreach(row => { + assert(row.get(1).toString.toInt == 8) + assert(row.get(2).toString.toInt == 4) + assert(row.get(3).toString.toInt == 4) + }) + } +} diff --git a/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/KCoreAlgoSuite.scala b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/KCoreAlgoSuite.scala new file mode 100644 index 0000000..f70056a --- /dev/null +++ b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/KCoreAlgoSuite.scala @@ -0,0 +1,22 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +package com.vesoft.nebula.algorithm.lib + +import com.vesoft.nebula.algorithm.config.KCoreConfig +import org.apache.spark.sql.SparkSession +import org.junit.Test + +class KCoreAlgoSuite { + @Test + def kcoreSuite(): Unit = { + val spark = SparkSession.builder().master("local").getOrCreate() + val data = spark.read.option("header", true).csv("src/test/resources/edge.csv") + val kcoreConfig = new KCoreConfig(10, 3) + val kcoreResult = KCoreAlgo.apply(spark, data, kcoreConfig) + assert(kcoreResult.count() == 4) + } +} diff --git a/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/LabelPropagationAlgoSuite.scala b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/LabelPropagationAlgoSuite.scala new file mode 100644 index 0000000..7d226c5 --- /dev/null +++ b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/LabelPropagationAlgoSuite.scala @@ -0,0 +1,25 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +package com.vesoft.nebula.algorithm.lib + +import com.vesoft.nebula.algorithm.config.LPAConfig +import org.apache.spark.sql.SparkSession +import org.junit.Test + +class LabelPropagationAlgoSuite { + @Test + def lpaAlgoSuite(): Unit = { + val spark = SparkSession.builder().master("local").getOrCreate() + val data = spark.read.option("header", true).csv("src/test/resources/edge.csv") + val lpaConfig = new LPAConfig(5) + val result = LabelPropagationAlgo.apply(spark, data, lpaConfig, false) + assert(result.count() == 4) + result.foreach(row => { + assert(row.get(1).toString.toInt == 1) + }) + } +} diff --git a/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/LouvainAlgoSuite.scala b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/LouvainAlgoSuite.scala new file mode 100644 index 0000000..06aa3ed --- /dev/null +++ b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/LouvainAlgoSuite.scala @@ -0,0 +1,22 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
+ */ + +package com.vesoft.nebula.algorithm.lib + +import com.vesoft.nebula.algorithm.config.{ConfigSuite, Configs, LouvainConfig, SparkConfig} +import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} +import org.junit.Test + +class LouvainAlgoSuite { + @Test + def louvainSuite(): Unit = { + val spark = SparkSession.builder().master("local").getOrCreate() + val data = spark.read.option("header", true).csv("src/test/resources/edge.csv") + val louvainConfig = new LouvainConfig(5, 2, 1.0) + val louvainResult = LouvainAlgo.apply(spark, data, louvainConfig, false) + assert(louvainResult.count() == 4) + } +} diff --git a/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/PageRankAlgoSuite.scala b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/PageRankAlgoSuite.scala new file mode 100644 index 0000000..f402931 --- /dev/null +++ b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/PageRankAlgoSuite.scala @@ -0,0 +1,22 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +package com.vesoft.nebula.algorithm.lib + +import com.vesoft.nebula.algorithm.config.{Configs, PRConfig, SparkConfig} +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.junit.Test + +class PageRankAlgoSuite { + @Test + def pageRankSuite(): Unit = { + val spark = SparkSession.builder().master("local").getOrCreate() + val data = spark.read.option("header", true).csv("src/test/resources/edge.csv") + val prConfig = new PRConfig(5, 1.0) + val louvainResult = PageRankAlgo.apply(spark, data, prConfig, false) + assert(louvainResult.count() == 4) + } +} diff --git a/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/SCCAlgoSuite.scala b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/SCCAlgoSuite.scala new file mode 100644 index 0000000..65ea743 --- /dev/null +++ b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/SCCAlgoSuite.scala @@ -0,0 +1,22 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +package com.vesoft.nebula.algorithm.lib + +import com.vesoft.nebula.algorithm.config.CcConfig +import org.apache.spark.sql.SparkSession +import org.junit.Test + +class SCCAlgoSuite { + @Test + def sccAlgoSuite(): Unit = { + val spark = SparkSession.builder().master("local").getOrCreate() + val data = spark.read.option("header", true).csv("src/test/resources/edge.csv") + val sccConfig = new CcConfig(5) + val sccResult = StronglyConnectedComponentsAlgo.apply(spark, data, sccConfig, true) + assert(sccResult.count() == 4) + } +} diff --git a/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/ShortestPathAlgoSuite.scala b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/ShortestPathAlgoSuite.scala new file mode 100644 index 0000000..d980d4b --- /dev/null +++ b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/ShortestPathAlgoSuite.scala @@ -0,0 +1,22 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
+ */ + +package com.vesoft.nebula.algorithm.lib + +import com.vesoft.nebula.algorithm.config.ShortestPathConfig +import org.apache.spark.sql.SparkSession +import org.junit.Test + +class ShortestPathAlgoSuite { + @Test + def shortestPathAlgoSuite(): Unit = { + val spark = SparkSession.builder().master("local").getOrCreate() + val data = spark.read.option("header", true).csv("src/test/resources/edge.csv") + val shortestPathConfig = new ShortestPathConfig(Seq(1, 2)) + val result = ShortestPathAlgo.apply(spark, data, shortestPathConfig, false) + assert(result.count() == 4) + } +} diff --git a/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/TrangleCountSuite.scala b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/TrangleCountSuite.scala new file mode 100644 index 0000000..95e9fc6 --- /dev/null +++ b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/lib/TrangleCountSuite.scala @@ -0,0 +1,24 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +package com.vesoft.nebula.algorithm.lib + +import org.apache.spark.sql.SparkSession +import org.junit.Test + +class TrangleCountSuite { + @Test + def trangleCountSuite(): Unit = { + val spark = SparkSession.builder().master("local").getOrCreate() + val data = spark.read.option("header", true).csv("src/test/resources/edge.csv") + val trangleCountResult = TriangleCountAlgo.apply(spark, data) + assert(trangleCountResult.count() == 4) + assert(trangleCountResult.first().get(1) == 3) + trangleCountResult.foreach(row => { + assert(row.get(1) == 3) + }) + } +} diff --git a/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/utils/NebulaUtilSuite.scala b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/utils/NebulaUtilSuite.scala new file mode 100644 index 0000000..fc39ff5 --- /dev/null +++ b/nebula-algorithm/src/test/scala/com/vesoft/nebula/algorithm/utils/NebulaUtilSuite.scala @@ -0,0 +1,44 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
+ */ + +package com.vesoft.nebula.algorithm.utils + +import org.junit.Test + +class NebulaUtilSuite { + + @Test + def validateWithWeight: Unit = { + val hostPorts: String = "127.0.0.1:9559" + val nameSpace: String = "nb" + val labels: List[String] = List("serve", "follow") + val hasWeight: Boolean = true + val weightCols: List[String] = List("start_year", "degree") + } + + @Test + def validateWithoutWeight: Unit = { + val hostPorts: String = "127.0.0.1:9559" + val nameSpace: String = "nb" + val labels: List[String] = List("serve") + val hasWeight: Boolean = false + val weightCols: List[String] = List() + } + + @Test + def getResultPathWithEnding: Unit = { + val path: String = "/tmp/" + val algorithmName: String = "aaa" + assert(NebulaUtil.getResultPath(path, algorithmName).equals("/tmp/aaa")) + } + + @Test + def getResultPathWithoutEnding: Unit = { + val path: String = "/tmp" + val algorithmName: String = "aaa" + assert(NebulaUtil.getResultPath(path, algorithmName).equals("/tmp/aaa")) + } +} diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..ac4b060 --- /dev/null +++ b/pom.xml @@ -0,0 +1,80 @@ + + + 4.0.0 + + com.vesoft + nebula-spark + pom + 2.5-SNAPSHOT + + + UTF-8 + + + + nebula-spark + Nebula Algorithm + https://github.com/vesoft-inc/nebula-algorithm + + scm:git:https://github.com/vesoft-inc/nebula + https://github.com/vesoft-inc/nebula + scm:git:https://github.com/vesoft-inc/nebula + + + + Apache License, Version 2.0 + https://www.apache.org/licenses/LICENSE-2.0.txt + repo + license + + + + + + nebula + Nebula Graph + nebula-algorithm@vesoft-inc.com + vesoft + + architect + developer + + + + + + nebula-algorithm + example + + + + + release + https://oss.sonatype.org/service/local/staging/deploy/maven2/ + + + snapshots + https://oss.sonatype.org/content/repositories/snapshots/ + + + + + + + org.apache.maven.plugins + maven-gpg-plugin + 1.6 + + + verify + + sign + + + + + + +
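For reference, the configuration file and the algorithm entry points exercised by the test suites above can be wired together into a small standalone driver. The sketch below is illustrative only: it relies on the calls shown in ConfigSuite and PageRankAlgoSuite (Configs.parser / Configs.parse, PRConfig.getPRConfig, PageRankAlgo.apply), and the PageRankExample object and its package name are hypothetical, not part of this patch.

package com.vesoft.nebula.algorithm.example  // hypothetical package, not part of this patch

import com.vesoft.nebula.algorithm.config.{Configs, PRConfig}
import com.vesoft.nebula.algorithm.lib.PageRankAlgo
import org.apache.spark.sql.SparkSession

object PageRankExample {
  def main(args: Array[String]): Unit = {
    // Parse the HOCON file the same way ConfigSuite does, via the "-p" option.
    val options = Configs.parser(Array("-p", "src/test/resources/application.conf"), "PageRankExample")
    val configs = options match {
      case Some(arg) => Configs.parse(arg.config)
      case _         => sys.exit(-1)
    }

    // Read the edge list described by the local.read block (csv with srcId/dstId columns).
    val local = configs.localConfigEntry
    val spark = SparkSession.builder().master("local").getOrCreate()
    val data = spark.read
      .option("header", local.header)
      .option("delimiter", local.delimiter)
      .csv(local.filePath)

    // Build the pagerank parameters from the algorithm.pagerank block and run the algorithm,
    // passing the hasWeight flag the same way the suites above pass their Boolean argument.
    val prConfig = PRConfig.getPRConfig(configs)
    val result   = PageRankAlgo.apply(spark, data, prConfig, configs.dataSourceSinkEntry.hasWeight)
    result.show()
    spark.stop()
  }
}

Run against the same application.conf used by ConfigSuite, this driver should read the edge list named in local.read and print one PageRank row per vertex, mirroring what PageRankAlgoSuite asserts on the bundled edge.csv.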