Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement bulk key fetching and deleting for large queue without transaction #45

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,10 +297,12 @@ func mustInitializeQueueBackend() {
queueBackend, err = backendfactory.NewBackend(logger, backendconfig.Config{
BackendType: cmdOpts.Backend,
Redis: &backendconfig.RedisConfig{
KeyPrefix: cmdOpts.Redis.KeyPrefix,
Client: cmdOpts.Redis.NewClient(),
Backoff: cmdOpts.Redis.Backoff,
ChunkSizeInGet: cmdOpts.Redis.ChunkSizeInGet,
KeyPrefix: cmdOpts.Redis.KeyPrefix,
Client: cmdOpts.Redis.NewClient(),
Backoff: cmdOpts.Redis.Backoff,
ChunkSizeInGet: cmdOpts.Redis.ChunkSizeInGet,
ChunkSizeInDelete: cmdOpts.Redis.ChunkSizeInDelete,
WithoutTransaction: cmdOpts.Redis.WithoutTransaction,
},
})

Expand Down
14 changes: 9 additions & 5 deletions pkg/backend/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ type Config struct {
}

type RedisConfig struct {
KeyPrefix string
Client *redis.Client
Backoff BackoffConfig
ChunkSizeInGet int
KeyPrefix string
Client *redis.Client
Backoff BackoffConfig
ChunkSizeInGet int
ChunkSizeInDelete int
WithoutTransaction bool
}

// TODO: support UniversalOptions
Expand All @@ -52,7 +54,9 @@ type RedisClientConfig struct {
IdleTimeout time.Duration `json:"idleTimeout" yaml:"idleTimeout" default:"5m"`
IdleCheckFrequency time.Duration `json:"idleCheckFrequency" yaml:"idleCheckFrequency" default:"1m"`

ChunkSizeInGet int `json:"chunkSizeInGet" yaml:"chunkSizeInGet" default:"10000"`
ChunkSizeInGet int `json:"chunkSizeInGet" yaml:"chunkSizeInGet" default:"10000"`
ChunkSizeInDelete int `json:"chunkSizeInDelete" yaml:"chunkSizeInDelete" default:"1000"`
WithoutTransaction bool `json:"withoutTransaction" yaml:"withoutTransaction" default:"false"`
}

func (c RedisClientConfig) NewClient() *redis.Client {
Expand Down
1 change: 1 addition & 0 deletions pkg/backend/iface/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var (
TaskQueueNotFound = errors.New("Queue not found")
TaskQueueExisted = errors.New("Queue already exists")
TaskQueueEmptyError = errors.New("Queue is empty")
TaskQueueIsTooLarge = errors.New("Queue have many tasks so we need --without_transaction option")
TaskSuspendedError = errors.New("Queue is suspended")
WorkerNotFound = errors.New("Worker not found")
WorkerExitedError = errors.New("Worker already exists")
Expand Down
109 changes: 107 additions & 2 deletions pkg/backend/redis/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,14 +204,24 @@ func (b *Backend) UpdateQueue(ctx context.Context, queueSpec taskqueue.TaskQueue
return queue, nil
}

func (b *Backend) DeleteQueue(ctx context.Context, queueName string) error {
func (b *Backend) deleteQueueWithTransaction(ctx context.Context, queueName string) error {
if err := taskqueue.ValidateQueueName(queueName); err != nil {
return err
}
queue, err := b.ensureQueueExistsByName(b.Client, queueName)
if err != nil {
return err
}

numOfTasks, err := b.Client.SCard(b.tasksKey(queue.UID.String())).Result()
if err != nil {
return err
}

if int64(b.ChunkSizeInDelete) <= numOfTasks {
return iface.TaskQueueIsTooLarge
}

// WATCH {all_queues_key} {queue_key}
// .. worker_keys = collect worker keys
// WATCH worker_keys
Expand Down Expand Up @@ -252,13 +262,108 @@ func (b *Backend) DeleteQueue(ctx context.Context, queueName string) error {
b.Logger.With().
Str("queueName", queueName).
Str("queueUID", queue.UID.String()).
Str("operation", "DeleteQueue").
Str("operation", "DeleteQueueWithTransaction").
Logger(),
txf,
b.allQueuesKey(), b.queueKey(queue.UID.String()),
)
}

func (b *Backend) deleteQueueWithoutTransaction(ctx context.Context, queueName string) error {
if err := taskqueue.ValidateQueueName(queueName); err != nil {
return err
}
queue, err := b.ensureQueueExistsByName(b.Client, queueName)
if err != nil {
return err
}
//
// .. worker_keys = collect worker keys
// .. task_keys = collect task keys
// Use chunk to divide and loop until UNLINK all keys
// ---loop start---
// WATCH {all_queues_key} {queue_key}
// UNLINK chulk_keys
// ---loop end---
// WATCH {all_queues_key} {queue_key}
// MULTI
// DEL {queue_key} worker_keys task_keys
// HDEL {all_queues_key} {queueName}
// EXEC
keysToDelete := []string{}

workerKeysToDelete, err := b.allWorkersKeysForDeleteQueue(b.Client, queue.UID.String())
if err != nil {
return err
}
keysToDelete = append(keysToDelete, workerKeysToDelete...)

taskKeysToDelete, err := b.allTasksKeysForDeleteQueue(b.Client, queue.UID.String())
if err != nil {
return err
}
keysToDelete = append(keysToDelete, taskKeysToDelete...)

// chunk delete
for cursor := 0; cursor < len(keysToDelete); cursor += b.ChunkSizeInDelete {
time.Sleep(100 * time.Millisecond)
end := cursor + b.ChunkSizeInDelete
if len(keysToDelete) <= end {
end = len(keysToDelete)
}

deleteKeys := keysToDelete[cursor:end]
if len(deleteKeys) == 0 {
continue
}
err = b.runTxWithBackOff(
ctx,
b.Logger.With().
Str("queueName", queueName).
Str("queueUID", queue.UID.String()).
Str("operation", "DeleteQueueWithTransaction").
Logger(),
func(tx *redis.Tx) error {
_, err = tx.Unlink(deleteKeys...).Result()
return err
},
b.allQueuesKey(), b.queueKey(queue.UID.String()),
)
if err != nil {
return err
}
}

err = b.runTxWithBackOff(
ctx,
b.Logger.With().
Str("queueName", queueName).
Str("queueUID", queue.UID.String()).
Str("operation", "DeleteQueueWithTransaction").
Logger(),
func(tx *redis.Tx) error {
_, err = tx.Unlink(b.queueKey(queue.UID.String())).Result()
if err != nil {
return err
}

_, err = tx.HDel(b.allQueuesKey(), queue.Spec.Name).Result()
return err
},
b.allQueuesKey(), b.queueKey(queue.UID.String()),
)

return err
}

func (b *Backend) DeleteQueue(ctx context.Context, queueName string) error {
if b.WithoutTransaction {
return b.deleteQueueWithoutTransaction(ctx, queueName)
} else {
return b.deleteQueueWithTransaction(ctx, queueName)
}
}

func (b *Backend) ensureQueueExistsByName(rds redis.Cmdable, queueName string) (*taskqueue.TaskQueue, error) {
uid, err := b.lookupQueueUID(rds, queueName)
if err != nil {
Expand Down
81 changes: 77 additions & 4 deletions pkg/backend/redis/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,11 @@ var _ = Describe("Backend", func() {
ibackend, err := NewBackend(logger, backendconfig.Config{
BackendType: "redis",
Redis: &backendconfig.RedisConfig{
KeyPrefix: "test",
Client: client,
Backoff: backoffConfig,
ChunkSizeInGet: 1000,
KeyPrefix: "test",
Client: client,
Backoff: backoffConfig,
ChunkSizeInGet: 1000,
ChunkSizeInDelete: 1000,
},
})
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -277,6 +278,20 @@ var _ = Describe("Backend", func() {
Expect(err).To(Equal(iface.TaskQueueNotFound))
})
})
When("the large queue exists", func() {
It("can delete the queue", func() {
testutil.MustCreateQueue(backend, SampleQueueSpec)
// numOfTasks % chunkSize != 0 && numOfTasks > chunkSize
numOfTasks := 1000 // numOfTasks < ChunkSizeInDelete
for i := 0; i < numOfTasks; i++ {
_, err := backend.AddTask(context.Background(), QueueName, SampleTaskSpec)
Expect(err).NotTo(HaveOccurred())
}

err := backend.DeleteQueue(context.Background(), SampleQueueSpec.Name)
Expect(err).To(Equal(iface.TaskQueueIsTooLarge))
})
})
})
})

Expand Down Expand Up @@ -1032,3 +1047,61 @@ var _ = Describe("Backend", func() {
})
})
})

var _ = Describe("BackendWihoutTransaction", func() {
var backend *Backend
BeforeEach(func() {
var err error
backoffConfig := backendconfig.DefaultBackoffConfig()
backoffConfig.MaxRetry = 0
ibackend, err := NewBackend(logger, backendconfig.Config{
BackendType: "redis",
Redis: &backendconfig.RedisConfig{
KeyPrefix: "test",
Client: client,
Backoff: backoffConfig,
ChunkSizeInGet: 1000,
ChunkSizeInDelete: 1000,
WithoutTransaction: true,
},
})
Expect(err).NotTo(HaveOccurred())
backend, _ = ibackend.(*Backend)
})

AfterEach(func() {
keys, err := client.Keys("*").Result()
Expect(err).NotTo(HaveOccurred())
if len(keys) > 0 {
num, err := client.Del(keys...).Result()
Expect(err).NotTo(HaveOccurred())
Expect(num).To(Equal(int64(len(keys))))
}
})

Context("Queue Operation", func() {
Context("DeleteQueue", func() {
When("the large queue exists", func() {
It("can delete the queue", func() {
queue := testutil.MustCreateQueue(backend, SampleQueueSpec)
// numOfTasks % chunkSize != 0 && numOfTasks > chunkSize
// numOfTaksk <= chunSize
numOfTasks := 1000
for i := 0; i < numOfTasks; i++ {
_, err := backend.AddTask(context.Background(), QueueName, SampleTaskSpec)
Expect(err).NotTo(HaveOccurred())
}

Expect(backend.DeleteQueue(context.Background(), SampleQueueSpec.Name)).NotTo(HaveOccurred())

queuesHash, err := client.HGetAll(backend.allQueuesKey()).Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(queuesHash)).To(Equal(0))
keys, err := client.Keys(backend.queueKey(queue.UID.String()) + "*").Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(keys)).To(Equal(0))
})
})
})
})
})
31 changes: 23 additions & 8 deletions pkg/backend/redis/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,7 @@ func (b *Backend) getTasksByUIDs(queueUID string, taskUIDs []string, filter func
}

func (b *Backend) getTasks(queueUID string, filter func(*task.Task) bool, lggr zerolog.Logger) ([]*task.Task, error) {
taskUIDs, err := b.Client.SMembers(b.tasksKey(queueUID)).Result()
if err == redis.Nil {
return []*task.Task{}, nil
}
taskUIDs, err := b.allTaskUIDsByQueueUID(b.Client, queueUID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -938,10 +935,7 @@ func (b *Backend) allTasksKeysForDeleteQueue(rds redis.Cmdable, queueUID string)
b.deadletterQueueKey(queueUID),
b.pendingTaskQueueKey(queueUID),
}
taskUIDs, err := rds.SMembers(b.tasksKey(queueUID)).Result()
if err == redis.Nil {
return []string{}, nil
}
taskUIDs, err := b.allTaskUIDsByQueueUID(rds, queueUID)
if err != nil {
return []string{}, err
}
Expand All @@ -950,3 +944,24 @@ func (b *Backend) allTasksKeysForDeleteQueue(rds redis.Cmdable, queueUID string)
}
return keysToDelete, nil
}

func (b *Backend) allTaskUIDsByQueueUID(rds redis.Cmdable, queueUID string) ([]string, error) {
var chunkSize = int64(b.ChunkSizeInGet)
var cursor uint64
var taskUIDs []string
for {
keys, nextCursor, err := rds.SScan(b.tasksKey(queueUID), cursor, "", chunkSize).Result()
if err == redis.Nil {
return []string{}, nil
}
if err != nil {
return []string{}, err
}
taskUIDs = append(taskUIDs, keys...)
cursor = nextCursor
if cursor == 0 {
break
}
}
return taskUIDs, nil
}
5 changes: 4 additions & 1 deletion pkg/backend/redis/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ func (b *Backend) ensureWorkerExistsByUID(rds redis.Cmdable, queue *taskqueue.Ta
}

func (b *Backend) allWorkersKeysForDeleteQueue(rds redis.Cmdable, queueUID string) ([]string, error) {
keysToDelete := []string{b.workersKey(queueUID)}
keysToDelete := []string{}
workerUIDs, err := rds.SMembers(b.workersKey(queueUID)).Result()
if err == redis.Nil {
return []string{}, nil
Expand All @@ -497,5 +497,8 @@ func (b *Backend) allWorkersKeysForDeleteQueue(rds redis.Cmdable, queueUID strin
b.workerPendingTaskQueueKey(queueUID, workerUID),
b.workerTasksKey(queueUID, workerUID))
}

// If you are not using transactions, you need to delete the wokers key at the end.
keysToDelete = append(keysToDelete, b.workersKey(queueUID))
return keysToDelete, nil
}
7 changes: 4 additions & 3 deletions pkg/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ var _ = Describe("Worker", func() {
bcknd, err = backendfactory.NewBackend(logger, backendconfig.Config{
BackendType: "redis",
Redis: &backendconfig.RedisConfig{
Client: client,
Backoff: backendConfig,
ChunkSizeInGet: 1000,
Client: client,
Backoff: backendConfig,
ChunkSizeInGet: 1000,
ChunkSizeInDelete: 1000,
},
})
Expect(err).NotTo(HaveOccurred())
Expand Down