Merge pull request #11 from ziollek/new_metrics
new metrics, bump dependencies, more concise logs
ziollek committed Oct 30, 2023
2 parents f8490d9 + 311ea63 commit 023a03b
Showing 7 changed files with 294 additions and 939 deletions.
9 changes: 4 additions & 5 deletions .github/workflows/ci.yml
@@ -1,4 +1,4 @@
name: Continious integration
name: Continuous integration

on:
workflow_dispatch:
@@ -13,7 +13,7 @@ jobs:
build:
runs-on: ubuntu-latest
container:
image: 'golang:1.16-buster'
image: 'golang:1.21'

steps:
- uses: actions/checkout@v2
@@ -22,10 +22,9 @@ jobs:
ref: ${{ github.head_ref }}

- name: Run lint
uses: golangci/golangci-lint-action@v2
uses: golangci/golangci-lint-action@v3
with:
version: v1.42.1
skip-go-installation: true
version: v1.54
skip-pkg-cache: true

- name: Run tests
9 changes: 8 additions & 1 deletion README.md
@@ -142,4 +142,11 @@ cluster-a.local.:5300 {
cluster-b.local.:5300 {
forward . 10.9.0.1:53
}
```
```

## Metrics

| Metric                   | Labels                                                                 | Description                                   |
|--------------------------|------------------------------------------------------------------------|-----------------------------------------------|
| request_count_total      | server, qualified (whether the request will be proxied further), type  | Count of requests handled by the plugin       |
| sub_request_count_total  | server, prefix, type, code                                              | Count of sub-requests generated by the plugin |
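
The `requestCount` and `subRequestCount` counters used in `gathersrv.go` below are not defined in the hunks shown on this page. As a rough, non-authoritative sketch: CoreDNS plugins typically declare such counters via `promauto`, so definitions matching the label sets above might look like this (the `gathersrv` subsystem name and the help strings are assumptions, not taken from this commit):

```go
package gathersrv

import (
	"github.com/coredns/coredns/plugin"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Hypothetical declarations; label order matches the WithLabelValues calls
// in gathersrv.go (server/qualified/type and server/prefix/type/code).
var (
	requestCount = promauto.NewCounterVec(prometheus.CounterOpts{
		Namespace: plugin.Namespace,
		Subsystem: "gathersrv", // assumed subsystem name
		Name:      "request_count_total",
		Help:      "Count of requests handled by the plugin.",
	}, []string{"server", "qualified", "type"})

	subRequestCount = promauto.NewCounterVec(prometheus.CounterOpts{
		Namespace: plugin.Namespace,
		Subsystem: "gathersrv", // assumed subsystem name
		Name:      "sub_request_count_total",
		Help:      "Count of sub-requests generated by the plugin.",
	}, []string{"server", "prefix", "type", "code"})
)
```

With the CoreDNS namespace and subsystem applied, these would appear on the metrics endpoint as, for example, `coredns_gathersrv_request_count_total`.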
201 changes: 153 additions & 48 deletions gathersrv.go
@@ -6,10 +6,9 @@ import (
"github.com/coredns/coredns/plugin"
"github.com/coredns/coredns/plugin/metrics"
"github.com/coredns/coredns/plugin/pkg/log"
"github.com/coredns/coredns/request"
"github.com/miekg/dns"
"strings"
"sync"
"time"
)

var proxyTypes = [...]uint16{dns.TypeSRV, dns.TypeA, dns.TypeAAAA, dns.TypeTXT}
@@ -26,66 +25,136 @@ type GatherSrv struct {
}

type NextResp struct {
Code int
Err error
Code int
Err error
empty bool
}

func (gatherSrv GatherSrv) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) {
var wg sync.WaitGroup
func (nr *NextResp) Reduce(subsequentResponse *NextResp) {
// return no error if at least one sub-request went well
if nr.empty || (nr.Err != nil && subsequentResponse.Err == nil) {
nr.empty = false
nr.Err = subsequentResponse.Err
nr.Code = subsequentResponse.Code
}
}

type subRequest struct {
prefix string
request *dns.Msg
}

// we need a channel that:
// * drains any remaining messages on close
// * drops all messages deposited after close
type closableChannel[T any] struct {
messageCh chan T
lockCh chan bool
open bool
}

func (cc *closableChannel[T]) Read() <-chan T {
return cc.messageCh
}

func (cc *closableChannel[T]) Deposit(message T) {
cc.lockCh <- true
defer func() {
<-cc.lockCh
}()
if cc.open {
cc.messageCh <- message
}
}

func (cc *closableChannel[T]) Close() {
cc.lockCh <- true
defer func() {
<-cc.lockCh
close(cc.messageCh)
}()
cc.open = false
for len(cc.messageCh) > 0 {
<-cc.messageCh
}
}

func newClosableChannel[T any](size int) *closableChannel[T] {
return &closableChannel[T]{
messageCh: make(chan T, size),
lockCh: make(chan bool, 1),
open: true,
}
}

state := request.Request{W: w, Req: r}
if !gatherSrv.IsQualifiedQuestion(state.Req.Question[0]) {
unqualifiedRequestCount.WithLabelValues(metrics.WithServer(ctx)).Inc()
func (gatherSrv GatherSrv) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) {
questionType := dns.Type(r.Question[0].Qtype).String()
if !gatherSrv.IsQualifiedQuestion(r.Question[0]) {
requestCount.WithLabelValues(metrics.WithServer(ctx), "false", questionType).Inc()
return plugin.NextOrFailure(gatherSrv.Name(), gatherSrv.Next, ctx, w, r)
}
requestCount.WithLabelValues(metrics.WithServer(ctx), "true", questionType).Inc()

qualifiedRequestCount.WithLabelValues(metrics.WithServer(ctx)).Inc()
// build the proper number of sub-requests depending on the configured clusters
subRequests := gatherSrv.prepareSubRequests(r)
respChan := newClosableChannel[*NextResp](len(subRequests))
defer respChan.Close()
pw := NewResponsePrinter(w, r, gatherSrv.Domain, gatherSrv.Clusters, len(subRequests))

pw := NewResponsePrinter(w, r, gatherSrv.Domain, gatherSrv.Clusters)
question := state.Req.Question[0].Name
protocolPrefix, questionWithoutPrefix := divideDomain(question)
// issue the sub-requests in parallel
doSubRequest := func(ctx context.Context, pw dns.ResponseWriter, s *subRequest) {
code, err := plugin.NextOrFailure(gatherSrv.Name(), gatherSrv.Next, ctx, pw, s.request)
subRequestCount.WithLabelValues(metrics.WithServer(ctx), s.prefix, questionType, fmt.Sprintf("%d", code)).Inc()
if err != nil {
log.Warningf(
"Error occurred for: type=%s, question=%s, error=%s",
questionType,
s.request.Question[0].Name,
err,
)
}
respChan.Deposit(&NextResp{Code: code, Err: err})
}
for _, subRequestParams := range subRequests {
go doSubRequest(ctx, pw, subRequestParams)
}

// gather all responses, or return a partial response if the context is done first
mergedResponse := &NextResp{empty: true}
for waitCnt := len(subRequests); waitCnt > 0; waitCnt-- {
select {
case subResponse := <-respChan.Read():
mergedResponse.Reduce(subResponse)
case <-ctx.Done():
waitCnt = 0
}
}
pw.Flush()
return mergedResponse.Code, mergedResponse.Err
}

isSend := false
respChan := make(chan NextResp, len(gatherSrv.Clusters))
func (gatherSrv GatherSrv) prepareSubRequests(r *dns.Msg) (calls []*subRequest) {
question := r.Question[0].Name
protocolPrefix, questionWithoutPrefix := divideDomain(r.Question[0].Name)

// TODO: parallel proxy requests
for _, cluster := range gatherSrv.Clusters {
if strings.HasPrefix(questionWithoutPrefix, cluster.Prefix) {
state.Req.Question[0].Name = protocolPrefix + strings.Replace(
sr := r.Copy()
sr.Question[0].Name = protocolPrefix + strings.Replace(
strings.TrimPrefix(questionWithoutPrefix, cluster.Prefix), gatherSrv.Domain, cluster.Suffix, 1,
)
pw.counter = 1
wg.Add(1)
go func(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) {
code, err := plugin.NextOrFailure(gatherSrv.Name(), gatherSrv.Next, ctx, pw, r)
respChan <- NextResp{Code: code, Err: err}
wg.Done()
}(ctx, pw, r.Copy())
isSend = true
calls = append(calls, &subRequest{cluster.Prefix, sr})
}
}
if !isSend {

if len(calls) == 0 {
for _, cluster := range gatherSrv.Clusters {
state.Req.Question[0].Name = strings.Replace(question, gatherSrv.Domain, cluster.Suffix, 1)
wg.Add(1)
go func(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) {
code, err := plugin.NextOrFailure(gatherSrv.Name(), gatherSrv.Next, ctx, pw, r)
respChan <- NextResp{Code: code, Err: err}
wg.Done()
}(ctx, pw, r.Copy())
isSend = true
sr := r.Copy()
sr.Question[0].Name = strings.Replace(question, gatherSrv.Domain, cluster.Suffix, 1)
calls = append(calls, &subRequest{cluster.Prefix, sr})
}
}
if !isSend {
close(respChan)
return plugin.NextOrFailure(gatherSrv.Name(), gatherSrv.Next, ctx, w, r)
} else {
wg.Wait()
r := <-respChan
close(respChan)
// todo - figure out better way to merge response codes & errors
return r.Code, r.Err
}
return
}

func (gatherSrv GatherSrv) IsQualifiedQuestion(question dns.Question) bool {
@@ -105,19 +174,21 @@ type GatherResponsePrinter struct {
counter int
clusters []Cluster
state *dns.Msg
start time.Time
dns.ResponseWriter
}

// NewResponsePrinter returns ResponseWriter.
func NewResponsePrinter(w dns.ResponseWriter, r *dns.Msg, domain string, clusters []Cluster) *GatherResponsePrinter {
func NewResponsePrinter(w dns.ResponseWriter, r *dns.Msg, domain string, clusters []Cluster, counter int) *GatherResponsePrinter {
return &GatherResponsePrinter{
lockCh: make(chan bool, 1),
ResponseWriter: w,
originalQuestion: r.Question[0],
domain: domain,
clusters: clusters,
counter: len(clusters),
counter: counter,
state: nil,
start: time.Now(),
}
}

@@ -164,8 +235,7 @@ func (w *GatherResponsePrinter) WriteMsg(res *dns.Msg) error {
w.state.Extra = append(w.state.Extra, rr)
}
}
log.Infof("gather, %v", w.state)
return w.ResponseWriter.WriteMsg(w.state)
return nil
}

func (w *GatherResponsePrinter) Masquerade(rr dns.RR) {
@@ -191,7 +261,42 @@ func (w *GatherResponsePrinter) Masquerade(rr dns.RR) {
}
}
}
}

func (w *GatherResponsePrinter) Flush() {
w.lockCh <- true
defer func() {
<-w.lockCh
}()
if w.state != nil {
if err := w.ResponseWriter.WriteMsg(w.state); err != nil {
log.Errorf(
"error occurred while writing response: question=%v, error=%s", w.originalQuestion, err,
)
}
}
w.shortMessage()
}

func (w *GatherResponsePrinter) shortMessage() {
if w.state != nil {
questionType := dns.Type(w.state.Question[0].Qtype).String()
log.Infof(
"type=%s, question=%s, response=%s, answer-records=%d, extra-records=%d, gathered=%d, not-gatherer=%d, duration=%s",
questionType,
w.state.Question[0].Name,
strings.Split(w.state.MsgHdr.String(), "\n")[0],
len(w.state.Answer),
len(w.state.Extra),
len(w.clusters)-w.counter,
w.counter,
time.Since(w.start),
)
} else {
log.Errorf(
"response printer has an empty state, original question was: %v", w.originalQuestion,
)
}
}

func divideDomain(domain string) (string, string) {
20 changes: 11 additions & 9 deletions gathersrv_test.go
@@ -2,6 +2,7 @@ package gathersrv

import (
"context"
"errors"
"github.com/coredns/coredns/plugin/pkg/dnstest"
"github.com/coredns/coredns/plugin/test"
"github.com/miekg/dns"
@@ -19,15 +20,15 @@ type Assertion struct {
func TestShouldNotProxyUnqualifiedRequests(t *testing.T) {
unqualifiedQuestions := map[string]Assertion{
// asks about type which is not supported by plugin
"_http._tcp.demo.svc.distro.local.": {
GivenName: "_http._tcp.demo.svc.distro.local.",
"_smtp._tcp.demo.svc.distro.local.": {
GivenName: "_smtp._tcp.demo.svc.distro.local.",
GivenType: dns.TypeMX,
ExpectedRcode: dns.RcodeNameError,
ExpectedError: nil,
},
// asks about domain which is not handled
"_http._tcp.demo.svc.unsupported.local.": {
GivenName: "_http._tcp.demo.svc.distro.local.",
GivenName: "_http._tcp.demo.svc.unsupported.local.",
GivenType: dns.TypeSRV,
ExpectedRcode: dns.RcodeNameError,
ExpectedError: nil,
@@ -176,12 +177,12 @@ func TestShouldTranslateResponsesFromClusters(t *testing.T) {
}
}

func TestShouldTranslateResponsesWithDifferenceResponseCodesFromClusters(t *testing.T) {
func TestShouldProvideNoErrorResponseWhenNoErrorAndErrorResponsesOccurredTogether(t *testing.T) {
failResponse := Assertion{
GivenName: "_http._tcp.demo.svc.distro.local.",
GivenType: dns.TypeSRV,
ExpectedRcode: dns.RcodeNameError,
ExpectedError: nil,
ExpectedError: errors.New("timeout occurred"),
}
okResponse := Assertion{
GivenName: "_http._tcp.demo.svc.distro.local.",
@@ -222,14 +223,15 @@ func TestShouldTranslateResponsesWithDifferenceResponseCodesFromClusters(t *test
Next: PrepareContentNextHandler(expectedQuestions, answersFromCluster, extrasFromClusters),
Domain: "distro.local.",
Clusters: []Cluster{
{
Suffix: "cluster-a.local.",
Prefix: "a-",
},

{
Suffix: "cluster-b.local.",
Prefix: "b-",
},
{
Suffix: "cluster-a.local.",
Prefix: "a-",
},
},
}
msg := CheckAssertion(t, gatherPlugin, okResponse)