Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Renew image time to cleanup on comment Preview and Submit #992

Merged
merged 3 commits into from
May 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions backend/_example/memory_store/accessor/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,17 @@ func (m *MemImage) Save(id string, img []byte) error {
return nil
}

// ResetCleanupTimer resets cleanup timer for the image
func (m *MemImage) ResetCleanupTimer(id string) error {
m.Lock()
defer m.Unlock()
if _, ok := m.insertTime[id]; ok {
m.insertTime[id] = time.Now()
return nil
}
return errors.Errorf("image %s not found", id)
}

// Load image by ID
func (m *MemImage) Load(id string) ([]byte, error) {
m.RLock()
Expand Down
10 changes: 10 additions & 0 deletions backend/_example/memory_store/server/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ func (s *RPC) imgSaveWithIDHndl(id uint64, params json.RawMessage) (rr jrpc.Resp
return jrpc.EncodeResponse(id, nil, err)
}

func (s *RPC) imgResetClnTimerHndl(id uint64, params json.RawMessage) (rr jrpc.Response) {
var fileID string
if err := json.Unmarshal(params, &fileID); err != nil {
return jrpc.Response{Error: err.Error()}
}
err := s.img.ResetCleanupTimer(fileID)
return jrpc.EncodeResponse(id, nil, err)

}

func (s *RPC) imgLoadHndl(id uint64, params json.RawMessage) (rr jrpc.Response) {
var fileID string
if err := json.Unmarshal(params, &fileID); err != nil {
Expand Down
16 changes: 15 additions & 1 deletion backend/_example/memory_store/server/image_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,21 @@ func TestRPC_imgCleanupHndl(t *testing.T) {
assert.Equal(t, 1462, len(img))
assert.Equal(t, gopherPNGBytes(), img)

// cleanup
// wait for image to expire
time.Sleep(time.Millisecond * 50)
// reset the time to cleanup
err = ri.ResetCleanupTimer(id)
assert.NoError(t, err)

// cleanup, should not affect the new image
err = ri.Cleanup(context.TODO(), time.Millisecond*45)
assert.NoError(t, err)

// load after cleanup should succeed
_, err = ri.Load(id)
assert.NoError(t, err, "image is still on staging because it's cleanup timer was reset")

// cleanup with short TTL, should remove the image from staging
err = ri.Cleanup(context.TODO(), time.Nanosecond)
assert.NoError(t, err)

Expand Down
11 changes: 6 additions & 5 deletions backend/_example/memory_store/server/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,11 @@ func (s *RPC) addHandlers() {

// image store handlers
s.Group("image", jrpc.HandlersGroup{
"save_with_id": s.imgSaveWithIDHndl,
"load": s.imgLoadHndl,
"commit": s.imgCommitHndl,
"cleanup": s.imgCleanupHndl,
"info": s.imgInfoHndl,
"save_with_id": s.imgSaveWithIDHndl,
"reset_cleanup_timer": s.imgResetClnTimerHndl,
"load": s.imgLoadHndl,
"commit": s.imgCommitHndl,
"cleanup": s.imgCleanupHndl,
"info": s.imgInfoHndl,
})
}
4 changes: 2 additions & 2 deletions backend/app/rest/api/rest_public.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,9 @@ func (s *public) previewCommentCtrl(w http.ResponseWriter, r *http.Request) {

// check if images are valid
for _, id := range s.imageService.ExtractPictures(comment.Text) {
_, err = s.imageService.Load(id)
err = s.imageService.ResetCleanupTimer(id)
if err != nil {
rest.SendErrorJSON(w, r, http.StatusBadRequest, err, "can't load picture from the comment", rest.ErrImgNotFound)
rest.SendErrorJSON(w, r, http.StatusBadRequest, err, "can't renew staged picture cleanup timer", rest.ErrImgNotFound)
return
}
}
Expand Down
23 changes: 20 additions & 3 deletions backend/app/rest/api/rest_public_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestRest_Ping(t *testing.T) {
}

func TestRest_Preview(t *testing.T) {
ts, _, teardown := startupT(t)
ts, srv, teardown := startupT(t)
defer teardown()

resp, err := post(t, ts.URL+"/api/v1/preview", `{"text": "test 123", "locator":{"url": "https://radio-t.com/blah1", "site": "radio-t"}}`)
Expand All @@ -43,6 +43,23 @@ func TestRest_Preview(t *testing.T) {
assert.NoError(t, err)
assert.NoError(t, resp.Body.Close())
assert.Equal(t, 400, resp.StatusCode)

resp, err = post(t, ts.URL+"/api/v1/preview", fmt.Sprintf(`{"text": "![non-existent.jpg](%s/api/v1/picture/dev_user/bad_picture)", "locator":{"url": "https://radio-t.com/blah1", "site": "radio-t"}}`, srv.RemarkURL))
assert.NoError(t, err)
assert.Equal(t, http.StatusBadRequest, resp.StatusCode)
b, err = ioutil.ReadAll(resp.Body)
assert.NoError(t, err)
assert.NoError(t, resp.Body.Close())
assert.Contains(t,
string(b),
"{\"code\":20,\"details\":\"can't renew staged picture cleanup timer\","+
"\"error\":\"can't get image stats for dev_user/bad_picture: stat",
)
assert.Contains(t,
string(b),
"/pics-remark42/staging/dev_user/62/bad_picture: no such file or directory\"}\n",
)

}

func TestRest_PreviewWithWrongImage(t *testing.T) {
Expand All @@ -57,8 +74,8 @@ func TestRest_PreviewWithWrongImage(t *testing.T) {
assert.NoError(t, resp.Body.Close())
assert.Contains(t,
string(b),
"{\"code\":20,\"details\":\"can't load picture from the comment\","+
"\"error\":\"can't get image file for dev_user/bad_picture: can't get image stats for dev_user/bad_picture: stat ",
"{\"code\":20,\"details\":\"can't renew staged picture cleanup timer\","+
"\"error\":\"can't get image stats for dev_user/bad_picture: stat ",
)
assert.Contains(t,
string(b),
Expand Down
14 changes: 14 additions & 0 deletions backend/app/store/image/bolt_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,20 @@ func (b *Bolt) Commit(id string) error {
})
}

// ResetCleanupTimer resets cleanup timer for the image
func (b *Bolt) ResetCleanupTimer(id string) error {
return b.db.Update(func(tx *bolt.Tx) error {
tsBuf := &bytes.Buffer{}
if err := binary.Write(tsBuf, binary.LittleEndian, time.Now().UnixNano()); err != nil {
return errors.Wrapf(err, "can't serialize timestamp for %s", id)
}
if err := tx.Bucket([]byte(insertTimeBktName)).Put([]byte(id), tsBuf.Bytes()); err != nil {
return errors.Wrapf(err, "can't put to bucket with %s", id)
}
return nil
})
}

// Load image from DB
func (b *Bolt) Load(id string) ([]byte, error) {
var data []byte
Expand Down
5 changes: 4 additions & 1 deletion backend/app/store/image/bolt_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,13 @@ func TestBoltStore_Cleanup(t *testing.T) {
err = svc.Commit(img3)
require.NoError(t, err)

// reset the time to cleanup
err = svc.ResetCleanupTimer(img2)
require.NoError(t, err)
err = svc.Cleanup(context.Background(), time.Millisecond*100)
assert.NoError(t, err)

assertBoltImgNil(t, svc.db, imagesStagedBktName, img2)
assertBoltImgNotNil(t, svc.db, imagesStagedBktName, img2)
assertBoltImgNil(t, svc.db, imagesBktName, img2)
assertBoltImgNotNil(t, svc.db, imagesBktName, img3)
assert.NoError(t, err)
Expand Down
14 changes: 14 additions & 0 deletions backend/app/store/image/fs_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,20 @@ func (f *FileSystem) Commit(id string) error {
return errors.Wrapf(err, "failed to commit image %s", id)
}

// ResetCleanupTimer resets cleanup timer for the image
func (f *FileSystem) ResetCleanupTimer(id string) error {
file := f.location(f.Staging, id)
_, err := os.Stat(file)
if err != nil {
return errors.Wrapf(err, "can't get image stats for %s", id)
}
// we don't need to update access time (second arg),
// but reading it is platform-dependent and looks different on darwin and linux,
// so it's easier to update it as well
err = os.Chtimes(file, time.Now(), time.Now())
return errors.Wrapf(err, "problem updating %s modification time", file)
}

// Load image from FS. Uses id to get partition subdirectory.
func (f *FileSystem) Load(id string) ([]byte, error) {

Expand Down
9 changes: 7 additions & 2 deletions backend/app/store/image/fs_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,14 +221,19 @@ func TestFsStore_Cleanup(t *testing.T) {
_, err = os.Stat(img3)
assert.NoError(t, err, "file on staging")

time.Sleep(200 * time.Millisecond) // make all images expired
time.Sleep(200 * time.Millisecond) // make all images expired
err = svc.ResetCleanupTimer("user2/blah_ff3.png") // reset the time to cleanup for third image
assert.NoError(t, err)
err = svc.Cleanup(context.Background(), time.Millisecond*300)
assert.NoError(t, err)

_, err = os.Stat(img2)
assert.Error(t, err, "no file on staging anymore")
_, err = os.Stat(img3)
assert.Error(t, err, "no file on staging anymore")
assert.NoError(t, err, "third image is still on staging because it's cleanup timer was reset")

err = svc.ResetCleanupTimer("unknown_image.png")
assert.Error(t, err)
}

func TestFsStore_Info(t *testing.T) {
Expand Down
16 changes: 14 additions & 2 deletions backend/app/store/image/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type Store interface {
Save(id string, img []byte) error // store image with passed id to staging
Load(id string) ([]byte, error) // load image by ID

ResetCleanupTimer(id string) error // resets cleanup timer for the image, called on comment preview
Commit(id string) error // move image from staging to permanent
Cleanup(ctx context.Context, ttl time.Duration) error // run removal loop for old images on staging
}
Expand Down Expand Up @@ -133,6 +134,12 @@ func (s *Service) Submit(idsFn func() []string) {

atomic.AddInt32(&s.submitCount, 1)

// reset cleanup timer before submitting the images
// to prevent them from being cleaned up while waiting for EditDuration to expire
for _, imgID := range idsFn() {
_ = s.store.ResetCleanupTimer(imgID)
}

s.submitCh <- submitReq{idsFn: idsFn, TS: time.Now()}
}

Expand Down Expand Up @@ -174,9 +181,9 @@ func (s *Service) ExtractPictures(commentHTML string) (ids []string) {
return ids
}

// Cleanup runs periodic cleanup with 2.5*ServiceParams.EditDuration. Blocking loop, should be called inside of goroutine by consumer
// Cleanup runs periodic cleanup with 1.5*ServiceParams.EditDuration. Blocking loop, should be called inside of goroutine by consumer
func (s *Service) Cleanup(ctx context.Context) {
cleanupTTL := s.EditDuration * 25 / 10 // cleanup images older than 2.5 * EditDuration
cleanupTTL := s.EditDuration * 15 / 10 // cleanup images older than 1.5 * EditDuration
log.Printf("[INFO] start pictures cleanup, staging ttl=%v", cleanupTTL)

for {
Expand All @@ -192,6 +199,11 @@ func (s *Service) Cleanup(ctx context.Context) {
}
}

// ResetCleanupTimer resets cleanup timer for the image
func (s *Service) ResetCleanupTimer(id string) error {
return s.store.ResetCleanupTimer(id)
}

// Info returns meta information about storage
func (s *Service) Info() (StoreInfo, error) {
return s.store.Info()
Expand Down
14 changes: 14 additions & 0 deletions backend/app/store/image/image_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion backend/app/store/image/image_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestService_Cleanup(t *testing.T) {

svc := NewService(&store, ServiceParams{EditDuration: 20 * time.Millisecond})
// cancel context after 2.1 cleanup TTLs
ctx, cancel := context.WithTimeout(context.Background(), svc.EditDuration / 100 * 25 * 21)
ctx, cancel := context.WithTimeout(context.Background(), svc.EditDuration / 100 * 15 * 21)
defer cancel()
svc.Cleanup(ctx)
store.AssertNumberOfCalls(t, "Cleanup", 2)
Expand All @@ -131,11 +131,14 @@ func TestService_Cleanup(t *testing.T) {
func TestService_Submit(t *testing.T) {
store := MockStore{}
store.On("Commit", mock.Anything, mock.Anything).Times(7).Return(nil)
store.On("ResetCleanupTimer", mock.Anything, mock.Anything).Times(7).Return(nil)
svc := NewService(&store, ServiceParams{ImageAPI: "/blah/", EditDuration: time.Millisecond * 100})
svc.Submit(func() []string { return []string{"id1", "id2", "id3"} })
store.AssertNumberOfCalls(t, "ResetCleanupTimer", 3)
err := svc.SubmitAndCommit(func() []string { return []string{"id4", "id5"} })
assert.NoError(t, err)
svc.Submit(func() []string { return []string{"id6", "id7"} })
store.AssertNumberOfCalls(t, "ResetCleanupTimer", 5)
svc.Submit(nil)
store.AssertNumberOfCalls(t, "Commit", 2)
time.Sleep(time.Millisecond * 175)
Expand All @@ -146,22 +149,26 @@ func TestService_Submit(t *testing.T) {
func TestService_Close(t *testing.T) {
store := MockStore{}
store.On("Commit", mock.Anything, mock.Anything).Times(5).Return(nil)
store.On("ResetCleanupTimer", mock.Anything, mock.Anything).Times(5).Return(nil)
svc := Service{store: &store, ServiceParams: ServiceParams{ImageAPI: "/blah/", EditDuration: time.Hour * 24}}
svc.Submit(func() []string { return []string{"id1", "id2", "id3"} })
svc.Submit(func() []string { return []string{"id4", "id5"} })
svc.Submit(nil)
store.AssertNumberOfCalls(t, "ResetCleanupTimer", 5)
svc.Close(context.TODO())
store.AssertNumberOfCalls(t, "Commit", 5)
}

func TestService_SubmitDelay(t *testing.T) {
store := MockStore{}
store.On("Commit", mock.Anything, mock.Anything).Times(5).Return(nil)
store.On("ResetCleanupTimer", mock.Anything, mock.Anything).Times(5).Return(nil)
svc := NewService(&store, ServiceParams{EditDuration: 20 * time.Millisecond})
svc.Submit(func() []string { return []string{"id1", "id2", "id3"} })
time.Sleep(150 * time.Millisecond) // let first batch to pass TTL
svc.Submit(func() []string { return []string{"id4", "id5"} })
svc.Submit(nil)
store.AssertNumberOfCalls(t, "ResetCleanupTimer", 5)
store.AssertNumberOfCalls(t, "Commit", 3)
svc.Close(context.TODO())
store.AssertNumberOfCalls(t, "Commit", 5)
Expand Down
6 changes: 6 additions & 0 deletions backend/app/store/image/remote_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ func (r *RPC) Save(id string, img []byte) error {
return err
}

// ResetCleanupTimer resets cleanup timer for the image
func (r *RPC) ResetCleanupTimer(id string) error {
_, err := r.Call("image.reset_cleanup_timer", id)
return err
}

// Load image with given id
func (r *RPC) Load(id string) ([]byte, error) {
resp, err := r.Call("image.load", id)
Expand Down
12 changes: 12 additions & 0 deletions backend/app/store/image/remote_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,18 @@ func TestRemote_Commit(t *testing.T) {
assert.NoError(t, err)
}

func TestRemote_ResetCleanupTimer(t *testing.T) {
ts := testServer(t, `{"method":"image.reset_cleanup_timer","params":"gopher_id","id":1}`, `{"id":1}`)
defer ts.Close()
c := RPC{Client: jrpc.Client{API: ts.URL, Client: http.Client{}}}

var a Store = &c
_ = a

err := c.ResetCleanupTimer("gopher_id")
assert.NoError(t, err)
}

func TestRemote_Cleanup(t *testing.T) {
ts := testServer(t, `{"method":"image.cleanup","params":60000000000,"id":1}`, `{"id":1}`)
defer ts.Close()
Expand Down
7 changes: 7 additions & 0 deletions backend/app/store/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1342,6 +1342,8 @@ func TestService_submitImages(t *testing.T) {
mockStore := image.MockStore{}
mockStore.On("Commit", "dev/pic1.png").Once().Return(nil)
mockStore.On("Commit", "dev/pic2.png").Once().Return(nil)
mockStore.On("ResetCleanupTimer", "dev/pic1.png").Once().Return(nil)
mockStore.On("ResetCleanupTimer", "dev/pic2.png").Once().Return(nil)
imgSvc := image.NewService(&mockStore,
image.ServiceParams{
EditDuration: 50 * time.Millisecond,
Expand All @@ -1367,6 +1369,7 @@ func TestService_submitImages(t *testing.T) {
assert.NoError(t, err)

b.submitImages(c)
mockStore.AssertNumberOfCalls(t, "ResetCleanupTimer", 2)
time.Sleep(b.EditDuration + 100 * time.Millisecond)
mockStore.AssertNumberOfCalls(t, "Commit", 2)
}
Expand All @@ -1385,6 +1388,10 @@ func TestService_ResubmitStagingImages(t *testing.T) {
defer teardown()
b := DataStore{Engine: eng, EditDuration: 10 * time.Millisecond, ImageService: imgSvc}

mockStore.On("ResetCleanupTimer", "dev_user/bqf122eq9r8ad657n3ng").Once().Return(nil)
mockStore.On("ResetCleanupTimer", "dev_user/bqf321eq9r8ad657n3ng").Once().Return(nil)
mockStore.On("ResetCleanupTimer", "cached_images/12318fbd4c55e9d177b8b5ae197bc89c5afd8e07-a41fcb00643f28d700504256ec81cbf2e1aac53e").Once().Return(nil)

// create comment with three images without preparing it properly
comment := store.Comment{
ID: "id-0",
Expand Down