
ActorRef serialization: transportInformation should be set #179

Closed
gbrd opened this issue Jan 16, 2018 · 10 comments
@gbrd
Contributor

gbrd commented Jan 16, 2018

Please see akka/akka#24321 for details.
I will try to submit a PR
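
A minimal sketch of what "setting transportInformation" means here, against Akka 2.5.x internals as discussed in the linked issue (serializeWithTransportInfo is a hypothetical helper name, not code from this plugin):

import akka.actor.{Address, ExtendedActorSystem}
import akka.serialization.{Serialization, SerializationExtension}

// Bind the system's default remote address while serializing, so that any
// ActorRef inside the payload is written with a full host:port address
// instead of a bare local path. currentTransportInformation is the
// INTERNAL API discussed further down in this thread.
def serializeWithTransportInfo(system: ExtendedActorSystem, payload: AnyRef): Array[Byte] = {
  val address: Address = system.provider.getDefaultAddress
  Serialization.currentTransportInformation.withValue(
    Serialization.Information(address, system)
  ) {
    SerializationExtension(system).serialize(payload).get
  }
}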

gbrd added a commit to gbrd/akka-persistence-mongo that referenced this issue Jan 16, 2018
…uld be set

Signed-off-by: Gaël Bréard <gael.breard@orange.com>
@scullxbones
Owner

Hi @gbrd ,

Ok so this never worked? That's surprising. I guess the issue wouldn't show up if the sender was local?

From what I have been able to tell, we're currently delegating to the ActorRef serializer, which apparently doesn't store all the information needed for full fidelity? That's unexpected.

Can you provide more information? I'm just struggling with this issue, sorry. I'm also pretty concerned about adding a dependency on an undocumented API (other than the big INTERNAL API warning comment).

@gbrd
Contributor Author

gbrd commented Jan 17, 2018

Hi @scullxbones
Yes, it only shows up under very specific conditions (serializing an EmptyLocalActorRef, i.e. an ActorRef that was created from a local lookup but not found).

To be honest the API seems very strange to me too, but I don't fully understand this code. I understand it's a kind of optimization/trick.

Steps to reproduce:

  • launch a single cluster node on port 2551 that deploys an actor using akka-cluster-sharding and akka.cluster.sharding.state-store-mode = persistence
  • it will persist a ShardRegionRegistered(actorRef on port 2551)
  • kill -9
  • run it again; it will deserialize the ShardRegionRegistered(actorRef on port 2551) as an EmptyLocalActorRef because it's local (same port) but the lookup fails
  • it will therefore persist a ShardRegionTerminated(actorRef without address/port) (here is the bug; see the sketch after this list)
  • run it again, changing the configuration file to set port = 2552; deserialization will show errors/inconsistencies due to the missing host/address
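
To illustrate the "actorRef without address/port" part, a sketch only (pathWithAndWithoutAddress is a hypothetical helper): the same ActorPath renders very differently with and without an address.

import akka.actor.{ActorRef, ExtendedActorSystem}

// Without an address, a local ref serializes to something like
// "akka://ClusterSystem/user/..." (no host/port); with the system's default
// address it becomes "akka.tcp://ClusterSystem@127.0.0.1:2551/user/...",
// which unambiguously identifies the original node even after a restart
// on another port.
def pathWithAndWithoutAddress(system: ExtendedActorSystem, ref: ActorRef): (String, String) = {
  val bare        = ref.path.toSerializationFormat
  val withAddress = ref.path.toSerializationFormatWithAddress(system.provider.getDefaultAddress)
  (bare, withAddress)
}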

Main:

import akka.actor.{ActorRef, ActorSystem, Props}
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._


object AkkaQuickstart extends App {


  // Create the 'helloAkka' actor system
  val system: ActorSystem = ActorSystem("ClusterSystem")

  implicit val timeout: Timeout = 10.seconds
  implicit val ec = system.dispatcher


  val extractEntityId: ShardRegion.ExtractEntityId = {
    case EntityEnvelope(id, payload) => (id.toString, payload)
    case msg @ Get(id)               => (id.toString, msg)
  }

  val numberOfShards = 100

  val extractShardId: ShardRegion.ExtractShardId = {
    case EntityEnvelope(id, _) => (id % numberOfShards).toString
    case Get(id)               => (id % numberOfShards).toString
    case ShardRegion.StartEntity(id) =>
      // StartEntity is used by the remember-entities feature
      (id.toLong % numberOfShards).toString
  }

  val counterRegion: ActorRef = ClusterSharding(system).start(
    typeName = "Counter",
    entityProps = Props[Counter],
    settings = ClusterShardingSettings(system),
    extractEntityId = extractEntityId,
    extractShardId = extractShardId)

  counterRegion ! EntityEnvelope(0,Increment)

  (counterRegion ? Get(0)).mapTo[Int].onComplete {
    case c => println("counter = " + c)
  }
}

Counter actor:

import akka.actor.ReceiveTimeout
import akka.persistence.PersistentActor
import akka.cluster.sharding.ShardRegion

import scala.concurrent.duration._

case object Increment
case object Decrement
final case class Get(counterId: Long)
final case class EntityEnvelope(id: Long, payload: Any)

case object Stop
final case class CounterChanged(delta: Int)


object Counter {


}


class Counter extends PersistentActor {
  import ShardRegion.Passivate

  context.setReceiveTimeout(120.seconds)

  // self.path.name is the entity identifier (utf-8 URL-encoded)
  override def persistenceId: String = "Counter-" + self.path.name

  var count = 0

  def updateState(event: CounterChanged): Unit = {
    count += event.delta
  }

  override def receiveRecover: Receive = {
    case evt: CounterChanged => updateState(evt)
  }

  override def receiveCommand: Receive = {
    case Increment      => persist(CounterChanged(+1))(updateState)
    case Decrement      => persist(CounterChanged(-1))(updateState)
    case Get(_)         => sender() ! count
    case ReceiveTimeout => context.parent ! Passivate(stopMessage = Stop)
    case Stop           => context.stop(self)
  }

}

Config file:

akka {
  loglevel = "DEBUG"
  actor {
    provider = "cluster"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2551
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551"]

  }
}
akka.cluster.sharding.state-store-mode = persistence


//akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
//akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"

akka.persistence.journal.plugin = "akka-contrib-mongodb-persistence-journal"
akka.persistence.snapshot-store.plugin = "akka-contrib-mongodb-persistence-snapshot"
akka.contrib.persistence.mongodb.mongo.database = "test2-db"


//akka.persistence.journal.plugin = "cassandra-journal"
//akka.persistence.snapshot-store.plugin = "cassandra-snapshot-store"

build.sbt

name := "akka-quickstart-scala"

version := "1.0"

scalaVersion := "2.12.3"

lazy val akkaVersion = "2.5.9"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % akkaVersion,
  "com.typesafe.akka" %% "akka-testkit" % akkaVersion,
  "com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion,
  "org.fusesource.leveldbjni"   % "leveldbjni-all"   % "1.8",


  "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.80",

  "com.github.scullxbones" %% "akka-persistence-mongo-casbah" % "2.0.4",
  "org.mongodb" %% "casbah" % "3.1.1"
)

@gbrd
Contributor Author

gbrd commented Jan 17, 2018

My patch does not work for Casbah because the Future {} block makes us leave the scope of the currentTransportInformation DynamicVariable.

Do you think we should duplicate the code in CasbahPersistenceJournaller and RxMongoJournaller?
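
For anyone puzzled by why that is: DynamicVariable is thread-bound, so a value set with withValue on the calling thread is not reliably visible inside a Future body that runs later on a dispatcher thread. A minimal self-contained sketch (plain Scala, not plugin code):

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.DynamicVariable

object DynamicVariableScopeDemo extends App {
  // Stand-in for Serialization.currentTransportInformation.
  val current = new DynamicVariable[Option[String]](None)

  current.withValue(Some("akka.tcp://ClusterSystem@127.0.0.1:2551")) {
    // Bound on the calling thread: prints Some(...)
    println(s"inside withValue: ${current.value}")

    // Typically runs later on a pool thread, outside the withValue scope,
    // so it usually prints the default value (None) again.
    Future {
      println(s"inside Future: ${current.value}")
    }
  }

  Thread.sleep(500) // crude wait so the Future runs before the JVM exits
}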

@scullxbones
Owner

Can it be done in MongoDataModel? In Payload.apply maybe?

I feel a lot better about this in general with the new public API being added to Serialization:

def withTransportInformation(system: ExtendedActorSystem)(f: () => Unit): Unit

Or something to that effect.
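
A sketch of how the plugin could use such an API once it exists; the signature is assumed from the snippet above, and serializeWithAddress is a hypothetical helper:

import akka.actor.ExtendedActorSystem
import akka.serialization.{Serialization, SerializationExtension}

// Assumes a public Serialization.withTransportInformation(system)(f) exists,
// per the signature quoted above. All ActorRefs serialized inside f would
// then carry the system's default address.
def serializeWithAddress(system: ExtendedActorSystem, payload: AnyRef): Array[Byte] = {
  var bytes: Array[Byte] = Array.emptyByteArray
  Serialization.withTransportInformation(system) { () =>
    bytes = SerializationExtension(system).serialize(payload).get
  }
  bytes
}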

@scullxbones
Owner

Also, is there a way to add a test for this? It seems like it would be difficult, but I thought I'd at least ask.

@gbrd
Contributor Author

gbrd commented Jan 19, 2018

Yes, it would be much better in Payload.apply. I did not put it there initially because I did not see that we had access to the actor system (which is needed) through the implicit Serialization parameter.
Maybe we should wait for the new API?
About the test: I have not looked at how the existing tests are written; it would probably need remoting and multiple actor systems (or stopping and restarting the same one with a different config).

@scullxbones
Owner

Maybe we should wait for the new API?

I'd be okay with that - it's not clear, though, when that would be coming. We can implement against the old internal API for now, I guess. Is this issue blocking you currently?

@gbrd
Contributor Author

gbrd commented Jan 26, 2018

Yes and no; I "overloaded" the class in my project.
I can do a PR with the modification in Payload.apply if it helps.

gbrd added a commit to gbrd/akka-persistence-mongo that referenced this issue Jan 26, 2018
…uld be set

Signed-off-by: Gaël Bréard <gael.breard@orange.com>
gbrd added a commit to gbrd/akka-persistence-mongo that referenced this issue Jan 26, 2018
@gbrd
Contributor Author

gbrd commented Jan 26, 2018

I modified the fix with a cleaner solution.
I added one test.

@scullxbones
Owner

This is in the 2.0.5 release. Thanks again for the PR!

gbrd added a commit to gbrd/akka-persistence-mongo that referenced this issue Oct 9, 2018
ActorRef serialization: transportInformation should be set
(Add serialization with transport information method in public API)

Signed-off-by: Gaël Bréard <gael.breard@orange.com>
scullxbones pushed a commit that referenced this issue Oct 13, 2018
* fix: replace fix #179 with akka/akka #24321

ActorRef serialization: transportInformation should be set
(Add serialization with transport information method in public API)

Signed-off-by: Gaël Bréard <gael.breard@orange.com>

* fix: remove SerializationHelper itself, do missing change in rxmongo

Signed-off-by: Gaël Bréard <gael.breard@orange.com>