Skip to content

Commit

Permalink
Added a ForcedSearchQueue scheduler for processing: forced search (an…
Browse files Browse the repository at this point in the history
…d manual search) queue items and FailedQueueItems. These can be run in parallel with the searchQueueScheduler thread (backlog and daily).

* Fixes regarding checking if a forceSearch is currently running.
* Including some name changes from manual search to forced search.
* Added docstrings
  • Loading branch information
p0psicles committed Apr 19, 2016
1 parent b776d65 commit d999e08
Show file tree
Hide file tree
Showing 10 changed files with 124 additions and 58 deletions.
10 changes: 5 additions & 5 deletions gui/slick/views/manage_manageSearches.mako
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ ${('Not in progress', 'In Progress')[dailySearchStatus]}<br>
<br>
<h3>Search Queue:</h3>
Backlog: <i>${queueLength['backlog']} pending items</i><br>
Daily: <i>${queueLength['daily']} pending items</i><br>
Forced: <i>${queueLength['forced_search']} pending items</i><br>
Manual: <i>${queueLength['manual_search']} pending items</i><br>
Failed: <i>${queueLength['failed']} pending items</i><br>
Backlog: <i>${searchQueueLength['backlog']} pending items</i><br>
Daily: <i>${searchQueueLength['daily']} pending items</i><br>
Forced: <i>${forcedSearchQueueLength['forced_search']} pending items</i><br>
Manual: <i>${forcedSearchQueueLength['manual_search']} pending items</i><br>
Failed: <i>${forcedSearchQueueLength['failed']} pending items</i><br>
</div>
</div>
</%block>
12 changes: 11 additions & 1 deletion sickbeard/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
versionCheckScheduler = None
showQueueScheduler = None
searchQueueScheduler = None
forcedSearchQueueScheduler = None
manualSnatchScheduler = None
properFinderScheduler = None
autoPostProcesserScheduler = None
Expand Down Expand Up @@ -631,7 +632,7 @@ def initialize(consoleLogging=True): # pylint: disable=too-many-locals, too-man
USE_PUSHBULLET, PUSHBULLET_NOTIFY_ONSNATCH, PUSHBULLET_NOTIFY_ONDOWNLOAD, PUSHBULLET_NOTIFY_ONSUBTITLEDOWNLOAD, PUSHBULLET_API, PUSHBULLET_DEVICE, \
versionCheckScheduler, VERSION_NOTIFY, AUTO_UPDATE, NOTIFY_ON_UPDATE, PROCESS_AUTOMATICALLY, NO_DELETE, UNPACK, CPU_PRESET, \
KEEP_PROCESSED_DIR, PROCESS_METHOD, DELRARCONTENTS, TV_DOWNLOAD_DIR, UPDATE_FREQUENCY, \
showQueueScheduler, searchQueueScheduler, manualSnatchScheduler, ROOT_DIRS, CACHE_DIR, ACTUAL_CACHE_DIR, TIMEZONE_DISPLAY, \
showQueueScheduler, searchQueueScheduler, forcedSearchQueueScheduler, manualSnatchScheduler, ROOT_DIRS, CACHE_DIR, ACTUAL_CACHE_DIR, TIMEZONE_DISPLAY, \
NAMING_PATTERN, NAMING_MULTI_EP, NAMING_ANIME_MULTI_EP, NAMING_FORCE_FOLDERS, NAMING_ABD_PATTERN, NAMING_CUSTOM_ABD, NAMING_SPORTS_PATTERN, NAMING_CUSTOM_SPORTS, NAMING_ANIME_PATTERN, NAMING_CUSTOM_ANIME, NAMING_STRIP_YEAR, \
RENAME_EPISODES, AIRDATE_EPISODES, FILE_TIMESTAMP_TIMEZONE, properFinderScheduler, PROVIDER_ORDER, autoPostProcesserScheduler, \
providerList, newznabProviderList, torrentRssProviderList, \
Expand Down Expand Up @@ -1480,6 +1481,10 @@ def path_leaf(path):
cycleTime=datetime.timedelta(seconds=3),
threadName="SEARCHQUEUE")

forcedSearchQueueScheduler = scheduler.Scheduler(search_queue.ForcedSearchQueue(),
cycleTime=datetime.timedelta(seconds=3),
threadName="FORCEDSEARCHQUEUE")

# TODO: update_interval should take last daily/backlog times into account!
update_interval = datetime.timedelta(minutes=DAILYSEARCH_FREQUENCY)
dailySearchScheduler = scheduler.Scheduler(dailysearcher.DailySearcher(),
Expand Down Expand Up @@ -1564,6 +1569,10 @@ def start():
searchQueueScheduler.enable = True
searchQueueScheduler.start()

# start the forced search queue checker
forcedSearchQueueScheduler.enable = True
forcedSearchQueueScheduler.start()

# start the search queue checker
manualSnatchScheduler.enable = True
manualSnatchScheduler.start()
Expand Down Expand Up @@ -1623,6 +1632,7 @@ def halt():
versionCheckScheduler,
showQueueScheduler,
searchQueueScheduler,
forcedSearchQueueScheduler,
manualSnatchScheduler,
autoPostProcesserScheduler,
traktCheckerScheduler,
Expand Down
3 changes: 1 addition & 2 deletions sickbeard/dailysearcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,10 @@ def run(self, force=False): # pylint:disable=too-many-branches
logger.log(u"Daily search is still running, not starting it again", logger.DEBUG)
return

if sickbeard.searchQueueScheduler.action.is_manualsearch_in_progress():
if sickbeard.forcedSearchQueueScheduler.action.is_forced_search_in_progress():
logger.log(u"Manual search is running. Can't start Daily search", logger.WARNING)
return


self.amActive = True
_ = force
logger.log(u"Searching for new released episodes ...")
Expand Down
2 changes: 1 addition & 1 deletion sickbeard/failedProcessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def process(self):
segment = parsed.show.getEpisode(parsed.season_number, episode)

cur_failed_queue_item = search_queue.FailedQueueItem(parsed.show, [segment])
sickbeard.searchQueueScheduler.action.add_item(cur_failed_queue_item)
sickbeard.forcedSearchQueueScheduler.action.add_item(cur_failed_queue_item)

return True

Expand Down
14 changes: 7 additions & 7 deletions sickbeard/manual_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,20 +114,21 @@ def getEpisodes(search_thread, searchstatus):

def collectEpisodesFromSearchThread(show):
"""
Collects all episodes from from the searchQueueScheduler and looks for episodes that are in status queued or searching.
Collects all episodes from the forcedSearchQueueScheduler
and looks for episodes that are in status queued or searching.
If episodes are found in FORCED_SEARCH_HISTORY, these are set to status finished.
"""
episodes = []

# Queued Searches
searchstatus = SEARCH_STATUS_QUEUED
for search_thread in sickbeard.searchQueueScheduler.action.get_all_ep_from_queue(show):
for search_thread in sickbeard.forcedSearchQueueScheduler.action.get_all_ep_from_queue(show):
episodes += getEpisodes(search_thread, searchstatus)

# Running Searches
searchstatus = SEARCH_STATUS_SEARCHING
if sickbeard.searchQueueScheduler.action.is_manualsearch_in_progress():
search_thread = sickbeard.searchQueueScheduler.action.currentItem
if sickbeard.forcedSearchQueueScheduler.action.is_forced_search_in_progress():
search_thread = sickbeard.forcedSearchQueueScheduler.action.currentItem

if search_thread.success:
searchstatus = SEARCH_STATUS_FINISHED
Expand Down Expand Up @@ -162,7 +163,6 @@ def get_provider_cache_results(indexer, show_all_results=None, perform_search=No
manual_search_type = search_show.get('manual_search_type')
sql_episode = '' if manual_search_type == 'season' else episode


down_cur_quality = 0
show_obj = Show.find(sickbeard.showList, int(show))

Expand Down Expand Up @@ -195,7 +195,7 @@ def get_provider_cache_results(indexer, show_all_results=None, perform_search=No
if not int(show_all_results):
sql_return = main_db_con.select(common_sql + additional_sql,
(cur_provider.provider_type.title(), cur_provider.image_name(),
cur_provider.name, cur_provider.get_id(),
cur_provider.name, cur_provider.get_id(),
minseed, minleech, show, "%|{0}|%".format(sql_episode), season))
else:
sql_return = main_db_con.select(common_sql,
Expand Down Expand Up @@ -224,7 +224,7 @@ def get_provider_cache_results(indexer, show_all_results=None, perform_search=No
# make a queue item for it and put it on the queue
ep_queue_item = search_queue.ForcedSearchQueueItem(ep_obj.show, ep_obj, bool(int(down_cur_quality)), True, manual_search_type) # pylint: disable=maybe-no-member

sickbeard.searchQueueScheduler.action.add_item(ep_queue_item)
sickbeard.forcedSearchQueueScheduler.action.add_item(ep_queue_item)

# give the CPU a break and some time to start the queue
time.sleep(cpu_presets[sickbeard.CPU_PRESET])
Expand Down
4 changes: 2 additions & 2 deletions sickbeard/properFinder.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ def run(self, force=False): # pylint: disable=unused-argument
:param force: Start even if already running (currently not used, defaults to False)
"""
logger.log(u"Beginning the search for new propers")

if self.amActive:
logger.log(u"Find propers is still running, not starting it again", logger.DEBUG)
return

if sickbeard.searchQueueScheduler.action.is_manualsearch_in_progress():
if sickbeard.forcedSearchQueueScheduler.action.is_forced_search_in_progress():
logger.log(u"Manual search is running. Can't start Find propers", logger.WARNING)
return

Expand Down
4 changes: 2 additions & 2 deletions sickbeard/searchBacklog.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ def searchBacklog(self, which_shows=None):
if self.amActive:
logger.log(u"Backlog is still running, not starting it again", logger.DEBUG)
return
if sickbeard.searchQueueScheduler.action.is_manualsearch_in_progress():

if sickbeard.forcedSearchQueueScheduler.action.is_forced_search_in_progress():
logger.log(u"Manual search is running. Can't start Backlog Search", logger.WARNING)
return

Expand Down
120 changes: 87 additions & 33 deletions sickbeard/search_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,25 +52,6 @@ def is_in_queue(self, show, segment):
return True
return False

def is_ep_in_queue(self, segment):
for cur_item in self.queue:
if isinstance(cur_item, (ForcedSearchQueueItem, FailedQueueItem)) and cur_item.segment == segment:
return True
return False

def is_show_in_queue(self, show):
for cur_item in self.queue:
if isinstance(cur_item, (ForcedSearchQueueItem, FailedQueueItem)) and cur_item.show.indexerid == show:
return True
return False

def get_all_ep_from_queue(self, show):
ep_obj_list = []
for cur_item in self.queue:
if isinstance(cur_item, (ForcedSearchQueueItem, FailedQueueItem)) and str(cur_item.show.indexerid) == show:
ep_obj_list.append(cur_item)
return ep_obj_list

def pause_backlog(self):
self.min_priority = generic_queue.QueuePriorities.HIGH

Expand All @@ -81,12 +62,6 @@ def is_backlog_paused(self):
# backlog priorities are NORMAL, this should be done properly somewhere
return self.min_priority >= generic_queue.QueuePriorities.NORMAL

def is_manualsearch_in_progress(self):
# Only referenced in webserve.py, only current running manualsearch or failedsearch is needed!!
if isinstance(self.currentItem, (ForcedSearchQueueItem, FailedQueueItem)):
return True
return False

def is_backlog_in_progress(self):
for cur_item in self.queue + [self.currentItem]:
if isinstance(cur_item, BacklogQueueItem):
Expand All @@ -100,18 +75,12 @@ def is_dailysearch_in_progress(self):
return False

def queue_length(self):
length = {'backlog': 0, 'daily': 0, 'forced_search': 0, 'manual_search': 0, 'failed': 0}
length = {'backlog': 0, 'daily': 0}
for cur_item in self.queue:
if isinstance(cur_item, DailySearchQueueItem):
length['daily'] += 1
elif isinstance(cur_item, BacklogQueueItem):
length['backlog'] += 1
elif isinstance(cur_item, FailedQueueItem):
length['failed'] += 1
elif isinstance(cur_item, ForcedSearchQueueItem) and not cur_item.manual_search:
length['forced_search'] += 1
elif isinstance(cur_item, ForcedSearchQueueItem) and cur_item.manual_search:
length['manual_search'] += 1
return length

def add_item(self, item):
Expand All @@ -121,7 +90,92 @@ def add_item(self, item):
elif isinstance(item, BacklogQueueItem) and not self.is_in_queue(item.show, item.segment):
# backlog searches
generic_queue.GenericQueue.add_item(self, item)
elif isinstance(item, (ForcedSearchQueueItem, FailedQueueItem)) and not self.is_ep_in_queue(item.segment):
else:
logger.log(u"Not adding item, it's already in the queue", logger.DEBUG)

def force_daily(self):
    """Force the daily search to run on the next queue cycle.

    Only schedules the forced run when no daily search is already in
    progress and the item currently being processed (if any) is idle.

    :return: True if the forced run was scheduled, False otherwise
    """
    # BUG FIX: the original tested `self.is_dailysearch_in_progress` without
    # calling it — a bound method is always truthy, so the condition could
    # never succeed and self.force was never set. Also guard against
    # currentItem being None when the queue is idle.
    current_item_active = self.currentItem is not None and self.currentItem.amActive
    if not self.is_dailysearch_in_progress() and not current_item_active:
        self.force = True
        return True
    return False


class ForcedSearchQueue(generic_queue.GenericQueue):
    """Search queue used for forced search, manual search and failed search items.

    Holds ForcedSearchQueueItem and FailedQueueItem objects, allowing them to be
    processed in parallel with the regular search queue (backlog and daily).
    """

    def __init__(self):
        """Initialize the ForcedSearchQueue with its own queue name."""
        generic_queue.GenericQueue.__init__(self)
        self.queue_name = "FORCEDSEARCHQUEUE"

    def is_in_queue(self, show, segment):
        """Verify if a search for this show and segment is already scheduled.

        :param show: show object to look for
        :param segment: episode (or number of episodes) to look for
        :return: True if a matching queue item exists, False otherwise
        """
        return any(cur_item.show == show and cur_item.segment == segment
                   for cur_item in self.queue)

    def is_ep_in_queue(self, segment):
        """Verify if the segment (episode or number of episodes) is scheduled
        in a ForcedSearchQueueItem or FailedQueueItem.

        :param segment: episode (or number of episodes) to look for
        :return: True if a matching queue item exists, False otherwise
        """
        return any(isinstance(cur_item, (ForcedSearchQueueItem, FailedQueueItem))
                   and cur_item.segment == segment
                   for cur_item in self.queue)

    def is_show_in_queue(self, show):
        """Verify if the show is queued as a ForcedSearchQueueItem or FailedQueueItem.

        :param show: show indexer_id
        :return: True if a matching queue item exists, False otherwise
        """
        return any(isinstance(cur_item, (ForcedSearchQueueItem, FailedQueueItem))
                   and cur_item.show.indexerid == show
                   for cur_item in self.queue)

    def get_all_ep_from_queue(self, show):
        """Get all queue items scheduled to search for the passed show.

        :param show: show indexer_id (as a string — compared against
            str(item.show.indexerid))
        :return: list of ForcedSearchQueueItem or FailedQueueItem objects

        TODO: in future a show object should be passed instead of the
        indexer_id, as we might migrate to a system with multiple
        indexer_id's for one added show.
        """
        return [cur_item for cur_item in self.queue
                if isinstance(cur_item, (ForcedSearchQueueItem, FailedQueueItem))
                and str(cur_item.show.indexerid) == show]

    def is_backlog_paused(self):
        """Verify if this queue's min_priority has been raised, which indicates
        that the queue has been paused.

        NOTE: backlog priorities are NORMAL; this should be done properly somewhere.
        """
        return self.min_priority >= generic_queue.QueuePriorities.NORMAL

    def is_forced_search_in_progress(self):
        """Test whether a forced (or failed) search is currently running.

        Only inspects the item currently being processed — it does not check
        what is still waiting in the queue.
        """
        return isinstance(self.currentItem, (ForcedSearchQueueItem, FailedQueueItem))

    def queue_length(self):
        """Count the pending queue items per category.

        :return: dict with counts for 'forced_search', 'manual_search'
            and 'failed'
        """
        length = {'forced_search': 0, 'manual_search': 0, 'failed': 0}
        for cur_item in self.queue:
            if isinstance(cur_item, FailedQueueItem):
                length['failed'] += 1
            elif isinstance(cur_item, ForcedSearchQueueItem):
                # A ForcedSearchQueueItem doubles as a manual search when its
                # manual_search flag is set.
                if cur_item.manual_search:
                    length['manual_search'] += 1
                else:
                    length['forced_search'] += 1
        return length

def add_item(self, item):
"""Add a new ForcedSearchQueueItem or FailedQueueItem to the ForcedSearchQueue"""
if isinstance(item, (ForcedSearchQueueItem, FailedQueueItem)) and not self.is_ep_in_queue(item.segment):
# manual, snatch and failed searches
generic_queue.GenericQueue.add_item(self, item)
else:
Expand Down
2 changes: 1 addition & 1 deletion sickbeard/webapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,7 @@ def run(self):

# make a queue item for it and put it on the queue
ep_queue_item = search_queue.ForcedSearchQueueItem(show_obj, ep_obj)
sickbeard.searchQueueScheduler.action.add_item(ep_queue_item) # @UndefinedVariable
sickbeard.forcedSearchQueueScheduler.action.add_item(ep_queue_item) # @UndefinedVariable

# wait until the queue item tells us whether it worked or not
while ep_queue_item.success is None: # @UndefinedVariable
Expand Down
11 changes: 7 additions & 4 deletions sickbeard/webserve.py
Original file line number Diff line number Diff line change
Expand Up @@ -2275,7 +2275,7 @@ def searchEpisode(self, show=None, season=None, episode=None, manual_search=None
# make a queue item for it and put it on the queue
ep_queue_item = search_queue.ForcedSearchQueueItem(ep_obj.show, ep_obj, bool(int(down_cur_quality)), bool(manual_search))

sickbeard.searchQueueScheduler.action.add_item(ep_queue_item)
sickbeard.forcedSearchQueueScheduler.action.add_item(ep_queue_item)

# give the CPU a break and some time to start the queue
time.sleep(cpu_presets[sickbeard.CPU_PRESET])
Expand Down Expand Up @@ -2417,7 +2417,7 @@ def retryEpisode(self, show, season, episode, down_cur_quality=0):

# make a queue item for it and put it on the queue
ep_queue_item = search_queue.FailedQueueItem(ep_obj.show, [ep_obj], bool(int(down_cur_quality))) # pylint: disable=no-member
sickbeard.searchQueueScheduler.action.add_item(ep_queue_item)
sickbeard.forcedSearchQueueScheduler.action.add_item(ep_queue_item)

if not ep_queue_item.started and ep_queue_item.success is None:
return json.dumps(
Expand Down Expand Up @@ -3864,8 +3864,11 @@ def index(self):
# t.backlogPI = sickbeard.backlogSearchScheduler.action.getProgressIndicator()

return t.render(backlogPaused=sickbeard.searchQueueScheduler.action.is_backlog_paused(),
backlogRunning=sickbeard.searchQueueScheduler.action.is_backlog_in_progress(), dailySearchStatus=sickbeard.dailySearchScheduler.action.amActive,
findPropersStatus=sickbeard.properFinderScheduler.action.amActive, queueLength=sickbeard.searchQueueScheduler.action.queue_length(),
backlogRunning=sickbeard.searchQueueScheduler.action.is_backlog_in_progress(),
dailySearchStatus=sickbeard.dailySearchScheduler.action.amActive,
findPropersStatus=sickbeard.properFinderScheduler.action.amActive,
searchQueueLength=sickbeard.searchQueueScheduler.action.queue_length(),
forcedSearchQueueLength=sickbeard.forcedSearchQueueScheduler.action.queue_length(),
subtitlesFinderStatus=sickbeard.subtitlesFinderScheduler.action.amActive,
title='Manage Searches', header='Manage Searches', topmenu='manage',
controller="manage", action="manageSearches")
Expand Down

0 comments on commit d999e08

Please sign in to comment.