Skip to content

Commit

Permalink
Merge branch 'master' into snowden/stacktraces
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Mar 29, 2023
2 parents 6ffcfe9 + fca3662 commit 83ab5ea
Show file tree
Hide file tree
Showing 83 changed files with 4,783 additions and 1,698 deletions.
536 changes: 228 additions & 308 deletions api/adminservice/v1/request_response.pb.go

Large diffs are not rendered by default.

1,021 changes: 538 additions & 483 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

902 changes: 487 additions & 415 deletions api/persistence/v1/executions.pb.go

Large diffs are not rendered by default.

778 changes: 633 additions & 145 deletions api/replication/v1/message.pb.go

Large diffs are not rendered by default.

349 changes: 325 additions & 24 deletions api/workflow/v1/message.pb.go

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions client/admin/client.go
Expand Up @@ -80,7 +80,6 @@ func (c *clientImpl) StreamWorkflowReplicationMessages(
ctx context.Context,
opts ...grpc.CallOption,
) (adminservice.AdminService_StreamWorkflowReplicationMessagesClient, error) {
ctx, cancel := c.createContext(ctx)
defer cancel()
// do not use createContext function, let caller manage stream API lifecycle
return c.client.StreamWorkflowReplicationMessages(ctx, opts...)
}
5 changes: 4 additions & 1 deletion client/history/client.go
Expand Up @@ -244,7 +244,10 @@ func (c *clientImpl) StreamWorkflowReplicationMessages(
if err != nil {
return nil, err
}
return client.StreamWorkflowReplicationMessages(ctx, opts...)
return client.StreamWorkflowReplicationMessages(
metadata.NewOutgoingContext(ctx, ctxMetadata),
opts...,
)
}

func (c *clientImpl) createContext(parent context.Context) (context.Context, context.CancelFunc) {
Expand Down
11 changes: 7 additions & 4 deletions common/archiver/filestore/queryParser.go
Expand Up @@ -82,15 +82,18 @@ func NewQueryParser() QueryParser {
}

func (p *queryParser) Parse(query string) (*parsedQuery, error) {
parsedQuery := &parsedQuery{
earliestCloseTime: time.Time{},
latestCloseTime: time.Now().UTC(),
}
if strings.TrimSpace(query) == "" {
return parsedQuery, nil
}
stmt, err := sqlparser.Parse(fmt.Sprintf(queryTemplate, query))
if err != nil {
return nil, err
}
whereExpr := stmt.(*sqlparser.Select).Where.Expr
parsedQuery := &parsedQuery{
earliestCloseTime: time.Time{},
latestCloseTime: time.Now().UTC(),
}
if err := p.convertWhereExpr(whereExpr, parsedQuery); err != nil {
return nil, err
}
Expand Down
68 changes: 68 additions & 0 deletions common/archiver/filestore/visibilityArchiver_test.go
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/stretchr/testify/suite"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
workflowpb "go.temporal.io/api/workflow/v1"

"go.temporal.io/server/common/searchattribute"
Expand Down Expand Up @@ -510,6 +511,73 @@ func (s *visibilityArchiverSuite) TestArchiveAndQuery() {
s.Equal(ei, executions[1])
}

func (s *visibilityArchiverSuite) TestQuery_EmptyQuery_InvalidNamespace() {
URI := s.testArchivalURI

visibilityArchiver := s.newTestVisibilityArchiver()
mockParser := NewMockQueryParser(s.controller)
mockParser.EXPECT().Parse(gomock.Any()).Return(&parsedQuery{
earliestCloseTime: time.Unix(0, 10),
latestCloseTime: time.Unix(0, 10001),
status: toWorkflowExecutionStatusPtr(enumspb.WORKFLOW_EXECUTION_STATUS_FAILED),
}, nil).AnyTimes()
visibilityArchiver.queryParser = mockParser
req := &archiver.QueryVisibilityRequest{
NamespaceID: "",
PageSize: 1,
NextPageToken: nil,
Query: "",
}
_, err := visibilityArchiver.Query(context.Background(), URI, req, searchattribute.TestNameTypeMap)

var svcErr *serviceerror.InvalidArgument

s.ErrorAs(err, &svcErr)
}

func (s *visibilityArchiverSuite) TestQuery_EmptyQuery_ZeroPageSize() {
visibilityArchiver := s.newTestVisibilityArchiver()

req := &archiver.QueryVisibilityRequest{
NamespaceID: testNamespaceID,
PageSize: 0,
NextPageToken: nil,
Query: "",
}
_, err := visibilityArchiver.Query(context.Background(), s.testArchivalURI, req, searchattribute.TestNameTypeMap)

var svcErr *serviceerror.InvalidArgument

s.ErrorAs(err, &svcErr)
}

func (s *visibilityArchiverSuite) TestQuery_EmptyQuery_Pagination() {
dir := testutils.MkdirTemp(s.T(), "", "TestQuery_EmptyQuery_Pagination")

visibilityArchiver := s.newTestVisibilityArchiver()
URI, err := archiver.NewURI("file://" + dir)
s.NoError(err)
for _, record := range s.visibilityRecords {
err := visibilityArchiver.Archive(context.Background(), URI, record)
s.NoError(err)
}

request := &archiver.QueryVisibilityRequest{
NamespaceID: testNamespaceID,
PageSize: 1,
Query: "",
}
var executions []*workflowpb.WorkflowExecutionInfo
for len(executions) == 0 || request.NextPageToken != nil {
response, err := visibilityArchiver.Query(context.Background(), URI, request, searchattribute.TestNameTypeMap)
s.NoError(err)
s.NotNil(response)
executions = append(executions, response.Executions...)
request.NextPageToken = response.NextPageToken
}
s.Len(executions, 4)
}

func (s *visibilityArchiverSuite) newTestVisibilityArchiver() *visibilityArchiver {
config := &config.FilestoreArchiver{
FileMode: testFileModeStr,
Expand Down
74 changes: 58 additions & 16 deletions common/archiver/gcloud/visibilityArchiver.go
Expand Up @@ -29,6 +29,7 @@ import (
"errors"
"fmt"
"path/filepath"
"strings"
"time"

"go.temporal.io/api/serviceerror"
Expand Down Expand Up @@ -170,6 +171,10 @@ func (v *visibilityArchiver) Query(
return nil, &serviceerror.InvalidArgument{Message: archiver.ErrInvalidQueryVisibilityRequest.Error()}
}

if strings.TrimSpace(request.Query) == "" {
return v.queryAll(ctx, URI, request, saTypeMap)
}

parsedQuery, err := v.queryParser.Parse(request.Query)
if err != nil {
return nil, &serviceerror.InvalidArgument{Message: err.Error()}
Expand All @@ -194,26 +199,51 @@ func (v *visibilityArchiver) Query(

func (v *visibilityArchiver) query(
ctx context.Context,
URI archiver.URI,
uri archiver.URI,
request *queryVisibilityRequest,
saTypeMap searchattribute.NameTypeMap,
) (*archiver.QueryVisibilityResponse, error) {

token := new(queryVisibilityToken)
if request.nextPageToken != nil {
var err error
token, err = deserializeQueryVisibilityToken(request.nextPageToken)
if err != nil {
return nil, &serviceerror.InvalidArgument{Message: archiver.ErrNextPageTokenCorrupted.Error()}
}
}

var prefix = constructVisibilityFilenamePrefix(request.namespaceID, indexKeyCloseTimeout)
prefix := constructVisibilityFilenamePrefix(request.namespaceID, indexKeyCloseTimeout)
if !request.parsedQuery.closeTime.IsZero() {
prefix = constructTimeBasedSearchKey(request.namespaceID, indexKeyCloseTimeout, request.parsedQuery.closeTime, *request.parsedQuery.searchPrecision)
prefix = constructTimeBasedSearchKey(
request.namespaceID,
indexKeyCloseTimeout,
request.parsedQuery.closeTime,
*request.parsedQuery.searchPrecision,
)
}

if !request.parsedQuery.startTime.IsZero() {
prefix = constructTimeBasedSearchKey(request.namespaceID, indexKeyStartTimeout, request.parsedQuery.startTime, *request.parsedQuery.searchPrecision)
prefix = constructTimeBasedSearchKey(
request.namespaceID,
indexKeyStartTimeout,
request.parsedQuery.startTime,
*request.parsedQuery.searchPrecision,
)
}

return v.queryPrefix(ctx, uri, request, saTypeMap, prefix)
}

func (v *visibilityArchiver) queryAll(
ctx context.Context,
URI archiver.URI,
request *archiver.QueryVisibilityRequest,
saTypeMap searchattribute.NameTypeMap,
) (*archiver.QueryVisibilityResponse, error) {

return v.queryPrefix(ctx, URI, &queryVisibilityRequest{
namespaceID: request.NamespaceID,
pageSize: request.PageSize,
nextPageToken: request.NextPageToken,
parsedQuery: &parsedQuery{},
}, saTypeMap, request.NamespaceID)
}

func (v *visibilityArchiver) queryPrefix(ctx context.Context, uri archiver.URI, request *queryVisibilityRequest, saTypeMap searchattribute.NameTypeMap, prefix string) (*archiver.QueryVisibilityResponse, error) {
token, err := v.parseToken(request.nextPageToken)
if err != nil {
return nil, err
}

filters := make([]connector.Precondition, 0)
Expand All @@ -229,14 +259,14 @@ func (v *visibilityArchiver) query(
filters = append(filters, newWorkflowIDPrecondition(hash(*request.parsedQuery.workflowType)))
}

filenames, completed, currentCursorPos, err := v.gcloudStorage.QueryWithFilters(ctx, URI, prefix, request.pageSize, token.Offset, filters)
filenames, completed, currentCursorPos, err := v.gcloudStorage.QueryWithFilters(ctx, uri, prefix, request.pageSize, token.Offset, filters)
if err != nil {
return nil, &serviceerror.InvalidArgument{Message: err.Error()}
}

response := &archiver.QueryVisibilityResponse{}
for _, file := range filenames {
encodedRecord, err := v.gcloudStorage.Get(ctx, URI, fmt.Sprintf("%s/%s", request.namespaceID, filepath.Base(file)))
encodedRecord, err := v.gcloudStorage.Get(ctx, uri, fmt.Sprintf("%s/%s", request.namespaceID, filepath.Base(file)))
if err != nil {
return nil, &serviceerror.InvalidArgument{Message: err.Error()}
}
Expand Down Expand Up @@ -267,6 +297,18 @@ func (v *visibilityArchiver) query(
return response, nil
}

func (v *visibilityArchiver) parseToken(nextPageToken []byte) (*queryVisibilityToken, error) {
token := new(queryVisibilityToken)
if nextPageToken != nil {
var err error
token, err = deserializeQueryVisibilityToken(nextPageToken)
if err != nil {
return nil, &serviceerror.InvalidArgument{Message: archiver.ErrNextPageTokenCorrupted.Error()}
}
}
return token, nil
}

// ValidateURI is used to define what a valid URI for an implementation is.
func (v *visibilityArchiver) ValidateURI(URI archiver.URI) (err error) {
ctx, cancel := context.WithTimeout(context.Background(), timeoutInSeconds*time.Second)
Expand Down

0 comments on commit 83ab5ea

Please sign in to comment.