diff --git a/go.mod b/go.mod index 911cc17ee..437f0d511 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,7 @@ require ( github.com/shirou/gopsutil v2.19.11+incompatible github.com/stretchr/testify v1.5.1 github.com/termie/go-shutil v0.0.0-20140729215957-bcacb06fecae - github.com/threefoldtech/tfexplorer v0.2.7-0.20200429090008-f462f9a8d76c + github.com/threefoldtech/tfexplorer v0.2.7-0.20200505141628-4d7a0c749784 github.com/threefoldtech/zbus v0.1.3 github.com/urfave/cli v1.22.3 github.com/vishvananda/netlink v1.0.0 diff --git a/go.sum b/go.sum index efa81cd5e..c2e31d401 100644 --- a/go.sum +++ b/go.sum @@ -557,8 +557,10 @@ github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2/go.mod h1:hkRG github.com/termie/go-shutil v0.0.0-20140729215957-bcacb06fecae h1:vgGSvdW5Lqg+I1aZOlG32uyE6xHpLdKhZzcTEktz5wM= github.com/termie/go-shutil v0.0.0-20140729215957-bcacb06fecae/go.mod h1:quDq6Se6jlGwiIKia/itDZxqC5rj6/8OdFyMMAwTxCs= github.com/threefoldtech/tfexplorer v0.2.5/go.mod h1:TugylEDgMNKk4ZIzee9PpVmIcLNLaRLdiXuhHTt/AR0= -github.com/threefoldtech/tfexplorer v0.2.7-0.20200429090008-f462f9a8d76c h1:5tx6tw6P2uzQCCAAsjubA7p4ozKJXV0TMedIPCbGDgk= -github.com/threefoldtech/tfexplorer v0.2.7-0.20200429090008-f462f9a8d76c/go.mod h1:ozAe2OYR4ALp2PtyKT1A6UiTI+0ZcFU2UfUaRd1P5eE= +github.com/threefoldtech/tfexplorer v0.2.7-0.20200505125555-7d20b5212c0e h1:HqNiMBz0RpYg8RBDQl67soy2WPjAWHCBPzb7jl1Dtxc= +github.com/threefoldtech/tfexplorer v0.2.7-0.20200505125555-7d20b5212c0e/go.mod h1:ozAe2OYR4ALp2PtyKT1A6UiTI+0ZcFU2UfUaRd1P5eE= +github.com/threefoldtech/tfexplorer v0.2.7-0.20200505141628-4d7a0c749784 h1:CCyZTGGV7BVVIGDduxLtebFsVHxJt6i2Z7rq6ostfmU= +github.com/threefoldtech/tfexplorer v0.2.7-0.20200505141628-4d7a0c749784/go.mod h1:ozAe2OYR4ALp2PtyKT1A6UiTI+0ZcFU2UfUaRd1P5eE= github.com/threefoldtech/zbus v0.1.3 h1:18DnIzximRbATle5ZdZz0i84n/bCYB8k/gkhr2dXayc= github.com/threefoldtech/zbus v0.1.3/go.mod h1:ZtiRpcqzEBJetVQDsEbw0p48h/AF3O1kf0tvd30I0BU= github.com/threefoldtech/zos v0.2.4-rc2/go.mod h1:7A2oflcmSVsHFC4slOcydWgJyFBMFMH9wsaTRv+CnTA= diff --git a/pkg/provision/explorer/source.go b/pkg/provision/explorer/source.go index 48708f4b4..56b1ca63c 100644 --- a/pkg/provision/explorer/source.go +++ b/pkg/provision/explorer/source.go @@ -30,18 +30,18 @@ func NewPoller(cl *client.Client, inputConv provision.ReservationConverterFunc, } // Poll implements provision.ReservationPoller -func (r *Poller) Poll(nodeID pkg.Identifier, from uint64) ([]*provision.Reservation, error) { +func (r *Poller) Poll(nodeID pkg.Identifier, from uint64) ([]*provision.Reservation, uint64, error) { - list, err := r.wl.Workloads(nodeID.Identity(), from) + list, lastID, err := r.wl.Workloads(nodeID.Identity(), from) if err != nil { - return nil, fmt.Errorf("error while retrieving workloads from explorer: %w", err) + return nil, 0, fmt.Errorf("error while retrieving workloads from explorer: %w", err) } result := make([]*provision.Reservation, 0, len(list)) for _, wl := range list { r, err := r.inputConv(wl) if err != nil { - return nil, err + return nil, 0, err } result = append(result, r) @@ -54,5 +54,5 @@ func (r *Poller) Poll(nodeID pkg.Identifier, from uint64) ([]*provision.Reservat }) } - return result, nil + return result, lastID, nil } diff --git a/pkg/provision/primitives/converter.go b/pkg/provision/primitives/converter.go index 65722c23c..b88c93831 100644 --- a/pkg/provision/primitives/converter.go +++ b/pkg/provision/primitives/converter.go @@ -256,8 +256,6 @@ func WorkloadToProvisionType(w workloads.ReservationWorkload) (*provision.Reserv if err != nil { return nil, err } - case nil: - // no content, no conversion default: return nil, fmt.Errorf("unknown workload type (%s) (%T)", w.Type.String(), tmp) } diff --git a/pkg/provision/reservation.go b/pkg/provision/reservation.go index f247505fb..3828a09cc 100644 --- a/pkg/provision/reservation.go +++ b/pkg/provision/reservation.go @@ -15,11 +15,6 @@ import ( // ReservationType type type ReservationType string -const ( - //NOOPReservation type - NOOPReservation ReservationType = "noop" -) - // Reservation struct type Reservation struct { // ID of the reservation diff --git a/pkg/provision/source.go b/pkg/provision/source.go index 245a734ee..86b6d9f31 100644 --- a/pkg/provision/source.go +++ b/pkg/provision/source.go @@ -27,7 +27,7 @@ type ReservationPoller interface { // from is the used as a filter to which reservation to use as // reservation.ID >= from. So a client to the Poll method should make // sure to call it with the last (MAX) reservation ID he receieved. - Poll(nodeID pkg.Identifier, from uint64) ([]*Reservation, error) + Poll(nodeID pkg.Identifier, from uint64) (reservations []*Reservation, lastID uint64, err error) } // PollSource does a long poll on address to get new and to be deleted @@ -50,7 +50,6 @@ type pollSource struct { func (s *pollSource) Reservations(ctx context.Context) <-chan *Reservation { ch := make(chan *Reservation) - // on the first run we will get all the reservation // ever made to this know, to make sure we provision // everything at boot @@ -65,7 +64,7 @@ func (s *pollSource) Reservations(ctx context.Context) <-chan *Reservation { on = time.Now().Add(s.maxSleep) log.Info().Uint64("next", next).Msg("Polling for reservations") - res, err := s.store.Poll(pkg.StrIdentifier(s.nodeID), next) + res, lastID, err := s.store.Poll(pkg.StrIdentifier(s.nodeID), next) if err != nil && err != ErrPollEOS { // if this is not a temporary error, then skip the reservation entirely // and try to get the next one @@ -73,31 +72,18 @@ func (s *pollSource) Reservations(ctx context.Context) <-chan *Reservation { log.Error().Err(err).Uint64("next", next).Msg("failed to get reservation, retry same") } else { log.Error().Err(err).Uint64("next", next).Msg("failed to get reservation") - next++ + next = lastID + 1 } continue } + next = lastID + 1 + select { case <-ctx.Done(): return default: for _, r := range res { - current, _, err := r.SplitID() - if err != nil { - log.Warn().Err(err).Str("id", r.ID).Msg("skipping reservation") - continue - } - if current >= next { - next = current + 1 - } - - if r.Type == NOOPReservation { - // special type of reservation that does nothing - // we just ignore it here. - continue - } - ch <- r } } diff --git a/pkg/provision/source_test.go b/pkg/provision/source_test.go index 478882972..f87a5cc0a 100644 --- a/pkg/provision/source_test.go +++ b/pkg/provision/source_test.go @@ -15,9 +15,9 @@ type TestPollSource struct { mock.Mock } -func (s *TestPollSource) Poll(nodeID pkg.Identifier, from uint64) ([]*Reservation, error) { +func (s *TestPollSource) Poll(nodeID pkg.Identifier, from uint64) ([]*Reservation, uint64, error) { returns := s.Called(nodeID, from) - return returns.Get(0).([]*Reservation), returns.Error(1) + return returns.Get(0).([]*Reservation), uint64(returns.Get(1).(int)), returns.Error(2) } func TestHTTPReservationSource(t *testing.T) { @@ -32,7 +32,7 @@ func TestHTTPReservationSource(t *testing.T) { Return([]*Reservation{ &Reservation{ID: "1-1"}, &Reservation{ID: "1-2"}, - }, ErrPollEOS) + }, 1, ErrPollEOS) reservations := []*Reservation{} for res := range chn { @@ -60,13 +60,13 @@ func TestHTTPReservationSourceMultiple(t *testing.T) { Return([]*Reservation{ &Reservation{ID: "1-1"}, &Reservation{ID: "2-1"}, - }, nil) // return nil error so it tries again + }, 2, nil) // return nil error so it tries again store.On("Poll", nodeID, uint64(3)). Return([]*Reservation{ &Reservation{ID: "3-1"}, &Reservation{ID: "4-1"}, - }, ErrPollEOS) + }, 6, ErrPollEOS) reservations := []*Reservation{} for res := range chn { @@ -86,9 +86,9 @@ type TestTrackSource struct { Calls []int64 } -func (s *TestTrackSource) Poll(nodeID pkg.Identifier, from uint64) ([]*Reservation, error) { +func (s *TestTrackSource) Poll(nodeID pkg.Identifier, from uint64) ([]*Reservation, uint64, error) { if s.ID == s.Max { - return nil, ErrPollEOS + return nil, 0, ErrPollEOS } s.Calls = append(s.Calls, time.Now().Unix()) @@ -101,7 +101,7 @@ func (s *TestTrackSource) Poll(nodeID pkg.Identifier, from uint64) ([]*Reservati &Reservation{ ID: fmt.Sprint(s.ID, "-", "0"), }, - }, nil + }, s.ID, nil } func TestHTTPReservationSourceSleep(t *testing.T) {