diff --git a/satellite/metainfo/objectdeletion/identifier.go b/satellite/metainfo/objectdeletion/identifier.go
new file mode 100644
index 000000000000..ce36e5f69de2
--- /dev/null
+++ b/satellite/metainfo/objectdeletion/identifier.go
@@ -0,0 +1,86 @@
+// Copyright (C) 2020 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package objectdeletion
+
+import (
+	"errors"
+	"strconv"
+	"strings"
+
+	"github.com/zeebo/errs"
+
+	"storj.io/common/storj"
+	"storj.io/common/uuid"
+)
+
+// ObjectIdentifier contains the information about an object
+// that is needed for a delete operation.
+type ObjectIdentifier struct {
+	ProjectID     uuid.UUID
+	Bucket        []byte
+	EncryptedPath []byte
+}
+
+// SegmentPath returns a raw path for a specific segment index.
+func (id *ObjectIdentifier) SegmentPath(segmentIndex int64) ([]byte, error) {
+	if segmentIndex < lastSegmentIndex {
+		return nil, errors.New("invalid segment index")
+	}
+	segment := "l"
+	if segmentIndex > lastSegmentIndex {
+		segment = "s" + strconv.FormatInt(segmentIndex, 10)
+	}
+
+	return []byte(storj.JoinPaths(
+		id.ProjectID.String(),
+		segment,
+		string(id.Bucket),
+		string(id.EncryptedPath),
+	)), nil
+}
+
+// ParseSegmentPath parses a raw path and returns an
+// object identifier from that path along with the path's segment index.
+// example: <project-id>/01/<bucket>/<encrypted-path>
+func ParseSegmentPath(rawPath []byte) (ObjectIdentifier, int64, error) {
+	elements := storj.SplitPath(string(rawPath))
+	if len(elements) < 4 {
+		return ObjectIdentifier{}, -1, errs.New("invalid path %q", string(rawPath))
+	}
+
+	projectID, err := uuid.FromString(elements[0])
+	if err != nil {
+		return ObjectIdentifier{}, -1, errs.Wrap(err)
+	}
+	var segmentIndex int64
+	if elements[1] == "l" {
+		segmentIndex = lastSegmentIndex
+	} else {
+		segmentIndex, err = strconv.ParseInt(elements[1][1:], 10, 64) // trim the `s` prefix from the segment index
+		if err != nil {
+			return ObjectIdentifier{}, -1, errs.Wrap(err)
+		}
+	}
+
+	return ObjectIdentifier{
+		ProjectID:     projectID,
+		Bucket:        []byte(elements[2]),
+		EncryptedPath: []byte(storj.JoinPaths(elements[3:]...)),
+	}, segmentIndex, nil
+}
+
+// Key returns the concatenation of all object identifier fields.
+// It's a unique string used to identify an object.
+// It's not a valid key for retrieving pointers from the metainfo database.
+func (id *ObjectIdentifier) Key() string {
+	builder := strings.Builder{}
+	// we don't need the return values here:
+	// Write always returns the length of the argument and a nil error
+	_, _ = builder.Write(id.ProjectID[:])
+	_, _ = builder.Write(id.Bucket)
+	_, _ = builder.Write(id.EncryptedPath)
+
+	return builder.String()
+}
diff --git a/satellite/metainfo/objectdeletion/report.go b/satellite/metainfo/objectdeletion/report.go
new file mode 100644
index 000000000000..29912cea090a
--- /dev/null
+++ b/satellite/metainfo/objectdeletion/report.go
@@ -0,0 +1,54 @@
+// Copyright (C) 2020 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package objectdeletion
+
+import (
+	"context"
+
+	"go.uber.org/zap"
+)
+
+// Report represents the deletion status report.
+type Report struct {
+	Deleted []*ObjectIdentifier
+	Failed  []*ObjectIdentifier
+}
+
+// HasFailures returns whether a delete operation has failures.
+func (r Report) HasFailures() bool {
+	return len(r.Failed) > 0
+}
+
+// GenerateReport returns the result of a delete operation,
+// listing which objects were deleted and which failed.
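+//
+// For illustration, a caller holding the deleted segment paths might use it
+// like this (a sketch, not part of this patch):
+//
+//	report := GenerateReport(ctx, log, requests, deletedPaths)
+//	if report.HasFailures() {
+//		// retry or log the report.Failed objects
+//	}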
+func GenerateReport(ctx context.Context, log *zap.Logger, requests []*ObjectIdentifier, deletedPaths [][]byte) Report {
+	defer mon.Task()(&ctx)(nil)
+
+	report := Report{}
+	deletedObjects := make(map[string]*ObjectIdentifier)
+	for _, path := range deletedPaths {
+		if path == nil {
+			continue
+		}
+		id, _, err := ParseSegmentPath(path)
+		if err != nil {
+			log.Debug("failed to parse deleted segment path for report",
+				zap.String("Raw Segment Path", string(path)),
+			)
+			continue
+		}
+		if _, ok := deletedObjects[id.Key()]; !ok {
+			deletedObjects[id.Key()] = &id
+		}
+	}
+
+	// populate the report with failed and deleted objects
+	for _, req := range requests {
+		if _, ok := deletedObjects[req.Key()]; !ok {
+			report.Failed = append(report.Failed, req)
+		} else {
+			report.Deleted = append(report.Deleted, req)
+		}
+	}
+	return report
+}
diff --git a/satellite/metainfo/objectdeletion/report_test.go b/satellite/metainfo/objectdeletion/report_test.go
new file mode 100644
index 000000000000..76ee68dbdfa8
--- /dev/null
+++ b/satellite/metainfo/objectdeletion/report_test.go
@@ -0,0 +1,75 @@
+// Copyright (C) 2020 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package objectdeletion_test
+
+import (
+	"strconv"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	"github.com/zeebo/errs"
+	"go.uber.org/zap/zaptest"
+
+	"storj.io/common/testcontext"
+	"storj.io/common/testrand"
+	"storj.io/storj/satellite/metainfo/objectdeletion"
+)
+
+func TestReport(t *testing.T) {
+	logger := zaptest.NewLogger(t)
+
+	var testCases = []struct {
+		description     string
+		numRequests     int
+		numDeletedPaths int
+		expectedFailure bool
+	}{
+		{"has-failure", 2, 1, true},
+		{"all-deleted", 2, 2, false},
+	}
+
+	for _, tt := range testCases {
+		tt := tt
+		t.Run(tt.description, func(t *testing.T) {
+			ctx := testcontext.New(t)
+			defer ctx.Cleanup()
+			requests := createRequests(tt.numRequests)
+			deletedSegmentPaths, err := createDeletedSegmentPaths(requests, tt.numDeletedPaths)
+			require.NoError(t, err)
+			report := objectdeletion.GenerateReport(ctx, logger, requests, deletedSegmentPaths)
+			require.Equal(t, tt.expectedFailure, report.HasFailures())
+		})
+	}
+}
+
+func createDeletedSegmentPaths(requests []*objectdeletion.ObjectIdentifier, numDeleted int) ([][]byte, error) {
+	if numDeleted > len(requests) {
+		return nil, errs.New("invalid argument")
+	}
+	deletedSegmentPaths := make([][]byte, 0, numDeleted)
+	for i := 0; i < numDeleted; i++ {
+		path, err := requests[i].SegmentPath(int64(testrand.Intn(10)))
+		if err != nil {
+			return nil, err
+		}
+		deletedSegmentPaths = append(deletedSegmentPaths, path)
+	}
+	return deletedSegmentPaths, nil
+}
+
+func createRequests(numRequests int) []*objectdeletion.ObjectIdentifier {
+	requests := make([]*objectdeletion.ObjectIdentifier, 0, numRequests)
+
+	for i := 0; i < numRequests; i++ {
+		obj := objectdeletion.ObjectIdentifier{
+			ProjectID:     testrand.UUID(),
+			Bucket:        []byte("test"),
+			EncryptedPath: []byte(strconv.Itoa(i) + "test"),
+		}
+		requests = append(requests, &obj)
+	}
+
+	return requests
+}
diff --git a/satellite/metainfo/objectdeletion/service.go b/satellite/metainfo/objectdeletion/service.go
new file mode 100644
index 000000000000..bbd8658dc997
--- /dev/null
+++ b/satellite/metainfo/objectdeletion/service.go
@@ -0,0 +1,349 @@
+// Copyright (C) 2020 Storj Labs, Inc.
+// See LICENSE for copying information.
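+
+// Package objectdeletion resolves object delete requests into the full set
+// of segment paths to remove and deletes their pointers from the pointerDB.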
+package objectdeletion
+
+import (
+	"context"
+
+	"github.com/spacemonkeygo/monkit/v3"
+	"github.com/zeebo/errs"
+	"go.uber.org/zap"
+
+	"storj.io/common/context2"
+	"storj.io/common/pb"
+	"storj.io/common/storj"
+)
+
+const (
+	lastSegmentIndex  = -1
+	firstSegmentIndex = 0
+)
+
+var (
+	mon = monkit.Package()
+	// Error is a general object deletion error.
+	Error = errs.Class("object deletion")
+)
+
+// Config defines configuration options for the Service.
+type Config struct {
+	MaxObjectsPerRequest     int `help:"maximum number of requests per batch" default:"100"`
+	ZombieSegmentsPerRequest int `help:"number of segments per request when looking for zombie segments" default:"3"`
+}
+
+// Verify verifies configuration sanity.
+func (config *Config) Verify() errs.Group {
+	var errlist errs.Group
+
+	if config.MaxObjectsPerRequest <= 0 {
+		errlist.Add(Error.New("max requests per batch %d must be greater than 0", config.MaxObjectsPerRequest))
+	}
+
+	if config.ZombieSegmentsPerRequest <= 0 {
+		errlist.Add(Error.New("zombie segments per request %d must be greater than 0", config.ZombieSegmentsPerRequest))
+	}
+
+	return errlist
+}
+
+// PointerDB stores pointers.
+type PointerDB interface {
+	GetItems(ctx context.Context, paths [][]byte) ([]*pb.Pointer, error)
+	UnsynchronizedGetDel(ctx context.Context, paths [][]byte) (deletedPaths [][]byte, _ []*pb.Pointer, _ error)
+}
+
+// Service implements the object deletion service.
+//
+// architecture: Service
+type Service struct {
+	log    *zap.Logger
+	config Config
+
+	pointers PointerDB
+}
+
+// NewService returns a new instance of Service.
+func NewService(log *zap.Logger, pointerDB PointerDB, config Config) (*Service, error) {
+	if errs := config.Verify(); len(errs) > 0 {
+		return nil, errs.Err()
+	}
+
+	return &Service{
+		log:      log,
+		config:   config,
+		pointers: pointerDB,
+	}, nil
+}
+
+// DeletePointers deletes pointers for the given requests and returns the deleted pointers and their paths.
+// If an object is not found, we consider the delete successful.
+func (service *Service) DeletePointers(ctx context.Context, requests []*ObjectIdentifier) (_ []*pb.Pointer, _ [][]byte, err error) {
+	defer mon.Task()(&ctx, len(requests))(&err)
+
+	// We should ignore client cancellation and always try to delete segments.
+	ctx = context2.WithoutCancellation(ctx)
+
+	// get the first and last segments to determine the object state
+	lastAndFirstSegmentsPath := [][]byte{}
+	for _, req := range requests {
+		lastSegmentPath, err := req.SegmentPath(lastSegmentIndex)
+		if err != nil {
+			return nil, nil, Error.Wrap(err)
+		}
+		firstSegmentPath, err := req.SegmentPath(firstSegmentIndex)
+		if err != nil {
+			return nil, nil, Error.Wrap(err)
+		}
+		lastAndFirstSegmentsPath = append(lastAndFirstSegmentsPath,
+			lastSegmentPath,
+			firstSegmentPath,
+		)
+	}
+
+	// Get the pointers from the database.
+	pointers, err := service.pointers.GetItems(ctx, lastAndFirstSegmentsPath)
+	if err != nil {
+		return nil, nil, Error.Wrap(err)
+	}
+
+	states, err := CreateObjectStates(ctx, requests, pointers, lastAndFirstSegmentsPath)
+	if err != nil {
+		return nil, nil, Error.Wrap(err)
+	}
+
+	pathsToDel, err := service.generateSegmentPathsForCompleteObjects(ctx, states)
+	if err != nil {
+		return nil, nil, Error.Wrap(err)
+	}
+
+	zombiePaths, err := service.collectSegmentPathsForZombieObjects(ctx, states)
+	if err != nil {
+		return nil, nil, Error.Wrap(err)
+	}
+	pathsToDel = append(pathsToDel, zombiePaths...)
+
+	// Delete the pointers and fetch the piece ids.
+	//
+	// The deletion may fail in the database for an arbitrary reason.
+	// In that case we return an error and the pointers are left intact.
+	//
+	// The deletion may also succeed in the database, but the connection may drop
+	// while the database is sending a response -- in that case we won't send
+	// the piecedeletion requests and let garbage collection clean up those
+	// pieces.
+	deletedPaths, pointers, err := service.pointers.UnsynchronizedGetDel(ctx, pathsToDel)
+	if err != nil {
+		return nil, nil, Error.Wrap(err)
+	}
+
+	// Update the state map with the other segments.
+	for i, pointer := range pointers {
+		id, segmentIdx, err := ParseSegmentPath(deletedPaths[i])
+		if err != nil {
+			return pointers, deletedPaths, Error.Wrap(err)
+		}
+
+		state, ok := states[id.Key()]
+		if !ok {
+			return pointers, deletedPaths, Error.New("object not found")
+		}
+
+		if segmentIdx == lastSegmentIndex || segmentIdx == firstSegmentIndex {
+			continue
+		}
+
+		state.OtherSegments = append(state.OtherSegments, pointer)
+	}
+
+	// if an object is missing, we can consider it a successful delete
+	deletedPaths = append(deletedPaths, service.extractSegmentPathsForMissingObjects(ctx, states)...)
+
+	return pointers, deletedPaths, nil
+}
+
+// GroupPiecesByNodeID returns a map that contains pieces with the node id as the key.
+func GroupPiecesByNodeID(pointers []*pb.Pointer) map[storj.NodeID][]storj.PieceID {
+	// build the piece deletion requests
+	piecesToDelete := map[storj.NodeID][]storj.PieceID{}
+	for _, p := range pointers {
+		if p == nil {
+			continue
+		}
+		if p.Type != pb.Pointer_REMOTE {
+			continue
+		}
+
+		rootPieceID := p.GetRemote().RootPieceId
+		for _, piece := range p.GetRemote().GetRemotePieces() {
+			pieceID := rootPieceID.Derive(piece.NodeId, piece.PieceNum)
+			piecesToDelete[piece.NodeId] = append(piecesToDelete[piece.NodeId], pieceID)
+		}
+	}
+
+	return piecesToDelete
+}
+
+// generateSegmentPathsForCompleteObjects collects segment paths for objects whose last segment was found in pointerDB.
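+//
+// As an illustration (a sketch with placeholder names), for a complete
+// 3-segment object the collected paths would be ordered like:
+//
+//	<project-id>/l/<bucket>/<encrypted-path>   (last segment)
+//	<project-id>/s0/<bucket>/<encrypted-path>  (first segment)
+//	<project-id>/s1/<bucket>/<encrypted-path>  (remaining middle segment)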
+func (service *Service) generateSegmentPathsForCompleteObjects(ctx context.Context, states map[string]*ObjectState) (_ [][]byte, err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	segmentPaths := [][]byte{}
+
+	for _, state := range states {
+		switch state.Status() {
+		case ObjectMissing:
+			// nothing to do here
+		case ObjectMultiSegment:
+			// queue the first and last segments for deletion; we already have the necessary information
+			lastSegmentPath, err := state.SegmentPath(lastSegmentIndex)
+			if err != nil {
+				return nil, Error.Wrap(err)
+			}
+			firstSegmentPath, err := state.SegmentPath(firstSegmentIndex)
+			if err != nil {
+				return nil, Error.Wrap(err)
+			}
+
+			segmentPaths = append(segmentPaths, lastSegmentPath)
+			segmentPaths = append(segmentPaths, firstSegmentPath)
+
+			largestSegmentIdx, err := numberOfSegmentsFromPointer(state.LastSegment)
+			if err != nil {
+				return nil, Error.Wrap(err)
+			}
+			// gather all segment paths that aren't first or last segments
+			for index := largestSegmentIdx - 1; index > firstSegmentIndex; index-- {
+				path, err := state.SegmentPath(index)
+				if err != nil {
+					return nil, Error.Wrap(err)
+				}
+				segmentPaths = append(segmentPaths, path)
+			}
+		case ObjectSingleSegment:
+			// just add the last segment path; we already have the necessary information
+			lastSegmentPath, err := state.SegmentPath(lastSegmentIndex)
+			if err != nil {
+				return nil, Error.Wrap(err)
+			}
+			segmentPaths = append(segmentPaths, lastSegmentPath)
+		case ObjectActiveOrZombie:
+			// we will handle it in a separate method
+		}
+	}
+
+	return segmentPaths, nil
+}
+
+// collectSegmentPathsForZombieObjects collects segment paths for objects whose last segment was not found in pointerDB.
+func (service *Service) collectSegmentPathsForZombieObjects(ctx context.Context, states map[string]*ObjectState) (_ [][]byte, err error) {
+	defer mon.Task()(&ctx)(&err)
+
+	zombies := map[string]ObjectIdentifier{}
+	largestLoaded := map[string]int64{}
+
+	segmentsToDel := [][]byte{}
+
+	for _, state := range states {
+		if state.Status() == ObjectActiveOrZombie {
+			firstSegmentPath, err := state.SegmentPath(firstSegmentIndex)
+			if err != nil {
+				return nil, Error.Wrap(err)
+			}
+			segmentsToDel = append(segmentsToDel, firstSegmentPath)
+
+			zombies[state.Key()] = state.ObjectIdentifier
+			largestLoaded[state.Key()] = int64(firstSegmentIndex)
+		}
+	}
+
+	largestKnownSegment := int64(firstSegmentIndex)
+	for len(zombies) > 0 {
+		// Don't make requests for segments where we found the final segment.
+		for key, largest := range largestLoaded {
+			if largest != largestKnownSegment {
+				delete(largestLoaded, key)
+				delete(zombies, key)
+			}
+		}
+		// found all segments
+		if len(zombies) == 0 {
+			break
+		}
+
+		// Request the next batch of segments.
+		startFrom := largestKnownSegment + 1
+		largestKnownSegment += int64(service.config.ZombieSegmentsPerRequest)
+
+		var zombieSegmentPaths [][]byte
+		for _, id := range zombies {
+			for i := startFrom; i <= largestKnownSegment; i++ {
+				path, err := id.SegmentPath(i)
+				if err != nil {
+					return nil, Error.Wrap(err)
+				}
+				zombieSegmentPaths = append(zombieSegmentPaths, path)
+			}
+		}
+
+		// We are relying on the database to return the pointers in the same
+		// order as the paths we requested.
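+		// A nil entry in the response means the requested segment does not
+		// exist, so only segments that were actually found advance the walk.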
+		pointers, err := service.pointers.GetItems(ctx, zombieSegmentPaths)
+		if err != nil {
+			return nil, errs.Wrap(err)
+		}
+
+		for i, p := range zombieSegmentPaths {
+			if pointers[i] == nil {
+				continue
+			}
+			id, segmentIdx, err := ParseSegmentPath(p)
+			if err != nil {
+				return nil, Error.Wrap(err)
+			}
+
+			segmentsToDel = append(segmentsToDel, p)
+			largestLoaded[id.Key()] = max(largestLoaded[id.Key()], segmentIdx)
+		}
+	}
+
+	return segmentsToDel, nil
+}
+
+func (service *Service) extractSegmentPathsForMissingObjects(ctx context.Context, states map[string]*ObjectState) [][]byte {
+	paths := make([][]byte, 0, len(states))
+	for _, state := range states {
+		if state.Status() == ObjectMissing {
+			lastSegmentPath, err := state.ObjectIdentifier.SegmentPath(lastSegmentIndex)
+			if err != nil {
+				// it shouldn't happen
+				service.log.Debug("failed to get segment path for missing object",
+					zap.Stringer("Project ID", state.ObjectIdentifier.ProjectID),
+					zap.String("Bucket", string(state.ObjectIdentifier.Bucket)),
+					zap.String("Encrypted Path", string(state.ObjectIdentifier.EncryptedPath)),
+				)
+				continue
+			}
+			paths = append(paths, lastSegmentPath)
+		}
+	}
+
+	return paths
+}
+
+func max(a, b int64) int64 {
+	if a > b {
+		return a
+	}
+	return b
+}
+
+func numberOfSegmentsFromPointer(pointer *pb.Pointer) (int64, error) {
+	meta := &pb.StreamMeta{}
+	err := pb.Unmarshal(pointer.Metadata, meta)
+	if err != nil {
+		return 0, err
+	}
+
+	return meta.NumberOfSegments, nil
+}
diff --git a/satellite/metainfo/objectdeletion/service_test.go b/satellite/metainfo/objectdeletion/service_test.go
new file mode 100644
index 000000000000..333770674700
--- /dev/null
+++ b/satellite/metainfo/objectdeletion/service_test.go
@@ -0,0 +1,371 @@
+// Copyright (C) 2020 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package objectdeletion_test
+
+import (
+	"context"
+	"math/rand"
+	"strconv"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	"github.com/zeebo/errs"
+	"go.uber.org/zap/zaptest"
+
+	"storj.io/common/pb"
+	"storj.io/common/storj"
+	"storj.io/common/testcontext"
+	"storj.io/common/testrand"
+	"storj.io/common/uuid"
+	"storj.io/storj/satellite/metainfo/objectdeletion"
+)
+
+func TestService_Delete_SingleObject(t *testing.T) {
+	ctx := testcontext.New(t)
+	defer ctx.Cleanup()
+
+	// mock the object that we want to delete
+	item := &objectdeletion.ObjectIdentifier{
+		ProjectID:     testrand.UUID(),
+		Bucket:        []byte("bucketname"),
+		EncryptedPath: []byte("encrypted"),
+	}
+
+	objectNotFound := &objectdeletion.ObjectIdentifier{
+		ProjectID:     testrand.UUID(),
+		Bucket:        []byte("object-not-found"),
+		EncryptedPath: []byte("object-missing"),
+	}
+
+	config := objectdeletion.Config{
+		MaxObjectsPerRequest:     100,
+		ZombieSegmentsPerRequest: 3,
+	}
+
+	var testCases = []struct {
+		segmentType             string
+		isValidObject           bool
+		largestSegmentIdx       int
+		numPiecesPerSegment     int32
+		expectedPointersDeleted int
+		expectedPathDeleted     int
+		expectedPiecesToDelete  int32
+	}{
+		{"single-segment", true, 0, 3, 1, 1, 3},
+		{"multi-segment", true, 5, 2, 6, 6, 12},
+		{"inline-segment", true, 0, 0, 1, 1, 0},
+		{"mixed-segment", true, 5, 3, 6, 6, 15},
+		{"zombie-segment", true, 5, 2, 5, 5, 10},
+		{"single-segment", false, 0, 3, 0, 1, 0},
+	}
+
+	for _, tt := range testCases {
+		tt := tt // quiet linting
+		t.Run(tt.segmentType, func(t *testing.T) {
+			pointerDBMock, err := newPointerDB([]*objectdeletion.ObjectIdentifier{item}, tt.segmentType, tt.largestSegmentIdx, tt.numPiecesPerSegment, false)
+			require.NoError(t, err)
+
+			service, err := objectdeletion.NewService(zaptest.NewLogger(t), pointerDBMock, config)
+			require.NoError(t, err)
+
+			pointers, deletedPaths, err := service.DeletePointers(ctx, []*objectdeletion.ObjectIdentifier{item})
+			if !tt.isValidObject {
+				pointers, deletedPaths, err = service.DeletePointers(ctx, []*objectdeletion.ObjectIdentifier{objectNotFound})
+			}
+			require.NoError(t, err)
+			require.Len(t, pointers, tt.expectedPointersDeleted)
+			require.Len(t, deletedPaths, tt.expectedPathDeleted)
+
+			piecesToDeleteByNodes := objectdeletion.GroupPiecesByNodeID(pointers)
+
+			totalPiecesToDelete := 0
+			for _, pieces := range piecesToDeleteByNodes {
+				totalPiecesToDelete += len(pieces)
+			}
+			require.Equal(t, tt.expectedPiecesToDelete, int32(totalPiecesToDelete))
+		})
+	}
+}
+
+func TestService_Delete_SingleObject_Failure(t *testing.T) {
+	ctx := testcontext.New(t)
+	defer ctx.Cleanup()
+
+	// mock the object that we want to delete
+	item := &objectdeletion.ObjectIdentifier{
+		ProjectID:     testrand.UUID(),
+		Bucket:        []byte("bucketname"),
+		EncryptedPath: []byte("encrypted"),
+	}
+
+	config := objectdeletion.Config{
+		MaxObjectsPerRequest:     100,
+		ZombieSegmentsPerRequest: 3,
+	}
+
+	var testCases = []struct {
+		segmentType            string
+		largestSegmentIdx      int
+		numPiecesPerSegment    int32
+		expectedPiecesToDelete int32
+	}{
+		{"single-segment", 0, 1, 0},
+		{"mixed-segment", 5, 3, 0},
+		{"zombie-segment", 5, 2, 0},
+	}
+
+	for _, tt := range testCases {
+		tt := tt // quiet linting
+		t.Run(tt.segmentType, func(t *testing.T) {
+			reqs := []*objectdeletion.ObjectIdentifier{item}
+			pointerDBMock, err := newPointerDB(reqs, tt.segmentType, tt.largestSegmentIdx, tt.numPiecesPerSegment, true)
+			require.NoError(t, err)
+
+			service, err := objectdeletion.NewService(zaptest.NewLogger(t), pointerDBMock, config)
+			require.NoError(t, err)
+
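+			// The mock is configured to fail, so the delete should return an
+			// error and leave all pointers in place.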
+			pointers, deletedPaths, err := service.DeletePointers(ctx, reqs)
+			require.Error(t, err)
+			require.Len(t, pointers, 0)
+			require.Len(t, deletedPaths, 0)
+
+			piecesToDeleteByNodes := objectdeletion.GroupPiecesByNodeID(pointers)
+
+			totalPiecesToDelete := 0
+			for _, pieces := range piecesToDeleteByNodes {
+				totalPiecesToDelete += len(pieces)
+			}
+			require.Equal(t, tt.expectedPiecesToDelete, int32(totalPiecesToDelete))
+		})
+	}
+}
+
+func TestService_Delete_MultipleObject(t *testing.T) {
+	ctx := testcontext.New(t)
+	defer ctx.Cleanup()
+
+	items := make([]*objectdeletion.ObjectIdentifier, 0, 100)
+
+	for i := 0; i < 10; i++ {
+		item := &objectdeletion.ObjectIdentifier{
+			ProjectID:     testrand.UUID(),
+			Bucket:        []byte("bucketname"),
+			EncryptedPath: []byte("encrypted" + strconv.Itoa(i)),
+		}
+		items = append(items, item)
+	}
+
+	config := objectdeletion.Config{
+		MaxObjectsPerRequest:     100,
+		ZombieSegmentsPerRequest: 3,
+	}
+
+	var testCases = []struct {
+		segmentType             string
+		largestSegmentIdx       int
+		numPiecesPerSegment     int32
+		expectedPointersDeleted int
+		expectedPiecesToDelete  int32
+	}{
+		{"single-segment", 0, 3, 10, 30},
+		{"multi-segment", 5, 2, 60, 120},
+		{"inline-segment", 0, 0, 10, 0},
+		{"mixed-segment", 5, 3, 60, 177},
+		{"zombie-segment", 5, 2, 50, 100},
+	}
+
+	for _, tt := range testCases {
+		tt := tt // quiet linting
+		t.Run(tt.segmentType, func(t *testing.T) {
+			pointerDBMock, err := newPointerDB(items, tt.segmentType, tt.largestSegmentIdx, tt.numPiecesPerSegment, false)
+			require.NoError(t, err)
+
+			service, err := objectdeletion.NewService(zaptest.NewLogger(t), pointerDBMock, config)
+			require.NoError(t, err)
+
+			pointers, deletedPaths, err := service.DeletePointers(ctx, items)
+			require.NoError(t, err)
+			require.Len(t, pointers, tt.expectedPointersDeleted)
+			require.Len(t, deletedPaths, tt.expectedPointersDeleted)
+
+			piecesToDeleteByNodes := objectdeletion.GroupPiecesByNodeID(pointers)
+			totalPiecesToDelete := 0
+			for _, pieces := range piecesToDeleteByNodes {
+				totalPiecesToDelete += len(pieces)
+			}
+			require.Equal(t, tt.expectedPiecesToDelete, int32(totalPiecesToDelete))
+		})
+	}
+}
+
+const (
+	lastSegmentIdx  = -1
+	firstSegmentIdx = 0
+)
+
+type pointerDBMock struct {
+	pointers map[string]*pb.Pointer
+	hasError bool
+}
+
+func newPointerDB(objects []*objectdeletion.ObjectIdentifier, segmentType string, numSegments int, numPiecesPerSegment int32, hasError bool) (*pointerDBMock, error) {
+	var (
+		pointers []*pb.Pointer
+		err      error
+	)
+
+	segmentMap := map[string]struct{ lastSegment, firstSegment, inlineSegment bool }{
+		"single-segment": {true, false, false},
+		"multi-segment":  {true, true, false},
+		"inline-segment": {true, false, true},
+		"mixed-segment":  {true, true, true},
+		"zombie-segment": {false, true, false},
+	}
+
+	option, ok := segmentMap[segmentType]
+	if !ok {
+		return nil, errs.New("unsupported segment type")
+	}
+
+	paths := [][]byte{}
+	for _, obj := range objects {
+		paths = append(paths, createPaths(obj, numSegments)...)
+ } + + pointers, err = createMockPointers(option.lastSegment, option.firstSegment, option.inlineSegment, paths, numPiecesPerSegment, numSegments) + if err != nil { + return nil, err + } + + pointerDB := &pointerDBMock{ + pointers: make(map[string]*pb.Pointer, len(paths)), + hasError: hasError, + } + for i, p := range paths { + pointerDB.pointers[string(p)] = pointers[i] + } + + return pointerDB, nil +} + +func (db *pointerDBMock) GetItems(ctx context.Context, paths [][]byte) ([]*pb.Pointer, error) { + if db.hasError { + return nil, errs.New("pointerDB failure") + } + pointers := make([]*pb.Pointer, len(paths)) + for i, p := range paths { + pointers[i] = db.pointers[string(p)] + } + return pointers, nil +} + +func (db *pointerDBMock) UnsynchronizedGetDel(ctx context.Context, paths [][]byte) ([][]byte, []*pb.Pointer, error) { + pointers := make([]*pb.Pointer, len(paths)) + for i, p := range paths { + pointers[i] = db.pointers[string(p)] + } + + rand.Shuffle(len(pointers), func(i, j int) { + pointers[i], pointers[j] = pointers[j], pointers[i] + paths[i], paths[j] = paths[j], paths[i] + }) + + return paths, pointers, nil +} + +func newPointer(pointerType pb.Pointer_DataType, numPiecesPerSegment int32) *pb.Pointer { + pointer := &pb.Pointer{ + Type: pointerType, + } + if pointerType == pb.Pointer_REMOTE { + remotePieces := make([]*pb.RemotePiece, 0, numPiecesPerSegment) + for i := int32(0); i < numPiecesPerSegment; i++ { + remotePieces = append(remotePieces, &pb.RemotePiece{ + PieceNum: i, + NodeId: testrand.NodeID(), + }) + } + pointer.Remote = &pb.RemoteSegment{ + RootPieceId: testrand.PieceID(), + RemotePieces: remotePieces, + } + } + return pointer +} + +func newLastSegmentPointer(pointerType pb.Pointer_DataType, numSegments int, numPiecesPerSegment int32) (*pb.Pointer, error) { + pointer := newPointer(pointerType, numPiecesPerSegment) + meta := &pb.StreamMeta{ + NumberOfSegments: int64(numSegments), + } + metaInBytes, err := pb.Marshal(meta) + if err != nil { + return nil, err + } + pointer.Metadata = metaInBytes + return pointer, nil +} + +func createMockPointers(hasLastSegment bool, hasFirstSegment bool, hasInlineSegments bool, paths [][]byte, numPiecesPerSegment int32, numSegments int) ([]*pb.Pointer, error) { + pointers := make([]*pb.Pointer, 0, len(paths)) + + isInlineAdded := false + for _, p := range paths { + _, segment, err := objectdeletion.ParseSegmentPath(p) + if err != nil { + return nil, err + } + + if segment == lastSegmentIdx { + if !hasLastSegment { + pointers = append(pointers, nil) + } else { + lastSegmentPointer, err := newLastSegmentPointer(pb.Pointer_REMOTE, numSegments, numPiecesPerSegment) + if err != nil { + return nil, err + } + pointers = append(pointers, lastSegmentPointer) + } + continue + } + if !hasFirstSegment && segment == firstSegmentIdx { + pointers = append(pointers, nil) + continue + } + if hasInlineSegments && !isInlineAdded { + pointers = append(pointers, newPointer(pb.Pointer_INLINE, 0)) + isInlineAdded = true + continue + } + pointers = append(pointers, newPointer(pb.Pointer_REMOTE, numPiecesPerSegment)) + } + + return pointers, nil +} + +func createPaths(object *objectdeletion.ObjectIdentifier, largestSegmentIdx int) [][]byte { + paths := [][]byte{} + for i := 0; i <= largestSegmentIdx; i++ { + segmentIdx := i + if segmentIdx == largestSegmentIdx { + segmentIdx = lastSegmentIdx + } + paths = append(paths, createPath(object.ProjectID, object.Bucket, segmentIdx, object.EncryptedPath)) + } + return paths +} + +func createPath(projectID uuid.UUID, 
+	segment := "l"
+	if segmentIdx > lastSegmentIdx { // lastSegmentIdx = -1
+		segment = "s" + strconv.Itoa(segmentIdx)
+	}
+
+	entries := make([]string, 0)
+	entries = append(entries, projectID.String())
+	entries = append(entries, segment)
+	entries = append(entries, string(bucket))
+	entries = append(entries, string(encryptedPath))
+	return []byte(storj.JoinPaths(entries...))
+}
diff --git a/satellite/metainfo/objectdeletion/state.go b/satellite/metainfo/objectdeletion/state.go
new file mode 100644
index 000000000000..f16980fd7339
--- /dev/null
+++ b/satellite/metainfo/objectdeletion/state.go
@@ -0,0 +1,88 @@
+// Copyright (C) 2020 Storj Labs, Inc.
+// See LICENSE for copying information.
+
+package objectdeletion
+
+import (
+	"context"
+
+	"storj.io/common/pb"
+)
+
+// ObjectState determines how an object should be handled during
+// a delete operation.
+// It also stores pointers related to an object.
+type ObjectState struct {
+	ObjectIdentifier
+
+	LastSegment *pb.Pointer
+	ZeroSegment *pb.Pointer
+
+	OtherSegments []*pb.Pointer
+}
+
+// Status returns the current object status.
+func (state *ObjectState) Status() ObjectStatus {
+	switch {
+	case state.LastSegment == nil && state.ZeroSegment == nil:
+		return ObjectMissing
+	case state.LastSegment != nil && state.ZeroSegment == nil:
+		return ObjectSingleSegment
+	case state.LastSegment != nil && state.ZeroSegment != nil:
+		return ObjectMultiSegment
+	case state.LastSegment == nil && state.ZeroSegment != nil:
+		return ObjectActiveOrZombie
+	default:
+		return ObjectMissing
+	}
+}
+
+// ObjectStatus is used to determine how to retrieve segment information
+// from the database.
+type ObjectStatus byte
+
+const (
+	// ObjectMissing represents when there's no object with that particular name
+	ObjectMissing = ObjectStatus(iota)
+	// ObjectMultiSegment represents a multi-segment object
+	ObjectMultiSegment
+	// ObjectSingleSegment represents a single-segment object
+	ObjectSingleSegment
+	// ObjectActiveOrZombie represents an object that is either being uploaded, being deleted, or is a zombie
+	ObjectActiveOrZombie
+)
+
+// CreateObjectStates creates the current object states.
+func CreateObjectStates(ctx context.Context, requests []*ObjectIdentifier, pointers []*pb.Pointer, paths [][]byte) (map[string]*ObjectState, error) {
+
+	// Initialize the state for every requested object.
+	objects := make(map[string]*ObjectState)
+	for _, req := range requests {
+		objects[req.Key()] = &ObjectState{
+			ObjectIdentifier: *req,
+		}
+	}
+
+	for i, p := range paths {
+		// Update our state map.
+		id, segment, err := ParseSegmentPath(p)
+		if err != nil {
+			return nil, Error.Wrap(err)
+		}
+
+		state, ok := objects[id.Key()]
+		if !ok {
+			return nil, Error.New("object not found: %q", id.Key())
+		}
+
+		switch segment {
+		case lastSegmentIndex:
+			state.LastSegment = pointers[i]
+		case firstSegmentIndex:
+			state.ZeroSegment = pointers[i]
+		default:
+			return nil, Error.New("unexpected segment index %d", segment)
+		}
+	}
+	return objects, nil
+}
diff --git a/satellite/metainfo/service.go b/satellite/metainfo/service.go
index 87878a4717f5..fbacc45969d2 100644
--- a/satellite/metainfo/service.go
+++ b/satellite/metainfo/service.go
@@ -224,6 +224,34 @@ func (s *Service) Get(ctx context.Context, path string) (_ *pb.Pointer, err erro
 	return pointer, nil
 }
 
+// GetItems gets decoded pointers from the DB.
+// The return value is in the same order as the argument paths.
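+// Paths that are not found yield a nil entry at the matching index rather
+// than an error. For example (a sketch):
+//
+//	pointers, err := service.GetItems(ctx, [][]byte{pathA, unknownPath})
+//	// pointers[1] == nil when unknownPath does not exist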
+func (s *Service) GetItems(ctx context.Context, paths [][]byte) (_ []*pb.Pointer, err error) {
+	defer mon.Task()(&ctx)(&err)
+	keys := make(storage.Keys, len(paths))
+	for i := range paths {
+		keys[i] = paths[i]
+	}
+	pointerBytes, err := s.db.GetAll(ctx, keys)
+	if err != nil {
+		return nil, Error.Wrap(err)
+	}
+
+	pointers := make([]*pb.Pointer, len(pointerBytes))
+	for i, p := range pointerBytes {
+		if p == nil {
+			continue
+		}
+		var pointer pb.Pointer
+		err = pb.Unmarshal([]byte(p), &pointer)
+		if err != nil {
+			return nil, Error.Wrap(err)
+		}
+		pointers[i] = &pointer
+	}
+	return pointers, nil
+}
+
 // GetWithBytes gets the protocol buffers encoded and decoded pointer from the DB.
 func (s *Service) GetWithBytes(ctx context.Context, path string) (pointerBytes []byte, pointer *pb.Pointer, err error) {
 	defer mon.Task()(&ctx)(&err)
@@ -337,7 +365,37 @@ func (s *Service) Delete(ctx context.Context, path string, oldPointerBytes []byt
 	return Error.Wrap(err)
 }
 
-// UnsynchronizedDelete deletes from item from db without verifying whether the pointer has changed in the database.
+// UnsynchronizedGetDel deletes items from the db without verifying whether the pointers have changed in the database,
+// and it returns the deleted items.
+func (s *Service) UnsynchronizedGetDel(ctx context.Context, paths [][]byte) ([][]byte, []*pb.Pointer, error) {
+	keys := make(storage.Keys, len(paths))
+	for i := range paths {
+		keys[i] = paths[i]
+	}
+
+	items, err := s.db.DeleteMultiple(ctx, keys)
+	if err != nil {
+		return nil, nil, Error.Wrap(err)
+	}
+
+	pointerPaths := make([][]byte, 0, len(items))
+	pointers := make([]*pb.Pointer, 0, len(items))
+
+	for _, item := range items {
+		data := &pb.Pointer{}
+		err = pb.Unmarshal(item.Value, data)
+		if err != nil {
+			return nil, nil, Error.Wrap(err)
+		}
+
+		pointerPaths = append(pointerPaths, item.Key)
+		pointers = append(pointers, data)
+	}
+
+	return pointerPaths, pointers, nil
+}
+
+// UnsynchronizedDelete deletes an item from the db without verifying whether the pointer has changed in the database.
 func (s *Service) UnsynchronizedDelete(ctx context.Context, path string) (err error) {
 	defer mon.Task()(&ctx)(&err)
diff --git a/satellite/metainfo/service_test.go b/satellite/metainfo/service_test.go
index 351823115e12..739f75ee4880 100644
--- a/satellite/metainfo/service_test.go
+++ b/satellite/metainfo/service_test.go
@@ -5,9 +5,12 @@ package metainfo_test
 
 import (
 	"context"
+	"fmt"
+	"strconv"
 	"testing"
 
 	"github.com/stretchr/testify/require"
+	"github.com/zeebo/errs"
 
 	"storj.io/common/memory"
 	"storj.io/common/pb"
@@ -19,6 +22,8 @@ import (
 	"storj.io/storj/storage"
 )
 
+const lastSegmentIndex = -1
+
 func TestIterate(t *testing.T) {
 	testplanet.Run(t, testplanet.Config{
 		SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
@@ -58,6 +63,81 @@ func TestIterate(t *testing.T) {
 	})
 }
 
+// TestGetItems_ReturnValueOrder ensures that the return value
+// of GetItems is always in the same order as the requested paths.
+// The test does the following steps:
+// - Uploads test data (multi-segment objects)
+// - Gathers all object paths, with an extra invalid path at a random position
+// - Retrieves pointers using the above paths
+// - Ensures the nil pointer and last segment paths are in the same order as their
+// corresponding paths.
+func TestGetItems_ReturnValueOrder(t *testing.T) {
+	testplanet.Run(t, testplanet.Config{
+		SatelliteCount: 1, StorageNodeCount: 6, UplinkCount: 1,
+		Reconfigure: testplanet.Reconfigure{
+			Satellite: testplanet.Combine(
+				testplanet.ReconfigureRS(2, 2, 4, 4),
+				testplanet.MaxSegmentSize(3*memory.KiB),
+			),
+		},
+	}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
+
+		satellite := planet.Satellites[0]
+		uplinkPeer := planet.Uplinks[0]
+
+		numItems := 5
+		for i := 0; i < numItems; i++ {
+			path := fmt.Sprintf("test/path_%d", i)
+			err := uplinkPeer.Upload(ctx, satellite, "bucket", path, testrand.Bytes(15*memory.KiB))
+			require.NoError(t, err)
+		}
+
+		keys, err := satellite.Metainfo.Database.List(ctx, nil, numItems)
+		require.NoError(t, err)
+
+		var paths = make([][]byte, 0, numItems+1)
+		var lastSegmentPathIndices []int
+
+		// Random nil pointer
+		nilPointerIndex := testrand.Intn(numItems + 1)
+
+		for i, key := range keys {
+			paths = append(paths, []byte(key.String()))
+			segmentIdx, err := parseSegmentPath([]byte(key.String()))
+			require.NoError(t, err)
+
+			if segmentIdx == lastSegmentIndex {
+				lastSegmentPathIndices = append(lastSegmentPathIndices, i)
+			}
+
+			// set a random path to be nil
+			if nilPointerIndex == i {
+				paths[nilPointerIndex] = nil
+			}
+		}
+
+		pointers, err := satellite.Metainfo.Service.GetItems(ctx, paths)
+		require.NoError(t, err)
+
+		for i, p := range pointers {
+			if p == nil {
+				require.Equal(t, nilPointerIndex, i)
+				continue
+			}
+
+			meta := pb.StreamMeta{}
+			metaInBytes := p.GetMetadata()
+			err = pb.Unmarshal(metaInBytes, &meta)
+			require.NoError(t, err)
+
+			lastSegmentMeta := meta.GetLastSegmentMeta()
+			if lastSegmentMeta != nil {
+				require.Equal(t, lastSegmentPathIndices[i], i)
+			}
+		}
+	})
+}
+
 func TestUpdatePiecesCheckDuplicates(t *testing.T) {
 	testplanet.Run(t, testplanet.Config{
 		SatelliteCount: 1, StorageNodeCount: 3, UplinkCount: 1,
@@ -152,3 +232,21 @@ func TestCountBuckets(t *testing.T) {
 		require.Equal(t, 2, count)
 	})
 }
+
+func parseSegmentPath(segmentPath []byte) (segmentIndex int64, err error) {
+	elements := storj.SplitPath(string(segmentPath))
+	if len(elements) < 4 {
+		return -1, errs.New("invalid path %q", string(segmentPath))
+	}
+
+	if elements[1] == "l" {
+		segmentIndex = lastSegmentIndex
+	} else {
+		segmentIndex, err = strconv.ParseInt(elements[1][1:], 10, 64)
+		if err != nil {
+			return lastSegmentIndex, errs.Wrap(err)
+		}
+	}
+	return segmentIndex, nil
+}