Skip to content

Commit

Permalink
ClassTag instead of Class parameter in AggregateEvent, #1
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Oct 25, 2016
1 parent 0492507 commit fcbdafb
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 23 deletions.
Expand Up @@ -57,7 +57,7 @@ private[lagom] class ReadSideActor[Event <: AggregateEvent[Event]](
def receive = {
case EnsureActive(tagName) =>

val tag = AggregateEventTag(clazz, tagName)
val tag = new AggregateEventTag(clazz, tagName)

implicit val timeout = Timeout(globalPrepareTimeout)

Expand Down
Expand Up @@ -3,21 +3,25 @@
*/
package com.lightbend.lagom.scaladsl.persistence

import scala.reflect.ClassTag

object AggregateEventTag {
/**
* Convenience factory method of [[AggregateEventTag]] that uses the
* class name of the event type as `tag`. Note that it is needed to
* retain the original tag when the class name is changed because
* the tag is part of the store event data.
*/
def apply[Event <: AggregateEvent[Event]](eventType: Class[Event]): AggregateEventTag[Event] =
def apply[Event <: AggregateEvent[Event]: ClassTag](): AggregateEventTag[Event] = {
val eventType = implicitly[ClassTag[Event]].runtimeClass.asInstanceOf[Class[Event]]
new AggregateEventTag(eventType, eventType.getName)
}

/**
* Factory method of [[AggregateEventTag]].
*/
def apply[Event <: AggregateEvent[Event]](eventType: Class[Event], tag: String): AggregateEventTag[Event] =
new AggregateEventTag(eventType, tag)
def apply[Event <: AggregateEvent[Event]: ClassTag](tag: String): AggregateEventTag[Event] =
new AggregateEventTag(implicitly[ClassTag[Event]].runtimeClass.asInstanceOf[Class[Event]], tag)

/**
* Create an aggregate event shards tagger.
Expand All @@ -32,12 +36,13 @@ object AggregateEventTag {
* same entity will be produced by different event streams and handled by different shards in the read side
* processor, leading to out of order event handling.
*
* @param eventType The type of the event.
* @param numShards The number of shards.
* @return The aggregate event shards tagger.
*/
def sharded[Event <: AggregateEvent[Event]](eventType: Class[Event], numShards: Int): AggregateEventShards[Event] =
sharded(eventType, eventType.getName, numShards)
def sharded[Event <: AggregateEvent[Event]: ClassTag](numShards: Int): AggregateEventShards[Event] = {
val eventType = implicitly[ClassTag[Event]].runtimeClass.asInstanceOf[Class[Event]]
sharded[Event](eventType.getName, numShards)
}

/**
* Create a sharded aggregate event tag.
Expand All @@ -49,13 +54,15 @@ object AggregateEventTag {
* same entity will be produced by different event streams and handled by different shards in the read side
* processor, leading to out of order event handling.
*
* @param eventType The type of the event.
* @param baseTagName The base name for the tag, this will be combined with the shard number to form the tag name.
* @param numShards The number of shards.
* @return The aggregate event shards tagger.
*/
def sharded[Event <: AggregateEvent[Event]](eventType: Class[Event], baseTagName: String, numShards: Int): AggregateEventShards[Event] = {
new AggregateEventShards[Event](eventType, baseTagName, numShards)
def sharded[Event <: AggregateEvent[Event]: ClassTag](
baseTagName: String, numShards: Int): AggregateEventShards[Event] = {
new AggregateEventShards[Event](
implicitly[ClassTag[Event]].runtimeClass.asInstanceOf[Class[Event]],
baseTagName, numShards)
}

/**
Expand Down Expand Up @@ -102,8 +109,7 @@ sealed trait AggregateEventTagger[Event <: AggregateEvent[Event]] {
*/
final class AggregateEventTag[Event <: AggregateEvent[Event]](
val eventType: Class[Event],
val tag: String
) extends AggregateEventTagger[Event] {
val tag: String) extends AggregateEventTagger[Event] {

override def toString: String = s"AggregateEventTag($eventType, $tag)"

Expand All @@ -129,31 +135,28 @@ final class AggregateEventTag[Event <: AggregateEvent[Event]](
*/
final class AggregateEventShards[Event <: AggregateEvent[Event]](
val eventType: Class[Event],
val tag: String,
val numShards: Int
) extends AggregateEventTagger[Event] {
val tag: String,
val numShards: Int) extends AggregateEventTagger[Event] {

/**
* Get the tag for the given entity ID.
*
* @param entityId The entity ID to get the tag for.
* @return The tag.
*/
def forEntityId(entityId: String): AggregateEventTag[Event] = AggregateEventTag(
def forEntityId(entityId: String): AggregateEventTag[Event] = new AggregateEventTag(
eventType,
AggregateEventTag.shardTag(tag, AggregateEventTag.selectShard(numShards, entityId))
)
AggregateEventTag.shardTag(tag, AggregateEventTag.selectShard(numShards, entityId)))

/**
* Get all the tags for this shard.
*
* @return All the tags.
*/
val allTags: Set[AggregateEventTag[Event]] = {
(for (shardNo <- 0 until numShards) yield AggregateEventTag(
(for (shardNo <- 0 until numShards) yield new AggregateEventTag(
eventType,
AggregateEventTag.shardTag(tag, shardNo)
)).toSet
AggregateEventTag.shardTag(tag, shardNo))).toSet
}

override def toString: String = s"AggregateEventShards($eventType, $tag)"
Expand Down
Expand Up @@ -76,7 +76,7 @@ object TestEntity {
object Evt {
val NumShards = 4
// second param is optional, defaults to the class name
val aggregateEventShards = AggregateEventTag.sharded(classOf[Evt], NumShards)
val aggregateEventShards = AggregateEventTag.sharded[Evt](NumShards)

import play.api.libs.json._
import Serializers.emptySingletonFormat
Expand Down
Expand Up @@ -27,7 +27,7 @@ object Post {
object BlogEvent {
val NumShards = 20
// second param is optional, defaults to the class name
val aggregateEventShards = AggregateEventTag.sharded(classOf[BlogEvent], NumShards)
val aggregateEventShards = AggregateEventTag.sharded[BlogEvent](NumShards)
}

sealed trait BlogEvent extends AggregateEvent[BlogEvent] {
Expand Down

0 comments on commit fcbdafb

Please sign in to comment.