Skip to content

Commit

Permalink
Initial sample and instructions of how to run
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Dec 16, 2013
1 parent 8e28a52 commit 59c0e99
Show file tree
Hide file tree
Showing 11 changed files with 354 additions and 1 deletion.
45 changes: 45 additions & 0 deletions .gitignore
@@ -0,0 +1,45 @@
*#
*.iml
*.ipr
*.iws
*.pyc
*.tm.epoch
*.vim
*/project/boot
*/project/build/target
*/project/project.target.config-classes
*-shim.sbt
*~
.#*
.*.swp
.DS_Store
.cache
.cache
.classpath
.codefellow
.ensime*
.eprj
.history
.idea
.manager
.multi-jvm
.project
.scala_dependencies
.scalastyle
.settings
.tags
.tags_sorted_by_file
.target
.worksheet
Makefile
TAGS
lib_managed
logs
project/boot/*
project/plugins/project
src_managed
target
tm*.lck
tm*.log
tm.out
worker*.log
13 changes: 13 additions & 0 deletions LICENSE
@@ -0,0 +1,13 @@
Copyright 2013 Typesafe, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
2 changes: 1 addition & 1 deletion README.md
@@ -1,4 +1,4 @@
activator-akka-cluster-sharding-scala
=====================================

Activator template for the Akka Cluster Sharding feature
Activator template for the Akka Cluster Sharding feature. See [tutorial](https://github.com/typesafehub/activator-akka-cluster-sharding-scala/blob/master/tutorial/index.html).
4 changes: 4 additions & 0 deletions activator.properties
@@ -0,0 +1,4 @@
name=akka-cluster-sharding-scala
title=Akka Cluster Sharding with Scala!
description=Illustrates sharding of actors in a cluster of Akka nodes.
tags=akka,cluster,scala,sample
10 changes: 10 additions & 0 deletions build.sbt
@@ -0,0 +1,10 @@
name := "akka-cluster-sharding"

version := "0.1"

scalaVersion := "2.10.3"

libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-contrib" % "2.3-M2"
)

1 change: 1 addition & 0 deletions project/build.properties
@@ -0,0 +1 @@
sbt.version=0.13.1
32 changes: 32 additions & 0 deletions src/main/resources/application.conf
@@ -0,0 +1,32 @@
akka {
loglevel = INFO

actor {
provider = "akka.cluster.ClusterActorRefProvider"
}

remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
}

cluster {
seed-nodes = [
"akka.tcp://ClusterSystem@127.0.0.1:2551",
"akka.tcp://ClusterSystem@127.0.0.1:2552"]

auto-down-unreachable-after = 10s
}

persistence {
journal.plugin = "akka.persistence.journal.leveldb-shared"
journal.leveldb-shared.store {
native = off
dir = "target/shared-journal"
}
snapshot-store.local.dir = "target/snapshots"
}
}
47 changes: 47 additions & 0 deletions src/main/scala/sample/cluster/counter/Bot.scala
@@ -0,0 +1,47 @@
package sample.cluster.counter

import scala.concurrent.duration._
import akka.actor.Actor
import akka.contrib.pattern.ClusterSharding
import scala.concurrent.forkjoin.ThreadLocalRandom
import akka.actor.ActorLogging

object Bot {
case object Tick
}

class Bot extends Actor with ActorLogging {
import Bot._
import context.dispatcher
val tickTask = context.system.scheduler.schedule(3.seconds, 3.seconds, self, Tick)

val region = ClusterSharding(context.system).shardRegion("Counter")
val anotherRegion = ClusterSharding(context.system).shardRegion("AnotherCounter")

def rnd = ThreadLocalRandom.current

override def postStop(): Unit = {
super.postStop()
tickTask.cancel()
}

def receive = {
case Tick =>

val entryId = rnd.nextInt(100)
val operation = rnd.nextInt(5) match {
case 0 => Counter.Get(entryId)
case 1 => Counter.EntryEnvelope(entryId, Counter.Decrement)
case _ => Counter.EntryEnvelope(entryId, Counter.Increment)
}

val r = if (rnd.nextBoolean()) region else anotherRegion
log.info("Sending operation {} to {} with entryId {}", operation,
r.path.name, entryId)
r ! operation

case value: Int =>
log.info("Current value at {} is [{}]", sender.path, value)
}

}
50 changes: 50 additions & 0 deletions src/main/scala/sample/cluster/counter/Counter.scala
@@ -0,0 +1,50 @@
package sample.cluster.counter

import scala.concurrent.duration._
import akka.actor.ReceiveTimeout
import akka.contrib.pattern.ShardRegion
import akka.persistence.EventsourcedProcessor

object Counter {
case object Increment
case object Decrement
case class Get(counterId: Long)
case class EntryEnvelope(id: Long, payload: Any)

case object Stop
case class CounterChanged(delta: Int)

val idExtractor: ShardRegion.IdExtractor = {
case EntryEnvelope(id, payload) => (id.toString, payload)
case msg @ Get(id) => (id.toString, msg)
}

val shardResolver: ShardRegion.ShardResolver = msg => msg match {
case EntryEnvelope(id, _) => (id % 30).toString
case Get(id) => (id % 30).toString
}
}

class Counter extends EventsourcedProcessor {
import Counter._
import ShardRegion.Passivate

context.setReceiveTimeout(120.seconds)

var count = 0

def updateState(event: CounterChanged): Unit =
count += event.delta

override def receiveReplay: Receive = {
case evt: CounterChanged => updateState(evt)
}

override def receiveCommand: Receive = {
case Increment => persist(CounterChanged(+1))(updateState)
case Decrement => persist(CounterChanged(-1))(updateState)
case Get(_) => sender ! count
case ReceiveTimeout => context.parent ! Passivate(stopMessage = Stop)
case Stop => context.stop(self)
}
}
76 changes: 76 additions & 0 deletions src/main/scala/sample/cluster/counter/ShardingApp.scala
@@ -0,0 +1,76 @@
package sample.cluster.counter

import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem
import akka.actor.Props
import akka.contrib.pattern.ClusterSharding
import akka.persistence.journal.leveldb.SharedLeveldbStore
import akka.actor.Identify
import akka.actor.ActorPath
import akka.pattern.ask
import akka.util.Timeout
import akka.actor.ActorIdentity
import akka.persistence.journal.leveldb.SharedLeveldbJournal

object ShardingApp {
def main(args: Array[String]): Unit = {
if (args.isEmpty)
startup(Seq("2551", "2552", "0"))
else
startup(args)
}

def startup(ports: Seq[String]): Unit = {
ports foreach { port =>
// Override the configuration of the port
val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
withFallback(ConfigFactory.load())

// Create an Akka system
val system = ActorSystem("ClusterSystem", config)

startupSharedJournal(system, startStore = (port == "2551"), path =
ActorPath.fromString("akka.tcp://ClusterSystem@127.0.0.1:2551/user/store"))

ClusterSharding(system).start(
typeName = "Counter",
entryProps = Some(Props[Counter]),
idExtractor = Counter.idExtractor,
shardResolver = Counter.shardResolver)

ClusterSharding(system).start(
typeName = "AnotherCounter",
entryProps = Some(Props[Counter]),
idExtractor = Counter.idExtractor,
shardResolver = Counter.shardResolver)

system.actorOf(Props[Bot], "bot")
}

def startupSharedJournal(system: ActorSystem, startStore: Boolean, path: ActorPath): Unit = {
// Start the shared journal one one node (don't crash this SPOF)
// This will not be needed with a distributed journal
if (startStore)
system.actorOf(Props[SharedLeveldbStore], "store")
// register the shared journal
import system.dispatcher
implicit val timeout = Timeout(1.minute)
val f = (system.actorSelection(path) ? Identify(None))
f.onSuccess {
case ActorIdentity(_, Some(ref)) => SharedLeveldbJournal.setStore(ref, system)
case _ =>
system.log.error("Shared journal not started at {}", path)
system.shutdown()
}
f.onFailure {
case _ =>
system.log.error("Lookup of shared journal at {} timed out", path)
system.shutdown()
}
}

}

}

75 changes: 75 additions & 0 deletions tutorial/index.html
@@ -0,0 +1,75 @@
<!-- <html> -->
<head>
<title>Akka Cluster Sharding with Scala</title>
</head>

<body>

<div>
<p>
This is a runnable sample of
<a href="http://doc.akka.io/docs/akka/2.3-M2/scala/contrib/cluster-sharding.html" target="_blank">Akka cluster Sharding</a>.
</p>

<p>
Open three terminal windows.
</p>

<p>
In the first terminal window, start the first seed node with the following command:
</p>

<pre><code>
sbt "runMain sample.cluster.counter.ShardingApp 2551"
</code></pre>

<p>
2551 corresponds to the port of the first seed-nodes element in the configuration. In the log
output you see that the cluster node has been started and changed status to 'Up'.
</p>

<p>
In the second terminal window, start the second seed node with the following command:
</p>

<pre><code>
sbt "runMain sample.cluster.counter.ShardingApp 2552"
</code></pre>

<p>
2552 corresponds to the port of the second seed-nodes element in the configuration. In the
log output you see that the cluster node has been started and joins the other seed node and
becomes a member of the cluster. Its status changed to 'Up'.
</p>

<p>
Switch over to the first terminal window and see in the log output that the member joined.
</p>

<p>
You should also be able to see log output of the <code>Bot</code> that generates random
operations (Increment/Decrement/Get) to random <code>Counter</code> entries.
</p>

<p>
Start more nodes with

<pre><code>
sbt "runMain sample.cluster.counter.ShardingApp 0"
</code></pre>

<p>
Now you don't need to specify the port number, 0 means that it will use a random available port.
It joins one of the configured seed nodes. Look at the log output in the different terminal
windows.
</p>

<p>
Note that this sample runs the shared journal on the node with port 2551. This is a
SPOF. A real system would use a distributed journal.
</p>

</div>

</body>
</html>

0 comments on commit 59c0e99

Please sign in to comment.