Permalink
Browse files

Updated the codebase against Scala 2.10, Akka 2.1 and Specs2

  • Loading branch information...
1 parent d6280b0 commit 29906965dff630a24b2130cfb29c45d824df0ef4 @janm janm committed Nov 16, 2012
View
@@ -1,4 +1,4 @@
-
/.idea
*.iml
-/target
+/target
+project/target
View
@@ -0,0 +1,22 @@
+name := "amqp-client"
+
+version := "1.0"
+
+scalaVersion := "2.10.0-RC2"
+
+scalacOptions += "-feature"
+
+resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
+
+resolvers += "repo.codahale.com" at "http://repo.codahale.com"
+
+libraryDependencies <<= scalaVersion { scala_version =>
+ val akkaVersion = "2.1.0-RC2"
+ Seq(
+ "com.typesafe.akka" % "akka-kernel" % akkaVersion cross CrossVersion.full,
+ "com.typesafe.akka" % "akka-actor" % akkaVersion cross CrossVersion.full,
+ "com.rabbitmq" % "amqp-client" % "2.8.1",
+ "com.typesafe.akka" % "akka-testkit" % akkaVersion % "test" cross CrossVersion.full,
+ "org.specs2" % "specs2" % "1.12.2" % "test" cross CrossVersion.full
+ )
+}
View
176 pom.xml
@@ -1,176 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>com.aphelia</groupId>
- <artifactId>amqp-client_${scala.version}</artifactId>
- <version>1.1-SNAPSHOT</version>
- <packaging>jar</packaging>
- <name>amqp-client</name>
-
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <scala.version>2.9.2</scala.version>
- <akka.version>2.0.3</akka.version>
- </properties>
-
- <licenses>
- <license>
- <url>https://raw.github.com/sstone/amqp-client/master/LICENCE.txt</url>
- <name>The MIT License</name>
- <distribution>repo</distribution>
- </license>
- </licenses>
-
- <reporting>
- <plugins>
- <plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <version>3.1.0</version>
- </plugin>
- </plugins>
- </reporting>
- <build>
- <sourceDirectory>src/main/scala</sourceDirectory>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>2.3.2</version>
- <configuration>
- <source>1.6</source>
- <target>1.6</target>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-site-plugin</artifactId>
- <version>3.1</version>
- <configuration>
- <reportPlugins>
- <plugin>
- <artifactId>maven-project-info-reports-plugin</artifactId>
- <version>2.2</version>
- </plugin>
- <plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <version>3.1.0</version>
- <configuration>
- <jvmArgs>
- <jvmArg>-Xms64m</jvmArg>
- <jvmArg>-Xmx1024m</jvmArg>
- </jvmArgs>
- </configuration>
- </plugin>
- </reportPlugins>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-source-plugin</artifactId>
- <version>2.1.2</version>
- <executions>
- <execution>
- <id>attach-sources</id>
- <goals>
- <goal>jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <version>3.1.0</version>
- <executions>
- <execution>
- <id>process-resources</id>
- <phase>process-resources</phase>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
- <execution>
- <goals>
- <goal>compile</goal>
- <goal>testCompile</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <version>2.10</version>
- <configuration>
- <useFile>false</useFile>
- <disableXmlReport>true</disableXmlReport>
- <!-- If you have classpath issue like NoDefClassError,... -->
- <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
- <includes>
- <include>**/*Spec.*</include>
- <include>**/*Test.*</include>
- <include>**/*Suite.*</include>
- </includes>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
- <repositories>
- <repository>
- <id>typesafe</id>
- <name>Typesafe Repository</name>
- <url>http://repo.typesafe.com/typesafe/releases/</url>
- </repository>
- <repository>
- <id>typesafe snapshots</id>
- <name>Typesafe snapshots Repository</name>
- <url>http://repo.typesafe.com/typesafe/snapshots/</url>
- </repository>
- </repositories>
-
- <dependencies>
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>2.8.7</version>
- </dependency>
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-actor</artifactId>
- <version>${akka.version}</version>
- </dependency>
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-slf4j</artifactId>
- <version>${akka.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- <version>1.0.0</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-testkit</artifactId>
- <version>${akka.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.scalatest</groupId>
- <artifactId>scalatest_2.9.2</artifactId>
- <version>1.8</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.8.2</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-</project>
View
@@ -0,0 +1,5 @@
+resolvers += "Sonatype snapshots" at "http://oss.sonatype.org/content/repositories/snapshots/"
+
+addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.2.0-SNAPSHOT")
+
+addSbtPlugin("com.github.gseitz" % "sbt-release" % "0.6")
@@ -1,9 +1,9 @@
package com.aphelia.amqp
+import scala.language.postfixOps
+
import akka.pattern.ask
-import akka.util.duration._
import com.rabbitmq.client.ConnectionFactory
-import akka.dispatch.Await
import com.aphelia.amqp.RpcClient.{Response, Request}
import akka.actor._
import akka.actor.FSM.{Transition, SubscribeTransitionCallBack}
@@ -13,7 +13,9 @@ import akka.util.Timeout
import com.rabbitmq.client.AMQP.{BasicProperties, Queue}
import akka.actor.Status.Failure
import com.aphelia.amqp.RpcServer.ProcessResult
-
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration._
object App {
@@ -26,7 +28,7 @@ object App {
waitForConnection(system, conn).await()
val c = ConnectionOwner.createActor(conn, Props(new ChannelOwner()))
waitForConnection(system, c).await()
- implicit val timeout = Timeout(2 seconds)
+ implicit val timeout = Timeout(2.seconds)
val check = Await.result(c.ask(DeclareQueue(QueueParameters("no_such_queue", passive = true))), timeout.duration)
check match {
case Amqp.Error(_, cause) => throw cause
@@ -56,7 +58,7 @@ object App {
}
}))
// create a consumer that will pass all messages to the foo Actor; the consumer will declare the bindings
- val consumer = ConnectionOwner.createActor(conn, Props(new Consumer(List(Binding(exchange, queue, "my_key", autoack = false)), foo)), 5000 millis)
+ val consumer = ConnectionOwner.createActor(conn, Props(new Consumer(List(Binding(exchange, queue, "my_key", autoack = false)), foo)), 5000.millis)
val producer = ConnectionOwner.createActor(conn, Props(new ChannelOwner()))
waitForConnection(system, consumer, producer).await()
producer ! Publish("amq.direct", "my_key", "yo!".getBytes, Some(new BasicProperties.Builder().contentType("my content").build()))
@@ -89,7 +91,7 @@ object App {
}
}))
// create a consumer that will pass all messages to the foo Actor; the consumer will declare the bindings
- val consumer = ConnectionOwner.createActor(conn, Props(new Consumer(List(Binding(exchange, queue, "my_key", autoack = false)), foo)), 5000 millis)
+ val consumer = ConnectionOwner.createActor(conn, Props(new Consumer(List(Binding(exchange, queue, "my_key", autoack = false)), foo)), 5000.millis)
val producer = ConnectionOwner.createActor(conn, Props(new ChannelOwner()))
waitForConnection(system, consumer, producer).await()
producer ! Publish("amq.direct", "my_key", "yo!".getBytes)
@@ -149,15 +151,15 @@ object App {
// which means that the broker will generate a unique, random name when the queue is declared
val queue = QueueParameters(name = "", passive = false, exclusive = true)
// create 2 servers, each with its own private queue bound to the same key
- val server1 = ConnectionOwner.createActor(conn, Props(new RpcServer(queue, exchange, "my_key", proc)), 2000 millis)
- val server2 = ConnectionOwner.createActor(conn, Props(new RpcServer(queue, exchange, "my_key", proc)), 2000 millis)
- val client = ConnectionOwner.createActor(conn, Props(new RpcClient()), 2000 millis)
+ val server1 = ConnectionOwner.createActor(conn, Props(new RpcServer(queue, exchange, "my_key", proc)), 2000.millis)
+ val server2 = ConnectionOwner.createActor(conn, Props(new RpcServer(queue, exchange, "my_key", proc)), 2000.millis)
+ val client = ConnectionOwner.createActor(conn, Props(new RpcClient()), 2000.millis)
waitForConnection(system, server1, server2, client).await()
for (i <- 0 to 10) {
try {
// send one request and wait for 2 responses
- val future = client.ask(Request(Publish("amq.direct", "my_key", "client1".getBytes) :: Nil, 2))(1000 millis)
- val result = Await.result(future, 1000 millis).asInstanceOf[Response]
+ val future = client.ask(Request(Publish("amq.direct", "my_key", "client1".getBytes) :: Nil, 2))(1000.millis)
+ val result = Await.result(future, 1000.millis).asInstanceOf[Response]
println("result : " + result)
Thread.sleep(100)
}
@@ -215,17 +217,17 @@ object App {
ExchangeParameters(exchange, passive = true, exchangeType = "direct"),
key,
proc)
- ), 2000 millis
+ ), 2000.millis
)
while (true) {
Thread.sleep(100)
}
}
else {
- val client = ConnectionOwner.createActor(conn, Props(new RpcClient()), 2000 millis)
+ val client = ConnectionOwner.createActor(conn, Props(new RpcClient()), 2000.millis)
waitForConnection(system, client)
- val future = client.ask(Request(Publish(exchange, key, message.getBytes) :: Nil, numberOfResponse))(1000 millis)
- val result = Await.result(future, 1000 millis).asInstanceOf[Response]
+ val future = client.ask(Request(Publish(exchange, key, message.getBytes) :: Nil, numberOfResponse))(1000.millis)
+ val result = Await.result(future, 1000.millis).asInstanceOf[Response]
println("result " + result)
}
system.shutdown()
@@ -1,19 +1,26 @@
package com.aphelia.amqp
-import akka.util.duration._
+import scala.language.postfixOps
+
import java.io.IOException
import com.aphelia.amqp.ConnectionOwner._
-import akka.util.Duration
import com.rabbitmq.client.{Connection, ShutdownSignalException, ShutdownListener, ConnectionFactory}
import akka.actor._
-import akka.dispatch.Await
import akka.util.Timeout._
import akka.pattern.ask
-import com.aphelia.amqp.Amqp._
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration._
+import scala.concurrent.Await
+import akka.util.Timeout
+import com.aphelia.amqp.Amqp.{QueueParameters, ChannelParameters, Binding, ExchangeParameters}
import com.aphelia.amqp.ConnectionOwner.CreateChannel
+import com.aphelia.amqp.Amqp.ChannelParameters
import com.aphelia.amqp.ConnectionOwner.Create
import scala.Some
+import com.aphelia.amqp.Amqp.ExchangeParameters
import com.aphelia.amqp.ConnectionOwner.Shutdown
+import com.aphelia.amqp.Amqp.Binding
+import com.aphelia.amqp.Amqp.QueueParameters
object ConnectionOwner {
@@ -46,17 +53,17 @@ object ConnectionOwner {
* @deprecated use createChildActor instead
*/
@Deprecated
- def createActor(conn: ActorRef, props: Props, name: Option[String] = None, timeout: Duration = 5000 millis): ActorRef = {
+ def createActor(conn: ActorRef, props: Props, name: Option[String] = None, timeout: Timeout = 5000.millis): ActorRef = {
val future = conn.ask(Create(props, name))(timeout).mapTo[ActorRef]
- Await.result(future, timeout)
+ Await.result(future, timeout.duration)
}
@Deprecated
- def createActor(conn: ActorRef, props: Props, timeout: Duration): ActorRef = createActor(conn, props, None, timeout)
+ def createActor(conn: ActorRef, props: Props, timeout: Timeout): ActorRef = createActor(conn, props, None, timeout)
- def createChildActor(conn: ActorRef, channelOwner: Props, name: Option[String] = None, timeout: Duration = 5000 millis): ActorRef = {
+ def createChildActor(conn: ActorRef, channelOwner: Props, name: Option[String] = None, timeout: Timeout = 5000.millis): ActorRef = {
val future = conn.ask(Create(channelOwner, name))(timeout).mapTo[ActorRef]
- Await.result(future, timeout)
+ Await.result(future, timeout.duration)
}
@@ -96,20 +103,20 @@ object ConnectionOwner {
* @param reconnectionDelay
* @param system
*/
-class RabbitMQConnection(host: String = "localhost", port: Int = 5672, vhost: String = "/", user: String = "guest", password: String = "guest", name: String, reconnectionDelay: Duration = 10000 millis, system: ActorSystem = ActorSystem("amqp-system")) {
+class RabbitMQConnection(host: String = "localhost", port: Int = 5672, vhost: String = "/", user: String = "guest", password: String = "guest", name: String, reconnectionDelay: FiniteDuration = 10000.millis, system: ActorSystem = ActorSystem("amqp-system")) {
lazy val owner = system.actorOf(Props(new ConnectionOwner(buildConnFactory(host = host, port = port, vhost = vhost, user = user, password = password), reconnectionDelay)), name = name)
def start = {
- waitForConnection(system, owner).await()
+ Amqp.waitForConnection(system, owner).await()
this
}
def stop = system.stop(owner)
- def createChild(props: Props, name: Option[String] = None, timeout: Duration = 5000 millis): ActorRef = {
+ def createChild(props: Props, name: Option[String] = None, timeout: Timeout = 5000.millis): ActorRef = {
val future = owner.ask(Create(props, name))(timeout).mapTo[ActorRef]
- Await.result(future, timeout)
+ Await.result(future, timeout.duration)
}
def createRpcServer(bindings: List[Binding], processor: RpcServer.IProcessor, channelParams: Option[ChannelParameters]) = {
@@ -144,7 +151,7 @@ class RabbitMQConnection(host: String = "localhost", port: Int = 5672, vhost: St
* @param connFactory connection factory
* @param reconnectionDelay delay between reconnection attempts
*/
-class ConnectionOwner(connFactory: ConnectionFactory, reconnectionDelay: Duration = 10000 millis) extends Actor with FSM[State, Data] {
+class ConnectionOwner(connFactory: ConnectionFactory, reconnectionDelay: FiniteDuration = 10000.millis) extends Actor with FSM[State, Data] {
startWith(Disconnected, Uninitialized)
Oops, something went wrong.

1 comment on commit 2990696

sstone commented on 2990696 Nov 16, 2012

Thanks! since there are projects using amqp-client which will probably not switch to 2.10 right away I'll probably create a scala2.10 branch and merge into it and keep master on 2.9.2 for some time.

Please sign in to comment.