-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
05a49bb
commit 5e7e45b
Showing
8 changed files
with
459 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
package handlers | ||
|
||
import ( | ||
"net/http" | ||
) | ||
|
||
// A list of HTTP endpoints. | ||
// TODO(xichen): API versioning. | ||
const ( | ||
healthPath = "/health" | ||
writePath = "/write" | ||
) | ||
|
||
// RegisterService registers handler service. | ||
func RegisterService(mux *http.ServeMux, s Service) { | ||
mux.HandleFunc(healthPath, s.Health) | ||
mux.HandleFunc(writePath, s.Write) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
package handlers | ||
|
||
import ( | ||
"github.com/xichen2020/eventdb/parser/json" | ||
"github.com/xichen2020/eventdb/value" | ||
) | ||
|
||
// IDFn determines the ID of a JSON event. | ||
type IDFn func(value *value.Value) ([]byte, error) | ||
|
||
// NamespaceFn determines the namespace a JSON event belongs to. | ||
type NamespaceFn func(value *value.Value) ([]byte, error) | ||
|
||
// TimeNanosFn determines the timestamp of a JSON event in nanoseconds. | ||
type TimeNanosFn func(value *value.Value) (int64, error) | ||
|
||
// Options provide a set of options for service handlers. | ||
type Options struct { | ||
parserPool *json.ParserPool | ||
idFn IDFn | ||
namespaceFn NamespaceFn | ||
timeNanosFn TimeNanosFn | ||
} | ||
|
||
// NewOptions create a new set of options. | ||
func NewOptions() *Options { | ||
o := &Options{} | ||
o.initPools() | ||
return o | ||
} | ||
|
||
// SetParserPool sets the pool for JSON parsers. | ||
func (o *Options) SetParserPool(value *json.ParserPool) *Options { | ||
opts := *o | ||
opts.parserPool = value | ||
return &opts | ||
} | ||
|
||
// ParserPool returns the pool for JSON parsers. | ||
func (o *Options) ParserPool() *json.ParserPool { | ||
return o.parserPool | ||
} | ||
|
||
// SetIDFn sets the ID function. | ||
func (o *Options) SetIDFn(value IDFn) *Options { | ||
opts := *o | ||
opts.idFn = value | ||
return &opts | ||
} | ||
|
||
// IDFn returns the ID function. | ||
func (o *Options) IDFn() IDFn { | ||
return o.idFn | ||
} | ||
|
||
// SetNamespaceFn sets the namespace function. | ||
func (o *Options) SetNamespaceFn(value NamespaceFn) *Options { | ||
opts := *o | ||
opts.namespaceFn = value | ||
return &opts | ||
} | ||
|
||
// NamespaceFn returns the ID function. | ||
func (o *Options) NamespaceFn() NamespaceFn { | ||
return o.namespaceFn | ||
} | ||
|
||
// SetTimeNanosFn sets the timestamp function. | ||
func (o *Options) SetTimeNanosFn(value TimeNanosFn) *Options { | ||
opts := *o | ||
opts.timeNanosFn = value | ||
return &opts | ||
} | ||
|
||
// TimeNanosFn returns the timestamp function. | ||
func (o *Options) TimeNanosFn() TimeNanosFn { | ||
return o.timeNanosFn | ||
} | ||
|
||
func (o *Options) initPools() { | ||
// Initialize JSON parser pool. | ||
pp := json.NewParserPool(nil) | ||
pp.Init(func() json.Parser { return json.NewParser(nil) }) | ||
o.parserPool = pp | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
package handlers | ||
|
||
import ( | ||
"bytes" | ||
"encoding/json" | ||
"net/http" | ||
|
||
"github.com/m3db/m3x/errors" | ||
) | ||
|
||
// Response is an HTTP response. | ||
type Response struct { | ||
State string `json:"state,omitempty"` | ||
Error string `json:"error,omitempty"` | ||
} | ||
|
||
// NewResponse creates a new empty response. | ||
func NewResponse() Response { return Response{} } | ||
|
||
func newSuccessResponse() Response { | ||
return Response{State: "OK"} | ||
} | ||
|
||
func newErrorResponse(err error) Response { | ||
var errStr string | ||
if err != nil { | ||
errStr = err.Error() | ||
} | ||
return Response{State: "Error", Error: errStr} | ||
} | ||
|
||
func writeSuccessResponse(w http.ResponseWriter) { | ||
response := newSuccessResponse() | ||
writeResponse(w, response, nil) | ||
} | ||
|
||
func writeErrorResponse(w http.ResponseWriter, err error) { | ||
writeResponse(w, nil, err) | ||
} | ||
|
||
func writeResponse(w http.ResponseWriter, resp interface{}, err error) { | ||
buf := bytes.NewBuffer(nil) | ||
if encodeErr := json.NewEncoder(buf).Encode(&resp); encodeErr != nil { | ||
w.WriteHeader(http.StatusInternalServerError) | ||
resp = newErrorResponse(encodeErr) | ||
json.NewEncoder(w).Encode(&resp) | ||
return | ||
} | ||
|
||
if err == nil { | ||
w.WriteHeader(http.StatusOK) | ||
} else if errors.IsInvalidParams(err) { | ||
w.WriteHeader(http.StatusBadRequest) | ||
} else { | ||
w.WriteHeader(http.StatusInternalServerError) | ||
} | ||
w.Write(buf.Bytes()) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,130 @@ | ||
package handlers | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
"io/ioutil" | ||
"net/http" | ||
"strings" | ||
|
||
"github.com/xichen2020/eventdb/event" | ||
"github.com/xichen2020/eventdb/parser/json" | ||
"github.com/xichen2020/eventdb/storage" | ||
"github.com/xichen2020/eventdb/value" | ||
|
||
xerrors "github.com/m3db/m3x/errors" | ||
) | ||
|
||
// Service provides handlers for serving HTTP requests. | ||
type Service interface { | ||
// Health returns service health. | ||
Health(w http.ResponseWriter, r *http.Request) | ||
|
||
// Write writes a single JSON event. | ||
Write(w http.ResponseWriter, r *http.Request) | ||
} | ||
|
||
var ( | ||
errRequestMustBeGet = xerrors.NewInvalidParamsError(errors.New("request must be GET")) | ||
errRequestMustBePost = xerrors.NewInvalidParamsError(errors.New("request must be POST")) | ||
) | ||
|
||
type service struct { | ||
db storage.Database | ||
parserPool *json.ParserPool | ||
idFn IDFn | ||
namespaceFn NamespaceFn | ||
timeNanosFn TimeNanosFn | ||
} | ||
|
||
// NewService creates a new service. | ||
func NewService(db storage.Database, opts *Options) Service { | ||
if opts == nil { | ||
opts = NewOptions() | ||
} | ||
return &service{ | ||
db: db, | ||
parserPool: opts.ParserPool(), | ||
idFn: opts.IDFn(), | ||
namespaceFn: opts.NamespaceFn(), | ||
timeNanosFn: opts.TimeNanosFn(), | ||
} | ||
} | ||
|
||
func (s *service) Health(w http.ResponseWriter, r *http.Request) { | ||
w.Header().Set("Content-Type", "application/json") | ||
|
||
if httpMethod := strings.ToUpper(r.Method); httpMethod != http.MethodGet { | ||
writeErrorResponse(w, errRequestMustBeGet) | ||
return | ||
} | ||
writeSuccessResponse(w) | ||
} | ||
|
||
func (s *service) Write(w http.ResponseWriter, r *http.Request) { | ||
defer r.Body.Close() | ||
|
||
w.Header().Set("Content-Type", "application/json") | ||
if httpMethod := strings.ToUpper(r.Method); httpMethod != http.MethodPost { | ||
writeErrorResponse(w, errRequestMustBePost) | ||
return | ||
} | ||
|
||
data, err := ioutil.ReadAll(r.Body) | ||
if err != nil { | ||
err = fmt.Errorf("cannot read body: %v", err) | ||
writeErrorResponse(w, err) | ||
return | ||
} | ||
|
||
p := s.parserPool.Get() | ||
// TODO(xichen): Check this is correct wrt the parser lifetime. | ||
defer s.parserPool.Put(p) | ||
|
||
v, err := p.ParseBytes(data) | ||
if err != nil { | ||
err = fmt.Errorf("cannot parse event %s: %v", data, err) | ||
writeErrorResponse(w, err) | ||
return | ||
} | ||
|
||
// NB: Perhaps better to specify as a URL param. | ||
namespace, err := s.namespaceFn(v) | ||
if err != nil { | ||
err = fmt.Errorf("cannot determine namespace for event %s: %v", data, err) | ||
writeErrorResponse(w, err) | ||
return | ||
} | ||
|
||
id, err := s.idFn(v) | ||
if err != nil { | ||
err = fmt.Errorf("cannot determine ID for event %s: %v", data, err) | ||
writeErrorResponse(w, err) | ||
return | ||
} | ||
|
||
timeNanos, err := s.timeNanosFn(v) | ||
if err != nil { | ||
err = fmt.Errorf("cannot determine timestamp for event %s: %v", data, err) | ||
writeErrorResponse(w, err) | ||
return | ||
} | ||
|
||
// TODO(xichen): Pool the iterators. | ||
fieldIter := value.NewFieldIterator(v) | ||
ev := event.Event{ | ||
ID: id, | ||
TimeNanos: timeNanos, | ||
Fields: fieldIter, | ||
RawData: data, | ||
} | ||
|
||
err = s.db.Write(namespace, ev) | ||
if err != nil { | ||
err = fmt.Errorf("cannot write event %s: %v", data, err) | ||
writeErrorResponse(w, err) | ||
return | ||
} | ||
|
||
writeSuccessResponse(w) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
package http | ||
|
||
import ( | ||
"time" | ||
) | ||
|
||
const ( | ||
defaultReadTimeout = 10 * time.Second | ||
defaultWriteTimeout = 10 * time.Second | ||
) | ||
|
||
// Options provide a set of HTTP server options. | ||
type Options struct { | ||
readTimeout time.Duration | ||
writeTimeout time.Duration | ||
} | ||
|
||
// NewOptions creates a new set of server options. | ||
func NewOptions() *Options { | ||
o := &Options{ | ||
readTimeout: defaultReadTimeout, | ||
writeTimeout: defaultWriteTimeout, | ||
} | ||
return o | ||
} | ||
|
||
// SetReadTimeout sets the timeout for a read request. | ||
func (o *Options) SetReadTimeout(value time.Duration) *Options { | ||
opts := *o | ||
opts.readTimeout = value | ||
return &opts | ||
} | ||
|
||
// ReadTimeout returns the timeout for a read request. | ||
func (o *Options) ReadTimeout() time.Duration { | ||
return o.readTimeout | ||
} | ||
|
||
// SetWriteTimeout sets the timeout for a write request. | ||
func (o *Options) SetWriteTimeout(value time.Duration) *Options { | ||
opts := *o | ||
opts.writeTimeout = value | ||
return &opts | ||
} | ||
|
||
// WriteTimeout returns the timeout for a write request. | ||
func (o *Options) WriteTimeout() time.Duration { | ||
return o.writeTimeout | ||
} |
Oops, something went wrong.