Permalink
Browse files

Add a unit test for HTTP 101 Switching Protocols support in RequestHa…

…ndler.
  • Loading branch information...
1 parent a8afcb6 commit 3b7f4af99a690c2e99b1a08581cc44770e3f434b @FooBarWidget FooBarWidget committed Jan 28, 2014
@@ -126,7 +126,8 @@ class ProcessList: public list<ProcessPtr> {
* its Sessions, and a Process also outlives the OS process.
*/
class Process: public boost::enable_shared_from_this<Process> {
-private:
+// Actually private, but marked public so that unit tests can access the fields.
+public:
friend class Group;
/** A mutex to protect access to `lifeStatus`. */
@@ -171,6 +171,19 @@ class BufferedIO {
return output;
}
+ /**
+ * Reads a line and returns the line including the newline character. Upon
+ * encountering EOF, the empty string is returned.
+ *
+ * The `max` parameter dictates the maximum length of the returned line.
+ * If the line is longer than this number of characters, then a SecurityException
+ * is thrown, and the BufferedIO becomes unusable (enters an undefined state).
+ *
+ * @throws SystemException
+ * @throws TimeoutException
+ * @throws SecurityException
+ * @throws boost::thread_interrupted
+ */
string readLine(unsigned int max = 1024, unsigned long long *timeout = NULL) {
string output;
readUntil(boost::bind(newlineFound, _1, _2, &output, max), timeout);
@@ -106,14 +106,15 @@ def main_loop(self):
if not client:
done = True
break
+ socket_hijacked = False
try:
try:
env, input_stream = self.parse_request(client)
if env:
if env['REQUEST_METHOD'] == 'ping':
self.process_ping(env, input_stream, client)
else:
- self.process_request(env, input_stream, client)
+ socket_hijacked = self.process_request(env, input_stream, client)
except KeyboardInterrupt:
done = True
except IOError:
@@ -123,16 +124,17 @@ def main_loop(self):
except Exception:
logging.exception("WSGI application raised an exception!")
finally:
- try:
- # Shutdown the socket like this just in case the app
- # spawned a child process that keeps it open.
- client.shutdown(socket.SHUT_WR)
- except:
- pass
- try:
- client.close()
- except:
- pass
+ if not socket_hijacked:
+ try:
+ # Shutdown the socket like this just in case the app
+ # spawned a child process that keeps it open.
+ client.shutdown(socket.SHUT_WR)
+ except:
+ pass
+ try:
+ client.close()
+ except:
+ pass
except KeyboardInterrupt:
pass
@@ -232,7 +234,16 @@ def start_response(status, response_headers, exc_info = None):
headers_set[:] = [status, response_headers]
return write
+ def hijack():
+ env['passenger.hijacked_socket'] = output_stream
+ return output_stream
+
+ env['passenger.hijack'] = hijack
+
result = self.app(env, start_response)
+ if 'passenger.hijacked_socket' in env:
+ # Socket connection hijacked. Don't do anything.
+ return True
try:
for data in result:
# Don't send headers until body appears.
@@ -244,6 +255,7 @@ def start_response(status, response_headers, exc_info = None):
finally:
if hasattr(result, 'close'):
result.close()
+ return False
def process_ping(self, env, input_stream, output_stream):
output_stream.sendall(b"pong")
@@ -95,6 +95,11 @@ def initialize(owner_pipe, options = {})
"analytics_logger",
"pool_account_username"
)
+
+ @force_http_session = ENV["_PASSENGER_FORCE_HTTP_SESSION"] == "true"
+ if @force_http_session
+ @connect_password = nil
+ end
@thread_handler = options["thread_handler"] || ThreadHandler
@concurrency = 1
if options["pool_account_password_base64"]
@@ -114,7 +119,7 @@ def initialize(owner_pipe, options = {})
@server_sockets[:main] = {
:address => @main_socket_address,
:socket => @main_socket,
- :protocol => :session,
+ :protocol => @force_http_session ? :http_session : :session,
:concurrency => @concurrency
}
@@ -125,7 +130,7 @@ def initialize(owner_pipe, options = {})
:protocol => :http,
:concurrency => 1
}
-
+
@owner_pipe = owner_pipe
@options = options
@previous_signal_handlers = {}
@@ -308,7 +313,7 @@ def should_use_unix_sockets?
# is still bugged as of version 1.7.0. They can
# cause unexplicable freezes when used in combination
# with threading.
- return ruby_engine != "jruby"
+ return !@force_http_session && ruby_engine != "jruby"
end
def create_unix_socket_on_filesystem
@@ -400,7 +405,9 @@ def start_threads
main_socket_options = common_options.merge(
:server_socket => @main_socket,
:socket_name => "main socket",
- :protocol => :session
+ :protocol => @server_sockets[:main][:protocol] == :session ?
+ :session :
+ :http
)
http_socket_options = common_options.merge(
:server_socket => @http_socket,
@@ -6,6 +6,7 @@
#include <Utils/json.h>
#include <Utils/IOUtils.h>
#include <Utils/Timer.h>
+#include <Utils/BufferedIO.h>
#include <boost/shared_array.hpp>
#include <string>
@@ -51,6 +52,7 @@ namespace tut {
setPrintAppOutputAsDebuggingMessages(true);
agentOptions.passengerRoot = resourceLocator->getRoot();
+ agentOptions.defaultRubyCommand = DEFAULT_RUBY;
agentOptions.defaultUser = testConfig["default_user"].asString();
agentOptions.defaultGroup = testConfig["default_group"].asString();
root = resourceLocator->getRoot();
@@ -972,6 +974,84 @@ namespace tut {
ensure(containsSubstring(response, "Counter: 2\n"));
}
+ TEST_METHOD(53) {
+ set_test_name("It supports switching protocols when communicating over application session sockets");
+
+ init();
+ connect();
+ sendHeaders(defaultHeaders,
+ "PASSENGER_APP_ROOT", wsgiAppPath.c_str(),
+ "PATH_INFO", "/switch_protocol",
+ "HTTP_UPGRADE", "raw",
+ "HTTP_CONNECTION", "Upgrade",
+ NULL
+ );
+
+ BufferedIO io(connection);
+ string header;
+ bool done = false;
+
+ ensure_equals(io.readLine(), "HTTP/1.1 101 Switching Protocols\r\n");
+
+ do {
+ string line = io.readLine();
+ done = line.empty() || line == "\r\n";
+ if (!done) {
+ header.append(line);
+ }
+ } while (!done);
+
+ ensure("(1)", containsSubstring(header, "Upgrade: raw\r\n"));
+ ensure("(2)", containsSubstring(header, "Connection: Upgrade\r\n"));
+
+ writeExact(connection, "hello\n");
+ ensure_equals(io.readLine(), "Echo: hello\n");
+ }
+
+ TEST_METHOD(54) {
+ set_test_name("It supports switching protocols when communication over application http_session sockets");
+
+ init();
+ connect();
+ sendHeaders(defaultHeaders,
+ "_PASSENGER_FORCE_HTTP_SESSION", "true",
+ "PASSENGER_APP_ROOT", rackAppPath.c_str(),
+ "PASSENGER_APP_TYPE", "rack",
+ "REQUEST_URI", "/switch_protocol",
+ "PATH_INFO", "/switch_protocol",
+ "HTTP_UPGRADE", "raw",
+ "HTTP_CONNECTION", "Upgrade",
+ NULL
+ );
+
+ BufferedIO io(connection);
+ string header;
+ bool done = false;
+ vector<ProcessPtr> processes;
+
+ ensure_equals(io.readLine(), "HTTP/1.1 101 Switching Protocols\r\n");
+ processes = pool->getProcesses();
+ {
+ LockGuard l(pool->syncher);
+ ProcessPtr process = processes[0];
+ ensure_equals(process->sessionSockets.top()->protocol, "http_session");
+ }
+
+ do {
+ string line = io.readLine();
+ done = line.empty() || line == "\r\n";
+ if (!done) {
+ header.append(line);
+ }
+ } while (!done);
+
+ ensure("(1)", containsSubstring(header, "Upgrade: raw\r\n"));
+ ensure("(2)", containsSubstring(header, "Connection: Upgrade\r\n"));
+
+ writeExact(connection, "hello\n");
+ ensure_equals(io.readLine(), "Echo: hello\n");
+ }
+
// Test small response buffering.
// Test large response buffering.
}
View
@@ -75,6 +75,25 @@ app = lambda do |env|
sleep 0.1 # Give HelperAgent the time to process stdout first.
STDERR.puts "hello stderr!"
text_response("ok")
+ when '/switch_protocol'
+ if env['HTTP_UPGRADE'] != 'raw' || env['HTTP_CONNECTION'].downcase != 'upgrade'
+ return [500, { "Content-Type" => "text/plain" }, ["Invalid headers"]]
+ end
+ env['rack.hijack'].call
+ io = env['rack.hijack_io']
+ begin
+ io.write("Status: 101 Switching Protocols\r\n")
+ io.write("Upgrade: raw\r\n")
+ io.write("Connection: Upgrade\r\n")
+ io.write("\r\n")
+ while !io.eof?
+ line = io.readline
+ io.write("Echo: #{line}")
+ io.flush
+ end
+ ensure
+ io.close
+ end
else
[404, { "Content-Type" => "text/plain" }, ["Unknown URI"]]
end
@@ -1,4 +1,4 @@
-import os, time, cgi
+import os, sys, time, cgi
def file_exist(filename):
try:
@@ -7,6 +7,19 @@ def file_exist(filename):
except OSError:
return False
+if sys.version_info[0] >= 3:
+ def bytes_to_str(b):
+ return b.decode()
+
+ def str_to_bytes(s):
+ return s.encode('latin-1')
+else:
+ def bytes_to_str(b):
+ return b
+
+ def str_to_bytes(s):
+ return s
+
def application(env, start_response):
status = '200 OK'
body = None
@@ -131,6 +144,29 @@ def body():
elif path == '/oobw':
start_response(status, [('Content-Type', 'text/plain'), ('X-Passenger-Request-OOB-Work', 'true')])
return [str(os.getpid())]
+ elif path == '/switch_protocol':
+ if env['HTTP_UPGRADE'] != 'raw' or env['HTTP_CONNECTION'].lower() != 'upgrade':
+ status = '500 Internal Server Error'
+ body = str('Invalid headers')
+ start_response(status, [('Content-Type', 'text/plain'), ('Content-Length', len(body))])
+ return [body]
+ socket = env['passenger.hijack']()
+ io = socket.makefile()
+ socket.close()
+ try:
+ io.write(
+ b"HTTP/1.1 101 Switching Protocols\r\n" +
+ b"Upgrade: raw\r\n" +
+ b"Connection: Upgrade\r\n" +
+ b"\r\n")
+ io.flush()
+ line = io.readline()
+ while line != "":
+ io.write("Echo: " + line)
+ io.flush()
+ line = io.readline()
+ finally:
+ io.close()
else:
status = "404 Not Found"
body = "Unknown URI"

0 comments on commit 3b7f4af

Please sign in to comment.