Skip to content

Commit

Permalink
Merge pull request #6 from MysteriumNetwork/feature/traffic-stats
Browse files Browse the repository at this point in the history
Traffic statistics "Data transfered" for node sessions #4
  • Loading branch information
Waldz committed May 16, 2017
2 parents 9ad52aa + dbd472b commit 723c908
Show file tree
Hide file tree
Showing 10 changed files with 222 additions and 139 deletions.
75 changes: 75 additions & 0 deletions bytescount_client/middleware.go
@@ -0,0 +1,75 @@
package bytescount_client

import (
"fmt"
"github.com/mysterium/node/openvpn"
"github.com/mysterium/node/server"
"github.com/mysterium/node/server/dto"
"net"
"regexp"
"strconv"
"time"
)

type middleware struct {
mysteriumClient server.Client
interval time.Duration
sessionId string

connection net.Conn
}

func NewMiddleware(mysteriumClient server.Client, sessionId string, interval time.Duration) openvpn.ManagementMiddleware {
return &middleware{
mysteriumClient: mysteriumClient,
interval: interval,
sessionId: sessionId,

connection: nil,
}
}

func (middleware *middleware) Start(connection net.Conn) error {
middleware.connection = connection

command := fmt.Sprintf("bytecount %d\n", int(middleware.interval.Seconds()))
_, err := middleware.connection.Write([]byte(command))

return err
}

func (middleware *middleware) Stop() error {
_, err := middleware.connection.Write([]byte("bytecount 0\n"))
return err
}

func (middleware *middleware) ConsumeLine(line string) (consumed bool, err error) {
rule, err := regexp.Compile("^>BYTECOUNT:(.*),(.*)$")
if err != nil {
return
}

match := rule.FindStringSubmatch(line)
consumed = len(match) > 0
if !consumed {
return
}

bytesIn, err := strconv.Atoi(match[1])
if err != nil {
return
}

bytesOut, err := strconv.Atoi(match[2])
if err != nil {
return
}

err = middleware.mysteriumClient.SessionSendStats(middleware.sessionId, dto.SessionStats{
Id: middleware.sessionId,
BytesSent: bytesOut,
BytesReceived: bytesIn,
})

return
}
43 changes: 43 additions & 0 deletions bytescount_client/middleware_test.go
@@ -0,0 +1,43 @@
package bytescount_client

import (
"errors"
"github.com/mysterium/node/server"
"github.com/stretchr/testify/assert"
"testing"
"time"
)

func Test_Factory(t *testing.T) {
middleware := NewMiddleware(server.NewClientFake(), "session-test", 1*time.Minute)
assert.NotNil(t, middleware)
}

func Test_ConsumeLine(t *testing.T) {
var tests = []struct {
line string
expectedConsumed bool
expectedError error
}{
{">BYTECOUNT:3018,3264", true, nil},
{">BYTECOUNT:0,3264", true, nil},
{">BYTECOUNT:3018,", true, errors.New(`strconv.ParseInt: parsing "": invalid syntax`)},
{">BYTECOUNT:,", true, errors.New(`strconv.ParseInt: parsing "": invalid syntax`)},
{"OTHER", false, nil},
{"BYTECOUNT", false, nil},
{"BYTECOUNT:", false, nil},
{"BYTECOUNT:3018,3264", false, nil},
{">BYTECOUNTT:3018,3264", false, nil},
}

middleware := NewMiddleware(server.NewClientFake(), "session-test", 1*time.Minute)
for _, test := range tests {
consumed, err := middleware.ConsumeLine(test.line)
if test.expectedError != nil {
assert.Error(t, test.expectedError, err.Error(), test.line)
} else {
assert.NoError(t, err, test.line)
}
assert.Equal(t, test.expectedConsumed, consumed, test.line)
}
}
13 changes: 9 additions & 4 deletions cmd/mysterium_client/command_run/command.go
@@ -1,17 +1,19 @@
package command_run

import (
"github.com/mysterium/node/bytescount_client"
"github.com/mysterium/node/openvpn"
"github.com/mysterium/node/server"
"io"
"time"
)

type commandRun struct {
output io.Writer
outputError io.Writer

mysteriumClient server.Client
vpnClient *openvpn.Client
vpnClient *openvpn.Client
}

func (cmd *commandRun) Run(options CommandOptions) error {
Expand All @@ -28,12 +30,15 @@ func (cmd *commandRun) Run(options CommandOptions) error {
return err
}

cmd.vpnClient = openvpn.NewClient(vpnConfig, options.DirectoryRuntime)
cmd.vpnClient = openvpn.NewClient(
vpnConfig,
options.DirectoryRuntime,
bytescount_client.NewMiddleware(cmd.mysteriumClient, vpnSession.Id, 1*time.Minute),
)
if err := cmd.vpnClient.Start(); err != nil {
return err
}


return nil
}

Expand All @@ -43,4 +48,4 @@ func (cmd *commandRun) Wait() error {

func (cmd *commandRun) Kill() {
cmd.vpnClient.Stop()
}
}
4 changes: 2 additions & 2 deletions openvpn/client.go
Expand Up @@ -2,14 +2,14 @@ package openvpn

import "sync"

func NewClient(config *ClientConfig, directoryRuntime string) *Client {
func NewClient(config *ClientConfig, directoryRuntime string, middlewares ...ManagementMiddleware) *Client {
// Add the management interface socketAddress to the config
socketAddress := tempFilename(directoryRuntime, "openvpn-management-", ".sock")
config.SetManagementSocket(socketAddress)

return &Client{
config: config,
management: NewManagement(socketAddress, "[client-managemnet] "),
management: NewManagement(socketAddress, "[client-management] ", middlewares...),
process: NewProcess("[client-openvpn] "),
}
}
Expand Down
3 changes: 1 addition & 2 deletions openvpn/config.go
Expand Up @@ -36,8 +36,7 @@ func (c *Config) setFlag(name string) {

func (c *Config) SetManagementSocket(socketAddress string) {
c.setParam("management", socketAddress+" unix")
c.setFlag("management-signal")
c.setFlag("management-up-down")
c.setFlag("management-client")
}

func (c *Config) SetPort(port int) {
Expand Down

0 comments on commit 723c908

Please sign in to comment.