Skip to content

Commit

Permalink
Merge pull request redpanda-data#22 from dcrodman/dr/transaction-exam…
Browse files Browse the repository at this point in the history
…ples

Add examples for transactional producer and consumer
  • Loading branch information
twmb committed Feb 8, 2021
2 parents f84508a + 982dd3a commit c2c014a
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 0 deletions.
12 changes: 12 additions & 0 deletions examples/transactions/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package main

import "context"

func main() {
kafkaBrokers := "localhost:9092"
topic := "test"

ctx := context.Background()
go startConsuming(ctx, kafkaBrokers, topic)
startProducing(ctx, kafkaBrokers, topic)
}
45 changes: 45 additions & 0 deletions examples/transactions/transactional_consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package main

import (
"context"
"fmt"
"strings"

"github.com/twmb/franz-go/pkg/kgo"
)

func startConsuming(ctx context.Context, kafkaBrokers string, topic string) {
client, err := kgo.NewClient(
kgo.SeedBrokers(strings.Split(kafkaBrokers, ",")...),
// Only read messages that have been written as part of committed transactions.
kgo.FetchIsolationLevel(kgo.ReadCommitted()),
)
if err != nil {
fmt.Printf("error initializing Kafka consumer: %v\n", err)
return
}

client.AssignGroup("my-consumer-group", kgo.GroupTopics(topic))
defer client.Close()

consumerLoop:
for {
fetches := client.PollFetches(ctx)
iter := fetches.RecordIter()

for _, fetchErr := range fetches.Errors() {
fmt.Printf(
"error consuming from topic: topic=%s, partition=%d, err=%v",
fetchErr.Topic, fetchErr.Partition, fetchErr.Err,
)
break consumerLoop
}

for !iter.Done() {
record := iter.Next()
fmt.Printf("consumed record with message: %v", string(record.Value))
}
}

fmt.Println("consumer exited")
}
102 changes: 102 additions & 0 deletions examples/transactions/transactional_producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package main

import (
"context"
"fmt"
"os"
"strconv"
"strings"
"time"

"github.com/twmb/franz-go/pkg/kgo"
)

func startProducing(ctx context.Context, kafkaBrokers string, topic string) {
producerId := strconv.FormatInt(int64(os.Getpid()), 10)
client, err := kgo.NewClient(
kgo.SeedBrokers(strings.Split(kafkaBrokers, ",")...),
kgo.TransactionalID(producerId),
)
if err != nil {
fmt.Printf("error initializing Kafka producer client: %v", err)
return
}

defer client.Close()

batch := 0
for {
if err := client.BeginTransaction(); err != nil {
fmt.Printf("error beginning transaction: %v\n", err)
break
}

// Write some messages in the transaction.
if err := produceRecords(ctx, client, topic, batch); err != nil {
fmt.Printf("error producing message: %v\n", err)
rollback(ctx, client)
continue
}

// Flush all of the buffered messages.
if err := client.Flush(ctx); err != nil {
if err != context.Canceled {
fmt.Printf("error flushing messages: %v\n", err)
}

rollback(ctx, client)
continue
}

// Attempt to commit the transaction and explicitly abort if it fails.
if err := client.EndTransaction(ctx, kgo.TryCommit); err != nil {
fmt.Printf("error committing transaction: %v\n", err)

if rollbackErr := client.AbortBufferedRecords(ctx); err != nil {
fmt.Printf("error rolling back transaction after commit failure: %v\n", rollbackErr)
}
}

batch += 1
time.Sleep(10 * time.Second)
}

fmt.Println("producer exited")
}

func produceRecords(ctx context.Context, client *kgo.Client, topic string, batch int) error {
errChan := make(chan error)

// Records are produced sequentially in order to demonstrate that a consumer
// using the ReadCommitted isolation level will not consume any records until
// the transaction is committed.
for i := 0; i < 10; i++ {
message := fmt.Sprintf("batch %d record %d\n", batch, i)
r := &kgo.Record{
Value: []byte(message),
Topic: topic,
}

err := client.Produce(ctx, r, func(r *kgo.Record, e error) {
fmt.Printf("produced message: %s", message)
errChan <- e
})
if err != nil {
return err
}

if err := <-errChan; err != nil {
return err
}
}

return nil
}

func rollback(ctx context.Context, client *kgo.Client) {
if err := client.EndTransaction(ctx, kgo.TryAbort); err != nil {
fmt.Printf("error rolling back transaction: %v\n", err)
return
}
fmt.Println("transaction rolled back")
}

0 comments on commit c2c014a

Please sign in to comment.