Skip to content

Commit

Permalink
Persistence context Part 7: SQL store implementation (#2711)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Apr 13, 2022
1 parent 49a9ebd commit 2e55dd3
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 272 deletions.
31 changes: 9 additions & 22 deletions common/persistence/sql/cluster_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,9 @@ type sqlClusterMetadataManager struct {
var _ p.ClusterMetadataStore = (*sqlClusterMetadataManager)(nil)

func (s *sqlClusterMetadataManager) ListClusterMetadata(
_ context.Context,
ctx context.Context,
request *p.InternalListClusterMetadataRequest,
) (*p.InternalListClusterMetadataResponse, error) {
ctx, cancel := newExecutionContext()
defer cancel()
var clusterName string
if request.NextPageToken != nil {
err := gobDeserialize(request.NextPageToken, &clusterName)
Expand Down Expand Up @@ -90,11 +88,9 @@ func (s *sqlClusterMetadataManager) ListClusterMetadata(
}

func (s *sqlClusterMetadataManager) GetClusterMetadata(
_ context.Context,
ctx context.Context,
request *p.InternalGetClusterMetadataRequest,
) (*p.InternalGetClusterMetadataResponse, error) {
ctx, cancel := newExecutionContext()
defer cancel()
row, err := s.Db.GetClusterMetadata(ctx, &sqlplugin.ClusterMetadataFilter{ClusterName: request.ClusterName})

if err != nil {
Expand All @@ -108,11 +104,9 @@ func (s *sqlClusterMetadataManager) GetClusterMetadata(
}

func (s *sqlClusterMetadataManager) SaveClusterMetadata(
_ context.Context,
ctx context.Context,
request *p.InternalSaveClusterMetadataRequest,
) (bool, error) {
ctx, cancel := newExecutionContext()
defer cancel()
err := s.txExecute(ctx, "SaveClusterMetadata", func(tx sqlplugin.Tx) error {
oldClusterMetadata, err := tx.WriteLockGetClusterMetadata(
ctx,
Expand Down Expand Up @@ -148,11 +142,9 @@ func (s *sqlClusterMetadataManager) SaveClusterMetadata(
}

func (s *sqlClusterMetadataManager) DeleteClusterMetadata(
_ context.Context,
ctx context.Context,
request *p.InternalDeleteClusterMetadataRequest,
) error {
ctx, cancel := newExecutionContext()
defer cancel()
_, err := s.Db.DeleteClusterMetadata(ctx, &sqlplugin.ClusterMetadataFilter{ClusterName: request.ClusterName})

if err != nil {
Expand All @@ -162,11 +154,9 @@ func (s *sqlClusterMetadataManager) DeleteClusterMetadata(
}

func (s *sqlClusterMetadataManager) GetClusterMembers(
_ context.Context,
ctx context.Context,
request *p.GetClusterMembersRequest,
) (*p.GetClusterMembersResponse, error) {
ctx, cancel := newExecutionContext()
defer cancel()
var lastSeenHostId []byte
if len(request.NextPageToken) == 16 {
lastSeenHostId = request.NextPageToken
Expand Down Expand Up @@ -224,11 +214,9 @@ func (s *sqlClusterMetadataManager) GetClusterMembers(
}

func (s *sqlClusterMetadataManager) UpsertClusterMembership(
_ context.Context,
ctx context.Context,
request *p.UpsertClusterMembershipRequest,
) error {
ctx, cancel := newExecutionContext()
defer cancel()
now := time.Now().UTC()
recordExpiry := now.Add(request.RecordExpiry)
_, err := s.Db.UpsertClusterMembership(ctx, &sqlplugin.ClusterMembershipRow{
Expand All @@ -248,16 +236,15 @@ func (s *sqlClusterMetadataManager) UpsertClusterMembership(
}

func (s *sqlClusterMetadataManager) PruneClusterMembership(
_ context.Context,
ctx context.Context,
request *p.PruneClusterMembershipRequest,
) error {
ctx, cancel := newExecutionContext()
defer cancel()
_, err := s.Db.PruneClusterMembership(
ctx,
&sqlplugin.PruneClusterMembershipFilter{
PruneRecordsBefore: time.Now().UTC(),
})
},
)

if err != nil {
return convertCommonErrors("PruneClusterMembership", err)
Expand Down
35 changes: 8 additions & 27 deletions common/persistence/sql/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,9 @@ func (m *sqlExecutionStore) txExecuteShardLocked(
}

func (m *sqlExecutionStore) CreateWorkflowExecution(
_ context.Context,
ctx context.Context,
request *p.InternalCreateWorkflowExecutionRequest,
) (response *p.InternalCreateWorkflowExecutionResponse, err error) {
ctx, cancel := newExecutionContext()
defer cancel()

for _, req := range request.NewWorkflowNewEvents {
if err := m.AppendHistoryNodes(ctx, req); err != nil {
return nil, err
Expand Down Expand Up @@ -226,11 +223,9 @@ func (m *sqlExecutionStore) createWorkflowExecutionTx(
}

func (m *sqlExecutionStore) GetWorkflowExecution(
_ context.Context,
ctx context.Context,
request *p.GetWorkflowExecutionRequest,
) (*p.InternalGetWorkflowExecutionResponse, error) {
ctx, cancel := newExecutionContext()
defer cancel()
namespaceID := primitives.MustParseUUID(request.NamespaceID)
workflowID := request.WorkflowID
runID := primitives.MustParseUUID(request.RunID)
Expand Down Expand Up @@ -341,12 +336,9 @@ func (m *sqlExecutionStore) GetWorkflowExecution(
}

func (m *sqlExecutionStore) UpdateWorkflowExecution(
_ context.Context,
ctx context.Context,
request *p.InternalUpdateWorkflowExecutionRequest,
) error {
ctx, cancel := newExecutionContext()
defer cancel()

// first append history
for _, req := range request.UpdateWorkflowNewEvents {
if err := m.AppendHistoryNodes(ctx, req); err != nil {
Expand Down Expand Up @@ -463,12 +455,9 @@ func (m *sqlExecutionStore) updateWorkflowExecutionTx(
}

func (m *sqlExecutionStore) ConflictResolveWorkflowExecution(
_ context.Context,
ctx context.Context,
request *p.InternalConflictResolveWorkflowExecutionRequest,
) error {
ctx, cancel := newExecutionContext()
defer cancel()

// first append history
for _, req := range request.CurrentWorkflowEventsNewEvents {
if err := m.AppendHistoryNodes(ctx, req); err != nil {
Expand Down Expand Up @@ -606,11 +595,9 @@ func (m *sqlExecutionStore) conflictResolveWorkflowExecutionTx(
}

func (m *sqlExecutionStore) DeleteWorkflowExecution(
_ context.Context,
ctx context.Context,
request *p.DeleteWorkflowExecutionRequest,
) error {
ctx, cancel := newExecutionContext()
defer cancel()
namespaceID := primitives.MustParseUUID(request.NamespaceID)
runID := primitives.MustParseUUID(request.RunID)
_, err := m.Db.DeleteFromExecutions(ctx, sqlplugin.ExecutionsFilter{
Expand All @@ -627,11 +614,9 @@ func (m *sqlExecutionStore) DeleteWorkflowExecution(
// runID. The following code will delete the row from current_executions if and only if the runID is
// same as the one we are trying to delete here
func (m *sqlExecutionStore) DeleteCurrentWorkflowExecution(
_ context.Context,
ctx context.Context,
request *p.DeleteCurrentWorkflowExecutionRequest,
) error {
ctx, cancel := newExecutionContext()
defer cancel()
namespaceID := primitives.MustParseUUID(request.NamespaceID)
runID := primitives.MustParseUUID(request.RunID)
_, err := m.Db.DeleteFromCurrentExecutions(ctx, sqlplugin.CurrentExecutionsFilter{
Expand All @@ -644,11 +629,9 @@ func (m *sqlExecutionStore) DeleteCurrentWorkflowExecution(
}

func (m *sqlExecutionStore) GetCurrentExecution(
_ context.Context,
ctx context.Context,
request *p.GetCurrentExecutionRequest,
) (*p.InternalGetCurrentExecutionResponse, error) {
ctx, cancel := newExecutionContext()
defer cancel()
row, err := m.Db.SelectFromCurrentExecutions(ctx, sqlplugin.CurrentExecutionsFilter{
ShardID: request.ShardID,
NamespaceID: primitives.MustParseUUID(request.NamespaceID),
Expand All @@ -672,11 +655,9 @@ func (m *sqlExecutionStore) GetCurrentExecution(
}

func (m *sqlExecutionStore) SetWorkflowExecution(
_ context.Context,
ctx context.Context,
request *p.InternalSetWorkflowExecutionRequest,
) error {
ctx, cancel := newExecutionContext()
defer cancel()
return m.txExecuteShardLocked(ctx,
"SetWorkflowExecution",
request.ShardID,
Expand Down

0 comments on commit 2e55dd3

Please sign in to comment.