Skip to content

Commit

Permalink
Improve upstream scaffolding and DNS resolving
Browse files Browse the repository at this point in the history
  • Loading branch information
na-- committed Oct 14, 2015
1 parent abad3ee commit a44ed8a
Show file tree
Hide file tree
Showing 15 changed files with 221 additions and 110 deletions.
2 changes: 1 addition & 1 deletion app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Application struct {
cacheZones map[string]*types.CacheZone

// A map with all simple and advanced upstream transports
upstreams map[string]http.RoundTripper
upstreams map[string]types.Upstream

// The default logger
logger types.Logger
Expand Down
6 changes: 3 additions & 3 deletions app/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (a *Application) initFromConfig() (err error) {
// Make the vhost and cacheZone maps
a.virtualHosts = make(map[string]*VirtualHost)
a.cacheZones = make(map[string]*types.CacheZone)
a.upstreams = make(map[string]http.RoundTripper)
a.upstreams = make(map[string]types.Upstream)

// Create a global application context
a.ctx, a.ctxCancel = context.WithCancel(context.Background())
Expand All @@ -48,7 +48,7 @@ func (a *Application) initFromConfig() (err error) {

// Initialize all advanced upstreams
for _, cfgUp := range a.cfg.HTTP.Upstreams {
if a.upstreams[cfgUp.ID], err = upstream.New(cfgUp); err != nil {
if a.upstreams[cfgUp.ID], err = upstream.New(cfgUp, a.logger); err != nil {
return err
}
}
Expand Down Expand Up @@ -96,7 +96,7 @@ func (a *Application) initCacheZone(cfgCz *config.CacheZone) (err error) {
return nil
}

func (a *Application) getUpstream(upID string) (http.RoundTripper, error) {
func (a *Application) getUpstream(upID string) (types.Upstream, error) {
if upID == "" {
return nil, nil
}
Expand Down
4 changes: 4 additions & 0 deletions config/section_upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type Upstream struct {
// UpstreamSettings contains all possible upstream settings.
type UpstreamSettings struct {
MaxConnectionsPerServer uint32 `json:"max_connections_per_server"`
UseIPv4 bool `json:"use_ipv4"`
UseIPv6 bool `json:"use_ipv6"`
//!TODO: add settings for timeouts, keep-alives, retries, etc.
}

Expand Down Expand Up @@ -90,6 +92,8 @@ func (addr *UpstreamAddress) UnmarshalJSON(buff []byte) error {
func GetDefaultUpstreamSettings() UpstreamSettings {
return UpstreamSettings{
MaxConnectionsPerServer: 0, // Unlimited connection number by default
UseIPv4: true,
UseIPv6: false,
//!TODO: add settings for timeouts, keep-alives, retries, etc.
}
}
119 changes: 71 additions & 48 deletions handler/proxy/impl.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package proxy

import (
"fmt"
"io"
"net"
"net/http"
Expand All @@ -20,13 +21,14 @@ import (
// sends it to another server, proxying the response back to the
// client.
type ReverseProxy struct {
// The transport used to perform proxy requests.
// If nil, http.DefaultTransport is used.
Transport http.RoundTripper
// The transport used to perform upstream requests.
Upstream types.Upstream

// Logger specifies a logger for errors that occur when attempting
// to proxy the request.
Logger types.Logger

Settings Settings
}

// RequestHandle implements the nedomi interface
Expand All @@ -37,10 +39,6 @@ func (p *ReverseProxy) RequestHandle(_ context.Context, w http.ResponseWriter, r
// Hop-by-hop headers. These are removed when sent to the backend.
var hopHeaders = httputils.GetHopByHopHeaders()

type requestCanceler interface {
CancelRequest(*http.Request)
}

type runOnFirstRead struct {
io.Reader // optional; nil means empty body

Expand All @@ -58,53 +56,78 @@ func (c *runOnFirstRead) Read(bs []byte) (int, error) {
return c.Reader.Read(bs)
}

func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
transport := p.Transport
if transport == nil {
transport = http.DefaultTransport
}

func (p *ReverseProxy) getOutRequest(req *http.Request) (*http.Request, error) {
outreq := new(http.Request)
*outreq = *req // includes shallow copies of maps, but okay
httputils.CopyHeadersWithout(req.Header, outreq.Header, hopHeaders...)
*outreq = *req
url := *req.URL
outreq.URL = &url

if closeNotifier, ok := rw.(http.CloseNotifier); ok {
if requestCanceler, ok := transport.(requestCanceler); ok {
reqDone := make(chan struct{})
defer close(reqDone)

clientGone := closeNotifier.CloseNotify()

outreq.Body = struct {
io.Reader
io.Closer
}{
Reader: &runOnFirstRead{
Reader: outreq.Body,
fn: func() {
go func() {
select {
case <-clientGone:
requestCanceler.CancelRequest(outreq)
case <-reqDone:
}
}()
},
},
Closer: outreq.Body,
}
}
}
outreq.Header = http.Header{}
httputils.CopyHeadersWithout(req.Header, outreq.Header, hopHeaders...)
outreq.Header.Set("User-Agent", p.Settings.UserAgent) // If we don't set it, Go sets it for us to something stupid...

outreq.Proto = "HTTP/1.1"
outreq.ProtoMajor = 1
outreq.ProtoMinor = 1
outreq.Close = false
// If we don't set it, Go sets it for us to something stupid...
outreq.Header.Set("User-Agent", "nedomi") //!TODO: get user-agent from config

//!TODO: get host value from config
//outreq.Host = l.UpstreamAddress.Host
upAddr, err := p.Upstream.GetAddress(req.URL.RequestURI())
if err != nil {
return nil, fmt.Errorf("[%p] Proxy handler could not get an upstream address: %v", req, err)
}
outreq.URL.Scheme = upAddr.ResolvedURL.Scheme
outreq.URL.Host = upAddr.ResolvedURL.Host

// Set the correct host
if p.Settings.HostHeader != "" {
outreq.Host = p.Settings.HostHeader
} else if p.Settings.HostHeaderKeepOriginal {
if req.Host != "" {
outreq.Host = req.Host
} else {
outreq.Host = req.URL.Host
}
} else {
outreq.Host = upAddr.URL.Host
}

return outreq, nil
}

func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {

outreq, err := p.getOutRequest(req)
if err != nil {
p.Logger.Error(err)
http.Error(rw, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}

if closeNotifier, ok := rw.(http.CloseNotifier); ok {
reqDone := make(chan struct{})
defer close(reqDone)

clientGone := closeNotifier.CloseNotify()

outreq.Body = struct {
io.Reader
io.Closer
}{
Reader: &runOnFirstRead{
Reader: outreq.Body,
fn: func() {
go func() {
select {
case <-clientGone:
p.Upstream.CancelRequest(outreq)
case <-reqDone:
}
}()
},
},
Closer: outreq.Body,
}
}

if clientIP, _, err := net.SplitHostPort(req.RemoteAddr); err == nil {
// If we aren't the first proxy retain prior
Expand All @@ -116,10 +139,10 @@ func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
outreq.Header.Set("X-Forwarded-For", clientIP)
}

res, err := transport.RoundTrip(outreq)
res, err := p.Upstream.RoundTrip(outreq)
if err != nil {
p.Logger.Logf("[%p] Proxy error: %v", req, err)
rw.WriteHeader(http.StatusInternalServerError)
http.Error(rw, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}

Expand Down
20 changes: 18 additions & 2 deletions handler/proxy/new.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package proxy

import (
"encoding/json"
"fmt"

"github.com/ironsmile/nedomi/config"
Expand All @@ -9,6 +10,9 @@ import (

// Settings contains the possible settings for the proxy
type Settings struct {
UserAgent string `json:"user_agent"`
HostHeader string `json:"host_header"`
HostHeaderKeepOriginal bool `json:"host_header_keep_original"`
}

// New returns a configured and ready to use Upstream instance.
Expand All @@ -17,9 +21,21 @@ func New(cfg *config.Handler, l *types.Location, next types.RequestHandler) (*Re
return nil, fmt.Errorf("No upstream set for proxy handler in %s", l.Name)
}

s := Settings{
UserAgent: "nedomi",
}

if len(cfg.Settings) != 0 {
if err := json.Unmarshal(cfg.Settings, &s); err != nil {
return nil, fmt.Errorf("handler.proxy got error while parsing settings: %s", err)
}
}

//!TODO: record statistics (times, errors, etc.) for all requests

return &ReverseProxy{
Transport: l.Upstream,
Logger: l.Logger,
Upstream: l.Upstream,
Logger: l.Logger,
Settings: s,
}, nil
}
3 changes: 1 addition & 2 deletions types/location.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package types

import (
"net/http"
"net/url"
"time"
)
Expand All @@ -14,7 +13,7 @@ type Location struct {
CacheDefaultDuration time.Duration
CacheKeyIncludesQuery bool
Cache *CacheZone //!TODO: move to the cache handler settings (plus all Cache* settings)
Upstream http.RoundTripper
Upstream Upstream
Logger Logger
}

Expand Down
13 changes: 13 additions & 0 deletions types/upstream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package types

import "net/http"

// Upstream represents an object that is used by the proxy handler for making
// requests to the configured upstream server or servers.
type Upstream interface {
http.RoundTripper

CancelRequest(*http.Request)

GetAddress(string) (*UpstreamAddress, error)
}
12 changes: 4 additions & 8 deletions types/upstream_address.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
package types

import (
"net"
"net/url"
)
import "net/url"

// UpstreamAddress represents a resolved upstream address
type UpstreamAddress struct {
URL *url.URL
IP net.IP
Port int
Weight float64
URL *url.URL
ResolvedURL *url.URL
Weight float64
}
4 changes: 2 additions & 2 deletions upstream/balancing/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import "github.com/ironsmile/nedomi/types"
// requests between the set of upstream addresses according to the specific
// algorithm .
type Algorithm interface {
Set([]types.UpstreamAddress)
Set([]*types.UpstreamAddress)

Get(string) types.UpstreamAddress
Get(string) (*types.UpstreamAddress, error)
}
13 changes: 9 additions & 4 deletions upstream/balancing/random/impl.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package random

import (
"errors"
"math/rand"
"sync"
"time"
Expand All @@ -11,22 +12,26 @@ import (
// Random randomly balances requests between its upstreams.
type Random struct {
sync.RWMutex
buckets []types.UpstreamAddress
buckets []*types.UpstreamAddress
rnd *rand.Rand
}

// Set implements the balancing algorithm interface.
func (r *Random) Set(buckets []types.UpstreamAddress) {
func (r *Random) Set(buckets []*types.UpstreamAddress) {
r.Lock()
defer r.Unlock()
r.buckets = buckets
}

// Get implements the balancing algorithm interface.
func (r *Random) Get(_ string) types.UpstreamAddress {
func (r *Random) Get(_ string) (*types.UpstreamAddress, error) {
r.RLock()
defer r.RUnlock()
return r.buckets[r.rnd.Intn(len(r.buckets))]
if len(r.buckets) == 0 {
return nil, errors.New("No upstream addresses set!")
}

return r.buckets[r.rnd.Intn(len(r.buckets))], nil
}

// New creates a new random upstream balancer.
Expand Down
6 changes: 2 additions & 4 deletions upstream/connection_limiter.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package upstream

import "net/http"

// NewConnectionLimiter creates a wrapper around the supplied RoundTripper that
// newConnectionLimiter creates a wrapper around the supplied RoundTripper that
// restricts the maximum number of concurrent requests through it.
func NewConnectionLimiter(base http.RoundTripper, limit uint32) http.RoundTripper {
func newConnectionLimiter(base upTransport, limit uint32) upTransport {
//!TODO: implement :)

return base
Expand Down
Loading

0 comments on commit a44ed8a

Please sign in to comment.