This repository has been archived by the owner on May 27, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 13
/
ScalaReadJournal.scala
105 lines (90 loc) · 4.13 KB
/
ScalaReadJournal.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
/*
* Copyright © 2017 Safety Data - CFH SAS.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package akka
package persistence
package query
package journal
package redis
import actor._
import stream.scaladsl._
import akka.persistence.redis._
import RedisKeys._
import scaladsl._
import com.typesafe.config.Config
import _root_.redis._
import scala.concurrent.duration._
/** Scala DSL read journal backed by Redis.
 *
 *  Every query builds a fresh `Source` from a dedicated graph stage; all of them
 *  share the single Redis client created when this journal is instantiated.
 *
 *  @param system the actor system handed to the Redis client factory and to the live query stages
 *  @param conf the configuration handed to the Redis client factory and to the query stages
 */
class ScalaReadJournal private[redis] (system: ExtendedActorSystem, conf: Config) extends ReadJournal
    with EventsByTagQuery
    with EventsByPersistenceIdQuery
    with PersistenceIdsQuery
    with CurrentPersistenceIdsQuery
    with CurrentEventsByPersistenceIdQuery
    with CurrentEventsByTagQuery {

  // Redis client shared by every query source created by this journal.
  // The type is left inferred on purpose: it is whatever `RedisUtils.create` returns.
  val redis = RedisUtils.create(conf)(system)

  /** Returns the live stream of persisted identifiers.
   *  Identifiers may appear several times in the stream.
   */
  def persistenceIds(): Source[String, NotUsed] =
    Source.fromGraph(new PersistenceIdsSource(conf, redis, system))

  /** Returns the stream of current persisted identifiers.
   *  This stream is not live: once all identifiers have been returned, it is closed.
   */
  def currentPersistenceIds(): Source[String, NotUsed] =
    Source.fromGraph(new CurrentPersistenceIdsSource(redis))

  /** Returns the live stream of events for the given `persistenceId`.
   *  Events are ordered by `sequenceNr`.
   *  When the `toSequenceNr` has been delivered, the stream is closed.
   *
   *  @param persistenceId the identifier of the persistent entity whose events are streamed
   *  @param fromSequenceNr the lower sequence number bound passed to the underlying stage
   *  @param toSequenceNr the upper sequence number bound passed to the underlying stage
   */
  def eventsByPersistenceId(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long): Source[EventEnvelope, NotUsed] =
    // The trailing `true` selects the live variant of the stage.
    Source.fromGraph(new EventsByPersistenceIdSource(conf, redis, persistenceId, fromSequenceNr, toSequenceNr, system, true))

  /** Returns the stream of current events for the given `persistenceId`.
   *  Events are ordered by `sequenceNr`.
   *  When the `toSequenceNr` has been delivered or no more elements are available
   *  at the current time, the stream is closed.
   *
   *  @param persistenceId the identifier of the persistent entity whose events are streamed
   *  @param fromSequenceNr the lower sequence number bound passed to the underlying stage
   *  @param toSequenceNr the upper sequence number bound passed to the underlying stage
   */
  def currentEventsByPersistenceId(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long): Source[EventEnvelope, NotUsed] =
    // The trailing `false` selects the non-live ("current") variant of the stage.
    Source.fromGraph(new EventsByPersistenceIdSource(conf, redis, persistenceId, fromSequenceNr, toSequenceNr, system, false))

  /** Returns the live stream of events with a given tag.
   *  The events are sorted in the order they occurred, you can rely on it.
   *
   *  @param tag the tag whose events are streamed
   *  @param offset where to start: `NoOffset` (start at 0) or a `Sequence` offset
   *  @throws IllegalArgumentException if `offset` is neither `NoOffset` nor a `Sequence`
   */
  def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] =
    eventsByTagFrom(tag, offset, live = true)

  /** Returns the stream of current events with a given tag.
   *  The events are sorted in the order they occurred, you can rely on it.
   *
   *  Returned events are those present in the store with the given tag at the time
   *  the stream is opened.
   *
   *  Events deleted during this stream life might not appear in the stream if not delivered yet.
   *
   *  Stream is closed once all events present at the time of opening have been delivered.
   *
   *  @param tag the tag whose events are streamed
   *  @param offset where to start: `NoOffset` (start at 0) or a `Sequence` offset
   *  @throws IllegalArgumentException if `offset` is neither `NoOffset` nor a `Sequence`
   */
  def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] =
    eventsByTagFrom(tag, offset, live = false)

  /** Shared implementation of the two tag queries: resolves the starting offset
   *  and builds the tag source in live or current mode.
   *
   *  The offset is validated eagerly, so an unsupported offset type fails at call
   *  time rather than when the stream is materialized.
   */
  private def eventsByTagFrom(tag: String, offset: Offset, live: Boolean): Source[EventEnvelope, NotUsed] = {
    val startOffset: Long = offset match {
      case NoOffset        => 0L
      case Sequence(value) => value
      case _ =>
        throw new IllegalArgumentException(s"Redis does not support ${offset.getClass.getName} offsets")
    }
    Source.fromGraph(new EventsByTagSource(conf, redis, tag, startOffset, system, live))
  }

}