Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Refactor auth in prom-migrator to be passed as config.
Browse files Browse the repository at this point in the history
Signed-off-by: Harkishen-Singh <harkishensingh@hotmail.com>
  • Loading branch information
Harkishen-Singh committed Apr 20, 2021
1 parent 99c564a commit 4302d6f
Show file tree
Hide file tree
Showing 9 changed files with 213 additions and 132 deletions.
41 changes: 30 additions & 11 deletions cmd/prom-migrator/main.go
Expand Up @@ -45,6 +45,7 @@ type config struct {
progressEnabled bool
readerAuth utils.Auth
writerAuth utils.Auth
progressMetricAuth utils.Auth
}

func main() {
Expand All @@ -67,14 +68,6 @@ func main() {
os.Exit(1)
}
log.Info("msg", fmt.Sprintf("%v+", conf))
if err := utils.SetAuthStore(utils.Read, conf.readerAuth.ToHTTPClientConfig()); err != nil {
log.Error("msg", "could not set read-auth in authStore", "error", err)
os.Exit(1)
}
if err := utils.SetAuthStore(utils.Write, conf.writerAuth.ToHTTPClientConfig()); err != nil {
log.Error("msg", "could not set write-auth in authStore", "error", err)
os.Exit(1)
}

planConfig := &plan.Config{
Mint: conf.mint,
Expand All @@ -85,6 +78,7 @@ func main() {
ProgressEnabled: conf.progressEnabled,
ProgressMetricName: conf.progressMetricName,
ProgressMetricURL: conf.progressMetricURL,
HTTPConfig: conf.progressMetricAuth.ToHTTPClientConfig(),
}
planner, proceed, err := plan.Init(planConfig)
if err != nil {
Expand All @@ -101,12 +95,31 @@ func main() {
sigSlabRead = make(chan *plan.Slab)
)
cont, cancelFunc := context.WithCancel(context.Background())
read, err := reader.New(cont, conf.readURL, planner, conf.concurrentPulls, sigSlabRead)
readerConfig := reader.Config{
Context: cont,
Url: conf.readURL,
Plan: planner,
HTTPConfig: conf.readerAuth.ToHTTPClientConfig(),
ConcurrentPulls: conf.concurrentPulls,
SigSlabRead: sigSlabRead,
}
read, err := reader.New(readerConfig)
if err != nil {
log.Error("msg", "could not create reader", "error", err)
os.Exit(2)
}
write, err := writer.New(cont, conf.writeURL, conf.progressMetricName, conf.name, conf.concurrentPush, conf.progressEnabled, sigSlabRead)

writerConfig := writer.Config{
Context: cont,
Url: conf.writeURL,
HTTPConfig: conf.writerAuth.ToHTTPClientConfig(),
ProgressEnabled: conf.progressEnabled,
ProgressMetricName: conf.progressMetricName,
MigrationJobName: conf.name,
ConcurrentPush: conf.concurrentPush,
SigSlabRead: sigSlabRead,
}
write, err := writer.New(writerConfig)
if err != nil {
log.Error("msg", "could not create writer", "error", err)
os.Exit(2)
Expand Down Expand Up @@ -163,7 +176,7 @@ func parseFlags(conf *config, args []string) {
"set this to the remote write storage that the migrator is writing along with the progress-enabled.")
flag.BoolVar(&conf.progressEnabled, "progress-enabled", true, "This flag tells the migrator, whether or not to use the progress mechanism. It is helpful if you want to "+
"carry out migration with the same time-range. If this is enabled, the migrator will resume the migration from the last time, where it was stopped/interrupted. "+
"If you do not want any extra metric(s) while migration, you can set this to false. But, setting this to false will disble progress-metric and hence, the ability to resume migration.")
"If you do not want any extra metric(s) while migration, you can set this to false. But, setting this to false will disable progress-metric and hence, the ability to resume migration.")
// Authentication.
// TODO: Auth/password via password_file and bearer_token via bearer_token_file.
flag.StringVar(&conf.readerAuth.Username, "read-auth-username", "", "Auth username for remote-read storage.")
Expand All @@ -174,6 +187,12 @@ func parseFlags(conf *config, args []string) {
flag.StringVar(&conf.writerAuth.Password, "write-auth-password", "", "Auth password for remote-write storage.")
flag.StringVar(&conf.writerAuth.BearerToken, "write-auth-bearer-token", "", "Bearer token for remote-write storage. "+
"This should be mutually exclusive with username and password.")
flag.StringVar(&conf.progressMetricAuth.Username, "progress-metric-auth-username", "", "Read auth username for remote-write storage.")
flag.StringVar(&conf.progressMetricAuth.Password, "progress-metric-password", "", "Read auth password for remote-write storage.")
flag.StringVar(&conf.progressMetricAuth.BearerToken, "progress-metric-bearer-token", "", "Read bearer token for remote-write storage. "+
"This should be mutually exclusive with username and password.")
// TLS configurations.

_ = flag.CommandLine.Parse(args)
convertSecFlagToMs(conf)
}
Expand Down
92 changes: 82 additions & 10 deletions pkg/migration-tool/integration_tests/integration_test.go
Expand Up @@ -9,6 +9,7 @@ import (
"testing"
"time"

"github.com/prometheus/common/config"
plan "github.com/timescale/promscale/pkg/migration-tool/planner"
"github.com/timescale/promscale/pkg/migration-tool/reader"
"github.com/timescale/promscale/pkg/migration-tool/utils"
Expand Down Expand Up @@ -75,11 +76,31 @@ func TestReaderWriterPlannerIntegrationWithoutHalts(t *testing.T) {
sigSlabRead = make(chan *plan.Slab)
)
cont, cancelFunc := context.WithCancel(context.Background())
read, err := reader.New(cont, conf.readURL, planner, conf.concurrentPulls, sigSlabRead)

readerConfig := reader.Config{
Context: cont,
Url: conf.readURL,
Plan: planner,
HTTPConfig: config.HTTPClientConfig{},
ConcurrentPulls: conf.concurrentPulls,
SigSlabRead: sigSlabRead,
}
read, err := reader.New(readerConfig)
if err != nil {
t.Fatal("msg", "could not create reader", "error", err.Error())
}
write, err := writer.New(cont, conf.writeURL, conf.progressMetricName, conf.name, conf.numShards, conf.progressEnabled, sigSlabRead)

writerConfig := writer.Config{
Context: cont,
Url: conf.writeURL,
HTTPConfig: config.HTTPClientConfig{},
ProgressEnabled: conf.progressEnabled,
ProgressMetricName: conf.progressMetricName,
MigrationJobName: conf.name,
ConcurrentPush: conf.numShards,
SigSlabRead: sigSlabRead,
}
write, err := writer.New(writerConfig)
if err != nil {
t.Fatal("msg", "could not create writer", "error", err.Error())
}
Expand Down Expand Up @@ -176,12 +197,31 @@ func TestReaderWriterPlannerIntegrationWithHalt(t *testing.T) {
sigRead = make(chan *plan.Slab)
)
cont, cancelFunc := context.WithCancel(context.Background())
read, err := reader.New(cont, conf.readURL, planner, conf.concurrentPulls, sigRead)
readerConfig := reader.Config{
Context: cont,
Url: conf.readURL,
Plan: planner,
HTTPConfig: config.HTTPClientConfig{},
ConcurrentPulls: conf.concurrentPulls,
SigSlabRead: sigRead,
}
read, err := reader.New(readerConfig)
if err != nil {
t.Fatal("msg", "could not create reader", "error", err.Error())
}
read.SigSlabStop = make(chan struct{})
write, err := writer.New(cont, conf.writeURL, conf.progressMetricName, conf.name, conf.numShards, conf.progressEnabled, sigRead)

writerConfig := writer.Config{
Context: cont,
Url: conf.writeURL,
HTTPConfig: config.HTTPClientConfig{},
ProgressEnabled: conf.progressEnabled,
ProgressMetricName: conf.progressMetricName,
MigrationJobName: conf.name,
ConcurrentPush: conf.numShards,
SigSlabRead: sigRead,
}
write, err := writer.New(writerConfig)
if err != nil {
t.Fatal("msg", "could not create writer", "error", err.Error())
}
Expand Down Expand Up @@ -209,11 +249,17 @@ func TestReaderWriterPlannerIntegrationWithHalt(t *testing.T) {
sigRead = make(chan *plan.Slab)

cont, cancelFunc = context.WithCancel(context.Background())
read, err = reader.New(cont, conf.readURL, planner, conf.concurrentPulls, sigRead)

readerConfig.Context = cont
readerConfig.SigSlabRead = sigRead
read, err = reader.New(readerConfig)
if err != nil {
t.Fatal("msg", "could not create reader", "error", err.Error())
}
write, err = writer.New(cont, conf.writeURL, conf.progressMetricName, conf.name, conf.numShards, conf.progressEnabled, sigRead)

writerConfig.Context = cont
writerConfig.SigSlabRead = sigRead
write, err = writer.New(writerConfig)
if err != nil {
t.Fatal("msg", "could not create writer", "error", err.Error())
}
Expand Down Expand Up @@ -317,12 +363,32 @@ func TestReaderWriterPlannerIntegrationWithHaltWithSlabSizeOverflow(t *testing.T
sigRead = make(chan *plan.Slab)
)
cont, cancelFunc := context.WithCancel(context.Background())
read, err := reader.New(cont, conf.readURL, planner, conf.concurrentPulls, sigRead)

readerConfig := reader.Config{
Context: cont,
Url: conf.readURL,
Plan: planner,
HTTPConfig: config.HTTPClientConfig{},
ConcurrentPulls: conf.concurrentPulls,
SigSlabRead: sigRead,
}
read, err := reader.New(readerConfig)
if err != nil {
t.Fatal("msg", "could not create reader", "error", err.Error())
}
read.SigSlabStop = make(chan struct{})
write, err := writer.New(cont, conf.writeURL, conf.progressMetricName, conf.name, conf.numShards, conf.progressEnabled, sigRead)

writerConfig := writer.Config{
Context: cont,
Url: conf.writeURL,
HTTPConfig: config.HTTPClientConfig{},
ProgressEnabled: conf.progressEnabled,
ProgressMetricName: conf.progressMetricName,
MigrationJobName: conf.name,
ConcurrentPush: conf.numShards,
SigSlabRead: sigRead,
}
write, err := writer.New(writerConfig)
if err != nil {
t.Fatal("msg", "could not create writer", "error", err.Error())
}
Expand Down Expand Up @@ -350,11 +416,17 @@ func TestReaderWriterPlannerIntegrationWithHaltWithSlabSizeOverflow(t *testing.T
sigRead = make(chan *plan.Slab)

cont, cancelFunc = context.WithCancel(context.Background())
read, err = reader.New(cont, conf.readURL, planner, conf.concurrentPulls, sigRead)

readerConfig.Context = cont
readerConfig.SigSlabRead = sigRead
read, err = reader.New(readerConfig)
if err != nil {
t.Fatal("msg", "could not create reader", "error", err.Error())
}
write, err = writer.New(cont, conf.writeURL, conf.progressMetricName, conf.name, conf.numShards, conf.progressEnabled, sigRead)

writerConfig.Context = cont
writerConfig.SigSlabRead = sigRead
write, err = writer.New(writerConfig)
if err != nil {
t.Fatal("msg", "could not create writer", "error", err.Error())
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/migration-tool/planner/planner.go
Expand Up @@ -7,6 +7,7 @@ package planner
import (
"context"
"fmt"
"github.com/prometheus/common/config"
"go.uber.org/atomic"
"os"
"sync"
Expand Down Expand Up @@ -53,6 +54,7 @@ type Config struct {
JobName string
ProgressMetricURL string
ProgressMetricName string // Name for progress metric.
HTTPConfig config.HTTPClientConfig
}

// InitPlan creates an in-memory planner and initializes it. It is responsible for fetching the last pushed maxt and based on that, updates
Expand Down Expand Up @@ -104,7 +106,7 @@ func (c *Config) fetchLastPushedMaxt() (lastPushedMaxt int64, found bool, err er
if err != nil {
return -1, false, fmt.Errorf("fetch-last-pushed-maxt create promb query: %w", err)
}
readClient, err := utils.NewClient("reader-last-maxt-pushed", c.ProgressMetricURL, utils.Write, model.Duration(time.Minute*2))
readClient, err := utils.NewClient("reader-last-maxt-pushed", c.ProgressMetricURL, c.HTTPConfig, model.Duration(time.Minute*2))
if err != nil {
return -1, false, fmt.Errorf("create fetch-last-pushed-maxt reader: %w", err)
}
Expand Down
61 changes: 33 additions & 28 deletions pkg/migration-tool/reader/reader.go
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"time"

"github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/timescale/promscale/pkg/log"
Expand All @@ -18,76 +19,80 @@ import (

const defaultReadTimeout = time.Minute * 5

type RemoteRead struct {
c context.Context
url string
client *utils.Client
plan *plan.Plan
concurrentPulls int
sigSlabRead chan *plan.Slab // To the writer.
SigSlabStop chan struct{}
// Config is config for reader.
type Config struct {
Context context.Context
Url string
Plan *plan.Plan
HTTPConfig config.HTTPClientConfig

ConcurrentPulls int

SigSlabRead chan *plan.Slab // To the writer.
SigSlabStop chan struct{}
}

type Read struct {
Config
client *utils.Client
}

// New creates a new RemoteRead. It creates a ReadClient that is imported from Prometheus remote storage.
// RemoteRead takes help of plan to understand how to create fetchers.
func New(c context.Context, readStorageUrl string, p *plan.Plan, numConcurrentPulls int, sigRead chan *plan.Slab) (*RemoteRead, error) {
rc, err := utils.NewClient(fmt.Sprintf("reader-%d", 1), readStorageUrl, utils.Read, model.Duration(defaultReadTimeout))
// New creates a new Read. It creates a ReadClient that is imported from Prometheus remote storage.
// Read takes help of plan to understand how to create fetchers.
func New(config Config) (*Read, error) {
rc, err := utils.NewClient(fmt.Sprintf("reader-%d", 1), config.Url, config.HTTPConfig, model.Duration(defaultReadTimeout))
if err != nil {
return nil, fmt.Errorf("creating read-client: %w", err)
}
read := &RemoteRead{
c: c,
url: readStorageUrl,
plan: p,
client: rc,
concurrentPulls: numConcurrentPulls,
sigSlabRead: sigRead,
read := &Read{
Config: config,
client: rc,
}
return read, nil
}

// Run runs the remote read and starts fetching the samples from the read storage.
func (rr *RemoteRead) Run(errChan chan<- error) {
func (rr *Read) Run(errChan chan<- error) {
var (
err error
slabRef *plan.Slab
)
go func() {
defer func() {
close(rr.sigSlabRead)
close(rr.SigSlabRead)
log.Info("msg", "reader is down")
close(errChan)
}()
log.Info("msg", "reader is up")
select {
case <-rr.c.Done():
case <-rr.Context.Done():
return
default:
}
for rr.plan.ShouldProceed() {
for rr.Plan.ShouldProceed() {
select {
case <-rr.c.Done():
case <-rr.Context.Done():
return
case <-rr.SigSlabStop:
return
default:
}
slabRef, err = rr.plan.NextSlab()
slabRef, err = rr.Plan.NextSlab()
if err != nil {
errChan <- fmt.Errorf("remote-run run: %w", err)
return
}
ms := []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, labels.MetricName, ".*")}
err = slabRef.Fetch(rr.c, rr.client, slabRef.Mint(), slabRef.Maxt(), ms)
err = slabRef.Fetch(rr.Context, rr.client, slabRef.Mint(), slabRef.Maxt(), ms)
if err != nil {
errChan <- fmt.Errorf("remote-run run: %w", err)
return
}
if slabRef.IsEmpty() {
rr.plan.DecrementSlabCount()
rr.Plan.DecrementSlabCount()
continue
}
rr.sigSlabRead <- slabRef
rr.SigSlabRead <- slabRef
slabRef = nil
}
}()
Expand Down

0 comments on commit 4302d6f

Please sign in to comment.