This repository has been archived by the owner on Apr 2, 2024. It is now read-only.
/
base.sql
2681 lines (2444 loc) · 106 KB
/
base.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
--NOTES
--This code assumes that table names can be at most 63 characters long (the PostgreSQL identifier limit).
-- Returns the 'chunk_interval' entry of SCHEMA_CATALOG.default cast to INTERVAL
-- (NULL when the key is absent).
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.get_default_chunk_interval()
    RETURNS INTERVAL
AS $func$
    SELECT d.value::INTERVAL
    FROM SCHEMA_CATALOG.default AS d
    WHERE d.key = 'chunk_interval';
$func$
LANGUAGE SQL STABLE PARALLEL SAFE;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.get_default_chunk_interval() TO prom_reader;
-- Returns the major component of the installed timescaledb extension version
-- (the text before the first '.' of extversion, as INT), or NULL when the
-- extension is not installed.
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.get_timescale_major_version()
    RETURNS INT
AS $func$
    SELECT split_part(e.extversion, '.', 1)::INT
    FROM pg_catalog.pg_extension AS e
    WHERE e.extname = 'timescaledb'
    LIMIT 1;
$func$
LANGUAGE SQL STABLE PARALLEL SAFE;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.get_timescale_major_version() TO prom_reader;
-- Returns the 'retention_period' entry of SCHEMA_CATALOG.default cast to INTERVAL
-- (NULL when the key is absent).
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.get_default_retention_period()
    RETURNS INTERVAL
AS $func$
    SELECT d.value::INTERVAL
    FROM SCHEMA_CATALOG.default AS d
    WHERE d.key = 'retention_period';
$func$
LANGUAGE SQL STABLE PARALLEL SAFE;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.get_default_retention_period() TO prom_reader;
-- Returns TRUE when the timescaledb extension is installed in this database.
-- EXISTS stops at the first match instead of counting every row, and the
-- catalog is schema-qualified like in get_timescale_major_version().
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.is_timescaledb_installed()
    RETURNS BOOLEAN
AS $func$
    SELECT EXISTS (
        SELECT 1
        FROM pg_catalog.pg_extension AS e
        WHERE e.extname = 'timescaledb'
    );
$func$
LANGUAGE SQL STABLE;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.is_timescaledb_installed() TO prom_reader;
-- Returns TRUE when this is a distributed (multi-node) TimescaleDB setup,
-- i.e. timescaledb_information.data_nodes lists at least one data node.
-- Returns FALSE when timescaledb is not installed or is older than 2.x
-- (get_timescale_major_version() is then NULL or < 2, so the view is not
-- queried), and when the data_nodes view does not exist.
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.is_multinode()
    RETURNS BOOLEAN
AS $func$
DECLARE
    is_distributed BOOLEAN = false;
BEGIN
    IF SCHEMA_CATALOG.get_timescale_major_version() >= 2 THEN
        -- EXISTS stops at the first data node instead of counting all of them
        SELECT EXISTS (SELECT 1 FROM timescaledb_information.data_nodes)
        INTO is_distributed;
    END IF;
    RETURN is_distributed;
EXCEPTION WHEN SQLSTATE '42P01' THEN -- undefined_table: data_nodes view is not available, never distributed
    RETURN false;
END
$func$
LANGUAGE PLPGSQL STABLE;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.is_multinode() TO prom_reader;
-- Returns the 'metric_compression' entry of SCHEMA_CATALOG.default cast to BOOLEAN
-- (NULL when the key is absent).
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.get_default_compression_setting()
    RETURNS BOOLEAN
AS $func$
    SELECT d.value::BOOLEAN
    FROM SCHEMA_CATALOG.default AS d
    WHERE d.key = 'metric_compression';
$func$
LANGUAGE SQL STABLE PARALLEL SAFE;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.get_default_compression_setting() TO prom_reader;
--Returns chunk_interval scaled by a random factor in [0.995, 1.005) (a 1%-wide
--band of jitter), so that chunk boundaries of different metrics are not aligned
--and their compression jobs are staggered.
--VOLATILE because every call draws a new random() value.
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.get_staggered_chunk_interval(chunk_interval INTERVAL)
    RETURNS INTERVAL
AS $func$
    SELECT chunk_interval
           * (1.0 + ((random() * 0.01) - 0.005));
$func$
LANGUAGE SQL VOLATILE;
--only used for setting chunk interval, and admin function
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.get_staggered_chunk_interval(INTERVAL) TO prom_admin;
--Takes the session-level maintenance advisory lock for metric_id.
--wait = true (or NULL): block until the lock is granted, then return TRUE.
--wait = false: try once and return whether the lock was acquired.
--Release with unlock_metric_for_maintenance(metric_id).
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.lock_metric_for_maintenance(metric_id int, wait boolean = true)
    RETURNS BOOLEAN
AS $func$
DECLARE
    acquired BOOLEAN;
BEGIN
    IF wait IS NOT FALSE THEN
        PERFORM pg_advisory_lock(ADVISORY_LOCK_PREFIX_MAINTENACE, metric_id);
        RETURN TRUE;
    END IF;

    SELECT pg_try_advisory_lock(ADVISORY_LOCK_PREFIX_MAINTENACE, metric_id) INTO STRICT acquired;
    RETURN acquired;
END
$func$
LANGUAGE PLPGSQL VOLATILE;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.lock_metric_for_maintenance(int, boolean) TO prom_maintenance;
--Releases the maintenance advisory lock for metric_id taken by
--lock_metric_for_maintenance(). The boolean result of pg_advisory_unlock is discarded.
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.unlock_metric_for_maintenance(metric_id int)
    RETURNS VOID
AS $func$
BEGIN
    PERFORM pg_advisory_unlock(ADVISORY_LOCK_PREFIX_MAINTENACE, metric_id);
END
$func$
LANGUAGE PLPGSQL VOLATILE;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.unlock_metric_for_maintenance(int) TO prom_maintenance;
--Attaches SCHEMA_DATA_SERIES.<metric_record.table_name> to the SCHEMA_CATALOG.series
--parent as the partition FOR VALUES IN (metric_record.id).
--Runs as SECURITY DEFINER so that EXECUTE grantees (prom_writer) can perform the
--ALTER TABLE; presumably they lack the privileges to do so directly.
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.attach_series_partition(metric_record SCHEMA_CATALOG.metric) RETURNS VOID
AS $func$
BEGIN
    EXECUTE format($$
ALTER TABLE SCHEMA_CATALOG.series ATTACH PARTITION SCHEMA_DATA_SERIES.%1$I FOR VALUES IN (%2$L)
$$, metric_record.table_name, metric_record.id);
END;
$func$
LANGUAGE PLPGSQL
SECURITY DEFINER
--search path must be set for security definer
SET search_path = pg_temp;
--redundant given schema settings but extra caution for security definers
REVOKE ALL ON FUNCTION SCHEMA_CATALOG.attach_series_partition(SCHEMA_CATALOG.metric) FROM PUBLIC;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.attach_series_partition(SCHEMA_CATALOG.metric) TO prom_writer;
--Canonical lock ordering:
--metrics
--data table
--labels
--series parent
--series partition
--constraints:
--- The prom_metric view takes locks in the order: data table, series partition.
--This procedure finalizes the creation of a metric. The first part of
--metric creation happens in make_metric_table and the final part happens here.
--We split metric creation into two parts to minimize latency during insertion
--(which happens in the make_metric_table path). Especially noteworthy is that
--attaching the partition to the series table happens here because it requires
--an exclusive lock, which is a high-latency operation. The other actions this
--function does are not as critical latency-wise but are also not necessary
--to perform in order to insert data and thus are put here.
--
--Note: a consequence of this design is that the series partition is attached
--to the series parent only in this step. Thus a metric might not be seen in some
--cross-metric queries right away. Those queries aren't common however and the delay
--is insignificant in practice.
--
--Each metric is handled in its own transaction (COMMIT every iteration), so the
--metric row lock and the series parent lock are only held for one metric at a time.
--Metrics are visited in random order (presumably so concurrent callers tend to
--work on different metrics).
--
--lock-order: metric table, data_table, series parent, series partition
CREATE OR REPLACE PROCEDURE SCHEMA_CATALOG.finalize_metric_creation()
AS $proc$
DECLARE
    r SCHEMA_CATALOG.metric;
    created boolean;
BEGIN
    FOR r IN
        SELECT *
        FROM SCHEMA_CATALOG.metric
        WHERE NOT creation_completed
        ORDER BY random()
    LOOP
        --re-read the flag under a row lock: another session may have finalized
        --(or deleted) this metric since the outer scan
        SELECT creation_completed
        INTO created
        FROM SCHEMA_CATALOG.metric m
        WHERE m.id = r.id
        FOR UPDATE;

        --NOT FOUND (row deleted, e.g. by drop_metric) leaves created NULL; without
        --this check we would go on to attach a partition for a metric that is gone
        IF NOT FOUND OR created THEN
            --release row lock
            COMMIT;
            CONTINUE;
        END IF;

        --do this before taking exclusive lock to minimize work after taking lock
        UPDATE SCHEMA_CATALOG.metric SET creation_completed = TRUE WHERE id = r.id;

        --we will need this lock for attaching the partition so take it now
        --This may not be strictly necessary but good
        --to enforce lock ordering (parent->child) explicitly. Note:
        --creating a table as a partition takes a stronger lock (access exclusive)
        --so, attaching a partition is better
        LOCK TABLE ONLY SCHEMA_CATALOG.series IN SHARE UPDATE EXCLUSIVE mode;

        PERFORM SCHEMA_CATALOG.attach_series_partition(r);

        COMMIT;
    END LOOP;
END;
$proc$ LANGUAGE PLPGSQL;
COMMENT ON PROCEDURE SCHEMA_CATALOG.finalize_metric_creation()
IS 'Finalizes metric creation. This procedure should be run by the connector automatically';
GRANT EXECUTE ON PROCEDURE SCHEMA_CATALOG.finalize_metric_creation() TO prom_writer;
--This function is called by a trigger when a new metric is created. It
--sets up the metric just enough to insert data into it. Metric creation
--is completed in finalize_metric_creation(). See the comments
--on that function for the reasoning for this split design.
--
--It creates the data table SCHEMA_DATA.<table_name> and its
--(series_id, time) index. When timescaledb is installed it turns the table into a
--(distributed, on multi-node) hypertable and, if the default compression
--setting is on, calls set_compression_on_metric_table. Finally it creates the
--series table SCHEMA_DATA_SERIES.<table_name>. That table is not attached
--to the series parent here; attachment happens in finalize_metric_creation().
--
--Note: latency-sensitive function. Should only contain just enough logic
--to support inserts for the metric.
--lock-order: data table, labels, series partition.
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.make_metric_table()
RETURNS trigger
AS $func$
DECLARE
    label_id INT;
BEGIN
    EXECUTE format('CREATE TABLE SCHEMA_DATA.%I(time TIMESTAMPTZ NOT NULL, value DOUBLE PRECISION NOT NULL, series_id BIGINT NOT NULL) WITH (autovacuum_vacuum_threshold = 50000, autovacuum_analyze_threshold = 50000)',
        NEW.table_name);
    EXECUTE format('GRANT SELECT ON TABLE SCHEMA_DATA.%I TO prom_reader', NEW.table_name);
    EXECUTE format('GRANT SELECT, INSERT ON TABLE SCHEMA_DATA.%I TO prom_writer', NEW.table_name);
    EXECUTE format('GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE SCHEMA_DATA.%I TO prom_modifier', NEW.table_name);
    EXECUTE format('CREATE UNIQUE INDEX data_series_id_time_%s ON SCHEMA_DATA.%I (series_id, time) INCLUDE (value)',
        NEW.id, NEW.table_name);

    --timescaledb presence is checked once: both the hypertable conversion and
    --the compression setup depend on it
    IF SCHEMA_CATALOG.is_timescaledb_installed() THEN
        IF SCHEMA_CATALOG.is_multinode() THEN
            --Note: we intentionally do not partition by series_id here. The assumption is
            --that we'll have more "heavy metrics" than nodes and thus partitioning /individual/
            --metrics won't gain us much for inserts and would be detrimental for many queries.
            PERFORM SCHEMA_TIMESCALE.create_distributed_hypertable(
                format('SCHEMA_DATA.%I', NEW.table_name),
                'time',
                chunk_time_interval=>SCHEMA_CATALOG.get_staggered_chunk_interval(SCHEMA_CATALOG.get_default_chunk_interval()),
                create_default_indexes=>false
            );
        ELSE
            PERFORM SCHEMA_TIMESCALE.create_hypertable(format('SCHEMA_DATA.%I', NEW.table_name), 'time',
                chunk_time_interval=>SCHEMA_CATALOG.get_staggered_chunk_interval(SCHEMA_CATALOG.get_default_chunk_interval()),
                create_default_indexes=>false);
        END IF;

        --Do not move this into the finalize step, because it's cheap to do while the table is empty
        --but takes a heavyweight blocking lock otherwise.
        IF SCHEMA_CATALOG.get_default_compression_setting() THEN
            PERFORM SCHEMA_PROM.set_compression_on_metric_table(NEW.table_name, TRUE);
        END IF;
    END IF;

    SELECT SCHEMA_CATALOG.get_or_create_label_id('__name__', NEW.metric_name)
    INTO STRICT label_id;

    --note that because labels[1] is unique across partitions and UNIQUE(labels) inside partition, labels are guaranteed globally unique
    EXECUTE format($$
CREATE TABLE SCHEMA_DATA_SERIES.%1$I (
id bigint NOT NULL,
metric_id int NOT NULL,
labels SCHEMA_PROM.label_array NOT NULL,
delete_epoch BIGINT NULL DEFAULT NULL,
CHECK(labels[1] = %2$L AND labels[1] IS NOT NULL),
CHECK(metric_id = %3$L),
CONSTRAINT series_labels_id_%3$s UNIQUE(labels) INCLUDE (id),
CONSTRAINT series_pkey_%3$s PRIMARY KEY(id)
) WITH (autovacuum_vacuum_threshold = 100, autovacuum_analyze_threshold = 100)
$$, NEW.table_name, label_id, NEW.id);
    EXECUTE format('GRANT SELECT ON TABLE SCHEMA_DATA_SERIES.%I TO prom_reader', NEW.table_name);
    EXECUTE format('GRANT SELECT, INSERT ON TABLE SCHEMA_DATA_SERIES.%I TO prom_writer', NEW.table_name);
    EXECUTE format('GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE SCHEMA_DATA_SERIES.%I TO prom_modifier', NEW.table_name);
    RETURN NEW;
END
$func$
LANGUAGE PLPGSQL VOLATILE
SECURITY DEFINER
--search path must be set for security definer
SET search_path = pg_temp;
--redundant given schema settings but extra caution for security definers
REVOKE ALL ON FUNCTION SCHEMA_CATALOG.make_metric_table() FROM PUBLIC;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.make_metric_table() TO prom_writer;
DROP TRIGGER IF EXISTS make_metric_table_trigger ON SCHEMA_CATALOG.metric CASCADE;
CREATE TRIGGER make_metric_table_trigger
AFTER INSERT ON SCHEMA_CATALOG.metric
FOR EACH ROW
EXECUTE PROCEDURE SCHEMA_CATALOG.make_metric_table();
------------------------
-- Internal functions --
------------------------
-- Builds "<prefix>_<suffix>" as a name, where <prefix> is full_name cut to
-- 62 - (char_length(suffix) + 1) characters so the whole suffix always fits.
-- When full_name is at least that long (always the case when called from
-- pg_name_unique), the result is exactly 62 characters.
-- NOTE(review): lengths are counted in characters while the name type is limited
-- in bytes; this relies on names being single-byte (see the NOTES above) - confirm.
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.pg_name_with_suffix(
    full_name text, suffix text)
RETURNS name
AS $func$
    SELECT (
        substring(full_name FROM 1 FOR 62 - (char_length(suffix) + 1))
        || '_'
        || suffix
    )::name
$func$
LANGUAGE SQL IMMUTABLE PARALLEL SAFE;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.pg_name_with_suffix(text, text) TO prom_reader;
-- Returns full_name_arg unchanged when it is shorter than 62 characters, and
-- otherwise a 62-character name made unique by appending suffix (see
-- pg_name_with_suffix).
-- There cannot be a collision between a user-defined name and a suffixed
-- name: user-defined names of length >= 62 always get a suffix, and every
-- suffixed name is exactly 62 characters long.
-- 62 rather than 63 is used because table creation also creates an array
-- type named `_tablename`, which must be unique too, so one character is
-- reserved for that underscore.
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.pg_name_unique(
    full_name_arg text, suffix text)
RETURNS name
AS $func$
    SELECT
        CASE
            WHEN char_length(full_name_arg) < 62
                THEN full_name_arg::name
            ELSE SCHEMA_CATALOG.pg_name_with_suffix(full_name_arg, suffix)
        END
$func$
LANGUAGE SQL IMMUTABLE PARALLEL SAFE;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.pg_name_unique(text, text) TO prom_reader;
--Registers metric_name_arg in SCHEMA_CATALOG.metric and returns its id and
--table name. The insert fires the make_metric_table trigger, which actually
--creates the tables.
--A sequence value is consumed on every call, so only call this when the metric
--does not exist yet.
--If a concurrent session inserted the same metric first, the existing row is
--returned instead; the loop retries until one of the two paths finds a row.
-- locks: metric, make_metric_table[data table, labels, series partition]
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.create_metric_table(
    metric_name_arg text, OUT id int, OUT table_name name)
AS $func$
DECLARE
    candidate_id int := nextval(pg_get_serial_sequence('SCHEMA_CATALOG.metric','id'))::int;
BEGIN
    LOOP
        INSERT INTO SCHEMA_CATALOG.metric (id, metric_name, table_name)
        SELECT candidate_id,
               metric_name_arg,
               SCHEMA_CATALOG.pg_name_unique(metric_name_arg, candidate_id::text)
        ON CONFLICT DO NOTHING
        RETURNING SCHEMA_CATALOG.metric.id, SCHEMA_CATALOG.metric.table_name
        INTO id, table_name;
        EXIT WHEN FOUND;

        -- under high concurrency the insert may not return anything, so try a select and loop
        -- https://stackoverflow.com/a/15950324
        SELECT m.id, m.table_name
        INTO id, table_name
        FROM SCHEMA_CATALOG.metric AS m
        WHERE m.metric_name = metric_name_arg;
        EXIT WHEN FOUND;
    END LOOP;
END
$func$
LANGUAGE PLPGSQL VOLATILE;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.create_metric_table(text) TO prom_writer;
--Inserts a label_key row for new_key and returns its id plus the generated
--value/id column names (made unique with pg_name_unique).
--A sequence value is consumed on every call, so only call this when the key
--does not exist yet.
--If a concurrent session inserted the same key first, the existing row is
--returned instead; the loop retries until one of the two paths finds a row.
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.create_label_key(
    new_key TEXT, OUT id INT, OUT value_column_name NAME, OUT id_column_name NAME
)
AS $func$
DECLARE
    candidate_id int := nextval(pg_get_serial_sequence('SCHEMA_CATALOG.label_key','id'))::int;
BEGIN
    LOOP
        INSERT INTO SCHEMA_CATALOG.label_key (id, key, value_column_name, id_column_name)
        SELECT candidate_id,
               new_key,
               SCHEMA_CATALOG.pg_name_unique(new_key, candidate_id::text),
               SCHEMA_CATALOG.pg_name_unique(new_key || '_id', format('%s_id', candidate_id))
        ON CONFLICT DO NOTHING
        RETURNING SCHEMA_CATALOG.label_key.id, SCHEMA_CATALOG.label_key.value_column_name, SCHEMA_CATALOG.label_key.id_column_name
        INTO id, value_column_name, id_column_name;
        EXIT WHEN FOUND;

        -- under high concurrency the insert may not return anything, so try a select and loop
        -- https://stackoverflow.com/a/15950324
        SELECT lk.id, lk.value_column_name, lk.id_column_name
        INTO id, value_column_name, id_column_name
        FROM SCHEMA_CATALOG.label_key AS lk
        WHERE lk.key = new_key;
        EXIT WHEN FOUND;
    END LOOP;
END
$func$
LANGUAGE PLPGSQL VOLATILE;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.create_label_key(TEXT) TO prom_writer;
--Returns the label_key row for key, creating it with create_label_key when it
--does not exist yet. The lookup branch comes first in the UNION ALL ... LIMIT 1
--so that the creating branch is only reached when the lookup finds nothing.
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.get_or_create_label_key(
    key TEXT, OUT id INT, OUT value_column_name NAME, OUT id_column_name NAME)
AS $func$
    SELECT lk.id, lk.value_column_name, lk.id_column_name
    FROM SCHEMA_CATALOG.label_key AS lk
    WHERE lk.key = get_or_create_label_key.key
    UNION ALL
    SELECT created.*
    FROM SCHEMA_CATALOG.create_label_key(get_or_create_label_key.key) AS created
    LIMIT 1
$func$
LANGUAGE SQL VOLATILE;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.get_or_create_label_key(TEXT) to prom_writer;
-- Get a new label array position for a label key. For any metric,
-- we want the positions to be as compact as possible.
-- This uses some pretty heavy locks so use sparingly.
--
-- Returns one position per element of key_name_array, in the same order.
-- Existing positions are reused; __name__ always gets position 1 and every
-- other new key gets the next free position (max(pos) + 1, starting at 2).
-- The metric table is created if needed (get_or_create_metric_table_name),
-- and when at least one key is new the metric's series and metric views are
-- regenerated.
-- locks: label_key_position, data table, series partition (in view creation),
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.get_new_pos_for_key(
    metric_name text, key_name_array text[])
RETURNS int[]
AS $func$
DECLARE
    position int;
    position_array int[];
    position_array_idx int;
    count_new int;
    key_name text;
    next_position int;
    max_position int;
    metric_table NAME;
BEGIN
    --use double check locking here
    --first optimistic check (no lock): return early if every key already has a position
    SELECT
        array_agg(lkp.pos ORDER BY k.ord)
    FROM
        unnest(key_name_array) WITH ORDINALITY as k(key, ord)
        INNER JOIN SCHEMA_CATALOG.label_key_position lkp ON
        (
            lkp.metric_name = get_new_pos_for_key.metric_name
            AND lkp.key = k.key
        )
    INTO position_array;

    IF array_length(key_name_array, 1) = array_length(position_array, 1) THEN
        RETURN position_array;
    END IF;

    SELECT table_name
    FROM SCHEMA_CATALOG.get_or_create_metric_table_name(get_new_pos_for_key.metric_name)
    INTO metric_table;

    --lock as for ALTER TABLE because we are in effect changing the schema here
    --also makes sure the next_position below is correct in terms of concurrency
    EXECUTE format('LOCK TABLE SCHEMA_DATA_SERIES.%I IN SHARE UPDATE EXCLUSIVE MODE', metric_table);

    SELECT
        max(pos) + 1
    FROM
        SCHEMA_CATALOG.label_key_position lkp
    WHERE
        lkp.metric_name = get_new_pos_for_key.metric_name
    INTO max_position;

    IF max_position IS NULL THEN
        max_position := 2; -- element 1 reserved for __name__
    END IF;

    position_array := array[]::int[];
    position_array_idx := 1;
    count_new := 0;
    FOREACH key_name IN ARRAY key_name_array LOOP
        --second check after lock
        SELECT
            pos
        FROM
            SCHEMA_CATALOG.label_key_position lkp
        WHERE
            lkp.metric_name = get_new_pos_for_key.metric_name
            AND lkp.key = key_name
        INTO position;

        IF FOUND THEN
            position_array[position_array_idx] := position;
            position_array_idx := position_array_idx + 1;
            CONTINUE;
        END IF;

        count_new := count_new + 1;
        IF key_name = '__name__' THEN
            next_position := 1; -- 1-indexed arrays, __name__ as first element
        ELSE
            next_position := max_position;
            max_position := max_position + 1;
        END IF;

        PERFORM SCHEMA_CATALOG.get_or_create_label_key(key_name);

        --explicit column list so the insert does not depend on the table's column order
        INSERT INTO SCHEMA_CATALOG.label_key_position (metric_name, key, pos)
        VALUES (get_new_pos_for_key.metric_name, key_name, next_position)
        ON CONFLICT
            DO NOTHING
        RETURNING
            pos INTO position;

        IF NOT FOUND THEN
            RAISE 'Could not find a new position';
        END IF;

        position_array[position_array_idx] := position;
        position_array_idx := position_array_idx + 1;
    END LOOP;

    IF count_new > 0 THEN
        --note these functions are expensive in practice so they
        --must be run once across a collection of keys
        PERFORM SCHEMA_CATALOG.create_series_view(metric_name);
        PERFORM SCHEMA_CATALOG.create_metric_view(metric_name);
    END IF;

    RETURN position_array;
END
$func$
LANGUAGE PLPGSQL
--security definer needed to lock the series table
SECURITY DEFINER
--search path must be set for security definer
SET search_path = pg_temp;
--redundant given schema settings but extra caution for security definers
REVOKE ALL ON FUNCTION SCHEMA_CATALOG.get_new_pos_for_key(text, text[]) FROM PUBLIC;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.get_new_pos_for_key(text, text[]) TO prom_writer;
--Returns the id of the (key_name, value_name) label, inserting the label first.
--Should only be called after checking that the label doesn't exist (the insert
--may use up a sequence value needlessly otherwise; see get_or_create_label_id).
--If the insert hits a conflict, the existing row is read instead; the loop
--retries until one of the two paths returns a row.
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.get_new_label_id(key_name text, value_name text, OUT id INT)
AS $func$
BEGIN
    LOOP
        INSERT INTO SCHEMA_CATALOG.label (key, value)
        VALUES (key_name, value_name)
        ON CONFLICT DO NOTHING
        RETURNING SCHEMA_CATALOG.label.id INTO id;
        EXIT WHEN FOUND;

        --conflict: another session created the label, read its id
        SELECT l.id INTO id
        FROM SCHEMA_CATALOG.label AS l
        WHERE l.key = key_name
          AND l.value = value_name;
        EXIT WHEN FOUND;
    END LOOP;
END
$func$
LANGUAGE PLPGSQL VOLATILE;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.get_new_label_id(text, text) to prom_writer;
--wrapper around jsonb_each_text to give a better row_estimate
--for labels (10 not 100).
--jsonb_each_text is called in FROM: the previous (jsonb_each_text(js)).* form is
--expanded into one function call per output column, doing the work twice.
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.label_jsonb_each_text(js jsonb, OUT key text, OUT value text)
RETURNS SETOF record
LANGUAGE SQL
IMMUTABLE PARALLEL SAFE STRICT ROWS 10
AS $function$ SELECT e.key, e.value FROM pg_catalog.jsonb_each_text(js) AS e $function$;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.label_jsonb_each_text(jsonb) to prom_reader;
--Thin wrapper over unnest whose only purpose is a smaller planner row
--estimate for label arrays (ROWS 10 instead of 100).
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.label_unnest(label_array anyarray)
RETURNS SETOF anyelement
LANGUAGE SQL
IMMUTABLE PARALLEL SAFE STRICT ROWS 10
AS $function$
    SELECT pg_catalog.unnest(label_array)
$function$;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.label_unnest(anyarray) to prom_reader;
-- safe_approximate_row_count returns the approximate row count of table_name_input:
--  * TimescaleDB 2.x: approximate_row_count();
--  * TimescaleDB 1.x and table_name_input is a hypertable: hypertable_approximate_row_count();
--  * otherwise (including plain PostgreSQL without timescaledb): pg_class.reltuples.
-- The hypertable catalog lookup sits in its own nested IF. A PL/pgSQL IF condition
-- is planned as one query, so combining it with is_timescaledb_installed() via AND
-- made the whole condition fail with "relation does not exist" when timescaledb
-- is not installed.
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.safe_approximate_row_count(table_name_input REGCLASS) RETURNS BIGINT
LANGUAGE PLPGSQL
AS
$$
BEGIN
    IF SCHEMA_CATALOG.get_timescale_major_version() >= 2 THEN
        RETURN (SELECT SCHEMA_TIMESCALE.approximate_row_count(table_name_input));
    END IF;

    IF SCHEMA_CATALOG.is_timescaledb_installed() THEN
        IF EXISTS (
            SELECT 1
            FROM _timescaledb_catalog.hypertable h
            WHERE format('%I.%I', h.schema_name, h.table_name)::regclass = table_name_input
        ) THEN
            RETURN (SELECT row_estimate FROM SCHEMA_TIMESCALE.hypertable_approximate_row_count(table_name_input));
        END IF;
    END IF;

    RETURN (SELECT reltuples::BIGINT FROM pg_class WHERE oid = table_name_input);
END;
$$;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.safe_approximate_row_count(regclass) to prom_reader;
--Marks series of one metric as deleted: in SCHEMA_DATA_SERIES.<metric_table>,
--every row whose id is in series_ids and whose delete_epoch is still NULL gets
--delete_epoch = current_epoch + 1 (read from SCHEMA_CATALOG.ids_epoch).
--Rows are only marked here, not removed.
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.delete_series_catalog_row(
    metric_table name,
    series_ids bigint[]
) RETURNS VOID
LANGUAGE PLPGSQL
AS $func$
DECLARE
    mark_stmt text := FORMAT(
        'UPDATE SCHEMA_DATA_SERIES.%1$I SET delete_epoch = current_epoch+1 FROM SCHEMA_CATALOG.ids_epoch WHERE delete_epoch IS NULL AND id = ANY($1)',
        metric_table
    );
BEGIN
    EXECUTE mark_stmt USING series_ids;
END;
$func$;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.delete_series_catalog_row(name, bigint[]) to prom_modifier;
---------------------------------------------------
------------------- Public APIs -------------------
---------------------------------------------------
-- Looks up a metric without creating it: returns its id and table name, or no
-- row when the metric is unknown.
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.get_metric_table_name_if_exists(
    metric_name text)
RETURNS TABLE (id int, table_name name)
AS $func$
    SELECT m.id,
           m.table_name::name
    FROM SCHEMA_CATALOG.metric AS m
    WHERE m.metric_name = get_metric_table_name_if_exists.metric_name
$func$
LANGUAGE SQL STABLE PARALLEL SAFE;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.get_metric_table_name_if_exists(text) to prom_reader;
-- Public function returning the id and table name for metric_name.
-- When the metric does not exist yet it is created through create_metric_table
-- (which fires the table-creating trigger). possibly_new is false for an
-- existing metric and true when the creation branch produced the row.
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.get_or_create_metric_table_name(
    metric_name text, OUT id int, OUT table_name name, OUT possibly_new BOOLEAN)
AS $func$
    SELECT m.id, m.table_name::name, false
    FROM SCHEMA_CATALOG.metric AS m
    WHERE m.metric_name = get_or_create_metric_table_name.metric_name
    UNION ALL
    SELECT created.*, true
    FROM SCHEMA_CATALOG.create_metric_table(get_or_create_metric_table_name.metric_name) AS created
    LIMIT 1
$func$
LANGUAGE SQL VOLATILE;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.get_or_create_metric_table_name(text) to prom_writer;
--Public function returning the label array position of key for metric_name.
--The cheap catalog lookup runs first; the more expensive PLPGSQL function
--get_new_pos_for_key is only meant to run when no position exists yet.
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.get_or_create_label_key_pos(
    metric_name text, key text)
RETURNS INT
AS $func$
    SELECT lkp.pos
    FROM SCHEMA_CATALOG.label_key_position AS lkp
    WHERE lkp.metric_name = get_or_create_label_key_pos.metric_name
      AND lkp.key = get_or_create_label_key_pos.key
    UNION ALL
    SELECT (
        SCHEMA_CATALOG.get_new_pos_for_key(
            get_or_create_label_key_pos.metric_name,
            array[get_or_create_label_key_pos.key]
        )
    )[1]
    LIMIT 1
$func$
LANGUAGE SQL VOLATILE;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.get_or_create_label_key_pos(text, text) to prom_writer;
-- label_cardinality returns the number of rows of SCHEMA_CATALOG.series whose
-- labels array contains label_id, i.e. how many series use that label
-- (key/value pair). Series marked for deletion (delete_epoch set) are counted too.
CREATE OR REPLACE FUNCTION SCHEMA_PROM.label_cardinality(label_id INT)
RETURNS INT
LANGUAGE SQL
STABLE PARALLEL SAFE
AS $func$
    SELECT count(*)::INT
    FROM SCHEMA_CATALOG.series AS s
    WHERE s.labels @> ARRAY[label_id];
$func$;
GRANT EXECUTE ON FUNCTION SCHEMA_PROM.label_cardinality(int) to prom_reader;
--Public, read-only lookup of the label array position of key for metric_name.
--Returns NULL when the key has no position for that metric (nothing is created).
--Useful when users want to group by a specific label key.
CREATE OR REPLACE FUNCTION SCHEMA_PROM.label_key_position(
    metric_name text, key text)
RETURNS INT
AS $func$
    SELECT lkp.pos
    FROM SCHEMA_CATALOG.label_key_position AS lkp
    WHERE lkp.metric_name = label_key_position.metric_name
      AND lkp.key = label_key_position.key
    LIMIT 1
$func$
LANGUAGE SQL STABLE PARALLEL SAFE;
GRANT EXECUTE ON FUNCTION SCHEMA_PROM.label_key_position(text, text) to prom_reader;
-- drop_metric deletes a metric from the database. It drops the metric's series
-- and metric views, its series table and data table, and its catalog row and
-- label key positions. It then deletes label keys that are no longer used by
-- any metric. Rows of SCHEMA_CATALOG.label are NOT removed.
-- Waits for the schema advisory lock, so it only proceeds when no Promscale
-- connector holds it. Raises an error if the metric does not exist.
CREATE OR REPLACE FUNCTION SCHEMA_PROM.drop_metric(metric_name_to_be_dropped text) RETURNS VOID
AS
$$
DECLARE
    hypertable_name TEXT;
    deletable_metric_id INTEGER;
BEGIN
    IF (SELECT NOT pg_try_advisory_xact_lock(SCHEMA_LOCK_ID)) THEN
        RAISE NOTICE 'drop_metric can run only when no Promscale connectors are running. Please shutdown the Promscale connectors';
        PERFORM pg_advisory_xact_lock(SCHEMA_LOCK_ID);
    END IF;

    SELECT table_name, id INTO hypertable_name, deletable_metric_id FROM SCHEMA_CATALOG.metric WHERE metric_name=metric_name_to_be_dropped;
    -- fail clearly instead of with format()'s "null values cannot be formatted as an SQL identifier"
    IF NOT FOUND THEN
        RAISE EXCEPTION 'metric "%" does not exist', metric_name_to_be_dropped;
    END IF;

    RAISE NOTICE 'deleting "%" metric with metric_id as "%" and table_name as "%"', metric_name_to_be_dropped, deletable_metric_id, hypertable_name;
    EXECUTE FORMAT('DROP VIEW SCHEMA_SERIES.%1$I;', hypertable_name);
    EXECUTE FORMAT('DROP VIEW SCHEMA_METRIC.%1$I;', hypertable_name);
    EXECUTE FORMAT('DROP TABLE SCHEMA_DATA_SERIES.%1$I;', hypertable_name);
    EXECUTE FORMAT('DROP TABLE SCHEMA_DATA.%1$I;', hypertable_name);
    DELETE FROM SCHEMA_CATALOG.metric WHERE id=deletable_metric_id;

    -- clean up the metric's label key positions and label keys no metric references anymore.
    -- NOT EXISTS instead of NOT IN so a NULL key cannot silently turn this into a no-op.
    DELETE FROM SCHEMA_CATALOG.label_key_position WHERE metric_name=metric_name_to_be_dropped;
    DELETE FROM SCHEMA_CATALOG.label_key lk
    WHERE NOT EXISTS (
        SELECT 1
        FROM SCHEMA_CATALOG.label_key_position lkp
        WHERE lkp.key = lk.key
    );
END;
$$
LANGUAGE plpgsql;
--Returns the label id for the (key_name, value_name) pair, creating the label
--when needed. There is no get-only variant because users do not use ids directly.
--The plain lookup comes first so that get_new_label_id, which can use up a
--sequence value, is only reached when the label is missing.
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.get_or_create_label_id(
    key_name text, value_name text)
RETURNS INT
AS $func$
    SELECT l.id
    FROM SCHEMA_CATALOG.label AS l
    WHERE l.key = key_name
      AND l.value = value_name
    UNION ALL
    SELECT SCHEMA_CATALOG.get_new_label_id(key_name, value_name)
    LIMIT 1
$func$
LANGUAGE SQL VOLATILE;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.get_or_create_label_id(text, text) to prom_writer;
--This generates a position based array from the jsonb
--0s represent keys that are not set (we don't use NULL
--since intarray does not support it).
--The metric name is taken from the '__name__' key of js. Missing label key
--positions and label ids are created on the fly.
--This is not super performance critical since this
--is only used on the insert client and is cached there.
--Read queries can use the eq function or others with the jsonb to find equality
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.get_or_create_label_array(js jsonb)
RETURNS SCHEMA_PROM.label_array AS $$
    WITH idx_val AS (
        SELECT
            -- only call the functions to create new key positions
            -- and label ids if they don't exist (for performance reasons)
            coalesce(lkp.pos,
                SCHEMA_CATALOG.get_or_create_label_key_pos(js->>'__name__', e.key)) idx,
            coalesce(l.id,
                SCHEMA_CATALOG.get_or_create_label_id(e.key, e.value)) val
        FROM SCHEMA_CATALOG.label_jsonb_each_text(js) e
        LEFT JOIN SCHEMA_CATALOG.label l
            ON (l.key = e.key AND l.value = e.value)
        LEFT JOIN SCHEMA_CATALOG.label_key_position lkp
            ON
            (
                lkp.metric_name = js->>'__name__' AND
                lkp.key = e.key
            )
        --needs to order by key to prevent deadlocks if get_or_create_label_id is creating labels.
        --Order by the input key: l.key is NULL for exactly the labels that still
        --have to be created, so ordering by it left those unordered.
        ORDER BY e.key
    )
    SELECT ARRAY(
        SELECT coalesce(idx_val.val, 0)
        FROM
            generate_series(
                1,
                (SELECT max(idx) FROM idx_val)
            ) g
            LEFT JOIN idx_val ON (idx_val.idx = g)
    )::SCHEMA_PROM.label_array
$$
LANGUAGE SQL VOLATILE;
COMMENT ON FUNCTION SCHEMA_CATALOG.get_or_create_label_array(jsonb)
IS 'converts a jsonb to a label array';
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.get_or_create_label_array(jsonb) TO prom_writer;
--Builds a position based label array for metric_name from parallel arrays of
--label keys and values. Element i holds the label id whose key has position i
--for this metric, and 0 where no label is set. Missing key positions and label
--ids are created on the fly. Rows are ordered by key so that labels get created
--in a consistent order across sessions.
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.get_or_create_label_array(metric_name TEXT, label_keys text[], label_values text[])
RETURNS SCHEMA_PROM.label_array AS $$
    WITH pos_and_id AS (
        SELECT
            -- the creating functions only run when the LEFT JOIN found nothing
            coalesce(
                lkp.pos,
                SCHEMA_CATALOG.get_or_create_label_key_pos(get_or_create_label_array.metric_name, kv.key)
            ) AS idx,
            coalesce(
                l.id,
                SCHEMA_CATALOG.get_or_create_label_id(kv.key, kv.value)
            ) AS val
        FROM ROWS FROM(unnest(label_keys), UNNEST(label_values)) AS kv(key, value)
        LEFT JOIN SCHEMA_CATALOG.label AS l
            ON l.key = kv.key
           AND l.value = kv.value
        LEFT JOIN SCHEMA_CATALOG.label_key_position AS lkp
            ON lkp.metric_name = get_or_create_label_array.metric_name
           AND lkp.key = kv.key
        ORDER BY kv.key
    )
    SELECT ARRAY(
        SELECT coalesce(p.val, 0)
        FROM generate_series(1, (SELECT max(idx) FROM pos_and_id)) AS g
        LEFT JOIN pos_and_id AS p
            ON p.idx = g
    )::SCHEMA_PROM.label_array
$$
LANGUAGE SQL VOLATILE;
COMMENT ON FUNCTION SCHEMA_CATALOG.get_or_create_label_array(text, text[], text[])
IS 'converts a metric name, array of keys, and array of values to a label array';
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.get_or_create_label_array(TEXT, text[], text[]) TO prom_writer;
--Resolves (creating when missing) the label ids and key positions for one
--series given as parallel key/value arrays for metric_name.
--Returns a single row of four parallel arrays, all sorted by (key, value):
--  pos:         key positions; when every key already has a known position they
--               are taken from label_key_position, otherwise all keys are passed
--               to get_new_pos_for_key (presumably returns positions in the same
--               order as the keys it receives — confirm)
--  id:          label ids (created via get_or_create_label_id if missing)
--  label_key:   the keys
--  label_value: the values
--Empty input yields NULL arrays (array_agg over zero rows).
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.get_or_create_label_ids(metric_name TEXT, label_keys text[], label_values text[])
RETURNS TABLE(pos int[], id int[], label_key text[], label_value text[]) AS $$
    WITH cte AS (
        SELECT
            -- only call the functions to create new key positions
            -- and label ids if they don't exist (for performance reasons)
            lkp.pos AS known_pos,
            coalesce(l.id, SCHEMA_CATALOG.get_or_create_label_id(kv.key, kv.value)) AS label_id,
            kv.key AS key_str,
            kv.value AS val_str
        FROM ROWS FROM(unnest(label_keys), unnest(label_values)) AS kv(key, value)
        LEFT JOIN SCHEMA_CATALOG.label l
            ON (l.key = kv.key AND l.value = kv.value)
        LEFT JOIN SCHEMA_CATALOG.label_key_position lkp
            ON (lkp.metric_name = get_or_create_label_ids.metric_name AND lkp.key = kv.key)
        -- consistent creation order across sessions (prevents deadlocks)
        ORDER BY kv.key, kv.value
    )
    SELECT
        -- Every array_agg carries the same explicit ORDER BY so the four output
        -- arrays are guaranteed to line up element-by-element; relying on the
        -- CTE's sort surviving into aggregation is not guaranteed.
        CASE WHEN count(*) = count(known_pos) THEN
            array_agg(known_pos ORDER BY key_str, val_str)
        ELSE
            SCHEMA_CATALOG.get_new_pos_for_key(
                get_or_create_label_ids.metric_name,
                array_agg(key_str ORDER BY key_str, val_str))
        END AS poss,
        array_agg(label_id ORDER BY key_str, val_str) AS label_ids,
        array_agg(key_str ORDER BY key_str, val_str) AS keys,
        array_agg(val_str ORDER BY key_str, val_str) AS vals
    FROM cte
$$
LANGUAGE SQL VOLATILE;
COMMENT ON FUNCTION SCHEMA_CATALOG.get_or_create_label_ids(text, text[], text[])
IS 'converts a metric name, array of keys, and array of values to a list of label ids';
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.get_or_create_label_ids(TEXT, text[], text[]) TO prom_writer;
-- Resolves an array of label ids into three parallel arrays (ids, keys, values)
-- by joining each id against SCHEMA_CATALOG.label.
-- The element order of the output may differ from the input order, and ids
-- without a matching label row are dropped (inner join).
-- An empty input yields NULL arrays (array_agg over zero rows).
-- This function needs to be optimized for performance
CREATE OR REPLACE FUNCTION SCHEMA_PROM.labels_info(INOUT labels INT[], OUT keys text[], OUT vals text[])
AS $$
    SELECT
        array_agg(lbl.id),
        array_agg(lbl.key),
        array_agg(lbl.value)
    FROM label_unnest(labels) AS wanted(label_id)
    INNER JOIN SCHEMA_CATALOG.label AS lbl
        ON lbl.id = wanted.label_id
$$
LANGUAGE SQL STABLE PARALLEL SAFE;
COMMENT ON FUNCTION SCHEMA_PROM.labels_info(INT[])
IS 'converts an array of label ids to three arrays: one for ids, one for keys and another for values';
GRANT EXECUTE ON FUNCTION SCHEMA_PROM.labels_info(INT[]) TO prom_reader;
-- Splits a label_array into two parallel arrays, keys and values, by
-- delegating to SCHEMA_PROM.labels_info and discarding its ids array.
CREATE OR REPLACE FUNCTION SCHEMA_PROM.key_value_array(labels SCHEMA_PROM.label_array, OUT keys text[], OUT vals text[])
AS $$
    SELECT
        info.keys,
        info.vals
    FROM SCHEMA_PROM.labels_info(labels) AS info
$$
LANGUAGE SQL STABLE PARALLEL SAFE;
COMMENT ON FUNCTION SCHEMA_PROM.key_value_array(SCHEMA_PROM.label_array)
IS 'converts a labels array to two arrays: one for keys and another for values';
GRANT EXECUTE ON FUNCTION SCHEMA_PROM.key_value_array(SCHEMA_PROM.label_array) TO prom_reader;
--Rebuilds the jsonb object of a series from its label_array by pairing the
--keys and values returned by SCHEMA_PROM.key_value_array.
CREATE OR REPLACE FUNCTION SCHEMA_PROM.jsonb(labels SCHEMA_PROM.label_array)
RETURNS jsonb AS $$
    SELECT jsonb_object(kv.keys, kv.vals)
    FROM SCHEMA_PROM.key_value_array(labels) AS kv
$$
LANGUAGE SQL STABLE PARALLEL SAFE;
COMMENT ON FUNCTION SCHEMA_PROM.jsonb(SCHEMA_PROM.label_array)
IS 'converts a labels array to a JSONB object';
GRANT EXECUTE ON FUNCTION SCHEMA_PROM.jsonb(SCHEMA_PROM.label_array) TO prom_reader;
--Looks up the label_array stored for the series with the given id
--(NULL when no such series row exists).
CREATE OR REPLACE FUNCTION SCHEMA_PROM.labels(series_id BIGINT)
RETURNS SCHEMA_PROM.label_array AS $$
    SELECT s.labels
    FROM SCHEMA_CATALOG.series AS s
    WHERE s.id = series_id
$$
LANGUAGE SQL STABLE PARALLEL SAFE;
COMMENT ON FUNCTION SCHEMA_PROM.labels(series_id BIGINT)
IS 'fetches labels array for the given series id';
GRANT EXECUTE ON FUNCTION SCHEMA_PROM.labels(series_id BIGINT) TO prom_reader;
--Inserts a series row for label_array into SCHEMA_DATA_SERIES.<metric_table_name>
--and returns its id in series_id.
--Only call this after checking that the series does not yet exist: a fresh id is
--always drawn from SCHEMA_CATALOG.series_id up front, even if it ends up unused.
--If the INSERT hits a conflict (ON CONFLICT DO NOTHING then returns no row, e.g.
--another session inserted the same labels first), the id of the existing row with
--the same labels is looked up instead. The loop retries until one of the two
--statements yields an id.
--  metric_id:         stored in the new row's metric_id column
--  metric_table_name: series table name inside SCHEMA_DATA_SERIES (quoted via %I)
--  label_array:       labels of the series
--  series_id (OUT):   id of the inserted or already-existing series
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.create_series(
metric_id int,
metric_table_name NAME,
label_array SCHEMA_PROM.label_array,
OUT series_id BIGINT)
AS $func$
DECLARE
new_series_id bigint;
BEGIN
new_series_id = nextval('SCHEMA_CATALOG.series_id');
LOOP
--attempt the insert with the pre-allocated id; series_id is NULL on conflict
EXECUTE format ($$
INSERT INTO SCHEMA_DATA_SERIES.%I(id, metric_id, labels)
SELECT $1, $2, $3
ON CONFLICT DO NOTHING
RETURNING id
$$, metric_table_name)
INTO series_id
USING new_series_id, metric_id, label_array;
EXIT WHEN series_id is not null;
--insert conflicted: fetch the id of the row that already has these labels
EXECUTE format($$
SELECT id
FROM SCHEMA_DATA_SERIES.%I
WHERE labels = $1
$$, metric_table_name)
INTO series_id
USING label_array;
EXIT WHEN series_id is not null;
--neither statement produced an id (presumably the conflicting row disappeared
--between the two statements); retry the insert with the same pre-allocated id
END LOOP;
END
$func$
LANGUAGE PLPGSQL VOLATILE;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.create_series(int, name, SCHEMA_PROM.label_array) TO prom_writer;
--Sets delete_epoch back to NULL on one row of a series table, making the series
--live again (a non-NULL delete_epoch presumably marks the series as scheduled
--for deletion; get_or_create_series_id calls this when it finds such a row).
--  metric_table: series table name inside SCHEMA_DATA_SERIES (quoted via %I)
--  series_id:    id of the series row to resurrect
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.resurrect_series_ids(metric_table name, series_id bigint)
RETURNS VOID
AS $func$
BEGIN
EXECUTE FORMAT($query$
UPDATE SCHEMA_DATA_SERIES.%1$I
SET delete_epoch = NULL
WHERE id = $1
$query$, metric_table) using series_id;
END
$func$
LANGUAGE PLPGSQL VOLATILE
--security definer: the UPDATE runs with the privileges of the function OWNER,
--not the calling user (presumably because prom_writer is not granted UPDATE on
--the series tables directly — confirm)
SECURITY DEFINER
--search path must be set for security definer
SET search_path = pg_temp;
--redundant given schema settings but extra caution for security definers
REVOKE ALL ON FUNCTION SCHEMA_CATALOG.resurrect_series_ids(name, bigint) FROM PUBLIC;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.resurrect_series_ids(name, bigint) TO prom_writer;
-- There shouldn't be a need to have a read only version of this as we'll use
-- the eq or other matcher functions to find series ids like this. However,
-- there are possible use cases that need the series id directly for performance
-- that we might want to see if we need to support, in which case a
-- read only version might be useful in future.
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.get_or_create_series_id(label jsonb)
RETURNS BIGINT AS $$
DECLARE
series_id bigint;
table_name name;
metric_id int;
BEGIN
--See get_or_create_series_id_for_kv_array for notes about locking
SELECT mtn.id, mtn.table_name FROM SCHEMA_CATALOG.get_or_create_metric_table_name(label->>'__name__') mtn
INTO metric_id, table_name;
LOCK TABLE ONLY SCHEMA_CATALOG.series in ACCESS SHARE mode;
EXECUTE format($query$
WITH cte AS (
SELECT SCHEMA_CATALOG.get_or_create_label_array($1)
), existing AS (
SELECT
id,
CASE WHEN delete_epoch IS NOT NULL THEN
SCHEMA_CATALOG.resurrect_series_ids(%1$L, id)
END
FROM SCHEMA_DATA_SERIES.%1$I as series
WHERE labels = (SELECT * FROM cte)
)
SELECT id FROM existing
UNION ALL
SELECT SCHEMA_CATALOG.create_series(%2$L, %1$L, (SELECT * FROM cte))