Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Periodic delete #446

Merged
merged 13 commits into from Mar 18, 2024
6 changes: 3 additions & 3 deletions cpp/periodic_module/CMakeLists.txt
@@ -1,10 +1,10 @@
include(GNUInstallDirs)

# Add all source files that make up the periodic query module
set(periodic_iterate_src
periodic_iterate.cpp)
set(periodic_src
periodic.cpp)

add_query_module(periodic 1 "${periodic_iterate_src}")
add_query_module(periodic 1 "${periodic_src}")

target_compile_definitions(periodic PRIVATE MGCLIENT_STATIC_DEFINE)
target_include_directories(periodic PRIVATE ${MGCLIENT_INCLUDE_DIRS})
Expand Down
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make the batch-size-related behavior consistent between ValidateBatchSize and ValidateDeletionConfig, and try to extract it into its own function.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: I recommend using positional arguments to avoid repeating, e.g. use
fmt::format("MATCH ({0}) WHERE ID({0}) = __{0}_id", node_name)
instead of
fmt::format("MATCH ({}) WHERE ID({}) = __{}_id", node_name, node_name, node_name) (and ditto elsewhere).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what you mean by the first comment.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

About the first comment: ValidateBatchSize and ValidateDeletionConfig have different checks and error messages for the batch_size parameter. I advise making them consistent and then extracting that logic into its own function to avoid repeated code.

@@ -1,17 +1,25 @@
#include <cstdlib>
#include <string>
#include <string_view>

#include <fmt/core.h>
#include <mgp.hpp>

#include "mgclient.hpp"

// Procedure names handed to mgp::AddProcedure in mgp_init_module.
const char *kProcedurePeriodic = "iterate";
const char *kProcedurePeriodicIterate = "iterate";
const char *kProcedurePeriodicDelete = "delete";
// Names of the procedure arguments.
const char *kArgumentInputQuery = "input_query";
const char *kArgumentRunningQuery = "running_query";
const char *kArgumentConfig = "config";
// Key of the batch size entry inside the `config` map argument.
const char *kConfigKeyBatchSize = "batch_size";
// NOTE(review): presumably internal variable names used when building the batched query;
// their use is not visible in this part of the file - confirm.
const char *kBatchInternalName = "__batch";
const char *kBatchRowInternalName = "__batch_row";
// Optional `config` keys checked by ValidateDeletionConfigEntities and read by EmplaceFromConfig.
const char *kConfigKeyLabels = "labels";
const char *kConfigKeyEdgeTypes = "edge_types";

// Names of the fields of the record returned by the procedures.
const char *kReturnSuccess = "success";
const char *kReturnNumBatches = "number_of_executed_batches";
const char *kReturnNumDeletedNodes = "number_of_deleted_nodes";
const char *kReturnNumDeletedRelationships = "number_of_deleted_relationships";

// Names of the environment variables queried with std::getenv in GetClientParams.
const char *kMgHost = "MG_HOST";
const char *kMgPort = "MG_PORT";
Expand All @@ -27,6 +35,50 @@ struct ParamNames {
std::vector<std::string> primitive_names;
};

// Parsed form of the periodic.delete() config map, produced by GetDeletionInfo
// and consumed by ExecutePeriodicDelete.
//
// Member order matters: GetDeletionInfo fills this aggregate with designated
// initializers in exactly this order.
struct DeletionInfo {
  // Value of the "batch_size" config entry; also used as the LIMIT of each deletion query.
  uint64_t batch_size = 0;
  // Values of the "labels" config entry; empty when none were given.
  std::vector<std::string> labels;
  // Values of the "edge_types" config entry; empty when none were given.
  std::vector<std::string> edge_types;
};

// Running totals of a periodic delete. ExecutePeriodicDelete updates them after
// every executed batch and PeriodicDelete copies them into the returned record.
struct DeletionResult {
  // Number of deletion queries executed so far (node and relationship batches combined).
  uint64_t num_batches = 0;
  // Sum of the counts returned by the node deletion queries.
  uint64_t num_deleted_nodes = 0;
  // Sum of the counts returned by the relationship deletion queries.
  uint64_t num_deleted_relationships = 0;
};

// Builds the parameters used to open an mgclient connection.
//
// Starts from kDefaultHost / kDefaultPort and empty credentials, then overrides
// each value with the environment variable named by kMgHost, kMgPort,
// kMgUsername or kMgPassword when that variable is set.
//
// The port variable is parsed as a base-10 number. A value that is empty, has
// trailing characters, or lies outside 1..65535 is ignored and the default port
// is kept. (Previously the character code of the first character of the
// variable was used as the port.)
mg::Client::Params GetClientParams() {
  auto *host = kDefaultHost;
  auto port = kDefaultPort;
  auto *username = "";
  auto *password = "";

  if (auto *maybe_host = std::getenv(kMgHost)) {
    host = maybe_host;
  }

  if (const auto *maybe_port = std::getenv(kMgPort)) {
    char *end = nullptr;
    const unsigned long parsed = std::strtoul(maybe_port, &end, 10);
    // strtoul reports "nothing parsed" by leaving `end` at the start; any
    // leftover character means the value was not a plain number.
    const bool fully_consumed = end != maybe_port && *end == '\0';
    // The range check also rejects negative input and overflow, both of which
    // strtoul turns into very large values.
    if (fully_consumed && parsed >= 1 && parsed <= 65535) {
      port = static_cast<uint16_t>(parsed);
    }
  }

  if (const auto *maybe_username = std::getenv(kMgUsername)) {
    username = maybe_username;
  }

  if (const auto *maybe_password = std::getenv(kMgPassword)) {
    password = maybe_password;
  }

  return mg::Client::Params{.host = host, .port = port, .username = username, .password = password};
}
Josipmrden marked this conversation as resolved.
Show resolved Hide resolved

ParamNames ExtractParamNames(const std::vector<std::string> &columns, const std::vector<mg::Value> &batch_row) {
ParamNames res;
for (size_t i = 0; i < columns.size(); i++) {
Expand Down Expand Up @@ -123,7 +175,8 @@ std::string ConstructQueryPrefix(const ParamNames &names) {
return fmt::format("{} {} {}", unwind_batch, with_variables, match_string);
}

mg::Map ConstructQueryParams(const std::vector<std::string> &columns, const std::vector<std::vector<mg::Value>> &batch) {
mg::Map ConstructQueryParams(const std::vector<std::string> &columns,
const std::vector<std::vector<mg::Value>> &batch) {
mg::Map params(1);
mg::List list_value(batch.size());

Expand Down Expand Up @@ -193,36 +246,189 @@ void ValidateBatchSize(const mgp::Value &batch_size_value) {
}
}

mg::Client::Params GetClientParams() {
auto *host = kDefaultHost;
auto port = kDefaultPort;
auto *username = "";
auto *password = "";
// Validates the optional config entry stored under `config_key`.
// A missing key is accepted. A present value must be a string, or a list whose
// elements are all strings; anything else throws std::runtime_error naming the key.
void ValidateDeletionConfigEntities(const mgp::Map &config, std::string config_key) {
auto key = std::string_view(config_key);
// The entry is optional: nothing to check when it is absent.
if (!config.KeyExists(key)) {
return;
}

auto *maybe_host = std::getenv(kMgHost);
if (maybe_host) {
host = std::move(maybe_host);
auto value = config.At(key);
// Only a string or a list is an acceptable top-level value.
if (!value.IsString() && !value.IsList()) {
throw std::runtime_error(fmt::format("Invalid config for config parameter {}!", config_key));
}

const auto *maybe_port = std::getenv(kMgPort);
if (maybe_port) {
port = static_cast<uint16_t>(std::move(*maybe_port));
// A single string needs no further checks.
if (value.IsString()) {
return;
}

const auto *maybe_username = std::getenv(kMgUsername);
if (maybe_username) {
username = std::move(maybe_username);
// For a list, every element has to be a string.
if (value.IsList()) {
auto list_value = value.ValueList();
for (auto elem : list_value) {
if (!elem.IsString()) {
throw std::runtime_error(fmt::format("Invalid config for config parameter {}!", config_key));
}
antoniofilipovic marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

const auto *maybe_password = std::getenv(kMgPassword);
if (maybe_password) {
password = std::move(maybe_password);
// Validates the config map passed to periodic.delete().
// Requires an integer "batch_size" entry greater than zero and delegates the
// optional "labels" / "edge_types" entries to ValidateDeletionConfigEntities.
// Throws std::runtime_error on the first violation found.
void ValidateDeletionConfig(const mgp::Map &config) {
auto batch_size_key = std::string(kConfigKeyBatchSize);
auto labels_key = std::string(kConfigKeyLabels);
auto edge_types_key = std::string(kConfigKeyEdgeTypes);

// batch_size is the only mandatory entry.
if (!config.KeyExists(batch_size_key)) {
throw std::runtime_error("Periodic.delete() did not specify config parameter batch_size!");
}

return mg::Client::Params{.host = std::move(host),
.port = std::move(port),
.username = std::move(username),
.password = std::move(password)};
auto batch_size_value = config.At(batch_size_key);
if (!batch_size_value.IsInt()) {
throw std::runtime_error("Batch size needs to be an integer!");
}

// NOTE(review): this check rejects zero and negative values, so the wording of the
// message reads inverted ("non-positive" seems intended). A test query added in the
// same change expects this exact text, so the two must be changed together.
if (batch_size_value.ValueInt() <= 0) {
throw std::runtime_error("Batch size can't be a non-negative integer!");
}

ValidateDeletionConfigEntities(config, labels_key);
ValidateDeletionConfigEntities(config, edge_types_key);
}

// Appends the string value(s) stored under `config_key` in `config` to `vec`.
//
// - key absent: `vec` is left untouched;
// - string value: that single string is appended;
// - list value: every element is appended, read as a string (the caller is
//   expected to have validated the list with ValidateDeletionConfigEntities);
// - any other value type: `vec` is left untouched.
//
// `config_key` is only read, so it is taken by const reference; existing
// callers that pass a mutable std::string keep working.
void EmplaceFromConfig(const mgp::Map &config, std::vector<std::string> &vec, const std::string &config_key) {
  const auto key = std::string_view(config_key);
  if (!config.KeyExists(key)) {
    return;
  }

  auto value = config.At(key);
  if (value.IsString()) {
    vec.emplace_back(value.ValueString());
    return;
  }

  if (value.IsList()) {
    auto list_value = value.ValueList();
    for (const auto &elem : list_value) {
      vec.emplace_back(elem.ValueString());
    }
  }
}

// Validates `config` and converts it into a DeletionInfo.
// Any validation failure propagates as the exception thrown by ValidateDeletionConfig.
DeletionInfo GetDeletionInfo(const mgp::Map &config) {
  // Validation runs first so that the lookups below operate on checked values.
  ValidateDeletionConfig(config);

  auto key_for_batch_size = std::string(kConfigKeyBatchSize);
  auto key_for_labels = std::string(kConfigKeyLabels);
  auto key_for_edge_types = std::string(kConfigKeyEdgeTypes);

  const auto raw_batch_size = config.At(key_for_batch_size).ValueInt();

  std::vector<std::string> collected_labels;
  EmplaceFromConfig(config, collected_labels, key_for_labels);

  std::vector<std::string> collected_edge_types;
  EmplaceFromConfig(config, collected_edge_types, key_for_edge_types);

  return {.batch_size = static_cast<uint64_t>(raw_batch_size),
          .labels = std::move(collected_labels),
          .edge_types = std::move(collected_edge_types)};
}

// Deletes graph entities in batches over a freshly opened client connection.
//
// Relationships are deleted first: those matching the configured edge types
// and attached to nodes carrying the configured labels (no filter when the
// respective list is empty). Nodes are then DETACH DELETEd when labels were
// given, or when neither labels nor edge types were given (delete everything).
//
// Each phase repeats its query, limited to `deletion_info.batch_size`
// entities, until a batch deletes fewer entities than the batch size.
//
// `deletion_result` is updated after every executed batch, so it reflects the
// work done so far even when an exception is thrown midway.
//
// Throws std::runtime_error when the connection cannot be opened, a query
// fails to execute, or a query does not return exactly one value.
//
// NOTE(review): labels and edge types are interpolated verbatim into the query
// text; they are not escaped here.
void ExecutePeriodicDelete(DeletionInfo deletion_info, DeletionResult &deletion_result) {
  const bool has_labels = !deletion_info.labels.empty();
  const bool has_edge_types = !deletion_info.edge_types.empty();
  // Nodes are removed for "delete everything" (no filters) or when labels were given.
  // The relationship phase runs for every filter combination, so it needs no flag.
  const bool delete_nodes = has_labels || !has_edge_types;

  const std::string labels_formatted = has_labels ? fmt::format(":{}", Join(deletion_info.labels, ":")) : "";
  const std::string edge_types_formatted =
      has_edge_types ? fmt::format(":{}", Join(deletion_info.edge_types, "|")) : "";

  const auto relationships_deletion_query =
      fmt::format("MATCH (n{})-[r{}]-(m) WITH DISTINCT r LIMIT {} DELETE r RETURN count(r) AS num_deleted",
                  labels_formatted, edge_types_formatted, deletion_info.batch_size);
  const auto nodes_deletion_query =
      fmt::format("MATCH (n{}) WITH DISTINCT n LIMIT {} DETACH DELETE n RETURN count(n) AS num_deleted",
                  labels_formatted, deletion_info.batch_size);

  auto client = mg::Client::Connect(GetClientParams());
  if (!client) {
    throw std::runtime_error("Unable to connect to client!");
  }

  // Runs `query` repeatedly until a batch deletes fewer than batch_size
  // entities, adding the per-batch counts to `deleted_counter`.
  const auto run_batches = [&client, &deletion_result, batch_size = deletion_info.batch_size](
                               const std::string &query, uint64_t &deleted_counter) {
    while (true) {
      if (!client->Execute(query)) {
        throw std::runtime_error("Error while executing periodic delete!");
      }

      auto row = client->FetchOne();
      if (!row || row->size() != 1) {
        throw std::runtime_error("No result received from periodic delete!");
      }

      client->DiscardAll();

      const auto num_deleted = static_cast<uint64_t>((*row)[0].ValueInt());
      ++deletion_result.num_batches;
      deleted_counter += num_deleted;
      // A short batch means nothing matching is left.
      if (num_deleted < batch_size) {
        break;
      }
    }
  };

  run_batches(relationships_deletion_query, deletion_result.num_deleted_relationships);

  if (delete_nodes) {
    run_batches(nodes_deletion_query, deletion_result.num_deleted_nodes);
  }
}

// Entry point of the `delete` procedure.
//
// Reads the config map from the first argument, validates it, runs the batched
// deletion and inserts one record containing the success flag and the totals
// gathered in DeletionResult. On any std::exception the error message is set
// on the record factory and the record is filled with success = false plus the
// totals reached before the failure.
//
// Changes compared to the previous version:
// - mg::Client::Finalize() now runs on every path once mg::Client::Init() was
//   called (it used to be skipped whenever an exception was thrown);
// - the extra client connection opened here was never used and is gone;
//   ExecutePeriodicDelete opens its own connection and reports the same
//   "Unable to connect to client!" error;
// - the config is validated before the client library is initialised.
void PeriodicDelete(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) {
  mgp::MemoryDispatcherGuard guard{memory};
  const auto arguments = mgp::List(args);

  const auto record_factory = mgp::RecordFactory(result);
  auto record = record_factory.NewRecord();

  const auto config = arguments[0].ValueMap();

  DeletionResult deletion_result;

  // Writes the outcome and the current totals into the single result record.
  const auto fill_record = [&record, &deletion_result](bool success) {
    record.Insert(kReturnSuccess, success);
    record.Insert(kReturnNumBatches, static_cast<int64_t>(deletion_result.num_batches));
    record.Insert(kReturnNumDeletedNodes, static_cast<int64_t>(deletion_result.num_deleted_nodes));
    record.Insert(kReturnNumDeletedRelationships, static_cast<int64_t>(deletion_result.num_deleted_relationships));
  };

  // Pairs mg::Client::Init() with mg::Client::Finalize(), also during stack unwinding.
  struct ClientLibraryScope {
    ClientLibraryScope() { mg::Client::Init(); }
    ~ClientLibraryScope() { mg::Client::Finalize(); }
    ClientLibraryScope(const ClientLibraryScope &) = delete;
    ClientLibraryScope &operator=(const ClientLibraryScope &) = delete;
  };

  try {
    auto deletion_info = GetDeletionInfo(config);

    {
      const ClientLibraryScope client_library;
      ExecutePeriodicDelete(std::move(deletion_info), deletion_result);
    }

    fill_record(true);
  } catch (const std::exception &e) {
    record_factory.SetErrorMessage(e.what());
    fill_record(false);
  }
}

void PeriodicIterate(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) {
Expand Down Expand Up @@ -298,10 +504,17 @@ extern "C" int mgp_init_module(struct mgp_module *module, struct mgp_memory *mem
try {
mgp::MemoryDispatcherGuard guard{memory};
mgp::AddProcedure(
PeriodicIterate, kProcedurePeriodic, mgp::ProcedureType::Read,
PeriodicIterate, kProcedurePeriodicIterate, mgp::ProcedureType::Read,
{mgp::Parameter(kArgumentInputQuery, mgp::Type::String),
mgp::Parameter(kArgumentRunningQuery, mgp::Type::String), mgp::Parameter(kArgumentConfig, mgp::Type::Map)},
{mgp::Return(kReturnSuccess, mgp::Type::Bool), mgp::Return(kReturnNumBatches, mgp::Type::Int)}, module, memory);

mgp::AddProcedure(PeriodicDelete, kProcedurePeriodicDelete, mgp::ProcedureType::Read,
{mgp::Parameter(kArgumentConfig, mgp::Type::Map)},
{mgp::Return(kReturnSuccess, mgp::Type::Bool), mgp::Return(kReturnNumBatches, mgp::Type::Int),
mgp::Return(kReturnNumDeletedNodes, mgp::Type::Int),
mgp::Return(kReturnNumDeletedRelationships, mgp::Type::Int)},
module, memory);
} catch (const std::exception &e) {
return 1;
}
Expand Down
@@ -0,0 +1 @@
FOREACH (i in range(1, 10) | CREATE ());
@@ -0,0 +1,5 @@
query: >
CALL periodic.delete({batch_size: "a"}) YIELD * RETURN *;

exception: >
Batch size needs to be an integer!
@@ -0,0 +1 @@
FOREACH (i in range(1, 10) | CREATE ());
@@ -0,0 +1,5 @@
query: >
CALL periodic.delete({batch_size: -1}) YIELD * RETURN *;

exception: >
Batch size can't be a non-negative integer!
@@ -0,0 +1 @@
FOREACH (i in range(1, 10) | CREATE ()-[:TYPE]->());
@@ -0,0 +1,8 @@
query: >
CALL periodic.delete({batch_size: 10}) YIELD * RETURN *;

output:
- number_of_deleted_nodes: 20
number_of_deleted_relationships: 10
number_of_executed_batches: 5
success: true
@@ -0,0 +1 @@
FOREACH (i in range(1, 10) | CREATE ());
@@ -0,0 +1,5 @@
query: >
CALL periodic.delete({batch_size: 10, edge_types: [1]}) YIELD * RETURN *;

exception: >
Invalid config for config parameter edge_types!
@@ -0,0 +1 @@
FOREACH (i in range(1, 10) | CREATE ());
@@ -0,0 +1,5 @@
query: >
CALL periodic.delete({batch_size: 10, labels: [1]}) YIELD * RETURN *;

exception: >
Invalid config for config parameter labels!
@@ -0,0 +1 @@
FOREACH (i in range(1, 10) | CREATE ());
@@ -0,0 +1,5 @@
query: >
CALL periodic.delete({}) YIELD * RETURN *;

exception: >
Periodic.delete() did not specify config parameter batch_size!
@@ -0,0 +1 @@
FOREACH (i in range(1, 10) | CREATE ()-[:TYPE]->());
@@ -0,0 +1,8 @@
query: >
CALL periodic.delete({batch_size: 10, edge_types:["TYPE"]}) YIELD * RETURN *;

output:
- number_of_deleted_nodes: 0
number_of_deleted_relationships: 10
number_of_executed_batches: 2
success: true
@@ -0,0 +1 @@
FOREACH (i in range(1, 10) | CREATE ());