Skip to content
This repository has been archived by the owner on Sep 10, 2021. It is now read-only.

Commit

Permalink
Browse files Browse the repository at this point in the history
Added direct proxying feature EM.enable_proxy
  • Loading branch information
jakedouglas committed May 8, 2009
1 parent 91d5dc3 commit eb70e07
Show file tree
Hide file tree
Showing 10 changed files with 297 additions and 13 deletions.
25 changes: 25 additions & 0 deletions ext/cmain.cpp
Expand Up @@ -636,3 +636,28 @@ extern "C" int evma_send_file_data_to_connection (const char *binding, const cha
return 0;
}


/****************
evma_start_proxy
*****************/

extern "C" void evma_start_proxy (const char *from, const char *to)
{
ensure_eventmachine("evma_start_proxy");
EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (from));
if (ed)
ed->StartProxy(to);
}


/***************
evma_stop_proxy
****************/

extern "C" void evma_stop_proxy (const char *from)
{
ensure_eventmachine("evma_stop_proxy");
EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (from));
if (ed)
ed->StopProxy();
}
60 changes: 52 additions & 8 deletions ext/ed.cpp
Expand Up @@ -50,6 +50,7 @@ EventableDescriptor::EventableDescriptor (int sd, EventMachine_t *em):
EventCallback (NULL),
bCallbackUnbind (true),
UnbindReasonCode (0),
ProxyTarget(NULL),
MyEventMachine (em)
{
/* There are three ways to close a socket, all of which should
Expand Down Expand Up @@ -93,6 +94,7 @@ EventableDescriptor::~EventableDescriptor()
{
if (EventCallback && bCallbackUnbind)
(*EventCallback)(GetBinding().c_str(), EM_CONNECTION_UNBOUND, NULL, UnbindReasonCode);
StopProxy();
Close();
}

Expand Down Expand Up @@ -166,6 +168,52 @@ bool EventableDescriptor::IsCloseScheduled()
}


/*******************************
EventableDescriptor::StartProxy
*******************************/

void EventableDescriptor::StartProxy(const char *to)
{
EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (to));
if (ed) {
StopProxy();
ProxyTarget = strdup(to);
return;
}
throw std::runtime_error ("Tried to proxy to an invalid descriptor");
}


/******************************
EventableDescriptor::StopProxy
******************************/

void EventableDescriptor::StopProxy()
{
if (ProxyTarget) {
free(ProxyTarget);
ProxyTarget = NULL;
}
}


/********************************************
EventableDescriptor::_GenericInboundDispatch
********************************************/

void EventableDescriptor::_GenericInboundDispatch(const char *buf, int size)
{
assert(EventCallback);

if (!ProxyTarget)
(*EventCallback)(GetBinding().c_str(), EM_CONNECTION_READ, buf, size);
else if (ConnectionDescriptor::SendDataToConnection(ProxyTarget, buf, size) == -1) {
(*EventCallback)(GetBinding().c_str(), EM_PROXY_TARGET_UNBOUND, NULL, 0);
StopProxy();
}
}


/******************************************
ConnectionDescriptor::ConnectionDescriptor
******************************************/
Expand Down Expand Up @@ -553,8 +601,7 @@ void ConnectionDescriptor::_DispatchInboundData (const char *buffer, int size)
while ((s = SslBox->GetPlaintext (B, sizeof(B) - 1)) > 0) {
_CheckHandshakeStatus();
B [s] = 0;
if (EventCallback)
(*EventCallback)(GetBinding().c_str(), EM_CONNECTION_READ, B, s);
_GenericInboundDispatch(B, s);
}

// If our SSL handshake had a problem, shut down the connection.
Expand All @@ -567,14 +614,12 @@ void ConnectionDescriptor::_DispatchInboundData (const char *buffer, int size)
_DispatchCiphertext();
}
else {
if (EventCallback)
(*EventCallback)(GetBinding().c_str(), EM_CONNECTION_READ, buffer, size);
_GenericInboundDispatch(buffer, size);
}
#endif

#ifdef WITHOUT_SSL
if (EventCallback)
(*EventCallback)(GetBinding().c_str(), EM_CONNECTION_READ, buffer, size);
_GenericInboundDispatch(buffer, size);
#endif
}

Expand Down Expand Up @@ -1323,8 +1368,7 @@ void DatagramDescriptor::Read()
memset (&ReturnAddress, 0, sizeof(ReturnAddress));
memcpy (&ReturnAddress, &sin, slen);

if (EventCallback)
(*EventCallback)(GetBinding().c_str(), EM_CONNECTION_READ, readbuffer, r);
_GenericInboundDispatch(readbuffer, r);

}
else {
Expand Down
5 changes: 5 additions & 0 deletions ext/ed.h
Expand Up @@ -80,6 +80,9 @@ class EventableDescriptor: public Bindable_t
struct epoll_event *GetEpollEvent() { return &EpollEvent; }
#endif

virtual void StartProxy(const char*);
virtual void StopProxy();

private:
bool bCloseNow;
bool bCloseAfterWriting;
Expand All @@ -99,10 +102,12 @@ class EventableDescriptor: public Bindable_t
};

void (*EventCallback)(const char*, int, const char*, int);
void _GenericInboundDispatch(const char*, int);

Int64 CreatedAt;
bool bCallbackUnbind;
int UnbindReasonCode;
char *ProxyTarget;

#ifdef HAVE_EPOLL
struct epoll_event EpollEvent;
Expand Down
6 changes: 5 additions & 1 deletion ext/eventmachine.h
Expand Up @@ -34,7 +34,8 @@ extern "C" {
EM_CONNECTION_NOTIFY_READABLE = 106,
EM_CONNECTION_NOTIFY_WRITABLE = 107,
EM_SSL_HANDSHAKE_COMPLETED = 108,
EM_SSL_VERIFY = 109
EM_SSL_VERIFY = 109,
EM_PROXY_TARGET_UNBOUND = 110

};

Expand Down Expand Up @@ -91,6 +92,9 @@ extern "C" {
const char *evma_watch_pid (int);
void evma_unwatch_pid (const char *sig);

void evma_start_proxy(const char*, const char*);
void evma_stop_proxy(const char*);

int evma_set_rlimit_nofile (int n_files);

void evma_set_epoll (int use);
Expand Down
3 changes: 1 addition & 2 deletions ext/kb.cpp
Expand Up @@ -77,6 +77,5 @@ void KeyboardDescriptor::Read()
{
char c;
read (GetSocket(), &c, 1);
if (EventCallback)
(*EventCallback)(GetBinding().c_str(), EM_CONNECTION_READ, &c, 1);
_GenericInboundDispatch(&c, 1);
}
3 changes: 1 addition & 2 deletions ext/pipe.cpp
Expand Up @@ -172,8 +172,7 @@ void PipeDescriptor::Read()
// the option to do some things faster. Additionally it's
// a security guard against buffer overflows.
readbuffer [r] = 0;
if (EventCallback)
(*EventCallback)(GetBinding().c_str(), EM_CONNECTION_READ, readbuffer, r);
_GenericInboundDispatch(readbuffer, r);
}
else if (r == 0) {
break;
Expand Down
34 changes: 34 additions & 0 deletions ext/rubymain.cpp
Expand Up @@ -46,6 +46,7 @@ static VALUE Intern_ssl_handshake_completed;
static VALUE Intern_ssl_verify_peer;
static VALUE Intern_notify_readable;
static VALUE Intern_notify_writable;
static VALUE Intern_proxy_target_unbound;

static VALUE rb_cProcStatus;

Expand Down Expand Up @@ -120,6 +121,13 @@ static void event_callback (struct em_event* e)
evma_accept_ssl_peer (a1);
}
#endif
else if (a2 == EM_PROXY_TARGET_UNBOUND) {
VALUE t = rb_ivar_get (EmModule, Intern_at_conns);
VALUE q = rb_hash_aref (t, rb_str_new2(a1));
if (q == Qnil)
rb_raise (EM_eConnectionNotBound, "unknown connection: %s", a1);
rb_funcall (q, Intern_proxy_target_unbound, 0);
}
else
rb_funcall (EmModule, Intern_event_callback, 3, rb_str_new2(a1), (a2 << 1) | 1, rb_str_new(a3,a4));
}
Expand Down Expand Up @@ -867,6 +875,28 @@ static VALUE t_get_loop_time (VALUE self)
}


/*************
t_start_proxy
**************/

static VALUE t_start_proxy (VALUE self, VALUE from, VALUE to)
{
evma_start_proxy(StringValuePtr(from), StringValuePtr(to));
return Qnil;
}


/************
t_stop_proxy
*************/

static VALUE t_stop_proxy (VALUE self, VALUE from)
{
evma_stop_proxy(StringValuePtr(from));
return Qnil;
}


/*********************
Init_rubyeventmachine
*********************/
Expand All @@ -892,6 +922,7 @@ extern "C" void Init_rubyeventmachine()
Intern_ssl_verify_peer = rb_intern ("ssl_verify_peer");
Intern_notify_readable = rb_intern ("notify_readable");
Intern_notify_writable = rb_intern ("notify_writable");
Intern_proxy_target_unbound = rb_intern ("proxy_target_unbound");

// INCOMPLETE, we need to define class Connections inside module EventMachine
// run_machine and run_machine_without_threads are now identical.
Expand Down Expand Up @@ -924,6 +955,9 @@ extern "C" void Init_rubyeventmachine()
rb_define_module_function (EmModule, "attach_fd", (VALUE (*)(...))t_attach_fd, 3);
rb_define_module_function (EmModule, "detach_fd", (VALUE (*)(...))t_detach_fd, 1);

rb_define_module_function (EmModule, "start_proxy", (VALUE (*)(...))t_start_proxy, 2);
rb_define_module_function (EmModule, "stop_proxy", (VALUE (*)(...))t_stop_proxy, 1);

rb_define_module_function (EmModule, "watch_filename", (VALUE (*)(...))t_watch_filename, 1);
rb_define_module_function (EmModule, "unwatch_filename", (VALUE (*)(...))t_unwatch_filename, 1);

Expand Down
19 changes: 19 additions & 0 deletions lib/em/connection.rb
Expand Up @@ -126,6 +126,25 @@ def ssl_verify_peer(cert)
def unbind
end

# EventMachine::Connection#proxy_target_unbound is called by the reactor after attempting
# to relay incoming data to a descriptor (set as a proxy target descriptor with
# EventMachine::enable_proxy) that has already been closed.
def proxy_target_unbound
end

# EventMachine::Connection#proxy_incoming_to is called only by user code. It sets up
# a low-level proxy relay for all data inbound for this connection, to the connection given
# as the argument. This is essentially just a helper method for enable_proxy.
# See EventMachine::enable_proxy documentation for details.
def proxy_incoming_to(conn)
EventMachine::enable_proxy(self, conn)
end

# Helper method for EventMachine::disable_proxy(self)
def stop_proxying
EventMachine::disable_proxy(self)
end

# EventMachine::Connection#close_connection is called only by user code, and never
# by the event loop. You may call this method against a connection object in any
# callback handler, whether or not the callback was made against the connection
Expand Down
63 changes: 63 additions & 0 deletions lib/eventmachine.rb
Expand Up @@ -1377,6 +1377,69 @@ def self.error_handler cb = nil, &blk
end
end

# enable_proxy allows for direct writing of incoming data back out to another descriptor, at the C++ level in the reactor.
# This is especially useful for proxies where high performance is required. Propogating data from a server response
# all the way up to Ruby, and then back down to the reactor to be sent back to the client, is often unnecessary and
# incurs a significant performance decrease.
#
# The two arguments are Connections, 'from' and 'to'. 'from' is the connection whose inbound data you want
# relayed back out. 'to' is the connection to write it to.
#
# Once you call this method, the 'from' connection will no longer get receive_data callbacks from the reactor,
# except in the case that 'to' connection has already closed when attempting to write to it. You can see
# in the example, that proxy_target_unbound will be called when this occurs. After that, further incoming
# data will be passed into receive_data as normal.
#
# Note also that this feature supports different types of descriptors - TCP, UDP, and pipes. You can relay
# data from one kind to another.
#
# Example:
#
# module ProxyConnection
# def initialize(client, request)
# @client, @request = client, request
# end
#
# def post_init
# EM::enable_proxy(self, @client)
# end
#
# def connection_completed
# send_data @request
# end
#
# def proxy_target_unbound
# close_connection
# end
#
# def unbind
# @client.close_connection_after_writing
# end
# end
#
# module ProxyServer
# def receive_data(data)
# (@buf ||= "") << data
# if @buf =~ /\r\n\r\n/ # all http headers received
# EM.connect("10.0.0.15", 80, ProxyConnection, self, data)
# end
# end
# end
#
# EM.run {
# EM.start_server("127.0.0.1", 8080, ProxyServer)
# }
def self.enable_proxy(from, to)
EM::start_proxy(from.signature, to.signature)
end

# disable_proxy takes just one argument, a Connection that has proxying enabled via enable_proxy.
# Calling this method will remove that functionality and your connection will begin receiving
# data via receive_data again.
def self.disable_proxy(from)
EM::stop_proxy(from.signature)
end

private

def self.event_callback conn_binding, opcode, data # :nodoc:
Expand Down

0 comments on commit eb70e07

Please sign in to comment.