/
cassandra_cql.rb
185 lines (166 loc) · 7.4 KB
/
cassandra_cql.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
require 'forwardable'
require 'cassandra-cql'
require 'cassanity/error'
require 'cassanity/instrumenters/noop'
require 'cassanity/argument_generators/keyspaces'
require 'cassanity/argument_generators/keyspace_create'
require 'cassanity/argument_generators/keyspace_drop'
require 'cassanity/argument_generators/keyspace_use'
require 'cassanity/argument_generators/column_families'
require 'cassanity/argument_generators/column_family_create'
require 'cassanity/argument_generators/column_family_drop'
require 'cassanity/argument_generators/column_family_truncate'
require 'cassanity/argument_generators/column_family_select'
require 'cassanity/argument_generators/column_family_insert'
require 'cassanity/argument_generators/column_family_update'
require 'cassanity/argument_generators/column_family_delete'
require 'cassanity/argument_generators/column_family_alter'
require 'cassanity/argument_generators/index_create'
require 'cassanity/argument_generators/index_drop'
require 'cassanity/argument_generators/batch'
require 'cassanity/argument_generators/columns'
require 'cassanity/result_transformers/result_to_array'
require 'cassanity/result_transformers/keyspaces'
require 'cassanity/result_transformers/column_families'
require 'cassanity/result_transformers/columns'
require 'cassanity/result_transformers/mirror'
require 'cassanity/retry_strategies/retry_n_times'
require 'cassanity/retry_strategies/exponential_backoff'
module Cassanity
module Executors
class CassandraCql
extend Forwardable
# Private: Hash of commands to related argument generators.
DefaultArgumentGenerators = {
keyspaces: ArgumentGenerators::Keyspaces.new,
keyspace_create: ArgumentGenerators::KeyspaceCreate.new,
keyspace_drop: ArgumentGenerators::KeyspaceDrop.new,
keyspace_use: ArgumentGenerators::KeyspaceUse.new,
column_families: ArgumentGenerators::ColumnFamilies.new,
column_family_create: ArgumentGenerators::ColumnFamilyCreate.new,
column_family_drop: ArgumentGenerators::ColumnFamilyDrop.new,
column_family_truncate: ArgumentGenerators::ColumnFamilyTruncate.new,
column_family_select: ArgumentGenerators::ColumnFamilySelect.new,
column_family_insert: ArgumentGenerators::ColumnFamilyInsert.new,
column_family_update: ArgumentGenerators::ColumnFamilyUpdate.new,
column_family_delete: ArgumentGenerators::ColumnFamilyDelete.new,
column_family_alter: ArgumentGenerators::ColumnFamilyAlter.new,
index_create: ArgumentGenerators::IndexCreate.new,
index_drop: ArgumentGenerators::IndexDrop.new,
batch: ArgumentGenerators::Batch.new,
columns: ArgumentGenerators::Columns.new,
}
# Private: Hash of commands to related result transformers.
DefaultResultTransformers = {
keyspaces: ResultTransformers::Keyspaces.new,
column_families: ResultTransformers::ColumnFamilies.new,
column_family_select: ResultTransformers::ResultToArray.new,
columns: ResultTransformers::Columns.new,
}
# Private: Default retry strategy to retry N times.
DefaultRetryStrategy = RetryStrategies::RetryNTimes.new
# Private: Default result transformer for commands that do not have one.
Mirror = ResultTransformers::Mirror.new
# Private: Forward #instrument to instrumenter.
def_delegator :@instrumenter, :instrument
# Private
attr_reader :driver
# Private
attr_reader :argument_generators
# Private
attr_reader :result_transformers
# Private: What should be used to instrument all the things.
attr_reader :instrumenter
# Private: What strategy to use when retrying Cassandra commands
attr_reader :retry_strategy
# Internal: Initializes a cassandra-cql based CQL executor.
#
# args - The Hash of arguments.
# :driver - The CassandraCQL::Database connection instance.
# :instrumenter - What should be used to instrument all the things
# (default: Cassanity::Instrumenters::Noop).
# :argument_generators - A Hash where each key is a command name
# and each value is the related argument
# generator that responds to `call`
# (optional).
# :result_transformers - A Hash where each key is a command name
# and each value is the related result
# transformer that responds to `call`
# (optional).
# :retry_strategy - What retry strategy to use on failed
# CassandraCQL calls
# (default: Cassanity::Instrumenters::RetryNTimes)
#
# Examples
#
# driver = CassandraCQL::Database.new('host', cql_version: '3.0.0')
# Cassanity::Executors::CassandraCql.new(driver: driver)
#
def initialize(args = {})
@driver = args.fetch(:driver)
@instrumenter = args[:instrumenter] || Instrumenters::Noop
@argument_generators = args.fetch(:argument_generators, DefaultArgumentGenerators)
@result_transformers = args.fetch(:result_transformers, DefaultResultTransformers)
@retry_strategy = args[:retry_strategy] || DefaultRetryStrategy
end
# Internal: Execute a CQL query.
#
# args - One or more arguments to send to execute. First should always be
# String CQL query. The rest should be the bound variables if any
# are needed.
#
# Examples
#
# call({
# command: :keyspaces,
# })
#
# call({
# command: :keyspace_create,
# arguments: {keyspace_name: 'analytics'},
# })
#
# Returns the result of execution.
# Raises Cassanity::Error if anything goes wrong during execution.
def call(args = {})
instrument('cql.cassanity') do |payload|
begin
command = args.fetch(:command)
payload[:command] = command
generator = @argument_generators.fetch(command)
rescue KeyError => e
raise Cassanity::UnknownCommand
end
arguments = args[:arguments]
if arguments
if (keyspace_name = arguments[:keyspace_name])
payload[:keyspace_name] = keyspace_name
end
if (column_family_name = arguments[:column_family_name])
payload[:column_family_name] = column_family_name
end
end
begin
execute_arguments = generator.call(arguments)
payload[:cql] = execute_arguments[0]
payload[:cql_variables] = execute_arguments[1..-1]
result = @retry_strategy.execute(payload) { @driver.execute(*execute_arguments) }
transformer = @result_transformers.fetch(command, Mirror)
transformed_result = transformer.call(result, args[:transformer_arguments])
payload[:result] = transformed_result
rescue StandardError => e
raise Cassanity::Error
end
transformed_result
end
end
# Public
def inspect
attributes = [
"driver=#{@driver.inspect}",
]
"#<#{self.class.name}:#{object_id} #{attributes.join(', ')}>"
end
end
end
end