Skip to content

Commit

Permalink
Collect extra metrics (#45)
Browse files Browse the repository at this point in the history
  • Loading branch information
boreq committed Nov 13, 2023
1 parent ae4bc4e commit 22f91fe
Show file tree
Hide file tree
Showing 16 changed files with 314 additions and 53 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ Go-related metrics are available. We also have custom metrics:
- `public_key_downloader_relays_count`
- `relay_connection_state`
- `twitter_api_calls`
- `accounts_count`
- `linked_public_keys_count`

See `service/adapters/prometheus`.

Expand Down
1 change: 1 addition & 0 deletions cmd/crossposting-service/di/inject_application.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ var applicationSet = wire.NewSet(
app.NewGetTwitterAccountDetailsHandler,
app.NewLogoutHandler,
app.NewUnlinkPublicKeyHandler,
app.NewUpdateMetricsHandler,
)
2 changes: 2 additions & 0 deletions cmd/crossposting-service/di/inject_ports.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/planetary-social/nos-crossposting-service/service/ports/http"
"github.com/planetary-social/nos-crossposting-service/service/ports/http/frontend"
"github.com/planetary-social/nos-crossposting-service/service/ports/memorypubsub"
"github.com/planetary-social/nos-crossposting-service/service/ports/timer"
)

var portsSet = wire.NewSet(
Expand All @@ -13,4 +14,5 @@ var portsSet = wire.NewSet(
frontend.NewFrontendFileSystem,

memorypubsub.NewReceivedEventSubscriber,
timer.NewMetrics,
)
4 changes: 3 additions & 1 deletion cmd/crossposting-service/di/inject_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ var memoryPubsubSet = wire.NewSet(

var sqlitePubsubSet = wire.NewSet(
sqlitepubsubport.NewTweetCreatedEventSubscriber,
sqlite.NewSubscriber,
sqlite.NewPubSub,

sqlite.NewSubscriber,
wire.Bind(new(app.Subscriber), new(*sqlite.Subscriber)),
)

var sqliteTxPubsubSet = wire.NewSet(
Expand Down
9 changes: 9 additions & 0 deletions cmd/crossposting-service/di/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/planetary-social/nos-crossposting-service/service/ports/http"
"github.com/planetary-social/nos-crossposting-service/service/ports/memorypubsub"
"github.com/planetary-social/nos-crossposting-service/service/ports/sqlitepubsub"
"github.com/planetary-social/nos-crossposting-service/service/ports/timer"
)

type Service struct {
Expand All @@ -19,6 +20,7 @@ type Service struct {
downloader *app.Downloader
receivedEventSubscriber *memorypubsub.ReceivedEventSubscriber
tweetCreatedEventSubscriber *sqlitepubsub.TweetCreatedEventSubscriber
metricsTimer *timer.Metrics
migrationsRunner *migrations.Runner
migrations migrations.Migrations
migrationsProgressCallback migrations.ProgressCallback
Expand All @@ -31,6 +33,7 @@ func NewService(
downloader *app.Downloader,
receivedEventSubscriber *memorypubsub.ReceivedEventSubscriber,
tweetCreatedEventSubscriber *sqlitepubsub.TweetCreatedEventSubscriber,
metricsTimer *timer.Metrics,
migrationsRunner *migrations.Runner,
migrations migrations.Migrations,
migrationsProgressCallback migrations.ProgressCallback,
Expand All @@ -42,6 +45,7 @@ func NewService(
downloader: downloader,
receivedEventSubscriber: receivedEventSubscriber,
tweetCreatedEventSubscriber: tweetCreatedEventSubscriber,
metricsTimer: metricsTimer,
migrationsRunner: migrationsRunner,
migrations: migrations,
migrationsProgressCallback: migrationsProgressCallback,
Expand Down Expand Up @@ -88,6 +92,11 @@ func (s Service) Run(ctx context.Context) error {
errCh <- errors.Wrap(s.tweetCreatedEventSubscriber.Run(ctx), "tweet created event subscriber error")
}()

runners++
go func() {
errCh <- errors.Wrap(s.metricsTimer.Run(ctx), "metrics timer error")
}()

var err error
for i := 0; i < runners; i++ {
err = multierror.Append(err, errors.Wrap(<-errCh, "error returned by runner"))
Expand Down
10 changes: 7 additions & 3 deletions cmd/crossposting-service/di/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions service/adapters/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type Prometheus struct {
twitterAPICallsCounter *prometheus.CounterVec
purplePagesLookupResultCounter *prometheus.CounterVec
tweetCreatedCountPerAccountGauge *prometheus.GaugeVec
numberOfAccountsGauge prometheus.Gauge
numberOfLinkedPublicKeysGauge prometheus.Gauge

registry *prometheus.Registry

Expand Down Expand Up @@ -126,6 +128,18 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) {
},
[]string{labelAccountID},
)
numberOfAccountsGauge := prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "accounts_count",
Help: "Number of accounts.",
},
)
numberOfLinkedPublicKeysGauge := prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "linked_public_keys_count",
Help: "Number of linked public keys.",
},
)

reg := prometheus.NewRegistry()
for _, v := range []prometheus.Collector{
Expand All @@ -139,6 +153,8 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) {
twitterAPICallsCounter,
purplePagesLookupResultCounter,
tweetCreatedCountPerAccountGauge,
numberOfAccountsGauge,
numberOfLinkedPublicKeysGauge,

collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
collectors.NewGoCollector(),
Expand Down Expand Up @@ -173,6 +189,8 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) {
twitterAPICallsCounter: twitterAPICallsCounter,
purplePagesLookupResultCounter: purplePagesLookupResultCounter,
tweetCreatedCountPerAccountGauge: tweetCreatedCountPerAccountGauge,
numberOfAccountsGauge: numberOfAccountsGauge,
numberOfLinkedPublicKeysGauge: numberOfLinkedPublicKeysGauge,

registry: reg,

Expand Down Expand Up @@ -252,6 +270,14 @@ func (p *Prometheus) ReportTweetCreatedCountPerAccount(m map[accounts.AccountID]
}
}

func (p *Prometheus) ReportNumberOfAccounts(count int) {
p.numberOfAccountsGauge.Set(float64(count))
}

func (p *Prometheus) ReportNumberOfLinkedPublicKeys(count int) {
p.numberOfLinkedPublicKeysGauge.Set(float64(count))
}

type ApplicationCall struct {
handlerName string
p *Prometheus
Expand Down
11 changes: 11 additions & 0 deletions service/adapters/sqlite/account_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,17 @@ ON CONFLICT(account_id) DO UPDATE SET
return nil
}

func (m *AccountRepository) Count() (int, error) {
row := m.tx.QueryRow("SELECT COUNT(*) FROM accounts")

var count int
if err := row.Scan(&count); err != nil {
return 0, errors.Wrap(err, "row scan error")
}

return count, nil
}

func (m *AccountRepository) readAccount(result *sql.Row) (*accounts.Account, error) {
var accountIDtmp string
var twitterIDtmp int64
Expand Down
58 changes: 58 additions & 0 deletions service/adapters/sqlite/account_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,61 @@ func TestAccountRepository_ItIsPossibleToRetrieveSavedData(t *testing.T) {
})
require.NoError(t, err)
}

func TestAccountRepository_CountReturnsNumberOfAccounts(t *testing.T) {
ctx := fixtures.TestContext(t)
adapters := NewTestAdapters(ctx, t)

err := adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
n, err := adapters.AccountRepository.Count()
require.NoError(t, err)
require.Equal(t, 0, n)

return nil
})
require.NoError(t, err)

err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
accountID := fixtures.SomeAccountID()
twitterID := fixtures.SomeTwitterID()

account, err := accounts.NewAccount(accountID, twitterID)
require.NoError(t, err)
err = adapters.AccountRepository.Save(account)
require.NoError(t, err)

return nil
})
require.NoError(t, err)

err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
n, err := adapters.AccountRepository.Count()
require.NoError(t, err)
require.Equal(t, 1, n)

return nil
})
require.NoError(t, err)

err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
accountID := fixtures.SomeAccountID()
twitterID := fixtures.SomeTwitterID()

account, err := accounts.NewAccount(accountID, twitterID)
require.NoError(t, err)
err = adapters.AccountRepository.Save(account)
require.NoError(t, err)

return nil
})
require.NoError(t, err)

err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
n, err := adapters.AccountRepository.Count()
require.NoError(t, err)
require.Equal(t, 2, n)

return nil
})
require.NoError(t, err)
}
11 changes: 11 additions & 0 deletions service/adapters/sqlite/public_key_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,17 @@ WHERE account_id = $1`,
return m.readPublicKeys(rows)
}

func (m *PublicKeyRepository) Count() (int, error) {
row := m.tx.QueryRow("SELECT COUNT(*) FROM public_keys")

var count int
if err := row.Scan(&count); err != nil {
return 0, errors.Wrap(err, "row scan error")
}

return count, nil
}

func (m *PublicKeyRepository) readPublicKeys(rows *sql.Rows) ([]*domain.LinkedPublicKey, error) {
var results []*domain.LinkedPublicKey
for rows.Next() {
Expand Down
58 changes: 58 additions & 0 deletions service/adapters/sqlite/public_key_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,61 @@ func TestPublicKeyRepository_DeletingPublicKeysDeletesThem(t *testing.T) {
})
require.NoError(t, err)
}

func TestPublicKeyRepository_CountCountsPublicKeys(t *testing.T) {
ctx := fixtures.TestContext(t)
adapters := NewTestAdapters(ctx, t)

accountID := fixtures.SomeAccountID()
twitterID := fixtures.SomeTwitterID()

account, err := accounts.NewAccount(accountID, twitterID)
require.NoError(t, err)

err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err = adapters.AccountRepository.Save(account)
require.NoError(t, err)

return nil
})
require.NoError(t, err)

createdAt := time.Now()
publicKey1 := fixtures.SomePublicKey()
publicKey2 := fixtures.SomePublicKey()

linkedPublicKey1, err := domain.NewLinkedPublicKey(accountID, publicKey1, createdAt)
require.NoError(t, err)

linkedPublicKey2, err := domain.NewLinkedPublicKey(accountID, publicKey2, createdAt)
require.NoError(t, err)

err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
n, err := adapters.PublicKeyRepository.Count()
require.NoError(t, err)
require.Equal(t, 0, n)

return nil
})
require.NoError(t, err)

err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
err = adapters.PublicKeyRepository.Save(linkedPublicKey1)
require.NoError(t, err)

err = adapters.PublicKeyRepository.Save(linkedPublicKey2)
require.NoError(t, err)

return nil
})
require.NoError(t, err)

err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error {
n, err := adapters.PublicKeyRepository.Count()
require.NoError(t, err)
require.Equal(t, 2, n)

return nil
})
require.NoError(t, err)
}

0 comments on commit 22f91fe

Please sign in to comment.