From 3f3dd53ec1840a5fab17bb6efd9f086cc4a6970a Mon Sep 17 00:00:00 2001 From: paul cannon Date: Sat, 3 Feb 2024 11:49:28 -0600 Subject: [PATCH] storagenode/piecestore: bit of logging cleanup Change-Id: I3bdbef8db684554eca1ed6cb8f169ff67f74be5d --- storagenode/piecestore/endpoint.go | 74 ++++++++++++++++-------------- 1 file changed, 40 insertions(+), 34 deletions(-) diff --git a/storagenode/piecestore/endpoint.go b/storagenode/piecestore/endpoint.go index 355fa38c4401..574f271d37d7 100644 --- a/storagenode/piecestore/endpoint.go +++ b/storagenode/piecestore/endpoint.go @@ -157,7 +157,10 @@ func (endpoint *Endpoint) Delete(ctx context.Context, delete *pb.PieceDeleteRequ return nil, rpcstatus.Wrap(rpcstatus.Unauthenticated, err) } - remoteAddrLogField := zap.String("Remote Address", getRemoteAddr(ctx)) + log := endpoint.log.With( + zap.Stringer("Satellite ID", delete.Limit.SatelliteId), + zap.Stringer("Piece ID", delete.Limit.PieceId), + zap.String("Remote Address", getRemoteAddr(ctx))) if err := endpoint.store.Delete(ctx, delete.Limit.SatelliteId, delete.Limit.PieceId); err != nil { // explicitly ignoring error because the errors @@ -166,9 +169,9 @@ func (endpoint *Endpoint) Delete(ctx context.Context, delete *pb.PieceDeleteRequ // report rpc status of internal server error or not found error, // e.g. not found might happen when we get a deletion request after garbage // collection has deleted it - endpoint.log.Error("delete failed", zap.Stringer("Satellite ID", delete.Limit.SatelliteId), zap.Stringer("Piece ID", delete.Limit.PieceId), remoteAddrLogField, zap.Error(err)) + log.Error("delete failed", zap.Error(err)) } else { - endpoint.log.Info("deleted", zap.Stringer("Satellite ID", delete.Limit.SatelliteId), zap.Stringer("Piece ID", delete.Limit.PieceId), remoteAddrLogField) + log.Info("deleted") } return &pb.PieceDeleteResponse{}, nil @@ -327,7 +330,11 @@ func (endpoint *Endpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err err return rpcstatus.Errorf(rpcstatus.Aborted, "not enough available disk space, have: %v, need: %v", availableSpace, limit.Limit) } - remoteAddrLogField := zap.String("Remote Address", getRemoteAddr(ctx)) + log := endpoint.log.With( + zap.Stringer("Piece ID", limit.PieceId), + zap.Stringer("Satellite ID", limit.SatelliteId), + zap.Stringer("Action", limit.Action), + zap.String("Remote Address", getRemoteAddr(ctx))) var pieceWriter *pieces.Writer // committed is set to true when the piece is committed. @@ -353,7 +360,7 @@ func (endpoint *Endpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err err mon.IntVal("upload_cancel_size_bytes").Observe(uploadSize) mon.IntVal("upload_cancel_duration_ns").Observe(uploadDuration) mon.FloatVal("upload_cancel_rate_bytes_per_sec").Observe(uploadRate) - endpoint.log.Info("upload canceled", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Int64("Size", uploadSize), remoteAddrLogField) + log.Info("upload canceled", zap.Int64("Size", uploadSize)) } else if err != nil { mon.Counter("upload_failure_count").Inc(1) mon.Meter("upload_failure_byte_meter").Mark64(uploadSize) @@ -362,11 +369,11 @@ func (endpoint *Endpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err err mon.FloatVal("upload_failure_rate_bytes_per_sec").Observe(uploadRate) if errors.Is(err, context.Canceled) { // Context cancellation is common in normal operation, and shouldn't throw a full error. - endpoint.log.Info("upload canceled (race lost or node shutdown)", zap.Stringer("Piece ID", limit.PieceId)) - endpoint.log.Debug("upload failed", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Error(err), zap.Int64("Size", uploadSize), remoteAddrLogField) + log.Info("upload canceled (race lost or node shutdown)") + log.Debug("upload failed", zap.Int64("Size", uploadSize), zap.Error(err)) } else { - endpoint.log.Error("upload failed", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Error(err), zap.Int64("Size", uploadSize), remoteAddrLogField) + log.Error("upload failed", zap.Int64("Size", uploadSize), zap.Error(err)) } } else { @@ -375,16 +382,11 @@ func (endpoint *Endpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err err mon.IntVal("upload_success_size_bytes").Observe(uploadSize) mon.IntVal("upload_success_duration_ns").Observe(uploadDuration) mon.FloatVal("upload_success_rate_bytes_per_sec").Observe(uploadRate) - endpoint.log.Info("uploaded", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Int64("Size", uploadSize), remoteAddrLogField) + log.Info("uploaded", zap.Int64("Size", uploadSize)) } }() - endpoint.log.Info("upload started", - zap.Stringer("Piece ID", limit.PieceId), - zap.Stringer("Satellite ID", limit.SatelliteId), - zap.Stringer("Action", limit.Action), - zap.Int64("Available Space", availableSpace), - remoteAddrLogField) + log.Info("upload started", zap.Int64("Available Space", availableSpace)) mon.Counter("upload_started_count").Inc(1) pieceWriter, err = endpoint.store.Writer(ctx, limit.SatelliteId, limit.PieceId, hashAlgorithm) @@ -397,7 +399,7 @@ func (endpoint *Endpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err err if errs2.IsCanceled(cancelErr) { return } - endpoint.log.Error("error during canceling a piece write", zap.Error(cancelErr)) + log.Error("error during canceling a piece write", zap.Error(cancelErr)) } }() @@ -603,7 +605,7 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err actionSeriesTag := monkit.NewSeriesTag("action", limit.Action.String()) remoteAddr := getRemoteAddr(ctx) - endpoint.log.Info("download started", + log := endpoint.log.With( zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), @@ -611,12 +613,14 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err zap.Int64("Size", chunk.ChunkSize), zap.String("Remote Address", remoteAddr)) + log.Info("download started") + mon.Counter("download_started_count", actionSeriesTag).Inc(1) if err := endpoint.verifyOrderLimit(ctx, limit); err != nil { mon.Counter("download_failure_count", actionSeriesTag).Inc(1) mon.Meter("download_verify_orderlimit_failed", actionSeriesTag).Mark(1) - endpoint.log.Error("download failed", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.String("Remote Address", remoteAddr), zap.Error(err)) + log.Error("download failed", zap.Error(err)) return err } @@ -638,7 +642,7 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err mon.IntVal("download_cancel_size_bytes", actionSeriesTag).Observe(downloadSize) mon.IntVal("download_cancel_duration_ns", actionSeriesTag).Observe(downloadDuration) mon.FloatVal("download_cancel_rate_bytes_per_sec", actionSeriesTag).Observe(downloadRate) - endpoint.log.Info("download canceled", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Int64("Offset", chunk.Offset), zap.Int64("Size", downloadSize), zap.String("Remote Address", remoteAddr)) + log.Info("download canceled") } else if err != nil { mon.Counter("download_failure_count", actionSeriesTag).Inc(1) mon.Meter("download_failure_byte_meter", actionSeriesTag).Mark64(downloadSize) @@ -646,10 +650,10 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err mon.IntVal("download_failure_duration_ns", actionSeriesTag).Observe(downloadDuration) mon.FloatVal("download_failure_rate_bytes_per_sec", actionSeriesTag).Observe(downloadRate) if errors.Is(err, context.Canceled) { - endpoint.log.Info("download canceled (race lost or node shutdown)", zap.Stringer("Piece ID", limit.PieceId)) - endpoint.log.Debug("download canceled", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Int64("Offset", chunk.Offset), zap.Int64("Size", downloadSize), zap.String("Remote Address", remoteAddr), zap.Error(err)) + log.Info("download canceled (race lost or node shutdown)") + log.Debug("download canceled", zap.Error(err)) } else { - endpoint.log.Error("download failed", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Int64("Offset", chunk.Offset), zap.Int64("Size", downloadSize), zap.String("Remote Address", remoteAddr), zap.Error(err)) + log.Error("download failed", zap.Error(err)) } } else { mon.Counter("download_success_count", actionSeriesTag).Inc(1) @@ -657,7 +661,7 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err mon.IntVal("download_success_size_bytes", actionSeriesTag).Observe(downloadSize) mon.IntVal("download_success_duration_ns", actionSeriesTag).Observe(downloadDuration) mon.FloatVal("download_success_rate_bytes_per_sec", actionSeriesTag).Observe(downloadRate) - endpoint.log.Info("downloaded", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Int64("Offset", chunk.Offset), zap.Int64("Size", downloadSize), zap.String("Remote Address", remoteAddr)) + log.Info("downloaded") } mon.IntVal("download_orders_amount", actionSeriesTag).Observe(largestOrder.Amount) }() @@ -686,7 +690,7 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err } restoredFromTrash = true mon.Meter("download_file_in_trash", monkit.NewSeriesTag("namespace", limit.SatelliteId.String()), monkit.NewSeriesTag("piece_id", limit.PieceId.String())).Mark(1) - endpoint.log.Warn("file found in trash", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.String("Remote Address", remoteAddr)) + log.Warn("file found in trash") // try to open the file again pieceReader, err = endpoint.store.Reader(ctx, limit.SatelliteId, limit.PieceId) @@ -701,7 +705,7 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err return } // no reason to report this error to the uplink - endpoint.log.Error("failed to close piece reader", zap.Error(err)) + log.Error("failed to close piece reader", zap.Error(err)) } }() @@ -710,7 +714,7 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err if message.Limit.Action == pb.PieceAction_GET_REPAIR { pieceHash, orderLimit, err := endpoint.store.GetHashAndLimit(ctx, limit.SatelliteId, limit.PieceId, pieceReader) if err != nil { - endpoint.log.Error("could not get hash and order limit", zap.Error(err)) + log.Error("could not get hash and order limit", zap.Error(err)) return rpcstatus.Wrap(rpcstatus.Internal, err) } @@ -718,7 +722,7 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err return stream.Send(&pb.PieceDownloadResponse{Hash: &pieceHash, Limit: &orderLimit, RestoredFromTrash: restoredFromTrash}) }) if err != nil { - endpoint.log.Error("error sending hash and order limit", zap.Error(err)) + log.Error("error sending hash and order limit", zap.Error(err)) return rpcstatus.Wrap(rpcstatus.Internal, err) } } else if restoredFromTrash { @@ -727,7 +731,7 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err return stream.Send(&pb.PieceDownloadResponse{RestoredFromTrash: restoredFromTrash}) }) if err != nil { - endpoint.log.Error("error sending response", zap.Error(err)) + log.Error("error sending response", zap.Error(err)) return rpcstatus.Wrap(rpcstatus.Internal, err) } } @@ -761,7 +765,7 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err return nil // We don't need to return an error when client cancels. } - done, err := endpoint.sendData(ctx, stream, pieceReader, currentOffset, chunkSize) + done, err := endpoint.sendData(ctx, log, stream, pieceReader, currentOffset, chunkSize) if err != nil || done { return err } @@ -841,19 +845,19 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err return rpcstatus.Wrap(rpcstatus.Internal, errs.Combine(sendErr, recvErr)) } -func (endpoint *Endpoint) sendData(ctx context.Context, stream pb.DRPCPiecestore_DownloadStream, pieceReader *pieces.Reader, currentOffset int64, chunkSize int64) (result bool, err error) { +func (endpoint *Endpoint) sendData(ctx context.Context, log *zap.Logger, stream pb.DRPCPiecestore_DownloadStream, pieceReader *pieces.Reader, currentOffset int64, chunkSize int64) (result bool, err error) { defer mon.Task()(&ctx)(&err) chunkData := make([]byte, chunkSize) _, err = pieceReader.Seek(currentOffset, io.SeekStart) if err != nil { - endpoint.log.Error("error seeking on piecereader", zap.Error(err)) + log.Error("error seeking on piecereader", zap.Error(err)) return true, rpcstatus.Wrap(rpcstatus.Internal, err) } // ReadFull is required to ensure we are sending the right amount of data. _, err = io.ReadFull(pieceReader, chunkData) if err != nil { - endpoint.log.Error("error reading from piecereader", zap.Error(err)) + log.Error("error reading from piecereader", zap.Error(err)) return true, rpcstatus.Wrap(rpcstatus.Internal, err) } @@ -974,8 +978,10 @@ func (endpoint *Endpoint) Retain(ctx context.Context, retainReq *pb.RetainReques CreatedBefore: retainReq.GetCreationDate(), Filter: filter, }) - if !queued { - endpoint.log.Debug("Retain job not queued for satellite", zap.Stringer("Satellite ID", peer.ID)) + if queued { + endpoint.log.Info("Retain job queued", zap.Stringer("Satellite ID", peer.ID)) + } else { + endpoint.log.Info("Retain job not queued (job already exists)", zap.Stringer("Satellite ID", peer.ID)) } return &pb.RetainResponse{}, nil