Skip to content

Commit

Permalink
satellite/metainfo: implement CompressedBatch
Browse files Browse the repository at this point in the history
this implements the CompressedBatch rpc by
decompressing an incoming request, dispatching
to Batch, then compressing the response with
the supported compression scheme.

this also adds DownloadObject to the set of
rpcs supported by the Batch endpoint so that
it can be used compressed.

Change-Id: If7444e7a66eb5c138d405f5c28f7f0031154911c
  • Loading branch information
zeebo committed Mar 14, 2024
1 parent d6adf9b commit 9b05b01
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 7 deletions.
4 changes: 2 additions & 2 deletions go.mod
Expand Up @@ -21,6 +21,7 @@ require (
github.com/jtolds/monkit-hw/v2 v2.0.0-20191108235325-141a0da276b3
github.com/jtolio/mito v0.0.0-20230523171229-d78ef06bb77b
github.com/jtolio/noiseconn v0.0.0-20230301220541-88105e6c8ac6
github.com/klauspost/compress v1.17.0
github.com/loov/hrtime v1.0.3
github.com/mattn/go-sqlite3 v1.14.19
github.com/nsf/jsondiff v0.0.0-20200515183724-f29ed568f4ce
Expand Down Expand Up @@ -58,7 +59,7 @@ require (
golang.org/x/time v0.5.0
gopkg.in/segmentio/analytics-go.v3 v3.1.0
gopkg.in/yaml.v3 v3.0.1
storj.io/common v0.0.0-20240307142546-4b79a61f8a8e
storj.io/common v0.0.0-20240312163747-de28b7045716
storj.io/drpc v0.0.33
storj.io/eventkit v0.0.0-20240306141230-6cb545e5f892
storj.io/monkit-jaeger v0.0.0-20240221095020-52b0792fa6cd
Expand Down Expand Up @@ -111,7 +112,6 @@ require (
github.com/jtolds/tracetagger/v2 v2.0.0-rc5 // indirect
github.com/jtolio/crawlspace v0.0.0-20231116162947-3ec5cc6b36c5 // indirect
github.com/jtolio/crawlspace/tools v0.0.0-20231116162947-3ec5cc6b36c5 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Expand Up @@ -886,8 +886,8 @@ honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt
sourcegraph.com/sourcegraph/go-diff v0.5.0/go.mod h1:kuch7UrkMzY0X+p9CRK03kfuPQ2zzQcaEFbx8wA8rck=
sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4/go.mod h1:ketZ/q3QxT9HOBeFhu6RdvsftgpsbFHBF5Cas6cDKZ0=
storj.io/common v0.0.0-20220719163320-cd2ef8e1b9b0/go.mod h1:mCYV6Ud5+cdbuaxdPD5Zht/HYaIn0sffnnws9ErkrMQ=
storj.io/common v0.0.0-20240307142546-4b79a61f8a8e h1:R6AhBE/MSH9f7fjEmHSM+Jdxcmrp+ppOjIiRsc1R6oc=
storj.io/common v0.0.0-20240307142546-4b79a61f8a8e/go.mod h1:MFl009RHY4tIqySVNy/6EmgRw2q60d26h9N/nb7JxGU=
storj.io/common v0.0.0-20240312163747-de28b7045716 h1:V3jSwIiO1O8KtihdhC4vNhtr0BYUKNkJKFI6fvdPKiA=
storj.io/common v0.0.0-20240312163747-de28b7045716/go.mod h1:MFl009RHY4tIqySVNy/6EmgRw2q60d26h9N/nb7JxGU=
storj.io/drpc v0.0.32/go.mod h1:6rcOyR/QQkSTX/9L5ZGtlZaE2PtXTTZl8d+ulSeeYEg=
storj.io/drpc v0.0.33 h1:yCGZ26r66ZdMP0IcTYsj7WDAUIIjzXk6DJhbhvt9FHI=
storj.io/drpc v0.0.33/go.mod h1:vR804UNzhBa49NOJ6HeLjd2H3MakC1j5Gv8bsOQT6N4=
Expand Down
60 changes: 60 additions & 0 deletions satellite/metainfo/batch.go
Expand Up @@ -8,11 +8,57 @@ import (

"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/exp/slices"

"storj.io/common/pb"
"storj.io/common/storj"
)

// CompressedBatch handles requests sent in batch that are compressed.
func (endpoint *Endpoint) CompressedBatch(ctx context.Context, req *pb.CompressedBatchRequest) (resp *pb.CompressedBatchResponse, err error) {
defer mon.Task()(&ctx)(&err)

var reqData []byte
switch req.Selected {
case pb.CompressedBatchRequest_NONE:
reqData = req.Data
case pb.CompressedBatchRequest_ZSTD:
reqData, err = endpoint.zstdDecoder.DecodeAll(req.Data, nil)
default:
err = errs.New("unsupported compression")
}
if err != nil {
return nil, errs.Wrap(err)
}

var unReq pb.BatchRequest
err = pb.Unmarshal(reqData, &unReq)
if err != nil {
return nil, errs.Wrap(err)
}

unResp, err := endpoint.Batch(ctx, &unReq)
if err != nil {
return nil, errs.Wrap(err)
}

unrespData, err := pb.Marshal(unResp)
if err != nil {
return nil, errs.Wrap(err)
}

resp = new(pb.CompressedBatchResponse)
if slices.Contains(req.Supported, pb.CompressedBatchRequest_ZSTD) {
resp.Data = endpoint.zstdEncoder.EncodeAll(unrespData, nil)
resp.Selected = pb.CompressedBatchRequest_ZSTD
} else {
resp.Data = unrespData
resp.Selected = pb.CompressedBatchRequest_NONE
}

return resp, nil
}

// Batch handle requests sent in batch.
func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp *pb.BatchResponse, err error) {
defer mon.Task()(&ctx)(&err)
Expand Down Expand Up @@ -181,6 +227,20 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp
if response != nil && response.Object != nil {
lastStreamID = response.Object.StreamId
}
case *pb.BatchRequestItem_ObjectDownload:
singleRequest.ObjectDownload.Header = req.Header
response, err := endpoint.DownloadObject(ctx, singleRequest.ObjectDownload)
if err != nil {
return resp, err
}
resp.Responses = append(resp.Responses, &pb.BatchResponseItem{
Response: &pb.BatchResponseItem_ObjectDownload{
ObjectDownload: response,
},
})
if response != nil && response.Object != nil {
lastStreamID = response.Object.StreamId
}
case *pb.BatchRequestItem_ObjectList:
singleRequest.ObjectList.Header = req.Header
response, err := endpoint.ListObjects(ctx, singleRequest.ObjectList)
Expand Down
17 changes: 17 additions & 0 deletions satellite/metainfo/endpoint.go
Expand Up @@ -10,6 +10,7 @@ import (
"strings"
"time"

"github.com/klauspost/compress/zstd"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
Expand Down Expand Up @@ -82,6 +83,8 @@ type Endpoint struct {
defaultRS *pb.RedundancyScheme
config ExtendedConfig
versionCollector *versionCollector
zstdDecoder *zstd.Decoder
zstdEncoder *zstd.Encoder
}

// NewEndpoint creates new metainfo endpoint instance.
Expand Down Expand Up @@ -113,6 +116,18 @@ func NewEndpoint(log *zap.Logger, buckets *buckets.Service, metabaseDB *metabase
ErasureShareSize: config.RS.ErasureShareSize.Int32(),
}

decoder, err := zstd.NewReader(nil,
zstd.WithDecoderMaxMemory(64<<20),
)
if err != nil {
return nil, errs.Wrap(err)
}

encoder, err := zstd.NewWriter(nil)
if err != nil {
return nil, errs.Wrap(err)
}

return &Endpoint{
log: log,
buckets: buckets,
Expand All @@ -139,6 +154,8 @@ func NewEndpoint(log *zap.Logger, buckets *buckets.Service, metabaseDB *metabase
defaultRS: defaultRSScheme,
config: extendedConfig,
versionCollector: newVersionCollector(log),
zstdDecoder: decoder,
zstdEncoder: encoder,
}, nil
}

Expand Down
3 changes: 2 additions & 1 deletion testsuite/storjscan/go.mod
Expand Up @@ -9,7 +9,7 @@ require (
github.com/zeebo/errs v1.3.0
go.uber.org/zap v1.27.0
golang.org/x/sync v0.6.0
storj.io/common v0.0.0-20240307142546-4b79a61f8a8e
storj.io/common v0.0.0-20240312163747-de28b7045716
storj.io/storj v1.63.1
storj.io/storjscan v0.0.0-20220926140643-1623c3b391b0
storj.io/uplink v1.12.3-0.20240227083244-7974a2e1a6c2
Expand Down Expand Up @@ -81,6 +81,7 @@ require (
github.com/jtolio/crawlspace/tools v0.0.0-20231116162947-3ec5cc6b36c5 // indirect
github.com/jtolio/mito v0.0.0-20230523171229-d78ef06bb77b // indirect
github.com/jtolio/noiseconn v0.0.0-20230301220541-88105e6c8ac6 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
Expand Down
5 changes: 3 additions & 2 deletions testsuite/storjscan/go.sum
Expand Up @@ -450,6 +450,7 @@ github.com/klauspost/compress v1.4.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0
github.com/klauspost/compress v1.10.4/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.10.10/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/cpuid/v2 v2.0.12/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c=
github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg=
Expand Down Expand Up @@ -1208,8 +1209,8 @@ rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=
sourcegraph.com/sourcegraph/go-diff v0.5.0/go.mod h1:kuch7UrkMzY0X+p9CRK03kfuPQ2zzQcaEFbx8wA8rck=
sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4/go.mod h1:ketZ/q3QxT9HOBeFhu6RdvsftgpsbFHBF5Cas6cDKZ0=
storj.io/common v0.0.0-20220719163320-cd2ef8e1b9b0/go.mod h1:mCYV6Ud5+cdbuaxdPD5Zht/HYaIn0sffnnws9ErkrMQ=
storj.io/common v0.0.0-20240307142546-4b79a61f8a8e h1:R6AhBE/MSH9f7fjEmHSM+Jdxcmrp+ppOjIiRsc1R6oc=
storj.io/common v0.0.0-20240307142546-4b79a61f8a8e/go.mod h1:MFl009RHY4tIqySVNy/6EmgRw2q60d26h9N/nb7JxGU=
storj.io/common v0.0.0-20240312163747-de28b7045716 h1:V3jSwIiO1O8KtihdhC4vNhtr0BYUKNkJKFI6fvdPKiA=
storj.io/common v0.0.0-20240312163747-de28b7045716/go.mod h1:MFl009RHY4tIqySVNy/6EmgRw2q60d26h9N/nb7JxGU=
storj.io/drpc v0.0.32/go.mod h1:6rcOyR/QQkSTX/9L5ZGtlZaE2PtXTTZl8d+ulSeeYEg=
storj.io/drpc v0.0.33 h1:yCGZ26r66ZdMP0IcTYsj7WDAUIIjzXk6DJhbhvt9FHI=
storj.io/drpc v0.0.33/go.mod h1:vR804UNzhBa49NOJ6HeLjd2H3MakC1j5Gv8bsOQT6N4=
Expand Down

0 comments on commit 9b05b01

Please sign in to comment.