Skip to content

Commit

Permalink
Bugfix/installation id not set and collector posting data is dropped (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
nopcoder committed Sep 15, 2020
1 parent f387784 commit a48d513
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 77 deletions.
11 changes: 0 additions & 11 deletions api/api_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,17 +249,6 @@ func (c *Controller) SetupLakeFSHandler() setupop.SetupLakeFSHandler {
})
}

// write metadata
metadata, err := c.deps.Meta.Write()
if err != nil {
return setupop.NewSetupLakeFSDefault(http.StatusInternalServerError).
WithPayload(&models.Error{
Message: err.Error(),
})
}

c.deps.Collector.SetInstallationID(metadata["installation_id"])
c.deps.Collector.CollectMetadata(metadata)
c.deps.Collector.CollectEvent("global", "init")

// setup admin user
Expand Down
2 changes: 0 additions & 2 deletions api/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ func createDefaultAdminUser(authService auth.Service, t *testing.T) *authmodel.C

type mockCollector struct{}

func (m *mockCollector) SetInstallationID(_ string) {}

func (m *mockCollector) CollectMetadata(_ map[string]string) {}

func (m *mockCollector) CollectEvent(_, _ string) {}
Expand Down
56 changes: 30 additions & 26 deletions auth/metadata.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package auth

import (
"errors"
"runtime"
"time"

Expand All @@ -12,7 +11,6 @@ import (
)

type MetadataManager interface {
InstallationID() (string, error)
SetupTimestamp() (time.Time, error)
UpdateSetupTimestamp(time.Time) error
Write() (map[string]string, error)
Expand All @@ -35,6 +33,23 @@ func NewDBMetadataManager(version string, database db.Database) *DBMetadataManag
}
}

func insertOrGetInstallationID(tx db.Tx) (string, error) {
newInstallationID := uuid.New().String()
res, err := tx.Exec(`INSERT INTO auth_installation_metadata (key_name, key_value)
VALUES ($1,$2)
ON CONFLICT DO NOTHING`,
InstallationIDKeyName, newInstallationID)
if err != nil {
return "", err
}
if affected, err := res.RowsAffected(); err != nil {
return "", err
} else if affected == 1 {
return newInstallationID, nil
}
return getInstallationID(tx)
}

func getInstallationID(tx db.Tx) (string, error) {
var installationID string
err := tx.Get(&installationID, `SELECT key_value FROM auth_installation_metadata WHERE key_name = $1`,
Expand Down Expand Up @@ -66,16 +81,6 @@ func writeMetadata(tx sqlx.Execer, items map[string]string) error {
return nil
}

func (d *DBMetadataManager) InstallationID() (string, error) {
installationID, err := d.db.Transact(func(tx db.Tx) (interface{}, error) {
return getInstallationID(tx)
}, db.WithLogger(logging.Dummy()), db.ReadOnly())
if err != nil {
return "", err
}
return installationID.(string), nil
}

func (d *DBMetadataManager) UpdateSetupTimestamp(ts time.Time) error {
_, err := d.db.Transact(func(tx db.Tx) (interface{}, error) {
return nil, writeMetadata(tx, map[string]string{
Expand Down Expand Up @@ -107,25 +112,24 @@ func (d *DBMetadataManager) Write() (map[string]string, error) {
metadata[k] = v
}
}

// see if we have existing metadata or we need to generate one
_, err = d.db.Transact(func(tx db.Tx) (interface{}, error) {
// get installation ID - if we don't have one we'll generate one
_, err := getInstallationID(tx)
if err != nil && !errors.Is(err, db.ErrNotFound) {
// write metadata
err = writeMetadata(tx, metadata)
if err != nil {
return nil, err
}

if err != nil { // i.e. err is db.ErrNotFound
// we don't have an installation ID - let's write one.
installationID := uuid.Must(uuid.NewUUID()).String()
metadata["installation_id"] = installationID
metadata["setup_time"] = time.Now().UTC().Format(time.RFC3339)
// write installation id
installationID, err := insertOrGetInstallationID(tx)
if err == nil {
metadata[InstallationIDKeyName] = installationID
}

err = writeMetadata(tx, metadata)
return nil, err
// get setup timestamp
setupTS, err := getSetupTimestamp(tx)
if err == nil {
metadata[SetupTimestampKeyName] = setupTS.UTC().Format(time.RFC3339)
}
return nil, nil
}, db.WithLogger(logging.Dummy()))

return metadata, err
}
2 changes: 1 addition & 1 deletion cmd/lakefs/cmd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ var initCmd = &cobra.Command{

ctx, cancelFn := context.WithCancel(context.Background())
processID, bufferedCollectorArgs := cfg.GetStatsBufferedCollectorArgs()
stats := stats.NewBufferedCollector(metadata["installation_id"], processID, bufferedCollectorArgs...)
stats := stats.NewBufferedCollector(metadata[auth.InstallationIDKeyName], processID, bufferedCollectorArgs...)
go stats.Run(ctx)
stats.CollectMetadata(metadata)
stats.CollectEvent("global", "init")
Expand Down
30 changes: 10 additions & 20 deletions cmd/lakefs/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"net/http"
"os"
"os/signal"
Expand Down Expand Up @@ -78,12 +77,17 @@ var runCmd = &cobra.Command{

meta := auth.NewDBMetadataManager(config.Version, dbPool)

installationID, err := meta.InstallationID()
processID, bufferedCollectorArgs := cfg.GetStatsBufferedCollectorArgs()

// collect and write metadata
metadata, err := meta.Write()
if err != nil {
installationID = "" // no installation ID is available
logger.WithError(err).Debug("failed to collect account metadata")
}
processID, bufferedCollectorArgs := cfg.GetStatsBufferedCollectorArgs()
stats := stats.NewBufferedCollector(installationID, processID, bufferedCollectorArgs...)

stats := stats.NewBufferedCollector(metadata[auth.InstallationIDKeyName], processID, bufferedCollectorArgs...)
// send metadata
stats.CollectMetadata(metadata)

dedupCleaner := dedup.NewCleaner(blockStore, cataloger.DedupReportChannel())
defer func() {
Expand Down Expand Up @@ -122,22 +126,8 @@ var runCmd = &cobra.Command{

ctx, cancelFn := context.WithCancel(context.Background())
go stats.Run(ctx)
stats.CollectEvent("global", "run")

// stagger a bit and update metadata
go func() {
// avoid a thundering herd in case we have many lakeFS instances starting together
const maxSplay = 10 * time.Second
randSource := rand.New(rand.NewSource(time.Now().UnixNano())) //nolint:gosec
time.Sleep(time.Duration(randSource.Intn(int(maxSplay))))

metadata, err := meta.Write()
if err != nil {
logger.WithError(err).Trace("failed to collect account metadata")
return
}
stats.CollectMetadata(metadata)
}()
stats.CollectEvent("global", "run")

logging.Default().WithField("listen_address", cfg.GetListenAddress()).Info("starting HTTP server")
server := &http.Server{
Expand Down
4 changes: 3 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,11 @@ func (c *Config) GetStatsFlushInterval() time.Duration {
}

func (c *Config) GetStatsBufferedCollectorArgs() (processID string, opts []stats.BufferedCollectorOpts) {
sender := stats.NewDummySender()
var sender stats.Sender
if c.GetStatsEnabled() && Version != UnreleasedVersion {
sender = stats.NewHTTPSender(c.GetStatsAddress(), time.Now)
} else {
sender = stats.NewDummySender()
}
return uuid.Must(uuid.NewUUID()).String(),
[]stats.BufferedCollectorOpts{
Expand Down
2 changes: 0 additions & 2 deletions gateway/playback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,6 @@ func TestMain(m *testing.M) {
os.Exit(code)
}

func (m *mockCollector) SetInstallationID(installationID string) {}

func (m *mockCollector) CollectMetadata(accountMetadata map[string]string) {}

func (m *mockCollector) CollectEvent(class, action string) {}
Expand Down
2 changes: 0 additions & 2 deletions loadtest/local_load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ func TestMain(m *testing.M) {

type mockCollector struct{}

func (m *mockCollector) SetInstallationID(_ string) {}

func (m *mockCollector) CollectMetadata(_ map[string]string) {}

func (m *mockCollector) CollectEvent(_, _ string) {}
Expand Down
12 changes: 0 additions & 12 deletions stats/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package stats
import (
"context"
"fmt"
"sync"
"time"

"github.com/treeverse/lakefs/logging"
Expand All @@ -16,7 +15,6 @@ const (
)

type Collector interface {
SetInstallationID(installationID string)
CollectEvent(class, action string)
CollectMetadata(accountMetadata map[string]string)
}
Expand Down Expand Up @@ -79,7 +77,6 @@ type BufferedCollector struct {
sendTimeout time.Duration
flushTicker FlushTicker
done chan bool
mutex *sync.RWMutex
installationID string
processID string
}
Expand Down Expand Up @@ -125,7 +122,6 @@ func NewBufferedCollector(installationID, processID string, opts ...BufferedColl
sendTimeout: DefaultSendTimeout,
flushTicker: &TimeTicker{ticker: time.NewTicker(DefaultFlushInterval)},
installationID: installationID,
mutex: &sync.RWMutex{},
processID: processID,
}

Expand All @@ -136,8 +132,6 @@ func NewBufferedCollector(installationID, processID string, opts ...BufferedColl
return s
}
func (s *BufferedCollector) getInstallationID() string {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.installationID
}

Expand Down Expand Up @@ -207,12 +201,6 @@ func makeMetrics(counters keyIndex) []Metric {
return metrics
}

func (s *BufferedCollector) SetInstallationID(installationID string) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.installationID = installationID
}

func (s *BufferedCollector) CollectMetadata(accountMetadata map[string]string) {
entries := make([]MetadataEntry, len(accountMetadata))
i := 0
Expand Down

0 comments on commit a48d513

Please sign in to comment.