Skip to content
This repository has been archived by the owner on May 17, 2022. It is now read-only.

Commit

Permalink
Added more logging, host validation, and default timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisgoffinet committed Aug 15, 2018
1 parent 117b201 commit 594d835
Showing 1 changed file with 39 additions and 7 deletions.
46 changes: 39 additions & 7 deletions pkg/qflow/qflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"io/ioutil"
"net/http"
_ "net/http/pprof"
"net/url"
"os"
"strings"
"time"

log "github.com/sirupsen/logrus"
Expand All @@ -19,6 +21,7 @@ type Endpoint struct {
Hosts []string
Writer chan interface{}
DurableChannel chan interface{}
Timeout time.Duration
}

type Handler struct {
Expand All @@ -35,22 +38,21 @@ func ReplicateChannel(endpoint *Endpoint) {
count++

if count%1000 == 0 {
log.Debug("Processed batch of 1000")
log.Debug("processed 1000 operations")
}

r := bytes.NewReader(req.Body)
url := fmt.Sprintf("%s%s", endpoint.Hosts[count%sizeEndpoints], req.URL)
proxyReq, err := http.NewRequest(req.Method, url, r)
if err != nil {
fmt.Printf("error: %s\n", err)
log.Debugf("error: %s\n", err)
continue
}

timeout := time.Duration(5 * time.Second)
client := &http.Client{Timeout: timeout}
client := &http.Client{Timeout: endpoint.Timeout}
proxyRes, err := client.Do(proxyReq)
if err != nil {
fmt.Printf("error: %s\n", err)
log.Debugf("error: %s\n", err)
endpoint.Writer <- item
continue
}
Expand All @@ -64,7 +66,7 @@ func (h *Handler) HandleRequest(w http.ResponseWriter, req *http.Request) {
defer req.Body.Close()

if err != nil {
log.Printf("Error reading body: %v", err)
log.Debugf("error reading body: %v", err)
http.Error(w, "can't read body", http.StatusBadRequest)
return
}
Expand All @@ -77,15 +79,29 @@ func (h *Handler) HandleRequest(w http.ResponseWriter, req *http.Request) {

func ListenAndServe(config *Config, addr string, dataDir string) {
var ep []Endpoint
var timeout = config.HTTP.Timeout

if timeout.Seconds() == 0.0 {
timeout = 10 * time.Second
}

if _, err := os.Stat(dataDir); os.IsNotExist(err) {
log.Infof("creating data directory: %s", dataDir)
err = os.MkdirAll(dataDir, 0755)
if err != nil {
log.Fatal(err)
}
}

for _, endpoint := range config.Endpoints {
for _, host := range endpoint.Hosts {
if !isValidUrl(host) {
log.Fatalf("(%s) [%s] is not a valid endpoint url", endpoint.Name, host)
}
}

log.Infof("registered (%s) with endpoints: [%s]", endpoint.Name, strings.Join(endpoint.Hosts, ","))
log.Infof("config options: (http timeout: %s)", timeout)

writer := make(chan interface{})
c := durable.Channel(writer, &durable.Config{
Expand All @@ -103,6 +119,7 @@ func ListenAndServe(config *Config, addr string, dataDir string) {
Hosts: endpoint.Hosts,
Writer: writer,
DurableChannel: c,
Timeout: timeout,
}
ep = append(ep, *e)

Expand All @@ -112,6 +129,21 @@ func ListenAndServe(config *Config, addr string, dataDir string) {

handler := &Handler{Endpoints: ep}
http.HandleFunc("/", handler.HandleRequest)
log.Printf("Listening on %s", addr)
log.Printf("listening on %s", addr)
log.Fatal(http.ListenAndServe(addr, nil))
}

// isValidUrl handles checking if a url is valid
func isValidUrl(s string) bool {
url, err := url.ParseRequestURI(s)

if url.Scheme != "http" && url.Scheme != "https" {
return false
}

if err != nil {
return false
} else {
return true
}
}

0 comments on commit 594d835

Please sign in to comment.