Skip to content

Commit

Permalink
box: persist gc consumers
Browse files Browse the repository at this point in the history
Note that the commit drops gc_consumer_advance function because it is
not used anymore.
  • Loading branch information
drewdzzz committed Apr 26, 2024
1 parent 2e056d9 commit 1e71292
Show file tree
Hide file tree
Showing 35 changed files with 1,197 additions and 168 deletions.
190 changes: 190 additions & 0 deletions src/box/alter.cc
Expand Up @@ -61,6 +61,7 @@
#include "authentication.h"
#include "node_name.h"
#include "core/func_adapter.h"
#include "gc.h"

/* {{{ Auxiliary functions and methods. */

Expand Down Expand Up @@ -4395,6 +4396,185 @@ on_replace_dd_schema(struct trigger * /* trigger */, void *event)
return 0;
}

/** GC consumer definition. */
struct gc_consumer_def {
/** Instance UUID. */
struct tt_uuid uuid;
/** Instance vclock. */
struct vclock vclock;
/** See gc_consumer::with_snap. */
bool with_snap;
};

/** Mapping from tuple.opts to fields of gc_consumer_def. */
const struct opt_def gc_consumer_def_opts_reg[] = {
OPT_DEF("with_snap", OPT_BOOL, struct gc_consumer_def, with_snap),
OPT_END,
};

/**
* Fill gc_consumer_def with opts from the MsgPack map.
* Argument map can be NULL - default options are set in this case.
*/
static int
gc_consumer_def_opts_decode(struct gc_consumer_def *def, const char *map,
struct region *region)
{
def->with_snap = false;
if (map == NULL)
return 0;
return opts_decode(def, gc_consumer_def_opts_reg, &map, region);
}

/** Build gc_consumer definition from a _gc_consumers' tuple. */
static struct gc_consumer_def *
gc_consumer_def_new_from_tuple(struct tuple *tuple, struct region *region)
{
struct gc_consumer_def *def = xregion_alloc_object(region, typeof(*def));
memset(def, 0, sizeof(*def));
if (tuple_field_uuid(tuple, BOX_GC_CONSUMERS_FIELD_UUID, &def->uuid) != 0)
return NULL;
if (tt_uuid_is_nil(&def->uuid)) {
diag_set(ClientError, ER_INVALID_UUID, tt_uuid_str(&def->uuid));
return NULL;
}
const char *mp_vclock =
tuple_field_with_type(tuple, BOX_GC_CONSUMERS_FIELD_VCLOCK,
MP_MAP);
if (mp_vclock == NULL)
return NULL;
if (mp_decode_vclock_ignore0(&mp_vclock, &def->vclock) != 0)
return NULL;
const char *opts = NULL;
if (tuple_field(tuple, BOX_GC_CONSUMERS_FIELD_OPTS) != NULL) {
opts = tuple_field_with_type(tuple, BOX_GC_CONSUMERS_FIELD_OPTS,
MP_MAP);
}
if (gc_consumer_def_opts_decode(def, opts, region) != 0)
return NULL;
return def;
}

/**
* Data passed to transactional triggers of replace in _gc_consumers.
*/
struct gc_consumers_txn_trigger_data {
/*
* Replica that the gc_consumer belongs to.
*/
struct replica *replica;
/*
* Replica UUID - is used to check if the replica is still registered.
*/
struct tt_uuid uuid;
/*
* New GC consumer, will be unregistered on rollback.
*/
struct gc_consumer *new_gc;
};

static int
gc_consumers_rollback_insert(struct trigger *trigger, void *)
{
struct gc_consumers_txn_trigger_data *data =
(struct gc_consumers_txn_trigger_data *)trigger->data;
/* Consumer was only created on replace, so simply delete it here. */
gc_consumer_unregister(data->new_gc);
return 0;
}

static int
gc_consumers_commit_insert(struct trigger *trigger, void *)
{
struct gc_consumers_txn_trigger_data *data =
(struct gc_consumers_txn_trigger_data *)trigger->data;
struct tt_uuid *uuid = &data->uuid;
/* Set new GC only if the replica is still registered. */
if (replica_by_uuid(uuid) == data->replica)
data->replica->gc = data->new_gc;
else
gc_consumer_unregister(data->new_gc);
return 0;
}

static int
gc_consumers_commit_delete(struct trigger *trigger, void *)
{
struct gc_consumers_txn_trigger_data *data =
(struct gc_consumers_txn_trigger_data *)trigger->data;
struct tt_uuid *uuid = &data->uuid;
/* Drop old GC only if the replica is still registered. */
if (replica_by_uuid(uuid) == data->replica) {
gc_consumer_unregister(data->replica->gc);
data->replica->gc = NULL;
}
return 0;
}

static int
on_replace_dd_gc_consumers(struct trigger * /* trigger */, void *event)
{
struct txn *txn = (struct txn *)event;
RegionGuard region_guard(&fiber()->gc);
struct txn_stmt *stmt = txn_current_stmt(txn);
struct tuple *old_tuple = stmt->old_tuple;
struct tuple *new_tuple = stmt->new_tuple;
struct gc_consumer_def *old_def = NULL;
struct gc_consumer_def *new_def = NULL;
struct tt_uuid *replica_uuid = NULL;
if (old_tuple != NULL) {
old_def =
gc_consumer_def_new_from_tuple(old_tuple, &fiber()->gc);
if (old_def == NULL)
return -1;
replica_uuid = &old_def->uuid;
}
if (new_tuple != NULL) {
new_def =
gc_consumer_def_new_from_tuple(new_tuple, &fiber()->gc);
if (new_def == NULL)
return -1;
replica_uuid = &new_def->uuid;
}
struct replica *replica = replica_by_uuid(replica_uuid);
if (replica == NULL) {
diag_set(ClientError, ER_UNSUPPORTED, "gc_consumer",
"manipulations without replica");
return -1;
}

struct gc_consumers_txn_trigger_data *trg_data =
xregion_alloc_object(&in_txn()->region,
struct gc_consumers_txn_trigger_data);
trg_data->uuid = *replica_uuid;
trg_data->replica = replica;
trg_data->new_gc = NULL;

if (old_tuple != NULL) {
struct trigger *on_commit =
txn_alter_trigger_new(gc_consumers_commit_delete,
trg_data);
txn_stmt_on_commit(stmt, on_commit);
}
if (new_tuple != NULL) {
/* Create new consumer and set it to the replica on commit. */
trg_data->new_gc = gc_consumer_register(
&new_def->vclock, new_def->with_snap, "replica %s",
tt_uuid_str(replica_uuid));
if (trg_data->new_gc == NULL)
return -1;
struct trigger *on_rollback =
txn_alter_trigger_new(gc_consumers_rollback_insert,
trg_data);
txn_stmt_on_rollback(stmt, on_rollback);
struct trigger *on_commit =
txn_alter_trigger_new(gc_consumers_commit_insert,
trg_data);
txn_stmt_on_commit(stmt, on_commit);
}
return 0;
}

/** Unregister the replica affected by the change. */
static int
on_replace_cluster_clear_id(struct trigger *trigger, void * /* event */)
Expand Down Expand Up @@ -4583,6 +4763,12 @@ on_replace_dd_cluster_update(const struct replica_def *old_def,
"own UUID update in _cluster");
return -1;
}
/*
* Drop gc_consumer for the replaced replica. Consumer
* for the new one will be created on join/subscribe.
*/
if (box_gc_consumer_unregister(replica) != 0)
return -1;
if (on_replace_dd_cluster_set_uuid(replica, new_def) != 0)
return -1;
/* The replica was re-created. */
Expand Down Expand Up @@ -4670,6 +4856,9 @@ on_replace_dd_cluster_delete(const struct replica_def *old_def)
"internally - %s", old_def->id,
tt_uuid_str(&old_def->uuid), tt_uuid_str(&replica->uuid));
}
/* Unregister gc consumer of the replica. */
if (box_gc_consumer_unregister(replica) != 0)
return -1;
/*
* Unregister only after commit. Otherwise if the transaction would be
* rolled back, there might be already another replica taken the freed
Expand Down Expand Up @@ -5541,4 +5730,5 @@ TRIGGER(on_replace_sequence_data, on_replace_dd_sequence_data);
TRIGGER(on_replace_space_sequence, on_replace_dd_space_sequence);
TRIGGER(on_replace_trigger, on_replace_dd_trigger);
TRIGGER(on_replace_func_index, on_replace_dd_func_index);
TRIGGER(on_replace_gc_consumers, on_replace_dd_gc_consumers);
/* vim: set foldmethod=marker */
1 change: 1 addition & 0 deletions src/box/alter.h
Expand Up @@ -48,5 +48,6 @@ extern struct trigger on_replace_sequence_data;
extern struct trigger on_replace_space_sequence;
extern struct trigger on_replace_trigger;
extern struct trigger on_replace_func_index;
extern struct trigger on_replace_gc_consumers;

#endif /* INCLUDES_TARANTOOL_BOX_ALTER_H */

0 comments on commit 1e71292

Please sign in to comment.