Skip to content

Commit

Permalink
js: Add godoc examples for JetStream context
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <wally@synadia.com>
  • Loading branch information
wallyqs authored and nsurfer committed Apr 1, 2021
1 parent 458d7de commit 2905267
Show file tree
Hide file tree
Showing 2 changed files with 202 additions and 3 deletions.
203 changes: 201 additions & 2 deletions example_test.go
@@ -1,4 +1,4 @@
// Copyright 2012-2019 The NATS Authors
// Copyright 2012-2021 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand All @@ -14,15 +14,16 @@
package nats_test

import (
"context"
"fmt"
"log"
"time"

"github.com/nats-io/nats.go"
)

// Shows different ways to create a Conn
func ExampleConnect() {

nc, _ := nats.Connect(nats.DefaultURL)
nc.Close()

Expand Down Expand Up @@ -277,3 +278,201 @@ func ExampleEncodedConn_BindRecvChan() {

fmt.Printf("%v says hello!\n", who)
}

func ExampleJetStream() {
nc, err := nats.Connect("localhost")
if err != nil {
log.Fatal(err)
}

// Use the JetStream context to produce and consumer messages
// that have been persisted.
js, err := nc.JetStream()
if err != nil {
log.Fatal(err)
}

js.AddStream(&nats.StreamConfig{
Name: "FOO",
Subjects: []string{"foo"},
})

js.Publish("foo", []byte("Hello JS!"))

// Create async consumer on subject 'foo'. Async subscribers
// ack a message once exiting the callback.
js.Subscribe("foo", func(msg *nats.Msg) {
meta, _ := msg.Metadata()
fmt.Printf("Stream Sequence : %v\n", meta.Sequence.Stream)
fmt.Printf("Consumer Sequence: %v\n", meta.Sequence.Consumer)
})

// Async subscriber with manual acks.
js.Subscribe("foo", func(msg *nats.Msg) {
msg.Ack()
}, nats.ManualAck())

// Async queue subscription where members load balance the
// received messages together.
js.QueueSubscribe("foo", "group", func(msg *nats.Msg) {
msg.Ack()
}, nats.ManualAck())

// Subscriber to consume messages synchronously.
sub, _ := js.SubscribeSync("foo")
msg, _ := sub.NextMsg(2 * time.Second)
msg.Ack()

// QueueSubscribe with group or load balancing.
sub, _ = js.QueueSubscribeSync("foo", "group")
msg, _ = sub.NextMsg(2 * time.Second)
msg.Ack()

// ChanSubscribe
msgCh := make(chan *nats.Msg, 8192)
sub, _ = js.ChanSubscribe("foo", msgCh)

select {
case msg := <-msgCh:
fmt.Println("[Received]", msg)
case <-time.After(1 * time.Second):
}
}

func ExampleJetStreamManager() {
nc, err := nats.Connect("localhost")
if err != nil {
log.Fatal(err)
}

// Use the JetStream context to manage streams and consumers.
js, err := nc.JetStream()
if err != nil {
log.Fatal(err)
}

// Create a stream
js.AddStream(&nats.StreamConfig{
Name: "foo",
Subjects: []string{"foo.*"},
MaxBytes: 1024,
})

// Update a stream
js.UpdateStream(&nats.StreamConfig{
Name: "foo",
MaxBytes: 2048,
})

// Add a druable consumer
js.AddConsumer("foo", &nats.ConsumerConfig{
Durable: "bar",
})

// Get information about all streams
for info := range js.StreamsInfo() {
fmt.Println("stream name:", info.Config.Name)
}

// Get information about all consumers
for info := range js.ConsumersInfo("foo") {
fmt.Println("consumer name:", info.Name)
}

// Delete a consumer
js.DeleteConsumer("foo", "bar")

// Delete a stream
js.DeleteStream("foo")
}

func ExamplePubOpt() {
nc, err := nats.Connect("localhost")
if err != nil {
log.Fatal(err)
}

// Create JetStream context to produce/consumer messages that will be persisted.
js, err := nc.JetStream()
if err != nil {
log.Fatal(err)
}

// Create stream to persist messages published on 'foo'.
js.AddStream(&nats.StreamConfig{
Name: "FOO",
Subjects: []string{"foo"},
})

// Publish is synchronous by default, and waits for a PubAck response.
js.Publish("foo", []byte("Hello JS!"))

// Publish with a custom timeout.
js.Publish("foo", []byte("Hello JS!"), nats.AckWait(500*time.Millisecond))

// Publish with a context.
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

js.Publish("foo", []byte("Hello JS!"), nats.Context(ctx))

// Publish and assert the expected stream name.
js.Publish("foo", []byte("Hello JS!"), nats.ExpectStream("FOO"))

// Publish and assert the last sequence.
js.Publish("foo", []byte("Hello JS!"), nats.ExpectLastSequence(5))

// Publish and tag the message with an ID.
js.Publish("foo", []byte("Hello JS!"), nats.MsgId("foo:6"))

// Publish and assert the last msg ID.
js.Publish("foo", []byte("Hello JS!"), nats.ExpectLastMsgId("foo:6"))
}

func ExampleMaxWait() {
nc, _ := nats.Connect("localhost")

// Set default timeout for JetStream API requests,
// following requests will inherit this timeout.
js, _ := nc.JetStream(nats.MaxWait(3 * time.Second))

// Set custom timeout for a JetStream API request.
js.AddStream(&nats.StreamConfig{
Name: "FOO",
Subjects: []string{"foo"},
}, nats.MaxWait(2*time.Second))

sub, _ := js.PullSubscribe("foo", "my-durable-name")

// Fetch using the default timeout of 3 seconds.
msgs, _ := sub.Fetch(1)

// Set custom timeout for a pull batch request.
msgs, _ = sub.Fetch(1, nats.MaxWait(2*time.Second))

for _, msg := range msgs {
msg.Ack()
}
}

func ExampleAckWait() {
nc, _ := nats.Connect("localhost")
js, _ := nc.JetStream()

// Set custom timeout for a JetStream API request.
js.AddStream(&nats.StreamConfig{
Name: "FOO",
Subjects: []string{"foo"},
})

// Wait for an ack response for 2 seconds.
js.Publish("foo", []byte("Hello JS!"), nats.AckWait(2*time.Second))

// Create consumer on 'foo' subject that waits for an ack for 10s,
// after which the message will be delivered.
sub, _ := js.SubscribeSync("foo", nats.AckWait(10*time.Second))
msg, _ := sub.NextMsg(2 * time.Second)

// Wait for ack of ack for 2s.
msg.AckSync(nats.AckWait(2 * time.Second))
}
2 changes: 1 addition & 1 deletion norace_test.go
Expand Up @@ -135,7 +135,7 @@ func TestNoRaceJetStreamConsumerSlowConsumer(t *testing.T) {
nc.SetErrorHandler(func(_ *Conn, _ *Subscription, _ error) {})

// Queue up 1M small messages.
toSend := uint64(1_000_000)
toSend := uint64(1000000)
for i := uint64(0); i < toSend; i++ {
nc.Publish("js.p", []byte("ok"))
}
Expand Down

0 comments on commit 2905267

Please sign in to comment.