Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/create variable map #4822

Merged
merged 9 commits into from
Nov 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/graph/context/ExecutionContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ void ExecutionContext::setResult(const std::string& name, Result&& result) {
}

void ExecutionContext::dropResult(const std::string& name) {
DCHECK_EQ(valueMap_.count(name), 1);
auto& val = valueMap_[name];
if (FLAGS_enable_async_gc) {
GC::instance().clear(std::move(val));
Expand Down
2 changes: 1 addition & 1 deletion src/graph/context/QueryContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ void QueryContext::init() {
}
}
idGen_ = std::make_unique<IdGenerator>(0);
symTable_ = std::make_unique<SymbolTable>(objPool_.get());
symTable_ = std::make_unique<SymbolTable>(objPool_.get(), ectx_.get());
vctx_ = std::make_unique<ValidateContext>(std::make_unique<AnonVarGenerator>(symTable_.get()));
}

Expand Down
4 changes: 3 additions & 1 deletion src/graph/context/QueryContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,10 @@ class QueryContext {
return killed_.load();
}

// This is only valid in building stage!
// TODO remove parameter from variables map
bool existParameter(const std::string& param) const {
return ectx_->exist(param) && (ectx_->getValue(param).type() != Value::Type::DATASET);
return !ectx_->getValue(param).empty(); // Really fill value for parameter
}

private:
Expand Down
12 changes: 5 additions & 7 deletions src/graph/context/Symbols.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,14 @@ std::string SymbolTable::toString() const {
return ss.str();
}

SymbolTable::SymbolTable(ObjectPool* objPool) {
DCHECK(objPool != nullptr);
objPool_ = objPool;
}

Variable* SymbolTable::newVariable(std::string name) {
Variable* SymbolTable::newVariable(const std::string& name) {
VLOG(1) << "New variable for: " << name;
DCHECK(vars_.find(name) == vars_.end());
auto* variable = objPool_->makeAndAdd<Variable>(name);
addVar(std::move(name), variable);
addVar(name, variable);
// Initialize all variable in variable map (ouput of node, inner variable etc.)
// Some variable will be useless after optimizer, maybe we could remove it.
ectx_->initVar(name);
return variable;
}

Expand Down
11 changes: 7 additions & 4 deletions src/graph/context/Symbols.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "common/base/ObjectPool.h"
#include "common/base/StatusOr.h"
#include "common/datatypes/Value.h"
#include "graph/context/ExecutionContext.h"

namespace nebula {
namespace graph {
Expand Down Expand Up @@ -52,11 +53,10 @@ struct Variable {

class SymbolTable final {
public:
explicit SymbolTable(ObjectPool* objPool);
explicit SymbolTable(ObjectPool* objPool, ExecutionContext* ectx)
: objPool_(DCHECK_NOTNULL(objPool)), ectx_(DCHECK_NOTNULL(ectx)) {}

Variable* newVariable(std::string name);

void addVar(std::string varName, Variable* variable);
Variable* newVariable(const std::string& name);

bool readBy(const std::string& varName, PlanNode* node);

Expand All @@ -79,7 +79,10 @@ class SymbolTable final {
std::string toString() const;

private:
void addVar(std::string varName, Variable* variable);

ObjectPool* objPool_{nullptr};
ExecutionContext* ectx_{nullptr};
// var name -> variable
std::unordered_map<std::string, Variable*> vars_;
// alias -> first variable that generate the alias
Expand Down
4 changes: 1 addition & 3 deletions src/graph/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -571,9 +571,7 @@ Executor::Executor(const std::string &name, const PlanNode *node, QueryContext *
// Initialize the position in ExecutionContext for each executor before
// execution plan starting to run. This will avoid lock something for thread
// safety in real execution
if (!ectx_->exist(node->outputVar())) {
ectx_->initVar(node->outputVar());
}
DCHECK(ectx_->exist(node->outputVar()));
}

Executor::~Executor() {}
Expand Down
17 changes: 13 additions & 4 deletions src/graph/executor/test/ProjectTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@

#include <gtest/gtest.h>

#include "common/expression/PropertyExpression.h"
#include "graph/context/QueryContext.h"
#include "graph/executor/query/ProjectExecutor.h"
#include "graph/executor/test/QueryTestBase.h"
#include "graph/planner/plan/Logic.h"
#include "graph/planner/plan/Query.h"
#include "parser/Clauses.h"

namespace nebula {
namespace graph {
Expand Down Expand Up @@ -51,7 +53,9 @@ class ProjectTest : public QueryTestBase {

TEST_F(ProjectTest, Project1Col) {
std::string input = "input_project";
auto yieldColumns = getYieldColumns("YIELD $input_project.vid AS vid", qctx_.get());
auto yieldColumns = qctx_->objPool()->makeAndAdd<YieldColumns>();
yieldColumns->addColumn(new YieldColumn(
VariablePropertyExpression::make(qctx_->objPool(), "input_project", "vid"), "vid"));

auto* project = Project::make(qctx_.get(), start_, yieldColumns);
project->setInputVar(input);
Expand All @@ -76,8 +80,11 @@ TEST_F(ProjectTest, Project1Col) {

TEST_F(ProjectTest, Project2Col) {
std::string input = "input_project";
auto yieldColumns =
getYieldColumns("YIELD $input_project.vid AS vid, $input_project.col2 AS num", qctx_.get());
auto yieldColumns = qctx_->objPool()->makeAndAdd<YieldColumns>();
yieldColumns->addColumn(new YieldColumn(
VariablePropertyExpression::make(qctx_->objPool(), "input_project", "vid"), "vid"));
yieldColumns->addColumn(new YieldColumn(
VariablePropertyExpression::make(qctx_->objPool(), "input_project", "col2"), "num"));
auto* project = Project::make(qctx_.get(), start_, yieldColumns);
project->setInputVar(input);
project->setColNames(std::vector<std::string>{"vid", "num"});
Expand All @@ -102,7 +109,9 @@ TEST_F(ProjectTest, Project2Col) {

TEST_F(ProjectTest, EmptyInput) {
std::string input = "empty";
auto yieldColumns = getYieldColumns("YIELD $input_project.vid AS vid", qctx_.get());
auto yieldColumns = qctx_->objPool()->makeAndAdd<YieldColumns>();
yieldColumns->addColumn(new YieldColumn(
VariablePropertyExpression::make(qctx_->objPool(), "input_project", "vid"), "vid"));
auto* project = Project::make(qctx_.get(), start_, std::move(yieldColumns));
project->setInputVar(input);
project->setColNames(std::vector<std::string>{"vid"});
Expand Down
171 changes: 90 additions & 81 deletions tests/tck/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,13 +265,16 @@ def import_csv_data(request, data, exec_ctx, pytestconfig):
exec_ctx["drop_space"] = True


def exec_query(request, ngql, exec_ctx, sess=None, need_try: bool = False):
def exec_query(request, ngql, exec_ctx, sess=None, need_try: bool = False, times: int = 1):
assert times > 0
if not ngql:
return
ngql = normalize_outline_scenario(request, ngql)
if sess is None:
sess = exec_ctx.get('current_session')
exec_ctx['result_set'] = response(sess, ngql, need_try)
exec_ctx['result_set'] = []
for _ in range(times):
exec_ctx['result_set'].append(response(sess, ngql, need_try))
exec_ctx['ngql'] = ngql


Expand Down Expand Up @@ -405,6 +408,12 @@ def executing_query(query, exec_ctx, request):
ngql = combine_query(query)
exec_query(request, ngql, exec_ctx)

# execute query multiple times
@when(parse("executing query {times:d} times:\n{query}"))
def executing_query_multiple_times(times, query, exec_ctx, request):
ngql = combine_query(query)
exec_query(request, ngql, exec_ctx, times = times)


@when(
parse(
Expand All @@ -414,13 +423,13 @@ def executing_query(query, exec_ctx, request):
def executing_query_with_retry(query, exec_ctx, request, secs, retryTimes):
ngql = combine_query(query)
exec_query(request, ngql, exec_ctx)
res = exec_ctx["result_set"]
res = exec_ctx["result_set"][0]
if not res.is_succeeded():
retryCounter = 0
while retryCounter < retryTimes:
time.sleep(secs)
exec_query(request, ngql, exec_ctx)
resRetry = exec_ctx["result_set"]
resRetry = exec_ctx["result_set"][0]
if not resRetry.is_succeeded():
retryCounter = retryCounter + 1
else:
Expand Down Expand Up @@ -487,7 +496,7 @@ def submit_job(query, exec_ctx, request):

@then("wait the job to finish")
def wait_job_to_finish(exec_ctx):
resp = exec_ctx['result_set']
resp = exec_ctx['result_set'][0]
jid = job_id(resp)
session = exec_ctx.get('current_session')
is_finished = wait_all_jobs_finished(session, [jid])
Expand Down Expand Up @@ -536,58 +545,58 @@ def cmp_dataset(
first_n_records=-1,
hashed_columns=[],
):
rs = exec_ctx['result_set']
ngql = exec_ctx['ngql']
check_resp(rs, ngql)
space_desc = exec_ctx.get('space_desc', None)
vid_fn = murmurhash2 if space_desc and space_desc.is_int_vid() else None
ds = dataset(
table(result, lambda x: normalize_outline_scenario(request, x)),
exec_ctx.get("variables", {}),
)
ds = hash_columns(ds, hashed_columns)
dscmp = DataSetComparator(
strict=strict,
order=order,
contains=contains,
first_n_records=first_n_records,
decode_type=rs._decode_type,
vid_fn=vid_fn,
)

def dsp(ds):
printer = DataSetPrinter(rs._decode_type, vid_fn=vid_fn)
return printer.ds_to_string(ds)

def rowp(ds, i):
if i is None or i < 0:
return "" if i != -2 else "Invalid column names"
assert i < len(ds.rows), f"{i} out of range {len(ds.rows)}"
row = ds.rows[i].values
printer = DataSetPrinter(rs._decode_type, vid_fn=vid_fn)
ss = printer.list_to_string(row, delimiter='|')
return f'{i}: |' + ss + '|'

if rs._data_set_wrapper is None:
assert (
not ds.column_names and not ds.rows
), f"Expected result must be empty table: ||"

rds = rs._data_set_wrapper._data_set
res, i = dscmp(rds, ds)
if not res:
scen = request.function.__scenario__
feature = scen.feature.rel_filename
msg = [
f"Fail to exec: {ngql}",
f"Response: {dsp(rds)}",
f"Expected: {dsp(ds)}",
f"NotFoundRow: {rowp(ds, i)}",
f"Space: {str(space_desc)}",
f"vid_fn: {vid_fn}",
]
assert res, "\n".join(msg)
return rds
for rs in exec_ctx['result_set']:
ngql = exec_ctx['ngql']
check_resp(rs, ngql)
space_desc = exec_ctx.get('space_desc', None)
vid_fn = murmurhash2 if space_desc and space_desc.is_int_vid() else None
ds = dataset(
table(result, lambda x: normalize_outline_scenario(request, x)),
exec_ctx.get("variables", {}),
)
ds = hash_columns(ds, hashed_columns)
dscmp = DataSetComparator(
strict=strict,
order=order,
contains=contains,
first_n_records=first_n_records,
decode_type=rs._decode_type,
vid_fn=vid_fn,
)

def dsp(ds):
printer = DataSetPrinter(rs._decode_type, vid_fn=vid_fn)
return printer.ds_to_string(ds)

def rowp(ds, i):
if i is None or i < 0:
return "" if i != -2 else "Invalid column names"
assert i < len(ds.rows), f"{i} out of range {len(ds.rows)}"
row = ds.rows[i].values
printer = DataSetPrinter(rs._decode_type, vid_fn=vid_fn)
ss = printer.list_to_string(row, delimiter='|')
return f'{i}: |' + ss + '|'

if rs._data_set_wrapper is None:
assert (
not ds.column_names and not ds.rows
), f"Expected result must be empty table: ||"

rds = rs._data_set_wrapper._data_set
res, i = dscmp(rds, ds)
if not res:
scen = request.function.__scenario__
feature = scen.feature.rel_filename
msg = [
f"Fail to exec: {ngql}",
f"Response: {dsp(rds)}",
f"Expected: {dsp(ds)}",
f"NotFoundRow: {rowp(ds, i)}",
f"Space: {str(space_desc)}",
f"vid_fn: {vid_fn}",
]
assert res, "\n".join(msg)
return exec_ctx['result_set'][0]._data_set_wrapper._data_set


@then(parse("define some list variables:\n{text}"))
Expand Down Expand Up @@ -750,9 +759,9 @@ def no_side_effects():

@then("the execution should be successful")
def execution_should_be_succ(exec_ctx):
rs = exec_ctx["result_set"]
stmt = exec_ctx["ngql"]
check_resp(rs, stmt)
for rs in exec_ctx['result_set']:
check_resp(rs, stmt)


@then(
Expand All @@ -761,7 +770,7 @@ def execution_should_be_succ(exec_ctx):
)
)
def raised_type_error(unit, err_type, time, sym, msg, exec_ctx):
res = exec_ctx["result_set"]
res = exec_ctx["result_set"][0]
ngql = exec_ctx['ngql']
assert not res.is_succeeded(), f"Response should be failed: nGQL:{ngql}"
err_type = err_type.strip()
Expand Down Expand Up @@ -793,28 +802,28 @@ def drop_used_space(exec_ctx):
@then(parse("the execution plan should be:\n{plan}"))
def check_plan(request, plan, exec_ctx):
ngql = exec_ctx["ngql"]
resp = exec_ctx["result_set"]
expect = table(plan)
column_names = expect.get('column_names', [])
idx = column_names.index('dependencies')
rows = expect.get("rows", [])
for i, row in enumerate(rows):
row[idx] = [int(cell.strip()) for cell in row[idx].split(",") if len(cell) > 0]
rows[i] = row
differ = PlanDiffer(resp.plan_desc(), expect)

res = differ.diff()
if not res:
scen = request.function.__scenario__
feature = scen.feature.rel_filename
location = f"{feature}:{line_number(scen._steps, plan)}"
msg = [
f"Fail to exec: {ngql}",
f"Location: {location}",
differ.err_msg(),
]
for resp in exec_ctx["result_set"]:
expect = table(plan)
column_names = expect.get('column_names', [])
idx = column_names.index('dependencies')
rows = expect.get("rows", [])
for i, row in enumerate(rows):
row[idx] = [int(cell.strip()) for cell in row[idx].split(",") if len(cell) > 0]
rows[i] = row
differ = PlanDiffer(resp.plan_desc(), expect)

res = differ.diff()
if not res:
scen = request.function.__scenario__
feature = scen.feature.rel_filename
location = f"{feature}:{line_number(scen._steps, plan)}"
msg = [
f"Fail to exec: {ngql}",
f"Location: {location}",
differ.err_msg(),
]

assert res, "\n".join(msg)
assert res, "\n".join(msg)


@when(parse("executing query via graph {index:d}:\n{query}"))
Expand Down
14 changes: 14 additions & 0 deletions tests/tck/features/bugfix/MTSafeConcurrencyVariables.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Copyright (c) 2022 vesoft inc. All rights reserved.
#
# This source code is licensed under Apache 2.0 License.
Feature: Test MT-safe varaibles

Scenario: Binary plan of minus
# It's not stable to reproduce the bug, so we run it 100 times
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, what's the real reason about the crash? not thread-safe data structure usage?

Copy link
Contributor Author

@Shylock-Hg Shylock-Hg Nov 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, all variable should insert to map in building stage, and executor is running on MT-environment, so executor insert new variable will lead to condition race.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which line of code inserts new variabes to map in runtime?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it leads the query YIELD 1 AS number MINUS YIELD 2 AS number to crash?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

inputVar = vctx_->anonVarGen()->getVar();

When executing query 100 times:
"""
YIELD 1 AS number MINUS YIELD 2 AS number
"""
Then the result should be, in any order:
| number |
| 1 |