Skip to content

Commit

Permalink
Merge pull request #1252 from tigrisdata/main
Browse files Browse the repository at this point in the history
Release Beta
  • Loading branch information
himank committed Jun 1, 2023
2 parents 159c468 + 2db65af commit c59f645
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 12 deletions.
35 changes: 31 additions & 4 deletions server/metadata/tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,29 +528,45 @@ func (m *TenantManager) DecodeTableName(tableName []byte) (*Database, string, bo
// As this is an expensive call, the reloading happens only during the start of the server. It is possible that reloading
// fails during start time then we rely on each transaction to detect it and trigger reload. The consistency shouldn’t
// be impacted if we fail to load the in-memory view.
func (m *TenantManager) Reload(ctx context.Context, tx transaction.Tx) error {
func (m *TenantManager) Reload(ctx context.Context) error {
log.Debug().Msg("reloading tenants")
m.Lock()
defer m.Unlock()

tx, err := m.txMgr.StartTx(ctx)
if err != nil {
log.Err(err).Msg("starting a transaction failed")
return err
}
currentVersion, err := m.versionH.Read(ctx, tx, false)
if ulog.E(err) {
_ = tx.Rollback(ctx)
return err
}
ulog.E(tx.Commit(ctx))

if err = m.reload(ctx, tx, currentVersion); ulog.E(err) {
if err = m.reload(ctx, currentVersion); ulog.E(err) {
return err
}

m.version = currentVersion
log.Debug().Msgf("latest meta version %v", m.version)
return err
}

func (m *TenantManager) reload(ctx context.Context, tx transaction.Tx, currentVersion Version) error {
func (m *TenantManager) reload(ctx context.Context, currentVersion Version) error {
tx, err := m.txMgr.StartTx(ctx)
if err != nil {
log.Err(err).Msg("starting a transaction failed")
return err
}
namespaces, err := m.metaStore.GetNamespaces(ctx, tx)
if err != nil {
_ = tx.Rollback(ctx)
return err
}
ulog.E(tx.Commit(ctx))

log.Debug().Interface("ns", namespaces).Msg("existing reserved namespaces")

for namespace, metadata := range namespaces {
Expand All @@ -562,10 +578,21 @@ func (m *TenantManager) reload(ctx context.Context, tx transaction.Tx, currentVe

for _, tenant := range m.tenants {
log.Debug().Interface("tenant", tenant.String()).Msg("reloading tenant")
tx, err := m.txMgr.StartTx(ctx)
if err != nil {
log.Err(err).Msgf("starting a transaction failed '%s'", tenant.name)
return err
}
tenant.Lock()
err := tenant.reload(ctx, tx, currentVersion, m.searchSchemasSnapshot)
err = tenant.reload(ctx, tx, currentVersion, m.searchSchemasSnapshot)
tenant.Unlock()
if err != nil {
log.Err(err).Msgf("reloading a tenant failed '%s'", tenant.name)
_ = tx.Rollback(ctx)
return err
}
if err = tx.Commit(ctx); err != nil {
log.Err(err).Msgf("committing a reloading of tenant failed '%s'", tenant.name)
return err
}
}
Expand Down
18 changes: 10 additions & 8 deletions server/services/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"net/http"
"time"

"github.com/fullstorydev/grpchan/inprocgrpc"
"github.com/go-chi/chi/v5"
Expand Down Expand Up @@ -77,17 +78,18 @@ func newApiService(kv kv.TxStore, searchStore search.Store, tenantMgr *metadata.
authProvider: authProvider,
}

var err error
ctx := context.TODO()
tx, err := u.txMgr.StartTx(ctx)
if ulog.E(err) {
log.Fatal().Err(err).Msgf("error starting server: starting transaction failed")
for i := 0; i < 3; i++ {
if err = tenantMgr.Reload(ctx); ulog.E(err) {
time.Sleep(1 * time.Second)
continue
}
break
}

if err := tenantMgr.Reload(ctx, tx); ulog.E(err) {
// ToDo: no need to panic, probably handle through async thread.
log.Fatal().Err(err).Msgf("error starting server: reloading tenants failed")
if err != nil {
log.Fatal().Err(err).Msgf("error starting server: reloading tenants failed attempted three times")
}
ulog.E(tx.Commit(ctx))

var txListeners []database.TxListener
if config.DefaultConfig.Cdc.Enabled {
Expand Down

0 comments on commit c59f645

Please sign in to comment.