From f15410e49ce1e4cb86c89eb8803292269a993779 Mon Sep 17 00:00:00 2001 From: Andreas Koerner Date: Tue, 8 Nov 2022 17:26:24 +0100 Subject: [PATCH] Task concurrency not being respected #23582 --- cmd/influxd/launcher/launcher.go | 13 +++--- influxql/query/internal/internal.pb.go | 2 +- pkg/tracing/wire/binary.pb.go | 30 ++++++------- storage/reads/datatypes/predicate.pb.go | 2 +- storage/reads/datatypes/storage_common.pb.go | 46 ++++++++++---------- tsdb/internal/fieldsindex.pb.go | 2 +- v1/services/meta/internal/meta.pb.go | 2 +- v1/services/storage/source.pb.go | 2 +- 8 files changed, 50 insertions(+), 49 deletions(-) diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index 2eb5099b61b..89f52beb3b0 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -451,15 +451,16 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) { query.QueryServiceBridge{AsyncQueryService: m.queryController}, ) - executor, executorMetrics := executor.NewExecutor( - m.log.With(zap.String("service", "task-executor")), + exec, executorMetrics := executor.NewExecutor( + m.log.With(zap.String("service", "task-exec")), query.QueryServiceBridge{AsyncQueryService: m.queryController}, ts.UserService, combinedTaskService, combinedTaskService, executor.WithFlagger(m.flagger), ) - m.executor = executor + exec.SetLimitFunc(executor.ConcurrencyLimit(exec, fluxlang.DefaultService)) + m.executor = exec m.reg.MustRegister(executorMetrics.PrometheusCollectors()...) schLogger := m.log.With(zap.String("service", "task-scheduler")) @@ -470,7 +471,7 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) { err error ) sch, sm, err = scheduler.NewScheduler( - executor, + exec, taskbackend.NewSchedulableTaskService(m.kvService), scheduler.WithOnErrorFn(func(ctx context.Context, taskID scheduler.ID, scheduledAt time.Time, err error) { schLogger.Info( @@ -499,7 +500,7 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) { taskCoord := coordinator.NewCoordinator( coordLogger, sch, - executor) + exec) taskSvc = middleware.New(combinedTaskService, taskCoord) if err := taskbackend.TaskNotifyCoordinatorOfExisting( @@ -508,7 +509,7 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) { combinedTaskService, taskCoord, func(ctx context.Context, taskID platform2.ID, runID platform2.ID) error { - _, err := executor.ResumeCurrentRun(ctx, taskID, runID) + _, err := exec.ResumeCurrentRun(ctx, taskID, runID) return err }, coordLogger); err != nil { diff --git a/influxql/query/internal/internal.pb.go b/influxql/query/internal/internal.pb.go index 80fcd2f87ce..5fd71375640 100644 --- a/influxql/query/internal/internal.pb.go +++ b/influxql/query/internal/internal.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.27.1 -// protoc v3.17.3 +// protoc v3.6.1 // source: internal/internal.proto package query diff --git a/pkg/tracing/wire/binary.pb.go b/pkg/tracing/wire/binary.pb.go index ea25afb4994..d5cf9763089 100644 --- a/pkg/tracing/wire/binary.pb.go +++ b/pkg/tracing/wire/binary.pb.go @@ -1,15 +1,15 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.27.1 -// protoc v3.17.3 +// protoc v3.6.1 // source: binary.proto package wire import ( + timestamp "github.com/golang/protobuf/ptypes/timestamp" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" - timestamppb "google.golang.org/protobuf/types/known/timestamppb" reflect "reflect" sync "sync" ) @@ -139,12 +139,12 @@ type Span struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Context *SpanContext `protobuf:"bytes,1,opt,name=context,proto3" json:"context,omitempty"` // [(gogoproto.nullable) = false]; - ParentSpanID uint64 `protobuf:"varint,2,opt,name=ParentSpanID,proto3" json:"ParentSpanID,omitempty"` - Name string `protobuf:"bytes,3,opt,name=name,proto3" json:"name,omitempty"` - Start *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=Start,proto3" json:"Start,omitempty"` // [(gogoproto.customname) = "Start", (gogoproto.stdtime) = true, (gogoproto.nullable) = false]; - Labels []string `protobuf:"bytes,5,rep,name=labels,proto3" json:"labels,omitempty"` - Fields []*Field `protobuf:"bytes,6,rep,name=fields,proto3" json:"fields,omitempty"` // [(gogoproto.nullable) = false]; + Context *SpanContext `protobuf:"bytes,1,opt,name=context,proto3" json:"context,omitempty"` // [(gogoproto.nullable) = false]; + ParentSpanID uint64 `protobuf:"varint,2,opt,name=ParentSpanID,proto3" json:"ParentSpanID,omitempty"` + Name string `protobuf:"bytes,3,opt,name=name,proto3" json:"name,omitempty"` + Start *timestamp.Timestamp `protobuf:"bytes,4,opt,name=Start,proto3" json:"Start,omitempty"` // [(gogoproto.customname) = "Start", (gogoproto.stdtime) = true, (gogoproto.nullable) = false]; + Labels []string `protobuf:"bytes,5,rep,name=labels,proto3" json:"labels,omitempty"` + Fields []*Field `protobuf:"bytes,6,rep,name=fields,proto3" json:"fields,omitempty"` // [(gogoproto.nullable) = false]; } func (x *Span) Reset() { @@ -200,7 +200,7 @@ func (x *Span) GetName() string { return "" } -func (x *Span) GetStart() *timestamppb.Timestamp { +func (x *Span) GetStart() *timestamp.Timestamp { if x != nil { return x.Start } @@ -428,12 +428,12 @@ func file_binary_proto_rawDescGZIP() []byte { var file_binary_proto_enumTypes = make([]protoimpl.EnumInfo, 1) var file_binary_proto_msgTypes = make([]protoimpl.MessageInfo, 4) var file_binary_proto_goTypes = []interface{}{ - (FieldType)(0), // 0: wire.FieldType - (*SpanContext)(nil), // 1: wire.SpanContext - (*Span)(nil), // 2: wire.Span - (*Trace)(nil), // 3: wire.Trace - (*Field)(nil), // 4: wire.Field - (*timestamppb.Timestamp)(nil), // 5: google.protobuf.Timestamp + (FieldType)(0), // 0: wire.FieldType + (*SpanContext)(nil), // 1: wire.SpanContext + (*Span)(nil), // 2: wire.Span + (*Trace)(nil), // 3: wire.Trace + (*Field)(nil), // 4: wire.Field + (*timestamp.Timestamp)(nil), // 5: google.protobuf.Timestamp } var file_binary_proto_depIdxs = []int32{ 1, // 0: wire.Span.context:type_name -> wire.SpanContext diff --git a/storage/reads/datatypes/predicate.pb.go b/storage/reads/datatypes/predicate.pb.go index e2d5c425333..f7caac90c02 100644 --- a/storage/reads/datatypes/predicate.pb.go +++ b/storage/reads/datatypes/predicate.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.27.1 -// protoc v3.17.3 +// protoc v3.6.1 // source: predicate.proto package datatypes diff --git a/storage/reads/datatypes/storage_common.pb.go b/storage/reads/datatypes/storage_common.pb.go index 7cf848606b2..717e632f255 100644 --- a/storage/reads/datatypes/storage_common.pb.go +++ b/storage/reads/datatypes/storage_common.pb.go @@ -1,15 +1,15 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.27.1 -// protoc v3.17.3 +// protoc v3.6.1 // source: storage_common.proto package datatypes import ( + any1 "github.com/golang/protobuf/ptypes/any" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" - anypb "google.golang.org/protobuf/types/known/anypb" reflect "reflect" sync "sync" ) @@ -355,7 +355,7 @@ type ReadFilterRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - ReadSource *anypb.Any `protobuf:"bytes,1,opt,name=ReadSource,proto3" json:"ReadSource,omitempty"` + ReadSource *any1.Any `protobuf:"bytes,1,opt,name=ReadSource,proto3" json:"ReadSource,omitempty"` Range *TimestampRange `protobuf:"bytes,2,opt,name=range,proto3" json:"range,omitempty"` Predicate *Predicate `protobuf:"bytes,3,opt,name=predicate,proto3" json:"predicate,omitempty"` } @@ -392,7 +392,7 @@ func (*ReadFilterRequest) Descriptor() ([]byte, []int) { return file_storage_common_proto_rawDescGZIP(), []int{0} } -func (x *ReadFilterRequest) GetReadSource() *anypb.Any { +func (x *ReadFilterRequest) GetReadSource() *any1.Any { if x != nil { return x.ReadSource } @@ -418,7 +418,7 @@ type ReadGroupRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - ReadSource *anypb.Any `protobuf:"bytes,1,opt,name=ReadSource,proto3" json:"ReadSource,omitempty"` + ReadSource *any1.Any `protobuf:"bytes,1,opt,name=ReadSource,proto3" json:"ReadSource,omitempty"` Range *TimestampRange `protobuf:"bytes,2,opt,name=range,proto3" json:"range,omitempty"` Predicate *Predicate `protobuf:"bytes,3,opt,name=predicate,proto3" json:"predicate,omitempty"` // GroupKeys specifies a list of tag keys used to order the data. @@ -461,7 +461,7 @@ func (*ReadGroupRequest) Descriptor() ([]byte, []int) { return file_storage_common_proto_rawDescGZIP(), []int{1} } -func (x *ReadGroupRequest) GetReadSource() *anypb.Any { +func (x *ReadGroupRequest) GetReadSource() *any1.Any { if x != nil { return x.ReadSource } @@ -823,7 +823,7 @@ type TagKeysRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - TagsSource *anypb.Any `protobuf:"bytes,1,opt,name=TagsSource,proto3" json:"TagsSource,omitempty"` + TagsSource *any1.Any `protobuf:"bytes,1,opt,name=TagsSource,proto3" json:"TagsSource,omitempty"` Range *TimestampRange `protobuf:"bytes,2,opt,name=range,proto3" json:"range,omitempty"` Predicate *Predicate `protobuf:"bytes,3,opt,name=predicate,proto3" json:"predicate,omitempty"` } @@ -860,7 +860,7 @@ func (*TagKeysRequest) Descriptor() ([]byte, []int) { return file_storage_common_proto_rawDescGZIP(), []int{8} } -func (x *TagKeysRequest) GetTagsSource() *anypb.Any { +func (x *TagKeysRequest) GetTagsSource() *any1.Any { if x != nil { return x.TagsSource } @@ -887,7 +887,7 @@ type TagValuesRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - TagsSource *anypb.Any `protobuf:"bytes,1,opt,name=TagsSource,proto3" json:"TagsSource,omitempty"` + TagsSource *any1.Any `protobuf:"bytes,1,opt,name=TagsSource,proto3" json:"TagsSource,omitempty"` Range *TimestampRange `protobuf:"bytes,2,opt,name=range,proto3" json:"range,omitempty"` Predicate *Predicate `protobuf:"bytes,3,opt,name=predicate,proto3" json:"predicate,omitempty"` TagKey string `protobuf:"bytes,4,opt,name=tag_key,json=tagKey,proto3" json:"tag_key,omitempty"` @@ -925,7 +925,7 @@ func (*TagValuesRequest) Descriptor() ([]byte, []int) { return file_storage_common_proto_rawDescGZIP(), []int{9} } -func (x *TagValuesRequest) GetTagsSource() *anypb.Any { +func (x *TagValuesRequest) GetTagsSource() *any1.Any { if x != nil { return x.TagsSource } @@ -958,7 +958,7 @@ type ReadSeriesCardinalityRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - ReadSource *anypb.Any `protobuf:"bytes,1,opt,name=ReadSource,proto3" json:"ReadSource,omitempty"` + ReadSource *any1.Any `protobuf:"bytes,1,opt,name=ReadSource,proto3" json:"ReadSource,omitempty"` Range *TimestampRange `protobuf:"bytes,2,opt,name=range,proto3" json:"range,omitempty"` Predicate *Predicate `protobuf:"bytes,3,opt,name=predicate,proto3" json:"predicate,omitempty"` } @@ -995,7 +995,7 @@ func (*ReadSeriesCardinalityRequest) Descriptor() ([]byte, []int) { return file_storage_common_proto_rawDescGZIP(), []int{10} } -func (x *ReadSeriesCardinalityRequest) GetReadSource() *anypb.Any { +func (x *ReadSeriesCardinalityRequest) GetReadSource() *any1.Any { if x != nil { return x.ReadSource } @@ -1071,7 +1071,7 @@ type MeasurementNamesRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Source *anypb.Any `protobuf:"bytes,1,opt,name=source,proto3" json:"source,omitempty"` + Source *any1.Any `protobuf:"bytes,1,opt,name=source,proto3" json:"source,omitempty"` Range *TimestampRange `protobuf:"bytes,2,opt,name=range,proto3" json:"range,omitempty"` Predicate *Predicate `protobuf:"bytes,3,opt,name=predicate,proto3" json:"predicate,omitempty"` } @@ -1108,7 +1108,7 @@ func (*MeasurementNamesRequest) Descriptor() ([]byte, []int) { return file_storage_common_proto_rawDescGZIP(), []int{12} } -func (x *MeasurementNamesRequest) GetSource() *anypb.Any { +func (x *MeasurementNamesRequest) GetSource() *any1.Any { if x != nil { return x.Source } @@ -1135,7 +1135,7 @@ type MeasurementTagKeysRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Source *anypb.Any `protobuf:"bytes,1,opt,name=source,proto3" json:"source,omitempty"` + Source *any1.Any `protobuf:"bytes,1,opt,name=source,proto3" json:"source,omitempty"` Measurement string `protobuf:"bytes,2,opt,name=measurement,proto3" json:"measurement,omitempty"` Range *TimestampRange `protobuf:"bytes,3,opt,name=range,proto3" json:"range,omitempty"` Predicate *Predicate `protobuf:"bytes,4,opt,name=predicate,proto3" json:"predicate,omitempty"` @@ -1173,7 +1173,7 @@ func (*MeasurementTagKeysRequest) Descriptor() ([]byte, []int) { return file_storage_common_proto_rawDescGZIP(), []int{13} } -func (x *MeasurementTagKeysRequest) GetSource() *anypb.Any { +func (x *MeasurementTagKeysRequest) GetSource() *any1.Any { if x != nil { return x.Source } @@ -1207,7 +1207,7 @@ type MeasurementTagValuesRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Source *anypb.Any `protobuf:"bytes,1,opt,name=source,proto3" json:"source,omitempty"` + Source *any1.Any `protobuf:"bytes,1,opt,name=source,proto3" json:"source,omitempty"` Measurement string `protobuf:"bytes,2,opt,name=measurement,proto3" json:"measurement,omitempty"` TagKey string `protobuf:"bytes,3,opt,name=tag_key,json=tagKey,proto3" json:"tag_key,omitempty"` Range *TimestampRange `protobuf:"bytes,4,opt,name=range,proto3" json:"range,omitempty"` @@ -1246,7 +1246,7 @@ func (*MeasurementTagValuesRequest) Descriptor() ([]byte, []int) { return file_storage_common_proto_rawDescGZIP(), []int{14} } -func (x *MeasurementTagValuesRequest) GetSource() *anypb.Any { +func (x *MeasurementTagValuesRequest) GetSource() *any1.Any { if x != nil { return x.Source } @@ -1287,7 +1287,7 @@ type MeasurementFieldsRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Source *anypb.Any `protobuf:"bytes,1,opt,name=source,proto3" json:"source,omitempty"` + Source *any1.Any `protobuf:"bytes,1,opt,name=source,proto3" json:"source,omitempty"` Measurement string `protobuf:"bytes,2,opt,name=measurement,proto3" json:"measurement,omitempty"` Range *TimestampRange `protobuf:"bytes,3,opt,name=range,proto3" json:"range,omitempty"` Predicate *Predicate `protobuf:"bytes,4,opt,name=predicate,proto3" json:"predicate,omitempty"` @@ -1325,7 +1325,7 @@ func (*MeasurementFieldsRequest) Descriptor() ([]byte, []int) { return file_storage_common_proto_rawDescGZIP(), []int{15} } -func (x *MeasurementFieldsRequest) GetSource() *anypb.Any { +func (x *MeasurementFieldsRequest) GetSource() *any1.Any { if x != nil { return x.Source } @@ -1406,7 +1406,7 @@ type ReadWindowAggregateRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - ReadSource *anypb.Any `protobuf:"bytes,1,opt,name=ReadSource,proto3" json:"ReadSource,omitempty"` + ReadSource *any1.Any `protobuf:"bytes,1,opt,name=ReadSource,proto3" json:"ReadSource,omitempty"` Range *TimestampRange `protobuf:"bytes,2,opt,name=range,proto3" json:"range,omitempty"` Predicate *Predicate `protobuf:"bytes,3,opt,name=predicate,proto3" json:"predicate,omitempty"` WindowEvery int64 `protobuf:"varint,4,opt,name=WindowEvery,proto3" json:"WindowEvery,omitempty"` @@ -1447,7 +1447,7 @@ func (*ReadWindowAggregateRequest) Descriptor() ([]byte, []int) { return file_storage_common_proto_rawDescGZIP(), []int{17} } -func (x *ReadWindowAggregateRequest) GetReadSource() *anypb.Any { +func (x *ReadWindowAggregateRequest) GetReadSource() *any1.Any { if x != nil { return x.ReadSource } @@ -2634,7 +2634,7 @@ var file_storage_common_proto_goTypes = []interface{}{ (*ReadResponse_StringPointsFrame)(nil), // 33: influxdata.platform.storage.ReadResponse.StringPointsFrame nil, // 34: influxdata.platform.storage.CapabilitiesResponse.CapsEntry (*MeasurementFieldsResponse_MessageField)(nil), // 35: influxdata.platform.storage.MeasurementFieldsResponse.MessageField - (*anypb.Any)(nil), // 36: google.protobuf.Any + (*any1.Any)(nil), // 36: google.protobuf.Any (*Predicate)(nil), // 37: influxdata.platform.storage.Predicate } var file_storage_common_proto_depIdxs = []int32{ diff --git a/tsdb/internal/fieldsindex.pb.go b/tsdb/internal/fieldsindex.pb.go index fda9712b220..0bd4817975d 100644 --- a/tsdb/internal/fieldsindex.pb.go +++ b/tsdb/internal/fieldsindex.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.27.1 -// protoc v3.17.3 +// protoc v3.6.1 // source: internal/fieldsindex.proto package tsdb diff --git a/v1/services/meta/internal/meta.pb.go b/v1/services/meta/internal/meta.pb.go index 0dc03ad46d7..ed7b3cffb77 100644 --- a/v1/services/meta/internal/meta.pb.go +++ b/v1/services/meta/internal/meta.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.27.1 -// protoc v3.17.3 +// protoc v3.6.1 // source: internal/meta.proto package meta diff --git a/v1/services/storage/source.pb.go b/v1/services/storage/source.pb.go index d459a7812f5..37c248b423d 100644 --- a/v1/services/storage/source.pb.go +++ b/v1/services/storage/source.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.27.1 -// protoc v3.17.3 +// protoc v3.6.1 // source: source.proto package storage