diff --git a/cmd/crossposting-service/di/inject_adapters.go b/cmd/crossposting-service/di/inject_adapters.go index 106650b..09e5ae7 100644 --- a/cmd/crossposting-service/di/inject_adapters.go +++ b/cmd/crossposting-service/di/inject_adapters.go @@ -7,6 +7,7 @@ import ( "github.com/google/wire" "github.com/planetary-social/nos-crossposting-service/internal/logging" "github.com/planetary-social/nos-crossposting-service/service/adapters" + "github.com/planetary-social/nos-crossposting-service/service/adapters/mocks" "github.com/planetary-social/nos-crossposting-service/service/adapters/prometheus" "github.com/planetary-social/nos-crossposting-service/service/adapters/sqlite" "github.com/planetary-social/nos-crossposting-service/service/adapters/twitter" @@ -73,6 +74,45 @@ var adaptersSet = wire.NewSet( adapters.NewTwitterAccountDetailsCache, wire.Bind(new(app.TwitterAccountDetailsCache), new(*adapters.TwitterAccountDetailsCache)), + + adapters.NewCurrentTimeProvider, + wire.Bind(new(app.CurrentTimeProvider), new(*adapters.CurrentTimeProvider)), +) + +var testAdaptersSet = wire.NewSet( + prometheus.NewPrometheus, + wire.Bind(new(app.Metrics), new(*prometheus.Prometheus)), + + mocks.NewTwitter, + wire.Bind(new(app.Twitter), new(*mocks.Twitter)), + + mocks.NewCurrentTimeProvider, + wire.Bind(new(app.CurrentTimeProvider), new(*mocks.CurrentTimeProvider)), +) + +var mockTxAdaptersSet = wire.NewSet( + mocks.NewTransactionProvider, + wire.Bind(new(app.TransactionProvider), new(*mocks.TransactionProvider)), + + wire.Struct(new(app.Adapters), "*"), + + mocks.NewAccountRepository, + wire.Bind(new(app.AccountRepository), new(*mocks.AccountRepository)), + + mocks.NewSessionRepository, + wire.Bind(new(app.SessionRepository), new(*mocks.SessionRepository)), + + mocks.NewPublicKeyRepository, + wire.Bind(new(app.PublicKeyRepository), new(*mocks.PublicKeyRepository)), + + mocks.NewProcessedEventRepository, + wire.Bind(new(app.ProcessedEventRepository), new(*mocks.ProcessedEventRepository)), + + mocks.NewUserTokensRepository, + wire.Bind(new(app.UserTokensRepository), new(*mocks.UserTokensRepository)), + + mocks.NewPublisher, + wire.Bind(new(app.Publisher), new(*mocks.Publisher)), ) func newAdaptersFactoryFn(deps buildTransactionSqliteAdaptersDependencies) sqlite.AdaptersFactoryFn { diff --git a/cmd/crossposting-service/di/inject_pubsub.go b/cmd/crossposting-service/di/inject_pubsub.go index 0b79e20..191cb74 100644 --- a/cmd/crossposting-service/di/inject_pubsub.go +++ b/cmd/crossposting-service/di/inject_pubsub.go @@ -20,6 +20,7 @@ var sqlitePubsubSet = wire.NewSet( sqlite.NewSubscriber, wire.Bind(new(app.Subscriber), new(*sqlite.Subscriber)), + wire.Bind(new(sqlitepubsubport.SqliteSubscriber), new(*sqlite.Subscriber)), ) var sqliteTxPubsubSet = wire.NewSet( diff --git a/cmd/crossposting-service/di/wire.go b/cmd/crossposting-service/di/wire.go index 1d298ee..92ef713 100644 --- a/cmd/crossposting-service/di/wire.go +++ b/cmd/crossposting-service/di/wire.go @@ -11,6 +11,7 @@ import ( "github.com/google/wire" "github.com/planetary-social/nos-crossposting-service/internal/fixtures" "github.com/planetary-social/nos-crossposting-service/internal/logging" + "github.com/planetary-social/nos-crossposting-service/service/adapters/mocks" "github.com/planetary-social/nos-crossposting-service/service/adapters/sqlite" "github.com/planetary-social/nos-crossposting-service/service/app" "github.com/planetary-social/nos-crossposting-service/service/config" @@ -49,6 +50,27 @@ func BuildTestAdapters(context.Context, testing.TB) (sqlite.TestedItems, func(), return sqlite.TestedItems{}, nil, nil } +type TestApplication struct { + SendTweetHandler *app.SendTweetHandler + + CurrentTimeProvider *mocks.CurrentTimeProvider + UserTokensRepository *mocks.UserTokensRepository + Twitter *mocks.Twitter +} + +func BuildTestApplication(tb testing.TB) (TestApplication, error) { + wire.Build( + wire.Struct(new(TestApplication), "*"), + + applicationSet, + testAdaptersSet, + mockTxAdaptersSet, + + fixtures.TestLogger, + ) + return TestApplication{}, nil +} + func newTestAdaptersConfig(tb testing.TB) (config.Config, error) { return config.NewConfig( fixtures.SomeString(), diff --git a/cmd/crossposting-service/di/wire_gen.go b/cmd/crossposting-service/di/wire_gen.go index 1cfd665..901149f 100644 --- a/cmd/crossposting-service/di/wire_gen.go +++ b/cmd/crossposting-service/di/wire_gen.go @@ -17,6 +17,7 @@ import ( "github.com/planetary-social/nos-crossposting-service/migrations" "github.com/planetary-social/nos-crossposting-service/service/adapters" "github.com/planetary-social/nos-crossposting-service/service/adapters/memorypubsub" + "github.com/planetary-social/nos-crossposting-service/service/adapters/mocks" "github.com/planetary-social/nos-crossposting-service/service/adapters/prometheus" "github.com/planetary-social/nos-crossposting-service/service/adapters/sqlite" "github.com/planetary-social/nos-crossposting-service/service/adapters/twitter" @@ -97,8 +98,9 @@ func BuildService(contextContext context.Context, configConfig config.Config) (S tweetGenerator := domain.NewTweetGenerator(transformer) processReceivedEventHandler := app.NewProcessReceivedEventHandler(genericTransactionProvider, tweetGenerator, logger, prometheusPrometheus) receivedEventSubscriber := memorypubsub2.NewReceivedEventSubscriber(receivedEventPubSub, processReceivedEventHandler, logger) - sendTweetHandler := app.NewSendTweetHandler(genericTransactionProvider, appTwitter, logger, prometheusPrometheus) - tweetCreatedEventSubscriber := sqlitepubsub.NewTweetCreatedEventSubscriber(sendTweetHandler, subscriber, logger, prometheusPrometheus) + currentTimeProvider := adapters.NewCurrentTimeProvider() + sendTweetHandler := app.NewSendTweetHandler(genericTransactionProvider, appTwitter, currentTimeProvider, logger, prometheusPrometheus) + tweetCreatedEventSubscriber := sqlitepubsub.NewTweetCreatedEventSubscriber(sendTweetHandler, subscriber, logger) metrics := timer.NewMetrics(application, logger) migrationsStorage, err := sqlite.NewMigrationsStorage(db) if err != nil { @@ -166,6 +168,54 @@ func BuildTestAdapters(contextContext context.Context, tb testing.TB) (sqlite.Te }, nil } +func BuildTestApplication(tb testing.TB) (TestApplication, error) { + accountRepository, err := mocks.NewAccountRepository() + if err != nil { + return TestApplication{}, err + } + sessionRepository, err := mocks.NewSessionRepository() + if err != nil { + return TestApplication{}, err + } + publicKeyRepository, err := mocks.NewPublicKeyRepository() + if err != nil { + return TestApplication{}, err + } + processedEventRepository, err := mocks.NewProcessedEventRepository() + if err != nil { + return TestApplication{}, err + } + userTokensRepository, err := mocks.NewUserTokensRepository() + if err != nil { + return TestApplication{}, err + } + publisher := mocks.NewPublisher() + appAdapters := app.Adapters{ + Accounts: accountRepository, + Sessions: sessionRepository, + PublicKeys: publicKeyRepository, + ProcessedEvents: processedEventRepository, + UserTokens: userTokensRepository, + Publisher: publisher, + } + transactionProvider := mocks.NewTransactionProvider(appAdapters) + mocksTwitter := mocks.NewTwitter() + currentTimeProvider := mocks.NewCurrentTimeProvider() + logger := fixtures.TestLogger(tb) + prometheusPrometheus, err := prometheus.NewPrometheus(logger) + if err != nil { + return TestApplication{}, err + } + sendTweetHandler := app.NewSendTweetHandler(transactionProvider, mocksTwitter, currentTimeProvider, logger, prometheusPrometheus) + testApplication := TestApplication{ + SendTweetHandler: sendTweetHandler, + CurrentTimeProvider: currentTimeProvider, + UserTokensRepository: userTokensRepository, + Twitter: mocksTwitter, + } + return testApplication, nil +} + func buildTransactionSqliteAdapters(db *sql.DB, tx *sql.Tx, diBuildTransactionSqliteAdaptersDependencies buildTransactionSqliteAdaptersDependencies) (app.Adapters, error) { accountRepository, err := sqlite.NewAccountRepository(tx) if err != nil { @@ -238,6 +288,14 @@ func buildTestTransactionSqliteAdapters(db *sql.DB, tx *sql.Tx, diBuildTransacti // wire.go: +type TestApplication struct { + SendTweetHandler *app.SendTweetHandler + + CurrentTimeProvider *mocks.CurrentTimeProvider + UserTokensRepository *mocks.UserTokensRepository + Twitter *mocks.Twitter +} + func newTestAdaptersConfig(tb testing.TB) (config.Config, error) { return config.NewConfig(fixtures.SomeString(), fixtures.SomeString(), config.EnvironmentDevelopment, logging.LevelDebug, fixtures.SomeString(), fixtures.SomeString(), fixtures.SomeFile(tb), fixtures.SomeString()) } diff --git a/internal/fixtures/fixtures.go b/internal/fixtures/fixtures.go index e670c6a..2fccfd2 100644 --- a/internal/fixtures/fixtures.go +++ b/internal/fixtures/fixtures.go @@ -8,9 +8,11 @@ import ( "math/rand" "os" "testing" + "time" "github.com/nbd-wtf/go-nostr" "github.com/planetary-social/nos-crossposting-service/internal" + "github.com/planetary-social/nos-crossposting-service/internal/logging" "github.com/planetary-social/nos-crossposting-service/service/domain" "github.com/planetary-social/nos-crossposting-service/service/domain/accounts" "github.com/planetary-social/nos-crossposting-service/service/domain/sessions" @@ -130,6 +132,51 @@ func SomeError() error { return fmt.Errorf("some error: %d", rand.Int()) } +func SomeTwitterUserAccessToken() accounts.TwitterUserAccessToken { + v, err := accounts.NewTwitterUserAccessToken(SomeString()) + if err != nil { + panic(err) + } + return v +} + +func SomeTwitterUserAccessSecret() accounts.TwitterUserAccessSecret { + v, err := accounts.NewTwitterUserAccessSecret(SomeString()) + if err != nil { + panic(err) + } + return v +} + +func SomeEventWithCreatedAt(createdAt time.Time) domain.Event { + _, sk := SomeKeyPair() + + libevent := nostr.Event{ + CreatedAt: nostr.Timestamp(createdAt.Unix()), + Kind: domain.EventKindNote.Int(), + Content: SomeString(), + } + err := libevent.Sign(sk) + if err != nil { + panic(err) + } + + event, err := domain.NewEvent(libevent) + if err != nil { + panic(err) + } + + return event +} + +func SomeEvent() domain.Event { + return SomeEventWithCreatedAt(time.Now()) +} + +func TestLogger(tb testing.TB) logging.Logger { + return logging.NewSystemLogger(logging.NewTestingLoggingSystem(tb), "test") +} + var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") func randSeq(n int) string { diff --git a/service/adapters/current_time_provider.go b/service/adapters/current_time_provider.go new file mode 100644 index 0000000..e7d3a6a --- /dev/null +++ b/service/adapters/current_time_provider.go @@ -0,0 +1,14 @@ +package adapters + +import "time" + +type CurrentTimeProvider struct { +} + +func NewCurrentTimeProvider() *CurrentTimeProvider { + return &CurrentTimeProvider{} +} + +func (c CurrentTimeProvider) GetCurrentTime() time.Time { + return time.Now() +} diff --git a/service/adapters/mocks/account_repository.go b/service/adapters/mocks/account_repository.go new file mode 100644 index 0000000..d7a9877 --- /dev/null +++ b/service/adapters/mocks/account_repository.go @@ -0,0 +1,29 @@ +package mocks + +import ( + "github.com/boreq/errors" + "github.com/planetary-social/nos-crossposting-service/service/domain/accounts" +) + +type AccountRepository struct { +} + +func NewAccountRepository() (*AccountRepository, error) { + return &AccountRepository{}, nil +} + +func (m *AccountRepository) GetByTwitterID(twitterID accounts.TwitterID) (*accounts.Account, error) { + return nil, errors.New("not implemented") +} + +func (m *AccountRepository) GetByAccountID(accountID accounts.AccountID) (*accounts.Account, error) { + return nil, errors.New("not implemented") +} + +func (m *AccountRepository) Save(account *accounts.Account) error { + return errors.New("not implemented") +} + +func (m *AccountRepository) Count() (int, error) { + return 0, errors.New("not implemented") +} diff --git a/service/adapters/mocks/current_time_provider.go b/service/adapters/mocks/current_time_provider.go new file mode 100644 index 0000000..1843313 --- /dev/null +++ b/service/adapters/mocks/current_time_provider.go @@ -0,0 +1,19 @@ +package mocks + +import "time" + +type CurrentTimeProvider struct { + CurrentTime time.Time +} + +func NewCurrentTimeProvider() *CurrentTimeProvider { + return &CurrentTimeProvider{} +} + +func (c *CurrentTimeProvider) GetCurrentTime() time.Time { + return c.CurrentTime +} + +func (c *CurrentTimeProvider) SetCurrentTime(currentTime time.Time) { + c.CurrentTime = currentTime +} diff --git a/service/adapters/mocks/processed_event_repository.go b/service/adapters/mocks/processed_event_repository.go new file mode 100644 index 0000000..b25c3fc --- /dev/null +++ b/service/adapters/mocks/processed_event_repository.go @@ -0,0 +1,22 @@ +package mocks + +import ( + "github.com/boreq/errors" + "github.com/planetary-social/nos-crossposting-service/service/domain" + "github.com/planetary-social/nos-crossposting-service/service/domain/accounts" +) + +type ProcessedEventRepository struct { +} + +func NewProcessedEventRepository() (*ProcessedEventRepository, error) { + return &ProcessedEventRepository{}, nil +} + +func (m *ProcessedEventRepository) Save(eventID domain.EventId, twitterID accounts.TwitterID) error { + return errors.New("not implemented") +} + +func (m *ProcessedEventRepository) WasProcessed(eventID domain.EventId, twitterID accounts.TwitterID) (bool, error) { + return false, errors.New("not implemented") +} diff --git a/service/adapters/mocks/public_key_repository.go b/service/adapters/mocks/public_key_repository.go new file mode 100644 index 0000000..2862d94 --- /dev/null +++ b/service/adapters/mocks/public_key_repository.go @@ -0,0 +1,38 @@ +package mocks + +import ( + "github.com/boreq/errors" + "github.com/planetary-social/nos-crossposting-service/service/domain" + "github.com/planetary-social/nos-crossposting-service/service/domain/accounts" +) + +type PublicKeyRepository struct { +} + +func NewPublicKeyRepository() (*PublicKeyRepository, error) { + return &PublicKeyRepository{}, nil +} + +func (m *PublicKeyRepository) Save(linkedPublicKey *domain.LinkedPublicKey) error { + return errors.New("not implemented") +} + +func (m *PublicKeyRepository) Delete(accountID accounts.AccountID, publicKey domain.PublicKey) error { + return errors.New("not implemented") +} + +func (m *PublicKeyRepository) List() ([]*domain.LinkedPublicKey, error) { + return nil, errors.New("not implemented") +} + +func (m *PublicKeyRepository) ListByPublicKey(publicKey domain.PublicKey) ([]*domain.LinkedPublicKey, error) { + return nil, errors.New("not implemented") +} + +func (m *PublicKeyRepository) ListByAccountID(accountID accounts.AccountID) ([]*domain.LinkedPublicKey, error) { + return nil, errors.New("not implemented") +} + +func (m *PublicKeyRepository) Count() (int, error) { + return 0, errors.New("not implemented") +} diff --git a/service/adapters/mocks/publisher.go b/service/adapters/mocks/publisher.go new file mode 100644 index 0000000..a57ad4c --- /dev/null +++ b/service/adapters/mocks/publisher.go @@ -0,0 +1,16 @@ +package mocks + +import ( + "github.com/planetary-social/nos-crossposting-service/service/app" +) + +type Publisher struct { +} + +func NewPublisher() *Publisher { + return &Publisher{} +} + +func (p *Publisher) PublishTweetCreated(event app.TweetCreatedEvent) error { + return nil +} diff --git a/service/adapters/mocks/session_repository.go b/service/adapters/mocks/session_repository.go new file mode 100644 index 0000000..f759a2b --- /dev/null +++ b/service/adapters/mocks/session_repository.go @@ -0,0 +1,25 @@ +package mocks + +import ( + "github.com/boreq/errors" + "github.com/planetary-social/nos-crossposting-service/service/domain/sessions" +) + +type SessionRepository struct { +} + +func NewSessionRepository() (*SessionRepository, error) { + return &SessionRepository{}, nil +} + +func (m *SessionRepository) Get(id sessions.SessionID) (*sessions.Session, error) { + return nil, errors.New("not implemented") +} + +func (m *SessionRepository) Save(session *sessions.Session) error { + return errors.New("not implemented") +} + +func (m *SessionRepository) Delete(id sessions.SessionID) error { + return errors.New("not implemented") +} diff --git a/service/adapters/mocks/transaction_provider.go b/service/adapters/mocks/transaction_provider.go new file mode 100644 index 0000000..e56d8da --- /dev/null +++ b/service/adapters/mocks/transaction_provider.go @@ -0,0 +1,19 @@ +package mocks + +import ( + "context" + + "github.com/planetary-social/nos-crossposting-service/service/app" +) + +type TransactionProvider struct { + adapters app.Adapters +} + +func NewTransactionProvider(adapters app.Adapters) *TransactionProvider { + return &TransactionProvider{adapters: adapters} +} + +func (t *TransactionProvider) Transact(ctx context.Context, f func(context.Context, app.Adapters) error) error { + return f(ctx, t.adapters) +} diff --git a/service/adapters/mocks/twitter.go b/service/adapters/mocks/twitter.go new file mode 100644 index 0000000..609175d --- /dev/null +++ b/service/adapters/mocks/twitter.go @@ -0,0 +1,37 @@ +package mocks + +import ( + "context" + + "github.com/boreq/errors" + "github.com/planetary-social/nos-crossposting-service/service/app" + "github.com/planetary-social/nos-crossposting-service/service/domain" + "github.com/planetary-social/nos-crossposting-service/service/domain/accounts" +) + +type Twitter struct { + PostTweetCalls []PostTweetCall +} + +func NewTwitter() *Twitter { + return &Twitter{} +} + +func (t *Twitter) PostTweet(ctx context.Context, userAccessToken accounts.TwitterUserAccessToken, userAccessSecret accounts.TwitterUserAccessSecret, tweet domain.Tweet) error { + t.PostTweetCalls = append(t.PostTweetCalls, PostTweetCall{ + UserAccessToken: userAccessToken, + UserAccessSecret: userAccessSecret, + Tweet: tweet, + }) + return nil +} + +func (t *Twitter) GetAccountDetails(ctx context.Context, userAccessToken accounts.TwitterUserAccessToken, userAccessSecret accounts.TwitterUserAccessSecret) (app.TwitterAccountDetails, error) { + return app.TwitterAccountDetails{}, errors.New("not implemented") +} + +type PostTweetCall struct { + UserAccessToken accounts.TwitterUserAccessToken + UserAccessSecret accounts.TwitterUserAccessSecret + Tweet domain.Tweet +} diff --git a/service/adapters/mocks/user_tokens_repository.go b/service/adapters/mocks/user_tokens_repository.go new file mode 100644 index 0000000..7180b42 --- /dev/null +++ b/service/adapters/mocks/user_tokens_repository.go @@ -0,0 +1,32 @@ +package mocks + +import ( + "github.com/boreq/errors" + "github.com/planetary-social/nos-crossposting-service/service/domain/accounts" +) + +type UserTokensRepository struct { + mockedUserTokens map[accounts.AccountID]*accounts.TwitterUserTokens +} + +func NewUserTokensRepository() (*UserTokensRepository, error) { + return &UserTokensRepository{ + mockedUserTokens: make(map[accounts.AccountID]*accounts.TwitterUserTokens), + }, nil +} + +func (m *UserTokensRepository) Save(userTokens *accounts.TwitterUserTokens) error { + return errors.New("not implemented") +} + +func (m *UserTokensRepository) Get(id accounts.AccountID) (*accounts.TwitterUserTokens, error) { + v, ok := m.mockedUserTokens[id] + if !ok { + return nil, errors.New("user tokens not mocked") + } + return v, nil +} + +func (m *UserTokensRepository) MockUserTokens(tokens *accounts.TwitterUserTokens) { + m.mockedUserTokens[tokens.AccountID()] = tokens +} diff --git a/service/adapters/sqlite/publisher.go b/service/adapters/sqlite/publisher.go index 548d2be..41e380e 100644 --- a/service/adapters/sqlite/publisher.go +++ b/service/adapters/sqlite/publisher.go @@ -3,11 +3,11 @@ package sqlite import ( "database/sql" "encoding/json" + "time" "github.com/boreq/errors" "github.com/oklog/ulid/v2" - "github.com/planetary-social/nos-crossposting-service/service/domain" - "github.com/planetary-social/nos-crossposting-service/service/domain/accounts" + "github.com/planetary-social/nos-crossposting-service/service/app" ) const TweetCreatedTopic = "tweet_created" @@ -21,12 +21,14 @@ func NewPublisher(pubsub *PubSub, tx *sql.Tx) *Publisher { return &Publisher{pubsub: pubsub, tx: tx} } -func (p *Publisher) PublishTweetCreated(accountID accounts.AccountID, tweet domain.Tweet) error { +func (p *Publisher) PublishTweetCreated(event app.TweetCreatedEvent) error { transport := TweetCreatedEventTransport{ - AccountID: accountID.String(), + AccountID: event.AccountID().String(), Tweet: TweetTransport{ - Text: tweet.Text(), + Text: event.Tweet().Text(), }, + Event: event.Event().Raw(), + CreatedAt: event.CreatedAt(), } payload, err := json.Marshal(transport) @@ -45,6 +47,8 @@ func (p *Publisher) PublishTweetCreated(accountID accounts.AccountID, tweet doma type TweetCreatedEventTransport struct { AccountID string `json:"accountID"` Tweet TweetTransport `json:"tweet"` + Event []byte `json:"event"` + CreatedAt *time.Time `json:"createdAt"` } type TweetTransport struct { diff --git a/service/adapters/sqlite/publisher_test.go b/service/adapters/sqlite/publisher_test.go index 3728a8a..db16a46 100644 --- a/service/adapters/sqlite/publisher_test.go +++ b/service/adapters/sqlite/publisher_test.go @@ -3,9 +3,11 @@ package sqlite_test import ( "context" "testing" + "time" "github.com/planetary-social/nos-crossposting-service/internal/fixtures" "github.com/planetary-social/nos-crossposting-service/service/adapters/sqlite" + "github.com/planetary-social/nos-crossposting-service/service/app" "github.com/planetary-social/nos-crossposting-service/service/domain" "github.com/planetary-social/nos-crossposting-service/service/domain/accounts" "github.com/stretchr/testify/require" @@ -16,13 +18,16 @@ func TestPublisher_ItIsPossibleToPublishEvents(t *testing.T) { adapters := NewTestAdapters(ctx, t) accountID := fixtures.SomeAccountID() + tweet := domain.NewTweet("some tweet") twitterID := fixtures.SomeTwitterID() + createdAt := time.Now() + event := fixtures.SomeEvent() + + tweetCreatedEvent := app.NewTweetCreatedEvent(accountID, tweet, createdAt, event) account, err := accounts.NewAccount(accountID, twitterID) require.NoError(t, err) - tweet := domain.NewTweet("some tweet") - err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { err = adapters.AccountRepository.Save(account) require.NoError(t, err) @@ -32,7 +37,7 @@ func TestPublisher_ItIsPossibleToPublishEvents(t *testing.T) { require.NoError(t, err) err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { - err = adapters.Publisher.PublishTweetCreated(accountID, tweet) + err = adapters.Publisher.PublishTweetCreated(tweetCreatedEvent) require.NoError(t, err) return nil diff --git a/service/adapters/sqlite/subscriber_test.go b/service/adapters/sqlite/subscriber_test.go index 45cbda8..fc151ae 100644 --- a/service/adapters/sqlite/subscriber_test.go +++ b/service/adapters/sqlite/subscriber_test.go @@ -3,9 +3,11 @@ package sqlite_test import ( "context" "testing" + "time" "github.com/planetary-social/nos-crossposting-service/internal/fixtures" "github.com/planetary-social/nos-crossposting-service/service/adapters/sqlite" + "github.com/planetary-social/nos-crossposting-service/service/app" "github.com/planetary-social/nos-crossposting-service/service/domain" "github.com/stretchr/testify/require" ) @@ -21,7 +23,10 @@ func TestSubscriber_TweetCreatedAnalysis(t *testing.T) { accountID := fixtures.SomeAccountID() for j := 0; j <= i; j++ { - err := adapters.Publisher.PublishTweetCreated(accountID, domain.NewTweet(fixtures.SomeString())) + tweet := domain.NewTweet(fixtures.SomeString()) + event := app.NewTweetCreatedEvent(accountID, tweet, time.Now(), fixtures.SomeEvent()) + + err := adapters.Publisher.PublishTweetCreated(event) require.NoError(t, err) } } diff --git a/service/app/app.go b/service/app/app.go index 90e50bd..2482261 100644 --- a/service/app/app.go +++ b/service/app/app.go @@ -2,6 +2,7 @@ package app import ( "context" + "time" "github.com/boreq/errors" "github.com/planetary-social/nos-crossposting-service/service/domain" @@ -65,7 +66,7 @@ type UserTokensRepository interface { } type Publisher interface { - PublishTweetCreated(accountID accounts.AccountID, tweet domain.Tweet) error + PublishTweetCreated(event TweetCreatedEvent) error } type TweetGenerator interface { @@ -209,3 +210,58 @@ func (t TwitterAccountDetails) Username() string { func (t TwitterAccountDetails) ProfileImageURL() string { return t.profileImageURL } + +type TweetCreatedEvent struct { + accountID accounts.AccountID + tweet domain.Tweet + createdAt *time.Time + event *domain.Event +} + +func NewTweetCreatedEvent( + accountID accounts.AccountID, + tweet domain.Tweet, + createdAt time.Time, + event domain.Event, +) TweetCreatedEvent { + return TweetCreatedEvent{ + accountID: accountID, + tweet: tweet, + createdAt: &createdAt, + event: &event, + } +} + +func NewTweetCreatedEventFromHistory( + accountID accounts.AccountID, + tweet domain.Tweet, + createdAt *time.Time, + event *domain.Event, +) TweetCreatedEvent { + return TweetCreatedEvent{ + accountID: accountID, + tweet: tweet, + createdAt: createdAt, + event: event, + } +} + +func (t TweetCreatedEvent) AccountID() accounts.AccountID { + return t.accountID +} + +func (t TweetCreatedEvent) Tweet() domain.Tweet { + return t.tweet +} + +func (t TweetCreatedEvent) CreatedAt() *time.Time { + return t.createdAt +} + +func (t TweetCreatedEvent) Event() *domain.Event { + return t.event +} + +type CurrentTimeProvider interface { + GetCurrentTime() time.Time +} diff --git a/service/app/handler_process_received_event.go b/service/app/handler_process_received_event.go index 6d5d84b..486a514 100644 --- a/service/app/handler_process_received_event.go +++ b/service/app/handler_process_received_event.go @@ -3,6 +3,7 @@ package app import ( "context" "fmt" + "time" "github.com/boreq/errors" "github.com/planetary-social/nos-crossposting-service/internal/logging" @@ -95,7 +96,8 @@ func (h *ProcessReceivedEventHandler) Handle(ctx context.Context, cmd ProcessRec } for _, tweet := range tweets { - if err := adapters.Publisher.PublishTweetCreated(account.AccountID(), tweet); err != nil { + tweetCreatedEvent := NewTweetCreatedEvent(account.AccountID(), tweet, time.Now(), event) + if err := adapters.Publisher.PublishTweetCreated(tweetCreatedEvent); err != nil { return errors.Wrap(err, "error publishing tweet created event") } } diff --git a/service/app/handler_send_tweet.go b/service/app/handler_send_tweet.go index 84304ae..88c5e99 100644 --- a/service/app/handler_send_tweet.go +++ b/service/app/handler_send_tweet.go @@ -2,6 +2,7 @@ package app import ( "context" + "time" "github.com/boreq/errors" "github.com/planetary-social/nos-crossposting-service/internal/logging" @@ -9,18 +10,44 @@ import ( "github.com/planetary-social/nos-crossposting-service/service/domain/accounts" ) +const ( + dropEventsIfNotPostedFor = 7 * 24 * time.Hour +) + +var ( + whenEventWasAddedToTweetCreatedEvent = time.Date(2023, time.November, 17, 0, 0, 0, 0, time.UTC) +) + type SendTweet struct { accountID accounts.AccountID tweet domain.Tweet + event *domain.Event } -func NewSendTweet(accountID accounts.AccountID, tweet domain.Tweet) SendTweet { - return SendTweet{accountID: accountID, tweet: tweet} +func NewSendTweet(accountID accounts.AccountID, tweet domain.Tweet, event *domain.Event) SendTweet { + return SendTweet{ + accountID: accountID, + tweet: tweet, + event: event, + } +} + +func (s SendTweet) AccountID() accounts.AccountID { + return s.accountID +} + +func (s SendTweet) Tweet() domain.Tweet { + return s.tweet +} + +func (s SendTweet) Event() *domain.Event { + return s.event } type SendTweetHandler struct { transactionProvider TransactionProvider twitter Twitter + currentTimeProvider CurrentTimeProvider logger logging.Logger metrics Metrics } @@ -28,12 +55,14 @@ type SendTweetHandler struct { func NewSendTweetHandler( transactionProvider TransactionProvider, twitter Twitter, + currentTimeProvider CurrentTimeProvider, logger logging.Logger, metrics Metrics, ) *SendTweetHandler { return &SendTweetHandler{ transactionProvider: transactionProvider, twitter: twitter, + currentTimeProvider: currentTimeProvider, logger: logger.New("sendTweetHandler"), metrics: metrics, } @@ -42,11 +71,24 @@ func NewSendTweetHandler( func (h *SendTweetHandler) Handle(ctx context.Context, cmd SendTweet) (err error) { defer h.metrics.StartApplicationCall("sendTweet").End(&err) - h.logger.Debug(). + h.logger. + Debug(). WithField("accountID", cmd.accountID). WithField("tweet", cmd.tweet.Text()). Message("attempting to post a tweet") + if cmd.event != nil { + dropEventIfPostedBefore := h.currentTimeProvider.GetCurrentTime().Add(-dropEventsIfNotPostedFor) + if cmd.event.CreatedAt().Before(dropEventIfPostedBefore) { + return nil + } + } else { + dropEventIfItIsNilAndCurrentTimeIsAfter := whenEventWasAddedToTweetCreatedEvent.Add(dropEventsIfNotPostedFor) + if h.currentTimeProvider.GetCurrentTime().After(dropEventIfItIsNilAndCurrentTimeIsAfter) { + return nil + } + } + var userTokens *accounts.TwitterUserTokens if err := h.transactionProvider.Transact(ctx, func(ctx context.Context, adapters Adapters) error { tmp, err := adapters.UserTokens.Get(cmd.accountID) diff --git a/service/app/handler_send_tweet_test.go b/service/app/handler_send_tweet_test.go new file mode 100644 index 0000000..30b7b69 --- /dev/null +++ b/service/app/handler_send_tweet_test.go @@ -0,0 +1,92 @@ +package app_test + +import ( + "testing" + "time" + + "github.com/planetary-social/nos-crossposting-service/cmd/crossposting-service/di" + "github.com/planetary-social/nos-crossposting-service/internal" + "github.com/planetary-social/nos-crossposting-service/internal/fixtures" + "github.com/planetary-social/nos-crossposting-service/service/app" + "github.com/planetary-social/nos-crossposting-service/service/domain" + "github.com/planetary-social/nos-crossposting-service/service/domain/accounts" + "github.com/stretchr/testify/require" +) + +func TestSendTweetHandler_CorrectlyDropsOldEvents(t *testing.T) { + testCases := []struct { + Name string + + CurrentTime time.Time + Event *domain.Event + + ShouldPostTweet bool + }{ + { + Name: "old_events_are_dropped_after_a_week_since_code_change_passes", + + CurrentTime: date(2023, time.November, 25), + Event: nil, + + ShouldPostTweet: false, + }, + { + Name: "old_events_are_not_dropped_before_a_week_since_code_change_passes", + + CurrentTime: date(2023, time.November, 23), + Event: nil, + + ShouldPostTweet: true, + }, + { + Name: "new_events_are_dropped_after_a_week_since_they_were_created", + + CurrentTime: date(2023, time.November, 28), + Event: internal.Pointer(fixtures.SomeEventWithCreatedAt(date(2023, time.November, 20))), + + ShouldPostTweet: false, + }, + { + Name: "new_events_are_not_dropped_before_a_week_since_they_were_created", + + CurrentTime: date(2023, time.November, 27), + Event: internal.Pointer(fixtures.SomeEventWithCreatedAt(date(2023, time.November, 20))), + + ShouldPostTweet: true, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.Name, func(t *testing.T) { + ts, err := di.BuildTestApplication(t) + require.NoError(t, err) + + ctx := fixtures.TestContext(t) + + accountId := fixtures.SomeAccountID() + tweet := domain.NewTweet(fixtures.SomeString()) + userTokens := accounts.NewTwitterUserTokens( + accountId, + fixtures.SomeTwitterUserAccessToken(), + fixtures.SomeTwitterUserAccessSecret(), + ) + ts.UserTokensRepository.MockUserTokens(userTokens) + ts.CurrentTimeProvider.SetCurrentTime(testCase.CurrentTime) + + cmd := app.NewSendTweet(accountId, tweet, testCase.Event) + + err = ts.SendTweetHandler.Handle(ctx, cmd) + require.NoError(t, err) + + if testCase.ShouldPostTweet { + require.Len(t, ts.Twitter.PostTweetCalls, 1) + } else { + require.Len(t, ts.Twitter.PostTweetCalls, 0) + } + }) + } +} + +func date(year int, month time.Month, day int) time.Time { + return time.Date(year, month, day, 0, 0, 0, 0, time.UTC) +} diff --git a/service/ports/sqlitepubsub/tweet_created.go b/service/ports/sqlitepubsub/tweet_created.go index 4aac523..0684f00 100644 --- a/service/ports/sqlitepubsub/tweet_created.go +++ b/service/ports/sqlitepubsub/tweet_created.go @@ -16,24 +16,25 @@ type SendTweetHandler interface { Handle(ctx context.Context, cmd app.SendTweet) (err error) } +type SqliteSubscriber interface { + SubscribeToTweetCreated(ctx context.Context) <-chan *sqlite.ReceivedMessage +} + type TweetCreatedEventSubscriber struct { handler SendTweetHandler - subscriber *sqlite.Subscriber + subscriber SqliteSubscriber logger logging.Logger - metrics app.Metrics } func NewTweetCreatedEventSubscriber( handler SendTweetHandler, - subscriber *sqlite.Subscriber, + subscriber SqliteSubscriber, logger logging.Logger, - metrics app.Metrics, ) *TweetCreatedEventSubscriber { return &TweetCreatedEventSubscriber{ handler: handler, subscriber: subscriber, logger: logger.New("tweetCreatedEventSubscriber"), - metrics: metrics, } } func (s *TweetCreatedEventSubscriber) Run(ctx context.Context) error { @@ -65,7 +66,13 @@ func (s *TweetCreatedEventSubscriber) handleMessage(ctx context.Context, msg *sq } tweet := domain.NewTweet(transport.Tweet.Text) - cmd := app.NewSendTweet(accountID, tweet) + + event, err := s.getEvent(transport) + if err != nil { + return errors.Wrap(err, "error getting the event from transport") + } + + cmd := app.NewSendTweet(accountID, tweet, event) if err := s.handler.Handle(ctx, cmd); err != nil { return errors.Wrap(err, "error calling the handler") @@ -73,3 +80,16 @@ func (s *TweetCreatedEventSubscriber) handleMessage(ctx context.Context, msg *sq return nil } + +func (s *TweetCreatedEventSubscriber) getEvent(transport sqlite.TweetCreatedEventTransport) (*domain.Event, error) { + if len(transport.Event) == 0 { + return nil, nil + } + + event, err := domain.NewEventFromRaw(transport.Event) + if err != nil { + return nil, errors.Wrap(err, "error creating a domain event") + } + + return &event, nil +} diff --git a/service/ports/sqlitepubsub/tweet_created_test.go b/service/ports/sqlitepubsub/tweet_created_test.go new file mode 100644 index 0000000..009b365 --- /dev/null +++ b/service/ports/sqlitepubsub/tweet_created_test.go @@ -0,0 +1,133 @@ +package sqlitepubsub + +import ( + "context" + "encoding/base64" + "fmt" + "sync" + "testing" + "time" + + "github.com/planetary-social/nos-crossposting-service/internal" + "github.com/planetary-social/nos-crossposting-service/internal/fixtures" + "github.com/planetary-social/nos-crossposting-service/service/adapters/sqlite" + "github.com/planetary-social/nos-crossposting-service/service/app" + "github.com/planetary-social/nos-crossposting-service/service/domain" + "github.com/planetary-social/nos-crossposting-service/service/domain/accounts" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestTweetCreatedEventSubscriber_CanHandleOldAndNewEvents(t *testing.T) { + event := fixtures.SomeEvent() + + testCases := []struct { + Name string + Payload string + ExpectedCommand app.SendTweet + }{ + { + Name: "old", + Payload: `{"accountID": "someAccountID", "tweet": {"text": "someTweetText"}}`, + ExpectedCommand: app.NewSendTweet( + accounts.MustNewAccountID("someAccountID"), + domain.NewTweet("someTweetText"), + nil, + ), + }, + { + Name: "new", + Payload: fmt.Sprintf( + `{"accountID": "someAccountID", "tweet": {"text": "someTweetText"}, "event": "%s", "createdAt": "%s"}`, + base64.StdEncoding.EncodeToString(event.Raw()), + time.Now().Format(time.RFC3339), + ), + ExpectedCommand: app.NewSendTweet( + accounts.MustNewAccountID("someAccountID"), + domain.NewTweet("someTweetText"), + &event, + ), + }, + } + + for _, testCase := range testCases { + t.Run(testCase.Name, func(t *testing.T) { + ctx := fixtures.TestContext(t) + h := newSendTweetHandlerMock() + s := newSqliteSubscriberMock() + logger := fixtures.TestLogger(t) + subscriber := NewTweetCreatedEventSubscriber(h, s, logger) + + go func() { + _ = subscriber.Run(ctx) + }() + + message, err := sqlite.NewMessage(fixtures.SomeString(), []byte(testCase.Payload)) + require.NoError(t, err) + + receivedMessage := sqlite.NewReceivedMessage(message) + + err = s.PublishTweetCreated(ctx, receivedMessage) + require.NoError(t, err) + + require.EventuallyWithT(t, func(t *assert.CollectT) { + calls := h.Calls() + if assert.Len(t, calls, 1) { + call := calls[0] + assert.Equal(t, call.AccountID(), testCase.ExpectedCommand.AccountID()) + assert.Equal(t, call.Tweet(), testCase.ExpectedCommand.Tweet()) + if testCase.ExpectedCommand.Event() == nil { + require.Nil(t, call.Event()) + } else { + assert.Equal(t, call.Event().Raw(), testCase.ExpectedCommand.Event().Raw()) + } + } + }, 1*time.Second, 100*time.Millisecond) + }) + } +} + +type sendTweetHandlerMock struct { + calls []app.SendTweet + callsLock sync.Mutex +} + +func newSendTweetHandlerMock() *sendTweetHandlerMock { + return &sendTweetHandlerMock{} +} + +func (s *sendTweetHandlerMock) Handle(ctx context.Context, cmd app.SendTweet) (err error) { + s.callsLock.Lock() + defer s.callsLock.Unlock() + s.calls = append(s.calls, cmd) + return nil +} + +func (s *sendTweetHandlerMock) Calls() []app.SendTweet { + s.callsLock.Lock() + defer s.callsLock.Unlock() + return internal.CopySlice(s.calls) +} + +type sqliteSubscriberMock struct { + ch chan *sqlite.ReceivedMessage +} + +func newSqliteSubscriberMock() *sqliteSubscriberMock { + return &sqliteSubscriberMock{ + ch: make(chan *sqlite.ReceivedMessage), + } +} + +func (s *sqliteSubscriberMock) SubscribeToTweetCreated(ctx context.Context) <-chan *sqlite.ReceivedMessage { + return s.ch +} + +func (s *sqliteSubscriberMock) PublishTweetCreated(ctx context.Context, message *sqlite.ReceivedMessage) error { + select { + case s.ch <- message: + return nil + case <-ctx.Done(): + return ctx.Err() + } +}