Skip to content
This repository has been archived by the owner on Oct 11, 2023. It is now read-only.

Commit

Permalink
Fixed a data race & added more debug logs
Browse files Browse the repository at this point in the history
  • Loading branch information
Maxime CORBIN authored and camskkz committed Dec 5, 2018
1 parent 289dc07 commit e8606a3
Show file tree
Hide file tree
Showing 29 changed files with 6,093 additions and 197 deletions.
4 changes: 3 additions & 1 deletion .gitlab-ci.yml
Expand Up @@ -43,19 +43,21 @@ go-build:
- ln -fsv ${CI_PROJECT_DIR} ${GOPATH}/src/github.com/vente-privee/influxdb-relay
script:
- cd ${GOPATH}/src/github.com/vente-privee/influxdb-relay
- go get
- go build -a -ldflags '-extldflags "-static"' -o influxdb-relay


# tests
go-tests:
stage: test
image: vpgrp/golang:latest
image: vpgrp/golang
before_script:
- mkdir -p ${GOPATH}/src/github.com/vente-privee
- ln -fsv ${CI_PROJECT_DIR} ${GOPATH}/src/github.com/vente-privee/influxdb-relay
- go get github.com/stretchr/testify/assert
script:
- cd ${GOPATH}/src/github.com/vente-privee/influxdb-relay
- go get
- go test ./...


Expand Down
7 changes: 4 additions & 3 deletions relay/http.go
Expand Up @@ -42,7 +42,8 @@ type HTTP struct {
logger *log.Logger
}

type relayHandlerFunc func(h *HTTP, w http.ResponseWriter, r *http.Request)

type relayHandlerFunc func(h *HTTP, w http.ResponseWriter, r *http.Request, start time.Time)
type relayMiddleware func(h *HTTP, handlerFunc relayHandlerFunc) relayHandlerFunc

// Default HTTP settings and a few constants
Expand Down Expand Up @@ -170,10 +171,10 @@ func (h *HTTP) Stop() error {
// ServeHTTP is the function that handles the different route
// The response is a JSON object describing the state of the operation
func (h *HTTP) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.start = time.Now()
//h.start = time.Now()

if fun, ok := handlers[r.URL.Path]; ok {
allMiddlewares(h, fun)(h, w, r)
allMiddlewares(h, fun)(h, w, r, time.Now())
} else {
jsonResponse(w, response{http.StatusNotFound, http.StatusText(http.StatusNotFound)})
return
Expand Down
18 changes: 12 additions & 6 deletions relay/http_handlers.go
Expand Up @@ -6,12 +6,13 @@ import (
"log"
"net/http"
"sync"
"time"

"github.com/influxdata/influxdb/models"
"github.com/vente-privee/influxdb-relay/config"
)

func (h *HTTP) handleStatus(w http.ResponseWriter, r *http.Request) {
func (h *HTTP) handleStatus(w http.ResponseWriter, r *http.Request, _ time.Time) {
if r.Method == http.MethodGet || r.Method == http.MethodHead {
st := make(map[string]map[string]string)

Expand All @@ -28,7 +29,7 @@ func (h *HTTP) handleStatus(w http.ResponseWriter, r *http.Request) {
}
}

func (h *HTTP) handlePing(w http.ResponseWriter, r *http.Request) {
func (h *HTTP) handlePing(w http.ResponseWriter, r *http.Request, _ time.Time) {
if r.Method == http.MethodGet || r.Method == http.MethodHead {
for key, value := range h.pingResponseHeaders {
w.Header().Add(key, value)
Expand All @@ -40,7 +41,7 @@ func (h *HTTP) handlePing(w http.ResponseWriter, r *http.Request) {
}
}

func (h *HTTP) handleAdmin(w http.ResponseWriter, r *http.Request) {
func (h *HTTP) handleAdmin(w http.ResponseWriter, r *http.Request, _ time.Time) {
// Client to perform the raw queries
client := http.Client{}

Expand Down Expand Up @@ -136,7 +137,7 @@ func (h *HTTP) handleAdmin(w http.ResponseWriter, r *http.Request) {
}
}

func (h *HTTP) handleStandard(w http.ResponseWriter, r *http.Request) {
func (h *HTTP) handleStandard(w http.ResponseWriter, r *http.Request, start time.Time) {
if r.Method != http.MethodPost {
w.Header().Set("Allow", http.MethodPost)
if r.Method == http.MethodOptions {
Expand All @@ -152,7 +153,7 @@ func (h *HTTP) handleStandard(w http.ResponseWriter, r *http.Request) {
_, _ = bodyBuf.ReadFrom(r.Body)

precision := queryParams.Get("precision")
points, err := models.ParsePointsWithPrecision(bodyBuf.Bytes(), h.start, precision)
points, err := models.ParsePointsWithPrecision(bodyBuf.Bytes(), start, precision)
if err != nil {
putBuf(bodyBuf)
jsonResponse(w, response{http.StatusBadRequest, "unable to parse points"})
Expand Down Expand Up @@ -194,6 +195,7 @@ func (h *HTTP) handleStandard(w http.ResponseWriter, r *http.Request) {
resp, err := b.post(outBytes, query, authHeader)
if err != nil {
log.Printf("Problem posting to relay %q backend %q: %v", h.Name(), b.name, err)
log.Printf("Content: %s", bodyBuf.String())

responses <- &responseData{}
} else {
Expand All @@ -213,6 +215,8 @@ func (h *HTTP) handleStandard(w http.ResponseWriter, r *http.Request) {

var errResponse *responseData

w.Header().Set("Content-Type", "text/plain")

for resp := range responses {

switch resp.StatusCode / 100 {
Expand Down Expand Up @@ -248,7 +252,7 @@ func (h *HTTP) handleStandard(w http.ResponseWriter, r *http.Request) {
}
}

func (h *HTTP) handleProm(w http.ResponseWriter, r *http.Request) {
func (h *HTTP) handleProm(w http.ResponseWriter, r *http.Request, _ time.Time) {
if r.Method != http.MethodPost {
w.Header().Set("Allow", http.MethodPost)
if r.Method == http.MethodOptions {
Expand Down Expand Up @@ -303,6 +307,8 @@ func (h *HTTP) handleProm(w http.ResponseWriter, r *http.Request) {

var errResponse *responseData

w.Header().Set("Content-Type", "text/plain")

for resp := range responses {

switch resp.StatusCode / 100 {
Expand Down

0 comments on commit e8606a3

Please sign in to comment.