Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Set CQL3 consistency per connection #64

Open
wants to merge 3 commits into from

3 participants

@outoftime

Not thread-safe as the assumption is you'll use one connection per thread.

Adapted from outoftime#3

@outoftime

@ottbot Finally sending this upstream... sorry to let it lie fallow!

@kreynolds
Owner

Can we mutex the with_consistency call so that this becomes thread safe again?

@outoftime

My initial assumption was that connections would not be shared between threads, but realistically one would probably be using some sort of connection pooling which makes this particular approach pretty bad.

How about as an alternative, #execute_cql_query takes an options hash as its second argument, which allows :compression and :consistency? We could just raise an ArgumentError if consistency were specified for a CQL2 connection. And it should be easy enough to check if the second argument is just a consistency argument directly for backwards compatibility.

If this seems reasonable I'd be glad to code it up and update this PR.

@kreynolds
Owner

I think that makes a lot more sense. Its a breaking change but we can just bump the minor version and if you could slightly alter the README to let people know about the new API, that'd be fantastic.

@outoftime

Great! Although I don't think it has to be a breaking change if we just check the type of the second argument?

@kreynolds
Owner

I suppose, but I prefer to minimize the overhead per query as much as possible and adding hash munging to it already makes me unhappy, though thats just how ruby rolls. We could write multiple methods and have RUBY_VERSION >= 2 use named keywords :)

@outoftime

OK, no strong objection here if you're down for a breaking change – do you think it's worth providing an alternate implementation for Ruby 2?

@kreynolds
Owner

I know that hash setup/teardown is pretty gross for GC/CPU in ruby, but I've not specifically benchmarked the difference between using keyword arguments and a hash ... I don't know what ruby does under the hood for that. For now don't bother, maybe just add a TODO note or something.

@outoftime

Will do!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Nov 30, 2013
  1. @ottbot @outoftime

    Set the consistency level used for CQL3 queries

    ottbot authored outoftime committed
  2. @ottbot @outoftime

    Change the consistency level used for a connection

    ottbot authored outoftime committed
  3. @ottbot @outoftime

    Use a different consistency level within a block

    ottbot authored outoftime committed
This page is out of date. Refresh to see the latest.
View
60 lib/cassandra-cql/database.rb
@@ -21,7 +21,7 @@ class InvalidRequestException < Exception; end
class Database
attr_reader :connection, :schema, :keyspace
-
+
def initialize(servers, options={}, thrift_client_options={})
@options = {
:keyspace => 'system'
@@ -34,12 +34,20 @@ def initialize(servers, options={}, thrift_client_options={})
@keyspace = @options[:keyspace]
@cql_version = @options[:cql_version]
+ @use_cql3_query =
+ (@cql_version.nil? || @cql_version.split('.').first.to_i >= 3) &&
+ CassandraCQL::Thrift::Client.method_defined?(:execute_cql3_query)
+
+ if @use_cql3_query
+ self.consistency = @options.fetch(:consistency, :quorum)
+ end
+
@servers = servers
connect!
end
- def use_cql3?
- @use_cql3
+ def use_cql3_query?
+ @use_cql3_query
end
def connect!
@@ -52,12 +60,12 @@ def connect!
obj = self
@connection.add_callback(:post_connect) do
if @connection.describe_version >= '19.35.0' && (!@cql_version || @cql_version >= '3.0.0')
- @use_cql3 = true
+ @use_cql3_query = true
elsif @cql_version
- @use_cql3 = false
+ @use_cql3_query = false
@connection.set_cql_version(@cql_version)
else
- @use_cql3 = false
+ @use_cql3_query = false
end
@connection.login(@auth_request) if @auth_request
execute("USE #{@keyspace}")
@@ -71,7 +79,7 @@ def disconnect!
def active?
# TODO: This should be replaced with a CQL call that doesn't exist yet
- @connection.describe_version
+ @connection.describe_version
true
rescue Exception
false
@@ -112,24 +120,24 @@ def execute(statement, *bind_vars)
end
def execute_cql_query(cql, compression=CassandraCQL::Thrift::Compression::NONE)
- if use_cql3?
- @connection.execute_cql3_query(cql, compression, CassandraCQL::Thrift::ConsistencyLevel::QUORUM) #TODO consistency level
+ if use_cql3_query?
+ @connection.execute_cql3_query(cql, compression, consistency)
else
@connection.execute_cql_query(cql, compression)
end
rescue CassandraCQL::Thrift::InvalidRequestException
raise Error::InvalidRequestException.new($!.why)
end
-
+
def keyspace=(ks)
@keyspace = (ks.nil? ? nil : ks.to_s)
end
-
+
def keyspaces
# TODO: This should be replaced with a CQL call that doesn't exist yet
@connection.describe_keyspaces.map { |keyspace| Schema.new(keyspace) }
end
-
+
def schema
# TODO: This should be replaced with a CQL call that doesn't exist yet
Schema.new(@connection.describe_keyspace(@keyspace))
@@ -144,5 +152,33 @@ def login!(username, password)
@auth_request = request
ret
end
+
+ def with_consistency(level)
+ previous_level = @consistency_name
+ self.consistency = level
+ yield
+ self.consistency = previous_level
+ end
+
+ def consistency
+ @consistency ||= get_consistency(:quorum)
+ end
+
+ def consistency=(level)
+ @consistency_name = level
+ @consistency = get_consistency(level)
+ end
+
+ private
+
+ def get_consistency(level)
+ level_const = level.to_s.upcase
+
+ if CassandraCQL::Thrift::ConsistencyLevel.const_defined?(level_const)
+ CassandraCQL::Thrift::ConsistencyLevel.const_get(level_const)
+ else
+ raise ArgumentError.new("Invalid consistency level")
+ end
+ end
end
end
View
2  lib/cassandra-cql/statement.rb
@@ -37,7 +37,7 @@ def prepare(statement)
end
def execute(bind_vars=[], options={})
- sanitized_query = self.class.sanitize(@statement, bind_vars, @handle.use_cql3?)
+ sanitized_query = self.class.sanitize(@statement, bind_vars, @handle.use_cql3_query?)
compression_type = CassandraCQL::Thrift::Compression::NONE
if options[:compression]
compression_type = CassandraCQL::Thrift::Compression::GZIP
View
31 spec/database_spec.rb
@@ -5,14 +5,14 @@
before do
@connection = setup_cassandra_connection
end
-
+
describe "reset!" do
it "should create a new connection" do
@connection.should_receive(:connect!)
@connection.reset!
end
end
-
+
describe "login!" do
it "should call login! on connection" do
creds = { 'username' => 'myuser', 'password' => 'mypass' }
@@ -22,4 +22,29 @@
@connection.login!(creds['username'], creds['password'])
end
end
-end
+
+ describe "setting the consistency level" do
+ context "with CQL3", :cql_version => "3.0.0" do
+ it "should return the current consistency" do
+ @connection.consistency.should == CassandraCQL::Thrift::ConsistencyLevel::QUORUM
+ end
+
+ it "should change the consistency" do
+ @connection.consistency = :all
+ @connection.consistency.should == CassandraCQL::Thrift::ConsistencyLevel::ALL
+ end
+
+ it "should change the consistency within a block" do
+ @connection.with_consistency(:one) do
+ @connection.consistency.should == CassandraCQL::Thrift::ConsistencyLevel::ONE
+ end
+
+ @connection.consistency.should == CassandraCQL::Thrift::ConsistencyLevel::QUORUM
+ end
+
+ it "should raise and error if invalid consistency is used" do
+ expect { @connection.consistency = :foo }.to raise_error(ArgumentError)
+ end
+ end
+ end
+end
Something went wrong with that request. Please try again.