diff --git a/cmds/capacityd/main.go b/cmds/capacityd/main.go index fadaddfae..0a8807410 100644 --- a/cmds/capacityd/main.go +++ b/cmds/capacityd/main.go @@ -2,8 +2,13 @@ package main import ( "flag" + "os" "time" + "github.com/rs/zerolog" + + "github.com/cenkalti/backoff/v3" + "github.com/pkg/errors" "github.com/threefoldtech/zos/pkg/capacity" "github.com/threefoldtech/zos/pkg/environment" @@ -32,6 +37,8 @@ func main() { version.ShowAndExit(false) } + log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr}) + redis, err := zbus.NewRedisClient(msgBrokerCon) if err != nil { log.Fatal().Msgf("fail to connect to message broker server: %v", err) @@ -68,13 +75,33 @@ func main() { log.Fatal().Err(err).Msgf("failed to write resources capacity on BCDB") } - for { - <-time.After(time.Minute * 10) + sendUptime := func() error { + uptime, err := r.Uptime() + if err != nil { + log.Error().Err(err).Msgf("failed to read uptime") + return err + } log.Info().Msg("send heart-beat to BCDB") - if err := store.Ping(identity.NodeID()); err != nil { + if err := store.Ping(identity.NodeID(), uptime); err != nil { log.Error().Err(err).Msgf("failed to send heart-beat to BCDB") + return err + } + return nil + } + if err := sendUptime(); err != nil { + log.Fatal().Err(err).Send() + } + + for { + next := time.Now().Add(time.Minute * 10) + + if time.Now().After(next) { + backoff.Retry(sendUptime, backoff.NewExponentialBackOff()) + continue } + + time.Sleep(time.Minute) } } diff --git a/cmds/identityd/main.go b/cmds/identityd/main.go index d141bcc36..7360811da 100644 --- a/cmds/identityd/main.go +++ b/cmds/identityd/main.go @@ -9,6 +9,7 @@ import ( "github.com/pkg/errors" "github.com/threefoldtech/zos/pkg/gedis" + "github.com/threefoldtech/zos/pkg/geoip" "github.com/cenkalti/backoff/v3" "github.com/threefoldtech/zos/pkg" @@ -95,6 +96,10 @@ func main() { version.ShowAndExit(false) } + if err := os.MkdirAll(root, 0750); err != nil { + log.Fatal().Err(err).Str("root", root).Msg("failed to create root directory") + } + zinit, err := zinit.New(zinitSocket) if err != nil { log.Fatal().Err(err).Msg("failed to connect to zinit") @@ -148,8 +153,13 @@ func main() { log.Fatal().Err(err).Msg("failed to read farm ID") } + loc, err := geoip.Fetch() + if err != nil { + log.Fatal().Err(err).Msg("fetch location") + } + register := func(v string) error { - return registerNode(nodeID, farmID, v, idStore) + return registerNode(nodeID, farmID, v, idStore, loc) } if err := backoff.Retry(func() error { @@ -271,10 +281,10 @@ func bcdbClient() (identity.IDStore, error) { return store, nil } -func registerNode(nodeID, farmID pkg.Identifier, version string, store identity.IDStore) error { +func registerNode(nodeID, farmID pkg.Identifier, version string, store identity.IDStore, loc geoip.Location) error { log.Info().Str("version", version).Msg("start registration of the node") - _, err := store.RegisterNode(nodeID, farmID, version) + _, err := store.RegisterNode(nodeID, farmID, version, loc) if err != nil { return err } diff --git a/pkg/capacity/capacity.go b/pkg/capacity/capacity.go index c04f5508a..e662d3d9f 100644 --- a/pkg/capacity/capacity.go +++ b/pkg/capacity/capacity.go @@ -1,6 +1,7 @@ package capacity import ( + "github.com/shirou/gopsutil/host" "github.com/threefoldtech/zos/pkg" "github.com/threefoldtech/zos/pkg/capacity/dmi" ) @@ -51,3 +52,12 @@ func (r *ResourceOracle) Total() (c *Capacity, err error) { func (r *ResourceOracle) DMI() (*dmi.DMI, error) { return dmi.Decode() } + +// Uptime returns the uptime of the node +func (r *ResourceOracle) Uptime() (uint64, error) { + info, err := host.Info() + if err != nil { + return 0, err + } + return info.Uptime, nil +} diff --git a/pkg/capacity/store.go b/pkg/capacity/store.go index 7cd7291fb..7a26f8f7d 100644 --- a/pkg/capacity/store.go +++ b/pkg/capacity/store.go @@ -15,7 +15,7 @@ import ( // Store is an interface to the bcdb store to report capacity type Store interface { Register(nodeID pkg.Identifier, c Capacity, d dmi.DMI) error - Ping(nodeID pkg.Identifier) error + Ping(nodeID pkg.Identifier, uptime uint64) error } // BCDBStore implements the store interface using a gedis client to BCDB @@ -34,7 +34,9 @@ func (s *BCDBStore) Register(nodeID pkg.Identifier, c Capacity, d dmi.DMI) error } // Ping sends an heart-beat to BCDB -func (s *BCDBStore) Ping(nodeID pkg.Identifier) error { return nil } +func (s *BCDBStore) Ping(nodeID pkg.Identifier, uptime uint64) error { + return s.g.UptimeUpdate(nodeID, uptime) +} // HTTPStore implement the method to push capacity information to BCDB over HTTP type HTTPStore struct { @@ -74,4 +76,25 @@ func (s *HTTPStore) Register(nodeID pkg.Identifier, c Capacity, d dmi.DMI) error } // Ping sends an heart-beat to BCDB -func (s *HTTPStore) Ping(nodeID pkg.Identifier) error { return nil } +func (s *HTTPStore) Ping(nodeID pkg.Identifier, uptime uint64) error { + x := struct { + Uptime uint64 `json:"uptime"` + }{uptime} + + buf := bytes.Buffer{} + err := json.NewEncoder(&buf).Encode(x) + if err != nil { + return err + } + + url := fmt.Sprintf(s.baseURL+"/nodes/%s/uptime", nodeID.Identity()) + resp, err := http.Post(url, "application/json", &buf) + if err != nil { + return err + } + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("wrong response status code received: %v", resp.Status) + } + + return nil +} diff --git a/pkg/environment/environment.go b/pkg/environment/environment.go index 3c1e79df6..bf193bce3 100644 --- a/pkg/environment/environment.go +++ b/pkg/environment/environment.go @@ -74,10 +74,9 @@ func getEnvironmentFromParams(params kernel.Params) Environment { var env Environment runmode, found := params.Get("runmode") - if !found { + if !found || len(runmode) < 1 { // Fallback to default production mode - runmode = make([]string, 1) - runmode[0] = string(RunningMain) + runmode = []string{string(RunningMain)} } switch RunningMode(runmode[0]) { @@ -91,8 +90,16 @@ func getEnvironmentFromParams(params kernel.Params) Environment { env = envProd } + if RunningMode(runmode[0]) == RunningDev { + //allow override of the bcdb url in dev mode + bcdb, found := params.Get("bcdb") + if found && len(bcdb) >= 1 { + env.BcdbURL = bcdb[0] + } + } + farmerID, found := params.Get("farmer_id") - if !found || farmerID[0] == "" { + if !found || len(farmerID) < 1 || farmerID[0] == "" { // fmt.Println("Warning: no valid farmer_id found in kernel parameter, fallback to orphanage") env.Orphan = true diff --git a/pkg/gedis/commands_capacity.go b/pkg/gedis/commands_capacity.go new file mode 100644 index 000000000..329c985fe --- /dev/null +++ b/pkg/gedis/commands_capacity.go @@ -0,0 +1,16 @@ +package gedis + +import ( + "github.com/threefoldtech/zos/pkg" +) + +//UptimeUpdate send the uptime of the node to BCDB +func (g *Gedis) UptimeUpdate(nodeID pkg.Identifier, uptime uint64) error { + + _, err := g.Send("nodes", "uptime_update", Args{ + "node_id": nodeID.Identity(), + "uptime": uptime, + }) + + return err +} diff --git a/pkg/gedis/commands_identity.go b/pkg/gedis/commands_identity.go index 5a67d0c08..b8e1f88b7 100644 --- a/pkg/gedis/commands_identity.go +++ b/pkg/gedis/commands_identity.go @@ -8,15 +8,13 @@ import ( "github.com/jbenet/go-base58" - "github.com/rs/zerolog/log" - "github.com/threefoldtech/zos/pkg/geoip" - "github.com/pkg/errors" "github.com/threefoldtech/zos/pkg/gedis/types/directory" "github.com/threefoldtech/zos/pkg/network" "github.com/threefoldtech/zos/pkg/network/types" "github.com/threefoldtech/zos/pkg" + "github.com/threefoldtech/zos/pkg/geoip" ) // @@ -24,12 +22,7 @@ import ( // //RegisterNode implements pkg.IdentityManager interface -func (g *Gedis) RegisterNode(nodeID, farmID pkg.Identifier, version string) (string, error) { - - l, err := geoip.Fetch() - if err != nil { - log.Error().Err(err).Msg("failed to get location of the node") - } +func (g *Gedis) RegisterNode(nodeID, farmID pkg.Identifier, version string, location geoip.Location) (string, error) { pk := base58.Decode(nodeID.Identity()) @@ -40,11 +33,11 @@ func (g *Gedis) RegisterNode(nodeID, farmID pkg.Identifier, version string) (str OsVersion: version, PublicKeyHex: hex.EncodeToString(pk), Location: directory.TfgridLocation1{ - Longitude: l.Longitute, - Latitude: l.Latitude, - Continent: l.Continent, - Country: l.Country, - City: l.City, + Longitude: location.Longitute, + Latitude: location.Latitude, + Continent: location.Continent, + Country: location.Country, + City: location.City, }, }, })) @@ -165,9 +158,7 @@ func nodeFromSchema(node directory.TfgridNode2) types.Node { }(), PublicConfig: func() *types.PubIface { cfg := node.PublicConfig - // This is a dirty hack because jsx schema cannot - // differentiate between an embed object not set or with default value - if cfg.Master == "" { + if cfg == nil { return nil } pub := types.PubIface{ @@ -182,12 +173,7 @@ func nodeFromSchema(node directory.TfgridNode2) types.Node { return &pub }(), - ExitNode: func() int { - if node.ExitNode { - return 1 - } - return 0 - }(), + WGPorts: node.WGPorts, } } diff --git a/pkg/gedis/commands_identity_test.go b/pkg/gedis/commands_identity_test.go index f64c44b24..211bb43f8 100644 --- a/pkg/gedis/commands_identity_test.go +++ b/pkg/gedis/commands_identity_test.go @@ -78,6 +78,7 @@ func TestRegisterNode(t *testing.T) { pkg.StrIdentifier("node-1"), pkg.StrIdentifier("farm-1"), "v1.1.0", + l, ) require.NoError(err) @@ -155,7 +156,7 @@ func TestGetFarm(t *testing.T) { conn.On("Do", "default.farms.get", mustMarshal(t, args)). Return(mustMarshal(t, directory.TfgridFarm1{ - ID: 100, + ID: "100", Name: "farm-1", }), nil) @@ -184,8 +185,8 @@ func TestListFarm(t *testing.T) { conn.On("Do", "default.farms.list", mustMarshal(t, args)). Return(mustMarshal(t, Args{"farms": []directory.TfgridFarm1{ - {ID: 1, Name: "farm-1"}, - {ID: 2, Name: "farm-2"}, + {ID: "1", Name: "farm-1"}, + {ID: "2", Name: "farm-2"}, }}), nil) nodes, err := gedis.ListFarm("eg", "cairo") diff --git a/pkg/gedis/types/directory/tfgrid_farm_1.go b/pkg/gedis/types/directory/tfgrid_farm_1.go index 6dccf9019..63c97926d 100644 --- a/pkg/gedis/types/directory/tfgrid_farm_1.go +++ b/pkg/gedis/types/directory/tfgrid_farm_1.go @@ -1,12 +1,8 @@ package directory -import ( - schema "github.com/threefoldtech/zos/pkg/schema" -) - //TfgridFarm1 jsx schema type TfgridFarm1 struct { - ID int64 `json:"id,omitempty"` + ID string `json:"id"` ThreebotID string `json:"threebot_id"` IyoOrganization string `json:"iyo_organization"` Name string `json:"name"` @@ -14,7 +10,6 @@ type TfgridFarm1 struct { Location TfgridLocation1 `json:"location"` Email string `json:"email"` ResourcePrices []TfgridNodeResourcePrice1 `json:"resource_prices"` - PrefixZero schema.IPRange `json:"prefix_zero"` } //TfgridNodeResourcePrice1 jsx schema diff --git a/pkg/gedis/types/directory/tfgrid_node_2.go b/pkg/gedis/types/directory/tfgrid_node_2.go index dc0422018..880a34646 100644 --- a/pkg/gedis/types/directory/tfgrid_node_2.go +++ b/pkg/gedis/types/directory/tfgrid_node_2.go @@ -1,8 +1,9 @@ package directory import ( - schema "github.com/threefoldtech/zos/pkg/schema" "net" + + schema "github.com/threefoldtech/zos/pkg/schema" ) //TfgridNode2 jsx schema @@ -20,8 +21,8 @@ type TfgridNode2 struct { ReservedResources TfgridNodeResourceAmount1 `json:"reserved_resources"` Proofs []TfgridNodeProof1 `json:"proofs"` Ifaces []TfgridNodeIface1 `json:"ifaces"` - PublicConfig TfgridNodePublicIface1 `json:"public_config"` - ExitNode bool `json:"exit_node"` + PublicConfig *TfgridNodePublicIface1 `json:"public_config,omitemtpy"` + WGPorts []uint `json:"wg_ports"` Approved bool `json:"approved"` PublicKeyHex string `json:"public_key_hex"` } diff --git a/pkg/identity/store.go b/pkg/identity/store.go index 1a684894f..cacf094b6 100644 --- a/pkg/identity/store.go +++ b/pkg/identity/store.go @@ -6,13 +6,17 @@ import ( "fmt" "net/http" + "github.com/threefoldtech/zos/pkg/gedis/types/directory" + + "github.com/threefoldtech/zos/pkg/geoip" + "github.com/threefoldtech/zos/pkg" ) // IDStore is the interface defining the // client side of an identity store type IDStore interface { - RegisterNode(node pkg.Identifier, farm pkg.Identifier, version string) (string, error) + RegisterNode(node pkg.Identifier, farm pkg.Identifier, version string, loc geoip.Location) (string, error) RegisterFarm(farm pkg.Identifier, name string, email string, wallet []string) (string, error) } @@ -26,16 +30,19 @@ func NewHTTPIDStore(url string) IDStore { } // RegisterNode implements the IDStore interface -func (s *httpIDStore) RegisterNode(node pkg.Identifier, farm pkg.Identifier, version string) (string, error) { +func (s *httpIDStore) RegisterNode(node pkg.Identifier, farm pkg.Identifier, version string, loc geoip.Location) (string, error) { buf := bytes.Buffer{} - err := json.NewEncoder(&buf).Encode(struct { - NodeID string `json:"node_id"` - FarmID string `json:"farm_id"` - Version string `json:"version"` - }{ - NodeID: node.Identity(), - FarmID: farm.Identity(), - Version: version, + err := json.NewEncoder(&buf).Encode(directory.TfgridNode2{ + NodeID: node.Identity(), + FarmID: farm.Identity(), + OsVersion: version, + Location: directory.TfgridLocation1{ + City: loc.City, + Country: loc.Country, + Continent: loc.Continent, + Latitude: loc.Latitude, + Longitude: loc.Longitute, + }, }) if err != nil { return "", err @@ -53,7 +60,7 @@ func (s *httpIDStore) RegisterNode(node pkg.Identifier, farm pkg.Identifier, ver } type farmRegisterReq struct { - ID string `json:"farm_id"` + ID string `json:"id"` Name string `json:"name"` } diff --git a/pkg/kernel/kernel.go b/pkg/kernel/kernel.go index 5927b8b32..5ea65a5cf 100644 --- a/pkg/kernel/kernel.go +++ b/pkg/kernel/kernel.go @@ -20,6 +20,8 @@ func (k Params) Exists(key string) bool { // Get returns the values link to a key and a boolean // boolean if true when the key exists in the params or false otherwise +// a nil list, and a true will be returned if the `key` is set in kernel params, but with +// no associated value func (k Params) Get(key string) ([]string, bool) { v, ok := k[key] return v, ok diff --git a/pkg/provision/engine.go b/pkg/provision/engine.go index 2944301e4..449b408ec 100644 --- a/pkg/provision/engine.go +++ b/pkg/provision/engine.go @@ -146,6 +146,8 @@ func (e *defaultEngine) decommission(ctx context.Context, r *Reservation) error } func (e *defaultEngine) reply(ctx context.Context, r *Reservation, rErr error, info interface{}) error { + log.Debug().Str("id", r.ID).Msg("sending reply for reservation") + zbus := GetZBus(ctx) identity := stubs.NewIdentityManagerStub(zbus) result := &Result{ diff --git a/pkg/provision/network.go b/pkg/provision/network.go index f774bb363..d87d1f795 100644 --- a/pkg/provision/network.go +++ b/pkg/provision/network.go @@ -26,7 +26,6 @@ func networkProvisionImpl(ctx context.Context, reservation *Reservation) error { mgr := stubs.NewNetworkerStub(GetZBus(ctx)) log.Debug().Str("network", fmt.Sprintf("%+v", network)).Msg("provision network") - log.Debug().Str("nr", fmt.Sprintf("%+v", network.NetResources[0])).Msg("provision network") _, err := mgr.CreateNR(*network) if err != nil { diff --git a/tools/bcdb_mock/error.go b/tools/bcdb_mock/error.go new file mode 100644 index 000000000..d872c4c38 --- /dev/null +++ b/tools/bcdb_mock/error.go @@ -0,0 +1,23 @@ +package main + +import ( + "encoding/json" + "fmt" + "net/http" +) + +func httpError(w http.ResponseWriter, err error, code int) { + b, err := json.Marshal(struct { + Error error + }{ + Error: err, + }) + if err != nil { + panic(err) + } + + w.Header().Set("Content-Type", "application/json; charset=utf-8") + w.Header().Set("X-Content-Type-Options", "nosniff") + w.WriteHeader(code) + fmt.Fprintln(w, b) +} diff --git a/tools/bcdb_mock/farms_handlers.go b/tools/bcdb_mock/farms_handlers.go new file mode 100644 index 000000000..55645a53e --- /dev/null +++ b/tools/bcdb_mock/farms_handlers.go @@ -0,0 +1,58 @@ +package main + +import ( + "encoding/json" + "log" + "net/http" + + "github.com/threefoldtech/zos/pkg/gedis/types/directory" + + "github.com/gorilla/mux" +) + +func (s *farmStore) registerFarm(w http.ResponseWriter, r *http.Request) { + log.Println("farm register request received") + + defer r.Body.Close() + + info := directory.TfgridFarm1{} + if err := json.NewDecoder(r.Body).Decode(&info); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + if err := s.Add(info); err != nil { + httpError(w, err, http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusCreated) +} + +func (s *farmStore) listFarm(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(s.List()) +} + +func (s *farmStore) cockpitListFarm(w http.ResponseWriter, r *http.Request) { + x := struct { + Farms []*directory.TfgridFarm1 `json:"farms"` + }{s.List()} + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(x) +} + +func (s *farmStore) getFarm(w http.ResponseWriter, r *http.Request) { + name := mux.Vars(r)["farm_id"] + farm, err := s.Get(name) + if err != nil { + httpError(w, err, http.StatusNotFound) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(farm) +} diff --git a/tools/bcdb_mock/farms_store.go b/tools/bcdb_mock/farms_store.go new file mode 100644 index 000000000..dadec468f --- /dev/null +++ b/tools/bcdb_mock/farms_store.go @@ -0,0 +1,87 @@ +package main + +import ( + "encoding/json" + "fmt" + "os" + "sync" + + "github.com/threefoldtech/zos/pkg/gedis/types/directory" +) + +type farmStore struct { + Farms []*directory.TfgridFarm1 `json:"farms"` + m sync.RWMutex +} + +func loadfarmStore() (*farmStore, error) { + store := &farmStore{ + Farms: []*directory.TfgridFarm1{}, + } + f, err := os.OpenFile("farms.json", os.O_RDONLY, 0660) + if err != nil { + if os.IsNotExist(err) { + return store, nil + } + return store, err + } + defer f.Close() + if err := json.NewDecoder(f).Decode(&store); err != nil { + return store, err + } + return store, nil +} + +func (s *farmStore) Save() error { + s.m.RLock() + defer s.m.RUnlock() + + f, err := os.OpenFile("farms.json", os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0660) + if err != nil { + return err + } + defer f.Close() + if err := json.NewEncoder(f).Encode(s); err != nil { + return err + } + return nil +} + +func (s *farmStore) List() []*directory.TfgridFarm1 { + s.m.RLock() + defer s.m.RUnlock() + + out := make([]*directory.TfgridFarm1, len(s.Farms)) + copy(out, s.Farms) + return out +} + +func (s *farmStore) Get(name string) (*directory.TfgridFarm1, error) { + s.m.RLock() + defer s.m.RUnlock() + + for _, f := range s.Farms { + if f.Name == name { + return f, nil + } + } + return nil, fmt.Errorf("farm %s not found", name) +} + +func (s *farmStore) Add(farm directory.TfgridFarm1) error { + s.m.Lock() + defer s.m.Unlock() + + for _, f := range s.Farms { + if f.Name == farm.Name { + f.WalletAddresses = farm.WalletAddresses + f.Location = farm.Location + f.Email = farm.Email + f.ResourcePrices = farm.ResourcePrices + return nil + } + } + + s.Farms = append(s.Farms, &farm) + return nil +} diff --git a/tools/bcdb_mock/main.go b/tools/bcdb_mock/main.go index 1ee4f87cc..a8caf6b65 100644 --- a/tools/bcdb_mock/main.go +++ b/tools/bcdb_mock/main.go @@ -2,62 +2,15 @@ package main import ( "context" - "encoding/json" "flag" "log" - "net" "net/http" "os" "os/signal" - "sync" "time" "github.com/gorilla/handlers" "github.com/gorilla/mux" - "github.com/threefoldtech/zos/pkg/capacity" - "github.com/threefoldtech/zos/pkg/network/types" - "github.com/threefoldtech/zos/pkg/provision" -) - -type farmInfo struct { - ID string `json:"farm_id"` - Name string `json:"name"` - ExitNodes []string `json:"exit_nodes"` -} - -type reservation struct { - Reservation *provision.Reservation `json:"reservation"` - Result *provision.Result `json:"result"` - Deleted bool `json:"deleted"` - NodeID string `json:"node_id"` -} - -type provisionStore struct { - sync.Mutex - Reservations []*reservation `json:"reservations"` -} - -type allocationStore struct { - sync.Mutex - Allocations map[string]*allocation `json:"allocations"` -} - -type allocation struct { - Allocation *net.IPNet - SubNetUsed []uint64 -} - -type node struct { - *types.Node - capacity.Capacity - Version string `json:"version"` -} - -var ( - nodeStore map[string]*node - farmStore map[string]*farmInfo - allocStore *allocationStore - provStore *provisionStore ) var listen string @@ -67,51 +20,65 @@ func main() { flag.Parse() - nodeStore = make(map[string]*node) - farmStore = make(map[string]*farmInfo) - allocStore = &allocationStore{Allocations: make(map[string]*allocation)} - provStore = &provisionStore{Reservations: make([]*reservation, 0, 20)} - - if err := load(); err != nil { - log.Fatalf("failed to load data: %v\n", err) + nodeStore, err := loadNodeStore() + if err != nil { + log.Fatalf("error loading node store: %v\n", err) } + farmStore, err := loadfarmStore() + if err != nil { + log.Fatalf("error loading farm store: %v\n", err) + } + resStore, err := loadProvisionStore() + if err != nil { + log.Fatalf("error loading provision store: %v\n", err) + } + defer func() { - if err := save(); err != nil { + if err := nodeStore.Save(); err != nil { + log.Printf("failed to save data: %v\n", err) + } + if err := farmStore.Save(); err != nil { log.Printf("failed to save data: %v\n", err) } + if err := resStore.Save(); err != nil { + log.Printf("failed to save reservations: %v\n", err) + } }() router := mux.NewRouter() - router.HandleFunc("/nodes", registerNode).Methods("POST") - router.HandleFunc("/nodes/{node_id}", nodeDetail).Methods("GET") - router.HandleFunc("/nodes/{node_id}/interfaces", registerIfaces).Methods("POST") - router.HandleFunc("/nodes/{node_id}/ports", registerPorts).Methods("POST") - router.HandleFunc("/nodes/{node_id}/configure_public", configurePublic).Methods("POST") - // router.HandleFunc("/nodes/{node_id}/select_exit", chooseExit).Methods("POST") - router.HandleFunc("/nodes/{node_id}/capacity", registerCapacity).Methods("POST") - router.HandleFunc("/nodes", listNodes).Methods("GET") - - router.HandleFunc("/farms", registerFarm).Methods("POST") - router.HandleFunc("/farms", listFarm).Methods("GET") - router.HandleFunc("/farms/{farm_id}", getFarm).Methods("GET") - - // router.HandleFunc("/allocations", registerAlloc).Methods("POST") - // router.HandleFunc("/allocations", listAlloc).Methods("GET") - // router.HandleFunc("/allocations/{node_id}", getAlloc).Methods("GET") - - router.HandleFunc("/reservations/{node_id}", reserve).Methods("POST") - router.HandleFunc("/reservations/{node_id}/poll", pollReservations).Methods("GET") - router.HandleFunc("/reservations/{id}", getReservation).Methods("GET") - router.HandleFunc("/reservations/{id}", reservationResult).Methods("PUT") - router.HandleFunc("/reservations/{id}/deleted", reservationDeleted).Methods("PUT") - router.HandleFunc("/reservations/{id}", deleteReservation).Methods("DELETE") + router.HandleFunc("/nodes", nodeStore.registerNode).Methods("POST") + + router.HandleFunc("/nodes/{node_id}", nodeStore.nodeDetail).Methods("GET") + router.HandleFunc("/nodes/{node_id}/interfaces", nodeStore.registerIfaces).Methods("POST") + router.HandleFunc("/nodes/{node_id}/ports", nodeStore.registerPorts).Methods("POST") + router.HandleFunc("/nodes/{node_id}/configure_public", nodeStore.configurePublic).Methods("POST") + router.HandleFunc("/nodes/{node_id}/capacity", nodeStore.registerCapacity).Methods("POST") + router.HandleFunc("/nodes/{node_id}/uptime", nodeStore.updateUptimeHandler).Methods("POST") + router.HandleFunc("/nodes", nodeStore.listNodes).Methods("GET") + + router.HandleFunc("/farms", farmStore.registerFarm).Methods("POST") + router.HandleFunc("/farms", farmStore.listFarm).Methods("GET") + router.HandleFunc("/farms/{farm_id}", farmStore.getFarm).Methods("GET") + + // compatibility with gedis_http + router.HandleFunc("/nodes/list", nodeStore.cockpitListNodes).Methods("POST") + router.HandleFunc("/farms/list", farmStore.cockpitListFarm).Methods("POST") + + router.HandleFunc("/reservations/{node_id}", nodeStore.Requires("node_id", resStore.reserve)).Methods("POST") + router.HandleFunc("/reservations/{node_id}/poll", nodeStore.Requires("node_id", resStore.poll)).Methods("GET") + router.HandleFunc("/reservations/{id}", resStore.get).Methods("GET") + router.HandleFunc("/reservations/{id}", resStore.putResult).Methods("PUT") + router.HandleFunc("/reservations/{id}/deleted", resStore.putDeleted).Methods("PUT") + router.HandleFunc("/reservations/{id}", resStore.delete).Methods("DELETE") log.Printf("start on %s\n", listen) - loggedRouter := handlers.LoggingHandler(os.Stderr, router) + r := handlers.LoggingHandler(os.Stderr, router) + r = handlers.CORS()(r) + s := &http.Server{ Addr: listen, - Handler: loggedRouter, + Handler: r, } c := make(chan os.Signal) @@ -128,46 +95,3 @@ func main() { log.Printf("error during server shutdown: %v\n", err) } } - -func save() error { - stores := map[string]interface{}{ - "nodes": nodeStore, - "farms": farmStore, - "allocations": allocStore, - "reservations": provStore, - } - for name, store := range stores { - f, err := os.OpenFile(name+".json", os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0660) - if err != nil { - return err - } - defer f.Close() - if err := json.NewEncoder(f).Encode(store); err != nil { - return err - } - } - return nil -} - -func load() error { - stores := map[string]interface{}{ - "nodes": &nodeStore, - "farms": &farmStore, - "allocations": &allocStore, - "reservations": &provStore, - } - for name, store := range stores { - f, err := os.OpenFile(name+".json", os.O_RDONLY, 0660) - if err != nil { - if os.IsNotExist(err) { - continue - } - return err - } - defer f.Close() - if err := json.NewDecoder(f).Decode(store); err != nil { - return err - } - } - return nil -} diff --git a/tools/bcdb_mock/network.go b/tools/bcdb_mock/network.go deleted file mode 100644 index 5a5acfd0c..000000000 --- a/tools/bcdb_mock/network.go +++ /dev/null @@ -1,95 +0,0 @@ -package main - -import ( - "encoding/json" - "fmt" - "log" - "net/http" - - "github.com/gorilla/mux" - "github.com/threefoldtech/zos/pkg/network/types" -) - -func registerIfaces(w http.ResponseWriter, r *http.Request) { - log.Println("network interfaces register request received") - - nodeID := mux.Vars(r)["node_id"] - if _, ok := nodeStore[nodeID]; !ok { - err := fmt.Errorf("node id %s not found", nodeID) - log.Printf("node not found %v", nodeID) - http.Error(w, err.Error(), http.StatusNotFound) - return - } - - defer r.Body.Close() - - ifaces := []*types.IfaceInfo{} - if err := json.NewDecoder(r.Body).Decode(&ifaces); err != nil { - log.Printf(err.Error()) - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - fmt.Println("network interfaces received", ifaces) - - nodeStore[nodeID].Ifaces = ifaces - w.WriteHeader(http.StatusCreated) -} - -func registerPorts(w http.ResponseWriter, r *http.Request) { - - nodeID := mux.Vars(r)["node_id"] - if _, ok := nodeStore[nodeID]; !ok { - err := fmt.Errorf("node id %s not found", nodeID) - log.Printf("node not found %v", nodeID) - http.Error(w, err.Error(), http.StatusNotFound) - return - } - - defer r.Body.Close() - - input := struct { - Ports []uint `json:"ports"` - }{} - if err := json.NewDecoder(r.Body).Decode(&input); err != nil { - log.Printf(err.Error()) - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - fmt.Println("wireguard ports received", input.Ports) - - nodeStore[nodeID].WGPorts = input.Ports - w.WriteHeader(http.StatusOK) -} - -func configurePublic(w http.ResponseWriter, r *http.Request) { - nodeID := mux.Vars(r)["node_id"] - node, ok := nodeStore[nodeID] - if !ok { - http.Error(w, fmt.Sprintf("node id %s not found", nodeID), http.StatusNotFound) - return - } - - if _, ok = farmStore[node.FarmID]; !ok { - http.Error(w, fmt.Sprintf("farm id %s not found", node.FarmID), http.StatusNotFound) - return - } - - version := 0 - if node.PublicConfig != nil { - version = node.PublicConfig.Version - } - node.PublicConfig = &types.PubIface{} - - defer r.Body.Close() - if err := json.NewDecoder(r.Body).Decode(node.PublicConfig); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - node.PublicConfig.Type = types.MacVlanIface //TODO: change me once we support other types - node.PublicConfig.Version = version + 1 - - w.WriteHeader(http.StatusCreated) -} diff --git a/tools/bcdb_mock/node_handlers.go b/tools/bcdb_mock/node_handlers.go new file mode 100644 index 000000000..a753f5b7c --- /dev/null +++ b/tools/bcdb_mock/node_handlers.go @@ -0,0 +1,217 @@ +package main + +import ( + "encoding/json" + "fmt" + "log" + "net/http" + + "github.com/threefoldtech/zos/pkg/gedis/types/directory" + "github.com/threefoldtech/zos/pkg/network/types" + + "github.com/gorilla/mux" + "github.com/threefoldtech/zos/pkg/capacity/dmi" +) + +func (s *nodeStore) registerNode(w http.ResponseWriter, r *http.Request) { + log.Println("node register request received") + + defer r.Body.Close() + + n := directory.TfgridNode2{} + if err := json.NewDecoder(r.Body).Decode(&n); err != nil { + httpError(w, err, http.StatusBadRequest) + return + } + + if err := s.Add(n); err != nil { + httpError(w, err, http.StatusInternalServerError) + return + } + log.Printf("node registered: %+v\n", n) + + w.WriteHeader(http.StatusCreated) +} + +func (s *nodeStore) nodeDetail(w http.ResponseWriter, r *http.Request) { + nodeID := mux.Vars(r)["node_id"] + node, err := s.Get(nodeID) + if err != nil { + httpError(w, err, http.StatusNotFound) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + if err := json.NewEncoder(w).Encode(&node); err != nil { + log.Printf("error writing node: %v", err) + } +} + +func (s *nodeStore) listNodes(w http.ResponseWriter, r *http.Request) { + nodes := s.List() + farm := r.URL.Query().Get("farm") + + for i, node := range nodes { + if node == nil { + nodes = append(nodes[:i], nodes[i+1:]...) + continue + } + + if farm != "" && node.FarmID != farm { + nodes = append(nodes[:i], nodes[i+1:]...) + continue + } + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(nodes) +} + +func (s *nodeStore) cockpitListNodes(w http.ResponseWriter, r *http.Request) { + nodes := s.List() + farm := r.URL.Query().Get("farm") + + for i, node := range nodes { + if node == nil { + nodes = append(nodes[:i], nodes[i+1:]...) + continue + } + + if farm != "" && node.FarmID != farm { + nodes = append(nodes[:i], nodes[i+1:]...) + continue + } + } + + x := struct { + Node []*directory.TfgridNode2 `json:"nodes"` + }{nodes} + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(x) +} + +func (s *nodeStore) registerCapacity(w http.ResponseWriter, r *http.Request) { + x := struct { + Capacity directory.TfgridNodeResourceAmount1 `json:"capacity,omitempty"` + DMI *dmi.DMI `json:"dmi,omitempty"` + }{} + + if err := json.NewDecoder(r.Body).Decode(&x); err != nil { + httpError(w, err, http.StatusBadRequest) + return + } + + nodeID := mux.Vars(r)["node_id"] + if err := s.updateTotalCapacity(nodeID, x.Capacity); err != nil { + httpError(w, err, http.StatusNotFound) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) +} + +func (s *nodeStore) registerIfaces(w http.ResponseWriter, r *http.Request) { + log.Println("network interfaces register request received") + + defer r.Body.Close() + + input := []*types.IfaceInfo{} + if err := json.NewDecoder(r.Body).Decode(&input); err != nil { + log.Printf(err.Error()) + httpError(w, err, http.StatusBadRequest) + return + } + + log.Println("network interfaces received", input) + ifaces := make([]directory.TfgridNodeIface1, len(input)) + for i, iface := range input { + ifaces[i].Gateway = iface.Gateway + ifaces[i].Name = iface.Name + for _, r := range iface.Addrs { + ifaces[i].Addrs = append(ifaces[i].Addrs, r.ToSchema()) + } + } + + nodeID := mux.Vars(r)["node_id"] + if err := s.SetInterfaces(nodeID, ifaces); err != nil { + httpError(w, err, http.StatusNotFound) + return + } + + w.WriteHeader(http.StatusCreated) +} + +func (s *nodeStore) configurePublic(w http.ResponseWriter, r *http.Request) { + iface := types.PubIface{} + + defer r.Body.Close() + if err := json.NewDecoder(r.Body).Decode(&iface); err != nil { + httpError(w, err, http.StatusBadRequest) + return + } + + cfg := directory.TfgridNodePublicIface1{ + Gw4: iface.GW4, + Gw6: iface.GW6, + Master: iface.Master, + Type: directory.TfgridNodePublicIface1TypeMacvlan, + Version: int64(iface.Version), + } + + nodeID := mux.Vars(r)["node_id"] + if err := s.SetPublicConfig(nodeID, cfg); err != nil { + httpError(w, err, http.StatusNotFound) + return + } + + w.WriteHeader(http.StatusCreated) +} + +func (s *nodeStore) registerPorts(w http.ResponseWriter, r *http.Request) { + + defer r.Body.Close() + + input := struct { + Ports []uint `json:"ports"` + }{} + if err := json.NewDecoder(r.Body).Decode(&input); err != nil { + httpError(w, err, http.StatusBadRequest) + return + } + + fmt.Println("wireguard ports received", input.Ports) + + nodeID := mux.Vars(r)["node_id"] + if err := s.SetWGPorts(nodeID, input.Ports); err != nil { + httpError(w, err, http.StatusNotFound) + return + } + w.WriteHeader(http.StatusOK) +} + +func (s *nodeStore) updateUptimeHandler(w http.ResponseWriter, r *http.Request) { + + defer r.Body.Close() + + input := struct { + Uptime uint64 + }{} + if err := json.NewDecoder(r.Body).Decode(&input); err != nil { + httpError(w, err, http.StatusBadRequest) + return + } + + nodeID := mux.Vars(r)["node_id"] + fmt.Printf("node uptime received %s %d\n", nodeID, input.Uptime) + + if err := s.updateUptime(nodeID, int64(input.Uptime)); err != nil { + httpError(w, err, http.StatusNotFound) + return + } + w.WriteHeader(http.StatusOK) +} diff --git a/tools/bcdb_mock/nodes.go b/tools/bcdb_mock/nodes.go deleted file mode 100644 index 83018802c..000000000 --- a/tools/bcdb_mock/nodes.go +++ /dev/null @@ -1,137 +0,0 @@ -package main - -import ( - "encoding/json" - "fmt" - "log" - "net/http" - - "github.com/gorilla/mux" - "github.com/threefoldtech/zos/pkg/capacity" - "github.com/threefoldtech/zos/pkg/capacity/dmi" - "github.com/threefoldtech/zos/pkg/network/types" -) - -func registerNode(w http.ResponseWriter, r *http.Request) { - log.Println("node register request received") - - defer r.Body.Close() - - n := node{} - if err := json.NewDecoder(r.Body).Decode(&n); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - log.Printf("node registered: %+v\n", n) - - i, ok := nodeStore[n.NodeID] - if !ok || i.Node == nil { - nodeStore[n.NodeID] = &n - - } else { - i.NodeID = n.NodeID - i.FarmID = n.FarmID - i.Version = n.Version - } - - w.WriteHeader(http.StatusCreated) -} - -func nodeDetail(w http.ResponseWriter, r *http.Request) { - nodeID := mux.Vars(r)["node_id"] - node, ok := nodeStore[nodeID] - if !ok { - http.Error(w, fmt.Sprintf("node id %s not found", nodeID), http.StatusNotFound) - return - } - - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - if err := json.NewEncoder(w).Encode(&node); err != nil { - log.Printf("error writing node: %v", err) - } -} - -func listNodes(w http.ResponseWriter, r *http.Request) { - var nodes = make([]*types.Node, 0, len(nodeStore)) - farm := r.URL.Query().Get("farm") - - for _, node := range nodeStore { - if node == nil { - continue - } - - if farm != "" && node.FarmID != farm { - continue - } - nodes = append(nodes, node.Node) - } - - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - _ = json.NewEncoder(w).Encode(nodes) -} - -func registerFarm(w http.ResponseWriter, r *http.Request) { - log.Println("farm register request received") - - defer r.Body.Close() - - info := farmInfo{} - if err := json.NewDecoder(r.Body).Decode(&info); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - farmStore[info.ID] = &info - w.WriteHeader(http.StatusCreated) -} - -func listFarm(w http.ResponseWriter, r *http.Request) { - var farms = make([]*farmInfo, 0, len(farmStore)) - for _, info := range farmStore { - farms = append(farms, info) - } - - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - _ = json.NewEncoder(w).Encode(farms) -} - -func getFarm(w http.ResponseWriter, r *http.Request) { - farmID := mux.Vars(r)["farm_id"] - farm, ok := farmStore[farmID] - if !ok { - http.Error(w, fmt.Sprintf("farm %s not found", farmID), http.StatusNotFound) - return - } - - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - _ = json.NewEncoder(w).Encode(farm) -} - -func registerCapacity(w http.ResponseWriter, r *http.Request) { - x := struct { - Capacity capacity.Capacity `json:"capacity,omitempty"` - DMI *dmi.DMI `json:"dmi,omitempty"` - }{} - - nodeID := mux.Vars(r)["node_id"] - fmt.Println("search node", nodeID) - node, ok := nodeStore[nodeID] - if !ok { - http.Error(w, fmt.Sprintf("node id %s not found", nodeID), http.StatusNotFound) - return - } - - if err := json.NewDecoder(r.Body).Decode(&x); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - node.Capacity = x.Capacity - - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) -} diff --git a/tools/bcdb_mock/nodes_store.go b/tools/bcdb_mock/nodes_store.go new file mode 100644 index 000000000..41a9a6875 --- /dev/null +++ b/tools/bcdb_mock/nodes_store.go @@ -0,0 +1,186 @@ +package main + +import ( + "encoding/json" + "fmt" + "net/http" + "os" + "sync" + "time" + + "github.com/gorilla/mux" + "github.com/pkg/errors" + "github.com/threefoldtech/zos/pkg/schema" + + "github.com/threefoldtech/zos/pkg/gedis/types/directory" +) + +type nodeStore struct { + Nodes []*directory.TfgridNode2 `json:"nodes"` + m sync.RWMutex +} + +func loadNodeStore() (*nodeStore, error) { + store := &nodeStore{ + Nodes: []*directory.TfgridNode2{}, + } + f, err := os.OpenFile("nodes.json", os.O_RDONLY, 0660) + if err != nil { + if os.IsNotExist(err) { + return store, nil + } + return store, err + } + defer f.Close() + if err := json.NewDecoder(f).Decode(&store); err != nil { + return store, err + } + return store, nil +} + +func (s *nodeStore) Save() error { + s.m.RLock() + defer s.m.RUnlock() + + f, err := os.OpenFile("nodes.json", os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0660) + if err != nil { + return err + } + defer f.Close() + if err := json.NewEncoder(f).Encode(s); err != nil { + return err + } + return nil +} + +func (s *nodeStore) List() []*directory.TfgridNode2 { + s.m.RLock() + defer s.m.RUnlock() + out := make([]*directory.TfgridNode2, len(s.Nodes)) + + copy(out, s.Nodes) + return out +} + +func (s *nodeStore) Get(nodeID string) (*directory.TfgridNode2, error) { + s.m.RLock() + defer s.m.RUnlock() + + for _, n := range s.Nodes { + if n.NodeID == nodeID { + return n, nil + } + } + return nil, fmt.Errorf("node %s not found", nodeID) +} + +func (s *nodeStore) Add(node directory.TfgridNode2) error { + s.m.Lock() + defer s.m.Unlock() + + for i, n := range s.Nodes { + if n.NodeID == node.NodeID { + s.Nodes[i].FarmID = node.FarmID + s.Nodes[i].OsVersion = node.OsVersion + s.Nodes[i].Location = node.Location + s.Nodes[i].Updated = schema.Date{Time: time.Now()} + return nil + } + } + + node.Created = schema.Date{Time: time.Now()} + node.Updated = schema.Date{Time: time.Now()} + s.Nodes = append(s.Nodes, &node) + return nil +} + +func (s *nodeStore) updateTotalCapacity(nodeID string, cap directory.TfgridNodeResourceAmount1) error { + return s.updateCapacity(nodeID, "total", cap) +} +func (s *nodeStore) updateReservedCapacity(nodeID string, cap directory.TfgridNodeResourceAmount1) error { + return s.updateCapacity(nodeID, "reserved", cap) +} +func (s *nodeStore) updateUsedCapacity(nodeID string, cap directory.TfgridNodeResourceAmount1) error { + return s.updateCapacity(nodeID, "used", cap) +} + +func (s *nodeStore) updateCapacity(nodeID string, t string, cap directory.TfgridNodeResourceAmount1) error { + node, err := s.Get(nodeID) + if err != nil { + return err + } + + switch t { + case "total": + node.TotalResources = cap + case "reserved": + node.ReservedResources = cap + case "used": + node.UsedResources = cap + default: + return fmt.Errorf("unsupported capacity type: %v", t) + } + + return nil +} + +func (s *nodeStore) updateUptime(nodeID string, uptime int64) error { + node, err := s.Get(nodeID) + if err != nil { + return err + } + + node.Uptime = uptime + node.Updated = schema.Date{Time: time.Now()} + + return nil +} + +func (s *nodeStore) SetInterfaces(nodeID string, ifaces []directory.TfgridNodeIface1) error { + node, err := s.Get(nodeID) + if err != nil { + return err + } + + node.Ifaces = ifaces + return nil +} + +func (s *nodeStore) SetPublicConfig(nodeID string, cfg directory.TfgridNodePublicIface1) error { + node, err := s.Get(nodeID) + if err != nil { + return err + } + + node.PublicConfig = &cfg + return nil +} + +func (s *nodeStore) SetWGPorts(nodeID string, ports []uint) error { + node, err := s.Get(nodeID) + if err != nil { + return err + } + + node.WGPorts = ports + return nil +} + +func (s *nodeStore) Requires(key string, handler http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + nodeID, ok := mux.Vars(r)[key] + if !ok { + // programming error, we should panic in this case + panic("invalid node-id key") + } + + _, err := s.Get(nodeID) + if err != nil { + // node not found + httpError(w, errors.Wrapf(err, "node not found: %s", nodeID), http.StatusNotFound) + return + } + + handler(w, r) + } +} diff --git a/tools/bcdb_mock/provision.go b/tools/bcdb_mock/provision.go deleted file mode 100644 index 029b8c041..000000000 --- a/tools/bcdb_mock/provision.go +++ /dev/null @@ -1,228 +0,0 @@ -package main - -import ( - "encoding/json" - "fmt" - "log" - "net/http" - "strconv" - "time" - - "github.com/gorilla/mux" - "github.com/threefoldtech/zos/pkg/provision" -) - -func reserve(w http.ResponseWriter, r *http.Request) { - nodeID := mux.Vars(r)["node_id"] - - _, ok := nodeStore[nodeID] - if !ok { - http.Error(w, fmt.Sprintf("node %s not found", nodeID), http.StatusNotFound) - return - } - - defer r.Body.Close() - res := &provision.Reservation{} - if err := json.NewDecoder(r.Body).Decode(res); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - if err := provision.Verify(res); err != nil { - errmsg := fmt.Sprintf("reservation signature invalid: %s", err.Error()) - http.Error(w, errmsg, http.StatusBadRequest) - return - } - - provStore.Lock() - defer provStore.Unlock() - - res.ID = fmt.Sprintf("r-%d", len(provStore.Reservations)) - provStore.Reservations = append(provStore.Reservations, &reservation{ - Reservation: res, - NodeID: nodeID, - }) - w.Header().Set("Location", "/reservations/"+res.ID) - w.WriteHeader(http.StatusCreated) -} - -func pollReservations(w http.ResponseWriter, r *http.Request) { - nodeID := mux.Vars(r)["node_id"] - var since time.Time - s := r.URL.Query().Get("since") - if s == "" { - // if since is not specificed, send all reservation since last hour - since = time.Now().Add(-time.Hour) - } else { - timestamp, err := strconv.ParseInt(s, 10, 64) - if err != nil { - http.Error(w, "since query argument format not valid", http.StatusBadRequest) - return - } - since = time.Unix(timestamp, 0) - } - - _, ok := nodeStore[nodeID] - if !ok { - http.Error(w, fmt.Sprintf("node %s not found", nodeID), http.StatusNotFound) - return - } - - all, err := strconv.ParseBool(r.URL.Query().Get("all")) - if err != nil { - all = false - } - - output := []*provision.Reservation{} - if all { - // just get all reservation for this nodeID - output = getRes(nodeID, all, since) - } else { - // otherwise start long polling - timeout := time.Now().Add(time.Second * 20) - for { - output = getRes(nodeID, all, since) - if len(output) > 0 { - break - } - - if time.Now().After(timeout) { - break - } - time.Sleep(time.Second) - } - } - - w.Header().Add("content-type", "application/json") - w.WriteHeader(http.StatusOK) - if err := json.NewEncoder(w).Encode(output); err != nil { - log.Printf("error encoding empty reservation slice: %v", err) - } -} - -func getRes(nodeID string, all bool, since time.Time) []*provision.Reservation { - output := []*provision.Reservation{} - - provStore.Lock() - defer provStore.Unlock() - - for _, r := range provStore.Reservations { - // skip reservation aimed at another node - if r.NodeID != nodeID { - continue - } - - if all || - (!r.Reservation.Expired() && since.Before(r.Reservation.Created)) || - (r.Reservation.ToDelete && !r.Deleted) { - output = append(output, r.Reservation) - } - } - - return output -} - -func getReservation(w http.ResponseWriter, r *http.Request) { - id := mux.Vars(r)["id"] - - provStore.Lock() - defer provStore.Unlock() - - w.Header().Add("content-type", "application/json") - - obj := struct { - Reservation *provision.Reservation `json:"reservation"` - Result *provision.Result `json:"result"` - }{} - - for _, r := range provStore.Reservations { - if r.Reservation.ID == id { - w.WriteHeader(http.StatusOK) - obj.Reservation = r.Reservation - obj.Result = r.Result - if err := json.NewEncoder(w).Encode(obj); err != nil { - log.Printf("error during json encoding of reservation: %v", err) - } - return - } - } - - w.WriteHeader(http.StatusNotFound) -} - -func reservationResult(w http.ResponseWriter, r *http.Request) { - id := mux.Vars(r)["id"] - - provStore.Lock() - - var rsvt *reservation - for _, rsvt = range provStore.Reservations { - if rsvt.Reservation.ID == id { - break - } - } - provStore.Unlock() - - if r == nil { - http.Error(w, fmt.Sprintf("reservation %s not found", id), http.StatusNotFound) - return - } - - w.Header().Add("content-type", "application/json") - - defer r.Body.Close() - result := &provision.Result{} - if err := json.NewDecoder(r.Body).Decode(result); err != nil { - log.Printf("failed to decode reservation result: %v", err) - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - rsvt.Result = result - - w.WriteHeader(http.StatusOK) -} - -func reservationDeleted(w http.ResponseWriter, r *http.Request) { - id := mux.Vars(r)["id"] - - provStore.Lock() - defer provStore.Unlock() - - var rsvt *reservation - for _, rsvt = range provStore.Reservations { - if rsvt.Reservation.ID == id { - break - } - } - - if r == nil { - http.Error(w, fmt.Sprintf("reservation %s not found", id), http.StatusNotFound) - return - } - - rsvt.Deleted = true - - w.WriteHeader(http.StatusOK) - -} - -func deleteReservation(w http.ResponseWriter, r *http.Request) { - id := mux.Vars(r)["id"] - - provStore.Lock() - defer provStore.Unlock() - - w.Header().Add("content-type", "application/json") - - for _, r := range provStore.Reservations { - if r.Reservation.ID == id { - - r.Reservation.ToDelete = true - - w.WriteHeader(http.StatusOK) - return - } - } - - w.WriteHeader(http.StatusNotFound) -} diff --git a/tools/bcdb_mock/reservation_handlers.go b/tools/bcdb_mock/reservation_handlers.go new file mode 100644 index 000000000..9779ef0ab --- /dev/null +++ b/tools/bcdb_mock/reservation_handlers.go @@ -0,0 +1,152 @@ +package main + +import ( + "encoding/json" + "fmt" + "log" + "net/http" + "strconv" + "time" + + "github.com/gorilla/mux" + "github.com/threefoldtech/zos/pkg/provision" +) + +func (s *reservationsStore) reserve(w http.ResponseWriter, r *http.Request) { + nodeID := mux.Vars(r)["node_id"] + + defer r.Body.Close() + res := &provision.Reservation{} + if err := json.NewDecoder(r.Body).Decode(res); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + if err := provision.Verify(res); err != nil { + errmsg := fmt.Sprintf("reservation signature invalid: %s", err.Error()) + http.Error(w, errmsg, http.StatusBadRequest) + return + } + + s.Add(nodeID, res) + + w.Header().Set("Location", "/reservations/"+res.ID) + w.WriteHeader(http.StatusCreated) +} + +func (s *reservationsStore) poll(w http.ResponseWriter, r *http.Request) { + nodeID := mux.Vars(r)["node_id"] + var since time.Time + sinceStr := r.URL.Query().Get("since") + if sinceStr == "" { + // if since is not specificed, send all reservation since last hour + since = time.Now().Add(-time.Hour) + } else { + timestamp, err := strconv.ParseInt(sinceStr, 10, 64) + if err != nil { + http.Error(w, "since query argument format not valid", http.StatusBadRequest) + return + } + since = time.Unix(timestamp, 0) + } + + all, err := strconv.ParseBool(r.URL.Query().Get("all")) + if err != nil { + all = false + } + + output := []*provision.Reservation{} + if all { + // just get all reservation for this nodeID + output = s.GetReservations(nodeID, all, since) + } else { + // otherwise start long polling + timeout := time.Now().Add(time.Second * 20) + for { + output = s.GetReservations(nodeID, all, since) + if len(output) > 0 { + break + } + + if time.Now().After(timeout) { + break + } + time.Sleep(time.Second) + } + } + + w.Header().Add("content-type", "application/json") + w.WriteHeader(http.StatusOK) + if err := json.NewEncoder(w).Encode(output); err != nil { + log.Printf("error encoding empty reservation slice: %v", err) + } +} + +func (s *reservationsStore) get(w http.ResponseWriter, r *http.Request) { + id := mux.Vars(r)["id"] + + reservation, err := s.Get(id) + if err != nil { + httpError(w, err, http.StatusNotFound) + return + } + + w.Header().Add("content-type", "application/json") + w.WriteHeader(http.StatusOK) + + if err := json.NewEncoder(w).Encode(reservation.Reservation); err != nil { + log.Printf("error during json encoding of reservation: %v", err) + } +} + +func (s *reservationsStore) putResult(w http.ResponseWriter, r *http.Request) { + id := mux.Vars(r)["id"] + + reservation, err := s.Get(id) + if err != nil { + httpError(w, err, http.StatusNotFound) + return + } + + w.Header().Add("content-type", "application/json") + + defer r.Body.Close() + if err := json.NewDecoder(r.Body).Decode(&reservation.Result); err != nil { + log.Printf("failed to decode reservation result: %v", err) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + w.WriteHeader(http.StatusOK) +} + +func (s *reservationsStore) putDeleted(w http.ResponseWriter, r *http.Request) { + id := mux.Vars(r)["id"] + + reservation, err := s.Get(id) + if err != nil { + httpError(w, err, http.StatusNotFound) + return + } + + reservation.Deleted = true + + w.WriteHeader(http.StatusOK) + +} + +func (s *reservationsStore) delete(w http.ResponseWriter, r *http.Request) { + id := mux.Vars(r)["id"] + + w.Header().Add("content-type", "application/json") + + reservation, err := s.Get(id) + if err != nil { + httpError(w, err, http.StatusNotFound) + return + } + + reservation.Reservation.ToDelete = true + + w.WriteHeader(http.StatusOK) +} diff --git a/tools/bcdb_mock/reservation_store.go b/tools/bcdb_mock/reservation_store.go new file mode 100644 index 000000000..0d246635e --- /dev/null +++ b/tools/bcdb_mock/reservation_store.go @@ -0,0 +1,111 @@ +package main + +import ( + "encoding/json" + "fmt" + "os" + "sync" + "time" + + "github.com/threefoldtech/zos/pkg/provision" +) + +type reservation struct { + Reservation *provision.Reservation `json:"reservation"` + Result *provision.Result `json:"result"` + Deleted bool `json:"deleted"` + NodeID string `json:"node_id"` +} + +type reservationsStore struct { + Reservations []*reservation `json:"reservations"` + m sync.RWMutex +} + +func loadProvisionStore() (*reservationsStore, error) { + store := &reservationsStore{ + Reservations: []*reservation{}, + } + f, err := os.OpenFile("reservations.json", os.O_RDONLY, 0660) + if err != nil { + if os.IsNotExist(err) { + return store, nil + } + return store, err + } + defer f.Close() + if err := json.NewDecoder(f).Decode(&store); err != nil { + return store, err + } + return store, nil +} + +func (s *reservationsStore) Save() error { + s.m.RLock() + defer s.m.RUnlock() + + f, err := os.OpenFile("reservations.json", os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0660) + if err != nil { + return err + } + defer f.Close() + if err := json.NewEncoder(f).Encode(s); err != nil { + return err + } + return nil +} + +func (s *reservationsStore) List() []*reservation { + s.m.RLock() + defer s.m.RUnlock() + out := make([]*reservation, len(s.Reservations)) + + copy(out, s.Reservations) + return out +} + +func (s *reservationsStore) Get(ID string) (*reservation, error) { + s.m.RLock() + defer s.m.RUnlock() + + for _, r := range s.Reservations { + if r.Reservation.ID == ID { + return r, nil + } + } + + return nil, fmt.Errorf("reservation %s not found", ID) +} + +func (s *reservationsStore) Add(nodeID string, res *provision.Reservation) error { + s.m.Lock() + defer s.m.Unlock() + res.ID = fmt.Sprintf("r-%d", len(s.Reservations)) + s.Reservations = append(s.Reservations, &reservation{ + NodeID: nodeID, + Reservation: res, + }) + return nil +} + +func (s *reservationsStore) GetReservations(nodeID string, all bool, since time.Time) []*provision.Reservation { + output := []*provision.Reservation{} + + s.m.RLock() + defer s.m.RUnlock() + + for _, r := range s.Reservations { + // skip reservation aimed at another node + if r.NodeID != nodeID { + continue + } + + if all || + (!r.Reservation.Expired() && since.Before(r.Reservation.Created)) || + (r.Reservation.ToDelete && !r.Deleted) { + output = append(output, r.Reservation) + } + } + + return output +} diff --git a/tools/bcdb_mock/tnodb.go b/tools/bcdb_mock/tnodb.go deleted file mode 100644 index 9d4837251..000000000 --- a/tools/bcdb_mock/tnodb.go +++ /dev/null @@ -1,98 +0,0 @@ -package main - -// import ( -// "fmt" -// "math/rand" -// "net" -// "time" - -// "github.com/threefoldtech/zos/pkg/network/ip" -// "github.com/threefoldtech/zos/pkg/network/types" - -// "github.com/dspinhirne/netaddr-go" -// ) - -// func init() { -// rand.Seed(time.Now().UnixNano()) -// } - -// func requestAllocation(node *types.Node, store *allocationStore) (*net.IPNet, *net.IPNet, error) { -// store.Lock() -// defer store.Unlock() -// farmAlloc, ok := store.Allocations[node.FarmID] -// if !ok { -// return nil, nil, fmt.Errorf("farm %s does not have a prefix registered", node.FarmID) -// } - -// newAlloc, err := allocate(farmAlloc, uint8(node.ExitNode)) -// if err != nil { -// return nil, nil, err -// } - -// return newAlloc, farmAlloc.Allocation, nil -// } - -// func getNetworkZero(farm string, store *allocationStore) (*net.IPNet, int, error) { -// store.Lock() -// defer store.Unlock() -// farmAlloc, ok := store.Allocations[farm] -// if !ok { -// return nil, 0, fmt.Errorf("farm %s does not have a prefix registered", farm) -// } - -// ipv6net, err := netaddr.ParseIPv6Net(farmAlloc.Allocation.String()) -// if err != nil { -// return nil, 0, err -// } -// subnet := ipv6net.NthSubnet(64, 0) -// allocSize, _ := farmAlloc.Allocation.Mask.Size() -// return convert(subnet), allocSize, nil -// } - -// func allocate(farmAlloc *allocation, exitNodeNR uint8) (*net.IPNet, error) { -// ipv6net, err := netaddr.ParseIPv6Net(farmAlloc.Allocation.String()) -// if err != nil { -// return nil, err -// } - -// subnetCount := ipv6net.SubnetCount(64) -// if uint64(len(farmAlloc.SubNetUsed)) >= subnetCount { -// return nil, fmt.Errorf("all subnets already allocated") -// } - -// // random from 000f to subnetCount -// // we never hand out the network 0 to f cause we keep it for -// // administrative purposes (routing segment, mgmt, tunnel sources... ) -// rnd := rand.Int63n(int64(subnetCount)-16) + 16 -// for { -// if !isIn(rnd, farmAlloc.SubNetUsed) { -// farmAlloc.SubNetUsed = append(farmAlloc.SubNetUsed, uint64(rnd)) -// break -// } -// rnd = rand.Int63n(int64(subnetCount)-16) + 16 -// } - -// subnet := ipv6net.NthSubnet(64, uint64(rnd)) -// alloc := ip.ExitNodeRange(convert(subnet), exitNodeNR, uint16(rnd)) -// return alloc, nil - -// } - -// // FIXME: use someting better then O(n) -// func isIn(target int64, list []uint64) bool { -// for _, x := range list { -// if uint64(target) == x { -// return true -// } -// } -// return false -// } - -// // FIXME: avoid passing by string representation to convert -// func convert(subnet *netaddr.IPv6Net) *net.IPNet { -// _, net, err := net.ParseCIDR(subnet.String()) -// if err != nil { -// panic(err) -// } -// return net -// } diff --git a/tools/bcdb_mock/tnodb_test.go b/tools/bcdb_mock/tnodb_test.go deleted file mode 100644 index c01aaf08e..000000000 --- a/tools/bcdb_mock/tnodb_test.go +++ /dev/null @@ -1,26 +0,0 @@ -package main - -import ( - "fmt" - "net" - "testing" - - "github.com/stretchr/testify/require" -) - -func TestPrefixAllocation(t *testing.T) { - for _, tc := range []struct { - Alloc string - }{ - {"2a02:2788:0000::/48"}, - {"2a02:2788:1000::/48"}, - {"2a02:2788:0100::/48"}, - } { - _, a, err := net.ParseCIDR(tc.Alloc) - require.NoError(t, err) - alloc := &allocation{Allocation: a} - subnet, err := allocate(alloc) - require.NoError(t, err) - fmt.Println(subnet.String()) - } -} diff --git a/tools/tfuser/cmds_provivion_test.go b/tools/tfuser/cmds_provivion_test.go new file mode 100644 index 000000000..3e22ec29e --- /dev/null +++ b/tools/tfuser/cmds_provivion_test.go @@ -0,0 +1,68 @@ +package main + +import ( + "encoding/json" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/threefoldtech/zos/pkg/identity" + "github.com/threefoldtech/zos/pkg/provision" +) + +func TestReservationSignature(t *testing.T) { + schema := []byte(`{ + "id": "", + "user_id": "Fuy6CatZGmmtWby13MESN2dYZUJi1ThjPR493w9TyWz1", + "type": "container", + "data": { + "flist": "https://hub.grid.tf/zaibon/zaibon-ubuntu-ssh-0.0.2.flist", + "flist_storage": "", + "env": {}, + "entrypoint": "/sbin/my_init", + "interactive": false, + "mounts": [], + "network": { + "network_id": "zaibon_dev_network", + "ips": [ + "172.22.4.20" + ] + } + }, + "created": "2019-10-10T10:18:46.704065596+02:00", + "duration": 300000000000, + "signature": "nJMRSZw7whyu6eTQ0sSEvS+RiHRRvk238MzsKeI6QFYQF4UGlivptcn9YN9uDm7C11a6GEJzli9Gr7YXE4dQDA==", + "to_delete": false + }`) + keypair, err := identity.GenerateKeyPair() + require.NoError(t, err) + + r := &provision.Reservation{} + err = json.Unmarshal(schema, r) + require.NoError(t, err) + + r.Duration = time.Second * 10 + r.Created = time.Now() + r.User = keypair.Identity() + + err = r.Sign(keypair.PrivateKey) + require.NoError(t, err) + + err = provision.Verify(r) + assert.NoError(t, err) + + r = &provision.Reservation{} + err = json.Unmarshal(schema, r) + require.NoError(t, err) + + r.Duration = time.Second * 10 + r.Created = time.Now() + r.User = keypair.Identity() + + err = r.Sign(keypair.PrivateKey) + require.NoError(t, err) + + err = provision.Verify(r) + assert.NoError(t, err) +}