From d432a7197a8c26d25950fd1763229c9a5fb3035e Mon Sep 17 00:00:00 2001 From: Clement Sam Date: Tue, 26 Sep 2023 12:49:34 +0000 Subject: [PATCH] storagenode/piecestore: notify if download piece was restored from trash Updates https://github.com/storj/storj/issues/6146 Change-Id: Iece285eb5ecb6898b29096416ab10e43338480b0 --- storagenode/piecestore/endpoint.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/storagenode/piecestore/endpoint.go b/storagenode/piecestore/endpoint.go index 9ddfd5cf3ba7..6ad4e575f8af 100644 --- a/storagenode/piecestore/endpoint.go +++ b/storagenode/piecestore/endpoint.go @@ -665,6 +665,7 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err close(downloadedBytes) }() + restoredFromTrash := false pieceReader, err = endpoint.store.Reader(ctx, limit.SatelliteId, limit.PieceId) if err != nil { if !errs.Is(err, fs.ErrNotExist) { @@ -683,6 +684,7 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err endpoint.monitor.VerifyDirReadableLoop.TriggerWait() return rpcstatus.Wrap(rpcstatus.NotFound, tryRestoreErr) } + restoredFromTrash = true mon.Meter("download_file_in_trash", monkit.NewSeriesTag("namespace", limit.SatelliteId.String()), monkit.NewSeriesTag("piece_id", limit.PieceId.String())).Mark(1) endpoint.log.Warn("file found in trash", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.String("Remote Address", remoteAddr)) @@ -713,12 +715,21 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err } err = rpctimeout.Run(ctx, endpoint.config.StreamOperationTimeout, func(_ context.Context) (err error) { - return stream.Send(&pb.PieceDownloadResponse{Hash: &pieceHash, Limit: &orderLimit}) + return stream.Send(&pb.PieceDownloadResponse{Hash: &pieceHash, Limit: &orderLimit, RestoredFromTrash: restoredFromTrash}) }) if err != nil { endpoint.log.Error("error sending hash and order limit", zap.Error(err)) return rpcstatus.Wrap(rpcstatus.Internal, err) } + } else if restoredFromTrash { + // notify that the piece was restored from trash + err = rpctimeout.Run(ctx, endpoint.config.StreamOperationTimeout, func(_ context.Context) (err error) { + return stream.Send(&pb.PieceDownloadResponse{RestoredFromTrash: restoredFromTrash}) + }) + if err != nil { + endpoint.log.Error("error sending response", zap.Error(err)) + return rpcstatus.Wrap(rpcstatus.Internal, err) + } } // TODO: verify chunk.Size behavior logic with regards to reading all