Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rate limit from notice warnings #80

Merged
merged 2 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/send-all-events-to-relay/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,3 +339,6 @@ func (m mockMetrics) ReportMessageReceived(address domain.RelayAddress, messageT

func (m mockMetrics) ReportRelayDisconnection(address domain.RelayAddress, err error) {
}

func (m mockMetrics) ReportNotice(address domain.RelayAddress, noticeType relays.NoticeType) {
}
11 changes: 11 additions & 0 deletions internal/logging/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os/exec"
"reflect"
"runtime"
"time"

"github.com/davecgh/go-spew/spew"
)
Expand Down Expand Up @@ -202,6 +203,16 @@ func (d devNullLoggerEntry) WithField(key string, v any) Entry {
func (d devNullLoggerEntry) Message(msg string) {
}

// logPeriodically executes the passed action function if the current time in milliseconds
// modulo logInterval equals zero. This approach allows executing the action periodically,
// approximating the execution to happen once every `logInterval` milliseconds.
func LogPeriodically(action func(), logInterval int64) {
currentTimeMillis := time.Now().UnixNano() / int64(time.Millisecond)
if currentTimeMillis%logInterval == 0 {
action()
}
}

func Inspect(args ...interface{}) {
for _, arg := range args {
val := reflect.ValueOf(arg)
Expand Down
4 changes: 4 additions & 0 deletions service/adapters/mocks/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/planetary-social/nos-event-service/service/app"
"github.com/planetary-social/nos-event-service/service/domain"
"github.com/planetary-social/nos-event-service/service/domain/relays"
)

type Metrics struct {
Expand Down Expand Up @@ -39,6 +40,9 @@ func (m Metrics) ReportNumberOfStoredEvents(n int) {
func (m Metrics) ReportEventSentToRelay(address domain.RelayAddress, decision app.SendEventToRelayDecision, result app.SendEventToRelayResult) {
}

func (m Metrics) ReportNotice(address domain.RelayAddress, noticeType relays.NoticeType) {
}

type ApplicationCall struct {
}

Expand Down
20 changes: 19 additions & 1 deletion service/adapters/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ const (

labelReason = "reason"

labelDecision = "decision"
labelDecision = "decision"
labelNoticeType = "noticeType"
)

type Prometheus struct {
noticeTypeCounter *prometheus.CounterVec
applicationHandlerCallsCounter *prometheus.CounterVec
applicationHandlerCallDurationHistogram *prometheus.HistogramVec
relayDownloadersGauge prometheus.Gauge
Expand All @@ -56,6 +58,13 @@ type Prometheus struct {
}

func NewPrometheus(logger logging.Logger) (*Prometheus, error) {
noticeTypeCounter := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "notice_type_total",
Help: "Total number of notices per notice type.",
},
[]string{labelAddress, labelNoticeType},
)
applicationHandlerCallsCounter := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "application_handler_calls_total",
Expand Down Expand Up @@ -154,6 +163,7 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) {

reg := prometheus.NewRegistry()
for _, v := range []prometheus.Collector{
noticeTypeCounter,
applicationHandlerCallsCounter,
applicationHandlerCallDurationHistogram,
versionGague,
Expand Down Expand Up @@ -191,6 +201,7 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) {
}

return &Prometheus{
noticeTypeCounter: noticeTypeCounter,
applicationHandlerCallsCounter: applicationHandlerCallsCounter,
applicationHandlerCallDurationHistogram: applicationHandlerCallDurationHistogram,
relayDownloadersGauge: relayDownloadersGauge,
Expand Down Expand Up @@ -281,6 +292,13 @@ func (p *Prometheus) ReportEventSentToRelay(address domain.RelayAddress, decisio
}).Inc()
}

func (p *Prometheus) ReportNotice(address domain.RelayAddress, noticeType relays.NoticeType) {
p.noticeTypeCounter.With(prometheus.Labels{
labelAddress: address.String(),
labelNoticeType: string(noticeType),
}).Inc()
}

func (p *Prometheus) Registry() *prometheus.Registry {
return p.registry
}
Expand Down
2 changes: 1 addition & 1 deletion service/domain/downloader/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func (t *TimeWindowTaskGenerator) Generate(ctx context.Context, kinds []domain.E
func (t *TimeWindowTaskGenerator) maybeGenerateNewTracker(ctx context.Context) (*TimeWindowTaskTracker, bool, error) {
nextWindow := t.lastWindow.Advance()
now := t.currentTimeProvider.GetCurrentTime()
if nextWindow.End().After(now.Add(-time.Minute)) {
if nextWindow.End().After(now.Add(-windowSize)) {
return nil, false, nil
}
t.lastWindow = nextWindow
Expand Down
Loading
Loading