Home

Martin Krasser edited this page · 69 revisions
A Scalaz-based DSL for Apache Camel

Update

Scalaz-camel is now superseded by the streamz project and its combinators for Apache Camel.

Latest Release

Introduction

This project provides a domain-specific language (DSL) for Apache Camel that is based on the Scala programming language and the Scalaz library. It applies functional programming (FP) concepts to enterprise integration patterns (EIPs) so that integration solutions can be constructed by composing EIPs with a functional DSL.

The scalaz-camel DSL is an alternative to Camel’s existing Java DSL and Scala DSL (it does not depend on the org.apache.camel.model package). It was re-designed from scratch with the goal of a better and more native integration with the Scala programming language. See also this interview.

Key Features of scalaz-camel

  • A functional DSL for composing message processors into message processing routes. Message processors can be
    • Scala functions that return the processing result directly
    • Scala functions with continuation-passing style (CPS) for asynchronous, non-blocking processing
    • Existing Camel message processors (both synchronous and asynchronous)
    • Existing Camel endpoint producers (all Camel components can be re-used)
  • A message processing route itself is a message processor and can be further composed.
  • A message processing route can be created and applied anywhere in a program, not only in special route builders.
  • Non-blocking routing of messages along the message processing chain.
  • Fault-tolerance via error handling strategies with optional message redelivery.
  • Immutability of messages so that they can be shared by concurrent message processors.
  • Predefined message processors and enterprise integration patterns (EIPs).
  • Configurable concurrency strategies.
  • Extensibility of the DSL.
  • Akka integration
    • For exchanging messages with Akka actors
    • For implementing stateful EIPs such as aggregator, resequencer etc.

Getting Started

Let’s get started with a simple example. The example source code is part of the project. Instructions on how to download and build scalaz-camel can be found here. To run the example, enter

$ sbt
> project scalaz-camel-samples
> test-only

The following subsections will walk through the example step-by-step. It is assumed that you already have a basic knowledge of Apache Camel and the Scala programming language. A good introduction to Apache Camel is the free Chapter 1 of Camel in Action.

Route Definition

The example is from the order processing domain. It exposes a service that accepts purchase order messages, validates them and queues valid orders for further processing. This is outlined in the following figure.

With scalaz-camel, the above route can be implemented as follows

// order placement route (main route)
val placeOrderRoute = validateOrder >=> oneway >=> to("jms:queue:valid") >=> { 
  m: Message => m.setBody("order accepted")
}

// order placement route consuming from direct:place-order endpoint (incl. error handler)
from("direct:place-order") {
  attempt { placeOrderRoute } fallback {
    case e: ValidationException => { m: Message => m.setBody("order validation failed") }
    case e: Exception           => { m: Message => m.setBody("general processing error") } >=> failWith(e)
  }
}

// order processing route
from("jms:queue:valid") {
  split { m: Message => for (item <- m.bodyAs[PurchaseOrder].items) yield m.setBody(item) } >=> choose {
    case Message(PurchaseOrderItem(_, "books", _, _), _) => orderItemToTuple >=> to("mock:books")
    case Message(PurchaseOrderItem(_, "bikes", _, _), _) => to("mock:bikes")
  } >=> { m: Message => println("received order item = %s" format m.body); m }
}

where validateOrder and orderItemToTuple are example-specific message processors:

// Continuation-passing style (CPS) message processor that validates order messages
// in a separate thread (created by Strategy.Naive). Validation responses are passed
// (asynchronously) to continuation k.
val validateOrder: MessageProcessor = (m: Message, k: MessageValidation => Unit) => {
  Strategy.Naive.apply(m.body match {
    case order: PurchaseOrder if (!order.items.isEmpty) => k(m.success)
    case _ => k(m.setException(ValidationException("invalid order")).fail)
  })
}

// Direct-style message processor that transforms an order item to a tuple. Synchronous processor.
val orderItemToTuple = (m: Message) => m.transform[PurchaseOrderItem](i => (i.customer, i.name, i.amount))

A purchase order is defined as follows.

case class PurchaseOrder(items: List[PurchaseOrderItem])
case class PurchaseOrderItem(customer: Int, category: String, name: String, amount: Int)

Order Placement Route

The example starts with the definition of placeOrderRoute, a message processing route that is composed of four processors. The composition operator is >=>, the Kleisli composition operator from Scalaz. The monad used for Kleisli composition is Scala’s Responder, a continuation monad. The details behind Kleisli composition, the Responder monad and the implicit conversion from message processors to Kleisli structures are explained in the Architecture section (TODO). You can safely skip that section; it is not needed for understanding this example or for using the scalaz-camel DSL.

The first processor in placeOrderRoute, validateOrder, is a message processor written in continuation-passing style (CPS). Its type is MessageProcessor, which is an alias for (Message, MessageValidation => Unit) => Unit. The first parameter is the input Message and the second parameter is a continuation that is called by the message processor with the message processing result (or response). The result type is MessageValidation, which can be either Failure or Success. A Failure object contains the message that caused the failure. The failed message itself contains an exception object describing failure details. A Success object contains a normal response message. The methods success and fail (from Scalaz) are used to create Success and Failure objects.

A CPS processor can call the continuation either synchronously or asynchronously (as done in validateOrder using Scalaz’s Naive concurrency strategy). Calling the continuation causes the response message to be routed to the next processor. Please note that scalaz-camel internally does not wait for a response (by blocking a thread) while running this processor (non-blocking routing).

When a message is successfully validated, it is routed to the next processor in the route. On failure, all remaining processors are bypassed and the final response is the failure response generated by the validator. Please note that bypassing remaining processors on failure is not a feature of the example validator, this is done internally by scalaz-camel. Any message processor that generates a Failure response will cause scalaz-camel to skip all remaining processors (except those defined in error handlers).
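
The short-circuit behaviour described above can be illustrated without the library. The following is a minimal plain-Scala sketch, not the scalaz-camel implementation: Msg, Validation, Processor, andThen and run are hypothetical stand-ins, and Either is assumed as a substitute for Scalaz’s Success and Failure.

```scala
object CpsSketch {
  // Hypothetical stand-ins for illustration; these are not the scalaz-camel types.
  final case class Msg(body: Any, exception: Option[Throwable] = None)
  type Validation = Either[Msg, Msg]                    // Left = failure, Right = success
  type Processor  = (Msg, Validation => Unit) => Unit   // CPS message processor

  // Composes two CPS processors. The second one runs only if the first succeeds;
  // a failure is passed straight to the final continuation k.
  def andThen(p1: Processor, p2: Processor): Processor =
    (m, k) => p1(m, {
      case Right(ok) => p2(ok, k)
      case failure   => k(failure)
    })

  // Accepts non-empty string bodies, fails everything else.
  val validate: Processor = (m, k) => m.body match {
    case s: String if s.nonEmpty => k(Right(m))
    case _ => k(Left(m.copy(exception = Some(new IllegalArgumentException("invalid order")))))
  }

  val acknowledge: Processor = (m, k) => k(Right(m.copy(body = "order accepted")))

  val route: Processor = andThen(validate, acknowledge)

  // Captures the result of a processor that calls its continuation synchronously.
  def run(p: Processor, m: Msg): Validation = {
    var result: Option[Validation] = None
    p(m, v => { result = Some(v) })
    result.get
  }
}
```

Running CpsSketch.route on a valid message reaches acknowledge, while an invalid message yields the failure produced by validate and acknowledge is never called.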

The second processor in the route, oneway, prepares the message for being sent via JMS to a queue of validated order messages. If we omitted this processor, we’d tell the Camel JMS endpoint to wait for a reply (using a temporary queue), but this is not what we want. The third processor, to("jms:queue:valid"), is an endpoint producer that actually puts validated order messages to a JMS queue.

The last processor in the chain (defined inline) finally generates a response that acknowledges the successful receipt of an order message. This processor is of type Message => Message and is called a direct-style processor. Direct-style processors indicate successful message processing by returning a message. Failures are indicated by throwing exceptions. Internally, direct-style message processors are converted to CPS processors that wait for responses in the calling thread. Alternatively, one can also do a conversion to a CPS processor that executes the direct-style processor asynchronously (not shown). Direct-style message processors of type Message => MessageValidation will be supported in upcoming versions.
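
As an illustration of the conversion mentioned above, here is a minimal plain-Scala sketch of lifting a direct-style function into a CPS processor that runs in the calling thread. It is not the library’s conversion: Msg, Validation, Processor, lift and runSync are hypothetical names, and Either is assumed in place of Scalaz’s validation type.

```scala
import scala.util.control.NonFatal

object DirectStyleSketch {
  // Hypothetical stand-ins for illustration; these are not the scalaz-camel types.
  final case class Msg(body: Any, exception: Option[Throwable] = None)
  type Validation = Either[Msg, Msg]
  type Processor  = (Msg, Validation => Unit) => Unit

  // Lifts a direct-style function into a CPS processor. A returned message becomes
  // a success; a thrown exception becomes a failure that carries the exception.
  def lift(f: Msg => Msg): Processor = (m, k) => {
    val result: Validation =
      try Right(f(m))
      catch { case NonFatal(e) => Left(m.copy(exception = Some(e))) }
    k(result) // called outside the try so that errors raised by k are not swallowed
  }

  // Captures the result of a processor that calls its continuation synchronously.
  def runSync(p: Processor, m: Msg): Validation = {
    var result: Option[Validation] = None
    p(m, v => { result = Some(v) })
    result.get
  }
}
```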

The placeOrderRoute can now either be applied to messages directly (see below) or it can be connected to an endpoint consumer using the from method which takes an endpoint URI as parameter. In our example, the route is connected to the direct:place-order endpoint. Any message sent to this endpoint (e.g. via a Camel producer template) will be processed by the placeOrderRoute.

Error Handling

The example also defines an error handler with attempt { ... } fallback { ... }. The attempt block defines the route that may fail, and the fallback block defines a partial function that returns alternative routes that should be executed depending on the Exception object contained in the failed message. The example defines error handling for the whole placeOrderRoute, but one can also define error handling for route fragments or even single message processors. The attempt { ... } fallback { ... } expression itself is of type MessageProcessor and can therefore be composed with other message processors or even be nested.
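
The idea of selecting an alternative route from the exception in a failed message can be sketched in plain Scala as follows. This is an assumption-laden model, not the scalaz-camel implementation: all names (Msg, Processor, attemptWithFallback, alwaysFails, guarded, runSync) are hypothetical, and clearing the exception before running the alternative route is an assumed way to model "handled".

```scala
object FallbackSketch {
  // Hypothetical stand-ins for illustration; these are not the scalaz-camel types.
  final case class Msg(body: Any, exception: Option[Throwable] = None)
  type Validation = Either[Msg, Msg]
  type Processor  = (Msg, Validation => Unit) => Unit

  // Runs `route`. On failure, the handler picks an alternative route based on the
  // exception stored in the failed message; unmatched failures are passed on as-is.
  def attemptWithFallback(route: Processor)(
      handler: PartialFunction[Throwable, Processor]): Processor =
    (m, k) => route(m, {
      case Left(failed) =>
        failed.exception.flatMap(handler.lift) match {
          case Some(alternative) => alternative(failed.copy(exception = None), k)
          case None              => k(Left(failed))
        }
      case success => k(success)
    })

  val alwaysFails: Processor = (m, k) =>
    k(Left(m.copy(exception = Some(new IllegalArgumentException("invalid order")))))

  // Because the result is again a Processor, it can be composed or nested further.
  val guarded: Processor = attemptWithFallback(alwaysFails) {
    case _: IllegalArgumentException =>
      (m, k) => k(Right(m.copy(body = "order validation failed")))
  }

  // Captures the result of a processor that calls its continuation synchronously.
  def runSync(p: Processor, m: Msg): Validation = {
    var result: Option[Validation] = None
    p(m, v => { result = Some(v) })
    result.get
  }
}
```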

Order Processing Route

The order processing route receives validated order messages from a JMS queue. It consumes messages from the jms:queue:valid endpoint. Here, the example doesn’t use a separate val for the order processing route; instead, the route is defined inline.

The first processor in the route is a splitter. A splitter can be created with the split method which has a parameter of type Message => Seq[Message]. This is a function that implements the split logic and creates a sequence of messages from a single input message. In our example, we split an order message into n order item messages.
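
To make the shape of the split function concrete, here is a small self-contained sketch of split logic with the type Msg => Seq[Msg]. Msg, Item and Order are simplified hypothetical stand-ins for Message, PurchaseOrderItem and PurchaseOrder; passing non-order messages through unsplit is an assumption made for this sketch only.

```scala
object SplitSketch {
  // Simplified, hypothetical stand-ins for the example's message and order types.
  final case class Msg(body: Any)
  final case class Item(category: String, name: String)
  final case class Order(items: List[Item])

  // Split logic: one output message per order item, each derived from the input message.
  val splitOrder: Msg => Seq[Msg] = m => m.body match {
    case Order(items) => items.map(item => m.copy(body = item))
    case _            => Seq(m) // assumption: anything else passes through unsplit
  }
}
```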

The order item messages are then routed to different destinations depending on the order item details. This is done with a content-based router that can be created with choose. It takes a partial function that matches input messages and returns one of several destinations based on the matching result. In our example, order item messages are routed to different destinations (mock endpoints) based on their order item category values.
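
A content-based router of this kind can be modelled in a few lines of plain Scala. The sketch below is illustrative only: the local choose and to functions are hypothetical and merely mimic the shape of the DSL, destinations are modelled as functions that tag a message with an endpoint URI, and returning None for unmatched messages is an assumption rather than documented scalaz-camel behaviour.

```scala
object ChooseSketch {
  // Simplified, hypothetical stand-ins for the example's message and item types.
  final case class Msg(body: Any)
  final case class Item(category: String, name: String)

  // A destination is modelled as a function that tags the message with an endpoint URI.
  def to(uri: String): Msg => (String, Msg) = m => (uri, m)

  // Content-based router: the partial function selects a destination for a message,
  // and the message is then passed to that destination.
  def choose(routes: PartialFunction[Msg, Msg => (String, Msg)]): Msg => Option[(String, Msg)] =
    m => routes.lift(m).map(destination => destination(m))

  val router: Msg => Option[(String, Msg)] = choose {
    case Msg(Item("books", _)) => to("mock:books")
    case Msg(Item("bikes", _)) => to("mock:bikes")
  }
}
```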

If the order item category equals "books" then the order item is additionally transformed to a tuple (before sending it to the mock endpoint). This is done with a custom direct-style message processor named orderItemToTuple (shown above). Finally, all messages (as received by the mock endpoints) are written to stdout by the last processor in the route.

Route Application

The routes are defined, so we can start sending messages. Let’s define a purchase order for two books and a freeride bike.

val order = PurchaseOrder(List(
  PurchaseOrderItem(123, "books", "Camel in Action", 1),
  PurchaseOrderItem(123, "books", "DSLs in Action", 1),
  PurchaseOrderItem(123, "bikes", "Canyon Torque FRX", 1)
))

Producer Template

We can send this purchase order with a Camel producer template to endpoint direct:place-order. The expected answer is that the message has been accepted.

template.requestBody("direct:place-order", order) must equal ("order accepted")

When an invalid order is sent (e.g. a simple String), order validation fails and the error handler generates a response with a custom error message.

template.requestBody("direct:place-order", "wrong") must equal ("order validation failed")

Failed message exchanges matched by the error handler are automatically marked as handled (hence the producer template won’t throw an exception). To mark a message exchange as failed even if it is matched by an error handler, append the failWith processor to the corresponding error handling route (as done in our example for exceptions other than ValidationException).

The following message bodies arrive at the mock endpoints:

  • mock:books endpoint: (123, "Camel in Action", 1) and (123, "DSLs in Action", 1) in any order
  • mock:bikes endpoint: PurchaseOrderItem(123, "bikes", "Canyon Torque FRX", 1)

Direct Application

There’s no need to set up an endpoint consumer (using from) to apply routes to messages. Routes can also be applied to messages directly, as shown in the following.

val orderMessage = Message(order)

// applies placeOrderRoute to orderMessage and waits for the response (process blocks).
placeOrderRoute process orderMessage

// applies placeOrderRoute to orderMessage and returns a response promise 
// (submit does not block, promise.get blocks)
val promise = placeOrderRoute submit orderMessage
val result = promise.get // blocks until result is available

// applies placeOrderRoute to orderMessage and provides a continuation for processing the result
placeOrderRoute apply orderMessage.success { mv => ... }
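
The difference between the blocking and the non-blocking variants can be sketched with the Scala standard library. This is not how scalaz-camel implements process and submit: the functions below are hypothetical look-alikes built on scala.concurrent.Promise, with Msg, Validation, Processor and async as illustrative stand-ins and an arbitrary 5-second timeout.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object ApplySketch {
  // Hypothetical stand-ins for illustration; these are not the scalaz-camel types.
  final case class Msg(body: Any)
  type Validation = Either[Msg, Msg]
  type Processor  = (Msg, Validation => Unit) => Unit

  // Non-blocking: returns immediately; the future completes once the processor
  // calls its continuation.
  def submit(p: Processor, m: Msg): Future[Validation] = {
    val promise = Promise[Validation]()
    p(m, v => { promise.trySuccess(v); () })
    promise.future
  }

  // Blocking: waits in the calling thread until the continuation has been called.
  def process(p: Processor, m: Msg): Validation =
    Await.result(submit(p, m), 5.seconds)

  // Example processor that calls its continuation from a separate thread.
  val async: Processor = (m, k) => {
    val worker = new Thread(() => k(Right(m.copy(body = "order accepted"))))
    worker.start()
  }
}
```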

Application Setup

Camel Trait

To use the scalaz-camel DSL, your application needs to extend the Camel trait and make the following imports.

import scalaz._
import scalaz.camel.core._

class MyApp extends Camel {
  import Scalaz._

  // ...
}

Concurrency

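scalaz-camel can be configured with different concurrency strategies for

  • routing messages from one processor to the next in a route (Camel.dispatchConcurrencyStrategy)
  • multicast and scatter-gather (Camel.multicastConcurrencyStrategy)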

The default setting for both is Strategy.Sequential which uses the calling thread for execution. In our example, Strategy.Executor is used with a thread pool of size 3 for Camel.dispatchConcurrencyStrategy. This essentially places a queue (that is part of the thread pool) between processors in a route and uses the threads of the pool to route messages from one processor to the next. With Strategy.Sequential on the other hand, one could create routes that run synchronously, provided all participating processors run synchronously as well.

The Camel.multicastConcurrencyStrategy can be ignored in our example because neither multicast nor scatter-gather is used for message processing. Otherwise, this concurrency strategy defines whether to perform sequential or parallel multicast or scatter-gather.
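
The difference between running work in the calling thread and handing it to a thread pool can be sketched with java.util.concurrent. The Strategy trait, Sequential, PoolStrategy and threadUsedBy below are hypothetical and only model the idea; they are not Scalaz’s Strategy API or the scalaz-camel configuration.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

object StrategySketch {
  // A strategy decides where a unit of work runs (hypothetical model).
  trait Strategy { def dispatch(work: () => Unit): Unit }

  // Runs the work in the calling thread.
  object Sequential extends Strategy {
    def dispatch(work: () => Unit): Unit = work()
  }

  // Queues the work on a fixed-size thread pool; the pool's queue sits between
  // the caller and the thread that eventually runs the work.
  final class PoolStrategy(size: Int) extends Strategy {
    private val pool = Executors.newFixedThreadPool(size)
    def dispatch(work: () => Unit): Unit = pool.execute(() => work())
    def shutdown(): Unit = pool.shutdown()
  }

  // Returns the name of the thread on which the strategy ran the work.
  def threadUsedBy(strategy: Strategy): String = {
    val latch = new CountDownLatch(1)
    val name  = new AtomicReference[String]("")
    strategy.dispatch { () =>
      name.set(Thread.currentThread().getName)
      latch.countDown()
    }
    latch.await(5, TimeUnit.SECONDS)
    name.get
  }
}
```

With Sequential, threadUsedBy reports the caller’s own thread; with a PoolStrategy of size 3 it reports one of the pool’s worker threads. Remember to shut the pool down when it is no longer needed.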

Router

Definition of routes requires the implicit definition of a Router instance. A router must be initialized with a CamelContext instance.

val camelContext: CamelContext = ...
implicit val router = new Router(camelContext)

The router must be started before applying routes to messages.

router.start