Skip to content

Commit

Permalink
[dbnode] Fix overzealous log message for missing schema with non-prot…
Browse files Browse the repository at this point in the history
…obuf namespace (#2013)

* Fix namespace update log message when missing schema for non-protobuf namespace
  • Loading branch information
robskillington authored and notbdu committed Oct 25, 2019
1 parent b39a79f commit 9baa7da
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 15 deletions.
14 changes: 13 additions & 1 deletion src/dbnode/namespace/schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@ import (
"go.uber.org/zap"
)

func newSchemaHistoryNotFoundError(nsIDStr string) error {
return &schemaHistoryNotFoundError{nsIDStr}
}

type schemaHistoryNotFoundError struct {
nsIDStr string
}

func (s *schemaHistoryNotFoundError) Error() string {
return fmt.Sprintf("schema history is not found for %s", s.nsIDStr)
}

type schemaRegistry struct {
sync.RWMutex

Expand Down Expand Up @@ -125,7 +137,7 @@ func (sr *schemaRegistry) getSchemaHistory(nsIDStr string) (SchemaHistory, error

history, ok := sr.registry[nsIDStr]
if !ok {
return nil, fmt.Errorf("schema history is not found for %v", nsIDStr)
return nil, newSchemaHistoryNotFoundError(nsIDStr)
}
return history.Get().(SchemaHistory), nil
}
Expand Down
48 changes: 34 additions & 14 deletions src/dbnode/namespace/schema_registry_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,42 +31,62 @@ import (
// UpdateSchemaRegistry updates schema registry with namespace updates.
func UpdateSchemaRegistry(newNamespaces Map, schemaReg SchemaRegistry, log *zap.Logger) error {
schemaUpdates := newNamespaces.Metadatas()
merr := xerrors.NewMultiError()
multiErr := xerrors.NewMultiError()
for _, metadata := range schemaUpdates {
curSchemaID := "none"
var (
curSchemaID = "none"
curSchemaNone = true
)
curSchema, err := schemaReg.GetLatestSchema(metadata.ID())
if err != nil {
// NB(bodu): Schema history not found is a valid error as this occurs on initial bootstrap for the db.
if _, ok := err.(*schemaHistoryNotFoundError); !ok {
multiErr = multiErr.Add(fmt.Errorf("cannot get latest namespace schema: %v", err))
continue
}
}

if curSchema != nil {
curSchemaNone = false
curSchemaID = curSchema.DeployId()
if len(curSchemaID) == 0 {
log.Warn("can not update namespace schema with empty deploy ID", zap.Stringer("namespace", metadata.ID()),
msg := "namespace schema update invalid with empty deploy ID"
log.Warn(msg, zap.Stringer("namespace", metadata.ID()),
zap.String("currentSchemaID", curSchemaID))
merr = merr.Add(fmt.Errorf("can not update namespace(%v) schema with empty deploy ID", metadata.ID().String()))
multiErr = multiErr.Add(fmt.Errorf("%s: namespace=%s", msg, metadata.ID().String()))
continue
}
}

// Log schema update.
latestSchema, found := metadata.Options().SchemaHistory().GetLatest()
if !found {
log.Warn("can not update namespace schema to empty", zap.Stringer("namespace", metadata.ID()),
zap.String("currentSchema", curSchemaID))
merr = merr.Add(fmt.Errorf("can not update namespace(%v) schema to empty", metadata.ID().String()))
if !curSchemaNone {
// NB(r): Only interpret this as a warning/error if already had a schema,
// otherwise this is just a namespace that is not using protobuf schemas.
msg := "namespace schema not found on update"
log.Warn(msg, zap.Stringer("namespace", metadata.ID()),
zap.String("currentSchema", curSchemaID))
multiErr = multiErr.Add(fmt.Errorf("%s: namespace=%s", msg, metadata.ID().String()))
}
continue
}

log.Info("updating database namespace schema", zap.Stringer("namespace", metadata.ID()),
zap.String("currentSchema", curSchemaID), zap.String("latestSchema", latestSchema.DeployId()))

err = schemaReg.SetSchemaHistory(metadata.ID(), metadata.Options().SchemaHistory())
if err != nil {
log.Warn("failed to update latest schema for namespace",
msg := "namespace schema failed to update to latest schema"
log.Warn(msg,
zap.Stringer("namespace", metadata.ID()),
zap.Error(err))
merr = merr.Add(fmt.Errorf("failed to update latest schema for namespace %v, error: %v",
metadata.ID().String(), err))
multiErr = multiErr.Add(fmt.Errorf("%s: namespace=%s, error=%v",
msg, metadata.ID().String(), err))
}
}
if merr.Empty() {
return nil
if !multiErr.Empty() {
return multiErr
}
return merr
return nil
}

0 comments on commit 9baa7da

Please sign in to comment.