Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Panic documentation & Migration guide #252

Merged
merged 2 commits into from
Apr 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions MIGRATION.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
This document sums up issues for migrating between 0.1.4 to 0.9.x

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The guide is really clear, nice work 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there something else that needs to be documented let me know. It's just a start...


## restartable/auto reconnect view.Run()

In 0.1.4, if a view was created to be restartable, it returned from `Run` in case of errors, but allowed
to be restarted calling `Run` again. The view was still usable, even if it was not running and receiving updates.

The behavior of that option has changed in 0.9.x in a way that the `view.Run` does the reconnect internally using configurable backoff.
The Option was also renamed (the old version has been kept for compatibility reasons).

```go

// create a view
view014, _ := NewView(..., WithViewRestartable())

// reconnect logic in 0.1.4
go func(){
for running{
err:= view014.Run(context.Background())
// handle error

// sleep or simple backoff logic
time.Sleep(time.Second)
}
}()

// After migration:
// create a view
view09x, _ := NewView(..., WithViewAutoReconnect())
ctx, cancel := context.WithCancel(context.Background())

// no need for reconnect logic, it's handled by the view internally
go func(){
err:= view09x.Run(ctx)
// handle shutdown error
}()

// stop view
cancel()

```
56 changes: 54 additions & 2 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,28 @@ type emitter func(topic string, key string, value []byte) *Promise
// Context provides access to the processor's table and emit capabilities to
// arbitrary topics in kafka.
// Upon arrival of a message from subscribed topics, the respective
// ConsumeCallback is invoked with a context object along
// with the input message.
// ConsumeCallback is invoked with a context object along with the input message.
// The context is only valid within the callback, do not store it or pass it to other goroutines.
//
// Error handling
//
// Most methods of the context can fail due to different reasons, which are handled in different ways:
// Synchronous errors like
// * wrong codec for topic (a message cannot be marshalled or unmarshalled)
// * Emit to a topic without the Output definition in the group graph
// * Value/SetValue without defining Persist in the group graph
// * Join/Lookup without the definition in the group graph etc..
// will result in a panic to stop the callback immediately and shutdown the processor.
// This is necessary to preserve integrity of the processor and avoid further actions.
// Do not recover from that panic, otherwise the goroutine will deadlock.
//
// Retrying synchronous errors must be implemented by restarting the processor.
// If errors must be tolerated (which is not advisable because they're usually persistent), provide
// fail-tolerant versions of the producer, storage or codec as needed.
//
// Asynchronous errors can occur when the callback has been finished, but e.g. sending a batched
// message to kafka fails due to connection errors or leader election in the cluster.
// Those errors still shutdown the processor but will not result in a panic in the callback.
type Context interface {
// Topic returns the topic of input message.
Topic() Stream
Expand All @@ -32,38 +52,70 @@ type Context interface {
Offset() int64

// Value returns the value of the key in the group table.
//
// This method might panic to initiate an immediate shutdown of the processor
// to maintain data integrity. Do not recover from that panic or
// the processor might deadlock.
Value() interface{}

// Headers returns the headers of the input message
Headers() map[string][]byte

// SetValue updates the value of the key in the group table.
// It stores the value in the local cache and sends the
// update to the Kafka topic representing the group table.
//
// This method might panic to initiate an immediate shutdown of the processor
// to maintain data integrity. Do not recover from that panic or
// the processor might deadlock.
SetValue(value interface{})

// Delete deletes a value from the group table. IMPORTANT: this deletes the
// value associated with the key from both the local cache and the persisted
// table in Kafka.
//
// This method might panic to initiate an immediate shutdown of the processor
// to maintain data integrity. Do not recover from that panic or
// the processor might deadlock.
Delete()

// Timestamp returns the timestamp of the input message. If the timestamp is
// invalid, a zero time will be returned.
Timestamp() time.Time

// Join returns the value of key in the copartitioned table.
//
// This method might panic to initiate an immediate shutdown of the processor
// to maintain data integrity. Do not recover from that panic or
// the processor might deadlock.
Join(topic Table) interface{}

// Lookup returns the value of key in the view of table.
//
// This method might panic to initiate an immediate shutdown of the processor
// to maintain data integrity. Do not recover from that panic or
// the processor might deadlock.
Lookup(topic Table, key string) interface{}

// Emit asynchronously writes a message into a topic.
//
// This method might panic to initiate an immediate shutdown of the processor
// to maintain data integrity. Do not recover from that panic or
// the processor might deadlock.
Emit(topic Stream, key string, value interface{})

// Loopback asynchronously sends a message to another key of the group
// table. Value passed to loopback is encoded via the codec given in the
// Loop subscription.
//
// This method might panic to initiate an immediate shutdown of the processor
// to maintain data integrity. Do not recover from that panic or
// the processor might deadlock.
Loopback(key string, value interface{})

// Fail stops execution and shuts down the processor
// The callback is stopped immediately by panicking. Do not recover from that panic or
// the processor might deadlock.
Fail(err error)

// Context returns the underlying context used to start the processor or a
Expand Down
8 changes: 7 additions & 1 deletion graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,10 +346,16 @@ type groupTable struct {
}

// Persist represents the edge of the group table, which is log-compacted and
// copartitioned with the input streams. This edge specifies the codec of the
// copartitioned with the input streams.
// Without Persist, calls to ctx.Value or ctx.SetValue in the consume callback will
// fail and lead to shutdown of the processor.
//
// This edge specifies the codec of the
// messages in the topic, ie, the codec of the values of the table.
// The processing of input streams is blocked until all partitions of the group
// table are recovered.
//
// The topic name is derived from the group name by appending "-table".
func Persist(c Codec) Edge {
return &groupTable{&topicDef{codec: c}}
}
Expand Down
18 changes: 13 additions & 5 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"hash"
"hash/fnv"
"log"
"path/filepath"
"time"

Expand Down Expand Up @@ -301,7 +302,7 @@ type voptions struct {
tableCodec Codec
updateCallback UpdateCallback
hasher func() hash.Hash32
restartable bool
autoreconnect bool
backoffResetTime time.Duration

builders struct {
Expand Down Expand Up @@ -370,12 +371,19 @@ func WithViewClientID(clientID string) ViewOption {
}
}

// WithViewRestartable defines the view can be restarted, even when Run()
// returns errors. If the view is restartable, the client must call Terminate()
// to release all resources, ie, close the local storage.
// WithViewRestartable is kept only for backwards compatibility.
// DEPRECATED: since the behavior has changed, this name is misleading and should be replaced by
// WithViewAutoReconnect().
func WithViewRestartable() ViewOption {
log.Printf("Warning: this option is deprecated and will be removed. Replace with WithViewAutoReconnect, which is semantically equivalent")
return WithViewAutoReconnect()
}

// WithViewAutoReconnect defines the view is reconnecting internally, so Run() does not return
// in case of connection errors. The view must be shutdown by cancelling the context passed to Run()
func WithViewAutoReconnect() ViewOption {
return func(o *voptions, table Table, codec Codec) {
o.restartable = true
o.autoreconnect = true
}
}

Expand Down
10 changes: 8 additions & 2 deletions view.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,13 @@ func (v *View) createPartitions(brokers []string) (rerr error) {
return nil
}

// Run starts consuming the view's topic.
// Run starts consuming the view's topic and saving updates in the local persistent cache.
//
// The view will shutdown in case of errors or when the context is closed.
// It can be initialized with autoreconnect
// view := NewView(..., WithViewAutoReconnect())
// which makes the view internally reconnect in case of errors.
// Then it will only stop by canceling the context (see example).
func (v *View) Run(ctx context.Context) (rerr error) {
v.log.Debugf("starting")
defer v.log.Debugf("stopped")
Expand Down Expand Up @@ -178,7 +184,7 @@ func (v *View) Run(ctx context.Context) (rerr error) {
for _, partition := range v.partitions {
partition := partition
catchupErrg.Go(func() error {
return partition.CatchupForever(catchupCtx, v.opts.restartable)
return partition.CatchupForever(catchupCtx, v.opts.autoreconnect)
})
}

Expand Down
Loading