Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Make some functions private
Browse files Browse the repository at this point in the history
Move some functions into the private schema
  • Loading branch information
cevian committed Apr 28, 2020
1 parent 9a78cbd commit f511883
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 54 deletions.
16 changes: 8 additions & 8 deletions pkg/pgmodel/migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func TestSQLGetOrCreateMetricTableName(t *testing.T) {
metricName := "test_metric_1"
var metricID int
var tableName string
err := db.QueryRow(context.Background(), "SELECT * FROM prom.get_or_create_metric_table_name(metric_name => $1)", metricName).Scan(&metricID, &tableName)
err := db.QueryRow(context.Background(), "SELECT * FROM _prom_catalog.get_or_create_metric_table_name(metric_name => $1)", metricName).Scan(&metricID, &tableName)
if err != nil {
t.Fatal(err)
}
Expand All @@ -169,7 +169,7 @@ func TestSQLGetOrCreateMetricTableName(t *testing.T) {
savedMetricID := metricID

//query for same name should give same result
err = db.QueryRow(context.Background(), "SELECT * FROM prom.get_or_create_metric_table_name($1)", metricName).Scan(&metricID, &tableName)
err = db.QueryRow(context.Background(), "SELECT * FROM _prom_catalog.get_or_create_metric_table_name($1)", metricName).Scan(&metricID, &tableName)
if err != nil {
t.Fatal(err)
}
Expand All @@ -182,7 +182,7 @@ func TestSQLGetOrCreateMetricTableName(t *testing.T) {

//different metric id should give new result
metricName = "test_metric_2"
err = db.QueryRow(context.Background(), "SELECT * FROM prom.get_or_create_metric_table_name($1)", metricName).Scan(&metricID, &tableName)
err = db.QueryRow(context.Background(), "SELECT * FROM _prom_catalog.get_or_create_metric_table_name($1)", metricName).Scan(&metricID, &tableName)
if err != nil {
t.Fatal(err)
}
Expand All @@ -196,7 +196,7 @@ func TestSQLGetOrCreateMetricTableName(t *testing.T) {

//test long names that don't fit as table names
metricName = "test_metric_very_very_long_name_have_to_truncate_it_longer_than_64_chars_1"
err = db.QueryRow(context.Background(), "SELECT * FROM prom.get_or_create_metric_table_name($1)", metricName).Scan(&metricID, &tableName)
err = db.QueryRow(context.Background(), "SELECT * FROM _prom_catalog.get_or_create_metric_table_name($1)", metricName).Scan(&metricID, &tableName)
if err != nil {
t.Fatal(err)
}
Expand All @@ -210,7 +210,7 @@ func TestSQLGetOrCreateMetricTableName(t *testing.T) {
savedMetricID = metricID

//another call return same info
err = db.QueryRow(context.Background(), "SELECT * FROM prom.get_or_create_metric_table_name($1)", metricName).Scan(&metricID, &tableName)
err = db.QueryRow(context.Background(), "SELECT * FROM _prom_catalog.get_or_create_metric_table_name($1)", metricName).Scan(&metricID, &tableName)
if err != nil {
t.Fatal(err)
}
Expand All @@ -223,7 +223,7 @@ func TestSQLGetOrCreateMetricTableName(t *testing.T) {

//changing just ending returns new table
metricName = "test_metric_very_very_long_name_have_to_truncate_it_longer_than_64_chars_2"
err = db.QueryRow(context.Background(), "SELECT * FROM prom.get_or_create_metric_table_name($1)", metricName).Scan(&metricID, &tableName)
err = db.QueryRow(context.Background(), "SELECT * FROM _prom_catalog.get_or_create_metric_table_name($1)", metricName).Scan(&metricID, &tableName)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -329,7 +329,7 @@ func verifyRetentionPeriod(t testing.TB, db *pgxpool.Pool, metricName string, ex
var dur time.Duration

err := db.QueryRow(context.Background(),
`SELECT prom.get_metric_retention_period($1)`,
`SELECT _prom_catalog.get_metric_retention_period($1)`,
metricName).Scan(&dur)
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -623,7 +623,7 @@ func TestSQLJsonLabelArray(t *testing.T) {
}

var seriesIDKeyVal int
err = db.QueryRow(context.Background(), "SELECT prom.get_series_id_for_key_value_array($1, $2, $3)", metricName, keys, values).Scan(&seriesIDKeyVal)
err = db.QueryRow(context.Background(), "SELECT _prom_catalog.get_series_id_for_key_value_array($1, $2, $3)", metricName, keys, values).Scan(&seriesIDKeyVal)
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/pgmodel/migrations/migration_files_generated.go

Large diffs are not rendered by default.

58 changes: 29 additions & 29 deletions pkg/pgmodel/migrations/sql/1_base_schema.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ BEGIN
END IF;

SELECT table_name
FROM SCHEMA_PROM.get_or_create_metric_table_name(get_new_pos_for_key.metric_name)
FROM SCHEMA_CATALOG.get_or_create_metric_table_name(get_new_pos_for_key.metric_name)
INTO metric_table;
--lock as for ALTER TABLE because we are in effect changing the schema here
--also makes sure the next_position below is correct in terms of concurrency
Expand Down Expand Up @@ -384,7 +384,7 @@ AS $function$array_unnest$function$;
------------------- Public APIs -------------------
---------------------------------------------------

CREATE OR REPLACE FUNCTION SCHEMA_PROM.get_metric_table_name_if_exists(
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.get_metric_table_name_if_exists(
metric_name text)
RETURNS TABLE (id int, table_name name)
AS $func$
Expand All @@ -396,7 +396,7 @@ LANGUAGE SQL STABLE PARALLEL SAFE;

-- Public function to get the name of the table for a given metric
-- This will create the metric table if it does not yet exist.
CREATE OR REPLACE FUNCTION SCHEMA_PROM.get_or_create_metric_table_name(
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.get_or_create_metric_table_name(
metric_name text)
RETURNS TABLE (id int, table_name name)
AS $func$
Expand All @@ -411,7 +411,7 @@ $func$
LANGUAGE SQL VOLATILE;

--public function to get the array position for a label key
CREATE OR REPLACE FUNCTION SCHEMA_PROM.get_or_create_label_key_pos(
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.get_or_create_label_key_pos(
metric_name text, key text)
RETURNS INT
AS $$
Expand All @@ -431,7 +431,7 @@ $$
LANGUAGE SQL VOLATILE;

--Get the label_id for a key, value pair
CREATE OR REPLACE FUNCTION SCHEMA_PROM.get_or_create_label_id(
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.get_or_create_label_id(
key_name text, value_name text)
RETURNS INT
AS $$
Expand Down Expand Up @@ -462,9 +462,9 @@ RETURNS SCHEMA_PROM.label_array AS $$
-- only call the functions to create new key positions
-- and label ids if they don't exist (for performance reasons)
coalesce(lkp.pos,
SCHEMA_PROM.get_or_create_label_key_pos(js->>'__name__', e.key)) idx,
SCHEMA_CATALOG.get_or_create_label_key_pos(js->>'__name__', e.key)) idx,
coalesce(l.id,
SCHEMA_PROM.get_or_create_label_id(e.key, e.value)) val
SCHEMA_CATALOG.get_or_create_label_id(e.key, e.value)) val
FROM SCHEMA_CATALOG.label_jsonb_each_text(js) e
LEFT JOIN SCHEMA_CATALOG.label l
ON (l.key = e.key AND l.value = e.value)
Expand Down Expand Up @@ -496,9 +496,9 @@ RETURNS SCHEMA_PROM.label_array AS $$
-- only call the functions to create new key positions
-- and label ids if they don't exist (for performance reasons)
coalesce(lkp.pos,
SCHEMA_PROM.get_or_create_label_key_pos(key_value_array_to_label_array.metric_name, kv.key)) idx,
SCHEMA_CATALOG.get_or_create_label_key_pos(key_value_array_to_label_array.metric_name, kv.key)) idx,
coalesce(l.id,
SCHEMA_PROM.get_or_create_label_id(kv.key, kv.value)) val
SCHEMA_CATALOG.get_or_create_label_id(kv.key, kv.value)) val
FROM ROWS FROM(unnest(label_keys), UNNEST(label_values)) AS kv(key, value)
LEFT JOIN SCHEMA_CATALOG.label l
ON (l.key = kv.key AND l.value = kv.value)
Expand Down Expand Up @@ -580,12 +580,12 @@ RETURNS BIGINT AS $$
FROM SCHEMA_CATALOG.series
WHERE labels = (SELECT * FROM cte)
UNION ALL
SELECT SCHEMA_CATALOG.create_series((SCHEMA_PROM.get_or_create_metric_table_name(label->>'__name__')).id, (SELECT * FROM cte))
SELECT SCHEMA_CATALOG.create_series((SCHEMA_CATALOG.get_or_create_metric_table_name(label->>'__name__')).id, (SELECT * FROM cte))
LIMIT 1
$$
LANGUAGE SQL VOLATILE;

CREATE OR REPLACE FUNCTION SCHEMA_PROM.get_series_id_for_key_value_array(metric_name TEXT, label_keys text[], label_values text[])
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.get_series_id_for_key_value_array(metric_name TEXT, label_keys text[], label_values text[])
RETURNS BIGINT AS $$
WITH CTE AS (
SELECT SCHEMA_PROM.key_value_array_to_label_array(metric_name, label_keys, label_values)
Expand All @@ -594,7 +594,7 @@ RETURNS BIGINT AS $$
FROM SCHEMA_CATALOG.series
WHERE labels = (SELECT * FROM cte)
UNION ALL
SELECT SCHEMA_CATALOG.create_series((SCHEMA_PROM.get_or_create_metric_table_name(metric_name)).id, (SELECT * FROM cte))
SELECT SCHEMA_CATALOG.create_series((SCHEMA_CATALOG.get_or_create_metric_table_name(metric_name)).id, (SELECT * FROM cte))
LIMIT 1
$$
LANGUAGE SQL VOLATILE;
Expand All @@ -609,7 +609,7 @@ AS $func$
--set interval while adding 1% of randomness to the interval so that chunks are not aligned so that
--chunks are staggered for compression jobs.
SELECT set_chunk_time_interval(
format('SCHEMA_DATA.%I',(SELECT table_name FROM SCHEMA_PROM.get_or_create_metric_table_name(metric_name)))::regclass,
format('SCHEMA_DATA.%I',(SELECT table_name FROM SCHEMA_CATALOG.get_or_create_metric_table_name(metric_name)))::regclass,
new_interval * (1.0+((random()*0.01)-0.005)));
$func$
LANGUAGE SQL VOLATILE;
Expand All @@ -633,10 +633,10 @@ RETURNS BOOLEAN
AS $func$
--use get_or_create_metric_table_name because we want to be able to set /before/ any data is ingested
--needs to run before update so row exists before update.
SELECT SCHEMA_PROM.get_or_create_metric_table_name(set_metric_chunk_interval.metric_name);
SELECT SCHEMA_CATALOG.get_or_create_metric_table_name(set_metric_chunk_interval.metric_name);

UPDATE SCHEMA_CATALOG.metric SET default_chunk_interval = false
WHERE id IN (SELECT id FROM SCHEMA_PROM.get_metric_table_name_if_exists(set_metric_chunk_interval.metric_name));
WHERE id IN (SELECT id FROM SCHEMA_CATALOG.get_metric_table_name_if_exists(set_metric_chunk_interval.metric_name));

SELECT SCHEMA_CATALOG.set_chunk_interval_on_metric_table(metric_name, chunk_interval);

Expand All @@ -648,7 +648,7 @@ CREATE OR REPLACE FUNCTION SCHEMA_PROM.reset_metric_chunk_interval(metric_name T
RETURNS BOOLEAN
AS $func$
UPDATE SCHEMA_CATALOG.metric SET default_chunk_interval = true
WHERE id = (SELECT id FROM SCHEMA_PROM.get_metric_table_name_if_exists(metric_name));
WHERE id = (SELECT id FROM SCHEMA_CATALOG.get_metric_table_name_if_exists(metric_name));

SELECT SCHEMA_CATALOG.set_chunk_interval_on_metric_table(metric_name,
SCHEMA_CATALOG.get_default_chunk_interval());
Expand All @@ -658,12 +658,12 @@ $func$
LANGUAGE SQL VOLATILE;


CREATE OR REPLACE FUNCTION SCHEMA_PROM.get_metric_retention_period(metric_name TEXT)
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.get_metric_retention_period(metric_name TEXT)
RETURNS INTERVAL
AS $$
SELECT COALESCE(m.retention_period, SCHEMA_CATALOG.get_default_retention_period())
FROM SCHEMA_CATALOG.metric m
WHERE id IN (SELECT id FROM SCHEMA_PROM.get_metric_table_name_if_exists(get_metric_retention_period.metric_name))
WHERE id IN (SELECT id FROM SCHEMA_CATALOG.get_metric_table_name_if_exists(get_metric_retention_period.metric_name))
UNION ALL
SELECT SCHEMA_CATALOG.get_default_retention_period()
LIMIT 1
Expand All @@ -684,10 +684,10 @@ RETURNS BOOLEAN
AS $func$
--use get_or_create_metric_table_name because we want to be able to set /before/ any data is ingested
--needs to run before update so row exists before update.
SELECT SCHEMA_PROM.get_or_create_metric_table_name(set_metric_retention_period.metric_name);
SELECT SCHEMA_CATALOG.get_or_create_metric_table_name(set_metric_retention_period.metric_name);

UPDATE SCHEMA_CATALOG.metric SET retention_period = new_retention_period
WHERE id IN (SELECT id FROM SCHEMA_PROM.get_metric_table_name_if_exists(set_metric_retention_period.metric_name));
WHERE id IN (SELECT id FROM SCHEMA_CATALOG.get_metric_table_name_if_exists(set_metric_retention_period.metric_name));

SELECT true;
$func$
Expand All @@ -697,7 +697,7 @@ CREATE OR REPLACE FUNCTION SCHEMA_PROM.reset_metric_retention_period(metric_name
RETURNS BOOLEAN
AS $func$
UPDATE SCHEMA_CATALOG.metric SET retention_period = NULL
WHERE id = (SELECT id FROM SCHEMA_PROM.get_metric_table_name_if_exists(metric_name));
WHERE id = (SELECT id FROM SCHEMA_CATALOG.get_metric_table_name_if_exists(metric_name));
SELECT true;
$func$
LANGUAGE SQL VOLATILE;
Expand All @@ -716,7 +716,7 @@ DECLARE
BEGIN
SELECT table_name
INTO STRICT metric_table
FROM SCHEMA_PROM.get_or_create_metric_table_name(metric_name);
FROM SCHEMA_CATALOG.get_or_create_metric_table_name(metric_name);

SELECT older_than + INTERVAL '1 hour'
INTO check_time;
Expand Down Expand Up @@ -812,7 +812,7 @@ AS $$
WHERE EXISTS (
SELECT 1 FROM
show_chunks(hypertable=>format('%I.%I', 'SCHEMA_DATA', m.table_name),
older_than=>NOW() - SCHEMA_PROM.get_metric_retention_period(m.metric_name)))
older_than=>NOW() - SCHEMA_CATALOG.get_metric_retention_period(m.metric_name)))
--random order also to prevent starvation
ORDER BY random()
$$
Expand All @@ -837,15 +837,15 @@ BEGIN

CONTINUE WHEN NOT FOUND;

PERFORM SCHEMA_CATALOG.drop_metric_chunks(r.metric_name, NOW() - SCHEMA_PROM.get_metric_retention_period(r.metric_name));
PERFORM SCHEMA_CATALOG.drop_metric_chunks(r.metric_name, NOW() - SCHEMA_CATALOG.get_metric_retention_period(r.metric_name));
COMMIT;
END LOOP;

FOR r IN
SELECT *
FROM SCHEMA_CATALOG.get_metrics_that_need_drop_chunk()
LOOP
PERFORM SCHEMA_CATALOG.drop_metric_chunks(r.metric_name, NOW() - SCHEMA_PROM.get_metric_retention_period(r.metric_name));
PERFORM SCHEMA_CATALOG.drop_metric_chunks(r.metric_name, NOW() - SCHEMA_CATALOG.get_metric_retention_period(r.metric_name));
COMMIT;
END LOOP;
END;
Expand Down Expand Up @@ -1095,7 +1095,7 @@ CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.label_find_key_equal(label_key SCHEMA_
RETURNS SCHEMA_PROM.matcher_positive
AS $func$
SELECT COALESCE(array_agg(l.id), array[]::int[])::SCHEMA_PROM.matcher_positive
FROM _prom_catalog.label l
FROM SCHEMA_CATALOG.label l
WHERE l.key = label_key and l.value = pattern
$func$
LANGUAGE SQL STABLE PARALLEL SAFE;
Expand All @@ -1104,7 +1104,7 @@ CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.label_find_key_not_equal(label_key SCH
RETURNS SCHEMA_PROM.matcher_negative
AS $func$
SELECT COALESCE(array_agg(l.id), array[]::int[])::SCHEMA_PROM.matcher_negative
FROM _prom_catalog.label l
FROM SCHEMA_CATALOG.label l
WHERE l.key = label_key and l.value = pattern
$func$
LANGUAGE SQL STABLE PARALLEL SAFE;
Expand All @@ -1113,7 +1113,7 @@ CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.label_find_key_regex(label_key SCHEMA_
RETURNS SCHEMA_PROM.matcher_positive
AS $func$
SELECT COALESCE(array_agg(l.id), array[]::int[])::SCHEMA_PROM.matcher_positive
FROM _prom_catalog.label l
FROM SCHEMA_CATALOG.label l
WHERE l.key = label_key and l.value ~ pattern
$func$
LANGUAGE SQL STABLE PARALLEL SAFE;
Expand All @@ -1122,7 +1122,7 @@ CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.label_find_key_not_regex(label_key SCH
RETURNS SCHEMA_PROM.matcher_negative
AS $func$
SELECT COALESCE(array_agg(l.id), array[]::int[])::SCHEMA_PROM.matcher_negative
FROM _prom_catalog.label l
FROM SCHEMA_CATALOG.label l
WHERE l.key = label_key and l.value ~ pattern
$func$
LANGUAGE SQL STABLE PARALLEL SAFE;
Expand Down
6 changes: 3 additions & 3 deletions pkg/pgmodel/pgx.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ const (
dataSchema = "prom_data"
catalogSchema = "_prom_catalog"

getMetricsTableSQL = "SELECT table_name FROM " + promSchema + ".get_metric_table_name_if_exists($1)"
getCreateMetricsTableSQL = "SELECT table_name FROM " + promSchema + ".get_or_create_metric_table_name($1)"
getSeriesIDForLabelSQL = "SELECT " + promSchema + ".get_series_id_for_key_value_array($1, $2, $3)"
getMetricsTableSQL = "SELECT table_name FROM " + catalogSchema + ".get_metric_table_name_if_exists($1)"
getCreateMetricsTableSQL = "SELECT table_name FROM " + catalogSchema + ".get_or_create_metric_table_name($1)"
getSeriesIDForLabelSQL = "SELECT " + catalogSchema + ".get_series_id_for_key_value_array($1, $2, $3)"
)

var (
Expand Down

0 comments on commit f511883

Please sign in to comment.