Skip to content

Commit

Permalink
storagenode/pieces: Add Trash and RestoreTrash to piecestore (#3575)
Browse files Browse the repository at this point in the history
* storagenode/pieces: Add Trash and RestoreTrash to piecestore

* Add index for expiration trash
  • Loading branch information
isaachess committed Nov 20, 2019
1 parent 6d728d6 commit 6aeddf2
Show file tree
Hide file tree
Showing 13 changed files with 632 additions and 112 deletions.
223 changes: 178 additions & 45 deletions pkg/pb/piecestore2.pb.go

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions pkg/pb/piecestore2.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ service Piecestore {
rpc Download(stream PieceDownloadRequest) returns (stream PieceDownloadResponse) {}
rpc Delete(PieceDeleteRequest) returns (PieceDeleteResponse) {}
rpc Retain(RetainRequest) returns (RetainResponse);
rpc RestoreTrash(RestoreTrashRequest) returns (RestoreTrashResponse) {}
}

// Expected order of messages from uplink:
Expand Down Expand Up @@ -62,7 +63,7 @@ message PieceDownloadRequest {
int64 offset = 1;
int64 chunk_size = 2;
}

// request for the chunk
Chunk chunk = 3;
}
Expand Down Expand Up @@ -93,6 +94,9 @@ message RetainRequest {
message RetainResponse {
}

// RestoreTrashRequest asks a storage node to move all trashed pieces for the
// requesting satellite back into regular storage.
message RestoreTrashRequest {}

// RestoreTrashResponse is the (empty) acknowledgement of a RestoreTrash call.
message RestoreTrashResponse {}

// PieceHeader is used in piece storage to keep track of piece attributes.
message PieceHeader {
enum FormatVersion {
Expand All @@ -113,4 +117,4 @@ message PieceHeader {
// the OrderLimit authorizing storage of this piece, as signed by the satellite and sent by
// the uplink
orders.OrderLimit order_limit = 5 [(gogoproto.nullable) = false];
}
}
3 changes: 3 additions & 0 deletions private/testplanet/uplink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ func (mock *piecestoreMock) Delete(ctx context.Context, delete *pb.PieceDeleteRe
// Retain is a stub implementation that accepts any retain request without
// recording or acting on it.
func (mock *piecestoreMock) Retain(_ context.Context, _ *pb.RetainRequest) (*pb.RetainResponse, error) {
	return nil, nil
}
// RestoreTrash is a stub implementation that reports success without
// restoring anything.
func (mock *piecestoreMock) RestoreTrash(ctx context.Context, req *pb.RestoreTrashRequest) (*pb.RestoreTrashResponse, error) {
	return nil, nil
}

func TestDownloadFromUnresponsiveNode(t *testing.T) {
testplanet.Run(t, testplanet.Config{
Expand Down
11 changes: 11 additions & 0 deletions proto.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5588,6 +5588,12 @@
{
"name": "RetainResponse"
},
{
"name": "RestoreTrashRequest"
},
{
"name": "RestoreTrashResponse"
},
{
"name": "PieceHeader",
"fields": [
Expand Down Expand Up @@ -5661,6 +5667,11 @@
"name": "Retain",
"in_type": "RetainRequest",
"out_type": "RetainResponse"
},
{
"name": "RestoreTrash",
"in_type": "RestoreTrashRequest",
"out_type": "RestoreTrashResponse"
}
]
}
Expand Down
48 changes: 48 additions & 0 deletions storagenode/pieces/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ type PieceExpirationDB interface {
// DeleteFailed marks an expiration record as having experienced a failure in deleting the
// piece from the disk
DeleteFailed(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID, failedAt time.Time) error
// Trash marks a piece as in the trash
Trash(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) error
// RestoreTrash marks all pieces as not being in trash
RestoreTrash(ctx context.Context, satelliteID storj.NodeID) error
}

// V0PieceInfoDB stores meta information about pieces stored with storage format V0 (where
Expand Down Expand Up @@ -275,6 +279,50 @@ func (store *Store) Delete(ctx context.Context, satellite storj.NodeID, pieceID
return Error.Wrap(err)
}

// Trash moves the specified piece to the blob trash. If necessary, it converts
// the v0 piece to a v1 piece. It also marks the item as "trashed" in the
// pieceExpirationDB.
func (store *Store) Trash(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (err error) {
	defer mon.Task()(&ctx)(&err)

	blobRef := storage.BlobRef{
		Namespace: satellite.Bytes(),
		Key:       pieceID.Bytes(),
	}

	// Stat the piece at the newest supported storage format. A "not exist"
	// result means the piece predates that format and must be migrated
	// before trashing; any other error is fatal.
	_, statErr := store.blobs.StatWithStorageFormat(ctx, blobRef, filestore.MaxFormatVersionSupported)
	switch {
	case statErr == nil:
		// Already at the newest format; nothing to migrate.
	case errs.IsFunc(statErr, os.IsNotExist):
		if migrateErr := store.MigrateV0ToV1(ctx, satellite, pieceID); migrateErr != nil {
			return Error.Wrap(migrateErr)
		}
	default:
		return Error.Wrap(statErr)
	}

	// Mark the piece trashed in the expiration DB and move the blob to the
	// trash. Both are attempted regardless of each other's outcome and
	// their errors are combined.
	err = errs.Combine(
		store.expirationInfo.Trash(ctx, satellite, pieceID),
		store.blobs.Trash(ctx, blobRef),
	)
	return Error.Wrap(err)
}

// RestoreTrash restores all pieces in the trash for the given satellite,
// first moving the blobs out of the blob trash and then clearing the
// trashed flag in the expiration DB.
func (store *Store) RestoreTrash(ctx context.Context, satelliteID storj.NodeID) (err error) {
	defer mon.Task()(&ctx)(&err)

	if blobErr := store.blobs.RestoreTrash(ctx, satelliteID.Bytes()); blobErr != nil {
		return Error.Wrap(blobErr)
	}

	return Error.Wrap(store.expirationInfo.RestoreTrash(ctx, satelliteID))
}

// MigrateV0ToV1 will migrate a piece stored with storage format v0 to storage
// format v1. If the piece is not stored as a v0 piece it will return an error.
// The following failures are possible:
Expand Down
234 changes: 234 additions & 0 deletions storagenode/pieces/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,240 @@ func tryOpeningAPiece(ctx context.Context, t testing.TB, store *pieces.Store, sa
require.NoError(t, reader.Close())
}

// TestTrashAndRestore exercises Store.Trash and Store.RestoreTrash across
// multiple satellites, storage format versions (V0 and V1), and expiration
// states. It verifies that trashed pieces become unreadable and drop out of
// the expiration listing, and that restoring one satellite's trash brings
// its pieces back (at MaxFormatVersionSupported) without restoring another
// satellite's pieces.
func TestTrashAndRestore(t *testing.T) {
	type testfile struct {
		data      []byte
		formatVer storage.FormatVersion
	}
	type testpiece struct {
		pieceID    storj.PieceID
		files      []testfile
		expiration time.Time
	}
	type testsatellite struct {
		satelliteID storj.NodeID
		pieces      []testpiece
	}

	size := memory.KB

	// Initialize pub/priv keys for signing piece hash
	publicKeyBytes, err := hex.DecodeString("01eaebcb418cd629d4c01f365f33006c9de3ce70cf04da76c39cdc993f48fe53")
	require.NoError(t, err)
	privateKeyBytes, err := hex.DecodeString("afefcccadb3d17b1f241b7c83f88c088b54c01b5a25409c13cbeca6bfa22b06901eaebcb418cd629d4c01f365f33006c9de3ce70cf04da76c39cdc993f48fe53")
	require.NoError(t, err)
	publicKey, err := storj.PiecePublicKeyFromBytes(publicKeyBytes)
	require.NoError(t, err)
	privateKey, err := storj.PiecePrivateKeyFromBytes(privateKeyBytes)
	require.NoError(t, err)

	// Fixture: two satellites. The first covers no-expiration V0, expiring
	// V1, and a piece with both a V0 and a V1 file; the second has a single
	// expiring V1 piece and is used to confirm restore is per-satellite.
	satellites := []testsatellite{
		{
			satelliteID: testrand.NodeID(),
			pieces: []testpiece{
				{
					expiration: time.Time{}, // no expiration
					pieceID:    testrand.PieceID(),
					files: []testfile{
						{
							data:      testrand.Bytes(size),
							formatVer: filestore.FormatV0,
						},
					},
				},
				{
					pieceID:    testrand.PieceID(),
					expiration: time.Now().Add(24 * time.Hour),
					files: []testfile{
						{
							data:      testrand.Bytes(size),
							formatVer: filestore.FormatV1,
						},
					},
				},
				{
					pieceID:    testrand.PieceID(),
					expiration: time.Now().Add(24 * time.Hour),
					files: []testfile{
						{
							data:      testrand.Bytes(size),
							formatVer: filestore.FormatV0,
						},
						{
							data:      testrand.Bytes(size),
							formatVer: filestore.FormatV1,
						},
					},
				},
			},
		},
		{
			satelliteID: testrand.NodeID(),
			pieces: []testpiece{
				{
					pieceID:    testrand.PieceID(),
					expiration: time.Now().Add(24 * time.Hour),
					files: []testfile{
						{
							data:      testrand.Bytes(size),
							formatVer: filestore.FormatV1,
						},
					},
				},
			},
		},
	}

	storagenodedbtest.Run(t, func(t *testing.T, db storagenode.DB) {
		ctx := testcontext.New(t)
		defer ctx.Cleanup()

		blobs, err := filestore.NewAt(zaptest.NewLogger(t), ctx.Dir("store"))
		require.NoError(t, err)
		defer ctx.Check(blobs.Close)

		v0PieceInfo, ok := db.V0PieceInfo().(pieces.V0PieceInfoDBForTest)
		require.True(t, ok, "V0PieceInfoDB can not satisfy V0PieceInfoDBForTest")

		store := pieces.NewStore(zaptest.NewLogger(t), blobs, v0PieceInfo, db.PieceExpirationDB(), nil)
		tStore := &pieces.StoreForTest{store}

		for _, satellite := range satellites {
			now := time.Now()
			for _, piece := range satellite.pieces {
				// If test has expiration, add to expiration db
				if !piece.expiration.IsZero() {
					require.NoError(t, store.SetExpiration(ctx, satellite.satelliteID, piece.pieceID, piece.expiration))
				}

				for _, file := range piece.files {
					w, err := tStore.WriterForFormatVersion(ctx, satellite.satelliteID, piece.pieceID, file.formatVer)
					require.NoError(t, err)

					_, err = w.Write(file.data)
					require.NoError(t, err)

					// Create, sign, and commit piece hash (to piece or v0PieceInfo)
					pieceHash := &pb.PieceHash{
						PieceId:   piece.pieceID,
						Hash:      w.Hash(),
						PieceSize: w.Size(),
						Timestamp: now,
					}
					signedPieceHash, err := signing.SignUplinkPieceHash(ctx, privateKey, pieceHash)
					require.NoError(t, err)
					require.NoError(t, w.Commit(ctx, &pb.PieceHeader{
						Hash:         signedPieceHash.GetHash(),
						CreationTime: signedPieceHash.GetTimestamp(),
						Signature:    signedPieceHash.GetSignature(),
					}))

					// V0 pieces keep their hash in the piece info DB
					// rather than in a piece header.
					if file.formatVer == filestore.FormatV0 {
						err = v0PieceInfo.Add(ctx, &pieces.Info{
							SatelliteID:     satellite.satelliteID,
							PieceID:         piece.pieceID,
							PieceSize:       signedPieceHash.GetPieceSize(),
							PieceCreation:   signedPieceHash.GetTimestamp(),
							OrderLimit:      &pb.OrderLimit{},
							UplinkPieceHash: signedPieceHash,
						})
						require.NoError(t, err)
					}

					// Verify piece matches data, has correct signature and expiration
					verifyPieceData(ctx, t, store, satellite.satelliteID, piece.pieceID, file.formatVer, file.data, piece.expiration, publicKey)

				}

				// Trash the piece
				require.NoError(t, store.Trash(ctx, satellite.satelliteID, piece.pieceID))

				// Confirm is missing
				r, err := store.Reader(ctx, satellite.satelliteID, piece.pieceID)
				require.Error(t, err)
				require.Nil(t, r)

				// Verify no expiry information is returned for this piece
				if !piece.expiration.IsZero() {
					infos, err := store.GetExpired(ctx, time.Now().Add(720*time.Hour), 1000)
					require.NoError(t, err)
					var found bool
					for _, info := range infos {
						if info.SatelliteID == satellite.satelliteID && info.PieceID == piece.pieceID {
							found = true
						}
					}
					require.False(t, found)
				}
			}
		}

		// Restore all pieces in the first satellite
		require.NoError(t, store.RestoreTrash(ctx, satellites[0].satelliteID))

		// Check that each piece for first satellite is back, that they are
		// MaxFormatVersionSupported (regardless of which version they began
		// with), and that signature matches.
		for _, piece := range satellites[0].pieces {
			lastFile := piece.files[len(piece.files)-1]
			verifyPieceData(ctx, t, store, satellites[0].satelliteID, piece.pieceID, filestore.MaxFormatVersionSupported, lastFile.data, piece.expiration, publicKey)
		}

		// Confirm 2nd satellite pieces are still missing
		for _, piece := range satellites[1].pieces {
			r, err := store.Reader(ctx, satellites[1].satelliteID, piece.pieceID)
			require.Error(t, err)
			require.Nil(t, r)
		}

	})
}

// verifyPieceData reads the piece back from the store at the given storage
// format version and asserts that:
//   - the piece hash signature verifies against publicKey (taken from the
//     piece header for formats newer than V0, or from the V0 piece info DB
//     for V0 pieces),
//   - the piece contents equal expected,
//   - an expiration record is present in the store if and only if
//     expiration is non-zero.
func verifyPieceData(ctx context.Context, t testing.TB, store *pieces.Store, satelliteID storj.NodeID, pieceID storj.PieceID, formatVer storage.FormatVersion, expected []byte, expiration time.Time, publicKey storj.PiecePublicKey) {
	r, err := store.ReaderWithStorageFormat(ctx, satelliteID, pieceID, formatVer)
	require.NoError(t, err)

	// Get piece hash, verify signature
	var pieceHash *pb.PieceHash
	if formatVer > filestore.FormatV0 {
		header, err := r.GetPieceHeader()
		require.NoError(t, err)
		pieceHash = &pb.PieceHash{
			PieceId:   pieceID,
			Hash:      header.GetHash(),
			PieceSize: r.Size(),
			Timestamp: header.GetCreationTime(),
			Signature: header.GetSignature(),
		}
	} else {
		info, err := store.GetV0PieceInfo(ctx, satelliteID, pieceID)
		require.NoError(t, err)
		pieceHash = info.UplinkPieceHash
	}
	require.NoError(t, signing.VerifyUplinkPieceHashSignature(ctx, publicKey, pieceHash))

	// Require piece data to match expected
	buf, err := ioutil.ReadAll(r)
	require.NoError(t, err)
	require.NoError(t, r.Close())
	assert.True(t, bytes.Equal(buf, expected))

	// Require expiration to match expected
	infos, err := store.GetExpired(ctx, time.Now().Add(720*time.Hour), 1000)
	require.NoError(t, err)
	var found bool
	for _, info := range infos {
		if info.SatelliteID == satelliteID && info.PieceID == pieceID {
			found = true
			break // one match is enough; no need to scan the rest
		}
	}
	if expiration.IsZero() {
		require.False(t, found)
	} else {
		require.True(t, found)
	}
}

func TestPieceVersionMigrate(t *testing.T) {
storagenodedbtest.Run(t, func(t *testing.T, db storagenode.DB) {
const pieceSize = 1024
Expand Down
Loading

0 comments on commit 6aeddf2

Please sign in to comment.