Skip to content

Commit

Permalink
storagenode/pieces/lazyfilewalker: fix gc-filewalker
Browse files Browse the repository at this point in the history
This change fixes the GC filewalker subprocess output not being piped
to the trashHandler.

Previously, the `stdout` of the subprocess was set to an
`io.ReadWriter`.
This caused issues because the `trashHandler` (which implements
`io.ReadWriter`) wasn't actively reading from the pipe, leading
to data being buffered and not processed.

This commit changes the `stdout` type to `io.Writer`.
This clarifies the intent and ensures the subprocess can write
its output without relying on the `trashHandler` to read from
the pipe.

Updates storj/storj-private#666

Change-Id: I6139018e0eaae5c0c7bee0dfdd5bbba69b948a99
  • Loading branch information
profclems committed Apr 4, 2024
1 parent 780df77 commit 047554d
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 90 deletions.
22 changes: 3 additions & 19 deletions storagenode/pieces/lazyfilewalker/process.go
Expand Up @@ -22,7 +22,6 @@ type process struct {
executable string
args []string

stdout io.ReadWriter
stderr io.Writer

cmd execwrapper.Command
Expand All @@ -37,20 +36,12 @@ func newProcess(cmd execwrapper.Command, log *zap.Logger, executable string, arg
executable: executable,
args: args,
stderr: &zapWrapper{log.Named("subprocess")},
stdout: &bytes.Buffer{},
}
}

// setStdout overrides the stdout writer for the process.
func (p *process) setStdout(w io.ReadWriter) *process {
p.stdout = w
return p
}

// run runs the process and decodes the response into the value pointed by `resp`.
// run runs the process.
// It returns an error if the Process fails to start, or if the Process exits with a non-zero status.
// NOTE: the `resp` value must be a pointer to a struct.
func (p *process) run(ctx context.Context, req, resp interface{}) (err error) {
func (p *process) run(ctx context.Context, stdout io.Writer, req interface{}) (err error) {
defer mon.Task()(&ctx)(&err)

ctx, cancel := context.WithCancel(ctx)
Expand All @@ -74,7 +65,7 @@ func (p *process) run(ctx context.Context, req, resp interface{}) (err error) {
}

p.cmd.SetIn(&buf)
p.cmd.SetOut(p.stdout)
p.cmd.SetOut(stdout)
p.cmd.SetErr(p.stderr)

if err := p.cmd.Start(); err != nil {
Expand All @@ -96,12 +87,5 @@ func (p *process) run(ctx context.Context, req, resp interface{}) (err error) {

p.log.Info("subprocess finished successfully")

// Decode and receive the response data struct from the subprocess
decoder := json.NewDecoder(p.stdout)
if err := decoder.Decode(&resp); err != nil {
p.log.Error("failed to decode response from subprocess", zap.Error(err))
return errLazyFilewalker.Wrap(err)
}

return nil
}
86 changes: 17 additions & 69 deletions storagenode/pieces/lazyfilewalker/supervisor.go
Expand Up @@ -4,9 +4,7 @@
package lazyfilewalker

import (
"bytes"
"context"
"encoding/json"
"time"

"github.com/spacemonkeygo/monkit/v3"
Expand Down Expand Up @@ -128,11 +126,16 @@ func (fw *Supervisor) WalkAndComputeSpaceUsedBySatellite(ctx context.Context, sa

log := fw.log.Named(UsedSpaceFilewalkerCmdName).With(zap.String("satelliteID", satelliteID.String()))

err = newProcess(fw.testingUsedSpaceCmd, log, fw.executable, fw.usedSpaceArgs).run(ctx, req, &resp)
stdout := newGenericWriter(log)
err = newProcess(fw.testingUsedSpaceCmd, log, fw.executable, fw.usedSpaceArgs).run(ctx, stdout, req)
if err != nil {
return 0, 0, err
}

if err := stdout.Decode(&resp); err != nil {
return 0, 0, err
}

return resp.PiecesTotal, resp.PiecesContentSize, nil
}

Expand All @@ -153,11 +156,16 @@ func (fw *Supervisor) WalkSatellitePiecesToTrash(ctx context.Context, satelliteI

log := fw.log.Named(GCFilewalkerCmdName).With(zap.String("satelliteID", satelliteID.String()))

err = newProcess(fw.testingGCCmd, log, fw.executable, fw.gcArgs).setStdout(newTrashHandler(log, trashFunc)).run(ctx, req, &resp)
stdout := newTrashHandler(log, trashFunc)
err = newProcess(fw.testingGCCmd, log, fw.executable, fw.gcArgs).run(ctx, stdout, req)
if err != nil {
return nil, 0, 0, err
}

if err := stdout.Decode(&resp); err != nil {
return nil, 0, 0, err
}

return resp.PieceIDs, resp.PiecesCount, resp.PiecesSkippedCount, nil
}

Expand All @@ -173,74 +181,14 @@ func (fw *Supervisor) WalkCleanupTrash(ctx context.Context, satelliteID storj.No

log := fw.log.Named(TrashCleanupFilewalkerCmdName).With(zap.String("satelliteID", satelliteID.String()))

err = newProcess(fw.testingTrashCleanupCmd, log, fw.executable, fw.trashCleanupArgs).run(ctx, req, &resp)
stdout := newGenericWriter(log)
err = newProcess(fw.testingTrashCleanupCmd, log, fw.executable, fw.trashCleanupArgs).run(ctx, stdout, req)
if err != nil {
return 0, nil, err
}

return resp.BytesDeleted, resp.KeysDeleted, nil
}

type trashHandler struct {
bytes.Buffer

log *zap.Logger
lineBuffer []byte

trashFunc func(pieceID storj.PieceID) error
}

func newTrashHandler(log *zap.Logger, trashFunc func(pieceID storj.PieceID) error) *trashHandler {
return &trashHandler{
log: log.Named("trash-handler"),
trashFunc: trashFunc,
}
}

func (t *trashHandler) Write(b []byte) (n int, err error) {
t.log.Debug("received data from subprocess")

n = len(b)
t.lineBuffer = append(t.lineBuffer, b...)
for {
if b, err = t.writeLine(t.lineBuffer); err != nil {
return n, err
}
if len(b) == len(t.lineBuffer) {
break
}

t.lineBuffer = b
}

return n, nil
}

func (t *trashHandler) writeLine(b []byte) (remaining []byte, err error) {
idx := bytes.IndexByte(b, '\n')
if idx < 0 {
return b, nil
}

b, remaining = b[:idx], b[idx+1:]

return remaining, t.processTrashPiece(b)
}

func (t *trashHandler) processTrashPiece(b []byte) error {
var resp GCFilewalkerResponse
if err := json.Unmarshal(b, &resp); err != nil {
t.log.Error("failed to unmarshal data from subprocess", zap.Error(err))
return err
}

if !resp.Completed {
for _, pieceID := range resp.PieceIDs {
t.log.Debug("trashing piece", zap.String("pieceID", pieceID.String()))
return t.trashFunc(pieceID)
}
if err := stdout.Decode(&resp); err != nil {
return 0, nil, err
}

_, err := t.Buffer.Write(b)
return err
return resp.BytesDeleted, resp.KeysDeleted, nil
}
4 changes: 2 additions & 2 deletions storagenode/pieces/lazyfilewalker/supervisor_test.go
Expand Up @@ -56,7 +56,7 @@ func TestTrashHandler_Write(t *testing.T) {
}

var resp GCFilewalkerResponse
err := json.NewDecoder(trashHandler).Decode(&resp)
err := trashHandler.Decode(&resp)
require.NoError(t, err)

// check that the final response is as expected
Expand Down Expand Up @@ -112,7 +112,7 @@ func TestTrashHandler_Write(t *testing.T) {
}

var resp GCFilewalkerResponse
err := json.NewDecoder(trashHandler).Decode(&resp)
err := trashHandler.Decode(&resp)
require.NoError(t, err)

// check that the final response is as expected
Expand Down
117 changes: 117 additions & 0 deletions storagenode/pieces/lazyfilewalker/writer.go
@@ -0,0 +1,117 @@
// Copyright (C) 2024 Storj Labs, Inc.
// See LICENSE for copying information.

package lazyfilewalker

import (
"bytes"
"encoding/json"

"go.uber.org/zap"

"storj.io/common/storj"
)

// writer is the sink for a lazyfilewalker subprocess's stdout. It accepts the
// raw bytes the subprocess emits and, afterwards, decodes the collected
// response into a caller-supplied value.
type writer interface {
	// Write consumes a chunk of subprocess output.
	Write(b []byte) (written int, err error)
	// Decode decodes the collected response into dst.
	Decode(dst interface{}) error
}

// Compile-time assertions: both stdout sinks must satisfy the writer interface.
var (
	_ writer = (*genericWriter)(nil)
	_ writer = (*trashHandler)(nil)
)

// genericWriter collects the output of a lazyfilewalker subprocess in memory
// so that it can be JSON-decoded once the subprocess is done writing.
type genericWriter struct {
	// log receives decode failures.
	log *zap.Logger
	// buf accumulates every byte passed to Write until Decode consumes it.
	buf bytes.Buffer
}

// newGenericWriter returns an empty genericWriter that reports decode
// failures to log.
func newGenericWriter(log *zap.Logger) *genericWriter {
	w := new(genericWriter)
	w.log = log
	return w
}

// Decode reads the next JSON value from the buffered subprocess output and
// stores it in the value pointed to by v.
//
// v must be a non-nil pointer. It is handed to the JSON decoder as-is (not as
// a pointer to the interface variable), so passing a nil or non-pointer value
// is reported as an error instead of being silently decoded into a discarded
// local copy.
//
// Any decode failure is logged and returned unchanged.
func (w *genericWriter) Decode(v interface{}) error {
	if err := json.NewDecoder(&w.buf).Decode(v); err != nil {
		w.log.Error("failed to decode response from subprocess", zap.Error(err))
		return err
	}
	return nil
}

// Write appends b to the in-memory buffer and reports the result of that
// append.
func (w *genericWriter) Write(b []byte) (n int, err error) {
	n, err = w.buf.Write(b)
	return n, err
}

// trashHandler consumes the newline-delimited JSON output of the
// gc-filewalker subprocess. Messages that list pieces and are not marked
// completed are passed to trashFunc as they arrive; everything else is kept
// in buf for a later Decode.
type trashHandler struct {
	// buf holds the messages that were not dispatched to trashFunc.
	buf *genericWriter
	// log receives debug and error output of the handler.
	log *zap.Logger
	// lineBuffer holds bytes received so far that have not yet been consumed
	// as a complete, newline-terminated line.
	lineBuffer []byte

	// trashFunc is invoked for piece IDs reported by the subprocess.
	trashFunc func(pieceID storj.PieceID) error
}

// newTrashHandler builds a trashHandler that calls trashFunc for reported
// pieces. The handler itself logs under the "trash-handler" name, while its
// internal buffer writer uses log as given.
func newTrashHandler(log *zap.Logger, trashFunc func(pieceID storj.PieceID) error) *trashHandler {
	handler := &trashHandler{trashFunc: trashFunc}
	handler.log = log.Named("trash-handler")
	handler.buf = newGenericWriter(log)
	return handler
}

// Decode decodes the buffered (non-dispatched) subprocess output into v by
// delegating to the underlying genericWriter.
func (t *trashHandler) Decode(v interface{}) error {
	err := t.buf.Decode(v)
	return err
}

// Write accepts a chunk of subprocess output. The chunk is appended to the
// pending line buffer, and every complete (newline-terminated) line found
// there is processed in order. A trailing partial line stays buffered until
// a later Write completes it.
//
// The returned count is always len(b), including when processing a line
// fails; in that case the error from the failing line is returned and the
// pending buffer is left as it was before that line was consumed.
func (t *trashHandler) Write(b []byte) (n int, err error) {
	t.lineBuffer = append(t.lineBuffer, b...)
	for {
		rest, lineErr := t.writeLine(t.lineBuffer)
		if lineErr != nil {
			return len(b), lineErr
		}
		// Nothing was consumed: no complete line is left in the buffer.
		if len(rest) == len(t.lineBuffer) {
			return len(b), nil
		}
		t.lineBuffer = rest
	}
}

// writeLine processes the first newline-terminated line of b, if there is
// one. It returns the bytes following that line together with the result of
// processing it. When b contains no newline, b is returned untouched with a
// nil error.
func (t *trashHandler) writeLine(b []byte) (remaining []byte, err error) {
	newline := bytes.IndexByte(b, '\n')
	if newline == -1 {
		return b, nil
	}

	line := b[:newline]
	remaining = b[newline+1:]
	err = t.processTrashPiece(line)
	return remaining, err
}

// processTrashPiece handles a single JSON message (one line of subprocess
// output, without the trailing newline).
//
// A message that is not marked completed and lists piece IDs is dispatched:
// trashFunc is called for every listed piece, in order, stopping at and
// returning the first error. Such a message is not buffered.
//
// Any other message (completed, or listing no pieces) is appended to the
// internal buffer so that Decode can read it later.
//
// A message that is not valid JSON for GCFilewalkerResponse is logged and the
// unmarshal error is returned.
func (t *trashHandler) processTrashPiece(b []byte) error {
	var resp GCFilewalkerResponse
	if err := json.Unmarshal(b, &resp); err != nil {
		t.log.Error("failed to unmarshal data from subprocess", zap.Error(err))
		return err
	}

	if !resp.Completed && len(resp.PieceIDs) > 0 {
		// Trash every piece in the message; previously the loop returned
		// after the first piece and silently dropped the rest.
		for _, pieceID := range resp.PieceIDs {
			t.log.Debug("trashing piece", zap.String("pieceID", pieceID.String()))
			if err := t.trashFunc(pieceID); err != nil {
				return err
			}
		}
		return nil
	}

	_, err := t.buf.Write(b)
	return err
}

0 comments on commit 047554d

Please sign in to comment.