Skip to content

Commit

Permalink
zero: resource bundle reconciler (#4445)
Browse files Browse the repository at this point in the history
  • Loading branch information
wasaga committed Aug 17, 2023
1 parent 788376b commit 3b65049
Show file tree
Hide file tree
Showing 18 changed files with 1,560 additions and 0 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ require (
golang.org/x/net v0.12.0
golang.org/x/oauth2 v0.10.0
golang.org/x/sync v0.3.0
golang.org/x/time v0.3.0
google.golang.org/api v0.134.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20230720185612-659f7aaaa771
google.golang.org/grpc v1.57.0
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1110,6 +1110,7 @@ golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand Down
35 changes: 35 additions & 0 deletions internal/zero/reconciler/bundles_format.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package reconciler

import (
"bufio"
"errors"
"fmt"
"io"

"google.golang.org/protobuf/encoding/protodelim"

"github.com/pomerium/pomerium/pkg/grpc/databroker"
)

var unmarshalOpts = protodelim.UnmarshalOptions{}

// ReadBundleRecords reads records in a protobuf wire format from src.
// Each record is expected to be a databroker.Record.
func ReadBundleRecords(src io.Reader) (RecordSetBundle[DatabrokerRecord], error) {
r := bufio.NewReader(src)
rsb := make(RecordSetBundle[DatabrokerRecord])
for {
record := new(databroker.Record)
err := unmarshalOpts.UnmarshalFrom(r, record)
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return nil, fmt.Errorf("error reading protobuf record: %w", err)
}

rsb.Add(DatabrokerRecord{record})
}

return rsb, nil
}
64 changes: 64 additions & 0 deletions internal/zero/reconciler/bundles_format_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package reconciler

import (
"fmt"
"io"
"os"
"testing"

"github.com/stretchr/testify/require"
"google.golang.org/protobuf/encoding/protodelim"
"google.golang.org/protobuf/proto"

"github.com/pomerium/pomerium/pkg/grpc/config"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/protoutil"
)

func TestReadRecords(t *testing.T) {

dir := t.TempDir()
fd, err := os.CreateTemp(dir, "config")
require.NoError(t, err)
t.Cleanup(func() { _ = fd.Close() })

err = writeSampleRecords(fd)
require.NoError(t, err)

_, err = fd.Seek(0, io.SeekStart)
require.NoError(t, err)

records, err := ReadBundleRecords(fd)
require.NoError(t, err)
require.Len(t, records, 1)
}

func writeSampleRecords(dst io.Writer) error {
var marshalOpts = protodelim.MarshalOptions{
MarshalOptions: proto.MarshalOptions{
AllowPartial: false,
Deterministic: true,
UseCachedSize: false,
},
}

cfg := protoutil.NewAny(&config.Config{
Routes: []*config.Route{
{
From: "https://from.example.com",
To: []string{"https://to.example.com"},
},
},
})
rec := &databroker.Record{
Id: "config",
Type: cfg.GetTypeUrl(),
Data: cfg,
}
_, err := marshalOpts.MarshalTo(dst, rec)
if err != nil {
return fmt.Errorf("marshal: %w", err)
}

return nil
}
124 changes: 124 additions & 0 deletions internal/zero/reconciler/bundles_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package reconciler

import (
"container/heap"
"sync"
)

type bundle struct {
id string
synced bool
priority int
}

type bundleHeap []bundle

func (h bundleHeap) Len() int { return len(h) }
func (h bundleHeap) Less(i, j int) bool {
// If one is synced and the other is not, the unsynced one comes first
if h[i].synced != h[j].synced {
return !h[i].synced
}
// Otherwise, the one with the lower priority comes first
return h[i].priority < h[j].priority
}

func (h bundleHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

func (h *bundleHeap) Push(x interface{}) {
item := x.(bundle)
*h = append(*h, item)
}

func (h *bundleHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}

// BundleQueue is a priority queue of bundles to sync.
type BundleQueue struct {
sync.Mutex
bundles bundleHeap
counter int // to assign priorities based on order of insertion
}

// Set sets the bundles to be synced. This will reset the sync status of all bundles.
func (b *BundleQueue) Set(bundles []string) {
b.Lock()
defer b.Unlock()

b.bundles = make(bundleHeap, len(bundles))
b.counter = len(bundles)
for i, id := range bundles {
b.bundles[i] = bundle{
id: id,
synced: false,
priority: i,
}
}
heap.Init(&b.bundles)
}

// MarkForSync marks the bundle with the given ID for syncing.
func (b *BundleQueue) MarkForSync(id string) {
b.Lock()
defer b.Unlock()

for i, bundle := range b.bundles {
if bundle.id == id {
b.bundles[i].synced = false
heap.Fix(&b.bundles, i)
return
}
}

newBundle := bundle{id: id, synced: false, priority: b.counter}
heap.Push(&b.bundles, newBundle)
b.counter++
}

// MarkForSyncLater marks the bundle with the given ID for syncing later (after all other bundles).
func (b *BundleQueue) MarkForSyncLater(id string) {
b.Lock()
defer b.Unlock()

for i, bundle := range b.bundles {
if bundle.id != id {
continue
}

// Increase the counter first to ensure that this bundle has the highest (last) priority.
b.counter++
b.bundles[i].synced = false
b.bundles[i].priority = b.counter
heap.Fix(&b.bundles, i)
return
}
}

// GetNextBundleToSync returns the ID of the next bundle to sync and whether there is one.
func (b *BundleQueue) GetNextBundleToSync() (string, bool) {
b.Lock()
defer b.Unlock()

if len(b.bundles) == 0 {
return "", false
}

// Check the top bundle without popping
if b.bundles[0].synced {
return "", false
}

// Mark the top bundle as synced and push it to the end
id := b.bundles[0].id
b.bundles[0].synced = true
b.bundles[0].priority = b.counter
heap.Fix(&b.bundles, 0)
b.counter++

return id, true
}
95 changes: 95 additions & 0 deletions internal/zero/reconciler/bundles_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package reconciler_test

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/pomerium/pomerium/internal/zero/reconciler"
)

func TestQueueSet(t *testing.T) {
t.Parallel()

b := &reconciler.BundleQueue{}
b.Set([]string{"bundle1", "bundle2"})

id1, ok1 := b.GetNextBundleToSync()
id2, ok2 := b.GetNextBundleToSync()

assert.True(t, ok1, "Expected bundle1 to be set")
assert.Equal(t, "bundle1", id1)
assert.True(t, ok2, "Expected bundle2 to be set")
assert.Equal(t, "bundle2", id2)

id3, ok3 := b.GetNextBundleToSync()
assert.False(t, ok3, "Expected no more bundles to sync")
assert.Empty(t, id3)
}

func TestQueueMarkForSync(t *testing.T) {
t.Parallel()

b := &reconciler.BundleQueue{}
b.Set([]string{"bundle1", "bundle2"})

b.MarkForSync("bundle2")
id1, ok1 := b.GetNextBundleToSync()

assert.True(t, ok1, "Expected bundle1 to be marked for sync")
assert.Equal(t, "bundle1", id1)

b.MarkForSync("bundle3")
id2, ok2 := b.GetNextBundleToSync()
id3, ok3 := b.GetNextBundleToSync()

assert.True(t, ok2, "Expected bundle2 to be marked for sync")
assert.Equal(t, "bundle2", id2)
assert.True(t, ok3, "Expected bundle3 to be marked for sync")
assert.Equal(t, "bundle3", id3)
}

func TestQueueMarkForSyncLater(t *testing.T) {
t.Parallel()

b := &reconciler.BundleQueue{}
b.Set([]string{"bundle1", "bundle2", "bundle3"})

id1, ok1 := b.GetNextBundleToSync()
b.MarkForSyncLater("bundle1")
id2, ok2 := b.GetNextBundleToSync()
id3, ok3 := b.GetNextBundleToSync()
id4, ok4 := b.GetNextBundleToSync()
id5, ok5 := b.GetNextBundleToSync()

assert.True(t, ok1, "Expected bundle1 to be marked for sync")
assert.Equal(t, "bundle1", id1)
assert.True(t, ok2, "Expected bundle2 to be marked for sync")
assert.Equal(t, "bundle2", id2)
assert.True(t, ok3, "Expected bundle3 to be marked for sync")
assert.Equal(t, "bundle3", id3)
assert.True(t, ok4, "Expected bundle1 to be marked for sync")
assert.Equal(t, "bundle1", id4)
assert.False(t, ok5, "Expected no more bundles to sync")
assert.Empty(t, id5)

}

func TestQueueGetNextBundleToSync(t *testing.T) {
t.Parallel()

b := &reconciler.BundleQueue{}
b.Set([]string{"bundle1", "bundle2"})

id1, ok1 := b.GetNextBundleToSync()
id2, ok2 := b.GetNextBundleToSync()
id3, ok3 := b.GetNextBundleToSync()

assert.True(t, ok1, "Expected bundle1 to be retrieved for sync")
assert.Equal(t, "bundle1", id1)
assert.True(t, ok2, "Expected bundle2 to be retrieved for sync")
assert.Equal(t, "bundle2", id2)
require.False(t, ok3, "Expected no more bundles to sync")
assert.Empty(t, id3)
}

0 comments on commit 3b65049

Please sign in to comment.