Skip to content

Commit

Permalink
Refactor/re-use code.
Browse files Browse the repository at this point in the history
  • Loading branch information
Bo Du committed Dec 5, 2018
1 parent ce88b3b commit d952f08
Showing 1 changed file with 7 additions and 67 deletions.
74 changes: 7 additions & 67 deletions server/http/handlers/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,72 +85,7 @@ func (s *service) Write(w http.ResponseWriter, r *http.Request) {
return
}

p := s.parserPool.Get()
// TODO(xichen): Check this is correct wrt the parser lifetime.
defer s.parserPool.Put(p)

v, err := p.ParseBytes(data)
if err != nil {
err = fmt.Errorf("cannot parse event %s: %v", data, err)
writeErrorResponse(w, err)
return
}

// NB: Perhaps better to specify as a URL param.
// Extract event namespace from JSON.
namespaceFieldName := s.dbOpts.NamespaceFieldName()
namespaceVal, ok := v.Get(namespaceFieldName)
if !ok {
err = fmt.Errorf("cannot find namespace field %s for event %v", namespaceFieldName, data)
writeErrorResponse(w, err)
return
}
namespace, err := s.namespaceFn(namespaceVal)
if err != nil {
err = fmt.Errorf("cannot determine namespace for event %s: %v", data, err)
writeErrorResponse(w, err)
return
}

// Extract event timestamp from JSON.
timestampFieldName := s.dbOpts.TimestampFieldName()
tsVal, ok := v.Get(timestampFieldName)
if !ok {
err = fmt.Errorf("cannot find timestamp field %s for event %v", timestampFieldName, data)
writeErrorResponse(w, err)
return
}
timeNanos, err := s.timeNanosFn(tsVal)
if err != nil {
err = fmt.Errorf("cannot determine timestamp for event %s: %v", data, err)
writeErrorResponse(w, err)
return
}

id, err := s.idFn(v)
if err != nil {
err = fmt.Errorf("cannot determine ID for event %s: %v", data, err)
writeErrorResponse(w, err)
return
}

// TODO(xichen): Pool the iterators.
fieldIter := value.NewFieldIterator(v)
ev := event.Event{
ID: id,
TimeNanos: timeNanos,
FieldIter: fieldIter,
RawData: data,
}

err = s.db.Write(namespace, ev)
if err != nil {
err = fmt.Errorf("cannot write event %s: %v", data, err)
writeErrorResponse(w, err)
return
}

writeSuccessResponse(w)
s.writeBulk(w, data)
}

func (s *service) WriteBulk(w http.ResponseWriter, r *http.Request) {
Expand All @@ -169,6 +104,10 @@ func (s *service) WriteBulk(w http.ResponseWriter, r *http.Request) {
return
}

s.writeBulk(w, data)
}

func (s *service) writeBulk(w http.ResponseWriter, data []byte) {
p := s.parserPool.Get()
// TODO(xichen): Check this is correct wrt the parser lifetime.
defer s.parserPool.Put(p)
Expand Down Expand Up @@ -254,7 +193,8 @@ func (s *service) WriteBulk(w http.ResponseWriter, r *http.Request) {
multiErr = multiErr.Add(err)
}
}
err = multiErr.FinalError()

err := multiErr.FinalError()
if err != nil {
err = fmt.Errorf("cannot write event %s: %v", data, err)
writeErrorResponse(w, err)
Expand Down

0 comments on commit d952f08

Please sign in to comment.