bloom blocks downloading queue (grafana#11201)
Implemented a bloom blocks downloading queue to control the concurrency of
downloading blocks from the storage.

Signed-off-by: Vladyslav Diachenko <vlad.diachenko@grafana.com>
vlad-diachenko committed Nov 24, 2023
1 parent 10fe48b commit 75cfe59
Showing 15 changed files with 541 additions and 250 deletions.
14 changes: 14 additions & 0 deletions docs/sources/configure/_index.md
@@ -2248,6 +2248,16 @@ bloom_shipper:
# Working directory to store downloaded Bloom Blocks.
# CLI flag: -bloom.shipper.working-directory
[working_directory: <string> | default = "bloom-shipper"]

blocks_downloading_queue:
# The count of parallel workers that download Bloom Blocks.
# CLI flag: -bloom.shipper.blocks-downloading-queue.workers-count
[workers_count: <int> | default = 100]

# Maximum number of tasks in the queue per tenant per bloom-gateway. Enqueuing
# tasks beyond this limit will fail with an error.
# CLI flag: -bloom.shipper.blocks-downloading-queue.max_tasks_enqueued_per_tenant
[max_tasks_enqueued_per_tenant: <int> | default = 10000]
```
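Taken together, a hedged example of how these options might look in the bloom shipper section of a Loki configuration (values are illustrative, not recommendations):

```yaml
bloom_shipper:
  working_directory: "bloom-shipper"
  blocks_downloading_queue:
    # count of parallel workers downloading blocks per bloom-gateway
    workers_count: 100
    # enqueuing beyond this per-tenant limit fails with an error
    max_tasks_enqueued_per_tenant: 10000
```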

### chunk_store_config
@@ -2990,6 +3000,10 @@ shard_streams:
# CLI flag: -bloom-compactor.false-positive-rate
[bloom_false_positive_rate: <float> | default = 0.01]

# Maximum number of blocks that will be downloaded in parallel by the Bloom Gateway.
# CLI flag: -bloom-gateway.blocks-downloading-parallelism
[bloom_gateway_blocks_downloading_parallelism: <int> | default = 50]
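Since this is a per-tenant limit, a hedged sketch of tuning it via `limits_config` (the value shown is just the default):

```yaml
limits_config:
  bloom_gateway_blocks_downloading_parallelism: 50
```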

# Allow user to send structured metadata in push payload.
# CLI flag: -validation.allow-structured-metadata
[allow_structured_metadata: <boolean> | default = false]
4 changes: 2 additions & 2 deletions pkg/bloomgateway/bloomgateway.go
@@ -187,7 +187,7 @@ type Gateway struct {
}

// New returns a new instance of the Bloom Gateway.
func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, shardingStrategy ShardingStrategy, cm storage.ClientMetrics, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) {
func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, overrides Limits, shardingStrategy ShardingStrategy, cm storage.ClientMetrics, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) {
g := &Gateway{
cfg: cfg,
logger: logger,
@@ -205,7 +205,7 @@ func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, s
return nil, err
}

bloomShipper, err := bloomshipper.NewShipper(client, storageCfg.BloomShipperConfig, logger)
bloomShipper, err := bloomshipper.NewShipper(client, storageCfg.BloomShipperConfig, overrides, logger, reg)
if err != nil {
return nil, err
}
26 changes: 22 additions & 4 deletions pkg/bloomgateway/bloomgateway_test.go
@@ -82,7 +82,7 @@ func TestBloomGateway_StartStopService(t *testing.T) {
},
}

gw, err := New(cfg, schemaCfg, storageCfg, ss, cm, logger, reg)
gw, err := New(cfg, schemaCfg, storageCfg, fakeLimits{}, ss, cm, logger, reg)
require.NoError(t, err)

err = services.StartAndAwaitRunning(context.Background(), gw)
@@ -142,7 +142,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {

t.Run("returns unfiltered chunk refs if no filters provided", func(t *testing.T) {
reg := prometheus.NewRegistry()
gw, err := New(cfg, schemaCfg, storageCfg, ss, cm, logger, reg)
gw, err := New(cfg, schemaCfg, storageCfg, fakeLimits{}, ss, cm, logger, reg)
require.NoError(t, err)

err = services.StartAndAwaitRunning(context.Background(), gw)
@@ -188,7 +188,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {

t.Run("returns error if chunk refs do not belong to tenant", func(t *testing.T) {
reg := prometheus.NewRegistry()
gw, err := New(cfg, schemaCfg, storageCfg, ss, cm, logger, reg)
gw, err := New(cfg, schemaCfg, storageCfg, fakeLimits{}, ss, cm, logger, reg)
require.NoError(t, err)

ts, _ := time.Parse("2006-01-02 15:04", "2023-10-03 10:00")
@@ -212,7 +212,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {

t.Run("gateway tracks active users", func(t *testing.T) {
reg := prometheus.NewRegistry()
gw, err := New(cfg, schemaCfg, storageCfg, ss, cm, logger, reg)
gw, err := New(cfg, schemaCfg, storageCfg, fakeLimits{}, ss, cm, logger, reg)
require.NoError(t, err)

err = services.StartAndAwaitRunning(context.Background(), gw)
@@ -248,3 +248,21 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
require.ElementsMatch(t, tenants, gw.activeUsers.ActiveUsers())
})
}

type fakeLimits struct{}

func (f fakeLimits) BloomGatewayShardSize(_ string) int {
//TODO implement me
panic("implement me")
}

func (f fakeLimits) BloomGatewayEnabled(_ string) bool {
//TODO implement me
panic("implement me")
}

func (f fakeLimits) BloomGatewayBlocksDownloadingParallelism(_ string) int {
//TODO implement me
panic("implement me")
}
1 change: 1 addition & 0 deletions pkg/bloomgateway/sharding.go
@@ -38,6 +38,7 @@ var (
type Limits interface {
BloomGatewayShardSize(tenantID string) int
BloomGatewayEnabled(tenantID string) bool
BloomGatewayBlocksDownloadingParallelism(userID string) int
}
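For illustration only, a minimal static implementation of this interface might look like the sketch below; the type name and values are assumptions, not part of this commit:

```go
// staticLimits is a hypothetical Limits implementation that returns the same
// limits for every tenant.
type staticLimits struct {
	shardSize   int
	parallelism int
}

func (l staticLimits) BloomGatewayShardSize(_ string) int { return l.shardSize }
func (l staticLimits) BloomGatewayEnabled(_ string) bool  { return true }
func (l staticLimits) BloomGatewayBlocksDownloadingParallelism(_ string) int {
	return l.parallelism
}
```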

type ShardingStrategy interface {
2 changes: 1 addition & 1 deletion pkg/loki/modules.go
@@ -1263,7 +1263,7 @@ func (t *Loki) initBloomGateway() (services.Service, error) {

shuffleSharding := bloomgateway.NewShuffleShardingStrategy(t.bloomGatewayRingManager.Ring, t.bloomGatewayRingManager.RingLifecycler, t.Overrides, logger)

gateway, err := bloomgateway.New(t.Cfg.BloomGateway, t.Cfg.SchemaConfig, t.Cfg.StorageConfig, shuffleSharding, t.clientMetrics, logger, prometheus.DefaultRegisterer)
gateway, err := bloomgateway.New(t.Cfg.BloomGateway, t.Cfg.SchemaConfig, t.Cfg.StorageConfig, t.Overrides, shuffleSharding, t.clientMetrics, logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
8 changes: 4 additions & 4 deletions pkg/storage/bloom/v1/block_writer.go
@@ -12,8 +12,8 @@ import (
)

const (
bloomFileName = "bloom"
seriesFileName = "series"
BloomFileName = "bloom"
SeriesFileName = "series"
)

type BlockWriter interface {
@@ -66,12 +66,12 @@ func (b *DirectoryBlockWriter) Init() error {
return errors.Wrap(err, "creating bloom block dir")
}

b.index, err = os.Create(filepath.Join(b.dir, seriesFileName))
b.index, err = os.Create(filepath.Join(b.dir, SeriesFileName))
if err != nil {
return errors.Wrap(err, "creating series file")
}

b.blooms, err = os.Create(filepath.Join(b.dir, bloomFileName))
b.blooms, err = os.Create(filepath.Join(b.dir, BloomFileName))
if err != nil {
return errors.Wrap(err, "creating bloom file")
}
4 changes: 2 additions & 2 deletions pkg/storage/bloom/v1/reader.go
@@ -49,12 +49,12 @@ func NewDirectoryBlockReader(dir string) *DirectoryBlockReader {
func (r *DirectoryBlockReader) Init() error {
if !r.initialized {
var err error
r.index, err = os.Open(filepath.Join(r.dir, seriesFileName))
r.index, err = os.Open(filepath.Join(r.dir, SeriesFileName))
if err != nil {
return errors.Wrap(err, "opening series file")
}

r.blooms, err = os.Open(filepath.Join(r.dir, bloomFileName))
r.blooms, err = os.Open(filepath.Join(r.dir, BloomFileName))
if err != nil {
return errors.Wrap(err, "opening bloom file")
}
230 changes: 230 additions & 0 deletions pkg/storage/stores/shipper/bloomshipper/block_downloader.go
@@ -0,0 +1,230 @@
package bloomshipper

import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/loki/pkg/queue"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/constants"
)

type blockDownloader struct {
logger log.Logger

workingDirectory string
queueMetrics *queue.Metrics
queue *queue.RequestQueue
blockClient BlockClient
limits Limits
activeUsersService *util.ActiveUsersCleanupService

ctx context.Context
manager *services.Manager
onWorkerStopCallback func()
}

func newBlockDownloader(config config.Config, blockClient BlockClient, limits Limits, logger log.Logger, reg prometheus.Registerer) (*blockDownloader, error) {
queueMetrics := queue.NewMetrics(reg, constants.Loki, "bloom_blocks_downloader")
// cleanup service below removes queue metrics for tenants that become inactive
downloadingQueue := queue.NewRequestQueue(config.BlocksDownloadingQueue.MaxTasksEnqueuedPerTenant, time.Minute, queueMetrics)
activeUsersService := util.NewActiveUsersCleanupWithDefaultValues(queueMetrics.Cleanup)

ctx := context.Background()
manager, err := services.NewManager(downloadingQueue, activeUsersService)
if err != nil {
return nil, fmt.Errorf("error creating service manager: %w", err)
}
err = services.StartManagerAndAwaitHealthy(ctx, manager)
if err != nil {
return nil, fmt.Errorf("error starting service manager: %w", err)
}

b := &blockDownloader{
ctx: ctx,
logger: logger,
workingDirectory: config.WorkingDirectory,
queueMetrics: queueMetrics,
queue: downloadingQueue,
blockClient: blockClient,
activeUsersService: activeUsersService,
limits: limits,
manager: manager,
onWorkerStopCallback: onWorkerStopNoopCallback,
}

for i := 0; i < config.BlocksDownloadingQueue.WorkersCount; i++ {
go b.serveDownloadingTasks(fmt.Sprintf("worker-%d", i))
}
return b, nil
}

type BlockDownloadingTask struct {
ctx context.Context
block BlockRef
// ErrCh is a send-only channel to write an error to
ErrCh chan<- error
// ResultsCh is a send-only channel to return the block querier for the downloaded block
ResultsCh chan<- blockWithQuerier
}

func NewBlockDownloadingTask(ctx context.Context, block BlockRef, resCh chan<- blockWithQuerier, errCh chan<- error) *BlockDownloadingTask {
return &BlockDownloadingTask{
ctx: ctx,
block: block,
ErrCh: errCh,
ResultsCh: resCh,
}
}

// noop implementation
var onWorkerStopNoopCallback = func() {}

func (d *blockDownloader) serveDownloadingTasks(workerID string) {
logger := log.With(d.logger, "worker", workerID)
level.Debug(logger).Log("msg", "starting worker")

d.queue.RegisterConsumerConnection(workerID)
defer d.queue.UnregisterConsumerConnection(workerID)
// this callback is used only in tests to assert that the worker has stopped
defer d.onWorkerStopCallback()

idx := queue.StartIndexWithLocalQueue

for {
item, newIdx, err := d.queue.Dequeue(d.ctx, idx, workerID)
if err != nil {
if !errors.Is(err, queue.ErrStopped) && !errors.Is(err, context.Canceled) {
level.Error(logger).Log("msg", "failed to dequeue task", "err", err)
continue
}
level.Info(logger).Log("msg", "stopping worker")
return
}
task, ok := item.(*BlockDownloadingTask)
if !ok {
level.Error(logger).Log("msg", "failed to cast to BlockDownloadingTask", "item", fmt.Sprintf("%+v", item), "type", fmt.Sprintf("%T", item))
continue
}

idx = newIdx
blockPath := task.block.BlockPath
// TODO: add cache before downloading
level.Debug(logger).Log("msg", "start downloading the block", "block", blockPath)
block, err := d.blockClient.GetBlock(task.ctx, task.block)
if err != nil {
level.Error(logger).Log("msg", "error downloading the block", "block", blockPath, "err", err)
task.ErrCh <- fmt.Errorf("error downloading the block %s : %w", blockPath, err)
continue
}
directory, err := d.extractBlock(&block, time.Now())
if err != nil {
level.Error(logger).Log("msg", "error extracting the block", "block", blockPath, "err", err)
task.ErrCh <- fmt.Errorf("error extracting the block %s : %w", blockPath, err)
continue
}
level.Debug(d.logger).Log("msg", "block has been downloaded and extracted", "block", task.block.BlockPath, "directory", directory)
blockQuerier := d.createBlockQuerier(directory)
task.ResultsCh <- blockWithQuerier{
BlockRef: task.block,
BlockQuerier: blockQuerier,
}
}
}

func (d *blockDownloader) downloadBlocks(ctx context.Context, tenantID string, references []BlockRef) (chan blockWithQuerier, chan error) {
d.activeUsersService.UpdateUserTimestamp(tenantID, time.Now())
// errCh is buffered to hold the maximum possible number of errors. Otherwise,
// if a queue worker reported an error to this channel before this goroutine
// did, this goroutine would deadlock on its own send, because nothing reads
// the channel until downloadBlocks returns.
errCh := make(chan error, len(references))
blocksCh := make(chan blockWithQuerier, len(references))

downloadingParallelism := d.limits.BloomGatewayBlocksDownloadingParallelism(tenantID)
for _, reference := range references {
task := NewBlockDownloadingTask(ctx, reference, blocksCh, errCh)
level.Debug(d.logger).Log("msg", "enqueuing task to download block", "block", reference.BlockPath)
err := d.queue.Enqueue(tenantID, nil, task, downloadingParallelism, nil)
if err != nil {
errCh <- fmt.Errorf("error enquing downloading task for block %s : %w", reference.BlockPath, err)
return blocksCh, errCh
}
}
return blocksCh, errCh
}
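To make the channel contract concrete, here is a hedged sketch of how a caller might drain the two returned channels; the helper name and first-error-is-fatal policy are assumptions, not code from this commit:

```go
// consumeBlocks collects up to len(refs) results and treats the first error
// as fatal. Because both channels are buffered to len(refs), workers never
// block on a send even when the caller returns early.
func consumeBlocks(ctx context.Context, d *blockDownloader, tenantID string, refs []BlockRef) ([]blockWithQuerier, error) {
	blocksCh, errCh := d.downloadBlocks(ctx, tenantID, refs)
	results := make([]blockWithQuerier, 0, len(refs))
	for i := 0; i < len(refs); i++ {
		select {
		case block := <-blocksCh:
			results = append(results, block)
		case err := <-errCh:
			return nil, err
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	return results, nil
}
```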

type blockWithQuerier struct {
BlockRef
*v1.BlockQuerier
}

// extractBlock extracts the block archive into a directory and returns the absolute path to this directory.
func (d *blockDownloader) extractBlock(block *Block, ts time.Time) (string, error) {
workingDirectoryPath := filepath.Join(d.workingDirectory, block.BlockPath, strconv.FormatInt(ts.UnixMilli(), 10))
err := os.MkdirAll(workingDirectoryPath, os.ModePerm)
if err != nil {
return "", fmt.Errorf("can not create directory to extract the block: %w", err)
}
archivePath, err := writeDataToTempFile(workingDirectoryPath, block)
if err != nil {
return "", fmt.Errorf("error writing data to temp file: %w", err)
}
defer func() {
os.Remove(archivePath)
// TODO: log the error returned by os.Remove
}()
err = extractArchive(archivePath, workingDirectoryPath)
if err != nil {
return "", fmt.Errorf("error extracting archive: %w", err)
}
return workingDirectoryPath, nil
}

func (d *blockDownloader) createBlockQuerier(directory string) *v1.BlockQuerier {
reader := v1.NewDirectoryBlockReader(directory)
block := v1.NewBlock(reader)
return v1.NewBlockQuerier(block)
}

func (d *blockDownloader) stop() {
_ = services.StopManagerAndAwaitStopped(d.ctx, d.manager)
}

func writeDataToTempFile(workingDirectoryPath string, block *Block) (string, error) {
defer block.Data.Close()
archivePath := filepath.Join(workingDirectoryPath, block.BlockPath[strings.LastIndex(block.BlockPath, delimiter)+1:])

archiveFile, err := os.Create(archivePath)
if err != nil {
return "", fmt.Errorf("error creating empty file to store the archiver: %w", err)
}
defer archiveFile.Close()
_, err = io.Copy(archiveFile, block.Data)
if err != nil {
return "", fmt.Errorf("error writing data to archive file: %w", err)
}
return archivePath, nil
}

func extractArchive(archivePath string, workingDirectoryPath string) error {
file, err := os.Open(archivePath)
if err != nil {
return fmt.Errorf("error opening archive file %s: %w", file.Name(), err)
}
return v1.UnTarGz(workingDirectoryPath, file)
}
