Skip to content

Commit

Permalink
Persist bandwidth data in netquery DBs when enabled
Browse files Browse the repository at this point in the history
  • Loading branch information
ppacher committed Jul 21, 2023
1 parent b7fd1fc commit 5dcb6b2
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 45 deletions.
2 changes: 1 addition & 1 deletion firewall/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
var module *modules.Module

func init() {
module = modules.Register("filter", prep, start, stop, "core", "interception", "intel")
module = modules.Register("filter", prep, start, stop, "core", "interception", "intel", "netquery")
subsystems.Register(
"filter",
"Privacy Filter",
Expand Down
18 changes: 15 additions & 3 deletions firewall/packet_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/safing/portmaster/firewall/inspection"
"github.com/safing/portmaster/firewall/interception"
"github.com/safing/portmaster/netenv"
"github.com/safing/portmaster/netquery"
"github.com/safing/portmaster/network"
"github.com/safing/portmaster/network/netutils"
"github.com/safing/portmaster/network/packet"
Expand Down Expand Up @@ -616,7 +617,7 @@ func bandwidthUpdateHandler(ctx context.Context) error {
return nil
case bwUpdate := <-interception.BandwidthUpdates:
if bwUpdate != nil {
updateBandwidth(bwUpdate)
updateBandwidth(ctx, bwUpdate)
// DEBUG:
// log.Debugf("filter: bandwidth update: %s", bwUpdate)
} else {
Expand All @@ -626,7 +627,7 @@ func bandwidthUpdateHandler(ctx context.Context) error {
}
}

func updateBandwidth(bwUpdate *packet.BandwidthUpdate) {
func updateBandwidth(ctx context.Context, bwUpdate *packet.BandwidthUpdate) {
// Check if update makes sense.
if bwUpdate.RecvBytes == 0 && bwUpdate.SentBytes == 0 {
return
Expand Down Expand Up @@ -657,7 +658,18 @@ func updateBandwidth(bwUpdate *packet.BandwidthUpdate) {
log.Warningf("filter: unsupported bandwidth update method: %d", bwUpdate.Method)
}

// TODO: Send update.
if netquery.DefaultModule != nil && conn.BandwidthEnabled {
if err := netquery.DefaultModule.Store.UpdateBandwidth(
ctx,
conn.HistoryEnabled,
conn.Process().GetID(),
conn.ID,
&conn.RecvBytes,
&conn.SentBytes,
); err != nil {
log.Errorf("firewall: failed to persist bandwidth data: %s", err)
}
}
}

func statLogger(ctx context.Context) error {
Expand Down
18 changes: 9 additions & 9 deletions netquery/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ type (
Allowed *bool `sqlite:"allowed"`
ProfileRevision int `sqlite:"profile_revision"`
ExitNode *string `sqlite:"exit_node"`
BWIncoming uint64 `sqlite:"bw_incoming,default=0"`
BWOutgoing uint64 `sqlite:"bw_outgoing,default=0"`
BytesReceived uint64 `sqlite:"bytes_received,default=0"`
BytesSent uint64 `sqlite:"bytes_sent,default=0"`

// TODO(ppacher): support "NOT" in search query to get rid of the following helper fields
Active bool `sqlite:"active"` // could use "ended IS NOT NULL" or "ended IS NULL"
Expand Down Expand Up @@ -400,13 +400,13 @@ func (db *Database) UpdateBandwidth(ctx context.Context, enableHistory bool, pro

parts := []string{}
if incoming != nil {
parts = append(parts, "bw_incoming = :bw_incoming")
params[":bw_incoming"] = *incoming
parts = append(parts, "bytes_received = :bytes_received")
params[":bytes_received"] = *incoming
}

if outgoing != nil {
parts = append(parts, "bw_outgoing = :bw_outgoing")
params[":bw_outgoing"] = *outgoing
parts = append(parts, "bytes_sent = :bytes_sent")
params[":bytes_sent"] = *outgoing
}

updateSet := strings.Join(parts, ", ")
Expand Down Expand Up @@ -438,11 +438,11 @@ func (db *Database) UpdateBandwidth(ctx context.Context, enableHistory bool, pro
// connection pool.
func (db *Database) Save(ctx context.Context, conn Conn, enableHistory bool) error {
// convert the connection to a param map where each key is already translated
// to the sql column name. We also skip bw_incoming and bw_outgoing since those
// to the sql column name. We also skip bytes_received and bytes_sent since those
// will be updated independenly from the connection object.
connMap, err := orm.ToParamMap(ctx, conn, "", orm.DefaultEncodeConfig, []string{
"bw_incoming",
"bw_outgoing",
"bytes_received",
"bytes_sent",
})
if err != nil {
return fmt.Errorf("failed to encode connection for SQL: %w", err)
Expand Down
17 changes: 1 addition & 16 deletions netquery/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
"github.com/safing/portbase/log"
"github.com/safing/portbase/runtime"
"github.com/safing/portmaster/network"
"github.com/safing/spn/access"
"github.com/safing/spn/access/account"
)

type (
Expand Down Expand Up @@ -117,20 +115,7 @@ func (mng *Manager) HandleFeed(ctx context.Context, feed <-chan *network.Connect

log.Tracef("netquery: updating connection %s", conn.ID)

// check if we should persist the connection in the history database.
// Also make sure the current SPN User/subscription allows use of the history.
historyEnabled := conn.Process().Profile().HistoryEnabled()
if historyEnabled {
user, err := access.GetUser()
if err != nil {
// there was an error so disable history
historyEnabled = false
} else if !user.MayUse(account.FeatureHistory) {
historyEnabled = false
}
}

if err := mng.store.Save(ctx, *model, historyEnabled); err != nil {
if err := mng.store.Save(ctx, *model, conn.HistoryEnabled); err != nil {
log.Errorf("netquery: failed to save connection %s in sqlite database: %s", conn.ID, err)

continue
Expand Down
36 changes: 20 additions & 16 deletions netquery/module_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,26 @@ import (
"github.com/safing/portmaster/network"
)

var DefaultModule *module

type module struct {
*modules.Module

db *database.Interface
sqlStore *Database
mng *Manager
feed chan *network.Connection
Store *Database

db *database.Interface
mng *Manager
feed chan *network.Connection
}

func init() {
m := new(module)
m.Module = modules.Register(
DefaultModule = new(module)

DefaultModule.Module = modules.Register(
"netquery",
m.prepare,
m.start,
m.stop,
DefaultModule.prepare,
DefaultModule.start,
DefaultModule.stop,
"api",
"network",
"database",
Expand All @@ -44,7 +48,7 @@ func init() {
"history",
"Network History",
"Keep Network History Data",
m.Module,
DefaultModule.Module,
"config:history/",
nil,
)
Expand All @@ -58,25 +62,25 @@ func (m *module) prepare() error {
Internal: true,
})

m.sqlStore, err = NewInMemory()
m.Store, err = NewInMemory()
if err != nil {
return fmt.Errorf("failed to create in-memory database: %w", err)
}

m.mng, err = NewManager(m.sqlStore, "netquery/data/", runtime.DefaultRegistry)
m.mng, err = NewManager(m.Store, "netquery/data/", runtime.DefaultRegistry)
if err != nil {
return fmt.Errorf("failed to create manager: %w", err)
}

m.feed = make(chan *network.Connection, 1000)

queryHander := &QueryHandler{
Database: m.sqlStore,
Database: m.Store,
IsDevMode: config.Concurrent.GetAsBool(config.CfgDevModeKey, false),
}

chartHandler := &ChartHandler{
Database: m.sqlStore,
Database: m.Store,
}

if err := api.RegisterEndpoint(api.Endpoint{
Expand Down Expand Up @@ -204,7 +208,7 @@ func (m *module) start() error {
return nil
case <-time.After(10 * time.Second):
threshold := time.Now().Add(-network.DeleteConnsAfterEndedThreshold)
count, err := m.sqlStore.Cleanup(ctx, threshold)
count, err := m.Store.Cleanup(ctx, threshold)
if err != nil {
log.Errorf("netquery: failed to count number of rows in memory: %s", err)
} else {
Expand All @@ -218,7 +222,7 @@ func (m *module) start() error {
// the runtime database.
// Only expose in development mode.
if config.GetAsBool(config.CfgDevModeKey, false)() {
_, err := NewRuntimeQueryRunner(m.sqlStore, "netquery/query/", runtime.DefaultRegistry)
_, err := NewRuntimeQueryRunner(m.Store, "netquery/query/", runtime.DefaultRegistry)
if err != nil {
return fmt.Errorf("failed to set up runtime SQL query runner: %w", err)
}
Expand Down
23 changes: 23 additions & 0 deletions network/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"github.com/safing/portmaster/process"
_ "github.com/safing/portmaster/process/tags"
"github.com/safing/portmaster/resolver"
"github.com/safing/spn/access"
"github.com/safing/spn/access/account"
"github.com/safing/spn/navigator"
)

Expand Down Expand Up @@ -218,6 +220,13 @@ type Connection struct { //nolint:maligned // TODO: fix alignment
// addedToMetrics signifies if the connection has already been counted in
// the metrics.
addedToMetrics bool

// HistoryEnabled is set to true when the connection should be persisted
// in the history database.
HistoryEnabled bool
// BanwidthEnabled is set to true if connection bandwidth data should be persisted
// in netquery.
BandwidthEnabled bool
}

// Reason holds information justifying a verdict, as well as additional
Expand Down Expand Up @@ -420,7 +429,21 @@ func (conn *Connection) GatherConnectionInfo(pkt packet.Packet) (err error) {
// Inherit internal status of profile.
if localProfile := conn.process.Profile().LocalProfile(); localProfile != nil {
conn.Internal = localProfile.Internal

// check if we should persist the connection in the history database.
// Also make sure the current SPN User/subscription allows use of the history.
user, err := access.GetUser()
if err == nil {
if user.MayUse(account.FeatureHistory) {
conn.HistoryEnabled = localProfile.HistoryEnabled()
}

if user.MayUse(account.FeatureBWVis) {
conn.BandwidthEnabled = true
}
}
}

} else {
conn.process = nil
if pkt.InfoOnly() {
Expand Down

0 comments on commit 5dcb6b2

Please sign in to comment.