Permalink
Browse files

Reimplement using sockjs-netty

  • Loading branch information...
1 parent e7e0086 commit 4ec3b6e5d593e2369a93744b9aa24b92acc53de9 @marekjelen marekjelen committed Feb 26, 2012
View
@@ -0,0 +1 @@
+jruby-1.6.7
View
1 .rvmrc
@@ -1 +0,0 @@
-rvm use jruby-1.6.5@wildcloud
View
@@ -3,13 +3,29 @@ PATH
specs:
wildcloud-websockets (0.0.1-java)
hot_bunnies (= 1.3.4)
+ httparty (= 0.8.1)
+ json (= 1.6.5)
multi_json (= 1.0.4)
+ sinatra (= 1.3.2)
GEM
remote: http://rubygems.org/
specs:
hot_bunnies (1.3.4-java)
+ httparty (0.8.1)
+ multi_json
+ multi_xml
+ json (1.6.5-java)
multi_json (1.0.4)
+ multi_xml (0.4.1)
+ rack (1.4.1)
+ rack-protection (1.2.0)
+ rack
+ sinatra (1.3.2)
+ rack (~> 1.3, >= 1.3.6)
+ rack-protection (~> 1.2)
+ tilt (~> 1.3, >= 1.3.3)
+ tilt (1.3.3)
PLATFORMS
java
View
@@ -15,7 +15,7 @@ The service does not **yet** implement the interface as described below.
## Interface
Application authorizes client with _secret_ application id and custom client id. The service returns socket_id, that
-should be used for identifying that specific client (e.g. user of a web application).
+should be used for identifying that specific client (e.g. session of a user of a web application).
POST /authorize/application_id/client_id
@@ -33,14 +33,10 @@ with content of the message as the request body with other parameters encoded as
where session id represent single connection between the client and this service (e.g. one window or tab in a browser).
-To publish a message, the application posts a request to the service with socket_id and optional session_id
+To publish a message, the application posts a request to the service with socket_id
POST /publish/socket_id
-or
-
- POST /publish/socket_id/session_id
-
with JSON encoded body containing the message and optional parameters.
{"message": "some message"}
@@ -90,6 +86,10 @@ On the client-side the user connects using SockJS to the server and specifies th
* WP7 - not tested
* Internet explorer mobile
+## Implementation
+
+* Uses [sockjs-netty](https://github.com/cgbystrom/sockjs-netty)
+
## License
Project is licensed under the terms of the Apache 2 License.
View
@@ -15,10 +15,11 @@
# limitations under the License.
require 'rubygems'
+require 'bundler/setup'
$: << File.expand_path('../../lib', __FILE__)
-require 'wildcloud/websockets/server'
+require 'wildcloud/websockets/engine'
-@server = Wildcloud::Websockets::Server.new
-@server.start
+Engine = Wildcloud::Websockets::Engine.new
+Engine.start
View
Binary file not shown.
@@ -12,92 +12,149 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-require 'net/http'
-require 'singleton'
+require 'digest/sha1'
require 'thread'
-require 'uri'
+
+require 'wildcloud/websockets/java'
+require 'wildcloud/websockets/server'
+
+require 'json'
+require 'hot_bunnies'
+require 'httparty'
+require 'sinatra/base'
module Wildcloud
module Websockets
- class Engine
- include Singleton
+ class Service
- def initialize
- @sockets = {}
- @deliveries = {}
- @queue = Queue.new
+ include SessionCallback
+
+ attr_reader :socket_id, :session_id
+
+ def initialize(engine, socket_id)
+ @engine = engine
+ @socket_id = socket_id
+ end
+
+ def onOpen(session)
+ @session = session
+ end
+
+ def onClose
+ @engine.remove_session(self)
+ end
+
+ def onMessage(message)
+ @engine.handle_message(self, message)
+ end
+
+ def publish(message)
+ @session.send(message)
+ end
+
+ def onError(error)
+ error.print_stack_trace
+ end
+
+ end
+
+ class Pusher
+
+ include HTTParty
+
+ def self.queue
+ @queue ||= Queue.new
+ end
+
+ def initialize(engine)
+ @engine = engine
@thread = Thread.new do
loop do
begin
- url, data = @queue.pop
- url = URI.parse(url)
- http = Net::HTTP.new(url.host, url.port)
- post = Net::HTTP::Post.new(url.path)
- post.body = data
- post['Content-Type'] = 'text/plain'
- response = http.request(post)
+ handler(Pusher.queue.pop)
rescue Exception => e
puts e
end
end
end
- # TODO: remove?
- authorize('echo', '')
- authorize('close', '')
- authorize('disabled_websocket_echo', '')
end
- def authorize(app_id, user_id)
- socket_id = "#{app_id}#{user_id}"
- @sockets[socket_id] = {:socket_id => socket_id, :app_id => app_id, :user_id => user_id, :sockets => {}, :callback => 'http://localhost:4000/publish/appclient'}
- socket_id
+ def handler(message)
+ message = JSON.parse(message)
+ self.class.post(message['callback'], :body => JSON.dump({ 'message' => message['message'], 'socket_id' => message['socket_id'] }))
end
- def validate(socket_id, session_id = nil)
- if session_id
- @sockets.key?(socket_id) && @sockets[socket_id][:sockets].key?(session_id)
- else
- @sockets.key?(socket_id)
- end
+ end
+
+ class Api < Sinatra::Base
+
+ get '/' do
+ 'Api'
+ end
+ post '/acceptor' do
+ data = JSON.parse(request.body.read)
+ HTTParty.post("http://localhost:4567/publish/#{data['socket_id']}", :body => data['message'])
end
- def add_socket(socket_id, session_id, socket)
- return socket.close('Go away!') if socket_id == 'close' # TODO: remove?
+ post '/publish/:socket_id' do
+ settings.engine.publish("Echo: #{request.body.read}", params[:socket_id])
+ 'ok'
+ end
- return nil unless @sockets.key?(socket_id)
+ post '/authorize/:application_id/:client_id' do
+ data = JSON.parse(request.body.read)
+ settings.engine.authorize(params[:application_id], params[:client_id], data[:callback])
+ end
- return socket.close('Another connection still open', 2010) if @sockets[socket_id][:sockets].key?(session_id)
+ end
- @sockets[socket_id][:sockets][session_id] = socket
+ class Engine
- #message, @deliveries[socket_id] = @deliveries[socket_id], []
- #publish(socket_id, message)
- end
+ import SessionCallbackFactory
- def remove_socket(socket_id, session_id, socket)
- return nil unless @sockets.key?(socket_id)
- @sockets[socket_id][:sockets].delete_if { |k,v| v == socket }
+ def initialize
+ @router = ServiceRouter.new
+ @server = Server.new(@router, '0.0.0.0', 8081)
+ @pusher = Pusher.new(self)
+ @sockets = {}
+ @applications = {}
+
+ authorize('demo', 'demo', 'http://localhost:4567/acceptor')
+ register_socket('1e16f7bc75de48ae2a156466a3d0521f525a3187')
end
- def publish(socket_id, message)
- if @sockets[socket_id] && @sockets[socket_id][:sockets].size > 0
- @sockets[socket_id][:sockets].each { |id, socket| socket.call(message) }
- else
- (@deliveries[socket_id] ||= []) << message
+ def start
+ @server.start
+ Api.set(:engine, self)
+ Thread.new do
+ Api.run!
end
end
- def on_message(socket_id, message)
- # TODO: remove?
- case socket_id
- when 'echo'
- publish('echo', message)
- when 'disabled_websocket_echo'
- publish('disabled_websocket_echo', message)
- else
- @queue << [@sockets[socket_id][:callback], message]
+ def authorize(application_id, client_id, callback)
+ socket_id = Digest::SHA1.hexdigest("#{application_id}:#{client_id}")
+ @applications[socket_id] = { :application_id => application_id, :client_id => client_id, :callback => callback }
+ end
+
+ def publish(message, socket_id)
+ @sockets[socket_id].each { |session| session.publish(message) }
+ end
+
+ def handle_message(session, message)
+ Pusher.queue << JSON.dump({ 'callback' => @applications[session.socket_id][:callback], 'socket_id' => session.socket_id, 'message' => message })
+ end
+
+ def register_socket(socket_id)
+ service = Service.new(self, socket_id)
+ @router.register_service("/#{socket_id}", service, true, 128 * 1024)
+ ( @sockets[socket_id] ||= [] ) << service
+ end
+
+ def remove_session(session)
+ if @sockets.key?(session.socket_id)
+ @sockets[session.socket_id].delete(session)
end
end
Oops, something went wrong.

0 comments on commit 4ec3b6e

Please sign in to comment.