Permalink
Browse files

Initial websocket support

  • Loading branch information...
1 parent e821301 commit 2850b6261a137d521681be97897df6b0e88b33c6 @rubys committed Apr 28, 2012
Showing with 221 additions and 5 deletions.
  1. +98 −0 demo/diskusage.rb
  2. +1 −0 lib/wunderbar.rb
  3. +5 −3 lib/wunderbar/job-control.rb
  4. +2 −2 lib/wunderbar/version.rb
  5. +115 −0 lib/wunderbar/websocket.rb
View
@@ -0,0 +1,98 @@
+require 'wunderbar'
+
+_html do
+ _head_ do
+ _script src: '/jquery.min.js'
+ _script src: '/jquery.tablesorter.min.js'
+ _style %{
+ p {margin: 0; height: 1.2em}
+ table {border-spacing: 1em 0}
+ th, td {padding: 0.2em 0.5em}
+ th {border-bottom: solid black}
+ tbody tr:hover {background-color: #FF8}
+ td:nth-child(2) {text-align: right}
+ #msg {display: none; height: 9em; overflow: auto; border: 1px solid}
+ .stdin {color: purple}
+ .stderr {color: red}
+ }
+ end
+
+ _body? do
+ _div.status!
+ _div.msg!
+
+ # directory is DOCUMENT_ROOT + PATH_INFO
+ dir = env['DOCUMENT_ROOT'].dup.untaint
+ prefix = "#{env['REQUEST_URI']}/" if not env['PATH_INFO'].to_s.end_with? '/'
+ if env['PATH_INFO'].to_s.start_with? '/'
+ info = File.expand_path(env['PATH_INFO'][1..-1].untaint, dir)
+ info.taint unless info.start_with?(dir) and File.exist?(info)
+ dir = info
+ end
+
+ _h1 dir
+
+ # initial table (names and dates without sizes)
+ _table_ do
+ _thead do
+ _tr do
+ _th 'Name'
+ _th 'Size'
+ _th 'Date'
+ end
+ end
+ _tbody do
+ Dir.chdir(dir) do
+ Dir['*'].sort.each do |name|
+ _tr_ do
+ href = nil
+ href = "#{prefix}#{name}/" if File.directory? name.untaint
+ _td {_a name, href: href}
+ _td
+ _td File.stat(File.join(dir, name.untaint)).mtime
+ end
+ end
+ end
+ end
+ end
+
+ # extract sizes in a background process
+ port = _.websocket {Dir.chdir(dir) {system 'du -sb *'}}
+
+ _script %{
+ ws = new WebSocket("ws://#{env['HTTP_HOST']}:#{port}/");
+ ws.onclose = function() {$("#status").html("<p>socket closed.</p>")}
+ ws.onopen = function() {$("#status").html("<p>socket connected...</p>")};
+
+ ws.onmessage = function(evt) {
+ var data = JSON.parse(evt.data);
+ var match = data.line.match(/^(\\d+)\\s+(.*)/);
+
+ // update table using output from 'du' command
+ if (data.type == 'stdout' && match) {
+ $('tbody tr').each(function() {
+ if (match && $("td:first a", this).text() == match[2]) {
+ $("td", this).eq(1).text(match[1])
+ match = null
+ }
+ })
+ }
+
+ // display all other messages received
+ if (data.type != 'stdout' || match) {
+ var msg = $("#msg");
+ msg.append($("<p></p>").text(data.line).addClass(data.type));
+ msg.prop('scrollTop', msg.prop("scrollHeight") - msg.height());
+ if (data.type != 'stdin') msg.show();
+ }
+
+ // make table sortable
+ $('table').tablesorter();
+ }
+ }
+ end
+end
+
+__END__
+# Customize what directory is searched
+$ROOT = ENV['DOCUMENT_ROOT']
View
@@ -11,5 +11,6 @@
require 'wunderbar/job-control'
require 'wunderbar/logger'
require 'wunderbar/server'
+require 'wunderbar/websocket'
W_ = Wunderbar
@@ -1,6 +1,6 @@
# run command/block as a background daemon
module Wunderbar
- def submit(cmd=nil)
+ def self.submit(cmd=nil)
fork do
# detach from tty
Process.setsid
@@ -17,8 +17,10 @@ def submit(cmd=nil)
# clear environment of cgi cruft
require 'cgi'
- ENV.delete_if {|key,value| key =~ /^HTTP_/}
- CGI::QueryExtension.public_instance_methods.each do |method|
+ ENV.keys.select {|key| key =~ /^HTTP_/}.each do |key|
+ ENV.delete key.dup.untaint
+ end
+ ::CGI::QueryExtension.public_instance_methods.each do |method|
ENV.delete method.to_s.upcase
end
View
@@ -1,8 +1,8 @@
module Wunderbar
module VERSION #:nodoc:
MAJOR = 0
- MINOR = 12
- TINY = 1
+ MINOR = 13
+ TINY = 0
STRING = [MAJOR, MINOR, TINY].join('.')
end
View
@@ -0,0 +1,115 @@
+#!/usr/bin/ruby
+require 'rubygems'
+require 'open3'
+require 'socket'
+
+begin
+ require 'em-websocket'
+rescue LoadError
+ module EM
+ class Channel
+ def initialize
+ require 'em-websocket'
+ end
+ end
+ end
+end
+
+module Wunderbar
+ class Channel < EM::Channel
+ attr_reader :port
+
+ def initialize(port, limit=nil)
+ TCPSocket.new('localhost', port).close
+ raise ArgumentError.new "Socket #{port} is not available"
+ rescue Errno::ECONNREFUSED
+ super()
+ @port = port
+ @memory = []
+ @memory_channel = subscribe do |msg|
+ @memory << msg.chomp unless Symbol === msg
+ @memory.shift while limit and @memory.length > limit
+ end
+ websocket.run
+ end
+
+ def websocket
+ @websocket ||= Thread.new do
+ EM::WebSocket.start(:host => '0.0.0.0', :port => @port) do |ws|
+ ws.onopen {@memory.each {|msg| ws.send msg }}
+
+ sid = subscribe do |msg|
+ if msg == :shutdown
+ ws.close_websocket
+ else
+ ws.send msg
+ end
+ end
+
+ ws.onclose {unsubscribe sid}
+ end
+ end
+ end
+
+ def _(msg=nil, &block)
+ return self if msg==nil
+ push(msg.to_json)
+ end
+
+ def system(command)
+ Open3.popen3(command) do |pin, pout, perr|
+ _ :type=>:stdin, :line=>command
+ [
+ Thread.new do
+ pout.sync=true
+ _ :type=>:stdout, :line=>pout.readline.chomp until pout.eof?
+ end,
+ Thread.new do
+ perr.sync=true
+ _ :type=>:stderr, :line=>perr.readline.chomp until perr.eof?
+ end,
+ Thread.new { pin.close }
+ ].each {|thread| thread.join}
+ end
+ end
+
+ def close
+ unsubscribe @memory_channel if @memory_channel
+ push :shutdown
+ sleep 1
+ EM::WebSocket.stop
+ websocket.join
+ end
+ end
+
+ if defined? EventMachine::WebSocket
+ def self.websocket(port=nil, &block)
+ if not port
+ socket = TCPServer.new(0)
+ port = Socket.unpack_sockaddr_in(socket.getsockname).first
+ socket.close
+ end
+
+ submit do
+ begin
+ channel = Wunderbar::Channel.new(port)
+ channel.instance_eval &block
+ rescue Exception => exception
+ channel._ :type=>:stderr, :line=>exception.inspect
+ exception.backtrace.each do |frame|
+ next if Wunderbar::CALLERS_TO_IGNORE.any? {|re| frame =~ re}
+ channel._ :type=>:stderr, :line=>" #{frame}"
+ end
+ ensure
+ channel.push :shutdown
+ sleep 5
+ channel.close if channel
+ end
+ end
+
+ sleep 1
+
+ port
+ end
+ end
+end

0 comments on commit 2850b62

Please sign in to comment.