diff --git a/satellite/internalpb/metainfo_sat.pb.go b/satellite/internalpb/metainfo_sat.pb.go index bec42f58d1d7..2dd6536e1dc7 100644 --- a/satellite/internalpb/metainfo_sat.pb.go +++ b/satellite/internalpb/metainfo_sat.pb.go @@ -36,13 +36,10 @@ type StreamID struct { SatelliteSignature []byte `protobuf:"bytes,9,opt,name=satellite_signature,json=satelliteSignature,proto3" json:"satellite_signature,omitempty"` StreamId []byte `protobuf:"bytes,10,opt,name=stream_id,json=streamId,proto3" json:"stream_id,omitempty"` Placement int32 `protobuf:"varint,13,opt,name=placement,proto3" json:"placement,omitempty"` - // temporary field to determine if we should go with new pending_objects table or - // fallback to pending object in objects table. - UsePendingObjectsTable bool `protobuf:"varint,14,opt,name=use_pending_objects_table,json=usePendingObjectsTable,proto3" json:"use_pending_objects_table,omitempty"` - Versioned bool `protobuf:"varint,15,opt,name=versioned,proto3" json:"versioned,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + Versioned bool `protobuf:"varint,15,opt,name=versioned,proto3" json:"versioned,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *StreamID) Reset() { *m = StreamID{} } @@ -139,13 +136,6 @@ func (m *StreamID) GetPlacement() int32 { return 0 } -func (m *StreamID) GetUsePendingObjectsTable() bool { - if m != nil { - return m.UsePendingObjectsTable - } - return false -} - func (m *StreamID) GetVersioned() bool { if m != nil { return m.Versioned @@ -240,42 +230,41 @@ func init() { func init() { proto.RegisterFile("metainfo_sat.proto", fileDescriptor_47c60bd892d94aaf) } var fileDescriptor_47c60bd892d94aaf = []byte{ - // 589 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x52, 0xcb, 0x6e, 0xd4, 0x30, - 0x14, 0x25, 0x0c, 0x33, 0x9d, 0xf1, 0x4c, 0x3b, 0x95, 0xfb, 0x90, 0x29, 0x45, 0x13, 0x15, 0x21, - 0x0d, 0x9b, 0x04, 0xb5, 0xab, 0x2e, 0xa9, 0x86, 0xc5, 0x88, 0x47, 0x4b, 0x5a, 0x36, 0x6c, 0x22, - 0x27, 0xbe, 0x8d, 0xdc, 0x26, 0x76, 0x64, 0x3b, 0xa8, 0x5d, 0xf2, 0x07, 0x7c, 0x16, 0x3b, 0xf6, - 0x2c, 0xca, 0xaf, 0xa0, 0x38, 0xaf, 0x91, 0x68, 0x17, 0xb0, 0xf3, 0x3d, 0xf7, 0xf8, 0xfa, 0xf8, - 0xdc, 0x83, 0x70, 0x06, 0x86, 0x72, 0x71, 0x29, 0x43, 0x4d, 0x8d, 0x97, 0x2b, 0x69, 0x24, 0xc6, - 0x9a, 0x1a, 0x48, 0x53, 0x6e, 0xc0, 0x6b, 0xba, 0x7b, 0x9b, 0x20, 0x62, 0x75, 0x9b, 0x1b, 0x2e, - 0x45, 0xc5, 0xda, 0x43, 0x89, 0x4c, 0x64, 0x7d, 0x9e, 0x25, 0x52, 0x26, 0x29, 0xf8, 0xb6, 0x8a, - 0x8a, 0x4b, 0xdf, 0xf0, 0x0c, 0xb4, 0xa1, 0x59, 0x5e, 0x13, 0x36, 0x9a, 0x41, 0x55, 0x7d, 0xf0, - 0xf3, 0x09, 0x1a, 0x9e, 0x1b, 0x05, 0x34, 0x5b, 0x2e, 0xf0, 0x2e, 0x1a, 0x44, 0x45, 0x7c, 0x0d, - 0x86, 0x38, 0xae, 0x33, 0x9f, 0x04, 0x75, 0x85, 0x5f, 0xa3, 0xed, 0xfa, 0x55, 0x60, 0xa1, 0x8c, - 0xae, 0x20, 0x36, 0xe1, 0x35, 0xdc, 0x92, 0xc7, 0x96, 0x85, 0xdb, 0xde, 0xa9, 0x6d, 0xbd, 0x83, - 0x5b, 0x4c, 0xd0, 0xda, 0x57, 0x50, 0x9a, 0x4b, 0x41, 0x7a, 0xae, 0x33, 0xef, 0x05, 0x4d, 0x89, - 0x3f, 0xa3, 0x9d, 0xee, 0x07, 0x61, 0x4e, 0x15, 0xcd, 0xc0, 0x80, 0xd2, 0x64, 0xe2, 0x3a, 0xf3, - 0xf1, 0xa1, 0xeb, 0xad, 0xfc, 0xef, 0x6d, 0x7b, 0x3c, 0x6b, 0x79, 0xc1, 0x36, 0xdc, 0x83, 0xe2, - 0x25, 0x5a, 0x8f, 0x15, 0x50, 0x3b, 0x94, 0x51, 0x03, 0xa4, 0x6f, 0xc7, 0xed, 0x79, 0x95, 0x21, - 0x5e, 0x63, 0x88, 0x77, 0xd1, 0x18, 0x72, 0x32, 0xfc, 0x71, 0x37, 0x7b, 0xf4, 0xfd, 0xf7, 0xcc, - 0x09, 0x26, 0xcd, 0xd5, 0x05, 0x35, 0x80, 0x3f, 0xa0, 0x29, 0xdc, 0xe4, 0x5c, 0xad, 0x0c, 0x1b, - 0xfc, 0xc3, 0xb0, 0x8d, 0xee, 0xb2, 0x1d, 0xf7, 0x0a, 0x6d, 0x66, 0x45, 0x6a, 0x78, 0x4e, 0x95, - 0xa9, 0xcd, 0x23, 0x63, 0xd7, 0x99, 0x0f, 0x83, 0x69, 0x8b, 0x57, 0xc6, 0x61, 0x1f, 0x6d, 0xb5, - 0x1b, 0x0f, 0x35, 0x4f, 0x04, 0x35, 0x85, 0x02, 0x32, 0xaa, 0x6c, 0x6e, 0x5b, 0xe7, 0x4d, 0x07, - 0x3f, 0x43, 0x23, 0x6d, 0x97, 0x17, 0x72, 0x46, 0x90, 0xa5, 0x0d, 0x2b, 0x60, 0xc9, 0xf0, 0x3e, - 0x1a, 0xe5, 0x29, 0x8d, 0x21, 0x03, 0x61, 0xc8, 0xba, 0xeb, 0xcc, 0xfb, 0x41, 0x07, 0xe0, 0x63, - 0xf4, 0xb4, 0xd0, 0x10, 0xe6, 0x20, 0x18, 0x17, 0x49, 0x2d, 0x4c, 0x87, 0x86, 0x46, 0x29, 0x90, - 0x0d, 0xab, 0x6f, 0xb7, 0xd0, 0x70, 0x56, 0xf5, 0x2b, 0x81, 0xfa, 0xa2, 0xec, 0x96, 0x83, 0xeb, - 0x6d, 0x02, 0x23, 0x53, 0x4b, 0xed, 0x80, 0x83, 0x6f, 0x3d, 0x34, 0x3a, 0x87, 0xa4, 0x7c, 0x64, - 0xb9, 0xc0, 0xc7, 0xab, 0x0a, 0x1d, 0x6b, 0xe3, 0xbe, 0xf7, 0x77, 0xac, 0xbd, 0x26, 0x83, 0x2b, - 0xfa, 0x67, 0x68, 0x6c, 0x3d, 0x13, 0x45, 0x16, 0x81, 0xb2, 0x61, 0xeb, 0x07, 0xa8, 0x84, 0x3e, - 0x5a, 0x04, 0x6f, 0xa3, 0x3e, 0x17, 0x0c, 0x6e, 0x6c, 0xc4, 0xfa, 0x41, 0x55, 0xe0, 0x23, 0xb4, - 0xae, 0xa4, 0x34, 0x61, 0xce, 0x21, 0x86, 0xf2, 0xd5, 0x32, 0x09, 0x93, 0x93, 0x69, 0xb9, 0xa0, - 0x5f, 0x77, 0xb3, 0xb5, 0xb3, 0x12, 0x5f, 0x2e, 0x82, 0x71, 0xc9, 0xaa, 0x0a, 0x86, 0x3f, 0xa1, - 0x1d, 0xa9, 0x78, 0xc2, 0x05, 0x4d, 0x43, 0xa9, 0x18, 0xa8, 0x30, 0xe5, 0x19, 0x37, 0x9a, 0x0c, - 0xdc, 0xde, 0x7c, 0x7c, 0xf8, 0xbc, 0x13, 0xfa, 0x86, 0x31, 0x05, 0x5a, 0x03, 0x3b, 0x2d, 0x69, - 0xef, 0x4b, 0x56, 0xb0, 0xd5, 0xdc, 0xed, 0xb0, 0x7b, 0x12, 0xb9, 0xf6, 0xdf, 0x89, 0x7c, 0x20, - 0x17, 0xc3, 0x87, 0x72, 0x71, 0xf2, 0xf2, 0xcb, 0x0b, 0x6d, 0xa4, 0xba, 0xf2, 0xb8, 0xf4, 0xed, - 0xc1, 0x6f, 0x49, 0x3e, 0x17, 0x06, 0x94, 0xa0, 0x69, 0x1e, 0x45, 0x03, 0xab, 0xe1, 0xe8, 0x4f, - 0x00, 0x00, 0x00, 0xff, 0xff, 0x81, 0xe3, 0xf1, 0xc8, 0x7c, 0x04, 0x00, 0x00, + // 563 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x52, 0xcb, 0x6e, 0xd3, 0x40, + 0x14, 0xc5, 0xa4, 0x49, 0x9d, 0x49, 0x9a, 0x44, 0xd3, 0x14, 0x8d, 0x42, 0x51, 0xac, 0x22, 0x24, + 0xb3, 0xb1, 0x51, 0xbb, 0x62, 0x49, 0x14, 0x16, 0xe1, 0xd5, 0xe2, 0xc0, 0x86, 0x8d, 0x35, 0xb6, + 0x6f, 0xad, 0x69, 0x6d, 0x8f, 0x35, 0x33, 0x41, 0xcd, 0x92, 0x3f, 0x60, 0xcd, 0x17, 0xf1, 0x0d, + 0x2c, 0xca, 0xaf, 0x20, 0x8f, 0x5f, 0x91, 0x68, 0x17, 0xb0, 0x9b, 0x7b, 0xee, 0x99, 0x3b, 0x77, + 0xce, 0x39, 0x08, 0xa7, 0xa0, 0x28, 0xcb, 0x2e, 0xb9, 0x2f, 0xa9, 0x72, 0x72, 0xc1, 0x15, 0xc7, + 0x58, 0x52, 0x05, 0x49, 0xc2, 0x14, 0x38, 0x75, 0x77, 0x36, 0x81, 0x2c, 0x14, 0xdb, 0x5c, 0x31, + 0x9e, 0x95, 0xac, 0x19, 0x8a, 0x79, 0xcc, 0xab, 0xf3, 0x3c, 0xe6, 0x3c, 0x4e, 0xc0, 0xd5, 0x55, + 0xb0, 0xb9, 0x74, 0x15, 0x4b, 0x41, 0x2a, 0x9a, 0xe6, 0x15, 0x61, 0x54, 0x0f, 0x2a, 0xeb, 0x93, + 0x1f, 0x7b, 0xc8, 0x5c, 0x2b, 0x01, 0x34, 0x5d, 0x2d, 0xf1, 0x23, 0xd4, 0x0b, 0x36, 0xe1, 0x35, + 0x28, 0x62, 0x58, 0x86, 0x3d, 0xf4, 0xaa, 0x0a, 0xbf, 0x40, 0xd3, 0xea, 0x55, 0x88, 0x7c, 0x1e, + 0x5c, 0x41, 0xa8, 0xfc, 0x6b, 0xd8, 0x92, 0x87, 0x9a, 0x85, 0x9b, 0xde, 0xb9, 0x6e, 0xbd, 0x85, + 0x2d, 0x26, 0x68, 0xff, 0x2b, 0x08, 0xc9, 0x78, 0x46, 0x3a, 0x96, 0x61, 0x77, 0xbc, 0xba, 0xc4, + 0x9f, 0xd1, 0x51, 0xfb, 0x03, 0x3f, 0xa7, 0x82, 0xa6, 0xa0, 0x40, 0x48, 0x32, 0xb4, 0x0c, 0x7b, + 0x70, 0x6a, 0x39, 0x3b, 0xff, 0x7b, 0xdd, 0x1c, 0x2f, 0x1a, 0x9e, 0x37, 0x85, 0x3b, 0x50, 0xbc, + 0x42, 0x07, 0xa1, 0x00, 0xaa, 0x87, 0x46, 0x54, 0x01, 0xe9, 0xea, 0x71, 0x33, 0xa7, 0x14, 0xc4, + 0xa9, 0x05, 0x71, 0x3e, 0xd5, 0x82, 0x2c, 0xcc, 0x9f, 0xb7, 0xf3, 0x07, 0xdf, 0x7f, 0xcf, 0x0d, + 0x6f, 0x58, 0x5f, 0x5d, 0x52, 0x05, 0xf8, 0x3d, 0x1a, 0xc3, 0x4d, 0xce, 0xc4, 0xce, 0xb0, 0xde, + 0x3f, 0x0c, 0x1b, 0xb5, 0x97, 0xf5, 0xb8, 0xe7, 0x68, 0x92, 0x6e, 0x12, 0xc5, 0x72, 0x2a, 0x54, + 0x25, 0x1e, 0x19, 0x58, 0x86, 0x6d, 0x7a, 0xe3, 0x06, 0x2f, 0x85, 0xc3, 0x2e, 0x3a, 0x6c, 0x1c, + 0xf7, 0x25, 0x8b, 0x33, 0xaa, 0x36, 0x02, 0x48, 0xbf, 0x94, 0xb9, 0x69, 0xad, 0xeb, 0x0e, 0x7e, + 0x8c, 0xfa, 0x52, 0x9b, 0xe7, 0xb3, 0x88, 0x20, 0x4d, 0x33, 0x4b, 0x60, 0x15, 0xe1, 0x63, 0xd4, + 0xcf, 0x13, 0x1a, 0x42, 0x0a, 0x99, 0x22, 0x07, 0x96, 0x61, 0x77, 0xbd, 0x16, 0x28, 0xba, 0x95, + 0x25, 0x10, 0x91, 0xb1, 0xde, 0xa7, 0x05, 0xde, 0xec, 0x99, 0xa3, 0xc9, 0xf8, 0xe4, 0x5b, 0x07, + 0xf5, 0xd7, 0x10, 0x17, 0xfc, 0xd5, 0x12, 0xbf, 0xdc, 0x7d, 0xcc, 0xd0, 0x8a, 0x1c, 0x3b, 0x7f, + 0x27, 0xd4, 0xa9, 0xe3, 0xb4, 0xb3, 0xca, 0x1c, 0x0d, 0xf4, 0xf7, 0xb3, 0x4d, 0x1a, 0x80, 0xd0, + 0xb9, 0xe9, 0x7a, 0xa8, 0x80, 0x3e, 0x68, 0x04, 0x4f, 0x51, 0x97, 0x65, 0x11, 0xdc, 0xe8, 0xb4, + 0x74, 0xbd, 0xb2, 0xc0, 0x67, 0xe8, 0x40, 0x70, 0xae, 0xfc, 0x9c, 0x41, 0x08, 0xc5, 0xab, 0x85, + 0xa9, 0xc3, 0xc5, 0xb8, 0xd0, 0xfa, 0xd7, 0xed, 0x7c, 0xff, 0xa2, 0xc0, 0x57, 0x4b, 0x6f, 0x50, + 0xb0, 0xca, 0x22, 0xc2, 0x1f, 0xd1, 0x11, 0x17, 0x2c, 0x66, 0x19, 0x4d, 0x7c, 0x2e, 0x22, 0x10, + 0x7e, 0xc2, 0x52, 0xa6, 0x24, 0xe9, 0x59, 0x1d, 0x7b, 0x70, 0xfa, 0xa4, 0x5d, 0xf4, 0x55, 0x14, + 0x09, 0x90, 0x12, 0xa2, 0xf3, 0x82, 0xf6, 0xae, 0x60, 0x79, 0x87, 0xf5, 0xdd, 0x16, 0xbb, 0x23, + 0x5c, 0xfb, 0xff, 0x1d, 0xae, 0x7b, 0x2c, 0x36, 0xef, 0xb3, 0x78, 0xf1, 0xec, 0xcb, 0x53, 0xa9, + 0xb8, 0xb8, 0x72, 0x18, 0x77, 0xf5, 0xc1, 0x6d, 0x48, 0x2e, 0xcb, 0x14, 0x88, 0x8c, 0x26, 0x79, + 0x10, 0xf4, 0xf4, 0x0e, 0x67, 0x7f, 0x02, 0x00, 0x00, 0xff, 0xff, 0x0b, 0x41, 0x2d, 0x1d, 0x47, + 0x04, 0x00, 0x00, } diff --git a/satellite/internalpb/metainfo_sat.proto b/satellite/internalpb/metainfo_sat.proto index e2508110f1c8..d9c70a576a24 100644 --- a/satellite/internalpb/metainfo_sat.proto +++ b/satellite/internalpb/metainfo_sat.proto @@ -12,6 +12,7 @@ import "google/protobuf/timestamp.proto"; import "metainfo.proto"; message StreamID { + reserved 14; bytes bucket = 1; bytes encrypted_object_key = 2; int64 version = 3; @@ -29,10 +30,6 @@ message StreamID { int32 placement = 13; - // temporary field to determine if we should go with new pending_objects table or - // fallback to pending object in objects table. - bool use_pending_objects_table = 14; - bool versioned = 15; } diff --git a/satellite/metainfo/config.go b/satellite/metainfo/config.go index d49050357774..62aeb0d1975a 100644 --- a/satellite/metainfo/config.go +++ b/satellite/metainfo/config.go @@ -4,7 +4,6 @@ package metainfo import ( - "encoding/binary" "fmt" "strconv" "strings" @@ -146,11 +145,6 @@ type Config struct { ServerSideCopy bool `help:"enable code for server-side copy, deprecated. please leave this to true." default:"true"` ServerSideCopyDisabled bool `help:"disable already enabled server-side copy. this is because once server side copy is enabled, delete code should stay changed, even if you want to disable server side copy" default:"false"` - UsePendingObjectsTable bool `help:"enable new flow for upload which is using pending_objects table" default:"false"` - // flags to simplify testing by enabling feature only for specific projects or for for specific percentage of projects - UsePendingObjectsTableProjects []string `help:"list of projects which will have UsePendingObjectsTable feature flag enabled" default:"" hidden:"true"` - UsePendingObjectsTableRollout int `help:"percentage (0-100) of projects which should have this feature enabled" default:"0" hidden:"true"` - UseBucketLevelObjectVersioning bool `help:"enable the use of bucket level object versioning" default:"false"` // flag to simplify testing by enabling bucket level versioning feature only for specific projects UseBucketLevelObjectVersioningProjects []string `help:"list of projects which will have UseBucketLevelObjectVersioning feature flag enabled" default:"" hidden:"true"` @@ -177,28 +171,12 @@ func (c Config) Metabase(applicationName string) metabase.Config { type ExtendedConfig struct { Config - usePendingObjectsTableProjects []uuid.UUID - usePendingObjectsTableRolloutCursor uuid.UUID - useBucketLevelObjectVersioningProjects []uuid.UUID } // NewExtendedConfig creates new instance of extended config. func NewExtendedConfig(config Config) (_ ExtendedConfig, err error) { extendedConfig := ExtendedConfig{Config: config} - for _, projectIDString := range config.UsePendingObjectsTableProjects { - projectID, err := uuid.FromString(projectIDString) - if err != nil { - return ExtendedConfig{}, err - } - extendedConfig.usePendingObjectsTableProjects = append(extendedConfig.usePendingObjectsTableProjects, projectID) - } - - extendedConfig.usePendingObjectsTableRolloutCursor, err = createRolloutCursor(config.UsePendingObjectsTableRollout) - if err != nil { - return ExtendedConfig{}, err - } - for _, projectIDString := range config.UseBucketLevelObjectVersioningProjects { projectID, err := uuid.FromString(projectIDString) if err != nil { @@ -210,27 +188,6 @@ func NewExtendedConfig(config Config) (_ ExtendedConfig, err error) { return extendedConfig, nil } -// UsePendingObjectsTableByProject checks if UsePendingObjectsTable should be enabled for specific project. -func (ec ExtendedConfig) UsePendingObjectsTableByProject(projectID uuid.UUID) bool { - // if its globally enabled don't look at projects - if ec.UsePendingObjectsTable { - return true - } - for _, p := range ec.usePendingObjectsTableProjects { - if p == projectID { - return true - } - } - - if !ec.usePendingObjectsTableRolloutCursor.IsZero() { - if projectID.Less(ec.usePendingObjectsTableRolloutCursor) { - return true - } - } - - return false -} - // UseBucketLevelObjectVersioningByProject checks if UseBucketLevelObjectVersioning should be enabled for specific project. func (ec ExtendedConfig) UseBucketLevelObjectVersioningByProject(projectID uuid.UUID) bool { // if its globally enabled don't look at projects @@ -245,16 +202,3 @@ func (ec ExtendedConfig) UseBucketLevelObjectVersioningByProject(projectID uuid. return false } - -func createRolloutCursor(percentage int) (uuid.UUID, error) { - if percentage <= 0 { - return uuid.UUID{}, nil - } else if percentage >= 100 { - return uuid.Max(), nil - } - - cursorValue := uint32(1 << 32 * (float32(percentage) / 100)) - bytes := make([]byte, 16) - binary.BigEndian.PutUint32(bytes, cursorValue) - return uuid.FromBytes(bytes) -} diff --git a/satellite/metainfo/config_test.go b/satellite/metainfo/config_test.go index 0546894ccbd4..126a309ca9a4 100644 --- a/satellite/metainfo/config_test.go +++ b/satellite/metainfo/config_test.go @@ -99,87 +99,6 @@ func TestRSConfigValidation(t *testing.T) { } } -func TestExtendedConfig_UsePendingObjectsTable(t *testing.T) { - projectA := testrand.UUID() - projectB := testrand.UUID() - projectC := testrand.UUID() - config, err := metainfo.NewExtendedConfig(metainfo.Config{ - UsePendingObjectsTable: false, - UsePendingObjectsTableProjects: []string{ - projectA.String(), - projectB.String(), - }, - }) - require.NoError(t, err) - - require.True(t, config.UsePendingObjectsTableByProject(projectA)) - require.True(t, config.UsePendingObjectsTableByProject(projectB)) - require.False(t, config.UsePendingObjectsTableByProject(projectC)) - - config, err = metainfo.NewExtendedConfig(metainfo.Config{ - UsePendingObjectsTable: true, - UsePendingObjectsTableProjects: []string{ - projectA.String(), - }, - }) - require.NoError(t, err) - - require.True(t, config.UsePendingObjectsTableByProject(projectA)) - require.True(t, config.UsePendingObjectsTableByProject(projectB)) - require.True(t, config.UsePendingObjectsTableByProject(projectC)) - - config, err = metainfo.NewExtendedConfig(metainfo.Config{ - UsePendingObjectsTable: false, - UsePendingObjectsTableProjects: []string{ - "01000000-0000-0000-0000-000000000000", - }, - }) - require.NoError(t, err) - require.True(t, config.UsePendingObjectsTableByProject(uuid.UUID{1})) -} - -func TestExtendedConfig_UsePendingObjectsTableRollout(t *testing.T) { - uuidA := testrand.UUID() - config, err := metainfo.NewExtendedConfig(metainfo.Config{ - UsePendingObjectsTable: false, - UsePendingObjectsTableRollout: 0, - }) - require.NoError(t, err) - - require.False(t, config.UsePendingObjectsTableByProject(uuidA)) - require.False(t, config.UsePendingObjectsTableByProject(makeUUID("00000001-0000-0000-0000-000000000000"))) - require.False(t, config.UsePendingObjectsTableByProject(makeUUID("FFFFFFFF-0000-0000-0000-000000000000"))) - - config, err = metainfo.NewExtendedConfig(metainfo.Config{ - UsePendingObjectsTable: false, - UsePendingObjectsTableRollout: 50, - }) - require.NoError(t, err) - - require.True(t, config.UsePendingObjectsTableByProject(makeUUID("00000001-0000-0000-0000-000000000000"))) - require.False(t, config.UsePendingObjectsTableByProject(makeUUID("FFFFFFFF-0000-0000-0000-000000000000"))) - - config, err = metainfo.NewExtendedConfig(metainfo.Config{ - UsePendingObjectsTable: false, - UsePendingObjectsTableRollout: 25, - }) - require.NoError(t, err) - - require.True(t, config.UsePendingObjectsTableByProject(makeUUID("00000001-0000-0000-0000-000000000000"))) - require.True(t, config.UsePendingObjectsTableByProject(makeUUID("3FFFFFFF-0000-0000-0000-000000000000"))) - require.False(t, config.UsePendingObjectsTableByProject(makeUUID("40000000-0000-0000-0000-000000000000"))) - require.False(t, config.UsePendingObjectsTableByProject(makeUUID("FFFFFFFF-0000-0000-0000-000000000000"))) - - config, err = metainfo.NewExtendedConfig(metainfo.Config{ - UsePendingObjectsTable: false, - UsePendingObjectsTableRollout: 100, - }) - require.NoError(t, err) - - require.True(t, config.UsePendingObjectsTableByProject(makeUUID("00000001-0000-0000-0000-000000000000"))) - require.True(t, config.UsePendingObjectsTableByProject(makeUUID("FFFFFFFF-0000-0000-0000-000000000000"))) -} - func TestExtendedConfig_UseBucketLevelObjectVersioning(t *testing.T) { projectA := testrand.UUID() projectB := testrand.UUID() @@ -197,7 +116,7 @@ func TestExtendedConfig_UseBucketLevelObjectVersioning(t *testing.T) { require.False(t, config.UseBucketLevelObjectVersioningByProject(projectC)) config, err = metainfo.NewExtendedConfig(metainfo.Config{ - UsePendingObjectsTable: false, + UseBucketLevelObjectVersioning: false, UseBucketLevelObjectVersioningProjects: []string{ "01000000-0000-0000-0000-000000000000", }, @@ -205,8 +124,3 @@ func TestExtendedConfig_UseBucketLevelObjectVersioning(t *testing.T) { require.NoError(t, err) require.True(t, config.UseBucketLevelObjectVersioningByProject(uuid.UUID{1})) } - -func makeUUID(uuidString string) uuid.UUID { - value, _ := uuid.FromString(uuidString) - return value -} diff --git a/satellite/metainfo/endpoint_object.go b/satellite/metainfo/endpoint_object.go index 1523c49369ef..60cbac15892d 100644 --- a/satellite/metainfo/endpoint_object.go +++ b/satellite/metainfo/endpoint_object.go @@ -7,7 +7,6 @@ import ( "bytes" "context" "fmt" - "sort" "time" "github.com/jtolio/eventkit" @@ -138,8 +137,6 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe nonce = req.EncryptedMetadataNonce[:] } - usePendingObjectsTable := endpoint.config.UsePendingObjectsTableByProject(keyInfo.ProjectID) - opts := metabase.BeginObjectNextVersion{ ObjectStream: metabase.ObjectStream{ ProjectID: keyInfo.ProjectID, @@ -153,8 +150,6 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe EncryptedMetadata: req.EncryptedMetadata, EncryptedMetadataEncryptedKey: req.EncryptedMetadataEncryptedKey, EncryptedMetadataNonce: nonce, - - UsePendingObjectsTable: usePendingObjectsTable, } if !expiresAt.IsZero() { opts.ExpiresAt = &expiresAt @@ -176,8 +171,6 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe EncryptionParameters: req.EncryptionParameters, Placement: int32(bucket.Placement), Versioned: bucket.Versioning == buckets.VersioningEnabled, - - UsePendingObjectsTable: usePendingObjectsTable, }) if err != nil { endpoint.log.Error("internal", zap.Error(err)) @@ -295,8 +288,6 @@ func (endpoint *Endpoint) CommitObject(ctx context.Context, req *pb.ObjectCommit DisallowDelete: !allowDelete, - UsePendingObjectsTable: streamID.UsePendingObjectsTable, - Versioned: streamID.Versioned, } // uplink can send empty metadata with not empty key/nonce @@ -944,7 +935,6 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq cursorKey := metabase.ObjectKey(req.EncryptedCursor) cursorVersion := metabase.Version(0) - cursorStreamID := uuid.UUID{} if len(cursorKey) != 0 { cursorKey = prefix + cursorKey @@ -956,8 +946,6 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq // // it should be set in case of pending and committed objects cursorVersion = metabase.MaxVersion - // for the same reasons as above we need to set maximum UUID as a cursor stream id - cursorStreamID = uuid.Max() } includeCustomMetadata := true @@ -1000,140 +988,36 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq } resp.More = result.More } else { - if status == metabase.Pending && endpoint.config.UsePendingObjectsTableByProject(keyInfo.ProjectID) { - type ObjectListItem struct { - Item *pb.ObjectListItem - StreamID uuid.UUID - } - - pendingObjectsEntries := make([]ObjectListItem, 0, limit) - // TODO when objects table will be free from pending objects only this listing method will remain - err = endpoint.metabase.IteratePendingObjects(ctx, metabase.IteratePendingObjects{ + err = endpoint.metabase.IterateObjectsAllVersionsWithStatus(ctx, + metabase.IterateObjectsWithStatus{ ProjectID: keyInfo.ProjectID, BucketName: string(req.Bucket), Prefix: prefix, - Cursor: metabase.PendingObjectsCursor{ - Key: cursorKey, - StreamID: cursorStreamID, + Cursor: metabase.IterateCursor{ + Key: cursorKey, + Version: cursorVersion, }, Recursive: req.Recursive, BatchSize: limit + 1, + Pending: status == metabase.Pending, IncludeCustomMetadata: includeCustomMetadata, IncludeSystemMetadata: includeSystemMetadata, - }, func(ctx context.Context, it metabase.PendingObjectsIterator) error { - entry := metabase.PendingObjectEntry{} - for it.Next(ctx, &entry) { - item, err := endpoint.pendingObjectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, placement) + }, func(ctx context.Context, it metabase.ObjectsIterator) error { + entry := metabase.ObjectEntry{} + for len(resp.Items) < limit && it.Next(ctx, &entry) { + item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, placement) if err != nil { return err } - pendingObjectsEntries = append(pendingObjectsEntries, ObjectListItem{ - Item: item, - StreamID: entry.StreamID, - }) - } - return nil - }) - if err != nil { - return nil, endpoint.convertMetabaseErr(err) - } - - // we always need results from both tables for now - objectsEntries := make([]ObjectListItem, 0, limit) - err = endpoint.metabase.IterateObjectsAllVersionsWithStatus(ctx, - metabase.IterateObjectsWithStatus{ - ProjectID: keyInfo.ProjectID, - BucketName: string(req.Bucket), - Prefix: prefix, - Cursor: metabase.IterateCursor{ - Key: cursorKey, - Version: cursorVersion, - }, - Recursive: req.Recursive, - BatchSize: limit + 1, - Pending: true, - IncludeCustomMetadata: includeCustomMetadata, - IncludeSystemMetadata: includeSystemMetadata, - }, func(ctx context.Context, it metabase.ObjectsIterator) error { - entry := metabase.ObjectEntry{} - for it.Next(ctx, &entry) { - item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, placement) - if err != nil { - return err - } - objectsEntries = append(objectsEntries, ObjectListItem{ - Item: item, - StreamID: entry.StreamID, - }) - } - return nil - }, - ) - if err != nil { - return nil, endpoint.convertMetabaseErr(err) - } - - // combine results from both tables and sort them by object key to be able to cut results to the limit - allResults := make([]ObjectListItem, 0, len(pendingObjectsEntries)+len(objectsEntries)) - allResults = append(allResults, pendingObjectsEntries...) - allResults = append(allResults, objectsEntries...) - sort.Slice(allResults, func(i, j int) bool { - keyCompare := bytes.Compare(allResults[i].Item.EncryptedObjectKey, allResults[j].Item.EncryptedObjectKey) - switch { - case keyCompare == -1: - return true - case keyCompare == 1: - return false - case allResults[i].Item.Version < allResults[j].Item.Version: - return true - case allResults[i].Item.Version > allResults[j].Item.Version: - return false - default: - return allResults[i].StreamID.Less(allResults[j].StreamID) + resp.Items = append(resp.Items, item) } - }) - if len(allResults) >= limit { - resp.More = len(allResults) > limit - allResults = allResults[:limit] - } - resp.Items = make([]*pb.ObjectListItem, len(allResults)) - for i, objectListItem := range allResults { - resp.Items[i] = objectListItem.Item - } - } else { - // we always need results from both tables for now - err = endpoint.metabase.IterateObjectsAllVersionsWithStatus(ctx, - metabase.IterateObjectsWithStatus{ - ProjectID: keyInfo.ProjectID, - BucketName: string(req.Bucket), - Prefix: prefix, - Cursor: metabase.IterateCursor{ - Key: cursorKey, - Version: cursorVersion, - }, - Recursive: req.Recursive, - BatchSize: limit + 1, - Pending: status == metabase.Pending, - IncludeCustomMetadata: includeCustomMetadata, - IncludeSystemMetadata: includeSystemMetadata, - }, func(ctx context.Context, it metabase.ObjectsIterator) error { - entry := metabase.ObjectEntry{} - for len(resp.Items) < limit && it.Next(ctx, &entry) { - item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, placement) - if err != nil { - return err - } - resp.Items = append(resp.Items, item) - } - // we need to take into account also potential results from IteratePendingObjects - resp.More = resp.More || it.Next(ctx, &entry) - return nil - }, - ) - if err != nil { - return nil, endpoint.convertMetabaseErr(err) - } + resp.More = it.Next(ctx, &entry) + return nil + }, + ) + if err != nil { + return nil, endpoint.convertMetabaseErr(err) } } endpoint.log.Info("Object List", zap.Stringer("Project ID", keyInfo.ProjectID), zap.String("operation", "list"), zap.String("type", "object")) @@ -1192,9 +1076,6 @@ func (endpoint *Endpoint) ListPendingObjectStreams(ctx context.Context, req *pb. } metabase.ListLimit.Ensure(&limit) - resp = &pb.ObjectListPendingStreamsResponse{} - resp.Items = []*pb.ObjectListItem{} - options := metabase.IteratePendingObjectsByKey{ ObjectLocation: metabase.ObjectLocation{ ProjectID: keyInfo.ProjectID, @@ -1204,24 +1085,6 @@ func (endpoint *Endpoint) ListPendingObjectStreams(ctx context.Context, req *pb. BatchSize: limit + 1, Cursor: cursor, } - if endpoint.config.UsePendingObjectsTableByProject(keyInfo.ProjectID) { - err = endpoint.metabase.IteratePendingObjectsByKeyNew(ctx, - options, func(ctx context.Context, it metabase.PendingObjectsIterator) error { - entry := metabase.PendingObjectEntry{} - for it.Next(ctx, &entry) { - item, err := endpoint.pendingObjectEntryToProtoListItem(ctx, req.Bucket, entry, "", true, true, placement) - if err != nil { - return err - } - resp.Items = append(resp.Items, item) - } - return nil - }, - ) - if err != nil { - return nil, endpoint.convertMetabaseErr(err) - } - } objectsEntries := make([]*pb.ObjectListItem, 0, limit) err = endpoint.metabase.IteratePendingObjectsByKey(ctx, @@ -1241,6 +1104,9 @@ func (endpoint *Endpoint) ListPendingObjectStreams(ctx context.Context, req *pb. return nil, endpoint.convertMetabaseErr(err) } + resp = &pb.ObjectListPendingStreamsResponse{} + resp.Items = []*pb.ObjectListItem{} + // TODO currently this request have a bug if we would like to list all pending objects // with the same name if we have more than single page of them (1000) because protobuf // cursor doesn't include additional things like StreamID so it's a bit useless to do @@ -1331,7 +1197,7 @@ func (endpoint *Endpoint) BeginDeleteObject(ctx context.Context, req *pb.ObjectB ObjectKey: metabase.ObjectKey(pbStreamID.EncryptedObjectKey), Version: metabase.Version(pbStreamID.Version), StreamID: streamID, - }, pbStreamID.UsePendingObjectsTable) + }) } } } else { @@ -1715,87 +1581,6 @@ func (endpoint *Endpoint) objectEntryToProtoListItem(ctx context.Context, bucket return item, nil } -func (endpoint *Endpoint) pendingObjectEntryToProtoListItem(ctx context.Context, bucket []byte, - entry metabase.PendingObjectEntry, prefixToPrependInSatStreamID metabase.ObjectKey, - includeSystem, includeMetadata bool, placement storj.PlacementConstraint) (item *pb.ObjectListItem, err error) { - - item = &pb.ObjectListItem{ - EncryptedObjectKey: []byte(entry.ObjectKey), - Status: pb.Object_UPLOADING, - } - - expiresAt := time.Time{} - if entry.ExpiresAt != nil { - expiresAt = *entry.ExpiresAt - } - - if includeSystem { - item.ExpiresAt = expiresAt - item.CreatedAt = entry.CreatedAt - } - - if includeMetadata { - var nonce storj.Nonce - if len(entry.EncryptedMetadataNonce) > 0 { - nonce, err = storj.NonceFromBytes(entry.EncryptedMetadataNonce) - if err != nil { - return nil, err - } - } - - streamMeta := &pb.StreamMeta{} - err = pb.Unmarshal(entry.EncryptedMetadata, streamMeta) - if err != nil { - return nil, err - } - - if entry.Encryption != (storj.EncryptionParameters{}) { - streamMeta.EncryptionType = int32(entry.Encryption.CipherSuite) - streamMeta.EncryptionBlockSize = entry.Encryption.BlockSize - } - - if entry.EncryptedMetadataEncryptedKey != nil { - streamMeta.LastSegmentMeta = &pb.SegmentMeta{ - EncryptedKey: entry.EncryptedMetadataEncryptedKey, - KeyNonce: entry.EncryptedMetadataNonce, - } - } - - metadataBytes, err := pb.Marshal(streamMeta) - if err != nil { - return nil, err - } - - item.EncryptedMetadata = metadataBytes - item.EncryptedMetadataNonce = nonce - item.EncryptedMetadataEncryptedKey = entry.EncryptedMetadataEncryptedKey - } - - // Add Stream ID to list items if listing is for pending objects. - // The client requires the Stream ID to use in the MultipartInfo. - satStreamID, err := endpoint.packStreamID(ctx, &internalpb.StreamID{ - Bucket: bucket, - EncryptedObjectKey: append([]byte(prefixToPrependInSatStreamID), []byte(entry.ObjectKey)...), - Version: int64(metabase.PendingVersion), - CreationDate: entry.CreatedAt, - ExpirationDate: expiresAt, - StreamId: entry.StreamID[:], - MultipartObject: true, - EncryptionParameters: &pb.EncryptionParameters{ - CipherSuite: pb.CipherSuite(entry.Encryption.CipherSuite), - BlockSize: int64(entry.Encryption.BlockSize), - }, - Placement: int32(placement), - UsePendingObjectsTable: true, - }) - if err != nil { - return nil, err - } - item.StreamId = &satStreamID - - return item, nil -} - // DeleteCommittedObject deletes all the pieces of the storage nodes that belongs // to the specified object. // @@ -1867,17 +1652,12 @@ func (endpoint *Endpoint) DeleteCommittedObject( // // NOTE: this method is exported for being able to individually test it without // having import cycles. -func (endpoint *Endpoint) DeletePendingObject(ctx context.Context, stream metabase.ObjectStream, usePendingObjectTable bool) (deletedObjects []*pb.Object, err error) { +func (endpoint *Endpoint) DeletePendingObject(ctx context.Context, stream metabase.ObjectStream) (deletedObjects []*pb.Object, err error) { req := metabase.DeletePendingObject{ ObjectStream: stream, } - var result metabase.DeleteObjectResult - if usePendingObjectTable { - result, err = endpoint.metabase.DeletePendingObjectNew(ctx, req) - } else { - result, err = endpoint.metabase.DeletePendingObject(ctx, req) - } + result, err := endpoint.metabase.DeletePendingObject(ctx, req) if err != nil { return nil, err } diff --git a/satellite/metainfo/endpoint_object_test.go b/satellite/metainfo/endpoint_object_test.go index e17a86a4f10f..dc7fd17b1878 100644 --- a/satellite/metainfo/endpoint_object_test.go +++ b/satellite/metainfo/endpoint_object_test.go @@ -19,7 +19,6 @@ import ( "github.com/zeebo/errs" "go.uber.org/zap" "golang.org/x/exp/maps" - "golang.org/x/exp/slices" "storj.io/common/errs2" "storj.io/common/identity" @@ -40,7 +39,6 @@ import ( "storj.io/storj/satellite/buckets" "storj.io/storj/satellite/internalpb" "storj.io/storj/satellite/metabase" - "storj.io/storj/satellite/metabase/metabasetest" "storj.io/storj/satellite/metainfo" "storj.io/storj/satellite/overlay" "storj.io/storj/storagenode" @@ -824,296 +822,6 @@ func TestEndpoint_Object_No_StorageNodes(t *testing.T) { }) } -func TestEndpoint_Object_No_StorageNodes_UsePendingObjectsTable(t *testing.T) { - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, UplinkCount: 1, - Reconfigure: testplanet.Reconfigure{ - Satellite: testplanet.Combine( - func(log *zap.Logger, index int, config *satellite.Config) { - config.Metainfo.UsePendingObjectsTable = true - }, - ), - Uplink: func(log *zap.Logger, index int, config *testplanet.UplinkConfig) { - // we need to not encrypt paths because one of tests is creating object - // manually in DB directly. With path encryption listing would skip such object. - config.DefaultPathCipher = storj.EncNull - }, - }, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - projectID := planet.Uplinks[0].Projects[0].ID - - project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0]) - require.NoError(t, err) - defer ctx.Check(project.Close) - - bucketName := "testbucket" - deleteBucket := func() error { - _, err := project.DeleteBucketWithObjects(ctx, bucketName) - return err - } - - t.Run("UploadID check", func(t *testing.T) { - defer ctx.Check(deleteBucket) - - _, err = project.CreateBucket(ctx, bucketName) - require.NoError(t, err) - - for _, tt := range []struct { - expires time.Time - options uplink.ListUploadsOptions - }{ - { - options: uplink.ListUploadsOptions{System: false, Custom: false}, - }, - { - options: uplink.ListUploadsOptions{System: true, Custom: false}, - }, - { - options: uplink.ListUploadsOptions{System: true, Custom: true}, - }, - { - options: uplink.ListUploadsOptions{System: false, Custom: true}, - }, - { - expires: time.Now().Add(24 * time.Hour), - options: uplink.ListUploadsOptions{System: false, Custom: false}, - }, - { - expires: time.Now().Add(24 * time.Hour), - options: uplink.ListUploadsOptions{System: true, Custom: false}, - }, - { - expires: time.Now().Add(24 * time.Hour), - options: uplink.ListUploadsOptions{System: true, Custom: true}, - }, - { - expires: time.Now().Add(24 * time.Hour), - options: uplink.ListUploadsOptions{System: false, Custom: true}, - }, - } { - t.Run(fmt.Sprintf("expires:%v;system:%v;custom:%v", !tt.expires.IsZero(), tt.options.System, tt.options.Custom), func(t *testing.T) { - objectKey := "multipart-object" - uploadInfo, err := project.BeginUpload(ctx, bucketName, objectKey, &uplink.UploadOptions{ - Expires: tt.expires, - }) - require.NoError(t, err) - - iterator := project.ListUploads(ctx, bucketName, &tt.options) - require.True(t, iterator.Next()) - require.Equal(t, uploadInfo.UploadID, iterator.Item().UploadID) - require.NoError(t, iterator.Err()) - - err = project.AbortUpload(ctx, bucketName, objectKey, iterator.Item().UploadID) - require.NoError(t, err) - }) - } - }) - - t.Run("object in pending_object and object tables", func(t *testing.T) { - defer ctx.Check(deleteBucket) - - _, err = project.CreateBucket(ctx, bucketName) - require.NoError(t, err) - - // pending object in objects table - _, err := planet.Satellites[0].Metabase.DB.BeginObjectNextVersion(ctx, metabase.BeginObjectNextVersion{ - ObjectStream: metabase.ObjectStream{ - ProjectID: projectID, - BucketName: bucketName, - ObjectKey: metabase.ObjectKey("objects_table"), - StreamID: testrand.UUID(), - Version: metabase.NextVersion, - }, - Encryption: metabasetest.DefaultEncryption, - UsePendingObjectsTable: false, - }) - require.NoError(t, err) - - // pending object in pending_objects table - _, err = project.BeginUpload(ctx, bucketName, "pending_object_table", nil) - require.NoError(t, err) - - keys := []string{} - iterator := project.ListUploads(ctx, bucketName, nil) - for iterator.Next() { - keys = append(keys, iterator.Item().Key) - } - require.NoError(t, iterator.Err()) - require.ElementsMatch(t, []string{ - "objects_table", - "pending_object_table", - }, keys) - - iterator = project.ListUploads(ctx, bucketName, nil) - for iterator.Next() { - require.NoError(t, project.AbortUpload(ctx, bucketName, iterator.Item().Key, iterator.Item().UploadID)) - } - require.NoError(t, iterator.Err()) - - iterator = project.ListUploads(ctx, bucketName, nil) - require.False(t, iterator.Next()) - require.NoError(t, iterator.Err()) - }) - - t.Run("ListPendingObjectStreams", func(t *testing.T) { - defer ctx.Check(deleteBucket) - - _, err = project.CreateBucket(ctx, bucketName) - require.NoError(t, err) - - _, err = project.BeginUpload(ctx, bucketName, "pending_object", nil) - require.NoError(t, err) - - iterator := project.ListUploads(ctx, bucketName, &uplink.ListUploadsOptions{ - Prefix: "pending_object", - }) - require.True(t, iterator.Next()) - require.Equal(t, "pending_object", iterator.Item().Key) - require.NoError(t, iterator.Err()) - - // pending object in objects table - _, err := planet.Satellites[0].Metabase.DB.BeginObjectNextVersion(ctx, metabase.BeginObjectNextVersion{ - ObjectStream: metabase.ObjectStream{ - ProjectID: projectID, - BucketName: bucketName, - ObjectKey: metabase.ObjectKey("pending_object"), - StreamID: testrand.UUID(), - Version: metabase.NextVersion, - }, - Encryption: metabasetest.DefaultEncryption, - UsePendingObjectsTable: false, - }) - require.NoError(t, err) - - keys := []string{} - iterator = project.ListUploads(ctx, bucketName, &uplink.ListUploadsOptions{ - Prefix: "pending_object", - }) - for iterator.Next() { - keys = append(keys, iterator.Item().Key) - } - require.NoError(t, iterator.Err()) - - // we should have two objects with the same name, one from pending_objects - // table and second from objects table - require.ElementsMatch(t, []string{ - "pending_object", - "pending_object", - }, keys) - }) - - t.Run("mixed objects from both tables", func(t *testing.T) { - type TestCases struct { - PendingObjectsTable []string - ObjectsTable []string - } - - for _, tc := range []TestCases{ - { - PendingObjectsTable: []string{"A", "B", "C"}, - ObjectsTable: []string{"X", "Y", "Z"}, - }, - { - PendingObjectsTable: []string{"A", "Y", "C"}, - ObjectsTable: []string{"X", "B", "Z"}, - }, - { - - PendingObjectsTable: []string{"X", "B", "Z"}, - ObjectsTable: []string{"A", "Y", "C"}, - }, - { - PendingObjectsTable: []string{"A", "B", "C", "X", "Y", "Z"}, - }, - { - ObjectsTable: []string{"A", "B", "C", "X", "Y", "Z"}, - }, - } { - t.Run("", func(t *testing.T) { - defer ctx.Check(deleteBucket) - - _, err = project.CreateBucket(ctx, bucketName) - require.NoError(t, err) - - allKeys := []string{} - // create objects in pending_objects table - for _, key := range tc.PendingObjectsTable { - _, err = project.BeginUpload(ctx, bucketName, key, nil) - require.NoError(t, err) - allKeys = append(allKeys, key) - } - - // create objects in objects table - for _, key := range tc.ObjectsTable { - _, err := planet.Satellites[0].Metabase.DB.BeginObjectNextVersion(ctx, metabase.BeginObjectNextVersion{ - ObjectStream: metabase.ObjectStream{ - ProjectID: projectID, - BucketName: bucketName, - ObjectKey: metabase.ObjectKey(key), - StreamID: testrand.UUID(), - Version: metabase.NextVersion, - }, - Encryption: metabasetest.DefaultEncryption, - UsePendingObjectsTable: false, - }) - require.NoError(t, err) - allKeys = append(allKeys, key) - } - - slices.Sort(allKeys) - - for _, limit := range []int{1, 2, 3, 10, 1000} { - ctx := testuplink.WithListLimit(ctx, limit) - resultKeys := []string{} - iterator := project.ListUploads(ctx, bucketName, nil) - for iterator.Next() { - resultKeys = append(resultKeys, iterator.Item().Key) - } - require.NoError(t, iterator.Err()) - require.Equal(t, allKeys, resultKeys) - } - }) - } - }) - - t.Run("override on upload with segments", func(t *testing.T) { - defer ctx.Check(deleteBucket) - - for i := 0; i < 5; i++ { - err := planet.Uplinks[0].Upload(ctx, planet.Satellites[0], bucketName, "test-object", testrand.Bytes(1*memory.KiB)) - require.NoError(t, err) - } - - segments, err := planet.Satellites[0].Metabase.DB.TestingAllSegments(ctx) - require.NoError(t, err) - require.Len(t, segments, 1) - }) - - t.Run("upload with expiration time", func(t *testing.T) { - defer ctx.Check(deleteBucket) - - expectedExpirateAt := time.Now().Add(time.Hour) - err := planet.Uplinks[0].UploadWithExpiration(ctx, planet.Satellites[0], bucketName, "test-object", testrand.Bytes(1*memory.KiB), expectedExpirateAt) - require.NoError(t, err) - - project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0]) - require.NoError(t, err) - defer ctx.Check(project.Close) - - object, err := project.StatObject(ctx, bucketName, "test-object") - require.NoError(t, err) - require.WithinDuration(t, expectedExpirateAt, object.System.Expires, time.Second) - - iterator := project.ListObjects(ctx, bucketName, &uplink.ListObjectsOptions{ - System: true, - }) - require.True(t, iterator.Next()) - require.WithinDuration(t, expectedExpirateAt, iterator.Item().System.Expires, time.Second) - require.NoError(t, iterator.Err()) - }) - }) -} - func TestEndpoint_Object_UploadLimit(t *testing.T) { testplanet.Run(t, testplanet.Config{ SatelliteCount: 1, UplinkCount: 1, @@ -2128,74 +1836,6 @@ func TestEndpoint_DeleteCommittedObject(t *testing.T) { testDeleteObject(t, createObject, deleteObject) } -func TestEndpoint_DeletePendingObject(t *testing.T) { - createPendingObject := func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, key string, data []byte) { - // TODO This should be replaced by a call to testplanet.Uplink.MultipartUpload when available. - project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0]) - require.NoError(t, err, "failed to retrieve project") - defer func() { require.NoError(t, project.Close()) }() - - _, err = project.EnsureBucket(ctx, bucket) - require.NoError(t, err, "failed to create bucket") - - info, err := project.BeginUpload(ctx, bucket, key, &uplink.UploadOptions{}) - require.NoError(t, err, "failed to start multipart upload") - - upload, err := project.UploadPart(ctx, bucket, key, info.UploadID, 1) - require.NoError(t, err, "failed to put object part") - _, err = upload.Write(data) - require.NoError(t, err, "failed to put object part") - require.NoError(t, upload.Commit(), "failed to put object part") - } - deletePendingObject := func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, encryptedKey string, streamID uuid.UUID) { - projectID := planet.Uplinks[0].Projects[0].ID - - deletedObjects, err := planet.Satellites[0].Metainfo.Endpoint.DeletePendingObject(ctx, - metabase.ObjectStream{ - ProjectID: projectID, - BucketName: bucket, - ObjectKey: metabase.ObjectKey(encryptedKey), - Version: metabase.DefaultVersion, - StreamID: streamID, - }, false) - require.NoError(t, err) - require.Len(t, deletedObjects, 1) - } - testDeleteObject(t, createPendingObject, deletePendingObject) -} - -func TestEndpoint_AbortMultipartUpload_UsePendingObjectsTable(t *testing.T) { - testplanet.Run(t, testplanet.Config{ - SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1, - Reconfigure: testplanet.Reconfigure{ - Satellite: func(log *zap.Logger, index int, config *satellite.Config) { - config.Metainfo.UsePendingObjectsTable = true - }, - }, - }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) { - project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0]) - require.NoError(t, err) - defer ctx.Check(project.Close) - - _, err = project.CreateBucket(ctx, "testbucket") - require.NoError(t, err) - - uploadInfo, err := project.BeginUpload(ctx, "testbucket", "key", nil) - require.NoError(t, err) - - objects, err := planet.Satellites[0].Metabase.DB.TestingAllPendingObjects(ctx) - require.NoError(t, err) - require.Len(t, objects, 1) - - err = project.AbortUpload(ctx, "testbucket", "key", uploadInfo.UploadID) - require.NoError(t, err) - - objects, err = planet.Satellites[0].Metabase.DB.TestingAllPendingObjects(ctx) - require.NoError(t, err) - require.Len(t, objects, 0) - }) -} - func testDeleteObject(t *testing.T, createObject func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, key string, data []byte), deleteObject func(ctx context.Context, t *testing.T, planet *testplanet.Planet, bucket, encryptedKey string, streamID uuid.UUID), diff --git a/satellite/metainfo/endpoint_segment.go b/satellite/metainfo/endpoint_segment.go index 17ebc99eb6bd..026c1a7e3055 100644 --- a/satellite/metainfo/endpoint_segment.go +++ b/satellite/metainfo/endpoint_segment.go @@ -123,8 +123,6 @@ func (endpoint *Endpoint) BeginSegment(ctx context.Context, req *pb.SegmentBegin }, RootPieceID: rootPieceID, Pieces: pieces, - - UsePendingObjectsTable: streamID.UsePendingObjectsTable, }) if err != nil { return nil, endpoint.convertMetabaseErr(err) @@ -379,8 +377,6 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm Redundancy: rs, Pieces: pieces, Placement: storj.PlacementConstraint(streamID.Placement), - - UsePendingObjectsTable: streamID.UsePendingObjectsTable, } err = endpoint.validateRemoteSegment(ctx, mbCommitSegment, originalLimits) @@ -496,8 +492,6 @@ func (endpoint *Endpoint) MakeInlineSegment(ctx context.Context, req *pb.Segment EncryptedETag: req.EncryptedETag, InlineData: req.EncryptedInlineData, - - UsePendingObjectsTable: streamID.UsePendingObjectsTable, }) if err != nil { return nil, endpoint.convertMetabaseErr(err) diff --git a/scripts/testdata/satellite-config.yaml.lock b/scripts/testdata/satellite-config.yaml.lock index a659c802562c..212c9ae429d6 100755 --- a/scripts/testdata/satellite-config.yaml.lock +++ b/scripts/testdata/satellite-config.yaml.lock @@ -757,9 +757,6 @@ identity.key-path: /root/.local/share/storj/identity/satellite/identity.key # enable the use of bucket level object versioning # metainfo.use-bucket-level-object-versioning: false -# enable new flow for upload which is using pending_objects table -# metainfo.use-pending-objects-table: false - # address(es) to send telemetry to (comma-separated) # metrics.addr: collectora.storj.io:9000