Skip to content

Commit

Permalink
Persistence context part 8: cassandra store (#2712)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Apr 13, 2022
1 parent 2e55dd3 commit 1686461
Show file tree
Hide file tree
Showing 9 changed files with 334 additions and 279 deletions.
37 changes: 23 additions & 14 deletions common/persistence/cassandra/cluster_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ func NewClusterMetadataStore(
}

func (m *ClusterMetadataStore) ListClusterMetadata(
_ context.Context,
ctx context.Context,
request *p.InternalListClusterMetadataRequest,
) (*p.InternalListClusterMetadataResponse, error) {
query := m.session.Query(templateListClusterMetadata, constMetadataPartition)
query := m.session.Query(templateListClusterMetadata, constMetadataPartition).WithContext(ctx)
iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter()

response := &p.InternalListClusterMetadataResponse{}
Expand Down Expand Up @@ -124,15 +124,15 @@ func (m *ClusterMetadataStore) ListClusterMetadata(
}

func (m *ClusterMetadataStore) GetClusterMetadata(
_ context.Context,
ctx context.Context,
request *p.InternalGetClusterMetadataRequest,
) (*p.InternalGetClusterMetadataResponse, error) {

var clusterMetadata []byte
var encoding string
var version int64

query := m.session.Query(templateGetClusterMetadata, constMetadataPartition, request.ClusterName)
query := m.session.Query(templateGetClusterMetadata, constMetadataPartition, request.ClusterName).WithContext(ctx)
err := query.Scan(&clusterMetadata, &encoding, &version)
if err != nil {
return nil, gocql.ConvertError("GetClusterMetadata", err)
Expand All @@ -145,7 +145,7 @@ func (m *ClusterMetadataStore) GetClusterMetadata(
}

func (m *ClusterMetadataStore) SaveClusterMetadata(
_ context.Context,
ctx context.Context,
request *p.InternalSaveClusterMetadataRequest,
) (bool, error) {
var query gocql.Query
Expand All @@ -157,7 +157,7 @@ func (m *ClusterMetadataStore) SaveClusterMetadata(
request.ClusterMetadata.Data,
request.ClusterMetadata.EncodingType.String(),
1,
)
).WithContext(ctx)
} else {
query = m.session.Query(
templateUpdateClusterMetadata,
Expand All @@ -167,7 +167,7 @@ func (m *ClusterMetadataStore) SaveClusterMetadata(
constMetadataPartition,
request.ClusterName,
request.Version,
)
).WithContext(ctx)
}

previous := make(map[string]interface{})
Expand All @@ -182,18 +182,18 @@ func (m *ClusterMetadataStore) SaveClusterMetadata(
}

func (m *ClusterMetadataStore) DeleteClusterMetadata(
_ context.Context,
ctx context.Context,
request *p.InternalDeleteClusterMetadataRequest,
) error {
query := m.session.Query(templateDeleteClusterMetadata, constMetadataPartition, request.ClusterName)
query := m.session.Query(templateDeleteClusterMetadata, constMetadataPartition, request.ClusterName).WithContext(ctx)
if err := query.Exec(); err != nil {
return gocql.ConvertError("DeleteClusterMetadata", err)
}
return nil
}

func (m *ClusterMetadataStore) GetClusterMembers(
_ context.Context,
ctx context.Context,
request *p.GetClusterMembersRequest,
) (*p.GetClusterMembersResponse, error) {
var queryString strings.Builder
Expand Down Expand Up @@ -228,7 +228,7 @@ func (m *ClusterMetadataStore) GetClusterMembers(
}

queryString.WriteString(templateAllowFiltering)
query := m.session.Query(queryString.String(), operands...)
query := m.session.Query(queryString.String(), operands...).WithContext(ctx)

iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter()

Expand Down Expand Up @@ -270,11 +270,20 @@ func (m *ClusterMetadataStore) GetClusterMembers(
}

func (m *ClusterMetadataStore) UpsertClusterMembership(
_ context.Context,
ctx context.Context,
request *p.UpsertClusterMembershipRequest,
) error {
query := m.session.Query(templateUpsertActiveClusterMembership, constMembershipPartition, []byte(request.HostID),
request.RPCAddress, request.RPCPort, request.Role, request.SessionStart, time.Now().UTC(), int64(request.RecordExpiry.Seconds()))
query := m.session.Query(
templateUpsertActiveClusterMembership,
constMembershipPartition,
[]byte(request.HostID),
request.RPCAddress,
request.RPCPort,
request.Role,
request.SessionStart,
time.Now().UTC(),
int64(request.RecordExpiry.Seconds()),
).WithContext(ctx)
err := query.Exec()

if err != nil {
Expand Down
45 changes: 15 additions & 30 deletions common/persistence/cassandra/history_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,10 @@ package cassandra
import (
"context"
"fmt"
"sort"

commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/serviceerror"

persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/log"
p "go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/nosql/nosqlplugin/cassandra/gocql"
Expand Down Expand Up @@ -90,7 +88,7 @@ func NewHistoryStore(
// AppendHistoryNodes upsert a batch of events as a single node to a history branch
// Note that it's not allowed to append above the branch's ancestors' nodes, which means nodeID >= ForkNodeID
func (h *HistoryStore) AppendHistoryNodes(
_ context.Context,
ctx context.Context,
request *p.InternalAppendHistoryNodesRequest,
) error {
branchInfo := request.BranchInfo
Expand All @@ -105,15 +103,15 @@ func (h *HistoryStore) AppendHistoryNodes(
node.TransactionID,
node.Events.Data,
node.Events.EncodingType.String(),
)
).WithContext(ctx)
if err := query.Exec(); err != nil {
return gocql.ConvertError("AppendHistoryNodes", err)
}
return nil
}

treeInfoDataBlob := request.TreeInfo
batch := h.Session.NewBatch(gocql.LoggedBatch)
batch := h.Session.NewBatch(gocql.LoggedBatch).WithContext(ctx)
batch.Query(v2templateInsertTree,
branchInfo.TreeId,
branchInfo.BranchId,
Expand All @@ -137,7 +135,7 @@ func (h *HistoryStore) AppendHistoryNodes(

// DeleteHistoryNodes delete a history node
func (h *HistoryStore) DeleteHistoryNodes(
_ context.Context,
ctx context.Context,
request *p.InternalDeleteHistoryNodesRequest,
) error {
branchInfo := request.BranchInfo
Expand All @@ -157,7 +155,7 @@ func (h *HistoryStore) DeleteHistoryNodes(
branchID,
nodeID,
txnID,
)
).WithContext(ctx)
if err := query.Exec(); err != nil {
return gocql.ConvertError("DeleteHistoryNodes", err)
}
Expand All @@ -167,7 +165,7 @@ func (h *HistoryStore) DeleteHistoryNodes(
// ReadHistoryBranch returns history node data for a branch
// NOTE: For branch that has ancestors, we need to query Cassandra multiple times, because it doesn't support OR/UNION operator
func (h *HistoryStore) ReadHistoryBranch(
_ context.Context,
ctx context.Context,
request *p.InternalReadHistoryBranchRequest,
) (*p.InternalReadHistoryBranchResponse, error) {
treeID, err := primitives.ValidateUUID(request.TreeID)
Expand All @@ -189,7 +187,7 @@ func (h *HistoryStore) ReadHistoryBranch(
queryString = v2templateReadHistoryNode
}

query := h.Session.Query(queryString, treeID, branchID, request.MinNodeID, request.MaxNodeID)
query := h.Session.Query(queryString, treeID, branchID, request.MinNodeID, request.MaxNodeID).WithContext(ctx)

iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter()
var pagingToken []byte
Expand Down Expand Up @@ -259,7 +257,7 @@ func (h *HistoryStore) ReadHistoryBranch(
// 8[8,9]
//
func (h *HistoryStore) ForkHistoryBranch(
_ context.Context,
ctx context.Context,
request *p.InternalForkHistoryBranchRequest,
) error {

Expand All @@ -275,7 +273,7 @@ func (h *HistoryStore) ForkHistoryBranch(
if err != nil {
return serviceerror.NewInternal(fmt.Sprintf("ForkHistoryBranch - Gocql NewBranchID UUID cast failed. Error: %v", err))
}
query := h.Session.Query(v2templateInsertTree, cqlTreeID, cqlNewBranchID, datablob.Data, datablob.EncodingType.String())
query := h.Session.Query(v2templateInsertTree, cqlTreeID, cqlNewBranchID, datablob.Data, datablob.EncodingType.String()).WithContext(ctx)
err = query.Exec()
if err != nil {
return gocql.ConvertError("ForkHistoryBranch", err)
Expand All @@ -286,10 +284,10 @@ func (h *HistoryStore) ForkHistoryBranch(

// DeleteHistoryBranch removes a branch
func (h *HistoryStore) DeleteHistoryBranch(
_ context.Context,
ctx context.Context,
request *p.InternalDeleteHistoryBranchRequest,
) error {
batch := h.Session.NewBatch(gocql.LoggedBatch)
batch := h.Session.NewBatch(gocql.LoggedBatch).WithContext(ctx)
batch.Query(v2templateDeleteBranch, request.TreeId, request.BranchId)

// delete each branch range
Expand Down Expand Up @@ -318,11 +316,11 @@ func (h *HistoryStore) deleteBranchRangeNodes(
}

func (h *HistoryStore) GetAllHistoryTreeBranches(
_ context.Context,
ctx context.Context,
request *p.GetAllHistoryTreeBranchesRequest,
) (*p.InternalGetAllHistoryTreeBranchesResponse, error) {

query := h.Session.Query(v2templateScanAllTreeBranches)
query := h.Session.Query(v2templateScanAllTreeBranches).WithContext(ctx)

iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter()

Expand Down Expand Up @@ -366,15 +364,15 @@ func (h *HistoryStore) GetAllHistoryTreeBranches(

// GetHistoryTree returns all branch information of a tree
func (h *HistoryStore) GetHistoryTree(
_ context.Context,
ctx context.Context,
request *p.GetHistoryTreeRequest,
) (*p.InternalGetHistoryTreeResponse, error) {

treeID, err := primitives.ValidateUUID(request.TreeID)
if err != nil {
return nil, serviceerror.NewInternal(fmt.Sprintf("ReadHistoryBranch. Gocql TreeId UUID cast failed. Error: %v", err))
}
query := h.Session.Query(v2templateReadAllBranches, treeID)
query := h.Session.Query(v2templateReadAllBranches, treeID).WithContext(ctx)

pageSize := 100
var pagingToken []byte
Expand Down Expand Up @@ -410,19 +408,6 @@ func (h *HistoryStore) GetHistoryTree(
}, nil
}

func (h *HistoryStore) sortAncestors(
ans []*persistencespb.HistoryBranchRange,
) {
if len(ans) > 0 {
// sort ans based onf EndNodeID so that we can set BeginNodeID
sort.Slice(ans, func(i, j int) bool { return (ans)[i].GetEndNodeId() < (ans)[j].GetEndNodeId() })
(ans)[0].BeginNodeId = int64(1)
for i := 1; i < len(ans); i++ {
(ans)[i].BeginNodeId = (ans)[i-1].GetEndNodeId()
}
}
}

func convertHistoryNode(
message map[string]interface{},
) p.InternalHistoryNode {
Expand Down

0 comments on commit 1686461

Please sign in to comment.