From 573a654ca930f040b721101c54cf344f1cdd88ce Mon Sep 17 00:00:00 2001 From: Christophe de Carvalho Date: Wed, 30 Oct 2019 17:19:07 +0100 Subject: [PATCH 1/4] provisiond: fix the way how the reservation source poll for new reservation fixes #353 --- pkg/gedis/gedis.go | 2 +- pkg/gedis/gedis_test.go | 2 +- pkg/provision/source.go | 13 ++++++++----- pkg/zdb/admin.go | 2 +- 4 files changed, 11 insertions(+), 8 deletions(-) diff --git a/pkg/gedis/gedis.go b/pkg/gedis/gedis.go index 60c3ed234..dc12cee93 100644 --- a/pkg/gedis/gedis.go +++ b/pkg/gedis/gedis.go @@ -7,7 +7,7 @@ import ( "net/url" "time" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" "github.com/pkg/errors" ) diff --git a/pkg/gedis/gedis_test.go b/pkg/gedis/gedis_test.go index 8b785f7de..ae535456d 100644 --- a/pkg/gedis/gedis_test.go +++ b/pkg/gedis/gedis_test.go @@ -8,7 +8,7 @@ import ( "github.com/stretchr/testify/require" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" "github.com/stretchr/testify/mock" ) diff --git a/pkg/provision/source.go b/pkg/provision/source.go index 29b3a6f74..b7f3373bd 100644 --- a/pkg/provision/source.go +++ b/pkg/provision/source.go @@ -55,18 +55,21 @@ func (s *pollSource) Reservations(ctx context.Context) <-chan *Reservation { all := true go func() { next := time.Now() + last := time.Now() defer close(ch) + for { - time.Sleep(next.Sub(time.Now())) - since := next - next = time.Now().Add(s.maxSleep) + now := time.Now() + time.Sleep(next.Sub(now)) + next = now.Add(s.maxSleep) - res, err := s.store.Poll(pkg.StrIdentifier(s.nodeID), all, since) + res, err := s.store.Poll(pkg.StrIdentifier(s.nodeID), all, last) if err != nil && err != ErrPollEOS { log.Error().Err(err).Msg("failed to get reservation") - time.Sleep(time.Second * 20) + continue } + last = time.Now() all = false select { diff --git a/pkg/zdb/admin.go b/pkg/zdb/admin.go index e94bdee03..eadc8307f 100644 --- a/pkg/zdb/admin.go +++ b/pkg/zdb/admin.go @@ -3,7 +3,7 @@ package zdb import ( "fmt" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" ) // CreateNamespace creates a new namespace. Only admin can do this. From 75a68449a8eaccf0add787248f9e35bd25abb5a4 Mon Sep 17 00:00:00 2001 From: Christophe de Carvalho Date: Thu, 31 Oct 2019 08:24:12 +0100 Subject: [PATCH 2/4] change version of github.com/gomodule/redigo to v1.7.0 + go mod tidy For some reason the latest version of redigo is 1.7.0 and not 2.0.0 to fixe #355 https://github.com/gomodule/redigo/commit/02dc2736dbc3a4a56907b0d5a9fead2f71b499db was needed fixes #355 --- go.mod | 19 +++++++++++++++---- go.sum | 22 ++++------------------ 2 files changed, 19 insertions(+), 22 deletions(-) diff --git a/go.mod b/go.mod index 280f87203..ad6fb5829 100644 --- a/go.mod +++ b/go.mod @@ -10,31 +10,42 @@ require ( github.com/blang/semver v3.5.1+incompatible github.com/cenkalti/backoff v2.2.1+incompatible github.com/cenkalti/backoff/v3 v3.0.0 + github.com/containerd/cgroups v0.0.0-20190911145653-fc51bcbe4714 // indirect github.com/containerd/containerd v1.2.3 + github.com/containerd/continuity v0.0.0-20190827140505-75bee3e2ccb6 // indirect + github.com/containerd/cri v1.11.1 // indirect + github.com/containerd/fifo v0.0.0-20190816180239-bda0ff6ed73c // indirect + github.com/containerd/typeurl v0.0.0-20190911142611-5eb25027c9fd // indirect github.com/containernetworking/cni v0.7.1 github.com/containernetworking/plugins v0.8.2 github.com/dave/jennifer v1.3.0 github.com/deckarep/golang-set v1.7.1 github.com/dgrijalva/jwt-go v3.2.0+incompatible - github.com/garyburd/redigo v1.6.0 - github.com/go-check/check v0.0.0-20190902080502-41f04d3bba15 // indirect + github.com/docker/distribution v2.7.1+incompatible // indirect + github.com/docker/docker v1.13.1 // indirect + github.com/docker/go-events v0.0.0-20190806004212-e31b211e4f1c // indirect github.com/go-ole/go-ole v1.2.4 // indirect github.com/go-redis/redis v6.15.5+incompatible - github.com/gomodule/redigo v2.0.0+incompatible + github.com/gogo/googleapis v1.3.0 // indirect + github.com/gomodule/redigo v1.7.0 github.com/google/shlex v0.0.0-20181106134648-c34317bd91bf github.com/google/uuid v1.1.1 github.com/gorilla/handlers v1.4.2 github.com/gorilla/mux v1.7.3 github.com/iancoleman/strcase v0.0.0-20190422225806-e506e3ef7365 github.com/jbenet/go-base58 v0.0.0-20150317085156-6237cf65f3a6 + github.com/opencontainers/go-digest v1.0.0-rc1 // indirect + github.com/opencontainers/image-spec v1.0.1 // indirect + github.com/opencontainers/runc v0.1.1 // indirect github.com/opencontainers/runtime-spec v1.0.1 github.com/pkg/errors v0.8.1 github.com/rs/zerolog v1.15.0 github.com/shirou/gopsutil v2.18.12+incompatible + github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4 // indirect github.com/stretchr/testify v1.3.0 + github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2 // indirect github.com/tcnksm/go-input v0.0.0-20180404061846-548a7d7a8ee8 github.com/threefoldtech/zbus v0.1.1 - github.com/threefoldtech/zosv2/modules v0.0.0-20191001184911-b6d8bc81a7f0 github.com/urfave/cli v1.22.1 github.com/vishvananda/netlink v1.0.0 github.com/vmihailenco/msgpack v4.0.4+incompatible // indirect diff --git a/go.sum b/go.sum index e697c16a1..83b1ff7bc 100644 --- a/go.sum +++ b/go.sum @@ -65,11 +65,8 @@ github.com/docker/go-events v0.0.0-20190806004212-e31b211e4f1c h1:+pKlWGMw7gf6bQ github.com/docker/go-events v0.0.0-20190806004212-e31b211e4f1c/go.mod h1:Uw6UezgYA44ePAFQYUehOuCzmy5zmg/+nl2ZfMWGkpA= github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw= github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= -github.com/dspinhirne/netaddr-go v0.0.0-20180510133009-a6cfb692cb10/go.mod h1:qYpr/lzZIoEWpzbsTHa3Tl9V+g2sN/MAjkIyEItb7/g= github.com/garyburd/redigo v1.6.0 h1:0VruCpn7yAIIu7pWVClQC8wxCJEcG3nyzpMSHKi1PQc= github.com/garyburd/redigo v1.6.0/go.mod h1:NR3MbYisc3/PwhQ00EMzDiPmrwpPxAn5GI05/YaO1SY= -github.com/go-check/check v0.0.0-20190902080502-41f04d3bba15 h1:xJdCV5uP69sUzCIIzmhAw6EKKdVk3Tu48oLzM86+XPI= -github.com/go-check/check v0.0.0-20190902080502-41f04d3bba15/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98= github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI= github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= github.com/go-redis/redis v6.15.5+incompatible h1:pLky8I0rgiblWfa8C1EV7fPEUv0aH6vKRaYHc/YRHVk= @@ -90,8 +87,8 @@ github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0= -github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4= +github.com/gomodule/redigo v1.7.0 h1:ZKld1VOtsGhAe37E7wMxEDgAlGM5dvFY+DiOhSkhP9Y= +github.com/gomodule/redigo v1.7.0/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -159,6 +156,8 @@ github.com/safchain/ethtool v0.0.0-20190326074333-42ed695e3de8 h1:2c1EFnZHIPCW8q github.com/safchain/ethtool v0.0.0-20190326074333-42ed695e3de8/go.mod h1:Z0q5wiBQGYcxhMZ6gUqHn6pYNLypFAvaL3UvgZLR0U4= github.com/shirou/gopsutil v2.18.12+incompatible h1:1eaJvGomDnH74/5cF4CTmTbLHAriGFsTZppLXDX93OM= github.com/shirou/gopsutil v2.18.12+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= +github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4 h1:udFKJ0aHUL60LboW/A+DfgoHVedieIzIXE8uylPue0U= +github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4/go.mod h1:qsXQc7+bwAM3Q1u/4XEfrquwF8Lw7D7y5cD8CuHnfIc= github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.0.6 h1:hcP1GmhGigz/O7h1WVUM5KklBp1JoNS9FggWKdj/j3s= @@ -171,13 +170,8 @@ github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2 h1:b6uOv7YOFK0 github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= github.com/tcnksm/go-input v0.0.0-20180404061846-548a7d7a8ee8 h1:RB0v+/pc8oMzPsN97aZYEwNuJ6ouRJ2uhjxemJ9zvrY= github.com/tcnksm/go-input v0.0.0-20180404061846-548a7d7a8ee8/go.mod h1:IlWNj9v/13q7xFbaK4mbyzMNwrZLaWSHx/aibKIZuIg= -github.com/threefoldtech/zbus v0.0.0-20190711124326-09379d5f12e0 h1:k+e+laNT3indP3ToSnw4yrD9d9nDTOjLKlc11QzspFo= -github.com/threefoldtech/zbus v0.0.0-20190711124326-09379d5f12e0/go.mod h1:ZtiRpcqzEBJetVQDsEbw0p48h/AF3O1kf0tvd30I0BU= github.com/threefoldtech/zbus v0.1.1 h1:GjkWaBq6hYOc4mPAU0ltNjAmCpXPwWs+y+ka1HqIEFM= github.com/threefoldtech/zbus v0.1.1/go.mod h1:ZtiRpcqzEBJetVQDsEbw0p48h/AF3O1kf0tvd30I0BU= -github.com/threefoldtech/zosv2 v0.0.0-20191002114057-b953302b36fc h1:J2lqgqkT3LmbbzMb4vMf1RHmSOIMed+0QY2HFbTmTRc= -github.com/threefoldtech/zosv2/modules v0.0.0-20191001184911-b6d8bc81a7f0 h1:97hA6GlUHa+QBspggGLZvOtX9r9k+dgXSQzjjAiASGg= -github.com/threefoldtech/zosv2/modules v0.0.0-20191001184911-b6d8bc81a7f0/go.mod h1:Te9x3NPyMBxjdALwqX5I8ksUUCte6Fv7FFISIKAYus4= github.com/urfave/cli v1.22.1 h1:+mkCCcOFKPnCmVYVcURKps1Xe+3zP90gSYGNfRkjoIY= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/vishvananda/netlink v0.0.0-20181108222139-023a6dafdcdf/go.mod h1:+SR5DhBJrl6ZM7CoCKvpw5BKroDKQ+PJqOg65H/2ktk= @@ -206,8 +200,6 @@ golang.org/x/net v0.0.0-20190509222800-a4d6f7feada5/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190514140710-3ec191127204/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190603091049-60506f45cf65 h1:+rhAzEzT3f4JtomfC371qB+0Ola2caSKcY69NUBZrRQ= golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= -golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 h1:Ao/3l156eZf2AW5wK8a7/smtodRU+gha3+BeqJ69lRk= -golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191021144547-ec77196f6094 h1:5O4U9trLjNpuhpynaDsqwCk+Tw6seqJz1EbqbnzHrc8= golang.org/x/net v0.0.0-20191021144547-ec77196f6094/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -220,7 +212,6 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190509141414-a5b02f93d862/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190514135907-3a4b5fb9f71f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190602015325-4c4f7f33c9ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190616124812-15dcb6c0061f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa h1:KIDDMLT1O0Nr7TSxp8xM5tJcdn8tgyAONntO829og1M= golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -234,21 +225,16 @@ golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190425163242-31fd60d6bfdc/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= -golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b h1:mSUCVIwDx4hfXJfWsOPfdzEHxzb2Xjl6BQ8YgPnazQA= -golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.zx2c4.com/wireguard/wgctrl v0.0.0-20190607034155-226bf4e412cd h1:p1UpgxMNb+biOCHPGtOYD4ne6yVrV2LWAyDPaeaRGRo= golang.zx2c4.com/wireguard/wgctrl v0.0.0-20190607034155-226bf4e412cd/go.mod h1:IZOMt4p2Xgw4FEU9w2Fq579uV0SgT7h5zxJnDyUHg60= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.5.0 h1:KxkO13IPW4Lslp2bz+KHP2E3gtFlrIGNThxkZQ3g+4c= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= -google.golang.org/appengine v1.6.1 h1:QzqyMA1tlu6CgqCDUtU9V+ZKhLFT2dkJuANu5QaxI3I= -google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0= google.golang.org/appengine v1.6.5 h1:tycE03LOZYQNhDpS27tcQdAzLCVMaj7QT2SXxebnpCM= google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= -google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.24.0 h1:vb/1TCsVn3DcJlQ0Gs1yB1pKI6Do2/QNwxdKqmc/b0s= google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRnRtcA= gopkg.in/airbrake/gobrake.v2 v2.0.9 h1:7z2uVWwn7oVeeugY1DtlPAy5H+KYgB1KeKTnqjNatLo= From f8577722a442fcd3d9140cf26416a53ceae4a0a1 Mon Sep 17 00:00:00 2001 From: Muhamad Azmy Date: Thu, 31 Oct 2019 17:42:57 +0100 Subject: [PATCH 3/4] Fix Poll mechanism --- pkg/gedis/commands_provision.go | 14 +++++------- pkg/gedis/commands_provision_test.go | 22 +++++++++--------- pkg/provision/remote_store.go | 19 ++++++---------- pkg/provision/reservation.go | 21 +++++++++++++++++ pkg/provision/source.go | 30 +++++++++++++----------- pkg/provision/source_test.go | 34 ++++++++++++++-------------- 6 files changed, 79 insertions(+), 61 deletions(-) diff --git a/pkg/gedis/commands_provision.go b/pkg/gedis/commands_provision.go index 993d88a1f..fb4edf69b 100644 --- a/pkg/gedis/commands_provision.go +++ b/pkg/gedis/commands_provision.go @@ -80,17 +80,15 @@ func (g *Gedis) Get(id string) (*provision.Reservation, error) { return reservationFromSchema(workload) } -// Poll implements provision.ReservationPoller -func (g *Gedis) Poll(nodeID pkg.Identifier, all bool, since time.Time) ([]*provision.Reservation, error) { +// Poll retrieves reservations from BCDB. from acts like a cursor, first call should use +// 0 to retrieve everything. Next calls should use the last (MAX) ID of the previous poll. +// Note that from is a reservation ID not a workload ID. so user the Reservation.SplitID() method +// to get the reservation part. +func (g *Gedis) Poll(nodeID pkg.Identifier, from uint64) ([]*provision.Reservation, error) { - epoch := since.Unix() - // all means sends all reservation so we ask since the beginning of (unix) time - if all { - epoch = 0 - } result, err := Bytes(g.Send("workload_manager", "workloads_list", Args{ "node_id": nodeID.Identity(), - "epoch": epoch, + "from": from, })) if err != nil { diff --git a/pkg/gedis/commands_provision_test.go b/pkg/gedis/commands_provision_test.go index d657b16ed..46fbc804c 100644 --- a/pkg/gedis/commands_provision_test.go +++ b/pkg/gedis/commands_provision_test.go @@ -56,62 +56,62 @@ func TestProvisionPoll(t *testing.T) { } node := pkg.StrIdentifier("node-1") - time := time.Now() args := Args{ "node_id": node.Identity(), - "epoch": time.Unix(), + "from": 0, } - workloadVol := `{"workload_id": 10, "type": 0, "size": 100}` - workloadZdb := `{"workload_id": 10, "mode": 0, "size": 100}` + workloadVol := `{"workload_id": 1, "type": 0, "size": 100}` + workloadZdb := `{"workload_id": 1, "mode": 0, "size": 100}` conn.On("Do", "default.workload_manager.workloads_list", mustMarshal(t, args)). Return(mustMarshal(t, Args{ "workloads": []types.TfgridReservationWorkload1{ { - WorkloadID: "1", + WorkloadID: "1-1", Type: types.TfgridReservationWorkload1TypeVolume, Workload: json.RawMessage(workloadVol), }, { - WorkloadID: "2", + WorkloadID: "2-1", Type: types.TfgridReservationWorkload1TypeZdb, Workload: json.RawMessage(workloadZdb), }, }, }), nil) - reservations, err := gedis.Poll(node, false, time) //setting false to true will force epoch to 0 + reservations, err := gedis.Poll(node, 0) //setting false to true will force epoch to 0 require.NoError(err) require.Len(reservations, 2) + require.Equal(reservations[0].ID, "1-1") require.Equal(reservations[0].Type, provision.VolumeReservation) require.Equal(reservations[1].Type, provision.ZDBReservation) conn.AssertCalled(t, "Close") args = Args{ "node_id": node.Identity(), - "epoch": 0, + "from": 10, } conn.On("Do", "default.workload_manager.workloads_list", mustMarshal(t, args)). Return(mustMarshal(t, Args{ "workloads": []types.TfgridReservationWorkload1{ { - WorkloadID: "1", + WorkloadID: "1-1", Type: types.TfgridReservationWorkload1TypeVolume, Workload: json.RawMessage(workloadVol), }, { - WorkloadID: "2", + WorkloadID: "2-1", Type: types.TfgridReservationWorkload1TypeZdb, Workload: json.RawMessage(workloadZdb), }, }, }), nil) - reservations, err = gedis.Poll(node, true, time) + reservations, err = gedis.Poll(node, 10) require.NoError(err) require.Len(reservations, 2) diff --git a/pkg/provision/remote_store.go b/pkg/provision/remote_store.go index 710c75506..f0080a77b 100644 --- a/pkg/provision/remote_store.go +++ b/pkg/provision/remote_store.go @@ -6,7 +6,6 @@ import ( "fmt" "net/http" "net/url" - "time" "github.com/rs/zerolog/log" "github.com/threefoldtech/zos/pkg" @@ -51,22 +50,18 @@ func (s *HTTPStore) Reserve(r *Reservation, nodeID pkg.Identifier) (string, erro return resource, nil } -// Poll retrieves reservations from BCDB. If all is true, it returns all the reservations -// for this node. -// otherwise it returns only the reservation never sent yet or the reservation that needs to be deleted -// and do long polling -func (s *HTTPStore) Poll(nodeID pkg.Identifier, all bool, since time.Time) ([]*Reservation, error) { +// Poll retrieves reservations from BCDB. from acts like a cursor, first call should use +// 0 to retrieve everything. Next calls should use the last (MAX) ID of the previous poll. +// Note that from is a reservation ID not a workload ID. so user the Reservation.SplitID() method +// to get the reservation part. +func (s *HTTPStore) Poll(nodeID pkg.Identifier, from uint64) ([]*Reservation, error) { u, err := url.Parse(fmt.Sprintf("%s/reservations/%s/poll", s.baseURL, nodeID.Identity())) if err != nil { return nil, err } q := u.Query() - if all { - q.Add("all", "true") - } - if since.Unix() > 0 { - q.Add("since", fmt.Sprintf("%d", since.Unix())) - } + q.Add("from", fmt.Sprintf("%d", from)) + u.RawQuery = q.Encode() log.Info().Str("url", u.String()).Msg("fetching") diff --git a/pkg/provision/reservation.go b/pkg/provision/reservation.go index f4e85dc0d..753859da5 100644 --- a/pkg/provision/reservation.go +++ b/pkg/provision/reservation.go @@ -4,8 +4,11 @@ import ( "bytes" "encoding/json" "fmt" + "strconv" + "strings" "time" + "github.com/pkg/errors" "github.com/threefoldtech/zos/pkg/versioned" ) @@ -55,6 +58,24 @@ type Reservation struct { ToDelete bool `json:"to_delete"` } +//SplitID gets the reservation part and the workload part from a full reservation ID +func (r *Reservation) SplitID() (reservation uint64, workload uint64, err error) { + parts := strings.SplitN(r.ID, "-", 2) + if len(parts) != 2 { + return reservation, workload, fmt.Errorf("invalid reservation id format (wront length)") + } + reservation, err = strconv.ParseUint(parts[0], 10, 64) + if err != nil { + return reservation, workload, errors.Wrap(err, "invalid reservation id format (reservation part)") + } + workload, err = strconv.ParseUint(parts[1], 10, 64) + if err != nil { + return reservation, workload, errors.Wrap(err, "invalid reservation id format (workload part)") + } + + return +} + // Expired returns a boolean depending if the reservation // has expire or not at the time of the function call func (r *Reservation) Expired() bool { diff --git a/pkg/provision/source.go b/pkg/provision/source.go index b7f3373bd..0f1ba5e18 100644 --- a/pkg/provision/source.go +++ b/pkg/provision/source.go @@ -27,9 +27,10 @@ var ( // to poll the BCDB for new reservation type ReservationPoller interface { // Poll ask the store to send us reservation for a specific node ID - // if all is true, the store sends all the reservation every registered for the node ID - // otherwise it just sends reservation not pulled yet. - Poll(nodeID pkg.Identifier, all bool, since time.Time) ([]*Reservation, error) + // from is the used as a filter to which reservation to use as + // reservation.ID > from. So a client to the Poll method should make + // sure to call it with the last (MAX) reservation ID he receieved. + Poll(nodeID pkg.Identifier, from uint64) ([]*Reservation, error) } // PollSource does a long poll on address to get new and to be deleted @@ -52,31 +53,34 @@ func (s *pollSource) Reservations(ctx context.Context) <-chan *Reservation { // ever made to this know, to make sure we provision // everything at boot // after that, we only ask for the new reservations - all := true go func() { - next := time.Now() - last := time.Now() defer close(ch) - + var from uint64 + last := time.Now() for { now := time.Now() - time.Sleep(next.Sub(now)) - next = now.Add(s.maxSleep) + time.Sleep(last.Sub(now)) + last = now.Add(s.maxSleep) - res, err := s.store.Poll(pkg.StrIdentifier(s.nodeID), all, last) + res, err := s.store.Poll(pkg.StrIdentifier(s.nodeID), from) if err != nil && err != ErrPollEOS { log.Error().Err(err).Msg("failed to get reservation") continue } - last = time.Now() - all = false - select { case <-ctx.Done(): return default: for _, r := range res { + next, _, err := r.SplitID() + if err != nil { + log.Warn().Err(err).Str("id", r.ID).Msg("skipping reservation") + continue + } + if next > from { + from = next + } ch <- r } } diff --git a/pkg/provision/source_test.go b/pkg/provision/source_test.go index d042abc90..76e42585f 100644 --- a/pkg/provision/source_test.go +++ b/pkg/provision/source_test.go @@ -14,8 +14,8 @@ type TestPollSource struct { mock.Mock } -func (s *TestPollSource) Poll(nodeID pkg.Identifier, all bool, since time.Time) ([]*Reservation, error) { - returns := s.Called(nodeID, all, since) +func (s *TestPollSource) Poll(nodeID pkg.Identifier, from uint64) ([]*Reservation, error) { + returns := s.Called(nodeID, from) return returns.Get(0).([]*Reservation), returns.Error(1) } @@ -27,10 +27,10 @@ func TestHTTPReservationSource(t *testing.T) { source := PollSource(&store, nodeID) chn := source.Reservations(context.Background()) - store.On("Poll", nodeID, true, mock.Anything). + store.On("Poll", nodeID, uint64(0)). Return([]*Reservation{ - &Reservation{ID: "res-1"}, - &Reservation{ID: "res-2"}, + &Reservation{ID: "1-1"}, + &Reservation{ID: "1-2"}, }, ErrPollEOS) reservations := []*Reservation{} @@ -39,8 +39,8 @@ func TestHTTPReservationSource(t *testing.T) { } require.Len(reservations, 2) - require.Equal("res-1", reservations[0].ID) - require.Equal("res-2", reservations[1].ID) + require.Equal("1-1", reservations[0].ID) + require.Equal("1-2", reservations[1].ID) } func TestHTTPReservationSourceMultiple(t *testing.T) { @@ -55,16 +55,16 @@ func TestHTTPReservationSourceMultiple(t *testing.T) { chn := source.Reservations(context.Background()) - store.On("Poll", nodeID, true, mock.Anything). + store.On("Poll", nodeID, uint64(0)). Return([]*Reservation{ - &Reservation{ID: "res-1"}, - &Reservation{ID: "res-2"}, + &Reservation{ID: "1-1"}, + &Reservation{ID: "2-1"}, }, nil) // return nil error so it tries again - store.On("Poll", nodeID, false, mock.Anything). + store.On("Poll", nodeID, uint64(2)). Return([]*Reservation{ - &Reservation{ID: "res-3"}, - &Reservation{ID: "res-4"}, + &Reservation{ID: "3-1"}, + &Reservation{ID: "4-1"}, }, ErrPollEOS) reservations := []*Reservation{} @@ -73,8 +73,8 @@ func TestHTTPReservationSourceMultiple(t *testing.T) { } require.Len(reservations, 4) - require.Equal("res-1", reservations[0].ID) - require.Equal("res-2", reservations[1].ID) - require.Equal("res-3", reservations[2].ID) - require.Equal("res-4", reservations[3].ID) + require.Equal("1-1", reservations[0].ID) + require.Equal("2-1", reservations[1].ID) + require.Equal("3-1", reservations[2].ID) + require.Equal("4-1", reservations[3].ID) } From 5c50099221bd317a9bda1f35cc93b2a76016a1fe Mon Sep 17 00:00:00 2001 From: Christophe de Carvalho Date: Fri, 1 Nov 2019 12:19:23 +0100 Subject: [PATCH 4/4] bcdb_mock: use reservation id as cursor --- tools/bcdb_mock/error.go | 14 ++++--------- tools/bcdb_mock/reservation_handlers.go | 27 ++++++++++--------------- tools/bcdb_mock/reservation_store.go | 14 ++++++++----- 3 files changed, 24 insertions(+), 31 deletions(-) diff --git a/tools/bcdb_mock/error.go b/tools/bcdb_mock/error.go index d872c4c38..fb7311f9f 100644 --- a/tools/bcdb_mock/error.go +++ b/tools/bcdb_mock/error.go @@ -2,22 +2,16 @@ package main import ( "encoding/json" - "fmt" "net/http" ) func httpError(w http.ResponseWriter, err error, code int) { - b, err := json.Marshal(struct { + w.Header().Set("Content-Type", "application/json; charset=utf-8") + w.Header().Set("X-Content-Type-Options", "nosniff") + w.WriteHeader(code) + json.NewEncoder(w).Encode(struct { Error error }{ Error: err, }) - if err != nil { - panic(err) - } - - w.Header().Set("Content-Type", "application/json; charset=utf-8") - w.Header().Set("X-Content-Type-Options", "nosniff") - w.WriteHeader(code) - fmt.Fprintln(w, b) } diff --git a/tools/bcdb_mock/reservation_handlers.go b/tools/bcdb_mock/reservation_handlers.go index 9779ef0ab..0a6f34f6a 100644 --- a/tools/bcdb_mock/reservation_handlers.go +++ b/tools/bcdb_mock/reservation_handlers.go @@ -36,34 +36,29 @@ func (s *reservationsStore) reserve(w http.ResponseWriter, r *http.Request) { func (s *reservationsStore) poll(w http.ResponseWriter, r *http.Request) { nodeID := mux.Vars(r)["node_id"] - var since time.Time - sinceStr := r.URL.Query().Get("since") - if sinceStr == "" { - // if since is not specificed, send all reservation since last hour - since = time.Now().Add(-time.Hour) - } else { - timestamp, err := strconv.ParseInt(sinceStr, 10, 64) + + var ( + from = uint64(0) + err error + ) + fromStr := r.URL.Query().Get("from") + if fromStr != "" { + from, err = strconv.ParseUint(fromStr, 10, 64) if err != nil { http.Error(w, "since query argument format not valid", http.StatusBadRequest) return } - since = time.Unix(timestamp, 0) - } - - all, err := strconv.ParseBool(r.URL.Query().Get("all")) - if err != nil { - all = false } output := []*provision.Reservation{} - if all { + if from == 0 { // just get all reservation for this nodeID - output = s.GetReservations(nodeID, all, since) + output = s.GetReservations(nodeID, from) } else { // otherwise start long polling timeout := time.Now().Add(time.Second * 20) for { - output = s.GetReservations(nodeID, all, since) + output = s.GetReservations(nodeID, from) if len(output) > 0 { break } diff --git a/tools/bcdb_mock/reservation_store.go b/tools/bcdb_mock/reservation_store.go index 0d246635e..169335030 100644 --- a/tools/bcdb_mock/reservation_store.go +++ b/tools/bcdb_mock/reservation_store.go @@ -5,7 +5,6 @@ import ( "fmt" "os" "sync" - "time" "github.com/threefoldtech/zos/pkg/provision" ) @@ -80,7 +79,7 @@ func (s *reservationsStore) Get(ID string) (*reservation, error) { func (s *reservationsStore) Add(nodeID string, res *provision.Reservation) error { s.m.Lock() defer s.m.Unlock() - res.ID = fmt.Sprintf("r-%d", len(s.Reservations)) + res.ID = fmt.Sprintf("%d-1", len(s.Reservations)) s.Reservations = append(s.Reservations, &reservation{ NodeID: nodeID, Reservation: res, @@ -88,7 +87,7 @@ func (s *reservationsStore) Add(nodeID string, res *provision.Reservation) error return nil } -func (s *reservationsStore) GetReservations(nodeID string, all bool, since time.Time) []*provision.Reservation { +func (s *reservationsStore) GetReservations(nodeID string, from uint64) []*provision.Reservation { output := []*provision.Reservation{} s.m.RLock() @@ -100,8 +99,13 @@ func (s *reservationsStore) GetReservations(nodeID string, all bool, since time. continue } - if all || - (!r.Reservation.Expired() && since.Before(r.Reservation.Created)) || + resID, _, err := r.Reservation.SplitID() + if err != nil { + continue + } + + if from == 0 || + (!r.Reservation.Expired() && resID > from) || (r.Reservation.ToDelete && !r.Deleted) { output = append(output, r.Reservation) }