Skip to content

Commit

Permalink
add redis database backend
Browse files Browse the repository at this point in the history
Signed-off-by: Seán C McCord <ulexus@gmail.com>
  • Loading branch information
Ulexus committed Aug 8, 2021
1 parent 924fed4 commit d782452
Show file tree
Hide file tree
Showing 7 changed files with 420 additions and 87 deletions.
121 changes: 82 additions & 39 deletions db/db.go
Original file line number Diff line number Diff line change
@@ -1,112 +1,155 @@
package db

import (
"context"
"fmt"
"sync"
"time"

"github.com/talos-systems/wglan-manager/types"
"go.uber.org/zap"
)

// AddressExpirationTimeout is the amount of time after which addresses of a node should be expired.
const AddressExpirationTimeout = 10 * time.Minute

type DB interface {
// Add adds a set of known Endpoints to a node, creating the node, if it does not exist.
Add(cluster string, n *types.Node) error
// Add adds a set of known Endpoints to a node, creating the node, if it does not exist.
Add(ctx context.Context, cluster string, n *types.Node) error

// AddAddresses adds a set of addresses for a node.
AddAddresses(cluster string, id string, ep ...*types.Address) error
AddAddresses(ctx context.Context, cluster string, id string, ep ...*types.Address) error

// Clean executes a database cleanup routine.
Clean()

// Get returns the details of the node.
Get(cluster string, id string) (*types.Node,error)
// Get returns the details of the node.
Get(ctx context.Context, cluster string, id string) (*types.Node, error)

// List returns the set of Nodes for the given Cluster.
List(cluster string) ([]*types.Node,error)
List(ctx context.Context, cluster string) ([]*types.Node, error)
}

type ramDB struct {
db map[string]map[string]*types.Node
mu sync.RWMutex
logger *zap.Logger
db map[string]map[string]*types.Node
mu sync.RWMutex
}

// New returns a new database.
func New() DB {
return &ramDB{
db: make(map[string]map[string]*types.Node),
}
func New(logger *zap.Logger) DB {
return &ramDB{
logger: logger,
db: make(map[string]map[string]*types.Node),
}
}

// Add implements DB
func (d *ramDB) Add(cluster string, n *types.Node) error {
d.mu.Lock()
defer d.mu.Unlock()
func (d *ramDB) Add(ctx context.Context, cluster string, n *types.Node) error {
d.mu.Lock()
defer d.mu.Unlock()

c, ok := d.db[cluster]
if !ok {
c = make(map[string]*types.Node)
d.db[cluster] = c
}

if existing, ok := c[n.ID]; ok {
if existing, ok := c[n.ID]; ok {
existing.AddAddresses(n.Addresses...)

return nil
}

c[n.ID] = n
c[n.ID] = n

return nil
return nil
}

func (d *ramDB) AddAddresses(cluster string, id string, addresses ...*types.Address) error {
d.mu.Lock()
defer d.mu.Unlock()
func (d *ramDB) AddAddresses(ctx context.Context, cluster string, id string, addresses ...*types.Address) error {
d.mu.Lock()
defer d.mu.Unlock()

c, ok := d.db[cluster]
if !ok {
return fmt.Errorf("cluster does not exist")
}

n, ok := c[id]
if !ok {
n, ok := c[id]
if !ok {
return fmt.Errorf("node does not exist")
}
}

n.AddAddresses(addresses...)

return nil
}

func (d *ramDB) List(cluster string) (list []*types.Node, err error) {
func (d *ramDB) List(ctx context.Context, cluster string) (list []*types.Node, err error) {
c, ok := d.db[cluster]
if !ok {
return nil, fmt.Errorf("cluster %q not found", cluster)
}

var i int

list = make([]*types.Node, len(c))

for _, n := range c {
list[i] = n
n.ExpireAddressesOlderThan(AddressExpirationTimeout)

i++
if len(n.Addresses) > 0 {
list = append(list, n)
}
}

return list, nil
}

// Get implements DB
func (d *ramDB) Get(cluster string, id string) (*types.Node,error) {
d.mu.RLock()
defer d.mu.Unlock()
func (d *ramDB) Get(ctx context.Context, cluster string, id string) (*types.Node, error) {
d.mu.RLock()
defer d.mu.Unlock()

c, ok := d.db[cluster]
if !ok {
return nil, fmt.Errorf("cluster %q not found", cluster)
}

n, ok := c[id]
if !ok {
return nil, fmt.Errorf("node %q in cluster %q not found", id, cluster)
}
n, ok := c[id]
if !ok {
return nil, fmt.Errorf("node %q in cluster %q not found", id, cluster)
}

return n, nil
}

// Clean runs the database cleanup routine.
func (d *ramDB) Clean() {
d.mu.Lock()
defer d.mu.Unlock()

var clusterDeleteList []string

for clusterID, c := range d.db {
var nodeDeleteList []string

for id, n := range c {

n.ExpireAddressesOlderThan(AddressExpirationTimeout)

return n, nil
if len(n.Addresses) < 1 {
nodeDeleteList = append(nodeDeleteList, id)
}
}

for _, id := range nodeDeleteList {
c[id] = nil
delete(c, id)
}

if len(c) < 0 {
clusterDeleteList = append(clusterDeleteList, clusterID)
}
}

for _, id := range clusterDeleteList {
delete(d.db, id)
}
}
162 changes: 162 additions & 0 deletions db/redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package db

import (
"context"
"errors"
"fmt"
"time"

"github.com/go-redis/redis/v8"
"github.com/talos-systems/wglan-manager/types"
"go.uber.org/zap"
)

const redisTTL = 12 * time.Minute

type redisDB struct {
logger *zap.Logger

rc *redis.Client
}

func NewRedis(addr string, logger *zap.Logger) (DB, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10 * time.Second)
defer cancel()

rc := redis.NewClient(&redis.Options{
Addr: addr,
})

if err := rc.Ping(ctx).Err(); err != nil {
return nil, fmt.Errorf("failed to connect to redis: %w", err)
}

return &redisDB{
rc: rc,
logger: logger,
}, nil
}

func (d *redisDB) clusterNodesKey(cluster string) string {
return fmt.Sprintf("cluster:%s:nodelist", cluster)
}

func (d *redisDB) clusterNodeKey(cluster, id string) string {
return fmt.Sprintf("cluster:%s:node:%s", cluster, id)
}

func (d *redisDB) clusterAddressKey(cluster string, addr *types.Address) string {
if !addr.IP.IsZero() {
return fmt.Sprintf("cluster:%s:address:%s", cluster, addr.IP.String())
}

return fmt.Sprintf("cluster:%s:address:%s", cluster, addr.Name)
}

// Add implements db.DB
func (d *redisDB) Add(ctx context.Context, cluster string, n *types.Node) error {
tx := d.rc.TxPipeline()

// Store the node data
tx.Set(ctx, d.clusterNodeKey(cluster, n.ID), n, redisTTL)

// Add the node to the cluster
if err := tx.SAdd(ctx, d.clusterNodesKey(cluster), n.ID).Err(); err != nil {
return fmt.Errorf("failed to add node %s to cluster %q: %w", n.Name, cluster, err)
}

tx.Expire(ctx, d.clusterNodesKey(cluster), redisTTL)

// Update the address assignments
for _, addr := range n.Addresses {
tx.Set(ctx, d.clusterAddressKey(cluster, addr), n.ID, redisTTL)
}

_, err := tx.Exec(ctx)

return err
}

// AddAddresses implements db.DB
func (d *redisDB) AddAddresses(ctx context.Context, cluster string, id string, ep ...*types.Address) error {
n, err := d.Get(ctx, cluster, id)
if err != nil {
return fmt.Errorf("failed to retrieve node %q from cluster %q: %w", id, cluster, err)
}

n.AddAddresses(ep...)

return d.Add(ctx, cluster, n)
}

// Clean implements db.DB
func (d *redisDB) Clean() {
return // no-op
}

// Get implements db.DB
func (d *redisDB) Get(ctx context.Context, cluster string, id string) (*types.Node, error) {
n := new(types.Node)

if err := d.rc.Get(ctx, d.clusterNodeKey(cluster, id)).Scan(n); err != nil {
if errors.Is(redis.Nil, err) {
return nil, err
}
return nil, fmt.Errorf("failed to parse node %q of cluster %q: %w", id, cluster, err)
}

var validAddresses []*types.Address

for _, a := range n.Addresses {
owner, err := d.rc.Get(ctx, d.clusterAddressKey(cluster, a)).Result()
if err == nil && owner == id {
validAddresses = append(validAddresses, a)
}
}

n.Addresses = validAddresses

return n, nil
}

// List implements db.DB
func (d *redisDB) List(ctx context.Context, cluster string) ([]*types.Node, error) {
nodeList, err := d.rc.SMembers(ctx, d.clusterNodesKey(cluster)).Result()
if err != nil {
return nil, fmt.Errorf("failed to get members of cluster %q: %w", cluster, err)
}

var ret []*types.Node

for _, id := range nodeList {
n, err := d.Get(ctx, cluster, id)
if err != nil {
if errors.Is(redis.Nil, err) {
d.logger.Debug("removing stale node from cluster",
zap.String("node", id),
zap.String("cluster", cluster),
)

if err = d.rc.SRem(ctx, d.clusterNodesKey(cluster), id).Err(); err != nil {
d.logger.Warn("failed to remove node from cluster set which did not exist",
zap.String("node", id),
zap.String("cluster", cluster),
zap.Error(err),
)
}
} else {
d.logger.Error("failed to get node listen in nodeList",
zap.String("node", id),
zap.String("cluster", cluster),
zap.Error(err),
)
}

continue
}

ret = append(ret, n)
}

return ret, nil
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/talos-systems/wglan-manager
go 1.16

require (
github.com/go-redis/redis/v8 v8.11.2
github.com/gofiber/fiber/v2 v2.8.0
go.uber.org/zap v1.16.0
golang.org/x/sys v0.0.0-20210503173754-0981d6026fa6 // indirect
Expand Down

0 comments on commit d782452

Please sign in to comment.