Skip to content

Commit

Permalink
Merge pull request #8 from Unisay/master
Browse files Browse the repository at this point in the history
[#7] Truncate journal and snapshot store
  • Loading branch information
jdgoldie committed Jul 13, 2015
2 parents d2b3892 + d864bfd commit 28038b5
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 6 deletions.
Expand Up @@ -28,13 +28,21 @@ import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
import scala.util._


object InMemoryMessageStore {
private val mm = new HashMap[String, Set[PersistentRepr]] with MultiMap[String, PersistentRepr]

def truncate(): Unit =
mm.clear()
}

/**
* A simple class that provides some basic CRUD functions on a HashMap + MultiMap to support
* journal plugins.
*/
trait InMemoryMessageStore {

val mm = new HashMap[String, Set[PersistentRepr]] with MultiMap[String, PersistentRepr]
import InMemoryMessageStore.mm

/**
* Adds messages to the multi map for the given persistence Id
Expand Down Expand Up @@ -99,7 +107,7 @@ trait InMemoryMessageStore {

/** Updates messages for a given persistenceId using the supplied function
*
* @param persistenceId
* @param persistenceId
* @param p
* @return
*/
Expand Down Expand Up @@ -129,7 +137,7 @@ trait InMemoryJournalBase extends InMemoryMessageStore {
}

/**
* Implelemtation of AsyncWriteJournal backed by the InMemoryMessage store
* Implementation of AsyncWriteJournal backed by the InMemoryMessage store
*/
class InMemoryJournal extends InMemoryJournalBase with AsyncWriteJournal with AsyncRecovery with ActorLogging {

Expand Down
Expand Up @@ -27,13 +27,21 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future


object InMemorySnapshotStore {

//SelectedSnapshot might not be the best name, but has meta + payload and is already defined
private val ss = new HashMap[String, Set[SelectedSnapshot]] with MultiMap[String, SelectedSnapshot]

def truncate(): Unit =
ss.clear()
}

/**
* Supports SnapshotAPI implementations backed by a MultiMap
*/
class InMemorySnapshotStore extends SnapshotStore with ActorLogging {

//SelectedSnapshot might not be the best name, but has meta + payload and is already defined
val ss = new HashMap[String, Set[SelectedSnapshot]] with MultiMap[String, SelectedSnapshot]
import InMemorySnapshotStore.ss

/**
* Finds the youngest snapshot that matches selection criteria.
Expand Down
Expand Up @@ -16,7 +16,9 @@

package akka.persistence.inmem.journal

import akka.persistence.JournalProtocol.{ReplayMessages, ReplayMessagesSuccess}
import akka.persistence.journal.{JournalPerfSpec, JournalSpec}
import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory

class InMemoryJournalSpec extends JournalSpec with JournalPerfSpec {
Expand All @@ -33,6 +35,19 @@ class InMemoryJournalSpec extends JournalSpec with JournalPerfSpec {
| }
|}
""".stripMargin)

"A journal" must {

"truncate all messages" in {
val receiverProbe = TestProbe()

InMemoryMessageStore.truncate()
journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref)

receiverProbe.expectMsg(ReplayMessagesSuccess)
}
}

}


Expand Up @@ -16,13 +16,35 @@

package akka.persistence.inmem.snapshot

import akka.persistence.SnapshotProtocol.{LoadSnapshot, LoadSnapshotResult}
import akka.persistence.SnapshotSelectionCriteria
import akka.persistence.snapshot.SnapshotStoreSpec
import com.typesafe.config.{ConfigFactory, Config}
import akka.testkit.TestProbe
import com.typesafe.config.{Config, ConfigFactory}

class InMemorySnapshotSpec extends SnapshotStoreSpec {
override lazy val config: Config = ConfigFactory.parseString(
"""
|akka.persistence.snapshot-store.plugin = "akka.persistence.inmem.snapshot-store"
|akka.persistence.journal.plugin = "akka.persistence.inmem.journal"
""".stripMargin)


override protected def beforeEach(): Unit = {
InMemorySnapshotStore.truncate()
super.beforeEach()
}

"A snapshot store" must {

"not load a snapshot given store is truncated" in {
val senderProbe = TestProbe()

InMemorySnapshotStore.truncate()

snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria.Latest, Long.MaxValue), senderProbe.ref)
senderProbe.expectMsg(LoadSnapshotResult(None, Long.MaxValue))
}
}

}

0 comments on commit 28038b5

Please sign in to comment.