feat: Initial index-builder implementation #18297
I only skimmed pkg/dataobj/consumer/indexing/indexobj/builder.go because it is dictated by the pointer section of the data object and contains a lot of (copied?) boilerplate code.
@@ -31,4 +41,9 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.UploaderConfig.RegisterFlagsWithPrefix(prefix, f)

f.DurationVar(&cfg.IdleFlushTimeout, prefix+"idle-flush-timeout", 60*60*time.Second, "The maximum amount of time to wait in seconds before flushing an object that is no longer receiving new writes")
f.BoolVar(&cfg.PartitionProcessingEnabled, prefix+"partition-processing-enabled", true, "If true, partition processing will be enabled")
From the help text it is not clear to me what this setting (or rather "partition processing") is supposed to do. Do we need two separate config options to enable index building?
f.BoolVar(&cfg.PartitionProcessingEnabled, prefix+"partition-processing-enabled", true, "If true, partition processing will be enabled")
f.BoolVar(&cfg.IndexBuildingEnabled, prefix+"index-building-enabled", false, "If true, index building will be enabled")
f.IntVar(&cfg.IndexBuildingEventsPerIndex, prefix+"index-building-events-per-index", 32, "Experimental: The number of events to batch before building an index")
f.StringVar(&cfg.IndexStoragePrefix, prefix+"index-storage-prefix", "indexing-v0/", "Experimental: A prefix to use for storing indexes in object storage. Used to separate the metastore & index files during initial testing.")
"indexing-v0/" is a strange prefix IMO. Maybe call it dataobjindex-v0/ (or dataobjindex/v0/)?
f.BoolVar(&cfg.IndexBuildingEnabled, prefix+"index-building-enabled", false, "If true, index building will be enabled")
f.IntVar(&cfg.IndexBuildingEventsPerIndex, prefix+"index-building-events-per-index", 32, "Experimental: The number of events to batch before building an index")
f.StringVar(&cfg.IndexStoragePrefix, prefix+"index-storage-prefix", "indexing-v0/", "Experimental: A prefix to use for storing indexes in object storage. Used to separate the metastore & index files during initial testing.")
f.Var(&cfg.EnabledTenantIDs, prefix+"enabled-tenant-ids", "Experimental: A list of tenant IDs to enable index building for. If empty, all tenants will be enabled.")
This really should be a per-tenant setting.
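A per-tenant setting could be resolved roughly as follows. This is only a minimal sketch: `tenantLimits`, its fields, and the `IndexBuildingEnabled` method are hypothetical names made up for illustration, not the project's actual limits API.

```go
package main

import "fmt"

// tenantLimits is a hypothetical per-tenant limits holder: a global default
// plus explicit overrides keyed by tenant ID.
type tenantLimits struct {
	defaultEnabled bool
	overrides      map[string]bool
}

// IndexBuildingEnabled returns the tenant's override when one exists and
// falls back to the global default otherwise.
func (l tenantLimits) IndexBuildingEnabled(tenant string) bool {
	if enabled, ok := l.overrides[tenant]; ok {
		return enabled
	}
	return l.defaultEnabled
}

func main() {
	limits := tenantLimits{
		defaultEnabled: false,
		overrides:      map[string]bool{"tenant-a": true},
	}
	for _, tenant := range []string{"tenant-a", "tenant-b"} {
		fmt.Println(tenant, limits.IndexBuildingEnabled(tenant))
	}
}
```

Compared with a static list of enabled tenant IDs, a lookup like this keeps the decision next to the other tenant-scoped settings.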
if err != nil {
	level.Error(objLogger).Log("msg", "failed to read object", "err", err)
	continue
}
I think there is a danger here: failed downloads will go unnoticed, because only successfully downloaded objects are consumed when building the index.
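One way to surface such failures is to collect them and hand them back to the caller instead of only logging and continuing. The sketch below assumes hypothetical helpers (`fetchFunc`, `readAll`, `failOn`); none of them come from the PR.

```go
package main

import (
	"errors"
	"fmt"
)

// fetchFunc is a hypothetical stand-in for downloading one data object.
type fetchFunc func(path string) ([]byte, error)

// readAll downloads every path. It returns the objects that were read
// successfully plus a joined error covering every failed download, so the
// caller can decide whether an incomplete batch may be indexed.
func readAll(paths []string, fetch fetchFunc) (map[string][]byte, error) {
	objects := make(map[string][]byte, len(paths))
	var errs []error
	for _, p := range paths {
		data, err := fetch(p)
		if err != nil {
			errs = append(errs, fmt.Errorf("read object %q: %w", p, err))
			continue
		}
		objects[p] = data
	}
	// errors.Join returns nil when no download failed.
	return objects, errors.Join(errs...)
}

// failOn builds a fake fetcher that fails for exactly one path.
func failOn(bad string) fetchFunc {
	return func(path string) ([]byte, error) {
		if path == bad {
			return nil, errors.New("download failed")
		}
		return []byte(path), nil
	}
}

func main() {
	objects, err := readAll([]string{"a", "b", "c"}, failOn("b"))
	fmt.Println("objects read:", len(objects))
	fmt.Println("error:", err)
}
```

With the error returned, the caller could retry, skip committing the batch, or increment a failure metric rather than silently building an index from a partial set.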
indexBuilder := indexing.NewIndexBuilder(ctx, s.logger, s.eventConsumerClient, builderCfg, s.bucket, "loki.metastore-events", s.reg, s.cfg.IndexBuildingEventsPerIndex, s.cfg.IndexStoragePrefix, s.cfg.EnabledTenantIDs)
indexBuilder.Start()
defer indexBuilder.Stop()
for {
I would prefer that the logic for consuming and processing records be encapsulated in the index builder.
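A rough sketch of what that encapsulation could look like: the builder owns the poll loop behind a single `Run` method, so the service only starts and stops it. All types here (`record`, `recordSource`, `sliceSource`, `indexBuilder`) are hypothetical stand-ins, not the PR's types or the Kafka client's API.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// record and recordSource are hypothetical stand-ins for the Kafka types.
type record struct{ value string }

type recordSource interface {
	// Poll returns the next group of records or an error.
	Poll(ctx context.Context) ([]record, error)
}

var errSourceClosed = errors.New("source closed")

// sliceSource is an in-memory fake that serves fixed batches, then closes.
type sliceSource struct{ batches [][]record }

func (s *sliceSource) Poll(ctx context.Context) ([]record, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	if len(s.batches) == 0 {
		return nil, errSourceClosed
	}
	next := s.batches[0]
	s.batches = s.batches[1:]
	return next, nil
}

type indexBuilder struct {
	source  recordSource
	handled []string
}

// Run owns the consume loop. It returns nil when the source closes cleanly
// and the underlying error otherwise.
func (b *indexBuilder) Run(ctx context.Context) error {
	for {
		recs, err := b.source.Poll(ctx)
		if errors.Is(err, errSourceClosed) {
			return nil
		}
		if err != nil {
			return err
		}
		for _, r := range recs {
			b.handled = append(b.handled, r.value)
		}
	}
}

// runExample wires a fake source into the builder and runs it to completion.
func runExample() ([]string, error) {
	src := &sliceSource{batches: [][]record{{{value: "a"}}, {{value: "b"}, {value: "c"}}}}
	b := &indexBuilder{source: src}
	err := b.Run(context.Background())
	return b.handled, err
}

func main() {
	handled, err := runExample()
	fmt.Println(handled, err)
}
```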
// Fetch the column statistics in order to init the bloom filters for each column
stats, err := logs.ReadStats(ctx, logsSection)
if err != nil {
	level.Error(objLogger).Log("msg", "failed to read logs stats", "err", err)
I think you want to use the sectionLogger here, don't you?
- level.Error(objLogger).Log("msg", "failed to read logs stats", "err", err)
+ level.Error(sectionLogger).Log("msg", "failed to read logs stats", "err", err)
Same for other Log() calls in this function
Namespace: "loki",
Subsystem: "indexobj",
Name: "config_target_page_size_bytes",
Yesterday I learned that Namespace and Subsystem are going to be removed in future versions of prometheus/client_golang.
That said, that comment is from 2019 🤣
flushFailures: prometheus.NewCounter(prometheus.CounterOpts{
	Namespace: "loki",
	Subsystem: "indexobj",
	Name: "flush_failures_total",

	Help: "Total number of flush failures.",
}),
We should also have a counter metric which counts all flushes (loki_indexobj_flush_count_total or something like that).
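To illustrate why a total counter helps: with both values recorded, a failure ratio can be derived. The sketch below uses plain atomic counters from the standard library as a stand-in for Prometheus counters; `flushMetrics`, `observe`, and `simulate` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// flushMetrics pairs a total counter with the failure counter. In the real
// code both would presumably be Prometheus counters; atomics are used here
// only to keep the sketch free of external dependencies.
type flushMetrics struct {
	flushTotal    atomic.Int64
	flushFailures atomic.Int64
}

// observe records one flush attempt and, if it failed, one failure.
func (m *flushMetrics) observe(err error) {
	m.flushTotal.Add(1)
	if err != nil {
		m.flushFailures.Add(1)
	}
}

// simulate records `total` flush attempts, the first `failed` of which fail.
func simulate(total, failed int) *flushMetrics {
	m := &flushMetrics{}
	for i := 0; i < total; i++ {
		var err error
		if i < failed {
			err = errors.New("flush failed")
		}
		m.observe(err)
	}
	return m
}

func main() {
	m := simulate(10, 2)
	fmt.Printf("flushes=%d failures=%d\n", m.flushTotal.Load(), m.flushFailures.Load())
}
```

Incrementing the total on every attempt, not only on success, is what makes the two counters comparable.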
if err != nil {
level.Error(logger).Log("msg", "failed to create consumer", "err", err)
return nil
if cfg.PartitionProcessingEnabled {
Ah, now I understand the setting from the very first file of the diff. I think you need to improve the help text of the setting :)
if cfg.IndexBuildingEnabled {
	consumerCfg := kafkaCfg
	consumerCfg.AutoCreateTopicEnabled = true
	consumerCfg.AutoCreateTopicDefaultPartitions = 1
👍
What this PR does / why we need it:
Adds an initial implementation of an index-builder for logs stored in dataobjects. It consumes events from the loki.metastore-events Kafka topic, and once it has collected a large enough group of events (configurable with a flag), it processes them in a batch to generate an index object: an aggregated stream section plus a bloom filter for every column in every log object.
Which issue(s) this PR fixes:
Part of https://github.com/grafana/loki-private/issues/1725
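The batching behaviour described in the PR summary can be sketched as follows. This is an illustrative approximation only: `batcher`, `add`, and `batchSizes` are hypothetical names, events are reduced to strings, and the actual index construction is replaced by a callback.

```go
package main

import "fmt"

// batcher buffers events and invokes build once `size` events are pending.
// `size` plays the role of the events-per-index flag.
type batcher struct {
	size    int
	pending []string
	build   func(events []string)
}

// add buffers one event and triggers a build when the batch is full.
func (b *batcher) add(event string) {
	b.pending = append(b.pending, event)
	if len(b.pending) >= b.size {
		b.build(b.pending)
		// Start a fresh slice so the built batch is never overwritten.
		b.pending = nil
	}
}

// batchSizes feeds n synthetic events through a batcher and reports the
// size of every batch that was built.
func batchSizes(n, size int) []int {
	var sizes []int
	b := &batcher{size: size, build: func(events []string) {
		sizes = append(sizes, len(events))
	}}
	for i := 0; i < n; i++ {
		b.add(fmt.Sprintf("event-%d", i))
	}
	return sizes
}

func main() {
	// 70 events with a batch size of 32 build two indexes and leave 6 pending.
	fmt.Println(batchSizes(70, 32))
}
```

Events left in `pending` are not built until the batch fills up, so a time-based flush would be needed to bound how long they wait.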