Skip to content

Commit

Permalink
feat: reducer for stream aggregation without fault tolerance (#208)
Browse files Browse the repository at this point in the history
Signed-off-by: ashwinidulams <ashttk@gmail.com>
Signed-off-by: yhl01 <Yashash_HL@intuit.com>
Signed-off-by: Vigith Maurice <vigith@gmail.com>
Signed-off-by: Yashash H L <yhl01@Yashashs-MacBook-Pro.local>
Signed-off-by: Yashash H L <yashash_hl@intuit.com>
Co-authored-by: yhl01 <Yashash_HL@intuit.com>
Co-authored-by: Vigith Maurice <vigith@gmail.com>
Co-authored-by: Yashash H L <yhl01@Yashashs-MacBook-Pro.local>
  • Loading branch information
4 people authored and whynowy committed Oct 27, 2022
1 parent 06a9b58 commit d0d74e1
Show file tree
Hide file tree
Showing 46 changed files with 2,324 additions and 162 deletions.
9 changes: 9 additions & 0 deletions docs/APIs.md
Original file line number Diff line number Diff line change
Expand Up @@ -3217,6 +3217,9 @@ StoreType (<code>string</code> alias)
</p>
</h3>
<p>
<p>
PBQ store’s backend type.
</p>
</p>
<h3 id="numaflow.numaproj.io/v1alpha1.TLS">
TLS
Expand Down Expand Up @@ -3768,6 +3771,12 @@ means no delay.
</tr>
</tbody>
</table>
<h3 id="numaflow.numaproj.io/v1alpha1.WindowType">
WindowType (<code>string</code> alias)
</p>
</h3>
<p>
</p>
<hr/>
<p>
<em> Generated with <code>gen-crd-api-reference-docs</code>. </em>
Expand Down
34 changes: 31 additions & 3 deletions pkg/apis/numaflow/v1alpha1/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,23 @@ const (
DefaultStoreSize = 1000000 // Default persistent store size
DefaultStoreMaxBufferSize = 100000 // Default buffer size for pbq in bytes

// Default window options
DefaultWindowType = FixedType
DefaultWindowDuration = 0

// Default reduce forward options
DefaultReadBatchSize = 100
)

// StoreType is the PBQ store's backend type. It is a string alias, so each
// value below is also the literal text used to name that backend.
type StoreType string

// Known StoreType values.
const (
// InMemoryType names the "in-memory" backend.
InMemoryType StoreType = "in-memory"
// FileSystemType names the "file-system" backend.
FileSystemType StoreType = "file-system"
// NoOpType names the "no-op" backend.
// NOTE(review): presumably a backend that persists nothing — confirm against the store implementation.
NoOpType StoreType = "no-op"
)

func (st StoreType) String() string {
switch st {
case InMemoryType:
Expand All @@ -138,12 +151,27 @@ func (st StoreType) String() string {
}
}

// WindowType names the type of a window. It is a string alias; the values
// declared for it in this file are "fixed", "sliding" and "session".
type WindowType string

const (
InMemoryType StoreType = "in-memory"
FileSystemType StoreType = "file-system"
NoOpType StoreType = "no-op"
FixedType WindowType = "fixed"
SlidingType WindowType = "sliding"
SessionType WindowType = "session"
)

// String returns the textual form of the window type. The three declared
// window types (fixed, sliding, session) are returned as their underlying
// string; any other value yields "unknownWindowType".
func (wt WindowType) String() string {
	switch wt {
	case FixedType, SlidingType, SessionType:
		// A known value is its own textual representation.
		return string(wt)
	}
	return "unknownWindowType"
}

var (
MessageKeyDrop = fmt.Sprintf("%U__DROP__", '\\') // U+005C__DROP__
MessageKeyAll = fmt.Sprintf("%U__ALL__", '\\') // U+005C__ALL__
Expand Down
2 changes: 1 addition & 1 deletion pkg/isb/forward/forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/isb/testutils"
udfapplier "github.com/numaproj/numaflow/pkg/udf/applier"
udfapplier "github.com/numaproj/numaflow/pkg/udf/function"
"github.com/stretchr/testify/assert"
)

Expand Down
43 changes: 43 additions & 0 deletions pkg/messages/interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package messages

import (
"encoding"
"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/watermark/processor"
"time"
)

// Message is the representation of a message flowing through numaflow.
// It embeds ReadMessage (offset, watermark and ack) and can be converted to and
// from a binary form through encoding.BinaryMarshaler / encoding.BinaryUnmarshaler.
type Message interface {
ReadMessage
encoding.BinaryMarshaler
encoding.BinaryUnmarshaler
// SetID sets the ID for a message.
SetID(id string)
// ID returns the unique identifier of a message.
ID() string
// SetEventTime sets the timestamp of the message. What the timestamp means depends on the
// time characteristics of the pipeline; please note that EventTime does not mean only the event time.
// The timestamp is intended to be set once and should be treated as immutable afterwards.
// NOTE(review): this method has no error return, so an attempt to overwrite an already-set
// time cannot be reported through the signature — confirm how implementations handle it.
SetEventTime(t time.Time)
// EventTime returns the timestamp of the message.
// NOTE(review): only a time.Time is returned, so how "time has not been set yet" is
// signalled to the caller is left to the implementation — confirm.
EventTime() time.Time
// Key returns the key of the message.
Key() string
// Payload returns the payload of the message as received from the source it was read from.
Payload() []byte
}

// ReadMessage exposes functions to set and get the in-vertex stream characteristics of a
// NumaMessage: its offset, its watermark and its acknowledgement.
type ReadMessage interface {
// Offset returns the offset of the message in the source it was read from.
Offset() isb.Offset
// SetWatermark sets the watermark on a message read from a source.
SetWatermark(watermark processor.Watermark)
// Watermark returns the watermark for a message.
Watermark() processor.Watermark
// Ack acknowledges a message. It returns nothing, so the outcome of the
// acknowledgement is not reported to the caller through this method.
Ack()
}
4 changes: 2 additions & 2 deletions pkg/pbq/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type ReadWriteCloser interface {
// Reader provides methods to read from PBQ.
type Reader interface {
// ReadCh exposes channel to read from PBQ
ReadCh() <-chan *isb.Message
ReadCh() <-chan *isb.ReadMessage
// GC does garbage collection, it deletes all the persisted data from the store
GC() error
}
Expand All @@ -24,7 +24,7 @@ type Reader interface {
// No data can be written to PBQ after cob.
type WriteCloser interface {
// Write writes message to PBQ
Write(ctx context.Context, msg *isb.Message) error
Write(ctx context.Context, msg *isb.ReadMessage) error
// CloseOfBook (cob) closes PBQ, no writes will be accepted after cob
CloseOfBook()
// Close to handle context close on writer
Expand Down
17 changes: 17 additions & 0 deletions pkg/pbq/partition/partition.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package partition

import (
"fmt"
"time"
)

// ID uniquely identifies a partition by a start time, an end time and a key.
type ID struct {
	// Start is the start time of the partition.
	Start time.Time
	// End is the end time of the partition.
	End time.Time
	// Key is the key of the partition.
	Key string
}

// String renders the ID as "<start>-<end>-<key>", where start and end are
// Unix timestamps in whole seconds (sub-second precision is not included).
func (p ID) String() string {
	startSec, endSec := p.Start.Unix(), p.End.Unix()
	return fmt.Sprintf("%d-%d-%s", startSec, endSec, p.Key)
}
26 changes: 14 additions & 12 deletions pkg/pbq/pbq.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,20 @@ import (
"errors"

"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/pbq/partition"
"github.com/numaproj/numaflow/pkg/pbq/store"
"go.uber.org/zap"
)

var COBErr error = errors.New("error while writing to pbq, pbq is closed")
var COBErr = errors.New("error while writing to pbq, pbq is closed")

// PBQ Buffer queue which is backed with a persisted store, each partition
// will have a PBQ associated with it
type PBQ struct {
store store.Store
output chan *isb.Message
output chan *isb.ReadMessage
cob bool // cob to avoid panic in case writes happen after close of book
partitionID string
PartitionID partition.ID
options *options
manager *Manager
log *zap.SugaredLogger
Expand All @@ -26,10 +27,10 @@ type PBQ struct {
var _ ReadWriteCloser = (*PBQ)(nil)

// Write writes message to pbq and persistent store
func (p *PBQ) Write(ctx context.Context, message *isb.Message) error {
func (p *PBQ) Write(ctx context.Context, message *isb.ReadMessage) error {
// if cob we should return
if p.cob {
p.log.Errorw("failed to write message to pbq, pbq is closed", zap.Any("partitionID", p.partitionID), zap.Any("header", message.Header))
p.log.Errorw("failed to write message to pbq, pbq is closed", zap.Any("ID", p.PartitionID), zap.Any("header", message.Header))
return COBErr
}
var writeErr error
Expand Down Expand Up @@ -62,27 +63,28 @@ func (p *PBQ) Close() error {

// ReadCh exposes read channel to read messages from PBQ
// close on read channel indicates COB
func (p *PBQ) ReadCh() <-chan *isb.Message {
func (p *PBQ) ReadCh() <-chan *isb.ReadMessage {
return p.output
}

// GC is invoked after the Reader (ProcessAndForward) has finished
// forwarding the output to ISB.
// GC cleans up the PBQ and also the store associated with it. GC is invoked after the Reader (ProcessAndForward) has
// finished forwarding the output to ISB.
func (p *PBQ) GC() error {
err := p.store.GC()
p.store = nil
p.manager.deregister(p.partitionID)
p.manager.deregister(p.PartitionID)
return err
}

// replayRecordsFromStore replays store messages when replay flag is set during start up time
// replayRecordsFromStore replays store messages when replay flag is set during start up time. It replays by reading from
// the store and writing to the PBQ channel.
func (p *PBQ) replayRecordsFromStore(ctx context.Context) {
size := p.options.readBatchSize
readLoop:
for {
readMessages, eof, err := p.store.Read(int64(size))
readMessages, eof, err := p.store.Read(size)
if err != nil {
p.log.Errorw("error while replaying records from store", zap.Any("partitionID", p.partitionID), zap.Error(err))
p.log.Errorw("error while replaying records from store", zap.Any("ID", p.PartitionID), zap.Error(err))
}
for _, msg := range readMessages {
// select to avoid infinite blocking while writing to output channel
Expand Down
33 changes: 25 additions & 8 deletions pkg/pbq/pbq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/isb/testutils"
"github.com/numaproj/numaflow/pkg/pbq/partition"
"github.com/numaproj/numaflow/pkg/pbq/store"
"github.com/stretchr/testify/assert"
)
Expand All @@ -29,12 +30,18 @@ func TestPBQ_ReadWrite(t *testing.T) {
// write 10 isb messages to persisted store
msgCount := 10
startTime := time.Now()
writeMessages := testutils.BuildTestWriteMessages(int64(msgCount), startTime)
writeMessages := testutils.BuildTestReadMessages(int64(msgCount), startTime)

pq, err := qManager.CreateNewPBQ(ctx, "partition-13")
partitionID := partition.ID{
Start: time.Unix(60, 0),
End: time.Unix(120, 0),
Key: "new-partition",
}

pq, err := qManager.CreateNewPBQ(ctx, partitionID)
assert.NoError(t, err)

var readMessages []*isb.Message
var readMessages []*isb.ReadMessage
// run a parallel go routine which reads from pbq
var wg sync.WaitGroup
wg.Add(1)
Expand Down Expand Up @@ -87,13 +94,18 @@ func Test_PBQReadWithCanceledContext(t *testing.T) {
//write 10 isb messages to persisted store
msgCount := 10
startTime := time.Now()
writeMessages := testutils.BuildTestWriteMessages(int64(msgCount), startTime)
writeMessages := testutils.BuildTestReadMessages(int64(msgCount), startTime)

partitionID := partition.ID{
Start: time.Unix(60, 0),
End: time.Unix(120, 0),
Key: "new-partition",
}
var pq ReadWriteCloser
pq, err = qManager.CreateNewPBQ(ctx, "partition-14")
pq, err = qManager.CreateNewPBQ(ctx, partitionID)
assert.NoError(t, err)

var readMessages []*isb.Message
var readMessages []*isb.ReadMessage
// run a parallel go routine which reads from pbq
var wg sync.WaitGroup
wg.Add(1)
Expand Down Expand Up @@ -148,10 +160,15 @@ func TestPBQ_WriteWithStoreFull(t *testing.T) {
// write 101 isb messages to pbq, but the store size is 100, we should get store is full error
msgCount := 101
startTime := time.Now()
writeMessages := testutils.BuildTestWriteMessages(int64(msgCount), startTime)
writeMessages := testutils.BuildTestReadMessages(int64(msgCount), startTime)
partitionID := partition.ID{
Start: time.Unix(60, 0),
End: time.Unix(120, 0),
Key: "new-partition",
}

var pq ReadWriteCloser
pq, err = qManager.CreateNewPBQ(ctx, "partition-10")
pq, err = qManager.CreateNewPBQ(ctx, partitionID)
assert.NoError(t, err)

for _, msg := range writeMessages {
Expand Down
Loading

0 comments on commit d0d74e1

Please sign in to comment.