Skip to content

Commit

Permalink
Persistence Context Part 4: Visibility Manager (#2639)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Mar 24, 2022
1 parent 90b7494 commit d663e2c
Show file tree
Hide file tree
Showing 12 changed files with 457 additions and 244 deletions.
29 changes: 15 additions & 14 deletions common/persistence/visibility/manager/visibility_manager.go
Expand Up @@ -28,6 +28,7 @@ package manager
//go:generate mockgen -copyright_file ../../../../LICENSE -package $GOPACKAGE -source $GOFILE -destination visibility_manager_mock.go -aux_files go.temporal.io/server/common/persistence=../../dataInterfaces.go

import (
"context"
"time"

commonpb "go.temporal.io/api/common/v1"
Expand All @@ -45,22 +46,22 @@ type (
GetName() string

// Write APIs.
RecordWorkflowExecutionStarted(request *RecordWorkflowExecutionStartedRequest) error
RecordWorkflowExecutionClosed(request *RecordWorkflowExecutionClosedRequest) error
UpsertWorkflowExecution(request *UpsertWorkflowExecutionRequest) error
DeleteWorkflowExecution(request *VisibilityDeleteWorkflowExecutionRequest) error
RecordWorkflowExecutionStarted(ctx context.Context, request *RecordWorkflowExecutionStartedRequest) error
RecordWorkflowExecutionClosed(ctx context.Context, request *RecordWorkflowExecutionClosedRequest) error
UpsertWorkflowExecution(ctx context.Context, request *UpsertWorkflowExecutionRequest) error
DeleteWorkflowExecution(ctx context.Context, request *VisibilityDeleteWorkflowExecutionRequest) error

// Read APIs.
ListOpenWorkflowExecutions(request *ListWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error)
ListClosedWorkflowExecutions(request *ListWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error)
ListOpenWorkflowExecutionsByType(request *ListWorkflowExecutionsByTypeRequest) (*ListWorkflowExecutionsResponse, error)
ListClosedWorkflowExecutionsByType(request *ListWorkflowExecutionsByTypeRequest) (*ListWorkflowExecutionsResponse, error)
ListOpenWorkflowExecutionsByWorkflowID(request *ListWorkflowExecutionsByWorkflowIDRequest) (*ListWorkflowExecutionsResponse, error)
ListClosedWorkflowExecutionsByWorkflowID(request *ListWorkflowExecutionsByWorkflowIDRequest) (*ListWorkflowExecutionsResponse, error)
ListClosedWorkflowExecutionsByStatus(request *ListClosedWorkflowExecutionsByStatusRequest) (*ListWorkflowExecutionsResponse, error)
ListWorkflowExecutions(request *ListWorkflowExecutionsRequestV2) (*ListWorkflowExecutionsResponse, error)
ScanWorkflowExecutions(request *ListWorkflowExecutionsRequestV2) (*ListWorkflowExecutionsResponse, error)
CountWorkflowExecutions(request *CountWorkflowExecutionsRequest) (*CountWorkflowExecutionsResponse, error)
ListOpenWorkflowExecutions(ctx context.Context, request *ListWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error)
ListClosedWorkflowExecutions(ctx context.Context, request *ListWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error)
ListOpenWorkflowExecutionsByType(ctx context.Context, request *ListWorkflowExecutionsByTypeRequest) (*ListWorkflowExecutionsResponse, error)
ListClosedWorkflowExecutionsByType(ctx context.Context, request *ListWorkflowExecutionsByTypeRequest) (*ListWorkflowExecutionsResponse, error)
ListOpenWorkflowExecutionsByWorkflowID(ctx context.Context, request *ListWorkflowExecutionsByWorkflowIDRequest) (*ListWorkflowExecutionsResponse, error)
ListClosedWorkflowExecutionsByWorkflowID(ctx context.Context, request *ListWorkflowExecutionsByWorkflowIDRequest) (*ListWorkflowExecutionsResponse, error)
ListClosedWorkflowExecutionsByStatus(ctx context.Context, request *ListClosedWorkflowExecutionsByStatusRequest) (*ListWorkflowExecutionsResponse, error)
ListWorkflowExecutions(ctx context.Context, request *ListWorkflowExecutionsRequestV2) (*ListWorkflowExecutionsResponse, error)
ScanWorkflowExecutions(ctx context.Context, request *ListWorkflowExecutionsRequestV2) (*ListWorkflowExecutionsResponse, error)
CountWorkflowExecutions(ctx context.Context, request *CountWorkflowExecutionsRequest) (*CountWorkflowExecutionsResponse, error)
}

VisibilityRequestBase struct {
Expand Down
113 changes: 57 additions & 56 deletions common/persistence/visibility/manager/visibility_manager_mock.go

Large diffs are not rendered by default.

Large diffs are not rendered by default.

100 changes: 72 additions & 28 deletions common/persistence/visibility/visibility_manager_dual.go
Expand Up @@ -25,6 +25,8 @@
package visibility

import (
"context"

"go.temporal.io/server/common/persistence/visibility/manager"
)

Expand Down Expand Up @@ -61,98 +63,140 @@ func (v *visibilityManagerDual) GetName() string {
return "VisibilityManagerDual"
}

func (v *visibilityManagerDual) RecordWorkflowExecutionStarted(request *manager.RecordWorkflowExecutionStartedRequest) error {
func (v *visibilityManagerDual) RecordWorkflowExecutionStarted(
ctx context.Context,
request *manager.RecordWorkflowExecutionStartedRequest,
) error {
ms, err := v.managerSelector.writeManagers()
if err != nil {
return err
}
for _, m := range ms {
err = m.RecordWorkflowExecutionStarted(request)
err = m.RecordWorkflowExecutionStarted(ctx, request)
if err != nil {
return err
}
}
return nil
}

func (v *visibilityManagerDual) RecordWorkflowExecutionClosed(request *manager.RecordWorkflowExecutionClosedRequest) error {
func (v *visibilityManagerDual) RecordWorkflowExecutionClosed(
ctx context.Context,
request *manager.RecordWorkflowExecutionClosedRequest,
) error {
ms, err := v.managerSelector.writeManagers()
if err != nil {
return err
}
for _, m := range ms {
err = m.RecordWorkflowExecutionClosed(request)
err = m.RecordWorkflowExecutionClosed(ctx, request)
if err != nil {
return err
}
}
return nil
}

func (v *visibilityManagerDual) UpsertWorkflowExecution(request *manager.UpsertWorkflowExecutionRequest) error {
func (v *visibilityManagerDual) UpsertWorkflowExecution(
ctx context.Context,
request *manager.UpsertWorkflowExecutionRequest,
) error {
ms, err := v.managerSelector.writeManagers()
if err != nil {
return err
}
for _, m := range ms {
err = m.UpsertWorkflowExecution(request)
err = m.UpsertWorkflowExecution(ctx, request)
if err != nil {
return err
}
}
return nil
}

func (v *visibilityManagerDual) DeleteWorkflowExecution(request *manager.VisibilityDeleteWorkflowExecutionRequest) error {
func (v *visibilityManagerDual) DeleteWorkflowExecution(
ctx context.Context,
request *manager.VisibilityDeleteWorkflowExecutionRequest,
) error {
ms, err := v.managerSelector.writeManagers()
if err != nil {
return err
}
for _, m := range ms {
err = m.DeleteWorkflowExecution(request)
err = m.DeleteWorkflowExecution(ctx, request)
if err != nil {
return err
}
}
return nil
}

func (v *visibilityManagerDual) ListOpenWorkflowExecutions(request *manager.ListWorkflowExecutionsRequest) (*manager.ListWorkflowExecutionsResponse, error) {
return v.managerSelector.readManager(request.Namespace).ListOpenWorkflowExecutions(request)
func (v *visibilityManagerDual) ListOpenWorkflowExecutions(
ctx context.Context,
request *manager.ListWorkflowExecutionsRequest,
) (*manager.ListWorkflowExecutionsResponse, error) {
return v.managerSelector.readManager(request.Namespace).ListOpenWorkflowExecutions(ctx, request)
}

func (v *visibilityManagerDual) ListClosedWorkflowExecutions(request *manager.ListWorkflowExecutionsRequest) (*manager.ListWorkflowExecutionsResponse, error) {
return v.managerSelector.readManager(request.Namespace).ListClosedWorkflowExecutions(request)
func (v *visibilityManagerDual) ListClosedWorkflowExecutions(
ctx context.Context,
request *manager.ListWorkflowExecutionsRequest,
) (*manager.ListWorkflowExecutionsResponse, error) {
return v.managerSelector.readManager(request.Namespace).ListClosedWorkflowExecutions(ctx, request)
}

func (v *visibilityManagerDual) ListOpenWorkflowExecutionsByType(request *manager.ListWorkflowExecutionsByTypeRequest) (*manager.ListWorkflowExecutionsResponse, error) {
return v.managerSelector.readManager(request.Namespace).ListOpenWorkflowExecutionsByType(request)
func (v *visibilityManagerDual) ListOpenWorkflowExecutionsByType(
ctx context.Context,
request *manager.ListWorkflowExecutionsByTypeRequest,
) (*manager.ListWorkflowExecutionsResponse, error) {
return v.managerSelector.readManager(request.Namespace).ListOpenWorkflowExecutionsByType(ctx, request)
}

func (v *visibilityManagerDual) ListClosedWorkflowExecutionsByType(request *manager.ListWorkflowExecutionsByTypeRequest) (*manager.ListWorkflowExecutionsResponse, error) {
return v.managerSelector.readManager(request.Namespace).ListClosedWorkflowExecutionsByType(request)
func (v *visibilityManagerDual) ListClosedWorkflowExecutionsByType(
ctx context.Context,
request *manager.ListWorkflowExecutionsByTypeRequest,
) (*manager.ListWorkflowExecutionsResponse, error) {
return v.managerSelector.readManager(request.Namespace).ListClosedWorkflowExecutionsByType(ctx, request)
}

func (v *visibilityManagerDual) ListOpenWorkflowExecutionsByWorkflowID(request *manager.ListWorkflowExecutionsByWorkflowIDRequest) (*manager.ListWorkflowExecutionsResponse, error) {
return v.managerSelector.readManager(request.Namespace).ListOpenWorkflowExecutionsByWorkflowID(request)
func (v *visibilityManagerDual) ListOpenWorkflowExecutionsByWorkflowID(
ctx context.Context,
request *manager.ListWorkflowExecutionsByWorkflowIDRequest,
) (*manager.ListWorkflowExecutionsResponse, error) {
return v.managerSelector.readManager(request.Namespace).ListOpenWorkflowExecutionsByWorkflowID(ctx, request)
}

func (v *visibilityManagerDual) ListClosedWorkflowExecutionsByWorkflowID(request *manager.ListWorkflowExecutionsByWorkflowIDRequest) (*manager.ListWorkflowExecutionsResponse, error) {
return v.managerSelector.readManager(request.Namespace).ListClosedWorkflowExecutionsByWorkflowID(request)
func (v *visibilityManagerDual) ListClosedWorkflowExecutionsByWorkflowID(
ctx context.Context,
request *manager.ListWorkflowExecutionsByWorkflowIDRequest,
) (*manager.ListWorkflowExecutionsResponse, error) {
return v.managerSelector.readManager(request.Namespace).ListClosedWorkflowExecutionsByWorkflowID(ctx, request)
}

func (v *visibilityManagerDual) ListClosedWorkflowExecutionsByStatus(request *manager.ListClosedWorkflowExecutionsByStatusRequest) (*manager.ListWorkflowExecutionsResponse, error) {
return v.managerSelector.readManager(request.Namespace).ListClosedWorkflowExecutionsByStatus(request)
func (v *visibilityManagerDual) ListClosedWorkflowExecutionsByStatus(
ctx context.Context,
request *manager.ListClosedWorkflowExecutionsByStatusRequest,
) (*manager.ListWorkflowExecutionsResponse, error) {
return v.managerSelector.readManager(request.Namespace).ListClosedWorkflowExecutionsByStatus(ctx, request)
}

func (v *visibilityManagerDual) ListWorkflowExecutions(request *manager.ListWorkflowExecutionsRequestV2) (*manager.ListWorkflowExecutionsResponse, error) {
return v.managerSelector.readManager(request.Namespace).ListWorkflowExecutions(request)
func (v *visibilityManagerDual) ListWorkflowExecutions(
ctx context.Context,
request *manager.ListWorkflowExecutionsRequestV2,
) (*manager.ListWorkflowExecutionsResponse, error) {
return v.managerSelector.readManager(request.Namespace).ListWorkflowExecutions(ctx, request)
}

func (v *visibilityManagerDual) ScanWorkflowExecutions(request *manager.ListWorkflowExecutionsRequestV2) (*manager.ListWorkflowExecutionsResponse, error) {
return v.managerSelector.readManager(request.Namespace).ScanWorkflowExecutions(request)
func (v *visibilityManagerDual) ScanWorkflowExecutions(
ctx context.Context,
request *manager.ListWorkflowExecutionsRequestV2,
) (*manager.ListWorkflowExecutionsResponse, error) {
return v.managerSelector.readManager(request.Namespace).ScanWorkflowExecutions(ctx, request)
}

func (v *visibilityManagerDual) CountWorkflowExecutions(request *manager.CountWorkflowExecutionsRequest) (*manager.CountWorkflowExecutionsResponse, error) {
return v.managerSelector.readManager(request.Namespace).CountWorkflowExecutions(request)
func (v *visibilityManagerDual) CountWorkflowExecutions(
ctx context.Context,
request *manager.CountWorkflowExecutionsRequest,
) (*manager.CountWorkflowExecutionsResponse, error) {
return v.managerSelector.readManager(request.Namespace).CountWorkflowExecutions(ctx, request)
}
71 changes: 57 additions & 14 deletions common/persistence/visibility/visibility_manager_impl.go
Expand Up @@ -25,6 +25,7 @@
package visibility

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -75,7 +76,10 @@ func (p *visibilityManagerImpl) GetName() string {
return p.store.GetName()
}

func (p *visibilityManagerImpl) RecordWorkflowExecutionStarted(request *manager.RecordWorkflowExecutionStartedRequest) error {
func (p *visibilityManagerImpl) RecordWorkflowExecutionStarted(
_ context.Context,
request *manager.RecordWorkflowExecutionStartedRequest,
) error {
requestBase, err := p.newInternalVisibilityRequestBase(request.VisibilityRequestBase)
if err != nil {
return err
Expand All @@ -86,7 +90,10 @@ func (p *visibilityManagerImpl) RecordWorkflowExecutionStarted(request *manager.
return p.store.RecordWorkflowExecutionStarted(req)
}

func (p *visibilityManagerImpl) RecordWorkflowExecutionClosed(request *manager.RecordWorkflowExecutionClosedRequest) error {
func (p *visibilityManagerImpl) RecordWorkflowExecutionClosed(
_ context.Context,
request *manager.RecordWorkflowExecutionClosedRequest,
) error {
requestBase, err := p.newInternalVisibilityRequestBase(request.VisibilityRequestBase)
if err != nil {
return err
Expand All @@ -99,7 +106,10 @@ func (p *visibilityManagerImpl) RecordWorkflowExecutionClosed(request *manager.R
return p.store.RecordWorkflowExecutionClosed(req)
}

func (p *visibilityManagerImpl) UpsertWorkflowExecution(request *manager.UpsertWorkflowExecutionRequest) error {
func (p *visibilityManagerImpl) UpsertWorkflowExecution(
_ context.Context,
request *manager.UpsertWorkflowExecutionRequest,
) error {
requestBase, err := p.newInternalVisibilityRequestBase(request.VisibilityRequestBase)
if err != nil {
return err
Expand All @@ -110,19 +120,28 @@ func (p *visibilityManagerImpl) UpsertWorkflowExecution(request *manager.UpsertW
return p.store.UpsertWorkflowExecution(req)
}

func (p *visibilityManagerImpl) DeleteWorkflowExecution(request *manager.VisibilityDeleteWorkflowExecutionRequest) error {
func (p *visibilityManagerImpl) DeleteWorkflowExecution(
_ context.Context,
request *manager.VisibilityDeleteWorkflowExecutionRequest,
) error {
return p.store.DeleteWorkflowExecution(request)
}

func (p *visibilityManagerImpl) ListOpenWorkflowExecutions(request *manager.ListWorkflowExecutionsRequest) (*manager.ListWorkflowExecutionsResponse, error) {
func (p *visibilityManagerImpl) ListOpenWorkflowExecutions(
_ context.Context,
request *manager.ListWorkflowExecutionsRequest,
) (*manager.ListWorkflowExecutionsResponse, error) {
response, err := p.store.ListOpenWorkflowExecutions(request)
if err != nil {
return nil, err
}
return p.convertInternalListResponse(response)
}

func (p *visibilityManagerImpl) ListClosedWorkflowExecutions(request *manager.ListWorkflowExecutionsRequest) (*manager.ListWorkflowExecutionsResponse, error) {
func (p *visibilityManagerImpl) ListClosedWorkflowExecutions(
_ context.Context,
request *manager.ListWorkflowExecutionsRequest,
) (*manager.ListWorkflowExecutionsResponse, error) {
response, err := p.store.ListClosedWorkflowExecutions(request)
if err != nil {
return nil, err
Expand All @@ -131,7 +150,10 @@ func (p *visibilityManagerImpl) ListClosedWorkflowExecutions(request *manager.Li
return p.convertInternalListResponse(response)
}

func (p *visibilityManagerImpl) ListOpenWorkflowExecutionsByType(request *manager.ListWorkflowExecutionsByTypeRequest) (*manager.ListWorkflowExecutionsResponse, error) {
func (p *visibilityManagerImpl) ListOpenWorkflowExecutionsByType(
_ context.Context,
request *manager.ListWorkflowExecutionsByTypeRequest,
) (*manager.ListWorkflowExecutionsResponse, error) {
response, err := p.store.ListOpenWorkflowExecutionsByType(request)
if err != nil {
return nil, err
Expand All @@ -140,7 +162,10 @@ func (p *visibilityManagerImpl) ListOpenWorkflowExecutionsByType(request *manage
return p.convertInternalListResponse(response)
}

func (p *visibilityManagerImpl) ListClosedWorkflowExecutionsByType(request *manager.ListWorkflowExecutionsByTypeRequest) (*manager.ListWorkflowExecutionsResponse, error) {
func (p *visibilityManagerImpl) ListClosedWorkflowExecutionsByType(
_ context.Context,
request *manager.ListWorkflowExecutionsByTypeRequest,
) (*manager.ListWorkflowExecutionsResponse, error) {
response, err := p.store.ListClosedWorkflowExecutionsByType(request)
if err != nil {
return nil, err
Expand All @@ -149,7 +174,10 @@ func (p *visibilityManagerImpl) ListClosedWorkflowExecutionsByType(request *mana
return p.convertInternalListResponse(response)
}

func (p *visibilityManagerImpl) ListOpenWorkflowExecutionsByWorkflowID(request *manager.ListWorkflowExecutionsByWorkflowIDRequest) (*manager.ListWorkflowExecutionsResponse, error) {
func (p *visibilityManagerImpl) ListOpenWorkflowExecutionsByWorkflowID(
_ context.Context,
request *manager.ListWorkflowExecutionsByWorkflowIDRequest,
) (*manager.ListWorkflowExecutionsResponse, error) {
response, err := p.store.ListOpenWorkflowExecutionsByWorkflowID(request)
if err != nil {
return nil, err
Expand All @@ -158,7 +186,10 @@ func (p *visibilityManagerImpl) ListOpenWorkflowExecutionsByWorkflowID(request *
return p.convertInternalListResponse(response)
}

func (p *visibilityManagerImpl) ListClosedWorkflowExecutionsByWorkflowID(request *manager.ListWorkflowExecutionsByWorkflowIDRequest) (*manager.ListWorkflowExecutionsResponse, error) {
func (p *visibilityManagerImpl) ListClosedWorkflowExecutionsByWorkflowID(
_ context.Context,
request *manager.ListWorkflowExecutionsByWorkflowIDRequest,
) (*manager.ListWorkflowExecutionsResponse, error) {
response, err := p.store.ListClosedWorkflowExecutionsByWorkflowID(request)
if err != nil {
return nil, err
Expand All @@ -167,7 +198,10 @@ func (p *visibilityManagerImpl) ListClosedWorkflowExecutionsByWorkflowID(request
return p.convertInternalListResponse(response)
}

func (p *visibilityManagerImpl) ListClosedWorkflowExecutionsByStatus(request *manager.ListClosedWorkflowExecutionsByStatusRequest) (*manager.ListWorkflowExecutionsResponse, error) {
func (p *visibilityManagerImpl) ListClosedWorkflowExecutionsByStatus(
_ context.Context,
request *manager.ListClosedWorkflowExecutionsByStatusRequest,
) (*manager.ListWorkflowExecutionsResponse, error) {
response, err := p.store.ListClosedWorkflowExecutionsByStatus(request)
if err != nil {
return nil, err
Expand All @@ -176,7 +210,10 @@ func (p *visibilityManagerImpl) ListClosedWorkflowExecutionsByStatus(request *ma
return p.convertInternalListResponse(response)
}

func (p *visibilityManagerImpl) ListWorkflowExecutions(request *manager.ListWorkflowExecutionsRequestV2) (*manager.ListWorkflowExecutionsResponse, error) {
func (p *visibilityManagerImpl) ListWorkflowExecutions(
_ context.Context,
request *manager.ListWorkflowExecutionsRequestV2,
) (*manager.ListWorkflowExecutionsResponse, error) {
response, err := p.store.ListWorkflowExecutions(request)
if err != nil {
return nil, err
Expand All @@ -185,7 +222,10 @@ func (p *visibilityManagerImpl) ListWorkflowExecutions(request *manager.ListWork
return p.convertInternalListResponse(response)
}

func (p *visibilityManagerImpl) ScanWorkflowExecutions(request *manager.ListWorkflowExecutionsRequestV2) (*manager.ListWorkflowExecutionsResponse, error) {
func (p *visibilityManagerImpl) ScanWorkflowExecutions(
_ context.Context,
request *manager.ListWorkflowExecutionsRequestV2,
) (*manager.ListWorkflowExecutionsResponse, error) {
response, err := p.store.ScanWorkflowExecutions(request)
if err != nil {
return nil, err
Expand All @@ -194,7 +234,10 @@ func (p *visibilityManagerImpl) ScanWorkflowExecutions(request *manager.ListWork
return p.convertInternalListResponse(response)
}

func (p *visibilityManagerImpl) CountWorkflowExecutions(request *manager.CountWorkflowExecutionsRequest) (*manager.CountWorkflowExecutionsResponse, error) {
func (p *visibilityManagerImpl) CountWorkflowExecutions(
_ context.Context,
request *manager.CountWorkflowExecutionsRequest,
) (*manager.CountWorkflowExecutionsResponse, error) {
response, err := p.store.CountWorkflowExecutions(request)
if err != nil {
return nil, err
Expand Down

0 comments on commit d663e2c

Please sign in to comment.