Skip to content

Commit

Permalink
Merge c9f46e7 into a093273
Browse files Browse the repository at this point in the history
  • Loading branch information
igorvpcleao committed Mar 28, 2018
2 parents a093273 + c9f46e7 commit a0d5e5c
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 10 deletions.
12 changes: 7 additions & 5 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

[[constraint]]
name = "github.com/topfreegames/extensions"
version = "5.9.0"
version = "5.11.1"

[[constraint]]
name = "github.com/go-gorp/gorp"
Expand Down
2 changes: 2 additions & 0 deletions api/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/topfreegames/extensions/jaeger"
extnethttpmiddleware "github.com/topfreegames/extensions/middleware"
"github.com/topfreegames/extensions/mongo/interfaces"
extworkermiddleware "github.com/topfreegames/extensions/worker/middleware"
"github.com/topfreegames/khan/es"
"github.com/topfreegames/khan/log"
"github.com/topfreegames/khan/models"
Expand Down Expand Up @@ -515,6 +516,7 @@ func (app *App) configureGoWorkers() {
workers.Logger = dlog.New(&buf, "test: ", 0)
}

workers.Middleware.Append(extworkermiddleware.NewResponseTimeMetricsMiddleware(app.DDStatsD))
workers.Process(queues.KhanQueue, app.Dispatcher.PerformDispatchHook, workerCount)
workers.Process(queues.KhanESQueue, app.ESWorker.PerformUpdateES, workerCount)
workers.Process(queues.KhanMongoQueue, app.MongoWorker.PerformUpdateMongo, workerCount)
Expand Down
23 changes: 23 additions & 0 deletions api/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import (
"github.com/valyala/fasttemplate"
)

const hookInternalFailures = "hook_internal_failures"
const requestingHookMilliseconds = "requesting_hook_milliseconds"
const requestingHookURLStatus = "requesting_hook_status"

//Dispatcher is responsible for sending web hooks to workers
type Dispatcher struct {
app *App
Expand Down Expand Up @@ -104,6 +108,7 @@ func basicAuth(username, password string) string {
// PerformDispatchHook dispatches web hooks for a specific game and event type
func (d *Dispatcher) PerformDispatchHook(m *workers.Msg) {
app := d.app
statsd := app.DDStatsD

item := m.Args()
data := item.MustMap()
Expand Down Expand Up @@ -143,6 +148,9 @@ func (d *Dispatcher) PerformDispatchHook(m *workers.Msg) {
requestURL, err := d.interpolateURL(hook.URL, payload)
if err != nil {
app.addError()
tags := []string{"error:interpolateurl"}
statsd.Increment(hookInternalFailures, tags...)

log.E(l, "Could not interpolate webhook.", func(cm log.CM) {
cm.Write(
zap.String("requestURL", hook.URL),
Expand All @@ -164,6 +172,9 @@ func (d *Dispatcher) PerformDispatchHook(m *workers.Msg) {
parsedURL, err := url.Parse(requestURL)
if err != nil {
app.addError()
tags := []string{"error:parserequesturl"}
statsd.Increment(hookInternalFailures, tags...)

log.E(l, "Could not parse request requestURL.", func(cm log.CM) {
cm.Write(
zap.String(requestURL, hook.URL),
Expand All @@ -184,15 +195,27 @@ func (d *Dispatcher) PerformDispatchHook(m *workers.Msg) {
req.SetRequestURI(requestURL)
resp := fasthttp.AcquireResponse()

start := time.Now()
err = client.DoTimeout(req, resp, timeout)
if err != nil {
app.addError()
tags := []string{"error:timeout"}
statsd.Increment(hookInternalFailures, tags...)

log.E(l, "Could not request webhook.", func(cm log.CM) {
cm.Write(zap.String("requestURL", hook.URL), zap.Error(err))
})
continue
}

// elapsed must be set after checking the error
// in order avoid noising the avg time with timeouts
elapsed := time.Since(start)
statsd.Timing(requestingHookMilliseconds, elapsed)

tags := []string{fmt.Sprintf("status:%d", resp.StatusCode())}
statsd.Increment(requestingHookURLStatus, tags...)

if resp.StatusCode() > 399 {
app.addError()
log.E(l, "Could not request webhook.", func(cm log.CM) {
Expand Down
8 changes: 6 additions & 2 deletions models/es_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package models

import (
"context"
"time"
"encoding/json"
"time"

"github.com/jrallison/go-workers"
"github.com/topfreegames/khan/es"
Expand Down Expand Up @@ -36,7 +36,7 @@ func (w *ESWorker) PerformUpdateES(m *workers.Msg) {

index := data["index"].(string)
op := data["op"].(string)
clan := data["clan"].(map[string]interface {})
clan := data["clan"].(map[string]interface{})
clanID := data["clanID"].(string)

l := w.Logger.With(
Expand Down Expand Up @@ -65,6 +65,7 @@ func (w *ESWorker) PerformUpdateES(m *workers.Msg) {
l.Error("Failed to index clan into Elastic Search")
return
}

l.Info("Successfully indexed clan into Elastic Search.", zap.Duration("latency", time.Now().Sub(start)))
} else if op == "update" {
_, err := w.ES.Client.
Expand All @@ -77,6 +78,7 @@ func (w *ESWorker) PerformUpdateES(m *workers.Msg) {
if err != nil {
l.Error("Failed to update clan from Elastic Search.", zap.Error(err))
}

l.Info("Successfully updated clan from Elastic Search.", zap.Duration("latency", time.Now().Sub(start)))
} else if op == "delete" {
_, err := w.ES.Client.
Expand All @@ -85,9 +87,11 @@ func (w *ESWorker) PerformUpdateES(m *workers.Msg) {
Type("clan").
Id(clanID).
Do(context.TODO())

if err != nil {
l.Error("Failed to delete clan from Elastic Search.", zap.Error(err))
}

l.Info("Successfully deleted clan from Elastic Search.", zap.Duration("latency", time.Now().Sub(start)))
}
}
Expand Down
1 change: 0 additions & 1 deletion models/mongo_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ func (w *MongoWorker) configureMongoWorker(config *viper.Viper) {
func (w *MongoWorker) PerformUpdateMongo(m *workers.Msg) {
item := m.Args()
data := item.MustMap()

game := data["game"].(string)
op := data["op"].(string)
clan := data["clan"].(map[string]interface{})
Expand Down
2 changes: 1 addition & 1 deletion util/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@
package util

// VERSION identifies Khan's current version
var VERSION = "3.1.0"
var VERSION = "3.2.0"

0 comments on commit a0d5e5c

Please sign in to comment.