Skip to content

Commit

Permalink
Introduce a FDW option to mark reference tables
Browse files Browse the repository at this point in the history
With this patch, the ability to mark reference tables (tables that exist
on all data nodes of a multi-node installation) via an FDW option has
been added.
  • Loading branch information
jnidzwetzki committed Jan 17, 2023
1 parent 5c897ff commit 0a5057c
Show file tree
Hide file tree
Showing 10 changed files with 877 additions and 0 deletions.
65 changes: 65 additions & 0 deletions tsl/src/fdw/option.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,20 @@
#include "scan_plan.h"

#include <access/reloptions.h>
#include <catalog/namespace.h>
#include <catalog/pg_foreign_server.h>
#include <catalog/pg_foreign_table.h>
#include <catalog/pg_foreign_data_wrapper.h>
#include <commands/defrem.h>
#include <commands/extension.h>
#include <utils/builtins.h>
#include <utils/regproc.h>
#include <utils/varlena.h>
#include <libpq-fe.h>

#include <remote/connection.h>
#include "option.h"
#include "chunk.h"

/*
* Describes the valid options for objects that this wrapper uses.
Expand Down Expand Up @@ -137,6 +140,11 @@ option_validate(List *options_list, Oid catalog)
/* This will throw an error if not a boolean */
defGetBoolean(def);
}
else if (strcmp(def->defname, "join_reference_tables") == 0)
{
/* check and store list, warn about non existing tables */
(void) option_extract_join_ref_table_list(defGetString(def));
}
}
}

Expand All @@ -160,6 +168,8 @@ init_ts_fdw_options(void)
{ "fetch_size", ForeignDataWrapperRelationId },
{ "fetch_size", ForeignServerRelationId },
{ "available", ForeignServerRelationId },
/* join reference tables */
{ "join_reference_tables", ForeignDataWrapperRelationId },
{ NULL, InvalidOid }
};

Expand Down Expand Up @@ -273,3 +283,58 @@ option_get_from_options_list_int(List *options, const char *optionname, int *val

return found;
}

List *
option_extract_join_ref_table_list(const char *table_string)
{
List *ref_table_oids = NIL;
List *ref_table_list;
ListCell *lc;

/* SplitIdentifierString scribbles on its input, so pstrdup first */
if (!SplitIdentifierString(pstrdup(table_string), ',', &ref_table_list))
{
/* syntax error in name list */
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("parameter \"%s\" must be a comma-separated list of reference table names",
"join_reference_tables")));
}

foreach (lc, ref_table_list)
{
char *tablename = (char *) lfirst(lc);

RangeVar *rangevar = makeRangeVarFromNameList(stringToQualifiedNameList(tablename));

Oid relOid = RangeVarGetRelidExtended(rangevar,
AccessShareLock,
RVR_MISSING_OK,
NULL /* callback */,
NULL /* callback args*/);

if (!OidIsValid(relOid))
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("table \"%s\" does not exist", tablename)));
}

/* Validate the relation type */
Relation rel = table_open(relOid, NoLock);

if (rel->rd_rel->relkind != RELKIND_RELATION)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("relation \"%s\" is not an ordinary table. Only ordinary tables can be "
"used as reference tables",
tablename)));

ref_table_oids = lappend_oid(ref_table_oids, relOid);
table_close(rel, NoLock);
}

list_free(ref_table_list);

return ref_table_oids;
}
1 change: 1 addition & 0 deletions tsl/src/fdw/option.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

extern void option_validate(List *options_list, Oid catalog);
extern List *option_extract_extension_list(const char *extensions_string, bool warn_on_missing);
extern List *option_extract_join_ref_table_list(const char *join_tables);
extern bool option_get_from_options_list_int(List *options, const char *optionname, int *value);

#endif /* TIMESCALEDB_TSL_FDW_OPTION_H */
7 changes: 7 additions & 0 deletions tsl/src/fdw/relinfo.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ apply_fdw_and_server_options(TsFdwRelInfo *fpinfo)
option_extract_extension_list(defGetString(def), false));
else if (strcmp(def->defname, "fetch_size") == 0)
fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
else if (strcmp(def->defname, "join_reference_tables") == 0)
{
/* This option can only be defined per FDW. So, no list_concat of
* FDW and server options is needed. */
fpinfo->join_reference_tables =
option_extract_join_ref_table_list(defGetString(def));
}
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions tsl/src/fdw/relinfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ typedef struct TsFdwRelInfo

/* Cached chunk data for the chunk relinfo. */
struct Chunk *chunk;

/* OIDs of join reference tables. */
List *join_reference_tables;
} TsFdwRelInfo;

extern TsFdwRelInfo *fdw_relinfo_create(PlannerInfo *root, RelOptInfo *rel, Oid server_oid,
Expand Down
173 changes: 173 additions & 0 deletions tsl/test/expected/dist_ref_table_join-12.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER
\set ECHO all
\set DATA_NODE_1 :TEST_DBNAME _1
\set DATA_NODE_2 :TEST_DBNAME _2
\set DATA_NODE_3 :TEST_DBNAME _3
-- Add data nodes
SELECT node_name, database, node_created, database_created, extension_created
FROM (
SELECT (add_data_node(name, host => 'localhost', DATABASE => name)).*
FROM (VALUES (:'DATA_NODE_1'), (:'DATA_NODE_2'), (:'DATA_NODE_3')) v(name)
) a;
node_name | database | node_created | database_created | extension_created
--------------------------+--------------------------+--------------+------------------+-------------------
db_dist_ref_table_join_1 | db_dist_ref_table_join_1 | t | t | t
db_dist_ref_table_join_2 | db_dist_ref_table_join_2 | t | t | t
db_dist_ref_table_join_3 | db_dist_ref_table_join_3 | t | t | t
(3 rows)

GRANT USAGE ON FOREIGN SERVER :DATA_NODE_1, :DATA_NODE_2, :DATA_NODE_3 TO PUBLIC;
\des
List of foreign servers
Name | Owner | Foreign-data wrapper
--------------------------+--------------------+----------------------
db_dist_ref_table_join_1 | cluster_super_user | timescaledb_fdw
db_dist_ref_table_join_2 | cluster_super_user | timescaledb_fdw
db_dist_ref_table_join_3 | cluster_super_user | timescaledb_fdw
(3 rows)

drop table if exists metric;
NOTICE: table "metric" does not exist, skipping
CREATE table metric(ts timestamptz, id int, value float);
SELECT create_distributed_hypertable('metric', 'ts', 'id');
NOTICE: adding not-null constraint to column "ts"
create_distributed_hypertable
-------------------------------
(1,public,metric,t)
(1 row)

INSERT into metric values ('2022-02-02 02:02:02+03', 1, 50);
INSERT into metric values ('2020-01-01 01:01:01+03', 1, 60);
INSERT into metric values ('2000-03-03 03:03:03+03', 1, 70);
INSERT into metric values ('2000-04-04 04:04:03+03', 2, 80);
-- Reference table with generic replication
CREATE table metric_name(id int primary key, name text);
INSERT into metric_name values (1, 'cpu1');
INSERT into metric_name values (2, 'cpu2');
CALL distributed_exec($$CREATE table metric_name(id int primary key, name text);$$);
CALL distributed_exec($$INSERT into metric_name values (1, 'cpu1');$$);
CALL distributed_exec($$INSERT into metric_name values (2, 'cpu2');$$);
-- The reference table as DHT
CREATE TABLE metric_name_dht(id BIGSERIAL, name text);
SELECT create_distributed_hypertable('metric_name_dht', 'id', chunk_time_interval => 9223372036854775807, replication_factor => 3);
create_distributed_hypertable
-------------------------------
(2,public,metric_name_dht,t)
(1 row)

INSERT into metric_name_dht (id, name) values (1, 'cpu1');
INSERT into metric_name_dht (id, name) values (2, 'cpu2');
-- A local version of the reference table
CREATE table metric_name_local(id int primary key, name text);
INSERT into metric_name_local values (1, 'cpu1');
INSERT into metric_name_local values (2, 'cpu2');
CREATE table reference_table2(id int primary key, name text);
SELECT create_distributed_hypertable('reference_table2', 'id', chunk_time_interval => 2147483647, replication_factor => 3);
create_distributed_hypertable
-------------------------------
(3,public,reference_table2,t)
(1 row)

CREATE table local_table(id int primary key, name text);
SET client_min_messages TO WARNING;
-- Create a table in a different schema
CREATE SCHEMA test1;
GRANT CREATE ON SCHEMA test1 TO :ROLE_DEFAULT_PERM_USER;
GRANT USAGE ON SCHEMA test1 TO :ROLE_DEFAULT_PERM_USER;
CREATE table test1.table_in_schema(id int primary key, name text);
SELECT fdwoptions FROM pg_foreign_data_wrapper WHERE fdwname = 'timescaledb_fdw';
fdwoptions
------------

(1 row)

ALTER FOREIGN DATA WRAPPER timescaledb_fdw OPTIONS (ADD join_reference_tables 'metric_name, reference_table2');
ALTER FOREIGN DATA WRAPPER timescaledb_fdw OPTIONS (SET join_reference_tables 'metric_name, metric_name_dht');
SELECT fdwoptions FROM pg_foreign_data_wrapper WHERE fdwname = 'timescaledb_fdw';
fdwoptions
--------------------------------------------------------
{"join_reference_tables=metric_name, metric_name_dht"}
(1 row)

\set ON_ERROR_STOP 0
-- Try to declare a non existing table as reference table
ALTER FOREIGN DATA WRAPPER timescaledb_fdw OPTIONS (SET join_reference_tables 'metric_name, reference_table2, non_existing_table');
ERROR: table "non_existing_table" does not exist
SELECT fdwoptions FROM pg_foreign_data_wrapper WHERE fdwname = 'timescaledb_fdw';
fdwoptions
--------------------------------------------------------
{"join_reference_tables=metric_name, metric_name_dht"}
(1 row)

-- Try to declare a hypertable as reference table
ALTER FOREIGN DATA WRAPPER timescaledb_fdw OPTIONS (SET join_reference_tables 'metric_name, reference_table2, metric');
SELECT fdwoptions FROM pg_foreign_data_wrapper WHERE fdwname = 'timescaledb_fdw';
fdwoptions
-----------------------------------------------------------------
{"join_reference_tables=metric_name, reference_table2, metric"}
(1 row)

-- Try to add an empty field
ALTER FOREIGN DATA WRAPPER timescaledb_fdw OPTIONS (SET join_reference_tables 'metric_name, , metric');
ERROR: parameter "join_reference_tables" must be a comma-separated list of reference table names
SELECT fdwoptions FROM pg_foreign_data_wrapper WHERE fdwname = 'timescaledb_fdw';
fdwoptions
-----------------------------------------------------------------
{"join_reference_tables=metric_name, reference_table2, metric"}
(1 row)

-- Try to declare a view as reference table
CREATE VIEW metric_name_view AS SELECT * FROM metric_name;
ALTER FOREIGN DATA WRAPPER timescaledb_fdw OPTIONS (SET join_reference_tables 'metric_name, metric_name_view');
ERROR: relation "metric_name_view" is not an ordinary table. Only ordinary tables can be used as reference tables
SELECT fdwoptions FROM pg_foreign_data_wrapper WHERE fdwname = 'timescaledb_fdw';
fdwoptions
-----------------------------------------------------------------
{"join_reference_tables=metric_name, reference_table2, metric"}
(1 row)

-- Try to use a table in a schema
ALTER FOREIGN DATA WRAPPER timescaledb_fdw OPTIONS (SET join_reference_tables 'test1.table_in_schema');
SELECT fdwoptions FROM pg_foreign_data_wrapper WHERE fdwname = 'timescaledb_fdw';
fdwoptions
-----------------------------------------------
{join_reference_tables=test1.table_in_schema}
(1 row)

-- Try to use a non-existing table in a schema
ALTER FOREIGN DATA WRAPPER timescaledb_fdw OPTIONS (SET join_reference_tables 'test1.table_in_schema_non_existing');
ERROR: table "test1.table_in_schema_non_existing" does not exist
SELECT fdwoptions FROM pg_foreign_data_wrapper WHERE fdwname = 'timescaledb_fdw';
fdwoptions
-----------------------------------------------
{join_reference_tables=test1.table_in_schema}
(1 row)

\set ON_ERROR_STOP 1
-- Set empty options
ALTER FOREIGN DATA WRAPPER timescaledb_fdw OPTIONS (SET join_reference_tables '');
SELECT fdwoptions FROM pg_foreign_data_wrapper WHERE fdwname = 'timescaledb_fdw';
fdwoptions
--------------------------
{join_reference_tables=}
(1 row)

-- Remove options
ALTER FOREIGN DATA WRAPPER timescaledb_fdw OPTIONS (DROP join_reference_tables);
SELECT fdwoptions FROM pg_foreign_data_wrapper WHERE fdwname = 'timescaledb_fdw';
fdwoptions
------------

(1 row)

-- Set options again
ALTER FOREIGN DATA WRAPPER timescaledb_fdw OPTIONS (ADD join_reference_tables 'metric_name, metric_name_dht, reference_table2');
SELECT fdwoptions FROM pg_foreign_data_wrapper WHERE fdwname = 'timescaledb_fdw';
fdwoptions
--------------------------------------------------------------------------
{"join_reference_tables=metric_name, metric_name_dht, reference_table2"}
(1 row)

0 comments on commit 0a5057c

Please sign in to comment.