Skip to content

Commit

Permalink
feat: use customized binary serde for nats message payload (#585)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <whynowy@gmail.com>
  • Loading branch information
jy4096 authored and whynowy committed Mar 13, 2023
1 parent dbe3694 commit 67fc688
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 75 deletions.
37 changes: 10 additions & 27 deletions pkg/isb/stores/jetstream/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -221,23 +220,25 @@ func (jr *jetStreamReader) Rate(_ context.Context, seconds int64) (float64, erro
}

func (jr *jetStreamReader) Read(_ context.Context, count int64) ([]*isb.ReadMessage, error) {
var err error
result := []*isb.ReadMessage{}
msgs, err := jr.sub.Fetch(int(count), nats.MaxWait(jr.opts.readTimeOut))
if err != nil && !errors.Is(err, nats.ErrTimeout) {
isbReadErrors.With(map[string]string{"buffer": jr.GetName()}).Inc()
return nil, fmt.Errorf("failed to fetch messages from jet stream subject %q, %w", jr.subject, err)
}
for _, msg := range msgs {
m := &isb.ReadMessage{
var m = new(isb.Message)
// err should be nil as we have our own marshaller/unmarshaller
err = m.UnmarshalBinary(msg.Data)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal the message into isb.Message, %w", err)
}
rm := &isb.ReadMessage{
ReadOffset: newOffset(msg, jr.inProgessTickDuration, jr.log),
Message: isb.Message{
Header: convert2IsbMsgHeader(msg.Header),
Body: isb.Body{
Payload: msg.Data,
},
},
Message: *m,
}
result = append(result, m)
result = append(result, rm)
}
return result, nil
}
Expand Down Expand Up @@ -269,24 +270,6 @@ func (jr *jetStreamReader) Ack(_ context.Context, offsets []isb.Offset) []error
return errs
}

func convert2IsbMsgHeader(header nats.Header) isb.Header {
r := isb.Header{}
if header.Get(_late) == "1" {
r.IsLate = true
}
if x := header.Get(_id); x != "" {
r.ID = x
}
if x := header.Get(_key); x != "" {
r.Key = x
}
if x := header.Get(_eventTime); x != "" {
i, _ := strconv.ParseInt(x, 10, 64)
r.EventTime = time.UnixMilli(i)
}
return r
}

// offset implements ID interface for JetStream.
type offset struct {
seq uint64
Expand Down
10 changes: 0 additions & 10 deletions pkg/isb/stores/jetstream/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,16 +171,6 @@ func TestClose(t *testing.T) {

}

// TestConvert2IsbMsgHeader is used to convert isb header
func TestConvert2IsbMsgHeader(t *testing.T) {
natsHeader := nats.Header{}
natsHeader.Set("w", "1")
natsHeader.Set("ps", "1636470000")
natsHeader.Set("pen", "1636470060")

assert.NotNil(t, convert2IsbMsgHeader(natsHeader))
}

func addStream(t *testing.T, js *jsclient.JetStreamContext, streamName string) {
s := natstest.RunJetStreamServer(t)
defer natstest.ShutdownJetStreamServer(t, s)
Expand Down
38 changes: 12 additions & 26 deletions pkg/isb/stores/jetstream/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,14 @@ func (jw *jetStreamWriter) asyncWrite(_ context.Context, messages []isb.Message,
var writeOffsets = make([]isb.Offset, len(messages))
var futures = make([]nats.PubAckFuture, len(messages))
for index, message := range messages {
payload, err := message.MarshalBinary()
if err != nil {
errs[index] = err
continue
}
m := &nats.Msg{
Header: convert2NatsMsgHeader(message.Header),
Subject: jw.subject,
Data: message.Payload,
Data: payload,
}
if future, err := jw.js.PublishMsgAsync(m, nats.MsgId(message.Header.ID)); err != nil { // nats.MsgId() is for exactly-once writing
errs[index] = err
Expand Down Expand Up @@ -272,10 +276,14 @@ func (jw *jetStreamWriter) syncWrite(_ context.Context, messages []isb.Message,
wg.Add(1)
go func(message isb.Message, idx int) {
defer wg.Done()
payload, err := message.MarshalBinary()
if err != nil {
errs[idx] = err
return
}
m := &nats.Msg{
Header: convert2NatsMsgHeader(message.Header),
Subject: jw.subject,
Data: message.Payload,
Data: payload,
}
if pubAck, err := jw.js.PublishMsg(m, nats.MsgId(message.Header.ID), nats.AckWait(2*time.Second)); err != nil { // nats.MsgId() is for exactly-once writing
errs[idx] = err
Expand Down Expand Up @@ -307,25 +315,3 @@ func (w *writeOffset) Sequence() (int64, error) {
func (w *writeOffset) AckIt() error {
return fmt.Errorf("not supported")
}

const (
_key = "k"
_id = "i"
_late = "l"
_eventTime = "pev"
)

func convert2NatsMsgHeader(header isb.Header) nats.Header {
r := nats.Header{}
r.Add(_id, header.ID)
r.Add(_key, header.Key)
if header.IsLate {
r.Add(_late, "1")
} else {
r.Add(_late, "0")
}
if !header.EventTime.IsZero() {
r.Add(_eventTime, fmt.Sprint(header.EventTime.UnixMilli()))
}
return r
}
11 changes: 0 additions & 11 deletions pkg/isb/stores/jetstream/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,14 +263,3 @@ func TestWriteClose(t *testing.T) {
assert.NoError(t, bw.Close())

}

// TestConvert2NatsMsgHeader is used to convert nats header
func TestConvert2NatsMsgHeader(t *testing.T) {
isbHeader := isb.Header{
MessageInfo: isb.MessageInfo{
EventTime: time.Unix(1636470000, 0),
},
ID: "1",
}
assert.NotNil(t, convert2NatsMsgHeader(isbHeader))
}
2 changes: 1 addition & 1 deletion pkg/reduce/data_forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ func TestReduceDataForward_Sum(t *testing.T) {
go reduceDataForward.Start()

// start the producer
go publishMessages(ctx, startTime, messageValue, 300, 10, p[fromBuffer.GetName()], fromBuffer)
publishMessages(ctx, startTime, messageValue, 300, 10, p[fromBuffer.GetName()], fromBuffer)

// wait until there is data in to buffer
for buffer.IsEmpty() {
Expand Down

0 comments on commit 67fc688

Please sign in to comment.