Permalink
Browse files

Add support for SUBMIT_JOB_EPOCH

* submit_job now accepts a 'when_to_run' parameter to take advantage of
  Gearman's ability to schedule jobs to be run in the future

Conflicts:
	gearman/protocol.py
	gearman/worker.py
  • Loading branch information...
1 parent 9a6891a commit ceb37df16afdb23840272813fa54e096f6ecdfb8 @jturmel jturmel committed with May 30, 2013
Showing with 60 additions and 32 deletions.
  1. +1 −1 gearman/__init__.py
  2. +20 −6 gearman/client.py
  3. +6 −3 gearman/client_handler.py
  4. +8 −6 gearman/job.py
  5. +16 −10 gearman/protocol.py
  6. +4 −1 gearman/worker.py
  7. +3 −3 tests/_core_testing.py
  8. +2 −2 tests/client_tests.py
View
@@ -18,4 +18,4 @@ def emit(self, record):
pass
gearman_root_logger = logging.getLogger('gearman')
-gearman_root_logger.addHandler(NullHandler())
+gearman_root_logger.addHandler(NullHandler())
View
@@ -31,16 +31,22 @@ def __init__(self, host_list=None, random_unique_bytes=RANDOM_UNIQUE_BYTES):
# Ignores the fact if a request has been bound to a connection or not
self.request_to_rotating_connection_queue = compat.defaultdict(collections.deque)
- def submit_job(self, task, data, unique=None, priority=PRIORITY_NONE, background=False, wait_until_complete=True, max_retries=0, poll_timeout=None):
+ def submit_job(self, task, data, unique=None, priority=PRIORITY_NONE, when_to_run=None, background=False, wait_until_complete=True, max_retries=0, poll_timeout=None):
"""Submit a single job to any gearman server"""
- job_info = dict(task=task, data=data, unique=unique, priority=priority)
+ assert (when_to_run is None or
+ (when_to_run and
+ priority == PRIORITY_NONE and
+ not background)
+ ), "priority and background cannot be set with when_to_run"
+
+ job_info = dict(task=task, data=data, unique=unique, priority=priority, when_to_run=when_to_run)
completed_job_list = self.submit_multiple_jobs([job_info], background=background, wait_until_complete=wait_until_complete, max_retries=max_retries, poll_timeout=poll_timeout)
return gearman.util.unlist(completed_job_list)
def submit_multiple_jobs(self, jobs_to_submit, background=False, wait_until_complete=True, max_retries=0, poll_timeout=None):
"""Takes a list of jobs_to_submit with dicts of
- {'task': task, 'data': data, 'unique': unique, 'priority': priority}
+ {'task': task, 'data': data, 'unique': unique, 'priority': priority, 'when_to_run': when_to_run}
"""
assert type(jobs_to_submit) in (list, tuple, set), "Expected multiple jobs, received 1?"
@@ -164,20 +170,28 @@ def continue_while_status_not_updated(any_activity):
return job_requests
def _create_request_from_dictionary(self, job_info, background=False, max_retries=0):
- """Takes a dictionary with fields {'task': task, 'unique': unique, 'data': data, 'priority': priority, 'background': background}"""
+ """Takes a dictionary with fields {'task': task, 'unique': unique, 'data': data, 'priority': priority, 'when_to_run': when_to_run, 'background': background}"""
# Make sure we have a unique identifier for ALL our tasks
job_unique = job_info.get('unique')
if job_unique == '-':
job_unique = job_info['data']
elif not job_unique:
job_unique = os.urandom(self.random_unique_bytes).encode('hex')
- current_job = self.job_class(connection=None, handle=None, task=job_info['task'], unique=job_unique, data=job_info['data'])
+ run_later = False
+ if job_info.get('when_to_run'):
+ job_info['when_to_run'] = str(job_info.get('when_to_run'))
+ run_later = True
+ # run_later jobs always are in the background, so set to True
+ background = True
+ job_info['background'] = background
+
+ current_job = self.job_class(connection=None, handle=None, task=job_info['task'], unique=job_unique, when_to_run=job_info.get('when_to_run'), data=job_info['data'])
initial_priority = job_info.get('priority', PRIORITY_NONE)
max_attempts = max_retries + 1
- current_request = self.job_request_class(current_job, initial_priority=initial_priority, background=background, max_attempts=max_attempts)
+ current_request = self.job_request_class(current_job, initial_priority=initial_priority, background=background, run_later=run_later, max_attempts=max_attempts)
return current_request
def establish_request_connection(self, current_request):
@@ -5,7 +5,7 @@
from gearman.command_handler import GearmanCommandHandler
from gearman.constants import JOB_UNKNOWN, JOB_PENDING, JOB_CREATED, JOB_FAILED, JOB_COMPLETE
from gearman.errors import InvalidClientState
-from gearman.protocol import GEARMAN_COMMAND_GET_STATUS, submit_cmd_for_background_priority
+from gearman.protocol import GEARMAN_COMMAND_GET_STATUS, submit_cmd_for_background_priority_run_later
gearman_logger = logging.getLogger(__name__)
@@ -28,10 +28,13 @@ def send_job_request(self, current_request):
gearman_job = current_request.job
# Handle the I/O for requesting a job - determine which COMMAND we need to send
- cmd_type = submit_cmd_for_background_priority(current_request.background, current_request.priority)
+ cmd_type = submit_cmd_for_background_priority_run_later(current_request.background, current_request.priority, current_request.run_later)
outbound_data = self.encode_data(gearman_job.data)
- self.send_command(cmd_type, task=gearman_job.task, unique=gearman_job.unique, data=outbound_data)
+ if current_request.run_later:
+ self.send_command(cmd_type, task=gearman_job.task, unique=gearman_job.unique, when_to_run=gearman_job.when_to_run, data=outbound_data)
+ else:
+ self.send_command(cmd_type, task=gearman_job.task, unique=gearman_job.unique, data=outbound_data)
# Once this command is sent, our request needs to wait for a handle
current_request.state = JOB_PENDING
View
@@ -3,27 +3,29 @@
class GearmanJob(object):
"""Represents the basics of a job... used in GearmanClient / GearmanWorker to represent job states"""
- def __init__(self, connection, handle, task, unique, data):
+ def __init__(self, connection, handle, task, unique, data, when_to_run):
self.connection = connection
self.handle = handle
self.task = task
self.unique = unique
self.data = data
+ self.when_to_run = when_to_run
def to_dict(self):
- return dict(task=self.task, job_handle=self.handle, unique=self.unique, data=self.data)
+ return dict(task=self.task, job_handle=self.handle, unique=self.unique, data=self.data, when_to_run=self.when_to_run)
def __repr__(self):
- return '<GearmanJob connection/handle=(%r, %r), task=%s, unique=%s, data=%r>' % (self.connection, self.handle, self.task, self.unique, self.data)
+ return '<GearmanJob connection/handle=(%r, %r), task=%s, unique=%s, data=%r, when_to_run=%d>' % (self.connection, self.handle, self.task, self.unique, self.data, self.when_to_run)
class GearmanJobRequest(object):
"""Represents a job request... used in GearmanClient to represent job states"""
- def __init__(self, gearman_job, initial_priority=PRIORITY_NONE, background=False, max_attempts=1):
+ def __init__(self, gearman_job, initial_priority=PRIORITY_NONE, background=False, run_later=False, max_attempts=1):
self.gearman_job = gearman_job
self.priority = initial_priority
self.background = background
+ self.run_later = run_later
self.connection_attempts = 0
self.max_connection_attempts = max_attempts
@@ -79,5 +81,5 @@ def complete(self):
return actually_complete
def __repr__(self):
- formatted_representation = '<GearmanJobRequest task=%r, unique=%r, priority=%r, background=%r, state=%r, timed_out=%r>'
- return formatted_representation % (self.job.task, self.job.unique, self.priority, self.background, self.state, self.timed_out)
+ formatted_representation = '<GearmanJobRequest task=%r, unique=%r, when_to_run=%r, priority=%r, background=%r, run_later=%r, state=%r, timed_out=%r>'
+ return formatted_representation % (self.job.task, self.job.unique, self.job.when_to_run, self.priority, self.background, self.run_later, self.state, self.timed_out)
View
@@ -49,8 +49,9 @@
GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG = 32
GEARMAN_COMMAND_SUBMIT_JOB_LOW = 33
GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG = 34
+GEARMAN_COMMAND_SUBMIT_JOB_EPOCH = 36
-# Fake command code
+# Fake command code
GEARMAN_COMMAND_TEXT_COMMAND = 9999
GEARMAN_PARAMS_FOR_COMMAND = {
@@ -95,6 +96,8 @@
GEARMAN_COMMAND_SUBMIT_JOB_LOW: ['task', 'unique', 'data'],
GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG: ['task', 'unique', 'data'],
+ GEARMAN_COMMAND_SUBMIT_JOB_EPOCH: ['task', 'unique', 'when_to_run', 'data'],
+
# Fake gearman command
GEARMAN_COMMAND_TEXT_COMMAND: ['raw_text']
}
@@ -140,6 +143,8 @@
GEARMAN_COMMAND_SUBMIT_JOB_LOW: 'GEARMAN_COMMAND_SUBMIT_JOB_LOW',
GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG: 'GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG',
+ GEARMAN_COMMAND_SUBMIT_JOB_EPOCH: 'GEARMAN_COMMAND_SUBMIT_JOB_EPOCH',
+
GEARMAN_COMMAND_TEXT_COMMAND: 'GEARMAN_COMMAND_TEXT_COMMAND'
}
@@ -152,16 +157,17 @@
def get_command_name(cmd_type):
return GEARMAN_COMMAND_TO_NAME.get(cmd_type, cmd_type)
-def submit_cmd_for_background_priority(background, priority):
+def submit_cmd_for_background_priority_run_later(background, priority, run_later):
cmd_type_lookup = {
- (True, PRIORITY_NONE): GEARMAN_COMMAND_SUBMIT_JOB_BG,
- (True, PRIORITY_LOW): GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG,
- (True, PRIORITY_HIGH): GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG,
- (False, PRIORITY_NONE): GEARMAN_COMMAND_SUBMIT_JOB,
- (False, PRIORITY_LOW): GEARMAN_COMMAND_SUBMIT_JOB_LOW,
- (False, PRIORITY_HIGH): GEARMAN_COMMAND_SUBMIT_JOB_HIGH
+ (True, PRIORITY_NONE, False): GEARMAN_COMMAND_SUBMIT_JOB_BG,
+ (True, PRIORITY_LOW, False): GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG,
+ (True, PRIORITY_HIGH, False): GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG,
+ (False, PRIORITY_NONE, False): GEARMAN_COMMAND_SUBMIT_JOB,
+ (False, PRIORITY_LOW, False): GEARMAN_COMMAND_SUBMIT_JOB_LOW,
+ (False, PRIORITY_HIGH, False): GEARMAN_COMMAND_SUBMIT_JOB_HIGH,
+ (True, PRIORITY_NONE, True): GEARMAN_COMMAND_SUBMIT_JOB_EPOCH
}
- lookup_tuple = (background, priority)
+ lookup_tuple = (background, priority, run_later)
cmd_type = cmd_type_lookup[lookup_tuple]
return cmd_type
@@ -277,4 +283,4 @@ def pack_text_command(cmd_type, cmd_args):
if cmd_line is None:
raise ProtocolError('Did not receive arguments any valid arguments: %s' % cmd_args)
- return str(cmd_line)
+ return str(cmd_line)
View
@@ -186,7 +186,7 @@ def send_job_warning(self, current_job, data, poll_timeout=None):
def create_job(self, command_handler, job_handle, task, unique, data):
"""Create a new job using our self.job_class"""
current_connection = self.handler_to_connection_map[command_handler]
- return self.job_class(current_connection, job_handle, task, unique, data)
+ return self.job_class(current_connection, job_handle, task, unique, data, None)
def on_job_execute(self, current_job):
try:
@@ -225,6 +225,9 @@ def set_job_lock(self, command_handler, lock):
return True
+ def has_job_lock(self):
+ return bool(self.command_handler_holding_job_lock is not None)
+
def check_job_lock(self, command_handler):
"""Check to see if we hold the job lock"""
return bool(self.command_handler_holding_job_lock == command_handler)
@@ -71,15 +71,15 @@ def setup_command_handler(self):
self.command_handler = self.connection_manager.connection_to_handler_map[self.connection]
def generate_job(self):
- return self.job_class(self.connection, handle=str(random.random()), task='__test_ability__', unique=str(random.random()), data=str(random.random()))
+ return self.job_class(self.connection, handle=str(random.random()), task='__test_ability__', unique=str(random.random()), data=str(random.random()), when_to_run=None)
def generate_job_dict(self):
current_job = self.generate_job()
return current_job.to_dict()
- def generate_job_request(self, priority=PRIORITY_NONE, background=False):
+ def generate_job_request(self, priority=PRIORITY_NONE, when_to_run=None, background=False):
job_handle = str(random.random())
- current_job = self.job_class(connection=self.connection, handle=job_handle, task='__test_ability__', unique=str(random.random()), data=str(random.random()))
+ current_job = self.job_class(connection=self.connection, handle=job_handle, task='__test_ability__', unique=str(random.random()), data=str(random.random()), when_to_run=when_to_run)
current_request = self.job_request_class(current_job, initial_priority=priority, background=background)
self.assertEqual(current_request.state, JOB_UNKNOWN)
@@ -7,7 +7,7 @@
from gearman.constants import PRIORITY_NONE, PRIORITY_HIGH, PRIORITY_LOW, JOB_UNKNOWN, JOB_PENDING, JOB_CREATED, JOB_FAILED, JOB_COMPLETE
from gearman.errors import ExceededConnectionAttempts, ServerUnavailable, InvalidClientState
-from gearman.protocol import submit_cmd_for_background_priority, GEARMAN_COMMAND_STATUS_RES, GEARMAN_COMMAND_GET_STATUS, GEARMAN_COMMAND_JOB_CREATED, \
+from gearman.protocol import submit_cmd_for_background_priority_run_later, GEARMAN_COMMAND_STATUS_RES, GEARMAN_COMMAND_GET_STATUS, GEARMAN_COMMAND_JOB_CREATED, \
GEARMAN_COMMAND_WORK_STATUS, GEARMAN_COMMAND_WORK_FAIL, GEARMAN_COMMAND_WORK_COMPLETE, GEARMAN_COMMAND_WORK_DATA, GEARMAN_COMMAND_WORK_WARNING
from tests._core_testing import _GearmanAbstractTest, MockGearmanConnectionManager, MockGearmanConnection
@@ -312,7 +312,7 @@ def test_send_job_request(self):
queued_request = self.command_handler.requests_awaiting_handles.popleft()
self.assertEqual(queued_request, current_request)
- expected_cmd_type = submit_cmd_for_background_priority(background, priority)
+ expected_cmd_type = submit_cmd_for_background_priority_run_later(background, priority, False)
self.assert_sent_command(expected_cmd_type, task=gearman_job.task, data=gearman_job.data, unique=gearman_job.unique)
def test_get_status_of_job(self):

0 comments on commit ceb37df

Please sign in to comment.