Skip to content

Commit

Permalink
Add explicit support for retention policies
Browse files Browse the repository at this point in the history
Also fixes the bug when trying to migrate measurements
with dots in the name
  • Loading branch information
Blagoj Atanasovski authored and atanasovskib committed Jul 29, 2019
1 parent d1edc6d commit b8d2a76
Show file tree
Hide file tree
Showing 48 changed files with 320 additions and 514 deletions.
9 changes: 6 additions & 3 deletions README.md
Expand Up @@ -54,8 +54,7 @@ $ ./outflux schema-transfer --help
```

Usage of the command is `outflux schema-transfer database [measure1 measure2 ...] [flags]`. Where database is the name of the InfluxDB database you wish to export. `[measure1 ...] ` are optional and if specified will export only those measurements from the selected database.
Additionally you can specify the retention policy as `retention_policy.measure` or `"retention-policy"."measure name"` if some of the identifiers contain a space or dash.
⚠️ *The resulting target table will be named `"retention_policy.measure"` in TimescaleDB*
Additionally you can specify the retention policy with the `retention-policy` flag.

For example `outflux schema-transfer benchmark cpu mem` will discover the schema for the `cpu` and `mem` measurements from the `benchmark` database.

Expand All @@ -67,6 +66,7 @@ Available flags for schema-transfer are:
| input-pass | string | | Password to use when connecting to the input database |
| input-user | string | | Username to use when connecting to the input database |
| input-unsafe-https | bool | false | Should 'InsecureSkipVerify' be passed to the input connection |
| retention-policy | string | autogen | The retention policy to select the tags and fields from |
| output-conn | string | sslmode=disable | Connection string to use to connect to the output database|
| output-schema | string | | The schema of the output database that the data will be inserted into |
| schema-strategy | string | CreateIfMissing | Strategy to use for preparing the schema of the output database. Valid options: ValidateOnly, CreateIfMissing, DropAndCreate, DropCascadeAndCreate |
Expand All @@ -85,7 +85,9 @@ $ cd $GOPATH/bin/
$ ./outflux migrate --help
```

Usage of the command is `outflux migrate database [measure1 measure2 ...] [flags]`. Where database is the name of the `database` you wish to export. `[measure1 measure2 ...]` are optional and if specified will export only those measurements from the selected database.
Usage of the command is `outflux migrate database [measure1 measure2 ...] [flags]`. Where database is the name of the `database` you wish to export. `[measure1 measure2 ...]` are optional and if specified will export only those measurements from the selected database.

The retention policy can be specified with the `retention-policy` flag. By default the 'autogen' retention policy is used.

For example `outflux migrate benchmark cpu mem` will export the `cpu` and `mem` measurements from the `benchmark` database. On the other hand `outflux migrate benchmark` will export all measurements in the `benchmark` database.

Expand All @@ -97,6 +99,7 @@ Available flags are:
| input-pass | string | | Password to use when connecting to the input database |
| input-user | string | | Username to use when connecting to the input database |
| input-unsafe-https | bool | false | Should 'InsecureSkipVerify' be passed to the input connection |
| retention-policy | string | autogen | The retention policy to select the data from |
| limit | uint64 | 0 | If specified will limit the export points to its value. 0 = NO LIMIT |
| from | string | | If specified will export data with a timestamp >= of its value. Accepted format: RFC3339 |
| to | string | | If specified will export data with a timestamp <= of its value. Accepted format: RFC3339 |
Expand Down
40 changes: 24 additions & 16 deletions cmd/outflux/app_context.go
Expand Up @@ -11,33 +11,41 @@ import (
)

type appContext struct {
ics connections.InfluxConnectionService
tscs connections.TSConnectionService
pipeService cli.PipeService
influxQueryService influxqueries.InfluxQueryService
extractorService extraction.ExtractorService
schemaManagerService schemamanagement.SchemaManagerService
transformerService cli.TransformerService
ics connections.InfluxConnectionService
tscs connections.TSConnectionService
pipeService cli.PipeService
influxQueryService influxqueries.InfluxQueryService
influxTagExplorer discovery.TagExplorer
influxFieldExplorer discovery.FieldExplorer
influxMeasureExplorer discovery.MeasureExplorer
extractorService extraction.ExtractorService
schemaManagerService schemamanagement.SchemaManagerService
transformerService cli.TransformerService
}

func initAppContext() *appContext {
tscs := connections.NewTSConnectionService()
ics := connections.NewInfluxConnectionService()
ingestorService := ingestion.NewIngestorService()
influxQueryService := influxqueries.NewInfluxQueryService()
schemaManagerService := schemamanagement.NewSchemaManagerService(influxQueryService)
extractorService := extraction.NewExtractorService(schemaManagerService)
influxTagExplorer := discovery.NewTagExplorer(influxQueryService)
influxFieldExplorer := discovery.NewFieldExplorer(influxQueryService)
influxMeasureExplorer := discovery.NewMeasureExplorer(influxQueryService, influxFieldExplorer)
schemaManagerService := schemamanagement.NewSchemaManagerService(influxMeasureExplorer, influxTagExplorer, influxFieldExplorer)
extractorService := extraction.NewExtractorService(schemaManagerService)

transformerService := cli.NewTransformerService(influxTagExplorer, influxFieldExplorer)
pipeService := cli.NewPipeService(ingestorService, extractorService, transformerService)
return &appContext{
ics: ics,
tscs: tscs,
pipeService: pipeService,
influxQueryService: influxQueryService,
extractorService: extractorService,
schemaManagerService: schemaManagerService,
transformerService: transformerService,
ics: ics,
tscs: tscs,
pipeService: pipeService,
influxQueryService: influxQueryService,
extractorService: extractorService,
schemaManagerService: schemaManagerService,
transformerService: transformerService,
influxTagExplorer: influxTagExplorer,
influxFieldExplorer: influxFieldExplorer,
influxMeasureExplorer: influxMeasureExplorer,
}
}
12 changes: 9 additions & 3 deletions cmd/outflux/migrate.go
Expand Up @@ -39,6 +39,7 @@ func initMigrateCmd() *cobra.Command {
},
}
flagparsers.AddConnectionFlagsToCmd(migrateCmd)
migrateCmd.PersistentFlags().String(flagparsers.RetentionPolicyFlag, flagparsers.DefaultRetentionPolicy, "The retention policy to select the data from")
migrateCmd.PersistentFlags().String(flagparsers.SchemaStrategyFlag, flagparsers.DefaultSchemaStrategy.String(), "Strategy to use for preparing the schema of the output database. Valid options: ValidateOnly, CreateIfMissing, DropAndCreate, DropCascadeAndCreate")
migrateCmd.PersistentFlags().String(flagparsers.FromFlag, "", "If specified will export data with a timestamp >= of its value. Accepted format: RFC3339")
migrateCmd.PersistentFlags().String(flagparsers.ToFlag, "", "If specified will export data with a timestamp <= of its value. Accepted format: RFC3339")
Expand All @@ -53,6 +54,7 @@ func initMigrateCmd() *cobra.Command {
migrateCmd.PersistentFlags().String(flagparsers.TagsColumnFlag, flagparsers.DefaultTagsColumn, "When "+flagparsers.TagsAsJSONFlag+" is set, this column specifies the name of the JSON column for the tags")
migrateCmd.PersistentFlags().Bool(flagparsers.FieldsAsJSONFlag, flagparsers.DefaultFieldsAsJSON, "If this flag is set to true, then the Fields of the influx measures being exported will be combined into a single JSONb column in Timescale")
migrateCmd.PersistentFlags().String(flagparsers.FieldsColumnFlag, flagparsers.DefaultFieldsColumn, "When "+flagparsers.FieldsAsJSONFlag+" is set, this column specifies the name of the JSON column for the fields")
migrateCmd.PersistentFlags().String(flagparsers.OutputSchemaFlag, flagparsers.DefaultOutputSchema, "The schema of the output database that the data will be inserted into")

return migrateCmd
}
Expand All @@ -63,9 +65,13 @@ func migrate(app *appContext, connArgs *cli.ConnectionConfig, args *cli.Migratio
log.SetOutput(ioutil.Discard)
}

var err error
if len(connArgs.InputMeasures) == 0 {
connArgs.InputMeasures, err = discoverMeasures(app, connArgs)
influxConn, err := app.ics.NewConnection(influxConnParams(connArgs))
if err != nil {
return fmt.Errorf("could not open connection to Influx Server\n%v", err)
}
connArgs.InputMeasures, err = discoverMeasures(app, influxConn, connArgs.InputDb, args.RetentionPolicy)
influxConn.Close()
if err != nil {
return fmt.Errorf("could not discover the available measures for the input db '%s'", connArgs.InputDb)
}
Expand Down Expand Up @@ -120,7 +126,7 @@ func pipeRoutine(
}
defer infConn.Close()
defer pgConn.Close()
pipe, err := app.pipeService.Create(infConn, pgConn, measure, connArgs, args)
pipe, err := app.pipeService.Create(infConn, pgConn, measure, connArgs.InputDb, args)
if err != nil {
pipeChannel <- fmt.Errorf("could not create execution pipeline for measure '%s'\n%v", measure, err)
return
Expand Down
8 changes: 5 additions & 3 deletions cmd/outflux/migrate_i_test.go
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/timescale/outflux/internal/cli"

ingestionConfig "github.com/timescale/outflux/internal/ingestion/config"
"github.com/timescale/outflux/internal/schemamanagement/schemaconfig"

Expand Down Expand Up @@ -243,7 +244,7 @@ func TestMigrateRenameOutputSchema(t *testing.T) {

// run
connConf, config := defaultConfig(db, measure)
connConf.OutputSchema = targetSchema
config.OutputSchema = targetSchema
config.TagsAsJSON = true
config.TagsCol = "tags"
appContext := initAppContext()
Expand Down Expand Up @@ -316,8 +317,9 @@ func TestMigrateRetentionPolicy(t *testing.T) {
}

// run
connConf, config := defaultConfig(db, rp+"."+measure)
connConf.OutputSchema = targetSchema
connConf, config := defaultConfig(db, measure)
config.OutputSchema = targetSchema
config.RetentionPolicy = rp
config.TagsAsJSON = true
config.TagsCol = "tags"
appContext := initAppContext()
Expand Down
8 changes: 5 additions & 3 deletions cmd/outflux/mocks_for_test.go
Expand Up @@ -20,19 +20,21 @@ type mockService struct {
inflSchemMngr schemamanagement.SchemaManager
}

func (m *mockService) Create(influx.Client, *pgx.Conn, string, *cli.ConnectionConfig, *cli.MigrationConfig) (pipeline.Pipe, error) {
func (m *mockService) Create(infConn influx.Client, tsConn *pgx.Conn, measure, inputDb string, conf *cli.MigrationConfig) (pipeline.Pipe, error) {
return m.pipe, m.pipeErr
}

func (m *mockService) NewConnection(arg *connections.InfluxConnectionParams) (influx.Client, error) {
return m.inflConn, m.inflConnErr
}

func (m *mockService) Influx(c influx.Client, a string) schemamanagement.SchemaManager {
func (m *mockService) Influx(c influx.Client, db, rp string) schemamanagement.SchemaManager {
return m.inflSchemMngr
}

func (m *mockService) TimeScale(dbConn *pgx.Conn) schemamanagement.SchemaManager { return nil }
func (m *mockService) TimeScale(dbConn *pgx.Conn, schema string) schemamanagement.SchemaManager {
return nil
}

type mockTsConnSer struct {
tsConn *pgx.Conn
Expand Down
50 changes: 23 additions & 27 deletions cmd/outflux/schema_transfer.go
Expand Up @@ -6,8 +6,9 @@ import (
"log"
"time"

influx "github.com/influxdata/influxdb/client/v2"
"github.com/jackc/pgx"
"github.com/timescale/outflux/internal/cli"
"github.com/timescale/outflux/internal/connections"

"github.com/spf13/cobra"
"github.com/timescale/outflux/internal/cli/flagparsers"
Expand Down Expand Up @@ -35,11 +36,14 @@ func initSchemaTransferCmd() *cobra.Command {
}

flagparsers.AddConnectionFlagsToCmd(schemaTransferCmd)
schemaTransferCmd.PersistentFlags().String(flagparsers.RetentionPolicyFlag, flagparsers.DefaultRetentionPolicy, "The retention policy to select the fields and tags from")
schemaTransferCmd.PersistentFlags().String(flagparsers.SchemaStrategyFlag, flagparsers.DefaultSchemaStrategy.String(), "Strategy to use for preparing the schema of the output database. Valid options: ValidateOnly, CreateIfMissing, DropAndCreate, DropCascadeAndCreate")
schemaTransferCmd.PersistentFlags().Bool(flagparsers.TagsAsJSONFlag, flagparsers.DefaultTagsAsJSON, "If this flag is set to true, then the Tags of the influx measures being exported will be combined into a single JSONb column in Timescale")
schemaTransferCmd.PersistentFlags().String(flagparsers.TagsColumnFlag, flagparsers.DefaultTagsColumn, "When "+flagparsers.TagsAsJSONFlag+" is set, this column specifies the name of the JSON column for the tags")
schemaTransferCmd.PersistentFlags().Bool(flagparsers.FieldsAsJSONFlag, flagparsers.DefaultFieldsAsJSON, "If this flag is set to true, then the Fields of the influx measures being exported will be combined into a single JSONb column in Timescale")
schemaTransferCmd.PersistentFlags().String(flagparsers.FieldsColumnFlag, flagparsers.DefaultFieldsColumn, "When "+flagparsers.FieldsAsJSONFlag+" is set, this column specifies the name of the JSON column for the fields")
schemaTransferCmd.PersistentFlags().String(flagparsers.OutputSchemaFlag, flagparsers.DefaultOutputSchema, "The schema of the output database that the data will be inserted into")

return schemaTransferCmd
}

Expand All @@ -53,16 +57,25 @@ func transferSchema(app *appContext, connArgs *cli.ConnectionConfig, args *cli.M
influxDb := connArgs.InputDb
log.Printf("Selected input database: %s\n", influxDb)
var err error

// connect to input and output database
infConn, pgConn, err := openConnections(app, connArgs)
if err != nil {
return fmt.Errorf("could not open connections to input and output database\n%v", err)
}
defer infConn.Close()
defer pgConn.Close()

// transfer the schema for all measures
if len(connArgs.InputMeasures) == 0 {
connArgs.InputMeasures, err = discoverMeasures(app, connArgs)
connArgs.InputMeasures, err = discoverMeasures(app, infConn, connArgs.InputDb, args.RetentionPolicy)
if err != nil {
return fmt.Errorf("could not discover the available measures for the input db '%s'", connArgs.InputDb)
}
}

for _, measure := range connArgs.InputMeasures {
err := routine(app, connArgs, args, measure)
err := transfer(app, connArgs.InputDb, args, infConn, pgConn, measure)
if err != nil {
return fmt.Errorf("could not transfer schema for measurement '%s'\n%v", measure, err)
}
Expand All @@ -73,37 +86,20 @@ func transferSchema(app *appContext, connArgs *cli.ConnectionConfig, args *cli.M
return nil
}

func discoverMeasures(app *appContext, connArgs *cli.ConnectionConfig) ([]string, error) {
client, err := app.ics.NewConnection(&connections.InfluxConnectionParams{
Server: connArgs.InputHost,
Username: connArgs.InputUser,
Password: connArgs.InputPass,
Database: connArgs.InputDb,
UnsafeHTTPS: connArgs.InputUnsafeHTTPS,
})
if err != nil {
return nil, err
}

schemaManager := app.schemaManagerService.Influx(client, connArgs.InputDb)
client.Close()
func discoverMeasures(app *appContext, influxConn influx.Client, db, rp string) ([]string, error) {
schemaManager := app.schemaManagerService.Influx(influxConn, db, rp)
return schemaManager.DiscoverDataSets()
}

func routine(
func transfer(
app *appContext,
connArgs *cli.ConnectionConfig,
inputDb string,
args *cli.MigrationConfig,
infConn influx.Client,
pgConn *pgx.Conn,
measure string) error {

infConn, pgConn, err := openConnections(app, connArgs)
if err != nil {
return fmt.Errorf("could not open connections to input and output database\n%v", err)
}
defer infConn.Close()
defer pgConn.Close()

pipe, err := app.pipeService.Create(infConn, pgConn, measure, connArgs, args)
pipe, err := app.pipeService.Create(infConn, pgConn, measure, inputDb, args)
if err != nil {
return fmt.Errorf("could not create execution pipeline for measure '%s'\n%v", measure, err)
}
Expand Down
18 changes: 3 additions & 15 deletions cmd/outflux/schema_transfer_test.go
Expand Up @@ -13,17 +13,6 @@ import (
"github.com/timescale/outflux/internal/schemamanagement/schemaconfig"
)

func TestDiscoverMeasuresErrorNewConnection(t *testing.T) {
app := &appContext{
ics: &mockService{inflConnErr: fmt.Errorf("error")},
}
connArgs := &cli.ConnectionConfig{}
res, err := discoverMeasures(app, connArgs)
if res != nil || err == nil {
t.Errorf("expected error, none received")
}
}

func TestDiscoverMeasures(t *testing.T) {
mockClient := &tdmc{}
mockSchemaMngr := &tdmsm{}
Expand All @@ -32,14 +21,13 @@ func TestDiscoverMeasures(t *testing.T) {
ics: mockAll,
schemaManagerService: mockAll,
}
connArgs := &cli.ConnectionConfig{}
_, err := discoverMeasures(app, connArgs)
_, err := discoverMeasures(app, mockClient, "db", "autogen")
if err != nil {
t.Errorf("unexpected error:%v", err)
}

if !mockClient.closed || !mockSchemaMngr.discoverCalled {
t.Errorf("expected closed: true, discover: true\ngot closed:%v, discover:%v", mockClient.closed, mockSchemaMngr.discoverCalled)
if mockClient.closed || !mockSchemaMngr.discoverCalled {
t.Errorf("expected closed: false, discover: true\ngot closed:%v, discover:%v", mockClient.closed, mockSchemaMngr.discoverCalled)
}
}

Expand Down
1 change: 0 additions & 1 deletion internal/cli/connection_config.go
Expand Up @@ -9,5 +9,4 @@ type ConnectionConfig struct {
InputPass string
InputUnsafeHTTPS bool
OutputDbConnString string
OutputSchema string
}
15 changes: 8 additions & 7 deletions internal/cli/extraction_conf_creator.go
Expand Up @@ -18,13 +18,14 @@ type defaultExtractionConfCreator struct{}

func (d *defaultExtractionConfCreator) create(pipeID, db, measure string, conf *MigrationConfig) *config.ExtractionConfig {
measureExtractionConf := &config.MeasureExtraction{
Database: db,
Measure: measure,
From: conf.From,
To: conf.To,
ChunkSize: conf.ChunkSize,
Limit: conf.Limit,
SchemaOnly: conf.SchemaOnly,
Database: db,
Measure: measure,
From: conf.From,
To: conf.To,
ChunkSize: conf.ChunkSize,
Limit: conf.Limit,
SchemaOnly: conf.SchemaOnly,
RetentionPolicy: conf.RetentionPolicy,
}

ex := &config.ExtractionConfig{
Expand Down
2 changes: 0 additions & 2 deletions internal/cli/flagparsers/connection_args_parser.go
Expand Up @@ -18,7 +18,6 @@ func FlagsToConnectionConfig(flags *pflag.FlagSet, args []string) (*cli.Connecti
inputHost, _ := flags.GetString(InputServerFlag)
inputUnsafe, _ := flags.GetBool(InputUnsafeHTTPSFlag)
outputConnString, _ := flags.GetString(OutputConnFlag)
schema, _ := flags.GetString(OutputSchemaFlag)
return &cli.ConnectionConfig{
InputDb: args[0],
InputMeasures: args[1:],
Expand All @@ -27,6 +26,5 @@ func FlagsToConnectionConfig(flags *pflag.FlagSet, args []string) (*cli.Connecti
InputPass: inputPass,
InputUnsafeHTTPS: inputUnsafe,
OutputDbConnString: outputConnString,
OutputSchema: schema,
}, nil
}
4 changes: 0 additions & 4 deletions internal/cli/flagparsers/connection_flags.go
Expand Up @@ -26,8 +26,4 @@ func AddConnectionFlagsToCmd(cmd *cobra.Command) {
OutputConnFlag,
DefaultOutputConn,
"Connection string to use to connect to the output database, overrides values in the PG environment variables")
cmd.PersistentFlags().String(
OutputSchemaFlag,
DefaultOutputSchema,
"The schema of the output database that the data will be inserted into")
}
2 changes: 2 additions & 0 deletions internal/cli/flagparsers/flags.go
Expand Up @@ -11,6 +11,7 @@ const (
InputUserFlag = "input-user"
InputPassFlag = "input-pass"
InputUnsafeHTTPSFlag = "input-unsafe-https"
RetentionPolicyFlag = "retention-policy"
OutputConnFlag = "output-conn"
SchemaStrategyFlag = "schema-strategy"
CommitStrategyFlag = "commit-strategy"
Expand All @@ -33,6 +34,7 @@ const (
DefaultInputUser = ""
DefaultInputPass = ""
DefaultInputUnsafeHTTPS = false
DefaultRetentionPolicy = "autogen"
DefaultOutputConn = "sslmode=disable"
DefaultOutputSchema = ""
DefaultSchemaStrategy = schemaconfig.CreateIfMissing
Expand Down

0 comments on commit b8d2a76

Please sign in to comment.