Skip to content

Commit

Permalink
improve performance of record lookups; elimlinate redundant A records…
Browse files Browse the repository at this point in the history
… in SRV additional-records section
  • Loading branch information
James DeFelice committed Feb 11, 2016
1 parent 53e1e68 commit a2bc582
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 87 deletions.
140 changes: 76 additions & 64 deletions records/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,51 @@ import (
// Map host/service name to DNS answer
// REFACTOR - when discoveryinfo is integrated
// Will likely become map[string][]discoveryinfo
type rrs map[string][]string
type rrs map[string]map[string]struct{}

func (r rrs) add(name, host string) bool {
if host == "" {
return false
}
v, ok := r[name]
if !ok {
v = make(map[string]struct{})
r[name] = v
} else {
// don't overwrite existing values
_, ok = v[host]
if ok {
return false
}
}
v[host] = struct{}{}
return true
}

func (r rrs) First(name string) (string, bool) {
for host := range r[name] {
return host, true
}
return "", false
}

type rrsKind string

const (
A rrsKind = "A"
SRV = "SRV"
)

func (kind rrsKind) rrs(rg *RecordGenerator) rrs {
switch kind {
case A:
return rg.As
case SRV:
return rg.SRVs
default:
return nil
}
}

// RecordGenerator contains DNS records and methods to access and manipulate
// them. TODO(kozyraki): Refactor when discovery id is available.
Expand Down Expand Up @@ -226,10 +270,10 @@ func (rg *RecordGenerator) frameworkRecords(sj state.State, domain string, spec
host, port := f.HostPort()
if address, ok := hostToIP4(host); ok {
a := fname + "." + domain + "."
rg.insertRR(a, address, "A")
rg.insertRR(a, address, A)
if port != "" {
srv := net.JoinHostPort(a, port)
rg.insertRR("_framework._tcp."+a, srv, "SRV")
rg.insertRR("_framework._tcp."+a, srv, SRV)
}
}
}
Expand All @@ -243,9 +287,9 @@ func (rg *RecordGenerator) slaveRecords(sj state.State, domain string, spec labe
address, ok := hostToIP4(slave.PID.Host)
if ok {
a := "slave." + domain + "."
rg.insertRR(a, address, "A")
rg.insertRR(a, address, A)
srv := net.JoinHostPort(a, slave.PID.Port)
rg.insertRR("_slave._tcp."+domain+".", srv, "SRV")
rg.insertRR("_slave._tcp."+domain+".", srv, SRV)
} else {
logging.VeryVerbose.Printf("string '%q' for slave with id %q is not a valid IP address", address, slave.ID)
address = labels.DomainFrag(address, labels.Sep, spec)
Expand Down Expand Up @@ -295,16 +339,16 @@ func (rg *RecordGenerator) masterRecord(domain string, masters []string, leader
return
}
arec := "leader." + domain + "."
rg.insertRR(arec, ip, "A")
rg.insertRR(arec, ip, A)
arec = "master." + domain + "."
rg.insertRR(arec, ip, "A")
rg.insertRR(arec, ip, A)

// SRV records
tcp := "_leader._tcp." + domain + "."
udp := "_leader._udp." + domain + "."
host := "leader." + domain + "." + ":" + port
rg.insertRR(tcp, host, "SRV")
rg.insertRR(udp, host, "SRV")
rg.insertRR(tcp, host, SRV)
rg.insertRR(udp, host, SRV)

// if there is a list of masters, insert that as well
addedLeaderMasterN := false
Expand All @@ -319,7 +363,7 @@ func (rg *RecordGenerator) masterRecord(domain string, masters []string, leader
// A records (master and masterN)
if master != leaderAddress {
arec := "master." + domain + "."
added := rg.insertRR(arec, masterIP, "A")
added := rg.insertRR(arec, masterIP, A)
if !added {
// duplicate master?!
continue
Expand All @@ -332,7 +376,7 @@ func (rg *RecordGenerator) masterRecord(domain string, masters []string, leader
}

arec := "master" + strconv.Itoa(idx) + "." + domain + "."
rg.insertRR(arec, masterIP, "A")
rg.insertRR(arec, masterIP, A)
idx++

if master == leaderAddress {
Expand All @@ -346,7 +390,7 @@ func (rg *RecordGenerator) masterRecord(domain string, masters []string, leader
logging.Error.Printf("warning: leader %q is not in master list", leader)
}
arec = "master" + strconv.Itoa(idx) + "." + domain + "."
rg.insertRR(arec, ip, "A")
rg.insertRR(arec, ip, A)
}
}

Expand All @@ -355,9 +399,9 @@ func (rg *RecordGenerator) listenerRecord(listener string, ns string) {
if listener == "0.0.0.0" {
rg.setFromLocal(listener, ns)
} else if listener == "127.0.0.1" {
rg.insertRR(ns, "127.0.0.1", "A")
rg.insertRR(ns, "127.0.0.1", A)
} else {
rg.insertRR(ns, listener, "A")
rg.insertRR(ns, listener, A)
}
}

Expand Down Expand Up @@ -394,11 +438,11 @@ func (rg *RecordGenerator) taskRecords(sj state.State, domain string, spec label
canonical := ctx.taskName + "-" + ctx.taskID + "-" + ctx.slaveID + "." + fname
arec := ctx.taskName + "." + fname

rg.insertRR(arec+tail, ctx.taskIP, "A")
rg.insertRR(canonical+tail, ctx.taskIP, "A")
rg.insertRR(arec+tail, ctx.taskIP, A)
rg.insertRR(canonical+tail, ctx.taskIP, A)

rg.insertRR(arec+".slave"+tail, ctx.slaveIP, "A")
rg.insertRR(canonical+".slave"+tail, ctx.slaveIP, "A")
rg.insertRR(arec+".slave"+tail, ctx.slaveIP, A)
rg.insertRR(canonical+".slave"+tail, ctx.slaveIP, A)

// Add RFC 2782 SRV records
slaveHost := canonical + ".slave" + tail
Expand All @@ -408,12 +452,12 @@ func (rg *RecordGenerator) taskRecords(sj state.State, domain string, spec label
slaveTarget := slaveHost + ":" + port

if !task.HasDiscoveryInfo() {
rg.insertRR(tcpName+tail, slaveTarget, "SRV")
rg.insertRR(udpName+tail, slaveTarget, "SRV")
rg.insertRR(tcpName+tail, slaveTarget, SRV)
rg.insertRR(udpName+tail, slaveTarget, SRV)
}

rg.insertRR(tcpName+".slave"+tail, slaveTarget, "SRV")
rg.insertRR(udpName+".slave"+tail, slaveTarget, "SRV")
rg.insertRR(tcpName+".slave"+tail, slaveTarget, SRV)
rg.insertRR(udpName+".slave"+tail, slaveTarget, SRV)
}

if !task.HasDiscoveryInfo() {
Expand All @@ -427,10 +471,10 @@ func (rg *RecordGenerator) taskRecords(sj state.State, domain string, spec label
proto := spec(port.Protocol)
if proto != "" {
name := "_" + ctx.taskName + "._" + proto + "." + fname
rg.insertRR(name+tail, target, "SRV")
rg.insertRR(name+tail, target, SRV)
} else {
rg.insertRR(tcpName+tail, target, "SRV")
rg.insertRR(udpName+tail, target, "SRV")
rg.insertRR(tcpName+tail, target, SRV)
rg.insertRR(udpName+tail, target, SRV)
}
}
}
Expand Down Expand Up @@ -473,53 +517,21 @@ func (rg *RecordGenerator) setFromLocal(host string, ns string) {
continue
}

rg.insertRR(ns, ip.String(), "A")
rg.insertRR(ns, ip.String(), A)
}
}
}

func (rg *RecordGenerator) exists(name, host, rtype string) bool {
if rtype == "A" {
if val, ok := rg.As[name]; ok {
// check if A record already exists
// identical tasks on same slave
for _, b := range val {
if b == host {
return true
}
}
}
} else {
if val, ok := rg.SRVs[name]; ok {
// check if SRV record already exists
for _, b := range val {
if b == host {
return true
}
}
}
}
return false
}

// insertRR adds a record to the appropriate record map for the given name/host pair,
// but only if the pair is unique. returns true if added, false otherwise.
// TODO(???): REFACTOR when storage is updated
func (rg *RecordGenerator) insertRR(name, host, rtype string) bool {
if host == "" || rg.exists(name, host, rtype) {
return false
}

logging.VeryVerbose.Println("[" + rtype + "]\t" + name + ": " + host)

if rtype == "A" {
val := rg.As[name]
rg.As[name] = append(val, host)
} else {
val := rg.SRVs[name]
rg.SRVs[name] = append(val, host)
func (rg *RecordGenerator) insertRR(name, host string, kind rrsKind) (added bool) {
if rrs := kind.rrs(rg); rrs != nil {
if added = rrs.add(name, host); added {
logging.VeryVerbose.Println("[" + string(kind) + "]\t" + name + ": " + host)
}
}
return true
return
}

// leaderIP returns the ip for the mesos master
Expand Down
34 changes: 30 additions & 4 deletions records/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,24 @@ func init() {
logging.SetupLogs()
}

func (rg *RecordGenerator) exists(name, host, rtype string) bool {
if rtype == "A" {
if val, ok := rg.As[name]; ok {
// check if A record already exists
// identical tasks on same slave
_, ok := val[host]
return ok
}
} else {
if val, ok := rg.SRVs[name]; ok {
// check if SRV record already exists
_, ok := val[host]
return ok
}
}
return false
}

func TestMasterRecord(t *testing.T) {
// masterRecord(domain string, masters []string, leader string)
type expectedRR struct {
Expand Down Expand Up @@ -139,9 +157,9 @@ func TestMasterRecord(t *testing.T) {
t.Fatalf("test case %d: missing expected record: name=%q host=%q rtype=%s, As=%v", i+1, e.name, e.host, e.rtype, rg.As)
}
if e.rtype == "A" {
expectedA[e.name] = append(expectedA[e.name], e.host)
expectedA.add(e.name, e.host)
} else {
expectedSRV[e.name] = append(expectedSRV[e.name], e.host)
expectedSRV.add(e.name, e.host)
}
}
if !reflect.DeepEqual(rg.As, expectedA) {
Expand Down Expand Up @@ -244,8 +262,16 @@ func TestInsertState(t *testing.T) {
{rgDocker.As, "nginx.marathon.mesos.", []string{"1.2.3.11"}},
{rgDocker.As, "car-store.marathon.slave.mesos.", []string{"1.2.3.11"}},
} {
if got := tt.rrs[tt.name]; !reflect.DeepEqual(got, tt.want) {
t.Errorf("test #%d: %q: got: %q, want: %q", i, tt.name, got, tt.want)
// convert want into a map[string]struct{} for simpler comparison
want := map[string]struct{}{}
for _, x := range tt.want {
want[x] = struct{}{}
}
if got := tt.rrs[tt.name]; !reflect.DeepEqual(got, want) {
if len(got) == 0 && len(want) == 0 {
continue
}
t.Errorf("test #%d: %q: got: %q, want: %q", i, tt.name, got, want)
}
}
}
Expand Down
31 changes: 19 additions & 12 deletions resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,8 @@ func (res *Resolver) HandleMesos(w dns.ResponseWriter, r *dns.Msg) {

func (res *Resolver) handleSRV(rs *records.RecordGenerator, name string, m, r *dns.Msg) error {
var errs multiError
for _, srv := range rs.SRVs[name] {
added := map[string]struct{}{} // track the A RR's we've already added, avoid dups
for srv := range rs.SRVs[name] {
srvRR, err := res.formatSRV(r.Question[0].Name, srv)
if err != nil {
errs.Add(err)
Expand All @@ -331,24 +332,30 @@ func (res *Resolver) handleSRV(rs *records.RecordGenerator, name string, m, r *d

m.Answer = append(m.Answer, srvRR)
host := strings.Split(srv, ":")[0]
if len(rs.As[host]) == 0 {
if _, found := added[host]; found {
// avoid dups
continue
}

aRR, err := res.formatA(host, rs.As[host][0])
if err != nil {
errs.Add(err)
if len(rs.As[host]) == 0 {
continue
}

m.Extra = append(m.Extra, aRR)
if a, ok := rs.As.First(host); ok {
aRR, err := res.formatA(host, a)
if err != nil {
errs.Add(err)
continue
}
m.Extra = append(m.Extra, aRR)
added[host] = struct{}{}
}
}
return errs
}

func (res *Resolver) handleA(rs *records.RecordGenerator, name string, m *dns.Msg) error {
var errs multiError
for _, a := range rs.As[name] {
for a := range rs.As[name] {
rr, err := res.formatA(name, a)
if err != nil {
errs.Add(err)
Expand Down Expand Up @@ -527,7 +534,7 @@ func (res *Resolver) RestHost(req *restful.Request, resp *restful.Response) {

aRRs := rs.As[dom]
records := make([]record, 0, len(aRRs))
for _, ip := range aRRs {
for ip := range aRRs {
records = append(records, record{dom, ip})
}

Expand Down Expand Up @@ -583,11 +590,11 @@ func (res *Resolver) RestService(req *restful.Request, resp *restful.Response) {

srvRRs := rs.SRVs[dom]
records := make([]record, 0, len(srvRRs))
for _, s := range srvRRs {
for s := range srvRRs {
host, port, _ := net.SplitHostPort(s)
var ip string
if r := rs.As[host]; len(r) != 0 {
ip = r[0]
if r, ok := rs.As.First(host); ok {
ip = r
}
records = append(records, record{service, host, ip, port})
}
Expand Down

0 comments on commit a2bc582

Please sign in to comment.