Skip to content

Commit

Permalink
feat: windowing operations (#157)
Browse files Browse the repository at this point in the history
Signed-off-by: Vigith Maurice <vigith@gmail.com>
Signed-off-by: ashwinidulams <ashttk@gmail.com>
Co-authored-by: ashwinidulams <ashttk@gmail.com>
Co-authored-by: ashwinidulams <61725106+ashwinidulams@users.noreply.github.com>
  • Loading branch information
3 people committed Sep 15, 2022
1 parent 7d41129 commit 021bb9d
Show file tree
Hide file tree
Showing 8 changed files with 903 additions and 0 deletions.
23 changes: 23 additions & 0 deletions pkg/window/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Package window implements windowing constructs. In the world of data processing on an unbounded stream, Windowing
// is a concept of grouping data using temporal boundaries. We use event-time to discover temporal boundaries on an
// unbounded, infinite stream and Watermark to ensure the datasets within the boundaries are complete. A reduce function
// can be applied on this group of data.
//
// Windows are of different types, quite popular ones are Fixed windows and Sliding windows. Sessions are managed via
// little less popular windowing strategy called Session windows. Windowing is implemented as a two stage process,
// - Assign windows - assign the event to a window
// - Merge windows - group all the events that below to the same window
//
// The two stage approach is required because assignment of windows could happen as elements are streaming in, but merging
// could happen before the data materialization happens. This is important esp. when we handle session windows where a
// new event can change the end time of the window.
//
// For simplicity, we will be truncating the windows' boundaries to the nearest time unit (say, 1 minute windows will
// be truncated to 0th second). Truncating window time to the nearest boundary will help us do mapping with constant time
// without affecting the correctness, except for the very first materialization of result (e.g., we started at 9:00.11
// and the result will be materialized at 9:01.00 and not at 9:01:11).
//
// Windows may be either aligned (e.g., Fixed, Sliding), i.e. applied across all the data for the window of time in
// question, or unaligned, (e.g., Session) i.e. applied across only specific subsets of the data (e.g. per key) for the
// given window of time.
package window
54 changes: 54 additions & 0 deletions pkg/window/keyed/keyed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package keyed

import (
"fmt"
"sync"

"github.com/numaproj/numaflow/pkg/window"
)

// PartitionId uniquely identifies a partition
type PartitionId string

// KeyedWindow maintains association between keys and a window.
// In a keyed stream, we need to close all the partitions when the watermark is past the window.
type KeyedWindow struct {
*window.IntervalWindow
Keys []string
lock sync.RWMutex
}

// NewKeyedWindow creates a new keyed window
func NewKeyedWindow(window *window.IntervalWindow) *KeyedWindow {
kw := &KeyedWindow{
IntervalWindow: window,
Keys: make([]string, 0),
lock: sync.RWMutex{},
}
return kw
}

// AddKey adds a key to an existing window
func (kw *KeyedWindow) AddKey(key string) {
kw.lock.Lock()
defer kw.lock.Unlock()
kw.Keys = append(kw.Keys, key)
}

// Partitions returns an array of partitions for a window
func (kw *KeyedWindow) Partitions() []PartitionId {
kw.lock.RLock()
defer kw.lock.RUnlock()

partitions := make([]PartitionId, len(kw.Keys))
for i, key := range kw.Keys {
partitions[i] = Partition(kw.IntervalWindow, key)
}

return partitions
}

// Partition returns the partitionId for a given window and a key
func Partition(window *window.IntervalWindow, key string) PartitionId {
return PartitionId(fmt.Sprintf("%v-%v-%s", window.Start.Unix(), window.End.Unix(), key))
}
87 changes: 87 additions & 0 deletions pkg/window/keyed/keyed_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package keyed

import (
"testing"
"time"

"github.com/numaproj/numaflow/pkg/window"
"github.com/stretchr/testify/assert"
)

func TestKeyedWindow_AddKey(t *testing.T) {
iw := &window.IntervalWindow{
Start: time.Unix(60, 0),
End: time.Unix(120, 0),
}
kw := NewKeyedWindow(iw)
tests := []struct {
name string
given *KeyedWindow
input string
expectedKeys []string
}{
{
name: "no_keys",
given: &KeyedWindow{},
input: "key1",
expectedKeys: []string{"key1"},
},
{
name: "with_some_existing_keys",
given: &KeyedWindow{
Keys: []string{"key2", "key3"},
},
input: "key4",
expectedKeys: []string{"key2", "key3", "key4"},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
kw = NewKeyedWindow(iw)
kw.Keys = append(kw.Keys, tt.given.Keys...)
kw.AddKey(tt.input)
assert.ElementsMatch(t, kw.Keys, tt.expectedKeys)
})
}
}

func TestKeyedWindow_Partitions(t *testing.T) {
iw := &window.IntervalWindow{
Start: time.Unix(60, 0),
End: time.Unix(120, 0),
}
kw := NewKeyedWindow(iw)
tests := []struct {
name string
given *KeyedWindow
input string
expected []string
}{
{
name: "no_keys",
given: &KeyedWindow{},
expected: []string{},
},
{
name: "with_some_existing_keys",
given: &KeyedWindow{
Keys: []string{"key2", "key3"},
},
expected: []string{
"60-120-key2",
"60-120-key3",
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
kw.Keys = tt.given.Keys
ret := kw.Partitions()
for idx, s := range tt.expected {
assert.EqualValues(t, ret[idx], s)
}
})
}
}
40 changes: 40 additions & 0 deletions pkg/window/strategy/fixed/fixed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Package fixed implements Fixed windows. Fixed windows (sometimes called tumbling windows) are
// defined by a static window size, e.g. minutely windows or hourly windows. They are generally aligned, i.e. every
// window applies across all the data for the corresponding period of time.
// Package fixed also maintains the state of active keyed windows in a vertex.
// Keyed Window maintains the association between set of keys and an interval window.
// keyed also provides the lifecycle management of an interval window. Watermark is used to trigger the expiration of windows.
package fixed

import (
"time"

"github.com/numaproj/numaflow/pkg/window"
)

// Fixed implements Fixed window.
type Fixed struct {
// Length is the temporal length of the window.
Length time.Duration
}

var _ window.Windower = (*Fixed)(nil)

// NewFixed returns a Fixed window.
func NewFixed(length time.Duration) *Fixed {
return &Fixed{
Length: length,
}
}

// AssignWindow assigns a window for the given eventTime.
func (f *Fixed) AssignWindow(eventTime time.Time) []*window.IntervalWindow {
start := eventTime.Truncate(f.Length)

return []*window.IntervalWindow{
{
Start: start,
End: start.Add(f.Length),
},
}
}
74 changes: 74 additions & 0 deletions pkg/window/strategy/fixed/fixed_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package fixed

import (
"testing"
"time"

"github.com/numaproj/numaflow/pkg/window"
)

func TestFixed_AssignWindow(t *testing.T) {

loc, _ := time.LoadLocation("UTC")
baseTime := time.Unix(1651129201, 0).In(loc)

tests := []struct {
name string
length time.Duration
eventTime time.Time
want []*window.IntervalWindow
}{
{
name: "minute",
length: time.Minute,
eventTime: baseTime,
want: []*window.IntervalWindow{
{
Start: time.Unix(1651129200, 0).In(loc),
End: time.Unix(1651129260, 0).In(loc),
},
},
},
{
name: "hour",
length: time.Hour,
eventTime: baseTime,
want: []*window.IntervalWindow{
{
Start: time.Unix(1651129200, 0).In(loc),
End: time.Unix(1651129200+3600, 0).In(loc),
},
},
},
{
name: "5_minute",
length: time.Minute * 5,
eventTime: baseTime,
want: []*window.IntervalWindow{
{
Start: time.Unix(1651129200, 0).In(loc),
End: time.Unix(1651129200+300, 0).In(loc),
},
},
},
{
name: "30_second",
length: time.Second * 30,
eventTime: baseTime,
want: []*window.IntervalWindow{
{
Start: time.Unix(1651129200, 0).In(loc),
End: time.Unix(1651129230, 0).In(loc),
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
f := NewFixed(tt.length)
if got := f.AssignWindow(tt.eventTime); !(got[0].Start.Equal(tt.want[0].Start) && got[0].End.Equal(tt.want[0].End)) {
t.Errorf("AssignWindow() = %v, want %v", got, tt.want)
}
})
}
}
Loading

0 comments on commit 021bb9d

Please sign in to comment.