Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature/pfdhcp-optimize #7710

Merged
merged 4 commits into from
Jun 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 11 additions & 2 deletions go/cmd/pfdhcp/keysoption.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,21 @@ func MysqlInsert(key string, value string) bool {
if err := MySQLdatabase.PingContext(ctx); err != nil {
log.LoggerWContext(ctx).Error("Unable to ping database, reconnect: " + err.Error())
}
rows, err := MySQLdatabase.Query("replace into key_value_storage values(?,?)", "/dhcpd/"+key, value)
defer rows.Close()

_, err := MySQLdatabase.Exec(
`
INSERT into key_value_storage values(?,?)
ON DUPLICATE KEY UPDATE value = VALUES(value)
`,
"/dhcpd/"+key,
value,
)

if err != nil {
log.LoggerWContext(ctx).Error("Error while inserting into MySQL: " + err.Error())
return false
}

return true
}

Expand Down
6 changes: 0 additions & 6 deletions go/cmd/pfdhcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,12 +719,6 @@ func (I *Interface) ServeDHCP(ctx context.Context, p dhcp.Packet, msgType dhcp.M
// Update Global Caches
GlobalIPCache.Set(reqIP.String(), answer.MAC.String(), cacheDuration)
GlobalMacCache.Set(answer.MAC.String(), reqIP.String(), cacheDuration)
// Update ip4log from pfdhcp. Commented for now.
// err := MysqlUpdateIP4Log(answer.MAC.String(), reqIP.String(), cacheDuration)
// if err != nil {
// log.LoggerWContext(ctx).Info(err.Error())
// }
// Update the cache
log.LoggerWContext(ctx).Info("DHCPACK on " + reqIP.String() + " to " + clientMac + " (" + clientHostname + ")")

handler.hwcache.Set(answer.MAC.String(), Index, cacheDuration)
Expand Down
56 changes: 0 additions & 56 deletions go/cmd/pfdhcp/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,62 +430,6 @@ func IsIPv6(address net.IP) bool {
return strings.Count(address.String(), ":") >= 2
}

// MysqlUpdateIP4Log update the ip4log table
func MysqlUpdateIP4Log(mac string, ip string, duration time.Duration) error {
if err := MySQLdatabase.PingContext(ctx); err != nil {
log.LoggerWContext(ctx).Error("Unable to ping database, reconnect: " + err.Error())
}

MAC2IP, err := MySQLdatabase.Prepare("SELECT mac, ip, start_time, end_time FROM ip4log WHERE mac = ? AND (end_time = 0 OR ( end_time + INTERVAL 30 SECOND ) > NOW()) ORDER BY start_time DESC LIMIT 1")
if err != nil {
return err
}

IP2MAC, err := MySQLdatabase.Prepare("SELECT mac, ip, start_time, end_time FROM ip4log WHERE ip = ? AND (end_time = 0 OR end_time > NOW()) ORDER BY start_time DESC")
if err != nil {
return err
}

IPClose, err := MySQLdatabase.Prepare(" UPDATE ip4log SET end_time = NOW() WHERE ip = ?")
if err != nil {
return err
}

IPInsert, err := MySQLdatabase.Prepare("INSERT INTO ip4log (mac, ip, start_time, end_time) VALUES (?, ?, NOW(), DATE_ADD(NOW(), INTERVAL ? SECOND))")
if err != nil {
return err
}

var (
oldMAC string
oldIP string
)
err = MAC2IP.QueryRow(mac).Scan(&oldIP)
if err != nil {
return err
}
err = IP2MAC.QueryRow(ip).Scan(&oldMAC)
if err != nil {
return err
}
if oldMAC != mac {
_, err = IPClose.Exec(ip)
if err != nil {
return err
}
}
if oldIP != ip {
_, err = IPClose.Exec(oldIP)
if err != nil {
return err
}
}
IPInsert.Exec(mac, ip, duration.Seconds())

return err

}

func stringInSlice(a string, list []string) bool {
for _, b := range list {
if b == a {
Expand Down
70 changes: 63 additions & 7 deletions go/dhcp/pool/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"errors"
"math"
"strconv"
"sync"

Expand All @@ -22,11 +23,11 @@ type Mysql struct {

// NewMysqlPool return a new mysql pool
func NewMysqlPool(context context.Context, capacity uint64, name string, algorithm int, StatsdClient *statsd.Client, sql *sql.DB) (Backend, error) {
dp := Mysql{}
dp := &Mysql{}
dp.PoolName = name
dp.SQL = sql
dp.NewDHCPPool(context, capacity, algorithm, StatsdClient)
return &dp, nil
return dp, nil
}

// NewDHCPPool initialize the DHCPPool
Expand All @@ -43,13 +44,67 @@ func (dp *Mysql) NewDHCPPool(context context.Context, capacity uint64, algorithm
ctx: ctx,
statsd: StatsdClient,
}
rows, _ := dp.SQL.Query("DELETE FROM dhcppool WHERE pool_name=?", dp.PoolName)
rows.Close()
for i := uint64(0); i < capacity; i++ {
rows, _ := dp.SQL.Query("INSERT INTO dhcppool (pool_name, idx, released) VALUES (?, ?, NOW()) ON DUPLICATE KEY UPDATE id=id", dp.PoolName, i)
rows.Close()

_, err := dp.SQL.Exec("DELETE FROM dhcppool WHERE pool_name=?", dp.PoolName)
if err != nil {
return
}
dp.initializePool(capacity)
dp.DHCPPool = d

}

const maxBatch = 512 * 512

func (dp *Mysql) initializePool(capacity uint64) {
start := uint64(0)
for capacity > maxBatch {
dp.initializeLargePool(start, maxBatch)
start += maxBatch
capacity -= maxBatch
}

if capacity <= 1000 {
_, err := dp.SQL.Exec(
`
INSERT INTO dhcppool (pool_name, idx, released)
(SELECT ?, num, NOW() FROM (
WITH RECURSIVE seq AS (SELECT 0 AS num UNION ALL SELECT num + 1 FROM seq WHERE num < ? - 1)
SELECT num + ? as num FROM seq
) as x)
`,
dp.PoolName,
capacity,
start,
)

_ = err

return
}

if capacity <= maxBatch {
dp.initializeLargePool(start, capacity)
}

}

func (dp *Mysql) initializeLargePool(start, capacity uint64) {
split := uint64(math.Ceil(math.Sqrt(float64(capacity))))
_, _ = dp.SQL.Exec(
`
INSERT INTO dhcppool (pool_name, idx, released)
(SELECT ?, num + ?, NOW() FROM (
WITH RECURSIVE seq AS (SELECT 0 AS num UNION ALL SELECT num + 1 FROM seq WHERE num < ? - 1)
SELECT a.num * ? + b.num AS num FROM seq AS a JOIN seq AS b ORDER BY a.num, b.num
) as x WHERE num < ?);
`,
dp.PoolName,
start,
split,
split,
capacity,
)
}

// GetDHCPPool return the DHCPPool
Expand Down Expand Up @@ -187,6 +242,7 @@ func (dp *Mysql) GetFreeIPIndex(mac string) (uint64, string, error) {
return 0, FreeMac, err

}

count, err2 := res.RowsAffected()

if err2 != nil {
Expand Down
24 changes: 24 additions & 0 deletions go/dhcp/pool/mysql_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package pool

import (
"context"
"testing"

"github.com/inverse-inc/packetfence/go/db"
)

func BenchmarkInitializePool254(b *testing.B) {
ctx := context.Background()
db, _ := db.DbFromConfig(ctx)
for n := 0; n < b.N; n++ {
_, _ = NewMysqlPool(ctx, 254, "bench_254", Random, nil, db)
}
}

func BenchmarkInitializePool65534(b *testing.B) {
ctx := context.Background()
db, _ := db.DbFromConfig(ctx)
for n := 0; n < b.N; n++ {
_, _ = NewMysqlPool(ctx, 65534, "bench_65534", Random, nil, db)
}
}
41 changes: 41 additions & 0 deletions go/dhcp/pool/mysql_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package pool

import (
"context"
"database/sql"
"fmt"
"testing"

"github.com/inverse-inc/packetfence/go/db"
)

func TestInitializePool(t *testing.T) {
ctx := context.Background()
db, err := db.DbFromConfig(ctx)
if err != nil {
t.Fatalf("Cannot connect to database: %s", err.Error())
}

for _, i := range []uint64{254, 1000, 1001, maxBatch, maxBatch - 2, maxBatch*4 - 2} {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
testInitializePool(t, i, db)
})
}

}

func testInitializePool(t *testing.T, capacity uint64, db *sql.DB) {
algo := Random
backend, err := NewMysqlPool(ctx, capacity, "test_254", algo, nil, db)
if err != nil {
t.Fatalf("Cannot create backend: %s", err.Error())
}

count := uint64(0)
db.QueryRow("SELECT COUNT(*) from dhcppool where pool_name = ?", "test_254").Scan(&count)
if count != capacity {
t.Fatalf("Count %d does not match %d", capacity, count)
}

_ = backend
}