Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
0 parents
commit e061a4b
Showing
18 changed files
with
592 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
logs | ||
project/project | ||
project/target | ||
target | ||
tmp | ||
.history | ||
dist |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
Demo of mongo async driver + tailable cursors | ||
============================================= | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
package controllers | ||
|
||
import play.api._ | ||
import play.api.mvc._ | ||
import play.api.Play.current | ||
import play.api.libs.json._ | ||
import play.api.libs.concurrent._ | ||
import play.api.libs.iteratee._ | ||
|
||
import play.modules.mongodb._ | ||
import play.modules.mongodb.PlayBsonImplicits._ | ||
|
||
import org.asyncmongo.api._ | ||
import org.asyncmongo.handlers.{BSONReaderHandler, BSONWriter, BSONReader} | ||
import org.asyncmongo.handlers.DefaultBSONHandlers._ | ||
import org.asyncmongo.protocol._ | ||
import akka.dispatch.Future | ||
import akka.util.duration._ | ||
|
||
object Application extends Controller { | ||
|
||
def index = Action { | ||
Ok(views.html.index()) | ||
} | ||
|
||
def watchCollection = WebSocket.using[JsValue] { request => | ||
val coll = MongoAsyncPlugin.collection("acappedcollection") | ||
val in = Iteratee.foreach[JsValue] { json => | ||
println("received " + json) | ||
coll.insert(json) | ||
} | ||
val out = enumerate(Some(coll.find[JsValue, JsValue, JsValue](Json.obj(), None, 0, 0, QueryFlags.TailableCursor | QueryFlags.AwaitData))) | ||
(in, out) | ||
} | ||
|
||
def enumerate[T](futureCursor: Option[Future[Cursor[T]]]) :Enumerator[T] = { | ||
var currentCursor :Option[Cursor[T]] = None | ||
Enumerator.generateM { | ||
if(currentCursor.isDefined && currentCursor.get.iterator.hasNext){ | ||
Promise.pure(Some(currentCursor.get.iterator.next)) | ||
} else if(currentCursor.isDefined && currentCursor.get.hasNext) { | ||
val p = Promise[Option[T]]() | ||
def periodicChecker(cursor: Cursor[T]) :Unit = { | ||
if(cursor.iterator.hasNext) { | ||
currentCursor = Some(cursor) | ||
p.redeem(Some(cursor.iterator.next)) | ||
} else { | ||
play.core.Invoker.system.scheduler.scheduleOnce(500 milliseconds)({ | ||
currentCursor.get.next.get.onSuccess { | ||
case yop => periodicChecker(yop) | ||
} | ||
}) | ||
} | ||
} | ||
periodicChecker(currentCursor.get) | ||
p | ||
} else if(!currentCursor.isDefined && futureCursor.isDefined) { | ||
new AkkaPromise(futureCursor.get.map { cursor => | ||
println("redeemed from first cursor") | ||
currentCursor = Some(cursor) | ||
if(cursor.iterator.hasNext) { | ||
Some(cursor.iterator.next) | ||
} | ||
else { | ||
None | ||
} | ||
}) | ||
} else { | ||
println("Nothing to enumerate") | ||
Promise.pure(None) | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,176 @@ | ||
/* | ||
* Copyright 2012 Pascal Voitot | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package play.modules.mongodb | ||
|
||
import play.api.libs.json._ | ||
import org.asyncmongo.bson._ | ||
import org.asyncmongo.handlers._ | ||
import org.jboss.netty.buffer._ | ||
import org.asyncmongo.handlers.DefaultBSONHandlers._ | ||
import org.asyncmongo.utils.Converters | ||
|
||
object PlayBsonImplicits extends PlayBsonImplicits | ||
|
||
trait BSONBuilder[T] { | ||
def write(t: T, bson: Bson): Bson | ||
} | ||
|
||
|
||
object MongoHelpers { | ||
def Date(d: java.util.Date) = Json.obj("$date" -> d.getTime) | ||
def Date(l: Long) = Json.obj("$date" -> l) | ||
|
||
def ObjectId(s: String) = Json.obj("$oid" -> s) | ||
def ObjectId(s: Array[Byte]) = Json.obj("$oid" -> Converters.hex2Str(s)) | ||
|
||
def RegEx(regex: String, options: String = "") = Json.obj("$regex" -> regex, "$options" -> options) | ||
def LessThan(obj: JsObject) = Json.obj("$lt" -> obj) | ||
|
||
} | ||
|
||
trait PlayBsonImplicits { | ||
|
||
implicit object JsObjectBSONBuilder extends BSONBuilder[JsObject] { | ||
def write(o: JsObject, bson: Bson) = { | ||
o.fields.foreach{ t:(String, JsValue) => val b = _toBson(t); println(b); bson.write(b) } | ||
bson | ||
} | ||
} | ||
|
||
implicit object JsArrayBSONBuilder extends BSONBuilder[JsArray] { | ||
def write(o: JsArray, bson: Bson) = { | ||
o.value.zipWithIndex.map{ t:(JsValue, Int) => | ||
(t._2.toString, t._1) }.foreach{ t:(String, JsValue) => val b = _toBson(t); println(b); bson.write(b) | ||
} | ||
bson | ||
} | ||
} | ||
|
||
|
||
def write2BSON[T](t: T, bson: Bson)(implicit builder:BSONBuilder[T]): Bson = { | ||
builder.write(t, bson) | ||
} | ||
|
||
def _manageSpecials(t: (String, JsObject)): Either[(String, JsObject), BSONElement] = { | ||
if(t._2.fields.length > 0) { | ||
t._2.fields(0) match { | ||
case ("$oid", JsString(v)) => Right(BSONObjectID(t._1, Converters.str2Hex(v))) | ||
case ("$date", JsNumber(v)) => Right(BSONDateTime(t._1, v.toLong)) | ||
case (k, _) if(Seq("$gt", "$lt").contains(k)) => Left(t) | ||
case (k, _) if(k.startsWith("$")) => throw new RuntimeException("unmanaged special %s".format(k)) | ||
case _ => Left(t) | ||
} | ||
} else Left(t) | ||
} | ||
|
||
def _toBson(t: (String, JsValue)): BSONElement = { | ||
t._2 match { | ||
case s: JsString => BSONString(t._1, s.value) | ||
case i: JsNumber => BSONDouble(t._1, i.value.toDouble) | ||
case o: JsObject => | ||
_manageSpecials((t._1, o)).fold ( | ||
normal => BSONDocument(normal._1, write2BSON(normal._2, new Bson()).getBuffer), | ||
special => special | ||
) | ||
|
||
case a: JsArray => | ||
val _bson = new Bson() | ||
JsArrayBSONBuilder.write(a, _bson) | ||
BSONArray(t._1, _bson.getBuffer) | ||
case b: JsBoolean => BSONBoolean(t._1, b.value) | ||
case JsNull => BSONNull(t._1) | ||
case u: JsUndefined => BSONUndefined(t._1) | ||
} | ||
} | ||
|
||
implicit object JsObjectWriter extends BSONWriter[JsObject] { | ||
def write(doc: JsObject): ChannelBuffer = { | ||
val bson = new Bson() | ||
JsObjectBSONBuilder.write(doc, bson) | ||
bson.getBuffer | ||
} | ||
} | ||
|
||
implicit object JsArrayWriter extends BSONWriter[JsArray] { | ||
def write(doc: JsArray): ChannelBuffer = { | ||
val bson = new Bson() | ||
JsArrayBSONBuilder.write(doc, bson) | ||
bson.getBuffer | ||
} | ||
} | ||
|
||
implicit object JsValueWriter extends BSONWriter[JsValue] { | ||
def write(doc: JsValue): ChannelBuffer = { | ||
doc match { | ||
case o: JsObject => JsObjectWriter.write(o) | ||
case a: JsArray => JsArrayWriter.write(a) | ||
case _ => throw new RuntimeException("JsValue can only JsObject/JsArray") | ||
} | ||
} | ||
} | ||
|
||
def toTuple(e: BSONElement): (String, JsValue) = e match { | ||
case BSONDouble(name, value) => name -> JsNumber(value) | ||
case BSONString(name, value) => name -> JsString(value) | ||
case BSONDocument(name, value) => name -> JsObjectReader.read(value) | ||
case BSONArray(name, value) => name -> JsArrayReader.read(value) | ||
case oid @ BSONObjectID(name, value) => name -> Json.obj( "$oid" -> oid.stringify ) | ||
case BSONBoolean(name, value) => name -> JsBoolean(value) | ||
case BSONDateTime(name, value) => name -> Json.obj("$date" -> value) | ||
case BSONTimestamp(name, value) => name -> Json.obj("$time" -> value.toInt, "i" -> (value >>> 4) ) | ||
case BSONRegex(name, value, flags) => name -> Json.obj("$regex" -> value, "$options" -> flags) | ||
case BSONNull(name) => name -> JsNull | ||
case BSONUndefined(name) => name -> JsUndefined("") | ||
case BSONInteger(name, value) => name -> JsNumber(value) | ||
case BSONLong(name, value) => name -> JsNumber(value) | ||
case BSONBinary(name, value, subType) => | ||
val arr = new Array[Byte](value.readableBytes()) | ||
value.readBytes(arr) | ||
name -> Json.obj( | ||
"$binary" -> Converters.hex2Str(arr), | ||
"$type" -> Converters.hex2Str(Array(subType.value.toByte)) | ||
) | ||
case BSONDBPointer(name, value, id) => name -> Json.obj("$ref" -> value, "$id" -> Converters.hex2Str(id)) | ||
// NOT STANDARD AT ALL WITH JSON and MONGO | ||
case BSONJavaScript(name, value) => name -> Json.obj("$js" -> value) | ||
case BSONSymbol(name, value) => name -> Json.obj("$sym" -> value) | ||
case BSONJavaScriptWS(name, value) => name -> Json.obj("$jsws" -> value) | ||
case BSONMinKey(name) => name -> Json.obj("$minkey" -> 0) | ||
case BSONMaxKey(name) => name -> Json.obj("$maxkey" -> 0) | ||
} | ||
|
||
object JsArrayReader extends BSONReader[JsArray] { | ||
def read(buffer: ChannelBuffer): JsArray = { | ||
val it = DefaultBSONReader.read(buffer) | ||
|
||
it.foldLeft(Json.arr()) { (acc: JsArray, e: BSONElement) => acc :+ toTuple(e)._2 } | ||
} | ||
} | ||
|
||
object JsObjectReader extends BSONReader[JsObject] { | ||
def read(buffer: ChannelBuffer): JsObject = { | ||
val it = DefaultBSONReader.read(buffer) | ||
|
||
it.foldLeft(Json.obj()) { (acc: JsObject, e: BSONElement) => acc ++ JsObject(Seq(toTuple(e))) } | ||
} | ||
} | ||
|
||
implicit object JsValueReader extends BSONReader[JsValue] { | ||
def read(buffer: ChannelBuffer): JsValue = JsObjectReader.read(buffer) | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
/* | ||
* Copyright 2012 Pascal Voitot | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package play.modules.mongodb | ||
|
||
import org.asyncmongo.api._ | ||
import org.asyncmongo.actors.MongoConnection | ||
import scala.collection.JavaConversions._ | ||
import play.api._ | ||
|
||
class MongoAsyncPlugin(app :Application) extends Plugin { | ||
lazy val helper: MongoAsyncHelper = { | ||
val conf = MongoAsyncPlugin.parseConf(app) | ||
println(conf) | ||
try { | ||
MongoAsyncHelper(conf._1, conf._2) | ||
} catch { | ||
case e => throw PlayException("MongoAsyncPlugin Initialization Error", "An exception occurred while initializing the MongoAsyncPlugin.", Some(e)) | ||
} | ||
} | ||
|
||
def db: DB = helper.db | ||
def connection: MongoConnection = helper.connection | ||
def collection(name :String): Collection = helper.db(name) | ||
|
||
override def onStart { | ||
Logger info "MongoAsyncPlugin starting..." | ||
Logger.info("MongoAsyncPlugin successfully started with db '%s'! Servers:\n\t\t%s" | ||
.format( | ||
helper.dbName, | ||
helper.servers.map { s => "[%s:%s]".format(s._1, s._2) }.mkString("\n\t\t") | ||
) | ||
) | ||
} | ||
} | ||
|
||
/** | ||
* MongoDB access methods. | ||
*/ | ||
object MongoAsyncPlugin { | ||
val DEFAULT_HOST = "localhost" | ||
val DEFAULT_PORT = 27017 | ||
|
||
def connection(implicit app :Application) = current.connection | ||
def db(implicit app :Application) = current.db | ||
def collection(name :String)(implicit app :Application) = current.collection(name) | ||
|
||
/** | ||
* returns the current instance of the plugin. | ||
*/ | ||
def current(implicit app :Application): MongoAsyncPlugin = app.plugin[MongoAsyncPlugin] match { | ||
case Some(plugin) => plugin | ||
case _ => throw PlayException("MongoAsyncPlugin Error", "The MongoAsyncPlugin has not been initialized! Please edit your conf/play.plugins file and add the following line: '400:play.modules.mongodb.MongoAsyncPlugin' (400 is an arbitrary priority and may be changed to match your needs).") | ||
} | ||
|
||
/** | ||
* returns the current instance of the plugin (from a [[play.Application]] - Scala's [[play.api.Application]] equivalent for Java). | ||
*/ | ||
def current(app :play.Application): MongoAsyncPlugin = app.plugin(classOf[MongoAsyncPlugin]) match { | ||
case plugin if plugin != null => plugin | ||
case _ => throw PlayException("MongoAsyncPlugin Error", "The MongoAsyncPlugin has not been initialized! Please edit your conf/play.plugins file and add the following line: '400:play.modules.mongodb.MongoAsyncPlugin' (400 is an arbitrary priority and may be changed to match your needs).") | ||
} | ||
|
||
private def parseConf(app :Application): (String, List[(String, Int)]) = { | ||
( | ||
app.configuration.getString("mongodb.db") match { | ||
case Some(db) => db | ||
case _ => throw app.configuration.globalError("Missing configuration key 'mongodb.db'!") | ||
}, | ||
app.configuration.getStringList("mongodb.servers") match { | ||
case Some(list) => | ||
list.map ({ address: String => | ||
val hostport = address.span( _ != ':' ) | ||
|
||
( | ||
if(hostport._1.isEmpty) DEFAULT_HOST else hostport._1, | ||
if(hostport._2.isEmpty) DEFAULT_PORT else hostport._2.drop(1).toInt | ||
) | ||
}).toList | ||
case _ => | ||
List((app.configuration.getString("mongodb.host").getOrElse(DEFAULT_HOST), | ||
app.configuration.getInt("mongodb.port").getOrElse(DEFAULT_PORT))) | ||
} | ||
) | ||
} | ||
} | ||
|
||
private[mongodb] case class MongoAsyncHelper(dbName: String, servers: List[(String, Int)]) { | ||
lazy val connection = MongoConnection(servers) | ||
|
||
lazy val db = DB(dbName, connection) | ||
|
||
def collection(name :String): Collection = db(name) | ||
} | ||
|
||
|
Oops, something went wrong.