From f0140a49b5481acb97127a95c02426bb3a4c7384 Mon Sep 17 00:00:00 2001 From: Daniel Cadenas Date: Thu, 14 Mar 2024 11:46:05 -0300 Subject: [PATCH] Changes to hunt timeout issue --- service/app/handler_process_saved_event.go | 3 ++- service/domain/downloader/downloader.go | 3 +++ service/domain/relays/relay_connection.go | 21 +++++++++++++++++++-- 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/service/app/handler_process_saved_event.go b/service/app/handler_process_saved_event.go index d0b915f..b6080be 100644 --- a/service/app/handler_process_saved_event.go +++ b/service/app/handler_process_saved_event.go @@ -26,7 +26,8 @@ var ( ) ) -const sendEventToRelayTimeout = 6 * time.Second +// const sendEventToRelayTimeout = 6 * time.Second +const sendEventToRelayTimeout = 30 * time.Second type ProcessSavedEvent struct { id domain.EventId diff --git a/service/domain/downloader/downloader.go b/service/domain/downloader/downloader.go index 7b4fdd6..4783964 100644 --- a/service/domain/downloader/downloader.go +++ b/service/domain/downloader/downloader.go @@ -178,6 +178,9 @@ func (d *Downloader) updateDownloaders(ctx context.Context) error { return errors.Wrap(err, "error getting relays") } + // Test if removing the relay for reads solves the timeout issue + relays.Delete(domain.MustNewRelayAddress("wss://relay.nos.social")) + d.relayDownloadersLock.Lock() defer d.relayDownloadersLock.Unlock() diff --git a/service/domain/relays/relay_connection.go b/service/domain/relays/relay_connection.go index f20cc7e..24c308a 100644 --- a/service/domain/relays/relay_connection.go +++ b/service/domain/relays/relay_connection.go @@ -101,10 +101,17 @@ func (r *RelayConnection) Run(ctx context.Context) { r.metrics.ReportRelayDisconnection(r.connectionFactory.Address(), err) + backoff := r.backoffManager.GetReconnectionBackoff(err) + + // We control relay.nos.social, so we can be more aggressive with the backoff + if r.Address().String() == "wss://relay.nos.social" { + backoff = 1 * time.Minute + } + select { case <-ctx.Done(): return - case <-time.After(r.backoffManager.GetReconnectionBackoff(err)): + case <-time.After(backoff): continue } } @@ -150,7 +157,16 @@ func (r *RelayConnection) GetEvents(ctx context.Context, filter domain.Filter) ( return ch, nil } -// pushes the event to the eventsToSend channel and blocks until a sendEventResponse is received +// SendEvent schedules an event to be sent to the relay. It does so by creating +// and adding an eventToSendRequest to the `eventsToSend` map, which manages +// pending requests for each event. This approach prevents multiple requests for +// sending the same event from being initiated. Each eventToSendRequest includes +// a `ch` channel, which receives a sendEventResponse indicating the success or +// failure of the send operation. This success signal is send to the ch channel +// through the `passSendEventResponseToChannel` function after an OK message is +// received. Additionally, the event is enqueued into `newEventsCh`, a channel +// that is monitored by a loop within `sendEvents`, to manage the actual +// transmission of the event to the relay. func (r *RelayConnection) SendEvent(ctx context.Context, event domain.Event) error { ctx, cancel := context.WithTimeout(ctx, sendEventTimeout) defer cancel() @@ -303,6 +319,7 @@ func (r *RelayConnection) run(ctx context.Context) error { }() go func() { + // Loops through newEventsCh and sends events to the relay if err := r.sendEvents(ctx, conn); err != nil { if !r.writeErrorShouldNotBeLogged(err) { r.logger.Error().