Skip to content

Commit

Permalink
Merge pull request #83 from kradalby/more-integration-tests
Browse files Browse the repository at this point in the history
Improve reliability of PollMapHandler, more integration tests
  • Loading branch information
kradalby committed Aug 13, 2021
2 parents 5bfcf5c + a8d9fdc commit 5b1b40c
Show file tree
Hide file tree
Showing 11 changed files with 278 additions and 139 deletions.
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// ignoring it let us speed up the integration test
// development
integration_test.go
integration_test/

Dockerfile*
docker-compose*
Expand Down
125 changes: 30 additions & 95 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,21 +286,6 @@ func (h *Headscale) PollNetMapHandler(c *gin.Context) {
}
h.db.Save(&m)

update := make(chan []byte, 1)

pollData := make(chan []byte, 1)
defer close(pollData)

cancelKeepAlive := make(chan []byte, 1)
defer close(cancelKeepAlive)

log.Trace().
Str("handler", "PollNetMap").
Str("id", c.Param("id")).
Str("machine", m.Name).
Msg("Storing update channel")
h.clientsPolling.Store(m.ID, update)

data, err := h.getMapResponse(mKey, req, m)
if err != nil {
log.Error().
Expand Down Expand Up @@ -351,6 +336,23 @@ func (h *Headscale) PollNetMapHandler(c *gin.Context) {
return
}

// Only create update channel if it has not been created
var update chan []byte
log.Trace().
Str("handler", "PollNetMap").
Str("id", c.Param("id")).
Str("machine", m.Name).
Msg("Creating or loading update channel")
if result, ok := h.clientsPolling.LoadOrStore(m.ID, make(chan []byte, 1)); ok {
update = result.(chan []byte)
}

pollData := make(chan []byte, 1)
defer close(pollData)

cancelKeepAlive := make(chan []byte, 1)
defer close(cancelKeepAlive)

log.Info().
Str("handler", "PollNetMap").
Str("machine", m.Name).
Expand All @@ -365,87 +367,15 @@ func (h *Headscale) PollNetMapHandler(c *gin.Context) {
Str("handler", "PollNetMap").
Str("machine", m.Name).
Msg("Notifying peers")
peers, _ := h.getPeers(m)
for _, p := range *peers {
pUp, ok := h.clientsPolling.Load(uint64(p.ID))
if ok {
log.Info().
Str("handler", "PollNetMap").
Str("machine", m.Name).
Str("peer", m.Name).
Str("address", p.Addresses[0].String()).
Msgf("Notifying peer %s (%s)", p.Name, p.Addresses[0])
pUp.(chan []byte) <- []byte{}
} else {
log.Info().
Str("handler", "PollNetMap").
Str("machine", m.Name).
Str("peer", m.Name).
Msgf("Peer %s does not appear to be polling", p.Name)
}
}

go h.keepAlive(cancelKeepAlive, pollData, mKey, req, m)
// TODO: Why does this block?
go h.notifyChangesToPeers(&m)

c.Stream(func(w io.Writer) bool {
select {
case data := <-pollData:
log.Trace().
Str("handler", "PollNetMap").
Str("machine", m.Name).
Int("bytes", len(data)).
Msg("Sending data")
_, err := w.Write(data)
if err != nil {
log.Error().
Str("handler", "PollNetMap").
Str("machine", m.Name).
Err(err).
Msg("Cannot write data")
}
now := time.Now().UTC()
m.LastSeen = &now
h.db.Save(&m)
return true

case <-update:
log.Debug().
Str("handler", "PollNetMap").
Str("machine", m.Name).
Msg("Received a request for update")
data, err := h.getMapResponse(mKey, req, m)
if err != nil {
log.Error().
Str("handler", "PollNetMap").
Str("machine", m.Name).
Err(err).
Msg("Could not get the map update")
}
_, err = w.Write(*data)
if err != nil {
log.Error().
Str("handler", "PollNetMap").
Str("machine", m.Name).
Err(err).
Msg("Could not write the map response")
}
return true

case <-c.Request.Context().Done():
log.Info().
Str("handler", "PollNetMap").
Str("machine", m.Name).
Msg("The client has closed the connection")
now := time.Now().UTC()
m.LastSeen = &now
h.db.Save(&m)
cancelKeepAlive <- []byte{}
h.clientsPolling.Delete(m.ID)
close(update)
return false

}
})
h.PollNetMapStream(c, m, req, mKey, pollData, update, cancelKeepAlive)
log.Trace().
Str("handler", "PollNetMap").
Str("id", c.Param("id")).
Str("machine", m.Name).
Msg("Finished stream, closing PollNetMap session")
}

func (h *Headscale) keepAlive(cancel chan []byte, pollData chan []byte, mKey wgkey.Key, req tailcfg.MapRequest, m Machine) {
Expand Down Expand Up @@ -514,10 +444,15 @@ func (h *Headscale) getMapResponse(mKey wgkey.Key, req tailcfg.MapRequest, m Mac
DERPMap: h.cfg.DerpMap,
UserProfiles: []tailcfg.UserProfile{profile},
}
log.Trace().
Str("func", "getMapResponse").
Str("machine", req.Hostinfo.Hostname).
Msgf("Generated map response: %s", tailMapResponseToString(resp))

var respBody []byte
if req.Compress == "zstd" {
src, _ := json.Marshal(resp)

encoder, _ := zstd.NewWriter(nil)
srcCompressed := encoder.EncodeAll(src, nil)
respBody, err = encodeMsg(srcCompressed, &mKey, h.privateKey)
Expand Down
5 changes: 1 addition & 4 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,7 @@ func (h *Headscale) expireEphemeralNodesWorker() {
if err != nil {
log.Error().Err(err).Str("machine", m.Name).Msg("🤮 Cannot delete ephemeral machine from the database")
}
err = h.notifyChangesToPeers(&m)
if err != nil {
continue
}
h.notifyChangesToPeers(&m)
}
}
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require (
github.com/rs/zerolog v1.23.0 // indirect
github.com/spf13/cobra v1.1.3
github.com/spf13/viper v1.8.1
github.com/stretchr/testify v1.7.0 // indirect
github.com/tailscale/hujson v0.0.0-20200924210142-dde312d0d6a2
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,7 @@ github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5J
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/objx v0.3.0 h1:NGXK3lHquSN08v5vWalVI/L8XU9hdzE/G6xsrze47As=
github.com/stretchr/objx v0.3.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
Expand Down
Loading

0 comments on commit 5b1b40c

Please sign in to comment.