Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry pick to v3.0.0 (0110-0114) #3730

Merged
merged 8 commits into from
Jan 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ jobs:
make RM_DIR=false DEBUG=false J=${{ steps.cmake.outputs.j }} tck
working-directory: tests/
timeout-minutes: 60
- name: LDBC
run: |
make RM_DIR=false DEBUG=false J=${{ steps.cmake.outputs.j }} ldbc
working-directory: tests/
timeout-minutes: 60
- name: Down cluster
run: |
make RM_DIR=false down
Expand Down
5 changes: 3 additions & 2 deletions conf/nebula-graphd.conf.default
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,9 @@
# System memory high watermark ratio, cancel the memory checking when the ratio greater than 1.0
--system_memory_high_watermark_ratio=0.8

########## metrics ##########
--enable_space_level_metrics=false

########## experimental feature ##########
# if use experimental features
--enable_experimental_feature=false

--enable_space_level_metrics=false
5 changes: 3 additions & 2 deletions conf/nebula-graphd.conf.production
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@
# System memory high watermark ratio, cancel the memory checking when the ratio greater than 1.0
--system_memory_high_watermark_ratio=0.8

########## metrics ##########
--enable_space_level_metrics=false

########## experimental feature ##########
# if use experimental features
--enable_experimental_feature=false

--enable_space_level_metrics=false
42 changes: 16 additions & 26 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,8 @@ bool MetaClient::loadData() {
GraphSpaceID spaceId = spaceInfo.first;
std::shared_ptr<SpaceInfoCache> info = spaceInfo.second;
std::shared_ptr<SpaceInfoCache> infoDeepCopy = std::make_shared<SpaceInfoCache>(*info);
infoDeepCopy->tagSchemas_ = buildTagSchemas(infoDeepCopy->tagItemVec_, &infoDeepCopy->pool_);
infoDeepCopy->edgeSchemas_ = buildEdgeSchemas(infoDeepCopy->edgeItemVec_, &infoDeepCopy->pool_);
infoDeepCopy->tagSchemas_ = buildTagSchemas(infoDeepCopy->tagItemVec_);
infoDeepCopy->edgeSchemas_ = buildEdgeSchemas(infoDeepCopy->edgeItemVec_);
infoDeepCopy->tagIndexes_ = buildIndexes(infoDeepCopy->tagIndexItemVec_);
infoDeepCopy->edgeIndexes_ = buildIndexes(infoDeepCopy->edgeIndexItemVec_);
newMetaData->localCache_[spaceId] = infoDeepCopy;
Expand Down Expand Up @@ -396,14 +396,14 @@ bool MetaClient::loadData() {
return true;
}

TagSchemas MetaClient::buildTagSchemas(std::vector<cpp2::TagItem> tagItemVec, ObjectPool* pool) {
TagSchemas MetaClient::buildTagSchemas(std::vector<cpp2::TagItem> tagItemVec) {
TagSchemas tagSchemas;
TagID lastTagId = -1;
for (auto& tagIt : tagItemVec) {
// meta will return the different version from new to old
auto schema = std::make_shared<NebulaSchemaProvider>(tagIt.get_version());
for (const auto& colIt : tagIt.get_schema().get_columns()) {
addSchemaField(schema.get(), colIt, pool);
addSchemaField(schema.get(), colIt);
}
// handle schema property
schema->setProp(tagIt.get_schema().get_schema_prop());
Expand All @@ -417,16 +417,15 @@ TagSchemas MetaClient::buildTagSchemas(std::vector<cpp2::TagItem> tagItemVec, Ob
return tagSchemas;
}

EdgeSchemas MetaClient::buildEdgeSchemas(std::vector<cpp2::EdgeItem> edgeItemVec,
ObjectPool* pool) {
EdgeSchemas MetaClient::buildEdgeSchemas(std::vector<cpp2::EdgeItem> edgeItemVec) {
EdgeSchemas edgeSchemas;
std::unordered_set<std::pair<GraphSpaceID, EdgeType>> edges;
EdgeType lastEdgeType = -1;
for (auto& edgeIt : edgeItemVec) {
// meta will return the different version from new to old
auto schema = std::make_shared<NebulaSchemaProvider>(edgeIt.get_version());
for (const auto& col : edgeIt.get_schema().get_columns()) {
MetaClient::addSchemaField(schema.get(), col, pool);
MetaClient::addSchemaField(schema.get(), col);
}
// handle shcem property
schema->setProp(edgeIt.get_schema().get_schema_prop());
Expand All @@ -440,32 +439,19 @@ EdgeSchemas MetaClient::buildEdgeSchemas(std::vector<cpp2::EdgeItem> edgeItemVec
return edgeSchemas;
}

void MetaClient::addSchemaField(NebulaSchemaProvider* schema,
const cpp2::ColumnDef& col,
ObjectPool* pool) {
void MetaClient::addSchemaField(NebulaSchemaProvider* schema, const cpp2::ColumnDef& col) {
bool hasDef = col.default_value_ref().has_value();
auto& colType = col.get_type();
size_t len = colType.type_length_ref().has_value() ? *colType.get_type_length() : 0;
cpp2::GeoShape geoShape =
colType.geo_shape_ref().has_value() ? *colType.get_geo_shape() : cpp2::GeoShape::ANY;
bool nullable = col.nullable_ref().has_value() ? *col.get_nullable() : false;
Expression* defaultValueExpr = nullptr;
std::string encoded;
if (hasDef) {
auto encoded = *col.get_default_value();
defaultValueExpr = Expression::decode(pool, folly::StringPiece(encoded.data(), encoded.size()));

if (defaultValueExpr == nullptr) {
LOG(ERROR) << "Wrong expr default value for column name: " << col.get_name();
hasDef = false;
}
encoded = *col.get_default_value();
}

schema->addField(col.get_name(),
colType.get_type(),
len,
nullable,
hasDef ? defaultValueExpr : nullptr,
geoShape);
schema->addField(col.get_name(), colType.get_type(), len, nullable, encoded, geoShape);
}

bool MetaClient::loadSchemas(GraphSpaceID spaceId,
Expand Down Expand Up @@ -493,9 +479,9 @@ bool MetaClient::loadSchemas(GraphSpaceID spaceId,
auto edgeItemVec = edgeRet.value();
allEdgeMap[spaceId] = {};
spaceInfoCache->tagItemVec_ = tagItemVec;
spaceInfoCache->tagSchemas_ = buildTagSchemas(tagItemVec, &spaceInfoCache->pool_);
spaceInfoCache->tagSchemas_ = buildTagSchemas(tagItemVec);
spaceInfoCache->edgeItemVec_ = edgeItemVec;
spaceInfoCache->edgeSchemas_ = buildEdgeSchemas(edgeItemVec, &spaceInfoCache->pool_);
spaceInfoCache->edgeSchemas_ = buildEdgeSchemas(edgeItemVec);

for (auto& tagIt : tagItemVec) {
tagNameIdMap.emplace(std::make_pair(spaceId, tagIt.get_tag_name()), tagIt.get_tag_id());
Expand Down Expand Up @@ -859,6 +845,10 @@ Status MetaClient::handleResponse(const RESP& resp) {
return Status::Error("Invalid param!");
case nebula::cpp2::ErrorCode::E_WRONGCLUSTER:
return Status::Error("Wrong cluster!");
case nebula::cpp2::ErrorCode::E_ZONE_NOT_ENOUGH:
return Status::Error("Zone not enough!");
case nebula::cpp2::ErrorCode::E_ZONE_IS_EMPTY:
return Status::Error("Zone is empty!");
case nebula::cpp2::ErrorCode::E_STORE_FAILURE:
return Status::Error("Store failure!");
case nebula::cpp2::ErrorCode::E_STORE_SEGMENT_ILLEGAL:
Expand Down
8 changes: 3 additions & 5 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ struct SpaceInfoCache {
std::vector<cpp2::IndexItem> edgeIndexItemVec_;
Indexes edgeIndexes_;
Listeners listeners_;
// objPool used to decode when adding field
ObjectPool pool_;
std::unordered_map<PartitionID, TermID> termOfPartition_;

SpaceInfoCache() = default;
Expand Down Expand Up @@ -816,10 +814,10 @@ class MetaClient {
ServiceClientsList serviceClientList_;
};

void addSchemaField(NebulaSchemaProvider* schema, const cpp2::ColumnDef& col, ObjectPool* pool);
void addSchemaField(NebulaSchemaProvider* schema, const cpp2::ColumnDef& col);

TagSchemas buildTagSchemas(std::vector<cpp2::TagItem> tagItemVec, ObjectPool* pool);
EdgeSchemas buildEdgeSchemas(std::vector<cpp2::EdgeItem> edgeItemVec, ObjectPool* pool);
TagSchemas buildTagSchemas(std::vector<cpp2::TagItem> tagItemVec);
EdgeSchemas buildEdgeSchemas(std::vector<cpp2::EdgeItem> edgeItemVec);

std::unique_ptr<thread::GenericWorker> bgThread_;
SpaceNameIdMap spaceIndexByName_;
Expand Down
6 changes: 4 additions & 2 deletions src/codec/RowWriterV2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -815,7 +815,9 @@ WriteResult RowWriterV2::checkUnsetFields() noexcept {

WriteResult r = WriteResult::SUCCEEDED;
if (field->hasDefault()) {
auto expr = field->defaultValue()->clone();
ObjectPool pool;
auto& exprStr = field->defaultValue();
auto expr = Expression::decode(&pool, folly::StringPiece(exprStr.data(), exprStr.size()));
auto defVal = Expression::eval(expr, expCtx);
switch (defVal.type()) {
case Value::Type::NULLVALUE:
Expand Down Expand Up @@ -851,7 +853,7 @@ WriteResult RowWriterV2::checkUnsetFields() noexcept {
default:
LOG(FATAL) << "Unsupported default value type: " << defVal.typeName()
<< ", default value: " << defVal
<< ", default value expr: " << field->defaultValue()->toString();
<< ", default value expr: " << field->defaultValue();
}
} else {
// Set NULL
Expand Down
6 changes: 3 additions & 3 deletions src/codec/test/ResultSchemaProvider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ ResultSchemaProvider::ResultSchemaField::ResultSchemaField(std::string name,
bool nullable,
int32_t offset,
size_t nullFlagPos,
Expression* defaultValue,
std::string defaultValue,
meta::cpp2::GeoShape geoShape)
: name_(std::move(name)),
type_(type),
Expand All @@ -42,14 +42,14 @@ PropertyType ResultSchemaProvider::ResultSchemaField::type() const {
}

bool ResultSchemaProvider::ResultSchemaField::hasDefault() const {
return defaultValue_ != nullptr;
return defaultValue_ != "";
}

bool ResultSchemaProvider::ResultSchemaField::nullable() const {
return nullable_;
}

Expression* ResultSchemaProvider::ResultSchemaField::defaultValue() const {
const std::string& ResultSchemaProvider::ResultSchemaField::defaultValue() const {
return defaultValue_;
}

Expand Down
6 changes: 3 additions & 3 deletions src/codec/test/ResultSchemaProvider.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ class ResultSchemaProvider : public meta::SchemaProviderIf {
bool nullable,
int32_t offset,
size_t nullFlagPos,
Expression* defaultValue = nullptr,
std::string defaultValue = "",
meta::cpp2::GeoShape = meta::cpp2::GeoShape::ANY);

const char* name() const override;
nebula::cpp2::PropertyType type() const override;
bool nullable() const override;
bool hasDefault() const override;
Expression* defaultValue() const override;
const std::string& defaultValue() const override;
size_t size() const override;
size_t offset() const override;
size_t nullFlagPos() const override;
Expand All @@ -41,7 +41,7 @@ class ResultSchemaProvider : public meta::SchemaProviderIf {
bool nullable_;
int32_t offset_;
size_t nullFlagPos_;
Expression* defaultValue_;
std::string defaultValue_;
meta::cpp2::GeoShape geoShape_;
};

Expand Down
10 changes: 8 additions & 2 deletions src/codec/test/SchemaWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,14 @@ SchemaWriter& SchemaWriter::appendCol(folly::StringPiece name,
nullFlagPos = numNullableFields_++;
}

columns_.emplace_back(
name.toString(), type, size, nullable, offset, nullFlagPos, defaultValue, geoShape);
columns_.emplace_back(name.toString(),
type,
size,
nullable,
offset,
nullFlagPos,
defaultValue ? defaultValue->encode() : "",
geoShape);
nameIndex_.emplace(std::make_pair(hash, columns_.size() - 1));

return *this;
Expand Down
1 change: 1 addition & 0 deletions src/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ nebula_add_subdirectory(ssl)
nebula_add_subdirectory(geo)
nebula_add_subdirectory(memory)
nebula_add_subdirectory(id)
nebula_add_subdirectory(log)
2 changes: 2 additions & 0 deletions src/common/graph/Response.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@
X(E_CONFLICT, -2008) \
X(E_INVALID_PARM, -2009) \
X(E_WRONGCLUSTER, -2010) \
X(E_ZONE_NOT_ENOUGH, -2011) \
X(E_ZONE_IS_EMPTY, -2012) \
\
X(E_STORE_FAILURE, -2021) \
X(E_STORE_SEGMENT_ILLEGAL, -2022) \
Expand Down
8 changes: 8 additions & 0 deletions src/common/log/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Copyright (c) 2021 vesoft inc. All rights reserved.
#
# This source code is licensed under Apache 2.0 License.

nebula_add_library(
log_monitor_obj OBJECT
LogMonitor.cpp
)
69 changes: 69 additions & 0 deletions src/common/log/LogMonitor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
#include "common/log/LogMonitor.h"

namespace nebula {

// default min_warn is 256M, disk freebytes < 256M will change LOG_LEVEL to WARNING
DEFINE_uint64(log_min_reserved_bytes_to_warn,
256 * (1UL << 20),
"if freebytes in logdir less than this, will change log level to WARN");

// default min_error is 64M, disk freebytes < 64M will change LOG_LEVEL to ERROR
DEFINE_uint64(log_min_reserved_bytes_to_error,
64 * (1UL << 20),
"if freebytes in logdir less than this, will change log level to ERROR");

// default min_fatal is 4M, disk freebytes < 4M will change LOG_LEVEL to FATAL
DEFINE_uint64(log_min_reserved_bytes_to_fatal,
4 * (1UL << 20),
"if freebytes in logdir less than this, will change log level to FATAL");

// default check log_disk interval is 10s
DEFINE_int32(log_disk_check_interval_secs, 10, "interval to check free space of log path");

void LogMonitor::getLogDiskFreeByte() {
boost::system::error_code ec;
auto info = boost::filesystem::space(FLAGS_log_dir, ec);
if (!ec) {
freeByte_ = info.available;
} else {
LOG(WARNING) << "Get filesystem info of logdir: " << FLAGS_log_dir << " failed";
}
}

void LogMonitor::checkAndChangeLogLevel() {
getLogDiskFreeByte();

if (FLAGS_log_min_reserved_bytes_to_fatal > FLAGS_log_min_reserved_bytes_to_error ||
FLAGS_log_min_reserved_bytes_to_fatal > FLAGS_log_min_reserved_bytes_to_warn ||
FLAGS_log_min_reserved_bytes_to_error > FLAGS_log_min_reserved_bytes_to_warn) {
LOG(ERROR) << "Get Invalid config in LogMonitor, the LogMonitor config should be "
<< "FLAGS_log_min_reserved_bytes_to_warn >"
<< "FLAGS_log_min_reserved_bytes_to_error > FLAGS_log_min_reserved_bytes_to_fatal;";
return;
}

if (freeByte_ < FLAGS_log_min_reserved_bytes_to_fatal) {
LOG(ERROR) << "log disk freebyte is less than " << FLAGS_log_min_reserved_bytes_to_fatal
<< ", change log level to FATAL";
FLAGS_minloglevel = google::GLOG_FATAL;
} else if (freeByte_ < FLAGS_log_min_reserved_bytes_to_error) {
LOG(ERROR) << "log disk freebyte is less than " << FLAGS_log_min_reserved_bytes_to_error
<< ", change log level to ERROR";
FLAGS_minloglevel = google::GLOG_ERROR;
} else if (freeByte_ < FLAGS_log_min_reserved_bytes_to_warn) {
LOG(ERROR) << "log disk freebyte is less than " << FLAGS_log_min_reserved_bytes_to_warn
<< ", change log level to WARNING";
FLAGS_minloglevel = google::GLOG_WARNING;
} else {
// if freeByte_ is bigger than every min_log_flag, reset the FLAGS_minloglevel to old value
if (FLAGS_minloglevel != oldMinLogLevel_) {
FLAGS_minloglevel = oldMinLogLevel_;
}
}
}

} // namespace nebula
43 changes: 43 additions & 0 deletions src/common/log/LogMonitor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
#pragma once

#include <boost/filesystem.hpp>
#include <boost/system/error_code.hpp>

#include "common/thread/GenericWorker.h"

namespace nebula {

DECLARE_uint64(log_min_reserved_bytes_to_warn);
DECLARE_uint64(log_min_reserved_bytes_to_error);
DECLARE_uint64(log_min_reserved_bytes_to_fatal);
DECLARE_int32(log_disk_check_interval_secs);

class LogMonitor {
public:
LogMonitor() : oldMinLogLevel_(FLAGS_minloglevel), freeByte_(1UL << 60) {
worker_ = std::make_shared<thread::GenericWorker>();
CHECK(worker_->start());
worker_->addRepeatTask(
FLAGS_log_disk_check_interval_secs * 1000, &LogMonitor::checkAndChangeLogLevel, this);
}

~LogMonitor() {
worker_->stop();
worker_->wait();
}

void getLogDiskFreeByte();

void checkAndChangeLogLevel();

private:
int32_t oldMinLogLevel_;
std::shared_ptr<thread::GenericWorker> worker_;
std::atomic_uint64_t freeByte_;
};

} // namespace nebula
Loading