Skip to content

Commit

Permalink
Blocking queue replaced by a queue builder which defaults to a Concur…
Browse files Browse the repository at this point in the history
…rentLinkedQueue.
  • Loading branch information
Guido Medina committed Mar 11, 2016
1 parent e100204 commit 045cd27
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 63 deletions.
47 changes: 42 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,13 @@ The following options are available for configuring this serializer:

idstrategy = "incremental"

# Define a default size for serializer pool
# Try to define the size to be at least as big as the max possible
# number of threads that may be used for serialization, i.e. max
# number of threads allowed for the scheduler
serializer-pool-size = 16
# Define a default queue builder, by default ConcurrentLinkedQueue is used.
# To pass your own queue builder implement the trait KryoSerializer.QueueBuilder
# useful for paranoid GC users that want to use JCtools MpmcArrayQueue for example.
# If you pass a bounded queue make sure its capacity will be equal or greater than
# the concurrent threads your application will ever have running concurrently:
#
# custom-queue-builder = "a.b.c.KryoQueueBuilder"

# Define a default size for byte buffers used during serialization
buffer-size = 4096
Expand Down Expand Up @@ -249,6 +251,41 @@ only an object of a top-level class to be sent. It picks a matching serializer f
this top-level class, e.g. a default Java serializer, and then it serializes the
whole object graph with this object as a root using this Java serializer.

Kryo queue builder examples:
----------------------------

* Scala bounded queue builder with a capacity of 32:

package a.b.c

import akka.serialization.Serializer
import com.romix.akka.serialization.kryo.QueueBuilder
import org.jctools.queues.MpmcArrayQueue
import java.util.Queue

class KryoQueueBuilder extends QueueBuilder {
def build: Queue[Serializer] = {
new MpmcArrayQueue[Serializer](32)
}
}

* Java bounded queue builder with a capacity of 32:

package a.b.c;

import akka.serialization.Serializer;
import com.romix.akka.serialization.kryo.QueueBuilder;
import org.jctools.queues.MpmcArrayQueue;
import java.util.Queue;

public class KryoQueueBuilder implements QueueBuilder {

@Override
public Queue<Serializer> build() {
return new MpmcArrayQueue<>(32);
}
}


How do you create mappings or classes sections with proper content?
-------------------------------------------------------------------
Expand Down
8 changes: 5 additions & 3 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@ akka {
# The serialization byte buffers are doubled as needed until they exceed maxBufferSize and an exception is thrown. Can be -1 for no maximum.
max-buffer-size = -1

# Define a default size for serializer pool
serializer-pool-size = 16

# Define a default queue builder, by default ConcurrentLinkedQueue is used.
# Look at the main documentation for a concrete example:
#
# custom-queue-builder = "a.b.c.KryoQueueBuilder"

# If set, akka uses manifests to put a class name
# of the top-level object into each message
use-manifests = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@

package com.romix.akka.serialization.kryo

import java.io.UnsupportedEncodingException
import java.security.{NoSuchAlgorithmException, InvalidAlgorithmParameterException, InvalidKeyException, SecureRandom}
import java.security.SecureRandom
import java.util
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.zip.{Deflater, Inflater}
import javax.crypto.{NoSuchPaddingException, BadPaddingException, IllegalBlockSizeException, Cipher}
import javax.crypto.spec.{SecretKeySpec, IvParameterSpec}
import javax.crypto.Cipher
import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}

import akka.actor.{ActorRef, ExtendedActorSystem}
import akka.event.Logging
Expand Down Expand Up @@ -192,8 +193,6 @@ class KryoSerializer(val system: ExtendedActorSystem) extends Serializer {
log.debug("Got max-buffer-size: {}", maxBufferSize)
}

val serializerPoolSize = settings.SerializerPoolSize

val idStrategy = settings.IdStrategy

locally {
Expand Down Expand Up @@ -296,10 +295,21 @@ class KryoSerializer(val system: ExtendedActorSystem) extends Serializer {
log.debug("Got transformations: {}", settings.PostSerTransformations)
}

val queueBuilder: QueueBuilder =
if (settings.CustomQueueBuilder == null) null
else system.dynamicAccess.getClassFor[AnyRef](settings.CustomQueueBuilder) match {
case Success(clazz) => clazz.newInstance().asInstanceOf[QueueBuilder]
case Failure(e) =>
log.error("Class could not be loaded: {} ", settings.CustomQueueBuilder)
throw e
}
locally {
log.debug("Got queue builder: {}", queueBuilder)
}

val serializer = try new KryoBasedSerializer(getKryo(idStrategy, serializerType),
bufferSize,
maxBufferSize,
serializerPoolSize,
useManifests)
catch {
case e: Exception => {
Expand Down Expand Up @@ -333,11 +343,10 @@ class KryoSerializer(val system: ExtendedActorSystem) extends Serializer {
obj
}

val serializerPool = new ObjectPool[Serializer](serializerPoolSize, () => {
val serializerPool = new SerializerPool(queueBuilder, () => {
new KryoBasedSerializer(getKryo(idStrategy, serializerType),
bufferSize,
maxBufferSize,
serializerPoolSize,
useManifests)
})

Expand Down Expand Up @@ -473,7 +482,6 @@ class KryoBasedSerializer(
val kryo: Kryo,
val bufferSize: Int,
val maxBufferSize: Int,
val bufferPoolSize: Int,
val useManifests: Boolean) extends Serializer {

// This is whether "fromBinary" requires a "clazz" or not
Expand Down Expand Up @@ -515,51 +523,38 @@ class KryoBasedSerializer(

}

import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Support pooling of objects. Useful if you want to reduce
// the GC overhead and memory pressure.
class ObjectPool[T](number: Int, newInstance: () => T) {
/**
* Returns a SerializerPool, useful to reduce GC overhead.
*
* @param queueBuilder queue builder.
* @param newInstance Akka serializer instance creator.
*/
class SerializerPool(queueBuilder: QueueBuilder, newInstance: () => Serializer) {

private val size = new AtomicInteger(0)
private val pool = new ArrayBlockingQueue[T](number)
private val pool = if (queueBuilder == null) new ConcurrentLinkedQueue[Serializer] else queueBuilder.build

def fetch(): T = {
def fetch(): Serializer = {
pool.poll() match {
case o if o != null => o
case null => createOrBlock
case null => newInstance()
}
}

def release(o: T): Unit = {
def release(o: Serializer): Unit = {
pool.offer(o)
}

def add(o: T): Unit = {
def add(o: Serializer): Unit = {
pool.add(o)
}
}

private def createOrBlock: T = {
size.get match {
case e: Int if e == number => block
case _ => create
}
}

private def create: T = {
size.incrementAndGet match {
case e: Int if e > number =>
size.decrementAndGet; fetch()
case e: Int => newInstance()
}
}
/**
* Kryo custom queue builder, to replace ConcurrentLinkedQueue for another Queue,
* Notice that it must be a multiple producer and multiple consumer queue type,
* you could use for example JCtools MpmcArrayQueue.
*/
trait QueueBuilder {

private def block: T = {
val timeout = 5000
pool.poll(timeout, TimeUnit.MILLISECONDS) match {
case o if o != null => o
case _ => throw new Exception("Couldn't acquire object in %d milliseconds.".format(timeout))
}
}
def build: util.Queue[Serializer]
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,11 @@

package com.romix.akka.serialization.kryo

import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.actor.ExtendedActorSystem
import akka.actor.{ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider}
import akka.event.Logging
import com.typesafe.config.Config

import akka.actor.{ ActorSystem, Extension, ExtendedActorSystem, Address, DynamicAccess }
import akka.event.Logging
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.ArrayBuffer
import scala.util.{ Try, Success, Failure }
import java.io.NotSerializableException
import scala.util.Try

object KryoSerialization {

Expand All @@ -45,8 +38,6 @@ object KryoSerialization {

val MaxBufferSize: Int = config.getInt("akka.actor.kryo.max-buffer-size")

val SerializerPoolSize: Int = config.getInt("akka.actor.kryo.serializer-pool-size")

// Each entry should be: FQCN -> integer id
val ClassNameMappings: Map[String, String] = configToMap(getConfig("akka.actor.kryo.mappings"))

Expand Down Expand Up @@ -75,6 +66,8 @@ object KryoSerialization {

val KryoCustomSerializerInit: String = Try(config.getString("akka.actor.kryo.kryo-custom-serializer-init")).getOrElse(null)

val CustomQueueBuilder: String = Try(config.getString("akka.actor.kryo.custom-queue-builder")).getOrElse(null)

private def configToMap(cfg: Config): Map[String, String] =
cfg.root.unwrapped.asScala.toMap.map { case (k, v) => (k, v.toString) }
}
Expand Down

0 comments on commit 045cd27

Please sign in to comment.