Skip to content

feat: Initial index-builder implementation #18297

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from

Conversation

benclive
Copy link
Contributor

@benclive benclive commented Jul 1, 2025

What this PR does / why we need it:
Adds an initial implementation of an index-builder for logs stored in dataobjects.

  • It reads the loki.metastore-events kafka topic and when it gets a big enough group of events (configurable with a flag), it'll process them in a batch to generate an index object: an aggregated stream section + a bloom filter for every column in every log object.
  • Reading the log objects & building the index objects is extremely parallelisble and CPU bound. When I tested it in a dev environment it maxed out 10 vCPUs for the whole period and processed 32 objects in just over 4 minute. This should be fast enough for now.
  • I added a flag to only enable this for a specific set of tenants, because it doesn't (currently) handle multi-tenancy properly because it doesn't commit back to kafka in a multi-tenant aware way.
  • I chose to add this as part of the dataobj-consumer component, with flags to enable and disable it. I'm not sure I like this though so it might make more sense to create a new Loki component for it. That said, it's conceptually quite similar to the existing dataobj-consumer's partition-processor code.

Which issue(s) this PR fixes:
Part of https://github.com/grafana/loki-private/issues/1725

@benclive benclive requested a review from a team as a code owner July 1, 2025 14:13
@benclive benclive changed the title Benclive/prototype index builder feat: Initial index-builder implementation Jul 1, 2025
Copy link
Contributor

github-actions bot commented Jul 1, 2025

Copy link
Contributor

@chaudum chaudum left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I only skimmed pkg/dataobj/consumer/indexing/indexobj/builder.go because it is dictated by the pointer section of the data object and contains a lot of (copied?) boilderplate code.

@@ -31,4 +41,9 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.UploaderConfig.RegisterFlagsWithPrefix(prefix, f)

f.DurationVar(&cfg.IdleFlushTimeout, prefix+"idle-flush-timeout", 60*60*time.Second, "The maximum amount of time to wait in seconds before flushing an object that is no longer receiving new writes")
f.BoolVar(&cfg.PartitionProcessingEnabled, prefix+"partition-processing-enabled", true, "If true, partition processing will be enabled")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the help text it is not clear to me what this setting (or rather "partition processing") is supposed to do. Do we need two separate config options to enable index building?

f.BoolVar(&cfg.PartitionProcessingEnabled, prefix+"partition-processing-enabled", true, "If true, partition processing will be enabled")
f.BoolVar(&cfg.IndexBuildingEnabled, prefix+"index-building-enabled", false, "If true, index building will be enabled")
f.IntVar(&cfg.IndexBuildingEventsPerIndex, prefix+"index-building-events-per-index", 32, "Experimental: The number of events to batch before building an index")
f.StringVar(&cfg.IndexStoragePrefix, prefix+"index-storage-prefix", "indexing-v0/", "Experimental: A prefix to use for storing indexes in object storage. Used to separate the metastore & index files during initial testing.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"indexing-v0/" is a strange prefix IMO. Maybe call it dataobjindex-v0/ or (dataobjindex/v0/)?

f.BoolVar(&cfg.IndexBuildingEnabled, prefix+"index-building-enabled", false, "If true, index building will be enabled")
f.IntVar(&cfg.IndexBuildingEventsPerIndex, prefix+"index-building-events-per-index", 32, "Experimental: The number of events to batch before building an index")
f.StringVar(&cfg.IndexStoragePrefix, prefix+"index-storage-prefix", "indexing-v0/", "Experimental: A prefix to use for storing indexes in object storage. Used to separate the metastore & index files during initial testing.")
f.Var(&cfg.EnabledTenantIDs, prefix+"enabled-tenant-ids", "Experimental: A list of tenant IDs to enable index building for. If empty, all tenants will be enabled.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This really should be a per-tenant setting.

Comment on lines +172 to +175
if err != nil {
level.Error(objLogger).Log("msg", "failed to read object", "err", err)
continue
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is a danger here: Failed downloads will get unnoticed, because only successfully downloaded objects are consumed when building the index.

indexBuilder := indexing.NewIndexBuilder(ctx, s.logger, s.eventConsumerClient, builderCfg, s.bucket, "loki.metastore-events", s.reg, s.cfg.IndexBuildingEventsPerIndex, s.cfg.IndexStoragePrefix, s.cfg.EnabledTenantIDs)
indexBuilder.Start()
defer indexBuilder.Stop()
for {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer if the logic for consuming and processing records would be encapsulated in the index builder.

// Fetch the column statistics in order to init the bloom filters for each column
stats, err := logs.ReadStats(ctx, logsSection)
if err != nil {
level.Error(objLogger).Log("msg", "failed to read logs stats", "err", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you want to use the sectionLogger, do you?

Suggested change
level.Error(objLogger).Log("msg", "failed to read logs stats", "err", err)
level.Error(sectionLogger).Log("msg", "failed to read logs stats", "err", err)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same for other Log() calls in this function

Comment on lines +38 to +40
Namespace: "loki",
Subsystem: "indexobj",
Name: "config_target_page_size_bytes",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yesterday I learned that Namespace and Subsystem are going to be removed in future versions of prometheus/client_golang.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That said, that comment is from 2019 🤣

Comment on lines +99 to +105
flushFailures: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "loki",
Subsystem: "indexobj",
Name: "flush_failures_total",

Help: "Total number of flush failures.",
}),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also have a counter metric which counts all flushes (loki_indexobj_flush_count_total or sth like that)

if err != nil {
level.Error(logger).Log("msg", "failed to create consumer", "err", err)
return nil
if cfg.PartitionProcessingEnabled {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, now I understand the setting from the very first file of the diff. I think you need to improve the help text of the setting :)

if cfg.IndexBuildingEnabled {
consumerCfg := kafkaCfg
consumerCfg.AutoCreateTopicEnabled = true
consumerCfg.AutoCreateTopicDefaultPartitions = 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants