Skip to content
This repository has been archived by the owner on Feb 18, 2023. It is now read-only.

Readme and scaladoc complete #3

Merged
merged 2 commits into from
Feb 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
138 changes: 137 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,140 @@ Reactive akka ZooKeeper client.

[![Build Status](https://travis-ci.org/AppMinistry/akka-zk.svg?branch=master)](https://travis-ci.org/AppMinistry/akka-zk)

Work in progress.
## Usage

### Dependencies

libraryDependencies ++= Seq(
"uk.co.appministry" %% "akka-zk" % "0.1.0"
)

## Examples

### Creating the client

The `akka-zk` ZooKeeper client uses `akka streams` for delivering watch notifications. It is possible to create the client as a regular actor:

```scala
val actorSystem = ActorSystem("examples")
val zkClient = system.actorOf(Props(new ZkClientActor))
```

However, using this method, access to `watch` events will not be possible. To be able to receive these events, streams have to be used:

```scala
val actorSystem = ActorSystem("examples")
implicit val materializer = ActorMaterializer()
val source = Source.actorPublisher[ZkClientStreamProtocol.StreamResponse](Props(new ZkClientActor))
val zkClient = Flow[ZkClientStreamProtocol.StreamResponse].to(Sink.foreach { message =>
message match {
case m: ZkClientStreamProtocol.ChildChange =>
case m: ZkClientStreamProtocol.DataChange =>
case m: ZkClientStreamProtocol.StateChange =>
}
}).runWith(source)
```

### Client configuration

The client does not have any special configuration needs. All configuration is passed with `ZkRequestProtocol.Connect` message.

### Connecting

```scala
val system = ActorSystem("examples")

val runner = system.actorOf(Props(new Actor {

val zkClient = context.system.actorOf(Props(new ZkClientActor))
context.watch(zkClient)

override def supervisorStrategy = OneForOneStrategy() {
case _: ZkClientConnectionFailedException =>
// ZooKeeper client was unable to connect to the server for `connectionAttempts` times.
// The client is stopped and a new actor has to be created.
Escalate
}

def receive = {
case "connect" =>
zkClient ! ZkRequestProtocol.Connect(connectionString = "10.100.0.21:2181",
connectionAttempts = 5,
sessionTimeout = 30 seconds)
case ZkResponseProtocol.Connected(request) =>
// zkClient is now ready for work
}

}))
runner ! "connect"
```

### Subscribing to and unsubscribing from data / children changes

ZooKeeper client emits three types of events related to ZooKeeper state changes:

- `ZkClientStreamingResponse.StateChange(event: WatchedEventMeta)`: this is a client connection state change event
- `ZkClientStreamingResponse.ChildChange(event: WatchedEventMeta)`: this is a znode children change event
- `ZkClientStreamingResponse.DataChange(event: WatchedEventMeta)`: this is a znode data change event

To receive these events, create the client using the `akka streams Source` method.

The `StateChange` events are automatically delivered, there is no subscription required. However, the `ChildChange` and `DataChange` events
are per `path` thus requiring an explicit subscription. To initialize a subscription:

```scala
def receive = {
case "subscribe" =>
zkClient ! ZkRequestProtocol.SubscribeChildChanges("/some/zookeeper/path")
zkClient ! ZkRequestProtocol.SubscribeDataChanges("/some/other/zookeeper/path")
case ZkResponseProtocol.SubscriptionSuccess(request) =>
request match {
case _: ZkRequestProtocol.SubscribeChildChanges =>
// from now on, the child changes for the requested path will be streaming via the Flow
case _: ZkRequestProtocol.SubscribeDataChanges =>
// from now on, the data changes for the requested path will be streaming via the Flow
}
case "unsubscribe" =>
zkClient ! ZkRequestProtocol.UnsubscribeChildChanges("/some/zookeeper/path")
zkClient ! ZkRequestProtocol.UnsubscribeDataChanges("/some/other/zookeeper/path")
case ZkResponseProtocol.UnsubscriptionSuccess(request) =>
request match {
case _: ZkRequestProtocol.UnsubscribeChildChanges =>
// child change for the requested path will stop streaming via the Flow
case _: ZkRequestProtocol.UnsubscribeDataChanges =>
// data change for the requested path will stop streaming via the Flow
}
}
```

### Handling underlying ZooKeeper errors

Any operation that failed will be wrapped in and returned as `ZkResponseProtocol.OperationError(originalRequest, cause)`. An example:

```scala
def receive = {
case "create-node" =>
zkClient ! ZkRequestProtocol.CreatePersistent("/some/zookeeper/path/for/which/the/parent/does/not/exist")
case ZkResponseProtocol.OperationError(request, cause) =>
request match {
case r: ZkRequestProtocol.CreatePersistent =>
log.error(s"Failed to create znode: ${r.path}. Reason: $cause.")
case _ =>
}
}
```

## License

Author: Rad Gruchalski (radek@gruchalski.com)

This work will be available under Apache License, Version 2.0.

Copyright 2017 Rad Gruchalski (radek@gruchalski.com)

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. You may obtain a copy of the License at

[http://www.apache.org/licenses/LICENSE-2.0](http://www.apache.org/licenses/LICENSE-2.0)

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
39 changes: 27 additions & 12 deletions src/main/scala/uk/co/appministry/akka/zk/ZkClientActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ object PathExistenceStatus {
final case object DoesNotExist extends Status
}

/**
* Convenience ZooKeeper SASL properties.
*/
object SaslProperties {
final val JavaLoginConfigParam = "java.security.auth.login.config"
final val ZkSaslClient = "zookeeper.sasl.client"
Expand Down Expand Up @@ -106,6 +109,9 @@ object ZkClientProtocolDefaults {
val SessionTimeout = 30 seconds
}

/**
* Metric names.
*/
object ZkClientMetricNames {
sealed abstract class MetricName(val name: String)
case object ChildChangePathsObservedCount extends MetricName("child-change-paths-observed-count")
Expand Down Expand Up @@ -448,27 +454,33 @@ object ZkResponseProtocol {

}

/**
* ZooKeeper client streaming protocol.
*/
object ZkClientStreamProtocol {

/**
* A streaming response.
*/
sealed trait StreamResponse

/**
* Subscriber event for child change subscriptions.
* @param event original ZooKeeper event
*/
final case class ChildChange(val event: WatchedEvent) extends StreamResponse
final case class ChildChange(val event: WatchedEventMeta) extends StreamResponse

/**
* Subscriber event for data change subscriptions.
* @param event original ZooKeeper event
*/
final case class DataChange(val event: WatchedEvent) extends StreamResponse
final case class DataChange(val event: WatchedEventMeta) extends StreamResponse

/**
* Issued to the parent of the ZkClient when the client connection state changes.
* @param event the original request
*/
final case class StateChange(val event: WatchedEvent) extends StreamResponse
final case class StateChange(val event: WatchedEventMeta) extends StreamResponse

}

Expand Down Expand Up @@ -496,27 +508,27 @@ object ZkInternalProtocol {
* A ZooKeeper state change event is ready for processing.
* @param event the state event
*/
private[zk] final case class ZkProcessStateChange(val event: WatchedEvent)
private[zk] final case class ZkProcessStateChange(val event: WatchedEventMeta)

/**
* A ZooKeeper child change event is ready for processing.
* @param event the data or child event
*/
private[zk] final case class ZkProcessChildChange(val event: WatchedEvent)
private[zk] final case class ZkProcessChildChange(val event: WatchedEventMeta)

/**
* A ZooKeeper node data change event is ready for processing.
* @param event the data or child event
*/
private[zk] final case class ZkProcessDataChange(val event: WatchedEvent)
private[zk] final case class ZkProcessDataChange(val event: WatchedEventMeta)
}

/**
* ZooKeeper client state representation.
* @param currentAttempt current connection attempt
* @param connectRequest maximum number of connection attempts
* @param requestor [[akka.actor.ActorRef]] of the actor requesting the connection
* @param connection underlaying ZooKeeper connection
* @param connection underlying ZooKeeper connection
* @param serializer serializer used for reading and writing the data
*/
case class ZkClientState(val currentAttempt: Int,
Expand All @@ -530,9 +542,12 @@ case class ZkClientState(val currentAttempt: Int,
/**
* Akka ZooKeeper client.
*
* TODO: usage...
* TODO: Intead of using ActorPublisher, consider using the [[org.reactivestreams.Publisher]].
* According to the Akka docs, ActorPublisher may be removed in the future.
* <p>
* Reactive ZooKeeper client.<br/>
* Check <a href="https://github.com/AppMinistry/akka-zk">https://github.com/AppMinistry/akka-zk</a> for details.
* <p>
* TODO: Intead of using ActorPublisher, consider using the [[org.reactivestreams.Publisher]].<br/>
* According to the Akka docs, ActorPublisher may be removed in the future.
*/
class ZkClientActor extends Actor with ActorPublisher[ZkClientStreamProtocol.StreamResponse] with ActorLogging with ZkClientWatcher {

Expand Down Expand Up @@ -632,7 +647,7 @@ class ZkClientActor extends Actor with ActorPublisher[ZkClientStreamProtocol.Str
streamMaybeProduce(ZkClientStreamProtocol.StateChange(event))

case ZkInternalProtocol.ZkProcessDataChange(event) =>
Option(event.getPath) match {
Option(event.underlying.getPath) match {
case Some(path) =>
if (state.dataSubscriptions.contains(path)) {
streamMaybeProduce(ZkClientStreamProtocol.DataChange(event))
Expand All @@ -642,7 +657,7 @@ class ZkClientActor extends Actor with ActorPublisher[ZkClientStreamProtocol.Str
}

case ZkInternalProtocol.ZkProcessChildChange(event) =>
Option(event.getPath) match {
Option(event.underlying.getPath) match {
case Some(path) =>
if ( state.childSubscriptions.contains(path) ) {
streamMaybeProduce(ZkClientStreamProtocol.ChildChange(event))
Expand Down
26 changes: 17 additions & 9 deletions src/main/scala/uk/co/appministry/akka/zk/ZkClientWatcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ package uk.co.appministry.akka.zk
import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState}
import org.apache.zookeeper.{WatchedEvent, Watcher}

case class EventMetadata(val event: WatchedEvent) {
/**
* A rich wrapper for the [[org.apache.zookeeper.WatchedEvent]]
* @param underlying original event
*/
case class WatchedEventMeta(val underlying: WatchedEvent) {

val dataChangeTriggeringEvents = List(
EventType.NodeDataChanged,
Expand All @@ -15,10 +19,10 @@ case class EventMetadata(val event: WatchedEvent) {
EventType.NodeCreated,
EventType.NodeDeleted )

lazy val stateChanged = Option(event.getPath) == None
lazy val znodeChanged = Option(event.getPath) != None
lazy val dataChanged = dataChangeTriggeringEvents.contains(event.getType)
lazy val childrenChanged = childChangeTriggeringEvents.contains(event.getType)
lazy val stateChanged = Option(underlying.getPath) == None
lazy val znodeChanged = Option(underlying.getPath) != None
lazy val dataChanged = dataChangeTriggeringEvents.contains(underlying.getType)
lazy val childrenChanged = childChangeTriggeringEvents.contains(underlying.getType)
}

/**
Expand All @@ -30,9 +34,13 @@ trait ZkClientWatcher extends Watcher { this: ZkClientActor =>

private var currentState = KeeperState.Disconnected

/**
* Process an incoming [[org.apache.zookeeper.WatchedEvent]].
* @param event event to process
*/
override def process(event: WatchedEvent): Unit = {

val meta = EventMetadata(event)
val meta = WatchedEventMeta(event)

if (currentState != event.getState && event.getState == KeeperState.SyncConnected) {
self ! ZkInternalProtocol.ZkConnectionSuccessful()
Expand All @@ -41,15 +49,15 @@ trait ZkClientWatcher extends Watcher { this: ZkClientActor =>
currentState = event.getState

if (meta.stateChanged) {
self ! ZkInternalProtocol.ZkProcessStateChange(event)
self ! ZkInternalProtocol.ZkProcessStateChange(meta)
}

if (meta.dataChanged) {
self ! ZkInternalProtocol.ZkProcessDataChange(event)
self ! ZkInternalProtocol.ZkProcessDataChange(meta)
}

if (meta.childrenChanged) {
self ! ZkInternalProtocol.ZkProcessChildChange(event)
self ! ZkInternalProtocol.ZkProcessChildChange(meta)
}
}

Expand Down
32 changes: 32 additions & 0 deletions src/main/scala/uk/co/appministry/akka/zk/ZkSerializer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,42 @@ package uk.co.appministry.akka.zk

import java.io._

/**
* ZooKeeper data serialization error.
* @param cause original error
*/
case class ZkSerializerMarshallingError(val cause: Throwable) extends Exception(cause)

/**
* ZooKeeper data serializer.
*/
trait ZkSerializer {

/**
* Deserializer bytes to an object.
* @param bytes byte array
* @return an object
*/
def deserialize(bytes: Array[Byte]): Any

/**
* Serialize an object to a byte array.
* @param serializable an object
* @return a byte array
*/
def serialize(serializable: Any): Array[Byte]
}

/**
* Simple ZooKeeper client serializer.
*/
case class SimpleSerializer() extends ZkSerializer {

/**
* Deserializer bytes to an object.
* @param bytes byte array
* @return an object
*/
override def deserialize(bytes: Array[Byte]): Any = {
try {
val inputStream = new ObjectInputStream(new ByteArrayInputStream(bytes))
Expand All @@ -20,6 +47,11 @@ case class SimpleSerializer() extends ZkSerializer {
}
}

/**
* Serialize an object to a byte array.
* @param serializable an object
* @return a byte array
*/
override def serialize(serializable: Any): Array[Byte] = {
try {
val byteArrayOutputStream = new ByteArrayOutputStream()
Expand Down