Skip to content

Commit

Permalink
Merge pull request #937 from threefoldtech/development_proxy_indexer_…
Browse files Browse the repository at this point in the history
…has_ipv6

add new indexer work to check node having ipv6
  • Loading branch information
Omarabdul3ziz committed Apr 22, 2024
2 parents db59524 + 12acb74 commit a7c36fa
Show file tree
Hide file tree
Showing 13 changed files with 173 additions and 1 deletion.
13 changes: 13 additions & 0 deletions grid-proxy/cmds/proxy_server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ type flags struct {
dmiIndexerIntervalMins uint
speedIndexerNumWorkers uint
speedIndexerIntervalMins uint
ipv6IndexerNumWorkers uint
ipv6IndexerIntervalMins uint
}

func main() {
Expand Down Expand Up @@ -99,6 +101,8 @@ func main() {
flag.UintVar(&f.dmiIndexerNumWorkers, "dmi-indexer-workers", 1, "number of workers checking on node dmi")
flag.UintVar(&f.speedIndexerIntervalMins, "speed-indexer-interval", 5, "node speed check interval in min")
flag.UintVar(&f.speedIndexerNumWorkers, "speed-indexer-workers", 100, "number of workers checking on node speed")
flag.UintVar(&f.ipv6IndexerIntervalMins, "ipv6-indexer-interval", 1, "node ipv6 check interval in min")
flag.UintVar(&f.ipv6IndexerNumWorkers, "ipv6-indexer-workers", 10, "number of workers checking on node having ipv6")
flag.Parse()

// shows version and exit
Expand Down Expand Up @@ -191,6 +195,15 @@ func startIndexers(ctx context.Context, f flags, db db.Database, rpcRmbClient *p
f.speedIndexerNumWorkers,
)
speedIdx.Start(ctx)

ipv6Idx := indexer.NewIndexer[types.HasIpv6](
indexer.NewIpv6Work(f.ipv6IndexerIntervalMins),
"IPV6",
db,
rpcRmbClient,
f.ipv6IndexerNumWorkers,
)
ipv6Idx.Start(ctx)
}

func app(s *http.Server, f flags) error {
Expand Down
8 changes: 8 additions & 0 deletions grid-proxy/internal/explorer/db/indexer_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,11 @@ func (p *PostgresDatabase) UpsertNetworkSpeed(ctx context.Context, speeds []type
}
return p.gormDB.WithContext(ctx).Table("speed").Clauses(conflictClause).Create(&speeds).Error
}

func (p *PostgresDatabase) UpsertNodeIpv6Report(ctx context.Context, ips []types.HasIpv6) error {
onConflictClause := clause.OnConflict{
Columns: []clause.Column{{Name: "node_twin_id"}},
DoUpdates: clause.AssignmentColumns([]string{"has_ipv6"}),
}
return p.gormDB.WithContext(ctx).Table("node_ipv6").Clauses(onConflictClause).Create(&ips).Error
}
6 changes: 6 additions & 0 deletions grid-proxy/internal/explorer/db/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (d *PostgresDatabase) Initialize() error {
&types.HealthReport{},
&types.Dmi{},
&types.Speed{},
&types.HasIpv6{},
)
if err != nil {
return errors.Wrap(err, "failed to migrate indexer tables")
Expand Down Expand Up @@ -290,6 +291,7 @@ func (d *PostgresDatabase) nodeTableQuery(ctx context.Context, filter types.Node
"resources_cache.node_contracts_count",
"resources_cache.node_gpu_count AS num_gpu",
"health_report.healthy",
"node_ipv6.has_ipv6",
"resources_cache.bios",
"resources_cache.baseboard",
"resources_cache.memory",
Expand All @@ -305,6 +307,7 @@ func (d *PostgresDatabase) nodeTableQuery(ctx context.Context, filter types.Node
LEFT JOIN farm ON node.farm_id = farm.farm_id
LEFT JOIN location ON node.location_id = location.id
LEFT JOIN health_report ON node.twin_id = health_report.node_twin_id
LEFT JOIN node_ipv6 ON node.twin_id = node_ipv6.node_twin_id
`)

if filter.HasGPU != nil || filter.GpuDeviceName != nil ||
Expand Down Expand Up @@ -531,6 +534,9 @@ func (d *PostgresDatabase) GetNodes(ctx context.Context, filter types.NodeFilter
if filter.Healthy != nil {
q = q.Where("health_report.healthy = ? ", *filter.Healthy)
}
if filter.HasIpv6 != nil {
q = q.Where("COALESCE(node_ipv6.has_ipv6, false) = ? ", *filter.HasIpv6)
}
if filter.FreeMRU != nil {
q = q.Where("resources_cache.free_mru >= ?", *filter.FreeMRU)
}
Expand Down
1 change: 1 addition & 0 deletions grid-proxy/internal/explorer/db/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Database interface {
UpsertNodeHealth(ctx context.Context, healthReports []types.HealthReport) error
UpsertNodeDmi(ctx context.Context, dmis []types.Dmi) error
UpsertNetworkSpeed(ctx context.Context, speeds []types.Speed) error
UpsertNodeIpv6Report(ctx context.Context, ips []types.HasIpv6) error
}

type ContractBilling types.ContractBilling
Expand Down
48 changes: 48 additions & 0 deletions grid-proxy/internal/indexer/ipv6.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package indexer

import (
"context"
"time"

"github.com/threefoldtech/tfgrid-sdk-go/grid-proxy/internal/explorer/db"
"github.com/threefoldtech/tfgrid-sdk-go/grid-proxy/pkg/types"
"github.com/threefoldtech/tfgrid-sdk-go/rmb-sdk-go/peer"
)

const cmd = "zos.network.has_ipv6"

var _ Work[types.HasIpv6] = (*Ipv6Work)(nil)

type Ipv6Work struct {
finders map[string]time.Duration
}

func NewIpv6Work(interval uint) *Ipv6Work {
return &Ipv6Work{
finders: map[string]time.Duration{
"up": time.Duration(interval) * time.Minute,
},
}
}

func (w *Ipv6Work) Finders() map[string]time.Duration {
return w.finders
}

func (w *Ipv6Work) Get(ctx context.Context, rmb *peer.RpcClient, id uint32) ([]types.HasIpv6, error) {
var has_ipv6 bool
if err := callNode(ctx, rmb, cmd, nil, id, has_ipv6); err != nil {
return []types.HasIpv6{}, nil
}

return []types.HasIpv6{
{
NodeTwinId: id,
HasIpv6: has_ipv6,
},
}, nil
}

func (w *Ipv6Work) Upsert(ctx context.Context, db db.Database, batch []types.HasIpv6) error {
return db.UpsertNodeIpv6Report(ctx, batch)
}
11 changes: 11 additions & 0 deletions grid-proxy/pkg/types/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@ func (HealthReport) TableName() string {
return "health_report"
}

// HasIpv6 holds the state of node having ipv6
// used as gorm model
type HasIpv6 struct {
NodeTwinId uint32 `gorm:"unique;not null"`
HasIpv6 bool
}

func (HasIpv6) TableName() string {
return "node_ipv6"
}

// Speed holds upload/download speeds in `bit/sec` for a node
// used as both gorm model and server json response
type Speed struct {
Expand Down
1 change: 1 addition & 0 deletions grid-proxy/pkg/types/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,5 @@ type NodeFilter struct {
PriceMin *float64 `schema:"price_min,omitempty"`
PriceMax *float64 `schema:"price_max,omitempty"`
Excluded []uint64 `schema:"excluded,omitempty"`
HasIpv6 *bool `schema:"has_ipv6,omitempty"`
}
29 changes: 29 additions & 0 deletions grid-proxy/tests/queries/mock_client/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type DBData struct {
Regions map[string]string
Locations map[string]Location
HealthReports map[uint32]bool
NodeIpv6 map[uint32]bool
DMIs map[uint32]types.Dmi
Speeds map[uint32]types.Speed
PricingPolicies map[uint]PricingPolicy
Expand Down Expand Up @@ -568,6 +569,30 @@ func loadHealthReports(db *sql.DB, data *DBData) error {
return nil
}

func loadNodeIpv6(db *sql.DB, data *DBData) error {
rows, err := db.Query(`
SELECT
COALESCE(node_twin_id, 0),
COALESCE(has_ipv6, false)
FROM
node_ipv6;`)
if err != nil {
return err
}
for rows.Next() {
var node types.HasIpv6
if err := rows.Scan(
&node.NodeTwinId,
&node.HasIpv6,
); err != nil {
return err
}
data.NodeIpv6[node.NodeTwinId] = node.HasIpv6
}

return nil
}

func loadDMIs(db *sql.DB, gormDB *gorm.DB, data *DBData) error {
var dmis []types.Dmi
err := gormDB.Table("dmi").Scan(&dmis).Error
Expand Down Expand Up @@ -692,6 +717,7 @@ func Load(db *sql.DB, gormDB *gorm.DB) (DBData, error) {
HealthReports: make(map[uint32]bool),
DMIs: make(map[uint32]types.Dmi),
Speeds: make(map[uint32]types.Speed),
NodeIpv6: make(map[uint32]bool),
PricingPolicies: make(map[uint]PricingPolicy),
DB: db,
}
Expand Down Expand Up @@ -740,6 +766,9 @@ func Load(db *sql.DB, gormDB *gorm.DB) (DBData, error) {
if err := loadHealthReports(db, &data); err != nil {
return data, err
}
if err := loadNodeIpv6(db, &data); err != nil {
return data, err
}
if err := loadDMIs(db, gormDB, &data); err != nil {
return data, err
}
Expand Down
4 changes: 4 additions & 0 deletions grid-proxy/tests/queries/mock_client/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,10 @@ func (n *Node) satisfies(f types.NodeFilter, data *DBData) bool {
return false
}

if f.HasIpv6 != nil && *f.HasIpv6 != data.NodeIpv6[uint32(n.TwinID)] {
return false
}

if f.FreeSRU != nil && int64(*f.FreeSRU) > int64(free.SRU) {
return false
}
Expand Down
7 changes: 7 additions & 0 deletions grid-proxy/tests/queries/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ var nodeFilterRandomValueGenerator = map[string]func(agg NodesAggregate) interfa
}
return &v
},
"HasIpv6": func(_ NodesAggregate) interface{} {
v := true
if flip(.5) {
v = false
}
return &v
},
"FreeMRU": func(agg NodesAggregate) interface{} {
if flip(.1) {
return &agg.freeMRUs[rand.Intn(len(agg.freeMRUs))]
Expand Down
27 changes: 27 additions & 0 deletions grid-proxy/tools/db/crafter/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -937,3 +937,30 @@ func (c *Crafter) GeneratePricingPolicies() error {

return err
}

func (c *Crafter) GenerateNodeIpv6() error {
start := c.NodeStart
end := c.NodeStart + c.NodeCount
nodeTwinsStart := c.TwinStart + (c.FarmStart + c.FarmCount)

var reports []types.HasIpv6
for i := start; i < end; i++ {
has_ipv6 := true
if flip(.5) {
has_ipv6 = false
}

report := types.HasIpv6{
NodeTwinId: uint32(nodeTwinsStart + i),
HasIpv6: has_ipv6,
}
reports = append(reports, report)
}

if err := c.gormDB.Create(reports).Error; err != nil {
return fmt.Errorf("failed to insert node has ipv6 reports: %w", err)
}
fmt.Println("node has ipv6 reports generated")

return nil
}
6 changes: 5 additions & 1 deletion grid-proxy/tools/db/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,11 @@ func generateData(db *sql.DB, gormDB *gorm.DB, seed int) error {
}

if err := generator.GenerateHealthReports(); err != nil {
return fmt.Errorf("failed to generate dmi reports: %w", err)
return fmt.Errorf("failed to generate health reports: %w", err)
}

if err := generator.GenerateNodeIpv6(); err != nil {
return fmt.Errorf("failed to generate node ipv6 reports: %w", err)
}

if err := generator.GeneratePricingPolicies(); err != nil {
Expand Down
13 changes: 13 additions & 0 deletions grid-proxy/tools/db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1080,3 +1080,16 @@ CREATE TABLE public.speed(

ALTER TABLE public.speed
OWNER TO postgres;

--
-- Name: node_ipv6; Type: TABLE; Schema: public; Owner: postgres
--

CREATE TABLE IF NOT EXISTS public.node_ipv6 (
node_twin_id bigint NOT NULL,
has_ipv6 boolean
);

ALTER TABLE public.node_ipv6
OWNER TO postgres;

0 comments on commit a7c36fa

Please sign in to comment.