From 963ac9b0db8d87b6c1e7c90c0deda411d99bc1b9 Mon Sep 17 00:00:00 2001 From: Sophie-Xie <84560950+Sophie-Xie@users.noreply.github.com> Date: Wed, 13 Oct 2021 17:36:45 +0800 Subject: [PATCH] Cherry pick from master to v2.6.0 (#3052) Cherry-pick #3033 #3037 #3032 #3025 #3042 --- codecov.yml | 14 +++ conf/nebula-graphd.conf.default | 4 + conf/nebula-graphd.conf.production | 4 + src/storage/StorageServer.cpp | 7 +- .../ChainAddEdgesProcessorLocal.cpp | 112 +++++++++++++----- tests/conftest.py | 24 ++-- tests/nebula-test-run.py | 7 +- tests/tck/conftest.py | 58 ++++----- .../VerifyClientVersion.feature | 1 - 9 files changed, 153 insertions(+), 78 deletions(-) create mode 100644 codecov.yml diff --git a/codecov.yml b/codecov.yml new file mode 100644 index 00000000000..dd59a0ba054 --- /dev/null +++ b/codecov.yml @@ -0,0 +1,14 @@ +# Copyright (c) 2021 vesoft inc. All rights reserved. +# +# This source code is licensed under Apache 2.0 License, +# attached with Common Clause Condition 1.0, found in the LICENSES directory. + +# For more configuration details: +# https://docs.codecov.io/docs/codecov-yaml + +# validate the configuration: +# curl -X POST --data-binary @codecov.yml https://codecov.io/validate + +codecov: + allow_pseudo_compare: True + allow_coverage_offsets: True diff --git a/conf/nebula-graphd.conf.default b/conf/nebula-graphd.conf.default index 9951cf37896..fa406f8008a 100644 --- a/conf/nebula-graphd.conf.default +++ b/conf/nebula-graphd.conf.default @@ -81,3 +81,7 @@ ########## memory ########## # System memory high watermark ratio --system_memory_high_watermark_ratio=0.8 + +########## experimental feature ########## +# if use experimental features +--enable_experimental_feature=false diff --git a/conf/nebula-graphd.conf.production b/conf/nebula-graphd.conf.production index b3eebaf8dbf..26bc28827cf 100644 --- a/conf/nebula-graphd.conf.production +++ b/conf/nebula-graphd.conf.production @@ -79,3 +79,7 @@ ########## memory ########## # System memory high watermark ratio --system_memory_high_watermark_ratio=0.8 + +########## experimental feature ########## +# if use experimental features +--enable_experimental_feature=false diff --git a/src/storage/StorageServer.cpp b/src/storage/StorageServer.cpp index 816b4e46b11..b415f9963b4 100644 --- a/src/storage/StorageServer.cpp +++ b/src/storage/StorageServer.cpp @@ -336,6 +336,9 @@ void StorageServer::stop() { webSvc_.reset(); + if (txnMan_) { + txnMan_->stop(); + } if (taskMgr_) { taskMgr_->shutdown(); } @@ -348,10 +351,6 @@ void StorageServer::stop() { if (adminServer_) { adminServer_->stop(); } - if (txnMan_) { - txnMan_->stop(); - txnMan_.reset(); - } if (internalStorageServer_) { internalStorageServer_->stop(); } diff --git a/src/storage/transaction/ChainAddEdgesProcessorLocal.cpp b/src/storage/transaction/ChainAddEdgesProcessorLocal.cpp index 70317f31235..8ac4d12994a 100644 --- a/src/storage/transaction/ChainAddEdgesProcessorLocal.cpp +++ b/src/storage/transaction/ChainAddEdgesProcessorLocal.cpp @@ -136,7 +136,7 @@ bool ChainAddEdgesProcessorLocal::prepareRequest(const cpp2::AddEdgesRequest& re spaceVidType_ = vidType.value(); } localPartId_ = req.get_parts().begin()->first; - // replaceNullWithDefaultValue(req_); + replaceNullWithDefaultValue(req_); auto part = env_->kvstore_->part(spaceId_, localPartId_); if (!nebula::ok(part)) { pushResultCode(nebula::error(part), localPartId_); @@ -425,44 +425,96 @@ std::string ChainAddEdgesProcessorLocal::makeReadableEdge(const cpp2::AddEdgesRe * in/out edge, but they will calculate independent * which lead to inconsistance * - * that why we need to replace the inconsistance prone value + * that's why we need to replace the inconsistance prone value * at the monment the request comes * */ void ChainAddEdgesProcessorLocal::replaceNullWithDefaultValue(cpp2::AddEdgesRequest& req) { + auto& edgesOfPart = *req.parts_ref(); + if (edgesOfPart.empty()) { + return; + } + auto& edgesOfFirstPart = edgesOfPart.begin()->second; + if (edgesOfFirstPart.empty()) { + return; + } + auto firstEdgeKey = edgesOfFirstPart.front().get_key(); + auto edgeType = std::abs(*firstEdgeKey.edge_type_ref()); + auto schema = env_->schemaMan_->getEdgeSchema(spaceId_, edgeType); + DefaultValueContext expCtx; + // the coming request has two forms + // 1st "propNames" is empty, + // which means all vals should be write as the same sequence of schema + // 2nd "propNames" is not empty + // vals of request should be write according to propName of schema + // use the following "idxVec" to identify which index a val should be write to. + std::vector idxVec; auto& propNames = *req.prop_names_ref(); + if (propNames.empty()) { + for (auto i = 0U; i < schema->getNumFields(); ++i) { + idxVec.emplace_back(i); + } + } else { + // first scan the origin input propNames + for (auto& name : propNames) { + int64_t index = schema->getFieldIndex(name); + idxVec.emplace_back(index); + } + // second, check if there any cols not filled but has default val + // we need to append these cols + for (auto i = 0U; i < schema->getNumFields(); ++i) { + auto it = std::find(idxVec.begin(), idxVec.end(), i); + if (it == idxVec.end()) { + auto field = schema->field(i); + if (field->hasDefault()) { + idxVec.emplace_back(i); + } + } + } + } + for (auto& part : *req.parts_ref()) { for (auto& edge : part.second) { - auto edgeKey = edge.get_key(); - auto edgeType = std::abs(*edgeKey.edge_type_ref()); - auto schema = env_->schemaMan_->getEdgeSchema(spaceId_, edgeType); auto& vals = *edge.props_ref(); - for (auto i = 0U; i < schema->getNumFields(); ++i) { - std::string fieldName(schema->getFieldName(i)); - auto it = std::find(propNames.begin(), propNames.end(), fieldName); - if (it == propNames.end()) { - auto field = schema->field(i); - if (field->hasDefault()) { - auto expr = field->defaultValue()->clone(); - propNames.emplace_back(fieldName); - auto defVal = Expression::eval(expr, expCtx); - switch (defVal.type()) { - case Value::Type::DATE: - vals.emplace_back(defVal.getDate()); - break; - case Value::Type::TIME: - vals.emplace_back(defVal.getTime()); - break; - case Value::Type::DATETIME: - vals.emplace_back(defVal.getDateTime()); - break; - default: - // for other type, local and remote should behavior same. - break; - } - } else { - // it's ok if this field doesn't have a default value + if (vals.size() > idxVec.size()) { + LOG(ERROR) << folly::sformat( + "error vals.size()={} > idxVec.size()={}", vals.size(), idxVec.size()); + continue; + } + for (auto i = vals.size(); i < idxVec.size(); ++i) { + auto field = schema->field(idxVec[i]); + if (field->hasDefault()) { + auto expr = field->defaultValue()->clone(); + auto defVal = Expression::eval(expr, expCtx); + switch (defVal.type()) { + case Value::Type::BOOL: + vals.emplace_back(defVal.getBool()); + break; + case Value::Type::INT: + vals.emplace_back(defVal.getInt()); + break; + case Value::Type::FLOAT: + vals.emplace_back(defVal.getFloat()); + break; + case Value::Type::STRING: + vals.emplace_back(defVal.getStr()); + break; + case Value::Type::DATE: + vals.emplace_back(defVal.getDate()); + break; + case Value::Type::TIME: + vals.emplace_back(defVal.getTime()); + break; + case Value::Type::DATETIME: + vals.emplace_back(defVal.getDateTime()); + break; + default: + // for other type, local and remote should behavior same. + break; } + } else { + // set null + vals.emplace_back(Value::kNullValue); } } } diff --git a/tests/conftest.py b/tests/conftest.py index c4f320ea462..fa7d1770d3b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -14,9 +14,9 @@ from tests.common.utils import get_conn_pool from tests.common.constants import NB_TMP_PATH, SPACE_TMP_PATH -#from thrift.transport import TSocket -#from thrift.transport import TTransport -#from thrift.protocol import TBinaryProtocol +from nebula2.fbthrift.transport import TSocket +from nebula2.fbthrift.transport import TTransport +from nebula2.fbthrift.protocol import TBinaryProtocol from nebula2.gclient.net import Connection from nebula2.graph import GraphService @@ -193,12 +193,12 @@ def workarround_for_class(request, pytestconfig, conn_pool, request.cls.cleanup() request.cls.drop_data() -#@pytest.fixture(scope="class") -#def establish_a_rare_connection(pytestconfig): -# addr = pytestconfig.getoption("address") -# host_addr = addr.split(":") if addr else ["localhost", get_ports()[0]] -# socket = TSocket.TSocket(host_addr[0], host_addr[1]) -# transport = TTransport.TBufferedTransport(socket) -# protocol = TBinaryProtocol.TBinaryProtocol(transport) -# transport.open() -# return GraphService.Client(protocol) +@pytest.fixture(scope="class") +def establish_a_rare_connection(pytestconfig): + addr = pytestconfig.getoption("address") + host_addr = addr.split(":") if addr else ["localhost", get_ports()[0]] + socket = TSocket.TSocket(host_addr[0], host_addr[1]) + transport = TTransport.TBufferedTransport(socket) + protocol = TBinaryProtocol.TBinaryProtocol(transport) + transport.open() + return GraphService.Client(protocol) diff --git a/tests/nebula-test-run.py b/tests/nebula-test-run.py index 2f1fbc371fe..c012cbca2fe 100755 --- a/tests/nebula-test-run.py +++ b/tests/nebula-test-run.py @@ -108,8 +108,11 @@ def start_nebula(nb, configs): with open(SPACE_TMP_PATH, "w") as f: spaces = [] - for space in ("nba", "nba_int_vid", "student"): - data_dir = os.path.join(CURR_PATH, "data", space) + folder = os.path.join(CURR_PATH, "data") + for space in os.listdir(folder): + if not os.path.exists(os.path.join(folder, space, "config.yaml")): + continue + data_dir = os.path.join(folder, space) space_desc = load_csv_data(sess, data_dir, space) spaces.append(space_desc.__dict__) f.write(json.dumps(spaces)) diff --git a/tests/tck/conftest.py b/tests/tck/conftest.py index fc2d8f67685..eff2f8fe17a 100644 --- a/tests/tck/conftest.py +++ b/tests/tck/conftest.py @@ -32,8 +32,8 @@ from tests.tck.utils.table import dataset, table from tests.tck.utils.nbv import murmurhash2 -#from nebula2.graph.ttypes import VerifyClientVersionReq -#from nebula2.graph.ttypes import VerifyClientVersionResp +from nebula2.graph.ttypes import VerifyClientVersionReq +from nebula2.graph.ttypes import VerifyClientVersionResp parse = functools.partial(parsers.parse) rparse = functools.partial(parsers.re) @@ -536,30 +536,30 @@ def executing_query_with_params(query, indices, keys, graph_spaces, session, req ngql = combine_query(query).format(*vals) exec_query(request, ngql, session, graph_spaces) -#@given(parse("nothing")) -#def nothing(): -# pass -# -#@when(parse("connecting the servers with a compatible client version")) -#def connecting_servers_with_a_compatible_client_version(establish_a_rare_connection, graph_spaces): -# conn = establish_a_rare_connection -# graph_spaces["resp"] = conn.verifyClientVersion(VerifyClientVersionReq()) -# conn._iprot.trans.close() -# -#@then(parse("the connection should be established")) -#def check_client_compatible(graph_spaces): -# resp = graph_spaces["resp"] -# assert resp.error_code == ErrorCode.SUCCEEDED, f'The client was rejected by server: {resp}' -# -#@when(parse("connecting the servers with a client version of {version}")) -#def connecting_servers_with_a_compatible_client_version(version, establish_a_rare_connection, graph_spaces): -# conn = establish_a_rare_connection -# req = VerifyClientVersionReq() -# req.version = version -# graph_spaces["resp"] = conn.verifyClientVersion(req) -# conn._iprot.trans.close() -# -#@then(parse("the connection should be rejected")) -#def check_client_compatible(graph_spaces): -# resp = graph_spaces["resp"] -# assert resp.error_code == ErrorCode.E_CLIENT_SERVER_INCOMPATIBLE, f'The client was not rejected by server: {resp}' +@given(parse("nothing")) +def nothing(): + pass + +@when(parse("connecting the servers with a compatible client version")) +def connecting_servers_with_a_compatible_client_version(establish_a_rare_connection, graph_spaces): + conn = establish_a_rare_connection + graph_spaces["resp"] = conn.verifyClientVersion(VerifyClientVersionReq()) + conn._iprot.trans.close() + +@then(parse("the connection should be established")) +def check_client_compatible(graph_spaces): + resp = graph_spaces["resp"] + assert resp.error_code == ErrorCode.SUCCEEDED, f'The client was rejected by server: {resp}' + +@when(parse("connecting the servers with a client version of {version}")) +def connecting_servers_with_a_compatible_client_version(version, establish_a_rare_connection, graph_spaces): + conn = establish_a_rare_connection + req = VerifyClientVersionReq() + req.version = version + graph_spaces["resp"] = conn.verifyClientVersion(req) + conn._iprot.trans.close() + +@then(parse("the connection should be rejected")) +def check_client_compatible(graph_spaces): + resp = graph_spaces["resp"] + assert resp.error_code == ErrorCode.E_CLIENT_SERVER_INCOMPATIBLE, f'The client was not rejected by server: {resp}' diff --git a/tests/tck/features/verify_client_version/VerifyClientVersion.feature b/tests/tck/features/verify_client_version/VerifyClientVersion.feature index 777c11b9efd..0eb84a15825 100644 --- a/tests/tck/features/verify_client_version/VerifyClientVersion.feature +++ b/tests/tck/features/verify_client_version/VerifyClientVersion.feature @@ -2,7 +2,6 @@ # # This source code is licensed under Apache 2.0 License, # attached with Common Clause Condition 1.0, found in the LICENSES directory. -@skip Feature: Verify client version Scenario: compatible version