Skip to content

Commit

Permalink
replace async client API / federator msg processing with worker pools (
Browse files Browse the repository at this point in the history
…#497)

* replace async client API / federator msg processing with worker pools
* appease our lord-and-saviour, the linter
  • Loading branch information
NyaaaWhatsUpDoc committed Apr 28, 2022
1 parent cc5f2e9 commit 420e2fb
Show file tree
Hide file tree
Showing 64 changed files with 573 additions and 336 deletions.
15 changes: 12 additions & 3 deletions cmd/gotosocial/action/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,15 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/federation/federatingdb"
"github.com/superseriousbusiness/gotosocial/internal/gotosocial"
"github.com/superseriousbusiness/gotosocial/internal/media"
"github.com/superseriousbusiness/gotosocial/internal/messages"
"github.com/superseriousbusiness/gotosocial/internal/oauth"
"github.com/superseriousbusiness/gotosocial/internal/oidc"
"github.com/superseriousbusiness/gotosocial/internal/processing"
"github.com/superseriousbusiness/gotosocial/internal/router"
"github.com/superseriousbusiness/gotosocial/internal/transport"
"github.com/superseriousbusiness/gotosocial/internal/typeutils"
"github.com/superseriousbusiness/gotosocial/internal/web"
"github.com/superseriousbusiness/gotosocial/internal/worker"
)

// Start creates and starts a gotosocial server
Expand All @@ -87,7 +89,14 @@ var Start action.GTSAction = func(ctx context.Context) error {
return fmt.Errorf("error creating instance instance: %s", err)
}

federatingDB := federatingdb.New(dbService)
// Create the client API and federator worker pools
// NOTE: these MUST NOT be used until they are passed to the
// processor and it is started. The reason being that the processor
// sets the Worker process functions and start the underlying pools
clientWorker := worker.New[messages.FromClientAPI](-1, -1)
fedWorker := worker.New[messages.FromFederator](-1, -1)

federatingDB := federatingdb.New(dbService, fedWorker)

router, err := router.New(ctx, dbService)
if err != nil {
Expand Down Expand Up @@ -138,8 +147,8 @@ var Start action.GTSAction = func(ctx context.Context) error {
}

// create and start the message processor using the other services we've created so far
processor := processing.NewProcessor(typeConverter, federator, oauthServer, mediaManager, storage, dbService, emailSender)
if err := processor.Start(ctx); err != nil {
processor := processing.NewProcessor(typeConverter, federator, oauthServer, mediaManager, storage, dbService, emailSender, clientWorker, fedWorker)
if err := processor.Start(); err != nil {
return fmt.Errorf("error starting processor: %s", err)
}

Expand Down
14 changes: 10 additions & 4 deletions cmd/gotosocial/action/testrig/testrig.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,10 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/api/s2s/webfinger"
"github.com/superseriousbusiness/gotosocial/internal/api/security"
"github.com/superseriousbusiness/gotosocial/internal/gotosocial"
"github.com/superseriousbusiness/gotosocial/internal/messages"
"github.com/superseriousbusiness/gotosocial/internal/oidc"
"github.com/superseriousbusiness/gotosocial/internal/web"
"github.com/superseriousbusiness/gotosocial/internal/worker"
"github.com/superseriousbusiness/gotosocial/testrig"
)

Expand All @@ -71,6 +73,10 @@ var Start action.GTSAction = func(ctx context.Context) error {
storageBackend := testrig.NewTestStorage()
testrig.StandardStorageSetup(storageBackend, "./testrig/media")

// Create client API and federator worker pools
clientWorker := worker.New[messages.FromClientAPI](-1, -1)
fedWorker := worker.New[messages.FromFederator](-1, -1)

// build backend handlers
oauthServer := testrig.NewTestOauthServer(dbService)
transportController := testrig.NewTestTransportController(testrig.NewMockHTTPClient(func(req *http.Request) (*http.Response, error) {
Expand All @@ -79,14 +85,14 @@ var Start action.GTSAction = func(ctx context.Context) error {
StatusCode: 200,
Body: r,
}, nil
}), dbService)
}), dbService, fedWorker)
mediaManager := testrig.NewTestMediaManager(dbService, storageBackend)
federator := testrig.NewTestFederator(dbService, transportController, storageBackend, mediaManager)
federator := testrig.NewTestFederator(dbService, transportController, storageBackend, mediaManager, fedWorker)

emailSender := testrig.NewEmailSender("./web/template/", nil)

processor := testrig.NewTestProcessor(dbService, storageBackend, federator, emailSender, mediaManager)
if err := processor.Start(ctx); err != nil {
processor := testrig.NewTestProcessor(dbService, storageBackend, federator, emailSender, mediaManager, clientWorker, fedWorker)
if err := processor.Start(); err != nil {
return fmt.Errorf("error starting processor: %s", err)
}

Expand Down
9 changes: 7 additions & 2 deletions internal/api/client/account/account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/federation"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/media"
"github.com/superseriousbusiness/gotosocial/internal/messages"
"github.com/superseriousbusiness/gotosocial/internal/oauth"
"github.com/superseriousbusiness/gotosocial/internal/processing"
"github.com/superseriousbusiness/gotosocial/internal/worker"
"github.com/superseriousbusiness/gotosocial/testrig"
)

Expand Down Expand Up @@ -60,13 +62,16 @@ func (suite *AccountStandardTestSuite) SetupTest() {
testrig.InitTestConfig()
testrig.InitTestLog()

fedWorker := worker.New[messages.FromFederator](-1, -1)
clientWorker := worker.New[messages.FromClientAPI](-1, -1)

suite.db = testrig.NewTestDB()
suite.storage = testrig.NewTestStorage()
suite.mediaManager = testrig.NewTestMediaManager(suite.db, suite.storage)
suite.federator = testrig.NewTestFederator(suite.db, testrig.NewTestTransportController(testrig.NewMockHTTPClient(nil), suite.db), suite.storage, suite.mediaManager)
suite.federator = testrig.NewTestFederator(suite.db, testrig.NewTestTransportController(testrig.NewMockHTTPClient(nil), suite.db, fedWorker), suite.storage, suite.mediaManager, fedWorker)
suite.sentEmails = make(map[string]string)
suite.emailSender = testrig.NewEmailSender("../../../../web/template/", suite.sentEmails)
suite.processor = testrig.NewTestProcessor(suite.db, suite.storage, suite.federator, suite.emailSender, suite.mediaManager)
suite.processor = testrig.NewTestProcessor(suite.db, suite.storage, suite.federator, suite.emailSender, suite.mediaManager, clientWorker, fedWorker)
suite.accountModule = account.New(suite.processor).(*account.Module)
testrig.StandardDBSetup(suite.db, nil)
testrig.StandardStorageSetup(suite.storage, "../../../../testrig/media")
Expand Down
9 changes: 7 additions & 2 deletions internal/api/client/admin/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/federation"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/media"
"github.com/superseriousbusiness/gotosocial/internal/messages"
"github.com/superseriousbusiness/gotosocial/internal/oauth"
"github.com/superseriousbusiness/gotosocial/internal/processing"
"github.com/superseriousbusiness/gotosocial/internal/worker"
"github.com/superseriousbusiness/gotosocial/testrig"
)

Expand Down Expand Up @@ -78,13 +80,16 @@ func (suite *AdminStandardTestSuite) SetupTest() {
testrig.InitTestConfig()
testrig.InitTestLog()

fedWorker := worker.New[messages.FromFederator](-1, -1)
clientWorker := worker.New[messages.FromClientAPI](-1, -1)

suite.db = testrig.NewTestDB()
suite.storage = testrig.NewTestStorage()
suite.mediaManager = testrig.NewTestMediaManager(suite.db, suite.storage)
suite.federator = testrig.NewTestFederator(suite.db, testrig.NewTestTransportController(testrig.NewMockHTTPClient(nil), suite.db), suite.storage, suite.mediaManager)
suite.federator = testrig.NewTestFederator(suite.db, testrig.NewTestTransportController(testrig.NewMockHTTPClient(nil), suite.db, fedWorker), suite.storage, suite.mediaManager, fedWorker)
suite.sentEmails = make(map[string]string)
suite.emailSender = testrig.NewEmailSender("../../../../web/template/", suite.sentEmails)
suite.processor = testrig.NewTestProcessor(suite.db, suite.storage, suite.federator, suite.emailSender, suite.mediaManager)
suite.processor = testrig.NewTestProcessor(suite.db, suite.storage, suite.federator, suite.emailSender, suite.mediaManager, clientWorker, fedWorker)
suite.adminModule = admin.New(suite.processor).(*admin.Module)
testrig.StandardDBSetup(suite.db, nil)
testrig.StandardStorageSetup(suite.storage, "../../../../testrig/media")
Expand Down
10 changes: 8 additions & 2 deletions internal/api/client/fileserver/servefile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/federation"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/media"
"github.com/superseriousbusiness/gotosocial/internal/messages"
"github.com/superseriousbusiness/gotosocial/internal/oauth"
"github.com/superseriousbusiness/gotosocial/internal/processing"
"github.com/superseriousbusiness/gotosocial/internal/typeutils"
"github.com/superseriousbusiness/gotosocial/internal/worker"
"github.com/superseriousbusiness/gotosocial/testrig"
)

Expand Down Expand Up @@ -74,12 +76,16 @@ func (suite *ServeFileTestSuite) SetupSuite() {
// setup standard items
testrig.InitTestConfig()
testrig.InitTestLog()

fedWorker := worker.New[messages.FromFederator](-1, -1)
clientWorker := worker.New[messages.FromClientAPI](-1, -1)

suite.db = testrig.NewTestDB()
suite.storage = testrig.NewTestStorage()
suite.federator = testrig.NewTestFederator(suite.db, testrig.NewTestTransportController(testrig.NewMockHTTPClient(nil), suite.db), suite.storage, testrig.NewTestMediaManager(suite.db, suite.storage))
suite.federator = testrig.NewTestFederator(suite.db, testrig.NewTestTransportController(testrig.NewMockHTTPClient(nil), suite.db, fedWorker), suite.storage, testrig.NewTestMediaManager(suite.db, suite.storage), fedWorker)
suite.emailSender = testrig.NewEmailSender("../../../../web/template/", nil)

suite.processor = testrig.NewTestProcessor(suite.db, suite.storage, suite.federator, suite.emailSender, testrig.NewTestMediaManager(suite.db, suite.storage))
suite.processor = testrig.NewTestProcessor(suite.db, suite.storage, suite.federator, suite.emailSender, testrig.NewTestMediaManager(suite.db, suite.storage), clientWorker, fedWorker)
suite.tc = testrig.NewTestTypeConverter(suite.db)
suite.mediaManager = testrig.NewTestMediaManager(suite.db, suite.storage)
suite.oauthServer = testrig.NewTestOauthServer(suite.db)
Expand Down
10 changes: 8 additions & 2 deletions internal/api/client/followrequest/followrequest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/federation"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/media"
"github.com/superseriousbusiness/gotosocial/internal/messages"
"github.com/superseriousbusiness/gotosocial/internal/oauth"
"github.com/superseriousbusiness/gotosocial/internal/processing"
"github.com/superseriousbusiness/gotosocial/internal/worker"
"github.com/superseriousbusiness/gotosocial/testrig"
)

Expand Down Expand Up @@ -74,12 +76,16 @@ func (suite *FollowRequestStandardTestSuite) SetupSuite() {
func (suite *FollowRequestStandardTestSuite) SetupTest() {
testrig.InitTestConfig()
testrig.InitTestLog()

fedWorker := worker.New[messages.FromFederator](-1, -1)
clientWorker := worker.New[messages.FromClientAPI](-1, -1)

suite.db = testrig.NewTestDB()
suite.storage = testrig.NewTestStorage()
suite.mediaManager = testrig.NewTestMediaManager(suite.db, suite.storage)
suite.federator = testrig.NewTestFederator(suite.db, testrig.NewTestTransportController(testrig.NewMockHTTPClient(nil), suite.db), suite.storage, suite.mediaManager)
suite.federator = testrig.NewTestFederator(suite.db, testrig.NewTestTransportController(testrig.NewMockHTTPClient(nil), suite.db, fedWorker), suite.storage, suite.mediaManager, fedWorker)
suite.emailSender = testrig.NewEmailSender("../../../../web/template/", nil)
suite.processor = testrig.NewTestProcessor(suite.db, suite.storage, suite.federator, suite.emailSender, suite.mediaManager)
suite.processor = testrig.NewTestProcessor(suite.db, suite.storage, suite.federator, suite.emailSender, suite.mediaManager, clientWorker, fedWorker)
suite.followRequestModule = followrequest.New(suite.processor).(*followrequest.Module)
testrig.StandardDBSetup(suite.db, nil)
testrig.StandardStorageSetup(suite.storage, "../../../../testrig/media")
Expand Down
10 changes: 8 additions & 2 deletions internal/api/client/media/mediacreate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,11 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/federation"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/media"
"github.com/superseriousbusiness/gotosocial/internal/messages"
"github.com/superseriousbusiness/gotosocial/internal/oauth"
"github.com/superseriousbusiness/gotosocial/internal/processing"
"github.com/superseriousbusiness/gotosocial/internal/typeutils"
"github.com/superseriousbusiness/gotosocial/internal/worker"
"github.com/superseriousbusiness/gotosocial/testrig"
)

Expand Down Expand Up @@ -81,14 +83,18 @@ func (suite *MediaCreateTestSuite) SetupSuite() {
// setup standard items
testrig.InitTestConfig()
testrig.InitTestLog()

fedWorker := worker.New[messages.FromFederator](-1, -1)
clientWorker := worker.New[messages.FromClientAPI](-1, -1)

suite.db = testrig.NewTestDB()
suite.storage = testrig.NewTestStorage()
suite.tc = testrig.NewTestTypeConverter(suite.db)
suite.mediaManager = testrig.NewTestMediaManager(suite.db, suite.storage)
suite.oauthServer = testrig.NewTestOauthServer(suite.db)
suite.federator = testrig.NewTestFederator(suite.db, testrig.NewTestTransportController(testrig.NewMockHTTPClient(nil), suite.db), suite.storage, suite.mediaManager)
suite.federator = testrig.NewTestFederator(suite.db, testrig.NewTestTransportController(testrig.NewMockHTTPClient(nil), suite.db, fedWorker), suite.storage, suite.mediaManager, fedWorker)
suite.emailSender = testrig.NewEmailSender("../../../../web/template/", nil)
suite.processor = testrig.NewTestProcessor(suite.db, suite.storage, suite.federator, suite.emailSender, suite.mediaManager)
suite.processor = testrig.NewTestProcessor(suite.db, suite.storage, suite.federator, suite.emailSender, suite.mediaManager, clientWorker, fedWorker)

// setup module being tested
suite.mediaModule = mediamodule.New(suite.processor).(*mediamodule.Module)
Expand Down
10 changes: 8 additions & 2 deletions internal/api/client/media/mediaupdate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/federation"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/media"
"github.com/superseriousbusiness/gotosocial/internal/messages"
"github.com/superseriousbusiness/gotosocial/internal/oauth"
"github.com/superseriousbusiness/gotosocial/internal/processing"
"github.com/superseriousbusiness/gotosocial/internal/typeutils"
"github.com/superseriousbusiness/gotosocial/internal/worker"
"github.com/superseriousbusiness/gotosocial/testrig"
)

Expand Down Expand Up @@ -79,14 +81,18 @@ func (suite *MediaUpdateTestSuite) SetupSuite() {
// setup standard items
testrig.InitTestConfig()
testrig.InitTestLog()

fedWorker := worker.New[messages.FromFederator](-1, -1)
clientWorker := worker.New[messages.FromClientAPI](-1, -1)

suite.db = testrig.NewTestDB()
suite.storage = testrig.NewTestStorage()
suite.tc = testrig.NewTestTypeConverter(suite.db)
suite.mediaManager = testrig.NewTestMediaManager(suite.db, suite.storage)
suite.oauthServer = testrig.NewTestOauthServer(suite.db)
suite.federator = testrig.NewTestFederator(suite.db, testrig.NewTestTransportController(testrig.NewMockHTTPClient(nil), suite.db), suite.storage, suite.mediaManager)
suite.federator = testrig.NewTestFederator(suite.db, testrig.NewTestTransportController(testrig.NewMockHTTPClient(nil), suite.db, fedWorker), suite.storage, suite.mediaManager, fedWorker)
suite.emailSender = testrig.NewEmailSender("../../../../web/template/", nil)
suite.processor = testrig.NewTestProcessor(suite.db, suite.storage, suite.federator, suite.emailSender, suite.mediaManager)
suite.processor = testrig.NewTestProcessor(suite.db, suite.storage, suite.federator, suite.emailSender, suite.mediaManager, clientWorker, fedWorker)

// setup module being tested
suite.mediaModule = mediamodule.New(suite.processor).(*mediamodule.Module)
Expand Down
11 changes: 8 additions & 3 deletions internal/api/client/status/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/federation"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/media"
"github.com/superseriousbusiness/gotosocial/internal/messages"
"github.com/superseriousbusiness/gotosocial/internal/processing"
"github.com/superseriousbusiness/gotosocial/internal/typeutils"
"github.com/superseriousbusiness/gotosocial/internal/worker"
"github.com/superseriousbusiness/gotosocial/testrig"
)

Expand Down Expand Up @@ -79,13 +81,17 @@ func (suite *StatusStandardTestSuite) SetupSuite() {
func (suite *StatusStandardTestSuite) SetupTest() {
testrig.InitTestConfig()
testrig.InitTestLog()

fedWorker := worker.New[messages.FromFederator](-1, -1)
clientWorker := worker.New[messages.FromClientAPI](-1, -1)

suite.db = testrig.NewTestDB()
suite.tc = testrig.NewTestTypeConverter(suite.db)
suite.storage = testrig.NewTestStorage()
suite.mediaManager = testrig.NewTestMediaManager(suite.db, suite.storage)
suite.federator = testrig.NewTestFederator(suite.db, testrig.NewTestTransportController(suite.testHttpClient(), suite.db), suite.storage, suite.mediaManager)
suite.federator = testrig.NewTestFederator(suite.db, testrig.NewTestTransportController(suite.testHttpClient(), suite.db, fedWorker), suite.storage, suite.mediaManager, fedWorker)
suite.emailSender = testrig.NewEmailSender("../../../../web/template/", nil)
suite.processor = testrig.NewTestProcessor(suite.db, suite.storage, suite.federator, suite.emailSender, suite.mediaManager)
suite.processor = testrig.NewTestProcessor(suite.db, suite.storage, suite.federator, suite.emailSender, suite.mediaManager, clientWorker, fedWorker)
suite.statusModule = status.New(suite.processor).(*status.Module)
testrig.StandardDBSetup(suite.db, nil)
testrig.StandardStorageSetup(suite.storage, "../../../../testrig/media")
Expand All @@ -104,7 +110,6 @@ func (suite *StatusStandardTestSuite) testHttpClient() pub.HttpClient {
fmt.Println(remoteAccountWebfingerURI)

httpClient := testrig.NewMockHTTPClient(func(req *http.Request) (*http.Response, error) {

// respond correctly to a webfinger lookup
if req.URL.String() == remoteAccountWebfingerURI {
responseJson := fmt.Sprintf(`
Expand Down
8 changes: 6 additions & 2 deletions internal/api/client/user/user_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/federation"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/media"
"github.com/superseriousbusiness/gotosocial/internal/messages"
"github.com/superseriousbusiness/gotosocial/internal/processing"
"github.com/superseriousbusiness/gotosocial/internal/typeutils"
"github.com/superseriousbusiness/gotosocial/internal/worker"
"github.com/superseriousbusiness/gotosocial/testrig"
)

Expand Down Expand Up @@ -56,6 +58,8 @@ type UserStandardTestSuite struct {
func (suite *UserStandardTestSuite) SetupTest() {
testrig.InitTestLog()
testrig.InitTestConfig()
fedWorker := worker.New[messages.FromFederator](-1, -1)
clientWorker := worker.New[messages.FromClientAPI](-1, -1)
suite.testTokens = testrig.NewTestTokens()
suite.testClients = testrig.NewTestClients()
suite.testApplications = testrig.NewTestApplications()
Expand All @@ -65,10 +69,10 @@ func (suite *UserStandardTestSuite) SetupTest() {
suite.storage = testrig.NewTestStorage()
suite.tc = testrig.NewTestTypeConverter(suite.db)
suite.mediaManager = testrig.NewTestMediaManager(suite.db, suite.storage)
suite.federator = testrig.NewTestFederator(suite.db, testrig.NewTestTransportController(testrig.NewMockHTTPClient(nil), suite.db), suite.storage, suite.mediaManager)
suite.federator = testrig.NewTestFederator(suite.db, testrig.NewTestTransportController(testrig.NewMockHTTPClient(nil), suite.db, fedWorker), suite.storage, suite.mediaManager, fedWorker)
suite.sentEmails = make(map[string]string)
suite.emailSender = testrig.NewEmailSender("../../../../web/template/", suite.sentEmails)
suite.processor = testrig.NewTestProcessor(suite.db, suite.storage, suite.federator, suite.emailSender, suite.mediaManager)
suite.processor = testrig.NewTestProcessor(suite.db, suite.storage, suite.federator, suite.emailSender, suite.mediaManager, clientWorker, fedWorker)
suite.userModule = user.New(suite.processor).(*user.Module)
testrig.StandardDBSetup(suite.db, suite.testAccounts)
testrig.StandardStorageSetup(suite.storage, "../../../../testrig/media")
Expand Down

0 comments on commit 420e2fb

Please sign in to comment.