From 35e3814155983491c600a5dcf9ffb4f5cf51d813 Mon Sep 17 00:00:00 2001 From: Ashley Jeffs Date: Thu, 18 Jul 2024 14:19:54 +0100 Subject: [PATCH] Switch to json formatted status events --- internal/impl/kafka/enterprise/integration_test.go | 6 +++--- internal/impl/kafka/enterprise/status_updates.go | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/impl/kafka/enterprise/integration_test.go b/internal/impl/kafka/enterprise/integration_test.go index 7d9f72026..f0247db59 100644 --- a/internal/impl/kafka/enterprise/integration_test.go +++ b/internal/impl/kafka/enterprise/integration_test.go @@ -20,7 +20,7 @@ import ( "github.com/redpanda-data/benthos/v4/public/service" "github.com/redpanda-data/benthos/v4/public/service/integration" - "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/encoding/protojson" "github.com/redpanda-data/connect/v4/internal/impl/kafka/enterprise" "github.com/redpanda-data/connect/v4/internal/protoconnect" @@ -213,12 +213,12 @@ max_message_bytes: 1MB var m protoconnect.StatusEvent - require.NoError(t, proto.Unmarshal([]byte(outRecords[0]), &m)) + require.NoError(t, protojson.Unmarshal([]byte(outRecords[0]), &m)) assert.Equal(t, protoconnect.StatusEvent_TYPE_INITIALIZING, m.Type) assert.Equal(t, "baz", m.InstanceId) assert.Equal(t, "buz", m.PipelineId) - require.NoError(t, proto.Unmarshal([]byte(outRecords[1]), &m)) + require.NoError(t, protojson.Unmarshal([]byte(outRecords[1]), &m)) assert.Equal(t, protoconnect.StatusEvent_TYPE_EXITING, m.Type) assert.Equal(t, "uh oh", m.ExitError.Message) assert.Equal(t, "baz", m.InstanceId) diff --git a/internal/impl/kafka/enterprise/status_updates.go b/internal/impl/kafka/enterprise/status_updates.go index c026308d1..fb571eebe 100644 --- a/internal/impl/kafka/enterprise/status_updates.go +++ b/internal/impl/kafka/enterprise/status_updates.go @@ -15,7 +15,7 @@ import ( "time" "github.com/redpanda-data/benthos/v4/public/service" - "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/encoding/protojson" "github.com/redpanda-data/connect/v4/internal/protoconnect" ) @@ -65,7 +65,7 @@ func (l *TopicLogger) sendStatusEvent(e *protoconnect.StatusEvent) { return } - data, err := proto.Marshal(e) + data, err := protojson.Marshal(e) if err != nil { l.fallbackLogger.Load().With("error", err).Error("Failed to marshal status event") return