Skip to content

Commit

Permalink
reset image cleanup TTL on Submit
Browse files Browse the repository at this point in the history
Also:

- make commitTTL equal to EditDuration,
  so that image is committed to permanent
  storage after comment can no longer be edited
- move cleanupTTL to Cleanup function,
  as it's not used elsewhere in the code
- add variables to some tests sleeps, so that
  instead of being magic numbers they would
  rely on timers of structures they suppose
  to wait for
  • Loading branch information
paskal committed May 16, 2021
1 parent e40c68d commit 5f1dc2c
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 23 deletions.
33 changes: 15 additions & 18 deletions backend/app/store/image/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ import (
)

// Service wraps Store with common functions needed for any store implementation
// It also provides async Submit with func param retrieving all submitting ids.
// Submitted ids committed (i.e. moved from staging to final) on commitTTL expiration.
// It also provides async Submit with func param retrieving all submitting IDs.
// Submitted IDs committed (i.e. moved from staging to final) on ServiceParams.EditDuration expiration.
type Service struct {
ServiceParams

Expand All @@ -56,12 +56,6 @@ type ServiceParams struct {
MaxSize int
MaxHeight int
MaxWidth int

// duration of time after which images are checked and committed if still
// present in the submitted comment after it's EditDuration is expired
commitTTL time.Duration
// duration of time after which images are deleted from staging
cleanupTTL time.Duration
}

// StoreInfo contains image store meta information
Expand Down Expand Up @@ -95,10 +89,6 @@ type submitReq struct {

// NewService returns new Service instance
func NewService(s Store, p ServiceParams) *Service {
p.commitTTL = p.EditDuration * 15 / 10 // Commit call on every 1.5 * EditDuration
p.cleanupTTL = p.EditDuration * 25 / 10 // Cleanup call on every 2.5 * EditDuration
// In case Cleanup and Submit start at the same time (case of stale staging images check
// on the program start) these TTL values guarantee that Commit will happen before Cleanup.
return &Service{ServiceParams: p, store: s}
}

Expand Down Expand Up @@ -127,8 +117,8 @@ func (s *Service) Submit(idsFn func() []string) {
go func() {
defer s.wg.Done()
for req := range s.submitCh {
// wait for commitTTL expiration with emergency pass on term
for atomic.LoadInt32(&s.term) == 0 && time.Since(req.TS) <= s.commitTTL {
// wait for EditDuration expiration with emergency pass on term
for atomic.LoadInt32(&s.term) == 0 && time.Since(req.TS) <= s.EditDuration {
time.Sleep(time.Millisecond * 10) // small sleep to relive busy wait but keep reactive for term (close)
}
err := s.SubmitAndCommit(req.idsFn)
Expand All @@ -144,6 +134,12 @@ func (s *Service) Submit(idsFn func() []string) {

atomic.AddInt32(&s.submitCount, 1)

// reset cleanup timer before submitting the images
// to prevent them from being cleaned up while waiting for EditDuration to expire
for _, imgId := range idsFn() {
s.store.ResetCleanupTimer(imgId)
}

s.submitCh <- submitReq{idsFn: idsFn, TS: time.Now()}
}

Expand Down Expand Up @@ -186,17 +182,18 @@ func (s *Service) ExtractPictures(commentHTML string) (ids []string, err error)
return ids, nil
}

// Cleanup runs periodic cleanup with cleanupTTL. Blocking loop, should be called inside of goroutine by consumer
// Cleanup runs periodic cleanup with 1.5*ServiceParams.EditDuration. Blocking loop, should be called inside of goroutine by consumer
func (s *Service) Cleanup(ctx context.Context) {
log.Printf("[INFO] start pictures cleanup, staging ttl=%v", s.cleanupTTL)
cleanupTTL := s.EditDuration * 15 / 10 // cleanup images older than 1.5 * EditDuration
log.Printf("[INFO] start pictures cleanup, staging ttl=%v", cleanupTTL)

for {
select {
case <-ctx.Done():
log.Printf("[INFO] cleanup terminated, %v", ctx.Err())
return
case <-time.After(s.cleanupTTL):
if err := s.store.Cleanup(ctx, s.cleanupTTL); err != nil {
case <-time.After(cleanupTTL):
if err := s.store.Cleanup(ctx, cleanupTTL); err != nil {
log.Printf("[WARN] failed to cleanup, %v", err)
}
}
Expand Down
12 changes: 10 additions & 2 deletions backend/app/store/image/image_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,20 +127,24 @@ func TestService_Cleanup(t *testing.T) {
store.On("Cleanup", mock.Anything, mock.Anything).Times(10).Return(nil)

svc := NewService(&store, ServiceParams{EditDuration: 20 * time.Millisecond})
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*549)
// cancel context after 2.1 cleanup TTLs
ctx, cancel := context.WithTimeout(context.Background(), svc.EditDuration / 100 * 15 * 21)
defer cancel()
svc.Cleanup(ctx)
store.AssertNumberOfCalls(t, "Cleanup", 10)
store.AssertNumberOfCalls(t, "Cleanup", 2)
}

func TestService_Submit(t *testing.T) {
store := MockStore{}
store.On("Commit", mock.Anything, mock.Anything).Times(7).Return(nil)
store.On("ResetCleanupTimer", mock.Anything, mock.Anything).Times(7).Return(nil)
svc := NewService(&store, ServiceParams{ImageAPI: "/blah/", EditDuration: time.Millisecond * 100})
svc.Submit(func() []string { return []string{"id1", "id2", "id3"} })
store.AssertNumberOfCalls(t, "ResetCleanupTimer", 3)
err := svc.SubmitAndCommit(func() []string { return []string{"id4", "id5"} })
assert.NoError(t, err)
svc.Submit(func() []string { return []string{"id6", "id7"} })
store.AssertNumberOfCalls(t, "ResetCleanupTimer", 5)
svc.Submit(nil)
store.AssertNumberOfCalls(t, "Commit", 2)
time.Sleep(time.Millisecond * 175)
Expand All @@ -151,22 +155,26 @@ func TestService_Submit(t *testing.T) {
func TestService_Close(t *testing.T) {
store := MockStore{}
store.On("Commit", mock.Anything, mock.Anything).Times(5).Return(nil)
store.On("ResetCleanupTimer", mock.Anything, mock.Anything).Times(5).Return(nil)
svc := Service{store: &store, ServiceParams: ServiceParams{ImageAPI: "/blah/", EditDuration: time.Hour * 24}}
svc.Submit(func() []string { return []string{"id1", "id2", "id3"} })
svc.Submit(func() []string { return []string{"id4", "id5"} })
svc.Submit(nil)
store.AssertNumberOfCalls(t, "ResetCleanupTimer", 5)
svc.Close(context.TODO())
store.AssertNumberOfCalls(t, "Commit", 5)
}

func TestService_SubmitDelay(t *testing.T) {
store := MockStore{}
store.On("Commit", mock.Anything, mock.Anything).Times(5).Return(nil)
store.On("ResetCleanupTimer", mock.Anything, mock.Anything).Times(5).Return(nil)
svc := NewService(&store, ServiceParams{EditDuration: 20 * time.Millisecond})
svc.Submit(func() []string { return []string{"id1", "id2", "id3"} })
time.Sleep(150 * time.Millisecond) // let first batch to pass TTL
svc.Submit(func() []string { return []string{"id4", "id5"} })
svc.Submit(nil)
store.AssertNumberOfCalls(t, "ResetCleanupTimer", 5)
store.AssertNumberOfCalls(t, "Commit", 3)
svc.Close(context.TODO())
store.AssertNumberOfCalls(t, "Commit", 5)
Expand Down
13 changes: 10 additions & 3 deletions backend/app/store/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1342,10 +1342,12 @@ func TestService_submitImages(t *testing.T) {
mockStore := image.MockStore{}
mockStore.On("Commit", "dev/pic1.png").Once().Return(nil)
mockStore.On("Commit", "dev/pic2.png").Once().Return(nil)
mockStore.On("ResetCleanupTimer", "dev/pic1.png").Once().Return(nil)
mockStore.On("ResetCleanupTimer", "dev/pic2.png").Once().Return(nil)
imgSvc := image.NewService(&mockStore,
image.ServiceParams{
EditDuration: 50 * time.Millisecond,
ImageAPI: "/",
ImageAPI: "/images/dev/",
ProxyAPI: "/non_existent",
})
defer imgSvc.Close(context.TODO())
Expand All @@ -1367,7 +1369,8 @@ func TestService_submitImages(t *testing.T) {
assert.NoError(t, err)

b.submitImages(c)
time.Sleep(250 * time.Millisecond)
mockStore.AssertNumberOfCalls(t, "ResetCleanupTimer", 2)
time.Sleep(b.EditDuration + 100 * time.Millisecond)
mockStore.AssertNumberOfCalls(t, "Commit", 2)
}

Expand All @@ -1385,6 +1388,10 @@ func TestService_ResubmitStagingImages(t *testing.T) {
defer teardown()
b := DataStore{Engine: eng, EditDuration: 10 * time.Millisecond, ImageService: imgSvc}

mockStore.On("ResetCleanupTimer", "dev_user/bqf122eq9r8ad657n3ng").Once().Return(nil)
mockStore.On("ResetCleanupTimer", "dev_user/bqf321eq9r8ad657n3ng").Once().Return(nil)
mockStore.On("ResetCleanupTimer", "cached_images/12318fbd4c55e9d177b8b5ae197bc89c5afd8e07-a41fcb00643f28d700504256ec81cbf2e1aac53e").Once().Return(nil)

// create comment with three images without preparing it properly
comment := store.Comment{
ID: "id-0",
Expand All @@ -1408,7 +1415,7 @@ func TestService_ResubmitStagingImages(t *testing.T) {
mockStore.On("Commit", "dev_user/bqf122eq9r8ad657n3ng").Once().Return(nil)
mockStore.On("Commit", "dev_user/bqf321eq9r8ad657n3ng").Once().Return(nil)
mockStore.On("Commit", "cached_images/12318fbd4c55e9d177b8b5ae197bc89c5afd8e07-a41fcb00643f28d700504256ec81cbf2e1aac53e").Once().Return(nil)
time.Sleep(time.Millisecond * 100)
time.Sleep(b.EditDuration + time.Millisecond * 100)

mockStore.AssertNumberOfCalls(t, "Info", 1)
mockStore.AssertNumberOfCalls(t, "Commit", 3)
Expand Down

0 comments on commit 5f1dc2c

Please sign in to comment.