From 5d129d5ced80633cb5808ae9f6e51d38dd79c427 Mon Sep 17 00:00:00 2001 From: Javier Cuevas Date: Thu, 9 May 2024 18:41:40 +0200 Subject: [PATCH 01/10] Add periodic refresh for connection. Added `refresh_interval_seconds` to Meta's config. --- config/config.go | 9 +++++---- example-config.yaml | 2 ++ messagix/client.go | 21 ++++++++++++++++++++- user.go | 2 +- 4 files changed, 28 insertions(+), 6 deletions(-) diff --git a/config/config.go b/config/config.go index 85baee3..91dfab4 100644 --- a/config/config.go +++ b/config/config.go @@ -46,10 +46,11 @@ type Config struct { *bridgeconfig.BaseConfig `yaml:",inline"` Meta struct { - Mode BridgeMode `yaml:"mode"` - IGE2EE bool `yaml:"ig_e2ee"` - Proxy string `yaml:"proxy"` - GetProxyFrom string `yaml:"get_proxy_from"` + Mode BridgeMode `yaml:"mode"` + IGE2EE bool `yaml:"ig_e2ee"` + Proxy string `yaml:"proxy"` + GetProxyFrom string `yaml:"get_proxy_from"` + RefreshIntervalSeconds uint64 `yaml:"refresh_interval_seconds"` } `yaml:"meta"` Bridge BridgeConfig `yaml:"bridge"` diff --git a/example-config.yaml b/example-config.yaml index 213da63..151ba33 100644 --- a/example-config.yaml +++ b/example-config.yaml @@ -95,6 +95,8 @@ meta: # HTTP endpoint to request new proxy address from, for dynamically assigned proxies. # The endpoint must return a JSON body with a string field called proxy_url. get_proxy_from: + # Interval to force refresh the connection (disconnect and re-connect), default is 5 minutes. Set 0 to disable refreshes. + refresh_interval_seconds: 300 # Bridge config bridge: diff --git a/messagix/client.go b/messagix/client.go index 1bf14fe..eafa4dd 100644 --- a/messagix/client.go +++ b/messagix/client.go @@ -74,7 +74,7 @@ type Client struct { stopCurrentConnection atomic.Pointer[context.CancelFunc] } -func NewClient(cookies *cookies.Cookies, logger zerolog.Logger) *Client { +func NewClient(cookies *cookies.Cookies, logger zerolog.Logger, refreshIntervalSeconds uint64) *Client { if cookies.Platform == types.Unset { panic("messagix: platform must be set in cookies") } @@ -107,6 +107,11 @@ func NewClient(cookies *cookies.Cookies, logger zerolog.Logger) *Client { } cli.socket = cli.newSocketClient() + logger.Debug().Uint64("refresh_interval_seconds", refreshIntervalSeconds).Msg("Setting refresh interval") + if refreshIntervalSeconds > 0 { + go cli.refreshConnectionPeriodically(time.Duration(refreshIntervalSeconds) * time.Second) + } + return cli } @@ -263,6 +268,7 @@ func (c *Client) Disconnect() { (*fn)() } c.socket.Disconnect() + c.socket.conn = nil } func (c *Client) SaveSession(path string) error { @@ -388,3 +394,16 @@ func (c *Client) GetTaskId() int { c.activeTasks = append(c.activeTasks, id) return id } + +func (c *Client) refreshConnectionPeriodically(interval time.Duration) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for range ticker.C { + c.Logger.Info().Msg("Refreshing connection") + c.Disconnect() + err := c.Connect() + if err != nil { + c.Logger.Err(err).Msg("Error refreshing connection") + } + } +} diff --git a/user.go b/user.go index 0ec3550..2b4d2cb 100644 --- a/user.go +++ b/user.go @@ -602,7 +602,7 @@ func (user *User) unlockedConnectWithCookies(cookies *cookies.Cookies) error { log := user.log.With().Str("component", "messagix").Logger() user.log.Debug().Msg("Connecting to Meta") // TODO set proxy for media client? - cli := messagix.NewClient(cookies, log) + cli := messagix.NewClient(cookies, log, user.bridge.Config.Meta.RefreshIntervalSeconds) if user.bridge.Config.Meta.GetProxyFrom != "" || user.bridge.Config.Meta.Proxy != "" { cli.GetNewProxy = user.getProxy if !cli.UpdateProxy("connect") { From f0c6370b8e9d6228e27dfde873d1337096827cca Mon Sep 17 00:00:00 2001 From: Javier Cuevas Date: Mon, 13 May 2024 15:01:30 +0200 Subject: [PATCH 02/10] Move refresh connection logic from client.go to user.go --- messagix/client.go | 20 +------------------- user.go | 15 ++++++++++++++- 2 files changed, 15 insertions(+), 20 deletions(-) diff --git a/messagix/client.go b/messagix/client.go index eafa4dd..ff91e3d 100644 --- a/messagix/client.go +++ b/messagix/client.go @@ -74,7 +74,7 @@ type Client struct { stopCurrentConnection atomic.Pointer[context.CancelFunc] } -func NewClient(cookies *cookies.Cookies, logger zerolog.Logger, refreshIntervalSeconds uint64) *Client { +func NewClient(cookies *cookies.Cookies, logger zerolog.Logger) *Client { if cookies.Platform == types.Unset { panic("messagix: platform must be set in cookies") } @@ -107,11 +107,6 @@ func NewClient(cookies *cookies.Cookies, logger zerolog.Logger, refreshIntervalS } cli.socket = cli.newSocketClient() - logger.Debug().Uint64("refresh_interval_seconds", refreshIntervalSeconds).Msg("Setting refresh interval") - if refreshIntervalSeconds > 0 { - go cli.refreshConnectionPeriodically(time.Duration(refreshIntervalSeconds) * time.Second) - } - return cli } @@ -394,16 +389,3 @@ func (c *Client) GetTaskId() int { c.activeTasks = append(c.activeTasks, id) return id } - -func (c *Client) refreshConnectionPeriodically(interval time.Duration) { - ticker := time.NewTicker(interval) - defer ticker.Stop() - for range ticker.C { - c.Logger.Info().Msg("Refreshing connection") - c.Disconnect() - err := c.Connect() - if err != nil { - c.Logger.Err(err).Msg("Error refreshing connection") - } - } -} diff --git a/user.go b/user.go index 2b4d2cb..0f952f9 100644 --- a/user.go +++ b/user.go @@ -542,6 +542,19 @@ func (user *User) unlockedConnect() { } go user.sendMarkdownBridgeAlert(context.TODO(), "Failed to connect to %s: %v", user.bridge.ProtocolName, err) } + + var refreshIntervalSeconds = user.bridge.Config.Meta.RefreshIntervalSeconds + user.log.Debug().Uint64("refresh_interval_seconds", refreshIntervalSeconds).Msg("Setting refresh interval") + + if refreshIntervalSeconds > 0 { + go func() { + var refreshTimer = time.NewTimer(time.Duration(refreshIntervalSeconds) * time.Second) + <-refreshTimer.C + user.log.Info().Msg("Refreshing connection") + user.Disconnect() + user.Connect() + }() + } } func (user *User) Login(ctx context.Context, cookies *cookies.Cookies) error { @@ -602,7 +615,7 @@ func (user *User) unlockedConnectWithCookies(cookies *cookies.Cookies) error { log := user.log.With().Str("component", "messagix").Logger() user.log.Debug().Msg("Connecting to Meta") // TODO set proxy for media client? - cli := messagix.NewClient(cookies, log, user.bridge.Config.Meta.RefreshIntervalSeconds) + cli := messagix.NewClient(cookies, log) if user.bridge.Config.Meta.GetProxyFrom != "" || user.bridge.Config.Meta.Proxy != "" { cli.GetNewProxy = user.getProxy if !cli.UpdateProxy("connect") { From d9ae865131328114fd729117d082f769a1afc21f Mon Sep 17 00:00:00 2001 From: Javier Cuevas Date: Mon, 13 May 2024 15:04:46 +0200 Subject: [PATCH 03/10] Increase default refresh_interval_seconds to 1 day --- example-config.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/example-config.yaml b/example-config.yaml index 151ba33..ed9d504 100644 --- a/example-config.yaml +++ b/example-config.yaml @@ -95,8 +95,8 @@ meta: # HTTP endpoint to request new proxy address from, for dynamically assigned proxies. # The endpoint must return a JSON body with a string field called proxy_url. get_proxy_from: - # Interval to force refresh the connection (disconnect and re-connect), default is 5 minutes. Set 0 to disable refreshes. - refresh_interval_seconds: 300 + # Interval to force refresh the connection (disconnect and re-connect), default is 1 day. Set 0 to disable refreshes. + refresh_interval_seconds: 86400 # Bridge config bridge: From d62b28ec5282e074e36f8b619208640f6db6cac3 Mon Sep 17 00:00:00 2001 From: Javier Cuevas Date: Wed, 15 May 2024 13:46:23 +0200 Subject: [PATCH 04/10] Expose `MinFullReconnectInterval` in config as `min_full_reconnect_interval_seconds` and DRY'd condition --- config/config.go | 10 +++++----- example-config.yaml | 2 ++ portal.go | 2 +- user.go | 13 ++++++++----- 4 files changed, 16 insertions(+), 11 deletions(-) diff --git a/config/config.go b/config/config.go index 91dfab4..a911939 100644 --- a/config/config.go +++ b/config/config.go @@ -46,11 +46,11 @@ type Config struct { *bridgeconfig.BaseConfig `yaml:",inline"` Meta struct { - Mode BridgeMode `yaml:"mode"` - IGE2EE bool `yaml:"ig_e2ee"` - Proxy string `yaml:"proxy"` - GetProxyFrom string `yaml:"get_proxy_from"` - RefreshIntervalSeconds uint64 `yaml:"refresh_interval_seconds"` + Mode BridgeMode `yaml:"mode"` + IGE2EE bool `yaml:"ig_e2ee"` + Proxy string `yaml:"proxy"` + GetProxyFrom string `yaml:"get_proxy_from"` + MinFullReconnectIntervalSeconds int `yaml:"min_full_reconnect_interval_seconds"` } `yaml:"meta"` Bridge BridgeConfig `yaml:"bridge"` diff --git a/example-config.yaml b/example-config.yaml index ed9d504..ee5fce1 100644 --- a/example-config.yaml +++ b/example-config.yaml @@ -97,6 +97,8 @@ meta: get_proxy_from: # Interval to force refresh the connection (disconnect and re-connect), default is 1 day. Set 0 to disable refreshes. refresh_interval_seconds: 86400 + # Minimum interval between full reconnects in seconds + min_full_reconnect_interval_seconds: 1 # Bridge config bridge: diff --git a/portal.go b/portal.go index 3334fb8..a556f8b 100644 --- a/portal.go +++ b/portal.go @@ -609,7 +609,7 @@ func (portal *Portal) handleMatrixMessage(ctx context.Context, sender *User, evt } else { ctx = context.WithValue(ctx, msgconvContextKeyClient, sender.Client) tasks, otid, err = portal.MsgConv.ToMeta(ctx, evt, content, relaybotFormatted) - if errors.Is(err, metaTypes.ErrPleaseReloadPage) && time.Since(sender.lastFullReconnect) > MinFullReconnectInterval { + if errors.Is(err, metaTypes.ErrPleaseReloadPage) && sender.canReconnect() { log.Err(err).Msg("Got please reload page error while converting message, reloading page in background") go sender.FullReconnect() err = errReloading diff --git a/user.go b/user.go index 0f952f9..7ec3996 100644 --- a/user.go +++ b/user.go @@ -60,7 +60,6 @@ var ( ErrNotLoggedIn = errors.New("not logged in") ) -const MinFullReconnectInterval = 1 * time.Hour const setDisconnectStateAfterConnectAttempts = 3 func (br *MetaBridge) GetUserByMXID(userID id.UserID) *User { @@ -1027,7 +1026,7 @@ func (user *User) e2eeEventHandler(rawEvt any) { } user.BridgeState.Send(user.waState) case *events.CATRefreshError: - if errors.Is(evt.Error, types.ErrPleaseReloadPage) && time.Since(user.lastFullReconnect) > MinFullReconnectInterval { + if errors.Is(evt.Error, types.ErrPleaseReloadPage) && user.canReconnect() { user.log.Err(evt.Error).Msg("Got CATRefreshError, reloading page") go user.FullReconnect() return @@ -1041,7 +1040,7 @@ func (user *User) e2eeEventHandler(rawEvt any) { go user.sendMarkdownBridgeAlert(context.TODO(), "Error in WhatsApp connection: %s", evt.PermanentDisconnectDescription()) case events.PermanentDisconnect: cf, ok := evt.(*events.ConnectFailure) - if ok && cf.Reason == events.ConnectFailureLoggedOut && time.Since(user.lastFullReconnect) > MinFullReconnectInterval { + if ok && cf.Reason == events.ConnectFailureLoggedOut && user.canReconnect() { user.log.Debug().Msg("Doing full reconnect after WhatsApp 401 error") go user.FullReconnect() } @@ -1146,7 +1145,7 @@ func (user *User) eventHandler(rawEvt any) { StateEvent: status.StateUnknownError, Error: MetaServerUnavailable, } - if time.Since(user.lastFullReconnect) > MinFullReconnectInterval { + if user.canReconnect() { user.log.Debug().Msg("Doing full reconnect after server unavailable error") go user.FullReconnect() } @@ -1196,10 +1195,14 @@ func (user *User) unlockedDisconnect() { user.metaState = status.BridgeState{} } +func (user *User) canReconnect() bool { + return time.Since(user.lastFullReconnect) > time.Duration(user.bridge.Config.Meta.MinFullReconnectIntervalSeconds)*time.Second +} + func (user *User) FullReconnect() { user.Lock() defer user.Unlock() - if time.Since(user.lastFullReconnect) < MinFullReconnectInterval { + if !user.canReconnect() { return } user.unlockedDisconnect() From 23ab631025a2497198667b64654e061845820cab Mon Sep 17 00:00:00 2001 From: Javier Cuevas Date: Wed, 15 May 2024 13:48:54 +0200 Subject: [PATCH 05/10] Rename `refresh_interval_seconds` to `force_refresh_interval_seconds` and use `FullReconnect` for force refreshes --- config/config.go | 1 + example-config.yaml | 8 ++++---- user.go | 13 ++++++------- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/config/config.go b/config/config.go index a911939..5e2ce4d 100644 --- a/config/config.go +++ b/config/config.go @@ -51,6 +51,7 @@ type Config struct { Proxy string `yaml:"proxy"` GetProxyFrom string `yaml:"get_proxy_from"` MinFullReconnectIntervalSeconds int `yaml:"min_full_reconnect_interval_seconds"` + ForceRefreshIntervalSeconds int `yaml:"force_refresh_interval_seconds"` } `yaml:"meta"` Bridge BridgeConfig `yaml:"bridge"` diff --git a/example-config.yaml b/example-config.yaml index ee5fce1..c2e4b71 100644 --- a/example-config.yaml +++ b/example-config.yaml @@ -95,10 +95,10 @@ meta: # HTTP endpoint to request new proxy address from, for dynamically assigned proxies. # The endpoint must return a JSON body with a string field called proxy_url. get_proxy_from: - # Interval to force refresh the connection (disconnect and re-connect), default is 1 day. Set 0 to disable refreshes. - refresh_interval_seconds: 86400 - # Minimum interval between full reconnects in seconds - min_full_reconnect_interval_seconds: 1 + # Minimum interval between full reconnects in seconds, default is 1 hour + min_full_reconnect_interval_seconds: 3600 + # Interval to force refresh the connection (full reconnect), default is 1 day. Set 0 to disable force refreshes. + force_refresh_interval_seconds: 86400 # Bridge config bridge: diff --git a/user.go b/user.go index 7ec3996..1b90c9e 100644 --- a/user.go +++ b/user.go @@ -542,16 +542,15 @@ func (user *User) unlockedConnect() { go user.sendMarkdownBridgeAlert(context.TODO(), "Failed to connect to %s: %v", user.bridge.ProtocolName, err) } - var refreshIntervalSeconds = user.bridge.Config.Meta.RefreshIntervalSeconds - user.log.Debug().Uint64("refresh_interval_seconds", refreshIntervalSeconds).Msg("Setting refresh interval") - - if refreshIntervalSeconds > 0 { + refreshInterval := time.Duration(user.bridge.Config.Meta.ForceRefreshIntervalSeconds) * time.Second + if refreshInterval > 0 { + user.log.Debug().Msgf("Connection will be refreshed at %s", time.Now().Add(refreshInterval).Format(time.RFC3339)) go func() { - var refreshTimer = time.NewTimer(time.Duration(refreshIntervalSeconds) * time.Second) + var refreshTimer = time.NewTimer(refreshInterval) + <-refreshTimer.C user.log.Info().Msg("Refreshing connection") - user.Disconnect() - user.Connect() + user.FullReconnect() }() } } From 69ac214e5e2d98a4091f8176d4dc556f4d408166 Mon Sep 17 00:00:00 2001 From: Javier Cuevas Date: Wed, 15 May 2024 14:00:56 +0200 Subject: [PATCH 06/10] Cleanup --- messagix/client.go | 1 - 1 file changed, 1 deletion(-) diff --git a/messagix/client.go b/messagix/client.go index ff91e3d..1bf14fe 100644 --- a/messagix/client.go +++ b/messagix/client.go @@ -263,7 +263,6 @@ func (c *Client) Disconnect() { (*fn)() } c.socket.Disconnect() - c.socket.conn = nil } func (c *Client) SaveSession(path string) error { From d739b85030d7ac6213d9e13b0e60cc771b05197d Mon Sep 17 00:00:00 2001 From: Javier Cuevas Date: Wed, 15 May 2024 14:17:54 +0200 Subject: [PATCH 07/10] Make sure to return before setting the refresh interval if somethings goes wrong while connecting --- user.go | 1 + 1 file changed, 1 insertion(+) diff --git a/user.go b/user.go index 1b90c9e..4f30917 100644 --- a/user.go +++ b/user.go @@ -540,6 +540,7 @@ func (user *User) unlockedConnect() { }) } go user.sendMarkdownBridgeAlert(context.TODO(), "Failed to connect to %s: %v", user.bridge.ProtocolName, err) + return } refreshInterval := time.Duration(user.bridge.Config.Meta.ForceRefreshIntervalSeconds) * time.Second From 7acec445901f584d2eed63b309733b0ce2d1d41e Mon Sep 17 00:00:00 2001 From: Javier Cuevas Date: Wed, 15 May 2024 14:18:32 +0200 Subject: [PATCH 08/10] Stop refresh timer on `messagix.Event_SocketError` --- user.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/user.go b/user.go index 4f30917..3b1defd 100644 --- a/user.go +++ b/user.go @@ -239,6 +239,7 @@ type User struct { incomingTables chan *table.LSTable lastFullReconnect time.Time + forceRefreshTimer *time.Timer } var ( @@ -547,9 +548,9 @@ func (user *User) unlockedConnect() { if refreshInterval > 0 { user.log.Debug().Msgf("Connection will be refreshed at %s", time.Now().Add(refreshInterval).Format(time.RFC3339)) go func() { - var refreshTimer = time.NewTimer(refreshInterval) + user.forceRefreshTimer = time.NewTimer(refreshInterval) - <-refreshTimer.C + <-user.forceRefreshTimer.C user.log.Info().Msg("Refreshing connection") user.FullReconnect() }() @@ -1165,6 +1166,7 @@ func (user *User) eventHandler(rawEvt any) { user.BridgeState.Send(user.metaState) go user.sendMarkdownBridgeAlert(context.TODO(), "Error in %s connection: %v", user.bridge.ProtocolName, evt.Err) user.StopBackfillLoop() + user.forceRefreshTimer.Stop() default: user.log.Warn().Type("event_type", evt).Msg("Unrecognized event type from messagix") } From 1a056a488047c077f3d37b69977fd564b15b548e Mon Sep 17 00:00:00 2001 From: Javier Cuevas Date: Wed, 15 May 2024 14:30:18 +0200 Subject: [PATCH 09/10] Follow linter rules: replace Msgf() --- user.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/user.go b/user.go index 3b1defd..98cb4d7 100644 --- a/user.go +++ b/user.go @@ -546,8 +546,8 @@ func (user *User) unlockedConnect() { refreshInterval := time.Duration(user.bridge.Config.Meta.ForceRefreshIntervalSeconds) * time.Second if refreshInterval > 0 { - user.log.Debug().Msgf("Connection will be refreshed at %s", time.Now().Add(refreshInterval).Format(time.RFC3339)) go func() { + user.log.Debug().Time("next_refresh", time.Now().Add(refreshInterval)).Msg("Setting force refresh timer") user.forceRefreshTimer = time.NewTimer(refreshInterval) <-user.forceRefreshTimer.C From ef497afde62b88beacf4aab175e658b52786cf69 Mon Sep 17 00:00:00 2001 From: Javier Cuevas Date: Wed, 15 May 2024 14:31:13 +0200 Subject: [PATCH 10/10] Update user.go Co-authored-by: Tulir Asokan --- user.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/user.go b/user.go index 98cb4d7..3ba3c3f 100644 --- a/user.go +++ b/user.go @@ -1166,7 +1166,9 @@ func (user *User) eventHandler(rawEvt any) { user.BridgeState.Send(user.metaState) go user.sendMarkdownBridgeAlert(context.TODO(), "Error in %s connection: %v", user.bridge.ProtocolName, evt.Err) user.StopBackfillLoop() - user.forceRefreshTimer.Stop() + if user.forceRefreshTimer != nil { + user.forceRefreshTimer.Stop() + } default: user.log.Warn().Type("event_type", evt).Msg("Unrecognized event type from messagix") }