Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs #1

Merged
merged 1 commit into from
May 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type Config[T1, T2 any] struct {
Handler Handler[T1, T2]
}

// New instantiates a new Processor. `Processor.Run` must be called after calling `New`
// before events will be processed.
func New[T1, T2 any](c Config[T1, T2], options ...func(*Processor[T1, T2])) (*Processor[T1, T2], error) {
if c.Source == nil || c.Destination == nil {
return nil, errors.New("both Source and Destination required")
Expand Down Expand Up @@ -62,11 +64,13 @@ func (p *Processor[T1, T2]) handle(ctx context.Context) error {
}
}

// Run is synchronous, and runs until either the ctx is canceled, or an
// unrecoverable error is encountered. If the context is canceled manually,
// this will not return an error on clean shutdown. Otherwise it will return
// ctx.Err() (e.g. in the case of a timeout). If an unrecoverable error is
// returned from a stage, then it's wrapped and returned.
// Run is a blocking call, and runs until either the ctx is canceled, or an
// unrecoverable error is encountered. If any error is returned from a source,
// destination or the handler func, then it's wrapped and returned. If the
// passed-in context is canceled, Run does not return the context.Canceled
// error; the absence of an error indicates that a clean shutdown was
// successful. Run will return ctx.Err() in other cases where context
// termination leads to shutdown of the processor (e.g. in the case of a
// timeout).
func (p *Processor[T1, T2]) Run(ctx context.Context) error {
var wg sync.WaitGroup
wg.Add(p.parallelism)
Expand Down
100 changes: 84 additions & 16 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,56 @@ import (
"encoding/json"
)

type Attributes interface {
Unwrap() Attributes
}

// Message is the data wrapper which accepts any serializable type as its
// embedded Value as well as some other metadata.
type Message[T any] struct {
Key string
Value T
Topic string
// Key represents the key of this message. This field is intended to be used
// primarily as an input into sharding functions to determine how a message
// should be routed within a topic.
Key string
// Value is the embedded value of this message. It is the object of interest
// to the users of this library. It can be any serializable type so long as
// the sources and destinations know how to serialize it.
Value T
// Topic indicates which topic this message came from (if applicable). It
// should not be used as a means to set the output topic for destinations.
Topic string
// Attributes are inspired by context.Context and are used as a means to pass
// metadata from a source implementation through to a consumer. See examples
// for details.
Attributes Attributes
}

// NOTE: maybe have sources/destinations which are already message
// oriented have their own interface: Recv(ctx) []byte, func(), error
// combined with a generic deserializer source.
// This way you don't have to necessarily program in generics to write a
// source.
type Attributes interface {
Unwrap() Attributes
}

// Source defines the abstraction through which flow consumes or receives
// messages from an external entity. Most notable implementations are queues
// (Kafka, RabbitMQ, Redis), but anything which is message oriented could be
// made into a source (e.g. a newline-delimited-JSON file could conceivably be
// a source).
type Source[T any] interface {
// Recv should block until a Message is available to be returned from the
// source. Implementations _must_ listen on <-ctx.Done() and return
// ctx.Err() if the context finishes while waiting for new messages.
//
// All errors which are retryable must be handled inside the Recv func, or
// otherwise handled internally. Any error returned from Recv indicates a
// fatal error to the processor, and the processor will terminate. If you
// want to be able to delegate the responsibility of deciding retryable
// errors to the user of the Source, then allow the user to register a
// callback, e.g. `IsRetryable(err error) bool`, on source instantiation.
//
// The second return value is the acknowledgement function. Ack is called when
// the message returned from Recv has been successfully written to its
// destination. It should not be called twice. Sources may panic in that
// scenario as it indicates a logical flaw for delivery guarantees within the
// program.
//
// In the case of sending to multiple destinations, or teeing the data stream
// inside a processor's handler function, the programmer must decide
// themselves how to properly acknowledge the event, and recognize that
// destinations will probably be acknowledging the message as well.
Recv(context.Context) (Message[T], func(), error)
}

Expand All @@ -31,7 +64,36 @@ func (sf SourceFunc[T]) Recv(ctx context.Context) (Message[T], func(), error) {
return sf(ctx)
}

// Destination defines the abstraction for writing messages to an external
// entity. Most notable implementations are queues (Kafka, RabbitMQ, Redis),
// but anything which is message oriented could be made into a Destination
// (e.g. a newline-delimited-JSON file could conceivably be a Destination).
type Destination[T any] interface {
// Send sends the passed-in messages to the Destination. Implementations
// _must_ listen on <-ctx.Done() and return ctx.Err() if the context finishes
// while waiting to send messages.
//
// *Send need not be blocking*. In the case of a non-blocking call to Send,
// it's expected that ack will be called _only after_ the message has been
// successfully written to the Destination.
//
// All errors which are retryable must be handled inside the Send func, or
// otherwise handled internally. Any error returned from Send indicates a
// fatal error to the processor, and the processor will terminate. If you
// want to be able to delegate the responsibility of deciding retryable
// errors to the user of the Destination, then allow the user to register a
// callback, e.g. `IsRetryable(err error) bool`, when instantiating a
// Destination.
//
// The second argument is the acknowledgement function. Ack is called
// when the message has been successfully written to the Destination. It
// should not be called twice. Sources may panic if ack is called twice as
// it indicates a logical flaw for delivery guarantees within the program.
//
// In the case of sending to multiple destinations, or teeing the data stream
// inside a processor's handler function, the programmer must decide
// themselves how to properly acknowledge the event, and recognize that
// destinations will probably be acknowledging the message as well.
Send(context.Context, func(), ...Message[T]) error
}

Expand All @@ -41,8 +103,14 @@ func (df DestinationFunc[T]) Send(ctx context.Context, ack func(), msgs ...Messa
return df(ctx, ack, msgs...)
}

// Handler defines a function which operates on a single event of type T1 and
// returns a list of events of type T2. T1 and T2 may be the same type.
// Returning an empty slice and a nil error indicates that the message passed
// in was processed successfully, that no output was necessary, and that the
// message should therefore be acknowledged by the processor as having been
// processed successfully.
type Handler[T1, T2 any] func(context.Context, Message[T1]) ([]Message[T2], error)

// Pipe is the pass-through handler: it returns the incoming message,
// unmodified, as the only element of the result slice and never reports an
// error.
func Pipe[T any](ctx context.Context, msg Message[T]) ([]Message[T], error) {
	passthrough := make([]Message[T], 1)
	passthrough[0] = msg
	return passthrough, nil
}
Expand All @@ -66,13 +134,13 @@ func TransformUnmarshalJSON[T any](bs []byte) (T, error) {

type DeserializationSource[T any] struct {
src ByteSource
xform func([]byte) (T, error)
deser func([]byte) (T, error)
}

func NewDeserSource[T any](src ByteSource, xform DeserFunc[T]) DeserializationSource[T] {
func NewDeserSource[T any](src ByteSource, deser DeserFunc[T]) DeserializationSource[T] {
return DeserializationSource[T]{
src: src,
xform: xform,
deser: deser,
}
}

Expand All @@ -81,7 +149,7 @@ func (ds DeserializationSource[T]) Recv(ctx context.Context) (Message[T], func()
if err != nil {
return Message[T]{}, ack, err
}
val, err := ds.xform(msg.Value)
val, err := ds.deser(msg.Value)

ret := Message[T]{
Key: msg.Key,
Expand Down
39 changes: 24 additions & 15 deletions x/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,16 @@ func (ff FlushFunc[T]) Flush(c context.Context, msgs []flow.Message[T]) error {
return ff(c, msgs)
}

type msgAck[T any] struct {
msg flow.Message[T]
ack func()
type Destination[T any] struct {
flusher Flusher[T]
flushq chan struct{}
flushlen int
flushfreq time.Duration
flusherr chan error
flushwg *sync.WaitGroup

messages chan msgAck[T]
buf []msgAck[T]
}

type OptFunc func(*Opts)
Expand All @@ -43,18 +50,10 @@ func FlushLength(size int) func(*Opts) {
}
}

type Destination[T any] struct {
flusher Flusher[T]
flushq chan struct{}
flushlen int
flushfreq time.Duration
flusherr chan error
flushwg *sync.WaitGroup

messages chan msgAck[T]
buf []msgAck[T]
}

// NewDestination instantiates a new batcher. `Destination.Run` must be called
// after calling `NewDestination` before events will be processed in this
// destination. Not calling `Run` will likely end in a deadlock, as the internal
// channel being written to by `Send` will never be read.
func NewDestination[T any](f Flusher[T], opts ...OptFunc) *Destination[T] {
cfg := Opts{
FlushLength: 100,
Expand All @@ -79,6 +78,16 @@ func NewDestination[T any](f Flusher[T], opts ...OptFunc) *Destination[T] {

}

// msgAck pairs a single message with the ack callback that accompanies it.
type msgAck[T any] struct {
// msg is the message itself.
msg flow.Message[T]
// ack is the callback associated with msg; presumably invoked once msg has
// been flushed successfully — confirm against the flush path.
ack func()
}

// Send satisfies the flow.Destination interface and accepts messages to be
// buffered for flushing after the FlushLength limit is reached or the
// FlushFrequency timer fires, whichever comes first.
//
// Messages will not be acknowledged until they have been flushed successfully.
func (d *Destination[T]) Send(ctx context.Context, ack func(), msgs ...flow.Message[T]) error {

callMe := ackFn(ack, len(msgs))
Expand Down