-
Notifications
You must be signed in to change notification settings - Fork 9
/
InMemoryStateStoreActor.java
274 lines (234 loc) · 11.2 KB
/
InMemoryStateStoreActor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
// Copyright © 2012-2020 VLINGO LABS. All rights reserved.
//
// This Source Code Form is subject to the terms of the
// Mozilla Public License, v. 2.0. If a copy of the MPL
// was not distributed with this file, You can obtain
// one at https://mozilla.org/MPL/2.0/.
package io.vlingo.symbio.store.state.inmemory;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import io.vlingo.actors.Actor;
import io.vlingo.actors.ActorInstantiator;
import io.vlingo.actors.Definition;
import io.vlingo.common.Completes;
import io.vlingo.common.Failure;
import io.vlingo.common.Success;
import io.vlingo.reactivestreams.Stream;
import io.vlingo.symbio.BaseEntry;
import io.vlingo.symbio.Entry;
import io.vlingo.symbio.EntryAdapterProvider;
import io.vlingo.symbio.Metadata;
import io.vlingo.symbio.Source;
import io.vlingo.symbio.State;
import io.vlingo.symbio.StateAdapterProvider;
import io.vlingo.symbio.store.QueryExpression;
import io.vlingo.symbio.store.Result;
import io.vlingo.symbio.store.StorageException;
import io.vlingo.symbio.store.dispatch.Dispatchable;
import io.vlingo.symbio.store.dispatch.Dispatcher;
import io.vlingo.symbio.store.dispatch.DispatcherControl;
import io.vlingo.symbio.store.dispatch.DispatcherControl.DispatcherControlInstantiator;
import io.vlingo.symbio.store.dispatch.control.DispatcherControlActor;
import io.vlingo.symbio.store.dispatch.inmemory.InMemoryDispatcherControlDelegate;
import io.vlingo.symbio.store.state.StateStore;
import io.vlingo.symbio.store.state.StateStoreEntryReader;
import io.vlingo.symbio.store.state.StateStream;
import io.vlingo.symbio.store.state.StateTypeStateStoreMap;
public class InMemoryStateStoreActor<RS extends State<?>> extends Actor
implements StateStore {
private final List<Dispatchable<Entry<?>,RS>> dispatchables;
private final List<Dispatcher<Dispatchable<Entry<?>,RS>>> dispatchers;
private final DispatcherControl dispatcherControl;
private final List<Entry<?>> entries;
private final Map<String,StateStoreEntryReader<?>> entryReaders;
private final EntryAdapterProvider entryAdapterProvider;
private final StateAdapterProvider stateAdapterProvider;
private final Map<String, Map<String, RS>> store;
@SuppressWarnings({ "unchecked", "rawtypes" })
public InMemoryStateStoreActor(
final List<Dispatcher<Dispatchable<Entry<?>, RS>>> dispatchers,
final long checkConfirmationExpirationInterval,
final long confirmationExpiration) {
if (dispatchers == null) {
throw new IllegalArgumentException("Dispatcher must not be null.");
}
this.dispatchers = dispatchers;
this.entryAdapterProvider = EntryAdapterProvider.instance(stage().world());
this.stateAdapterProvider = StateAdapterProvider.instance(stage().world());
this.entries = new CopyOnWriteArrayList<>();
this.entryReaders = new HashMap<>();
this.store = new HashMap<>();
this.dispatchables = new CopyOnWriteArrayList<>();
final InMemoryDispatcherControlDelegate<Entry<?>, RS> dispatcherControlDelegate = new InMemoryDispatcherControlDelegate<>(dispatchables);
this.dispatcherControl = stage().actorFor(
DispatcherControl.class,
Definition.has(
DispatcherControlActor.class,
new DispatcherControlInstantiator(
dispatchers,
dispatcherControlDelegate,
checkConfirmationExpirationInterval,
confirmationExpiration)));
}
// public InMemoryStateStoreActor(
// final Dispatcher<Dispatchable<Entry<?>, RS>> dispatcher,
// final long checkConfirmationExpirationInterval,
// final long confirmationExpiration) {
// this(Arrays.asList(dispatcher), checkConfirmationExpirationInterval, confirmationExpiration);
// }
public InMemoryStateStoreActor(final List<Dispatcher<Dispatchable<Entry<?>, RS>>> dispatchers) {
this(dispatchers, 1000L, 1000L);
}
// public InMemoryStateStoreActor(final Dispatcher<Dispatchable<Entry<?>, RS>> dispatcher) {
// this(dispatcher, 1000L, 1000L);
// }
@Override
public void stop() {
if (dispatcherControl != null) {
dispatcherControl.stop();
}
super.stop();
}
@Override
@SuppressWarnings("unchecked")
public <ET extends Entry<?>> Completes<StateStoreEntryReader<ET>> entryReader(final String name) {
StateStoreEntryReader<?> reader = entryReaders.get(name);
if (reader == null) {
reader = childActorFor(StateStoreEntryReader.class, Definition.has(InMemoryStateStoreEntryReaderActor.class, new StateStoreEntryReaderInstantiator(entries, name)));
entryReaders.put(name, reader);
}
return completes().with((StateStoreEntryReader<ET>) reader);
}
@Override
public void read(final String id, final Class<?> type, final ReadResultInterest interest, final Object object) {
readFor(id, type, interest, object);
}
@Override
public Completes<Stream> streamAllOf(final Class<?> stateType) {
final String storeName = StateTypeStateStoreMap.storeNameFrom(stateType);
return completes().with(new StateStream<>(stage(), store.get(storeName), stateAdapterProvider));
}
@Override
public Completes<Stream> streamSomeUsing(final QueryExpression query) {
// TODO Auto-generated method stub
return null;
}
@Override
public <S,C> void write(final String id, final S state, final int stateVersion, final List<Source<C>> sources, final Metadata metadata, final WriteResultInterest interest, final Object object) {
writeWith(id, state, stateVersion, sources, metadata, interest, object);
}
private void readFor(final String id, final Class<?> type, final ReadResultInterest interest, final Object object) {
if (interest != null) {
if (id == null || type == null) {
interest.readResultedIn(Failure.of(new StorageException(Result.Error, id == null ? "The id is null." : "The type is null.")), id, null, -1, null, object);
return;
}
final String storeName = StateTypeStateStoreMap.storeNameFrom(type);
if (storeName == null) {
interest.readResultedIn(Failure.of(new StorageException(Result.NoTypeStore, "No type store for: " + type.getSimpleName())), id, null, -1, null, object);
return;
}
final Map<String, RS> typeStore = store.get(storeName);
if (typeStore == null) {
interest.readResultedIn(Failure.of(new StorageException(Result.NotFound, "Store not found: " + storeName)), id, null, -1, null, object);
return;
}
final RS raw = typeStore.get(id);
if (raw != null) {
final Object state = stateAdapterProvider.fromRaw(raw);
interest.readResultedIn(Success.of(Result.Success), id, state, raw.dataVersion, raw.metadata, object);
} else {
for (final String storeId : typeStore.keySet()) {
logger().debug("UNFOUND STATES\n=====================");
logger().debug("STORE ID: '" + storeId + "' STATE: " + typeStore.get(storeId));
}
interest.readResultedIn(Failure.of(new StorageException(Result.NotFound, "Not found.")), id, null, -1, null, object);
}
} else {
logger().warn(
getClass().getSimpleName() +
" readText() missing ReadResultInterest for: " +
(id == null ? "unknown id" : id));
}
}
private <S,C> void writeWith(final String id, final S state, final int stateVersion, final List<Source<C>> sources, final Metadata metadata, final WriteResultInterest interest, final Object object) {
if (interest != null) {
if (state == null) {
interest.writeResultedIn(Failure.of(new StorageException(Result.Error, "The state is null.")), id, state, stateVersion, sources, object);
} else {
try {
final String storeName = StateTypeStateStoreMap.storeNameFrom(state.getClass());
if (storeName == null) {
interest.writeResultedIn(Failure.of(new StorageException(Result.NoTypeStore, "No type store for: " + state.getClass())), id, state, stateVersion, sources, object);
return;
}
Map<String, RS> typeStore = store.get(storeName);
if (typeStore == null) {
typeStore = new HashMap<>();
final Map<String, RS> existingTypeStore = store.putIfAbsent(storeName, typeStore);
if (existingTypeStore != null) {
typeStore = existingTypeStore;
}
}
final RS raw = metadata == null ?
stateAdapterProvider.asRaw(id, state, stateVersion) :
stateAdapterProvider.asRaw(id, state, stateVersion, metadata);
final RS persistedState = typeStore.putIfAbsent(raw.id, raw);
if (persistedState != null) {
if (persistedState.dataVersion >= raw.dataVersion) {
interest.writeResultedIn(Failure.of(new StorageException(Result.ConcurrencyViolation, "Version conflict.")), id, state, stateVersion, sources, object);
return;
}
}
typeStore.put(id, raw);
final List<Entry<?>> entries = appendEntries(sources, stateVersion, metadata);
dispatch(id, storeName, raw, entries);
interest.writeResultedIn(Success.of(Result.Success), id, state, stateVersion, sources, object);
} catch (final Exception e) {
logger().error(getClass().getSimpleName() + " writeText() error because: " + e.getMessage(), e);
interest.writeResultedIn(Failure.of(new StorageException(Result.Error, e.getMessage(), e)), id, state, stateVersion, sources, object);
}
}
} else {
logger().warn(
getClass().getSimpleName() +
" writeText() missing WriteResultInterest for: " +
(state == null ? "unknown id" : id));
}
}
private <C> List<Entry<?>> appendEntries(final List<Source<C>> sources, final int stateVersion, final Metadata metadata) {
final List<Entry<?>> adapted = entryAdapterProvider.asEntries(sources, stateVersion, metadata);
for (final Entry<?> each : adapted) {
((BaseEntry<?>) each).__internal__setId(String.valueOf(entries.size()));
entries.add(each);
}
return adapted;
}
private void dispatch(final String id, final String storeName, final RS raw, final List<Entry<?>> entries) {
final String dispatchId = storeName + ":" + id;
final Dispatchable<Entry<?>, RS> dispatchable = new Dispatchable<>(dispatchId, LocalDateTime.now(), raw, entries);
this.dispatchables.add(dispatchable);
this.dispatchers.forEach(p -> p.dispatch(dispatchable));
}
@SuppressWarnings({"rawtypes", "unchecked"})
private static class StateStoreEntryReaderInstantiator implements ActorInstantiator<InMemoryStateStoreEntryReaderActor> {
private static final long serialVersionUID = 8463366612347915854L;
final String name;
final List<Entry<?>> entries;
StateStoreEntryReaderInstantiator(final List<Entry<?>> entries, final String name) {
this.entries = entries;
this.name = name;
}
@Override
public InMemoryStateStoreEntryReaderActor instantiate() {
return new InMemoryStateStoreEntryReaderActor(entries, name);
}
@Override
public Class<InMemoryStateStoreEntryReaderActor> type() {
return InMemoryStateStoreEntryReaderActor.class;
}
}
}