Skip to content

Commit

Permalink
fixup! refactor: gateway
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Aug 8, 2023
1 parent a493f43 commit bf88c80
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 30 deletions.
3 changes: 1 addition & 2 deletions gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1445,9 +1445,8 @@ func (gateway *Handle) getWorkspaceID(writeKey string) string {

type mockRequestHandler struct{}

func (mockRequestHandler) ProcessRequest(gateway *Handle, w *http.ResponseWriter, r *http.Request, reqType string, payload []byte, writeKey string) string {
func (mockRequestHandler) ProcessRequest(w *http.ResponseWriter, r *http.Request, reqType string, payload []byte, writeKey string) string {
// deepsource ignore: Unused method arguments
_ = gateway
_ = w
_ = r
_ = reqType
Expand Down
5 changes: 2 additions & 3 deletions gateway/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,10 +582,9 @@ func (gateway *Handle) printStats(ctx context.Context) {
}
}

// getPayloadFromRequest reads the request body and returns the payload's bytes or an error if the payload cannot be read
// ProcessWebRequest is an Interface wrapper for webhook
// ProcessWebRequest is an interface wrapper for webhook
func (gateway *Handle) ProcessWebRequest(w *http.ResponseWriter, r *http.Request, reqType string, payload []byte, writeKey string) string {
return gateway.rrh.ProcessRequest(gateway, w, r, reqType, payload, writeKey)
return gateway.rrh.ProcessRequest(w, r, reqType, payload, writeKey)

Check warning on line 587 in gateway/handle.go

View check run for this annotation

Codecov / codecov/patch

gateway/handle.go#L586-L587

Added lines #L586 - L587 were not covered by tests
}

// getPayloadAndWriteKey reads the request body and returns the payload's bytes and write key or an error if the payload cannot be read
Expand Down
9 changes: 6 additions & 3 deletions gateway/handle_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (gateway *Handle) webRequestHandler(rh RequestHandler, w http.ResponseWrite
errorMessage = err.Error()
return
}
errorMessage = rh.ProcessRequest(gateway, &w, r, reqType, payload, writeKey)
errorMessage = rh.ProcessRequest(&w, r, reqType, payload, writeKey)
atomic.AddUint64(&gateway.ackCount, 1)
gateway.TrackRequestMetrics(errorMessage)
if errorMessage != "" {
Expand All @@ -60,15 +60,18 @@ func (gateway *Handle) pixelWebRequestHandler(rh RequestHandler, w http.Response
var errorMessage string
defer func() {
if errorMessage != "" {
gateway.logger.Info(fmt.Sprintf("IP: %s -- %s -- Error while handling request: %s", misc.GetIPFromReq(r), r.URL.Path, errorMessage))
gateway.logger.Infow("Error while handling request",
"ip", misc.GetIPFromReq(r),
"path", r.URL.Path,
"error", errorMessage)
}

Check warning on line 67 in gateway/handle_http.go

View check run for this annotation

Codecov / codecov/patch

gateway/handle_http.go#L56-L67

Added lines #L56 - L67 were not covered by tests
}()
payload, writeKey, err := gateway.getPayloadAndWriteKey(w, r, reqType)
if err != nil {
errorMessage = err.Error()
return
}
errorMessage = rh.ProcessRequest(gateway, &w, r, reqType, payload, writeKey)
errorMessage = rh.ProcessRequest(&w, r, reqType, payload, writeKey)

atomic.AddUint64(&gateway.ackCount, 1)
gateway.TrackRequestMetrics(errorMessage)

Check warning on line 77 in gateway/handle_http.go

View check run for this annotation

Codecov / codecov/patch

gateway/handle_http.go#L69-L77

Added lines #L69 - L77 were not covered by tests
Expand Down
4 changes: 2 additions & 2 deletions gateway/handle_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ func (gateway *Handle) Setup(
gateway.netHandle = &http.Client{Transport: &http.Transport{}, Timeout: gateway.conf.httpTimeout}
gateway.userWorkerBatchRequestQ = make(chan *userWorkerBatchRequestT, gateway.conf.maxDBBatchSize)
gateway.batchUserWorkerBatchRequestQ = make(chan *batchUserWorkerBatchRequestT, gateway.conf.maxDBWriterProcess)
gateway.irh = &ImportRequestHandler{}
gateway.rrh = &RegularRequestHandler{}
gateway.irh = &ImportRequestHandler{Handle: gateway}
gateway.rrh = &RegularRequestHandler{Handle: gateway}
gateway.webhookHandler = webhook.Setup(gateway, gateway.stats)
whURL, err := url.ParseRequestURI(misc.GetWarehouseURL())
if err != nil {
Expand Down
10 changes: 6 additions & 4 deletions gateway/import_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,20 @@ import (
)

// ImportRequestHandler is an empty struct to capture import specific request handling functionality
type ImportRequestHandler struct{}
type ImportRequestHandler struct {
*Handle
}

// ProcessRequest on ImportRequestHandler splits payload by user and throws them into the webrequestQ and waits for all their responses before returning
func (*ImportRequestHandler) ProcessRequest(gateway *Handle, w *http.ResponseWriter, r *http.Request, _ string, payload []byte, writeKey string) string {
usersPayload, payloadError := gateway.getUsersPayload(payload)
func (irh *ImportRequestHandler) ProcessRequest(w *http.ResponseWriter, r *http.Request, _ string, payload []byte, writeKey string) string {
usersPayload, payloadError := irh.getUsersPayload(payload)
if payloadError != nil {
return payloadError.Error()
}
count := len(usersPayload)
done := make(chan string, count)
for key := range usersPayload {
gateway.addToWebRequestQ(w, r, done, "batch", usersPayload[key], writeKey)
irh.addToWebRequestQ(w, r, done, "batch", usersPayload[key], writeKey)
}

var interimMsgs []string
Expand Down
12 changes: 7 additions & 5 deletions gateway/regular_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@ import (
)

// RegularRequestHandler is an empty struct to capture non-import specific request handling functionality
type RegularRequestHandler struct{}
type RegularRequestHandler struct {
*Handle
}

// ProcessRequest throws a webRequest into the queue and waits for the response before returning
func (*RegularRequestHandler) ProcessRequest(gateway *Handle, w *http.ResponseWriter, r *http.Request, reqType string, payload []byte, writeKey string) string {
func (rrh *RegularRequestHandler) ProcessRequest(w *http.ResponseWriter, r *http.Request, reqType string, payload []byte, writeKey string) string {
done := make(chan string, 1)
start := time.Now()
gateway.addToWebRequestQ(w, r, done, reqType, payload, writeKey)
gateway.addToWebRequestQWaitTime.SendTiming(time.Since(start))
defer gateway.processRequestTime.Since(start)
rrh.addToWebRequestQ(w, r, done, reqType, payload, writeKey)
rrh.addToWebRequestQWaitTime.SendTiming(time.Since(start))
defer rrh.processRequestTime.Since(start)
errorMessage := <-done
return errorMessage
}
20 changes: 9 additions & 11 deletions gateway/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,12 @@ import (

// RequestHandler interface for abstracting out server-side import request processing and rest of the calls
type RequestHandler interface {
ProcessRequest(gateway *Handle, w *http.ResponseWriter, r *http.Request, reqType string, payload []byte, writeKey string) string
ProcessRequest(w *http.ResponseWriter, r *http.Request, reqType string, payload []byte, writeKey string) string
}

/*
Basic WebRequest unit.
Contains some payload, could be of several types(batch, identify, track etc.)
has a `done` channel that receives a response(error if any)
*/
// webRequestT acts as a basic unit for web requests.
// Contains some payload, could be of several types(batch, identify, track etc.)
// has a `done` channel that receives a response(error if any)
type webRequestT struct {
done chan<- string
reqType string
Expand Down Expand Up @@ -47,11 +45,11 @@ type sourceDebugger struct {
writeKey string
}

// Basic worker unit that works on incoming webRequests.
//
// Has three channels used to communicate between the two goroutines each worker runs.
//
// One to receive new webRequests, one to send batches of said webRequests and the third to receive errors if any in response to sending the said batches to dbWriterWorker.
// userWebRequestWorkerT is a basic worker unit that works on incoming webRequests.
// It has three channels used to communicate between the two goroutines each worker runs:
// - one to receive new webRequests,
// - one to send batches of said webRequests
// - one to receive errors if any in response to sending the said batches to dbWriterWorker
type userWebRequestWorkerT struct {
webRequestQ chan *webRequestT
batchRequestQ chan *batchWebRequestT
Expand Down

0 comments on commit bf88c80

Please sign in to comment.