Spark sstfile generator (#420)
* [WIP] add spark sst file generator job to generate sst files per partition per worker

* add edge's from and to column references in mapping file, and check that those columns exist

* make sure a vertex and its outbound edges are in the same partition

* add native client unit test

* manually box AnyVal to AnyRef in order to call NativeClient.encoded, since Scala has no autoboxing like Java (see the sketch after this list)

* support hive table with date and other partition columns

* fix double free exception

* remove all rocksdbjni-related dependencies

* use repartitionAndSortWithinPartitions to avoid overlapping sst file key ranges (also covered in the sketch after this list), update dependency to hadoop 2.7.4

* add mapping file and command line reference, handle mapping file load problems

* address comments

* remove duplicate cmake instruction to find JNI header

* fix doc inconsistency

* keep all edges to a single edgeType

* fix flaky UT

* add mapping json schema file and example mapping file

* use hdfs -copyFromLocal to put local sst files to HDFS

* create destination hdfs dir for sst files before running hdfs -copyFromLocal

* refactor and fix bug when vertex table has only one primary key column but no other column

* encode edge_type as a property, clean up local sst file dir, and refactor key-value type names

* create parent dir first before creating local sst files

* set the java.library.path env variable before running UTs in the maven surefire plugin

* suffix generated files with .sst

* the COMPILE phase precedes the PACKAGE phase in the default maven lifecycle, so remove the redundant COMPILE and enable tests in the meantime

* fix build failure caused by incompatibility between maven 3.0.5 and surefire plugin 3.0.0-M2

* add some clarification about sst file name uniqueness in doc
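
A minimal Scala sketch of how the manual AnyVal-to-AnyRef boxing and the repartitionAndSortWithinPartitions points above could fit together. Everything except `NativeClient.encode` — the row layout, the partitioner, the partition count, and the final write loop — is an illustrative assumption, not the actual job code:

```scala
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import com.vesoft.client.NativeClient

// Hypothetical partitioner: route every row of one nebula partition to the
// same Spark partition, so a vertex and its outbound edges stay together.
class NebulaPartitioner(parts: Int) extends Partitioner {
  override def numPartitions: Int = parts
  override def getPartition(key: Any): Int = key match {
    case (partId: Int, _) => (partId & Int.MaxValue) % parts
  }
}

object SstSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sst-sketch"))

    // Hypothetical rows: (nebula partition id, row key, property values).
    val rows = sc.parallelize(Seq(
      (1, "key-a", Array[Any](true, 1024L, 3.14, "Hello")),
      (1, "key-b", Array[Any](false, 2048L, 0.618, "World"))))

    val keyed = rows.map { case (partId, key, values) =>
      // Scala does not autobox AnyVal to AnyRef, so box manually before
      // handing the values to the JNI-backed NativeClient.encode.
      val boxed: Array[AnyRef] = values.map {
        case b: Boolean => java.lang.Boolean.valueOf(b)
        case i: Int     => java.lang.Integer.valueOf(i)
        case l: Long    => java.lang.Long.valueOf(l)
        case d: Double  => java.lang.Double.valueOf(d)
        case other      => other.asInstanceOf[AnyRef]
      }
      ((partId, key), NativeClient.encode(boxed))
    }

    // Sort keys within each partition and keep partitions disjoint, so each
    // Spark partition can be flushed to its own sst file with a
    // non-overlapping key range.
    val sorted = keyed.repartitionAndSortWithinPartitions(new NebulaPartitioner(24))

    sorted.foreachPartition { iter =>
      // A real job would open an sst writer here and emit one .sst file per
      // partition; this sketch only drains the iterator.
      iter.foreach { case ((_, _), _) => () }
    }
  }
}
```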
spacewalkman authored and dangleptr committed Jun 27, 2019
1 parent 86eb34d commit 34eb36d
Showing 25 changed files with 1,718 additions and 121 deletions.
1 change: 1 addition & 0 deletions project/build.properties
@@ -0,0 +1 @@
sbt.version=1.2.8
3 changes: 1 addition & 2 deletions src/tools/CMakeLists.txt
@@ -7,5 +7,4 @@ if (NOT SKIP_JAVA_CLIENT)
add_subdirectory(importer)
else()
message(STATUS "Skip building the importer")
endif()

endif()
3 changes: 2 additions & 1 deletion src/tools/native-client/CMakeLists.txt
@@ -1,7 +1,8 @@
# locate jni header
include_directories($ENV{JAVA_HOME}/include
$ENV{JAVA_HOME}/include/linux)

add_library(nebula_native_client SHARED
add_library(nebula_native_client SHARED
$<TARGET_OBJECTS:dataman_obj>
$<TARGET_OBJECTS:fs_obj>
$<TARGET_OBJECTS:base_obj>
3 changes: 1 addition & 2 deletions src/tools/native-client/build.sh
@@ -1,4 +1,3 @@
#!/bin/bash

mvn clean compile package -DskipTests
mvn test
mvn clean package -X
39 changes: 26 additions & 13 deletions src/tools/native-client/pom.xml
@@ -12,18 +12,13 @@

<groupId>com.vesoft</groupId>
<artifactId>native-client</artifactId>
<version>0.0.1</version>
<version>0.1.0</version>

<properties>
<project.root.dir>../../..</project.root.dir>
</properties>

<dependencies>
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
<version>5.15.10</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
@@ -76,10 +71,14 @@
<goal>javah</goal>
</goals>
<configuration>
<javahOutputDirectory>${basedir}/src/main/cpp</javahOutputDirectory>
<javahOutputFileName>com_vesoft_client_NativeClient.h</javahOutputFileName>
<javahOutputDirectory>${basedir}/src/main/cpp
</javahOutputDirectory>
<javahOutputFileName>
com_vesoft_client_NativeClient.h
</javahOutputFileName>
<javahClassNames>
<javahClassName>com.vesoft.client.NativeClient</javahClassName>
<javahClassName>com.vesoft.client.NativeClient
</javahClassName>
</javahClassNames>
</configuration>
</execution>
@@ -98,12 +97,14 @@
</goals>
<configuration>
<encoding>UTF-8</encoding>
<outputDirectory>${basedir}/src/main/resources/</outputDirectory>
<outputDirectory>${basedir}/src/main/resources/
</outputDirectory>
<resources>
<resource>
<directory>_build</directory>
<includes>
<include>libnebula_native_client.so</include>
<include>libnebula_native_client.so
</include>
</includes>
<filtering>false</filtering>
</resource>
@@ -117,11 +118,14 @@
<artifactId>maven-checkstyle-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<configLocation>${project.root.dir}/.linters/java/nebula_java_style_checks.xml</configLocation>
<configLocation>
${project.root.dir}/.linters/java/nebula_java_style_checks.xml
</configLocation>
<encoding>UTF-8</encoding>
<failOnViolation>true</failOnViolation>
<linkXRef>false</linkXRef>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
<includeTestSourceDirectory>true
</includeTestSourceDirectory>
<maxAllowedViolations>0</maxAllowedViolations>
<violationSeverity>warning</violationSeverity>
</configuration>
@@ -142,6 +146,15 @@
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.10</version>
<configuration>
<!-- put nebula_native_client.so in java.library.path env before run test-->
<argLine>-Djava.library.path=_build/</argLine>
</configuration>
</plugin>
</plugins>
</build>
</project>
src/tools/native-client/src/main/java/com/vesoft/client/NativeClient.java
@@ -12,16 +12,14 @@
import java.util.Map;
import java.util.Objects;

import org.rocksdb.EnvOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstFileWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NativeClient implements AutoCloseable {
public class NativeClient {
private static final Logger LOGGER = LoggerFactory.getLogger(NativeClient.class.getName());

static {
NativeClientResourceLoader.resourceLoader();
System.loadLibrary("nebula_native_client");
}

public static class Pair {
@@ -47,8 +45,6 @@ public String toString() {
}
}

private static final Logger LOGGER = LoggerFactory.getLogger(NativeClient.class.getName());

private static final int PARTITION_ID = 4;
private static final int VERTEX_ID = 8;
private static final int TAG_ID = 4;
@@ -59,80 +55,6 @@ public String toString() {
private static final int VERTEX_SIZE = PARTITION_ID + VERTEX_ID + TAG_ID + TAG_VERSION;
private static final int EDGE_SIZE = PARTITION_ID + VERTEX_ID + EDGE_TYPE + EDGE_RANKING + VERTEX_ID + EDGE_VERSION;

private SstFileWriter writer;

public NativeClient(String path) {
EnvOptions env = new EnvOptions();
Options options = new Options();
options.setCreateIfMissing(true)
.setCreateMissingColumnFamilies(true);
new NativeClient(path, env, options);
}

public NativeClient(String path, EnvOptions env, Options options) {
if (path == null || path.trim().length() == 0) {
throw new IllegalArgumentException("File Path should not be null and empty");
}
writer = new SstFileWriter(env, options);
try {
writer.open(path);
} catch (RocksDBException e) {
LOGGER.error("SstFileWriter Open Failed {}", e.getMessage());
}
}

public boolean addVertex(String key, Object[] values) {
if (checkKey(key) || checkValues(values)) {
throw new IllegalArgumentException("Add Vertex key and value should not null");
}

byte[] value = encode(values);
try {
writer.put(key.getBytes(), value);
return true;
} catch (RocksDBException e) {
LOGGER.error("AddVertex Failed {}", e.getMessage());
return false;
}
}

public boolean addEdge(String key, Object[] values) {
if (checkKey(key) || checkValues(values)) {
throw new IllegalArgumentException("Add Vertex key and value should not null");
}

byte[] value = encode(values);
try {
writer.put(key.getBytes(), value);
return true;
} catch (RocksDBException e) {
LOGGER.error("AddEdge Failed {}", e.getMessage());
return false;
}
}

public boolean deleteVertex(String key) {
return delete(key);
}

public boolean deleteEdge(String key) {
return delete(key);
}

private boolean delete(String key) {
if (checkKey(key)) {
throw new IllegalArgumentException("Add Vertex key and value should not null");
}

try {
writer.delete(key.getBytes());
return true;
} catch (RocksDBException e) {
LOGGER.error("Delete Failed {}", e.getMessage());
return false;
}
}

public static byte[] createEdgeKey(int partitionId, long srcId, int edgeType,
long edgeRank, long dstId, long edgeVersion) {
ByteBuffer buffer = ByteBuffer.allocate(EDGE_SIZE);
@@ -157,17 +79,9 @@ public static byte[] createVertexKey(int partitionId, long vertexId,
return buffer.array();
}

private static native byte[] encode(Object[] values);
public static native byte[] encode(Object[] values);

public static byte[] encoded(Object[] values) {
return encode(values);
}

private static native Map<String, byte[]> decode(byte[] encoded, Pair[] fields);

public static Map<String, byte[]> decoded(byte[] encoded, Pair[] fields) {
return decode(encoded, fields);
}
public static native Map<String, byte[]> decode(byte[] encoded, Pair[] fields);

private boolean checkKey(String key) {
return Objects.isNull(key) || key.length() == 0;
@@ -177,11 +91,4 @@ private boolean checkValues(Object[] values) {
return Objects.isNull(values) || values.length == 0
|| Arrays.asList(values).contains(null);
}

@Override
public void close() throws Exception {
writer.finish();
writer.close();
}

}
@@ -6,8 +6,6 @@

package com.vesoft.client;

import static com.vesoft.client.NativeClient.decoded;
import static com.vesoft.client.NativeClient.encoded;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

@@ -35,7 +33,7 @@ public void testDecoded() {
0.618,
"Hello".getBytes()
};
byte[] result = encoded(values);
byte[] result = NativeClient.encode(values);

NativeClient.Pair[] pairs = new NativeClient.Pair[]{
new NativeClient.Pair("b_field", Boolean.class.getName()),
@@ -45,7 +43,7 @@ public void testDecoded() {
new NativeClient.Pair("s_field", byte[].class.getName())
};

Map<String, byte[]> decodedResult = decoded(result, pairs);
Map<String, byte[]> decodedResult = NativeClient.decode(result, pairs);

byte byteValue = decodedResult.get("b_field")[0];
boolean boolValue = (byteValue == 0x00) ? false : true;
75 changes: 75 additions & 0 deletions src/tools/spark-sstfile-generator/README.md
@@ -0,0 +1,75 @@
Generate sst files from hive table data sources, guided by a mapping file which maps hive tables to vertices and edges.
Multiple vertices or edges may map to a single hive table, in which case a partition column is used to distinguish the
different vertex or edge types.
The hive tables may be regenerated periodically by an upstream system to reflect the latest data, and may be
partitioned by a time column indicating when the data were generated.
The *$HADOOP_HOME* env variable needs to be set to run this job.

# Environment
component|version
---|---
os|centos6.5 final(kernel 2.6.32-431.el6.x86_64)
spark|1.6.2
hadoop|2.7.4
jdk|1.8+
scala|2.10.5
sbt|1.2.8


# Spark-submit command line reference
This is what we used in a production environment:
```bash
${SPARK_HOME}/bin/spark-submit --master yarn --queue fmprod --conf spark.executor.instances=24 --conf spark.executor.memory=90g --conf spark.executor.cores=2 --conf spark.executorEnv.LD_LIBRARY_PATH='/soft/server/nebula_native_client:/usr/local/lib:/usr/local/lib64' --conf spark.driver.extraJavaOptions='-Djava.library.path=/soft/server/nebula_native_client/:/usr/local/lib64:/usr/local/lib' --class com.vesoft.tools.SparkSstFileGenerator --files mapping.json nebula-spark-sstfile-generator.jar -di "2019-05-13" -mi mapping.json -pi dt -so file://home/hdp/nebula_output
```
The application options are described as follows.

# Spark application command line reference
We keep a naming convention for the options: those suffixed with _i_ are INPUT-type options, while those suffixed with _o_ are OUTPUT-type options.

```bash
usage: nebula spark sst file generator
-ci,--default_column_mapping_policy <arg> If omitted, what policy to use when mapping column to property,all columns except primary_key's column will be mapped to tag's property with the same name by default
-di,--latest_date_input <arg> Latest date to query,date format YYYY-MM-dd
-hi,--string_value_charset_input <arg> When the value is of type String,what charset is used when encoded,default to UTF-8
-ho,--hdfs_sst_file_output <arg> Which hdfs directory will those sstfiles be put, should not starts with file:///
-li,--limit_input <arg> Return at most this number of edges/vertex, usually used in POC stage, when omitted, fetch all data.
-mi,--mapping_file_input <arg> Hive tables to nebula graph schema mapping file
-pi,--date_partition_input <arg> A partition field of type String of hive table, which represent a Date, and has format of YYY-MM-dd
-ri,--repartition_number_input <arg> Repartition number. Some optimization trick to improve generation speed and data skewness. Need tuning to suit your data.
-so,--local_sst_file_output <arg> Which local directory those generated sst files will be put, should starts with file:///
-ti,--datasource_type_input <arg> Data source types supported, must be among [hive|hbase|csv] for now, default=hive
```
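
Per the commit notes above, the generated sst files land under the local `-so` directory and are then copied into the HDFS directory given by `-ho`, with the destination created first. Below is a minimal sketch of that copy step using the Hadoop FileSystem API (an equivalent of `hdfs dfs -mkdir` plus `hdfs dfs -copyFromLocal`); the object name, helper name, and paths are illustrative assumptions, not the job's actual code:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object SstUploadSketch {
  // Hypothetical helper: copy locally generated sst files into HDFS,
  // creating the destination directory before the copy.
  def copySstToHdfs(localDir: String, hdfsDir: String): Unit = {
    val conf = new Configuration() // assumes Hadoop config from $HADOOP_HOME is on the classpath
    val fs = FileSystem.get(conf)

    val dst = new Path(hdfsDir)
    if (!fs.exists(dst)) {
      fs.mkdirs(dst) // create the destination dir first, like `hdfs dfs -mkdir`
    }
    // delSrc = false, overwrite = true; like `hdfs dfs -copyFromLocal`
    fs.copyFromLocalFile(false, true, new Path(localDir), dst)
  }

  def main(args: Array[String]): Unit =
    copySstToHdfs("file:///tmp/nebula_output", "/user/nebula/sst_output")
}
```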

# Mapping file schema

The mapping file is in json format. Its schema is provided as [mapping-schema.json](mapping-schema.json) following the [JSON Schema standard](http://json-schema.org). We provide an example mapping file: [mapping.json](mapping.json).

# FAQ
## How to use libnebula_native_client.so under CentOS 6.5 (2.6.32-431 x86-64)

1. Don't use the officially distributed librocksdbjni-linux64.so; build it natively on CentOS 6.5.

```bash
DEBUG_LEVEL=0 make shared_lib
DEBUG_LEVEL=0 make rocksdbjava
```
_Make sure to keep DEBUG_LEVEL consistent when building, or there will be link errors like `symbol not found`._
2. Run `sbt assembly` to package this project into a spark job jar, named `nebula-spark-sstfile-generator.jar` by default.
3. Run `jar uvf nebula-spark-sstfile-generator.jar librocksdbjni-linux64.so libnebula_native_client.so` to replace the `*.so` files packaged inside the dependency org.rocksdb:rocksdbjni:5.17.2, or an error like the following will occur on spark-submit:

```
*** glibc detected *** /soft/java/bin/java: free(): invalid pointer: 0x00007f7985b9f0a0 ***
======= Backtrace: =========
/lib64/libc.so.6(+0x75f4e)[0x7f7c7d5e6f4e]
/lib64/libc.so.6(+0x78c5d)[0x7f7c7d5e9c5d]
/tmp/librocksdbjni3419235685305324910.so(_ZN7rocksdb10EnvOptionsC1Ev+0x578)[0x7f79431ff908]
/tmp/librocksdbjni3419235685305324910.so(Java_org_rocksdb_EnvOptions_newEnvOptions+0x1c)[0x7f7943044dbc]
[0x7f7c689c1747]
```

# TODO
1. Add a database_name property at the graphspace level and the tag/edge level; the latter overrides the former when both are provided.
2. Schema column definition order is important; keep it when parsing the mapping file and when encoding.
3. Integrate the build with maven or cmake, so that this spark assembly is built after the nebula native client.
4. Handle the situation where different tables share a common Tag, e.g. a tag with properties (start_time, end_time).
