-
Notifications
You must be signed in to change notification settings - Fork 26
/
ScalikeJDBCWriteJournal.scala
107 lines (97 loc) · 4.84 KB
/
ScalikeJDBCWriteJournal.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package akka.persistence.journal.sqlasync
import akka.persistence.common.StoragePlugin
import akka.persistence.journal.AsyncWriteJournal
import akka.persistence.{AtomicWrite, PersistentRepr}
import scalikejdbc._
import scalikejdbc.async._
import scala.collection.immutable
import scala.concurrent.Future
import scala.util.{Success, Try}
/**
 * Journal operations implemented on top of ScalikeJDBC-Async.
 *
 * Journal rows are addressed by a surrogate `persistence_key` (resolved through `surrogateKeyOf`)
 * together with the `sequence_nr` of the message. Concrete plugins supply `updateSequenceNr`.
 */
private[sqlasync] trait ScalikeJDBCWriteJournal extends AsyncWriteJournal with StoragePlugin {
  /** The configured journal table name, verified before being embedded as raw SQL syntax. */
  private[this] lazy val journalTable = {
    val tableName = extension.config.journalTableName
    SQLSyntaxSupportFeature.verifyTableName(tableName)
    SQLSyntax.createUnsafely(tableName)
  }

  /**
   * Records `sequenceNr` for `persistenceId` outside of the journal table.
   *
   * Called by `asyncDeleteMessagesTo` when a deletion removes the highest stored message.
   * NOTE(review): presumably this writes to the metadata table read by
   * `asyncReadHighestSequenceNr` - confirm against the implementations.
   */
  protected[this] def updateSequenceNr(persistenceId: String, sequenceNr: Long)(implicit session: TxAsyncDBSession): Future[Unit]

  /**
   * Serializes and stores the given atomic writes with a single multi-row INSERT.
   *
   * @param messages the atomic writes to persist
   * @return one `Try` per atomic write, in input order; a write whose payload could not be
   *         serialized yields a `Failure` and none of its messages are inserted
   */
  override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
    // Builds one "(key, sequence_nr, bytes)" row per message. An atomic write contributes rows only
    // when every message in its payload serializes successfully.
    def serialize(keys: Map[String, Long]): (immutable.Seq[SQLSyntax], immutable.Seq[Try[Unit]]) = {
      val result = messages.map { write =>
        write.payload.foldLeft(Try(Vector.empty[SQLSyntax])) { (acc, x) =>
          for {
            a <- acc
            bytes <- serialization.serialize(x)
          } yield a :+ sqls"(${keys(x.persistenceId)}, ${x.sequenceNr}, $bytes)"
        }
      }
      val rows = result.collect {
        case Success(x) => x
      }.flatten
      (rows, result.map(_.map(_ => ())))
    }

    log.debug("Write messages, {}", messages)
    if (messages.isEmpty) {
      Future.successful(Nil)
    } else {
      sessionProvider.localTx { implicit session =>
        val persistenceIds = messages.map(_.persistenceId).toSet
        for {
          // Resolve the surrogate keys one after another, chaining on the previous lookup.
          keys <- persistenceIds.foldLeft(Future.successful(Map.empty[String, Long])) { (acc, id) =>
            for {
              map <- acc
              persistenceKey <- surrogateKeyOf(id)
            } yield map.updated(id, persistenceKey)
          }
          (rows, result) = serialize(keys)
          // When every write was rejected there is nothing to insert; "INSERT ... VALUES" with no
          // rows is invalid SQL and would fail the whole future instead of reporting the rejections.
          _ <- if (rows.isEmpty) {
            Future.successful(())
          } else {
            val insert = sql"INSERT INTO $journalTable (persistence_key, sequence_nr, message) VALUES ${sqls.csv(rows: _*)}"
            logging(insert).update().future().map(_ => ())
          }
        } yield result
      }
    }
  }

  /**
   * Deletes every message of `persistenceId` whose sequence number is at most `toSequenceNr`.
   *
   * When the deletion removes the highest stored message, that sequence number is handed to
   * `updateSequenceNr` before the transaction completes.
   *
   * @param persistenceId the persistent actor whose messages are deleted
   * @param toSequenceNr  the inclusive upper bound of the sequence numbers to delete
   */
  override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = {
    log.debug("Delete messages, persistenceId = {}, toSequenceNr = {}", persistenceId, toSequenceNr)
    sessionProvider.localTx { implicit session =>
      for {
        key <- surrogateKeyOf(persistenceId)
        select = sql"SELECT sequence_nr FROM $journalTable WHERE persistence_key = $key ORDER BY sequence_nr DESC LIMIT 1"
        // Read the highest sequence number before the rows disappear.
        highest <- logging(select).map(_.long("sequence_nr")).single().future()
        delete = sql"DELETE FROM $journalTable WHERE persistence_key = $key AND sequence_nr <= $toSequenceNr"
        _ <- logging(delete).update().future()
        _ <- highest match {
          case Some(sequenceNr) if sequenceNr <= toSequenceNr =>
            updateSequenceNr(persistenceId, sequenceNr)
          case _ =>
            // Either newer messages remain, or the journal held no rows for this key. In the latter
            // case there is no sequence number to record; passing a default of 0 to
            // updateSequenceNr here could clobber a previously recorded value.
            Future.successful(())
        }
      } yield ()
    }
  }

  /**
   * Replays stored messages in ascending sequence number order.
   *
   * All matching rows are fetched first, then deserialized and passed to `replayCallback` one by
   * one. A message that fails to deserialize throws inside the future, which fails the result.
   *
   * @param persistenceId  the persistent actor whose messages are replayed
   * @param fromSequenceNr the inclusive lower bound of the sequence numbers
   * @param toSequenceNr   the inclusive upper bound of the sequence numbers
   * @param max            the maximum number of messages to replay (used as the SQL LIMIT)
   * @param replayCallback invoked for every replayed message
   */
  override def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: (PersistentRepr) => Unit): Future[Unit] = {
    log.debug("Replay messages, persistenceId = {}, fromSequenceNr = {}, toSequenceNr = {}", persistenceId, fromSequenceNr, toSequenceNr)
    sessionProvider.localTx { implicit session =>
      for {
        key <- surrogateKeyOf(persistenceId)
        sql = sql"SELECT message FROM $journalTable WHERE persistence_key = $key AND sequence_nr >= $fromSequenceNr AND sequence_nr <= $toSequenceNr ORDER BY sequence_nr ASC LIMIT $max"
        _ <- logging(sql).map(_.bytes("message")).list().future().map { messages =>
          messages.foreach { bytes =>
            val message = serialization.deserialize(bytes, classOf[PersistentRepr]).get
            replayCallback(message)
          }
        }
      } yield ()
    }
  }

  /**
   * Reads the highest sequence number known for `persistenceId`.
   *
   * The result is the maximum of `fromSequenceNr`, the sequence number stored in the metadata
   * table and the highest sequence number found in the journal table. When the metadata table has
   * no row for `persistenceId`, `fromSequenceNr` is returned as is.
   *
   * @param persistenceId  the persistent actor to look up
   * @param fromSequenceNr the lower bound of the returned value
   */
  override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = {
    log.debug("Read the highest sequence number, persistenceId = {}, fromSequenceNr = {}", persistenceId, fromSequenceNr)
    sessionProvider.localTx { implicit session =>
      val fromMetadataTable = sql"SELECT persistence_key, sequence_nr FROM $metadataTable WHERE persistence_id = $persistenceId"
      logging(fromMetadataTable).map { result =>
        (result.long("persistence_key"), result.long("sequence_nr"))
      }.single().future().flatMap {
        case None => Future.successful(fromSequenceNr) // No persistent record exists.
        case Some((key, fromMetadata)) =>
          val fromJournalTable = sql"SELECT sequence_nr FROM $journalTable WHERE persistence_key = $key ORDER BY sequence_nr DESC LIMIT 1"
          logging(fromJournalTable).map(_.long("sequence_nr")).single().future().map {
            case Some(fromJournal) => fromJournal max fromMetadata max fromSequenceNr
            case None => fromMetadata max fromSequenceNr
          }
      }
    }
  }
}