Skip to content

Commit

Permalink
[GIE Compiler] fix bugs of columnId in schema
Browse files Browse the repository at this point in the history
refactor(flex): Replace the Adhoc csv reader with Arrow CSV reader (alibaba#3154)

1. Use Arrow CSV Reader to replace current adhoc csv reader, to support
more configurable options in `bulk_load.yaml`.
2. Introduce `CSVFragmentLoader`, `BasicFragmentLoader` for
`MutablePropertyFragment`.

With this PR merged, `MutablePropertyFragment` will support loading
fragment from csv with options:
- delimeter: default '|'
- header_row: default true
- quoting: default false
- quoting_char: default '"'
- escaping: default false
- escaping_char: default'\\'
- batch_size: the batch size of when reading file into memory, default
1MB.
- batch_reader: default false. If set to true,
`arrow::csv::StreamingReader` will be used to parse the input file.
Otherwise, `arrow::TableReader` will be used.

With this PR merged, the performance of graph loading will be improved.
The Adhoc Reader denote the current implemented csv parser, 1,2,4,8
denotes the parallelism of graph loading, i.e. how many labels of
vertex/edge are concurrently processed.

Note that TableReader is around 10x faster than StreamingReader. The
possible reason could be the multi-threading is used.
See [arrow-csv-doc](https://arrow.apache.org/docs/cpp/csv.html) for
details.

| Reader | Phase | 1 | 2 | 4 | 8 |
| --------- | -------------- | ---------- |---------- |----------
|---------- |
| Adhoc Reader | ReadFile\+LoadGraph |805s|	468s|	349s|	313s|
| Adhoc Reader | Serialization | 126s|	126s|	126s|	126s|
| Adhoc Reader  | **Total** |931s|	594s|	475s|	439s|
| Table Reader |  ReadFile | 9s	|9s	|9s|	9s|
| Table Reader | LoadGraph |455s|	280s|	211s|	182s|
| Table Reader |Serialization |126s|	126s|	126s|	126s|
| Table Reader | **Total** | 600s|	415s|	346s|	317s|
| Streaming Reader | ReadFile |91s|	91s|	91s|	91s|
| Streaming Reader | LoadGraph | 555s|	289s|	196s|	149s|
| Streaming Reader | Serialization |126s|	126s|	126s|	126s|
| Streaming Reader | **Total** | 772s|	506s|	413s|	366s|

| Reader | Phase | 1 | 2 | 4 | 8 |
| --------- | -------------- | ---------- |---------- |----------
|---------- |
| Adhoc Reader | ReadFile\+LoadGraph |2720s|	1548s|	1176s|	948s|
| Adhoc Reader | Serialization | 409s|	409s|	409s|	409s|
| Adhoc Reader  | **Total** | 3129s|	1957s|	1585s|	1357s|
| Table Reader |  ReadFile |24s|	24s|	24s|	24s|
| Table Reader | LoadGraph |1576s|	949s|	728s|	602s|
| Table Reader |Serialization |409s|	409s|	409s|	409s|
| Table Reader | **Total** | 2009s|	1382s|	1161s|	1035s|
| Streaming Reader | ReadFile |300s|	300s|	300s|	300s|
| Streaming Reader | LoadGraph | 1740s|	965s|	669s|	497s|
| Streaming Reader | Serialization | 409s|	409s|	409s|	409s|
| Streaming Reader | **Total** | 2539s|	1674s|	1378s|	1206s|
| Reader | Phase | 1 | 2 | 4 | 8 |
| --------- | -------------- | ---------- |---------- |----------
|---------- |
| Adhoc Reader | ReadFile\+LoadGraph | 8260s|	4900s	|3603s	|2999s|
| Adhoc Reader | Serialization | 1201s |	1201s|	1201s	|1201s|
| Adhoc Reader  | **Total** | 9461s|	6101s | 4804s	|4200s|
| Table Reader |  ReadFile | 73s	|73s|	96s|	96s|
| Table Reader | LoadGraph |4650s|	2768s|	2155s	|1778s|
| Table Reader |Serialization | 1201s |	1201s|	1201s	|1201s|
| Table Reader | **Total** | 5924s|	4042s|	3452s|	3075s|
| Streaming Reader | ReadFile | 889s |889s | 889s| 889s|
| Streaming Reader | LoadGraph | 5589s|	3005s|	2200s|	1712s|
| Streaming Reader | Serialization | 1201s| 1201s| 1201s |1201s |
| Streaming Reader | **Total** | 7679s	| 5095s |4290s| 	3802s|

FIx alibaba#3116

minor fix and move modern graph

fix grin test

todo: do_start

fix

fix

stash

fix

fix

make rules unique

dockerfile stash

minor change

remove plugin-dir

fix

minor fix

debug

debug

fix

fix
  • Loading branch information
shirly121 authored and zhanglei1949 committed Sep 12, 2023
1 parent a805363 commit cff87da
Show file tree
Hide file tree
Showing 41 changed files with 2,375 additions and 1,214 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/flex.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ jobs:
mkdir build && cd build
cmake .. && sudo make -j$(nproc)
export FLEX_DATA_DIR=../../../../storages/rt_mutable_graph/modern_graph/
./run_grin_test
./run_grin_test flex://../../../../interactive/examples/modern_graph/ \
../../../../interactive/examples/modern_graph/modern_graph.yaml \
../../../../interactive/examples/modern_graph/bulk_load.yaml
- name: Test Graph Loading on modern graph
env:
Expand Down
6 changes: 3 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ core.*

# Flex related
flex/docs/
flex/interactive/data/*/indices/
flex/interactive/data/*/plugins/
flex/interactive/data/*
flex/interactive/logs/*
flex/interactive/examples/sf0.1-raw/
flex/interactive/.running
flex/interactive/.running
flex/interactive/.env
61 changes: 54 additions & 7 deletions flex/bin/load_plan_and_gen.sh
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,11 @@ fi
#fi

cypher_to_plan() {
if [ $# -ne 7 ]; then
if [ $# -ne 9 ]; then
echo "Usage: $0 <query_name> <input_file> <output_plan file> <output_yaml_file>"
echo " <ir_compiler_properties> <graph_schema_path> <gie_home>, but receive: "$#
echo " <ir_compiler_properties> <graph_schema_path> <gie_home>"
echo " <procedure_name> <procedure_description>"
echo " but receive: "$#
exit 1
fi
query_name=$1
Expand Down Expand Up @@ -150,26 +152,43 @@ cypher_to_plan() {

compile_hqps_so() {
#check input params size eq 2 or 3
if [ $# -ne 5 ] && [ $# -ne 6 ]; then
echo "Usage: $0 <input_file> <work_dir> <ir_compiler_properties_file> <graph_schema_file> <GIE_HOME> [output_dir]"
if [ $# -gt 8 ] || [ $# -lt 5 ]; then
echo "Usage: $0 <input_file> <work_dir> <ir_compiler_properties_file>"
echo " <graph_schema_file> <GIE_HOME> "
echo " [output_dir] [stored_procedure_name] [stored_procedure_description]"
exit 1
fi
input_path=$1
work_dir=$2
ir_compiler_properties=$3
graph_schema_path=$4
gie_home=$5
if [ $# -eq 6 ]; then
if [ $# -ge 6 ]; then
output_dir=$6
else
output_dir=${work_dir}
fi

if [ $# -ge 7 ]; then
procedure_name=$7
else
procedure_name=""
fi

if [ $# -ge 8 ]; then
procedure_description=$8
else
procedure_description=""
fi

echo "Input path = ${input_path}"
echo "Work dir = ${work_dir}"
echo "ir compiler properties = ${ir_compiler_properties}"
echo "graph schema path = ${graph_schema_path}"
echo "GIE_HOME = ${gie_home}"
echo "Output dir = ${output_dir}"
echo "Procedure name = ${procedure_name}"
echo "Procedure description = ${procedure_description}"

last_file_name=$(basename ${input_path})

Expand All @@ -188,9 +207,18 @@ compile_hqps_so() {
echo "Expect a .pb or .cc file"
exit 1
fi
# if procedure_name is not set, use query_name
if [ -z ${procedure_name} ]; then
procedure_name=${query_name}
fi
# if procedure_description is not set, use query_name
if [ -z ${procedure_description} ]; then
procedure_description="\"Stored procedure for ${query_name}\""
fi
cur_dir=${work_dir}
mkdir -p ${cur_dir}
output_cc_path="${cur_dir}/${query_name}.cc"
dst_yaml_path="${output_dir}/${query_name}.yaml"
if [[ $(uname) == "Linux" ]]; then
output_so_path="${cur_dir}/lib${query_name}.so"
dst_so_path="${output_dir}/lib${query_name}.so"
Expand All @@ -213,7 +241,10 @@ compile_hqps_so() {
# first do .cypher to .pb
output_pb_path="${cur_dir}/${query_name}.pb"
output_yaml_path="${cur_dir}/${query_name}.yaml"
cypher_to_plan ${query_name} ${input_path} ${output_pb_path} ${output_yaml_path} ${ir_compiler_properties} ${graph_schema_path} ${gie_home}
cypher_to_plan ${query_name} ${input_path} ${output_pb_path} \
${output_yaml_path} ${ir_compiler_properties} ${graph_schema_path} ${gie_home} \
${procedure_name} '${procedure_description}'

echo "----------------------------"
echo "Codegen from cypher query done."
echo "----------------------------"
Expand Down Expand Up @@ -294,6 +325,12 @@ compile_hqps_so() {
echo "Copy failed, ${dst_so_path} not exists."
exit 1
fi
# copy the generated yaml
cp ${output_yaml_path} ${output_dir}
if [ ! -f ${dst_yaml_path} ]; then
echo "Copy failed, ${dst_yaml_path} not exists."
exit 1
fi
echo "Finish copying, output to ${dst_so_path}"
}

Expand Down Expand Up @@ -461,6 +498,14 @@ run() {
OUTPUT_DIR="${i#*=}"
shift # past argument=value
;;
--procedure_name=*)
PROCEDURE_NAME="${i#*=}"
shift # past argument=value
;;
--procedure_desc=*)
PROCEDURE_DESCRIPTION="${i#*=}"
shift # past argument=value
;;
-* | --*)
echo "Unknown option $i"
exit 1
Expand All @@ -477,6 +522,8 @@ run() {
echo "graph_schema_path ="${GRAPH_SCHEMA_PATH}
echo "GIE_HOME ="${GIE_HOME}
echo "Output path ="${OUTPUT_DIR}
echo "Procedure name ="${PROCEDURE_NAME}
echo "Procedure description ="${PROCEDURE_DESCRIPTION}

# check input exist
if [ ! -f ${INPUT} ]; then
Expand All @@ -487,7 +534,7 @@ run() {
# if engine_type equals hqps
if [ ${ENGINE_TYPE} == "hqps" ]; then
echo "Engine type is hqps, generating dynamic library for hqps engine."
compile_hqps_so ${INPUT} ${WORK_DIR} ${IR_CONF} ${GRAPH_SCHEMA_PATH} ${GIE_HOME} ${OUTPUT_DIR}
compile_hqps_so ${INPUT} ${WORK_DIR} ${IR_CONF} ${GRAPH_SCHEMA_PATH} ${GIE_HOME} ${OUTPUT_DIR} ${PROCEDURE_NAME} \"${PROCEDURE_DESCRIPTION}\"

# else if engine_type equals pegasus
elif [ ${ENGINE_TYPE} == "pegasus" ]; then
Expand Down
66 changes: 16 additions & 50 deletions flex/bin/sync_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,11 @@ std::tuple<uint32_t, uint32_t> parse_from_server_config(
<< engine_type_str;
}
}
auto shard_num_node = engine_node["shared_num"];
auto shard_num_node = engine_node["shard_num"];
if (shard_num_node) {
shard_num = shard_num_node.as<uint32_t>();
} else {
LOG(INFO) << "shared_num not found, use default value "
LOG(INFO) << "shard_num not found, use default value "
<< DEFAULT_SHARD_NUM;
}
auto host_node = engine_node["hosts"];
Expand All @@ -151,23 +151,9 @@ std::tuple<uint32_t, uint32_t> parse_from_server_config(
}
}

void load_plugins(const bpo::variables_map& vm) {
if (vm.count("plugin-dir") == 0) {
LOG(INFO) << "plugin-dir is not specified";
return;
}
std::string plugin_dir = vm["plugin-dir"].as<std::string>();
if (!std::filesystem::exists(plugin_dir)) {
LOG(FATAL) << "plugin dir not exists: " << plugin_dir;
}
LOG(INFO) << "plugin dir: " << plugin_dir;
if (!plugin_dir.empty()) {
LOG(INFO) << "Load plugins from dir: " << plugin_dir;
server::StoredProcedureManager::get().LoadFromPluginDir(plugin_dir);
}
}

void init_codegen_proxy(const bpo::variables_map& vm) {
void init_codegen_proxy(const bpo::variables_map& vm,
const std::string& graph_schema_file,
const std::string& engine_config_file) {
std::string codegen_dir = parse_codegen_dir(vm);
std::string codegen_bin;
std::string gie_home;
Expand All @@ -181,25 +167,6 @@ void init_codegen_proxy(const bpo::variables_map& vm) {
LOG(FATAL) << "codegen bin not exists: " << codegen_bin;
}
}
std::string ir_compiler_properties;
std::string compiler_graph_schema;
if (vm.count("ir-compiler-prop") == 0) {
LOG(FATAL) << "ir-compiler-prop is not specified";
} else {
ir_compiler_properties = vm["ir-compiler-prop"].as<std::string>();
if (!std::filesystem::exists(ir_compiler_properties)) {
LOG(FATAL) << "ir-compiler-prop not exists: " << ir_compiler_properties;
}
}
if (vm.count("compiler-graph-schema") == 0) {
LOG(FATAL) << "compiler-graph-schema is not specified";
} else {
compiler_graph_schema = vm["compiler-graph-schema"].as<std::string>();
if (!std::filesystem::exists(compiler_graph_schema)) {
LOG(FATAL) << "compiler-graph-schema not exists: "
<< compiler_graph_schema;
}
}
if (vm.count("gie-home") == 0) {
LOG(FATAL) << "gie-home is not specified";
} else {
Expand All @@ -208,9 +175,8 @@ void init_codegen_proxy(const bpo::variables_map& vm) {
LOG(FATAL) << "gie-home not exists: " << gie_home;
}
}
server::CodegenProxy::get().Init(codegen_dir, codegen_bin,
ir_compiler_properties,
compiler_graph_schema, gie_home);
server::CodegenProxy::get().Init(codegen_dir, codegen_bin, graph_schema_file,
engine_config_file, gie_home);
}
} // namespace gs

Expand All @@ -226,12 +192,7 @@ int main(int argc, char** argv) {
"graph-config,g", bpo::value<std::string>(), "graph schema config file")(
"data-path,a", bpo::value<std::string>(), "data directory path")(
"bulk-load,l", bpo::value<std::string>(), "bulk-load config file")(
"plugin-dir,p", bpo::value<std::string>(), "plugin directory path")(
"gie-home,h", bpo::value<std::string>(), "path to gie home")(
"ir-compiler-prop,i", bpo::value<std::string>(),
"ir compiler property file")("compiler-graph-schema,z",
bpo::value<std::string>(),
"compiler graph schema file");
"gie-home,h", bpo::value<std::string>(), "path to gie home");

setenv("TZ", "Asia/Shanghai", 1);
tzset();
Expand All @@ -251,9 +212,10 @@ int main(int argc, char** argv) {
std::string data_path;
std::string bulk_load_config_path;
std::string plugin_dir;
std::string server_config_path;

if (vm.count("server-config") != 0) {
std::string server_config_path = vm["server-config"].as<std::string>();
server_config_path = vm["server-config"].as<std::string>();
// check file exists
if (!std::filesystem::exists(server_config_path)) {
LOG(ERROR) << "server-config not exists: " << server_config_path;
Expand Down Expand Up @@ -294,8 +256,12 @@ int main(int argc, char** argv) {
LOG(INFO) << "Finished loading graph, elapsed " << t0 << " s";

// loading plugin
gs::load_plugins(vm);
gs::init_codegen_proxy(vm);
if (!schema.GetPluginDir().empty() && !schema.GetPluginsList().empty()) {
server::StoredProcedureManager::get().LoadFromPluginDir(
schema.GetPluginDir(), schema.GetPluginsList());
}

gs::init_codegen_proxy(vm, graph_schema_path, server_config_path);

server::HQPSService::get().init(shard_num, http_port, false);
server::HQPSService::get().run_and_wait_for_exit();
Expand Down
7 changes: 4 additions & 3 deletions flex/engines/graph_db/grin/src/topology/structure.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ limitations under the License.
* flex://{path_to_yaml}
* @return A graph handle.
*/
GRIN_GRAPH grin_get_graph_from_storage(const char* uri) {
GRIN_GRAPH grin_get_graph_from_storage(const char* uri, const char* schema_file,
const char* bulk_load_file) {
std::string _uri(uri);
std::string::size_type pos = _uri.find("://");
if (pos == std::string::npos) {
Expand All @@ -35,9 +36,9 @@ GRIN_GRAPH grin_get_graph_from_storage(const char* uri) {
return GRIN_NULL_GRAPH;
}
_uri = _uri.substr(pos + 3);
std::string graph_schema_path = _uri + "/modern_graph.yaml";
std::string graph_schema_path = schema_file;
std::string data_path = uri;
std::string bulk_load_config_path = _uri + "/bulk_load.yaml";
std::string bulk_load_config_path = bulk_load_file;
if (!std::filesystem::exists(graph_schema_path) ||
!(std::filesystem::exists(bulk_load_config_path))) {
return GRIN_NULL_GRAPH;
Expand Down
Loading

0 comments on commit cff87da

Please sign in to comment.