Skip to content

Commit

Permalink
Improve archival related code (#2268)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Jul 25, 2019
1 parent af42a50 commit 895cecc
Show file tree
Hide file tree
Showing 45 changed files with 803 additions and 523 deletions.
13 changes: 7 additions & 6 deletions cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,16 +178,17 @@ func (s *server) startService() common.Daemon {
params.PublicClient = workflowserviceclient.New(dispatcher.ClientConfig(common.FrontendServiceName))

configuredForHistoryArchival := params.ClusterMetadata.HistoryArchivalConfig().ClusterConfiguredForArchival()
configuredForVisibilityArchival := params.ClusterMetadata.VisibilityArchivalConfig().ClusterConfiguredForArchival()
historyArchiverProvider := s.cfg.Archival.History.ArchiverProvider
visibilityArchiverProvider := s.cfg.Archival.Visibility.ArchiverProvider
if (configuredForHistoryArchival && historyArchiverProvider == nil) || (!configuredForHistoryArchival && historyArchiverProvider != nil) {
historyArchiverProviderCfg := s.cfg.Archival.History.Provider
if (configuredForHistoryArchival && historyArchiverProviderCfg == nil) || (!configuredForHistoryArchival && historyArchiverProviderCfg != nil) {
log.Fatalf("invalid history archival config")
}
if (configuredForVisibilityArchival && visibilityArchiverProvider == nil) || (!configuredForVisibilityArchival && visibilityArchiverProvider != nil) {

configuredForVisibilityArchival := params.ClusterMetadata.VisibilityArchivalConfig().ClusterConfiguredForArchival()
visibilityArchiverProviderCfg := s.cfg.Archival.Visibility.Provider
if (configuredForVisibilityArchival && visibilityArchiverProviderCfg == nil) || (!configuredForVisibilityArchival && visibilityArchiverProviderCfg != nil) {
log.Fatalf("invalid visibility archival config")
}
params.ArchiverProvider = provider.NewArchiverProvider(historyArchiverProvider, visibilityArchiverProvider)
params.ArchiverProvider = provider.NewArchiverProvider(historyArchiverProviderCfg, visibilityArchiverProviderCfg)

params.PersistenceConfig.TransactionSizeLimit = dc.GetIntProperty(dynamicconfig.TransactionSizeLimit, common.DefaultTransactionSizeLimit)

Expand Down
93 changes: 93 additions & 0 deletions common/archiver/URI.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package archiver

import (
"errors"
"net/url"
)

type (
// URI identifies the archival resource to which records are written to and read from.
URI interface {
Scheme() string
Path() string
Hostname() string
Port() string
Username() string
Password() string
String() string
}

uri struct {
url *url.URL
}
)

// NewURI constructs a new archiver URI from string.
func NewURI(s string) (URI, error) {
url, err := url.ParseRequestURI(s)
if err != nil {
return nil, err
}
if url.Opaque != "" {
return nil, errors.New("URI should begin with scheme://")
}
return &uri{url: url}, nil
}

func (u *uri) Scheme() string {
return u.url.Scheme
}

func (u *uri) Path() string {
return u.url.Path
}

func (u *uri) Hostname() string {
return u.url.Hostname()
}

func (u *uri) Port() string {
return u.url.Port()
}

func (u *uri) Username() string {
if u.url.User == nil {
return ""
}
return u.url.User.Username()
}

func (u *uri) Password() string {
if u.url.User == nil {
return ""
}
password, exist := u.url.User.Password()
if !exist {
return ""
}
return password
}

func (u *uri) String() string {
return u.url.String()
}
126 changes: 126 additions & 0 deletions common/archiver/URI_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package archiver

import (
"testing"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)

type (
URISuite struct {
*require.Assertions
suite.Suite
}
)

func TestURISuite(t *testing.T) {
suite.Run(t, new(URISuite))
}

func (s *URISuite) SetupTest() {
s.Assertions = require.New(s.T())
}

func (s *URISuite) TestURI() {
testCases := []struct {
URIString string
valid bool
scheme string
path string
hostname string
port string
username string
password string
}{
{
URIString: "",
valid: false,
},
{
URIString: "some random string",
valid: false,
},
{
URIString: "mailto:a@b.com",
valid: false,
},
{
URIString: "test://",
valid: true,
scheme: "test",
},
{
URIString: "http://example.com/path",
valid: true,
scheme: "http",
hostname: "example.com",
path: "/path",
},
{
URIString: "http://example.com/path with space",
valid: true,
scheme: "http",
hostname: "example.com",
path: "/path with space",
},
{
URIString: "https://localhost:8080",
valid: true,
scheme: "https",
hostname: "localhost",
port: "8080",
},
{
URIString: "file:///absolute/path/to/dir",
valid: true,
scheme: "file",
path: "/absolute/path/to/dir",
},
{
URIString: "test://person:password@host/path",
valid: true,
scheme: "test",
hostname: "host",
path: "/path",
username: "person",
password: "password",
},
}

for _, tc := range testCases {
URI, err := NewURI(tc.URIString)

This comment has been minimized.

Copy link
@shreyassrivatsan

shreyassrivatsan Jul 25, 2019

Contributor

for the future, its even better to use subtests here.. that makes it easier to know what failed as the tests are run separately..

s.Run(<testname>, func (t *testing.T) {
    // test logic.
})
if !tc.valid {
s.Error(err)
continue
}

s.NoError(err)
s.Equal(tc.scheme, URI.Scheme())
s.Equal(tc.path, URI.Path())
s.Equal(tc.hostname, URI.Hostname())
s.Equal(tc.port, URI.Port())
s.Equal(tc.username, URI.Username())
s.Equal(tc.password, URI.Password())
}
}
38 changes: 21 additions & 17 deletions common/archiver/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,35 +22,39 @@ package archiver

import (
"errors"

"github.com/uber/cadence/.gen/go/shared"
)

const (
// ArchiveNonRetriableErrorMsg is the log message when the Archive() method encounters a non-retriable error
ArchiveNonRetriableErrorMsg = "Archive method encountered an non-retriable error."
// ArchiveTransientErrorMsg is the log message when the Archive() method encounters a transient error
ArchiveTransientErrorMsg = "Archive method encountered a transient error."

// ErrInvalidURI is the error reason for invalid URI
ErrInvalidURI = "URI is invalid"
// ErrInvalidArchiveRequest is the error reason for invalid archive request
ErrInvalidArchiveRequest = "archive request is invalid"
// ErrConstructHistoryIterator is the error reason for failing to construct history iterator
ErrConstructHistoryIterator = "failed to construct history iterator"
// ErrReadHistory is the error reason for failing to read history
ErrReadHistory = "failed to read history batches"
// ErrHistoryMutated is the error reason for mutated history
ErrHistoryMutated = "history was mutated"
// ErrReasonInvalidURI is the error reason for invalid URI
ErrReasonInvalidURI = "URI is invalid"
// ErrReasonInvalidArchiveRequest is the error reason for invalid archive request
ErrReasonInvalidArchiveRequest = "archive request is invalid"
// ErrReasonConstructHistoryIterator is the error reason for failing to construct history iterator
ErrReasonConstructHistoryIterator = "failed to construct history iterator"
// ErrReasonReadHistory is the error reason for failing to read history
ErrReasonReadHistory = "failed to read history batches"
// ErrReasonHistoryMutated is the error reason for mutated history
ErrReasonHistoryMutated = "history was mutated"
)

var (
// ErrInvalidURIScheme is the error for invalid URI
ErrInvalidURIScheme = errors.New("URI scheme is invalid")
// ErrInvalidURI is the error for invalid URI
ErrInvalidURI = errors.New("URI is invalid")
// ErrURISchemeMismatch is the error for mismatch between URI scheme and archiver
ErrURISchemeMismatch = errors.New("URI scheme does not match the archiver")
// ErrHistoryMutated is the error for mutated history
ErrHistoryMutated = errors.New("history was mutated")
// ErrContextTimeout is the error for context timeout
ErrContextTimeout = errors.New("archive aborted because context timed out")
// ErrInvalidGetHistoryRequest is the error for invalid GetHistory request
ErrInvalidGetHistoryRequest = &shared.BadRequestError{Message: "Get archived history request is invalid"}
ErrInvalidGetHistoryRequest = errors.New("get archived history request is invalid")
// ErrGetHistoryTokenCorrupted is the error for corrupted GetHistory token
ErrGetHistoryTokenCorrupted = &shared.BadRequestError{Message: "Next page token is corrupted."}
ErrGetHistoryTokenCorrupted = errors.New("next page token is corrupted")
// ErrHistoryNotExist is the error for non-exist history
ErrHistoryNotExist = &shared.BadRequestError{Message: "Requested workflow history does not exist."}
ErrHistoryNotExist = errors.New("requested workflow history does not exist")
)
Loading

0 comments on commit 895cecc

Please sign in to comment.