Skip to content

Commit

Permalink
Select rows to trigger in batches of 500
Browse files Browse the repository at this point in the history
  • Loading branch information
ibuildthecloud committed Nov 14, 2019
1 parent 7fc7fb0 commit c7dd89e
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 17 deletions.
5 changes: 4 additions & 1 deletion pkg/drivers/generic/generic.go
Expand Up @@ -294,8 +294,11 @@ func (d *Generic) CurrentRevision(ctx context.Context) (int64, error) {
return id, err
}

func (d *Generic) After(ctx context.Context, prefix string, rev int64) (*sql.Rows, error) {
func (d *Generic) After(ctx context.Context, prefix string, rev, limit int64) (*sql.Rows, error) {
sql := d.AfterSQL
if limit > 0 {
sql = fmt.Sprintf("%s LIMIT %d", sql, limit)
}
return d.query(ctx, sql, prefix, rev)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/logstructured/logstructured.go
Expand Up @@ -13,7 +13,7 @@ type Log interface {
Start(ctx context.Context) error
CurrentRevision(ctx context.Context) (int64, error)
List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeletes bool) (int64, []*server.Event, error)
After(ctx context.Context, prefix string, revision int64) (int64, []*server.Event, error)
After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error)
Watch(ctx context.Context, prefix string) <-chan []*server.Event
Count(ctx context.Context, prefix string) (int64, int64, error)
Append(ctx context.Context, event *server.Event) (int64, error)
Expand Down Expand Up @@ -295,7 +295,7 @@ func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64

result := make(chan []*server.Event, 100)

rev, kvs, err := l.log.After(ctx, prefix, revision)
rev, kvs, err := l.log.After(ctx, prefix, revision, 0)
if err != nil {
logrus.Errorf("failed to list %s for revision %d", prefix, revision)
cancel()
Expand Down
34 changes: 20 additions & 14 deletions pkg/logstructured/sqllog/sql.go
Expand Up @@ -31,7 +31,7 @@ type Dialect interface {
List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeleted bool) (*sql.Rows, error)
Count(ctx context.Context, prefix string) (int64, int64, error)
CurrentRevision(ctx context.Context) (int64, error)
After(ctx context.Context, prefix string, rev int64) (*sql.Rows, error)
After(ctx context.Context, prefix string, rev, limit int64) (*sql.Rows, error)
Insert(ctx context.Context, key string, create, delete bool, createRevision, previousRevision int64, ttl int64, value, prevValue []byte) (int64, error)
GetRevision(ctx context.Context, revision int64) (*sql.Rows, error)
DeleteRevision(ctx context.Context, revision int64) error
Expand Down Expand Up @@ -152,12 +152,12 @@ func (s *SQLLog) CurrentRevision(ctx context.Context) (int64, error) {
return s.d.CurrentRevision(ctx)
}

func (s *SQLLog) After(ctx context.Context, prefix string, revision int64) (int64, []*server.Event, error) {
func (s *SQLLog) After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error) {
if strings.HasSuffix(prefix, "/") {
prefix += "%"
}

rows, err := s.d.After(ctx, prefix, revision)
rows, err := s.d.After(ctx, prefix, revision, limit)
if err != nil {
return 0, nil, err
}
Expand Down Expand Up @@ -280,27 +280,31 @@ func (s *SQLLog) startWatch() (chan interface{}, error) {

func (s *SQLLog) poll(result chan interface{}, pollStart int64) {
var (
last = pollStart
skip int64
skipTime time.Time
last = pollStart
skip int64
skipTime time.Time
waitForMore = true
)

wait := time.NewTicker(time.Second)
defer wait.Stop()
defer close(result)

for {
select {
case <-s.ctx.Done():
return
case check := <-s.notify:
if check <= last {
continue
if waitForMore {
select {
case <-s.ctx.Done():
return
case check := <-s.notify:
if check <= last {
continue
}
case <-wait.C:
}
case <-wait.C:
}
waitForMore = true

rows, err := s.d.After(s.ctx, "%", last)
rows, err := s.d.After(s.ctx, "%", last, 500)
if err != nil {
logrus.Errorf("fail to list latest changes: %v", err)
continue
Expand All @@ -316,6 +320,8 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) {
continue
}

waitForMore = len(events) < 100

rev := last
var (
sequential []*server.Event
Expand Down

0 comments on commit c7dd89e

Please sign in to comment.