From 22933dba0056d072eb4319ca82cac94dd069daab Mon Sep 17 00:00:00 2001 From: Dmitry Verkhoturov Date: Sat, 8 May 2021 00:38:58 +0200 Subject: [PATCH 1/3] add method to renew image cleanup timer --- backend/_example/memory_store/accessor/image.go | 11 +++++++++++ backend/_example/memory_store/server/image.go | 10 ++++++++++ .../_example/memory_store/server/image_test.go | 16 +++++++++++++++- backend/_example/memory_store/server/rpc.go | 11 ++++++----- backend/app/store/image/bolt_store.go | 14 ++++++++++++++ backend/app/store/image/bolt_store_test.go | 5 ++++- backend/app/store/image/fs_store.go | 14 ++++++++++++++ backend/app/store/image/fs_store_test.go | 9 +++++++-- backend/app/store/image/image.go | 1 + backend/app/store/image/image_mock.go | 14 ++++++++++++++ backend/app/store/image/remote_store.go | 6 ++++++ backend/app/store/image/remote_store_test.go | 12 ++++++++++++ 12 files changed, 114 insertions(+), 9 deletions(-) diff --git a/backend/_example/memory_store/accessor/image.go b/backend/_example/memory_store/accessor/image.go index 2a922766b3..86b5a65c31 100644 --- a/backend/_example/memory_store/accessor/image.go +++ b/backend/_example/memory_store/accessor/image.go @@ -45,6 +45,17 @@ func (m *MemImage) Save(id string, img []byte) error { return nil } +// ResetCleanupTimer resets cleanup timer for the image +func (m *MemImage) ResetCleanupTimer(id string) error { + m.Lock() + defer m.Unlock() + if _, ok := m.insertTime[id]; ok { + m.insertTime[id] = time.Now() + return nil + } + return errors.Errorf("image %s not found", id) +} + // Load image by ID func (m *MemImage) Load(id string) ([]byte, error) { m.RLock() diff --git a/backend/_example/memory_store/server/image.go b/backend/_example/memory_store/server/image.go index ab90bd9d7e..f5e0f420bb 100644 --- a/backend/_example/memory_store/server/image.go +++ b/backend/_example/memory_store/server/image.go @@ -28,6 +28,16 @@ func (s *RPC) imgSaveWithIDHndl(id uint64, params json.RawMessage) (rr jrpc.Resp return jrpc.EncodeResponse(id, nil, err) } +func (s *RPC) imgResetClnTimerHndl(id uint64, params json.RawMessage) (rr jrpc.Response) { + var fileID string + if err := json.Unmarshal(params, &fileID); err != nil { + return jrpc.Response{Error: err.Error()} + } + err := s.img.ResetCleanupTimer(fileID) + return jrpc.EncodeResponse(id, nil, err) + +} + func (s *RPC) imgLoadHndl(id uint64, params json.RawMessage) (rr jrpc.Response) { var fileID string if err := json.Unmarshal(params, &fileID); err != nil { diff --git a/backend/_example/memory_store/server/image_test.go b/backend/_example/memory_store/server/image_test.go index 96446098ed..c27561d349 100644 --- a/backend/_example/memory_store/server/image_test.go +++ b/backend/_example/memory_store/server/image_test.go @@ -116,7 +116,21 @@ func TestRPC_imgCleanupHndl(t *testing.T) { assert.Equal(t, 1462, len(img)) assert.Equal(t, gopherPNGBytes(), img) - // cleanup + // wait for image to expire + time.Sleep(time.Millisecond * 50) + // reset the time to cleanup + err = ri.ResetCleanupTimer(id) + assert.NoError(t, err) + + // cleanup, should not affect the new image + err = ri.Cleanup(context.TODO(), time.Millisecond*45) + assert.NoError(t, err) + + // load after cleanup should succeed + _, err = ri.Load(id) + assert.NoError(t, err, "image is still on staging because it's cleanup timer was reset") + + // cleanup with short TTL, should remove the image from staging err = ri.Cleanup(context.TODO(), time.Nanosecond) assert.NoError(t, err) diff --git a/backend/_example/memory_store/server/rpc.go b/backend/_example/memory_store/server/rpc.go index 1b7dc55cdd..411c446b0c 100644 --- a/backend/_example/memory_store/server/rpc.go +++ b/backend/_example/memory_store/server/rpc.go @@ -57,10 +57,11 @@ func (s *RPC) addHandlers() { // image store handlers s.Group("image", jrpc.HandlersGroup{ - "save_with_id": s.imgSaveWithIDHndl, - "load": s.imgLoadHndl, - "commit": s.imgCommitHndl, - "cleanup": s.imgCleanupHndl, - "info": s.imgInfoHndl, + "save_with_id": s.imgSaveWithIDHndl, + "reset_cleanup_timer": s.imgResetClnTimerHndl, + "load": s.imgLoadHndl, + "commit": s.imgCommitHndl, + "cleanup": s.imgCleanupHndl, + "info": s.imgInfoHndl, }) } diff --git a/backend/app/store/image/bolt_store.go b/backend/app/store/image/bolt_store.go index 106d218c3b..f1ab2e9fee 100644 --- a/backend/app/store/image/bolt_store.go +++ b/backend/app/store/image/bolt_store.go @@ -81,6 +81,20 @@ func (b *Bolt) Commit(id string) error { }) } +// ResetCleanupTimer resets cleanup timer for the image +func (b *Bolt) ResetCleanupTimer(id string) error { + return b.db.Update(func(tx *bolt.Tx) error { + tsBuf := &bytes.Buffer{} + if err := binary.Write(tsBuf, binary.LittleEndian, time.Now().UnixNano()); err != nil { + return errors.Wrapf(err, "can't serialize timestamp for %s", id) + } + if err := tx.Bucket([]byte(insertTimeBktName)).Put([]byte(id), tsBuf.Bytes()); err != nil { + return errors.Wrapf(err, "can't put to bucket with %s", id) + } + return nil + }) +} + // Load image from DB func (b *Bolt) Load(id string) ([]byte, error) { var data []byte diff --git a/backend/app/store/image/bolt_store_test.go b/backend/app/store/image/bolt_store_test.go index aaed22fd40..8ba81060c5 100644 --- a/backend/app/store/image/bolt_store_test.go +++ b/backend/app/store/image/bolt_store_test.go @@ -95,10 +95,13 @@ func TestBoltStore_Cleanup(t *testing.T) { err = svc.Commit(img3) require.NoError(t, err) + // reset the time to cleanup + err = svc.ResetCleanupTimer(img2) + require.NoError(t, err) err = svc.Cleanup(context.Background(), time.Millisecond*100) assert.NoError(t, err) - assertBoltImgNil(t, svc.db, imagesStagedBktName, img2) + assertBoltImgNotNil(t, svc.db, imagesStagedBktName, img2) assertBoltImgNil(t, svc.db, imagesBktName, img2) assertBoltImgNotNil(t, svc.db, imagesBktName, img3) assert.NoError(t, err) diff --git a/backend/app/store/image/fs_store.go b/backend/app/store/image/fs_store.go index 919a8de08b..0f7d560817 100644 --- a/backend/app/store/image/fs_store.go +++ b/backend/app/store/image/fs_store.go @@ -62,6 +62,20 @@ func (f *FileSystem) Commit(id string) error { return errors.Wrapf(err, "failed to commit image %s", id) } +// ResetCleanupTimer resets cleanup timer for the image +func (f *FileSystem) ResetCleanupTimer(id string) error { + file := f.location(f.Staging, id) + _, err := os.Stat(file) + if err != nil { + return errors.Wrapf(err, "can't get image stats for %s", id) + } + // we don't need to update access time (second arg), + // but reading it is platform-dependent and looks different on darwin and linux, + // so it's easier to update it as well + err = os.Chtimes(file, time.Now(), time.Now()) + return errors.Wrapf(err, "problem updating %s modification time", file) +} + // Load image from FS. Uses id to get partition subdirectory. func (f *FileSystem) Load(id string) ([]byte, error) { diff --git a/backend/app/store/image/fs_store_test.go b/backend/app/store/image/fs_store_test.go index 9ac57a44b1..84f1a566f3 100644 --- a/backend/app/store/image/fs_store_test.go +++ b/backend/app/store/image/fs_store_test.go @@ -221,14 +221,19 @@ func TestFsStore_Cleanup(t *testing.T) { _, err = os.Stat(img3) assert.NoError(t, err, "file on staging") - time.Sleep(200 * time.Millisecond) // make all images expired + time.Sleep(200 * time.Millisecond) // make all images expired + err = svc.ResetCleanupTimer("user2/blah_ff3.png") // reset the time to cleanup for third image + assert.NoError(t, err) err = svc.Cleanup(context.Background(), time.Millisecond*300) assert.NoError(t, err) _, err = os.Stat(img2) assert.Error(t, err, "no file on staging anymore") _, err = os.Stat(img3) - assert.Error(t, err, "no file on staging anymore") + assert.NoError(t, err, "third image is still on staging because it's cleanup timer was reset") + + err = svc.ResetCleanupTimer("unknown_image.png") + assert.Error(t, err) } func TestFsStore_Info(t *testing.T) { diff --git a/backend/app/store/image/image.go b/backend/app/store/image/image.go index 96d2c04691..52decdce0c 100644 --- a/backend/app/store/image/image.go +++ b/backend/app/store/image/image.go @@ -75,6 +75,7 @@ type Store interface { Save(id string, img []byte) error // store image with passed id to staging Load(id string) ([]byte, error) // load image by ID + ResetCleanupTimer(id string) error // resets cleanup timer for the image, called on comment preview Commit(id string) error // move image from staging to permanent Cleanup(ctx context.Context, ttl time.Duration) error // run removal loop for old images on staging } diff --git a/backend/app/store/image/image_mock.go b/backend/app/store/image/image_mock.go index d41444e6af..fc7e6e54b1 100644 --- a/backend/app/store/image/image_mock.go +++ b/backend/app/store/image/image_mock.go @@ -86,6 +86,20 @@ func (_m *MockStore) Load(id string) ([]byte, error) { return r0, r1 } +// ResetCleanupTimer provides a mock function with given fields: id +func (_m *MockStore) ResetCleanupTimer(id string) error { + ret := _m.Called(id) + + var r0 error + if rf, ok := ret.Get(0).(func(string) error); ok { + r0 = rf(id) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // Save provides a mock function with given fields: id, img func (_m *MockStore) Save(id string, img []byte) error { ret := _m.Called(id, img) diff --git a/backend/app/store/image/remote_store.go b/backend/app/store/image/remote_store.go index 86a9a1bdbc..d033ee7e33 100644 --- a/backend/app/store/image/remote_store.go +++ b/backend/app/store/image/remote_store.go @@ -22,6 +22,12 @@ func (r *RPC) Save(id string, img []byte) error { return err } +// ResetCleanupTimer resets cleanup timer for the image +func (r *RPC) ResetCleanupTimer(id string) error { + _, err := r.Call("image.reset_cleanup_timer", id) + return err +} + // Load image with given id func (r *RPC) Load(id string) ([]byte, error) { resp, err := r.Call("image.load", id) diff --git a/backend/app/store/image/remote_store_test.go b/backend/app/store/image/remote_store_test.go index 77185b11c7..d4e0badca7 100644 --- a/backend/app/store/image/remote_store_test.go +++ b/backend/app/store/image/remote_store_test.go @@ -53,6 +53,18 @@ func TestRemote_Commit(t *testing.T) { assert.NoError(t, err) } +func TestRemote_ResetCleanupTimer(t *testing.T) { + ts := testServer(t, `{"method":"image.reset_cleanup_timer","params":"gopher_id","id":1}`, `{"id":1}`) + defer ts.Close() + c := RPC{Client: jrpc.Client{API: ts.URL, Client: http.Client{}}} + + var a Store = &c + _ = a + + err := c.ResetCleanupTimer("gopher_id") + assert.NoError(t, err) +} + func TestRemote_Cleanup(t *testing.T) { ts := testServer(t, `{"method":"image.cleanup","params":60000000000,"id":1}`, `{"id":1}`) defer ts.Close() From 15f21c8d90f4a721c41bebd0155b2be2cd5dce8a Mon Sep 17 00:00:00 2001 From: Dmitry Verkhoturov Date: Sat, 8 May 2021 02:42:32 +0200 Subject: [PATCH 2/3] reset image cleanup timer on comment preview --- backend/app/rest/api/rest_public.go | 4 ++-- backend/app/rest/api/rest_public_test.go | 23 ++++++++++++++++++++--- backend/app/store/image/image.go | 5 +++++ 3 files changed, 27 insertions(+), 5 deletions(-) diff --git a/backend/app/rest/api/rest_public.go b/backend/app/rest/api/rest_public.go index e1dabc896b..16ca252702 100644 --- a/backend/app/rest/api/rest_public.go +++ b/backend/app/rest/api/rest_public.go @@ -135,9 +135,9 @@ func (s *public) previewCommentCtrl(w http.ResponseWriter, r *http.Request) { // check if images are valid for _, id := range s.imageService.ExtractPictures(comment.Text) { - _, err = s.imageService.Load(id) + err = s.imageService.ResetCleanupTimer(id) if err != nil { - rest.SendErrorJSON(w, r, http.StatusBadRequest, err, "can't load picture from the comment", rest.ErrImgNotFound) + rest.SendErrorJSON(w, r, http.StatusBadRequest, err, "can't renew staged picture cleanup timer", rest.ErrImgNotFound) return } } diff --git a/backend/app/rest/api/rest_public_test.go b/backend/app/rest/api/rest_public_test.go index 39326c1205..0e17575c17 100644 --- a/backend/app/rest/api/rest_public_test.go +++ b/backend/app/rest/api/rest_public_test.go @@ -28,7 +28,7 @@ func TestRest_Ping(t *testing.T) { } func TestRest_Preview(t *testing.T) { - ts, _, teardown := startupT(t) + ts, srv, teardown := startupT(t) defer teardown() resp, err := post(t, ts.URL+"/api/v1/preview", `{"text": "test 123", "locator":{"url": "https://radio-t.com/blah1", "site": "radio-t"}}`) @@ -43,6 +43,23 @@ func TestRest_Preview(t *testing.T) { assert.NoError(t, err) assert.NoError(t, resp.Body.Close()) assert.Equal(t, 400, resp.StatusCode) + + resp, err = post(t, ts.URL+"/api/v1/preview", fmt.Sprintf(`{"text": "![non-existent.jpg](%s/api/v1/picture/dev_user/bad_picture)", "locator":{"url": "https://radio-t.com/blah1", "site": "radio-t"}}`, srv.RemarkURL)) + assert.NoError(t, err) + assert.Equal(t, http.StatusBadRequest, resp.StatusCode) + b, err = ioutil.ReadAll(resp.Body) + assert.NoError(t, err) + assert.NoError(t, resp.Body.Close()) + assert.Contains(t, + string(b), + "{\"code\":20,\"details\":\"can't renew staged picture cleanup timer\","+ + "\"error\":\"can't get image stats for dev_user/bad_picture: stat", + ) + assert.Contains(t, + string(b), + "/pics-remark42/staging/dev_user/62/bad_picture: no such file or directory\"}\n", + ) + } func TestRest_PreviewWithWrongImage(t *testing.T) { @@ -57,8 +74,8 @@ func TestRest_PreviewWithWrongImage(t *testing.T) { assert.NoError(t, resp.Body.Close()) assert.Contains(t, string(b), - "{\"code\":20,\"details\":\"can't load picture from the comment\","+ - "\"error\":\"can't get image file for dev_user/bad_picture: can't get image stats for dev_user/bad_picture: stat ", + "{\"code\":20,\"details\":\"can't renew staged picture cleanup timer\","+ + "\"error\":\"can't get image stats for dev_user/bad_picture: stat ", ) assert.Contains(t, string(b), diff --git a/backend/app/store/image/image.go b/backend/app/store/image/image.go index 52decdce0c..9632fdde4f 100644 --- a/backend/app/store/image/image.go +++ b/backend/app/store/image/image.go @@ -193,6 +193,11 @@ func (s *Service) Cleanup(ctx context.Context) { } } +// ResetCleanupTimer resets cleanup timer for the image +func (s *Service) ResetCleanupTimer(id string) error { + return s.store.ResetCleanupTimer(id) +} + // Info returns meta information about storage func (s *Service) Info() (StoreInfo, error) { return s.store.Info() From 32110799e0bbe7f332fc2737dee84a682062082e Mon Sep 17 00:00:00 2001 From: Dmitry Verkhoturov Date: Mon, 17 May 2021 01:35:28 +0200 Subject: [PATCH 3/3] reset image cleanup TTL on Submit Also: - make commitTTL equal to EditDuration, so that image is committed to permanent storage after comment can no longer be edited - move cleanupTTL to Cleanup function, as it's not used elsewhere in the code - add variables to some tests sleeps, so that instead of being magic numbers they would rely on timers of structures they suppose to wait for --- backend/app/store/image/image.go | 10 ++++++++-- backend/app/store/image/image_test.go | 9 ++++++++- backend/app/store/service/service_test.go | 7 +++++++ 3 files changed, 23 insertions(+), 3 deletions(-) diff --git a/backend/app/store/image/image.go b/backend/app/store/image/image.go index 9632fdde4f..b7e10669ac 100644 --- a/backend/app/store/image/image.go +++ b/backend/app/store/image/image.go @@ -134,6 +134,12 @@ func (s *Service) Submit(idsFn func() []string) { atomic.AddInt32(&s.submitCount, 1) + // reset cleanup timer before submitting the images + // to prevent them from being cleaned up while waiting for EditDuration to expire + for _, imgID := range idsFn() { + _ = s.store.ResetCleanupTimer(imgID) + } + s.submitCh <- submitReq{idsFn: idsFn, TS: time.Now()} } @@ -175,9 +181,9 @@ func (s *Service) ExtractPictures(commentHTML string) (ids []string) { return ids } -// Cleanup runs periodic cleanup with 2.5*ServiceParams.EditDuration. Blocking loop, should be called inside of goroutine by consumer +// Cleanup runs periodic cleanup with 1.5*ServiceParams.EditDuration. Blocking loop, should be called inside of goroutine by consumer func (s *Service) Cleanup(ctx context.Context) { - cleanupTTL := s.EditDuration * 25 / 10 // cleanup images older than 2.5 * EditDuration + cleanupTTL := s.EditDuration * 15 / 10 // cleanup images older than 1.5 * EditDuration log.Printf("[INFO] start pictures cleanup, staging ttl=%v", cleanupTTL) for { diff --git a/backend/app/store/image/image_test.go b/backend/app/store/image/image_test.go index 229819f4c7..781248f9d4 100644 --- a/backend/app/store/image/image_test.go +++ b/backend/app/store/image/image_test.go @@ -122,7 +122,7 @@ func TestService_Cleanup(t *testing.T) { svc := NewService(&store, ServiceParams{EditDuration: 20 * time.Millisecond}) // cancel context after 2.1 cleanup TTLs - ctx, cancel := context.WithTimeout(context.Background(), svc.EditDuration / 100 * 25 * 21) + ctx, cancel := context.WithTimeout(context.Background(), svc.EditDuration / 100 * 15 * 21) defer cancel() svc.Cleanup(ctx) store.AssertNumberOfCalls(t, "Cleanup", 2) @@ -131,11 +131,14 @@ func TestService_Cleanup(t *testing.T) { func TestService_Submit(t *testing.T) { store := MockStore{} store.On("Commit", mock.Anything, mock.Anything).Times(7).Return(nil) + store.On("ResetCleanupTimer", mock.Anything, mock.Anything).Times(7).Return(nil) svc := NewService(&store, ServiceParams{ImageAPI: "/blah/", EditDuration: time.Millisecond * 100}) svc.Submit(func() []string { return []string{"id1", "id2", "id3"} }) + store.AssertNumberOfCalls(t, "ResetCleanupTimer", 3) err := svc.SubmitAndCommit(func() []string { return []string{"id4", "id5"} }) assert.NoError(t, err) svc.Submit(func() []string { return []string{"id6", "id7"} }) + store.AssertNumberOfCalls(t, "ResetCleanupTimer", 5) svc.Submit(nil) store.AssertNumberOfCalls(t, "Commit", 2) time.Sleep(time.Millisecond * 175) @@ -146,10 +149,12 @@ func TestService_Submit(t *testing.T) { func TestService_Close(t *testing.T) { store := MockStore{} store.On("Commit", mock.Anything, mock.Anything).Times(5).Return(nil) + store.On("ResetCleanupTimer", mock.Anything, mock.Anything).Times(5).Return(nil) svc := Service{store: &store, ServiceParams: ServiceParams{ImageAPI: "/blah/", EditDuration: time.Hour * 24}} svc.Submit(func() []string { return []string{"id1", "id2", "id3"} }) svc.Submit(func() []string { return []string{"id4", "id5"} }) svc.Submit(nil) + store.AssertNumberOfCalls(t, "ResetCleanupTimer", 5) svc.Close(context.TODO()) store.AssertNumberOfCalls(t, "Commit", 5) } @@ -157,11 +162,13 @@ func TestService_Close(t *testing.T) { func TestService_SubmitDelay(t *testing.T) { store := MockStore{} store.On("Commit", mock.Anything, mock.Anything).Times(5).Return(nil) + store.On("ResetCleanupTimer", mock.Anything, mock.Anything).Times(5).Return(nil) svc := NewService(&store, ServiceParams{EditDuration: 20 * time.Millisecond}) svc.Submit(func() []string { return []string{"id1", "id2", "id3"} }) time.Sleep(150 * time.Millisecond) // let first batch to pass TTL svc.Submit(func() []string { return []string{"id4", "id5"} }) svc.Submit(nil) + store.AssertNumberOfCalls(t, "ResetCleanupTimer", 5) store.AssertNumberOfCalls(t, "Commit", 3) svc.Close(context.TODO()) store.AssertNumberOfCalls(t, "Commit", 5) diff --git a/backend/app/store/service/service_test.go b/backend/app/store/service/service_test.go index e9e32384ad..fd45ce73ed 100644 --- a/backend/app/store/service/service_test.go +++ b/backend/app/store/service/service_test.go @@ -1342,6 +1342,8 @@ func TestService_submitImages(t *testing.T) { mockStore := image.MockStore{} mockStore.On("Commit", "dev/pic1.png").Once().Return(nil) mockStore.On("Commit", "dev/pic2.png").Once().Return(nil) + mockStore.On("ResetCleanupTimer", "dev/pic1.png").Once().Return(nil) + mockStore.On("ResetCleanupTimer", "dev/pic2.png").Once().Return(nil) imgSvc := image.NewService(&mockStore, image.ServiceParams{ EditDuration: 50 * time.Millisecond, @@ -1367,6 +1369,7 @@ func TestService_submitImages(t *testing.T) { assert.NoError(t, err) b.submitImages(c) + mockStore.AssertNumberOfCalls(t, "ResetCleanupTimer", 2) time.Sleep(b.EditDuration + 100 * time.Millisecond) mockStore.AssertNumberOfCalls(t, "Commit", 2) } @@ -1385,6 +1388,10 @@ func TestService_ResubmitStagingImages(t *testing.T) { defer teardown() b := DataStore{Engine: eng, EditDuration: 10 * time.Millisecond, ImageService: imgSvc} + mockStore.On("ResetCleanupTimer", "dev_user/bqf122eq9r8ad657n3ng").Once().Return(nil) + mockStore.On("ResetCleanupTimer", "dev_user/bqf321eq9r8ad657n3ng").Once().Return(nil) + mockStore.On("ResetCleanupTimer", "cached_images/12318fbd4c55e9d177b8b5ae197bc89c5afd8e07-a41fcb00643f28d700504256ec81cbf2e1aac53e").Once().Return(nil) + // create comment with three images without preparing it properly comment := store.Comment{ ID: "id-0",