diff --git a/cmd/send-all-events-to-relay/main.go b/cmd/send-all-events-to-relay/main.go index c9aa850..3bf7c69 100644 --- a/cmd/send-all-events-to-relay/main.go +++ b/cmd/send-all-events-to-relay/main.go @@ -334,6 +334,9 @@ func (m mockMetrics) ReportRelayConnectionsState(v map[domain.RelayAddress]relay func (m mockMetrics) ReportNumberOfSubscriptions(address domain.RelayAddress, n int) { } +func (m mockMetrics) ReportRateLimitBackoffMs(address domain.RelayAddress, n int) { +} + func (m mockMetrics) ReportMessageReceived(address domain.RelayAddress, messageType relays.MessageType, err *error) { } diff --git a/service/adapters/prometheus/prometheus.go b/service/adapters/prometheus/prometheus.go index 208b3a0..9607b9d 100644 --- a/service/adapters/prometheus/prometheus.go +++ b/service/adapters/prometheus/prometheus.go @@ -46,6 +46,7 @@ type Prometheus struct { relayConnectionStateGauge *prometheus.GaugeVec receivedEventsCounter *prometheus.CounterVec relayConnectionSubscriptionsGauge *prometheus.GaugeVec + relayRateLimitBackoffMsGauge *prometheus.GaugeVec relayConnectionReceivedMessagesCounter *prometheus.CounterVec relayConnectionDisconnectionsCounter *prometheus.CounterVec storedRelayAddressesGauge prometheus.Gauge @@ -127,6 +128,13 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) { }, []string{labelAddress}, ) + relayRateLimitBackoffMsGauge := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "relay_rate_limit_backoff_ms_gauge", + Help: "Rate limit wait in milliseconds.", + }, + []string{labelAddress}, + ) relayConnectionReceivedMessagesCounter := prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "relay_connection_received_messages_counter", @@ -173,6 +181,7 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) { relayConnectionStateGauge, receivedEventsCounter, relayConnectionSubscriptionsGauge, + relayRateLimitBackoffMsGauge, relayConnectionReceivedMessagesCounter, relayConnectionDisconnectionsCounter, storedRelayAddressesGauge, @@ -210,6 +219,7 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) { relayConnectionStateGauge: relayConnectionStateGauge, receivedEventsCounter: receivedEventsCounter, relayConnectionSubscriptionsGauge: relayConnectionSubscriptionsGauge, + relayRateLimitBackoffMsGauge: relayRateLimitBackoffMsGauge, relayConnectionReceivedMessagesCounter: relayConnectionReceivedMessagesCounter, relayConnectionDisconnectionsCounter: relayConnectionDisconnectionsCounter, storedRelayAddressesGauge: storedRelayAddressesGauge, @@ -260,6 +270,12 @@ func (p *Prometheus) ReportNumberOfSubscriptions(address domain.RelayAddress, n }).Set(float64(n)) } +func (p *Prometheus) ReportRateLimitBackoffMs(address domain.RelayAddress, n int) { + p.relayRateLimitBackoffMsGauge.With(prometheus.Labels{ + labelAddress: address.String(), + }).Set(float64(n)) +} + func (p *Prometheus) ReportMessageReceived(address domain.RelayAddress, messageType relays.MessageType, err *error) { labels := prometheus.Labels{ labelAddress: address.String(), diff --git a/service/domain/relays/rate_limit_notice_backoff_manager.go b/service/domain/relays/rate_limit_notice_backoff_manager.go index 661d24f..439c326 100644 --- a/service/domain/relays/rate_limit_notice_backoff_manager.go +++ b/service/domain/relays/rate_limit_notice_backoff_manager.go @@ -40,8 +40,16 @@ const maxBackoffMs = 10000 const secondsToDecreaseRateLimitNoticeCount = 60 * 5 // 5 minutes = 300 seconds func (r *RateLimitNoticeBackoffManager) Wait() { + backoffMs := r.GetBackoffMs() + + if backoffMs > 0 { + time.Sleep(time.Duration(backoffMs) * time.Millisecond) + } +} + +func (r *RateLimitNoticeBackoffManager) GetBackoffMs() int { if !r.IsSet() { - return + return 0 } backoffMs := int(math.Min(float64(maxBackoffMs), math.Pow(2, float64(r.rateLimitNoticeCount))*50)) @@ -52,9 +60,7 @@ func (r *RateLimitNoticeBackoffManager) Wait() { r.updateLastBumpTime() } - if backoffMs > 0 { - time.Sleep(time.Duration(backoffMs) * time.Millisecond) - } + return backoffMs } func (r *RateLimitNoticeBackoffManager) updateLastBumpTime() time.Time { diff --git a/service/domain/relays/relay_connection_test.go b/service/domain/relays/relay_connection_test.go index 47256bd..f9bed15 100644 --- a/service/domain/relays/relay_connection_test.go +++ b/service/domain/relays/relay_connection_test.go @@ -253,6 +253,9 @@ func (m2 mockMetrics) ReportRelayConnectionsState(m map[domain.RelayAddress]rela func (m2 mockMetrics) ReportNumberOfSubscriptions(address domain.RelayAddress, n int) { } +func (m2 mockMetrics) ReportRateLimitBackoffMs(address domain.RelayAddress, n int) { +} + func (m2 mockMetrics) ReportMessageReceived(address domain.RelayAddress, messageType relays.MessageType, err *error) { } diff --git a/service/domain/relays/relay_connections.go b/service/domain/relays/relay_connections.go index 4bf308b..aec3b7e 100644 --- a/service/domain/relays/relay_connections.go +++ b/service/domain/relays/relay_connections.go @@ -12,6 +12,7 @@ import ( type Metrics interface { ReportRelayConnectionsState(m map[domain.RelayAddress]RelayConnectionState) ReportNumberOfSubscriptions(address domain.RelayAddress, n int) + ReportRateLimitBackoffMs(address domain.RelayAddress, n int) ReportMessageReceived(address domain.RelayAddress, messageType MessageType, err *error) ReportRelayDisconnection(address domain.RelayAddress, err error) ReportNotice(address domain.RelayAddress, noticeType NoticeType) @@ -91,10 +92,21 @@ func (d *RelayConnections) storeMetrics() { m := make(map[domain.RelayAddress]RelayConnectionState) for _, connection := range d.connections { + rateLimitNoticeBackoffManager := d.getRateLimitNoticeBackoffManager(connection.Address()) + d.metrics.ReportRateLimitBackoffMs(connection.Address(), rateLimitNoticeBackoffManager.GetBackoffMs()) m[connection.Address()] = connection.State() } d.metrics.ReportRelayConnectionsState(m) } +func (r *RelayConnections) getRateLimitNoticeBackoffManager(relayAddress domain.RelayAddress) *RateLimitNoticeBackoffManager { + rateLimitNoticeBackoffManager, exists := r.rateLimitNoticeBackoffManagers[relayAddress.HostWithoutPort()] + if !exists { + rateLimitNoticeBackoffManager = NewRateLimitNoticeBackoffManager() + r.rateLimitNoticeBackoffManagers[relayAddress.HostWithoutPort()] = rateLimitNoticeBackoffManager + } + + return rateLimitNoticeBackoffManager +} // Notice that a single connection can serve multiple req. This can cause a too many concurrent requests error if not throttled. func (r *RelayConnections) getConnection(relayAddress domain.RelayAddress) *RelayConnection { @@ -110,11 +122,7 @@ func (r *RelayConnections) getConnection(relayAddress domain.RelayAddress) *Rela // Sometimes different addreses can point to the same relay. Example is // wss://feeds.nostr.band/video and wss://feeds.nostr.band/audio. For these // cases, we want to share the rate limit notice backoff manager. - rateLimitNoticeBackoffManager, exists := r.rateLimitNoticeBackoffManagers[relayAddress.HostWithoutPort()] - if !exists { - rateLimitNoticeBackoffManager = NewRateLimitNoticeBackoffManager() - r.rateLimitNoticeBackoffManagers[relayAddress.HostWithoutPort()] = rateLimitNoticeBackoffManager - } + rateLimitNoticeBackoffManager := r.getRateLimitNoticeBackoffManager(relayAddress) connection := NewRelayConnection(factory, rateLimitNoticeBackoffManager, r.logger, r.metrics) go connection.Run(r.longCtx)