Skip to content

Commit

Permalink
storagenode/piecestore: bit of logging cleanup
Browse files: browse the repository at this point in the history
Change-Id: I3bdbef8db684554eca1ed6cb8f169ff67f74be5d
  • Loading branch information
thepaul authored and Storj Robot committed Feb 16, 2024
1 parent 991ddc9 commit 3f3dd53
Showing 1 changed file with 40 additions and 34 deletions.
74 changes: 40 additions & 34 deletions storagenode/piecestore/endpoint.go
Expand Up @@ -157,7 +157,10 @@ func (endpoint *Endpoint) Delete(ctx context.Context, delete *pb.PieceDeleteRequ
return nil, rpcstatus.Wrap(rpcstatus.Unauthenticated, err)
}

remoteAddrLogField := zap.String("Remote Address", getRemoteAddr(ctx))
log := endpoint.log.With(
zap.Stringer("Satellite ID", delete.Limit.SatelliteId),
zap.Stringer("Piece ID", delete.Limit.PieceId),
zap.String("Remote Address", getRemoteAddr(ctx)))

if err := endpoint.store.Delete(ctx, delete.Limit.SatelliteId, delete.Limit.PieceId); err != nil {
// explicitly ignoring error because the errors
Expand All @@ -166,9 +169,9 @@ func (endpoint *Endpoint) Delete(ctx context.Context, delete *pb.PieceDeleteRequ
// report rpc status of internal server error or not found error,
// e.g. not found might happen when we get a deletion request after garbage
// collection has deleted it
endpoint.log.Error("delete failed", zap.Stringer("Satellite ID", delete.Limit.SatelliteId), zap.Stringer("Piece ID", delete.Limit.PieceId), remoteAddrLogField, zap.Error(err))
log.Error("delete failed", zap.Error(err))
} else {
endpoint.log.Info("deleted", zap.Stringer("Satellite ID", delete.Limit.SatelliteId), zap.Stringer("Piece ID", delete.Limit.PieceId), remoteAddrLogField)
log.Info("deleted")
}

return &pb.PieceDeleteResponse{}, nil
Expand Down Expand Up @@ -327,7 +330,11 @@ func (endpoint *Endpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err err
return rpcstatus.Errorf(rpcstatus.Aborted, "not enough available disk space, have: %v, need: %v", availableSpace, limit.Limit)
}

remoteAddrLogField := zap.String("Remote Address", getRemoteAddr(ctx))
log := endpoint.log.With(
zap.Stringer("Piece ID", limit.PieceId),
zap.Stringer("Satellite ID", limit.SatelliteId),
zap.Stringer("Action", limit.Action),
zap.String("Remote Address", getRemoteAddr(ctx)))

var pieceWriter *pieces.Writer
// committed is set to true when the piece is committed.
Expand All @@ -353,7 +360,7 @@ func (endpoint *Endpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err err
mon.IntVal("upload_cancel_size_bytes").Observe(uploadSize)
mon.IntVal("upload_cancel_duration_ns").Observe(uploadDuration)
mon.FloatVal("upload_cancel_rate_bytes_per_sec").Observe(uploadRate)
endpoint.log.Info("upload canceled", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Int64("Size", uploadSize), remoteAddrLogField)
log.Info("upload canceled", zap.Int64("Size", uploadSize))
} else if err != nil {
mon.Counter("upload_failure_count").Inc(1)
mon.Meter("upload_failure_byte_meter").Mark64(uploadSize)
Expand All @@ -362,11 +369,11 @@ func (endpoint *Endpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err err
mon.FloatVal("upload_failure_rate_bytes_per_sec").Observe(uploadRate)
if errors.Is(err, context.Canceled) {
// Context cancellation is common in normal operation, and shouldn't throw a full error.
endpoint.log.Info("upload canceled (race lost or node shutdown)", zap.Stringer("Piece ID", limit.PieceId))
endpoint.log.Debug("upload failed", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Error(err), zap.Int64("Size", uploadSize), remoteAddrLogField)
log.Info("upload canceled (race lost or node shutdown)")
log.Debug("upload failed", zap.Int64("Size", uploadSize), zap.Error(err))

} else {
endpoint.log.Error("upload failed", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Error(err), zap.Int64("Size", uploadSize), remoteAddrLogField)
log.Error("upload failed", zap.Int64("Size", uploadSize), zap.Error(err))
}

} else {
Expand All @@ -375,16 +382,11 @@ func (endpoint *Endpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err err
mon.IntVal("upload_success_size_bytes").Observe(uploadSize)
mon.IntVal("upload_success_duration_ns").Observe(uploadDuration)
mon.FloatVal("upload_success_rate_bytes_per_sec").Observe(uploadRate)
endpoint.log.Info("uploaded", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Int64("Size", uploadSize), remoteAddrLogField)
log.Info("uploaded", zap.Int64("Size", uploadSize))
}
}()

endpoint.log.Info("upload started",
zap.Stringer("Piece ID", limit.PieceId),
zap.Stringer("Satellite ID", limit.SatelliteId),
zap.Stringer("Action", limit.Action),
zap.Int64("Available Space", availableSpace),
remoteAddrLogField)
log.Info("upload started", zap.Int64("Available Space", availableSpace))
mon.Counter("upload_started_count").Inc(1)

pieceWriter, err = endpoint.store.Writer(ctx, limit.SatelliteId, limit.PieceId, hashAlgorithm)
Expand All @@ -397,7 +399,7 @@ func (endpoint *Endpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err err
if errs2.IsCanceled(cancelErr) {
return
}
endpoint.log.Error("error during canceling a piece write", zap.Error(cancelErr))
log.Error("error during canceling a piece write", zap.Error(cancelErr))
}
}()

Expand Down Expand Up @@ -603,20 +605,22 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err
actionSeriesTag := monkit.NewSeriesTag("action", limit.Action.String())

remoteAddr := getRemoteAddr(ctx)
endpoint.log.Info("download started",
log := endpoint.log.With(
zap.Stringer("Piece ID", limit.PieceId),
zap.Stringer("Satellite ID", limit.SatelliteId),
zap.Stringer("Action", limit.Action),
zap.Int64("Offset", chunk.Offset),
zap.Int64("Size", chunk.ChunkSize),
zap.String("Remote Address", remoteAddr))

log.Info("download started")

mon.Counter("download_started_count", actionSeriesTag).Inc(1)

if err := endpoint.verifyOrderLimit(ctx, limit); err != nil {
mon.Counter("download_failure_count", actionSeriesTag).Inc(1)
mon.Meter("download_verify_orderlimit_failed", actionSeriesTag).Mark(1)
endpoint.log.Error("download failed", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.String("Remote Address", remoteAddr), zap.Error(err))
log.Error("download failed", zap.Error(err))
return err
}

Expand All @@ -638,26 +642,26 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err
mon.IntVal("download_cancel_size_bytes", actionSeriesTag).Observe(downloadSize)
mon.IntVal("download_cancel_duration_ns", actionSeriesTag).Observe(downloadDuration)
mon.FloatVal("download_cancel_rate_bytes_per_sec", actionSeriesTag).Observe(downloadRate)
endpoint.log.Info("download canceled", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Int64("Offset", chunk.Offset), zap.Int64("Size", downloadSize), zap.String("Remote Address", remoteAddr))
log.Info("download canceled")
} else if err != nil {
mon.Counter("download_failure_count", actionSeriesTag).Inc(1)
mon.Meter("download_failure_byte_meter", actionSeriesTag).Mark64(downloadSize)
mon.IntVal("download_failure_size_bytes", actionSeriesTag).Observe(downloadSize)
mon.IntVal("download_failure_duration_ns", actionSeriesTag).Observe(downloadDuration)
mon.FloatVal("download_failure_rate_bytes_per_sec", actionSeriesTag).Observe(downloadRate)
if errors.Is(err, context.Canceled) {
endpoint.log.Info("download canceled (race lost or node shutdown)", zap.Stringer("Piece ID", limit.PieceId))
endpoint.log.Debug("download canceled", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Int64("Offset", chunk.Offset), zap.Int64("Size", downloadSize), zap.String("Remote Address", remoteAddr), zap.Error(err))
log.Info("download canceled (race lost or node shutdown)")
log.Debug("download canceled", zap.Error(err))
} else {
endpoint.log.Error("download failed", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Int64("Offset", chunk.Offset), zap.Int64("Size", downloadSize), zap.String("Remote Address", remoteAddr), zap.Error(err))
log.Error("download failed", zap.Error(err))
}
} else {
mon.Counter("download_success_count", actionSeriesTag).Inc(1)
mon.Meter("download_success_byte_meter", actionSeriesTag).Mark64(downloadSize)
mon.IntVal("download_success_size_bytes", actionSeriesTag).Observe(downloadSize)
mon.IntVal("download_success_duration_ns", actionSeriesTag).Observe(downloadDuration)
mon.FloatVal("download_success_rate_bytes_per_sec", actionSeriesTag).Observe(downloadRate)
endpoint.log.Info("downloaded", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Int64("Offset", chunk.Offset), zap.Int64("Size", downloadSize), zap.String("Remote Address", remoteAddr))
log.Info("downloaded")
}
mon.IntVal("download_orders_amount", actionSeriesTag).Observe(largestOrder.Amount)
}()
Expand Down Expand Up @@ -686,7 +690,7 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err
}
restoredFromTrash = true
mon.Meter("download_file_in_trash", monkit.NewSeriesTag("namespace", limit.SatelliteId.String()), monkit.NewSeriesTag("piece_id", limit.PieceId.String())).Mark(1)
endpoint.log.Warn("file found in trash", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.String("Remote Address", remoteAddr))
log.Warn("file found in trash")

// try to open the file again
pieceReader, err = endpoint.store.Reader(ctx, limit.SatelliteId, limit.PieceId)
Expand All @@ -701,7 +705,7 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err
return
}
// no reason to report this error to the uplink
endpoint.log.Error("failed to close piece reader", zap.Error(err))
log.Error("failed to close piece reader", zap.Error(err))
}
}()

Expand All @@ -710,15 +714,15 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err
if message.Limit.Action == pb.PieceAction_GET_REPAIR {
pieceHash, orderLimit, err := endpoint.store.GetHashAndLimit(ctx, limit.SatelliteId, limit.PieceId, pieceReader)
if err != nil {
endpoint.log.Error("could not get hash and order limit", zap.Error(err))
log.Error("could not get hash and order limit", zap.Error(err))
return rpcstatus.Wrap(rpcstatus.Internal, err)
}

err = rpctimeout.Run(ctx, endpoint.config.StreamOperationTimeout, func(_ context.Context) (err error) {
return stream.Send(&pb.PieceDownloadResponse{Hash: &pieceHash, Limit: &orderLimit, RestoredFromTrash: restoredFromTrash})
})
if err != nil {
endpoint.log.Error("error sending hash and order limit", zap.Error(err))
log.Error("error sending hash and order limit", zap.Error(err))
return rpcstatus.Wrap(rpcstatus.Internal, err)
}
} else if restoredFromTrash {
Expand All @@ -727,7 +731,7 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err
return stream.Send(&pb.PieceDownloadResponse{RestoredFromTrash: restoredFromTrash})
})
if err != nil {
endpoint.log.Error("error sending response", zap.Error(err))
log.Error("error sending response", zap.Error(err))
return rpcstatus.Wrap(rpcstatus.Internal, err)
}
}
Expand Down Expand Up @@ -761,7 +765,7 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err
return nil // We don't need to return an error when client cancels.
}

done, err := endpoint.sendData(ctx, stream, pieceReader, currentOffset, chunkSize)
done, err := endpoint.sendData(ctx, log, stream, pieceReader, currentOffset, chunkSize)
if err != nil || done {
return err
}
Expand Down Expand Up @@ -841,19 +845,19 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err
return rpcstatus.Wrap(rpcstatus.Internal, errs.Combine(sendErr, recvErr))
}

func (endpoint *Endpoint) sendData(ctx context.Context, stream pb.DRPCPiecestore_DownloadStream, pieceReader *pieces.Reader, currentOffset int64, chunkSize int64) (result bool, err error) {
func (endpoint *Endpoint) sendData(ctx context.Context, log *zap.Logger, stream pb.DRPCPiecestore_DownloadStream, pieceReader *pieces.Reader, currentOffset int64, chunkSize int64) (result bool, err error) {
defer mon.Task()(&ctx)(&err)
chunkData := make([]byte, chunkSize)
_, err = pieceReader.Seek(currentOffset, io.SeekStart)
if err != nil {
endpoint.log.Error("error seeking on piecereader", zap.Error(err))
log.Error("error seeking on piecereader", zap.Error(err))
return true, rpcstatus.Wrap(rpcstatus.Internal, err)
}

// ReadFull is required to ensure we are sending the right amount of data.
_, err = io.ReadFull(pieceReader, chunkData)
if err != nil {
endpoint.log.Error("error reading from piecereader", zap.Error(err))
log.Error("error reading from piecereader", zap.Error(err))
return true, rpcstatus.Wrap(rpcstatus.Internal, err)
}

Expand Down Expand Up @@ -974,8 +978,10 @@ func (endpoint *Endpoint) Retain(ctx context.Context, retainReq *pb.RetainReques
CreatedBefore: retainReq.GetCreationDate(),
Filter: filter,
})
if !queued {
endpoint.log.Debug("Retain job not queued for satellite", zap.Stringer("Satellite ID", peer.ID))
if queued {
endpoint.log.Info("Retain job queued", zap.Stringer("Satellite ID", peer.ID))
} else {
endpoint.log.Info("Retain job not queued (job already exists)", zap.Stringer("Satellite ID", peer.ID))
}

return &pb.RetainResponse{}, nil
Expand Down

0 comments on commit 3f3dd53

Please sign in to comment.