Permalink
Browse files

Distributed application with Akka Clustering

  • Loading branch information...
paoloambrosio committed Nov 27, 2016
1 parent 5735a10 commit 9bd3fe14d31874621ce3158daf3e559e2f8ad6b6
View
@@ -9,16 +9,16 @@ It has no tests, but on the other side there isn't much code either!
Run the application locally (each command on different terminal):
```sh
REMOTE_PORT=2551 sbt backend/run
-REMOTE_PORT=2552 sbt backend/run
-REMOTE_PORT=2553 BACKEND_NODES=127.0.0.1:2551,127.0.0.1:2552 sbt frontend/run
+REMOTE_PORT=2552 CLUSTER_SEEDS=127.0.0.1:2551 sbt backend/run
+REMOTE_PORT=2553 CLUSTER_SEEDS=127.0.0.1:2551,127.0.0.1:2552 sbt frontend/run
curl http://localhost:9000/greet/World
```
Run it on Kubernetes (Minikube):
```sh
eval $(minikube docker-env)
sbt docker:publishLocal
-kubectl create -f kubernetes/backend-service.yaml
+kubectl create -f kubernetes/cluster-service.yaml
kubectl create -f kubernetes/backend-deployment.yaml
kubectl create -f kubernetes/frontend-service.yaml
kubectl create -f kubernetes/frontend-deployment.yaml
View
@@ -10,10 +10,6 @@ lazy val commonSettings = buildSettings ++ Seq(
lazy val dockerSettings = Seq(
dockerExposedPorts := Seq(Docker.akkaTcpPort),
- bashScriptExtraDefines := Seq(Docker.bashExports)
-)
-
-lazy val frontendDockerSettings = dockerSettings ++ Seq(
dockerCommands := Seq(
Cmd("FROM", "java:latest"),
Cmd("USER", "root"),
@@ -25,7 +21,7 @@ lazy val frontendDockerSettings = dockerSettings ++ Seq(
case Cmd("FROM", _) => true
case _ => false
},
- bashScriptExtraDefines := Seq(Docker.frontendBashExports)
+ bashScriptExtraDefines := Seq(Docker.bashExports)
)
lazy val root = (project in file("."))
@@ -41,7 +37,7 @@ lazy val frontend = project
.settings(libraryDependencies ++= akkaHttp)
.enablePlugins(JavaAppPackaging)
.enablePlugins(DockerPlugin)
- .settings(frontendDockerSettings)
+ .settings(dockerSettings)
.dependsOn(`backend-api`, common)
lazy val backend = project
@@ -1,7 +1,7 @@
akka {
actor {
- provider = remote
+ provider = cluster
}
remote {
@@ -11,6 +11,12 @@ akka {
port = ${app.remote.port}
}
}
+
+ cluster {
+ // seed-nodes = [ ... ]
+ metrics.enabled = off
+ auto-down-unreachable-after = 5s
+ }
}
app {
@@ -20,4 +26,7 @@ app {
port = 0
port = ${?REMOTE_PORT}
}
+ cluster {
+ seeds = ${?CLUSTER_SEEDS}
+ }
}
@@ -1,19 +1,23 @@
package com.example
-import com.typesafe.config.Config
+import com.typesafe.config.{Config, ConfigValueFactory}
-trait BackendPathsConfig {
+import scala.collection.JavaConversions._
+
+trait ClusterSeedNodesConfig {
def actorSystemName: String
def config: Config
- def backendPaths() = remoteNodesAddress.map(node => s"$node/user/backend")
+ def configWithSeedNodes: Config = {
+ config.withValue("akka.cluster.seed-nodes", ConfigValueFactory.fromIterable(seedNodes))
+ }
- private def remoteNodesAddress: List[String] = {
+ private def seedNodes: List[String] = {
val remoteInterface = config.getString("app.remote.interface")
val remotePort = config.getInt("app.remote.port")
- if (config.hasPath("app.backend.nodes")) {
- config.getString("app.backend.nodes")
+ if (config.hasPath("app.cluster.seeds")) {
+ config.getString("app.cluster.seeds")
.split(',').toList
.map(toAkkaNodeRef(remotePort))
.flatten
@@ -4,10 +4,10 @@ import akka.actor.ActorSystem
import akka.event.LoggingAdapter
import com.typesafe.config.ConfigFactory
-trait ExampleApp {
+trait ExampleApp extends ClusterSeedNodesConfig {
val actorSystemName = "example"
val config = ConfigFactory.load()
- implicit val system = ActorSystem(actorSystemName, config)
+ implicit val system = ActorSystem(actorSystemName, configWithSeedNodes)
val log: LoggingAdapter = system.log
}
@@ -1,3 +1,18 @@
+akka {
+ actor {
+ deployment {
+ /backend {
+ router = round-robin-group
+ routees.paths = ["/user/backend"]
+ cluster {
+ enabled = on
+ allow-local-routees = off
+ }
+ }
+ }
+ }
+}
+
app {
http {
interface = "0.0.0.0"
@@ -1,19 +1,19 @@
package com.example.frontend
import akka.actor.ActorRef
-import akka.routing.RoundRobinGroup
+import akka.routing.FromConfig
import akka.util.Timeout
-import com.example.{BackendPathsConfig, ExampleApp}
+import com.example.ExampleApp
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
-object FrontendApp extends App with ExampleApp with BackendPathsConfig
+object FrontendApp extends App with ExampleApp
with HttpServerStartup with FrontendRestApi {
override implicit val executionContext: ExecutionContext = system.dispatcher
- override val backend: ActorRef = system.actorOf(RoundRobinGroup(backendPaths()).props(), "backend")
+ override val backend: ActorRef = system.actorOf(FromConfig().props(), "backend")
override implicit val backendTimeout: Timeout = 5 seconds
val interface = config.getString("app.http.interface")
@@ -13,3 +13,6 @@ spec:
containers:
- name: backend
image: backend:0.1
+ env:
+ - name: CLUSTER_DISCOVERY_SERVICE
+ value: cluster-service
@@ -1,11 +1,11 @@
apiVersion: v1
kind: Service
metadata:
- name: backend-service
+ name: cluster-service
spec:
type: ClusterIP
clusterIP: None
selector:
- app: backend
+ cluster: example
ports:
- port: 2551
@@ -8,12 +8,13 @@ spec:
metadata:
labels:
app: frontend
+ cluster: example
spec:
containers:
- name: frontend
image: frontend:0.1
ports:
- containerPort: 9000
env:
- - name: BACKEND_DISCOVERY_SERVICE
- value: backend-service
+ - name: CLUSTER_DISCOVERY_SERVICE
+ value: cluster-service
View
@@ -16,7 +16,8 @@ object Dependencies {
val akka = Seq(
"com.typesafe.akka" %% "akka-actor" % akkaV,
- "com.typesafe.akka" %% "akka-remote" % akkaV
+ "com.typesafe.akka" %% "akka-remote" % akkaV,
+ "com.typesafe.akka" %% "akka-cluster" % akkaV
)
val akkaHttp = Seq(
"com.typesafe.akka" %% "akka-http-experimental" % akkaHttpV,
View
@@ -3,30 +3,30 @@ object Docker {
val akkaTcpPort = 2551
val bashExports = """
-export REMOTE_INTERFACE=$(hostname --ip-address)
-
-# Configure the default remote port unless passed
-if [ -z "$REMOTE_PORT" ]; then
- export REMOTE_PORT=""" + akkaTcpPort + """
-fi
-"""
+MAX_CLUSTER_SEEDS=3
- val frontendBashExports = """
-# Configure backend nodes from service if passed
-# otherwise keep them as they are
-if [ ! -z "$BACKEND_DISCOVERY_SERVICE" ]; then
- BACKEND_NODES=$(
- host -t A $BACKEND_DISCOVERY_SERVICE | \
+# Configure cluster seeds from service if passed
+# otherwise keep the seeds as they are
+if [ ! -z "$CLUSTER_DISCOVERY_SERVICE" ]; then
+ CLUSTER_SEEDS=$(
+ host -t A $CLUSTER_DISCOVERY_SERVICE | \
grep 'has address' |
+ head -n $MAX_CLUSTER_SEEDS |
cut -d ' ' -f 4 |
xargs |
sed -e 's/ /,/g'
)
- if [ ! -z "$BACKEND_NODES" ]; then
- export BACKEND_NODES
+ if [ ! -z "$CLUSTER_SEEDS" ]; then
+ export CLUSTER_SEEDS
fi
fi
-""" + bashExports
+export REMOTE_INTERFACE=$(hostname --ip-address)
+
+# Configure the default cluster port unless passed
+if [ -z "$REMOTE_PORT" ]; then
+ export REMOTE_PORT=""" + akkaTcpPort + """
+fi
+"""
}

0 comments on commit 9bd3fe1

Please sign in to comment.