Skip to content

Commit

Permalink
HTTP server w/ write API (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
xichen2020 committed Nov 24, 2018
1 parent 05a49bb commit a64a299
Show file tree
Hide file tree
Showing 9 changed files with 524 additions and 2 deletions.
67 changes: 65 additions & 2 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions server/http/handlers/handlers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package handlers

import (
"net/http"
)

// A list of HTTP endpoints.
// TODO(xichen): API versioning.
const (
healthPath = "/health"
writePath = "/write"
)

// RegisterService registers handler service.
func RegisterService(mux *http.ServeMux, s Service) {
mux.HandleFunc(healthPath, s.Health)
mux.HandleFunc(writePath, s.Write)
}
85 changes: 85 additions & 0 deletions server/http/handlers/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package handlers

import (
"github.com/xichen2020/eventdb/parser/json"
"github.com/xichen2020/eventdb/value"
)

// IDFn determines the ID of a JSON event.
type IDFn func(value *value.Value) ([]byte, error)

// NamespaceFn determines the namespace a JSON event belongs to.
type NamespaceFn func(value *value.Value) ([]byte, error)

// TimeNanosFn determines the timestamp of a JSON event in nanoseconds.
type TimeNanosFn func(value *value.Value) (int64, error)

// Options provide a set of options for service handlers.
type Options struct {
parserPool *json.ParserPool
idFn IDFn
namespaceFn NamespaceFn
timeNanosFn TimeNanosFn
}

// NewOptions create a new set of options.
func NewOptions() *Options {
o := &Options{}
o.initPools()
return o
}

// SetParserPool sets the pool for JSON parsers.
func (o *Options) SetParserPool(value *json.ParserPool) *Options {
opts := *o
opts.parserPool = value
return &opts
}

// ParserPool returns the pool for JSON parsers.
func (o *Options) ParserPool() *json.ParserPool {
return o.parserPool
}

// SetIDFn sets the ID function.
func (o *Options) SetIDFn(value IDFn) *Options {
opts := *o
opts.idFn = value
return &opts
}

// IDFn returns the ID function.
func (o *Options) IDFn() IDFn {
return o.idFn
}

// SetNamespaceFn sets the namespace function.
func (o *Options) SetNamespaceFn(value NamespaceFn) *Options {
opts := *o
opts.namespaceFn = value
return &opts
}

// NamespaceFn returns the ID function.
func (o *Options) NamespaceFn() NamespaceFn {
return o.namespaceFn
}

// SetTimeNanosFn sets the timestamp function.
func (o *Options) SetTimeNanosFn(value TimeNanosFn) *Options {
opts := *o
opts.timeNanosFn = value
return &opts
}

// TimeNanosFn returns the timestamp function.
func (o *Options) TimeNanosFn() TimeNanosFn {
return o.timeNanosFn
}

func (o *Options) initPools() {
// Initialize JSON parser pool.
pp := json.NewParserPool(nil)
pp.Init(func() json.Parser { return json.NewParser(nil) })
o.parserPool = pp
}
58 changes: 58 additions & 0 deletions server/http/handlers/response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package handlers

import (
"bytes"
"encoding/json"
"net/http"

"github.com/m3db/m3x/errors"
)

// Response is an HTTP response.
type Response struct {
State string `json:"state,omitempty"`
Error string `json:"error,omitempty"`
}

// NewResponse creates a new empty response.
func NewResponse() Response { return Response{} }

func newSuccessResponse() Response {
return Response{State: "OK"}
}

func newErrorResponse(err error) Response {
var errStr string
if err != nil {
errStr = err.Error()
}
return Response{State: "Error", Error: errStr}
}

func writeSuccessResponse(w http.ResponseWriter) {
response := newSuccessResponse()
writeResponse(w, response, nil)
}

func writeErrorResponse(w http.ResponseWriter, err error) {
writeResponse(w, nil, err)
}

func writeResponse(w http.ResponseWriter, resp interface{}, err error) {
buf := bytes.NewBuffer(nil)
if encodeErr := json.NewEncoder(buf).Encode(&resp); encodeErr != nil {
w.WriteHeader(http.StatusInternalServerError)
resp = newErrorResponse(encodeErr)
json.NewEncoder(w).Encode(&resp)
return
}

if err == nil {
w.WriteHeader(http.StatusOK)
} else if errors.IsInvalidParams(err) {
w.WriteHeader(http.StatusBadRequest)
} else {
w.WriteHeader(http.StatusInternalServerError)
}
w.Write(buf.Bytes())
}
130 changes: 130 additions & 0 deletions server/http/handlers/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package handlers

import (
"errors"
"fmt"
"io/ioutil"
"net/http"
"strings"

"github.com/xichen2020/eventdb/event"
"github.com/xichen2020/eventdb/parser/json"
"github.com/xichen2020/eventdb/storage"
"github.com/xichen2020/eventdb/value"

xerrors "github.com/m3db/m3x/errors"
)

// Service provides handlers for serving HTTP requests.
type Service interface {
// Health returns service health.
Health(w http.ResponseWriter, r *http.Request)

// Write writes a single JSON event.
Write(w http.ResponseWriter, r *http.Request)
}

var (
errRequestMustBeGet = xerrors.NewInvalidParamsError(errors.New("request must be GET"))
errRequestMustBePost = xerrors.NewInvalidParamsError(errors.New("request must be POST"))
)

type service struct {
db storage.Database
parserPool *json.ParserPool
idFn IDFn
namespaceFn NamespaceFn
timeNanosFn TimeNanosFn
}

// NewService creates a new service.
func NewService(db storage.Database, opts *Options) Service {
if opts == nil {
opts = NewOptions()
}
return &service{
db: db,
parserPool: opts.ParserPool(),
idFn: opts.IDFn(),
namespaceFn: opts.NamespaceFn(),
timeNanosFn: opts.TimeNanosFn(),
}
}

func (s *service) Health(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")

if httpMethod := strings.ToUpper(r.Method); httpMethod != http.MethodGet {
writeErrorResponse(w, errRequestMustBeGet)
return
}
writeSuccessResponse(w)
}

func (s *service) Write(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()

w.Header().Set("Content-Type", "application/json")
if httpMethod := strings.ToUpper(r.Method); httpMethod != http.MethodPost {
writeErrorResponse(w, errRequestMustBePost)
return
}

data, err := ioutil.ReadAll(r.Body)
if err != nil {
err = fmt.Errorf("cannot read body: %v", err)
writeErrorResponse(w, err)
return
}

p := s.parserPool.Get()
// TODO(xichen): Check this is correct wrt the parser lifetime.
defer s.parserPool.Put(p)

v, err := p.ParseBytes(data)
if err != nil {
err = fmt.Errorf("cannot parse event %s: %v", data, err)
writeErrorResponse(w, err)
return
}

// NB: Perhaps better to specify as a URL param.
namespace, err := s.namespaceFn(v)
if err != nil {
err = fmt.Errorf("cannot determine namespace for event %s: %v", data, err)
writeErrorResponse(w, err)
return
}

id, err := s.idFn(v)
if err != nil {
err = fmt.Errorf("cannot determine ID for event %s: %v", data, err)
writeErrorResponse(w, err)
return
}

timeNanos, err := s.timeNanosFn(v)
if err != nil {
err = fmt.Errorf("cannot determine timestamp for event %s: %v", data, err)
writeErrorResponse(w, err)
return
}

// TODO(xichen): Pool the iterators.
fieldIter := value.NewFieldIterator(v)
ev := event.Event{
ID: id,
TimeNanos: timeNanos,
Fields: fieldIter,
RawData: data,
}

err = s.db.Write(namespace, ev)
if err != nil {
err = fmt.Errorf("cannot write event %s: %v", data, err)
writeErrorResponse(w, err)
return
}

writeSuccessResponse(w)
}
Loading

0 comments on commit a64a299

Please sign in to comment.