Skip to content

Commit

Permalink
Decode DLQ tasks in tdbg
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Oct 26, 2023
1 parent a14d0ef commit cf2818c
Show file tree
Hide file tree
Showing 7 changed files with 336 additions and 122 deletions.
91 changes: 91 additions & 0 deletions internal/jsonl/jsonl_parser.go
@@ -0,0 +1,91 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package jsonl

import (
"encoding/json"
"io"

"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
)

type (
JSONParser[T any] interface {
ParseNext(decoder *json.Decoder) (T, error)
}
protoJSONParser[T proto.Message] struct {
newMessageFn func() T
unmarshaler jsonpb.Unmarshaler
}
jsonStructParser[T any] struct {
newMessageFn func() *T
unmarshaler json.Unmarshaler
}
)

// ParseJSONL parses objects from a JSONL file until EOF.
func ParseJSONL[T any](reader io.Reader, parser JSONParser[T]) ([]T, error) {
decoder := json.NewDecoder(reader)
var (
messages []T
)
for {
message, err := parser.ParseNext(decoder)
if err == io.EOF {
return messages, nil
}
if err != nil {
return nil, err
}
messages = append(messages, message)
}
}

// NewProtoJSONParser returns a JSONParser that parses JSONL files containing JSONPB-serialized protos of type T.
// The newMessageFn argument should return a new instance of the type of message that is being parsed.
func NewProtoJSONParser[T proto.Message](newMessageFn func() T) JSONParser[T] {
return protoJSONParser[T]{
newMessageFn: newMessageFn,
}
}

// NewJSONStructParser returns a JSONParser that parses JSONL files containing JSON-serialized structs of type T.
// The newMessageFn argument should return a new instance of the type of message that is being parsed.
func NewJSONStructParser[T any](newMessageFn func() *T) JSONParser[*T] {
return jsonStructParser[T]{
newMessageFn: newMessageFn,
}
}

func (p protoJSONParser[T]) ParseNext(decoder *json.Decoder) (T, error) {
message := p.newMessageFn()
return message, p.unmarshaler.UnmarshalNext(decoder, message)
}

func (p jsonStructParser[T]) ParseNext(decoder *json.Decoder) (*T, error) {
message := p.newMessageFn()
return message, decoder.Decode(message)
}
56 changes: 56 additions & 0 deletions internal/jsonl/jsonl_parser_test.go
@@ -0,0 +1,56 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package jsonl_test

import (
"bytes"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.temporal.io/server/internal/jsonl"
)

type (
Foo struct {
Foo string
}
)

func TestNewJSONStructParser(t *testing.T) {
t.Parallel()

const text = `{"foo": "bar"}
{"foo": "baz"}`
parser := jsonl.NewJSONStructParser(func() *Foo {
return &Foo{}
})
messages, err := jsonl.ParseJSONL(bytes.NewBuffer([]byte(text)), parser)
require.NoError(t, err)
assert.Equal(t, []*Foo{
{Foo: "bar"},
{Foo: "baz"},
}, messages)
}
12 changes: 9 additions & 3 deletions tests/dlq_test.go
Expand Up @@ -51,6 +51,7 @@ import (
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/common/sdk"
"go.temporal.io/server/internal/jsonl"
"go.temporal.io/server/service/history/queues"
"go.temporal.io/server/tests/testutils"
"go.uber.org/fx"
Expand Down Expand Up @@ -495,9 +496,14 @@ func (s *dlqSuite) verifyNumTasks(file *os.File, expectedNumTasks int) {

// parseHistoryDLQTasks parses the specified file as a JSONL file containing HistoryDLQTask protos and returns them.
func (s *dlqSuite) parseHistoryDLQTasks(file *os.File) []*commonspb.HistoryDLQTask {
return ParseJSONLProtos[*commonspb.HistoryDLQTask](s.Assertions, file, func() *commonspb.HistoryDLQTask {
return new(commonspb.HistoryDLQTask)
})
dlqTasks, err := jsonl.ParseJSONL[*commonspb.HistoryDLQTask](
file,
jsonl.NewProtoJSONParser(func() *commonspb.HistoryDLQTask {
return new(commonspb.HistoryDLQTask)
}),
)
s.NoError(err)
return dlqTasks
}

// EnqueueTask is used to intercept writes to the DLQ, so that we can unblock the test upon completion.
Expand Down
29 changes: 0 additions & 29 deletions tests/tdbg_parser.go
Expand Up @@ -23,32 +23,3 @@
// THE SOFTWARE.

package tests

import (
"encoding/json"
"io"
"os"

"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/require"
)

// ParseJSONLProtos parses protos from a JSONL file until EOF.
// The newMessage argument should return a new instance of the type of message that is being parsed.
func ParseJSONLProtos[T proto.Message](t *require.Assertions, file *os.File, newMessage func() T) []T {
decoder := json.NewDecoder(file)
var (
unmarshaler jsonpb.Unmarshaler
messages []T
)
for {
message := newMessage()
err := unmarshaler.UnmarshalNext(decoder, message)
if err == io.EOF {
return messages
}
t.NoError(err)
messages = append(messages, message)
}
}
21 changes: 11 additions & 10 deletions tests/xdc/history_replication_dlq_test.go
Expand Up @@ -48,6 +48,7 @@ import (
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/internal/jsonl"
"go.temporal.io/server/tests/testutils"
"go.uber.org/fx"

Expand Down Expand Up @@ -376,13 +377,13 @@ func (s *historyReplicationDLQSuite) TestWorkflowReplicationTaskFailure() {
// Verify that the replication task contains the correct information (operators will want to know which workflow
// failed to replicate).
if s.enableQueueV2 {
replicationTasks := tests.ParseJSONLProtos[*commonspb.HistoryDLQTask](
s.Assertions,
replicationTasks, err := jsonl.ParseJSONL(
file,
func() *commonspb.HistoryDLQTask {
return &commonspb.HistoryDLQTask{}
},
jsonl.NewProtoJSONParser(func() *commonspb.HistoryDLQTask {
return new(commonspb.HistoryDLQTask)
}),
)
s.NoError(err)
s.NotEmpty(replicationTasks)
blob := replicationTasks[0].Payload.Blob
task, err := serialization.ReplicationTaskInfoFromBlob(blob.Data, blob.EncodingType.String())
Expand All @@ -391,13 +392,13 @@ func (s *historyReplicationDLQSuite) TestWorkflowReplicationTaskFailure() {
s.Equal(run.GetID(), task.WorkflowId)
s.Equal(run.GetRunID(), task.RunId)
} else {
replicationTasks := tests.ParseJSONLProtos[*replicationspb.ReplicationTask](
s.Assertions,
replicationTasks, err := jsonl.ParseJSONL(
file,
func() *replicationspb.ReplicationTask {
return &replicationspb.ReplicationTask{}
},
jsonl.NewProtoJSONParser(func() *replicationspb.ReplicationTask {
return new(replicationspb.ReplicationTask)
}),
)
s.NoError(err)
s.NotEmpty(replicationTasks)
task := replicationTasks[0]
s.Equal(enumspb.REPLICATION_TASK_TYPE_HISTORY_V2_TASK, task.GetTaskType())
Expand Down

0 comments on commit cf2818c

Please sign in to comment.