Permalink
Browse files

Implement proxy support for Union Station.

  • Loading branch information...
1 parent 1b7d81b commit 2c09250d31997ce9715fd70881b89c92c51b9b82 @FooBarWidget FooBarWidget committed Aug 8, 2011
@@ -282,6 +282,8 @@ DEFINE_SERVER_STR_CONFIG_SETTER(cmd_passenger_temp_dir, tempDir)
DEFINE_SERVER_STR_CONFIG_SETTER(cmd_union_station_gateway_address, unionStationGatewayAddress)
DEFINE_SERVER_INT_CONFIG_SETTER(cmd_union_station_gateway_port, unionStationGatewayPort, int, 1)
DEFINE_SERVER_STR_CONFIG_SETTER(cmd_union_station_gateway_cert, unionStationGatewayCert)
+DEFINE_SERVER_STR_CONFIG_SETTER(cmd_union_station_proxy_address, unionStationProxyAddress)
+DEFINE_SERVER_STR_CONFIG_SETTER(cmd_union_station_proxy_type, unionStationProxyType)
DEFINE_SERVER_STR_CONFIG_SETTER(cmd_passenger_analytics_log_dir, analyticsLogDir)
DEFINE_SERVER_STR_CONFIG_SETTER(cmd_passenger_analytics_log_user, analyticsLogUser)
DEFINE_SERVER_STR_CONFIG_SETTER(cmd_passenger_analytics_log_group, analyticsLogGroup)
@@ -619,6 +621,16 @@ const command_rec passenger_commands[] = {
NULL,
RSRC_CONF,
"The Union Station Gateway certificate."),
+ AP_INIT_TAKE1("UnionStationProxyAddress",
+ (Take1Func) cmd_union_station_proxy_address,
+ NULL,
+ RSRC_CONF,
+ "The address of the proxy that should be used for sending data to Union Station."),
+ AP_INIT_TAKE1("UnionStationProxyType",
+ (Take1Func) cmd_union_station_proxy_type,
+ NULL,
+ RSRC_CONF,
+ "The type of the proxy that should be used for sending data to Union Station."),
AP_INIT_TAKE1("PassengerAnalyticsLogDir",
(Take1Func) cmd_passenger_analytics_log_dir,
NULL,
@@ -399,6 +399,8 @@ struct ServerConfig {
string unionStationGatewayAddress;
int unionStationGatewayPort;
string unionStationGatewayCert;
+ string unionStationProxyAddress;
+ string unionStationProxyType;
/** Directory in which analytics logs should be saved. */
string analyticsLogDir;
@@ -421,7 +423,9 @@ struct ServerConfig {
tempDir = getSystemTempDir();
unionStationGatewayAddress = DEFAULT_UNION_STATION_GATEWAY_ADDRESS;
unionStationGatewayPort = DEFAULT_UNION_STATION_GATEWAY_PORT;
- unionStationGatewayCert = "";
+ unionStationGatewayCert = string();
+ unionStationProxyAddress = string();
+ unionStationProxyType = string();
analyticsLogUser = DEFAULT_ANALYTICS_LOG_USER;
analyticsLogGroup = DEFAULT_ANALYTICS_LOG_GROUP;
analyticsLogPermissions = DEFAULT_ANALYTICS_LOG_PERMISSIONS;
@@ -466,6 +470,13 @@ struct ServerConfig {
"/passenger-analytics-logs." +
username;
}
+
+ if (unionStationProxyType != ""
+ && unionStationProxyType != "http"
+ && unionStationProxyType != "socks5") {
+ throw ConfigurationException(string("The option 'UnionStationProxyType' ") +
+ "may only be set to 'http' or 'socks5'.");
+ }
}
};
@@ -1404,6 +1404,8 @@ class Hooks {
serverConfig.unionStationGatewayAddress,
serverConfig.unionStationGatewayPort,
serverConfig.unionStationGatewayCert,
+ serverConfig.unionStationProxyAddress,
+ serverConfig.unionStationProxyType,
serverConfig.prestartURLs);
analyticsLogger = ptr(new AnalyticsLogger(agentsStarter.getLoggingSocketAddress(),
@@ -61,6 +61,8 @@ agents_starter_start(AgentsStarter *as,
const char *unionStationGatewayAddress,
unsigned short unionStationGatewayPort,
const char *unionStationGatewayCert,
+ const char *unionStationProxyAddress,
+ const char *unionStationProxyType,
const char **prestartURLs, unsigned int prestartURLsCount,
const AfterForkCallback afterFork,
void *callbackArgument,
@@ -91,6 +93,8 @@ agents_starter_start(AgentsStarter *as,
unionStationGatewayAddress,
unionStationGatewayPort,
unionStationGatewayCert,
+ unionStationProxyAddress,
+ unionStationProxyType,
setOfprestartURLs,
afterForkFunctionObject);
return 1;
@@ -61,6 +61,8 @@ int agents_starter_start(AgentsStarter *as,
const char *unionStationGatewayAddress,
unsigned short unionStationGatewayPort,
const char *unionStationGatewayCert,
+ const char *unionStationProxyAddress,
+ const char *unionStationProxyType,
const char **prestartURLs, unsigned int prestartURLsCount,
const AfterForkCallback afterFork,
void *callbackArgument,
@@ -384,6 +384,8 @@ class AgentsStarter {
const string &unionStationGatewayAddress,
unsigned short unionStationGatewayPort,
const string &unionStationGatewayCert,
+ const string &unionStationProxyAddress,
+ const string &unionStationProxyType,
const set<string> &prestartURLs,
const function<void ()> &afterFork = function<void ()>())
{
@@ -425,6 +427,8 @@ class AgentsStarter {
.set ("union_station_gateway_address", unionStationGatewayAddress)
.setInt ("union_station_gateway_port", unionStationGatewayPort)
.set ("union_station_gateway_cert", realUnionStationGatewayCert)
+ .set ("union_station_proxy_address", unionStationProxyAddress)
+ .set ("union_station_proxy_type", unionStationProxyType)
.set ("prestart_urls", serializePrestartURLs(prestartURLs));
SocketPair fds;
@@ -185,7 +185,8 @@ class LoggingServer: public EventedMessageServer {
virtual void dump(ostream &stream) const {
stream << " Log file: file=" << filename << ", "
"opened=" << opened << ", "
- "age=" << long(ev_now(server->getLoop()) - lastUsed) << "\n";
+ "lastUsed=" << long(ev_now(server->getLoop()) - lastUsed) << "s ago, "
+ "lastFlushed=" << long(ev_now(server->getLoop()) - lastFlushed) << "s ago\n";
}
};
@@ -268,7 +269,8 @@ class LoggingServer: public EventedMessageServer {
"node=" << nodeName << ", "
"category=" << category << ", "
"opened=" << opened << ", "
- "age=" << long(ev_now(server->getLoop()) - lastUsed) << ", "
+ "lastUsed=" << long(ev_now(server->getLoop()) - lastUsed) << "s ago, "
+ "lastFlushed=" << long(ev_now(server->getLoop()) - lastFlushed) << "s ago, "
"bufferSize=" << bufferSize <<
"\n";
}
@@ -1206,11 +1208,15 @@ class LoggingServer: public EventedMessageServer {
gid_t gid = GROUP_NOT_GIVEN,
const string &unionStationGatewayAddress = DEFAULT_UNION_STATION_GATEWAY_ADDRESS,
unsigned short unionStationGatewayPort = DEFAULT_UNION_STATION_GATEWAY_PORT,
- const string &unionStationGatewayCert = "")
+ const string &unionStationGatewayCert = "",
+ const string &unionStationProxyAddress = "",
+ const string &unionStationProxyPort = "")
: EventedMessageServer(loop, fd, accountsDatabase),
remoteSender(unionStationGatewayAddress,
unionStationGatewayPort,
- unionStationGatewayCert),
+ unionStationGatewayCert,
+ unionStationProxyAddress,
+ unionStationProxyPort),
garbageCollectionTimer(loop),
sinkFlushingTimer(loop),
exitTimer(loop)
@@ -162,7 +162,9 @@ main(int argc, char *argv[]) {
false, DEFAULT_UNION_STATION_GATEWAY_ADDRESS);
int unionStationGatewayPort = options.getInt("union_station_gateway_port",
false, DEFAULT_UNION_STATION_GATEWAY_PORT);
- string unionStationGatewayCert = options.get("union_station_gateway_cert", false);
+ string unionStationGatewayCert = options.get("union_station_gateway_cert", false);
+ string unionStationProxyAddress = options.get("union_station_proxy_address", false);
+ string unionStationProxyType = options.get("union_station_proxy_type", false);
curl_global_init(CURL_GLOBAL_ALL);
@@ -251,7 +253,9 @@ main(int argc, char *argv[]) {
"u=rwx,g=rx,o=rx", GROUP_NOT_GIVEN,
unionStationGatewayAddress,
unionStationGatewayPort,
- unionStationGatewayCert);
+ unionStationGatewayCert,
+ unionStationProxyAddress,
+ unionStationProxyType);
loggingServer = &server;
@@ -276,6 +280,7 @@ main(int argc, char *argv[]) {
/********** Initialized! Enter main loop... **********/
+ P_DEBUG("Logging agent online, listening at " << socketAddress);
ev_loop(eventLoop, 0);
return exitCode;
} catch (const tracable_exception &e) {
@@ -72,6 +72,8 @@ class RemoteSender {
string ip;
unsigned short port;
string certificate;
+ string proxyAddress;
+ string proxyType;
CURL *curl;
struct curl_slist *headers;
@@ -103,6 +105,16 @@ class RemoteSender {
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curlDataReceived);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, this);
+ if (!proxyAddress.empty()) {
+ curl_easy_setopt(curl, CURLOPT_PROXY, proxyAddress.c_str());
+ if (proxyType.empty() || proxyType == "http") {
+ curl_easy_setopt(curl, CURLOPT_PROXYTYPE, CURLPROXY_HTTP);
+ } else if (proxyType == "socks5") {
+ curl_easy_setopt(curl, CURLOPT_PROXYTYPE, CURLPROXY_SOCKS5);
+ } else {
+ throw RuntimeException("Only 'http' and 'socks5' proxies are supported.");
+ }
+ }
if (certificate.empty()) {
curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0);
} else {
@@ -129,10 +141,14 @@ class RemoteSender {
}
public:
- Server(const string &ip, const string &hostName, unsigned short port, const string &cert) {
+ Server(const string &ip, const string &hostName, unsigned short port, const string &cert,
+ const string &proxyAddress, const string &proxyType)
+ {
this->ip = ip;
this->port = port;
certificate = cert;
+ this->proxyAddress = proxyAddress;
+ this->proxyType = proxyType;
hostHeader = "Host: " + hostName;
headers = NULL;
@@ -245,6 +261,8 @@ class RemoteSender {
string gatewayAddress;
unsigned short gatewayPort;
string certificate;
+ string proxyAddress;
+ string proxyType;
BlockingQueue<Item> queue;
oxt::thread *thr;
@@ -298,7 +316,8 @@ class RemoteSender {
servers.clear();
for (it = ips.begin(); it != ips.end(); it++) {
- ServerPtr server(new Server(*it, gatewayAddress, gatewayPort, certificate));
+ ServerPtr server = make_shared<Server>(*it, gatewayAddress, gatewayPort,
+ certificate, proxyAddress, proxyType);
if (server->ping()) {
servers.push_back(server);
} else {
@@ -418,12 +437,15 @@ class RemoteSender {
}
public:
- RemoteSender(const string &gatewayAddress, unsigned short gatewayPort, const string &certificate)
+ RemoteSender(const string &gatewayAddress, unsigned short gatewayPort, const string &certificate,
+ const string &proxyAddress, const string &proxyType)
: queue(1024)
{
this->gatewayAddress = gatewayAddress;
this->gatewayPort = gatewayPort;
this->certificate = certificate;
+ this->proxyAddress = proxyAddress;
+ this->proxyType = proxyType;
thr = new oxt::thread(
boost::bind(&RemoteSender::threadMain, this),
"RemoteSender thread",
@@ -57,6 +57,11 @@ static ngx_path_init_t ngx_http_proxy_temp_path = {
};
+static int
+ngx_str_equals(ngx_str_t *str, const char *value) {
+ return ngx_memn2cmp(str->data, (u_char *) value, str->len, strlen(value)) == 0;
+}
+
void *
passenger_create_main_conf(ngx_conf_t *cf)
{
@@ -92,6 +97,10 @@ passenger_create_main_conf(ngx_conf_t *cf)
conf->union_station_gateway_port = (ngx_uint_t) NGX_CONF_UNSET;
conf->union_station_gateway_cert.data = NULL;
conf->union_station_gateway_cert.len = 0;
+ conf->union_station_proxy_address.data = NULL;
+ conf->union_station_proxy_address.len = 0;
+ conf->union_station_proxy_type.data = NULL;
+ conf->union_station_proxy_type.len = 0;
conf->prestart_uris = ngx_array_create(cf->pool, 1, sizeof(ngx_str_t));
if (conf->prestart_uris == NULL) {
@@ -234,6 +243,18 @@ passenger_init_main_conf(ngx_conf_t *cf, void *conf_pointer)
conf->union_station_gateway_cert.data = (u_char *) "";
}
+ if (conf->union_station_proxy_address.len == 0) {
+ conf->union_station_proxy_address.data = (u_char *) "";
+ }
+
+ if (conf->union_station_proxy_type.len == 0) {
+ conf->union_station_proxy_type.data = (u_char *) "";
+
+ } else if (!ngx_str_equals(&conf->union_station_proxy_type, "http")
+ && !ngx_str_equals(&conf->union_station_proxy_type, "socks5")) {
+ return "union_station_proxy_type may only be 'http' or 'socks5'.";
+ }
+
return NGX_CONF_OK;
}
@@ -1145,6 +1166,20 @@ const ngx_command_t passenger_commands[] = {
offsetof(passenger_main_conf_t, union_station_gateway_cert),
NULL },
+ { ngx_string("union_station_proxy_address"),
+ NGX_HTTP_MAIN_CONF | NGX_CONF_TAKE1,
+ ngx_conf_set_str_slot,
+ NGX_HTTP_MAIN_CONF_OFFSET,
+ offsetof(passenger_main_conf_t, union_station_proxy_address),
+ NULL },
+
+ { ngx_string("union_station_proxy_type"),
+ NGX_HTTP_MAIN_CONF | NGX_CONF_TAKE1,
+ ngx_conf_set_str_slot,
+ NGX_HTTP_MAIN_CONF_OFFSET,
+ offsetof(passenger_main_conf_t, union_station_proxy_type),
+ NULL },
+
{ ngx_string("union_station_filter"),
NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_HTTP_LIF_CONF | NGX_CONF_TAKE1,
union_station_filter,
@@ -80,6 +80,8 @@ typedef struct {
ngx_str_t union_station_gateway_address;
ngx_uint_t union_station_gateway_port;
ngx_str_t union_station_gateway_cert;
+ ngx_str_t union_station_proxy_address;
+ ngx_str_t union_station_proxy_type;
ngx_array_t *prestart_uris;
} passenger_main_conf_t;
@@ -239,6 +239,8 @@ start_helper_server(ngx_cycle_t *cycle) {
char *analytics_log_permissions;
char *union_station_gateway_address;
char *union_station_gateway_cert;
+ char *union_station_proxy_address;
+ char *union_station_proxy_type;
char *error_message = NULL;
core_conf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);
@@ -256,6 +258,8 @@ start_helper_server(ngx_cycle_t *cycle) {
analytics_log_permissions = ngx_str_null_terminate(&passenger_main_conf.analytics_log_permissions);
union_station_gateway_address = ngx_str_null_terminate(&passenger_main_conf.union_station_gateway_address);
union_station_gateway_cert = ngx_str_null_terminate(&passenger_main_conf.union_station_gateway_cert);
+ union_station_proxy_address = ngx_str_null_terminate(&passenger_main_conf.union_station_proxy_address);
+ union_station_proxy_type = ngx_str_null_terminate(&passenger_main_conf.union_station_proxy_type);
prestart_uris = (ngx_str_t *) passenger_main_conf.prestart_uris->elts;
prestart_uris_ary = calloc(sizeof(char *), passenger_main_conf.prestart_uris->nelts);
@@ -284,6 +288,8 @@ start_helper_server(ngx_cycle_t *cycle) {
union_station_gateway_address,
passenger_main_conf.union_station_gateway_port,
union_station_gateway_cert,
+ union_station_proxy_address,
+ union_station_proxy_type,
(const char **) prestart_uris_ary, passenger_main_conf.prestart_uris->nelts,
starting_helper_server_after_fork,
cycle,
@@ -355,6 +361,8 @@ start_helper_server(ngx_cycle_t *cycle) {
free(analytics_log_permissions);
free(union_station_gateway_address);
free(union_station_gateway_cert);
+ free(union_station_proxy_address);
+ free(union_station_proxy_type);
free(error_message);
if (prestart_uris_ary != NULL) {
for (i = 0; i < passenger_main_conf.prestart_uris->nelts; i++) {

0 comments on commit 2c09250

Please sign in to comment.