
Place the data tables in prom_data
Previously, the data tables and the public API were both in the prom
schema. It makes sense to separate them, because the public API, but
not the data tables, should be on the search path. So, move the data
tables out into a new schema: prom_data.
cevian committed Apr 20, 2020
1 parent 235d56e commit 20bf94f
Showing 9 changed files with 42 additions and 39 deletions.
1 change: 1 addition & 0 deletions pkg/pgmodel/migrate.go
@@ -38,6 +38,7 @@ func (t *mySrc) replaceSchemaNames(r io.ReadCloser) (io.ReadCloser, error) {
 	s = strings.ReplaceAll(s, "SCHEMA_PROM", promSchema)
 	s = strings.ReplaceAll(s, "SCHEMA_SERIES", seriesViewSchema)
 	s = strings.ReplaceAll(s, "SCHEMA_METRIC", metricViewSchema)
+	s = strings.ReplaceAll(s, "SCHEMA_DATA", dataSchema)
 	r = ioutil.NopCloser(strings.NewReader(s))
 	return r, err
 }
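As context for the new SCHEMA_DATA placeholder: the migration loader textually substitutes each SCHEMA_* token in the bundled SQL before running it. Below is a minimal, self-contained sketch of that substitution; the constants mirror those declared in pkg/pgmodel/pgx.go, and the function is a simplified stand-in for the method above.

package main

import (
	"fmt"
	"strings"
)

// Concrete schema names, mirroring the constants in pkg/pgmodel/pgx.go.
const (
	promSchema = "prom"
	dataSchema = "prom_data"
)

// replaceSchemaNames rewrites every SCHEMA_* placeholder in a migration
// script to its concrete schema name before the SQL is executed.
func replaceSchemaNames(sql string) string {
	sql = strings.ReplaceAll(sql, "SCHEMA_PROM", promSchema)
	sql = strings.ReplaceAll(sql, "SCHEMA_DATA", dataSchema)
	return sql
}

func main() {
	fmt.Println(replaceSchemaNames("LOCK TABLE SCHEMA_DATA.%I IN SHARE UPDATE EXCLUSIVE MODE"))
	// Output: LOCK TABLE prom_data.%I IN SHARE UPDATE EXCLUSIVE MODE
}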
10 changes: 5 additions & 5 deletions pkg/pgmodel/migrate_test.go
@@ -865,7 +865,7 @@ func TestSQLIngest(t *testing.T) {
 	totalRows := 0
 	for table := range tables {
 		var rowsInTable int
-		err := db.QueryRow(context.Background(), fmt.Sprintf("SELECT count(*) FROM prom.%s", table)).Scan(&rowsInTable)
+		err := db.QueryRow(context.Background(), fmt.Sprintf("SELECT count(*) FROM prom_data.%s", table)).Scan(&rowsInTable)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -949,7 +949,7 @@ func TestSQLDropMetricChunk(t *testing.T) {
 	}

 	count := 0
-	err = db.QueryRow(context.Background(), `SELECT count(*) FROM prom.test`).Scan(&count)
+	err = db.QueryRow(context.Background(), `SELECT count(*) FROM prom_data.test`).Scan(&count)
 	if err != nil {
 		t.Error(err)
 	}
@@ -1025,7 +1025,7 @@ func TestSQLDropChunk(t *testing.T) {
 	}

 	cnt := 0
-	err = db.QueryRow(context.Background(), "SELECT count(*) FROM show_chunks('prom.test')").Scan(&cnt)
+	err = db.QueryRow(context.Background(), "SELECT count(*) FROM show_chunks('prom_data.test')").Scan(&cnt)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -1037,7 +1037,7 @@ func TestSQLDropChunk(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	err = db.QueryRow(context.Background(), "SELECT count(*) FROM show_chunks('prom.test')").Scan(&cnt)
+	err = db.QueryRow(context.Background(), "SELECT count(*) FROM show_chunks('prom_data.test')").Scan(&cnt)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -1050,7 +1050,7 @@ func TestSQLDropChunk(t *testing.T) {
 		t.Fatal(err)
 	}
 	//test2 isn't affected
-	err = db.QueryRow(context.Background(), "SELECT count(*) FROM show_chunks('prom.test2')").Scan(&cnt)
+	err = db.QueryRow(context.Background(), "SELECT count(*) FROM show_chunks('prom_data.test2')").Scan(&cnt)
 	if err != nil {
 		t.Fatal(err)
 	}
4 changes: 2 additions & 2 deletions pkg/pgmodel/migrations/migration_files_generated.go

Large diffs are not rendered by default.

29 changes: 15 additions & 14 deletions pkg/pgmodel/migrations/sql/1_base_schema.up.sql
@@ -5,6 +5,7 @@ CREATE SCHEMA IF NOT EXISTS SCHEMA_CATALOG; -- catalog tables + internal functions
 CREATE SCHEMA IF NOT EXISTS SCHEMA_PROM; -- data tables + public functions
 CREATE SCHEMA IF NOT EXISTS SCHEMA_SERIES;
 CREATE SCHEMA IF NOT EXISTS SCHEMA_METRIC;
+CREATE SCHEMA IF NOT EXISTS SCHEMA_DATA;


 CREATE EXTENSION IF NOT EXISTS timescaledb WITH SCHEMA public;
@@ -90,21 +91,21 @@ CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.make_metric_table()
 AS $func$
 DECLARE
 BEGIN
-    EXECUTE format('CREATE TABLE SCHEMA_PROM.%I(time TIMESTAMPTZ, value DOUBLE PRECISION, series_id INT)',
+    EXECUTE format('CREATE TABLE SCHEMA_DATA.%I(time TIMESTAMPTZ, value DOUBLE PRECISION, series_id INT)',
                     NEW.table_name);
-    EXECUTE format('CREATE INDEX ON SCHEMA_PROM.%I (series_id, time) INCLUDE (value)',
+    EXECUTE format('CREATE INDEX ON SCHEMA_DATA.%I (series_id, time) INCLUDE (value)',
                     NEW.table_name);
-    PERFORM create_hypertable(format('SCHEMA_PROM.%I', NEW.table_name), 'time',
+    PERFORM create_hypertable(format('SCHEMA_DATA.%I', NEW.table_name), 'time',
             chunk_time_interval=>SCHEMA_CATALOG.get_default_chunk_interval());
     EXECUTE format($$
-        ALTER TABLE SCHEMA_PROM.%I SET (
+        ALTER TABLE SCHEMA_DATA.%I SET (
            timescaledb.compress,
            timescaledb.compress_segmentby = 'series_id',
            timescaledb.compress_orderby = 'time'
        ); $$, NEW.table_name);

    --chunks where the end time is before now()-10 minutes will be compressed
-    PERFORM add_compress_chunks_policy(format('SCHEMA_PROM.%I', NEW.table_name), INTERVAL '10 minutes');
+    PERFORM add_compress_chunks_policy(format('SCHEMA_DATA.%I', NEW.table_name), INTERVAL '10 minutes');
     RETURN NEW;
 END
 $func$
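To make the format() templates above concrete, here is a rough sketch of the DDL the trigger would emit for a hypothetical metric table named cpu_usage. Identifier quoting is approximated with pgx's Sanitize, which quotes similarly to SQL's %I (though %I only quotes when necessary).

package main

import (
	"fmt"

	"github.com/jackc/pgx/v4"
)

func main() {
	// NEW.table_name for a hypothetical metric; format('%I') in the trigger
	// quotes identifiers roughly the way Sanitize does here.
	tbl := pgx.Identifier{"cpu_usage"}.Sanitize() // "cpu_usage" (quoted)

	fmt.Printf("CREATE TABLE prom_data.%s(time TIMESTAMPTZ, value DOUBLE PRECISION, series_id INT)\n", tbl)
	fmt.Printf("CREATE INDEX ON prom_data.%s (series_id, time) INCLUDE (value)\n", tbl)
	fmt.Printf("SELECT create_hypertable('prom_data.%s', 'time')\n", tbl)
	fmt.Printf("ALTER TABLE prom_data.%s SET (timescaledb.compress, timescaledb.compress_segmentby = 'series_id')\n", tbl)
}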
@@ -272,7 +273,7 @@ BEGIN
     INTO metric_table;
     --lock as for ALTER TABLE because we are in effect changing the schema here
     --also makes sure the next_position below is correct in terms of concurrency
-    EXECUTE format('LOCK TABLE SCHEMA_PROM.%I IN SHARE UPDATE EXCLUSIVE MODE', metric_table);
+    EXECUTE format('LOCK TABLE SCHEMA_DATA.%I IN SHARE UPDATE EXCLUSIVE MODE', metric_table);
     --second check after lock
     SELECT
         pos
@@ -596,7 +597,7 @@ AS $func$
 --set interval while adding 1% of randomness to the interval so that chunks are not aligned so that
 --chunks are staggered for compression jobs.
 SELECT set_chunk_time_interval(
-    format('SCHEMA_PROM.%I',(SELECT table_name FROM SCHEMA_PROM.get_or_create_metric_table_name(metric_name)))::regclass,
+    format('SCHEMA_DATA.%I',(SELECT table_name FROM SCHEMA_PROM.get_or_create_metric_table_name(metric_name)))::regclass,
     new_interval * (1.0+((random()*0.01)-0.005)));
 $func$
 LANGUAGE SQL VOLATILE;
@@ -713,7 +714,7 @@ BEGIN
     INTO STRICT time_dimension_id
     FROM _timescaledb_catalog.hypertable h
     INNER JOIN _timescaledb_catalog.dimension d ON (d.hypertable_id = h.id)
-    WHERE h.schema_name = 'SCHEMA_PROM' AND h.table_name = metric_table
+    WHERE h.schema_name = 'SCHEMA_DATA' AND h.table_name = metric_table
     ORDER BY d.id ASC
     LIMIT 1;

@@ -740,18 +741,18 @@ BEGIN
     $query$
         WITH potentially_drop_series AS (
             SELECT distinct series_id
-            FROM SCHEMA_PROM.%1$I
+            FROM SCHEMA_DATA.%1$I
             WHERE time < %2$L
             EXCEPT
             SELECT distinct series_id
-            FROM SCHEMA_PROM.%1$I
+            FROM SCHEMA_DATA.%1$I
             WHERE time >= %2$L AND time < %3$L
         ), confirmed_drop_series AS (
             SELECT series_id
             FROM potentially_drop_series
             WHERE NOT EXISTS (
                 SELECT 1
-                FROM SCHEMA_PROM.%1$I data_exists
+                FROM SCHEMA_DATA.%1$I data_exists
                 WHERE data_exists.series_id = potentially_drop_series.series_id AND time >= %3$L
                 --use chunk append + more likely to find something starting at earliest time
                 ORDER BY time ASC
@@ -783,7 +784,7 @@ BEGIN
         WHERE id IN (SELECT * FROM confirmed_drop_labels);
     $query$ USING label_ids;

-    PERFORM drop_chunks(table_name=>metric_table, schema_name=> 'SCHEMA_PROM', older_than=>older_than);
+    PERFORM drop_chunks(table_name=>metric_table, schema_name=> 'SCHEMA_DATA', older_than=>older_than);
     RETURN true;
 END
 $func$
@@ -798,7 +799,7 @@ AS $$
     FROM SCHEMA_CATALOG.metric m
     WHERE EXISTS (
         SELECT 1 FROM
-            show_chunks(hypertable=>format('%I.%I', 'SCHEMA_PROM', m.table_name),
+            show_chunks(hypertable=>format('%I.%I', 'SCHEMA_DATA', m.table_name),
                  older_than=>NOW() - SCHEMA_PROM.get_metric_retention_period(m.metric_name)))
     --random order also to prevent starvation
     ORDER BY random()
@@ -955,7 +956,7 @@ BEGIN
             series.labels
             %2$s
         FROM
-            SCHEMA_PROM.%1$I AS data
+            SCHEMA_DATA.%1$I AS data
             LEFT JOIN SCHEMA_CATALOG.series AS series ON (series.id = data.series_id AND series.metric_id = %3$L)
         $$, table_name, label_value_cols, metric_id);
     RETURN true;
4 changes: 2 additions & 2 deletions pkg/pgmodel/nan_test.go
@@ -138,13 +138,13 @@ func TestSQLStaleNaN(t *testing.T) {
 			isStaleNaN := getBooleanSQLResult(t, db,
 				fmt.Sprintf(
 					`SELECT prom.is_stale_marker(value)
-					 FROM prom."StaleMetric"
+					 FROM prom_data."StaleMetric"
 					 WHERE %s
 				`, timeClause), time.Duration(int64(time.Millisecond)*startMs), time.Duration(int64(time.Millisecond)*endMs))
 			isNormalNaN := getBooleanSQLResult(t, db,
 				fmt.Sprintf(
 					`SELECT prom.is_normal_nan(value)
-					 FROM prom."StaleMetric"
+					 FROM prom_data."StaleMetric"
 					 WHERE %s
 				`, timeClause), time.Duration(int64(time.Millisecond)*startMs), time.Duration(int64(time.Millisecond)*endMs))
 			if c.isStaleNaN {
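Background on the two predicates being tested, based on Prometheus conventions rather than anything in this diff: Prometheus encodes a staleness marker as a NaN with one specific bit pattern and an ordinary NaN with another, so the two can only be told apart bitwise. A small sketch:

package main

import (
	"fmt"
	"math"
)

// NaN bit patterns as used by Prometheus (pkg/value). Both values are
// NaN, so plain comparison or math.IsNaN cannot distinguish them.
const (
	staleNaNBits  uint64 = 0x7ff0000000000002
	normalNaNBits uint64 = 0x7ff8000000000001
)

func isStaleMarker(v float64) bool {
	return math.Float64bits(v) == staleNaNBits
}

func main() {
	stale := math.Float64frombits(staleNaNBits)
	normal := math.Float64frombits(normalNaNBits)

	fmt.Println(math.IsNaN(stale), math.IsNaN(normal))      // true true
	fmt.Println(isStaleMarker(stale), isStaleMarker(normal)) // true false
}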
3 changes: 2 additions & 1 deletion pkg/pgmodel/pgx.go
@@ -27,6 +27,7 @@ const (
 	promSchema       = "prom"
 	seriesViewSchema = "prom_series"
 	metricViewSchema = "prom_metric"
+	dataSchema       = "prom_data"
 	catalogSchema    = "_prom_catalog"

 	getMetricsTableSQL = "SELECT table_name FROM " + promSchema + ".get_metric_table_name_if_exists($1)"
@@ -537,7 +538,7 @@ func (h *insertHandler) flushPending(pendingElem *list.Element) {

 	_, err := h.conn.CopyFrom(
 		context.Background(),
-		pgx.Identifier{promSchema, pending.metricTable},
+		pgx.Identifier{dataSchema, pending.metricTable},
 		copyColumns,
 		&pending.batch,
 	)
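For reference, a minimal sketch of the CopyFrom call above with the schema-qualified identifier spelled out. The connection string, table name, and sample row are hypothetical, and pgx.CopyFromRows stands in for the pending batch source used in the real code.

package main

import (
	"context"
	"time"

	"github.com/jackc/pgx/v4"
)

func main() {
	ctx := context.Background()
	conn, err := pgx.Connect(ctx, "postgres://localhost:5432/tsdb") // hypothetical DSN
	if err != nil {
		panic(err)
	}
	defer conn.Close(ctx)

	rows := [][]interface{}{
		{time.Now(), 0.42, int64(1)}, // hypothetical sample for series_id 1
	}

	// The two-part identifier is what routes the COPY into the prom_data
	// schema instead of whatever the search path would resolve.
	_, err = conn.CopyFrom(
		ctx,
		pgx.Identifier{"prom_data", "cpu_usage"}, // hypothetical metric table
		[]string{"time", "value", "series_id"},
		pgx.CopyFromRows(rows),
	)
	if err != nil {
		panic(err)
	}
}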
22 changes: 11 additions & 11 deletions pkg/pgmodel/pgx_test.go
Expand Up @@ -607,7 +607,7 @@ func TestPGXInserterInsertData(t *testing.T) {
if err != nil {
t.Fatalf("error when fetching metric table name: %s", err)
}
tNames = append(tNames, pgx.Identifier{promSchema, realTableName})
tNames = append(tNames, pgx.Identifier{dataSchema, realTableName})
}

// Sorting because range over a map gives random iteration order.
@@ -736,7 +736,7 @@ func TestPGXQuerierQuery(t *testing.T) {
 			GROUP BY m.metric_name`,
 			`SELECT table_name FROM prom.get_metric_table_name_if_exists($1)`,
 			`SELECT (prom.label_array_to_key_value_array(s.labels)).*, array_agg(m.time ORDER BY time), array_agg(m.value ORDER BY time)
-			FROM "prom"."foo" m
+			FROM "prom_data"."foo" m
 			INNER JOIN _prom_catalog.series s
 			ON m.series_id = s.id
 			WHERE m.series_id IN (1)
@@ -831,7 +831,7 @@ func TestPGXQuerierQuery(t *testing.T) {
 			GROUP BY m.metric_name`,
 			`SELECT table_name FROM prom.get_metric_table_name_if_exists($1)`,
 			`SELECT (prom.label_array_to_key_value_array(s.labels)).*, array_agg(m.time ORDER BY time), array_agg(m.value ORDER BY time)
-			FROM "prom"."foo" m
+			FROM "prom_data"."foo" m
 			INNER JOIN _prom_catalog.series s
 			ON m.series_id = s.id
 			WHERE m.series_id IN (1)
@@ -866,7 +866,7 @@ func TestPGXQuerierQuery(t *testing.T) {
 			},
 			sqlQueries: []string{`SELECT table_name FROM prom.get_metric_table_name_if_exists($1)`,
 				`SELECT (prom.label_array_to_key_value_array(s.labels)).*, array_agg(m.time ORDER BY time), array_agg(m.value ORDER BY time)
-				FROM "prom"."bar" m
+				FROM "prom_data"."bar" m
 				INNER JOIN _prom_catalog.series s
 				ON m.series_id = s.id
 				WHERE labels && (SELECT COALESCE(array_agg(l.id), array[]::int[]) FROM _prom_catalog.label l WHERE l.key = $1 and l.value = $2)
@@ -905,7 +905,7 @@ func TestPGXQuerierQuery(t *testing.T) {
 			GROUP BY m.metric_name`,
 			`SELECT table_name FROM prom.get_metric_table_name_if_exists($1)`,
 			`SELECT (prom.label_array_to_key_value_array(s.labels)).*, array_agg(m.time ORDER BY time), array_agg(m.value ORDER BY time)
-			FROM "prom"."foo" m
+			FROM "prom_data"."foo" m
 			INNER JOIN _prom_catalog.series s
 			ON m.series_id = s.id
 			WHERE m.series_id IN (1)
@@ -914,7 +914,7 @@ func TestPGXQuerierQuery(t *testing.T) {
 			GROUP BY s.id`,
 			`SELECT table_name FROM prom.get_metric_table_name_if_exists($1)`,
 			`SELECT (prom.label_array_to_key_value_array(s.labels)).*, array_agg(m.time ORDER BY time), array_agg(m.value ORDER BY time)
-			FROM "prom"."bar" m
+			FROM "prom_data"."bar" m
 			INNER JOIN _prom_catalog.series s
 			ON m.series_id = s.id
 			WHERE m.series_id IN (1)
@@ -964,7 +964,7 @@ func TestPGXQuerierQuery(t *testing.T) {
 			GROUP BY m.metric_name`,
 			`SELECT table_name FROM prom.get_metric_table_name_if_exists($1)`,
 			`SELECT (prom.label_array_to_key_value_array(s.labels)).*, array_agg(m.time ORDER BY time), array_agg(m.value ORDER BY time)
-			FROM "prom"."foo" m
+			FROM "prom_data"."foo" m
 			INNER JOIN _prom_catalog.series s
 			ON m.series_id = s.id
 			WHERE m.series_id IN (1)
@@ -973,7 +973,7 @@ func TestPGXQuerierQuery(t *testing.T) {
 			GROUP BY s.id`,
 			`SELECT table_name FROM prom.get_metric_table_name_if_exists($1)`,
 			`SELECT (prom.label_array_to_key_value_array(s.labels)).*, array_agg(m.time ORDER BY time), array_agg(m.value ORDER BY time)
-			FROM "prom"."bar" m
+			FROM "prom_data"."bar" m
 			INNER JOIN _prom_catalog.series s
 			ON m.series_id = s.id
 			WHERE m.series_id IN (1)
@@ -1022,7 +1022,7 @@ func TestPGXQuerierQuery(t *testing.T) {
 			GROUP BY m.metric_name`,
 			`SELECT table_name FROM prom.get_metric_table_name_if_exists($1)`,
 			`SELECT (prom.label_array_to_key_value_array(s.labels)).*, array_agg(m.time ORDER BY time), array_agg(m.value ORDER BY time)
-			FROM "prom"."metric" m
+			FROM "prom_data"."metric" m
 			INNER JOIN _prom_catalog.series s
 			ON m.series_id = s.id
 			WHERE m.series_id IN (1,99,98)
@@ -1066,7 +1066,7 @@ func TestPGXQuerierQuery(t *testing.T) {
 			GROUP BY m.metric_name`,
 			`SELECT table_name FROM prom.get_metric_table_name_if_exists($1)`,
 			`SELECT (prom.label_array_to_key_value_array(s.labels)).*, array_agg(m.time ORDER BY time), array_agg(m.value ORDER BY time)
-			FROM "prom"."metric" m
+			FROM "prom_data"."metric" m
 			INNER JOIN _prom_catalog.series s
 			ON m.series_id = s.id
 			WHERE m.series_id IN (1,4,5)
@@ -1113,7 +1113,7 @@ func TestPGXQuerierQuery(t *testing.T) {
 			GROUP BY m.metric_name`,
 			`SELECT table_name FROM prom.get_metric_table_name_if_exists($1)`,
 			`SELECT (prom.label_array_to_key_value_array(s.labels)).*, array_agg(m.time ORDER BY time), array_agg(m.value ORDER BY time)
-			FROM "prom"."metric" m
+			FROM "prom_data"."metric" m
 			INNER JOIN _prom_catalog.series s
 			ON m.series_id = s.id
 			WHERE m.series_id IN (1,2)
4 changes: 2 additions & 2 deletions pkg/pgmodel/query_builder.go
@@ -243,7 +243,7 @@ func buildMetricNameSeriesIDQuery(cases []string) string {
 func buildTimeseriesByLabelClausesQuery(filter metricTimeRangeFilter, cases []string) string {
 	return fmt.Sprintf(
 		timeseriesByMetricSQLFormat,
-		pgx.Identifier{promSchema, filter.metric}.Sanitize(),
+		pgx.Identifier{dataSchema, filter.metric}.Sanitize(),
 		strings.Join(cases, " AND "),
 		filter.startTime,
 		filter.endTime,
@@ -257,7 +257,7 @@ func buildTimeseriesBySeriesIDQuery(filter metricTimeRangeFilter, series []Serie
 	}
 	return fmt.Sprintf(
 		timeseriesBySeriesIDsSQLFormat,
-		pgx.Identifier{promSchema, filter.metric}.Sanitize(),
+		pgx.Identifier{dataSchema, filter.metric}.Sanitize(),
 		strings.Join(s, ","),
 		filter.startTime,
 		filter.endTime,
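A note on what Sanitize produces here: each identifier part is double-quoted and any embedded quotes are doubled, which is why the expected queries in pgx_test.go read FROM "prom_data"."foo". A quick demonstration (the metric names are made up):

package main

import (
	"fmt"

	"github.com/jackc/pgx/v4"
)

func main() {
	// A two-part identifier becomes a schema-qualified, quoted name.
	fmt.Println(pgx.Identifier{"prom_data", "foo"}.Sanitize())
	// Output: "prom_data"."foo"

	// Quoting doubles embedded quotes, defusing hostile metric names.
	fmt.Println(pgx.Identifier{"prom_data", `evil"name`}.Sanitize())
	// Output: "prom_data"."evil""name"
}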
4 changes: 2 additions & 2 deletions pkg/pgmodel/query_integration_test.go
@@ -893,11 +893,11 @@ func TestPromQL(t *testing.T) {
 	}

 	if connErr != nil {
-		t.Errorf("unexpected error returned:\ngot\n%s\nwanted\nnil", err)
+		t.Fatalf("unexpected error returned:\ngot\n%s\nwanted\nnil", connErr)
 	}

 	if promErr != nil {
-		t.Errorf("unexpected error returned:\ngot\n%s\nwanted\nnil", err)
+		t.Fatalf("unexpected error returned:\ngot\n%v\nwanted\nnil", promErr)
 	}

 	// Length checking is for case when query returns an empty results,
