Skip to content

Commit

Permalink
update: use UNLINK and chunking keys to delete task keys
Browse files Browse the repository at this point in the history
  • Loading branch information
shioshiota committed Sep 7, 2021
1 parent 21b33e1 commit 2b99cf7
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 23 deletions.
9 changes: 5 additions & 4 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,10 +297,11 @@ func mustInitializeQueueBackend() {
queueBackend, err = backendfactory.NewBackend(logger, backendconfig.Config{
BackendType: cmdOpts.Backend,
Redis: &backendconfig.RedisConfig{
KeyPrefix: cmdOpts.Redis.KeyPrefix,
Client: cmdOpts.Redis.NewClient(),
Backoff: cmdOpts.Redis.Backoff,
ChunkSizeInGet: cmdOpts.Redis.ChunkSizeInGet,
KeyPrefix: cmdOpts.Redis.KeyPrefix,
Client: cmdOpts.Redis.NewClient(),
Backoff: cmdOpts.Redis.Backoff,
ChunkSizeInGet: cmdOpts.Redis.ChunkSizeInGet,
ChunkSizeInDelete: cmdOpts.Redis.ChunkSizeInDelete,
},
})

Expand Down
12 changes: 7 additions & 5 deletions pkg/backend/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ type Config struct {
}

type RedisConfig struct {
KeyPrefix string
Client *redis.Client
Backoff BackoffConfig
ChunkSizeInGet int
KeyPrefix string
Client *redis.Client
Backoff BackoffConfig
ChunkSizeInGet int
ChunkSizeInDelete int
}

// TODO: support UniversalOptions
Expand All @@ -52,7 +53,8 @@ type RedisClientConfig struct {
IdleTimeout time.Duration `json:"idleTimeout" yaml:"idleTimeout" default:"5m"`
IdleCheckFrequency time.Duration `json:"idleCheckFrequency" yaml:"idleCheckFrequency" default:"1m"`

ChunkSizeInGet int `json:"chunkSizeInGet" yaml:"chunkSizeInGet" default:"10000"`
ChunkSizeInGet int `json:"chunkSizeInGet" yaml:"chunkSizeInGet" default:"10000"`
ChunkSizeInDelete int `json:"chunkSizeInDelete" yaml:"chunkSizeInGet" default:"1000"`
}

func (c RedisClientConfig) NewClient() *redis.Client {
Expand Down
12 changes: 10 additions & 2 deletions pkg/backend/redis/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (b *Backend) DeleteQueue(ctx context.Context, queueName string) error {
// .. task_keys = collect task keys
// WATCh task_keys
// MULTI
// DEL {queue_key} worker_keys task_keys
// UNLINK {queue_key} worker_keys task_keys
// HDEL {all_queues_key} {queueName}
// EXEC
txf := func(tx *redis.Tx) error {
Expand All @@ -240,8 +240,16 @@ func (b *Backend) DeleteQueue(ctx context.Context, queueName string) error {
tx.Watch(taskKeysToDelete...)
keysToDelete = append(keysToDelete, taskKeysToDelete...)

chunkSize := b.ChunkSizeInGet
numOfKeysToDelete := len(keysToDelete)
_, err = tx.TxPipelined(func(pipe redis.Pipeliner) error {
pipe.Del(keysToDelete...)
for begin := 0; begin < numOfKeysToDelete; begin += chunkSize {
end := begin + chunkSize
if end > numOfKeysToDelete {
end = numOfKeysToDelete
}
pipe.Unlink(keysToDelete[begin:end]...)
}
pipe.HDel(b.allQueuesKey(), queue.Spec.Name)
return nil
})
Expand Down
9 changes: 5 additions & 4 deletions pkg/backend/redis/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,11 @@ var _ = Describe("Backend", func() {
ibackend, err := NewBackend(logger, backendconfig.Config{
BackendType: "redis",
Redis: &backendconfig.RedisConfig{
KeyPrefix: "test",
Client: client,
Backoff: backoffConfig,
ChunkSizeInGet: 1000,
KeyPrefix: "test",
Client: client,
Backoff: backoffConfig,
ChunkSizeInGet: 1000,
ChunkSizeInDelete: 1000,
},
})
Expect(err).NotTo(HaveOccurred())
Expand Down
10 changes: 5 additions & 5 deletions pkg/backend/redis/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ import (
)

const (
KB = 1 << 10
PayloadMaxSizeInKB = 1
MessageMaxSizeInKB = 1
HistoryLengthMax = 10
MaxNameLength = 1024
KB = 1 << 10
PayloadMaxSizeInKB = 1
MessageMaxSizeInKB = 1
HistoryLengthMax = 10
MaxNameLength = 1024
)

func (b *Backend) ensureQueueAndWorkerExists(queueUID, workerUID uuid.UUID) (*taskqueue.TaskQueue, *worker.Worker, error) {
Expand Down
7 changes: 4 additions & 3 deletions pkg/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ var _ = Describe("Worker", func() {
bcknd, err = backendfactory.NewBackend(logger, backendconfig.Config{
BackendType: "redis",
Redis: &backendconfig.RedisConfig{
Client: client,
Backoff: backendConfig,
ChunkSizeInGet: 1000,
Client: client,
Backoff: backendConfig,
ChunkSizeInGet: 1000,
ChunkSizeInDelete: 1000,
},
})
Expect(err).NotTo(HaveOccurred())
Expand Down

0 comments on commit 2b99cf7

Please sign in to comment.