Add support for InfluxDB retention policies
The InfluxDB query 'SHOW MEASUREMENTS' does not report which measurements
belong to which retention policy, so it was impossible to extract a
measurement from any retention policy other than the default one. This commit
removes a redundant check for the presence of a measurement when migrating a
single measurement, and modifies the generated InfluxQL queries for schema
exploration so that they properly select a measurement in a specified RP.
Passing the measurement name as "retention_policy"."measurement name" now
exports the proper data.
Blagoj Atanasovski authored and atanasovskib committed Apr 18, 2019
1 parent 4090f30 commit 3d60065
Showing 11 changed files with 305 additions and 42 deletions.
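For context, here is a minimal, standalone sketch (not part of the commit) of the naming rule the diff applies throughout: a measurement passed as `rp.measure` is split on the first dot and quoted as `"rp"."measure"` in the generated InfluxQL, both for schema exploration (`SHOW TAG KEYS`, `SHOW FIELD KEYS`) and for data extraction (`SELECT`). The helper name `splitRP` is illustrative only.

```go
package main

import (
	"fmt"
	"strings"
)

// splitRP mirrors the rule used in this commit: everything before the
// first '.' is treated as the retention policy, the rest as the measurement.
func splitRP(measure string) (rp, name string, hasRP bool) {
	if !strings.Contains(measure, ".") {
		return "", measure, false
	}
	parts := strings.SplitN(measure, ".", 2)
	return parts[0], parts[1], true
}

func main() {
	for _, m := range []string{"cpu", "two_week.cpu"} {
		rp, name, hasRP := splitRP(m)
		if hasRP {
			// with an RP prefix the queries address "rp"."measurement"
			fmt.Printf("SHOW TAG KEYS FROM %q.%q\n", rp, name)
			fmt.Printf("SELECT * FROM %q.%q\n", rp, name)
		} else {
			// without a prefix the default retention policy is used
			fmt.Printf("SHOW TAG KEYS FROM %q\n", name)
			fmt.Printf("SELECT * FROM %q\n", name)
		}
	}
}
```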
9 changes: 6 additions & 3 deletions README.md
@@ -53,7 +53,9 @@ $ cd $GOPATH/bin/
$ ./outflux schema-transfer --help
```

Usage is `outflux schema-transfer database [measure1 measure2 ...] [flags]`, where `database` is the name of the InfluxDB database you wish to export. `[measure1 ...]` are optional and, if specified, only those measurements will be exported from the selected database.
Additionally, you can specify the retention policy as `retention_policy.measure`, or as `"retention-policy"."measure name"` if some of the identifiers contain a space or a dash.
⚠️ *The resulting target table will be named `"retention_policy.measure"` in TimescaleDB*

For example `outflux schema-transfer benchmark cpu mem` will discover the schema for the `cpu` and `mem` measurements from the `benchmark` database.
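Similarly, `outflux schema-transfer benchmark two_week.cpu` would discover the schema of the `cpu` measurement from the `two_week` retention policy and, per the note above, create a target table named `"two_week.cpu"` in TimescaleDB.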

@@ -133,9 +135,10 @@ $ outflux migrate benchmark \
> --output-conn='dbname=targetdb user=test password=test' \
```

* Export only measurement 'cpu' from the 'benchmark' database; drop the existing 'cpu' table in 'targetdb' if it exists, create it if not
* Export only measurement 'cpu' from the 'two_week' retention policy in the 'benchmark' database.
Drop the existing '"two_week.cpu"' table in 'targetdb' if it exists, create it if not
```bash
$ outflux migrate benchmark cpu \
$ outflux migrate benchmark two_week.cpu \
> --input-user=test \
> --input-pass=test \
> --output-conn='dbname=targetdb user=test password=test'\
76 changes: 74 additions & 2 deletions cmd/outflux/migrate_i_test.go
@@ -1,5 +1,3 @@
// +build integration

package main

import (
@@ -281,3 +279,77 @@ func TestMigrateRenameOutputSchema(t *testing.T) {
t.Errorf("expected time > %v and field1=%d and tags={\"tag1\": \"1\"}\ngot: time %s, field1=%d, tags=%s", start, value, time, field1, tagsCol)
}
}

func TestMigrateRetentionPolicy(t *testing.T) {
//prepare influx db
start := time.Now().UTC()
db := "test_rp"
targetSchema := "some_schema"
rp := "rp"
measure := "test measure"
tag := "tag1"
field := "field1"
value := 1
tagValue := "1"
tags := map[string]string{tag: tagValue}
fieldValues := map[string]interface{}{field: value}
if err := testutils.DeleteTimescaleDb(db); err != nil {
t.Fatalf("could not delete if exists ts db: %v", err)
}
if err := testutils.PrepareServersForITest(db); err != nil {
t.Fatalf("could not prepare servers: %v", err)
}
if err := testutils.CreateTimescaleSchema(db, targetSchema); err != nil {
t.Fatalf("could not create target schema: %v", err)
}
defer testutils.ClearServersAfterITest(db)

err := testutils.CreateInfluxRP(db, rp)
if err != nil {
t.Fatal(err)
}
err = testutils.CreateInfluxMeasureWithRP(db, rp, measure, []*map[string]string{&tags}, []*map[string]interface{}{&fieldValues})
if err != nil {
t.Fatal(err)
}

// run
connConf, config := defaultConfig(db, rp+"."+measure)
connConf.OutputSchema = targetSchema
config.TagsAsJSON = true
config.TagsCol = "tags"
appContext := initAppContext()
err = migrate(appContext, connConf, config)
if err != nil {
t.Fatal(err)
}

// check
dbConn, err := testutils.OpenTSConn(db)
if err != nil {
t.Fatalf("could not open db conn: %v", err)
}
defer dbConn.Close()

rows, err := dbConn.Query(fmt.Sprintf(`SELECT * FROM %s."%s"`, targetSchema, measure))
if err != nil {
t.Fatal(err)
}

defer rows.Close()
var time time.Time
var field1 int
var tagsCol string
if !rows.Next() {
t.Fatal("couldn't check state of TS DB")
}

err = rows.Scan(&time, &tagsCol, &field1)
if err != nil {
t.Fatal("couldn't check state of TS DB")
}

if time.Before(start) || field1 != value || tagsCol != "{\"tag1\": \"1\"}" {
t.Errorf("expected time > %v and field1=%d and tags={\"tag1\": \"1\"}\ngot: time %s, field1=%d, tags=%s", start, value, time, field1, tagsCol)
}
}
32 changes: 22 additions & 10 deletions internal/extraction/influx/query_building.go
@@ -9,24 +9,27 @@ import (
)

const (
selectQueryDoubleBoundTemplate = "SELECT %s\nFROM \"%s\"\nWHERE time >= '%s' AND time <= '%s'"
selectQueryLowerBoundTemplate = "SELECT %s\nFROM \"%s\"\nWHERE time >= '%s'"
selectQueryUpperBoundTemplate = "SELECT %s\nFROM \"%s\"\nWHERE time <= '%s'"
selectQueryNoBoundTemplate = "SELECT %s\nFROM \"%s\""
limitSuffixTemplate = "\nLIMIT %d"
selectQueryDoubleBoundTemplate = "SELECT %s FROM %s WHERE time >= '%s' AND time <= '%s'"
selectQueryLowerBoundTemplate = "SELECT %s FROM %s WHERE time >= '%s'"
selectQueryUpperBoundTemplate = "SELECT %s FROM %s WHERE time <= '%s'"
selectQueryNoBoundTemplate = "SELECT %s FROM %s"
limitSuffixTemplate = "LIMIT %d"
measurementNameTemplate = `"%s"`
measurementNameWithRPTemplate = `"%s"."%s"`
)

func buildSelectCommand(config *config.MeasureExtraction, columns []*idrf.Column) string {
projection := buildProjection(columns)
measurementName := buildMeasurementName(config.Measure)
var command string
if config.From != "" && config.To != "" {
command = fmt.Sprintf(selectQueryDoubleBoundTemplate, projection, config.Measure, config.From, config.To)
command = fmt.Sprintf(selectQueryDoubleBoundTemplate, projection, measurementName, config.From, config.To)
} else if config.From != "" {
command = fmt.Sprintf(selectQueryLowerBoundTemplate, projection, config.Measure, config.From)
command = fmt.Sprintf(selectQueryLowerBoundTemplate, projection, measurementName, config.From)
} else if config.To != "" {
command = fmt.Sprintf(selectQueryUpperBoundTemplate, projection, config.Measure, config.To)
command = fmt.Sprintf(selectQueryUpperBoundTemplate, projection, measurementName, config.To)
} else {
command = fmt.Sprintf(selectQueryNoBoundTemplate, projection, config.Measure)
command = fmt.Sprintf(selectQueryNoBoundTemplate, projection, measurementName)
}

if config.Limit == 0 {
@@ -37,10 +40,19 @@ func buildSelectCommand(config *config.MeasureExtraction, columns []*idrf.Column
return fmt.Sprintf("%s %s", command, limit)
}

func buildMeasurementName(measurement string) string {
if !strings.Contains(measurement, ".") {
return fmt.Sprintf(measurementNameTemplate, measurement)
}

parts := strings.SplitN(measurement, ".", 2)
return fmt.Sprintf(measurementNameWithRPTemplate, parts[0], parts[1])
}

func buildProjection(columns []*idrf.Column) string {
columnNames := make([]string, len(columns))
for i, column := range columns {
columnNames[i] = fmt.Sprintf("\"%s\"", column.Name)
columnNames[i] = fmt.Sprintf(`"%s"`, column.Name)
}

return strings.Join(columnNames, ", ")
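Note that `buildMeasurementName` splits on the first dot, so a measurement whose own name contains a dot has to be addressed with an explicit retention-policy prefix; the `rp name.measure.name` case in the test file below illustrates the resulting `"rp name"."measure.name"` form.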
97 changes: 97 additions & 0 deletions internal/extraction/influx/query_building_test.go
@@ -0,0 +1,97 @@
package influx

import (
"testing"

"github.com/timescale/outflux/internal/extraction/config"
"github.com/timescale/outflux/internal/idrf"
)

func TestBuildMeasurementName(t *testing.T) {
testCases := []struct {
in string
exp string
}{
{in: "measure", exp: `"measure"`},
{in: "rp.measure", exp: `"rp"."measure"`},
{in: "rp.measure name", exp: `"rp"."measure name"`},
{in: "rp name.measure.name", exp: `"rp name"."measure.name"`},
}

for _, tc := range testCases {
out := buildMeasurementName(tc.in)
if out != tc.exp {
t.Errorf("expected: %s, got: %s", tc.exp, out)
}
}
}

func TestBuildProjection(t *testing.T) {
testCases := []struct {
in []*idrf.Column
exp string
}{
{in: []*idrf.Column{{Name: "col1"}}, exp: `"col1"`},
{in: []*idrf.Column{{Name: "col 1"}}, exp: `"col 1"`},
{in: []*idrf.Column{{Name: "col 1"}, {Name: "col 2"}}, exp: `"col 1", "col 2"`},
}

for _, tc := range testCases {
out := buildProjection(tc.in)
if out != tc.exp {
t.Errorf("expected: %s, got: %s", tc.exp, out)
}
}
}

func TestBuildSelectCommand(t *testing.T) {
testCases := []struct {
measure string
columns []*idrf.Column
from string
to string
limit uint64
exp string
}{
{
measure: "m",
columns: []*idrf.Column{{Name: "col1"}},
exp: `SELECT "col1" FROM "m"`,
}, {
measure: "rp.m",
columns: []*idrf.Column{{Name: "col1"}, {Name: "col 2"}},
from: "a",
exp: `SELECT "col1", "col 2" FROM "rp"."m" WHERE time >= 'a'`,
}, {
measure: "m",
columns: []*idrf.Column{{Name: "col1"}},
to: "b",
exp: `SELECT "col1" FROM "m" WHERE time <= 'b'`,
}, {
measure: "m",
columns: []*idrf.Column{{Name: "col1"}},
from: "a",
to: "b",
exp: `SELECT "col1" FROM "m" WHERE time >= 'a' AND time <= 'b'`,
}, {
measure: "m",
columns: []*idrf.Column{{Name: "col1"}},
limit: 11,
exp: `SELECT "col1" FROM "m" LIMIT 11`,
},
}

for _, tc := range testCases {
config := &config.MeasureExtraction{
Measure: tc.measure,
From: tc.from,
To: tc.to,
Limit: tc.limit,
}

out := buildSelectCommand(config, tc.columns)
if out != tc.exp {
t.Errorf("expected: %s, got: %s", tc.exp, out)
}
}
}
12 changes: 10 additions & 2 deletions internal/schemamanagement/influx/discovery/field_discovery.go
@@ -2,14 +2,16 @@ package discovery

import (
"fmt"
"strings"

influx "github.com/influxdata/influxdb/client/v2"
"github.com/timescale/outflux/internal/idrf"
"github.com/timescale/outflux/internal/schemamanagement/influx/influxqueries"
)

const (
showFieldsQueryTemplate = "SHOW FIELD KEYS FROM \"%s\""
showFieldsQueryTemplate = "SHOW FIELD KEYS FROM \"%s\""
showFieldsQueryWithRPTemplate = "SHOW FIELD KEYS FROM \"%s\".\"%s\""
)

// FieldExplorer defines an API for discovering InfluxDB fields of a specified measurement
@@ -37,7 +39,13 @@ func (fe *defaultFieldExplorer) DiscoverMeasurementFields(influxClient influx.Cl
}

func (fe *defaultFieldExplorer) fetchMeasurementFields(influxClient influx.Client, database, measurement string) ([][2]string, error) {
showFieldsQuery := fmt.Sprintf(showFieldsQueryTemplate, measurement)
var showFieldsQuery string
if strings.Contains(measurement, ".") {
parts := strings.SplitN(measurement, ".", 2)
showFieldsQuery = fmt.Sprintf(showFieldsQueryWithRPTemplate, parts[0], parts[1])
} else {
showFieldsQuery = fmt.Sprintf(showFieldsQueryTemplate, measurement)
}
result, err := fe.queryService.ExecuteShowQuery(influxClient, database, showFieldsQuery)

if err != nil {
39 changes: 39 additions & 0 deletions internal/schemamanagement/influx/discovery/field_discovery_test.go
@@ -77,3 +77,42 @@ func TestDiscoverMeasurementFields(t *testing.T) {
}
}
}

func TestDiscoverMeasurementFieldsWithRP(t *testing.T) {
var mockClient influx.Client = &influxqueries.MockClient{}
database := "database"
measure := "rp.measure"

cases := []testCase{
{
showQueryResult: &influxqueries.InfluxShowResult{ // proper result
Values: [][]string{{"1", "boolean"}},
},
expectedTags: []*idrf.Column{
{Name: "1", DataType: idrf.IDRFBoolean},
},
},
}

for _, testCase := range cases {
fieldExplorer := defaultFieldExplorer{
queryService: mock(testCase),
}
result, err := fieldExplorer.DiscoverMeasurementFields(mockClient, database, measure)
if err != nil {
t.Errorf("unexpected error %v", err)
}

expected := testCase.expectedTags
if len(expected) != len(result) {
t.Errorf("еxpected result: '%v', got '%v'", expected, result)
}

for index, resColumn := range result {
if resColumn.Name != expected[index].Name || resColumn.DataType != expected[index].DataType {
t.Errorf("Expected column: %v, got %v", expected[index], resColumn)
}
}
}
}
12 changes: 10 additions & 2 deletions internal/schemamanagement/influx/discovery/tag_discovery.go
@@ -2,14 +2,16 @@ package discovery

import (
"fmt"
"strings"

influx "github.com/influxdata/influxdb/client/v2"
"github.com/timescale/outflux/internal/idrf"
"github.com/timescale/outflux/internal/schemamanagement/influx/influxqueries"
)

const (
showTagsQueryTemplate = "SHOW TAG KEYS FROM \"%s\""
showTagsQueryTemplate = "SHOW TAG KEYS FROM \"%s\""
showTagsQueryWithRPTemplate = "SHOW TAG KEYS FROM \"%s\".\"%s\""
)

// TagExplorer defines an API for discovering the tags of an InfluxDB measurement
@@ -40,7 +42,13 @@ func (te *defaultTagExplorer) DiscoverMeasurementTags(influxClient influx.Client
}

func (te *defaultTagExplorer) fetchMeasurementTags(influxClient influx.Client, database, measure string) ([]string, error) {
showTagsQuery := fmt.Sprintf(showTagsQueryTemplate, measure)
var showTagsQuery string
if strings.Contains(measure, ".") {
parts := strings.SplitN(measure, ".", 2)
showTagsQuery = fmt.Sprintf(showTagsQueryWithRPTemplate, parts[0], parts[1])
} else {
showTagsQuery = fmt.Sprintf(showTagsQueryTemplate, measure)
}
result, err := te.queryService.ExecuteShowQuery(influxClient, database, showTagsQuery)

if err != nil {
11 changes: 8 additions & 3 deletions internal/schemamanagement/influx/discovery/tag_discovery_test.go
@@ -86,9 +86,10 @@ func TestFetchMeasurementsShowTagsQuery(t *testing.T) {
measure string
db string
}{
{expectedQuery: `SHOW TAG KEYS FROM "measure"`,
measure: "measure",
db: "db",
{
expectedQuery: `SHOW TAG KEYS FROM "measure"`,
measure: "measure",
db: "db",
}, {
expectedQuery: `SHOW TAG KEYS FROM "measure 1"`,
measure: "measure 1",
Expand All @@ -97,6 +98,10 @@ func TestFetchMeasurementsShowTagsQuery(t *testing.T) {
expectedQuery: `SHOW TAG KEYS FROM "measure-2"`,
measure: "measure-2",
db: "db",
}, {
expectedQuery: `SHOW TAG KEYS FROM "rp"."measure-2"`,
measure: "rp.measure-2",
db: "db",
},
}
for _, tc := range testCases {
