Skip to content

Commit

Permalink
feat(cluster) better keyspace pooling
Browse files Browse the repository at this point in the history
Taking advantage of the recent fix in the init module that pools sockets
by their keyspace, this avoids setting a keyspace before each operation
in order to allow for `coordinator_options` in `execute()` and
`batch()`.

Instead, we set coordinators their keyspace when opening their
connection, as it should be. Cassandra keyspaces are expected to exist
when we connect to it via a cluster instance, or else the connection
will fail, as it should be. If no keyspace is set for the cluster, none
will be used. If a keyspace is set at the `coordinator_options` argument
(per-query level of granularity), then the cluster will set the keyspace
beore spawning the coordinator, hence it will always use a different
connection pool.
  • Loading branch information
thibaultcha committed Oct 3, 2016
1 parent a8dcd0b commit b0329a3
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 74 deletions.
35 changes: 23 additions & 12 deletions lib/cassandra/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,8 @@ function _Host:connect()
end

if self.keyspace then
local res, err = self:set_keyspace(self.keyspace)
local keyspace_req = requests.keyspace.new(self.keyspace)
local res, err = self:send(keyspace_req)
if not res then return nil, err end
end
end
Expand Down Expand Up @@ -312,6 +313,27 @@ function _Host:close(...)
return self.sock:close(...)
end

--- Change the client's keyspace.
-- Closes the current connection and open a new one to the given
-- keyspace.
-- The connection is closed and reopen so that we use a different connection
-- pool for usage in ngx_lua.
-- @param[type=string] keyspace Name of the desired keyspace.
-- @treturn boolean `ok`: `true` if success, `nil` if failure.
-- @treturn string `err`: String describing the error if failure.
function _Host:change_keyspace(keyspace)
local ok, err = self:close()
if not ok then return nil, err end

local sock, err = socket.tcp()
if err then return nil, err end

self.sock = sock
self.keyspace = keyspace

return self:connect()
end

--- Query options.
-- @field consistency Consistency level for this request.
-- See `cassandra.consistencies` table.
Expand Down Expand Up @@ -407,17 +429,6 @@ end

_Host.page_iterator = page_iterator

--- Set the client's keyspace.
-- Sends a query to change which keyspace the client is connected to.
-- @param[type=string] keyspace Name of the desired keyspace.
-- @treturn table `res`: Table holding the query result if success, `nil` if failure.
-- @treturn string `err`: String describing the error if failure.
-- @treturn number `cql_err`: If a server-side error occurred, the CQL error code.
function _Host:set_keyspace(keyspace)
local keyspace_req = requests.keyspace.new(keyspace)
return self:send(keyspace_req)
end

--- Prepare a query.
-- Sends a PREPARE request for the given query. The result of this request will
-- contain a query id, which can be given to `execute` if the `prepared` option
Expand Down
62 changes: 16 additions & 46 deletions lib/resty/cassandra/cluster.lua
Original file line number Diff line number Diff line change
Expand Up @@ -163,15 +163,18 @@ end
-- utils
----------------------------

local function spawn_peer(host, port, opts)
local function spawn_peer(host, port, keyspace, opts)
opts = opts or {}
opts.host = host
opts.port = port
opts.keyspace = keyspace
return cassandra.new(opts)
end

local function check_peer_health(self, host, retry)
local peer, err = spawn_peer(host, self.default_port, self.peers_opts)
local function check_peer_health(self, host, keyspace, retry)
local peer, err = spawn_peer(host, self.default_port,
keyspace or self.keyspace,
self.peers_opts)
if not peer then return nil, err
else
peer:settimeout(self.timeout_connect)
Expand Down Expand Up @@ -375,13 +378,13 @@ local function first_coordinator(self)
return nil, no_host_available_error(errors)
end

local function next_coordinator(self)
local function next_coordinator(self, keyspace)
local errors = {}

for _, peer_rec in self.lb_policy:iter() do
local ok, err, retry = can_try_peer(self, peer_rec.host)
if ok then
local peer, err = check_peer_health(self, peer_rec.host, retry)
local peer, err = check_peer_health(self, peer_rec.host, keyspace, retry)
if peer then
log(DEBUG, _log_prefix, 'load balancing policy chose host at ', peer.host)
return peer
Expand Down Expand Up @@ -677,38 +680,7 @@ do
local query_req = requests.query.new
local batch_req = requests.batch.new
local prep_req = requests.execute_prepared.new

--- Coordinator options.
-- Options to pass to coordinators chosen by the load balancing policy
-- on `execute`/`batch`/`iterate`.
-- @field keyspace Keyspace to use for the current request.
-- (`string`, optional)
-- @table `coordinator_options`

local function prepare_coordinator(self, coordinator, coordinator_options)
local reused, err = coordinator:getreusedtimes()
if not reused then return nil, err end

local keyspace

if coordinator_options and coordinator_options.keyspace then
keyspace = coordinator_options.keyspace
elseif self.keyspace then
--elseif self.keyspace and reused < 1 then
-- Note: ideally we would not set the keyspace on each query, but for now there
-- is no way to know if a reused connection has its keyspace set or not,
-- so we must set the keyspace regardless of if the connection is coming
-- from the pool or is a new one.
keyspace = self.keyspace
end

if keyspace then
local res, err = coordinator:set_keyspace(keyspace)
if not res then return nil, err end
end

return true
end
local empty_t = {}

--- Execute a query.
-- Sends a request to the coordinator chosen by the configured load
Expand Down Expand Up @@ -753,11 +725,10 @@ do
if not ok then return nil, 'could not refresh cluster: '..err end
end

local coordinator, err = next_coordinator(self)
if not coordinator then return nil, err end
coordinator_options = coordinator_options or empty_t

local ok, err = prepare_coordinator(self, coordinator, coordinator_options)
if not ok then return nil, err end
local coordinator, err = next_coordinator(self, coordinator_options.keyspace)
if not coordinator then return nil, err end

local request
local opts = get_request_opts(options)
Expand Down Expand Up @@ -803,17 +774,16 @@ do
-- @treturn table `res`: Table holding the query result if success, `nil` if failure.
-- @treturn string `err`: String describing the error if failure.
-- @treturn number `cql_err`: If a server-side error occurred, the CQL error code.
function _Cluster:batch(queries_t, options)
function _Cluster:batch(queries_t, options, coordinator_options)
if not self.init then
local ok, err = self:refresh()
if not ok then return nil, 'could not refresh cluster: '..err end
end

local coordinator, err = next_coordinator(self)
if not coordinator then return nil, err end
coordinator_options = coordinator_options or empty_t

local ok, err = prepare_coordinator(self, coordinator, options)
if not ok then return nil, err end
local coordinator, err = next_coordinator(self, coordinator_options.keyspace)
if not coordinator then return nil, err end

local opts = get_request_opts(options)

Expand Down
30 changes: 14 additions & 16 deletions spec/02-integration/01-host_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ describe("cassandra (host)", function()
assert.equal("timeout", err)
assert.True(maybe_down)
end)
it("connects directly in a keyspace", function()
local peer_k = assert(cassandra.new {keyspace = "system"})
assert(peer_k:connect())

local rows = assert(peer_k:execute "SELECT * FROM local")
assert.equal("local", rows[1].key)
end)
end)

describe("close()", function()
Expand Down Expand Up @@ -249,7 +256,7 @@ describe("cassandra (host)", function()
end)
describe("protocol v3 options", function()
setup(function()
assert(peer:set_keyspace(helpers.keyspace))
assert(peer:change_keyspace(helpers.keyspace))
assert(peer:execute [[
CREATE TABLE IF NOT EXISTS options(
id int PRIMARY KEY,
Expand Down Expand Up @@ -344,22 +351,13 @@ describe("cassandra (host)", function()
end)
end)

describe("set_keyspace()", function()
it("sets a peer's keyspace", function()
describe("change_keyspace()", function()
it("changes a peer's keyspace", function()
local peer_k = assert(cassandra.new())
assert(peer_k:connect())

local res = assert(peer_k:set_keyspace "system")
assert.equal(0, #res)
assert.equal("SET_KEYSPACE", res.type)
assert.equal("system", res.keyspace)

local rows = assert(peer_k:execute "SELECT * FROM local")
assert.equal("local", rows[1].key)
end)
it("connects directly in a keyspace", function()
local peer_k = assert(cassandra.new {keyspace = "system"})
assert(peer_k:connect())
assert(peer_k:change_keyspace "system")
assert.equal("system", peer_k.keyspace)

local rows = assert(peer_k:execute "SELECT * FROM local")
assert.equal("local", rows[1].key)
Expand All @@ -368,7 +366,7 @@ describe("cassandra (host)", function()

describe("batch()", function()
setup(function()
assert(peer:set_keyspace(helpers.keyspace))
assert(peer:change_keyspace(helpers.keyspace))
assert(peer:execute [[
CREATE TABLE IF NOT EXISTS things(
id uuid PRIMARY KEY,
Expand Down Expand Up @@ -531,7 +529,7 @@ describe("cassandra (host)", function()

describe("Types marshalling", function()
setup(function()
assert(peer:set_keyspace(helpers.keyspace))
assert(peer:change_keyspace(helpers.keyspace))
assert(peer:execute [[
CREATE TYPE IF NOT EXISTS address(
street text,
Expand Down
36 changes: 36 additions & 0 deletions t/06-cluster.t
Original file line number Diff line number Diff line change
Expand Up @@ -1128,3 +1128,39 @@ GET /t
127.0.0.1 is back up: true
--- no_error_log
[error]
=== TEST 25: next_coordinator() sets coordinator keyspace on connect
--- http_config eval: $::HttpConfig
--- config
location /t {
content_by_lua_block {
local Cluster = require 'resty.cassandra.cluster'
local cluster, err = Cluster.new()
if not cluster then
ngx.log(ngx.ERR, err)
return
end
local ok, err = cluster:refresh()
if not ok then
ngx.log(ngx.ERR, err)
return
end
local coordinator, err = cluster:next_coordinator('system')
if not coordinator then
ngx.log(ngx.ERR, err)
return
end
ngx.say(coordinator.keyspace)
}
}
--- request
GET /t
--- response_body
system
--- no_error_log
[error]

0 comments on commit b0329a3

Please sign in to comment.