Skip to content

Commit

Permalink
Improved reconnection process for Sonarr SignalR feeds.
Browse files Browse the repository at this point in the history
  • Loading branch information
morpheus65535 committed May 15, 2021
1 parent 85c2cbc commit 44dd478
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 70 deletions.
70 changes: 36 additions & 34 deletions bazarr/signalr_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,47 @@
class SonarrSignalrClient(threading.Thread):
def __init__(self):
super(SonarrSignalrClient, self).__init__()
self.stopped = True
self.apikey_sonarr = None
self.session = Session()
self.connection = None

def stop(self):
self.connection.close()
self.stopped = True
logging.info('BAZARR SignalR client for Sonarr is now disconnected.')

def restart(self):
if not self.stopped:
self.stop()
if settings.general.getboolean('use_sonarr'):
self.run()

def run(self):
def start(self):
if get_sonarr_version().startswith('2.'):
logging.warning('BAZARR can only sync from Sonarr v3 SignalR feed to get real-time update. You should '
'consider upgrading.')
return

logging.debug('BAZARR connecting to Sonarr SignalR feed...')
self.configure()
while not self.connection.is_open:
try:
self.connection.start()
except ConnectionError:
gevent.sleep(5)
logging.info('BAZARR SignalR client for Sonarr is connected and waiting for events.')
if not args.dev:
scheduler.execute_job_now('update_series')
scheduler.execute_job_now('sync_episodes')

def stop(self, log=True):
try:
self.connection.close()
except Exception as e:
pass
if log:
logging.info('BAZARR SignalR client for Sonarr is now disconnected.')

def restart(self):
if self.connection.is_open:
self.stop(log=False)
if settings.general.getboolean('use_sonarr'):
self.start()

def exception_handler(self, type, exception, traceback):
logging.error('BAZARR connection to Sonarr SignalR feed has been lost. Reconnecting...')
self.restart()

def configure(self):
self.apikey_sonarr = settings.sonarr.apikey
self.connection = Connection(url_sonarr() + "/signalr", self.session)
self.connection.qs = {'apikey': self.apikey_sonarr}
Expand All @@ -51,25 +71,7 @@ def run(self):
for item in sonarr_method:
sonarr_hub.client.on(item, dispatcher)

while True:
if not self.stopped:
return
if self.connection.started:
gevent.sleep(5)
else:
try:
logging.debug('BAZARR connecting to Sonarr SignalR feed...')
self.connection.start()
except ConnectionError:
logging.error('BAZARR connection to Sonarr SignalR feed has been lost. Reconnecting...')
gevent.sleep(15)
else:
self.stopped = False
logging.info('BAZARR SignalR client for Sonarr is connected and waiting for events.')
if not args.dev:
scheduler.execute_job_now('update_series')
scheduler.execute_job_now('sync_episodes')
gevent.sleep()
self.connection.exception += self.exception_handler


class RadarrSignalrClient(threading.Thread):
Expand All @@ -82,7 +84,8 @@ def start(self):
self.configure()
logging.debug('BAZARR connecting to Radarr SignalR feed...')
self.connection.start()
gevent.sleep()
if not args.dev:
scheduler.execute_job_now('update_movies')

def stop(self):
logging.info('BAZARR SignalR client for Radarr is now disconnected.')
Expand All @@ -92,7 +95,6 @@ def restart(self):
if self.connection.transport.state.value in [0, 1, 2]:
self.stop()
if settings.general.getboolean('use_radarr'):
self.configure()
self.start()

def configure(self):
Expand Down
7 changes: 1 addition & 6 deletions libs/signalr/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
from gevent import monkey

monkey.patch_socket()
monkey.patch_ssl()

from ._connection import Connection

__version__ = '0.0.7'
__version__ = '0.0.12'
32 changes: 20 additions & 12 deletions libs/signalr/_connection.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import gevent
import sys
from threading import Thread
from signalr.events import EventHook
from signalr.hubs import Hub
from signalr.transports import AutoTransport
Expand All @@ -14,13 +15,16 @@ def __init__(self, url, session):
self.qs = {}
self.__send_counter = -1
self.token = None
self.id = None
self.data = None
self.received = EventHook()
self.error = EventHook()
self.starting = EventHook()
self.stopping = EventHook()
self.exception = EventHook()
self.is_open = False
self.__transport = AutoTransport(session, self)
self.__greenlet = None
self.__listener_thread = None
self.started = False

def handle_error(**kwargs):
Expand All @@ -46,28 +50,32 @@ def start(self):

negotiate_data = self.__transport.negotiate()
self.token = negotiate_data['ConnectionToken']
self.id = negotiate_data['ConnectionId']

listener = self.__transport.start()

def wrapped_listener():
try:
listener()
gevent.sleep()
except Exception as e:
gevent.kill(self.__greenlet)
self.started = False

self.__greenlet = gevent.spawn(wrapped_listener)
while self.is_open:
try:
listener()
except:
self.exception.fire(*sys.exc_info())
self.is_open = False

self.is_open = True
self.__listener_thread = Thread(target=wrapped_listener)
self.__listener_thread.start()
self.started = True

def wait(self, timeout=30):
gevent.joinall([self.__greenlet], timeout)
Thread.join(self.__listener_thread, timeout)

def send(self, data):
self.__transport.send(data)

def close(self):
gevent.kill(self.__greenlet)
self.is_open = False
self.__listener_thread.join()
self.__transport.close()

def register_hub(self, name):
Expand Down
15 changes: 8 additions & 7 deletions libs/signalr/transports/_sse_transport.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import json
import sseclient
from ._transport import Transport
from requests.exceptions import ConnectionError


class ServerSentEventsTransport(Transport):
Expand All @@ -13,16 +12,18 @@ def _get_name(self):
return 'serverSentEvents'

def start(self):
self.__response = sseclient.SSEClient(self._get_url('connect'), session=self._session)
connect_url = self._get_url('connect')
self.__response = iter(sseclient.SSEClient(connect_url, session=self._session))
self._session.get(self._get_url('start'))

def _receive():
try:
for notification in self.__response:
if notification.data != 'initialized':
self._handle_notification(notification.data)
except ConnectionError:
raise ConnectionError
notification = next(self.__response)
except StopIteration:
return
else:
if notification.data != 'initialized':
self._handle_notification(notification.data)

return _receive

Expand Down
5 changes: 2 additions & 3 deletions libs/signalr/transports/_transport.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
from abc import abstractmethod
import json
import sys

import threading
if sys.version_info[0] < 3:
from urllib import quote_plus
else:
from urllib.parse import quote_plus

import gevent


class Transport:
Expand Down Expand Up @@ -48,7 +47,7 @@ def _handle_notification(self, message):
if len(message) > 0:
data = json.loads(message)
self._connection.received.fire(**data)
gevent.sleep()
#thread.sleep() #TODO: investigate if we should sleep here

def _get_url(self, action, **kwargs):
args = kwargs.copy()
Expand Down
10 changes: 3 additions & 7 deletions libs/signalr/transports/_ws_transport.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import json
import sys

import gevent

if sys.version_info[0] < 3:
from urlparse import urlparse, urlunparse
Expand Down Expand Up @@ -39,17 +38,14 @@ def start(self):
self._session.get(self._get_url('start'))

def _receive():
try:
for notification in self.ws:
self._handle_notification(notification)
except ConnectionError:
raise ConnectionError
notification = self.ws.recv()
self._handle_notification(notification)

return _receive

def send(self, data):
self.ws.send(json.dumps(data))
gevent.sleep()
#thread.sleep() #TODO: inveistage if we should sleep here or not

def close(self):
self.ws.close()
Expand Down
2 changes: 1 addition & 1 deletion libs/version.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ rarfile=3.0
rebulk=3.0.1
requests=2.18.4
semver=2.13.0
signalr-client=0.0.7 <-- Modified to work with Sonarr
signalr-client-threads=0.0.12 <-- Modified to work with Sonarr
signalrcore=0.9.2 <-- https://github.com/mandrewcito/signalrcore/pull/60
SimpleConfigParser=0.1.0 <-- modified version: do not update!!!
six=1.11.0
Expand Down

0 comments on commit 44dd478

Please sign in to comment.