Skip to content

Commit

Permalink
Don't explicitly list CfDef attrs as SysMan method params
Browse files Browse the repository at this point in the history
  • Loading branch information
thobbs committed Jul 17, 2012
1 parent 21dc860 commit 194a195
Showing 1 changed file with 19 additions and 227 deletions.
246 changes: 19 additions & 227 deletions pycassa/system_manager.py
Expand Up @@ -2,7 +2,7 @@

from pycassa.connection import Connection
from pycassa.cassandra.ttypes import IndexType, KsDef, CfDef, ColumnDef,\
InvalidRequestException, SchemaDisagreementException
SchemaDisagreementException
import pycassa.marshal as marshal
import pycassa.types as types

Expand Down Expand Up @@ -102,7 +102,7 @@ def get_keyspace_properties(self, keyspace):
"""
Gets a keyspace's properties.
Returns a :class:`dict` with 'strategy_class' and
Returns a :class:`dict` with 'strategy_class' and
'strategy_options' as keys.
"""
if keyspace is None:
Expand All @@ -112,7 +112,6 @@ def get_keyspace_properties(self, keyspace):
return {'replication_strategy': ks_def.strategy_class,
'strategy_options': ks_def.strategy_options}


def list_keyspaces(self):
""" Returns a list of all keyspace names. """
return [ks.name for ks in self._conn.describe_keyspaces()]
Expand All @@ -136,12 +135,12 @@ def describe_schema_versions(self):
def describe_partitioner(self):
""" Gives the partitioner that the cluster is using """
part = self._conn.describe_partitioner()
return part[part.rfind('.') + 1: ]
return part[part.rfind('.') + 1:]

def describe_snitch(self):
""" Gives the snitch that the cluster is using """
snitch = self._conn.describe_snitch()
return snitch[snitch.rfind('.') + 1: ]
return snitch[snitch.rfind('.') + 1:]

def _system_add_keyspace(self, ksdef):
return self._schema_update(self._conn.system_add_keyspace, ksdef)
Expand Down Expand Up @@ -236,49 +235,7 @@ def _system_add_column_family(self, cfdef):
self._conn.set_keyspace(cfdef.keyspace)
return self._schema_update(self._conn.system_add_column_family, cfdef)

def _qualify_type_class(self, classname):
if classname:
if isinstance(classname, types.CassandraType):
s = str(classname)
elif isinstance(classname, basestring):
s = classname
else:
raise TypeError(
"Column family validators and comparators " \
"must be specified as instances of " \
"pycassa.types.CassandraType subclasses or strings.")

if s.find('.') == -1:
return 'org.apache.cassandra.db.marshal.%s' % s
else:
return s
else:
return None

def create_column_family(self, keyspace, name, super=False,
comparator_type=None,
subcomparator_type=None,
column_validation_classes=None,
key_cache_size=None,
row_cache_size=None,
gc_grace_seconds=None,
read_repair_chance=None,
default_validation_class=None,
key_validation_class=None,
min_compaction_threshold=None,
max_compaction_threshold=None,
key_cache_save_period_in_seconds=None,
row_cache_save_period_in_seconds=None,
replicate_on_write=None,
merge_shards_chance=None,
row_cache_provider=None,
key_alias=None,
compaction_strategy=None,
compaction_strategy_options=None,
row_cache_keys_to_save=None,
compression_options=None,
caching=None,
comment=None):
def create_column_family(self, keyspace, name, column_validation_classes=None, **cf_kwargs):

"""
Creates a new column family in a given keyspace. If a value is not
Expand All @@ -287,205 +244,44 @@ def create_column_family(self, keyspace, name, super=False,
`keyspace` should be the name of the keyspace the column family will
be created in. `name` gives the name of the column family.
Column family options follow:
================================ =====================================
Name Description
================================ =====================================
super Whether or not this column family is
a super column family
comparator_type What type the column names will be,
which affects their sort order. This
should be an instance of
:class:`~.types.CassandraType`
subcomparator_type Like `comparator_type`, but if this
is a super column family, this
applies to the subcolumn names
column_validation_classes A dictionary mapping column names to
column value types. The types should
be expressed as instances of
:class:`~.types.CassandraType`
default_validation_class The data type for all column values
in the column family
key_validation_class The data type for row keys
key_cache_size The size of the key cache, either in
a percentage of total keys (0.15, for
example) or an absolute number of
keys (2000, for example)
row_cache_size Like `key_cache_size`, but for the
row cache
gc_grace_seconds Number of seconds before tombstones
are removed during compactions
read_repair_chance Probability (as a float from 0 to 1)
of read repair occurring
min_compaction_threshold Number of similarly sized SSTables
that must be present for a compaction
to occur. Does not apply to
``LeveledCompactionStrategy``.
max_compaction_threshold The maximum number of SSTables that
may be included in a single
compaction. Does not apply to
``LeveledCompactionStrategy``.
key_cache_save_period_in_seconds How often the key cache should be
saved. This helps to avoid high
latency reads from a cold cache
after restarts.
row_cache_save_period_in_seconds Same as the above, but for row cache
replicate_on_write Whether counter operations are
replicated at write-time
merge_shards_chance The probability that counter shards
will be merged
row_cache_provider The class that will be used to store
the row cache
key_alias A "column name" to be used for the
key. This currently only matters for
CQL.
compaction_strategy The name of the compaction
strategy class. Current choices are
``SizeTieredCompactoinStrategy`` and
``LeveledCompactionStrategy``.
compaction_strategy_options A ``dict`` of options for the
compaction strategy
row_cache_keys_to_save A list of keys to be saved in the row
cache; :const:`None` allows any row
to be cached
compression_options A dict of options for compression.
Available keys include
``sstable_compression``, which may be
:const:`None` for no compression,
``SnappyCompressor``,
``DeflateCompressor``, or a custom
compressor, and ``chunk_length_kb``,
which must be a power of 2.
caching Specify caching policy, one of
`all`, `keys_only`, `rows_only` or
`none`, defaults to `keys_only`.
comment A human readable comment describing
the column family
================================ =====================================
.. versionadded:: 1.4.0
The `column_validation_classes` parameter.
.. versionadded:: 1.7.0
The `caching` parameter.
"""

self._conn.set_keyspace(keyspace)
cfdef = CfDef()
cfdef.keyspace = keyspace
cfdef.name = name

if super:
cfdef.column_type = 'Super'
if cf_kwargs.pop('super', False):
cf_kwargs.setdefault('column_type', 'Super')

cfdef.comparator_type = self._qualify_type_class(comparator_type)
cfdef.subcomparator_type = self._qualify_type_class(subcomparator_type)
cfdef.default_validation_class = self._qualify_type_class(default_validation_class)
cfdef.key_validation_class = self._qualify_type_class(key_validation_class)
for k, v in cf_kwargs.iteritems():
setattr(cfdef, k, v)

if column_validation_classes:
for (columnName, value_type) in column_validation_classes.items():
cfdef = self._alter_column_cfdef(cfdef, columnName, value_type)

cfdef.replicate_on_write = replicate_on_write
cfdef.comment = comment
cfdef.key_alias = key_alias
if row_cache_provider:
cfdef.row_cache_provider = row_cache_provider

self._cfdef_assign(key_cache_size, cfdef, 'key_cache_size')
self._cfdef_assign(row_cache_size, cfdef, 'row_cache_size')
self._cfdef_assign(gc_grace_seconds, cfdef, 'gc_grace_seconds')
self._cfdef_assign(read_repair_chance, cfdef, 'read_repair_chance')
self._cfdef_assign(min_compaction_threshold, cfdef, 'min_compaction_threshold')
self._cfdef_assign(max_compaction_threshold, cfdef, 'max_compaction_threshold')
self._cfdef_assign(key_cache_save_period_in_seconds, cfdef, 'key_cache_save_period_in_seconds')
self._cfdef_assign(row_cache_save_period_in_seconds, cfdef, 'row_cache_save_period_in_seconds')
self._cfdef_assign(merge_shards_chance, cfdef, 'merge_shards_chance')
self._cfdef_assign(compaction_strategy, cfdef, 'compaction_strategy')
self._cfdef_assign(compaction_strategy_options, cfdef, 'compaction_strategy_options')
self._cfdef_assign(row_cache_keys_to_save, cfdef, 'row_cache_keys_to_save')
self._cfdef_assign(compression_options, cfdef, 'compression_options')
self._cfdef_assign(caching, cfdef, 'caching')

self._system_add_column_family(cfdef)

def _cfdef_assign(self, attr, cfdef, attr_name):
if attr is not None:
if attr < 0:
self._raise_ire('%s must be non-negative' % attr_name)
else:
setattr(cfdef, attr_name, attr)
for (colname, value_type) in column_validation_classes.items():
cfdef = self._alter_column_cfdef(cfdef, colname, value_type)

def _raise_ire(self, why):
ire = InvalidRequestException()
ire.why = why
raise ire
self._system_add_column_family(cfdef)

def _system_update_column_family(self, cfdef):
return self._schema_update(self._conn.system_update_column_family, cfdef)

def alter_column_family(self, keyspace, column_family,
key_cache_size=None,
row_cache_size=None,
gc_grace_seconds=None,
read_repair_chance=None,
default_validation_class=None,
column_validation_classes=None,
min_compaction_threshold=None,
max_compaction_threshold=None,
key_cache_save_period_in_seconds=None,
row_cache_save_period_in_seconds=None,
replicate_on_write=None,
merge_shards_chance=None,
row_cache_provider=None,
key_alias=None,
compaction_strategy=None,
compaction_strategy_options=None,
row_cache_keys_to_save=None,
compression_options=None,
caching=None,
comment=None):

def alter_column_family(self, keyspace, column_family, column_validation_classes=None, **cf_kwargs):
"""
Alters an existing column family.
Parameter meanings are the same as for :meth:`create_column_family`,
but column family attributes which may not be modified are not
included here.
Parameter meanings are the same as for :meth:`create_column_family`.
"""

self._conn.set_keyspace(keyspace)
cfdef = self.get_keyspace_column_families(keyspace)[column_family]

self._cfdef_assign(key_cache_size, cfdef, 'key_cache_size')
self._cfdef_assign(row_cache_size, cfdef, 'row_cache_size')
self._cfdef_assign(gc_grace_seconds, cfdef, 'gc_grace_seconds')
self._cfdef_assign(read_repair_chance, cfdef, 'read_repair_chance')
self._cfdef_assign(min_compaction_threshold, cfdef, 'min_compaction_threshold')
self._cfdef_assign(max_compaction_threshold, cfdef, 'max_compaction_threshold')
self._cfdef_assign(key_cache_save_period_in_seconds, cfdef, 'key_cache_save_period_in_seconds')
self._cfdef_assign(row_cache_save_period_in_seconds, cfdef, 'row_cache_save_period_in_seconds')
self._cfdef_assign(compaction_strategy, cfdef, 'compaction_strategy')
self._cfdef_assign(compaction_strategy_options, cfdef, 'compaction_strategy_options')
self._cfdef_assign(row_cache_keys_to_save, cfdef, 'row_cache_keys_to_save')
self._cfdef_assign(compression_options, cfdef, 'compression_options')
self._cfdef_assign(caching, cfdef, 'caching')
self._cfdef_assign(merge_shards_chance, cfdef, 'merge_shards_chance')
self._cfdef_assign(comment, cfdef, 'comment')

cfdef.default_validation_class = self._qualify_type_class(default_validation_class)
cfdef.replicate_on_write = replicate_on_write
cfdef.key_alias = key_alias
if row_cache_provider:
cfdef.row_cache_provider = row_cache_provider
for k, v in cf_kwargs.iteritems():
setattr(cfdef, k, v)

if column_validation_classes:
for (columnName, value_type) in column_validation_classes.items():
cfdef = self._alter_column_cfdef(cfdef, columnName, value_type)
for (colname, value_type) in column_validation_classes.items():
cfdef = self._alter_column_cfdef(cfdef, colname, value_type)

self._system_update_column_family(cfdef)

Expand All @@ -505,8 +301,6 @@ def _alter_column_cfdef(self, cfdef, column, value_type):

packed_column = packer(column)

value_type = self._qualify_type_class(value_type)

cfdef.column_metadata = cfdef.column_metadata or []
matched = False
for c in cfdef.column_metadata:
Expand Down Expand Up @@ -573,8 +367,6 @@ def create_index(self, keyspace, column_family, column, value_type,
packer = marshal.packer_for(cfdef.comparator_type)
packed_column = packer(column)

value_type = self._qualify_type_class(value_type)

coldef = ColumnDef(packed_column, value_type, index_type, index_name)

for c in cfdef.column_metadata:
Expand Down Expand Up @@ -617,7 +409,7 @@ def _wait_for_agreement(self):

def _schema_update(self, schema_func, *args):
"""
Call schema updates functions and properly
Call schema updates functions and properly
waits for agreement if needed.
"""
while True:
Expand Down

0 comments on commit 194a195

Please sign in to comment.