Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
  • 3 commits
  • 8 files changed
  • 0 commit comments
  • 1 contributor
View
50 src/main/scala/com/biosimilarity/lift/lib/http/LockFreeMap.scala
@@ -0,0 +1,50 @@
+package com.biosimilarity.lift.lib.websocket
+
+import scala.collection.immutable.{Map => ImmutableMap}
+import scala.collection.mutable.{Map => MutableMap, MapLike}
+import scala.collection.generic.{CanBuildFrom, MutableMapFactory}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
+
+
+/**
+ * a lock free map that looks like a mutable map to the outside world and internally
+ * delegates to a immutable map. Any mutations create a new immutable map and then use
+ * atomic get and set to mutate the delegate map. If the get and set fails it will
+ * redo the creation of the ew immutable map and try again, etc, etc.
+ *
+ * Very high for scenarios with large number of reads compared to writes/mutations.
+ *
+ */
+class LockFreeMap[A,B] extends MutableMap[A,B]
+ with MapLike[A, B, LockFreeMap[A,B]]
+{
+
+ private val _ref = new AtomicReference(ImmutableMap[A,B]())
+
+ override def empty: LockFreeMap[A, B] = LockFreeMap.empty[A, B]
+
+ def get(key: A): Option[B] = _ref.get.get(key)
+
+ def iterator: Iterator[(A, B)] = _ref.get.iterator
+
+ def -=(key: A): this.type = transaction{_ - key}
+
+ def += (kv: (A, B)): this.type = transaction(_ + kv)
+
+ private def transaction(mutator: ImmutableMap[A,B]=>ImmutableMap[A,B]): this.type = {
+ def tryit = {
+ val original = _ref.get
+ val updated = mutator(original)
+ _ref.compareAndSet(original, updated)
+ }
+ while ( !tryit ) {}
+ this
+ }
+
+}
+
+
+object LockFreeMap extends MutableMapFactory[LockFreeMap] {
+ implicit def canBuildFrom[A, B]: CanBuildFrom[Coll, (A, B), LockFreeMap[A, B]] = new MapCanBuildFrom[A, B]
+ def empty[A, B]: LockFreeMap[A, B] = new LockFreeMap[A, B]
+}
View
58 src/main/scala/com/biosimilarity/lift/lib/http/SocketServlet.scala
@@ -20,27 +20,55 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.websocket.WebSocketServlet
import org.eclipse.jetty.websocket.WebSocket
+import java.net.URI
+import com.biosimilarity.lift.lib.kvdbJSON.KVDBJSONAPIDispatcher
+
+/**
+
+GLENandGREG need to talk through if the client sets a UUID in the URI or just puts UUID's in each message.
+I (aka Glen) am not clear if more framing is needed. I.e. I think we send the framing in each message from the
+client. We "could" have the framing at the socket level and send it once as well then all messages get
+that from URI, etc, etc... Anyways this is enough text to trigger my memory for discussion
+
+GLENandGREG when should serveAPI be called? Per web socket connection? That is my best educated 2 cent guess ;-)
+
+*/
+object SocketServlet {
+ lazy val dispatcher = {
+ val dsp =
+ new KVDBJSONAPIDispatcher( new URI( "amqp", "localhost", "/kvdb", "" ) )
+
+ val trgtURI =
+ new URI( "agent", "localhost", "/kvdbDispatchStore1", "" )
+
+ val srcURI =
+ new URI( "agent", "localhost", "/kvdbDispatchStore2", "" )
+
+ // Configure it to serve requests with a "to" header of agent://localhost/kvdbDispatchStore1
+ // and a "from" header of agent://localhost/kvdbDispatchStore2
+ // and deposit replies to the kvdbReplyQueue in the kvdbReplyExchange
+ dsp.addSingletonKVDB( trgtURI )
+ // dsp.addReplyQueue( srcURI, "localhost", "kvdbReply" )
+ dsp.serveAPI
+
+ dsp
+ }
+
+}
class SocketServlet extends org.eclipse.jetty.websocket.WebSocketServlet {
+
+ import SocketServlet.dispatcher
+
def doWebSocketConnect(
request : HttpServletRequest,
protocol : String
- ) : WebSocket = {
+ ) : WebSocket = {
+ val uri = new URI(request.getParameter("uri"))
+ println("socket received " + request)
QueuingWebSocket(
- theWSMgr,
- new Queue[String]( )
+ dispatcher
+ , uri
)
}
}
-
-// class KVDBSocketServlet extends org.eclipse.jetty.websocket.WebSocketServlet {
-// def doWebSocketConnect(
-// request : HttpServletRequest,
-// protocol : String
-// ) : WebSocket = {
-// KVDBWebSocket(
-// theKVDBWSMgr,
-// request
-// )
-// }
-// }
View
83 src/main/scala/com/biosimilarity/lift/lib/http/WebSocket.scala
@@ -48,93 +48,52 @@ import _root_.java.io.ObjectInputStream
import _root_.java.io.ByteArrayInputStream
import _root_.java.util.Timer
import _root_.java.util.TimerTask
+import java.net.URI
+import com.biosimilarity.lift.lib.kvdbJSON.KVDBJSONAPIDispatcher
case class SocketConnectionPair(
- socket : WebSocket with Seq[String],
- connection : WebSocket.Connection
+ requestQueue : Seq[String], // this is the queue that the dispatcher listens for incoming messages on
+ responseConnection : WebSocket.Connection // this is the websocket that the dispatcher will send outgoing messages with
)
-case class WSMgr(
- socketURIMap : HashMap[URI,SocketConnectionPair],
- socketMap : HashMap[WebSocket,URI],
- host : String
-) extends MapProxy[WebSocket,URI]
-with UUIDOps {
- override def self = socketMap
- implicit def asURI( ws : WebSocket ) : URI = {
- new URI( "websocket", host, "/" + getUUID, "" )
- }
-}
-
-case object theWSMgr extends WSMgr(
- new HashMap[URI,SocketConnectionPair]( ),
- new HashMap[WebSocket,URI]( ),
- "localhost"
-)
case class QueuingWebSocket(
- wsMgr : WSMgr,
- queue : Queue[String]
+ dispatcher: KVDBJSONAPIDispatcher
+ , uri: URI
) extends WebSocket
with WebSocket.OnTextMessage
- with SeqProxy[String]
{
- override def self = queue
+
+ val queue = Queue[String]()
override def onOpen(
- connection: WebSocket.Connection
+ wsConnection: WebSocket.Connection
) : Unit = {
- // BUGBUG -- lgm : is this thread safe?
- println( "in onOpen with " + connection )
- val uri = wsMgr.asURI( this )
- wsMgr += ( ( this, uri ) )
- wsMgr.socketURIMap += ( ( uri, SocketConnectionPair( this, connection ) ) )
+ println( "in onOpen with " + wsConnection )
+ dispatcher.socketURIMap += ( uri -> SocketConnectionPair(queue,wsConnection) )
}
override def onClose(
closeCode: Int,
message: String
) : Unit = {
- // BUGBUG -- lgm : is this thread safe?
println( "in onClose with " + closeCode + " and " + message )
- for( uri <- wsMgr.get( this ) ) {
- wsMgr.socketURIMap -= uri
- }
- wsMgr -= this
+ dispatcher.socketURIMap -= uri
}
override def onMessage(
message: String
) : Unit = {
- // BUGBUG -- lgm : is this thread safe?
- println( "in onMessage with " + message )
+ // is this thread safe?
queue += message
+ // GLENandGREG need to send the message throught the dispatcher here. The following code is AMQP'ish not websocket'ish
+ // but we need something like this except it does the
+/*
+ val srcScope : AMQPNodeJSScope = new AMQPNodeJSStdScope()
+ val srcQM = new srcScope.AMQPNodeJSQueueM( host, exchange )
+ val srcQ = srcQM.zeroJSON
+ srcQ ! putMsgHdrsBody
+*/
}
}
-
-package usage {
-/* ------------------------------------------------------------------
- * Mostly self-contained object to support unit testing
- * ------------------------------------------------------------------ */
-
- import com.biosimilarity.lift.model.store._
- import com.biosimilarity.lift.model.agent._
- import com.biosimilarity.lift.model.msg._
-
- import com.biosimilarity.lift.lib.UUIDOps
- import com.biosimilarity.lift.lib.moniker._
-
- import scala.xml._
- import scala.collection.MapProxy
- import scala.collection.mutable.Map
- import scala.collection.mutable.HashMap
- import scala.collection.mutable.LinkedHashMap
- import scala.collection.mutable.ListBuffer
- import scala.collection.mutable.MutableList
-
- import java.io.StringReader
-
-}
-
-
View
11 src/main/scala/com/biosimilarity/lift/lib/http/client/BaseClientApp.scala
@@ -11,7 +11,8 @@ import org.eclipse.jetty.websocket.WebSocketClientFactory;
trait BaseClientApp extends App {
- val url = new URI("ws://127.0.0.1:8090/querysocket")
+// lazy val url = new URI("ws://127.0.0.1:8090/querysocket")
+ lazy val url = new URI("ws://127.0.0.1:8090/websocket")
lazy val client = {
val factory = new WebSocketClientFactory()
@@ -29,9 +30,9 @@ trait BaseClientApp extends App {
}).get(5, TimeUnit.SECONDS)
}
- val messagesReceivedCounter = new AtomicInteger
+ lazy val messagesReceivedCounter = new AtomicInteger
- val waiter = new Object
+ lazy val waiter = new Object
def wait(seconds: Int) = waiter.synchronized(waiter.wait(seconds*1000))
def sendGet(key: String) = send("get", key)
@@ -54,9 +55,11 @@ trait BaseClientApp extends App {
.replace(":key:", key)
.replace(":value:", value)
println("--sending message: " + msg);
- connection sendMessage msg
+ rawSend(msg)
}
+ def rawSend(msg: String) = connection sendMessage msg
+
def run
try {
View
12 src/main/scala/com/biosimilarity/lift/lib/http/client/GetPutClient.scala
@@ -3,9 +3,17 @@ package com.biosimilarity.lift.lib.websocket.client
object GetPutClient extends BaseClientApp {
+ val getMessage = """{ "headers" : [agent://localhost/kvdbDispatchStore1,agent://localhost/kvdbDispatchStore2,f5e9e43b-c590-4285-a66c-96faef286aa3,e64c44b5-dfff-4a0e-8e39-49458fd99683,{ "response" : null }], "body" : { "getRequest" : { "ask" : { "node" : [{ "machine" : ["sl390"] }, { "os" : ["Ubuntu","11.04"] } ] } } } }"""
+
+ val putMessage = """ { "headers" : [agent://localhost/kvdbDispatchStore1,agent://localhost/kvdbDispatchStore2,f5e9e43b-c590-4285-a66c-96faef286aa3,e64c44b5-dfff-4a0e-8e39-49458fd99683,{ "response" : null }], "body" : { "putRequest" : { "tell" : [ { "node" : [{ "machine" : ["sl390"] }, { "os" : ["Ubuntu","11.04"] } ] }, "running" ] } } }"""
+
def run = {
- sendPut ("helloWorld", "The Hello World Value")
- sendGet("helloWorld")
+ (1 to 5) foreach { i =>
+ rawSend(getMessage)
+ rawSend(putMessage)
+ }
+ // sendPut ("helloWorld", "The Hello World Value")
+ // sendGet("helloWorld")
}
}
View
32 src/main/scala/com/biosimilarity/lift/lib/kvdbJSON/dispatch/Dispatcher.scala
@@ -21,6 +21,8 @@ import com.biosimilarity.lift.model.msg._
import com.biosimilarity.lift.lib._
import com.biosimilarity.lift.lib.moniker._
+import com.biosimilarity.lift.lib.websocket.LockFreeMap
+
import scala.util.continuations._
import scala.collection.mutable.HashMap
import scala.xml._
@@ -59,9 +61,6 @@ with UUIDOps {
ApplicationDefaults.asInstanceOf[ConfigurationDefaults]
}
- // Websocket support
- def wsMgr : WSMgr
-
// comms
def srcURI : URI
def srcHost( srcURI : URI ) : String = srcURI.getHost
@@ -90,6 +89,8 @@ with UUIDOps {
def replyQ( replyHost : String, replyExchange : String ) : stblReplyScope.AMQPQueue[String] =
replyQM( replyHost, replyExchange ).zeroJSON
+ def socketURIMap: LockFreeMap[URI,SocketConnectionPair]
+
// namespace
def kvdbScope : PersistedTermStoreScope[String,String,String,String]
lazy val stblKVDBScope : PersistedTermStoreScope[String,String,String,String] =
@@ -111,8 +112,7 @@ with UUIDOps {
writer : java.io.Writer
) extends ReplyCacheTrgt
case class WebSocketTrgt(
- socket : WebSocket,
- connection : WebSocket.Connection
+ wsConnection : WebSocket.Connection
) extends ReplyCacheTrgt
// this controls what URI's will be dispatched to with results
@@ -338,11 +338,10 @@ with UUIDOps {
}
case "websocket" => {
- wsMgr.socketURIMap.get( reply ) match {
- case Some( sockCnxnPair ) => {
+ socketURIMap.get( reply ) match {
+ case Some( SocketConnectionPair(_,wsConnection) ) => {
WebSocketTrgt(
- sockCnxnPair.socket,
- sockCnxnPair.connection
+ wsConnection
)
}
case _ => {
@@ -417,8 +416,8 @@ with UUIDOps {
tweet( "sending " + rsp + " to " + q )
q ! rsp
}
- case WebSocketTrgt( socket, connection ) => {
- connection.sendMessage( rsp )
+ case WebSocketTrgt( wsConnection ) => {
+ wsConnection.sendMessage( rsp )
}
case JVMWriter( writer ) => {
writer.write( rsp )
@@ -442,8 +441,8 @@ with UUIDOps {
tweet( "sending " + rsp + " to " + q )
q ! rsp
}
- case WebSocketTrgt( socket, connection ) => {
- connection.sendMessage( rsp )
+ case WebSocketTrgt( wsConnection ) => {
+ wsConnection.sendMessage( rsp )
}
case JVMWriter( writer ) => {
writer.write( rsp )
@@ -560,8 +559,8 @@ with UUIDOps {
serveAPI( srcHost( uri ), srcExchange( uri ) )
}
case "websocket" => {
- for( sockCnxnPair <- wsMgr.socketURIMap.get( uri ) ) {
- serveAPI( sockCnxnPair.socket )
+ for( SocketConnectionPair(queue,_) <- socketURIMap.get( uri ) ) {
+ serveAPI( queue )
}
}
case "stream" => {
@@ -585,12 +584,13 @@ with UUIDOps {
class KVDBJSONAPIDispatcher(
override val srcURI : URI
) extends KVDBJSONAPIDispatcherT {
- override def wsMgr : WSMgr = theWSMgr
override def kvdbScope : PersistedTermStoreScope[String,String,String,String] = PTSS
override def kvdbPersistenceScope : stblKVDBScope.PersistenceScope = {
// This cast makes me sad...
PTSS.Being.asInstanceOf[stblKVDBScope.PersistenceScope]
}
+
+ val socketURIMap = LockFreeMap[URI,SocketConnectionPair]()
// this controls what URI's will be dispatched to KVDB's
override def namespace : HashMap[URI,stblKVDBPersistenceScope.PersistedMonadicGeneratorJunction]
View
12 src/main/webapp/WEB-INF/web.xml
@@ -28,4 +28,16 @@ PUBLIC "-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN"
<url-pattern>/*</url-pattern>
</filter-mapping>
+
+ <servlet>
+ <servlet-name>websocket</servlet-name>
+ <servlet-class>com.biosimilarity.lift.lib.websocket.SocketServlet</servlet-class>
+ </servlet>
+
+ <servlet-mapping>
+ <servlet-name>websocket</servlet-name>
+ <url-pattern>/websocket</url-pattern>
+ </servlet-mapping>
+
+
</web-app>
View
60 src/test/scala/RunWebApp.scala
@@ -1,33 +1,33 @@
-import _root_.org.mortbay.jetty.Connector
-import _root_.org.mortbay.jetty.Server
-import _root_.org.mortbay.jetty.webapp.WebAppContext
-import org.mortbay.jetty.nio._
+// import _root_.org.mortbay.jetty.Connector
+// import _root_.org.mortbay.jetty.Server
+// import _root_.org.mortbay.jetty.webapp.WebAppContext
+// import org.mortbay.jetty.nio._
object RunWebApp extends App {
- val server = new Server
- val scc = new SelectChannelConnector
- scc.setPort(8080)
- server.setConnectors(Array(scc))
-
- val context = new WebAppContext()
- context.setServer(server)
- context.setContextPath("/")
- context.setWar("src/main/webapp")
-
- server.addHandler(context)
-
- try {
- println(">>> STARTING EMBEDDED JETTY SERVER, PRESS ANY KEY TO STOP")
- server.start()
- while (System.in.available() == 0) {
- Thread.sleep(5000)
- }
- server.stop()
- server.join()
- } catch {
- case exc : Exception => {
- exc.printStackTrace()
- System.exit(100)
- }
- }
+ // val server = new Server
+ // val scc = new SelectChannelConnector
+ // scc.setPort(8080)
+ // server.setConnectors(Array(scc))
+ //
+ // val context = new WebAppContext()
+ // context.setServer(server)
+ // context.setContextPath("/")
+ // context.setWar("src/main/webapp")
+ //
+ // server.addHandler(context)
+ //
+ // try {
+ // println(">>> STARTING EMBEDDED JETTY SERVER, PRESS ANY KEY TO STOP")
+ // server.start()
+ // while (System.in.available() == 0) {
+ // Thread.sleep(5000)
+ // }
+ // server.stop()
+ // server.join()
+ // } catch {
+ // case exc : Exception => {
+ // exc.printStackTrace()
+ // System.exit(100)
+ // }
+ // }
}

No commit comments for this range

Something went wrong with that request. Please try again.