Skip to content

Commit

Permalink
Persist firehose cursor. closes #66
Browse files Browse the repository at this point in the history
currently the cursor is saved every event to the database: i'm not sure how expensive this is, but we'll see :')
  • Loading branch information
itstolf committed Aug 9, 2023
1 parent 721ddd4 commit 7bdc399
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 10 deletions.
122 changes: 112 additions & 10 deletions ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"strconv"

"github.com/bluesky-social/indigo/util"

Expand Down Expand Up @@ -48,16 +49,27 @@ type actorCacher interface {
CreatePendingCandidateActor(ctx context.Context, did string) (err error)
}

// workerCursors is a gauge vector labelled by worker index. Each ingester
// worker sets its own series to the sequence number of the event it most
// recently handled.
var workerCursors = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "bff_ingester_worker_cursors",
Help: "The current cursor a worker is at.",
}, []string{"worker"})

// flushedWorkerCursor is a gauge holding the cursor value most recently
// written to the store. It is only updated after the store write succeeds.
var flushedWorkerCursor = promauto.NewGauge(prometheus.GaugeOpts{
Name: "bff_ingester_flushed_worker_cursor",
Help: "The current cursor flushed to persistent storage.",
})

// FirehoseIngester holds the dependencies and configuration used by Start to
// dial the firehose subscription endpoint and process commit events with a
// set of worker goroutines.
type FirehoseIngester struct {
// dependencies
log *zap.Logger
actorCache actorCacher
store *store.PGXStore

// configuration
// NOTE(review): subscribeURL, workerCount and workItemTimeout appear twice
// below; this looks like old and new diff lines rendered together — confirm
// against the real file.
subscribeURL string
workerCount int
workItemTimeout time.Duration
subscribeURL string
workerCount int
workItemTimeout time.Duration
// cursorFlushTimeout is how long the cursor-flushing loop in Start waits
// between attempts to persist the commit cursor.
cursorFlushTimeout time.Duration
}

func NewFirehoseIngester(
Expand All @@ -68,9 +80,10 @@ func NewFirehoseIngester(
actorCache: crc,
store: store,

subscribeURL: pdsHost + "/xrpc/com.atproto.sync.subscribeRepos",
workerCount: 8,
workItemTimeout: time.Second * 30,
subscribeURL: pdsHost + "/xrpc/com.atproto.sync.subscribeRepos",
workerCount: 8,
workItemTimeout: time.Second * 30,
cursorFlushTimeout: time.Second * 60,
}
}

Expand All @@ -81,17 +94,64 @@ func (fi *FirehoseIngester) Start(ctx context.Context) error {
// are not ready. In future, we may want to consider some reasonable
// buffering to account for short spikes in event rates.
evtChan := make(chan *atproto.SyncSubscribeRepos_Commit)

workerFirehoseCommitCursors := make([]struct {
mu sync.Mutex
cursor int64
}, fi.workerCount)
flushCommitCursor := func(ctx context.Context) error {
cursors := make([]int64, len(workerFirehoseCommitCursors))
(func() {
// Acquire locks for all workers at once to force a write barrier across all workers.
for i := range workerFirehoseCommitCursors {
wfcc := &workerFirehoseCommitCursors[i]

wfcc.mu.Lock()
// defer here will defer the unlock to the end of the function and not the block, so this does what we intend it to do.
defer wfcc.mu.Unlock()

cursors[i] = wfcc.cursor
}
})()

// We want to set to the lowest cursor of all known cursors, since if we set it to the highest it may result in losing data from unprocessed events.
// If we set it to the lowest cursor, some events may get redelivered, but at least we won't lose any.
cursor := cursors[0]
for _, v := range cursors[1:] {
if v < cursor {
cursor = v
}
}

if err := fi.store.SetFirehoseCommitCursor(ctx, cursor); err != nil {
return err
}
flushedWorkerCursor.Set(float64(cursor))
return nil
}
defer func() {
if err := flushCommitCursor(ctx); err != nil {
fi.log.Error(
"failed to flush final firehose commit cursor",
zap.Error(err),
)
}
}()

eg.Go(func() error {
workerWg := sync.WaitGroup{}
for n := 1; n < fi.workerCount; n++ {
n := n

for i := 0; i < fi.workerCount; i++ {
i := i
wfcc := &workerFirehoseCommitCursors[i]

workerWg.Add(1)
go func() {
defer workerWg.Done()
for {
select {
case <-ctx.Done():
fi.log.Warn("worker exiting", zap.Int("worker", n))
fi.log.Warn("worker exiting", zap.Int("worker", i))
return
case evt := <-evtChan:
// record start time so we can collect
Expand All @@ -107,6 +167,21 @@ func (fi *FirehoseIngester) Start(ctx context.Context) error {
zap.Error(err),
)
}

(func() {
wfcc.mu.Lock()
defer wfcc.mu.Unlock()
if evt.Seq <= wfcc.cursor {
fi.log.Error(
"cursor went backwards or was repeated",
zap.Int64("seq", evt.Seq),
zap.Int64("cursor", wfcc.cursor),
)
}
workerCursors.WithLabelValues(strconv.Itoa(i)).Set(float64(evt.Seq))
wfcc.cursor = evt.Seq
})()

workItemsProcessed.
WithLabelValues("repo_commit").
Observe(time.Since(start).Seconds())
Expand All @@ -123,8 +198,35 @@ func (fi *FirehoseIngester) Start(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

for {
select {
case <-ctx.Done():
fi.log.Warn("cursor flushing worker exiting")
return nil
case <-time.After(fi.cursorFlushTimeout):
}

if err := flushCommitCursor(ctx); err != nil {
fi.log.Error(
"failed to flush firehose commit cursor",
zap.Error(err),
)
}
}
})

eg.Go(func() error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

cursor, err := fi.store.GetFirehoseCommitCursor(ctx)
if err != nil {
return fmt.Errorf("get commit cursor: %w", err)
}
fi.log.Info("starting ingestion", zap.Int64("cursor", cursor))

con, _, err := websocket.DefaultDialer.DialContext(
ctx, fi.subscribeURL, http.Header{},
ctx, fi.subscribeURL+"?cursor="+strconv.FormatInt(cursor, 10), http.Header{},
)
if err != nil {
return fmt.Errorf("dialing websocket: %w", err)
Expand Down
30 changes: 30 additions & 0 deletions store/gen/firehose_cursor.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions store/gen/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE firehose_commit_cursor;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Stores the firehose commit cursor as a single BIGINT value.
CREATE TABLE firehose_commit_cursor (
cursor BIGINT NOT NULL
);
-- Unique index over the constant expression (0): every row maps to the same
-- index key, so the table can hold at most one row.
CREATE UNIQUE INDEX firehose_commit_cursor_single_row_idx ON firehose_commit_cursor((0));
-- Seed the single row with an initial cursor of 0.
INSERT INTO firehose_commit_cursor (cursor) VALUES (0);
10 changes: 10 additions & 0 deletions store/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,3 +704,13 @@ func (s *PGXStore) GetPostByURI(ctx context.Context, uri string) (out gen.Candid
// TODO: Return a proto type rather than exposing gen.CandidatePost
return s.queries.GetPostByURI(ctx, s.pool, uri)
}

// GetFirehoseCommitCursor returns the persisted firehose commit cursor by
// running the generated GetFirehoseCommitCursor query against the store's pool.
func (s *PGXStore) GetFirehoseCommitCursor(ctx context.Context) (out int64, err error) {
	return s.queries.GetFirehoseCommitCursor(ctx, s.pool)
}

// SetFirehoseCommitCursor persists cursor as the firehose commit cursor by
// running the generated SetFirehoseCommitCursor query against the store's pool.
func (s *PGXStore) SetFirehoseCommitCursor(ctx context.Context, cursor int64) (err error) {
	return s.queries.SetFirehoseCommitCursor(ctx, s.pool, cursor)
}
5 changes: 5 additions & 0 deletions store/queries/firehose_cursor.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- name: SetFirehoseCommitCursor :exec
UPDATE firehose_commit_cursor SET cursor = $1;

-- name: GetFirehoseCommitCursor :one
SELECT cursor FROM firehose_commit_cursor;

0 comments on commit 7bdc399

Please sign in to comment.