Skip to content

Commit

Permalink
Fix partial namespace creation (#4267)
Browse files Browse the repository at this point in the history
* Fix partial namespace creation

* Check return result

* Fix some bugs

* Return Unavailable Error

* Check duplicate name condition

* Update id value
  • Loading branch information
samanbarghi committed May 11, 2023
1 parent 58e247b commit 3cbab42
Showing 1 changed file with 34 additions and 10 deletions.
44 changes: 34 additions & 10 deletions common/persistence/cassandra/metadata_store.go
Expand Up @@ -125,15 +125,22 @@ func (m *MetadataStore) CreateNamespace(
ctx context.Context,
request *p.InternalCreateNamespaceRequest,
) (*p.CreateNamespaceResponse, error) {

query := m.session.Query(templateCreateNamespaceQuery, request.ID, request.Name).WithContext(ctx)
applied, err := query.MapScanCAS(make(map[string]interface{}))
existingRow := make(map[string]interface{})
applied, err := query.MapScanCAS(existingRow)
if err != nil {
return nil, serviceerror.NewUnavailable(fmt.Sprintf("CreateNamespace operation failed. Inserting into namespaces table. Error: %v", err))
}

if !applied {
return nil, serviceerror.NewNamespaceAlreadyExists("CreateNamespace operation failed because of uuid collision.")
// if the id with the same name exists in `namespaces_by_id`, fall through and either add a row in `namespaces` table
// or fail if name exists in that table already. This is to make sure we do not end up with a row in `namespaces_by_id`
// table and no entry in `namespaces` table
if name, ok := existingRow["name"]; !ok || name != request.Name {
return nil, serviceerror.NewNamespaceAlreadyExists("CreateNamespace operation failed because of uuid collision.")
}
}

return m.CreateNamespaceInV2Table(ctx, request)
}

Expand Down Expand Up @@ -161,23 +168,40 @@ func (m *MetadataStore) CreateNamespaceInV2Table(

previous := make(map[string]interface{})
applied, iter, err := m.session.MapExecuteBatchCAS(batch, previous)
deleteOrphanNamespace := func() {
// Delete namespace from `namespaces_by_id`
if errDelete := m.session.Query(templateDeleteNamespaceQuery, request.ID).WithContext(ctx).Exec(); errDelete != nil {
m.logger.Warn("Unable to delete orphan namespace record. Error", tag.Error(errDelete))
}
}

if err != nil {
return nil, serviceerror.NewUnavailable(fmt.Sprintf("CreateNamespace operation failed. Inserting into namespaces table. Error: %v", err))
}

defer func() { _ = iter.Close() }()

if !applied {
// Namespace already exist. Delete orphan namespace record before returning back to user
if errDelete := m.session.Query(templateDeleteNamespaceQuery, request.ID).WithContext(ctx).Exec(); errDelete != nil {
m.logger.Warn("Unable to delete orphan namespace record. Error", tag.Error(errDelete))
}

if id, ok := previous["Id"].([]byte); ok {
msg := fmt.Sprintf("Namespace already exists. NamespaceId: %v", primitives.UUIDString(id))
// if conditional failure is due to a duplicate name in namespaces table
if name, ok := previous["name"]; ok && name == request.Name {
existingID := request.ID
if id, ok := previous["id"]; ok && gocql.UUIDToString(id) != request.ID {
existingID = gocql.UUIDToString(id)
// Delete orphan namespace record before returning back to user
deleteOrphanNamespace()
}

msg := fmt.Sprintf("Namespace already exists. NamespaceId: %v", existingID)
return nil, serviceerror.NewNamespaceAlreadyExists(msg)

}

return nil, serviceerror.NewNamespaceAlreadyExists("CreateNamespace operation failed because of conditional failure.")
// If namespace does not exist already and applied is false,
// notification_version does not match our expectations and it's conditional failure.
// Delete orphan namespace record before returning back to user
deleteOrphanNamespace()
return nil, serviceerror.NewUnavailable("CreateNamespace operation failed because of conditional failure.")
}

return &p.CreateNamespaceResponse{ID: request.ID}, nil
Expand Down

0 comments on commit 3cbab42

Please sign in to comment.