Skip to content

Commit

Permalink
Merge pull request #120 from MysteriumNetwork/feature/MYST-269-statis…
Browse files Browse the repository at this point in the history
…tics-endpoint

Add statistics endpoint with bytes sent/received
  • Loading branch information
donce committed Jan 26, 2018
2 parents 49a08fc + d5f3270 commit 068fff9
Show file tree
Hide file tree
Showing 15 changed files with 223 additions and 51 deletions.
8 changes: 6 additions & 2 deletions client_connection/manager.go
Expand Up @@ -123,7 +123,8 @@ func statusDisconnecting() ConnectionStatus {
return ConnectionStatus{Disconnecting, "", nil}
}

func ConfigureVpnClientFactory(mysteriumAPIClient server.Client, vpnClientRuntimeDirectory string, signerFactory identity.SignerFactory) VpnClientFactory {
func ConfigureVpnClientFactory(mysteriumAPIClient server.Client, vpnClientRuntimeDirectory string,
signerFactory identity.SignerFactory, statsKeeper *bytescount.SessionStatsKeeper) VpnClientFactory {
return func(vpnSession session.SessionDto, id identity.Identity) (openvpn.Client, error) {
vpnConfig, err := openvpn.NewClientConfigFromString(
vpnSession.Config,
Expand All @@ -133,10 +134,13 @@ func ConfigureVpnClientFactory(mysteriumAPIClient server.Client, vpnClientRuntim
return nil, err
}

statsSaver := bytescount.NewSessionStatsSaver(statsKeeper)
statsSender := bytescount.NewSessionStatsSender(mysteriumAPIClient, vpnSession.ID, signerFactory(id))
statsHandler := bytescount.NewCompositeStatsHandler(statsSaver, statsSender)

authenticator := auth.NewAuthenticatorFake()
vpnMiddlewares := []openvpn.ManagementMiddleware{
bytescount.NewMiddleware(statsSender, 1*time.Minute),
bytescount.NewMiddleware(statsHandler, 1*time.Minute),
auth.NewMiddleware(authenticator),
}
return openvpn.NewClient(
Expand Down
7 changes: 5 additions & 2 deletions cmd/mysterium_client/run/command.go
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/mysterium/node/identity"
"github.com/mysterium/node/ip"
"github.com/mysterium/node/openvpn"
"github.com/mysterium/node/openvpn/middlewares/client/bytescount"
"github.com/mysterium/node/server"
"github.com/mysterium/node/tequilapi"
tequilapi_endpoints "github.com/mysterium/node/tequilapi/endpoints"
Expand Down Expand Up @@ -43,14 +44,16 @@ func NewCommandWith(
return identity.NewSigner(keystoreInstance, id)
}

vpnClientFactory := client_connection.ConfigureVpnClientFactory(mysteriumClient, options.DirectoryRuntime, signerFactory)
statsKeeper := &bytescount.SessionStatsKeeper{}

vpnClientFactory := client_connection.ConfigureVpnClientFactory(mysteriumClient, options.DirectoryRuntime, signerFactory, statsKeeper)

connectionManager := client_connection.NewManager(mysteriumClient, dialogEstablisherFactory, vpnClientFactory)

router := tequilapi.NewAPIRouter()
tequilapi_endpoints.AddRoutesForIdentities(router, identityManager, mysteriumClient, signerFactory)
ipResolver := ip.NewResolver()
tequilapi_endpoints.AddRoutesForConnection(router, connectionManager, ipResolver)
tequilapi_endpoints.AddRoutesForConnection(router, connectionManager, ipResolver, statsKeeper)
tequilapi_endpoints.AddRoutesForProposals(router, mysteriumClient)

httpAPIServer := tequilapi.NewServer(options.TequilapiAddress, options.TequilapiPort, router)
Expand Down
14 changes: 14 additions & 0 deletions openvpn/middlewares/client/bytescount/composite_stats_handler.go
@@ -0,0 +1,14 @@
package bytescount

// NewCompositeStatsHandler composes multiple stats handlers into single one, which executes all handlers sequentially
func NewCompositeStatsHandler(statsHandlers ...SessionStatsHandler) SessionStatsHandler {
return func(stats SessionStats) error {
for _, handler := range statsHandlers {
err := handler(stats)
if err != nil {
return err
}
}
return nil
}
}
@@ -0,0 +1,40 @@
package bytescount

import (
"errors"
"github.com/stretchr/testify/assert"
"testing"
)

var stats = SessionStats{BytesSent: 1, BytesReceived: 1}

func TestCompositeHandlerWithNoHandlers(t *testing.T) {
stats := SessionStats{BytesSent: 1, BytesReceived: 1}

compositeHandler := NewCompositeStatsHandler()
assert.NoError(t, compositeHandler(stats))
}

func TestCompositeHandlerWithSuccessfulHandler(t *testing.T) {
statsRecorder := fakeStatsRecorder{}
compositeHandler := NewCompositeStatsHandler(statsRecorder.record)
assert.NoError(t, compositeHandler(stats))
assert.Equal(t, stats, statsRecorder.LastSessionStats)
}

func TestCompositeHandlerWithFailingHandler(t *testing.T) {
failingHandler := func(stats SessionStats) error { return errors.New("fake error") }
compositeHandler := NewCompositeStatsHandler(failingHandler)
assert.Error(t, compositeHandler(stats), "fake error")
}

func TestCompositeHandlerWithMultipleHandlers(t *testing.T) {
recorder1 := fakeStatsRecorder{}
recorder2 := fakeStatsRecorder{}

compositeHandler := NewCompositeStatsHandler(recorder1.record, recorder2.record)
assert.NoError(t, compositeHandler(stats))

assert.Equal(t, stats, recorder1.LastSessionStats)
assert.Equal(t, stats, recorder2.LastSessionStats)
}
6 changes: 6 additions & 0 deletions openvpn/middlewares/client/bytescount/dto.go
@@ -0,0 +1,6 @@
package bytescount

// SessionStats represents statistics, generated by bytescount middleware
type SessionStats struct {
BytesSent, BytesReceived int
}
10 changes: 10 additions & 0 deletions openvpn/middlewares/client/bytescount/fake_stats_handler.go
@@ -0,0 +1,10 @@
package bytescount

type fakeStatsRecorder struct {
LastSessionStats SessionStats
}

func (sender *fakeStatsRecorder) record(sessionStats SessionStats) error {
sender.LastSessionStats = sessionStats
return nil
}
17 changes: 11 additions & 6 deletions openvpn/middlewares/client/bytescount/middleware.go
Expand Up @@ -9,18 +9,22 @@ import (
"time"
)

// SessionStatsHandler is invoked when middleware receives statistics
type SessionStatsHandler func(SessionStats) error

type middleware struct {
sessionStatsSender SessionStatsSender
interval time.Duration
sessionStatsHandler SessionStatsHandler
interval time.Duration

state openvpn.State
connection net.Conn
}

func NewMiddleware(sessionStatsSender SessionStatsSender, interval time.Duration) openvpn.ManagementMiddleware {
// NewMiddleware returns new bytescount middleware
func NewMiddleware(sessionStatsHandler SessionStatsHandler, interval time.Duration) openvpn.ManagementMiddleware {
return &middleware{
sessionStatsSender: sessionStatsSender,
interval: interval,
sessionStatsHandler: sessionStatsHandler,
interval: interval,

connection: nil,
}
Expand Down Expand Up @@ -62,7 +66,8 @@ func (middleware *middleware) ConsumeLine(line string) (consumed bool, err error
return
}

err = middleware.sessionStatsSender(bytesOut, bytesIn)
stats := SessionStats{BytesSent: bytesOut, BytesReceived: bytesIn}
err = middleware.sessionStatsHandler(stats)

return
}
26 changes: 8 additions & 18 deletions openvpn/middlewares/client/bytescount/middleware_test.go
Expand Up @@ -45,25 +45,15 @@ func (conn *fakeConnection) SetWriteDeadline(t time.Time) error {
return nil
}

type fakeStatsSender struct {
lastBytesSent, lastBytesReceived int
}

func (sender *fakeStatsSender) send(bytesSent, bytesReceived int) error {
sender.lastBytesSent = bytesSent
sender.lastBytesReceived = bytesReceived
return nil
}

func Test_Factory(t *testing.T) {
statsSender := fakeStatsSender{}
middleware := NewMiddleware(statsSender.send, 1*time.Minute)
statsRecorder := fakeStatsRecorder{}
middleware := NewMiddleware(statsRecorder.record, 1*time.Minute)
assert.NotNil(t, middleware)
}

func Test_Start(t *testing.T) {
statsSender := fakeStatsSender{}
middleware := NewMiddleware(statsSender.send, 1*time.Minute)
statsRecorder := fakeStatsRecorder{}
middleware := NewMiddleware(statsRecorder.record, 1*time.Minute)
connection := &fakeConnection{}
middleware.Start(connection)
assert.Equal(t, []byte("bytecount 60\n"), connection.lastDataWritten)
Expand All @@ -89,16 +79,16 @@ func Test_ConsumeLine(t *testing.T) {
}

for _, test := range tests {
statsSender := &fakeStatsSender{}
middleware := NewMiddleware(statsSender.send, 1*time.Minute)
statsRecorder := &fakeStatsRecorder{}
middleware := NewMiddleware(statsRecorder.record, 1*time.Minute)
consumed, err := middleware.ConsumeLine(test.line)
if test.expectedError != nil {
assert.Error(t, test.expectedError, err.Error(), test.line)
} else {
assert.NoError(t, err, test.line)
}
assert.Equal(t, test.expectedConsumed, consumed, test.line)
assert.Equal(t, test.expectedBytesReceived, statsSender.lastBytesReceived)
assert.Equal(t, test.expectedBytesSent, statsSender.lastBytesSent)
assert.Equal(t, test.expectedBytesReceived, statsRecorder.LastSessionStats.BytesReceived)
assert.Equal(t, test.expectedBytesSent, statsRecorder.LastSessionStats.BytesSent)
}
}
16 changes: 16 additions & 0 deletions openvpn/middlewares/client/bytescount/stats_keeper.go
@@ -0,0 +1,16 @@
package bytescount

// SessionStatsKeeper keeps session stats
type SessionStatsKeeper struct {
sessionStats SessionStats
}

// Save saves session stats to keeper
func (keeper *SessionStatsKeeper) Save(stats SessionStats) {
keeper.sessionStats = stats
}

// Retrieve retrieves session stats from keeper
func (keeper *SessionStatsKeeper) Retrieve() SessionStats {
return keeper.sessionStats
}
14 changes: 14 additions & 0 deletions openvpn/middlewares/client/bytescount/stats_keeper_test.go
@@ -0,0 +1,14 @@
package bytescount

import (
"github.com/stretchr/testify/assert"
"testing"
)

func TestSessionStatsStoreWorks(t *testing.T) {
statsKeeper := &SessionStatsKeeper{}
stats := SessionStats{BytesSent: 1, BytesReceived: 2}

statsKeeper.Save(stats)
assert.Equal(t, stats, statsKeeper.Retrieve())
}
9 changes: 9 additions & 0 deletions openvpn/middlewares/client/bytescount/stats_saver.go
@@ -0,0 +1,9 @@
package bytescount

// NewSessionStatsSaver returns stats handler, which saves stats stats keeper
func NewSessionStatsSaver(statsKeeper *SessionStatsKeeper) SessionStatsHandler {
return func(sessionStats SessionStats) error {
statsKeeper.Save(sessionStats)
return nil
}
}
15 changes: 15 additions & 0 deletions openvpn/middlewares/client/bytescount/stats_saver_test.go
@@ -0,0 +1,15 @@
package bytescount

import (
"github.com/stretchr/testify/assert"
"testing"
)

func TestNewSessionStatsSaver(t *testing.T) {
statsKeeper := &SessionStatsKeeper{}

saver := NewSessionStatsSaver(statsKeeper)
stats := SessionStats{BytesSent: 1, BytesReceived: 2}
saver(stats)
assert.Equal(t, stats, statsKeeper.Retrieve())
}
10 changes: 6 additions & 4 deletions openvpn/middlewares/client/bytescount/stats_sender.go
Expand Up @@ -7,16 +7,18 @@ import (
"github.com/mysterium/node/session"
)

// SessionStatsSender sends statistics to server
type SessionStatsSender func(bytesSent, bytesReceived int) error

func NewSessionStatsSender(mysteriumClient server.Client, sessionID session.SessionID, signer identity.Signer) SessionStatsSender {
// NewSessionStatsSender returns new session stats handler, which sends statistics to server
func NewSessionStatsSender(mysteriumClient server.Client, sessionID session.SessionID, signer identity.Signer) SessionStatsHandler {
sessionIDString := string(sessionID)
return func(bytesSent, bytesReceived int) error {
return func(sessionStats SessionStats) error {
return mysteriumClient.SendSessionStats(
sessionIDString,
dto.SessionStats{
BytesSent: bytesSent,
BytesReceived: bytesReceived,
BytesSent: sessionStats.BytesSent,
BytesReceived: sessionStats.BytesReceived,
},
signer,
)
Expand Down
34 changes: 26 additions & 8 deletions tequilapi/endpoints/connection.go
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/mysterium/node/client_connection"
"github.com/mysterium/node/identity"
"github.com/mysterium/node/ip"
"github.com/mysterium/node/openvpn/middlewares/client/bytescount"
"github.com/mysterium/node/tequilapi/utils"
"github.com/mysterium/node/tequilapi/validation"
"net/http"
Expand All @@ -22,14 +23,16 @@ type statusResponse struct {
}

type connectionEndpoint struct {
manager client_connection.Manager
ipResolver ip.Resolver
manager client_connection.Manager
ipResolver ip.Resolver
statsKeeper *bytescount.SessionStatsKeeper
}

func NewConnectionEndpoint(manager client_connection.Manager, ipResolver ip.Resolver) *connectionEndpoint {
func NewConnectionEndpoint(manager client_connection.Manager, ipResolver ip.Resolver, statsKeeper *bytescount.SessionStatsKeeper) *connectionEndpoint {
return &connectionEndpoint{
manager: manager,
ipResolver: ipResolver,
manager: manager,
ipResolver: ipResolver,
statsKeeper: statsKeeper,
}
}

Expand Down Expand Up @@ -81,13 +84,28 @@ func (ce *connectionEndpoint) GetIP(writer http.ResponseWriter, request *http.Re
utils.WriteAsJSON(response, writer)
}

// TODO: Uppercase IPResolver?
func AddRoutesForConnection(router *httprouter.Router, manager client_connection.Manager, ipResolver ip.Resolver) {
connectionEndpoint := NewConnectionEndpoint(manager, ipResolver)
// GetStatistics returns statistics about current connection
func (ce *connectionEndpoint) GetStatistics(writer http.ResponseWriter, request *http.Request, params httprouter.Params) {
stats := ce.statsKeeper.Retrieve()
response := struct {
BytesSent int `json:"bytesSent"`
BytesReceived int `json:"bytesReceived"`
}{
BytesSent: stats.BytesSent,
BytesReceived: stats.BytesReceived,
}
utils.WriteAsJSON(response, writer)
}

// AddRoutesForConnection adds connections routes to given router
func AddRoutesForConnection(router *httprouter.Router, manager client_connection.Manager, ipResolver ip.Resolver,
statsKeeper *bytescount.SessionStatsKeeper) {
connectionEndpoint := NewConnectionEndpoint(manager, ipResolver, statsKeeper)
router.GET("/connection", connectionEndpoint.Status)
router.PUT("/connection", connectionEndpoint.Create)
router.DELETE("/connection", connectionEndpoint.Kill)
router.GET("/connection/ip", connectionEndpoint.GetIP)
router.GET("/connection/statistics", connectionEndpoint.GetStatistics)
}

func toConnectionRequest(req *http.Request) (*connectionRequest, error) {
Expand Down

0 comments on commit 068fff9

Please sign in to comment.