-
Notifications
You must be signed in to change notification settings - Fork 98
/
transaction.rb
323 lines (291 loc) · 9.79 KB
/
transaction.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
module RDF
##
# An RDF transaction.
#
# Transactions provide an ACID scope for queries and mutations.
#
# Repository implementations may provide support for transactional updates
# by providing an atomic implementation of {Mutable#apply_changeset} and
# responding to `#supports?(:atomic_write)` with `true`.
#
# We carefully distinguish between read-only and read/write transactions,
# in order to enable repository implementations to take out the
# appropriate locks for concurrency control. Transactions are read-only
# by default; mutability must be explicitly requested on construction in
# order to obtain a read/write transaction.
#
# Individual repositories may make their own sets of guarantees within the
# transaction's scope. In case repository implementations should be unable
# to provide full ACID guarantees for transactions, that must be clearly
# indicated in their documentation. If update atomicity is not provided,
# `#supports?(:atomic_write)` must respond `false`.
#
# @example Executing a read-only transaction
# repository = RDF::Repository.new
#
# RDF::Transaction.begin(repository) do |tx|
# tx.query(predicate: RDF::Vocab::DOAP.developer) do |statement|
# puts statement.inspect
# end
# end
#
# @example Executing a read/write transaction
# repository = RDF::Repository.new
#
# RDF::Transaction.begin(repository, mutable: true) do |tx|
# subject = RDF::URI("http://example.org/article")
# tx.delete [subject, RDF::RDFS.label, "Old title"]
# tx.insert [subject, RDF::RDFS.label, "New title"]
# end
#
# The base class provides an atomic write implementation depending on
# {RDF::Changeset} and using {Changeset#apply}. Custom {Repository} classes
# can implement minimal write-atomic transactions by overriding
# {#apply_changeset}.
#
# Reads within a transaction run against the live repository by default
# ({#isolation_level} is `:read_committed`). Repositories may provide support
# for snapshots by implementing {Repository#snapshot} and responding `true` to
# `#supports?(:snapshots)`. In this case, the transaction will use the
# {RDF::Dataset} returned by {#snapshot} for reads (`:repeatable_read`).
#
# For datastores that support transactions natively, implementation of a
# custom {Transaction} subclass is recommended. The {Repository} is
# responsible for specifying snapshot support and isolation level as
# appropriate. Note that repositories may provide the snapshot isolation level
# without implementing {#snapshot}.
#
# @example A repository with a custom transaction class
# class MyRepository < RDF::Repository
# DEFAULT_TX_CLASS = MyTransaction
# # ...
# # custom repository logic
# # ...
# end
#
# @see RDF::Changeset
# @see RDF::Mutable#apply_changeset
# @since 0.3.0
class Transaction
include RDF::Mutable
include RDF::Enumerable
include RDF::Queryable
##
# @see RDF::Enumerable#each
def each(*args, &block)
  # Enumeration is delegated to whatever the transaction currently reads
  # from (see #read_target); arguments and the block pass through untouched.
  source = read_target
  source.each(*args, &block)
end
##
# Executes a transaction against the given RDF repository.
#
# @param [RDF::Repository] repository
# @param [Boolean] mutable (false)
# Whether this is a read-only or read/write transaction.
# @param [Hash{Symbol => Object}] options
# @yield [tx]
# @yieldparam [RDF::Transaction] tx
# @return [void]
def self.begin(repository, mutable: false, **options, &block)
  # Thin convenience wrapper around the constructor: every argument, the
  # extra options and the block are handed to .new unchanged.
  keywords = { mutable: mutable }.merge(options)
  new(repository, **keywords, &block)
end
##
# The repository being operated upon.
#
# @return [RDF::Repository]
# @since 2.0.0
attr_reader :repository # assigned in #initialize from its first positional argument
##
# The default graph name to apply to statements inserted or deleted by the
# transaction.
#
# @return [RDF::Resource, nil]
# @since 2.0.0
attr_reader :graph_name # assigned in #initialize; nil unless a graph_name: keyword was given
##
# RDF statement mutations to apply when executed.
#
# @return [RDF::Changeset]
# @since 2.0.0
attr_reader :changes # a new RDF::Changeset from #initialize; #rollback swaps in a fresh one
##
# Any additional options for this transaction.
#
# @return [Hash{Symbol => Object}]
attr_reader :options # a dup of the extra keyword options passed to #initialize
##
# Initializes this transaction.
#
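# @param [RDF::Repository] repository
# @param [RDF::Resource, nil] graph_name (nil)
# The default graph name to apply to inserted or deleted statements.
# @param [Boolean] mutable (false)
# Whether this is a read-only or read/write transaction.
# @param [Hash{Symbol => Object}] options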
# @yield [tx]
# @yieldparam [RDF::Transaction] tx
def initialize(repository, graph_name: nil, mutable: false, **options, &block)
  # Sets up the transaction state and, when a block is given, runs it
  # against the new transaction.
  #
  # Raises TransactionError when a read/write transaction is requested on a
  # repository that reports itself as not mutable.
  @repository = repository
  @options    = options.dup
  @mutable    = mutable
  @graph_name = graph_name
  # Defined up front so #execute never reads an unset instance variable.
  @rolledback = false

  # Validate before touching the repository any further: a transaction that
  # is about to be rejected must not acquire a snapshot it will never use.
  if @mutable && !@repository.mutable?
    raise TransactionError,
          'Tried to open a mutable transaction on an immutable repository'
  end

  # Reads go through a snapshot when the repository offers one, and straight
  # to the repository otherwise.
  @snapshot =
    @repository.supports?(:snapshots) ? @repository.snapshot : @repository
  @changes = RDF::Changeset.new

  return unless block_given?

  # A one-argument block receives the transaction; any other block is
  # evaluated with the transaction as `self`.
  case block.arity
  when 1 then block.call(self)
  else instance_eval(&block)
  end
end
##
# @see RDF::Dataset#isolation_level
def isolation_level
  # Snapshot-capable repositories are reported as :repeatable_read; every
  # other repository is reported as :read_committed.
  repository.supports?(:snapshots) ? :repeatable_read : :read_committed
end
##
# Indicates whether the transaction includes changes relative to the target
# repository's state at transaction start time.
#
# The response is guaranteed to be `true` if executing the transaction
# against the original repository state would cause a change. It may also
# return `true` in cases where the repository would not change (e.g.
# because the transaction would insert statements already present).
#
# @note `Transaction` implementers may choose to raise `NotImplementedError`
# if this check cannot be implemented efficiently.
#
# @return [Boolean] true if the transaction has mutated (insert/delete)
# since transaction start time
#
# @raise [NotImplementedError] if a mutation check is not implemented
def mutated?
  # Only the base class can answer from its own changeset; subclasses are
  # expected to supply their own check.
  return !changes.empty? if self.class == Transaction

  # Double quotes are required here so that the class name is interpolated
  # into the message instead of being printed as literal text.
  raise NotImplementedError,
        "#mutated? is not implemented for #{self.class}"
end
##
# Returns `true` if this is a read/write transaction, `false` otherwise.
#
# @return [Boolean]
# @see RDF::Writable#writable?
def writable?
  # Reports the mutability flag stored by the constructor, as-is.
  flag = @mutable
  flag
end
##
# Returns `true` if this is a read/write transaction, `false` otherwise.
#
# @return [Boolean]
# @see RDF::Writable#mutable?
def mutable?
  # Reports the mutability flag stored by the constructor, as-is.
  flag = @mutable
  flag
end
##
# Returns `true` to indicate that this transaction is readable.
#
# @return [Boolean]
# @see RDF::Readable#readable?
def readable?
  # Unconditional: the answer does not depend on the mutability flag.
  true
end
##
# @see RDF::Enumerable#has_statement?
def has_statement?(statement)
  # Membership is answered by the current read source (see #read_target).
  source = read_target
  source.has_statement?(statement)
end
##
# Returns a developer-friendly representation of this transaction.
#
# @return [String]
def inspect
  # Renders class name, object id (hex) and the pending delete/insert counts.
  pending = self.changes
  format("#<%s:%#0x(changes: -%d/+%d)>",
         self.class.name,
         self.__id__,
         pending.deletes.count,
         pending.inserts.count)
end
##
# Outputs a developer-friendly representation of this transaction to
# `stderr`.
#
# @return [void]
def inspect!
  # Writes the #inspect summary, followed by a newline, to the current $stderr.
  summary = inspect
  $stderr.puts(summary)
end
##
# Executes the transaction
#
# @return [Boolean] `true` if the changes are successfully applied.
# @raise [TransactionError] if the transaction can't be applied
def execute
  # A rolled back transaction is spent; refuse to apply anything from it.
  if @rolledback
    raise TransactionError, 'Cannot execute a rolled back transaction. ' \
                            'Open a new one instead.'
  end

  # The accumulated changeset applies itself to the target repository; its
  # result is the result of this method.
  @changes.apply(@repository)
end
##
# Rolls back the transaction
#
# @note: the base class simply replaces its current `Changeset` with a
# fresh one. Other implementations may need to explicitly roll back
# at the supporting datastore.
#
# @note: clients should not rely on using same transaction instance after
# rollback.
#
# @return [Boolean] `true` once the transaction has been marked as rolled back.
def rollback
  # Drop everything queued so far by swapping in an empty changeset.
  fresh = RDF::Changeset.new
  @changes = fresh
  # Flag consulted by #execute; the assignment's value (true) is returned.
  @rolledback = true
end
protected
##
# Appends an RDF statement to the sequence to insert when executed.
#
# @param [RDF::Statement] statement
# @return [void]
# @see RDF::Writable#insert_statement
def insert_statement(statement)
  # Normalise the statement first (see #process_statement), then queue it
  # as an insert on the pending changeset.
  prepared = process_statement(statement)
  @changes.insert(prepared)
end
##
# Appends an RDF statement to the sequence to delete when executed.
#
# @param [RDF::Statement] statement
# @return [void]
# @see RDF::Mutable#delete_statement
def delete_statement(statement)
  # Normalise the statement first (see #process_statement), then queue it
  # as a delete on the pending changeset.
  prepared = process_statement(statement)
  @changes.delete(prepared)
end
def query_pattern(*args, **options, &block)
  # Delegates pattern queries to the current read source (see #read_target).
  #
  # Keyword options are captured and re-splatted explicitly. With only
  # `*args`, Ruby 3 turns them into a trailing positional Hash, which a
  # target declaring `**options` rejects with ArgumentError.
  #
  # `send` is used to reach the target's method regardless of visibility
  # (this class defines its own version under `protected`).
  read_target.send(:query_pattern, *args, **options, &block)
end
def query_execute(*args, **options, &block)
  # Delegates query execution to the current read source (see #read_target).
  #
  # Keyword options are captured and re-splatted explicitly. With only
  # `*args`, Ruby 3 turns them into a trailing positional Hash, which a
  # target declaring `**options` rejects with ArgumentError.
  #
  # `send` is used to reach the target's method regardless of visibility
  # (this class defines its own version under `protected`).
  read_target.send(:query_execute, *args, **options, &block)
end
# Remove #load, #update and #clear from transactions: once undefined here,
# calling any of them on a transaction raises NoMethodError.
# NOTE(review): presumably these come from the RDF::Mutable mixin — confirm.
undef_method :load, :update, :clear
private
##
# @private Applies the transaction's default graph_name to the statement,
# when one is set; any graph name the statement already carries is replaced.
#
# @param statement [RDF::Statement]
# @return [RDF::Statement]
def process_statement(statement)
  # Without a (truthy) default graph name the statement passes through as-is.
  return statement unless graph_name

  # Work on a copy so the caller's statement is never modified.
  statement.dup.tap { |copy| copy.graph_name = graph_name }
end
def read_target
  # Chooses what reads run against:
  #   nil   -> the whole snapshot
  #   false -> the snapshot projected onto graph `nil`
  #   other -> the snapshot projected onto that graph name
  name = graph_name
  if name.nil?
    @snapshot
  elsif name == false
    @snapshot.project_graph(nil)
  else
    @snapshot.project_graph(name)
  end
end
public
##
# An error class for transaction failures.
#
# This error indicates that the transaction semantics have been violated in
# some way.
# Raised when a transaction cannot be opened or executed; rescuable as a
# RuntimeError.
class TransactionError < RuntimeError
end
end # Transaction
end # RDF