Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add new indexer work to check node having ipv6 #937

Merged
merged 4 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions grid-proxy/cmds/proxy_server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ type flags struct {
dmiIndexerIntervalMins uint
speedIndexerNumWorkers uint
speedIndexerIntervalMins uint
ipv6IndexerNumWorkers uint
ipv6IndexerIntervalMins uint
}

func main() {
Expand Down Expand Up @@ -99,6 +101,8 @@ func main() {
flag.UintVar(&f.dmiIndexerNumWorkers, "dmi-indexer-workers", 1, "number of workers checking on node dmi")
flag.UintVar(&f.speedIndexerIntervalMins, "speed-indexer-interval", 5, "node speed check interval in min")
flag.UintVar(&f.speedIndexerNumWorkers, "speed-indexer-workers", 100, "number of workers checking on node speed")
flag.UintVar(&f.ipv6IndexerIntervalMins, "ipv6-indexer-interval", 1, "node ipv6 check interval in min")
flag.UintVar(&f.ipv6IndexerNumWorkers, "ipv6-indexer-workers", 10, "number of workers checking on node having ipv6")
flag.Parse()

// shows version and exit
Expand Down Expand Up @@ -191,6 +195,15 @@ func startIndexers(ctx context.Context, f flags, db db.Database, rpcRmbClient *p
f.speedIndexerNumWorkers,
)
speedIdx.Start(ctx)

ipv6Idx := indexer.NewIndexer[types.HasIpv6](
indexer.NewIpv6Work(f.ipv6IndexerIntervalMins),
"IPV6",
db,
rpcRmbClient,
f.ipv6IndexerNumWorkers,
)
ipv6Idx.Start(ctx)
}

func app(s *http.Server, f flags) error {
Expand Down
8 changes: 8 additions & 0 deletions grid-proxy/internal/explorer/db/indexer_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,11 @@ func (p *PostgresDatabase) UpsertNetworkSpeed(ctx context.Context, speeds []type
}
return p.gormDB.WithContext(ctx).Table("speed").Clauses(conflictClause).Create(&speeds).Error
}

func (p *PostgresDatabase) UpsertNodeIpv6Report(ctx context.Context, ips []types.HasIpv6) error {
onConflictClause := clause.OnConflict{
Columns: []clause.Column{{Name: "node_twin_id"}},
DoUpdates: clause.AssignmentColumns([]string{"has_ipv6"}),
}
return p.gormDB.WithContext(ctx).Table("node_ipv6").Clauses(onConflictClause).Create(&ips).Error
}
6 changes: 6 additions & 0 deletions grid-proxy/internal/explorer/db/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (d *PostgresDatabase) Initialize() error {
&types.HealthReport{},
&types.Dmi{},
&types.Speed{},
&types.HasIpv6{},
)
if err != nil {
return errors.Wrap(err, "failed to migrate indexer tables")
Expand Down Expand Up @@ -290,6 +291,7 @@ func (d *PostgresDatabase) nodeTableQuery(ctx context.Context, filter types.Node
"resources_cache.node_contracts_count",
"resources_cache.node_gpu_count AS num_gpu",
"health_report.healthy",
"node_ipv6.has_ipv6",
"resources_cache.bios",
"resources_cache.baseboard",
"resources_cache.memory",
Expand All @@ -305,6 +307,7 @@ func (d *PostgresDatabase) nodeTableQuery(ctx context.Context, filter types.Node
LEFT JOIN farm ON node.farm_id = farm.farm_id
LEFT JOIN location ON node.location_id = location.id
LEFT JOIN health_report ON node.twin_id = health_report.node_twin_id
LEFT JOIN node_ipv6 ON node.twin_id = node_ipv6.node_twin_id
`)

if filter.HasGPU != nil || filter.GpuDeviceName != nil ||
Expand Down Expand Up @@ -531,6 +534,9 @@ func (d *PostgresDatabase) GetNodes(ctx context.Context, filter types.NodeFilter
if filter.Healthy != nil {
q = q.Where("health_report.healthy = ? ", *filter.Healthy)
}
if filter.HasIpv6 != nil {
q = q.Where("COALESCE(node_ipv6.has_ipv6, false) = ? ", *filter.HasIpv6)
}
if filter.FreeMRU != nil {
q = q.Where("resources_cache.free_mru >= ?", *filter.FreeMRU)
}
Expand Down
1 change: 1 addition & 0 deletions grid-proxy/internal/explorer/db/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Database interface {
UpsertNodeHealth(ctx context.Context, healthReports []types.HealthReport) error
UpsertNodeDmi(ctx context.Context, dmis []types.Dmi) error
UpsertNetworkSpeed(ctx context.Context, speeds []types.Speed) error
UpsertNodeIpv6Report(ctx context.Context, ips []types.HasIpv6) error
}

type ContractBilling types.ContractBilling
Expand Down
48 changes: 48 additions & 0 deletions grid-proxy/internal/indexer/ipv6.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package indexer

import (
"context"
"time"

"github.com/threefoldtech/tfgrid-sdk-go/grid-proxy/internal/explorer/db"
"github.com/threefoldtech/tfgrid-sdk-go/grid-proxy/pkg/types"
"github.com/threefoldtech/tfgrid-sdk-go/rmb-sdk-go/peer"
)

const cmd = "zos.network.has_ipv6"

var _ Work[types.HasIpv6] = (*Ipv6Work)(nil)

type Ipv6Work struct {
finders map[string]time.Duration
}

func NewIpv6Work(interval uint) *Ipv6Work {
return &Ipv6Work{
finders: map[string]time.Duration{
"up": time.Duration(interval) * time.Minute,
},
}
}

func (w *Ipv6Work) Finders() map[string]time.Duration {
return w.finders
}

func (w *Ipv6Work) Get(ctx context.Context, rmb *peer.RpcClient, id uint32) ([]types.HasIpv6, error) {
var has_ipv6 bool
if err := callNode(ctx, rmb, cmd, nil, id, has_ipv6); err != nil {
return []types.HasIpv6{}, nil
}

return []types.HasIpv6{
{
NodeTwinId: id,
HasIpv6: has_ipv6,
},
}, nil
}

func (w *Ipv6Work) Upsert(ctx context.Context, db db.Database, batch []types.HasIpv6) error {
return db.UpsertNodeIpv6Report(ctx, batch)
}
11 changes: 11 additions & 0 deletions grid-proxy/pkg/types/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@ func (HealthReport) TableName() string {
return "health_report"
}

// HasIpv6 holds the state of node having ipv6
// used as gorm model
type HasIpv6 struct {
NodeTwinId uint32 `gorm:"unique;not null"`
HasIpv6 bool
}

func (HasIpv6) TableName() string {
return "node_ipv6"
}

// Speed holds upload/download speeds in `bit/sec` for a node
// used as both gorm model and server json response
type Speed struct {
Expand Down
1 change: 1 addition & 0 deletions grid-proxy/pkg/types/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,5 @@ type NodeFilter struct {
PriceMin *float64 `schema:"price_min,omitempty"`
PriceMax *float64 `schema:"price_max,omitempty"`
Excluded []uint64 `schema:"excluded,omitempty"`
HasIpv6 *bool `schema:"has_ipv6,omitempty"`
}
29 changes: 29 additions & 0 deletions grid-proxy/tests/queries/mock_client/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type DBData struct {
Regions map[string]string
Locations map[string]Location
HealthReports map[uint32]bool
NodeIpv6 map[uint32]bool
DMIs map[uint32]types.Dmi
Speeds map[uint32]types.Speed
PricingPolicies map[uint]PricingPolicy
Expand Down Expand Up @@ -568,6 +569,30 @@ func loadHealthReports(db *sql.DB, data *DBData) error {
return nil
}

func loadNodeIpv6(db *sql.DB, data *DBData) error {
rows, err := db.Query(`
SELECT
COALESCE(node_twin_id, 0),
COALESCE(has_ipv6, false)
FROM
node_ipv6;`)
if err != nil {
return err
}
for rows.Next() {
var node types.HasIpv6
if err := rows.Scan(
&node.NodeTwinId,
&node.HasIpv6,
); err != nil {
return err
}
data.NodeIpv6[node.NodeTwinId] = node.HasIpv6
}

return nil
}

func loadDMIs(db *sql.DB, gormDB *gorm.DB, data *DBData) error {
var dmis []types.Dmi
err := gormDB.Table("dmi").Scan(&dmis).Error
Expand Down Expand Up @@ -692,6 +717,7 @@ func Load(db *sql.DB, gormDB *gorm.DB) (DBData, error) {
HealthReports: make(map[uint32]bool),
DMIs: make(map[uint32]types.Dmi),
Speeds: make(map[uint32]types.Speed),
NodeIpv6: make(map[uint32]bool),
PricingPolicies: make(map[uint]PricingPolicy),
DB: db,
}
Expand Down Expand Up @@ -740,6 +766,9 @@ func Load(db *sql.DB, gormDB *gorm.DB) (DBData, error) {
if err := loadHealthReports(db, &data); err != nil {
return data, err
}
if err := loadNodeIpv6(db, &data); err != nil {
return data, err
}
if err := loadDMIs(db, gormDB, &data); err != nil {
return data, err
}
Expand Down
4 changes: 4 additions & 0 deletions grid-proxy/tests/queries/mock_client/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,10 @@ func (n *Node) satisfies(f types.NodeFilter, data *DBData) bool {
return false
}

if f.HasIpv6 != nil && *f.HasIpv6 != data.NodeIpv6[uint32(n.TwinID)] {
return false
}

if f.FreeSRU != nil && int64(*f.FreeSRU) > int64(free.SRU) {
return false
}
Expand Down
7 changes: 7 additions & 0 deletions grid-proxy/tests/queries/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ var nodeFilterRandomValueGenerator = map[string]func(agg NodesAggregate) interfa
}
return &v
},
"HasIpv6": func(_ NodesAggregate) interface{} {
v := true
if flip(.5) {
v = false
}
return &v
},
"FreeMRU": func(agg NodesAggregate) interface{} {
if flip(.1) {
return &agg.freeMRUs[rand.Intn(len(agg.freeMRUs))]
Expand Down
27 changes: 27 additions & 0 deletions grid-proxy/tools/db/crafter/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -937,3 +937,30 @@ func (c *Crafter) GeneratePricingPolicies() error {

return err
}

func (c *Crafter) GenerateNodeIpv6() error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see this being used in generateData. It should be, right?

start := c.NodeStart
end := c.NodeStart + c.NodeCount
nodeTwinsStart := c.TwinStart + (c.FarmStart + c.FarmCount)

var reports []types.HasIpv6
for i := start; i < end; i++ {
has_ipv6 := true
if flip(.5) {
has_ipv6 = false
}

report := types.HasIpv6{
NodeTwinId: uint32(nodeTwinsStart + i),
HasIpv6: has_ipv6,
}
reports = append(reports, report)
}

if err := c.gormDB.Create(reports).Error; err != nil {
return fmt.Errorf("failed to insert node has ipv6 reports: %w", err)
}
fmt.Println("node has ipv6 reports generated")

return nil
}
6 changes: 5 additions & 1 deletion grid-proxy/tools/db/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,11 @@ func generateData(db *sql.DB, gormDB *gorm.DB, seed int) error {
}

if err := generator.GenerateHealthReports(); err != nil {
return fmt.Errorf("failed to generate dmi reports: %w", err)
return fmt.Errorf("failed to generate health reports: %w", err)
}

if err := generator.GenerateNodeIpv6(); err != nil {
return fmt.Errorf("failed to generate node ipv6 reports: %w", err)
}

if err := generator.GeneratePricingPolicies(); err != nil {
Expand Down
13 changes: 13 additions & 0 deletions grid-proxy/tools/db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1080,3 +1080,16 @@ CREATE TABLE public.speed(

ALTER TABLE public.speed
OWNER TO postgres;

--
-- Name: node_ipv6; Type: TABLE; Schema: public; Owner: postgres
--

CREATE TABLE IF NOT EXISTS public.node_ipv6 (
node_twin_id bigint NOT NULL,
has_ipv6 boolean
);

ALTER TABLE public.node_ipv6
OWNER TO postgres;

Loading