Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/shirou/gopsutil v2.19.11+incompatible
github.com/stretchr/testify v1.5.1
github.com/termie/go-shutil v0.0.0-20140729215957-bcacb06fecae
github.com/threefoldtech/tfexplorer v0.2.7-0.20200429090008-f462f9a8d76c
github.com/threefoldtech/tfexplorer v0.2.7-0.20200505141628-4d7a0c749784
github.com/threefoldtech/zbus v0.1.3
github.com/urfave/cli v1.22.3
github.com/vishvananda/netlink v1.0.0
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -557,8 +557,10 @@ github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2/go.mod h1:hkRG
github.com/termie/go-shutil v0.0.0-20140729215957-bcacb06fecae h1:vgGSvdW5Lqg+I1aZOlG32uyE6xHpLdKhZzcTEktz5wM=
github.com/termie/go-shutil v0.0.0-20140729215957-bcacb06fecae/go.mod h1:quDq6Se6jlGwiIKia/itDZxqC5rj6/8OdFyMMAwTxCs=
github.com/threefoldtech/tfexplorer v0.2.5/go.mod h1:TugylEDgMNKk4ZIzee9PpVmIcLNLaRLdiXuhHTt/AR0=
github.com/threefoldtech/tfexplorer v0.2.7-0.20200429090008-f462f9a8d76c h1:5tx6tw6P2uzQCCAAsjubA7p4ozKJXV0TMedIPCbGDgk=
github.com/threefoldtech/tfexplorer v0.2.7-0.20200429090008-f462f9a8d76c/go.mod h1:ozAe2OYR4ALp2PtyKT1A6UiTI+0ZcFU2UfUaRd1P5eE=
github.com/threefoldtech/tfexplorer v0.2.7-0.20200505125555-7d20b5212c0e h1:HqNiMBz0RpYg8RBDQl67soy2WPjAWHCBPzb7jl1Dtxc=
github.com/threefoldtech/tfexplorer v0.2.7-0.20200505125555-7d20b5212c0e/go.mod h1:ozAe2OYR4ALp2PtyKT1A6UiTI+0ZcFU2UfUaRd1P5eE=
github.com/threefoldtech/tfexplorer v0.2.7-0.20200505141628-4d7a0c749784 h1:CCyZTGGV7BVVIGDduxLtebFsVHxJt6i2Z7rq6ostfmU=
github.com/threefoldtech/tfexplorer v0.2.7-0.20200505141628-4d7a0c749784/go.mod h1:ozAe2OYR4ALp2PtyKT1A6UiTI+0ZcFU2UfUaRd1P5eE=
github.com/threefoldtech/zbus v0.1.3 h1:18DnIzximRbATle5ZdZz0i84n/bCYB8k/gkhr2dXayc=
github.com/threefoldtech/zbus v0.1.3/go.mod h1:ZtiRpcqzEBJetVQDsEbw0p48h/AF3O1kf0tvd30I0BU=
github.com/threefoldtech/zos v0.2.4-rc2/go.mod h1:7A2oflcmSVsHFC4slOcydWgJyFBMFMH9wsaTRv+CnTA=
Expand Down
10 changes: 5 additions & 5 deletions pkg/provision/explorer/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,18 @@ func NewPoller(cl *client.Client, inputConv provision.ReservationConverterFunc,
}

// Poll implements provision.ReservationPoller
func (r *Poller) Poll(nodeID pkg.Identifier, from uint64) ([]*provision.Reservation, error) {
func (r *Poller) Poll(nodeID pkg.Identifier, from uint64) ([]*provision.Reservation, uint64, error) {

list, err := r.wl.Workloads(nodeID.Identity(), from)
list, lastID, err := r.wl.Workloads(nodeID.Identity(), from)
if err != nil {
return nil, fmt.Errorf("error while retrieving workloads from explorer: %w", err)
return nil, 0, fmt.Errorf("error while retrieving workloads from explorer: %w", err)
}

result := make([]*provision.Reservation, 0, len(list))
for _, wl := range list {
r, err := r.inputConv(wl)
if err != nil {
return nil, err
return nil, 0, err
}

result = append(result, r)
Expand All @@ -54,5 +54,5 @@ func (r *Poller) Poll(nodeID pkg.Identifier, from uint64) ([]*provision.Reservat
})
}

return result, nil
return result, lastID, nil
}
2 changes: 0 additions & 2 deletions pkg/provision/primitives/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,6 @@ func WorkloadToProvisionType(w workloads.ReservationWorkload) (*provision.Reserv
if err != nil {
return nil, err
}
case nil:
// no content, no conversion
default:
return nil, fmt.Errorf("unknown workload type (%s) (%T)", w.Type.String(), tmp)
}
Expand Down
5 changes: 0 additions & 5 deletions pkg/provision/reservation.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,6 @@ import (
// ReservationType type
type ReservationType string

const (
//NOOPReservation type
NOOPReservation ReservationType = "noop"
)

// Reservation struct
type Reservation struct {
// ID of the reservation
Expand Down
24 changes: 5 additions & 19 deletions pkg/provision/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type ReservationPoller interface {
// from is the used as a filter to which reservation to use as
// reservation.ID >= from. So a client to the Poll method should make
// sure to call it with the last (MAX) reservation ID he receieved.
Poll(nodeID pkg.Identifier, from uint64) ([]*Reservation, error)
Poll(nodeID pkg.Identifier, from uint64) (reservations []*Reservation, lastID uint64, err error)
}

// PollSource does a long poll on address to get new and to be deleted
Expand All @@ -50,7 +50,6 @@ type pollSource struct {

func (s *pollSource) Reservations(ctx context.Context) <-chan *Reservation {
ch := make(chan *Reservation)

// on the first run we will get all the reservation
// ever made to this know, to make sure we provision
// everything at boot
Expand All @@ -65,39 +64,26 @@ func (s *pollSource) Reservations(ctx context.Context) <-chan *Reservation {
on = time.Now().Add(s.maxSleep)
log.Info().Uint64("next", next).Msg("Polling for reservations")

res, err := s.store.Poll(pkg.StrIdentifier(s.nodeID), next)
res, lastID, err := s.store.Poll(pkg.StrIdentifier(s.nodeID), next)
if err != nil && err != ErrPollEOS {
// if this is not a temporary error, then skip the reservation entirely
// and try to get the next one
if shouldRetry(err) {
log.Error().Err(err).Uint64("next", next).Msg("failed to get reservation, retry same")
} else {
log.Error().Err(err).Uint64("next", next).Msg("failed to get reservation")
next++
next = lastID + 1
}
continue
}

next = lastID + 1

select {
case <-ctx.Done():
return
default:
for _, r := range res {
current, _, err := r.SplitID()
if err != nil {
log.Warn().Err(err).Str("id", r.ID).Msg("skipping reservation")
continue
}
if current >= next {
next = current + 1
}

if r.Type == NOOPReservation {
// special type of reservation that does nothing
// we just ignore it here.
continue
}

ch <- r
}
}
Expand Down
16 changes: 8 additions & 8 deletions pkg/provision/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ type TestPollSource struct {
mock.Mock
}

func (s *TestPollSource) Poll(nodeID pkg.Identifier, from uint64) ([]*Reservation, error) {
func (s *TestPollSource) Poll(nodeID pkg.Identifier, from uint64) ([]*Reservation, uint64, error) {
returns := s.Called(nodeID, from)
return returns.Get(0).([]*Reservation), returns.Error(1)
return returns.Get(0).([]*Reservation), uint64(returns.Get(1).(int)), returns.Error(2)
}

func TestHTTPReservationSource(t *testing.T) {
Expand All @@ -32,7 +32,7 @@ func TestHTTPReservationSource(t *testing.T) {
Return([]*Reservation{
&Reservation{ID: "1-1"},
&Reservation{ID: "1-2"},
}, ErrPollEOS)
}, 1, ErrPollEOS)

reservations := []*Reservation{}
for res := range chn {
Expand Down Expand Up @@ -60,13 +60,13 @@ func TestHTTPReservationSourceMultiple(t *testing.T) {
Return([]*Reservation{
&Reservation{ID: "1-1"},
&Reservation{ID: "2-1"},
}, nil) // return nil error so it tries again
}, 2, nil) // return nil error so it tries again

store.On("Poll", nodeID, uint64(3)).
Return([]*Reservation{
&Reservation{ID: "3-1"},
&Reservation{ID: "4-1"},
}, ErrPollEOS)
}, 6, ErrPollEOS)

reservations := []*Reservation{}
for res := range chn {
Expand All @@ -86,9 +86,9 @@ type TestTrackSource struct {
Calls []int64
}

func (s *TestTrackSource) Poll(nodeID pkg.Identifier, from uint64) ([]*Reservation, error) {
func (s *TestTrackSource) Poll(nodeID pkg.Identifier, from uint64) ([]*Reservation, uint64, error) {
if s.ID == s.Max {
return nil, ErrPollEOS
return nil, 0, ErrPollEOS
}

s.Calls = append(s.Calls, time.Now().Unix())
Expand All @@ -101,7 +101,7 @@ func (s *TestTrackSource) Poll(nodeID pkg.Identifier, from uint64) ([]*Reservati
&Reservation{
ID: fmt.Sprint(s.ID, "-", "0"),
},
}, nil
}, s.ID, nil
}

func TestHTTPReservationSourceSleep(t *testing.T) {
Expand Down