Skip to content

Commit

Permalink
Convert int/int64 timestamps to time.Time (#786)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Oct 2, 2020
1 parent 83ef3eb commit 2051585
Show file tree
Hide file tree
Showing 24 changed files with 227 additions and 237 deletions.
33 changes: 17 additions & 16 deletions common/archiver/filestore/queryParser.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (

"go.temporal.io/server/common"
"go.temporal.io/server/common/convert"
"go.temporal.io/server/common/primitives/timestamp"
)

type (
Expand All @@ -49,8 +50,8 @@ type (
queryParser struct{}

parsedQuery struct {
earliestCloseTime int64
latestCloseTime int64
earliestCloseTime time.Time
latestCloseTime time.Time
workflowID *string
runID *string
workflowTypeName *string
Expand Down Expand Up @@ -87,8 +88,8 @@ func (p *queryParser) Parse(query string) (*parsedQuery, error) {
}
whereExpr := stmt.(*sqlparser.Select).Where.Expr
parsedQuery := &parsedQuery{
earliestCloseTime: 0,
latestCloseTime: time.Now().UnixNano(),
earliestCloseTime: time.Time{},
latestCloseTime: time.Now().UTC(),
}
if err := p.convertWhereExpr(whereExpr, parsedQuery); err != nil {
return nil, err
Expand Down Expand Up @@ -196,7 +197,7 @@ func (p *queryParser) convertComparisonExpr(compExpr *sqlparser.ComparisonExpr,
}
parsedQuery.status = &status
case CloseTime:
timestamp, err := convertToTimestamp(valStr)
timestamp, err := convertToTime(valStr)
if err != nil {
return err
}
Expand All @@ -208,7 +209,7 @@ func (p *queryParser) convertComparisonExpr(compExpr *sqlparser.ComparisonExpr,
return nil
}

func (p *queryParser) convertCloseTime(timestamp int64, op string, parsedQuery *parsedQuery) error {
func (p *queryParser) convertCloseTime(timestamp time.Time, op string, parsedQuery *parsedQuery) error {
switch op {
case "=":
if err := p.convertCloseTime(timestamp, ">=", parsedQuery); err != nil {
Expand All @@ -218,33 +219,33 @@ func (p *queryParser) convertCloseTime(timestamp int64, op string, parsedQuery *
return err
}
case "<":
parsedQuery.latestCloseTime = common.MinInt64(parsedQuery.latestCloseTime, timestamp-1)
parsedQuery.latestCloseTime = common.MinTime(parsedQuery.latestCloseTime, timestamp.Add(-1*time.Nanosecond))
case "<=":
parsedQuery.latestCloseTime = common.MinInt64(parsedQuery.latestCloseTime, timestamp)
parsedQuery.latestCloseTime = common.MinTime(parsedQuery.latestCloseTime, timestamp)
case ">":
parsedQuery.earliestCloseTime = common.MaxInt64(parsedQuery.earliestCloseTime, timestamp+1)
parsedQuery.earliestCloseTime = common.MaxTime(parsedQuery.earliestCloseTime, timestamp.Add(1*time.Nanosecond))
case ">=":
parsedQuery.earliestCloseTime = common.MaxInt64(parsedQuery.earliestCloseTime, timestamp)
parsedQuery.earliestCloseTime = common.MaxTime(parsedQuery.earliestCloseTime, timestamp)
default:
return fmt.Errorf("operator %s is not supported for close time", op)
}
return nil
}

func convertToTimestamp(timeStr string) (int64, error) {
timestamp, err := strconv.ParseInt(timeStr, 10, 64)
func convertToTime(timeStr string) (time.Time, error) {
ts, err := strconv.ParseInt(timeStr, 10, 64)
if err == nil {
return timestamp, nil
return timestamp.UnixOrZeroTime(ts), nil
}
timestampStr, err := extractStringValue(timeStr)
if err != nil {
return 0, err
return time.Time{}, err
}
parsedTime, err := time.Parse(defaultDateTimeFormat, timestampStr)
if err != nil {
return 0, err
return time.Time{}, err
}
return parsedTime.UnixNano(), nil
return parsedTime, nil
}

func convertStatusStr(statusStr string) (enumspb.WorkflowExecutionStatus, error) {
Expand Down
43 changes: 22 additions & 21 deletions common/archiver/filestore/queryParser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package filestore

import (
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -260,32 +261,32 @@ func (s *queryParserSuite) TestParseCloseTime() {
query: "CloseTime <= 1000",
expectErr: false,
parsedQuery: &parsedQuery{
earliestCloseTime: 0,
latestCloseTime: 1000,
earliestCloseTime: time.Time{},
latestCloseTime: time.Unix(0, 1000),
},
},
{
query: "CloseTime < 2000 and CloseTime <= 1000 and CloseTime > 300",
expectErr: false,
parsedQuery: &parsedQuery{
earliestCloseTime: 301,
latestCloseTime: 1000,
earliestCloseTime: time.Unix(0, 301),
latestCloseTime: time.Unix(0, 1000),
},
},
{
query: "CloseTime = 2000 and (CloseTime > 1000 and CloseTime <= 9999)",
expectErr: false,
parsedQuery: &parsedQuery{
earliestCloseTime: 2000,
latestCloseTime: 2000,
earliestCloseTime: time.Unix(0, 2000),
latestCloseTime: time.Unix(0, 2000),
},
},
{
query: "CloseTime <= \"2019-01-01T11:11:11Z\" and CloseTime >= 1000000",
expectErr: false,
parsedQuery: &parsedQuery{
earliestCloseTime: 1000000,
latestCloseTime: 1546341071000000000,
earliestCloseTime: time.Unix(0, 1000000),
latestCloseTime: time.Date(2019, 01, 01, 11, 11, 11, 0, time.UTC),
},
},
{
Expand All @@ -302,17 +303,17 @@ func (s *queryParserSuite) TestParseCloseTime() {
},
}

for _, tc := range testCases {
for i, tc := range testCases {
parsedQuery, err := s.parser.Parse(tc.query)
if tc.expectErr {
s.Error(err)
continue
}
s.NoError(err)
s.Equal(tc.parsedQuery.emptyResult, parsedQuery.emptyResult)
s.NoError(err, "case %d", i)
s.Equal(tc.parsedQuery.emptyResult, parsedQuery.emptyResult, "case %d", i)
if !tc.parsedQuery.emptyResult {
s.Equal(tc.parsedQuery.earliestCloseTime, parsedQuery.earliestCloseTime)
s.Equal(tc.parsedQuery.latestCloseTime, parsedQuery.latestCloseTime)
s.True(tc.parsedQuery.earliestCloseTime.Equal(parsedQuery.earliestCloseTime), "case %d", i)
s.True(tc.parsedQuery.latestCloseTime.Equal(parsedQuery.latestCloseTime), "case %d", i)
}
}
}
Expand All @@ -327,17 +328,17 @@ func (s *queryParserSuite) TestParse() {
query: "CloseTime <= \"2019-01-01T11:11:11Z\" and WorkflowId = 'random workflowID'",
expectErr: false,
parsedQuery: &parsedQuery{
earliestCloseTime: 0,
latestCloseTime: 1546341071000000000,
earliestCloseTime: time.Time{},
latestCloseTime: time.Date(2019, 01, 01, 11, 11, 11, 0, time.UTC),
workflowID: convert.StringPtr("random workflowID"),
},
},
{
query: "CloseTime > 1999 and CloseTime < 10000 and RunId = 'random runID' and ExecutionStatus = 'Failed'",
expectErr: false,
parsedQuery: &parsedQuery{
earliestCloseTime: 2000,
latestCloseTime: 9999,
earliestCloseTime: time.Unix(0, 2000).UTC(),
latestCloseTime: time.Unix(0, 9999).UTC(),
runID: convert.StringPtr("random runID"),
status: toWorkflowExecutionStatusPtr(enumspb.WORKFLOW_EXECUTION_STATUS_FAILED),
},
Expand All @@ -351,16 +352,16 @@ func (s *queryParserSuite) TestParse() {
},
}

for _, tc := range testCases {
for i, tc := range testCases {
parsedQuery, err := s.parser.Parse(tc.query)
if tc.expectErr {
s.Error(err)
continue
}
s.NoError(err)
s.Equal(tc.parsedQuery.emptyResult, parsedQuery.emptyResult)
s.NoError(err, "case %d", i)
s.Equal(tc.parsedQuery.emptyResult, parsedQuery.emptyResult, "case %d", i)
if !tc.parsedQuery.emptyResult {
s.Equal(tc.parsedQuery, parsedQuery)
s.Equal(tc.parsedQuery, parsedQuery, "case %d", i)
}
}
}
22 changes: 12 additions & 10 deletions common/archiver/filestore/visibilityArchiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"sort"
"strconv"
"strings"
"time"

commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/serviceerror"
Expand All @@ -40,6 +41,7 @@ import (
archiverspb "go.temporal.io/server/api/archiver/v1"
"go.temporal.io/server/common/archiver"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/service/config"
)

Expand All @@ -56,7 +58,7 @@ type (
}

queryVisibilityToken struct {
LastCloseTime int64
LastCloseTime time.Time
LastRunID string
}

Expand Down Expand Up @@ -215,7 +217,7 @@ func (v *visibilityArchiver) query(
return nil, serviceerror.NewInternal(err.Error())
}

if record.CloseTime.UnixNano() < request.parsedQuery.earliestCloseTime {
if record.CloseTime.Before(request.parsedQuery.earliestCloseTime) {
break
}

Expand All @@ -224,7 +226,7 @@ func (v *visibilityArchiver) query(
if len(response.Executions) == request.pageSize {
if idx != len(files) {
newToken := &queryVisibilityToken{
LastCloseTime: record.CloseTime.UnixNano(),
LastCloseTime: timestamp.TimeValue(record.CloseTime),
LastRunID: record.GetRunId(),
}
encodedToken, err := serializeToken(newToken)
Expand All @@ -251,7 +253,7 @@ func (v *visibilityArchiver) ValidateURI(URI archiver.URI) error {

type parsedVisFilename struct {
name string
closeTime int64
closeTime time.Time
hashedRunID string
}

Expand All @@ -273,26 +275,26 @@ func sortAndFilterFiles(filenames []string, token *queryVisibilityToken) ([]stri
}
parsedFilenames = append(parsedFilenames, &parsedVisFilename{
name: name,
closeTime: closeTime,
closeTime: timestamp.UnixOrZeroTime(closeTime),
hashedRunID: pieces[1],
})
}

sort.Slice(parsedFilenames, func(i, j int) bool {
if parsedFilenames[i].closeTime == parsedFilenames[j].closeTime {
if parsedFilenames[i].closeTime.Equal(parsedFilenames[j].closeTime) {
return parsedFilenames[i].hashedRunID > parsedFilenames[j].hashedRunID
}
return parsedFilenames[i].closeTime > parsedFilenames[j].closeTime
return parsedFilenames[i].closeTime.After(parsedFilenames[j].closeTime)
})

startIdx := 0
if token != nil {
LastHashedRunID := hash(token.LastRunID)
startIdx = sort.Search(len(parsedFilenames), func(i int) bool {
if parsedFilenames[i].closeTime == token.LastCloseTime {
if parsedFilenames[i].closeTime.Equal(token.LastCloseTime) {
return parsedFilenames[i].hashedRunID < LastHashedRunID
}
return parsedFilenames[i].closeTime < token.LastCloseTime
return parsedFilenames[i].closeTime.Before(token.LastCloseTime)
})
}

Expand All @@ -308,7 +310,7 @@ func sortAndFilterFiles(filenames []string, token *queryVisibilityToken) ([]stri
}

func matchQuery(record *archiverspb.ArchiveVisibilityRequest, query *parsedQuery) bool {
if record.CloseTime.UnixNano() < query.earliestCloseTime || record.CloseTime.UnixNano() > query.latestCloseTime {
if record.CloseTime.Before(query.earliestCloseTime) || record.CloseTime.After(query.latestCloseTime) {
return false
}
if query.workflowID != nil && record.GetWorkflowId() != *query.workflowID {
Expand Down

0 comments on commit 2051585

Please sign in to comment.