Skip to content

Commit

Permalink
Refactor server code into m3 server pattern.
Browse files Browse the repository at this point in the history
  • Loading branch information
Bo Du committed Nov 19, 2018
1 parent 3f76cd8 commit beabba1
Show file tree
Hide file tree
Showing 9 changed files with 298 additions and 34 deletions.
65 changes: 63 additions & 2 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions glide.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package: github.com/xichen2020/eventdb
import:
- package: gopkg.in/validator.v2
- package: github.com/uber-go/tally
version: ~3.3.7
testImport:
- package: github.com/stretchr/testify
version: ^1.2.2
Expand Down
103 changes: 103 additions & 0 deletions server/handlers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package server

import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"strings"

xerrors "github.com/m3db/m3x/errors"
"github.com/xichen2020/eventdb/storage"
)

// Paths.
const (
HealthPath = "/health"
)

// Errors.
var (
errInvalidRequestMethod = func(allowedHTTPMethods []string) error {
return xerrors.NewInvalidParamsError(
fmt.Errorf(
"invalid request method, allowed methods: %s",
strings.Join(allowedHTTPMethods, ","),
),
)
}
)

// Response is a JSON HTTP response.
type Response struct {
State string `json:"state,omitempty"`
Error string `json:"error,omitempty"`
}

func registerHandlers(mux *http.ServeMux, store storage.Storage) {
registerHealthHandler(mux)
}

func registerHealthHandler(mux *http.ServeMux) {
mux.HandleFunc(HealthPath, handle(func(w http.ResponseWriter, r *http.Request) {
writeSuccessResponse(w)
}, []string{http.MethodGet}))
}

func handle(next func(w http.ResponseWriter, r *http.Request), allowedHTTPMethods []string) func(http.ResponseWriter, *http.Request) {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")

isValidMethod := false
for _, method := range allowedHTTPMethods {
if strings.ToUpper(r.Method) == method {
isValidMethod = true
}
}
if !isValidMethod {
writeErrorResponse(w, errInvalidRequestMethod(allowedHTTPMethods))
return
}
next(w, r)
})
}

func newSuccessResponse() Response {
return Response{State: "OK"}
}

func newErrorResponse(err error) Response {
var errStr string
if err != nil {
errStr = err.Error()
}
return Response{State: "Error", Error: errStr}
}

func writeSuccessResponse(w http.ResponseWriter) {
response := newSuccessResponse()
writeResponse(w, response, nil)
}

func writeErrorResponse(w http.ResponseWriter, err error) {
writeResponse(w, nil, err)
}

func writeResponse(w http.ResponseWriter, resp interface{}, err error) {
buf := bytes.NewBuffer(nil)
if encodeErr := json.NewEncoder(buf).Encode(&resp); encodeErr != nil {
w.WriteHeader(http.StatusInternalServerError)
resp = newErrorResponse(encodeErr)
json.NewEncoder(w).Encode(&resp)
return
}

if err == nil {
w.WriteHeader(http.StatusOK)
} else if xerrors.IsInvalidParams(err) {
w.WriteHeader(http.StatusBadRequest)
} else {
w.WriteHeader(http.StatusInternalServerError)
}
w.Write(buf.Bytes())
}
56 changes: 56 additions & 0 deletions server/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package server

import "time"

const (
defaultReadTimeout = 10 * time.Second
defaultWriteTimeout = 10 * time.Second
)

// Options is a set of server options.
type Options interface {
// SetReadTimeout sets the read timeout.
SetReadTimeout(value time.Duration) Options

// ReadTimeout returns the read timeout.
ReadTimeout() time.Duration

// SetWriteTimeout sets the write timeout.
SetWriteTimeout(value time.Duration) Options

// WriteTimeout returns the write timeout.
WriteTimeout() time.Duration
}

type options struct {
readTimeout time.Duration
writeTimeout time.Duration
}

// NewOptions creates a new set of server options.
func NewOptions() Options {
return &options{
readTimeout: defaultReadTimeout,
writeTimeout: defaultWriteTimeout,
}
}

func (o *options) SetReadTimeout(value time.Duration) Options {
opts := *o
opts.readTimeout = value
return &opts
}

func (o *options) ReadTimeout() time.Duration {
return o.readTimeout
}

func (o *options) SetWriteTimeout(value time.Duration) Options {
opts := *o
opts.writeTimeout = value
return &opts
}

func (o *options) WriteTimeout() time.Duration {
return o.writeTimeout
}
63 changes: 48 additions & 15 deletions server/server.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,66 @@
package server

import (
"fmt"
"net"
"net/http"

"github.com/xichen2020/eventdb/storage"
)

// Options for a server.
type Options struct {
Port int
}
"github.com/m3db/m3x/pprof"
xserver "github.com/m3db/m3x/server"
)

// Server exposes http endpoints that handle reads and writes.
// Uses the default servemux under the hood.
type Server struct {
opts Options
store storage.Storage
type server struct {
opts Options
address string
listener net.Listener
store storage.Storage
}

// New returns a new server instance.
func New(opts Options, store storage.Storage) *Server {
return &Server{
opts: opts,
store: store,
func New(address string, store storage.Storage, opts Options) xserver.Server {
return &server{
opts: opts,
address: address,
store: store,
}
}

// ListenAndServe runs the server.
func (s *server) ListenAndServe() error {
listener, err := net.Listen("tcp", s.address)
if err != nil {
return err
}
return s.Serve(listener)
}

// Serve runs the server.
func (s *Server) Serve() error {
return http.ListenAndServe(fmt.Sprintf(":%d", s.opts.Port), nil)
func (s *server) Serve(l net.Listener) error {
mux := http.NewServeMux()
registerHandlers(mux, s.store)
pprof.RegisterHandler(mux)

server := http.Server{
Handler: mux,
ReadTimeout: s.opts.ReadTimeout(),
WriteTimeout: s.opts.WriteTimeout(),
}

s.listener = l
s.address = l.Addr().String()

go func() {
server.Serve(l)
}()

return nil
}

func (s *server) Close() {
if s.listener != nil {
s.listener.Close()
}
}
17 changes: 10 additions & 7 deletions services/eventdb/config/config.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package config

import (
"github.com/xichen2020/eventdb/server"
"github.com/xichen2020/eventdb/storage"
)

// Config holds all eventdb config options.
type Config struct {
Storage storage.Options `yaml"storage"`
Server server.Options `yaml:"server"`
Storage StorageConfig `yaml:"storage"`
Server ServerConfig `yaml:"server"`
}

// StorageConfig holds storage options.
type StorageConfig struct{}

// ServerConfig holds server options.
type ServerConfig struct {
Address string `yaml:"address"`
}
12 changes: 5 additions & 7 deletions services/eventdb/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,13 @@ func main() {
}

// Instantiate DB.
db := storage.New(cfg.Storage)
db := storage.New(storage.NewOptions())

// Spin up server.
s := server.New(cfg.Server, db)
go func() {
if err := s.Serve(); err != nil {
logger.Fatalf("Failed to serve HTTP endpoints: %v", err)
}
}()
s := server.New(cfg.Server.Address, db, server.NewOptions())
if err := s.ListenAndServe(); err != nil {
logger.Fatalf("Failed to serve HTTP endpoints: %v", err)
}
}

func init() {
Expand Down
11 changes: 11 additions & 0 deletions storage/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package storage

// Options to configure a storage instance.
type Options interface{}

type options struct{}

// NewOptions creates a new set of storage options.
func NewOptions() Options {
return &options{}
}
3 changes: 0 additions & 3 deletions storage/database.go → storage/storage.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package storage

// Options to configure a database instance.
type Options struct{}

// Storage exposes methods to write and query data.
type Storage interface {
WriteBatch([][]byte) error
Expand Down

0 comments on commit beabba1

Please sign in to comment.