Skip to content

Commit

Permalink
make some s3 utility functions public
Browse files Browse the repository at this point in the history
  • Loading branch information
paulnpdev committed Nov 28, 2022
1 parent 53f4615 commit 130d73e
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 26 deletions.
16 changes: 8 additions & 8 deletions common/archiver/s3store/historyArchiver.go
Expand Up @@ -146,7 +146,7 @@ func (h *historyArchiver) Archive(

logger := archiver.TagLoggerWithArchiveHistoryRequestAndURI(h.container.Logger, request, URI.String())

if err := softValidateURI(URI); err != nil {
if err := SoftValidateURI(URI); err != nil {
logger.Error(archiver.ArchiveNonRetryableErrorMsg, tag.ArchivalArchiveFailReason(archiver.ErrReasonInvalidURI), tag.Error(err))
return err
}
Expand Down Expand Up @@ -195,7 +195,7 @@ func (h *historyArchiver) Archive(
}
key := constructHistoryKey(URI.Path(), request.NamespaceID, request.WorkflowID, request.RunID, request.CloseFailoverVersion, progress.BatchIdx)

exists, err := keyExists(ctx, h.s3cli, URI, key)
exists, err := KeyExists(ctx, h.s3cli, URI, key)
if err != nil {
if isRetryableError(err) {
logger.Error(archiver.ArchiveTransientErrorMsg, tag.ArchivalArchiveFailReason(errWriteKey), tag.Error(err))
Expand All @@ -208,7 +208,7 @@ func (h *historyArchiver) Archive(
if exists {
handler.Counter(metrics.HistoryArchiverBlobExistsCount.GetMetricName()).Record(1)
} else {
if err := upload(ctx, h.s3cli, URI, key, encodedHistoryBlob); err != nil {
if err := Upload(ctx, h.s3cli, URI, key, encodedHistoryBlob); err != nil {
if isRetryableError(err) {
logger.Error(archiver.ArchiveTransientErrorMsg, tag.ArchivalArchiveFailReason(errWriteKey), tag.Error(err))
} else {
Expand Down Expand Up @@ -270,7 +270,7 @@ func (h *historyArchiver) Get(
URI archiver.URI,
request *archiver.GetHistoryRequest,
) (*archiver.GetHistoryResponse, error) {
if err := softValidateURI(URI); err != nil {
if err := SoftValidateURI(URI); err != nil {
return nil, serviceerror.NewInvalidArgument(archiver.ErrInvalidURI.Error())
}

Expand Down Expand Up @@ -312,7 +312,7 @@ func (h *historyArchiver) Get(
}
key := constructHistoryKey(URI.Path(), request.NamespaceID, request.WorkflowID, request.RunID, token.CloseFailoverVersion, token.BatchIdx)

encodedRecord, err := download(ctx, h.s3cli, URI, key)
encodedRecord, err := Download(ctx, h.s3cli, URI, key)
if err != nil {
if isRetryableError(err) {
return nil, serviceerror.NewUnavailable(err.Error())
Expand Down Expand Up @@ -343,7 +343,7 @@ func (h *historyArchiver) Get(
}

if isTruncated {
nextToken, err := serializeToken(token)
nextToken, err := SerializeToken(token)
if err != nil {
return nil, serviceerror.NewInternal(err.Error())
}
Expand All @@ -354,11 +354,11 @@ func (h *historyArchiver) Get(
}

func (h *historyArchiver) ValidateURI(URI archiver.URI) error {
err := softValidateURI(URI)
err := SoftValidateURI(URI)
if err != nil {
return err
}
return bucketExists(context.TODO(), h.s3cli, URI)
return BucketExists(context.TODO(), h.s3cli, URI)
}

func (h *historyArchiver) getHighestVersion(ctx context.Context, URI archiver.URI, request *archiver.GetHistoryRequest) (*int64, error) {
Expand Down
20 changes: 10 additions & 10 deletions common/archiver/s3store/util.go
Expand Up @@ -52,7 +52,7 @@ import (

// encoding & decoding util

func encode(message proto.Message) ([]byte, error) {
func Encode(message proto.Message) ([]byte, error) {
encoder := codec.NewJSONPBEncoder()
return encoder.Encode(message)
}
Expand All @@ -67,7 +67,7 @@ func decodeVisibilityRecord(data []byte) (*archiverspb.VisibilityRecord, error)
return record, nil
}

func serializeToken(token interface{}) ([]byte, error) {
func SerializeToken(token interface{}) ([]byte, error) {
if token == nil {
return nil, nil
}
Expand All @@ -89,7 +89,7 @@ func serializeQueryVisibilityToken(token string) []byte {
}

// Only validates the scheme and buckets are passed
func softValidateURI(URI archiver.URI) error {
func SoftValidateURI(URI archiver.URI) error {
if URI.Scheme() != URIScheme {
return archiver.ErrURISchemeMismatch
}
Expand All @@ -99,7 +99,7 @@ func softValidateURI(URI archiver.URI) error {
return nil
}

func bucketExists(ctx context.Context, s3cli s3iface.S3API, URI archiver.URI) error {
func BucketExists(ctx context.Context, s3cli s3iface.S3API, URI archiver.URI) error {
ctx, cancel := ensureContextTimeout(ctx)
defer cancel()
_, err := s3cli.HeadBucketWithContext(ctx, &s3.HeadBucketInput{
Expand All @@ -108,29 +108,29 @@ func bucketExists(ctx context.Context, s3cli s3iface.S3API, URI archiver.URI) er
if err == nil {
return nil
}
if isNotFoundError(err) {
if IsNotFoundError(err) {
return errBucketNotExists
}
return err
}

func keyExists(ctx context.Context, s3cli s3iface.S3API, URI archiver.URI, key string) (bool, error) {
func KeyExists(ctx context.Context, s3cli s3iface.S3API, URI archiver.URI, key string) (bool, error) {
ctx, cancel := ensureContextTimeout(ctx)
defer cancel()
_, err := s3cli.HeadObjectWithContext(ctx, &s3.HeadObjectInput{
Bucket: aws.String(URI.Hostname()),
Key: aws.String(key),
})
if err != nil {
if isNotFoundError(err) {
if IsNotFoundError(err) {
return false, nil
}
return false, err
}
return true, nil
}

func isNotFoundError(err error) bool {
func IsNotFoundError(err error) bool {
aerr, ok := err.(awserr.Error)
return ok && (aerr.Code() == "NotFound")
}
Expand Down Expand Up @@ -183,7 +183,7 @@ func ensureContextTimeout(ctx context.Context) (context.Context, context.CancelF
}
return context.WithTimeout(ctx, defaultBlobstoreTimeout)
}
func upload(ctx context.Context, s3cli s3iface.S3API, URI archiver.URI, key string, data []byte) error {
func Upload(ctx context.Context, s3cli s3iface.S3API, URI archiver.URI, key string, data []byte) error {
ctx, cancel := ensureContextTimeout(ctx)
defer cancel()

Expand All @@ -203,7 +203,7 @@ func upload(ctx context.Context, s3cli s3iface.S3API, URI archiver.URI, key stri
return nil
}

func download(ctx context.Context, s3cli s3iface.S3API, URI archiver.URI, key string) ([]byte, error) {
func Download(ctx context.Context, s3cli s3iface.S3API, URI archiver.URI, key string) ([]byte, error) {
ctx, cancel := ensureContextTimeout(ctx)
defer cancel()
result, err := s3cli.GetObjectWithContext(ctx, &s3.GetObjectInput{
Expand Down
14 changes: 7 additions & 7 deletions common/archiver/s3store/visibilityArchiver.go
Expand Up @@ -128,7 +128,7 @@ func (v *visibilityArchiver) Archive(
}
}()

if err := softValidateURI(URI); err != nil {
if err := SoftValidateURI(URI); err != nil {
archiveFailReason = archiver.ErrReasonInvalidURI
return err
}
Expand All @@ -138,7 +138,7 @@ func (v *visibilityArchiver) Archive(
return err
}

encodedVisibilityRecord, err := encode(request)
encodedVisibilityRecord, err := Encode(request)
if err != nil {
archiveFailReason = errEncodeVisibilityRecord
return err
Expand All @@ -147,7 +147,7 @@ func (v *visibilityArchiver) Archive(
// Upload archive to all indexes
for _, element := range indexes {
key := constructTimestampIndex(URI.Path(), request.GetNamespaceId(), element.primaryIndex, element.primaryIndexValue, element.secondaryIndex, element.secondaryIndexTimestamp, request.GetRunId())
if err := upload(ctx, v.s3cli, URI, key, encodedVisibilityRecord); err != nil {
if err := Upload(ctx, v.s3cli, URI, key, encodedVisibilityRecord); err != nil {
archiveFailReason = errWriteKey
return err
}
Expand All @@ -171,7 +171,7 @@ func (v *visibilityArchiver) Query(
saTypeMap searchattribute.NameTypeMap,
) (*archiver.QueryVisibilityResponse, error) {

if err := softValidateURI(URI); err != nil {
if err := SoftValidateURI(URI); err != nil {
return nil, serviceerror.NewInvalidArgument(archiver.ErrInvalidURI.Error())
}

Expand Down Expand Up @@ -244,7 +244,7 @@ func (v *visibilityArchiver) query(
response.NextPageToken = serializeQueryVisibilityToken(*results.NextContinuationToken)
}
for _, item := range results.Contents {
encodedRecord, err := download(ctx, v.s3cli, URI, *item.Key)
encodedRecord, err := Download(ctx, v.s3cli, URI, *item.Key)
if err != nil {
return nil, serviceerror.NewUnavailable(err.Error())
}
Expand All @@ -263,9 +263,9 @@ func (v *visibilityArchiver) query(
}

func (v *visibilityArchiver) ValidateURI(URI archiver.URI) error {
err := softValidateURI(URI)
err := SoftValidateURI(URI)
if err != nil {
return err
}
return bucketExists(context.TODO(), v.s3cli, URI)
return BucketExists(context.TODO(), v.s3cli, URI)
}
2 changes: 1 addition & 1 deletion common/archiver/s3store/visibilityArchiver_test.go
Expand Up @@ -225,7 +225,7 @@ func (s *visibilityArchiverSuite) TestArchive_Success() {
s.NoError(err)

expectedKey := constructTimestampIndex(URI.Path(), testNamespaceID, primaryIndexKeyWorkflowID, testWorkflowID, secondaryIndexKeyCloseTimeout, timestamp.TimeValue(closeTimestamp), testRunID)
data, err := download(context.Background(), visibilityArchiver.s3cli, URI, expectedKey)
data, err := Download(context.Background(), visibilityArchiver.s3cli, URI, expectedKey)
s.NoError(err, expectedKey)

archivedRecord := &archiverspb.VisibilityRecord{}
Expand Down

0 comments on commit 130d73e

Please sign in to comment.