Permalink
Browse files

Merge pull request #330 from paulswartz/bz19008

bz19008. timeout MutagenTask (if supported)
  • Loading branch information...
2 parents 8baeef5 + 1d6fa4c commit 9f9c47fb30dc7c16f89e7f62379882e0817170a3 @bendk bendk committed Apr 13, 2012
Showing with 71 additions and 9 deletions.
  1. +27 −0 tv/lib/test/utiltest.py
  2. +18 −0 tv/lib/util.py
  3. +26 −9 tv/lib/workerprocess.py
View
27 tv/lib/test/utiltest.py
@@ -5,6 +5,7 @@
import os
import tempfile
import time
+import signal
import shutil
import unittest
import sys
@@ -1087,3 +1088,29 @@ def invalidator(key):
self.assertEquals(self.cache.get(1, invalidator=invalidator),
(1, 1))
+
+class AlarmTestCase(MiroTestCase):
+
+ @staticmethod
+ def _long_function():
+ time.sleep(1.5)
+ return True
+
+ def _wrapped_function(self, set_signal=True):
+ with util.alarm(1, set_signal=set_signal):
+ return self._long_function()
+
+ if hasattr(signal, 'SIGALRM'):
+ def test_alarm_works(self):
+ self.assertRaises(IOError, self._wrapped_function)
+
+ def test_context_manager__True(self):
+ with util.alarm(1) as result:
+ self.assertTrue(result)
+
+ def test_alarm_noop(self):
+ self.assertTrue(self._wrapped_function(set_signal=False))
+
+ def test_context_manager__False(self):
+ with util.alarm(0, set_signal=False) as result:
+ self.assertFalse(result)
View
18 tv/lib/util.py
@@ -35,6 +35,7 @@
from hashlib import sha1 as sha
from StringIO import StringIO
+import contextlib
import itertools
import logging
import os
@@ -43,6 +44,7 @@
import shutil
import socket
import string
+import signal
import subprocess
import sys
import tempfile
@@ -1348,3 +1350,19 @@ def next_free_directory(name):
candidate = candidates.next()
if not os.path.exists(candidate):
return candidate
+
+@contextlib.contextmanager
+def alarm(timeout, set_signal=True):
+ def alarm_handler(signum, frame):
+ raise IOError('timeout after %i seconds' % timeout)
+ if set_signal:
+ set_signal = hasattr(signal, 'SIGALRM')
+ if set_signal:
+ signal.signal(signal.SIGALRM, alarm_handler)
+ signal.alarm(timeout)
+ yield set_signal
+ if set_signal:
+ signal.alarm(0)
+
+def supports_alarm():
+ return hasattr(signal, 'SIGALRM')
View
35 tv/lib/workerprocess.py
@@ -127,7 +127,8 @@ def __init__(self):
subprocessmanager.SubprocessHandler.__init__(self)
self.threads = []
self.task_queue = WorkerTaskQueue()
- self.pending_moviedata_tasks = deque()
+ self.main_thread_tasks = deque()
+ self.supports_alarm = util.supports_alarm()
def call_handler(self, method, msg):
try:
@@ -137,9 +138,17 @@ def call_handler(self, method, msg):
elif isinstance(msg, MovieDataProgramTask):
# we have to handle this message on this thread, since
# QtKit will break if we use it on any thread except the main
- # one. Put it in pending_moviedata_tasks and handle once
+ # one. Put it in main_thread_tasks and handle once
# there's no more tasks waiting in to be processed
- self.pending_moviedata_tasks.append((method, msg))
+ self.main_thread_tasks.append((method, msg))
+ elif isinstance(msg, MutagenTask):
+ # If we're using the alarm, then MutagenTasks need to run in
+ # the main thread as well. Signals aren't support outside of
+ # the main thread.
+ if self.supports_alarm:
+ self.main_thread_tasks.append((method, msg))
+ else:
+ self.task_queue.add_task(method, msg)
elif isinstance(msg, TaskMessage):
self.task_queue.add_task(method, msg)
else:
@@ -150,8 +159,12 @@ def call_handler(self, method, msg):
def get_task_from_queue(self, queue):
# handle movie data tasks if no more tasks are coming in right now
ran_movie_data = False
- while queue.empty() and self.pending_moviedata_tasks:
- method, msg = self.pending_moviedata_tasks.popleft()
+ while queue.empty() and self.main_thread_tasks:
+ method, msg = self.main_thread_tasks.popleft()
+ if isinstance(msg, MutagenTask):
+ # if we're here, it means we want to use the signals
+ handle_task(self.handle_mutagen_task_with_alarm, msg)
+ continue
MovieDataTaskStatus(msg.task_id).send_to_main_process()
ran_movie_data = True
handle_task(method, msg)
@@ -164,7 +177,7 @@ def get_task_from_queue(self, queue):
# following is True
# - The queue has a message in it and therefore
# get_task_from_queue() will be called again soon
- # - pending_moviedata_tasks is empty
+ # - main_thread_tasks is empty
#
# So we don't have to worry about the blocking call preventing the
# MovieDataProgramTasks from running
@@ -184,11 +197,11 @@ def handle_worker_startup_info(self, msg):
def handle_cancel_file_operations(self, msg):
path_set = set(msg.paths)
self.task_queue.cancel_file_operations(path_set)
- # we need to handle pending_moviedata_tasks, since those skip the task
+ # we need to handle main_thread_tasks, since those skip the task
# queue
- filtered_tasks = deque(t for t in self.pending_moviedata_tasks
+ filtered_tasks = deque(t for t in self.main_thread_tasks
if t.source_path not in path_set)
- self.pending_moviedata_tasks = filtered_tasks
+ self.main_thread_tasks = filtered_tasks
return None
# handle_movie_data_program_task gets called in the main thread, unlike
@@ -212,6 +225,10 @@ def handle_feedparser_task(self, msg):
def handle_mutagen_task(self, msg):
return filetags.process_file(msg.source_path, msg.cover_art_directory)
+ def handle_mutagen_task_with_alarm(self, msg):
+ with util.alarm(2):
+ return self.handle_mutagen_task(msg)
+
class _SinglePriorityQueue(object):
"""Manages tasks at a single priority for WorkerTaskQueue

0 comments on commit 9f9c47f

Please sign in to comment.