Skip to content
Permalink
Browse files

Make journal type params more expressive (#43)

* Change key type name for consistency reason

* Polish
  • Loading branch information...
Leammas authored and notxcain committed Apr 8, 2019
1 parent 09e6927 commit fe5e903559411cba1024ce5b5cc18957b05fcf3f
@@ -11,18 +11,18 @@ import akka.stream.scaladsl.Source
import scala.concurrent.Future

private[akkapersistence] final class AkkaPersistenceEventJournalQuery[
O, I: KeyDecoder, E: PersistentDecoder
O, K: KeyDecoder, E: PersistentDecoder
](adapter: JournalAdapter[O])
extends JournalQuery[O, I, E] {
extends JournalQuery[O, K, E] {

private val decoder = PersistentDecoder[E]
private val keyDecoder = KeyDecoder[I]
private val keyDecoder = KeyDecoder[K]

private val readJournal = adapter.createReadJournal

private def createSource(
inner: Source[EventEnvelope, NotUsed]
): Source[JournalEntry[O, I, E], NotUsed] =
): Source[JournalEntry[O, K, E], NotUsed] =
inner.mapAsync(1) {
case EventEnvelope(offset, persistenceId, sequenceNr, event) =>
offset match {
@@ -63,14 +63,14 @@ private[akkapersistence] final class AkkaPersistenceEventJournalQuery[
}
}

def eventsByTag(tag: EventTag, offset: Option[O]): Source[JournalEntry[O, I, E], NotUsed] =
def eventsByTag(tag: EventTag, offset: Option[O]): Source[JournalEntry[O, K, E], NotUsed] =
createSource(
readJournal
.eventsByTag(tag.value, adapter.journalOffset(offset))
)

override def currentEventsByTag(tag: EventTag,
offset: Option[O]): Source[JournalEntry[O, I, E], NotUsed] =
offset: Option[O]): Source[JournalEntry[O, K, E], NotUsed] =
createSource(
readJournal
.currentEventsByTag(tag.value, adapter.journalOffset(offset))
@@ -7,14 +7,14 @@ import akka.NotUsed
import akka.stream.scaladsl.Source
import cats.effect.Effect

final class CommittableEventJournalQuery[F[_]: Effect, O, I, E] private[akkapersistence] (
underlying: JournalQuery[O, I, E],
final class CommittableEventJournalQuery[F[_]: Effect, O, K, E] private[akkapersistence] (
underlying: JournalQuery[O, K, E],
offsetStore: KeyValueStore[F, TagConsumer, O]
) {

private def mkCommittableSource(tag: EventTag,
consumerId: ConsumerId,
inner: Option[O] => Source[JournalEntry[O, I, E], NotUsed]) = {
inner: Option[O] => Source[JournalEntry[O, K, E], NotUsed]) = {
val tagConsumerId = TagConsumer(tag, consumerId)
Source
.single(NotUsed)
@@ -26,20 +26,20 @@ final class CommittableEventJournalQuery[F[_]: Effect, O, I, E] private[akkapers
}

def eventsByTag(tag: EventTag,
consumerId: ConsumerId): Source[Committable[F, JournalEntry[O, I, E]], NotUsed] =
consumerId: ConsumerId): Source[Committable[F, JournalEntry[O, K, E]], NotUsed] =
mkCommittableSource(tag, consumerId, underlying.eventsByTag(tag, _))

def currentEventsByTag(
tag: EventTag,
consumerId: ConsumerId
): Source[Committable[F, JournalEntry[O, I, E]], NotUsed] =
): Source[Committable[F, JournalEntry[O, K, E]], NotUsed] =
mkCommittableSource(tag, consumerId, underlying.currentEventsByTag(tag, _))
}

private[akkapersistence] object CommittableEventJournalQuery {
def apply[F[_]: Effect, Offset, I, E](
underlying: JournalQuery[Offset, I, E],
def apply[F[_]: Effect, Offset, K, E](
underlying: JournalQuery[Offset, K, E],
offsetStore: KeyValueStore[F, TagConsumer, Offset]
): CommittableEventJournalQuery[F, Offset, I, E] =
): CommittableEventJournalQuery[F, Offset, K, E] =
new CommittableEventJournalQuery(underlying, offsetStore)
}
@@ -12,26 +12,26 @@ final case class JournalEntry[O, K, A](offset: O, event: EntityEvent[K, A]) {
}

object JournalEntry {
implicit def aecorHasInstanceForEvent[X, O, I, A](
implicit A: Has[EntityEvent[I, A], X]
): Has[JournalEntry[O, I, A], X] =
implicit def aecorHasInstanceForEvent[X, O, K, A](
implicit A: Has[EntityEvent[K, A], X]
): Has[JournalEntry[O, K, A], X] =
A.contramap(_.event)

implicit def aecorHasInstanceForOffset[X, O, I, A](
implicit def aecorHasInstanceForOffset[X, O, K, A](
implicit A: Has[O, X]
): Has[JournalEntry[O, I, A], X] = A.contramap(_.offset)
): Has[JournalEntry[O, K, A], X] = A.contramap(_.offset)

}

trait JournalQuery[Offset, I, E] {
trait JournalQuery[O, K, E] {
def eventsByTag(tag: EventTag,
offset: Option[Offset]): Source[JournalEntry[Offset, I, E], NotUsed]
offset: Option[O]): Source[JournalEntry[O, K, E], NotUsed]

def currentEventsByTag(tag: EventTag,
offset: Option[Offset]): Source[JournalEntry[Offset, I, E], NotUsed]
offset: Option[O]): Source[JournalEntry[O, K, E], NotUsed]

def committable[F[_]: Effect](
offsetStore: KeyValueStore[F, TagConsumer, Offset]
): CommittableEventJournalQuery[F, Offset, I, E] =
offsetStore: KeyValueStore[F, TagConsumer, O]
): CommittableEventJournalQuery[F, O, K, E] =
new CommittableEventJournalQuery(this, offsetStore)
}

0 comments on commit fe5e903

Please sign in to comment.
You can’t perform that action at this time.