Skip to content

Commit

Permalink
storagenode/piecestore: improve logs for incoming requests
Browse files Browse the repository at this point in the history
- Adds "Remote Address" field to all INFO logs related to GET,
PUT, and DELETE requests
- Adds Offset and Size fields to all info logs related to GET
requests

Resolves #5404

Change-Id: I5dab1867619385362e5f1e0455dfab17d295a37a
  • Loading branch information
profclems authored and Storj Robot committed Jan 24, 2023
1 parent cb01aca commit 9596057
Showing 1 changed file with 36 additions and 11 deletions.
47 changes: 36 additions & 11 deletions storagenode/piecestore/endpoint.go
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"io"
"net"
"os"
"sync"
"sync/atomic"
Expand All @@ -29,6 +30,7 @@ import (
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/drpc"
"storj.io/drpc/drpcctx"
"storj.io/storj/storagenode/bandwidth"
"storj.io/storj/storagenode/monitor"
"storj.io/storj/storagenode/orders"
Expand Down Expand Up @@ -153,16 +155,18 @@ func (endpoint *Endpoint) Delete(ctx context.Context, delete *pb.PieceDeleteRequ
return nil, rpcstatus.Wrap(rpcstatus.Unauthenticated, err)
}

remoteAddrLogField := zap.String("Remote Address", getRemoteAddr(ctx))

if err := endpoint.store.Delete(ctx, delete.Limit.SatelliteId, delete.Limit.PieceId); err != nil {
// explicitly ignoring error because the errors

// TODO: https://storjlabs.atlassian.net/browse/V3-3222
// report rpc status of internal server error or not found error,
// e.g. not found might happen when we get a deletion request after garbage
// collection has deleted it
endpoint.log.Error("delete failed", zap.Stringer("Satellite ID", delete.Limit.SatelliteId), zap.Stringer("Piece ID", delete.Limit.PieceId), zap.Error(err))
endpoint.log.Error("delete failed", zap.Stringer("Satellite ID", delete.Limit.SatelliteId), zap.Stringer("Piece ID", delete.Limit.PieceId), remoteAddrLogField, zap.Error(err))
} else {
endpoint.log.Info("deleted", zap.Stringer("Satellite ID", delete.Limit.SatelliteId), zap.Stringer("Piece ID", delete.Limit.PieceId))
endpoint.log.Info("deleted", zap.Stringer("Satellite ID", delete.Limit.SatelliteId), zap.Stringer("Piece ID", delete.Limit.PieceId), remoteAddrLogField)
}

return &pb.PieceDeleteResponse{}, nil
Expand Down Expand Up @@ -321,6 +325,8 @@ func (endpoint *Endpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err err
return rpcstatus.Errorf(rpcstatus.Aborted, "not enough available disk space, have: %v, need: %v", availableSpace, limit.Limit)
}

remoteAddrLogField := zap.String("Remote Address", getRemoteAddr(ctx))

var pieceWriter *pieces.Writer
// committed is set to true when the piece is committed.
// It is used to distinguish successful pieces where the uplink cancels the connections,
Expand All @@ -345,29 +351,30 @@ func (endpoint *Endpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err err
mon.IntVal("upload_failure_size_bytes").Observe(uploadSize)
mon.IntVal("upload_failure_duration_ns").Observe(uploadDuration)
mon.FloatVal("upload_failure_rate_bytes_per_sec").Observe(uploadRate)
endpoint.log.Error("upload failed", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Error(err), zap.Int64("Size", uploadSize))
endpoint.log.Error("upload failed", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Error(err), zap.Int64("Size", uploadSize), remoteAddrLogField)
} else if (errs2.IsCanceled(err) || drpc.ClosedError.Has(err)) && !committed {
mon.Counter("upload_cancel_count").Inc(1)
mon.Meter("upload_cancel_byte_meter").Mark64(uploadSize)
mon.IntVal("upload_cancel_size_bytes").Observe(uploadSize)
mon.IntVal("upload_cancel_duration_ns").Observe(uploadDuration)
mon.FloatVal("upload_cancel_rate_bytes_per_sec").Observe(uploadRate)
endpoint.log.Info("upload canceled", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Int64("Size", uploadSize))
endpoint.log.Info("upload canceled", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Int64("Size", uploadSize), remoteAddrLogField)
} else {
mon.Counter("upload_success_count").Inc(1)
mon.Meter("upload_success_byte_meter").Mark64(uploadSize)
mon.IntVal("upload_success_size_bytes").Observe(uploadSize)
mon.IntVal("upload_success_duration_ns").Observe(uploadDuration)
mon.FloatVal("upload_success_rate_bytes_per_sec").Observe(uploadRate)
endpoint.log.Info("uploaded", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Int64("Size", uploadSize))
endpoint.log.Info("uploaded", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Int64("Size", uploadSize), remoteAddrLogField)
}
}()

endpoint.log.Info("upload started",
zap.Stringer("Piece ID", limit.PieceId),
zap.Stringer("Satellite ID", limit.SatelliteId),
zap.Stringer("Action", limit.Action),
zap.Int64("Available Space", availableSpace))
zap.Int64("Available Space", availableSpace),
remoteAddrLogField)
mon.Counter("upload_started_count").Inc(1)

pieceWriter, err = endpoint.store.Writer(ctx, limit.SatelliteId, limit.PieceId, hashAlgorithm)
Expand Down Expand Up @@ -569,13 +576,21 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err

actionSeriesTag := monkit.NewSeriesTag("action", limit.Action.String())

endpoint.log.Info("download started", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action))
remoteAddr := getRemoteAddr(ctx)
endpoint.log.Info("download started",
zap.Stringer("Piece ID", limit.PieceId),
zap.Stringer("Satellite ID", limit.SatelliteId),
zap.Stringer("Action", limit.Action),
zap.Int64("Offset", chunk.Offset),
zap.Int64("Size", chunk.ChunkSize),
zap.String("Remote Address", remoteAddr))

mon.Counter("download_started_count", actionSeriesTag).Inc(1)

if err := endpoint.verifyOrderLimit(ctx, limit); err != nil {
mon.Counter("download_failure_count", actionSeriesTag).Inc(1)
mon.Meter("download_verify_orderlimit_failed", actionSeriesTag).Mark(1)
endpoint.log.Error("download failed", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Error(err))
endpoint.log.Error("download failed", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.String("Remote Address", remoteAddr), zap.Error(err))
return err
}

Expand All @@ -598,21 +613,21 @@ func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err
mon.IntVal("download_cancel_size_bytes", actionSeriesTag).Observe(downloadSize)
mon.IntVal("download_cancel_duration_ns", actionSeriesTag).Observe(downloadDuration)
mon.FloatVal("download_cancel_rate_bytes_per_sec", actionSeriesTag).Observe(downloadRate)
endpoint.log.Info("download canceled", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action))
endpoint.log.Info("download canceled", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Int64("Offset", chunk.Offset), zap.Int64("Size", downloadSize), zap.String("Remote Address", remoteAddr))
} else if err != nil {
mon.Counter("download_failure_count", actionSeriesTag).Inc(1)
mon.Meter("download_failure_byte_meter", actionSeriesTag).Mark64(downloadSize)
mon.IntVal("download_failure_size_bytes", actionSeriesTag).Observe(downloadSize)
mon.IntVal("download_failure_duration_ns", actionSeriesTag).Observe(downloadDuration)
mon.FloatVal("download_failure_rate_bytes_per_sec", actionSeriesTag).Observe(downloadRate)
endpoint.log.Error("download failed", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Error(err))
endpoint.log.Error("download failed", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Int64("Offset", chunk.Offset), zap.Int64("Size", downloadSize), zap.String("Remote Address", remoteAddr), zap.Error(err))
} else {
mon.Counter("download_success_count", actionSeriesTag).Inc(1)
mon.Meter("download_success_byte_meter", actionSeriesTag).Mark64(downloadSize)
mon.IntVal("download_success_size_bytes", actionSeriesTag).Observe(downloadSize)
mon.IntVal("download_success_duration_ns", actionSeriesTag).Observe(downloadDuration)
mon.FloatVal("download_success_rate_bytes_per_sec", actionSeriesTag).Observe(downloadRate)
endpoint.log.Info("downloaded", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action))
endpoint.log.Info("downloaded", zap.Stringer("Piece ID", limit.PieceId), zap.Stringer("Satellite ID", limit.SatelliteId), zap.Stringer("Action", limit.Action), zap.Int64("Offset", chunk.Offset), zap.Int64("Size", downloadSize), zap.String("Remote Address", remoteAddr))
}
}()

Expand Down Expand Up @@ -934,3 +949,13 @@ func (estimate *speedEstimation) EnsureLimit(transferred memory.Size, congested

return nil
}

// getRemoteAddr returns the remote address from the request context.
func getRemoteAddr(ctx context.Context) string {
if transport, ok := drpcctx.Transport(ctx); ok {
if conn, ok := transport.(net.Conn); ok {
return conn.RemoteAddr().String()
}
}
return ""
}

1 comment on commit 9596057

@storjrobot
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This commit has been mentioned on Storj Community Forum (official). There might be relevant details there:

https://forum.storj.io/t/profitable-node-locations/21274/7

Please sign in to comment.