Skip to content
Browse files

Updating documentation, adding semi-auto retry behavior for clients, …

…and changing some comments
  • Loading branch information...
1 parent 14ab73e commit 49aae5171e3f6c160eb860a2c337a1d62188d449 Matthew Tai committed Jun 29, 2010
View
7 AUTHORS
@@ -1,7 +0,0 @@
-Primary authors:
-
-* Matthew Tai <mtai@yelp.com>
-
-Inspired by python-gearman 1.x:
-
-* Samuel Stauffer <samuel@descolada.com>
View
18 AUTHORS.txt
@@ -0,0 +1,18 @@
+python-gearman 2.x:
+========================================
+Primary authors:
+
+* Matthew Tai <mtai@yelp.com>
+
+python-gearman 1.x
+========================================
+Primary authors:
+
+* Samuel Stauffer <samuel@descolada.com>
+
+Contributors:
+
+* Justin Azoff
+* Kristopher <kris.tate@gmail.com>
+* Eric Sumner <eric@justin.tv>
+
View
1 MANIFEST.in
@@ -1,2 +1,3 @@
+include *.rst
include *.txt
recursive-include docs *.rst
View
5 README.rst
@@ -9,18 +9,15 @@ Python Gearman API - Client, worker, and admin client interfaces
For information on the Gearman protocol and a Gearman server, see http://www.gearman.org/
Installation
-======
-
+============
Work in progress
Status
======
-
2.0.0 - Beta build, in testing
Links
=====
-
* `2.x source <http://github.com/mtai/python-gearman/>`_
* `2.x documentation <http://github.com/mtai/python-gearman/tree/master/docs/>`_
View
37 docs/examples.rst
@@ -19,7 +19,7 @@ Function available to all examples
print "Job %s finished! Result: %s - %s" % (job_request.job.unique, job_request.state, job_request.result)
elif job_request.timed_out:
print "Job %s timed out!" % job_request.unique
- elif job_request.connection_failed:
+ elif job_request.state == JOB_UNKNOWN:
print "Job %s connection failed!" % job_request.unique
Client Examples
@@ -31,8 +31,8 @@ Client Examples
gm_client = gearman.GearmanClient(['localhost:4730', 'otherhost:4730'])
# See gearman/job.py to see attributes on the GearmanJobRequest
- # Defaults to PRIORITY_NONE, background=False (synchronous task), wait_until_complete=False
- completed_job_request = gm_client.submit_job("task_name", "arbitrary binary data", wait_until_complete=True)
+ # Defaults to PRIORITY_NONE, background=False (synchronous task), wait_until_complete=True
+ completed_job_request = gm_client.submit_job("task_name", "arbitrary binary data")
check_request_status(completed_job_request)
2) Single job - high priority, background, blocking call
@@ -42,7 +42,7 @@ Client Examples
gm_client = gearman.GearmanClient(['localhost:4730', 'otherhost:4730'])
# See gearman/job.py to see attributes on the GearmanJobRequest
- submitted_job_request = gm_client.submit_job("task_name", "arbitrary binary data", priority=gearman.PRIORITY_HIGH, background=True, wait_until_complete=True)
+ submitted_job_request = gm_client.submit_job("task_name", "arbitrary binary data", priority=gearman.PRIORITY_HIGH, background=True)
check_request_status(submitted_job_request)
@@ -54,7 +54,7 @@ Client Examples
gm_client = gearman.GearmanClient(['localhost:4730'])
list_of_jobs = [dict(task="task_name", data="binary data"), dict(task="other_task", data="other binary data")]
- submitted_requests = gm_client.submit_multiple_jobs(list_of_jobs, background=False)
+ submitted_requests = gm_client.submit_multiple_jobs(list_of_jobs, background=False, wait_until_complete=False)
# Once we know our jobs are accepted, we can do other stuff and wait for results later in the function
# Similar to multithreading and doing a join except this is all done in a single process
@@ -76,7 +76,7 @@ Client Examples
failed_requests = gm_client.submit_multiple_jobs(list_of_jobs, background=False)
# Let's pretend our assigned requests' Gearman servers all failed
- assert all(request.connection_failed for request in failed_requests), "All connections didn't fail!"
+ assert all(request.state == JOB_UNKNOWN for request in failed_requests), "All connections didn't fail!"
# Let's pretend our assigned requests' don't fail but some simply timeout
retried_connection_failed_requests = gm_client.submit_multiple_requests(failed_requests, wait_until_complete=True, timeout=1.0)
@@ -124,7 +124,7 @@ Worker Examples
# See gearman/job.py to see attributes on the GearmanJob
# Send back a reversed version of the 'data' string
- def task_listener_reverse(gearman_job):
+ def task_listener_reverse(gearman_worker, gearman_job):
return reversed(gearman_job.data)
# gm_worker.set_client_id is optional
@@ -163,6 +163,29 @@ Worker Examples
self.db_connections.rollback()
return continue_working
+3) Sending inflight job updates
+-------------------------------
+::
+
+ gm_worker = gearman.GearmanWorker(['localhost:4730'])
+
+ # See gearman/job.py to see attributes on the GearmanJob
+ # Send back a reversed version of the 'data' string
+ def task_listener_reverse_inflight(gearman_worker, gearman_job):
+ reversed_data = reversed(gearman_job.data)
+ total_chars = len(reversed_data)
+
+ for idx, character in enumerate(reversed_data):
+ gearman_worker.send_job_data(gearman_job, str(character))
+ gearman_worker.send_job_status(gearman_job, idx + 1, total_chars)
+
+ # gm_worker.set_client_id is optional
+ gm_worker.register_task('reverse', task_listener_reverse_inflight)
+
+ # Enter our work loop and call gm_worker.after_poll() after each time we timeout/see socket activity
+ gm_worker.work()
+
+
Admin Client Examples
=====================
1) Checking in on a single host
View
2 gearman/__init__.py
@@ -9,4 +9,4 @@
from gearman.worker import GearmanWorker
from gearman.connection_manager import DataEncoder
-from gearman.constants import PRIORITY_NONE, PRIORITY_LOW, PRIORITY_HIGH, JOB_PENDING, JOB_QUEUED, JOB_FAILED, JOB_COMPLETE
+from gearman.constants import PRIORITY_NONE, PRIORITY_LOW, PRIORITY_HIGH, JOB_PENDING, JOB_CREATED, JOB_FAILED, JOB_COMPLETE
View
8 gearman/admin_client.py
@@ -28,7 +28,9 @@ def __init__(self, host_list=None, timeout=DEFAULT_ADMIN_CLIENT_TIMEOUT):
self.admin_client_timeout = timeout
self.current_connection = util.unlist(self.connection_list)
+ self.current_handler = None
+ def establish_connection(self):
if not self.attempt_connect(self.current_connection):
raise ServerUnavailable('Found no valid connections in list: %r' % self.connection_list)
@@ -38,6 +40,7 @@ def ping_server(self):
"""Sends off a basic debugging string to check the responsiveness of the Gearman server"""
start_time = time.time()
+ self.establish_connection()
self.current_handler.send_echo_request(ECHO_STRING)
server_response = self.wait_until_server_responds(GEARMAN_COMMAND_ECHO_REQ)
if server_response != ECHO_STRING:
@@ -47,6 +50,7 @@ def ping_server(self):
return elapsed_time
def send_maxqueue(self, task, max_size):
+ self.establish_connection()
self.current_handler.send_text_command('%s %s %s' % (GEARMAN_SERVER_COMMAND_MAXQUEUE, task, max_size))
return self.wait_until_server_responds(GEARMAN_SERVER_COMMAND_MAXQUEUE)
@@ -55,18 +59,22 @@ def send_shutdown(self, graceful=True):
if graceful:
actual_command += ' graceful'
+ self.establish_connection()
self.current_handler.send_text_command(actual_command)
return self.wait_until_server_responds(GEARMAN_SERVER_COMMAND_SHUTDOWN)
def get_status(self):
+ self.establish_connection()
self.current_handler.send_text_command(GEARMAN_SERVER_COMMAND_STATUS)
return self.wait_until_server_responds(GEARMAN_SERVER_COMMAND_STATUS)
def get_version(self):
+ self.establish_connection()
self.current_handler.send_text_command(GEARMAN_SERVER_COMMAND_VERSION)
return self.wait_until_server_responds(GEARMAN_SERVER_COMMAND_VERSION)
def get_workers(self):
+ self.establish_connection()
self.current_handler.send_text_command(GEARMAN_SERVER_COMMAND_WORKERS)
return self.wait_until_server_responds(GEARMAN_SERVER_COMMAND_WORKERS)
View
96 gearman/client.py
@@ -8,8 +8,8 @@
from gearman.connection_manager import GearmanConnectionManager
from gearman.client_handler import GearmanClientCommandHandler
-from gearman.constants import PRIORITY_NONE, PRIORITY_LOW, PRIORITY_HIGH, JOB_PENDING
-from gearman.errors import ServerUnavailable, ConnectionError, InvalidClientState
+from gearman.constants import PRIORITY_NONE, PRIORITY_LOW, PRIORITY_HIGH, JOB_UNKNOWN, JOB_PENDING
+from gearman.errors import ServerUnavailable, ConnectionError, InvalidClientState, ExceededConnectionAttempts
from gearman.job import GearmanJob, GearmanJobRequest
gearman_logger = logging.getLogger(__name__)
@@ -33,45 +33,36 @@ def __init__(self, host_list=None, random_unique_bytes=RANDOM_UNIQUE_BYTES):
# Ignores the fact if a request has been bound to a connection or not
self.request_to_rotating_connection_queue = collections.defaultdict(collections.deque)
- def submit_job(self, task, data, unique=None, priority=PRIORITY_NONE, background=False, wait_until_complete=False, timeout=None):
+ def submit_job(self, task, data, unique=None, priority=PRIORITY_NONE, background=False, wait_until_complete=True, max_retries=0, timeout=None):
"""Submit a single job to any gearman server"""
job_info = dict(task=task, data=data, unique=unique, priority=priority)
- completed_job_list = self.submit_multiple_jobs([job_info], background=background, wait_until_complete=wait_until_complete, timeout=timeout)
+ completed_job_list = self.submit_multiple_jobs([job_info], background=background, wait_until_complete=wait_until_complete, max_retries=max_retries, timeout=timeout)
return gearman.util.unlist(completed_job_list)
- def submit_multiple_jobs(self, jobs_to_submit, background=False, wait_until_complete=False, timeout=None):
+ def submit_multiple_jobs(self, jobs_to_submit, background=False, wait_until_complete=True, max_retries=0, timeout=None):
"""Takes a list of jobs_to_submit with dicts of
{'task': task, 'data': data, 'unique': unique, 'priority': priority}
"""
assert type(jobs_to_submit) in (list, tuple, set), "Expected multiple jobs, received 1?"
# Convert all job dicts to job request objects
- requests_to_submit = [self._create_request_from_dictionary(job_info, background=background) for job_info in jobs_to_submit]
+ max_attempts = max_retries + 1
+ requests_to_submit = [self._create_request_from_dictionary(job_info, background=background, max_attempts=max_attempts) for job_info in jobs_to_submit]
return self.submit_multiple_requests(requests_to_submit, wait_until_complete=wait_until_complete, timeout=timeout)
- def submit_multiple_requests(self, job_requests, wait_until_complete=False, timeout=None):
+ def submit_multiple_requests(self, job_requests, wait_until_complete=True, timeout=None):
"""Take GearmanJobRequests, assign them connections, and request that they be done.
* Blocks until our jobs are accepted (should be fast) OR times out
* Optionally blocks until jobs are all complete
- You MUST check the status of your requests after calling this function as "timed_out" or "connection_failed" maybe True
+ You MUST check the status of your requests after calling this function as "timed_out" or "state == JOB_UNKNOWN" maybe True
"""
assert type(job_requests) in (list, tuple, set), "Expected multiple job requests, received 1?"
stopwatch = gearman.util.Stopwatch(timeout)
- for current_request in job_requests:
- chosen_connection = self._choose_request_connection(current_request)
-
- current_request.job.connection = chosen_connection
- current_request.connection_failed = False
- current_request.timed_out = False
-
- current_command_handler = self.connection_to_handler_map[chosen_connection]
- current_command_handler.send_job_request(current_request)
-
# We should always wait until our job is accepted, this should be fast
time_remaining = stopwatch.get_time_remaining()
processed_requests = self.wait_until_jobs_accepted(job_requests, timeout=time_remaining)
@@ -86,16 +77,20 @@ def submit_multiple_requests(self, job_requests, wait_until_complete=False, time
def wait_until_jobs_accepted(self, job_requests, timeout=None):
"""Go into a select loop until all our jobs have moved to STATE_PENDING"""
assert type(job_requests) in (list, tuple, set), "Expected multiple job requests, received 1?"
- job_connections = self._get_request_connections(job_requests)
def is_request_pending(current_request):
return bool(current_request.state == JOB_PENDING)
# Poll until we know we've gotten acknowledgement that our job's been accepted
+ # If our connection fails while we're waiting for it to be accepted, automatically retry right here
def continue_while_jobs_pending(any_activity):
- return any(bool(is_request_pending(current_request) and not current_request.connection_failed) for current_request in job_requests)
+ for current_request in job_requests:
+ if current_request.state == JOB_UNKNOWN:
+ self._send_job_request(current_request)
- self.poll_connections_until_stopped(job_connections, continue_while_jobs_pending, timeout=timeout)
+ return any(is_request_pending(current_request) for current_request in job_requests)
+
+ self.poll_connections_until_stopped(self.connection_list, continue_while_jobs_pending, timeout=timeout)
# Mark any job still in the queued state to timeout
for current_request in job_requests:
@@ -106,22 +101,26 @@ def continue_while_jobs_pending(any_activity):
def wait_until_jobs_completed(self, job_requests, timeout=None):
"""Go into a select loop until all our jobs have completed or failed"""
assert type(job_requests) in (list, tuple, set), "Expected multiple job requests, received 1?"
- job_connections = self._get_request_connections(job_requests)
def is_request_incomplete(current_request):
return not current_request.complete
# Poll until we get responses for all our functions
+ # Do NOT attempt to auto-retry connection failures as we have no idea how for a worker got
def continue_while_jobs_incomplete(any_activity):
- return any(is_request_incomplete(current_request) and not current_request.connection_failed for current_request in job_requests)
+ for current_request in job_requests:
+ if is_request_incomplete(current_request) and current_request.state != JOB_UNKNOWN:
+ return True
+
+ return False
- self.poll_connections_until_stopped(job_connections, continue_while_jobs_incomplete, timeout=timeout)
+ self.poll_connections_until_stopped(self.connection_list, continue_while_jobs_incomplete, timeout=timeout)
# Mark any job still in the queued state to timeout
for current_request in job_requests:
- job_incomplete = is_request_incomplete(current_request)
- current_request.timed_out = job_incomplete
- if not job_incomplete:
+ current_request.timed_out = is_request_incomplete(current_request)
+
+ if not current_request.timed_out:
self.request_to_rotating_connection_queue.pop(current_request, None)
return job_requests
@@ -147,30 +146,26 @@ def get_job_statuses(self, job_requests, timeout=None):
def wait_until_job_statuses_received(self, job_requests, timeout=None):
"""Go into a select loop until we received statuses on all our requests"""
assert type(job_requests) in (list, tuple, set), "Expected multiple job requests, received 1?"
- job_connections = self._get_request_connections(job_requests)
-
def is_status_not_updated(current_request):
return bool(current_request.server_status.get('time_received') == current_request.server_status.get('last_time_received'))
# Poll to make sure we send out our request for a status update
def continue_while_status_not_updated(any_activity):
- return any(bool(is_status_not_updated(current_request) and not current_request.connection_failed) for current_request in job_requests)
+ for current_request in job_requests:
+ if is_status_not_updated(current_request) and current_request.state != JOB_UNKNOWN:
+ return True
+
+ return False
- self.poll_connections_until_stopped(job_connections, continue_while_status_not_updated, timeout=timeout)
+ self.poll_connections_until_stopped(self.connection_list, continue_while_status_not_updated, timeout=timeout)
for current_request in job_requests:
current_request.server_status = current_request.server_status or {}
current_request.timed_out = is_status_not_updated(current_request)
return job_requests
- def _get_request_connections(self, job_requests):
- submitted_job_connections = set(current_request.job.connection for current_request in job_requests)
- submitted_job_connections.discard(None)
-
- return submitted_job_connections
-
- def _create_request_from_dictionary(self, job_info, background=False):
+ def _create_request_from_dictionary(self, job_info, background=False, max_attempts=1):
"""Takes a dictionary with fields {'task': task, 'unique': unique, 'data': data, 'priority': priority, 'background': background}"""
# Make sure we have a unique identifier for ALL our tasks
job_unique = job_info.get('unique')
@@ -180,7 +175,7 @@ def _create_request_from_dictionary(self, job_info, background=False):
job_unique = os.urandom(self.random_unique_bytes).encode('hex')
current_job = self.job_class(connection=None, handle=None, task=job_info['task'], unique=job_unique, data=job_info['data'])
- current_request = self.job_request_class(current_job, initial_priority=job_info.get('priority', PRIORITY_NONE), background=background)
+ current_request = self.job_request_class(current_job, initial_priority=job_info.get('priority', PRIORITY_NONE), background=background, max_attempts=max_attempts)
return current_request
def _choose_request_connection(self, current_request):
@@ -210,18 +205,17 @@ def _choose_request_connection(self, current_request):
rotating_connections.rotate(-skipped_connections)
return chosen_connection
- def handle_error(self, current_connection):
- """When we enter a connection error state, we should mark all requests as connection_failed
+ def _send_job_request(self, current_request):
+ """Attempt to send out a job request"""
+ if current_request.connection_attempts >= current_request.max_connection_attempts:
+ raise ExceededConnectionAttempts("Exceeded %d connection attempt(s) :: %r" % (current_request.max_connection_attempts, current_request))
- This doesn't have much meaning for backgrounded requests
- """
- current_handler = self.connection_to_handler_map[current_connection]
- pending_requests, inflight_requests = current_handler.get_requests()
-
- for current_request in pending_requests:
- current_request.connection_failed = True
+ chosen_connection = self._choose_request_connection(current_request)
- for current_request in inflight_requests:
- current_request.connection_failed = True
+ current_request.job.connection = chosen_connection
+ current_request.connection_attempts += 1
+ current_request.timed_out = False
- super(GearmanClient, self).handle_error(current_connection)
+ current_command_handler = self.connection_to_handler_map[chosen_connection]
+ current_command_handler.send_job_request(current_request)
+ return current_request
View
35 gearman/client_handler.py
@@ -3,7 +3,7 @@
import logging
from gearman.command_handler import GearmanCommandHandler
-from gearman.constants import JOB_PENDING, JOB_QUEUED, JOB_FAILED, JOB_COMPLETE
+from gearman.constants import JOB_UNKNOWN, JOB_PENDING, JOB_CREATED, JOB_FAILED, JOB_COMPLETE
from gearman.errors import InvalidClientState
from gearman.protocol import GEARMAN_COMMAND_GET_STATUS, submit_cmd_for_background_priority
@@ -23,6 +23,8 @@ def __init__(self, connection_manager=None):
##################################################################
def send_job_request(self, current_request):
"""Register a newly created job request"""
+ self._assert_request_state(current_request, JOB_UNKNOWN)
+
gearman_job = current_request.job
# Handle the I/O for requesting a job - determine which COMMAND we need to send
@@ -32,17 +34,20 @@ def send_job_request(self, current_request):
self.send_command(cmd_type, task=gearman_job.task, unique=gearman_job.unique, data=outbound_data)
# Once this command is sent, our request needs to wait for a handle
+ current_request.state = JOB_PENDING
+
self.requests_awaiting_handles.append(current_request)
def send_get_status_of_job(self, current_request):
"""Forward the status of a job"""
self.send_command(GEARMAN_COMMAND_GET_STATUS, job_handle=current_request.job.handle)
- def get_requests(self):
- """Fetch all requests that this CommandHandler is aware of"""
- pending_requests = self.requests_awaiting_handles
- inflight_requests = self.handle_to_request_map.itervalues()
- return pending_requests, inflight_requests
+ def on_io_error(self):
+ for pending_request in self.requests_awaiting_handles:
+ pending_request.state = JOB_UNKNOWN
+
+ for inflight_request in self.handle_to_request_map.itervalues():
+ inflight_request.state = JOB_UNKNOWN
##################################################################
## Gearman command callbacks with kwargs defined by protocol.py ##
@@ -55,21 +60,21 @@ def recv_job_created(self, job_handle):
if not self.requests_awaiting_handles:
raise InvalidClientState('Received a job_handle with no pending requests')
- # If our client got a JOB_CREATED, our request now has a server handle
+ # If our client got aJOB_CREATED, our request now has a server handle
current_request = self.requests_awaiting_handles.popleft()
self._assert_request_state(current_request, JOB_PENDING)
# Update the state of this request
current_request.job.handle = job_handle
- current_request.state = JOB_QUEUED
+ current_request.state =JOB_CREATED
self.handle_to_request_map[job_handle] = current_request
return True
def recv_work_data(self, job_handle, data):
# Queue a WORK_DATA update
current_request = self.handle_to_request_map[job_handle]
- self._assert_request_state(current_request, JOB_QUEUED)
+ self._assert_request_state(current_request, JOB_CREATED)
current_request.data_updates.append(self.decode_data(data))
@@ -78,7 +83,7 @@ def recv_work_data(self, job_handle, data):
def recv_work_warning(self, job_handle, data):
# Queue a WORK_WARNING update
current_request = self.handle_to_request_map[job_handle]
- self._assert_request_state(current_request, JOB_QUEUED)
+ self._assert_request_state(current_request, JOB_CREATED)
current_request.warning_updates.append(self.decode_data(data))
@@ -87,7 +92,7 @@ def recv_work_warning(self, job_handle, data):
def recv_work_status(self, job_handle, numerator, denominator):
# Queue a WORK_STATUS update
current_request = self.handle_to_request_map[job_handle]
- self._assert_request_state(current_request, JOB_QUEUED)
+ self._assert_request_state(current_request, JOB_CREATED)
# The protocol spec is ambiguous as to what type the numerator and denominator is...
# For now, let's cast to a float as I its safe to assume that we need to get a number back here
@@ -99,7 +104,7 @@ def recv_work_status(self, job_handle, numerator, denominator):
def recv_work_complete(self, job_handle, data):
# Update the state of our request and store our returned result
current_request = self.handle_to_request_map[job_handle]
- self._assert_request_state(current_request, JOB_QUEUED)
+ self._assert_request_state(current_request, JOB_CREATED)
current_request.result = self.decode_data(data)
current_request.state = JOB_COMPLETE
@@ -109,7 +114,7 @@ def recv_work_complete(self, job_handle, data):
def recv_work_fail(self, job_handle):
# Update the state of our request and mark this job as failed
current_request = self.handle_to_request_map[job_handle]
- self._assert_request_state(current_request, JOB_QUEUED)
+ self._assert_request_state(current_request, JOB_CREATED)
current_request.state = JOB_FAILED
@@ -120,7 +125,7 @@ def recv_work_exception(self, job_handle, data):
# http://groups.google.com/group/gearman/browse_thread/thread/5c91acc31bd10688/529e586405ed37fe
#
current_request = self.handle_to_request_map[job_handle]
- self._assert_request_state(current_request, JOB_QUEUED)
+ self._assert_request_state(current_request, JOB_CREATED)
current_request.exception = self.decode_data(data)
@@ -129,7 +134,7 @@ def recv_work_exception(self, job_handle, data):
def recv_status_res(self, job_handle, known, running, numerator, denominator):
# If we received a STATUS_RES update about this request, update our known status
current_request = self.handle_to_request_map[job_handle]
- self._assert_request_state(current_request, JOB_QUEUED)
+ self._assert_request_state(current_request, JOB_CREATED)
# Make our server_status response Python friendly
current_request.server_status = {
View
3 gearman/command_handler.py
@@ -16,6 +16,9 @@ def initial_state(self, *largs, **kwargs):
"""Called by a Connection Manager after we've been instantiated and we're ready to send off commands"""
pass
+ def on_io_error(self):
+ pass
+
def decode_data(self, data):
"""Convenience function :: handle binary string -> object unpacking"""
return self.connection_manager.data_encoder.decode(data)
View
6 gearman/connection_manager.py
@@ -226,9 +226,11 @@ def handle_write(self, current_connection):
current_connection.send_data_to_socket()
def handle_error(self, current_connection):
- dead_handler = self.connection_to_handler_map.pop(current_connection, None)
- dead_connection = self.handler_to_connection_map.pop(dead_handler, None)
+ dead_handler = self.connection_to_handler_map.pop(current_connection, None)
+ if dead_handler:
+ dead_handler.on_io_error()
+ self.handler_to_connection_map.pop(dead_handler, None)
current_connection.close()
##################################
View
9 gearman/constants.py
@@ -5,7 +5,8 @@
PRIORITY_LOW = 'LOW'
PRIORITY_HIGH = 'HIGH'
-JOB_PENDING = 'PENDING'
-JOB_QUEUED = 'QUEUED'
-JOB_FAILED = 'FAILED'
-JOB_COMPLETE = 'COMPLETE'
+JOB_UNKNOWN = 'UNKNOWN' # Request state is currently unknown, either unsubmitted or connection failed
+JOB_PENDING = 'PENDING' # Request has been submitted, pending handle
+JOB_CREATED = 'CREATED' # Request has been accepted
+JOB_FAILED = 'FAILED' # Request received an explicit fail
+JOB_COMPLETE = 'COMPLETE' # Request received an explicit complete
View
3 gearman/errors.py
@@ -13,6 +13,9 @@ class ProtocolError(GearmanError):
class UnknownCommandError(GearmanError):
pass
+class ExceededConnectionAttempts(GearmanError):
+ pass
+
class InvalidClientState(GearmanError):
pass
View
17 gearman/job.py
@@ -1,5 +1,5 @@
import collections
-from gearman.constants import PRIORITY_NONE, JOB_PENDING, JOB_QUEUED, JOB_FAILED, JOB_COMPLETE
+from gearman.constants import PRIORITY_NONE, JOB_UNKNOWN, JOB_PENDING, JOB_CREATED, JOB_FAILED, JOB_COMPLETE
from gearman.errors import ConnectionError
class GearmanJob(object):
@@ -20,14 +20,14 @@ def __repr__(self):
class GearmanJobRequest(object):
"""Represents a job request... used in GearmanClient to represent job states"""
- def __init__(self, gearman_job, initial_priority=PRIORITY_NONE, background=False, max_retries=0):
+ def __init__(self, gearman_job, initial_priority=PRIORITY_NONE, background=False, max_attempts=1):
self.gearman_job = gearman_job
self.priority = initial_priority
self.background = background
- self.retries_attempted = 0
- self.retries_max = max_retries
+ self.connection_attempts = 0
+ self.max_connection_attempts = max_attempts
self.initialize_request()
@@ -46,9 +46,8 @@ def initialize_request(self):
# Holds STATUS_REQ responses
self.server_status = {}
- self.state = JOB_PENDING
+ self.state = JOB_UNKNOWN
self.timed_out = False
- self.connection_failed = False
def reset(self):
self.initialize_request()
@@ -61,8 +60,12 @@ def job(self):
@property
def complete(self):
- background_complete = bool(self.background and self.state in (JOB_QUEUED))
+ background_complete = bool(self.background and self.state in (JOB_CREATED))
foreground_complete = bool(not self.background and self.state in (JOB_FAILED, JOB_COMPLETE))
actually_complete = background_complete or foreground_complete
return actually_complete
+
+ def __repr__(self):
+ formatted_representation = '<GearmanJobRequest task=%r, unique=%r, priority=%r, background=%r, state=%r, timed_out=%r>'
+ return formatted_representation % (self.job.task, self.job.unique, self.priority, self.background, self.state, self.timed_out)
View
2 gearman/worker.py
@@ -164,7 +164,7 @@ def create_job(self, command_handler, job_handle, task, unique, data):
def on_job_execute(self, current_job):
try:
function_callback = self.worker_abilities[current_job.task]
- job_result = function_callback(current_job)
+ job_result = function_callback(self, current_job)
except Exception:
return self.on_job_exception(current_job, sys.exc_info())
View
8 tests/_core_testing.py
@@ -7,9 +7,9 @@
from gearman.connection import GearmanConnection
from gearman.connection_manager import GearmanConnectionManager, NoopEncoder
-from gearman.constants import PRIORITY_NONE, PRIORITY_HIGH, PRIORITY_LOW, DEFAULT_GEARMAN_PORT
+from gearman.constants import PRIORITY_NONE, PRIORITY_HIGH, PRIORITY_LOW, DEFAULT_GEARMAN_PORT, JOB_UNSENT, JOB_ACCEPTED
from gearman.errors import ConnectionError
-from gearman.job import GearmanJob, GearmanJobRequest, JOB_QUEUED
+from gearman.job import GearmanJob, GearmanJobRequest
from gearman.protocol import get_command_name
class MockGearmanConnection(GearmanConnection):
@@ -89,8 +89,8 @@ def generate_job_request(self, priority=PRIORITY_NONE, background=False):
current_job = self.job_class(connection=self.connection, handle=job_handle, task='__test_ability__', unique=str(random.random()), data=str(random.random()))
current_request = self.job_request_class(current_job, initial_priority=priority, background=background)
- # Start this off as someone being queued
- current_request.state = JOB_QUEUED
+ self.assertEqual(current_request.state, JOB_UNSENT)
+ current_request.state = JOB_ACCEPTED
return current_request
View
2 tests/admin_client_tests.py
@@ -10,7 +10,7 @@
from tests._core_testing import _GearmanAbstractTest, MockGearmanConnectionManager, MockGearmanConnection
-class MockGearmanAdminClient(MockGearmanConnectionManager, GearmanAdminClient):
+class MockGearmanAdminClient(GearmanAdminClient, MockGearmanConnectionManager):
pass
class CommandHandlerStateMachineTest(_GearmanAbstractTest):
View
40 tests/client_tests.py
@@ -5,7 +5,7 @@
from gearman.client import GearmanClient
from gearman.client_handler import GearmanClientCommandHandler
-from gearman.constants import PRIORITY_NONE, PRIORITY_HIGH, PRIORITY_LOW, JOB_PENDING, JOB_QUEUED, JOB_FAILED, JOB_COMPLETE
+from gearman.constants import PRIORITY_NONE, PRIORITY_HIGH, PRIORITY_LOW, JOB_UNKNOWN, JOB_PENDING, JOB_CREATED, JOB_FAILED, JOB_COMPLETE
from gearman.errors import ServerUnavailable, InvalidClientState
from gearman.protocol import submit_cmd_for_background_priority, GEARMAN_COMMAND_STATUS_RES, GEARMAN_COMMAND_GET_STATUS, GEARMAN_COMMAND_JOB_CREATED, \
GEARMAN_COMMAND_WORK_STATUS, GEARMAN_COMMAND_WORK_FAIL, GEARMAN_COMMAND_WORK_COMPLETE, GEARMAN_COMMAND_WORK_DATA, GEARMAN_COMMAND_WORK_WARNING
@@ -30,9 +30,7 @@ def tearDown(self):
def generate_job_request(self):
current_request = super(ClientTest, self).generate_job_request()
-
- job_handle = current_request.job.handle
- self.command_handler.handle_to_request_map[job_handle] = current_request
+ self.command_handler.handle_to_request_map[current_request.job.handle] = current_request
return current_request
def test_connection_rotation_for_requests(self):
@@ -107,14 +105,14 @@ def mark_jobs_created(rx_conns, wr_conns, ex_conns):
job_dictionaries = [current_job.to_dict() for current_job in expected_job_list]
# Test multiple job submission
- job_requests = self.connection_manager.submit_multiple_jobs(job_dictionaries)
+ job_requests = self.connection_manager.submit_multiple_jobs(job_dictionaries, wait_until_complete=False)
for current_request, expected_job in zip(job_requests, expected_job_list):
current_job = current_request.job
self.assert_jobs_equal(current_job, expected_job)
self.assertEqual(current_request.priority, PRIORITY_NONE)
self.assertEqual(current_request.background, False)
- self.assertEqual(current_request.state, JOB_QUEUED)
+ self.assertEqual(current_request.state, JOB_CREATED)
self.assertFalse(current_request.complete)
@@ -125,14 +123,14 @@ def mark_job_created(rx_conns, wr_conns, ex_conns):
return rx_conns, wr_conns, ex_conns
self.connection_manager.handle_connection_activity = mark_job_created
- job_request = self.connection_manager.submit_job(expected_job.task, expected_job.data, unique=expected_job.unique, background=True, priority=PRIORITY_LOW)
+ job_request = self.connection_manager.submit_job(expected_job.task, expected_job.data, unique=expected_job.unique, background=True, priority=PRIORITY_LOW, wait_until_complete=False)
current_job = job_request.job
self.assert_jobs_equal(current_job, expected_job)
self.assertEqual(job_request.priority, PRIORITY_LOW)
self.assertEqual(job_request.background, True)
- self.assertEqual(job_request.state, JOB_QUEUED)
+ self.assertEqual(job_request.state, JOB_CREATED)
self.assertTrue(job_request.complete)
@@ -156,9 +154,9 @@ def test_wait_for_multiple_jobs_to_complete_or_timeout(self):
failed_request = self.generate_job_request()
timeout_request = self.generate_job_request()
- completed_request.state = JOB_QUEUED
- failed_request.state = JOB_QUEUED
- timeout_request.state = JOB_QUEUED
+ completed_request.state = JOB_CREATED
+ failed_request.state = JOB_CREATED
+ timeout_request.state = JOB_CREATED
self.update_requests = True
def multiple_job_updates(rx_conns, wr_conns, ex_conns):
@@ -186,15 +184,15 @@ def multiple_job_updates(rx_conns, wr_conns, ex_conns):
self.assertEqual(finished_failed_request.result, None)
self.assertFalse(finished_failed_request.timed_out)
- self.assertEqual(finished_timeout_request.state, JOB_QUEUED)
+ self.assertEqual(finished_timeout_request.state, JOB_CREATED)
self.assertEqual(finished_timeout_request.result, None)
self.assertTrue(finished_timeout_request.timed_out)
def test_get_job_status(self):
self.connection.connect()
single_request = self.generate_job_request()
- single_request.state = JOB_QUEUED
+ single_request.state = JOB_CREATED
def retrieve_status(rx_conns, wr_conns, ex_conns):
self.command_handler.recv_command(GEARMAN_COMMAND_STATUS_RES, job_handle=single_request.job.handle, known='1', running='0', numerator='0.0', denominator='1.0')
@@ -215,7 +213,7 @@ def test_get_job_status_timeout(self):
self.connection.connect()
single_request = self.generate_job_request()
- single_request.state = JOB_QUEUED
+ single_request.state = JOB_CREATED
def retrieve_status_timeout(rx_conns, wr_conns, ex_conns):
pass
@@ -279,7 +277,7 @@ def test_received_job_created(self):
self.command_handler.recv_command(GEARMAN_COMMAND_JOB_CREATED, job_handle=new_handle)
self.assertEqual(current_request.job.handle, new_handle)
- self.assertEqual(current_request.state, JOB_QUEUED)
+ self.assertEqual(current_request.state, JOB_CREATED)
self.assertEqual(self.command_handler.handle_to_request_map[new_handle], current_request)
def test_received_job_created_out_of_order(self):
@@ -293,7 +291,7 @@ def test_required_state_pending(self):
new_handle = str(random.random())
- invalid_states = [JOB_QUEUED, JOB_COMPLETE, JOB_FAILED]
+ invalid_states = [JOB_UNKNOWN, JOB_CREATED, JOB_COMPLETE, JOB_FAILED]
for bad_state in invalid_states:
current_request.state = bad_state
@@ -308,11 +306,11 @@ def test_required_state_queued(self):
job_handle = current_request.job.handle
new_data = str(random.random())
- invalid_states = [JOB_PENDING, JOB_COMPLETE, JOB_FAILED]
+ invalid_states = [JOB_UNKNOWN, JOB_PENDING, JOB_COMPLETE, JOB_FAILED]
for bad_state in invalid_states:
current_request.state = bad_state
- # All these commands expect to be in JOB_QUEUED
+ # All these commands expect to be in JOB_CREATED
self.assertRaises(InvalidClientState, self.command_handler.recv_command, GEARMAN_COMMAND_WORK_DATA, job_handle=job_handle, data=new_data)
self.assertRaises(InvalidClientState, self.command_handler.recv_command, GEARMAN_COMMAND_WORK_WARNING, job_handle=job_handle, data=new_data)
@@ -332,18 +330,18 @@ def test_in_flight_work_updates(self):
# Test WORK_DATA
self.command_handler.recv_command(GEARMAN_COMMAND_WORK_DATA, job_handle=job_handle, data=new_data)
self.assertEqual(current_request.data_updates.popleft(), new_data)
- self.assertEqual(current_request.state, JOB_QUEUED)
+ self.assertEqual(current_request.state, JOB_CREATED)
# Test WORK_WARNING
self.command_handler.recv_command(GEARMAN_COMMAND_WORK_WARNING, job_handle=job_handle, data=new_data)
self.assertEqual(current_request.warning_updates.popleft(), new_data)
- self.assertEqual(current_request.state, JOB_QUEUED)
+ self.assertEqual(current_request.state, JOB_CREATED)
# Test WORK_STATUS
self.command_handler.recv_command(GEARMAN_COMMAND_WORK_STATUS, job_handle=job_handle, numerator=0.0, denominator=1.0)
self.assertEqual(current_request.status_updates.popleft(), (0.0, 1.0))
- self.assertEqual(current_request.state, JOB_QUEUED)
+ self.assertEqual(current_request.state, JOB_CREATED)
def test_work_complete(self):
current_request = self.generate_job_request()
View
2 tests/protocol_tests.py
@@ -7,7 +7,7 @@
import types
from gearman.connection import GearmanConnection
-from gearman.constants import JOB_PENDING, JOB_QUEUED, JOB_FAILED, JOB_COMPLETE
+from gearman.constants import JOB_PENDING, JOB_CREATED, JOB_FAILED, JOB_COMPLETE
from gearman.errors import ConnectionError, ServerUnavailable, ProtocolError
from gearman import protocol

0 comments on commit 49aae51

Please sign in to comment.
Something went wrong with that request. Please try again.