Skip to content

Commit

Permalink
storagenode/piecestore: notify if download piece was restored from trash
Browse files Browse the repository at this point in the history
Updates #6146

Change-Id: Iece285eb5ecb6898b29096416ab10e43338480b0
  • Loading branch information
profclems authored and Storj Robot committed Oct 5, 2023
1 parent e072b37 commit d432a71
Showing 1 changed file with 12 additions and 1 deletion.
13 changes: 12 additions & 1 deletion storagenode/piecestore/endpoint.go
Expand Up @@ -665,6 +665,7 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err
close(downloadedBytes)
}()

restoredFromTrash := false
pieceReader, err = endpoint.store.Reader(ctx, limit.SatelliteId, limit.PieceId)
if err != nil {
if !errs.Is(err, fs.ErrNotExist) {
Expand All @@ -683,6 +684,7 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err
endpoint.monitor.VerifyDirReadableLoop.TriggerWait()
return rpcstatus.Wrap(rpcstatus.NotFound, tryRestoreErr)
}
restoredFromTrash = true
mon.Meter("download_file_in_trash", monkit.NewSeriesTag("namespace", limit.SatelliteId.String()), monkit.NewSeriesTag("piece_id", limit.PieceId.String())).Mark(1)
endpoint.log.Warn("file found in trash", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.String("Remote Address", remoteAddr))

Expand Down Expand Up @@ -713,12 +715,21 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err
}

err = rpctimeout.Run(ctx, endpoint.config.StreamOperationTimeout, func(_ context.Context) (err error) {
return stream.Send(&pb.PieceDownloadResponse{Hash: &pieceHash, Limit: &orderLimit})
return stream.Send(&pb.PieceDownloadResponse{Hash: &pieceHash, Limit: &orderLimit, RestoredFromTrash: restoredFromTrash})
})
if err != nil {
endpoint.log.Error("error sending hash and order limit", zap.Error(err))
return rpcstatus.Wrap(rpcstatus.Internal, err)
}
} else if restoredFromTrash {
// notify that the piece was restored from trash
err = rpctimeout.Run(ctx, endpoint.config.StreamOperationTimeout, func(_ context.Context) (err error) {
return stream.Send(&pb.PieceDownloadResponse{RestoredFromTrash: restoredFromTrash})
})
if err != nil {
endpoint.log.Error("error sending response", zap.Error(err))
return rpcstatus.Wrap(rpcstatus.Internal, err)
}
}

// TODO: verify chunk.Size behavior logic with regards to reading all
Expand Down

0 comments on commit d432a71

Please sign in to comment.