Skip to content

Commit

Permalink
Central processor registration (elastic#229)
Browse files Browse the repository at this point in the history
Register processors centrally, so that processor endpoints and handlers can be all in one place.

This also removes the workarounds with blank identifier imports, and instantiates one processor per request to avoid harmful state
  • Loading branch information
jalvz authored and simitt committed Nov 17, 2017
1 parent 6ffdd63 commit 6c151e0
Show file tree
Hide file tree
Showing 22 changed files with 75 additions and 233 deletions.
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ collect: imports fields go-generate create-docs notice
imports:
@mkdir -p include
@mkdir -p processor
@python ${GOPATH}/src/${BEAT_PATH}/script/generate_imports.py ${BEAT_PATH} > include/list.go

.PHONY: fields
fields:
Expand Down
3 changes: 0 additions & 3 deletions beater/beater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ import (
"net/http/httptest"
"testing"

// make sure processors are loaded
_ "github.com/elastic/apm-server/include"

"github.com/elastic/apm-server/tests"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/monitoring"
Expand Down
57 changes: 39 additions & 18 deletions beater/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,28 @@ import (

"crypto/subtle"

err "github.com/elastic/apm-server/processor/error"
"github.com/elastic/apm-server/processor/healthcheck"
"github.com/elastic/apm-server/processor/transaction"
"github.com/elastic/beats/libbeat/monitoring"
)

type processorHandler func(processor.Processor, Config, reporter) http.Handler
const (
BackendTransactionsURL = "/v1/transactions"
FrontendTransactionsURL = "/v1/client-side/transactions"
BackendErrorsURL = "/v1/errors"
FrontendErrorsURL = "/v1/client-side/errors"
HealthCheckURL = "/healthcheck"
)

type ProcessorFactory func() processor.Processor

type ProcessorHandler func(ProcessorFactory, Config, reporter) http.Handler

type routeMapping struct {
ProcessorHandler
ProcessorFactory
}

var (
serverMetrics = monitoring.Default.NewRegistry("apm-server.server")
Expand All @@ -32,38 +50,39 @@ var (
errForbidden = errors.New("forbidden request")
errPOSTRequestOnly = errors.New("only POST requests are supported")

handlerMap = map[int]processorHandler{
processor.Backend: backendHandler,
processor.Frontend: frontendHandler,
processor.HealthCheck: healthCheckHandler,
Routes = map[string]routeMapping{
BackendTransactionsURL: {backendHandler, transaction.NewProcessor},
FrontendTransactionsURL: {frontendHandler, transaction.NewProcessor},
BackendErrorsURL: {backendHandler, err.NewProcessor},
FrontendErrorsURL: {frontendHandler, err.NewProcessor},
HealthCheckURL: {healthCheckHandler, healthcheck.NewProcessor},
}
)

func newMuxer(config Config, report reporter) *http.ServeMux {
mux := http.NewServeMux()

for path, p := range processor.Registry.Processors() {
handler := handlerMap[p.Type()]
for path, mapping := range Routes {
logp.Info("Path %s added to request handler", path)
mux.Handle(path, handler(p, config, report))
mux.Handle(path, mapping.ProcessorHandler(mapping.ProcessorFactory, config, report))
}

return mux
}

func backendHandler(p processor.Processor, config Config, report reporter) http.Handler {
func backendHandler(pf ProcessorFactory, config Config, report reporter) http.Handler {
return logHandler(
authHandler(config.SecretToken,
processRequestHandler(p, config, report)))
processRequestHandler(pf, config, report)))
}

func frontendHandler(p processor.Processor, config Config, report reporter) http.Handler {
func frontendHandler(pf ProcessorFactory, config Config, report reporter) http.Handler {
return logHandler(
frontendSwitchHandler(config.EnableFrontend,
processRequestHandler(p, config, report)))
processRequestHandler(pf, config, report)))
}

func healthCheckHandler(_ processor.Processor, _ Config, _ reporter) http.Handler {
func healthCheckHandler(_ ProcessorFactory, _ Config, _ reporter) http.Handler {
return logHandler(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
sendStatus(w, r, 200, nil)
Expand Down Expand Up @@ -114,14 +133,16 @@ func isAuthorized(req *http.Request, secretToken string) bool {
return subtle.ConstantTimeCompare([]byte(parts[1]), []byte(secretToken)) == 1
}

func processRequestHandler(p processor.Processor, config Config, report reporter) http.Handler {
func processRequestHandler(pf ProcessorFactory, config Config, report reporter) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
code, err := processRequest(r, p, config.MaxUnzippedSize, report)
code, err := processRequest(r, pf, config.MaxUnzippedSize, report)
sendStatus(w, r, code, err)
})
}

func processRequest(r *http.Request, p processor.Processor, maxSize int64, report reporter) (int, error) {
func processRequest(r *http.Request, pf ProcessorFactory, maxSize int64, report reporter) (int, error) {

processor := pf()

if r.Method != "POST" {
return 405, errPOSTRequestOnly
Expand All @@ -142,11 +163,11 @@ func processRequest(r *http.Request, p processor.Processor, maxSize int64, repor

}

if err = p.Validate(buf); err != nil {
if err = processor.Validate(buf); err != nil {
return 400, err
}

list, err := p.Transform(buf)
list, err := processor.Transform(buf)
if err != nil {
return 400, err
}
Expand Down
10 changes: 4 additions & 6 deletions beater/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (
"github.com/kabukky/httpscerts"
"github.com/stretchr/testify/assert"

"github.com/elastic/apm-server/processor/healthcheck"
"github.com/elastic/apm-server/processor/transaction"
"github.com/elastic/apm-server/tests"
"github.com/elastic/beats/libbeat/beat"
)
Expand Down Expand Up @@ -56,7 +54,7 @@ func TestServerHealth(t *testing.T) {
apm, teardown := setupServer(t, noSSL)
defer teardown()

req, err := http.NewRequest("GET", healthcheck.Endpoint, nil)
req, err := http.NewRequest("GET", HealthCheckURL, nil)
if err != nil {
t.Fatalf("Failed to create test request object: %v", err)
}
Expand All @@ -70,7 +68,7 @@ func TestServerFrontendSwitch(t *testing.T) {
apm, teardown := setupServer(t, noSSL)
defer teardown()

req, _ := http.NewRequest("POST", transaction.FrontendEndpoint, bytes.NewReader(testData))
req, _ := http.NewRequest("POST", FrontendTransactionsURL, bytes.NewReader(testData))

rec := httptest.NewRecorder()
apm.Handler.ServeHTTP(rec, req)
Expand Down Expand Up @@ -182,7 +180,7 @@ func withSSL(t *testing.T, domain string) *SSLConfig {
}

func makeTestRequest(t *testing.T) *http.Request {
req, err := http.NewRequest("POST", transaction.BackendEndpoint, bytes.NewReader(testData))
req, err := http.NewRequest("POST", BackendTransactionsURL, bytes.NewReader(testData))
if err != nil {
t.Fatalf("Failed to create test request object: %v", err)
}
Expand All @@ -195,7 +193,7 @@ func postTestRequest(t *testing.T, apm *http.Server, client *http.Client, schema
client = http.DefaultClient
}

addr := fmt.Sprintf("%s://%s%s", schema, apm.Addr, transaction.BackendEndpoint)
addr := fmt.Sprintf("%s://%s%s", schema, apm.Addr, BackendTransactionsURL)
return client.Post(addr, "application/json", bytes.NewReader(testData))
}

Expand Down
13 changes: 0 additions & 13 deletions include/list.go

This file was deleted.

2 changes: 0 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"os"

"github.com/elastic/apm-server/cmd"

_ "github.com/elastic/apm-server/include"
)

func main() {
Expand Down
4 changes: 2 additions & 2 deletions processor/error/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

func BenchmarkEventWithFileLoading(b *testing.B) {
processor := NewBackendProcessor()
processor := NewProcessor()
for i := 0; i < b.N; i++ {
data, _ := tests.LoadValidData("error")
err := processor.Validate(data)
Expand All @@ -20,7 +20,7 @@ func BenchmarkEventWithFileLoading(b *testing.B) {
}

func BenchmarkEventFileLoadingOnce(b *testing.B) {
processor := NewBackendProcessor()
processor := NewProcessor()
data, _ := tests.LoadValidData("error")
for i := 0; i < b.N; i++ {
err := processor.Validate(data)
Expand Down
4 changes: 2 additions & 2 deletions processor/error/package_tests/fields_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func TestFields(t *testing.T) {
"./../../../_meta/fields.common.yml",
"./../_meta/fields.yml",
}
tests.TestEventAttrsDocumentedInFields(t, fieldsPaths, er.NewBackendProcessor)
tests.TestEventAttrsDocumentedInFields(t, fieldsPaths, er.NewProcessor)

notInEvent := set.New(
"context.db.instance",
Expand All @@ -26,5 +26,5 @@ func TestFields(t *testing.T) {
"error id icon",
"view errors",
)
tests.TestDocumentedFieldsInEvent(t, fieldsPaths, er.NewBackendProcessor, notInEvent)
tests.TestDocumentedFieldsInEvent(t, fieldsPaths, er.NewProcessor, notInEvent)
}
4 changes: 2 additions & 2 deletions processor/error/package_tests/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ func TestProcessorOK(t *testing.T) {
{Name: "TestProcessErrorMininmalPayloadLog", Path: "tests/data/valid/error/minimal_payload_log.json"},
{Name: "TestProcessErrorFull", Path: "tests/data/valid/error/payload.json"},
}
tests.TestProcessRequests(t, er.NewBackendProcessor(), requestInfo)
tests.TestProcessRequests(t, er.NewProcessor(), requestInfo)
}

// ensure invalid documents fail the json schema validation already
func TestProcessorFailedValidation(t *testing.T) {
data, err := tests.LoadInvalidData("error")
assert.Nil(t, err)
err = er.NewBackendProcessor().Validate(data)
err = er.NewProcessor().Validate(data)
assert.NotNil(t, err)
}
24 changes: 4 additions & 20 deletions processor/error/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,6 @@ import (
"github.com/elastic/beats/libbeat/monitoring"
)

func init() {
pr.Registry.AddProcessor(BackendEndpoint, NewBackendProcessor())
pr.Registry.AddProcessor(FrontendEndpoint, NewFrontendProcessor())
}

var (
errorMetrics = monitoring.Default.NewRegistry("apm-server.processor.error")
validationCount = monitoring.NewInt(errorMetrics, "validation.count")
Expand All @@ -23,24 +18,17 @@ var (
)

const (
BackendEndpoint = "/v1/errors"
FrontendEndpoint = "/v1/client-side/errors"
processorName = "error"
processorName = "error"
)

func NewBackendProcessor() pr.Processor {
schema := pr.CreateSchema(errorSchema, processorName)
return &processor{schema, pr.Backend}
}
var schema = pr.CreateSchema(errorSchema, processorName)

func NewFrontendProcessor() pr.Processor {
schema := pr.CreateSchema(errorSchema, processorName)
return &processor{schema, pr.Frontend}
func NewProcessor() pr.Processor {
return &processor{schema}
}

type processor struct {
schema *jsonschema.Schema
typ int
}

func (p *processor) Validate(buf []byte) error {
Expand All @@ -66,7 +54,3 @@ func (p *processor) Transform(buf []byte) ([]beat.Event, error) {
func (p *processor) Name() string {
return processorName
}

func (p *processor) Type() int {
return p.typ
}
21 changes: 4 additions & 17 deletions processor/error/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,9 @@ import (
)

func TestImplementProcessorInterface(t *testing.T) {
constructors := []func() pr.Processor{NewFrontendProcessor, NewBackendProcessor}
for _, constructor := range constructors {
p := constructor()
assert.NotNil(t, p)
_, ok := p.(pr.Processor)
assert.True(t, ok)
assert.IsType(t, &processor{}, p)
}
}

func TestAddProcessorToRegistryOnInit(t *testing.T) {
p := pr.Registry.Processor(BackendEndpoint)
p := NewProcessor()
assert.NotNil(t, p)
assert.Equal(t, pr.Backend, p.Type())

p2 := pr.Registry.Processor(FrontendEndpoint)
assert.NotNil(t, p2)
assert.Equal(t, pr.Frontend, p2.Type())
_, ok := p.(pr.Processor)
assert.True(t, ok)
assert.IsType(t, &processor{}, p)
}
9 changes: 0 additions & 9 deletions processor/healthcheck/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,7 @@ import (
"github.com/elastic/beats/libbeat/beat"
)

func init() {
pr.Registry.AddProcessor(Endpoint, NewProcessor())
}

const (
Endpoint = "/healthcheck"
processorName = "healthcheck"
)

Expand All @@ -31,7 +26,3 @@ func (p *processor) Transform(buf []byte) ([]beat.Event, error) {
func (p *processor) Name() string {
return processorName
}

func (p *processor) Type() int {
return pr.HealthCheck
}
6 changes: 0 additions & 6 deletions processor/healthcheck/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,3 @@ func TestImplementProcessorInterface(t *testing.T) {
assert.True(t, ok)
assert.IsType(t, &processor{}, p)
}

func TestAddProcessorToRegistryOnInit(t *testing.T) {
p := pr.Registry.Processor(Endpoint)
assert.NotNil(t, p)
assert.Equal(t, pr.HealthCheck, p.Type())
}
1 change: 0 additions & 1 deletion processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ type Processor interface {
Validate([]byte) error
Transform([]byte) ([]beat.Event, error)
Name() string
Type() int
}

func CreateDoc(timestamp time.Time, docMappings []m.DocMapping) beat.Event {
Expand Down
Loading

0 comments on commit 6c151e0

Please sign in to comment.