Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions cmds/capacityd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@ package main

import (
"flag"
"os"
"time"

"github.com/rs/zerolog"

"github.com/cenkalti/backoff/v3"

"github.com/pkg/errors"
"github.com/threefoldtech/zos/pkg/capacity"
"github.com/threefoldtech/zos/pkg/environment"
Expand Down Expand Up @@ -32,6 +37,8 @@ func main() {
version.ShowAndExit(false)
}

log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})

redis, err := zbus.NewRedisClient(msgBrokerCon)
if err != nil {
log.Fatal().Msgf("fail to connect to message broker server: %v", err)
Expand Down Expand Up @@ -68,13 +75,33 @@ func main() {
log.Fatal().Err(err).Msgf("failed to write resources capacity on BCDB")
}

for {
<-time.After(time.Minute * 10)
sendUptime := func() error {
uptime, err := r.Uptime()
if err != nil {
log.Error().Err(err).Msgf("failed to read uptime")
return err
}

log.Info().Msg("send heart-beat to BCDB")
if err := store.Ping(identity.NodeID()); err != nil {
if err := store.Ping(identity.NodeID(), uptime); err != nil {
log.Error().Err(err).Msgf("failed to send heart-beat to BCDB")
return err
}
return nil
}
if err := sendUptime(); err != nil {
log.Fatal().Err(err).Send()
}

for {
next := time.Now().Add(time.Minute * 10)

if time.Now().After(next) {
backoff.Retry(sendUptime, backoff.NewExponentialBackOff())
continue
}

time.Sleep(time.Minute)
}
}

Expand Down
16 changes: 13 additions & 3 deletions cmds/identityd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/pkg/errors"
"github.com/threefoldtech/zos/pkg/gedis"
"github.com/threefoldtech/zos/pkg/geoip"

"github.com/cenkalti/backoff/v3"
"github.com/threefoldtech/zos/pkg"
Expand Down Expand Up @@ -95,6 +96,10 @@ func main() {
version.ShowAndExit(false)
}

if err := os.MkdirAll(root, 0750); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

identityd failed to start on a clean disk, so this was needed

log.Fatal().Err(err).Str("root", root).Msg("failed to create root directory")
}

zinit, err := zinit.New(zinitSocket)
if err != nil {
log.Fatal().Err(err).Msg("failed to connect to zinit")
Expand Down Expand Up @@ -148,8 +153,13 @@ func main() {
log.Fatal().Err(err).Msg("failed to read farm ID")
}

loc, err := geoip.Fetch()
if err != nil {
log.Fatal().Err(err).Msg("fetch location")
}

register := func(v string) error {
return registerNode(nodeID, farmID, v, idStore)
return registerNode(nodeID, farmID, v, idStore, loc)
}

if err := backoff.Retry(func() error {
Expand Down Expand Up @@ -271,10 +281,10 @@ func bcdbClient() (identity.IDStore, error) {
return store, nil
}

func registerNode(nodeID, farmID pkg.Identifier, version string, store identity.IDStore) error {
func registerNode(nodeID, farmID pkg.Identifier, version string, store identity.IDStore, loc geoip.Location) error {
log.Info().Str("version", version).Msg("start registration of the node")

_, err := store.RegisterNode(nodeID, farmID, version)
_, err := store.RegisterNode(nodeID, farmID, version, loc)
if err != nil {
return err
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/capacity/capacity.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package capacity

import (
"github.com/shirou/gopsutil/host"
"github.com/threefoldtech/zos/pkg"
"github.com/threefoldtech/zos/pkg/capacity/dmi"
)
Expand Down Expand Up @@ -51,3 +52,12 @@ func (r *ResourceOracle) Total() (c *Capacity, err error) {
func (r *ResourceOracle) DMI() (*dmi.DMI, error) {
return dmi.Decode()
}

// Uptime returns the uptime of the node
func (r *ResourceOracle) Uptime() (uint64, error) {
info, err := host.Info()
if err != nil {
return 0, err
}
return info.Uptime, nil
}
29 changes: 26 additions & 3 deletions pkg/capacity/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
// Store is an interface to the bcdb store to report capacity
type Store interface {
Register(nodeID pkg.Identifier, c Capacity, d dmi.DMI) error
Ping(nodeID pkg.Identifier) error
Ping(nodeID pkg.Identifier, uptime uint64) error
}

// BCDBStore implements the store interface using a gedis client to BCDB
Expand All @@ -34,7 +34,9 @@ func (s *BCDBStore) Register(nodeID pkg.Identifier, c Capacity, d dmi.DMI) error
}

// Ping sends an heart-beat to BCDB
func (s *BCDBStore) Ping(nodeID pkg.Identifier) error { return nil }
func (s *BCDBStore) Ping(nodeID pkg.Identifier, uptime uint64) error {
return s.g.UptimeUpdate(nodeID, uptime)
}

// HTTPStore implement the method to push capacity information to BCDB over HTTP
type HTTPStore struct {
Expand Down Expand Up @@ -74,4 +76,25 @@ func (s *HTTPStore) Register(nodeID pkg.Identifier, c Capacity, d dmi.DMI) error
}

// Ping sends an heart-beat to BCDB
func (s *HTTPStore) Ping(nodeID pkg.Identifier) error { return nil }
func (s *HTTPStore) Ping(nodeID pkg.Identifier, uptime uint64) error {
x := struct {
Uptime uint64 `json:"uptime"`
}{uptime}

buf := bytes.Buffer{}
err := json.NewEncoder(&buf).Encode(x)
if err != nil {
return err
}

url := fmt.Sprintf(s.baseURL+"/nodes/%s/uptime", nodeID.Identity())
resp, err := http.Post(url, "application/json", &buf)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("wrong response status code received: %v", resp.Status)
}

return nil
}
15 changes: 11 additions & 4 deletions pkg/environment/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,9 @@ func getEnvironmentFromParams(params kernel.Params) Environment {
var env Environment

runmode, found := params.Get("runmode")
if !found {
if !found || len(runmode) < 1 {
// Fallback to default production mode
runmode = make([]string, 1)
runmode[0] = string(RunningMain)
runmode = []string{string(RunningMain)}
}

switch RunningMode(runmode[0]) {
Expand All @@ -91,8 +90,16 @@ func getEnvironmentFromParams(params kernel.Params) Environment {
env = envProd
}

if RunningMode(runmode[0]) == RunningDev {
//allow override of the bcdb url in dev mode
bcdb, found := params.Get("bcdb")
if found && len(bcdb) >= 1 {
env.BcdbURL = bcdb[0]
}
}

farmerID, found := params.Get("farmer_id")
if !found || farmerID[0] == "" {
if !found || len(farmerID) < 1 || farmerID[0] == "" {
// fmt.Println("Warning: no valid farmer_id found in kernel parameter, fallback to orphanage")
env.Orphan = true

Expand Down
16 changes: 16 additions & 0 deletions pkg/gedis/commands_capacity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package gedis

import (
"github.com/threefoldtech/zos/pkg"
)

//UptimeUpdate send the uptime of the node to BCDB
func (g *Gedis) UptimeUpdate(nodeID pkg.Identifier, uptime uint64) error {

_, err := g.Send("nodes", "uptime_update", Args{
"node_id": nodeID.Identity(),
"uptime": uptime,
})

return err
}
32 changes: 9 additions & 23 deletions pkg/gedis/commands_identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,21 @@ import (

"github.com/jbenet/go-base58"

"github.com/rs/zerolog/log"
"github.com/threefoldtech/zos/pkg/geoip"

"github.com/pkg/errors"
"github.com/threefoldtech/zos/pkg/gedis/types/directory"
"github.com/threefoldtech/zos/pkg/network"
"github.com/threefoldtech/zos/pkg/network/types"

"github.com/threefoldtech/zos/pkg"
"github.com/threefoldtech/zos/pkg/geoip"
)

//
// IDStore Interface
//

//RegisterNode implements pkg.IdentityManager interface
func (g *Gedis) RegisterNode(nodeID, farmID pkg.Identifier, version string) (string, error) {

l, err := geoip.Fetch()
if err != nil {
log.Error().Err(err).Msg("failed to get location of the node")
}
func (g *Gedis) RegisterNode(nodeID, farmID pkg.Identifier, version string, location geoip.Location) (string, error) {

pk := base58.Decode(nodeID.Identity())

Expand All @@ -40,11 +33,11 @@ func (g *Gedis) RegisterNode(nodeID, farmID pkg.Identifier, version string) (str
OsVersion: version,
PublicKeyHex: hex.EncodeToString(pk),
Location: directory.TfgridLocation1{
Longitude: l.Longitute,
Latitude: l.Latitude,
Continent: l.Continent,
Country: l.Country,
City: l.City,
Longitude: location.Longitute,
Latitude: location.Latitude,
Continent: location.Continent,
Country: location.Country,
City: location.City,
},
},
}))
Expand Down Expand Up @@ -165,9 +158,7 @@ func nodeFromSchema(node directory.TfgridNode2) types.Node {
}(),
PublicConfig: func() *types.PubIface {
cfg := node.PublicConfig
// This is a dirty hack because jsx schema cannot
// differentiate between an embed object not set or with default value
if cfg.Master == "" {
if cfg == nil {
return nil
}
pub := types.PubIface{
Expand All @@ -182,12 +173,7 @@ func nodeFromSchema(node directory.TfgridNode2) types.Node {

return &pub
}(),
ExitNode: func() int {
if node.ExitNode {
return 1
}
return 0
}(),
WGPorts: node.WGPorts,
}
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/gedis/commands_identity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func TestRegisterNode(t *testing.T) {
pkg.StrIdentifier("node-1"),
pkg.StrIdentifier("farm-1"),
"v1.1.0",
l,
)

require.NoError(err)
Expand Down Expand Up @@ -155,7 +156,7 @@ func TestGetFarm(t *testing.T) {

conn.On("Do", "default.farms.get", mustMarshal(t, args)).
Return(mustMarshal(t, directory.TfgridFarm1{
ID: 100,
ID: "100",
Name: "farm-1",
}), nil)

Expand Down Expand Up @@ -184,8 +185,8 @@ func TestListFarm(t *testing.T) {

conn.On("Do", "default.farms.list", mustMarshal(t, args)).
Return(mustMarshal(t, Args{"farms": []directory.TfgridFarm1{
{ID: 1, Name: "farm-1"},
{ID: 2, Name: "farm-2"},
{ID: "1", Name: "farm-1"},
{ID: "2", Name: "farm-2"},
}}), nil)

nodes, err := gedis.ListFarm("eg", "cairo")
Expand Down
7 changes: 1 addition & 6 deletions pkg/gedis/types/directory/tfgrid_farm_1.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
package directory

import (
schema "github.com/threefoldtech/zos/pkg/schema"
)

//TfgridFarm1 jsx schema
type TfgridFarm1 struct {
ID int64 `json:"id,omitempty"`
ID string `json:"id"`
ThreebotID string `json:"threebot_id"`
IyoOrganization string `json:"iyo_organization"`
Name string `json:"name"`
WalletAddresses []string `json:"wallet_addresses"`
Location TfgridLocation1 `json:"location"`
Email string `json:"email"`
ResourcePrices []TfgridNodeResourcePrice1 `json:"resource_prices"`
PrefixZero schema.IPRange `json:"prefix_zero"`
}

//TfgridNodeResourcePrice1 jsx schema
Expand Down
7 changes: 4 additions & 3 deletions pkg/gedis/types/directory/tfgrid_node_2.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package directory

import (
schema "github.com/threefoldtech/zos/pkg/schema"
"net"

schema "github.com/threefoldtech/zos/pkg/schema"
)

//TfgridNode2 jsx schema
Expand All @@ -20,8 +21,8 @@ type TfgridNode2 struct {
ReservedResources TfgridNodeResourceAmount1 `json:"reserved_resources"`
Proofs []TfgridNodeProof1 `json:"proofs"`
Ifaces []TfgridNodeIface1 `json:"ifaces"`
PublicConfig TfgridNodePublicIface1 `json:"public_config"`
ExitNode bool `json:"exit_node"`
PublicConfig *TfgridNodePublicIface1 `json:"public_config,omitemtpy"`
WGPorts []uint `json:"wg_ports"`
Approved bool `json:"approved"`
PublicKeyHex string `json:"public_key_hex"`
}
Expand Down
Loading