diff --git a/server/http/handlers/service.go b/server/http/handlers/service.go index 67a2501..0da51e9 100644 --- a/server/http/handlers/service.go +++ b/server/http/handlers/service.go @@ -85,72 +85,7 @@ func (s *service) Write(w http.ResponseWriter, r *http.Request) { return } - p := s.parserPool.Get() - // TODO(xichen): Check this is correct wrt the parser lifetime. - defer s.parserPool.Put(p) - - v, err := p.ParseBytes(data) - if err != nil { - err = fmt.Errorf("cannot parse event %s: %v", data, err) - writeErrorResponse(w, err) - return - } - - // NB: Perhaps better to specify as a URL param. - // Extract event namespace from JSON. - namespaceFieldName := s.dbOpts.NamespaceFieldName() - namespaceVal, ok := v.Get(namespaceFieldName) - if !ok { - err = fmt.Errorf("cannot find namespace field %s for event %v", namespaceFieldName, data) - writeErrorResponse(w, err) - return - } - namespace, err := s.namespaceFn(namespaceVal) - if err != nil { - err = fmt.Errorf("cannot determine namespace for event %s: %v", data, err) - writeErrorResponse(w, err) - return - } - - // Extract event timestamp from JSON. - timestampFieldName := s.dbOpts.TimestampFieldName() - tsVal, ok := v.Get(timestampFieldName) - if !ok { - err = fmt.Errorf("cannot find timestamp field %s for event %v", timestampFieldName, data) - writeErrorResponse(w, err) - return - } - timeNanos, err := s.timeNanosFn(tsVal) - if err != nil { - err = fmt.Errorf("cannot determine timestamp for event %s: %v", data, err) - writeErrorResponse(w, err) - return - } - - id, err := s.idFn(v) - if err != nil { - err = fmt.Errorf("cannot determine ID for event %s: %v", data, err) - writeErrorResponse(w, err) - return - } - - // TODO(xichen): Pool the iterators. - fieldIter := value.NewFieldIterator(v) - ev := event.Event{ - ID: id, - TimeNanos: timeNanos, - FieldIter: fieldIter, - RawData: data, - } - - err = s.db.Write(namespace, ev) - if err != nil { - err = fmt.Errorf("cannot write event %s: %v", data, err) - writeErrorResponse(w, err) - return - } - - writeSuccessResponse(w) + s.writeBulk(w, data) } func (s *service) WriteBulk(w http.ResponseWriter, r *http.Request) { @@ -169,6 +104,10 @@ func (s *service) WriteBulk(w http.ResponseWriter, r *http.Request) { return } + s.writeBulk(w, data) +} + +func (s *service) writeBulk(w http.ResponseWriter, data []byte) { p := s.parserPool.Get() // TODO(xichen): Check this is correct wrt the parser lifetime. defer s.parserPool.Put(p) @@ -254,7 +193,8 @@ func (s *service) WriteBulk(w http.ResponseWriter, r *http.Request) { multiErr = multiErr.Add(err) } } - err = multiErr.FinalError() + + err := multiErr.FinalError() if err != nil { err = fmt.Errorf("cannot write event %s: %v", data, err) writeErrorResponse(w, err)