Lock dimension slice tuple when scanning #2150

Merged
merged 3 commits, Aug 26, 2020
Changes from all commits
1 change: 1 addition & 0 deletions src/CMakeLists.txt
@@ -67,6 +67,7 @@ if (CMAKE_BUILD_TYPE MATCHES Debug)
set(TS_DEBUG 1)
set(DEBUG 1)
list(APPEND SOURCES
+ debug_wait.c
debug_guc.c)
endif (CMAKE_BUILD_TYPE MATCHES Debug)

123 changes: 81 additions & 42 deletions src/chunk.c
@@ -35,6 +35,7 @@
#include <access/tupdesc.h>

#include "export.h"
+ #include "debug_wait.h"
#include "chunk.h"
#include "chunk_index.h"
#include "chunk_data_node.h"
@@ -1189,18 +1190,18 @@ chunk_build_from_tuple_and_stub(Chunk **chunkptr, TupleInfo *ti, const ChunkStub
chunk_formdata_fill(&chunk->fd, ti);

/*
- * When searching for the chunk stub matching the dimensional point, we only
- * scanned for dimensional constraints. We now need to rescan the
+ * When searching for the chunk stub matching the dimensional point, we
+ * only scanned for dimensional constraints. We now need to rescan the
* constraints to also get the inherited constraints.
*/
chunk->constraints =
ts_chunk_constraint_scan_by_chunk_id(chunk->fd.id, num_constraints_hint, ti->mctx);

- /* If a stub is provided then reuse its hypercube. Note that stubs that are
- * results of a point or range scan might be incomplete (in terms of number
- * of slices and constraints). Only a chunk stub that matches in all
- * dimensions will have a complete hypercube. Thus, we need to check the
- * validity of the stub before we can reuse it.
+ /* If a stub is provided then reuse its hypercube. Note that stubs that
+ * are results of a point or range scan might be incomplete (in terms of
+ * number of slices and constraints). Only a chunk stub that matches in
+ * all dimensions will have a complete hypercube. Thus, we need to check
+ * the validity of the stub before we can reuse it.
*/
if (chunk_stub_is_valid(stub, chunk->constraints->num_dimension_constraints))
{
@@ -1244,9 +1245,9 @@ chunk_tuple_found(TupleInfo *ti, void *arg)
Assert(!chunk->fd.dropped);

/* Fill in table relids. Note that we cannot do this in
- * chunk_build_from_tuple_and_stub() since chunk_resurrect() also uses that
- * function and, in that case, the chunk object is needed to create the data
- * table and related objects. */
+ * chunk_build_from_tuple_and_stub() since chunk_resurrect() also uses
+ * that function and, in that case, the chunk object is needed to create
+ * the data table and related objects. */
chunk->table_id = get_relname_relid(chunk->fd.table_name.data,
get_namespace_oid(chunk->fd.schema_name.data, true));
chunk->hypertable_relid = ts_inheritance_parent_relid(chunk->table_id);
@@ -2492,9 +2493,25 @@ chunk_tuple_delete(TupleInfo *ti, DropBehavior behavior, bool preserve_chunk_cat
ts_dimension_slice_scan_by_id_and_lock(cc->fd.dimension_slice_id,
&tuplock,
CurrentMemoryContext);
- if (ts_chunk_constraint_scan_by_dimension_slice_id(slice->fd.id,
- NULL,
- CurrentMemoryContext) == 0)
+ /* If the slice is not found in the scan above, the table is
+ * broken so we do not delete the slice. We proceed
+ * anyway since users need to be able to drop broken tables or
+ * remove broken chunks. */
+ if (!slice)
+ {
+ const Hypertable *const ht = ts_hypertable_get_by_id(form.hypertable_id);
+ ereport(WARNING,
+ (errmsg("unexpected state for chunk %s.%s, dropping anyway",
+ quote_identifier(NameStr(form.schema_name)),
+ quote_identifier(NameStr(form.table_name))),
+ errdetail("The integrity of hypertable %s.%s might be compromised "
+ "since one of its chunks lacked a dimension slice.",
+ quote_identifier(NameStr(ht->fd.schema_name)),
+ quote_identifier(NameStr(ht->fd.table_name)))));
+ }
+ else if (ts_chunk_constraint_scan_by_dimension_slice_id(slice->fd.id,
+ NULL,
+ CurrentMemoryContext) == 0)
ts_dimension_slice_delete_by_id(cc->fd.dimension_slice_id, false);
}
}
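For context on the PR title: the slice lookup above passes a ScanTupLock so the dimension-slice tuple stays locked while it is read, preventing a concurrent drop_chunks from deleting it between the scan and the delete below. A minimal sketch of such a lock setup, mirroring the fields this diff initializes later in ts_chunk_do_drop_chunks (the scanner internals are not shown in this PR):

/* Sketch only: lock each matching dimension-slice tuple while scanning.
 * Field values mirror the tuplock set up in ts_chunk_do_drop_chunks in
 * this diff; the scanner is expected to block (LockWaitBlock) on a
 * conflicting tuple lock instead of returning a tuple that a concurrent
 * transaction may delete under us. */
ScanTupLock tuplock = {
	.waitpolicy = LockWaitBlock,    /* wait for conflicting lockers */
	.lockmode = LockTupleExclusive, /* take the tuple lock exclusively */
};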
@@ -2807,9 +2824,9 @@ ts_chunk_drop_fks(Chunk *const chunk)
}

/*
- * Recreates all FK constraints on a chunk by using the constraints on the parent hypertable as a
- * template. Currently it is used only during chunk decompression, since FK constraints are dropped
- * during compression.
+ * Recreates all FK constraints on a chunk by using the constraints on the parent hypertable as
+ * a template. Currently it is used only during chunk decompression, since FK constraints are
+ * dropped during compression.
*/
void
ts_chunk_create_fks(Chunk *const chunk)
@@ -3001,10 +3018,9 @@ chunk_cmp(const void *ch1, const void *ch2)
}

/*
- * This is a helper set returning function (SRF) that takes a set returning function context and as
- * argument and returns oids extracted from funcctx->user_fctx (which is Chunk* array).
- * Note that the caller needs to be registered as a
- * set returning function for this to work.
+ * This is a helper set returning function (SRF) that takes a set returning function context as
+ * argument and returns oids extracted from funcctx->user_fctx (which is a Chunk* array). Note
+ * that the caller needs to be registered as a set returning function for this to work.
*/
static Datum
chunks_return_srf(FunctionCallInfo fcinfo)
@@ -3119,11 +3135,12 @@ ts_chunk_drop_process_invalidations(Oid hypertable_relid, int64 older_than, int6
cagg.user_view_schema.data,
cagg.user_view_name.data)));

- /* Lock all chunks in Exclusive mode, blocking everything but selects on the table. We have to
- * block all modifications so that we can't get new invalidation entries. This makes sure that
- * all future modifying txns on this data region will have a now() that higher than ours and
- * thus will not invalidate. Otherwise, we could have an old txn with a now() in the past that
- * all of a sudden decides to to insert data right after we process_invalidations. */
+ /* Lock all chunks in Exclusive mode, blocking everything but selects on the table. We have
+ * to block all modifications so that we can't get new invalidation entries. This makes sure
+ * that all future modifying txns on this data region will have a now() that is higher than
+ * ours and thus will not invalidate. Otherwise, we could have an old txn with a now() in
+ * the past that all of a sudden decides to insert data right after we
+ * process_invalidations. */
for (i = 0; i < num_chunks; i++)
{
LockRelationOid(chunks[i].table_id, ExclusiveLock);
@@ -3142,12 +3159,13 @@ ts_chunk_drop_process_invalidations(Oid hypertable_relid, int64 older_than, int6
bool finished_all_invalidation = false;

/* This will loop until all invalidations are done, each iteration of the loop will do
- * max_interval_per_job's worth of data. We don't want to ignore max_interval_per_job here
- * to avoid large sorts. */
+ * max_interval_per_job's worth of data. We don't want to ignore max_interval_per_job
+ * here to avoid large sorts. */
while (!finished_all_invalidation)
{
elog(NOTICE,
- "making sure all invalidations for %s.%s have been processed prior to dropping "
+ "making sure all invalidations for %s.%s have been processed prior to "
+ "dropping "
"chunks",
NameStr(ca->data.user_view_schema),
NameStr(ca->data.user_view_name));
@@ -3196,6 +3214,7 @@ ts_chunk_do_drop_chunks(Hypertable *ht, int64 older_than, int64 newer_than, int3
const int32 hypertable_id = ht->fd.id;
bool has_continuous_aggs;
List *data_nodes = NIL;
+ const MemoryContext oldcontext = CurrentMemoryContext;
ScanTupLock tuplock = {
.waitpolicy = LockWaitBlock,
.lockmode = LockTupleExclusive,
@@ -3230,13 +3249,33 @@ ts_chunk_do_drop_chunks(Hypertable *ht, int64 older_than, int64 newer_than, int3
break;
}

- chunks = get_chunks_in_time_range(ht,
- older_than,
- newer_than,
- DROP_CHUNKS_FUNCNAME,
- CurrentMemoryContext,
- &num_chunks,
- &tuplock);
+ PG_TRY();
+ {
+ chunks = get_chunks_in_time_range(ht,
+ older_than,
+ newer_than,
+ DROP_CHUNKS_FUNCNAME,
+ CurrentMemoryContext,
+ &num_chunks,
+ &tuplock);
+ }
+ PG_CATCH();
+ {
+ ErrorData *edata;
+ MemoryContextSwitchTo(oldcontext);
+ edata = CopyErrorData();
+ if (edata->sqlerrcode == ERRCODE_LOCK_NOT_AVAILABLE)
+ {
+ FlushErrorState();
+ edata->detail = edata->message;
+ edata->message =
+ psprintf("some chunks could not be read since they are being concurrently updated");
+ }
+ ReThrowError(edata);
+ }
+ PG_END_TRY();
+
+ DEBUG_WAITPOINT("drop_chunks_chunks_found");

if (has_continuous_aggs)
ts_chunk_drop_process_invalidations(ht->main_table_relid,
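A note on the PG_TRY/PG_CATCH block above: CopyErrorData() must not run while CurrentMemoryContext is the error context, which is why oldcontext is saved before the scan and restored in the catch block before the error is copied, rewritten, and rethrown. The same pattern in isolation (an illustrative sketch; run_scan is a hypothetical stand-in for get_chunks_in_time_range):

#include <postgres.h>

/* Illustrative sketch of the error-rewriting pattern used above: run a
 * scan that may fail with "lock not available" and replace the terse
 * lock error with a user-facing message, demoting the original text to
 * the error detail. Any other error is rethrown unchanged. */
static void
scan_with_friendly_lock_error(void (*run_scan) (void))
{
	const MemoryContext oldcontext = CurrentMemoryContext;

	PG_TRY();
	{
		run_scan();
	}
	PG_CATCH();
	{
		ErrorData *edata;

		/* CopyErrorData() must not run inside ErrorContext, so switch
		 * back to the caller's context first. */
		MemoryContextSwitchTo(oldcontext);
		edata = CopyErrorData();

		if (edata->sqlerrcode == ERRCODE_LOCK_NOT_AVAILABLE)
		{
			FlushErrorState(); /* drop the original error state */
			edata->detail = edata->message;
			edata->message = psprintf("data is being concurrently updated");
		}
		ReThrowError(edata); /* rethrow, rewritten or not */
	}
	PG_END_TRY();
}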
@@ -3279,10 +3318,9 @@ ts_chunk_do_drop_chunks(Hypertable *ht, int64 older_than, int64 newer_than, int3
}

/*
- * This is a helper set returning function (SRF) that takes a set returning function context and as
- * argument and returns cstrings extracted from funcctx->user_fctx (which is a List).
- * Note that the caller needs to be registered as a
- * set returning function for this to work.
+ * This is a helper set returning function (SRF) that takes a set returning function context as
+ * argument and returns cstrings extracted from funcctx->user_fctx (which is a List). Note
+ * that the caller needs to be registered as a set returning function for this to work.
*/
static Datum
list_return_srf(FunctionCallInfo fcinfo)
@@ -3499,9 +3537,10 @@ ts_chunk_drop_chunks(PG_FUNCTION_ARGS)
}

/**
- * This function is used to explicitly specify chunks that are being scanned. It's being processed
- * in the planning phase and removed from the query tree. This means that the actual function
- * implementation will only be executed if something went wrong during explicit chunk exclusion.
+ * This function is used to explicitly specify chunks that are being scanned. It is processed
+ * in the planning phase and removed from the query tree. This means that the actual
+ * function implementation will only be executed if something went wrong during explicit
+ * chunk exclusion.
*/
Datum
ts_chunks_in(PG_FUNCTION_ARGS)
6 changes: 6 additions & 0 deletions src/compat.h
@@ -254,6 +254,12 @@
#define TUPLE_DESC_HAS_OIDS(desc) false
#endif

+ #if defined(__GNUC__)
+ #define TS_ATTRIBUTE_NONNULL(X) __attribute__((nonnull X))
+ #else
+ #define TS_ATTRIBUTE_NONNULL(X)
+ #endif

/* Compatibility functions for table access method API introduced in PG12 */
#if PG11
#include "compat/tupconvert.h"
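TS_ATTRIBUTE_NONNULL wraps the GCC/Clang nonnull function attribute and expands to nothing on other compilers. An illustrative declaration (not part of this diff; the macro argument is the parenthesized list of 1-based parameter indexes, hence the double parentheses at the use site):

/* Illustrative only: warn at call sites that pass a literal NULL for
 * either parameter. The macro pastes its argument verbatim into
 * __attribute__((nonnull X)), so the index list keeps its own parens. */
extern void ts_debug_waitpoint_init(DebugWait *waitpoint, const char *tagname)
	TS_ATTRIBUTE_NONNULL((1, 2));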
112 changes: 112 additions & 0 deletions src/debug_wait.c
@@ -0,0 +1,112 @@
/*
* This file and its contents are licensed under the Apache License 2.0.
* Please see the included NOTICE for copyright information and
* LICENSE-APACHE for a copy of the license.
*/

#include "debug_wait.h"

#include <postgres.h>

#include <fmgr.h>

#include <access/hash.h>
#include <storage/lock.h>
#include <miscadmin.h>
#include <utils/builtins.h>

#include "export.h"

void
ts_debug_waitpoint_init(DebugWait *waitpoint, const char *tagname)
{
/* Use 64-bit hashing to get two independent 32-bit hashes for the lock tag */
uint64 hash =
DatumGetUInt64(hash_any_extended((const unsigned char *) tagname, strlen(tagname), 0));

SET_LOCKTAG_ADVISORY(waitpoint->tag, MyDatabaseId, (uint32)(hash >> 32), (uint32) hash, 1);
waitpoint->tagname = pstrdup(tagname);
ereport(DEBUG1,
(errmsg("initializing waitpoint '%s' to use " UINT64_FORMAT, waitpoint->tagname, hash)));
}

/*
* Wait for the waitpoint to be released.
*
* This is handled by first trying to get a shared lock, which will not block
* other sessions that try to grab the same lock but will block if an
* exclusive lock is already taken, and then release the lock immediately
* after.
*/
void
ts_debug_waitpoint_wait(DebugWait *waitpoint)
{
LockAcquireResult lock_acquire_result pg_attribute_unused();
bool lock_release_result pg_attribute_unused();

ereport(DEBUG1, (errmsg("waiting on waitpoint '%s'", waitpoint->tagname)));

/* Take the lock. This should always succeed, anything else is a bug. */
lock_acquire_result = LockAcquire(&waitpoint->tag, ShareLock, true, false);
Assert(lock_acquire_result == LOCKACQUIRE_OK);

lock_release_result = LockRelease(&waitpoint->tag, ShareLock, true);
Assert(lock_release_result);

ereport(DEBUG1, (errmsg("proceeding after waitpoint '%s'", waitpoint->tagname)));
}

static void
debug_waitpoint_enable(DebugWait *waitpoint)
{
ereport(DEBUG1, (errmsg("enabling waitpoint \"%s\"", waitpoint->tagname)));
if (LockAcquire(&waitpoint->tag, ExclusiveLock, true, true) == LOCKACQUIRE_NOT_AVAIL)
ereport(NOTICE, (errmsg("debug waitpoint \"%s\" already enabled", waitpoint->tagname)));
}

static void
debug_waitpoint_release(DebugWait *waitpoint)
{
ereport(DEBUG1, (errmsg("releasing waitpoint \"%s\"", waitpoint->tagname)));
if (!LockRelease(&waitpoint->tag, ExclusiveLock, true))
elog(ERROR, "cannot release waitpoint \"%s\"", waitpoint->tagname);
}

/*
* Enable a waitpoint to block when being reached.
*
* This function will always succeed since we will not lock the waitpoint if
* it is already locked. A notice will be printed if the waitpoint is already
* enabled.
*/
TS_FUNCTION_INFO_V1(ts_debug_waitpoint_enable);

Datum
ts_debug_waitpoint_enable(PG_FUNCTION_ARGS)
{
DebugWait waitpoint;

/* Check for NULL before fetching the argument: PG_GETARG_TEXT_PP on a
 * NULL datum would dereference a null pointer. */
if (PG_ARGISNULL(0))
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("no tag provided")));

ts_debug_waitpoint_init(&waitpoint, text_to_cstring(PG_GETARG_TEXT_PP(0)));
debug_waitpoint_enable(&waitpoint);
PG_RETURN_VOID();
}

/*
* Release a waitpoint allowing execution to proceed.
*/
TS_FUNCTION_INFO_V1(ts_debug_waitpoint_release);
Datum
ts_debug_waitpoint_release(PG_FUNCTION_ARGS)
{
DebugWait waitpoint;

/* Check for NULL before fetching the argument: PG_GETARG_TEXT_PP on a
 * NULL datum would dereference a null pointer. */
if (PG_ARGISNULL(0))
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("no tag provided")));

ts_debug_waitpoint_init(&waitpoint, text_to_cstring(PG_GETARG_TEXT_PP(0)));
debug_waitpoint_release(&waitpoint);
PG_RETURN_VOID();
}
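debug_wait.h itself is not part of this diff, but given the DEBUG_WAITPOINT("drop_chunks_chunks_found") call added to chunk.c and the Debug-only hookup in src/CMakeLists.txt, the macro presumably initializes a waitpoint for the tag and blocks on it, compiling away in release builds. A hedged sketch:

/* Hypothetical sketch of debug_wait.h's DEBUG_WAITPOINT macro (the
 * header is not shown in this diff): create a waitpoint for the tag and
 * block on it if some session has enabled it via
 * ts_debug_waitpoint_enable(). Compiled out unless TS_DEBUG is set. */
#ifdef TS_DEBUG
#define DEBUG_WAITPOINT(name)                                                  \
	do                                                                         \
	{                                                                          \
		DebugWait waitpoint;                                                   \
		ts_debug_waitpoint_init(&waitpoint, (name));                           \
		ts_debug_waitpoint_wait(&waitpoint);                                   \
	} while (0)
#else
#define DEBUG_WAITPOINT(name)
#endif

With this in place, a test can enable the waitpoint from one session, start drop_chunks in a second session (which blocks on the shared-lock acquisition in ts_debug_waitpoint_wait()), perform a concurrent modification, and then release the waitpoint to let the drop proceed.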