Skip to content

Commit

Permalink
Remove snapshot logic
Browse files Browse the repository at this point in the history
  • Loading branch information
nsurfer committed Jan 6, 2021
1 parent 1a29e1b commit 3e0aed6
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 148 deletions.
118 changes: 0 additions & 118 deletions js.go
Expand Up @@ -19,7 +19,6 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"strings"
Expand Down Expand Up @@ -52,10 +51,6 @@ type JetStreamManager interface {
DeleteStream(name string) error
// Stream information.
StreamInfo(stream string) (*StreamInfo, error)
// Snapshot stream.
SnapshotStream(name string, timeout time.Duration, cfg *StreamSnapshotConfig) (io.Reader, error)
// Restore stream.
RestoreStream(name string, snapshot io.Reader) error
// Purge stream messages.
PurgeStream(name string) error
// NewStreamLister is used to return pages of StreamInfo objects.
Expand Down Expand Up @@ -1421,119 +1416,6 @@ func (js *js) DeleteStream(name string) error {
return nil
}

// StreamSnapshotConfig contains options for snapshotting a Stream.
type StreamSnapshotConfig struct {
// Subject to deliver the chunks to for the snapshot.
DeliverSubject string `json:"deliver_subject"`
// Do not include consumers in the snapshot.
NoConsumers bool `json:"no_consumers,omitempty"`
// Optional chunk size preference.
// Best to just let server select.
ChunkSize int `json:"chunk_size,omitempty"`
// Check all message's checksums prior to snapshot.
CheckMsgs bool `json:"jsck,omitempty"`
}

// JSAPIStreamSnapshotResponse is the response for a snapshot request.
type JSAPIStreamSnapshotResponse struct {
APIResponse
// Estimate of number of blocks for the messages.
NumBlks int `json:"num_blks"`
// Block size limit as specified by the stream.
BlkSize int `json:"blk_size"`
}

// SnapshotStream creates a snapshot of a Stream. It returns a copy of the
// snapshot data.
func (js *js) SnapshotStream(name string, timeout time.Duration, cfg *StreamSnapshotConfig) (io.Reader, error) {
if cfg == nil {
return nil, ErrStreamSnapshotConfigRequired
}
if cfg.DeliverSubject == "" {
return nil, ErrDeliverSubjectRequired
}

req, err := json.Marshal(cfg)
if err != nil {
return nil, err
}

ssSubj := js.apiSubj(fmt.Sprintf(JSApiStreamSnapshotT, name))
r, err := js.nc.Request(ssSubj, req, js.wait)
if err != nil {
return nil, err
}
var resp JSAPIStreamSnapshotResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return nil, err
}
if resp.Error != nil {
return nil, errors.New(resp.Error.Description)
}

// Setup to process snapshot chunks.
var snapshot []byte
done := make(chan struct{}, 1)
sub, err := js.nc.Subscribe(cfg.DeliverSubject, func(m *Msg) {
// EOF
if len(m.Data) == 0 {
done <- struct{}{}
return
}
// Could be writing to a file here too.
snapshot = append(snapshot, m.Data...)
// Flow ack
m.Respond(nil)
})
if err != nil {
return nil, err
}
defer sub.Unsubscribe()
// Wait to receive the snapshot.
select {
case <-done:
case <-time.After(timeout):
return nil, errors.New("nats: snapshot timeout exceeded")
}

return bytes.NewReader(snapshot), nil
}

// JSAPIStreamRestoreResponse is the direct response to the restore request.
type JSAPIStreamRestoreResponse struct {
APIResponse
// Subject to deliver the chunks to for the snapshot restore.
DeliverSubject string `json:"deliver_subject"`
}

// RestoreStream restores a Stream from a snapshot.
func (js *js) RestoreStream(name string, snapshot io.Reader) error {
rsSubj := js.apiSubj(fmt.Sprintf(JSApiStreamRestoreT, name))
r, err := js.nc.Request(rsSubj, nil, js.wait)
if err != nil {
return err
}
var resp JSAPIStreamRestoreResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return err
}
if resp.Error != nil {
return errors.New(resp.Error.Description)
}

// Can be any size message.
var chunk [512]byte
for r := snapshot; ; {
n, err := r.Read(chunk[:])
if err != nil {
break
}
js.nc.Request(resp.DeliverSubject, chunk[:n], js.wait)
}
js.nc.Request(resp.DeliverSubject, nil, js.wait)
return nil
}

// JSAPIStreamPurgeResponse.
type JSAPIStreamPurgeResponse struct {
APIResponse
Expand Down
40 changes: 10 additions & 30 deletions test/js_test.go
Expand Up @@ -579,16 +579,6 @@ func TestJetStreamManagement(t *testing.T) {
t.Fatalf("ConsumerInfo is not correct %+v", si)
}

// Create a snapshot using our client API.
inboxSubj := nats.NewInbox()
snapshot, err := js.SnapshotStream("foo", 5*time.Second, &nats.StreamSnapshotConfig{
DeliverSubject: inboxSubj,
ChunkSize: 1024,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

sl := js.NewStreamLister()
if !sl.Next() {
if err := sl.Err(); err != nil {
Expand Down Expand Up @@ -633,6 +623,16 @@ func TestJetStreamManagement(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}

// Purge a stream using our client API.
if err := js.PurgeStream("foo"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si, err := js.StreamInfo("foo"); err != nil {
t.Fatalf("Unexpected error: %v", err)
} else if si.State.Msgs != 0 {
t.Fatalf("StreamInfo.Msgs is not correct")
}

// Delete a stream using our client API.
if err := js.DeleteStream(""); err == nil {
t.Fatal("Unexpected success")
Expand All @@ -643,26 +643,6 @@ func TestJetStreamManagement(t *testing.T) {
if _, err := js.StreamInfo("foo"); err == nil {
t.Fatalf("Unexpected success")
}

// Restore a stream using our client API.
if err := js.RestoreStream("foo", snapshot); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si, err := js.StreamInfo("foo"); err != nil {
t.Fatalf("Unexpected error: %v", err)
} else if si.Config.Name != "foo" && si.State.Msgs != 25 {
t.Fatalf("StreamInfo is not correct %+v", si)
}

// Purge a stream using our client API.
if err := js.PurgeStream("foo"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si, err := js.StreamInfo("foo"); err != nil {
t.Fatalf("Unexpected error: %v", err)
} else if si.State.Msgs != 0 {
t.Fatalf("StreamInfo.Msgs is not correct")
}
}

func TestJetStreamImport(t *testing.T) {
Expand Down

0 comments on commit 3e0aed6

Please sign in to comment.