forked from timescale/timescaledb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
create.c
3421 lines (3083 loc) · 114 KB
/
create.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/
/* This file contains the code for processing continuous aggregate
* DDL statements which are of the form:
*
* CREATE MATERIALIZED VIEW <name> WITH (ts_continuous = [option] )
* AS <select query>
* The entry point for the code is
* tsl_process_continuous_agg_viewstmt
* The bulk of the code that creates the underlying tables/views etc. is in
* cagg_create.
*
*/
#include <postgres.h>
#include <access/reloptions.h>
#include <access/sysattr.h>
#include <access/xact.h>
#include <catalog/index.h>
#include <catalog/indexing.h>
#include <catalog/pg_aggregate.h>
#include <catalog/pg_collation.h>
#include <catalog/pg_namespace.h>
#include <catalog/pg_trigger.h>
#include <catalog/pg_type.h>
#include <catalog/toasting.h>
#include <commands/defrem.h>
#include <commands/tablecmds.h>
#include <commands/tablespace.h>
#include <commands/trigger.h>
#include <commands/view.h>
#include <miscadmin.h>
#include <nodes/makefuncs.h>
#include <nodes/nodeFuncs.h>
#include <nodes/nodes.h>
#include <nodes/parsenodes.h>
#include <nodes/pg_list.h>
#include <optimizer/clauses.h>
#include <optimizer/optimizer.h>
#include <optimizer/tlist.h>
#include <parser/analyze.h>
#include <parser/parse_func.h>
#include <parser/parse_oper.h>
#include <parser/parse_relation.h>
#include <parser/parse_type.h>
#include <rewrite/rewriteHandler.h>
#include <rewrite/rewriteManip.h>
#include <utils/acl.h>
#include <utils/rel.h>
#include <utils/builtins.h>
#include <utils/catcache.h>
#include <utils/regproc.h>
#include <utils/ruleutils.h>
#include <utils/syscache.h>
#include <utils/typcache.h>
#include <optimizer/prep.h>
#include "create.h"
#include "ts_catalog/catalog.h"
#include "ts_catalog/continuous_agg.h"
#include "dimension.h"
#include "extension_constants.h"
#include "func_cache.h"
#include "hypertable_cache.h"
#include "hypertable.h"
#include "invalidation.h"
#include "dimension.h"
#include "ts_catalog/continuous_agg.h"
#include "options.h"
#include "time_utils.h"
#include "utils.h"
#include "errors.h"
#include "refresh.h"
#include "remote/dist_commands.h"
#include "ts_catalog/hypertable_data_node.h"
#include "deparse.h"
#include "timezones.h"
/* Names of internal SQL functions referenced when building cagg queries */
#define FINALFN "finalize_agg"
#define PARTIALFN "partialize_agg"
#define CHUNKIDFROMRELID "chunk_id_from_relid"
/* Default name for the time partitioning column of the materialization table */
#define DEFAULT_MATPARTCOLUMN_NAME "time_partition_col"
/* Chunk interval of the materialization hypertable is this factor times the
 * raw hypertable's partitioning interval (see
 * mattablecolumninfo_create_materialization_table) */
#define MATPARTCOL_INTERVAL_FACTOR 10
#define HT_DEFAULT_CHUNKFN "calculate_chunk_interval"
#define CAGG_INVALIDATION_TRIGGER "continuous_agg_invalidation_trigger"
/* Name of the watermark function used as the boundary between the
 * materialized and real-time parts of a cagg query */
#define BOUNDARY_FUNCTION "cagg_watermark"
#define INTERNAL_TO_DATE_FUNCTION "to_date"
#define INTERNAL_TO_TSTZ_FUNCTION "to_timestamp"
#define INTERNAL_TO_TS_FUNCTION "to_timestamp_without_timezone"
/* Maximum number of relations allowed in a cagg query join */
#define CONTINUOUS_AGG_MAX_JOIN_RELATIONS 2
/*
 * Render a materialization-table column name "<type>_<resno>_<colno>" into
 * colbuf, a NAMEDATALEN-sized buffer. snprintf signals truncation by
 * returning a value >= the buffer size, so both error (< 0) and truncation
 * raise an internal error.
 */
#define PRINT_MATCOLNAME(colbuf, type, original_query_resno, colno) \
	do \
	{ \
		int ret = snprintf(colbuf, NAMEDATALEN, "%s_%d_%d", type, original_query_resno, colno); \
		if (ret < 0 || ret >= NAMEDATALEN) \
			ereport(ERROR, \
					(errcode(ERRCODE_INTERNAL_ERROR), \
					 errmsg("bad materialization table column name"))); \
	} while (0);
/*
 * Render an internal materialization object name into buf, a NAMEDATALEN-sized
 * buffer, using the printf-style `prefix` format and the hypertable id.
 *
 * snprintf reports truncation by returning the number of characters that
 * *would* have been written, i.e. a value >= the buffer size. The previous
 * check used `ret > NAMEDATALEN`, silently accepting a truncated name when
 * ret == NAMEDATALEN; use `>=` like PRINT_MATCOLNAME does.
 */
#define PRINT_MATINTERNAL_NAME(buf, prefix, hypertable_id) \
	do \
	{ \
		int ret = snprintf(buf, NAMEDATALEN, prefix, hypertable_id); \
		if (ret < 0 || ret >= NAMEDATALEN) \
		{ \
			ereport(ERROR, \
					(errcode(ERRCODE_INTERNAL_ERROR), \
					 errmsg("bad materialization internal name"))); \
		} \
	} while (0);
/*
 * Initialize `selquery` as a fresh SELECT Query node, copying query-level
 * metadata (source, id, canSetTag, utility statement) from `srcquery`.
 * hasAggs is forced to true and the range table is left empty for the
 * caller to fill in. Note that we set rowsecurity to false here.
 */
#define CAGG_MAKEQUERY(selquery, srcquery) \
	do \
	{ \
		(selquery) = makeNode(Query); \
		(selquery)->commandType = CMD_SELECT; \
		(selquery)->querySource = (srcquery)->querySource; \
		(selquery)->queryId = (srcquery)->queryId; \
		(selquery)->canSetTag = (srcquery)->canSetTag; \
		(selquery)->utilityStmt = copyObject((srcquery)->utilityStmt); \
		(selquery)->resultRelation = 0; \
		(selquery)->hasAggs = true; \
		(selquery)->hasRowSecurity = false; \
		(selquery)->rtable = NULL; \
	} while (0);
/*
 * Column information for the materialization table of a cagg, plus the
 * pieces of the partial select query that populate those columns.
 */
typedef struct MatTableColumnInfo
{
	List *matcollist;		 /* column definitions for the materialization tbl*/
	List *partial_seltlist;	 /* tlist entries for populating the materialization table columns */
	List *partial_grouplist; /* group clauses used for populating the materialization table */
	List *mat_groupcolname_list; /* names of columns that are populated by the group-by clause
									correspond to the partial_grouplist.
									time_bucket column is not included here: it is the
									matpartcolname */
	int matpartcolno;	  /* index of partitioning column in matcollist */
	char *matpartcolname; /* name of the partition column */
} MatTableColumnInfo;
/*
 * Pieces needed to build the finalize query, i.e. the query that computes
 * final aggregate values from the stored data.
 */
typedef struct FinalizeQueryInfo
{
	List *final_seltlist;	/* select target list for finalize query */
	Node *final_havingqual; /* HAVING qual for finalize query */
	Query *final_userquery; /* user query used to compute the finalize_query */
	bool finalized;			/* true if the cagg is in finalized form */
} FinalizeQueryInfo;
/*
 * Information about the hypertable being aggregated and the time_bucket
 * call of the cagg definition, filled in by caggtimebucket_validate().
 */
typedef struct CAggTimebucketInfo
{
	int32 htid;						/* hypertable id */
	int32 parent_mat_hypertable_id; /* parent materialization hypertable id */
	Oid htoid;						/* hypertable oid */
	AttrNumber htpartcolno;			/* primary partitioning column of raw hypertable */
									/* This should also be the column used by time_bucket */
	Oid htpartcoltype;				/* type of the partitioning column */
	int64 htpartcol_interval_len;	/* interval length setting for primary partitioning column */
	int64 bucket_width;				/* bucket_width of time_bucket, stores BUCKET_WIDTH_VARIABLE for
									   variable-sized buckets */
	Oid bucket_width_type;			/* type of bucket_width */
	Interval *interval;				/* stores the interval, NULL if not specified */
	const char *timezone;			/* the name of the timezone, NULL if not specified */
	FuncExpr *bucket_func;			/* function call expr of the bucketing function */
	/*
	 * Custom origin value stored as UTC timestamp.
	 * If not specified, stores infinity.
	 */
	Timestamp origin;
} CAggTimebucketInfo;
/*
 * Working state used while mapping expressions of the user query's target
 * list to materialization table columns (see the orig_vars/mapped_vars
 * notes below).
 */
typedef struct AggPartCxt
{
	struct MatTableColumnInfo *mattblinfo;
	bool added_aggref_col;
	bool var_outside_of_aggref; /* Set to true when you come across a Var that is not inside an
								   Aggref node */
	Oid ignore_aggoid;			/* aggregate Oid to skip, if any */
	int original_query_resno;	/* resno of the tlist entry being processed */
	/*
	 * "Original variables" are the Var nodes of the target list of the original
	 * CREATE MATERIALIZED VIEW query. "Mapped variables" are the Var nodes of the materialization
	 * table columns. The partialization query is the one that populates those columns. The
	 * finalization query should use the "mapped variables" to populate the user view.
	 */
	List *orig_vars; /* List of Var nodes that have been mapped to materialization table columns */
	List *mapped_vars; /* List of Var nodes of the corresponding materialization table columns */
	/* orig_vars and mapped_vars lists are mapped 1 to 1 */
} AggPartCxt;
/*
 * STATIC functions defined on the structs above, plus the query
 * construction/transformation helpers used during cagg creation.
 */
static void mattablecolumninfo_init(MatTableColumnInfo *matcolinfo, List *grouplist);
static Var *mattablecolumninfo_addentry(MatTableColumnInfo *out, Node *input,
										int original_query_resno, bool finalized,
										bool *skip_adding);
static void mattablecolumninfo_addinternal(MatTableColumnInfo *matcolinfo);
static int32 mattablecolumninfo_create_materialization_table(
	MatTableColumnInfo *matcolinfo, int32 hypertable_id, RangeVar *mat_rel,
	CAggTimebucketInfo *origquery_tblinfo, bool create_addl_index, char *tablespacename,
	char *table_access_method, ObjectAddress *mataddress);
static Query *mattablecolumninfo_get_partial_select_query(MatTableColumnInfo *mattblinfo,
														  Query *userview_query, bool finalized);
static void caggtimebucketinfo_init(CAggTimebucketInfo *src, int32 hypertable_id,
									Oid hypertable_oid, AttrNumber hypertable_partition_colno,
									Oid hypertable_partition_coltype,
									int64 hypertable_partition_col_interval,
									int32 parent_mat_hypertable_id);
static void caggtimebucket_validate(CAggTimebucketInfo *tbinfo, List *groupClause,
									List *targetList);
static void finalizequery_init(FinalizeQueryInfo *inp, Query *orig_query,
							   MatTableColumnInfo *mattblinfo);
static Query *finalizequery_get_select_query(FinalizeQueryInfo *inp, List *matcollist,
											 ObjectAddress *mattbladdress, char *relname);
static bool function_allowed_in_cagg_definition(Oid funcid);
static Const *cagg_boundary_make_lower_bound(Oid type);
static Oid cagg_get_boundary_converter_funcoid(Oid typoid);
static Oid relation_oid(NameData schema, NameData name);
static Query *build_union_query(CAggTimebucketInfo *tbinfo, int matpartcolno, Query *q1, Query *q2,
								int materialize_htid);
static Query *destroy_union_query(Query *q);
/*
 * Create an entry for the materialization table in catalog table
 * CONTINUOUS_AGGS.
 *
 * The insert is performed with elevated (catalog owner) privileges; the
 * original user is restored afterwards.
 */
static void
create_cagg_catalog_entry(int32 matht_id, int32 rawht_id, const char *user_schema,
						  const char *user_view, const char *partial_schema,
						  const char *partial_view, int64 bucket_width, bool materialized_only,
						  const char *direct_schema, const char *direct_view, const bool finalized,
						  const int32 parent_mat_hypertable_id)
{
	Catalog *catalog = ts_catalog_get();
	Relation rel;
	TupleDesc desc;
	NameData user_schnm, user_viewnm, partial_schnm, partial_viewnm, direct_schnm, direct_viewnm;
	Datum values[Natts_continuous_agg];
	bool nulls[Natts_continuous_agg] = { false };
	CatalogSecurityContext sec_ctx;

	/* Copy the schema/view names into fixed-size NameData buffers */
	namestrcpy(&user_schnm, user_schema);
	namestrcpy(&user_viewnm, user_view);
	namestrcpy(&partial_schnm, partial_schema);
	namestrcpy(&partial_viewnm, partial_view);
	namestrcpy(&direct_schnm, direct_schema);
	namestrcpy(&direct_viewnm, direct_view);
	rel = table_open(catalog_get_table_id(catalog, CONTINUOUS_AGG), RowExclusiveLock);
	desc = RelationGetDescr(rel);

	memset(values, 0, sizeof(values));
	values[AttrNumberGetAttrOffset(Anum_continuous_agg_mat_hypertable_id)] = matht_id;
	values[AttrNumberGetAttrOffset(Anum_continuous_agg_raw_hypertable_id)] = rawht_id;
	/* A cagg without a parent stores NULL for the parent hypertable id */
	if (parent_mat_hypertable_id == INVALID_HYPERTABLE_ID)
		nulls[AttrNumberGetAttrOffset(Anum_continuous_agg_parent_mat_hypertable_id)] = true;
	else
	{
		values[AttrNumberGetAttrOffset(Anum_continuous_agg_parent_mat_hypertable_id)] =
			parent_mat_hypertable_id;
	}
	values[AttrNumberGetAttrOffset(Anum_continuous_agg_user_view_schema)] =
		NameGetDatum(&user_schnm);
	values[AttrNumberGetAttrOffset(Anum_continuous_agg_user_view_name)] =
		NameGetDatum(&user_viewnm);
	values[AttrNumberGetAttrOffset(Anum_continuous_agg_partial_view_schema)] =
		NameGetDatum(&partial_schnm);
	values[AttrNumberGetAttrOffset(Anum_continuous_agg_partial_view_name)] =
		NameGetDatum(&partial_viewnm);
	values[AttrNumberGetAttrOffset(Anum_continuous_agg_bucket_width)] = Int64GetDatum(bucket_width);
	values[AttrNumberGetAttrOffset(Anum_continuous_agg_direct_view_schema)] =
		NameGetDatum(&direct_schnm);
	values[AttrNumberGetAttrOffset(Anum_continuous_agg_direct_view_name)] =
		NameGetDatum(&direct_viewnm);
	values[AttrNumberGetAttrOffset(Anum_continuous_agg_materialize_only)] =
		BoolGetDatum(materialized_only);
	values[AttrNumberGetAttrOffset(Anum_continuous_agg_finalized)] = BoolGetDatum(finalized);

	/* Insert as the catalog owner, then restore the original user */
	ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
	ts_catalog_insert_values(rel, desc, values, nulls);
	ts_catalog_restore_user(&sec_ctx);
	table_close(rel, RowExclusiveLock);
}
/*
 * Create an entry for the materialization table in catalog table
 * CONTINUOUS_AGGS_BUCKET_FUNCTION, recording the bucketing function used by
 * the cagg. A NULL timezone is stored as an empty string.
 */
static void
create_bucket_function_catalog_entry(int32 matht_id, bool experimental, const char *name,
									 const char *bucket_width, const char *origin,
									 const char *timezone)
{
	Catalog *catalog = ts_catalog_get();
	Relation rel;
	TupleDesc desc;
	Datum values[Natts_continuous_aggs_bucket_function];
	bool nulls[Natts_continuous_aggs_bucket_function] = { false };
	CatalogSecurityContext sec_ctx;

	rel = table_open(catalog_get_table_id(catalog, CONTINUOUS_AGGS_BUCKET_FUNCTION),
					 RowExclusiveLock);
	desc = RelationGetDescr(rel);

	memset(values, 0, sizeof(values));
	values[AttrNumberGetAttrOffset(Anum_continuous_agg_mat_hypertable_id)] = matht_id;
	values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_experimental)] =
		BoolGetDatum(experimental);
	values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_name)] =
		CStringGetTextDatum(name);
	values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_bucket_width)] =
		CStringGetTextDatum(bucket_width);
	values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_origin)] =
		CStringGetTextDatum(origin);
	values[AttrNumberGetAttrOffset(Anum_continuous_aggs_bucket_function_timezone)] =
		CStringGetTextDatum(timezone ? timezone : "");

	/* Insert as the catalog owner, then restore the original user */
	ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
	ts_catalog_insert_values(rel, desc, values, nulls);
	ts_catalog_restore_user(&sec_ctx);
	table_close(rel, RowExclusiveLock);
}
/* Create a hypertable for the table referred to by mat_tbloid.
 * matpartcolname - partition column for the hypertable
 * mat_tbltimecol_interval - the partitioning column's chunk interval for the
 *                           hypertable partition
 * The open ("time") dimension is created with INT8 type.
 */
static void
cagg_create_hypertable(int32 hypertable_id, Oid mat_tbloid, const char *matpartcolname,
					   int64 mat_tbltimecol_interval)
{
	bool created;
	int flags = 0;
	NameData mat_tbltimecol;
	DimensionInfo *time_dim_info;
	ChunkSizingInfo *chunk_sizing_info;

	namestrcpy(&mat_tbltimecol, matpartcolname);
	time_dim_info = ts_dimension_info_create_open(mat_tbloid,
												  &mat_tbltimecol,
												  Int64GetDatum(mat_tbltimecol_interval),
												  INT8OID,
												  InvalidOid);
	/* Ideally would like to change/expand the API so setting the column name manually is
	 * unnecessary, but not high priority */
	chunk_sizing_info = ts_chunk_sizing_info_get_default_disabled(mat_tbloid);
	chunk_sizing_info->colname = matpartcolname;
	created = ts_hypertable_create_from_info(mat_tbloid,
											 hypertable_id,
											 flags,
											 time_dim_info,
											 NULL,
											 NULL,
											 NULL,
											 chunk_sizing_info,
											 HYPERTABLE_REGULAR,
											 NULL);
	if (!created)
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("could not create materialization hypertable")));
}
/*
 * Return true if a trigger named `trigname` already exists on relation
 * `relid`. Scans pg_trigger via the (tgrelid, tgname) index.
 */
static bool
check_trigger_exists_hypertable(Oid relid, char *trigname)
{
	bool found = false;
	ScanKeyData scankey[1];
	Relation pg_trigger_rel;
	SysScanDesc scan;
	HeapTuple tup;

	pg_trigger_rel = table_open(TriggerRelationId, AccessShareLock);
	ScanKeyInit(&scankey[0],
				Anum_pg_trigger_tgrelid,
				BTEqualStrategyNumber,
				F_OIDEQ,
				ObjectIdGetDatum(relid));
	scan = systable_beginscan(pg_trigger_rel, TriggerRelidNameIndexId, true, NULL, 1, scankey);

	/* Walk all triggers of relid until a name match is found */
	for (tup = systable_getnext(scan); HeapTupleIsValid(tup) && !found;
		 tup = systable_getnext(scan))
	{
		Form_pg_trigger form = (Form_pg_trigger) GETSTRUCT(tup);

		if (namestrcmp(&form->tgname, trigname) == 0)
			found = true;
	}

	systable_endscan(scan);
	table_close(pg_trigger_rel, AccessShareLock);

	return found;
}
/* Add the continuous aggregate invalidation trigger to the hypertable.
 * relid - oid of the hypertable
 * hypertable_id - argument to pass to the trigger (the hypertable id from the
 *                 timescaledb catalog)
 *
 * Idempotent: returns early if the trigger already exists. For distributed
 * hypertables the trigger is first created on every data node, then on the
 * access node as well.
 */
static void
cagg_add_trigger_hypertable(Oid relid, int32 hypertable_id)
{
	/* Large enough for any int32 rendered by pg_ltoa (11 chars + NUL) */
	char hypertable_id_str[12];
	ObjectAddress objaddr;
	char *relname = get_rel_name(relid);
	Oid schemaid = get_rel_namespace(relid);
	char *schema = get_namespace_name(schemaid);
	Cache *hcache;
	Hypertable *ht;
	/* AFTER ROW trigger on INSERT/UPDATE/DELETE; args filled in per target */
	CreateTrigStmt stmt_template = {
		.type = T_CreateTrigStmt,
		.row = true,
		.timing = TRIGGER_TYPE_AFTER,
		.trigname = CAGGINVAL_TRIGGER_NAME,
		.relation = makeRangeVar(schema, relname, -1),
		.funcname =
			list_make2(makeString(INTERNAL_SCHEMA_NAME), makeString(CAGG_INVALIDATION_TRIGGER)),
		.args = NIL, /* to be filled in later */
		.events = TRIGGER_TYPE_INSERT | TRIGGER_TYPE_UPDATE | TRIGGER_TYPE_DELETE,
	};

	if (check_trigger_exists_hypertable(relid, CAGGINVAL_TRIGGER_NAME))
		return;
	ht = ts_hypertable_cache_get_cache_and_entry(relid, CACHE_FLAG_NONE, &hcache);
	if (hypertable_is_distributed(ht))
	{
		DistCmdResult *result;
		List *data_node_list = ts_hypertable_get_data_node_name_list(ht);
		List *cmd_descriptors = NIL; /* same order as ht->data_nodes */
		DistCmdDescr *cmd_descr_data = NULL;
		ListCell *cell;
		unsigned i = 0;

		/* Build one deparsed CREATE TRIGGER command per data node; the trigger
		 * arguments are the node-local and access-node hypertable ids */
		cmd_descr_data = palloc(list_length(data_node_list) * sizeof(*cmd_descr_data));
		foreach (cell, ht->data_nodes)
		{
			HypertableDataNode *node = lfirst(cell);
			char node_hypertable_id_str[12];
			CreateTrigStmt remote_stmt = stmt_template;

			pg_ltoa(node->fd.node_hypertable_id, node_hypertable_id_str);
			pg_ltoa(node->fd.hypertable_id, hypertable_id_str);
			remote_stmt.args =
				list_make2(makeString(node_hypertable_id_str), makeString(hypertable_id_str));
			cmd_descr_data[i].sql = deparse_create_trigger(&remote_stmt);
			cmd_descr_data[i].params = NULL;
			cmd_descriptors = lappend(cmd_descriptors, &cmd_descr_data[i++]);
		}
		result =
			ts_dist_multi_cmds_params_invoke_on_data_nodes(cmd_descriptors, data_node_list, true);
		if (result)
			ts_dist_cmd_close_response(result);
		/*
		 * FALL-THROUGH
		 * We let the Access Node create a trigger as well, even though it is not used for data
		 * modifications. We use the Access Node trigger as a check for existence of the remote
		 * triggers.
		 */
	}
	CreateTrigStmt local_stmt = stmt_template;
	pg_ltoa(hypertable_id, hypertable_id_str);
	local_stmt.args = list_make1(makeString(hypertable_id_str));
	objaddr = ts_hypertable_create_trigger(ht, &local_stmt, NULL);
	if (!OidIsValid(objaddr.objectId))
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("could not create continuous aggregate trigger")));
	ts_cache_release(hcache);
}
/* Add additional indexes to the materialization table for the columns derived
 * from the group-by column list of the partial select query.
 * If the partial select query has:
 *   GROUP BY timebucket_expr, <grpcol1, grpcol2, grpcol3 ...>
 * the indexes on the mat table are <grpcol1, timebucketcol>,
 * <grpcol2, timebucketcol> ... and so on,
 * i.e. #indexes = (#grp-cols - 1)
 */
static void
mattablecolumninfo_add_mattable_index(MatTableColumnInfo *matcolinfo, Hypertable *ht)
{
	/* idxname is NULL so the system chooses the index name */
	IndexStmt stmt = {
		.type = T_IndexStmt,
		.accessMethod = DEFAULT_INDEX_TYPE,
		.idxname = NULL,
		.relation = makeRangeVar(NameStr(ht->fd.schema_name), NameStr(ht->fd.table_name), 0),
		.tableSpace = get_tablespace_name(get_rel_tablespace(ht->main_table_relid)),
	};
	/* The time partitioning column is always the second, DESC-ordered key */
	IndexElem timeelem = { .type = T_IndexElem,
						   .name = matcolinfo->matpartcolname,
						   .ordering = SORTBY_DESC };
	ListCell *le = NULL;

	foreach (le, matcolinfo->mat_groupcolname_list)
	{
		NameData indxname;
		ObjectAddress indxaddr;
		HeapTuple indxtuple;
		char *grpcolname = (char *) lfirst(le);
		IndexElem grpelem = { .type = T_IndexElem, .name = grpcolname };

		stmt.indexParams = list_make2(&grpelem, &timeelem);
		indxaddr = DefineIndex(ht->main_table_relid,
							   &stmt,
							   InvalidOid, /* indexRelationId */
							   InvalidOid, /* parentIndexId */
							   InvalidOid, /* parentConstraintId */
							   false,	   /* is_alter_table */
							   false,	   /* check_rights */
							   false,	   /* check_not_in_use */
							   false,	   /* skip_build */
							   false);	   /* quiet */
		/* Fetch the system-chosen index name for the debug log below */
		indxtuple = SearchSysCache1(RELOID, ObjectIdGetDatum(indxaddr.objectId));
		if (!HeapTupleIsValid(indxtuple))
			elog(ERROR, "cache lookup failed for index relid %u", indxaddr.objectId);
		indxname = ((Form_pg_class) GETSTRUCT(indxtuple))->relname;
		elog(DEBUG1,
			 "adding index %s ON %s.%s USING BTREE(%s, %s)",
			 NameStr(indxname),
			 NameStr(ht->fd.schema_name),
			 NameStr(ht->fd.table_name),
			 grpcolname,
			 matcolinfo->matpartcolname);
		ReleaseSysCache(indxtuple);
	}
}
/*
 * Create the materialization hypertable root by faking up a
 * CREATE TABLE parsetree and passing it to DefineRelation.
 * Reuse the information from ViewStmt:
 *   Remove the options on the into clause that we will not honour
 *   Modify the relname to ts_internal_<name>
 *
 * Parameters:
 *   mat_rel: relation information for the materialization table
 *   origquery_tblinfo: user query's table information, used for setting up
 *       the partitioning of the hypertable.
 *   tablespace_name: Name of the tablespace for the materialization table.
 *   table_access_method: Name of the table access method to use for the
 *       materialization table.
 *   mataddress: returns the ObjectAddress of the table.
 *
 * RETURNS: hypertable id of the materialization table.
 */
static int32
mattablecolumninfo_create_materialization_table(MatTableColumnInfo *matcolinfo, int32 hypertable_id,
												RangeVar *mat_rel,
												CAggTimebucketInfo *origquery_tblinfo,
												bool create_addl_index, char *const tablespacename,
												char *const table_access_method,
												ObjectAddress *mataddress)
{
	Oid uid, saved_uid;
	int sec_ctx;
	char *matpartcolname = matcolinfo->matpartcolname;
	CreateStmt *create;
	Datum toast_options;
	int64 matpartcol_interval;
	static char *validnsps[] = HEAP_RELOPT_NAMESPACES;
	int32 mat_htid;
	Oid mat_relid;
	Cache *hcache;
	Hypertable *mat_ht = NULL, *orig_ht = NULL;
	Oid owner = GetUserId();

	/* Build a plain CREATE TABLE statement from the materialization columns */
	create = makeNode(CreateStmt);
	create->relation = mat_rel;
	create->tableElts = matcolinfo->matcollist;
	create->inhRelations = NIL;
	create->ofTypename = NULL;
	create->constraints = NIL;
	create->options = NULL;
	create->oncommit = ONCOMMIT_NOOP;
	create->tablespacename = tablespacename;
	create->accessMethod = table_access_method;
	create->if_not_exists = false;

	/* Create the materialization table. */
	SWITCH_TO_TS_USER(mat_rel->schemaname, uid, saved_uid, sec_ctx);
	*mataddress = DefineRelation(create, RELKIND_RELATION, owner, NULL, NULL);
	CommandCounterIncrement();
	mat_relid = mataddress->objectId;

	/* NewRelationCreateToastTable calls CommandCounterIncrement */
	toast_options =
		transformRelOptions((Datum) 0, create->options, "toast", validnsps, true, false);
	(void) heap_reloptions(RELKIND_TOASTVALUE, toast_options, true);
	NewRelationCreateToastTable(mat_relid, toast_options);
	RESTORE_USER(uid, saved_uid, sec_ctx);

	/* Convert the mat. table to a hypertable; its chunk interval is a fixed
	 * multiple of the raw hypertable's partitioning interval */
	matpartcol_interval = MATPARTCOL_INTERVAL_FACTOR * (origquery_tblinfo->htpartcol_interval_len);
	cagg_create_hypertable(hypertable_id, mat_relid, matpartcolname, matpartcol_interval);

	/* retrieve the hypertable id from the cache */
	mat_ht = ts_hypertable_cache_get_cache_and_entry(mat_relid, CACHE_FLAG_NONE, &hcache);
	mat_htid = mat_ht->fd.id;

	/* create additional index on the group-by columns for the materialization table */
	if (create_addl_index)
		mattablecolumninfo_add_mattable_index(matcolinfo, mat_ht);

	/* Initialize the invalidation log for the cagg. Initially, everything is
	 * invalid. Add an infinite invalidation for the continuous
	 * aggregate. This is the initial state of the aggregate before any
	 * refreshes. */
	orig_ht = ts_hypertable_cache_get_entry(hcache, origquery_tblinfo->htoid, CACHE_FLAG_NONE);
	continuous_agg_invalidate_mat_ht(orig_ht, mat_ht, TS_TIME_NOBEGIN, TS_TIME_NOEND);
	ts_cache_release(hcache);
	return mat_htid;
}
/*
 * Build the partial select query that populates the materialization table
 * columns from the user view query. For non-finalized caggs the HAVING
 * clause and ORDER BY are dropped (they are applied at finalization time).
 */
static Query *
mattablecolumninfo_get_partial_select_query(MatTableColumnInfo *mattblinfo, Query *userview_query,
											bool finalized)
{
	Query *result;

	CAGG_MAKEQUERY(result, userview_query);
	result->rtable = copyObject(userview_query->rtable);
	result->jointree = copyObject(userview_query->jointree);
	result->targetList = mattblinfo->partial_seltlist;
	result->groupClause = mattblinfo->partial_grouplist;

	if (!finalized)
	{
		/* Partials only: HAVING/ORDER BY belong to the finalize query */
		result->havingQual = NULL;
		result->sortClause = NULL;
	}
	else
	{
		result->havingQual = copyObject(userview_query->havingQual);
		result->sortClause = copyObject(userview_query->sortClause);
	}

	return result;
}
/*
 * Create a view for the SELECT query `selquery`, named and located according
 * to the RangeVar `viewrel`. Returns the ObjectAddress of the new view.
 */
static ObjectAddress
create_view_for_query(Query *selquery, RangeVar *viewrel)
{
	Oid uid, saved_uid;
	int sec_ctx;
	ObjectAddress address;
	CreateStmt *create;
	List *viewcols = NIL;
	Oid owner = GetUserId();
	ListCell *cell;

	/* Derive the view's column definitions from the non-junk tlist entries */
	foreach (cell, selquery->targetList)
	{
		TargetEntry *entry = (TargetEntry *) lfirst(cell);

		if (entry->resjunk)
			continue;

		viewcols = lappend(viewcols,
						   makeColumnDef(entry->resname,
										 exprType((Node *) entry->expr),
										 exprTypmod((Node *) entry->expr),
										 exprCollation((Node *) entry->expr)));
	}

	create = makeNode(CreateStmt);
	create->relation = viewrel;
	create->tableElts = viewcols;
	create->inhRelations = NIL;
	create->ofTypename = NULL;
	create->constraints = NIL;
	create->options = NULL;
	create->oncommit = ONCOMMIT_NOOP;
	create->tablespacename = NULL;
	create->if_not_exists = false;

	/* Create the view (name comes from viewrel), then store its defining query */
	SWITCH_TO_TS_USER(viewrel->schemaname, uid, saved_uid, sec_ctx);
	address = DefineRelation(create, RELKIND_VIEW, owner, NULL, NULL);
	CommandCounterIncrement();
	StoreViewQuery(address.objectId, selquery, false);
	CommandCounterIncrement();
	RESTORE_USER(uid, saved_uid, sec_ctx);

	return address;
}
/*
 * Initialize a CAggTimebucketInfo with the given hypertable information and
 * reset all bucket-related fields to their "not specified" defaults.
 */
static void
caggtimebucketinfo_init(CAggTimebucketInfo *src, int32 hypertable_id, Oid hypertable_oid,
						AttrNumber hypertable_partition_colno, Oid hypertable_partition_coltype,
						int64 hypertable_partition_col_interval, int32 parent_mat_hypertable_id)
{
	/* Hypertable identity and partitioning metadata */
	src->htoid = hypertable_oid;
	src->htid = hypertable_id;
	src->parent_mat_hypertable_id = parent_mat_hypertable_id;
	src->htpartcoltype = hypertable_partition_coltype;
	src->htpartcolno = hypertable_partition_colno;
	src->htpartcol_interval_len = hypertable_partition_col_interval;

	/* Bucket fields start unset; caggtimebucket_validate() fills them in */
	src->bucket_width_type = InvalidOid; /* invalid oid */
	src->bucket_width = 0;				 /* invalid value */
	src->interval = NULL;				 /* not specified by default */
	src->timezone = NULL;				 /* not specified by default */
	TIMESTAMP_NOBEGIN(src->origin);		 /* origin is not specified by default */
}
/*
 * Fold a time_bucket() argument to a constant, unwrapping a NamedArgExpr if
 * present. Errors out if the expression is not immutable (does not fold to a
 * Const); `position` names the argument in the error hint.
 */
static Const *
check_time_bucket_argument(Node *arg, char *position)
{
	Node *unwrapped = IsA(arg, NamedArgExpr) ? (Node *) castNode(NamedArgExpr, arg)->arg : arg;
	Node *folded = eval_const_expressions(NULL, unwrapped);

	if (!IsA(folded, Const))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("only immutable expressions allowed in time bucket function"),
				 errhint("Use an immutable expression as %s argument to the time bucket function.",
						 position)));

	return castNode(Const, folded);
}
/*
* Check if the group-by clauses has exactly 1 time_bucket(.., <col>) where
* <col> is the hypertable's partitioning column and other invariants. Then fill
* the `bucket_width` and other fields of `tbinfo`.
*/
static void
caggtimebucket_validate(CAggTimebucketInfo *tbinfo, List *groupClause, List *targetList)
{
	ListCell *l;
	bool found = false;
	bool custom_origin = false;

	/* Make sure tbinfo was initialized. This assumption is used below. */
	Assert(tbinfo->bucket_width == 0);
	Assert(tbinfo->timezone == NULL);
	Assert(TIMESTAMP_NOT_FINITE(tbinfo->origin));

	/* Walk the GROUP BY clause looking for exactly one bucketing function call. */
	foreach (l, groupClause)
	{
		SortGroupClause *sgc = (SortGroupClause *) lfirst(l);
		TargetEntry *tle = get_sortgroupclause_tle(sgc, targetList);
		if (IsA(tle->expr, FuncExpr))
		{
			FuncExpr *fe = ((FuncExpr *) tle->expr);
			Node *width_arg;
			Node *col_arg;

			/* Skip GROUP BY functions that are not allowed in cagg definitions. */
			if (!function_allowed_in_cagg_definition(fe->funcid))
				continue;

			/*
			 * offset variants of time_bucket functions are not
			 * supported at the moment.
			 */
			if (list_length(fe->args) >= 5 ||
				(list_length(fe->args) == 4 && exprType(lfourth(fe->args)) == INTERVALOID))
				continue;

			/* Only one bucketing function may appear in the view definition. */
			if (found)
				ereport(ERROR,
						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						 errmsg("continuous aggregate view cannot contain"
								" multiple time bucket functions")));
			else
				found = true;

			tbinfo->bucket_func = fe;

			/* only column allowed : time_bucket('1day', <column> ) */
			col_arg = lsecond(fe->args);

			/* The bucketed expression must be the hypertable's partitioning column. */
			if (!(IsA(col_arg, Var)) || ((Var *) col_arg)->varattno != tbinfo->htpartcolno)
				ereport(ERROR,
						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						 errmsg(
							 "time bucket function must reference a hypertable dimension column")));

			/*
			 * A TEXT third argument is a timezone name.  Validate it and mark
			 * the bucket as variable-sized (local-time bucketing cannot have a
			 * fixed internal width).
			 */
			if (list_length(fe->args) >= 3)
			{
				Const *arg = check_time_bucket_argument(lthird(fe->args), "third");
				if (exprType((Node *) arg) == TEXTOID)
				{
					const char *tz_name = TextDatumGetCString(arg->constvalue);
					if (!ts_is_valid_timezone_name(tz_name))
					{
						ereport(ERROR,
								(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
								 errmsg("invalid timezone name \"%s\"", tz_name)));
					}

					tbinfo->timezone = tz_name;
					tbinfo->bucket_width = BUCKET_WIDTH_VARIABLE;
				}
			}

			/* Same timezone handling when the timezone appears as the fourth argument. */
			if (list_length(fe->args) >= 4)
			{
				Const *arg = check_time_bucket_argument(lfourth(fe->args), "fourth");
				if (exprType((Node *) arg) == TEXTOID)
				{
					const char *tz_name = TextDatumGetCString(arg->constvalue);
					if (!ts_is_valid_timezone_name(tz_name))
					{
						ereport(ERROR,
								(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
								 errmsg("invalid timezone name \"%s\"", tz_name)));
					}

					tbinfo->timezone = tz_name;
					tbinfo->bucket_width = BUCKET_WIDTH_VARIABLE;
				}
			}

			/* check for custom origin */
			switch (exprType(col_arg))
			{
				case DATEOID:
					/* origin is always 3rd arg for date variants */
					if (list_length(fe->args) == 3)
					{
						custom_origin = true;
						/* Origin is a DATE constant; convert it to a Timestamp. */
						tbinfo->origin = DatumGetTimestamp(
							DirectFunctionCall1(date_timestamp,
												castNode(Const, lthird(fe->args))->constvalue));
					}
					break;
				case TIMESTAMPOID:
					/* origin is always 3rd arg for timestamp variants */
					if (list_length(fe->args) == 3)
					{
						custom_origin = true;
						tbinfo->origin =
							DatumGetTimestamp(castNode(Const, lthird(fe->args))->constvalue);
					}
					break;
				case TIMESTAMPTZOID:
					/* origin can be 3rd or 4th arg for timestamptz variants */
					if (list_length(fe->args) >= 3 && exprType(lthird(fe->args)) == TIMESTAMPTZOID)
					{
						custom_origin = true;
						tbinfo->origin =
							DatumGetTimestampTz(castNode(Const, lthird(fe->args))->constvalue);
					}
					else if (list_length(fe->args) >= 4 &&
							 exprType(lfourth(fe->args)) == TIMESTAMPTZOID)
					{
						custom_origin = true;
						tbinfo->origin =
							DatumGetTimestampTz(castNode(Const, lfourth(fe->args))->constvalue);
					}
					/* other partitioning column types have no custom-origin variants */
			}
			/* Reject +/-infinity origins; they cannot anchor a bucket grid. */
			if (custom_origin && TIMESTAMP_NOT_FINITE(tbinfo->origin))
			{
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						 errmsg("invalid origin value: infinity")));
			}

			/*
			 * We constify width expression here so any immutable expression will be allowed
			 * otherwise it would make it harder to create caggs for hypertables with e.g. int8
			 * partitioning column as int constants default to int4 and so expression would
			 * have a cast and not be a Const.
			 */
			width_arg = eval_const_expressions(NULL, linitial(fe->args));
			if (IsA(width_arg, Const))
			{
				Const *width = castNode(Const, width_arg);
				tbinfo->bucket_width_type = width->consttype;

				/* Month-based intervals have no fixed length in microseconds. */
				if (width->consttype == INTERVALOID)
				{
					tbinfo->interval = DatumGetIntervalP(width->constvalue);
					if (tbinfo->interval->month != 0)
						tbinfo->bucket_width = BUCKET_WIDTH_VARIABLE;
				}

				if (tbinfo->bucket_width != BUCKET_WIDTH_VARIABLE)
				{
					/* The bucket size is fixed */
					tbinfo->bucket_width =
						ts_interval_value_to_internal(width->constvalue, width->consttype);
				}
			}
			else
				ereport(ERROR,
						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						 errmsg("only immutable expressions allowed in time bucket function"),
						 errhint("Use an immutable expression as first argument"
								 " to the time bucket function.")));

			/*
			 * NOTE(review): this re-checks interval->month even though the
			 * Const branch above already did — presumably defensive in case
			 * bucket_width was overwritten by the fixed-size path; confirm.
			 */
			if (tbinfo->interval && tbinfo->interval->month)
			{
				tbinfo->bucket_width = BUCKET_WIDTH_VARIABLE;
			}
		}
	}

	if (tbinfo->bucket_width == BUCKET_WIDTH_VARIABLE)
	{
		/* variable-sized buckets can be used only with intervals */
		Assert(tbinfo->interval != NULL);

		/* Mixing months with days/time in one bucket width is ambiguous. */
		if ((tbinfo->interval->month != 0) &&
			((tbinfo->interval->day != 0) || (tbinfo->interval->time != 0)))
		{
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("invalid interval specified"),
					 errhint("Use either months or days and hours, but not months, days and hours "
							 "together")));
		}
	}

	if (!found)
		elog(ERROR, "continuous aggregate view must include a valid time bucket function");
}
/*
 * Expression-tree walker callback rejecting aggregates that cannot be used in
 * a continuous aggregate.  Errors out on FILTER/DISTINCT/ORDER BY modifiers,
 * ordered-set/hypothetical aggregates, and aggregates lacking a combine
 * function (or a deserialize function when the transition state is internal).
 * Always returns false so the walker keeps descending.
 */
static bool
cagg_agg_validate(Node *node, void *context)
{
	Aggref *aggref;
	HeapTuple tuple;
	Form_pg_aggregate form;

	if (node == NULL)
		return false;

	/* Recurse into anything that is not an aggregate reference. */
	if (!IsA(node, Aggref))
		return expression_tree_walker(node, cagg_agg_validate, context);

	aggref = (Aggref *) node;

	/* Modifier clauses prevent combining partial aggregation results. */
	if (aggref->aggorder || aggref->aggdistinct || aggref->aggfilter)
	{
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("aggregates with FILTER / DISTINCT / ORDER BY are not supported")));
	}

	/* Fetch the pg_aggregate row */
	tuple = SearchSysCache1(AGGFNOID, aggref->aggfnoid);
	if (!HeapTupleIsValid(tuple))
		elog(ERROR, "cache lookup failed for aggregate %u", aggref->aggfnoid);
	form = (Form_pg_aggregate) GETSTRUCT(tuple);

	/* Only plain ('n') aggregates are allowed. */
	if (form->aggkind != 'n')
	{
		ReleaseSysCache(tuple);
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("ordered set/hypothetical aggregates are not supported")));
	}

	/* Combine (and, for internal transition state, deserialize) must exist. */
	if (!OidIsValid(form->aggcombinefn) ||
		(form->aggtranstype == INTERNALOID && !OidIsValid(form->aggdeserialfn)))
	{
		ReleaseSysCache(tuple);
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("aggregates which are not parallelizable are not supported")));
	}

	ReleaseSysCache(tuple);
	return false;
}
/*
 * Check whether the query is supported, collecting error details and hints.
 *
 * Returns:
 *   True if the query is supported; false otherwise, in which case the
 *   reasons are appended to the provided `detail` and `hint` buffers.
 */
static bool
cagg_query_supported(const Query *query, StringInfo hint, StringInfo detail, const bool finalized)
{
/*
* For now deprecate partial aggregates on release builds only.
* Once migration tests are made compatible with PG15 enable deprecation
* on debug builds as well.
*/
#ifndef DEBUG
#if PG15_GE
if (!finalized)
{
/* continuous aggregates with old format will not be allowed */
appendStringInfoString(detail,
"Continuous Aggregates with partials is not supported anymore.");
appendStringInfoString(hint,
"Define the Continuous Aggregate with \"finalized\" parameter set "
"to true.");
return false;
}
#endif
#endif