Skip to content

Commit

Permalink
persist firehose cursor. closes #66
Browse files Browse the repository at this point in the history
Currently the cursor is saved to the database on every event; I'm not sure how expensive this is, but we'll see :')
  • Loading branch information
itstolf committed Aug 1, 2023
1 parent 2cb37f2 commit 95d9863
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 10 deletions.
103 changes: 93 additions & 10 deletions ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"strconv"

"github.com/bluesky-social/indigo/util"

Expand Down Expand Up @@ -50,9 +51,10 @@ type FirehoseIngester struct {
store *store.PGXStore

// configuration
subscribeURL string
workerCount int
workItemTimeout time.Duration
subscribeURL string
workerCount int
workItemTimeout time.Duration
cursorFlushTimeout time.Duration
}

func NewFirehoseIngester(
Expand All @@ -63,9 +65,10 @@ func NewFirehoseIngester(
crc: crc,
store: store,

subscribeURL: pdsHost + "/xrpc/com.atproto.sync.subscribeRepos",
workerCount: 8,
workItemTimeout: time.Second * 30,
subscribeURL: pdsHost + "/xrpc/com.atproto.sync.subscribeRepos",
workerCount: 8,
workItemTimeout: time.Second * 30,
cursorFlushTimeout: time.Second * 60,
}
}

Expand All @@ -76,17 +79,56 @@ func (fi *FirehoseIngester) Start(ctx context.Context) error {
// are not ready. In future, we may want to consider some reasonable
// buffering to account for short spikes in event rates.
evtChan := make(chan *atproto.SyncSubscribeRepos_Commit)

numIngestionWorkers := fi.workerCount - 2
workerFirehoseCommitCursors := make([]struct {
mu sync.Mutex
cursor int64
}, numIngestionWorkers)
// flushCommitCursor persists the highest sequence number recorded by any
// ingestion worker via fi.store.SetFirehoseCommitCursor.
//
// It returns nil without touching the store when no worker has recorded a
// sequence number yet, and otherwise returns whatever error the store
// reports.
flushCommitCursor := func(ctx context.Context) error {
	cursor := int64(0)
	func() {
		// Hold every worker's lock at the same time so that the snapshot
		// below is taken behind a write barrier across all workers.
		for i := range workerFirehoseCommitCursors {
			wfcc := &workerFirehoseCommitCursors[i]
			wfcc.mu.Lock()
			// The deferred unlock runs when this anonymous function
			// returns, not at the end of the loop iteration, so all locks
			// stay held until the maximum has been computed.
			defer wfcc.mu.Unlock()
		}

		for i := range workerFirehoseCommitCursors {
			if c := workerFirehoseCommitCursors[i].cursor; c > cursor {
				cursor = c
			}
		}
	}()

	// Every per-worker cursor starts at zero. If none has advanced there is
	// nothing new to persist, and writing 0 would overwrite the cursor that
	// is already stored, causing the next start to resume from 0.
	if cursor == 0 {
		return nil
	}

	return fi.store.SetFirehoseCommitCursor(ctx, cursor)
}
defer func() {
if err := flushCommitCursor(ctx); err != nil {
fi.log.Error(
"failed to flush final firehose commit cursor",
zap.Error(err),
)
}
}()

eg.Go(func() error {
workerWg := sync.WaitGroup{}
for n := 1; n < fi.workerCount; n++ {
n := n

for i := 0; i < numIngestionWorkers; i++ {
i := i
wfcc := &workerFirehoseCommitCursors[i]

workerWg.Add(1)
go func() {
defer workerWg.Done()
for {
select {
case <-ctx.Done():
fi.log.Warn("worker exiting", zap.Int("worker", n))
fi.log.Warn("worker exiting", zap.Int("worker", i))
return
case evt := <-evtChan:
// record start time so we can collect
Expand All @@ -102,6 +144,20 @@ func (fi *FirehoseIngester) Start(ctx context.Context) error {
zap.Error(err),
)
}

(func() {
wfcc.mu.Lock()
defer wfcc.mu.Unlock()
if evt.Seq <= wfcc.cursor {
fi.log.Error(
"cursor went backwards or was repeated",
zap.Int64("seq", evt.Seq),
zap.Int64("cursor", wfcc.cursor),
)
}
wfcc.cursor = evt.Seq
})()

workItemsProcessed.
WithLabelValues("repo_commit").
Observe(time.Since(start).Seconds())
Expand All @@ -118,8 +174,35 @@ func (fi *FirehoseIngester) Start(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

for {
select {
case <-ctx.Done():
fi.log.Warn("cursor flushing worker exiting")
return nil
case <-time.After(fi.cursorFlushTimeout):
}

if err := flushCommitCursor(ctx); err != nil {
fi.log.Error(
"failed to flush firehose commit cursor",
zap.Error(err),
)
}
}
})

eg.Go(func() error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

cursor, err := fi.store.GetFirehoseCommitCursor(ctx)
if err != nil {
return fmt.Errorf("get commit cursor: %w", err)
}
fi.log.Info("starting ingestion", zap.Int64("cursor", cursor))

con, _, err := websocket.DefaultDialer.DialContext(
ctx, fi.subscribeURL, http.Header{},
ctx, fi.subscribeURL+"?cursor="+strconv.FormatInt(cursor, 10), http.Header{},
)
if err != nil {
return fmt.Errorf("dialing websocket: %w", err)
Expand Down
30 changes: 30 additions & 0 deletions store/gen/firehose_cursor.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions store/gen/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Removes the firehose_commit_cursor table, discarding the stored cursor value.
-- NOTE(review): presumably the down half of the migration that creates this
-- table; confirm against the migration file names.
DROP TABLE firehose_commit_cursor;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Stores the persisted firehose commit cursor as a single BIGINT value.
CREATE TABLE firehose_commit_cursor (
cursor BIGINT NOT NULL
);
-- Unique index over the constant expression 0: every row produces the same
-- indexed value, so inserting a second row violates uniqueness and the table
-- can never hold more than one row.
CREATE UNIQUE INDEX firehose_commit_cursor_single_row_idx ON firehose_commit_cursor((0));
-- Seed the only row with cursor 0 so that a plain UPDATE always has a row to
-- modify and a SELECT always has a row to return.
INSERT INTO firehose_commit_cursor (cursor) VALUES (0);
10 changes: 10 additions & 0 deletions store/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,3 +683,13 @@ func (s *PGXStore) CreateAuditEvent(ctx context.Context, opts CreateAuditEventOp

return out, nil
}

// GetFirehoseCommitCursor runs the generated GetFirehoseCommitCursor query
// against s.pool and returns its value and error unchanged.
func (s *PGXStore) GetFirehoseCommitCursor(ctx context.Context) (out int64, err error) {
	return s.queries.GetFirehoseCommitCursor(ctx, s.pool)
}

// SetFirehoseCommitCursor runs the generated SetFirehoseCommitCursor query
// against s.pool with the given cursor and returns its error unchanged.
func (s *PGXStore) SetFirehoseCommitCursor(ctx context.Context, cursor int64) (err error) {
	return s.queries.SetFirehoseCommitCursor(ctx, s.pool, cursor)
}
5 changes: 5 additions & 0 deletions store/queries/firehose_cursor.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- name: SetFirehoseCommitCursor :exec
-- Overwrites the cursor column with $1. There is no WHERE clause, so every
-- row in firehose_commit_cursor is updated.
UPDATE firehose_commit_cursor SET cursor = $1;

-- name: GetFirehoseCommitCursor :one
-- Reads the cursor column from firehose_commit_cursor.
SELECT cursor FROM firehose_commit_cursor;

0 comments on commit 95d9863

Please sign in to comment.