Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Periodic delete #446

Merged
merged 13 commits into from Mar 18, 2024
6 changes: 3 additions & 3 deletions cpp/periodic_module/CMakeLists.txt
@@ -1,10 +1,10 @@
include(GNUInstallDirs)

# Source files that make up the periodic query module.
set(periodic_src
    periodic.cpp)

# Register the module exactly once, built from periodic.cpp. The earlier
# periodic_iterate_src variable and its add_query_module(periodic ...) call were
# left over from before the source file was renamed.
add_query_module(periodic 1 "${periodic_src}")

# The module talks to the database through mgclient, so it needs the mgclient
# headers and the static-linking define.
target_compile_definitions(periodic PRIVATE MGCLIENT_STATIC_DEFINE)
target_include_directories(periodic PRIVATE ${MGCLIENT_INCLUDE_DIRS})
Expand Down
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make the batch-size-related behavior consistent between ValidateBatchSize and ValidateDeletionConfig, and try to extract it into its own function.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: I recommend using positional arguments to avoid repeating, e.g. use
fmt::format("MATCH ({0}) WHERE ID({0}) = __{0}_id", node_name)
instead of
fmt::format("MATCH ({}) WHERE ID({}) = __{}_id", node_name, node_name, node_name) (and ditto elsewhere).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what you mean by the first comment.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

About the first comment: ValidateBatchSize and ValidateDeletionConfig have different checks and error messages for the batch_size parameter. I advise making them consistent, and then extracting that logic into its own function to avoid repeated code.

@@ -1,17 +1,25 @@
#include <fmt/core.h>
#include <mgp.hpp>
#include <string>
#include <string_view>

#include "mgclient.hpp"

// Procedure names under which the module's procedures are registered.
// NOTE(review): kProcedurePeriodic and kProcedurePeriodicIterate hold the same
// string; the former looks like the pre-rename name -- confirm it is still needed.
const char *kProcedurePeriodic = "iterate";
const char *kProcedurePeriodicIterate = "iterate";
const char *kProcedurePeriodicDelete = "delete";
// Names of the procedure arguments.
const char *kArgumentInputQuery = "input_query";
const char *kArgumentRunningQuery = "running_query";
const char *kArgumentConfig = "config";
// Key of the mandatory batch size entry inside the config map.
const char *kConfigKeyBatchSize = "batch_size";
// Presumably the identifiers used for the batch parameter and its rows in the
// generated queries -- their use is outside this view, confirm before relying on it.
const char *kBatchInternalName = "__batch";
const char *kBatchRowInternalName = "__batch_row";
// Optional config keys of periodic.delete(): each holds a string or a list of strings.
const char *kConfigKeyLabels = "labels";
const char *kConfigKeyEdgeTypes = "edge_types";

// Names of the fields in the result record.
const char *kReturnSuccess = "success";
const char *kReturnNumBatches = "number_of_executed_batches";
const char *kReturnNumDeletedNodes = "number_of_deleted_nodes";
const char *kReturnNumDeletedRelationships = "number_of_deleted_relationships";

// Names of the environment variables that override the connection host and port.
const char *kMgHost = "MG_HOST";
const char *kMgPort = "MG_PORT";
Expand All @@ -27,6 +35,45 @@ struct ParamNames {
std::vector<std::string> primitive_names;
};

/// Parsed configuration of a single periodic.delete() call.
struct DeletionInfo {
  /// Value placed in the LIMIT clause of every deletion query.
  uint64_t batch_size = 0;
  /// Node labels the deletion is restricted to; empty means no label filter.
  std::vector<std::string> labels;
  /// Relationship types the deletion is restricted to; empty means no type filter.
  std::vector<std::string> edge_types;
};

/// Running totals of a periodic.delete() call; all counters start at zero.
struct DeletionResult {
  /// Number of deletion queries executed so far.
  uint64_t num_batches = 0;
  /// Total number of nodes reported as deleted.
  uint64_t num_deleted_nodes = 0;
  /// Total number of relationships reported as deleted.
  uint64_t num_deleted_relationships = 0;
};

/// Builds the parameters used to open an mgclient connection.
///
/// Starts from kDefaultHost / kDefaultPort with empty credentials and overrides
/// each field with the environment variable named by kMgHost, kMgPort,
/// kMgUsername or kMgPassword when that variable is set.
///
/// @return the assembled connection parameters.
/// @throws std::runtime_error if the port variable holds an integer that does not
///         fit into a 16-bit port number (previously it was silently truncated).
/// @throws std::invalid_argument / std::out_of_range (from std::stoi) if the port
///         variable is not an integer at all.
mg::Client::Params GetClientParams() {
  mg::Client::Params mg_params = {
      .host = std::string(kDefaultHost), .port = kDefaultPort, .username = "", .password = ""};

  if (const auto *maybe_host = std::getenv(kMgHost)) {
    mg_params.host = std::string(maybe_host);
  }

  if (const auto *maybe_port = std::getenv(kMgPort)) {
    constexpr int kMaxPort = 65535;
    const int parsed_port = std::stoi(std::string(maybe_port));
    // Reject values the uint16_t cast below would wrap around (e.g. 70000 -> 4464).
    if (parsed_port < 0 || parsed_port > kMaxPort) {
      throw std::runtime_error(
          fmt::format("Environment variable {} must be a port in the range 0-{}, got {}!", kMgPort, kMaxPort,
                      parsed_port));
    }
    mg_params.port = static_cast<uint16_t>(parsed_port);
  }

  if (const auto *maybe_username = std::getenv(kMgUsername)) {
    mg_params.username = std::string(maybe_username);
  }

  if (const auto *maybe_password = std::getenv(kMgPassword)) {
    mg_params.password = std::string(maybe_password);
  }

  return mg_params;
}
Josipmrden marked this conversation as resolved.
Show resolved Hide resolved

ParamNames ExtractParamNames(const std::vector<std::string> &columns, const std::vector<mg::Value> &batch_row) {
ParamNames res;
for (size_t i = 0; i < columns.size(); i++) {
Expand Down Expand Up @@ -64,11 +111,11 @@ std::string Join(const std::vector<std::string> &strings, const std::string &del
}

/// Builds the projection `<internal_name>.<entity_name> AS __<entity_name>_id`.
///
/// Positional format arguments are used so that entity_name is passed only once.
/// The earlier, unreachable duplicate return statement has been dropped.
std::string GetGraphFirstClassEntityAlias(const std::string &internal_name, const std::string &entity_name) {
  return fmt::format("{0}.{1} AS __{1}_id", internal_name, entity_name);
}

/// Builds the projection `<internal_name>.<primitive_name> AS <primitive_name>`.
///
/// Positional format arguments are used so that primitive_name is passed only once.
/// The earlier, unreachable duplicate return statement has been dropped.
std::string GetPrimitiveEntityAlias(const std::string &internal_name, const std::string &primitive_name) {
  return fmt::format("{0}.{1} AS {1}", internal_name, primitive_name);
}

std::string ConstructWithStatement(const ParamNames &names) {
Expand All @@ -87,11 +134,11 @@ std::string ConstructWithStatement(const ParamNames &names) {
}

/// Builds `MATCH (<node_name>) WHERE ID(<node_name>) = __<node_name>_id`, i.e. a
/// clause that matches a node against the `__<node_name>_id` variable.
///
/// The earlier, unreachable duplicate return statement has been dropped.
std::string ConstructMatchingNodeById(const std::string &node_name) {
  return fmt::format("MATCH ({0}) WHERE ID({0}) = __{0}_id", node_name);
}

/// Builds `MATCH ()-[<rel_name>]->() WHERE ID(<rel_name>) = __<rel_name>_id`, i.e. a
/// clause that matches a relationship against the `__<rel_name>_id` variable.
///
/// The earlier, unreachable duplicate return statement has been dropped.
std::string ConstructMatchingRelationshipById(const std::string &rel_name) {
  return fmt::format("MATCH ()-[{0}]->() WHERE ID({0}) = __{0}_id", rel_name);
}

std::string ConstructMatchGraphEntitiesById(const ParamNames &names) {
Expand Down Expand Up @@ -123,7 +170,8 @@ std::string ConstructQueryPrefix(const ParamNames &names) {
return fmt::format("{} {} {}", unwind_batch, with_variables, match_string);
}

mg::Map ConstructQueryParams(const std::vector<std::string> &columns, const std::vector<std::vector<mg::Value>> &batch) {
mg::Map ConstructQueryParams(const std::vector<std::string> &columns,
const std::vector<std::vector<mg::Value>> &batch) {
mg::Map params(1);
mg::List list_value(batch.size());

Expand Down Expand Up @@ -193,36 +241,190 @@ void ValidateBatchSize(const mgp::Value &batch_size_value) {
}
}

mg::Client::Params GetClientParams() {
auto *host = kDefaultHost;
auto port = kDefaultPort;
auto *username = "";
auto *password = "";
void ValidateDeletionConfigEntities(const mgp::Map &config, std::string config_key) {
auto key = std::string_view(config_key);
if (!config.KeyExists(key)) {
return;
}

auto *maybe_host = std::getenv(kMgHost);
if (maybe_host) {
host = std::move(maybe_host);
auto value = config.At(key);
if (!value.IsString() && !value.IsList()) {
throw std::runtime_error(fmt::format("Invalid config for config parameter {}!", config_key));
}

const auto *maybe_port = std::getenv(kMgPort);
if (maybe_port) {
port = static_cast<uint16_t>(std::move(*maybe_port));
if (value.IsString()) {
return;
}

const auto *maybe_username = std::getenv(kMgUsername);
if (maybe_username) {
username = std::move(maybe_username);
auto list_value = value.ValueList();
for (auto elem : list_value) {
if (!elem.IsString()) {
throw std::runtime_error(fmt::format("Invalid config for config parameter {}!", config_key));
}
}
}

const auto *maybe_password = std::getenv(kMgPassword);
if (maybe_password) {
password = std::move(maybe_password);
/// Validates the config map of periodic.delete().
///
/// The batch size entry is mandatory and must be an integer greater than zero;
/// the labels and edge types entries are optional and are checked by
/// ValidateDeletionConfigEntities.
///
/// @throws std::runtime_error describing the first violated rule.
void ValidateDeletionConfig(const mgp::Map &config) {
  const std::string_view batch_size_key{kConfigKeyBatchSize};

  if (!config.KeyExists(batch_size_key)) {
    throw std::runtime_error("Periodic.delete() did not specify config parameter batch_size!");
  }

  auto batch_size = config.At(batch_size_key);
  if (!batch_size.IsInt()) {
    throw std::runtime_error("Batch size needs to be an integer!");
  }

  // NOTE(review): the wording below says "non-negative" although the check rejects
  // zero and negative values; the text is kept as-is because existing tests match
  // on this exact message.
  if (batch_size.ValueInt() <= 0) {
    throw std::runtime_error("Batch size can't be a non-negative integer!");
  }

  ValidateDeletionConfigEntities(config, std::string(kConfigKeyLabels));
  ValidateDeletionConfigEntities(config, std::string(kConfigKeyEdgeTypes));
}

/// Appends the string(s) stored under `config_key` to `vec`.
///
/// Nothing is appended when the key is absent or its value is neither a string
/// nor a list. A string value contributes one element; a list contributes one
/// element per entry, read with ValueString().
void EmplaceFromConfig(const mgp::Map &config, std::vector<std::string> &vec, std::string &config_key) {
  const std::string_view lookup_key{config_key};
  if (!config.KeyExists(lookup_key)) {
    return;
  }

  auto entry = config.At(lookup_key);
  if (entry.IsString()) {
    vec.emplace_back(entry.ValueString());
    return;
  }

  if (!entry.IsList()) {
    return;
  }

  auto entries = entry.ValueList();
  for (const auto item : entries) {
    vec.emplace_back(item.ValueString());
  }
}

/// Validates `config` and converts it into a DeletionInfo.
///
/// @throws std::runtime_error (from ValidateDeletionConfig) when the config is invalid.
DeletionInfo GetDeletionInfo(const mgp::Map &config) {
  ValidateDeletionConfig(config);

  DeletionInfo info;
  // Validation has already established that the batch size is a positive integer.
  info.batch_size = static_cast<uint64_t>(config.At(std::string_view(kConfigKeyBatchSize)).ValueInt());

  // EmplaceFromConfig takes its key by non-const reference, hence the named strings.
  auto labels_key = std::string(kConfigKeyLabels);
  EmplaceFromConfig(config, info.labels, labels_key);

  auto edge_types_key = std::string(kConfigKeyEdgeTypes);
  EmplaceFromConfig(config, info.edge_types, edge_types_key);

  return info;
}

void ExecutePeriodicDelete(const DeletionInfo &deletion_info, DeletionResult &deletion_result) {
auto delete_all = deletion_info.edge_types.empty() && deletion_info.labels.empty();
auto delete_nodes = delete_all || !deletion_info.labels.empty();
auto delete_edges = delete_all || !deletion_info.labels.empty() || !deletion_info.edge_types.empty();

auto labels_formatted = deletion_info.labels.empty() ? "" : fmt::format(":{}", Join(deletion_info.labels, ":"));
auto edge_types_formatted =
deletion_info.edge_types.empty() ? "" : fmt::format(":{}", Join(deletion_info.edge_types, "|"));

auto relationships_deletion_query =
fmt::format("MATCH (n{})-[r{}]-(m) WITH DISTINCT r LIMIT {} DELETE r RETURN count(r) AS num_deleted",
labels_formatted, edge_types_formatted, deletion_info.batch_size);
auto nodes_deletion_query =
fmt::format("MATCH (n{}) WITH DISTINCT n LIMIT {} DETACH DELETE n RETURN count(n) AS num_deleted",
labels_formatted, deletion_info.batch_size);

auto client = mg::Client::Connect(GetClientParams());
if (!client) {
throw std::runtime_error("Unable to connect to client!");
}

if (delete_edges) {
while (true) {
if (!client->Execute(relationships_deletion_query)) {
throw std::runtime_error("Error while executing periodic iterate!");
}

auto result = client->FetchOne();
if (!result || (*result).size() != 1) {
throw std::runtime_error("No result received from periodic delete!");
}

client->DiscardAll();

uint64_t num_deleted = static_cast<uint64_t>((*result)[0].ValueInt());
deletion_result.num_batches++;
deletion_result.num_deleted_relationships += num_deleted;
if (static_cast<uint64_t>(num_deleted) < deletion_info.batch_size) {
break;
}
}
}

if (delete_nodes) {
while (true) {
if (!client->Execute(nodes_deletion_query)) {
throw std::runtime_error("Error while executing periodic iterate!");
}

auto result = client->FetchOne();
if (!result || (*result).size() != 1) {
throw std::runtime_error("No result received from periodic delete!");
}

client->DiscardAll();

uint64_t num_deleted = static_cast<uint64_t>((*result)[0].ValueInt());
deletion_result.num_batches++;
deletion_result.num_deleted_nodes += num_deleted;
if (static_cast<uint64_t>(num_deleted) < deletion_info.batch_size) {
break;
}
}
}
}

/// Entry point of the `delete` procedure.
///
/// Reads the config map from the first argument, validates it, runs the batched
/// deletion and emits one record with the success flag, the number of executed
/// batches and the numbers of deleted nodes and relationships. On failure the
/// error message is set on the result and the record carries success = false
/// together with the counters accumulated up to the failure.
///
/// Changes from the previous revision:
///  - the stray `return mg::Client::Params{...}` statement left over from the old
///    GetClientParams() body is gone (this function returns void);
///  - the client connection that was opened here but never used is gone;
///    ExecutePeriodicDelete opens its own and reports the same connection error;
///  - mg::Client::Finalize() now also runs when an exception is thrown.
void PeriodicDelete(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) {
  mgp::MemoryDispatcherGuard guard{memory};
  const auto arguments = mgp::List(args);

  const auto record_factory = mgp::RecordFactory(result);
  auto record = record_factory.NewRecord();

  const auto config = arguments[0].ValueMap();

  DeletionResult deletion_result;

  // Writes the result fields; shared by the success and the failure path.
  const auto fill_record = [&record, &deletion_result](bool success) {
    record.Insert(kReturnSuccess, success);
    record.Insert(kReturnNumBatches, static_cast<int64_t>(deletion_result.num_batches));
    record.Insert(kReturnNumDeletedNodes, static_cast<int64_t>(deletion_result.num_deleted_nodes));
    record.Insert(kReturnNumDeletedRelationships, static_cast<int64_t>(deletion_result.num_deleted_relationships));
  };

  try {
    // Pairs Init() with Finalize() on every exit from this scope, including the
    // exceptional one (the guard is destroyed before the catch handler runs).
    struct ClientLibraryGuard {
      ClientLibraryGuard() { mg::Client::Init(); }
      ~ClientLibraryGuard() { mg::Client::Finalize(); }
    } client_library_guard;

    const auto deletion_info = GetDeletionInfo(config);

    ExecutePeriodicDelete(deletion_info, deletion_result);

    fill_record(true);
  } catch (const std::exception &e) {
    record_factory.SetErrorMessage(e.what());
    fill_record(false);
  }
}

void PeriodicIterate(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) {
Expand Down Expand Up @@ -298,10 +500,17 @@ extern "C" int mgp_init_module(struct mgp_module *module, struct mgp_memory *mem
try {
mgp::MemoryDispatcherGuard guard{memory};
mgp::AddProcedure(
PeriodicIterate, kProcedurePeriodic, mgp::ProcedureType::Read,
PeriodicIterate, kProcedurePeriodicIterate, mgp::ProcedureType::Read,
{mgp::Parameter(kArgumentInputQuery, mgp::Type::String),
mgp::Parameter(kArgumentRunningQuery, mgp::Type::String), mgp::Parameter(kArgumentConfig, mgp::Type::Map)},
{mgp::Return(kReturnSuccess, mgp::Type::Bool), mgp::Return(kReturnNumBatches, mgp::Type::Int)}, module, memory);

mgp::AddProcedure(PeriodicDelete, kProcedurePeriodicDelete, mgp::ProcedureType::Read,
{mgp::Parameter(kArgumentConfig, mgp::Type::Map)},
{mgp::Return(kReturnSuccess, mgp::Type::Bool), mgp::Return(kReturnNumBatches, mgp::Type::Int),
mgp::Return(kReturnNumDeletedNodes, mgp::Type::Int),
mgp::Return(kReturnNumDeletedRelationships, mgp::Type::Int)},
module, memory);
} catch (const std::exception &e) {
return 1;
}
Expand Down
@@ -0,0 +1 @@
FOREACH (i in range(1, 10) | CREATE ());
@@ -0,0 +1,5 @@
query: >
CALL periodic.delete({batch_size: "a"}) YIELD * RETURN *;

exception: >
Batch size needs to be an integer!
@@ -0,0 +1 @@
FOREACH (i in range(1, 10) | CREATE ());
@@ -0,0 +1,5 @@
query: >
CALL periodic.delete({batch_size: -1}) YIELD * RETURN *;

exception: >
Batch size can't be a non-negative integer!
@@ -0,0 +1 @@
FOREACH (i in range(1, 10) | CREATE ()-[:TYPE]->());
@@ -0,0 +1,8 @@
query: >
CALL periodic.delete({batch_size: 10}) YIELD * RETURN *;

output:
- number_of_deleted_nodes: 20
number_of_deleted_relationships: 10
number_of_executed_batches: 5
success: true
@@ -0,0 +1 @@
FOREACH (i in range(1, 10) | CREATE ());
@@ -0,0 +1,5 @@
query: >
CALL periodic.delete({batch_size: 10, edge_types: [1]}) YIELD * RETURN *;

exception: >
Invalid config for config parameter edge_types!
@@ -0,0 +1 @@
FOREACH (i in range(1, 10) | CREATE ());
@@ -0,0 +1,5 @@
query: >
CALL periodic.delete({batch_size: 10, labels: [1]}) YIELD * RETURN *;

exception: >
Invalid config for config parameter labels!