Skip to content

Tecsisa/reactive-cassandra-phantom

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

36 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

This project has been discontinued as this feature has been roughly included in the phantom project itself.

Reactive Cassandra (with Phantom DSL)

Build Status Coverage Status

A reactive streams compatible subscriber for Cassandra. This is a phantom based effort and it's strongly influenced by elastic4s.

The idea behind this library is to provide a reactive streams subscriber (and maybe a publisher later) for working with Cassandra database and, at the same time, taking advantage of phantom dsl to build CQL3 statements idiomatically with Scala. Every element that downstreams to this subscriber can be turn into a new statement that will be added to a batch query.

This library leverages the typeclass pattern in order to keep separate both the streaming logic and the query statements itself that remain under the responsibility of programmers. An implicit class enables Phantom CassandraTable to be enhanced with streaming capabilities so that integration with phantom legacy code is simple and straightforward.

The library is published on Tecsisa's bintray and it's currently on version 0.0.8.

Usage

In order to use this library, just add the Tecsisa bintray repository to the resolvers list in your build.sbt file:

  resolvers += Resolver.url("bintray-tecsisa-repo",
                                     url("http://dl.bintray.com/tecsisa/maven-bintray-repo"))

Or if you're using the bintray-sbt plugin:

  Resolver.bintrayRepo("tecsisa", "maven-bintray-repo")

And then, just import the dependency:

  libraryDependencies += "com.tecsisa" %% "reactive-cassandra-phantom" % "0.0.8"

Examples of use

Let's suppose that we're requesting a list of Wagner's operas via a reactive stream. In this case, the element streamed could be modeled as:

  case class Opera(name: String)

Following phantom's dsl we could model a Cassandra table to persist all this data as:

  abstract class OperaTable extends CassandraTable[OperaTable, Opera] with SimpleCassandraConnector {
  object name extends StringColumn(this) with PartitionKey[String]
  def fromRow(row: Row): Opera = {
    Opera(name(row))
  }
  def count(): Future[Option[Long]] = {
    select.count().one()
  }
}

Please, refer to phantom's documentation for further explanation. Of course, you'll also need a Cassandra connector in place:

private object Defaults {
  def getDefaultConnector(host: String, port: Int, keySpace: String): KeySpaceDef = {
    ContactPoints(Seq(host), port).keySpace(keySpace)
  }
}

trait SimpleCassandraConnector {

  private[this] lazy val connector = Defaults.getDefaultConnector(host, port, keySpace.name)

  def host: String = "localhost"

  def port: Int = 9142

  implicit def keySpace: KeySpace

  implicit lazy val session: Session = connector.session

  def cassandraVersion: VersionNumber = connector.cassandraVersion

  def cassandraVersions: Set[VersionNumber] = connector.cassandraVersions
}

And some data:

  object OperaData {

  // Yes, you know... I like Wagner a lot :-)
  val operas = Array(
    Opera("Das Rheingold"),
    Opera("Die Walküre"),
    Opera("Die Sieger"),
    Opera("Tristan und Isolde"),
    Opera("Die Meistersinger von Nürnberg"),
    Opera("Luthers Hochzeit"),
    Opera("Siegfried"),
    Opera("Götterdämmerung"),
    Opera("Eine Kapitulation"),
    Opera("Parsifal"),
    ........
  )
}

So far, all this is phantom stuff. Now, you'll need to bring some implicits to scope:

  com.tecsisa.streams.cassandra.ReactiveCassandra._

This implicit will put your cassandra table on steroids and you'll be able to build your subscriber easily:

  val subscriber = OperaTable.subscriber(50, 4, completionFn = () => println("streaming finished!"))

In order to get this up and running, you'll need to bring some implicit context in scope:

  • an Akka actor system reference
  • the Cassandra session and keyspace
  • and one implementation of the RequestBuilder typeclass that contains the statement to be invoked once for streamed element.
  implicit val system = ActorSystem()
  import OperaTable.{ keySpace, session }

  object OperaTable extends OperaTable with SimpleCassandraConnector {
    implicit val keySpace: KeySpace = KeySpace("streams")
  }

  implicit object OperaRequestBuilder extends RequestBuilder[OperaTable, Opera] {
    override def request(ct: OperaTable, t: Opera): ExecutableStatement =
      ct.insert().value(_.name, t.name)
  }  

In this case, we'll make an insert for every element streamed.

This subscriber can take part of standard reactive streams pipelines being connected to some publisher:

  OperaPublisher.subscribe(subscriber)

  object OperaPublisher extends Publisher[Opera] {
    override def subscribe(s: Subscriber[_ >: Opera]): Unit = {
      var remaining = OperaData.operas
      s.onSubscribe(new Subscription {
        override def cancel(): Unit = ()
        override def request(l: Long): Unit = {
          remaining.take(l.toInt).foreach(s.onNext)
          remaining = remaining.drop(l.toInt)
          if (remaining.isEmpty)
            s.onComplete()
        }
      })
    }
  }

Of course, it's also possible to use this subscriber with Akka streams pipelines following the integration guidelines provided in the Akka Streams documentation.

Currently, three kinds of CQL3 batches can be carried out:

  • Logged
  • Unlogged
  • Counter

Please, refer to the CQL3 documentation for further explanation.

Next Steps

  • A proper publisher to face the other side of the problem.
  • Incorporate prepared statements as soon as they're available in phantom. This addition should improve the streaming performance greatly.

Related Work

About

Reactive streams compatible assets for Cassandra

Resources

License

Stars

Watchers

Forks

Packages

No packages published