Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/pinterest/pymemcache
Browse files Browse the repository at this point in the history
  • Loading branch information
cgordon committed Apr 16, 2015
2 parents fe6ad07 + b7ba927 commit 0646937
Show file tree
Hide file tree
Showing 4 changed files with 491 additions and 245 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,4 @@ Credits
* [Julien Danjou](http://github.com/jd)
* [INADA Naoki](http://github.com/methane)
* [James Socol](http://github.com/jsocol)
* [Joshua Harlow](http://github.com/harlowja)
219 changes: 201 additions & 18 deletions pymemcache/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ def json_deserializer(key, value, flags):
import socket
import six

from pymemcache import pool


RECV_SIZE = 4096
VALID_STORE_RESULTS = {
Expand Down Expand Up @@ -152,6 +154,23 @@ class MemcacheUnexpectedCloseError(MemcacheServerError):
pass


# Common helper functions.

def _check_key(key, key_prefix=b''):
"""Checks key and add key_prefix."""
if isinstance(key, six.text_type):
try:
key = key.encode('ascii')
except UnicodeEncodeError:
raise MemcacheIllegalInputError("No ascii key: %r" % (key,))
key = key_prefix + key
if b' ' in key:
raise MemcacheIllegalInputError("Key contains spaces: %r" % (key,))
if len(key) > 250:
raise MemcacheIllegalInputError("Key is too long: %r" % (key,))
return key


class Client(object):
"""
A client for a single memcached server.
Expand Down Expand Up @@ -266,7 +285,6 @@ def __init__(self,
self.ignore_exc = ignore_exc
self.socket_module = socket_module
self.sock = None
self.buf = b''
if isinstance(key_prefix, six.text_type):
key_prefix = key_prefix.encode('ascii')
if not isinstance(key_prefix, bytes):
Expand All @@ -275,17 +293,7 @@ def __init__(self,

def check_key(self, key):
"""Checks key and add key_prefix."""
if isinstance(key, six.text_type):
try:
key = key.encode('ascii')
except UnicodeEncodeError as e:
raise MemcacheIllegalInputError("No ascii key: %r" % (key,))
key = self.key_prefix + key
if b' ' in key:
raise MemcacheIllegalInputError("Key contains spaces: %r" % (key,))
if len(key) > 250:
raise MemcacheIllegalInputError("Key is too long: %r" % (key,))
return key
return _check_key(key, key_prefix=self.key_prefix)

def _connect(self):
sock = self.socket_module.socket(self.socket_module.AF_INET,
Expand All @@ -307,7 +315,6 @@ def close(self):
except Exception:
pass
self.sock = None
self.buf = b''

def set(self, key, value, expire=0, noreply=True):
"""
Expand Down Expand Up @@ -694,9 +701,10 @@ def _fetch_cmd(self, name, keys, expect_cas):

self.sock.sendall(cmd)

buf = b''
result = {}
while True:
self.buf, line = _readline(self.sock, self.buf)
buf, line = _readline(self.sock, buf)
self._raise_errors(line, name)

if line == b'END':
Expand All @@ -711,9 +719,7 @@ def _fetch_cmd(self, name, keys, expect_cas):
raise ValueError("Unable to parse line %s: %s"
% (line, str(e)))

self.buf, value = _readvalue(self.sock,
self.buf,
int(size))
buf, value = _readvalue(self.sock, buf, int(size))
key = checked_keys[key]

if self.deserializer:
Expand Down Expand Up @@ -767,7 +773,8 @@ def _store_cmd(self, name, key, expire, noreply, data, cas=None):
if noreply:
return True

self.buf, line = _readline(self.sock, self.buf)
buf = b''
buf, line = _readline(self.sock, buf)
self._raise_errors(line, name)

if line in VALID_STORE_RESULTS[name]:
Expand Down Expand Up @@ -816,6 +823,182 @@ def __delitem__(self, key):
self.delete(key, noreply=True)


class PooledClient(object):
"""A thread-safe pool of clients (with the same client api)."""

def __init__(self,
server,
serializer=None,
deserializer=None,
connect_timeout=None,
timeout=None,
no_delay=False,
ignore_exc=False,
socket_module=socket,
key_prefix=b'',
max_pool_size=None):
self.server = server
self.serializer = serializer
self.deserializer = deserializer
self.connect_timeout = connect_timeout
self.timeout = timeout
self.no_delay = no_delay
self.ignore_exc = ignore_exc
self.socket_module = socket_module
if isinstance(key_prefix, six.text_type):
key_prefix = key_prefix.encode('ascii')
if not isinstance(key_prefix, bytes):
raise TypeError("key_prefix should be bytes.")
self.key_prefix = key_prefix
self.client_pool = pool.ObjectPool(
self._create_client,
after_remove=lambda client: client.close(),
max_size=max_pool_size)

def check_key(self, key):
"""Checks key and add key_prefix."""
return _check_key(key, key_prefix=self.key_prefix)

def _create_client(self):
client = Client(self.server,
serializer=self.serializer,
deserializer=self.deserializer,
connect_timeout=self.connect_timeout,
timeout=self.timeout,
no_delay=self.no_delay,
# We need to know when it fails *always* so that we
# can remove/destroy it from the pool...
ignore_exc=False,
socket_module=self.socket_module,
key_prefix=self.key_prefix)
return client

def close(self):
self.client_pool.clear()

def set(self, key, value, expire=0, noreply=True):
with self.client_pool.get_and_release(destroy_on_fail=True) as client:
return client.set(key, value, expire=expire, noreply=noreply)

def set_many(self, values, expire=0, noreply=True):
with self.client_pool.get_and_release(destroy_on_fail=True) as client:
return client.set_many(values, expire=expire, noreply=noreply)

def replace(self, key, value, expire=0, noreply=True):
with self.client_pool.get_and_release(destroy_on_fail=True) as client:
return client.replace(key, value, expire=expire, noreply=noreply)

def append(self, key, value, expire=0, noreply=True):
with self.client_pool.get_and_release(destroy_on_fail=True) as client:
return client.append(key, value, expire=expire, noreply=noreply)

def prepend(self, key, value, expire=0, noreply=True):
with self.client_pool.get_and_release(destroy_on_fail=True) as client:
return client.prepend(key, value, expire=expire, noreply=noreply)

def cas(self, key, value, cas, expire=0, noreply=False):
with self.client_pool.get_and_release(destroy_on_fail=True) as client:
return client.cas(key, value, cas,
expire=expire, noreply=noreply)

def get(self, key):
with self.client_pool.get_and_release(destroy_on_fail=True) as client:
try:
return client.get(key)
except Exception:
if self.ignore_exc:
return None
else:
raise

def get_many(self, keys):
with self.client_pool.get_and_release(destroy_on_fail=True) as client:
try:
return client.get_many(keys)
except Exception:
if self.ignore_exc:
return {}
else:
raise

def gets(self, key):
with self.client_pool.get_and_release(destroy_on_fail=True) as client:
try:
return client.gets(key)
except Exception:
if self.ignore_exc:
return (None, None)
else:
raise

def gets_many(self, keys):
with self.client_pool.get_and_release(destroy_on_fail=True) as client:
try:
return client.gets_many(keys)
except Exception:
if self.ignore_exc:
return {}
else:
raise

def delete(self, key, noreply=True):
with self.client_pool.get_and_release(destroy_on_fail=True) as client:
return client.delete(key, noreply=noreply)

def delete_many(self, keys, noreply=True):
with self.client_pool.get_and_release(destroy_on_fail=True) as client:
return client.delete_many(keys, noreply=noreply)

def add(self, key, value, expire=0, noreply=True):
with self.client_pool.get_and_release(destroy_on_fail=True) as client:
return client.add(key, value, expire=expire, noreply=noreply)

def incr(self, key, value, noreply=False):
with self.client_pool.get_and_release(destroy_on_fail=True) as client:
return client.incr(key, value, noreply=noreply)

def decr(self, key, value, noreply=False):
with self.client_pool.get_and_release(destroy_on_fail=True) as client:
return client.decr(key, value, noreply=noreply)

def touch(self, key, expire=0, noreply=True):
with self.client_pool.get_and_release(destroy_on_fail=True) as client:
return client.touch(key, expire=expire, noreply=noreply)

def stats(self, *args):
with self.client_pool.get_and_release(destroy_on_fail=True) as client:
try:
return client.stats(*args)
except Exception:
if self.ignore_exc:
return {}
else:
raise

def flush_all(self, delay=0, noreply=True):
with self.client_pool.get_and_release(destroy_on_fail=True) as client:
return client.flush_all(delay=delay, noreply=noreply)

def quit(self):
with self.client_pool.get_and_release(destroy_on_fail=True) as client:
try:
client.quit()
finally:
self.client_pool.destroy(client)

def __setitem__(self, key, value):
self.set(key, value, noreply=True)

def __getitem__(self, key):
value = self.get(key)
if value is None:
raise KeyError
return value

def __delitem__(self, key):
self.delete(key, noreply=True)


def _readline(sock, buf):
"""Read line of text from the socket.
Expand Down
110 changes: 110 additions & 0 deletions pymemcache/pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# Copyright 2015 Yahoo.com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import collections
import contextlib
import sys
import threading

import six


class ObjectPool(object):
"""A pool of objects that release/creates/destroys as needed."""

def __init__(self, obj_creator,
after_remove=None, max_size=None):
self._used_objs = collections.deque()
self._free_objs = collections.deque()
self._obj_creator = obj_creator
self._lock = threading.Lock()
self._after_remove = after_remove
max_size = max_size or 2 ** 31
if not isinstance(max_size, six.integer_types) or max_size < 0:
raise ValueError('"max_size" must be a positive integer')
self.max_size = max_size

@property
def used(self):
return tuple(self._used_objs)

@property
def free(self):
return tuple(self._free_objs)

@contextlib.contextmanager
def get_and_release(self, destroy_on_fail=False):
obj = self.get()
try:
yield obj
except Exception:
exc_info = sys.exc_info()
if not destroy_on_fail:
self.release(obj)
else:
self.destroy(obj)
six.reraise(exc_info[0], exc_info[1], exc_info[2])
self.release(obj)

def get(self):
with self._lock:
if not self._free_objs:
curr_count = len(self._used_objs)
if curr_count >= self.max_size:
raise RuntimeError("Too many objects,"
" %s >= %s" % (curr_count,
self.max_size))
obj = self._obj_creator()
self._used_objs.append(obj)
return obj
else:
obj = self._free_objs.pop()
self._used_objs.append(obj)
return obj

def destroy(self, obj, silent=True):
was_dropped = False
with self._lock:
try:
self._used_objs.remove(obj)
was_dropped = True
except ValueError:
if not silent:
raise
if was_dropped and self._after_remove is not None:
self._after_remove(obj)

def release(self, obj, silent=True):
with self._lock:
try:
self._used_objs.remove(obj)
self._free_objs.append(obj)
except ValueError:
if not silent:
raise

def clear(self):
if self._after_remove is not None:
needs_destroy = []
with self._lock:
needs_destroy.extend(self._used_objs)
needs_destroy.extend(self._free_objs)
self._free_objs.clear()
self._used_objs.clear()
for obj in needs_destroy:
self._after_remove(obj)
else:
with self._lock:
self._free_objs.clear()
self._used_objs.clear()

0 comments on commit 0646937

Please sign in to comment.