Serializer for org.opencv.core.Mat #110

Closed
toaditoad opened this issue Jan 31, 2017 · 10 comments

@toaditoad

toaditoad commented Jan 31, 2017

I'm having trouble implementing a serializer for an org.opencv.core.Mat object that represents, e.g., a frame of a video file. The idea is that system A sends a greyscale video frame to system B, which detects objects in it. For that purpose, the frame of type Mat has to be serialized and sent over the network.
However, it seems that the frame is never actually sent and/or that sending it stops the actor from sending heartbeats, which causes the system to fail.

Any ideas what to do about it?

I also asked this on Stack Overflow: http://stackoverflow.com/questions/41957087/akka-remoting-custom-serializer-for-org-opencv-core-mat

configuration.conf (on system A and B)

serializers {
  kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
}
serialization-bindings {
  "org.opencv.core.Mat" = kryo
}
kryo  {
  kryo-custom-serializer-init = "de.itd.util.KryoInit"
  type = "nograph"
  idstrategy = "explicit"
  buffer-size = 4096
  max-buffer-size = -1
  use-manifests = true
  use-unsafe = false
  post-serialization-transformations = "lz4"
  kryo-trace = true
  resolve-subclasses = false
}

de.itd.util.KryoInit.scala (on system A and B)

package de.itd.util

import com.esotericsoftware.kryo.Kryo
import org.opencv.core.Mat

class KryoInit {
  def customize(kryo: Kryo): Unit  = {
    kryo.addDefaultSerializer(classOf[Mat], classOf[MatKryoSerializer])
    kryo.register(classOf[Mat], 21)
  }
}

de.itd.util.MatKryoSerializer (on system A and B)

package de.itd.util

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.opencv.core.{CvType, Mat}

class MatKryoSerializer extends Serializer[Mat] {
  override def write(kryo: Kryo, output: Output, m: Mat): Unit = {
    val bufferSize: Int = m.rows * m.cols * m.channels
    val arrayByte: Array[Byte] = new Array[Byte](bufferSize)
    m.get(0, 0, arrayByte)

    output.write(arrayByte)
  }

  override def read(kryo: Kryo, input: Input, `type`: Class[Mat]): Mat = {
    val rows = 2160
    val cols = 4096
    val channels = 1
    val bufferSize = rows * cols * channels
    val arrayByte: Array[Byte] = new Array[Byte](bufferSize)

    val frame = new Mat(rows, cols , CvType.CV_8U)
    input.readBytes(arrayByte)
    frame.put(0, 0, arrayByte)

    frame
  }
}

Log of system A (sending a frame to system B)

00:00 TRACE: [kryo] Registration required: true
00:00 TRACE: [kryo] References: false
00:00 TRACE: [kryo] Register class ID 21: org.opencv.core.Mat (de.itd.util.MatKryoSerializer)
[INFO] [01/31/2017 12:31:48.390] [JavaFX Application Thread] [akka.remote.Remoting] Starting remoting
[INFO] [01/31/2017 12:31:48.598] [JavaFX Application Thread] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://MainActorSystem@10.150.80.177:2552]
[INFO] [01/31/2017 12:31:48.602] [JavaFX Application Thread] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://MainActorSystem@10.150.80.177:2552]
12:31:48.633 [JavaFX Application Thread] INFO de.itd.controller.MainViewController - MainActorSystem started.
12:31:49.168 [MainActorSystem-akka.actor.default-dispatcher-2] INFO de.itd.actor.local.MainActor - Detector DetectionActor-0 registered.
12:31:54.788 [JavaFX Application Thread] INFO de.itd.controller.MainViewController - detectCars
12:31:56.318 [MainActorSystem-akka.actor.default-dispatcher-4] INFO de.itd.actor.local.MainActor - DetectionActor-0 asked for a frame.
00:08 TRACE: [kryo.FieldSerializerConfig] useAsm: false
00:08 TRACE: [kryo.FieldSerializerConfig] useAsm: false
00:08 TRACE: [kryo] Register class ID 0: int (com.esotericsoftware.kryo.serializers.DefaultSerializers$IntSerializer)
00:08 TRACE: [kryo] Register class ID 1: String (com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer)
00:08 TRACE: [kryo] Register class ID 2: float (com.esotericsoftware.kryo.serializers.DefaultSerializers$FloatSerializer)
00:08 TRACE: [kryo] Register class ID 3: boolean (com.esotericsoftware.kryo.serializers.DefaultSerializers$BooleanSerializer)
00:08 TRACE: [kryo] Register class ID 4: byte (com.esotericsoftware.kryo.serializers.DefaultSerializers$ByteSerializer)
00:08 TRACE: [kryo] Register class ID 5: char (com.esotericsoftware.kryo.serializers.DefaultSerializers$CharSerializer)
00:08 TRACE: [kryo] Register class ID 6: short (com.esotericsoftware.kryo.serializers.DefaultSerializers$ShortSerializer)
00:08 TRACE: [kryo] Register class ID 7: long (com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer)
00:08 TRACE: [kryo] Register class ID 8: double (com.esotericsoftware.kryo.serializers.DefaultSerializers$DoubleSerializer)
00:08 TRACE: [kryo] Register class ID 9: void (com.esotericsoftware.kryo.serializers.DefaultSerializers$VoidSerializer)
00:08 TRACE: [kryo] Register class ID 10: scala.Enumeration$Val (com.romix.scala.serialization.kryo.EnumerationSerializer)
00:08 TRACE: [kryo] Register class ID 11: scala.Enumeration$Value (com.romix.scala.serialization.kryo.EnumerationSerializer)
00:08 TRACE: [kryo] Registration required: true
00:08 TRACE: [kryo] References: false
00:08 TRACE: [kryo] Register class ID 21: org.opencv.core.Mat (de.itd.util.MatKryoSerializer)
00:08 DEBUG: [kryo] Write: Mat [ 2160*4096*CV_8UC1, isCont=true, isSubmat=false, nativeObj=0x7feb070c0bd0, dataAddr=0x145725020 ]
00:08 TRACE: [kryo] Object graph complete.
[WARN] [01/31/2017 12:32:07.645] [MainActorSystem-akka.remote.default-remote-dispatcher-5] [akka.tcp://MainActorSystem@10.150.80.177:2552/system/remote-watcher] Detected unreachable: [akka.tcp://RemoteActorSystem@10.150.20.159:2553]
[WARN] [01/31/2017 12:32:07.650] [MainActorSystem-akka.remote.default-remote-dispatcher-13] [akka.remote.Remoting] Association to [akka.tcp://RemoteActorSystem@10.150.20.159:2553] having UID [-664475844] is irrecoverably failed. UID is now quarantined and all messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover from this situation.
[INFO] [01/31/2017 12:32:08.288] [MainActorSystem-akka.actor.default-dispatcher-4] [akka://MainActorSystem/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FRemoteActorSystem%4010.150.20.159%3A2553-1] Message [akka.remote.transport.AssociationHandle$InboundPayload] from Actor[akka://MainActorSystem/deadLetters] to Actor[akka://MainActorSystem/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FRemoteActorSystem%4010.150.20.159%3A2553-1#-764637076] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Log of system B (should receive a frame from system A)

00:00 TRACE: [kryo] Registration required: true
00:00 TRACE: [kryo] References: false
00:00 TRACE: [kryo] Register class ID 21: org.opencv.core.Mat (de.itd.util.MatKryoSerializer)
[INFO] [01/31/2017 12:31:29.946] [main] [akka.remote.Remoting] Starting remoting
[INFO] [01/31/2017 12:31:30.253] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteActorSystem@10.150.20.159:2553]
[INFO] [01/31/2017 12:31:30.255] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://RemoteActorSystem@10.150.20.159:2553]
12:31:30.272 [main] INFO de.itd.ui.Main$ - RemoteActorSystem started.
12:31:47.050 [RemoteActorSystem-akka.actor.default-dispatcher-2] INFO de.itd.actor.remote.DetectionGroupActor - Receiving initialization message...
12:31:54.308 [RemoteActorSystem-akka.actor.default-dispatcher-5] INFO de.itd.actor.remote.DetectionActor - Frame is available.
[WARN] [01/31/2017 12:32:06.285] [RemoteActorSystem-akka.remote.default-remote-dispatcher-7] [akka.tcp://RemoteActorSystem@10.150.20.159:2553/system/remote-watcher] Detected unreachable: [akka.tcp://MainActorSystem@10.150.80.177:2552]
[WARN] [01/31/2017 12:32:06.291] [RemoteActorSystem-akka.remote.default-remote-dispatcher-6] [akka.remote.Remoting] Association to [akka.tcp://MainActorSystem@10.150.80.177:2552] having UID [-946314302] is irrecoverably failed. UID is now quarantined and all messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover from this situation.
[INFO] [01/31/2017 12:32:06.365] [RemoteActorSystem-akka.actor.default-dispatcher-2] [akka://RemoteActorSystem/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FMainActorSystem%4010.150.80.177%3A2552-2] Message [akka.remote.transport.AssociationHandle$InboundPayload] from Actor[akka://RemoteActorSystem/deadLetters] to Actor[akka://RemoteActorSystem/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FMainActorSystem%4010.150.80.177%3A2552-2#-1039432132] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[WARN] [01/31/2017 12:32:06.367] [RemoteActorSystem-akka.remote.default-remote-dispatcher-6] [akka.tcp://RemoteActorSystem@10.150.20.159:2553/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FMainActorSystem%4010.150.80.177%3A2552-1/endpointWriter] AssociationError [akka.tcp://RemoteActorSystem@10.150.20.159:2553] -> [akka.tcp://MainActorSystem@10.150.80.177:2552]: Error [Invalid address: akka.tcp://MainActorSystem@10.150.80.177:2552] [
akka.remote.InvalidAssociation: Invalid address: akka.tcp://MainActorSystem@10.150.80.177:2552
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system has a UID that has been quarantined. Association aborted.
]
[WARN] [01/31/2017 12:32:06.371] [RemoteActorSystem-akka.remote.default-remote-dispatcher-6] [akka.remote.Remoting] Tried to associate with unreachable remote address [akka.tcp://MainActorSystem@10.150.80.177:2552]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has a UID that has been quarantined. Association aborted.]
@romix
Collaborator

romix commented Jan 31, 2017

It seems like a Kryo issue. Could you try to separate the issue from Akka? I.e. can you try to create/obtain the Kryo instance and see if you can serialize/deserialize your data using it, but without using Akka or trying to send anything over the network?

@toaditoad
Author

toaditoad commented Feb 1, 2017

Thanks for getting back to me, @romix! I followed your suggestion and separated the issue from Akka, which turned out to work fine. The following shows serialization/deserialization of a case class, an "empty" Mat, and a Mat representing a video frame. Sorry for not providing a complete MWE, but that seems impossible because it would require compiling OpenCV.

package de.itd.kryoserialization

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import org.opencv.core.{CvType, Mat}
import org.opencv.imgproc.Imgproc
import org.opencv.videoio.VideoCapture

object KryoTroubleshooting extends App {
  System.load("/lib/libopencv_java310.dylib")

  val kryo: Kryo = new Kryo()
  kryo.addDefaultSerializer(classOf[Greeting], classOf[GreetingKryoSerializer])
  kryo.register(classOf[Greeting], 21)

  kryo.addDefaultSerializer(classOf[Mat], classOf[MatKryoSerializer])
  kryo.register(classOf[Mat], 22)

  println("roundTrip: Greeting")
  case class Greeting(message: String)
  val greetingMsg = Greeting("Hello")
  val kryoGreetingMsg = roundTrip(greetingMsg)
  assert(greetingMsg == kryoGreetingMsg)

  println("roundTrip: empty Mat")
  val emptyMat = new Mat(2160, 4096, CvType.CV_8UC1)
  val kryoEmptyMat = roundTrip(emptyMat)
  // obviously, the Mat instances are not the same because their native addresses are different but their contents match
  assert(emptyMat.get(100, 200) sameElements kryoEmptyMat.get(100, 200))
  assert(emptyMat.get(300, 150) sameElements kryoEmptyMat.get(300, 150))
  assert(emptyMat.get(900, 2222) sameElements kryoEmptyMat.get(900, 2222))

  println("roundTrip: video frame Mat")
  val videoCapture = new VideoCapture()
  videoCapture.open("footage.mov")
  val frame = new Mat()
  videoCapture.read(frame)
  videoCapture.release()
  val greyFrame = transformToGreyscale(frame)
  val kryoGreyFrame = roundTrip(greyFrame)
  assert(greyFrame.get(100, 200) sameElements kryoGreyFrame.get(100, 200))
  assert(greyFrame.get(300, 150) sameElements kryoGreyFrame.get(300, 150))
  assert(greyFrame.get(900, 2222) sameElements kryoGreyFrame.get(900, 2222))

  def roundTrip[T](obj: T): T = {
    val outStream = new ByteArrayOutputStream()
    val output = new Output(outStream, 4096)
    kryo.writeClassAndObject(output, obj)
    output.flush()

    val input = new Input(new ByteArrayInputStream(outStream.toByteArray), 4096)
    val obj1 = kryo.readClassAndObject(input)

    obj1.asInstanceOf[T]
  }

  def transformToGreyscale(m: Mat): Mat = {
    val greyMat = new Mat()
    Imgproc.cvtColor(m, greyMat, Imgproc.COLOR_BGR2GRAY)
    Imgproc.equalizeHist(greyMat, greyMat)
    greyMat
  }
}

Coming back to my original situation, I realized that the code works when the two actor systems run in a single JVM on two different ports. Putting them on two different machines makes serialization fail for a Mat holding a video frame, but not for the "empty" Mat. Does that make any sense?

@toaditoad
Author

@romix, please don't spend any more time on this for now... I'm not 100% sure yet what I have changed, but I came up with a working solution that sends one Mat object to an actor in a different actor system running on a different machine using akka-remote. I left the code basically untouched but tried it on two machines running macOS. Tomorrow, I will verify it in the original environment involving one machine with macOS and one with Windows. I will update and probably close this issue as soon as I can confirm that it is not Kryo-related.

@romix
Collaborator

romix commented Feb 1, 2017

@toaditoad Thanks for trying to isolate the issue.

  • In your test above, I'd use two different Kryo instances to mimic the situation of using two different machines.

  • If you think that this could be an issue related to using macOS or Windows, maybe it is related to UnsafeInput/UnsafeOutput being (implicitly) used for serialization? Could you check? Typically, that is supposed to happen only if you use machines with different processor architectures, but maybe it can also happen if you use different OSes?

@toaditoad
Author

  • You mean, instead of just having one val kryo: Kryo = new Kryo(), I should have val kryo1: Kryo = new Kryo() and val kryo2: Kryo = new Kryo() in order to update the roundtrip method accordingly: kryo1.writeClassAndObject(output, obj) and val obj1 = kryo2.readClassAndObject(input)? If that is correct, the result is the same and works fine.

  • I have only used the "default" setting from the readme that sets it to false (use-unsafe = false). Or is the UnsafeInput/UnsafeOutput used anyway?

@romix
Collaborator

romix commented Feb 1, 2017

> You mean, instead of just having one val kryo: Kryo = new Kryo(), I should have val kryo1: Kryo = new Kryo() and val kryo2: Kryo = new Kryo() in order to update the roundtrip method accordingly: kryo1.writeClassAndObject(output, obj) and val obj1 = kryo2.readClassAndObject(input)? If that is correct, the result is the same and works fine.

Yes. That's what I meant.

> I have only used the "default" setting from the readme that sets it to false (use-unsafe = false). Or is the UnsafeInput/UnsafeOutput used anyway?

UnsafeInput/UnsafeOutput should not be used in this case. But I'd check anyways that it is really the case ;-) I'm just trying to exclude as many possibilities as possible.

One more thing to check:

In your original trace I can see this: akka.remote.InvalidAssociation: Invalid address: akka.tcp://MainActorSystem@10.150.80.177:2552. Is it supposed to be like this? Maybe there is a connection issue?

@toaditoad
Author

Thanks for your support, @romix. Unfortunately, I cannot reproduce the issue any more. Today I took the version that had worked on two macOS machines and ran it on the original machines (Windows and macOS), and it worked just fine... For now, I can only guess at what has changed.

> In your original trace I can see this: akka.remote.InvalidAssociation: Invalid address: akka.tcp://MainActorSystem@10.150.80.177:2552. Is it supposed to be like this? Maybe there is a connection issue?

Yes, that is the correct address of one of the actor systems. I doubt that there is a connection issue because right before sending the video frame, the two systems communicate with each other. That is why I thought it was an issue with Kryo rather than with Akka itself. But as I said, I cannot reproduce the issue any more. Thanks again for your great work, and I really appreciate your support!

I'm attaching the current code snippets for future reference. Please note that in contrast to the original post, this version serializes objects of case class Frame(videoPos: Double, frame: Mat).

akka {
  extensions = ["com.romix.akka.serialization.kryo.KryoSerializationExtension$"]
  actor {
    provider = remote
    warn-about-java-serializer-usage = no
    enable-additional-serialization-bindings = on
    serializers {
      kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
    }
    serialization-bindings {
      "de.itd.actor.common.Message$Frame" = kryo
    }
    kryo  {
      kryo-custom-serializer-init = "de.itd.util.KryoInit"
      type = "nograph"
      idstrategy = "explicit"
      buffer-size = 4096
      max-buffer-size = -1
      use-manifests = true
      use-unsafe = false
      post-serialization-transformations = "lz4"
      kryo-trace = true
      resolve-subclasses = false
    }
  }
}

class KryoInit {
  def customize(kryo: Kryo): Unit  = {
    kryo.addDefaultSerializer(classOf[Frame], classOf[FrameKryoSerializer])
    kryo.register(classOf[Frame], 20)
  }
}

class FrameKryoSerializer extends Serializer[Frame] {
  override def write(kryo: Kryo, output: Output, frame: Frame): Unit = {
    output.writeDouble(frame.videoPos)
    val m: Mat = frame.frame
    val rows = m.rows()
    val cols = m.cols()
    val channels = m.channels()
    val matType = m.`type`()
    output.writeInt(rows)
    output.writeInt(cols)
    output.writeInt(channels)
    output.writeInt(matType)

    val bufferSize: Int = rows * cols * channels
    val arrayByte: Array[Byte] = new Array[Byte](bufferSize)
    m.get(0, 0, arrayByte)
    output.write(arrayByte)
  }

  override def read(kryo: Kryo, input: Input, `type`: Class[Frame]): Frame = {
    val videoFramePos = input.readDouble()

    val rows = input.readInt()
    val cols = input.readInt()
    val channels = input.readInt()
    val matType = input.readInt()

    val bufferSize = rows * cols * channels
    val arrayByte: Array[Byte] = new Array[Byte](bufferSize)
    val frame = new Mat(rows, cols , matType)
    input.readBytes(arrayByte)
    frame.put(0, 0, arrayByte)

    Frame(videoFramePos, frame)
  }
}
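
The layout written by FrameKryoSerializer above (video position, then the Mat dimensions, then the raw pixel bytes) can be illustrated without OpenCV or Kryo. The following is only a sketch using java.io streams: PlainMat and FrameWireFormat are hypothetical names that do not exist in the project, and the payload size of rows * cols * channels assumes one byte per element, as for a CV_8U Mat.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical stand-in for an 8-bit OpenCV Mat: dimensions plus raw pixel bytes.
final case class PlainMat(rows: Int, cols: Int, channels: Int, data: Array[Byte])

object FrameWireFormat {
  // Writes the video position, then the dimensions, then the pixel payload.
  def encode(videoPos: Double, m: PlainMat): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeDouble(videoPos)
    out.writeInt(m.rows)
    out.writeInt(m.cols)
    out.writeInt(m.channels)
    out.write(m.data)
    out.flush()
    bytes.toByteArray
  }

  // Reads the header first, so the payload size never has to be hard-coded.
  def decode(encoded: Array[Byte]): (Double, PlainMat) = {
    val in = new DataInputStream(new ByteArrayInputStream(encoded))
    val videoPos = in.readDouble()
    val rows = in.readInt()
    val cols = in.readInt()
    val channels = in.readInt()
    // Assumption: one byte per element (CV_8U); other depths need a larger buffer.
    val data = new Array[Byte](rows * cols * channels)
    in.readFully(data)
    (videoPos, PlainMat(rows, cols, channels, data))
  }
}
```

Because the reader takes the dimensions from the header, the same code handles frames of any resolution, unlike the first version, which fixed rows and cols at 2160 and 4096.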

@toaditoad
Author

toaditoad commented Feb 2, 2017

I discovered that the behavior is not very consistent, which is not an issue with Kryo but rather with the network itself. I should also point out that these video frames are retrieved from a 4K video recording, resulting in pretty large Mat objects (even when converted to greyscale). Sending such an object (even when compressed with lz4 or deflate) is pretty time-consuming and causes Akka to skip/lose heartbeats. I found a working but not nice way to make the communication work by relaxing the failure-detector settings in the akka.remote configuration:

transport-failure-detector {
  heartbeat-interval = 4 s
  acceptable-heartbeat-pause = 25 s
}
watch-failure-detector {
  heartbeat-interval = 1 s
  threshold = 200.0
  min-std-deviation = 500 ms
  acceptable-heartbeat-pause = 20 s
  expected-response-after = 10 s
}

Obviously, this is not a good approach and I will investigate other options in order to avoid sending such large messages. I would like to perform some basic measurements to support my statement that it is not an option and was wondering how to get the message size after serialization. Is input.available() safe to use? I remember reading that it is never correct to use the return value of this method... What about input.getBuffer.length or input.limit()?
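
As a rough lower bound before measuring anything, the raw payload size can be estimated from the Mat dimensions shown in the trace above (2160*4096*CV_8UC1). This is only a back-of-the-envelope sketch: FrameSizeEstimate is a hypothetical helper, the 10 Mbit/s bandwidth in the usage note is an assumed figure rather than a measurement, and the on-wire size after Kryo and lz4 will differ from the raw size.

```scala
object FrameSizeEstimate {
  // Raw payload of an 8-bit frame: one byte per pixel per channel.
  def rawBytes(rows: Int, cols: Int, channels: Int): Long =
    rows.toLong * cols * channels

  // Seconds needed to push `bytes` through a link of the given bandwidth,
  // expressed in megabits per second (1 Mbit = 1,000,000 bits).
  def transferSeconds(bytes: Long, megabitsPerSecond: Double): Double =
    bytes * 8.0 / (megabitsPerSecond * 1000000.0)
}
```

For the greyscale 4K frame, FrameSizeEstimate.rawBytes(2160, 4096, 1) gives 8,847,360 bytes (about 8.4 MiB). At an assumed 10 Mbit/s, FrameSizeEstimate.transferSeconds would put a single uncompressed frame at roughly 7 seconds on the wire, which is well above the 1 s and 4 s heartbeat intervals configured above.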

@luben
Contributor

luben commented Feb 2, 2017

@toaditoad, there is a new remoting transport (search for Akka Artery) that should solve this problem (head-of-line blocking) by maintaining separate streams for user and system messages. There is also a provision for a dedicated large-message stream that may fit your case.

@toaditoad
Author

@luben, thank you for pointing Akka Artery out! I have heard about it but haven't had the chance to look at it in detail. You're right, it sounds promising.

After figuring out that the akka.remote.artery.large-message-destinations array has to match across all involved actor systems (I thought it only had to be specified on the sending side), I'm struggling with maximum-large-frame-size. It seems that this setting does not work with values higher than 7 MB. However, since this is getting more and more unrelated to my original Kryo issue (which wasn't Kryo-related after all), I opened another issue in Akka's issue tracker.
