Skip to content

Commit

Permalink
feat: piped redis broker and results (#44)
Browse files Browse the repository at this point in the history
* feat: piped redis broker and results

* feat: accomodate piped redis implementation in existing results/broker
  • Loading branch information
kalbhor committed Jul 15, 2024
1 parent 5ba7a6b commit 42e9432
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 30 deletions.
77 changes: 61 additions & 16 deletions brokers/redis/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,28 @@ type Options struct {
IdleTimeout time.Duration
MinIdleConns int
PollPeriod time.Duration

// OPTIONAL
// If non-zero, enqueue redis commands will be piped instead of being directly sent each time.
// The pipe will be executed every `PipePeriod` duration.
PipePeriod time.Duration
}

type Broker struct {
log *slog.Logger
conn redis.UniversalClient
pollPeriod time.Duration
lo *slog.Logger
opts Options

conn redis.UniversalClient
pipe redis.Pipeliner
}

func New(o Options, lo *slog.Logger) *Broker {
pollPeriod := o.PollPeriod
if o.PollPeriod == 0 {
pollPeriod = DefaultPollPeriod
o.PollPeriod = DefaultPollPeriod
}
return &Broker{
log: lo,
b := &Broker{
opts: o,
lo: lo,
conn: redis.NewUniversalClient(&redis.UniversalOptions{
Addrs: o.Addrs,
DB: o.DB,
Expand All @@ -51,7 +58,36 @@ func New(o Options, lo *slog.Logger) *Broker {
MinIdleConns: o.MinIdleConns,
IdleTimeout: o.IdleTimeout,
}),
pollPeriod: pollPeriod,
}

if o.PipePeriod != 0 {
b.pipe = b.conn.Pipeline()
go b.execPipe(context.TODO())
}

return b
}

func (r *Broker) execPipe(ctx context.Context) {
tk := time.NewTicker(r.opts.PipePeriod)
for {
select {
case <-ctx.Done():
r.lo.Debug("context closed, draining redis pipe", "length", r.pipe.Len())
if _, err := r.pipe.Exec(ctx); err != nil {
r.lo.Error("could not execute redis pipe", "error", err)
}
return
case <-tk.C:
plen := r.pipe.Len()
if plen == 0 {
continue
}
r.lo.Debug("submitting redis pipe", "length", plen)
if _, err := r.pipe.Exec(ctx); err != nil {
r.lo.Error("could not execute redis pipe", "error", err)
}
}
}
}

Expand All @@ -67,10 +103,19 @@ func (r *Broker) GetPending(ctx context.Context, queue string) ([]string, error)
}

func (b *Broker) Enqueue(ctx context.Context, msg []byte, queue string) error {
if b.opts.PipePeriod != 0 {
return b.pipe.LPush(ctx, queue, msg).Err()
}
return b.conn.LPush(ctx, queue, msg).Err()
}

func (b *Broker) EnqueueScheduled(ctx context.Context, msg []byte, queue string, ts time.Time) error {
if b.opts.PipePeriod != 0 {
return b.pipe.ZAdd(ctx, fmt.Sprintf(sortedSetKey, queue), &redis.Z{
Score: float64(ts.UnixNano()),
Member: msg,
}).Err()
}
return b.conn.ZAdd(ctx, fmt.Sprintf(sortedSetKey, queue), &redis.Z{
Score: float64(ts.UnixNano()),
Member: msg,
Expand All @@ -83,19 +128,19 @@ func (b *Broker) Consume(ctx context.Context, work chan []byte, queue string) {
for {
select {
case <-ctx.Done():
b.log.Debug("shutting down consumer..")
b.lo.Debug("shutting down consumer..")
return
default:
b.log.Debug("receiving from consumer..")
res, err := b.conn.BLPop(ctx, b.pollPeriod, queue).Result()
b.lo.Debug("receiving from consumer..")
res, err := b.conn.BLPop(ctx, b.opts.PollPeriod, queue).Result()
if err != nil && err.Error() != "redis: nil" {
b.log.Error("error consuming from redis queue", "error", err)
b.lo.Error("error consuming from redis queue", "error", err)
} else if errors.Is(err, redis.Nil) {
b.log.Debug("no tasks to consume..", "queue", queue)
b.lo.Debug("no tasks to consume..", "queue", queue)
} else {
msg, err := blpopResult(res)
if err != nil {
b.log.Error("error parsing response from redis", "error", err)
b.lo.Error("error parsing response from redis", "error", err)
return
}
work <- []byte(msg)
Expand All @@ -105,12 +150,12 @@ func (b *Broker) Consume(ctx context.Context, work chan []byte, queue string) {
}

func (b *Broker) consumeScheduled(ctx context.Context, queue string) {
poll := time.NewTicker(b.pollPeriod)
poll := time.NewTicker(b.opts.PollPeriod)

for {
select {
case <-ctx.Done():
b.log.Debug("shutting down scheduled consumer..")
b.lo.Debug("shutting down scheduled consumer..")
return
case <-poll.C:
b.conn.Watch(ctx, func(tx *redis.Tx) error {
Expand Down
81 changes: 67 additions & 14 deletions results/redis/results.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ const (
)

type Results struct {
opt Options
opts Options
lo *slog.Logger
conn redis.UniversalClient
pipe redis.Pipeliner
}

type Options struct {
Expand All @@ -34,6 +35,11 @@ type Options struct {
Expiry time.Duration
MetaExpiry time.Duration
MinIdleConns int

// OPTIONAL
// If non-zero, enqueue redis commands will be piped instead of being directly sent each time.
// The pipe will be executed every `PipePeriod` duration.
PipePeriod time.Duration
}

func DefaultRedis() Options {
Expand All @@ -46,7 +52,7 @@ func DefaultRedis() Options {

func New(o Options, lo *slog.Logger) *Results {
rs := &Results{
opt: o,
opts: o,
conn: redis.NewUniversalClient(
&redis.UniversalOptions{
Addrs: o.Addrs,
Expand All @@ -66,26 +72,50 @@ func New(o Options, lo *slog.Logger) *Results {
if o.MetaExpiry != 0 {
go rs.expireMeta(o.MetaExpiry)
}
if o.PipePeriod != 0 {
rs.pipe = rs.conn.Pipeline()
go rs.execPipe(context.TODO())
}

return rs
}

func (r *Results) execPipe(ctx context.Context) {
tk := time.NewTicker(r.opts.PipePeriod)
for {
select {
case <-ctx.Done():
r.lo.Debug("context closed, draining redis pipe", "length", r.pipe.Len())
if _, err := r.pipe.Exec(ctx); err != nil {
r.lo.Error("could not execute redis pipe", "error", err)
}
return
case <-tk.C:
plen := r.pipe.Len()
if plen == 0 {
continue
}
r.lo.Debug("submitting redis pipe", "length", plen)
if _, err := r.pipe.Exec(ctx); err != nil {
r.lo.Error("could not execute redis pipe", "error", err)
}
}
}
}

func (r *Results) DeleteJob(ctx context.Context, id string) error {
r.lo.Debug("deleting job")

pipe := r.conn.Pipeline()
if err := pipe.ZRem(ctx, resultPrefix+success, 1, id).Err(); err != nil {
return err
}

if err := pipe.ZRem(ctx, resultPrefix+failed, 1, id).Err(); err != nil {
return err
}

if err := pipe.Del(ctx, resultPrefix+id).Err(); err != nil {
return err
}

if _, err := pipe.Exec(ctx); err != nil {
return err
}
Expand Down Expand Up @@ -123,6 +153,12 @@ func (r *Results) GetFailed(ctx context.Context) ([]string, error) {

func (r *Results) SetSuccess(ctx context.Context, id string) error {
r.lo.Debug("setting job as successful", "id", id)
if r.opts.PipePeriod != 0 {
return r.pipe.ZAdd(ctx, resultPrefix+success, &redis.Z{
Score: float64(time.Now().UnixNano()),
Member: id,
}).Err()
}
return r.conn.ZAdd(ctx, resultPrefix+success, &redis.Z{
Score: float64(time.Now().UnixNano()),
Member: id,
Expand All @@ -131,6 +167,12 @@ func (r *Results) SetSuccess(ctx context.Context, id string) error {

func (r *Results) SetFailed(ctx context.Context, id string) error {
r.lo.Debug("setting job as failed", "id", id)
if r.opts.PipePeriod != 0 {
return r.pipe.ZAdd(ctx, resultPrefix+failed, &redis.Z{
Score: float64(time.Now().UnixNano()),
Member: id,
}).Err()
}
return r.conn.ZAdd(ctx, resultPrefix+failed, &redis.Z{
Score: float64(time.Now().UnixNano()),
Member: id,
Expand All @@ -139,12 +181,14 @@ func (r *Results) SetFailed(ctx context.Context, id string) error {

func (r *Results) Set(ctx context.Context, id string, b []byte) error {
r.lo.Debug("setting result for job", "id", id)
return r.conn.Set(ctx, resultPrefix+id, b, r.opt.Expiry).Err()
if r.opts.PipePeriod != 0 {
return r.pipe.Set(ctx, resultPrefix+id, b, r.opts.Expiry).Err()
}
return r.conn.Set(ctx, resultPrefix+id, b, r.opts.Expiry).Err()
}

func (r *Results) Get(ctx context.Context, id string) ([]byte, error) {
r.lo.Debug("getting result for job", "id", id)

rs, err := r.conn.Get(ctx, resultPrefix+id).Bytes()
if err != nil {
return nil, err
Expand All @@ -171,13 +215,22 @@ func (r *Results) expireMeta(ttl time.Duration) {
score := strconv.FormatInt(now, 10)

r.lo.Debug("purging failed results metadata", "score", score)
if err := r.conn.ZRemRangeByScore(context.Background(), resultPrefix+failed, "0", score).Err(); err != nil {
r.lo.Error("could not expire success/failed metadata", "err", err)
}

r.lo.Debug("purging success results metadata", "score", score)
if err := r.conn.ZRemRangeByScore(context.Background(), resultPrefix+success, "0", score).Err(); err != nil {
r.lo.Error("could not expire success/failed metadata", "err", err)
if r.opts.PipePeriod != 0 {
if err := r.pipe.ZRemRangeByScore(context.Background(), resultPrefix+failed, "0", score).Err(); err != nil {
r.lo.Error("could not expire success/failed metadata", "err", err)
}
r.lo.Debug("purging success results metadata", "score", score)
if err := r.pipe.ZRemRangeByScore(context.Background(), resultPrefix+success, "0", score).Err(); err != nil {
r.lo.Error("could not expire success/failed metadata", "err", err)
}
} else {
if err := r.conn.ZRemRangeByScore(context.Background(), resultPrefix+failed, "0", score).Err(); err != nil {
r.lo.Error("could not expire success/failed metadata", "err", err)
}
r.lo.Debug("purging success results metadata", "score", score)
if err := r.conn.ZRemRangeByScore(context.Background(), resultPrefix+success, "0", score).Err(); err != nil {
r.lo.Error("could not expire success/failed metadata", "err", err)
}
}
}
}
Expand Down

0 comments on commit 42e9432

Please sign in to comment.