From e4a02f4d0b7a88e48920281da595a68e93387584 Mon Sep 17 00:00:00 2001 From: Michael Hoffmann Date: Tue, 26 Mar 2024 15:05:43 +0100 Subject: [PATCH] receive: try to fix pool race Signed-off-by: Michael Hoffmann --- .golangci.yml | 1 + pkg/receive/handler.go | 20 ++-- pkg/receive/writer.go | 4 +- .../storepb/remotewritepb/rpc_vtproto.pb.go | 97 ------------------- scripts/genproto.sh | 4 +- 5 files changed, 13 insertions(+), 113 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 14a5d412039..e97dd2c6e8c 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -17,6 +17,7 @@ run: skip-dirs: - vendor - internal/cortex + - pkg/store/storepb/remotewritepb # output configuration options diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 8079a94bbc1..131749ff7aa 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -825,11 +825,7 @@ func (h *Handler) sendLocalWrite( span.SetTag("endpoint", writeDestination.endpoint) span.SetTag("replica", writeDestination.replica) - wreq := remotewritepb.WriteRequestFromVTPool() - wreq.Timeseries = trackedSeries.timeSeries - defer wreq.ReturnToVTPool() - - err := h.writer.Write(tracingCtx, tenant, wreq) + err := h.writer.Write(tracingCtx, tenant, trackedSeries.timeSeries) if err != nil { span.SetTag("error", true) span.SetTag("error.msg", err.Error()) @@ -864,15 +860,15 @@ func (h *Handler) sendRemoteWrite( // This is called "real" because it's 1-indexed. realReplicationIndex := int64(endpointReplica.replica + 1) - wreq := remotewritepb.StoreWriteRequestFromVTPool() - wreq.Timeseries = trackedSeries.timeSeries - wreq.Tenant = tenant - // Increment replica since on-the-wire format is 1-indexed and 0 indicates un-replicated. - wreq.Replica = realReplicationIndex + wreq := &remotewritepb.StoreWriteRequest{ + Timeseries: trackedSeries.timeSeries, + Tenant: tenant, + // Increment replica since on-the-wire format is 1-indexed and 0 indicates un-replicated. + Replica: realReplicationIndex, + } // Actually make the request against the endpoint we determined should handle these time series. cl.RemoteWriteAsync(ctx, wreq, endpointReplica, trackedSeries.seriesIDs, responses, func(err error) { - wreq.ReturnToVTPool() if err == nil { h.forwardRequests.WithLabelValues(labelSuccess).Inc() if !alreadyReplicated { @@ -908,8 +904,6 @@ func quorumReached(successes []int, successThreshold int) bool { // RemoteWrite implements the gRPC remote write handler for storepb.WriteableStore. func (h *Handler) RemoteWrite(ctx context.Context, r *remotewritepb.StoreWriteRequest) (*remotewritepb.StoreWriteResponse, error) { - defer r.ReturnToVTPool() - span, ctx := tracing.StartSpan(ctx, "receive_grpc") defer span.Finish() diff --git a/pkg/receive/writer.go b/pkg/receive/writer.go index d14bc64363a..be164bf7bb0 100644 --- a/pkg/receive/writer.go +++ b/pkg/receive/writer.go @@ -72,7 +72,7 @@ func NewWriter(logger log.Logger, multiTSDB TenantStorage, opts *WriterOptions) } } -func (r *Writer) Write(ctx context.Context, tenantID string, wreq *remotewritepb.WriteRequest) error { +func (r *Writer) Write(ctx context.Context, tenantID string, timeseries []*remotewritepb.TimeSeries) error { tLogger := log.With(r.logger, "tenant", tenantID) var ( @@ -112,7 +112,7 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *remotewritepb tooFarInFuture: r.opts.TooFarInFutureTimeWindow, Appender: app, } - for _, t := range wreq.Timeseries { + for _, t := range timeseries { // Check if time series labels are valid. If not, skip the time series // and report the error. lset := remotewritepb.LabelsToPromLabels(t.Labels) diff --git a/pkg/store/storepb/remotewritepb/rpc_vtproto.pb.go b/pkg/store/storepb/remotewritepb/rpc_vtproto.pb.go index 9d357cf0bc3..c3ddb704ce1 100644 --- a/pkg/store/storepb/remotewritepb/rpc_vtproto.pb.go +++ b/pkg/store/storepb/remotewritepb/rpc_vtproto.pb.go @@ -5,13 +5,9 @@ package remotewritepb import ( - context "context" binary "encoding/binary" fmt "fmt" protohelpers "github.com/planetscale/vtprotobuf/protohelpers" - grpc "google.golang.org/grpc" - codes "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" proto "google.golang.org/protobuf/proto" protoimpl "google.golang.org/protobuf/runtime/protoimpl" io "io" @@ -437,99 +433,6 @@ func (m *ChunkedSeries) CloneMessageVT() proto.Message { return m.CloneVT() } -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.32.0 or later. -const _ = grpc.SupportPackageIsVersion7 - -// WriteableStoreClient is the client API for WriteableStore service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. -type WriteableStoreClient interface { - // WriteRequest allows you to write metrics to this store via remote write - RemoteWrite(ctx context.Context, in *StoreWriteRequest, opts ...grpc.CallOption) (*StoreWriteResponse, error) -} - -type writeableStoreClient struct { - cc grpc.ClientConnInterface -} - -func NewWriteableStoreClient(cc grpc.ClientConnInterface) WriteableStoreClient { - return &writeableStoreClient{cc} -} - -func (c *writeableStoreClient) RemoteWrite(ctx context.Context, in *StoreWriteRequest, opts ...grpc.CallOption) (*StoreWriteResponse, error) { - out := new(StoreWriteResponse) - err := c.cc.Invoke(ctx, "/thanos.WriteableStore/RemoteWrite", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -// WriteableStoreServer is the server API for WriteableStore service. -// All implementations must embed UnimplementedWriteableStoreServer -// for forward compatibility -type WriteableStoreServer interface { - // WriteRequest allows you to write metrics to this store via remote write - RemoteWrite(context.Context, *StoreWriteRequest) (*StoreWriteResponse, error) - mustEmbedUnimplementedWriteableStoreServer() -} - -// UnimplementedWriteableStoreServer must be embedded to have forward compatible implementations. -type UnimplementedWriteableStoreServer struct { -} - -func (UnimplementedWriteableStoreServer) RemoteWrite(context.Context, *StoreWriteRequest) (*StoreWriteResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method RemoteWrite not implemented") -} -func (UnimplementedWriteableStoreServer) mustEmbedUnimplementedWriteableStoreServer() {} - -// UnsafeWriteableStoreServer may be embedded to opt out of forward compatibility for this service. -// Use of this interface is not recommended, as added methods to WriteableStoreServer will -// result in compilation errors. -type UnsafeWriteableStoreServer interface { - mustEmbedUnimplementedWriteableStoreServer() -} - -func RegisterWriteableStoreServer(s grpc.ServiceRegistrar, srv WriteableStoreServer) { - s.RegisterService(&WriteableStore_ServiceDesc, srv) -} - -func _WriteableStore_RemoteWrite_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := StoreWriteRequestFromVTPool() - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(WriteableStoreServer).RemoteWrite(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/thanos.WriteableStore/RemoteWrite", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(WriteableStoreServer).RemoteWrite(ctx, req.(*StoreWriteRequest)) - } - return interceptor(ctx, in, info, handler) -} - -// WriteableStore_ServiceDesc is the grpc.ServiceDesc for WriteableStore service. -// It's only intended for direct use with grpc.RegisterService, -// and not to be introspected or modified (even as a copy) -var WriteableStore_ServiceDesc = grpc.ServiceDesc{ - ServiceName: "thanos.WriteableStore", - HandlerType: (*WriteableStoreServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "RemoteWrite", - Handler: _WriteableStore_RemoteWrite_Handler, - }, - }, - Streams: []grpc.StreamDesc{}, - Metadata: "store/storepb/remotewritepb/rpc.proto", -} - func (m *StoreWriteResponse) MarshalVT() (dAtA []byte, err error) { if m == nil { return nil, nil diff --git a/scripts/genproto.sh b/scripts/genproto.sh index 87cd3e0ccbd..47fc73d94be 100755 --- a/scripts/genproto.sh +++ b/scripts/genproto.sh @@ -62,9 +62,11 @@ for dir in ${DIRS}; do ${PROTOC_BIN} \ --go_out=paths=source_relative:. \ --plugin protoc-gen-go=${PROTOC_GEN_GO_BIN} \ + --go-grpc_out=paths=source_relative:. \ + --plugin protoc-gen-go-grpc=${PROTOC_GEN_GO_GRPC_BIN} \ --go-vtproto_out=paths=source_relative:. \ --plugin protoc-gen-go-vtproto=${PROTOC_GEN_GO_VTPROTO_BIN} \ - --go-vtproto_opt=features=grpc+marshal+unmarshal+size+pool+clone \ + --go-vtproto_opt=features=marshal+unmarshal+size+pool+clone \ -I=. \ -I="${VTPROTO_PATH}" \ ${dir}/*.proto