From dc221efcc1a15df6f1add23445a62ad8afc67be4 Mon Sep 17 00:00:00 2001 From: boreq Date: Fri, 17 Nov 2023 08:54:14 +0100 Subject: [PATCH] Drop old events after some time The matter of the fact is that some events are stuck in the queue and we just can't post tweets for them. As discussed earlier this may be because someone revoked permissions for our app or somehow ran out of their personal API limit - the exact reason is unclear and doesn't really matter. This commit removes events which are stuck in the queue after some time. The reasoning for this is twofold. Firstly the likelihood of actually successfuly posting a tweet which is stuck is very low. As far as I can tell they are just stuck. Secondly posting tweets for notes which are old can be confusing and I'd personally be surprised if I suddently had tweets for notes that are weeks old posted in my profile. This rollout has to happen in two stages. First we start emitting tweet created events containing new data (nostr events). After a week passes we drop all tweet created events that don't have nostr events in them and change the code to no longer support tweet created events with missing nostr events. --- .../di/inject_adapters.go | 40 ++++++ cmd/crossposting-service/di/inject_pubsub.go | 1 + cmd/crossposting-service/di/wire.go | 22 +++ cmd/crossposting-service/di/wire_gen.go | 62 +++++++- internal/fixtures/fixtures.go | 47 +++++++ service/adapters/current_time_provider.go | 14 ++ service/adapters/mocks/account_repository.go | 29 ++++ .../adapters/mocks/current_time_provider.go | 19 +++ .../mocks/processed_event_repository.go | 22 +++ .../adapters/mocks/public_key_repository.go | 38 +++++ service/adapters/mocks/publisher.go | 16 +++ service/adapters/mocks/session_repository.go | 25 ++++ .../adapters/mocks/transaction_provider.go | 19 +++ service/adapters/mocks/twitter.go | 37 +++++ .../adapters/mocks/user_tokens_repository.go | 32 +++++ service/adapters/sqlite/publisher.go | 14 +- service/adapters/sqlite/publisher_test.go | 11 +- service/adapters/sqlite/subscriber_test.go | 7 +- service/app/app.go | 58 +++++++- service/app/handler_process_received_event.go | 4 +- service/app/handler_send_tweet.go | 48 ++++++- service/app/handler_send_tweet_test.go | 92 ++++++++++++ service/ports/sqlitepubsub/tweet_created.go | 32 ++++- .../ports/sqlitepubsub/tweet_created_test.go | 133 ++++++++++++++++++ 24 files changed, 800 insertions(+), 22 deletions(-) create mode 100644 service/adapters/current_time_provider.go create mode 100644 service/adapters/mocks/account_repository.go create mode 100644 service/adapters/mocks/current_time_provider.go create mode 100644 service/adapters/mocks/processed_event_repository.go create mode 100644 service/adapters/mocks/public_key_repository.go create mode 100644 service/adapters/mocks/publisher.go create mode 100644 service/adapters/mocks/session_repository.go create mode 100644 service/adapters/mocks/transaction_provider.go create mode 100644 service/adapters/mocks/twitter.go create mode 100644 service/adapters/mocks/user_tokens_repository.go create mode 100644 service/app/handler_send_tweet_test.go create mode 100644 service/ports/sqlitepubsub/tweet_created_test.go diff --git a/cmd/crossposting-service/di/inject_adapters.go b/cmd/crossposting-service/di/inject_adapters.go index 106650b..09e5ae7 100644 --- a/cmd/crossposting-service/di/inject_adapters.go +++ b/cmd/crossposting-service/di/inject_adapters.go @@ -7,6 +7,7 @@ import ( "github.com/google/wire" "github.com/planetary-social/nos-crossposting-service/internal/logging" "github.com/planetary-social/nos-crossposting-service/service/adapters" + "github.com/planetary-social/nos-crossposting-service/service/adapters/mocks" "github.com/planetary-social/nos-crossposting-service/service/adapters/prometheus" "github.com/planetary-social/nos-crossposting-service/service/adapters/sqlite" "github.com/planetary-social/nos-crossposting-service/service/adapters/twitter" @@ -73,6 +74,45 @@ var adaptersSet = wire.NewSet( adapters.NewTwitterAccountDetailsCache, wire.Bind(new(app.TwitterAccountDetailsCache), new(*adapters.TwitterAccountDetailsCache)), + + adapters.NewCurrentTimeProvider, + wire.Bind(new(app.CurrentTimeProvider), new(*adapters.CurrentTimeProvider)), +) + +var testAdaptersSet = wire.NewSet( + prometheus.NewPrometheus, + wire.Bind(new(app.Metrics), new(*prometheus.Prometheus)), + + mocks.NewTwitter, + wire.Bind(new(app.Twitter), new(*mocks.Twitter)), + + mocks.NewCurrentTimeProvider, + wire.Bind(new(app.CurrentTimeProvider), new(*mocks.CurrentTimeProvider)), +) + +var mockTxAdaptersSet = wire.NewSet( + mocks.NewTransactionProvider, + wire.Bind(new(app.TransactionProvider), new(*mocks.TransactionProvider)), + + wire.Struct(new(app.Adapters), "*"), + + mocks.NewAccountRepository, + wire.Bind(new(app.AccountRepository), new(*mocks.AccountRepository)), + + mocks.NewSessionRepository, + wire.Bind(new(app.SessionRepository), new(*mocks.SessionRepository)), + + mocks.NewPublicKeyRepository, + wire.Bind(new(app.PublicKeyRepository), new(*mocks.PublicKeyRepository)), + + mocks.NewProcessedEventRepository, + wire.Bind(new(app.ProcessedEventRepository), new(*mocks.ProcessedEventRepository)), + + mocks.NewUserTokensRepository, + wire.Bind(new(app.UserTokensRepository), new(*mocks.UserTokensRepository)), + + mocks.NewPublisher, + wire.Bind(new(app.Publisher), new(*mocks.Publisher)), ) func newAdaptersFactoryFn(deps buildTransactionSqliteAdaptersDependencies) sqlite.AdaptersFactoryFn { diff --git a/cmd/crossposting-service/di/inject_pubsub.go b/cmd/crossposting-service/di/inject_pubsub.go index 0b79e20..191cb74 100644 --- a/cmd/crossposting-service/di/inject_pubsub.go +++ b/cmd/crossposting-service/di/inject_pubsub.go @@ -20,6 +20,7 @@ var sqlitePubsubSet = wire.NewSet( sqlite.NewSubscriber, wire.Bind(new(app.Subscriber), new(*sqlite.Subscriber)), + wire.Bind(new(sqlitepubsubport.SqliteSubscriber), new(*sqlite.Subscriber)), ) var sqliteTxPubsubSet = wire.NewSet( diff --git a/cmd/crossposting-service/di/wire.go b/cmd/crossposting-service/di/wire.go index 1d298ee..92ef713 100644 --- a/cmd/crossposting-service/di/wire.go +++ b/cmd/crossposting-service/di/wire.go @@ -11,6 +11,7 @@ import ( "github.com/google/wire" "github.com/planetary-social/nos-crossposting-service/internal/fixtures" "github.com/planetary-social/nos-crossposting-service/internal/logging" + "github.com/planetary-social/nos-crossposting-service/service/adapters/mocks" "github.com/planetary-social/nos-crossposting-service/service/adapters/sqlite" "github.com/planetary-social/nos-crossposting-service/service/app" "github.com/planetary-social/nos-crossposting-service/service/config" @@ -49,6 +50,27 @@ func BuildTestAdapters(context.Context, testing.TB) (sqlite.TestedItems, func(), return sqlite.TestedItems{}, nil, nil } +type TestApplication struct { + SendTweetHandler *app.SendTweetHandler + + CurrentTimeProvider *mocks.CurrentTimeProvider + UserTokensRepository *mocks.UserTokensRepository + Twitter *mocks.Twitter +} + +func BuildTestApplication(tb testing.TB) (TestApplication, error) { + wire.Build( + wire.Struct(new(TestApplication), "*"), + + applicationSet, + testAdaptersSet, + mockTxAdaptersSet, + + fixtures.TestLogger, + ) + return TestApplication{}, nil +} + func newTestAdaptersConfig(tb testing.TB) (config.Config, error) { return config.NewConfig( fixtures.SomeString(), diff --git a/cmd/crossposting-service/di/wire_gen.go b/cmd/crossposting-service/di/wire_gen.go index 1cfd665..901149f 100644 --- a/cmd/crossposting-service/di/wire_gen.go +++ b/cmd/crossposting-service/di/wire_gen.go @@ -17,6 +17,7 @@ import ( "github.com/planetary-social/nos-crossposting-service/migrations" "github.com/planetary-social/nos-crossposting-service/service/adapters" "github.com/planetary-social/nos-crossposting-service/service/adapters/memorypubsub" + "github.com/planetary-social/nos-crossposting-service/service/adapters/mocks" "github.com/planetary-social/nos-crossposting-service/service/adapters/prometheus" "github.com/planetary-social/nos-crossposting-service/service/adapters/sqlite" "github.com/planetary-social/nos-crossposting-service/service/adapters/twitter" @@ -97,8 +98,9 @@ func BuildService(contextContext context.Context, configConfig config.Config) (S tweetGenerator := domain.NewTweetGenerator(transformer) processReceivedEventHandler := app.NewProcessReceivedEventHandler(genericTransactionProvider, tweetGenerator, logger, prometheusPrometheus) receivedEventSubscriber := memorypubsub2.NewReceivedEventSubscriber(receivedEventPubSub, processReceivedEventHandler, logger) - sendTweetHandler := app.NewSendTweetHandler(genericTransactionProvider, appTwitter, logger, prometheusPrometheus) - tweetCreatedEventSubscriber := sqlitepubsub.NewTweetCreatedEventSubscriber(sendTweetHandler, subscriber, logger, prometheusPrometheus) + currentTimeProvider := adapters.NewCurrentTimeProvider() + sendTweetHandler := app.NewSendTweetHandler(genericTransactionProvider, appTwitter, currentTimeProvider, logger, prometheusPrometheus) + tweetCreatedEventSubscriber := sqlitepubsub.NewTweetCreatedEventSubscriber(sendTweetHandler, subscriber, logger) metrics := timer.NewMetrics(application, logger) migrationsStorage, err := sqlite.NewMigrationsStorage(db) if err != nil { @@ -166,6 +168,54 @@ func BuildTestAdapters(contextContext context.Context, tb testing.TB) (sqlite.Te }, nil } +func BuildTestApplication(tb testing.TB) (TestApplication, error) { + accountRepository, err := mocks.NewAccountRepository() + if err != nil { + return TestApplication{}, err + } + sessionRepository, err := mocks.NewSessionRepository() + if err != nil { + return TestApplication{}, err + } + publicKeyRepository, err := mocks.NewPublicKeyRepository() + if err != nil { + return TestApplication{}, err + } + processedEventRepository, err := mocks.NewProcessedEventRepository() + if err != nil { + return TestApplication{}, err + } + userTokensRepository, err := mocks.NewUserTokensRepository() + if err != nil { + return TestApplication{}, err + } + publisher := mocks.NewPublisher() + appAdapters := app.Adapters{ + Accounts: accountRepository, + Sessions: sessionRepository, + PublicKeys: publicKeyRepository, + ProcessedEvents: processedEventRepository, + UserTokens: userTokensRepository, + Publisher: publisher, + } + transactionProvider := mocks.NewTransactionProvider(appAdapters) + mocksTwitter := mocks.NewTwitter() + currentTimeProvider := mocks.NewCurrentTimeProvider() + logger := fixtures.TestLogger(tb) + prometheusPrometheus, err := prometheus.NewPrometheus(logger) + if err != nil { + return TestApplication{}, err + } + sendTweetHandler := app.NewSendTweetHandler(transactionProvider, mocksTwitter, currentTimeProvider, logger, prometheusPrometheus) + testApplication := TestApplication{ + SendTweetHandler: sendTweetHandler, + CurrentTimeProvider: currentTimeProvider, + UserTokensRepository: userTokensRepository, + Twitter: mocksTwitter, + } + return testApplication, nil +} + func buildTransactionSqliteAdapters(db *sql.DB, tx *sql.Tx, diBuildTransactionSqliteAdaptersDependencies buildTransactionSqliteAdaptersDependencies) (app.Adapters, error) { accountRepository, err := sqlite.NewAccountRepository(tx) if err != nil { @@ -238,6 +288,14 @@ func buildTestTransactionSqliteAdapters(db *sql.DB, tx *sql.Tx, diBuildTransacti // wire.go: +type TestApplication struct { + SendTweetHandler *app.SendTweetHandler + + CurrentTimeProvider *mocks.CurrentTimeProvider + UserTokensRepository *mocks.UserTokensRepository + Twitter *mocks.Twitter +} + func newTestAdaptersConfig(tb testing.TB) (config.Config, error) { return config.NewConfig(fixtures.SomeString(), fixtures.SomeString(), config.EnvironmentDevelopment, logging.LevelDebug, fixtures.SomeString(), fixtures.SomeString(), fixtures.SomeFile(tb), fixtures.SomeString()) } diff --git a/internal/fixtures/fixtures.go b/internal/fixtures/fixtures.go index e670c6a..2fccfd2 100644 --- a/internal/fixtures/fixtures.go +++ b/internal/fixtures/fixtures.go @@ -8,9 +8,11 @@ import ( "math/rand" "os" "testing" + "time" "github.com/nbd-wtf/go-nostr" "github.com/planetary-social/nos-crossposting-service/internal" + "github.com/planetary-social/nos-crossposting-service/internal/logging" "github.com/planetary-social/nos-crossposting-service/service/domain" "github.com/planetary-social/nos-crossposting-service/service/domain/accounts" "github.com/planetary-social/nos-crossposting-service/service/domain/sessions" @@ -130,6 +132,51 @@ func SomeError() error { return fmt.Errorf("some error: %d", rand.Int()) } +func SomeTwitterUserAccessToken() accounts.TwitterUserAccessToken { + v, err := accounts.NewTwitterUserAccessToken(SomeString()) + if err != nil { + panic(err) + } + return v +} + +func SomeTwitterUserAccessSecret() accounts.TwitterUserAccessSecret { + v, err := accounts.NewTwitterUserAccessSecret(SomeString()) + if err != nil { + panic(err) + } + return v +} + +func SomeEventWithCreatedAt(createdAt time.Time) domain.Event { + _, sk := SomeKeyPair() + + libevent := nostr.Event{ + CreatedAt: nostr.Timestamp(createdAt.Unix()), + Kind: domain.EventKindNote.Int(), + Content: SomeString(), + } + err := libevent.Sign(sk) + if err != nil { + panic(err) + } + + event, err := domain.NewEvent(libevent) + if err != nil { + panic(err) + } + + return event +} + +func SomeEvent() domain.Event { + return SomeEventWithCreatedAt(time.Now()) +} + +func TestLogger(tb testing.TB) logging.Logger { + return logging.NewSystemLogger(logging.NewTestingLoggingSystem(tb), "test") +} + var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") func randSeq(n int) string { diff --git a/service/adapters/current_time_provider.go b/service/adapters/current_time_provider.go new file mode 100644 index 0000000..e7d3a6a --- /dev/null +++ b/service/adapters/current_time_provider.go @@ -0,0 +1,14 @@ +package adapters + +import "time" + +type CurrentTimeProvider struct { +} + +func NewCurrentTimeProvider() *CurrentTimeProvider { + return &CurrentTimeProvider{} +} + +func (c CurrentTimeProvider) GetCurrentTime() time.Time { + return time.Now() +} diff --git a/service/adapters/mocks/account_repository.go b/service/adapters/mocks/account_repository.go new file mode 100644 index 0000000..d7a9877 --- /dev/null +++ b/service/adapters/mocks/account_repository.go @@ -0,0 +1,29 @@ +package mocks + +import ( + "github.com/boreq/errors" + "github.com/planetary-social/nos-crossposting-service/service/domain/accounts" +) + +type AccountRepository struct { +} + +func NewAccountRepository() (*AccountRepository, error) { + return &AccountRepository{}, nil +} + +func (m *AccountRepository) GetByTwitterID(twitterID accounts.TwitterID) (*accounts.Account, error) { + return nil, errors.New("not implemented") +} + +func (m *AccountRepository) GetByAccountID(accountID accounts.AccountID) (*accounts.Account, error) { + return nil, errors.New("not implemented") +} + +func (m *AccountRepository) Save(account *accounts.Account) error { + return errors.New("not implemented") +} + +func (m *AccountRepository) Count() (int, error) { + return 0, errors.New("not implemented") +} diff --git a/service/adapters/mocks/current_time_provider.go b/service/adapters/mocks/current_time_provider.go new file mode 100644 index 0000000..1843313 --- /dev/null +++ b/service/adapters/mocks/current_time_provider.go @@ -0,0 +1,19 @@ +package mocks + +import "time" + +type CurrentTimeProvider struct { + CurrentTime time.Time +} + +func NewCurrentTimeProvider() *CurrentTimeProvider { + return &CurrentTimeProvider{} +} + +func (c *CurrentTimeProvider) GetCurrentTime() time.Time { + return c.CurrentTime +} + +func (c *CurrentTimeProvider) SetCurrentTime(currentTime time.Time) { + c.CurrentTime = currentTime +} diff --git a/service/adapters/mocks/processed_event_repository.go b/service/adapters/mocks/processed_event_repository.go new file mode 100644 index 0000000..b25c3fc --- /dev/null +++ b/service/adapters/mocks/processed_event_repository.go @@ -0,0 +1,22 @@ +package mocks + +import ( + "github.com/boreq/errors" + "github.com/planetary-social/nos-crossposting-service/service/domain" + "github.com/planetary-social/nos-crossposting-service/service/domain/accounts" +) + +type ProcessedEventRepository struct { +} + +func NewProcessedEventRepository() (*ProcessedEventRepository, error) { + return &ProcessedEventRepository{}, nil +} + +func (m *ProcessedEventRepository) Save(eventID domain.EventId, twitterID accounts.TwitterID) error { + return errors.New("not implemented") +} + +func (m *ProcessedEventRepository) WasProcessed(eventID domain.EventId, twitterID accounts.TwitterID) (bool, error) { + return false, errors.New("not implemented") +} diff --git a/service/adapters/mocks/public_key_repository.go b/service/adapters/mocks/public_key_repository.go new file mode 100644 index 0000000..2862d94 --- /dev/null +++ b/service/adapters/mocks/public_key_repository.go @@ -0,0 +1,38 @@ +package mocks + +import ( + "github.com/boreq/errors" + "github.com/planetary-social/nos-crossposting-service/service/domain" + "github.com/planetary-social/nos-crossposting-service/service/domain/accounts" +) + +type PublicKeyRepository struct { +} + +func NewPublicKeyRepository() (*PublicKeyRepository, error) { + return &PublicKeyRepository{}, nil +} + +func (m *PublicKeyRepository) Save(linkedPublicKey *domain.LinkedPublicKey) error { + return errors.New("not implemented") +} + +func (m *PublicKeyRepository) Delete(accountID accounts.AccountID, publicKey domain.PublicKey) error { + return errors.New("not implemented") +} + +func (m *PublicKeyRepository) List() ([]*domain.LinkedPublicKey, error) { + return nil, errors.New("not implemented") +} + +func (m *PublicKeyRepository) ListByPublicKey(publicKey domain.PublicKey) ([]*domain.LinkedPublicKey, error) { + return nil, errors.New("not implemented") +} + +func (m *PublicKeyRepository) ListByAccountID(accountID accounts.AccountID) ([]*domain.LinkedPublicKey, error) { + return nil, errors.New("not implemented") +} + +func (m *PublicKeyRepository) Count() (int, error) { + return 0, errors.New("not implemented") +} diff --git a/service/adapters/mocks/publisher.go b/service/adapters/mocks/publisher.go new file mode 100644 index 0000000..a57ad4c --- /dev/null +++ b/service/adapters/mocks/publisher.go @@ -0,0 +1,16 @@ +package mocks + +import ( + "github.com/planetary-social/nos-crossposting-service/service/app" +) + +type Publisher struct { +} + +func NewPublisher() *Publisher { + return &Publisher{} +} + +func (p *Publisher) PublishTweetCreated(event app.TweetCreatedEvent) error { + return nil +} diff --git a/service/adapters/mocks/session_repository.go b/service/adapters/mocks/session_repository.go new file mode 100644 index 0000000..f759a2b --- /dev/null +++ b/service/adapters/mocks/session_repository.go @@ -0,0 +1,25 @@ +package mocks + +import ( + "github.com/boreq/errors" + "github.com/planetary-social/nos-crossposting-service/service/domain/sessions" +) + +type SessionRepository struct { +} + +func NewSessionRepository() (*SessionRepository, error) { + return &SessionRepository{}, nil +} + +func (m *SessionRepository) Get(id sessions.SessionID) (*sessions.Session, error) { + return nil, errors.New("not implemented") +} + +func (m *SessionRepository) Save(session *sessions.Session) error { + return errors.New("not implemented") +} + +func (m *SessionRepository) Delete(id sessions.SessionID) error { + return errors.New("not implemented") +} diff --git a/service/adapters/mocks/transaction_provider.go b/service/adapters/mocks/transaction_provider.go new file mode 100644 index 0000000..e56d8da --- /dev/null +++ b/service/adapters/mocks/transaction_provider.go @@ -0,0 +1,19 @@ +package mocks + +import ( + "context" + + "github.com/planetary-social/nos-crossposting-service/service/app" +) + +type TransactionProvider struct { + adapters app.Adapters +} + +func NewTransactionProvider(adapters app.Adapters) *TransactionProvider { + return &TransactionProvider{adapters: adapters} +} + +func (t *TransactionProvider) Transact(ctx context.Context, f func(context.Context, app.Adapters) error) error { + return f(ctx, t.adapters) +} diff --git a/service/adapters/mocks/twitter.go b/service/adapters/mocks/twitter.go new file mode 100644 index 0000000..609175d --- /dev/null +++ b/service/adapters/mocks/twitter.go @@ -0,0 +1,37 @@ +package mocks + +import ( + "context" + + "github.com/boreq/errors" + "github.com/planetary-social/nos-crossposting-service/service/app" + "github.com/planetary-social/nos-crossposting-service/service/domain" + "github.com/planetary-social/nos-crossposting-service/service/domain/accounts" +) + +type Twitter struct { + PostTweetCalls []PostTweetCall +} + +func NewTwitter() *Twitter { + return &Twitter{} +} + +func (t *Twitter) PostTweet(ctx context.Context, userAccessToken accounts.TwitterUserAccessToken, userAccessSecret accounts.TwitterUserAccessSecret, tweet domain.Tweet) error { + t.PostTweetCalls = append(t.PostTweetCalls, PostTweetCall{ + UserAccessToken: userAccessToken, + UserAccessSecret: userAccessSecret, + Tweet: tweet, + }) + return nil +} + +func (t *Twitter) GetAccountDetails(ctx context.Context, userAccessToken accounts.TwitterUserAccessToken, userAccessSecret accounts.TwitterUserAccessSecret) (app.TwitterAccountDetails, error) { + return app.TwitterAccountDetails{}, errors.New("not implemented") +} + +type PostTweetCall struct { + UserAccessToken accounts.TwitterUserAccessToken + UserAccessSecret accounts.TwitterUserAccessSecret + Tweet domain.Tweet +} diff --git a/service/adapters/mocks/user_tokens_repository.go b/service/adapters/mocks/user_tokens_repository.go new file mode 100644 index 0000000..7180b42 --- /dev/null +++ b/service/adapters/mocks/user_tokens_repository.go @@ -0,0 +1,32 @@ +package mocks + +import ( + "github.com/boreq/errors" + "github.com/planetary-social/nos-crossposting-service/service/domain/accounts" +) + +type UserTokensRepository struct { + mockedUserTokens map[accounts.AccountID]*accounts.TwitterUserTokens +} + +func NewUserTokensRepository() (*UserTokensRepository, error) { + return &UserTokensRepository{ + mockedUserTokens: make(map[accounts.AccountID]*accounts.TwitterUserTokens), + }, nil +} + +func (m *UserTokensRepository) Save(userTokens *accounts.TwitterUserTokens) error { + return errors.New("not implemented") +} + +func (m *UserTokensRepository) Get(id accounts.AccountID) (*accounts.TwitterUserTokens, error) { + v, ok := m.mockedUserTokens[id] + if !ok { + return nil, errors.New("user tokens not mocked") + } + return v, nil +} + +func (m *UserTokensRepository) MockUserTokens(tokens *accounts.TwitterUserTokens) { + m.mockedUserTokens[tokens.AccountID()] = tokens +} diff --git a/service/adapters/sqlite/publisher.go b/service/adapters/sqlite/publisher.go index 548d2be..41e380e 100644 --- a/service/adapters/sqlite/publisher.go +++ b/service/adapters/sqlite/publisher.go @@ -3,11 +3,11 @@ package sqlite import ( "database/sql" "encoding/json" + "time" "github.com/boreq/errors" "github.com/oklog/ulid/v2" - "github.com/planetary-social/nos-crossposting-service/service/domain" - "github.com/planetary-social/nos-crossposting-service/service/domain/accounts" + "github.com/planetary-social/nos-crossposting-service/service/app" ) const TweetCreatedTopic = "tweet_created" @@ -21,12 +21,14 @@ func NewPublisher(pubsub *PubSub, tx *sql.Tx) *Publisher { return &Publisher{pubsub: pubsub, tx: tx} } -func (p *Publisher) PublishTweetCreated(accountID accounts.AccountID, tweet domain.Tweet) error { +func (p *Publisher) PublishTweetCreated(event app.TweetCreatedEvent) error { transport := TweetCreatedEventTransport{ - AccountID: accountID.String(), + AccountID: event.AccountID().String(), Tweet: TweetTransport{ - Text: tweet.Text(), + Text: event.Tweet().Text(), }, + Event: event.Event().Raw(), + CreatedAt: event.CreatedAt(), } payload, err := json.Marshal(transport) @@ -45,6 +47,8 @@ func (p *Publisher) PublishTweetCreated(accountID accounts.AccountID, tweet doma type TweetCreatedEventTransport struct { AccountID string `json:"accountID"` Tweet TweetTransport `json:"tweet"` + Event []byte `json:"event"` + CreatedAt *time.Time `json:"createdAt"` } type TweetTransport struct { diff --git a/service/adapters/sqlite/publisher_test.go b/service/adapters/sqlite/publisher_test.go index 3728a8a..db16a46 100644 --- a/service/adapters/sqlite/publisher_test.go +++ b/service/adapters/sqlite/publisher_test.go @@ -3,9 +3,11 @@ package sqlite_test import ( "context" "testing" + "time" "github.com/planetary-social/nos-crossposting-service/internal/fixtures" "github.com/planetary-social/nos-crossposting-service/service/adapters/sqlite" + "github.com/planetary-social/nos-crossposting-service/service/app" "github.com/planetary-social/nos-crossposting-service/service/domain" "github.com/planetary-social/nos-crossposting-service/service/domain/accounts" "github.com/stretchr/testify/require" @@ -16,13 +18,16 @@ func TestPublisher_ItIsPossibleToPublishEvents(t *testing.T) { adapters := NewTestAdapters(ctx, t) accountID := fixtures.SomeAccountID() + tweet := domain.NewTweet("some tweet") twitterID := fixtures.SomeTwitterID() + createdAt := time.Now() + event := fixtures.SomeEvent() + + tweetCreatedEvent := app.NewTweetCreatedEvent(accountID, tweet, createdAt, event) account, err := accounts.NewAccount(accountID, twitterID) require.NoError(t, err) - tweet := domain.NewTweet("some tweet") - err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { err = adapters.AccountRepository.Save(account) require.NoError(t, err) @@ -32,7 +37,7 @@ func TestPublisher_ItIsPossibleToPublishEvents(t *testing.T) { require.NoError(t, err) err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { - err = adapters.Publisher.PublishTweetCreated(accountID, tweet) + err = adapters.Publisher.PublishTweetCreated(tweetCreatedEvent) require.NoError(t, err) return nil diff --git a/service/adapters/sqlite/subscriber_test.go b/service/adapters/sqlite/subscriber_test.go index 45cbda8..fc151ae 100644 --- a/service/adapters/sqlite/subscriber_test.go +++ b/service/adapters/sqlite/subscriber_test.go @@ -3,9 +3,11 @@ package sqlite_test import ( "context" "testing" + "time" "github.com/planetary-social/nos-crossposting-service/internal/fixtures" "github.com/planetary-social/nos-crossposting-service/service/adapters/sqlite" + "github.com/planetary-social/nos-crossposting-service/service/app" "github.com/planetary-social/nos-crossposting-service/service/domain" "github.com/stretchr/testify/require" ) @@ -21,7 +23,10 @@ func TestSubscriber_TweetCreatedAnalysis(t *testing.T) { accountID := fixtures.SomeAccountID() for j := 0; j <= i; j++ { - err := adapters.Publisher.PublishTweetCreated(accountID, domain.NewTweet(fixtures.SomeString())) + tweet := domain.NewTweet(fixtures.SomeString()) + event := app.NewTweetCreatedEvent(accountID, tweet, time.Now(), fixtures.SomeEvent()) + + err := adapters.Publisher.PublishTweetCreated(event) require.NoError(t, err) } } diff --git a/service/app/app.go b/service/app/app.go index 90e50bd..2482261 100644 --- a/service/app/app.go +++ b/service/app/app.go @@ -2,6 +2,7 @@ package app import ( "context" + "time" "github.com/boreq/errors" "github.com/planetary-social/nos-crossposting-service/service/domain" @@ -65,7 +66,7 @@ type UserTokensRepository interface { } type Publisher interface { - PublishTweetCreated(accountID accounts.AccountID, tweet domain.Tweet) error + PublishTweetCreated(event TweetCreatedEvent) error } type TweetGenerator interface { @@ -209,3 +210,58 @@ func (t TwitterAccountDetails) Username() string { func (t TwitterAccountDetails) ProfileImageURL() string { return t.profileImageURL } + +type TweetCreatedEvent struct { + accountID accounts.AccountID + tweet domain.Tweet + createdAt *time.Time + event *domain.Event +} + +func NewTweetCreatedEvent( + accountID accounts.AccountID, + tweet domain.Tweet, + createdAt time.Time, + event domain.Event, +) TweetCreatedEvent { + return TweetCreatedEvent{ + accountID: accountID, + tweet: tweet, + createdAt: &createdAt, + event: &event, + } +} + +func NewTweetCreatedEventFromHistory( + accountID accounts.AccountID, + tweet domain.Tweet, + createdAt *time.Time, + event *domain.Event, +) TweetCreatedEvent { + return TweetCreatedEvent{ + accountID: accountID, + tweet: tweet, + createdAt: createdAt, + event: event, + } +} + +func (t TweetCreatedEvent) AccountID() accounts.AccountID { + return t.accountID +} + +func (t TweetCreatedEvent) Tweet() domain.Tweet { + return t.tweet +} + +func (t TweetCreatedEvent) CreatedAt() *time.Time { + return t.createdAt +} + +func (t TweetCreatedEvent) Event() *domain.Event { + return t.event +} + +type CurrentTimeProvider interface { + GetCurrentTime() time.Time +} diff --git a/service/app/handler_process_received_event.go b/service/app/handler_process_received_event.go index 6d5d84b..486a514 100644 --- a/service/app/handler_process_received_event.go +++ b/service/app/handler_process_received_event.go @@ -3,6 +3,7 @@ package app import ( "context" "fmt" + "time" "github.com/boreq/errors" "github.com/planetary-social/nos-crossposting-service/internal/logging" @@ -95,7 +96,8 @@ func (h *ProcessReceivedEventHandler) Handle(ctx context.Context, cmd ProcessRec } for _, tweet := range tweets { - if err := adapters.Publisher.PublishTweetCreated(account.AccountID(), tweet); err != nil { + tweetCreatedEvent := NewTweetCreatedEvent(account.AccountID(), tweet, time.Now(), event) + if err := adapters.Publisher.PublishTweetCreated(tweetCreatedEvent); err != nil { return errors.Wrap(err, "error publishing tweet created event") } } diff --git a/service/app/handler_send_tweet.go b/service/app/handler_send_tweet.go index 84304ae..88c5e99 100644 --- a/service/app/handler_send_tweet.go +++ b/service/app/handler_send_tweet.go @@ -2,6 +2,7 @@ package app import ( "context" + "time" "github.com/boreq/errors" "github.com/planetary-social/nos-crossposting-service/internal/logging" @@ -9,18 +10,44 @@ import ( "github.com/planetary-social/nos-crossposting-service/service/domain/accounts" ) +const ( + dropEventsIfNotPostedFor = 7 * 24 * time.Hour +) + +var ( + whenEventWasAddedToTweetCreatedEvent = time.Date(2023, time.November, 17, 0, 0, 0, 0, time.UTC) +) + type SendTweet struct { accountID accounts.AccountID tweet domain.Tweet + event *domain.Event } -func NewSendTweet(accountID accounts.AccountID, tweet domain.Tweet) SendTweet { - return SendTweet{accountID: accountID, tweet: tweet} +func NewSendTweet(accountID accounts.AccountID, tweet domain.Tweet, event *domain.Event) SendTweet { + return SendTweet{ + accountID: accountID, + tweet: tweet, + event: event, + } +} + +func (s SendTweet) AccountID() accounts.AccountID { + return s.accountID +} + +func (s SendTweet) Tweet() domain.Tweet { + return s.tweet +} + +func (s SendTweet) Event() *domain.Event { + return s.event } type SendTweetHandler struct { transactionProvider TransactionProvider twitter Twitter + currentTimeProvider CurrentTimeProvider logger logging.Logger metrics Metrics } @@ -28,12 +55,14 @@ type SendTweetHandler struct { func NewSendTweetHandler( transactionProvider TransactionProvider, twitter Twitter, + currentTimeProvider CurrentTimeProvider, logger logging.Logger, metrics Metrics, ) *SendTweetHandler { return &SendTweetHandler{ transactionProvider: transactionProvider, twitter: twitter, + currentTimeProvider: currentTimeProvider, logger: logger.New("sendTweetHandler"), metrics: metrics, } @@ -42,11 +71,24 @@ func NewSendTweetHandler( func (h *SendTweetHandler) Handle(ctx context.Context, cmd SendTweet) (err error) { defer h.metrics.StartApplicationCall("sendTweet").End(&err) - h.logger.Debug(). + h.logger. + Debug(). WithField("accountID", cmd.accountID). WithField("tweet", cmd.tweet.Text()). Message("attempting to post a tweet") + if cmd.event != nil { + dropEventIfPostedBefore := h.currentTimeProvider.GetCurrentTime().Add(-dropEventsIfNotPostedFor) + if cmd.event.CreatedAt().Before(dropEventIfPostedBefore) { + return nil + } + } else { + dropEventIfItIsNilAndCurrentTimeIsAfter := whenEventWasAddedToTweetCreatedEvent.Add(dropEventsIfNotPostedFor) + if h.currentTimeProvider.GetCurrentTime().After(dropEventIfItIsNilAndCurrentTimeIsAfter) { + return nil + } + } + var userTokens *accounts.TwitterUserTokens if err := h.transactionProvider.Transact(ctx, func(ctx context.Context, adapters Adapters) error { tmp, err := adapters.UserTokens.Get(cmd.accountID) diff --git a/service/app/handler_send_tweet_test.go b/service/app/handler_send_tweet_test.go new file mode 100644 index 0000000..30b7b69 --- /dev/null +++ b/service/app/handler_send_tweet_test.go @@ -0,0 +1,92 @@ +package app_test + +import ( + "testing" + "time" + + "github.com/planetary-social/nos-crossposting-service/cmd/crossposting-service/di" + "github.com/planetary-social/nos-crossposting-service/internal" + "github.com/planetary-social/nos-crossposting-service/internal/fixtures" + "github.com/planetary-social/nos-crossposting-service/service/app" + "github.com/planetary-social/nos-crossposting-service/service/domain" + "github.com/planetary-social/nos-crossposting-service/service/domain/accounts" + "github.com/stretchr/testify/require" +) + +func TestSendTweetHandler_CorrectlyDropsOldEvents(t *testing.T) { + testCases := []struct { + Name string + + CurrentTime time.Time + Event *domain.Event + + ShouldPostTweet bool + }{ + { + Name: "old_events_are_dropped_after_a_week_since_code_change_passes", + + CurrentTime: date(2023, time.November, 25), + Event: nil, + + ShouldPostTweet: false, + }, + { + Name: "old_events_are_not_dropped_before_a_week_since_code_change_passes", + + CurrentTime: date(2023, time.November, 23), + Event: nil, + + ShouldPostTweet: true, + }, + { + Name: "new_events_are_dropped_after_a_week_since_they_were_created", + + CurrentTime: date(2023, time.November, 28), + Event: internal.Pointer(fixtures.SomeEventWithCreatedAt(date(2023, time.November, 20))), + + ShouldPostTweet: false, + }, + { + Name: "new_events_are_not_dropped_before_a_week_since_they_were_created", + + CurrentTime: date(2023, time.November, 27), + Event: internal.Pointer(fixtures.SomeEventWithCreatedAt(date(2023, time.November, 20))), + + ShouldPostTweet: true, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.Name, func(t *testing.T) { + ts, err := di.BuildTestApplication(t) + require.NoError(t, err) + + ctx := fixtures.TestContext(t) + + accountId := fixtures.SomeAccountID() + tweet := domain.NewTweet(fixtures.SomeString()) + userTokens := accounts.NewTwitterUserTokens( + accountId, + fixtures.SomeTwitterUserAccessToken(), + fixtures.SomeTwitterUserAccessSecret(), + ) + ts.UserTokensRepository.MockUserTokens(userTokens) + ts.CurrentTimeProvider.SetCurrentTime(testCase.CurrentTime) + + cmd := app.NewSendTweet(accountId, tweet, testCase.Event) + + err = ts.SendTweetHandler.Handle(ctx, cmd) + require.NoError(t, err) + + if testCase.ShouldPostTweet { + require.Len(t, ts.Twitter.PostTweetCalls, 1) + } else { + require.Len(t, ts.Twitter.PostTweetCalls, 0) + } + }) + } +} + +func date(year int, month time.Month, day int) time.Time { + return time.Date(year, month, day, 0, 0, 0, 0, time.UTC) +} diff --git a/service/ports/sqlitepubsub/tweet_created.go b/service/ports/sqlitepubsub/tweet_created.go index 4aac523..0684f00 100644 --- a/service/ports/sqlitepubsub/tweet_created.go +++ b/service/ports/sqlitepubsub/tweet_created.go @@ -16,24 +16,25 @@ type SendTweetHandler interface { Handle(ctx context.Context, cmd app.SendTweet) (err error) } +type SqliteSubscriber interface { + SubscribeToTweetCreated(ctx context.Context) <-chan *sqlite.ReceivedMessage +} + type TweetCreatedEventSubscriber struct { handler SendTweetHandler - subscriber *sqlite.Subscriber + subscriber SqliteSubscriber logger logging.Logger - metrics app.Metrics } func NewTweetCreatedEventSubscriber( handler SendTweetHandler, - subscriber *sqlite.Subscriber, + subscriber SqliteSubscriber, logger logging.Logger, - metrics app.Metrics, ) *TweetCreatedEventSubscriber { return &TweetCreatedEventSubscriber{ handler: handler, subscriber: subscriber, logger: logger.New("tweetCreatedEventSubscriber"), - metrics: metrics, } } func (s *TweetCreatedEventSubscriber) Run(ctx context.Context) error { @@ -65,7 +66,13 @@ func (s *TweetCreatedEventSubscriber) handleMessage(ctx context.Context, msg *sq } tweet := domain.NewTweet(transport.Tweet.Text) - cmd := app.NewSendTweet(accountID, tweet) + + event, err := s.getEvent(transport) + if err != nil { + return errors.Wrap(err, "error getting the event from transport") + } + + cmd := app.NewSendTweet(accountID, tweet, event) if err := s.handler.Handle(ctx, cmd); err != nil { return errors.Wrap(err, "error calling the handler") @@ -73,3 +80,16 @@ func (s *TweetCreatedEventSubscriber) handleMessage(ctx context.Context, msg *sq return nil } + +func (s *TweetCreatedEventSubscriber) getEvent(transport sqlite.TweetCreatedEventTransport) (*domain.Event, error) { + if len(transport.Event) == 0 { + return nil, nil + } + + event, err := domain.NewEventFromRaw(transport.Event) + if err != nil { + return nil, errors.Wrap(err, "error creating a domain event") + } + + return &event, nil +} diff --git a/service/ports/sqlitepubsub/tweet_created_test.go b/service/ports/sqlitepubsub/tweet_created_test.go new file mode 100644 index 0000000..009b365 --- /dev/null +++ b/service/ports/sqlitepubsub/tweet_created_test.go @@ -0,0 +1,133 @@ +package sqlitepubsub + +import ( + "context" + "encoding/base64" + "fmt" + "sync" + "testing" + "time" + + "github.com/planetary-social/nos-crossposting-service/internal" + "github.com/planetary-social/nos-crossposting-service/internal/fixtures" + "github.com/planetary-social/nos-crossposting-service/service/adapters/sqlite" + "github.com/planetary-social/nos-crossposting-service/service/app" + "github.com/planetary-social/nos-crossposting-service/service/domain" + "github.com/planetary-social/nos-crossposting-service/service/domain/accounts" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestTweetCreatedEventSubscriber_CanHandleOldAndNewEvents(t *testing.T) { + event := fixtures.SomeEvent() + + testCases := []struct { + Name string + Payload string + ExpectedCommand app.SendTweet + }{ + { + Name: "old", + Payload: `{"accountID": "someAccountID", "tweet": {"text": "someTweetText"}}`, + ExpectedCommand: app.NewSendTweet( + accounts.MustNewAccountID("someAccountID"), + domain.NewTweet("someTweetText"), + nil, + ), + }, + { + Name: "new", + Payload: fmt.Sprintf( + `{"accountID": "someAccountID", "tweet": {"text": "someTweetText"}, "event": "%s", "createdAt": "%s"}`, + base64.StdEncoding.EncodeToString(event.Raw()), + time.Now().Format(time.RFC3339), + ), + ExpectedCommand: app.NewSendTweet( + accounts.MustNewAccountID("someAccountID"), + domain.NewTweet("someTweetText"), + &event, + ), + }, + } + + for _, testCase := range testCases { + t.Run(testCase.Name, func(t *testing.T) { + ctx := fixtures.TestContext(t) + h := newSendTweetHandlerMock() + s := newSqliteSubscriberMock() + logger := fixtures.TestLogger(t) + subscriber := NewTweetCreatedEventSubscriber(h, s, logger) + + go func() { + _ = subscriber.Run(ctx) + }() + + message, err := sqlite.NewMessage(fixtures.SomeString(), []byte(testCase.Payload)) + require.NoError(t, err) + + receivedMessage := sqlite.NewReceivedMessage(message) + + err = s.PublishTweetCreated(ctx, receivedMessage) + require.NoError(t, err) + + require.EventuallyWithT(t, func(t *assert.CollectT) { + calls := h.Calls() + if assert.Len(t, calls, 1) { + call := calls[0] + assert.Equal(t, call.AccountID(), testCase.ExpectedCommand.AccountID()) + assert.Equal(t, call.Tweet(), testCase.ExpectedCommand.Tweet()) + if testCase.ExpectedCommand.Event() == nil { + require.Nil(t, call.Event()) + } else { + assert.Equal(t, call.Event().Raw(), testCase.ExpectedCommand.Event().Raw()) + } + } + }, 1*time.Second, 100*time.Millisecond) + }) + } +} + +type sendTweetHandlerMock struct { + calls []app.SendTweet + callsLock sync.Mutex +} + +func newSendTweetHandlerMock() *sendTweetHandlerMock { + return &sendTweetHandlerMock{} +} + +func (s *sendTweetHandlerMock) Handle(ctx context.Context, cmd app.SendTweet) (err error) { + s.callsLock.Lock() + defer s.callsLock.Unlock() + s.calls = append(s.calls, cmd) + return nil +} + +func (s *sendTweetHandlerMock) Calls() []app.SendTweet { + s.callsLock.Lock() + defer s.callsLock.Unlock() + return internal.CopySlice(s.calls) +} + +type sqliteSubscriberMock struct { + ch chan *sqlite.ReceivedMessage +} + +func newSqliteSubscriberMock() *sqliteSubscriberMock { + return &sqliteSubscriberMock{ + ch: make(chan *sqlite.ReceivedMessage), + } +} + +func (s *sqliteSubscriberMock) SubscribeToTweetCreated(ctx context.Context) <-chan *sqlite.ReceivedMessage { + return s.ch +} + +func (s *sqliteSubscriberMock) PublishTweetCreated(ctx context.Context, message *sqlite.ReceivedMessage) error { + select { + case s.ch <- message: + return nil + case <-ctx.Done(): + return ctx.Err() + } +}