Skip to content

Commit

Permalink
[feature] Add postDataCallbackFunc to allow cleanup (#408)
Browse files Browse the repository at this point in the history
  • Loading branch information
tsmethurst committed Feb 22, 2022
1 parent 15d1e6b commit 55b83be
Show file tree
Hide file tree
Showing 10 changed files with 146 additions and 24 deletions.
4 changes: 2 additions & 2 deletions internal/federation/dereferencing/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func (d *deref) fetchRemoteAccountMedia(ctx context.Context, targetAccount *gtsm
}

avatar := true
newProcessing, err := d.mediaManager.ProcessMedia(ctx, data, targetAccount.ID, &media.AdditionalMediaInfo{
newProcessing, err := d.mediaManager.ProcessMedia(ctx, data, nil, targetAccount.ID, &media.AdditionalMediaInfo{
RemoteURL: &targetAccount.AvatarRemoteURL,
Avatar: &avatar,
})
Expand Down Expand Up @@ -343,7 +343,7 @@ func (d *deref) fetchRemoteAccountMedia(ctx context.Context, targetAccount *gtsm
}

header := true
newProcessing, err := d.mediaManager.ProcessMedia(ctx, data, targetAccount.ID, &media.AdditionalMediaInfo{
newProcessing, err := d.mediaManager.ProcessMedia(ctx, data, nil, targetAccount.ID, &media.AdditionalMediaInfo{
RemoteURL: &targetAccount.HeaderRemoteURL,
Header: &header,
})
Expand Down
2 changes: 1 addition & 1 deletion internal/federation/dereferencing/media.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (d *deref) GetRemoteMedia(ctx context.Context, requestingUsername string, a
return t.DereferenceMedia(innerCtx, derefURI)
}

processingMedia, err := d.mediaManager.ProcessMedia(ctx, dataFunc, accountID, ai)
processingMedia, err := d.mediaManager.ProcessMedia(ctx, dataFunc, nil, accountID, ai)
if err != nil {
return nil, fmt.Errorf("GetRemoteMedia: error processing attachment: %s", err)
}
Expand Down
35 changes: 27 additions & 8 deletions internal/media/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,35 @@ import (
// Manager provides an interface for managing media: parsing, storing, and retrieving media objects like photos, videos, and gifs.
type Manager interface {
// ProcessMedia begins the process of decoding and storing the given data as an attachment.
// It will return a pointer to a Media struct upon which further actions can be performed, such as getting
// It will return a pointer to a ProcessingMedia struct upon which further actions can be performed, such as getting
// the finished media, thumbnail, attachment, etc.
//
// data should be a function that the media manager can call to return raw bytes of a piece of media.
// data should be a function that the media manager can call to return a reader containing the media data.
//
// postData will be called after data has been called; it can be used to clean up any remaining resources.
// The provided function can be nil, in which case it will not be executed.
//
// accountID should be the account that the media belongs to.
//
// ai is optional and can be nil. Any additional information about the attachment provided will be put in the database.
ProcessMedia(ctx context.Context, data DataFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error)
ProcessEmoji(ctx context.Context, data DataFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo) (*ProcessingEmoji, error)
ProcessMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error)
// ProcessEmoji begins the process of decoding and storing the given data as an emoji.
// It will return a pointer to a ProcessingEmoji struct upon which further actions can be performed, such as getting
// the finished media, thumbnail, attachment, etc.
//
// data should be a function that the media manager can call to return a reader containing the emoji data.
//
// postData will be called after data has been called; it can be used to clean up any remaining resources.
// The provided function can be nil, in which case it will not be executed.
//
// shortcode should be the emoji shortcode without the ':'s around it.
//
// id is the database ID that should be used to store the emoji.
//
// uri is the ActivityPub URI/ID of the emoji.
//
// ai is optional and can be nil. Any additional information about the emoji provided will be put in the database.
ProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo) (*ProcessingEmoji, error)
// NumWorkers returns the total number of workers available to this manager.
NumWorkers() int
// QueueSize returns the total capacity of the queue.
Expand Down Expand Up @@ -101,8 +120,8 @@ func NewManager(database db.DB, storage *kv.KVStore) (Manager, error) {
return m, nil
}

func (m *manager) ProcessMedia(ctx context.Context, data DataFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) {
processingMedia, err := m.preProcessMedia(ctx, data, accountID, ai)
func (m *manager) ProcessMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) {
processingMedia, err := m.preProcessMedia(ctx, data, postData, accountID, ai)
if err != nil {
return nil, err
}
Expand All @@ -125,8 +144,8 @@ func (m *manager) ProcessMedia(ctx context.Context, data DataFunc, accountID str
return processingMedia, nil
}

func (m *manager) ProcessEmoji(ctx context.Context, data DataFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo) (*ProcessingEmoji, error) {
processingEmoji, err := m.preProcessEmoji(ctx, data, shortcode, id, uri, ai)
func (m *manager) ProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo) (*ProcessingEmoji, error) {
processingEmoji, err := m.preProcessEmoji(ctx, data, postData, shortcode, id, uri, ai)
if err != nil {
return nil, err
}
Expand Down
91 changes: 87 additions & 4 deletions internal/media/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (suite *ManagerTestSuite) TestSimpleJpegProcessBlocking() {
accountID := "01FS1X72SK9ZPW0J1QQ68BD264"

// process the media with no additional info provided
processingMedia, err := suite.manager.ProcessMedia(ctx, data, accountID, nil)
processingMedia, err := suite.manager.ProcessMedia(ctx, data, nil, accountID, nil)
suite.NoError(err)
// fetch the attachment id from the processing media
attachmentID := processingMedia.AttachmentID()
Expand Down Expand Up @@ -111,6 +111,89 @@ func (suite *ManagerTestSuite) TestSimpleJpegProcessBlocking() {
suite.Equal(processedThumbnailBytesExpected, processedThumbnailBytes)
}

func (suite *ManagerTestSuite) TestSimpleJpegProcessBlockingWithCallback() {
ctx := context.Background()

data := func(_ context.Context) (io.Reader, int, error) {
// load bytes from a test image
b, err := os.ReadFile("./test/test-jpeg.jpg")
if err != nil {
panic(err)
}
return bytes.NewBuffer(b), len(b), nil
}

// test the callback function by setting a simple boolean
var calledPostData bool
postData := func(_ context.Context) error {
calledPostData = true
return nil
}
suite.False(calledPostData) // not called yet (obvs)

accountID := "01FS1X72SK9ZPW0J1QQ68BD264"

// process the media with no additional info provided
processingMedia, err := suite.manager.ProcessMedia(ctx, data, postData, accountID, nil)
suite.NoError(err)
// fetch the attachment id from the processing media
attachmentID := processingMedia.AttachmentID()

// do a blocking call to fetch the attachment
attachment, err := processingMedia.LoadAttachment(ctx)
suite.NoError(err)
suite.NotNil(attachment)

// the post data callback should have been called
suite.True(calledPostData)

// make sure it's got the stuff set on it that we expect
// the attachment ID and accountID we expect
suite.Equal(attachmentID, attachment.ID)
suite.Equal(accountID, attachment.AccountID)

// file meta should be correctly derived from the image
suite.EqualValues(gtsmodel.Original{
Width: 1920, Height: 1080, Size: 2073600, Aspect: 1.7777777777777777,
}, attachment.FileMeta.Original)
suite.EqualValues(gtsmodel.Small{
Width: 512, Height: 288, Size: 147456, Aspect: 1.7777777777777777,
}, attachment.FileMeta.Small)
suite.Equal("image/jpeg", attachment.File.ContentType)
suite.Equal("image/jpeg", attachment.Thumbnail.ContentType)
suite.Equal(269739, attachment.File.FileSize)
suite.Equal("LjBzUo#6RQR._NvzRjWF?urqV@a$", attachment.Blurhash)

// now make sure the attachment is in the database
dbAttachment, err := suite.db.GetAttachmentByID(ctx, attachmentID)
suite.NoError(err)
suite.NotNil(dbAttachment)

// make sure the processed file is in storage
processedFullBytes, err := suite.storage.Get(attachment.File.Path)
suite.NoError(err)
suite.NotEmpty(processedFullBytes)

// load the processed bytes from our test folder, to compare
processedFullBytesExpected, err := os.ReadFile("./test/test-jpeg-processed.jpg")
suite.NoError(err)
suite.NotEmpty(processedFullBytesExpected)

// the bytes in storage should be what we expected
suite.Equal(processedFullBytesExpected, processedFullBytes)

// now do the same for the thumbnail and make sure it's what we expected
processedThumbnailBytes, err := suite.storage.Get(attachment.Thumbnail.Path)
suite.NoError(err)
suite.NotEmpty(processedThumbnailBytes)

processedThumbnailBytesExpected, err := os.ReadFile("./test/test-jpeg-thumbnail.jpg")
suite.NoError(err)
suite.NotEmpty(processedThumbnailBytesExpected)

suite.Equal(processedThumbnailBytesExpected, processedThumbnailBytes)
}

func (suite *ManagerTestSuite) TestSimpleJpegProcessAsync() {
ctx := context.Background()

Expand All @@ -126,7 +209,7 @@ func (suite *ManagerTestSuite) TestSimpleJpegProcessAsync() {
accountID := "01FS1X72SK9ZPW0J1QQ68BD264"

// process the media with no additional info provided
processingMedia, err := suite.manager.ProcessMedia(ctx, data, accountID, nil)
processingMedia, err := suite.manager.ProcessMedia(ctx, data, nil, accountID, nil)
suite.NoError(err)
// fetch the attachment id from the processing media
attachmentID := processingMedia.AttachmentID()
Expand Down Expand Up @@ -210,7 +293,7 @@ func (suite *ManagerTestSuite) TestSimpleJpegQueueSpamming() {
inProcess := []*media.ProcessingMedia{}
for i := 0; i < spam; i++ {
// process the media with no additional info provided
processingMedia, err := suite.manager.ProcessMedia(ctx, data, accountID, nil)
processingMedia, err := suite.manager.ProcessMedia(ctx, data, nil, accountID, nil)
suite.NoError(err)
inProcess = append(inProcess, processingMedia)
}
Expand Down Expand Up @@ -305,7 +388,7 @@ func (suite *ManagerTestSuite) TestSimpleJpegProcessBlockingWithDiskStorage() {
suite.manager = diskManager

// process the media with no additional info provided
processingMedia, err := diskManager.ProcessMedia(ctx, data, accountID, nil)
processingMedia, err := diskManager.ProcessMedia(ctx, data, nil, accountID, nil)
suite.NoError(err)
// fetch the attachment id from the processing media
attachmentID := processingMedia.AttachmentID()
Expand Down
15 changes: 11 additions & 4 deletions internal/media/processingemoji.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ type ProcessingEmoji struct {
emoji will be updated incrementally as media goes through processing
*/

emoji *gtsmodel.Emoji
data DataFunc
read bool // bool indicating that data function has been triggered already
emoji *gtsmodel.Emoji
data DataFunc
postData PostDataCallbackFunc
read bool // bool indicating that data function has been triggered already

/*
below fields represent the processing state of the static of the emoji
Expand Down Expand Up @@ -212,10 +213,15 @@ func (p *ProcessingEmoji) store(ctx context.Context) error {
}

p.read = true

if p.postData != nil {
return p.postData(ctx)
}

return nil
}

func (m *manager) preProcessEmoji(ctx context.Context, data DataFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo) (*ProcessingEmoji, error) {
func (m *manager) preProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo) (*ProcessingEmoji, error) {
instanceAccount, err := m.db.GetInstanceAccount(ctx, "")
if err != nil {
return nil, fmt.Errorf("preProcessEmoji: error fetching this instance account from the db: %s", err)
Expand Down Expand Up @@ -281,6 +287,7 @@ func (m *manager) preProcessEmoji(ctx context.Context, data DataFunc, shortcode
instanceAccountID: instanceAccount.ID,
emoji: emoji,
data: data,
postData: postData,
staticState: int32(received),
database: m.db,
storage: m.storage,
Expand Down
9 changes: 8 additions & 1 deletion internal/media/processingmedia.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type ProcessingMedia struct {

attachment *gtsmodel.MediaAttachment
data DataFunc
postData PostDataCallbackFunc
read bool // bool indicating that data function has been triggered already

thumbState int32 // the processing state of the media thumbnail
Expand Down Expand Up @@ -313,10 +314,15 @@ func (p *ProcessingMedia) store(ctx context.Context) error {
}

p.read = true

if p.postData != nil {
return p.postData(ctx)
}

return nil
}

func (m *manager) preProcessMedia(ctx context.Context, data DataFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) {
func (m *manager) preProcessMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) {
id, err := id.NewRandomULID()
if err != nil {
return nil, err
Expand Down Expand Up @@ -403,6 +409,7 @@ func (m *manager) preProcessMedia(ctx context.Context, data DataFunc, accountID
processingMedia := &ProcessingMedia{
attachment: attachment,
data: data,
postData: postData,
thumbState: int32(received),
fullSizeState: int32(received),
database: m.db,
Expand Down
6 changes: 6 additions & 0 deletions internal/media/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,9 @@ type AdditionalEmojiInfo struct {

// DataFunc represents a function used to retrieve the raw bytes of a piece of media.
type DataFunc func(ctx context.Context) (reader io.Reader, fileSize int, err error)

// PostDataCallbackFunc represents a function executed after the DataFunc has been executed,
// and the returned reader has been read. It can be used to clean up any remaining resources.
//
// This can be set to nil, and will then not be executed.
type PostDataCallbackFunc func(ctx context.Context) error
4 changes: 2 additions & 2 deletions internal/processing/account/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (p *processor) UpdateAvatar(ctx context.Context, avatar *multipart.FileHead
Avatar: &isAvatar,
}

processingMedia, err := p.mediaManager.ProcessMedia(ctx, dataFunc, accountID, ai)
processingMedia, err := p.mediaManager.ProcessMedia(ctx, dataFunc, nil, accountID, ai)
if err != nil {
return nil, fmt.Errorf("UpdateAvatar: error processing avatar: %s", err)
}
Expand All @@ -177,7 +177,7 @@ func (p *processor) UpdateHeader(ctx context.Context, header *multipart.FileHead
Header: &isHeader,
}

processingMedia, err := p.mediaManager.ProcessMedia(ctx, dataFunc, accountID, ai)
processingMedia, err := p.mediaManager.ProcessMedia(ctx, dataFunc, nil, accountID, ai)
if err != nil {
return nil, fmt.Errorf("UpdateHeader: error processing header: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/processing/admin/emoji.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (p *processor) EmojiCreate(ctx context.Context, account *gtsmodel.Account,

emojiURI := uris.GenerateURIForEmoji(emojiID)

processingEmoji, err := p.mediaManager.ProcessEmoji(ctx, data, form.Shortcode, emojiID, emojiURI, nil)
processingEmoji, err := p.mediaManager.ProcessEmoji(ctx, data, nil, form.Shortcode, emojiID, emojiURI, nil)
if err != nil {
return nil, gtserror.NewErrorInternalError(fmt.Errorf("error processing emoji: %s", err), "error processing emoji")
}
Expand Down
2 changes: 1 addition & 1 deletion internal/processing/media/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (p *processor) Create(ctx context.Context, account *gtsmodel.Account, form
}

// process the media attachment and load it immediately
media, err := p.mediaManager.ProcessMedia(ctx, data, account.ID, &media.AdditionalMediaInfo{
media, err := p.mediaManager.ProcessMedia(ctx, data, nil, account.ID, &media.AdditionalMediaInfo{
Description: &form.Description,
FocusX: &focusX,
FocusY: &focusY,
Expand Down

0 comments on commit 55b83be

Please sign in to comment.