Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify cassandra error conversion #1345

Merged
merged 2 commits into from Mar 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions common/persistence/cassandra/cassandraClusterMetadata.go
Expand Up @@ -98,7 +98,7 @@ func (m *cassandraClusterMetadata) GetClusterMetadata() (*p.InternalGetClusterMe
var version int64
err := query.Scan(&clusterMetadata, &encoding, &version)
if err != nil {
return nil, convertCommonErrors("GetClusterMetadata", err)
return nil, gocql.ConvertError("GetClusterMetadata", err)
}
return &p.InternalGetClusterMetadataResponse{
ClusterMetadata: p.NewDataBlob(clusterMetadata, encoding),
Expand Down Expand Up @@ -130,7 +130,7 @@ func (m *cassandraClusterMetadata) SaveClusterMetadata(request *p.InternalSaveCl
previous := make(map[string]interface{})
applied, err := query.MapScanCAS(previous)
if err != nil {
return false, convertCommonErrors("SaveClusterMetadata", err)
return false, gocql.ConvertError("SaveClusterMetadata", err)
}
if !applied {
return false, serviceerror.NewInternal("SaveClusterMetadata operation encountered concurrent write.")
Expand Down Expand Up @@ -212,7 +212,7 @@ func (m *cassandraClusterMetadata) GetClusterMembers(request *p.GetClusterMember
}

if err := iter.Close(); err != nil {
return nil, convertCommonErrors("GetClusterMembers", err)
return nil, gocql.ConvertError("GetClusterMembers", err)
}

return &p.GetClusterMembersResponse{ActiveMembers: clusterMembers, NextPageToken: pagingTokenCopy}, nil
Expand All @@ -224,7 +224,7 @@ func (m *cassandraClusterMetadata) UpsertClusterMembership(request *p.UpsertClus
err := query.Exec()

if err != nil {
return convertCommonErrors("UpsertClusterMembership", err)
return gocql.ConvertError("UpsertClusterMembership", err)
}

return nil
Expand Down
19 changes: 0 additions & 19 deletions common/persistence/cassandra/cassandraCommon.go
Expand Up @@ -26,11 +26,6 @@ package cassandra

import (
"fmt"

"go.temporal.io/api/serviceerror"

"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/nosql/nosqlplugin/cassandra/gocql"
)

type (
Expand Down Expand Up @@ -69,17 +64,3 @@ func newPersistedTypeMismatchError(
Msg: fmt.Sprintf("Field '%s' is of type '%T' but expected type '%T' in payload - '%v'",
fieldName, received, expectedType, payload)}
}

func convertCommonErrors(
operation string,
err error,
) error {
if gocql.IsNotFoundError(err) {
return serviceerror.NewNotFound(fmt.Sprintf("%v failed. Error: %v ", operation, err))
} else if gocql.IsTimeoutError(err) {
return &persistence.TimeoutError{Msg: fmt.Sprintf("%v timed out. Error: %v", operation, err)}
} else if gocql.IsThrottlingError(err) {
return serviceerror.NewResourceExhausted(fmt.Sprintf("%v operation failed. Error: %v", operation, err))
}
return serviceerror.NewInternal(fmt.Sprintf("%v operation failed. Error: %v", operation, err))
}
16 changes: 8 additions & 8 deletions common/persistence/cassandra/cassandraHistoryPersistence.go
Expand Up @@ -110,7 +110,7 @@ func (h *cassandraHistoryV2Persistence) AppendHistoryNodes(
query := h.session.Query(v2templateUpsertData,
branchInfo.TreeId, branchInfo.BranchId, request.NodeID, request.TransactionID, request.Events.Data, request.Events.EncodingType.String())
if err := query.Exec(); err != nil {
return convertCommonErrors("AppendHistoryNodes", err)
return gocql.ConvertError("AppendHistoryNodes", err)
}
return nil
}
Expand All @@ -124,7 +124,7 @@ func (h *cassandraHistoryV2Persistence) AppendHistoryNodes(
Info: request.Info,
})
if err != nil {
return convertCommonErrors("AppendHistoryNodes", err)
return gocql.ConvertError("AppendHistoryNodes", err)
}

batch := h.session.NewBatch(gocql.LoggedBatch)
Expand All @@ -133,7 +133,7 @@ func (h *cassandraHistoryV2Persistence) AppendHistoryNodes(
batch.Query(v2templateUpsertData,
branchInfo.TreeId, branchInfo.BranchId, request.NodeID, request.TransactionID, request.Events.Data, request.Events.EncodingType.String())
if err = h.session.ExecuteBatch(batch); err != nil {
return convertCommonErrors("AppendHistoryNodes", err)
return gocql.ConvertError("AppendHistoryNodes", err)
}
return nil
}
Expand Down Expand Up @@ -318,7 +318,7 @@ func (h *cassandraHistoryV2Persistence) ForkHistoryBranch(
query := h.session.Query(v2templateInsertTree, cqlTreeID, cqlNewBranchID, datablob.Data, datablob.EncodingType.String())
err = query.Exec()
if err != nil {
return nil, convertCommonErrors("ForkHistoryBranch", err)
return nil, gocql.ConvertError("ForkHistoryBranch", err)
}

return &p.InternalForkHistoryBranchResponse{
Expand Down Expand Up @@ -378,7 +378,7 @@ func (h *cassandraHistoryV2Persistence) DeleteHistoryBranch(

err = h.session.ExecuteBatch(batch)
if err != nil {
return convertCommonErrors("DeleteHistoryBranch", err)
return gocql.ConvertError("DeleteHistoryBranch", err)
}
return nil
}
Expand Down Expand Up @@ -417,7 +417,7 @@ func (h *cassandraHistoryV2Persistence) GetAllHistoryTreeBranches(
for iter.Scan(&treeUUID, &branchUUID, &data, &encoding) {
hti, err := serialization.HistoryTreeInfoFromBlob(data, encoding)
if err != nil {
return nil, convertCommonErrors("GetAllHistoryTreeBranches", err)
return nil, gocql.ConvertError("GetAllHistoryTreeBranches", err)
}

branchDetail := p.HistoryBranchDetail{
Expand Down Expand Up @@ -475,7 +475,7 @@ func (h *cassandraHistoryV2Persistence) GetHistoryTree(
for iter.Scan(&branchUUID, &data, &encoding) {
br, err := serialization.HistoryTreeInfoFromBlob(data, encoding)
if err != nil {
return nil, convertCommonErrors("GetHistoryTree", err)
return nil, gocql.ConvertError("GetHistoryTree", err)
}

branches = append(branches, br.BranchInfo)
Expand All @@ -486,7 +486,7 @@ func (h *cassandraHistoryV2Persistence) GetHistoryTree(
}

if err := iter.Close(); err != nil {
return nil, convertCommonErrors("GetHistoryTree", err)
return nil, gocql.ConvertError("GetHistoryTree", err)
}

if len(pagingToken) == 0 {
Expand Down