Skip to content

Commit

Permalink
storagenode/piecestore: better monkit metric for download
Browse files Browse the repository at this point in the history
Download is server from two goroutines:

 * one is waiting for the orders (and updates the actual limit)
 * other one sends the valuable bytes back to the client (in case the actual order is big enough)

These two tasks are syncrhonized with the help of a `sync2.NewThrottle()`

But all of these happens in the same method, therefore we have no idea how much time is spent on waiting for next orders
 (throttle can wait until we receive new orderlimit), and how much time is spent with actual work.

This patch moves the actual work (after sending routine is waked up) to a separated method to have better visibility and measure the actual work (read data + send it).

Change-Id: Ia5068c544560a53bc2fcea6cb6fce85cfacbd95b
  • Loading branch information
elek authored and Storj Robot committed Mar 30, 2023
1 parent bef60f8 commit 00420b5
Showing 1 changed file with 38 additions and 29 deletions.
67 changes: 38 additions & 29 deletions storagenode/piecestore/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,35 +711,9 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err
return nil // We don't need to return an error when client cancels.
}

chunkData := make([]byte, chunkSize)
_, err = pieceReader.Seek(currentOffset, io.SeekStart)
if err != nil {
endpoint.log.Error("error seeking on piecereader", zap.Error(err))
return rpcstatus.Wrap(rpcstatus.Internal, err)
}

// ReadFull is required to ensure we are sending the right amount of data.
_, err = io.ReadFull(pieceReader, chunkData)
if err != nil {
endpoint.log.Error("error reading from piecereader", zap.Error(err))
return rpcstatus.Wrap(rpcstatus.Internal, err)
}

err = rpctimeout.Run(ctx, endpoint.config.StreamOperationTimeout, func(_ context.Context) (err error) {
return stream.Send(&pb.PieceDownloadResponse{
Chunk: &pb.PieceDownloadResponse_Chunk{
Offset: currentOffset,
Data: chunkData,
},
})
})
if errs.Is(err, io.EOF) {
// err is io.EOF when uplink asked for a piece, but decided not to retrieve it,
// no need to propagate it
return nil
}
if err != nil {
return rpcstatus.Wrap(rpcstatus.Internal, err)
done, err := endpoint.sendData(ctx, stream, pieceReader, currentOffset, chunkSize)
if err != nil || done {
return err
}

currentOffset += chunkSize
Expand Down Expand Up @@ -810,6 +784,41 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err
return rpcstatus.Wrap(rpcstatus.Internal, errs.Combine(sendErr, recvErr))
}

func (endpoint *Endpoint) sendData(ctx context.Context, stream pb.DRPCPiecestore_DownloadStream, pieceReader *pieces.Reader, currentOffset int64, chunkSize int64) (result bool, err error) {
defer mon.Task()(&ctx)(&err)
chunkData := make([]byte, chunkSize)
_, err = pieceReader.Seek(currentOffset, io.SeekStart)
if err != nil {
endpoint.log.Error("error seeking on piecereader", zap.Error(err))
return true, rpcstatus.Wrap(rpcstatus.Internal, err)
}

// ReadFull is required to ensure we are sending the right amount of data.
_, err = io.ReadFull(pieceReader, chunkData)
if err != nil {
endpoint.log.Error("error reading from piecereader", zap.Error(err))
return true, rpcstatus.Wrap(rpcstatus.Internal, err)
}

err = rpctimeout.Run(ctx, endpoint.config.StreamOperationTimeout, func(_ context.Context) (err error) {
return stream.Send(&pb.PieceDownloadResponse{
Chunk: &pb.PieceDownloadResponse_Chunk{
Offset: currentOffset,
Data: chunkData,
},
})
})
if errs.Is(err, io.EOF) {
// err is io.EOF when uplink asked for a piece, but decided not to retrieve it,
// no need to propagate it
return true, nil
}
if err != nil {
return true, rpcstatus.Wrap(rpcstatus.Internal, err)
}
return false, nil
}

// beginSaveOrder saves the order with all necessary information. It assumes it has been already verified.
func (endpoint *Endpoint) beginSaveOrder(limit *pb.OrderLimit) (_commit func(ctx context.Context, order *pb.Order), err error) {
defer mon.Task()(nil)(&err)
Expand Down

0 comments on commit 00420b5

Please sign in to comment.