Skip to content

Commit

Permalink
Make INSERTs use a custom plan instead of triggers
Browse files Browse the repository at this point in the history
With this change, hypertables no longer rely on an INSERT trigger to
dispatch tuples to chunks. While an INSERT trigger worked well for
both INSERTs and COPYs, it caused issues with supporting some regular
triggers on hypertables, and didn't support RETURNING statements and
upserts (ON CONFLICT DO UPDATE).

INSERTs are now handled by modifying the plan for INSERT statements. A
custom plan node is inserted as a subplan to a ModifyTable plan node,
taking care of dispatching tuples to chunks by setting the result
table for every tuple scanned.

COPYs are handled by modifying the regular copy code. Unfortunately,
this required copying a significant amount of regular PostgreSQL
source code since there are no hooks to add modifications. However,
since the modifications are small it should be fairly easy to keep the
code in sync with upstream changes.
  • Loading branch information
erimatnor committed Jul 27, 2017
1 parent f23bf58 commit 1f3dcd8
Show file tree
Hide file tree
Showing 37 changed files with 1,661 additions and 791 deletions.
9 changes: 6 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,19 @@ SRCS = \
src/ddl_utils.c \
src/chunk_constraint.c \
src/partitioning.c \
src/insert.c \
src/planner.c \
src/executor.c \
src/process_utility.c \
src/copy.c \
src/sort_transform.c \
src/insert_chunk_state.c \
src/insert_statement_state.c \
src/chunk_dispatch.c \
src/chunk_dispatch_state.c \
src/chunk_dispatch_plan.c \
src/chunk_insert_state.c \
src/agg_bookend.c \
src/subspace_store.c \
src/guc.c \
src/compat.c \
src/version.c

OBJS = $(SRCS:.c=.o)
Expand Down
3 changes: 0 additions & 3 deletions sql/chunk_trigger.sql
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,3 @@ BEGIN
WHERE c.hypertable_id = drop_trigger_on_all_chunks.hypertable_id;
END
$BODY$;



29 changes: 4 additions & 25 deletions sql/ddl_internal.sql
Original file line number Diff line number Diff line change
Expand Up @@ -185,23 +185,9 @@ DECLARE
BEGIN
SELECT * INTO STRICT trigger_row FROM pg_trigger WHERE OID = trigger_oid;

IF trigger_row.tgname = ANY(_timescaledb_internal.timescale_trigger_names()) THEN
RETURN FALSE;
END IF;

IF (trigger_row.tgtype & (1 << 2) != 0) THEN
--is INSERT trigger
IF (trigger_row.tgtype & (1 << 3) != 0) OR (trigger_row.tgtype & (1 << 4) != 0) THEN
RAISE 'Combining INSERT triggers with UPDATE or DELETE triggers is not supported.'
USING HINT = 'Please define separate triggers for each operation';
END IF;
IF (trigger_row.tgtype & ((1 << 1) | (1 << 6)) = 0) THEN
RAISE 'AFTER trigger on INSERT is not supported: %.', trigger_row.tgname;
END IF;
ELSE
IF (trigger_row.tgtype & (1 << 0) != 0) THEN
RETURN TRUE;
END IF;
IF (trigger_row.tgtype & (1 << 0) != 0) THEN
-- row trigger
RETURN TRUE;
END IF;
RETURN FALSE;
END
Expand All @@ -219,7 +205,7 @@ DECLARE
BEGIN
IF _timescaledb_internal.need_chunk_trigger(hypertable_id, trigger_oid) THEN
SELECT * INTO STRICT trigger_row FROM pg_trigger WHERE OID = trigger_oid;
PERFORM _timescaledb_internal.create_trigger_on_all_chunks(hypertable_id, trigger_row.tgname,
PERFORM _timescaledb_internal.create_trigger_on_all_chunks(hypertable_id, trigger_row.tgname,
_timescaledb_internal.get_general_trigger_definition(trigger_oid));
END IF;
END
Expand Down Expand Up @@ -475,13 +461,6 @@ BEGIN
END
$BODY$;

CREATE OR REPLACE FUNCTION _timescaledb_internal.timescale_trigger_names()
RETURNS text[] LANGUAGE SQL IMMUTABLE AS
$BODY$
SELECT array['_timescaledb_main_insert_trigger', '_timescaledb_main_after_insert_trigger'];
$BODY$;


CREATE OR REPLACE FUNCTION _timescaledb_internal.truncate_hypertable(
schema_name NAME,
table_name NAME
Expand Down
8 changes: 3 additions & 5 deletions sql/ddl_triggers.sql
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@ BEGIN
FROM pg_catalog.pg_trigger
WHERE oid = info.objid;

IF _timescaledb_internal.is_main_table(table_oid)
AND trigger_name <> ALL(_timescaledb_internal.timescale_trigger_names())
THEN
IF _timescaledb_internal.is_main_table(table_oid) THEN
hypertable_row := _timescaledb_internal.hypertable_from_main_table(table_oid);
PERFORM _timescaledb_internal.add_trigger(hypertable_row.id, index_oid);
END IF;
Expand Down Expand Up @@ -183,7 +181,7 @@ END
$BODY$;

CREATE OR REPLACE FUNCTION _timescaledb_internal.ddl_process_alter_table()
RETURNS event_trigger LANGUAGE plpgsql
RETURNS event_trigger LANGUAGE plpgsql
SECURITY DEFINER SET search_path = ''
AS
$BODY$
Expand All @@ -207,7 +205,7 @@ BEGIN
FOR chunk_row IN
SELECT *
FROM _timescaledb_catalog.chunk
WHERE hypertable_id = hypertable_row.id
WHERE hypertable_id = hypertable_row.id
LOOP
EXECUTE format(
$$
Expand Down
21 changes: 0 additions & 21 deletions sql/hypertable_triggers.sql
Original file line number Diff line number Diff line change
@@ -1,14 +1,5 @@
-- This file contains triggers that act on the main 'hypertable' table as
-- well as triggers for newly created hypertables.

-- These trigger functions intercept regular inserts and implement our smart insert fastpath
CREATE OR REPLACE FUNCTION _timescaledb_internal.main_table_insert_trigger() RETURNS TRIGGER
AS '$libdir/timescaledb', 'insert_main_table_trigger' LANGUAGE C;

CREATE OR REPLACE FUNCTION _timescaledb_internal.main_table_after_insert_trigger() RETURNS TRIGGER
AS '$libdir/timescaledb', 'insert_main_table_trigger_after' LANGUAGE C;

-- Adds the above triggers to the main table when a hypertable is created.
CREATE OR REPLACE FUNCTION _timescaledb_internal.on_change_hypertable()
RETURNS TRIGGER LANGUAGE PLPGSQL AS
$BODY$
Expand All @@ -31,17 +22,6 @@ BEGIN
RAISE;
END IF;
END;

EXECUTE format(
$$
CREATE TRIGGER _timescaledb_main_insert_trigger BEFORE INSERT ON %I.%I
FOR EACH ROW EXECUTE PROCEDURE _timescaledb_internal.main_table_insert_trigger();
$$, NEW.schema_name, NEW.table_name);
EXECUTE format(
$$
CREATE TRIGGER _timescaledb_main_after_insert_trigger AFTER INSERT ON %I.%I
FOR EACH STATEMENT EXECUTE PROCEDURE _timescaledb_internal.main_table_after_insert_trigger();
$$, NEW.schema_name, NEW.table_name);
RETURN NEW;
ELSIF TG_OP = 'DELETE' THEN
RETURN OLD;
Expand All @@ -53,4 +33,3 @@ BEGIN
END
$BODY$
SET client_min_messages = WARNING; -- suppress NOTICE on IF EXISTS schema

3 changes: 3 additions & 0 deletions sql/updates/post-0.2.0--0.2.1-dev.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
DROP FUNCTION IF EXISTS _timescaledb_internal.timescale_trigger_names();
DROP FUNCTION IF EXISTS _timescaledb_internal.main_table_insert_trigger() CASCADE;
DROP FUNCTION IF EXISTS _timescaledb_internal.main_table_after_insert_trigger() CASCADE;
62 changes: 62 additions & 0 deletions src/chunk_dispatch.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#include <postgres.h>
#include <nodes/extensible.h>
#include <nodes/makefuncs.h>
#include <nodes/nodeFuncs.h>
#include <utils/rel.h>
#include <catalog/pg_type.h>

#include "chunk_dispatch.h"
#include "chunk_insert_state.h"
#include "subspace_store.h"
#include "dimension.h"

ChunkDispatch *
chunk_dispatch_create(Hypertable *ht, EState *estate)
{
ChunkDispatch *cp = palloc(sizeof(ChunkDispatch));

cp->hypertable = ht;
cp->estate = estate;
cp->hypertable_result_rel_info = NULL;
cp->cache = subspace_store_init(HYPERSPACE_NUM_DIMENSIONS(ht->space), estate->es_query_cxt);
return cp;
}

void
chunk_dispatch_destroy(ChunkDispatch *cp)
{
subspace_store_free(cp->cache);
}

static void
destroy_chunk_insert_state(void *cis)
{
chunk_insert_state_destroy((ChunkInsertState *) cis);
}

/*
* Get the chunk insert state for the chunk that matches the given point in the
* partitioned hyperspace.
*/
extern ChunkInsertState *
chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *point)
{
ChunkInsertState *cis;

cis = subspace_store_get(dispatch->cache, point);

if (NULL == cis)
{
Chunk *new_chunk;

new_chunk = hypertable_get_chunk(dispatch->hypertable, point);

if (NULL == new_chunk)
elog(ERROR, "No chunk found or created");

cis = chunk_insert_state_create(new_chunk, dispatch);
subspace_store_add(dispatch->cache, new_chunk->cube, cis, destroy_chunk_insert_state);
}

return cis;
}
29 changes: 29 additions & 0 deletions src/chunk_dispatch.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#ifndef TIMESCALEDB_CHUNK_DISPATCH_H
#define TIMESCALEDB_CHUNK_DISPATCH_H

#include <postgres.h>
#include <nodes/execnodes.h>

#include "hypertable_cache.h"
#include "cache.h"
#include "subspace_store.h"

/* ChunkDispatch keeps info needed to dispatch tuples to chunks. */
typedef struct ChunkDispatch
{
Hypertable *hypertable;
SubspaceStore *cache;
EState *estate;
/* Keep a pointer to the original (hypertable's) ResultRelInfo since we will
* reset the pointer in EState as we lookup new chunks. */
ResultRelInfo *hypertable_result_rel_info;
} ChunkDispatch;

typedef struct Point Point;
typedef struct ChunkInsertState ChunkInsertState;

ChunkDispatch *chunk_dispatch_create(Hypertable *, EState *);
void chunk_dispatch_destroy(ChunkDispatch *);
ChunkInsertState *chunk_dispatch_get_chunk_insert_state(ChunkDispatch *, Point *);

#endif /* TIMESCALEDB_CHUNK_DISPATCH_H */
69 changes: 69 additions & 0 deletions src/chunk_dispatch_plan.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#include <postgres.h>
#include <nodes/extensible.h>
#include <nodes/makefuncs.h>
#include <nodes/nodeFuncs.h>
#include <utils/rel.h>
#include <catalog/pg_type.h>

#include "chunk_dispatch_plan.h"
#include "chunk_dispatch_state.h"

/*
* Create a ChunkDispatchState node from this plan. This is the full execution
* state that replaces the plan node as the plan moves from planning to
* execution.
*/
static Node *
create_chunk_dispatch_state(CustomScan *cscan)
{
ChunkDispatchInfo *info = linitial(cscan->custom_private);

return (Node *) chunk_dispatch_state_create(info->hypertable_relid,
linitial(cscan->custom_plans));
}

static CustomScanMethods chunk_dispatch_plan_methods = {
.CustomName = "ChunkDispatch",
.CreateCustomScanState = create_chunk_dispatch_state,
};

/* Create a chunk dispatch plan node in the form of a CustomScan node. The
* purpose of this plan node is to dispatch (route) tuples to the correct chunk
* in a hypertable.
*
* Note that CustomScan nodes cannot be extended (by struct embedding) because
* they might be copied, therefore we pass any extra info as a ChunkDispatchInfo
* in the custom_private field.
*
* The chunk dispatch plan takes the original tuple-producing subplan, which was
* part of a ModifyTable node, and uses this subplan to produce new tuples to
* dispatch.
*/
CustomScan *
chunk_dispatch_plan_create(Plan *subplan, Oid hypertable_relid)
{
CustomScan *cscan = makeNode(CustomScan);
ChunkDispatchInfo *info = palloc(sizeof(ChunkDispatchInfo));

info->hypertable_relid = hypertable_relid;
cscan->custom_private = list_make1(info);
cscan->methods = &chunk_dispatch_plan_methods;
cscan->custom_plans = list_make1(subplan);
cscan->scan.scanrelid = 0; /* Indicate this is not a real relation we are
* scanning */

/* Copy costs from the original plan */
cscan->scan.plan.startup_cost = subplan->startup_cost;
cscan->scan.plan.total_cost = subplan->total_cost;
cscan->scan.plan.plan_rows = subplan->plan_rows;
cscan->scan.plan.plan_width = subplan->plan_width;

/*
* Copy target list from parent table. This should work since hypertables
* mandate that chunks have identical column definitions
*/
cscan->scan.plan.targetlist = subplan->targetlist;
cscan->custom_scan_tlist = NIL;

return cscan;
}
14 changes: 14 additions & 0 deletions src/chunk_dispatch_plan.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#ifndef TIMESCALEDB_CHUNK_DISPATCH_PLAN_H
#define TIMESCALEDB_CHUNK_DISPATCH_PLAN_H

#include <postgres.h>
#include <nodes/plannodes.h>

typedef struct ChunkDispatchInfo
{
Oid hypertable_relid;
} ChunkDispatchInfo;

extern CustomScan *chunk_dispatch_plan_create(Plan *subplan, Oid hypertable_relid);

#endif /* TIMESCALEDB_CHUNK_DISPATCH_PLAN_H */

0 comments on commit 1f3dcd8

Please sign in to comment.