Skip to content

Commit

Permalink
Merge pull request #28 from sudo-bmitch/pr-http-server
Browse files Browse the repository at this point in the history
Adding Server.Run and Shutdown calls
  • Loading branch information
sudo-bmitch committed Dec 23, 2023
2 parents a0e225f + f295744 commit f4bae9d
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 43 deletions.
12 changes: 6 additions & 6 deletions blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/olareg/olareg/types"
)

func (s *server) blobGet(repoStr, arg string) http.HandlerFunc {
func (s *Server) blobGet(repoStr, arg string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
repo, err := s.store.RepoGet(repoStr)
if err != nil {
Expand Down Expand Up @@ -64,7 +64,7 @@ func (s *server) blobGet(repoStr, arg string) http.HandlerFunc {
}
}

func (s *server) blobDelete(repoStr, arg string) http.HandlerFunc {
func (s *Server) blobDelete(repoStr, arg string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
repo, err := s.store.RepoGet(repoStr)
if err != nil {
Expand Down Expand Up @@ -109,7 +109,7 @@ type blobUploadState struct {
Offset int64 `json:"offset"`
}

func (s *server) blobUploadGet(repoStr, sessionID string) http.HandlerFunc {
func (s *Server) blobUploadGet(repoStr, sessionID string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
blobUploadMu.Lock()
bc, ok := blobUploadSessions[repoStr+":"+sessionID]
Expand Down Expand Up @@ -141,7 +141,7 @@ func (s *server) blobUploadGet(repoStr, sessionID string) http.HandlerFunc {
}
}

func (s *server) blobUploadPost(repoStr string) http.HandlerFunc {
func (s *Server) blobUploadPost(repoStr string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// start a new upload session with the backend storage and track as current upload
repo, err := s.store.RepoGet(repoStr)
Expand Down Expand Up @@ -243,7 +243,7 @@ func (s *server) blobUploadPost(repoStr string) http.HandlerFunc {
}
}

func (s *server) blobUploadPatch(repoStr, sessionID string) http.HandlerFunc {
func (s *Server) blobUploadPatch(repoStr, sessionID string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
blobUploadMu.Lock()
bc, ok := blobUploadSessions[repoStr+":"+sessionID]
Expand Down Expand Up @@ -314,7 +314,7 @@ func (s *server) blobUploadPatch(repoStr, sessionID string) http.HandlerFunc {
}
}

func (s *server) blobUploadPut(repoStr, sessionID string) http.HandlerFunc {
func (s *Server) blobUploadPut(repoStr, sessionID string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
blobUploadMu.Lock()
bc, ok := blobUploadSessions[repoStr+":"+sessionID]
Expand Down
51 changes: 34 additions & 17 deletions cmd/olareg/serve.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package main

import (
"context"
"fmt"
"net"
"net/http"
"time"
"os"
"os/signal"
"syscall"

"github.com/spf13/cobra"

Expand All @@ -16,6 +17,8 @@ type serveOpts struct {
root *rootOpts
addr string
port int
tlsCert string
tlsKey string
storeType string
storeDir string
apiPush bool
Expand All @@ -35,7 +38,9 @@ func newServeCmd(root *rootOpts) *cobra.Command {
RunE: opts.run,
}
newCmd.Flags().StringVar(&opts.addr, "addr", "", "listener interface or address")
newCmd.Flags().IntVar(&opts.port, "port", 80, "listener port")
newCmd.Flags().IntVar(&opts.port, "port", 5000, "listener port")
newCmd.Flags().StringVar(&opts.tlsCert, "tls-cert", "", "TLS certificate for HTTPS")
newCmd.Flags().StringVar(&opts.tlsKey, "tls-key", "", "TLS key for HTTPS")
newCmd.Flags().StringVar(&opts.storeDir, "dir", ".", "root directory for storage")
newCmd.Flags().StringVar(&opts.storeType, "store-type", "dir", "storage type (dir, mem)")
newCmd.Flags().BoolVar(&opts.apiPush, "api-push", true, "enable push APIs")
Expand All @@ -52,6 +57,11 @@ func (opts *serveOpts) run(cmd *cobra.Command, args []string) error {
return fmt.Errorf("unable to parse store type %s: %w", opts.storeType, err)
}
conf := config.Config{
HTTP: config.ConfigHTTP{
Addr: fmt.Sprintf("%s:%d", opts.addr, opts.port),
CertFile: opts.tlsCert,
KeyFile: opts.tlsKey,
},
Storage: config.ConfigStorage{
StoreType: storeType,
RootDir: opts.storeDir,
Expand All @@ -64,21 +74,28 @@ func (opts *serveOpts) run(cmd *cobra.Command, args []string) error {
Referrer: config.ConfigAPIReferrer{Enabled: &opts.apiReferrer},
},
}
handler := olareg.New(conf)
listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", opts.addr, opts.port))
if err != nil {
return fmt.Errorf("unable to start listener: %w", err)
}
opts.root.log.Info("listening for connections", "addr", opts.addr, "port", opts.port)
s := &http.Server{
ReadHeaderTimeout: 5 * time.Second,
Handler: handler,
}
// TODO: add signal handler to shutdown
err = s.Serve(listener)
// TODO: handle different error responses, graceful exit should not error
s := olareg.New(conf)
// include signal handler to gracefully shutdown
ctx, cancel := context.WithCancel(context.Background())
cleanShutdown := make(chan struct{})
go func() {
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
<-sig
opts.root.log.Debug("Interrupt received, shutting down")
err := s.Shutdown(ctx)
if err != nil {
opts.root.log.Warn("graceful shutdown failed", "err", err)
}
// clean shutdown
cancel()
close(cleanShutdown)
}()
// run the server
err = s.Run(ctx)
if err != nil {
return err
}
<-cleanShutdown
return nil
}
9 changes: 7 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,20 @@ const (
)

type Config struct {
HTTP ConfigHTTP
Storage ConfigStorage
API ConfigAPI
Log slog.Logger
// TODO: TLS and listener options? not needed here if only providing handler
// TODO: GC policy, delete untagged? timeouts for partial blobs?
// TODO: proxy settings, pull only, or push+pull cache
// TODO: memory option to load from disk
// TODO: auth options (basic, bearer)
// TODO: allowed actions: get/head, put, delete, catalog
}

type ConfigHTTP struct {
Addr string // address and port to listen on, e.g. ":5000"
CertFile string // public certificate for https server (leave blank for http)
KeyFile string // private key for https server (leave blank for http)
}

type ConfigStorage struct {
Expand Down
10 changes: 5 additions & 5 deletions manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/olareg/olareg/types"
)

func (s *server) manifestDelete(repoStr, arg string) http.HandlerFunc {
func (s *Server) manifestDelete(repoStr, arg string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
repo, err := s.store.RepoGet(repoStr)
if err != nil {
Expand Down Expand Up @@ -89,7 +89,7 @@ func (s *server) manifestDelete(repoStr, arg string) http.HandlerFunc {
}
}

func (s *server) manifestGet(repoStr, arg string) http.HandlerFunc {
func (s *Server) manifestGet(repoStr, arg string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
repo, err := s.store.RepoGet(repoStr)
if err != nil {
Expand Down Expand Up @@ -176,7 +176,7 @@ func (s *server) manifestGet(repoStr, arg string) http.HandlerFunc {
}
}

func (s *server) manifestPut(repoStr, arg string) http.HandlerFunc {
func (s *Server) manifestPut(repoStr, arg string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
tag := ""
var dExpect digest.Digest
Expand Down Expand Up @@ -378,7 +378,7 @@ func (s *server) manifestPut(repoStr, arg string) http.HandlerFunc {
}
}

func (s *server) manifestVerifyImage(repo store.Repo, m types.Manifest) []types.ErrorInfo {
func (s *Server) manifestVerifyImage(repo store.Repo, m types.Manifest) []types.ErrorInfo {
// TODO: allow validation to be disabled
es := []types.ErrorInfo{}
r, err := repo.BlobGet(m.Config.Digest)
Expand All @@ -401,7 +401,7 @@ func (s *server) manifestVerifyImage(repo store.Repo, m types.Manifest) []types.
return nil
}

func (s *server) manifestVerifyIndex(repo store.Repo, m types.Index) []types.ErrorInfo {
func (s *Server) manifestVerifyIndex(repo store.Repo, m types.Index) []types.ErrorInfo {
// TODO: allow validation to be disabled
es := []types.ErrorInfo{}
for _, d := range m.Manifests {
Expand Down
59 changes: 50 additions & 9 deletions olareg.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package olareg

import (
"context"
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
"path"
"regexp"
"strings"
"time"

"github.com/olareg/olareg/config"
"github.com/olareg/olareg/internal/slog"
Expand All @@ -22,8 +27,8 @@ var (
)

// New runs an http handler
func New(conf config.Config) http.Handler {
s := &server{
func New(conf config.Config) *Server {
s := &Server{
conf: conf,
log: conf.Log,
}
Expand All @@ -45,15 +50,51 @@ func New(conf config.Config) http.Handler {
return s
}

type server struct {
conf config.Config
store store.Store
log slog.Logger
// TODO: add context?
type Server struct {
conf config.Config
store store.Store
log slog.Logger
httpServer *http.Server
// TODO: implement disk cache, GC handling, etc for the non-disk and non-config data
}

func (s *server) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
func (s *Server) Run(ctx context.Context) error {
if s.httpServer != nil {
return fmt.Errorf("server is already running, run shutdown first")
}
s.log.Info("launching server", "addr", s.conf.HTTP.Addr)
hs := &http.Server{
Addr: s.conf.HTTP.Addr,
ReadHeaderTimeout: 5 * time.Second,
Handler: s,
}
s.httpServer = hs
if ctx != nil {
hs.BaseContext = func(l net.Listener) context.Context { return ctx }
}
var err error
if s.conf.HTTP.CertFile != "" && s.conf.HTTP.KeyFile != "" {
err = hs.ListenAndServeTLS(s.conf.HTTP.CertFile, s.conf.HTTP.KeyFile)
} else {
err = hs.ListenAndServe()
}
// graceful exit should not error
if err != nil && errors.Is(err, http.ErrServerClosed) {
err = nil
}
return err
}

func (s *Server) Shutdown(ctx context.Context) error {
if s.httpServer == nil {
return fmt.Errorf("server is not running")
}
err := s.httpServer.Shutdown(ctx)
s.httpServer = nil
return err
}

func (s *Server) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
// parse request path, cleaning traversal attacks, and stripping leading and trailing slash
pathEl := strings.Split(strings.Trim(path.Clean("/"+req.URL.Path), "/"), "/")
resp.Header().Set("Docker-Distribution-API-Version", "registry/2.0")
Expand Down Expand Up @@ -173,7 +214,7 @@ func matchV2(pathEl []string, params ...string) ([]string, bool) {
return matches, true
}

func (s *server) v2Ping(resp http.ResponseWriter, req *http.Request) {
func (s *Server) v2Ping(resp http.ResponseWriter, req *http.Request) {
resp.Header().Add("Content-Type", "application/json")
resp.Header().Add("Content-Length", "2")
resp.WriteHeader(200)
Expand Down
6 changes: 3 additions & 3 deletions referrer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

// referrerGet searches for the referrers response in the index.
// All errors should return an empty response, no 404's should be generated.
func (s *server) referrerGet(repoStr, arg string) http.HandlerFunc {
func (s *Server) referrerGet(repoStr, arg string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// all errors should return an empty index
i := types.Index{
Expand Down Expand Up @@ -62,7 +62,7 @@ func (s *server) referrerGet(repoStr, arg string) http.HandlerFunc {
}

// referrerAdd adds a new referrer entry to a given subject.
func (s *server) referrerAdd(repo store.Repo, subject digest.Digest, desc types.Descriptor) error {
func (s *Server) referrerAdd(repo store.Repo, subject digest.Digest, desc types.Descriptor) error {
index, err := repo.IndexGet()
if err != nil {
return err
Expand Down Expand Up @@ -132,7 +132,7 @@ func (s *server) referrerAdd(repo store.Repo, subject digest.Digest, desc types.
}

// referrerDelete removes a referrer entry from a subject.
func (s *server) referrerDelete(repo store.Repo, subject digest.Digest, desc types.Descriptor) error {
func (s *Server) referrerDelete(repo store.Repo, subject digest.Digest, desc types.Descriptor) error {
// get the index.json
index, err := repo.IndexGet()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/olareg/olareg/types"
)

func (s *server) tagList(repoStr string) http.HandlerFunc {
func (s *Server) tagList(repoStr string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
repo, err := s.store.RepoGet(repoStr)
if err != nil {
Expand Down

0 comments on commit f4bae9d

Please sign in to comment.