Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/cache-offloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"neurocode.io/cache-offloader/pkg/metrics"
"neurocode.io/cache-offloader/pkg/probes"
"neurocode.io/cache-offloader/pkg/storage"
"neurocode.io/cache-offloader/pkg/worker"
)

func getInMemoryStorage(cfg config.Config) http.Cacher {
Expand Down Expand Up @@ -61,8 +62,10 @@ func main() {
cfg := config.New()
setupLogging(cfg.ServerConfig.LogLevel)
m := metrics.NewPrometheusCollector()
maxInFlightRevalidationRequests := 1000
opts := http.ServerOpts{
Config: cfg,
Worker: worker.NewUpdateQueue(maxInFlightRevalidationRequests),
MetricsCollector: m,
ReadinessChecker: probes.NewReadinessChecker(),
}
Expand Down
37 changes: 36 additions & 1 deletion pkg/http/cache-mock_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

101 changes: 55 additions & 46 deletions pkg/http/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,27 @@ import (
)

//go:generate mockgen -source=./cache.go -destination=./cache-mock_test.go -package=http
type Cacher interface {
LookUp(context.Context, string) (*model.Response, error)
Store(context.Context, string, *model.Response) error
}
type (
Worker interface {
Start(string, func())
}
Cacher interface {
LookUp(context.Context, string) (*model.Response, error)
Store(context.Context, string, *model.Response) error
}

type MetricsCollector interface {
CacheHit(method string, statusCode int)
CacheMiss(method string, statusCode int)
}
MetricsCollector interface {
CacheHit(method string, statusCode int)
CacheMiss(method string, statusCode int)
}

type handler struct {
cacher Cacher
metricsCollector MetricsCollector
cfg config.CacheConfig
}
handler struct {
cacher Cacher
worker Worker
metricsCollector MetricsCollector
cfg config.CacheConfig
}
)

func handleGzipServeErr(err error) {
if err != nil {
Expand Down Expand Up @@ -94,47 +100,50 @@ func errHandler(res http.ResponseWriter, req *http.Request, err error) {
http.Error(res, "service unavailable", http.StatusBadGateway)
}

func newCacheHandler(c Cacher, m MetricsCollector, cfg config.CacheConfig) handler {
func newCacheHandler(c Cacher, m MetricsCollector, w Worker, cfg config.CacheConfig) handler {
return handler{
cacher: c,
worker: w,
metricsCollector: m,
cfg: cfg,
}
}

func (h handler) asyncCacheRevalidate(hashKey string, res http.ResponseWriter, req *http.Request) {
ctx := context.Background()
newReq := req.WithContext(ctx)

netTransport := &http.Transport{
MaxIdleConnsPerHost: 1000,
DisableKeepAlives: false,
IdleConnTimeout: time.Hour * 1,
Dial: (&net.Dialer{
Timeout: 10 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
ResponseHeaderTimeout: 10 * time.Second,
}
client := &http.Client{
Timeout: time.Second * 10,
Transport: netTransport,
}

newReq.URL.Host = h.cfg.DownstreamHost.Host
newReq.URL.Scheme = h.cfg.DownstreamHost.Scheme
newReq.RequestURI = ""
resp, err := client.Do(newReq)
if err != nil {
log.Ctx(ctx).Error().Err(err).Msg("Errored when sending request to the server")
func (h handler) asyncCacheRevalidate(hashKey string, req *http.Request) func() {
return func() {
ctx := context.Background()
newReq := req.WithContext(ctx)

netTransport := &http.Transport{
MaxIdleConnsPerHost: 1000,
DisableKeepAlives: false,
IdleConnTimeout: time.Hour * 1,
Dial: (&net.Dialer{
Timeout: 10 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
ResponseHeaderTimeout: 10 * time.Second,
}
client := &http.Client{
Timeout: time.Second * 10,
Transport: netTransport,
}

return
}
err = h.cacheResponse(ctx, hashKey)(resp)
newReq.URL.Host = h.cfg.DownstreamHost.Host
newReq.URL.Scheme = h.cfg.DownstreamHost.Scheme
newReq.RequestURI = ""
resp, err := client.Do(newReq)
if err != nil {
log.Ctx(ctx).Error().Err(err).Msg("Errored when sending request to the server")

if err != nil {
log.Print("Error occurred caching response")
return
}
err = h.cacheResponse(ctx, hashKey)(resp)

if err != nil {
log.Print("Error occurred caching response")
}
}
}

Expand Down Expand Up @@ -178,7 +187,7 @@ func (h handler) ServeHTTP(res http.ResponseWriter, req *http.Request) {
h.metricsCollector.CacheHit(req.Method, result.Status)

if result.IsStale() {
go h.asyncCacheRevalidate(hashKey, res, req)
go h.worker.Start(hashKey, h.asyncCacheRevalidate(hashKey, req))
}
serveResponseFromMemory(res, result)
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/http/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func mustURL(t *testing.T, downstreamURL string) *url.URL {

func TestCacheHandler(t *testing.T) {
ctrl := gomock.NewController(t)
// defer ctrl.Finish()
defer ctrl.Finish()

proxied := http.StatusUseProxy
endpoint := "/status/200?q=1"
Expand Down Expand Up @@ -286,13 +286,18 @@ func TestCacheHandler(t *testing.T) {
cfg: config.CacheConfig{
DownstreamHost: mustURL(t, downstreamServer.URL),
},
worker: func() Worker {
mock := NewMockWorker(ctrl)
mock.EXPECT().Start(gomock.Any(), gomock.Any())

return mock
}(),
cacher: func() Cacher {
mock := NewMockCacher(ctrl)
mock.EXPECT().LookUp(gomock.Any(), gomock.Any()).Return(&model.Response{
Status: http.StatusOK,
Body: []byte("hello"),
}, nil)
// mock.EXPECT().Store(gomock.Any(), gomock.Any(), gomock.Any())

return mock
}(),
Expand Down
3 changes: 2 additions & 1 deletion pkg/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ import (
type ServerOpts struct {
Config config.Config
Cacher Cacher
Worker Worker
MetricsCollector MetricsCollector
ReadinessChecker ReadinessChecker
}

func RunServer(opts ServerOpts) {
mux := h.NewServeMux()
mux.Handle("/", newCacheHandler(opts.Cacher, opts.MetricsCollector, opts.Config.CacheConfig))
mux.Handle("/", newCacheHandler(opts.Cacher, opts.MetricsCollector, opts.Worker, opts.Config.CacheConfig))
mux.Handle("/metrics/prometheus", metricsHandler())
mux.HandleFunc("/probes/liveness", livenessHandler)

Expand Down
3 changes: 3 additions & 0 deletions pkg/metrics/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ func TestPrometheusMetrics(t *testing.T) {
t.Run("should return a prometheus registry", func(t *testing.T) {
collector := NewPrometheusCollector()
assert.NotNil(t, collector)

prometheus.Unregister(collector.httpMetrics)
})

t.Run("should use NA for invalid HTTP method", func(t *testing.T) {
Expand All @@ -21,5 +23,6 @@ func TestPrometheusMetrics(t *testing.T) {

assert.Nil(t, err)
assert.NotNil(t, metric)
prometheus.Unregister(collector.httpMetrics)
})
}
81 changes: 0 additions & 81 deletions pkg/storage/debouncer.go

This file was deleted.

48 changes: 48 additions & 0 deletions pkg/worker/cache-updater.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package worker

import (
"sync"

"github.com/rs/zerolog/log"
)

// UpdateQueue debounces cache-revalidation work: it allows at most one
// in-flight unit of work per key, and at most size keys in flight overall.
type UpdateQueue struct {
	// mtx guards queue. A plain Mutex is used (not RWMutex) because every
	// access path both reads and writes the map, so there are no
	// read-only sections that could benefit from RLock.
	mtx   sync.Mutex
	queue map[string]bool // keys currently being revalidated
	size  int             // maximum number of concurrent in-flight keys
}

// NewUpdateQueue returns an UpdateQueue that admits at most size
// concurrent in-flight revalidation requests. A non-positive size falls
// back to a default capacity of 1000.
func NewUpdateQueue(size int) *UpdateQueue {
	const defaultSize = 1000

	capacity := size
	if capacity <= 0 {
		capacity = defaultSize
	}

	return &UpdateQueue{
		size:  capacity,
		queue: make(map[string]bool, capacity),
	}
}

// Start runs work for key unless the queue is at capacity or another
// goroutine is already revalidating the same key. The key is registered
// for the duration of work and removed afterwards, so concurrent calls
// with the same key are debounced while work is in flight.
func (debouncer *UpdateQueue) Start(key string, work func()) {
	debouncer.mtx.Lock()

	// The capacity check must happen under the lock: len() on a map that
	// other goroutines mutate concurrently is a data race (the original
	// code read it before locking, which -race would flag).
	if len(debouncer.queue) >= debouncer.size {
		debouncer.mtx.Unlock()
		log.Warn().Msg("UpdateQueue is full, dropping request")

		return
	}

	// Debounce: another goroutine already works on this key.
	if _, ok := debouncer.queue[key]; ok {
		debouncer.mtx.Unlock()

		return
	}
	debouncer.queue[key] = true
	debouncer.mtx.Unlock()

	// Run the work outside the lock so slow revalidations do not block
	// unrelated keys from entering the queue.
	work()

	debouncer.mtx.Lock()
	delete(debouncer.queue, key)
	debouncer.mtx.Unlock()
}
Loading