Skip to content
This repository has been archived by the owner on Sep 14, 2019. It is now read-only.

Commit

Permalink
[medium/bug] fixed couple issues in agent proxy support
Browse files Browse the repository at this point in the history
  • Loading branch information
jvehent committed Jul 20, 2014
1 parent cc0658b commit bf080b1
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 10 deletions.
1 change: 1 addition & 0 deletions src/mig/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func runAgent(foreground bool) (err error) {
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("Init failed: '%v'", err)}.Err()
if foreground {
// if in foreground mode, don't retry, just panic
time.Sleep(1 * time.Second)
panic(err)
}
if ctx.Agent.Respawn {
Expand Down
53 changes: 43 additions & 10 deletions src/mig/agent/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ import (
"runtime"
"time"

"bufio"

"bitbucket.org/kardianos/osext"
"github.com/streadway/amqp"
)
Expand Down Expand Up @@ -349,6 +351,7 @@ func initMQ(orig_ctx Context, try_proxy bool) (ctx Context, err error) {
if err != nil {
panic(err)
}
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("AMQP: host=%s, port=%d, vhost=%s", amqp_uri.Host, amqp_uri.Port, amqp_uri.Vhost)}.Debug()
if amqp_uri.Scheme == "amqps" {
ctx.MQ.UseTLS = true
}
Expand All @@ -357,34 +360,62 @@ func initMQ(orig_ctx Context, try_proxy bool) (ctx Context, err error) {
var dialConfig amqp.Config
dialConfig.Heartbeat = 2 * ctx.Sleeper
if try_proxy {
panic("no proxy support available")
dialConfig.Dial = func(network, addr string) (net.Conn, error) {
// if in try_proxy mode, the agent will try to connect to the relay using a CONNECT proxy
// but because CONNECT is a HTTP method, not available in AMQP, we need to establish
// that connection ourselves.
//
// the code below looks for HTTP(S)_PROXY environment variables and, if present, tries
// to connect to the relay through the proxy, and returns the net.Conn pointer back
ctx.Channels.Log <- mig.Log{Desc: "Attempting connection to relay using environment proxies"}.Debug()
dialConfig.Dial = func(network, addr string) (conn net.Conn, err error) {
// make a fake request object to get the proxy from env
target := "http://" + amqp_uri.Host + ":" + fmt.Sprintf("%d", amqp_uri.Port) + amqp_uri.Vhost
target := "http://" + addr
req, err := http.NewRequest("GET", target, nil)
if err != nil {
return nil, err
return
}
proxy_url, err := http.ProxyFromEnvironment(req)
if err != nil {
return nil, err
return
}
if proxy_url == nil {
err = fmt.Errorf("Failed to find a suitable proxy in environment")
return
}
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("Found proxy at %s", proxy_url.Host)}.Debug()
// connect to the proxy
conn, err := net.DialTimeout("tcp", proxy_url.Host, 2*ctx.Sleeper)
conn, err = net.DialTimeout("tcp", proxy_url.Host, 5*time.Second)
if err != nil {
return nil, err
return
}
// write a CONNECT request in the tcp connection
fmt.Fprintf(conn, "CONNECT "+req.Host+" HTTP/1.1\r\nHost: "+req.Host+"\r\n\r\n")
return conn, nil
// verify status is 200, and flush the buffer
status, err := bufio.NewReader(conn).ReadString('\n')
if err != nil {
return
}
if status == "" || len(status) < 12 {
err = fmt.Errorf("Invalid status received from proxy: '%s'", status[0:len(status)-1])
return
}
// 9th character in response should be "2"
// HTTP/1.0 200 Connection established
// ^
if status[9] != '2' {
err = fmt.Errorf("Invalid status received from proxy: '%s'", status[0:len(status)-1])
return
}
return
}
} else {
dialConfig.Dial = func(network, addr string) (net.Conn, error) {
return net.DialTimeout(network, addr, 2*ctx.Sleeper)
return net.DialTimeout(network, addr, 5*time.Second)
}
}

if ctx.MQ.UseTLS {
ctx.Channels.Log <- mig.Log{Desc: "Loading AMQPS TLS parameters"}.Debug()
// import the client certificates
cert, err := tls.X509KeyPair([]byte(AGENTCERT), []byte(AGENTKEY))
if err != nil {
Expand All @@ -403,9 +434,11 @@ func initMQ(orig_ctx Context, try_proxy bool) (ctx Context, err error) {

dialConfig.TLSClientConfig = &TLSconfig
}
// Open a non-encrypted AMQP connection
// Open AMQP connection
ctx.Channels.Log <- mig.Log{Desc: "Establishing connection to relay"}.Debug()
ctx.MQ.conn, err = amqp.DialConfig(AMQPBROKER, dialConfig)
if err != nil {
ctx.Channels.Log <- mig.Log{Desc: "Connection failed"}.Debug()
panic(err)
}

Expand Down

0 comments on commit bf080b1

Please sign in to comment.