This repository has been archived by the owner on Jul 9, 2022. It is now read-only.

OrderProcessing sample

olegz edited this page Jun 5, 2012 · 39 revisions

This example demonstrates a simple flow for an imaginary Order Processing system. The flow diagram is shown below.

Order Processing

(If image doesn't show follow this link: http://i1205.photobucket.com/albums/bb431/z_oleg/Screenshot2011-06-23at94026AM.png)

The user submits an order via a MessagingGateway. The order then needs to be:

  1. validated (proceed or fail)
  2. split if validation is successful (basically separating the 'bikes' order from the 'books' order)
  3. routed to specific processors ('bikeProcessor' vs 'bookProcessor')
  4. aggregated from the processing results, with a reply sent back to the user
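
To make the four steps concrete before looking at the DSL, here is a minimal library-free sketch in plain Scala. It does not use Spring Integration at all; the names `OrderStepsSketch`, `Item`, `Order`, `validate`, `split`, `route` and `process` are hypothetical and exist only for illustration.

```scala
object OrderStepsSketch {
  // Hypothetical stand-ins for the sample's order types.
  case class Item(itemType: String, title: String)
  case class Order(items: List[Item])

  // Step 1: validate. An empty order fails, anything else proceeds.
  def validate(order: Order): Either[String, Order] =
    if (order.items.nonEmpty) Right(order) else Left("ERROR processing order")

  // Step 2: split the order into its individual items.
  def split(order: Order): List[Item] = order.items

  // Step 3: route each item to a processor chosen by its type.
  def route(item: Item): Item = item.itemType match {
    case "books" => println("Processing books order: " + item); item
    case "bikes" => println("Processing bikes order: " + item); item
    case other   => throw new IllegalArgumentException("No processor for: " + other)
  }

  // Step 4: aggregate the processed items into a single reply.
  def process(order: Order): Either[String, List[Item]] = validate(order) match {
    case Right(valid) => Right(split(valid).map(route))
    case Left(error)  => Left(error)
  }
}
```

Everything here runs sequentially on the calling thread; the DSL version adds messaging, channels and asynchronous hand-off on top of the same four steps.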

With the Scala DSL we don't need to define an explicit Gateway, since we already have a reference to the integration flow:

val errorFlow = handle { m: Message[_] => println("Received ERROR: " + m); "ERROR processing order" }

val aggregateOrder = aggregate()

val processBikeOrder =
      handle { m: Message[_] => println("Processing bikes order: " + m); m } -->
      aggregateOrder

val orderProcessingFlow =
      filter { p: PurchaseOrder => !p.items.isEmpty }.where(exceptionOnRejection = true) -->
      split { p: PurchaseOrder => p.items } -->
      Channel.withDispatcher(taskExecutor = Executors.newCachedThreadPool) -->
      route { pi: PurchaseOrderItem => pi.itemType }(
          when("books") then
            handle { m: Message[_] => println("Processing books order: " + m); m } -->
            aggregateOrder,
          when("bikes") then
            processBikeOrder
      )
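
The flow above refers to `PurchaseOrder` and `PurchaseOrderItem` without showing them. A minimal sketch of what they might look like follows. Only `items` and `itemType` appear in the sample; the field name `title`, the wrapper object `SampleOrders` and the exact contents of the two orders are assumptions inferred from the log output shown later on this page.

```scala
// Assumed domain model: only `items` and `itemType` are confirmed by the sample.
case class PurchaseOrderItem(itemType: String, title: String)
case class PurchaseOrder(items: List[PurchaseOrderItem])

object SampleOrders {
  // Three items, matching the three log lines shown later on this page.
  val validOrder = PurchaseOrder(List(
    PurchaseOrderItem("books", "Spring Integration in Action"),
    PurchaseOrderItem("books", "DSLs in Action"),
    PurchaseOrderItem("bikes", "Canyon Torque FRX")))

  // An order with no items is rejected by the `!p.items.isEmpty` filter.
  val invalidOrder = PurchaseOrder(List())
}
```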

As you can see from the above, whenever an EIP endpoint needs to perform some custom logic, that logic is provided via the apply(..) method, where you can pass a Scala function (as in the example above).

Also note that you are not required to explicitly define channels to connect endpoints if the connection relies on the default channel (a synchronous point-to-point channel). Spring Integration Scala DSL will simply auto-create one, which greatly simplifies the configuration. This is why the filter and the splitter do not have an explicit channel configuration connecting them. However, you DO see a channel definition that connects the splitter and the router. We need to define this channel explicitly because we want the message exchange between the splitter and the router to be asynchronous, so we have configured this channel with an Executor (default thread executor: Executors.newCachedThreadPool).
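
The difference between a synchronous hand-off and an Executor-backed one can be sketched without the DSL. The following is an illustration only, not how Spring Integration implements its channels: `ChannelHandoffSketch`, `sendDirect` and `sendViaExecutor` are hypothetical helpers built on `java.util.concurrent`.

```scala
import java.util.concurrent.{Callable, ExecutorService, Executors, Future}

object ChannelHandoffSketch {
  // Synchronous hand-off: the handler runs on the sender's thread,
  // so the sender is blocked until the handler returns.
  def sendDirect[A, B](payload: A)(handler: A => B): B = handler(payload)

  // Asynchronous hand-off: the handler runs on a thread owned by the executor,
  // and the sender gets a Future back immediately.
  def sendViaExecutor[A, B](executor: ExecutorService, payload: A)(handler: A => B): Future[B] =
    executor.submit(new Callable[B] { def call(): B = handler(payload) })

  def main(args: Array[String]): Unit = {
    val pool = Executors.newCachedThreadPool()
    try {
      val direct = sendDirect("item")(_ => Thread.currentThread.getName)
      val pooled = sendViaExecutor(pool, "item")(_ => Thread.currentThread.getName).get()
      println("direct hand-off ran on: " + direct)
      println("executor hand-off ran on: " + pooled)
    } finally pool.shutdown()
  }
}
```

With the executor in place, the items produced by the splitter can be processed concurrently, which is also why the log lines further down need not appear in sequence-number order.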

You also see that the routed Message flows are later aggregated. Since the Aggregator is the last component in the Message flow but does not define an output Channel, the reply is sent to an implicit reply channel provided by the sendAndReceive method.
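
The idea behind the aggregation step can be sketched in a few lines of plain Scala. This is a simplified illustration, not the Spring Integration aggregator: the `Part` class and the `Aggregator` below are hypothetical, and the fields only mirror the `correlationId`, `sequenceNumber` and `sequenceSize` headers visible in the sample log output. Releasing the payloads in arrival order is an assumption of this sketch.

```scala
object AggregatorSketch {
  // Hypothetical stand-in for a message produced by a splitter.
  case class Part[A](correlationId: String, sequenceNumber: Int, sequenceSize: Int, payload: A)

  class Aggregator[A] {
    // Incomplete groups, keyed by correlationId, newest part first.
    private var groups = Map.empty[String, List[Part[A]]]

    // Returns Some(payloads in arrival order) once all parts of a group
    // have arrived, and None while the group is still incomplete.
    def add(part: Part[A]): Option[List[A]] = synchronized {
      val group = part :: groups.getOrElse(part.correlationId, Nil)
      if (group.size == part.sequenceSize) {
        groups -= part.correlationId
        Some(group.reverse.map(_.payload))
      } else {
        groups += (part.correlationId -> group)
        None
      }
    }
  }
}
```

In the sample, all three items of one order carry the same correlationId and sequenceSize=3, so a single reply is produced only after the third item has been processed.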

Now we simply interact with this Message flow by sending it a valid or an invalid order:

    val validOrderResult = orderProcessingFlow.sendAndReceive[Any](validOrder, errorFlow = errorFlow)

... and print the reply:

    println("Reply: " + validOrderResult)

You'll see the log statements showing the orders being handled by the order processors:

Processing books order: [Payload=PurchaseOrderItem(books,Spring Integration in Action)][Headers={errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@75d3d7, sequenceNumber=1, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@75d3d7, correlationId=ffa0feac-8e0c-4607-a7fd-59186cc1a4ec, timestamp=1308839891276, id=c2219cb0-1999-42ed-a0e2-963a4bc7401c, sequenceSize=3}]
Processing bikes order: [Payload=PurchaseOrderItem(bikes,Canyon Torque FRX)][Headers={errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@75d3d7, sequenceNumber=3, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@75d3d7, correlationId=ffa0feac-8e0c-4607-a7fd-59186cc1a4ec, timestamp=1308839891277, id=f098e556-04d0-4839-b548-0d8cbe0ff058, sequenceSize=3}]
Processing books order: [Payload=PurchaseOrderItem(books,DSLs in Action)][Headers={errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@75d3d7, sequenceNumber=2, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@75d3d7, correlationId=ffa0feac-8e0c-4607-a7fd-59186cc1a4ec, timestamp=1308839891277, id=a7be1699-6bcb-4a1b-bfea-13b158dd29ab, sequenceSize=3}]

... and then, after a short delay, the reply:

Reply: [PurchaseOrderItem(bikes,Canyon Torque FRX), PurchaseOrderItem(books,Spring Integration in Action), PurchaseOrderItem(books,DSLs in Action)]

That is pretty much it.

You can also uncomment the call with the 'invalidOrder' and see how Message errors are handled (see comment inside the code).

The full source code for this sample is available here: OrderProcessingApp.scala


[Back to Getting Started](https://github.com/SpringSource/spring-integration-dsl-scala/wiki/Getting-Started)