Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Fetching contributors…

Cannot retrieve contributors at this time

415 lines (348 sloc) 14.242 kB
"""
A combination of utilities used internally by pycassa and utilities
available for use by others working with pycassa.
"""
import random
import uuid
import time
import struct
__all__ = ['convert_time_to_uuid', 'convert_uuid_to_time', 'OrderedDict']
_number_types = frozenset((int, long, float))
if hasattr(struct, 'Struct'): # new in Python 2.5
_have_struct = True
_long_packer = struct.Struct('>q')
_int_packer = struct.Struct('>i')
_uuid_packer = struct.Struct('>16s')
else:
_have_struct = False
def convert_time_to_uuid(time_arg, lowest_val=True, randomize=False):
"""
Converts a datetime or timestamp to a type 1 :class:`uuid.UUID`.
This is to assist with getting a time slice of columns or creating
columns when column names are ``TimeUUIDType``. Note that this is done
automatically in most cases if name packing and value packing are
enabled.
Also, be careful not to rely on this when specifying a discrete
set of columns to fetch, as the non-timestamp portions of the
UUID will be generated randomly. This problem does not matter
with slice arguments, however, as the non-timestamp portions
can be set to their lowest or highest possible values.
:param datetime:
The time to use for the timestamp portion of the UUID.
Expected inputs to this would either be a :class:`datetime`
object or a timestamp with the same precision produced by
:meth:`time.time()`. That is, sub-second precision should
be below the decimal place.
:type datetime: :class:`datetime` or timestamp
:param lowest_val:
Whether the UUID produced should be the lowest possible value
UUID with the same timestamp as datetime or the highest possible
value.
:type lowest_val: bool
:param randomize:
Whether the clock and node bits of the UUID should be randomly
generated. The `lowest_val` argument will be ignored if this
is true.
:type randomize: bool
:rtype: :class:`uuid.UUID`
"""
if isinstance(time_arg, uuid.UUID):
return time_arg
nanoseconds = 0
if hasattr(time_arg, 'timetuple'):
seconds = int(time.mktime(time_arg.timetuple()))
microseconds = (seconds * 1e6) + time_arg.time().microsecond
nanoseconds = microseconds * 1e3
elif type(time_arg) in _number_types:
nanoseconds = int(time_arg * 1e9)
else:
raise ValueError('Argument for a v1 UUID column name or value was ' +
'neither a UUID, a datetime, or a number')
# 0x01b21dd213814000 is the number of 100-ns intervals between the
# UUID epoch 1582-10-15 00:00:00 and the Unix epoch 1970-01-01 00:00:00.
timestamp = int(nanoseconds/100) + 0x01b21dd213814000L
time_low = timestamp & 0xffffffffL
time_mid = (timestamp >> 32L) & 0xffffL
time_hi_version = (timestamp >> 48L) & 0x0fffL
if randomize:
rand_bits = random.getrandbits(8 + 8 + 48)
clock_seq_low = rand_bits & 0xffL # 8 bits, no offset
clock_seq_hi_variant = (rand_bits & 0xff00L) / 0x100 # 8 bits, 8 offset
node = (rand_bits & 0xffffffffffff0000L) / 0x10000L # 48 bits, 16 offset
else:
if lowest_val:
# Make the lowest value UUID with the same timestamp
clock_seq_low = 0 & 0xffL
clock_seq_hi_variant = 0 & 0x3fL
node = 0 & 0xffffffffffffL # 48 bits
else:
# Make the highest value UUID with the same timestamp
clock_seq_low = 0xffL
clock_seq_hi_variant = 0x3fL
node = 0xffffffffffffL # 48 bits
return uuid.UUID(fields=(time_low, time_mid, time_hi_version,
clock_seq_hi_variant, clock_seq_low, node), version=1)
def convert_uuid_to_time(uuid_arg):
"""
Converts a version 1 :class:`uuid.UUID` to a timestamp with the same precision
as :meth:`time.time()` returns. This is useful for examining the
results of queries returning a v1 :class:`~uuid.UUID`.
:param uuid_arg: a version 1 :class:`~uuid.UUID`
:rtype: timestamp
"""
ts = uuid_arg.get_time()
return (ts - 0x01b21dd213814000L)/1e7
_TYPES = ['BytesType', 'LongType', 'IntegerType', 'UTF8Type', 'AsciiType',
'LexicalUUIDType', 'TimeUUIDType']
def extract_type_name(string):
if string is None: return 'BytesType'
index = string.rfind('.')
if index == -1:
string = 'BytesType'
else:
string = string[index + 1: ]
if string not in _TYPES:
string = 'BytesType'
return string
def pack(value, data_type):
"""
Packs a value into the expected sequence of bytes that Cassandra expects.
"""
if data_type == 'BytesType':
return value
elif data_type == 'LongType':
if _have_struct:
return _long_packer.pack(long(value))
else:
return struct.pack('>q', long(value)) # q is 'long long'
elif data_type == 'IntegerType':
if _have_struct:
return _int_packer.pack(int(value))
else:
return struct.pack('>i', int(value))
elif data_type == 'UTF8Type':
try:
st = value.encode('utf-8')
except UnicodeDecodeError:
# value is already utf-8 encoded
st = value
return st
elif data_type == 'TimeUUIDType' or data_type == 'LexicalUUIDType':
if not hasattr(value, 'bytes'):
raise TypeError("%s not valid for %s" % (value, data_type))
if _have_struct:
return _uuid_packer.pack(value.bytes)
else:
return struct.pack('>16s', value.bytes)
else:
return value
def unpack(byte_array, data_type):
"""
Unpacks Cassandra's byte-representation of values into their Python
equivalents.
"""
if data_type == 'BytesType':
return byte_array
elif data_type == 'LongType':
if _have_struct:
return _long_packer.unpack(byte_array)[0]
else:
return struct.unpack('>q', byte_array)[0]
elif data_type == 'IntegerType':
if _have_struct:
return _int_packer.unpack(byte_array)[0]
else:
return struct.unpack('>i', byte_array)[0]
elif data_type == 'AsciiType':
return byte_array
elif data_type == 'UTF8Type':
return byte_array.decode('utf-8')
elif data_type == 'LexicalUUIDType' or data_type == 'TimeUUIDType':
if _have_struct:
temp_bytes = _uuid_packer.unpack(byte_array)[0]
else:
temp_bytes = struct.unpack('>16s', byte_array)[0]
return uuid.UUID(bytes=temp_bytes)
else:
return byte_array
# Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Michael Bayer mike_mp@zzzcomputing.com
#
# The 'as_interface' method is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
import operator
def as_interface(obj, cls=None, methods=None, required=None):
"""Ensure basic interface compliance for an instance or dict of callables.
Checks that ``obj`` implements public methods of ``cls`` or has members
listed in ``methods``. If ``required`` is not supplied, implementing at
least one interface method is sufficient. Methods present on ``obj`` that
are not in the interface are ignored.
If ``obj`` is a dict and ``dict`` does not meet the interface
requirements, the keys of the dictionary are inspected. Keys present in
``obj`` that are not in the interface will raise TypeErrors.
Raises TypeError if ``obj`` does not meet the interface criteria.
In all passing cases, an object with callable members is returned. In the
simple case, ``obj`` is returned as-is; if dict processing kicks in then
an anonymous class is returned.
obj
A type, instance, or dictionary of callables.
cls
Optional, a type. All public methods of cls are considered the
interface. An ``obj`` instance of cls will always pass, ignoring
``required``..
methods
Optional, a sequence of method names to consider as the interface.
required
Optional, a sequence of mandatory implementations. If omitted, an
``obj`` that provides at least one interface method is considered
sufficient. As a convenience, required may be a type, in which case
all public methods of the type are required.
"""
if not cls and not methods:
raise TypeError('a class or collection of method names are required')
if isinstance(cls, type) and isinstance(obj, cls):
return obj
interface = set(methods or [m for m in dir(cls) if not m.startswith('_')])
implemented = set(dir(obj))
complies = operator.ge
if isinstance(required, type):
required = interface
elif not required:
required = set()
complies = operator.gt
else:
required = set(required)
if complies(implemented.intersection(interface), required):
return obj
# No dict duck typing here.
if not type(obj) is dict:
qualifier = complies is operator.gt and 'any of' or 'all of'
raise TypeError("%r does not implement %s: %s" % (
obj, qualifier, ', '.join(interface)))
class AnonymousInterface(object):
"""A callable-holding shell."""
if cls:
AnonymousInterface.__name__ = 'Anonymous' + cls.__name__
found = set()
for method, impl in dictlike_iteritems(obj):
if method not in interface:
raise TypeError("%r: unknown in this interface" % method)
if not callable(impl):
raise TypeError("%r=%r is not callable" % (method, impl))
setattr(AnonymousInterface, method, staticmethod(impl))
found.add(method)
if complies(found, required):
return AnonymousInterface
raise TypeError("dictionary does not contain required keys %s" %
', '.join(required - found))
# Copyright (c) 2009 Raymond Hettinger
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
from UserDict import DictMixin
class OrderedDict(dict, DictMixin):
""" A dictionary which maintains the insertion order of keys. """
def __init__(self, *args, **kwds):
""" A dictionary which maintains the insertion order of keys. """
if len(args) > 1:
raise TypeError('expected at most 1 arguments, got %d' % len(args))
try:
self.__end
except AttributeError:
self.clear()
self.update(*args, **kwds)
def clear(self):
self.__end = end = []
end += [None, end, end] # sentinel node for doubly linked list
self.__map = {} # key --> [key, prev, next]
dict.clear(self)
def __setitem__(self, key, value):
if key not in self:
end = self.__end
curr = end[1]
curr[2] = end[1] = self.__map[key] = [key, curr, end]
dict.__setitem__(self, key, value)
def __delitem__(self, key):
dict.__delitem__(self, key)
key, prev, next = self.__map.pop(key)
prev[2] = next
next[1] = prev
def __iter__(self):
end = self.__end
curr = end[2]
while curr is not end:
yield curr[0]
curr = curr[2]
def __reversed__(self):
end = self.__end
curr = end[1]
while curr is not end:
yield curr[0]
curr = curr[1]
def popitem(self, last=True):
if not self:
raise KeyError('dictionary is empty')
if last:
key = reversed(self).next()
else:
key = iter(self).next()
value = self.pop(key)
return key, value
def __reduce__(self):
items = [[k, self[k]] for k in self]
tmp = self.__map, self.__end
del self.__map, self.__end
inst_dict = vars(self).copy()
self.__map, self.__end = tmp
if inst_dict:
return (self.__class__, (items,), inst_dict)
return self.__class__, (items,)
def keys(self):
return list(self)
setdefault = DictMixin.setdefault
update = DictMixin.update
pop = DictMixin.pop
values = DictMixin.values
items = DictMixin.items
iterkeys = DictMixin.iterkeys
itervalues = DictMixin.itervalues
iteritems = DictMixin.iteritems
def __repr__(self):
if not self:
return '%s()' % (self.__class__.__name__,)
return '%s(%r)' % (self.__class__.__name__, self.items())
def copy(self):
return self.__class__(self)
@classmethod
def fromkeys(cls, iterable, value=None):
d = cls()
for key in iterable:
d[key] = value
return d
def __eq__(self, other):
if isinstance(other, OrderedDict):
if len(self) != len(other):
return False
for p, q in zip(self.items(), other.items()):
if p != q:
return False
return True
return dict.__eq__(self, other)
def __ne__(self, other):
return not self == other
Jump to Line
Something went wrong with that request. Please try again.