@@ -56,7 +56,7 @@ func (db *DB) DeleteExpiredObjects(ctx context.Context, opts DeleteExpiredObject
5656 }
5757
5858 ok := limiter .Go (ctx , func () {
59- objectsDeleted , segmentsDeleted , err := a .DeleteObjectsAndSegments (ctx , expiredObjects )
59+ objectsDeleted , segmentsDeleted , err := a .DeleteObjectsAndSegmentsNoVerify (ctx , expiredObjects )
6060 if err != nil {
6161 db .log .Error ("failed to delete expired objects from DB" , zap .Error (err ), zap .String ("adapter" , fmt .Sprintf ("%T" , a )))
6262 }
@@ -352,8 +352,8 @@ func (db *DB) deleteObjectsAndSegmentsBatch(ctx context.Context, batchsize int,
352352 }
353353}
354354
355- // DeleteObjectsAndSegments deletes expired objects and associated segments.
356- func (p * PostgresAdapter ) DeleteObjectsAndSegments (ctx context.Context , objects []ObjectStream ) (objectsDeleted , segmentsDeleted int64 , err error ) {
355+ // DeleteObjectsAndSegmentsNoVerify deletes expired objects and associated segments.
356+ func (p * PostgresAdapter ) DeleteObjectsAndSegmentsNoVerify (ctx context.Context , objects []ObjectStream ) (objectsDeleted , segmentsDeleted int64 , err error ) {
357357 defer mon .Task ()(& ctx )(& err )
358358
359359 if len (objects ) == 0 {
@@ -406,60 +406,49 @@ func (p *PostgresAdapter) DeleteObjectsAndSegments(ctx context.Context, objects
406406 return objectsDeleted , segmentsDeleted , nil
407407}
408408
409- // DeleteObjectsAndSegments deletes expired objects and associated segments.
410- func (s * SpannerAdapter ) DeleteObjectsAndSegments (ctx context.Context , objects []ObjectStream ) (objectsDeleted , segmentsDeleted int64 , err error ) {
409+ // DeleteObjectsAndSegmentsNoVerify deletes expired objects and associated segments.
410+ //
411+ // The implementation does not do extra verification whether the stream id-s belong or belonged to the objects.
412+ // So, if the callers supplies objects with incorrect StreamID-s it may end up deleting unrelated segments.
413+ func (s * SpannerAdapter ) DeleteObjectsAndSegmentsNoVerify (ctx context.Context , objects []ObjectStream ) (objectsDeleted , segmentsDeleted int64 , err error ) {
411414 defer mon .Task ()(& ctx )(& err )
412415
413416 if len (objects ) == 0 {
414417 return 0 , 0 , nil
415418 }
416419
417420 _ , err = s .client .ReadWriteTransaction (ctx , func (ctx context.Context , tx * spanner.ReadWriteTransaction ) error {
418- // can't use Mutations here, since we only want to delete objects by the specified keys
419- // if and only if the stream_id matches.
420- var statements []spanner.Statement
421+ var streamIDs [][]byte
421422 for _ , obj := range objects {
422- obj := obj
423- statements = append (statements , spanner.Statement {
423+ streamIDs = append (streamIDs , obj .StreamID .Bytes ())
424+ }
425+
426+ deletedCounts , err := tx .BatchUpdate (ctx , []spanner.Statement {
427+ {
424428 SQL : `
425429 DELETE FROM objects
426- WHERE (project_id, bucket_name, object_key, version, stream_id) = (@ project_id, @ bucket_name, @ object_key, @ version, @ stream_id)
430+ WHERE STRUCT<ProjectID BYTES, BucketName STRING, ObjectKey BYTES, Version INT64, StreamID BYTES>( project_id, bucket_name, object_key, version, stream_id) IN UNNEST(@objects )
427431 ` ,
428- Params : map [string ]interface {}{
429- "project_id" : obj .ProjectID ,
430- "bucket_name" : obj .BucketName ,
431- "object_key" : obj .ObjectKey ,
432- "version" : obj .Version ,
433- "stream_id" : obj .StreamID ,
432+ Params : map [string ]any {
433+ "objects" : objects ,
434434 },
435- })
436- }
437- numDeleteds , err := tx .BatchUpdate (ctx , statements )
435+ },
436+ {
437+ SQL : `
438+ DELETE FROM segments
439+ WHERE stream_id IN UNNEST(@stream_ids)
440+ ` ,
441+ Params : map [string ]any {
442+ "stream_ids" : streamIDs ,
443+ },
444+ },
445+ })
438446 if err != nil {
439- return Error .Wrap (err )
440- }
441- for _ , numDeleted := range numDeleteds {
442- objectsDeleted += numDeleted
447+ return err
443448 }
444449
445- stmts := make ([]spanner.Statement , len (objects ))
446- for ix , object := range objects {
447- stmts [ix ] = spanner.Statement {
448- SQL : `DELETE FROM segments WHERE @stream_id = stream_id` ,
449- Params : map [string ]interface {}{
450- "stream_id" : object .StreamID .Bytes (),
451- },
452- }
453- }
454- if len (stmts ) > 0 {
455- numSegments , err := tx .BatchUpdate (ctx , stmts )
456- if err != nil {
457- return Error .Wrap (err )
458- }
459- for _ , v := range numSegments {
460- segmentsDeleted += v
461- }
462- }
450+ objectsDeleted = deletedCounts [0 ]
451+ segmentsDeleted = deletedCounts [1 ]
463452 return nil
464453 })
465454 if err != nil {
0 commit comments