Skip to content
This repository has been archived by the owner on May 17, 2022. It is now read-only.

Commit

Permalink
Add concurrency support for http workers
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisgoffinet committed Aug 15, 2018
1 parent a57dbc7 commit fb512c5
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 27 deletions.
1 change: 1 addition & 0 deletions config.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
http:
timeout: 10s
concurrency: 25

endpoints:
- name: example1
Expand Down
68 changes: 42 additions & 26 deletions pkg/qflow/qflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Endpoint struct {
Hosts []string
Writer chan interface{}
DurableChannel chan interface{}
WorkerChannel chan *durable.Request
Timeout time.Duration
}

Expand All @@ -32,15 +33,6 @@ type Handler struct {
}

var (
rpcDurations = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "rpc_durations_seconds",
Help: "RPC latency distributions.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{"service"},
)

endpointLatencyHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "endpoint_latency_us",
Help: "Endpoint latency distributions in microseconds",
Expand Down Expand Up @@ -68,20 +60,19 @@ var (
})
)

func ReplicateChannel(endpoint *Endpoint) {
// HTTPWorker handles making the remote HTTP calls with a bounded channel concurrency
func HTTPWorker(endpoint *Endpoint) {
var count int
var sizeEndpoints = len(endpoint.Hosts)
var microInNS = time.Microsecond.Nanoseconds()

for {
item := <-endpoint.DurableChannel
req := item.(durable.Request)
count++

if count%1000 == 0 {
log.Debug("processed 1000 operations")
}
defaultRoundTripper := http.DefaultTransport
defaultTransport := defaultRoundTripper.(*http.Transport)
defaultTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} // ignore expired SSL certificates
client := &http.Client{Timeout: endpoint.Timeout, Transport: defaultTransport}

for {
req := <-endpoint.WorkerChannel
r := bytes.NewReader(req.Body)
url := fmt.Sprintf("%s%s", endpoint.Hosts[count%sizeEndpoints], req.URL)
proxyReq, err := http.NewRequest(req.Method, url, r)
Expand All @@ -90,11 +81,6 @@ func ReplicateChannel(endpoint *Endpoint) {
continue
}

defaultRoundTripper := http.DefaultTransport
defaultTransport := defaultRoundTripper.(*http.Transport)
defaultTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} // ignore expired SSL certificates
client := &http.Client{Timeout: endpoint.Timeout, Transport: defaultTransport}

start := time.Now()
endpointRequests.WithLabelValues(endpoint.Name).Inc()
proxyRes, err := client.Do(proxyReq)
Expand All @@ -106,7 +92,7 @@ func ReplicateChannel(endpoint *Endpoint) {
if err != nil {
endpointFailures.WithLabelValues(endpoint.Name).Inc()
log.Debugf("error: %s", err)
endpoint.Writer <- item
endpoint.Writer <- req
continue
}

Expand All @@ -115,6 +101,21 @@ func ReplicateChannel(endpoint *Endpoint) {
}
}

// ReadDiskChannel handles reading from the disk backed channel
func ReadDiskChannel(endpoint *Endpoint) {
var count int
for {
item := <-endpoint.DurableChannel
req := item.(durable.Request)
count++

if count%1000 == 0 {
log.Debug("processed 1000 operations")
}
endpoint.WorkerChannel <- &req
}
}

// HandleRequest handles processing every request sent
func (h *Handler) HandleRequest(w http.ResponseWriter, req *http.Request) {
requests.Inc()
Expand Down Expand Up @@ -142,6 +143,7 @@ func ListenAndServe(config *Config, addr string, dataDir string) {
var ep []Endpoint
var timeout = config.HTTP.Timeout
var maxMsgSize = config.Queue.MaxMessageSize
var concurrency = config.HTTP.Concurrency

if timeout.Seconds() == 0.0 {
timeout = 10 * time.Second
Expand All @@ -151,6 +153,10 @@ func ListenAndServe(config *Config, addr string, dataDir string) {
maxMsgSize = 1024 * 1024 * 10 // 10mb
}

if concurrency == 0 {
concurrency = 25
}

if _, err := os.Stat(dataDir); os.IsNotExist(err) {
log.Infof("creating data directory: %s", dataDir)
err = os.MkdirAll(dataDir, 0755)
Expand All @@ -170,9 +176,14 @@ func ListenAndServe(config *Config, addr string, dataDir string) {
}

log.Infof("registered (%s) with endpoints: [%s]", endpoint.Name, strings.Join(endpoint.Hosts, ","))
log.Infof("config options: (http timeout: %s, maxMsgSize: %d)", timeout, maxMsgSize)
log.Infof("config options: (http timeout: %s, maxMsgSize: %d, concurrency: %d)",
timeout,
maxMsgSize,
concurrency)

writer := make(chan interface{})
worker := make(chan *durable.Request, concurrency)

c := durable.Channel(writer, &durable.Config{
Name: endpoint.Name,
DataPath: dataDir,
Expand All @@ -188,11 +199,16 @@ func ListenAndServe(config *Config, addr string, dataDir string) {
Hosts: endpoint.Hosts,
Writer: writer,
DurableChannel: c,
WorkerChannel: worker,
Timeout: timeout,
}
ep = append(ep, *e)

go ReplicateChannel(e)
for i := 0; i < concurrency; i++ {
go HTTPWorker(e)
}

go ReadDiskChannel(e)

}

Expand Down
3 changes: 2 additions & 1 deletion pkg/qflow/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import (

type Config struct {
HTTP struct {
Timeout time.Duration `yaml:"timeout"`
Concurrency int `yaml:"concurrency"`
Timeout time.Duration `yaml:"timeout"`
}
Queue struct {
MaxMessageSize int32 `yaml:"maxMsgSize"`
Expand Down

0 comments on commit fb512c5

Please sign in to comment.