forked from RailsEventStore/rails_event_store
-
Notifications
You must be signed in to change notification settings - Fork 0
/
events.rb
105 lines (86 loc) · 2.93 KB
/
events.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
module RubyEventStore
module ROM
module Repositories
class Events < ::ROM::Repository[:events]
class CreateEventsChangeset < ::ROM::Changeset::Create
# Convert to Hash
map(&:to_h)
map do
rename_keys event_id: :id
accept_keys %i[id data metadata event_type]
end
end
POSITION_SHIFT = 1.freeze
# relations :stream_entries
# commands :create
# struct_namespace Entities
# auto_struct false
### Writer interface
def create(serialized_records, stream: nil, expected_version: nil)
events.transaction(savepoint: true) do
events.changeset(CreateEventsChangeset, serialized_records).commit.tap do
if stream
link(
serialized_records.map(&:event_id),
stream,
expected_version || ExpectedVersion.any,
global_stream: true
)
end
end
end
end
def link(event_ids, stream, expected_version, global_stream: nil)
(event_ids - events.by_pks(event_ids).pluck(:id)).each do |id|
raise EventNotFound.new(id)
end
resolved_version = expected_version.resolve_for(stream, ->(stream) {
(stream_entries.max_position(stream) || {})[:position]
})
tuples = []
event_ids.each_with_index do |event_id, index|
tuples << {
stream: stream.name,
position: compute_position(resolved_version, index),
event_id: event_id
} unless stream.global?
tuples << {
stream: GLOBAL_STREAM,
event_id: event_id
} if global_stream
end
stream_entries.changeset(:create, tuples).commit
end
### Reader interface
def exist?(event_id)
events.by_pk(event_id).exist?
rescue Sequel::DatabaseError => ex
return false if ex.message =~ /PG::InvalidTextRepresentation.*uuid/
raise
end
def by_id(event_id)
events.map_with(:serialized_record_mapper).by_pk(event_id).one!
end
def read(direction, stream, from: :head, limit: nil)
unless from.equal?(:head)
offset_entry_id = stream_entries.by_stream_and_event_id(stream, from)[:id]
end
Mappers::SerializedRecord.new.call(
stream_entries
.ordered(direction, stream, offset_entry_id)
.limit(limit)
.combine(:event)
.to_a
.map(&:event)
)
rescue ::ROM::TupleCountMismatchError
raise EventNotFound.new(from)
end
private
def compute_position(version, offset)
version + offset + POSITION_SHIFT if version
end
end
end
end
end