From f17615e45b1856b355d34eec799fb996d09b5341 Mon Sep 17 00:00:00 2001 From: boreq Date: Thu, 30 Nov 2023 18:03:56 +0100 Subject: [PATCH] Use relay.nos.social for relay lookups --- .../di/inject_adapters.go | 24 ++++- cmd/crossposting-service/di/wire_gen.go | 4 +- service/adapters/prometheus/prometheus.go | 5 +- service/adapters/purple_pages.go | 14 ++- service/adapters/purple_pages_cache.go | 91 ++++++++++++++++++ service/adapters/relay_source.go | 92 ++----------------- service/app/app.go | 2 +- 7 files changed, 139 insertions(+), 93 deletions(-) create mode 100644 service/adapters/purple_pages_cache.go diff --git a/cmd/crossposting-service/di/inject_adapters.go b/cmd/crossposting-service/di/inject_adapters.go index 09e5ae7..2fe1967 100644 --- a/cmd/crossposting-service/di/inject_adapters.go +++ b/cmd/crossposting-service/di/inject_adapters.go @@ -1,6 +1,7 @@ package di import ( + "context" "database/sql" "github.com/boreq/errors" @@ -13,6 +14,7 @@ import ( "github.com/planetary-social/nos-crossposting-service/service/adapters/twitter" "github.com/planetary-social/nos-crossposting-service/service/app" "github.com/planetary-social/nos-crossposting-service/service/config" + "github.com/planetary-social/nos-crossposting-service/service/domain" ) var sqliteAdaptersSet = wire.NewSet( @@ -62,7 +64,7 @@ var adaptersSet = wire.NewSet( wire.Bind(new(app.AccountIDGenerator), new(*adapters.IDGenerator)), adapters.NewRelaySource, - adapters.NewPurplePages, + newPurplePages, wire.Bind(new(app.RelaySource), new(*adapters.RelaySource)), adapters.NewRelayEventDownloader, @@ -115,6 +117,26 @@ var mockTxAdaptersSet = wire.NewSet( wire.Bind(new(app.Publisher), new(*mocks.Publisher)), ) +var purplePagesAddresses = []domain.RelayAddress{ + domain.MustNewRelayAddress("wss://purplepag.es"), + domain.MustNewRelayAddress("wss://relay.nos.social"), +} + +func newPurplePages(ctx context.Context, logger logging.Logger, metrics app.Metrics) ([]*adapters.CachedPurplePages, error) { + var result []*adapters.CachedPurplePages + + for _, address := range purplePagesAddresses { + v, err := adapters.NewPurplePages(ctx, address, logger, metrics) + if err != nil { + return nil, errors.Wrap(err, "error creating purple pages") + } + + result = append(result, adapters.NewCachedPurplePages(logger, v)) + } + + return result, nil +} + func newAdaptersFactoryFn(deps buildTransactionSqliteAdaptersDependencies) sqlite.AdaptersFactoryFn { return func(db *sql.DB, tx *sql.Tx) (app.Adapters, error) { return buildTransactionSqliteAdapters(db, tx, deps) diff --git a/cmd/crossposting-service/di/wire_gen.go b/cmd/crossposting-service/di/wire_gen.go index 901149f..b465366 100644 --- a/cmd/crossposting-service/di/wire_gen.go +++ b/cmd/crossposting-service/di/wire_gen.go @@ -86,12 +86,12 @@ func BuildService(contextContext context.Context, configConfig config.Config) (S server := http.NewServer(configConfig, application, logger, frontendFileSystem) metricsServer := http.NewMetricsServer(prometheusPrometheus, configConfig, logger) receivedEventPubSub := memorypubsub.NewReceivedEventPubSub() - purplePages, err := adapters.NewPurplePages(contextContext, logger, prometheusPrometheus) + v, err := newPurplePages(contextContext, logger, prometheusPrometheus) if err != nil { cleanup() return Service{}, nil, err } - relaySource := adapters.NewRelaySource(logger, purplePages) + relaySource := adapters.NewRelaySource(logger, v) relayEventDownloader := adapters.NewRelayEventDownloader(contextContext, logger, prometheusPrometheus) downloader := app.NewDownloader(genericTransactionProvider, receivedEventPubSub, logger, prometheusPrometheus, relaySource, relayEventDownloader) transformer := content.NewTransformer() diff --git a/service/adapters/prometheus/prometheus.go b/service/adapters/prometheus/prometheus.go index 93ebc6d..03b4401 100644 --- a/service/adapters/prometheus/prometheus.go +++ b/service/adapters/prometheus/prometheus.go @@ -124,7 +124,7 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) { Name: "purple_pages_lookups", Help: "Number of purple pages lookups.", }, - []string{labelResult, labelErrorDescription}, + []string{labelResult, labelErrorDescription, labelRelayAddress}, ) tweetCreatedCountPerAccountGauge := prometheus.NewGaugeVec( prometheus.GaugeOpts{ @@ -257,10 +257,11 @@ func (p *Prometheus) ReportSubscriptionQueueLength(topic string, n int) { p.subscriptionQueueLengthGauge.With(prometheus.Labels{labelTopic: topic}).Set(float64(n)) } -func (p *Prometheus) ReportPurplePagesLookupResult(err *error) { +func (p *Prometheus) ReportPurplePagesLookupResult(address domain.RelayAddress, err *error) { labels := prometheus.Labels{ labelResult: labelResultValueSuccess, labelErrorDescription: "none", + labelRelayAddress: address.String(), } if *err != nil { labels[labelResult] = labelResultValueError diff --git a/service/adapters/purple_pages.go b/service/adapters/purple_pages.go index 29c3110..90a3631 100644 --- a/service/adapters/purple_pages.go +++ b/service/adapters/purple_pages.go @@ -2,6 +2,7 @@ package adapters import ( "context" + "fmt" "sync" "time" @@ -19,8 +20,6 @@ var ( errLookupFoundNoEvents = errors.New("lookup found no events") ) -var purplePagesAddress = domain.MustNewRelayAddress("wss://purplepag.es") - const purplePagesLookupTimeout = 10 * time.Second const numLookups = 2 @@ -34,21 +33,22 @@ type PurplePages struct { func NewPurplePages( ctx context.Context, + address domain.RelayAddress, logger logging.Logger, metrics app.Metrics, ) (*PurplePages, error) { - connection := NewRelayConnection(purplePagesAddress, logger) + connection := NewRelayConnection(address, logger) go connection.Run(ctx) return &PurplePages{ - logger: logger, + logger: logger.New(fmt.Sprintf("PurplePages(%s)", address.String())), metrics: metrics, connection: connection, }, nil } func (p *PurplePages) GetRelays(ctx context.Context, publicKey domain.PublicKey) (result []domain.RelayAddress, err error) { - defer p.metrics.ReportPurplePagesLookupResult(&err) + defer p.metrics.ReportPurplePagesLookupResult(p.connection.Address(), &err) p.mutex.Lock() defer p.mutex.Unlock() @@ -197,6 +197,10 @@ func (p *PurplePages) getRelaysFromContacts(ctx context.Context, publicKey domai return nil, errors.New("timeout") } +func (p *PurplePages) Address() domain.RelayAddress { + return p.connection.Address() +} + type relaysOrError struct { Err error Addresses []domain.RelayAddress diff --git a/service/adapters/purple_pages_cache.go b/service/adapters/purple_pages_cache.go new file mode 100644 index 0000000..faa0523 --- /dev/null +++ b/service/adapters/purple_pages_cache.go @@ -0,0 +1,91 @@ +package adapters + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/boreq/errors" + "github.com/planetary-social/nos-crossposting-service/internal/logging" + "github.com/planetary-social/nos-crossposting-service/service/domain" +) + +type CachedPurplePages struct { + logger logging.Logger + purplePages *PurplePages + cache *RelayAddressCache +} + +func NewCachedPurplePages(logger logging.Logger, purplePages *PurplePages) *CachedPurplePages { + return &CachedPurplePages{ + logger: logger.New(fmt.Sprintf("CachedPurplePages(%s)", purplePages.Address().String())), + purplePages: purplePages, + cache: NewRelayAddressCache(), + } +} + +func (p CachedPurplePages) GetRelays(ctx context.Context, publicKey domain.PublicKey) ([]domain.RelayAddress, error) { + entry, ok := p.cache.Get(publicKey) + if ok { + if time.Since(entry.T) < refreshPurplePagesAfter { + return entry.Addresses, nil + } + } + + newRelayAddresses, err := p.getRelaysFromPurplePages(ctx, publicKey) + if err != nil { + return nil, errors.Wrap(err, "error querying purple pages") + } + + p.cache.Set(publicKey, newRelayAddresses) + return newRelayAddresses, nil +} + +func (p CachedPurplePages) getRelaysFromPurplePages(ctx context.Context, publicKey domain.PublicKey) ([]domain.RelayAddress, error) { + relayAddressesFromPurplePages, err := p.purplePages.GetRelays(ctx, publicKey) + if err != nil { + if errors.Is(err, ErrRelayListNotFoundInPurplePages) { + p.logger.Debug().WithError(err).Message("relay list not found in purple pages") + return nil, nil + } + return nil, errors.Wrap(err, "error querying purple pages") + } + return relayAddressesFromPurplePages, nil +} + +func (p CachedPurplePages) Address() domain.RelayAddress { + return p.purplePages.Address() +} + +type RelayAddressCache struct { + m map[domain.PublicKey]Entry + lock sync.Mutex +} + +func NewRelayAddressCache() *RelayAddressCache { + return &RelayAddressCache{m: make(map[domain.PublicKey]Entry)} +} + +func (c *RelayAddressCache) Set(publicKey domain.PublicKey, addresses []domain.RelayAddress) { + c.lock.Lock() + defer c.lock.Unlock() + + c.m[publicKey] = Entry{ + T: time.Now(), + Addresses: addresses, + } +} + +func (c *RelayAddressCache) Get(publicKey domain.PublicKey) (Entry, bool) { + c.lock.Lock() + defer c.lock.Unlock() + + v, ok := c.m[publicKey] + return v, ok +} + +type Entry struct { + T time.Time + Addresses []domain.RelayAddress +} diff --git a/service/adapters/relay_source.go b/service/adapters/relay_source.go index 26d4e34..4c6140e 100644 --- a/service/adapters/relay_source.go +++ b/service/adapters/relay_source.go @@ -2,10 +2,9 @@ package adapters import ( "context" - "sync" + "fmt" "time" - "github.com/boreq/errors" "github.com/planetary-social/nos-crossposting-service/internal" "github.com/planetary-social/nos-crossposting-service/internal/logging" "github.com/planetary-social/nos-crossposting-service/service/domain" @@ -21,97 +20,26 @@ var hardcodedRelayAddresses = []domain.RelayAddress{ type RelaySource struct { logger logging.Logger - purplePages *PurplePages - cache *RelayAddressCache + purplePages []*CachedPurplePages } -func NewRelaySource(logger logging.Logger, purplePages *PurplePages) *RelaySource { +func NewRelaySource(logger logging.Logger, purplePages []*CachedPurplePages) *RelaySource { return &RelaySource{ logger: logger, purplePages: purplePages, - cache: NewRelayAddressCache(), } } func (p RelaySource) GetRelays(ctx context.Context, publicKey domain.PublicKey) ([]domain.RelayAddress, error) { - result := internal.NewEmptySet[domain.RelayAddress]() + result := internal.NewSet[domain.RelayAddress](hardcodedRelayAddresses) - for _, relayAddress := range hardcodedRelayAddresses { - result.Put(relayAddress) - } - - relayAddressesFromPurplePages, err := p.getRelaysFromPurplePagesOrCache(ctx, publicKey) - if err != nil { - return nil, errors.Wrap(err, "error getting relays from purple pages") - } - - for _, relayAddress := range relayAddressesFromPurplePages { - result.Put(relayAddress) - } - - return result.List(), nil -} - -func (p RelaySource) getRelaysFromPurplePagesOrCache(ctx context.Context, publicKey domain.PublicKey) ([]domain.RelayAddress, error) { - var previousEntries []domain.RelayAddress - - entry, ok := p.cache.Get(publicKey) - if ok { - previousEntries = entry.Addresses - if time.Since(entry.T) < refreshPurplePagesAfter { - return previousEntries, nil - } - } - - relayAddressesFromPurplePages, err := p.getRelaysFromPurplePages(ctx, publicKey) - if err != nil { - return nil, errors.Wrap(err, "error querying purple pages") - } - - p.cache.Set(publicKey, relayAddressesFromPurplePages) - return relayAddressesFromPurplePages, nil -} - -func (p RelaySource) getRelaysFromPurplePages(ctx context.Context, publicKey domain.PublicKey) ([]domain.RelayAddress, error) { - relayAddressesFromPurplePages, err := p.purplePages.GetRelays(ctx, publicKey) - if err != nil { - if errors.Is(err, ErrRelayListNotFoundInPurplePages) { - p.logger.Debug().WithError(err).Message("relay list not found in purple pages") - return nil, nil + for _, purplePages := range p.purplePages { + relayAddressesFromPurplePages, err := purplePages.GetRelays(ctx, publicKey) + if err != nil { + return nil, fmt.Errorf("error getting relays from '%s'", purplePages.Address().String()) } - return nil, errors.Wrap(err, "error querying purple pages") + result.PutMany(relayAddressesFromPurplePages) } - return relayAddressesFromPurplePages, nil -} - -type RelayAddressCache struct { - m map[domain.PublicKey]Entry - lock sync.Mutex -} - -func NewRelayAddressCache() *RelayAddressCache { - return &RelayAddressCache{m: make(map[domain.PublicKey]Entry)} -} - -func (c *RelayAddressCache) Set(publicKey domain.PublicKey, addresses []domain.RelayAddress) { - c.lock.Lock() - defer c.lock.Unlock() - c.m[publicKey] = Entry{ - T: time.Now(), - Addresses: addresses, - } -} - -func (c *RelayAddressCache) Get(publicKey domain.PublicKey) (Entry, bool) { - c.lock.Lock() - defer c.lock.Unlock() - - v, ok := c.m[publicKey] - return v, ok -} - -type Entry struct { - T time.Time - Addresses []domain.RelayAddress + return result.List(), nil } diff --git a/service/app/app.go b/service/app/app.go index 348451a..ac866b1 100644 --- a/service/app/app.go +++ b/service/app/app.go @@ -142,7 +142,7 @@ type Metrics interface { ReportCallingTwitterAPIToPostATweet(err error) ReportCallingTwitterAPIToGetAUser(err error) ReportSubscriptionQueueLength(topic string, n int) - ReportPurplePagesLookupResult(err *error) + ReportPurplePagesLookupResult(address domain.RelayAddress, err *error) ReportTweetCreatedCountPerAccount(m map[accounts.AccountID]int) ReportNumberOfAccounts(count int) ReportNumberOfLinkedPublicKeys(count int)