Skip to content

Commit

Permalink
Merge pull request #631 from wallyqs/jsm-delete-msg
Browse files Browse the repository at this point in the history
Add DeleteMsg to JetStream management
  • Loading branch information
wallyqs committed Jan 13, 2021
2 parents be60b83 + 9355a98 commit acca6d6
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 2 deletions.
43 changes: 41 additions & 2 deletions js.go
Expand Up @@ -55,6 +55,8 @@ type JetStreamManager interface {
PurgeStream(name string) error
// NewStreamLister is used to return pages of StreamInfo objects.
NewStreamLister() *StreamLister
// DeleteMsg erases a message from a Stream.
DeleteMsg(name string, seq uint64) error

// Create a consumer.
AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, error)
Expand Down Expand Up @@ -118,10 +120,10 @@ type js struct {

// Request API subjects for JetStream.
const (
// JSDefaultAPIPrefix is the default prefix for the JetStream API.
JSDefaultAPIPrefix = "$JS.API."
// JSApiAccountInfo is for obtaining general information about JetStream.
JSApiAccountInfo = "INFO"

// JSApiConsumerCreateT is used to create consumers.
JSApiConsumerCreateT = "CONSUMER.CREATE.%s"
// JSApiDurableCreateT is used to create durable consumers.
Expand All @@ -134,7 +136,6 @@ const (
JSApiConsumerDeleteT = "CONSUMER.DELETE.%s.%s"
// JSApiConsumerListT is used to return all detailed consumer information
JSApiConsumerListT = "CONSUMER.LIST.%s"

// JSApiStreams can lookup a stream by subject.
JSApiStreams = "STREAM.NAMES"
// JSApiStreamCreateT is the endpoint to create new streams.
Expand All @@ -153,6 +154,8 @@ const (
JSApiStreamPurgeT = "STREAM.PURGE.%s"
// JSApiStreamListT is the endpoint that will return all detailed stream information
JSApiStreamList = "STREAM.LIST"
// JSApiMsgDeleteT is the endpoint to remove a message.
JSApiMsgDeleteT = "STREAM.MSG.DELETE.%s"
)

// JetStream returns a JetStream context for pub/sub interactions.
Expand Down Expand Up @@ -1416,6 +1419,42 @@ func (js *js) DeleteStream(name string) error {
return nil
}

type JSAPIMsgDeleteRequest struct {
Seq uint64 `json:"seq"`
}

// JSAPIMsgDeleteResponse is the response for a Stream delete request.
type JSAPIMsgDeleteResponse struct {
APIResponse
Success bool `json:"success,omitempty"`
}

// DeleteMsg deletes a message from a stream.
func (js *js) DeleteMsg(name string, seq uint64) error {
if name == _EMPTY_ {
return ErrStreamNameRequired
}

req, err := json.Marshal(&JSAPIMsgDeleteRequest{Seq: seq})
if err != nil {
return err
}

dsSubj := js.apiSubj(fmt.Sprintf(JSApiMsgDeleteT, name))
r, err := js.nc.Request(dsSubj, req, js.wait)
if err != nil {
return err
}
var resp JSAPIMsgDeleteResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return err
}
if resp.Error != nil {
return errors.New(resp.Error.Description)
}
return nil
}

// JSAPIStreamPurgeResponse.
type JSAPIStreamPurgeResponse struct {
APIResponse
Expand Down
117 changes: 117 additions & 0 deletions test/js_test.go
Expand Up @@ -645,6 +645,123 @@ func TestJetStreamManagement(t *testing.T) {
}
}

func TestJetStreamManagement_DeleteMsg(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}

nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

_, err = js.AddStream(&nats.StreamConfig{
Name: "foo",
Subjects: []string{"foo.A", "foo.B", "foo.C"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

for i := 0; i < 5; i++ {
js.Publish("foo.A", []byte("A"))
js.Publish("foo.B", []byte("B"))
js.Publish("foo.C", []byte("C"))
}

si, err := js.StreamInfo("foo")
if err != nil {
t.Fatal(err)
}
var total uint64 = 15
if si.State.Msgs != total {
t.Errorf("Expected %d msgs, got: %d", total, si.State.Msgs)
}

expected := 5
msgs := make([]*nats.Msg, 0)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

sub, err := js.Subscribe("foo.C", func(msg *nats.Msg) {
msgs = append(msgs, msg)
if len(msgs) == expected {
cancel()
}
})
if err != nil {
t.Fatal(err)
}
<-ctx.Done()
sub.Unsubscribe()

got := len(msgs)
if got != expected {
t.Fatalf("Expected %d, got %d", expected, got)
}

msg := msgs[0]
meta, err := msg.MetaData()
if err != nil {
t.Fatal(err)
}
originalSeq := meta.Stream

err = js.DeleteMsg("foo", originalSeq)
if err != nil {
t.Fatal(err)
}

si, err = js.StreamInfo("foo")
if err != nil {
t.Fatal(err)
}
total = 14
if si.State.Msgs != total {
t.Errorf("Expected %d msgs, got: %d", total, si.State.Msgs)
}

// There should be only 4 messages since one deleted.
expected = 4
msgs = make([]*nats.Msg, 0)
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

sub, err = js.Subscribe("foo.C", func(msg *nats.Msg) {
msgs = append(msgs, msg)

if len(msgs) == expected {
cancel()
}
})
if err != nil {
t.Fatal(err)
}
<-ctx.Done()
sub.Unsubscribe()

msg = msgs[0]
meta, err = msg.MetaData()
if err != nil {
t.Fatal(err)
}
newSeq := meta.Stream

// First message removed
if newSeq <= originalSeq {
t.Errorf("Expected %d to be higher sequence than %d", newSeq, originalSeq)
}
}

func TestJetStreamImport(t *testing.T) {
conf := createConfFile(t, []byte(`
listen: 127.0.0.1:-1
Expand Down

0 comments on commit acca6d6

Please sign in to comment.