Fix OOM in large INSERTs
Do not allocate various temporary data in PortalContext, such as the
hyperspace point corresponding to the row, or the intermediate data
required for chunk lookup.
akuzm committed Aug 23, 2022
1 parent 3acfbd0 commit 51259b3
Showing 12 changed files with 211 additions and 49 deletions.
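The fix applies the standard PostgreSQL memory-management pattern for per-row work: allocate in the executor's per-tuple memory context, which is reset between rows, instead of in the long-lived PortalContext, and copy anything that must outlive the row into a longer-lived context. A minimal sketch of that pattern, assuming the surrounding executor state; ts_hyperspace_calculate_point is the call from the diff below, the rest is illustrative:

/* Sketch only: do per-row work in the short-lived per-tuple context. */
MemoryContext oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));

/* The hyperspace point and the chunk-lookup scratch data are allocated
 * here, and are freed when the per-tuple context is reset after each row. */
Point *point = ts_hyperspace_calculate_point(ht->space, myslot);

/* ... look up or create the chunk insert state for this point ... */

/* Triggers and other query-lifetime work run back in the query context. */
MemoryContextSwitchTo(oldcontext);
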
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -17,12 +17,14 @@ accidentally triggering the load of a previous DB version.**
* #4555 Properly handle default privileges on Continuous Aggregates
* #4575 Fix use of `get_partition_hash` and `get_partition_for_key` inside an IMMUTABLE function
* #4416 Handle TRUNCATE TABLE on chunks
* #4611 Fix a potential OOM when loading large data sets into a hypertable

**Thanks**
@janko for reporting an issue when adding a bool column with a default value to a compressed hypertable
@AlmiS for reporting an error on `get_partition_hash` executed inside an IMMUTABLE function
@michaelkitson for reporting permission errors using default privileges on Continuous Aggregates
@jayadevanm for reporting an error with TRUNCATE TABLE on a compressed chunk
@ninjaltd and @mrksngl for reporting a potential OOM when loading large data sets into a hypertable

## 2.7.2 (2022-07-26)

13 changes: 13 additions & 0 deletions src/chunk.c
@@ -2674,6 +2674,19 @@ ts_chunk_get_hypertable_id_by_relid(Oid relid)
return 0;
}

/*
* Returns the compressed chunk id. The original chunk must exist.
*/
int32
ts_chunk_get_compressed_chunk_id(int32 chunk_id)
{
FormData_chunk form;
PG_USED_FOR_ASSERTS_ONLY bool result =
chunk_simple_scan_by_id(chunk_id, &form, /* missing_ok = */ false);
Assert(result);
return form.compressed_chunk_id;
}

/*
* Returns false if there is no chunk with such reloid.
*/
1 change: 1 addition & 0 deletions src/chunk.h
@@ -164,6 +164,7 @@ extern TSDLLEXPORT Chunk *ts_chunk_get_by_relid(Oid relid, bool fail_if_not_foun
extern TSDLLEXPORT void ts_chunk_free(Chunk *chunk);
extern bool ts_chunk_exists(const char *schema_name, const char *table_name);
extern TSDLLEXPORT int32 ts_chunk_get_hypertable_id_by_relid(Oid relid);
extern TSDLLEXPORT int32 ts_chunk_get_compressed_chunk_id(int32 chunk_id);
extern bool ts_chunk_get_hypertable_id_and_status_by_relid(Oid relid, int32 *hypertable_id,
int32 *chunk_status);
extern Oid ts_chunk_get_relid(int32 chunk_id, bool missing_ok);
54 changes: 25 additions & 29 deletions src/copy.c
@@ -189,7 +189,9 @@ TSCopyMultiInsertBufferInit(Point *point)
memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
buffer->bistate = GetBulkInsertState();
buffer->nused = 0;
buffer->point = point;

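/*
 * The point may be allocated in a short-lived per-tuple context, so keep
 * a copy owned by the buffer for as long as the buffer lives.
 */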
buffer->point = palloc(POINT_SIZE(point->num_coords));
memcpy(buffer->point, point, POINT_SIZE(point->num_coords));

return buffer;
}
@@ -298,22 +300,18 @@ TSCopyMultiInsertBufferFlush(TSCopyMultiInsertInfo *miinfo, TSCopyMultiInsertBuf
Assert(miinfo != NULL);
Assert(buffer != NULL);

#if PG14_GE
uint64 save_cur_lineno;
bool line_buf_valid = false;
CopyFromState cstate = miinfo->ccstate->cstate;

/* cstate can be NULL in calls that are invoked from timescaledb_move_from_table_to_chunks. */
if (cstate != NULL)
line_buf_valid = cstate->line_buf_valid;
#endif

EState *estate = miinfo->estate;
CommandId mycid = miinfo->mycid;
int ti_options = miinfo->ti_options;
int nused = buffer->nused;
TupleTableSlot **slots = buffer->slots;

/*
* table_multi_insert and reinitialization of the chunk insert state may
* leak memory, so switch to a short-lived memory context before calling
* them.
*/
oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));

/*
* A chunk can be closed while buffering the tuples. Even when the chunk
* insert state is moved to the copy memory context, the underlying
@@ -337,22 +335,24 @@ TSCopyMultiInsertBufferFlush(TSCopyMultiInsertInfo *miinfo, TSCopyMultiInsertBuf
* members can be accessed.
*/
#if PG14_GE
uint64 save_cur_lineno = 0;
bool line_buf_valid = false;
CopyFromState cstate = miinfo->ccstate->cstate;

/* cstate can be NULL in calls that are invoked from timescaledb_move_from_table_to_chunks. */
if (cstate != NULL)
{
cstate->line_buf_valid = false;
line_buf_valid = cstate->line_buf_valid;
save_cur_lineno = cstate->cur_lineno;

cstate->line_buf_valid = false;
}
#endif

#if PG14_LT
estate->es_result_relation_info = resultRelInfo;
#endif

/*
* table_multi_insert may leak memory, so switch to short-lived memory
* context before calling it.
*/
oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
table_multi_insert(resultRelInfo->ri_RelationDesc,
slots,
nused,
@@ -363,6 +363,10 @@ TSCopyMultiInsertBufferFlush(TSCopyMultiInsertInfo *miinfo, TSCopyMultiInsertBuf

for (i = 0; i < nused; i++)
{
#if PG14_GE
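/* Restore the input line number of this tuple for COPY error reporting. */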
if (cstate != NULL)
cstate->cur_lineno = buffer->linenos[i];
#endif
/*
* If there are any indexes, update them for all the inserted tuples,
* and run AFTER ROW INSERT triggers.
@@ -371,11 +375,6 @@ TSCopyMultiInsertBufferFlush(TSCopyMultiInsertInfo *miinfo, TSCopyMultiInsertBuf
{
List *recheckIndexes;

#if PG14_GE
if (cstate != NULL)
cstate->cur_lineno = buffer->linenos[i];
#endif

recheckIndexes = ExecInsertIndexTuplesCompat(resultRelInfo,
buffer->slots[i],
estate,
@@ -400,10 +399,6 @@ TSCopyMultiInsertBufferFlush(TSCopyMultiInsertInfo *miinfo, TSCopyMultiInsertBuf
(resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
resultRelInfo->ri_TrigDesc->trig_insert_new_table))
{
#if PG14_GE
if (cstate != NULL)
cstate->cur_lineno = buffer->linenos[i];
#endif
ExecARInsertTriggers(estate,
resultRelInfo,
slots[i],
@@ -456,6 +451,7 @@ TSCopyMultiInsertBufferCleanup(TSCopyMultiInsertInfo *miinfo, TSCopyMultiInsertB

table_finish_bulk_insert(result_relation_info->ri_RelationDesc, miinfo->ti_options);

pfree(buffer->point);
pfree(buffer);
}

@@ -887,9 +883,6 @@ copyfrom(CopyChunkState *ccstate, List *range_table, Hypertable *ht, MemoryConte

ExecStoreVirtualTuple(myslot);

/* Triggers and stuff need to be invoked in query context. */
MemoryContextSwitchTo(oldcontext);

/* Calculate the tuple's point in the N-dimensional hyperspace */
point = ts_hyperspace_calculate_point(ht->space, myslot);

@@ -901,6 +894,9 @@ copyfrom(CopyChunkState *ccstate, List *range_table, Hypertable *ht, MemoryConte

Assert(cis != NULL);

/* Triggers and stuff need to be invoked in query context. */
MemoryContextSwitchTo(oldcontext);

currentTupleInsertMethod = insertMethod;

/* Insert tuples into compressed chunks tuple by tuple */
12 changes: 10 additions & 2 deletions src/hypertable.c
@@ -1033,6 +1033,12 @@ ts_hypertable_get_by_id(int32 hypertable_id)
return ht;
}

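/*
 * Free a cached chunk when it is evicted from the hypertable chunk cache.
 * The cache owns its entries (see hypertable_chunk_store_add below).
 */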
static void
hypertable_chunk_store_free(void *entry)
{
ts_chunk_free((Chunk *) entry);
}

/*
* Add the chunk to the cache that allows fast lookup of chunks
* for a given hyperspace Point.
@@ -1048,7 +1054,7 @@ hypertable_chunk_store_add(const Hypertable *h, const Chunk *input_chunk)
ts_subspace_store_add(h->chunk_cache,
cached_chunk->cube,
cached_chunk,
/* object_free = */ NULL);
hypertable_chunk_store_free);
MemoryContextSwitchTo(old_mcxt);

return cached_chunk;
@@ -1075,7 +1081,9 @@ ts_hypertable_create_chunk_for_point(const Hypertable *h, const Point *point)
/*
* Find the chunk containing the given point, locking all its dimension slices
* for share. NULL if not found.
* Also uses hypertable chunk cache.
* Also uses hypertable chunk cache. The returned chunk is owned by the cache
* and may become invalid after some subsequent call to this function.
* Leaks memory, so call in a short-lived context.
*/
Chunk *
ts_hypertable_find_chunk_for_point(const Hypertable *h, const Point *point)
9 changes: 9 additions & 0 deletions src/nodes/chunk_dispatch.c
@@ -129,6 +129,13 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point,

if (NULL == cis)
{
/*
* The chunk search functions may leak memory, so switch to a temporary
* memory context.
*/
MemoryContext old_context =
MemoryContextSwitchTo(GetPerTupleMemoryContext(dispatch->estate));

/*
* Normally, for every row of the chunk except the first one, we expect
* the chunk to exist already. The "create" function would take a lock
@@ -148,6 +155,8 @@

cis = ts_chunk_insert_state_create(new_chunk, dispatch);
ts_subspace_store_add(dispatch->cache, new_chunk->cube, cis, destroy_chunk_insert_state);

MemoryContextSwitchTo(old_context);
}
else if (cis->rel->rd_id == dispatch->prev_cis_oid && cis == dispatch->prev_cis)
{
28 changes: 11 additions & 17 deletions src/nodes/chunk_insert_state.c
@@ -553,18 +553,19 @@ adjust_projections(ChunkInsertState *cis, ChunkDispatch *dispatch, Oid rowtype)
static Relation
lock_associated_compressed_chunk(int32 chunk_id, bool *has_compressed_chunk)
{
Relation compress_rel = NULL;
Chunk *orig_chunk = ts_chunk_get_by_id(chunk_id, true);
Oid compress_chunk_relid = InvalidOid;
*has_compressed_chunk = false;
if (orig_chunk->fd.compressed_chunk_id)
compress_chunk_relid = ts_chunk_get_relid(orig_chunk->fd.compressed_chunk_id, false);
if (compress_chunk_relid != InvalidOid)
int32 compressed_chunk_id = ts_chunk_get_compressed_chunk_id(chunk_id);
if (compressed_chunk_id)
{
Oid compress_chunk_relid =
ts_chunk_get_relid(compressed_chunk_id, /* missing_ok = */ false);
Assert(compress_chunk_relid != InvalidOid);

*has_compressed_chunk = true;
compress_rel = table_open(compress_chunk_relid, RowExclusiveLock);
return table_open(compress_chunk_relid, RowExclusiveLock);
}
return compress_rel;

*has_compressed_chunk = false;
return NULL;
}

/*
Expand All @@ -580,7 +581,6 @@ ts_chunk_insert_state_create(const Chunk *chunk, ChunkDispatch *dispatch)
int32 cagg_trig_args[2] = { 0, 0 };
ChunkInsertState *state;
Relation rel, parent_rel, compress_rel = NULL;
MemoryContext old_mcxt;
MemoryContext cis_context = AllocSetContextCreate(dispatch->estate->es_query_cxt,
"chunk insert state memory context",
ALLOCSET_DEFAULT_SIZES);
@@ -608,12 +608,6 @@ ts_chunk_insert_state_create(const Chunk *chunk, ChunkDispatch *dispatch)
errmsg("insert with ON CONFLICT or RETURNING clause is not supported on "
"compressed chunks")));

/*
* We must allocate the range table entry on the executor's per-query
* context
*/
old_mcxt = MemoryContextSwitchTo(dispatch->estate->es_query_cxt);

rel = table_open(chunk->table_id, RowExclusiveLock);
if (has_compressed_chunk && ts_indexing_relation_has_primary_or_unique_index(rel))
{
@@ -625,7 +619,7 @@ ts_chunk_insert_state_create(const Chunk *chunk, ChunkDispatch *dispatch)
}
compress_rel = lock_associated_compressed_chunk(chunk->fd.id, &has_compressed_chunk);

MemoryContextSwitchTo(cis_context);
MemoryContext old_mcxt = MemoryContextSwitchTo(cis_context);
relinfo = create_chunk_result_relation_info(dispatch, rel);
if (!has_compressed_chunk)
resrelinfo = relinfo;
54 changes: 54 additions & 0 deletions test/expected/copy_memory_usage.out
@@ -0,0 +1,54 @@
-- This file and its contents are licensed under the Apache License 2.0.
-- Please see the included NOTICE for copyright information and
-- LICENSE-APACHE for a copy of the license.
-- Test that transaction memory usage with COPY doesn't grow.
-- We need memory usage in PortalContext after the completion of the query, so
-- we'll have to log it from a trigger that runs after the query is completed.
\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER;
create table uk_price_paid(price integer, "date" date, postcode1 text, postcode2 text, type smallint, is_new bool, duration smallint, addr1 text, addr2 text, street text, locality text, town text, district text, country text, category smallint);
-- Aim for about 100 partitions; the data spans 1995 to 2022.
select create_hypertable('uk_price_paid', 'date', chunk_time_interval => interval '90 day');
NOTICE: adding not-null constraint to column "date"
create_hypertable
----------------------------
(1,public,uk_price_paid,t)
(1 row)

-- This is where we log the memory usage.
create table portal_memory_log(id serial, bytes int);
-- Returns the amount of memory currently allocated in a given
-- memory context. Only works for PortalContext, and doesn't work for PG 12.
create or replace function ts_debug_allocated_bytes(text) returns int
as :MODULE_PATHNAME, 'ts_debug_allocated_bytes'
language c strict volatile;
-- Log current memory usage into the log table.
create function log_memory() returns trigger as $$
begin
insert into portal_memory_log
values (default, (select ts_debug_allocated_bytes('PortalContext')));
return new;
end;
$$ language plpgsql;
-- Add a trigger that runs after completion of each INSERT/COPY and logs the
-- current memory usage.
create trigger check_update after insert on uk_price_paid
for each statement execute function log_memory();
-- Memory leaks often happen on cache invalidation, so make sure the caches
-- are invalidated often and independently (at co-prime periods).
set timescaledb.max_open_chunks_per_insert = 2;
set timescaledb.max_cached_chunks_per_hypertable = 3;
-- Try increasingly larger data sets by concatenating the same file multiple
-- times.
\copy uk_price_paid from program 'bash -c "cat <(zcat < data/prices-10k-random-1.tsv.gz)"';
\copy uk_price_paid from program 'bash -c "cat <(zcat < data/prices-10k-random-1.tsv.gz) <(zcat < data/prices-10k-random-1.tsv.gz)"';
\copy uk_price_paid from program 'bash -c "cat <(zcat < data/prices-10k-random-1.tsv.gz) <(zcat < data/prices-10k-random-1.tsv.gz) <(zcat < data/prices-10k-random-1.tsv.gz)"';
\copy uk_price_paid from program 'bash -c "cat <(zcat < data/prices-10k-random-1.tsv.gz) <(zcat < data/prices-10k-random-1.tsv.gz) <(zcat < data/prices-10k-random-1.tsv.gz) <(zcat < data/prices-10k-random-1.tsv.gz)"';
-- We'll only compare the third and fourth runs because the others have variance
-- due to new chunks being created and other unknown reasons. Allow a 1% change
-- in memory usage to account for some randomness.
select (max(bytes) - min(bytes)) / max(bytes)::float > 0.01 from portal_memory_log where id >= 3;
?column?
----------
f
(1 row)

1 change: 1 addition & 0 deletions test/sql/CMakeLists.txt
@@ -89,6 +89,7 @@ if(CMAKE_BUILD_TYPE MATCHES Debug)
TEST_FILES
bgw_launcher.sql
c_unit_tests.sql
copy_memory_usage.sql
guc_options.sql
loader.sql
metadata.sql